0
# Flink Metrics Dropwizard
1
2
Dropwizard metrics integration for Apache Flink that provides bidirectional bridges and wrappers to integrate Flink metrics with the Dropwizard Metrics library, enabling Flink applications to leverage Dropwizard's extensive ecosystem of metric reporters.
3
4
## Package Information
5
6
- **Package Name**: flink-metrics-dropwizard
7
- **Package Type**: maven
8
- **Language**: Java
9
- **Group ID**: org.apache.flink
10
- **Artifact ID**: flink-metrics-dropwizard
11
- **Installation**: Add to Maven dependencies:
12
13
```xml
14
<dependency>
15
<groupId>org.apache.flink</groupId>
16
<artifactId>flink-metrics-dropwizard</artifactId>
17
<version>2.1.0</version>
18
</dependency>
19
```
20
21
## Core Imports
22
23
```java
24
import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
25
import org.apache.flink.dropwizard.metrics.FlinkCounterWrapper;
26
import org.apache.flink.dropwizard.metrics.FlinkGaugeWrapper;
27
import org.apache.flink.dropwizard.metrics.FlinkMeterWrapper;
28
import org.apache.flink.dropwizard.metrics.FlinkHistogramWrapper;
29
import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;
30
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
31
```
32
33
## Basic Usage
34
35
```java
36
import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
37
import org.apache.flink.dropwizard.metrics.FlinkCounterWrapper;
38
import org.apache.flink.metrics.Counter;
39
import org.apache.flink.metrics.SimpleCounter;
40
import org.apache.flink.metrics.MetricConfig;
41
import com.codahale.metrics.ScheduledReporter;
42
import com.codahale.metrics.ConsoleReporter;
43
import java.util.concurrent.TimeUnit;
44
45
// Create a custom reporter extending ScheduledDropwizardReporter
46
public class CustomDropwizardReporter extends ScheduledDropwizardReporter {
47
@Override
48
public ScheduledReporter getReporter(MetricConfig config) {
49
return ConsoleReporter.forRegistry(registry)
50
.convertRatesTo(TimeUnit.SECONDS)
51
.convertDurationsTo(TimeUnit.MILLISECONDS)
52
.build();
53
}
54
}
55
56
// Use Flink counter with Dropwizard
57
Counter flinkCounter = new SimpleCounter();
58
FlinkCounterWrapper dropwizardCounter = new FlinkCounterWrapper(flinkCounter);
59
60
// Increment and get count
61
dropwizardCounter.inc();
62
dropwizardCounter.inc(5);
63
long count = dropwizardCounter.getCount(); // Returns 6
64
65
// Use Dropwizard meter with Flink
66
com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter();
67
DropwizardMeterWrapper flinkMeter = new DropwizardMeterWrapper(dropwizardMeter);
68
69
// Mark events and get rate
70
flinkMeter.markEvent();
71
flinkMeter.markEvent(10);
72
double rate = flinkMeter.getRate();
73
```
74
75
## Architecture
76
77
The library provides bidirectional integration between Flink and Dropwizard metrics systems:
78
79
- **Reporter Base Class**: `ScheduledDropwizardReporter` serves as the foundation for creating custom Dropwizard-based metric reporters in Flink
80
- **Flink-to-Dropwizard Wrappers**: Allow Flink metrics to be used as Dropwizard metrics for reporting through Dropwizard's reporter ecosystem
81
- **Dropwizard-to-Flink Wrappers**: Enable Dropwizard metrics to be used within Flink's metrics system
82
- **Statistics Adapters**: Provide seamless conversion between Flink and Dropwizard histogram statistics formats
83
- **Automatic Type Detection**: The reporter automatically applies appropriate wrappers based on metric types
84
85
## Capabilities
86
87
### Scheduled Dropwizard Reporter
88
89
Abstract base class for creating metric reporters that integrate Flink metrics with Dropwizard's reporting infrastructure.
90
91
```java { .api }
92
/**
93
* Base class for MetricReporter that wraps a Dropwizard Reporter.
94
* Automatically handles metric type detection and wrapper application.
95
*/
96
@PublicEvolving
97
public abstract class ScheduledDropwizardReporter
98
implements MetricReporter, Scheduled, Reporter, CharacterFilter {
99
100
// Configuration constants
101
public static final String ARG_HOST = "host";
102
public static final String ARG_PORT = "port";
103
public static final String ARG_PREFIX = "prefix";
104
public static final String ARG_CONVERSION_RATE = "rateConversion";
105
public static final String ARG_CONVERSION_DURATION = "durationConversion";
106
107
/**
108
* Opens the reporter with the given configuration
109
* @param config Metric configuration
110
*/
111
public void open(MetricConfig config);
112
113
/**
114
* Closes the reporter and stops reporting
115
*/
116
public void close();
117
118
/**
119
* Called when a metric is added to the system
120
* @param metric The metric instance
121
* @param metricName The metric name
122
* @param group The metric group
123
*/
124
public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group);
125
126
/**
127
* Called when a metric is removed from the system
128
* @param metric The metric instance
129
* @param metricName The metric name
130
* @param group The metric group
131
*/
132
public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group);
133
134
/**
135
* Filters invalid characters from metric names
136
* @param metricName The metric name to filter
137
* @return Filtered metric name
138
*/
139
public String filterCharacters(String metricName);
140
141
/**
142
* Reports all metrics to the underlying Dropwizard reporter
143
*/
144
public void report();
145
146
/**
147
* Abstract method to create the underlying Dropwizard ScheduledReporter
148
* @param config Metric configuration
149
* @return ScheduledReporter instance
150
*/
151
public abstract ScheduledReporter getReporter(MetricConfig config);
152
}
153
```
154
155
### Flink Counter Wrapper
156
157
Wrapper that allows a Flink counter to be used as a Dropwizard counter.
158
159
```java { .api }
160
/**
161
* Wrapper that allows a Flink counter to be used as a DropWizard counter
162
*/
163
public class FlinkCounterWrapper extends com.codahale.metrics.Counter {
164
165
/**
166
* Creates a wrapper around a Flink counter
167
* @param counter The Flink counter to wrap
168
*/
169
public FlinkCounterWrapper(Counter counter);
170
171
/**
172
* Gets the current count value
173
* @return The current count
174
*/
175
public long getCount();
176
177
/**
178
* Increments the counter by 1
179
*/
180
public void inc();
181
182
/**
183
* Increments the counter by the given amount
184
* @param n Amount to increment by
185
*/
186
public void inc(long n);
187
188
/**
189
* Decrements the counter by 1
190
*/
191
public void dec();
192
193
/**
194
* Decrements the counter by the given amount
195
* @param n Amount to decrement by
196
*/
197
public void dec(long n);
198
}
199
```
200
201
### Flink Gauge Wrapper
202
203
Wrapper that allows a Flink gauge to be used as a Dropwizard gauge.
204
205
```java { .api }
206
/**
207
* Wrapper that allows a Flink gauge to be used as a DropWizard gauge
208
*/
209
public class FlinkGaugeWrapper<T> implements com.codahale.metrics.Gauge<T> {
210
211
/**
212
* Creates a wrapper around a Flink gauge
213
* @param gauge The Flink gauge to wrap
214
*/
215
public FlinkGaugeWrapper(Gauge<T> gauge);
216
217
/**
218
* Gets the current gauge value
219
* @return The current value
220
*/
221
public T getValue();
222
223
/**
224
* Static factory method to create a wrapper from any gauge
225
* @param gauge The gauge to wrap
226
* @return FlinkGaugeWrapper instance
227
*/
228
public static <T> FlinkGaugeWrapper<T> fromGauge(Gauge<?> gauge);
229
}
230
```
231
232
### Flink Meter Wrapper
233
234
Wrapper to use a Flink Meter as a Dropwizard Meter for reporting purposes.
235
236
```java { .api }
237
/**
238
* Wrapper to use a Flink Meter as a Dropwizard Meter.
239
* Note: Only one minute rate is supported, other rates return 0.
240
*/
241
public class FlinkMeterWrapper extends com.codahale.metrics.Meter {
242
243
/**
244
* Creates a wrapper around a Flink meter
245
* @param meter The Flink meter to wrap
246
*/
247
public FlinkMeterWrapper(Meter meter);
248
249
/**
250
* Creates a wrapper around a Flink meter with custom clock
251
* @param meter The Flink meter to wrap
252
* @param clock Custom clock for timing
253
*/
254
public FlinkMeterWrapper(Meter meter, Clock clock);
255
256
/**
257
* Marks a single event
258
*/
259
public void mark();
260
261
/**
262
* Marks multiple events
263
* @param n Number of events to mark
264
*/
265
public void mark(long n);
266
267
/**
268
* Gets the total event count
269
* @return Total count of events
270
*/
271
public long getCount();
272
273
/**
274
* Gets the one minute rate (delegates to Flink meter's getRate())
275
* @return One minute rate
276
*/
277
public double getOneMinuteRate();
278
279
/**
280
* Gets the five minute rate (not supported, returns 0)
281
* @return Always returns 0.0
282
*/
283
public double getFiveMinuteRate();
284
285
/**
286
* Gets the fifteen minute rate (not supported, returns 0)
287
* @return Always returns 0.0
288
*/
289
public double getFifteenMinuteRate();
290
291
/**
292
* Gets the mean rate (not supported, returns 0)
293
* @return Always returns 0.0
294
*/
295
public double getMeanRate();
296
}
297
```
298
299
### Flink Histogram Wrapper
300
301
Wrapper to use a Flink Histogram as a Dropwizard Histogram for reporting purposes.
302
303
```java { .api }
304
/**
305
* Wrapper to use a Flink Histogram as a Dropwizard Histogram
306
*/
307
public class FlinkHistogramWrapper extends com.codahale.metrics.Histogram {
308
309
/**
310
* Creates a wrapper around a Flink histogram
311
* @param histogram The Flink histogram to wrap
312
*/
313
public FlinkHistogramWrapper(Histogram histogram);
314
315
/**
316
* Updates the histogram with a new value
317
* @param value The value to add
318
*/
319
public void update(long value);
320
321
/**
322
* Gets the total number of values recorded
323
* @return Count of recorded values
324
*/
325
public long getCount();
326
327
/**
328
* Gets a snapshot of the histogram statistics
329
* @return Snapshot wrapped for Dropwizard compatibility
330
*/
331
public Snapshot getSnapshot();
332
}
333
```
334
335
### Dropwizard Meter Wrapper
336
337
Wrapper to use a Dropwizard Meter as a Flink Meter within Flink's metrics system.
338
339
```java { .api }
340
/**
341
* Wrapper to use a Dropwizard Meter as a Flink Meter
342
*/
343
public class DropwizardMeterWrapper implements Meter {
344
345
/**
346
* Creates a wrapper around a Dropwizard meter
347
* @param meter The Dropwizard meter to wrap
348
*/
349
public DropwizardMeterWrapper(com.codahale.metrics.Meter meter);
350
351
/**
352
* Gets access to the underlying Dropwizard meter
353
* @return The wrapped Dropwizard meter
354
*/
355
public com.codahale.metrics.Meter getDropwizardMeter();
356
357
/**
358
* Marks a single event
359
*/
360
public void markEvent();
361
362
/**
363
* Marks multiple events
364
* @param n Number of events to mark
365
*/
366
public void markEvent(long n);
367
368
/**
369
* Gets the rate (one minute rate from Dropwizard meter)
370
* @return Current rate
371
*/
372
public double getRate();
373
374
/**
375
* Gets the total event count
376
* @return Total count of events
377
*/
378
public long getCount();
379
}
380
```
381
382
### Dropwizard Histogram Wrapper
383
384
Wrapper to use a Dropwizard Histogram as a Flink Histogram within Flink's metrics system.
385
386
```java { .api }
387
/**
388
* Wrapper to use a Dropwizard Histogram as a Flink Histogram
389
*/
390
public class DropwizardHistogramWrapper implements Histogram {
391
392
/**
393
* Creates a wrapper around a Dropwizard histogram
394
* @param dropwizardHistogram The Dropwizard histogram to wrap
395
*/
396
public DropwizardHistogramWrapper(com.codahale.metrics.Histogram dropwizardHistogram);
397
398
/**
399
* Gets access to the underlying Dropwizard histogram
400
* @return The wrapped Dropwizard histogram
401
*/
402
public com.codahale.metrics.Histogram getDropwizardHistogram();
403
404
/**
405
* Updates the histogram with a new value
406
* @param value The value to add
407
*/
408
public void update(long value);
409
410
/**
411
* Gets the total number of values recorded
412
* @return Count of recorded values
413
*/
414
public long getCount();
415
416
/**
417
* Gets histogram statistics compatible with Flink
418
* @return HistogramStatistics instance
419
*/
420
public HistogramStatistics getStatistics();
421
}
422
```
423
424
## Types
425
426
```java { .api }
427
/**
428
* Dropwizard histogram statistics implementation for DropwizardHistogramWrapper
429
*/
430
class DropwizardHistogramStatistics extends HistogramStatistics {
431
432
/**
433
* Gets the value at the specified quantile
434
* @param quantile The quantile (0.0 to 1.0)
435
* @return Value at the quantile
436
*/
437
public double getQuantile(double quantile);
438
439
/**
440
* Gets all recorded values
441
* @return Array of all values
442
*/
443
public long[] getValues();
444
445
/**
446
* Gets the number of recorded values
447
* @return Size of the dataset
448
*/
449
public int size();
450
451
/**
452
* Gets the arithmetic mean of all values
453
* @return Mean value
454
*/
455
public double getMean();
456
457
/**
458
* Gets the standard deviation
459
* @return Standard deviation
460
*/
461
public double getStdDev();
462
463
/**
464
* Gets the maximum value
465
* @return Maximum value
466
*/
467
public long getMax();
468
469
/**
470
* Gets the minimum value
471
* @return Minimum value
472
*/
473
public long getMin();
474
}
475
476
/**
477
* Wrapper to use Flink's HistogramStatistics as a Dropwizard Snapshot
478
*/
479
class HistogramStatisticsWrapper extends Snapshot {
480
481
/**
482
* Gets the value at the specified quantile
483
* @param quantile The quantile (0.0 to 1.0)
484
* @return Value at the quantile
485
*/
486
public double getValue(double quantile);
487
488
/**
489
* Gets all recorded values
490
* @return Array of all values
491
*/
492
public long[] getValues();
493
494
/**
495
* Gets the number of recorded values
496
* @return Size of the dataset
497
*/
498
public int size();
499
500
/**
501
* Gets the maximum value
502
* @return Maximum value
503
*/
504
public long getMax();
505
506
/**
507
* Gets the arithmetic mean of all values
508
* @return Mean value
509
*/
510
public double getMean();
511
512
/**
513
* Gets the minimum value
514
* @return Minimum value
515
*/
516
public long getMin();
517
518
/**
519
* Gets the standard deviation
520
* @return Standard deviation
521
*/
522
public double getStdDev();
523
524
/**
525
* Dumps all histogram values to an output stream
526
* @param output OutputStream to write values to
527
*/
528
public void dump(OutputStream output);
529
}
530
```