CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-io-cdap-cdap--cdap-etl-api

CDAP ETL API provides comprehensive abstractions for building Extract, Transform, and Load pipeline applications on the CDAP platform

Pending
Overview
Eval results
Files

docs/core-pipeline.md

Core Pipeline Components

Core ETL pipeline interfaces and classes for building transformation stages, handling data flow, and managing stage lifecycle in CDAP ETL pipelines.

Transform Operations

Transform<IN, OUT>

Base abstract class for transformation stages in ETL pipelines.

package io.cdap.cdap.etl.api;

public abstract class Transform<IN, OUT> 
    implements StageLifecycle<TransformContext>, 
               SubmitterLifecycle<StageSubmitterContext>,
               Transformation<IN, OUT>, PipelineConfigurable {
    
    public static final String PLUGIN_TYPE = "transform";
    
    // Lifecycle methods
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {}
    public void initialize(TransformContext context) throws Exception {}
    public void prepareRun(StageSubmitterContext context) throws Exception {}
    public void onRunFinish(boolean succeeded, StageSubmitterContext context) {}
    public void destroy() {}
    
    // Access to runtime context
    protected TransformContext getContext();
}

Usage Example:

@Plugin(type = Transform.PLUGIN_TYPE)
@Name("TextCleaner")
@Description("Cleans and normalizes text fields")
public class TextCleanerTransform extends Transform<StructuredRecord, StructuredRecord> {

    // Any run of whitespace; compiled once rather than on every replaceAll call.
    private static final java.util.regex.Pattern WHITESPACE_RUN =
        java.util.regex.Pattern.compile("\\s+");

    // Plugin configuration (type not shown here; presumably a PluginConfig subclass that the
    // framework passes to the constructor — confirm). Must be assigned because it is final.
    private final Config config;
    private Schema outputSchema;

    public TextCleanerTransform(Config config) {
        this.config = config;
    }

    /**
     * Validates the configuration and, when the input schema is known, declares the output
     * schema.
     */
    @Override
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
        Schema inputSchema = stageConfigurer.getInputSchema();

        // getInputSchema() may return null; only declare an output schema when one is known.
        if (inputSchema != null) {
            stageConfigurer.setOutputSchema(buildOutputSchema(inputSchema));
        }

        config.validate(stageConfigurer.getFailureCollector());
    }

    @Override
    public void initialize(TransformContext context) throws Exception {
        // Keep the base-class initialization so getContext() is usable in this stage.
        super.initialize(context);
        outputSchema = context.getOutputSchema();
    }

    /**
     * Copies every field of {@code input} to the output. String values (including nullable
     * strings) are trimmed and internal whitespace runs are collapsed to a single space.
     * Non-string and null values are copied unchanged.
     */
    @Override
    public void transform(StructuredRecord input, Emitter<StructuredRecord> emitter)
            throws Exception {
        StructuredRecord.Builder builder = StructuredRecord.builder(outputSchema);

        for (Schema.Field field : input.getSchema().getFields()) {
            String fieldName = field.getName();
            Object value = input.get(fieldName);
            if (value != null && isStringSchema(field.getSchema())) {
                // Clean text: trim the ends, then normalize internal whitespace.
                String cleanedValue = WHITESPACE_RUN.matcher(value.toString().trim()).replaceAll(" ");
                builder.set(fieldName, cleanedValue);
            } else {
                builder.set(fieldName, value);
            }
        }

        emitter.emit(builder.build());
    }

    // True for STRING schemas, including nullable strings (a union with NULL, whose top-level
    // type is UNION rather than STRING).
    private static boolean isStringSchema(Schema schema) {
        Schema nonNullable = schema.isNullable() ? schema.getNonNullable() : schema;
        return nonNullable.getType() == Schema.Type.STRING;
    }

    // Cleaning keeps every field with its original type, so the output schema equals the input.
    private static Schema buildOutputSchema(Schema inputSchema) {
        return inputSchema;
    }
}

Transformation<IN, OUT>

Core interface for data transformation operations.

package io.cdap.cdap.etl.api;

/**
 * Transforms records of type {@code IN} into zero or more records of type {@code OUT}.
 *
 * @param <IN> type of input record
 * @param <OUT> type of output record
 */
public interface Transformation<IN, OUT> {
    /**
     * Transform input record and emit zero or more output records.
     *
     * @param input input record to transform
     * @param emitter emitter for output records
     * @throws Exception if transformation fails
     */
    void transform(IN input, Emitter<OUT> emitter) throws Exception;
}

Multi-Output Transforms

SplitterTransform<IN, OUT>

Transform that splits data across multiple named outputs.

package io.cdap.cdap.etl.api;

public abstract class SplitterTransform<IN, OUT> 
    implements MultiOutputPipelineConfigurable,
               StageLifecycle<TransformContext>,
               SubmitterLifecycle<StageSubmitterContext>,
               MultiOutputTransformation<IN, OUT> {
    
    public static final String PLUGIN_TYPE = "splittertransform";
}

MultiOutputTransformation<IN, E>

Interface for transformations with multiple outputs.

package io.cdap.cdap.etl.api;

/**
 * Transformation that sends each output record to a named output port.
 *
 * @param <IN> type of input record
 * @param <E> type of output record
 */
public interface MultiOutputTransformation<IN, E> {
    /**
     * Transform input and emit to multiple named outputs.
     *
     * @param input input record to transform
     * @param emitter emitter whose {@code emit(port, value)} selects the output port
     * @throws Exception if transformation fails
     */
    void transform(IN input, MultiOutputEmitter<E> emitter) throws Exception;
}

Usage Example:

@Plugin(type = SplitterTransform.PLUGIN_TYPE)
@Name("DataSplitter")
@Description("Routes each record to an output port based on its category field")
public class DataSplitter extends SplitterTransform<StructuredRecord, StructuredRecord> {

    // Output port names. Presumably these must match the ports connected to this stage in
    // the pipeline — confirm against the pipeline definition.
    static final String VALID_PORT = "valid-data";
    static final String ERROR_PORT = "error-data";
    static final String UNKNOWN_PORT = "unknown-data";

    /**
     * Emits {@code input} to {@link #VALID_PORT} when its "category" field is "valid", to
     * {@link #ERROR_PORT} when it is "error", and to {@link #UNKNOWN_PORT} otherwise
     * (including when the field is null).
     */
    @Override
    public void transform(StructuredRecord input, MultiOutputEmitter<StructuredRecord> emitter) {
        String category = input.get("category");
        emitter.emit(portFor(category), input);
    }

    // Maps a category value to its output port. This is null-safe because equals() is invoked
    // on the literal.
    static String portFor(String category) {
        if ("valid".equals(category)) {
            return VALID_PORT;
        }
        if ("error".equals(category)) {
            return ERROR_PORT;
        }
        return UNKNOWN_PORT;
    }
}

Error Transforms

ErrorTransform<ERR_IN, OUT>

Transform for handling error records from other stages.

package io.cdap.cdap.etl.api;

public abstract class ErrorTransform<ERR_IN, OUT> 
    implements StageLifecycle<TransformContext>,
               SubmitterLifecycle<StageSubmitterContext>,
               PipelineConfigurable {
    
    public static final String PLUGIN_TYPE = "errortransform";
    
    public void initialize(TransformContext context) throws Exception {}
    
    /**
     * Transform error record.
     */
    public abstract void transform(ErrorRecord<ERR_IN> input, Emitter<OUT> emitter) 
        throws Exception;
}

Data Emission

Emitter<T>

Primary interface for emitting data to the next stage.

package io.cdap.cdap.etl.api;

/**
 * Emits records to the next stage. Also inherits {@code emitAlert} from {@link AlertEmitter}
 * and {@code emitError} from {@link ErrorEmitter}.
 *
 * @param <T> type of emitted record
 */
public interface Emitter<T> extends AlertEmitter, ErrorEmitter<T> {
    /**
     * Emit a record to the next stage.
     *
     * @param value the record to emit
     */
    void emit(T value);
}

ErrorEmitter<T>

Interface for emitting error records.

package io.cdap.cdap.etl.api;

/**
 * Emits records that failed processing, each wrapped in an {@link InvalidEntry}.
 *
 * @param <T> type of the invalid record
 */
public interface ErrorEmitter<T> {
    /**
     * Emit an error record.
     *
     * @param invalidEntry the invalid record together with an error code and error message
     */
    void emitError(InvalidEntry<T> invalidEntry);
}

AlertEmitter

Interface for emitting alerts.

package io.cdap.cdap.etl.api;

/**
 * Emits alerts, each described by a map of string keys to string values.
 */
public interface AlertEmitter {
    /**
     * Emit an alert with payload.
     *
     * @param payload alert details as key/value pairs (the expected keys are not defined by
     *     this interface — presumably plugin-specific; confirm)
     */
    void emitAlert(Map<String, String> payload);
}

MultiOutputEmitter<E>

Emitter for multi-output transformations.

package io.cdap.cdap.etl.api;

/**
 * Emitter for multi-output transformations: each record is sent to a named output port.
 * Alerts can also be emitted. Errors are emitted untyped, as {@code InvalidEntry<Object>}.
 *
 * @param <E> type of records emitted to the output ports
 */
public interface MultiOutputEmitter<E> extends AlertEmitter, ErrorEmitter<Object> {
    /**
     * Emit to a specific output port.
     *
     * @param port name of the output port
     * @param value the record to emit
     */
    void emit(String port, E value);
}

Aggregation Operations

Aggregator<GROUP_KEY, GROUP_VALUE, OUT>

Interface for aggregation operations.

package io.cdap.cdap.etl.api;

/**
 * Two-step aggregation. {@link #groupBy} assigns group key(s) to each value, then
 * {@link #aggregate} receives the values for a group key and emits output records.
 *
 * @param <GROUP_KEY> type of the group key
 * @param <GROUP_VALUE> type of the values being grouped
 * @param <OUT> type of aggregated output record
 */
public interface Aggregator<GROUP_KEY, GROUP_VALUE, OUT> {
    /**
     * Emit group key for the group value.
     *
     * @param groupValue the value to assign to one or more groups
     * @param emitter receives the group key(s) for {@code groupValue}
     * @throws Exception if grouping fails
     */
    void groupBy(GROUP_VALUE groupValue, Emitter<GROUP_KEY> emitter) throws Exception;
    
    /**
     * Aggregate all values for a group key into output records.
     *
     * @param groupKey the group key
     * @param groupValues iterator over the values emitted under {@code groupKey}
     * @param emitter receives the aggregated output record(s)
     * @throws Exception if aggregation fails
     */
    void aggregate(GROUP_KEY groupKey, Iterator<GROUP_VALUE> groupValues, 
                  Emitter<OUT> emitter) throws Exception;
}

Usage Example:

public class SumAggregator implements Aggregator<String, StructuredRecord, StructuredRecord> {
    
    @Override
    public void groupBy(StructuredRecord groupValue, Emitter<String> emitter) throws Exception {
        String groupKey = groupValue.get("department");
        emitter.emit(groupKey);
    }
    
    @Override
    public void aggregate(String groupKey, Iterator<StructuredRecord> groupValues, 
                         Emitter<StructuredRecord> emitter) throws Exception {
        double sum = 0.0;
        int count = 0;
        
        while (groupValues.hasNext()) {
            StructuredRecord record = groupValues.next();
            Double salary = record.get("salary");
            if (salary != null) {
                sum += salary;
                count++;
            }
        }
        
        StructuredRecord result = StructuredRecord.builder(outputSchema)
            .set("department", groupKey)
            .set("total_salary", sum)
            .set("employee_count", count)
            .set("average_salary", count > 0 ? sum / count : 0.0)
            .build();
            
        emitter.emit(result);
    }
}

ReducibleAggregator<GROUP_KEY, GROUP_VALUE, AGGREGATE_VALUE, OUT>

Aggregator with reducible intermediate values for better performance.

package io.cdap.cdap.etl.api;

/**
 * Aggregator that produces intermediate {@code AGGREGATE_VALUE}s, which are then reduced into
 * final output records.
 *
 * <p>NOTE(review): the published CDAP {@code ReducibleAggregator} appears to be built around
 * {@code initializeAggregateValue}, {@code mergeValues}, {@code mergePartitions} and
 * {@code finalize} rather than the three methods listed here. Verify these signatures against
 * the CDAP ETL API before relying on them.
 *
 * @param <GROUP_KEY> type of the group key
 * @param <GROUP_VALUE> type of the values being grouped
 * @param <AGGREGATE_VALUE> type of the intermediate aggregate
 * @param <OUT> type of final output record
 */
public interface ReducibleAggregator<GROUP_KEY, GROUP_VALUE, AGGREGATE_VALUE, OUT> {
    /**
     * Emit group key for the group value.
     *
     * @param groupValue the value to assign to one or more groups
     * @param emitter receives the group key(s) for {@code groupValue}
     * @throws Exception if grouping fails
     */
    void groupBy(GROUP_VALUE groupValue, Emitter<GROUP_KEY> emitter) throws Exception;
    
    /**
     * Aggregate group values into intermediate aggregate values.
     *
     * @param groupKey the group key
     * @param groupValues iterator over the values for {@code groupKey}
     * @param emitter receives intermediate aggregate value(s)
     * @throws Exception if aggregation fails
     */
    void aggregate(GROUP_KEY groupKey, Iterator<GROUP_VALUE> groupValues, 
                  Emitter<AGGREGATE_VALUE> emitter) throws Exception;
    
    /**
     * Reduce intermediate aggregate values into final output.
     *
     * @param groupKey the group key
     * @param aggregateValues iterator over the intermediate aggregates for {@code groupKey}
     * @param emitter receives the final output record(s)
     * @throws Exception if reduction fails
     */
    void reduce(GROUP_KEY groupKey, Iterator<AGGREGATE_VALUE> aggregateValues, 
               Emitter<OUT> emitter) throws Exception;
}

Join Operations

Joiner<JOIN_KEY, INPUT_RECORD, OUT>

Interface for join operations between multiple inputs.

package io.cdap.cdap.etl.api;

/**
 * Joins records from multiple input stages.
 *
 * <p>Implementations must override either {@link #getJoinKeys} or the deprecated
 * {@link #joinOn}. The default {@code getJoinKeys} delegates to {@code joinOn}, and the default
 * {@code joinOn} throws {@link UnsupportedOperationException}.
 *
 * @param <JOIN_KEY> type of the join key
 * @param <INPUT_RECORD> type of records from the input stages
 * @param <OUT> type of the merged output record
 */
public interface Joiner<JOIN_KEY, INPUT_RECORD, OUT> {
    /**
     * Get join keys for input record (deprecated - use getJoinKeys).
     *
     * @deprecated implement {@link #getJoinKeys} instead, which can return several keys per
     *     record
     */
    @Deprecated
    default JOIN_KEY joinOn(String stageName, INPUT_RECORD inputRecord) throws Exception {
        throw new UnsupportedOperationException("joinOn method is deprecated");
    }
    
    /**
     * Get collection of join keys for input record.
     *
     * <p>The default implementation wraps the single key returned by {@link #joinOn}. It
     * returns an empty set when that key is {@code null}.
     *
     * @param stageName name of the stage the record came from
     * @param inputRecord the record to compute join keys for
     * @return the record's join keys (never {@code null} from the default implementation)
     * @throws Exception if key extraction fails
     */
    default Collection<JOIN_KEY> getJoinKeys(String stageName, INPUT_RECORD inputRecord) 
        throws Exception {
        JOIN_KEY key = joinOn(stageName, inputRecord);
        return key == null ? Collections.emptySet() : Collections.singleton(key);
    }
    
    /**
     * Get join configuration.
     *
     * @return the join configuration, including the required input stages
     * @throws Exception if the configuration cannot be produced
     */
    JoinConfig getJoinConfig() throws Exception;
    
    /**
     * Merge records from join result.
     *
     * @param joinKey the join key
     * @param joinResult the joined records for {@code joinKey}, each tagged with its source
     *     stage name
     * @return the merged output record
     * @throws Exception if merging fails
     */
    OUT merge(JOIN_KEY joinKey, Iterable<JoinElement<INPUT_RECORD>> joinResult) 
        throws Exception;
}

JoinConfig

Configuration for join operations.

package io.cdap.cdap.etl.api;

/**
 * Configuration for a join: the names of the input stages that are required.
 */
public class JoinConfig {
    private final Iterable<String> requiredInputs;

    /**
     * Create join config with required input stages.
     *
     * @param requiredInputs names of the input stages required for the join
     */
    public JoinConfig(Iterable<String> requiredInputs) {
        this.requiredInputs = requiredInputs;
    }

    /**
     * Get required input stages for the join.
     *
     * @return the names passed to the constructor
     */
    public Iterable<String> getRequiredInputs() {
        return requiredInputs;
    }
}

JoinElement<INPUT_RECORD>

An element of a join result, holding the name of the stage it came from and its input record.

package io.cdap.cdap.etl.api;

/**
 * One record in a join result, tagged with the name of the stage it came from.
 *
 * @param <INPUT_RECORD> type of the input record
 */
public class JoinElement<INPUT_RECORD> {
    private final String stageName;
    private final INPUT_RECORD inputRecord;

    /**
     * @param stageName name of the stage that produced {@code inputRecord}
     * @param inputRecord the joined record
     */
    public JoinElement(String stageName, INPUT_RECORD inputRecord) {
        this.stageName = stageName;
        this.inputRecord = inputRecord;
    }

    /** Returns the name of the stage that produced the record. */
    public String getStageName() {
        return stageName;
    }

    /** Returns the joined record. */
    public INPUT_RECORD getInputRecord() {
        return inputRecord;
    }
}

Lifecycle Management

StageLifecycle<T>

Interface for stage initialization and cleanup.

package io.cdap.cdap.etl.api;

/**
 * Runtime lifecycle of a stage: {@link #initialize} with a context, then
 * {@code destroy()} (inherited from {@link Destroyable}) for cleanup.
 *
 * @param <T> type of the runtime context
 */
public interface StageLifecycle<T> extends Destroyable {
    /**
     * Initialize stage with runtime context.
     *
     * @param context the runtime context for this stage
     * @throws Exception if initialization fails
     */
    void initialize(T context) throws Exception;
}

SubmitterLifecycle<T>

Interface for submission lifecycle management.

package io.cdap.cdap.etl.api;

/**
 * Submission-side lifecycle: hooks that run before a pipeline run and after it finishes.
 *
 * @param <T> type of the submitter context
 */
public interface SubmitterLifecycle<T> {
    /**
     * Prepare for pipeline run.
     *
     * @param context the submitter context
     * @throws Exception if preparation fails
     */
    void prepareRun(T context) throws Exception;
    
    /**
     * Handle run completion.
     *
     * @param succeeded whether the run succeeded
     * @param context the submitter context
     */
    void onRunFinish(boolean succeeded, T context);
}

Destroyable

Interface for resource cleanup.

package io.cdap.cdap.etl.api;

/**
 * Something that holds resources which must be released when it is no longer needed.
 */
public interface Destroyable {
    /**
     * Cleanup resources.
     */
    void destroy();
}

Context Interfaces

StageContext

Base runtime context for pipeline stages.

package io.cdap.cdap.etl.api;

/**
 * Base runtime context for pipeline stages. It aggregates the capabilities of its
 * supertypes, which are defined elsewhere in the CDAP API and not shown here.
 */
public interface StageContext extends RuntimeContext, PluginContext, 
                                    ServiceDiscoverer, FeatureFlagsProvider,
                                    ConnectionConfigurable {
    // Provides access to (via the supertypes above):
    // - Runtime arguments and metrics
    // - Plugin instantiation
    // - Service discovery
    // - Feature flags
    // - Connection configuration
}

TransformContext

Context for transform stages with lookup capabilities.

package io.cdap.cdap.etl.api;

/**
 * Runtime context for transform stages: a {@link StageContext} that is also a
 * {@code LookupProvider}.
 */
public interface TransformContext extends StageContext, LookupProvider {
    // Inherits StageContext capabilities
    // Adds lookup provider for data lookups
}

StageSubmitterContext

Context for stage submission operations.

package io.cdap.cdap.etl.api;

/**
 * Context passed to submission-time hooks such as {@code prepareRun} and {@code onRunFinish}.
 */
public interface StageSubmitterContext {
    /**
     * Get runtime arguments.
     *
     * @return the runtime arguments for this run
     */
    Arguments getArguments();
    
    /**
     * Get stage metrics.
     *
     * @return the metrics for this stage
     */
    StageMetrics getMetrics();
}

Configuration

PipelineConfigurable

Interface for pipeline configuration.

package io.cdap.cdap.etl.api;

/**
 * A single-input stage that takes part in pipeline configuration.
 */
public interface PipelineConfigurable {
    /**
     * Configure the pipeline stage.
     *
     * @param pipelineConfigurer configurer giving access to the stage's configuration
     *     (e.g. input/output schemas)
     */
    void configurePipeline(PipelineConfigurer pipelineConfigurer);
}

MultiInputPipelineConfigurable

Configuration interface for stages with multiple inputs.

package io.cdap.cdap.etl.api;

/**
 * A stage with multiple inputs that takes part in pipeline configuration.
 */
public interface MultiInputPipelineConfigurable {
    /**
     * Configure multi-input pipeline stage.
     *
     * @param multiInputPipelineConfigurer configurer for a stage with multiple inputs
     */
    void configurePipeline(MultiInputPipelineConfigurer multiInputPipelineConfigurer);
}

MultiOutputPipelineConfigurable

Configuration interface for stages with multiple outputs.

package io.cdap.cdap.etl.api;

/**
 * A stage with multiple named outputs that takes part in pipeline configuration.
 */
public interface MultiOutputPipelineConfigurable {
    /**
     * Configure multi-output pipeline stage.
     *
     * @param multiOutputPipelineConfigurer configurer for a stage with multiple outputs
     */
    void configurePipeline(MultiOutputPipelineConfigurer multiOutputPipelineConfigurer);
}

Error Handling

ErrorRecord<T>

Represents an error record from pipeline execution.

package io.cdap.cdap.etl.api;

/**
 * An error record produced during pipeline execution: the failed record plus its error message.
 *
 * <p>NOTE(review): method bodies are elided in this listing. The non-void getters have empty
 * bodies and will not compile as shown. This listing also does not specify what
 * {@code getStage()} returns for a record built with the two-argument constructor; confirm
 * against the CDAP ETL API.
 *
 * @param <T> type of the failed record
 */
public class ErrorRecord<T> {
    public ErrorRecord(T record, String errorMessage, int stage) {}
    public ErrorRecord(T record, String errorMessage) {}
    
    /** Returns the record that caused the error. */
    public T getRecord() {}
    /** Returns the message describing the error. */
    public String getErrorMessage() {}
    /** Returns the stage value supplied to the three-argument constructor. */
    public int getStage() {}
}

InvalidEntry<T>

Represents an invalid entry with error details.

package io.cdap.cdap.etl.api;

/**
 * An invalid record together with an error code and error message, as passed to
 * {@code ErrorEmitter.emitError}.
 *
 * @param <T> type of the invalid record
 */
public class InvalidEntry<T> {
    private final int errorCode;
    private final String errorMsg;
    private final T invalidRecord;

    /**
     * @param errorCode numeric code identifying the error
     * @param errorMsg human-readable description of the error
     * @param invalidRecord the record that failed
     */
    public InvalidEntry(int errorCode, String errorMsg, T invalidRecord) {
        this.errorCode = errorCode;
        this.errorMsg = errorMsg;
        this.invalidRecord = invalidRecord;
    }

    /** Returns the error code passed to the constructor. */
    public int getErrorCode() {
        return errorCode;
    }

    /** Returns the error message passed to the constructor. */
    public String getErrorMsg() {
        return errorMsg;
    }

    /** Returns the record that failed. */
    public T getInvalidRecord() {
        return invalidRecord;
    }
}

Specialized Transform Types

SerializableTransform<IN, OUT>

Serializable transformation interface.

package io.cdap.cdap.etl.api;

/**
 * A {@link Transformation} that is also {@code Serializable}. Presumably this lets it be
 * shipped to remote executors — confirm.
 *
 * @param <IN> type of input record
 * @param <OUT> type of output record
 */
public interface SerializableTransform<IN, OUT> 
    extends Transformation<IN, OUT>, Serializable {
    // Combines transformation with Java serialization
}

ToKeyValueTransform<IN, KEY, VAL>

Transform input to key-value pairs.

package io.cdap.cdap.etl.api;

/**
 * Transforms input records into key/value pairs ({@code KeyValue}; type not shown here).
 *
 * @param <IN> type of input record
 * @param <KEY> type of the emitted key
 * @param <VAL> type of the emitted value
 */
public interface ToKeyValueTransform<IN, KEY, VAL> {
    /**
     * Transform input into key-value pairs.
     *
     * @param input input record
     * @param emitter receives zero or more key/value pairs
     * @throws Exception if the transformation fails
     */
    void transform(IN input, Emitter<KeyValue<KEY, VAL>> emitter) throws Exception;
}

FromKeyValueTransform<KEY, VAL, OUT>

Transform from key-value pairs to output records.

package io.cdap.cdap.etl.api;

/**
 * Transforms key/value pairs ({@code KeyValue}; type not shown here) into output records.
 *
 * @param <KEY> type of the input key
 * @param <VAL> type of the input value
 * @param <OUT> type of output record
 */
public interface FromKeyValueTransform<KEY, VAL, OUT> {
    /**
     * Transform key-value pairs into output records.
     *
     * @param input the key/value pair to transform
     * @param emitter receives zero or more output records
     * @throws Exception if the transformation fails
     */
    void transform(KeyValue<KEY, VAL> input, Emitter<OUT> emitter) throws Exception;
}

Install with Tessl CLI

npx tessl i tessl/maven-io-cdap-cdap--cdap-etl-api

docs

actions-conditions.md

batch-processing.md

core-pipeline.md

data-connectors.md

index.md

join-operations.md

lineage-metadata.md

sql-engine.md

validation.md

tile.json