Core metrics interfaces and implementations for Apache Flink stream processing framework
npx @tessl/cli install tessl/maven-org-apache-flink--flink-metrics-core@1.20.0

Core metrics interfaces and implementations for the Apache Flink stream processing framework. This library provides the foundational metrics system that enables monitoring and observability within Flink applications, supporting essential metric types, hierarchical organization, and pluggable reporting to external monitoring systems.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-core</artifactId>
<version>1.20.2</version>
</dependency>

import org.apache.flink.metrics.*;
import org.apache.flink.metrics.groups.*;
import org.apache.flink.metrics.reporter.*;

For specific components:
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.MetricGroup;

import org.apache.flink.metrics.*;
// Working with counters
Counter eventCounter = metricGroup.counter("events");
eventCounter.inc();
eventCounter.inc(5);
// Working with gauges
Gauge<Integer> queueSizeGauge = new Gauge<Integer>() {
@Override
public Integer getValue() {
return queue.size();
}
};
metricGroup.gauge("queueSize", queueSizeGauge);
// Working with meters
Meter throughputMeter = new MeterView(60); // 60 second time window
metricGroup.meter("throughput", throughputMeter);
throughputMeter.markEvent();
// Creating metric hierarchies
MetricGroup operatorGroup = rootGroup.addGroup("operators");
MetricGroup specificOpGroup = operatorGroup.addGroup("map-operator-1");
Counter opCounter = specificOpGroup.counter("records-processed");

Flink Metrics Core follows a hierarchical design built around several key components:
- Counter, Gauge, Meter, and Histogram provide the fundamental metric abstractions
- MetricGroup creates hierarchical namespaces for organizing metrics logically
- MetricReporter and MetricReporterFactory enable pluggable export to external systems
- MetricConfig provides type-safe configuration management
- Span and TraceReporter provide distributed tracing capabilities

This design enables flexible metric collection across Flink's distributed runtime while maintaining performance and providing extensibility for custom metric types and reporting backends.
Essential metric interfaces for measuring different aspects of system behavior. Includes counters for event counting, gauges for instantaneous values, meters for rate measurement, and histograms for distribution statistics.
interface Counter extends Metric {
void inc();
void inc(long n);
void dec();
void dec(long n);
long getCount();
}
interface Gauge<T> extends Metric {
T getValue();
}
interface Meter extends Metric {
void markEvent();
void markEvent(long n);
double getRate();
long getCount();
}
interface Histogram extends Metric {
void update(long value);
long getCount();
HistogramStatistics getStatistics();
}

Hierarchical metric organization system for creating logical namespaces and managing metric lifecycles. Supports both flat and nested structures with variable interpolation and scoping.
interface MetricGroup {
Counter counter(int name);
Counter counter(String name);
<C extends Counter> C counter(int name, C counter);
<C extends Counter> C counter(String name, C counter);
<T, G extends Gauge<T>> G gauge(int name, G gauge);
<T, G extends Gauge<T>> G gauge(String name, G gauge);
<H extends Histogram> H histogram(int name, H histogram);
<H extends Histogram> H histogram(String name, H histogram);
<M extends Meter> M meter(int name, M meter);
<M extends Meter> M meter(String name, M meter);
MetricGroup addGroup(int name);
MetricGroup addGroup(String name);
MetricGroup addGroup(String key, String value);
String[] getScopeComponents();
Map<String, String> getAllVariables();
String getMetricIdentifier(String metricName);
String getMetricIdentifier(String metricName, CharacterFilter filter);
void addSpan(SpanBuilder spanBuilder); // @Experimental
}

Pluggable system for exporting metrics to external monitoring systems. Supports both push and pull patterns with configurable scheduling and lifecycle management.
interface MetricReporter {
void open(MetricConfig config);
void close();
void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group);
void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group);
}
interface MetricReporterFactory {
MetricReporter createMetricReporter(Properties properties);
}
interface Scheduled {
void report();
}

Concrete implementations of metric interfaces providing both thread-safe and performance-optimized variants. Includes simple counters, meter views with time windowing, and utility implementations.
class SimpleCounter implements Counter { /* non-thread-safe */ }
class ThreadSafeSimpleCounter implements Counter { /* thread-safe */ }
class MeterView implements Meter, View { /* time-windowed rate calculation */ }

Component-specific metric groups tailored for different parts of the Flink runtime. Provides specialized interfaces for operators, sources, sinks, coordinators, and other Flink components.
interface OperatorMetricGroup extends MetricGroup {
OperatorIOMetricGroup getIOMetricGroup();
}
interface SourceReaderMetricGroup extends OperatorMetricGroup {
Counter getNumRecordsInErrorsCounter();
void setPendingBytesGauge(Gauge<Long> pendingBytesGauge);
void setPendingRecordsGauge(Gauge<Long> pendingRecordsGauge);
}
interface SinkWriterMetricGroup extends OperatorMetricGroup {
Counter getNumRecordsOutErrorsCounter();
Counter getNumRecordsSendErrorsCounter();
Counter getNumRecordsSendCounter();
Counter getNumBytesSendCounter();
void setCurrentSendTimeGauge(Gauge<Long> currentSendTimeGauge);
}
interface SinkCommitterMetricGroup extends OperatorMetricGroup {
Counter getNumCommittablesTotalCounter();
Counter getNumCommittablesFailureCounter();
Counter getNumCommittablesRetryCounter();
Counter getNumCommittablesSuccessCounter();
Counter getNumCommittablesAlreadyCommittedCounter();
void setCurrentPendingCommittablesGauge(Gauge<Integer> currentPendingCommittablesGauge);
}
interface CacheMetricGroup extends MetricGroup {
void hitCounter(Counter hitCounter);
void missCounter(Counter missCounter);
void loadCounter(Counter loadCounter);
void numLoadFailuresCounter(Counter numLoadFailuresCounter);
void latestLoadTimeGauge(Gauge<Long> latestLoadTimeGauge);
void numCachedRecordsGauge(Gauge<Long> numCachedRecordsGauge);
void numCachedBytesGauge(Gauge<Long> numCachedBytesGauge);
}
interface SplitEnumeratorMetricGroup extends MetricGroup { }

Configuration management and utility classes for metric system setup. Includes type-safe property access, character filtering for metric names, and metric type enumeration.
class MetricConfig extends Properties {
String getString(String key, String defaultValue);
int getInteger(String key, int defaultValue);
long getLong(String key, long defaultValue);
float getFloat(String key, float defaultValue);
double getDouble(String key, double defaultValue);
boolean getBoolean(String key, boolean defaultValue);
}
interface CharacterFilter {
String filterCharacters(String input);
}
enum MetricType { COUNTER, METER, GAUGE, HISTOGRAM }

Experimental distributed tracing capabilities for capturing execution spans across Flink's distributed runtime. Supports span creation, attribute attachment, and pluggable trace reporting.
interface Span {
static SpanBuilder builder(Class<?> classScope, String name);
String getScope();
String getName();
long getStartTsMillis();
long getEndTsMillis();
Map<String, Object> getAttributes();
}
class SpanBuilder {
SpanBuilder(Class<?> classScope, String name);
Span build();
SpanBuilder setStartTsMillis(long startTsMillis);
SpanBuilder setEndTsMillis(long endTsMillis);
SpanBuilder setAttribute(String key, String value);
SpanBuilder setAttribute(String key, long value);
SpanBuilder setAttribute(String key, double value);
}
interface TraceReporter {
void open(MetricConfig config);
void close();
void notifyOfAddedSpan(Span span);
}

interface Metric {
default MetricType getMetricType();
}
abstract class HistogramStatistics {
public abstract double getQuantile(double quantile);
public abstract long[] getValues();
public abstract int size();
public abstract double getMean();
public abstract double getStdDev();
public abstract long getMax();
public abstract long getMin();
}
interface View {
int UPDATE_INTERVAL_SECONDS = 5;
void update();
}
interface LogicalScopeProvider {
String getLogicalScope(CharacterFilter filter);
String getLogicalScope(CharacterFilter filter, char delimiter);
MetricGroup getWrappedMetricGroup();
static LogicalScopeProvider castFrom(MetricGroup metricGroup);
}