0
# Metrics and Monitoring
1
2
Comprehensive metrics collection for monitoring upload performance, failure rates, and system health in production environments. The metrics system provides visibility into changelog storage operations and helps with performance tuning.
3
4
## Capabilities
5
6
### ChangelogStorageMetricGroup
7
8
Main metrics container providing thread-safe collection of changelog storage performance data.
9
10
```java { .api }
11
/**
12
* Metrics related to the Changelog Storage used by the Changelog State Backend.
13
* Thread-safe implementation for use by multiple uploader threads.
14
*/
15
public class ChangelogStorageMetricGroup extends ProxyMetricGroup<MetricGroup> {
16
17
/**
18
* Creates changelog storage metric group
19
* @param parent Parent metric group for hierarchical organization
20
*/
21
public ChangelogStorageMetricGroup(MetricGroup parent);
22
23
/**
24
* Records a successful upload operation
25
* @param batchSize Number of change sets in the upload batch
26
* @param uploadSize Total size of uploaded data in bytes
27
* @param latencyNanos Upload latency in nanoseconds
28
* @param attempts Number of attempts needed for successful upload
29
* @param totalAttempts Total attempts including retries
30
*/
31
public void recordUpload(
32
int batchSize,
33
long uploadSize,
34
long latencyNanos,
35
int attempts,
36
int totalAttempts
37
);
38
39
/**
40
* Records a failed upload operation
41
* @param batchSize Number of change sets in the failed batch
42
* @param uploadSize Total size of failed data in bytes
43
* @param attempts Number of attempts made before failure
44
*/
45
public void recordUploadFailure(int batchSize, long uploadSize, int attempts);
46
47
/**
48
* Updates the current in-flight data gauge
49
* @param inFlightBytes Current amount of in-flight data in bytes
50
*/
51
public void updateInFlightData(long inFlightBytes);
52
53
/**
54
* Records queue size metrics
55
* @param queueSize Current number of tasks in upload queue
56
*/
57
public void updateQueueSize(int queueSize);
58
}
59
```
60
61
### Core Metrics
62
63
The metric group exposes three categories of metrics — counters, histograms, and gauges:
64
65
```java { .api }
66
/**
67
* Counter metrics for tracking upload operations
68
*/
69
public class CounterMetrics {
70
/** Total number of upload requests initiated */
71
private final Counter uploadsCounter;
72
73
/** Total number of upload failures */
74
private final Counter uploadFailuresCounter;
75
}
76
77
/**
78
* Histogram metrics for tracking distributions and performance
79
*/
80
public class HistogramMetrics {
81
/** Distribution of batch sizes in upload operations */
82
private final Histogram uploadBatchSizes;
83
84
/** Distribution of upload sizes in bytes */
85
private final Histogram uploadSizes;
86
87
/** Distribution of upload latencies in nanoseconds */
88
private final Histogram uploadLatenciesNanos;
89
90
/** Distribution of attempts per successful upload */
91
private final Histogram attemptsPerUpload;
92
93
/** Distribution of total attempts including failed uploads */
94
private final Histogram totalAttemptsPerUpload;
95
}
96
97
/**
98
* Gauge metrics for real-time status monitoring
99
*/
100
public class GaugeMetrics {
101
/** Current amount of in-flight data in bytes */
102
private final Gauge<Long> inFlightDataGauge;
103
104
/** Current size of upload queue */
105
private final Gauge<Integer> queueSizeGauge;
106
}
107
```
108
109
### Metric Constants
110
111
Standard metric names for consistent reporting across Flink installations:
112
113
```java { .api }
114
/**
 * Names under which the changelog storage metrics are reported.
 *
 * <p>The constants are grouped by metric type: counters first, then histograms, then gauges.
 */
public class MetricNames {

    // ---------------------------------------------------------------- counters

    /** Counter: how many upload requests have been issued. */
    public static final String CHANGELOG_STORAGE_NUM_UPLOAD_REQUESTS = "numUploadRequests";

    /** Counter: how many uploads have failed. */
    public static final String CHANGELOG_STORAGE_NUM_UPLOAD_FAILURES = "numUploadFailures";

    // -------------------------------------------------------------- histograms

    /** Histogram: number of change sets per upload batch. */
    public static final String CHANGELOG_STORAGE_UPLOAD_BATCH_SIZES = "uploadBatchSizes";

    /** Histogram: bytes transferred per upload. */
    public static final String CHANGELOG_STORAGE_UPLOAD_SIZES = "uploadSizes";

    /** Histogram: upload latency, in nanoseconds. */
    public static final String CHANGELOG_STORAGE_UPLOAD_LATENCIES_NANOS = "uploadLatenciesNanos";

    /** Histogram: attempts needed per successful upload. */
    public static final String CHANGELOG_STORAGE_ATTEMPTS_PER_UPLOAD = "attemptsPerUpload";

    /** Histogram: total attempts per upload, failed uploads included. */
    public static final String CHANGELOG_STORAGE_TOTAL_ATTEMPTS_PER_UPLOAD =
            "totalAttemptsPerUpload";

    // ------------------------------------------------------------------ gauges

    /** Gauge: data currently in flight, in bytes. */
    public static final String CHANGELOG_STORAGE_IN_FLIGHT_DATA = "inFlightData";

    /** Gauge: number of tasks currently in the upload queue. */
    public static final String CHANGELOG_STORAGE_QUEUE_SIZE = "queueSize";
}
145
```
146
147
**Usage Examples:**
148
149
```java
150
import org.apache.flink.changelog.fs.ChangelogStorageMetricGroup;
151
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
152
153
// Create metrics group (typically done by storage)
154
ChangelogStorageMetricGroup metricGroup =
155
new ChangelogStorageMetricGroup(taskManagerJobMetricGroup);
156
157
// Record successful upload
158
long startTime = System.nanoTime();
159
// ... perform upload ...
160
long endTime = System.nanoTime();
161
162
metricGroup.recordUpload(
163
batchSize, // Number of change sets uploaded
164
uploadSizeBytes, // Total bytes uploaded
165
endTime - startTime, // Upload latency in nanoseconds
166
1, // Attempts needed (1 for success on first try)
167
1 // Total attempts made
168
);
169
170
// Record failed upload
171
metricGroup.recordUploadFailure(
172
failedBatchSize, // Number of change sets that failed
173
failedUploadSize, // Total bytes that failed to upload
174
maxAttempts // Number of attempts made before giving up
175
);
176
177
// Update real-time gauges
178
metricGroup.updateInFlightData(currentInFlightBytes);
179
metricGroup.updateQueueSize(currentQueueSize);
180
```
181
182
### Integration with Upload System
183
184
The batching upload scheduler reports the queue size and the outcome of every uploaded task to the metric group:
185
186
```java
187
// Upload scheduler records metrics during operation
188
public class BatchingStateChangeUploadScheduler implements StateChangeUploadScheduler {
189
190
private void executeUpload(Collection<UploadTask> tasks) {
191
int totalBatchSize = tasks.stream().mapToInt(task -> task.getChangeSets().size()).sum();
192
long totalSize = tasks.stream().mapToLong(task -> task.getTotalSize()).sum();
193
194
// Update queue size before upload
195
metricGroup.updateQueueSize(pendingTasks.size());
196
197
long startTime = System.nanoTime();
198
try {
199
// Perform upload
200
UploadTasksResult result = uploader.upload(tasks);
201
long endTime = System.nanoTime();
202
203
// Record successful uploads
204
result.getSuccessful().forEach((task, uploadResults) -> {
205
metricGroup.recordUpload(
206
task.getChangeSets().size(),
207
task.getTotalSize(),
208
endTime - startTime,
209
1, // Successful on first attempt
210
1
211
);
212
});
213
214
// Record failed uploads
215
result.getFailed().forEach((task, exception) -> {
216
metricGroup.recordUploadFailure(
217
task.getChangeSets().size(),
218
task.getTotalSize(),
219
maxRetryAttempts
220
);
221
});
222
223
} catch (Exception e) {
224
// Record all as failures
225
metricGroup.recordUploadFailure(totalBatchSize, totalSize, maxRetryAttempts);
226
}
227
}
228
}
229
```
230
231
### Retry Metrics Integration
232
233
The metrics system tracks retry behavior and helps tune retry policies:
234
235
```java
236
// RetryingExecutor integration with metrics
237
public class RetryingExecutor {
238
239
public <T> T execute(Callable<T> operation, RetryPolicy retryPolicy) throws Exception {
240
int attempts = 0;
241
long totalAttempts = 0;
242
Exception lastException = null;
243
244
while (attempts < maxAttempts) {
245
attempts++;
246
totalAttempts++;
247
248
try {
249
long startTime = System.nanoTime();
250
T result = operation.call();
251
long endTime = System.nanoTime();
252
253
// Record successful operation with retry metrics
254
metricGroup.recordUpload(
255
batchSize,
256
uploadSize,
257
endTime - startTime,
258
attempts, // Attempts needed for success
259
totalAttempts // Total attempts made
260
);
261
262
return result;
263
264
} catch (Exception e) {
265
lastException = e;
266
long retryDelay = retryPolicy.retryAfter(attempts, e);
267
268
if (retryDelay < 0) {
269
// No more retries
270
break;
271
}
272
273
// Sleep before retry
274
Thread.sleep(retryDelay);
275
}
276
}
277
278
// Record failure with total attempts
279
metricGroup.recordUploadFailure(batchSize, uploadSize, totalAttempts);
280
throw lastException;
281
}
282
}
283
```
284
285
### Monitoring Dashboard Integration
286
287
The metrics can be integrated with monitoring dashboards and alerting systems:
288
289
```java
290
// Example metric queries for monitoring systems:
291
292
// Upload success rate
293
// (sum(rate(numUploadRequests[5m])) - sum(rate(numUploadFailures[5m]))) / sum(rate(numUploadRequests[5m]))
294
295
// Median (P50) upload latency
296
// histogram_quantile(0.5, uploadLatenciesNanos)
297
298
// P99 upload latency
299
// histogram_quantile(0.99, uploadLatenciesNanos)
300
301
// Current backpressure status
302
// inFlightData > in_flight_data_limit_threshold
303
304
// Queue buildup
305
// queueSize > queue_size_threshold
306
```
307
308
### Performance Analysis
309
310
Use metrics for performance analysis and optimization:
311
312
```java
313
/**
314
* Metrics analysis for performance tuning
315
*/
316
public class PerformanceAnalysis {
317
318
/**
319
* Analyzes upload performance metrics
320
* @param metricGroup Metrics to analyze
321
* @return Performance recommendations
322
*/
323
public PerformanceRecommendations analyze(ChangelogStorageMetricGroup metricGroup) {
324
// Analyze upload patterns
325
double failureRate = calculateFailureRate();
326
double averageLatency = calculateAverageLatency();
327
double p99Latency = calculateP99Latency();
328
long averageBatchSize = calculateAverageBatchSize();
329
330
// Generate recommendations
331
if (failureRate > 0.05) {
332
// High failure rate - increase retry attempts or timeout
333
return new PerformanceRecommendations()
334
.increaseRetryAttempts()
335
.increaseUploadTimeout();
336
}
337
338
if (p99Latency > Duration.ofSeconds(10).toNanos()) {
339
// High tail latency - increase parallelism or buffer size
340
return new PerformanceRecommendations()
341
.increaseUploadThreads()
342
.increaseBufferSize();
343
}
344
345
if (averageBatchSize < 5) {
346
// Small batches - increase batching delay or threshold
347
return new PerformanceRecommendations()
348
.increasePersistDelay()
349
.increasePersistSizeThreshold();
350
}
351
352
return PerformanceRecommendations.optimal();
353
}
354
}
355
```
356
357
### Alerting and Monitoring
358
359
Set up alerts based on key metrics:
360
361
```java
362
/**
363
* Monitoring thresholds for alerting
364
*/
365
public class MonitoringThresholds {
366
367
/** Alert when failure rate exceeds 5% */
368
public static final double MAX_FAILURE_RATE = 0.05;
369
370
/** Alert when P99 latency exceeds 30 seconds */
371
public static final long MAX_P99_LATENCY_NANOS = Duration.ofSeconds(30).toNanos();
372
373
/** Alert when in-flight data approaches limit */
374
public static final double IN_FLIGHT_DATA_WARNING_RATIO = 0.8;
375
376
/** Alert when queue size indicates backpressure */
377
public static final int MAX_QUEUE_SIZE = 1000;
378
379
/**
380
* Checks if any metrics exceed alert thresholds
381
* @param metrics Current metric values
382
* @return List of active alerts
383
*/
384
public List<Alert> checkAlerts(MetricSnapshot metrics) {
385
List<Alert> alerts = new ArrayList<>();
386
387
if (metrics.getFailureRate() > MAX_FAILURE_RATE) {
388
alerts.add(Alert.highFailureRate(metrics.getFailureRate()));
389
}
390
391
if (metrics.getP99LatencyNanos() > MAX_P99_LATENCY_NANOS) {
392
alerts.add(Alert.highLatency(metrics.getP99LatencyNanos()));
393
}
394
395
if (metrics.getInFlightDataRatio() > IN_FLIGHT_DATA_WARNING_RATIO) {
396
alerts.add(Alert.backpressureWarning(metrics.getInFlightDataRatio()));
397
}
398
399
if (metrics.getQueueSize() > MAX_QUEUE_SIZE) {
400
alerts.add(Alert.queueBacklog(metrics.getQueueSize()));
401
}
402
403
return alerts;
404
}
405
}
406
```