Protocol classes and configuration objects for ETL (Extract, Transform, Load) pipelines in CDAP
—
Core components for defining individual pipeline stages and their associated plugins. Each stage represents a discrete processing step in an ETL pipeline, containing plugin configuration, validation logic, and metadata necessary for pipeline execution.
Individual pipeline stage configuration containing plugin definition and stage metadata.
/**
* ETL Stage Configuration representing a single processing step in a pipeline
*/
public final class ETLStage {
/**
* Create ETL stage with name and plugin
* @param name unique stage name within the pipeline
* @param plugin plugin configuration for this stage
*/
public ETLStage(String name, ETLPlugin plugin);
/**
* Get stage name
* @return unique stage name
*/
public String getName();
/**
* Get plugin configuration
* @return ETL plugin configuration
*/
public ETLPlugin getPlugin();
/**
* Validate stage configuration
* @throws IllegalArgumentException if stage configuration is invalid
*/
public void validate();
/**
* Upgrade stage to current version with artifact resolution
* @param upgradeContext context providing artifact information
* @return upgraded stage configuration
*/
public ETLStage upgradeStage(UpgradeContext upgradeContext);
}

Usage Examples:
import co.cask.cdap.etl.proto.v2.*;
// Create source stage
Map<String, String> sourceProperties = new HashMap<>();
sourceProperties.put("name", "customer_data");
sourceProperties.put("schema.row.field", "customer_id");
ETLPlugin tableSource = new ETLPlugin(
"Table",
"batchsource",
sourceProperties
);
ETLStage sourceStage = new ETLStage("customers", tableSource);
// Create transform stage with JavaScript
Map<String, String> transformProperties = new HashMap<>();
transformProperties.put("script", "function transform(input, emitter, context) {" +
" input.processed_date = new Date().toISOString();" +
" emitter.emit(input);" +
"}");
ETLPlugin jsTransform = new ETLPlugin(
"JavaScript",
"transform",
transformProperties
);
ETLStage transformStage = new ETLStage("add_timestamp", jsTransform);
// Create sink stage
Map<String, String> sinkProperties = new HashMap<>();
sinkProperties.put("name", "processed_customers");
ETLPlugin tableSink = new ETLPlugin(
"Table",
"batchsink",
sinkProperties
);
ETLStage sinkStage = new ETLStage("output", tableSink);
// Validate stages
sourceStage.validate();
transformStage.validate();
sinkStage.validate();

Plugin configuration within an ETL stage, containing plugin identification, properties, and artifact selection.
/**
* Plugin Configuration defining the processing logic for an ETL stage
*/
public class ETLPlugin {
/**
* Create plugin with basic configuration
* @param name plugin name
* @param type plugin type (e.g., "batchsource", "transform", "batchsink")
* @param properties plugin configuration properties
*/
public ETLPlugin(String name, String type, Map<String, String> properties);
/**
* Create plugin with artifact specification
* @param name plugin name
* @param type plugin type
* @param properties plugin configuration properties
* @param artifact artifact selector for plugin resolution
*/
public ETLPlugin(String name, String type, Map<String, String> properties, ArtifactSelectorConfig artifact);
/**
* Get plugin name
* @return plugin name
*/
public String getName();
/**
* Get plugin type
* @return plugin type identifier
*/
public String getType();
/**
* Get plugin properties
* @return immutable map of plugin properties
*/
public Map<String, String> getProperties();
/**
* Get plugin properties as PluginProperties object
* @return PluginProperties instance for CDAP plugin system
*/
public PluginProperties getPluginProperties();
/**
* Get artifact configuration
* @return artifact selector configuration, may be null
*/
public ArtifactSelectorConfig getArtifactConfig();
/**
* Validate plugin configuration
* @throws IllegalArgumentException if plugin configuration is invalid
*/
public void validate();
}

Usage Examples:
import co.cask.cdap.etl.proto.v2.ETLPlugin;
import co.cask.cdap.etl.proto.ArtifactSelectorConfig;
// Basic plugin configuration
Map<String, String> basicProperties = new HashMap<>();
basicProperties.put("path", "/data/input");
basicProperties.put("format", "csv");
basicProperties.put("delimiter", ",");
basicProperties.put("skipHeader", "true");
ETLPlugin basicPlugin = new ETLPlugin(
"File",
"batchsource",
basicProperties
);
// Plugin with artifact specification
ArtifactSelectorConfig artifact = new ArtifactSelectorConfig(
"SYSTEM",
"core-plugins",
"2.8.0"
);
Map<String, String> bigQueryProperties = new HashMap<>();
bigQueryProperties.put("project", "my-gcp-project");
bigQueryProperties.put("dataset", "analytics");
bigQueryProperties.put("table", "events");
bigQueryProperties.put("serviceFilePath", "/path/to/service-account.json");
ETLPlugin pluginWithArtifact = new ETLPlugin(
"BigQueryTable",
"batchsource",
bigQueryProperties,
artifact
);
// Complex transform plugin
Map<String, String> pythonProperties = new HashMap<>();
pythonProperties.put("script", "def transform(input, emitter, context):\n" +
" if input['age'] >= 18:\n" +
" input['category'] = 'adult'\n" +
" else:\n" +
" input['category'] = 'minor'\n" +
" emitter.emit(input)");
ETLPlugin pythonTransform = new ETLPlugin(
"Python",
"transform",
pythonProperties
);
// Validate plugins
basicPlugin.validate();
pluginWithArtifact.validate();
pythonTransform.validate();
// Access plugin properties
Map<String, String> properties = basicPlugin.getProperties();
String path = properties.get("path");

Standard plugin types and their typical configuration patterns for different ETL operations.
Batch Source Plugins:
// Table source
Map<String, String> tableProps = new HashMap<>();
tableProps.put("name", "input_table");
tableProps.put("schema.row.field", "id");
ETLPlugin tableSource = new ETLPlugin("Table", "batchsource", tableProps);
// File source
Map<String, String> fileProps = new HashMap<>();
fileProps.put("path", "/data/input.csv");
fileProps.put("format", "csv");
ETLPlugin fileSource = new ETLPlugin("File", "batchsource", fileProps);
// Database source
Map<String, String> dbProps = new HashMap<>();
dbProps.put("connectionString", "jdbc:mysql://localhost:3306/db");
dbProps.put("tableName", "users");
dbProps.put("user", "admin");
ETLPlugin dbSource = new ETLPlugin("Database", "batchsource", dbProps);

Transform Plugins:
// JavaScript transform
Map<String, String> jsProps = new HashMap<>();
jsProps.put("script", "function transform(input, emitter, context) { /* logic */ }");
ETLPlugin jsTransform = new ETLPlugin("JavaScript", "transform", jsProps);
// Projection transform
Map<String, String> projProps = new HashMap<>();
projProps.put("fieldsToKeep", "id,name,email");
projProps.put("fieldsToRename", "old_name:new_name");
ETLPlugin projection = new ETLPlugin("Projection", "transform", projProps);
// Validator transform
Map<String, String> validProps = new HashMap<>();
validProps.put("validators", "email:email,age:range[0,120]");
ETLPlugin validator = new ETLPlugin("Validator", "transform", validProps);

Batch Sink Plugins:
// Table sink
Map<String, String> tableSinkProps = new HashMap<>();
tableSinkProps.put("name", "output_table");
tableSinkProps.put("schema.row.field", "id");
ETLPlugin tableSink = new ETLPlugin("Table", "batchsink", tableSinkProps);
// File sink
Map<String, String> fileSinkProps = new HashMap<>();
fileSinkProps.put("path", "/data/output");
fileSinkProps.put("format", "parquet");
ETLPlugin fileSink = new ETLPlugin("File", "batchsink", fileSinkProps);
// Database sink
Map<String, String> dbSinkProps = new HashMap<>();
dbSinkProps.put("connectionString", "jdbc:postgresql://localhost:5432/warehouse");
dbSinkProps.put("tableName", "processed_data");
ETLPlugin dbSink = new ETLPlugin("Database", "batchsink", dbSinkProps);

Comprehensive validation logic ensuring stage and plugin configurations are correct and complete.
/**
* Validation methods for stage and plugin configurations
*/
public class ValidationUtils {
/**
* Common validation patterns for stage names
* - Must not be null or empty
* - Must be unique within pipeline
* - Should follow naming conventions
*/
public static void validateStageName(String name);
/**
* Common validation patterns for plugin configurations
* - Plugin name and type must be specified
* - Required properties must be present
* - Property values must meet format requirements
*/
public static void validatePlugin(ETLPlugin plugin);
}

Error Handling Examples:
try {
// Invalid stage - missing name
ETLStage invalidStage = new ETLStage("", plugin);
invalidStage.validate();
} catch (IllegalArgumentException e) {
// Handle validation error
System.err.println("Stage validation failed: " + e.getMessage());
}
try {
// Invalid plugin - missing required properties
ETLPlugin invalidPlugin = new ETLPlugin("Database", "batchsource", new HashMap<>());
invalidPlugin.validate();
} catch (IllegalArgumentException e) {
// Handle plugin validation error
System.err.println("Plugin validation failed: " + e.getMessage());
}
// Best practice: validate before using in pipeline
if (stage.getName() != null && !stage.getName().isEmpty()) {
try {
stage.validate();
// Safe to use stage in pipeline
} catch (IllegalArgumentException e) {
// Log error and handle gracefully
logger.error("Stage validation failed for {}: {}", stage.getName(), e.getMessage());
}
}

Install with Tessl CLI
npx tessl i tessl/maven-co-cask-cdap--cdap-etl-proto