CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-co-cask-cdap--cdap-etl-proto

Protocol classes and configuration objects for ETL (Extract, Transform, Load) pipelines in CDAP

Pending
Overview
Eval results
Files

docs/artifact-resource-management.md

Artifact and Resource Management

Configuration management for plugin artifacts and pipeline resource allocation. Handles artifact selection for plugins, resource specification for different pipeline components, and optimization of computing resources across ETL pipeline execution.

Capabilities

Artifact Selection Configuration

Configuration for selecting and resolving plugin artifacts during pipeline execution.

/**
 * Configuration for selecting plugin artifacts during ETL pipeline execution.
 *
 * Carries the (scope, name, version) coordinates used to resolve which
 * artifact a plugin is loaded from. All three fields may be null; an empty
 * configuration expresses "no constraint" (presumably letting the platform
 * pick any matching artifact — confirm against the resolver implementation).
 */
public class ArtifactSelectorConfig {
    /**
     * Create default artifact selector (empty configuration — no constraint
     * on scope, name, or version)
     */
    public ArtifactSelectorConfig();
    
    /**
     * Create artifact selector with specific scope, name, and version
     * @param scope artifact scope (e.g., "SYSTEM", "USER")
     * @param name artifact name
     * @param version artifact version
     */
    public ArtifactSelectorConfig(String scope, String name, String version);
    
    /**
     * Get artifact scope
     * @return artifact scope identifier, may be null (unconstrained)
     */
    public String getScope();
    
    /**
     * Get artifact name
     * @return artifact name, may be null (unconstrained)
     */
    public String getName();
    
    /**
     * Get artifact version
     * @return artifact version string, may be null (unconstrained)
     */
    public String getVersion();
}

Usage Examples:

import co.cask.cdap.etl.proto.ArtifactSelectorConfig;

// System artifact selection (built-in plugins)
ArtifactSelectorConfig systemArtifact = new ArtifactSelectorConfig(
    "SYSTEM",
    "core-plugins",
    "2.8.0"
);

// User artifact selection (custom plugins)
ArtifactSelectorConfig userArtifact = new ArtifactSelectorConfig(
    "USER",
    "custom-analytics-plugins",
    "1.2.0"
);

// Default artifact selection (no specific requirements)
ArtifactSelectorConfig defaultArtifact = new ArtifactSelectorConfig();

// Use with plugin configuration
Map<String, String> pluginProps = new HashMap<>();
pluginProps.put("algorithm", "advanced");
pluginProps.put("threshold", "0.85");

ETLPlugin pluginWithArtifact = new ETLPlugin(
    "CustomTransform",
    "transform",
    pluginProps,
    userArtifact
);

// Environment-specific artifact selection.
// System.getenv returns null when CDAP_ENV is unset, so compare with the
// literal first ("prod".equals(...)) to avoid a NullPointerException.
String environment = System.getenv("CDAP_ENV");
ArtifactSelectorConfig envSpecificArtifact = new ArtifactSelectorConfig(
    "SYSTEM",
    "core-plugins",
    "prod".equals(environment) ? "2.8.0" : "2.9.0-SNAPSHOT"
);

Resource Allocation Configuration

Resource specification and allocation for different components of ETL pipeline execution.

/**
 * Resource allocation configuration for pipeline components: a memory
 * budget in megabytes plus a count of virtual CPU cores. Used to size
 * executor, driver, and client processes independently.
 */
public class Resources {
    /**
     * Create resource configuration with memory and CPU specification
     * @param memoryMB memory allocation in megabytes
     * @param virtualCores number of virtual CPU cores
     */
    public Resources(int memoryMB, int virtualCores);
    
    /**
     * Create default resource configuration (1024 MB, 1 core)
     */
    public Resources();
    
    /**
     * Get memory allocation
     * @return memory in megabytes
     */
    public int getMemoryMB();
    
    /**
     * Get CPU core allocation
     * @return number of virtual cores
     */
    public int getVirtualCores();
}

Usage Examples:

import co.cask.cdap.api.Resources;

// Basic resource configurations
Resources smallPipeline = new Resources(1024, 1);      // 1GB, 1 core
Resources mediumPipeline = new Resources(4096, 4);     // 4GB, 4 cores
Resources largePipeline = new Resources(16384, 16);    // 16GB, 16 cores

// Default resources (no-arg constructor)
Resources defaultResources = new Resources();  // 1GB, 1 core

// Component-specific resource allocation: each pipeline role can be sized
// separately rather than sharing one profile.
Resources driverResources = new Resources(2048, 2);    // Driver: 2GB, 2 cores
Resources executorResources = new Resources(8192, 8);  // Executor: 8GB, 8 cores
Resources clientResources = new Resources(512, 1);     // Client: 512MB, 1 core

// Configure pipeline with different resource tiers.
// setResources sizes the main execution; driver/client get their own tiers.
ETLBatchConfig resourceOptimizedConfig = ETLBatchConfig.builder()
    .setResources(executorResources)           // Main execution resources
    .setDriverResources(driverResources)       // Driver resources
    .setClientResources(clientResources)       // Client resources
    .addStage(sourceStage)
    .addStage(transformStage)
    .addStage(sinkStage)
    .build();

// Dynamic resource allocation based on data volume
/**
 * Choose a resource tier from the number of input records.
 * Tiers: under 1M records -> 2GB/2 cores, under 10M -> 8GB/8 cores,
 * otherwise 32GB/32 cores.
 *
 * @param recordCount total records expected in the run
 * @return resources sized for the given data volume
 */
public Resources calculateResourcesForDataVolume(long recordCount) {
    if (recordCount < 1_000_000) {
        return new Resources(2048, 2);
    }
    // 1M or more records: medium tier below 10M, large tier at or above it.
    return recordCount < 10_000_000
        ? new Resources(8192, 8)
        : new Resources(32768, 32);
}

Artifact Management Patterns

Common patterns for artifact selection and management across different deployment scenarios.

Environment-based Artifact Selection:

// Development environment with latest snapshots
// Development environment with latest snapshots
public class DevelopmentArtifactSelector {
    /**
     * Core (system-scope) plugins pinned to the current development snapshot.
     * @return selector for the SYSTEM "core-plugins" snapshot artifact
     */
    public static ArtifactSelectorConfig getCorePlugins() {
        return new ArtifactSelectorConfig("SYSTEM", "core-plugins", "3.0.0-SNAPSHOT");
    }

    /**
     * @deprecated misspelled name kept for backward compatibility with
     *             existing callers; use {@link #getCorePlugins()} instead.
     */
    @Deprecated
    public static ArtifactSelectorConfig getCorePluins() {
        return getCorePlugins();
    }

    /**
     * Custom (user-scope) development plugins, tracking the latest build.
     * @return selector for the USER "dev-plugins" artifact at version "latest"
     */
    public static ArtifactSelectorConfig getCustomPlugins() {
        return new ArtifactSelectorConfig("USER", "dev-plugins", "latest");
    }
}

// Production environment with stable versions
// Production environment with stable versions
public class ProductionArtifactSelector {

    /** Pinned stable version of the system core plugin bundle. */
    private static final String CORE_PLUGINS_VERSION = "2.8.0";

    /** Pinned stable version of the user analytics plugin bundle. */
    private static final String ANALYTICS_PLUGINS_VERSION = "1.5.2";

    /** @return selector for the stable SYSTEM "core-plugins" artifact */
    public static ArtifactSelectorConfig getCorePlugins() {
        return new ArtifactSelectorConfig("SYSTEM", "core-plugins", CORE_PLUGINS_VERSION);
    }

    /** @return selector for the stable USER "analytics-plugins" artifact */
    public static ArtifactSelectorConfig getCustomPlugins() {
        return new ArtifactSelectorConfig("USER", "analytics-plugins", ANALYTICS_PLUGINS_VERSION);
    }
}

// Conditional artifact selection
/**
 * Pick the artifact selector for the given environment name.
 * Unknown environments (and "dev") fall back to the development selector.
 *
 * @param pluginName plugin identifier (currently unused in the selection)
 * @param environment environment label, e.g. "prod", "staging", "dev"
 * @return artifact selector appropriate for the environment
 */
public ArtifactSelectorConfig selectArtifact(String pluginName, String environment) {
    String env = environment.toLowerCase();
    if (env.equals("prod")) {
        return ProductionArtifactSelector.getCorePlugins();
    }
    if (env.equals("staging")) {
        return new ArtifactSelectorConfig("SYSTEM", "core-plugins", "2.8.1-RC1");
    }
    // "dev" and every unrecognized environment share the development selector.
    return DevelopmentArtifactSelector.getCorePluins();
}

Plugin-specific Artifact Selection:

// Artifact selection based on plugin capabilities
// Artifact selection based on plugin capabilities
public class PluginArtifactResolver {

    /** Registry of known plugin names and their artifact coordinates. */
    private static final Map<String, ArtifactSelectorConfig> PLUGIN_ARTIFACTS = new HashMap<>();

    static {
        // Core plugins
        register("Table", "SYSTEM", "core-plugins", "2.8.0");
        register("File", "SYSTEM", "core-plugins", "2.8.0");
        register("Database", "SYSTEM", "database-plugins", "2.8.0");

        // Advanced analytics plugins
        register("MLTransform", "USER", "ml-plugins", "2.1.0");
        register("TensorFlow", "USER", "tensorflow-plugins", "1.3.0");

        // Cloud-specific plugins
        register("BigQuery", "SYSTEM", "gcp-plugins", "0.20.0");
        register("S3", "SYSTEM", "aws-plugins", "0.15.0");
    }

    /** Record one plugin-name -> artifact mapping in the registry. */
    private static void register(String plugin, String scope, String artifact, String version) {
        PLUGIN_ARTIFACTS.put(plugin, new ArtifactSelectorConfig(scope, artifact, version));
    }

    /**
     * Resolve the artifact for a plugin by name.
     *
     * @param pluginName plugin identifier to look up
     * @return the registered selector, or a fresh empty (unconstrained)
     *         selector for unknown plugin names
     */
    public static ArtifactSelectorConfig resolveArtifact(String pluginName) {
        return PLUGIN_ARTIFACTS.getOrDefault(pluginName, new ArtifactSelectorConfig());
    }
}

// Usage in plugin creation
/**
 * Build an ETLPlugin whose artifact is looked up from the resolver registry.
 *
 * @param pluginName plugin identifier, also used as the registry key
 * @param pluginType plugin type label (e.g. "transform")
 * @param properties plugin configuration properties
 * @return plugin descriptor bound to the resolved artifact
 */
ETLPlugin createPluginWithArtifact(String pluginName, String pluginType, Map<String, String> properties) {
    return new ETLPlugin(pluginName, pluginType, properties,
        PluginArtifactResolver.resolveArtifact(pluginName));
}

Resource Optimization Patterns

Strategies for optimizing resource allocation across different pipeline types and data volumes.

Data Volume-based Resource Scaling:

public class ResourceOptimizer {

    /** Fallback batch interval (seconds) used when an interval string cannot be parsed. */
    private static final int DEFAULT_INTERVAL_SECONDS = 60;

    /**
     * Calculate optimal resources based on input data characteristics.
     * Scales a 1024 MB / 1 core baseline by record count, record size, and
     * transform complexity, then caps the result at 64 GB / 64 cores.
     *
     * @param metrics observed data-volume characteristics for the pipeline input
     * @return resources sized for the described workload
     */
    public static Resources optimizeForDataVolume(DataVolumeMetrics metrics) {
        long recordCount = metrics.getRecordCount();
        long avgRecordSize = metrics.getAvgRecordSizeBytes();
        int transformComplexity = metrics.getTransformComplexityScore();

        // Base resource calculation
        int baseMemoryMB = 1024;
        int baseCores = 1;

        // Scale based on record count: >100K x2, >1M x4, >10M x8
        if (recordCount > 10_000_000) {
            baseMemoryMB *= 8;
            baseCores *= 8;
        } else if (recordCount > 1_000_000) {
            baseMemoryMB *= 4;
            baseCores *= 4;
        } else if (recordCount > 100_000) {
            baseMemoryMB *= 2;
            baseCores *= 2;
        }

        // Large records (>1KB) double the memory requirement
        if (avgRecordSize > 1024) {
            baseMemoryMB *= 2;
        }

        // Complex transformations (score > 7) need extra CPU and a 4GB memory floor
        if (transformComplexity > 7) {
            baseCores *= 2;
            baseMemoryMB = Math.max(baseMemoryMB, 4096);
        }

        // Apply resource limits: max 64GB, max 64 cores
        baseMemoryMB = Math.min(baseMemoryMB, 65536);
        baseCores = Math.min(baseCores, 64);

        return new Resources(baseMemoryMB, baseCores);
    }

    /**
     * Optimize resources for streaming pipelines. Shorter batch intervals and
     * higher expected throughput both scale the 2048 MB / 2 core baseline up.
     *
     * @param batchInterval interval string such as "30s", "5m", or "1h";
     *                      unparseable values default to 60 seconds
     * @param expectedThroughput expected throughput (units per caller's
     *                           convention — TODO confirm records/sec vs records/batch)
     * @return resources sized for the streaming workload
     */
    public static Resources optimizeForStreaming(String batchInterval, int expectedThroughput) {
        int intervalSeconds = parseBatchInterval(batchInterval);

        // More frequent batches need more resources
        int memoryMB = 2048;
        int cores = 2;

        if (intervalSeconds < 60) {  // Sub-minute batches
            memoryMB *= 4;
            cores *= 4;
        } else if (intervalSeconds < 300) {  // Sub-5-minute batches
            memoryMB *= 2;
            cores *= 2;
        }

        // Scale based on throughput
        if (expectedThroughput > 10000) {  // High throughput
            memoryMB *= 2;
            cores *= 2;
        }

        return new Resources(memoryMB, cores);
    }

    /**
     * Parse interval strings like "30s", "5m", "1h" into seconds.
     *
     * Fix: the previous version let {@link NumberFormatException} escape for
     * inputs with a valid suffix but a malformed numeric part (e.g. "abcs",
     * "1.5m"); all unparseable input now falls back to the documented default.
     *
     * @param interval interval string; may be null or malformed
     * @return interval in seconds, or {@link #DEFAULT_INTERVAL_SECONDS} when unparseable
     */
    private static int parseBatchInterval(String interval) {
        if (interval == null) {
            return DEFAULT_INTERVAL_SECONDS;
        }
        String trimmed = interval.trim();
        if (trimmed.length() < 2) {
            return DEFAULT_INTERVAL_SECONDS;
        }

        int multiplier;
        if (trimmed.endsWith("s")) {
            multiplier = 1;
        } else if (trimmed.endsWith("m")) {
            multiplier = 60;
        } else if (trimmed.endsWith("h")) {
            multiplier = 3600;
        } else {
            return DEFAULT_INTERVAL_SECONDS; // unrecognized unit suffix
        }

        try {
            return Integer.parseInt(trimmed.substring(0, trimmed.length() - 1)) * multiplier;
        } catch (NumberFormatException ignored) {
            // Malformed numeric part — degrade to the default rather than crash.
            return DEFAULT_INTERVAL_SECONDS;
        }
    }
}

Environment-specific Resource Configuration:

public class EnvironmentResourceManager {

    /**
     * Immutable bundle of executor, driver, and client resource allocations
     * for one deployment environment.
     */
    public static class ResourceProfile {
        public final Resources executor;
        public final Resources driver;
        public final Resources client;

        /**
         * @param executor resources for the main execution processes
         * @param driver resources for the driver process
         * @param client resources for the client process
         */
        public ResourceProfile(Resources executor, Resources driver, Resources client) {
            this.executor = executor;
            this.driver = driver;
            this.client = client;
        }
    }

    // Predefined resource profiles for different environments
    public static final ResourceProfile DEVELOPMENT = new ResourceProfile(
        new Resources(1024, 1),   // Small executor
        new Resources(512, 1),    // Minimal driver
        new Resources(256, 1)     // Minimal client
    );

    public static final ResourceProfile TESTING = new ResourceProfile(
        new Resources(2048, 2),   // Medium executor
        new Resources(1024, 1),   // Small driver
        new Resources(512, 1)     // Small client
    );

    public static final ResourceProfile PRODUCTION = new ResourceProfile(
        new Resources(8192, 8),   // Large executor
        new Resources(4096, 4),   // Medium driver
        new Resources(1024, 2)    // Medium client
    );

    public static final ResourceProfile HIGH_VOLUME = new ResourceProfile(
        new Resources(32768, 32), // XL executor
        new Resources(8192, 8),   // Large driver
        new Resources(2048, 4)    // Large client
    );

    /**
     * Map an environment label to its resource profile. Unknown labels and
     * null fall back to {@link #DEVELOPMENT}.
     *
     * Fix: the previous version threw NullPointerException for a null
     * environment despite having a default branch.
     *
     * @param environment environment label, e.g. "prod", "staging", "dev"; may be null
     * @return the matching profile, or DEVELOPMENT when unrecognized
     */
    public static ResourceProfile getProfileForEnvironment(String environment) {
        if (environment == null) {
            return DEVELOPMENT;
        }
        switch (environment.toLowerCase()) {
            case "prod":
            case "production":
                return PRODUCTION;
            case "staging":
            case "test":
                return TESTING;
            case "high-volume":
            case "batch":
                return HIGH_VOLUME;
            case "dev":
            case "development":
            default:
                return DEVELOPMENT;
        }
    }

    /**
     * Apply a resource profile's executor/driver/client allocations to a
     * pipeline configuration builder.
     *
     * @param builder pipeline configuration builder to populate
     * @param profile profile providing the three resource tiers
     * @return the same builder, for chaining
     */
    public static ETLBatchConfig.Builder applyResourceProfile(
        ETLBatchConfig.Builder builder, ResourceProfile profile) {
        return builder
            .setResources(profile.executor)
            .setDriverResources(profile.driver)
            .setClientResources(profile.client);
    }
}

// Usage example
// Read the target environment from a JVM system property, defaulting to "dev".
String environment = System.getProperty("environment", "dev");
ResourceProfile profile = EnvironmentResourceManager.getProfileForEnvironment(environment);

// Apply the profile's executor/driver/client tiers, then add the pipeline stages.
ETLBatchConfig config = EnvironmentResourceManager.applyResourceProfile(
    ETLBatchConfig.builder(), profile)
    .addStage(sourceStage)
    .addStage(transformStage)
    .addStage(sinkStage)
    .build();

Resource Monitoring and Adjustment

Patterns for monitoring resource usage and making dynamic adjustments.

/**
 * Resource monitoring and adjustment utilities for ETL pipelines.
 */
public class ResourceMonitor {

    /**
     * Inspect one run's resource usage and build optimization suggestions.
     * CPU under 30% / memory under 40% suggests over-provisioning; CPU over
     * 90% / memory over 85% suggests the run is starved; exceeding the SLA
     * time budget suggests performance work.
     *
     * @param metrics execution metrics for a completed pipeline run
     * @return recommendation populated with the applicable suggestions
     */
    public static ResourceRecommendation analyzeResourceUsage(PipelineExecutionMetrics metrics) {
        ResourceRecommendation advice = new ResourceRecommendation();

        // CPU utilization band check.
        double cpu = metrics.getAvgCpuUtilization();
        if (cpu < 30) {
            advice.suggestCpuReduction();
        } else if (cpu > 90) {
            advice.suggestCpuIncrease();
        }

        // Memory utilization band check.
        double memory = metrics.getAvgMemoryUtilization();
        if (memory < 40) {
            advice.suggestMemoryReduction();
        } else if (memory > 85) {
            advice.suggestMemoryIncrease();
        }

        // SLA check: runs that overran their time budget need performance work.
        if (metrics.getExecutionTimeMs() > metrics.getSlaTimeMs()) {
            advice.suggestPerformanceImprovement();
        }

        return advice;
    }

    /**
     * Auto-scale resources from historical run metrics: double the hot
     * dimension (>80% average utilization), halve the cold one, keeping
     * cores in [1, 64] and memory in [512, 65536] MB. With no history the
     * current allocation is returned unchanged.
     *
     * @param current current resource allocation
     * @param history metrics from previous runs; may be empty
     * @return the adjusted (or unchanged) resource allocation
     */
    public static Resources autoScaleResources(Resources current, List<PipelineExecutionMetrics> history) {
        if (history.isEmpty()) {
            return current;
        }

        // Average utilization across the observed runs (history is non-empty here).
        double cpuTotal = 0.0;
        double memoryTotal = 0.0;
        for (PipelineExecutionMetrics run : history) {
            cpuTotal += run.getAvgCpuUtilization();
            memoryTotal += run.getAvgMemoryUtilization();
        }
        double avgCpuUtil = cpuTotal / history.size();
        double avgMemoryUtil = memoryTotal / history.size();

        // Scale cores conservatively within [1, 64].
        int newCores = current.getVirtualCores();
        if (avgCpuUtil > 80) {
            newCores = Math.min(newCores * 2, 64);
        } else if (avgCpuUtil < 20) {
            newCores = Math.max(newCores / 2, 1);
        }

        // Scale memory conservatively within [512, 65536] MB.
        int newMemoryMB = current.getMemoryMB();
        if (avgMemoryUtil > 80) {
            newMemoryMB = Math.min(newMemoryMB * 2, 65536);
        } else if (avgMemoryUtil < 30) {
            newMemoryMB = Math.max(newMemoryMB / 2, 512);
        }

        return new Resources(newMemoryMB, newCores);
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-co-cask-cdap--cdap-etl-proto

docs

artifact-resource-management.md

connection-data-flow.md

index.md

legacy-version-support.md

pipeline-configuration.md

pipeline-triggering.md

stage-plugin-management.md

tile.json