A Flink metrics reporter that exports metrics to the Graphite monitoring system over TCP or UDP.
npx @tessl/cli install tessl/maven-org-apache-flink--flink-metrics-graphite@2.1.00
# Flink Metrics Graphite

Flink Metrics Graphite is a metrics reporter for Apache Flink that exports metrics to Graphite. It connects Flink's internal metrics framework to Graphite's time-series database and can transmit metrics over either TCP or UDP.

## Package Information

- **Package Name**: flink-metrics-graphite
- **Package Type**: maven
- **Language**: Java
- **Installation**: `org.apache.flink:flink-metrics-graphite:2.1.0`

## Core Imports

```java
import org.apache.flink.metrics.graphite.GraphiteReporter;
import org.apache.flink.metrics.graphite.GraphiteReporterFactory;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.reporter.MetricReporter;
import com.codahale.metrics.ScheduledReporter;
```

## Basic Usage

```java
import org.apache.flink.metrics.graphite.GraphiteReporter;
import org.apache.flink.metrics.MetricConfig;
import com.codahale.metrics.ScheduledReporter;

// Configure and create a GraphiteReporter.
// MetricConfig extends java.util.Properties, so values are set with setProperty.
MetricConfig config = new MetricConfig();
config.setProperty(GraphiteReporter.ARG_HOST, "localhost");
config.setProperty(GraphiteReporter.ARG_PORT, "2003");
config.setProperty(GraphiteReporter.ARG_PROTOCOL, "TCP");

// Create GraphiteReporter and get the configured ScheduledReporter
GraphiteReporter reporter = new GraphiteReporter();
ScheduledReporter scheduledReporter = reporter.getReporter(config);

// Or use it through Flink's metric system lifecycle
reporter.open(config);
// Metrics will be automatically reported
reporter.close(); // When shutting down
```

## Architecture

The flink-metrics-graphite package is built on top of Apache Flink's metrics framework:

- **GraphiteReporter**: Extends `ScheduledDropwizardReporter` to provide Graphite-specific functionality
- **GraphiteReporterFactory**: Implements `MetricReporterFactory` so the reporter can be loaded through the service provider interface
- **Protocol Support**: Sends metrics to Graphite over either TCP or UDP
- **Dropwizard Integration**: Uses the Dropwizard Metrics library internally for the reporter implementation
- **Configuration**: Reads its settings from Flink's `MetricConfig`

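To make the protocol bullet concrete, the sketch below formats a single data point the way Graphite's plaintext protocol expects it: one `path value timestamp` line per point. It is an illustration only. The class name `GraphitePlaintextLine`, the `format` helper, the two-decimal value format, and the whitespace handling are assumptions made for this example; in the package itself the wire format is produced by the Dropwizard Graphite sender, not by code like this.

```java
import java.util.Locale;

/** Hypothetical helper: renders one data point as a Graphite plaintext line. */
final class GraphitePlaintextLine {

    /**
     * Builds "<path> <value> <epochSeconds>\n".
     * Whitespace inside the path is replaced with '-' because a space
     * separates the three fields of the line (an assumption of this sketch).
     */
    static String format(String path, double value, long epochSeconds) {
        String safePath = path.trim().replaceAll("\\s+", "-");
        // Locale.US keeps the decimal separator a dot regardless of the JVM default.
        return String.format(Locale.US, "%s %.2f %d\n", safePath, value, epochSeconds);
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis() / 1000L;
        System.out.print(format("flink.myapp.taskmanager.numRecordsIn", 42.0, now));
    }
}
```

The same line can travel over a TCP connection or inside a UDP datagram; the choice of transport does not change the payload in this sketch.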
## Capabilities

### Graphite Metrics Reporting

Main reporter class that exports Flink metrics to Graphite monitoring systems.

```java { .api }
/**
 * GraphiteReporter extends ScheduledDropwizardReporter to provide Graphite integration.
 * Supports both TCP and UDP protocols for metrics transmission.
 */
@PublicEvolving
public class GraphiteReporter extends ScheduledDropwizardReporter {

    /** Configuration parameter name for protocol selection */
    public static final String ARG_PROTOCOL = "protocol";

    /** Inherited constants from ScheduledDropwizardReporter */
    public static final String ARG_HOST = "host";
    public static final String ARG_PORT = "port";
    public static final String ARG_PREFIX = "prefix";
    public static final String ARG_CONVERSION_RATE = "rateConversion";
    public static final String ARG_CONVERSION_DURATION = "durationConversion";

    /**
     * Creates and configures a Dropwizard GraphiteReporter instance.
     * @param config MetricConfig containing connection and formatting options
     * @return ScheduledReporter configured for Graphite communication
     * @throws IllegalArgumentException if host/port configuration is invalid
     */
    @Override
    public ScheduledReporter getReporter(MetricConfig config);

    /** Inherited methods from ScheduledDropwizardReporter and MetricReporter */

    /**
     * Opens the reporter with the given configuration.
     * @param config MetricConfig containing reporter configuration
     */
    @Override
    public void open(MetricConfig config);

    /**
     * Closes the reporter and stops metric reporting.
     */
    @Override
    public void close();

    /**
     * Called when a metric is added to the system.
     * @param metric The metric instance
     * @param metricName The metric name
     * @param group The metric group
     */
    @Override
    public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group);

    /**
     * Called when a metric is removed from the system.
     * @param metric The metric instance
     * @param metricName The metric name
     * @param group The metric group
     */
    @Override
    public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group);

    /**
     * Reports all registered metrics.
     */
    @Override
    public void report();

    /**
     * Filters invalid characters from metric names.
     * @param metricName The original metric name
     * @return Filtered metric name with invalid characters replaced
     */
    @Override
    public String filterCharacters(String metricName);
}
```

**Usage Example:**

```java
import org.apache.flink.metrics.graphite.GraphiteReporter;
import org.apache.flink.metrics.MetricConfig;
import com.codahale.metrics.ScheduledReporter;

// Create a reporter with the TCP protocol (the default).
// MetricConfig extends java.util.Properties, so values are set with setProperty.
GraphiteReporter reporter = new GraphiteReporter();
MetricConfig config = new MetricConfig();
config.setProperty(GraphiteReporter.ARG_HOST, "graphite.example.com");
config.setProperty(GraphiteReporter.ARG_PORT, "2003");
config.setProperty(GraphiteReporter.ARG_PROTOCOL, "TCP");
config.setProperty(GraphiteReporter.ARG_PREFIX, "flink.myapp");

ScheduledReporter scheduledReporter = reporter.getReporter(config);

// Create a new config for the UDP protocol
MetricConfig udpConfig = new MetricConfig();
udpConfig.setProperty(GraphiteReporter.ARG_HOST, "graphite.example.com");
udpConfig.setProperty(GraphiteReporter.ARG_PORT, "2003");
udpConfig.setProperty(GraphiteReporter.ARG_PROTOCOL, "UDP");
ScheduledReporter udpReporter = reporter.getReporter(udpConfig);
```

### Reporter Factory

Factory class for creating GraphiteReporter instances through Flink's service provider interface.

```java { .api }
/**
 * MetricReporterFactory implementation for GraphiteReporter.
 * Enables automatic discovery and instantiation by Flink's metrics system.
 */
public class GraphiteReporterFactory implements MetricReporterFactory {

    /**
     * Creates a new GraphiteReporter instance.
     * @param properties Configuration properties (currently unused)
     * @return New GraphiteReporter instance
     */
    @Override
    public MetricReporter createMetricReporter(Properties properties);
}
```

**Usage Example:**

```java
import org.apache.flink.metrics.graphite.GraphiteReporterFactory;
import org.apache.flink.metrics.reporter.MetricReporter;
import java.util.Properties;

// Direct factory usage
GraphiteReporterFactory factory = new GraphiteReporterFactory();
Properties props = new Properties();
MetricReporter reporter = factory.createMetricReporter(props);

// Automatic discovery via SPI - configured in Flink configuration
// metrics.reporter.graphite.factory.class: org.apache.flink.metrics.graphite.GraphiteReporterFactory
```

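When the reporter is discovered through the factory, its settings normally live in Flink's configuration under keys of the form `metrics.reporter.<name>.<key>`, as the `factory.class` entry above suggests. The sketch below renders such entries from a plain map so the naming scheme is easy to see. The class `ReporterConfigKeys`, the `qualify` helper, and the reporter name `graphite` are hypothetical and exist only for this illustration; they are not part of Flink or of this package.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: prefixes reporter settings with Flink's reporter key namespace. */
final class ReporterConfigKeys {

    static final String PREFIX = "metrics.reporter.";

    /** Returns a new map whose keys are "metrics.reporter.<reporterName>.<key>". */
    static Map<String, String> qualify(String reporterName, Map<String, String> settings) {
        Map<String, String> qualified = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : settings.entrySet()) {
            qualified.put(PREFIX + reporterName + "." + entry.getKey(), entry.getValue());
        }
        return qualified;
    }

    public static void main(String[] args) {
        Map<String, String> settings = new LinkedHashMap<>();
        settings.put("factory.class", "org.apache.flink.metrics.graphite.GraphiteReporterFactory");
        settings.put("host", "localhost");
        settings.put("port", "2003");
        settings.put("protocol", "TCP");

        // Print the entries in "key: value" form, one per line.
        qualify("graphite", settings).forEach((key, value) -> System.out.println(key + ": " + value));
    }
}
```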
## Configuration Parameters

The GraphiteReporter accepts the following configuration parameters through `MetricConfig`:

```java { .api }
/** Configuration parameters inherited from ScheduledDropwizardReporter */
public static final String ARG_HOST = "host";                              // Required: Graphite server hostname
public static final String ARG_PORT = "port";                              // Required: Graphite server port
public static final String ARG_PREFIX = "prefix";                          // Optional: Metric name prefix
public static final String ARG_CONVERSION_RATE = "rateConversion";         // Optional: Rate conversion time unit
public static final String ARG_CONVERSION_DURATION = "durationConversion"; // Optional: Duration conversion time unit

/** GraphiteReporter-specific parameters */
public static final String ARG_PROTOCOL = "protocol";                      // Optional: "TCP" or "UDP" (default: "TCP", case-sensitive)
```

**Configuration Example:**

```java
import org.apache.flink.metrics.graphite.GraphiteReporter;
import org.apache.flink.metrics.MetricConfig;

// MetricConfig extends java.util.Properties, so values are set with setProperty.
MetricConfig config = new MetricConfig();

// Required parameters
config.setProperty(GraphiteReporter.ARG_HOST, "graphite.monitoring.com");
config.setProperty(GraphiteReporter.ARG_PORT, "2003");

// Optional parameters
config.setProperty(GraphiteReporter.ARG_PROTOCOL, "UDP"); // Use UDP instead of TCP
config.setProperty(GraphiteReporter.ARG_PREFIX, "flink.production.app1"); // Add prefix to all metrics
config.setProperty(GraphiteReporter.ARG_CONVERSION_RATE, "SECONDS"); // Convert rates to per-second
config.setProperty(GraphiteReporter.ARG_CONVERSION_DURATION, "MILLISECONDS"); // Convert durations to milliseconds
```

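The conversion values in the example (`SECONDS`, `MILLISECONDS`) are names of `java.util.concurrent.TimeUnit` constants. Assuming the reporter resolves them with `TimeUnit.valueOf`, which is an assumption about the implementation rather than something this page states, the lookup is case-sensitive and rejects unknown names. The sketch below shows that behavior with a hypothetical `ConversionUnits.parse` helper.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: resolves a conversion setting to a TimeUnit constant. */
final class ConversionUnits {

    /**
     * Returns the TimeUnit named by value, or fallback when the setting is absent.
     * TimeUnit.valueOf throws IllegalArgumentException for any name that is not
     * an exact, upper-case constant name such as "SECONDS".
     */
    static TimeUnit parse(String value, TimeUnit fallback) {
        return value == null ? fallback : TimeUnit.valueOf(value);
    }

    public static void main(String[] args) {
        System.out.println(parse("SECONDS", TimeUnit.MINUTES));
        System.out.println(parse(null, TimeUnit.MILLISECONDS));
        try {
            parse("seconds", TimeUnit.SECONDS); // lower case is not a constant name
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Under that assumption, a misspelled or lower-case unit would surface as an exception when the reporter is created, so it is worth validating these values before deployment.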
## Error Handling

The GraphiteReporter validates its configuration and handles errors as follows:

- **Invalid Host/Port**: `getReporter()` throws `IllegalArgumentException` if the host is null or empty, or if the port is less than 1
- **Invalid Protocol**: Logs a warning and falls back to TCP if the protocol is anything other than "TCP" or "UDP" (matching is case-sensitive)
- **Connection Issues**: Left to the underlying Dropwizard Graphite reporter and its network error handling

**Example Error Handling:**

```java
import org.apache.flink.metrics.graphite.GraphiteReporter;
import org.apache.flink.metrics.MetricConfig;

try {
    MetricConfig config = new MetricConfig();
    config.setProperty(GraphiteReporter.ARG_HOST, ""); // Invalid empty host
    config.setProperty(GraphiteReporter.ARG_PORT, "0"); // Invalid port < 1

    GraphiteReporter reporter = new GraphiteReporter();
    reporter.getReporter(config); // Throws IllegalArgumentException
} catch (IllegalArgumentException e) {
    // Handle configuration error
    // Message format: "Invalid host/port configuration. Host: [host] Port: [port]"
    System.err.println("Configuration error: " + e.getMessage());
}
```

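The validation rules listed in this section can be mirrored in a few lines of plain Java, which is handy for checking a configuration before it reaches Flink. The sketch below is not the reporter's implementation: `GraphiteConfigCheck`, its methods, and its own `Protocol` enum are hypothetical, it works on `java.util.Properties` rather than `MetricConfig`, and treating an unparsable port as invalid is a choice made for this example.

```java
import java.util.Properties;

/** Hypothetical pre-flight check that mirrors the documented validation rules. */
final class GraphiteConfigCheck {

    enum Protocol { TCP, UDP }

    /** Throws IllegalArgumentException if host is null/empty or port is less than 1. */
    static void requireHostAndPort(Properties config) {
        String host = config.getProperty("host");
        int port = parsePort(config.getProperty("port"));
        if (host == null || host.isEmpty() || port < 1) {
            throw new IllegalArgumentException(
                    "Invalid host/port configuration. Host: " + host + " Port: " + port);
        }
    }

    /** Returns -1 for a missing or non-numeric port (an assumption of this sketch). */
    private static int parsePort(String raw) {
        if (raw == null) {
            return -1;
        }
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            return -1;
        }
    }

    /** Resolves the protocol case-sensitively, warning and falling back to TCP otherwise. */
    static Protocol resolveProtocol(Properties config) {
        String raw = config.getProperty("protocol", "TCP");
        try {
            return Protocol.valueOf(raw);
        } catch (IllegalArgumentException e) {
            System.err.println("Invalid protocol '" + raw + "', defaulting to TCP");
            return Protocol.TCP;
        }
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty("host", "graphite.example.com");
        config.setProperty("port", "2003");
        config.setProperty("protocol", "udp"); // lower case does not match "UDP"
        requireHostAndPort(config);
        System.out.println(resolveProtocol(config));
    }
}
```

Note that a lower-case `udp` silently selects TCP in this sketch, matching the case-sensitive rule described above, so a warning in the logs may be the only sign of a mistyped protocol.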
## Types

```java { .api }
/** Protocol enumeration for internal use (private to GraphiteReporter class) */
private enum Protocol {
    TCP, // TCP socket communication (default)
    UDP  // UDP datagram communication
}

/** Required imports for complete API usage */
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Meter;
import java.util.Properties;
```

## Dependencies

The package requires the following dependencies:

- **Provided Dependencies** (must be available at runtime):
  - `org.apache.flink:flink-annotations`
  - `org.apache.flink:flink-metrics-core`

- **Bundled Dependencies** (included in shaded JAR):
  - `org.apache.flink:flink-metrics-dropwizard`
  - `io.dropwizard.metrics:metrics-core`
  - `io.dropwizard.metrics:metrics-graphite`