
tessl/maven-org-apache-flink--flink-metrics-core

Core metrics interfaces and implementations for Apache Flink stream processing framework


Metric Implementations

Concrete implementations of metric interfaces providing both thread-safe and performance-optimized variants. These implementations offer ready-to-use metric types with different performance characteristics for various use cases.

Capabilities

SimpleCounter Implementation

Basic non-thread-safe counter implementation optimized for single-threaded scenarios with minimal overhead.

/**
 * A simple low-overhead Counter that is not thread-safe.
 */
@Internal
public class SimpleCounter implements Counter {
    
    /** Increment the current count by 1. */
    @Override
    public void inc() { /* implementation */ }
    
    /**
     * Increment the current count by the given value.
     * @param n value to increment the current count by
     */
    @Override
    public void inc(long n) { /* implementation */ }
    
    /** Decrement the current count by 1. */
    @Override
    public void dec() { /* implementation */ }
    
    /**
     * Decrement the current count by the given value.
     * @param n value to decrement the current count by
     */
    @Override
    public void dec(long n) { /* implementation */ }
    
    /**
     * Returns the current count.
     * @return current count
     */
    @Override
    public long getCount() { /* implementation */ }
}

Usage Examples:

// Create simple counter for single-threaded use
Counter simpleCounter = new SimpleCounter();

// Basic operations
simpleCounter.inc();           // Fast increment
simpleCounter.inc(10);         // Fast bulk increment
simpleCounter.dec();           // Fast decrement
simpleCounter.dec(5);          // Fast bulk decrement

long count = simpleCounter.getCount(); // Fast read

// Use case: single-threaded operators
public class SingleThreadedOperator {
    private final Counter processedRecords = new SimpleCounter();
    
    public void processRecord(Record record) {
        // Process record...
        processedRecords.inc(); // No synchronization overhead
    }
}
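To make the basic operations above concrete, here is a minimal sketch that replays them against a plain `long` field. `PlainCounter` and `PlainCounterDemo` are hypothetical stand-ins written for this example so that it runs without Flink on the classpath; they illustrate the idea of an unsynchronized counter and are not Flink's `SimpleCounter` source.

```java
// Hypothetical stand-in with the same five operations; not Flink's SimpleCounter source.
final class PlainCounter {
    private long count; // plain field: no atomics, no locks

    void inc() { count++; }
    void inc(long n) { count += n; }
    void dec() { count--; }
    void dec(long n) { count -= n; }
    long getCount() { return count; }
}

final class PlainCounterDemo {
    // Replays the basic operations from the usage example and returns the result.
    static long replayBasicOperations() {
        PlainCounter counter = new PlainCounter();
        counter.inc();    // count is 1
        counter.inc(10);  // count is 11
        counter.dec();    // count is 10
        counter.dec(5);   // count is 5
        return counter.getCount();
    }
}
```

Because `count++` is a read-modify-write on an ordinary field, concurrent callers of a counter like this can lose updates, which is why it only suits single-threaded use.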

ThreadSafeSimpleCounter Implementation

Thread-safe counter implementation backed by java.util.concurrent.atomic.LongAdder, suitable for counters that are updated from multiple threads.

/**
 * A simple low-overhead Counter that is thread-safe.
 */
@Internal
public class ThreadSafeSimpleCounter implements Counter {
    
    /** Increment the current count by 1. */
    @Override
    public void inc() { /* uses LongAdder.increment() */ }
    
    /**
     * Increment the current count by the given value.
     * @param n value to increment the current count by
     */
    @Override
    public void inc(long n) { /* uses LongAdder.add(n) */ }
    
    /** Decrement the current count by 1. */
    @Override 
    public void dec() { /* uses LongAdder.decrement() */ }
    
    /**
     * Decrement the current count by the given value.
     * @param n value to decrement the current count by
     */
    @Override
    public void dec(long n) { /* uses LongAdder.add(-n) */ }
    
    /**
     * Returns the current count.
     * @return current count
     */
    @Override
    public long getCount() { /* uses LongAdder.longValue() */ }
}

Usage Examples:

// Create thread-safe counter for multi-threaded use
Counter threadSafeCounter = new ThreadSafeSimpleCounter();

// Safe for concurrent access
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 100; i++) {
    executor.submit(() -> {
        threadSafeCounter.inc(); // Thread-safe increment
    });
}
executor.shutdown(); // Stop accepting new tasks; already submitted increments still run

// Use case: shared counters across multiple threads
public class MultiThreadedProcessor {
    private final Counter totalProcessed = new ThreadSafeSimpleCounter();
    
    // Called from multiple threads
    public void processInParallel(List<Record> records) {
        records.parallelStream().forEach(record -> {
            processRecord(record);
            totalProcessed.inc(); // Safe concurrent increment
        });
    }
}
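The delegation to `LongAdder` described in the class outline can be sketched in a self-contained form. `AdderBackedCounter` and `AdderBackedCounterDemo` are hypothetical names used only here; this is an illustration under the assumption that each method forwards to the matching `LongAdder` call, not a copy of Flink's implementation.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for a LongAdder-backed counter; not Flink's source.
final class AdderBackedCounter {
    private final LongAdder adder = new LongAdder();

    void inc() { adder.increment(); }
    void inc(long n) { adder.add(n); }
    void dec() { adder.decrement(); }
    void dec(long n) { adder.add(-n); }
    long getCount() { return adder.longValue(); }
}

final class AdderBackedCounterDemo {
    // Runs `threads` workers that each increment `perThread` times and
    // returns the final count once all workers have finished.
    static long countWith(int threads, int perThread) {
        AdderBackedCounter counter = new AdderBackedCounter();
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            executor.submit(() -> {
                for (int i = 0; i < perThread; i++) {
                    counter.inc();
                }
            });
        }
        executor.shutdown();
        try {
            if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter.getCount();
    }
}
```

Reading the count only after `awaitTermination` returns matters: `LongAdder.longValue()` is not an atomic snapshot while updates are still in flight, so a read taken mid-run may miss some concurrent increments.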

MeterView Implementation

Meter implementation that provides average rate calculations over a specified time window using a circular buffer approach.

/**
 * A MeterView provides an average rate of events per second over a given time period.
 * The primary advantage is that the rate is neither updated by the computing thread 
 * nor for every event. Instead, a history of counts is maintained that is updated 
 * in regular intervals by a background thread.
 */
@Internal
public class MeterView implements Meter, View {
    
    /**
     * Creates a MeterView with specified time span.
     * @param timeSpanInSeconds time span over which to calculate average
     */
    public MeterView(int timeSpanInSeconds) { /* implementation */ }
    
    /**
     * Creates a MeterView with a custom counter and default time span.
     * @param counter the underlying counter
     */
    public MeterView(Counter counter) { /* implementation */ }
    
    /**
     * Creates a MeterView with custom counter and time span.
     * @param counter the underlying counter
     * @param timeSpanInSeconds time span over which to calculate average
     */
    public MeterView(Counter counter, int timeSpanInSeconds) { /* implementation */ }
    
    /**
     * Creates a MeterView from a gauge that returns numeric values.
     * @param numberGauge gauge providing numeric values
     */
    public MeterView(Gauge<? extends Number> numberGauge) { /* implementation */ }
    
    /** Mark occurrence of an event. */
    @Override
    public void markEvent() { /* delegates to counter.inc() */ }
    
    /**
     * Mark occurrence of multiple events.
     * @param n number of events occurred
     */
    @Override
    public void markEvent(long n) { /* delegates to counter.inc(n) */ }
    
    /**
     * Returns the current rate of events per second.
     * @return current rate of events per second
     */
    @Override
    public double getRate() { /* returns calculated rate */ }
    
    /**
     * Get number of events marked on the meter.
     * @return number of events marked on the meter
     */
    @Override
    public long getCount() { /* delegates to counter.getCount() */ }
    
    /**
     * Called periodically to update the rate calculation.
     * This is part of the View interface.
     */
    @Override
    public void update() { /* updates internal rate calculation */ }
}

Usage Examples:

// Create meter with a 60-second window
MeterView throughputMeter = new MeterView(60);

// Create meter with custom counter
Counter customCounter = new ThreadSafeSimpleCounter();
MeterView customMeter = new MeterView(customCounter, 30); // 30-second window

// Create meter from gauge
Gauge<Long> queueSizeGauge = () -> (long) messageQueue.size(); // cast needed: an int cannot be boxed to Long
MeterView queueGrowthRate = new MeterView(queueSizeGauge);

// Usage in streaming context
public class StreamProcessor {
    private final MeterView processingRate = new MeterView(30);
    
    public void processMessage(Message message) {
        // Process the message...
        
        if (message.isBatch()) {
            // Count every element of the batch in one call
            processingRate.markEvent(message.getBatchSize());
        } else {
            // Count a single event; the rate itself is recomputed in update()
            processingRate.markEvent();
        }
    }
    
    public void reportMetrics() {
        double currentRate = processingRate.getRate(); // events/second
        long totalEvents = processingRate.getCount();  // total processed
        
        System.out.printf("Processing %.2f events/sec, %d total%n", 
                         currentRate, totalEvents);
    }
}

// Integration with View update system
public class MetricUpdater {
    private final List<View> views = new ArrayList<>();
    private final ScheduledExecutorService scheduler = 
        Executors.newScheduledThreadPool(1);
    
    public void registerView(View view) {
        views.add(view);
    }
    
    public void startUpdating() {
        scheduler.scheduleAtFixedRate(
            () -> views.forEach(View::update),
            0, 
            View.UPDATE_INTERVAL_SECONDS, 
            TimeUnit.SECONDS
        );
    }
}
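The circular-buffer idea behind a windowed rate can be sketched without Flink on the classpath. `WindowedRate` below is a hypothetical class written for this example: it assumes one count snapshot is stored per `update()` call, that `update()` runs every 5 seconds, and that the rate is the difference between the newest and oldest snapshot divided by the time span. Treat it as an illustration of the technique rather than as Flink's `MeterView` source.

```java
// Hypothetical sketch of a circular-buffer rate calculation; not Flink's source.
final class WindowedRate {
    // Assumed interval between update() calls, mirroring View.UPDATE_INTERVAL_SECONDS.
    private static final int UPDATE_INTERVAL_SECONDS = 5;

    private final int timeSpanInSeconds;
    private final long[] snapshots; // circular buffer of cumulative counts
    private int newest = 0;         // index of the most recent snapshot
    private long count = 0;         // cumulative number of marked events
    private double currentRate = 0.0;

    WindowedRate(int timeSpanInSeconds) {
        // Round the span down to a multiple of the interval, with one interval as the minimum.
        this.timeSpanInSeconds = Math.max(
                timeSpanInSeconds - (timeSpanInSeconds % UPDATE_INTERVAL_SECONDS),
                UPDATE_INTERVAL_SECONDS);
        // One extra slot so the oldest and newest snapshots are a full span apart.
        this.snapshots = new long[this.timeSpanInSeconds / UPDATE_INTERVAL_SECONDS + 1];
    }

    void markEvent(long n) { count += n; } // hot path: only a counter bump

    // Called once per interval by a background thread (here: by hand).
    void update() {
        newest = (newest + 1) % snapshots.length;
        snapshots[newest] = count;
        long oldest = snapshots[(newest + 1) % snapshots.length];
        currentRate = (double) (snapshots[newest] - oldest) / timeSpanInSeconds;
    }

    double getRate() { return currentRate; }
    int slots() { return snapshots.length; }
}
```

With a 10-second span the buffer has three slots. Marking 100 events and calling `update()` yields a rate of 10.0; marking 50 more and updating yields 15.0; a third update with no new events yields 5.0, because the first 100 events have now left the window. Note that the rate only changes when `update()` runs, never on `markEvent`.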

View Interface for Background Updates

Interface for metrics that require periodic background updates, such as time-windowed calculations.

/**
 * An interface for metrics which should be updated in regular intervals 
 * by a background thread.
 */
public interface View {
    /** The interval in which metrics are updated. */
    int UPDATE_INTERVAL_SECONDS = 5;
    
    /** This method will be called regularly to update the metric. */
    void update();
}

Usage Examples:

// Custom view implementation
public class MovingAverageGauge implements Gauge<Double>, View {
    private final Queue<Double> values = new LinkedList<>();
    private final int windowSize;
    private volatile double currentAverage = 0.0; // written by update(), read by getValue() on another thread
    
    public MovingAverageGauge(int windowSize) {
        this.windowSize = windowSize;
    }
    
    public void addValue(double value) {
        synchronized (values) {
            values.offer(value);
            if (values.size() > windowSize) {
                values.poll();
            }
        }
    }
    
    @Override
    public Double getValue() {
        return currentAverage;
    }
    
    @Override
    public void update() {
        synchronized (values) {
            if (!values.isEmpty()) {
                currentAverage = values.stream()
                    .mapToDouble(Double::doubleValue)
                    .average()
                    .orElse(0.0);
            }
        }
    }
}

// Register view for automatic updates
MovingAverageGauge avgLatency = new MovingAverageGauge(100);
metricGroup.gauge("average-latency", avgLatency);

// The Flink metrics system will automatically call update() every 5 seconds
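One consequence of the update contract is that a reader sees the value computed by the last `update()` call, not the latest input. The sketch below shows this with hand-driven updates. `PeriodicView` and `SnapshotAverage` are hypothetical stand-ins for the `View` interface and the gauge above, defined locally so the example is self-contained.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for the View interface.
interface PeriodicView {
    void update();
}

// Hypothetical moving-average metric whose published value changes only in update().
final class SnapshotAverage implements PeriodicView {
    private final Deque<Double> values = new ArrayDeque<>();
    private final int windowSize;
    // volatile so a reader thread sees the value published by the updater thread
    private volatile double currentAverage = 0.0;

    SnapshotAverage(int windowSize) {
        this.windowSize = windowSize;
    }

    synchronized void addValue(double value) {
        values.addLast(value);
        if (values.size() > windowSize) {
            values.removeFirst(); // drop the oldest sample
        }
    }

    double getValue() {
        return currentAverage; // last published snapshot, no locking needed
    }

    @Override
    public synchronized void update() {
        if (!values.isEmpty()) {
            double sum = 0.0;
            for (double v : values) {
                sum += v;
            }
            currentAverage = sum / values.size();
        }
    }
}
```

With a window of two, adding 10 and 20 leaves `getValue()` at 0.0 until `update()` runs, after which it reports 15.0. Adding 40 evicts the 10, and the next `update()` publishes 30.0. Readers may therefore lag the inputs by up to one update interval.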

Performance Characteristics

Understanding when to use each implementation:

SimpleCounter:

  • Use when: Single-threaded access, maximum performance needed
  • Performance: Fastest, no synchronization overhead
  • Thread safety: Not thread-safe
  • Memory: Minimal memory footprint

ThreadSafeSimpleCounter:

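  • Use when: The counter is updated from more than one thread
  • Performance: Somewhat slower than SimpleCounter on a single thread; LongAdder keeps throughput high when many threads update concurrently
  • Thread safety: Thread-safe using LongAdder
  • Memory: Small, though LongAdder may allocate additional internal cells under contention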

MeterView:

  • Use when: Need rate calculations over time
  • Performance: Background calculation, minimal impact on hot path
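  • Thread safety: Event marking delegates to the underlying counter, so it is only as thread-safe as that counter; pass a ThreadSafeSimpleCounter when marking from several threads
  • Memory: Memory usage scales with time window size

// Performance comparison example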
public class CounterPerformanceTest {
    
    @Test
    public void singleThreadedPerformance() {
        Counter simple = new SimpleCounter();
        Counter threadSafe = new ThreadSafeSimpleCounter();
        
        // SimpleCounter is faster for single-threaded use
        long start = System.nanoTime();
        for (int i = 0; i < 1_000_000; i++) {
            simple.inc();
        }
        long simpleTime = System.nanoTime() - start;
        
        start = System.nanoTime();
        for (int i = 0; i < 1_000_000; i++) {
            threadSafe.inc();
        }
        long threadSafeTime = System.nanoTime() - start;
        
        // Expect simpleTime < threadSafeTime, but a single un-warmed run is only
        // indicative; use a benchmark harness such as JMH for real measurements
    }
    
    @Test
    public void multiThreadedPerformance() {
        Counter threadSafe = new ThreadSafeSimpleCounter();
        
        // ThreadSafeSimpleCounter scales well with multiple threads
        ExecutorService executor = Executors.newFixedThreadPool(8);
        
        long start = System.nanoTime();
        List<Future<?>> futures = new ArrayList<>();
        for (int thread = 0; thread < 8; thread++) {
            futures.add(executor.submit(() -> {
                for (int i = 0; i < 125_000; i++) { // 1M total
                    threadSafe.inc();
                }
            }));
        }
        
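        futures.forEach(f -> {
            // Rethrow so a failed worker fails the test instead of being swallowed
            try { f.get(); } catch (Exception e) { throw new RuntimeException(e); }
        });
        long multiThreadTime = System.nanoTime() - start;
        executor.shutdown(); // Release the worker threads
        
        assertEquals(1_000_000L, threadSafe.getCount());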
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-metrics-core
