CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-io-cdap-cdap--cdap-etl-api

CDAP ETL API provides comprehensive abstractions for building Extract, Transform, and Load pipeline applications on the CDAP platform

Pending
Overview
Eval results
Files

actions-conditions.mddocs/

Actions and Conditions

Pipeline actions and conditional execution for workflow control, external integrations, and dynamic pipeline behavior in CDAP ETL.

Actions

Action

Base abstract class for pipeline actions that execute custom logic or external operations.

package io.cdap.cdap.etl.api.action;

public abstract class Action 
    implements PipelineConfigurable, 
               SubmitterLifecycle<ActionContext>, 
               StageLifecycle<ActionContext> {
    
    public static final String PLUGIN_TYPE = "action";
    
    // Configuration lifecycle
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {}
    
    // Submission lifecycle  
    public void prepareRun(ActionContext context) throws Exception {}
    public void onRunFinish(boolean succeeded, ActionContext context) {}
    
    // Stage lifecycle
    public void initialize(ActionContext context) throws Exception {}
    public void destroy() {}
    
    // Action execution
    public abstract void run() throws Exception;
}

Action Implementation Example:

@Plugin(type = Action.PLUGIN_TYPE)
@Name("DatabaseCleanup")
@Description("Cleans up old records from database tables")
public class DatabaseCleanupAction extends Action {
    
    private final Config config;
    private ActionContext actionContext;
    
    @Override
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
        FailureCollector collector = stageConfigurer.getFailureCollector();
        
        // Validate configuration
        config.validate(collector);
    }
    
    @Override
    public void prepareRun(ActionContext context) throws Exception {
        // Validate database connectivity
        try (Connection conn = getConnection()) {
            if (!conn.isValid(30)) {
                throw new Exception("Database connection is not valid");
            }
        }
    }
    
    @Override
    public void initialize(ActionContext context) throws Exception {
        this.actionContext = context;
    }
    
    @Override
    public void run() throws Exception {
        StageMetrics metrics = actionContext.getMetrics();
        SettableArguments arguments = actionContext.getArguments();
        
        try (Connection conn = getConnection()) {
            for (String tableName : config.tablesToClean) {
                long deletedRecords = cleanupTable(conn, tableName);
                
                // Record metrics
                metrics.count("records.deleted." + tableName, deletedRecords);
                
                // Set runtime arguments for downstream stages
                arguments.set("cleanup." + tableName + ".deleted", String.valueOf(deletedRecords));
                
                LOG.info("Deleted {} old records from table {}", deletedRecords, tableName);
            }
            
            // Set overall completion status
            arguments.set("cleanup.completed", "true");
            arguments.set("cleanup.timestamp", Instant.now().toString());
            
        } catch (SQLException e) {
            metrics.count("cleanup.errors", 1);
            throw new Exception("Database cleanup failed", e);
        }
    }
    
    private long cleanupTable(Connection conn, String tableName) throws SQLException {
        String deleteSQL = String.format(
            "DELETE FROM %s WHERE created_date < ?", 
            tableName
        );
        
        try (PreparedStatement stmt = conn.prepareStatement(deleteSQL)) {
            // Delete records older than retention days
            Timestamp cutoffDate = Timestamp.from(
                Instant.now().minus(config.retentionDays, ChronoUnit.DAYS)
            );
            stmt.setTimestamp(1, cutoffDate);
            
            return stmt.executeUpdate();
        }
    }
    
    private Connection getConnection() throws SQLException {
        return DriverManager.getConnection(
            config.connectionString, 
            config.username, 
            config.password
        );
    }
}

ActionContext

Context interface for action execution providing access to runtime services.

package io.cdap.cdap.etl.api.action;

public interface ActionContext extends StageContext {
    /**
     * Get settable arguments for passing data to other stages.
     */
    SettableArguments getArguments();
}

SettableArguments

Interface for arguments that can be modified by actions.

package io.cdap.cdap.etl.api.action;

public interface SettableArguments extends Arguments {
    /**
     * Set argument value.
     */
    void set(String name, String value);
}

Advanced Action Examples

File Processing Action

@Plugin(type = Action.PLUGIN_TYPE)
@Name("FileProcessor")
@Description("Processes files and prepares them for pipeline consumption")
public class FileProcessorAction extends Action {
    
    private final Config config;
    
    @Override
    public void run() throws Exception {
        ActionContext context = getContext();
        SettableArguments arguments = context.getArguments();
        StageMetrics metrics = context.getMetrics();
        
        // Get input directory
        String inputDir = config.inputDirectory;
        String outputDir = config.outputDirectory;
        String archiveDir = config.archiveDirectory;
        
        // Create directories if they don't exist
        createDirectoryIfNotExists(outputDir);
        createDirectoryIfNotExists(archiveDir);
        
        // Process files
        List<String> processedFiles = new ArrayList<>();
        int totalFiles = 0;
        int successfulFiles = 0;
        int errorFiles = 0;
        
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(
                Paths.get(inputDir), config.filePattern)) {
            
            for (Path filePath : stream) {
                totalFiles++;
                String fileName = filePath.getFileName().toString();
                
                try {
                    // Process individual file
                    boolean processed = processFile(filePath, outputDir);
                    
                    if (processed) {
                        successfulFiles++;
                        processedFiles.add(fileName);
                        
                        // Archive processed file
                        if (config.archiveProcessedFiles) {
                            archiveFile(filePath, archiveDir);
                        }
                    } else {
                        errorFiles++;
                        LOG.warn("Failed to process file: {}", fileName);
                    }
                    
                } catch (Exception e) {
                    errorFiles++;
                    LOG.error("Error processing file: {}", fileName, e);
                    
                    // Move error files to error directory
                    if (config.errorDirectory != null) {
                        moveFileToErrorDirectory(filePath, config.errorDirectory);
                    }
                }
            }
        }
        
        // Record metrics
        metrics.count("files.total", totalFiles);
        metrics.count("files.successful", successfulFiles);
        metrics.count("files.errors", errorFiles);
        
        // Set arguments for downstream stages
        arguments.set("processed.file.count", String.valueOf(successfulFiles));
        arguments.set("processed.files", String.join(",", processedFiles));
        arguments.set("output.directory", outputDir);
        
        if (successfulFiles == 0 && config.failOnNoFiles) {
            throw new Exception("No files were successfully processed");
        }
        
        LOG.info("Processed {} files successfully, {} errors out of {} total files", 
                successfulFiles, errorFiles, totalFiles);
    }
    
    private boolean processFile(Path inputFile, String outputDir) throws IOException {
        String fileName = inputFile.getFileName().toString();
        Path outputFile = Paths.get(outputDir, fileName);
        
        // Apply file transformations based on type
        String fileExtension = getFileExtension(fileName);
        
        switch (fileExtension.toLowerCase()) {
            case "csv":
                return processCSVFile(inputFile, outputFile);
            case "json":
                return processJSONFile(inputFile, outputFile);
            case "xml":
                return processXMLFile(inputFile, outputFile);
            default:
                // For unknown types, just copy
                Files.copy(inputFile, outputFile, StandardCopyOption.REPLACE_EXISTING);
                return true;
        }
    }
    
    private boolean processCSVFile(Path inputFile, Path outputFile) throws IOException {
        // CSV-specific processing: validate format, clean data, etc.
        try (BufferedReader reader = Files.newBufferedReader(inputFile);
             BufferedWriter writer = Files.newBufferedWriter(outputFile)) {
            
            String line;
            int lineNumber = 0;
            boolean hasHeader = config.csvHasHeader;
            
            while ((line = reader.readLine()) != null) {
                lineNumber++;
                
                // Skip header if configured
                if (hasHeader && lineNumber == 1) {
                    writer.write(line);
                    writer.newLine();
                    continue;
                }
                
                // Validate and clean CSV line
                String cleanedLine = cleanCSVLine(line);
                if (cleanedLine != null) {
                    writer.write(cleanedLine);
                    writer.newLine();
                }
            }
            
            return true;
        }
    }
    
    private String cleanCSVLine(String line) {
        if (line == null || line.trim().isEmpty()) {
            return null;
        }
        
        // Apply CSV cleaning rules
        String[] fields = line.split(config.csvDelimiter);
        
        // Validate field count
        if (config.expectedFieldCount > 0 && fields.length != config.expectedFieldCount) {
            LOG.warn("Invalid field count in CSV line: expected {}, got {}", 
                    config.expectedFieldCount, fields.length);
            return null;
        }
        
        // Clean individual fields
        for (int i = 0; i < fields.length; i++) {
            fields[i] = fields[i].trim();
            
            // Remove quotes if present
            if (fields[i].startsWith("\"") && fields[i].endsWith("\"")) {
                fields[i] = fields[i].substring(1, fields[i].length() - 1);
            }
        }
        
        return String.join(config.csvDelimiter, fields);
    }
}

External System Integration Action

@Plugin(type = Action.PLUGIN_TYPE)
@Name("APINotification")
@Description("Sends notifications to external systems via REST API")
public class APINotificationAction extends Action {
    
    private final Config config;
    private HttpClient httpClient;
    
    @Override
    public void initialize(ActionContext context) throws Exception {
        // Initialize HTTP client
        this.httpClient = HttpClient.newBuilder()
            .connectTimeout(Duration.ofSeconds(config.connectTimeoutSeconds))
            .build();
    }
    
    @Override
    public void run() throws Exception {
        ActionContext context = getContext();
        Arguments arguments = context.getArguments();
        StageMetrics metrics = context.getMetrics();
        
        // Build notification payload
        Map<String, Object> payload = buildNotificationPayload(arguments);
        
        // Convert to JSON
        String jsonPayload = new Gson().toJson(payload);
        
        // Send notification
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create(config.webhookUrl))
            .header("Content-Type", "application/json")
            .header("Authorization", "Bearer " + config.apiToken)
            .POST(HttpRequest.BodyPublishers.ofString(jsonPayload))
            .timeout(Duration.ofSeconds(config.requestTimeoutSeconds))
            .build();
        
        try {
            HttpResponse<String> response = httpClient.send(request, 
                HttpResponse.BodyHandlers.ofString());
            
            if (response.statusCode() >= 200 && response.statusCode() < 300) {
                metrics.count("notifications.success", 1);
                LOG.info("Notification sent successfully: {}", response.statusCode());
                
                // Parse response if needed
                if (config.parseResponse) {
                    parseAndSetResponse(response.body(), context.getArguments());
                }
                
            } else {
                metrics.count("notifications.failure", 1);
                String errorMsg = String.format("Notification failed with status %d: %s", 
                                              response.statusCode(), response.body());
                
                if (config.failOnError) {
                    throw new Exception(errorMsg);
                } else {
                    LOG.warn(errorMsg);
                }
            }
            
        } catch (IOException | InterruptedException e) {
            metrics.count("notifications.error", 1);
            
            if (config.failOnError) {
                throw new Exception("Failed to send notification", e);
            } else {
                LOG.error("Error sending notification", e);
            }
        }
    }
    
    private Map<String, Object> buildNotificationPayload(Arguments arguments) {
        Map<String, Object> payload = new HashMap<>();
        
        // Add basic pipeline information
        payload.put("pipelineName", arguments.get("pipeline.name"));
        payload.put("pipelineRunId", arguments.get("pipeline.run.id"));
        payload.put("timestamp", Instant.now().toString());
        
        // Add custom fields from configuration
        if (config.customFields != null) {
            for (Map.Entry<String, String> entry : config.customFields.entrySet()) {
                String key = entry.getKey();
                String template = entry.getValue();
                
                // Resolve template variables
                String value = resolveTemplate(template, arguments);
                payload.put(key, value);
            }
        }
        
        // Add dynamic arguments
        if (config.includeArguments != null) {
            Map<String, String> dynamicArgs = new HashMap<>();
            for (String argName : config.includeArguments) {
                String argValue = arguments.get(argName);
                if (argValue != null) {
                    dynamicArgs.put(argName, argValue);
                }
            }
            payload.put("arguments", dynamicArgs);
        }
        
        return payload;
    }
    
    private String resolveTemplate(String template, Arguments arguments) {
        String resolved = template;
        
        // Simple template variable resolution: ${variable.name}
        Pattern pattern = Pattern.compile("\\$\\{([^}]+)\\}");
        Matcher matcher = pattern.matcher(template);
        
        while (matcher.find()) {
            String variableName = matcher.group(1);
            String variableValue = arguments.get(variableName);
            
            if (variableValue != null) {
                resolved = resolved.replace(matcher.group(0), variableValue);
            }
        }
        
        return resolved;
    }
    
    @Override
    public void destroy() {
        // Cleanup HTTP client resources
        if (httpClient != null) {
            // HttpClient doesn't need explicit cleanup in Java 11+
        }
    }
}

Conditions

Condition

Base abstract class for conditional execution in pipelines.

package io.cdap.cdap.etl.api.condition;

public abstract class Condition 
    implements PipelineConfigurable, 
               SubmitterLifecycle<ConditionContext>, 
               StageLifecycle<ConditionContext> {
    
    public static final String PLUGIN_TYPE = "condition";
    
    // Configuration lifecycle
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {}
    
    // Submission lifecycle
    public void prepareRun(ConditionContext context) throws Exception {}
    public void onRunFinish(boolean succeeded, ConditionContext context) {}
    
    // Stage lifecycle
    public void initialize(ConditionContext context) throws Exception {}
    public void destroy() {}
    
    // Condition evaluation
    public abstract ConditionResult apply() throws Exception;
}

Condition Implementation Example:

@Plugin(type = Condition.PLUGIN_TYPE)
@Name("FileExistenceCondition")
@Description("Checks if specified files exist before proceeding")
public class FileExistenceCondition extends Condition {
    
    private final Config config;
    private ConditionContext conditionContext;
    
    @Override
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
        FailureCollector collector = stageConfigurer.getFailureCollector();
        
        config.validate(collector);
    }
    
    @Override
    public void initialize(ConditionContext context) throws Exception {
        this.conditionContext = context;
    }
    
    @Override
    public ConditionResult apply() throws Exception {
        StageMetrics metrics = conditionContext.getMetrics();
        Arguments arguments = conditionContext.getArguments();
        
        List<String> missingFiles = new ArrayList<>();
        List<String> existingFiles = new ArrayList<>();
        
        // Check each required file
        for (String filePath : config.requiredFiles) {
            // Resolve file path with runtime arguments
            String resolvedPath = resolveFilePath(filePath, arguments);
            
            if (fileExists(resolvedPath)) {
                existingFiles.add(resolvedPath);
                metrics.count("files.found", 1);
            } else {
                missingFiles.add(resolvedPath);
                metrics.count("files.missing", 1);
            }
        }
        
        // Determine condition result
        boolean conditionMet = false;
        String message;
        
        switch (config.checkMode) {
            case ALL_MUST_EXIST:
                conditionMet = missingFiles.isEmpty();
                message = conditionMet ? 
                    "All required files exist" : 
                    "Missing files: " + String.join(", ", missingFiles);
                break;
                
            case ANY_MUST_EXIST:
                conditionMet = !existingFiles.isEmpty();
                message = conditionMet ?
                    "Found files: " + String.join(", ", existingFiles) :
                    "No required files found";
                break;
                
            case NONE_MUST_EXIST:
                conditionMet = existingFiles.isEmpty();
                message = conditionMet ?
                    "No files exist (as expected)" :
                    "Unexpected files found: " + String.join(", ", existingFiles);
                break;
                
            default:
                throw new IllegalArgumentException("Unknown check mode: " + config.checkMode);
        }
        
        LOG.info("File existence condition: {} - {}", conditionMet, message);
        return new ConditionResult(conditionMet, message);
    }
    
    private String resolveFilePath(String filePath, Arguments arguments) {
        String resolved = filePath;
        
        // Replace runtime argument placeholders
        Pattern pattern = Pattern.compile("\\$\\{([^}]+)\\}");
        Matcher matcher = pattern.matcher(filePath);
        
        while (matcher.find()) {
            String argName = matcher.group(1);
            String argValue = arguments.get(argName);
            
            if (argValue != null) {
                resolved = resolved.replace(matcher.group(0), argValue);
            }
        }
        
        return resolved;
    }
    
    private boolean fileExists(String filePath) {
        try {
            Path path = Paths.get(filePath);
            return Files.exists(path) && Files.isReadable(path);
        } catch (Exception e) {
            LOG.warn("Error checking file existence: {}", filePath, e);
            return false;
        }
    }
}

ConditionContext

Context interface for condition evaluation.

package io.cdap.cdap.etl.api.condition;

public interface ConditionContext extends StageContext {
    /**
     * Get stage statistics for previous stages.
     */
    StageStatistics getStageStatistics(String stageName);
}

StageStatistics

Interface for accessing statistics from pipeline stages.

package io.cdap.cdap.etl.api.condition;

public interface StageStatistics {
    /**
     * Get input record count.
     */
    long getInputRecordsCount();
    
    /**
     * Get output record count.
     */
    long getOutputRecordsCount();
    
    /**
     * Get error record count.
     */
    long getErrorRecordsCount();
}

Advanced Condition Examples

Data Quality Condition

@Plugin(type = Condition.PLUGIN_TYPE)
@Name("DataQualityCondition")
@Description("Checks data quality metrics before proceeding")
public class DataQualityCondition extends Condition {
    
    private final Config config;
    
    @Override
    public ConditionResult apply() throws Exception {
        ConditionContext context = getContext();
        
        boolean qualityMet = true;
        StringBuilder messageBuilder = new StringBuilder();
        List<String> failures = new ArrayList<>();
        
        // Check each configured stage's statistics
        for (QualityCheck check : config.qualityChecks) {
            StageStatistics stats = context.getStageStatistics(check.stageName);
            
            if (stats == null) {
                failures.add("No statistics available for stage: " + check.stageName);
                qualityMet = false;
                continue;
            }
            
            // Check error rate
            if (check.maxErrorRate != null) {
                long totalRecords = stats.getInputRecordsCount();
                long errorRecords = stats.getErrorRecordsCount();
                
                if (totalRecords > 0) {
                    double errorRate = (double) errorRecords / totalRecords;
                    if (errorRate > check.maxErrorRate) {
                        failures.add(String.format(
                            "Stage %s error rate %.2f%% exceeds limit %.2f%%",
                            check.stageName, errorRate * 100, check.maxErrorRate * 100
                        ));
                        qualityMet = false;
                    }
                }
            }
            
            // Check minimum record count
            if (check.minRecordCount != null) {
                long outputRecords = stats.getOutputRecordsCount();
                if (outputRecords < check.minRecordCount) {
                    failures.add(String.format(
                        "Stage %s output count %d below minimum %d",
                        check.stageName, outputRecords, check.minRecordCount
                    ));
                    qualityMet = false;
                }
            }
            
            // Check maximum record count
            if (check.maxRecordCount != null) {
                long outputRecords = stats.getOutputRecordsCount();
                if (outputRecords > check.maxRecordCount) {
                    failures.add(String.format(
                        "Stage %s output count %d exceeds maximum %d",
                        check.stageName, outputRecords, check.maxRecordCount
                    ));
                    qualityMet = false;
                }
            }
        }
        
        String message;
        if (qualityMet) {
            message = "All data quality checks passed";
        } else {
            message = "Data quality failures: " + String.join("; ", failures);
        }
        
        return new ConditionResult(qualityMet, message);
    }
    
    private static class QualityCheck {
        public String stageName;
        public Double maxErrorRate;
        public Long minRecordCount;
        public Long maxRecordCount;
    }
}

Time-Based Condition

@Plugin(type = Condition.PLUGIN_TYPE)
@Name("TimeWindowCondition")
@Description("Checks if current time is within allowed execution window")
public class TimeWindowCondition extends Condition {
    
    private final Config config;
    
    @Override
    public ConditionResult apply() throws Exception {
        ZonedDateTime now = ZonedDateTime.now(ZoneId.of(config.timezone));
        
        // Check day of week
        if (config.allowedDaysOfWeek != null && !config.allowedDaysOfWeek.isEmpty()) {
            DayOfWeek currentDay = now.getDayOfWeek();
            if (!config.allowedDaysOfWeek.contains(currentDay)) {
                return new ConditionResult(false, 
                    String.format("Current day %s not in allowed days: %s", 
                                currentDay, config.allowedDaysOfWeek));
            }
        }
        
        // Check time window
        if (config.startTime != null && config.endTime != null) {
            LocalTime currentTime = now.toLocalTime();
            LocalTime startTime = LocalTime.parse(config.startTime);
            LocalTime endTime = LocalTime.parse(config.endTime);
            
            boolean inWindow;
            if (startTime.isBefore(endTime)) {
                // Normal window (e.g., 09:00-17:00)
                inWindow = currentTime.isAfter(startTime) && currentTime.isBefore(endTime);
            } else {
                // Overnight window (e.g., 22:00-06:00)
                inWindow = currentTime.isAfter(startTime) || currentTime.isBefore(endTime);
            }
            
            if (!inWindow) {
                return new ConditionResult(false,
                    String.format("Current time %s not in allowed window %s-%s",
                                currentTime, config.startTime, config.endTime));
            }
        }
        
        // Check date range
        if (config.startDate != null || config.endDate != null) {
            LocalDate currentDate = now.toLocalDate();
            
            if (config.startDate != null) {
                LocalDate startDate = LocalDate.parse(config.startDate);
                if (currentDate.isBefore(startDate)) {
                    return new ConditionResult(false,
                        String.format("Current date %s before start date %s",
                                    currentDate, config.startDate));
                }
            }
            
            if (config.endDate != null) {
                LocalDate endDate = LocalDate.parse(config.endDate);
                if (currentDate.isAfter(endDate)) {
                    return new ConditionResult(false,
                        String.format("Current date %s after end date %s",
                                    currentDate, config.endDate));
                }
            }
        }
        
        return new ConditionResult(true, 
            String.format("Time window check passed at %s", now));
    }
}

Workflow Integration

Conditional Pipeline Execution

public class ConditionalPipelineOrchestrator {
    
    public static void executeConditionalStages(List<ConditionalStage> stages, 
                                              PipelineContext context) throws Exception {
        
        for (ConditionalStage stage : stages) {
            if (stage.hasCondition()) {
                // Evaluate condition
                ConditionResult result = stage.getCondition().apply();
                
                if (result.isConditionMet()) {
                    LOG.info("Condition met for stage {}: {}", 
                           stage.getName(), result.getMessage());
                    
                    // Execute stage
                    executeStage(stage, context);
                } else {
                    LOG.info("Condition not met for stage {}: {}", 
                           stage.getName(), result.getMessage());
                    
                    // Handle condition failure
                    if (stage.isRequired()) {
                        throw new Exception("Required stage condition failed: " + 
                                          stage.getName());
                    } else {
                        // Skip optional stage
                        continue;
                    }
                }
            } else {
                // Execute unconditionally
                executeStage(stage, context);
            }
        }
    }
    
    private static void executeStage(ConditionalStage stage, PipelineContext context) 
            throws Exception {
        // Execute pre-stage actions
        for (Action preAction : stage.getPreActions()) {
            preAction.run();
        }
        
        // Execute main stage logic
        stage.execute(context);
        
        // Execute post-stage actions
        for (Action postAction : stage.getPostActions()) {
            postAction.run();
        }
    }
}

Dynamic Argument Passing

public class ArgumentChainProcessor {
    
    public static void processArgumentChain(List<Action> actions, 
                                          Map<String, String> initialArguments) 
            throws Exception {
        
        // Create mutable arguments map
        Map<String, String> currentArguments = new HashMap<>(initialArguments);
        
        for (Action action : actions) {
            // Create settable arguments for this action
            SettableArguments settableArgs = new MapBackedSettableArguments(currentArguments);
            
            // Create action context with current arguments
            ActionContext actionContext = createActionContext(settableArgs);
            
            // Initialize and run action
            action.initialize(actionContext);
            try {
                action.run();
            } finally {
                action.destroy();
            }
            
            // Update arguments for next action
            currentArguments.putAll(settableArgs.getUpdatedArguments());
        }
    }
    
    private static class MapBackedSettableArguments implements SettableArguments {
        private final Map<String, String> arguments;
        private final Map<String, String> updates;
        
        public MapBackedSettableArguments(Map<String, String> arguments) {
            this.arguments = new HashMap<>(arguments);
            this.updates = new HashMap<>();
        }
        
        @Override
        public void set(String name, String value) {
            arguments.put(name, value);
            updates.put(name, value);
        }
        
        @Override
        public boolean has(String name) {
            return arguments.containsKey(name);
        }
        
        @Override
        public String get(String name) {
            return arguments.get(name);
        }
        
        public Map<String, String> getUpdatedArguments() {
            return new HashMap<>(updates);
        }
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-io-cdap-cdap--cdap-etl-api

docs

actions-conditions.md

batch-processing.md

core-pipeline.md

data-connectors.md

index.md

join-operations.md

lineage-metadata.md

sql-engine.md

validation.md

tile.json