# Workflow Programs

Workflow programs in CDAP orchestrate the execution of multiple programs, with support for conditional branching, parallel (fork/join) execution, and shared state between nodes through a workflow token.

## Core Workflow Interfaces

### Workflow

```java { .api }
public interface Workflow extends ProgramLifecycle<WorkflowContext> {
  void configure(WorkflowConfigurer configurer);
}
```

### AbstractWorkflow

```java { .api }
public abstract class AbstractWorkflow implements Workflow {
  public abstract void configure(WorkflowConfigurer configurer);

  @Override
  public void initialize(WorkflowContext context) throws Exception {
    // Optional initialization
  }

  @Override
  public void destroy() {
    // Optional cleanup
  }
}
```

## Workflow Configuration

### WorkflowConfigurer

```java { .api }
public interface WorkflowConfigurer extends ProgramConfigurer, DatasetConfigurer, PluginConfigurer {
  void addMapReduce(String name);
  void addSpark(String name);
  void addAction(CustomAction action);

  // Parallel execution: the returned configurer chains the branch contents,
  // also() to open the next branch, and join() to merge back into the main flow
  WorkflowForkConfigurer<? extends WorkflowConfigurer> fork();

  // Conditional execution: the returned configurer chains the true branch,
  // otherwise() for the false branch, and end() to close the conditional
  WorkflowConditionConfigurer<? extends WorkflowConfigurer> condition(Condition condition);
}
```
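
The fluent shape implied by these return types looks like the following sketch (hypothetical program names; complete examples appear under Usage Examples below):

```java
configurer.fork()
    .addMapReduce("BranchOne")
  .also()
    .addMapReduce("BranchTwo")
  .join();
```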

### WorkflowContext

```java { .api }
public interface WorkflowContext extends RuntimeContext, DatasetContext {
  WorkflowToken getWorkflowToken();
  Map<String, String> getRuntimeArguments();

  WorkflowNodeState getNodeState(String nodeName);
  Map<String, WorkflowNodeState> getNodeStates();
}
```
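
A minimal sketch of how a workflow might use its context: a runtime argument seeds the token during initialization, and node states are inspected at teardown. The program name, the getContext() accessor on AbstractWorkflow, and the getNodeStatus() accessor on WorkflowNodeState are assumptions for illustration.

```java
public class AuditedWorkflow extends AbstractWorkflow {

  @Override
  public void configure(WorkflowConfigurer configurer) {
    configurer.setName("AuditedWorkflow");
    configurer.addMapReduce("NightlyBatch"); // assumed to be registered in the application
  }

  @Override
  public void initialize(WorkflowContext context) throws Exception {
    // Copy a runtime argument into the token so downstream nodes can read it
    String runDate = context.getRuntimeArguments().get("run.date");
    if (runDate != null) {
      context.getWorkflowToken().put("run.date", runDate);
    }
  }

  @Override
  public void destroy() {
    // getContext() is assumed to expose the WorkflowContext after the run
    for (Map.Entry<String, WorkflowNodeState> entry : getContext().getNodeStates().entrySet()) {
      System.out.println("Node " + entry.getKey() + " ended with status " + entry.getValue().getNodeStatus());
    }
  }
}
```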

## Workflow Token and State Management

### WorkflowToken

```java { .api }
public interface WorkflowToken {
  void put(String key, Value value);
  void put(String key, String value);
  void put(String key, int value);
  void put(String key, long value);
  void put(String key, double value);
  void put(String key, boolean value);

  Value get(String key);
  Value get(String key, String nodeName);
  Value get(String key, WorkflowNodeScope scope);

  Map<String, Value> getAll();
  Map<String, Value> getAll(WorkflowNodeScope scope);

  void putAll(Map<String, String> values);
}
```
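
A minimal sketch of passing a value between nodes through the token. The node name and the getAsLong() accessor on Value are assumptions for illustration; the two-argument get pins the read to the node that wrote the value.

```java
// In one node, e.g. inside a custom action's run method:
context.getWorkflowToken().put("records.processed", 1250L);

// In a later node, read back the value written by the "DataCleaning" node:
Value processed = context.getWorkflowToken().get("records.processed", "DataCleaning");
long count = (processed == null) ? 0L : processed.getAsLong(); // null if never written
```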

## Conditions and Custom Actions

### Condition

```java { .api }
public interface Condition extends ProgramLifecycle<WorkflowContext> {
  void configure(ConditionConfigurer configurer);
  boolean apply(WorkflowContext context) throws Exception;
}
```

### AbstractCondition

```java { .api }
public abstract class AbstractCondition implements Condition {
  public abstract void configure(ConditionConfigurer configurer);
  public abstract boolean apply(WorkflowContext context) throws Exception;

  @Override
  public void initialize(WorkflowContext context) throws Exception {
    // Optional initialization
  }

  @Override
  public void destroy() {
    // Optional cleanup
  }
}
```

### CustomAction

```java { .api }
public interface CustomAction extends ProgramLifecycle<CustomActionContext> {
  void configure(CustomActionConfigurer configurer);
  void run(CustomActionContext context) throws Exception;
}
```

## Usage Examples

### Basic Workflow

```java
public class DataProcessingWorkflow extends AbstractWorkflow {

  @Override
  public void configure(WorkflowConfigurer configurer) {
    configurer.setName("DataProcessingWorkflow");
    configurer.setDescription("Processes daily data with validation and aggregation");

    // Sequential execution: each node starts after the previous one completes
    configurer.addAction(new DataValidationAction());
    configurer.addMapReduce("DataCleaning");
    configurer.addSpark("DataAggregation");
    configurer.addAction(new NotificationAction());
  }
}
```
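
The names passed to addMapReduce and addSpark must match programs registered in the same application. A minimal sketch of that registration, assuming the application extends CDAP's AbstractApplication and that the MapReduce and Spark classes (hypothetical here) configure the matching names:

```java
public class DataProcessingApp extends AbstractApplication {

  @Override
  public void configure() {
    setName("DataProcessingApp");
    addMapReduce(new DataCleaningMapReduce());  // hypothetical program named "DataCleaning"
    addSpark(new DataAggregationSpark());       // hypothetical program named "DataAggregation"
    addWorkflow(new DataProcessingWorkflow());
  }
}
```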

### Workflow with Conditional Logic

```java
public class ConditionalWorkflow extends AbstractWorkflow {

  @Override
  public void configure(WorkflowConfigurer configurer) {
    configurer.setName("ConditionalProcessing");

    configurer.addAction(new DataCheckAction());

    // Conditional execution based on data availability
    configurer.condition(new DataAvailabilityCondition())
        .addMapReduce("ProcessLargeDataset")
        .addSpark("ComplexAnalytics")
      .otherwise()
        .addMapReduce("ProcessSmallDataset")
        .addAction(new SimpleReportAction())
      .end();

    configurer.addAction(new CleanupAction());
  }
}

public class DataAvailabilityCondition extends AbstractCondition {

  @Override
  public void configure(ConditionConfigurer configurer) {
    configurer.setName("DataAvailabilityCheck");
    configurer.useDataset("inputData");
  }

  @Override
  public boolean apply(WorkflowContext context) throws Exception {
    FileSet inputData = context.getDataset("inputData");

    // Check whether the large dataset is available
    Location dataLocation = inputData.getLocation("large-dataset");
    if (dataLocation.exists()) {
      long fileSize = dataLocation.length();
      context.getWorkflowToken().put("dataSize", fileSize);
      return fileSize > 1000000; // ~1 MB threshold
    }

    return false;
  }
}
```
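
Token values written by the condition are visible to downstream nodes. A minimal sketch of the otherwise-branch action reading the recorded size; the getAsLong() accessor on Value is an assumption for illustration:

```java
public class SimpleReportAction implements CustomAction {

  @Override
  public void configure(CustomActionConfigurer configurer) {
    configurer.setName("SimpleReport");
  }

  @Override
  public void run(CustomActionContext context) throws Exception {
    // Read the size recorded by the DataAvailabilityCheck node; null if it never wrote one
    Value dataSize = context.getWorkflowToken().get("dataSize", "DataAvailabilityCheck");
    long size = (dataSize == null) ? 0L : dataSize.getAsLong();
    context.getMetrics().count("report.generated", 1);
    // ... generate the small-dataset report using size ...
  }
}
```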

### Workflow with Fork-Join Parallelism

```java
public class ParallelWorkflow extends AbstractWorkflow {

  @Override
  public void configure(WorkflowConfigurer configurer) {
    configurer.setName("ParallelProcessing");

    configurer.addAction(new PrepareDataAction());

    // Fork for parallel execution
    configurer.fork()
        .addMapReduce("ProcessRegionA")
        .addSpark("AnalyzeRegionA")
      .also()
        .addMapReduce("ProcessRegionB")
        .addSpark("AnalyzeRegionB")
      .also()
        .addMapReduce("ProcessRegionC")
        .addSpark("AnalyzeRegionC")
      .join();

    // Continue with sequential execution after the join
    configurer.addSpark("CombineResults");
    configurer.addAction(new GenerateReportAction());
  }
}
```
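
Nodes placed after join() run only once every branch has completed, while programs within a branch still run sequentially (ProcessRegionA before AnalyzeRegionA). Token values written inside a branch can be read downstream with a node-qualified get, for example `token.get("rowCount", "ProcessRegionA")` (hypothetical key).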

### Custom Action Implementation

```java
public class DataValidationAction implements CustomAction {

  @Override
  public void configure(CustomActionConfigurer configurer) {
    configurer.setName("DataValidation");
    configurer.setDescription("Validates input data quality");
    configurer.useDataset("inputData");
    configurer.useDataset("validationRules");
  }

  @Override
  public void run(CustomActionContext context) throws Exception {
    ObjectStore<DataRecord> inputData = context.getDataset("inputData");
    KeyValueTable rules = context.getDataset("validationRules");

    int totalRecords = 0;
    int validRecords = 0;
    int invalidRecords = 0;

    // Validate each record; a null range scans the entire store
    try (CloseableIterator<KeyValue<byte[], DataRecord>> iterator = inputData.scan(null, null)) {
      while (iterator.hasNext()) {
        KeyValue<byte[], DataRecord> entry = iterator.next();
        DataRecord record = entry.getValue();
        totalRecords++;

        if (validateRecord(record, rules)) {
          validRecords++;
        } else {
          invalidRecords++;
          // Track invalid records as a metric
          context.getMetrics().count("validation.invalid", 1);
        }
      }
    }

    // Store validation results in the workflow token for downstream use
    WorkflowToken token = context.getWorkflowToken();
    token.put("validation.total", totalRecords);
    token.put("validation.valid", validRecords);
    token.put("validation.invalid", invalidRecords);

    // Guard against an empty input so the rate is never NaN
    double validationRate = totalRecords == 0 ? 0.0 : (double) validRecords / totalRecords;
    token.put("validation.rate", validationRate);

    // Fail the workflow if the validation rate is too low
    if (validationRate < 0.95) {
      throw new RuntimeException("Data validation failed: only " +
          (validationRate * 100) + "% of records are valid");
    }
  }

  private boolean validateRecord(DataRecord record, KeyValueTable rules) {
    // Implement validation logic based on rules
    return record != null && record.getId() != null && !record.getId().isEmpty();
  }
}
```

### Workflow with Plugin Integration

```java
public class PluginWorkflow extends AbstractWorkflow {

  @Override
  public void configure(WorkflowConfigurer configurer) {
    configurer.setName("PluginBasedWorkflow");

    // Register an external data source plugin under the id "apiSource"
    configurer.usePlugin("source", "externalAPI", "apiSource",
        PluginProperties.builder()
            .add("endpoint", "https://api.example.com/data")
            .add("apiKey", "${api.key}")
            .build());

    configurer.addAction(new FetchExternalDataAction());
    configurer.addMapReduce("ProcessExternalData");
    configurer.addAction(new PublishResultsAction());
  }
}

public class FetchExternalDataAction implements CustomAction {

  @Override
  public void configure(CustomActionConfigurer configurer) {
    configurer.setName("FetchExternalData");
    configurer.useDataset("externalData");
  }

  @Override
  public void run(CustomActionContext context) throws Exception {
    // Instantiate the plugin registered in the workflow's configure method
    ExternalDataSource source = context.getPluginContext().newPluginInstance("apiSource");
    ObjectStore<ExternalRecord> dataStore = context.getDataset("externalData");

    List<ExternalRecord> records = source.fetchData();

    for (int i = 0; i < records.size(); i++) {
      dataStore.write("record_" + i, records.get(i));
    }

    context.getWorkflowToken().put("external.records.count", records.size());
    context.getMetrics().count("external.records.fetched", records.size());
  }
}
```
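
Note: `${api.key}` is written as a macro rather than a literal value; the expectation is that it is resolved at run time (for example from runtime arguments or a secure store) so credentials are not hard-coded into the application. ExternalDataSource and ExternalRecord are assumed to be classes supplied by a separately deployed plugin artifact.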