# Core Metric Types

Essential metric interfaces for measuring different aspects of system behavior within Flink applications. These core types provide the foundation for all monitoring and observability in the Flink metrics system.

## Capabilities

### Counter Interface

Metric for counting discrete events and maintaining running totals. Supports both increment and decrement operations. The interface itself does not promise thread safety; whether updates are atomic depends on the implementation.

```java { .api }
/**
 * A Counter is a Metric that measures a count.
 */
public interface Counter extends Metric {
    /** Increment the current count by 1. */
    void inc();

    /**
     * Increment the current count by the given value.
     * @param n value to increment the current count by
     */
    void inc(long n);

    /** Decrement the current count by 1. */
    void dec();

    /**
     * Decrement the current count by the given value.
     * @param n value to decrement the current count by
     */
    void dec(long n);

    /**
     * Returns the current count.
     * @return current count
     */
    long getCount();
}
```

**Usage Examples:**

```java
// Create and use a counter
Counter processedRecords = metricGroup.counter("records-processed");

// Increment operations
processedRecords.inc();   // +1
processedRecords.inc(10); // +10

// Decrement operations
processedRecords.dec();   // -1
processedRecords.dec(5);  // -5

// Read current value
long currentCount = processedRecords.getCount();
```
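
How a counter behaves under concurrent updates depends on the implementation behind the interface. As a minimal sketch, assuming a counter that several threads update at once, the hypothetical `AtomicCounter` below keeps the count in a `java.util.concurrent.atomic.AtomicLong`. `Counter` is re-declared locally (without its `Metric` parent) only so the snippet compiles without Flink on the classpath; `AtomicCounter` and `CounterSketch` are illustrative names, not Flink classes.

```java
import java.util.concurrent.atomic.AtomicLong;

// Local stand-in mirroring the Counter methods listed above (assumption:
// declared here only so the sketch compiles without Flink).
interface Counter {
    void inc();
    void inc(long n);
    void dec();
    void dec(long n);
    long getCount();
}

// Hypothetical implementation; every update is a single atomic operation.
final class AtomicCounter implements Counter {
    private final AtomicLong count = new AtomicLong();

    @Override public void inc() { count.incrementAndGet(); }
    @Override public void inc(long n) { count.addAndGet(n); }
    @Override public void dec() { count.decrementAndGet(); }
    @Override public void dec(long n) { count.addAndGet(-n); }
    @Override public long getCount() { return count.get(); }
}

final class CounterSketch {
    // Starts `threads` workers that each call inc() `perThread` times,
    // waits for all of them, and returns the final count.
    static long countConcurrently(int threads, int perThread) {
        Counter counter = new AtomicCounter();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    counter.inc();
                }
            });
            workers[i].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return counter.getCount();
    }
}
```

With this design no increment is lost when workers interleave, at the cost of an atomic operation per update. A counter that is only ever touched by one thread does not need that overhead.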

### Gauge Interface

Metric that reports a value sampled at the moment it is read, such as a queue size or a memory ratio. A gauge has no mutator methods: `getValue()` derives its result from application state each time it is called.

```java { .api }
/**
 * A Gauge is a Metric that calculates a specific value at a point in time.
 * @param <T> the type of the measured value
 */
public interface Gauge<T> extends Metric {
    /**
     * Calculates and returns the measured value.
     * @return calculated value
     */
    T getValue();
}
```

**Usage Examples:**

```java
// Gauge for queue size, written as an anonymous class
Gauge<Integer> queueSizeGauge = new Gauge<Integer>() {
    @Override
    public Integer getValue() {
        return taskQueue.size();
    }
};
metricGroup.gauge("queue-size", queueSizeGauge);

// Gauge for memory usage, written as a block lambda
Gauge<Double> memoryUsageGauge = () -> {
    Runtime runtime = Runtime.getRuntime();
    long used = runtime.totalMemory() - runtime.freeMemory();
    return (double) used / runtime.maxMemory();
};
metricGroup.gauge("memory-usage-ratio", memoryUsageGauge);

// Gauge for processor status, written as a single-expression lambda
Gauge<String> statusGauge = () -> processor.getCurrentStatus().name();
metricGroup.gauge("processor-status", statusGauge);
```
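
The gauges above store nothing themselves; each call to `getValue()` recomputes the result. The sketch below illustrates that with a queue-size gauge read before and after the queue changes. `Gauge` is re-declared locally (without its `Metric` parent) so the snippet compiles without Flink, and `GaugeSketch` is a hypothetical name used only for illustration.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Local stand-in for the Gauge interface listed above (assumption:
// declared here only so the sketch compiles without Flink).
@FunctionalInterface
interface Gauge<T> {
    T getValue();
}

final class GaugeSketch {
    // Reads the same gauge twice: once on an empty queue and once after
    // two tasks were added. Returns both readings in order.
    static int[] readTwice() {
        Queue<String> taskQueue = new ArrayDeque<>();

        // The gauge only holds a reference to the queue, not a copy of its size.
        Gauge<Integer> queueSize = taskQueue::size;

        int before = queueSize.getValue();
        taskQueue.add("task-a");
        taskQueue.add("task-b");
        int after = queueSize.getValue();

        return new int[] {before, after};
    }
}
```

Because the value is computed on every read, `getValue()` should stay cheap and free of side effects; whatever reads the gauge pays that cost each time.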

### Meter Interface

Metric for measuring throughput. Exposes the current rate in events per second together with the cumulative number of events marked.

```java { .api }
/**
 * Metric for measuring throughput.
 */
public interface Meter extends Metric {
    /** Mark occurrence of an event. */
    void markEvent();

    /**
     * Mark occurrence of multiple events.
     * @param n number of events occurred
     */
    void markEvent(long n);

    /**
     * Returns the current rate of events per second.
     * @return current rate of events per second
     */
    double getRate();

    /**
     * Get number of events marked on the meter.
     * @return number of events marked on the meter
     */
    long getCount();
}
```

**Usage Examples:**

```java
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;

// Create a meter whose rate is averaged over a 60-second time span
Meter throughputMeter = new MeterView(60);
metricGroup.meter("throughput", throughputMeter);

// Mark single events
throughputMeter.markEvent();

// Mark batch events
throughputMeter.markEvent(batchSize);

// Read current rate and total count
double currentRate = throughputMeter.getRate(); // events/second
long totalEvents = throughputMeter.getCount();  // total events marked
```
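
One way to derive a per-second rate from a running count is to sample the count at fixed intervals and divide the growth across the window by the window length. The sketch below does this with a small circular buffer. It is an illustration under stated assumptions, not Flink's `MeterView` code: `WindowedMeter` is a hypothetical class, `Meter` is re-declared locally so the snippet compiles without Flink, the window length is assumed to be a multiple of the tick length, and the caller is assumed to invoke `update()` once per tick.

```java
// Local stand-in for the Meter interface listed above (assumption:
// declared here only so the sketch compiles without Flink).
interface Meter {
    void markEvent();
    void markEvent(long n);
    double getRate();
    long getCount();
}

// Hypothetical meter that averages the event rate over a fixed window.
final class WindowedMeter implements Meter {
    private final int windowSeconds;
    private final long[] samples; // circular buffer of cumulative counts
    private int newest = 0;       // index of the most recent sample
    private long count = 0;       // total events marked so far
    private double rate = 0.0;    // rate computed at the last update()

    WindowedMeter(int windowSeconds, int tickSeconds) {
        this.windowSeconds = windowSeconds;
        // One extra slot so the oldest retained sample is exactly one window old.
        this.samples = new long[windowSeconds / tickSeconds + 1];
    }

    @Override public void markEvent() { count++; }
    @Override public void markEvent(long n) { count += n; }
    @Override public long getCount() { return count; }
    @Override public double getRate() { return rate; }

    // Assumed to be called once every tickSeconds by the caller.
    void update() {
        newest = (newest + 1) % samples.length;
        samples[newest] = count;
        long oldest = samples[(newest + 1) % samples.length];
        rate = (double) (count - oldest) / windowSeconds;
    }
}
```

For example, with a 10-second window and 5-second ticks, marking 100 events and calling `update()` gives a rate of 10.0; the rate only changes when `update()` runs, so reads between ticks return the last computed value.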

### Histogram Interface

Metric for recording `long` values and computing statistics over their distribution, such as quantiles, mean, and standard deviation. Typical uses are latencies and payload sizes.

```java { .api }
/**
 * Histogram interface to be used with Flink's metrics system.
 * The histogram lets you record values, get the current count of recorded
 * values, and create histogram statistics for the currently seen elements.
 */
public interface Histogram extends Metric {
    /**
     * Update the histogram with the given value.
     * @param value Value to update the histogram with
     */
    void update(long value);

    /**
     * Get the count of seen elements.
     * @return Count of seen elements
     */
    long getCount();

    /**
     * Create statistics for the currently recorded elements.
     * @return Statistics about the currently recorded elements
     */
    HistogramStatistics getStatistics();
}
```

**Histogram Statistics:**

```java { .api }
/**
 * Histogram statistics represent the current snapshot of elements
 * recorded in the histogram.
 */
public abstract class HistogramStatistics {
    /**
     * Returns the value for the given quantile based on the histogram statistics.
     * @param quantile Quantile to calculate the value for
     * @return Value for the given quantile
     */
    public abstract double getQuantile(double quantile);

    /**
     * Returns the elements of the statistics' sample.
     * @return Elements of the statistics' sample
     */
    public abstract long[] getValues();

    /**
     * Returns the size of the statistics' sample.
     * @return Size of the statistics' sample
     */
    public abstract int size();

    /**
     * Returns the mean of the histogram values.
     * @return Mean of the histogram values
     */
    public abstract double getMean();

    /**
     * Returns the standard deviation of the distribution.
     * @return Standard deviation of histogram distribution
     */
    public abstract double getStdDev();

    /**
     * Returns the maximum value of the histogram.
     * @return Maximum value of the histogram
     */
    public abstract long getMax();

    /**
     * Returns the minimum value of the histogram.
     * @return Minimum value of the histogram
     */
    public abstract long getMin();
}
```

**Usage Examples:**

```java
// Create and use histogram
Histogram latencyHistogram = ...; // supply a custom histogram implementation here
metricGroup.histogram("request-latency", latencyHistogram);

// Record values
latencyHistogram.update(45); // 45ms latency
latencyHistogram.update(67); // 67ms latency
latencyHistogram.update(23); // 23ms latency

// Get statistics
HistogramStatistics stats = latencyHistogram.getStatistics();
double p95 = stats.getQuantile(0.95); // 95th percentile
double mean = stats.getMean();        // average latency
double stdDev = stats.getStdDev();    // standard deviation
long max = stats.getMax();            // maximum latency
long min = stats.getMin();            // minimum latency
int sampleSize = stats.size();        // number of samples
```
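
The usage above leaves the histogram implementation open. As one possible shape, the sketch below keeps the most recent values in a fixed-size circular buffer and computes statistics from a sorted snapshot. Everything here is an assumption made for illustration: `SlidingWindowHistogram` and `SnapshotStatistics` are hypothetical classes, the quantile uses the nearest-rank method, the standard deviation is the population form, and `Histogram` and `HistogramStatistics` are re-declared locally so the snippet compiles without Flink.

```java
import java.util.Arrays;

// Local stand-ins for the types listed above (assumption: declared here
// only so the sketch compiles without Flink).
interface Histogram {
    void update(long value);
    long getCount();
    HistogramStatistics getStatistics();
}

abstract class HistogramStatistics {
    public abstract double getQuantile(double quantile);
    public abstract long[] getValues();
    public abstract int size();
    public abstract double getMean();
    public abstract double getStdDev();
    public abstract long getMax();
    public abstract long getMin();
}

// Hypothetical histogram that retains only the latest `capacity` values.
final class SlidingWindowHistogram implements Histogram {
    private final long[] window;
    private long count = 0; // total values ever recorded

    SlidingWindowHistogram(int capacity) { this.window = new long[capacity]; }

    @Override public void update(long value) {
        window[(int) (count % window.length)] = value; // overwrite the oldest slot
        count++;
    }

    @Override public long getCount() { return count; }

    @Override public HistogramStatistics getStatistics() {
        int filled = (int) Math.min(count, window.length);
        long[] sorted = Arrays.copyOf(window, filled);
        Arrays.sort(sorted);
        return new SnapshotStatistics(sorted);
    }
}

// Immutable statistics over a sorted snapshot of the window.
final class SnapshotStatistics extends HistogramStatistics {
    private final long[] sorted;

    SnapshotStatistics(long[] sorted) { this.sorted = sorted; }

    @Override public double getQuantile(double quantile) {
        if (sorted.length == 0) return Double.NaN;
        int rank = (int) Math.ceil(quantile * sorted.length); // nearest-rank
        int index = Math.min(Math.max(rank - 1, 0), sorted.length - 1);
        return sorted[index];
    }

    @Override public long[] getValues() { return sorted.clone(); }
    @Override public int size() { return sorted.length; }

    @Override public double getMean() {
        return Arrays.stream(sorted).average().orElse(Double.NaN);
    }

    @Override public double getStdDev() {
        if (sorted.length == 0) return Double.NaN;
        double mean = getMean();
        double sumSquares = 0.0;
        for (long v : sorted) sumSquares += (v - mean) * (v - mean);
        return Math.sqrt(sumSquares / sorted.length); // population form
    }

    @Override public long getMax() { return sorted.length == 0 ? 0 : sorted[sorted.length - 1]; }
    @Override public long getMin() { return sorted.length == 0 ? 0 : sorted[0]; }
}
```

Note the distinction this makes visible: `getCount()` reports every value ever recorded, while `size()` on the statistics reports only how many values are in the current sample.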

### Base Metric Interface

Common interface implemented by all metric types. Its single method, `getMetricType()`, identifies which of the standard types a metric is.

```java { .api }
/**
 * Common super interface for all metrics.
 */
public interface Metric {
    /**
     * Returns the metric type. Default implementation throws
     * UnsupportedOperationException for custom metric types.
     * @return MetricType enum value
     */
    default MetricType getMetricType() {
        throw new UnsupportedOperationException("Custom metric types are not supported.");
    }
}
```

### Metric Type Enumeration

Enumeration defining the standard metric types supported by Flink.

```java { .api }
/**
 * Enum describing the different metric types.
 */
public enum MetricType {
    COUNTER,
    METER,
    GAUGE,
    HISTOGRAM
}
```

**Usage Examples:**

```java
// Check metric type
if (someMetric.getMetricType() == MetricType.COUNTER) {
    Counter counter = (Counter) someMetric;
    long count = counter.getCount();
}

// Type-specific handling
switch (metric.getMetricType()) {
    case COUNTER:
        handleCounter((Counter) metric);
        break;
    case GAUGE:
        handleGauge((Gauge<?>) metric);
        break;
    case METER:
        handleMeter((Meter) metric);
        break;
    case HISTOGRAM:
        handleHistogram((Histogram) metric);
        break;
}
```
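
Because the default `getMetricType()` throws `UnsupportedOperationException`, code that dispatches on the type may want to guard the call when it can receive custom metrics. The sketch below shows one way to do that. It re-declares trimmed-down stand-ins so it compiles without Flink, and it assumes that the standard interfaces override the default to return their own type, which is what the type checks above rely on. `MetricDescriber` is a hypothetical helper.

```java
enum MetricType { COUNTER, METER, GAUGE, HISTOGRAM }

// Local stand-in for the base interface listed above.
interface Metric {
    default MetricType getMetricType() {
        throw new UnsupportedOperationException("Custom metric types are not supported.");
    }
}

// Assumption: each standard interface overrides the default with its own type.
// Only the methods needed for this sketch are declared.
interface Counter extends Metric {
    long getCount();
    @Override default MetricType getMetricType() { return MetricType.COUNTER; }
}

interface Gauge<T> extends Metric {
    T getValue();
    @Override default MetricType getMetricType() { return MetricType.GAUGE; }
}

// Hypothetical helper that renders a metric as text.
final class MetricDescriber {
    static String describe(Metric metric) {
        MetricType type;
        try {
            type = metric.getMetricType();
        } catch (UnsupportedOperationException e) {
            // A metric that keeps the default implementation lands here.
            return "unsupported custom metric";
        }
        switch (type) {
            case COUNTER:
                return "counter=" + ((Counter) metric).getCount();
            case GAUGE:
                return "gauge=" + ((Gauge<?>) metric).getValue();
            default:
                // METER and HISTOGRAM are left out of this sketch.
                return "unhandled type " + type;
        }
    }
}
```

Catching the exception at the dispatch point keeps one unexpected metric from aborting the handling of all the others; whether to skip, log, or reject such a metric is a choice for the caller.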