or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.md

index.mddocs/

0

# Flink Metrics InfluxDB Reporter

1

2

Flink Metrics InfluxDB Reporter provides an InfluxDB integration for Apache Flink's metrics system, enabling Flink applications to export runtime metrics (counters, gauges, histograms, meters) to InfluxDB time series database. The reporter implements Flink's MetricReporter interface and supports scheduled metric collection, configurable connection parameters, batch writing, and comprehensive metric mapping capabilities.

3

4

## Package Information

5

6

- **Package Name**: flink-metrics-influxdb_2.11

7

- **Package Type**: Maven

8

- **Language**: Java

9

- **Group ID**: org.apache.flink

10

- **Artifact ID**: flink-metrics-influxdb_2.11

11

- **Installation**: Add Maven dependency:

12

13

```xml

14

<dependency>

15

<groupId>org.apache.flink</groupId>

16

<artifactId>flink-metrics-influxdb_2.11</artifactId>

17

<version>1.13.6</version>

18

</dependency>

19

```

20

21

## Core Imports

22

23

```java

24

import org.apache.flink.metrics.influxdb.InfluxdbReporter;

25

import org.apache.flink.metrics.influxdb.InfluxdbReporterFactory;

26

import org.apache.flink.metrics.influxdb.InfluxdbReporterOptions;

27

```

28

29

## Basic Usage

30

31

The InfluxDB reporter is typically configured through Flink's configuration system rather than being instantiated directly in code. Configuration is done via Flink configuration properties:

32

33

```properties

34

# Enable InfluxDB reporter

35

metrics.reporters: influxdb

36

metrics.reporter.influxdb.factory.class: org.apache.flink.metrics.influxdb.InfluxdbReporterFactory

37

38

# Configure InfluxDB connection

39

metrics.reporter.influxdb.host: localhost

40

metrics.reporter.influxdb.port: 8086

41

metrics.reporter.influxdb.db: flink_metrics

42

metrics.reporter.influxdb.username: admin

43

metrics.reporter.influxdb.password: password

44

```

45

46

When configured, Flink will automatically instantiate the reporter and begin sending metrics to InfluxDB:

47

48

```java

49

// Example of how Flink uses the reporter internally

50

MetricReporter reporter = new InfluxdbReporterFactory().createMetricReporter(properties);

51

reporter.open(metricConfig);

52

// Metrics are automatically reported at scheduled intervals

53

```

54

55

## Architecture

56

57

The InfluxDB reporter is built around several key components:

58

59

- **Reporter Factory**: `InfluxdbReporterFactory` creates reporter instances via service provider interface

60

- **Main Reporter**: `InfluxdbReporter` handles metric collection and batch transmission to InfluxDB

61

- **Configuration System**: `InfluxdbReporterOptions` provides comprehensive configuration options

62

- **Metric Mapping**: `MetricMapper` converts Flink metrics to InfluxDB measurement points

63

- **Measurement System**: `MeasurementInfo` and `MeasurementInfoProvider` handle metric metadata and tagging

64

65

## Capabilities

66

67

### Reporter Factory

68

69

Factory for creating InfluxDB reporter instances. Automatically registered via service provider interface and used by Flink's metric system.

70

71

```java { .api }

72

public class InfluxdbReporterFactory implements MetricReporterFactory {

73

public MetricReporter createMetricReporter(Properties properties);

74

}

75

```

76

77

### Main Reporter

78

79

Core InfluxDB metrics reporter that extends Flink's reporter framework with scheduled InfluxDB transmission.

80

81

```java { .api }

82

public class InfluxdbReporter extends AbstractReporter<MeasurementInfo> implements Scheduled {

83

public InfluxdbReporter();

84

public void open(MetricConfig config);

85

public void close();

86

public void report();

87

}

88

```

89

90

### Configuration Options

91

92

Comprehensive configuration options for InfluxDB connection, authentication, and performance tuning.

93

94

```java { .api }

95

public class InfluxdbReporterOptions {

96

// Connection Configuration

97

public static final ConfigOption<String> HOST;

98

public static final ConfigOption<Scheme> SCHEME;

99

public static final ConfigOption<Integer> PORT;

100

101

// Authentication Configuration

102

public static final ConfigOption<String> USERNAME;

103

public static final ConfigOption<String> PASSWORD;

104

105

// Database Configuration

106

public static final ConfigOption<String> DB;

107

public static final ConfigOption<String> RETENTION_POLICY;

108

public static final ConfigOption<InfluxDB.ConsistencyLevel> CONSISTENCY;

109

110

// Performance Configuration

111

public static final ConfigOption<Integer> CONNECT_TIMEOUT;

112

public static final ConfigOption<Integer> WRITE_TIMEOUT;

113

}

114

115

public enum Scheme {

116

HTTP("http"),

117

HTTPS("https");

118

119

public String toString();

120

}

121

```

122

123

### Metric Mapping

124

125

Package-private utility functions for converting Flink metrics to InfluxDB measurement points with appropriate field mappings.

126

127

```java { .api }

128

class MetricMapper {

129

static Point map(MeasurementInfo info, Instant timestamp, Gauge<?> gauge);

130

static Point map(MeasurementInfo info, Instant timestamp, Counter counter);

131

static Point map(MeasurementInfo info, Instant timestamp, Histogram histogram);

132

static Point map(MeasurementInfo info, Instant timestamp, Meter meter);

133

}

134

```

135

136

### Measurement Information

137

138

Package-private data structures for representing InfluxDB measurement metadata including names and tags derived from Flink metric groups.

139

140

```java { .api }

141

final class MeasurementInfo {

142

MeasurementInfo(String name, Map<String, String> tags);

143

String getName();

144

Map<String, String> getTags();

145

}

146

147

class MeasurementInfoProvider implements MetricInfoProvider<MeasurementInfo> {

148

public MeasurementInfo getMetricInfo(String metricName, MetricGroup group);

149

}

150

151

interface MetricInfoProvider<MetricInfo> {

152

MetricInfo getMetricInfo(String metricName, MetricGroup group);

153

}

154

```

155

156

### Abstract Reporter Base

157

158

Package-private base class providing metric registry functionality for different metric types with generic information handling.

159

160

```java { .api }

161

abstract class AbstractReporter<MetricInfo> implements MetricReporter {

162

protected AbstractReporter(MetricInfoProvider<MetricInfo> metricInfoProvider);

163

public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group);

164

public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group);

165

}

166

```

167

168

## Configuration Examples

169

170

### Basic Configuration

171

172

```properties

173

metrics.reporters: influxdb

174

metrics.reporter.influxdb.factory.class: org.apache.flink.metrics.influxdb.InfluxdbReporterFactory

175

metrics.reporter.influxdb.host: localhost

176

metrics.reporter.influxdb.port: 8086

177

metrics.reporter.influxdb.db: flink_metrics

178

```

179

180

### Authenticated Configuration

181

182

```properties

183

metrics.reporters: influxdb

184

metrics.reporter.influxdb.factory.class: org.apache.flink.metrics.influxdb.InfluxdbReporterFactory

185

metrics.reporter.influxdb.host: influxdb.example.com

186

metrics.reporter.influxdb.port: 8086

187

metrics.reporter.influxdb.scheme: HTTPS

188

metrics.reporter.influxdb.db: production_metrics

189

metrics.reporter.influxdb.username: flink_user

190

metrics.reporter.influxdb.password: secure_password

191

metrics.reporter.influxdb.retentionPolicy: autogen

192

metrics.reporter.influxdb.consistency: ONE

193

```

194

195

### Performance Tuned Configuration

196

197

```properties

198

metrics.reporters: influxdb

199

metrics.reporter.influxdb.factory.class: org.apache.flink.metrics.influxdb.InfluxdbReporterFactory

200

metrics.reporter.influxdb.host: influxdb.example.com

201

metrics.reporter.influxdb.port: 8086

202

metrics.reporter.influxdb.db: flink_metrics

203

metrics.reporter.influxdb.connectTimeout: 5000

204

metrics.reporter.influxdb.writeTimeout: 15000

205

metrics.reporter.influxdb.consistency: QUORUM

206

```

207

208

## Error Handling

209

210

The reporter includes graceful error handling for common scenarios:

211

212

- **Connection failures**: Logged but do not interrupt Flink execution

213

- **Authentication errors**: Thrown during reporter initialization

214

- **Concurrent modifications**: Ignored and retried on next report cycle

215

- **Invalid configuration**: Throws IllegalArgumentException during setup

216

- **Network timeouts**: Configurable via timeout options

217

218

## Metric Types Support

219

220

The reporter supports all standard Flink metric types with appropriate InfluxDB field mappings:

221

222

- **Gauges**: Stored as single "value" field (numeric or string)

223

- **Counters**: Stored as "count" field

224

- **Histograms**: Stored with count, min, max, mean, stddev, and percentile fields (p50, p75, p95, p98, p99, p999)

225

- **Meters**: Stored with "count" and "rate" fields

226

227

## Tags and Naming

228

229

Flink metric group variables are automatically converted to InfluxDB tags, with metric names constructed using the logical scope and metric name separated by underscores. Characters are normalized to InfluxDB-compatible format (alphanumeric, colon, underscore only).