InfluxDB metrics reporter for Apache Flink that exports Flink metrics to the InfluxDB time series database.

npx @tessl/cli install tessl/maven-org-apache-flink--flink-metrics-influxdb-2-11@1.13.0

Flink Metrics InfluxDB Reporter integrates Apache Flink's metrics system with InfluxDB, so that Flink applications can export runtime metrics (counters, gauges, histograms, meters) to the InfluxDB time series database. The reporter implements Flink's MetricReporter interface and supports scheduled metric collection, configurable connection parameters, batch writing, and mapping of each metric type to InfluxDB measurement points.
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-metrics-influxdb_2.11</artifactId>
    <version>1.13.6</version>
</dependency>

import org.apache.flink.metrics.influxdb.InfluxdbReporter;
import org.apache.flink.metrics.influxdb.InfluxdbReporterFactory;
import org.apache.flink.metrics.influxdb.InfluxdbReporterOptions;

The InfluxDB reporter is typically configured through Flink's configuration system rather than being instantiated directly in code. Configuration is done via Flink configuration properties:
# Enable InfluxDB reporter
metrics.reporters: influxdb
metrics.reporter.influxdb.factory.class: org.apache.flink.metrics.influxdb.InfluxdbReporterFactory
# Configure InfluxDB connection
metrics.reporter.influxdb.host: localhost
metrics.reporter.influxdb.port: 8086
metrics.reporter.influxdb.db: flink_metrics
metrics.reporter.influxdb.username: admin
metrics.reporter.influxdb.password: password

When configured, Flink will automatically instantiate the reporter and begin sending metrics to InfluxDB:
// Example of how Flink uses the reporter internally
MetricReporter reporter = new InfluxdbReporterFactory().createMetricReporter(properties);
reporter.open(metricConfig);
// Metrics are automatically reported at scheduled intervals

The InfluxDB reporter is built around several key components:
InfluxdbReporterFactory creates reporter instances via the service provider interface
InfluxdbReporter handles metric collection and batch transmission to InfluxDB
InfluxdbReporterOptions defines the configuration options
MetricMapper converts Flink metrics to InfluxDB measurement points
MeasurementInfo and MeasurementInfoProvider handle metric metadata and tagging

Factory for creating InfluxDB reporter instances. It is registered automatically via the service provider interface and used by Flink's metric system.
public class InfluxdbReporterFactory implements MetricReporterFactory {
    public MetricReporter createMetricReporter(Properties properties);
}

Core InfluxDB metrics reporter that extends Flink's reporter framework with scheduled InfluxDB transmission.
public class InfluxdbReporter extends AbstractReporter<MeasurementInfo> implements Scheduled {
    public InfluxdbReporter();
    public void open(MetricConfig config);
    public void close();
    public void report();
}

Configuration options for InfluxDB connection, authentication, and performance tuning.
public class InfluxdbReporterOptions {
    // Connection Configuration
    public static final ConfigOption<String> HOST;
    public static final ConfigOption<Scheme> SCHEME;
    public static final ConfigOption<Integer> PORT;

    // Authentication Configuration
    public static final ConfigOption<String> USERNAME;
    public static final ConfigOption<String> PASSWORD;

    // Database Configuration
    public static final ConfigOption<String> DB;
    public static final ConfigOption<String> RETENTION_POLICY;
    public static final ConfigOption<InfluxDB.ConsistencyLevel> CONSISTENCY;

    // Performance Configuration
    public static final ConfigOption<Integer> CONNECT_TIMEOUT;
    public static final ConfigOption<Integer> WRITE_TIMEOUT;
}

public enum Scheme {
    HTTP("http"),
    HTTPS("https");

    public String toString();
}

Package-private utility functions for converting Flink metrics to InfluxDB measurement points with appropriate field mappings.
class MetricMapper {
    static Point map(MeasurementInfo info, Instant timestamp, Gauge<?> gauge);
    static Point map(MeasurementInfo info, Instant timestamp, Counter counter);
    static Point map(MeasurementInfo info, Instant timestamp, Histogram histogram);
    static Point map(MeasurementInfo info, Instant timestamp, Meter meter);
}

Package-private data structures for representing InfluxDB measurement metadata including names and tags derived from Flink metric groups.
final class MeasurementInfo {
    MeasurementInfo(String name, Map<String, String> tags);
    String getName();
    Map<String, String> getTags();
}

class MeasurementInfoProvider implements MetricInfoProvider<MeasurementInfo> {
    public MeasurementInfo getMetricInfo(String metricName, MetricGroup group);
}

interface MetricInfoProvider<MetricInfo> {
    MetricInfo getMetricInfo(String metricName, MetricGroup group);
}

Package-private base class providing metric registry functionality for different metric types with generic information handling.
abstract class AbstractReporter<MetricInfo> implements MetricReporter {
    protected AbstractReporter(MetricInfoProvider<MetricInfo> metricInfoProvider);
    public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group);
    public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group);
}

metrics.reporters: influxdb
metrics.reporter.influxdb.factory.class: org.apache.flink.metrics.influxdb.InfluxdbReporterFactory
metrics.reporter.influxdb.host: localhost
metrics.reporter.influxdb.port: 8086
metrics.reporter.influxdb.db: flink_metrics

metrics.reporters: influxdb
metrics.reporter.influxdb.factory.class: org.apache.flink.metrics.influxdb.InfluxdbReporterFactory
metrics.reporter.influxdb.host: influxdb.example.com
metrics.reporter.influxdb.port: 8086
metrics.reporter.influxdb.scheme: HTTPS
metrics.reporter.influxdb.db: production_metrics
metrics.reporter.influxdb.username: flink_user
metrics.reporter.influxdb.password: secure_password
metrics.reporter.influxdb.retentionPolicy: autogen
metrics.reporter.influxdb.consistency: ONE

metrics.reporters: influxdb
metrics.reporter.influxdb.factory.class: org.apache.flink.metrics.influxdb.InfluxdbReporterFactory
metrics.reporter.influxdb.host: influxdb.example.com
metrics.reporter.influxdb.port: 8086
metrics.reporter.influxdb.db: flink_metrics
metrics.reporter.influxdb.connectTimeout: 5000
metrics.reporter.influxdb.writeTimeout: 15000
metrics.reporter.influxdb.consistency: QUORUM

The reporter includes graceful error handling for common scenarios.
The reporter supports all standard Flink metric types (counters, gauges, histograms, meters), mapping each to appropriate InfluxDB fields.
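To make the idea of per-type field mappings concrete, here is a minimal, dependency-free sketch. It is not the package-private MetricMapper itself: the class name FieldMappingSketch and its methods are hypothetical, a plain Map stands in for an InfluxDB Point, and the field names ("value", "count", "rate") are assumptions about how gauges, counters, and meters are typically laid out. Histograms are omitted for brevity.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for a metric-to-field mapper; NOT the Flink MetricMapper class. */
final class FieldMappingSketch {

    /** Gauge: one "value" field. Numbers stay numeric, anything else is stored as a string. */
    static Map<String, Object> gaugeFields(Object gaugeValue) {
        Map<String, Object> fields = new LinkedHashMap<>();
        if (gaugeValue instanceof Number) {
            fields.put("value", gaugeValue);
        } else {
            fields.put("value", String.valueOf(gaugeValue));
        }
        return fields;
    }

    /** Counter: a single "count" field (assumed field name). */
    static Map<String, Object> counterFields(long count) {
        Map<String, Object> fields = new LinkedHashMap<>();
        fields.put("count", count);
        return fields;
    }

    /** Meter: total "count" plus current "rate" (assumed field names). */
    static Map<String, Object> meterFields(long count, double rate) {
        Map<String, Object> fields = new LinkedHashMap<>();
        fields.put("count", count);
        fields.put("rate", rate);
        return fields;
    }

    public static void main(String[] args) {
        System.out.println(gaugeFields(42));
        System.out.println(gaugeFields("RUNNING"));
        System.out.println(counterFields(7L));
        System.out.println(meterFields(100L, 2.5));
    }
}
```

Keeping numeric gauge values numeric matters in practice, because InfluxDB fixes a field's type on first write; a gauge that switches between numbers and strings would likely cause write conflicts.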
Flink metric group variables are automatically converted to InfluxDB tags, with metric names constructed using the logical scope and metric name separated by underscores. Characters are normalized to InfluxDB-compatible format (alphanumeric, colon, underscore only).
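The naming rule described above can be sketched in a few lines of plain Java. This is an illustration under stated assumptions, not the MeasurementInfoProvider implementation: the class MeasurementNameSketch is hypothetical, disallowed characters are assumed to be replaced with an underscore, and the sketch normalizes both the scope and the metric name for simplicity.

```java
import java.util.regex.Pattern;

/** Hypothetical illustration of measurement naming; NOT the Flink MeasurementInfoProvider. */
final class MeasurementNameSketch {

    // Anything that is not alphanumeric, a colon, or an underscore is treated as invalid.
    private static final Pattern NOT_ALLOWED = Pattern.compile("[^a-zA-Z0-9:_]");

    // Separator placed between the logical scope and the metric name.
    private static final char SCOPE_SEPARATOR = '_';

    /** Replaces every disallowed character with an underscore (assumed replacement). */
    static String normalize(String input) {
        return NOT_ALLOWED.matcher(input).replaceAll("_");
    }

    /** Joins the normalized logical scope and metric name with an underscore. */
    static String measurementName(String logicalScope, String metricName) {
        return normalize(logicalScope) + SCOPE_SEPARATOR + normalize(metricName);
    }

    public static void main(String[] args) {
        System.out.println(measurementName("taskmanager.job.task", "numRecordsIn"));
    }
}
```

With these assumptions, a logical scope of taskmanager.job.task and a metric named numRecordsIn yield the measurement name taskmanager_job_task_numRecordsIn, while values such as the host or job name would travel separately as tags.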