0
# Operational APIs
1
2
CDAP provides comprehensive operational APIs for monitoring, scheduling, retry handling, and feature management. These APIs enable production-ready applications with enterprise-grade operational capabilities including metrics collection, automated scheduling, resilient error handling, and dynamic feature control.
3
4
## Metrics Collection
5
6
The metrics system provides real-time monitoring and observability for all application components.
7
8
### Metrics Interface
9
10
```java { .api }
11
import io.cdap.cdap.api.metrics.*;
12
import java.util.Map;
13
14
/**
 * Core metrics interface: emits counter and gauge values under a metric name.
 */
public interface Metrics {

  /**
   * Counter metric: adds {@code delta} to the counter named {@code metricName}.
   *
   * @param metricName name of the counter
   * @param delta amount to add; may be negative
   */
  void count(String metricName, int delta);

  /**
   * Long-typed variant of {@link #count(String, int)}. The default implementation only
   * accepts values that fit in an {@code int} and forwards them to {@code count}.
   *
   * @param metricName name of the counter
   * @param delta amount to add
   * @throws IllegalArgumentException if {@code delta} is outside the {@code int} range
   */
  default void countLong(String metricName, long delta) {
    if (delta < Integer.MIN_VALUE || delta > Integer.MAX_VALUE) {
      throw new IllegalArgumentException("Invalid delta value for metrics count: " + delta);
    }
    count(metricName, (int) delta);
  }

  /**
   * Gauge metric: records {@code value} as the current value of {@code metricName}.
   *
   * @param metricName name of the gauge
   * @param value current value
   */
  void gauge(String metricName, long value);

  /**
   * Tagged metrics: returns a {@code Metrics} that adds the given tags as context dimensions.
   *
   * @param tags tag names mapped to tag values
   * @return a {@code Metrics} carrying the additional tags
   */
  Metrics child(Map<String, String> tags);

  /** Convenience method: adds one to the counter. */
  default void increment(String metricName) {
    count(metricName, 1);
  }

  /** Convenience method: adds {@code delta} to the counter. */
  default void increment(String metricName, int delta) {
    count(metricName, delta);
  }

  /** Convenience method: subtracts one from the counter. */
  default void decrement(String metricName) {
    count(metricName, -1);
  }

  /**
   * Convenience method: subtracts {@code delta} from the counter.
   *
   * @throws IllegalArgumentException (from the default {@link #countLong}) if the negated
   *     delta does not fit in an {@code int}, i.e. {@code delta == Integer.MIN_VALUE}
   */
  default void decrement(String metricName, int delta) {
    // Negate in long arithmetic: -Integer.MIN_VALUE overflows back to Integer.MIN_VALUE in
    // int arithmetic, which would silently move the counter in the wrong direction.
    countLong(metricName, -(long) delta);
  }
}
50
51
/**
 * Runtime metrics for program execution monitoring: read access to counters and gauges,
 * either one at a time by name or all at once.
 */
public interface RuntimeMetrics {

  /** Returns the value of the counter called {@code name}. */
  long getCounter(String name);

  /** Returns every counter, keyed by name. */
  Map<String, Long> getAllCounters();

  /** Returns the value of the gauge called {@code name}. */
  long getGauge(String name);

  /** Returns every gauge, keyed by name. */
  Map<String, Long> getAllGauges();
}
58
```
59
60
### Metrics Usage Examples
61
62
```java { .api }
63
// Service with comprehensive metrics
64
public class DataProcessingService extends AbstractHttpServiceHandler {
65
66
private Metrics metrics;
67
68
@Override
69
public void initialize(HttpServiceContext context) throws Exception {
70
super.initialize(context);
71
this.metrics = context.getMetrics();
72
}
73
74
@GET
75
@Path("/process/{id}")
76
public void processData(HttpServiceRequest request, HttpServiceResponder responder,
77
@PathParam("id") String dataId) {
78
79
// Track request metrics
80
metrics.increment("requests.total");
81
metrics.increment("requests.process_data");
82
83
long startTime = System.currentTimeMillis();
84
85
try {
86
// Process data with detailed metrics
87
ProcessingResult result = processDataWithMetrics(dataId);
88
89
// Success metrics
90
metrics.increment("requests.success");
91
metrics.gauge("processing.last_success_time", System.currentTimeMillis());
92
metrics.gauge("processing.records_processed", result.getRecordCount());
93
metrics.gauge("processing.data_size_mb", result.getDataSizeMB());
94
95
responder.sendJson(200, result.toJson());
96
97
} catch (ValidationException e) {
98
// Validation error metrics
99
metrics.increment("errors.validation");
100
Metrics errorMetrics = metrics.child(Map.of("error_type", "validation"));
101
errorMetrics.increment("error_count");
102
103
responder.sendError(400, "Validation failed: " + e.getMessage());
104
105
} catch (ProcessingException e) {
106
// Processing error metrics
107
metrics.increment("errors.processing");
108
Metrics errorMetrics = metrics.child(Map.of("error_type", "processing"));
109
errorMetrics.increment("error_count");
110
111
responder.sendError(500, "Processing failed: " + e.getMessage());
112
113
} catch (Exception e) {
114
// General error metrics
115
metrics.increment("errors.unknown");
116
Metrics errorMetrics = metrics.child(Map.of("error_type", "unknown"));
117
errorMetrics.increment("error_count");
118
119
responder.sendError(500, "Internal error: " + e.getMessage());
120
121
} finally {
122
// Response time metrics
123
long duration = System.currentTimeMillis() - startTime;
124
metrics.gauge("processing.response_time_ms", duration);
125
126
// Update running averages
127
updateResponseTimeAverage(duration);
128
}
129
}
130
131
private ProcessingResult processDataWithMetrics(String dataId) throws Exception {
132
Table dataTable = getContext().getDataset("input_data");
133
Table resultsTable = getContext().getDataset("processed_results");
134
135
// Data access metrics
136
metrics.increment("dataset.reads");
137
long readStart = System.currentTimeMillis();
138
139
Row inputRow = dataTable.get(Bytes.toBytes(dataId));
140
if (inputRow.isEmpty()) {
141
metrics.increment("dataset.read_misses");
142
throw new ValidationException("Data not found: " + dataId);
143
}
144
145
metrics.gauge("dataset.read_latency_ms", System.currentTimeMillis() - readStart);
146
147
// Processing metrics
148
metrics.increment("processing.operations");
149
long processStart = System.currentTimeMillis();
150
151
ProcessingResult result = performDataTransformation(inputRow);
152
153
metrics.gauge("processing.operation_latency_ms", System.currentTimeMillis() - processStart);
154
155
// Data write metrics
156
metrics.increment("dataset.writes");
157
long writeStart = System.currentTimeMillis();
158
159
Put outputPut = new Put(Bytes.toBytes(dataId + "_processed"));
160
outputPut.add("result", "data", result.getData());
161
outputPut.add("result", "timestamp", System.currentTimeMillis());
162
outputPut.add("result", "record_count", result.getRecordCount());
163
164
resultsTable.put(outputPut);
165
166
metrics.gauge("dataset.write_latency_ms", System.currentTimeMillis() - writeStart);
167
168
return result;
169
}
170
171
private ProcessingResult performDataTransformation(Row inputRow) {
172
String inputData = inputRow.getString("data");
173
174
// Business logic metrics
175
metrics.increment("business_logic.transformations");
176
177
// Simulate complex processing with metrics
178
int recordCount = 0;
179
long dataSize = 0;
180
181
if (inputData != null) {
182
String[] records = inputData.split("\n");
183
recordCount = records.length;
184
dataSize = inputData.length();
185
186
// Track processing quality metrics
187
int validRecords = 0;
188
for (String record : records) {
189
if (isValidRecord(record)) {
190
validRecords++;
191
}
192
}
193
194
metrics.gauge("data_quality.valid_record_ratio",
195
(double) validRecords / recordCount);
196
metrics.gauge("data_quality.invalid_records", recordCount - validRecords);
197
}
198
199
return new ProcessingResult(inputData, recordCount, dataSize);
200
}
201
202
private void updateResponseTimeAverage(long duration) {
203
// Calculate and emit rolling averages
204
// This could use a sliding window or exponential moving average
205
metrics.gauge("processing.avg_response_time_ms", calculateAverage(duration));
206
}
207
208
private boolean isValidRecord(String record) {
209
return record != null && !record.trim().isEmpty() && record.contains(",");
210
}
211
212
private long calculateAverage(long newValue) {
213
// Implementation for calculating rolling average
214
return newValue; // Simplified
215
}
216
}
217
218
// Worker with operational metrics
219
public class DataMonitoringWorker extends AbstractWorker {
220
221
private volatile boolean running = false;
222
private Metrics metrics;
223
224
@Override
225
public void configure(WorkerConfigurer configurer) {
226
configurer.setName("DataMonitoringWorker");
227
configurer.setDescription("Monitors data pipeline health and performance");
228
}
229
230
@Override
231
public void initialize(WorkerContext context) throws Exception {
232
super.initialize(context);
233
this.metrics = context.getMetrics();
234
this.running = true;
235
}
236
237
@Override
238
public void run() throws Exception {
239
while (running) {
240
try {
241
// System health metrics
242
collectSystemHealthMetrics();
243
244
// Data pipeline metrics
245
collectDataPipelineMetrics();
246
247
// Application-specific metrics
248
collectApplicationMetrics();
249
250
// Sleep between collection cycles
251
Thread.sleep(30000); // 30 seconds
252
253
} catch (InterruptedException e) {
254
Thread.currentThread().interrupt();
255
break;
256
} catch (Exception e) {
257
LOG.error("Error collecting metrics", e);
258
metrics.increment("metrics_collection.errors");
259
260
// Continue operation even if metrics collection fails
261
Thread.sleep(10000);
262
}
263
}
264
}
265
266
@Override
267
public void stop() {
268
running = false;
269
}
270
271
private void collectSystemHealthMetrics() {
272
WorkerContext context = getContext();
273
274
// JVM metrics
275
Runtime runtime = Runtime.getRuntime();
276
long totalMemory = runtime.totalMemory();
277
long freeMemory = runtime.freeMemory();
278
long usedMemory = totalMemory - freeMemory;
279
280
metrics.gauge("system.memory.total_mb", totalMemory / (1024 * 1024));
281
metrics.gauge("system.memory.used_mb", usedMemory / (1024 * 1024));
282
metrics.gauge("system.memory.free_mb", freeMemory / (1024 * 1024));
283
metrics.gauge("system.memory.usage_percent", (usedMemory * 100) / totalMemory);
284
285
// Thread metrics
286
metrics.gauge("system.threads.active", Thread.activeCount());
287
288
// Garbage collection metrics (if available)
289
collectGCMetrics();
290
}
291
292
private void collectDataPipelineMetrics() throws Exception {
293
WorkerContext context = getContext();
294
295
// Check dataset sizes and health
296
checkDatasetMetrics("input_data", "input");
297
checkDatasetMetrics("processed_results", "output");
298
checkDatasetMetrics("error_records", "errors");
299
300
// Check processing lag
301
checkProcessingLag();
302
303
// Check data quality trends
304
checkDataQualityMetrics();
305
}
306
307
private void checkDatasetMetrics(String datasetName, String type) {
308
try {
309
Table dataset = getContext().getDataset(datasetName);
310
311
// Count records (simplified - in practice might sample)
312
long recordCount = countTableRecords(dataset);
313
metrics.gauge(String.format("dataset.%s.record_count", type), recordCount);
314
315
// Check recent activity
316
long recentRecords = countRecentRecords(dataset, System.currentTimeMillis() - 3600000); // Last hour
317
metrics.gauge(String.format("dataset.%s.recent_records", type), recentRecords);
318
319
metrics.increment(String.format("dataset.%s.health_checks", type));
320
321
} catch (Exception e) {
322
LOG.error("Failed to check metrics for dataset: {}", datasetName, e);
323
metrics.increment(String.format("dataset.%s.health_check_errors", type));
324
}
325
}
326
327
private void checkProcessingLag() {
328
// Calculate processing lag between input and output
329
// This is a simplified example
330
long inputCount = getDatasetRecordCount("input_data");
331
long outputCount = getDatasetRecordCount("processed_results");
332
long processingLag = inputCount - outputCount;
333
334
metrics.gauge("pipeline.processing_lag", Math.max(0, processingLag));
335
336
if (processingLag > 1000) {
337
metrics.increment("pipeline.high_lag_alerts");
338
}
339
}
340
341
private void checkDataQualityMetrics() {
342
// Check error rates and data quality trends
343
long errorCount = getDatasetRecordCount("error_records");
344
long totalProcessed = getDatasetRecordCount("processed_results");
345
346
if (totalProcessed > 0) {
347
double errorRate = (double) errorCount / (totalProcessed + errorCount);
348
metrics.gauge("data_quality.error_rate", (long) (errorRate * 10000)); // Store as basis points
349
350
if (errorRate > 0.05) { // 5% error rate threshold
351
metrics.increment("data_quality.high_error_rate_alerts");
352
}
353
}
354
}
355
356
private void collectApplicationMetrics() {
357
WorkerContext context = getContext();
358
359
// Application runtime metrics
360
long uptime = System.currentTimeMillis() - context.getLogicalStartTime();
361
metrics.gauge("application.uptime_minutes", uptime / (60 * 1000));
362
363
// Instance metrics
364
metrics.gauge("application.worker_instance_id", context.getInstanceId());
365
metrics.gauge("application.worker_instance_count", context.getInstanceCount());
366
367
// Runtime arguments metrics (for configuration tracking)
368
Map<String, String> runtimeArgs = context.getRuntimeArguments();
369
metrics.gauge("application.runtime_args_count", runtimeArgs.size());
370
}
371
372
private void collectGCMetrics() {
373
// Implementation for garbage collection metrics
374
// Would use JMX beans to get GC statistics
375
}
376
377
private long countTableRecords(Table table) {
378
// Simplified record counting - in practice might use sampling
379
long count = 0;
380
try (Scanner scanner = table.scan(null, null)) {
381
while (scanner.next() != null) {
382
count++;
383
if (count % 10000 == 0) {
384
// Emit progress for large datasets
385
metrics.gauge("metrics_collection.scan_progress", count);
386
}
387
}
388
} catch (Exception e) {
389
LOG.warn("Error counting table records", e);
390
}
391
return count;
392
}
393
394
private long countRecentRecords(Table table, long sinceTimestamp) {
395
// Count records created since timestamp
396
long count = 0;
397
try (Scanner scanner = table.scan(null, null)) {
398
Row row;
399
while ((row = scanner.next()) != null) {
400
Long timestamp = row.getLong("timestamp");
401
if (timestamp != null && timestamp > sinceTimestamp) {
402
count++;
403
}
404
}
405
} catch (Exception e) {
406
LOG.warn("Error counting recent records", e);
407
}
408
return count;
409
}
410
411
private long getDatasetRecordCount(String datasetName) {
412
try {
413
Table dataset = getContext().getDataset(datasetName);
414
return countTableRecords(dataset);
415
} catch (Exception e) {
416
LOG.error("Failed to get record count for dataset: {}", datasetName, e);
417
return 0;
418
}
419
}
420
}
421
```
422
423
## Scheduling System
424
425
CDAP provides a flexible scheduling system that runs programs automatically based on time, data availability, or the status of other programs.
426
427
### Scheduling Interfaces
428
429
```java { .api }
430
import io.cdap.cdap.api.schedule.*;
431
432
// Schedule builder for creating schedules
433
public interface ScheduleBuilder {
434
435
// Set schedule name and description
436
ScheduleBuilder setName(String name);
437
ScheduleBuilder setDescription(String description);
438
439
// Configure schedule properties
440
ScheduleBuilder setProperties(Map<String, String> properties);
441
ScheduleBuilder setProperty(String key, String value);
442
443
// Set schedule constraints
444
ScheduleBuilder setMaxConcurrentRuns(int maxConcurrentRuns);
445
446
// Build the schedule
447
Schedule build();
448
}
449
450
// Trigger factory for creating different trigger types
451
public final class TriggerFactory {
452
453
// Time-based triggers
454
public static Trigger byTime(String cronExpression) { /* create cron trigger */ }
455
public static Trigger byFrequency(Duration duration) { /* create frequency trigger */ }
456
457
// Data-based triggers
458
public static Trigger onDataAvailable(String datasetName) { /* create data trigger */ }
459
public static Trigger onPartitionAvailable(String datasetName, int numPartitions) { /* create partition trigger */ }
460
461
// Program status triggers
462
public static Trigger onProgramStatus(ProgramType programType, String applicationName,
463
String programName, Set<ProgramStatus> programStatuses) {
464
/* create program status trigger */
465
}
466
467
// Composite triggers
468
public static Trigger and(Trigger... triggers) { /* create AND trigger */ }
469
public static Trigger or(Trigger... triggers) { /* create OR trigger */ }
470
}
471
472
// Trigger information interfaces
473
public interface TriggerInfo {
474
String getName();
475
String getDescription();
476
TriggerType getType();
477
Map<String, String> getProperties();
478
}
479
480
public interface ProgramStatusTriggerInfo extends TriggerInfo {
481
String getApplicationName();
482
String getProgramName();
483
ProgramType getProgramType();
484
Set<ProgramStatus> getProgramStatuses();
485
}
486
487
public interface PartitionTriggerInfo extends TriggerInfo {
488
String getDatasetName();
489
int getNumPartitions();
490
}
491
492
// Triggering schedule information
493
public interface TriggeringScheduleInfo {
494
String getName();
495
String getDescription();
496
List<TriggerInfo> getTriggerInfos();
497
Map<String, String> getProperties();
498
}
499
```
500
501
### Scheduling Examples
502
503
```java { .api }
504
// Application with comprehensive scheduling
505
public class ScheduledDataPipelineApp extends AbstractApplication {
506
507
@Override
508
public void configure(ApplicationConfigurer configurer, ApplicationContext context) {
509
configurer.setName("ScheduledDataPipeline");
510
configurer.setDescription("Data pipeline with various scheduling patterns");
511
512
// Add programs
513
configurer.addMapReduce(new DailyETLMapReduce());
514
configurer.addSpark(new RealTimeAggregationSpark());
515
configurer.addWorkflow(new DataProcessingWorkflow());
516
configurer.addWorker(new DataValidationWorker());
517
518
// Time-based scheduling - Daily ETL at 2 AM
519
configurer.schedule(
520
ScheduleBuilder.create("daily-etl-schedule")
521
.setDescription("Daily ETL processing at 2 AM")
522
.triggerByTime("0 2 * * *") // Cron: 2 AM daily
523
.setMaxConcurrentRuns(1)
524
.setProperty("processing.mode", "batch")
525
.setProperty("data.retention.days", "90")
526
.build()
527
.programName("DailyETLMapReduce")
528
);
529
530
// Frequency-based scheduling - Hourly aggregation
531
configurer.schedule(
532
ScheduleBuilder.create("hourly-aggregation-schedule")
533
.setDescription("Hourly real-time data aggregation")
534
.triggerByFrequency(Duration.ofHours(1))
535
.setMaxConcurrentRuns(2)
536
.setProperty("aggregation.window", "1h")
537
.build()
538
.programName("RealTimeAggregationSpark")
539
);
540
541
// Data availability scheduling - Process when new data arrives
542
configurer.schedule(
543
ScheduleBuilder.create("data-driven-schedule")
544
.setDescription("Process workflow when new partitions are available")
545
.triggerOnDataAvailable("incoming_data", 3) // Wait for 3 new partitions
546
.setMaxConcurrentRuns(1)
547
.setProperty("processing.trigger", "data-availability")
548
.build()
549
.programName("DataProcessingWorkflow")
550
);
551
552
// Program dependency scheduling - Start validation after ETL completes
553
configurer.schedule(
554
ScheduleBuilder.create("post-etl-validation-schedule")
555
.setDescription("Run validation after ETL completion")
556
.triggerOnProgramStatus(
557
ProgramType.MAPREDUCE,
558
"ScheduledDataPipeline",
559
"DailyETLMapReduce",
560
Set.of(ProgramStatus.COMPLETED)
561
)
562
.setMaxConcurrentRuns(1)
563
.setProperty("validation.mode", "post-processing")
564
.build()
565
.programName("DataValidationWorker")
566
);
567
568
// Complex composite scheduling - Multiple conditions
569
configurer.schedule(
570
ScheduleBuilder.create("complex-schedule")
571
.setDescription("Complex trigger combining time and data availability")
572
.triggerByComposite(
573
TriggerFactory.and(
574
TriggerFactory.byTime("0 */6 * * *"), // Every 6 hours
575
TriggerFactory.onDataAvailable("quality_metrics") // And when quality data available
576
)
577
)
578
.setMaxConcurrentRuns(1)
579
.setProperty("processing.type", "quality-analysis")
580
.build()
581
.programName("DataProcessingWorkflow")
582
);
583
}
584
}
585
586
// Workflow that responds to schedule context
587
public class DataProcessingWorkflow extends AbstractWorkflow {
588
589
@Override
590
public void configure(WorkflowConfigurer configurer) {
591
configurer.setName("DataProcessingWorkflow");
592
configurer.setDescription("Processes data based on schedule triggers");
593
594
// Add conditional processing based on schedule properties
595
configurer.addAction(new ScheduleAwareAction());
596
597
configurer.condition(new ScheduleBasedCondition())
598
.addMapReduce("BatchDataProcessor")
599
.addSpark("AdvancedAnalyticsSpark")
600
.otherwise()
601
.addAction(new LightweightProcessingAction())
602
.end();
603
604
configurer.addAction(new CompletionNotificationAction());
605
}
606
607
// Custom action that adapts behavior based on schedule
608
public static class ScheduleAwareAction extends AbstractCustomAction {
609
610
@Override
611
public void configure(CustomActionConfigurer configurer) {
612
configurer.setName("ScheduleAwareAction");
613
configurer.setDescription("Adapts processing based on triggering schedule");
614
}
615
616
@Override
617
public void run(CustomActionContext context) throws Exception {
618
WorkflowToken token = context.getWorkflowToken();
619
620
// Get schedule information
621
TriggeringScheduleInfo scheduleInfo = context.getTriggeringScheduleInfo();
622
623
if (scheduleInfo != null) {
624
String scheduleName = scheduleInfo.getName();
625
token.put("schedule.name", scheduleName);
626
token.put("schedule.description", scheduleInfo.getDescription());
627
628
// Adapt processing based on schedule properties
629
Map<String, String> scheduleProperties = scheduleInfo.getProperties();
630
String processingMode = scheduleProperties.get("processing.mode");
631
632
if ("batch".equals(processingMode)) {
633
// Configure for batch processing
634
token.put("processing.batch_size", "10000");
635
token.put("processing.parallelism", "high");
636
token.put("processing.memory_limit", "8GB");
637
} else {
638
// Configure for real-time processing
639
token.put("processing.batch_size", "1000");
640
token.put("processing.parallelism", "medium");
641
token.put("processing.memory_limit", "2GB");
642
}
643
644
// Handle different trigger types
645
List<TriggerInfo> triggers = scheduleInfo.getTriggerInfos();
646
for (TriggerInfo trigger : triggers) {
647
processTriggerInfo(trigger, token, context);
648
}
649
650
context.getMetrics().increment("schedule.triggered_runs");
651
Metrics scheduleMetrics = context.getMetrics().child(
652
Map.of("schedule_name", scheduleName)
653
);
654
scheduleMetrics.increment("executions");
655
656
} else {
657
// Manual execution
658
token.put("execution.type", "manual");
659
token.put("processing.batch_size", "5000");
660
context.getMetrics().increment("schedule.manual_runs");
661
}
662
}
663
664
private void processTriggerInfo(TriggerInfo trigger, WorkflowToken token,
665
CustomActionContext context) {
666
667
token.put("trigger.type", trigger.getType().name());
668
token.put("trigger.name", trigger.getName());
669
670
switch (trigger.getType()) {
671
case TIME:
672
// Time-based trigger processing
673
token.put("trigger.execution_time", String.valueOf(System.currentTimeMillis()));
674
break;
675
676
case PARTITION:
677
// Data partition trigger processing
678
if (trigger instanceof PartitionTriggerInfo) {
679
PartitionTriggerInfo partitionTrigger = (PartitionTriggerInfo) trigger;
680
token.put("trigger.dataset", partitionTrigger.getDatasetName());
681
token.put("trigger.partitions", String.valueOf(partitionTrigger.getNumPartitions()));
682
}
683
break;
684
685
case PROGRAM_STATUS:
686
// Program status trigger processing
687
if (trigger instanceof ProgramStatusTriggerInfo) {
688
ProgramStatusTriggerInfo statusTrigger = (ProgramStatusTriggerInfo) trigger;
689
token.put("trigger.program", statusTrigger.getProgramName());
690
token.put("trigger.application", statusTrigger.getApplicationName());
691
token.put("trigger.program_type", statusTrigger.getProgramType().name());
692
}
693
break;
694
695
default:
696
LOG.warn("Unknown trigger type: {}", trigger.getType());
697
}
698
}
699
}
700
701
// Condition that makes decisions based on schedule context
702
public static class ScheduleBasedCondition implements Predicate<WorkflowContext> {
703
704
@Override
705
public boolean apply(WorkflowContext context) {
706
WorkflowToken token = context.getToken();
707
708
// Decision based on schedule properties
709
String processingMode = token.get("processing.mode").toString();
710
String triggerType = token.get("trigger.type").toString();
711
712
// Use heavy processing for batch schedules or time-based triggers
713
return "batch".equals(processingMode) || "TIME".equals(triggerType);
714
}
715
}
716
}
717
```
718
719
## Retry Policies and Error Handling
720
721
CDAP provides robust retry mechanisms and error handling patterns for building resilient applications.
722
723
### Retry Framework
724
725
```java { .api }
726
import io.cdap.cdap.api.retry.*;
727
728
/**
 * Retryable exception marker: a checked exception signalling that the failed operation may
 * be attempted again.
 */
public class RetryableException extends Exception {

  /**
   * @param message description of the failure
   * @param cause underlying failure
   */
  public RetryableException(String message, Throwable cause) {
    super(message, cause);
  }

  /**
   * @param message description of the failure
   */
  public RetryableException(String message) {
    super(message);
  }
}
733
734
/**
 * Exception when retries are exhausted. Carries the number of attempts made and the
 * exception from the last attempt, which is also set as the cause.
 */
public class RetriesExhaustedException extends Exception {

  private final int attemptsMade;
  private final Exception lastException;

  /**
   * @param attemptsMade number of attempts that were made
   * @param lastException exception raised by the last attempt; also used as the cause
   */
  public RetriesExhaustedException(int attemptsMade, Exception lastException) {
    super(describe(attemptsMade), lastException);
    this.lastException = lastException;
    this.attemptsMade = attemptsMade;
  }

  /** Builds the exception message for the given attempt count. */
  private static String describe(int attempts) {
    return String.format("Retries exhausted after %d attempts", attempts);
  }

  /** Returns the number of attempts that were made. */
  public int getAttemptsMade() {
    return attemptsMade;
  }

  /** Returns the exception raised by the last attempt. */
  public Exception getLastException() {
    return lastException;
  }
}
748
749
/**
 * Idempotency levels for retry safety.
 */
public enum Idempotency {

  /** Safe to retry without side effects. */
  IDEMPOTENT,

  /** Retries may cause duplicate side effects. */
  NOT_IDEMPOTENT,

  /** Idempotency is unknown. */
  UNKNOWN
}
755
```
756
757
### Retry Implementation Examples
758
759
```java { .api }
760
// Service with comprehensive retry logic
761
public class ResilientDataService extends AbstractHttpServiceHandler {
762
763
private static final int MAX_RETRIES = 3;
764
private static final long INITIAL_DELAY_MS = 1000;
765
private static final double BACKOFF_MULTIPLIER = 2.0;
766
767
@POST
768
@Path("/process")
769
public void processWithRetries(HttpServiceRequest request, HttpServiceResponder responder) {
770
Metrics metrics = getContext().getMetrics();
771
772
try {
773
String content = Charset.forName("UTF-8").decode(
774
ByteBuffer.wrap(request.getContent())).toString();
775
776
ProcessingResult result = executeWithRetries(
777
() -> processData(content),
778
MAX_RETRIES,
779
INITIAL_DELAY_MS,
780
metrics
781
);
782
783
responder.sendJson(200, result.toJson());
784
785
} catch (RetriesExhaustedException e) {
786
LOG.error("Processing failed after {} retries", e.getAttemptsMade(), e);
787
metrics.increment("processing.retries_exhausted");
788
responder.sendError(500, "Processing failed after retries: " + e.getLastException().getMessage());
789
790
} catch (Exception e) {
791
LOG.error("Processing failed with non-retryable error", e);
792
metrics.increment("processing.non_retryable_errors");
793
responder.sendError(500, "Processing failed: " + e.getMessage());
794
}
795
}
796
797
private <T> T executeWithRetries(RetryableOperation<T> operation, int maxRetries,
798
long initialDelayMs, Metrics metrics)
799
throws RetriesExhaustedException {
800
801
Exception lastException = null;
802
long delay = initialDelayMs;
803
804
for (int attempt = 1; attempt <= maxRetries; attempt++) {
805
try {
806
T result = operation.execute();
807
808
if (attempt > 1) {
809
metrics.increment("retries.successful");
810
metrics.gauge("retries.attempts_before_success", attempt);
811
}
812
813
return result;
814
815
} catch (RetryableException e) {
816
lastException = e;
817
metrics.increment("retries.retryable_failures");
818
819
LOG.warn("Retryable error on attempt {} of {}: {}", attempt, maxRetries, e.getMessage());
820
821
if (attempt < maxRetries) {
822
try {
823
Thread.sleep(delay);
824
delay = (long) (delay * BACKOFF_MULTIPLIER);
825
} catch (InterruptedException ie) {
826
Thread.currentThread().interrupt();
827
throw new RetriesExhaustedException(attempt, e);
828
}
829
}
830
831
} catch (Exception e) {
832
// Non-retryable exception
833
metrics.increment("retries.non_retryable_failures");
834
throw e;
835
}
836
}
837
838
metrics.increment("retries.exhausted");
839
throw new RetriesExhaustedException(maxRetries, lastException);
840
}
841
842
private ProcessingResult processData(String data) throws RetryableException {
843
try {
844
// Simulate processing that might fail transiently
845
if (isTransientFailureCondition()) {
846
throw new RetryableException("Transient processing failure - external service unavailable");
847
}
848
849
// Perform actual processing
850
return performProcessing(data);
851
852
} catch (NetworkException e) {
853
// Network issues are typically retryable
854
throw new RetryableException("Network error during processing", e);
855
856
} catch (TemporaryResourceException e) {
857
// Temporary resource issues are retryable
858
throw new RetryableException("Temporary resource unavailable", e);
859
860
} catch (ValidationException e) {
861
// Validation errors are not retryable
862
throw e;
863
}
864
}
865
866
private boolean isTransientFailureCondition() {
867
// Simulate transient failure conditions
868
return Math.random() < 0.3; // 30% chance of transient failure
869
}
870
871
private ProcessingResult performProcessing(String data) throws ValidationException {
872
if (data == null || data.trim().isEmpty()) {
873
throw new ValidationException("Input data cannot be empty");
874
}
875
876
// Simulate processing
877
return new ProcessingResult("processed: " + data, 1, data.length());
878
}
879
880
@FunctionalInterface
881
private interface RetryableOperation<T> {
882
T execute() throws Exception;
883
}
884
}
885
886
// MapReduce with retry logic for external system integration
887
public class ResilientDataExtractionMapReduce extends AbstractMapReduce {
888
889
public static class ExternalDataMapper extends Mapper<byte[], Row, Text, Text> {
890
891
private ExternalDataClient externalClient;
892
private Metrics metrics;
893
private int maxRetries;
894
private long retryDelay;
895
896
@Override
897
protected void setup(Context context) throws IOException, InterruptedException {
898
Configuration conf = context.getConfiguration();
899
900
// Initialize external client with retry configuration
901
externalClient = new ExternalDataClient(conf.get("external.service.url"));
902
maxRetries = conf.getInt("retry.max_attempts", 3);
903
retryDelay = conf.getLong("retry.initial_delay_ms", 1000);
904
905
// Get metrics from context
906
MapReduceTaskContext<?, ?> taskContext = (MapReduceTaskContext<?, ?>) context;
907
metrics = taskContext.getMetrics();
908
}
909
910
@Override
911
protected void map(byte[] key, Row row, Context context)
912
throws IOException, InterruptedException {
913
914
String recordId = row.getString("id");
915
916
try {
917
// Attempt to enrich data with external service
918
String enrichedData = executeWithRetries(() -> {
919
return externalClient.fetchData(recordId);
920
});
921
922
// Output successful result
923
context.write(new Text(recordId), new Text(enrichedData));
924
metrics.increment("external_calls.success");
925
926
} catch (RetriesExhaustedException e) {
927
// Handle exhausted retries
928
LOG.error("Failed to fetch external data for record {} after {} retries",
929
recordId, e.getAttemptsMade(), e);
930
931
metrics.increment("external_calls.retries_exhausted");
932
933
// Write to error output or skip record
934
context.write(new Text("ERROR:" + recordId),
935
new Text("Failed after retries: " + e.getLastException().getMessage()));
936
937
} catch (Exception e) {
938
// Handle non-retryable errors
939
LOG.error("Non-retryable error for record {}", recordId, e);
940
metrics.increment("external_calls.non_retryable_errors");
941
942
context.write(new Text("ERROR:" + recordId),
943
new Text("Non-retryable error: " + e.getMessage()));
944
}
945
}
946
947
private String executeWithRetries(ExternalDataOperation operation)
948
throws RetriesExhaustedException {
949
950
Exception lastException = null;
951
long delay = retryDelay;
952
953
for (int attempt = 1; attempt <= maxRetries; attempt++) {
954
try {
955
String result = operation.execute();
956
957
if (attempt > 1) {
958
metrics.increment("external_calls.retry_success");
959
metrics.gauge("external_calls.attempts_before_success", attempt);
960
}
961
962
return result;
963
964
} catch (ExternalServiceException e) {
965
lastException = e;
966
967
if (e.isRetryable()) {
968
metrics.increment("external_calls.retryable_failures");
969
LOG.warn("Retryable external service error on attempt {} of {}: {}",
970
attempt, maxRetries, e.getMessage());
971
972
if (attempt < maxRetries) {
973
try {
974
Thread.sleep(delay);
975
delay *= 2; // Exponential backoff
976
} catch (InterruptedException ie) {
977
Thread.currentThread().interrupt();
978
throw new RetriesExhaustedException(attempt, e);
979
}
980
}
981
} else {
982
// Non-retryable external service error
983
throw e;
984
}
985
986
} catch (Exception e) {
987
// Other non-retryable exceptions
988
throw e;
989
}
990
}
991
992
throw new RetriesExhaustedException(maxRetries, lastException);
993
}
994
995
@FunctionalInterface
996
private interface ExternalDataOperation {
997
String execute() throws ExternalServiceException;
998
}
999
1000
@Override
1001
protected void cleanup(Context context) throws IOException, InterruptedException {
1002
if (externalClient != null) {
1003
externalClient.close();
1004
}
1005
}
1006
}
1007
}
1008
```
1009
1010
## Feature Flags
1011
1012
CDAP provides feature flag support for dynamic application behavior control.
1013
1014
### Feature Flags Interface
1015
1016
```java { .api }
1017
import io.cdap.cdap.api.feature.*;
1018
1019
// Feature flags provider interface
1020
/**
 * Source of feature-flag state for an application.
 */
public interface FeatureFlagsProvider {

    /**
     * Reports whether the named feature is enabled.
     *
     * @param featureName the name of the feature flag to look up
     * @return {@code true} if the feature is enabled
     */
    boolean isFeatureEnabled(String featureName);

    /**
     * Reports whether the named feature is enabled, falling back to a default when the
     * lookup fails.
     *
     * @param featureName  the name of the feature flag to look up
     * @param defaultValue the value to return if {@link #isFeatureEnabled(String)} throws
     * @return the flag state, or {@code defaultValue} if the lookup threw an exception
     */
    default boolean isFeatureEnabled(String featureName, boolean defaultValue) {
        boolean enabled;
        try {
            enabled = isFeatureEnabled(featureName);
        } catch (Exception lookupFailure) {
            // A failed lookup is deliberately not propagated; the caller's default wins.
            enabled = defaultValue;
        }
        return enabled;
    }
}
1031
```
1032
1033
### Feature Flags Usage
1034
1035
```java { .api }
1036
// Service with feature-controlled behavior
1037
public class FeatureControlledService extends AbstractHttpServiceHandler {
1038
1039
@GET
1040
@Path("/data/{id}")
1041
public void getData(HttpServiceRequest request, HttpServiceResponder responder,
1042
@PathParam("id") String dataId) {
1043
1044
FeatureFlagsProvider featureFlags = getContext();
1045
1046
try {
1047
// Core data retrieval
1048
JsonObject data = retrieveBaseData(dataId);
1049
1050
// Feature-controlled enhancements
1051
if (featureFlags.isFeatureEnabled("enhanced_analytics", false)) {
1052
enhanceWithAnalytics(data, dataId);
1053
}
1054
1055
if (featureFlags.isFeatureEnabled("real_time_recommendations", false)) {
1056
addRealTimeRecommendations(data, dataId);
1057
}
1058
1059
if (featureFlags.isFeatureEnabled("advanced_caching", true)) {
1060
enableAdvancedCaching(data, dataId);
1061
}
1062
1063
// Feature-controlled response format
1064
if (featureFlags.isFeatureEnabled("response_compression", false)) {
1065
sendCompressedResponse(responder, data);
1066
} else {
1067
responder.sendJson(200, data);
1068
}
1069
1070
} catch (Exception e) {
1071
responder.sendError(500, "Error retrieving data: " + e.getMessage());
1072
}
1073
}
1074
1075
private JsonObject retrieveBaseData(String dataId) {
1076
// Core data retrieval logic
1077
JsonObject data = new JsonObject();
1078
data.addProperty("id", dataId);
1079
data.addProperty("timestamp", System.currentTimeMillis());
1080
return data;
1081
}
1082
1083
private void enhanceWithAnalytics(JsonObject data, String dataId) {
1084
// Enhanced analytics - controlled by feature flag
1085
data.addProperty("analytics_score", calculateAnalyticsScore(dataId));
1086
data.addProperty("trend_direction", getTrendDirection(dataId));
1087
}
1088
1089
private void addRealTimeRecommendations(JsonObject data, String dataId) {
1090
// Real-time recommendations - controlled by feature flag
1091
JsonArray recommendations = generateRecommendations(dataId);
1092
data.add("recommendations", recommendations);
1093
}
1094
1095
private void enableAdvancedCaching(JsonObject data, String dataId) {
1096
// Advanced caching logic - controlled by feature flag
1097
data.addProperty("cached", true);
1098
data.addProperty("cache_key", generateCacheKey(dataId));
1099
}
1100
}
1101
```
1102
1103
The Operational APIs in CDAP provide comprehensive monitoring, scheduling, retry handling, and feature management capabilities essential for running enterprise-grade data applications in production environments.