CDAP ETL API provides comprehensive abstractions for building Extract, Transform, and Load (ETL) pipeline applications on the CDAP platform.
```
npx @tessl/cli install tessl/maven-io-cdap-cdap--cdap-etl-api@6.11.0
```

A comprehensive Java API for building Extract, Transform, Load (ETL) pipelines in the Cloud Data Application Platform (CDAP). The API provides a rich set of interfaces and classes for developing data processing plugins, including sources, sinks, transforms, aggregators, joiners, and actions.
Maven Dependency:

```xml
<dependency>
  <groupId>io.cdap.cdap</groupId>
  <artifactId>cdap-etl-api</artifactId>
  <version>6.11.0</version>
</dependency>
```

Java Package: io.cdap.cdap.etl.api
Java Version: 8+
Commonly used imports:

```java
// Core Pipeline Components
import io.cdap.cdap.etl.api.Transform;
import io.cdap.cdap.etl.api.Transformation;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.StageContext;
import io.cdap.cdap.etl.api.TransformContext;
// Batch Processing
import io.cdap.cdap.etl.api.batch.BatchSource;
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchRuntimeContext;
import io.cdap.cdap.etl.api.batch.BatchAggregator;
import io.cdap.cdap.etl.api.batch.BatchJoiner;
// Data Types
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
// Plugin Configuration
import io.cdap.cdap.etl.api.PipelineConfigurable;
import io.cdap.cdap.etl.api.StageConfigurer;
import io.cdap.cdap.etl.api.StageLifecycle;
// Actions and Conditions
import io.cdap.cdap.etl.api.action.Action;
import io.cdap.cdap.etl.api.condition.Condition;
// Error Handling and Validation
import io.cdap.cdap.etl.api.ErrorEmitter;
import io.cdap.cdap.etl.api.InvalidEntry;
import io.cdap.cdap.etl.api.validation.FailureCollector;
```

A complete transform plugin ties these pieces together:

```java
@Plugin(type = Transform.PLUGIN_TYPE)
@Name("MyTransform")
public class MyTransform extends Transform<StructuredRecord, StructuredRecord> {
  // Config is a PluginConfig subclass; the framework injects it through the constructor
  private final Config config;

  public MyTransform(Config config) {
    this.config = config;
  }

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
    Schema inputSchema = stageConfigurer.getInputSchema();
    if (inputSchema != null) {
      // buildOutputSchema is a private helper that derives the output schema
      Schema outputSchema = buildOutputSchema(inputSchema);
      stageConfigurer.setOutputSchema(outputSchema);
    }
  }

  @Override
  public void initialize(TransformContext context) throws Exception {
    // Call super.initialize so getContext() is available in transform()
    super.initialize(context);
  }

  @Override
  public void transform(StructuredRecord input, Emitter<StructuredRecord> emitter) throws Exception {
    StructuredRecord.Builder builder = StructuredRecord.builder(getContext().getOutputSchema());
    // Copy each input field to the output record
    for (Schema.Field field : input.getSchema().getFields()) {
      builder.set(field.getName(), input.get(field.getName()));
    }
    emitter.emit(builder.build());
  }
}
```

Any stage can also implement PipelineConfigurable directly to take part in pipeline configuration:

```java
public class MyPipeline implements PipelineConfigurable {
  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();

    // Inspect the input schema (may be null when it is not known at configure time)
    Schema inputSchema = stageConfigurer.getInputSchema();

    // Declare the schema this stage produces for downstream stages
    stageConfigurer.setOutputSchema(outputSchema);

    // Collect configuration problems for structured reporting
    FailureCollector collector = stageConfigurer.getFailureCollector();
    validateConfig(collector);
  }
}
```

The CDAP ETL API is built around several core concepts:
Data flows through pipeline stages using the Emitter Pattern:
- `Emitter<T>` - emit regular data records
- `ErrorEmitter<T>` - emit error records for error handling
- `AlertEmitter` - emit alerts and notifications
- `MultiOutputEmitter<T>` - emit to multiple output ports
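For example, a transform can route bad records to the error port instead of failing the run. This is a sketch; the "amount" field name and the error code are illustrative assumptions:

```java
@Override
public void transform(StructuredRecord input, Emitter<StructuredRecord> emitter) {
  // Hypothetical "amount" field used for a simple validity check
  Double amount = input.get("amount");
  if (amount == null || amount < 0) {
    // Route the bad record to the error port with an error code and message
    emitter.emitError(new InvalidEntry<>(31, "amount must be non-negative", input));
  } else {
    emitter.emit(input);
  }
}
```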
Build ETL pipeline components with comprehensive lifecycle management, configuration validation, and multi-engine support.

```java
// Transform interface
public void transform(IN input, Emitter<OUT> emitter) throws Exception

// Aggregator interface
public void aggregate(GROUP_KEY groupKey, Iterator<GROUP_VALUE> groupValues,
                      Emitter<OUT> emitter) throws Exception

// Joiner interface
public OUT merge(JOIN_KEY joinKey, Iterable<JoinElement<INPUT_RECORD>> joinResult)
    throws Exception
```
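To illustrate the aggregator contract, here is a sketch of a BatchAggregator that counts records per group; the "category" field and the output schema are assumptions:

```java
public class CountByCategory extends BatchAggregator<String, StructuredRecord, StructuredRecord> {
  private static final Schema OUTPUT_SCHEMA = Schema.recordOf("count",
      Schema.Field.of("category", Schema.of(Schema.Type.STRING)),
      Schema.Field.of("count", Schema.of(Schema.Type.LONG)));

  @Override
  public void groupBy(StructuredRecord input, Emitter<String> groupKeyEmitter) {
    // Group each record by its (assumed) "category" field
    groupKeyEmitter.emit(input.get("category"));
  }

  @Override
  public void aggregate(String groupKey, Iterator<StructuredRecord> groupValues,
                        Emitter<StructuredRecord> emitter) {
    long count = 0;
    while (groupValues.hasNext()) {
      groupValues.next();
      count++;
    }
    emitter.emit(StructuredRecord.builder(OUTPUT_SCHEMA)
        .set("category", groupKey)
        .set("count", count)
        .build());
  }
}
```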
Comprehensive support for batch data processing with sources, sinks, and specialized batch operations.

```java
// Batch source
public abstract class BatchSource<KEY_IN, VAL_IN, OUT>
    extends BatchConfigurable<BatchSourceContext>

// Batch sink
public abstract class BatchSink<IN, KEY_OUT, VAL_OUT>
    extends BatchConfigurable<BatchSinkContext>
```
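A minimal batch sink maps each record to the key/value pair its output format expects. This is a sketch; the class name, the "id" field, and the output registration are illustrative:

```java
public class KeyValueSink extends BatchSink<StructuredRecord, String, String> {
  @Override
  public void prepareRun(BatchSinkContext context) {
    // Register the run's output here, e.g. via context.addOutput(...)
  }

  @Override
  public void transform(StructuredRecord input, Emitter<KeyValue<String, String>> emitter) throws Exception {
    // Key each record by an assumed "id" field; the value is the record's string form
    emitter.emit(new KeyValue<>((String) input.get("id"), input.toString()));
  }
}
```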
Framework for building data connectors with browse, sample, and specification generation capabilities.

```java
// Connector interface
BrowseDetail browse(ConnectorContext context, BrowseRequest request)
SampleDetail sample(ConnectorContext context, SampleRequest request)
ConnectorSpec generateSpec(ConnectorContext context, ConnectorSpecRequest request)
```
Advanced join operations with automatic join optimization and comprehensive join definitions.

```java
// AutoJoiner interface
JoinDefinition define(AutoJoinerContext context)

// Join definition builder
JoinDefinition.builder()
    .select(fields)
    .from(stages)
    .on(joinConditions)
    .build()
```
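As a sketch, define() for an inner join of two assumed input stages, "users" and "purchases", keyed on a shared user_id field (the stage, field, and key names are all illustrative):

```java
@Override
public JoinDefinition define(AutoJoinerContext context) {
  // Look up the (assumed) input stages registered for this joiner
  JoinStage users = context.getInputStages().get("users");
  JoinStage purchases = context.getInputStages().get("purchases");
  return JoinDefinition.builder()
      .select(new JoinField("users", "name"), new JoinField("purchases", "price"))
      .from(users, purchases)
      .on(JoinCondition.onKeys()
          .addKey(new JoinKey("users", Arrays.asList("user_id")))
          .addKey(new JoinKey("purchases", Arrays.asList("user_id")))
          .build())
      .build();
}
```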
Robust validation system for early error detection and structured error reporting.

```java
// Validation failure collection; the corrective action is the second argument to addFailure
FailureCollector collector = context.getFailureCollector();
ValidationFailure failure = collector.addFailure("Invalid configuration", "Set the property to a valid value")
    .withConfigProperty("propertyName");

// Throw a ValidationException carrying all collected failures
collector.getOrThrowException();
```
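In practice the collector is usually threaded through a private validation method, as in this sketch (the "batchSize" property is an assumption):

```java
private void validateConfig(FailureCollector collector, int batchSize) {
  if (batchSize <= 0) {
    collector.addFailure("batchSize must be positive", "Set batchSize to a value greater than 0")
        .withConfigProperty("batchSize");
  }
  // Throws ValidationException if any failures were collected
  collector.getOrThrowException();
}
```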
SQL engine integration for advanced query processing and optimization.

```java
// SQL Engine interface
boolean canTransform(SQLTransformDefinition transformDefinition)
SQLDataset transform(SQLTransformRequest transformRequest)
SQLDataset join(SQLJoinRequest joinRequest)
```

Track data lineage and field-level transformations for governance and debugging.
```java
// Field lineage recording: StageContext itself implements LineageRecorder,
// so operations are recorded directly on the context. Read operations also
// name their source EndPoint (io.cdap.cdap.api.lineage.field.EndPoint).
context.record(Arrays.asList(
    new FieldReadOperation("read", "Read source field",
        EndPoint.of("default", "source"), Collections.singletonList("input.field")),
    new FieldTransformOperation("transform", "Transform field",
        Arrays.asList("input.field"), Arrays.asList("output.field"))
));
```

Pipeline actions and conditional execution for workflow control and external integrations.
```java
// Action implementation
public abstract class Action implements PipelineConfigurable,
    SubmitterLifecycle<ActionContext>, StageLifecycle<ActionContext>

// Condition evaluation
public abstract boolean apply(ConditionContext context) throws Exception
```
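A concrete action plugin only needs to implement run(); a sketch with an illustrative name and body:

```java
@Plugin(type = Action.PLUGIN_TYPE)
@Name("CleanupAction")
public class CleanupAction extends Action {
  @Override
  public void run(ActionContext context) throws Exception {
    // Side-effect work that runs before or after the data stages,
    // e.g. moving files or calling an external service
  }
}
```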
The API supports these plugin types for the CDAP plugin system:

| Plugin Type | Constant | Description |
|---|---|---|
| Transform | "transform" | Data transformation stages |
| Splitter Transform | "splittertransform" | Multi-output transformation stages |
| Error Transform | "errortransform" | Error record transformation stages |
| Batch Source | "batchsource" | Batch data sources |
| Batch Sink | "batchsink" | Batch data sinks |
| Batch Aggregator | "batchaggregator" | Batch aggregation operations |
| Batch Joiner | "batchjoiner" | Batch join operations |
| Action | "action" | Pipeline actions |
| Post Action | "postaction" | Post-run pipeline actions |
| Condition | "condition" | Conditional execution |
| Alert Publisher | "alertpublisher" | Alert publishing |
| Connector | "connector" | Data connectors |
| SQL Engine | "sqlengine" | SQL query processing engines |
Core interface definitions for reference:

```java
// Core transformation interface
interface Transformation<IN, OUT> {
  void transform(IN input, Emitter<OUT> emitter) throws Exception;
}

// Data emission interface
interface Emitter<T> extends AlertEmitter, ErrorEmitter<T> {
  void emit(T value);
}

// Pipeline configuration interface
interface PipelineConfigurable {
  void configurePipeline(PipelineConfigurer pipelineConfigurer);
}

// Pipeline configurer interface
interface PipelineConfigurer extends PluginConfigurer, DatasetConfigurer, FeatureFlagsProvider {
  StageConfigurer getStageConfigurer();
}

// Stage configurer interface
interface StageConfigurer {
  @Nullable Schema getInputSchema();
  String getStageName();
  void setOutputSchema(@Nullable Schema outputSchema);
  FailureCollector getFailureCollector();
}

// Stage lifecycle interface
interface StageLifecycle<T> extends Destroyable {
  void initialize(T context) throws Exception;
}

// Runtime context interface
interface StageContext extends ServiceDiscoverer, MetadataReader, MetadataWriter,
    LineageRecorder, FeatureFlagsProvider {
  String getStageName();
  StageMetrics getMetrics();
  PluginProperties getPluginProperties();
  PluginProperties getPluginProperties(String pluginId);
  <T> T newPluginInstance(String pluginId) throws InstantiationException;
  Schema getInputSchema();
  Map<String, Schema> getInputSchemas();
  Schema getOutputSchema();
  Map<String, Schema> getOutputSchemas();
  Arguments getArguments();
}
```