The CDAP ETL API provides the abstractions for building Extract, Transform, and Load (ETL) pipeline applications on the CDAP platform.
—
Batch processing capabilities for CDAP ETL pipelines: batch sources, sinks, aggregators, joiners, and post-actions for large-scale data processing.
Base abstract class for batch data sources that read from external systems.
package io.cdap.cdap.etl.api.batch;
public abstract class BatchSource<KEY_IN, VAL_IN, OUT>
extends BatchConfigurable<BatchSourceContext>
implements Transformation<KeyValue<KEY_IN, VAL_IN>, OUT>,
StageLifecycle<BatchRuntimeContext> {
public static final String PLUGIN_TYPE = "batchsource";
public static final String FORMAT_PLUGIN_TYPE = "inputformat";
// Lifecycle methods
public void initialize(BatchRuntimeContext context) throws Exception {}
public void destroy() {}
// Data transformation
public void transform(KeyValue<KEY_IN, VAL_IN> input, Emitter<OUT> emitter)
throws Exception {}
}

Usage Example:
@Plugin(type = BatchSource.PLUGIN_TYPE)
@Name("FileSource")
@Description("Reads data from files")
public class FileSource extends BatchSource<LongWritable, Text, StructuredRecord> {
private final Config config;
private Schema schema;

public FileSource(Config config) {
this.config = config;
}
@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
// Validate configuration
config.validate(stageConfigurer.getFailureCollector());
// Set output schema; Schema.parseJson throws IOException, which configurePipeline
// cannot declare, so report parse failures through the FailureCollector
try {
schema = Schema.parseJson(config.schema);
stageConfigurer.setOutputSchema(schema);
} catch (IOException e) {
stageConfigurer.getFailureCollector().addFailure(
"Invalid output schema: " + e.getMessage(), "Provide valid schema JSON.");
}
}
@Override
public void prepareRun(BatchSourceContext context) throws Exception {
// Configure input format and input paths
Job job = context.getHadoopJob();
Configuration conf = job.getConfiguration();
TextInputFormat.setInputPaths(job, config.path);
context.setInput(Input.of(config.referenceName,
new SourceInputFormatProvider(TextInputFormat.class, conf)));
}
@Override
public void initialize(BatchRuntimeContext context) throws Exception {
schema = context.getOutputSchema();
}
@Override
public void transform(KeyValue<LongWritable, Text> input,
Emitter<StructuredRecord> emitter) throws Exception {
String line = input.getValue().toString();
String[] fields = line.split(config.delimiter);
StructuredRecord.Builder builder = StructuredRecord.builder(schema);
List<Schema.Field> schemaFields = schema.getFields();
for (int i = 0; i < Math.min(fields.length, schemaFields.size()); i++) {
Schema.Field field = schemaFields.get(i);
builder.set(field.getName(), convertValue(fields[i], field.getSchema()));
}
emitter.emit(builder.build());
}
}

Base abstract class for batch data sinks that write to external systems.
package io.cdap.cdap.etl.api.batch;
public abstract class BatchSink<IN, KEY_OUT, VAL_OUT>
extends BatchConfigurable<BatchSinkContext>
implements Transformation<IN, KeyValue<KEY_OUT, VAL_OUT>>,
StageLifecycle<BatchRuntimeContext> {
public static final String PLUGIN_TYPE = "batchsink";
public static final String FORMAT_PLUGIN_TYPE = "outputformat";
// Lifecycle methods
public void initialize(BatchRuntimeContext context) throws Exception {}
public void destroy() {}
// Data transformation
public void transform(IN input, Emitter<KeyValue<KEY_OUT, VAL_OUT>> emitter)
throws Exception {}
}

Usage Example:
@Plugin(type = BatchSink.PLUGIN_TYPE)
@Name("FileSink")
@Description("Writes data to files")
public class FileSink extends BatchSink<StructuredRecord, NullWritable, Text> {
private final Config config;

public FileSink(Config config) {
this.config = config;
}
@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
config.validate(stageConfigurer.getFailureCollector());
}
@Override
public void prepareRun(BatchSinkContext context) throws Exception {
Job job = context.getHadoopJob();
Configuration conf = job.getConfiguration();
TextOutputFormat.setOutputPath(job, new Path(config.path));
context.addOutput(Output.of(config.referenceName,
new SinkOutputFormatProvider(TextOutputFormat.class, conf)));
}
@Override
public void transform(StructuredRecord input,
Emitter<KeyValue<NullWritable, Text>> emitter) throws Exception {
StringBuilder line = new StringBuilder();
List<Schema.Field> fields = input.getSchema().getFields();
for (int i = 0; i < fields.size(); i++) {
if (i > 0) line.append(config.delimiter);
Object value = input.get(fields.get(i).getName());
line.append(value != null ? value.toString() : "");
}
emitter.emit(new KeyValue<>(NullWritable.get(), new Text(line.toString())));
}
}

Base abstract class for batch stage configuration providing common lifecycle methods.
package io.cdap.cdap.etl.api.batch;
public abstract class BatchConfigurable<T>
implements PipelineConfigurable, SubmitterLifecycle<T> {
// Pipeline configuration
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {}
// Submission lifecycle
public abstract void prepareRun(T context) throws Exception;
public void onRunFinish(boolean succeeded, T context) {}
}
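BatchConfigurable itself is not a plugin type; concrete batch plugins such as BatchSource and BatchSink extend it and inherit prepareRun and onRunFinish. The sketch below is illustrative rather than taken from the source examples: a sink that writes to a staging location and uses onRunFinish to clean up after failed runs (the Config fields and deleteStagingDir helper are hypothetical).

@Plugin(type = BatchSink.PLUGIN_TYPE)
@Name("StagedFileSink")
public class StagedFileSink extends BatchSink<StructuredRecord, NullWritable, Text> {
  private final Config config;

  public StagedFileSink(Config config) {
    this.config = config;
  }

  @Override
  public void prepareRun(BatchSinkContext context) throws Exception {
    // Write to a staging directory first; a downstream step can publish it on success
    Job job = context.getHadoopJob();
    TextOutputFormat.setOutputPath(job, new Path(config.stagingPath));
    context.addOutput(Output.of(config.referenceName,
        new SinkOutputFormatProvider(TextOutputFormat.class, job.getConfiguration())));
  }

  @Override
  public void onRunFinish(boolean succeeded, BatchSinkContext context) {
    // Called once after the run completes, whether it succeeded or not
    if (!succeeded) {
      deleteStagingDir(config.stagingPath); // hypothetical cleanup helper
    }
  }
}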
Base class for batch stages with multiple inputs.
package io.cdap.cdap.etl.api.batch;
public abstract class MultiInputBatchConfigurable<T>
implements MultiInputPipelineConfigurable, SubmitterLifecycle<T> {
public void configurePipeline(MultiInputPipelineConfigurer multiInputPipelineConfigurer) {}
public abstract void prepareRun(T context) throws Exception;
public void onRunFinish(boolean succeeded, T context) {}
}
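A hedged sketch of a multi-input stage follows. The getMultiInputStageConfigurer, getInputSchemas, and setOutputSchema calls on the multi-input configurer are assumptions about the cdap-etl-api surface and should be verified against your version; the stage itself (validating that every input exposes an "id" field) is purely illustrative.

public class UnionValidator extends MultiInputBatchConfigurable<BatchJoinerContext> {
  @Override
  public void configurePipeline(MultiInputPipelineConfigurer multiInputPipelineConfigurer) {
    // Each entry maps an input stage name to its schema (accessor names assumed)
    Map<String, Schema> inputSchemas =
        multiInputPipelineConfigurer.getMultiInputStageConfigurer().getInputSchemas();
    for (Map.Entry<String, Schema> entry : inputSchemas.entrySet()) {
      if (entry.getValue() != null && entry.getValue().getField("id") == null) {
        throw new IllegalArgumentException(
            "Input stage '" + entry.getKey() + "' must contain an 'id' field");
      }
    }
  }

  @Override
  public void prepareRun(BatchJoinerContext context) throws Exception {
    context.setNumPartitions(8); // arbitrary value for the sketch
  }
}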
Batch implementation of aggregation operations.
package io.cdap.cdap.etl.api.batch;
public abstract class BatchAggregator<GROUP_KEY, GROUP_VALUE, OUT>
extends BatchConfigurable<BatchAggregatorContext>
implements Aggregator<GROUP_KEY, GROUP_VALUE, OUT>,
StageLifecycle<BatchRuntimeContext> {
public static final String PLUGIN_TYPE = "batchaggregator";
// Lifecycle methods
public void initialize(BatchRuntimeContext context) throws Exception {}
public void destroy() {}
// Aggregation methods
public abstract void groupBy(GROUP_VALUE groupValue, Emitter<GROUP_KEY> emitter)
throws Exception;
public abstract void aggregate(GROUP_KEY groupKey, Iterator<GROUP_VALUE> groupValues,
Emitter<OUT> emitter) throws Exception;
}

Usage Example:
@Plugin(type = BatchAggregator.PLUGIN_TYPE)
@Name("SalesAggregator")
public class SalesAggregator extends BatchAggregator<String, StructuredRecord, StructuredRecord> {
private Schema outputSchema;
@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
outputSchema = Schema.recordOf("sales_summary",
Schema.Field.of("region", Schema.of(Schema.Type.STRING)),
Schema.Field.of("total_sales", Schema.of(Schema.Type.DOUBLE)),
Schema.Field.of("order_count", Schema.of(Schema.Type.INT)),
Schema.Field.of("avg_order_value", Schema.of(Schema.Type.DOUBLE))
);
pipelineConfigurer.getStageConfigurer().setOutputSchema(outputSchema);
}
@Override
public void prepareRun(BatchAggregatorContext context) throws Exception {
context.setNumPartitions(10); // Optimize parallelism
}
@Override
public void initialize(BatchRuntimeContext context) throws Exception {
// configurePipeline runs at deployment time in a separate instance, so re-resolve
// the output schema at runtime before aggregate() uses it
outputSchema = context.getOutputSchema();
}
@Override
public void groupBy(StructuredRecord groupValue, Emitter<String> emitter) throws Exception {
String region = groupValue.get("region");
emitter.emit(region);
}
@Override
public void aggregate(String groupKey, Iterator<StructuredRecord> groupValues,
Emitter<StructuredRecord> emitter) throws Exception {
double totalSales = 0.0;
int orderCount = 0;
while (groupValues.hasNext()) {
StructuredRecord record = groupValues.next();
Double sales = record.get("sales_amount");
if (sales != null) {
totalSales += sales;
orderCount++;
}
}
double avgOrderValue = orderCount > 0 ? totalSales / orderCount : 0.0;
StructuredRecord result = StructuredRecord.builder(outputSchema)
.set("region", groupKey)
.set("total_sales", totalSales)
.set("order_count", orderCount)
.set("avg_order_value", avgOrderValue)
.build();
emitter.emit(result);
}
}

Batch aggregator with reducible intermediate values for improved performance.
package io.cdap.cdap.etl.api.batch;
public abstract class BatchReducibleAggregator<GROUP_KEY, GROUP_VALUE, AGGREGATE_VALUE, OUT>
extends BatchConfigurable<BatchAggregatorContext>
implements ReducibleAggregator<GROUP_KEY, GROUP_VALUE, AGGREGATE_VALUE, OUT>,
StageLifecycle<BatchRuntimeContext> {
public static final String PLUGIN_TYPE = "batchaggregator";
}
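The source ships no usage example for this class. The sketch below assumes the ReducibleAggregator contract of groupBy, initializeAggregateValue, mergeValues, mergePartitions, and finalize (method names inferred from the interface, verify against your cdap-etl-api version) and counts orders per region so partial counts can be combined per partition before the final merge.

@Plugin(type = BatchReducibleAggregator.PLUGIN_TYPE)
@Name("OrderCountAggregator")
public class OrderCountAggregator
    extends BatchReducibleAggregator<String, StructuredRecord, Integer, StructuredRecord> {
  private static final Schema OUTPUT_SCHEMA = Schema.recordOf("order_counts",
      Schema.Field.of("region", Schema.of(Schema.Type.STRING)),
      Schema.Field.of("order_count", Schema.of(Schema.Type.INT)));

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    pipelineConfigurer.getStageConfigurer().setOutputSchema(OUTPUT_SCHEMA);
  }

  @Override
  public void groupBy(StructuredRecord record, Emitter<String> emitter) throws Exception {
    String region = record.get("region");
    emitter.emit(region);
  }

  // Each incoming record starts a partial aggregate of 1
  @Override
  public Integer initializeAggregateValue(StructuredRecord record) throws Exception {
    return 1;
  }

  // Fold another record into an existing partial aggregate
  @Override
  public Integer mergeValues(Integer aggValue, StructuredRecord record) throws Exception {
    return aggValue + 1;
  }

  // Combine partial aggregates produced on different partitions
  @Override
  public Integer mergePartitions(Integer a, Integer b) throws Exception {
    return a + b;
  }

  // Emit one output record per group once all partials are combined
  @Override
  public void finalize(String groupKey, Integer aggValue, Emitter<StructuredRecord> emitter)
      throws Exception {
    emitter.emit(StructuredRecord.builder(OUTPUT_SCHEMA)
        .set("region", groupKey)
        .set("order_count", aggValue)
        .build());
  }
}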
Batch implementation of join operations.
package io.cdap.cdap.etl.api.batch;
public abstract class BatchJoiner<JOIN_KEY, INPUT_RECORD, OUT>
extends BatchConfigurable<BatchJoinerContext>
implements Joiner<JOIN_KEY, INPUT_RECORD, OUT>,
StageLifecycle<BatchJoinerRuntimeContext> {
public static final String PLUGIN_TYPE = "batchjoiner";
// Lifecycle methods
public void initialize(BatchJoinerRuntimeContext context) throws Exception {}
public void destroy() {}
// Join methods
public abstract Collection<JOIN_KEY> getJoinKeys(String stageName, INPUT_RECORD inputRecord)
throws Exception;
public abstract JoinConfig getJoinConfig() throws Exception;
public abstract OUT merge(JOIN_KEY joinKey, Iterable<JoinElement<INPUT_RECORD>> joinResult)
throws Exception;
}
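The class declares getJoinKeys, getJoinConfig, and merge but has no example here, so the following is a hedged sketch of an inner join of customers and orders on customer_id. The JoinConfig constructor taking the required input stage names and the JoinElement accessors (getStageName, getInputRecord) are assumptions about the join types in cdap-etl-api and should be checked against your version.

@Plugin(type = BatchJoiner.PLUGIN_TYPE)
@Name("CustomerOrderInnerJoiner")
public class CustomerOrderInnerJoiner
    extends BatchJoiner<String, StructuredRecord, StructuredRecord> {
  private Schema outputSchema;

  @Override
  public void initialize(BatchJoinerRuntimeContext context) throws Exception {
    // Output schema is assumed to be set in configurePipeline (omitted for brevity)
    outputSchema = context.getOutputSchema();
  }

  // Extract the join key for a record based on the stage it came from
  @Override
  public Collection<String> getJoinKeys(String stageName, StructuredRecord record) throws Exception {
    String key = record.get("customer_id");
    return Collections.singletonList(key);
  }

  // Both inputs are required, which makes this an inner join
  @Override
  public JoinConfig getJoinConfig() throws Exception {
    return new JoinConfig(Arrays.asList("customers", "orders"));
  }

  // Merge all records sharing a join key into a single output record
  @Override
  public StructuredRecord merge(String joinKey, Iterable<JoinElement<StructuredRecord>> joinRow)
      throws Exception {
    StructuredRecord.Builder builder = StructuredRecord.builder(outputSchema);
    builder.set("customer_id", joinKey);
    for (JoinElement<StructuredRecord> element : joinRow) {
      StructuredRecord record = element.getInputRecord();
      if ("customers".equals(element.getStageName())) {
        builder.set("customer_name", record.get("name"));
      } else {
        builder.set("order_id", record.get("order_id"));
        builder.set("order_amount", record.get("amount"));
      }
    }
    return builder.build();
  }
}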
Auto-joiner implementation for batch processing with automatic join optimization.
package io.cdap.cdap.etl.api.batch;
public abstract class BatchAutoJoiner
extends BatchConfigurable<BatchJoinerContext>
implements AutoJoiner, StageLifecycle<BatchJoinerRuntimeContext> {
public static final String PLUGIN_TYPE = "batchjoiner";
}

Usage Example:
@Plugin(type = BatchAutoJoiner.PLUGIN_TYPE)
@Name("CustomerOrderJoiner")
public class CustomerOrderJoiner extends BatchAutoJoiner {
@Override
public JoinDefinition define(AutoJoinerContext context) {
return JoinDefinition.builder()
.select(Arrays.asList(
new JoinField("customers", "customer_id", "customer_id"),
new JoinField("customers", "name", "customer_name"),
new JoinField("orders", "order_id", "order_id"),
new JoinField("orders", "amount", "order_amount")
))
.from(Arrays.asList(
new JoinStage("customers", JoinType.REQUIRED,
Collections.emptyList(), true, false),
new JoinStage("orders", JoinType.OUTER,
Collections.emptyList(), false, false)
))
.on(JoinCondition.onKeys()
.addKey(new JoinKey("customers", Arrays.asList("customer_id")))
.addKey(new JoinKey("orders", Arrays.asList("customer_id"))))
.build();
}
}

Base context interface for batch operations.
package io.cdap.cdap.etl.api.batch;
public interface BatchContext extends StageSubmitterContext {
// Base context for batch operations
// Inherits arguments and metrics from StageSubmitterContext
}

Runtime context for batch stages providing access to runtime services.
package io.cdap.cdap.etl.api.batch;
public interface BatchRuntimeContext extends StageContext, LookupProvider {
// Combines stage context with lookup capabilities
// Provides access to:
// - Runtime arguments and metrics
// - Plugin instantiation
// - Service discovery
// - Data lookups
}

Context for batch source operations.
package io.cdap.cdap.etl.api.batch;
public interface BatchSourceContext extends BatchContext {
/**
* Set input for the batch source.
*/
void setInput(Input input);
/**
* Check if preview mode is enabled.
*/
boolean isPreviewEnabled();
/**
* Set error dataset for invalid records.
*/
void setErrorDataset(String errorDatasetName);
}
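For example, a source's prepareRun can combine these hooks: register the input, route malformed records to an error dataset, and read a smaller sample during preview. The sketch below mirrors the FileSource example above; config.samplePath and config.errorDataset are illustrative fields.

@Override
public void prepareRun(BatchSourceContext context) throws Exception {
  Job job = context.getHadoopJob();
  Configuration conf = job.getConfiguration();

  // Send unparseable records to a dedicated error dataset instead of failing the run
  if (config.errorDataset != null) {
    context.setErrorDataset(config.errorDataset);
  }

  // Preview runs only need a sample, so read a smaller path (illustrative field)
  String inputPath = context.isPreviewEnabled() ? config.samplePath : config.path;
  TextInputFormat.setInputPaths(job, inputPath);

  context.setInput(Input.of(config.referenceName,
      new SourceInputFormatProvider(TextInputFormat.class, conf)));
}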
Context for batch sink operations.
package io.cdap.cdap.etl.api.batch;
public interface BatchSinkContext extends BatchContext {
/**
* Add output for the batch sink.
*/
void addOutput(Output output);
/**
* Add named output for the batch sink.
*/
void addOutput(String outputName, Output output);
/**
* Check if preview mode is enabled.
*/
boolean isPreviewEnabled();
}
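A sink's prepareRun typically mirrors this, and isPreviewEnabled helps avoid irreversible side effects, for example by writing to a throwaway location during preview. This is a minimal sketch based on the FileSink example above; config.previewPath is an illustrative field.

@Override
public void prepareRun(BatchSinkContext context) throws Exception {
  Job job = context.getHadoopJob();
  Configuration conf = job.getConfiguration();

  // During preview, write to a temporary location so production data is untouched
  String outputPath = context.isPreviewEnabled() ? config.previewPath : config.path;
  TextOutputFormat.setOutputPath(job, new Path(outputPath));

  context.addOutput(Output.of(config.referenceName,
      new SinkOutputFormatProvider(TextOutputFormat.class, conf)));
}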
Context for batch aggregator operations.
package io.cdap.cdap.etl.api.batch;
public interface BatchAggregatorContext extends BatchContext {
/**
* Set number of partitions for aggregation.
*/
void setNumPartitions(int numPartitions);
/**
* Set memory for group-by operations.
*/
void setGroupByMemoryMB(int memoryMB);
}

Context for batch joiner operations.
package io.cdap.cdap.etl.api.batch;
public interface BatchJoinerContext extends BatchContext {
/**
* Set number of partitions for join operations.
*/
void setNumPartitions(int numPartitions);
}
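A joiner can size its shuffle in prepareRun, for example deriving the partition count from an estimated input size (estimateInputSizeMB is an illustrative helper, not part of the API):

@Override
public void prepareRun(BatchJoinerContext context) throws Exception {
  // Size the shuffle so each partition handles roughly 256 MB
  long inputSizeMB = estimateInputSizeMB();
  int partitions = (int) Math.max(1, inputSizeMB / 256);
  context.setNumPartitions(partitions);
}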
Runtime context for batch joiner operations.
package io.cdap.cdap.etl.api.batch;
public interface BatchJoinerRuntimeContext extends BatchRuntimeContext {
// Combines batch runtime context for join operations
}

Abstract class for post-execution actions in batch pipelines.
package io.cdap.cdap.etl.api.batch;
public abstract class PostAction
extends BatchConfigurable<BatchActionContext>
implements StageLifecycle<BatchActionContext> {
public static final String PLUGIN_TYPE = "postaction";
// Lifecycle methods
public void initialize(BatchActionContext context) throws Exception {}
public void destroy() {}
// Action execution
public abstract void run() throws Exception;
}

Usage Example:
@Plugin(type = PostAction.PLUGIN_TYPE)
@Name("EmailNotification")
@Description("Sends email notification after pipeline completion")
public class EmailNotificationAction extends PostAction {
private final Config config;

public EmailNotificationAction(Config config) {
this.config = config;
}
@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
config.validate(pipelineConfigurer.getStageConfigurer().getFailureCollector());
}
@Override
public void prepareRun(BatchActionContext context) throws Exception {
// Prepare email configuration
}
@Override
public void run() throws Exception {
// Send email notification
EmailService emailService = new EmailService(config.smtpServer, config.port);
String subject = "Pipeline Execution Completed";
String body = String.format("Pipeline '%s' completed successfully at %s",
config.pipelineName, new Date());
emailService.sendEmail(config.recipients, subject, body);
}
}

Base class for batch connectors.
package io.cdap.cdap.etl.api.batch;
public abstract class BatchConnector extends BatchConfigurable<BatchContext> {
public static final String PLUGIN_TYPE = "batchconnector";
}

// In aggregator context
@Override
public void prepareRun(BatchAggregatorContext context) throws Exception {
// Set optimal number of partitions based on data size
context.setNumPartitions(calculateOptimalPartitions());
// Set memory for group-by operations
context.setGroupByMemoryMB(2048);
}

// Configure memory for aggregation operations
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
// Set pipeline-level engine properties for memory optimization
Map<String, String> properties = new HashMap<>();
properties.put("spark.executor.memory", "4g");
properties.put("spark.sql.shuffle.partitions", "200");
pipelineConfigurer.setPipelineProperties(properties);
}

// Optimize input format configuration
@Override
public void prepareRun(BatchSourceContext context) throws Exception {
Job job = context.getHadoopJob();
Configuration conf = job.getConfiguration();
// Enable compression
conf.setBoolean("mapreduce.map.output.compress", true);
conf.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec");
// Set optimal split size
conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 128 * 1024 * 1024); // 128MB
context.setInput(Input.of(config.referenceName,
new SourceInputFormatProvider(inputFormatClass, conf)));
}

@Override
public void prepareRun(BatchSourceContext context) throws Exception {
// Configure error dataset for invalid records
if (config.errorDataset != null) {
context.setErrorDataset(config.errorDataset);
}
// Set input with error handling
context.setInput(Input.of(config.referenceName, inputFormatProvider));
}

@Override
public void transform(KeyValue<LongWritable, Text> input,
Emitter<StructuredRecord> emitter) throws Exception {
try {
StructuredRecord record = parseRecord(input.getValue().toString());
emitter.emit(record);
} catch (Exception e) {
// Emit an error record instead of failing the entire pipeline.
// Emitter.emitError expects an InvalidEntry parameterized on the stage's output
// type, so wrap the raw line in a record (errorSchema with a single "raw_line"
// string field is assumed to be defined elsewhere).
StructuredRecord errorRecord = StructuredRecord.builder(errorSchema)
.set("raw_line", input.getValue().toString())
.build();
emitter.emitError(new InvalidEntry<>(400,
"Failed to parse record: " + e.getMessage(), errorRecord));
}
}

Install with Tessl CLI
npx tessl i tessl/maven-io-cdap-cdap--cdap-etl-api