Apache Flink runtime engine providing core distributed streaming dataflow execution, task scheduling, state management, and fault tolerance capabilities.
—
The Metrics System provides comprehensive infrastructure for collecting, registering, and reporting runtime metrics from Flink applications. This system enables monitoring of job performance, resource usage, throughput, and custom application metrics across the distributed cluster.
Central registry for Flink metrics that handles metric registration, reporter management, and metric lifecycle.
public class MetricRegistry {
public static MetricRegistry create(MetricRegistryConfiguration config);
public void register(Metric metric, String metricName, AbstractMetricGroup group);
public void unregister(Metric metric, String metricName, AbstractMetricGroup group);
public void startReporters(Configuration config);
public void stopReporters();
public char getDelimiter();
public int getNumberReporters();
public ScopeFormats getScopeFormats();
@Override
public void close();
}Configuration class for metrics registry setup and behavior customization.
public class MetricRegistryConfiguration {
public static MetricRegistryConfiguration fromConfiguration(Configuration config);
public static MetricRegistryConfiguration defaultMetricRegistryConfiguration();
public long getQueryServiceUpdateInterval();
public int getQueryServicePort();
public String getQueryServiceBindAddress();
public List<ReporterSetup> getReporterConfigurations();
public ScopeFormats getScopeFormats();
public char getDelimiter();
public List<String> getExcludedMetrics();
}Metric that tracks a count that can only increase.
public interface Counter extends Metric {
void inc();
void inc(long n);
long getCount();
// Static factory method
static Counter of(LongCounter longCounter);
}Metric that provides an instantaneous measurement of a value.
public interface Gauge<T> extends Metric {
T getValue();
// Static factory methods
static <T> Gauge<T> of(Supplier<T> supplier);
static Gauge<Double> of(DoubleSupplier supplier);
static Gauge<Long> of(LongSupplier supplier);
}Metric that tracks the rate of events over time.
public interface Meter extends Metric {
void markEvent();
void markEvent(long n);
double getRate();
long getCount();
}Metric that tracks the distribution of values over time.
public interface Histogram extends Metric {
void update(long value);
long getCount();
HistogramStatistics getStatistics();
}
public interface HistogramStatistics {
double getQuantile(double quantile);
long[] getValues();
int size();
double getMean();
double getStdDev();
long getMax();
long getMin();
}Base interface for organizing metrics into hierarchical groups with scoped naming.
public interface MetricGroup {
Counter counter(String name);
Counter counter(String name, Counter counter);
<C extends Counter> C counter(String name, C counter);
<T> Gauge<T> gauge(String name, Gauge<T> gauge);
Histogram histogram(String name, Histogram histogram);
Meter meter(String name, Meter meter);
MetricGroup addGroup(String name);
MetricGroup addGroup(String key, String value);
String[] getScopeComponents();
Map<String, String> getAllVariables();
String getMetricIdentifier(String metricName);
String getMetricIdentifier(String metricName, CharacterFilter filter);
}Abstract base implementation providing common metric group functionality.
public abstract class AbstractMetricGroup implements MetricGroup {
protected AbstractMetricGroup(MetricRegistry registry, String[] scope, AbstractMetricGroup parent);
protected void addMetric(String name, Metric metric);
protected void removeMetric(String name);
public final MetricGroup addGroup(String name);
public final MetricGroup addGroup(String key, String value);
protected abstract String getGroupName(String name);
public final String[] getScopeComponents();
public final Map<String, String> getAllVariables();
public final String getMetricIdentifier(String metricName);
public final String getMetricIdentifier(String metricName, CharacterFilter filter);
@Override
public void close();
}Specialized metric group for cluster components (JobManager, TaskManager, etc.).
public class ComponentMetricGroup extends AbstractMetricGroup {
public ComponentMetricGroup(MetricRegistry registry, String componentName);
public JobManagerMetricGroup addJobManager(Configuration config, String hostname, String jobManagerId);
public TaskManagerMetricGroup addTaskManager(Configuration config, String hostname, String taskManagerId);
protected String getGroupName(String name);
}Metric group for JobManager-specific metrics.
public class JobManagerMetricGroup extends ComponentMetricGroup {
public JobManagerMetricGroup(MetricRegistry registry, String hostname, String jobManagerId);
public JobMetricGroup addJob(JobGraph jobGraph);
public JobMetricGroup addJob(JobID jobId, String jobName);
public String hostname();
public String jobManagerId();
protected String getGroupName(String name);
}Metric group for TaskManager-specific metrics.
public class TaskManagerMetricGroup extends ComponentMetricGroup {
public TaskManagerMetricGroup(MetricRegistry registry, String hostname, String taskManagerId);
public TaskMetricGroup addTaskForJob(JobID jobId, String jobName, JobVertexID jobVertexId,
ExecutionAttemptID executionAttemptId, String taskName,
int subtaskIndex, int attemptNumber);
public String hostname();
public String taskManagerId();
protected String getGroupName(String name);
}Base interface for metric reporters that output metrics to external monitoring systems.
public interface MetricReporter extends AutoCloseable {
void open(MetricConfig config);
void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group);
void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group);
@Override
void close();
}Reporter that integrates with Dropwizard metrics and supports scheduled reporting.
public abstract class ScheduledDropwizardReporter implements MetricReporter, Scheduled {
protected final com.codahale.metrics.MetricRegistry registry = new com.codahale.metrics.MetricRegistry();
@Override
public void open(MetricConfig config);
@Override
public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group);
@Override
public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group);
public abstract void report();
protected com.codahale.metrics.MetricRegistry getRegistry();
@Override
public void close();
}Interface for reporters that support scheduled/periodic reporting.
public interface Scheduled {
void report();
}Configuration container for metric reporters.
public class MetricConfig {
public static final String REPORTER_CLASS = "class";
public static final String REPORTER_INTERVAL = "interval";
public String getString(String key, String defaultValue);
public int getInteger(String key, int defaultValue);
public long getLong(String key, long defaultValue);
public boolean getBoolean(String key, boolean defaultValue);
public Properties getProperties();
public void setString(String key, String value);
public void setInteger(String key, int value);
public void setLong(String key, long value);
public void setBoolean(String key, boolean value);
}Configuration setup for individual metric reporters.
public class ReporterSetup {
public ReporterSetup(String name, MetricConfig configuration);
public String getName();
public MetricConfig getConfiguration();
public Optional<String> getClassName();
public Optional<String> getFactoryClassName();
public Optional<Long> getIntervalSettings();
}Defines scope format patterns for different component types.
public class ScopeFormats {
public static ScopeFormats fromConfig(Configuration config);
public String[] getJobManagerFormat();
public String[] getTaskManagerFormat();
public String[] getJobFormat();
public String[] getTaskFormat();
public String[] getOperatorFormat();
public String getJobManagerScope(Configuration config, String hostname, String jmId);
public String getTaskManagerScope(Configuration config, String hostname, String tmId);
// Format variables
public static final String SCOPE_HOST = "<host>";
public static final String SCOPE_TASKMANAGER_ID = "<tm_id>";
public static final String SCOPE_JOB_ID = "<job_id>";
public static final String SCOPE_JOB_NAME = "<job_name>";
public static final String SCOPE_TASK_VERTEX_ID = "<task_id>";
public static final String SCOPE_TASK_NAME = "<task_name>";
public static final String SCOPE_TASK_SUBTASK_INDEX = "<subtask_index>";
public static final String SCOPE_TASK_ATTEMPT_ID = "<task_attempt_id>";
public static final String SCOPE_TASK_ATTEMPT_NUM = "<task_attempt_num>";
public static final String SCOPE_OPERATOR_ID = "<operator_id>";
public static final String SCOPE_OPERATOR_NAME = "<operator_name>";
}Interface for filtering characters in metric names and identifiers.
public interface CharacterFilter {
String filterCharacters(String input);
CharacterFilter NO_OP_FILTER = input -> input;
}import org.apache.flink.metrics.*;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
// Create metrics registry
MetricRegistryConfiguration config = MetricRegistryConfiguration.defaultMetricRegistryConfiguration();
MetricRegistry metricRegistry = MetricRegistry.create(config);
// Create metric group
// ComponentMetricGroup is the root of the scope hierarchy for this example.
ComponentMetricGroup rootGroup = new ComponentMetricGroup(metricRegistry, "MyApplication");
// The key/value variant records "job" -> "data-processing" as a scope variable.
MetricGroup jobGroup = rootGroup.addGroup("job", "data-processing");
// Register different metric types
Counter recordsProcessed = jobGroup.counter("records_processed");
Gauge<Long> memoryUsage = jobGroup.gauge("memory_usage", () -> Runtime.getRuntime().totalMemory());
// MeterView derives an event rate from the counter over a 60-second window.
Meter throughput = jobGroup.meter("throughput", new MeterView(recordsProcessed, 60));
Histogram latency = jobGroup.histogram("latency", new DescriptiveStatisticsHistogram(1000));
// Use metrics in application
// NOTE(review): processRecord() and startTime are not defined in this snippet;
// the loop is illustrative only and will not compile as-is.
for (int i = 0; i < 1000; i++) {
// Process record
processRecord();
// Update metrics
recordsProcessed.inc();
latency.update(System.currentTimeMillis() - startTime);
throughput.markEvent();
}
// Clean up — unregisters metrics and releases registry resources.
metricRegistry.close();import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.*;
public class CustomMetricReporter implements MetricReporter {
    // Registered metrics keyed by their fully scoped identifier.
    private final Map<String, Metric> metrics = new ConcurrentHashMap<>();
    // Kept as a field so close() can stop it. The original code created the
    // executor as a local variable in open() and never shut it down, leaking a
    // non-daemon thread for every reporter instance.
    private ScheduledExecutorService scheduler;

    /**
     * Reads the reporter settings and starts the periodic reporting thread.
     *
     * @param config reporter configuration ("endpoint" host:port, "interval" in seconds)
     */
    @Override
    public void open(MetricConfig config) {
        String endpoint = config.getString("endpoint", "localhost:8080");
        String interval = config.getString("interval", "10");
        System.out.println("Opening custom reporter with endpoint: " + endpoint);
        // Start reporting thread
        scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(this::report, 0, Long.parseLong(interval), TimeUnit.SECONDS);
    }

    /** Tracks a newly registered metric under its fully qualified name. */
    @Override
    public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
        String fullName = group.getMetricIdentifier(metricName);
        metrics.put(fullName, metric);
        System.out.println("Added metric: " + fullName);
    }

    /** Stops tracking a metric that was unregistered. */
    @Override
    public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
        String fullName = group.getMetricIdentifier(metricName);
        metrics.remove(fullName);
        System.out.println("Removed metric: " + fullName);
    }

    // Prints a snapshot of every tracked metric to stdout, dispatching on the
    // four concrete metric kinds.
    private void report() {
        System.out.println("=== Custom Metric Report ===");
        for (Map.Entry<String, Metric> entry : metrics.entrySet()) {
            String name = entry.getKey();
            Metric metric = entry.getValue();
            if (metric instanceof Counter) {
                Counter counter = (Counter) metric;
                System.out.println(name + " (Counter): " + counter.getCount());
            } else if (metric instanceof Gauge) {
                Gauge<?> gauge = (Gauge<?>) metric;
                System.out.println(name + " (Gauge): " + gauge.getValue());
            } else if (metric instanceof Meter) {
                Meter meter = (Meter) metric;
                System.out.println(name + " (Meter): " + meter.getRate() + " events/sec");
            } else if (metric instanceof Histogram) {
                Histogram histogram = (Histogram) metric;
                HistogramStatistics stats = histogram.getStatistics();
                System.out.println(name + " (Histogram): count=" + histogram.getCount() +
                        ", mean=" + stats.getMean() + ", max=" + stats.getMax());
            }
        }
    }

    /**
     * Stops the reporting thread and releases the executor. The shutdown was
     * missing in the original implementation, so the scheduled task kept
     * running after close().
     */
    @Override
    public void close() {
        if (scheduler != null) {
            scheduler.shutdownNow();
            scheduler = null;
        }
        System.out.println("Closing custom reporter");
    }
}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
// Configure metrics system
Configuration config = new Configuration();
// Enable metrics
// Each name in this list must have a matching "metrics.reporter.<name>.*" section below.
config.setString(MetricOptions.REPORTERS_LIST, "prometheus,slf4j");
// Configure Prometheus reporter
config.setString("metrics.reporter.prometheus.class", "org.apache.flink.metrics.prometheus.PrometheusReporter");
config.setInteger("metrics.reporter.prometheus.port", 9249);
// Configure SLF4J reporter
config.setString("metrics.reporter.slf4j.class", "org.apache.flink.metrics.slf4j.Slf4jReporter");
config.setString("metrics.reporter.slf4j.interval", "10 SECONDS");
// Configure scope formats
// The <...> placeholders correspond to the SCOPE_* variables on ScopeFormats.
config.setString(MetricOptions.SCOPE_NAMING_JM, "<host>.jobmanager.<jm_id>")
config.setString(MetricOptions.SCOPE_NAMING_TM, "<host>.taskmanager.<tm_id>");
config.setString(MetricOptions.SCOPE_NAMING_JOB, "<host>.jobmanager.<jm_id>.<job_name>");
config.setString(MetricOptions.SCOPE_NAMING_TASK, "<host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>");
// Set metric exclusions
// NOTE(review): patterns are semicolon-separated; '*' presumably wildcards a
// scope segment — verify against the reporter's exclusion syntax.
config.setString("metrics.reporter.prometheus.excludes", "*.taskmanager.Status.JVM.CPU.*;*.taskmanager.Status.JVM.Memory.Heap.Max");
// Create registry with configuration
MetricRegistryConfiguration registryConfig = MetricRegistryConfiguration.fromConfiguration(config);
MetricRegistry metricRegistry = MetricRegistry.create(registryConfig);import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Gauge;
public class MetricsAwareProcessFunction extends ProcessFunction<String, String> {
    // Metric handles are transient: they are re-registered in open() after the
    // function instance is deserialized on the task manager.
    private transient Counter recordsProcessed;
    private transient Counter recordsFiltered;
    private transient Histogram processingLatency;
    private transient Gauge<Long> currentBacklog;
    // Written by the processing thread, read by the reporting side through the gauge.
    private volatile long backlogSize = 0;

    /** Registers all metrics of this function on the task's metric group. */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Obtain the group scoped to this task from the runtime context.
        MetricGroup group = getRuntimeContext().getMetricGroup();
        recordsProcessed = group.counter("records_processed");
        recordsFiltered = group.counter("records_filtered");
        processingLatency =
                group.histogram("processing_latency_ms", new DescriptiveStatisticsHistogram(10000));
        currentBacklog = group.gauge("current_backlog", () -> backlogSize);
        // Derived JVM heap-usage gauge, reported in megabytes.
        group.gauge("memory_usage_mb", () -> {
            Runtime rt = Runtime.getRuntime();
            return (rt.totalMemory() - rt.freeMemory()) / 1024 / 1024;
        });
    }

    /** Filters empty records, upper-cases the rest, and tracks latency/backlog. */
    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        final long begin = System.currentTimeMillis();
        try {
            if (shouldFilter(value)) {
                recordsFiltered.inc();
                return;
            }
            out.collect(processRecord(value));
            recordsProcessed.inc();
        } finally {
            // Latency and backlog are updated whether or not the record was filtered.
            processingLatency.update(System.currentTimeMillis() - begin);
            backlogSize = estimateBacklogSize();
        }
    }

    // A record is filtered out when it is empty.
    private boolean shouldFilter(String value) {
        return value.isEmpty();
    }

    // Example transformation: upper-case the record.
    private String processRecord(String value) {
        return value.toUpperCase();
    }

    // Placeholder backlog estimate; always zero in this example.
    private long estimateBacklogSize() {
        return 0;
    }
}
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
public class MetricsAwareOperator extends AbstractStreamOperator<String>
        implements OneInputStreamOperator<String, String> {
    // Re-created in open() on the task manager; hence transient.
    private transient Counter inputRecords;
    private transient Counter outputRecords;
    private transient Meter inputRate;
    private transient Histogram processingTime;

    /** Registers this operator's metrics on its metric group. */
    @Override
    public void open() throws Exception {
        super.open();
        MetricGroup group = getMetricGroup();
        inputRecords = group.counter("input_records");
        outputRecords = group.counter("output_records");
        // MeterView derives the per-second input rate from the input counter.
        inputRate = group.meter("input_rate", new MeterView(inputRecords));
        processingTime =
                group.histogram("processing_time_ns", new DescriptiveStatisticsHistogram(1000));
        group.gauge("operator_busy_time_per_sec", this::calculateBusyTime);
    }

    /** Lower-cases the payload, preserving the record's timestamp. */
    @Override
    public void processElement(StreamRecord<String> element) throws Exception {
        final long begin = System.nanoTime();
        inputRecords.inc();
        inputRate.markEvent();
        try {
            String result = element.getValue().toLowerCase();
            output.collect(new StreamRecord<>(result, element.getTimestamp()));
            outputRecords.inc();
        } finally {
            // Record wall time in nanoseconds, including the filtered/error paths.
            processingTime.update(System.nanoTime() - begin);
        }
    }

    // Placeholder busy-time calculation; always zero in this example.
    private double calculateBusyTime() {
        return 0.0;
    }
}
public class MetricsManager implements AutoCloseable {
private final MetricRegistry registry;
// Cache of created groups keyed by the dot-joined scope path.
private final Map<String, MetricGroup> groups = new ConcurrentHashMap<>();

/**
 * Creates the manager, builds a registry from the given configuration and
 * starts all configured reporters.
 *
 * @param config Flink configuration holding the metrics settings
 */
public MetricsManager(Configuration config) {
    MetricRegistryConfiguration registryConfig = MetricRegistryConfiguration.fromConfiguration(config);
    this.registry = MetricRegistry.create(registryConfig);
    registry.startReporters(config);
}

/**
 * Returns (creating and caching on first use) the metric group for the given
 * scope path, e.g. {@code createGroup("app", "job", "operator")}.
 *
 * @param path scope components; must contain at least one element
 * @return the innermost group of the path
 * @throws IllegalArgumentException if {@code path} is null or empty
 */
public MetricGroup createGroup(String... path) {
    // Guard: path[0] below would otherwise throw ArrayIndexOutOfBoundsException.
    if (path == null || path.length == 0) {
        throw new IllegalArgumentException("path must contain at least one component");
    }
    String groupKey = String.join(".", path);
    return groups.computeIfAbsent(groupKey, k -> {
        ComponentMetricGroup root = new ComponentMetricGroup(registry, path[0]);
        MetricGroup current = root;
        for (int i = 1; i < path.length; i++) {
            current = current.addGroup(path[i]);
        }
        return current;
    });
}

/** Removes and closes the group previously created for the given path, if any. */
public void removeGroup(String... path) {
    String groupKey = String.join(".", path);
    closeGroup(groups.remove(groupKey));
}

/** Closes all groups, stops the reporters and shuts the registry down. */
@Override
public void close() {
    groups.values().forEach(MetricsManager::closeGroup);
    groups.clear();
    registry.stopReporters();
    registry.close();
}

// MetricGroup itself declares no close(); only AbstractMetricGroup does, so the
// original MetricGroup::close / group.close() calls did not compile against
// this API. Close through the concrete base class instead; null is a no-op.
private static void closeGroup(MetricGroup group) {
    if (group instanceof AbstractMetricGroup) {
        ((AbstractMetricGroup) group).close();
    }
}
}public class ConditionalMetrics {
// Target group used for real metric registrations.
private final MetricGroup metricGroup;
// Snapshot of the "metrics.enabled" flag taken at construction time.
private final boolean metricsEnabled;

/**
 * @param metricGroup group used when metrics are enabled
 * @param config      configuration consulted for "metrics.enabled" (default true)
 */
public ConditionalMetrics(MetricGroup metricGroup, Configuration config) {
    this.metricGroup = metricGroup;
    this.metricsEnabled = config.getBoolean("metrics.enabled", true);
}

/** Returns a registered counter, or an inert one when metrics are disabled. */
public Counter createCounter(String name) {
    return metricsEnabled ? metricGroup.counter(name) : new NoOpCounter();
}

/** Returns a registered gauge, or an inert one when metrics are disabled. */
public <T> Gauge<T> createGauge(String name, Supplier<T> valueSupplier) {
    return metricsEnabled ? metricGroup.gauge(name, Gauge.of(valueSupplier)) : new NoOpGauge<>();
}

/** Counter that discards every update; used when metrics are disabled. */
private static class NoOpCounter implements Counter {
    @Override
    public void inc() {}

    @Override
    public void inc(long n) {}

    @Override
    public long getCount() {
        return 0;
    }
}

/** Gauge that always reports {@code null}; used when metrics are disabled. */
private static class NoOpGauge<T> implements Gauge<T> {
    @Override
    public T getValue() {
        return null;
    }
}
}Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-runtime-2-10