# Metrics Testing

Fluent assertions and metric listeners for validating Flink metrics including counters, gauges, histograms, and meters with easy retrieval and verification capabilities. Enables comprehensive testing of application metrics and monitoring.

## Capabilities

### Metric Assertions

AssertJ-style fluent assertions for Flink metrics providing type-safe verification of metric values with readable error messages.

```java { .api }
public class MetricAssertions {
    public static CounterAssert assertThatCounter(Metric actual);
    public static <T> GaugeAssert<T> assertThatGauge(Metric actual);
}
```

#### Counter Assertions

Assertions specifically for Counter metrics with value comparison capabilities.

```java { .api }
public static class CounterAssert extends AbstractAssert<CounterAssert, Counter> {
    public CounterAssert isEqualTo(Object expected);
}
```

#### Gauge Assertions

Assertions for Gauge metrics supporting both exact value comparison and tolerance-based comparisons for numeric values.

```java { .api }
public static class GaugeAssert<T> extends AbstractAssert<GaugeAssert<T>, Gauge<T>> {
    public GaugeAssert<T> isEqualTo(Object expected);
    public GaugeAssert<T> isCloseTo(long value, long epsilon);
}
```
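
To make the tolerance semantics concrete, here is a minimal plain-Java sketch of the check that `isCloseTo(value, epsilon)` is assumed to perform: the comparison passes when the absolute difference between the gauge value and `value` is at most `epsilon`. The `ToleranceCheck` class is hypothetical and not part of Flink, and treating the bound as inclusive is an assumption.

```java
public class ToleranceCheck {

    /**
     * Returns true when |actual - expected| <= epsilon.
     * Assumption: the bound is inclusive. This is an illustration, not Flink's code.
     */
    public static boolean isCloseTo(long actual, long expected, long epsilon) {
        if (epsilon < 0) {
            throw new IllegalArgumentException("epsilon must be non-negative");
        }
        return Math.abs(actual - expected) <= epsilon;
    }

    public static void main(String[] args) {
        System.out.println(isCloseTo(1000L, 1024L, 50L)); // prints true  (difference 24)
        System.out.println(isCloseTo(900L, 1024L, 50L));  // prints false (difference 124)
    }
}
```

Under this reading, a gauge reporting 1000 would satisfy `isCloseTo(1024L, 50L)`, while one reporting 900 would not.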

#### Usage Example

```java
import org.apache.flink.metrics.testutils.MetricAssertions;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;

// getCounterMetric and getGaugeMetric are placeholders for however your test obtains the metric

// Test counter metrics
Counter recordsProcessed = getCounterMetric("records_processed");
MetricAssertions.assertThatCounter(recordsProcessed)
    .isEqualTo(1000L);

// Test gauge metrics with exact value
Gauge<Double> cpuUsage = getGaugeMetric("cpu_usage");
MetricAssertions.assertThatGauge(cpuUsage)
    .isEqualTo(0.75);

// Test gauge metrics with tolerance (value 1024, epsilon 50)
Gauge<Long> memoryUsage = getGaugeMetric("memory_usage");
MetricAssertions.assertThatGauge(memoryUsage)
    .isCloseTo(1024L, 50L);
```

### Metric Listener

Comprehensive metric listener that captures metric registration events and provides convenient retrieval methods for all metric types.

```java { .api }
public class MetricListener implements MetricReporter {
    public MetricListener();

    public MetricGroup getMetricGroup();

    public <T extends Metric> Optional<T> getMetric(Class<T> metricType, String... identifier);
    public Optional<Meter> getMeter(String... identifier);
    public Optional<Counter> getCounter(String... identifier);
    public Optional<Histogram> getHistogram(String... identifier);
    public <T> Optional<Gauge<T>> getGauge(String... identifier);
}
```

#### Usage Example

```java
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.testutils.MetricListener;
import org.apache.flink.metrics.*;

import java.util.Optional;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

// Create and register metric listener
MetricListener metricListener = new MetricListener();

// Register with Flink's metric system
Configuration config = new Configuration();
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." +
    ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX,
    MetricListener.class.getName());

// Retrieve specific metric types
Optional<Counter> counter = metricListener.getCounter("job", "task", "records_processed");
if (counter.isPresent()) {
    long count = counter.get().getCount();
    assertEquals(1000L, count);
}

Optional<Gauge<Double>> gauge = metricListener.getGauge("job", "task", "cpu_usage");
if (gauge.isPresent()) {
    double usage = gauge.get().getValue();
    assertTrue(usage >= 0.0 && usage <= 1.0);
}

Optional<Histogram> histogram = metricListener.getHistogram("job", "task", "latency");
if (histogram.isPresent()) {
    long count = histogram.get().getCount();
    assertTrue(count > 0);
}

Optional<Meter> meter = metricListener.getMeter("job", "task", "throughput");
if (meter.isPresent()) {
    double rate = meter.get().getRate();
    assertTrue(rate > 0.0);
}
```
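
The `if (...isPresent())` guards in the example skip their assertions silently when a metric is missing. A small helper that fails fast can make that case visible. The sketch below uses only `java.util.Optional`; the `MetricLookup.require` helper and its message format are hypothetical and not part of the Flink test utilities.

```java
import java.util.Optional;

public class MetricLookup {

    /** Unwraps the Optional or fails with a message naming the missing metric. */
    public static <T> T require(Optional<T> metric, String... identifier) {
        return metric.orElseThrow(() ->
                new AssertionError("Metric not found: " + String.join(".", identifier)));
    }

    public static void main(String[] args) {
        // Stand-in value; in a real test this would come from a listener lookup
        Optional<Long> present = Optional.of(1000L);
        System.out.println(require(present, "job", "task", "records_processed")); // prints 1000

        try {
            require(Optional.empty(), "job", "task", "missing");
        } catch (AssertionError e) {
            System.out.println(e.getMessage()); // prints "Metric not found: job.task.missing"
        }
    }
}
```

With such a helper, a lookup like `require(metricListener.getCounter("job", "task", "records_processed"), "job", "task", "records_processed")` would fail the test when the counter was never registered, rather than passing vacuously.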

### Generic Metric Retrieval

Type-safe generic method for retrieving any metric type with compile-time type checking.

```java { .api }
public <T extends Metric> Optional<T> getMetric(Class<T> metricType, String... identifier);
```
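
As an illustration of how a `Class<T>` token plus a varargs identifier can yield a typed `Optional`, here is a self-contained sketch with no Flink dependency. The `TypedRegistry` class, the `.` delimiter, and the choice to return an empty `Optional` on a type mismatch are all assumptions made for the example; the actual `MetricListener` may behave differently.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class TypedRegistry {

    private final Map<String, Object> entries = new HashMap<>();

    /** Stores a value under the identifier parts joined with '.'. */
    public void register(Object value, String... identifier) {
        entries.put(String.join(".", identifier), value);
    }

    /** Returns the entry only if it exists and is an instance of the requested type. */
    public <T> Optional<T> get(Class<T> type, String... identifier) {
        Object value = entries.get(String.join(".", identifier));
        return type.isInstance(value) ? Optional.of(type.cast(value)) : Optional.empty();
    }

    public static void main(String[] args) {
        TypedRegistry registry = new TypedRegistry();
        registry.register(42L, "records", "processed");

        System.out.println(registry.get(Long.class, "records", "processed").isPresent());   // prints true
        System.out.println(registry.get(String.class, "records", "processed").isPresent()); // prints false
        System.out.println(registry.get(Long.class, "records", "missing").isPresent());     // prints false
    }
}
```

The class token lets the compiler infer the element type of the returned `Optional`, so callers need no casts.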

#### Usage Example

```java
// Retrieve custom metric types (MyCustomMetric stands for your own Metric implementation)
Optional<MyCustomMetric> customMetric =
    metricListener.getMetric(MyCustomMetric.class, "job", "custom");

// Retrieve standard metrics with type safety
Optional<Counter> typedCounter =
    metricListener.getMetric(Counter.class, "records", "processed");
```

## Integration Patterns

### Test Case Integration

Complete example of integrating metrics testing into a Flink test case with metric verification.

```java
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.utils.ParameterTool; // package may differ between Flink versions
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.metrics.testutils.MetricListener;
import org.apache.flink.metrics.testutils.MetricAssertions;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import java.util.Collections;
import java.util.Optional;

import static org.junit.jupiter.api.Assertions.assertTrue;

public class MetricsIntegrationTest {

    @RegisterExtension
    static final MiniClusterExtension MINI_CLUSTER = new MiniClusterExtension();

    @Test
    void testJobMetrics() throws Exception {
        MetricListener metricListener = new MetricListener();

        StreamExecutionEnvironment env = MINI_CLUSTER.getTestStreamEnvironment();

        // Configure metric reporting
        env.getConfig().setGlobalJobParameters(
            ParameterTool.fromMap(Collections.singletonMap("metrics.reporter", "test")));

        // Create streaming job (CountingMapFunction is a user-defined function that increments a counter)
        env.fromElements(1, 2, 3, 4, 5)
            .map(new CountingMapFunction())
            .print();

        // Execute job
        JobExecutionResult result = env.execute("Metrics Test Job");

        // Verify metrics were captured
        Optional<Counter> processedRecords =
            metricListener.getCounter("job", "map", "records_processed");
        assertTrue(processedRecords.isPresent());

        MetricAssertions.assertThatCounter(processedRecords.get())
            .isEqualTo(5L);
    }
}
```

### Custom Metric Verification

Pattern for testing custom business metrics in Flink applications.

```java
@Test
void testCustomBusinessMetrics() throws Exception {
    MetricListener listener = new MetricListener();

    // Execute job with custom metrics
    executeJobWithCustomMetrics();

    // Verify business-specific metrics
    Optional<Gauge<Long>> orderCount = listener.getGauge("business", "orders", "total");
    Optional<Counter> errorCount = listener.getCounter("business", "errors", "validation");
    Optional<Histogram> processingTime = listener.getHistogram("business", "processing", "duration");

    assertTrue(orderCount.isPresent());
    assertTrue(errorCount.isPresent());
    assertTrue(processingTime.isPresent());

    // Verify metric values
    assertTrue(orderCount.get().getValue() > 0);
    assertEquals(0L, errorCount.get().getCount()); // No errors expected
    assertTrue(processingTime.get().getCount() > 0);
}
```

### Metric Group Access

Access to the metric group hierarchy for advanced metric organization verification.

```java
@Test
void testMetricGroupStructure() {
    MetricListener listener = new MetricListener();
    MetricGroup rootGroup = listener.getMetricGroup();

    // Verify metric group hierarchy
    assertNotNull(rootGroup);

    // Access nested metric groups and verify structure
    // Implementation depends on your specific metric organization
}
```

## Error Handling and Debugging

The metrics testing utilities provide clear error messages when:

- Metrics are not found at the specified path
- Metric types don't match expected types
- Assertion values don't match actual metric values
- Tolerance-based comparisons fail for numeric metrics

This enables quick identification and resolution of metrics-related test failures, helping ensure your Flink applications properly expose and update their metrics during execution.