or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

index.md
tile.json

tessl/maven-org-apache-flink--flink-metrics-influxdb-2-11

InfluxDB metrics reporter for Apache Flink that enables exporting Flink metrics to InfluxDB time series database

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
mavenpkg:maven/org.apache.flink/flink-metrics-influxdb_2.11@1.13.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-metrics-influxdb-2-11@1.13.0

index.mddocs/

Flink Metrics InfluxDB Reporter

Flink Metrics InfluxDB Reporter provides an InfluxDB integration for Apache Flink's metrics system, enabling Flink applications to export runtime metrics (counters, gauges, histograms, meters) to InfluxDB time series database. The reporter implements Flink's MetricReporter interface and supports scheduled metric collection, configurable connection parameters, batch writing, and comprehensive metric mapping capabilities.

Package Information

  • Package Name: flink-metrics-influxdb_2.11
  • Package Type: Maven
  • Language: Java
  • Group ID: org.apache.flink
  • Artifact ID: flink-metrics-influxdb_2.11
  • Installation: Add Maven dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-metrics-influxdb_2.11</artifactId>
    <version>1.13.6</version>
</dependency>

Core Imports

import org.apache.flink.metrics.influxdb.InfluxdbReporter;
import org.apache.flink.metrics.influxdb.InfluxdbReporterFactory;
import org.apache.flink.metrics.influxdb.InfluxdbReporterOptions;

Basic Usage

The InfluxDB reporter is typically configured through Flink's configuration system rather than being instantiated directly in code. Configuration is done via Flink configuration properties:

# Enable InfluxDB reporter
metrics.reporters: influxdb
metrics.reporter.influxdb.factory.class: org.apache.flink.metrics.influxdb.InfluxdbReporterFactory

# Configure InfluxDB connection
metrics.reporter.influxdb.host: localhost
metrics.reporter.influxdb.port: 8086
metrics.reporter.influxdb.db: flink_metrics
metrics.reporter.influxdb.username: admin
metrics.reporter.influxdb.password: password

When configured, Flink will automatically instantiate the reporter and begin sending metrics to InfluxDB:

// Example of how Flink uses the reporter internally
MetricReporter reporter = new InfluxdbReporterFactory().createMetricReporter(properties);
reporter.open(metricConfig);
// Metrics are automatically reported at scheduled intervals

Architecture

The InfluxDB reporter is built around several key components:

  • Reporter Factory: InfluxdbReporterFactory creates reporter instances via service provider interface
  • Main Reporter: InfluxdbReporter handles metric collection and batch transmission to InfluxDB
  • Configuration System: InfluxdbReporterOptions provides comprehensive configuration options
  • Metric Mapping: MetricMapper converts Flink metrics to InfluxDB measurement points
  • Measurement System: MeasurementInfo and MeasurementInfoProvider handle metric metadata and tagging

Capabilities

Reporter Factory

Factory for creating InfluxDB reporter instances. Automatically registered via service provider interface and used by Flink's metric system.

public class InfluxdbReporterFactory implements MetricReporterFactory {
    public MetricReporter createMetricReporter(Properties properties);
}

Main Reporter

Core InfluxDB metrics reporter that extends Flink's reporter framework with scheduled InfluxDB transmission.

public class InfluxdbReporter extends AbstractReporter<MeasurementInfo> implements Scheduled {
    public InfluxdbReporter();
    public void open(MetricConfig config);
    public void close();
    public void report();
}

Configuration Options

Comprehensive configuration options for InfluxDB connection, authentication, and performance tuning.

public class InfluxdbReporterOptions {
    // Connection Configuration
    public static final ConfigOption<String> HOST;
    public static final ConfigOption<Scheme> SCHEME;
    public static final ConfigOption<Integer> PORT;
    
    // Authentication Configuration  
    public static final ConfigOption<String> USERNAME;
    public static final ConfigOption<String> PASSWORD;
    
    // Database Configuration
    public static final ConfigOption<String> DB;
    public static final ConfigOption<String> RETENTION_POLICY;
    public static final ConfigOption<InfluxDB.ConsistencyLevel> CONSISTENCY;
    
    // Performance Configuration
    public static final ConfigOption<Integer> CONNECT_TIMEOUT;
    public static final ConfigOption<Integer> WRITE_TIMEOUT;
}

public enum Scheme {
    HTTP("http"),
    HTTPS("https");
    
    public String toString();
}

Metric Mapping

Package-private utility functions for converting Flink metrics to InfluxDB measurement points with appropriate field mappings.

class MetricMapper {
    static Point map(MeasurementInfo info, Instant timestamp, Gauge<?> gauge);
    static Point map(MeasurementInfo info, Instant timestamp, Counter counter);
    static Point map(MeasurementInfo info, Instant timestamp, Histogram histogram);
    static Point map(MeasurementInfo info, Instant timestamp, Meter meter);
}

Measurement Information

Package-private data structures for representing InfluxDB measurement metadata including names and tags derived from Flink metric groups.

final class MeasurementInfo {
    MeasurementInfo(String name, Map<String, String> tags);
    String getName();
    Map<String, String> getTags();
}

class MeasurementInfoProvider implements MetricInfoProvider<MeasurementInfo> {
    public MeasurementInfo getMetricInfo(String metricName, MetricGroup group);
}

interface MetricInfoProvider<MetricInfo> {
    MetricInfo getMetricInfo(String metricName, MetricGroup group);
}

Abstract Reporter Base

Package-private base class providing metric registry functionality for different metric types with generic information handling.

abstract class AbstractReporter<MetricInfo> implements MetricReporter {
    protected AbstractReporter(MetricInfoProvider<MetricInfo> metricInfoProvider);
    public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group);
    public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group);
}

Configuration Examples

Basic Configuration

metrics.reporters: influxdb
metrics.reporter.influxdb.factory.class: org.apache.flink.metrics.influxdb.InfluxdbReporterFactory
metrics.reporter.influxdb.host: localhost
metrics.reporter.influxdb.port: 8086
metrics.reporter.influxdb.db: flink_metrics

Authenticated Configuration

metrics.reporters: influxdb
metrics.reporter.influxdb.factory.class: org.apache.flink.metrics.influxdb.InfluxdbReporterFactory
metrics.reporter.influxdb.host: influxdb.example.com
metrics.reporter.influxdb.port: 8086
metrics.reporter.influxdb.scheme: HTTPS
metrics.reporter.influxdb.db: production_metrics
metrics.reporter.influxdb.username: flink_user
metrics.reporter.influxdb.password: secure_password
metrics.reporter.influxdb.retentionPolicy: autogen
metrics.reporter.influxdb.consistency: ONE

Performance Tuned Configuration

metrics.reporters: influxdb
metrics.reporter.influxdb.factory.class: org.apache.flink.metrics.influxdb.InfluxdbReporterFactory
metrics.reporter.influxdb.host: influxdb.example.com
metrics.reporter.influxdb.port: 8086
metrics.reporter.influxdb.db: flink_metrics
metrics.reporter.influxdb.connectTimeout: 5000
metrics.reporter.influxdb.writeTimeout: 15000
metrics.reporter.influxdb.consistency: QUORUM

Error Handling

The reporter includes graceful error handling for common scenarios:

  • Connection failures: Logged but do not interrupt Flink execution
  • Authentication errors: Thrown during reporter initialization
  • Concurrent modifications: Ignored and retried on next report cycle
  • Invalid configuration: Throws IllegalArgumentException during setup
  • Network timeouts: Configurable via timeout options

Metric Types Support

The reporter supports all standard Flink metric types with appropriate InfluxDB field mappings:

  • Gauges: Stored as single "value" field (numeric or string)
  • Counters: Stored as "count" field
  • Histograms: Stored with count, min, max, mean, stddev, and percentile fields (p50, p75, p95, p98, p99, p999)
  • Meters: Stored with "count" and "rate" fields

Tags and Naming

Flink metric group variables are automatically converted to InfluxDB tags, with metric names constructed using the logical scope and metric name separated by underscores. Characters are normalized to InfluxDB-compatible format (alphanumeric, colon, underscore only).