# Metric Implementations

Concrete implementations of metric interfaces providing both thread-safe and performance-optimized variants. These implementations offer ready-to-use metric types with different performance characteristics for various use cases.

## Capabilities

### SimpleCounter Implementation

Basic non-thread-safe counter implementation optimized for single-threaded scenarios with minimal overhead.

```java { .api }
/**
 * A simple low-overhead Counter that is not thread-safe.
 */
@Internal
public class SimpleCounter implements Counter {

    /** Increment the current count by 1. */
    @Override
    public void inc() { /* implementation */ }

    /**
     * Increment the current count by the given value.
     * @param n value to increment the current count by
     */
    @Override
    public void inc(long n) { /* implementation */ }

    /** Decrement the current count by 1. */
    @Override
    public void dec() { /* implementation */ }

    /**
     * Decrement the current count by the given value.
     * @param n value to decrement the current count by
     */
    @Override
    public void dec(long n) { /* implementation */ }

    /**
     * Returns the current count.
     * @return current count
     */
    @Override
    public long getCount() { /* implementation */ }
}
```

**Usage Examples:**

```java
// Create simple counter for single-threaded use
Counter simpleCounter = new SimpleCounter();

// Basic operations
simpleCounter.inc();     // Fast increment
simpleCounter.inc(10);   // Fast bulk increment
simpleCounter.dec();     // Fast decrement
simpleCounter.dec(5);    // Fast bulk decrement

long count = simpleCounter.getCount(); // Fast read

// Use case: single-threaded operators
public class SingleThreadedOperator {
    private final Counter processedRecords = new SimpleCounter();

    public void processRecord(Record record) {
        // Process record...
        processedRecords.inc(); // No synchronization overhead
    }
}
```
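
The API listing above elides the method bodies. As a rough illustration of why a non-thread-safe counter is cheap, the sketch below backs the count with a plain `long` field. `CounterSketch`, `PlainLongCounter`, and `PlainLongCounterDemo` are hypothetical names written for this example so that it compiles on its own; they are not the library's classes, and the real `SimpleCounter` may differ in detail.

```java
// Hypothetical stand-in for the Counter interface, so the sketch is self-contained.
interface CounterSketch {
    void inc();
    void inc(long n);
    void dec();
    void dec(long n);
    long getCount();
}

// Assumption: the count lives in a plain long field, with no atomics or locks.
final class PlainLongCounter implements CounterSketch {
    private long count;

    @Override public void inc() { count++; }
    @Override public void inc(long n) { count += n; }
    @Override public void dec() { count--; }
    @Override public void dec(long n) { count -= n; }
    @Override public long getCount() { return count; }
}

class PlainLongCounterDemo {
    public static void main(String[] args) {
        CounterSketch counter = new PlainLongCounter();
        counter.inc();    // 1
        counter.inc(10);  // 11
        counter.dec();    // 10
        counter.dec(5);   // 5
        System.out.println(counter.getCount()); // prints 5
    }
}
```

In this sketch `count++` is a read-modify-write sequence, so two threads incrementing at the same time could lose an update. That is the trade-off accepted in exchange for having no synchronization cost.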

### ThreadSafeSimpleCounter Implementation

Thread-safe counter implementation backed by `LongAdder`, suitable for multi-threaded scenarios.

```java { .api }
/**
 * A simple low-overhead Counter that is thread-safe.
 */
@Internal
public class ThreadSafeSimpleCounter implements Counter {

    /** Increment the current count by 1. */
    @Override
    public void inc() { /* uses LongAdder.increment() */ }

    /**
     * Increment the current count by the given value.
     * @param n value to increment the current count by
     */
    @Override
    public void inc(long n) { /* uses LongAdder.add(n) */ }

    /** Decrement the current count by 1. */
    @Override
    public void dec() { /* uses LongAdder.decrement() */ }

    /**
     * Decrement the current count by the given value.
     * @param n value to decrement the current count by
     */
    @Override
    public void dec(long n) { /* uses LongAdder.add(-n) */ }

    /**
     * Returns the current count.
     * @return current count
     */
    @Override
    public long getCount() { /* uses LongAdder.longValue() */ }
}
```

**Usage Examples:**

```java
// Create thread-safe counter for multi-threaded use
Counter threadSafeCounter = new ThreadSafeSimpleCounter();

// Safe for concurrent access
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 100; i++) {
    executor.submit(() -> {
        threadSafeCounter.inc(); // Thread-safe increment
    });
}
executor.shutdown(); // Stop accepting tasks; already-submitted increments still run

// Use case: shared counters across multiple threads
public class MultiThreadedProcessor {
    private final Counter totalProcessed = new ThreadSafeSimpleCounter();

    // Called from multiple threads
    public void processInParallel(List<Record> records) {
        records.parallelStream().forEach(record -> {
            processRecord(record);
            totalProcessed.inc(); // Safe concurrent increment
        });
    }
}
```
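
The comments in the API listing say each method maps onto a `LongAdder` call. The sketch below spells that mapping out and checks that no increments are lost when several threads update the counter. `AdderCounter`, `countConcurrently`, and `AdderCounterDemo` are hypothetical names for this example only; this is a minimal illustration of the approach, not the library's source.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical counter backed by java.util.concurrent.atomic.LongAdder.
final class AdderCounter {
    private final LongAdder adder = new LongAdder();

    void inc() { adder.increment(); }
    void inc(long n) { adder.add(n); }
    void dec() { adder.decrement(); }
    void dec(long n) { adder.add(-n); }
    long getCount() { return adder.longValue(); }

    // Runs `threads` workers that each increment `perThread` times,
    // waits for them to finish, and returns the final count.
    static long countConcurrently(int threads, int perThread) {
        AdderCounter counter = new AdderCounter();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int i = 0; i < perThread; i++) {
                    counter.inc();
                }
            });
        }
        pool.shutdown(); // no new tasks; submitted ones still run
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return counter.getCount();
    }
}

class AdderCounterDemo {
    public static void main(String[] args) {
        System.out.println(AdderCounter.countConcurrently(4, 10_000)); // prints 40000
    }
}
```

The count is read only after `awaitTermination` returns, because `LongAdder.longValue()` is only guaranteed to reflect every update once no updates are still in flight.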

### MeterView Implementation

Meter implementation that provides average rate calculations over a specified time window using a circular buffer approach.

```java { .api }
/**
 * A MeterView provides an average rate of events per second over a given time period.
 * The primary advantage is that the rate is neither updated by the computing thread
 * nor for every event. Instead, a history of counts is maintained that is updated
 * in regular intervals by a background thread.
 */
@Internal
public class MeterView implements Meter, View {

    /**
     * Creates a MeterView with specified time span.
     * @param timeSpanInSeconds time span over which to calculate average
     */
    public MeterView(int timeSpanInSeconds) { /* implementation */ }

    /**
     * Creates a MeterView with a custom counter and default time span.
     * @param counter the underlying counter
     */
    public MeterView(Counter counter) { /* implementation */ }

    /**
     * Creates a MeterView with custom counter and time span.
     * @param counter the underlying counter
     * @param timeSpanInSeconds time span over which to calculate average
     */
    public MeterView(Counter counter, int timeSpanInSeconds) { /* implementation */ }

    /**
     * Creates a MeterView from a gauge that returns numeric values.
     * @param numberGauge gauge providing numeric values
     */
    public MeterView(Gauge<? extends Number> numberGauge) { /* implementation */ }

    /** Mark occurrence of an event. */
    @Override
    public void markEvent() { /* delegates to counter.inc() */ }

    /**
     * Mark occurrence of multiple events.
     * @param n number of events occurred
     */
    @Override
    public void markEvent(long n) { /* delegates to counter.inc(n) */ }

    /**
     * Returns the current rate of events per second.
     * @return current rate of events per second
     */
    @Override
    public double getRate() { /* returns calculated rate */ }

    /**
     * Get number of events marked on the meter.
     * @return number of events marked on the meter
     */
    @Override
    public long getCount() { /* delegates to counter.getCount() */ }

    /**
     * Called periodically to update the rate calculation.
     * This is part of the View interface.
     */
    @Override
    public void update() { /* updates internal rate calculation */ }
}
```

**Usage Examples:**

```java
// Create meter with an explicit 60-second window
MeterView throughputMeter = new MeterView(60);

// Create meter with custom counter
Counter customCounter = new ThreadSafeSimpleCounter();
MeterView customMeter = new MeterView(customCounter, 30); // 30-second window

// Create meter from gauge
Gauge<Integer> queueSizeGauge = () -> messageQueue.size(); // size() returns int
MeterView queueGrowthRate = new MeterView(queueSizeGauge);

// Usage in streaming context
public class StreamProcessor {
    private final MeterView processingRate = new MeterView(30);

    public void processMessage(Message message) {
        // Process the message...

        // Count each message once: a batch counts as its size, anything else as one event.
        // The rate itself is recomputed later, when update() runs.
        if (message.isBatch()) {
            processingRate.markEvent(message.getBatchSize());
        } else {
            processingRate.markEvent();
        }
    }

    public void reportMetrics() {
        double currentRate = processingRate.getRate();  // events/second
        long totalEvents = processingRate.getCount();   // total processed

        System.out.printf("Processing %.2f events/sec, %d total%n",
            currentRate, totalEvents);
    }
}

// Integration with View update system
public class MetricUpdater {
    // Copy-on-write list: views may be registered while the scheduler thread iterates
    private final List<View> views = new CopyOnWriteArrayList<>();
    private final ScheduledExecutorService scheduler =
        Executors.newScheduledThreadPool(1);

    public void registerView(View view) {
        views.add(view);
    }

    public void startUpdating() {
        scheduler.scheduleAtFixedRate(
            () -> views.forEach(View::update),
            0,
            View.UPDATE_INTERVAL_SECONDS,
            TimeUnit.SECONDS
        );
    }
}
```
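
The description above mentions a circular buffer of counts that a background thread refreshes at regular intervals. The following sketch shows one way such a windowed rate could be computed. `WindowedRate`, its `slots()` helper, and `WindowedRateDemo` are hypothetical names for this example, the constructor assumes the time span is a multiple of the update interval, and none of it should be read as the actual `MeterView` source.

```java
// Hypothetical sketch of a windowed rate over a circular buffer.
final class WindowedRate {
    private final int timeSpanSeconds;
    private final long[] history; // circular buffer of count snapshots, one per update
    private int position;         // index of the most recent snapshot
    private long count;           // running event count (plain long: single-threaded sketch)
    private double rate;          // events per second as of the last update()

    // Assumption: timeSpanSeconds is a positive multiple of updateIntervalSeconds.
    WindowedRate(int timeSpanSeconds, int updateIntervalSeconds) {
        if (updateIntervalSeconds <= 0 || timeSpanSeconds <= 0
                || timeSpanSeconds % updateIntervalSeconds != 0) {
            throw new IllegalArgumentException(
                "time span must be a positive multiple of the update interval");
        }
        this.timeSpanSeconds = timeSpanSeconds;
        // One extra slot so the oldest snapshot is exactly one time span behind the newest.
        this.history = new long[timeSpanSeconds / updateIntervalSeconds + 1];
    }

    void markEvent(long n) { count += n; }

    // Meant to be called once per update interval, off the hot path.
    void update() {
        position = (position + 1) % history.length;
        history[position] = count;
        long oldest = history[(position + 1) % history.length];
        rate = (double) (history[position] - oldest) / timeSpanSeconds;
    }

    double getRate() { return rate; }
    int slots() { return history.length; }
}

class WindowedRateDemo {
    public static void main(String[] args) {
        WindowedRate meter = new WindowedRate(10, 5); // 10 s window, 5 s updates, 3 slots
        meter.markEvent(50);
        meter.update();
        System.out.println(meter.getRate()); // prints 5.0  (50 events / 10 s)
        meter.markEvent(50);
        meter.update();
        System.out.println(meter.getRate()); // prints 10.0 (100 events / 10 s)
        meter.update();
        System.out.println(meter.getRate()); // prints 5.0  (first 50 left the window)
    }
}
```

In this sketch the buffer holds `timeSpanSeconds / updateIntervalSeconds + 1` snapshots, which illustrates why memory grows with the window size. It also under-reports during warm-up, because the slots not yet written still hold zero.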

### View Interface for Background Updates

Interface for metrics that require periodic background updates, such as time-windowed calculations.

```java { .api }
/**
 * An interface for metrics which should be updated in regular intervals
 * by a background thread.
 */
public interface View {
    /** The interval in which metrics are updated. */
    int UPDATE_INTERVAL_SECONDS = 5;

    /** This method will be called regularly to update the metric. */
    void update();
}
```

**Usage Examples:**

```java
// Custom view implementation
public class MovingAverageGauge implements Gauge<Double>, View {
    private final Queue<Double> values = new LinkedList<>();
    private final int windowSize;
    // volatile: written by the update thread, read by reporters without the lock
    private volatile double currentAverage = 0.0;

    public MovingAverageGauge(int windowSize) {
        this.windowSize = windowSize;
    }

    public void addValue(double value) {
        synchronized (values) {
            values.offer(value);
            if (values.size() > windowSize) {
                values.poll(); // Drop the oldest sample
            }
        }
    }

    @Override
    public Double getValue() {
        return currentAverage;
    }

    @Override
    public void update() {
        synchronized (values) {
            if (!values.isEmpty()) {
                currentAverage = values.stream()
                    .mapToDouble(Double::doubleValue)
                    .average()
                    .orElse(0.0);
            }
        }
    }
}

// Register view for automatic updates
MovingAverageGauge avgLatency = new MovingAverageGauge(100);
metricGroup.gauge("average-latency", avgLatency);

// The Flink metrics system will automatically call update() every 5 seconds
```
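
Because a view only refreshes its published value when `update()` runs, a unit test can drive it by hand instead of waiting for the background thread. The sketch below illustrates that pattern with stand-in types: `ViewSketch`, `SnapshotAverage`, and `SnapshotAverageDemo` are hypothetical names introduced for this example and are not part of the library.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for the View interface, so the sketch compiles on its own.
interface ViewSketch {
    void update();
}

// Keeps the last `windowSize` samples and publishes their average only on update().
final class SnapshotAverage implements ViewSketch {
    private final Deque<Double> window = new ArrayDeque<>();
    private final int windowSize;
    private volatile double published; // last value made visible to readers

    SnapshotAverage(int windowSize) {
        this.windowSize = windowSize;
    }

    synchronized void addValue(double value) {
        window.addLast(value);
        if (window.size() > windowSize) {
            window.removeFirst(); // drop the oldest sample
        }
    }

    double getValue() {
        return published;
    }

    @Override
    public synchronized void update() {
        published = window.stream()
            .mapToDouble(Double::doubleValue)
            .average()
            .orElse(0.0);
    }
}

class SnapshotAverageDemo {
    public static void main(String[] args) {
        SnapshotAverage avg = new SnapshotAverage(2);
        avg.addValue(10.0);
        avg.addValue(20.0);
        System.out.println(avg.getValue()); // prints 0.0, nothing published yet
        avg.update();
        System.out.println(avg.getValue()); // prints 15.0
        avg.addValue(30.0);                 // window is now [20.0, 30.0]
        System.out.println(avg.getValue()); // prints 15.0, stale until the next update
        avg.update();
        System.out.println(avg.getValue()); // prints 25.0
    }
}
```

Readers of such a metric therefore see a value that can be up to one update interval old, which is the price of keeping the calculation off the hot path.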

### Performance Characteristics

Understanding when to use each implementation:

**SimpleCounter:**
- **Use when**: Access is confined to a single thread and overhead must be minimal
- **Performance**: Fastest; no synchronization overhead
- **Thread safety**: Not thread-safe
- **Memory**: Minimal memory footprint

**ThreadSafeSimpleCounter:**
- **Use when**: Multiple threads update the same counter
- **Performance**: Scales well under contention, at some extra cost per update compared with `SimpleCounter`
- **Thread safety**: Thread-safe using LongAdder
- **Memory**: Small, though `LongAdder` can allocate additional cells under contention

**MeterView:**
- **Use when**: Need rate calculations over time
- **Performance**: The rate is computed in the background `update()` call; the hot path only increments a counter
- **Thread safety**: Event marking delegates to the underlying counter, so it is only as thread-safe as that counter
- **Memory**: Memory usage scales with time window size

```java
// Performance comparison example (naive timing; a harness such as JMH gives more reliable numbers)
public class CounterPerformanceTest {

    @Test
    public void singleThreadedPerformance() {
        Counter simple = new SimpleCounter();
        Counter threadSafe = new ThreadSafeSimpleCounter();

        // SimpleCounter is expected to be faster for single-threaded use
        long start = System.nanoTime();
        for (int i = 0; i < 1_000_000; i++) {
            simple.inc();
        }
        long simpleTime = System.nanoTime() - start;

        start = System.nanoTime();
        for (int i = 0; i < 1_000_000; i++) {
            threadSafe.inc();
        }
        long threadSafeTime = System.nanoTime() - start;

        // Typically simpleTime < threadSafeTime for single-threaded access,
        // although JIT warm-up can distort a single run
    }

    @Test
    public void multiThreadedPerformance() throws Exception {
        Counter threadSafe = new ThreadSafeSimpleCounter();

        // ThreadSafeSimpleCounter scales well with multiple threads
        ExecutorService executor = Executors.newFixedThreadPool(8);

        long start = System.nanoTime();
        List<Future<?>> futures = new ArrayList<>();
        for (int thread = 0; thread < 8; thread++) {
            futures.add(executor.submit(() -> {
                for (int i = 0; i < 125_000; i++) { // 1M total
                    threadSafe.inc();
                }
            }));
        }

        for (Future<?> f : futures) {
            f.get(); // Wait for the worker and propagate any failure
        }
        long multiThreadTime = System.nanoTime() - start;
        executor.shutdown();

        assertEquals(1_000_000L, threadSafe.getCount());
    }
}
```