CDAP ETL API provides comprehensive abstractions for building Extract, Transform, and Load pipeline applications on the CDAP platform.
—
Core ETL pipeline interfaces and classes for building transformation stages, handling data flow, and managing stage lifecycle in CDAP ETL pipelines.
Base abstract class for transformation stages in ETL pipelines.
package io.cdap.cdap.etl.api;
/**
 * Base abstract class for single-input, single-output transform stages.
 * All lifecycle methods default to no-ops; subclasses override only what they need.
 */
public abstract class Transform<IN, OUT>
implements StageLifecycle<TransformContext>,
SubmitterLifecycle<StageSubmitterContext>,
Transformation<IN, OUT>, PipelineConfigurable {
// Plugin type string used when registering transform plugins.
public static final String PLUGIN_TYPE = "transform";
// Lifecycle methods
// No-op defaults; override to validate configuration, set up runtime state,
// prepare the run, or react to run completion.
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {}
public void initialize(TransformContext context) throws Exception {}
public void prepareRun(StageSubmitterContext context) throws Exception {}
public void onRunFinish(boolean succeeded, StageSubmitterContext context) {}
public void destroy() {}
// Access to runtime context
// NOTE(review): declared without a body in this stub; presumably returns the
// context passed to initialize() — confirm against the actual API.
protected TransformContext getContext();
}

Usage Example:
/**
 * Example transform plugin that trims and collapses whitespace in every
 * string field of the incoming record; non-string fields pass through unchanged.
 */
@Plugin(type = Transform.PLUGIN_TYPE)
@Name("TextCleaner")
@Description("Cleans and normalizes text fields")
public class TextCleanerTransform extends Transform<StructuredRecord, StructuredRecord> {
  private final Config config;
  private Schema outputSchema;

  // Fix: the original example declared a final 'config' field but never
  // initialized it, which does not compile. Plugin configuration is injected
  // through the constructor, following the standard CDAP plugin pattern.
  public TextCleanerTransform(Config config) {
    this.config = config;
  }

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
    Schema inputSchema = stageConfigurer.getInputSchema();
    // Input schema may be unknown (null) at configure time; only derive and
    // publish the output schema when it is available.
    if (inputSchema != null) {
      // NOTE(review): buildOutputSchema is assumed to be defined elsewhere in
      // the full example — it is not shown in this snippet.
      outputSchema = buildOutputSchema(inputSchema);
      stageConfigurer.setOutputSchema(outputSchema);
    }
    config.validate(stageConfigurer.getFailureCollector());
  }

  @Override
  public void initialize(TransformContext context) throws Exception {
    outputSchema = context.getOutputSchema();
  }

  @Override
  public void transform(StructuredRecord input, Emitter<StructuredRecord> emitter)
      throws Exception {
    StructuredRecord.Builder builder = StructuredRecord.builder(outputSchema);
    for (Schema.Field field : input.getSchema().getFields()) {
      Object value = input.get(field.getName());
      if (field.getSchema().getType() == Schema.Type.STRING && value != null) {
        // Clean text: trim, normalize whitespace, remove special chars
        String cleanedValue = value.toString().trim().replaceAll("\\s+", " ");
        builder.set(field.getName(), cleanedValue);
      } else {
        builder.set(field.getName(), value);
      }
    }
    emitter.emit(builder.build());
  }
}

Core interface for data transformation operations.
package io.cdap.cdap.etl.api;
/**
 * Core functional contract for a transformation: consume one input record and
 * emit zero or more output records through the supplied emitter.
 */
public interface Transformation<IN, OUT> {
/**
 * Transform input record and emit zero or more output records.
 *
 * @param input input record to transform
 * @param emitter emitter for output records
 * @throws Exception if transformation fails
 */
void transform(IN input, Emitter<OUT> emitter) throws Exception;
}

Transform that splits data to multiple named outputs.
package io.cdap.cdap.etl.api;
/**
 * Base class for transform stages that route records to multiple named output
 * ports instead of a single downstream stage.
 */
public abstract class SplitterTransform<IN, OUT>
implements MultiOutputPipelineConfigurable,
StageLifecycle<TransformContext>,
SubmitterLifecycle<StageSubmitterContext>,
MultiOutputTransformation<IN, OUT> {
// Plugin type string used when registering splitter transform plugins.
public static final String PLUGIN_TYPE = "splittertransform";
}

Interface for transformations with multiple outputs.
package io.cdap.cdap.etl.api;
/**
 * Transformation contract for stages that write to multiple named output
 * ports through a {@link MultiOutputEmitter}.
 */
public interface MultiOutputTransformation<IN, E> {
/**
 * Transform input and emit to multiple named outputs.
 */
void transform(IN input, MultiOutputEmitter<E> emitter) throws Exception;
}

Usage Example:
/**
 * Example splitter that routes each record to the "valid-data", "error-data",
 * or "unknown-data" port based on its "category" field.
 */
public class DataSplitter extends SplitterTransform<StructuredRecord, StructuredRecord> {
@Override
public void transform(StructuredRecord input, MultiOutputEmitter<StructuredRecord> emitter) {
String category = input.get("category");
// Constant-first equals is null-safe: a null category falls through to "unknown-data".
if ("valid".equals(category)) {
emitter.emit("valid-data", input);
} else if ("error".equals(category)) {
emitter.emit("error-data", input);
} else {
emitter.emit("unknown-data", input);
}
}
}

Transform for handling error records from other stages.
package io.cdap.cdap.etl.api;
/**
 * Base class for transforms that consume error records produced by other
 * stages and convert them into regular output records.
 */
public abstract class ErrorTransform<ERR_IN, OUT>
implements StageLifecycle<TransformContext>,
SubmitterLifecycle<StageSubmitterContext>,
PipelineConfigurable {
// Plugin type string used when registering error transform plugins.
public static final String PLUGIN_TYPE = "errortransform";
// No-op default; override to set up runtime state.
public void initialize(TransformContext context) throws Exception {}
/**
 * Transform error record.
 *
 * @param input the error record wrapping the failed payload and error details
 * @param emitter emitter for converted output records
 */
public abstract void transform(ErrorRecord<ERR_IN> input, Emitter<OUT> emitter)
throws Exception;
}

Primary interface for emitting data to the next stage.
package io.cdap.cdap.etl.api;
/**
 * Primary emitter handed to transform stages. Besides regular records it can
 * also emit errors and alerts via its super-interfaces.
 */
public interface Emitter<T> extends AlertEmitter, ErrorEmitter<T> {
/**
 * Emit a record to the next stage.
 */
void emit(T value);
}

Interface for emitting error records.
package io.cdap.cdap.etl.api;
/**
 * Emitter capability for routing invalid records to error handling.
 */
public interface ErrorEmitter<T> {
/**
 * Emit an error record.
 */
void emitError(InvalidEntry<T> invalidEntry);
}

Interface for emitting alerts.
package io.cdap.cdap.etl.api;
/**
 * Emitter capability for publishing alerts as string key/value payloads.
 */
public interface AlertEmitter {
/**
 * Emit an alert with payload.
 */
void emitAlert(Map<String, String> payload);
}

Emitter for multi-output transformations.
package io.cdap.cdap.etl.api;
/**
 * Emitter for multi-output stages: each record is emitted to a named port.
 * Error and alert emission are inherited from the super-interfaces.
 */
public interface MultiOutputEmitter<E> extends AlertEmitter, ErrorEmitter<Object> {
/**
 * Emit to a specific output port.
 */
void emit(String port, E value);
}

Interface for aggregation operations.
package io.cdap.cdap.etl.api;
/**
 * Two-phase group-and-aggregate contract: {@code groupBy} assigns each value
 * to group keys, then {@code aggregate} folds all values of a key into outputs.
 */
public interface Aggregator<GROUP_KEY, GROUP_VALUE, OUT> {
/**
 * Emit group key for the group value.
 */
void groupBy(GROUP_VALUE groupValue, Emitter<GROUP_KEY> emitter) throws Exception;
/**
 * Aggregate all values for a group key into output records.
 */
void aggregate(GROUP_KEY groupKey, Iterator<GROUP_VALUE> groupValues,
Emitter<OUT> emitter) throws Exception;
}

Usage Example:
/**
 * Example aggregator that sums salaries per department and emits one record
 * per group containing the total, the employee count, and the average.
 */
public class SumAggregator implements Aggregator<String, StructuredRecord, StructuredRecord> {
  // Fix: 'outputSchema' was referenced in aggregate() but never declared in
  // the original example. It must be assigned (e.g. during stage
  // initialization) before aggregate() runs.
  private Schema outputSchema;

  @Override
  public void groupBy(StructuredRecord groupValue, Emitter<String> emitter) throws Exception {
    // Group records by their department field.
    String groupKey = groupValue.get("department");
    emitter.emit(groupKey);
  }

  @Override
  public void aggregate(String groupKey, Iterator<StructuredRecord> groupValues,
      Emitter<StructuredRecord> emitter) throws Exception {
    double sum = 0.0;
    int count = 0;
    while (groupValues.hasNext()) {
      StructuredRecord record = groupValues.next();
      Double salary = record.get("salary");
      // Records with no salary are excluded from both sum and count.
      if (salary != null) {
        sum += salary;
        count++;
      }
    }
    StructuredRecord result = StructuredRecord.builder(outputSchema)
        .set("department", groupKey)
        .set("total_salary", sum)
        .set("employee_count", count)
        .set("average_salary", count > 0 ? sum / count : 0.0)  // guard division by zero
        .build();
    emitter.emit(result);
  }
}

Aggregator with reducible intermediate values for better performance.
package io.cdap.cdap.etl.api;
/**
 * Aggregator variant with an intermediate aggregate-value type, allowing
 * partial aggregates to be produced and then reduced into final outputs.
 */
public interface ReducibleAggregator<GROUP_KEY, GROUP_VALUE, AGGREGATE_VALUE, OUT> {
/**
 * Emit group key for the group value.
 */
void groupBy(GROUP_VALUE groupValue, Emitter<GROUP_KEY> emitter) throws Exception;
/**
 * Aggregate group values into intermediate aggregate values.
 */
void aggregate(GROUP_KEY groupKey, Iterator<GROUP_VALUE> groupValues,
Emitter<AGGREGATE_VALUE> emitter) throws Exception;
/**
 * Reduce intermediate aggregate values into final output.
 */
void reduce(GROUP_KEY groupKey, Iterator<AGGREGATE_VALUE> aggregateValues,
Emitter<OUT> emitter) throws Exception;
}

Interface for join operations between multiple inputs.
package io.cdap.cdap.etl.api;
/**
 * Contract for joining records from multiple input stages on extracted keys.
 */
public interface Joiner<JOIN_KEY, INPUT_RECORD, OUT> {
/**
 * Get join keys for input record (deprecated - use getJoinKeys).
 */
@Deprecated
default JOIN_KEY joinOn(String stageName, INPUT_RECORD inputRecord) throws Exception {
throw new UnsupportedOperationException("joinOn method is deprecated");
}
/**
 * Get collection of join keys for input record.
 *
 * The default implementation adapts the deprecated single-key joinOn();
 * a null key maps to an empty set (the record joins on nothing).
 */
default Collection<JOIN_KEY> getJoinKeys(String stageName, INPUT_RECORD inputRecord)
throws Exception {
JOIN_KEY key = joinOn(stageName, inputRecord);
return key == null ? Collections.emptySet() : Collections.singleton(key);
}
/**
 * Get join configuration.
 */
JoinConfig getJoinConfig() throws Exception;
/**
 * Merge records from join result.
 */
OUT merge(JOIN_KEY joinKey, Iterable<JoinElement<INPUT_RECORD>> joinResult)
throws Exception;
}

Configuration for join operations.
package io.cdap.cdap.etl.api;
/**
 * Configuration for a join, naming the input stages that are required
 * (i.e. inner-joined). Bodies are elided in this API stub.
 */
public class JoinConfig {
/**
 * Create join config with required input stages.
 */
public JoinConfig(Iterable<String> requiredInputs) {}
/**
 * Get required input stages for the join.
 */
public Iterable<String> getRequiredInputs() {}
}

Element in a join result, containing the stage name and input record.
package io.cdap.cdap.etl.api;
/**
 * One element of a join result: an input record paired with the name of the
 * stage it came from. Bodies are elided in this API stub.
 */
public class JoinElement<INPUT_RECORD> {
public JoinElement(String stageName, INPUT_RECORD inputRecord) {}
public String getStageName() {}
public INPUT_RECORD getInputRecord() {}
}

Interface for stage initialization and cleanup.
package io.cdap.cdap.etl.api;
/**
 * Stage lifecycle: initialization with a runtime context, plus cleanup
 * inherited from {@link Destroyable}.
 */
public interface StageLifecycle<T> extends Destroyable {
/**
 * Initialize stage with runtime context.
 */
void initialize(T context) throws Exception;
}

Interface for submission lifecycle management.
package io.cdap.cdap.etl.api;
/**
 * Submission-time lifecycle hooks: preparation before a run and a callback
 * after the run finishes.
 */
public interface SubmitterLifecycle<T> {
/**
 * Prepare for pipeline run.
 */
void prepareRun(T context) throws Exception;
/**
 * Handle run completion.
 */
void onRunFinish(boolean succeeded, T context);
}

Interface for resource cleanup.
package io.cdap.cdap.etl.api;
/**
 * Marker for stages that need to release resources when they are done.
 */
public interface Destroyable {
/**
 * Cleanup resources.
 */
void destroy();
}

Base runtime context for pipeline stages.
package io.cdap.cdap.etl.api;
/**
 * Base runtime context available to every pipeline stage, composed from the
 * platform capability interfaces listed below.
 */
public interface StageContext extends RuntimeContext, PluginContext,
ServiceDiscoverer, FeatureFlagsProvider,
ConnectionConfigurable {
// Provides access to:
// - Runtime arguments and metrics
// - Plugin instantiation
// - Service discovery
// - Feature flags
// - Connection configuration
}

Context for transform stages with lookup capabilities.
package io.cdap.cdap.etl.api;
/**
 * Runtime context for transform stages; extends the base stage context with
 * data-lookup support.
 */
public interface TransformContext extends StageContext, LookupProvider {
// Inherits StageContext capabilities
// Adds lookup provider for data lookups
}

Context for stage submission operations.
package io.cdap.cdap.etl.api;
/**
 * Context available during stage submission (prepareRun/onRunFinish),
 * exposing runtime arguments and stage metrics.
 */
public interface StageSubmitterContext {
/**
 * Get runtime arguments.
 */
Arguments getArguments();
/**
 * Get stage metrics.
 */
StageMetrics getMetrics();
}

Interface for pipeline configuration.
package io.cdap.cdap.etl.api;
/**
 * Implemented by stages that take part in pipeline configuration
 * (schema propagation, config validation, etc.).
 */
public interface PipelineConfigurable {
/**
 * Configure the pipeline stage.
 */
void configurePipeline(PipelineConfigurer pipelineConfigurer);
}

Configuration interface for stages with multiple inputs.
package io.cdap.cdap.etl.api;
/**
 * Configuration contract for stages that accept multiple input stages.
 */
public interface MultiInputPipelineConfigurable {
/**
 * Configure multi-input pipeline stage.
 */
void configurePipeline(MultiInputPipelineConfigurer multiInputPipelineConfigurer);
}

Configuration interface for stages with multiple outputs.
package io.cdap.cdap.etl.api;
/**
 * Configuration contract for stages that produce multiple named outputs.
 */
public interface MultiOutputPipelineConfigurable {
/**
 * Configure multi-output pipeline stage.
 */
void configurePipeline(MultiOutputPipelineConfigurer multiOutputPipelineConfigurer);
}

Represents an error record from pipeline execution.
package io.cdap.cdap.etl.api;
/**
 * Wraps a record that failed processing, together with the error message and
 * the stage where it failed. Bodies are elided in this API stub.
 */
public class ErrorRecord<T> {
public ErrorRecord(T record, String errorMessage, int stage) {}
public ErrorRecord(T record, String errorMessage) {}
public T getRecord() {}
public String getErrorMessage() {}
public int getStage() {}
}

Represents an invalid entry with error details.
package io.cdap.cdap.etl.api;
/**
 * An invalid record paired with an error code and message, as passed to
 * {@code ErrorEmitter.emitError}. Bodies are elided in this API stub.
 */
public class InvalidEntry<T> {
public InvalidEntry(int errorCode, String errorMsg, T invalidRecord) {}
public int getErrorCode() {}
public String getErrorMsg() {}
public T getInvalidRecord() {}
}

Serializable transformation interface.
package io.cdap.cdap.etl.api;
/**
 * A {@link Transformation} that is also {@code Serializable}.
 */
public interface SerializableTransform<IN, OUT>
extends Transformation<IN, OUT>, Serializable {
// Combines transformation with Java serialization
}

Transforms input records into key-value pairs.
package io.cdap.cdap.etl.api;
/**
 * Transformation from input records to {@code KeyValue} pairs.
 */
public interface ToKeyValueTransform<IN, KEY, VAL> {
/**
 * Transform input into key-value pairs.
 */
void transform(IN input, Emitter<KeyValue<KEY, VAL>> emitter) throws Exception;
}

Transform from key-value pairs to output records.
package io.cdap.cdap.etl.api;
/**
 * Transformation from {@code KeyValue} pairs to output records.
 */
public interface FromKeyValueTransform<KEY, VAL, OUT> {
/**
 * Transform key-value pairs into output records.
 */
void transform(KeyValue<KEY, VAL> input, Emitter<OUT> emitter) throws Exception;
}

Install with the Tessl CLI:
npx tessl i tessl/maven-io-cdap-cdap--cdap-etl-api