The Cask Data Application Platform (CDAP) is an integrated, open source application development platform for the Hadoop ecosystem that provides developers with data and application abstractions to simplify and accelerate application development.
—
CDAP provides comprehensive support for batch and streaming data processing through native integration with Apache Hadoop MapReduce and Apache Spark. These processing frameworks enable scalable data transformation, analytics, and machine learning workflows.
CDAP's MapReduce integration provides a familiar programming model with enhanced features for dataset access, metrics collection, and operational control.
import io.cdap.cdap.api.mapreduce.*;
import org.apache.hadoop.mapreduce.*;
/**
 * A MapReduce program in CDAP. Implementations describe their static
 * configuration in {@link #configure(MapReduceConfigurer)} and inherit the
 * runtime lifecycle hooks from {@code ProgramLifecycle}, parameterized with
 * {@link MapReduceContext}.
 */
public interface MapReduce extends ProgramLifecycle<MapReduceContext> {
// Called once at deployment time to register name, description, resources, etc.
void configure(MapReduceConfigurer configurer);
}
/**
 * Convenience base class for MapReduce programs. Extends
 * {@code AbstractPluginConfigurable} so subclasses can register plugins during
 * configure(), and provides no-op lifecycle methods to override as needed.
 */
public abstract class AbstractMapReduce
extends AbstractPluginConfigurable<MapReduceConfigurer>
implements MapReduce {
@Override
public void initialize(MapReduceContext context) throws Exception {
// Invoked before job submission: set inputs/outputs on the context and
// configure the underlying Hadoop Job here.
}
@Override
public void destroy() {
// Invoked after the job finishes; override to clean up MapReduce resources.
}
}
/**
 * Deploy-time configurer for MapReduce programs. Combines dataset, program and
 * plugin configuration, and adds per-role container resource settings.
 */
public interface MapReduceConfigurer
extends DatasetConfigurer, ProgramConfigurer, PluginConfigurer {
// Memory/vcore requirements for the driver container.
void setDriverResources(Resources resources);
// Memory/vcore requirements for each mapper task.
void setMapperResources(Resources resources);
// Memory/vcore requirements for each reducer task.
void setReducerResources(Resources resources);
}
// MapReduce runtime context
/**
 * Runtime context passed to MapReduce lifecycle methods. Gives access to the
 * underlying Hadoop Job, input/output wiring, workflow token writes and
 * task resource localization.
 */
public interface MapReduceContext
extends SchedulableProgramContext, RuntimeContext, DatasetContext,
ServiceDiscoverer, PluginContext, LineageRecorder {
// Hadoop Job configuration
Job getHadoopJob() throws IOException;
// set* presumably replaces any previously configured input/output, while
// add* accumulates multiple inputs/outputs — confirm against CDAP API docs.
void setInput(Input input);
void addInput(Input input);
void setOutput(Output output);
void addOutput(Output output);
// Workflow integration: writes a key/value pair (e.g. into the workflow token)
void write(String key, Object value);
// Resource access
Map<String, LocalizeResource> getResourcesToLocalize();
// Make the file at the given URI available to tasks under the given name;
// the boolean overload marks the resource as an archive.
void localize(String name, URI uri);
void localize(String name, URI uri, boolean archive);
}
/**
 * Per-task context available to mappers and reducers, typed by the task's
 * output key/value types. Exposes the Hadoop task-attempt context plus CDAP
 * services (datasets, plugins, service discovery).
 */
public interface MapReduceTaskContext<KEYOUT, VALUEOUT>
extends SchedulableProgramContext, TaskLocalizationContext, RuntimeContext,
DatasetContext, ServiceDiscoverer, PluginContext {
// Hadoop context access
TaskAttemptContext getHadoopContext();
// Metrics and progress reporting to the framework.
void progress();
void setStatus(String msg);
Counter getCounter(String groupName, String counterName);
Counter getCounter(Enum<?> counterName);
}
// Basic MapReduce program
/**
 * MapReduce program that counts occurrences of each whitespace-delimited,
 * lower-cased word in the "input_text" dataset and writes per-word totals to
 * the "word_counts" dataset.
 */
public class WordCountMapReduce extends AbstractMapReduce {

  @Override
  public void configure(MapReduceConfigurer configurer) {
    configurer.setName("WordCountMapReduce");
    configurer.setDescription("Counts word occurrences in input data");
    // Container sizing: 512 MB driver, 1024 MB per map/reduce task, 1 vcore each.
    configurer.setDriverResources(new Resources(512, 1));
    configurer.setMapperResources(new Resources(1024, 1));
    configurer.setReducerResources(new Resources(1024, 1));
  }

  @Override
  public void initialize(MapReduceContext context) throws Exception {
    // Wire the CDAP datasets in and out of the job.
    context.setInput(Input.ofDataset("input_text"));
    context.setOutput(Output.ofDataset("word_counts"));

    // Configure the underlying Hadoop job. The reducer doubles as the
    // combiner: summing partial counts is associative and commutative.
    Job job = context.getHadoopJob();
    job.setMapperClass(WordMapper.class);
    job.setCombinerClass(WordReducer.class);
    job.setReducerClass(WordReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
  }

  /** Emits (word, 1) for every non-empty whitespace-delimited token. */
  public static class WordMapper extends Mapper<byte[], String, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text wordOut = new Text();

    @Override
    protected void map(byte[] key, String value, Context context)
        throws IOException, InterruptedException {
      for (String token : value.toLowerCase().split("\\s+")) {
        if (token.isEmpty()) {
          continue; // leading whitespace yields an empty first token
        }
        wordOut.set(token);
        context.write(wordOut, ONE);
      }
    }
  }

  /** Sums the partial counts for each word; also used as the combiner. */
  public static class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable totalOut = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int total = 0;
      for (IntWritable count : values) {
        total += count.get();
      }
      totalOut.set(total);
      context.write(key, totalOut);
    }
  }
}
// Advanced MapReduce with dataset operations
/**
 * MapReduce program that scans user events and profiles, aggregates each
 * user's activity (per-event-type counts, first/last activity timestamps,
 * session span), and writes the results as table Puts.
 */
public class UserAnalyticsMapReduce extends AbstractMapReduce {

  @Override
  public void configure(MapReduceConfigurer configurer) {
    configurer.setName("UserAnalyticsMapReduce");
    configurer.setDescription("Analyzes user behavior patterns");
  }

  @Override
  public void initialize(MapReduceContext context) throws Exception {
    // Multiple input datasets feed the same mapper.
    context.addInput(Input.ofDataset("user_events"));
    context.addInput(Input.ofDataset("user_profiles"));
    // Multiple output datasets receive the reducer's writes.
    context.addOutput(Output.ofDataset("user_stats"));
    context.addOutput(Output.ofDataset("behavior_patterns"));
    // Custom partitioner spreads users evenly across the 10 reduce tasks.
    Job job = context.getHadoopJob();
    job.setMapperClass(UserAnalyticsMapper.class);
    job.setReducerClass(UserAnalyticsReducer.class);
    job.setPartitionerClass(UserPartitioner.class);
    job.setNumReduceTasks(10);
  }

  /** Emits one (userId, UserActivity) pair per input row. */
  public static class UserAnalyticsMapper
      extends Mapper<byte[], Row, Text, UserActivity> {
    @Override
    protected void map(byte[] key, Row row, Context context)
        throws IOException, InterruptedException {
      String user = row.getString("user_id");
      UserActivity activity =
          new UserActivity(row.getString("event_type"), row.getLong("timestamp"));
      context.write(new Text(user), activity);
    }
  }

  /** Folds one user's activities into a single stats row. */
  public static class UserAnalyticsReducer
      extends Reducer<Text, UserActivity, byte[], Put> {
    @Override
    protected void reduce(Text userId, Iterable<UserActivity> activities,
        Context context) throws IOException, InterruptedException {
      Map<String, Integer> countsByType = new HashMap<>();
      long earliest = Long.MAX_VALUE;
      long latest = Long.MIN_VALUE;
      for (UserActivity activity : activities) {
        countsByType.merge(activity.getEventType(), 1, Integer::sum);
        long ts = activity.getTimestamp();
        earliest = Math.min(earliest, ts);
        latest = Math.max(latest, ts);
      }
      // One Put per user: time-range stats plus per-event-type counts.
      byte[] rowKey = Bytes.toBytes(userId.toString());
      Put put = new Put(rowKey);
      put.add("stats", "first_activity", earliest);
      put.add("stats", "last_activity", latest);
      put.add("stats", "session_duration", latest - earliest);
      for (Map.Entry<String, Integer> countEntry : countsByType.entrySet()) {
        put.add("events", countEntry.getKey(), countEntry.getValue());
      }
      context.write(rowKey, put);
    }
  }
}
// Input configuration for MapReduce
/**
 * Factory for MapReduce input configurations. Method bodies are elided in
 * this documentation excerpt.
 */
public class Input {
// Read the named CDAP dataset with default arguments.
public static Input ofDataset(String datasetName) { /* dataset input */ }
// Read the named dataset, passing runtime arguments to it.
public static Input ofDataset(String datasetName, Map<String, String> arguments) { /* dataset with args */ }
// Read only the given split of the dataset.
public static Input ofDataset(String datasetName, Split split) { /* dataset with split */ }
// Batch input configuration
public static BatchSource.Builder<?, ?, ?> batch() { /* batch input builder */ }
}
// Output configuration for MapReduce
/**
 * Factory for MapReduce output configurations. Method bodies are elided in
 * this documentation excerpt.
 */
public class Output {
// Write to the named CDAP dataset with default arguments.
public static Output ofDataset(String datasetName) { /* dataset output */ }
// Write to the named dataset, passing runtime arguments to it.
public static Output ofDataset(String datasetName, Map<String, String> arguments) { /* dataset with args */ }
// Batch output configuration
public static BatchSink.Builder<?, ?, ?> batch() { /* batch output builder */ }
}
// Split specification for parallel processing
/**
 * Describes one unit of parallel input processing.
 */
public interface Split {
// Size of the split — presumably in bytes; confirm units against implementations.
long getLength();
// Locations holding the split's data — NOTE(review): presumably hostnames
// used for scheduling locality; verify against the CDAP API docs.
List<String> getLocations();
}
CDAP provides native Apache Spark integration supporting both batch and streaming processing with enhanced dataset access and operational features.
import io.cdap.cdap.api.spark.*;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
/**
 * A Spark program in CDAP. configure() registers static metadata at deploy
 * time; lifecycle methods receive a {@link SparkClientContext} at runtime.
 */
public interface Spark extends ProgramLifecycle<SparkClientContext> {
// Called once at deployment time to register name, resources, Spark conf, etc.
void configure(SparkConfigurer configurer);
}
/**
 * Convenience base class for Spark programs: plugin-aware configuration plus
 * no-op initialize/destroy hooks for subclasses to override as needed.
 */
public abstract class AbstractSpark
extends AbstractPluginConfigurable<SparkConfigurer>
implements Spark {
@Override
public void initialize(SparkClientContext context) throws Exception {
// Invoked before job submission; override to set up Spark resources.
}
@Override
public void destroy() {
// Invoked after the job completes; override to clean up Spark resources.
}
}
/**
 * Deploy-time configurer for Spark programs: resource sizing for the driver,
 * executors and client, plus arbitrary Spark configuration entries.
 */
public interface SparkConfigurer
extends ProgramConfigurer, DatasetConfigurer, PluginConfigurer {
// Fully-qualified name of the main class to run.
void setMainClassName(String className);
void setDriverResources(Resources resources);
void setExecutorResources(Resources resources);
void setNumExecutors(int numExecutors);
void setExecutorCores(int cores);
void setClientResources(Resources resources);
// Spark configuration: set the whole conf map, or add one key/value entry.
void setSparkConf(Map<String, String> sparkConf);
void addSparkConf(String key, String value);
}
// Spark client context
/**
 * Runtime context for Spark programs: access to the SparkSession, RDD-level
 * bridging to CDAP datasets, and resource localization.
 */
public interface SparkClientContext
extends SchedulableProgramContext, RuntimeContext, DatasetContext,
ServiceDiscoverer, PluginContext, LineageRecorder {
// Spark session access
SparkSession getSparkSession();
JavaSparkContext getOriginalSparkContext();
// Dataset integration: expose a CDAP dataset as a (key, value) pair RDD,
// optionally with runtime arguments or explicit splits.
<K, V> JavaPairRDD<K, V> fromDataset(String datasetName);
<K, V> JavaPairRDD<K, V> fromDataset(String datasetName, Map<String, String> arguments);
<K, V> JavaPairRDD<K, V> fromDataset(String datasetName, Split... splits);
// Persist a pair RDD into a CDAP dataset.
void saveAsDataset(JavaPairRDD<?, ?> rdd, String datasetName);
void saveAsDataset(JavaPairRDD<?, ?> rdd, String datasetName, Map<String, String> arguments);
// Localization
Map<String, LocalizeResource> getResourcesToLocalize();
void localize(String name, URI uri);
void localize(String name, URI uri, boolean archive);
}
// Basic Spark program
/**
 * Batch Spark program: reads the "raw_data" dataset, cleans and enriches it
 * with Spark SQL, and writes the result to the "transformed_data" dataset.
 */
public class DataTransformationSpark extends AbstractSpark {
@Override
public void configure(SparkConfigurer configurer) {
configurer.setName("DataTransformationSpark");
configurer.setDescription("Transforms raw data using Spark");
configurer.setMainClassName(DataTransformationSpark.class.getName());
// Resource configuration: 1024 MB / 2-core driver, four 2048 MB / 2-core executors.
configurer.setDriverResources(new Resources(1024, 2));
configurer.setExecutorResources(new Resources(2048, 2));
configurer.setNumExecutors(4);
// Spark configuration
configurer.addSparkConf("spark.sql.adaptive.enabled", "true");
configurer.addSparkConf("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
}
@Override
public void run(SparkClientContext context) throws Exception {
SparkSession spark = context.getSparkSession();
// Read data from CDAP dataset as (row key, Row) pairs.
JavaPairRDD<byte[], Row> inputRDD = context.fromDataset("raw_data");
// Transform to DataFrame for SQL operations.
// NOTE(review): assumes every source Row has id/name/timestamp/value columns
// matching the schema below — confirm against the dataset's actual schema.
Dataset<Row> inputDF = spark.createDataFrame(
inputRDD.map(tuple -> {
Row row = tuple._2();
return RowFactory.create(
row.getString("id"),
row.getString("name"),
row.getLong("timestamp"),
row.getDouble("value")
);
}).rdd(),
DataTypes.createStructType(Arrays.asList(
DataTypes.createStructField("id", DataTypes.StringType, false),
DataTypes.createStructField("name", DataTypes.StringType, false),
DataTypes.createStructField("timestamp", DataTypes.LongType, false),
DataTypes.createStructField("value", DataTypes.DoubleType, false)
))
);
// Register temporary view for SQL
inputDF.createOrReplaceTempView("raw_data");
// Transformations: derive a date from the timestamp (treated as epoch millis),
// apply a 10% adjustment rounded to 2 decimals, and bucket values into
// high/medium/low categories; null and non-positive values are filtered out.
Dataset<Row> transformedDF = spark.sql(
"SELECT " +
" id, " +
" name, " +
" DATE(FROM_UNIXTIME(timestamp/1000)) as date, " +
" ROUND(value * 1.1, 2) as adjusted_value, " +
" CASE " +
" WHEN value > 100 THEN 'high' " +
" WHEN value > 50 THEN 'medium' " +
" ELSE 'low' " +
" END as category " +
"FROM raw_data " +
"WHERE value IS NOT NULL AND value > 0"
);
// Convert back to (key, Put) pairs for dataset output, keyed by id.
JavaPairRDD<byte[], Put> outputRDD = transformedDF.javaRDD().mapToPair(row -> {
String id = row.getAs("id");
byte[] key = Bytes.toBytes(id);
Put put = new Put(key);
put.add("data", "name", row.getAs("name"));
put.add("data", "date", row.getAs("date").toString());
put.add("data", "adjusted_value", row.getAs("adjusted_value"));
put.add("data", "category", row.getAs("category"));
return new Tuple2<>(key, put);
});
// Save to CDAP dataset
context.saveAsDataset(outputRDD, "transformed_data");
}
}
// Advanced Spark program with streaming
/**
 * Structured Streaming program: consumes JSON events from a Kafka topic,
 * computes 5-minute windowed per-user/per-event-type aggregates, and writes
 * each micro-batch into the "real_time_analytics" dataset.
 */
public class RealTimeAnalyticsSpark extends AbstractSpark {
@Override
public void configure(SparkConfigurer configurer) {
configurer.setName("RealTimeAnalyticsSpark");
configurer.setDescription("Real-time analytics using Spark Streaming");
configurer.setMainClassName(RealTimeAnalyticsSpark.class.getName());
// Streaming configuration
// NOTE(review): the local /tmp checkpoint path here and the bootstrap servers
// below are hardcoded — externalize via runtime arguments before production use.
configurer.addSparkConf("spark.streaming.stopGracefullyOnShutdown", "true");
configurer.addSparkConf("spark.sql.streaming.checkpointLocation", "/tmp/spark-checkpoint");
}
@Override
public void run(SparkClientContext context) throws Exception {
SparkSession spark = context.getSparkSession();
// Read streaming data (example with Kafka integration)
Dataset<Row> streamingDF = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events")
.load();
// Parse the Kafka value payload as JSON into typed event columns; events
// more than 10 minutes late by event-time are dropped by the watermark.
Dataset<Row> eventsDF = streamingDF
.select(from_json(col("value").cast("string"), getEventSchema()).as("event"))
.select("event.*")
.withWatermark("timestamp", "10 minutes");
// Windowed aggregations: count, average and max duration per
// (5-minute window, event type, user).
Dataset<Row> aggregatedDF = eventsDF
.groupBy(
window(col("timestamp"), "5 minutes"),
col("event_type"),
col("user_id")
)
.agg(
count("*").as("event_count"),
avg("duration").as("avg_duration"),
max("duration").as("max_duration")
);
// Write results via foreachBatch; "update" mode re-emits only rows whose
// aggregates changed in the current micro-batch.
StreamingQuery query = aggregatedDF
.writeStream()
.outputMode("update")
.foreachBatch((Dataset<Row> batchDF, Long batchId) -> {
// Convert DataFrame to RDD for dataset output
JavaPairRDD<byte[], Put> outputRDD = batchDF.javaRDD().mapToPair(row -> {
String key = String.format("%s_%s_%d",
row.getAs("user_id"),
row.getAs("event_type"),
row.getStruct(0).getLong(0) // window start
// NOTE(review): the window struct's start field is a timestamp type,
// so getLong(0) may not yield epoch millis as intended — verify.
);
Put put = new Put(Bytes.toBytes(key));
put.add("stats", "event_count", row.getAs("event_count"));
put.add("stats", "avg_duration", row.getAs("avg_duration"));
put.add("stats", "max_duration", row.getAs("max_duration"));
put.add("stats", "batch_id", batchId);
return new Tuple2<>(Bytes.toBytes(key), put);
});
context.saveAsDataset(outputRDD, "real_time_analytics");
})
.start();
// Block until the streaming query terminates (or fails).
query.awaitTermination();
}
// Schema of a single event as produced to the Kafka "events" topic.
private StructType getEventSchema() {
return DataTypes.createStructType(Arrays.asList(
DataTypes.createStructField("user_id", DataTypes.StringType, false),
DataTypes.createStructField("event_type", DataTypes.StringType, false),
DataTypes.createStructField("timestamp", DataTypes.TimestampType, false),
DataTypes.createStructField("duration", DataTypes.DoubleType, true)
));
}
}
// Using Spark SQL with CDAP datasets
/**
 * Spark SQL program that joins the users, orders and products datasets to
 * produce per-customer analytics and customer segmentation outputs.
 *
 * Bug fix: the segmentation query selects FROM the temp view
 * "customer_analytics", but the original never registered that view (only
 * users/orders/products were registered), so the second spark.sql() call
 * failed with a table-not-found error. The analytics DataFrame is now
 * registered as a temp view before it is queried.
 */
public class SparkSQLAnalytics extends AbstractSpark {
  @Override
  public void run(SparkClientContext context) throws Exception {
    SparkSession spark = context.getSparkSession();
    // Load multiple datasets as DataFrames.
    Dataset<Row> usersDF = loadDatasetAsDF(context, spark, "users", getUserSchema());
    Dataset<Row> ordersDF = loadDatasetAsDF(context, spark, "orders", getOrderSchema());
    Dataset<Row> productsDF = loadDatasetAsDF(context, spark, "products", getProductSchema());
    // Register as temporary views for SQL access.
    usersDF.createOrReplaceTempView("users");
    ordersDF.createOrReplaceTempView("orders");
    productsDF.createOrReplaceTempView("products");
    // Per-customer order-history rollup: order counts, spend, categories
    // purchased, and recency of the last order.
    Dataset<Row> customerAnalytics = spark.sql(
        "SELECT " +
        " u.user_id, " +
        " u.name, " +
        " COUNT(o.order_id) as total_orders, " +
        " SUM(o.total_amount) as total_spent, " +
        " AVG(o.total_amount) as avg_order_value, " +
        " COLLECT_LIST(DISTINCT p.category) as purchased_categories, " +
        " DATEDIFF(CURRENT_DATE(), MAX(o.order_date)) as days_since_last_order " +
        "FROM users u " +
        "LEFT JOIN orders o ON u.user_id = o.user_id " +
        "LEFT JOIN products p ON o.product_id = p.product_id " +
        "GROUP BY u.user_id, u.name " +
        "HAVING total_orders > 0 " +
        "ORDER BY total_spent DESC"
    );
    // FIX: register the rollup so the segmentation query below can read it.
    customerAnalytics.createOrReplaceTempView("customer_analytics");
    // Save results.
    saveDataFrameAsDataset(context, customerAnalytics, "customer_analytics");
    // Derive a spend-based segment and a recency-based status per customer.
    Dataset<Row> customerSegments = spark.sql(
        "SELECT " +
        " user_id, " +
        " name, " +
        " total_spent, " +
        " CASE " +
        " WHEN total_spent > 1000 THEN 'VIP' " +
        " WHEN total_spent > 500 THEN 'Premium' " +
        " WHEN total_spent > 100 THEN 'Regular' " +
        " ELSE 'Basic' " +
        " END as segment, " +
        " CASE " +
        " WHEN days_since_last_order <= 30 THEN 'Active' " +
        " WHEN days_since_last_order <= 90 THEN 'At Risk' " +
        " ELSE 'Churned' " +
        " END as status " +
        "FROM customer_analytics"
    );
    customerSegments.createOrReplaceTempView("customer_segments");
    saveDataFrameAsDataset(context, customerSegments, "customer_segments");
  }

  /** Loads a CDAP dataset as a DataFrame with the given schema (values only). */
  private Dataset<Row> loadDatasetAsDF(SparkClientContext context, SparkSession spark,
      String datasetName, StructType schema) {
    JavaPairRDD<byte[], Row> rdd = context.fromDataset(datasetName);
    return spark.createDataFrame(rdd.map(Tuple2::_2).rdd(), schema);
  }

  /**
   * Writes a DataFrame to a CDAP dataset, keyed by the user_id column, storing
   * every non-null column as a string under the "data" family.
   */
  private void saveDataFrameAsDataset(SparkClientContext context, Dataset<Row> df,
      String datasetName) {
    JavaPairRDD<byte[], Put> outputRDD = df.javaRDD().mapToPair(row -> {
      String key = row.getAs("user_id").toString();
      Put put = new Put(Bytes.toBytes(key));
      // Add all columns to the Put
      for (String fieldName : row.schema().fieldNames()) {
        Object value = row.getAs(fieldName);
        if (value != null) {
          put.add("data", fieldName, value.toString());
        }
      }
      return new Tuple2<>(Bytes.toBytes(key), put);
    });
    context.saveAsDataset(outputRDD, datasetName);
  }
}
Custom actions can be used in workflows for specialized data processing tasks:
import io.cdap.cdap.api.customaction.*;
// Custom data processing action
/**
 * Workflow action that validates the "raw_data" dataset, publishes quality
 * statistics through the workflow token and metrics system, and aborts the
 * workflow when any quality threshold is exceeded.
 */
public class DataQualityCheckAction extends AbstractCustomAction {

  @Override
  public void configure(CustomActionConfigurer configurer) {
    configurer.setName("DataQualityCheck");
    configurer.setDescription("Validates data quality and sets processing flags");
  }

  @Override
  public void run(CustomActionContext context) throws Exception {
    Table inputData = context.getDataset("raw_data");
    WorkflowToken token = context.getWorkflowToken();
    Metrics metrics = context.getMetrics();

    // Run the validation once and cache the rates for reuse below.
    DataQualityResults results = performQualityChecks(inputData);
    double errorRate = results.getErrorRate();
    double nullRate = results.getNullRate();
    double duplicateRate = results.getDuplicateRate();

    // Publish the raw figures for downstream workflow nodes.
    token.put("dq.total_records", String.valueOf(results.getTotalRecords()));
    token.put("dq.valid_records", String.valueOf(results.getValidRecords()));
    token.put("dq.error_rate", String.valueOf(errorRate));
    token.put("dq.null_rate", String.valueOf(nullRate));
    token.put("dq.duplicate_rate", String.valueOf(duplicateRate));

    // Mirror the same figures as metrics for dashboards and alerting.
    metrics.gauge("data_quality.error_rate", errorRate);
    metrics.gauge("data_quality.null_rate", nullRate);
    metrics.gauge("data_quality.duplicate_rate", duplicateRate);
    metrics.count("data_quality.total_records", results.getTotalRecords());

    // Gate downstream processing on the quality thresholds.
    boolean canProceed = errorRate < 0.05 && nullRate < 0.1 && duplicateRate < 0.02;
    token.put("dq.can_proceed", String.valueOf(canProceed));
    if (!canProceed) {
      // Optionally fail the action to stop workflow
      throw new RuntimeException(
          String.format("Data quality check failed: error_rate=%.2f%%, null_rate=%.2f%%, duplicate_rate=%.2f%%",
              errorRate * 100,
              nullRate * 100,
              duplicateRate * 100)
      );
    }
  }

  /** Runs the actual validation over the dataset; placeholder implementation. */
  private DataQualityResults performQualityChecks(Table inputData) {
    // Implementation of data quality validation logic
    return new DataQualityResults();
  }
}
// Data processing coordination action
/**
 * Workflow action that reads the data-quality figures published by the
 * upstream quality-check action from the workflow token and selects a
 * processing strategy for downstream nodes.
 *
 * Fix: the original dereferenced token.get(...) without a null check, so a
 * missing key failed with a bare NullPointerException; it now fails fast
 * with a message naming the absent key.
 */
public class ProcessingCoordinatorAction extends AbstractCustomAction {
  @Override
  public void run(CustomActionContext context) throws Exception {
    WorkflowToken token = context.getWorkflowToken();
    // Read processing parameters published by the upstream quality check.
    long totalRecords = Long.parseLong(requireTokenValue(token, "dq.total_records"));
    double errorRate = Double.parseDouble(requireTokenValue(token, "dq.error_rate"));
    // Determine optimal processing strategy
    ProcessingStrategy strategy = determineStrategy(totalRecords, errorRate);
    // Configure downstream processing
    token.put("processing.strategy", strategy.name());
    token.put("processing.batch_size", String.valueOf(strategy.getBatchSize()));
    token.put("processing.parallel_level", String.valueOf(strategy.getParallelism()));
    context.getMetrics().gauge("processing.batch_size", strategy.getBatchSize());
  }

  /** Returns the token value for the given key, failing fast when it is absent. */
  private static String requireTokenValue(WorkflowToken token, String key) {
    Object value = token.get(key);
    if (value == null) {
      throw new IllegalStateException(
          "Required workflow token key is missing: " + key
              + " (was the data quality check action run upstream?)");
    }
    return value.toString();
  }

  /**
   * Picks a batch strategy purely by record volume.
   * NOTE(review): errorRate is currently unused here — kept for signature
   * stability; confirm whether it should influence the choice.
   */
  private ProcessingStrategy determineStrategy(long totalRecords, double errorRate) {
    if (totalRecords > 10_000_000) {
      return ProcessingStrategy.LARGE_BATCH;
    } else if (totalRecords > 1_000_000) {
      return ProcessingStrategy.MEDIUM_BATCH;
    } else {
      return ProcessingStrategy.SMALL_BATCH;
    }
  }
}
The CDAP data processing framework provides enterprise-grade capabilities for both batch and streaming data processing, with seamless integration between MapReduce, Spark, and CDAP's dataset and operational features.
Install with Tessl CLI
npx tessl i tessl/maven-io-cdap-cdap--cdap