The CDAP ETL API provides comprehensive abstractions for building Extract, Transform, and Load (ETL) pipeline applications on the CDAP platform.
—
Pipeline actions and conditional execution for workflow control, external integrations, and dynamic pipeline behavior in CDAP ETL.
Base abstract class for pipeline actions that execute custom logic or external operations.
package io.cdap.cdap.etl.api.action;
/**
 * Base class for pipeline action plugins.
 *
 * <p>Every lifecycle hook declared here has an empty body, so a subclass only
 * has to implement {@link #run()} and override the hooks it actually needs.
 */
public abstract class Action
implements PipelineConfigurable,
SubmitterLifecycle<ActionContext>,
StageLifecycle<ActionContext> {
/** Plugin type identifier; the examples pass it to {@code @Plugin(type = ...)}. */
public static final String PLUGIN_TYPE = "action";
// Configuration lifecycle
/** Configuration hook; does nothing unless overridden. */
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {}
// Submission lifecycle
/** Pre-run hook; does nothing unless overridden. */
public void prepareRun(ActionContext context) throws Exception {}
/**
 * Post-run hook; does nothing unless overridden.
 *
 * @param succeeded presumably whether the run completed successfully (confirm against the runtime)
 * @param context the action context
 */
public void onRunFinish(boolean succeeded, ActionContext context) {}
// Stage lifecycle
/** Initialization hook; does nothing unless overridden. */
public void initialize(ActionContext context) throws Exception {}
/** Teardown hook; does nothing unless overridden. */
public void destroy() {}
// Action execution
/**
 * Performs the action's work. This is the only method a subclass must implement.
 *
 * @throws Exception if the action fails
 */
public abstract void run() throws Exception;
}Action Implementation Example:
@Plugin(type = Action.PLUGIN_TYPE)
@Name("DatabaseCleanup")
@Description("Cleans up old records from database tables")
public class DatabaseCleanupAction extends Action {
private final Config config;
private ActionContext actionContext;
/**
 * Runs the plugin configuration's own validation, handing it the failure
 * collector obtained from the stage configurer.
 */
@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    // Validate configuration
    config.validate(pipelineConfigurer.getStageConfigurer().getFailureCollector());
}
/**
 * Opens a short-lived connection and fails the run if the database does not
 * report it as valid.
 *
 * @throws Exception if the connection cannot be opened or is not valid
 */
@Override
public void prepareRun(ActionContext context) throws Exception {
    // The probe connection is closed again as soon as the check is done.
    try (Connection probe = getConnection()) {
        boolean usable = probe.isValid(30); // 30 is the validation timeout, in seconds
        if (!usable) {
            throw new Exception("Database connection is not valid");
        }
    }
}
/** Keeps the supplied context so that {@code run()} can reach it later. */
@Override
public void initialize(ActionContext context) throws Exception {
    actionContext = context;
}
/**
 * Cleans every configured table over one shared connection, publishing a
 * per-table metric and runtime argument, then marks the cleanup as completed.
 *
 * @throws Exception wrapping any {@link SQLException} raised while cleaning
 */
@Override
public void run() throws Exception {
    StageMetrics stageMetrics = actionContext.getMetrics();
    SettableArguments runtimeArgs = actionContext.getArguments();

    try (Connection connection = getConnection()) {
        for (String table : config.tablesToClean) {
            long removed = cleanupTable(connection, table);

            // Publish the count both as a metric and as a runtime argument.
            stageMetrics.count("records.deleted." + table, removed);
            String argumentKey = "cleanup." + table + ".deleted";
            runtimeArgs.set(argumentKey, String.valueOf(removed));

            LOG.info("Deleted {} old records from table {}", removed, table);
        }

        // Only reached when every table was cleaned without error.
        runtimeArgs.set("cleanup.completed", "true");
        runtimeArgs.set("cleanup.timestamp", Instant.now().toString());
    } catch (SQLException e) {
        stageMetrics.count("cleanup.errors", 1);
        throw new Exception("Database cleanup failed", e);
    }
}
/**
 * Deletes the rows of {@code tableName} whose {@code created_date} is older
 * than the configured retention period.
 *
 * <p>A table name cannot be bound as a statement parameter, so it has to be
 * placed in the SQL text. To keep a misconfigured or malicious name from
 * injecting SQL, only plain (optionally dot-qualified) identifiers are accepted.
 *
 * @param conn open connection to run the delete on
 * @param tableName table to purge, e.g. {@code events} or {@code app.events}
 * @return number of rows deleted
 * @throws SQLException if the name is not a plain identifier or the delete fails
 */
private long cleanupTable(Connection conn, String tableName) throws SQLException {
    if (tableName == null
            || !tableName.matches("[A-Za-z_][A-Za-z0-9_]*(\\.[A-Za-z_][A-Za-z0-9_]*)*")) {
        throw new SQLException("Refusing to clean table with unsafe name: " + tableName);
    }
    String deleteSQL = String.format(
        "DELETE FROM %s WHERE created_date < ?",
        tableName
    );
    try (PreparedStatement stmt = conn.prepareStatement(deleteSQL)) {
        // Delete records older than retention days
        Timestamp cutoffDate = Timestamp.from(
            Instant.now().minus(config.retentionDays, ChronoUnit.DAYS)
        );
        stmt.setTimestamp(1, cutoffDate);
        return stmt.executeUpdate();
    }
}
/**
 * Opens a new JDBC connection from the configured connection string and
 * credentials. The caller is responsible for closing it.
 *
 * @throws SQLException if the connection cannot be established
 */
private Connection getConnection() throws SQLException {
    String url = config.connectionString;
    String user = config.username;
    String secret = config.password;
    return DriverManager.getConnection(url, user, secret);
}
}
Context interface for action execution providing access to runtime services.
package io.cdap.cdap.etl.api.action;
/**
 * Context passed to an action's lifecycle methods. It adds access to
 * settable arguments on top of what {@link StageContext} offers.
 */
public interface ActionContext extends StageContext {
/**
* Get settable arguments for passing data to other stages.
*
* @return the arguments object; values written through it are intended to be
*         visible to later stages (how they are propagated is up to the runtime)
*/
SettableArguments getArguments();
}Interface for arguments that can be modified by actions.
package io.cdap.cdap.etl.api.action;
/**
 * An {@link Arguments} view that can also be written to.
 */
public interface SettableArguments extends Arguments {
/**
* Set argument value.
*
* @param name  argument key
* @param value value to store under {@code name}; whether an existing value is
*              replaced depends on the implementation (presumably yes - confirm)
*/
void set(String name, String value);
}@Plugin(type = Action.PLUGIN_TYPE)
@Name("FileProcessor")
@Description("Processes files and prepares them for pipeline consumption")
public class FileProcessorAction extends Action {
private final Config config;
/**
 * Context captured in {@link #initialize}. The {@code Action} base class
 * declares no context accessor, so the plugin has to keep its own reference.
 */
private ActionContext actionContext;

/** Stores the context so that {@link #run()} can reach arguments and metrics. */
@Override
public void initialize(ActionContext context) throws Exception {
    this.actionContext = context;
}

/**
 * Processes every file in the input directory that matches the configured
 * pattern, optionally archives the processed ones, and publishes counts as
 * metrics and runtime arguments.
 *
 * <p>A failure on one file is logged and counted; it does not stop the
 * remaining files from being processed.
 *
 * @throws Exception if no file was processed successfully and
 *         {@code failOnNoFiles} is set, or if the directory cannot be listed
 */
@Override
public void run() throws Exception {
    ActionContext context = actionContext;
    SettableArguments arguments = context.getArguments();
    StageMetrics metrics = context.getMetrics();

    String inputDir = config.inputDirectory;
    String outputDir = config.outputDirectory;
    String archiveDir = config.archiveDirectory;

    // Create directories if they don't exist
    createDirectoryIfNotExists(outputDir);
    createDirectoryIfNotExists(archiveDir);

    List<String> processedFiles = new ArrayList<>();
    int totalFiles = 0;
    int successfulFiles = 0;
    int errorFiles = 0;

    try (DirectoryStream<Path> stream = Files.newDirectoryStream(
            Paths.get(inputDir), config.filePattern)) {
        for (Path filePath : stream) {
            totalFiles++;
            String fileName = filePath.getFileName().toString();
            try {
                boolean processed = processFile(filePath, outputDir);
                if (processed) {
                    successfulFiles++;
                    processedFiles.add(fileName);
                    // Archive processed file
                    if (config.archiveProcessedFiles) {
                        archiveFile(filePath, archiveDir);
                    }
                } else {
                    errorFiles++;
                    LOG.warn("Failed to process file: {}", fileName);
                }
            } catch (Exception e) {
                errorFiles++;
                LOG.error("Error processing file: {}", fileName, e);
                // Move error files to error directory
                if (config.errorDirectory != null) {
                    moveFileToErrorDirectory(filePath, config.errorDirectory);
                }
            }
        }
    }

    // Record metrics
    metrics.count("files.total", totalFiles);
    metrics.count("files.successful", successfulFiles);
    metrics.count("files.errors", errorFiles);

    // Set arguments for downstream stages
    arguments.set("processed.file.count", String.valueOf(successfulFiles));
    arguments.set("processed.files", String.join(",", processedFiles));
    arguments.set("output.directory", outputDir);

    if (successfulFiles == 0 && config.failOnNoFiles) {
        throw new Exception("No files were successfully processed");
    }
    LOG.info("Processed {} files successfully, {} errors out of {} total files",
        successfulFiles, errorFiles, totalFiles);
}
/**
 * Writes a processed copy of {@code inputFile} into {@code outputDir} under
 * the same file name, choosing the handler by file extension.
 *
 * @return the handler's result, or {@code true} after a plain copy for
 *         extensions without a dedicated handler
 * @throws IOException if reading or writing fails
 */
private boolean processFile(Path inputFile, String outputDir) throws IOException {
    String name = inputFile.getFileName().toString();
    Path target = Paths.get(outputDir, name);

    String kind = getFileExtension(name).toLowerCase();
    if ("csv".equals(kind)) {
        return processCSVFile(inputFile, target);
    }
    if ("json".equals(kind)) {
        return processJSONFile(inputFile, target);
    }
    if ("xml".equals(kind)) {
        return processXMLFile(inputFile, target);
    }

    // For unknown types, just copy
    Files.copy(inputFile, target, StandardCopyOption.REPLACE_EXISTING);
    return true;
}
/**
 * Copies a CSV file line by line, passing each data line through
 * {@code cleanCSVLine} and dropping the lines it rejects.
 *
 * <p>When the file is configured as having a header, the first line is
 * written through unchanged rather than cleaned.
 *
 * @return always {@code true}; failures surface as {@link IOException}
 */
private boolean processCSVFile(Path inputFile, Path outputFile) throws IOException {
    try (BufferedReader in = Files.newBufferedReader(inputFile);
         BufferedWriter out = Files.newBufferedWriter(outputFile)) {
        // True only until the first line has been consumed.
        boolean headerPending = config.csvHasHeader;

        for (String row = in.readLine(); row != null; row = in.readLine()) {
            String toWrite;
            if (headerPending) {
                toWrite = row;
                headerPending = false;
            } else {
                toWrite = cleanCSVLine(row);
            }
            if (toWrite != null) {
                out.write(toWrite);
                out.newLine();
            }
        }
        return true;
    }
}
/**
 * Normalizes one CSV line: trims every field and strips one pair of
 * surrounding double quotes from it.
 *
 * @param line raw line; may be {@code null}
 * @return the cleaned line re-joined with the configured delimiter, or
 *         {@code null} when the line is blank or its field count does not
 *         match {@code expectedFieldCount}
 */
private String cleanCSVLine(String line) {
    if (line == null || line.trim().isEmpty()) {
        return null;
    }

    // String.split takes a regex, so the delimiter is quoted to keep "|" or "."
    // literal. The -1 limit keeps trailing empty fields, which would otherwise
    // be dropped and make the field-count check below reject valid lines.
    String[] fields = line.split(java.util.regex.Pattern.quote(config.csvDelimiter), -1);

    // Validate field count
    if (config.expectedFieldCount > 0 && fields.length != config.expectedFieldCount) {
        LOG.warn("Invalid field count in CSV line: expected {}, got {}",
            config.expectedFieldCount, fields.length);
        return null;
    }

    // Clean individual fields
    for (int i = 0; i < fields.length; i++) {
        String field = fields[i].trim();
        // Needs at least two characters: a lone '"' is both prefix and suffix,
        // and substring(1, 0) would throw.
        if (field.length() >= 2 && field.startsWith("\"") && field.endsWith("\"")) {
            field = field.substring(1, field.length() - 1);
        }
        fields[i] = field;
    }
    return String.join(config.csvDelimiter, fields);
}
}@Plugin(type = Action.PLUGIN_TYPE)
@Name("APINotification")
@Description("Sends notifications to external systems via REST API")
public class APINotificationAction extends Action {
private final Config config;
private HttpClient httpClient;
/**
 * Context captured in {@link #initialize}. The {@code Action} base class
 * declares no context accessor, so the plugin has to keep its own reference.
 */
private ActionContext actionContext;

/** Stores the context and builds the HTTP client with the configured connect timeout. */
@Override
public void initialize(ActionContext context) throws Exception {
    this.actionContext = context;
    // Initialize HTTP client
    this.httpClient = HttpClient.newBuilder()
        .connectTimeout(Duration.ofSeconds(config.connectTimeoutSeconds))
        .build();
}

/**
 * Builds the JSON payload and POSTs it to the configured webhook.
 *
 * <p>A non-2xx response or a transport error is counted as a metric. It
 * fails the action only when {@code failOnError} is set; otherwise it is logged.
 *
 * @throws Exception if the notification fails and {@code failOnError} is set
 */
@Override
public void run() throws Exception {
    ActionContext context = actionContext;
    Arguments arguments = context.getArguments();
    StageMetrics metrics = context.getMetrics();

    // Build notification payload and convert it to JSON
    Map<String, Object> payload = buildNotificationPayload(arguments);
    String jsonPayload = new Gson().toJson(payload);

    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create(config.webhookUrl))
        .header("Content-Type", "application/json")
        .header("Authorization", "Bearer " + config.apiToken)
        .POST(HttpRequest.BodyPublishers.ofString(jsonPayload))
        .timeout(Duration.ofSeconds(config.requestTimeoutSeconds))
        .build();

    try {
        HttpResponse<String> response = httpClient.send(request,
            HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() >= 200 && response.statusCode() < 300) {
            metrics.count("notifications.success", 1);
            LOG.info("Notification sent successfully: {}", response.statusCode());
            // Parse response if needed
            if (config.parseResponse) {
                parseAndSetResponse(response.body(), context.getArguments());
            }
        } else {
            metrics.count("notifications.failure", 1);
            String errorMsg = String.format("Notification failed with status %d: %s",
                response.statusCode(), response.body());
            if (config.failOnError) {
                throw new Exception(errorMsg);
            } else {
                LOG.warn(errorMsg);
            }
        }
    } catch (IOException | InterruptedException e) {
        if (e instanceof InterruptedException) {
            // Restore the interrupt flag so callers further up can still see
            // that this thread was asked to stop.
            Thread.currentThread().interrupt();
        }
        metrics.count("notifications.error", 1);
        if (config.failOnError) {
            throw new Exception("Failed to send notification", e);
        } else {
            LOG.error("Error sending notification", e);
        }
    }
}
/**
 * Assembles the notification body: fixed pipeline fields, the configured
 * custom fields with their templates resolved, and, when configured, a nested
 * {@code "arguments"} map holding the selected arguments that have a value.
 *
 * @param arguments source of pipeline values and template variables
 * @return a new mutable map ready to be serialized
 */
private Map<String, Object> buildNotificationPayload(Arguments arguments) {
    Map<String, Object> body = new HashMap<>();

    // Basic pipeline information
    body.put("pipelineName", arguments.get("pipeline.name"));
    body.put("pipelineRunId", arguments.get("pipeline.run.id"));
    body.put("timestamp", Instant.now().toString());

    // Custom fields: each configured value is a template to resolve
    if (config.customFields != null) {
        for (Map.Entry<String, String> field : config.customFields.entrySet()) {
            body.put(field.getKey(), resolveTemplate(field.getValue(), arguments));
        }
    }

    // Selected runtime arguments; names without a value are left out
    if (config.includeArguments != null) {
        Map<String, String> selected = new HashMap<>();
        for (String name : config.includeArguments) {
            String value = arguments.get(name);
            if (value == null) {
                continue;
            }
            selected.put(name, value);
        }
        body.put("arguments", selected);
    }

    return body;
}
/**
 * Replaces every {@code ${name}} placeholder in {@code template} with the
 * argument of that name. Placeholders with no matching argument are left as is.
 *
 * <p>The template is expanded in a single pass, so a substituted value that
 * itself contains {@code ${...}} is inserted literally and never re-expanded.
 *
 * @param template text that may contain {@code ${name}} placeholders
 * @param arguments lookup source for placeholder names
 * @return the template with all resolvable placeholders substituted
 */
private String resolveTemplate(String template, Arguments arguments) {
    // Simple template variable resolution: ${variable.name}
    Matcher matcher = Pattern.compile("\\$\\{([^}]+)\\}").matcher(template);
    StringBuffer resolved = new StringBuffer();
    while (matcher.find()) {
        String variableValue = arguments.get(matcher.group(1));
        String replacement = variableValue != null ? variableValue : matcher.group(0);
        // quoteReplacement keeps '$' and '\' in the value from being read as
        // group references by appendReplacement.
        matcher.appendReplacement(resolved, Matcher.quoteReplacement(replacement));
    }
    matcher.appendTail(resolved);
    return resolved.toString();
}
/** Intentionally a no-op: nothing held by this action is released here. */
@Override
public void destroy() {
    // HttpClient doesn't need explicit cleanup in Java 11+
}
}
Base abstract class for conditional execution in pipelines.
package io.cdap.cdap.etl.api.condition;
/**
 * Base class for pipeline condition plugins.
 *
 * <p>Every lifecycle hook declared here has an empty body, so a subclass only
 * has to implement {@link #apply()} and override the hooks it actually needs.
 */
public abstract class Condition
implements PipelineConfigurable,
SubmitterLifecycle<ConditionContext>,
StageLifecycle<ConditionContext> {
/** Plugin type identifier; the examples pass it to {@code @Plugin(type = ...)}. */
public static final String PLUGIN_TYPE = "condition";
// Configuration lifecycle
/** Configuration hook; does nothing unless overridden. */
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {}
// Submission lifecycle
/** Pre-run hook; does nothing unless overridden. */
public void prepareRun(ConditionContext context) throws Exception {}
/**
 * Post-run hook; does nothing unless overridden.
 *
 * @param succeeded presumably whether the run completed successfully (confirm against the runtime)
 * @param context the condition context
 */
public void onRunFinish(boolean succeeded, ConditionContext context) {}
// Stage lifecycle
/** Initialization hook; does nothing unless overridden. */
public void initialize(ConditionContext context) throws Exception {}
/** Teardown hook; does nothing unless overridden. */
public void destroy() {}
// Condition evaluation
/**
 * Evaluates the condition. This is the only method a subclass must implement.
 *
 * @return the evaluation outcome
 * @throws Exception if the condition cannot be evaluated
 */
public abstract ConditionResult apply() throws Exception;
}Condition Implementation Example:
@Plugin(type = Condition.PLUGIN_TYPE)
@Name("FileExistenceCondition")
@Description("Checks if specified files exist before proceeding")
public class FileExistenceCondition extends Condition {
private final Config config;
private ConditionContext conditionContext;
/**
 * Runs the plugin configuration's own validation, handing it the failure
 * collector obtained from the stage configurer.
 */
@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    config.validate(pipelineConfigurer.getStageConfigurer().getFailureCollector());
}
/** Keeps the supplied context so that {@code apply()} can reach it later. */
@Override
public void initialize(ConditionContext context) throws Exception {
    conditionContext = context;
}
/**
 * Resolves every required file path against the runtime arguments, sorts the
 * paths into present and absent, and evaluates them under the configured
 * check mode.
 *
 * @return the outcome together with a message listing the relevant paths
 * @throws IllegalArgumentException if the check mode is not one of the handled values
 */
@Override
public ConditionResult apply() throws Exception {
    StageMetrics stageMetrics = conditionContext.getMetrics();
    Arguments runtimeArgs = conditionContext.getArguments();

    List<String> absent = new ArrayList<>();
    List<String> present = new ArrayList<>();

    for (String configuredPath : config.requiredFiles) {
        // Placeholders in the configured path are filled from runtime arguments
        String path = resolveFilePath(configuredPath, runtimeArgs);
        if (!fileExists(path)) {
            absent.add(path);
            stageMetrics.count("files.missing", 1);
        } else {
            present.add(path);
            stageMetrics.count("files.found", 1);
        }
    }

    boolean met;
    String summary;
    switch (config.checkMode) {
        case ALL_MUST_EXIST:
            met = absent.isEmpty();
            if (met) {
                summary = "All required files exist";
            } else {
                summary = "Missing files: " + String.join(", ", absent);
            }
            break;
        case ANY_MUST_EXIST:
            met = !present.isEmpty();
            if (met) {
                summary = "Found files: " + String.join(", ", present);
            } else {
                summary = "No required files found";
            }
            break;
        case NONE_MUST_EXIST:
            met = present.isEmpty();
            if (met) {
                summary = "No files exist (as expected)";
            } else {
                summary = "Unexpected files found: " + String.join(", ", present);
            }
            break;
        default:
            throw new IllegalArgumentException("Unknown check mode: " + config.checkMode);
    }

    LOG.info("File existence condition: {} - {}", met, summary);
    return new ConditionResult(met, summary);
}
/**
 * Replaces every {@code ${name}} placeholder in {@code filePath} with the
 * runtime argument of that name. Placeholders with no matching argument are
 * left as is.
 *
 * <p>The path is expanded in a single pass, so a substituted value that
 * itself contains {@code ${...}} is inserted literally and never re-expanded.
 *
 * @param filePath configured path that may contain placeholders
 * @param arguments lookup source for placeholder names
 * @return the path with all resolvable placeholders substituted
 */
private String resolveFilePath(String filePath, Arguments arguments) {
    // Replace runtime argument placeholders
    Matcher matcher = Pattern.compile("\\$\\{([^}]+)\\}").matcher(filePath);
    StringBuffer resolved = new StringBuffer();
    while (matcher.find()) {
        String argValue = arguments.get(matcher.group(1));
        String replacement = argValue != null ? argValue : matcher.group(0);
        // quoteReplacement keeps '$' and '\' (common in Windows paths) from
        // being read as group references by appendReplacement.
        matcher.appendReplacement(resolved, Matcher.quoteReplacement(replacement));
    }
    matcher.appendTail(resolved);
    return resolved.toString();
}
/**
 * Reports whether {@code filePath} names an existing, readable file.
 * Any exception raised during the check is logged and treated as "not present".
 */
private boolean fileExists(String filePath) {
    boolean usable;
    try {
        Path candidate = Paths.get(filePath);
        usable = Files.exists(candidate) && Files.isReadable(candidate);
    } catch (Exception e) {
        LOG.warn("Error checking file existence: {}", filePath, e);
        usable = false;
    }
    return usable;
}
}
Context interface for condition evaluation.
package io.cdap.cdap.etl.api.condition;
/**
 * Context passed to a condition's lifecycle methods. It adds per-stage
 * statistics lookup on top of what {@link StageContext} offers.
 */
public interface ConditionContext extends StageContext {
/**
* Get stage statistics for previous stages.
*
* @param stageName name of the stage to look up
* @return the statistics for that stage; the DataQualityCondition example
*         checks the result for {@code null}, so callers should presumably
*         be prepared for a missing entry (confirm against the runtime)
*/
StageStatistics getStageStatistics(String stageName);
}Interface for accessing statistics from pipeline stages.
package io.cdap.cdap.etl.api.condition;
/**
 * Read-only record counters for a single pipeline stage.
 */
public interface StageStatistics {
/**
* Get input record count.
*
* @return number of input records counted for the stage
*/
long getInputRecordsCount();
/**
* Get output record count.
*
* @return number of output records counted for the stage
*/
long getOutputRecordsCount();
/**
* Get error record count.
*
* @return number of error records counted for the stage
*/
long getErrorRecordsCount();
}@Plugin(type = Condition.PLUGIN_TYPE)
@Name("DataQualityCondition")
@Description("Checks data quality metrics before proceeding")
public class DataQualityCondition extends Condition {
private final Config config;
/**
 * Context captured in {@link #initialize}. The {@code Condition} base class
 * declares no context accessor, so the plugin has to keep its own reference.
 */
private ConditionContext conditionContext;

/** Stores the context so that {@link #apply()} can look up stage statistics. */
@Override
public void initialize(ConditionContext context) throws Exception {
    this.conditionContext = context;
}

/**
 * Evaluates every configured quality check against the statistics of its
 * stage and collects one message per violated limit.
 *
 * <p>A stage without statistics counts as a failure. The error-rate check is
 * skipped for a stage with zero input records, since the rate is undefined.
 *
 * @return a met result when no check failed, otherwise a not-met result whose
 *         message lists all failures separated by {@code "; "}
 */
@Override
public ConditionResult apply() throws Exception {
    List<String> failures = new ArrayList<>();

    for (QualityCheck check : config.qualityChecks) {
        StageStatistics stats = conditionContext.getStageStatistics(check.stageName);
        if (stats == null) {
            failures.add("No statistics available for stage: " + check.stageName);
            continue;
        }

        // Check error rate
        if (check.maxErrorRate != null) {
            long totalRecords = stats.getInputRecordsCount();
            long errorRecords = stats.getErrorRecordsCount();
            if (totalRecords > 0) {
                double errorRate = (double) errorRecords / totalRecords;
                if (errorRate > check.maxErrorRate) {
                    failures.add(String.format(
                        "Stage %s error rate %.2f%% exceeds limit %.2f%%",
                        check.stageName, errorRate * 100, check.maxErrorRate * 100
                    ));
                }
            }
        }

        // Check minimum record count
        if (check.minRecordCount != null) {
            long outputRecords = stats.getOutputRecordsCount();
            if (outputRecords < check.minRecordCount) {
                failures.add(String.format(
                    "Stage %s output count %d below minimum %d",
                    check.stageName, outputRecords, check.minRecordCount
                ));
            }
        }

        // Check maximum record count
        if (check.maxRecordCount != null) {
            long outputRecords = stats.getOutputRecordsCount();
            if (outputRecords > check.maxRecordCount) {
                failures.add(String.format(
                    "Stage %s output count %d exceeds maximum %d",
                    check.stageName, outputRecords, check.maxRecordCount
                ));
            }
        }
    }

    // Every failed check adds a message, so the list is the single source of truth.
    boolean qualityMet = failures.isEmpty();
    String message = qualityMet
        ? "All data quality checks passed"
        : "Data quality failures: " + String.join("; ", failures);
    return new ConditionResult(qualityMet, message);
}
/**
 * Limits to enforce for one stage. The three limit fields are boxed so that
 * {@code null} can mean "do not check"; {@code apply()} skips any limit that is null.
 */
private static class QualityCheck {
// Name of the stage whose statistics are inspected.
public String stageName;
// Highest acceptable errors/input ratio, as a fraction (apply() multiplies by 100 for display).
public Double maxErrorRate;
// Smallest acceptable output record count.
public Long minRecordCount;
// Largest acceptable output record count.
public Long maxRecordCount;
}
}@Plugin(type = Condition.PLUGIN_TYPE)
@Name("TimeWindowCondition")
@Description("Checks if current time is within allowed execution window")
public class TimeWindowCondition extends Condition {
private final Config config;
/**
 * Checks the current moment, in the configured time zone, against the allowed
 * days of week, the daily time window and the date range, in that order.
 * The first violated restriction produces the result.
 *
 * <p>The time window is half-open, {@code [startTime, endTime)}: a run at
 * exactly the start time is allowed. A window whose start is not before its
 * end is treated as spanning midnight. The window is only checked when both
 * bounds are configured.
 *
 * @return a not-met result describing the first violated restriction, or a
 *         met result when all configured restrictions pass
 */
@Override
public ConditionResult apply() throws Exception {
    ZonedDateTime now = ZonedDateTime.now(ZoneId.of(config.timezone));

    // Check day of week
    if (config.allowedDaysOfWeek != null && !config.allowedDaysOfWeek.isEmpty()) {
        DayOfWeek currentDay = now.getDayOfWeek();
        if (!config.allowedDaysOfWeek.contains(currentDay)) {
            return new ConditionResult(false,
                String.format("Current day %s not in allowed days: %s",
                    currentDay, config.allowedDaysOfWeek));
        }
    }

    // Check time window
    if (config.startTime != null && config.endTime != null) {
        LocalTime currentTime = now.toLocalTime();
        LocalTime startTime = LocalTime.parse(config.startTime);
        LocalTime endTime = LocalTime.parse(config.endTime);

        // "Not before start" makes the start bound inclusive; isAfter(start)
        // would reject a run at exactly the start time.
        boolean atOrAfterStart = !currentTime.isBefore(startTime);
        boolean beforeEnd = currentTime.isBefore(endTime);
        boolean inWindow;
        if (startTime.isBefore(endTime)) {
            // Normal window (e.g., 09:00-17:00)
            inWindow = atOrAfterStart && beforeEnd;
        } else {
            // Overnight window (e.g., 22:00-06:00)
            inWindow = atOrAfterStart || beforeEnd;
        }
        if (!inWindow) {
            return new ConditionResult(false,
                String.format("Current time %s not in allowed window %s-%s",
                    currentTime, config.startTime, config.endTime));
        }
    }

    // Check date range (both bounds inclusive, each optional)
    if (config.startDate != null || config.endDate != null) {
        LocalDate currentDate = now.toLocalDate();
        if (config.startDate != null) {
            LocalDate startDate = LocalDate.parse(config.startDate);
            if (currentDate.isBefore(startDate)) {
                return new ConditionResult(false,
                    String.format("Current date %s before start date %s",
                        currentDate, config.startDate));
            }
        }
        if (config.endDate != null) {
            LocalDate endDate = LocalDate.parse(config.endDate);
            if (currentDate.isAfter(endDate)) {
                return new ConditionResult(false,
                    String.format("Current date %s after end date %s",
                        currentDate, config.endDate));
            }
        }
    }

    return new ConditionResult(true,
        String.format("Time window check passed at %s", now));
}
}public class ConditionalPipelineOrchestrator {
/**
 * Runs the stages in order. A stage without a condition always runs; a stage
 * with a condition runs only when the condition is met.
 *
 * @param stages stages to run, in execution order
 * @param context context handed to every executed stage
 * @throws Exception if a required stage's condition is not met, or if a
 *         condition or stage throws
 */
public static void executeConditionalStages(List<ConditionalStage> stages,
                                            PipelineContext context) throws Exception {
    for (ConditionalStage stage : stages) {
        if (!stage.hasCondition()) {
            // Execute unconditionally
            executeStage(stage, context);
            continue;
        }

        ConditionResult outcome = stage.getCondition().apply();
        if (outcome.isConditionMet()) {
            LOG.info("Condition met for stage {}: {}",
                stage.getName(), outcome.getMessage());
            executeStage(stage, context);
            continue;
        }

        LOG.info("Condition not met for stage {}: {}",
            stage.getName(), outcome.getMessage());
        // An unmet condition aborts the pipeline only for required stages;
        // optional stages are simply skipped.
        if (stage.isRequired()) {
            throw new Exception("Required stage condition failed: " +
                stage.getName());
        }
    }
}
/**
 * Runs one stage: its pre-actions, then the stage itself, then its
 * post-actions. An exception from any step propagates immediately, so the
 * remaining steps do not run.
 */
private static void executeStage(ConditionalStage stage, PipelineContext context)
        throws Exception {
    // Before the stage
    for (Action before : stage.getPreActions()) {
        before.run();
    }

    // The stage's own logic
    stage.execute(context);

    // After the stage
    for (Action after : stage.getPostActions()) {
        after.run();
    }
}
}public class ArgumentChainProcessor {
/**
 * Runs the actions in order, feeding the arguments set by each action into
 * the next one.
 *
 * <p>Each action gets its own {@code MapBackedSettableArguments}, which
 * copies the current arguments. After the action has run, the values it set
 * are merged back into the running map.
 *
 * @param actions actions to run, in order
 * @param initialArguments starting arguments; this map is copied, not modified
 * @throws Exception if an action fails to initialize or run
 */
public static void processArgumentChain(List<Action> actions,
                                        Map<String, String> initialArguments)
        throws Exception {
    // Create mutable arguments map
    Map<String, String> currentArguments = new HashMap<>(initialArguments);

    for (Action action : actions) {
        // Declared with the concrete type: getUpdatedArguments() is not part
        // of the SettableArguments interface.
        MapBackedSettableArguments settableArgs =
            new MapBackedSettableArguments(currentArguments);
        ActionContext actionContext = createActionContext(settableArgs);

        // Initialize and run action; destroy() runs even when run() throws
        action.initialize(actionContext);
        try {
            action.run();
        } finally {
            action.destroy();
        }

        // Update arguments for next action
        currentArguments.putAll(settableArgs.getUpdatedArguments());
    }
}
/**
 * In-memory {@link SettableArguments} that also records which entries were
 * written through {@link #set}.
 */
private static class MapBackedSettableArguments implements SettableArguments {
    /** Private copy of the initial values plus everything set since. */
    private final Map<String, String> arguments;
    /** Only the entries written through {@link #set}. */
    private final Map<String, String> updates = new HashMap<>();

    /**
     * @param arguments initial values; copied, so later changes to the
     *        caller's map are not seen here and vice versa
     */
    public MapBackedSettableArguments(Map<String, String> arguments) {
        this.arguments = new HashMap<>(arguments);
    }

    @Override
    public void set(String name, String value) {
        this.arguments.put(name, value);
        this.updates.put(name, value);
    }

    @Override
    public boolean has(String name) {
        return this.arguments.containsKey(name);
    }

    @Override
    public String get(String name) {
        return this.arguments.get(name);
    }

    /** Returns a fresh copy of the entries written through {@link #set}. */
    public Map<String, String> getUpdatedArguments() {
        Map<String, String> copy = new HashMap<>();
        copy.putAll(this.updates);
        return copy;
    }
}
}
Install with Tessl CLI
npx tessl i tessl/maven-io-cdap-cdap--cdap-etl-api