# Workflow Programs

Workflow programs in CDAP orchestrate the execution of multiple programs, with support for conditional logic, parallel (fork-join) execution, and state management through a shared workflow token.
## Core Workflow Interfaces

### Workflow

```java { .api }
public interface Workflow extends ProgramLifecycle<WorkflowContext> {
  void configure(WorkflowConfigurer configurer);
}
```

### AbstractWorkflow

```java { .api }
public abstract class AbstractWorkflow implements Workflow {
  public abstract void configure(WorkflowConfigurer configurer);

  @Override
  public void initialize(WorkflowContext context) throws Exception {
    // Optional initialization
  }

  @Override
  public void destroy() {
    // Optional cleanup
  }
}
```
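
Overriding the optional lifecycle methods lets a workflow do run-level setup before any node executes. A minimal sketch, assuming the interfaces above; the workflow name, program name, and the `report.date` key are illustrative:

```java
public class ReportWorkflow extends AbstractWorkflow {

  @Override
  public void configure(WorkflowConfigurer configurer) {
    configurer.setName("ReportWorkflow");
    configurer.addSpark("BuildReport");
  }

  @Override
  public void initialize(WorkflowContext context) throws Exception {
    // Seed the workflow token from a runtime argument so every node in the
    // run sees the same value. "report.date" is an illustrative key.
    String date = context.getRuntimeArguments().getOrDefault("report.date", "1970-01-01");
    context.getWorkflowToken().put("report.date", date);
  }
}
```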

## Workflow Configuration

### WorkflowConfigurer

```java { .api }
public interface WorkflowConfigurer extends ProgramConfigurer, DatasetConfigurer, PluginConfigurer {
  void addMapReduce(String name);
  void addSpark(String name);
  void addAction(CustomAction action);

  WorkflowForkConfigurer<? extends WorkflowConfigurer> fork();
  WorkflowConditionConfigurer<? extends WorkflowConfigurer> condition(Condition condition);
}
```
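
`fork()` and `condition()` return sub-configurers rather than `void`, which is what makes the fluent chaining in the usage examples below possible. On a fork, `also()` starts the next parallel branch and `join()` closes the fork; on a condition, `otherwise()` starts the else branch and `end()` closes the condition.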

### WorkflowContext

```java { .api }
public interface WorkflowContext extends RuntimeContext, DatasetContext {
  WorkflowToken getWorkflowToken();
  Map<String, String> getRuntimeArguments();

  WorkflowNodeState getNodeState(String nodeName);
  Map<String, WorkflowNodeState> getNodeStates();
}
```
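
A workflow can inspect how individual nodes finished, for example to log a summary during `destroy()`. A minimal sketch, assuming `WorkflowNodeState` exposes a `getNodeStatus()` accessor; the workflow and node names are illustrative:

```java
public class AuditedWorkflow extends AbstractWorkflow {
  private WorkflowContext context;

  @Override
  public void configure(WorkflowConfigurer configurer) {
    configurer.setName("AuditedWorkflow");
    configurer.addMapReduce("DataCleaning");
  }

  @Override
  public void initialize(WorkflowContext context) throws Exception {
    this.context = context; // keep a handle for use in destroy()
  }

  @Override
  public void destroy() {
    // Log the terminal state of every node in this run.
    // getNodeStatus() is assumed; the exact accessor depends on the API.
    for (Map.Entry<String, WorkflowNodeState> entry : context.getNodeStates().entrySet()) {
      System.out.println(entry.getKey() + " finished with status "
          + entry.getValue().getNodeStatus());
    }
  }
}
```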

## Workflow Token and State Management

### WorkflowToken

```java { .api }
public interface WorkflowToken {
  void put(String key, Value value);
  void put(String key, String value);
  void put(String key, int value);
  void put(String key, long value);
  void put(String key, double value);
  void put(String key, boolean value);

  Value get(String key);
  Value get(String key, String nodeName);
  Value get(String key, WorkflowNodeScope scope);

  Map<String, Value> getAll();
  Map<String, Value> getAll(WorkflowNodeScope scope);

  void putAll(Map<String, String> values);
}
```
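
Token values written by one node can be read by any later node, either by key alone or scoped to the node that wrote them. A minimal sketch that reads the keys written by the `DataValidation` action shown later in this document; the `getAsDouble()` accessor on `Value` is an assumption:

```java
public class ThresholdCheckAction implements CustomAction {

  @Override
  public void configure(CustomActionConfigurer configurer) {
    configurer.setName("ThresholdCheck");
  }

  @Override
  public void run(CustomActionContext context) throws Exception {
    WorkflowToken token = context.getWorkflowToken();

    // Latest value for the key, regardless of which node wrote it.
    Value rate = token.get("validation.rate");

    // Value as written by a specific node, here the "DataValidation" action.
    Value invalid = token.get("validation.invalid", "DataValidation");

    if (rate != null && rate.getAsDouble() < 0.99) {
      // getAsDouble() is assumed; adjust to the actual Value accessors.
      System.out.println("Validation rate below target; invalid records: " + invalid);
    }
  }
}
```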

## Conditions and Custom Actions

### Condition

```java { .api }
public interface Condition extends ProgramLifecycle<WorkflowContext> {
  void configure(ConditionConfigurer configurer);
  boolean apply(WorkflowContext context) throws Exception;
}
```

### AbstractCondition

```java { .api }
public abstract class AbstractCondition implements Condition {
  public abstract void configure(ConditionConfigurer configurer);
  public abstract boolean apply(WorkflowContext context) throws Exception;

  @Override
  public void initialize(WorkflowContext context) throws Exception {
    // Optional initialization
  }

  @Override
  public void destroy() {
    // Optional cleanup
  }
}
```

### CustomAction

```java { .api }
public interface CustomAction extends ProgramLifecycle<CustomActionContext> {
  void configure(CustomActionConfigurer configurer);
  void run(CustomActionContext context) throws Exception;
}
```

## Usage Examples

### Basic Workflow

```java
public class DataProcessingWorkflow extends AbstractWorkflow {

  @Override
  public void configure(WorkflowConfigurer configurer) {
    configurer.setName("DataProcessingWorkflow");
    configurer.setDescription("Processes daily data with validation and aggregation");

    // Sequential execution
    configurer.addAction(new DataValidationAction());
    configurer.addMapReduce("DataCleaning");
    configurer.addSpark("DataAggregation");
    configurer.addAction(new NotificationAction());
  }
}
```

### Workflow with Conditional Logic

```java
public class ConditionalWorkflow extends AbstractWorkflow {

  @Override
  public void configure(WorkflowConfigurer configurer) {
    configurer.setName("ConditionalProcessing");

    configurer.addAction(new DataCheckAction());

    // Conditional execution based on data availability
    configurer.condition(new DataAvailabilityCondition())
        .addMapReduce("ProcessLargeDataset")
        .addSpark("ComplexAnalytics")
      .otherwise()
        .addMapReduce("ProcessSmallDataset")
        .addAction(new SimpleReportAction())
      .end();

    configurer.addAction(new CleanupAction());
  }
}

public class DataAvailabilityCondition extends AbstractCondition {

  @Override
  public void configure(ConditionConfigurer configurer) {
    configurer.setName("DataAvailabilityCheck");
    configurer.useDataset("inputData");
  }

  @Override
  public boolean apply(WorkflowContext context) throws Exception {
    FileSet inputData = context.getDataset("inputData");

    // Check if the large dataset is available
    Location dataLocation = inputData.getLocation("large-dataset");
    if (dataLocation.exists()) {
      long fileSize = dataLocation.length();
      context.getWorkflowToken().put("dataSize", fileSize);
      return fileSize > 1000000; // ~1 MB threshold
    }

    return false;
  }
}
```
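
Because the condition writes `dataSize` to the workflow token, nodes in either branch, as well as the final `CleanupAction`, can read it back, for example with `token.get("dataSize", "DataAvailabilityCheck")`.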

### Workflow with Fork-Join Parallelism

```java
public class ParallelWorkflow extends AbstractWorkflow {

  @Override
  public void configure(WorkflowConfigurer configurer) {
    configurer.setName("ParallelProcessing");

    configurer.addAction(new PrepareDataAction());

    // Fork for parallel execution
    configurer.fork()
        .addMapReduce("ProcessRegionA")
        .addSpark("AnalyzeRegionA")
      .also()
        .addMapReduce("ProcessRegionB")
        .addSpark("AnalyzeRegionB")
      .also()
        .addMapReduce("ProcessRegionC")
        .addSpark("AnalyzeRegionC")
      .join();

    // Continue with sequential execution after the join
    configurer.addSpark("CombineResults");
    configurer.addAction(new GenerateReportAction());
  }
}
```

### Custom Action Implementation

```java
public class DataValidationAction implements CustomAction {

  @Override
  public void configure(CustomActionConfigurer configurer) {
    configurer.setName("DataValidation");
    configurer.setDescription("Validates input data quality");
    configurer.useDataset("inputData");
    configurer.useDataset("validationRules");
  }

  @Override
  public void run(CustomActionContext context) throws Exception {
    ObjectStore<DataRecord> inputData = context.getDataset("inputData");
    KeyValueTable rules = context.getDataset("validationRules");

    int totalRecords = 0;
    int validRecords = 0;
    int invalidRecords = 0;

    // Validate each record
    try (CloseableIterator<KeyValue<byte[], DataRecord>> iterator = inputData.scan(null, null)) {
      while (iterator.hasNext()) {
        KeyValue<byte[], DataRecord> entry = iterator.next();
        DataRecord record = entry.getValue();
        totalRecords++;

        if (validateRecord(record, rules)) {
          validRecords++;
        } else {
          invalidRecords++;
          // Count invalid records as a metric
          context.getMetrics().count("validation.invalid", 1);
        }
      }
    }

    // Store validation results in the workflow token for downstream use
    WorkflowToken token = context.getWorkflowToken();
    token.put("validation.total", totalRecords);
    token.put("validation.valid", validRecords);
    token.put("validation.invalid", invalidRecords);

    // Guard against an empty input set to avoid dividing by zero
    double validationRate = totalRecords == 0 ? 0.0 : (double) validRecords / totalRecords;
    token.put("validation.rate", validationRate);

    // Fail the workflow if the validation rate is too low
    if (validationRate < 0.95) {
      throw new RuntimeException("Data validation failed: only " +
          (validationRate * 100) + "% of records are valid");
    }
  }

  private boolean validateRecord(DataRecord record, KeyValueTable rules) {
    // Implement validation logic based on rules
    return record != null && record.getId() != null && !record.getId().isEmpty();
  }
}
```

### Workflow with Plugin Integration

```java
public class PluginWorkflow extends AbstractWorkflow {

  @Override
  public void configure(WorkflowConfigurer configurer) {
    configurer.setName("PluginBasedWorkflow");

    // Use external data source plugin
    configurer.usePlugin("source", "externalAPI", "apiSource",
        PluginProperties.builder()
            .add("endpoint", "https://api.example.com/data")
            .add("apiKey", "${api.key}")
            .build());

    configurer.addAction(new FetchExternalDataAction());
    configurer.addMapReduce("ProcessExternalData");
    configurer.addAction(new PublishResultsAction());
  }
}

public class FetchExternalDataAction implements CustomAction {

  @Override
  public void configure(CustomActionConfigurer configurer) {
    configurer.setName("FetchExternalData");
    configurer.useDataset("externalData");
  }

  @Override
  public void run(CustomActionContext context) throws Exception {
    ExternalDataSource source = context.getPluginContext().newPluginInstance("apiSource");
    ObjectStore<ExternalRecord> dataStore = context.getDataset("externalData");

    List<ExternalRecord> records = source.fetchData();

    for (int i = 0; i < records.size(); i++) {
      dataStore.write("record_" + i, records.get(i));
    }

    context.getWorkflowToken().put("external.records.count", records.size());
    context.getMetrics().count("external.records.fetched", records.size());
  }
}
```