CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-io-cdap-cdap--cdap

The Cask Data Application Platform (CDAP) is an integrated, open source application development platform for the Hadoop ecosystem that provides developers with data and application abstractions to simplify and accelerate application development.

Pending
Overview
Eval results
Files

docs/operational.md

Operational APIs

CDAP provides comprehensive operational APIs for monitoring, scheduling, retry handling, and feature management. These APIs enable production-ready applications with enterprise-grade operational capabilities including metrics collection, automated scheduling, resilient error handling, and dynamic feature control.

Metrics Collection

The metrics system provides real-time monitoring and observability for all application components.

Metrics Interface

import io.cdap.cdap.api.metrics.*;
import java.util.Map;

/**
 * Core metrics interface for emitting counters and gauges, with optional
 * tag dimensions via child contexts.
 */
public interface Metrics {

    /** Adds {@code delta} to the named counter. */
    void count(String metricName, int delta);

    /**
     * Counter update taking a {@code long} delta; rejects values that do not
     * fit in an {@code int} rather than silently truncating them.
     */
    default void countLong(String metricName, long delta) {
        // A long fits in an int exactly when the narrowing cast round-trips.
        if ((int) delta != delta) {
            throw new IllegalArgumentException("Invalid delta value for metrics count: " + delta);
        }
        count(metricName, (int) delta);
    }

    /** Records the current value of the named gauge. */
    void gauge(String metricName, long value);

    /** Returns a metrics context with the given tag dimensions applied. */
    Metrics child(Map<String, String> tags);

    /** Increments the named counter by one. */
    default void increment(String metricName) {
        increment(metricName, 1);
    }

    /** Increments the named counter by {@code delta}. */
    default void increment(String metricName, int delta) {
        count(metricName, delta);
    }

    /** Decrements the named counter by one. */
    default void decrement(String metricName) {
        decrement(metricName, 1);
    }

    /** Decrements the named counter by {@code delta}. */
    default void decrement(String metricName, int delta) {
        count(metricName, -delta);
    }
}

// Runtime metrics for program execution monitoring.
// Read-only view over the counters and gauges emitted by a running program.
public interface RuntimeMetrics {
    // Snapshot of every counter, keyed by metric name.
    Map<String, Long> getAllCounters();
    // Snapshot of every gauge, keyed by metric name.
    Map<String, Long> getAllGauges();
    // Current value of a single counter by name.
    long getCounter(String name);
    // Current value of a single gauge by name.
    long getGauge(String name);
}

Metrics Usage Examples

// Service with comprehensive metrics
public class DataProcessingService extends AbstractHttpServiceHandler {
    
    private Metrics metrics;
    
    @Override
    public void initialize(HttpServiceContext context) throws Exception {
        super.initialize(context);
        // Cache the metrics context once; it is reused by every request.
        this.metrics = context.getMetrics();
    }
    
    /**
     * Handles GET /process/{id}: loads the record, transforms it, stores the
     * result, and emits request/success/error/latency metrics around the work.
     */
    @GET
    @Path("/process/{id}")
    public void processData(HttpServiceRequest request, HttpServiceResponder responder,
                           @PathParam("id") String dataId) {
        
        // Track request metrics
        metrics.increment("requests.total");
        metrics.increment("requests.process_data");
        
        long startTime = System.currentTimeMillis();
        
        try {
            // Process data with detailed metrics
            ProcessingResult result = processDataWithMetrics(dataId);
            
            // Success metrics
            metrics.increment("requests.success");
            metrics.gauge("processing.last_success_time", System.currentTimeMillis());
            metrics.gauge("processing.records_processed", result.getRecordCount());
            metrics.gauge("processing.data_size_mb", result.getDataSizeMB());
            
            responder.sendJson(200, result.toJson());
            
        } catch (ValidationException e) {
            // Validation error metrics (client error -> 400)
            metrics.increment("errors.validation");
            Metrics errorMetrics = metrics.child(Map.of("error_type", "validation"));
            errorMetrics.increment("error_count");
            
            responder.sendError(400, "Validation failed: " + e.getMessage());
            
        } catch (ProcessingException e) {
            // Processing error metrics  
            metrics.increment("errors.processing");
            Metrics errorMetrics = metrics.child(Map.of("error_type", "processing"));
            errorMetrics.increment("error_count");
            
            responder.sendError(500, "Processing failed: " + e.getMessage());
            
        } catch (Exception e) {
            // General error metrics
            metrics.increment("errors.unknown");
            Metrics errorMetrics = metrics.child(Map.of("error_type", "unknown"));
            errorMetrics.increment("error_count");
            
            responder.sendError(500, "Internal error: " + e.getMessage());
            
        } finally {
            // Response time metrics are emitted on every path, success or failure.
            long duration = System.currentTimeMillis() - startTime;
            metrics.gauge("processing.response_time_ms", duration);
            
            // Update running averages
            updateResponseTimeAverage(duration);
        }
    }
    
    /**
     * Reads the input row, transforms it, and writes the processed result,
     * emitting dataset read/write and processing latency metrics.
     *
     * @throws ValidationException if no row exists for {@code dataId}
     */
    private ProcessingResult processDataWithMetrics(String dataId) throws Exception {
        Table dataTable = getContext().getDataset("input_data");
        Table resultsTable = getContext().getDataset("processed_results");
        
        // Data access metrics
        metrics.increment("dataset.reads");
        long readStart = System.currentTimeMillis();
        
        Row inputRow = dataTable.get(Bytes.toBytes(dataId));
        if (inputRow.isEmpty()) {
            metrics.increment("dataset.read_misses");
            throw new ValidationException("Data not found: " + dataId);
        }
        
        metrics.gauge("dataset.read_latency_ms", System.currentTimeMillis() - readStart);
        
        // Processing metrics
        metrics.increment("processing.operations");
        long processStart = System.currentTimeMillis();
        
        ProcessingResult result = performDataTransformation(inputRow);
        
        metrics.gauge("processing.operation_latency_ms", System.currentTimeMillis() - processStart);
        
        // Data write metrics
        metrics.increment("dataset.writes");
        long writeStart = System.currentTimeMillis();
        
        Put outputPut = new Put(Bytes.toBytes(dataId + "_processed"));
        outputPut.add("result", "data", result.getData());
        outputPut.add("result", "timestamp", System.currentTimeMillis());
        outputPut.add("result", "record_count", result.getRecordCount());
        
        resultsTable.put(outputPut);
        
        metrics.gauge("dataset.write_latency_ms", System.currentTimeMillis() - writeStart);
        
        return result;
    }
    
    /**
     * Splits the row's "data" column into newline-separated records and emits
     * data-quality metrics for them.
     */
    private ProcessingResult performDataTransformation(Row inputRow) {
        String inputData = inputRow.getString("data");
        
        // Business logic metrics
        metrics.increment("business_logic.transformations");
        
        int recordCount = 0;
        long dataSize = 0;
        
        if (inputData != null) {
            // String.split always yields at least one element, so recordCount >= 1 here.
            String[] records = inputData.split("\n");
            recordCount = records.length;
            dataSize = inputData.length();
            
            // Track processing quality metrics
            int validRecords = 0;
            for (String record : records) {
                if (isValidRecord(record)) {
                    validRecords++;
                }
            }
            
            // FIX: Metrics.gauge takes a long, so passing a fractional ratio as a
            // double would not compile (and a cast would truncate it to 0).
            // Emit a whole-number percentage instead.
            metrics.gauge("data_quality.valid_record_percent",
                (validRecords * 100L) / recordCount);
            metrics.gauge("data_quality.invalid_records", recordCount - validRecords);
        }
        
        return new ProcessingResult(inputData, recordCount, dataSize);
    }
    
    private void updateResponseTimeAverage(long duration) {
        // Calculate and emit rolling averages
        // This could use a sliding window or exponential moving average
        metrics.gauge("processing.avg_response_time_ms", calculateAverage(duration));
    }
    
    /** A record is valid when it is non-blank and contains at least one comma. */
    private boolean isValidRecord(String record) {
        return record != null && !record.trim().isEmpty() && record.contains(",");
    }
    
    private long calculateAverage(long newValue) {
        // Implementation for calculating rolling average
        return newValue; // Simplified
    }
}

// Worker with operational metrics
public class DataMonitoringWorker extends AbstractWorker {
    
    // volatile: written by stop() from another thread, read by the run() loop.
    private volatile boolean running = false;
    private Metrics metrics;
    
    @Override
    public void configure(WorkerConfigurer configurer) {
        configurer.setName("DataMonitoringWorker");
        configurer.setDescription("Monitors data pipeline health and performance");
    }
    
    @Override
    public void initialize(WorkerContext context) throws Exception {
        super.initialize(context);
        this.metrics = context.getMetrics();
        this.running = true;
    }
    
    /**
     * Main loop: collects system, pipeline and application metrics every 30
     * seconds until {@link #stop()} is called or the thread is interrupted.
     */
    @Override
    public void run() throws Exception {
        while (running) {
            try {
                // System health metrics
                collectSystemHealthMetrics();
                
                // Data pipeline metrics
                collectDataPipelineMetrics();
                
                // Application-specific metrics
                collectApplicationMetrics();
                
                // Sleep between collection cycles
                Thread.sleep(30000); // 30 seconds
                
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            } catch (Exception e) {
                LOG.error("Error collecting metrics", e);
                metrics.increment("metrics_collection.errors");
                
                // Continue operation even if metrics collection fails, after a
                // short backoff. FIX: previously an interrupt during this sleep
                // propagated out of run(); handle it like the main-loop case.
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }
    
    @Override
    public void stop() {
        // Signals run() to exit after the current cycle/sleep completes.
        running = false;
    }
    
    /** Emits JVM memory and thread gauges, plus GC stats when available. */
    private void collectSystemHealthMetrics() {
        // JVM memory metrics
        Runtime runtime = Runtime.getRuntime();
        long totalMemory = runtime.totalMemory();
        long freeMemory = runtime.freeMemory();
        long usedMemory = totalMemory - freeMemory;
        
        metrics.gauge("system.memory.total_mb", totalMemory / (1024 * 1024));
        metrics.gauge("system.memory.used_mb", usedMemory / (1024 * 1024));
        metrics.gauge("system.memory.free_mb", freeMemory / (1024 * 1024));
        metrics.gauge("system.memory.usage_percent", (usedMemory * 100) / totalMemory);
        
        // Thread metrics (Thread.activeCount is an estimate for this thread group)
        metrics.gauge("system.threads.active", Thread.activeCount());
        
        // Garbage collection metrics (if available)
        collectGCMetrics();
    }
    
    /** Checks dataset health, processing lag and data-quality trends. */
    private void collectDataPipelineMetrics() throws Exception {
        // Check dataset sizes and health
        checkDatasetMetrics("input_data", "input");
        checkDatasetMetrics("processed_results", "output");
        checkDatasetMetrics("error_records", "errors");
        
        // Check processing lag
        checkProcessingLag();
        
        // Check data quality trends
        checkDataQualityMetrics();
    }
    
    /**
     * Emits record-count and recent-activity gauges for one dataset; failures
     * are counted but deliberately do not abort the collection cycle.
     */
    private void checkDatasetMetrics(String datasetName, String type) {
        try {
            Table dataset = getContext().getDataset(datasetName);
            
            // Count records (simplified - in practice might sample)
            long recordCount = countTableRecords(dataset);
            metrics.gauge(String.format("dataset.%s.record_count", type), recordCount);
            
            // Check recent activity
            long recentRecords = countRecentRecords(dataset, System.currentTimeMillis() - 3600000); // Last hour
            metrics.gauge(String.format("dataset.%s.recent_records", type), recentRecords);
            
            metrics.increment(String.format("dataset.%s.health_checks", type));
            
        } catch (Exception e) {
            LOG.error("Failed to check metrics for dataset: {}", datasetName, e);
            metrics.increment(String.format("dataset.%s.health_check_errors", type));
        }
    }
    
    /** Emits input-minus-output lag and alerts when it exceeds 1000 records. */
    private void checkProcessingLag() {
        // Calculate processing lag between input and output
        // This is a simplified example
        long inputCount = getDatasetRecordCount("input_data");
        long outputCount = getDatasetRecordCount("processed_results");
        long processingLag = inputCount - outputCount;
        
        // Clamp to zero: output can transiently exceed input in this simplified view.
        metrics.gauge("pipeline.processing_lag", Math.max(0, processingLag));
        
        if (processingLag > 1000) {
            metrics.increment("pipeline.high_lag_alerts");
        }
    }
    
    /** Emits the error rate (in basis points) and alerts above 5%. */
    private void checkDataQualityMetrics() {
        // Check error rates and data quality trends
        long errorCount = getDatasetRecordCount("error_records");
        long totalProcessed = getDatasetRecordCount("processed_results");
        
        if (totalProcessed > 0) {
            double errorRate = (double) errorCount / (totalProcessed + errorCount);
            metrics.gauge("data_quality.error_rate", (long) (errorRate * 10000)); // Store as basis points
            
            if (errorRate > 0.05) { // 5% error rate threshold
                metrics.increment("data_quality.high_error_rate_alerts");
            }
        }
    }
    
    /** Emits uptime, instance, and runtime-argument gauges for this worker. */
    private void collectApplicationMetrics() {
        WorkerContext context = getContext();
        
        // Application runtime metrics
        long uptime = System.currentTimeMillis() - context.getLogicalStartTime();
        metrics.gauge("application.uptime_minutes", uptime / (60 * 1000));
        
        // Instance metrics
        metrics.gauge("application.worker_instance_id", context.getInstanceId());
        metrics.gauge("application.worker_instance_count", context.getInstanceCount());
        
        // Runtime arguments metrics (for configuration tracking)
        Map<String, String> runtimeArgs = context.getRuntimeArguments();
        metrics.gauge("application.runtime_args_count", runtimeArgs.size());
    }
    
    private void collectGCMetrics() {
        // Implementation for garbage collection metrics
        // Would use JMX beans to get GC statistics
    }
    
    /** Full-table scan count; emits progress every 10k rows for large tables. */
    private long countTableRecords(Table table) {
        // Simplified record counting - in practice might use sampling
        long count = 0;
        try (Scanner scanner = table.scan(null, null)) {
            while (scanner.next() != null) {
                count++;
                if (count % 10000 == 0) {
                    // Emit progress for large datasets
                    metrics.gauge("metrics_collection.scan_progress", count);
                }
            }
        } catch (Exception e) {
            LOG.warn("Error counting table records", e);
        }
        return count;
    }
    
    /** Counts rows whose "timestamp" column is strictly after sinceTimestamp. */
    private long countRecentRecords(Table table, long sinceTimestamp) {
        long count = 0;
        try (Scanner scanner = table.scan(null, null)) {
            Row row;
            while ((row = scanner.next()) != null) {
                Long timestamp = row.getLong("timestamp");
                if (timestamp != null && timestamp > sinceTimestamp) {
                    count++;
                }
            }
        } catch (Exception e) {
            LOG.warn("Error counting recent records", e);
        }
        return count;
    }
    
    /** Record count for a dataset by name; returns 0 (and logs) on failure. */
    private long getDatasetRecordCount(String datasetName) {
        try {
            Table dataset = getContext().getDataset(datasetName);
            return countTableRecords(dataset);
        } catch (Exception e) {
            LOG.error("Failed to get record count for dataset: {}", datasetName, e);
            return 0;
        }
    }
}

Scheduling System

CDAP provides a flexible scheduling system for automating program execution based on time, data availability, or other program completion events.

Scheduling Interfaces

import io.cdap.cdap.api.schedule.*;

// Schedule builder for creating schedules.
// Fluent interface: each setter returns the builder for chaining.
public interface ScheduleBuilder {
    
    // Set schedule name and description
    ScheduleBuilder setName(String name);
    ScheduleBuilder setDescription(String description);
    
    // Configure schedule properties; these are exposed to triggered programs
    // at runtime via TriggeringScheduleInfo.getProperties()
    ScheduleBuilder setProperties(Map<String, String> properties);
    ScheduleBuilder setProperty(String key, String value);
    
    // Set schedule constraints: cap on simultaneously active runs
    ScheduleBuilder setMaxConcurrentRuns(int maxConcurrentRuns);
    
    // Build the schedule from the accumulated settings
    Schedule build();
}

// Trigger factory for creating different trigger types
public final class TriggerFactory {
    
    // Utility class: static factories only, never instantiated.
    private TriggerFactory() {
    }
    
    /** Creates a time-based trigger that fires per the given cron expression. */
    public static Trigger byTime(String cronExpression) {
        /* create cron trigger */
        throw new UnsupportedOperationException("example stub - cron trigger creation elided");
    }
    
    /** Creates a trigger that fires at a fixed frequency. */
    public static Trigger byFrequency(Duration duration) {
        /* create frequency trigger */
        throw new UnsupportedOperationException("example stub - frequency trigger creation elided");
    }
    
    /** Creates a trigger that fires when new data is available in the dataset. */
    public static Trigger onDataAvailable(String datasetName) {
        /* create data trigger */
        throw new UnsupportedOperationException("example stub - data trigger creation elided");
    }
    
    /** Creates a trigger that fires once the dataset gains numPartitions new partitions. */
    public static Trigger onPartitionAvailable(String datasetName, int numPartitions) {
        /* create partition trigger */
        throw new UnsupportedOperationException("example stub - partition trigger creation elided");
    }
    
    /** Creates a trigger that fires when the named program reaches one of the given statuses. */
    public static Trigger onProgramStatus(ProgramType programType, String applicationName, 
                                        String programName, Set<ProgramStatus> programStatuses) { 
        /* create program status trigger */ 
        throw new UnsupportedOperationException("example stub - program status trigger creation elided");
    }
    
    /** Composite trigger: fires only when every given trigger has fired. */
    public static Trigger and(Trigger... triggers) {
        /* create AND trigger */
        throw new UnsupportedOperationException("example stub - AND trigger creation elided");
    }
    
    /** Composite trigger: fires when any one of the given triggers fires. */
    public static Trigger or(Trigger... triggers) {
        /* create OR trigger */
        throw new UnsupportedOperationException("example stub - OR trigger creation elided");
    }
}

// Trigger information interfaces.
// Base read-only view of a trigger attached to a schedule.
public interface TriggerInfo {
    String getName();
    String getDescription(); 
    // Discriminator used by consumers to downcast to a specific sub-interface
    // (e.g. PartitionTriggerInfo, ProgramStatusTriggerInfo).
    TriggerType getType();
    Map<String, String> getProperties();
}

// Trigger details for schedules fired by another program reaching a status
// (e.g. COMPLETED); identifies the watched program and the trigger statuses.
public interface ProgramStatusTriggerInfo extends TriggerInfo {
    String getApplicationName();
    String getProgramName();
    ProgramType getProgramType();
    Set<ProgramStatus> getProgramStatuses();
}

// Trigger details for schedules fired by new dataset partitions becoming
// available; identifies the dataset and the partition-count threshold.
public interface PartitionTriggerInfo extends TriggerInfo {
    String getDatasetName();
    int getNumPartitions();
}

// Triggering schedule information.
// Runtime view of the schedule that launched the current run; null-absent
// when a program is started manually (see ScheduleAwareAction usage below).
public interface TriggeringScheduleInfo {
    String getName();
    String getDescription();
    // One entry per trigger in the schedule (composite schedules have several).
    List<TriggerInfo> getTriggerInfos();
    // The properties configured on the schedule at build time.
    Map<String, String> getProperties();
}

Scheduling Examples

// Application with comprehensive scheduling.
// NOTE(review): this example uses a fluent `ScheduleBuilder.create(...)` style
// with trigger methods (triggerByTime, triggerByFrequency, ...) and a
// `.programName(...)` call after build() that do not appear on the
// `ScheduleBuilder` interface declared earlier in this document — confirm the
// exact builder API against the actual CDAP release before copying.
public class ScheduledDataPipelineApp extends AbstractApplication {
    
    @Override
    public void configure(ApplicationConfigurer configurer, ApplicationContext context) {
        configurer.setName("ScheduledDataPipeline");
        configurer.setDescription("Data pipeline with various scheduling patterns");
        
        // Add programs
        configurer.addMapReduce(new DailyETLMapReduce());
        configurer.addSpark(new RealTimeAggregationSpark());
        configurer.addWorkflow(new DataProcessingWorkflow());
        configurer.addWorker(new DataValidationWorker());
        
        // Time-based scheduling - Daily ETL at 2 AM
        configurer.schedule(
            ScheduleBuilder.create("daily-etl-schedule")
                .setDescription("Daily ETL processing at 2 AM")
                .triggerByTime("0 2 * * *") // Cron: minute 0, hour 2, every day
                .setMaxConcurrentRuns(1)
                .setProperty("processing.mode", "batch")
                .setProperty("data.retention.days", "90")
                .build()
                .programName("DailyETLMapReduce")
        );
        
        // Frequency-based scheduling - Hourly aggregation
        configurer.schedule(
            ScheduleBuilder.create("hourly-aggregation-schedule")
                .setDescription("Hourly real-time data aggregation")
                .triggerByFrequency(Duration.ofHours(1))
                .setMaxConcurrentRuns(2)
                .setProperty("aggregation.window", "1h")
                .build()
                .programName("RealTimeAggregationSpark")
        );
        
        // Data availability scheduling - Process when new data arrives
        configurer.schedule(
            ScheduleBuilder.create("data-driven-schedule")
                .setDescription("Process workflow when new partitions are available")
                .triggerOnDataAvailable("incoming_data", 3) // Wait for 3 new partitions
                .setMaxConcurrentRuns(1)
                .setProperty("processing.trigger", "data-availability")
                .build()
                .programName("DataProcessingWorkflow")
        );
        
        // Program dependency scheduling - Start validation after ETL completes
        configurer.schedule(
            ScheduleBuilder.create("post-etl-validation-schedule")
                .setDescription("Run validation after ETL completion")
                .triggerOnProgramStatus(
                    ProgramType.MAPREDUCE,
                    "ScheduledDataPipeline",
                    "DailyETLMapReduce", 
                    Set.of(ProgramStatus.COMPLETED)
                )
                .setMaxConcurrentRuns(1)
                .setProperty("validation.mode", "post-processing")
                .build()
                .programName("DataValidationWorker")
        );
        
        // Complex composite scheduling - Multiple conditions
        // Fires only when BOTH the 6-hour cron tick occurs AND quality data is available.
        configurer.schedule(
            ScheduleBuilder.create("complex-schedule")
                .setDescription("Complex trigger combining time and data availability")
                .triggerByComposite(
                    TriggerFactory.and(
                        TriggerFactory.byTime("0 */6 * * *"), // Every 6 hours
                        TriggerFactory.onDataAvailable("quality_metrics") // And when quality data available
                    )
                )
                .setMaxConcurrentRuns(1)
                .setProperty("processing.type", "quality-analysis")
                .build()
                .programName("DataProcessingWorkflow")
        );
    }
}

// Workflow that responds to schedule context
public class DataProcessingWorkflow extends AbstractWorkflow {
    
    @Override
    public void configure(WorkflowConfigurer configurer) {
        configurer.setName("DataProcessingWorkflow");
        configurer.setDescription("Processes data based on schedule triggers");
        
        // Add conditional processing based on schedule properties
        configurer.addAction(new ScheduleAwareAction());
        
        configurer.condition(new ScheduleBasedCondition())
            .addMapReduce("BatchDataProcessor")
            .addSpark("AdvancedAnalyticsSpark")
        .otherwise()
            .addAction(new LightweightProcessingAction())
        .end();
        
        configurer.addAction(new CompletionNotificationAction());
    }
    
    // Custom action that adapts behavior based on schedule
    public static class ScheduleAwareAction extends AbstractCustomAction {
        
        @Override
        public void configure(CustomActionConfigurer configurer) {
            configurer.setName("ScheduleAwareAction");
            configurer.setDescription("Adapts processing based on triggering schedule");
        }
        
        /**
         * Inspects the triggering schedule (if any), records schedule/trigger
         * details in the workflow token, and picks processing parameters.
         */
        @Override
        public void run(CustomActionContext context) throws Exception {
            WorkflowToken token = context.getWorkflowToken();
            
            // Get schedule information; null when the workflow was started manually.
            TriggeringScheduleInfo scheduleInfo = context.getTriggeringScheduleInfo();
            
            if (scheduleInfo != null) {
                String scheduleName = scheduleInfo.getName();
                token.put("schedule.name", scheduleName);
                token.put("schedule.description", scheduleInfo.getDescription());
                
                // Adapt processing based on schedule properties
                Map<String, String> scheduleProperties = scheduleInfo.getProperties();
                String processingMode = scheduleProperties.get("processing.mode");
                
                // FIX: record the mode in the token so downstream nodes (e.g.
                // ScheduleBasedCondition) can read it; previously it was only
                // consulted locally and the condition always saw no value.
                if (processingMode != null) {
                    token.put("processing.mode", processingMode);
                }
                
                if ("batch".equals(processingMode)) {
                    // Configure for batch processing
                    token.put("processing.batch_size", "10000");
                    token.put("processing.parallelism", "high");
                    token.put("processing.memory_limit", "8GB");
                } else {
                    // Configure for real-time processing  
                    token.put("processing.batch_size", "1000");
                    token.put("processing.parallelism", "medium");
                    token.put("processing.memory_limit", "2GB");
                }
                
                // Handle different trigger types
                List<TriggerInfo> triggers = scheduleInfo.getTriggerInfos();
                for (TriggerInfo trigger : triggers) {
                    processTriggerInfo(trigger, token, context);
                }
                
                context.getMetrics().increment("schedule.triggered_runs");
                Metrics scheduleMetrics = context.getMetrics().child(
                    Map.of("schedule_name", scheduleName)
                );
                scheduleMetrics.increment("executions");
                
            } else {
                // Manual execution
                token.put("execution.type", "manual");
                token.put("processing.batch_size", "5000");
                context.getMetrics().increment("schedule.manual_runs");
            }
        }
        
        /** Records trigger-type-specific details into the workflow token. */
        private void processTriggerInfo(TriggerInfo trigger, WorkflowToken token, 
                                      CustomActionContext context) {
            
            token.put("trigger.type", trigger.getType().name());
            token.put("trigger.name", trigger.getName());
            
            switch (trigger.getType()) {
                case TIME:
                    // Time-based trigger processing
                    token.put("trigger.execution_time", String.valueOf(System.currentTimeMillis()));
                    break;
                    
                case PARTITION:
                    // Data partition trigger processing
                    if (trigger instanceof PartitionTriggerInfo) {
                        PartitionTriggerInfo partitionTrigger = (PartitionTriggerInfo) trigger;
                        token.put("trigger.dataset", partitionTrigger.getDatasetName());
                        token.put("trigger.partitions", String.valueOf(partitionTrigger.getNumPartitions()));
                    }
                    break;
                    
                case PROGRAM_STATUS:
                    // Program status trigger processing
                    if (trigger instanceof ProgramStatusTriggerInfo) {
                        ProgramStatusTriggerInfo statusTrigger = (ProgramStatusTriggerInfo) trigger;
                        token.put("trigger.program", statusTrigger.getProgramName());
                        token.put("trigger.application", statusTrigger.getApplicationName());
                        token.put("trigger.program_type", statusTrigger.getProgramType().name());
                    }
                    break;
                    
                default:
                    LOG.warn("Unknown trigger type: {}", trigger.getType());
            }
        }
    }
    
    // Condition that makes decisions based on schedule context
    public static class ScheduleBasedCondition implements Predicate<WorkflowContext> {
        
        /**
         * Chooses the heavy branch for batch-mode schedules or time triggers.
         * FIX: token entries may be absent (e.g. manual runs never set
         * "processing.mode"/"trigger.type"), so guard against null before
         * calling toString() instead of throwing NullPointerException.
         */
        @Override
        public boolean apply(WorkflowContext context) {
            WorkflowToken token = context.getToken();
            
            Object modeValue = token.get("processing.mode");
            Object triggerValue = token.get("trigger.type");
            String processingMode = modeValue == null ? null : modeValue.toString();
            String triggerType = triggerValue == null ? null : triggerValue.toString();
            
            // Use heavy processing for batch schedules or time-based triggers
            return "batch".equals(processingMode) || "TIME".equals(triggerType);
        }
    }
}

Retry Policies and Error Handling

CDAP provides robust retry mechanisms and error handling patterns for building resilient applications.

Retry Framework

import io.cdap.cdap.api.retry.*;

/**
 * Marker exception indicating an operation failed transiently and is safe to
 * retry (see the retry loop in ResilientDataService for typical handling).
 */
public class RetryableException extends Exception {

    /** Creates a retryable failure with a descriptive message. */
    public RetryableException(String message) {
        super(message);
    }

    /** Creates a retryable failure wrapping the underlying cause. */
    public RetryableException(String message, Throwable cause) {
        super(message, cause);
    }
}

/**
 * Thrown when an operation still fails after every permitted retry attempt.
 * Carries the attempt count and the final failure for diagnostics.
 */
public class RetriesExhaustedException extends Exception {

    // Total attempts made, including the first (non-retry) try.
    private final int attemptsMade;
    // The failure raised by the final attempt; also installed as the cause.
    private final Exception lastException;

    public RetriesExhaustedException(int attemptsMade, Exception lastException) {
        super(String.format("Retries exhausted after %d attempts", attemptsMade), lastException);
        this.attemptsMade = attemptsMade;
        this.lastException = lastException;
    }

    /** Number of attempts made before giving up. */
    public int getAttemptsMade() {
        return attemptsMade;
    }

    /** The exception from the final attempt (same object as getCause()). */
    public Exception getLastException() {
        return lastException;
    }
}

// Idempotency levels describing whether an operation is safe to retry.
public enum Idempotency {
    IDEMPOTENT,      // Safe to retry without side effects
    NOT_IDEMPOTENT,  // Retries may cause duplicate side effects
    UNKNOWN          // Idempotency is unknown
}

Retry Implementation Examples

// Service with comprehensive retry logic
public class ResilientDataService extends AbstractHttpServiceHandler {
    
    private static final int MAX_RETRIES = 3;
    private static final long INITIAL_DELAY_MS = 1000;
    private static final double BACKOFF_MULTIPLIER = 2.0;
    
    /**
     * Handles POST /process: decodes the UTF-8 body, processes it with
     * exponential-backoff retries, and maps failures to HTTP errors.
     */
    @POST
    @Path("/process")
    public void processWithRetries(HttpServiceRequest request, HttpServiceResponder responder) {
        Metrics metrics = getContext().getMetrics();
        
        try {
            // StandardCharsets.UTF_8 avoids the string lookup done by Charset.forName
            // and cannot throw UnsupportedCharsetException.
            String content = java.nio.charset.StandardCharsets.UTF_8.decode(
                ByteBuffer.wrap(request.getContent())).toString();
            
            ProcessingResult result = executeWithRetries(
                () -> processData(content),
                MAX_RETRIES,
                INITIAL_DELAY_MS,
                metrics
            );
            
            responder.sendJson(200, result.toJson());
            
        } catch (RetriesExhaustedException e) {
            LOG.error("Processing failed after {} retries", e.getAttemptsMade(), e);
            metrics.increment("processing.retries_exhausted");
            responder.sendError(500, "Processing failed after retries: " + e.getLastException().getMessage());
            
        } catch (Exception e) {
            LOG.error("Processing failed with non-retryable error", e);
            metrics.increment("processing.non_retryable_errors");
            responder.sendError(500, "Processing failed: " + e.getMessage());
        }
    }
    
    /**
     * Runs {@code operation}, retrying {@link RetryableException} failures up to
     * {@code maxRetries} times with exponential backoff. Non-retryable failures
     * propagate immediately (checked ones wrapped in RuntimeException).
     *
     * @throws RetriesExhaustedException when every attempt failed retryably,
     *         or the backoff sleep was interrupted
     */
    private <T> T executeWithRetries(RetryableOperation<T> operation, int maxRetries, 
                                   long initialDelayMs, Metrics metrics) 
                                   throws RetriesExhaustedException {
        
        Exception lastException = null;
        long delay = initialDelayMs;
        
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            try {
                T result = operation.execute();
                
                if (attempt > 1) {
                    // Only record retry stats when at least one retry happened.
                    metrics.increment("retries.successful");
                    metrics.gauge("retries.attempts_before_success", attempt);
                }
                
                return result;
                
            } catch (RetryableException e) {
                lastException = e;
                metrics.increment("retries.retryable_failures");
                
                LOG.warn("Retryable error on attempt {} of {}: {}", attempt, maxRetries, e.getMessage());
                
                if (attempt < maxRetries) {
                    try {
                        Thread.sleep(delay);
                        delay = (long) (delay * BACKOFF_MULTIPLIER);
                    } catch (InterruptedException ie) {
                        // Preserve interrupt status and stop retrying.
                        Thread.currentThread().interrupt();
                        throw new RetriesExhaustedException(attempt, e);
                    }
                }
                
            } catch (Exception e) {
                // Non-retryable exception: propagate immediately.
                // FIX: a bare `throw e;` here requires `throws Exception` on this
                // method (Java 7+ precise rethrow), which would widen the declared
                // contract; rethrow unchecked instead, preserving the cause.
                metrics.increment("retries.non_retryable_failures");
                if (e instanceof RuntimeException) {
                    throw (RuntimeException) e;
                }
                throw new RuntimeException(e);
            }
        }
        
        metrics.increment("retries.exhausted");
        throw new RetriesExhaustedException(maxRetries, lastException);
    }
    
    /**
     * Performs the processing step, translating transient infrastructure
     * failures into {@link RetryableException} so the retry loop can retry.
     * ValidationException is declared so its precise rethrow below compiles
     * even when it is a checked exception.
     */
    private ProcessingResult processData(String data) throws RetryableException, ValidationException {
        try {
            // Simulate processing that might fail transiently
            if (isTransientFailureCondition()) {
                throw new RetryableException("Transient processing failure - external service unavailable");
            }
            
            // Perform actual processing
            return performProcessing(data);
            
        } catch (NetworkException e) {
            // Network issues are typically retryable
            throw new RetryableException("Network error during processing", e);
            
        } catch (TemporaryResourceException e) {
            // Temporary resource issues are retryable
            throw new RetryableException("Temporary resource unavailable", e);
            
        } catch (ValidationException e) {
            // Validation errors are not retryable; propagate as-is.
            throw e;
        }
    }
    
    /** Simulates a transient failure roughly 30% of the time. */
    private boolean isTransientFailureCondition() {
        return Math.random() < 0.3; // 30% chance of transient failure
    }
    
    /** Validates and processes the input, rejecting null/blank payloads. */
    private ProcessingResult performProcessing(String data) throws ValidationException {
        if (data == null || data.trim().isEmpty()) {
            throw new ValidationException("Input data cannot be empty");
        }
        
        // Simulate processing
        return new ProcessingResult("processed: " + data, 1, data.length());
    }
    
    /** Unit of work that may throw and be retried by executeWithRetries. */
    @FunctionalInterface
    private interface RetryableOperation<T> {
        T execute() throws Exception;
    }
}

// MapReduce with retry logic for external system integration
/**
 * MapReduce job whose mapper enriches each input row with data fetched from an
 * external service, retrying transient service failures with exponential backoff.
 */
public class ResilientDataExtractionMapReduce extends AbstractMapReduce {
    
    /**
     * Mapper that, for each input row, calls the external service keyed by the
     * row's "id" column and emits (id, enrichedData). Records that still fail
     * after all retries — or that fail with a non-retryable error — are emitted
     * under an "ERROR:"-prefixed key instead of failing the task.
     */
    public static class ExternalDataMapper extends Mapper<byte[], Row, Text, Text> {
        
        private ExternalDataClient externalClient;  // client for the external enrichment service
        private Metrics metrics;                    // CDAP metrics, taken from the task context in setup()
        private int maxRetries;                     // max attempts per record ("retry.max_attempts", default 3)
        private long retryDelay;                    // initial backoff in ms ("retry.initial_delay_ms", default 1000)
        
        /**
         * Reads retry settings from the job configuration and opens the external client.
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            
            // Initialize external client with retry configuration
            externalClient = new ExternalDataClient(conf.get("external.service.url"));
            maxRetries = conf.getInt("retry.max_attempts", 3);
            retryDelay = conf.getLong("retry.initial_delay_ms", 1000);
            
            // Get metrics from context
            // NOTE(review): assumes the Hadoop Context is also a MapReduceTaskContext
            // at runtime — confirm against the CDAP MapReduce runtime before relying on it.
            MapReduceTaskContext<?, ?> taskContext = (MapReduceTaskContext<?, ?>) context;
            metrics = taskContext.getMetrics();
        }
        
        /**
         * Enriches one record via the external service. All three outcomes
         * (success, retries exhausted, non-retryable error) produce an output
         * record, so a single bad record never aborts the task.
         */
        @Override
        protected void map(byte[] key, Row row, Context context) 
            throws IOException, InterruptedException {
            
            String recordId = row.getString("id");
            
            try {
                // Attempt to enrich data with external service
                String enrichedData = executeWithRetries(() -> {
                    return externalClient.fetchData(recordId);
                });
                
                // Output successful result
                context.write(new Text(recordId), new Text(enrichedData));
                metrics.increment("external_calls.success");
                
            } catch (RetriesExhaustedException e) {
                // Handle exhausted retries
                LOG.error("Failed to fetch external data for record {} after {} retries", 
                    recordId, e.getAttemptsMade(), e);
                
                metrics.increment("external_calls.retries_exhausted");
                
                // Write to error output or skip record
                context.write(new Text("ERROR:" + recordId), 
                    new Text("Failed after retries: " + e.getLastException().getMessage()));
                
            } catch (Exception e) {
                // Handle non-retryable errors
                LOG.error("Non-retryable error for record {}", recordId, e);
                metrics.increment("external_calls.non_retryable_errors");
                
                context.write(new Text("ERROR:" + recordId), 
                    new Text("Non-retryable error: " + e.getMessage()));
            }
        }
        
        /**
         * Runs {@code operation} up to {@code maxRetries} times, doubling the
         * delay between attempts (exponential backoff). Only
         * {@code ExternalServiceException}s marked retryable trigger another
         * attempt; everything else propagates immediately.
         *
         * <p>NOTE(review): the bare rethrows below only compile for unchecked
         * exception types, since this method declares only
         * {@code RetriesExhaustedException} — confirm the hierarchy of
         * {@code ExternalServiceException} in the real codebase.
         *
         * @throws RetriesExhaustedException when all attempts fail, or when the
         *         backoff sleep is interrupted (the interrupt flag is restored first)
         */
        private String executeWithRetries(ExternalDataOperation operation) 
            throws RetriesExhaustedException {
            
            Exception lastException = null;
            long delay = retryDelay;
            
            for (int attempt = 1; attempt <= maxRetries; attempt++) {
                try {
                    String result = operation.execute();
                    
                    // Only record retry metrics when the first attempt had failed.
                    if (attempt > 1) {
                        metrics.increment("external_calls.retry_success");
                        metrics.gauge("external_calls.attempts_before_success", attempt);
                    }
                    
                    return result;
                    
                } catch (ExternalServiceException e) {
                    lastException = e;
                    
                    if (e.isRetryable()) {
                        metrics.increment("external_calls.retryable_failures");
                        LOG.warn("Retryable external service error on attempt {} of {}: {}", 
                            attempt, maxRetries, e.getMessage());
                        
                        // Sleep only if another attempt remains.
                        if (attempt < maxRetries) {
                            try {
                                Thread.sleep(delay);
                                delay *= 2; // Exponential backoff
                            } catch (InterruptedException ie) {
                                // Restore the interrupt flag, then give up early.
                                Thread.currentThread().interrupt();
                                throw new RetriesExhaustedException(attempt, e);
                            }
                        }
                    } else {
                        // Non-retryable external service error
                        throw e;
                    }
                    
                } catch (Exception e) {
                    // Other non-retryable exceptions
                    throw e;
                }
            }
            
            throw new RetriesExhaustedException(maxRetries, lastException);
        }
        
        /**
         * One call against the external service, throwing its checked
         * exception type so the retry driver can inspect retryability.
         */
        @FunctionalInterface
        private interface ExternalDataOperation {
            String execute() throws ExternalServiceException;
        }
        
        /**
         * Releases the external client connection when the task finishes.
         */
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            if (externalClient != null) {
                externalClient.close();
            }
        }
    }
}

Feature Flags

CDAP provides feature flag support for dynamic application behavior control.

Feature Flags Interface

import io.cdap.cdap.api.feature.*;

// Feature flags provider interface
public interface FeatureFlagsProvider {
    
    /**
     * Returns whether the named feature is enabled.
     *
     * @param featureName name of the feature flag to look up
     * @return {@code true} if the feature is enabled
     */
    boolean isFeatureEnabled(String featureName);
    
    /**
     * Flag lookup that never throws: any exception from the underlying lookup
     * is swallowed and {@code defaultValue} is returned instead.
     *
     * @param featureName  name of the feature flag to look up
     * @param defaultValue value to fall back to when the lookup fails
     * @return the flag's state, or {@code defaultValue} on lookup failure
     */
    default boolean isFeatureEnabled(String featureName, boolean defaultValue) {
        boolean enabled;
        try {
            enabled = isFeatureEnabled(featureName);
        } catch (Exception e) {
            enabled = defaultValue;
        }
        return enabled;
    }
}

Feature Flags Usage

// Service with feature-controlled behavior
public class FeatureControlledService extends AbstractHttpServiceHandler {
    
    /**
     * Returns the record for {@code dataId}. Three enrichments and the response
     * encoding are each gated by their own feature flag; note the per-flag
     * defaults differ (caching defaults on, the rest default off).
     */
    @GET
    @Path("/data/{id}")
    public void getData(HttpServiceRequest request, HttpServiceResponder responder,
                       @PathParam("id") String dataId) {
        
        FeatureFlagsProvider featureFlags = getContext();
        
        try {
            // Always start from the core record.
            JsonObject payload = retrieveBaseData(dataId);
            
            // Apply each optional enrichment only when its flag is on.
            if (featureFlags.isFeatureEnabled("enhanced_analytics", false)) {
                enhanceWithAnalytics(payload, dataId);
            }
            if (featureFlags.isFeatureEnabled("real_time_recommendations", false)) {
                addRealTimeRecommendations(payload, dataId);
            }
            if (featureFlags.isFeatureEnabled("advanced_caching", true)) {
                enableAdvancedCaching(payload, dataId);
            }
            
            // Choose the response encoding last, once the payload is complete.
            boolean compress = featureFlags.isFeatureEnabled("response_compression", false);
            if (compress) {
                sendCompressedResponse(responder, payload);
            } else {
                responder.sendJson(200, payload);
            }
            
        } catch (Exception e) {
            responder.sendError(500, "Error retrieving data: " + e.getMessage());
        }
    }
    
    /** Builds the core record: the id plus the current timestamp. */
    private JsonObject retrieveBaseData(String dataId) {
        JsonObject record = new JsonObject();
        record.addProperty("id", dataId);
        record.addProperty("timestamp", System.currentTimeMillis());
        return record;
    }
    
    /** Adds analytics fields; runs only when "enhanced_analytics" is enabled. */
    private void enhanceWithAnalytics(JsonObject data, String dataId) {
        data.addProperty("analytics_score", calculateAnalyticsScore(dataId));
        data.addProperty("trend_direction", getTrendDirection(dataId));
    }
    
    /** Attaches a recommendation list; runs only when "real_time_recommendations" is enabled. */
    private void addRealTimeRecommendations(JsonObject data, String dataId) {
        JsonArray recommendations = generateRecommendations(dataId);
        data.add("recommendations", recommendations);
    }
    
    /** Marks the payload as cached; runs only when "advanced_caching" is enabled. */
    private void enableAdvancedCaching(JsonObject data, String dataId) {
        data.addProperty("cached", true);
        data.addProperty("cache_key", generateCacheKey(dataId));
    }
}

The Operational APIs in CDAP provide comprehensive monitoring, scheduling, retry handling, and feature management capabilities essential for running enterprise-grade data applications in production environments.

Install with Tessl CLI

npx tessl i tessl/maven-io-cdap-cdap--cdap

docs

application-framework.md

data-management.md

data-processing.md

index.md

operational.md

plugin-system.md

security-metadata.md

tile.json