Protocol classes and configuration objects for ETL (Extract, Transform, Load) pipelines in CDAP
—
Connection management system for defining data flow between pipeline stages. Connections specify how data moves through the ETL pipeline, supporting various routing patterns including conditional connections, port-based routing, and complex data flow topologies.
Core connection functionality for linking pipeline stages in simple linear or branching data flows.
/**
* Connection between two ETL stages defining data flow
*/
public class Connection {
/**
* Create basic connection between stages
* @param from source stage name
* @param to target stage name
*/
public Connection(String from, String to);
/**
* Create connection with output port specification
* @param from source stage name
* @param to target stage name
* @param port output port name for multi-output stages
*/
public Connection(String from, String to, String port);
/**
* Create conditional connection
* @param from source stage name
* @param to target stage name
* @param condition boolean condition determining if connection is active
*/
public Connection(String from, String to, Boolean condition);
/**
* Get source stage name
* @return name of the source stage
*/
public String getFrom();
/**
* Get target stage name
* @return name of the target stage
*/
public String getTo();
/**
* Get output port name
* @return port name for multi-output stages, may be null
*/
public String getPort();
/**
* Get connection condition
* @return boolean condition for conditional connections, may be null
*/
public Boolean getCondition();
}

Usage Examples:
import co.cask.cdap.etl.proto.Connection;
// Basic linear pipeline connections
Connection sourceToTransform = new Connection("data_source", "clean_data");
Connection transformToSink = new Connection("clean_data", "output_table");
// Multi-output stage with port-based routing
Connection validOutput = new Connection("validator", "valid_sink", "valid");
Connection invalidOutput = new Connection("validator", "error_sink", "invalid");
// Conditional connections for dynamic routing
Connection conditionalConnection = new Connection("decision_stage", "special_processing", true);
Connection defaultConnection = new Connection("decision_stage", "normal_processing", false);
// Complex branching example
Set<Connection> branchingFlow = Set.of(
new Connection("source", "splitter"),
new Connection("splitter", "branch_a", "output_a"),
new Connection("splitter", "branch_b", "output_b"),
new Connection("branch_a", "joiner", "input_a"),
new Connection("branch_b", "joiner", "input_b"),
new Connection("joiner", "final_sink")
);

Simple sequential data processing patterns where data flows through stages in a straight line.
// Simple ETL pipeline: Extract -> Transform -> Load
List<Connection> linearPipeline = List.of(
new Connection("file_source", "data_cleaner"),
new Connection("data_cleaner", "format_converter"),
new Connection("format_converter", "database_sink")
);
// Multi-stage transformation pipeline
List<Connection> transformationChain = List.of(
new Connection("api_source", "json_parser"),
new Connection("json_parser", "field_validator"),
new Connection("field_validator", "data_enricher"),
new Connection("data_enricher", "aggregator"),
new Connection("aggregator", "table_sink")
);

Complex data flow patterns supporting parallel processing, conditional routing, and data aggregation.
// Fan-out pattern - one source to multiple processing paths
Set<Connection> fanOutPattern = Set.of(
new Connection("main_source", "customer_processor"),
new Connection("main_source", "order_processor"),
new Connection("main_source", "product_processor"),
new Connection("customer_processor", "customer_sink"),
new Connection("order_processor", "order_sink"),
new Connection("product_processor", "product_sink")
);
// Fan-in pattern - multiple sources to single processor
Set<Connection> fanInPattern = Set.of(
new Connection("sales_data", "data_merger"),
new Connection("inventory_data", "data_merger"),
new Connection("customer_data", "data_merger"),
new Connection("data_merger", "unified_sink")
);
// Diamond pattern - split and rejoin
Set<Connection> diamondPattern = Set.of(
new Connection("input", "splitter"),
new Connection("splitter", "fast_path", "priority"),
new Connection("splitter", "slow_path", "standard"),
new Connection("fast_path", "merger", "fast_input"),
new Connection("slow_path", "merger", "slow_input"),
new Connection("merger", "output")
);

Connection patterns for handling errors, validation failures, and alternative processing paths.
// Error handling with multiple outputs
Set<Connection> errorHandlingFlow = Set.of(
new Connection("source", "validator"),
new Connection("validator", "main_processor", "valid"),
new Connection("validator", "error_handler", "invalid"),
new Connection("main_processor", "success_sink"),
new Connection("error_handler", "error_sink")
);
// Multi-stage error handling
Set<Connection> complexErrorHandling = Set.of(
new Connection("input", "stage1"),
new Connection("stage1", "stage2", "success"),
new Connection("stage1", "error_processor", "error"),
new Connection("stage2", "stage3", "success"),
new Connection("stage2", "error_processor", "error"),
new Connection("stage3", "final_sink", "success"),
new Connection("stage3", "error_processor", "error"),
new Connection("error_processor", "error_sink")
);
// Conditional processing based on data content
Set<Connection> conditionalProcessing = Set.of(
new Connection("source", "classifier"),
new Connection("classifier", "premium_processor", "premium"),
new Connection("classifier", "standard_processor", "standard"),
new Connection("classifier", "basic_processor", "basic"),
new Connection("premium_processor", "premium_sink"),
new Connection("standard_processor", "standard_sink"),
new Connection("basic_processor", "basic_sink")
);

Validation rules and best practices for ensuring correct connection topology and preventing common errors.
/**
* Connection validation utilities
*/
public class ConnectionValidator {
/**
* Validate connection topology for cycles and orphaned stages
* @param stages all pipeline stages
* @param connections all pipeline connections
* @throws IllegalArgumentException if topology is invalid
*/
public static void validateTopology(Set<ETLStage> stages, Set<Connection> connections);
/**
* Check for unreachable stages (no input connections)
* @param stages all pipeline stages
* @param connections all pipeline connections
* @return list of unreachable stage names
*/
public static List<String> findUnreachableStages(Set<ETLStage> stages, Set<Connection> connections);
/**
* Check for dead-end stages (no output connections, not sinks)
* @param stages all pipeline stages
* @param connections all pipeline connections
* @return list of dead-end stage names
*/
public static List<String> findDeadEndStages(Set<ETLStage> stages, Set<Connection> connections);
/**
* Detect circular dependencies in pipeline
* @param connections all pipeline connections
* @return true if circular dependencies exist
*/
public static boolean hasCycles(Set<Connection> connections);
}

Best Practices Examples:
import co.cask.cdap.etl.proto.v2.*;
import co.cask.cdap.etl.proto.Connection;
// Good: Well-defined pipeline with clear data flow
Set<ETLStage> stages = Set.of(
new ETLStage("source", sourcePlugin),
new ETLStage("transform", transformPlugin),
new ETLStage("sink", sinkPlugin)
);
Set<Connection> connections = Set.of(
new Connection("source", "transform"),
new Connection("transform", "sink")
);
// Validate before using
try {
// Check for basic topology issues
ConnectionValidator.validateTopology(stages, connections);
// Check for specific issues
List<String> unreachable = ConnectionValidator.findUnreachableStages(stages, connections);
if (!unreachable.isEmpty()) {
throw new IllegalArgumentException("Unreachable stages: " + unreachable);
}
List<String> deadEnds = ConnectionValidator.findDeadEndStages(stages, connections);
if (!deadEnds.isEmpty()) {
throw new IllegalArgumentException("Dead-end stages: " + deadEnds);
}
if (ConnectionValidator.hasCycles(connections)) {
throw new IllegalArgumentException("Circular dependencies detected");
}
} catch (IllegalArgumentException e) {
logger.error("Pipeline topology validation failed: {}", e.getMessage());
// Handle validation failure
}
// Good: Descriptive stage names that clearly indicate data flow
Set<Connection> descriptiveConnections = Set.of(
new Connection("raw_customer_data", "validate_customer_fields"),
new Connection("validate_customer_fields", "enrich_customer_data"),
new Connection("enrich_customer_data", "customer_warehouse_table")
);
// Good: Port names that clearly indicate data type/purpose
Set<Connection> clearPortConnections = Set.of(
new Connection("data_splitter", "valid_records_processor", "validated_output"),
new Connection("data_splitter", "invalid_records_handler", "rejected_output"),
new Connection("data_splitter", "audit_logger", "audit_output")
);

Sophisticated connection patterns for complex ETL scenarios including parallel processing, data replication, and conditional routing.
// Parallel processing with synchronization
Set<Connection> parallelSync = Set.of(
new Connection("source", "parallel_split"),
new Connection("parallel_split", "worker_1", "batch_1"),
new Connection("parallel_split", "worker_2", "batch_2"),
new Connection("parallel_split", "worker_3", "batch_3"),
new Connection("worker_1", "sync_barrier", "result_1"),
new Connection("worker_2", "sync_barrier", "result_2"),
new Connection("worker_3", "sync_barrier", "result_3"),
new Connection("sync_barrier", "final_aggregator"),
new Connection("final_aggregator", "output")
);
// Data replication for multiple destinations
Set<Connection> replicationPattern = Set.of(
new Connection("master_source", "data_replicator"),
new Connection("data_replicator", "operational_sink", "live_copy"),
new Connection("data_replicator", "warehouse_sink", "analytical_copy"),
new Connection("data_replicator", "backup_sink", "backup_copy"),
new Connection("data_replicator", "audit_sink", "audit_trail")
);
// Hierarchical processing with multiple levels
Set<Connection> hierarchicalFlow = Set.of(
new Connection("raw_input", "level1_processor"),
new Connection("level1_processor", "level2_processor_a", "category_a"),
new Connection("level1_processor", "level2_processor_b", "category_b"),
new Connection("level2_processor_a", "level3_detail_processor", "detailed_a"),
new Connection("level2_processor_a", "level3_summary_processor", "summary_a"),
new Connection("level2_processor_b", "level3_detail_processor", "detailed_b"),
new Connection("level2_processor_b", "level3_summary_processor", "summary_b"),
new Connection("level3_detail_processor", "detail_sink"),
new Connection("level3_summary_processor", "summary_sink")
);

Install with Tessl CLI
npx tessl i tessl/maven-co-cask-cdap--cdap-etl-proto