The Cask Data Application Platform (CDAP) is an integrated, open source application development platform for the Hadoop ecosystem that provides developers with data and application abstractions to simplify and accelerate application development.
—
The CDAP Application Framework provides the core building blocks for creating enterprise data applications. It enables developers to compose applications from different types of programs including services, workflows, workers, MapReduce, and Spark programs.
Applications are the top-level container for all programs and resources in CDAP:
import io.cdap.cdap.api.app.*;
import io.cdap.cdap.api.*;
/**
 * Base interface for a CDAP application, parameterized by its configuration type.
 *
 * @param <T> the application configuration type
 */
public interface Application<T extends Config> {
  /** Declares the application's programs, datasets, and schedules at deployment time. */
  void configure(ApplicationConfigurer configurer, ApplicationContext<T> context);
  /** Whether this application supports {@link #updateConfig}; defaults to {@code false}. */
  default boolean isUpdateSupported() {
    return false;
  }
  /**
   * Produces an updated configuration during an application upgrade.
   *
   * @throws UnsupportedOperationException by default — override together with
   *     {@link #isUpdateSupported()} to enable config updates
   */
  default ApplicationUpdateResult<T> updateConfig(ApplicationUpdateContext applicationUpdateContext)
  throws Exception {
    throw new UnsupportedOperationException("Application config update operation is not supported.");
  }
}
// Abstract base implementation
/**
 * Convenience base class for {@link Application} implementations.
 *
 * <p>The previous overrides of {@code isUpdateSupported()} and {@code updateConfig(...)}
 * were removed: they merely duplicated the interface defaults, and the message-less
 * {@code UnsupportedOperationException} thrown by the old {@code updateConfig} override
 * discarded the descriptive message the interface default provides.
 */
public abstract class AbstractApplication<T extends Config>
    extends AbstractPluginConfigurable<ApplicationConfigurer>
    implements Application<T> {

  /** Override in a subclass to configure the application. */
  @Override
  public void configure(ApplicationConfigurer configurer,
                        ApplicationContext<T> context) {
    // Override in subclass to configure application
  }
}
// Application configurer interface
/**
 * Collects the application's declared programs, schedules, and metadata while
 * {@code Application.configure} runs at deployment time.
 */
public interface ApplicationConfigurer extends PluginConfigurer, DatasetConfigurer, FeatureFlagsProvider {
  /** Sets the application name. */
  void setName(String name);
  /** Sets a human-readable description. */
  void setDescription(String description);
  // Add program types
  void addMapReduce(MapReduce mapReduce);
  void addSpark(Spark spark);
  void addService(Service service);
  void addWorker(Worker worker);
  void addWorkflow(Workflow workflow);
  // Schedule workflows
  /** Starts building a schedule for the named program of the given type. */
  ScheduleBuilder buildSchedule(String scheduleName, ProgramType programType, String programName);
  void schedule(ScheduleCreationSpec scheduleCreationSpec);
  // Additional methods
  /** Emits metadata for the application in the given scope. */
  void emitMetadata(Metadata metadata, MetadataScope scope);
  TriggerFactory getTriggerFactory();
  RuntimeConfigurer getRuntimeConfigurer();
  String getDeployedNamespace();
  ApplicationSpecification getDeployedApplicationSpec();
}
// Application context
/** Gives {@code Application.configure} access to the deployment-time configuration. */
public interface ApplicationContext<T extends Config> {
  /** Returns the configuration supplied when the application was deployed. */
  T getConfig();
}
// Application update support
/** Result of an application config update: the new config plus the action that produced it. */
public class ApplicationUpdateResult<T extends Config> {
  public T getNewConfig() { /* returns updated configuration */ }
  public ApplicationConfigUpdateAction getUpdateAction() { /* returns update action */ }
}
/** Kinds of changes an application update can apply. */
public enum ApplicationConfigUpdateAction {
  UPGRADE_ARTIFACT, // Upgrade to new artifact version
  UPDATE_CONFIG // Update application configuration
}All CDAP programs receive runtime context providing access to system services:
// Base runtime context
/** Runtime information shared by every CDAP program type. */
public interface RuntimeContext extends FeatureFlagsProvider {
  ApplicationSpecification getApplicationSpecification();
  /** Returns the runtime arguments supplied for this run. */
  Map<String, String> getRuntimeArguments();
  String getClusterName();
  String getNamespace();
  /** Returns the unique id of this program run. */
  RunId getRunId();
  /** Returns the admin interface for operational actions. */
  Admin getAdmin();
  DataTracer getDataTracer(String dataTracerName);
}Services provide HTTP endpoints for real-time data access and application interaction.
import io.cdap.cdap.api.service.*;
import io.cdap.cdap.api.service.http.*;

import java.nio.charset.StandardCharsets;
// Service interface
/** A CDAP service program; lifecycle hooks come from {@code ProgramLifecycle}. */
public interface Service extends ProgramLifecycle<ServiceContext> {
  /** Declares the service's name, handlers, instance count, and resources. */
  void configure(ServiceConfigurer configurer);
}
// Abstract service implementation
/** Base service with no-op lifecycle hooks; subclasses implement {@code configure}. */
public abstract class AbstractService implements Service {
  @Override
  public void initialize(ServiceContext context) throws Exception {
    // Initialize service resources
  }
  @Override
  public void destroy() {
    // Cleanup service resources
  }
}
// Basic service with HTTP handlers
/** Example service: two instances of {@code MyHttpHandler}. */
public class BasicService extends AbstractService {
  @Override
  public void configure(ServiceConfigurer configurer) {
    configurer.setName("MyService");
    configurer.setDescription("HTTP service for data access");
    configurer.addHandler(new MyHttpHandler());
    configurer.setInstances(2);
    // Resources(1024, 2) — presumably memory MB and virtual cores; TODO confirm units
    configurer.setResources(new Resources(1024, 2));
  }
}// HTTP service handler interface
/** HTTP handler contract; endpoint methods are declared on implementations via JAX-RS annotations. */
public interface HttpServiceHandler extends ProgramLifecycle<HttpServiceContext> {
  // Lifecycle methods inherited from ProgramLifecycle
}
// Abstract handler implementation
/** Base handler with no-op lifecycle hooks. */
public abstract class AbstractHttpServiceHandler implements HttpServiceHandler {
  @Override
  public void initialize(HttpServiceContext context) throws Exception {
    // Initialize handler
  }
  @Override
  public void destroy() {
    // Cleanup handler
  }
}
// HTTP service context
/** Per-handler runtime context: instance topology plus datasets, plugins, and discovery. */
public interface HttpServiceContext
extends RuntimeContext, DatasetContext, ServiceDiscoverer, PluginContext {
  int getInstanceId();
  int getInstanceCount();
  DiscoveryServiceClient getDiscoveryServiceClient();
}// HTTP request and response interfaces
/** Read-only view of an incoming HTTP request. */
public interface HttpServiceRequest {
  String getMethod();
  String getUri();
  Map<String, List<String>> getAllHeaders();
  String getHeader(String name);
  Map<String, List<String>> getAllParameters();
  String getParameter(String name);
  /** Returns the raw request body bytes. */
  byte[] getContent();
  String getContentType();
  int getContentLength();
}
/** Sends HTTP responses from handler endpoint methods. */
public interface HttpServiceResponder {
  void sendString(int status, String data, String contentType);
  void sendBytes(int status, byte[] data, String contentType);
  /** Serializes the object — presumably as JSON; confirm the serializer used. */
  void sendJson(int status, Object object);
  void sendError(int status, String errorMessage);
  /** Sends an arbitrary body with explicit content type and extra headers. */
  void send(int status, ByteBuffer content, String contentType, Map<String, String> headers);
}
// Content streaming interfaces
/** Produces a response body chunk-by-chunk for streaming downloads. */
public interface HttpContentProducer {
  /** Returns the next chunk; end-of-stream signaling is not shown here — TODO confirm. */
  ByteBuffer nextChunk(TransferContext transferContext) throws Exception;
  void onFinish() throws Exception;
  void onError(Throwable failureCause);
}
/** Consumes a request body chunk-by-chunk for streaming uploads. */
public interface HttpContentConsumer {
  void onReceived(ByteBuffer chunk, TransferContext transferContext) throws Exception;
  void onFinish() throws Exception;
  void onError(Throwable failureCause);
}// Example HTTP service handler
/** Example HTTP handler exposing user records stored in the "users" Table. */
@Path("/data")
public class DataServiceHandler extends AbstractHttpServiceHandler {

  // Injected dataset; presumably rows are keyed by user id — TODO confirm schema
  @UseDataSet("users")
  private Table users;

  /** Returns the JSON stored in the "data" column for the given user id, or 404 if absent. */
  @GET
  @Path("/user/{id}")
  public void getUser(HttpServiceRequest request, HttpServiceResponder responder,
                      @PathParam("id") String userId) {
    try {
      Row row = users.get(Bytes.toBytes(userId));
      if (row.isEmpty()) {
        responder.sendError(404, "User not found");
      } else {
        String userData = row.getString("data");
        responder.sendString(200, userData, "application/json");
      }
    } catch (Exception e) {
      // NOTE(review): echoing e.getMessage() may leak internals to clients — consider logging instead
      responder.sendError(500, "Internal error: " + e.getMessage());
    }
  }

  /** Creates a user from a JSON body that contains at least an "id" field. */
  @POST
  @Path("/user")
  public void createUser(HttpServiceRequest request, HttpServiceResponder responder) {
    try {
      // Decode the body as UTF-8 directly; StandardCharsets avoids the runtime charset lookup
      // that Charset.forName("UTF-8") performed.
      String content = new String(request.getContent(), StandardCharsets.UTF_8);
      JsonObject user = new JsonParser().parse(content).getAsJsonObject();
      String userId = user.get("id").getAsString();
      users.put(Bytes.toBytes(userId), "data", content);
      responder.sendString(201, "User created", "text/plain");
    } catch (Exception e) {
      responder.sendError(400, "Invalid request: " + e.getMessage());
    }
  }
}
Workers are long-running background programs for continuous data processing, monitoring, or housekeeping tasks.
import io.cdap.cdap.api.worker.*;
// Worker interface
/** Long-running background program: {@code run()} executes until {@code stop()} or interruption. */
public interface Worker extends ProgramLifecycle<WorkerContext> {
  void configure(WorkerConfigurer configurer);
  /** Main loop; invoked once after {@code initialize}. */
  void run() throws Exception;
  /** Requests graceful shutdown — presumably invoked from a different thread than run(); the volatile flags in the examples suggest so. */
  void stop();
}
// Abstract worker implementation
/**
 * Base worker with no-op lifecycle hooks.
 *
 * <p>The redundant {@code ProgramLifecycle<WorkerContext>} was dropped from the
 * implements clause: {@link Worker} already extends it, so listing it again
 * declared nothing new.
 */
public abstract class AbstractWorker
    extends AbstractPluginConfigurable<WorkerConfigurer>
    implements Worker {

  @Override
  public void initialize(WorkerContext context) throws Exception {
    // Initialize worker resources
  }

  /** Main loop — subclasses must implement. */
  @Override
  public abstract void run() throws Exception;

  @Override
  public void stop() {
    // Graceful shutdown logic
  }

  @Override
  public void destroy() {
    // Cleanup resources
  }
}
// Worker context
/** Runtime context for a worker, including its instance topology. */
public interface WorkerContext
extends RuntimeContext, DatasetContext, ServiceDiscoverer, PluginContext {
  WorkerSpecification getSpecification();
  int getInstanceId();
  int getInstanceCount();
}// Worker configurer interface
/** Declares a worker's name, description, instance count, and resources at configure time. */
public interface WorkerConfigurer extends ProgramConfigurer, DatasetConfigurer, PluginConfigurer {
  void setName(String name);
  void setDescription(String description);
  void setInstances(int instances);
  void setResources(Resources resources);
}
// Worker specification
/** Immutable record of a worker's configured instances and resource allocation. */
public class WorkerSpecification extends AbstractProgramSpecification {
  public int getInstances() { /* returns number of instances */ }
  public Resources getResources() { /* returns resource allocation */ }
}// Example data ingestion worker
/**
 * Example worker that continuously pulls records from an external source into the
 * "ingested_data" Table, sleeping 5s between batches and backing off 10s on errors.
 */
public class DataIngestionWorker extends AbstractWorker {

  /** Cooperative shutdown flag, cleared by {@link #stop()}. */
  private volatile boolean running;

  @Override
  public void configure(WorkerConfigurer configurer) {
    configurer.setName("DataIngestionWorker");
    configurer.setDescription("Continuously ingests data from external source");
    configurer.setInstances(3);
    configurer.setResources(new Resources(512, 1));
  }

  @Override
  public void run() throws Exception {
    running = true;
    while (running) {
      try {
        // Get context and datasets
        WorkerContext context = getContext();
        Table outputTable = context.getDataset("ingested_data");
        // Ingest data (example)
        List<DataRecord> records = fetchDataFromSource();
        for (DataRecord record : records) {
          outputTable.put(
              Bytes.toBytes(record.getId()),
              "data", record.getData(),
              "timestamp", System.currentTimeMillis()
          );
        }
        // Sleep before next iteration
        Thread.sleep(5000);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break;
      } catch (Exception e) {
        LOG.error("Error in data ingestion", e);
        // Back off before retrying. The original slept unguarded here, letting an
        // InterruptedException escape run(); restore the interrupt and exit instead.
        try {
          Thread.sleep(10000);
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          break;
        }
      }
    }
  }

  @Override
  public void stop() {
    running = false;
  }

  /** Fetches the next batch from the external source (stubbed in this example). */
  private List<DataRecord> fetchDataFromSource() {
    // Implementation for fetching data
    return new ArrayList<>();
  }
}
// Example monitoring worker
/**
 * Example worker that collects system and application metrics once per minute.
 *
 * <p>Adds the cooperative {@code running} flag and {@code stop()} override the original
 * lacked (its sibling {@code DataIngestionWorker} has both), so the worker can be
 * shut down gracefully; interruption during the sleep now exits the loop with the
 * interrupt status restored instead of escaping {@code run()} unhandled.
 */
public class MetricsCollectionWorker extends AbstractWorker {

  /** Cooperative shutdown flag, cleared by {@link #stop()}. */
  private volatile boolean running;

  @Override
  public void configure(WorkerConfigurer configurer) {
    configurer.setName("MetricsCollector");
    configurer.setDescription("Collects and aggregates application metrics");
  }

  @Override
  public void run() throws Exception {
    running = true;
    WorkerContext context = getContext();
    Metrics metrics = context.getMetrics();
    while (running && context.getState().equals(ProgramRunStatus.RUNNING)) {
      // Collect custom metrics
      collectSystemMetrics(metrics);
      collectApplicationMetrics(context, metrics);
      try {
        Thread.sleep(60000); // Collect every minute
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break;
      }
    }
  }

  @Override
  public void stop() {
    running = false;
  }

  /** Emits system-level metrics (memory, CPU). */
  private void collectSystemMetrics(Metrics metrics) {
    // Emit system-level metrics
    metrics.gauge("system.memory.used", getUsedMemory());
    metrics.gauge("system.cpu.usage", getCpuUsage());
  }

  /** Emits application-specific metrics derived from datasets. */
  private void collectApplicationMetrics(WorkerContext context, Metrics metrics) {
    // Collect application-specific metrics
    Table userTable = context.getDataset("users");
    long userCount = countTableRows(userTable);
    metrics.gauge("app.users.count", userCount);
  }
}
Workflows orchestrate the execution of multiple programs in a defined sequence, with support for conditional logic, parallel execution, and data passing.
import io.cdap.cdap.api.workflow.*;
// Workflow interface
/** A workflow program; its structure is declared entirely in {@code configure}. */
public interface Workflow {
  void configure(WorkflowConfigurer configurer);
}
// Abstract workflow implementation
/** Base workflow class; subclasses must declare their node graph in {@code configure}. */
public abstract class AbstractWorkflow
extends AbstractPluginConfigurable<WorkflowConfigurer>
implements Workflow {
  @Override
  public abstract void configure(WorkflowConfigurer configurer);
}
// Workflow context
/** Runtime context for workflow execution, including the shared token and node states. */
public interface WorkflowContext
extends RuntimeContext, DatasetContext, ServiceDiscoverer, PluginContext {
  /** Returns the token used to pass data between workflow nodes. */
  WorkflowToken getToken();
  WorkflowInfo getWorkflowInfo();
  WorkflowNodeState getNodeState(String nodeId);
}// Workflow configurer interface
/** Builds a workflow's node graph: sequential programs, forks, and conditions. */
public interface WorkflowConfigurer
extends ProgramConfigurer, DatasetConfigurer, PluginConfigurer {
  // Add program execution nodes
  void addMapReduce(String mapReduce);
  void addSpark(String spark);
  void addAction(WorkflowAction action);
  // Control flow constructs
  /** Starts a parallel-execution fork; close it with {@code join()}. */
  WorkflowForkConfigurer fork();
  /** Starts a conditional branch; close it with {@code end()}. */
  WorkflowConditionConfigurer condition(Predicate<WorkflowContext> predicate);
  // Resource allocation
  void setDriverResources(Resources resources);
}
// Fork configurer for parallel execution
/** Fluent builder for the branches of a fork; {@code join()} returns to the parent. */
public interface WorkflowForkConfigurer {
  WorkflowForkConfigurer addMapReduce(String mapReduce);
  WorkflowForkConfigurer addSpark(String spark);
  WorkflowForkConfigurer addAction(WorkflowAction action);
  WorkflowForkConfigurer fork();
  WorkflowForkConfigurer condition(Predicate<WorkflowContext> predicate);
  /** Closes this fork and resumes configuring the enclosing scope. */
  WorkflowConfigurer join();
}
// Condition configurer for conditional execution
/** Fluent builder for a condition's if/else branches; {@code end()} returns to the parent. */
public interface WorkflowConditionConfigurer {
  WorkflowConditionConfigurer addMapReduce(String mapReduce);
  WorkflowConditionConfigurer addSpark(String spark);
  WorkflowConditionConfigurer addAction(WorkflowAction action);
  WorkflowConditionConfigurer fork();
  WorkflowConditionConfigurer condition(Predicate<WorkflowContext> predicate);
  /** Switches from the if-branch to the else-branch. */
  WorkflowConditionConfigurer otherwise();
  /** Closes this condition and resumes configuring the enclosing scope. */
  WorkflowConfigurer end();
}Workflows use tokens to pass data between nodes:
// Workflow token interface
/** Key/value store shared across workflow nodes, readable per-node and per-scope. */
public interface WorkflowToken {
  void put(String key, String value);
  void put(String key, String value, WorkflowToken.Scope scope);
  /** Returns the most recent value for the key, or presumably null if absent — confirm. */
  Value get(String key);
  /** Returns the value the named node wrote for the key. */
  Value get(String key, String nodeName);
  Value get(String key, WorkflowToken.Scope scope);
  Map<String, Value> getAll();
  Map<String, Value> getAll(WorkflowToken.Scope scope);
  /** Returns key → (node → value) for every node that wrote token data. */
  Map<String, Map<String, Value>> getAllFromNodes();
  // Token scopes
  enum Scope {
    SYSTEM, // System-wide token data
    USER // User-defined token data
  }
}
// Token value container
/** Typed accessor wrapper around a token value stored as a string. */
public class Value {
  public String toString() { /* returns string representation */ }
  public long getAsLong() { /* returns as long value */ }
  public double getAsDouble() { /* returns as double value */ }
  public boolean getAsBoolean() { /* returns as boolean value */ }
}
// Node value for specific workflow nodes
/** Pairs a token value with the workflow node that produced it. */
public class NodeValue {
  public String getNodeName() { /* returns node name */ }
  public Value getValue() { /* returns node value */ }
}// Workflow node types
/** The kinds of nodes a workflow graph can contain. */
public enum WorkflowNodeType {
  ACTION, // Custom action node
  MAPREDUCE, // MapReduce program node
  SPARK, // Spark program node
  FORK, // Parallel execution fork
  JOIN, // Fork join point
  CONDITION // Conditional execution node
}
// Workflow node interface
/** A single node in the workflow graph, identified by id and type. */
public interface WorkflowNode {
  String getNodeId();
  WorkflowNodeType getType();
}
// Specific node implementations
/** Node wrapping a custom action's specification. */
public class WorkflowActionNode implements WorkflowNode {
  public WorkflowActionSpecification getProgram() { /* returns action spec */ }
}
/** Fork node: each inner list is one parallel branch of nodes. */
public class WorkflowForkNode implements WorkflowNode {
  public List<List<WorkflowNode>> getBranches() { /* returns fork branches */ }
}
/** Condition node: predicate plus the node lists for the if- and else-branches. */
public class WorkflowConditionNode implements WorkflowNode {
  public List<WorkflowNode> getIfBranch() { /* returns if branch */ }
  public List<WorkflowNode> getElseBranch() { /* returns else branch */ }
  public Predicate<WorkflowContext> getPredicate() { /* returns condition */ }
}
// Node status and state
/** Execution status of a single workflow node. */
public enum NodeStatus {
  STARTING, // Node is initializing
  RUNNING, // Node is executing
  COMPLETED, // Node completed successfully
  FAILED, // Node failed with error
  KILLED // Node was terminated
}
/** Snapshot of a node's current status and, when failed, its failure cause. */
public class WorkflowNodeState {
  public String getNodeId() { /* returns node ID */ }
  public NodeStatus getNodeStatus() { /* returns current status */ }
  public String getFailureCause() { /* returns failure reason if failed */ }
}Create custom workflow actions for specialized processing:
// Custom action interface
/** User-defined workflow action executed as a node between programs. */
public interface CustomAction extends ProgramLifecycle<CustomActionContext> {
  void configure(CustomActionConfigurer configurer);
  /** Executes the action's work; invoked once when the node runs. */
  void run(CustomActionContext context) throws Exception;
}
// Abstract custom action
/** Base custom action with no-op lifecycle hooks; subclasses implement {@code run}. */
public abstract class AbstractCustomAction implements CustomAction {
  @Override
  public void initialize(CustomActionContext context) throws Exception {
    // Initialize action resources
  }
  @Override
  public abstract void run(CustomActionContext context) throws Exception;
  @Override
  public void destroy() {
    // Cleanup action resources
  }
}
// Custom action context
/** Runtime context for a custom action, including the shared workflow token. */
public interface CustomActionContext
extends RuntimeContext, DatasetContext, ServiceDiscoverer, PluginContext {
  WorkflowToken getWorkflowToken();
  CustomActionSpecification getSpecification();
}// Example data processing workflow
/**
 * Example workflow: validate → clean, then either transform+aggregate (good data) or
 * repair (bad data), then a fork running model training in parallel with
 * report-generation + notification, and finally cleanup.
 */
public class DataProcessingWorkflow extends AbstractWorkflow {
  @Override
  public void configure(WorkflowConfigurer configurer) {
    configurer.setName("DataProcessingWorkflow");
    configurer.setDescription("Complete data processing pipeline");
    // Sequential execution
    configurer.addAction(new DataValidationAction());
    configurer.addMapReduce("DataCleaningMapReduce");
    // Conditional processing: branch on the predicate evaluated at runtime
    configurer.condition(new DataQualityCondition())
    .addSpark("DataTransformationSpark")
    .addMapReduce("DataAggregationMapReduce")
    .otherwise()
    .addAction(new DataRepairAction())
    .end();
    // Parallel processing: the inner fork() separates the two branches;
    // the first join() closes the fork, the second appears to close the outer scope
    configurer.fork()
    .addSpark("ModelTrainingSpark")
    .fork()
    .addMapReduce("ReportGeneration")
    .addAction(new NotificationAction())
    .join()
    .join();
    configurer.addAction(new CleanupAction());
  }
}
// Example condition implementation
/**
 * Gates the main processing branch on data quality: true when the validation stage
 * recorded an error rate below 5%; false otherwise, including when no error-rate
 * token exists (which routes the workflow to the repair path).
 */
public class DataQualityCondition implements Predicate<WorkflowContext> {
  @Override
  public boolean apply(WorkflowContext context) {
    Value recordedRate = context.getToken().get("data.error_rate");
    if (recordedRate == null) {
      return false; // No validation data — default to the repair path
    }
    // Proceed only if fewer than 5% of records failed validation
    return recordedRate.getAsDouble() < 0.05;
  }
}
// Example custom action
/**
 * Scans the "raw_data" Table, counts valid/invalid rows, and publishes the totals and
 * error rate into the workflow token for the downstream quality condition.
 */
public class DataValidationAction extends AbstractCustomAction {

  @Override
  public void configure(CustomActionConfigurer configurer) {
    configurer.setName("DataValidation");
    configurer.setDescription("Validates input data quality");
  }

  @Override
  public void run(CustomActionContext context) throws Exception {
    Table inputData = context.getDataset("raw_data");
    WorkflowToken token = context.getWorkflowToken();
    // Perform data validation
    long totalRecords = 0;
    long errorRecords = 0;
    Scanner scanner = inputData.scan(null, null);
    try {
      Row row;
      while ((row = scanner.next()) != null) {
        totalRecords++;
        if (!isValidRecord(row)) {
          errorRecords++;
        }
      }
    } finally {
      scanner.close();
    }
    // Store results in workflow token. Guard the empty-table case: the original
    // divided 0/0 here, storing NaN in the token and emitting NaN as a metric.
    double errorRate = totalRecords == 0 ? 0.0 : (double) errorRecords / totalRecords;
    token.put("data.total_records", String.valueOf(totalRecords));
    token.put("data.error_records", String.valueOf(errorRecords));
    token.put("data.error_rate", String.valueOf(errorRate));
    // NOTE(review): gauge() may expect a long; confirm passing a double compiles against the Metrics API
    context.getMetrics().gauge("validation.error_rate", errorRate);
  }

  /** A record is valid when both its "id" and "data" columns are present. */
  private boolean isValidRecord(Row row) {
    // Implement validation logic
    return row.get("id") != null && row.get("data") != null;
  }
}
Applications can persist state across runs using the App State Store:
// Application state store interface
/** Persists small keyed byte-array state that survives application restarts and updates. */
public interface AppStateStore {
  void saveState(String key, byte[] value) throws IOException;
  /** Returns the stored bytes for the key — presumably null when absent; confirm. */
  byte[] getState(String key) throws IOException;
  void deleteState(String key) throws IOException;
}
// Usage in application context
/**
 * Example application relying on the App State Store for cross-run state.
 *
 * <p>Parameterized with {@code Config} instead of using the raw
 * {@code AbstractApplication} type, so {@code context.getConfig()} is typed and no
 * unchecked-conversion warning is produced.
 */
public class StatefulApplication extends AbstractApplication<Config> {
  @Override
  public void configure(ApplicationConfigurer configurer, ApplicationContext<Config> context) {
    // Application can access state store through admin interface
    // State persists across application updates and restarts
  }
}
The Application Framework provides the foundation for building complex, multi-component data applications with enterprise-grade operational features including service discovery, resource management, state persistence, and orchestration capabilities.
Install with Tessl CLI
npx tessl i tessl/maven-io-cdap-cdap--cdap