Core metrics interfaces and implementations for Apache Flink stream processing framework
—
Concrete implementations of metric interfaces providing both thread-safe and performance-optimized variants. These implementations offer ready-to-use metric types with different performance characteristics for various use cases.
Basic non-thread-safe counter implementation optimized for single-threaded scenarios with minimal overhead.
/**
* A simple low-overhead Counter that is not thread-safe.
*/
@Internal
public class SimpleCounter implements Counter {
/** Increment the current count by 1. */
@Override
public void inc() { /* implementation */ }
/**
* Increment the current count by the given value.
* @param n value to increment the current count by
*/
@Override
public void inc(long n) { /* implementation */ }
/** Decrement the current count by 1. */
@Override
public void dec() { /* implementation */ }
/**
* Decrement the current count by the given value.
* @param n value to decrement the current count by
*/
@Override
public void dec(long n) { /* implementation */ }
/**
* Returns the current count.
* @return current count
*/
@Override
public long getCount() { /* implementation */ }
}

Usage Examples:
// Create simple counter for single-threaded use
Counter simpleCounter = new SimpleCounter();
// Basic operations
simpleCounter.inc(); // Fast increment
simpleCounter.inc(10); // Fast bulk increment
simpleCounter.dec(); // Fast decrement
simpleCounter.dec(5); // Fast bulk decrement
long count = simpleCounter.getCount(); // Fast read
// Use case: single-threaded operators
public class SingleThreadedOperator {
private final Counter processedRecords = new SimpleCounter();
public void processRecord(Record record) {
// Process record...
processedRecords.inc(); // No synchronization overhead
}
}

Thread-safe counter implementation using atomic operations, suitable for multi-threaded scenarios.
/**
* A simple low-overhead Counter that is thread-safe.
*/
@Internal
public class ThreadSafeSimpleCounter implements Counter {
/** Increment the current count by 1. */
@Override
public void inc() { /* uses LongAdder.increment() */ }
/**
* Increment the current count by the given value.
* @param n value to increment the current count by
*/
@Override
public void inc(long n) { /* uses LongAdder.add(n) */ }
/** Decrement the current count by 1. */
@Override
public void dec() { /* uses LongAdder.decrement() */ }
/**
* Decrement the current count by the given value.
* @param n value to decrement the current count by
*/
@Override
public void dec(long n) { /* uses LongAdder.add(-n) */ }
/**
* Returns the current count.
* @return current count
*/
@Override
public long getCount() { /* uses LongAdder.longValue() */ }
}

Usage Examples:
// Create thread-safe counter for multi-threaded use
Counter threadSafeCounter = new ThreadSafeSimpleCounter();
// Safe for concurrent access: 100 tasks each increment the shared counter once
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 100; i++) {
    executor.submit(() -> {
        threadSafeCounter.inc(); // Thread-safe increment
    });
}
// Stop accepting new tasks; already-submitted tasks still run to completion.
// Without this the pool's non-daemon worker threads keep the JVM alive.
executor.shutdown();
// Use case: shared counters across multiple threads
public class MultiThreadedProcessor {
private final Counter totalProcessed = new ThreadSafeSimpleCounter();
// Called from multiple threads
public void processInParallel(List<Record> records) {
records.parallelStream().forEach(record -> {
processRecord(record);
totalProcessed.inc(); // Safe concurrent increment
});
}
}

Meter implementation that provides average rate calculations over a specified time window using a circular buffer approach.
/**
* A MeterView provides an average rate of events per second over a given time period.
* The primary advantage is that the rate is neither updated by the computing thread
* nor for every event. Instead, a history of counts is maintained that is updated
* in regular intervals by a background thread.
*/
@Internal
public class MeterView implements Meter, View {
/**
* Creates a MeterView with specified time span.
* @param timeSpanInSeconds time span over which to calculate average
*/
public MeterView(int timeSpanInSeconds) { /* implementation */ }
/**
* Creates a MeterView with a custom counter and default time span.
* @param counter the underlying counter
*/
public MeterView(Counter counter) { /* implementation */ }
/**
* Creates a MeterView with custom counter and time span.
* @param counter the underlying counter
* @param timeSpanInSeconds time span over which to calculate average
*/
public MeterView(Counter counter, int timeSpanInSeconds) { /* implementation */ }
/**
* Creates a MeterView from a gauge that returns numeric values.
* @param numberGauge gauge providing numeric values
*/
public MeterView(Gauge<? extends Number> numberGauge) { /* implementation */ }
/** Mark occurrence of an event. */
@Override
public void markEvent() { /* delegates to counter.inc() */ }
/**
* Mark occurrence of multiple events.
* @param n number of events occurred
*/
@Override
public void markEvent(long n) { /* delegates to counter.inc(n) */ }
/**
* Returns the current rate of events per second.
* @return current rate of events per second
*/
@Override
public double getRate() { /* returns calculated rate */ }
/**
* Get number of events marked on the meter.
* @return number of events marked on the meter
*/
@Override
public long getCount() { /* delegates to counter.getCount() */ }
/**
* Called periodically to update the rate calculation.
* This is part of the View interface.
*/
@Override
public void update() { /* updates internal rate calculation */ }
}

Usage Examples:
// Create meter with an explicit 60-second window
MeterView throughputMeter = new MeterView(60);
// Create meter with custom counter
Counter customCounter = new ThreadSafeSimpleCounter();
MeterView customMeter = new MeterView(customCounter, 30); // 30-second window
// Create meter from gauge.
// The cast is required: assuming size() returns int (as java.util collections do),
// an int lambda result cannot be boxed to Long and the snippet would not compile.
Gauge<Long> queueSizeGauge = () -> (long) messageQueue.size();
MeterView queueGrowthRate = new MeterView(queueSizeGauge);
// Usage in streaming context
/**
 * Example operator that tracks its processing throughput with a {@link MeterView}
 * averaged over a 30-second time span.
 */
public class StreamProcessor {
    private final MeterView processingRate = new MeterView(30);

    /**
     * Processes one message and records how many events it represents.
     *
     * @param message the message to process; a batch message contributes
     *                {@code getBatchSize()} events, any other message contributes one
     */
    public void processMessage(Message message) {
        // Process the message...

        // markEvent only increments the underlying count; the rate itself is
        // recomputed when update() is called on the meter.
        // Exactly one branch runs so a batch is not counted as batchSize + 1 events.
        if (message.isBatch()) {
            processingRate.markEvent(message.getBatchSize());
        } else {
            processingRate.markEvent();
        }
    }

    /** Prints the current rate (events per second) and the total event count. */
    public void reportMetrics() {
        double currentRate = processingRate.getRate(); // events/second
        long totalEvents = processingRate.getCount(); // total processed
        System.out.printf("Processing %.2f events/sec, %d total%n",
                currentRate, totalEvents);
    }
}
// Integration with View update system
public class MetricUpdater {
private final List<View> views = new ArrayList<>();
private final ScheduledExecutorService scheduler =
Executors.newScheduledThreadPool(1);
public void registerView(View view) {
views.add(view);
}
public void startUpdating() {
scheduler.scheduleAtFixedRate(
() -> views.forEach(View::update),
0,
View.UPDATE_INTERVAL_SECONDS,
TimeUnit.SECONDS
);
}
}

Interface for metrics that require periodic background updates, such as time-windowed calculations.
/**
* An interface for metrics which should be updated in regular intervals
* by a background thread.
*/
public interface View {
/** The interval in which metrics are updated. */
int UPDATE_INTERVAL_SECONDS = 5;
/** This method will be called regularly to update the metric. */
void update();
}

Usage Examples:
// Custom view implementation
/**
 * Gauge that reports the average of the most recently added values, keeping at
 * most {@code windowSize} of them.
 *
 * <p>The average is not recomputed on every {@link #addValue(double)} call; it is
 * refreshed only when {@link #update()} runs, and {@link #getValue()} returns the
 * result of the last refresh.
 */
public class MovingAverageGauge implements Gauge<Double>, View {
    // Guarded by synchronized (values) in addValue() and update().
    private final Queue<Double> values = new LinkedList<>();
    private final int windowSize;
    // volatile: written inside the lock in update() but read without a lock in
    // getValue(), so the reader needs a visibility guarantee of its own.
    private volatile double currentAverage = 0.0;

    /**
     * @param windowSize maximum number of values retained for averaging
     */
    public MovingAverageGauge(int windowSize) {
        this.windowSize = windowSize;
    }

    /**
     * Adds a value to the window, evicting the oldest one once the window is full.
     *
     * @param value the value to add
     */
    public void addValue(double value) {
        synchronized (values) {
            values.offer(value);
            if (values.size() > windowSize) {
                values.poll();
            }
        }
    }

    /**
     * @return the average computed by the most recent {@link #update()} call,
     *         or 0.0 if no average has been computed yet
     */
    @Override
    public Double getValue() {
        return currentAverage;
    }

    /**
     * Recomputes the average over the retained values. When no values have been
     * added, the previously reported average is left unchanged.
     */
    @Override
    public void update() {
        synchronized (values) {
            if (!values.isEmpty()) {
                currentAverage = values.stream()
                        .mapToDouble(Double::doubleValue)
                        .average()
                        .orElse(0.0);
            }
        }
    }
}
// Register view for automatic updates
MovingAverageGauge avgLatency = new MovingAverageGauge(100);
metricGroup.gauge("average-latency", avgLatency);
// The Flink metrics system will automatically call update() every 5 seconds

Understanding when to use each implementation:
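SimpleCounter: use when the counter is only ever updated from a single thread; it has minimal overhead but is not thread-safe.
ThreadSafeSimpleCounter: use when the counter is shared across multiple threads; it is thread-safe.
MeterView: use when you need an average rate of events per second over a time window; its update() method must be called at regular intervals.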
// Performance comparison example
public class CounterPerformanceTest {
@Test
public void singleThreadedPerformance() {
Counter simple = new SimpleCounter();
Counter threadSafe = new ThreadSafeSimpleCounter();
// SimpleCounter is faster for single-threaded use
long start = System.nanoTime();
for (int i = 0; i < 1_000_000; i++) {
simple.inc();
}
long simpleTime = System.nanoTime() - start;
start = System.nanoTime();
for (int i = 0; i < 1_000_000; i++) {
threadSafe.inc();
}
long threadSafeTime = System.nanoTime() - start;
// simpleTime < threadSafeTime for single-threaded access
}
@Test
public void multiThreadedPerformance() {
Counter threadSafe = new ThreadSafeSimpleCounter();
// ThreadSafeSimpleCounter scales well with multiple threads
ExecutorService executor = Executors.newFixedThreadPool(8);
long start = System.nanoTime();
List<Future<?>> futures = new ArrayList<>();
for (int thread = 0; thread < 8; thread++) {
futures.add(executor.submit(() -> {
for (int i = 0; i < 125_000; i++) { // 1M total
threadSafe.inc();
}
}));
}
futures.forEach(f -> {
try { f.get(); } catch (Exception e) { /* handle */ }
});
long multiThreadTime = System.nanoTime() - start;
assertEquals(1_000_000L, threadSafe.getCount());
}
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-metrics-core