CDAP ETL API

A comprehensive Java API for building Extract, Transform, Load (ETL) pipelines on CDAP (the Cask Data Application Platform). It provides a rich set of interfaces and classes for developing data processing plugins, including sources, sinks, transforms, aggregators, joiners, and actions.

Package Information

Maven Dependency:

<dependency>
    <groupId>io.cdap.cdap</groupId>
    <artifactId>cdap-etl-api</artifactId>
    <version>6.11.0</version>
</dependency>

Java Package: io.cdap.cdap.etl.api
Java Version: 8+

Core Imports

// Core Pipeline Components
import io.cdap.cdap.etl.api.Transform;
import io.cdap.cdap.etl.api.Transformation;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.StageContext;
import io.cdap.cdap.etl.api.TransformContext;

// Batch Processing
import io.cdap.cdap.etl.api.batch.BatchSource;
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchRuntimeContext;
import io.cdap.cdap.etl.api.batch.BatchAggregator;
import io.cdap.cdap.etl.api.batch.BatchJoiner;

// Data Types
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;

// Plugin Configuration
import io.cdap.cdap.etl.api.PipelineConfigurable;
import io.cdap.cdap.etl.api.StageConfigurer;
import io.cdap.cdap.etl.api.StageLifecycle;

// Plugin Annotations (used by the examples below)
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.plugin.PluginConfig;

// Actions and Conditions
import io.cdap.cdap.etl.api.action.Action;
import io.cdap.cdap.etl.api.condition.Condition;

// Error Handling and Validation
import io.cdap.cdap.etl.api.ErrorEmitter;
import io.cdap.cdap.etl.api.InvalidEntry;
import io.cdap.cdap.etl.api.validation.FailureCollector;

Basic Usage

Simple Transform Plugin

@Plugin(type = Transform.PLUGIN_TYPE)
@Name("MyTransform")
public class MyTransform extends Transform<StructuredRecord, StructuredRecord> {

    private final Config config;

    // CDAP instantiates the plugin and injects the user-supplied configuration
    public MyTransform(Config config) {
        this.config = config;
    }

    @Override
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
        Schema inputSchema = stageConfigurer.getInputSchema();

        // The input schema may be null when it is not known until runtime
        if (inputSchema != null) {
            // buildOutputSchema is a private helper that derives the output schema
            Schema outputSchema = buildOutputSchema(inputSchema);
            stageConfigurer.setOutputSchema(outputSchema);
        }
    }

    @Override
    public void initialize(TransformContext context) throws Exception {
        // Stores the context so getContext() is available inside transform()
        super.initialize(context);
    }

    @Override
    public void transform(StructuredRecord input, Emitter<StructuredRecord> emitter) throws Exception {
        StructuredRecord.Builder builder = StructuredRecord.builder(getContext().getOutputSchema());

        // Copy each input field through unchanged; real transform logic goes here
        for (Schema.Field field : input.getSchema().getFields()) {
            builder.set(field.getName(), input.get(field.getName()));
        }

        emitter.emit(builder.build());
    }
}
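
The Config object injected above is typically a static inner class extending PluginConfig; CDAP populates its fields from the user-supplied plugin properties. A minimal sketch, assuming a purely illustrative fieldToCopy property:

public static class Config extends PluginConfig {

    @Description("Name of the input field to copy through unchanged")
    @Macro  // allows the value to be resolved from runtime arguments
    private String fieldToCopy;

    public String getFieldToCopy() {
        return fieldToCopy;
    }
}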

Basic Pipeline Configuration

public class MyPipeline implements PipelineConfigurable {

    @Override
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();

        // Inspect the upstream schema; may be null when it is not known
        // until runtime (for example, when set through macros)
        Schema inputSchema = stageConfigurer.getInputSchema();

        // Declare the schema this stage produces so downstream stages can validate
        stageConfigurer.setOutputSchema(inputSchema);

        // Collect configuration problems instead of failing on the first one;
        // validateConfig is a private helper that reports issues to the collector
        FailureCollector collector = stageConfigurer.getFailureCollector();
        validateConfig(collector);
        collector.getOrThrowException();
    }
}

Architecture

The CDAP ETL API is built around several core concepts:

Pipeline Stages

  • Sources - Read data from external systems
  • Transforms - Process and modify data records
  • Sinks - Write data to external systems
  • Aggregators - Perform grouping and aggregation operations
  • Joiners - Combine data from multiple inputs
  • Actions - Execute custom logic or external operations

Execution Engines

  • MapReduce - Traditional Hadoop MapReduce execution
  • Spark - Apache Spark execution for better performance
  • Native - CDAP native execution engine

Data Flow

Data flows through pipeline stages using the Emitter Pattern:

  • Emitter<T> - Emit regular data records
  • ErrorEmitter<T> - Emit error records for error handling
  • AlertEmitter - Emit alerts and notifications
  • MultiOutputEmitter<T> - Emit to multiple output ports
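
For example, a transform can route malformed records to the error port via the ErrorEmitter side of Emitter instead of failing the run. A minimal sketch, where the age field and error code 31 are illustrative assumptions:

@Override
public void transform(StructuredRecord input, Emitter<StructuredRecord> emitter) {
    Integer age = input.get("age");
    if (age == null || age < 0) {
        // Error records carry a code, a message, and the offending record
        emitter.emitError(new InvalidEntry<>(31, "age is missing or negative", input));
    } else {
        emitter.emit(input);
    }
}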

Capabilities

Core Pipeline Development

Build ETL pipeline components with comprehensive lifecycle management, configuration validation, and multi-engine support.

// Transform interface
public void transform(IN input, Emitter<OUT> emitter) throws Exception

// Aggregator interface  
public void aggregate(GROUP_KEY groupKey, Iterator<GROUP_VALUE> groupValues, 
                     Emitter<OUT> emitter) throws Exception

// Joiner interface
public OUT merge(JOIN_KEY joinKey, Iterable<JoinElement<INPUT_RECORD>> joinResult) 
                throws Exception
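
As a concrete illustration of the aggregator contract, here is a hedged sketch of a count-per-group aggregator; the category field and plugin name are assumptions for illustration, not part of the API:

@Plugin(type = BatchAggregator.PLUGIN_TYPE)
@Name("CategoryCount")
public class CategoryCount extends BatchAggregator<String, StructuredRecord, StructuredRecord> {

    private static final Schema OUTPUT_SCHEMA = Schema.recordOf("counts",
        Schema.Field.of("category", Schema.of(Schema.Type.STRING)),
        Schema.Field.of("count", Schema.of(Schema.Type.LONG)));

    @Override
    public void prepareRun(BatchAggregatorContext context) {
        // No physical resources to prepare for this aggregation
    }

    @Override
    public void groupBy(StructuredRecord input, Emitter<String> emitter) {
        // Assign each record to exactly one group
        String category = input.get("category");
        emitter.emit(category);
    }

    @Override
    public void aggregate(String groupKey, Iterator<StructuredRecord> groupValues,
                          Emitter<StructuredRecord> emitter) {
        long count = 0;
        while (groupValues.hasNext()) {
            groupValues.next();
            count++;
        }
        emitter.emit(StructuredRecord.builder(OUTPUT_SCHEMA)
            .set("category", groupKey)
            .set("count", count)
            .build());
    }
}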

Core Pipeline →

Batch Processing

Comprehensive support for batch data processing with sources, sinks, and specialized batch operations.

// Batch source
public abstract class BatchSource<KEY_IN, VAL_IN, OUT> 
    extends BatchConfigurable<BatchSourceContext>

// Batch sink
public abstract class BatchSink<IN, KEY_OUT, VAL_OUT> 
    extends BatchConfigurable<BatchSinkContext>
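
A hedged sketch of a minimal source that reads text lines through a Hadoop input format; the input directory and single-field output schema are illustrative assumptions:

@Plugin(type = BatchSource.PLUGIN_TYPE)
@Name("LineSource")
public class LineSource extends BatchSource<LongWritable, Text, StructuredRecord> {

    private static final Schema OUTPUT_SCHEMA = Schema.recordOf("line",
        Schema.Field.of("body", Schema.of(Schema.Type.STRING)));

    @Override
    public void prepareRun(BatchSourceContext context) {
        // Register the physical input; any Hadoop InputFormat can back a batch source
        context.setInput(Input.of("lines", new InputFormatProvider() {
            @Override
            public String getInputFormatClassName() {
                return "org.apache.hadoop.mapreduce.lib.input.TextInputFormat";
            }

            @Override
            public Map<String, String> getInputFormatConfiguration() {
                return Collections.singletonMap(
                    "mapreduce.input.fileinputformat.inputdir", "/tmp/input");
            }
        }));
    }

    @Override
    public void transform(KeyValue<LongWritable, Text> input,
                          Emitter<StructuredRecord> emitter) {
        // Map each raw key/value pair from the input format to a StructuredRecord
        emitter.emit(StructuredRecord.builder(OUTPUT_SCHEMA)
            .set("body", input.getValue().toString())
            .build());
    }
}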

Batch Processing →

Data Connectors

Framework for building data connectors with browse, sample, and specification generation capabilities.

// Connector interface
BrowseDetail browse(ConnectorContext context, BrowseRequest request)
SampleDetail sample(ConnectorContext context, SampleRequest request)
ConnectorSpec generateSpec(ConnectorContext context, ConnectorSpecRequest request)
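
As an illustration, a browse() implementation might return a single sampleable entity. A hedged sketch using the builders from io.cdap.cdap.etl.api.connector (builder method names may vary slightly across CDAP versions, and the entity values are assumptions):

@Override
public BrowseDetail browse(ConnectorContext context, BrowseRequest request) {
    // A leaf entity: it can be sampled but not browsed further
    BrowseEntity table = BrowseEntity.builder("customers", "/customers", "table")
        .canBrowse(false)
        .canSample(true)
        .build();
    return BrowseDetail.builder()
        .setTotalCount(1)
        .addEntity(table)
        .build();
}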

Data Connectors →

Join Operations

Advanced join operations with automatic join optimization and comprehensive join definitions.

// AutoJoiner interface
JoinDefinition define(AutoJoinerContext context)

// Join definition builder
JoinDefinition.builder()
    .select(fields)
    .from(stages)
    .on(joinConditions)
    .build()
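
Putting these together, a hedged sketch of a BatchAutoJoiner that inner-joins two input stages on a key, using the classes in io.cdap.cdap.etl.api.join; the stage and field names ("users", "purchases", "id", "user_id") are assumptions for illustration:

@Plugin(type = BatchJoiner.PLUGIN_TYPE)
@Name("UserPurchaseJoin")
public class UserPurchaseJoin extends BatchAutoJoiner {

    @Override
    public void prepareRun(BatchJoinerContext context) {
        // No run-time preparation required for this join
    }

    @Override
    public JoinDefinition define(AutoJoinerContext context) {
        Map<String, JoinStage> inputs = context.getInputStages();

        return JoinDefinition.builder()
            // Fields to carry into the join output, qualified by stage name
            .select(new JoinField("users", "id"),
                    new JoinField("users", "name"),
                    new JoinField("purchases", "amount"))
            .from(inputs.get("users"), inputs.get("purchases"))
            // Equi-join: users.id = purchases.user_id
            .on(JoinCondition.onKeys()
                .addKey(new JoinKey("users", Collections.singletonList("id")))
                .addKey(new JoinKey("purchases", Collections.singletonList("user_id")))
                .build())
            .build();
    }
}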

Join Operations →

Validation Framework

Robust validation system for early error detection and structured error reporting.

// Validation failure collection
FailureCollector collector = context.getFailureCollector();
collector.addFailure("Invalid configuration", "Set the property to a value in the valid range")
    .withConfigProperty("propertyName");

// Throws a ValidationException wrapping all collected failures, if any exist
collector.getOrThrowException();

Validation →

SQL Engine Support

SQL engine integration for advanced query processing and optimization.

// SQL Engine interface
boolean canTransform(SQLTransformDefinition transformDefinition)
SQLDataset transform(SQLTransformRequest transformRequest)
SQLDataset join(SQLJoinRequest joinRequest)

SQL Engine →

Lineage and Metadata

Track data lineage and field-level transformations for governance and debugging.

// Field lineage recording: StageContext itself extends LineageRecorder,
// so operations are recorded directly on the context
context.record(Arrays.asList(
    new FieldReadOperation("read", "Read source field",
                           EndPoint.of("namespace", "source"), "input.field"),
    new FieldTransformOperation("transform", "Transform field",
                                Arrays.asList("input.field"), Arrays.asList("output.field"))
));

Lineage & Metadata →

Actions and Conditions

Pipeline actions and conditional execution for workflow control and external integrations.

// Action implementation
public abstract class Action implements PipelineConfigurable, 
                                      SubmitterLifecycle<ActionContext>, 
                                      StageLifecycle<ActionContext>

// Condition evaluation: return true to follow the success branch
public abstract boolean apply(ConditionContext context) throws Exception
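
A hedged sketch of a minimal Action plugin; run() executes once as a workflow node, outside the record flow. The "message" runtime argument and metric name are assumptions for illustration:

@Plugin(type = Action.PLUGIN_TYPE)
@Name("CountMessage")
public class CountMessageAction extends Action {

    @Override
    public void run(ActionContext context) throws Exception {
        // Runtime arguments and metrics are available through the context
        String message = context.getArguments().get("message");
        if (message != null) {
            context.getMetrics().count("messages.seen", 1);
        }
    }
}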

Actions & Conditions →

Plugin Types

The API supports these plugin types for the CDAP plugin system:

| Plugin Type | Constant | Description |
| ----------- | -------- | ----------- |
| Transform | "transform" | Data transformation stages |
| Splitter Transform | "splittertransform" | Multi-output transformation stages |
| Error Transform | "errortransform" | Error record transformation stages |
| Batch Source | "batchsource" | Batch data sources |
| Batch Sink | "batchsink" | Batch data sinks |
| Batch Aggregator | "batchaggregator" | Batch aggregation operations |
| Batch Joiner | "batchjoiner" | Batch join operations |
| Action | "action" | Pipeline actions |
| Post Action | "postaction" | Post-run pipeline actions |
| Condition | "condition" | Conditional execution |
| Alert Publisher | "alertpublisher" | Alert publishing |
| Connector | "connector" | Data connectors |
| SQL Engine | "sqlengine" | SQL query processing engines |

Key Interfaces

Type Definitions

// Core transformation interface
interface Transformation<IN, OUT> {
    void transform(IN input, Emitter<OUT> emitter) throws Exception;
}

// Data emission interface
interface Emitter<T> extends AlertEmitter, ErrorEmitter<T> {
    void emit(T value);
}

// Pipeline configuration interface
interface PipelineConfigurable {
    void configurePipeline(PipelineConfigurer pipelineConfigurer);
}

// Pipeline configurer interface
interface PipelineConfigurer extends PluginConfigurer, DatasetConfigurer, FeatureFlagsProvider {
    StageConfigurer getStageConfigurer();
}

// Stage configurer interface
interface StageConfigurer {
    @Nullable Schema getInputSchema();
    String getStageName();
    void setOutputSchema(@Nullable Schema outputSchema);
    FailureCollector getFailureCollector();
}

// Stage lifecycle interface  
interface StageLifecycle<T> extends Destroyable {
    void initialize(T context) throws Exception;
}

// Runtime context interface
interface StageContext extends ServiceDiscoverer, MetadataReader, MetadataWriter,
                              LineageRecorder, FeatureFlagsProvider {
    String getStageName();
    StageMetrics getMetrics();
    PluginProperties getPluginProperties();
    PluginProperties getPluginProperties(String pluginId);
    <T> T newPluginInstance(String pluginId) throws InstantiationException;
    Schema getInputSchema();
    Map<String, Schema> getInputSchemas();
    Schema getOutputSchema();
    Map<String, Schema> getOutputSchemas();
    Arguments getArguments();
}
