# Reporter Framework

Pluggable system for exporting metrics to external monitoring systems. The reporter framework enables integration with various monitoring backends through configurable metric exporters that can operate in both push and pull patterns.

## Capabilities

### MetricReporter Interface

Core interface for implementing metric reporters that export metrics to external systems. Provides lifecycle management and metric notification callbacks.

```java { .api }
/**
 * Metric reporters are used to export Metrics to an external backend.
 * Metric reporters are instantiated via a MetricReporterFactory.
 */
public interface MetricReporter {

    /**
     * Configures this reporter. If the reporter was instantiated generically
     * and hence parameter-less, this method is the place where the reporter
     * sets its basic fields based on configuration values.
     * This method is always called first on a newly instantiated reporter.
     * @param config A properties object that contains all parameters set for this reporter
     */
    void open(MetricConfig config);

    /**
     * Closes this reporter. Should be used to close channels, streams and release resources.
     */
    void close();

    /**
     * Called when a new Metric was added.
     * @param metric the metric that was added
     * @param metricName the name of the metric
     * @param group the group that contains the metric
     */
    void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group);

    /**
     * Called when a Metric was removed.
     * @param metric the metric that should be removed
     * @param metricName the name of the metric
     * @param group the group that contains the metric
     */
    void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group);
}
```

**Usage Examples:**

```java
// Custom reporter implementation
public class ConsoleMetricReporter implements MetricReporter {
    private boolean isOpen = false;

    @Override
    public void open(MetricConfig config) {
        this.isOpen = true;
        System.out.println("Console reporter started");
    }

    @Override
    public void close() {
        this.isOpen = false;
        System.out.println("Console reporter stopped");
    }

    @Override
    public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
        String identifier = group.getMetricIdentifier(metricName);
        System.out.println("Added metric: " + identifier + " of type " + metric.getMetricType());
    }

    @Override
    public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
        String identifier = group.getMetricIdentifier(metricName);
        System.out.println("Removed metric: " + identifier);
    }
}

// Handle different metric types in a reporter (method shown on its own;
// it belongs inside a MetricReporter implementation).
// registerCounter, registerGauge, registerMeter and registerHistogram are
// reporter-specific helpers, not part of the MetricReporter interface.
@Override
public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
    String identifier = group.getMetricIdentifier(metricName);

    switch (metric.getMetricType()) {
        case COUNTER:
            Counter counter = (Counter) metric;
            registerCounter(identifier, counter);
            break;
        case GAUGE:
            Gauge<?> gauge = (Gauge<?>) metric;
            registerGauge(identifier, gauge);
            break;
        case METER:
            Meter meter = (Meter) metric;
            registerMeter(identifier, meter);
            break;
        case HISTOGRAM:
            Histogram histogram = (Histogram) metric;
            registerHistogram(identifier, histogram);
            break;
        default:
            // Unrecognized metric type: nothing to register
            break;
    }
}
```
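
The callback order described above (`open` first, then add and remove notifications, then `close`) can be sketched without a Flink dependency. The `SketchMetric`, `SketchReporter`, `RecordingReporter`, and `SketchRegistry` types below are hypothetical stand-ins, not Flink classes; they only mimic the shape of the `MetricReporter` contract so the sequence of calls can be observed.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a metric; it only exposes a readable value.
interface SketchMetric {
    long value();
}

// Hypothetical stand-in mirroring the shape of the MetricReporter callbacks.
interface SketchReporter {
    void open(Map<String, String> config);
    void notifyOfAddedMetric(SketchMetric metric, String identifier);
    void notifyOfRemovedMetric(SketchMetric metric, String identifier);
    void close();
}

// Records every callback so the invocation order can be inspected.
class RecordingReporter implements SketchReporter {
    final List<String> events = new ArrayList<>();
    final Map<String, SketchMetric> tracked = new LinkedHashMap<>();

    @Override
    public void open(Map<String, String> config) {
        events.add("open");
    }

    @Override
    public void notifyOfAddedMetric(SketchMetric metric, String identifier) {
        tracked.put(identifier, metric);
        events.add("added:" + identifier);
    }

    @Override
    public void notifyOfRemovedMetric(SketchMetric metric, String identifier) {
        tracked.remove(identifier);
        events.add("removed:" + identifier);
    }

    @Override
    public void close() {
        events.add("close");
    }
}

// Hypothetical registry that drives a reporter through its lifecycle.
class SketchRegistry {
    private final SketchReporter reporter;

    SketchRegistry(SketchReporter reporter, Map<String, String> config) {
        this.reporter = reporter;
        reporter.open(config); // open is the first call on a new reporter
    }

    void register(String identifier, SketchMetric metric) {
        reporter.notifyOfAddedMetric(metric, identifier);
    }

    void unregister(String identifier, SketchMetric metric) {
        reporter.notifyOfRemovedMetric(metric, identifier);
    }

    void shutdown() {
        reporter.close();
    }
}
```

A reporter that keeps its own map of tracked metrics, as `RecordingReporter` does here, can later iterate that map when it needs to export values.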

### MetricReporterFactory Interface

Factory interface for creating metric reporters, enabling plugin-based reporter loading and configuration.

```java { .api }
/**
 * MetricReporter factory. Metric reporters that can be instantiated with
 * a factory automatically qualify for being loaded as a plugin, so long as
 * the reporter jar is self-contained and contains a
 * META-INF/services/org.apache.flink.metrics.reporter.MetricReporterFactory
 * file containing the qualified class name of the factory.
 */
public interface MetricReporterFactory {
    /**
     * Creates a new metric reporter.
     * @param properties configured properties for the reporter
     * @return created metric reporter
     */
    MetricReporter createMetricReporter(Properties properties);
}
```

**Usage Examples:**

```java
// Factory implementation
public class ConsoleReporterFactory implements MetricReporterFactory {
    @Override
    public MetricReporter createMetricReporter(Properties properties) {
        return new ConsoleMetricReporter();
    }
}

// Factory with configuration.
// ConfigurableReporter is an illustrative MetricReporter implementation
// with setters for its endpoint and report interval.
public class ConfigurableReporterFactory implements MetricReporterFactory {
    @Override
    public MetricReporter createMetricReporter(Properties properties) {
        String endpoint = properties.getProperty("endpoint", "localhost:8080");
        // Throws NumberFormatException if "interval" is not a valid integer
        int interval = Integer.parseInt(properties.getProperty("interval", "60"));

        ConfigurableReporter reporter = new ConfigurableReporter();
        reporter.setEndpoint(endpoint);
        reporter.setReportInterval(interval);

        return reporter;
    }
}

// Plugin registration (in META-INF/services file)
// META-INF/services/org.apache.flink.metrics.reporter.MetricReporterFactory
// com.example.ConsoleReporterFactory
// com.example.ConfigurableReporterFactory
```
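
The `Integer.parseInt` call in the configurable factory above fails with a `NumberFormatException` when the configured value is malformed. Below is a minimal sketch of a more defensive lookup that uses only `java.util.Properties`. `PropertyReader` and its method names are hypothetical helpers, not part of Flink, and falling back to a default rather than failing fast is a design choice that may not suit every reporter.

```java
import java.util.Properties;

// Hypothetical helper for reading reporter properties defensively.
final class PropertyReader {
    private PropertyReader() {}

    // Returns the integer stored under key, or fallback if the key is
    // missing or the value is not a valid integer.
    static int intOrDefault(Properties properties, String key, int fallback) {
        String raw = properties.getProperty(key);
        if (raw == null) {
            return fallback;
        }
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            return fallback;
        }
    }

    // Returns the trimmed string under key, or fallback if missing or blank.
    static String stringOrDefault(Properties properties, String key, String fallback) {
        String raw = properties.getProperty(key);
        if (raw == null || raw.trim().isEmpty()) {
            return fallback;
        }
        return raw.trim();
    }
}
```

Inside a factory, `PropertyReader.intOrDefault(properties, "interval", 60)` would then stand in for the bare `Integer.parseInt` call.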

### Scheduled Reporter Interface

Interface for reporters that actively send out data periodically rather than only responding to metric addition/removal events.

```java { .api }
/**
 * Interface for reporters that actively send out data periodically.
 */
public interface Scheduled {
    /**
     * Report the current measurements. This method is called periodically
     * by the metrics registry that uses the reporter. This method must not
     * block for a significant amount of time, any reporter needing more time
     * should instead run the operation asynchronously.
     */
    void report();
}
```
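
To make "called periodically by the metrics registry" concrete, here is a rough sketch of how a driver could invoke `report()` at a fixed rate. It is an assumption for illustration only and does not describe how Flink schedules reporters internally. `SketchScheduled` and `ReportScheduler` are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in with the same single method as Scheduled.
interface SketchScheduled {
    void report();
}

// Hypothetical driver: calls report() at a fixed rate on one daemon thread.
class ReportScheduler implements AutoCloseable {
    private final ScheduledExecutorService executor =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "reporter-scheduler");
            t.setDaemon(true);
            return t;
        });

    void start(SketchScheduled reporter, long period, TimeUnit unit) {
        executor.scheduleAtFixedRate(() -> {
            try {
                reporter.report();
            } catch (RuntimeException e) {
                // An exception escaping the task would suppress all later
                // executions, so it is caught here (log it in practice).
            }
        }, period, period, unit);
    }

    @Override
    public void close() {
        executor.shutdownNow();
    }

    // Demo helper: true if report() ran `expected` times within the timeout.
    static boolean awaitReports(int expected, long periodMillis, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(expected);
        try (ReportScheduler scheduler = new ReportScheduler()) {
            scheduler.start(latch::countDown, periodMillis, TimeUnit.MILLISECONDS);
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Because a single scheduler thread runs every invocation in this sketch, a slow `report()` delays the next one, which is one reason the contract asks implementations to return quickly.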

**Usage Examples:**

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
// Flink metric types (Metric, Counter, Gauge, Meter, Histogram, ...) also need importing

// Scheduled reporter implementation
public class PeriodicReporter implements MetricReporter, Scheduled {
    private final Map<String, Metric> registeredMetrics = new ConcurrentHashMap<>();
    private final HttpClient httpClient = HttpClient.newHttpClient();
    private String endpoint;

    @Override
    public void open(MetricConfig config) {
        this.endpoint = config.getString("endpoint", "http://localhost:8080/metrics");
    }

    @Override
    public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
        String identifier = group.getMetricIdentifier(metricName);
        registeredMetrics.put(identifier, metric);
    }

    @Override
    public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
        String identifier = group.getMetricIdentifier(metricName);
        registeredMetrics.remove(identifier);
    }

    // Called periodically by Flink
    @Override
    public void report() {
        Map<String, Object> metrics = new HashMap<>();

        for (Map.Entry<String, Metric> entry : registeredMetrics.entrySet()) {
            String name = entry.getKey();
            Metric metric = entry.getValue();

            Object value = extractValue(metric);
            if (value != null) {
                metrics.put(name, value);
            }
        }

        // Send metrics asynchronously to avoid blocking
        CompletableFuture.runAsync(() -> sendMetrics(metrics));
    }

    // Illustrative sender: posts the snapshot using Map.toString() as a
    // placeholder payload; a real reporter would serialize to the backend's format.
    private void sendMetrics(Map<String, Object> metrics) {
        HttpRequest request = HttpRequest.newBuilder(URI.create(endpoint))
            .POST(HttpRequest.BodyPublishers.ofString(metrics.toString()))
            .build();
        httpClient.sendAsync(request, HttpResponse.BodyHandlers.discarding());
    }

    private Object extractValue(Metric metric) {
        switch (metric.getMetricType()) {
            case COUNTER:
                return ((Counter) metric).getCount();
            case GAUGE:
                return ((Gauge<?>) metric).getValue();
            case METER:
                Meter meter = (Meter) metric;
                Map<String, Object> meterData = new HashMap<>();
                meterData.put("rate", meter.getRate());
                meterData.put("count", meter.getCount());
                return meterData;
            case HISTOGRAM:
                Histogram histogram = (Histogram) metric;
                HistogramStatistics stats = histogram.getStatistics();
                Map<String, Object> histogramData = new HashMap<>();
                histogramData.put("count", histogram.getCount());
                histogramData.put("mean", stats.getMean());
                histogramData.put("p95", stats.getQuantile(0.95));
                histogramData.put("max", stats.getMax());
                histogramData.put("min", stats.getMin());
                return histogramData;
            default:
                return null;
        }
    }

    @Override
    public void close() {
        // Clean up resources
    }
}
```
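
The requirement that `report()` must not block can be met by reading metric values on the calling thread and handing the finished snapshot to a background thread. The sketch below shows that hand-off without a Flink dependency. `AsyncSnapshotReporter`, its `LongSupplier`-based metrics, and the `sink` callback are hypothetical stand-ins chosen to keep the example self-contained.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

// Hypothetical reporter core: metrics are modelled as LongSupplier for brevity.
class AsyncSnapshotReporter implements AutoCloseable {
    private final Map<String, LongSupplier> metrics = new ConcurrentHashMap<>();
    private final ExecutorService sender = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "metrics-sender");
        t.setDaemon(true);
        return t;
    });
    private final Consumer<Map<String, Long>> sink;

    AsyncSnapshotReporter(Consumer<Map<String, Long>> sink) {
        this.sink = sink;
    }

    void add(String identifier, LongSupplier metric) {
        metrics.put(identifier, metric);
    }

    void remove(String identifier) {
        metrics.remove(identifier);
    }

    // Reads current values on the caller's thread, then hands a read-only
    // snapshot to the sender thread so the caller returns quickly.
    CompletableFuture<Void> report() {
        Map<String, Long> snapshot = new HashMap<>();
        metrics.forEach((name, metric) -> snapshot.put(name, metric.getAsLong()));
        Map<String, Long> frozen = Collections.unmodifiableMap(snapshot);
        return CompletableFuture.runAsync(() -> sink.accept(frozen), sender);
    }

    @Override
    public void close() {
        sender.shutdown();
    }
}
```

Using a dedicated single-thread executor instead of the common pool keeps slow network calls from competing with unrelated tasks and preserves the order in which snapshots are sent.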

### AbstractReporter Base Class

Base implementation providing common functionality for metric reporters.

```java { .api }
/**
 * Base implementation for metric reporters.
 */
public abstract class AbstractReporter implements MetricReporter {
    // Provides common functionality and convenience methods for reporter implementations
}
```

### Reporter Annotations

Annotations for controlling reporter instantiation behavior.

```java { .api }
/**
 * Marker annotation for factory-instantiated reporters.
 */
public @interface InstantiateViaFactory {
}

/**
 * Marker annotation for reflection-based instantiation.
 */
public @interface InterceptInstantiationViaReflection {
}
```

**Usage Examples:**

```java
// Factory-based reporter
@InstantiateViaFactory
public class MyFactoryReporter implements MetricReporter {
    // Will be instantiated via MetricReporterFactory
}

// Reflection-based reporter
@InterceptInstantiationViaReflection
public class MyReflectionReporter implements MetricReporter {
    // Will be instantiated via reflection with default constructor
}
```

### Reporter Configuration Patterns

Common patterns for configuring reporters using MetricConfig.

**Configuration Handling:**

```java
public class DatabaseReporter implements MetricReporter {
    private String jdbcUrl;
    private String username;
    private String password;
    private int batchSize;
    private long flushInterval;

    @Override
    public void open(MetricConfig config) {
        // Required configuration
        this.jdbcUrl = config.getString("jdbc.url", null);
        if (jdbcUrl == null) {
            throw new IllegalArgumentException("jdbc.url is required");
        }

        // Optional configuration with defaults
        this.username = config.getString("jdbc.username", "metrics");
        this.password = config.getString("jdbc.password", "");
        this.batchSize = config.getInteger("batch.size", 100);
        this.flushInterval = config.getLong("flush.interval", 30000); // milliseconds (30 seconds)

        // Initialize database connection (reporter-specific helper, not shown)
        initializeConnection();
    }

    // close() and the notifyOf... callbacks are omitted for brevity
}
```
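
When several settings are required, checking them one at a time reports only the first missing key. A minimal sketch of collecting every missing key before failing is shown below. It operates on plain `java.util.Properties`, on the assumption that the reporter configuration can be read through `getProperty`. `RequiredKeys` is a hypothetical helper, not part of Flink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical validator for required reporter settings.
final class RequiredKeys {
    private RequiredKeys() {}

    // Returns the keys from `required` that are absent or blank, in order.
    static List<String> missing(Properties config, String... required) {
        List<String> absent = new ArrayList<>();
        for (String key : required) {
            String value = config.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                absent.add(key);
            }
        }
        return absent;
    }

    // Fails fast with a single message that names every missing key.
    static void check(Properties config, String... required) {
        List<String> absent = missing(config, required);
        if (!absent.isEmpty()) {
            throw new IllegalArgumentException(
                "Missing required reporter settings: " + String.join(", ", absent));
        }
    }
}
```

Calling `RequiredKeys.check(config, "jdbc.url")` at the top of `open` would give the same fail-fast behavior as the explicit null check, with room to add more keys later.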

**Error Handling in Reporters:**

```java
public class RobustReporter implements MetricReporter, Scheduled {
    // SLF4J logger, assumed here as the logging facade
    private static final Logger log = LoggerFactory.getLogger(RobustReporter.class);
    private final AtomicBoolean isHealthy = new AtomicBoolean(true);

    // open(), close() and the notifyOf... callbacks are omitted for brevity.
    // doReport() and checkHealth() are reporter-specific helpers, not shown.

    @Override
    public void report() {
        if (!isHealthy.get()) {
            return; // Skip reporting if unhealthy
        }

        try {
            doReport();
        } catch (Exception e) {
            isHealthy.set(false);
            scheduleHealthCheck();
            log.warn("Reporter became unhealthy", e);
        }
    }

    private void scheduleHealthCheck() {
        CompletableFuture.delayedExecutor(30, TimeUnit.SECONDS)
            .execute(() -> {
                boolean recovered = false;
                try {
                    recovered = checkHealth();
                } catch (Exception e) {
                    log.debug("Health check failed", e);
                }
                if (recovered) {
                    isHealthy.set(true);
                    log.info("Reporter recovered");
                } else {
                    // Still unhealthy: schedule another check after the same delay
                    scheduleHealthCheck();
                }
            });
    }
}
```
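
The skip-while-unhealthy check above can also be written without a background timer, by recording when the next attempt is allowed and doubling the wait after each failure. The sketch below is one possible shape for that, not an established Flink utility. `HealthGate`, its delay parameters, and the injected millisecond clock are all hypothetical.

```java
import java.util.function.LongSupplier;

// Hypothetical gate that suppresses reporting after a failure and allows a
// retry once a backoff period has elapsed. The delay doubles up to a maximum.
class HealthGate {
    private final LongSupplier clockMillis;
    private final long initialDelayMillis;
    private final long maxDelayMillis;
    private boolean healthy = true;
    private long currentDelayMillis;
    private long retryAtMillis;

    HealthGate(LongSupplier clockMillis, long initialDelayMillis, long maxDelayMillis) {
        this.clockMillis = clockMillis;
        this.initialDelayMillis = initialDelayMillis;
        this.maxDelayMillis = maxDelayMillis;
        this.currentDelayMillis = initialDelayMillis;
    }

    // True if a report attempt should be made now.
    synchronized boolean shouldAttempt() {
        return healthy || clockMillis.getAsLong() >= retryAtMillis;
    }

    // Marks the reporter healthy again and resets the backoff delay.
    synchronized void recordSuccess() {
        healthy = true;
        currentDelayMillis = initialDelayMillis;
    }

    // Blocks attempts until the current delay has passed, then doubles the
    // delay for the next failure, capped at maxDelayMillis.
    synchronized void recordFailure() {
        healthy = false;
        retryAtMillis = clockMillis.getAsLong() + currentDelayMillis;
        currentDelayMillis = Math.min(currentDelayMillis * 2, maxDelayMillis);
    }
}
```

In a `report()` method, the gate would wrap the send: return early when `shouldAttempt()` is false, call `recordSuccess()` after a successful send, and call `recordFailure()` in the catch block. Passing `System::currentTimeMillis` as the clock is the obvious production choice, while a controllable clock keeps the behavior testable.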