CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-io-cdap-cdap--cdap-etl-api

CDAP ETL API provides comprehensive abstractions for building Extract, Transform, and Load pipeline applications on the CDAP platform

Pending
Overview
Eval results
Files

docs/batch-processing.md

Batch Processing

Comprehensive batch processing capabilities including batch sources, sinks, aggregators, joiners, and post-actions for large-scale data processing in CDAP ETL pipelines.

Batch Sources

BatchSource<KEY_IN, VAL_IN, OUT>

Base abstract class for batch data sources that read from external systems.

package io.cdap.cdap.etl.api.batch;

public abstract class BatchSource<KEY_IN, VAL_IN, OUT> 
    extends BatchConfigurable<BatchSourceContext>
    implements Transformation<KeyValue<KEY_IN, VAL_IN>, OUT>,
               StageLifecycle<BatchRuntimeContext> {
    
    // Plugin type identifiers used with @Plugin(type = ...)
    public static final String PLUGIN_TYPE = "batchsource";
    public static final String FORMAT_PLUGIN_TYPE = "inputformat";
    
    // Lifecycle methods — no-op defaults; subclasses override as needed
    public void initialize(BatchRuntimeContext context) throws Exception {}
    public void destroy() {}
    
    // Converts each input KeyValue read from the InputFormat into zero or
    // more OUT records via the emitter; default implementation emits nothing
    public void transform(KeyValue<KEY_IN, VAL_IN> input, Emitter<OUT> emitter) 
        throws Exception {}
}

Usage Example:

@Plugin(type = BatchSource.PLUGIN_TYPE)
@Name("FileSource")
@Description("Reads data from files")
public class FileSource extends BatchSource<LongWritable, Text, StructuredRecord> {
    
    private final Config config;
    private Schema schema;
    
    /**
     * Plugin configuration is injected by the CDAP framework through the
     * constructor. Without this constructor the {@code final} field is never
     * assigned and the class does not compile (definite-assignment rule).
     */
    public FileSource(Config config) {
        this.config = config;
    }
    
    @Override
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
        
        // Validate configuration; problems are reported via the failure collector
        config.validate(stageConfigurer.getFailureCollector());
        
        // Schema.parseJson throws a checked IOException, which this method does
        // not declare — translate parse failures into a configuration error.
        try {
            schema = Schema.parseJson(config.schema);
        } catch (java.io.IOException e) {
            throw new IllegalArgumentException("Invalid output schema: " + e.getMessage(), e);
        }
        stageConfigurer.setOutputSchema(schema);
    }
    
    @Override
    public void prepareRun(BatchSourceContext context) throws Exception {
        // Configure input format and input paths for the Hadoop job
        Job job = context.getHadoopJob();
        Configuration conf = job.getConfiguration();
        
        TextInputFormat.setInputPaths(job, config.path);
        context.setInput(Input.of(config.referenceName, 
                         new SourceInputFormatProvider(TextInputFormat.class, conf)));
    }
    
    @Override
    public void initialize(BatchRuntimeContext context) throws Exception {
        // Re-fetch the schema at runtime: the instance that ran
        // configurePipeline is not the instance that runs transform.
        schema = context.getOutputSchema();
    }
    
    @Override
    public void transform(KeyValue<LongWritable, Text> input, 
                         Emitter<StructuredRecord> emitter) throws Exception {
        // Split each text line on the configured delimiter and map fields
        // positionally onto the output schema.
        String line = input.getValue().toString();
        String[] fields = line.split(config.delimiter);
        
        StructuredRecord.Builder builder = StructuredRecord.builder(schema);
        List<Schema.Field> schemaFields = schema.getFields();
        
        // Only populate as many fields as both the line and the schema provide
        for (int i = 0; i < Math.min(fields.length, schemaFields.size()); i++) {
            Schema.Field field = schemaFields.get(i);
            builder.set(field.getName(), convertValue(fields[i], field.getSchema()));
        }
        
        emitter.emit(builder.build());
    }
}

Batch Sinks

BatchSink<IN, KEY_OUT, VAL_OUT>

Base abstract class for batch data sinks that write to external systems.

package io.cdap.cdap.etl.api.batch;

public abstract class BatchSink<IN, KEY_OUT, VAL_OUT> 
    extends BatchConfigurable<BatchSinkContext>
    implements Transformation<IN, KeyValue<KEY_OUT, VAL_OUT>>,
               StageLifecycle<BatchRuntimeContext> {
    
    // Plugin type identifiers used with @Plugin(type = ...)
    public static final String PLUGIN_TYPE = "batchsink";
    public static final String FORMAT_PLUGIN_TYPE = "outputformat";
    
    // Lifecycle methods — no-op defaults; subclasses override as needed
    public void initialize(BatchRuntimeContext context) throws Exception {}
    public void destroy() {}
    
    // Converts each pipeline record into the KeyValue pair written by the
    // OutputFormat; default implementation emits nothing
    public void transform(IN input, Emitter<KeyValue<KEY_OUT, VAL_OUT>> emitter) 
        throws Exception {}
}

Usage Example:

@Plugin(type = BatchSink.PLUGIN_TYPE)
@Name("FileSink")
@Description("Writes data to files")
public class FileSink extends BatchSink<StructuredRecord, NullWritable, Text> {
    
    private final Config config;
    
    /**
     * Plugin configuration is injected by the CDAP framework through the
     * constructor. Without this constructor the {@code final} field is never
     * assigned and the class does not compile (definite-assignment rule).
     */
    public FileSink(Config config) {
        this.config = config;
    }
    
    @Override
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
        // Report configuration problems via the failure collector
        config.validate(stageConfigurer.getFailureCollector());
    }
    
    @Override
    public void prepareRun(BatchSinkContext context) throws Exception {
        // Configure the Hadoop output format and destination path
        Job job = context.getHadoopJob();
        Configuration conf = job.getConfiguration();
        
        TextOutputFormat.setOutputPath(job, new Path(config.path));
        context.addOutput(Output.of(config.referenceName,
                         new SinkOutputFormatProvider(TextOutputFormat.class, conf)));
    }
    
    @Override
    public void transform(StructuredRecord input, 
                         Emitter<KeyValue<NullWritable, Text>> emitter) throws Exception {
        // Serialize the record as a delimited text line; null field values
        // are written as empty strings.
        StringBuilder line = new StringBuilder();
        
        List<Schema.Field> fields = input.getSchema().getFields();
        for (int i = 0; i < fields.size(); i++) {
            if (i > 0) line.append(config.delimiter);
            
            Object value = input.get(fields.get(i).getName());
            line.append(value != null ? value.toString() : "");
        }
        
        emitter.emit(new KeyValue<>(NullWritable.get(), new Text(line.toString())));
    }
}

Batch Configuration Base Classes

BatchConfigurable<T>

Base abstract class for batch stage configuration providing common lifecycle methods.

package io.cdap.cdap.etl.api.batch;

public abstract class BatchConfigurable<T> 
    implements PipelineConfigurable, SubmitterLifecycle<T> {
    
    // Pipeline configuration — no-op default, runs at deployment time
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {}
    
    // Submission lifecycle: prepareRun runs before each batch run starts,
    // onRunFinish runs after it completes (succeeded indicates the outcome)
    public abstract void prepareRun(T context) throws Exception;
    public void onRunFinish(boolean succeeded, T context) {}
}

MultiInputBatchConfigurable<T>

Base class for batch stages with multiple inputs.

package io.cdap.cdap.etl.api.batch;

public abstract class MultiInputBatchConfigurable<T> 
    implements MultiInputPipelineConfigurable, SubmitterLifecycle<T> {
    
    // Same lifecycle as BatchConfigurable, but configured against multiple
    // input stages via the multi-input configurer
    public void configurePipeline(MultiInputPipelineConfigurer multiInputPipelineConfigurer) {}
    public abstract void prepareRun(T context) throws Exception;
    public void onRunFinish(boolean succeeded, T context) {}
}

Batch Aggregators

BatchAggregator<GROUP_KEY, GROUP_VALUE, OUT>

Batch implementation of aggregation operations.

package io.cdap.cdap.etl.api.batch;

public abstract class BatchAggregator<GROUP_KEY, GROUP_VALUE, OUT> 
    extends BatchConfigurable<BatchAggregatorContext>
    implements Aggregator<GROUP_KEY, GROUP_VALUE, OUT>,
               StageLifecycle<BatchRuntimeContext> {
    
    public static final String PLUGIN_TYPE = "batchaggregator";
    
    // Lifecycle methods — no-op defaults; subclasses override as needed
    public void initialize(BatchRuntimeContext context) throws Exception {}
    public void destroy() {}
    
    // Aggregation methods: groupBy emits the group key(s) for each record;
    // aggregate is then called once per key with all records for that key
    public abstract void groupBy(GROUP_VALUE groupValue, Emitter<GROUP_KEY> emitter) 
        throws Exception;
    public abstract void aggregate(GROUP_KEY groupKey, Iterator<GROUP_VALUE> groupValues, 
                                  Emitter<OUT> emitter) throws Exception;
}

Usage Example:

@Plugin(type = BatchAggregator.PLUGIN_TYPE)
@Name("SalesAggregator")
public class SalesAggregator extends BatchAggregator<String, StructuredRecord, StructuredRecord> {
    
    // Built once as a constant: configurePipeline runs at deployment time on a
    // DIFFERENT instance than the one that executes groupBy/aggregate, so a
    // field assigned only in configurePipeline would be null at runtime.
    private static final Schema OUTPUT_SCHEMA = Schema.recordOf("sales_summary",
        Schema.Field.of("region", Schema.of(Schema.Type.STRING)),
        Schema.Field.of("total_sales", Schema.of(Schema.Type.DOUBLE)),
        Schema.Field.of("order_count", Schema.of(Schema.Type.INT)),
        Schema.Field.of("avg_order_value", Schema.of(Schema.Type.DOUBLE))
    );
    
    private Schema outputSchema;
    
    @Override
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        // Advertise the fixed output schema to downstream stages
        pipelineConfigurer.getStageConfigurer().setOutputSchema(OUTPUT_SCHEMA);
    }
    
    @Override
    public void prepareRun(BatchAggregatorContext context) throws Exception {
        context.setNumPartitions(10); // Optimize parallelism
    }
    
    @Override
    public void initialize(BatchRuntimeContext context) throws Exception {
        // Assign the schema on the runtime instance before aggregate() runs
        outputSchema = OUTPUT_SCHEMA;
    }
    
    @Override
    public void groupBy(StructuredRecord groupValue, Emitter<String> emitter) throws Exception {
        // Group records by their region field
        String region = groupValue.get("region");
        emitter.emit(region);
    }
    
    @Override
    public void aggregate(String groupKey, Iterator<StructuredRecord> groupValues,
                         Emitter<StructuredRecord> emitter) throws Exception {
        double totalSales = 0.0;
        int orderCount = 0;
        
        // Sum sales and count orders; records with a null sales_amount are skipped
        while (groupValues.hasNext()) {
            StructuredRecord record = groupValues.next();
            Double sales = record.get("sales_amount");
            if (sales != null) {
                totalSales += sales;
                orderCount++;
            }
        }
        
        // Guard against division by zero when every record had a null amount
        double avgOrderValue = orderCount > 0 ? totalSales / orderCount : 0.0;
        
        StructuredRecord result = StructuredRecord.builder(outputSchema)
            .set("region", groupKey)
            .set("total_sales", totalSales)
            .set("order_count", orderCount)
            .set("avg_order_value", avgOrderValue)
            .build();
            
        emitter.emit(result);
    }
}

BatchReducibleAggregator<GROUP_KEY, GROUP_VALUE, AGGREGATE_VALUE, OUT>

Batch aggregator with reducible intermediate values for improved performance.

package io.cdap.cdap.etl.api.batch;

public abstract class BatchReducibleAggregator<GROUP_KEY, GROUP_VALUE, AGGREGATE_VALUE, OUT> 
    extends BatchConfigurable<BatchAggregatorContext>
    implements ReducibleAggregator<GROUP_KEY, GROUP_VALUE, AGGREGATE_VALUE, OUT>,
               StageLifecycle<BatchRuntimeContext> {
    
    // Shares the same plugin type string as BatchAggregator on purpose
    public static final String PLUGIN_TYPE = "batchaggregator";
}

Batch Joiners

BatchJoiner<JOIN_KEY, INPUT_RECORD, OUT>

Batch implementation of join operations.

package io.cdap.cdap.etl.api.batch;

public abstract class BatchJoiner<JOIN_KEY, INPUT_RECORD, OUT> 
    extends BatchConfigurable<BatchJoinerContext>
    implements Joiner<JOIN_KEY, INPUT_RECORD, OUT>,
               StageLifecycle<BatchJoinerRuntimeContext> {
    
    public static final String PLUGIN_TYPE = "batchjoiner";
    
    // Lifecycle methods — no-op defaults; subclasses override as needed
    public void initialize(BatchJoinerRuntimeContext context) throws Exception {}
    public void destroy() {}
    
    // Join methods: getJoinKeys extracts join key(s) per input stage/record,
    // getJoinConfig declares required stages, merge produces one output record
    // from the matched records across stages for a given key
    public abstract Collection<JOIN_KEY> getJoinKeys(String stageName, INPUT_RECORD inputRecord) 
        throws Exception;
    public abstract JoinConfig getJoinConfig() throws Exception;
    public abstract OUT merge(JOIN_KEY joinKey, Iterable<JoinElement<INPUT_RECORD>> joinResult) 
        throws Exception;
}

BatchAutoJoiner

Auto-joiner implementation for batch processing with automatic join optimization.

package io.cdap.cdap.etl.api.batch;

public abstract class BatchAutoJoiner 
    extends BatchConfigurable<BatchJoinerContext>
    implements AutoJoiner, StageLifecycle<BatchJoinerRuntimeContext> {
    
    // Shares the same plugin type string as BatchJoiner on purpose
    public static final String PLUGIN_TYPE = "batchjoiner";
}

Usage Example:

@Plugin(type = BatchAutoJoiner.PLUGIN_TYPE)
@Name("CustomerOrderJoiner")
public class CustomerOrderJoiner extends BatchAutoJoiner {
    
    // Declares a join between the "customers" and "orders" input stages on
    // customer_id, selecting and renaming four output fields.
    @Override
    public JoinDefinition define(AutoJoinerContext context) {
        return JoinDefinition.builder()
            // Output fields: (stage, source field, output alias)
            .select(Arrays.asList(
                new JoinField("customers", "customer_id", "customer_id"),
                new JoinField("customers", "name", "customer_name"),
                new JoinField("orders", "order_id", "order_id"),
                new JoinField("orders", "amount", "order_amount")
            ))
            // customers is REQUIRED, orders is OUTER (kept even without a match)
            // NOTE(review): the JoinStage constructor arguments shown here are
            // positional — confirm against the JoinStage API of the target version
            .from(Arrays.asList(
                new JoinStage("customers", JoinType.REQUIRED, 
                            Collections.emptyList(), true, false),
                new JoinStage("orders", JoinType.OUTER, 
                            Collections.emptyList(), false, false)
            ))
            // Join condition: customers.customer_id == orders.customer_id
            .on(JoinCondition.onKeys()
                .addKey(new JoinKey("customers", Arrays.asList("customer_id")))
                .addKey(new JoinKey("orders", Arrays.asList("customer_id"))))
            .build();
    }
}

Batch Contexts

BatchContext

Base context interface for batch operations.

package io.cdap.cdap.etl.api.batch;

public interface BatchContext extends StageSubmitterContext {
    // Base context for batch operations; declares no members of its own.
    // Inherits arguments and metrics from StageSubmitterContext
}

BatchRuntimeContext

Runtime context for batch stages providing access to runtime services.

package io.cdap.cdap.etl.api.batch;

public interface BatchRuntimeContext extends StageContext, LookupProvider {
    // Combines stage context with lookup capabilities; declares no members
    // of its own. Per the parent interfaces, it provides access to:
    // - Runtime arguments and metrics
    // - Plugin instantiation
    // - Service discovery
    // - Data lookups
}

BatchSourceContext

Context for batch source operations.

package io.cdap.cdap.etl.api.batch;

public interface BatchSourceContext extends BatchContext {
    /**
     * Set input for the batch source.
     *
     * @param input the input, pairing a reference name with an InputFormat provider
     */
    void setInput(Input input);
    
    /**
     * Check if preview mode is enabled.
     *
     * @return true if the pipeline is running in preview mode
     */
    boolean isPreviewEnabled();
    
    /**
     * Set error dataset for invalid records.
     *
     * @param errorDatasetName name of the dataset that receives invalid records
     */
    void setErrorDataset(String errorDatasetName);
}

BatchSinkContext

Context for batch sink operations.

package io.cdap.cdap.etl.api.batch;

public interface BatchSinkContext extends BatchContext {
    /**
     * Add output for the batch sink.
     *
     * @param output the output, pairing a reference name with an OutputFormat provider
     */
    void addOutput(Output output);
    
    /**
     * Add named output for the batch sink.
     *
     * @param outputName name to register the output under
     * @param output the output to add
     */
    void addOutput(String outputName, Output output);
    
    /**
     * Check if preview mode is enabled.
     *
     * @return true if the pipeline is running in preview mode
     */
    boolean isPreviewEnabled();
}

BatchAggregatorContext

Context for batch aggregator operations.

package io.cdap.cdap.etl.api.batch;

public interface BatchAggregatorContext extends BatchContext {
    /**
     * Set number of partitions for aggregation.
     *
     * @param numPartitions partition count used for the group-by shuffle
     */
    void setNumPartitions(int numPartitions);
    
    /**
     * Set memory for group-by operations.
     *
     * @param memoryMB memory budget in megabytes
     */
    void setGroupByMemoryMB(int memoryMB);
}

BatchJoinerContext

Context for batch joiner operations.

package io.cdap.cdap.etl.api.batch;

public interface BatchJoinerContext extends BatchContext {
    /**
     * Set number of partitions for join operations.
     *
     * @param numPartitions partition count used for the join shuffle
     */
    void setNumPartitions(int numPartitions);
}

BatchJoinerRuntimeContext

Runtime context for batch joiner operations.

package io.cdap.cdap.etl.api.batch;

public interface BatchJoinerRuntimeContext extends BatchRuntimeContext {
    // Runtime context for join operations; declares no members of its own
}

Post Actions

PostAction

Abstract class for post-execution actions in batch pipelines.

package io.cdap.cdap.etl.api.batch;

public abstract class PostAction 
    extends BatchConfigurable<BatchActionContext>
    implements StageLifecycle<BatchActionContext> {
    
    public static final String PLUGIN_TYPE = "postaction";
    
    // Lifecycle methods — no-op defaults; subclasses override as needed
    public void initialize(BatchActionContext context) throws Exception {}
    public void destroy() {}
    
    // Action execution, invoked once after the pipeline run completes.
    // NOTE(review): in the upstream cdap-etl-api, run() takes a
    // BatchActionContext argument — confirm this signature against the
    // actual API version before relying on it.
    public abstract void run() throws Exception;
}

Usage Example:

@Plugin(type = PostAction.PLUGIN_TYPE)
@Name("EmailNotification")
@Description("Sends email notification after pipeline completion")
public class EmailNotificationAction extends PostAction {
    
    private final Config config;
    
    /**
     * Plugin configuration is injected by the CDAP framework through the
     * constructor. Without this constructor the {@code final} field is never
     * assigned and the class does not compile (definite-assignment rule).
     */
    public EmailNotificationAction(Config config) {
        this.config = config;
    }
    
    @Override
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        // Report configuration problems via the failure collector
        config.validate(pipelineConfigurer.getStageConfigurer().getFailureCollector());
    }
    
    @Override
    public void prepareRun(BatchActionContext context) throws Exception {
        // Prepare email configuration
    }
    
    @Override
    public void run() throws Exception {
        // Send email notification
        EmailService emailService = new EmailService(config.smtpServer, config.port);
        
        String subject = "Pipeline Execution Completed";
        String body = String.format("Pipeline '%s' completed successfully at %s", 
                                   config.pipelineName, new Date());
        
        emailService.sendEmail(config.recipients, subject, body);
    }
}

Batch Connectors

BatchConnector

Base class for batch connectors.

package io.cdap.cdap.etl.api.batch;

public abstract class BatchConnector extends BatchConfigurable<BatchContext> {
    // Plugin type identifier used with @Plugin(type = ...)
    public static final String PLUGIN_TYPE = "batchconnector";
}

Performance Optimization

Partitioning Control

// In aggregator context
@Override
public void prepareRun(BatchAggregatorContext context) throws Exception {
    // Set optimal number of partitions based on data size
    // (calculateOptimalPartitions() is a user-supplied helper, not part of the API)
    context.setNumPartitions(calculateOptimalPartitions());
    
    // Set memory for group-by operations
    context.setGroupByMemoryMB(2048);
}

Memory Management

// Configure memory for aggregation operations
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    // Set stage properties for memory optimization.
    // NOTE(review): these Spark settings only apply on a Spark-based engine,
    // and addProperties should be confirmed against the target
    // StageConfigurer version — verify before copying this pattern.
    Map<String, String> properties = new HashMap<>();
    properties.put("spark.executor.memory", "4g");
    properties.put("spark.sql.shuffle.partitions", "200");
    
    pipelineConfigurer.getStageConfigurer().addProperties(properties);
}

Input/Output Optimization

// Optimize input format configuration
@Override
public void prepareRun(BatchSourceContext context) throws Exception {
    Job job = context.getHadoopJob();
    Configuration conf = job.getConfiguration();
    
    // Enable compression of intermediate map output to reduce shuffle I/O
    conf.setBoolean("mapreduce.map.output.compress", true);
    conf.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec");
    
    // Set optimal split size
    conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 128 * 1024 * 1024); // 128MB
    
    // config and inputFormatClass come from the surrounding plugin class
    context.setInput(Input.of(config.referenceName, 
                     new SourceInputFormatProvider(inputFormatClass, conf)));
}

Error Handling in Batch Operations

Error Dataset Configuration

@Override
public void prepareRun(BatchSourceContext context) throws Exception {
    // Configure error dataset for invalid records; skipped when the
    // pipeline is configured without one
    if (config.errorDataset != null) {
        context.setErrorDataset(config.errorDataset);
    }
    
    // Set input with error handling
    context.setInput(Input.of(config.referenceName, inputFormatProvider));
}

Error Record Processing

@Override
public void transform(KeyValue<LongWritable, Text> input, 
                     Emitter<StructuredRecord> emitter) throws Exception {
    try {
        // parseRecord is a user-supplied helper defined on the plugin class
        StructuredRecord record = parseRecord(input.getValue().toString());
        emitter.emit(record);
    } catch (Exception e) {
        // Emit error record instead of failing the entire pipeline;
        // invalid entries are routed to the configured error handling
        ErrorRecord<String> errorRecord = new ErrorRecord<>(
            input.getValue().toString(), 
            "Failed to parse record: " + e.getMessage()
        );
        emitter.emitError(new InvalidEntry<>(400, errorRecord.getErrorMessage(), errorRecord));
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-io-cdap-cdap--cdap-etl-api

docs

actions-conditions.md

batch-processing.md

core-pipeline.md

data-connectors.md

index.md

join-operations.md

lineage-metadata.md

sql-engine.md

validation.md

tile.json