# Flink Metrics Core

Core metrics interfaces and implementations for the Apache Flink stream processing framework. This library provides the foundational metrics system for monitoring and observability in Flink applications: the essential metric types, hierarchical metric organization, and pluggable reporting to external monitoring systems.

## Package Information

- **Package Name**: flink-metrics-core
- **Package Type**: maven
- **Language**: Java
- **Group ID**: org.apache.flink
- **Artifact ID**: flink-metrics-core
- **Installation**: Include in your Maven project:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-metrics-core</artifactId>
    <version>1.20.2</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.metrics.*;
import org.apache.flink.metrics.groups.*;
import org.apache.flink.metrics.reporter.*;
```

For specific components:

```java
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.MetricGroup;
```

## Basic Usage

```java
import org.apache.flink.metrics.*;

// This snippet assumes `metricGroup` and `rootGroup` (MetricGroup instances)
// and `queue` (any collection) are already in scope.

// Working with counters
Counter eventCounter = metricGroup.counter("events");
eventCounter.inc();
eventCounter.inc(5);

// Working with gauges: the value is read each time getValue() is called
Gauge<Integer> queueSizeGauge = new Gauge<Integer>() {
    @Override
    public Integer getValue() {
        return queue.size();
    }
};
metricGroup.gauge("queueSize", queueSizeGauge);

// Working with meters
Meter throughputMeter = new MeterView(60); // 60 second time window
metricGroup.meter("throughput", throughputMeter);
throughputMeter.markEvent();

// Creating metric hierarchies
MetricGroup operatorGroup = rootGroup.addGroup("operators");
MetricGroup specificOpGroup = operatorGroup.addGroup("map-operator-1");
Counter opCounter = specificOpGroup.counter("records-processed");
```

## Architecture

Flink Metrics Core follows a hierarchical design built around several key components:

- **Core Metric Types**: `Counter`, `Gauge`, `Meter`, and `Histogram` provide the fundamental metric abstractions
- **Metric Groups**: `MetricGroup` creates hierarchical namespaces for organizing metrics logically
- **Reporter Framework**: `MetricReporter` and `MetricReporterFactory` enable pluggable export to external systems
- **Specialized Groups**: Component-specific metric groups for operators, sources, sinks, and coordinators
- **Configuration System**: `MetricConfig` provides type-safe configuration management
- **Tracing Support**: Experimental `Span` and `TraceReporter` for distributed tracing capabilities

This design enables flexible metric collection across Flink's distributed runtime while maintaining performance and providing extensibility for custom metric types and reporting backends.

## Capabilities

### Core Metric Types

Essential metric interfaces for measuring different aspects of system behavior. Includes counters for event counting, gauges for instantaneous values, meters for rate measurement, and histograms for distribution statistics.

```java { .api }
interface Counter extends Metric {
    void inc();
    void inc(long n);
    void dec();
    void dec(long n);
    long getCount();
}

interface Gauge<T> extends Metric {
    T getValue();
}

interface Meter extends Metric {
    void markEvent();
    void markEvent(long n);
    double getRate();
    long getCount();
}

interface Histogram extends Metric {
    void update(long value);
    long getCount();
    HistogramStatistics getStatistics();
}
```

[Core Metrics](./core-metrics.md)
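
To make the difference between a counter and a gauge concrete, here is a minimal sketch. It deliberately uses stand-in interfaces (`SketchCounter`, `SketchGauge`) that copy the signatures listed above so it compiles without the library; `PlainCounter` and `CoreMetricsSketch` are hypothetical names, not classes from flink-metrics-core.

```java
// Hypothetical demo class; it comes first so the file can be launched directly.
final class CoreMetricsSketch {
    public static void main(String[] args) {
        PlainCounter events = new PlainCounter();
        // A gauge holds no state of its own: it reads the value on demand.
        SketchGauge<Long> eventsGauge = events::getCount;
        events.inc();
        events.inc(5);
        events.dec();
        System.out.println(events.getCount() + " " + eventsGauge.getValue());
    }
}

// Stand-ins copying the Counter and Gauge signatures listed above (assumption:
// used here only so the sketch compiles without flink-metrics-core).
interface SketchCounter {
    void inc();
    void inc(long n);
    void dec();
    void dec(long n);
    long getCount();
}

interface SketchGauge<T> {
    T getValue();
}

// Hypothetical single-threaded counter, for illustration only.
final class PlainCounter implements SketchCounter {
    private long count;

    @Override public void inc() { count++; }
    @Override public void inc(long n) { count += n; }
    @Override public void dec() { count--; }
    @Override public void dec(long n) { count -= n; }
    @Override public long getCount() { return count; }
}
```

The counter accumulates state through explicit calls, while the gauge is evaluated lazily whenever something asks for its value.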

### Metric Organization

Hierarchical metric organization system for creating logical namespaces and managing metric lifecycles. Supports both flat and nested structures with variable interpolation and scoping.

```java { .api }
interface MetricGroup {
    Counter counter(int name);
    Counter counter(String name);
    <C extends Counter> C counter(int name, C counter);
    <C extends Counter> C counter(String name, C counter);
    <T, G extends Gauge<T>> G gauge(int name, G gauge);
    <T, G extends Gauge<T>> G gauge(String name, G gauge);
    <H extends Histogram> H histogram(int name, H histogram);
    <H extends Histogram> H histogram(String name, H histogram);
    <M extends Meter> M meter(int name, M meter);
    <M extends Meter> M meter(String name, M meter);
    MetricGroup addGroup(int name);
    MetricGroup addGroup(String name);
    MetricGroup addGroup(String key, String value);
    String[] getScopeComponents();
    Map<String, String> getAllVariables();
    String getMetricIdentifier(String metricName);
    String getMetricIdentifier(String metricName, CharacterFilter filter);
    void addSpan(SpanBuilder spanBuilder); // @Experimental
}
```

[Metric Groups](./metric-groups.md)
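
The way nested groups turn into a fully scoped metric name can be sketched with a small stand-in class. `ScopeSketchGroup` is a hypothetical name, and joining scope components with `.` is an assumption made for this illustration; the real delimiter and scope format depend on how Flink is configured.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in that mimics only the scoping part of MetricGroup.
final class ScopeSketchGroup {
    private final List<String> scope;

    ScopeSketchGroup(String rootName) {
        this(List.of(rootName));
    }

    private ScopeSketchGroup(List<String> scope) {
        this.scope = scope;
    }

    // Each child group extends the parent's scope by one component.
    ScopeSketchGroup addGroup(String name) {
        List<String> child = new ArrayList<>(scope);
        child.add(name);
        return new ScopeSketchGroup(child);
    }

    String[] getScopeComponents() {
        return scope.toArray(new String[0]);
    }

    // Assumption: components are joined with '.', then the metric name is appended.
    String getMetricIdentifier(String metricName) {
        return String.join(".", scope) + "." + metricName;
    }

    public static void main(String[] args) {
        ScopeSketchGroup op = new ScopeSketchGroup("job")
                .addGroup("operators")
                .addGroup("map-operator-1");
        System.out.println(op.getMetricIdentifier("records-processed"));
    }
}
```

Because `addGroup` returns a new group rather than mutating the parent, sibling groups never see each other's scope components.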

### Reporter Framework

Pluggable system for exporting metrics to external monitoring systems. Supports both push and pull patterns with configurable scheduling and lifecycle management.

```java { .api }
interface MetricReporter {
    void open(MetricConfig config);
    void close();
    void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group);
    void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group);
}

interface MetricReporterFactory {
    MetricReporter createMetricReporter(Properties properties);
}

interface Scheduled {
    void report();
}
```

[Reporters](./reporters.md)
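
The reporter lifecycle (metrics are announced, tracked, then reported on a schedule) can be illustrated with a simplified, library-free sketch. `InMemoryReporterSketch` is hypothetical, and it simplifies the real signatures: metrics are modelled as `Supplier<?>`, the group is reduced to a scope string, and `report()` returns text instead of `void` so the result is easy to inspect.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Supplier;

// Hypothetical reporter sketch; it does not implement the real MetricReporter.
final class InMemoryReporterSketch {
    // Registered metrics keyed by their fully scoped name.
    private final Map<String, Supplier<?>> metrics = new TreeMap<>();

    // Counterpart of notifyOfAddedMetric: start tracking the metric.
    synchronized void notifyOfAddedMetric(Supplier<?> metric, String metricName, String scope) {
        metrics.put(scope + "." + metricName, metric);
    }

    // Counterpart of notifyOfRemovedMetric: stop tracking the metric.
    synchronized void notifyOfRemovedMetric(Supplier<?> metric, String metricName, String scope) {
        metrics.remove(scope + "." + metricName);
    }

    // Counterpart of Scheduled.report(): read every tracked metric once.
    synchronized String report() {
        StringBuilder out = new StringBuilder();
        for (Map.Entry<String, Supplier<?>> entry : metrics.entrySet()) {
            out.append(entry.getKey()).append('=').append(entry.getValue().get()).append('\n');
        }
        return out.toString();
    }

    public static void main(String[] args) {
        InMemoryReporterSketch reporter = new InMemoryReporterSketch();
        reporter.notifyOfAddedMetric(() -> 3L, "events", "job.op");
        System.out.print(reporter.report());
    }
}
```

A push-style reporter would send the collected values to an external system inside `report()`; a pull-style reporter would instead expose the tracked map to whatever scrapes it.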

### Metric Implementations

Concrete implementations of metric interfaces providing both thread-safe and performance-optimized variants. Includes simple counters, meter views with time windowing, and utility implementations.

```java { .api }
class SimpleCounter implements Counter { /* non-thread-safe */ }
class ThreadSafeSimpleCounter implements Counter { /* thread-safe */ }
class MeterView implements Meter, View { /* time-windowed rate calculation */ }
```

[Implementations](./implementations.md)
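
Why a separate thread-safe counter matters can be shown with a short sketch. `ConcurrentCounterSketch` is a hypothetical class built on `java.util.concurrent.atomic.LongAdder`; this is one common way to build such a counter and is not presented here as the library's implementation.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical thread-safe counter with the same method shape as Counter.
final class ConcurrentCounterSketch {
    private final LongAdder adder = new LongAdder();

    void inc() { adder.increment(); }
    void inc(long n) { adder.add(n); }
    void dec() { adder.decrement(); }
    void dec(long n) { adder.add(-n); }
    long getCount() { return adder.sum(); }

    // Increments one shared counter from several threads and returns the total.
    static long countFromThreads(int threads, int incrementsPerThread) {
        ConcurrentCounterSketch counter = new ConcurrentCounterSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    counter.inc();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join(); // wait so that the final sum is complete
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.getCount();
    }

    public static void main(String[] args) {
        System.out.println(countFromThreads(4, 10_000));
    }
}
```

A plain `long` field updated with `count++` could lose increments under the same workload, which is the trade-off between the non-thread-safe and thread-safe variants listed above.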

### Specialized Metric Groups

Component-specific metric groups tailored for different parts of the Flink runtime. Provides specialized interfaces for operators, sources, sinks, coordinators, and other Flink components.

```java { .api }
interface OperatorMetricGroup extends MetricGroup {
    OperatorIOMetricGroup getIOMetricGroup();
}

interface SourceReaderMetricGroup extends OperatorMetricGroup {
    Counter getNumRecordsInErrorsCounter();
    void setPendingBytesGauge(Gauge<Long> pendingBytesGauge);
    void setPendingRecordsGauge(Gauge<Long> pendingRecordsGauge);
}

interface SinkWriterMetricGroup extends OperatorMetricGroup {
    Counter getNumRecordsOutErrorsCounter();
    Counter getNumRecordsSendErrorsCounter();
    Counter getNumRecordsSendCounter();
    Counter getNumBytesSendCounter();
    void setCurrentSendTimeGauge(Gauge<Long> currentSendTimeGauge);
}

interface SinkCommitterMetricGroup extends OperatorMetricGroup {
    Counter getNumCommittablesTotalCounter();
    Counter getNumCommittablesFailureCounter();
    Counter getNumCommittablesRetryCounter();
    Counter getNumCommittablesSuccessCounter();
    Counter getNumCommittablesAlreadyCommittedCounter();
    void setCurrentPendingCommittablesGauge(Gauge<Integer> currentPendingCommittablesGauge);
}

interface CacheMetricGroup extends MetricGroup {
    void hitCounter(Counter hitCounter);
    void missCounter(Counter missCounter);
    void loadCounter(Counter loadCounter);
    void numLoadFailuresCounter(Counter numLoadFailuresCounter);
    void latestLoadTimeGauge(Gauge<Long> latestLoadTimeGauge);
    void numCachedRecordsGauge(Gauge<Long> numCachedRecordsGauge);
    void numCachedBytesGauge(Gauge<Long> numCachedBytesGauge);
}

interface SplitEnumeratorMetricGroup extends MetricGroup { }
```

[Specialized Groups](./specialized-groups.md)

### Configuration and Utilities

Configuration management and utility classes for metric system setup. Includes type-safe property access, character filtering for metric names, and metric type enumeration.

```java { .api }
class MetricConfig extends Properties {
    String getString(String key, String defaultValue);
    int getInteger(String key, int defaultValue);
    long getLong(String key, long defaultValue);
    float getFloat(String key, float defaultValue);
    double getDouble(String key, double defaultValue);
    boolean getBoolean(String key, boolean defaultValue);
}

interface CharacterFilter {
    String filterCharacters(String input);
}

enum MetricType { COUNTER, METER, GAUGE, HISTOGRAM }
```

[Configuration](./configuration.md)
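
The idea of typed property access with defaults can be sketched on top of `java.util.Properties`. `TypedPropertiesSketch` is a hypothetical class that mirrors three of the getter signatures listed above; the parsing behaviour shown (fall back to the default only when the key is absent) is an assumption for illustration.

```java
import java.util.Properties;

// Hypothetical stand-in showing typed getters layered over Properties.
final class TypedPropertiesSketch extends Properties {
    private static final long serialVersionUID = 1L;

    String getString(String key, String defaultValue) {
        return getProperty(key, defaultValue);
    }

    // Returns the default when the key is absent; otherwise parses the value.
    int getInteger(String key, int defaultValue) {
        String raw = getProperty(key);
        return raw == null ? defaultValue : Integer.parseInt(raw);
    }

    boolean getBoolean(String key, boolean defaultValue) {
        String raw = getProperty(key);
        return raw == null ? defaultValue : Boolean.parseBoolean(raw);
    }

    public static void main(String[] args) {
        TypedPropertiesSketch config = new TypedPropertiesSketch();
        config.setProperty("interval", "30"); // hypothetical key
        System.out.println(config.getInteger("interval", 10));
        System.out.println(config.getInteger("missing", 10));
    }
}
```

In this sketch a malformed number still raises `NumberFormatException`, so a reporter's `open` method would fail fast on a bad setting instead of silently using the default.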

### Tracing Support

Experimental distributed tracing capabilities for capturing execution spans across Flink's distributed runtime. Supports span creation, attribute attachment, and pluggable trace reporting.

```java { .api }
interface Span {
    static SpanBuilder builder(Class<?> classScope, String name);
    String getScope();
    String getName();
    long getStartTsMillis();
    long getEndTsMillis();
    Map<String, Object> getAttributes();
}

class SpanBuilder {
    SpanBuilder(Class<?> classScope, String name);
    Span build();
    SpanBuilder setStartTsMillis(long startTsMillis);
    SpanBuilder setEndTsMillis(long endTsMillis);
    SpanBuilder setAttribute(String key, String value);
    SpanBuilder setAttribute(String key, long value);
    SpanBuilder setAttribute(String key, double value);
}

interface TraceReporter {
    void open(MetricConfig config);
    void close();
    void notifyOfAddedSpan(Span span);
}
```

[Tracing](./tracing.md)
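
The builder flow for spans (set timestamps, attach attributes, then build an immutable span) can be sketched without the library. `SpanSketch` and its nested `Builder` are hypothetical and only mirror the method names listed above; deriving the scope from `Class.getName()` is an assumption made for this example.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical immutable span with a fluent builder.
final class SpanSketch {
    private final String scope;
    private final String name;
    private final long startTsMillis;
    private final long endTsMillis;
    private final Map<String, Object> attributes;

    private SpanSketch(Builder b) {
        this.scope = b.scope;
        this.name = b.name;
        this.startTsMillis = b.startTsMillis;
        this.endTsMillis = b.endTsMillis;
        // Defensive copy so later builder changes cannot affect this span.
        this.attributes = Collections.unmodifiableMap(new HashMap<>(b.attributes));
    }

    static Builder builder(Class<?> classScope, String name) {
        return new Builder(classScope, name);
    }

    String getScope() { return scope; }
    String getName() { return name; }
    long getStartTsMillis() { return startTsMillis; }
    long getEndTsMillis() { return endTsMillis; }
    Map<String, Object> getAttributes() { return attributes; }

    static final class Builder {
        private final String scope;
        private final String name;
        private long startTsMillis;
        private long endTsMillis;
        private final Map<String, Object> attributes = new HashMap<>();

        Builder(Class<?> classScope, String name) {
            this.scope = classScope.getName(); // assumption: scope is the class name
            this.name = name;
        }

        Builder setStartTsMillis(long value) { this.startTsMillis = value; return this; }
        Builder setEndTsMillis(long value) { this.endTsMillis = value; return this; }
        Builder setAttribute(String key, String value) { attributes.put(key, value); return this; }
        Builder setAttribute(String key, long value) { attributes.put(key, value); return this; }
        Builder setAttribute(String key, double value) { attributes.put(key, value); return this; }
        SpanSketch build() { return new SpanSketch(this); }
    }

    public static void main(String[] args) {
        SpanSketch span = SpanSketch.builder(String.class, "checkpoint")
                .setStartTsMillis(100L)
                .setEndTsMillis(250L)
                .setAttribute("records", 42L)
                .build();
        System.out.println(span.getName() + " took "
                + (span.getEndTsMillis() - span.getStartTsMillis()) + " ms");
    }
}
```

A trace reporter would receive the finished span through something like `notifyOfAddedSpan` and forward its name, timestamps, and attributes to a tracing backend.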

## Types

```java { .api }
interface Metric {
    default MetricType getMetricType();
}

abstract class HistogramStatistics {
    public abstract double getQuantile(double quantile);
    public abstract long[] getValues();
    public abstract int size();
    public abstract double getMean();
    public abstract double getStdDev();
    public abstract long getMax();
    public abstract long getMin();
}

interface View {
    int UPDATE_INTERVAL_SECONDS = 5;
    void update();
}

interface LogicalScopeProvider {
    String getLogicalScope(CharacterFilter filter);
    String getLogicalScope(CharacterFilter filter, char delimiter);
    MetricGroup getWrappedMetricGroup();
    static LogicalScopeProvider castFrom(MetricGroup metricGroup);
}
```
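
As an illustration of what a `HistogramStatistics` implementation has to compute, here is a hedged sketch over a fixed snapshot of values. `SnapshotStatisticsSketch` is a hypothetical class; the nearest-rank quantile and the population standard deviation are choices made for this example, and real implementations may use different definitions.

```java
import java.util.Arrays;

// Hypothetical statistics over an immutable snapshot of recorded values.
final class SnapshotStatisticsSketch {
    private final long[] sorted;

    SnapshotStatisticsSketch(long[] values) {
        this.sorted = values.clone();
        Arrays.sort(this.sorted);
    }

    int size() { return sorted.length; }
    long getMin() { return sorted.length == 0 ? 0L : sorted[0]; }
    long getMax() { return sorted.length == 0 ? 0L : sorted[sorted.length - 1]; }

    // Nearest-rank quantile: the smallest value covering a fraction q of the data.
    double getQuantile(double q) {
        if (sorted.length == 0) {
            return Double.NaN;
        }
        int rank = (int) Math.ceil(q * sorted.length);
        int index = Math.min(Math.max(rank - 1, 0), sorted.length - 1);
        return sorted[index];
    }

    double getMean() {
        if (sorted.length == 0) {
            return Double.NaN;
        }
        double sum = 0.0;
        for (long v : sorted) {
            sum += v;
        }
        return sum / sorted.length;
    }

    // Population standard deviation (divides by n, not n - 1).
    double getStdDev() {
        if (sorted.length == 0) {
            return Double.NaN;
        }
        double mean = getMean();
        double squares = 0.0;
        for (long v : sorted) {
            squares += (v - mean) * (v - mean);
        }
        return Math.sqrt(squares / sorted.length);
    }

    public static void main(String[] args) {
        SnapshotStatisticsSketch stats = new SnapshotStatisticsSketch(new long[] {5, 1, 3, 2, 4});
        System.out.println(stats.getQuantile(0.5) + " " + stats.getMean());
    }
}
```

Sorting once in the constructor keeps each quantile lookup cheap, which suits a reporter that reads several quantiles from the same snapshot.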