CtrlK
Blog Docs Log in Get started
Tessl Logo

tessl/maven-co-cask-cdap--cdap-etl-proto

Protocol classes and configuration objects for ETL (Extract, Transform, Load) pipelines in CDAP

Pending
Overview
Eval results
Files

docs/connection-data-flow.md

Connection and Data Flow

Connection management system for defining data flow between pipeline stages. Connections specify how data moves through the ETL pipeline, supporting various routing patterns including conditional connections, port-based routing, and complex data flow topologies.

Capabilities

Basic Connection Management

Core connection functionality for linking pipeline stages in simple linear or branching data flows.

/**
 * Connection between two ETL stages defining data flow
 */
public class Connection {
    /**
     * Create basic connection between stages
     * @param from source stage name
     * @param to target stage name
     */
    public Connection(String from, String to);
    
    /**
     * Create connection with output port specification
     * @param from source stage name
     * @param to target stage name
     * @param port output port name for multi-output stages
     */
    public Connection(String from, String to, String port);
    
    /**
     * Create conditional connection
     * @param from source stage name
     * @param to target stage name
     * @param condition boolean condition determining whether the connection is active
     */
    public Connection(String from, String to, Boolean condition);
    
    /**
     * Get source stage name
     * @return name of the source stage
     */
    public String getFrom();
    
    /**
     * Get target stage name
     * @return name of the target stage
     */
    public String getTo();
    
    /**
     * Get output port name
     * @return port name for multi-output stages, may be null
     */
    public String getPort();
    
    /**
     * Get connection condition
     * @return boolean condition for conditional connections, may be null
     */
    public Boolean getCondition();
}

Usage Examples:

import co.cask.cdap.etl.proto.Connection;

// Basic linear pipeline connections
Connection sourceToTransform = new Connection("data_source", "clean_data");
Connection transformToSink = new Connection("clean_data", "output_table");

// Multi-output stage with port-based routing
Connection validOutput = new Connection("validator", "valid_sink", "valid");
Connection invalidOutput = new Connection("validator", "error_sink", "invalid");

// Conditional connections for dynamic routing
Connection conditionalConnection = new Connection("decision_stage", "special_processing", true);
Connection defaultConnection = new Connection("decision_stage", "normal_processing", false);

// Complex branching example
Set<Connection> branchingFlow = Set.of(
    new Connection("source", "splitter"),
    new Connection("splitter", "branch_a", "output_a"),
    new Connection("splitter", "branch_b", "output_b"),
    new Connection("branch_a", "joiner", "input_a"),
    new Connection("branch_b", "joiner", "input_b"),
    new Connection("joiner", "final_sink")
);

Linear Pipeline Patterns

Simple sequential data processing patterns where data flows through stages in a straight line.

// Simple ETL pipeline: Extract -> Transform -> Load
List<Connection> linearPipeline = List.of(
    new Connection("file_source", "data_cleaner"),
    new Connection("data_cleaner", "format_converter"),
    new Connection("format_converter", "database_sink")
);

// Multi-stage transformation pipeline
List<Connection> transformationChain = List.of(
    new Connection("api_source", "json_parser"),
    new Connection("json_parser", "field_validator"),
    new Connection("field_validator", "data_enricher"),
    new Connection("data_enricher", "aggregator"),
    new Connection("aggregator", "table_sink")
);

Branching and Joining Patterns

Complex data flow patterns supporting parallel processing, conditional routing, and data aggregation.

// Fan-out pattern - one source to multiple processing paths
Set<Connection> fanOutPattern = Set.of(
    new Connection("main_source", "customer_processor"),
    new Connection("main_source", "order_processor"),
    new Connection("main_source", "product_processor"),
    new Connection("customer_processor", "customer_sink"),
    new Connection("order_processor", "order_sink"),
    new Connection("product_processor", "product_sink")
);

// Fan-in pattern - multiple sources to single processor
Set<Connection> fanInPattern = Set.of(
    new Connection("sales_data", "data_merger"),
    new Connection("inventory_data", "data_merger"),
    new Connection("customer_data", "data_merger"),
    new Connection("data_merger", "unified_sink")
);

// Diamond pattern - split and rejoin
Set<Connection> diamondPattern = Set.of(
    new Connection("input", "splitter"),
    new Connection("splitter", "fast_path", "priority"),
    new Connection("splitter", "slow_path", "standard"),
    new Connection("fast_path", "merger", "fast_input"),
    new Connection("slow_path", "merger", "slow_input"),
    new Connection("merger", "output")
);

Error Handling and Alternative Paths

Connection patterns for handling errors, validation failures, and alternative processing paths.

// Error handling with multiple outputs
Set<Connection> errorHandlingFlow = Set.of(
    new Connection("source", "validator"),
    new Connection("validator", "main_processor", "valid"),
    new Connection("validator", "error_handler", "invalid"),
    new Connection("main_processor", "success_sink"),
    new Connection("error_handler", "error_sink")
);

// Multi-stage error handling
Set<Connection> complexErrorHandling = Set.of(
    new Connection("input", "stage1"),
    new Connection("stage1", "stage2", "success"),
    new Connection("stage1", "error_processor", "error"),
    new Connection("stage2", "stage3", "success"),
    new Connection("stage2", "error_processor", "error"),
    new Connection("stage3", "final_sink", "success"),
    new Connection("stage3", "error_processor", "error"),
    new Connection("error_processor", "error_sink")
);

// Conditional processing based on data content
Set<Connection> conditionalProcessing = Set.of(
    new Connection("source", "classifier"),
    new Connection("classifier", "premium_processor", "premium"),
    new Connection("classifier", "standard_processor", "standard"),
    new Connection("classifier", "basic_processor", "basic"),
    new Connection("premium_processor", "premium_sink"),
    new Connection("standard_processor", "standard_sink"),
    new Connection("basic_processor", "basic_sink")
);

Connection Validation and Best Practices

Validation rules and best practices for ensuring correct connection topology and preventing common errors.

/**
 * Connection validation utilities
 */
public class ConnectionValidator {
    /**
     * Validate connection topology for cycles and orphaned stages
     * @param stages all pipeline stages
     * @param connections all pipeline connections
     * @throws IllegalArgumentException if topology is invalid
     */
    public static void validateTopology(Set<ETLStage> stages, Set<Connection> connections);
    
    /**
     * Check for unreachable stages (no input connections)
     * @param stages all pipeline stages
     * @param connections all pipeline connections
     * @return list of unreachable stage names
     */
    public static List<String> findUnreachableStages(Set<ETLStage> stages, Set<Connection> connections);
    
    /**
     * Check for dead-end stages (no output connections, not sinks)
     * @param stages all pipeline stages
     * @param connections all pipeline connections
     * @return list of dead-end stage names
     */
    public static List<String> findDeadEndStages(Set<ETLStage> stages, Set<Connection> connections);
    
    /**
     * Detect circular dependencies in pipeline
     * @param connections all pipeline connections
     * @return true if circular dependencies exist
     */
    public static boolean hasCycles(Set<Connection> connections);
}

Best Practices Examples:

import co.cask.cdap.etl.proto.v2.*;
import co.cask.cdap.etl.proto.Connection;

// Good: Well-defined pipeline with clear data flow
Set<ETLStage> stages = Set.of(
    new ETLStage("source", sourcePlugin),
    new ETLStage("transform", transformPlugin),
    new ETLStage("sink", sinkPlugin)
);

Set<Connection> connections = Set.of(
    new Connection("source", "transform"),
    new Connection("transform", "sink")
);

// Validate before using
try {
    // Check for basic topology issues
    ConnectionValidator.validateTopology(stages, connections);
    
    // Check for specific issues
    List<String> unreachable = ConnectionValidator.findUnreachableStages(stages, connections);
    if (!unreachable.isEmpty()) {
        throw new IllegalArgumentException("Unreachable stages: " + unreachable);
    }
    
    List<String> deadEnds = ConnectionValidator.findDeadEndStages(stages, connections);
    if (!deadEnds.isEmpty()) {
        throw new IllegalArgumentException("Dead-end stages: " + deadEnds);
    }
    
    if (ConnectionValidator.hasCycles(connections)) {
        throw new IllegalArgumentException("Circular dependencies detected");
    }
    
} catch (IllegalArgumentException e) {
    logger.error("Pipeline topology validation failed: {}", e.getMessage());
    // Handle validation failure
}

// Good: Descriptive stage names that clearly indicate data flow
Set<Connection> descriptiveConnections = Set.of(
    new Connection("raw_customer_data", "validate_customer_fields"),
    new Connection("validate_customer_fields", "enrich_customer_data"),
    new Connection("enrich_customer_data", "customer_warehouse_table")
);

// Good: Port names that clearly indicate data type/purpose
Set<Connection> clearPortConnections = Set.of(
    new Connection("data_splitter", "valid_records_processor", "validated_output"),
    new Connection("data_splitter", "invalid_records_handler", "rejected_output"),
    new Connection("data_splitter", "audit_logger", "audit_output")
);

Advanced Connection Patterns

Sophisticated connection patterns for complex ETL scenarios including parallel processing, data replication, and conditional routing.

// Parallel processing with synchronization
Set<Connection> parallelSync = Set.of(
    new Connection("source", "parallel_split"),
    new Connection("parallel_split", "worker_1", "batch_1"),
    new Connection("parallel_split", "worker_2", "batch_2"), 
    new Connection("parallel_split", "worker_3", "batch_3"),
    new Connection("worker_1", "sync_barrier", "result_1"),
    new Connection("worker_2", "sync_barrier", "result_2"),
    new Connection("worker_3", "sync_barrier", "result_3"),
    new Connection("sync_barrier", "final_aggregator"),
    new Connection("final_aggregator", "output")
);

// Data replication for multiple destinations
Set<Connection> replicationPattern = Set.of(
    new Connection("master_source", "data_replicator"),
    new Connection("data_replicator", "operational_sink", "live_copy"),
    new Connection("data_replicator", "warehouse_sink", "analytical_copy"),
    new Connection("data_replicator", "backup_sink", "backup_copy"),
    new Connection("data_replicator", "audit_sink", "audit_trail")
);

// Hierarchical processing with multiple levels
Set<Connection> hierarchicalFlow = Set.of(
    new Connection("raw_input", "level1_processor"),
    new Connection("level1_processor", "level2_processor_a", "category_a"),
    new Connection("level1_processor", "level2_processor_b", "category_b"),
    new Connection("level2_processor_a", "level3_detail_processor", "detailed_a"),
    new Connection("level2_processor_a", "level3_summary_processor", "summary_a"),
    new Connection("level2_processor_b", "level3_detail_processor", "detailed_b"),
    new Connection("level2_processor_b", "level3_summary_processor", "summary_b"),
    new Connection("level3_detail_processor", "detail_sink"),
    new Connection("level3_summary_processor", "summary_sink")
);

Install with Tessl CLI

npx tessl i tessl/maven-co-cask-cdap--cdap-etl-proto

docs

artifact-resource-management.md

connection-data-flow.md

index.md

legacy-version-support.md

pipeline-configuration.md

pipeline-triggering.md

stage-plugin-management.md

tile.json