# Specialized Metric Groups

Flink provides component-specific metric groups for different parts of its runtime. These specialized interfaces organize metrics by context and expose pre-defined metrics or sub-groups for components such as operators, sources, sinks, and coordinators.

## Capabilities

### OperatorMetricGroup Interface

Metric group representing a Flink operator. It gives access to I/O-specific metrics through a dedicated sub-group.

```java { .api }
/**
 * Special MetricGroup representing an Operator.
 * You should only update the metrics in the main operator thread.
 */
@PublicEvolving
public interface OperatorMetricGroup extends MetricGroup {
    /**
     * Returns the I/O metric group for this operator.
     * @return OperatorIOMetricGroup for I/O metrics
     */
    OperatorIOMetricGroup getIOMetricGroup();
}
```

**Usage Examples:**

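```java
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import java.util.ArrayDeque;
import java.util.Deque;

// In an operator implementation
public class MyMapOperator extends AbstractStreamOperator<String>
        implements OneInputStreamOperator<String, String> {
    // Illustrative internal buffer whose size is exposed as a gauge
    private final Deque<String> internalBuffer = new ArrayDeque<>();
    private transient Counter processedRecords;
    private transient Counter droppedRecords;
    private transient Gauge<Integer> bufferSize;

    @Override
    public void open() throws Exception {
        super.open();

        // Get operator-specific metric group
        OperatorMetricGroup operatorMetrics = getRuntimeContext().getMetricGroup();

        // Register operator-level metrics
        processedRecords = operatorMetrics.counter("records-processed");
        droppedRecords = operatorMetrics.counter("records-dropped");
        bufferSize = operatorMetrics.gauge("buffer-size", () -> internalBuffer.size());

        // Access I/O metrics sub-group
        OperatorIOMetricGroup ioMetrics = operatorMetrics.getIOMetricGroup();
        // I/O metrics are typically managed by the runtime, but can be accessed here
    }

    @Override
    public void processElement(StreamRecord<String> element) throws Exception {
        String value = element.getValue();

        if (shouldProcess(value)) {
            // Forward the element and count it
            output.collect(element);
            processedRecords.inc();
        } else {
            droppedRecords.inc();
        }
    }

    // Illustrative filter: drop null or empty values
    private boolean shouldProcess(String value) {
        return value != null && !value.isEmpty();
    }
}
```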

### OperatorIOMetricGroup Interface

Metric group for operator I/O metrics. It exposes pre-defined counters for input and output records and bytes, and each counter also feeds a corresponding per-second meter.

```java { .api }
/**
 * Metric group that contains shareable pre-defined IO-related metrics for operators.
 * You should only update the metrics in the main operator thread.
 */
public interface OperatorIOMetricGroup extends MetricGroup {
    /**
     * The total number of input records since the operator started.
     * Will also populate numRecordsInPerSecond meter.
     */
    Counter getNumRecordsInCounter();

    /**
     * The total number of output records since the operator started.
     * Will also populate numRecordsOutPerSecond meter.
     */
    Counter getNumRecordsOutCounter();

    /**
     * The total number of input bytes since the task started.
     * Will also populate numBytesInPerSecond meter.
     */
    Counter getNumBytesInCounter();

    /**
     * The total number of output bytes since the task started.
     * Will also populate numBytesOutPerSecond meter.
     */
    Counter getNumBytesOutCounter();
}
```
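
The Javadoc above notes that each counter also populates a per-second meter. As a rough illustration of that relationship, the sketch below derives an average rate from two samples of a monotonically increasing count. It is not Flink's implementation; `CounterRateSketch` and `ratePerSecond` are hypothetical names introduced only for this example.

```java
// Hypothetical helper, not part of Flink: derives a per-second rate
// from two samples of a monotonically increasing counter.
final class CounterRateSketch {

    // Returns the average number of events per second between two counter samples.
    static double ratePerSecond(long earlierCount, long laterCount, long elapsedSeconds) {
        if (elapsedSeconds <= 0) {
            throw new IllegalArgumentException("elapsedSeconds must be positive");
        }
        if (laterCount < earlierCount) {
            throw new IllegalArgumentException("counter must not decrease");
        }
        return (double) (laterCount - earlierCount) / elapsedSeconds;
    }

    public static void main(String[] args) {
        // 300 records arrived over a 60-second window.
        System.out.println(ratePerSecond(100L, 400L, 60L)); // prints 5.0
    }
}
```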

**Usage Examples:**

```java
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;

// Accessing I/O metrics (typically read-only from operator code).
// Output and getCustomIOValue() are hypothetical placeholders.
public class StreamingOperator extends AbstractStreamOperator<Output> {

    @Override
    public void open() throws Exception {
        super.open();

        OperatorMetricGroup operatorGroup = getRuntimeContext().getMetricGroup();
        OperatorIOMetricGroup ioGroup = operatorGroup.getIOMetricGroup();

        // Access pre-defined I/O counters provided by Flink runtime
        Counter recordsIn = ioGroup.getNumRecordsInCounter();
        Counter recordsOut = ioGroup.getNumRecordsOutCounter();
        Counter bytesIn = ioGroup.getNumBytesInCounter();
        Counter bytesOut = ioGroup.getNumBytesOutCounter();

        // These counters are automatically maintained by Flink runtime
        // and also populate corresponding rate meters

        // You can still add custom I/O-related metrics
        ioGroup.gauge("custom-io-metric", () -> getCustomIOValue());
    }
}
```

### SourceReaderMetricGroup Interface

Specialized metric group for source readers in the new source API (`SourceReader`). It provides a pre-defined error counter and optional gauges for data that is still pending.

```java { .api }
/**
 * Pre-defined metrics for SourceReader.
 * You should only update the metrics in the main operator thread.
 */
public interface SourceReaderMetricGroup extends OperatorMetricGroup {
    /** The total number of records that failed to be consumed, processed, or emitted. */
    Counter getNumRecordsInErrorsCounter();

    /**
     * Sets an optional gauge for the number of bytes that have not been fetched by the source,
     * e.g. the remaining bytes in a file after the file descriptor reading position.
     *
     * Note that not every source can report this metric in a plausible and efficient way.
     */
    void setPendingBytesGauge(Gauge<Long> pendingBytesGauge);

    /**
     * Sets an optional gauge for the number of records that have not been fetched by the source,
     * e.g. the available records after the consumer offset in a Kafka partition.
     *
     * Note that not every source can report this metric in a plausible and efficient way.
     */
    void setPendingRecordsGauge(Gauge<Long> pendingRecordsGauge);
}
```
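
To make the file example from the Javadoc above concrete, here is a minimal sketch in which the pending bytes are the file length minus the current read position. `java.util.function.Supplier<Long>` stands in for Flink's `Gauge<Long>` so the block compiles without Flink on the classpath; `PendingBytesSketch` and its members are hypothetical names, not Flink API.

```java
// Hypothetical sketch, not part of Flink: tracks the bytes of a file that
// have not been read yet, the value a pending-bytes gauge would report.
final class PendingBytesSketch {
    private final long fileLength;
    private long position; // bytes consumed so far

    PendingBytesSketch(long fileLength) {
        this.fileLength = fileLength;
    }

    // Advances the read position, never past the end of the file.
    void advance(long bytesRead) {
        position = Math.min(fileLength, position + bytesRead);
    }

    // Stand-in for Gauge<Long>: evaluated lazily each time it is polled.
    java.util.function.Supplier<Long> pendingBytesGauge() {
        return () -> fileLength - position;
    }

    public static void main(String[] args) {
        PendingBytesSketch reader = new PendingBytesSketch(1_000L);
        java.util.function.Supplier<Long> gauge = reader.pendingBytesGauge();
        reader.advance(250L);
        System.out.println(gauge.get()); // prints 750
    }
}
```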

**Usage Examples:**

```java
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.groups.SourceReaderMetricGroup;

// In a source reader implementation. Record, MyHistogram, `splitQueue`, and the helper
// methods are hypothetical; the remaining SourceReader methods are omitted for brevity.
public class KafkaSourceReader implements SourceReader<Record, KafkaPartitionSplit> {
    private final SourceReaderMetricGroup readerMetrics;
    private final Counter recordsRead;
    private final Counter bytesRead;
    private final Gauge<Integer> pendingSplits;
    private final Histogram recordSize;

    // The context is handed to the reader when the source creates it
    public KafkaSourceReader(SourceReaderContext context) {
        this.readerMetrics = context.metricGroup();

        // Access pre-defined error counter
        Counter errorCounter = readerMetrics.getNumRecordsInErrorsCounter();

        // Set optional pending gauges if source can provide these metrics
        if (canTrackPendingBytes()) {
            readerMetrics.setPendingBytesGauge(() -> calculatePendingBytes());
        }
        if (canTrackPendingRecords()) {
            readerMetrics.setPendingRecordsGauge(() -> calculatePendingRecords());
        }

        // Register additional custom source reader metrics
        recordsRead = readerMetrics.counter("records-read");
        bytesRead = readerMetrics.counter("bytes-read");
        pendingSplits = readerMetrics.gauge("pending-splits", () -> splitQueue.size());
        recordSize = readerMetrics.histogram("record-size", new MyHistogram());
    }

    @Override
    public InputStatus pollNext(ReaderOutput<Record> output) throws Exception {
        Record record = pollRecord();
        if (record != null) {
            output.collect(record);
            recordsRead.inc();
            bytesRead.inc(record.sizeInBytes());
            recordSize.update(record.sizeInBytes());
            return InputStatus.MORE_AVAILABLE;
        }
        return InputStatus.NOTHING_AVAILABLE;
    }
}
```

### SinkWriterMetricGroup Interface

Metric group for sink writers. It exposes pre-defined counters for sent records, sent bytes, and send errors, plus an optional gauge for the send time of the last record.

```java { .api }
/**
 * Pre-defined metrics for sinks.
 * You should only update the metrics in the main operator thread.
 */
public interface SinkWriterMetricGroup extends OperatorMetricGroup {
    /** The total number of records that failed to send. */
    Counter getNumRecordsOutErrorsCounter();

    /**
     * The total number of records that failed to send.
     * This metric has the same value as numRecordsOutError.
     */
    Counter getNumRecordsSendErrorsCounter();

    /**
     * The total number of records that have been sent to the downstream system.
     * This metric has the same value as numRecordsOut of the operator.
     * Note: this counter will count all records the SinkWriter sent.
     */
    Counter getNumRecordsSendCounter();

    /**
     * The total number of output send bytes since the task started.
     * This metric has the same value as numBytesOut of the operator.
     */
    Counter getNumBytesSendCounter();

    /**
     * Sets an optional gauge for the time it takes to send the last record.
     * This metric is an instantaneous value recorded for the last processed record.
     */
    void setCurrentSendTimeGauge(Gauge<Long> currentSendTimeGauge);
}
```
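
The send-time gauge described above reports an instantaneous value for the most recent record. A minimal sketch of one way to keep such a value follows. `java.util.function.Supplier<Long>` again stands in for `Gauge<Long>`, and `SendTimeSketch` with its methods is hypothetical, not Flink API.

```java
// Hypothetical sketch, not part of Flink: remembers how long the most recent
// send took, the value a current-send-time gauge would report.
final class SendTimeSketch {
    private volatile long lastSendTimeMillis;

    // Runs the send action and records its wall-clock duration in milliseconds.
    void timedSend(Runnable sendAction) {
        long startNanos = System.nanoTime();
        sendAction.run();
        lastSendTimeMillis = (System.nanoTime() - startNanos) / 1_000_000L;
    }

    // Records an externally measured duration, e.g. one reported by a client callback.
    void record(long durationMillis) {
        lastSendTimeMillis = durationMillis;
    }

    // Stand-in for Gauge<Long>: always returns the latest recorded duration.
    java.util.function.Supplier<Long> currentSendTimeGauge() {
        return () -> lastSendTimeMillis;
    }

    public static void main(String[] args) {
        SendTimeSketch timer = new SendTimeSketch();
        java.util.function.Supplier<Long> gauge = timer.currentSendTimeGauge();
        timer.record(42L);
        System.out.println(gauge.get()); // prints 42
        timer.record(7L);
        System.out.println(gauge.get()); // prints 7
    }
}
```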

**Usage Examples:**

```java
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;

// In a sink writer implementation. Record, `database`, `connectionPool`, LatencyHistogram,
// and canTrackSendTime() are hypothetical; flush() and close() are omitted for brevity.
public class DatabaseSinkWriter implements SinkWriter<Record> {
    private final SinkWriterMetricGroup writerMetrics;
    private final Counter recordsWritten;
    private final Counter writesSucceeded;
    private final Counter writesFailed;
    private final Histogram writeLatency;
    private final Meter writeRate;
    private volatile long lastSendTimeMillis;

    // The metric group comes from the init context passed to Sink#createWriter,
    // through its metricGroup() method.
    public DatabaseSinkWriter(SinkWriterMetricGroup writerMetrics) {
        this.writerMetrics = writerMetrics;

        // Access pre-defined sink writer counters
        Counter outErrors = writerMetrics.getNumRecordsOutErrorsCounter();
        Counter sendErrors = writerMetrics.getNumRecordsSendErrorsCounter();
        Counter recordsSent = writerMetrics.getNumRecordsSendCounter();
        Counter bytesSent = writerMetrics.getNumBytesSendCounter();

        // Set optional send time gauge if sink can provide this metric
        if (canTrackSendTime()) {
            writerMetrics.setCurrentSendTimeGauge(() -> lastSendTimeMillis);
        }

        // Register additional custom sink writer metrics
        recordsWritten = writerMetrics.counter("records-written");
        writesSucceeded = writerMetrics.counter("writes-succeeded");
        writesFailed = writerMetrics.counter("writes-failed");
        writeLatency = writerMetrics.histogram("write-latency", new LatencyHistogram());
        writeRate = writerMetrics.meter("write-rate", new MeterView(30));

        // Connection-specific metrics
        MetricGroup connectionGroup = writerMetrics.addGroup("connection");
        connectionGroup.gauge("pool-size", () -> connectionPool.getActiveConnections());
        connectionGroup.gauge("pool-utilization", () -> connectionPool.getUtilization());
    }

    @Override
    public void write(Record record, Context context) {
        long startTime = System.currentTimeMillis();

        try {
            database.write(record);
            recordsWritten.inc();
            writesSucceeded.inc();
            writeRate.markEvent();

            long latency = System.currentTimeMillis() - startTime;
            lastSendTimeMillis = latency; // value reported by the send time gauge
            writeLatency.update(latency);

        } catch (Exception e) {
            writesFailed.inc();
            throw new RuntimeException("Failed to write record", e);
        }
    }
}
```

### SinkCommitterMetricGroup Interface

Metric group for sink committers. It tracks the outcome of committables (success, failure, retry, already committed) in two-phase commit scenarios.

```java { .api }
/**
 * Pre-defined metrics for sink committers.
 * You should only update the metrics in the main operator thread.
 */
public interface SinkCommitterMetricGroup extends OperatorMetricGroup {
    /** The total number of committables that arrived. */
    Counter getNumCommittablesTotalCounter();

    /** The total number of committable failures. */
    Counter getNumCommittablesFailureCounter();

    /** The total number of committable retries. */
    Counter getNumCommittablesRetryCounter();

    /** The total number of successful committables. */
    Counter getNumCommittablesSuccessCounter();

    /** The total number of already committed committables. */
    Counter getNumCommittablesAlreadyCommittedCounter();

    /** Sets the gauge for the number of pending committables. */
    void setCurrentPendingCommittablesGauge(Gauge<Integer> currentPendingCommittablesGauge);
}
```

**Usage Examples:**

```java
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;

import java.util.Collection;

// In a sink committer implementation. CommitInfo, `database`, `pendingCommittables`,
// LatencyHistogram, and canTrackPendingCommittables() are hypothetical placeholders.
public class TransactionalSinkCommitter implements Committer<CommitInfo> {
    private final SinkCommitterMetricGroup committerMetrics;
    private final Counter commitsAttempted;
    private final Counter commitsSucceeded;
    private final Counter commitsFailed;
    private final Histogram commitLatency;

    // The metric group comes from the committer init context, through its metricGroup() method.
    public TransactionalSinkCommitter(SinkCommitterMetricGroup committerMetrics) {
        this.committerMetrics = committerMetrics;

        // Access pre-defined committer counters
        Counter totalCommittables = committerMetrics.getNumCommittablesTotalCounter();
        Counter failureCommittables = committerMetrics.getNumCommittablesFailureCounter();
        Counter retryCommittables = committerMetrics.getNumCommittablesRetryCounter();
        Counter successCommittables = committerMetrics.getNumCommittablesSuccessCounter();
        Counter alreadyCommitted = committerMetrics.getNumCommittablesAlreadyCommittedCounter();

        // Set optional pending committables gauge if committer can provide this metric
        if (canTrackPendingCommittables()) {
            committerMetrics.setCurrentPendingCommittablesGauge(() -> pendingCommittables.size());
        }

        // Register additional custom committer metrics
        commitsAttempted = committerMetrics.counter("commits-attempted");
        commitsSucceeded = committerMetrics.counter("commits-succeeded");
        commitsFailed = committerMetrics.counter("commits-failed");
        commitLatency = committerMetrics.histogram("commit-latency", new LatencyHistogram());

        // Transaction-specific metrics
        MetricGroup txnGroup = committerMetrics.addGroup("transactions");
        txnGroup.counter("transactions-started");
        txnGroup.counter("transactions-committed");
        txnGroup.counter("transactions-aborted");
    }

    @Override
    public void commit(Collection<CommitRequest<CommitInfo>> requests) {
        for (CommitRequest<CommitInfo> request : requests) {
            long startTime = System.currentTimeMillis();
            commitsAttempted.inc();

            try {
                database.commit(request.getCommittable().getTransactionId());
                commitsSucceeded.inc();

                long latency = System.currentTimeMillis() - startTime;
                commitLatency.update(latency);

            } catch (Exception e) {
                commitsFailed.inc();
                request.retryLater(); // Ask the framework to retry this committable
            }
        }
    }

    @Override
    public void close() {
        // Release resources held by the committer, if any
    }
}
```

### CacheMetricGroup Interface

Metric group for caches, covering hits, misses, load operations, and cache size. Unlike the getter-style groups above, its methods register metric instances that the caller supplies.

```java { .api }
/**
 * Pre-defined metrics for cache.
 * Please note that these methods should only be invoked once.
 * Registering a metric with the same name multiple times leads to undefined behavior.
 */
public interface CacheMetricGroup extends MetricGroup {
    /** The number of cache hits. */
    void hitCounter(Counter hitCounter);

    /** The number of cache misses. */
    void missCounter(Counter missCounter);

    /** The number of times data was loaded into the cache from the external system. */
    void loadCounter(Counter loadCounter);

    /** The number of load failures. */
    void numLoadFailuresCounter(Counter numLoadFailuresCounter);

    /** The time spent for the latest load operation. */
    void latestLoadTimeGauge(Gauge<Long> latestLoadTimeGauge);

    /** The number of records in cache. */
    void numCachedRecordsGauge(Gauge<Long> numCachedRecordsGauge);

    /** The number of bytes used by cache. */
    void numCachedBytesGauge(Gauge<Long> numCachedBytesGauge);
}
```

**Usage Examples:**

```java
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.metrics.groups.CacheMetricGroup;

// In a caching component. Input, Output, `cache`, and loadFromExternalSystem()
// are hypothetical placeholders.
public class CachingLookup {
    private Counter hits;
    private Counter misses;
    private Counter loads;
    private Counter loadFailures;
    private volatile long lastLoadTimeMillis;

    // A CacheMetricGroup is supplied by the runtime, for example through
    // LookupCache#open(CacheMetricGroup). MetricGroup#addGroup("cache") returns
    // a plain MetricGroup and cannot be used in its place.
    public void open(CacheMetricGroup cacheMetrics) {
        // Register cache metrics - each method should only be called once
        this.hits = new SimpleCounter();
        cacheMetrics.hitCounter(hits);

        this.misses = new SimpleCounter();
        cacheMetrics.missCounter(misses);

        this.loads = new SimpleCounter();
        cacheMetrics.loadCounter(loads);

        this.loadFailures = new SimpleCounter();
        cacheMetrics.numLoadFailuresCounter(loadFailures);

        // Register gauges for cache state
        cacheMetrics.latestLoadTimeGauge(() -> lastLoadTimeMillis);
        cacheMetrics.numCachedRecordsGauge(() -> (long) cache.size());
        cacheMetrics.numCachedBytesGauge(() -> cache.getEstimatedSize());
    }

    public Output lookup(Input input) throws Exception {
        Output cached = cache.get(input.getKey());
        if (cached != null) {
            hits.inc();
            return cached;
        }

        // Cache miss - load from external system
        misses.inc();
        long startTime = System.currentTimeMillis();

        try {
            loads.inc();
            Output result = loadFromExternalSystem(input);
            cache.put(input.getKey(), result);

            lastLoadTimeMillis = System.currentTimeMillis() - startTime;
            return result;

        } catch (Exception e) {
            loadFailures.inc();
            throw e;
        }
    }
}
```

### SplitEnumeratorMetricGroup Interface

Metric group for split enumerators in the source connector API, used to track split discovery and assignment.

```java { .api }
/**
 * Metric group for split enumerators.
 */
public interface SplitEnumeratorMetricGroup extends OperatorCoordinatorMetricGroup {
    /** Sets an optional gauge for the number of splits that have not been assigned yet. */
    <G extends Gauge<Long>> G setUnassignedSplitsGauge(G unassignedSplitsGauge);
}
```

**Usage Examples:**

```java
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup;

import java.util.List;

// In a split enumerator implementation. KafkaState, `monitoredTopics`, `unassignedSplitQueue`,
// `readerStates`, and the helper methods are hypothetical; the remaining SplitEnumerator
// methods are omitted for brevity.
public class KafkaSplitEnumerator implements SplitEnumerator<KafkaPartitionSplit, KafkaState> {
    private final SplitEnumeratorContext<KafkaPartitionSplit> context;
    private final SplitEnumeratorMetricGroup enumeratorMetrics;
    private final Counter splitsDiscovered;
    private final Counter splitsAssigned;
    private final Gauge<Integer> unassignedSplits;
    private final Gauge<Integer> activeReaders;

    public KafkaSplitEnumerator(SplitEnumeratorContext<KafkaPartitionSplit> context) {
        // Keep the context: it is needed later to assign splits
        this.context = context;
        this.enumeratorMetrics = context.metricGroup();

        // Register enumerator metrics
        splitsDiscovered = enumeratorMetrics.counter("splits-discovered");
        splitsAssigned = enumeratorMetrics.counter("splits-assigned");
        unassignedSplits = enumeratorMetrics.gauge("unassigned-splits", () -> unassignedSplitQueue.size());
        activeReaders = enumeratorMetrics.gauge("active-readers", () -> readerStates.size());

        // Topic-specific metrics
        MetricGroup topicGroup = enumeratorMetrics.addGroup("topics");
        for (String topic : monitoredTopics) {
            MetricGroup tGroup = topicGroup.addGroup("topic", topic);
            tGroup.gauge("partitions", () -> getPartitionCount(topic));
            tGroup.counter("splits-for-topic");
        }
    }

    @Override
    public void handleSplitRequest(int subtaskId, String requesterHostname) {
        List<KafkaPartitionSplit> availableSplits = getAvailableSplits();

        if (!availableSplits.isEmpty()) {
            context.assignSplit(availableSplits.get(0), subtaskId);
            splitsAssigned.inc();
        }
    }

    @Override
    public void addSplitsBack(List<KafkaPartitionSplit> splits, int subtaskId) {
        unassignedSplitQueue.addAll(splits);
        // Splits are back in the pool for reassignment
    }
}
```

### OperatorCoordinatorMetricGroup Interface

Metric group for operator coordinators, which coordinate the parallel instances of an operator. It adds no methods of its own and only scopes metrics to the coordinator.

```java { .api }
/**
 * Metric group for operator coordinators.
 */
public interface OperatorCoordinatorMetricGroup extends MetricGroup {
    // Extends MetricGroup with operator coordinator context
}
```

**Usage Examples:**

```java
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.OperatorCoordinatorMetricGroup;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;

// In an operator coordinator implementation. `operationQueue` and the helper methods are
// hypothetical; the remaining OperatorCoordinator methods are omitted for brevity.
public class ExampleOperatorCoordinator implements OperatorCoordinator {
    private final OperatorCoordinatorMetricGroup coordinatorMetrics;
    private final Counter coordinationEvents;
    private final Counter successfulCoordinations;
    private final Counter failedCoordinations;
    private final Gauge<Integer> pendingOperations;

    public ExampleOperatorCoordinator(OperatorCoordinator.Context context) {
        this.coordinatorMetrics = context.metricGroup();

        // Register coordinator metrics
        coordinationEvents = coordinatorMetrics.counter("coordination-events");
        successfulCoordinations = coordinatorMetrics.counter("successful-coordinations");
        failedCoordinations = coordinatorMetrics.counter("failed-coordinations");
        pendingOperations = coordinatorMetrics.gauge("pending-operations", () -> operationQueue.size());

        // Per-subtask metrics
        MetricGroup subtaskGroup = coordinatorMetrics.addGroup("subtasks");
        for (int i = 0; i < context.currentParallelism(); i++) {
            // Lambdas can only capture effectively final variables, so copy the loop index
            final int subtask = i;
            MetricGroup stGroup = subtaskGroup.addGroup("subtask", String.valueOf(subtask));
            stGroup.counter("messages-sent");
            stGroup.counter("messages-received");
            stGroup.gauge("last-seen", () -> getLastSeenTime(subtask));
        }
    }

    // Recent Flink versions also pass the attempt number of the sending subtask
    @Override
    public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) {
        coordinationEvents.inc();

        try {
            processCoordinationEvent(subtask, event);
            successfulCoordinations.inc();
        } catch (Exception e) {
            failedCoordinations.inc();
            throw new RuntimeException("Coordination failed", e);
        }
    }
}
```
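
### CacheMetricGroup Interface: Custom Metrics

Because `CacheMetricGroup` extends `MetricGroup`, custom cache metrics such as eviction counts or a hit-rate gauge can be registered next to the pre-defined ones. This is useful for caching layers and buffering components.
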
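```java { .api }
/**
 * Metric group for cache metrics.
 */
public interface CacheMetricGroup extends MetricGroup {
    // The pre-defined cache metric methods are listed in the CacheMetricGroup declaration
    // earlier in this document. The generic MetricGroup methods (counter, gauge, addGroup,
    // and so on) are inherited and can be used for custom cache metrics.
}
```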

**Usage Examples:**

```java
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.CacheMetricGroup;

// In a caching component. `cache` is a hypothetical cache implementation that exposes
// get(), size(), estimatedSize(), and maximumSize().
public class MetricsAwareCache {
    private CacheMetricGroup cacheMetrics;
    private Counter hits;
    private Counter misses;
    private Counter evictions;
    private Gauge<Integer> cacheSize;
    private Gauge<Double> hitRate;

    public void initialize(CacheMetricGroup metrics) {
        this.cacheMetrics = metrics;

        // Custom cache metrics registered through the inherited MetricGroup methods
        hits = cacheMetrics.counter("hits");
        misses = cacheMetrics.counter("misses");
        evictions = cacheMetrics.counter("evictions");
        cacheSize = cacheMetrics.gauge("size", () -> cache.size());
        hitRate = cacheMetrics.gauge("hit-rate", this::calculateHitRate);

        // Memory metrics
        MetricGroup memoryGroup = cacheMetrics.addGroup("memory");
        memoryGroup.gauge("used-bytes", () -> cache.estimatedSize());
        memoryGroup.gauge("max-bytes", () -> cache.maximumSize());
    }

    public Object lookup(String key) {
        Object value = cache.get(key);

        if (value != null) {
            hits.inc();
            return value;
        } else {
            misses.inc();
            return null;
        }
    }

    // Fraction of lookups served from the cache, or 0.0 before the first lookup
    private double calculateHitRate() {
        long totalHits = hits.getCount();
        long totalMisses = misses.getCount();
        long total = totalHits + totalMisses;

        return total > 0 ? (double) totalHits / total : 0.0;
    }
}
```