# Metrics and Monitoring

The metrics framework provides utilities for querying and validating Flink job metrics via the Flink REST API. This enables performance testing, behavior validation, and monitoring of connector operations during test execution.

## Capabilities

### Metric Querier

The main utility class for querying job metrics from a Flink cluster via the REST API.
```java { .api }
/**
 * Utility for querying job metrics via the Flink REST API
 */
public class MetricQuerier {

    /**
     * Create metric querier with configuration
     * @param configuration Flink configuration for REST client
     * @throws ConfigurationException if configuration is invalid
     */
    public MetricQuerier(Configuration configuration) throws ConfigurationException;

    /**
     * Get job details including vertex information
     * @param client REST client instance
     * @param endpoint Test environment REST endpoint
     * @param jobId Job ID to query
     * @return Job details with vertex information
     * @throws Exception if query fails
     */
    public static JobDetailsInfo getJobDetails(
            RestClient client,
            TestEnvironment.Endpoint endpoint,
            JobID jobId
    ) throws Exception;

    /**
     * Get aggregated metrics for source/sink operator
     * @param endpoint Test environment REST endpoint
     * @param jobId Job ID to query
     * @param sourceOrSinkName Name of source or sink operator
     * @param metricName Name of metric to query (e.g., "numRecordsIn", "numRecordsOut")
     * @param filter Optional filter for metric selection (e.g., "Writer" for Sink V2)
     * @return Aggregated metric value across all subtasks
     * @throws Exception if query fails or metric not found
     */
    public Double getAggregatedMetricsByRestAPI(
            TestEnvironment.Endpoint endpoint,
            JobID jobId,
            String sourceOrSinkName,
            String metricName,
            String filter
    ) throws Exception;

    /**
     * Get list of available metrics for job vertex
     * @param endpoint Test environment REST endpoint
     * @param jobId Job ID to query
     * @param vertexId Vertex ID to query metrics for
     * @return Response containing available metrics
     * @throws Exception if query fails
     */
    public AggregatedMetricsResponseBody getMetricList(
            TestEnvironment.Endpoint endpoint,
            JobID jobId,
            JobVertexID vertexId
    ) throws Exception;

    /**
     * Get specific metrics with filtering
     * @param endpoint Test environment REST endpoint
     * @param jobId Job ID to query
     * @param vertexId Vertex ID to query
     * @param filters Comma-separated list of metric filters
     * @return Response containing filtered metrics
     * @throws Exception if query fails
     */
    public AggregatedMetricsResponseBody getMetrics(
            TestEnvironment.Endpoint endpoint,
            JobID jobId,
            JobVertexID vertexId,
            String filters
    ) throws Exception;
}
```

**Usage Examples:**

```java
// Create the metric querier
MetricQuerier metricQuerier = new MetricQuerier(new Configuration());

// Query sink metrics
Double numRecordsOut = metricQuerier.getAggregatedMetricsByRestAPI(
        testEnv.getRestEndpoint(),
        jobClient.getJobID(),
        "MySink",                     // sink operator name
        MetricNames.NUM_RECORDS_SEND, // metric name
        "Writer"                      // filter for Sink V2
);

// Validate the expected record count
assertThat(numRecordsOut).isEqualTo(expectedRecordCount);
```

### Common Metrics

Standard metrics available for source and sink connectors.

```java { .api }
/**
 * Configuration constants for connector testing
 */
public class ConnectorTestConstants {
    public static final long METRIC_FETCHER_UPDATE_INTERVAL_MS = 1000L;
    public static final long SLOT_REQUEST_TIMEOUT_MS = 10_000L;
    public static final long HEARTBEAT_TIMEOUT_MS = 5_000L;
    public static final long HEARTBEAT_INTERVAL_MS = 1000L;
    public static final Duration DEFAULT_COLLECT_DATA_TIMEOUT = Duration.ofSeconds(120L);
}
```
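These timeout and interval constants are consumed by a polling loop that retries a condition until it holds or the time budget runs out. As an illustration only (plain Java, no Flink dependencies; `pollUntil` is a hypothetical stand-in for the framework's `waitUntilCondition`), the pattern looks like this:

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

public class PollUntil {

    /**
     * Polls the condition at a fixed interval until it returns true
     * or the timeout elapses. Returns the final state of the condition.
     */
    public static boolean pollUntil(BooleanSupplier condition, Duration timeout, Duration interval)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (System.nanoTime() < deadline) {
            if (condition.getAsBoolean()) {
                return true;
            }
            Thread.sleep(interval.toMillis());
        }
        // One last check at the deadline before giving up
        return condition.getAsBoolean();
    }

    public static void main(String[] args) throws InterruptedException {
        // Condition becomes true after ~50 ms; poll every 10 ms within a 2 s budget.
        long start = System.currentTimeMillis();
        boolean ok = pollUntil(
                () -> System.currentTimeMillis() - start >= 50,
                Duration.ofSeconds(2),
                Duration.ofMillis(10));
        System.out.println(ok); // prints "true"
    }
}
```

In the real test suites, `METRIC_FETCHER_UPDATE_INTERVAL_MS` matters because metric values observed via the REST API only refresh at that cadence, so polling faster than the fetcher interval gains nothing.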

Common metric names used in connector testing (from Flink's MetricNames class):

- `"numRecordsIn"` - Number of records received by the source
- `"numRecordsOut"` - Number of records emitted by the source
- `"numRecordsSend"` - Number of records sent by the sink
- `"numBytesIn"` - Number of bytes received
- `"numBytesOut"` - Number of bytes emitted

**Usage Examples:**

```java
// Query different metric types using metric name strings
Double recordsIn = metricQuerier.getAggregatedMetricsByRestAPI(
        endpoint, jobId, "MySource", "numRecordsIn", null);

Double recordsOut = metricQuerier.getAggregatedMetricsByRestAPI(
        endpoint, jobId, "MySink", "numRecordsSend", "Writer");

Double bytesIn = metricQuerier.getAggregatedMetricsByRestAPI(
        endpoint, jobId, "MySource", "numBytesIn", null);
```

## Integration with Test Suites

### Automatic Metrics Testing

Test suites include automatic metrics validation for sources and sinks.

```java
// From SinkTestSuiteBase.testMetrics()
@TestTemplate
@DisplayName("Test sink metrics")
public void testMetrics(
        TestEnvironment testEnv,
        DataStreamSinkExternalContext<T> externalContext,
        CheckpointingMode semantic
) throws Exception {

    // Generate and send test data
    List<T> testRecords = generateTestData(sinkSettings, externalContext);

    // Create and execute the job
    StreamExecutionEnvironment env = testEnv.createExecutionEnvironment(settings);
    // ... job setup and execution

    // Validate metrics
    MetricQuerier queryRestClient = new MetricQuerier(new Configuration());
    waitUntilCondition(() -> {
        try {
            return compareSinkMetrics(
                    queryRestClient,
                    testEnv,
                    externalContext,
                    jobClient.getJobID(),
                    sinkName,
                    MetricNames.NUM_RECORDS_SEND,
                    testRecords.size());
        } catch (Exception e) {
            return false; // Retry on failure
        }
    });
}
```

### Custom Metrics Validation

Implement custom metrics validation in your test classes.

```java
public class MyConnectorTestSuite extends SinkTestSuiteBase<String> {

    @Test
    public void testCustomMetrics() throws Exception {
        // Execute the connector job
        JobClient jobClient = executeConnectorJob();

        // Query custom metrics
        MetricQuerier querier = new MetricQuerier(new Configuration());

        // Validate throughput metrics
        Double throughput = querier.getAggregatedMetricsByRestAPI(
                testEnv.getRestEndpoint(),
                jobClient.getJobID(),
                "MyConnector",
                "recordsPerSecond",
                null);

        assertThat(throughput).isGreaterThan(1000.0); // Minimum throughput requirement

        // Validate error metrics
        Double errorRate = querier.getAggregatedMetricsByRestAPI(
                testEnv.getRestEndpoint(),
                jobClient.getJobID(),
                "MyConnector",
                "errorRate",
                null);

        assertThat(errorRate).isLessThan(0.01); // Less than 1% error rate
    }
}
```

## Metric Validation Patterns

### Throughput Validation

Validate that connector throughput meets performance requirements.

```java
public void validateThroughput(JobClient jobClient, int expectedRecords, Duration testDuration) throws Exception {
    MetricQuerier querier = new MetricQuerier(new Configuration());

    // Wait for the job to process all records
    waitUntilCondition(() -> {
        try {
            Double processedRecords = querier.getAggregatedMetricsByRestAPI(
                    testEnv.getRestEndpoint(),
                    jobClient.getJobID(),
                    "MySource",
                    NUM_RECORDS_OUT,
                    null);
            return processedRecords >= expectedRecords;
        } catch (Exception e) {
            return false;
        }
    });

    // Calculate and validate throughput; cast to double to avoid integer division
    double throughput = (double) expectedRecords / testDuration.toSeconds();
    assertThat(throughput).isGreaterThan(100.0); // Records per second
}
```

### Latency Validation

Validate that connector latency remains within acceptable bounds.

```java
public void validateLatency(JobClient jobClient) throws Exception {
    MetricQuerier querier = new MetricQuerier(new Configuration());

    // Query latency metrics
    Double avgLatency = querier.getAggregatedMetricsByRestAPI(
            testEnv.getRestEndpoint(),
            jobClient.getJobID(),
            "MyConnector",
            LATENCY,
            null);

    // Validate that latency is under 100 ms
    assertThat(avgLatency).isLessThan(100.0);
}
```

### Resource Usage Validation

Validate memory and CPU usage patterns.

```java
public void validateResourceUsage(JobClient jobClient) throws Exception {
    MetricQuerier querier = new MetricQuerier(new Configuration());

    // Query memory usage
    Double heapUsed = querier.getAggregatedMetricsByRestAPI(
            testEnv.getRestEndpoint(),
            jobClient.getJobID(),
            "MyConnector",
            "memoryHeapUsed",
            null);

    Double heapMax = querier.getAggregatedMetricsByRestAPI(
            testEnv.getRestEndpoint(),
            jobClient.getJobID(),
            "MyConnector",
            "memoryHeapMax",
            null);

    // Validate that memory usage is under 80% of the maximum
    double memoryUsageRatio = heapUsed / heapMax;
    assertThat(memoryUsageRatio).isLessThan(0.8);
}
```

## Advanced Metrics Scenarios

### Multi-Operator Metrics

Query metrics across multiple operators in complex pipelines.

```java
public void validatePipelineMetrics(JobClient jobClient) throws Exception {
    MetricQuerier querier = new MetricQuerier(new Configuration());

    // Query source metrics
    Double sourceRecords = querier.getAggregatedMetricsByRestAPI(
            testEnv.getRestEndpoint(), jobClient.getJobID(), "Source", NUM_RECORDS_OUT, null);

    // Query transformation metrics
    Double transformRecords = querier.getAggregatedMetricsByRestAPI(
            testEnv.getRestEndpoint(), jobClient.getJobID(), "Transform", NUM_RECORDS_OUT, null);

    // Query sink metrics
    Double sinkRecords = querier.getAggregatedMetricsByRestAPI(
            testEnv.getRestEndpoint(), jobClient.getJobID(), "Sink", NUM_RECORDS_SEND, "Writer");

    // Validate record flow through the pipeline
    assertThat(sourceRecords).isEqualTo(transformRecords);
    assertThat(transformRecords).isEqualTo(sinkRecords);
}
```

### Historical Metrics Comparison

Compare metrics across test runs to detect performance regressions.

```java
public void compareWithBaseline(JobClient jobClient, MetricsBaseline baseline) throws Exception {
    MetricQuerier querier = new MetricQuerier(new Configuration());

    // Query current metrics
    Double currentThroughput = querier.getAggregatedMetricsByRestAPI(
            testEnv.getRestEndpoint(), jobClient.getJobID(), "MyConnector", "throughput", null);

    Double currentLatency = querier.getAggregatedMetricsByRestAPI(
            testEnv.getRestEndpoint(), jobClient.getJobID(), "MyConnector", LATENCY, null);

    // Compare with the baseline (allow 10% deviation)
    assertThat(currentThroughput).isGreaterThan(baseline.getThroughput() * 0.9);
    assertThat(currentLatency).isLessThan(baseline.getLatency() * 1.1);
}
```
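The 10% tolerance arithmetic can be factored into a small reusable check. This is a sketch with plain doubles (no Flink or `MetricsBaseline` dependency; the class and method names are illustrative): throughput may regress by at most the tolerance, while latency may grow by at most the tolerance.

```java
public class BaselineCheck {

    /**
     * Returns true if the current run is within the allowed deviation of the baseline:
     * throughput must not drop more than `tolerance`, latency must not grow more than `tolerance`.
     */
    public static boolean withinBaseline(
            double currentThroughput, double currentLatency,
            double baselineThroughput, double baselineLatency,
            double tolerance) {
        return currentThroughput > baselineThroughput * (1 - tolerance)
                && currentLatency < baselineLatency * (1 + tolerance);
    }

    public static void main(String[] args) {
        // 5% throughput drop and 5% latency growth: within a 10% tolerance
        System.out.println(withinBaseline(950, 105, 1000, 100, 0.10)); // prints "true"
        // 15% throughput drop: regression detected
        System.out.println(withinBaseline(850, 105, 1000, 100, 0.10)); // prints "false"
    }
}
```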

### Parallel Subtask Metrics

Analyze metrics across parallel subtasks to validate load balancing.

```java
public void validateLoadBalancing(JobClient jobClient) throws Exception {
    MetricQuerier querier = new MetricQuerier(new Configuration());

    // Get job details to locate the vertex of interest
    JobDetailsInfo jobDetails = MetricQuerier.getJobDetails(
            new RestClient(new Configuration(), Executors.newCachedThreadPool()),
            testEnv.getRestEndpoint(),
            jobClient.getJobID());

    // Find the source vertex
    JobVertexID sourceVertexId = jobDetails.getJobVertexInfos().stream()
            .filter(v -> v.getName().contains("Source"))
            .findFirst()
            .map(JobDetailsInfo.JobVertexDetailsInfo::getJobVertexID)
            .orElseThrow();

    // Get the filtered metrics for the vertex
    AggregatedMetricsResponseBody metrics = querier.getMetrics(
            testEnv.getRestEndpoint(),
            jobClient.getJobID(),
            sourceVertexId,
            NUM_RECORDS_OUT);

    // Validate load distribution
    List<Double> subtaskValues = metrics.getMetrics().stream()
            .map(AggregatedMetric::getSum)
            .collect(Collectors.toList());

    double mean = subtaskValues.stream().mapToDouble(Double::doubleValue).average().orElse(0.0);
    double maxDeviation = subtaskValues.stream()
            .mapToDouble(value -> Math.abs(value - mean) / mean)
            .max().orElse(0.0);

    // Ensure load is balanced within 20% of the mean
    assertThat(maxDeviation).isLessThan(0.2);
}
```

## Error Handling

### Metric Query Failures

```java
try {
    Double metric = querier.getAggregatedMetricsByRestAPI(endpoint, jobId, operatorName, metricName, filter);
} catch (IllegalStateException e) {
    // Metric not found: the operator name or metric name is incorrect
    fail("Metric not found: " + e.getMessage());
} catch (Exception e) {
    // Network or cluster issues
    throw new AssumptionViolatedException("Unable to query metrics", e);
}
```

### Timeout Handling

```java
// Use waitUntilCondition for retry logic
waitUntilCondition(() -> {
    try {
        Double metric = querier.getAggregatedMetricsByRestAPI(endpoint, jobId, operatorName, metricName, filter);
        return Precision.equals(expectedValue, metric);
    } catch (Exception e) {
        // Retry on failure
        return false;
    }
}, Duration.ofMinutes(2)); // 2-minute timeout
```
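Exact `==` comparison on aggregated `Double` values is fragile, which is why the retry predicate compares through `Precision.equals` instead. A minimal sketch of the underlying idea, an absolute-epsilon comparison (`metricEquals` is a hypothetical helper for illustration, not the actual `Precision` implementation):

```java
public class MetricCompare {

    /** Tolerant comparison for metric values: true if the values differ by at most epsilon. */
    public static boolean metricEquals(double expected, double actual, double epsilon) {
        return Math.abs(expected - actual) <= epsilon;
    }

    public static void main(String[] args) {
        System.out.println(metricEquals(1000.0, 1000.0, 1e-9)); // prints "true"
        System.out.println(metricEquals(1000.0, 999.0, 1e-9));  // prints "false"
    }
}
```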

### Debugging Metrics Issues

```java
// List all available metrics for debugging
AggregatedMetricsResponseBody allMetrics = querier.getMetricList(endpoint, jobId, vertexId);
allMetrics.getMetrics().forEach(metric -> {
    System.out.println("Available metric: " + metric.getId());
});

// Get job details for vertex information
JobDetailsInfo jobDetails = MetricQuerier.getJobDetails(restClient, endpoint, jobId);
jobDetails.getJobVertexInfos().forEach(vertex -> {
    System.out.println("Vertex: " + vertex.getName() + " ID: " + vertex.getJobVertexID());
});
```