# Flink Metrics Core

Core metrics interfaces and implementations for the Apache Flink stream processing framework. This library provides the foundational metrics system that enables monitoring and observability within Flink applications, supporting essential metric types, hierarchical organization, and pluggable reporting to external monitoring systems.

## Package Information

- **Package Name**: flink-metrics-core
- **Package Type**: maven
- **Language**: Java
- **Group ID**: org.apache.flink
- **Artifact ID**: flink-metrics-core
- **Installation**: Include in your Maven project:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-metrics-core</artifactId>
    <version>1.20.2</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.metrics.*;
import org.apache.flink.metrics.groups.*;
import org.apache.flink.metrics.reporter.*;
```

For specific components:

```java
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.MetricGroup;
```

## Basic Usage

```java
import org.apache.flink.metrics.*;

// Assumes `metricGroup` and `rootGroup` are MetricGroup instances and `queue`
// is a java.util.Queue, all already in scope.

// Working with counters
Counter eventCounter = metricGroup.counter("events");
eventCounter.inc();
eventCounter.inc(5);

// Working with gauges: the value is read on demand each time it is reported
Gauge<Integer> queueSizeGauge = new Gauge<Integer>() {
    @Override
    public Integer getValue() {
        return queue.size();
    }
};
metricGroup.gauge("queueSize", queueSizeGauge);

// Working with meters
Meter throughputMeter = new MeterView(60); // 60-second time window
metricGroup.meter("throughput", throughputMeter);
throughputMeter.markEvent();

// Creating metric hierarchies
MetricGroup operatorGroup = rootGroup.addGroup("operators");
MetricGroup specificOpGroup = operatorGroup.addGroup("map-operator-1");
Counter opCounter = specificOpGroup.counter("records-processed");
```

## Architecture

Flink Metrics Core follows a hierarchical design built around several key components:

- **Core Metric Types**: `Counter`, `Gauge`, `Meter`, and `Histogram` provide the fundamental metric abstractions
- **Metric Groups**: `MetricGroup` creates hierarchical namespaces for organizing metrics logically
- **Reporter Framework**: `MetricReporter` and `MetricReporterFactory` enable pluggable export to external systems
- **Specialized Groups**: Component-specific metric groups for operators, sources, sinks, and coordinators
- **Configuration System**: `MetricConfig` provides type-safe configuration management
- **Tracing Support**: Experimental `Span` and `TraceReporter` for distributed tracing capabilities

This design enables flexible metric collection across Flink's distributed runtime while maintaining performance and providing extensibility for custom metric types and reporting backends.

## Capabilities

### Core Metric Types

Essential metric interfaces for measuring different aspects of system behavior. Includes counters for event counting, gauges for instantaneous values, meters for rate measurement, and histograms for distribution statistics.

```java { .api }
interface Counter extends Metric {
    void inc();
    void inc(long n);
    void dec();
    void dec(long n);
    long getCount();
}

interface Gauge<T> extends Metric {
    T getValue();
}

interface Meter extends Metric {
    void markEvent();
    void markEvent(long n);
    double getRate();
    long getCount();
}

interface Histogram extends Metric {
    void update(long value);
    long getCount();
    HistogramStatistics getStatistics();
}
```

[Core Metrics](./core-metrics.md)
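
To make the difference between a pushed metric (counter) and a pulled metric (gauge) concrete, here is a minimal sketch. It uses local stand-in interfaces that mirror the signatures listed above so that it compiles without Flink on the classpath; `CoreMetricsSketch` and `PlainCounter` are hypothetical names, not part of the library.

```java
import java.util.ArrayDeque;
import java.util.Queue;

final class CoreMetricsSketch {
    // Stand-ins mirroring the listed signatures; the real types live in org.apache.flink.metrics.
    interface Metric {}

    interface Counter extends Metric {
        void inc();
        void inc(long n);
        void dec();
        void dec(long n);
        long getCount();
    }

    interface Gauge<T> extends Metric {
        T getValue();
    }

    // Hypothetical minimal counter; not thread-safe.
    static final class PlainCounter implements Counter {
        private long count;

        @Override public void inc() { count++; }
        @Override public void inc(long n) { count += n; }
        @Override public void dec() { count--; }
        @Override public void dec(long n) { count -= n; }
        @Override public long getCount() { return count; }
    }

    public static void main(String[] args) {
        // A counter is updated explicitly by the code being measured.
        Counter events = new PlainCounter();
        events.inc();
        events.inc(5);
        events.dec();

        // A gauge is never updated; it computes its value whenever it is read.
        Queue<String> queue = new ArrayDeque<>();
        Gauge<Integer> queueSize = queue::size;
        queue.add("a");
        queue.add("b");

        System.out.println(events.getCount() + " " + queueSize.getValue()); // prints "5 2"
    }
}
```

Because the gauge only holds a reference to the queue, later changes to the queue are visible on the next read without any extra bookkeeping.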

### Metric Organization

Hierarchical metric organization system for creating logical namespaces and managing metric lifecycles. Supports both flat and nested structures with variable interpolation and scoping.

```java { .api }
interface MetricGroup {
    Counter counter(int name);
    Counter counter(String name);
    <C extends Counter> C counter(int name, C counter);
    <C extends Counter> C counter(String name, C counter);
    <T, G extends Gauge<T>> G gauge(int name, G gauge);
    <T, G extends Gauge<T>> G gauge(String name, G gauge);
    <H extends Histogram> H histogram(int name, H histogram);
    <H extends Histogram> H histogram(String name, H histogram);
    <M extends Meter> M meter(int name, M meter);
    <M extends Meter> M meter(String name, M meter);
    MetricGroup addGroup(int name);
    MetricGroup addGroup(String name);
    MetricGroup addGroup(String key, String value);
    String[] getScopeComponents();
    Map<String, String> getAllVariables();
    String getMetricIdentifier(String metricName);
    String getMetricIdentifier(String metricName, CharacterFilter filter);
    void addSpan(SpanBuilder spanBuilder); // @Experimental
}
```

[Metric Groups](./metric-groups.md)
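
The way nested groups turn into a fully qualified metric name can be sketched with a small stand-in class. This is an illustration only: `ScopeSketch` is a hypothetical name, the `.` delimiter is an assumption, and the real runtime also applies configurable scope formats and variables that are omitted here.

```java
import java.util.ArrayList;
import java.util.List;

final class ScopeSketch {
    private final ScopeSketch parent; // null for the root group
    private final String name;

    ScopeSketch(ScopeSketch parent, String name) {
        this.parent = parent;
        this.name = name;
    }

    // Each call creates a child namespace below this group.
    ScopeSketch addGroup(String name) {
        return new ScopeSketch(this, name);
    }

    // Integer names are treated as their string form.
    ScopeSketch addGroup(int name) {
        return addGroup(String.valueOf(name));
    }

    // Walks up to the root and returns the names from root to this group.
    String[] getScopeComponents() {
        List<String> parts = new ArrayList<>();
        for (ScopeSketch g = this; g != null; g = g.parent) {
            parts.add(0, g.name);
        }
        return parts.toArray(new String[0]);
    }

    // Joins the scope and the metric name with the assumed '.' delimiter.
    String getMetricIdentifier(String metricName) {
        return String.join(".", getScopeComponents()) + "." + metricName;
    }

    public static void main(String[] args) {
        ScopeSketch op = new ScopeSketch(null, "root")
                .addGroup("operators")
                .addGroup("map-operator-1");
        // prints "root.operators.map-operator-1.records-processed"
        System.out.println(op.getMetricIdentifier("records-processed"));
    }
}
```

Unlike this sketch, which creates a new child on every `addGroup` call, a production metric group would typically track its children and registered metrics.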

### Reporter Framework

Pluggable system for exporting metrics to external monitoring systems. Supports both push and pull patterns with configurable scheduling and lifecycle management.

```java { .api }
interface MetricReporter {
    void open(MetricConfig config);
    void close();
    void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group);
    void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group);
}

interface MetricReporterFactory {
    MetricReporter createMetricReporter(Properties properties);
}

interface Scheduled {
    void report();
}
```

[Reporters](./reporters.md)
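
The reporter lifecycle (metrics are announced, tracked, and then read on each scheduled report) can be sketched roughly as follows. This is a simplified stand-in, not the library API: `ReporterSketch` is a hypothetical class, its method signatures are deliberately reduced, and a `LongSupplier` stands in for a registered metric.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.function.LongSupplier;

final class ReporterSketch {
    // Metric name -> supplier of the current value; sorted for stable output.
    private final Map<String, LongSupplier> metrics = new TreeMap<>();

    // Called when a metric is registered; the reporter only stores a reference.
    void notifyOfAddedMetric(LongSupplier metric, String metricName) {
        metrics.put(metricName, metric);
    }

    // Called when a metric is unregistered.
    void notifyOfRemovedMetric(String metricName) {
        metrics.remove(metricName);
    }

    // Push-style report: reads every tracked metric at call time.
    String report() {
        StringBuilder sb = new StringBuilder();
        metrics.forEach((name, value) ->
                sb.append(name).append('=').append(value.getAsLong()).append('\n'));
        return sb.toString();
    }

    public static void main(String[] args) {
        long[] processed = {0};
        ReporterSketch reporter = new ReporterSketch();
        reporter.notifyOfAddedMetric(() -> processed[0], "records-processed");
        processed[0] = 42;
        System.out.print(reporter.report()); // prints "records-processed=42"
    }
}
```

The sketch is not thread-safe. A real reporter should expect registration callbacks and reports to arrive from different threads and guard its state accordingly.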

### Metric Implementations

Concrete implementations of metric interfaces providing both thread-safe and performance-optimized variants. Includes simple counters, meter views with time windowing, and utility implementations.

```java { .api }
class SimpleCounter implements Counter { /* non-thread-safe */ }
class ThreadSafeSimpleCounter implements Counter { /* thread-safe */ }
class MeterView implements Meter, View { /* time-windowed rate calculation */ }
```

[Implementations](./implementations.md)
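
One way a time-windowed rate can be computed is to snapshot a running count at a fixed interval into a ring buffer and divide the difference between the newest and oldest snapshot by the window length. The sketch below illustrates that idea; `WindowedRateSketch` is a hypothetical class, and the 5-second interval is taken from `View.UPDATE_INTERVAL_SECONDS` in the Types section. It should not be read as the exact `MeterView` implementation.

```java
final class WindowedRateSketch {
    private static final int UPDATE_INTERVAL_SECONDS = 5;

    private final int windowSeconds;
    private final long[] snapshots; // ring buffer of count snapshots
    private long count;
    private int index;
    private double rate;

    WindowedRateSketch(int windowSeconds) {
        // Round down to a multiple of the interval, but keep at least one interval.
        this.windowSeconds = Math.max(
                windowSeconds - windowSeconds % UPDATE_INTERVAL_SECONDS,
                UPDATE_INTERVAL_SECONDS);
        // One extra slot so that newest and oldest are a full window apart.
        this.snapshots = new long[this.windowSeconds / UPDATE_INTERVAL_SECONDS + 1];
    }

    void markEvent(long n) {
        count += n;
    }

    // Expected to be called once per UPDATE_INTERVAL_SECONDS by a scheduler.
    void update() {
        index = (index + 1) % snapshots.length;
        snapshots[index] = count;
        long oldest = snapshots[(index + 1) % snapshots.length];
        rate = (double) (count - oldest) / windowSeconds;
    }

    double getRate() {
        return rate;
    }

    public static void main(String[] args) {
        WindowedRateSketch meter = new WindowedRateSketch(10);
        meter.markEvent(20);
        meter.update();
        System.out.println(meter.getRate()); // prints "2.0"
    }
}
```

The rate only changes when `update()` runs, so reads between updates are cheap and return the last computed value.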

### Specialized Metric Groups

Component-specific metric groups tailored for different parts of the Flink runtime. Provides specialized interfaces for operators, sources, sinks, coordinators, and other Flink components.

```java { .api }
interface OperatorMetricGroup extends MetricGroup {
    OperatorIOMetricGroup getIOMetricGroup();
}

interface SourceReaderMetricGroup extends OperatorMetricGroup {
    Counter getNumRecordsInErrorsCounter();
    void setPendingBytesGauge(Gauge<Long> pendingBytesGauge);
    void setPendingRecordsGauge(Gauge<Long> pendingRecordsGauge);
}

interface SinkWriterMetricGroup extends OperatorMetricGroup {
    Counter getNumRecordsOutErrorsCounter();
    Counter getNumRecordsSendErrorsCounter();
    Counter getNumRecordsSendCounter();
    Counter getNumBytesSendCounter();
    void setCurrentSendTimeGauge(Gauge<Long> currentSendTimeGauge);
}

interface SinkCommitterMetricGroup extends OperatorMetricGroup {
    Counter getNumCommittablesTotalCounter();
    Counter getNumCommittablesFailureCounter();
    Counter getNumCommittablesRetryCounter();
    Counter getNumCommittablesSuccessCounter();
    Counter getNumCommittablesAlreadyCommittedCounter();
    void setCurrentPendingCommittablesGauge(Gauge<Integer> currentPendingCommittablesGauge);
}

interface CacheMetricGroup extends MetricGroup {
    void hitCounter(Counter hitCounter);
    void missCounter(Counter missCounter);
    void loadCounter(Counter loadCounter);
    void numLoadFailuresCounter(Counter numLoadFailuresCounter);
    void latestLoadTimeGauge(Gauge<Long> latestLoadTimeGauge);
    void numCachedRecordsGauge(Gauge<Long> numCachedRecordsGauge);
    void numCachedBytesGauge(Gauge<Long> numCachedBytesGauge);
}

interface SplitEnumeratorMetricGroup extends MetricGroup { }
```

[Specialized Groups](./specialized-groups.md)

### Configuration and Utilities

Configuration management and utility classes for metric system setup. Includes type-safe property access, character filtering for metric names, and metric type enumeration.

```java { .api }
class MetricConfig extends Properties {
    String getString(String key, String defaultValue);
    int getInteger(String key, int defaultValue);
    long getLong(String key, long defaultValue);
    float getFloat(String key, float defaultValue);
    double getDouble(String key, double defaultValue);
    boolean getBoolean(String key, boolean defaultValue);
}

interface CharacterFilter {
    String filterCharacters(String input);
}

enum MetricType { COUNTER, METER, GAUGE, HISTOGRAM }
```

[Configuration](./configuration.md)
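
As an illustration of what typed property access with defaults and character filtering can look like, here is a small stand-in built on `java.util.Properties`. `TypedConfigSketch` and `DOTS_TO_UNDERSCORES` are hypothetical names, and the parsing behavior shown (for example, how malformed values are handled) is an assumption rather than a statement about `MetricConfig`.

```java
import java.util.Properties;

final class TypedConfigSketch extends Properties {
    // Stand-in for the CharacterFilter interface listed above.
    interface CharacterFilter {
        String filterCharacters(String input);
    }

    // Example filter: some backends reserve '.' as a path separator.
    static final CharacterFilter DOTS_TO_UNDERSCORES = input -> input.replace('.', '_');

    String getString(String key, String defaultValue) {
        return getProperty(key, defaultValue);
    }

    // Returns the default when the key is absent; throws NumberFormatException on bad input.
    int getInteger(String key, int defaultValue) {
        String value = getProperty(key);
        return value == null ? defaultValue : Integer.parseInt(value);
    }

    boolean getBoolean(String key, boolean defaultValue) {
        String value = getProperty(key);
        return value == null ? defaultValue : Boolean.parseBoolean(value);
    }

    public static void main(String[] args) {
        TypedConfigSketch config = new TypedConfigSketch();
        config.setProperty("interval", "30");
        System.out.println(config.getInteger("interval", 10)); // prints "30"
        System.out.println(config.getInteger("retries", 3));   // prints "3"
        System.out.println(DOTS_TO_UNDERSCORES.filterCharacters("job.task")); // prints "job_task"
    }
}
```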

### Tracing Support

Experimental distributed tracing capabilities for capturing execution spans across Flink's distributed runtime. Supports span creation, attribute attachment, and pluggable trace reporting.

```java { .api }
interface Span {
    static SpanBuilder builder(Class<?> classScope, String name);
    String getScope();
    String getName();
    long getStartTsMillis();
    long getEndTsMillis();
    Map<String, Object> getAttributes();
}

class SpanBuilder {
    SpanBuilder(Class<?> classScope, String name);
    Span build();
    SpanBuilder setStartTsMillis(long startTsMillis);
    SpanBuilder setEndTsMillis(long endTsMillis);
    SpanBuilder setAttribute(String key, String value);
    SpanBuilder setAttribute(String key, long value);
    SpanBuilder setAttribute(String key, double value);
}

interface TraceReporter {
    void open(MetricConfig config);
    void close();
    void notifyOfAddedSpan(Span span);
}
```

[Tracing](./tracing.md)
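
The builder pattern behind span creation can be sketched with a self-contained stand-in. `SpanSketch` is a hypothetical class that loosely follows the shape of the listing above; in particular, deriving the scope from `Class.getName()` is an assumption made for this example.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class SpanSketch {
    private final String scope;
    private final String name;
    private final long startTsMillis;
    private final long endTsMillis;
    private final Map<String, Object> attributes;

    private SpanSketch(Builder b) {
        this.scope = b.scope;
        this.name = b.name;
        this.startTsMillis = b.startTsMillis;
        this.endTsMillis = b.endTsMillis;
        // Defensive copy so the built span is immutable.
        this.attributes = Collections.unmodifiableMap(new HashMap<>(b.attributes));
    }

    static Builder builder(Class<?> classScope, String name) {
        return new Builder(classScope.getName(), name);
    }

    String getScope() { return scope; }
    String getName() { return name; }
    long getStartTsMillis() { return startTsMillis; }
    long getEndTsMillis() { return endTsMillis; }
    Map<String, Object> getAttributes() { return attributes; }

    static final class Builder {
        private final String scope;
        private final String name;
        private long startTsMillis;
        private long endTsMillis;
        private final Map<String, Object> attributes = new HashMap<>();

        Builder(String scope, String name) {
            this.scope = scope;
            this.name = name;
        }

        Builder setStartTsMillis(long ts) { startTsMillis = ts; return this; }
        Builder setEndTsMillis(long ts) { endTsMillis = ts; return this; }
        Builder setAttribute(String key, String value) { attributes.put(key, value); return this; }
        Builder setAttribute(String key, long value) { attributes.put(key, value); return this; }
        SpanSketch build() { return new SpanSketch(this); }
    }

    public static void main(String[] args) {
        SpanSketch span = SpanSketch.builder(String.class, "checkpoint")
                .setStartTsMillis(1_000L)
                .setEndTsMillis(1_250L)
                .setAttribute("bytes", 42L)
                .build();
        // prints "java.lang.String/checkpoint took 250 ms"
        System.out.println(span.getScope() + "/" + span.getName() + " took "
                + (span.getEndTsMillis() - span.getStartTsMillis()) + " ms");
    }
}
```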

## Types

```java { .api }
interface Metric {
    default MetricType getMetricType();
}

abstract class HistogramStatistics {
    public abstract double getQuantile(double quantile);
    public abstract long[] getValues();
    public abstract int size();
    public abstract double getMean();
    public abstract double getStdDev();
    public abstract long getMax();
    public abstract long getMin();
}

interface View {
    int UPDATE_INTERVAL_SECONDS = 5;
    void update();
}

interface LogicalScopeProvider {
    String getLogicalScope(CharacterFilter filter);
    String getLogicalScope(CharacterFilter filter, char delimiter);
    MetricGroup getWrappedMetricGroup();
    static LogicalScopeProvider castFrom(MetricGroup metricGroup);
}
```
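
To show what a `HistogramStatistics`-style snapshot might compute, the following sketch derives a few statistics from a fixed array of samples. `StatsSketch` is a hypothetical class, and the nearest-rank quantile used here is one common definition chosen for simplicity; actual histogram implementations may sample or interpolate differently.

```java
import java.util.Arrays;

final class StatsSketch {
    private final long[] sorted;

    StatsSketch(long[] values) {
        // Copy before sorting so the caller's array is left untouched.
        this.sorted = values.clone();
        Arrays.sort(this.sorted);
    }

    int size() { return sorted.length; }

    long getMin() { return sorted.length == 0 ? 0L : sorted[0]; }

    long getMax() { return sorted.length == 0 ? 0L : sorted[sorted.length - 1]; }

    double getMean() { return Arrays.stream(sorted).average().orElse(0.0); }

    // Nearest-rank quantile: the smallest sample with at least q * n samples at or below it.
    double getQuantile(double quantile) {
        if (sorted.length == 0) {
            return 0.0;
        }
        int rank = (int) Math.ceil(quantile * sorted.length);
        int index = Math.min(Math.max(rank - 1, 0), sorted.length - 1);
        return sorted[index];
    }

    public static void main(String[] args) {
        StatsSketch stats = new StatsSketch(new long[] {5, 1, 3, 2, 4});
        // prints "min=1 max=5 mean=3.0 p50=3.0"
        System.out.println("min=" + stats.getMin() + " max=" + stats.getMax()
                + " mean=" + stats.getMean() + " p50=" + stats.getQuantile(0.5));
    }
}
```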