tessl/maven-org-apache-flink--flink-metrics-core

Core metrics interfaces and implementations for Apache Flink stream processing framework

Workspace: tessl
Visibility: Public
Describes: mavenpkg:maven/org.apache.flink/flink-metrics-core@1.20.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-metrics-core@1.20.0

Flink Metrics Core

flink-metrics-core provides the core metric interfaces and implementations for the Apache Flink stream processing framework. It is the foundation for monitoring and observability in Flink applications: it defines the basic metric types, a hierarchical way to organize them, and a pluggable mechanism for reporting to external monitoring systems.

Package Information

  • Package Name: flink-metrics-core
  • Package Type: maven
  • Language: Java
  • Group ID: org.apache.flink
  • Artifact ID: flink-metrics-core
  • Installation: Include in your Maven project:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-metrics-core</artifactId>
    <version>1.20.2</version>
</dependency>

Core Imports

import org.apache.flink.metrics.*;
import org.apache.flink.metrics.groups.*;
import org.apache.flink.metrics.reporter.*;

For specific components:

import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.MetricGroup;

Basic Usage

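import org.apache.flink.metrics.*;

// Assumes metricGroup and rootGroup are MetricGroup instances supplied by the
// Flink runtime, and queue is an application-owned collection.

// Working with counters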
Counter eventCounter = metricGroup.counter("events");
eventCounter.inc();
eventCounter.inc(5);

// Working with gauges
Gauge<Integer> queueSizeGauge = new Gauge<Integer>() {
    @Override
    public Integer getValue() {
        return queue.size();
    }
};
metricGroup.gauge("queueSize", queueSizeGauge);

// Working with meters
Meter throughputMeter = new MeterView(60); // 60 second time window
metricGroup.meter("throughput", throughputMeter);
throughputMeter.markEvent();

// Creating metric hierarchies
MetricGroup operatorGroup = rootGroup.addGroup("operators");
MetricGroup specificOpGroup = operatorGroup.addGroup("map-operator-1");
Counter opCounter = specificOpGroup.counter("records-processed");

Architecture

Flink Metrics Core follows a hierarchical design built around several key components:

  • Core Metric Types: Counter, Gauge, Meter, and Histogram provide the fundamental metric abstractions
  • Metric Groups: MetricGroup creates hierarchical namespaces for organizing metrics logically
  • Reporter Framework: MetricReporter and MetricReporterFactory enable pluggable export to external systems
  • Specialized Groups: Component-specific metric groups for operators, sources, sinks, and coordinators
  • Configuration System: MetricConfig extends java.util.Properties with typed getters that take a default value
  • Tracing Support: Experimental Span and TraceReporter for distributed tracing capabilities

This design enables flexible metric collection across Flink's distributed runtime while maintaining performance and providing extensibility for custom metric types and reporting backends.

Capabilities

Core Metric Types

Essential metric interfaces for measuring different aspects of system behavior. Includes counters for event counting, gauges for instantaneous values, meters for rate measurement, and histograms for distribution statistics.

interface Counter extends Metric {
    void inc();
    void inc(long n);
    void dec();
    void dec(long n);
    long getCount();
}

interface Gauge<T> extends Metric {
    T getValue();
}

interface Meter extends Metric {
    void markEvent();
    void markEvent(long n);
    double getRate();
    long getCount();
}

interface Histogram extends Metric {
    void update(long value);
    long getCount();
    HistogramStatistics getStatistics();
}
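
As a rough illustration of the counter and gauge contracts listed above, the sketch below hand-rolls both. It is not Flink code: `Metric`, `Counter`, and `Gauge` are local stand-ins that copy the listed signatures so the snippet compiles without Flink on the classpath, and `LongCounter` and `demo` are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class CoreMetricSketch {
    // Local stand-ins that copy the signatures listed above; not the Flink types.
    interface Metric {}

    interface Counter extends Metric {
        void inc();
        void inc(long n);
        void dec();
        void dec(long n);
        long getCount();
    }

    interface Gauge<T> extends Metric {
        T getValue();
    }

    /** Hypothetical counter backed by a plain long; not safe for concurrent use. */
    static final class LongCounter implements Counter {
        private long count;

        @Override public void inc() { count++; }
        @Override public void inc(long n) { count += n; }
        @Override public void dec() { count--; }
        @Override public void dec(long n) { count -= n; }
        @Override public long getCount() { return count; }
    }

    /** Returns the counter value plus the gauge value after a few updates. */
    static long demo() {
        Counter events = new LongCounter();
        events.inc();
        events.inc(5);
        events.dec();

        Deque<String> queue = new ArrayDeque<>();
        queue.add("a");
        queue.add("b");
        // A gauge holds no state of its own; it reads the value on demand.
        Gauge<Integer> queueSize = queue::size;

        return events.getCount() + queueSize.getValue();
    }
}
```

Because the stand-in `Gauge` has a single abstract method, a method reference or lambda is enough to define one. A counter, by contrast, owns its state and is updated by the code being measured.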

Details: Core Metrics (core-metrics.md)

Metric Organization

Hierarchical metric organization system for creating logical namespaces and managing metric lifecycles. Groups nest through addGroup(name), while addGroup(key, value) adds a key-value pair that is also exposed as a variable through getAllVariables().

interface MetricGroup {
    Counter counter(int name);
    Counter counter(String name);
    <C extends Counter> C counter(int name, C counter);
    <C extends Counter> C counter(String name, C counter);
    <T, G extends Gauge<T>> G gauge(int name, G gauge);
    <T, G extends Gauge<T>> G gauge(String name, G gauge);
    <H extends Histogram> H histogram(int name, H histogram);
    <H extends Histogram> H histogram(String name, H histogram);
    <M extends Meter> M meter(int name, M meter);
    <M extends Meter> M meter(String name, M meter);
    MetricGroup addGroup(int name);
    MetricGroup addGroup(String name);
    MetricGroup addGroup(String key, String value);
    String[] getScopeComponents();
    Map<String, String> getAllVariables();
    String getMetricIdentifier(String metricName);
    String getMetricIdentifier(String metricName, CharacterFilter filter);
    void addSpan(SpanBuilder spanBuilder); // @Experimental
}
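
To make the relationship between scope components, a `CharacterFilter`, and a metric identifier more concrete, here is a minimal sketch of how such an identifier could be assembled. It is an illustration under assumptions, not Flink's implementation: the `identifier` helper, its `delimiter` parameter, and the sample scope are hypothetical, and `CharacterFilter` is a local stand-in for the interface of the same name.

```java
import java.util.StringJoiner;

public class ScopeSketch {
    // Local stand-in for the CharacterFilter signature; not the Flink type.
    interface CharacterFilter {
        String filterCharacters(String input);
    }

    /**
     * Joins the filtered scope components and the filtered metric name with
     * the given delimiter. The delimiter parameter is an assumption of this sketch.
     */
    static String identifier(String[] scopeComponents, String metricName,
                             CharacterFilter filter, char delimiter) {
        StringJoiner joiner = new StringJoiner(String.valueOf(delimiter));
        for (String component : scopeComponents) {
            joiner.add(filter.filterCharacters(component));
        }
        joiner.add(filter.filterCharacters(metricName));
        return joiner.toString();
    }

    static String demo() {
        String[] scope = {"operators", "map-operator-1"};
        // Replace spaces, which many monitoring backends reject in metric names.
        CharacterFilter noSpaces = input -> input.replace(' ', '_');
        return identifier(scope, "records processed", noSpaces, '.');
    }
}
```

With the sample inputs, `demo()` yields `operators.map-operator-1.records_processed`. Passing the filter in from the caller lets each reporter apply the naming rules of its own backend.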

Details: Metric Groups (metric-groups.md)

Reporter Framework

Pluggable system for exporting metrics to external monitoring systems. Reporters that also implement Scheduled have report() invoked periodically by the runtime (push); reporters that do not are expected to serve metrics on demand (pull).

interface MetricReporter {
    void open(MetricConfig config);
    void close();
    void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group);
    void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group);
}

interface MetricReporterFactory {
    MetricReporter createMetricReporter(Properties properties);
}

interface Scheduled {
    void report();
}
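
The reporter lifecycle described above (open, metric added or removed, periodic report, close) can be sketched as follows. This is a simplified illustration, not a production reporter: every interface is a trimmed local stand-in for the one listed above, `LineReporter` and the `prefix` configuration key are hypothetical, and the sketch ignores the synchronization a real reporter would likely need if metrics are registered while a report is running.

```java
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

public class ReporterSketch {
    // Trimmed local stand-ins for the listed interfaces; not the Flink types.
    interface Metric {}
    interface Counter extends Metric { long getCount(); }
    interface MetricGroup { String getMetricIdentifier(String metricName); }
    static class MetricConfig extends Properties {
        String getString(String key, String defaultValue) {
            return getProperty(key, defaultValue);
        }
    }
    interface MetricReporter {
        void open(MetricConfig config);
        void close();
        void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group);
        void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group);
    }
    interface Scheduled { void report(); }

    /** Hypothetical push-style reporter that renders counters as "name=value" lines. */
    static final class LineReporter implements MetricReporter, Scheduled {
        private final Map<String, Counter> counters = new TreeMap<>();
        private final StringBuilder out = new StringBuilder();
        private String prefix = "";

        @Override public void open(MetricConfig config) {
            // "prefix" is a made-up key used only by this sketch.
            prefix = config.getString("prefix", "");
        }

        @Override public void close() { counters.clear(); }

        @Override public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
            if (metric instanceof Counter) {
                counters.put(group.getMetricIdentifier(metricName), (Counter) metric);
            }
        }

        @Override public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
            counters.remove(group.getMetricIdentifier(metricName));
        }

        @Override public void report() {
            // Values are read at report time, not at registration time.
            for (Map.Entry<String, Counter> entry : counters.entrySet()) {
                out.append(prefix).append(entry.getKey())
                   .append('=').append(entry.getValue().getCount()).append('\n');
            }
        }

        String output() { return out.toString(); }
    }

    static String demo() {
        LineReporter reporter = new LineReporter();
        MetricConfig config = new MetricConfig();
        config.setProperty("prefix", "flink.");
        reporter.open(config);

        MetricGroup group = name -> "operators.map-operator-1." + name;
        long[] value = {0};
        Counter counter = () -> value[0];
        reporter.notifyOfAddedMetric(counter, "records-processed", group);

        value[0] = 42;
        reporter.report();
        reporter.close();
        return reporter.output();
    }
}
```

The reporter keeps a reference to each metric rather than a copy of its value, so `report()` emits whatever the counter holds at that moment.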

Details: Reporters (reporters.md)

Metric Implementations

Concrete implementations of metric interfaces providing both thread-safe and performance-optimized variants. Includes simple counters, meter views with time windowing, and utility implementations.

class SimpleCounter implements Counter { /* non-thread-safe */ }
class ThreadSafeSimpleCounter implements Counter { /* thread-safe */ }
class MeterView implements Meter, View { /* time-windowed rate calculation */ }
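
To give a feel for what "time-windowed rate calculation" can mean, the sketch below keeps a ring buffer of count samples taken once per update interval and derives the rate from the newest and oldest sample. It is an independent illustration of the technique, not a copy of `MeterView`: the `WindowedRate` class is hypothetical, and the 5-second interval is borrowed from `View.UPDATE_INTERVAL_SECONDS` in the Types section.

```java
public class WindowedRateSketch {
    static final int UPDATE_INTERVAL_SECONDS = 5;

    /** Hypothetical meter: ring buffer of cumulative counts, one slot per interval. */
    static final class WindowedRate {
        private final long[] samples;
        private final int timeSpanSeconds;
        private long count;
        private int index;
        private double rate;

        WindowedRate(int timeSpanSeconds) {
            if (timeSpanSeconds < UPDATE_INTERVAL_SECONDS
                    || timeSpanSeconds % UPDATE_INTERVAL_SECONDS != 0) {
                throw new IllegalArgumentException("time span must be a positive multiple of the interval");
            }
            this.timeSpanSeconds = timeSpanSeconds;
            // One extra slot so the oldest sample is exactly one time span old.
            this.samples = new long[timeSpanSeconds / UPDATE_INTERVAL_SECONDS + 1];
        }

        void markEvent(long n) { count += n; }

        /** Expected to be called once every UPDATE_INTERVAL_SECONDS by a scheduler. */
        void update() {
            index = (index + 1) % samples.length;
            samples[index] = count;
            long oldest = samples[(index + 1) % samples.length];
            rate = (double) (samples[index] - oldest) / timeSpanSeconds;
        }

        double getRate() { return rate; }
        long getCount() { return count; }
    }

    /** Simulates four update ticks with 50 events arriving before each tick. */
    static double demo() {
        WindowedRate meter = new WindowedRate(10);
        for (int tick = 0; tick < 4; tick++) {
            meter.markEvent(50);
            meter.update();
        }
        return meter.getRate();
    }
}
```

In the simulation, 50 events per 5-second tick settle at 10 events per second once the window is full. The rate only changes when `update()` runs, which is why an implementation of this kind depends on something calling it on a fixed schedule.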

Details: Implementations (implementations.md)

Specialized Metric Groups

Component-specific metric groups tailored for different parts of the Flink runtime. Provides specialized interfaces for operators, sources, sinks, coordinators, and other Flink components.

interface OperatorMetricGroup extends MetricGroup {
    OperatorIOMetricGroup getIOMetricGroup();
}

interface SourceReaderMetricGroup extends OperatorMetricGroup {
    Counter getNumRecordsInErrorsCounter();
    void setPendingBytesGauge(Gauge<Long> pendingBytesGauge);
    void setPendingRecordsGauge(Gauge<Long> pendingRecordsGauge);
}

interface SinkWriterMetricGroup extends OperatorMetricGroup {
    Counter getNumRecordsOutErrorsCounter();
    Counter getNumRecordsSendErrorsCounter();
    Counter getNumRecordsSendCounter();
    Counter getNumBytesSendCounter();
    void setCurrentSendTimeGauge(Gauge<Long> currentSendTimeGauge);
}

interface SinkCommitterMetricGroup extends OperatorMetricGroup {
    Counter getNumCommittablesTotalCounter();
    Counter getNumCommittablesFailureCounter();
    Counter getNumCommittablesRetryCounter();
    Counter getNumCommittablesSuccessCounter();
    Counter getNumCommittablesAlreadyCommittedCounter();
    void setCurrentPendingCommittablesGauge(Gauge<Integer> currentPendingCommittablesGauge);
}

interface CacheMetricGroup extends MetricGroup {
    void hitCounter(Counter hitCounter);
    void missCounter(Counter missCounter);
    void loadCounter(Counter loadCounter);
    void numLoadFailuresCounter(Counter numLoadFailuresCounter);
    void latestLoadTimeGauge(Gauge<Long> latestLoadTimeGauge);
    void numCachedRecordsGauge(Gauge<Long> numCachedRecordsGauge);
    void numCachedBytesGauge(Gauge<Long> numCachedBytesGauge);
}

interface SplitEnumeratorMetricGroup extends MetricGroup { }
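
Several of the specialized groups above, `CacheMetricGroup` in particular, take metrics that the component itself owns and updates. A minimal sketch of that registration pattern, assuming a hypothetical `CountingCache` and trimmed local stand-ins for `Counter` and `CacheMetricGroup` (only the hit and miss setters are modeled):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class CacheMetricsSketch {
    // Trimmed local stand-ins; not the Flink types.
    interface Counter { void inc(); long getCount(); }
    interface CacheMetricGroup {
        void hitCounter(Counter hitCounter);
        void missCounter(Counter missCounter);
    }

    static final class BasicCounter implements Counter {
        private long count;
        @Override public void inc() { count++; }
        @Override public long getCount() { return count; }
    }

    /** Hypothetical cache that creates its own counters and hands them to the group. */
    static final class CountingCache<K, V> {
        private final Map<K, V> entries = new HashMap<>();
        private final Counter hits = new BasicCounter();
        private final Counter misses = new BasicCounter();
        private final Function<K, V> loader;

        CountingCache(CacheMetricGroup metrics, Function<K, V> loader) {
            this.loader = loader;
            // Register once; the group only needs a reference to read from later.
            metrics.hitCounter(hits);
            metrics.missCounter(misses);
        }

        V get(K key) {
            V cached = entries.get(key);
            if (cached != null) {
                hits.inc();
                return cached;
            }
            misses.inc();
            V loaded = loader.apply(key);
            entries.put(key, loaded);
            return loaded;
        }
    }

    /** Returns {hits, misses} as seen through the counters the group received. */
    static long[] demo() {
        Counter[] registered = new Counter[2];
        CacheMetricGroup group = new CacheMetricGroup() {
            @Override public void hitCounter(Counter c) { registered[0] = c; }
            @Override public void missCounter(Counter c) { registered[1] = c; }
        };
        CountingCache<String, Integer> cache = new CountingCache<>(group, String::length);
        cache.get("flink");
        cache.get("flink");
        cache.get("metrics");
        return new long[] {registered[0].getCount(), registered[1].getCount()};
    }
}
```

The component keeps updating its counters as usual, and the metric group observes them through the registered references.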

Details: Specialized Groups (specialized-groups.md)

Configuration and Utilities

Configuration management and utility classes for metric system setup. Includes type-safe property access, character filtering for metric names, and metric type enumeration.

class MetricConfig extends Properties {
    String getString(String key, String defaultValue);
    int getInteger(String key, int defaultValue);
    long getLong(String key, long defaultValue);
    float getFloat(String key, float defaultValue);
    double getDouble(String key, double defaultValue);
    boolean getBoolean(String key, boolean defaultValue);
}

interface CharacterFilter {
    String filterCharacters(String input);
}

enum MetricType { COUNTER, METER, GAUGE, HISTOGRAM }
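
The typed getters on `MetricConfig` all follow the same "parse if present, otherwise fall back to the default" shape. The sketch below shows one plausible way to build such getters on top of `java.util.Properties`. `TypedConfig` is a hypothetical stand-in, not the Flink class, and the keys `host`, `port`, and `verbose` are invented for the example.

```java
import java.util.Properties;

public class ConfigSketch {
    /** Hypothetical stand-in showing typed getters layered over Properties. */
    static class TypedConfig extends Properties {
        String getString(String key, String defaultValue) {
            return getProperty(key, defaultValue);
        }

        int getInteger(String key, int defaultValue) {
            String raw = getProperty(key);
            // Throws NumberFormatException if the value is present but malformed.
            return raw == null ? defaultValue : Integer.parseInt(raw);
        }

        boolean getBoolean(String key, boolean defaultValue) {
            String raw = getProperty(key);
            return raw == null ? defaultValue : Boolean.parseBoolean(raw);
        }
    }

    static String demo() {
        TypedConfig config = new TypedConfig();
        config.setProperty("port", "9249");

        String host = config.getString("host", "localhost"); // missing, uses default
        int port = config.getInteger("port", 8080);          // present, parsed
        boolean verbose = config.getBoolean("verbose", false);
        return host + ":" + port + ":" + verbose;
    }
}
```

A reporter would typically read its settings this way inside `open(MetricConfig config)`, so that missing keys fall back to defaults instead of failing at startup.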

Details: Configuration (configuration.md)

Tracing Support

Experimental distributed tracing capabilities for capturing execution spans across Flink's distributed runtime. Supports span creation, attribute attachment, and pluggable trace reporting.

interface Span {
    static SpanBuilder builder(Class<?> classScope, String name);
    String getScope();
    String getName();
    long getStartTsMillis();
    long getEndTsMillis();
    Map<String, Object> getAttributes();
}

class SpanBuilder {
    SpanBuilder(Class<?> classScope, String name);
    Span build();
    SpanBuilder setStartTsMillis(long startTsMillis);
    SpanBuilder setEndTsMillis(long endTsMillis);
    SpanBuilder setAttribute(String key, String value);
    SpanBuilder setAttribute(String key, long value);
    SpanBuilder setAttribute(String key, double value);
}

interface TraceReporter {
    void open(MetricConfig config);
    void close();
    void notifyOfAddedSpan(Span span);
}
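
The builder-style API above suggests a fluent way of describing a span before handing it to the metric group. As a rough sketch of that pattern, assuming simplified stand-ins named `SimpleSpan` and `SimpleSpanBuilder` (hypothetical, not the Flink classes) and an invented `durationMillis` helper:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class SpanSketch {
    /** Immutable value holder loosely modeled on the Span accessors listed above. */
    static final class SimpleSpan {
        private final String scope;
        private final String name;
        private final long startTsMillis;
        private final long endTsMillis;
        private final Map<String, Object> attributes;

        SimpleSpan(String scope, String name, long startTsMillis, long endTsMillis,
                   Map<String, Object> attributes) {
            this.scope = scope;
            this.name = name;
            this.startTsMillis = startTsMillis;
            this.endTsMillis = endTsMillis;
            this.attributes = attributes;
        }

        String getScope() { return scope; }
        String getName() { return name; }
        Map<String, Object> getAttributes() { return attributes; }
        // Hypothetical convenience method; not part of the listed interface.
        long durationMillis() { return endTsMillis - startTsMillis; }
    }

    static final class SimpleSpanBuilder {
        private final String scope;
        private final String name;
        private long startTsMillis;
        private long endTsMillis;
        private final Map<String, Object> attributes = new HashMap<>();

        SimpleSpanBuilder(Class<?> classScope, String name) {
            // Using the class name as the scope is an assumption of this sketch.
            this.scope = classScope.getName();
            this.name = name;
        }

        SimpleSpanBuilder setStartTsMillis(long value) { startTsMillis = value; return this; }
        SimpleSpanBuilder setEndTsMillis(long value) { endTsMillis = value; return this; }
        SimpleSpanBuilder setAttribute(String key, String value) { attributes.put(key, value); return this; }
        SimpleSpanBuilder setAttribute(String key, long value) { attributes.put(key, value); return this; }

        SimpleSpan build() {
            // Copy the attributes so later builder calls cannot mutate a built span.
            return new SimpleSpan(scope, name, startTsMillis, endTsMillis,
                    Collections.unmodifiableMap(new HashMap<>(attributes)));
        }
    }

    static SimpleSpan demo() {
        return new SimpleSpanBuilder(SpanSketch.class, "checkpoint")
                .setStartTsMillis(1_000L)
                .setEndTsMillis(1_250L)
                .setAttribute("checkpointId", 42L)
                .setAttribute("status", "COMPLETED")
                .build();
    }
}
```

Restricting attribute values to a few overloads, as the listed `SpanBuilder` does with `String`, `long`, and `double`, keeps the attribute map easy for a trace reporter to serialize.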

Details: Tracing (tracing.md)

Types

interface Metric {
    default MetricType getMetricType();
}

abstract class HistogramStatistics {
    public abstract double getQuantile(double quantile);
    public abstract long[] getValues();
    public abstract int size();
    public abstract double getMean();
    public abstract double getStdDev();
    public abstract long getMax();
    public abstract long getMin();
}
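
For readers who want to see what a `HistogramStatistics`-style snapshot might compute, here is a small sketch over a fixed array of samples. It uses a nearest-rank quantile, which is one of several common definitions and is chosen here only for simplicity. `SnapshotStatistics` is a hypothetical class that does not extend the Flink type, and it omits `getStdDev()` to stay short.

```java
import java.util.Arrays;

public class HistogramStatsSketch {
    /** Hypothetical snapshot over a sorted copy of the recorded values. */
    static final class SnapshotStatistics {
        private final long[] sorted;

        SnapshotStatistics(long[] values) {
            this.sorted = values.clone();
            Arrays.sort(this.sorted);
        }

        /** Nearest-rank quantile: the smallest value with at least q of the data at or below it. */
        double getQuantile(double quantile) {
            if (sorted.length == 0) {
                return Double.NaN;
            }
            int rank = (int) Math.ceil(quantile * sorted.length);
            int clamped = Math.min(Math.max(rank, 1), sorted.length);
            return sorted[clamped - 1];
        }

        long[] getValues() { return sorted.clone(); }
        int size() { return sorted.length; }
        double getMean() { return Arrays.stream(sorted).average().orElse(Double.NaN); }
        long getMax() { return sorted.length == 0 ? 0 : sorted[sorted.length - 1]; }
        long getMin() { return sorted.length == 0 ? 0 : sorted[0]; }
    }

    static SnapshotStatistics demo() {
        return new SnapshotStatistics(new long[] {5, 1, 3, 2, 4});
    }
}
```

Taking a sorted copy up front means every accessor works on a consistent snapshot, even if the histogram keeps receiving `update(long value)` calls afterwards.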

interface View {
    int UPDATE_INTERVAL_SECONDS = 5;
    void update();
}

interface LogicalScopeProvider {
    String getLogicalScope(CharacterFilter filter);
    String getLogicalScope(CharacterFilter filter, char delimiter);
    MetricGroup getWrappedMetricGroup();
    static LogicalScopeProvider castFrom(MetricGroup metricGroup);
}