# Metrics Testing

Utilities for testing metric reporting in Flink applications: `MetricListener` collects the metrics registered under a root metric group, and `MetricMatchers` provides Hamcrest matchers for asserting on their values.

## Capabilities

### Metric Listener

`MetricListener` captures the metrics registered under its root metric group and stores them so that tests can retrieve them by identifier.

```java { .api }
/**
 * Listens for metric and group registration under a root metric group and stores them for retrieval
 */
public class MetricListener {
    /**
     * Delimiter for metric identifiers
     */
    public static final String DELIMITER = ".";

    /**
     * Name of root metric group
     */
    public static final String ROOT_METRIC_GROUP_NAME = "rootMetricGroup";

    /**
     * Creates new metric listener with testing registry
     */
    public MetricListener();

    /**
     * Returns the root metric group for registering metrics
     * @return MetricGroup for metric registration
     */
    public MetricGroup getMetricGroup();

    /**
     * Gets registered metric by type and identifier
     * @param metricType Class of the metric type to retrieve
     * @param identifier Metric identifier components
     * @return Optional containing the metric if found
     */
    public <T extends Metric> Optional<T> getMetric(Class<T> metricType, String... identifier);

    /**
     * Gets registered Meter metric
     * @param identifier Metric identifier components
     * @return Optional containing the Meter if found
     */
    public Optional<Meter> getMeter(String... identifier);

    /**
     * Gets registered Counter metric
     * @param identifier Metric identifier components
     * @return Optional containing the Counter if found
     */
    public Optional<Counter> getCounter(String... identifier);

    /**
     * Gets registered Histogram metric
     * @param identifier Metric identifier components
     * @return Optional containing the Histogram if found
     */
    public Optional<Histogram> getHistogram(String... identifier);

    /**
     * Gets registered Gauge metric
     * @param identifier Metric identifier components
     * @return Optional containing the Gauge if found
     */
    public <T> Optional<Gauge<T>> getGauge(String... identifier);
}
```

**Usage Example:**

```java
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.testutils.MetricListener;
import org.junit.Test;

import java.util.Optional;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

@Test
public void testMetricsReporting() throws Exception {
    // Create a metric listener and obtain its root metric group
    MetricListener metricListener = new MetricListener();
    MetricGroup metricGroup = metricListener.getMetricGroup();

    // Register a counter in the "test" subgroup.
    // MetricGroup.counter(String) takes only the metric name, so the group is added first.
    Counter myCounter = metricGroup.addGroup("test").counter("myCounter");
    myCounter.inc(5);

    // Retrieve the metric by group name followed by metric name, then verify it
    Optional<Counter> retrievedCounter = metricListener.getCounter("test", "myCounter");
    assertTrue(retrievedCounter.isPresent());
    assertEquals(5L, retrievedCounter.get().getCount());

    // The same lookup works when the subgroup is kept in a variable
    MetricGroup subGroup = metricGroup.addGroup("subgroup");
    Counter subCounter = subGroup.counter("subCounter");
    subCounter.inc(10);

    Optional<Counter> retrievedSubCounter = metricListener.getCounter("subgroup", "subCounter");
    assertTrue(retrievedSubCounter.isPresent());
    assertEquals(10L, retrievedSubCounter.get().getCount());
}
```
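
The way identifier components select a registered metric can be pictured with a small model. The sketch below is not Flink code: `IdentifierSketch`, `register`, and `lookup` are hypothetical names, and it assumes the listener keys each metric by the root group name followed by the components joined with `DELIMITER`. It also illustrates why looking up an unregistered identifier, or asking for the wrong type, yields an empty `Optional`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for the listener's bookkeeping; not part of Flink.
final class IdentifierSketch {
    static final String DELIMITER = ".";
    static final String ROOT_METRIC_GROUP_NAME = "rootMetricGroup";

    private final Map<String, Object> metrics = new HashMap<>();

    // Builds the full key: root group name, then each component, joined by the delimiter.
    static String fullIdentifier(String... components) {
        return ROOT_METRIC_GROUP_NAME + DELIMITER + String.join(DELIMITER, components);
    }

    // Stores a metric under the key derived from its group path and name.
    void register(Object metric, String... components) {
        metrics.put(fullIdentifier(components), metric);
    }

    // Returns the metric only if one exists under that key and has the requested type.
    <T> Optional<T> lookup(Class<T> type, String... components) {
        Object metric = metrics.get(fullIdentifier(components));
        return type.isInstance(metric) ? Optional.of(type.cast(metric)) : Optional.empty();
    }
}
```

Under this assumption, the components `"subgroup", "subCounter"` resolve to the key `rootMetricGroup.subgroup.subCounter`, which mirrors passing the group name and then the metric name to `getCounter`.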

### Metric Matchers

`MetricMatchers` provides Hamcrest matchers for metric assertions in tests.

```java { .api }
/**
 * Provides Hamcrest matchers for metric assertions in tests
 */
public class MetricMatchers {

    /**
     * Creates matcher for Gauge metrics
     * @param valueMatcher Matcher for the gauge value
     * @return Matcher that checks if metric is a Gauge with matching value
     */
    public static <V, T extends Metric> Matcher<T> isGauge(Matcher<V> valueMatcher);

    /**
     * Creates matcher for Counter metrics
     * @param valueMatcher Matcher for the counter value
     * @return Matcher that checks if metric is a Counter with matching value
     */
    public static <T extends Metric> Matcher<T> isCounter(Matcher<Long> valueMatcher);
}
```

**Usage Example:**

```java
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.testutils.MetricListener;
import org.apache.flink.metrics.testutils.MetricMatchers;
import org.junit.Test;

import java.util.Optional;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.assertTrue;

@Test
public void testMetricMatchers() throws Exception {
    MetricListener metricListener = new MetricListener();
    MetricGroup metricGroup = metricListener.getMetricGroup();

    // Register a counter directly under the root group
    Counter counter = metricGroup.counter("testCounter");
    counter.inc(42);

    Optional<Counter> retrievedCounter = metricListener.getCounter("testCounter");
    assertTrue(retrievedCounter.isPresent());

    // Assert on the counter value with any Matcher<Long>
    assertThat(retrievedCounter.get(), MetricMatchers.isCounter(equalTo(42L)));
    assertThat(retrievedCounter.get(), MetricMatchers.isCounter(greaterThan(40L)));

    // Register a gauge that always reports 100
    Gauge<Integer> gauge = metricGroup.gauge("testGauge", () -> 100);

    Optional<Gauge<Integer>> retrievedGauge = metricListener.getGauge("testGauge");
    assertTrue(retrievedGauge.isPresent());

    // Assert on the gauge value with a matcher of the gauge's value type
    assertThat(retrievedGauge.get(), MetricMatchers.isGauge(equalTo(100)));
    assertThat(retrievedGauge.get(), MetricMatchers.isGauge(greaterThanOrEqualTo(100)));
}
```
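
Both matchers combine two checks: the metric must be of the expected kind, and its current value must satisfy the nested value matcher. The sketch below models that idea with `java.util.function.Predicate` instead of Hamcrest so that it runs without extra dependencies. `MatcherSketch`, `CountingMetric`, and this `isCounter` are hypothetical stand-ins for illustration, not the Flink or Hamcrest implementations.

```java
import java.util.function.Predicate;

// Hypothetical model of a "type check plus value check" matcher; not part of Flink.
final class MatcherSketch {

    // Minimal counter type standing in for a real metric class.
    static final class CountingMetric {
        private long count;

        void inc(long n) {
            count += n;
        }

        long getCount() {
            return count;
        }
    }

    // Accepts any object, but matches only counters whose value passes valueCheck.
    static Predicate<Object> isCounter(Predicate<Long> valueCheck) {
        return metric -> metric instanceof CountingMetric
                && valueCheck.test(((CountingMetric) metric).getCount());
    }
}
```

In this model, an object that is not a counter fails the match before the value check runs, which is the behavior the `@return` descriptions above suggest for the real matchers.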

## Complete Metrics Testing Example

The following example runs a small streaming job and verifies the metrics reported by one of its functions. Flink serializes user functions when the job is submitted, so a listener passed through the function's constructor would not be the instance the test inspects. The listener is therefore held in a static field, which works because the test cluster provided by `AbstractTestBase` runs in the same JVM as the test:

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.testutils.MetricListener;
import org.apache.flink.metrics.testutils.MetricMatchers;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.AbstractTestBase;
import org.junit.Test;

import java.util.Optional;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertTrue;

public class MetricsTestExample extends AbstractTestBase {

    // Static so that the function running inside the job and the test share one instance
    private static final MetricListener METRIC_LISTENER = new MetricListener();

    @Test
    public void testJobMetrics() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Run five elements through a function that reports metrics
        DataStream<String> input = env.fromElements("a", "b", "c", "d", "e");
        DataStream<String> result = input.map(new MetricsReportingMapFunction());

        result.print();
        env.execute("Metrics Test Job");

        // Verify metrics were reported
        Optional<Counter> processedCounter = METRIC_LISTENER.getCounter("processed_elements");
        assertTrue("Counter should be present", processedCounter.isPresent());
        assertThat(processedCounter.get(), MetricMatchers.isCounter(equalTo(5L)));

        Optional<Gauge<Long>> lastProcessedTime = METRIC_LISTENER.getGauge("last_processed_time");
        assertTrue("Gauge should be present", lastProcessedTime.isPresent());
        assertThat(lastProcessedTime.get(), MetricMatchers.isGauge(greaterThan(0L)));
    }

    private static class MetricsReportingMapFunction extends RichMapFunction<String, String> {
        // Assigned in open(), so it is excluded from serialization
        private transient Counter processedElements;
        private volatile long lastProcessedTime = 0;

        @Override
        public void open(Configuration parameters) {
            MetricGroup metricGroup = METRIC_LISTENER.getMetricGroup();
            processedElements = metricGroup.counter("processed_elements");
            // The cast gives the lambda an explicit Gauge<Long> target type
            metricGroup.gauge("last_processed_time", (Gauge<Long>) () -> lastProcessedTime);
        }

        @Override
        public String map(String value) {
            processedElements.inc();
            lastProcessedTime = System.currentTimeMillis();
            return value.toUpperCase();
        }
    }
}
```
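
## Testing Different Metric Types

### Testing Histograms

```java
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.testutils.MetricListener;
import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;
import org.junit.Test;

import java.util.Optional;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

@Test
public void testHistogramMetrics() throws Exception {
    MetricListener metricListener = new MetricListener();
    MetricGroup metricGroup = metricListener.getMetricGroup();

    // MetricGroup.histogram requires a Histogram implementation to register.
    // DescriptiveStatisticsHistogram (from flink-runtime, window size 10) is an assumption here;
    // any Histogram implementation on the test classpath can be used instead.
    Histogram histogram =
            metricGroup.histogram("response_time", new DescriptiveStatisticsHistogram(10));

    // Report some values
    histogram.update(100);
    histogram.update(200);
    histogram.update(150);

    // Retrieve and verify
    Optional<Histogram> retrievedHistogram = metricListener.getHistogram("response_time");
    assertTrue(retrievedHistogram.isPresent());
    assertEquals(3, retrievedHistogram.get().getCount());
}
```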
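
### Testing Meters

```java
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.testutils.MetricListener;
import org.junit.Test;

import java.util.Optional;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

@Test
public void testMeterMetrics() throws Exception {
    MetricListener metricListener = new MetricListener();
    MetricGroup metricGroup = metricListener.getMetricGroup();

    // MetricGroup.meter requires a Meter implementation to register.
    // MeterView with a 60-second time span is used here; the span does not affect getCount().
    Meter meter = metricGroup.meter("requests_per_second", new MeterView(60));

    // Mark one event, then five more
    meter.markEvent();
    meter.markEvent(5);

    // Retrieve and verify the total event count
    Optional<Meter> retrievedMeter = metricListener.getMeter("requests_per_second");
    assertTrue(retrievedMeter.isPresent());
    assertEquals(6, retrievedMeter.get().getCount());
}
```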