or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

application-framework.md data-management.md data-processing.md index.md operational.md plugin-system.md security-metadata.md

operational.mddocs/

0

# Operational APIs

1

2

CDAP provides comprehensive operational APIs for monitoring, scheduling, retry handling, and feature management. These APIs enable production-ready applications with enterprise-grade operational capabilities including metrics collection, automated scheduling, resilient error handling, and dynamic feature control.

3

4

## Metrics Collection

5

6

The metrics system provides real-time monitoring and observability for all application components.

7

8

### Metrics Interface

9

10

```java { .api }

11

import io.cdap.cdap.api.metrics.*;

12

import java.util.Map;

13

14

// Core metrics interface

15

/**
 * Core interface for emitting metrics.
 *
 * <p>Implementations supply {@link #count(String, int)}, {@link #gauge(String, long)} and
 * {@link #child(Map)}; every other method is a default built on top of {@code count}.
 */
public interface Metrics {

  /**
   * Counter metric: adjusts the metric {@code metricName} by {@code delta}.
   *
   * @param metricName name of the counter
   * @param delta amount to add (may be negative)
   */
  void count(String metricName, int delta);

  /**
   * {@code long} variant of {@link #count(String, int)}.
   *
   * <p>The default implementation only accepts values that fit in an {@code int} and forwards
   * them to {@link #count(String, int)}.
   *
   * @param metricName name of the counter
   * @param delta amount to add
   * @throws IllegalArgumentException if {@code delta} is outside the {@code int} range
   */
  default void countLong(String metricName, long delta) {
    if (delta < Integer.MIN_VALUE || delta > Integer.MAX_VALUE) {
      throw new IllegalArgumentException("Invalid delta value for metrics count: " + delta);
    }
    count(metricName, (int) delta);
  }

  /**
   * Gauge metric: records {@code value} as the current value of {@code metricName}.
   *
   * @param metricName name of the gauge
   * @param value value to record
   */
  void gauge(String metricName, long value);

  /**
   * Tagged metrics: returns a {@code Metrics} associated with the given context dimensions.
   *
   * @param tags tag names mapped to tag values
   * @return a {@code Metrics} for the given tags
   */
  Metrics child(Map<String, String> tags);

  /** Convenience for {@code count(metricName, 1)}. */
  default void increment(String metricName) {
    count(metricName, 1);
  }

  /** Convenience for {@code count(metricName, delta)}. */
  default void increment(String metricName, int delta) {
    count(metricName, delta);
  }

  /** Convenience for {@code count(metricName, -1)}. */
  default void decrement(String metricName) {
    count(metricName, -1);
  }

  /**
   * Convenience for {@code count(metricName, -delta)}.
   *
   * @throws IllegalArgumentException if {@code delta} is {@link Integer#MIN_VALUE}, whose
   *     negation does not fit in an {@code int}
   */
  default void decrement(String metricName, int delta) {
    // -Integer.MIN_VALUE wraps back to Integer.MIN_VALUE, which would silently turn a huge
    // increase into a huge decrease. Reject it the same way countLong rejects overflow.
    if (delta == Integer.MIN_VALUE) {
      throw new IllegalArgumentException(
          "Invalid delta value for metrics count: " + (-(long) delta));
    }
    count(metricName, -delta);
  }
}

50

51

// Runtime metrics for program execution monitoring

52

/**
 * Read-side view of the metrics of a program run: single-value lookups by name plus bulk
 * accessors keyed by metric name.
 */
public interface RuntimeMetrics {

  /**
   * @param name counter name
   * @return the value of the named counter
   */
  long getCounter(String name);

  /**
   * @param name gauge name
   * @return the value of the named gauge
   */
  long getGauge(String name);

  /** @return all counters, keyed by name */
  Map<String, Long> getAllCounters();

  /** @return all gauges, keyed by name */
  Map<String, Long> getAllGauges();
}

58

```

59

60

### Metrics Usage Examples

61

62

```java { .api }

63

// Service with comprehensive metrics

64

public class DataProcessingService extends AbstractHttpServiceHandler {

65

66

private Metrics metrics;

67

68

@Override

69

public void initialize(HttpServiceContext context) throws Exception {

70

super.initialize(context);

71

this.metrics = context.getMetrics();

72

}

73

74

@GET

75

@Path("/process/{id}")

76

public void processData(HttpServiceRequest request, HttpServiceResponder responder,

77

@PathParam("id") String dataId) {

78

79

// Track request metrics

80

metrics.increment("requests.total");

81

metrics.increment("requests.process_data");

82

83

long startTime = System.currentTimeMillis();

84

85

try {

86

// Process data with detailed metrics

87

ProcessingResult result = processDataWithMetrics(dataId);

88

89

// Success metrics

90

metrics.increment("requests.success");

91

metrics.gauge("processing.last_success_time", System.currentTimeMillis());

92

metrics.gauge("processing.records_processed", result.getRecordCount());

93

metrics.gauge("processing.data_size_mb", result.getDataSizeMB());

94

95

responder.sendJson(200, result.toJson());

96

97

} catch (ValidationException e) {

98

// Validation error metrics

99

metrics.increment("errors.validation");

100

Metrics errorMetrics = metrics.child(Map.of("error_type", "validation"));

101

errorMetrics.increment("error_count");

102

103

responder.sendError(400, "Validation failed: " + e.getMessage());

104

105

} catch (ProcessingException e) {

106

// Processing error metrics

107

metrics.increment("errors.processing");

108

Metrics errorMetrics = metrics.child(Map.of("error_type", "processing"));

109

errorMetrics.increment("error_count");

110

111

responder.sendError(500, "Processing failed: " + e.getMessage());

112

113

} catch (Exception e) {

114

// General error metrics

115

metrics.increment("errors.unknown");

116

Metrics errorMetrics = metrics.child(Map.of("error_type", "unknown"));

117

errorMetrics.increment("error_count");

118

119

responder.sendError(500, "Internal error: " + e.getMessage());

120

121

} finally {

122

// Response time metrics

123

long duration = System.currentTimeMillis() - startTime;

124

metrics.gauge("processing.response_time_ms", duration);

125

126

// Update running averages

127

updateResponseTimeAverage(duration);

128

}

129

}

130

131

private ProcessingResult processDataWithMetrics(String dataId) throws Exception {

132

Table dataTable = getContext().getDataset("input_data");

133

Table resultsTable = getContext().getDataset("processed_results");

134

135

// Data access metrics

136

metrics.increment("dataset.reads");

137

long readStart = System.currentTimeMillis();

138

139

Row inputRow = dataTable.get(Bytes.toBytes(dataId));

140

if (inputRow.isEmpty()) {

141

metrics.increment("dataset.read_misses");

142

throw new ValidationException("Data not found: " + dataId);

143

}

144

145

metrics.gauge("dataset.read_latency_ms", System.currentTimeMillis() - readStart);

146

147

// Processing metrics

148

metrics.increment("processing.operations");

149

long processStart = System.currentTimeMillis();

150

151

ProcessingResult result = performDataTransformation(inputRow);

152

153

metrics.gauge("processing.operation_latency_ms", System.currentTimeMillis() - processStart);

154

155

// Data write metrics

156

metrics.increment("dataset.writes");

157

long writeStart = System.currentTimeMillis();

158

159

Put outputPut = new Put(Bytes.toBytes(dataId + "_processed"));

160

outputPut.add("result", "data", result.getData());

161

outputPut.add("result", "timestamp", System.currentTimeMillis());

162

outputPut.add("result", "record_count", result.getRecordCount());

163

164

resultsTable.put(outputPut);

165

166

metrics.gauge("dataset.write_latency_ms", System.currentTimeMillis() - writeStart);

167

168

return result;

169

}

170

171

private ProcessingResult performDataTransformation(Row inputRow) {

172

String inputData = inputRow.getString("data");

173

174

// Business logic metrics

175

metrics.increment("business_logic.transformations");

176

177

// Simulate complex processing with metrics

178

int recordCount = 0;

179

long dataSize = 0;

180

181

if (inputData != null) {

182

String[] records = inputData.split("\n");

183

recordCount = records.length;

184

dataSize = inputData.length();

185

186

// Track processing quality metrics

187

int validRecords = 0;

188

for (String record : records) {

189

if (isValidRecord(record)) {

190

validRecords++;

191

}

192

}

193

194

metrics.gauge("data_quality.valid_record_ratio",

195

(double) validRecords / recordCount);

196

metrics.gauge("data_quality.invalid_records", recordCount - validRecords);

197

}

198

199

return new ProcessingResult(inputData, recordCount, dataSize);

200

}

201

202

private void updateResponseTimeAverage(long duration) {

203

// Calculate and emit rolling averages

204

// This could use a sliding window or exponential moving average

205

metrics.gauge("processing.avg_response_time_ms", calculateAverage(duration));

206

}

207

208

private boolean isValidRecord(String record) {

209

return record != null && !record.trim().isEmpty() && record.contains(",");

210

}

211

212

private long calculateAverage(long newValue) {

213

// Implementation for calculating rolling average

214

return newValue; // Simplified

215

}

216

}

217

218

// Worker with operational metrics

219

public class DataMonitoringWorker extends AbstractWorker {

220

221

private volatile boolean running = false;

222

private Metrics metrics;

223

224

@Override

225

public void configure(WorkerConfigurer configurer) {

226

configurer.setName("DataMonitoringWorker");

227

configurer.setDescription("Monitors data pipeline health and performance");

228

}

229

230

@Override

231

public void initialize(WorkerContext context) throws Exception {

232

super.initialize(context);

233

this.metrics = context.getMetrics();

234

this.running = true;

235

}

236

237

@Override

238

public void run() throws Exception {

239

while (running) {

240

try {

241

// System health metrics

242

collectSystemHealthMetrics();

243

244

// Data pipeline metrics

245

collectDataPipelineMetrics();

246

247

// Application-specific metrics

248

collectApplicationMetrics();

249

250

// Sleep between collection cycles

251

Thread.sleep(30000); // 30 seconds

252

253

} catch (InterruptedException e) {

254

Thread.currentThread().interrupt();

255

break;

256

} catch (Exception e) {

257

LOG.error("Error collecting metrics", e);

258

metrics.increment("metrics_collection.errors");

259

260

// Continue operation even if metrics collection fails

261

Thread.sleep(10000);

262

}

263

}

264

}

265

266

@Override

267

public void stop() {

268

running = false;

269

}

270

271

private void collectSystemHealthMetrics() {

272

WorkerContext context = getContext();

273

274

// JVM metrics

275

Runtime runtime = Runtime.getRuntime();

276

long totalMemory = runtime.totalMemory();

277

long freeMemory = runtime.freeMemory();

278

long usedMemory = totalMemory - freeMemory;

279

280

metrics.gauge("system.memory.total_mb", totalMemory / (1024 * 1024));

281

metrics.gauge("system.memory.used_mb", usedMemory / (1024 * 1024));

282

metrics.gauge("system.memory.free_mb", freeMemory / (1024 * 1024));

283

metrics.gauge("system.memory.usage_percent", (usedMemory * 100) / totalMemory);

284

285

// Thread metrics

286

metrics.gauge("system.threads.active", Thread.activeCount());

287

288

// Garbage collection metrics (if available)

289

collectGCMetrics();

290

}

291

292

private void collectDataPipelineMetrics() throws Exception {

293

WorkerContext context = getContext();

294

295

// Check dataset sizes and health

296

checkDatasetMetrics("input_data", "input");

297

checkDatasetMetrics("processed_results", "output");

298

checkDatasetMetrics("error_records", "errors");

299

300

// Check processing lag

301

checkProcessingLag();

302

303

// Check data quality trends

304

checkDataQualityMetrics();

305

}

306

307

private void checkDatasetMetrics(String datasetName, String type) {

308

try {

309

Table dataset = getContext().getDataset(datasetName);

310

311

// Count records (simplified - in practice might sample)

312

long recordCount = countTableRecords(dataset);

313

metrics.gauge(String.format("dataset.%s.record_count", type), recordCount);

314

315

// Check recent activity

316

long recentRecords = countRecentRecords(dataset, System.currentTimeMillis() - 3600000); // Last hour

317

metrics.gauge(String.format("dataset.%s.recent_records", type), recentRecords);

318

319

metrics.increment(String.format("dataset.%s.health_checks", type));

320

321

} catch (Exception e) {

322

LOG.error("Failed to check metrics for dataset: {}", datasetName, e);

323

metrics.increment(String.format("dataset.%s.health_check_errors", type));

324

}

325

}

326

327

private void checkProcessingLag() {

328

// Calculate processing lag between input and output

329

// This is a simplified example

330

long inputCount = getDatasetRecordCount("input_data");

331

long outputCount = getDatasetRecordCount("processed_results");

332

long processingLag = inputCount - outputCount;

333

334

metrics.gauge("pipeline.processing_lag", Math.max(0, processingLag));

335

336

if (processingLag > 1000) {

337

metrics.increment("pipeline.high_lag_alerts");

338

}

339

}

340

341

private void checkDataQualityMetrics() {

342

// Check error rates and data quality trends

343

long errorCount = getDatasetRecordCount("error_records");

344

long totalProcessed = getDatasetRecordCount("processed_results");

345

346

if (totalProcessed > 0) {

347

double errorRate = (double) errorCount / (totalProcessed + errorCount);

348

metrics.gauge("data_quality.error_rate", (long) (errorRate * 10000)); // Store as basis points

349

350

if (errorRate > 0.05) { // 5% error rate threshold

351

metrics.increment("data_quality.high_error_rate_alerts");

352

}

353

}

354

}

355

356

private void collectApplicationMetrics() {

357

WorkerContext context = getContext();

358

359

// Application runtime metrics

360

long uptime = System.currentTimeMillis() - context.getLogicalStartTime();

361

metrics.gauge("application.uptime_minutes", uptime / (60 * 1000));

362

363

// Instance metrics

364

metrics.gauge("application.worker_instance_id", context.getInstanceId());

365

metrics.gauge("application.worker_instance_count", context.getInstanceCount());

366

367

// Runtime arguments metrics (for configuration tracking)

368

Map<String, String> runtimeArgs = context.getRuntimeArguments();

369

metrics.gauge("application.runtime_args_count", runtimeArgs.size());

370

}

371

372

private void collectGCMetrics() {

373

// Implementation for garbage collection metrics

374

// Would use JMX beans to get GC statistics

375

}

376

377

private long countTableRecords(Table table) {

378

// Simplified record counting - in practice might use sampling

379

long count = 0;

380

try (Scanner scanner = table.scan(null, null)) {

381

while (scanner.next() != null) {

382

count++;

383

if (count % 10000 == 0) {

384

// Emit progress for large datasets

385

metrics.gauge("metrics_collection.scan_progress", count);

386

}

387

}

388

} catch (Exception e) {

389

LOG.warn("Error counting table records", e);

390

}

391

return count;

392

}

393

394

private long countRecentRecords(Table table, long sinceTimestamp) {

395

// Count records created since timestamp

396

long count = 0;

397

try (Scanner scanner = table.scan(null, null)) {

398

Row row;

399

while ((row = scanner.next()) != null) {

400

Long timestamp = row.getLong("timestamp");

401

if (timestamp != null && timestamp > sinceTimestamp) {

402

count++;

403

}

404

}

405

} catch (Exception e) {

406

LOG.warn("Error counting recent records", e);

407

}

408

return count;

409

}

410

411

private long getDatasetRecordCount(String datasetName) {

412

try {

413

Table dataset = getContext().getDataset(datasetName);

414

return countTableRecords(dataset);

415

} catch (Exception e) {

416

LOG.error("Failed to get record count for dataset: {}", datasetName, e);

417

return 0;

418

}

419

}

420

}

421

```

422

423

## Scheduling System

424

425

CDAP provides a flexible scheduling system for automating program execution based on time, data availability, or other program completion events.

426

427

### Scheduling Interfaces

428

429

```java { .api }

430

import io.cdap.cdap.api.schedule.*;

431

432

// Schedule builder for creating schedules

433

public interface ScheduleBuilder {

434

435

// Set schedule name and description

436

ScheduleBuilder setName(String name);

437

ScheduleBuilder setDescription(String description);

438

439

// Configure schedule properties

440

ScheduleBuilder setProperties(Map<String, String> properties);

441

ScheduleBuilder setProperty(String key, String value);

442

443

// Set schedule constraints

444

ScheduleBuilder setMaxConcurrentRuns(int maxConcurrentRuns);

445

446

// Build the schedule

447

Schedule build();

448

}

449

450

// Trigger factory for creating different trigger types

451

public final class TriggerFactory {

452

453

// Time-based triggers

454

public static Trigger byTime(String cronExpression) { /* create cron trigger */ }

455

public static Trigger byFrequency(Duration duration) { /* create frequency trigger */ }

456

457

// Data-based triggers

458

public static Trigger onDataAvailable(String datasetName) { /* create data trigger */ }

459

public static Trigger onPartitionAvailable(String datasetName, int numPartitions) { /* create partition trigger */ }

460

461

// Program status triggers

462

public static Trigger onProgramStatus(ProgramType programType, String applicationName,

463

String programName, Set<ProgramStatus> programStatuses) {

464

/* create program status trigger */

465

}

466

467

// Composite triggers

468

public static Trigger and(Trigger... triggers) { /* create AND trigger */ }

469

public static Trigger or(Trigger... triggers) { /* create OR trigger */ }

470

}

471

472

// Trigger information interfaces

473

public interface TriggerInfo {

474

String getName();

475

String getDescription();

476

TriggerType getType();

477

Map<String, String> getProperties();

478

}

479

480

public interface ProgramStatusTriggerInfo extends TriggerInfo {

481

String getApplicationName();

482

String getProgramName();

483

ProgramType getProgramType();

484

Set<ProgramStatus> getProgramStatuses();

485

}

486

487

public interface PartitionTriggerInfo extends TriggerInfo {

488

String getDatasetName();

489

int getNumPartitions();

490

}

491

492

// Triggering schedule information

493

public interface TriggeringScheduleInfo {

494

String getName();

495

String getDescription();

496

List<TriggerInfo> getTriggerInfos();

497

Map<String, String> getProperties();

498

}

499

```

500

501

### Scheduling Examples

502

503

```java { .api }

504

// Application with comprehensive scheduling

505

public class ScheduledDataPipelineApp extends AbstractApplication {

506

507

@Override

508

public void configure(ApplicationConfigurer configurer, ApplicationContext context) {

509

configurer.setName("ScheduledDataPipeline");

510

configurer.setDescription("Data pipeline with various scheduling patterns");

511

512

// Add programs

513

configurer.addMapReduce(new DailyETLMapReduce());

514

configurer.addSpark(new RealTimeAggregationSpark());

515

configurer.addWorkflow(new DataProcessingWorkflow());

516

configurer.addWorker(new DataValidationWorker());

517

518

// Time-based scheduling - Daily ETL at 2 AM

519

configurer.schedule(

520

ScheduleBuilder.create("daily-etl-schedule")

521

.setDescription("Daily ETL processing at 2 AM")

522

.triggerByTime("0 2 * * *") // Cron: 2 AM daily

523

.setMaxConcurrentRuns(1)

524

.setProperty("processing.mode", "batch")

525

.setProperty("data.retention.days", "90")

526

.build()

527

.programName("DailyETLMapReduce")

528

);

529

530

// Frequency-based scheduling - Hourly aggregation

531

configurer.schedule(

532

ScheduleBuilder.create("hourly-aggregation-schedule")

533

.setDescription("Hourly real-time data aggregation")

534

.triggerByFrequency(Duration.ofHours(1))

535

.setMaxConcurrentRuns(2)

536

.setProperty("aggregation.window", "1h")

537

.build()

538

.programName("RealTimeAggregationSpark")

539

);

540

541

// Data availability scheduling - Process when new data arrives

542

configurer.schedule(

543

ScheduleBuilder.create("data-driven-schedule")

544

.setDescription("Process workflow when new partitions are available")

545

.triggerOnDataAvailable("incoming_data", 3) // Wait for 3 new partitions

546

.setMaxConcurrentRuns(1)

547

.setProperty("processing.trigger", "data-availability")

548

.build()

549

.programName("DataProcessingWorkflow")

550

);

551

552

// Program dependency scheduling - Start validation after ETL completes

553

configurer.schedule(

554

ScheduleBuilder.create("post-etl-validation-schedule")

555

.setDescription("Run validation after ETL completion")

556

.triggerOnProgramStatus(

557

ProgramType.MAPREDUCE,

558

"ScheduledDataPipeline",

559

"DailyETLMapReduce",

560

Set.of(ProgramStatus.COMPLETED)

561

)

562

.setMaxConcurrentRuns(1)

563

.setProperty("validation.mode", "post-processing")

564

.build()

565

.programName("DataValidationWorker")

566

);

567

568

// Complex composite scheduling - Multiple conditions

569

configurer.schedule(

570

ScheduleBuilder.create("complex-schedule")

571

.setDescription("Complex trigger combining time and data availability")

572

.triggerByComposite(

573

TriggerFactory.and(

574

TriggerFactory.byTime("0 */6 * * *"), // Every 6 hours

575

TriggerFactory.onDataAvailable("quality_metrics") // And when quality data available

576

)

577

)

578

.setMaxConcurrentRuns(1)

579

.setProperty("processing.type", "quality-analysis")

580

.build()

581

.programName("DataProcessingWorkflow")

582

);

583

}

584

}

585

586

// Workflow that responds to schedule context

587

public class DataProcessingWorkflow extends AbstractWorkflow {

588

589

@Override

590

public void configure(WorkflowConfigurer configurer) {

591

configurer.setName("DataProcessingWorkflow");

592

configurer.setDescription("Processes data based on schedule triggers");

593

594

// Add conditional processing based on schedule properties

595

configurer.addAction(new ScheduleAwareAction());

596

597

configurer.condition(new ScheduleBasedCondition())

598

.addMapReduce("BatchDataProcessor")

599

.addSpark("AdvancedAnalyticsSpark")

600

.otherwise()

601

.addAction(new LightweightProcessingAction())

602

.end();

603

604

configurer.addAction(new CompletionNotificationAction());

605

}

606

607

// Custom action that adapts behavior based on schedule

608

public static class ScheduleAwareAction extends AbstractCustomAction {

609

610

@Override

611

public void configure(CustomActionConfigurer configurer) {

612

configurer.setName("ScheduleAwareAction");

613

configurer.setDescription("Adapts processing based on triggering schedule");

614

}

615

616

@Override

617

public void run(CustomActionContext context) throws Exception {

618

WorkflowToken token = context.getWorkflowToken();

619

620

// Get schedule information

621

TriggeringScheduleInfo scheduleInfo = context.getTriggeringScheduleInfo();

622

623

if (scheduleInfo != null) {

624

String scheduleName = scheduleInfo.getName();

625

token.put("schedule.name", scheduleName);

626

token.put("schedule.description", scheduleInfo.getDescription());

627

628

// Adapt processing based on schedule properties

629

Map<String, String> scheduleProperties = scheduleInfo.getProperties();

630

String processingMode = scheduleProperties.get("processing.mode");

631

632

if ("batch".equals(processingMode)) {

633

// Configure for batch processing

634

token.put("processing.batch_size", "10000");

635

token.put("processing.parallelism", "high");

636

token.put("processing.memory_limit", "8GB");

637

} else {

638

// Configure for real-time processing

639

token.put("processing.batch_size", "1000");

640

token.put("processing.parallelism", "medium");

641

token.put("processing.memory_limit", "2GB");

642

}

643

644

// Handle different trigger types

645

List<TriggerInfo> triggers = scheduleInfo.getTriggerInfos();

646

for (TriggerInfo trigger : triggers) {

647

processTriggerInfo(trigger, token, context);

648

}

649

650

context.getMetrics().increment("schedule.triggered_runs");

651

Metrics scheduleMetrics = context.getMetrics().child(

652

Map.of("schedule_name", scheduleName)

653

);

654

scheduleMetrics.increment("executions");

655

656

} else {

657

// Manual execution

658

token.put("execution.type", "manual");

659

token.put("processing.batch_size", "5000");

660

context.getMetrics().increment("schedule.manual_runs");

661

}

662

}

663

664

private void processTriggerInfo(TriggerInfo trigger, WorkflowToken token,

665

CustomActionContext context) {

666

667

token.put("trigger.type", trigger.getType().name());

668

token.put("trigger.name", trigger.getName());

669

670

switch (trigger.getType()) {

671

case TIME:

672

// Time-based trigger processing

673

token.put("trigger.execution_time", String.valueOf(System.currentTimeMillis()));

674

break;

675

676

case PARTITION:

677

// Data partition trigger processing

678

if (trigger instanceof PartitionTriggerInfo) {

679

PartitionTriggerInfo partitionTrigger = (PartitionTriggerInfo) trigger;

680

token.put("trigger.dataset", partitionTrigger.getDatasetName());

681

token.put("trigger.partitions", String.valueOf(partitionTrigger.getNumPartitions()));

682

}

683

break;

684

685

case PROGRAM_STATUS:

686

// Program status trigger processing

687

if (trigger instanceof ProgramStatusTriggerInfo) {

688

ProgramStatusTriggerInfo statusTrigger = (ProgramStatusTriggerInfo) trigger;

689

token.put("trigger.program", statusTrigger.getProgramName());

690

token.put("trigger.application", statusTrigger.getApplicationName());

691

token.put("trigger.program_type", statusTrigger.getProgramType().name());

692

}

693

break;

694

695

default:

696

LOG.warn("Unknown trigger type: {}", trigger.getType());

697

}

698

}

699

}

700

701

// Condition that makes decisions based on schedule context

702

public static class ScheduleBasedCondition implements Predicate<WorkflowContext> {

703

704

@Override

705

public boolean apply(WorkflowContext context) {

706

WorkflowToken token = context.getToken();

707

708

// Decision based on schedule properties

709

String processingMode = token.get("processing.mode").toString();

710

String triggerType = token.get("trigger.type").toString();

711

712

// Use heavy processing for batch schedules or time-based triggers

713

return "batch".equals(processingMode) || "TIME".equals(triggerType);

714

}

715

}

716

}

717

```

718

719

## Retry Policies and Error Handling

720

721

CDAP provides robust retry mechanisms and error handling patterns for building resilient applications.

722

723

### Retry Framework

724

725

```java { .api }

726

import io.cdap.cdap.api.retry.*;

727

728

// Retryable exception marker

729

/** Marker exception: a failure that is safe to retry. */
public class RetryableException extends Exception {

  /**
   * @param message description of the failure
   */
  public RetryableException(String message) {
    super(message);
  }

  /**
   * @param message description of the failure
   * @param cause underlying failure
   */
  public RetryableException(String message, Throwable cause) {
    super(message, cause);
  }
}

733

734

// Exception when retries are exhausted

735

/**
 * Thrown when an operation still fails after all retry attempts. Carries the number of
 * attempts made and the last failure, which is also set as the cause.
 */
public class RetriesExhaustedException extends Exception {

  private final int attemptsMade;
  private final Exception lastException;

  /**
   * @param attemptsMade number of attempts made before giving up
   * @param lastException failure of the final attempt; also used as the cause
   */
  public RetriesExhaustedException(int attemptsMade, Exception lastException) {
    super(String.format("Retries exhausted after %d attempts", attemptsMade), lastException);
    this.lastException = lastException;
    this.attemptsMade = attemptsMade;
  }

  /** @return number of attempts made before giving up */
  public int getAttemptsMade() {
    return attemptsMade;
  }

  /** @return failure of the final attempt */
  public Exception getLastException() {
    return lastException;
  }
}

748

749

// Idempotency levels for retry safety

750

/** Idempotency levels for retry safety. */
public enum Idempotency {

  /** Safe to retry without side effects. */
  IDEMPOTENT,

  /** Retries may cause duplicate side effects. */
  NOT_IDEMPOTENT,

  /** Idempotency is unknown. */
  UNKNOWN
}

755

```

756

757

### Retry Implementation Examples

758

759

```java { .api }

760

// Service with comprehensive retry logic

761

public class ResilientDataService extends AbstractHttpServiceHandler {

762

763

private static final int MAX_RETRIES = 3;

764

private static final long INITIAL_DELAY_MS = 1000;

765

private static final double BACKOFF_MULTIPLIER = 2.0;

766

767

@POST

768

@Path("/process")

769

public void processWithRetries(HttpServiceRequest request, HttpServiceResponder responder) {

770

Metrics metrics = getContext().getMetrics();

771

772

try {

773

String content = Charset.forName("UTF-8").decode(

774

ByteBuffer.wrap(request.getContent())).toString();

775

776

ProcessingResult result = executeWithRetries(

777

() -> processData(content),

778

MAX_RETRIES,

779

INITIAL_DELAY_MS,

780

metrics

781

);

782

783

responder.sendJson(200, result.toJson());

784

785

} catch (RetriesExhaustedException e) {

786

LOG.error("Processing failed after {} retries", e.getAttemptsMade(), e);

787

metrics.increment("processing.retries_exhausted");

788

responder.sendError(500, "Processing failed after retries: " + e.getLastException().getMessage());

789

790

} catch (Exception e) {

791

LOG.error("Processing failed with non-retryable error", e);

792

metrics.increment("processing.non_retryable_errors");

793

responder.sendError(500, "Processing failed: " + e.getMessage());

794

}

795

}

796

797

private <T> T executeWithRetries(RetryableOperation<T> operation, int maxRetries,

798

long initialDelayMs, Metrics metrics)

799

throws RetriesExhaustedException {

800

801

Exception lastException = null;

802

long delay = initialDelayMs;

803

804

for (int attempt = 1; attempt <= maxRetries; attempt++) {

805

try {

806

T result = operation.execute();

807

808

if (attempt > 1) {

809

metrics.increment("retries.successful");

810

metrics.gauge("retries.attempts_before_success", attempt);

811

}

812

813

return result;

814

815

} catch (RetryableException e) {

816

lastException = e;

817

metrics.increment("retries.retryable_failures");

818

819

LOG.warn("Retryable error on attempt {} of {}: {}", attempt, maxRetries, e.getMessage());

820

821

if (attempt < maxRetries) {

822

try {

823

Thread.sleep(delay);

824

delay = (long) (delay * BACKOFF_MULTIPLIER);

825

} catch (InterruptedException ie) {

826

Thread.currentThread().interrupt();

827

throw new RetriesExhaustedException(attempt, e);

828

}

829

}

830

831

} catch (Exception e) {

832

// Non-retryable exception

833

metrics.increment("retries.non_retryable_failures");

834

throw e;

835

}

836

}

837

838

metrics.increment("retries.exhausted");

839

throw new RetriesExhaustedException(maxRetries, lastException);

840

}

841

842

private ProcessingResult processData(String data) throws RetryableException {

843

try {

844

// Simulate processing that might fail transiently

845

if (isTransientFailureCondition()) {

846

throw new RetryableException("Transient processing failure - external service unavailable");

847

}

848

849

// Perform actual processing

850

return performProcessing(data);

851

852

} catch (NetworkException e) {

853

// Network issues are typically retryable

854

throw new RetryableException("Network error during processing", e);

855

856

} catch (TemporaryResourceException e) {

857

// Temporary resource issues are retryable

858

throw new RetryableException("Temporary resource unavailable", e);

859

860

} catch (ValidationException e) {

861

// Validation errors are not retryable

862

throw e;

863

}

864

}

865

866

private boolean isTransientFailureCondition() {

867

// Simulate transient failure conditions

868

return Math.random() < 0.3; // 30% chance of transient failure

869

}

870

871

private ProcessingResult performProcessing(String data) throws ValidationException {

872

if (data == null || data.trim().isEmpty()) {

873

throw new ValidationException("Input data cannot be empty");

874

}

875

876

// Simulate processing

877

return new ProcessingResult("processed: " + data, 1, data.length());

878

}

879

880

@FunctionalInterface

881

private interface RetryableOperation<T> {

882

T execute() throws Exception;

883

}

884

}

885

886

// MapReduce with retry logic for external system integration

887

public class ResilientDataExtractionMapReduce extends AbstractMapReduce {

888

889

public static class ExternalDataMapper extends Mapper<byte[], Row, Text, Text> {

890

891

private ExternalDataClient externalClient;

892

private Metrics metrics;

893

private int maxRetries;

894

private long retryDelay;

895

896

@Override

897

protected void setup(Context context) throws IOException, InterruptedException {

898

Configuration conf = context.getConfiguration();

899

900

// Initialize external client with retry configuration

901

externalClient = new ExternalDataClient(conf.get("external.service.url"));

902

maxRetries = conf.getInt("retry.max_attempts", 3);

903

retryDelay = conf.getLong("retry.initial_delay_ms", 1000);

904

905

// Get metrics from context

906

MapReduceTaskContext<?, ?> taskContext = (MapReduceTaskContext<?, ?>) context;

907

metrics = taskContext.getMetrics();

908

}

909

910

@Override

911

protected void map(byte[] key, Row row, Context context)

912

throws IOException, InterruptedException {

913

914

String recordId = row.getString("id");

915

916

try {

917

// Attempt to enrich data with external service

918

String enrichedData = executeWithRetries(() -> {

919

return externalClient.fetchData(recordId);

920

});

921

922

// Output successful result

923

context.write(new Text(recordId), new Text(enrichedData));

924

metrics.increment("external_calls.success");

925

926

} catch (RetriesExhaustedException e) {

927

// Handle exhausted retries

928

LOG.error("Failed to fetch external data for record {} after {} retries",

929

recordId, e.getAttemptsMade(), e);

930

931

metrics.increment("external_calls.retries_exhausted");

932

933

// Write to error output or skip record

934

context.write(new Text("ERROR:" + recordId),

935

new Text("Failed after retries: " + e.getLastException().getMessage()));

936

937

} catch (Exception e) {

938

// Handle non-retryable errors

939

LOG.error("Non-retryable error for record {}", recordId, e);

940

metrics.increment("external_calls.non_retryable_errors");

941

942

context.write(new Text("ERROR:" + recordId),

943

new Text("Non-retryable error: " + e.getMessage()));

944

}

945

}

946

947

private String executeWithRetries(ExternalDataOperation operation)

948

throws RetriesExhaustedException {

949

950

Exception lastException = null;

951

long delay = retryDelay;

952

953

for (int attempt = 1; attempt <= maxRetries; attempt++) {

954

try {

955

String result = operation.execute();

956

957

if (attempt > 1) {

958

metrics.increment("external_calls.retry_success");

959

metrics.gauge("external_calls.attempts_before_success", attempt);

960

}

961

962

return result;

963

964

} catch (ExternalServiceException e) {

965

lastException = e;

966

967

if (e.isRetryable()) {

968

metrics.increment("external_calls.retryable_failures");

969

LOG.warn("Retryable external service error on attempt {} of {}: {}",

970

attempt, maxRetries, e.getMessage());

971

972

if (attempt < maxRetries) {

973

try {

974

Thread.sleep(delay);

975

delay *= 2; // Exponential backoff

976

} catch (InterruptedException ie) {

977

Thread.currentThread().interrupt();

978

throw new RetriesExhaustedException(attempt, e);

979

}

980

}

981

} else {

982

// Non-retryable external service error

983

throw e;

984

}

985

986

} catch (Exception e) {

987

// Other non-retryable exceptions

988

throw e;

989

}

990

}

991

992

throw new RetriesExhaustedException(maxRetries, lastException);

993

}

994

995

@FunctionalInterface

996

private interface ExternalDataOperation {

997

String execute() throws ExternalServiceException;

998

}

999

1000

@Override

1001

protected void cleanup(Context context) throws IOException, InterruptedException {

1002

if (externalClient != null) {

1003

externalClient.close();

1004

}

1005

}

1006

}

1007

}

1008

```

1009

1010

## Feature Flags

1011

1012

CDAP supports feature flags for controlling application behavior dynamically.

1013

1014

### Feature Flags Interface

1015

1016

```java { .api }

1017

import io.cdap.cdap.api.feature.*;

1018

1019

// Feature flags provider interface

1020

public interface FeatureFlagsProvider {

    /**
     * Reports whether the named feature is enabled.
     *
     * @param featureName the name of the feature flag to look up
     * @return {@code true} if the feature is enabled
     */
    boolean isFeatureEnabled(String featureName);

    /**
     * Reports whether the named feature is enabled, falling back to a default when the
     * lookup fails.
     *
     * @param featureName the name of the feature flag to look up
     * @param defaultValue the value to return if {@link #isFeatureEnabled(String)} throws an
     *     {@link Exception}
     * @return the looked-up flag value, or {@code defaultValue} if the lookup threw
     */
    default boolean isFeatureEnabled(String featureName, boolean defaultValue) {
        boolean enabled;
        try {
            enabled = isFeatureEnabled(featureName);
        } catch (Exception lookupFailure) {
            // Best-effort lookup: any failure resolves to the caller-supplied default.
            enabled = defaultValue;
        }
        return enabled;
    }
}

1031

```

1032

1033

### Feature Flags Usage

1034

1035

```java { .api }

1036

// Service with feature-controlled behavior

1037

public class FeatureControlledService extends AbstractHttpServiceHandler {

1038

1039

@GET

1040

@Path("/data/{id}")

1041

public void getData(HttpServiceRequest request, HttpServiceResponder responder,

1042

@PathParam("id") String dataId) {

1043

1044

FeatureFlagsProvider featureFlags = getContext();

1045

1046

try {

1047

// Core data retrieval

1048

JsonObject data = retrieveBaseData(dataId);

1049

1050

// Feature-controlled enhancements

1051

if (featureFlags.isFeatureEnabled("enhanced_analytics", false)) {

1052

enhanceWithAnalytics(data, dataId);

1053

}

1054

1055

if (featureFlags.isFeatureEnabled("real_time_recommendations", false)) {

1056

addRealTimeRecommendations(data, dataId);

1057

}

1058

1059

if (featureFlags.isFeatureEnabled("advanced_caching", true)) {

1060

enableAdvancedCaching(data, dataId);

1061

}

1062

1063

// Feature-controlled response format

1064

if (featureFlags.isFeatureEnabled("response_compression", false)) {

1065

sendCompressedResponse(responder, data);

1066

} else {

1067

responder.sendJson(200, data);

1068

}

1069

1070

} catch (Exception e) {

1071

responder.sendError(500, "Error retrieving data: " + e.getMessage());

1072

}

1073

}

1074

1075

private JsonObject retrieveBaseData(String dataId) {

1076

// Core data retrieval logic

1077

JsonObject data = new JsonObject();

1078

data.addProperty("id", dataId);

1079

data.addProperty("timestamp", System.currentTimeMillis());

1080

return data;

1081

}

1082

1083

private void enhanceWithAnalytics(JsonObject data, String dataId) {

1084

// Enhanced analytics - controlled by feature flag

1085

data.addProperty("analytics_score", calculateAnalyticsScore(dataId));

1086

data.addProperty("trend_direction", getTrendDirection(dataId));

1087

}

1088

1089

private void addRealTimeRecommendations(JsonObject data, String dataId) {

1090

// Real-time recommendations - controlled by feature flag

1091

JsonArray recommendations = generateRecommendations(dataId);

1092

data.add("recommendations", recommendations);

1093

}

1094

1095

private void enableAdvancedCaching(JsonObject data, String dataId) {

1096

// Advanced caching logic - controlled by feature flag

1097

data.addProperty("cached", true);

1098

data.addProperty("cache_key", generateCacheKey(dataId));

1099

}

1100

}

1101

```

1102

1103

The Operational APIs in CDAP provide comprehensive monitoring, scheduling, retry handling, and feature management capabilities essential for running enterprise-grade data applications in production environments.