CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-dstl-dfs

Distributed file system-based changelog storage implementation for Apache Flink's streaming state backend.

Pending
Overview
Eval results
Files

docs/metrics-monitoring.md

Metrics and Monitoring

Comprehensive metrics collection for monitoring upload performance, failure rates, and system health in production environments. The metrics system provides visibility into changelog storage operations and helps with performance tuning.

Capabilities

ChangelogStorageMetricGroup

Main metrics container providing thread-safe collection of changelog storage performance data.

/**
 * Metrics related to the Changelog Storage used by the Changelog State Backend.
 * Thread-safe implementation for use by multiple uploader threads.
 *
 * <p>NOTE(review): this is a signature listing only. The method bodies are omitted, so the
 * declaration is not compilable as written, and the thread-safety claim above cannot be
 * verified from this listing.
 */
public class ChangelogStorageMetricGroup extends ProxyMetricGroup<MetricGroup> {
    
    /**
     * Creates the changelog storage metric group.
     *
     * @param parent Parent metric group for hierarchical organization
     */
    public ChangelogStorageMetricGroup(MetricGroup parent);
    
    /**
     * Records a successful upload operation.
     *
     * @param batchSize Number of change sets in the upload batch
     * @param uploadSize Total size of uploaded data in bytes
     * @param latencyNanos Upload latency in nanoseconds
     * @param attempts Number of attempts needed for successful upload
     * @param totalAttempts Total attempts including retries
     *     (NOTE(review): every example in this document passes the same value for
     *     {@code attempts} and {@code totalAttempts}; how the two differ is not defined
     *     here — confirm)
     */
    public void recordUpload(
        int batchSize,
        long uploadSize, 
        long latencyNanos,
        int attempts,
        int totalAttempts
    );
    
    /**
     * Records a failed upload operation.
     *
     * @param batchSize Number of change sets in the failed batch
     * @param uploadSize Total size of failed data in bytes
     * @param attempts Number of attempts made before failure
     */
    public void recordUploadFailure(int batchSize, long uploadSize, int attempts);
    
    /**
     * Updates the current in-flight data gauge.
     *
     * @param inFlightBytes Current amount of in-flight data in bytes
     */
    public void updateInFlightData(long inFlightBytes);
    
    /**
     * Records the current upload queue size.
     *
     * @param queueSize Current number of tasks in upload queue
     */
    public void updateQueueSize(int queueSize);
}

Core Metrics

The metric group provides several categories of metrics for comprehensive monitoring:

/**
 * Counter metrics for tracking upload operations.
 *
 * <p>NOTE(review): this is an illustrative field listing only. The {@code final} fields are
 * never initialized here, so the class does not compile as written. Presumably the counters
 * are created and registered by the metric group; confirm against the real implementation.
 */
public class CounterMetrics {
    /** Total number of upload requests initiated */
    private final Counter uploadsCounter;
    
    /** Total number of upload failures */
    private final Counter uploadFailuresCounter;
}

/**
 * Histogram metrics for tracking distributions and performance.
 *
 * <p>NOTE(review): this is an illustrative field listing only. The {@code final} fields are
 * never initialized here, so the class does not compile as written. Presumably the histograms
 * are created and registered by the metric group; confirm against the real implementation.
 */
public class HistogramMetrics {
    /** Distribution of batch sizes (number of change sets) in upload operations */
    private final Histogram uploadBatchSizes;
    
    /** Distribution of upload sizes in bytes */
    private final Histogram uploadSizes;
    
    /** Distribution of upload latencies in nanoseconds */
    private final Histogram uploadLatenciesNanos;
    
    /** Distribution of attempts per successful upload */
    private final Histogram attemptsPerUpload;
    
    /** Distribution of total attempts including failed uploads */
    private final Histogram totalAttemptsPerUpload;
}

/**
 * Gauge metrics for real-time status monitoring.
 *
 * <p>NOTE(review): this is an illustrative field listing only. The {@code final} fields are
 * never initialized here, so the class does not compile as written. These gauges presumably
 * expose the values passed to {@code updateInFlightData(long)} and {@code updateQueueSize(int)};
 * confirm against the real implementation.
 */
public class GaugeMetrics {
    /** Current amount of in-flight data in bytes */
    private final Gauge<Long> inFlightDataGauge;
    
    /** Current size of upload queue */
    private final Gauge<Integer> queueSizeGauge;
}

Metric Constants

Standard metric names for consistent reporting across Flink installations:

/**
 * Standard metric names for changelog storage.
 *
 * <p>This is a holder for constants only. The class is {@code final} and its constructor is
 * private, so it can be neither instantiated nor subclassed.
 */
public final class MetricNames {

    /** Counter: total number of upload requests. */
    public static final String CHANGELOG_STORAGE_NUM_UPLOAD_REQUESTS = "numUploadRequests";

    /** Counter: total number of upload failures. */
    public static final String CHANGELOG_STORAGE_NUM_UPLOAD_FAILURES = "numUploadFailures";

    /** Histogram: upload batch sizes (number of change sets per upload). */
    public static final String CHANGELOG_STORAGE_UPLOAD_BATCH_SIZES = "uploadBatchSizes";

    /** Histogram: upload sizes in bytes. */
    public static final String CHANGELOG_STORAGE_UPLOAD_SIZES = "uploadSizes";

    /** Histogram: upload latencies in nanoseconds. */
    public static final String CHANGELOG_STORAGE_UPLOAD_LATENCIES_NANOS = "uploadLatenciesNanos";

    /** Histogram: attempts per successful upload. */
    public static final String CHANGELOG_STORAGE_ATTEMPTS_PER_UPLOAD = "attemptsPerUpload";

    /** Histogram: total attempts including failures. */
    public static final String CHANGELOG_STORAGE_TOTAL_ATTEMPTS_PER_UPLOAD = "totalAttemptsPerUpload";

    /** Gauge: current in-flight data in bytes. */
    public static final String CHANGELOG_STORAGE_IN_FLIGHT_DATA = "inFlightData";

    /** Gauge: current upload queue size. */
    public static final String CHANGELOG_STORAGE_QUEUE_SIZE = "queueSize";

    /** Not instantiable: this class only holds constants. */
    private MetricNames() {
        // no instances
    }
}

Usage Examples:

import org.apache.flink.changelog.fs.ChangelogStorageMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;

// Create metrics group (typically done by storage)
// NOTE(review): taskManagerJobMetricGroup, batchSize, uploadSizeBytes, failedBatchSize,
// failedUploadSize, maxAttempts, currentInFlightBytes and currentQueueSize are placeholders
// that the surrounding code is assumed to define.
ChangelogStorageMetricGroup metricGroup = 
    new ChangelogStorageMetricGroup(taskManagerJobMetricGroup);

// Record successful upload
// Timing uses System.nanoTime() because recordUpload expects its latency in nanoseconds.
long startTime = System.nanoTime();
// ... perform upload ...
long endTime = System.nanoTime();

metricGroup.recordUpload(
    batchSize,           // Number of change sets uploaded
    uploadSizeBytes,     // Total bytes uploaded  
    endTime - startTime, // Upload latency in nanoseconds
    1,                   // Attempts needed (1 for success on first try)
    1                    // Total attempts made
);

// Record failed upload
metricGroup.recordUploadFailure(
    failedBatchSize,     // Number of change sets that failed
    failedUploadSize,    // Total bytes that failed to upload
    maxAttempts          // Number of attempts made before giving up
);

// Update real-time gauges
// These calls overwrite the current gauge values rather than accumulating them.
metricGroup.updateInFlightData(currentInFlightBytes);
metricGroup.updateQueueSize(currentQueueSize);

Integration with Upload System

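Upload components record metrics as they process batches. The example below shows how the batching upload scheduler reports queue size, successful uploads, and failures: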

// Upload scheduler records metrics during operation
public class BatchingStateChangeUploadScheduler implements StateChangeUploadScheduler {

    /**
     * Uploads {@code tasks} in a single upload call and records the outcome in the changelog
     * storage metrics.
     *
     * <p>Only the upload call is guarded by the {@code try}. Metrics for a completed upload are
     * recorded afterwards. An exception thrown while recording them is therefore not caught and
     * misreported as a failure of the whole batch, which would double-count tasks already
     * recorded as successful or failed.
     *
     * @param tasks tasks to upload together; every task in the batch is attributed the latency
     *     of the single upload call
     */
    private void executeUpload(Collection<UploadTask> tasks) {
        int totalBatchSize = tasks.stream().mapToInt(task -> task.getChangeSets().size()).sum();
        long totalSize = tasks.stream().mapToLong(task -> task.getTotalSize()).sum();

        // Update queue size before upload
        metricGroup.updateQueueSize(pendingTasks.size());

        long startTime = System.nanoTime();
        UploadTasksResult result;
        try {
            result = uploader.upload(tasks);
        } catch (Exception e) {
            // The upload call itself failed: record every task in the batch as a failure.
            // NOTE(review): the exception is neither rethrown nor handed to the tasks here.
            // Confirm that the tasks are failed elsewhere; otherwise this silently drops the error.
            metricGroup.recordUploadFailure(totalBatchSize, totalSize, maxRetryAttempts);
            return;
        }
        long endTime = System.nanoTime();

        // Record successful uploads
        result.getSuccessful().forEach((task, uploadResults) -> {
            metricGroup.recordUpload(
                task.getChangeSets().size(),
                task.getTotalSize(),
                endTime - startTime,
                1, // NOTE(review): hard-coded; the real attempt count is not available here
                1
            );
        });

        // Record failed uploads
        result.getFailed().forEach((task, exception) -> {
            metricGroup.recordUploadFailure(
                task.getChangeSets().size(),
                task.getTotalSize(),
                maxRetryAttempts
            );
        });
    }
}

Retry Metrics Integration

The metrics system tracks retry behavior and helps tune retry policies:

// RetryingExecutor integration with metrics
public class RetryingExecutor {

    /**
     * Runs {@code operation}, retrying failed attempts according to {@code retryPolicy}, and
     * records the outcome in the changelog storage metrics.
     *
     * <p>At most {@code maxAttempts} attempts are made. After a failed attempt (other than the
     * last permitted one), {@code retryPolicy.retryAfter} is consulted:
     * <ul>
     *   <li>A negative delay stops retrying.
     *   <li>Otherwise the executor sleeps for that many milliseconds, as consumed by
     *       {@link Thread#sleep(long)}, before the next attempt.
     * </ul>
     * No delay is applied after the final permitted attempt.
     *
     * <p>The success metric is recorded outside the {@code try}. An exception thrown while
     * recording it is therefore not mistaken for an operation failure, which would re-run an
     * operation that has already succeeded.
     *
     * @param operation the action to run
     * @param retryPolicy decides whether, and after what delay, a failed attempt is retried
     * @return the result of the first successful attempt
     * @throws IllegalStateException if {@code maxAttempts} is less than 1
     * @throws InterruptedException if interrupted while waiting between attempts; the last
     *     operation failure is attached as a suppressed exception
     * @throws Exception the exception of the last failed attempt when no attempt succeeds
     */
    public <T> T execute(Callable<T> operation, RetryPolicy retryPolicy) throws Exception {
        if (maxAttempts < 1) {
            // Without this guard the loop never runs and the method would execute `throw null`.
            throw new IllegalStateException(
                    "maxAttempts must be at least 1, but was " + maxAttempts);
        }
        // int, not long: the metric methods take int attempt counts. Every attempt is counted,
        // so "attempts needed" and "total attempts" are the same number here.
        int attempts = 0;
        Exception lastException;

        while (true) {
            attempts++;
            long startTime = System.nanoTime();
            T result;
            try {
                result = operation.call();
            } catch (Exception e) {
                lastException = e;
                if (attempts >= maxAttempts) {
                    // Out of attempts: do not wait for a retry that will never happen.
                    break;
                }
                long retryDelay = retryPolicy.retryAfter(attempts, e);
                if (retryDelay < 0) {
                    // No more retries
                    break;
                }
                try {
                    // Sleep before retry
                    Thread.sleep(retryDelay);
                } catch (InterruptedException interrupted) {
                    interrupted.addSuppressed(e);
                    metricGroup.recordUploadFailure(batchSize, uploadSize, attempts);
                    throw interrupted;
                }
                continue;
            }
            // Latency covers only the successful attempt, not earlier failures or back-off.
            long latencyNanos = System.nanoTime() - startTime;
            metricGroup.recordUpload(batchSize, uploadSize, latencyNanos, attempts, attempts);
            return result;
        }

        // Record failure with total attempts
        metricGroup.recordUploadFailure(batchSize, uploadSize, attempts);
        throw lastException;
    }
}

Monitoring Dashboard Integration

The metrics can be integrated with monitoring dashboards and alerting systems:

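// Example metric queries for monitoring systems (illustrative only; adapt the metric names, which your reporter may prefix with a scope, and the query syntax to your metrics backend):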

// Upload success rate
// (sum(rate(numUploadRequests[5m])) - sum(rate(numUploadFailures[5m]))) / sum(rate(numUploadRequests[5m]))

// Median (P50) upload latency
// histogram_quantile(0.5, uploadLatenciesNanos)

// P99 upload latency
// histogram_quantile(0.99, uploadLatenciesNanos)

// Current backpressure status
// inFlightData > in_flight_data_limit_threshold

// Queue buildup
// queueSize > queue_size_threshold

Performance Analysis

Use metrics for performance analysis and optimization:

/**
 * Metrics analysis for performance tuning.
 *
 * <p>Checks run in priority order: failure rate, then tail latency, then batch size. Only the
 * first problem detected produces a recommendation.
 */
public class PerformanceAnalysis {

    /** Failure rate (0.05 = 5%) above which more retries and a longer timeout are recommended. */
    private static final double HIGH_FAILURE_RATE = 0.05;

    /** P99 upload latency above which more upload threads and a larger buffer are recommended. */
    private static final long HIGH_P99_LATENCY_NANOS = Duration.ofSeconds(10).toNanos();

    /** Average batch size below which a longer persist delay and a larger threshold are recommended. */
    private static final double SMALL_AVERAGE_BATCH_SIZE = 5;

    /**
     * Analyzes upload performance metrics.
     *
     * @param metricGroup Metrics to analyze (NOTE(review): currently never read, because the
     *     {@code calculate*} helpers take no arguments; confirm where they get their data)
     * @return recommendations for the first detected problem, or
     *     {@code PerformanceRecommendations.optimal()} when none is detected
     */
    public PerformanceRecommendations analyze(ChangelogStorageMetricGroup metricGroup) {
        // Analyze upload patterns
        double failureRate = calculateFailureRate();
        double p99Latency = calculateP99Latency();
        // double, not long: an average batch size is generally fractional, and truncating it
        // would hide values such as 4.9 versus 5.
        double averageBatchSize = calculateAverageBatchSize();

        if (failureRate > HIGH_FAILURE_RATE) {
            // High failure rate - increase retry attempts or timeout
            return new PerformanceRecommendations()
                .increaseRetryAttempts()
                .increaseUploadTimeout();
        }

        if (p99Latency > HIGH_P99_LATENCY_NANOS) {
            // High tail latency - increase parallelism or buffer size
            return new PerformanceRecommendations()
                .increaseUploadThreads()
                .increaseBufferSize();
        }

        if (averageBatchSize < SMALL_AVERAGE_BATCH_SIZE) {
            // Small batches - increase batching delay or threshold
            return new PerformanceRecommendations()
                .increasePersistDelay()
                .increasePersistSizeThreshold();
        }

        return PerformanceRecommendations.optimal();
    }
}

Alerting and Monitoring

Set up alerts based on key metrics:

/**
 * Alerting thresholds for changelog storage monitoring, together with a check that evaluates
 * a {@code MetricSnapshot} against them.
 */
public class MonitoringThresholds {

    /** Upload failure rate (5%) above which an alert is raised. */
    public static final double MAX_FAILURE_RATE = 0.05;

    /** P99 upload latency of 30 seconds, in nanoseconds, above which an alert is raised. */
    public static final long MAX_P99_LATENCY_NANOS = Duration.ofSeconds(30).toNanos();

    /**
     * In-flight data ratio above which a backpressure warning is raised
     * (presumably in-flight bytes divided by the configured limit; confirm against MetricSnapshot).
     */
    public static final double IN_FLIGHT_DATA_WARNING_RATIO = 0.8;

    /** Upload queue length above which a backlog alert is raised. */
    public static final int MAX_QUEUE_SIZE = 1000;

    /**
     * Compares a metric snapshot with each threshold and collects the resulting alerts.
     *
     * <p>Alerts are added in a fixed order: failure rate, latency, in-flight data, queue size.
     * Every comparison is strict, so a value exactly equal to its threshold does not alert.
     *
     * @param metrics Current metric values
     * @return a new mutable list with one alert per exceeded threshold, empty if none is exceeded
     */
    public List<Alert> checkAlerts(MetricSnapshot metrics) {
        final List<Alert> active = new ArrayList<>();

        // Upload failures
        if (MAX_FAILURE_RATE < metrics.getFailureRate()) {
            active.add(Alert.highFailureRate(metrics.getFailureRate()));
        }

        // Tail latency
        if (MAX_P99_LATENCY_NANOS < metrics.getP99LatencyNanos()) {
            active.add(Alert.highLatency(metrics.getP99LatencyNanos()));
        }

        // In-flight data approaching its limit
        if (IN_FLIGHT_DATA_WARNING_RATIO < metrics.getInFlightDataRatio()) {
            active.add(Alert.backpressureWarning(metrics.getInFlightDataRatio()));
        }

        // Upload queue backlog
        if (MAX_QUEUE_SIZE < metrics.getQueueSize()) {
            active.add(Alert.queueBacklog(metrics.getQueueSize()));
        }

        return active;
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-dstl-dfs

docs

changelog-writers.md

configuration-options.md

index.md

metrics-monitoring.md

recovery-system.md

storage-factory.md

storage-implementation.md

upload-system.md

tile.json