CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-co-cask-cdap--cdap-api

Core application programming interface for the Cask Data Application Platform enabling development of scalable data processing applications on Hadoop ecosystems.

Pending
Overview
Eval results
Files

docs/service-programs.md

Service Programs

Service programs in CDAP provide long-running services including HTTP APIs, custom protocols, and background processing services with built-in lifecycle management and resource allocation.

Core Service Interfaces

Service

/**
 * Base interface for service programs, providing configuration and lifecycle hooks.
 *
 * <p>The lifecycle hooks come from {@code ProgramLifecycle<ServiceContext>}; this
 * interface itself only adds {@link #configure(ServiceConfigurer)}.
 */
public interface Service extends ProgramLifecycle<ServiceContext> {
    /**
     * Declares this service's configuration through the supplied configurer.
     *
     * @param configurer configurer that receives the service's settings
     */
    void configure(ServiceConfigurer configurer);
}

Base interface for service programs providing configuration and lifecycle hooks.

AbstractService

/**
 * Base implementation class for service programs.
 *
 * <p>Subclasses must implement {@link #configure(ServiceConfigurer)}. The
 * {@code initialize} and {@code destroy} hooks have empty bodies here and may be
 * overridden when setup or cleanup is needed.
 */
public abstract class AbstractService implements Service {
    /**
     * Declares the service's configuration; must be provided by every subclass.
     *
     * @param configurer configurer that receives the service's settings
     */
    public abstract void configure(ServiceConfigurer configurer);
    
    /**
     * Lifecycle hook; does nothing by default.
     *
     * @param context runtime context of the service
     * @throws Exception if an overriding implementation fails to initialize
     */
    @Override
    public void initialize(ServiceContext context) throws Exception {
        // Optional initialization logic
    }
    
    /**
     * Lifecycle hook; does nothing by default.
     */
    @Override
    public void destroy() {
        // Optional cleanup logic
    }
}

Base implementation class for service programs.

BasicService

/**
 * Simple service implementation for HTTP handlers.
 *
 * <p>NOTE(review): this is a signature listing only; constructor and method bodies are
 * not shown here, so the exact behavior must be confirmed against the implementation.
 */
public class BasicService extends AbstractService {
    // Handlers supplied at construction time.
    private final HttpServiceHandler[] handlers;
    
    /**
     * Creates a service from the given handlers.
     *
     * @param handlers HTTP handlers for this service
     */
    public BasicService(HttpServiceHandler... handlers);
    /**
     * Creates a named service from the given handlers.
     *
     * @param name     service name
     * @param handlers HTTP handlers for this service
     */
    public BasicService(String name, HttpServiceHandler... handlers);
    
    /**
     * Configures the service; presumably registers the stored handlers (and name, when
     * given) on the configurer — confirm against the implementation.
     *
     * @param configurer configurer that receives the service's settings
     */
    @Override
    public void configure(ServiceConfigurer configurer);
}

Simple service implementation for HTTP handlers.

HTTP Services

HttpServiceHandler

/**
 * Base interface for HTTP request handlers.
 */
public interface HttpServiceHandler {
    /**
     * Initializes the handler with its runtime context.
     *
     * @param context HTTP service context for this handler
     * @throws Exception if initialization fails
     */
    void initialize(HttpServiceContext context) throws Exception;
    /**
     * Releases anything the handler acquired in {@link #initialize(HttpServiceContext)}.
     */
    void destroy();
}

Base interface for HTTP request handlers.

AbstractHttpServiceHandler

/**
 * Base implementation for HTTP service handlers with annotation-based request mapping.
 *
 * <p>Both lifecycle hooks have empty bodies here; subclasses override them only when
 * they need setup or cleanup. The usage examples map requests to methods with
 * {@code @GET}/{@code @POST}/{@code @PUT}/{@code @DELETE}, {@code @Path} and
 * {@code @PathParam}.
 */
public abstract class AbstractHttpServiceHandler implements HttpServiceHandler {
    /**
     * Lifecycle hook; does nothing by default.
     *
     * @param context HTTP service context for this handler
     * @throws Exception if an overriding implementation fails to initialize
     */
    @Override
    public void initialize(HttpServiceContext context) throws Exception {
        // Optional initialization
    }
    
    /**
     * Lifecycle hook; does nothing by default.
     */
    @Override
    public void destroy() {
        // Optional cleanup
    }
}

Base implementation for HTTP service handlers with annotation-based request mapping.

HTTP Request and Response

/**
 * An incoming HTTP request as seen by a service handler.
 *
 * <p>NOTE(review): signature listing only; method bodies are not shown here.
 */
public class HttpServiceRequest {
    /** @return the HTTP method of the request */
    public String getMethod();
    /** @return the request URI */
    public String getUri();
    /** @return all request headers, each name mapped to its list of values */
    public Map<String, List<String>> getAllHeaders();
    /**
     * @param name header name
     * @return the value of the named header
     */
    public String getHeader(String name);
    /**
     * @return request headers with a single value per name; how multi-valued headers
     *         are collapsed is not shown here — TODO confirm
     */
    public Map<String, String> getHeaders();
    
    /** @return the request body as a byte buffer */
    public ByteBuffer getContent();
    /** @return the request body as a string; the charset used is not shown here */
    public String getContentAsString();
    /**
     * Returns the request body converted to the given type.
     *
     * @param type target type of the conversion
     * @param <T>  type of the returned object
     * @return the converted body; presumably decoded from JSON — TODO confirm
     */
    public <T> T getContent(Type type);
}

/**
 * Sends the HTTP response for a request handled by a service handler.
 *
 * <p>Overloads without a {@code status} parameter do not state which status code they
 * use; presumably 200 — TODO confirm.
 */
public interface HttpServiceResponder {
    /**
     * Sends an object as a JSON response with the given status.
     *
     * @param status HTTP status code
     * @param object object to serialize
     */
    void sendJson(int status, Object object);
    /**
     * Sends an object as a JSON response.
     *
     * @param object object to serialize
     */
    void sendJson(Object object);
    /**
     * Sends a string response with the given status.
     *
     * @param status  HTTP status code
     * @param data    response body
     * @param charset charset used to encode {@code data}
     */
    void sendString(int status, String data, Charset charset);
    /**
     * Sends a string response.
     *
     * @param data response body
     */
    void sendString(String data);
    /**
     * Sends a response consisting only of a status code.
     *
     * @param status HTTP status code
     */
    void sendStatus(int status);
    /**
     * Sends an error response.
     *
     * @param status       HTTP status code
     * @param errorMessage message describing the error
     */
    void sendError(int status, String errorMessage);
    
    /**
     * Sends raw bytes with the given status and content type.
     *
     * @param status      HTTP status code
     * @param content     response body
     * @param contentType value for the response content type
     */
    void send(int status, ByteBuffer content, String contentType);
    /**
     * Sends raw bytes with the given content type.
     *
     * @param content     response body
     * @param contentType value for the response content type
     */
    void send(ByteBuffer content, String contentType);
    
    /**
     * Starts a chunked response.
     *
     * @param status  HTTP status code
     * @param headers response headers
     * @return responder used to send the subsequent chunks
     */
    ChunkResponder sendChunkStart(int status, Map<String, String> headers);
}

HTTP request and response handling interfaces.

Content Handling

/**
 * Consumes a large request body chunk by chunk (streaming upload).
 */
public interface HttpContentConsumer {
    /**
     * Receives one chunk of the request body.
     *
     * @param chunk         bytes of this chunk
     * @param transactional handle for running transactional work; usage is not shown here
     * @throws Exception if the chunk cannot be processed
     */
    void onReceived(ByteBuffer chunk, Transactional transactional) throws Exception;
    /**
     * Called when the body has been received; the implementation sends the response.
     *
     * @param responder responder used to reply to the client
     * @throws Exception if finishing fails
     */
    void onFinish(HttpServiceResponder responder) throws Exception;
    /**
     * Called when receiving the body failed.
     *
     * @param responder    responder used to reply to the client
     * @param failureCause reason for the failure
     */
    void onError(HttpServiceResponder responder, Throwable failureCause);
}

/**
 * Produces a large response body chunk by chunk (streaming download).
 */
public interface HttpContentProducer {
    /**
     * Supplies the next chunk of the response body.
     *
     * @param transactional handle for running transactional work; usage is not shown here
     * @return the next chunk; how end-of-body is signalled is not stated in this listing
     *         — TODO confirm against the API Javadoc
     * @throws Exception if the chunk cannot be produced
     */
    ByteBuffer nextChunk(Transactional transactional) throws Exception;
    /**
     * Called when the body has been sent; a place to release resources.
     *
     * @throws Exception if finishing fails
     */
    void onFinish() throws Exception;
    /**
     * Called when sending the body failed.
     *
     * @param failureCause reason for the failure
     */
    void onError(Throwable failureCause);
}

Interfaces for handling large request/response bodies with streaming support.

Service Configuration

ServiceConfigurer

/**
 * Interface for configuring service programs.
 *
 * <p>Inherits program, dataset and plugin configuration from its three parent
 * interfaces; the usage examples call {@code setName}, {@code setDescription},
 * {@code useDataset} and {@code usePlugin} through it.
 */
public interface ServiceConfigurer extends ProgramConfigurer, DatasetConfigurer, PluginConfigurer {
    /**
     * Adds an HTTP handler to the service.
     *
     * @param handler handler to add
     */
    void addHandler(HttpServiceHandler handler);
    /**
     * Sets the number of instances of the service.
     *
     * @param instances instance count
     */
    void setInstances(int instances);
    /**
     * Sets the resources allocated to the service.
     *
     * @param resources resource requirements
     */
    void setResources(Resources resources);
}

Interface for configuring service programs.

HttpServiceConfigurer

/**
 * Configuration interface for HTTP service handlers.
 */
public interface HttpServiceConfigurer {
    /**
     * @param name name of the handler
     */
    void setName(String name);
    /**
     * @param description description of the handler
     */
    void setDescription(String description);
    /**
     * @param properties string key/value properties for the handler
     */
    void setProperties(Map<String, String> properties);
    /**
     * @param resources resource requirements for the handler
     */
    void setResources(Resources resources);
}

Configuration interface for HTTP service handlers.

Service Context

ServiceContext

/**
 * Runtime context for service programs.
 */
public interface ServiceContext extends RuntimeContext, DatasetContext, ServiceDiscoverer {
    /** @return the number of instances of the service */
    int getInstanceCount();
    /** @return the id of this instance; numbering scheme is not shown here */
    int getInstanceId();
    /** @return runtime arguments as string key/value pairs */
    Map<String, String> getRuntimeArguments();
    
    /** @return context for working with plugins (the examples call {@code newPluginInstance} on it) */
    PluginContext getPluginContext();
    /** @return metrics handle for the service */
    Metrics getMetrics();
    /** @return administrative handle; its operations are not shown here */
    Admin getAdmin();
}

Runtime context for service programs.

HttpServiceContext

/**
 * HTTP-specific service context providing handler metadata.
 *
 * <p>{@code getInstanceCount()} and {@code getInstanceId()} are already declared by
 * {@code ServiceContext}; they are repeated here with identical signatures.
 */
public interface HttpServiceContext extends ServiceContext {
    /** @return the specification of the HTTP handler this context belongs to */
    HttpServiceHandlerSpecification getSpecification();
    /** @return the number of instances of the service */
    int getInstanceCount();
    /** @return the id of this instance */
    int getInstanceId();
}

HTTP-specific service context providing handler metadata.

Usage Examples

Basic HTTP Service

/**
 * Example service exposing a REST API for user management.
 *
 * <p>Registers a single {@code UserHandler}, runs as two instances and declares the
 * "users" dataset.
 */
public class UserService extends AbstractService {

    private static final String SERVICE_NAME = "UserService";
    private static final String SERVICE_DESCRIPTION = "REST API for user management";
    private static final String USERS_DATASET = "users";
    private static final int INSTANCE_COUNT = 2;
    /** Memory value passed to {@code Resources}; 1024 corresponds to 1GB. */
    private static final int MEMORY = 1024;

    /**
     * Declares name, description, handler, instance count, resources and dataset.
     *
     * @param configurer configurer that receives the service's settings
     */
    @Override
    public void configure(ServiceConfigurer configurer) {
        // Identity of the service.
        configurer.setName(SERVICE_NAME);
        configurer.setDescription(SERVICE_DESCRIPTION);

        // Request handling.
        configurer.addHandler(new UserHandler());

        // Sizing.
        configurer.setInstances(INSTANCE_COUNT);
        configurer.setResources(new Resources(MEMORY));

        // Data access.
        configurer.useDataset(USERS_DATASET);
    }
}

/**
 * Example HTTP handler providing create/read/update/delete endpoints for {@code User}
 * records kept in the "users" dataset.
 *
 * <p>Status codes are used consistently across endpoints: 400 for a request body that
 * cannot be turned into a usable {@code User}, 404 for an unknown id, and 500 for
 * failures while reading or writing the store.
 */
public class UserHandler extends AbstractHttpServiceHandler {

    @UseDataSet("users")
    private ObjectStore<User> userStore;

    /**
     * Returns the user with the given id as JSON, or 404 when no such user exists.
     *
     * @param request   incoming request (unused)
     * @param responder responder used to reply to the client
     * @param userId    id taken from the request path
     */
    @GET
    @Path("/users/{id}")
    public void getUser(HttpServiceRequest request, HttpServiceResponder responder,
                       @PathParam("id") String userId) {

        try {
            User user = userStore.read(userId);
            if (user != null) {
                responder.sendJson(user);
            } else {
                responder.sendError(404, "User not found");
            }
        } catch (Exception e) {
            responder.sendError(500, "Internal server error");
        }
    }

    /**
     * Stores the user described by the request body and replies with 201 and the user.
     *
     * <p>A body that cannot be parsed, or that yields no user or no id, is a client
     * error (400). A failure while writing to the store is a server error (500); it
     * was previously reported as 400 as well.
     *
     * @param request   incoming request carrying the user payload
     * @param responder responder used to reply to the client
     */
    @POST
    @Path("/users")
    public void createUser(HttpServiceRequest request, HttpServiceResponder responder) {

        User user = parseUser(request);
        if (user == null || user.getId() == null) {
            responder.sendError(400, "Invalid user data");
            return;
        }

        try {
            userStore.write(user.getId(), user);
            responder.sendJson(201, user);
        } catch (Exception e) {
            responder.sendError(500, "Internal server error");
        }
    }

    /**
     * Replaces the stored user with the given id by the user in the request body.
     *
     * <p>The existence check comes first (404), then the body is validated (400), then
     * the record is written. The id from the path always overrides any id in the body.
     *
     * @param request   incoming request carrying the user payload
     * @param responder responder used to reply to the client
     * @param userId    id taken from the request path
     */
    @PUT
    @Path("/users/{id}")
    public void updateUser(HttpServiceRequest request, HttpServiceResponder responder,
                          @PathParam("id") String userId) {

        try {
            User existingUser = userStore.read(userId);
            if (existingUser == null) {
                responder.sendError(404, "User not found");
                return;
            }

            User updatedUser = parseUser(request);
            if (updatedUser == null) {
                // A missing or malformed body is the client's fault, not a server failure.
                responder.sendError(400, "Invalid user data");
                return;
            }

            updatedUser.setId(userId);
            userStore.write(userId, updatedUser);
            responder.sendJson(updatedUser);
        } catch (Exception e) {
            responder.sendError(500, "Update failed");
        }
    }

    /**
     * Deletes the user with the given id and replies with 204, or 404 when no such
     * user exists.
     *
     * @param request   incoming request (unused)
     * @param responder responder used to reply to the client
     * @param userId    id taken from the request path
     */
    @DELETE
    @Path("/users/{id}")
    public void deleteUser(HttpServiceRequest request, HttpServiceResponder responder,
                          @PathParam("id") String userId) {

        try {
            User user = userStore.read(userId);
            if (user != null) {
                userStore.delete(userId);
                responder.sendStatus(204);
            } else {
                responder.sendError(404, "User not found");
            }
        } catch (Exception e) {
            responder.sendError(500, "Delete failed");
        }
    }

    /**
     * Converts the request body into a {@code User}.
     *
     * @param request incoming request carrying the user payload
     * @return the parsed user, or {@code null} when the body cannot be converted
     */
    private static User parseUser(HttpServiceRequest request) {
        try {
            return request.getContent(User.class);
        } catch (Exception ignored) {
            // Deliberately mapped to null: callers turn an unusable body into a 400.
            return null;
        }
    }
}

Service with Large Content Handling

/**
 * Example HTTP handler that streams files into and out of the "files" file set,
 * so large bodies are never held in memory as a whole.
 *
 * <p>NOTE(review): {@code filename} comes straight from the request path and is passed
 * to {@code fileStorage.getLocation} unchecked. Whether the file set rejects names that
 * escape its base directory (e.g. containing "..") is not shown here — validate the
 * name before using this in production.
 */
public class FileUploadHandler extends AbstractHttpServiceHandler {

    @UseDataSet("files")
    private FileSet fileStorage;

    /**
     * Streams the request body into the file named by the path parameter.
     *
     * @param filename name of the file to create, taken from the request path
     * @return consumer that writes each received chunk to the file
     */
    @POST
    @Path("/upload/{filename}")
    public HttpContentConsumer upload(@PathParam("filename") String filename) {

        return new HttpContentConsumer() {
            // Both are created lazily on the first chunk, so an empty upload creates no file.
            private OutputStream outputStream;
            private Location fileLocation;

            @Override
            public void onReceived(ByteBuffer chunk, Transactional transactional) throws Exception {
                if (outputStream == null) {
                    fileLocation = fileStorage.getLocation(filename);
                    outputStream = fileLocation.getOutputStream();
                }

                // Copy out of the buffer: it is not necessarily array-backed.
                byte[] bytes = new byte[chunk.remaining()];
                chunk.get(bytes);
                outputStream.write(bytes);
            }

            @Override
            public void onFinish(HttpServiceResponder responder) throws Exception {
                if (outputStream != null) {
                    outputStream.close();
                }

                responder.sendJson(200, Collections.singletonMap("status", "uploaded"));
            }

            @Override
            public void onError(HttpServiceResponder responder, Throwable failureCause) {
                try {
                    try {
                        if (outputStream != null) {
                            outputStream.close();
                        }
                    } finally {
                        // Runs even when close() fails, so a partial upload is never left behind.
                        if (fileLocation != null) {
                            fileLocation.delete();
                        }
                    }
                } catch (IOException ignored) {
                    // Best-effort cleanup: the client is told about the original failure below.
                }

                responder.sendError(500, "Upload failed: " + failureCause.getMessage());
            }
        };
    }

    /**
     * Streams the named file back to the client.
     *
     * @param filename  name of the file to send, taken from the request path
     * @param responder responder used to report a missing file
     * @return producer that reads the file in 8 KiB chunks, or {@code null} after a 404
     *         has been sent because the file does not exist
     * @throws IOException if the file's existence cannot be checked or it cannot be opened
     */
    @GET
    @Path("/download/{filename}")
    public HttpContentProducer download(@PathParam("filename") String filename,
                                       HttpServiceResponder responder) throws IOException {

        Location fileLocation = fileStorage.getLocation(filename);
        if (!fileLocation.exists()) {
            responder.sendError(404, "File not found");
            return null;
        }

        return new HttpContentProducer() {
            private final InputStream inputStream = fileLocation.getInputStream();
            // Reused for every chunk; assumes the framework consumes a chunk before
            // requesting the next one — TODO confirm.
            private final byte[] buffer = new byte[8192];

            @Override
            public ByteBuffer nextChunk(Transactional transactional) throws Exception {
                int bytesRead = inputStream.read(buffer);
                if (bytesRead == -1) {
                    // End of file is signalled with an empty buffer rather than null,
                    // so the caller never has to dereference a null chunk.
                    return ByteBuffer.allocate(0);
                }

                return ByteBuffer.wrap(buffer, 0, bytesRead);
            }

            @Override
            public void onFinish() throws Exception {
                inputStream.close();
            }

            @Override
            public void onError(Throwable failureCause) {
                try {
                    inputStream.close();
                } catch (IOException ignored) {
                    // Best-effort cleanup: nothing more can be done for a failed download.
                }
            }
        };
    }
}

Service with Plugin Integration

public class ProcessingService extends AbstractService {
    
    @Override
    public void configure(ServiceConfigurer configurer) {
        configurer.setName("ProcessingService");
        configurer.addHandler(new ProcessingHandler());
        
        // Use transformation plugin
        configurer.usePlugin("transform", "dataTransform", "transformer",
                            PluginProperties.builder()
                                .add("operation", "normalize")
                                .build());
    }
}

/**
 * Example HTTP handler that reads a record from "inputData", runs it through the
 * "transformer" plugin and stores the result in "outputData".
 */
public class ProcessingHandler extends AbstractHttpServiceHandler {

    @UseDataSet("inputData")
    private ObjectStore<DataRecord> inputStore;

    @UseDataSet("outputData")
    private ObjectStore<DataRecord> outputStore;

    // Plugin instance created once in initialize() and reused for every request.
    private DataTransformer transformer;

    /**
     * Runs the parent initialization, then instantiates the "transformer" plugin.
     *
     * @param context HTTP service context for this handler
     * @throws Exception if the parent initialization or plugin creation fails
     */
    @Override
    public void initialize(HttpServiceContext context) throws Exception {
        super.initialize(context);
        this.transformer = context.getPluginContext().newPluginInstance("transformer");
    }

    /**
     * Transforms the stored record with the given id and saves the result under the
     * same id.
     *
     * <p>Replies with 404 when the input record is missing, with a JSON status object
     * on success, and with 500 (including the exception message) on any failure.
     *
     * @param request   incoming request (unused)
     * @param responder responder used to reply to the client
     * @param recordId  id taken from the request path
     */
    @POST
    @Path("/process/{recordId}")
    public void processRecord(HttpServiceRequest request, HttpServiceResponder responder,
                             @PathParam("recordId") String recordId) {

        try {
            final DataRecord source = inputStore.read(recordId);
            if (source != null) {
                outputStore.write(recordId, transformer.transform(source));
                responder.sendJson(Collections.singletonMap("status", "processed"));
            } else {
                responder.sendError(404, "Record not found");
            }
        } catch (Exception failure) {
            responder.sendError(500, "Processing failed: " + failure.getMessage());
        }
    }
}

Multi-Handler Service

/**
 * Example service that bundles several handlers behind one unified API gateway.
 */
public class APIGatewayService extends AbstractService {

    /** Datasets shared by the handlers, in registration order. */
    private static final String[] SHARED_DATASETS = {"users", "products", "orders", "sessions"};

    /**
     * Names the service, adds one handler per API area, sizes it at three instances
     * with {@code Resources(2048, 2)} (2GB, 2 cores) and declares the shared datasets.
     *
     * @param configurer configurer that receives the service's settings
     */
    @Override
    public void configure(ServiceConfigurer configurer) {
        configurer.setName("APIGateway");
        configurer.setDescription("Unified API gateway service");

        // One handler per API area; each is created and registered in turn.
        configurer.addHandler(new UserHandler());
        configurer.addHandler(new ProductHandler());
        configurer.addHandler(new OrderHandler());
        configurer.addHandler(new AuthHandler());

        // Sizing: 2GB, 2 cores per instance.
        configurer.setInstances(3);
        configurer.setResources(new Resources(2048, 2));

        for (String dataset : SHARED_DATASETS) {
            configurer.useDataset(dataset);
        }
    }
}

Service programs provide the foundation for building REST APIs, real-time data processing endpoints, and custom protocol handlers within the CDAP platform.

Install with Tessl CLI

npx tessl i tessl/maven-co-cask-cdap--cdap-api

docs

annotations.md

application-framework.md

dataset-management.md

index.md

mapreduce-programs.md

plugin-framework.md

scheduling.md

service-programs.md

spark-programs.md

system-services.md

transactions.md

worker-programs.md

workflow-programs.md

tile.json