CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-co-cask-cdap--cdap-etl-proto

Protocol classes and configuration objects for ETL (Extract, Transform, Load) pipelines in CDAP

Pending
Overview
Eval results
Files

docs/stage-plugin-management.md

Stage and Plugin Management

Core components for defining individual pipeline stages and their associated plugins. Each stage represents a discrete processing step in an ETL pipeline, containing plugin configuration, validation logic, and metadata necessary for pipeline execution.

Capabilities

ETL Stage Configuration

Individual pipeline stage configuration containing plugin definition and stage metadata.

/**
 * ETL Stage Configuration representing a single processing step in a pipeline.
 *
 * <p>A stage pairs a name with the {@link ETLPlugin} that defines its
 * processing logic. This listing shows the public API surface only; method
 * bodies are omitted.
 */
public final class ETLStage {
    /**
     * Creates an ETL stage from a name and a plugin.
     *
     * @param name unique stage name within the pipeline
     * @param plugin plugin configuration for this stage
     */
    public ETLStage(String name, ETLPlugin plugin);
    
    /**
     * Returns the stage name.
     *
     * @return unique stage name
     */
    public String getName();
    
    /**
     * Returns the plugin configured for this stage.
     *
     * @return ETL plugin configuration
     */
    public ETLPlugin getPlugin();
    
    /**
     * Validates the stage configuration.
     *
     * <p>Call this before using the stage in a pipeline; the error-handling
     * examples in this document treat an empty stage name as invalid.
     *
     * @throws IllegalArgumentException if stage configuration is invalid
     */
    public void validate();
    
    /**
     * Upgrades the stage to the current version with artifact resolution.
     *
     * <p>NOTE(review): whether the receiver is modified or a separate instance
     * is returned is not shown in this listing - confirm against the source.
     *
     * @param upgradeContext context providing artifact information
     * @return upgraded stage configuration
     */
    public ETLStage upgradeStage(UpgradeContext upgradeContext);
}

Usage Examples:

import co.cask.cdap.etl.proto.v2.ETLPlugin;
import co.cask.cdap.etl.proto.v2.ETLStage;

import java.util.HashMap;
import java.util.Map;

// Create source stage: a "Table" batch source wrapped in a stage named "customers"
Map<String, String> sourceProperties = new HashMap<>();
sourceProperties.put("name", "customer_data");
sourceProperties.put("schema.row.field", "customer_id");

ETLPlugin tableSource = new ETLPlugin(
    "Table", 
    "batchsource",
    sourceProperties
);
ETLStage sourceStage = new ETLStage("customers", tableSource);

// Create transform stage with JavaScript.
// The script is passed to the plugin as a single string property; the Java
// concatenation below only splits it across lines for readability.
Map<String, String> transformProperties = new HashMap<>();
transformProperties.put("script", "function transform(input, emitter, context) {" +
                                 "  input.processed_date = new Date().toISOString();" +
                                 "  emitter.emit(input);" +
                                 "}");

ETLPlugin jsTransform = new ETLPlugin(
    "JavaScript",
    "transform", 
    transformProperties
);
ETLStage transformStage = new ETLStage("add_timestamp", jsTransform);

// Create sink stage: a "Table" batch sink wrapped in a stage named "output"
Map<String, String> sinkProperties = new HashMap<>();
sinkProperties.put("name", "processed_customers");

ETLPlugin tableSink = new ETLPlugin(
    "Table",
    "batchsink",
    sinkProperties
);
ETLStage sinkStage = new ETLStage("output", tableSink);

// Validate stages; each call throws IllegalArgumentException on an invalid configuration
sourceStage.validate();
transformStage.validate();
sinkStage.validate();

ETL Plugin Configuration

Plugin configuration within an ETL stage, containing plugin identification, properties, and artifact selection.

/**
 * Plugin Configuration defining the processing logic for an ETL stage.
 *
 * <p>A plugin is identified by a name and a type, carries string-valued
 * configuration properties, and may optionally name the artifact it should
 * be resolved from. This listing shows the public API surface only; method
 * bodies are omitted.
 */
public class ETLPlugin {
    /**
     * Creates a plugin with basic configuration and no artifact selector.
     *
     * @param name plugin name
     * @param type plugin type (e.g., "batchsource", "transform", "batchsink")
     * @param properties plugin configuration properties
     */
    public ETLPlugin(String name, String type, Map<String, String> properties);
    
    /**
     * Creates a plugin with an explicit artifact specification.
     *
     * @param name plugin name
     * @param type plugin type
     * @param properties plugin configuration properties
     * @param artifact artifact selector for plugin resolution
     */
    public ETLPlugin(String name, String type, Map<String, String> properties, ArtifactSelectorConfig artifact);
    
    /**
     * Returns the plugin name.
     *
     * @return plugin name
     */
    public String getName();
    
    /**
     * Returns the plugin type.
     *
     * @return plugin type identifier
     */
    public String getType();
    
    /**
     * Returns the plugin properties as a plain map.
     *
     * @return immutable map of plugin properties
     */
    public Map<String, String> getProperties();
    
    /**
     * Returns the plugin properties as a PluginProperties object.
     *
     * @return PluginProperties instance for CDAP plugin system
     */
    public PluginProperties getPluginProperties();
    
    /**
     * Returns the artifact configuration.
     *
     * <p>NOTE(review): presumably null when the plugin was created with the
     * three-argument constructor - confirm against the source.
     *
     * @return artifact selector configuration, may be null
     */
    public ArtifactSelectorConfig getArtifactConfig();
    
    /**
     * Validates the plugin configuration.
     *
     * @throws IllegalArgumentException if plugin configuration is invalid
     */
    public void validate();
}

Usage Examples:

import co.cask.cdap.etl.proto.ArtifactSelectorConfig;
import co.cask.cdap.etl.proto.v2.ETLPlugin;

import java.util.HashMap;
import java.util.Map;

// Basic plugin configuration (no artifact selector)
Map<String, String> basicProperties = new HashMap<>();
basicProperties.put("path", "/data/input");
basicProperties.put("format", "csv");
basicProperties.put("delimiter", ",");
basicProperties.put("skipHeader", "true");

ETLPlugin basicPlugin = new ETLPlugin(
    "File",
    "batchsource",
    basicProperties
);

// Plugin with artifact specification: the selector is passed as the fourth
// constructor argument and is used for plugin resolution
ArtifactSelectorConfig artifact = new ArtifactSelectorConfig(
    "SYSTEM", 
    "core-plugins", 
    "2.8.0"
);

Map<String, String> bigQueryProperties = new HashMap<>();
bigQueryProperties.put("project", "my-gcp-project");
bigQueryProperties.put("dataset", "analytics");
bigQueryProperties.put("table", "events");
bigQueryProperties.put("serviceFilePath", "/path/to/service-account.json");

ETLPlugin pluginWithArtifact = new ETLPlugin(
    "BigQueryTable",
    "batchsource",
    bigQueryProperties,
    artifact
);

// Complex transform plugin: the Python script is a single string property,
// so the explicit "\n" and indentation inside the literals are significant
Map<String, String> pythonProperties = new HashMap<>();
pythonProperties.put("script", "def transform(input, emitter, context):\n" +
                             "    if input['age'] >= 18:\n" +
                             "        input['category'] = 'adult'\n" +
                             "    else:\n" +
                             "        input['category'] = 'minor'\n" +
                             "    emitter.emit(input)");

ETLPlugin pythonTransform = new ETLPlugin(
    "Python",
    "transform",
    pythonProperties
);

// Validate plugins; each call throws IllegalArgumentException on an invalid configuration
basicPlugin.validate();
pluginWithArtifact.validate();
pythonTransform.validate();

// Access plugin properties (the returned map is documented as immutable)
Map<String, String> properties = basicPlugin.getProperties();
String path = properties.get("path");

Plugin Types and Common Configurations

Standard plugin types and their typical configuration patterns for different ETL operations.

Batch Source Plugins:

// "Table" plugin used as a batch source
Map<String, String> tableSourceConfig = new HashMap<>();
tableSourceConfig.put("name", "input_table");
tableSourceConfig.put("schema.row.field", "id");
ETLPlugin tableSource = new ETLPlugin(
    "Table",
    "batchsource",
    tableSourceConfig
);

// "File" plugin used as a batch source
Map<String, String> fileSourceConfig = new HashMap<>();
fileSourceConfig.put("path", "/data/input.csv");
fileSourceConfig.put("format", "csv");
ETLPlugin fileSource = new ETLPlugin(
    "File",
    "batchsource",
    fileSourceConfig
);

// "Database" plugin used as a batch source
Map<String, String> dbSourceConfig = new HashMap<>();
dbSourceConfig.put("connectionString", "jdbc:mysql://localhost:3306/db");
dbSourceConfig.put("tableName", "users");
dbSourceConfig.put("user", "admin");
ETLPlugin dbSource = new ETLPlugin(
    "Database",
    "batchsource",
    dbSourceConfig
);

Transform Plugins:

// "JavaScript" transform: the script is supplied as a string property
Map<String, String> scriptConfig = new HashMap<>();
scriptConfig.put("script", "function transform(input, emitter, context) { /* logic */ }");
ETLPlugin jsTransform = new ETLPlugin(
    "JavaScript",
    "transform",
    scriptConfig
);

// "Projection" transform
Map<String, String> projectionConfig = new HashMap<>();
projectionConfig.put("fieldsToKeep", "id,name,email");
projectionConfig.put("fieldsToRename", "old_name:new_name");
ETLPlugin projection = new ETLPlugin(
    "Projection",
    "transform",
    projectionConfig
);

// "Validator" transform
Map<String, String> validatorConfig = new HashMap<>();
validatorConfig.put("validators", "email:email,age:range[0,120]");
ETLPlugin validator = new ETLPlugin(
    "Validator",
    "transform",
    validatorConfig
);

Batch Sink Plugins:

// "Table" plugin used as a batch sink
Map<String, String> tableSinkConfig = new HashMap<>();
tableSinkConfig.put("name", "output_table");
tableSinkConfig.put("schema.row.field", "id");
ETLPlugin tableSink = new ETLPlugin(
    "Table",
    "batchsink",
    tableSinkConfig
);

// "File" plugin used as a batch sink
Map<String, String> fileSinkConfig = new HashMap<>();
fileSinkConfig.put("path", "/data/output");
fileSinkConfig.put("format", "parquet");
ETLPlugin fileSink = new ETLPlugin(
    "File",
    "batchsink",
    fileSinkConfig
);

// "Database" plugin used as a batch sink
Map<String, String> dbSinkConfig = new HashMap<>();
dbSinkConfig.put("connectionString", "jdbc:postgresql://localhost:5432/warehouse");
dbSinkConfig.put("tableName", "processed_data");
ETLPlugin dbSink = new ETLPlugin(
    "Database",
    "batchsink",
    dbSinkConfig
);

Stage Validation and Error Handling

Comprehensive validation logic ensuring stage and plugin configurations are correct and complete.

/**
 * Validation methods for stage and plugin configurations.
 *
 * <p>This listing shows signatures only; method bodies are omitted.
 * NOTE(review): the failure mode is not declared on these methods -
 * presumably IllegalArgumentException, matching the validate() methods
 * on ETLStage and ETLPlugin; confirm against the source.
 */
public class ValidationUtils {
    /**
     * Common validation patterns for stage names:
     * - Must not be null or empty
     * - Must be unique within pipeline
     * - Should follow naming conventions
     *
     * NOTE(review): uniqueness within a pipeline cannot be determined from a
     * single name argument - confirm where that rule is enforced.
     *
     * @param name stage name to validate
     */
    public static void validateStageName(String name);
    
    /**
     * Common validation patterns for plugin configurations:
     * - Plugin name and type must be specified
     * - Required properties must be present
     * - Property values must meet format requirements
     *
     * @param plugin plugin configuration to validate
     */
    public static void validatePlugin(ETLPlugin plugin);
}

Error Handling Examples:

// The snippets below assume that `plugin` (an ETLPlugin), `stage` (an ETLStage)
// and `logger` (a parameterized-message logger) are already in scope.

try {
    // Invalid stage - missing name
    ETLStage invalidStage = new ETLStage("", plugin);
    invalidStage.validate();
} catch (IllegalArgumentException e) {
    // Handle validation error
    System.err.println("Stage validation failed: " + e.getMessage());
}

try {
    // Invalid plugin - missing required properties
    // NOTE(review): whether validate() checks plugin-specific required
    // properties (as opposed to only name/type) is not shown here - confirm.
    ETLPlugin invalidPlugin = new ETLPlugin("Database", "batchsource", new HashMap<>());
    invalidPlugin.validate();
} catch (IllegalArgumentException e) {
    // Handle plugin validation error
    System.err.println("Plugin validation failed: " + e.getMessage());
}

// Best practice: validate before using in pipeline.
// Call validate() unconditionally: as the first example shows, it already
// rejects a missing name. Pre-checking the name and skipping validation when
// it is empty would let exactly those invalid stages pass through unreported.
try {
    stage.validate();
    // Safe to use stage in pipeline
} catch (IllegalArgumentException e) {
    // Log error and handle gracefully
    logger.error("Stage validation failed for {}: {}", stage.getName(), e.getMessage());
}

Install with Tessl CLI

npx tessl i tessl/maven-co-cask-cdap--cdap-etl-proto

docs

artifact-resource-management.md

connection-data-flow.md

index.md

legacy-version-support.md

pipeline-configuration.md

pipeline-triggering.md

stage-plugin-management.md

tile.json