0
# Actions and Conditions
1
2
This document describes pipeline actions and conditions in CDAP ETL: plugins used for workflow control, external integrations, and dynamic (conditional) pipeline behavior.
3
4
## Actions
5
6
### Action
7
8
Base abstract class for pipeline actions that execute custom logic or external operations.
9
10
```java { .api }
11
package io.cdap.cdap.etl.api.action;
12
13
public abstract class Action
14
implements PipelineConfigurable,
15
SubmitterLifecycle<ActionContext>,
16
StageLifecycle<ActionContext> {
17
18
public static final String PLUGIN_TYPE = "action";
19
20
// Configuration lifecycle
21
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {}
22
23
// Submission lifecycle
24
public void prepareRun(ActionContext context) throws Exception {}
25
public void onRunFinish(boolean succeeded, ActionContext context) {}
26
27
// Stage lifecycle
28
public void initialize(ActionContext context) throws Exception {}
29
public void destroy() {}
30
31
// Action execution
32
public abstract void run() throws Exception;
33
}
34
```
35
36
**Action Implementation Example:**
37
```java
38
@Plugin(type = Action.PLUGIN_TYPE)
39
@Name("DatabaseCleanup")
40
@Description("Cleans up old records from database tables")
41
public class DatabaseCleanupAction extends Action {
42
43
private final Config config;
44
private ActionContext actionContext;
45
46
@Override
47
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
48
StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
49
FailureCollector collector = stageConfigurer.getFailureCollector();
50
51
// Validate configuration
52
config.validate(collector);
53
}
54
55
@Override
56
public void prepareRun(ActionContext context) throws Exception {
57
// Validate database connectivity
58
try (Connection conn = getConnection()) {
59
if (!conn.isValid(30)) {
60
throw new Exception("Database connection is not valid");
61
}
62
}
63
}
64
65
@Override
66
public void initialize(ActionContext context) throws Exception {
67
this.actionContext = context;
68
}
69
70
@Override
71
public void run() throws Exception {
72
StageMetrics metrics = actionContext.getMetrics();
73
SettableArguments arguments = actionContext.getArguments();
74
75
try (Connection conn = getConnection()) {
76
for (String tableName : config.tablesToClean) {
77
long deletedRecords = cleanupTable(conn, tableName);
78
79
// Record metrics
80
metrics.count("records.deleted." + tableName, deletedRecords);
81
82
// Set runtime arguments for downstream stages
83
arguments.set("cleanup." + tableName + ".deleted", String.valueOf(deletedRecords));
84
85
LOG.info("Deleted {} old records from table {}", deletedRecords, tableName);
86
}
87
88
// Set overall completion status
89
arguments.set("cleanup.completed", "true");
90
arguments.set("cleanup.timestamp", Instant.now().toString());
91
92
} catch (SQLException e) {
93
metrics.count("cleanup.errors", 1);
94
throw new Exception("Database cleanup failed", e);
95
}
96
}
97
98
private long cleanupTable(Connection conn, String tableName) throws SQLException {
99
String deleteSQL = String.format(
100
"DELETE FROM %s WHERE created_date < ?",
101
tableName
102
);
103
104
try (PreparedStatement stmt = conn.prepareStatement(deleteSQL)) {
105
// Delete records older than retention days
106
Timestamp cutoffDate = Timestamp.from(
107
Instant.now().minus(config.retentionDays, ChronoUnit.DAYS)
108
);
109
stmt.setTimestamp(1, cutoffDate);
110
111
return stmt.executeUpdate();
112
}
113
}
114
115
private Connection getConnection() throws SQLException {
116
return DriverManager.getConnection(
117
config.connectionString,
118
config.username,
119
config.password
120
);
121
}
122
}
123
```
124
125
### ActionContext
126
127
Context interface for action execution providing access to runtime services.
128
129
```java { .api }
130
package io.cdap.cdap.etl.api.action;
131
132
public interface ActionContext extends StageContext {
133
/**
134
* Get settable arguments for passing data to other stages.
135
*/
136
SettableArguments getArguments();
137
}
138
```
139
140
### SettableArguments
141
142
Interface for arguments that can be modified by actions.
143
144
```java { .api }
145
package io.cdap.cdap.etl.api.action;
146
147
public interface SettableArguments extends Arguments {
148
/**
149
* Set argument value.
150
*/
151
void set(String name, String value);
152
}
153
```
154
155
## Advanced Action Examples
156
157
### File Processing Action
158
159
```java
160
@Plugin(type = Action.PLUGIN_TYPE)
161
@Name("FileProcessor")
162
@Description("Processes files and prepares them for pipeline consumption")
163
public class FileProcessorAction extends Action {
164
165
private final Config config;
166
167
@Override
168
public void run() throws Exception {
169
ActionContext context = getContext();
170
SettableArguments arguments = context.getArguments();
171
StageMetrics metrics = context.getMetrics();
172
173
// Get input directory
174
String inputDir = config.inputDirectory;
175
String outputDir = config.outputDirectory;
176
String archiveDir = config.archiveDirectory;
177
178
// Create directories if they don't exist
179
createDirectoryIfNotExists(outputDir);
180
createDirectoryIfNotExists(archiveDir);
181
182
// Process files
183
List<String> processedFiles = new ArrayList<>();
184
int totalFiles = 0;
185
int successfulFiles = 0;
186
int errorFiles = 0;
187
188
try (DirectoryStream<Path> stream = Files.newDirectoryStream(
189
Paths.get(inputDir), config.filePattern)) {
190
191
for (Path filePath : stream) {
192
totalFiles++;
193
String fileName = filePath.getFileName().toString();
194
195
try {
196
// Process individual file
197
boolean processed = processFile(filePath, outputDir);
198
199
if (processed) {
200
successfulFiles++;
201
processedFiles.add(fileName);
202
203
// Archive processed file
204
if (config.archiveProcessedFiles) {
205
archiveFile(filePath, archiveDir);
206
}
207
} else {
208
errorFiles++;
209
LOG.warn("Failed to process file: {}", fileName);
210
}
211
212
} catch (Exception e) {
213
errorFiles++;
214
LOG.error("Error processing file: {}", fileName, e);
215
216
// Move error files to error directory
217
if (config.errorDirectory != null) {
218
moveFileToErrorDirectory(filePath, config.errorDirectory);
219
}
220
}
221
}
222
}
223
224
// Record metrics
225
metrics.count("files.total", totalFiles);
226
metrics.count("files.successful", successfulFiles);
227
metrics.count("files.errors", errorFiles);
228
229
// Set arguments for downstream stages
230
arguments.set("processed.file.count", String.valueOf(successfulFiles));
231
arguments.set("processed.files", String.join(",", processedFiles));
232
arguments.set("output.directory", outputDir);
233
234
if (successfulFiles == 0 && config.failOnNoFiles) {
235
throw new Exception("No files were successfully processed");
236
}
237
238
LOG.info("Processed {} files successfully, {} errors out of {} total files",
239
successfulFiles, errorFiles, totalFiles);
240
}
241
242
private boolean processFile(Path inputFile, String outputDir) throws IOException {
243
String fileName = inputFile.getFileName().toString();
244
Path outputFile = Paths.get(outputDir, fileName);
245
246
// Apply file transformations based on type
247
String fileExtension = getFileExtension(fileName);
248
249
switch (fileExtension.toLowerCase()) {
250
case "csv":
251
return processCSVFile(inputFile, outputFile);
252
case "json":
253
return processJSONFile(inputFile, outputFile);
254
case "xml":
255
return processXMLFile(inputFile, outputFile);
256
default:
257
// For unknown types, just copy
258
Files.copy(inputFile, outputFile, StandardCopyOption.REPLACE_EXISTING);
259
return true;
260
}
261
}
262
263
private boolean processCSVFile(Path inputFile, Path outputFile) throws IOException {
264
// CSV-specific processing: validate format, clean data, etc.
265
try (BufferedReader reader = Files.newBufferedReader(inputFile);
266
BufferedWriter writer = Files.newBufferedWriter(outputFile)) {
267
268
String line;
269
int lineNumber = 0;
270
boolean hasHeader = config.csvHasHeader;
271
272
while ((line = reader.readLine()) != null) {
273
lineNumber++;
274
275
// Skip header if configured
276
if (hasHeader && lineNumber == 1) {
277
writer.write(line);
278
writer.newLine();
279
continue;
280
}
281
282
// Validate and clean CSV line
283
String cleanedLine = cleanCSVLine(line);
284
if (cleanedLine != null) {
285
writer.write(cleanedLine);
286
writer.newLine();
287
}
288
}
289
290
return true;
291
}
292
}
293
294
private String cleanCSVLine(String line) {
295
if (line == null || line.trim().isEmpty()) {
296
return null;
297
}
298
299
// Apply CSV cleaning rules
300
String[] fields = line.split(config.csvDelimiter);
301
302
// Validate field count
303
if (config.expectedFieldCount > 0 && fields.length != config.expectedFieldCount) {
304
LOG.warn("Invalid field count in CSV line: expected {}, got {}",
305
config.expectedFieldCount, fields.length);
306
return null;
307
}
308
309
// Clean individual fields
310
for (int i = 0; i < fields.length; i++) {
311
fields[i] = fields[i].trim();
312
313
// Remove quotes if present
314
if (fields[i].startsWith("\"") && fields[i].endsWith("\"")) {
315
fields[i] = fields[i].substring(1, fields[i].length() - 1);
316
}
317
}
318
319
return String.join(config.csvDelimiter, fields);
320
}
321
}
322
```
323
324
### External System Integration Action
325
326
```java
327
@Plugin(type = Action.PLUGIN_TYPE)
328
@Name("APINotification")
329
@Description("Sends notifications to external systems via REST API")
330
public class APINotificationAction extends Action {
331
332
private final Config config;
333
private HttpClient httpClient;
334
335
@Override
336
public void initialize(ActionContext context) throws Exception {
337
// Initialize HTTP client
338
this.httpClient = HttpClient.newBuilder()
339
.connectTimeout(Duration.ofSeconds(config.connectTimeoutSeconds))
340
.build();
341
}
342
343
@Override
344
public void run() throws Exception {
345
ActionContext context = getContext();
346
Arguments arguments = context.getArguments();
347
StageMetrics metrics = context.getMetrics();
348
349
// Build notification payload
350
Map<String, Object> payload = buildNotificationPayload(arguments);
351
352
// Convert to JSON
353
String jsonPayload = new Gson().toJson(payload);
354
355
// Send notification
356
HttpRequest request = HttpRequest.newBuilder()
357
.uri(URI.create(config.webhookUrl))
358
.header("Content-Type", "application/json")
359
.header("Authorization", "Bearer " + config.apiToken)
360
.POST(HttpRequest.BodyPublishers.ofString(jsonPayload))
361
.timeout(Duration.ofSeconds(config.requestTimeoutSeconds))
362
.build();
363
364
try {
365
HttpResponse<String> response = httpClient.send(request,
366
HttpResponse.BodyHandlers.ofString());
367
368
if (response.statusCode() >= 200 && response.statusCode() < 300) {
369
metrics.count("notifications.success", 1);
370
LOG.info("Notification sent successfully: {}", response.statusCode());
371
372
// Parse response if needed
373
if (config.parseResponse) {
374
parseAndSetResponse(response.body(), context.getArguments());
375
}
376
377
} else {
378
metrics.count("notifications.failure", 1);
379
String errorMsg = String.format("Notification failed with status %d: %s",
380
response.statusCode(), response.body());
381
382
if (config.failOnError) {
383
throw new Exception(errorMsg);
384
} else {
385
LOG.warn(errorMsg);
386
}
387
}
388
389
} catch (IOException | InterruptedException e) {
390
metrics.count("notifications.error", 1);
391
392
if (config.failOnError) {
393
throw new Exception("Failed to send notification", e);
394
} else {
395
LOG.error("Error sending notification", e);
396
}
397
}
398
}
399
400
private Map<String, Object> buildNotificationPayload(Arguments arguments) {
401
Map<String, Object> payload = new HashMap<>();
402
403
// Add basic pipeline information
404
payload.put("pipelineName", arguments.get("pipeline.name"));
405
payload.put("pipelineRunId", arguments.get("pipeline.run.id"));
406
payload.put("timestamp", Instant.now().toString());
407
408
// Add custom fields from configuration
409
if (config.customFields != null) {
410
for (Map.Entry<String, String> entry : config.customFields.entrySet()) {
411
String key = entry.getKey();
412
String template = entry.getValue();
413
414
// Resolve template variables
415
String value = resolveTemplate(template, arguments);
416
payload.put(key, value);
417
}
418
}
419
420
// Add dynamic arguments
421
if (config.includeArguments != null) {
422
Map<String, String> dynamicArgs = new HashMap<>();
423
for (String argName : config.includeArguments) {
424
String argValue = arguments.get(argName);
425
if (argValue != null) {
426
dynamicArgs.put(argName, argValue);
427
}
428
}
429
payload.put("arguments", dynamicArgs);
430
}
431
432
return payload;
433
}
434
435
private String resolveTemplate(String template, Arguments arguments) {
436
String resolved = template;
437
438
// Simple template variable resolution: ${variable.name}
439
Pattern pattern = Pattern.compile("\\$\\{([^}]+)\\}");
440
Matcher matcher = pattern.matcher(template);
441
442
while (matcher.find()) {
443
String variableName = matcher.group(1);
444
String variableValue = arguments.get(variableName);
445
446
if (variableValue != null) {
447
resolved = resolved.replace(matcher.group(0), variableValue);
448
}
449
}
450
451
return resolved;
452
}
453
454
@Override
455
public void destroy() {
456
// Cleanup HTTP client resources
457
if (httpClient != null) {
458
// HttpClient doesn't need explicit cleanup in Java 11+
459
}
460
}
461
}
462
```
463
464
## Conditions
465
466
### Condition
467
468
Base abstract class for conditional execution in pipelines.
469
470
```java { .api }
471
package io.cdap.cdap.etl.api.condition;
472
473
public abstract class Condition
474
implements PipelineConfigurable,
475
SubmitterLifecycle<ConditionContext>,
476
StageLifecycle<ConditionContext> {
477
478
public static final String PLUGIN_TYPE = "condition";
479
480
// Configuration lifecycle
481
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {}
482
483
// Submission lifecycle
484
public void prepareRun(ConditionContext context) throws Exception {}
485
public void onRunFinish(boolean succeeded, ConditionContext context) {}
486
487
// Stage lifecycle
488
public void initialize(ConditionContext context) throws Exception {}
489
public void destroy() {}
490
491
// Condition evaluation
492
public abstract ConditionResult apply() throws Exception;
493
}
494
```
495
496
**Condition Implementation Example:**
497
```java
498
@Plugin(type = Condition.PLUGIN_TYPE)
499
@Name("FileExistenceCondition")
500
@Description("Checks if specified files exist before proceeding")
501
public class FileExistenceCondition extends Condition {
502
503
private final Config config;
504
private ConditionContext conditionContext;
505
506
@Override
507
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
508
StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
509
FailureCollector collector = stageConfigurer.getFailureCollector();
510
511
config.validate(collector);
512
}
513
514
@Override
515
public void initialize(ConditionContext context) throws Exception {
516
this.conditionContext = context;
517
}
518
519
@Override
520
public ConditionResult apply() throws Exception {
521
StageMetrics metrics = conditionContext.getMetrics();
522
Arguments arguments = conditionContext.getArguments();
523
524
List<String> missingFiles = new ArrayList<>();
525
List<String> existingFiles = new ArrayList<>();
526
527
// Check each required file
528
for (String filePath : config.requiredFiles) {
529
// Resolve file path with runtime arguments
530
String resolvedPath = resolveFilePath(filePath, arguments);
531
532
if (fileExists(resolvedPath)) {
533
existingFiles.add(resolvedPath);
534
metrics.count("files.found", 1);
535
} else {
536
missingFiles.add(resolvedPath);
537
metrics.count("files.missing", 1);
538
}
539
}
540
541
// Determine condition result
542
boolean conditionMet = false;
543
String message;
544
545
switch (config.checkMode) {
546
case ALL_MUST_EXIST:
547
conditionMet = missingFiles.isEmpty();
548
message = conditionMet ?
549
"All required files exist" :
550
"Missing files: " + String.join(", ", missingFiles);
551
break;
552
553
case ANY_MUST_EXIST:
554
conditionMet = !existingFiles.isEmpty();
555
message = conditionMet ?
556
"Found files: " + String.join(", ", existingFiles) :
557
"No required files found";
558
break;
559
560
case NONE_MUST_EXIST:
561
conditionMet = existingFiles.isEmpty();
562
message = conditionMet ?
563
"No files exist (as expected)" :
564
"Unexpected files found: " + String.join(", ", existingFiles);
565
break;
566
567
default:
568
throw new IllegalArgumentException("Unknown check mode: " + config.checkMode);
569
}
570
571
LOG.info("File existence condition: {} - {}", conditionMet, message);
572
return new ConditionResult(conditionMet, message);
573
}
574
575
private String resolveFilePath(String filePath, Arguments arguments) {
576
String resolved = filePath;
577
578
// Replace runtime argument placeholders
579
Pattern pattern = Pattern.compile("\\$\\{([^}]+)\\}");
580
Matcher matcher = pattern.matcher(filePath);
581
582
while (matcher.find()) {
583
String argName = matcher.group(1);
584
String argValue = arguments.get(argName);
585
586
if (argValue != null) {
587
resolved = resolved.replace(matcher.group(0), argValue);
588
}
589
}
590
591
return resolved;
592
}
593
594
private boolean fileExists(String filePath) {
595
try {
596
Path path = Paths.get(filePath);
597
return Files.exists(path) && Files.isReadable(path);
598
} catch (Exception e) {
599
LOG.warn("Error checking file existence: {}", filePath, e);
600
return false;
601
}
602
}
603
}
604
```
605
606
### ConditionContext
607
608
Context interface for condition evaluation.
609
610
```java { .api }
611
package io.cdap.cdap.etl.api.condition;
612
613
public interface ConditionContext extends StageContext {
614
/**
615
* Get stage statistics for previous stages.
616
*/
617
StageStatistics getStageStatistics(String stageName);
618
}
619
```
620
621
### StageStatistics
622
623
Interface for accessing statistics from pipeline stages.
624
625
```java { .api }
626
package io.cdap.cdap.etl.api.condition;
627
628
/**
 * Read-only record counters of a pipeline stage.
 */
public interface StageStatistics {

    /** Returns the input record count. */
    long getInputRecordsCount();

    /** Returns the output record count. */
    long getOutputRecordsCount();

    /** Returns the error record count. */
    long getErrorRecordsCount();
}
644
```
645
646
## Advanced Condition Examples
647
648
### Data Quality Condition
649
650
```java
651
@Plugin(type = Condition.PLUGIN_TYPE)
652
@Name("DataQualityCondition")
653
@Description("Checks data quality metrics before proceeding")
654
public class DataQualityCondition extends Condition {
655
656
private final Config config;
657
658
@Override
659
public ConditionResult apply() throws Exception {
660
ConditionContext context = getContext();
661
662
boolean qualityMet = true;
663
StringBuilder messageBuilder = new StringBuilder();
664
List<String> failures = new ArrayList<>();
665
666
// Check each configured stage's statistics
667
for (QualityCheck check : config.qualityChecks) {
668
StageStatistics stats = context.getStageStatistics(check.stageName);
669
670
if (stats == null) {
671
failures.add("No statistics available for stage: " + check.stageName);
672
qualityMet = false;
673
continue;
674
}
675
676
// Check error rate
677
if (check.maxErrorRate != null) {
678
long totalRecords = stats.getInputRecordsCount();
679
long errorRecords = stats.getErrorRecordsCount();
680
681
if (totalRecords > 0) {
682
double errorRate = (double) errorRecords / totalRecords;
683
if (errorRate > check.maxErrorRate) {
684
failures.add(String.format(
685
"Stage %s error rate %.2f%% exceeds limit %.2f%%",
686
check.stageName, errorRate * 100, check.maxErrorRate * 100
687
));
688
qualityMet = false;
689
}
690
}
691
}
692
693
// Check minimum record count
694
if (check.minRecordCount != null) {
695
long outputRecords = stats.getOutputRecordsCount();
696
if (outputRecords < check.minRecordCount) {
697
failures.add(String.format(
698
"Stage %s output count %d below minimum %d",
699
check.stageName, outputRecords, check.minRecordCount
700
));
701
qualityMet = false;
702
}
703
}
704
705
// Check maximum record count
706
if (check.maxRecordCount != null) {
707
long outputRecords = stats.getOutputRecordsCount();
708
if (outputRecords > check.maxRecordCount) {
709
failures.add(String.format(
710
"Stage %s output count %d exceeds maximum %d",
711
check.stageName, outputRecords, check.maxRecordCount
712
));
713
qualityMet = false;
714
}
715
}
716
}
717
718
String message;
719
if (qualityMet) {
720
message = "All data quality checks passed";
721
} else {
722
message = "Data quality failures: " + String.join("; ", failures);
723
}
724
725
return new ConditionResult(qualityMet, message);
726
}
727
728
private static class QualityCheck {
729
public String stageName;
730
public Double maxErrorRate;
731
public Long minRecordCount;
732
public Long maxRecordCount;
733
}
734
}
735
```
736
737
### Time-Based Condition
738
739
```java
740
@Plugin(type = Condition.PLUGIN_TYPE)
741
@Name("TimeWindowCondition")
742
@Description("Checks if current time is within allowed execution window")
743
public class TimeWindowCondition extends Condition {
744
745
private final Config config;
746
747
@Override
748
public ConditionResult apply() throws Exception {
749
ZonedDateTime now = ZonedDateTime.now(ZoneId.of(config.timezone));
750
751
// Check day of week
752
if (config.allowedDaysOfWeek != null && !config.allowedDaysOfWeek.isEmpty()) {
753
DayOfWeek currentDay = now.getDayOfWeek();
754
if (!config.allowedDaysOfWeek.contains(currentDay)) {
755
return new ConditionResult(false,
756
String.format("Current day %s not in allowed days: %s",
757
currentDay, config.allowedDaysOfWeek));
758
}
759
}
760
761
// Check time window
762
if (config.startTime != null && config.endTime != null) {
763
LocalTime currentTime = now.toLocalTime();
764
LocalTime startTime = LocalTime.parse(config.startTime);
765
LocalTime endTime = LocalTime.parse(config.endTime);
766
767
boolean inWindow;
768
if (startTime.isBefore(endTime)) {
769
// Normal window (e.g., 09:00-17:00)
770
inWindow = currentTime.isAfter(startTime) && currentTime.isBefore(endTime);
771
} else {
772
// Overnight window (e.g., 22:00-06:00)
773
inWindow = currentTime.isAfter(startTime) || currentTime.isBefore(endTime);
774
}
775
776
if (!inWindow) {
777
return new ConditionResult(false,
778
String.format("Current time %s not in allowed window %s-%s",
779
currentTime, config.startTime, config.endTime));
780
}
781
}
782
783
// Check date range
784
if (config.startDate != null || config.endDate != null) {
785
LocalDate currentDate = now.toLocalDate();
786
787
if (config.startDate != null) {
788
LocalDate startDate = LocalDate.parse(config.startDate);
789
if (currentDate.isBefore(startDate)) {
790
return new ConditionResult(false,
791
String.format("Current date %s before start date %s",
792
currentDate, config.startDate));
793
}
794
}
795
796
if (config.endDate != null) {
797
LocalDate endDate = LocalDate.parse(config.endDate);
798
if (currentDate.isAfter(endDate)) {
799
return new ConditionResult(false,
800
String.format("Current date %s after end date %s",
801
currentDate, config.endDate));
802
}
803
}
804
}
805
806
return new ConditionResult(true,
807
String.format("Time window check passed at %s", now));
808
}
809
}
810
```
811
812
## Workflow Integration
813
814
### Conditional Pipeline Execution
815
816
```java
817
public class ConditionalPipelineOrchestrator {
818
819
public static void executeConditionalStages(List<ConditionalStage> stages,
820
PipelineContext context) throws Exception {
821
822
for (ConditionalStage stage : stages) {
823
if (stage.hasCondition()) {
824
// Evaluate condition
825
ConditionResult result = stage.getCondition().apply();
826
827
if (result.isConditionMet()) {
828
LOG.info("Condition met for stage {}: {}",
829
stage.getName(), result.getMessage());
830
831
// Execute stage
832
executeStage(stage, context);
833
} else {
834
LOG.info("Condition not met for stage {}: {}",
835
stage.getName(), result.getMessage());
836
837
// Handle condition failure
838
if (stage.isRequired()) {
839
throw new Exception("Required stage condition failed: " +
840
stage.getName());
841
} else {
842
// Skip optional stage
843
continue;
844
}
845
}
846
} else {
847
// Execute unconditionally
848
executeStage(stage, context);
849
}
850
}
851
}
852
853
private static void executeStage(ConditionalStage stage, PipelineContext context)
854
throws Exception {
855
// Execute pre-stage actions
856
for (Action preAction : stage.getPreActions()) {
857
preAction.run();
858
}
859
860
// Execute main stage logic
861
stage.execute(context);
862
863
// Execute post-stage actions
864
for (Action postAction : stage.getPostActions()) {
865
postAction.run();
866
}
867
}
868
}
869
```
870
871
### Dynamic Argument Passing
872
873
```java
874
public class ArgumentChainProcessor {
875
876
public static void processArgumentChain(List<Action> actions,
877
Map<String, String> initialArguments)
878
throws Exception {
879
880
// Create mutable arguments map
881
Map<String, String> currentArguments = new HashMap<>(initialArguments);
882
883
for (Action action : actions) {
884
// Create settable arguments for this action
885
SettableArguments settableArgs = new MapBackedSettableArguments(currentArguments);
886
887
// Create action context with current arguments
888
ActionContext actionContext = createActionContext(settableArgs);
889
890
// Initialize and run action
891
action.initialize(actionContext);
892
try {
893
action.run();
894
} finally {
895
action.destroy();
896
}
897
898
// Update arguments for next action
899
currentArguments.putAll(settableArgs.getUpdatedArguments());
900
}
901
}
902
903
private static class MapBackedSettableArguments implements SettableArguments {
904
private final Map<String, String> arguments;
905
private final Map<String, String> updates;
906
907
public MapBackedSettableArguments(Map<String, String> arguments) {
908
this.arguments = new HashMap<>(arguments);
909
this.updates = new HashMap<>();
910
}
911
912
@Override
913
public void set(String name, String value) {
914
arguments.put(name, value);
915
updates.put(name, value);
916
}
917
918
@Override
919
public boolean has(String name) {
920
return arguments.containsKey(name);
921
}
922
923
@Override
924
public String get(String name) {
925
return arguments.get(name);
926
}
927
928
public Map<String, String> getUpdatedArguments() {
929
return new HashMap<>(updates);
930
}
931
}
932
}
933
```