Core application programming interface for the Cask Data Application Platform enabling development of scalable data processing applications on Hadoop ecosystems.
—
Service programs in CDAP provide long-running services including HTTP APIs, custom protocols, and background processing services with built-in lifecycle management and resource allocation.
public interface Service extends ProgramLifecycle<ServiceContext> {
void configure(ServiceConfigurer configurer);
}Base interface for service programs providing configuration and lifecycle hooks.
public abstract class AbstractService implements Service {
public abstract void configure(ServiceConfigurer configurer);
@Override
public void initialize(ServiceContext context) throws Exception {
// Optional initialization logic
}
@Override
public void destroy() {
// Optional cleanup logic
}
}Base implementation class for service programs.
public class BasicService extends AbstractService {
private final HttpServiceHandler[] handlers;
public BasicService(HttpServiceHandler... handlers);
public BasicService(String name, HttpServiceHandler... handlers);
@Override
public void configure(ServiceConfigurer configurer);
}Simple service implementation for HTTP handlers.
public interface HttpServiceHandler {
void initialize(HttpServiceContext context) throws Exception;
void destroy();
}Base interface for HTTP request handlers.
public abstract class AbstractHttpServiceHandler implements HttpServiceHandler {
@Override
public void initialize(HttpServiceContext context) throws Exception {
// Optional initialization
}
@Override
public void destroy() {
// Optional cleanup
}
}Base implementation for HTTP service handlers with annotation-based request mapping.
public class HttpServiceRequest {
public String getMethod();
public String getUri();
public Map<String, List<String>> getAllHeaders();
public String getHeader(String name);
public Map<String, String> getHeaders();
public ByteBuffer getContent();
public String getContentAsString();
public <T> T getContent(Type type);
}
public interface HttpServiceResponder {
void sendJson(int status, Object object);
void sendJson(Object object);
void sendString(int status, String data, Charset charset);
void sendString(String data);
void sendStatus(int status);
void sendError(int status, String errorMessage);
void send(int status, ByteBuffer content, String contentType);
void send(ByteBuffer content, String contentType);
ChunkResponder sendChunkStart(int status, Map<String, String> headers);
}HTTP request and response handling interfaces.
public interface HttpContentConsumer {
void onReceived(ByteBuffer chunk, Transactional transactional) throws Exception;
void onFinish(HttpServiceResponder responder) throws Exception;
void onError(HttpServiceResponder responder, Throwable failureCause);
}
public interface HttpContentProducer {
ByteBuffer nextChunk(Transactional transactional) throws Exception;
void onFinish() throws Exception;
void onError(Throwable failureCause);
}Interfaces for handling large request/response bodies with streaming support.
public interface ServiceConfigurer extends ProgramConfigurer, DatasetConfigurer, PluginConfigurer {
void addHandler(HttpServiceHandler handler);
void setInstances(int instances);
void setResources(Resources resources);
}Interface for configuring service programs.
public interface HttpServiceConfigurer {
void setName(String name);
void setDescription(String description);
void setProperties(Map<String, String> properties);
void setResources(Resources resources);
}Configuration interface for HTTP service handlers.
public interface ServiceContext extends RuntimeContext, DatasetContext, ServiceDiscoverer {
int getInstanceCount();
int getInstanceId();
Map<String, String> getRuntimeArguments();
PluginContext getPluginContext();
Metrics getMetrics();
Admin getAdmin();
}Runtime context for service programs.
public interface HttpServiceContext extends ServiceContext {
HttpServiceHandlerSpecification getSpecification();
int getInstanceCount();
int getInstanceId();
}HTTP-specific service context providing handler metadata.
public class UserService extends AbstractService {
@Override
public void configure(ServiceConfigurer configurer) {
configurer.setName("UserService");
configurer.setDescription("REST API for user management");
configurer.addHandler(new UserHandler());
configurer.setInstances(2);
configurer.setResources(new Resources(1024)); // 1GB memory
configurer.useDataset("users");
}
}
public class UserHandler extends AbstractHttpServiceHandler {
@UseDataSet("users")
private ObjectStore<User> userStore;
@GET
@Path("/users/{id}")
public void getUser(HttpServiceRequest request, HttpServiceResponder responder,
@PathParam("id") String userId) {
try {
User user = userStore.read(userId);
if (user != null) {
responder.sendJson(user);
} else {
responder.sendError(404, "User not found");
}
} catch (Exception e) {
responder.sendError(500, "Internal server error");
}
}
@POST
@Path("/users")
public void createUser(HttpServiceRequest request, HttpServiceResponder responder) {
try {
User user = request.getContent(User.class);
userStore.write(user.getId(), user);
responder.sendJson(201, user);
} catch (Exception e) {
responder.sendError(400, "Invalid user data");
}
}
@PUT
@Path("/users/{id}")
public void updateUser(HttpServiceRequest request, HttpServiceResponder responder,
@PathParam("id") String userId) {
try {
User existingUser = userStore.read(userId);
if (existingUser == null) {
responder.sendError(404, "User not found");
return;
}
User updatedUser = request.getContent(User.class);
updatedUser.setId(userId);
userStore.write(userId, updatedUser);
responder.sendJson(updatedUser);
} catch (Exception e) {
responder.sendError(500, "Update failed");
}
}
@DELETE
@Path("/users/{id}")
public void deleteUser(HttpServiceRequest request, HttpServiceResponder responder,
@PathParam("id") String userId) {
try {
User user = userStore.read(userId);
if (user != null) {
userStore.delete(userId);
responder.sendStatus(204);
} else {
responder.sendError(404, "User not found");
}
} catch (Exception e) {
responder.sendError(500, "Delete failed");
}
}
}public class FileUploadHandler extends AbstractHttpServiceHandler {
@UseDataSet("files")
private FileSet fileStorage;
@POST
@Path("/upload/{filename}")
public HttpContentConsumer upload(@PathParam("filename") String filename) {
return new HttpContentConsumer() {
private OutputStream outputStream;
private Location fileLocation;
@Override
public void onReceived(ByteBuffer chunk, Transactional transactional) throws Exception {
if (outputStream == null) {
fileLocation = fileStorage.getLocation(filename);
outputStream = fileLocation.getOutputStream();
}
byte[] bytes = new byte[chunk.remaining()];
chunk.get(bytes);
outputStream.write(bytes);
}
@Override
public void onFinish(HttpServiceResponder responder) throws Exception {
if (outputStream != null) {
outputStream.close();
}
responder.sendJson(200, Collections.singletonMap("status", "uploaded"));
}
@Override
public void onError(HttpServiceResponder responder, Throwable failureCause) {
try {
if (outputStream != null) {
outputStream.close();
}
if (fileLocation != null) {
fileLocation.delete();
}
} catch (IOException e) {
// Log error
}
responder.sendError(500, "Upload failed: " + failureCause.getMessage());
}
};
}
@GET
@Path("/download/{filename}")
public HttpContentProducer download(@PathParam("filename") String filename,
HttpServiceResponder responder) throws IOException {
Location fileLocation = fileStorage.getLocation(filename);
if (!fileLocation.exists()) {
responder.sendError(404, "File not found");
return null;
}
return new HttpContentProducer() {
private InputStream inputStream = fileLocation.getInputStream();
private final byte[] buffer = new byte[8192];
@Override
public ByteBuffer nextChunk(Transactional transactional) throws Exception {
int bytesRead = inputStream.read(buffer);
if (bytesRead == -1) {
return null; // End of file
}
return ByteBuffer.wrap(buffer, 0, bytesRead);
}
@Override
public void onFinish() throws Exception {
inputStream.close();
}
@Override
public void onError(Throwable failureCause) {
try {
inputStream.close();
} catch (IOException e) {
// Log error
}
}
};
}
}public class ProcessingService extends AbstractService {
@Override
public void configure(ServiceConfigurer configurer) {
configurer.setName("ProcessingService");
configurer.addHandler(new ProcessingHandler());
// Use transformation plugin
configurer.usePlugin("transform", "dataTransform", "transformer",
PluginProperties.builder()
.add("operation", "normalize")
.build());
}
}
public class ProcessingHandler extends AbstractHttpServiceHandler {
@UseDataSet("inputData")
private ObjectStore<DataRecord> inputStore;
@UseDataSet("outputData")
private ObjectStore<DataRecord> outputStore;
private DataTransformer transformer;
@Override
public void initialize(HttpServiceContext context) throws Exception {
super.initialize(context);
transformer = context.getPluginContext().newPluginInstance("transformer");
}
@POST
@Path("/process/{recordId}")
public void processRecord(HttpServiceRequest request, HttpServiceResponder responder,
@PathParam("recordId") String recordId) {
try {
DataRecord input = inputStore.read(recordId);
if (input == null) {
responder.sendError(404, "Record not found");
return;
}
DataRecord processed = transformer.transform(input);
outputStore.write(recordId, processed);
responder.sendJson(Collections.singletonMap("status", "processed"));
} catch (Exception e) {
responder.sendError(500, "Processing failed: " + e.getMessage());
}
}
}public class APIGatewayService extends AbstractService {
@Override
public void configure(ServiceConfigurer configurer) {
configurer.setName("APIGateway");
configurer.setDescription("Unified API gateway service");
// Add multiple handlers for different API areas
configurer.addHandler(new UserHandler());
configurer.addHandler(new ProductHandler());
configurer.addHandler(new OrderHandler());
configurer.addHandler(new AuthHandler());
configurer.setInstances(3);
configurer.setResources(new Resources(2048, 2)); // 2GB, 2 cores
// Use common datasets
configurer.useDataset("users");
configurer.useDataset("products");
configurer.useDataset("orders");
configurer.useDataset("sessions");
}
}Service programs provide the foundation for building REST APIs, real-time data processing endpoints, and custom protocol handlers within the CDAP platform.
Install with Tessl CLI
npx tessl i tessl/maven-co-cask-cdap--cdap-api