Protocol classes and configuration objects for ETL (Extract, Transform, Load) pipelines in CDAP.

This package handles configuration management for plugin artifacts and pipeline resource allocation: selecting the artifact that provides each plugin, specifying resources for the different pipeline components, and optimizing compute resources across ETL pipeline execution.

ArtifactSelectorConfig: configuration for selecting and resolving plugin artifacts during pipeline execution.
/**
 * Configuration object that identifies which plugin artifact (by scope, name,
 * and version) should be used to resolve a plugin during ETL pipeline
 * execution. Any accessor may return null, indicating no specific requirement
 * for that field (see the no-arg constructor).
 */
public class ArtifactSelectorConfig {
/**
 * Create a default artifact selector with no specific requirements
 * (all fields null).
 */
public ArtifactSelectorConfig();
/**
 * Create an artifact selector constrained to a specific scope, name, and version.
 * @param scope artifact scope, e.g. "SYSTEM" (built-in) or "USER" (deployed)
 * @param name artifact name
 * @param version artifact version string
 */
public ArtifactSelectorConfig(String scope, String name, String version);
/**
 * Get the artifact scope constraint.
 * @return artifact scope identifier, may be null (no scope requirement)
 */
public String getScope();
/**
 * Get the artifact name constraint.
 * @return artifact name, may be null (no name requirement)
 */
public String getName();
/**
 * Get the artifact version constraint.
 * @return artifact version string, may be null (no version requirement)
 */
public String getVersion();
}Usage Examples:
import co.cask.cdap.etl.proto.ArtifactSelectorConfig;
// System artifact selection (built-in plugins)
ArtifactSelectorConfig systemArtifact = new ArtifactSelectorConfig(
    "SYSTEM",
    "core-plugins",
    "2.8.0"
);
// User artifact selection (custom plugins)
ArtifactSelectorConfig userArtifact = new ArtifactSelectorConfig(
    "USER",
    "custom-analytics-plugins",
    "1.2.0"
);
// Default artifact selection (no specific requirements)
ArtifactSelectorConfig defaultArtifact = new ArtifactSelectorConfig();
// Use with plugin configuration
Map<String, String> pluginProps = new HashMap<>();
pluginProps.put("algorithm", "advanced");
pluginProps.put("threshold", "0.85");
ETLPlugin pluginWithArtifact = new ETLPlugin(
    "CustomTransform",
    "transform",
    pluginProps,
    userArtifact
);
// Environment-specific artifact selection.
// NOTE: compare constant-first ("prod".equals(environment)) so that an unset
// CDAP_ENV (getenv returns null) falls through to the snapshot version
// instead of throwing a NullPointerException.
String environment = System.getenv("CDAP_ENV");
ArtifactSelectorConfig envSpecificArtifact = new ArtifactSelectorConfig(
    "SYSTEM",
    "core-plugins",
    "prod".equals(environment) ? "2.8.0" : "2.9.0-SNAPSHOT"
);
Resource specification and allocation for different components of ETL pipeline execution.
/**
 * Resource allocation (memory in MB and virtual CPU cores) for a pipeline
 * component such as the executors, the driver, or the client.
 */
public class Resources {
/**
 * Create a resource configuration with explicit memory and CPU allocations.
 * @param memoryMB memory allocation in megabytes
 * @param virtualCores number of virtual CPU cores
 */
public Resources(int memoryMB, int virtualCores);
/**
 * Create the default resource configuration: 1024 MB of memory, 1 virtual core.
 */
public Resources();
/**
 * Get the configured memory allocation.
 * @return memory in megabytes
 */
public int getMemoryMB();
/**
 * Get the configured CPU allocation.
 * @return number of virtual cores
 */
public int getVirtualCores();
}Usage Examples:
import co.cask.cdap.api.Resources;
// Typical fixed-size configurations (memoryMB, virtualCores)
Resources smallPipeline = new Resources(1024, 1);    // 1 GB / 1 core
Resources mediumPipeline = new Resources(4096, 4);   // 4 GB / 4 cores
Resources largePipeline = new Resources(16384, 16);  // 16 GB / 16 cores
// Default allocation: 1 GB of memory and a single core
Resources defaultResources = new Resources();
// Per-component allocations: executors do the heavy lifting, the driver
// coordinates, and the client only submits the pipeline.
Resources driverResources = new Resources(2048, 2);
Resources executorResources = new Resources(8192, 8);
Resources clientResources = new Resources(512, 1);
// Wire the three resource tiers into a batch pipeline configuration
ETLBatchConfig resourceOptimizedConfig = ETLBatchConfig.builder()
    .setResources(executorResources)         // main execution resources
    .setDriverResources(driverResources)     // driver resources
    .setClientResources(clientResources)     // client resources
    .addStage(sourceStage)
    .addStage(transformStage)
    .addStage(sinkStage)
    .build();
/**
 * Pick a resource tier from the number of input records.
 * Tiers: under 1M records -> 2 GB / 2 cores; 1M up to 10M -> 8 GB / 8 cores;
 * 10M or more -> 32 GB / 32 cores.
 */
public Resources calculateResourcesForDataVolume(long recordCount) {
    if (recordCount >= 10_000_000) {
        return new Resources(32768, 32);
    }
    return recordCount >= 1_000_000
        ? new Resources(8192, 8)
        : new Resources(2048, 2);
}
Common patterns for artifact selection and management across different deployment scenarios.
Environment-based Artifact Selection:
// Development environment with latest snapshots
public class DevelopmentArtifactSelector {
    /**
     * Artifact selector for the development build of the core plugins.
     * @return selector pinned to the SYSTEM "core-plugins" 3.0.0-SNAPSHOT artifact
     */
    public static ArtifactSelectorConfig getCorePlugins() {
        return new ArtifactSelectorConfig("SYSTEM", "core-plugins", "3.0.0-SNAPSHOT");
    }
    /**
     * Misspelled legacy alias of {@link #getCorePlugins()}, kept so existing
     * callers keep compiling.
     * @deprecated use {@link #getCorePlugins()} instead
     */
    @Deprecated
    public static ArtifactSelectorConfig getCorePluins() {
        return getCorePlugins();
    }
    /**
     * Artifact selector for user-deployed development plugins.
     * @return selector for the USER "dev-plugins" artifact at version "latest"
     */
    public static ArtifactSelectorConfig getCustomPlugins() {
        return new ArtifactSelectorConfig("USER", "dev-plugins", "latest");
    }
}
// Production environment with stable versions
public class ProductionArtifactSelector {
    // Pinned, stable artifact versions used in production.
    private static final String CORE_VERSION = "2.8.0";
    private static final String ANALYTICS_VERSION = "1.5.2";

    /** @return selector for the stable SYSTEM "core-plugins" artifact */
    public static ArtifactSelectorConfig getCorePlugins() {
        return new ArtifactSelectorConfig("SYSTEM", "core-plugins", CORE_VERSION);
    }

    /** @return selector for the stable USER "analytics-plugins" artifact */
    public static ArtifactSelectorConfig getCustomPlugins() {
        return new ArtifactSelectorConfig("USER", "analytics-plugins", ANALYTICS_VERSION);
    }
}
// Conditional artifact selection
/**
 * Choose the core-plugins artifact appropriate for the given environment.
 * Unknown or null environments fall back to the development selector.
 * @param pluginName plugin being configured (currently unused; reserved for
 *        per-plugin overrides)
 * @param environment deployment environment name ("prod", "staging", "dev", ...)
 * @return artifact selector for the environment
 */
public ArtifactSelectorConfig selectArtifact(String pluginName, String environment) {
    if (environment == null) {
        // Treat a missing environment like "dev" instead of throwing NPE.
        return DevelopmentArtifactSelector.getCorePluins();
    }
    switch (environment.toLowerCase()) {
        case "prod":
            return ProductionArtifactSelector.getCorePlugins();
        case "staging":
            return new ArtifactSelectorConfig("SYSTEM", "core-plugins", "2.8.1-RC1");
        case "dev":
        default:
            return DevelopmentArtifactSelector.getCorePluins();
    }
}
Plugin-specific Artifact Selection:
// Artifact selection based on plugin capabilities
public class PluginArtifactResolver {
    // Known plugin-name -> artifact mappings; anything not listed resolves
    // to a default (unconstrained) selector.
    private static final Map<String, ArtifactSelectorConfig> PLUGIN_ARTIFACTS = new HashMap<>();
    static {
        // Core plugins
        register("Table", "SYSTEM", "core-plugins", "2.8.0");
        register("File", "SYSTEM", "core-plugins", "2.8.0");
        register("Database", "SYSTEM", "database-plugins", "2.8.0");
        // Advanced analytics plugins
        register("MLTransform", "USER", "ml-plugins", "2.1.0");
        register("TensorFlow", "USER", "tensorflow-plugins", "1.3.0");
        // Cloud-specific plugins
        register("BigQuery", "SYSTEM", "gcp-plugins", "0.20.0");
        register("S3", "SYSTEM", "aws-plugins", "0.15.0");
    }

    // Map a plugin name to an artifact selector built from the given coordinates.
    private static void register(String plugin, String scope, String artifact, String version) {
        PLUGIN_ARTIFACTS.put(plugin, new ArtifactSelectorConfig(scope, artifact, version));
    }

    /** Resolve the artifact for a plugin name, or a default selector if unknown. */
    public static ArtifactSelectorConfig resolveArtifact(String pluginName) {
        return PLUGIN_ARTIFACTS.getOrDefault(pluginName, new ArtifactSelectorConfig());
    }
}
// Usage in plugin creation
/** Build an ETLPlugin whose artifact is resolved from the plugin name. */
ETLPlugin createPluginWithArtifact(String pluginName, String pluginType, Map<String, String> properties) {
    return new ETLPlugin(pluginName, pluginType, properties,
        PluginArtifactResolver.resolveArtifact(pluginName));
}
Strategies for optimizing resource allocation across different pipeline types and data volumes.
Data Volume-based Resource Scaling:
public class ResourceOptimizer {
    // Hard ceilings applied to any computed allocation.
    private static final int MAX_MEMORY_MB = 65536; // 64 GB
    private static final int MAX_CORES = 64;

    /**
     * Calculate optimal resources based on input data characteristics.
     * Scales a 1 GB / 1 core baseline by record count, then adjusts for
     * record size and transform complexity, and finally caps the result.
     * @param metrics observed input-data metrics
     * @return capped resource allocation for the pipeline
     */
    public static Resources optimizeForDataVolume(DataVolumeMetrics metrics) {
        long recordCount = metrics.getRecordCount();
        long avgRecordSize = metrics.getAvgRecordSizeBytes();
        int transformComplexity = metrics.getTransformComplexityScore();
        // Base resource calculation
        int baseMemoryMB = 1024;
        int baseCores = 1;
        // Scale based on record count (largest matching tier wins)
        if (recordCount > 10_000_000) {
            baseMemoryMB *= 8;
            baseCores *= 8;
        } else if (recordCount > 1_000_000) {
            baseMemoryMB *= 4;
            baseCores *= 4;
        } else if (recordCount > 100_000) {
            baseMemoryMB *= 2;
            baseCores *= 2;
        }
        // Large records (>1KB) double the memory budget
        if (avgRecordSize > 1024) {
            baseMemoryMB *= 2;
        }
        // Complex transformations (score > 7) double cores and set a memory floor
        if (transformComplexity > 7) {
            baseCores *= 2;
            baseMemoryMB = Math.max(baseMemoryMB, 4096);
        }
        // Apply resource limits
        baseMemoryMB = Math.min(baseMemoryMB, MAX_MEMORY_MB);
        baseCores = Math.min(baseCores, MAX_CORES);
        return new Resources(baseMemoryMB, baseCores);
    }

    /**
     * Optimize resources for streaming pipelines: shorter batch intervals and
     * higher throughput multiply a 2 GB / 2 core baseline.
     * @param batchInterval interval string such as "30s", "5m", or "1h"
     * @param expectedThroughput expected throughput (units unspecified in
     *        source; presumably records per batch — TODO confirm)
     * @return resource allocation for the streaming pipeline
     */
    public static Resources optimizeForStreaming(String batchInterval, int expectedThroughput) {
        int intervalSeconds = parseBatchInterval(batchInterval);
        // More frequent batches need more resources
        int memoryMB = 2048;
        int cores = 2;
        if (intervalSeconds < 60) { // Sub-minute batches
            memoryMB *= 4;
            cores *= 4;
        } else if (intervalSeconds < 300) { // Sub-5-minute batches
            memoryMB *= 2;
            cores *= 2;
        }
        // Scale based on throughput
        if (expectedThroughput > 10000) { // High throughput
            memoryMB *= 2;
            cores *= 2;
        }
        return new Resources(memoryMB, cores);
    }

    /**
     * Parse interval strings like "30s", "5m", "1h" into seconds.
     * Null, unrecognized, or malformed intervals fall back to 60 seconds;
     * previously a bad numeric part (e.g. "m" or "abc s") threw
     * NumberFormatException instead of applying the default.
     * @param interval interval string with an s/m/h suffix
     * @return interval length in seconds, or 60 on any parse failure
     */
    private static int parseBatchInterval(String interval) {
        if (interval == null) {
            return 60; // default to 1 minute
        }
        String trimmed = interval.trim();
        int multiplier;
        if (trimmed.endsWith("s")) {
            multiplier = 1;
        } else if (trimmed.endsWith("m")) {
            multiplier = 60;
        } else if (trimmed.endsWith("h")) {
            multiplier = 3600;
        } else {
            return 60; // unrecognized unit: default to 1 minute
        }
        try {
            return Integer.parseInt(trimmed.substring(0, trimmed.length() - 1)) * multiplier;
        } catch (NumberFormatException ignored) {
            return 60; // malformed numeric part: best-effort default
        }
    }
}
Environment-specific Resource Configuration:
public class EnvironmentResourceManager {
    /** Immutable bundle of per-component resource allocations. */
    public static class ResourceProfile {
        public final Resources executor;
        public final Resources driver;
        public final Resources client;
        public ResourceProfile(Resources executor, Resources driver, Resources client) {
            this.executor = executor;
            this.driver = driver;
            this.client = client;
        }
    }

    // Predefined profiles, ordered smallest to largest (executor, driver, client).
    public static final ResourceProfile DEVELOPMENT = new ResourceProfile(
        new Resources(1024, 1),
        new Resources(512, 1),
        new Resources(256, 1));
    public static final ResourceProfile TESTING = new ResourceProfile(
        new Resources(2048, 2),
        new Resources(1024, 1),
        new Resources(512, 1));
    public static final ResourceProfile PRODUCTION = new ResourceProfile(
        new Resources(8192, 8),
        new Resources(4096, 4),
        new Resources(1024, 2));
    public static final ResourceProfile HIGH_VOLUME = new ResourceProfile(
        new Resources(32768, 32),
        new Resources(8192, 8),
        new Resources(2048, 4));

    /**
     * Map an environment name (case-insensitive) to its resource profile.
     * Unrecognized names default to DEVELOPMENT.
     */
    public static ResourceProfile getProfileForEnvironment(String environment) {
        String env = environment.toLowerCase();
        if (env.equals("prod") || env.equals("production")) {
            return PRODUCTION;
        }
        if (env.equals("staging") || env.equals("test")) {
            return TESTING;
        }
        if (env.equals("high-volume") || env.equals("batch")) {
            return HIGH_VOLUME;
        }
        // "dev", "development", and anything else
        return DEVELOPMENT;
    }

    /** Apply a profile's three allocations to a pipeline builder. */
    public static ETLBatchConfig.Builder applyResourceProfile(
        ETLBatchConfig.Builder builder, ResourceProfile profile) {
        return builder
            .setResources(profile.executor)
            .setDriverResources(profile.driver)
            .setClientResources(profile.client);
    }
}
// Usage example: pick a profile from the "environment" system property
// (defaulting to "dev") and apply it before adding stages.
String environment = System.getProperty("environment", "dev");
ResourceProfile profile = EnvironmentResourceManager.getProfileForEnvironment(environment);
ETLBatchConfig config =
    EnvironmentResourceManager.applyResourceProfile(ETLBatchConfig.builder(), profile)
        .addStage(sourceStage)
        .addStage(transformStage)
        .addStage(sinkStage)
        .build();
Patterns for monitoring resource usage and making dynamic adjustments.
/**
 * Utilities for analyzing pipeline resource consumption and producing tuning
 * recommendations or automatically scaled allocations.
 */
public class ResourceMonitor {
    /**
     * Inspect one run's metrics and flag under- or over-provisioned resources.
     * Thresholds: CPU below 30% / above 90%, memory below 40% / above 85%;
     * an execution time beyond the SLA triggers a performance suggestion.
     */
    public static ResourceRecommendation analyzeResourceUsage(PipelineExecutionMetrics metrics) {
        ResourceRecommendation recommendation = new ResourceRecommendation();
        double cpu = metrics.getAvgCpuUtilization();
        if (cpu < 30) {
            recommendation.suggestCpuReduction();
        } else if (cpu > 90) {
            recommendation.suggestCpuIncrease();
        }
        double memory = metrics.getAvgMemoryUtilization();
        if (memory < 40) {
            recommendation.suggestMemoryReduction();
        } else if (memory > 85) {
            recommendation.suggestMemoryIncrease();
        }
        if (metrics.getExecutionTimeMs() > metrics.getSlaTimeMs()) {
            recommendation.suggestPerformanceImprovement();
        }
        return recommendation;
    }

    /**
     * Conservatively double or halve the current allocation based on average
     * historical utilization (above 80% grows, very low utilization shrinks),
     * clamped to [1, 64] cores and [512, 65536] MB. Returns the current
     * allocation unchanged when there is no history.
     */
    public static Resources autoScaleResources(Resources current, List<PipelineExecutionMetrics> history) {
        if (history.isEmpty()) {
            return current;
        }
        double cpuTotal = 0;
        double memTotal = 0;
        for (PipelineExecutionMetrics run : history) {
            cpuTotal += run.getAvgCpuUtilization();
            memTotal += run.getAvgMemoryUtilization();
        }
        double avgCpuUtil = cpuTotal / history.size();
        double avgMemoryUtil = memTotal / history.size();
        int newCores = current.getVirtualCores();
        if (avgCpuUtil > 80) {
            newCores = Math.min(newCores * 2, 64);
        } else if (avgCpuUtil < 20) {
            newCores = Math.max(newCores / 2, 1);
        }
        int newMemoryMB = current.getMemoryMB();
        if (avgMemoryUtil > 80) {
            newMemoryMB = Math.min(newMemoryMB * 2, 65536);
        } else if (avgMemoryUtil < 30) {
            newMemoryMB = Math.max(newMemoryMB / 2, 512);
        }
        return new Resources(newMemoryMB, newCores);
    }
}
Install with Tessl CLI
npx tessl i tessl/maven-co-cask-cdap--cdap-etl-proto