CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-co-cask-cdap--cdap-etl-proto

Protocol classes and configuration objects for ETL (Extract, Transform, Load) pipelines in CDAP

Pending
Overview
Eval results
Files

docs/legacy-version-support.md

Legacy Version Support

Backward compatibility support for v0 and v1 protocol versions with automatic upgrade mechanisms to current v2 format. This enables smooth migration from older CDAP versions while maintaining pipeline functionality and configuration integrity.

Capabilities

Upgrade Interface

Core upgrade interface enabling version-to-version configuration migration with contextual artifact resolution.

/**
 * Interface for configurations that can be upgraded to newer versions.
 *
 * <p>A single {@link #upgrade(UpgradeContext)} call moves a configuration to the next
 * version only; callers reach the latest format by repeating the call for as long as
 * {@link #canUpgrade()} returns true (v0 -> v1 -> v2).
 *
 * @param <T> the type of configuration this upgrades to
 */
public interface UpgradeableConfig<T extends UpgradeableConfig> {
    /**
     * Check if this configuration can be upgraded to a newer version.
     *
     * @return true if upgrade is possible, false if already at latest version
     */
    boolean canUpgrade();
    
    /**
     * Upgrade configuration to the next version.
     * This enables chain upgrading: v0 -> v1 -> v2
     *
     * @param upgradeContext context providing artifact resolution and upgrade utilities
     * @return upgraded configuration of the next version
     */
    T upgrade(UpgradeContext upgradeContext);
}

Usage Examples:

import co.cask.cdap.etl.proto.*;
import co.cask.cdap.etl.proto.v0.*;
import co.cask.cdap.etl.proto.v1.*;
import co.cask.cdap.etl.proto.v2.*;

// Upgrade chain example: v0 -> v1 -> v2
// The v0, v1 and v2 packages each declare a class named ETLBatchConfig, so every use
// below is fully qualified to say which version is meant.
// loadLegacyConfig() and upgradeContext are placeholders this snippet does not define.
co.cask.cdap.etl.proto.v0.ETLBatchConfig v0Config = loadLegacyConfig();

// Check if upgrade is needed
if (v0Config.canUpgrade()) {
    // Upgrade v0 to v1 (one call advances exactly one version)
    co.cask.cdap.etl.proto.v1.ETLBatchConfig v1Config = v0Config.upgrade(upgradeContext);
    
    // Continue upgrade to v2 if needed
    if (v1Config.canUpgrade()) {
        co.cask.cdap.etl.proto.v2.ETLBatchConfig v2Config = v1Config.upgrade(upgradeContext);
        
        // v2Config is now ready for use with current CDAP version
        // NOTE: v2Config is only in scope inside this inner block.
        v2Config.validate();
    }
}

/**
 * Automatic chain upgrade utility: upgrades a configuration one version at a time until
 * it reports that no further upgrade is possible, then returns it as a v2 batch config.
 *
 * @param config the configuration to upgrade; its upgrade chain must end in a v2
 *               ETLBatchConfig
 * @param context upgrade context handed to every upgrade step
 * @return the fully upgraded v2 batch configuration
 * @throws IllegalArgumentException if {@code config} is null, or if the fully upgraded
 *         configuration is not a v2 ETLBatchConfig
 * @throws IllegalStateException if an upgrade step returns null
 */
public static co.cask.cdap.etl.proto.v2.ETLBatchConfig upgradeToLatest(
    UpgradeableConfig<?> config, UpgradeContext context) {

    if (config == null) {
        throw new IllegalArgumentException("config must not be null");
    }

    UpgradeableConfig<?> current = config;
    while (current.canUpgrade()) {
        UpgradeableConfig<?> next = current.upgrade(context);
        if (next == null) {
            // Fail here with the offending type rather than with a bare NPE on the next loop test.
            throw new IllegalStateException(
                "Upgrade of " + current.getClass().getName() + " returned null");
        }
        current = next;
    }

    // The parameter type admits any upgradeable config, so verify the result before casting
    // instead of letting an unrelated config family surface as a ClassCastException.
    if (!(current instanceof co.cask.cdap.etl.proto.v2.ETLBatchConfig)) {
        throw new IllegalArgumentException(
            "Upgrade chain ended in " + current.getClass().getName()
                + ", expected co.cask.cdap.etl.proto.v2.ETLBatchConfig");
    }
    return (co.cask.cdap.etl.proto.v2.ETLBatchConfig) current;
}

Upgrade Context

Context interface providing artifact resolution and upgrade utilities during configuration migration.

/**
 * Context for upgrading configurations, providing artifact resolution services.
 * An implementation is passed to each {@code upgrade} call of an upgradeable configuration.
 */
public interface UpgradeContext {
    /**
     * Get artifact information for a plugin type and name.
     * Used during upgrade to resolve plugin artifacts from older versions.
     *
     * @param pluginType the plugin type (e.g., "batchsource", "transform", "batchsink")
     * @param pluginName the plugin name
     * @return artifact selector configuration for the plugin, null if not found
     *         (callers must be prepared for a null result)
     */
    ArtifactSelectorConfig getPluginArtifact(String pluginType, String pluginName);
}

Implementation Examples:

import co.cask.cdap.etl.proto.UpgradeContext;
import co.cask.cdap.etl.proto.ArtifactSelectorConfig;

/**
 * Custom upgrade context for development/testing.
 *
 * <p>Artifacts are looked up in a fixed in-memory table keyed first by plugin type and
 * then by plugin name; an unknown type or name yields null.
 */
public class DevelopmentUpgradeContext implements UpgradeContext {
    private final Map<String, Map<String, ArtifactSelectorConfig>> artifactsByTypeAndName;

    public DevelopmentUpgradeContext() {
        this.artifactsByTypeAndName = buildArtifactTable();
    }

    /**
     * Looks up the artifact registered for the given plugin type and name.
     *
     * @param pluginType the plugin type key, e.g. "batchsource"
     * @param pluginName the plugin name key, e.g. "Table"
     * @return the registered artifact selector, or null when none is registered
     */
    @Override
    public ArtifactSelectorConfig getPluginArtifact(String pluginType, String pluginName) {
        // An unknown plugin type falls back to an empty map so the second lookup yields null.
        Map<String, ArtifactSelectorConfig> artifactsByName =
            artifactsByTypeAndName.getOrDefault(pluginType, Collections.emptyMap());
        return artifactsByName.get(pluginName);
    }

    /** Builds the plugin type -> plugin name -> artifact table used by this context. */
    private Map<String, Map<String, ArtifactSelectorConfig>> buildArtifactTable() {
        // Hard-coded here; load from configuration file or database in a real setup
        Map<String, ArtifactSelectorConfig> batchSources = Map.of(
            "Table", new ArtifactSelectorConfig("SYSTEM", "core-plugins", "2.8.0"),
            "File", new ArtifactSelectorConfig("SYSTEM", "core-plugins", "2.8.0"));

        Map<String, ArtifactSelectorConfig> transforms = Map.of(
            "JavaScript", new ArtifactSelectorConfig("SYSTEM", "core-plugins", "2.8.0"),
            "Python", new ArtifactSelectorConfig("SYSTEM", "hydrator-plugins", "2.8.0"));

        return Map.of(
            "batchsource", batchSources,
            "transform", transforms);
    }
}

/**
 * Production upgrade context with artifact service integration.
 *
 * <p>Delegates every lookup to the injected artifact service and reports an unresolved
 * plugin as null after logging a warning.
 */
public class ProductionUpgradeContext implements UpgradeContext {
    private final ArtifactService artifacts;

    public ProductionUpgradeContext(ArtifactService artifactService) {
        artifacts = artifactService;
    }

    /**
     * Resolves the artifact for a plugin through the artifact service.
     *
     * @param pluginType the plugin type to resolve
     * @param pluginName the plugin name to resolve
     * @return the resolved artifact selector, or null if the service reports it as not found
     */
    @Override
    public ArtifactSelectorConfig getPluginArtifact(String pluginType, String pluginName) {
        ArtifactSelectorConfig resolved = null;
        try {
            resolved = artifacts.resolvePluginArtifact(pluginType, pluginName);
        } catch (ArtifactNotFoundException e) {
            // NOTE(review): `logger` is not declared in this snippet; it is assumed to be
            // provided by the surrounding code.
            logger.warn("Could not resolve artifact for plugin {}:{}", pluginType, pluginName);
        }
        return resolved;
    }
}

Version 1 (v1) Legacy Support

Support for v1 protocol format featuring separated source/sink/transform structure with basic connection support.

/**
 * ETL Configuration version 1 - legacy format with separated stage types.
 *
 * <p>A v1 pipeline is described by one source, a list of sinks, a list of transforms and
 * a list of connections between them, rather than by a single collection of stages.
 */
public class ETLConfig extends Config {
    /**
     * Get source stage configuration
     * @return ETL source stage
     */
    public ETLStage getSource();
    
    /**
     * Get sink stage configurations
     * @return list of ETL sink stages
     */
    public List<ETLStage> getSinks();
    
    /**
     * Get transform stage configurations
     * @return list of ETL transform stages
     */
    public List<ETLStage> getTransforms();
    
    /**
     * Get stage connections
     * @return list of connections between stages
     */
    public List<Connection> getConnections();
    
    /**
     * Get resource allocation
     * @return resource configuration
     */
    public Resources getResources();
    
    /**
     * Check if stage logging is enabled
     * @return true if stage logging enabled
     *         (NOTE(review): the return type is the boxed Boolean, so null may be possible;
     *         confirm before unboxing)
     */
    public Boolean isStageLoggingEnabled();
    
    /**
     * Get compatible configuration (internal conversion method)
     * @return compatible configuration format
     */
    public ETLConfig getCompatibleConfig();
}

/**
 * ETL Batch Configuration version 1.
 *
 * <p>Adds batch-specific settings to the v1 {@code ETLConfig} and upgrades to the v2
 * batch configuration.
 */
public final class ETLBatchConfig extends ETLConfig implements UpgradeableConfig<co.cask.cdap.etl.proto.v2.ETLBatchConfig> {
    /** Engines a v1 batch pipeline can name. */
    public enum Engine { MAPREDUCE, SPARK }
    
    /** @return the engine configured for this pipeline */
    public Engine getEngine();
    /** @return the pipeline schedule (presumably a cron-style expression; format not shown here) */
    public String getSchedule();
    /** @return the action stages of this pipeline */
    public List<ETLStage> getActions();
    /** @return resources for the driver (NOTE(review): exact meaning of "driver" not shown here) */
    public Resources getDriverResources();
    
    /** A v1 configuration is never the latest version, so this always returns true. */
    @Override
    public boolean canUpgrade() { return true; }
    
    /**
     * Upgrade this v1 configuration to the v2 format.
     * @param upgradeContext context used to resolve plugin artifacts during the upgrade
     * @return the equivalent v2 batch configuration
     */
    @Override
    public co.cask.cdap.etl.proto.v2.ETLBatchConfig upgrade(UpgradeContext upgradeContext);
}

v1 Usage Examples:

import co.cask.cdap.etl.proto.v1.*;

// v1 configuration structure
// loadV1Config() and upgradeContext are placeholders this snippet does not define.
co.cask.cdap.etl.proto.v1.ETLBatchConfig v1Config = loadV1Config();

// Access v1-specific structure: one source, plus separate lists of sinks and transforms
ETLStage sourceStage = v1Config.getSource();
List<ETLStage> sinkStages = v1Config.getSinks();
List<ETLStage> transformStages = v1Config.getTransforms();

// v1 stage with plugin structure: the plugin carries the name and the properties
Plugin sourcePlugin = sourceStage.getPlugin();
String pluginName = sourcePlugin.getName();
Map<String, String> properties = sourcePlugin.getProperties();

// Upgrade v1 to v2
if (v1Config.canUpgrade()) {
    co.cask.cdap.etl.proto.v2.ETLBatchConfig v2Config = v1Config.upgrade(upgradeContext);
    
    // v2 uses unified stage structure: all stages in one set, wired together by connections
    // (ETLStage is fully qualified here because the v1 import also provides an ETLStage)
    Set<co.cask.cdap.etl.proto.v2.ETLStage> v2Stages = v2Config.getStages();
    Set<Connection> v2Connections = v2Config.getConnections();
}

Version 0 (v0) Legacy Support

Support for the original v0 protocol format with basic stage structure and minimal connection support.

/**
 * ETL Configuration version 0 - original legacy format.
 *
 * <p>Describes a pipeline as one source, a list of sinks and a list of transforms; this
 * listing exposes no connection accessor for v0.
 */
public abstract class ETLConfig extends Config {
    /**
     * Get source stage
     * @return ETL source stage
     */
    public ETLStage getSource();
    
    /**
     * Get sink stages
     * @return immutable list of sink stages
     */
    public List<ETLStage> getSinks();
    
    /**
     * Get transform stages
     * @return immutable list of transform stages
     */
    public List<ETLStage> getTransforms();
    
    /**
     * Get resource allocation
     * @return resource configuration, defaults if null
     */
    public Resources getResources();
}

/**
 * ETL Batch Configuration version 0.
 *
 * <p>Upgrades to the v1 batch configuration; reaching v2 takes a second upgrade call on
 * the v1 result.
 */
public final class ETLBatchConfig extends ETLConfig implements UpgradeableConfig<co.cask.cdap.etl.proto.v1.ETLBatchConfig> {
    /**
     * Create a v0 batch configuration.
     * @param schedule pipeline schedule (presumably a cron-style expression; format not shown here)
     * @param source the source stage
     * @param sinks the sink stages
     * @param transforms the transform stages
     * @param resources resource allocation for the pipeline
     * @param actions the action stages
     */
    public ETLBatchConfig(String schedule, ETLStage source, List<ETLStage> sinks, 
                         List<ETLStage> transforms, Resources resources, List<ETLStage> actions);
    
    /** @return the action stages of this pipeline */
    public List<ETLStage> getActions();
    
    /** A v0 configuration is never the latest version, so this always returns true. */
    @Override
    public boolean canUpgrade() { return true; }
    
    /**
     * Upgrade this v0 configuration to the v1 format.
     * @param upgradeContext context used to resolve plugin artifacts during the upgrade
     * @return the equivalent v1 batch configuration
     */
    @Override
    public co.cask.cdap.etl.proto.v1.ETLBatchConfig upgrade(UpgradeContext upgradeContext);
}

/**
 * ETL Stage version 0 - simple property-based structure.
 *
 * <p>A v0 stage is just a name, a string-to-string property map and an optional error
 * dataset name.
 */
public class ETLStage {
    /**
     * Create a v0 stage.
     * @param name the stage name
     * @param properties the stage properties
     * @param errorDatasetName name of the error dataset, or null for none
     */
    public ETLStage(String name, Map<String, String> properties, String errorDatasetName);
    
    /** @return the stage name */
    public String getName();
    /** @return the error dataset name (null when the stage was created without one) */
    public String getErrorDatasetName();
    /** @return the stage properties */
    public Map<String, String> getProperties();
    
    /**
     * Convert this stage to its v1 form. Declared without an access modifier, so it is
     * package-private.
     * @param name name for the upgraded stage
     * @param pluginType plugin type of the stage (presumably the value passed to
     *                   {@code UpgradeContext.getPluginArtifact}; confirm against the implementation)
     * @param upgradeContext context used to resolve the plugin artifact
     * @return the equivalent v1 stage
     */
    co.cask.cdap.etl.proto.v1.ETLStage upgradeStage(String name, String pluginType, UpgradeContext upgradeContext);
}

v0 Usage Examples:

import co.cask.cdap.etl.proto.v0.*;

// v0 configuration - simple property-based stages
// Constructor arguments: stage name, properties, error dataset name (null for none)
ETLStage v0Source = new ETLStage(
    "Table",
    Map.of("name", "input_table", "schema.row.field", "id"),
    null  // no error dataset
);

ETLStage v0Transform = new ETLStage(
    "JavaScript",
    Map.of("script", "function transform(input, emitter, context) { emitter.emit(input); }"),
    // Deliberately null: a stage that names an error dataset (e.g. "error_dataset")
    // makes the upgrade below fail, see the note at the end of this example.
    null
);

ETLStage v0Sink = new ETLStage(
    "Table", 
    Map.of("name", "output_table"),
    null
);

// v0 batch configuration
ETLBatchConfig v0BatchConfig = new ETLBatchConfig(
    "0 0 2 * * ?",  // schedule
    v0Source,
    List.of(v0Sink),
    List.of(v0Transform),
    new Resources(1024, 2),
    List.of()  // no actions
);

// Upgrade v0 to v1 (upgradeContext is a placeholder this snippet does not define)
co.cask.cdap.etl.proto.v1.ETLBatchConfig v1Config = v0BatchConfig.upgrade(upgradeContext);

// Note: v0 stages with error datasets will fail upgrade
// Error datasets were replaced by error collectors in later versions,
// which is why every stage above is created without one.

Migration Utilities and Best Practices

Utilities and patterns for safe configuration migration across versions.

/**
 * Migration utilities for handling version upgrades.
 *
 * <p>All methods are static. Backup and restore work on a serialized String form of the
 * configuration; the serialization format is not specified in this listing.
 */
public class ConfigMigrationUtils {
    /**
     * Safely upgrade configuration with error handling
     * @param config configuration to upgrade
     * @param context upgrade context
     * @return upgraded configuration or original if upgrade fails
     *         (a failed upgrade is therefore not signalled by an exception; compare the
     *         result with {@code config} to detect it)
     */
    public static UpgradeableConfig<?> safeUpgrade(UpgradeableConfig<?> config, UpgradeContext context);
    
    /**
     * Validate configuration before and after upgrade
     * @param original original configuration
     * @param upgraded upgraded configuration
     * @return validation results
     */
    public static ValidationResult validateUpgrade(UpgradeableConfig<?> original, UpgradeableConfig<?> upgraded);
    
    /**
     * Create backup of configuration before upgrade
     * @param config configuration to backup
     * @return serialized configuration backup
     */
    public static String backupConfiguration(UpgradeableConfig<?> config);
    
    /**
     * Restore configuration from backup if upgrade fails
     * @param backup serialized configuration backup, as produced by
     *               {@link #backupConfiguration(UpgradeableConfig)}
     * @return restored configuration
     */
    public static UpgradeableConfig<?> restoreFromBackup(String backup);
}

Migration Best Practices:

/**
 * Safe upgrade pattern with backup and validation.
 *
 * <p>Backs up the legacy configuration, upgrades it one version at a time while
 * validating every step, validates the final v2 configuration and returns it. On any
 * failure the error is logged, the backup is restored and a ConfigUpgradeException is thrown.
 *
 * @param legacyConfig the configuration to migrate
 * @param context upgrade context handed to every upgrade step
 * @return the validated v2 batch configuration
 * @throws ConfigUpgradeException if a step fails validation, the chain does not end in a
 *         v2 ETLBatchConfig, or any other error occurs (the original error is the cause)
 */
public co.cask.cdap.etl.proto.v2.ETLBatchConfig migrateConfigSafely(
    UpgradeableConfig<?> legacyConfig, UpgradeContext context) {

    // 1. Create backup
    String backup = ConfigMigrationUtils.backupConfiguration(legacyConfig);

    try {
        // 2. Perform upgrade
        UpgradeableConfig<?> current = legacyConfig;
        while (current.canUpgrade()) {
            UpgradeableConfig<?> next = current.upgrade(context);

            // 3. Validate each upgrade step
            ValidationResult result = ConfigMigrationUtils.validateUpgrade(current, next);
            if (!result.isValid()) {
                throw new ConfigUpgradeException("Upgrade validation failed: " + result.getErrors());
            }

            current = next;
        }

        // 4. Final validation
        // Check the type explicitly so a config from another family is reported clearly
        // instead of as a ClassCastException.
        if (!(current instanceof co.cask.cdap.etl.proto.v2.ETLBatchConfig)) {
            throw new ConfigUpgradeException(
                "Upgrade did not produce a v2 ETLBatchConfig: " + current.getClass().getName());
        }
        co.cask.cdap.etl.proto.v2.ETLBatchConfig finalConfig =
            (co.cask.cdap.etl.proto.v2.ETLBatchConfig) current;
        finalConfig.validate();

        return finalConfig;

    } catch (Exception e) {
        // 5. Restore from backup on failure
        logger.error("Configuration upgrade failed, restoring from backup", e);
        // The restored object is not needed here; the call is kept for whatever restoring
        // does beyond returning it (NOTE(review): confirm it has such an effect).
        ConfigMigrationUtils.restoreFromBackup(backup);

        // Rethrow our own failures untouched so their message is not buried inside a
        // second, generic ConfigUpgradeException.
        if (e instanceof ConfigUpgradeException) {
            throw (ConfigUpgradeException) e;
        }
        throw new ConfigUpgradeException("Upgrade failed, configuration restored", e);
    }
}

/**
 * Handle common upgrade issues.
 *
 * <p>Attempts the upgrade and downgrades the known "error datasets" failure to a logged
 * warning; every other IllegalStateException is rethrown unchanged.
 *
 * @param config the configuration to upgrade
 * @throws IllegalStateException if the upgrade fails for any reason other than the use of
 *         error datasets
 */
public void handleUpgradeIssues(UpgradeableConfig<?> config) {
    try {
        config.upgrade(upgradeContext);
    } catch (IllegalStateException e) {
        // getMessage() may be null; treat that as "not the error-dataset case" instead of
        // letting a NullPointerException mask the original failure.
        String message = e.getMessage();
        if (message == null || !message.contains("Error datasets")) {
            throw e;
        }

        // Handle error dataset migration
        logger.warn("Configuration uses deprecated error datasets. Manual migration required.");
        // Provide migration guidance or automatic conversion
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-co-cask-cdap--cdap-etl-proto

docs

artifact-resource-management.md

connection-data-flow.md

index.md

legacy-version-support.md

pipeline-configuration.md

pipeline-triggering.md

stage-plugin-management.md

tile.json