Core metrics interfaces and implementations for Apache Flink stream processing framework
—
Component-specific metric groups tailored for different parts of the Flink runtime. These specialized interfaces provide context-aware metric organization and expose relevant sub-groups for specific Flink components like operators, sources, sinks, and coordinators.
Special metric group representing Flink operators, providing access to I/O-specific metrics through a dedicated sub-group.
/**
 * Special MetricGroup representing an Operator.
 *
 * <p>Instances are created and provided by the Flink runtime; user code obtains one from the
 * runtime context. You should only update the metrics in the main operator thread.
 */
@PublicEvolving
public interface OperatorMetricGroup extends MetricGroup {
/**
 * Returns the I/O metric group for this operator, which exposes the pre-defined
 * record/byte counters maintained by the runtime.
 *
 * @return OperatorIOMetricGroup for I/O metrics
 */
OperatorIOMetricGroup getIOMetricGroup();
}Usage Examples:
// In an operator implementation
public class MyMapOperator extends AbstractStreamOperator<String> {
    private Counter processedRecords;
    private Counter droppedRecords;
    private Gauge<Integer> bufferSize;

    @Override
    public void open() throws Exception {
        super.open();
        // The runtime context exposes the operator-scoped metric group.
        OperatorMetricGroup metrics = getRuntimeContext().getMetricGroup();
        // Custom operator-level metrics.
        processedRecords = metrics.counter("records-processed");
        droppedRecords = metrics.counter("records-dropped");
        bufferSize = metrics.gauge("buffer-size", () -> internalBuffer.size());
        // The dedicated I/O sub-group; its counters are typically maintained by the runtime.
        OperatorIOMetricGroup ioGroup = metrics.getIOMetricGroup();
    }

    public void processElement(StreamRecord<String> element) {
        String value = element.getValue();
        if (!shouldProcess(value)) {
            droppedRecords.inc();
            return;
        }
        // Process element...
        processedRecords.inc();
    }
}Metric group specifically for operator I/O metrics, tracking input and output throughput, backpressure, and buffer utilization.
/**
 * Metric group that contains shareable pre-defined IO-related metrics for operators.
 *
 * <p>The counters below are maintained by the Flink runtime. You should only update the
 * metrics in the main operator thread.
 */
public interface OperatorIOMetricGroup extends MetricGroup {
/**
 * The total number of input records since the operator started.
 * Will also populate the numRecordsInPerSecond meter.
 */
Counter getNumRecordsInCounter();
/**
 * The total number of output records since the operator started.
 * Will also populate the numRecordsOutPerSecond meter.
 */
Counter getNumRecordsOutCounter();
/**
 * The total number of input bytes since the task started.
 * Will also populate the numBytesInPerSecond meter.
 */
Counter getNumBytesInCounter();
/**
 * The total number of output bytes since the task started.
 * Will also populate the numBytesOutPerSecond meter.
 */
Counter getNumBytesOutCounter();
}Usage Examples:
// Accessing I/O metrics (typically read-only from operator code)
public class StreamingOperator extends AbstractStreamOperator<Output> {
    @Override
    public void open() throws Exception {
        super.open();
        // Navigate from the operator group straight to its I/O sub-group.
        OperatorIOMetricGroup io = getRuntimeContext().getMetricGroup().getIOMetricGroup();
        // Pre-defined I/O counters provided by the Flink runtime; they also feed
        // the corresponding per-second rate meters.
        Counter recordsIn = io.getNumRecordsInCounter();
        Counter recordsOut = io.getNumRecordsOutCounter();
        Counter bytesIn = io.getNumBytesInCounter();
        Counter bytesOut = io.getNumBytesOutCounter();
        // Custom I/O-related metrics can still be registered on the same group.
        io.gauge("custom-io-metric", () -> getCustomIOValue());
    }
}Specialized metric group for source readers in the new source API, providing context for source-specific metrics.
/**
 * Pre-defined metrics for SourceReader.
 *
 * <p>You should only update the metrics in the main operator thread.
 */
public interface SourceReaderMetricGroup extends OperatorMetricGroup {
/** The total number of records that failed to be consumed, processed, or emitted. */
Counter getNumRecordsInErrorsCounter();
/**
 * Sets an optional gauge for the number of bytes that have not been fetched by the source,
 * e.g. the remaining bytes in a file after the file descriptor reading position.
 *
 * <p>Note that not every source can report this metric in a plausible and efficient way.
 */
void setPendingBytesGauge(Gauge<Long> pendingBytesGauge);
/**
 * Sets an optional gauge for the number of records that have not been fetched by the source,
 * e.g. the available records after the consumer offset in a Kafka partition.
 *
 * <p>Note that not every source can report this metric in a plausible and efficient way.
 */
void setPendingRecordsGauge(Gauge<Long> pendingRecordsGauge);
}Usage Examples:
// In a source reader implementation
public class KafkaSourceReader implements SourceReader<Record, KafkaPartitionSplit> {
    private SourceReaderMetricGroup readerMetrics;
    private Counter recordsRead;
    private Counter bytesRead;
    private Gauge<Integer> pendingSplits;
    private Histogram recordSize;

    public void initialize(SourceReaderContext context) {
        readerMetrics = context.getMetricGroup();
        // Pre-defined counter for records that failed to consume, process, or emit.
        Counter errorCounter = readerMetrics.getNumRecordsInErrorsCounter();
        // The pending gauges are optional; register them only when the source can
        // compute their values efficiently.
        if (canTrackPendingBytes()) {
            readerMetrics.setPendingBytesGauge(() -> calculatePendingBytes());
        }
        if (canTrackPendingRecords()) {
            readerMetrics.setPendingRecordsGauge(() -> calculatePendingRecords());
        }
        // Custom source reader metrics.
        recordsRead = readerMetrics.counter("records-read");
        bytesRead = readerMetrics.counter("bytes-read");
        pendingSplits = readerMetrics.gauge("pending-splits", () -> splitQueue.size());
        recordSize = readerMetrics.histogram("record-size", new MyHistogram());
    }

    public InputStatus pollNext(ReaderOutput<Record> output) {
        Record record = pollRecord();
        if (record == null) {
            return InputStatus.NOTHING_AVAILABLE;
        }
        output.collect(record);
        recordsRead.inc();
        bytesRead.inc(record.sizeInBytes());
        recordSize.update(record.sizeInBytes());
        return InputStatus.MORE_AVAILABLE;
    }
}Metric group for sink writers, enabling tracking of writing performance and success rates.
/**
 * Pre-defined metrics for sinks.
 *
 * <p>You should only update the metrics in the main operator thread.
 */
public interface SinkWriterMetricGroup extends OperatorMetricGroup {
/** The total number of records that failed to send. */
Counter getNumRecordsOutErrorsCounter();
/**
 * The total number of records that failed to send.
 * This metric has the same value as numRecordsOutError.
 */
Counter getNumRecordsSendErrorsCounter();
/**
 * The total number of records that have been sent to the downstream system.
 * This metric has the same value as numRecordsOut of the operator.
 * Note: this counter will count all records the SinkWriter sent.
 */
Counter getNumRecordsSendCounter();
/**
 * The total number of output bytes sent since the task started.
 * This metric has the same value as numBytesOut of the operator.
 */
Counter getNumBytesSendCounter();
/**
 * Sets an optional gauge for the time it takes to send the last record.
 * This metric is an instantaneous value recorded for the last processed record.
 */
void setCurrentSendTimeGauge(Gauge<Long> currentSendTimeGauge);
}Usage Examples:
// In a sink writer implementation
public class DatabaseSinkWriter implements SinkWriter<Record> {
    private SinkWriterMetricGroup writerMetrics;
    private Counter recordsWritten;
    private Counter writesSucceeded;
    private Counter writesFailed;
    private Histogram writeLatency;
    private Meter writeRate;

    public void initialize(SinkWriter.Context context) {
        this.writerMetrics = context.getMetricGroup();
        // Access pre-defined sink writer counters
        Counter outErrors = writerMetrics.getNumRecordsOutErrorsCounter();
        Counter sendErrors = writerMetrics.getNumRecordsSendErrorsCounter();
        Counter recordsSent = writerMetrics.getNumRecordsSendCounter();
        Counter bytesSent = writerMetrics.getNumBytesSendCounter();
        // Set optional send time gauge if sink can provide this metric
        if (canTrackSendTime()) {
            writerMetrics.setCurrentSendTimeGauge(() -> lastSendTimeMillis);
        }
        // Register additional custom sink writer metrics.
        // Fix: writesSucceeded/writesFailed were previously never registered,
        // so write() would dereference null counters.
        recordsWritten = writerMetrics.counter("records-written");
        writesSucceeded = writerMetrics.counter("writes-succeeded");
        writesFailed = writerMetrics.counter("writes-failed");
        writeLatency = writerMetrics.histogram("write-latency", new LatencyHistogram());
        writeRate = writerMetrics.meter("write-rate", new MeterView(30));
        // Connection-specific metrics
        MetricGroup connectionGroup = writerMetrics.addGroup("connection");
        connectionGroup.gauge("pool-size", () -> connectionPool.getActiveConnections());
        connectionGroup.gauge("pool-utilization", () -> connectionPool.getUtilization());
    }

    @Override
    public void write(Record record, Context context) {
        long startTime = System.currentTimeMillis();
        try {
            database.write(record);
            recordsWritten.inc();
            writesSucceeded.inc();
            writeRate.markEvent();
            long latency = System.currentTimeMillis() - startTime;
            writeLatency.update(latency);
        } catch (Exception e) {
            writesFailed.inc();
            // Re-throw with the original cause preserved for diagnosability.
            throw new RuntimeException("Failed to write record", e);
        }
    }
}Metric group for sink committers, tracking commit operations and success rates in two-phase commit scenarios.
/**
 * Pre-defined metrics for sink committers.
 *
 * <p>You should only update the metrics in the main operator thread.
 */
public interface SinkCommitterMetricGroup extends OperatorMetricGroup {
/** The total number of committables arrived. */
Counter getNumCommittablesTotalCounter();
/** The total number of committable failures. */
Counter getNumCommittablesFailureCounter();
/** The total number of committable retries. */
Counter getNumCommittablesRetryCounter();
/** The total number of successful committables. */
Counter getNumCommittablesSuccessCounter();
/** The total number of already committed committables. */
Counter getNumCommittablesAlreadyCommittedCounter();
/** Sets a gauge for the number of currently pending committables. */
void setCurrentPendingCommittablesGauge(Gauge<Integer> currentPendingCommittablesGauge);
}Usage Examples:
// In a sink committer implementation
public class TransactionalSinkCommitter implements SinkCommitter<CommitInfo> {
    private SinkCommitterMetricGroup committerMetrics;
    private Counter commitsAttempted;
    private Counter commitsSucceeded;
    private Counter commitsFailed;
    private Histogram commitLatency;
    // Removed unused Gauge<Integer> pendingCommits field: pending committables are
    // reported via setCurrentPendingCommittablesGauge below, and the field was dead.

    public void initialize(SinkCommitter.Context context) {
        this.committerMetrics = context.getMetricGroup();
        // Access pre-defined committer counters
        Counter totalCommittables = committerMetrics.getNumCommittablesTotalCounter();
        Counter failureCommittables = committerMetrics.getNumCommittablesFailureCounter();
        Counter retryCommittables = committerMetrics.getNumCommittablesRetryCounter();
        Counter successCommittables = committerMetrics.getNumCommittablesSuccessCounter();
        Counter alreadyCommitted = committerMetrics.getNumCommittablesAlreadyCommittedCounter();
        // Set optional pending committables gauge if committer can provide this metric
        if (canTrackPendingCommittables()) {
            committerMetrics.setCurrentPendingCommittablesGauge(() -> pendingCommittables.size());
        }
        // Register additional custom committer metrics.
        // Fix: the attempt/success/failure counters used by commit() were never
        // registered before, causing NullPointerExceptions on the first commit.
        commitsAttempted = committerMetrics.counter("commits-attempted");
        commitsSucceeded = committerMetrics.counter("commits-succeeded");
        commitsFailed = committerMetrics.counter("commits-failed");
        commitLatency = committerMetrics.histogram("commit-latency", new LatencyHistogram());
        // Transaction-specific metrics
        MetricGroup txnGroup = committerMetrics.addGroup("transactions");
        txnGroup.counter("transactions-started");
        txnGroup.counter("transactions-committed");
        txnGroup.counter("transactions-aborted");
    }

    @Override
    public List<CommitInfo> commit(List<CommitInfo> commitInfos) {
        List<CommitInfo> failedCommits = new ArrayList<>();
        for (CommitInfo commitInfo : commitInfos) {
            long startTime = System.currentTimeMillis();
            commitsAttempted.inc();
            try {
                database.commit(commitInfo.getTransactionId());
                commitsSucceeded.inc();
                long latency = System.currentTimeMillis() - startTime;
                commitLatency.update(latency);
            } catch (Exception e) {
                commitsFailed.inc();
                failedCommits.add(commitInfo);
            }
        }
        return failedCommits; // Return failed commits for retry
    }
}Metric group for cache operations, enabling tracking of cache performance, hit rates, and memory usage. This interface provides pre-defined methods for registering cache-related metrics.
/**
 * Pre-defined metrics for cache.
 *
 * <p>Please note that each of these methods should only be invoked once.
 * Registering a metric with the same name multiple times would lead to undefined behavior.
 */
public interface CacheMetricGroup extends MetricGroup {
/** Registers the counter for the number of cache hits. */
void hitCounter(Counter hitCounter);
/** Registers the counter for the number of cache misses. */
void missCounter(Counter missCounter);
/** Registers the counter for the number of times data is loaded into cache from an external system. */
void loadCounter(Counter loadCounter);
/** Registers the counter for the number of load failures. */
void numLoadFailuresCounter(Counter numLoadFailuresCounter);
/** Registers the gauge for the time spent on the latest load operation. */
void latestLoadTimeGauge(Gauge<Long> latestLoadTimeGauge);
/** Registers the gauge for the number of records in cache. */
void numCachedRecordsGauge(Gauge<Long> numCachedRecordsGauge);
/** Registers the gauge for the number of bytes used by cache. */
void numCachedBytesGauge(Gauge<Long> numCachedBytesGauge);
}Usage Examples:
// In a caching operator or function
public class CachingFunction extends RichMapFunction<Input, Output> {
    private CacheMetricGroup cacheMetrics;
    private Counter hits;
    private Counter misses;
    private Counter loads;
    private Counter loadFailures;
    // Removed unused Gauge fields (loadTime, cachedRecords, cachedBytes): the
    // CacheMetricGroup registration methods return void, so those fields could
    // never be assigned and were dead code.

    @Override
    public void open(Configuration config) throws Exception {
        super.open(config);
        // Get cache metric group
        this.cacheMetrics = getRuntimeContext().getMetricGroup().addGroup("cache");
        // Register cache metrics - each method should only be called once
        this.hits = new SimpleCounter();
        cacheMetrics.hitCounter(hits);
        this.misses = new SimpleCounter();
        cacheMetrics.missCounter(misses);
        this.loads = new SimpleCounter();
        cacheMetrics.loadCounter(loads);
        this.loadFailures = new SimpleCounter();
        cacheMetrics.numLoadFailuresCounter(loadFailures);
        // Register gauges for cache state
        cacheMetrics.latestLoadTimeGauge(() -> lastLoadTimeMillis);
        cacheMetrics.numCachedRecordsGauge(() -> cache.size());
        cacheMetrics.numCachedBytesGauge(() -> cache.getEstimatedSize());
    }

    @Override
    public Output map(Input input) throws Exception {
        Output cached = cache.get(input.getKey());
        if (cached != null) {
            hits.inc();
            return cached;
        }
        // Cache miss - load from external system
        misses.inc();
        long startTime = System.currentTimeMillis();
        try {
            loads.inc();
            Output result = loadFromExternalSystem(input);
            cache.put(input.getKey(), result);
            lastLoadTimeMillis = System.currentTimeMillis() - startTime;
            return result;
        } catch (Exception e) {
            loadFailures.inc();
            throw e;
        }
    }
}Metric group for split enumerators in the source connector API, tracking split assignment and discovery.
/**
 * Metric group for split enumerators.
 *
 * <p>Exposes no additional pre-defined accessors; it provides split-enumerator scope for
 * metrics registered through the inherited {@code MetricGroup} methods.
 */
public interface SplitEnumeratorMetricGroup extends MetricGroup {
// Extends MetricGroup with split enumerator context
}Usage Examples:
// In a split enumerator implementation
public class KafkaSplitEnumerator implements SplitEnumerator<KafkaPartitionSplit, KafkaState> {
    private SplitEnumeratorMetricGroup enumeratorMetrics;
    private Counter splitsDiscovered;
    private Counter splitsAssigned;
    private Gauge<Integer> unassignedSplits;
    private Gauge<Integer> activeReaders;

    public void initialize(SplitEnumeratorContext<KafkaPartitionSplit> context) {
        enumeratorMetrics = context.getMetricGroup();
        // Enumerator-level counters and gauges.
        splitsDiscovered = enumeratorMetrics.counter("splits-discovered");
        splitsAssigned = enumeratorMetrics.counter("splits-assigned");
        unassignedSplits = enumeratorMetrics.gauge("unassigned-splits", () -> unassignedSplitQueue.size());
        activeReaders = enumeratorMetrics.gauge("active-readers", () -> readerStates.size());
        // One sub-group per monitored topic.
        MetricGroup topicGroup = enumeratorMetrics.addGroup("topics");
        for (String topic : monitoredTopics) {
            MetricGroup perTopic = topicGroup.addGroup("topic", topic);
            perTopic.gauge("partitions", () -> getPartitionCount(topic));
            perTopic.counter("splits-for-topic");
        }
    }

    @Override
    public void handleSplitRequest(int subtaskId, String requesterHostname) {
        List<KafkaPartitionSplit> availableSplits = getAvailableSplits();
        if (availableSplits.isEmpty()) {
            return;
        }
        context.assignSplit(availableSplits.get(0), subtaskId);
        splitsAssigned.inc();
    }

    @Override
    public void addSplitsBack(List<KafkaPartitionSplit> splits, int subtaskId) {
        // Returned splits rejoin the pool for reassignment.
        unassignedSplitQueue.addAll(splits);
    }
}Metric group for operator coordinators, which manage coordination between parallel operator instances.
/**
 * Metric group for operator coordinators.
 *
 * <p>Exposes no additional pre-defined accessors; it provides operator-coordinator scope for
 * metrics registered through the inherited {@code MetricGroup} methods.
 */
public interface OperatorCoordinatorMetricGroup extends MetricGroup {
// Extends MetricGroup with operator coordinator context
}Usage Examples:
// In an operator coordinator implementation
public class CheckpointCoordinator implements OperatorCoordinator {
    private OperatorCoordinatorMetricGroup coordinatorMetrics;
    private Counter coordinationEvents;
    private Counter successfulCoordinations;
    private Counter failedCoordinations;
    private Gauge<Integer> pendingOperations;

    public void initialize(OperatorCoordinator.Context context) {
        this.coordinatorMetrics = context.getMetricGroup();
        // Register coordinator metrics
        coordinationEvents = coordinatorMetrics.counter("coordination-events");
        successfulCoordinations = coordinatorMetrics.counter("successful-coordinations");
        failedCoordinations = coordinatorMetrics.counter("failed-coordinations");
        pendingOperations = coordinatorMetrics.gauge("pending-operations", () -> operationQueue.size());
        // Per-subtask metrics
        MetricGroup subtaskGroup = coordinatorMetrics.addGroup("subtasks");
        for (int i = 0; i < context.currentParallelism(); i++) {
            // Fix: a lambda may only capture effectively-final locals; capturing the
            // mutable loop variable `i` directly does not compile.
            final int subtaskIndex = i;
            MetricGroup stGroup = subtaskGroup.addGroup("subtask", String.valueOf(subtaskIndex));
            stGroup.counter("messages-sent");
            stGroup.counter("messages-received");
            stGroup.gauge("last-seen", () -> getLastSeenTime(subtaskIndex));
        }
    }

    @Override
    public void handleEventFromOperator(int subtask, OperatorEvent event) {
        coordinationEvents.inc();
        try {
            processCoordinationEvent(subtask, event);
            successfulCoordinations.inc();
        } catch (Exception e) {
            failedCoordinations.inc();
            throw new RuntimeException("Coordination failed", e);
        }
    }
}Metric group for cache-related metrics, useful for caching layers and buffering components.
/**
 * Metric group for cache metrics.
 *
 * <p>NOTE(review): this empty declaration duplicates the {@code CacheMetricGroup} defined
 * earlier in this document with pre-defined registration methods; the two declarations
 * conflict and should be reconciled against the actual Flink API — confirm which is correct.
 */
public interface CacheMetricGroup extends MetricGroup {
// Extends MetricGroup with cache-specific context
}Usage Examples:
// In a caching component
public class LookupCache {
    private CacheMetricGroup cacheMetrics;
    private Counter hits;
    private Counter misses;
    private Counter evictions;
    private Gauge<Integer> cacheSize;
    private Gauge<Double> hitRate;

    public void initialize(CacheMetricGroup metrics) {
        cacheMetrics = metrics;
        // Standard cache metrics
        hits = cacheMetrics.counter("hits");
        misses = cacheMetrics.counter("misses");
        evictions = cacheMetrics.counter("evictions");
        cacheSize = cacheMetrics.gauge("size", () -> cache.size());
        hitRate = cacheMetrics.gauge("hit-rate", this::calculateHitRate);
        // Memory metrics
        MetricGroup memoryGroup = cacheMetrics.addGroup("memory");
        memoryGroup.gauge("used-bytes", () -> cache.estimatedSize());
        memoryGroup.gauge("max-bytes", () -> cache.maximumSize());
    }

    public Object lookup(String key) {
        Object value = cache.get(key);
        if (value == null) {
            misses.inc();
            return null;
        }
        hits.inc();
        return value;
    }

    /** Fraction of lookups served from cache; 0.0 before any lookups occur. */
    private double calculateHitRate() {
        long total = hits.getCount() + misses.getCount();
        return total == 0 ? 0.0 : (double) hits.getCount() / total;
    }
}Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-metrics-core