# Flink Metrics InfluxDB Reporter

Flink Metrics InfluxDB Reporter integrates Apache Flink's metrics system with InfluxDB, letting Flink applications export runtime metrics (counters, gauges, histograms, meters) to an InfluxDB time series database. The reporter implements Flink's `MetricReporter` interface and supports scheduled metric collection, configurable connection parameters, batched writes, and a fixed mapping from each Flink metric type to InfluxDB fields.

## Package Information

- **Package Name**: flink-metrics-influxdb_2.11
- **Package Type**: Maven
- **Language**: Java
- **Group ID**: org.apache.flink
- **Artifact ID**: flink-metrics-influxdb_2.11
- **Installation**: Add Maven dependency:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-metrics-influxdb_2.11</artifactId>
    <version>1.13.6</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.metrics.influxdb.InfluxdbReporter;
import org.apache.flink.metrics.influxdb.InfluxdbReporterFactory;
import org.apache.flink.metrics.influxdb.InfluxdbReporterOptions;
```

## Basic Usage

The InfluxDB reporter is normally enabled through Flink's configuration rather than instantiated directly in code. Set the following Flink configuration properties:

```properties
# Enable InfluxDB reporter
metrics.reporters: influxdb
metrics.reporter.influxdb.factory.class: org.apache.flink.metrics.influxdb.InfluxdbReporterFactory

# Configure InfluxDB connection
metrics.reporter.influxdb.host: localhost
metrics.reporter.influxdb.port: 8086
metrics.reporter.influxdb.db: flink_metrics
metrics.reporter.influxdb.username: admin
metrics.reporter.influxdb.password: password
```

When configured, Flink automatically instantiates the reporter and begins sending metrics to InfluxDB:

```java
// Illustration of how Flink uses the reporter internally.
// `properties` and `metricConfig` are supplied by Flink from the configuration above.
MetricReporter reporter = new InfluxdbReporterFactory().createMetricReporter(properties);
reporter.open(metricConfig);
// Metrics are then reported automatically at scheduled intervals
```
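The reporter reads its options by short key (`host`, `port`, `db`), while the configuration file uses the full `metrics.reporter.influxdb.` prefix. The following is a minimal JDK-only sketch of that scoping step, not Flink's actual code; `ReporterConfigSketch` and `scopedTo` are hypothetical names introduced for illustration.

```java
import java.util.Properties;

final class ReporterConfigSketch {
    /**
     * Copies every key under "metrics.reporter.<name>." into a new Properties
     * object with the prefix removed. Hypothetical helper, not a Flink API.
     */
    static Properties scopedTo(String reporterName, Properties flinkConf) {
        String prefix = "metrics.reporter." + reporterName + ".";
        Properties scoped = new Properties();
        for (String key : flinkConf.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                scoped.setProperty(key.substring(prefix.length()), flinkConf.getProperty(key));
            }
        }
        return scoped;
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        conf.setProperty("metrics.reporter.influxdb.host", "localhost");
        conf.setProperty("metrics.reporter.influxdb.port", "8086");
        conf.setProperty("taskmanager.numberOfTaskSlots", "4"); // unrelated key, ignored

        Properties scoped = scopedTo("influxdb", conf);
        System.out.println(scoped.getProperty("host") + ":" + scoped.getProperty("port"));
    }
}
```

Keys that belong to other reporters or to Flink itself never reach the reporter, which is why the option names in `InfluxdbReporterOptions` carry no prefix.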

## Architecture

The InfluxDB reporter is built around several key components:

- **Reporter Factory**: `InfluxdbReporterFactory` creates reporter instances and is discovered via Java's service provider interface (SPI)
- **Main Reporter**: `InfluxdbReporter` handles metric collection and batch transmission to InfluxDB
- **Configuration System**: `InfluxdbReporterOptions` defines the available configuration options
- **Metric Mapping**: `MetricMapper` converts Flink metrics to InfluxDB measurement points
- **Measurement System**: `MeasurementInfo` and `MeasurementInfoProvider` handle metric metadata and tagging

## Capabilities

### Reporter Factory

Factory for creating InfluxDB reporter instances. It is registered automatically via Java's service provider interface (SPI) and used by Flink's metric system.

```java { .api }
public class InfluxdbReporterFactory implements MetricReporterFactory {
    public MetricReporter createMetricReporter(Properties properties);
}
```

### Main Reporter

Core InfluxDB metrics reporter that extends Flink's reporter framework with scheduled InfluxDB transmission.

```java { .api }
public class InfluxdbReporter extends AbstractReporter<MeasurementInfo> implements Scheduled {
    public InfluxdbReporter();
    public void open(MetricConfig config);
    public void close();
    public void report();
}
```

### Configuration Options

Configuration options for the InfluxDB connection, authentication, database settings, and timeouts.

```java { .api }
public class InfluxdbReporterOptions {
    // Connection Configuration
    public static final ConfigOption<String> HOST;
    public static final ConfigOption<Scheme> SCHEME;
    public static final ConfigOption<Integer> PORT;

    // Authentication Configuration
    public static final ConfigOption<String> USERNAME;
    public static final ConfigOption<String> PASSWORD;

    // Database Configuration
    public static final ConfigOption<String> DB;
    public static final ConfigOption<String> RETENTION_POLICY;
    public static final ConfigOption<InfluxDB.ConsistencyLevel> CONSISTENCY;

    // Performance Configuration
    public static final ConfigOption<Integer> CONNECT_TIMEOUT;
    public static final ConfigOption<Integer> WRITE_TIMEOUT;
}

public enum Scheme {
    HTTP("http"),
    HTTPS("https");

    public String toString();
}
```
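The `SCHEME`, `HOST`, and `PORT` options together identify the InfluxDB endpoint. As a rough sketch, assuming they are combined into a `scheme://host:port` base URL, the composition could look like this; `InfluxUrlSketch` and `buildUrl` are hypothetical names, and the nested `Scheme` enum is a stand-in that mirrors the one listed above.

```java
final class InfluxUrlSketch {
    /** Stand-in for the reporter's Scheme enum; toString() yields the lowercase URL scheme. */
    enum Scheme {
        HTTP("http"),
        HTTPS("https");

        private final String text;

        Scheme(String text) {
            this.text = text;
        }

        @Override
        public String toString() {
            return text;
        }
    }

    /** Assumed composition of the base URL from the three connection options. */
    static String buildUrl(Scheme scheme, String host, int port) {
        // %s formats the enum via toString(), so HTTPS becomes "https"
        return String.format("%s://%s:%d", scheme, host, port);
    }

    public static void main(String[] args) {
        System.out.println(buildUrl(Scheme.HTTPS, "influxdb.example.com", 8086));
    }
}
```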
122
123
### Metric Mapping
124
125
Package-private utility functions for converting Flink metrics to InfluxDB measurement points with appropriate field mappings.
126
127
```java { .api }
128
class MetricMapper {
129
static Point map(MeasurementInfo info, Instant timestamp, Gauge<?> gauge);
130
static Point map(MeasurementInfo info, Instant timestamp, Counter counter);
131
static Point map(MeasurementInfo info, Instant timestamp, Histogram histogram);
132
static Point map(MeasurementInfo info, Instant timestamp, Meter meter);
133
}
134
```

### Measurement Information

Package-private data structures for representing InfluxDB measurement metadata including names and tags derived from Flink metric groups.

```java { .api }
final class MeasurementInfo {
    MeasurementInfo(String name, Map<String, String> tags);
    String getName();
    Map<String, String> getTags();
}

class MeasurementInfoProvider implements MetricInfoProvider<MeasurementInfo> {
    public MeasurementInfo getMetricInfo(String metricName, MetricGroup group);
}

interface MetricInfoProvider<MetricInfo> {
    MetricInfo getMetricInfo(String metricName, MetricGroup group);
}
```

### Abstract Reporter Base

Package-private base class that keeps a registry of added metrics, grouped by metric type, and pairs each metric with the generic `MetricInfo` produced by a `MetricInfoProvider`.

```java { .api }
abstract class AbstractReporter<MetricInfo> implements MetricReporter {
    protected AbstractReporter(MetricInfoProvider<MetricInfo> metricInfoProvider);
    public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group);
    public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group);
}
```
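To make the registry idea concrete, here is a simplified JDK-only sketch of a per-type metric registry. It is an assumption about the general shape, not the actual `AbstractReporter` source: `RegistrySketch` and its nested `Metric`, `Counter`, and `Gauge` interfaces are hypothetical stand-ins for Flink's types, and only two metric types are shown.

```java
import java.util.HashMap;
import java.util.Map;

final class RegistrySketch<I> {
    // Hypothetical stand-ins for Flink's metric interfaces.
    interface Metric {}
    interface Counter extends Metric { long getCount(); }
    interface Gauge extends Metric { Object getValue(); }

    // One map per metric type, each pairing a metric with its info object.
    final Map<Counter, I> counters = new HashMap<>();
    final Map<Gauge, I> gauges = new HashMap<>();

    /** Files the metric under the map that matches its type. */
    synchronized void added(Metric metric, I info) {
        if (metric instanceof Counter) {
            counters.put((Counter) metric, info);
        } else if (metric instanceof Gauge) {
            gauges.put((Gauge) metric, info);
        }
    }

    /** Drops the metric from whichever map holds it. */
    synchronized void removed(Metric metric) {
        if (metric instanceof Counter) {
            counters.remove(metric);
        } else if (metric instanceof Gauge) {
            gauges.remove(metric);
        }
    }

    public static void main(String[] args) {
        RegistrySketch<String> registry = new RegistrySketch<>();
        Counter recordsIn = () -> 5L;
        registry.added(recordsIn, "numRecordsIn");
        System.out.println(registry.counters.size());
        registry.removed(recordsIn);
        System.out.println(registry.counters.isEmpty());
    }
}
```

Keeping one map per type lets a report cycle iterate each map and apply the matching field mapping without further type checks.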

## Configuration Examples

### Basic Configuration

```properties
metrics.reporters: influxdb
metrics.reporter.influxdb.factory.class: org.apache.flink.metrics.influxdb.InfluxdbReporterFactory
metrics.reporter.influxdb.host: localhost
metrics.reporter.influxdb.port: 8086
metrics.reporter.influxdb.db: flink_metrics
```

### Authenticated Configuration

```properties
metrics.reporters: influxdb
metrics.reporter.influxdb.factory.class: org.apache.flink.metrics.influxdb.InfluxdbReporterFactory
metrics.reporter.influxdb.host: influxdb.example.com
metrics.reporter.influxdb.port: 8086
metrics.reporter.influxdb.scheme: HTTPS
metrics.reporter.influxdb.db: production_metrics
metrics.reporter.influxdb.username: flink_user
metrics.reporter.influxdb.password: secure_password
metrics.reporter.influxdb.retentionPolicy: autogen
metrics.reporter.influxdb.consistency: ONE
```

### Performance Tuned Configuration

```properties
metrics.reporters: influxdb
metrics.reporter.influxdb.factory.class: org.apache.flink.metrics.influxdb.InfluxdbReporterFactory
metrics.reporter.influxdb.host: influxdb.example.com
metrics.reporter.influxdb.port: 8086
metrics.reporter.influxdb.db: flink_metrics
metrics.reporter.influxdb.connectTimeout: 5000
metrics.reporter.influxdb.writeTimeout: 15000
metrics.reporter.influxdb.consistency: QUORUM
```

## Error Handling

The reporter handles common failure scenarios as follows:

- **Connection failures**: Logged but do not interrupt Flink execution
- **Authentication errors**: Thrown during reporter initialization
- **Concurrent modifications**: Ignored and retried on next report cycle
- **Invalid configuration**: Throws `IllegalArgumentException` during setup
- **Network timeouts**: Bounded by the `connectTimeout` and `writeTimeout` options
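The invalid-configuration case can be sketched as an up-front check that fails fast with `IllegalArgumentException`. This is a minimal illustration under stated assumptions, not the reporter's actual validation: `ConfigValidationSketch` and `validate` are hypothetical names, and the rules (non-empty `host` and `db`, `port` within 1 to 65535, default port 8086) are assumptions chosen to match the examples in this document.

```java
import java.util.Properties;

final class ConfigValidationSketch {
    /** Throws IllegalArgumentException if a required option is missing or malformed. */
    static void validate(Properties config) {
        String host = config.getProperty("host", "");
        String db = config.getProperty("db", "");
        String rawPort = config.getProperty("port", "8086"); // assumed default

        int port;
        try {
            port = Integer.parseInt(rawPort);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Port is not a number: " + rawPort, e);
        }
        if (host.isEmpty() || port < 1 || port > 65535) {
            throw new IllegalArgumentException(
                    "Invalid host/port configuration. Host: '" + host + "' Port: " + port);
        }
        if (db.isEmpty()) {
            throw new IllegalArgumentException("The 'db' option is not set");
        }
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty("host", "localhost");
        config.setProperty("db", "flink_metrics");
        validate(config); // passes silently

        try {
            validate(new Properties()); // no host, no db
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

Failing during setup surfaces misconfiguration at startup instead of as silently missing metrics later.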

## Metric Types Support

The reporter supports all standard Flink metric types with appropriate InfluxDB field mappings:

- **Gauges**: Stored as single "value" field (numeric or string)
- **Counters**: Stored as "count" field
- **Histograms**: Stored with count, min, max, mean, stddev, and percentile fields (p50, p75, p95, p98, p99, p999)
- **Meters**: Stored with "count" and "rate" fields
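The mappings listed above can be illustrated with plain field maps. This sketch uses only the JDK and does not call `MetricMapper` or the InfluxDB client; `FieldMappingSketch` and its methods are hypothetical names. The field names follow the list above, and the rule that non-numeric gauge values are stored via their string form is an assumption.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class FieldMappingSketch {
    /** Gauge: a single "value" field; numbers stay numeric, anything else becomes a string. */
    static Map<String, Object> gaugeFields(Object value) {
        Map<String, Object> fields = new LinkedHashMap<>();
        fields.put("value", value instanceof Number ? value : String.valueOf(value));
        return fields;
    }

    /** Counter: a single "count" field. */
    static Map<String, Object> counterFields(long count) {
        Map<String, Object> fields = new LinkedHashMap<>();
        fields.put("count", count);
        return fields;
    }

    /** Meter: "count" and "rate" fields. */
    static Map<String, Object> meterFields(long count, double rate) {
        Map<String, Object> fields = new LinkedHashMap<>();
        fields.put("count", count);
        fields.put("rate", rate);
        return fields;
    }

    public static void main(String[] args) {
        System.out.println(gaugeFields(42));
        System.out.println(gaugeFields("RUNNING"));
        System.out.println(counterFields(7L));
        System.out.println(meterFields(10L, 2.5));
    }
}
```

A histogram would follow the same pattern with one field per statistic (count, min, max, mean, stddev, and each percentile).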

## Tags and Naming

Flink metric group variables are automatically converted to InfluxDB tags. Metric names are built by joining the logical scope and the metric name with underscores. Characters are normalized to an InfluxDB-compatible set (alphanumeric characters, colons, and underscores only).
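The naming rule can be sketched with a small regular-expression filter. This is an illustration only: `MeasurementNameSketch` and its methods are hypothetical names, and replacing each disallowed character with an underscore is an assumption, since the text above states which characters are allowed but not what substitutes for the others.

```java
import java.util.regex.Pattern;

final class MeasurementNameSketch {
    // Anything other than letters, digits, colon, or underscore is disallowed.
    private static final Pattern NOT_ALLOWED = Pattern.compile("[^a-zA-Z0-9:_]");

    /** Replaces every disallowed character with an underscore (assumed replacement). */
    static String normalize(String input) {
        return NOT_ALLOWED.matcher(input).replaceAll("_");
    }

    /** Joins the logical scope and the metric name with an underscore. */
    static String measurementName(String logicalScope, String metricName) {
        return normalize(logicalScope) + "_" + normalize(metricName);
    }

    public static void main(String[] args) {
        System.out.println(measurementName("taskmanager.job.task", "numRecordsIn"));
    }
}
```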