
tessl/maven-org-apache-flink--flink-metrics-core

Core metrics interfaces and implementations for Apache Flink stream processing framework


Specialized Metric Groups

Component-specific metric groups tailored for different parts of the Flink runtime. These specialized interfaces provide context-aware metric organization and expose relevant sub-groups for specific Flink components like operators, sources, sinks, and coordinators.

Capabilities

OperatorMetricGroup Interface

Special metric group representing Flink operators, providing access to I/O-specific metrics through a dedicated sub-group.

/**
 * Special MetricGroup representing an Operator.
 * You should only update the metrics in the main operator thread.
 */
@PublicEvolving
public interface OperatorMetricGroup extends MetricGroup {
    /**
     * Returns the I/O metric group for this operator.
     * @return OperatorIOMetricGroup for I/O metrics
     */
    OperatorIOMetricGroup getIOMetricGroup();
}

Usage Examples:

// In an operator implementation
public class MyMapOperator extends AbstractStreamOperator<String> {
    private Counter processedRecords;
    private Counter droppedRecords;
    private Gauge<Integer> bufferSize;
    
    @Override
    public void open() throws Exception {
        super.open();
        
        // Get operator-specific metric group
        OperatorMetricGroup operatorMetrics = getRuntimeContext().getMetricGroup();
        
        // Register operator-level metrics
        processedRecords = operatorMetrics.counter("records-processed");
        droppedRecords = operatorMetrics.counter("records-dropped");
        bufferSize = operatorMetrics.gauge("buffer-size", () -> internalBuffer.size());
        
        // Access I/O metrics sub-group
        OperatorIOMetricGroup ioMetrics = operatorMetrics.getIOMetricGroup();
        // I/O metrics are typically managed by the runtime, but can be accessed here
    }
    
    public void processElement(StreamRecord<String> element) {
        String value = element.getValue();
        
        if (shouldProcess(value)) {
            // Process element...
            processedRecords.inc();
        } else {
            droppedRecords.inc();
        }
    }
}

OperatorIOMetricGroup Interface

Metric group dedicated to operator I/O metrics, exposing pre-defined counters for input and output records and bytes. Each counter also feeds a corresponding per-second rate meter.

/**
 * Metric group that contains shareable pre-defined IO-related metrics for operators.
 * You should only update the metrics in the main operator thread.
 */
public interface OperatorIOMetricGroup extends MetricGroup {
    /**
     * The total number of input records since the operator started.
     * Will also populate numRecordsInPerSecond meter.
     */
    Counter getNumRecordsInCounter();

    /**
     * The total number of output records since the operator started.
     * Will also populate numRecordsOutPerSecond meter.
     */
    Counter getNumRecordsOutCounter();

    /**
     * The total number of input bytes since the task started.
     * Will also populate numBytesInPerSecond meter.
     */
    Counter getNumBytesInCounter();

    /**
     * The total number of output bytes since the task started.
     * Will also populate numBytesOutPerSecond meter.
     */
    Counter getNumBytesOutCounter();
}
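Each of these counters "also populates" a per-second meter; in Flink this derivation is done MeterView-style, by sampling the counter once per second into a ring buffer. A minimal stand-alone sketch of that mechanism (RateMeter is a local stand-in, not the Flink class):

```java
// Sketch of how a per-second meter (e.g. numRecordsInPerSecond) can be derived
// from a monotonically increasing counter, in the style of Flink's MeterView.
// RateMeter is a local stand-in, not a Flink class.
class RateMeter {
    private final long[] samples;     // ring buffer of counter readings, one per second
    private int idx;                  // position of the newest sample
    private final int timeSpanSeconds;

    RateMeter(int timeSpanSeconds) {
        this.timeSpanSeconds = timeSpanSeconds;
        this.samples = new long[timeSpanSeconds + 1];
    }

    /** Called once per second by a background updater with the current counter value. */
    void update(long counterValue) {
        idx = (idx + 1) % samples.length;
        samples[idx] = counterValue;
    }

    /** Rate over the time span: (newest sample - oldest sample) / span. */
    double getRate() {
        long oldest = samples[(idx + 1) % samples.length];
        return (samples[idx] - oldest) / (double) timeSpanSeconds;
    }
}
```

With a 5-second span and a counter growing by 10 per second, getRate() settles at 10.0.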

Usage Examples:

// Accessing I/O metrics (typically read-only from operator code)
public class StreamingOperator extends AbstractStreamOperator<Output> {
    
    @Override
    public void open() throws Exception {
        super.open();
        
        OperatorMetricGroup operatorGroup = getRuntimeContext().getMetricGroup();
        OperatorIOMetricGroup ioGroup = operatorGroup.getIOMetricGroup();
        
        // Access pre-defined I/O counters provided by Flink runtime
        Counter recordsIn = ioGroup.getNumRecordsInCounter();
        Counter recordsOut = ioGroup.getNumRecordsOutCounter();
        Counter bytesIn = ioGroup.getNumBytesInCounter();
        Counter bytesOut = ioGroup.getNumBytesOutCounter();
        
        // These counters are automatically maintained by Flink runtime
        // and also populate corresponding rate meters
        
        // You can still add custom I/O-related metrics
        ioGroup.gauge("custom-io-metric", () -> getCustomIOValue());
    }
}

SourceReaderMetricGroup Interface

Specialized metric group for source readers in the new source API, providing context for source-specific metrics.

/**
 * Pre-defined metrics for SourceReader.
 * You should only update the metrics in the main operator thread.
 */
public interface SourceReaderMetricGroup extends OperatorMetricGroup {
    /** The total number of records that failed to be consumed, processed, or emitted. */
    Counter getNumRecordsInErrorsCounter();

    /**
     * Sets an optional gauge for the number of bytes that have not been fetched by the source.
     * e.g. the remaining bytes in a file after the file descriptor reading position.
     * 
     * Note that not every source can report this metric in a plausible and efficient way.
     */
    void setPendingBytesGauge(Gauge<Long> pendingBytesGauge);

    /**
     * Sets an optional gauge for the number of records that have not been fetched by the source.
     * e.g. the available records after the consumer offset in a Kafka partition.
     * 
     * Note that not every source can report this metric in a plausible and efficient way.
     */
    void setPendingRecordsGauge(Gauge<Long> pendingRecordsGauge);
}
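The value supplied to setPendingRecordsGauge is typically a backlog computed from source-specific offsets. An illustrative stand-alone sketch for a Kafka-style source (PendingRecords is a hypothetical helper, not part of the Flink API): per partition, pending = log-end offset minus the next offset the reader will fetch.

```java
import java.util.Map;

// Illustrative backlog computation for a Kafka-style source's pending-records
// gauge: per partition, pending = log-end offset - next offset to fetch.
// PendingRecords is a hypothetical helper, not part of the Flink API.
class PendingRecords {
    static long compute(Map<String, Long> endOffsets, Map<String, Long> nextOffsets) {
        long pending = 0;
        for (Map.Entry<String, Long> e : endOffsets.entrySet()) {
            long next = nextOffsets.getOrDefault(e.getKey(), 0L);
            pending += Math.max(0L, e.getValue() - next); // clamp in case of stale end offsets
        }
        return pending;
    }
}
```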

Usage Examples:

// In a source reader implementation
public class KafkaSourceReader implements SourceReader<Record, KafkaPartitionSplit> {
    private SourceReaderMetricGroup readerMetrics;
    private Counter recordsRead;
    private Counter bytesRead;
    private Gauge<Integer> pendingSplits;
    private Histogram recordSize;
    
    public void initialize(SourceReaderContext context) {
        this.readerMetrics = context.getMetricGroup();
        
        // Access pre-defined error counter
        Counter errorCounter = readerMetrics.getNumRecordsInErrorsCounter();
        
        // Set optional pending gauges if source can provide these metrics
        if (canTrackPendingBytes()) {
            readerMetrics.setPendingBytesGauge(() -> calculatePendingBytes());
        }
        if (canTrackPendingRecords()) {
            readerMetrics.setPendingRecordsGauge(() -> calculatePendingRecords());
        }
        
        // Register additional custom source reader metrics
        recordsRead = readerMetrics.counter("records-read");
        bytesRead = readerMetrics.counter("bytes-read");
        pendingSplits = readerMetrics.gauge("pending-splits", () -> splitQueue.size());
        recordSize = readerMetrics.histogram("record-size", new MyHistogram());
    }
    
    public InputStatus pollNext(ReaderOutput<Record> output) {
        Record record = pollRecord();
        if (record != null) {
            output.collect(record);
            recordsRead.inc();
            bytesRead.inc(record.sizeInBytes());
            recordSize.update(record.sizeInBytes());
            return InputStatus.MORE_AVAILABLE;
        }
        return InputStatus.NOTHING_AVAILABLE;
    }
}

SinkWriterMetricGroup Interface

Metric group for sink writers, enabling tracking of writing performance and success rates.

/**
 * Pre-defined metrics for sinks.
 * You should only update the metrics in the main operator thread.
 */
public interface SinkWriterMetricGroup extends OperatorMetricGroup {
    /** The total number of records that failed to send. */
    Counter getNumRecordsOutErrorsCounter();

    /**
     * The total number of records that failed to send.
     * This metric has the same value as numRecordsOutErrors.
     */
    Counter getNumRecordsSendErrorsCounter();

    /**
     * The total number of records that have been sent to the downstream system.
     * This metric has the same value as the operator's numRecordsOut.
     * Note: this counter counts all records the SinkWriter sends.
     */
    Counter getNumRecordsSendCounter();

    /**
     * The total number of output send bytes since the task started.
     * This metric has the same value as numBytesOut of the operator.
     */
    Counter getNumBytesSendCounter();

    /**
     * Sets an optional gauge for the time it takes to send the last record.
     * This metric is an instantaneous value recorded for the last processed record.
     */
    void setCurrentSendTimeGauge(Gauge<Long> currentSendTimeGauge);
}
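setCurrentSendTimeGauge expects an instantaneous value: the duration of the most recent send, not an average. A stand-alone sketch of that pattern (Supplier stands in for Flink's Gauge&lt;Long&gt; so the snippet compiles without Flink on the classpath):

```java
import java.util.function.Supplier;

// The "instantaneous value" pattern behind setCurrentSendTimeGauge: the writer
// records the duration of the most recent send into a volatile field, and the
// gauge simply reads that field. Supplier stands in for Flink's Gauge<Long>.
class SendTimeTracker {
    private volatile long lastSendTimeMillis = -1L; // -1 until the first record is sent

    void recordSend(long durationMillis) {
        lastSendTimeMillis = durationMillis;
    }

    /** What would be handed to setCurrentSendTimeGauge(...). */
    Supplier<Long> currentSendTimeGauge() {
        return () -> lastSendTimeMillis;
    }
}
```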

Usage Examples:

// In a sink writer implementation
public class DatabaseSinkWriter implements SinkWriter<Record> {
    private SinkWriterMetricGroup writerMetrics;
    private Counter recordsWritten;
    private Counter writesSucceeded;
    private Counter writesFailed;
    private Histogram writeLatency;
    private Meter writeRate;
    
    public void initialize(SinkWriter.Context context) {
        this.writerMetrics = context.getMetricGroup();
        
        // Access pre-defined sink writer counters
        Counter outErrors = writerMetrics.getNumRecordsOutErrorsCounter();
        Counter sendErrors = writerMetrics.getNumRecordsSendErrorsCounter();
        Counter recordsSent = writerMetrics.getNumRecordsSendCounter();
        Counter bytesSent = writerMetrics.getNumBytesSendCounter();
        
        // Set optional send time gauge if sink can provide this metric
        if (canTrackSendTime()) {
            writerMetrics.setCurrentSendTimeGauge(() -> lastSendTimeMillis);
        }
        
        // Register additional custom sink writer metrics
        // (every counter used in write() must be registered here, or inc() throws an NPE)
        recordsWritten = writerMetrics.counter("records-written");
        writesSucceeded = writerMetrics.counter("writes-succeeded");
        writesFailed = writerMetrics.counter("writes-failed");
        writeLatency = writerMetrics.histogram("write-latency", new LatencyHistogram());
        writeRate = writerMetrics.meter("write-rate", new MeterView(30));
        
        // Connection-specific metrics
        MetricGroup connectionGroup = writerMetrics.addGroup("connection");
        connectionGroup.gauge("pool-size", () -> connectionPool.getActiveConnections());
        connectionGroup.gauge("pool-utilization", () -> connectionPool.getUtilization());
    }
    
    @Override
    public void write(Record record, Context context) {
        long startTime = System.currentTimeMillis();
        
        try {
            database.write(record);
            recordsWritten.inc();
            writesSucceeded.inc();
            writeRate.markEvent();
            
            long latency = System.currentTimeMillis() - startTime;
            writeLatency.update(latency);
            
        } catch (Exception e) {
            writesFailed.inc();
            throw new RuntimeException("Failed to write record", e);
        }
    }
}

SinkCommitterMetricGroup Interface

Metric group for sink committers, tracking commit operations and success rates in two-phase commit scenarios.

/**
 * Pre-defined metrics for sinks.
 * You should only update the metrics in the main operator thread.
 */
public interface SinkCommitterMetricGroup extends OperatorMetricGroup {
    /** The total number of committables received. */
    Counter getNumCommittablesTotalCounter();

    /** The total number of committable failures. */
    Counter getNumCommittablesFailureCounter();

    /** The total number of committable retries. */
    Counter getNumCommittablesRetryCounter();

    /** The total number of successful committables. */
    Counter getNumCommittablesSuccessCounter();

    /** The total number of already-committed committables. */
    Counter getNumCommittablesAlreadyCommittedCounter();

    /** Sets a gauge for the number of currently pending committables. */
    void setCurrentPendingCommittablesGauge(Gauge<Integer> currentPendingCommittablesGauge);
}

Usage Examples:

// In a sink committer implementation
public class TransactionalSinkCommitter implements SinkCommitter<CommitInfo> {
    private SinkCommitterMetricGroup committerMetrics;
    private Counter commitsAttempted;
    private Counter commitsSucceeded;
    private Counter commitsFailed;
    private Histogram commitLatency;
    private Gauge<Integer> pendingCommits;
    
    public void initialize(SinkCommitter.Context context) {
        this.committerMetrics = context.getMetricGroup();
        
        // Access pre-defined committer counters
        Counter totalCommittables = committerMetrics.getNumCommittablesTotalCounter();
        Counter failureCommittables = committerMetrics.getNumCommittablesFailureCounter();
        Counter retryCommittables = committerMetrics.getNumCommittablesRetryCounter();
        Counter successCommittables = committerMetrics.getNumCommittablesSuccessCounter();
        Counter alreadyCommitted = committerMetrics.getNumCommittablesAlreadyCommittedCounter();
        
        // Set optional pending committables gauge if committer can provide this metric
        if (canTrackPendingCommittables()) {
            committerMetrics.setCurrentPendingCommittablesGauge(() -> pendingCommittables.size());
        }
        
        // Register additional custom committer metrics
        // (every counter used in commit() must be registered here, or inc() throws an NPE)
        commitsAttempted = committerMetrics.counter("commits-attempted");
        commitsSucceeded = committerMetrics.counter("commits-succeeded");
        commitsFailed = committerMetrics.counter("commits-failed");
        commitLatency = committerMetrics.histogram("commit-latency", new LatencyHistogram());
        
        // Transaction-specific metrics
        MetricGroup txnGroup = committerMetrics.addGroup("transactions");
        txnGroup.counter("transactions-started");
        txnGroup.counter("transactions-committed");
        txnGroup.counter("transactions-aborted");
    }
    
    @Override
    public List<CommitInfo> commit(List<CommitInfo> commitInfos) {
        List<CommitInfo> failedCommits = new ArrayList<>();
        
        for (CommitInfo commitInfo : commitInfos) {
            long startTime = System.currentTimeMillis();
            commitsAttempted.inc();
            
            try {
                database.commit(commitInfo.getTransactionId());
                commitsSucceeded.inc();
                
                long latency = System.currentTimeMillis() - startTime;
                commitLatency.update(latency);
                
            } catch (Exception e) {
                commitsFailed.inc();
                failedCommits.add(commitInfo);
            }
        }
        
        return failedCommits; // Return failed commits for retry
    }
}

CacheMetricGroup Interface

Metric group for cache operations, enabling tracking of cache performance, hit rates, and memory usage. This interface provides pre-defined methods for registering cache-related metrics.

/**
 * Pre-defined metrics for cache.
 * Note that each of these methods should only be invoked once;
 * registering a metric with the same name multiple times leads to undefined behavior.
 */
public interface CacheMetricGroup extends MetricGroup {
    /** The number of cache hits. */
    void hitCounter(Counter hitCounter);

    /** The number of cache misses. */
    void missCounter(Counter missCounter);

    /** The number of times to load data into cache from external system. */
    void loadCounter(Counter loadCounter);

    /** The number of load failures. */
    void numLoadFailuresCounter(Counter numLoadFailuresCounter);

    /** The time spent for the latest load operation. */
    void latestLoadTimeGauge(Gauge<Long> latestLoadTimeGauge);

    /** The number of records in cache. */
    void numCachedRecordsGauge(Gauge<Long> numCachedRecordsGauge);

    /** The number of bytes used by cache. */
    void numCachedBytesGauge(Gauge<Long> numCachedBytesGauge);
}

Usage Examples:

// In a caching operator or function
public class CachingFunction extends RichMapFunction<Input, Output> {
    private CacheMetricGroup cacheMetrics;
    private Counter hits;
    private Counter misses;
    private Counter loads;
    private Counter loadFailures;
    private Gauge<Long> loadTime;
    private Gauge<Long> cachedRecords;
    private Gauge<Long> cachedBytes;
    
    @Override
    public void open(Configuration config) throws Exception {
        super.open(config);
        
        // Obtain a CacheMetricGroup. Note that MetricGroup#addGroup returns a
        // plain MetricGroup, so a CacheMetricGroup is typically provided by the
        // runtime or built by wrapping a regular group.
        // createCacheMetricGroup is a hypothetical helper doing that wrapping.
        this.cacheMetrics = createCacheMetricGroup(
                getRuntimeContext().getMetricGroup().addGroup("cache"));
        
        // Register cache metrics - each method should only be called once
        this.hits = new SimpleCounter();
        cacheMetrics.hitCounter(hits);
        
        this.misses = new SimpleCounter();
        cacheMetrics.missCounter(misses);
        
        this.loads = new SimpleCounter();
        cacheMetrics.loadCounter(loads);
        
        this.loadFailures = new SimpleCounter();
        cacheMetrics.numLoadFailuresCounter(loadFailures);
        
        // Register gauges for cache state
        cacheMetrics.latestLoadTimeGauge(() -> lastLoadTimeMillis);
        cacheMetrics.numCachedRecordsGauge(() -> cache.size());
        cacheMetrics.numCachedBytesGauge(() -> cache.getEstimatedSize());
    }
    
    @Override
    public Output map(Input input) throws Exception {
        Output cached = cache.get(input.getKey());
        if (cached != null) {
            hits.inc();
            return cached;
        }
        
        // Cache miss - load from external system
        misses.inc();
        long startTime = System.currentTimeMillis();
        
        try {
            loads.inc();
            Output result = loadFromExternalSystem(input);
            cache.put(input.getKey(), result);
            
            lastLoadTimeMillis = System.currentTimeMillis() - startTime;
            return result;
            
        } catch (Exception e) {
            loadFailures.inc();
            throw e;
        }
    }
}
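A common companion to the hit and miss counters is a derived hit-rate gauge. The calculation itself is simple; in this stand-alone sketch, plain longs stand in for Counter.getCount():

```java
// Hit rate derived from the hit and miss counts, e.g. for a custom "hit-rate"
// gauge registered next to the pre-defined cache metrics. Plain longs stand in
// for Counter.getCount() here.
class HitRate {
    static double of(long hits, long misses) {
        long total = hits + misses;
        return total > 0 ? (double) hits / total : 0.0; // report 0.0 before any lookups
    }
}
```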

SplitEnumeratorMetricGroup Interface

Metric group for split enumerators in the source connector API, tracking split assignment and discovery.

/**
 * Metric group for split enumerators. 
 */
public interface SplitEnumeratorMetricGroup extends MetricGroup {
    // Extends MetricGroup with split enumerator context
}

Usage Examples:

// In a split enumerator implementation
public class KafkaSplitEnumerator implements SplitEnumerator<KafkaPartitionSplit, KafkaState> {
    private SplitEnumeratorContext<KafkaPartitionSplit> context;
    private SplitEnumeratorMetricGroup enumeratorMetrics;
    private Counter splitsDiscovered;
    private Counter splitsAssigned;
    private Gauge<Integer> unassignedSplits;
    private Gauge<Integer> activeReaders;
    
    public void initialize(SplitEnumeratorContext<KafkaPartitionSplit> context) {
        this.context = context; // keep the context for later split assignment
        this.enumeratorMetrics = context.getMetricGroup();
        
        // Register enumerator metrics
        splitsDiscovered = enumeratorMetrics.counter("splits-discovered");
        splitsAssigned = enumeratorMetrics.counter("splits-assigned");
        unassignedSplits = enumeratorMetrics.gauge("unassigned-splits", () -> unassignedSplitQueue.size());
        activeReaders = enumeratorMetrics.gauge("active-readers", () -> readerStates.size());
        
        // Topic-specific metrics
        MetricGroup topicGroup = enumeratorMetrics.addGroup("topics");
        for (String topic : monitoredTopics) {
            MetricGroup tGroup = topicGroup.addGroup("topic", topic);
            tGroup.gauge("partitions", () -> getPartitionCount(topic));
            tGroup.counter("splits-for-topic");
        }
    }
    
    @Override
    public void handleSplitRequest(int subtaskId, String requesterHostname) {
        List<KafkaPartitionSplit> availableSplits = getAvailableSplits();
        
        if (!availableSplits.isEmpty()) {
            context.assignSplit(availableSplits.get(0), subtaskId);
            splitsAssigned.inc();
        }
    }
    
    @Override
    public void addSplitsBack(List<KafkaPartitionSplit> splits, int subtaskId) {
        unassignedSplitQueue.addAll(splits);
        // Splits are back in the pool for reassignment
    }
}

OperatorCoordinatorMetricGroup Interface

Metric group for operator coordinators, which manage coordination between parallel operator instances.

/**
 * Metric group for operator coordinators.
 */
public interface OperatorCoordinatorMetricGroup extends MetricGroup {
    // Extends MetricGroup with operator coordinator context
}

Usage Examples:

// In an operator coordinator implementation
public class CheckpointCoordinator implements OperatorCoordinator {
    private OperatorCoordinatorMetricGroup coordinatorMetrics;
    private Counter coordinationEvents;
    private Counter successfulCoordinations;
    private Counter failedCoordinations;
    private Gauge<Integer> pendingOperations;
    
    public void initialize(OperatorCoordinator.Context context) {
        this.coordinatorMetrics = context.getMetricGroup();
        
        // Register coordinator metrics
        coordinationEvents = coordinatorMetrics.counter("coordination-events");
        successfulCoordinations = coordinatorMetrics.counter("successful-coordinations");
        failedCoordinations = coordinatorMetrics.counter("failed-coordinations");
        pendingOperations = coordinatorMetrics.gauge("pending-operations", () -> operationQueue.size());
        
        // Per-subtask metrics
        MetricGroup subtaskGroup = coordinatorMetrics.addGroup("subtasks");
        for (int i = 0; i < context.currentParallelism(); i++) {
            // The loop variable is not effectively final, so copy it for the lambda
            final int subtaskIndex = i;
            MetricGroup stGroup = subtaskGroup.addGroup("subtask", String.valueOf(subtaskIndex));
            stGroup.counter("messages-sent");
            stGroup.counter("messages-received");
            stGroup.gauge("last-seen", () -> getLastSeenTime(subtaskIndex));
        }
    }
    
    @Override
    public void handleEventFromOperator(int subtask, OperatorEvent event) {
        coordinationEvents.inc();
        
        try {
            processCoordinationEvent(subtask, event);
            successfulCoordinations.inc();
        } catch (Exception e) {
            failedCoordinations.inc();
            throw new RuntimeException("Coordination failed", e);
        }
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-metrics-core
