The Cask Data Application Platform (CDAP) is an integrated, open source application development platform for the Hadoop ecosystem that provides developers with data and application abstractions to simplify and accelerate application development, address a broader range of real-time and batch use cases, and deploy applications into production while satisfying enterprise requirements.

Install with the Tessl CLI:

```
npx @tessl/cli install tessl/maven-io-cdap-cdap--cdap@6.11.0
```

Or add the Maven dependency:
```xml
<dependency>
  <groupId>io.cdap.cdap</groupId>
  <artifactId>cdap-api</artifactId>
  <version>6.11.0</version>
</dependency>
```

CDAP abstracts the complexity of underlying infrastructure while providing developers with powerful tools for building portable, maintainable data applications. From simple MapReduce jobs to complex ETL pipelines and real-time data processing workflows, CDAP enables enterprise-ready development with features like automatic metadata capture, operational control, security integration, and lineage tracking.
```java
// Core application framework
import io.cdap.cdap.api.app.*;
import io.cdap.cdap.api.*;

// Configuration
import io.cdap.cdap.api.Config;

// Common utilities
import io.cdap.cdap.api.common.*;
```

For specific program types:
```java
// MapReduce
import io.cdap.cdap.api.mapreduce.*;

// Spark (Beta)
import io.cdap.cdap.api.spark.*;

// Services
import io.cdap.cdap.api.service.*;
import io.cdap.cdap.api.service.http.*;

// Workers
import io.cdap.cdap.api.worker.*;

// Workflows
import io.cdap.cdap.api.workflow.*;
```

A minimal application with one MapReduce program:

```java
import io.cdap.cdap.api.app.*;
import io.cdap.cdap.api.*;
import io.cdap.cdap.api.dataset.table.Table;
import io.cdap.cdap.api.mapreduce.*;

// 1. Define application configuration
public class MyAppConfig extends Config {
  private String inputDataset = "input";
  private String outputDataset = "output";

  public String getInputDataset() { return inputDataset; }
  public String getOutputDataset() { return outputDataset; }
}

// 2. Create a simple application
public class MyDataApp implements Application<MyAppConfig> {

  @Override
  public void configure(ApplicationConfigurer configurer,
                        ApplicationContext<MyAppConfig> context) {
    MyAppConfig config = context.getConfig();

    // Set application metadata
    configurer.setName("MyDataProcessingApp");
    configurer.setDescription("Processes data from input to output");

    // Add a simple MapReduce program
    configurer.addMapReduce(new MyMapReduce());

    // Create datasets
    configurer.createDataset(config.getInputDataset(), Table.class);
    configurer.createDataset(config.getOutputDataset(), Table.class);
  }
}

// 3. Simple MapReduce program
public class MyMapReduce implements MapReduce {

  @Override
  public void configure(MapReduceConfigurer configurer) {
    configurer.setName("MyMapReduce");
    configurer.setDescription("Simple data processing");
  }
}
```

The CDAP API is organized into logical functional areas:
```
// Core API packages
io.cdap.cdap.api.* // Base interfaces and classes
io.cdap.cdap.api.annotation.* // Annotations for configuration and metadata
// Application Framework
io.cdap.cdap.api.app.* // Application building blocks
io.cdap.cdap.api.service.* // HTTP services and handlers
io.cdap.cdap.api.worker.* // Worker programs
io.cdap.cdap.api.workflow.* // Workflow orchestration
// Data Processing
io.cdap.cdap.api.mapreduce.* // MapReduce integration
io.cdap.cdap.api.spark.* // Spark integration
io.cdap.cdap.api.customaction.* // Custom workflow actions
// Data Management
io.cdap.cdap.api.dataset.* // Dataset abstractions and implementations
io.cdap.cdap.api.messaging.* // Messaging system integration
// Plugin System
io.cdap.cdap.api.plugin.* // Extensibility framework
// Operations & Governance
io.cdap.cdap.api.metrics.* // Metrics collection
io.cdap.cdap.api.metadata.* // Metadata management
io.cdap.cdap.api.lineage.* // Data lineage tracking
io.cdap.cdap.api.security.* // Authentication and authorization
```

CDAP applications follow a component-based architecture where different types of programs work together:
```java
// Application - Root container
public interface Application<T extends Config> {
  void configure(ApplicationConfigurer configurer, ApplicationContext<T> context);

  default boolean isUpdateSupported() {
    return false;
  }

  default ApplicationUpdateResult<T> updateConfig(ApplicationUpdateContext applicationUpdateContext)
      throws Exception {
    throw new UnsupportedOperationException("Application config update operation is not supported.");
  }
}

// Base program types
public enum ProgramType {
  MAPREDUCE, // Batch data processing with Hadoop MapReduce
  SPARK,     // Batch/streaming processing with Apache Spark
  SERVICE,   // HTTP services for real-time data access
  WORKER,    // Long-running background processes
  WORKFLOW   // Orchestration of multiple programs
}

// Resource allocation
public final class Resources {
  public static final int DEFAULT_VIRTUAL_CORES = 1;
  public static final int DEFAULT_MEMORY_MB = 512;

  public Resources() { /* 512MB, 1 core */ }
  public Resources(int memoryMB) { /* specified memory, 1 core */ }
  public Resources(int memoryMB, int cores) { /* specified memory and cores */ }

  public int getMemoryMB() { /* returns allocated memory */ }
  public int getVirtualCores() { /* returns allocated CPU cores */ }
}
```
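Resources come into play when a program is configured. A minimal sketch, assuming the `setDriverResources`/`setMapperResources` setters on `MapReduceConfigurer` and an illustrative job name:

```java
import io.cdap.cdap.api.Resources;
import io.cdap.cdap.api.mapreduce.MapReduce;
import io.cdap.cdap.api.mapreduce.MapReduceConfigurer;

// Hypothetical job; shows Resources being applied at configure time.
public class BigJoinMapReduce implements MapReduce {

  @Override
  public void configure(MapReduceConfigurer configurer) {
    configurer.setName("BigJoin");
    // 1 GB for the driver; 2 GB and 2 cores for each mapper.
    configurer.setDriverResources(new Resources(1024));
    configurer.setMapperResources(new Resources(2048, 2));
  }
}
```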
```java
// Program lifecycle interface
public interface ProgramLifecycle<T extends RuntimeContext> {

  @TransactionPolicy(TransactionControl.IMPLICIT)
  void initialize(T context) throws Exception;

  @TransactionPolicy(TransactionControl.IMPLICIT)
  void destroy();
}

// Program execution states
public enum ProgramStatus {
  INITIALIZING, // Program is starting up
  RUNNING,      // Program is executing
  STOPPING,     // Program is shutting down
  COMPLETED,    // Program finished successfully
  FAILED,       // Program failed with error
  KILLED;       // Program was terminated

  public static final Set<ProgramStatus> TERMINAL_STATES =
      EnumSet.of(COMPLETED, FAILED, KILLED);
}
```
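A worker makes this lifecycle concrete. A minimal sketch, assuming `AbstractWorker` from `io.cdap.cdap.api.worker`; the class name and poll interval are illustrative:

```java
import io.cdap.cdap.api.worker.AbstractWorker;
import io.cdap.cdap.api.worker.WorkerContext;
import java.util.concurrent.TimeUnit;

// Lifecycle in practice: initialize -> run (RUNNING) -> stop -> destroy.
public class HeartbeatWorker extends AbstractWorker {

  private volatile boolean running;

  @Override
  public void initialize(WorkerContext context) throws Exception {
    super.initialize(context);
    running = true;
  }

  @Override
  public void run() {
    while (running) {
      // Do periodic work here, e.g. poll an external system.
      try {
        TimeUnit.SECONDS.sleep(10);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
    }
  }

  @Override
  public void stop() {
    // Called when the program transitions to STOPPING.
    running = false;
  }
}
```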
A fuller application that wires together multiple program types:

```java
import io.cdap.cdap.api.Config;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.app.*;
import io.cdap.cdap.api.dataset.table.Table;

// 1. Define application configuration
public class MyAppConfig extends Config {

  @Description("Input dataset name")
  private String inputDataset = "input";

  @Description("Output dataset name")
  private String outputDataset = "output";

  public String getInputDataset() { return inputDataset; }
  public String getOutputDataset() { return outputDataset; }
}
// 2. Create the application
public class MyDataApp implements Application<MyAppConfig> {

  @Override
  public void configure(ApplicationConfigurer configurer,
                        ApplicationContext<MyAppConfig> context) {
    MyAppConfig config = context.getConfig();

    // Set application metadata
    configurer.setName("MyDataProcessingApp");
    configurer.setDescription("Processes data from input to output");

    // Add programs (MySparkProgram and MyDataService are defined elsewhere;
    // MyProcessingWorkflow is sketched below)
    configurer.addMapReduce(new MyMapReduce());
    configurer.addSpark(new MySparkProgram());
    configurer.addService(new MyDataService());
    configurer.addWorkflow(new MyProcessingWorkflow());

    // Create datasets
    configurer.createDataset(config.getInputDataset(), Table.class);
    configurer.createDataset(config.getOutputDataset(), Table.class);
  }
}
```
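The example above registers a `MyProcessingWorkflow` that is not shown. A minimal sketch of what it might look like, assuming the referenced programs keep the names used earlier:

```java
import io.cdap.cdap.api.workflow.AbstractWorkflow;

// Runs the MapReduce first, then the Spark program, referencing them by name.
public class MyProcessingWorkflow extends AbstractWorkflow {

  @Override
  protected void configure() {
    setName("MyProcessingWorkflow");
    setDescription("Runs batch processing end to end");
    addMapReduce("MyMapReduce");
    addSpark("MySparkProgram");
  }
}
```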
All CDAP programs receive runtime context providing access to system services:

```java
// Base runtime context
public interface RuntimeContext extends FeatureFlagsProvider {
  String getNamespace();
  ApplicationSpecification getApplicationSpecification();
  String getClusterName();
  long getLogicalStartTime();
  Map<String, String> getRuntimeArguments();
  Metrics getMetrics();
}

// Dataset access context
public interface DatasetContext {
  <T extends Dataset> T getDataset(String name) throws DataSetException;
  <T extends Dataset> T getDataset(String namespace, String name) throws DataSetException;
  void releaseDataset(Dataset dataset);
  void discardDataset(Dataset dataset);
}

// Service discovery
public interface ServiceDiscoverer {
  URL getServiceURL(String applicationId, String serviceId);
  URL getServiceURL(String applicationId, String serviceId, String methodPath);
}
```
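A sketch of using this context from inside a program, relying on the `getRuntimeArguments()` and `getMetrics()` methods listed above; the argument and metric names are illustrative:

```java
import io.cdap.cdap.api.mapreduce.AbstractMapReduce;
import io.cdap.cdap.api.mapreduce.MapReduceContext;

// Reads a runtime argument and emits a metric during program startup.
public class ContextAwareMapReduce extends AbstractMapReduce {

  @Override
  public void initialize() throws Exception {
    MapReduceContext context = getContext();
    // Runtime arguments are plain string key/values supplied at start time.
    String mode = context.getRuntimeArguments().getOrDefault("mode", "full");
    // Count how often each mode is used.
    context.getMetrics().count("runs." + mode, 1);
    // The Hadoop job itself would be configured here as well.
  }
}
```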
```java
// Configuration base class
public class Config implements Serializable {
  // Base for all configuration classes
}

// Plugin configuration
public abstract class PluginConfig extends Config {
  // Base for plugin configurations
}

// Plugin registration interface
public interface PluginConfigurer {
  <T> T usePlugin(String pluginType, String pluginName, String pluginId,
                  PluginProperties properties);
  <T> Class<T> usePluginClass(String pluginType, String pluginName, String pluginId,
                              PluginProperties properties);
}
```
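For example, an application can register a plugin at configure time. A sketch assuming a deployed plugin of type `transform` named `uppercase`; the type, name, id, and property are all illustrative:

```java
import io.cdap.cdap.api.plugin.PluginConfigurer;
import io.cdap.cdap.api.plugin.PluginProperties;

public final class PluginRegistration {

  private PluginRegistration() { }

  // Registers a plugin under the id "upper-1" so programs can instantiate it
  // at runtime. The type/name must match a deployed plugin artifact.
  static void registerUppercase(PluginConfigurer configurer) {
    Object plugin = configurer.usePlugin("transform", "uppercase", "upper-1",
        PluginProperties.builder().add("fieldName", "name").build());
    if (plugin == null) {
      throw new IllegalArgumentException("Plugin 'uppercase' not found");
    }
  }
}
```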
```java
// Transactional interface for explicit transaction control
public interface Transactional {
  void execute(TxRunnable runnable) throws TransactionFailureException;
  void execute(int timeoutInSeconds, TxRunnable runnable) throws TransactionFailureException;
}

// Transactional operations
public interface TxRunnable {
  void run(DatasetContext context) throws Exception;
}

public interface TxCallable<V> {
  V call(DatasetContext context) throws Exception;
}

// Utility for transaction operations
public final class Transactionals {
  // Static execute(...) helpers that run a TxRunnable or TxCallable against
  // a Transactional and rethrow failures as unchecked exceptions
}
```
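A sketch of explicit transaction control using `Transactionals` with a lambda as the `TxRunnable`; the `events` dataset name and column are illustrative:

```java
import io.cdap.cdap.api.Transactional;
import io.cdap.cdap.api.Transactionals;
import io.cdap.cdap.api.common.Bytes;
import io.cdap.cdap.api.dataset.table.Table;

public final class TxExamples {

  private TxExamples() { }

  // Writes one cell to a Table inside a single short transaction.
  static void recordEvent(Transactional tx, byte[] row, byte[] payload) {
    Transactionals.execute(tx, context -> {
      Table events = context.getDataset("events");
      events.put(row, Bytes.toBytes("payload"), payload);
    });
  }
}
```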
Key annotations for configuration and behavior control:

```java
// Core annotations
@Description("Provides descriptive text for API elements")
@Name("Specifies custom names for elements")
@Property // Marks fields as configuration properties
@Macro // Enables macro substitution in field values
// Plugin annotations
@Plugin(type = "source") // Marks classes as plugins of specific types
@Category("transform") // Categorizes elements for organization
// Metadata annotations
@Metadata(properties = {@MetadataProperty(key = "author", value = "team")})
```
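Put together, these annotations typically decorate a plugin class and its config. A sketch with illustrative names (the same hypothetical `uppercase` plugin registered earlier):

```java
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.plugin.PluginConfig;

@Plugin(type = "transform")
@Name("uppercase")
@Description("Uppercases a configurable input field")
public class UppercaseTransform {

  private final Conf conf;

  public UppercaseTransform(Conf conf) {
    this.conf = conf;
  }

  public String apply(String value) {
    return value == null ? null : value.toUpperCase();
  }

  // Config fields become plugin properties; @Macro defers resolution of
  // ${...} values to runtime instead of deployment time.
  public static class Conf extends PluginConfig {

    @Macro
    @Description("Name of the field to uppercase")
    private String fieldName;
  }
}
```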
A consolidated import reference:

```java
// Core application framework
import io.cdap.cdap.api.app.*;
import io.cdap.cdap.api.*;
// Data processing
import io.cdap.cdap.api.mapreduce.*;
import io.cdap.cdap.api.spark.*;
// Data access
import io.cdap.cdap.api.dataset.*;
import io.cdap.cdap.api.dataset.lib.*;
// Services
import io.cdap.cdap.api.service.*;
import io.cdap.cdap.api.service.http.*;
// Workflows
import io.cdap.cdap.api.workflow.*;
// Plugin system
import io.cdap.cdap.api.plugin.*;
// Annotations
import io.cdap.cdap.api.annotation.*;
// Utilities
import io.cdap.cdap.api.common.*;
import io.cdap.cdap.api.metrics.*;
```

Program types at a glance:

| Program Type | Purpose | Context Interface | Use Cases |
|---|---|---|---|
| MapReduce | Batch processing | MapReduceContext | ETL, data transformation, aggregation |
| Spark | Batch/Stream processing | SparkClientContext | ML, real-time analytics, complex transformations |
| Service | HTTP endpoints | HttpServiceContext | REST APIs, real-time queries, data serving |
| Worker | Background processing | WorkerContext | Data ingestion, monitoring, housekeeping |
| Workflow | Orchestration | WorkflowContext | Multi-step pipelines, conditional logic |
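As an illustration of the Service row, a minimal HTTP handler sketch; the endpoint path and names are hypothetical:

```java
import io.cdap.cdap.api.service.http.AbstractHttpServiceHandler;
import io.cdap.cdap.api.service.http.HttpServiceRequest;
import io.cdap.cdap.api.service.http.HttpServiceResponder;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;

// Serves GET /keys/{key}; a real handler would look the value up in a
// dataset via getContext().
public class KeyValueHandler extends AbstractHttpServiceHandler {

  @GET
  @Path("keys/{key}")
  public void get(HttpServiceRequest request, HttpServiceResponder responder,
                  @PathParam("key") String key) {
    responder.sendString("value-for-" + key);
  }
}
```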
The CDAP API provides a complete toolkit for enterprise data application development, combining the power of Hadoop ecosystem tools with enterprise-grade operational features.