CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-io-cdap-cdap--cdap

The Cask Data Application Platform (CDAP) is an integrated, open source application development platform for the Hadoop ecosystem that provides developers with data and application abstractions to simplify and accelerate application development.

Pending
Overview
Eval results
Files

docs/data-processing.md

Data Processing

CDAP provides comprehensive support for batch and streaming data processing through native integration with Apache MapReduce and Apache Spark. These processing frameworks enable scalable data transformation, analytics, and machine learning workflows.

MapReduce Integration

CDAP's MapReduce integration provides a familiar programming model with enhanced features for dataset access, metrics collection, and operational control.

MapReduce Program Definition

import java.util.Locale;

import io.cdap.cdap.api.mapreduce.*;
import org.apache.hadoop.mapreduce.*;

// MapReduce program interface. A CDAP MapReduce program declares its static
// configuration via configure(...) and inherits the initialize/destroy
// lifecycle hooks from ProgramLifecycle<MapReduceContext>.
public interface MapReduce extends ProgramLifecycle<MapReduceContext> {
    // Called once at deployment time to register name, description, datasets,
    // and resource requirements with the given configurer.
    void configure(MapReduceConfigurer configurer);
}

// Abstract MapReduce implementation: convenience base class with no-op
// lifecycle methods so concrete programs override only what they need.
public abstract class AbstractMapReduce 
    extends AbstractPluginConfigurable<MapReduceConfigurer>
    implements MapReduce {
    
    // Invoked at run start, before the Hadoop job is submitted; no-op here.
    @Override
    public void initialize(MapReduceContext context) throws Exception {
        // Initialize MapReduce resources
        // Configure Hadoop Job here
    }
    
    // Invoked after the run finishes; release resources here. No-op default.
    @Override  
    public void destroy() {
        // Cleanup MapReduce resources
    }
}

// MapReduce configurer interface, used at deployment time. The setters
// declare container resource requirements (presumably memory/cores — the
// Resources type is not shown here; confirm against its definition) for the
// driver, mapper, and reducer containers respectively.
public interface MapReduceConfigurer 
    extends DatasetConfigurer, ProgramConfigurer, PluginConfigurer {
    
    void setDriverResources(Resources resources);
    void setMapperResources(Resources resources);
    void setReducerResources(Resources resources);
}

MapReduce Context

// MapReduce runtime context, passed to initialize()/destroy(). Combines
// dataset access, service discovery, plugin loading, and lineage recording.
public interface MapReduceContext 
    extends SchedulableProgramContext, RuntimeContext, DatasetContext, 
            ServiceDiscoverer, PluginContext, LineageRecorder {
    
    // Hadoop Job configuration: direct access to the underlying Job, plus
    // CDAP-level input/output wiring (set* replaces, add* appends).
    Job getHadoopJob() throws IOException;
    void setInput(Input input);
    void addInput(Input input);
    void setOutput(Output output);
    void addOutput(Output output);
    
    // Workflow integration — presumably writes to the workflow token so
    // downstream workflow nodes can read the value; confirm in CDAP docs.
    void write(String key, Object value);
    
    // Resource localization: files/archives to ship to task containers.
    Map<String, LocalizeResource> getResourcesToLocalize();
    void localize(String name, URI uri);
    void localize(String name, URI uri, boolean archive);
}

// Task-level context available inside individual mapper and reducer tasks.
// KEYOUT/VALUEOUT are the task's output key and value types.
public interface MapReduceTaskContext<KEYOUT, VALUEOUT> 
    extends SchedulableProgramContext, TaskLocalizationContext, RuntimeContext,
            DatasetContext, ServiceDiscoverer, PluginContext {
    
    // Access to the raw Hadoop task attempt context.
    TaskAttemptContext getHadoopContext();
    
    // Progress reporting (keeps long tasks from being killed as hung),
    // status messages, and Hadoop counter access.
    void progress();
    void setStatus(String msg);
    Counter getCounter(String groupName, String counterName);
    Counter getCounter(Enum<?> counterName);
}

MapReduce Implementation Examples

// Basic MapReduce program: classic word count over a text dataset.
public class WordCountMapReduce extends AbstractMapReduce {
    
    @Override
    public void configure(MapReduceConfigurer configurer) {
        configurer.setName("WordCountMapReduce");
        configurer.setDescription("Counts word occurrences in input data");
        
        // Set resource requirements per container (first arg presumably
        // memory MB, second cores — confirm against Resources definition).
        configurer.setDriverResources(new Resources(512, 1));
        configurer.setMapperResources(new Resources(1024, 1));
        configurer.setReducerResources(new Resources(1024, 1));
    }
    
    @Override
    public void initialize(MapReduceContext context) throws Exception {
        Job job = context.getHadoopJob();
        
        // Wire CDAP dataset input and output.
        context.setInput(Input.ofDataset("input_text"));
        context.setOutput(Output.ofDataset("word_counts"));
        
        // Configure the Hadoop job. The reducer doubles as the combiner:
        // summing partial counts is associative/commutative, so map-side
        // combining is safe and reduces shuffle volume.
        job.setMapperClass(WordMapper.class);
        job.setCombinerClass(WordReducer.class);
        job.setReducerClass(WordReducer.class);
        
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
    }
    
    // Mapper: emits (word, 1) for every whitespace-separated token.
    public static class WordMapper extends Mapper<byte[], String, Text, IntWritable> {
        
        private static final IntWritable ONE = new IntWritable(1);
        // Reused across map() calls to avoid a per-record allocation.
        private final Text word = new Text();
        
        @Override
        protected void map(byte[] key, String value, Context context) 
            throws IOException, InterruptedException {
            
            // Locale.ROOT makes case folding deterministic regardless of the
            // JVM default locale (the original bare toLowerCase() mis-folds
            // e.g. 'I' under the Turkish locale, splitting word counts).
            String[] words = value.toLowerCase(Locale.ROOT).split("\\s+");
            for (String w : words) {
                // split() yields an empty leading token when the line starts
                // with whitespace; skip empties.
                if (!w.isEmpty()) {
                    word.set(w);
                    context.write(word, ONE);
                }
            }
        }
    }
    
    // Reducer (and combiner): sums partial counts per word.
    public static class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        
        // Reused across reduce() calls to avoid a per-key allocation.
        private final IntWritable result = new IntWritable();
        
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
            
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            
            result.set(sum);
            context.write(key, result);
        }
    }
}

// Advanced MapReduce example: reads several input datasets and writes
// per-user activity statistics to several output datasets.
public class UserAnalyticsMapReduce extends AbstractMapReduce {

    @Override
    public void configure(MapReduceConfigurer configurer) {
        // Deployment-time metadata only; dataset wiring happens in initialize().
        configurer.setName("UserAnalyticsMapReduce");
        configurer.setDescription("Analyzes user behavior patterns");
    }

    @Override
    public void initialize(MapReduceContext context) throws Exception {
        Job hadoopJob = context.getHadoopJob();

        // Fan in from two datasets...
        context.addInput(Input.ofDataset("user_events"));
        context.addInput(Input.ofDataset("user_profiles"));

        // ...and fan results out to two more.
        context.addOutput(Output.ofDataset("user_stats"));
        context.addOutput(Output.ofDataset("behavior_patterns"));

        hadoopJob.setMapperClass(UserAnalyticsMapper.class);
        hadoopJob.setReducerClass(UserAnalyticsReducer.class);

        // Custom partitioner spreads users evenly across the reducers.
        hadoopJob.setPartitionerClass(UserPartitioner.class);
        hadoopJob.setNumReduceTasks(10);
    }

    // Re-keys each event row by user id so a single reducer call sees one
    // user's full activity stream.
    public static class UserAnalyticsMapper
        extends Mapper<byte[], Row, Text, UserActivity> {

        @Override
        protected void map(byte[] rowKey, Row record, Context ctx)
            throws IOException, InterruptedException {

            String user = record.getString("user_id");
            String kind = record.getString("event_type");
            long when = record.getLong("timestamp");

            ctx.write(new Text(user), new UserActivity(kind, when));
        }
    }

    // Aggregates per-event-type counts plus the first/last activity span
    // for each user and emits a single Put per user.
    public static class UserAnalyticsReducer
        extends Reducer<Text, UserActivity, byte[], Put> {

        @Override
        protected void reduce(Text userId, Iterable<UserActivity> activities,
                            Context ctx) throws IOException, InterruptedException {

            Map<String, Integer> counts = new HashMap<>();
            long earliest = Long.MAX_VALUE;
            long latest = Long.MIN_VALUE;

            for (UserActivity activity : activities) {
                counts.merge(activity.getEventType(), 1, Integer::sum);

                long ts = activity.getTimestamp();
                if (ts < earliest) {
                    earliest = ts;
                }
                if (ts > latest) {
                    latest = ts;
                }
            }

            // One output row per user, keyed by the user id.
            byte[] rowKey = Bytes.toBytes(userId.toString());
            Put put = new Put(rowKey);
            put.add("stats", "first_activity", earliest);
            put.add("stats", "last_activity", latest);
            put.add("stats", "session_duration", latest - earliest);

            for (Map.Entry<String, Integer> entry : counts.entrySet()) {
                put.add("events", entry.getKey(), entry.getValue());
            }

            ctx.write(rowKey, put);
        }
    }
}

Input and Output Configuration

// Input configuration for MapReduce. Static factory methods; the bodies are
// elided (placeholder comments only) in this documentation snippet.
public class Input {
    public static Input ofDataset(String datasetName) { /* dataset input */ }
    public static Input ofDataset(String datasetName, Map<String, String> arguments) { /* dataset with args */ }
    public static Input ofDataset(String datasetName, Split split) { /* dataset with split */ }
    
    // Batch input configuration via a builder
    public static BatchSource.Builder<?, ?, ?> batch() { /* batch input builder */ }
}

// Output configuration for MapReduce. Static factory methods; the bodies are
// elided (placeholder comments only) in this documentation snippet.
public class Output {
    public static Output ofDataset(String datasetName) { /* dataset output */ }
    public static Output ofDataset(String datasetName, Map<String, String> arguments) { /* dataset with args */ }
    
    // Batch output configuration via a builder
    public static BatchSink.Builder<?, ?, ?> batch() { /* batch output builder */ }
}

// Split specification for parallel processing: the size of the slice and the
// preferred host locations for scheduling it.
public interface Split {
    long getLength();
    List<String> getLocations();
}

Spark Integration

CDAP provides native Apache Spark integration supporting both batch and streaming processing with enhanced dataset access and operational features.

Spark Program Definition

import io.cdap.cdap.api.spark.*;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

// Spark program interface. Mirrors the MapReduce interface: configure(...)
// declares static configuration, and lifecycle hooks come from
// ProgramLifecycle<SparkClientContext>.
public interface Spark extends ProgramLifecycle<SparkClientContext> {
    // Called once at deployment time to register name, description,
    // resources, and Spark configuration with the given configurer.
    void configure(SparkConfigurer configurer);
}

// Abstract Spark implementation: convenience base class with no-op lifecycle
// methods so concrete programs override only what they need.
public abstract class AbstractSpark 
    extends AbstractPluginConfigurable<SparkConfigurer>
    implements Spark {
    
    // Invoked before the Spark job starts; no-op by default.
    @Override
    public void initialize(SparkClientContext context) throws Exception {
        // Initialize Spark resources
    }
    
    // Invoked after the run finishes; release resources here. No-op default.
    @Override
    public void destroy() {
        // Cleanup Spark resources
    }
}

// Spark configurer interface, used at deployment time to declare the Spark
// main class, per-role container resources, executor sizing, and raw Spark
// configuration key/value pairs.
public interface SparkConfigurer 
    extends ProgramConfigurer, DatasetConfigurer, PluginConfigurer {
    
    void setMainClassName(String className);
    void setDriverResources(Resources resources);  
    void setExecutorResources(Resources resources);
    void setNumExecutors(int numExecutors);
    void setExecutorCores(int cores);
    void setClientResources(Resources resources);
    
    // Spark configuration: setSparkConf replaces the whole map,
    // addSparkConf sets a single key.
    void setSparkConf(Map<String, String> sparkConf);
    void addSparkConf(String key, String value);
}

Spark Context

// Spark client context, passed to the program's lifecycle and run methods.
// Bridges Spark (session/context access, RDD conversion) with CDAP datasets,
// service discovery, plugins, and lineage recording.
public interface SparkClientContext 
    extends SchedulableProgramContext, RuntimeContext, DatasetContext, 
            ServiceDiscoverer, PluginContext, LineageRecorder {
    
    // Access to the underlying Spark session and JavaSparkContext.
    SparkSession getSparkSession();
    JavaSparkContext getOriginalSparkContext();
    
    // Dataset integration: read a CDAP dataset as a key/value pair RDD,
    // optionally with runtime arguments or explicit splits.
    <K, V> JavaPairRDD<K, V> fromDataset(String datasetName);
    <K, V> JavaPairRDD<K, V> fromDataset(String datasetName, Map<String, String> arguments);
    <K, V> JavaPairRDD<K, V> fromDataset(String datasetName, Split... splits);
    
    // Persist a pair RDD back into a CDAP dataset.
    void saveAsDataset(JavaPairRDD<?, ?> rdd, String datasetName);
    void saveAsDataset(JavaPairRDD<?, ?> rdd, String datasetName, Map<String, String> arguments);
    
    // Resource localization: files/archives shipped to Spark containers.
    Map<String, LocalizeResource> getResourcesToLocalize();
    void localize(String name, URI uri);
    void localize(String name, URI uri, boolean archive);
}

Spark Implementation Examples

// Basic Spark program: reads a CDAP dataset, transforms it with Spark SQL,
// and writes the result back to another CDAP dataset.
public class DataTransformationSpark extends AbstractSpark {
    
    @Override
    public void configure(SparkConfigurer configurer) {
        configurer.setName("DataTransformationSpark");
        configurer.setDescription("Transforms raw data using Spark");
        configurer.setMainClassName(DataTransformationSpark.class.getName());
        
        // Resource configuration (args presumably memory MB / cores —
        // confirm against the Resources definition).
        configurer.setDriverResources(new Resources(1024, 2));
        configurer.setExecutorResources(new Resources(2048, 2));
        configurer.setNumExecutors(4);
        
        // Spark configuration: adaptive query execution + Kryo serialization.
        configurer.addSparkConf("spark.sql.adaptive.enabled", "true");
        configurer.addSparkConf("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    }
    
    @Override
    public void run(SparkClientContext context) throws Exception {
        SparkSession spark = context.getSparkSession();
        
        // Read data from the CDAP dataset as (key, table-row) pairs.
        JavaPairRDD<byte[], Row> inputRDD = context.fromDataset("raw_data");
        
        // Convert CDAP rows to Spark SQL rows with an explicit schema.
        // NOTE(review): `Row` here refers to the CDAP table Row (getString/
        // getLong accessors) while RowFactory.create builds Spark SQL Rows —
        // same simple name, different types; verify imports disambiguate.
        Dataset<Row> inputDF = spark.createDataFrame(
            inputRDD.map(tuple -> {
                Row row = tuple._2();
                return RowFactory.create(
                    row.getString("id"),
                    row.getString("name"),
                    row.getLong("timestamp"),
                    row.getDouble("value")
                );
            }).rdd(),
            DataTypes.createStructType(Arrays.asList(
                DataTypes.createStructField("id", DataTypes.StringType, false),
                DataTypes.createStructField("name", DataTypes.StringType, false),
                DataTypes.createStructField("timestamp", DataTypes.LongType, false),
                DataTypes.createStructField("value", DataTypes.DoubleType, false)
            ))
        );
        
        // Register a temporary view so the transformation can use SQL.
        inputDF.createOrReplaceTempView("raw_data");
        
        // Derive date, a 10%-adjusted value, and a low/medium/high category;
        // drop null and non-positive values.
        Dataset<Row> transformedDF = spark.sql(
            "SELECT " +
            "  id, " +
            "  name, " +
            "  DATE(FROM_UNIXTIME(timestamp/1000)) as date, " +
            "  ROUND(value * 1.1, 2) as adjusted_value, " +
            "  CASE " +
            "    WHEN value > 100 THEN 'high' " +
            "    WHEN value > 50 THEN 'medium' " +
            "    ELSE 'low' " +
            "  END as category " +
            "FROM raw_data " +
            "WHERE value IS NOT NULL AND value > 0"
        );
        
        // Convert back to (key, Put) pairs for CDAP table output.
        // NOTE(review): put.add is called with getAs(...) results whose
        // static type is Object — confirm Put.add has overloads that accept
        // the actual runtime types (String, Double, etc.).
        JavaPairRDD<byte[], Put> outputRDD = transformedDF.javaRDD().mapToPair(row -> {
            String id = row.getAs("id");
            byte[] key = Bytes.toBytes(id);
            
            Put put = new Put(key);
            put.add("data", "name", row.getAs("name"));
            put.add("data", "date", row.getAs("date").toString());
            put.add("data", "adjusted_value", row.getAs("adjusted_value"));
            put.add("data", "category", row.getAs("category"));
            
            return new Tuple2<>(key, put);
        });
        
        // Save to the output CDAP dataset.
        context.saveAsDataset(outputRDD, "transformed_data");
    }
}

// Advanced Spark program: Structured Streaming from Kafka with windowed
// aggregation, written per micro-batch into a CDAP dataset.
public class RealTimeAnalyticsSpark extends AbstractSpark {
    
    @Override
    public void configure(SparkConfigurer configurer) {
        configurer.setName("RealTimeAnalyticsSpark");
        configurer.setDescription("Real-time analytics using Spark Streaming");
        configurer.setMainClassName(RealTimeAnalyticsSpark.class.getName());
        
        // Streaming configuration.
        // NOTE(review): a /tmp checkpoint location is not durable across
        // node restarts — use a persistent (e.g. HDFS) path in production.
        configurer.addSparkConf("spark.streaming.stopGracefullyOnShutdown", "true");
        configurer.addSparkConf("spark.sql.streaming.checkpointLocation", "/tmp/spark-checkpoint");
    }
    
    @Override
    public void run(SparkClientContext context) throws Exception {
        SparkSession spark = context.getSparkSession();
        
        // Subscribe to the "events" Kafka topic as a streaming DataFrame.
        Dataset<Row> streamingDF = spark
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("subscribe", "events")
            .load();
        
        // Parse the Kafka value payload as JSON using the event schema, and
        // allow events up to 10 minutes late via the watermark.
        Dataset<Row> eventsDF = streamingDF
            .select(from_json(col("value").cast("string"), getEventSchema()).as("event"))
            .select("event.*")
            .withWatermark("timestamp", "10 minutes");
        
        // Count/avg/max per 5-minute window, event type, and user.
        Dataset<Row> aggregatedDF = eventsDF
            .groupBy(
                window(col("timestamp"), "5 minutes"),
                col("event_type"),
                col("user_id")
            )
            .agg(
                count("*").as("event_count"),
                avg("duration").as("avg_duration"),
                max("duration").as("max_duration")
            );
        
        // Write each micro-batch to the CDAP dataset via foreachBatch.
        // NOTE(review): the lambda captures `context` and calls
        // saveAsDataset per batch — confirm SparkClientContext is usable
        // from foreachBatch in the CDAP runtime.
        StreamingQuery query = aggregatedDF
            .writeStream()
            .outputMode("update")
            .foreachBatch((Dataset<Row> batchDF, Long batchId) -> {
                // Convert the batch DataFrame to (key, Put) pairs.
                JavaPairRDD<byte[], Put> outputRDD = batchDF.javaRDD().mapToPair(row -> {
                    // Key = userId_eventType_windowStart. NOTE(review): this
                    // assumes the window struct is column 0 and reads its
                    // start with getLong(0); window bounds are timestamps,
                    // so verify getLong does not fail at runtime here.
                    String key = String.format("%s_%s_%d", 
                        row.getAs("user_id"),
                        row.getAs("event_type"), 
                        row.getStruct(0).getLong(0) // window start
                    );
                    
                    Put put = new Put(Bytes.toBytes(key));
                    put.add("stats", "event_count", row.getAs("event_count"));
                    put.add("stats", "avg_duration", row.getAs("avg_duration"));
                    put.add("stats", "max_duration", row.getAs("max_duration"));
                    put.add("stats", "batch_id", batchId);
                    
                    return new Tuple2<>(Bytes.toBytes(key), put);
                });
                
                context.saveAsDataset(outputRDD, "real_time_analytics");
            })
            .start();
        
        // Block until the streaming query terminates.
        query.awaitTermination();
    }
    
    // Schema of the JSON event payload; duration is the only nullable field.
    private StructType getEventSchema() {
        return DataTypes.createStructType(Arrays.asList(
            DataTypes.createStructField("user_id", DataTypes.StringType, false),
            DataTypes.createStructField("event_type", DataTypes.StringType, false),
            DataTypes.createStructField("timestamp", DataTypes.TimestampType, false),
            DataTypes.createStructField("duration", DataTypes.DoubleType, true)
        ));
    }
}

Spark SQL Integration

// Spark SQL analytics over CDAP datasets: joins users/orders/products,
// derives per-customer aggregates, then segments customers by spend and
// recency. Schema helpers (getUserSchema etc.) are defined elsewhere.
public class SparkSQLAnalytics extends AbstractSpark {
    
    @Override 
    public void run(SparkClientContext context) throws Exception {
        SparkSession spark = context.getSparkSession();
        
        // Load the three CDAP datasets as DataFrames with explicit schemas.
        Dataset<Row> usersDF = loadDatasetAsDF(context, spark, "users", getUserSchema());
        Dataset<Row> ordersDF = loadDatasetAsDF(context, spark, "orders", getOrderSchema());
        Dataset<Row> productsDF = loadDatasetAsDF(context, spark, "products", getProductSchema());
        
        // Register temporary views so the analytics can be written in SQL.
        usersDF.createOrReplaceTempView("users");
        ordersDF.createOrReplaceTempView("orders"); 
        productsDF.createOrReplaceTempView("products");
        
        // Per-customer aggregates: order counts, spend, categories, recency.
        // NOTE(review): referencing the select aliases total_orders /
        // total_spent in HAVING/ORDER BY is Spark SQL-specific, not portable
        // ANSI SQL. Also note customer_analytics is queried as a view below
        // but only registered via the SQL in customerSegments' FROM clause —
        // verify the second query actually resolves "customer_analytics".
        Dataset<Row> customerAnalytics = spark.sql(
            "SELECT " +
            "  u.user_id, " +
            "  u.name, " +
            "  COUNT(o.order_id) as total_orders, " +
            "  SUM(o.total_amount) as total_spent, " +
            "  AVG(o.total_amount) as avg_order_value, " +
            "  COLLECT_LIST(DISTINCT p.category) as purchased_categories, " +
            "  DATEDIFF(CURRENT_DATE(), MAX(o.order_date)) as days_since_last_order " +
            "FROM users u " +
            "LEFT JOIN orders o ON u.user_id = o.user_id " +
            "LEFT JOIN products p ON o.product_id = p.product_id " +
            "GROUP BY u.user_id, u.name " +
            "HAVING total_orders > 0 " +
            "ORDER BY total_spent DESC"
        );
        
        // Persist the aggregates.
        saveDataFrameAsDataset(context, customerAnalytics, "customer_analytics");
        
        // Segment by total spend (VIP/Premium/Regular/Basic) and by recency
        // (Active/At Risk/Churned).
        Dataset<Row> customerSegments = spark.sql(
            "SELECT " +
            "  user_id, " +
            "  name, " +
            "  total_spent, " +
            "  CASE " +
            "    WHEN total_spent > 1000 THEN 'VIP' " +
            "    WHEN total_spent > 500 THEN 'Premium' " +
            "    WHEN total_spent > 100 THEN 'Regular' " +
            "    ELSE 'Basic' " +
            "  END as segment, " +
            "  CASE " +
            "    WHEN days_since_last_order <= 30 THEN 'Active' " +
            "    WHEN days_since_last_order <= 90 THEN 'At Risk' " +
            "    ELSE 'Churned' " +
            "  END as status " +
            "FROM customer_analytics"
        );
        
        // NOTE(review): this temp view is registered but never queried in
        // this method — possibly intended for downstream consumers.
        customerSegments.createOrReplaceTempView("customer_segments");
        saveDataFrameAsDataset(context, customerSegments, "customer_segments");
    }
    
    // Reads a CDAP dataset as (key, Row) pairs and wraps the values in a
    // DataFrame under the supplied schema.
    private Dataset<Row> loadDatasetAsDF(SparkClientContext context, SparkSession spark, 
                                       String datasetName, StructType schema) {
        JavaPairRDD<byte[], Row> rdd = context.fromDataset(datasetName);
        return spark.createDataFrame(rdd.map(Tuple2::_2).rdd(), schema);
    }
    
    // Writes a DataFrame to a CDAP table dataset, keyed by the "user_id"
    // column, stringifying every non-null column into the "data" family.
    // NOTE(review): assumes every DataFrame passed in has a user_id column —
    // both callers above satisfy this, but it is an implicit contract.
    private void saveDataFrameAsDataset(SparkClientContext context, Dataset<Row> df, 
                                      String datasetName) {
        JavaPairRDD<byte[], Put> outputRDD = df.javaRDD().mapToPair(row -> {
            String key = row.getAs("user_id").toString();
            Put put = new Put(Bytes.toBytes(key));
            
            // Add all columns to the Put, skipping nulls.
            for (String fieldName : row.schema().fieldNames()) {
                Object value = row.getAs(fieldName);
                if (value != null) {
                    put.add("data", fieldName, value.toString());
                }
            }
            
            return new Tuple2<>(Bytes.toBytes(key), put);
        });
        
        context.saveAsDataset(outputRDD, datasetName);
    }
}

Custom Actions for Data Processing

Custom actions can be used in workflows for specialized data processing tasks:

import io.cdap.cdap.api.customaction.*;

// Custom workflow action that scans a dataset for quality problems, publishes
// the findings to the workflow token and metrics, and fails the workflow when
// the configured thresholds are exceeded.
public class DataQualityCheckAction extends AbstractCustomAction {

    @Override
    public void configure(CustomActionConfigurer configurer) {
        configurer.setName("DataQualityCheck");
        configurer.setDescription("Validates data quality and sets processing flags");
    }

    @Override
    public void run(CustomActionContext context) throws Exception {
        Table rawData = context.getDataset("raw_data");
        WorkflowToken token = context.getWorkflowToken();
        Metrics metrics = context.getMetrics();

        // Run the quality scan once, then fan its figures out to the
        // workflow token (for downstream nodes) and metrics (for operators).
        DataQualityResults report = performQualityChecks(rawData);

        token.put("dq.total_records", String.valueOf(report.getTotalRecords()));
        token.put("dq.valid_records", String.valueOf(report.getValidRecords()));
        token.put("dq.error_rate", String.valueOf(report.getErrorRate()));
        token.put("dq.null_rate", String.valueOf(report.getNullRate()));
        token.put("dq.duplicate_rate", String.valueOf(report.getDuplicateRate()));

        metrics.gauge("data_quality.error_rate", report.getErrorRate());
        metrics.gauge("data_quality.null_rate", report.getNullRate());
        metrics.gauge("data_quality.duplicate_rate", report.getDuplicateRate());
        metrics.count("data_quality.total_records", report.getTotalRecords());

        // Gate downstream processing: all three rates must be under their
        // thresholds (5% errors, 10% nulls, 2% duplicates).
        boolean acceptable =
            report.getErrorRate() < 0.05
                && report.getNullRate() < 0.1
                && report.getDuplicateRate() < 0.02;

        token.put("dq.can_proceed", String.valueOf(acceptable));

        if (!acceptable) {
            // Failing the action stops the enclosing workflow.
            throw new RuntimeException(
                String.format("Data quality check failed: error_rate=%.2f%%, null_rate=%.2f%%, duplicate_rate=%.2f%%",
                    report.getErrorRate() * 100,
                    report.getNullRate() * 100,
                    report.getDuplicateRate() * 100)
            );
        }
    }

    // Placeholder for the actual validation logic over the input table.
    private DataQualityResults performQualityChecks(Table inputData) {
        // Implementation of data quality validation logic
        return new DataQualityResults();
    }
}

// Data processing coordination action: reads the data-quality figures left in
// the workflow token by an upstream action and publishes a processing
// strategy for downstream nodes.
public class ProcessingCoordinatorAction extends AbstractCustomAction {
    
    @Override
    public void run(CustomActionContext context) throws Exception {
        WorkflowToken token = context.getWorkflowToken();
        
        // Read processing parameters from the token.
        // NOTE(review): assumes the "dq.*" keys were set by an upstream
        // action (e.g. DataQualityCheckAction); token.get(...) would NPE on
        // .toString() if a key is missing — confirm the workflow ordering.
        long totalRecords = Long.parseLong(token.get("dq.total_records").toString());
        double errorRate = Double.parseDouble(token.get("dq.error_rate").toString());
        
        // Determine optimal processing strategy from the record volume.
        ProcessingStrategy strategy = determineStrategy(totalRecords, errorRate);
        
        // Publish the chosen strategy for downstream workflow nodes.
        token.put("processing.strategy", strategy.name());
        token.put("processing.batch_size", String.valueOf(strategy.getBatchSize()));
        token.put("processing.parallel_level", String.valueOf(strategy.getParallelism()));
        
        context.getMetrics().gauge("processing.batch_size", strategy.getBatchSize());
    }
    
    // Picks a batch strategy by record count thresholds.
    // NOTE(review): the errorRate parameter is accepted but never used in
    // the decision — either wire it into the logic or drop the parameter.
    private ProcessingStrategy determineStrategy(long totalRecords, double errorRate) {
        if (totalRecords > 10_000_000) {
            return ProcessingStrategy.LARGE_BATCH;
        } else if (totalRecords > 1_000_000) {
            return ProcessingStrategy.MEDIUM_BATCH;
        } else {
            return ProcessingStrategy.SMALL_BATCH;
        }
    }
}

The CDAP data processing framework provides enterprise-grade capabilities for both batch and streaming data processing, with seamless integration between MapReduce, Spark, and CDAP's dataset and operational features.

Install with Tessl CLI

npx tessl i tessl/maven-io-cdap-cdap--cdap

docs

application-framework.md

data-management.md

data-processing.md

index.md

operational.md

plugin-system.md

security-metadata.md

tile.json