CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-runtime-2-10

Apache Flink runtime engine providing core distributed streaming dataflow execution, task scheduling, state management, and fault tolerance capabilities.

Pending
Overview
Eval results
Files

docs/metrics.md

Metrics System

The Metrics System provides comprehensive infrastructure for collecting, registering, and reporting runtime metrics from Flink applications. This system enables monitoring of job performance, resource usage, throughput, and custom application metrics across the distributed cluster.

Core Components

MetricRegistry

Central registry for Flink metrics that handles metric registration, reporter management, and metric lifecycle.

/**
 * Central registry for Flink metrics: handles metric registration, reporter
 * management, and metric lifecycle.
 *
 * <p>This is a declaration-only API listing; method bodies are not shown.
 *
 * <p>NOTE(review): the header reads {@code MetricRegistry implements MetricRegistryImpl}.
 * An "Impl" type is normally the implementor, not the implemented interface, and
 * {@code close()} is marked {@code @Override} although no visible supertype declares
 * it - confirm the type hierarchy against the library sources.
 */
public class MetricRegistry implements MetricRegistryImpl {
    /** Static factory: builds a registry from the given registry configuration. */
    public static MetricRegistry create(MetricRegistryConfiguration config);
    
    // Metric registration; each call names the metric and the group it belongs to.
    public void register(Metric metric, String metricName, AbstractMetricGroup group);
    public void unregister(Metric metric, String metricName, AbstractMetricGroup group);
    
    // Reporter management.
    public void startReporters(Configuration config);
    public void stopReporters();
    
    public char getDelimiter();
    public int getNumberReporters();
    
    public ScopeFormats getScopeFormats();
    
    @Override
    public void close();
}

MetricRegistryConfiguration

Configuration class for metrics registry setup and behavior customization.

/**
 * Configuration class for metrics registry setup and behavior customization.
 *
 * <p>Declaration-only API listing; instances are obtained through the two static
 * factories below.
 */
public class MetricRegistryConfiguration {
    /** Static factory: derives the registry configuration from a Flink {@code Configuration}. */
    public static MetricRegistryConfiguration fromConfiguration(Configuration config);
    /** Static factory taking no input; presumably yields built-in defaults - TODO confirm. */
    public static MetricRegistryConfiguration defaultMetricRegistryConfiguration();
    
    // Query service settings (units of the interval are not stated here).
    public long getQueryServiceUpdateInterval();
    public int getQueryServicePort();
    public String getQueryServiceBindAddress();
    
    public List<ReporterSetup> getReporterConfigurations();
    public ScopeFormats getScopeFormats();
    
    public char getDelimiter();
    public List<String> getExcludedMetrics();
}

Metric Types

Counter

Metric that tracks a count that can only increase.

/**
 * Metric that tracks a count that can only increase.
 *
 * <p>Declaration-only API listing: only increment operations and a read accessor
 * are declared; no decrement is listed.
 */
public interface Counter extends Metric {
    /** Increment without an explicit amount. */
    void inc();
    /** Increment by the supplied amount {@code n}. */
    void inc(long n);
    /** Read accessor for the current count. */
    long getCount();
    
    // Static factory method
    static Counter of(LongCounter longCounter);
}

Gauge

Metric that provides an instantaneous measurement of a value.

/**
 * Metric that provides an instantaneous measurement of a value.
 *
 * <p>Declaration-only API listing. {@link #getValue()} is the single abstract
 * method, so the usage examples in this document supply gauges as lambdas.
 *
 * @param <T> type of the measured value
 */
public interface Gauge<T> extends Metric {
    T getValue();
    
    // Static factory methods
    // NOTE(review): three overloads of of(...) that each take a functional interface;
    // a bare lambda argument may be ambiguous between them - confirm intended usage.
    static <T> Gauge<T> of(Supplier<T> supplier);
    static Gauge<Double> of(DoubleSupplier supplier);
    static Gauge<Long> of(LongSupplier supplier);
}

Meter

Metric that tracks the rate of events over time.

/**
 * Metric that tracks the rate of events over time.
 *
 * <p>Declaration-only API listing. The usage examples in this document print the
 * rate as "events/sec".
 */
public interface Meter extends Metric {
    /** Mark an event without an explicit count. */
    void markEvent();
    /** Mark events with the supplied count {@code n}. */
    void markEvent(long n);
    
    double getRate();
    long getCount();
}

Histogram

Metric that tracks the distribution of values over time.

/**
 * Metric that tracks the distribution of values over time.
 *
 * <p>Declaration-only API listing. Values are fed in through {@link #update(long)};
 * aggregate figures are read through the {@code HistogramStatistics} returned by
 * {@link #getStatistics()}.
 */
public interface Histogram extends Metric {
    void update(long value);
    
    long getCount();
    HistogramStatistics getStatistics();
}

/**
 * Statistics object handed out by {@code Histogram.getStatistics()}.
 *
 * <p>Declaration-only API listing exposing quantile, raw values, size, mean,
 * standard deviation, maximum and minimum accessors.
 */
public interface HistogramStatistics {
    // NOTE(review): the accepted range of 'quantile' (presumably 0.0-1.0) is not stated here - confirm.
    double getQuantile(double quantile);
    long[] getValues();
    int size();
    double getMean();
    double getStdDev();
    long getMax();
    long getMin();
}

Metric Groups

MetricGroup

Base interface for organizing metrics into hierarchical groups with scoped naming.

/**
 * Base interface for organizing metrics into hierarchical groups with scoped naming.
 *
 * <p>Declaration-only API listing. Metrics are registered under a name through the
 * {@code counter}/{@code gauge}/{@code histogram}/{@code meter} methods, sub-groups are
 * created with {@code addGroup}, and the remaining methods expose the group's scope
 * and the identifiers derived from it.
 */
public interface MetricGroup {
    /** Creates and registers a counter under {@code name}. */
    Counter counter(String name);

    /**
     * Registers the supplied counter under {@code name} and returns it with its own type.
     *
     * <p>A single generic method covers both plain {@code Counter} arguments and
     * subtypes. A separate {@code counter(String, Counter)} overload cannot coexist
     * with it, because both signatures erase to {@code counter(String, Counter)}.
     *
     * @param <C> concrete counter type, preserved in the return value
     */
    <C extends Counter> C counter(String name, C counter);
    
    /** Registers the supplied gauge under {@code name}. */
    <T> Gauge<T> gauge(String name, Gauge<T> gauge);
    
    /** Registers the supplied histogram under {@code name}. */
    Histogram histogram(String name, Histogram histogram);
    
    /** Registers the supplied meter under {@code name}. */
    Meter meter(String name, Meter meter);
    
    // Sub-group creation: by plain name, or by key/value pair.
    MetricGroup addGroup(String name);
    MetricGroup addGroup(String key, String value);
    
    // Scope inspection and identifier construction.
    String[] getScopeComponents();
    Map<String, String> getAllVariables();
    String getMetricIdentifier(String metricName);
    String getMetricIdentifier(String metricName, CharacterFilter filter);
}

AbstractMetricGroup

Abstract base implementation providing common metric group functionality.

/**
 * Abstract base implementation providing common metric group functionality.
 *
 * <p>Declaration-only API listing. The group-creation and scope/identifier methods
 * are {@code final}; subclasses supply {@link #getGroupName(String)}.
 *
 * <p>NOTE(review): {@code close()} is marked {@code @Override}, but the
 * {@code MetricGroup} interface as listed in this document declares no
 * {@code close()} - confirm which supertype it overrides.
 */
public abstract class AbstractMetricGroup implements MetricGroup {
    /** Takes the owning registry, this group's scope components, and the parent group. */
    protected AbstractMetricGroup(MetricRegistry registry, String[] scope, AbstractMetricGroup parent);
    
    protected void addMetric(String name, Metric metric);
    protected void removeMetric(String name);
    
    public final MetricGroup addGroup(String name);
    public final MetricGroup addGroup(String key, String value);
    
    /** The only abstract member: every concrete group must implement it. */
    protected abstract String getGroupName(String name);
    
    public final String[] getScopeComponents();
    public final Map<String, String> getAllVariables();
    public final String getMetricIdentifier(String metricName);
    public final String getMetricIdentifier(String metricName, CharacterFilter filter);
    
    @Override
    public void close();
}

ComponentMetricGroup

Specialized metric group for cluster components (JobManager, TaskManager, etc.).

/**
 * Specialized metric group for cluster components (JobManager, TaskManager, etc.).
 *
 * <p>Declaration-only API listing. Offers factory-style methods returning the
 * JobManager and TaskManager groups.
 */
public class ComponentMetricGroup extends AbstractMetricGroup {
    public ComponentMetricGroup(MetricRegistry registry, String componentName);
    
    // NOTE(review): type parameter <J> is declared but not used in the signature - confirm.
    public <J> JobManagerMetricGroup addJobManager(Configuration config, String hostname, String jobManagerId);
    public TaskManagerMetricGroup addTaskManager(Configuration config, String hostname, String taskManagerId);
    
    /** Implements the abstract hook declared by {@code AbstractMetricGroup}. */
    protected String getGroupName(String name);
}

JobManagerMetricGroup

Metric group for JobManager-specific metrics.

/**
 * Metric group for JobManager-specific metrics.
 *
 * <p>Declaration-only API listing. Constructed from a registry, a hostname and a
 * JobManager id; job-level groups are added through the {@code addJob} overloads.
 */
public class JobManagerMetricGroup extends ComponentMetricGroup {
    public JobManagerMetricGroup(MetricRegistry registry, String hostname, String jobManagerId);
    
    // Two ways to add a job group: from a JobGraph, or from an explicit id and name.
    public JobMetricGroup addJob(JobGraph jobGraph);
    public JobMetricGroup addJob(JobID jobId, String jobName);
    
    // Accessors named after the constructor arguments.
    public String hostname();
    public String jobManagerId();
    
    protected String getGroupName(String name);
}

TaskManagerMetricGroup

Metric group for TaskManager-specific metrics.

/**
 * Metric group for TaskManager-specific metrics.
 *
 * <p>Declaration-only API listing. Constructed from a registry, a hostname and a
 * TaskManager id; task-level groups are added through {@code addTaskForJob}.
 */
public class TaskManagerMetricGroup extends ComponentMetricGroup {
    public TaskManagerMetricGroup(MetricRegistry registry, String hostname, String taskManagerId);
    
    /** Adds a task group identified by job, vertex, execution attempt, task name, subtask index and attempt number. */
    public TaskMetricGroup addTaskForJob(JobID jobId, String jobName, JobVertexID jobVertexId, 
                                       ExecutionAttemptID executionAttemptId, String taskName, 
                                       int subtaskIndex, int attemptNumber);
    
    // Accessors named after the constructor arguments.
    public String hostname();
    public String taskManagerId();
    
    protected String getGroupName(String name);
}

Reporters

MetricReporter

Base interface for metric reporters that output metrics to external monitoring systems.

/**
 * Base interface for metric reporters that output metrics to external monitoring systems.
 *
 * <p>Declaration-only API listing. Extends {@link AutoCloseable}, but redeclares
 * {@code close()} without a {@code throws} clause, so implementations cannot throw
 * checked exceptions from it.
 */
public interface MetricReporter extends AutoCloseable {
    /** Receives the reporter's {@code MetricConfig}. */
    void open(MetricConfig config);
    
    // Notifications carrying the metric, its name, and the group it belongs to.
    void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group);
    void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group);
    
    @Override
    void close();
}

ScheduledDropwizardReporter

Reporter that integrates with Dropwizard metrics and supports scheduled reporting.

/**
 * Reporter that integrates with Dropwizard metrics and supports scheduled reporting.
 *
 * <p>Declaration-only API listing. Implements both {@code MetricReporter} and
 * {@code Scheduled}; {@link #report()} is left abstract for concrete reporters.
 */
public abstract class ScheduledDropwizardReporter implements MetricReporter, Scheduled {
    // Dropwizard (codahale) registry, created eagerly with the reporter instance.
    // Fully qualified because Flink has a MetricRegistry type of its own.
    protected final com.codahale.metrics.MetricRegistry registry = new com.codahale.metrics.MetricRegistry();
    
    @Override
    public void open(MetricConfig config);
    
    @Override
    public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group);
    
    @Override
    public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group);
    
    /** Abstract here; matches the single method declared by {@code Scheduled}. */
    public abstract void report();
    
    /** Accessor for the Dropwizard registry type; presumably returns the field above - TODO confirm. */
    protected com.codahale.metrics.MetricRegistry getRegistry();
    
    @Override
    public void close();
}

Scheduled

Interface for reporters that support scheduled/periodic reporting.

/**
 * Interface for reporters that support scheduled/periodic reporting.
 *
 * <p>Declares the single callback {@link #report()}; who invokes it and how often
 * is not shown in this listing.
 */
public interface Scheduled {
    void report();
}

Configuration

MetricConfig

Configuration container for metric reporters.

/**
 * Configuration container for metric reporters.
 *
 * <p>Declaration-only API listing. Offers typed getters that take a default value,
 * matching typed setters, and access to the backing {@code Properties}.
 */
public class MetricConfig {
    // Key names for a reporter's class and reporting interval.
    public static final String REPORTER_CLASS = "class";
    public static final String REPORTER_INTERVAL = "interval";
    
    // Typed getters; each takes the value to fall back on as second argument.
    public String getString(String key, String defaultValue);
    public int getInteger(String key, int defaultValue);
    public long getLong(String key, long defaultValue);
    public boolean getBoolean(String key, boolean defaultValue);
    
    public Properties getProperties();
    
    // Typed setters.
    public void setString(String key, String value);
    public void setInteger(String key, int value);
    public void setLong(String key, long value);
    public void setBoolean(String key, boolean value);
}

ReporterSetup

Configuration setup for individual metric reporters.

/**
 * Configuration setup for individual metric reporters.
 *
 * <p>Declaration-only API listing. Pairs a reporter name with its
 * {@code MetricConfig}; class name, factory class name and interval are exposed as
 * {@code Optional} values, i.e. each may be absent.
 */
public class ReporterSetup {
    public ReporterSetup(String name, MetricConfig configuration);
    
    public String getName();
    public MetricConfig getConfiguration();
    
    // Optional settings (unit of the interval is not stated here).
    public Optional<String> getClassName();
    public Optional<String> getFactoryClassName();
    public Optional<Long> getIntervalSettings();
}

Scope and Formatting

ScopeFormats

Defines scope format patterns for different component types.

/**
 * Defines scope format patterns for different component types.
 *
 * <p>Declaration-only API listing. Exposes one format accessor per component type
 * (JobManager, TaskManager, job, task, operator) and the placeholder strings that
 * may appear in a format.
 */
public class ScopeFormats {
    /** Static factory: builds the formats from a Flink {@code Configuration}. */
    public static final ScopeFormats fromConfig(Configuration config);
    
    // Per-component format accessors.
    public String[] getJobManagerFormat();
    public String[] getTaskManagerFormat(); 
    public String[] getJobFormat();
    public String[] getTaskFormat();
    public String[] getOperatorFormat();
    
    public String getJobManagerScope(Configuration config, String hostname, String jmId);
    public String getTaskManagerScope(Configuration config, String hostname, String tmId);
    
    // Format variables
    // Each placeholder is written in angle brackets.
    public static final String SCOPE_HOST = "<host>";
    public static final String SCOPE_TASKMANAGER_ID = "<tm_id>";
    public static final String SCOPE_JOB_ID = "<job_id>";
    public static final String SCOPE_JOB_NAME = "<job_name>";
    public static final String SCOPE_TASK_VERTEX_ID = "<task_id>";
    public static final String SCOPE_TASK_NAME = "<task_name>";
    public static final String SCOPE_TASK_SUBTASK_INDEX = "<subtask_index>";
    public static final String SCOPE_TASK_ATTEMPT_ID = "<task_attempt_id>";
    public static final String SCOPE_TASK_ATTEMPT_NUM = "<task_attempt_num>";
    public static final String SCOPE_OPERATOR_ID = "<operator_id>";
    public static final String SCOPE_OPERATOR_NAME = "<operator_name>";
}

CharacterFilter

Interface for filtering characters in metric names and identifiers.

/**
 * Interface for filtering characters in metric names and identifiers.
 *
 * <p>Single-method interface, so filters can be written as lambdas.
 */
public interface CharacterFilter {
    /** Maps {@code input} to its filtered form. */
    String filterCharacters(String input);
    
    /** Filter that returns its input unchanged. */
    CharacterFilter NO_OP_FILTER = input -> input;
}

Usage Examples

Basic Metrics Registration

import org.apache.flink.metrics.*;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;

// Create metrics registry
MetricRegistryConfiguration config = MetricRegistryConfiguration.defaultMetricRegistryConfiguration();
MetricRegistry metricRegistry = MetricRegistry.create(config);

// Everything that uses the registry runs inside try/finally so the registry is
// closed even when record processing throws.
try {
    // Create metric group
    ComponentMetricGroup rootGroup = new ComponentMetricGroup(metricRegistry, "MyApplication");
    MetricGroup jobGroup = rootGroup.addGroup("job", "data-processing");

    // Register different metric types
    Counter recordsProcessed = jobGroup.counter("records_processed");
    Gauge<Long> memoryUsage = jobGroup.gauge("memory_usage", () -> Runtime.getRuntime().totalMemory());
    // The meter wraps recordsProcessed and derives its rate from that counter.
    Meter throughput = jobGroup.meter("throughput", new MeterView(recordsProcessed, 60));
    Histogram latency = jobGroup.histogram("latency", new DescriptiveStatisticsHistogram(1000));

    // Use metrics in application
    for (int i = 0; i < 1000; i++) {
        // Taken per record so the histogram receives per-record latency.
        long startTime = System.currentTimeMillis();

        // Process record
        processRecord();

        // Update metrics. Count each record exactly once: MeterView#markEvent()
        // increments the wrapped counter itself, so calling both
        // recordsProcessed.inc() and throughput.markEvent() would double-count.
        recordsProcessed.inc();
        latency.update(System.currentTimeMillis() - startTime);
    }
} finally {
    // Clean up
    metricRegistry.close();
}

Custom Metric Reporter

import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.*;

/**
 * Example {@link MetricReporter} that keeps every registered metric in memory and
 * prints a snapshot of all of them to standard output on a fixed schedule.
 *
 * <p>Settings read in {@link #open(MetricConfig)}:
 * <ul>
 *   <li>{@code endpoint} - default {@code localhost:8080}; only echoed to the console</li>
 *   <li>{@code interval} - reporting period in seconds, default 10</li>
 * </ul>
 *
 * <p>The reporter owns its scheduler thread: it is started in {@code open} and
 * stopped in {@link #close()}.
 */
public class CustomMetricReporter implements MetricReporter {
    // Keyed by the fully scoped metric identifier; concurrent because metrics are
    // added/removed by callers while the scheduler thread iterates.
    private final Map<String, Metric> metrics = new ConcurrentHashMap<>();

    // Created in open(), shut down in close(); null until open() has run.
    private ScheduledExecutorService scheduler;

    /**
     * Reads the reporter settings and starts the periodic reporting task.
     *
     * @param config reporter configuration supplying {@code endpoint} and {@code interval}
     */
    @Override
    public void open(MetricConfig config) {
        String endpoint = config.getString("endpoint", "localhost:8080");
        long intervalSeconds = config.getLong("interval", 10L);

        System.out.println("Opening custom reporter with endpoint: " + endpoint);

        // Start reporting thread. The executor is kept in a field so close() can stop it.
        scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(this::report, 0, intervalSeconds, TimeUnit.SECONDS);
    }

    /** Stores the metric under its fully scoped identifier. */
    @Override
    public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
        String fullName = group.getMetricIdentifier(metricName);
        metrics.put(fullName, metric);
        System.out.println("Added metric: " + fullName);
    }

    /** Drops the metric stored under its fully scoped identifier. */
    @Override
    public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
        String fullName = group.getMetricIdentifier(metricName);
        metrics.remove(fullName);
        System.out.println("Removed metric: " + fullName);
    }

    /** Prints one line per known metric, formatted according to the metric's type. */
    private void report() {
        System.out.println("=== Custom Metric Report ===");
        for (Map.Entry<String, Metric> entry : metrics.entrySet()) {
            String name = entry.getKey();
            Metric metric = entry.getValue();

            // An exception escaping a scheduleAtFixedRate task suppresses all later
            // runs, so a single failing metric must not propagate out of report().
            try {
                if (metric instanceof Counter) {
                    Counter counter = (Counter) metric;
                    System.out.println(name + " (Counter): " + counter.getCount());
                } else if (metric instanceof Gauge) {
                    Gauge<?> gauge = (Gauge<?>) metric;
                    System.out.println(name + " (Gauge): " + gauge.getValue());
                } else if (metric instanceof Meter) {
                    Meter meter = (Meter) metric;
                    System.out.println(name + " (Meter): " + meter.getRate() + " events/sec");
                } else if (metric instanceof Histogram) {
                    Histogram histogram = (Histogram) metric;
                    HistogramStatistics stats = histogram.getStatistics();
                    System.out.println(name + " (Histogram): count=" + histogram.getCount() + 
                                     ", mean=" + stats.getMean() + ", max=" + stats.getMax());
                }
            } catch (RuntimeException e) {
                System.out.println("Failed to report metric " + name + ": " + e);
            }
        }
    }

    /** Stops the reporting thread, if one was started. */
    @Override
    public void close() {
        System.out.println("Closing custom reporter");
        if (scheduler != null) {
            scheduler.shutdownNow();
            scheduler = null;
        }
    }
}

Metrics Configuration

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;

// Configure metrics system
Configuration config = new Configuration();

// Enable metrics
config.setString(MetricOptions.REPORTERS_LIST, "prometheus,slf4j");

// Configure Prometheus reporter
config.setString("metrics.reporter.prometheus.class", "org.apache.flink.metrics.prometheus.PrometheusReporter");
config.setInteger("metrics.reporter.prometheus.port", 9249);

// Configure SLF4J reporter
config.setString("metrics.reporter.slf4j.class", "org.apache.flink.metrics.slf4j.Slf4jReporter");
config.setString("metrics.reporter.slf4j.interval", "10 SECONDS");

// Configure scope formats
config.setString(MetricOptions.SCOPE_NAMING_JM, "<host>.jobmanager.<jm_id>");
config.setString(MetricOptions.SCOPE_NAMING_TM, "<host>.taskmanager.<tm_id>");
config.setString(MetricOptions.SCOPE_NAMING_JOB, "<host>.jobmanager.<jm_id>.<job_name>");
config.setString(MetricOptions.SCOPE_NAMING_TASK, "<host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>");

// Set metric exclusions
config.setString("metrics.reporter.prometheus.excludes", "*.taskmanager.Status.JVM.CPU.*;*.taskmanager.Status.JVM.Memory.Heap.Max");

// Create registry with configuration
MetricRegistryConfiguration registryConfig = MetricRegistryConfiguration.fromConfiguration(config);
MetricRegistry metricRegistry = MetricRegistry.create(registryConfig);

Task-Level Metrics

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Gauge;

/**
 * Example {@code ProcessFunction} that upper-cases non-empty strings and publishes
 * counters, a latency histogram and gauges through the runtime metric group.
 */
public class MetricsAwareProcessFunction extends ProcessFunction<String, String> {
    // Metric handles are transient and (re)created in open().
    private transient Counter recordsProcessed;
    private transient Counter recordsFiltered;
    private transient Histogram processingLatency;
    private transient Gauge<Long> currentBacklog;

    // Written after every element in processElement(), read by the backlog gauge.
    private volatile long backlogSize = 0;

    /** Registers all metrics on the metric group obtained from the runtime context. */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        MetricGroup metrics = getRuntimeContext().getMetricGroup();

        // Counters
        recordsProcessed = metrics.counter("records_processed");
        recordsFiltered = metrics.counter("records_filtered");

        // Latency distribution in milliseconds
        processingLatency =
            metrics.histogram("processing_latency_ms", new DescriptiveStatisticsHistogram(10000));

        // Gauge exposing the latest backlog estimate
        currentBacklog = metrics.gauge("current_backlog", () -> backlogSize);

        // Used (total minus free) memory, scaled down by 1024 twice
        metrics.gauge("memory_usage_mb", () -> {
            Runtime runtime = Runtime.getRuntime();
            return (runtime.totalMemory() - runtime.freeMemory()) / 1024 / 1024;
        });
    }

    /**
     * Counts empty values as filtered and emits nothing for them; every other value
     * is transformed, emitted and counted as processed. Latency and the backlog
     * estimate are updated for every element, including filtered or failing ones.
     */
    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        final long startedAtMillis = System.currentTimeMillis();

        try {
            if (shouldFilter(value)) {
                recordsFiltered.inc();
            } else {
                out.collect(processRecord(value));
                recordsProcessed.inc();
            }
        } finally {
            processingLatency.update(System.currentTimeMillis() - startedAtMillis);
            backlogSize = estimateBacklogSize();
        }
    }

    /** Filtering rule: empty strings are dropped. */
    private boolean shouldFilter(String value) {
        return value.isEmpty();
    }

    /** Transformation applied to every record that is kept. */
    private String processRecord(String value) {
        return value.toUpperCase();
    }

    /** Placeholder estimate; always zero in this example. */
    private long estimateBacklogSize() {
        return 0; // Simplified for example
    }
}

Operator Metrics

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/**
 * Example stream operator that lower-cases incoming strings and publishes input and
 * output counters, an input-rate meter, a processing-time histogram and a busy-time
 * gauge through the operator metric group.
 */
public class MetricsAwareOperator extends AbstractStreamOperator<String> 
    implements OneInputStreamOperator<String, String> {
    
    // Metric handles are transient and (re)created in open().
    private transient Counter inputRecords;
    private transient Counter outputRecords;
    private transient Meter inputRate;
    private transient Histogram processingTime;
    
    /** Registers all operator metrics on the operator's metric group. */
    @Override
    public void open() throws Exception {
        super.open();
        
        // Get operator metric group
        MetricGroup operatorGroup = getMetricGroup();
        
        // Register operator metrics
        inputRecords = operatorGroup.counter("input_records");
        outputRecords = operatorGroup.counter("output_records");
        // The meter wraps inputRecords and derives its rate from that counter.
        inputRate = operatorGroup.meter("input_rate", new MeterView(inputRecords));
        processingTime = operatorGroup.histogram("processing_time_ns",
            new DescriptiveStatisticsHistogram(1000));
        
        // Add operator-specific metrics
        operatorGroup.gauge("operator_busy_time_per_sec", () -> calculateBusyTime());
    }
    
    /**
     * Lower-cases the element's value and emits it with the original timestamp.
     * The processing time is recorded in nanoseconds whether or not emission succeeds.
     */
    @Override
    public void processElement(StreamRecord<String> element) throws Exception {
        long startTime = System.nanoTime();
        
        // Count each input exactly once. MeterView#markEvent() increments the wrapped
        // counter itself, so calling inputRate.markEvent() in addition to
        // inputRecords.inc() would double-count; the meter picks the event up
        // through the shared counter.
        inputRecords.inc();
        
        try {
            // Process element
            String result = element.getValue().toLowerCase();
            
            // Emit result
            output.collect(new StreamRecord<>(result, element.getTimestamp()));
            outputRecords.inc();
            
        } finally {
            // Track processing time
            long duration = System.nanoTime() - startTime;
            processingTime.update(duration);
        }
    }
    
    /** Placeholder busy-time figure; always 0.0 in this example. */
    private double calculateBusyTime() {
        // Calculate operator busy time percentage
        return 0.0; // Simplified for example
    }
}

Common Patterns

Metric Lifecycle Management

/**
 * Owns a {@code MetricRegistry} plus the metric groups created through it, and
 * releases all of them on {@link #close()}.
 *
 * <p>Each distinct path passed to {@link #createGroup(String...)} gets its own root
 * {@code ComponentMetricGroup}. Closing goes through that root, because
 * {@code close()} is declared on {@code AbstractMetricGroup}, not on the
 * {@code MetricGroup} interface handed out to callers.
 */
public class MetricsManager implements AutoCloseable {
    private final MetricRegistry registry;
    // Keyed by the dot-joined path.
    private final Map<String, GroupHandle> groups = new ConcurrentHashMap<>();

    /**
     * Creates the registry from {@code config} and starts its reporters.
     *
     * @param config configuration used for both the registry and the reporters
     */
    public MetricsManager(Configuration config) {
        MetricRegistryConfiguration registryConfig = MetricRegistryConfiguration.fromConfiguration(config);
        this.registry = MetricRegistry.create(registryConfig);
        registry.startReporters(config);
    }

    /**
     * Returns the group for {@code path}, creating the chain of groups on first use.
     *
     * @param path group names from root to leaf; must contain at least one element
     * @return the leaf group for the path (the same instance on repeated calls)
     * @throws IllegalArgumentException if {@code path} is null or empty
     */
    public MetricGroup createGroup(String... path) {
        if (path == null || path.length == 0) {
            throw new IllegalArgumentException("path must contain at least one element");
        }
        String groupKey = String.join(".", path);
        return groups.computeIfAbsent(groupKey, k -> {
            ComponentMetricGroup root = new ComponentMetricGroup(registry, path[0]);
            MetricGroup current = root;
            for (int i = 1; i < path.length; i++) {
                current = current.addGroup(path[i]);
            }
            return new GroupHandle(root, current);
        }).leaf;
    }

    /**
     * Closes and forgets the group registered for {@code path}; a no-op when no such
     * group exists.
     *
     * @param path the same path that was passed to {@link #createGroup(String...)}
     */
    public void removeGroup(String... path) {
        String groupKey = String.join(".", path);
        GroupHandle handle = groups.remove(groupKey);
        if (handle != null) {
            handle.root.close();
        }
    }

    /** Closes every managed group, then stops the reporters and closes the registry. */
    @Override
    public void close() {
        groups.values().forEach(handle -> handle.root.close());
        groups.clear();

        registry.stopReporters();
        registry.close();
    }

    /** Pairs the closeable root of a path with the leaf group handed out to callers. */
    private static final class GroupHandle {
        private final AbstractMetricGroup root;
        private final MetricGroup leaf;

        private GroupHandle(AbstractMetricGroup root, MetricGroup leaf) {
            this.root = root;
            this.leaf = leaf;
        }
    }
}

Conditional Metric Registration

/**
 * Factory for metrics that can be switched off through configuration.
 *
 * <p>The flag is read once from the {@code metrics.enabled} key (default
 * {@code true}). While disabled, callers receive unregistered no-op metrics.
 */
public class ConditionalMetrics {
    private final MetricGroup metricGroup;
    private final boolean metricsEnabled;

    /**
     * @param metricGroup group on which real metrics are registered
     * @param config source of the {@code metrics.enabled} flag
     */
    public ConditionalMetrics(MetricGroup metricGroup, Configuration config) {
        this.metricGroup = metricGroup;
        this.metricsEnabled = config.getBoolean("metrics.enabled", true);
    }

    /** Returns a counter registered under {@code name}, or a fresh no-op counter when disabled. */
    public Counter createCounter(String name) {
        if (!metricsEnabled) {
            return new NoOpCounter();
        }
        return metricGroup.counter(name);
    }

    /**
     * Returns a gauge registered under {@code name} and backed by {@code valueSupplier},
     * or a fresh no-op gauge when disabled.
     */
    public <T> Gauge<T> createGauge(String name, Supplier<T> valueSupplier) {
        if (!metricsEnabled) {
            return new NoOpGauge<>();
        }
        Gauge<T> suppliedGauge = Gauge.of(valueSupplier);
        return metricGroup.gauge(name, suppliedGauge);
    }

    /** Counter that ignores increments and always reports zero. */
    private static class NoOpCounter implements Counter {
        @Override
        public void inc() {
        }

        @Override
        public void inc(long n) {
        }

        @Override
        public long getCount() {
            return 0;
        }
    }

    /** Gauge that always reports {@code null}. */
    private static class NoOpGauge<T> implements Gauge<T> {
        @Override
        public T getValue() {
            return null;
        }
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-runtime-2-10

docs

data-exchange.md

execution-graph.md

high-availability.md

index.md

job-management.md

message-passing.md

metrics.md

mini-cluster.md

rpc-framework.md

state-management.md

task-execution.md

tile.json