tessl/maven-co-cask-cdap--cdap-api

Core application programming interface for the Cask Data Application Platform (CDAP), enabling the development of scalable data processing applications on the Hadoop ecosystem.

Workflow Programs

Workflow programs in CDAP orchestrate the execution of multiple programs, such as MapReduce and Spark jobs and custom actions, with support for conditional logic, parallel fork-join execution, and state sharing through the workflow token.

Core Workflow Interfaces

Workflow

public interface Workflow extends ProgramLifecycle<WorkflowContext> {
    void configure(WorkflowConfigurer configurer);
}

AbstractWorkflow

public abstract class AbstractWorkflow implements Workflow {
    public abstract void configure(WorkflowConfigurer configurer);
    
    @Override
    public void initialize(WorkflowContext context) throws Exception {
        // Optional initialization
    }
    
    @Override
    public void destroy() {
        // Optional cleanup
    }
}
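
The initialize and destroy hooks are optional overrides. The following is a minimal sketch (the workflow name, program name, and runtime argument key are illustrative); it uses initialize to copy a runtime argument into the workflow token so that every downstream node can read it:

public class InitializingWorkflow extends AbstractWorkflow {
    
    @Override
    public void configure(WorkflowConfigurer configurer) {
        configurer.setName("InitializingWorkflow");
        configurer.addMapReduce("ProcessData");
    }
    
    @Override
    public void initialize(WorkflowContext context) throws Exception {
        // Copy a runtime argument into the token so downstream nodes can read it
        String date = context.getRuntimeArguments().get("processing.date");
        if (date != null) {
            context.getWorkflowToken().put("processing.date", date);
        }
    }
}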

Workflow Configuration

WorkflowConfigurer

public interface WorkflowConfigurer extends ProgramConfigurer, DatasetConfigurer, PluginConfigurer {
    void addMapReduce(String name);
    void addSpark(String name);
    void addAction(CustomAction action);
    
    WorkflowForkConfigurer<? extends WorkflowConfigurer> fork();
    
    WorkflowConditionConfigurer<? extends WorkflowConfigurer> condition(Condition condition);
}

The fork() and condition() methods return dedicated configurers: also() and join() are declared on WorkflowForkConfigurer, while otherwise() and end() are declared on WorkflowConditionConfigurer, with join() and end() returning control to the parent configurer. This is what enables the fluent chaining used in the usage examples below.

WorkflowContext

public interface WorkflowContext extends RuntimeContext, DatasetContext {
    WorkflowToken getWorkflowToken();
    Map<String, String> getRuntimeArguments();
    
    WorkflowNodeState getNodeState(String nodeName);
    Map<String, WorkflowNodeState> getNodeStates();
}
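
The node-state accessors make it possible to audit, after the run, what each node in the workflow did. The sketch below is illustrative only: the workflow and program names are placeholders, and the getNodeId() and getNodeStatus() accessors on WorkflowNodeState are assumptions not defined in this document. It keeps a reference to the context from initialize and reports per-node status in destroy:

public class AuditedWorkflow extends AbstractWorkflow {
    
    private WorkflowContext workflowContext;
    
    @Override
    public void configure(WorkflowConfigurer configurer) {
        configurer.setName("AuditedWorkflow");
        configurer.addMapReduce("ProcessData");
        configurer.addSpark("AggregateData");
    }
    
    @Override
    public void initialize(WorkflowContext context) throws Exception {
        this.workflowContext = context;
    }
    
    @Override
    public void destroy() {
        // Inspect the state of every node once the run has completed or failed
        for (WorkflowNodeState state : workflowContext.getNodeStates().values()) {
            // getNodeId() and getNodeStatus() are assumed WorkflowNodeState accessors
            System.out.println(state.getNodeId() + " finished with status " + state.getNodeStatus());
        }
    }
}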

Workflow Token and State Management

WorkflowToken

public interface WorkflowToken {
    void put(String key, Value value);
    void put(String key, String value);
    void put(String key, int value);
    void put(String key, long value);
    void put(String key, double value);
    void put(String key, boolean value);
    
    Value get(String key);
    Value get(String key, String nodeName);
    Value get(String key, WorkflowNodeScope scope);
    
    Map<String, Value> getAll();
    Map<String, Value> getAll(WorkflowNodeScope scope);
    
    void putAll(Map<String, String> values);
}
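
Values written by one node can be read back by any later node, either merged across all nodes or scoped to the node that wrote them. The sketch below is illustrative (the ThresholdReportAction class and the getAsLong() accessor on Value are assumptions, not part of this reference); written as a CustomAction (described below), it reads the dataSize entry that the DataAvailabilityCondition shown later in this document writes under the node name DataAvailabilityCheck:

public class ThresholdReportAction implements CustomAction {
    
    @Override
    public void configure(CustomActionConfigurer configurer) {
        configurer.setName("ThresholdReport");
    }
    
    @Override
    public void run(CustomActionContext context) throws Exception {
        WorkflowToken token = context.getWorkflowToken();
        
        // Read the value written by the "DataAvailabilityCheck" node for this key
        Value dataSize = token.get("dataSize", "DataAvailabilityCheck");
        
        // getAsLong() is an assumed accessor on Value
        if (dataSize != null && dataSize.getAsLong() > 1000000L) {
            token.put("largeInputDetected", true);
        }
    }
}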

Conditions and Custom Actions

Condition

public interface Condition extends ProgramLifecycle<WorkflowContext> {
    void configure(ConditionConfigurer configurer);
    boolean apply(WorkflowContext context) throws Exception;
}

AbstractCondition

public abstract class AbstractCondition implements Condition {
    public abstract void configure(ConditionConfigurer configurer);
    public abstract boolean apply(WorkflowContext context) throws Exception;
    
    @Override
    public void initialize(WorkflowContext context) throws Exception {
        // Optional initialization
    }
    
    @Override
    public void destroy() {
        // Optional cleanup
    }
}

CustomAction

public interface CustomAction extends ProgramLifecycle<CustomActionContext> {
    void configure(CustomActionConfigurer configurer);
    void run(CustomActionContext context) throws Exception;
}

Usage Examples

Basic Workflow

public class DataProcessingWorkflow extends AbstractWorkflow {
    
    @Override
    public void configure(WorkflowConfigurer configurer) {
        configurer.setName("DataProcessingWorkflow");
        configurer.setDescription("Processes daily data with validation and aggregation");
        
        // Sequential execution
        configurer.addAction(new DataValidationAction());
        configurer.addMapReduce("DataCleaning");
        configurer.addSpark("DataAggregation");
        configurer.addAction(new NotificationAction());
    }
}

Workflow with Conditional Logic

public class ConditionalWorkflow extends AbstractWorkflow {
    
    @Override
    public void configure(WorkflowConfigurer configurer) {
        configurer.setName("ConditionalProcessing");
        
        configurer.addAction(new DataCheckAction());
        
        // Conditional execution based on data availability
        configurer.condition(new DataAvailabilityCondition())
            .addMapReduce("ProcessLargeDataset")
            .addSpark("ComplexAnalytics")
        .otherwise()
            .addMapReduce("ProcessSmallDataset")
            .addAction(new SimpleReportAction())
        .end();
        
        configurer.addAction(new CleanupAction());
    }
}

public class DataAvailabilityCondition extends AbstractCondition {
    
    @Override
    public void configure(ConditionConfigurer configurer) {
        configurer.setName("DataAvailabilityCheck");
        configurer.useDataset("inputData");
    }
    
    @Override
    public boolean apply(WorkflowContext context) throws Exception {
        FileSet inputData = context.getDataset("inputData");
        
        // Check if large dataset is available
        Location dataLocation = inputData.getLocation("large-dataset");
        if (dataLocation.exists()) {
            long fileSize = dataLocation.length();
            context.getWorkflowToken().put("dataSize", fileSize);
            return fileSize > 1000000; // 1MB threshold
        }
        
        return false;
    }
}

Workflow with Fork-Join Parallelism

public class ParallelWorkflow extends AbstractWorkflow {
    
    @Override
    public void configure(WorkflowConfigurer configurer) {
        configurer.setName("ParallelProcessing");
        
        configurer.addAction(new PrepareDataAction());
        
        // Fork for parallel execution
        configurer.fork()
            .addMapReduce("ProcessRegionA")
            .addSpark("AnalyzeRegionA")
        .also()
            .addMapReduce("ProcessRegionB")
            .addSpark("AnalyzeRegionB")
        .also()
            .addMapReduce("ProcessRegionC")
            .addSpark("AnalyzeRegionC")
        .join();
        
        // Continue with sequential execution after join
        configurer.addSpark("CombineResults");
        configurer.addAction(new GenerateReportAction());
    }
}

Custom Action Implementation

public class DataValidationAction implements CustomAction {
    
    @Override
    public void configure(CustomActionConfigurer configurer) {
        configurer.setName("DataValidation");
        configurer.setDescription("Validates input data quality");
        configurer.useDataset("inputData");
        configurer.useDataset("validationRules");
    }
    
    @Override
    public void run(CustomActionContext context) throws Exception {
        ObjectStore<DataRecord> inputData = context.getDataset("inputData");
        KeyValueTable rules = context.getDataset("validationRules");
        
        int totalRecords = 0;
        int validRecords = 0;
        int invalidRecords = 0;
        
        // Validate each record
        try (CloseableIterator<KeyValue<byte[], DataRecord>> iterator = inputData.scan(null, null)) {
            while (iterator.hasNext()) {
                KeyValue<byte[], DataRecord> entry = iterator.next();
                DataRecord record = entry.getValue();
                totalRecords++;
                
                if (validateRecord(record, rules)) {
                    validRecords++;
                } else {
                    invalidRecords++;
                    // Log invalid record details
                    context.getMetrics().count("validation.invalid", 1);
                }
            }
        }
        
        // Store validation results in workflow token for downstream use
        WorkflowToken token = context.getWorkflowToken();
        token.put("validation.total", totalRecords);
        token.put("validation.valid", validRecords);
        token.put("validation.invalid", invalidRecords);
        
        // Guard against an empty dataset before computing the rate
        double validationRate = totalRecords == 0 ? 0.0 : (double) validRecords / totalRecords;
        token.put("validation.rate", validationRate);
        
        // Fail the workflow if the validation rate is too low
        if (validationRate < 0.95) {
            throw new RuntimeException(String.format(
                "Data validation failed: only %.1f%% of records are valid", validationRate * 100));
        }
    }
    
    private boolean validateRecord(DataRecord record, KeyValueTable rules) {
        // Implement validation logic based on rules
        return record != null && record.getId() != null && !record.getId().isEmpty();
    }
}

Workflow with Plugin Integration

public class PluginWorkflow extends AbstractWorkflow {
    
    @Override
    public void configure(WorkflowConfigurer configurer) {
        configurer.setName("PluginBasedWorkflow");
        
        // Use external data source plugin
        configurer.usePlugin("source", "externalAPI", "apiSource",
                            PluginProperties.builder()
                                .add("endpoint", "https://api.example.com/data")
                                .add("apiKey", "${api.key}")
                                .build());
        
        configurer.addAction(new FetchExternalDataAction());
        configurer.addMapReduce("ProcessExternalData");
        configurer.addAction(new PublishResultsAction());
    }
}

public class FetchExternalDataAction implements CustomAction {
    
    @Override
    public void configure(CustomActionConfigurer configurer) {
        configurer.setName("FetchExternalData");
        configurer.useDataset("externalData");
    }
    
    @Override
    public void run(CustomActionContext context) throws Exception {
        ExternalDataSource source = context.getPluginContext().newPluginInstance("apiSource");
        ObjectStore<ExternalRecord> dataStore = context.getDataset("externalData");
        
        List<ExternalRecord> records = source.fetchData();
        
        for (int i = 0; i < records.size(); i++) {
            dataStore.write("record_" + i, records.get(i));
        }
        
        context.getWorkflowToken().put("external.records.count", records.size());
        context.getMetrics().count("external.records.fetched", records.size());
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-co-cask-cdap--cdap-api
