tessl/maven-org-apache-flink--flink-metrics-core

Core metrics interfaces and implementations for the Apache Flink stream processing framework

Core Metric Types

Essential metric interfaces for measuring different aspects of system behavior within Flink applications. These core types provide the foundation for all monitoring and observability in the Flink metrics system.

Capabilities

Counter Interface

Metric for counting discrete events and maintaining running totals. Supports both increment and decrement operations. The interface itself makes no atomicity or thread-safety guarantee; that is left to the implementation.

/**
 * A Counter is a Metric that measures a count.
 */
public interface Counter extends Metric {
    /** Increment the current count by 1. */
    void inc();
    
    /**
     * Increment the current count by the given value.
     * @param n value to increment the current count by
     */
    void inc(long n);
    
    /** Decrement the current count by 1. */
    void dec();
    
    /**
     * Decrement the current count by the given value.
     * @param n value to decrement the current count by
     */
    void dec(long n);
    
    /**
     * Returns the current count.
     * @return current count
     */
    long getCount();
}

Usage Examples:

// Create and use a counter
Counter processedRecords = metricGroup.counter("records-processed");

// Increment operations
processedRecords.inc();           // +1
processedRecords.inc(10);         // +10

// Decrement operations  
processedRecords.dec();           // -1
processedRecords.dec(5);          // -5

// Read current value
long currentCount = processedRecords.getCount();
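
How an implementation might make these updates safe under concurrent access can be sketched as follows. This is a minimal illustration, not a Flink class: `SketchCounter` is a local stand-in that mirrors the Counter methods listed above so the snippet compiles without Flink on the classpath, and `AtomicSketchCounter` and `CounterSketchDemo` are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicLong;

// Local stand-in mirroring the Counter methods listed above (assumption:
// used only so this sketch compiles without Flink on the classpath).
interface SketchCounter {
    void inc();
    void inc(long n);
    void dec();
    void dec(long n);
    long getCount();
}

// Hypothetical thread-safe implementation: every update is one atomic add.
final class AtomicSketchCounter implements SketchCounter {
    private final AtomicLong count = new AtomicLong();

    @Override public void inc() { count.incrementAndGet(); }
    @Override public void inc(long n) { count.addAndGet(n); }
    @Override public void dec() { count.decrementAndGet(); }
    @Override public void dec(long n) { count.addAndGet(-n); }
    @Override public long getCount() { return count.get(); }
}

class CounterSketchDemo {
    // Starts `threads` workers that each call inc() `perThread` times,
    // waits for all of them, and returns the final count.
    static long countConcurrently(int threads, int perThread) {
        SketchCounter counter = new AtomicSketchCounter();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    counter.inc();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
        return counter.getCount();
    }

    public static void main(String[] args) {
        System.out.println(countConcurrently(4, 10_000)); // 40000
    }
}
```

A counter backed by a plain `long` field would be cheaper but could lose updates when several threads increment it at once, so the choice depends on whether the metric is touched from more than one thread.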

Gauge Interface

Metric for measuring instantaneous values that can fluctuate over time. Gauges are read-only from the metrics perspective and derive their values from application state.

/**
 * A Gauge is a Metric that calculates a specific value at a point in time.
 * @param <T> the type of the measured value
 */
public interface Gauge<T> extends Metric {
    /**
     * Calculates and returns the measured value.
     * @return calculated value
     */
    T getValue();
}

Usage Examples:

// Create gauge for queue size
Gauge<Integer> queueSizeGauge = new Gauge<Integer>() {
    @Override
    public Integer getValue() {
        return taskQueue.size();
    }
};
metricGroup.gauge("queue-size", queueSizeGauge);

// Create gauge for memory usage
Gauge<Double> memoryUsageGauge = () -> {
    Runtime runtime = Runtime.getRuntime();
    long used = runtime.totalMemory() - runtime.freeMemory();
    return (double) used / runtime.maxMemory();
};
metricGroup.gauge("memory-usage-ratio", memoryUsageGauge);

// Create gauge reporting a status string
Gauge<String> statusGauge = () -> processor.getCurrentStatus().name();
metricGroup.gauge("processor-status", statusGauge);
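
The point that a gauge derives its value from application state on every read, rather than storing a value, can be sketched in isolation. `SketchGauge` below is a local stand-in for the Gauge interface shown above, and `GaugeSketchDemo` and `taskQueue` are hypothetical names used only for illustration.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Local stand-in for the Gauge interface shown above (assumption: used only
// so this sketch compiles without Flink on the classpath).
interface SketchGauge<T> {
    T getValue();
}

class GaugeSketchDemo {
    public static void main(String[] args) {
        // Hypothetical application state observed by the gauge.
        Queue<String> taskQueue = new ArrayDeque<>();

        // The method reference binds the queue itself, not its current size,
        // so each getValue() call re-reads the live state.
        SketchGauge<Integer> queueSize = taskQueue::size;

        System.out.println(queueSize.getValue()); // 0
        taskQueue.add("a");
        taskQueue.add("b");
        System.out.println(queueSize.getValue()); // 2
        taskQueue.poll();
        System.out.println(queueSize.getValue()); // 1
    }
}
```

Because the value is recomputed on each call, `getValue()` is best kept cheap and free of side effects, and if it may be read from a different thread than the one mutating the state, the observed state should be safe to read concurrently.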

Meter Interface

Metric for measuring throughput and event rates over time. Exposes the current rate in events per second together with the cumulative event count.

/**
 * Metric for measuring throughput.
 */
public interface Meter extends Metric {
    /** Mark occurrence of an event. */
    void markEvent();
    
    /**
     * Mark occurrence of multiple events.
     * @param n number of events occurred
     */
    void markEvent(long n);
    
    /**
     * Returns the current rate of events per second.
     * @return current rate of events per second
     */
    double getRate();
    
    /**
     * Get number of events marked on the meter.
     * @return number of events marked on the meter
     */
    long getCount();
}

Usage Examples:

// Create a meter with 60-second time window
Meter throughputMeter = new MeterView(60);
metricGroup.meter("throughput", throughputMeter);

// Mark single events
throughputMeter.markEvent();

// Mark batch events
throughputMeter.markEvent(batchSize);

// Read current rate and total count
double currentRate = throughputMeter.getRate();    // events/second
long totalEvents = throughputMeter.getCount();     // total events marked
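
One way a per-second rate can be derived from a cumulative count over a time window is sketched below. This is a simplified illustration, not the source of `MeterView`: `SketchMeter` is a local stand-in mirroring the Meter methods above, `WindowedSketchMeter` is a hypothetical class, and the explicit `update()` call stands in for whatever scheduler would invoke it once per update interval. The sketch is not thread-safe.

```java
// Local stand-in mirroring the Meter methods listed above (assumption:
// used only so this sketch compiles without Flink on the classpath).
interface SketchMeter {
    void markEvent();
    void markEvent(long n);
    double getRate();
    long getCount();
}

// Hypothetical windowed meter. update() is assumed to be called once every
// updateIntervalSeconds by an external scheduler (not shown).
final class WindowedSketchMeter implements SketchMeter {
    private final int timeSpanSeconds;
    private final long[] snapshots; // circular buffer of cumulative counts
    private int position = 0;       // index of the most recent snapshot
    private long count = 0;
    private double rate = 0.0;

    WindowedSketchMeter(int timeSpanSeconds, int updateIntervalSeconds) {
        this.timeSpanSeconds = timeSpanSeconds;
        // One extra slot so the oldest and newest snapshots are a full window apart.
        this.snapshots = new long[timeSpanSeconds / updateIntervalSeconds + 1];
    }

    @Override public void markEvent() { count++; }
    @Override public void markEvent(long n) { count += n; }
    @Override public long getCount() { return count; }
    @Override public double getRate() { return rate; }

    // Stores the current count and recomputes the rate as
    // (newest snapshot - oldest snapshot) / window length in seconds.
    void update() {
        position = (position + 1) % snapshots.length;
        snapshots[position] = count;
        long oldest = snapshots[(position + 1) % snapshots.length];
        rate = (double) (count - oldest) / timeSpanSeconds;
    }
}

class MeterSketchDemo {
    public static void main(String[] args) {
        WindowedSketchMeter meter = new WindowedSketchMeter(10, 5);
        meter.markEvent(50);
        meter.update();
        System.out.println(meter.getRate()); // 5.0
        meter.markEvent(50);
        meter.update();
        System.out.println(meter.getRate()); // 10.0
        meter.update();                      // no new events in this interval
        System.out.println(meter.getRate()); // 5.0
    }
}
```

In this sketch the rate only changes when `update()` runs, and until a full window of snapshots has been collected the result is averaged over the whole window length, so early readings understate a steady event rate.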

Histogram Interface

Metric for recording value distributions and calculating statistical measures. Enables measurement of latencies, sizes, and other value distributions.

/**
 * Histogram interface to be used with Flink's metrics system.
 * The histogram allows to record values, get the current count of recorded 
 * values and create histogram statistics for the currently seen elements.
 */
public interface Histogram extends Metric {
    /**
     * Update the histogram with the given value.
     * @param value Value to update the histogram with
     */
    void update(long value);
    
    /**
     * Get the count of seen elements.
     * @return Count of seen elements
     */
    long getCount();
    
    /**
     * Create statistics for the currently recorded elements.
     * @return Statistics about the currently recorded elements
     */
    HistogramStatistics getStatistics();
}

Histogram Statistics:

/**
 * Histogram statistics represent the current snapshot of elements 
 * recorded in the histogram.
 */
public abstract class HistogramStatistics {
    /**
     * Returns the value for the given quantile based on the histogram statistics.
     * @param quantile Quantile to calculate the value for
     * @return Value for the given quantile
     */
    public abstract double getQuantile(double quantile);
    
    /**
     * Returns the elements of the statistics' sample.
     * @return Elements of the statistics' sample
     */
    public abstract long[] getValues();
    
    /**
     * Returns the size of the statistics' sample.
     * @return Size of the statistics' sample
     */
    public abstract int size();
    
    /**
     * Returns the mean of the histogram values.
     * @return Mean of the histogram values
     */
    public abstract double getMean();
    
    /**
     * Returns the standard deviation of the distribution.
     * @return Standard deviation of histogram distribution
     */
    public abstract double getStdDev();
    
    /**
     * Returns the maximum value of the histogram.
     * @return Maximum value of the histogram
     */
    public abstract long getMax();
    
    /**
     * Returns the minimum value of the histogram.
     * @return Minimum value of the histogram
     */
    public abstract long getMin();
}

Usage Examples:

// Create and use histogram
Histogram latencyHistogram = // ... custom histogram implementation
metricGroup.histogram("request-latency", latencyHistogram);

// Record values
latencyHistogram.update(45);      // 45ms latency
latencyHistogram.update(67);      // 67ms latency
latencyHistogram.update(23);      // 23ms latency

// Get statistics
HistogramStatistics stats = latencyHistogram.getStatistics();
double p95 = stats.getQuantile(0.95);        // 95th percentile
double mean = stats.getMean();               // average latency
double stdDev = stats.getStdDev();          // standard deviation
long max = stats.getMax();                  // maximum latency
long min = stats.getMin();                  // minimum latency
int sampleSize = stats.size();              // number of samples
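
The usage above leaves the histogram implementation open. A minimal sketch of what a custom one could look like follows, assuming a fixed-size sliding window, nearest-rank quantiles, and population standard deviation. `WindowSketchHistogram` and `SketchStatistics` are hypothetical local stand-ins that mirror the method shapes of Histogram and HistogramStatistics shown above; they do not extend the Flink types, so the snippet compiles on its own.

```java
import java.util.Arrays;

// Hypothetical sliding-window histogram keeping the last `capacity` values.
final class WindowSketchHistogram {
    private final long[] window;
    private long count = 0; // total update() calls, including overwritten values

    WindowSketchHistogram(int capacity) {
        this.window = new long[capacity];
    }

    void update(long value) {
        window[(int) (count % window.length)] = value; // overwrite the oldest slot
        count++;
    }

    long getCount() { return count; }

    // Copies the retained values so later updates do not alter the snapshot.
    SketchStatistics getStatistics() {
        int size = (int) Math.min(count, window.length);
        return new SketchStatistics(Arrays.copyOf(window, size));
    }
}

// Hypothetical immutable snapshot over a sorted copy of the sample.
final class SketchStatistics {
    private final long[] sorted;

    SketchStatistics(long[] values) {
        this.sorted = values.clone();
        Arrays.sort(this.sorted);
    }

    int size() { return sorted.length; }
    long getMin() { return sorted.length == 0 ? 0 : sorted[0]; }
    long getMax() { return sorted.length == 0 ? 0 : sorted[sorted.length - 1]; }
    double getMean() { return Arrays.stream(sorted).average().orElse(0.0); }

    // Population standard deviation of the sample.
    double getStdDev() {
        if (sorted.length == 0) {
            return 0.0;
        }
        double mean = getMean();
        double sumOfSquares = 0.0;
        for (long v : sorted) {
            sumOfSquares += (v - mean) * (v - mean);
        }
        return Math.sqrt(sumOfSquares / sorted.length);
    }

    // Nearest-rank quantile: the value at rank ceil(quantile * size).
    double getQuantile(double quantile) {
        if (sorted.length == 0) {
            return 0.0;
        }
        int rank = (int) Math.ceil(quantile * sorted.length);
        return sorted[Math.max(0, Math.min(sorted.length - 1, rank - 1))];
    }
}

class HistogramSketchDemo {
    public static void main(String[] args) {
        WindowSketchHistogram latency = new WindowSketchHistogram(100);
        latency.update(45);
        latency.update(67);
        latency.update(23);
        SketchStatistics stats = latency.getStatistics();
        System.out.println(stats.getMean());         // 45.0
        System.out.println(stats.getQuantile(0.95)); // 67.0
        System.out.println(stats.getMin());          // 23
    }
}
```

Note that in this design `getCount()` reports every value ever recorded while `size()` reports only the values still retained in the window, so the two can differ once the window has wrapped.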

Base Metric Interface

Common interface implemented by all metric types, providing metric type identification.

/**
 * Common super interface for all metrics.
 */
public interface Metric {
    /**
     * Returns the metric type. Default implementation throws 
     * UnsupportedOperationException for custom metric types.
     * @return MetricType enum value
     */
    default MetricType getMetricType() {
        throw new UnsupportedOperationException("Custom metric types are not supported.");
    }
}

Metric Type Enumeration

Enumeration defining the standard metric types supported by Flink.

/**
 * Enum describing the different metric types.
 */
public enum MetricType {
    COUNTER,
    METER, 
    GAUGE,
    HISTOGRAM
}

Usage Examples:

// Check metric type
if (someMetric.getMetricType() == MetricType.COUNTER) {
    Counter counter = (Counter) someMetric;
    long count = counter.getCount();
}

// Type-specific handling
switch (metric.getMetricType()) {
    case COUNTER:
        handleCounter((Counter) metric);
        break;
    case GAUGE:
        handleGauge((Gauge<?>) metric);
        break;
    case METER:
        handleMeter((Meter) metric);
        break;
    case HISTOGRAM:
        handleHistogram((Histogram) metric);
        break;
}
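
Because the default `getMetricType()` throws `UnsupportedOperationException`, code that dispatches on the type may want a fallback for custom metrics. A minimal sketch of that pattern follows. `SketchMetricType` and `SketchMetric` are local stand-ins mirroring the enum and interface shown above so the snippet compiles without Flink, and `MetricTypeSketch.describe` is a hypothetical helper.

```java
// Local stand-ins for the enum and interface shown above (assumption: used
// only so this sketch compiles without Flink on the classpath).
enum SketchMetricType { COUNTER, METER, GAUGE, HISTOGRAM }

interface SketchMetric {
    default SketchMetricType getMetricType() {
        throw new UnsupportedOperationException("Custom metric types are not supported.");
    }
}

class MetricTypeSketch {
    // Returns a label for the metric, or "CUSTOM" when the metric relies on
    // the default getMetricType() and therefore throws.
    static String describe(SketchMetric metric) {
        SketchMetricType type;
        try {
            type = metric.getMetricType();
        } catch (UnsupportedOperationException e) {
            return "CUSTOM";
        }
        switch (type) {
            case COUNTER:
                return "COUNTER";
            case METER:
                return "METER";
            case GAUGE:
                return "GAUGE";
            case HISTOGRAM:
                return "HISTOGRAM";
            default:
                return "UNKNOWN";
        }
    }

    public static void main(String[] args) {
        SketchMetric counterLike = new SketchMetric() {
            @Override
            public SketchMetricType getMetricType() {
                return SketchMetricType.COUNTER;
            }
        };
        SketchMetric custom = new SketchMetric() { };

        System.out.println(describe(counterLike)); // COUNTER
        System.out.println(describe(custom));      // CUSTOM
    }
}
```

Resolving the type once inside the `try` and switching on it afterwards keeps the fallback from also swallowing an `UnsupportedOperationException` thrown by the per-type handling itself.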
