Protocol classes and configuration objects for ETL (Extract, Transform, Load) pipelines in CDAP
—
Current version (v2) ETL pipeline configuration providing comprehensive features for both batch and streaming data processing scenarios. Includes advanced resource management, stage logging, process timing, and extensive property support.
Core configuration class providing common functionality for all ETL pipeline types.
/**
 * Base ETL Configuration shared by all pipeline types (batch and streaming).
 * Extends {@code Config} and implements {@code UpgradeableConfig}; instances
 * are created through the abstract {@code Builder} rather than constructed
 * directly.
 */
public class ETLConfig extends Config implements UpgradeableConfig {
/**
 * Get pipeline description
 * @return pipeline description, may be null
 */
public String getDescription();
/**
 * Get all pipeline stages
 * @return immutable set of ETL stages
 */
public Set<ETLStage> getStages();
/**
 * Get stage connections defining data flow between stages
 * @return immutable set of connections between stages
 */
public Set<Connection> getConnections();
/**
 * Get resource allocation for pipeline execution
 * @return resource configuration, defaults to 1024MB/1 core if not specified
 */
public Resources getResources();
/**
 * Get driver resource allocation
 * @return driver resource configuration
 */
public Resources getDriverResources();
/**
 * Get client resource allocation
 * @return client resource configuration
 */
public Resources getClientResources();
/**
 * Get number of records for preview
 * @return preview record count, defaults to 100
 */
public int getNumOfRecordsPreview();
/**
 * Check if stage logging is enabled
 * @return true if stage logging enabled, defaults to true
 */
public boolean isStageLoggingEnabled();
/**
 * Check if process timing is enabled
 * @return true if process timing enabled, defaults to true
 */
public boolean isProcessTimingEnabled();
/**
 * Get pipeline properties
 * @return immutable map of pipeline properties
 */
public Map<String, String> getProperties();
/**
 * Validate configuration correctness
 * @throws IllegalArgumentException if configuration is invalid
 */
public void validate();
/**
 * Check if configuration can be upgraded to a newer version
 * @return false for v2 configurations (latest version); when false,
 *         {@code upgrade} must not be called
 */
public boolean canUpgrade();
/**
 * Upgrade configuration to the next version. Unsupported for v2
 * configurations, since v2 is the latest version and
 * {@code canUpgrade()} returns false.
 * @param upgradeContext context for upgrading
 * @throws UnsupportedOperationException for v2 configurations
 */
public UpgradeableConfig upgrade(UpgradeContext upgradeContext);
}

Usage Example:
// Usage example: building a base ETL configuration.
// NOTE(review): ETLConfig.Builder is abstract, so the anonymous subclass below
// is illustrative only — real code uses a concrete pipeline config's builder.
import co.cask.cdap.etl.proto.v2.*;
import co.cask.cdap.api.Resources;
// Using builder pattern to create base configuration
ETLConfig.Builder<?> builder = new ETLConfig.Builder<ETLConfig.Builder<?>>() {
// Abstract builder implementation would be provided by concrete subclasses
};
// Configure resources and properties
Map<String, String> properties = new HashMap<>();
properties.put("custom.property", "value");
// Resources(a, b) — assumes (memoryMB, virtualCores) ordering; TODO confirm
// against the co.cask.cdap.api.Resources constructor.
builder.setResources(new Resources(2048, 4))
.setDriverResources(new Resources(1024, 2))
.setClientResources(new Resources(512, 1))
.setNumOfRecordsPreview(500)
.setProperties(properties);
// Disable optional features (both default to enabled)
builder.disableStageLogging()
.disableProcessTiming();

Configuration specific to batch ETL pipelines with scheduling, execution engine selection, and post-action support.
/**
 * ETL Batch Configuration for scheduled batch processing pipelines.
 * Extends {@code ETLConfig} with execution engine selection, a time
 * schedule, concurrency limits, and post-run actions. Declared
 * {@code final}, so it cannot be subclassed.
 */
public final class ETLBatchConfig extends ETLConfig {
/**
 * Get post-execution actions
 * @return immutable list of post-actions to execute after pipeline completion
 */
public List<ETLStage> getPostActions();
/**
 * Get execution engine
 * @return execution engine (MapReduce or Spark), defaults to MapReduce
 */
public Engine getEngine();
/**
 * Get schedule configuration
 * @return schedule string, may be null
 */
public String getSchedule();
/**
 * Get maximum concurrent runs
 * @return max concurrent runs, may be null for unlimited
 */
public Integer getMaxConcurrentRuns();
/**
 * Convert old configuration format to current v2 format.
 * NOTE(review): presumably normalizes fields from pre-v2 configs —
 * confirm against the implementation.
 * @return v2 batch configuration
 */
public ETLBatchConfig convertOldConfig();
/**
 * Create builder for batch configuration
 * @return new builder instance
 */
public static Builder builder();
/**
 * Create builder with schedule (deprecated)
 * @param schedule time schedule
 * @return new builder instance
 * @deprecated use builder() and setTimeSchedule() instead
 */
@Deprecated
public static Builder builder(String schedule);
}

Usage Example:
// Usage example: building a batch ETL configuration.
// NOTE(review): sourceStage, transformStage, sinkStage, and cleanupAction are
// assumed to be ETLStage instances defined elsewhere in the caller's code.
import co.cask.cdap.etl.proto.v2.*;
import co.cask.cdap.etl.api.Engine;
import co.cask.cdap.api.Resources;
// Build batch ETL configuration
ETLBatchConfig batchConfig = ETLBatchConfig.builder()
.setTimeSchedule("0 0 2 * * ?") // Daily at 2 AM (cron-style schedule)
.setEngine(Engine.SPARK)
.setMaxConcurrentRuns(3)
.addStage(sourceStage)
.addStage(transformStage)
.addStage(sinkStage)
.addConnection("source", "transform")
.addConnection("transform", "sink")
.addPostAction(cleanupAction) // runs after pipeline completion
.setResources(new Resources(4096, 8))
.setDriverResources(new Resources(2048, 4))
.build();
// Validate and use; validate() throws IllegalArgumentException when invalid
batchConfig.validate();

Configuration for streaming ETL pipelines with batch interval settings, checkpoint management, and graceful shutdown options.
/**
 * Data Streams Configuration for real-time streaming pipelines.
 * Extends {@code ETLConfig} with micro-batch interval, checkpointing,
 * and graceful-stop settings. Declared {@code final}, so it cannot be
 * subclassed.
 */
public final class DataStreamsConfig extends ETLConfig {
/**
 * Get batch processing interval
 * @return batch interval string (e.g., "1m", "30s")
 */
public String getBatchInterval();
/**
 * Check if running in unit test mode
 * @return true if in unit test mode
 */
public boolean isUnitTest();
/**
 * Check if checkpoints are disabled
 * @return true if checkpoints disabled, defaults to false
 */
public boolean checkpointsDisabled();
/**
 * Get additional JVM options
 * @return extra Java options string, defaults to empty
 */
public String getExtraJavaOpts();
/**
 * Get graceful stop setting.
 * NOTE(review): returns boxed {@code Boolean}, so despite the documented
 * default of true it may be null when unset — confirm with implementation.
 * @return true for graceful stop, defaults to true
 */
public Boolean getStopGracefully();
/**
 * Get checkpoint directory
 * @return checkpoint directory path, may be null
 */
public String getCheckpointDir();
/**
 * Create builder for data streams configuration
 * @return new builder instance
 */
public static Builder builder();
}

Usage Example:
// Usage example: building a streaming ETL configuration.
// NOTE(review): kafkaSource, realtimeTransform, and tablesSink are assumed to
// be ETLStage instances defined elsewhere. This snippet also uses Resources
// but does not import co.cask.cdap.api.Resources — the import appears to be
// missing from the example.
import co.cask.cdap.etl.proto.v2.DataStreamsConfig;
// Build streaming ETL configuration
DataStreamsConfig streamConfig = DataStreamsConfig.builder()
.setBatchInterval("30s")
.setCheckpointDir("/tmp/streaming-checkpoints")
.setStopGracefully(true)
.addStage(kafkaSource)
.addStage(realtimeTransform)
.addStage(tablesSink)
.addConnection("kafka", "transform")
.addConnection("transform", "sink")
.setResources(new Resources(8192, 16))
.build();
// Configure for production use; checkpoints are enabled by default
if (!streamConfig.checkpointsDisabled()) {
// Checkpoints enabled - configure persistent storage
System.out.println("Checkpoints enabled at: " + streamConfig.getCheckpointDir());
}

Abstract builder pattern providing fluent API for constructing ETL configurations with validation and type safety.
/**
 * Abstract builder for ETL configurations; every setter returns {@code T}
 * so concrete subclass builders can chain fluently.
 * NOTE(review): the bound is declared as the raw type
 * {@code T extends Builder}; the recursive form
 * {@code T extends Builder<T>} would be fully type-safe, but changing it
 * alters the public signature — confirm before modifying.
 * @param <T> The concrete builder type for method chaining
 */
public abstract static class Builder<T extends Builder> {
/**
 * Add a stage to the pipeline
 * @param stage ETL stage to add
 * @return builder instance for chaining
 */
public T addStage(ETLStage stage);
/**
 * Add connection between stages
 * @param from source stage name
 * @param to target stage name
 * @return builder instance for chaining
 */
public T addConnection(String from, String to);
/**
 * Add connection with port specification
 * @param from source stage name
 * @param to target stage name
 * @param port output port name
 * @return builder instance for chaining
 */
public T addConnection(String from, String to, String port);
/**
 * Add conditional connection
 * @param from source stage name
 * @param to target stage name
 * @param condition connection condition
 * @return builder instance for chaining
 */
public T addConnection(String from, String to, Boolean condition);
/**
 * Add connection object
 * @param connection connection to add
 * @return builder instance for chaining
 */
public T addConnection(Connection connection);
/**
 * Add multiple connections
 * @param connections collection of connections
 * @return builder instance for chaining
 */
public T addConnections(Collection<Connection> connections);
/**
 * Set resource allocation
 * @param resources resource configuration
 * @return builder instance for chaining
 */
public T setResources(Resources resources);
/**
 * Set driver resource allocation
 * @param resources driver resource configuration
 * @return builder instance for chaining
 */
public T setDriverResources(Resources resources);
/**
 * Set client resource allocation
 * @param resources client resource configuration
 * @return builder instance for chaining
 */
public T setClientResources(Resources resources);
/**
 * Set number of preview records
 * @param numOfRecordsPreview preview record count
 * @return builder instance for chaining
 */
public T setNumOfRecordsPreview(int numOfRecordsPreview);
/**
 * Disable stage logging (enabled by default)
 * @return builder instance for chaining
 */
public T disableStageLogging();
/**
 * Disable process timing (enabled by default)
 * @return builder instance for chaining
 */
public T disableProcessTiming();
/**
 * Set pipeline properties
 * @param properties pipeline properties map
 * @return builder instance for chaining
 */
public T setProperties(Map<String, String> properties);
}

Install with Tessl CLI
npx tessl i tessl/maven-co-cask-cdap--cdap-etl-proto