Protocol classes and configuration objects for ETL (Extract, Transform, Load) pipelines in CDAP
npx @tessl/cli install tessl/maven-co-cask-cdap--cdap-etl-proto@5.1.0
CDAP ETL Protocol provides protocol classes and configuration objects for defining ETL (Extract, Transform, Load) pipelines in the Cask Data Application Platform (CDAP). It contains core data structures for representing ETL configurations, stages, plugins, and connections, with backward compatibility provided through versioned protocol support (v0, v1, v2).
<dependency>
<groupId>co.cask.cdap</groupId>
<artifactId>cdap-etl-proto</artifactId>
<version>5.1.2</version>
</dependency>
import co.cask.cdap.etl.proto.v2.*;
import co.cask.cdap.etl.proto.Connection;
import co.cask.cdap.api.Resources;
import co.cask.cdap.etl.api.Engine;
For legacy compatibility:
import co.cask.cdap.etl.proto.v1.*;
import co.cask.cdap.etl.proto.v0.*;
import co.cask.cdap.etl.proto.v2.*;
import co.cask.cdap.etl.proto.Connection;
import co.cask.cdap.api.Resources;
// Property maps for the three plugins: a Table source, a JavaScript transform, and a Table sink.
Map<String, String> sourceSettings = new HashMap<>();
sourceSettings.put("name", "users");
sourceSettings.put("schema.row.field", "userid");

Map<String, String> scriptSettings = new HashMap<>();
scriptSettings.put("script", "function transform(input, emitter, context) { emitter.emit(input); }");

Map<String, String> sinkSettings = new HashMap<>();
sinkSettings.put("name", "processed_users");

// Each plugin is created from (name, type, properties) and wrapped in a named stage.
ETLStage reader = new ETLStage("users_source", new ETLPlugin("Table", "batchsource", sourceSettings));
ETLStage processor = new ETLStage("process_users", new ETLPlugin("JavaScript", "transform", scriptSettings));
ETLStage writer = new ETLStage("users_sink", new ETLPlugin("Table", "batchsink", sinkSettings));

// Connections use the same strings as the stage names above: source -> transform -> sink.
Set<Connection> links = new HashSet<>();
links.add(new Connection("users_source", "process_users"));
links.add(new Connection("process_users", "users_sink"));

// Assemble the batch configuration from the stages, the connections, and
// a Resources(memoryMB = 1024, virtualCores = 2) setting.
ETLBatchConfig config = ETLBatchConfig.builder()
    .addStage(reader)
    .addStage(processor)
    .addStage(writer)
    .addConnections(links)
    .setResources(new Resources(1024, 2))
    .build();
// Validate configuration
config.validate();
CDAP ETL Protocol is structured around versioned protocol packages (v0, v1, and v2) that enable backward compatibility and smooth upgrades.
Latest version ETL pipeline configuration with comprehensive features for batch and streaming data processing. Includes advanced resource management, stage logging, and extensive property support.
// API summary (signatures only) of the base pipeline configuration. Both
// ETLBatchConfig and DataStreamsConfig in this document extend it.
// NOTE(review): UpgradeableConfig is declared with a type parameter elsewhere in
// this document but is implemented raw here — confirm against the library source.
public class ETLConfig extends Config implements UpgradeableConfig {
public String getDescription();
// Pipeline topology: the stages and the connections between them.
public Set<ETLStage> getStages();
public Set<Connection> getConnections();
// Resource settings. The accompanying text mentions driver, client, and executor
// resources; NOTE(review): presumably getResources() is the executor one — confirm.
public Resources getResources();
public Resources getDriverResources();
public Resources getClientResources();
// Preview, stage-logging, and process-timing settings, plus string key/value properties.
public int getNumOfRecordsPreview();
public boolean isStageLoggingEnabled();
public boolean isProcessTimingEnabled();
public Map<String, String> getProperties();
// Called as the final step of this document's usage example; how a failed
// validation is reported is not shown in this summary.
public void validate();
// Members declared by UpgradeableConfig.
public boolean canUpgrade();
public UpgradeableConfig upgrade(UpgradeContext upgradeContext);
}
// API summary (signatures only) of the batch pipeline configuration. Extends
// ETLConfig and is the type built in this document's usage example.
public final class ETLBatchConfig extends ETLConfig {
// Returned as a List, whereas ETLConfig.getStages() returns a Set.
public List<ETLStage> getPostActions();
// Engine is the MAPREDUCE / SPARK enum listed in this document.
public Engine getEngine();
public String getSchedule();
// NOTE(review): boxed Integer — presumably null when unset; confirm.
public Integer getMaxConcurrentRuns();
// NOTE(review): purpose is not evident from the signature alone — presumably
// converts a config written in an older shape; confirm against the library source.
public ETLBatchConfig convertOldConfig();
// Builder entry points; the second overload additionally takes a schedule string.
public static Builder builder();
public static Builder builder(String schedule);
}
// API summary (signatures only) of the streaming pipeline configuration. Extends ETLConfig.
public final class DataStreamsConfig extends ETLConfig {
// NOTE(review): the interval is a String; its expected format is not shown here — confirm.
public String getBatchInterval();
public boolean isUnitTest();
public boolean checkpointsDisabled();
public String getExtraJavaOpts();
// NOTE(review): boxed Boolean — presumably null when unset; confirm.
public Boolean getStopGracefully();
public String getCheckpointDir();
public static Builder builder();
}
Core components for defining individual pipeline stages and their associated plugins. Handles plugin configuration, artifact selection, and validation.
// API summary (signatures only) of a pipeline stage: a name paired with an ETLPlugin.
// In this document's usage example, Connection endpoints use the same strings as stage names.
public final class ETLStage {
public ETLStage(String name, ETLPlugin plugin);
public String getName();
public ETLPlugin getPlugin();
// How a failed validation is reported is not shown in this summary.
public void validate();
}
// API summary (signatures only) of a plugin reference: a plugin name, a plugin type,
// string properties, and optionally an artifact selector. The usage example in this
// document passes types such as "batchsource", "transform", and "batchsink".
public class ETLPlugin {
public ETLPlugin(String name, String type, Map<String, String> properties);
// Same as above, additionally naming the artifact to take the plugin from.
public ETLPlugin(String name, String type, Map<String, String> properties, ArtifactSelectorConfig artifact);
public String getName();
public String getType();
// Two views of the properties: a plain string map and a PluginProperties object.
public Map<String, String> getProperties();
public PluginProperties getPluginProperties();
// NOTE(review): presumably null when the three-argument constructor was used — confirm.
public ArtifactSelectorConfig getArtifactConfig();
// How a failed validation is reported is not shown in this summary.
public void validate();
}
Connection management for defining data flow between pipeline stages, including support for conditional connections and port-based routing.
// API summary (signatures only) of a directed link between two stages, optionally
// carrying a port name or a Boolean condition.
public class Connection {
public Connection(String from, String to);
// The two three-argument overloads differ only in the type of the last parameter,
// so a bare null third argument is ambiguous and must be cast to String or Boolean.
public Connection(String from, String to, String port);
public Connection(String from, String to, Boolean condition);
public String getFrom();
public String getTo();
// NOTE(review): presumably null when the connection was created without a port /
// condition — confirm.
public String getPort();
public Boolean getCondition();
}
Advanced pipeline triggering capabilities with property mapping between triggering and triggered pipelines, supporting both argument and plugin property mappings.
// API summary (signatures only) of a container holding two lists of mappings:
// argument mappings and plugin-property mappings.
public class TriggeringPropertyMapping {
public TriggeringPropertyMapping(List<ArgumentMapping> arguments, List<PluginPropertyMapping> pluginProperties);
public List<ArgumentMapping> getArguments();
public List<PluginPropertyMapping> getPluginProperties();
}
// API summary (signatures only) of a source/target string pair.
// NOTE(review): presumably source names a value in the triggering pipeline and
// target names the argument in the triggered pipeline — confirm.
public class ArgumentMapping {
public ArgumentMapping(String source, String target);
public String getSource();
public String getTarget();
}
// API summary (signatures only) of an ArgumentMapping that additionally carries a stage name.
public class PluginPropertyMapping extends ArgumentMapping {
public PluginPropertyMapping(String stageName, String source, String target);
public String getStageName();
}
Backward compatibility support for v0 and v1 protocol versions with automatic upgrade mechanisms to current v2 format.
// Contract for a configuration that can be upgraded. T is the type that upgrade() returns.
// Note that the bound on T refers to UpgradeableConfig as a raw type.
public interface UpgradeableConfig<T extends UpgradeableConfig> {
// NOTE(review): presumably upgrade() should only be called when this returns true — confirm.
boolean canUpgrade();
T upgrade(UpgradeContext upgradeContext);
}
// Context passed to UpgradeableConfig.upgrade().
public interface UpgradeContext {
// Returns an artifact selector for the plugin identified by type and name.
// NOTE(review): behavior when no matching artifact exists is not shown — confirm.
ArtifactSelectorConfig getPluginArtifact(String pluginType, String pluginName);
}
Configuration management for plugin artifacts and pipeline resource allocation, including driver, client, and executor resource specifications.
// API summary (signatures only) of an artifact selector made of scope, name, and version strings.
public class ArtifactSelectorConfig {
// NOTE(review): the field values produced by the no-argument constructor are not shown — confirm.
public ArtifactSelectorConfig();
public ArtifactSelectorConfig(String scope, String name, String version);
public String getScope();
public String getName();
public String getVersion();
}
Artifact and Resource Management
// From co.cask.cdap.etl.api.Engine
// The two engine values; returned by ETLBatchConfig.getEngine() in this document.
public enum Engine {
MAPREDUCE, SPARK
}
// From co.cask.cdap.api.Resources
// A memory amount in megabytes paired with a virtual-core count. The usage example
// in this document creates one with new Resources(1024, 2).
public class Resources {
public Resources(int memoryMB, int virtualCores);
public int getMemoryMB();
public int getVirtualCores();
}
// Validation error structure (conceptual): an illustrative shape with a message and
// the name of the offending field.
// NOTE(review): no package is given for this type in this document — confirm whether
// a class of this name actually exists in the library before relying on it.
public class ValidationError {
public String getMessage();
public String getField();
}