CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-co-cask-cdap--cdap-etl-proto

Protocol classes and configuration objects for ETL (Extract, Transform, Load) pipelines in CDAP

Pending
Overview
Eval results
Files

docs/pipeline-triggering.md

Pipeline Triggering and Property Mapping

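Pipeline triggering with property mapping between a triggering pipeline and the pipeline it triggers. Two kinds of mapping are supported:

- **Argument mappings** pass an argument of the triggering pipeline to an argument of the triggered pipeline.
- **Plugin property mappings** pass a plugin property of a stage in the triggering pipeline to an argument of the triggered pipeline.

Together they let one pipeline hand parameters to the next when orchestrating multi-pipeline workflows.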

Capabilities

Triggering Property Mapping

Container that groups the argument mappings and plugin property mappings used when one pipeline triggers another.

/**
 * Container for the property mappings applied when one pipeline (the triggering pipeline)
 * triggers another (the triggered pipeline).
 *
 * <p>Holds two independent lists:
 * <ul>
 *   <li>{@link ArgumentMapping}s: argument of the triggering pipeline to argument of the
 *       triggered pipeline</li>
 *   <li>{@link PluginPropertyMapping}s: plugin property of a stage in the triggering pipeline to
 *       argument of the triggered pipeline</li>
 * </ul>
 *
 * <p>This listing shows signatures only; method bodies are omitted.
 */
public class TriggeringPropertyMapping {
    /**
     * Creates a mapping that contains no argument mappings and no plugin property mappings.
     */
    public TriggeringPropertyMapping();
    
    /**
     * Creates a mapping from the given argument mappings and plugin property mappings.
     * NOTE(review): whether {@code null} lists are accepted (e.g. treated as empty) is not shown
     * here; confirm against the implementation.
     * @param arguments argument mappings from triggering-pipeline arguments to triggered-pipeline
     *                  arguments
     * @param pluginProperties mappings from triggering-pipeline plugin properties to
     *                         triggered-pipeline arguments
     */
    public TriggeringPropertyMapping(List<ArgumentMapping> arguments, List<PluginPropertyMapping> pluginProperties);
    
    /**
     * Returns the mappings from triggering-pipeline arguments to triggered-pipeline arguments.
     * @return immutable list of argument mappings
     */
    public List<ArgumentMapping> getArguments();
    
    /**
     * Returns the mappings from triggering-pipeline plugin properties to triggered-pipeline
     * arguments.
     * @return immutable list of plugin property mappings
     */
    public List<PluginPropertyMapping> getPluginProperties();
}

Usage Examples:

import co.cask.cdap.etl.proto.v2.*;
import java.util.List;

// Simple argument mapping between pipelines:
// each entry is (triggering-pipeline argument, triggered-pipeline argument)
List<ArgumentMapping> argumentMappings = List.of(
    new ArgumentMapping("source_table", "input_table"),
    new ArgumentMapping("processing_date", "batch_date"),
    new ArgumentMapping("output_format", "target_format")
);

// Plugin property mapping from triggering pipeline to triggered arguments:
// each entry is (stage name, plugin property of that stage, triggered-pipeline argument)
List<PluginPropertyMapping> pluginPropertyMappings = List.of(
    new PluginPropertyMapping("file_source", "path", "input_path"),
    new PluginPropertyMapping("database_sink", "tableName", "target_table"),
    new PluginPropertyMapping("validator", "threshold", "validation_threshold")
);

// Create comprehensive property mapping
TriggeringPropertyMapping propertyMapping = new TriggeringPropertyMapping(
    argumentMappings,
    pluginPropertyMappings
);

// Use in pipeline triggering configuration
System.out.println("Argument mappings: " + propertyMapping.getArguments().size());
System.out.println("Plugin property mappings: " + propertyMapping.getPluginProperties().size());

Argument Mapping

Maps a single argument of the triggering pipeline to an argument of the triggered pipeline.

/**
 * Maps one argument of the triggering pipeline to one argument of the triggered pipeline.
 *
 * <p>Both the source and target names may be {@code null}, so callers should check before
 * using them. This listing shows signatures only; method bodies are omitted.
 */
public class ArgumentMapping {
    /**
     * Creates a mapping from a triggering-pipeline argument to a triggered-pipeline argument.
     * @param source name of the argument in the triggering pipeline, may be null
     * @param target name of the argument in the triggered pipeline, may be null
     */
    public ArgumentMapping(String source, String target);
    
    /**
     * Returns the name of the argument in the triggering pipeline.
     * @return triggering pipeline argument name, may be null
     */
    public String getSource();
    
    /**
     * Returns the name of the argument in the triggered pipeline.
     * @return triggered pipeline argument name, may be null
     */
    public String getTarget();
}

Usage Examples:

import co.cask.cdap.etl.proto.v2.ArgumentMapping;
import java.util.List;

// Direct argument mappings for common parameters:
// each is (triggering-pipeline argument, triggered-pipeline argument)
ArgumentMapping tableMapping = new ArgumentMapping("input_table", "source_table");
ArgumentMapping dateMapping = new ArgumentMapping("process_date", "batch_date");
ArgumentMapping formatMapping = new ArgumentMapping("output_format", "sink_format");

// Complex mapping scenarios
List<ArgumentMapping> complexMappings = List.of(
    // Map single source to multiple targets (handled at pipeline level)
    new ArgumentMapping("master_config", "worker_config_1"),
    new ArgumentMapping("master_config", "worker_config_2"),

    // Environment-specific mappings.
    // NOTE: mapped values become arguments of the triggered pipeline and may be visible to
    // anyone who can inspect its runs; take care before passing secrets such as API keys.
    new ArgumentMapping("prod_database_url", "db_connection_string"),
    new ArgumentMapping("prod_api_key", "external_service_key"),

    // Data lineage tracking
    new ArgumentMapping("upstream_batch_id", "source_batch_id"),
    new ArgumentMapping("pipeline_run_id", "parent_run_id")
);

// Conditional argument mapping based on triggering context.
// Compare against the literal: System.getenv returns null for an unset variable, so calling
// .equals on its result would throw a NullPointerException. An unset ENVIRONMENT falls back
// to "dev_source".
String sourceArg = "dynamic_source";
String targetArg = "prod".equals(System.getenv("ENVIRONMENT")) ? "prod_source" : "dev_source";
ArgumentMapping conditionalMapping = new ArgumentMapping(sourceArg, targetArg);

// Validation example: source and target are both documented as nullable
for (ArgumentMapping mapping : complexMappings) {
    if (mapping.getSource() == null || mapping.getTarget() == null) {
        System.out.println("Warning: Incomplete mapping - " + mapping);
    }
}

Plugin Property Mapping

Maps a plugin property of a named stage in the triggering pipeline to an argument of the triggered pipeline. This lets values from a stage's configuration be passed to the triggered pipeline.

/**
 * Maps a plugin property of one stage in the triggering pipeline to an argument of the
 * triggered pipeline.
 *
 * <p>Extends {@link ArgumentMapping}. Here {@code getSource()} names a plugin property of the
 * stage returned by {@link #getStageName()}, not a pipeline argument. {@code getTarget()} names
 * the triggered-pipeline argument that receives the property value. This listing shows
 * signatures only; method bodies are omitted.
 */
public class PluginPropertyMapping extends ArgumentMapping {
    /**
     * Creates a plugin property mapping.
     * NOTE(review): getStageName() is documented as possibly null, so a null stage name is
     * presumably accepted here; confirm against the implementation.
     * @param stageName name of the stage in the triggering pipeline containing the plugin property
     * @param source name of the plugin property in the specified stage
     * @param target name of the triggered pipeline argument to receive the property value
     */
    public PluginPropertyMapping(String stageName, String source, String target);
    
    /**
     * Returns the name of the triggering-pipeline stage that holds the plugin property.
     * @return stage name in triggering pipeline, may be null
     */
    public String getStageName();
    
    // Inherits getSource() (here: the plugin property name) and getTarget()
    // (the triggered pipeline argument name) from ArgumentMapping
}

Usage Examples:

import co.cask.cdap.etl.proto.v2.PluginPropertyMapping;
import java.util.List;

// Extract file paths from source stages
PluginPropertyMapping filePathMapping = new PluginPropertyMapping(
    "file_reader",           // stage name
    "path",                 // plugin property
    "input_file_path"       // target argument
);

// Extract database connection details
PluginPropertyMapping dbConnectionMapping = new PluginPropertyMapping(
    "database_source",
    "connectionString",
    "downstream_db_url"
);

// Extract processing parameters
PluginPropertyMapping thresholdMapping = new PluginPropertyMapping(
    "data_validator",
    "validation_threshold",
    "quality_threshold"
);

// Complex plugin property extraction:
// each entry is (stage name, plugin property of that stage, triggered-pipeline argument)
List<PluginPropertyMapping> extractionMappings = List.of(
    // Extract source configuration
    new PluginPropertyMapping("kafka_source", "brokers", "kafka_brokers"),
    new PluginPropertyMapping("kafka_source", "topic", "source_topic"),

    // Extract transformation parameters
    new PluginPropertyMapping("aggregator", "window_size", "agg_window"),
    new PluginPropertyMapping("aggregator", "grouping_fields", "group_by_fields"),

    // Extract sink configuration
    new PluginPropertyMapping("table_sink", "name", "output_table"),
    new PluginPropertyMapping("table_sink", "schema.row.field", "key_field")
);

// Validation and usage: getSource() is the plugin property, getTarget() the triggered argument
for (PluginPropertyMapping mapping : extractionMappings) {
    System.out.printf("Stage: %s, Property: %s -> Argument: %s%n",
        mapping.getStageName(), mapping.getSource(), mapping.getTarget());
}

Pipeline Orchestration Patterns

Common patterns for orchestrating multiple pipelines with property mapping and triggering.

Sequential Pipeline Chain:

// Stage 1: master pipeline -> data processing pipeline.
// Forward the run date and source system, plus where and in what format the file source read data.
List<ArgumentMapping> masterArgs = List.of(
    new ArgumentMapping("data_date", "process_date"),
    new ArgumentMapping("source_system", "input_system")
);
List<PluginPropertyMapping> masterPluginProps = List.of(
    new PluginPropertyMapping("file_source", "directory", "input_directory"),
    new PluginPropertyMapping("file_source", "format", "file_format")
);
TriggeringPropertyMapping masterToProcessor =
    new TriggeringPropertyMapping(masterArgs, masterPluginProps);

// Stage 2: processing pipeline -> aggregation pipeline.
// Carry the processing date and record count forward, plus the output table name and quality pass rate.
List<ArgumentMapping> processorArgs = List.of(
    new ArgumentMapping("process_date", "aggregation_date"),
    new ArgumentMapping("processed_records", "input_count")
);
List<PluginPropertyMapping> processorPluginProps = List.of(
    new PluginPropertyMapping("output_table", "name", "source_table"),
    new PluginPropertyMapping("quality_checker", "pass_rate", "quality_threshold")
);
TriggeringPropertyMapping processorToAggregator =
    new TriggeringPropertyMapping(processorArgs, processorPluginProps);

Fan-out Pipeline Pattern:

// Fan-out: one triggering pipeline feeding several specialized processors.

// Run metadata shared by all triggered pipelines
List<ArgumentMapping> fanOutArgs = List.of(
    new ArgumentMapping("master_batch_id", "parent_batch_id"),
    new ArgumentMapping("processing_timestamp", "start_time")
);

// One output path per dataset, read from the splitter stage's plugin properties
List<PluginPropertyMapping> fanOutPluginProps = List.of(
    new PluginPropertyMapping("data_splitter", "customer_output_path", "customer_data_path"),
    new PluginPropertyMapping("data_splitter", "order_output_path", "order_data_path"),
    new PluginPropertyMapping("data_splitter", "product_output_path", "product_data_path")
);

TriggeringPropertyMapping fanOutMapping =
    new TriggeringPropertyMapping(fanOutArgs, fanOutPluginProps);

Dynamic Configuration Pattern:

// Environment-aware mapping: forward the environment name and scaling factor as arguments...
List<ArgumentMapping> envArgs = List.of(
    new ArgumentMapping("environment", "target_env"),
    new ArgumentMapping("scaling_factor", "parallelism_level")
);

// ...plus environment-specific endpoints and resource settings taken from stage plugin properties
List<PluginPropertyMapping> envPluginProps = List.of(
    new PluginPropertyMapping("env_config", "database_url", "target_database"),
    new PluginPropertyMapping("env_config", "api_endpoint", "service_url"),
    new PluginPropertyMapping("resource_manager", "memory_allocation", "worker_memory"),
    new PluginPropertyMapping("resource_manager", "cpu_cores", "worker_cores")
);

TriggeringPropertyMapping dynamicMapping = new TriggeringPropertyMapping(envArgs, envPluginProps);

Advanced Triggering Scenarios

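More elaborate mapping configurations for quality-gated, error-recovery, and multi-tenant triggering. The mapping objects only declare which values are passed along. Any conditional logic or error handling must be implemented by the pipelines themselves.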

// Data-quality hand-off: pass quality metrics and thresholds to the triggered pipeline.
// The mapping itself is unconditional; any go/no-go decision must be made by the triggered pipeline.
List<ArgumentMapping> qualityArgs = List.of(
    new ArgumentMapping("data_quality_score", "input_quality"),
    new ArgumentMapping("record_count", "input_volume")
);
List<PluginPropertyMapping> qualityPluginProps = List.of(
    new PluginPropertyMapping("quality_validator", "pass_threshold", "minimum_quality"),
    new PluginPropertyMapping("quality_validator", "error_rate", "max_error_rate"),
    new PluginPropertyMapping("record_counter", "total_records", "expected_count")
);
TriggeringPropertyMapping qualityBasedTriggering =
    new TriggeringPropertyMapping(qualityArgs, qualityPluginProps);

// Error recovery: tell the recovery pipeline which batch failed, when, and on which attempt...
List<ArgumentMapping> recoveryArgs = List.of(
    new ArgumentMapping("failed_batch_id", "recovery_batch_id"),
    new ArgumentMapping("error_timestamp", "failure_time"),
    new ArgumentMapping("retry_attempt", "attempt_number")
);
// ...and, from stage plugin properties, why it failed, what was affected, and where to resume
List<PluginPropertyMapping> recoveryPluginProps = List.of(
    new PluginPropertyMapping("error_analyzer", "failure_reason", "error_category"),
    new PluginPropertyMapping("error_analyzer", "affected_records", "recovery_scope"),
    new PluginPropertyMapping("checkpoint_manager", "last_success_point", "resume_from")
);
TriggeringPropertyMapping errorRecoveryMapping =
    new TriggeringPropertyMapping(recoveryArgs, recoveryPluginProps);

// Multi-tenant triggering: forward the tenant identity and processing rules...
List<ArgumentMapping> tenantArgs = List.of(
    new ArgumentMapping("tenant_id", "target_tenant"),
    new ArgumentMapping("tenant_config", "processing_rules")
);
// ...plus tenant-specific schema, quotas, and credentials from stage plugin properties.
// NOTE: "access_token" becomes an argument of the triggered pipeline and may be visible to anyone
// who can inspect its runs; consider referencing a secure store entry instead of the raw token.
List<PluginPropertyMapping> tenantPluginProps = List.of(
    new PluginPropertyMapping("tenant_resolver", "database_schema", "tenant_schema"),
    new PluginPropertyMapping("tenant_resolver", "resource_limits", "tenant_quotas"),
    new PluginPropertyMapping("security_manager", "access_token", "tenant_credentials")
);
TriggeringPropertyMapping multiTenantMapping =
    new TriggeringPropertyMapping(tenantArgs, tenantPluginProps);

Install with Tessl CLI

npx tessl i tessl/maven-co-cask-cdap--cdap-etl-proto

docs

artifact-resource-management.md

connection-data-flow.md

index.md

legacy-version-support.md

pipeline-configuration.md

pipeline-triggering.md

stage-plugin-management.md

tile.json