or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

actions-conditions.md batch-processing.md core-pipeline.md data-connectors.md index.md join-operations.md lineage-metadata.md sql-engine.md validation.md

docs/actions-conditions.md

0

# Actions and Conditions

1

2

Pipeline actions and conditional execution for workflow control, external integrations, and dynamic pipeline behavior in CDAP ETL.

3

4

## Actions

5

6

### Action

7

8

Base abstract class for pipeline actions that execute custom logic or external operations.

9

10

```java { .api }

11

package io.cdap.cdap.etl.api.action;

12

13

public abstract class Action

14

implements PipelineConfigurable,

15

SubmitterLifecycle<ActionContext>,

16

StageLifecycle<ActionContext> {

17

18

public static final String PLUGIN_TYPE = "action";

19

20

// Configuration lifecycle

21

public void configurePipeline(PipelineConfigurer pipelineConfigurer) {}

22

23

// Submission lifecycle

24

public void prepareRun(ActionContext context) throws Exception {}

25

public void onRunFinish(boolean succeeded, ActionContext context) {}

26

27

// Stage lifecycle

28

public void initialize(ActionContext context) throws Exception {}

29

public void destroy() {}

30

31

// Action execution

32

public abstract void run() throws Exception;

33

}

34

```

35

36

**Action Implementation Example:**

37

```java

38

@Plugin(type = Action.PLUGIN_TYPE)

39

@Name("DatabaseCleanup")

40

@Description("Cleans up old records from database tables")

41

public class DatabaseCleanupAction extends Action {

42

43

private final Config config;

44

private ActionContext actionContext;

45

46

@Override

47

public void configurePipeline(PipelineConfigurer pipelineConfigurer) {

48

StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();

49

FailureCollector collector = stageConfigurer.getFailureCollector();

50

51

// Validate configuration

52

config.validate(collector);

53

}

54

55

@Override

56

public void prepareRun(ActionContext context) throws Exception {

57

// Validate database connectivity

58

try (Connection conn = getConnection()) {

59

if (!conn.isValid(30)) {

60

throw new Exception("Database connection is not valid");

61

}

62

}

63

}

64

65

@Override

66

public void initialize(ActionContext context) throws Exception {

67

this.actionContext = context;

68

}

69

70

@Override

71

public void run() throws Exception {

72

StageMetrics metrics = actionContext.getMetrics();

73

SettableArguments arguments = actionContext.getArguments();

74

75

try (Connection conn = getConnection()) {

76

for (String tableName : config.tablesToClean) {

77

long deletedRecords = cleanupTable(conn, tableName);

78

79

// Record metrics

80

metrics.count("records.deleted." + tableName, deletedRecords);

81

82

// Set runtime arguments for downstream stages

83

arguments.set("cleanup." + tableName + ".deleted", String.valueOf(deletedRecords));

84

85

LOG.info("Deleted {} old records from table {}", deletedRecords, tableName);

86

}

87

88

// Set overall completion status

89

arguments.set("cleanup.completed", "true");

90

arguments.set("cleanup.timestamp", Instant.now().toString());

91

92

} catch (SQLException e) {

93

metrics.count("cleanup.errors", 1);

94

throw new Exception("Database cleanup failed", e);

95

}

96

}

97

98

private long cleanupTable(Connection conn, String tableName) throws SQLException {

99

String deleteSQL = String.format(

100

"DELETE FROM %s WHERE created_date < ?",

101

tableName

102

);

103

104

try (PreparedStatement stmt = conn.prepareStatement(deleteSQL)) {

105

// Delete records older than retention days

106

Timestamp cutoffDate = Timestamp.from(

107

Instant.now().minus(config.retentionDays, ChronoUnit.DAYS)

108

);

109

stmt.setTimestamp(1, cutoffDate);

110

111

return stmt.executeUpdate();

112

}

113

}

114

115

private Connection getConnection() throws SQLException {

116

return DriverManager.getConnection(

117

config.connectionString,

118

config.username,

119

config.password

120

);

121

}

122

}

123

```

124

125

### ActionContext

126

127

Context interface for action execution providing access to runtime services.

128

129

```java { .api }

130

package io.cdap.cdap.etl.api.action;

131

132

public interface ActionContext extends StageContext {

133

/**

134

* Get settable arguments for passing data to other stages.

135

*/

136

SettableArguments getArguments();

137

}

138

```

139

140

### SettableArguments

141

142

Interface for arguments that can be modified by actions.

143

144

```java { .api }

145

package io.cdap.cdap.etl.api.action;

146

147

public interface SettableArguments extends Arguments {

148

/**

149

* Set argument value.

150

*/

151

void set(String name, String value);

152

}

153

```

154

155

## Advanced Action Examples

156

157

### File Processing Action

158

159

```java

160

@Plugin(type = Action.PLUGIN_TYPE)

161

@Name("FileProcessor")

162

@Description("Processes files and prepares them for pipeline consumption")

163

public class FileProcessorAction extends Action {

164

165

private final Config config;

166

167

@Override

168

public void run() throws Exception {

169

ActionContext context = getContext();

170

SettableArguments arguments = context.getArguments();

171

StageMetrics metrics = context.getMetrics();

172

173

// Get input directory

174

String inputDir = config.inputDirectory;

175

String outputDir = config.outputDirectory;

176

String archiveDir = config.archiveDirectory;

177

178

// Create directories if they don't exist

179

createDirectoryIfNotExists(outputDir);

180

createDirectoryIfNotExists(archiveDir);

181

182

// Process files

183

List<String> processedFiles = new ArrayList<>();

184

int totalFiles = 0;

185

int successfulFiles = 0;

186

int errorFiles = 0;

187

188

try (DirectoryStream<Path> stream = Files.newDirectoryStream(

189

Paths.get(inputDir), config.filePattern)) {

190

191

for (Path filePath : stream) {

192

totalFiles++;

193

String fileName = filePath.getFileName().toString();

194

195

try {

196

// Process individual file

197

boolean processed = processFile(filePath, outputDir);

198

199

if (processed) {

200

successfulFiles++;

201

processedFiles.add(fileName);

202

203

// Archive processed file

204

if (config.archiveProcessedFiles) {

205

archiveFile(filePath, archiveDir);

206

}

207

} else {

208

errorFiles++;

209

LOG.warn("Failed to process file: {}", fileName);

210

}

211

212

} catch (Exception e) {

213

errorFiles++;

214

LOG.error("Error processing file: {}", fileName, e);

215

216

// Move error files to error directory

217

if (config.errorDirectory != null) {

218

moveFileToErrorDirectory(filePath, config.errorDirectory);

219

}

220

}

221

}

222

}

223

224

// Record metrics

225

metrics.count("files.total", totalFiles);

226

metrics.count("files.successful", successfulFiles);

227

metrics.count("files.errors", errorFiles);

228

229

// Set arguments for downstream stages

230

arguments.set("processed.file.count", String.valueOf(successfulFiles));

231

arguments.set("processed.files", String.join(",", processedFiles));

232

arguments.set("output.directory", outputDir);

233

234

if (successfulFiles == 0 && config.failOnNoFiles) {

235

throw new Exception("No files were successfully processed");

236

}

237

238

LOG.info("Processed {} files successfully, {} errors out of {} total files",

239

successfulFiles, errorFiles, totalFiles);

240

}

241

242

private boolean processFile(Path inputFile, String outputDir) throws IOException {

243

String fileName = inputFile.getFileName().toString();

244

Path outputFile = Paths.get(outputDir, fileName);

245

246

// Apply file transformations based on type

247

String fileExtension = getFileExtension(fileName);

248

249

switch (fileExtension.toLowerCase()) {

250

case "csv":

251

return processCSVFile(inputFile, outputFile);

252

case "json":

253

return processJSONFile(inputFile, outputFile);

254

case "xml":

255

return processXMLFile(inputFile, outputFile);

256

default:

257

// For unknown types, just copy

258

Files.copy(inputFile, outputFile, StandardCopyOption.REPLACE_EXISTING);

259

return true;

260

}

261

}

262

263

private boolean processCSVFile(Path inputFile, Path outputFile) throws IOException {

264

// CSV-specific processing: validate format, clean data, etc.

265

try (BufferedReader reader = Files.newBufferedReader(inputFile);

266

BufferedWriter writer = Files.newBufferedWriter(outputFile)) {

267

268

String line;

269

int lineNumber = 0;

270

boolean hasHeader = config.csvHasHeader;

271

272

while ((line = reader.readLine()) != null) {

273

lineNumber++;

274

275

// Skip header if configured

276

if (hasHeader && lineNumber == 1) {

277

writer.write(line);

278

writer.newLine();

279

continue;

280

}

281

282

// Validate and clean CSV line

283

String cleanedLine = cleanCSVLine(line);

284

if (cleanedLine != null) {

285

writer.write(cleanedLine);

286

writer.newLine();

287

}

288

}

289

290

return true;

291

}

292

}

293

294

private String cleanCSVLine(String line) {

295

if (line == null || line.trim().isEmpty()) {

296

return null;

297

}

298

299

// Apply CSV cleaning rules

300

String[] fields = line.split(config.csvDelimiter);

301

302

// Validate field count

303

if (config.expectedFieldCount > 0 && fields.length != config.expectedFieldCount) {

304

LOG.warn("Invalid field count in CSV line: expected {}, got {}",

305

config.expectedFieldCount, fields.length);

306

return null;

307

}

308

309

// Clean individual fields

310

for (int i = 0; i < fields.length; i++) {

311

fields[i] = fields[i].trim();

312

313

// Remove quotes if present

314

if (fields[i].startsWith("\"") && fields[i].endsWith("\"")) {

315

fields[i] = fields[i].substring(1, fields[i].length() - 1);

316

}

317

}

318

319

return String.join(config.csvDelimiter, fields);

320

}

321

}

322

```

323

324

### External System Integration Action

325

326

```java

327

@Plugin(type = Action.PLUGIN_TYPE)

328

@Name("APINotification")

329

@Description("Sends notifications to external systems via REST API")

330

public class APINotificationAction extends Action {

331

332

private final Config config;

333

private HttpClient httpClient;

334

335

@Override

336

public void initialize(ActionContext context) throws Exception {

337

// Initialize HTTP client

338

this.httpClient = HttpClient.newBuilder()

339

.connectTimeout(Duration.ofSeconds(config.connectTimeoutSeconds))

340

.build();

341

}

342

343

@Override

344

public void run() throws Exception {

345

ActionContext context = getContext();

346

Arguments arguments = context.getArguments();

347

StageMetrics metrics = context.getMetrics();

348

349

// Build notification payload

350

Map<String, Object> payload = buildNotificationPayload(arguments);

351

352

// Convert to JSON

353

String jsonPayload = new Gson().toJson(payload);

354

355

// Send notification

356

HttpRequest request = HttpRequest.newBuilder()

357

.uri(URI.create(config.webhookUrl))

358

.header("Content-Type", "application/json")

359

.header("Authorization", "Bearer " + config.apiToken)

360

.POST(HttpRequest.BodyPublishers.ofString(jsonPayload))

361

.timeout(Duration.ofSeconds(config.requestTimeoutSeconds))

362

.build();

363

364

try {

365

HttpResponse<String> response = httpClient.send(request,

366

HttpResponse.BodyHandlers.ofString());

367

368

if (response.statusCode() >= 200 && response.statusCode() < 300) {

369

metrics.count("notifications.success", 1);

370

LOG.info("Notification sent successfully: {}", response.statusCode());

371

372

// Parse response if needed

373

if (config.parseResponse) {

374

parseAndSetResponse(response.body(), context.getArguments());

375

}

376

377

} else {

378

metrics.count("notifications.failure", 1);

379

String errorMsg = String.format("Notification failed with status %d: %s",

380

response.statusCode(), response.body());

381

382

if (config.failOnError) {

383

throw new Exception(errorMsg);

384

} else {

385

LOG.warn(errorMsg);

386

}

387

}

388

389

} catch (IOException | InterruptedException e) {

390

metrics.count("notifications.error", 1);

391

392

if (config.failOnError) {

393

throw new Exception("Failed to send notification", e);

394

} else {

395

LOG.error("Error sending notification", e);

396

}

397

}

398

}

399

400

private Map<String, Object> buildNotificationPayload(Arguments arguments) {

401

Map<String, Object> payload = new HashMap<>();

402

403

// Add basic pipeline information

404

payload.put("pipelineName", arguments.get("pipeline.name"));

405

payload.put("pipelineRunId", arguments.get("pipeline.run.id"));

406

payload.put("timestamp", Instant.now().toString());

407

408

// Add custom fields from configuration

409

if (config.customFields != null) {

410

for (Map.Entry<String, String> entry : config.customFields.entrySet()) {

411

String key = entry.getKey();

412

String template = entry.getValue();

413

414

// Resolve template variables

415

String value = resolveTemplate(template, arguments);

416

payload.put(key, value);

417

}

418

}

419

420

// Add dynamic arguments

421

if (config.includeArguments != null) {

422

Map<String, String> dynamicArgs = new HashMap<>();

423

for (String argName : config.includeArguments) {

424

String argValue = arguments.get(argName);

425

if (argValue != null) {

426

dynamicArgs.put(argName, argValue);

427

}

428

}

429

payload.put("arguments", dynamicArgs);

430

}

431

432

return payload;

433

}

434

435

private String resolveTemplate(String template, Arguments arguments) {

436

String resolved = template;

437

438

// Simple template variable resolution: ${variable.name}

439

Pattern pattern = Pattern.compile("\\$\\{([^}]+)\\}");

440

Matcher matcher = pattern.matcher(template);

441

442

while (matcher.find()) {

443

String variableName = matcher.group(1);

444

String variableValue = arguments.get(variableName);

445

446

if (variableValue != null) {

447

resolved = resolved.replace(matcher.group(0), variableValue);

448

}

449

}

450

451

return resolved;

452

}

453

454

@Override

455

public void destroy() {

456

// Cleanup HTTP client resources

457

if (httpClient != null) {

458

// HttpClient doesn't need explicit cleanup in Java 11+

459

}

460

}

461

}

462

```

463

464

## Conditions

465

466

### Condition

467

468

Base abstract class for conditional execution in pipelines.

469

470

```java { .api }

471

package io.cdap.cdap.etl.api.condition;

472

473

public abstract class Condition

474

implements PipelineConfigurable,

475

SubmitterLifecycle<ConditionContext>,

476

StageLifecycle<ConditionContext> {

477

478

public static final String PLUGIN_TYPE = "condition";

479

480

// Configuration lifecycle

481

public void configurePipeline(PipelineConfigurer pipelineConfigurer) {}

482

483

// Submission lifecycle

484

public void prepareRun(ConditionContext context) throws Exception {}

485

public void onRunFinish(boolean succeeded, ConditionContext context) {}

486

487

// Stage lifecycle

488

public void initialize(ConditionContext context) throws Exception {}

489

public void destroy() {}

490

491

// Condition evaluation

492

public abstract ConditionResult apply() throws Exception;

493

}

494

```

495

496

**Condition Implementation Example:**

497

```java

498

@Plugin(type = Condition.PLUGIN_TYPE)

499

@Name("FileExistenceCondition")

500

@Description("Checks if specified files exist before proceeding")

501

public class FileExistenceCondition extends Condition {

502

503

private final Config config;

504

private ConditionContext conditionContext;

505

506

@Override

507

public void configurePipeline(PipelineConfigurer pipelineConfigurer) {

508

StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();

509

FailureCollector collector = stageConfigurer.getFailureCollector();

510

511

config.validate(collector);

512

}

513

514

@Override

515

public void initialize(ConditionContext context) throws Exception {

516

this.conditionContext = context;

517

}

518

519

@Override

520

public ConditionResult apply() throws Exception {

521

StageMetrics metrics = conditionContext.getMetrics();

522

Arguments arguments = conditionContext.getArguments();

523

524

List<String> missingFiles = new ArrayList<>();

525

List<String> existingFiles = new ArrayList<>();

526

527

// Check each required file

528

for (String filePath : config.requiredFiles) {

529

// Resolve file path with runtime arguments

530

String resolvedPath = resolveFilePath(filePath, arguments);

531

532

if (fileExists(resolvedPath)) {

533

existingFiles.add(resolvedPath);

534

metrics.count("files.found", 1);

535

} else {

536

missingFiles.add(resolvedPath);

537

metrics.count("files.missing", 1);

538

}

539

}

540

541

// Determine condition result

542

boolean conditionMet = false;

543

String message;

544

545

switch (config.checkMode) {

546

case ALL_MUST_EXIST:

547

conditionMet = missingFiles.isEmpty();

548

message = conditionMet ?

549

"All required files exist" :

550

"Missing files: " + String.join(", ", missingFiles);

551

break;

552

553

case ANY_MUST_EXIST:

554

conditionMet = !existingFiles.isEmpty();

555

message = conditionMet ?

556

"Found files: " + String.join(", ", existingFiles) :

557

"No required files found";

558

break;

559

560

case NONE_MUST_EXIST:

561

conditionMet = existingFiles.isEmpty();

562

message = conditionMet ?

563

"No files exist (as expected)" :

564

"Unexpected files found: " + String.join(", ", existingFiles);

565

break;

566

567

default:

568

throw new IllegalArgumentException("Unknown check mode: " + config.checkMode);

569

}

570

571

LOG.info("File existence condition: {} - {}", conditionMet, message);

572

return new ConditionResult(conditionMet, message);

573

}

574

575

private String resolveFilePath(String filePath, Arguments arguments) {

576

String resolved = filePath;

577

578

// Replace runtime argument placeholders

579

Pattern pattern = Pattern.compile("\\$\\{([^}]+)\\}");

580

Matcher matcher = pattern.matcher(filePath);

581

582

while (matcher.find()) {

583

String argName = matcher.group(1);

584

String argValue = arguments.get(argName);

585

586

if (argValue != null) {

587

resolved = resolved.replace(matcher.group(0), argValue);

588

}

589

}

590

591

return resolved;

592

}

593

594

private boolean fileExists(String filePath) {

595

try {

596

Path path = Paths.get(filePath);

597

return Files.exists(path) && Files.isReadable(path);

598

} catch (Exception e) {

599

LOG.warn("Error checking file existence: {}", filePath, e);

600

return false;

601

}

602

}

603

}

604

```

605

606

### ConditionContext

607

608

Context interface for condition evaluation.

609

610

```java { .api }

611

package io.cdap.cdap.etl.api.condition;

612

613

public interface ConditionContext extends StageContext {

614

/**

615

* Get stage statistics for previous stages.

616

*/

617

StageStatistics getStageStatistics(String stageName);

618

}

619

```

620

621

### StageStatistics

622

623

Interface for accessing statistics from pipeline stages.

624

625

```java { .api }

626

package io.cdap.cdap.etl.api.condition;

627

628

public interface StageStatistics {

629

/**

630

* Get input record count.

631

*/

632

long getInputRecordsCount();

633

634

/**

635

* Get output record count.

636

*/

637

long getOutputRecordsCount();

638

639

/**

640

* Get error record count.

641

*/

642

long getErrorRecordsCount();

643

}

644

```

645

646

## Advanced Condition Examples

647

648

### Data Quality Condition

649

650

```java

651

@Plugin(type = Condition.PLUGIN_TYPE)

652

@Name("DataQualityCondition")

653

@Description("Checks data quality metrics before proceeding")

654

public class DataQualityCondition extends Condition {

655

656

private final Config config;

657

658

@Override

659

public ConditionResult apply() throws Exception {

660

ConditionContext context = getContext();

661

662

boolean qualityMet = true;

663

StringBuilder messageBuilder = new StringBuilder();

664

List<String> failures = new ArrayList<>();

665

666

// Check each configured stage's statistics

667

for (QualityCheck check : config.qualityChecks) {

668

StageStatistics stats = context.getStageStatistics(check.stageName);

669

670

if (stats == null) {

671

failures.add("No statistics available for stage: " + check.stageName);

672

qualityMet = false;

673

continue;

674

}

675

676

// Check error rate

677

if (check.maxErrorRate != null) {

678

long totalRecords = stats.getInputRecordsCount();

679

long errorRecords = stats.getErrorRecordsCount();

680

681

if (totalRecords > 0) {

682

double errorRate = (double) errorRecords / totalRecords;

683

if (errorRate > check.maxErrorRate) {

684

failures.add(String.format(

685

"Stage %s error rate %.2f%% exceeds limit %.2f%%",

686

check.stageName, errorRate * 100, check.maxErrorRate * 100

687

));

688

qualityMet = false;

689

}

690

}

691

}

692

693

// Check minimum record count

694

if (check.minRecordCount != null) {

695

long outputRecords = stats.getOutputRecordsCount();

696

if (outputRecords < check.minRecordCount) {

697

failures.add(String.format(

698

"Stage %s output count %d below minimum %d",

699

check.stageName, outputRecords, check.minRecordCount

700

));

701

qualityMet = false;

702

}

703

}

704

705

// Check maximum record count

706

if (check.maxRecordCount != null) {

707

long outputRecords = stats.getOutputRecordsCount();

708

if (outputRecords > check.maxRecordCount) {

709

failures.add(String.format(

710

"Stage %s output count %d exceeds maximum %d",

711

check.stageName, outputRecords, check.maxRecordCount

712

));

713

qualityMet = false;

714

}

715

}

716

}

717

718

String message;

719

if (qualityMet) {

720

message = "All data quality checks passed";

721

} else {

722

message = "Data quality failures: " + String.join("; ", failures);

723

}

724

725

return new ConditionResult(qualityMet, message);

726

}

727

728

private static class QualityCheck {

729

public String stageName;

730

public Double maxErrorRate;

731

public Long minRecordCount;

732

public Long maxRecordCount;

733

}

734

}

735

```

736

737

### Time-Based Condition

738

739

```java

740

@Plugin(type = Condition.PLUGIN_TYPE)

741

@Name("TimeWindowCondition")

742

@Description("Checks if current time is within allowed execution window")

743

public class TimeWindowCondition extends Condition {

744

745

private final Config config;

746

747

@Override

748

public ConditionResult apply() throws Exception {

749

ZonedDateTime now = ZonedDateTime.now(ZoneId.of(config.timezone));

750

751

// Check day of week

752

if (config.allowedDaysOfWeek != null && !config.allowedDaysOfWeek.isEmpty()) {

753

DayOfWeek currentDay = now.getDayOfWeek();

754

if (!config.allowedDaysOfWeek.contains(currentDay)) {

755

return new ConditionResult(false,

756

String.format("Current day %s not in allowed days: %s",

757

currentDay, config.allowedDaysOfWeek));

758

}

759

}

760

761

// Check time window

762

if (config.startTime != null && config.endTime != null) {

763

LocalTime currentTime = now.toLocalTime();

764

LocalTime startTime = LocalTime.parse(config.startTime);

765

LocalTime endTime = LocalTime.parse(config.endTime);

766

767

boolean inWindow;

768

if (startTime.isBefore(endTime)) {

769

// Normal window (e.g., 09:00-17:00)

770

inWindow = currentTime.isAfter(startTime) && currentTime.isBefore(endTime);

771

} else {

772

// Overnight window (e.g., 22:00-06:00)

773

inWindow = currentTime.isAfter(startTime) || currentTime.isBefore(endTime);

774

}

775

776

if (!inWindow) {

777

return new ConditionResult(false,

778

String.format("Current time %s not in allowed window %s-%s",

779

currentTime, config.startTime, config.endTime));

780

}

781

}

782

783

// Check date range

784

if (config.startDate != null || config.endDate != null) {

785

LocalDate currentDate = now.toLocalDate();

786

787

if (config.startDate != null) {

788

LocalDate startDate = LocalDate.parse(config.startDate);

789

if (currentDate.isBefore(startDate)) {

790

return new ConditionResult(false,

791

String.format("Current date %s before start date %s",

792

currentDate, config.startDate));

793

}

794

}

795

796

if (config.endDate != null) {

797

LocalDate endDate = LocalDate.parse(config.endDate);

798

if (currentDate.isAfter(endDate)) {

799

return new ConditionResult(false,

800

String.format("Current date %s after end date %s",

801

currentDate, config.endDate));

802

}

803

}

804

}

805

806

return new ConditionResult(true,

807

String.format("Time window check passed at %s", now));

808

}

809

}

810

```

811

812

## Workflow Integration

813

814

### Conditional Pipeline Execution

815

816

```java

817

public class ConditionalPipelineOrchestrator {

818

819

public static void executeConditionalStages(List<ConditionalStage> stages,

820

PipelineContext context) throws Exception {

821

822

for (ConditionalStage stage : stages) {

823

if (stage.hasCondition()) {

824

// Evaluate condition

825

ConditionResult result = stage.getCondition().apply();

826

827

if (result.isConditionMet()) {

828

LOG.info("Condition met for stage {}: {}",

829

stage.getName(), result.getMessage());

830

831

// Execute stage

832

executeStage(stage, context);

833

} else {

834

LOG.info("Condition not met for stage {}: {}",

835

stage.getName(), result.getMessage());

836

837

// Handle condition failure

838

if (stage.isRequired()) {

839

throw new Exception("Required stage condition failed: " +

840

stage.getName());

841

} else {

842

// Skip optional stage

843

continue;

844

}

845

}

846

} else {

847

// Execute unconditionally

848

executeStage(stage, context);

849

}

850

}

851

}

852

853

private static void executeStage(ConditionalStage stage, PipelineContext context)

854

throws Exception {

855

// Execute pre-stage actions

856

for (Action preAction : stage.getPreActions()) {

857

preAction.run();

858

}

859

860

// Execute main stage logic

861

stage.execute(context);

862

863

// Execute post-stage actions

864

for (Action postAction : stage.getPostActions()) {

865

postAction.run();

866

}

867

}

868

}

869

```

870

871

### Dynamic Argument Passing

872

873

```java

874

public class ArgumentChainProcessor {

875

876

public static void processArgumentChain(List<Action> actions,

877

Map<String, String> initialArguments)

878

throws Exception {

879

880

// Create mutable arguments map

881

Map<String, String> currentArguments = new HashMap<>(initialArguments);

882

883

for (Action action : actions) {

884

// Create settable arguments for this action

885

SettableArguments settableArgs = new MapBackedSettableArguments(currentArguments);

886

887

// Create action context with current arguments

888

ActionContext actionContext = createActionContext(settableArgs);

889

890

// Initialize and run action

891

action.initialize(actionContext);

892

try {

893

action.run();

894

} finally {

895

action.destroy();

896

}

897

898

// Update arguments for next action

899

currentArguments.putAll(settableArgs.getUpdatedArguments());

900

}

901

}

902

903

private static class MapBackedSettableArguments implements SettableArguments {

904

private final Map<String, String> arguments;

905

private final Map<String, String> updates;

906

907

public MapBackedSettableArguments(Map<String, String> arguments) {

908

this.arguments = new HashMap<>(arguments);

909

this.updates = new HashMap<>();

910

}

911

912

@Override

913

public void set(String name, String value) {

914

arguments.put(name, value);

915

updates.put(name, value);

916

}

917

918

@Override

919

public boolean has(String name) {

920

return arguments.containsKey(name);

921

}

922

923

@Override

924

public String get(String name) {

925

return arguments.get(name);

926

}

927

928

public Map<String, String> getUpdatedArguments() {

929

return new HashMap<>(updates);

930

}

931

}

932

}

933

```