0
# Metrics System
1
2
The Metrics System provides comprehensive infrastructure for collecting, registering, and reporting runtime metrics from Flink applications. This system enables monitoring of job performance, resource usage, throughput, and custom application metrics across the distributed cluster.
3
4
## Core Components
5
6
### MetricRegistry
7
8
Central registry for Flink metrics that handles metric registration, reporter management, and metric lifecycle.
9
10
```java { .api }
11
public class MetricRegistry implements MetricRegistryImpl {
12
public static MetricRegistry create(MetricRegistryConfiguration config);
13
14
public void register(Metric metric, String metricName, AbstractMetricGroup group);
15
public void unregister(Metric metric, String metricName, AbstractMetricGroup group);
16
17
public void startReporters(Configuration config);
18
public void stopReporters();
19
20
public char getDelimiter();
21
public int getNumberReporters();
22
23
public ScopeFormats getScopeFormats();
24
25
@Override
26
public void close();
27
}
28
```
29
30
### MetricRegistryConfiguration
31
32
Configuration class for metrics registry setup and behavior customization.
33
34
```java { .api }
35
public class MetricRegistryConfiguration {
36
public static MetricRegistryConfiguration fromConfiguration(Configuration config);
37
public static MetricRegistryConfiguration defaultMetricRegistryConfiguration();
38
39
public long getQueryServiceUpdateInterval();
40
public int getQueryServicePort();
41
public String getQueryServiceBindAddress();
42
43
public List<ReporterSetup> getReporterConfigurations();
44
public ScopeFormats getScopeFormats();
45
46
public char getDelimiter();
47
public List<String> getExcludedMetrics();
48
}
49
```
50
51
## Metric Types
52
53
### Counter
54
55
Metric that tracks a `long` count, which can be incremented and decremented.
56
57
```java { .api }
58
public interface Counter extends Metric {
59
void inc();
60
void inc(long n);
61
long getCount();
62
63
// Static factory method
64
static Counter of(LongCounter longCounter);
65
}
66
```
67
68
### Gauge
69
70
Metric that provides an instantaneous measurement of a value.
71
72
```java { .api }
73
public interface Gauge<T> extends Metric {
74
T getValue();
75
76
// Static factory methods
77
static <T> Gauge<T> of(Supplier<T> supplier);
78
static Gauge<Double> of(DoubleSupplier supplier);
79
static Gauge<Long> of(LongSupplier supplier);
80
}
81
```
82
83
### Meter
84
85
Metric that tracks the rate of events over time.
86
87
```java { .api }
88
public interface Meter extends Metric {
89
void markEvent();
90
void markEvent(long n);
91
92
double getRate();
93
long getCount();
94
}
95
```
96
97
### Histogram
98
99
Metric that tracks the distribution of values over time.
100
101
```java { .api }
102
public interface Histogram extends Metric {
103
void update(long value);
104
105
long getCount();
106
HistogramStatistics getStatistics();
107
}
108
109
/**
 * Statistics computed over the values recorded by a {@link Histogram}.
 *
 * <p>NOTE(review): Flink declares this type as an abstract class rather than an interface;
 * confirm which form the targeted version uses before implementing it.
 */
public interface HistogramStatistics {

    // ---- summary statistics ----

    long getMin();

    long getMax();

    double getMean();

    double getStdDev();

    /** Returns the value at {@code quantile}; presumably expects a value in [0, 1] -- confirm. */
    double getQuantile(double quantile);

    // ---- raw data ----

    /** Returns the values the statistics are computed from. */
    long[] getValues();

    /** Returns the number of values the statistics are computed from. */
    int size();
}
118
```
119
120
## Metric Groups
121
122
### MetricGroup
123
124
Base interface for organizing metrics into hierarchical groups with scoped naming.
125
126
```java { .api }
127
public interface MetricGroup {
128
Counter counter(String name);
129
Counter counter(String name, Counter counter);
130
131
<T, C extends Counter> C counter(String name, C counter);
132
133
<T> Gauge<T> gauge(String name, Gauge<T> gauge);
134
135
Histogram histogram(String name, Histogram histogram);
136
137
Meter meter(String name, Meter meter);
138
139
MetricGroup addGroup(String name);
140
MetricGroup addGroup(String key, String value);
141
142
String[] getScopeComponents();
143
Map<String, String> getAllVariables();
144
String getMetricIdentifier(String metricName);
145
String getMetricIdentifier(String metricName, CharacterFilter filter);
146
}
147
```
148
149
### AbstractMetricGroup
150
151
Abstract base implementation providing common metric group functionality.
152
153
```java { .api }
154
public abstract class AbstractMetricGroup implements MetricGroup {
155
protected AbstractMetricGroup(MetricRegistry registry, String[] scope, AbstractMetricGroup parent);
156
157
protected void addMetric(String name, Metric metric);
158
protected void removeMetric(String name);
159
160
public final MetricGroup addGroup(String name);
161
public final MetricGroup addGroup(String key, String value);
162
163
protected abstract String getGroupName(String name);
164
165
public final String[] getScopeComponents();
166
public final Map<String, String> getAllVariables();
167
public final String getMetricIdentifier(String metricName);
168
public final String getMetricIdentifier(String metricName, CharacterFilter filter);
169
170
@Override
171
public void close();
172
}
173
```
174
175
### ComponentMetricGroup
176
177
Specialized metric group for cluster components (JobManager, TaskManager, etc.).
178
179
```java { .api }
180
public class ComponentMetricGroup extends AbstractMetricGroup {
181
public ComponentMetricGroup(MetricRegistry registry, String componentName);
182
183
public <J> JobManagerMetricGroup addJobManager(Configuration config, String hostname, String jobManagerId);
184
public TaskManagerMetricGroup addTaskManager(Configuration config, String hostname, String taskManagerId);
185
186
protected String getGroupName(String name);
187
}
188
```
189
190
### JobManagerMetricGroup
191
192
Metric group for JobManager-specific metrics.
193
194
```java { .api }
195
public class JobManagerMetricGroup extends ComponentMetricGroup {
196
public JobManagerMetricGroup(MetricRegistry registry, String hostname, String jobManagerId);
197
198
public JobMetricGroup addJob(JobGraph jobGraph);
199
public JobMetricGroup addJob(JobID jobId, String jobName);
200
201
public String hostname();
202
public String jobManagerId();
203
204
protected String getGroupName(String name);
205
}
206
```
207
208
### TaskManagerMetricGroup
209
210
Metric group for TaskManager-specific metrics.
211
212
```java { .api }
213
public class TaskManagerMetricGroup extends ComponentMetricGroup {
214
public TaskManagerMetricGroup(MetricRegistry registry, String hostname, String taskManagerId);
215
216
public TaskMetricGroup addTaskForJob(JobID jobId, String jobName, JobVertexID jobVertexId,
217
ExecutionAttemptID executionAttemptId, String taskName,
218
int subtaskIndex, int attemptNumber);
219
220
public String hostname();
221
public String taskManagerId();
222
223
protected String getGroupName(String name);
224
}
225
```
226
227
## Reporters
228
229
### MetricReporter
230
231
Base interface for metric reporters that output metrics to external monitoring systems.
232
233
```java { .api }
234
public interface MetricReporter extends AutoCloseable {
235
void open(MetricConfig config);
236
237
void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group);
238
void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group);
239
240
@Override
241
void close();
242
}
243
```
244
245
### ScheduledDropwizardReporter
246
247
Reporter that integrates with Dropwizard metrics and supports scheduled reporting.
248
249
```java { .api }
250
public abstract class ScheduledDropwizardReporter implements MetricReporter, Scheduled {
251
protected final com.codahale.metrics.MetricRegistry registry = new com.codahale.metrics.MetricRegistry();
252
253
@Override
254
public void open(MetricConfig config);
255
256
@Override
257
public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group);
258
259
@Override
260
public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group);
261
262
public abstract void report();
263
264
protected com.codahale.metrics.MetricRegistry getRegistry();
265
266
@Override
267
public void close();
268
}
269
```
270
271
### Scheduled
272
273
Interface for reporters that support scheduled/periodic reporting.
274
275
```java { .api }
276
/** Implemented by reporters that report on a periodic schedule. */
public interface Scheduled {

    /** Performs one reporting round. */
    void report();
}
279
```
280
281
## Configuration
282
283
### MetricConfig
284
285
Configuration container for metric reporters.
286
287
```java { .api }
288
public class MetricConfig {
289
public static final String REPORTER_CLASS = "class";
290
public static final String REPORTER_INTERVAL = "interval";
291
292
public String getString(String key, String defaultValue);
293
public int getInteger(String key, int defaultValue);
294
public long getLong(String key, long defaultValue);
295
public boolean getBoolean(String key, boolean defaultValue);
296
297
public Properties getProperties();
298
299
public void setString(String key, String value);
300
public void setInteger(String key, int value);
301
public void setLong(String key, long value);
302
public void setBoolean(String key, boolean value);
303
}
304
```
305
306
### ReporterSetup
307
308
Configuration setup for individual metric reporters.
309
310
```java { .api }
311
public class ReporterSetup {
312
public ReporterSetup(String name, MetricConfig configuration);
313
314
public String getName();
315
public MetricConfig getConfiguration();
316
317
public Optional<String> getClassName();
318
public Optional<String> getFactoryClassName();
319
public Optional<Long> getIntervalSettings();
320
}
321
```
322
323
## Scope and Formatting
324
325
### ScopeFormats
326
327
Defines scope format patterns for different component types.
328
329
```java { .api }
330
public class ScopeFormats {
331
public static final ScopeFormats fromConfig(Configuration config);
332
333
public String[] getJobManagerFormat();
334
public String[] getTaskManagerFormat();
335
public String[] getJobFormat();
336
public String[] getTaskFormat();
337
public String[] getOperatorFormat();
338
339
public String getJobManagerScope(Configuration config, String hostname, String jmId);
340
public String getTaskManagerScope(Configuration config, String hostname, String tmId);
341
342
// Format variables
343
public static final String SCOPE_HOST = "<host>";
344
public static final String SCOPE_TASKMANAGER_ID = "<tm_id>";
345
public static final String SCOPE_JOB_ID = "<job_id>";
346
public static final String SCOPE_JOB_NAME = "<job_name>";
347
public static final String SCOPE_TASK_VERTEX_ID = "<task_id>";
348
public static final String SCOPE_TASK_NAME = "<task_name>";
349
public static final String SCOPE_TASK_SUBTASK_INDEX = "<subtask_index>";
350
public static final String SCOPE_TASK_ATTEMPT_ID = "<task_attempt_id>";
351
public static final String SCOPE_TASK_ATTEMPT_NUM = "<task_attempt_num>";
352
public static final String SCOPE_OPERATOR_ID = "<operator_id>";
353
public static final String SCOPE_OPERATOR_NAME = "<operator_name>";
354
}
355
```
356
357
### CharacterFilter
358
359
Interface for filtering characters in metric names and identifiers.
360
361
```java { .api }
362
/** Filters characters in metric names and identifiers. */
@FunctionalInterface
public interface CharacterFilter {

    /** Filter that returns its input unchanged. */
    CharacterFilter NO_OP_FILTER = input -> input;

    /** Returns the filtered form of {@code input}. */
    String filterCharacters(String input);
}
367
```
368
369
## Usage Examples
370
371
### Basic Metrics Registration
372
373
```java
374
import org.apache.flink.metrics.*;
375
import org.apache.flink.runtime.metrics.MetricRegistry;
376
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
377
378
// Create metrics registry
379
MetricRegistryConfiguration config = MetricRegistryConfiguration.defaultMetricRegistryConfiguration();
380
MetricRegistry metricRegistry = MetricRegistry.create(config);
381
382
// Create metric group
383
ComponentMetricGroup rootGroup = new ComponentMetricGroup(metricRegistry, "MyApplication");
384
MetricGroup jobGroup = rootGroup.addGroup("job", "data-processing");
385
386
// Register different metric types
387
Counter recordsProcessed = jobGroup.counter("records_processed");
388
Gauge<Long> memoryUsage = jobGroup.gauge("memory_usage", () -> Runtime.getRuntime().totalMemory());
389
Meter throughput = jobGroup.meter("throughput", new MeterView(recordsProcessed, 60));
390
Histogram latency = jobGroup.histogram("latency", new DescriptiveStatisticsHistogram(1000));
391
392
// Use metrics in application
393
for (int i = 0; i < 1000; i++) {
394
// Process record
395
processRecord();
396
397
// Update metrics
398
recordsProcessed.inc();
399
latency.update(System.currentTimeMillis() - startTime);
400
throughput.markEvent();
401
}
402
403
// Clean up
404
metricRegistry.close();
405
```
406
407
### Custom Metric Reporter
408
409
```java
410
import org.apache.flink.metrics.reporter.MetricReporter;
411
import org.apache.flink.metrics.*;
412
413
public class CustomMetricReporter implements MetricReporter {
414
private final Map<String, Metric> metrics = new ConcurrentHashMap<>();
415
416
@Override
417
public void open(MetricConfig config) {
418
String endpoint = config.getString("endpoint", "localhost:8080");
419
String interval = config.getString("interval", "10");
420
421
System.out.println("Opening custom reporter with endpoint: " + endpoint);
422
423
// Start reporting thread
424
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
425
scheduler.scheduleAtFixedRate(this::report, 0, Long.parseLong(interval), TimeUnit.SECONDS);
426
}
427
428
@Override
429
public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
430
String fullName = group.getMetricIdentifier(metricName);
431
metrics.put(fullName, metric);
432
System.out.println("Added metric: " + fullName);
433
}
434
435
@Override
436
public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
437
String fullName = group.getMetricIdentifier(metricName);
438
metrics.remove(fullName);
439
System.out.println("Removed metric: " + fullName);
440
}
441
442
private void report() {
443
System.out.println("=== Custom Metric Report ===");
444
for (Map.Entry<String, Metric> entry : metrics.entrySet()) {
445
String name = entry.getKey();
446
Metric metric = entry.getValue();
447
448
if (metric instanceof Counter) {
449
Counter counter = (Counter) metric;
450
System.out.println(name + " (Counter): " + counter.getCount());
451
} else if (metric instanceof Gauge) {
452
Gauge<?> gauge = (Gauge<?>) metric;
453
System.out.println(name + " (Gauge): " + gauge.getValue());
454
} else if (metric instanceof Meter) {
455
Meter meter = (Meter) metric;
456
System.out.println(name + " (Meter): " + meter.getRate() + " events/sec");
457
} else if (metric instanceof Histogram) {
458
Histogram histogram = (Histogram) metric;
459
HistogramStatistics stats = histogram.getStatistics();
460
System.out.println(name + " (Histogram): count=" + histogram.getCount() +
461
", mean=" + stats.getMean() + ", max=" + stats.getMax());
462
}
463
}
464
}
465
466
@Override
467
public void close() {
468
System.out.println("Closing custom reporter");
469
}
470
}
471
```
472
473
### Metrics Configuration
474
475
```java
476
import org.apache.flink.configuration.Configuration;
477
import org.apache.flink.configuration.MetricOptions;
478
479
// Configure metrics system
480
Configuration config = new Configuration();
481
482
// Enable metrics
483
config.setString(MetricOptions.REPORTERS_LIST, "prometheus,slf4j");
484
485
// Configure Prometheus reporter
486
config.setString("metrics.reporter.prometheus.class", "org.apache.flink.metrics.prometheus.PrometheusReporter");
487
config.setInteger("metrics.reporter.prometheus.port", 9249);
488
489
// Configure SLF4J reporter
490
config.setString("metrics.reporter.slf4j.class", "org.apache.flink.metrics.slf4j.Slf4jReporter");
491
config.setString("metrics.reporter.slf4j.interval", "10 SECONDS");
492
493
// Configure scope formats
494
config.setString(MetricOptions.SCOPE_NAMING_JM, "<host>.jobmanager.<jm_id>");
495
config.setString(MetricOptions.SCOPE_NAMING_TM, "<host>.taskmanager.<tm_id>");
496
config.setString(MetricOptions.SCOPE_NAMING_JOB, "<host>.jobmanager.<jm_id>.<job_name>");
497
config.setString(MetricOptions.SCOPE_NAMING_TASK, "<host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>");
498
499
// Set metric exclusions
500
config.setString("metrics.reporter.prometheus.excludes", "*.taskmanager.Status.JVM.CPU.*;*.taskmanager.Status.JVM.Memory.Heap.Max");
501
502
// Create registry with configuration
503
MetricRegistryConfiguration registryConfig = MetricRegistryConfiguration.fromConfiguration(config);
504
MetricRegistry metricRegistry = MetricRegistry.create(registryConfig);
505
```
506
507
### Task-Level Metrics
508
509
```java
510
import org.apache.flink.streaming.api.functions.ProcessFunction;
511
import org.apache.flink.metrics.Counter;
512
import org.apache.flink.metrics.Histogram;
513
import org.apache.flink.metrics.Gauge;
514
515
public class MetricsAwareProcessFunction extends ProcessFunction<String, String> {
516
private transient Counter recordsProcessed;
517
private transient Counter recordsFiltered;
518
private transient Histogram processingLatency;
519
private transient Gauge<Long> currentBacklog;
520
521
private volatile long backlogSize = 0;
522
523
@Override
524
public void open(Configuration parameters) throws Exception {
525
super.open(parameters);
526
527
// Get metric group from runtime context
528
MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
529
530
// Register metrics
531
recordsProcessed = metricGroup.counter("records_processed");
532
recordsFiltered = metricGroup.counter("records_filtered");
533
processingLatency = metricGroup.histogram("processing_latency_ms",
534
new DescriptiveStatisticsHistogram(10000));
535
currentBacklog = metricGroup.gauge("current_backlog", () -> backlogSize);
536
537
// Register custom metrics with specific names
538
metricGroup.gauge("memory_usage_mb", () ->
539
(Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / 1024 / 1024);
540
}
541
542
@Override
543
public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
544
long startTime = System.currentTimeMillis();
545
546
try {
547
// Process the record
548
if (shouldFilter(value)) {
549
recordsFiltered.inc();
550
return;
551
}
552
553
String processed = processRecord(value);
554
out.collect(processed);
555
556
recordsProcessed.inc();
557
558
} finally {
559
// Track processing latency
560
long latency = System.currentTimeMillis() - startTime;
561
processingLatency.update(latency);
562
563
// Update backlog estimate
564
backlogSize = estimateBacklogSize();
565
}
566
}
567
568
private boolean shouldFilter(String value) {
569
// Filtering logic
570
return value.isEmpty();
571
}
572
573
private String processRecord(String value) {
574
// Processing logic
575
return value.toUpperCase();
576
}
577
578
private long estimateBacklogSize() {
579
// Estimate current backlog size
580
return 0; // Simplified for example
581
}
582
}
583
```
584
585
### Operator Metrics
586
587
```java
588
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
589
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
590
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
591
592
public class MetricsAwareOperator extends AbstractStreamOperator<String>
593
implements OneInputStreamOperator<String, String> {
594
595
private transient Counter inputRecords;
596
private transient Counter outputRecords;
597
private transient Meter inputRate;
598
private transient Histogram processingTime;
599
600
@Override
601
public void open() throws Exception {
602
super.open();
603
604
// Get operator metric group
605
MetricGroup operatorGroup = getMetricGroup();
606
607
// Register operator metrics
608
inputRecords = operatorGroup.counter("input_records");
609
outputRecords = operatorGroup.counter("output_records");
610
inputRate = operatorGroup.meter("input_rate", new MeterView(inputRecords));
611
processingTime = operatorGroup.histogram("processing_time_ns",
612
new DescriptiveStatisticsHistogram(1000));
613
614
// Add operator-specific metrics
615
operatorGroup.gauge("operator_busy_time_per_sec", () -> calculateBusyTime());
616
}
617
618
@Override
619
public void processElement(StreamRecord<String> element) throws Exception {
620
long startTime = System.nanoTime();
621
622
inputRecords.inc();
623
inputRate.markEvent();
624
625
try {
626
// Process element
627
String result = element.getValue().toLowerCase();
628
629
// Emit result
630
output.collect(new StreamRecord<>(result, element.getTimestamp()));
631
outputRecords.inc();
632
633
} finally {
634
// Track processing time
635
long duration = System.nanoTime() - startTime;
636
processingTime.update(duration);
637
}
638
}
639
640
private double calculateBusyTime() {
641
// Calculate operator busy time percentage
642
return 0.0; // Simplified for example
643
}
644
}
645
```
646
647
## Common Patterns
648
649
### Metric Lifecycle Management
650
651
```java
652
public class MetricsManager implements AutoCloseable {
653
private final MetricRegistry registry;
654
private final Map<String, MetricGroup> groups = new ConcurrentHashMap<>();
655
656
public MetricsManager(Configuration config) {
657
MetricRegistryConfiguration registryConfig = MetricRegistryConfiguration.fromConfiguration(config);
658
this.registry = MetricRegistry.create(registryConfig);
659
registry.startReporters(config);
660
}
661
662
public MetricGroup createGroup(String... path) {
663
String groupKey = String.join(".", path);
664
return groups.computeIfAbsent(groupKey, k -> {
665
ComponentMetricGroup root = new ComponentMetricGroup(registry, path[0]);
666
MetricGroup current = root;
667
for (int i = 1; i < path.length; i++) {
668
current = current.addGroup(path[i]);
669
}
670
return current;
671
});
672
}
673
674
public void removeGroup(String... path) {
675
String groupKey = String.join(".", path);
676
MetricGroup group = groups.remove(groupKey);
677
if (group != null) {
678
group.close();
679
}
680
}
681
682
@Override
683
public void close() {
684
groups.values().forEach(MetricGroup::close);
685
groups.clear();
686
687
registry.stopReporters();
688
registry.close();
689
}
690
}
691
```
692
693
### Conditional Metric Registration
694
695
```java
696
public class ConditionalMetrics {
697
private final MetricGroup metricGroup;
698
private final boolean metricsEnabled;
699
700
public ConditionalMetrics(MetricGroup metricGroup, Configuration config) {
701
this.metricGroup = metricGroup;
702
this.metricsEnabled = config.getBoolean("metrics.enabled", true);
703
}
704
705
public Counter createCounter(String name) {
706
if (metricsEnabled) {
707
return metricGroup.counter(name);
708
} else {
709
return new NoOpCounter();
710
}
711
}
712
713
public <T> Gauge<T> createGauge(String name, Supplier<T> valueSupplier) {
714
if (metricsEnabled) {
715
return metricGroup.gauge(name, Gauge.of(valueSupplier));
716
} else {
717
return new NoOpGauge<>();
718
}
719
}
720
721
// No-op implementations for when metrics are disabled
722
private static class NoOpCounter implements Counter {
723
@Override public void inc() {}
724
@Override public void inc(long n) {}
725
@Override public long getCount() { return 0; }
726
}
727
728
private static class NoOpGauge<T> implements Gauge<T> {
729
@Override public T getValue() { return null; }
730
}
731
}
732
```