Core application programming interface (API) for the Cask Data Application Platform (CDAP), enabling development of scalable data processing applications on the Hadoop ecosystem.

Workflow programs in CDAP orchestrate the execution of multiple programs, such as MapReduce jobs, Spark programs, and custom actions, with support for conditional branching, parallel (forked) execution, and state sharing between nodes through the workflow token.

public interface Workflow extends ProgramLifecycle<WorkflowContext> {
  void configure(WorkflowConfigurer configurer);
}

public abstract class AbstractWorkflow implements Workflow {

  public abstract void configure(WorkflowConfigurer configurer);

  @Override
  public void initialize(WorkflowContext context) throws Exception {
    // Optional initialization
  }

  @Override
  public void destroy() {
    // Optional cleanup
  }
}

public interface WorkflowConfigurer extends ProgramConfigurer, DatasetConfigurer, PluginConfigurer {
  void addMapReduce(String name);
  void addSpark(String name);
  void addAction(CustomAction action);
  // fork() opens a set of parallel branches; also() starts the next branch and
  // join() closes the fork, both called on the returned WorkflowForkConfigurer
  WorkflowForkConfigurer<? extends WorkflowConfigurer> fork();
  // condition() opens a branch taken when the Condition evaluates to true;
  // otherwise() and end() are called on the returned WorkflowConditionConfigurer
  WorkflowConditionConfigurer<? extends WorkflowConfigurer> condition(Condition condition);
}

public interface WorkflowContext extends RuntimeContext, DatasetContext {
  WorkflowToken getWorkflowToken();
  Map<String, String> getRuntimeArguments();
  WorkflowNodeState getNodeState(String nodeName);
  Map<String, WorkflowNodeState> getNodeStates();
}

public interface WorkflowToken {
  void put(String key, Value value);
  void put(String key, String value);
  void put(String key, int value);
  void put(String key, long value);
  void put(String key, double value);
  void put(String key, boolean value);
  Value get(String key);
  Value get(String key, String nodeName);
  Value get(String key, WorkflowNodeScope scope);
  Map<String, Value> getAll();
  Map<String, Value> getAll(WorkflowNodeScope scope);
  void putAll(Map<String, String> values);
}

public interface Condition extends ProgramLifecycle<WorkflowContext> {
  void configure(ConditionConfigurer configurer);
  boolean apply(WorkflowContext context) throws Exception;
}

public abstract class AbstractCondition implements Condition {

  public abstract void configure(ConditionConfigurer configurer);

  public abstract boolean apply(WorkflowContext context) throws Exception;

  @Override
  public void initialize(WorkflowContext context) throws Exception {
    // Optional initialization
  }

  @Override
  public void destroy() {
    // Optional cleanup
  }
}

public interface CustomAction extends ProgramLifecycle<CustomActionContext> {
  void configure(CustomActionConfigurer configurer);
  void run(CustomActionContext context) throws Exception;
}

public class DataProcessingWorkflow extends AbstractWorkflow {

  @Override
  public void configure(WorkflowConfigurer configurer) {
    configurer.setName("DataProcessingWorkflow");
    configurer.setDescription("Processes daily data with validation and aggregation");

    // Sequential execution: each node starts after the previous one completes
    configurer.addAction(new DataValidationAction());
    configurer.addMapReduce("DataCleaning");
    configurer.addSpark("DataAggregation");
    configurer.addAction(new NotificationAction());
  }
}
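
A workflow is deployed as part of a CDAP application, and any MapReduce or Spark program it references by name must be registered in the same application. The following is a minimal sketch of that wiring; AnalyticsApp, DataCleaningMapReduce, and DataAggregationSpark are illustrative names, while AbstractApplication and its setName, setDescription, addMapReduce, addSpark, and addWorkflow methods are part of the CDAP application API.

public class AnalyticsApp extends AbstractApplication {

  @Override
  public void configure() {
    setName("AnalyticsApp");
    setDescription("Deploys the daily data processing workflow");

    // Programs the workflow refers to by name must be part of the application
    addMapReduce(new DataCleaningMapReduce());   // hypothetical program whose configure() sets the name "DataCleaning"
    addSpark(new DataAggregationSpark());        // hypothetical program whose configure() sets the name "DataAggregation"
    addWorkflow(new DataProcessingWorkflow());
  }
}

The string names used in the workflow ("DataCleaning", "DataAggregation") must match the names the corresponding programs set in their own configure methods.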

public class ConditionalWorkflow extends AbstractWorkflow {

  @Override
  public void configure(WorkflowConfigurer configurer) {
    configurer.setName("ConditionalProcessing");
    configurer.addAction(new DataCheckAction());

    // Conditional execution based on data availability
    configurer.condition(new DataAvailabilityCondition())
        .addMapReduce("ProcessLargeDataset")
        .addSpark("ComplexAnalytics")
      .otherwise()
        .addMapReduce("ProcessSmallDataset")
        .addAction(new SimpleReportAction())
      .end();

    configurer.addAction(new CleanupAction());
  }
}

public class DataAvailabilityCondition extends AbstractCondition {

  @Override
  public void configure(ConditionConfigurer configurer) {
    configurer.setName("DataAvailabilityCheck");
    configurer.useDataset("inputData");
  }

  @Override
  public boolean apply(WorkflowContext context) throws Exception {
    FileSet inputData = context.getDataset("inputData");

    // Check whether the large dataset is available
    Location dataLocation = inputData.getLocation("large-dataset");
    if (dataLocation.exists()) {
      long fileSize = dataLocation.length();
      context.getWorkflowToken().put("dataSize", fileSize);
      return fileSize > 1000000; // ~1 MB threshold
    }
    return false;
  }
}
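
Conditions receive the full WorkflowContext, so besides datasets and the workflow token they can also inspect the state of nodes that have already executed in the current run via getNodeStates(). A minimal sketch using only methods declared in this section; the UpstreamCheckCondition class and the "DataCleaning" node name are illustrative.

public class UpstreamCheckCondition extends AbstractCondition {

  @Override
  public void configure(ConditionConfigurer configurer) {
    configurer.setName("UpstreamCheck");
  }

  @Override
  public boolean apply(WorkflowContext context) throws Exception {
    // Only nodes that have already executed in this run appear in the map
    Map<String, WorkflowNodeState> states = context.getNodeStates();
    // Take the branch only if the "DataCleaning" node has run and recorded state
    return states.containsKey("DataCleaning");
  }
}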

public class ParallelWorkflow extends AbstractWorkflow {

  @Override
  public void configure(WorkflowConfigurer configurer) {
    configurer.setName("ParallelProcessing");
    configurer.addAction(new PrepareDataAction());

    // Fork into three branches that execute in parallel
    configurer.fork()
        .addMapReduce("ProcessRegionA")
        .addSpark("AnalyzeRegionA")
      .also()
        .addMapReduce("ProcessRegionB")
        .addSpark("AnalyzeRegionB")
      .also()
        .addMapReduce("ProcessRegionC")
        .addSpark("AnalyzeRegionC")
      .join();

    // Continue with sequential execution after the join
    configurer.addSpark("CombineResults");
    configurer.addAction(new GenerateReportAction());
  }
}

public class DataValidationAction implements CustomAction {

  @Override
  public void configure(CustomActionConfigurer configurer) {
    configurer.setName("DataValidation");
    configurer.setDescription("Validates input data quality");
    configurer.useDataset("inputData");
    configurer.useDataset("validationRules");
  }

  @Override
  public void run(CustomActionContext context) throws Exception {
    ObjectStore<DataRecord> inputData = context.getDataset("inputData");
    KeyValueTable rules = context.getDataset("validationRules");

    int totalRecords = 0;
    int validRecords = 0;
    int invalidRecords = 0;

    // Validate each record
    try (CloseableIterator<KeyValue<byte[], DataRecord>> iterator = inputData.scan(null, null)) {
      while (iterator.hasNext()) {
        KeyValue<byte[], DataRecord> entry = iterator.next();
        DataRecord record = entry.getValue();
        totalRecords++;
        if (validateRecord(record, rules)) {
          validRecords++;
        } else {
          invalidRecords++;
          // Record a metric for each invalid record
          context.getMetrics().count("validation.invalid", 1);
        }
      }
    }

    // Store validation results in the workflow token for downstream nodes
    WorkflowToken token = context.getWorkflowToken();
    token.put("validation.total", totalRecords);
    token.put("validation.valid", validRecords);
    token.put("validation.invalid", invalidRecords);

    // Guard against division by zero when the input dataset is empty
    double validationRate = totalRecords == 0 ? 0.0 : (double) validRecords / totalRecords;
    token.put("validation.rate", validationRate);

    // Fail the workflow if the validation rate is too low
    if (validationRate < 0.95) {
      throw new RuntimeException("Data validation failed: only " +
          (validationRate * 100) + "% of records are valid");
    }
  }

  private boolean validateRecord(DataRecord record, KeyValueTable rules) {
    // Validation logic based on the configured rules; minimal null/empty check here
    return record != null && record.getId() != null && !record.getId().isEmpty();
  }

  @Override
  public void initialize(CustomActionContext context) throws Exception {
    // Optional initialization
  }

  @Override
  public void destroy() {
    // Optional cleanup
  }
}
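
Values that DataValidationAction writes to the workflow token become available to every downstream node in the same run. The following is a minimal sketch of a downstream custom action reading them; the ReportAction class is illustrative, and it assumes the Value class exposes typed getters such as getAsDouble() and getAsInt().

public class ReportAction implements CustomAction {

  @Override
  public void configure(CustomActionConfigurer configurer) {
    configurer.setName("Report");
  }

  @Override
  public void run(CustomActionContext context) throws Exception {
    WorkflowToken token = context.getWorkflowToken();

    // get() returns null if no node has written the key in this run
    Value rate = token.get("validation.rate");
    if (rate != null) {
      context.getMetrics().gauge("report.validation.rate.percent", (long) (rate.getAsDouble() * 100));
    }

    // The nodeName overload reads the value written by a specific node
    Value invalid = token.get("validation.invalid", "DataValidation");
    if (invalid != null) {
      context.getMetrics().count("report.invalid.records", invalid.getAsInt());
    }
  }

  @Override
  public void initialize(CustomActionContext context) throws Exception {
    // Optional initialization
  }

  @Override
  public void destroy() {
    // Optional cleanup
  }
}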

public class PluginWorkflow extends AbstractWorkflow {

  @Override
  public void configure(WorkflowConfigurer configurer) {
    configurer.setName("PluginBasedWorkflow");

    // Register an external data source plugin under the id "apiSource"
    configurer.usePlugin("source", "externalAPI", "apiSource",
      PluginProperties.builder()
        .add("endpoint", "https://api.example.com/data")
        .add("apiKey", "${api.key}")
        .build());

    configurer.addAction(new FetchExternalDataAction());
    configurer.addMapReduce("ProcessExternalData");
    configurer.addAction(new PublishResultsAction());
  }
}
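
For context, a plugin registered as type "source" with name "externalAPI" might look like the following sketch. The ExternalApiSource class is illustrative, ExternalDataSource and ExternalRecord are the same placeholder types used by FetchExternalDataAction below, and the PluginConfig field names match the properties supplied by PluginWorkflow above; the @Plugin, @Name, and @Description annotations and the PluginConfig base class come from the CDAP plugin API.

@Plugin(type = "source")
@Name("externalAPI")
public class ExternalApiSource implements ExternalDataSource {

  public static class Config extends PluginConfig {
    @Description("Base URL of the external API")
    private String endpoint;

    @Description("API key used to authenticate requests")
    private String apiKey;
  }

  private final Config config;

  public ExternalApiSource(Config config) {
    this.config = config;
  }

  @Override
  public List<ExternalRecord> fetchData() throws Exception {
    // Call config.endpoint with config.apiKey and deserialize the response;
    // an empty list stands in for the real HTTP client code
    return Collections.emptyList();
  }
}

At runtime, newPluginInstance("apiSource") creates an instance of the registered plugin class with its Config populated from the properties supplied during configuration.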

public class FetchExternalDataAction implements CustomAction {

  @Override
  public void configure(CustomActionConfigurer configurer) {
    configurer.setName("FetchExternalData");
    configurer.useDataset("externalData");
  }

  @Override
  public void run(CustomActionContext context) throws Exception {
    ExternalDataSource source = context.getPluginContext().newPluginInstance("apiSource");
    ObjectStore<ExternalRecord> dataStore = context.getDataset("externalData");

    List<ExternalRecord> records = source.fetchData();
    for (int i = 0; i < records.size(); i++) {
      dataStore.write("record_" + i, records.get(i));
    }

    context.getWorkflowToken().put("external.records.count", records.size());
    context.getMetrics().count("external.records.fetched", records.size());
  }

  @Override
  public void initialize(CustomActionContext context) throws Exception {
    // Optional initialization
  }

  @Override
  public void destroy() {
    // Optional cleanup
  }
}

Install with Tessl CLI
npx tessl i tessl/maven-co-cask-cdap--cdap-api