CDAP Common provides core common utilities and abstractions for the CDAP (Cask Data Application Platform) ecosystem including exception handling, service management, configuration, HTTP utilities, metadata management, security abstractions, discovery services, and various utility classes that are shared across CDAP components.
—
HTTP service building and request/response handling components for creating Netty-based HTTP services with authentication, body handling, and configuration management.
Components for building and configuring Netty-based HTTP services.
/**
* Builder for creating common Netty HTTP services
*/
public class CommonNettyHttpServiceBuilder {
public CommonNettyHttpServiceBuilder();
/**
* Set the host to bind to
*/
public CommonNettyHttpServiceBuilder setHost(String host);
/**
* Set the port to bind to
*/
public CommonNettyHttpServiceBuilder setPort(int port);
/**
* Set the number of boss threads
*/
public CommonNettyHttpServiceBuilder setBossThreadPoolSize(int bossThreads);
/**
* Set the number of worker threads
*/
public CommonNettyHttpServiceBuilder setWorkerThreadPoolSize(int workerThreads);
/**
* Set connection backlog size
*/
public CommonNettyHttpServiceBuilder setConnectionBacklog(int backlog);
/**
* Enable/disable HTTP compression
*/
public CommonNettyHttpServiceBuilder enableCompression(boolean compress);
/**
* Set maximum content length
*/
public CommonNettyHttpServiceBuilder setMaxContentLength(int maxContentLength);
/**
* Add HTTP handlers
*/
public CommonNettyHttpServiceBuilder addHttpHandlers(Iterable<? extends HttpHandler> handlers);
/**
* Build the HTTP service
*/
public NettyHttpService build();
}
/**
* Factory for creating HTTP services
*/
public class CommonNettyHttpServiceFactory {
/**
* Create a new HTTP service builder
*/
public static CommonNettyHttpServiceBuilder builder();
/**
* Create HTTP service with default configuration
*/
public static NettyHttpService create(String host, int port,
Iterable<? extends HttpHandler> handlers);
}Usage Examples:
import io.cdap.cdap.common.http.*;
import co.cask.http.HttpHandler;
import co.cask.http.NettyHttpService;
// Build HTTP service with custom configuration
NettyHttpService httpService = CommonNettyHttpServiceFactory.builder()
.setHost("0.0.0.0")
.setPort(8080)
.setBossThreadPoolSize(1)
.setWorkerThreadPoolSize(10)
.setConnectionBacklog(1000)
.enableCompression(true)
.setMaxContentLength(10 * 1024 * 1024) // 10MB
.addHttpHandlers(Arrays.asList(new MyHttpHandler()))
.build();
// Start the service
httpService.startAsync().awaitRunning();
System.out.println("HTTP service started on port: " + httpService.getBindAddress().getPort());
// Stop the service
httpService.stopAsync().awaitTerminated();
// Simple service creation
NettyHttpService simpleService = CommonNettyHttpServiceFactory.create(
"localhost", 9090, Arrays.asList(new PingHandler())
);Components for handling HTTP request/response bodies and content processing.
/**
* Abstract base class for consuming HTTP request bodies
*/
public abstract class AbstractBodyConsumer extends BodyConsumer {
protected AbstractBodyConsumer();
/**
* Handle incoming chunk of data
*/
@Override
public void chunk(ChannelBuffer content, HttpResponder responder);
/**
* Handle end of request body
*/
@Override
public void finished(HttpResponder responder);
/**
* Handle error during body consumption
*/
@Override
public void handleError(Throwable cause);
}
/**
* Body consumer that can spill to disk when memory limits are exceeded
*/
public class SpillableBodyConsumer extends AbstractBodyConsumer {
public SpillableBodyConsumer(File spillDir, long memoryLimit);
/**
* Get the consumed content as input stream
*/
public InputStream getInputStream() throws IOException;
/**
* Get the total content length
*/
public long getContentLength();
/**
* Clean up resources
*/
public void close() throws IOException;
}
/**
* Body producer that reads from a location (file system, etc.)
*/
public class LocationBodyProducer extends BodyProducer {
public LocationBodyProducer(Location location);
@Override
public ByteBuf nextChunk() throws Exception;
@Override
public void finished() throws Exception;
@Override
public void handleError(Throwable cause);
}Configuration classes for HTTP services and client requests.
/**
* Default HTTP request configuration
*/
public class DefaultHttpRequestConfig implements HttpRequestConfig {
public DefaultHttpRequestConfig();
public DefaultHttpRequestConfig(int connectTimeout, int readTimeout);
@Override
public int getConnectTimeout();
@Override
public int getReadTimeout();
@Override
public boolean isVerifySSLCert();
@Override
public KeyStore getKeyStore();
@Override
public String getKeyStorePassword();
@Override
public KeyStore getTrustStore();
@Override
public String getTrustStorePassword();
}
/**
* HTTP status code utilities
*/
public class HttpCodes {
public static final int OK = 200;
public static final int CREATED = 201;
public static final int ACCEPTED = 202;
public static final int NO_CONTENT = 204;
public static final int BAD_REQUEST = 400;
public static final int UNAUTHORIZED = 401;
public static final int FORBIDDEN = 403;
public static final int NOT_FOUND = 404;
public static final int CONFLICT = 409;
public static final int INTERNAL_SERVER_ERROR = 500;
/**
* Check if status code indicates success (2xx)
*/
public static boolean isSuccess(int statusCode);
/**
* Check if status code indicates client error (4xx)
*/
public static boolean isClientError(int statusCode);
/**
* Check if status code indicates server error (5xx)
*/
public static boolean isServerError(int statusCode);
}HTTP authentication and security components.
/**
* Channel handler for HTTP authentication
*/
public class AuthenticationChannelHandler extends SimpleChannelInboundHandler<HttpRequest> {
public AuthenticationChannelHandler(AuthenticationContext authContext);
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpRequest request) throws Exception;
/**
* Authenticate the HTTP request
*/
protected boolean authenticate(HttpRequest request, ChannelHandlerContext ctx);
/**
* Handle authentication failure
*/
protected void handleAuthFailure(ChannelHandlerContext ctx, String reason);
}Advanced Usage Examples:
import io.cdap.cdap.common.http.*;
import co.cask.http.HttpHandler;
// Custom HTTP handler with body processing
@Path("/api/v1")
public class DataUploadHandler extends AbstractHttpHandler {
@POST
@Path("/upload")
public void uploadData(HttpRequest request, HttpResponder responder) throws Exception {
// Use spillable body consumer for large uploads
SpillableBodyConsumer bodyConsumer = new SpillableBodyConsumer(
new File("/tmp/uploads"),
5 * 1024 * 1024 // 5MB memory limit
);
request.getContent().readBytes(bodyConsumer);
try (InputStream inputStream = bodyConsumer.getInputStream()) {
// Process the uploaded data
processUploadedData(inputStream);
responder.sendStatus(HttpCodes.ACCEPTED);
} finally {
bodyConsumer.close();
}
}
@GET
@Path("/download/{fileId}")
public void downloadFile(@PathParam("fileId") String fileId,
HttpRequest request, HttpResponder responder) throws Exception {
Location fileLocation = fileStore.getLocation(fileId);
if (fileLocation == null || !fileLocation.exists()) {
responder.sendStatus(HttpCodes.NOT_FOUND);
return;
}
// Use location body producer for file downloads
LocationBodyProducer bodyProducer = new LocationBodyProducer(fileLocation);
responder.sendContent(HttpCodes.OK, bodyProducer,
ImmutableMultimap.of("Content-Type", "application/octet-stream"));
}
}
// HTTP client with custom configuration
public class ApiClient {
private final HttpRequestConfig requestConfig;
public ApiClient(int connectTimeoutMs, int readTimeoutMs) {
this.requestConfig = new DefaultHttpRequestConfig(connectTimeoutMs, readTimeoutMs);
}
public String makeRequest(String url) throws IOException {
HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();
connection.setConnectTimeout(requestConfig.getConnectTimeout());
connection.setReadTimeout(requestConfig.getReadTimeout());
int responseCode = connection.getResponseCode();
if (HttpCodes.isSuccess(responseCode)) {
try (InputStream inputStream = connection.getInputStream()) {
return IOUtils.toString(inputStream, StandardCharsets.UTF_8);
}
} else if (HttpCodes.isClientError(responseCode)) {
throw new IOException("Client error: " + responseCode);
} else if (HttpCodes.isServerError(responseCode)) {
throw new IOException("Server error: " + responseCode);
}
throw new IOException("Unexpected response code: " + responseCode);
}
}
// Authenticated HTTP service
@Path("/secure")
public class SecureHandler extends AbstractHttpHandler {
@GET
@Path("/data")
public void getSecureData(HttpRequest request, HttpResponder responder,
@Context AuthenticationContext authContext) throws Exception {
if (authContext.getPrincipal() == null) {
responder.sendStatus(HttpCodes.UNAUTHORIZED);
return;
}
// Process authenticated request
String userData = fetchUserData(authContext.getPrincipal().getName());
responder.sendJson(HttpCodes.OK, userData);
}
}
// Service with authentication
public class AuthenticatedService {
public NettyHttpService createService(AuthenticationContext authContext) {
AuthenticationChannelHandler authHandler =
new AuthenticationChannelHandler(authContext);
return CommonNettyHttpServiceFactory.builder()
.setHost("0.0.0.0")
.setPort(8443)
.addChannelHandler(authHandler)
.addHttpHandlers(Arrays.asList(new SecureHandler()))
.build();
}
}Install with Tessl CLI
npx tessl i tessl/maven-io-cdap-cdap--cdap-common