The Cask Data Application Platform (CDAP) is an integrated, open source application development platform for the Hadoop ecosystem that provides developers with data and application abstractions to simplify and accelerate application development.
—
CDAP provides comprehensive operational APIs for monitoring, scheduling, retry handling, and feature management. These APIs enable production-ready applications with enterprise-grade operational capabilities including metrics collection, automated scheduling, resilient error handling, and dynamic feature control.
The metrics system provides real-time monitoring and observability for all application components.
import io.cdap.cdap.api.metrics.*;
import java.util.Map;
// Core metrics interface
public interface Metrics {

  /** Counter metric: adds {@code delta} (which may be negative) to the named counter. */
  void count(String metricName, int delta);

  /**
   * Long-valued convenience wrapper around {@link #count(String, int)}. The
   * underlying counter API is int-based, so deltas outside the int range are
   * rejected rather than silently truncated.
   *
   * @throws IllegalArgumentException if {@code delta} does not fit in an int
   */
  default void countLong(String metricName, long delta) {
    boolean fitsInInt = delta >= Integer.MIN_VALUE && delta <= Integer.MAX_VALUE;
    if (!fitsInInt) {
      throw new IllegalArgumentException("Invalid delta value for metrics count: " + delta);
    }
    count(metricName, (int) delta);
  }

  /** Gauge metric: records the current value of the named measurement. */
  void gauge(String metricName, long value);

  /** Returns a child Metrics whose emissions carry the given context tags. */
  Metrics child(Map<String, String> tags);

  /** Adds one to the named counter. */
  default void increment(String metricName) {
    count(metricName, 1);
  }

  /** Adds {@code delta} to the named counter. */
  default void increment(String metricName, int delta) {
    count(metricName, delta);
  }

  /** Subtracts one from the named counter. */
  default void decrement(String metricName) {
    count(metricName, -1);
  }

  /** Subtracts {@code delta} from the named counter. */
  default void decrement(String metricName, int delta) {
    count(metricName, -delta);
  }
}
// Runtime metrics for program execution monitoring
// Read-only view of the counters and gauges a running program has emitted.
public interface RuntimeMetrics {
// Snapshot of all counter values, keyed by metric name.
Map<String, Long> getAllCounters();
// Snapshot of all gauge values, keyed by metric name.
Map<String, Long> getAllGauges();
// Current value of one counter. NOTE(review): behavior for an unknown name
// (0 vs. exception) is not shown here - confirm against the implementation.
long getCounter(String name);
// Last recorded value of one gauge; same unknown-name caveat as getCounter.
long getGauge(String name);
}// Service with comprehensive metrics
public class DataProcessingService extends AbstractHttpServiceHandler {

  private Metrics metrics;

  @Override
  public void initialize(HttpServiceContext context) throws Exception {
    super.initialize(context);
    // Cache the per-service metrics collector once; every handler below reuses it.
    this.metrics = context.getMetrics();
  }

  /**
   * GET /process/{id} - processes the record identified by {@code dataId},
   * emitting request, outcome, and latency metrics around the work.
   */
  @GET
  @Path("/process/{id}")
  public void processData(HttpServiceRequest request, HttpServiceResponder responder,
      @PathParam("id") String dataId) {
    // Track request metrics
    metrics.increment("requests.total");
    metrics.increment("requests.process_data");
    long startTime = System.currentTimeMillis();
    try {
      // Process data with detailed metrics
      ProcessingResult result = processDataWithMetrics(dataId);
      // Success metrics
      metrics.increment("requests.success");
      metrics.gauge("processing.last_success_time", System.currentTimeMillis());
      metrics.gauge("processing.records_processed", result.getRecordCount());
      metrics.gauge("processing.data_size_mb", result.getDataSizeMB());
      responder.sendJson(200, result.toJson());
    } catch (ValidationException e) {
      // Validation error: the input is at fault -> 400.
      metrics.increment("errors.validation");
      Metrics errorMetrics = metrics.child(Map.of("error_type", "validation"));
      errorMetrics.increment("error_count");
      responder.sendError(400, "Validation failed: " + e.getMessage());
    } catch (ProcessingException e) {
      // Processing error metrics
      metrics.increment("errors.processing");
      Metrics errorMetrics = metrics.child(Map.of("error_type", "processing"));
      errorMetrics.increment("error_count");
      responder.sendError(500, "Processing failed: " + e.getMessage());
    } catch (Exception e) {
      // Anything unanticipated.
      metrics.increment("errors.unknown");
      Metrics errorMetrics = metrics.child(Map.of("error_type", "unknown"));
      errorMetrics.increment("error_count");
      responder.sendError(500, "Internal error: " + e.getMessage());
    } finally {
      // Response time is recorded for success and failure alike.
      long duration = System.currentTimeMillis() - startTime;
      metrics.gauge("processing.response_time_ms", duration);
      updateResponseTimeAverage(duration);
    }
  }

  /**
   * Reads the input row, transforms it, and writes the result, emitting
   * read/process/write latency metrics for each stage.
   *
   * @throws ValidationException if no row exists for {@code dataId}
   */
  private ProcessingResult processDataWithMetrics(String dataId) throws Exception {
    Table dataTable = getContext().getDataset("input_data");
    Table resultsTable = getContext().getDataset("processed_results");
    // Data access metrics
    metrics.increment("dataset.reads");
    long readStart = System.currentTimeMillis();
    Row inputRow = dataTable.get(Bytes.toBytes(dataId));
    if (inputRow.isEmpty()) {
      metrics.increment("dataset.read_misses");
      throw new ValidationException("Data not found: " + dataId);
    }
    metrics.gauge("dataset.read_latency_ms", System.currentTimeMillis() - readStart);
    // Processing metrics
    metrics.increment("processing.operations");
    long processStart = System.currentTimeMillis();
    ProcessingResult result = performDataTransformation(inputRow);
    metrics.gauge("processing.operation_latency_ms", System.currentTimeMillis() - processStart);
    // Data write metrics
    metrics.increment("dataset.writes");
    long writeStart = System.currentTimeMillis();
    Put outputPut = new Put(Bytes.toBytes(dataId + "_processed"));
    outputPut.add("result", "data", result.getData());
    outputPut.add("result", "timestamp", System.currentTimeMillis());
    outputPut.add("result", "record_count", result.getRecordCount());
    resultsTable.put(outputPut);
    metrics.gauge("dataset.write_latency_ms", System.currentTimeMillis() - writeStart);
    return result;
  }

  /** Splits the row's "data" column into newline-delimited records and tracks quality metrics. */
  private ProcessingResult performDataTransformation(Row inputRow) {
    String inputData = inputRow.getString("data");
    // Business logic metrics
    metrics.increment("business_logic.transformations");
    int recordCount = 0;
    long dataSize = 0;
    if (inputData != null) {
      String[] records = inputData.split("\n");
      // split() always yields at least one element, so recordCount >= 1 here
      // and the ratio below cannot divide by zero.
      recordCount = records.length;
      dataSize = inputData.length();
      // Track processing quality metrics
      int validRecords = 0;
      for (String record : records) {
        if (isValidRecord(record)) {
          validRecords++;
        }
      }
      // gauge() takes a long, so the ratio is emitted in basis points (0-10000),
      // consistent with how data_quality.error_rate is stored elsewhere in this
      // application. (The previous code passed a raw double, which does not compile.)
      metrics.gauge("data_quality.valid_record_ratio",
          Math.round(10000.0 * validRecords / recordCount));
      metrics.gauge("data_quality.invalid_records", recordCount - validRecords);
    }
    return new ProcessingResult(inputData, recordCount, dataSize);
  }

  /** Emits a rolling response-time average; the averaging itself is simplified below. */
  private void updateResponseTimeAverage(long duration) {
    metrics.gauge("processing.avg_response_time_ms", calculateAverage(duration));
  }

  /** A record is valid when it is non-blank and contains at least one comma. */
  private boolean isValidRecord(String record) {
    return record != null && !record.trim().isEmpty() && record.contains(",");
  }

  /** Placeholder for a rolling average (e.g. sliding window or EMA). */
  private long calculateAverage(long newValue) {
    return newValue; // Simplified
  }
}
// Worker with operational metrics
// NOTE(review): LOG is referenced below but not declared in this snippet -
// presumably a class-level SLF4J logger; confirm in the full source.
public class DataMonitoringWorker extends AbstractWorker {

  // volatile: written by stop() from another thread, read by the run() loop.
  private volatile boolean running = false;
  private Metrics metrics;

  @Override
  public void configure(WorkerConfigurer configurer) {
    configurer.setName("DataMonitoringWorker");
    configurer.setDescription("Monitors data pipeline health and performance");
  }

  @Override
  public void initialize(WorkerContext context) throws Exception {
    super.initialize(context);
    this.metrics = context.getMetrics();
    this.running = true;
  }

  /**
   * Main collection loop: every 30s gathers system, pipeline, and application
   * metrics until {@link #stop()} is called or the thread is interrupted.
   */
  @Override
  public void run() throws Exception {
    while (running) {
      try {
        collectSystemHealthMetrics();
        collectDataPipelineMetrics();
        collectApplicationMetrics();
        Thread.sleep(30000); // 30 seconds between collection cycles
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break;
      } catch (Exception e) {
        LOG.error("Error collecting metrics", e);
        metrics.increment("metrics_collection.errors");
        // Back off briefly and keep running; a single failed cycle should not
        // kill the worker. Honor interruption during the backoff as well
        // (previously an interrupt here escaped run() as a raw exception).
        try {
          Thread.sleep(10000);
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          break;
        }
      }
    }
  }

  @Override
  public void stop() {
    running = false;
  }

  /** Emits JVM memory and thread gauges, then delegates to GC metrics collection. */
  private void collectSystemHealthMetrics() {
    Runtime runtime = Runtime.getRuntime();
    long totalMemory = runtime.totalMemory();
    long freeMemory = runtime.freeMemory();
    long usedMemory = totalMemory - freeMemory;
    metrics.gauge("system.memory.total_mb", totalMemory / (1024 * 1024));
    metrics.gauge("system.memory.used_mb", usedMemory / (1024 * 1024));
    metrics.gauge("system.memory.free_mb", freeMemory / (1024 * 1024));
    metrics.gauge("system.memory.usage_percent", (usedMemory * 100) / totalMemory);
    // Thread metrics
    metrics.gauge("system.threads.active", Thread.activeCount());
    // Garbage collection metrics (if available)
    collectGCMetrics();
  }

  /** Checks dataset sizes/health, processing lag, and data-quality trends. */
  private void collectDataPipelineMetrics() throws Exception {
    checkDatasetMetrics("input_data", "input");
    checkDatasetMetrics("processed_results", "output");
    checkDatasetMetrics("error_records", "errors");
    checkProcessingLag();
    checkDataQualityMetrics();
  }

  /** Counts total and recent records in one dataset, emitting gauges tagged by type. */
  private void checkDatasetMetrics(String datasetName, String type) {
    try {
      Table dataset = getContext().getDataset(datasetName);
      // Count records (simplified - in practice might sample)
      long recordCount = countTableRecords(dataset);
      metrics.gauge(String.format("dataset.%s.record_count", type), recordCount);
      // Records written in the last hour.
      long recentRecords = countRecentRecords(dataset, System.currentTimeMillis() - 3600000);
      metrics.gauge(String.format("dataset.%s.recent_records", type), recentRecords);
      metrics.increment(String.format("dataset.%s.health_checks", type));
    } catch (Exception e) {
      LOG.error("Failed to check metrics for dataset: {}", datasetName, e);
      metrics.increment(String.format("dataset.%s.health_check_errors", type));
    }
  }

  /** Emits the input-vs-output record gap and alerts when it exceeds 1000. */
  private void checkProcessingLag() {
    long inputCount = getDatasetRecordCount("input_data");
    long outputCount = getDatasetRecordCount("processed_results");
    long processingLag = inputCount - outputCount;
    // Clamp at zero: output can transiently exceed the input count.
    metrics.gauge("pipeline.processing_lag", Math.max(0, processingLag));
    if (processingLag > 1000) {
      metrics.increment("pipeline.high_lag_alerts");
    }
  }

  /** Emits the error rate (in basis points) and alerts above the 5% threshold. */
  private void checkDataQualityMetrics() {
    long errorCount = getDatasetRecordCount("error_records");
    long totalProcessed = getDatasetRecordCount("processed_results");
    if (totalProcessed > 0) {
      double errorRate = (double) errorCount / (totalProcessed + errorCount);
      metrics.gauge("data_quality.error_rate", (long) (errorRate * 10000)); // Store as basis points
      if (errorRate > 0.05) { // 5% error rate threshold
        metrics.increment("data_quality.high_error_rate_alerts");
      }
    }
  }

  /** Emits uptime, instance, and runtime-argument gauges for this worker. */
  private void collectApplicationMetrics() {
    WorkerContext context = getContext();
    long uptime = System.currentTimeMillis() - context.getLogicalStartTime();
    metrics.gauge("application.uptime_minutes", uptime / (60 * 1000));
    // Instance metrics
    metrics.gauge("application.worker_instance_id", context.getInstanceId());
    metrics.gauge("application.worker_instance_count", context.getInstanceCount());
    // Runtime arguments metrics (for configuration tracking)
    Map<String, String> runtimeArgs = context.getRuntimeArguments();
    metrics.gauge("application.runtime_args_count", runtimeArgs.size());
  }

  /** Placeholder: would read garbage-collection statistics via JMX beans. */
  private void collectGCMetrics() {
  }

  /** Full-table record count; emits scan progress every 10k rows. */
  private long countTableRecords(Table table) {
    long count = 0;
    try (Scanner scanner = table.scan(null, null)) {
      while (scanner.next() != null) {
        count++;
        if (count % 10000 == 0) {
          // Emit progress for large datasets
          metrics.gauge("metrics_collection.scan_progress", count);
        }
      }
    } catch (Exception e) {
      LOG.warn("Error counting table records", e);
    }
    return count;
  }

  /** Counts rows whose "timestamp" column is newer than {@code sinceTimestamp}. */
  private long countRecentRecords(Table table, long sinceTimestamp) {
    long count = 0;
    try (Scanner scanner = table.scan(null, null)) {
      Row row;
      while ((row = scanner.next()) != null) {
        Long timestamp = row.getLong("timestamp");
        if (timestamp != null && timestamp > sinceTimestamp) {
          count++;
        }
      }
    } catch (Exception e) {
      LOG.warn("Error counting recent records", e);
    }
    return count;
  }

  /** Record count for a dataset by name; returns 0 if the dataset is unavailable. */
  private long getDatasetRecordCount(String datasetName) {
    try {
      Table dataset = getContext().getDataset(datasetName);
      return countTableRecords(dataset);
    } catch (Exception e) {
      LOG.error("Failed to get record count for dataset: {}", datasetName, e);
      return 0;
    }
  }
}
CDAP provides a flexible scheduling system for automating program execution based on time, data availability, or other program completion events.
import io.cdap.cdap.api.schedule.*;
// Schedule builder for creating schedules
// Fluent builder: each setter returns this builder so calls can be chained,
// ending with build().
public interface ScheduleBuilder {
// Set schedule name and description
ScheduleBuilder setName(String name);
ScheduleBuilder setDescription(String description);
// Configure schedule properties - key/value pairs that triggered programs can
// read back via TriggeringScheduleInfo.getProperties().
ScheduleBuilder setProperties(Map<String, String> properties);
ScheduleBuilder setProperty(String key, String value);
// Set schedule constraints
// Caps how many runs of the target program this schedule may have in flight at once.
ScheduleBuilder setMaxConcurrentRuns(int maxConcurrentRuns);
// Build the schedule
Schedule build();
}
// Trigger factory for creating different trigger types
// Static factory for the different trigger types a schedule can use.
// NOTE(review): the method bodies below are illustrative placeholders (a
// comment where the implementation belongs); as written the non-void methods
// have no return statement and will not compile - fill in before use.
public final class TriggerFactory {
// Time-based triggers
// Fires on a cron schedule (see the "0 2 * * *" style expressions used below).
public static Trigger byTime(String cronExpression) { /* create cron trigger */ }
// Fires at a fixed interval.
public static Trigger byFrequency(Duration duration) { /* create frequency trigger */ }
// Data-based triggers
public static Trigger onDataAvailable(String datasetName) { /* create data trigger */ }
// Fires once the dataset has accumulated the given number of new partitions.
public static Trigger onPartitionAvailable(String datasetName, int numPartitions) { /* create partition trigger */ }
// Program status triggers
// Fires when the named program reaches one of the given statuses.
public static Trigger onProgramStatus(ProgramType programType, String applicationName,
String programName, Set<ProgramStatus> programStatuses) {
/* create program status trigger */
}
// Composite triggers - combine other triggers with boolean semantics.
public static Trigger and(Trigger... triggers) { /* create AND trigger */ }
public static Trigger or(Trigger... triggers) { /* create OR trigger */ }
}
// Trigger information interfaces
// Describes the trigger that caused a schedule to fire.
public interface TriggerInfo {
String getName();
String getDescription();
// The trigger category (TIME, PARTITION, PROGRAM_STATUS, ... as used below).
TriggerType getType();
Map<String, String> getProperties();
}
// Details for a trigger that fires on another program's status change.
public interface ProgramStatusTriggerInfo extends TriggerInfo {
String getApplicationName();
String getProgramName();
ProgramType getProgramType();
// The statuses that cause this trigger to fire (e.g. COMPLETED).
Set<ProgramStatus> getProgramStatuses();
}
// Details for a trigger that fires when dataset partitions become available.
public interface PartitionTriggerInfo extends TriggerInfo {
String getDatasetName();
// Number of new partitions required before the trigger fires.
int getNumPartitions();
}
// Triggering schedule information
// Runtime view of the schedule that started the current program run; programs
// obtain it from their context (e.g. CustomActionContext.getTriggeringScheduleInfo()).
public interface TriggeringScheduleInfo {
String getName();
String getDescription();
// One entry per trigger; composite schedules can report several.
List<TriggerInfo> getTriggerInfos();
// The key/value properties configured on the schedule at build time.
Map<String, String> getProperties();
}// Application with comprehensive scheduling
// Demonstrates time-, frequency-, data-, program-status-, and composite-
// triggered schedules on a single application.
// NOTE(review): the ScheduleBuilder.create(...)/triggerByTime(...) fluent calls
// below do not match the ScheduleBuilder interface defined earlier in this
// document (which declares setName/setDescription and no static factory or
// trigger methods); confirm the builder API against the CDAP version in use.
public class ScheduledDataPipelineApp extends AbstractApplication {
@Override
public void configure(ApplicationConfigurer configurer, ApplicationContext context) {
configurer.setName("ScheduledDataPipeline");
configurer.setDescription("Data pipeline with various scheduling patterns");
// Add programs
configurer.addMapReduce(new DailyETLMapReduce());
configurer.addSpark(new RealTimeAggregationSpark());
configurer.addWorkflow(new DataProcessingWorkflow());
configurer.addWorker(new DataValidationWorker());
// Time-based scheduling - Daily ETL at 2 AM
configurer.schedule(
ScheduleBuilder.create("daily-etl-schedule")
.setDescription("Daily ETL processing at 2 AM")
.triggerByTime("0 2 * * *") // Cron: 2 AM daily
.setMaxConcurrentRuns(1)
// "processing.mode" is read back by ScheduleAwareAction to pick the batch profile.
.setProperty("processing.mode", "batch")
.setProperty("data.retention.days", "90")
.build()
.programName("DailyETLMapReduce")
);
// Frequency-based scheduling - Hourly aggregation
configurer.schedule(
ScheduleBuilder.create("hourly-aggregation-schedule")
.setDescription("Hourly real-time data aggregation")
.triggerByFrequency(Duration.ofHours(1))
.setMaxConcurrentRuns(2)
.setProperty("aggregation.window", "1h")
.build()
.programName("RealTimeAggregationSpark")
);
// Data availability scheduling - Process when new data arrives
configurer.schedule(
ScheduleBuilder.create("data-driven-schedule")
.setDescription("Process workflow when new partitions are available")
.triggerOnDataAvailable("incoming_data", 3) // Wait for 3 new partitions
.setMaxConcurrentRuns(1)
.setProperty("processing.trigger", "data-availability")
.build()
.programName("DataProcessingWorkflow")
);
// Program dependency scheduling - Start validation after ETL completes
configurer.schedule(
ScheduleBuilder.create("post-etl-validation-schedule")
.setDescription("Run validation after ETL completion")
.triggerOnProgramStatus(
ProgramType.MAPREDUCE,
"ScheduledDataPipeline",
"DailyETLMapReduce",
Set.of(ProgramStatus.COMPLETED)
)
.setMaxConcurrentRuns(1)
.setProperty("validation.mode", "post-processing")
.build()
.programName("DataValidationWorker")
);
// Complex composite scheduling - Multiple conditions
configurer.schedule(
ScheduleBuilder.create("complex-schedule")
.setDescription("Complex trigger combining time and data availability")
.triggerByComposite(
TriggerFactory.and(
TriggerFactory.byTime("0 */6 * * *"), // Every 6 hours
TriggerFactory.onDataAvailable("quality_metrics") // And when quality data available
)
)
.setMaxConcurrentRuns(1)
.setProperty("processing.type", "quality-analysis")
.build()
.programName("DataProcessingWorkflow")
);
}
}
// Workflow that responds to schedule context
public class DataProcessingWorkflow extends AbstractWorkflow {
@Override
public void configure(WorkflowConfigurer configurer) {
configurer.setName("DataProcessingWorkflow");
configurer.setDescription("Processes data based on schedule triggers");
// Add conditional processing based on schedule properties
configurer.addAction(new ScheduleAwareAction());
configurer.condition(new ScheduleBasedCondition())
.addMapReduce("BatchDataProcessor")
.addSpark("AdvancedAnalyticsSpark")
.otherwise()
.addAction(new LightweightProcessingAction())
.end();
configurer.addAction(new CompletionNotificationAction());
}
// Custom action that adapts behavior based on schedule
public static class ScheduleAwareAction extends AbstractCustomAction {
@Override
public void configure(CustomActionConfigurer configurer) {
configurer.setName("ScheduleAwareAction");
configurer.setDescription("Adapts processing based on triggering schedule");
}
@Override
public void run(CustomActionContext context) throws Exception {
WorkflowToken token = context.getWorkflowToken();
// Get schedule information
TriggeringScheduleInfo scheduleInfo = context.getTriggeringScheduleInfo();
if (scheduleInfo != null) {
String scheduleName = scheduleInfo.getName();
token.put("schedule.name", scheduleName);
token.put("schedule.description", scheduleInfo.getDescription());
// Adapt processing based on schedule properties
Map<String, String> scheduleProperties = scheduleInfo.getProperties();
String processingMode = scheduleProperties.get("processing.mode");
if ("batch".equals(processingMode)) {
// Configure for batch processing
token.put("processing.batch_size", "10000");
token.put("processing.parallelism", "high");
token.put("processing.memory_limit", "8GB");
} else {
// Configure for real-time processing
token.put("processing.batch_size", "1000");
token.put("processing.parallelism", "medium");
token.put("processing.memory_limit", "2GB");
}
// Handle different trigger types
List<TriggerInfo> triggers = scheduleInfo.getTriggerInfos();
for (TriggerInfo trigger : triggers) {
processTriggerInfo(trigger, token, context);
}
context.getMetrics().increment("schedule.triggered_runs");
Metrics scheduleMetrics = context.getMetrics().child(
Map.of("schedule_name", scheduleName)
);
scheduleMetrics.increment("executions");
} else {
// Manual execution
token.put("execution.type", "manual");
token.put("processing.batch_size", "5000");
context.getMetrics().increment("schedule.manual_runs");
}
}
private void processTriggerInfo(TriggerInfo trigger, WorkflowToken token,
CustomActionContext context) {
token.put("trigger.type", trigger.getType().name());
token.put("trigger.name", trigger.getName());
switch (trigger.getType()) {
case TIME:
// Time-based trigger processing
token.put("trigger.execution_time", String.valueOf(System.currentTimeMillis()));
break;
case PARTITION:
// Data partition trigger processing
if (trigger instanceof PartitionTriggerInfo) {
PartitionTriggerInfo partitionTrigger = (PartitionTriggerInfo) trigger;
token.put("trigger.dataset", partitionTrigger.getDatasetName());
token.put("trigger.partitions", String.valueOf(partitionTrigger.getNumPartitions()));
}
break;
case PROGRAM_STATUS:
// Program status trigger processing
if (trigger instanceof ProgramStatusTriggerInfo) {
ProgramStatusTriggerInfo statusTrigger = (ProgramStatusTriggerInfo) trigger;
token.put("trigger.program", statusTrigger.getProgramName());
token.put("trigger.application", statusTrigger.getApplicationName());
token.put("trigger.program_type", statusTrigger.getProgramType().name());
}
break;
default:
LOG.warn("Unknown trigger type: {}", trigger.getType());
}
}
}
// Condition that makes decisions based on schedule context
public static class ScheduleBasedCondition implements Predicate<WorkflowContext> {
@Override
public boolean apply(WorkflowContext context) {
WorkflowToken token = context.getToken();
// Decision based on schedule properties
String processingMode = token.get("processing.mode").toString();
String triggerType = token.get("trigger.type").toString();
// Use heavy processing for batch schedules or time-based triggers
return "batch".equals(processingMode) || "TIME".equals(triggerType);
}
}
}CDAP provides robust retry mechanisms and error handling patterns for building resilient applications.
import java.nio.charset.StandardCharsets;

import io.cdap.cdap.api.retry.*;
// Retryable exception marker
/**
 * Marker exception for failures that are transient and therefore safe to
 * retry (for example, a temporarily unreachable external service).
 */
public class RetryableException extends Exception {

  public RetryableException(String message) {
    super(message);
  }

  public RetryableException(String message, Throwable cause) {
    super(message, cause);
  }
}
// Exception when retries are exhausted
/**
 * Thrown when an operation still fails after the configured number of retry
 * attempts; carries the attempt count and the final underlying failure
 * (which is also installed as the cause).
 */
public class RetriesExhaustedException extends Exception {

  private final int attemptsMade;
  private final Exception lastException;

  public RetriesExhaustedException(int attemptsMade, Exception lastException) {
    // Produces the same message as the original format string.
    super("Retries exhausted after " + attemptsMade + " attempts", lastException);
    this.attemptsMade = attemptsMade;
    this.lastException = lastException;
  }

  /** Number of attempts made before giving up. */
  public int getAttemptsMade() {
    return attemptsMade;
  }

  /** The exception thrown by the final attempt. */
  public Exception getLastException() {
    return lastException;
  }
}
// Idempotency levels for retry safety
// Declares how safe an operation is to retry; retry logic can consult this
// to decide whether re-execution risks duplicate side effects.
public enum Idempotency {
IDEMPOTENT, // Safe to retry without side effects
NOT_IDEMPOTENT, // Retries may cause duplicate side effects
UNKNOWN // Idempotency is unknown
}// Service with comprehensive retry logic
public class ResilientDataService extends AbstractHttpServiceHandler {

  private static final int MAX_RETRIES = 3;
  private static final long INITIAL_DELAY_MS = 1000;
  private static final double BACKOFF_MULTIPLIER = 2.0;

  /**
   * POST /process - decodes the request body as UTF-8 and processes it,
   * retrying transient failures with exponential backoff.
   */
  @POST
  @Path("/process")
  public void processWithRetries(HttpServiceRequest request, HttpServiceResponder responder) {
    Metrics metrics = getContext().getMetrics();
    try {
      // Decode explicitly as UTF-8; StandardCharsets avoids the name lookup.
      String content = StandardCharsets.UTF_8.decode(
          ByteBuffer.wrap(request.getContent())).toString();
      ProcessingResult result = executeWithRetries(
          () -> processData(content),
          MAX_RETRIES,
          INITIAL_DELAY_MS,
          metrics
      );
      responder.sendJson(200, result.toJson());
    } catch (RetriesExhaustedException e) {
      LOG.error("Processing failed after {} retries", e.getAttemptsMade(), e);
      metrics.increment("processing.retries_exhausted");
      responder.sendError(500, "Processing failed after retries: " + e.getLastException().getMessage());
    } catch (Exception e) {
      LOG.error("Processing failed with non-retryable error", e);
      metrics.increment("processing.non_retryable_errors");
      responder.sendError(500, "Processing failed: " + e.getMessage());
    }
  }

  /**
   * Runs {@code operation} up to {@code maxRetries} times, sleeping with
   * exponential backoff between retryable failures.
   *
   * <p>Declares {@code throws Exception} because the non-retryable branch
   * precise-rethrows whatever checked exception the operation raised; the
   * previous signature, which declared only RetriesExhaustedException, did
   * not compile since RetryableOperation.execute() throws Exception. The
   * caller already catches RetriesExhaustedException and Exception separately.
   *
   * @throws RetriesExhaustedException when every attempt failed with a
   *     retryable error, or the backoff sleep was interrupted
   */
  private <T> T executeWithRetries(RetryableOperation<T> operation, int maxRetries,
      long initialDelayMs, Metrics metrics) throws Exception {
    Exception lastException = null;
    long delay = initialDelayMs;
    for (int attempt = 1; attempt <= maxRetries; attempt++) {
      try {
        T result = operation.execute();
        if (attempt > 1) {
          // Only record retry metrics when an earlier attempt had failed.
          metrics.increment("retries.successful");
          metrics.gauge("retries.attempts_before_success", attempt);
        }
        return result;
      } catch (RetryableException e) {
        lastException = e;
        metrics.increment("retries.retryable_failures");
        LOG.warn("Retryable error on attempt {} of {}: {}", attempt, maxRetries, e.getMessage());
        if (attempt < maxRetries) {
          try {
            Thread.sleep(delay);
            delay = (long) (delay * BACKOFF_MULTIPLIER);
          } catch (InterruptedException ie) {
            // Restore the interrupt flag and give up immediately.
            Thread.currentThread().interrupt();
            throw new RetriesExhaustedException(attempt, e);
          }
        }
      } catch (Exception e) {
        // Non-retryable: propagate unchanged so the caller sees the real cause.
        metrics.increment("retries.non_retryable_failures");
        throw e;
      }
    }
    metrics.increment("retries.exhausted");
    throw new RetriesExhaustedException(maxRetries, lastException);
  }

  /**
   * One processing attempt. Transient infrastructure failures are translated
   * into RetryableException so executeWithRetries can retry them.
   *
   * @throws RetryableException for transient/network/resource failures
   */
  private ProcessingResult processData(String data) throws RetryableException {
    try {
      // Simulate processing that might fail transiently
      if (isTransientFailureCondition()) {
        throw new RetryableException("Transient processing failure - external service unavailable");
      }
      return performProcessing(data);
    } catch (NetworkException e) {
      // Network issues are typically retryable
      throw new RetryableException("Network error during processing", e);
    } catch (TemporaryResourceException e) {
      // Temporary resource issues are retryable
      throw new RetryableException("Temporary resource unavailable", e);
    } catch (ValidationException e) {
      // Validation errors are not retryable.
      // NOTE(review): rethrowing here compiles only if ValidationException is
      // unchecked (or declared) - confirm against the project's exception types.
      throw e;
    }
  }

  /** Simulates a transient failure roughly 30% of the time. */
  private boolean isTransientFailureCondition() {
    return Math.random() < 0.3;
  }

  /** @throws ValidationException if {@code data} is null or blank */
  private ProcessingResult performProcessing(String data) throws ValidationException {
    if (data == null || data.trim().isEmpty()) {
      throw new ValidationException("Input data cannot be empty");
    }
    // Simulate processing
    return new ProcessingResult("processed: " + data, 1, data.length());
  }

  /** A single retryable unit of work; may throw any exception. */
  @FunctionalInterface
  private interface RetryableOperation<T> {
    T execute() throws Exception;
  }
}
// MapReduce with retry logic for external system integration
// MapReduce job whose mapper enriches rows via an external service with
// per-record retry handling.
// NOTE(review): LOG is referenced below but not declared in this snippet -
// presumably a class-level SLF4J logger; confirm in the full source.
public class ResilientDataExtractionMapReduce extends AbstractMapReduce {
// Mapper that enriches each input row via an external service, retrying
// transient failures with exponential backoff and routing permanent failures
// to "ERROR:"-prefixed output keys instead of failing the task.
public static class ExternalDataMapper extends Mapper<byte[], Row, Text, Text> {
private ExternalDataClient externalClient;
private Metrics metrics;
// Retry policy, read from job configuration in setup().
private int maxRetries;
private long retryDelay;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
// Initialize external client with retry configuration
externalClient = new ExternalDataClient(conf.get("external.service.url"));
maxRetries = conf.getInt("retry.max_attempts", 3);
retryDelay = conf.getLong("retry.initial_delay_ms", 1000);
// Get metrics from context
// NOTE(review): assumes the Hadoop Context is a MapReduceTaskContext; the
// validity of this cast depends on the CDAP runtime - confirm.
MapReduceTaskContext<?, ?> taskContext = (MapReduceTaskContext<?, ?>) context;
metrics = taskContext.getMetrics();
}
// Enriches one record; on failure, emits an ERROR-keyed record so the job
// continues instead of aborting.
@Override
protected void map(byte[] key, Row row, Context context)
throws IOException, InterruptedException {
String recordId = row.getString("id");
try {
// Attempt to enrich data with external service
String enrichedData = executeWithRetries(() -> {
return externalClient.fetchData(recordId);
});
// Output successful result
context.write(new Text(recordId), new Text(enrichedData));
metrics.increment("external_calls.success");
} catch (RetriesExhaustedException e) {
// Handle exhausted retries
LOG.error("Failed to fetch external data for record {} after {} retries",
recordId, e.getAttemptsMade(), e);
metrics.increment("external_calls.retries_exhausted");
// Write to error output or skip record
context.write(new Text("ERROR:" + recordId),
new Text("Failed after retries: " + e.getLastException().getMessage()));
} catch (Exception e) {
// Handle non-retryable errors
LOG.error("Non-retryable error for record {}", recordId, e);
metrics.increment("external_calls.non_retryable_errors");
context.write(new Text("ERROR:" + recordId),
new Text("Non-retryable error: " + e.getMessage()));
}
}
// Runs the operation up to maxRetries times, doubling the delay after each
// retryable failure. Only ExternalServiceExceptions marked retryable are
// retried; everything else propagates immediately.
// NOTE(review): the bare rethrows below compile only if
// ExternalServiceException is unchecked (or declared on this method) -
// confirm against the project's exception hierarchy.
private String executeWithRetries(ExternalDataOperation operation)
throws RetriesExhaustedException {
Exception lastException = null;
long delay = retryDelay;
for (int attempt = 1; attempt <= maxRetries; attempt++) {
try {
String result = operation.execute();
if (attempt > 1) {
// Only record retry metrics when an earlier attempt had failed.
metrics.increment("external_calls.retry_success");
metrics.gauge("external_calls.attempts_before_success", attempt);
}
return result;
} catch (ExternalServiceException e) {
lastException = e;
if (e.isRetryable()) {
metrics.increment("external_calls.retryable_failures");
LOG.warn("Retryable external service error on attempt {} of {}: {}",
attempt, maxRetries, e.getMessage());
if (attempt < maxRetries) {
try {
Thread.sleep(delay);
delay *= 2; // Exponential backoff
} catch (InterruptedException ie) {
// Restore the interrupt flag and stop retrying immediately.
Thread.currentThread().interrupt();
throw new RetriesExhaustedException(attempt, e);
}
}
} else {
// Non-retryable external service error
throw e;
}
} catch (Exception e) {
// Other non-retryable exceptions
throw e;
}
}
throw new RetriesExhaustedException(maxRetries, lastException);
}
// A single call to the external service.
@FunctionalInterface
private interface ExternalDataOperation {
String execute() throws ExternalServiceException;
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
// Release the external connection even if map() failed.
if (externalClient != null) {
externalClient.close();
}
}
}
}CDAP provides feature flag support for dynamic application behavior control.
import io.cdap.cdap.api.feature.*;
// Feature flags provider interface
public interface FeatureFlagsProvider {

  /** Returns whether the named feature is enabled; may throw if the flag lookup fails. */
  boolean isFeatureEnabled(String featureName);

  /**
   * Fail-safe variant: returns {@code defaultValue} whenever the lookup
   * throws, so a broken flag backend never takes a request down with it.
   */
  default boolean isFeatureEnabled(String featureName, boolean defaultValue) {
    try {
      return isFeatureEnabled(featureName);
    } catch (Exception ignored) {
      // Deliberate broad catch: any lookup failure degrades to the default.
      return defaultValue;
    }
  }
}
// Service with feature-controlled behavior
// Serves GET /data/{id}, layering optional enhancements onto the base payload
// according to runtime feature flags.
// NOTE(review): sendCompressedResponse, calculateAnalyticsScore,
// getTrendDirection, generateRecommendations, and generateCacheKey are called
// below but not defined in this snippet - presumably provided elsewhere; confirm.
public class FeatureControlledService extends AbstractHttpServiceHandler {
@GET
@Path("/data/{id}")
public void getData(HttpServiceRequest request, HttpServiceResponder responder,
@PathParam("id") String dataId) {
// NOTE(review): assumes the service context implements FeatureFlagsProvider -
// confirm against the actual context type.
FeatureFlagsProvider featureFlags = getContext();
try {
// Core data retrieval
JsonObject data = retrieveBaseData(dataId);
// Feature-controlled enhancements. The boolean argument is the fallback used
// when the flag lookup fails: new features default off, advanced_caching
// defaults on.
if (featureFlags.isFeatureEnabled("enhanced_analytics", false)) {
enhanceWithAnalytics(data, dataId);
}
if (featureFlags.isFeatureEnabled("real_time_recommendations", false)) {
addRealTimeRecommendations(data, dataId);
}
if (featureFlags.isFeatureEnabled("advanced_caching", true)) {
enableAdvancedCaching(data, dataId);
}
// Feature-controlled response format
if (featureFlags.isFeatureEnabled("response_compression", false)) {
sendCompressedResponse(responder, data);
} else {
responder.sendJson(200, data);
}
} catch (Exception e) {
responder.sendError(500, "Error retrieving data: " + e.getMessage());
}
}
// Builds the minimal payload: the id plus the current timestamp.
private JsonObject retrieveBaseData(String dataId) {
// Core data retrieval logic
JsonObject data = new JsonObject();
data.addProperty("id", dataId);
data.addProperty("timestamp", System.currentTimeMillis());
return data;
}
// Adds analytics score and trend fields to the payload.
private void enhanceWithAnalytics(JsonObject data, String dataId) {
// Enhanced analytics - controlled by feature flag
data.addProperty("analytics_score", calculateAnalyticsScore(dataId));
data.addProperty("trend_direction", getTrendDirection(dataId));
}
// Attaches a recommendations array to the payload.
private void addRealTimeRecommendations(JsonObject data, String dataId) {
// Real-time recommendations - controlled by feature flag
JsonArray recommendations = generateRecommendations(dataId);
data.add("recommendations", recommendations);
}
// Marks the payload as cached and records its cache key.
private void enableAdvancedCaching(JsonObject data, String dataId) {
// Advanced caching logic - controlled by feature flag
data.addProperty("cached", true);
data.addProperty("cache_key", generateCacheKey(dataId));
}
}The Operational APIs in CDAP provide comprehensive monitoring, scheduling, retry handling, and feature management capabilities essential for running enterprise-grade data applications in production environments.
Install with Tessl CLI
npx tessl i tessl/maven-io-cdap-cdap--cdap