tessl/maven-org-apache-flink--flink-metrics-core

Core metrics interfaces and implementations for Apache Flink stream processing framework

Metric Organization

MetricGroup organizes metrics into a hierarchy of named groups, which gives every metric a logical namespace. It is the primary interface for registering metrics and for creating nested sub-groups.

Capabilities

MetricGroup Interface

Core interface for organizing metrics into hierarchical namespaces. Supports metric registration, group creation, and scope management.

/**
 * A MetricGroup is a named container for Metrics and further metric subgroups.
 * Instances of this class can be used to register new metrics with Flink and 
 * to create a nested hierarchy based on the group names.
 */
public interface MetricGroup {
    
    // Counter registration methods
    /**
     * Creates and registers a new Counter with Flink.
     * @param name name of the counter
     * @return the created counter
     */
    Counter counter(String name);
    
    /**
     * Creates and registers a new Counter with Flink.
     * @param name name of the counter (as integer)
     * @return the created counter
     */
    default Counter counter(int name) {
        return counter(String.valueOf(name));
    }
    
    /**
     * Registers a Counter with Flink.
     * @param name name of the counter
     * @param counter counter to register
     * @return the given counter
     */
    <C extends Counter> C counter(String name, C counter);
    
    /**
     * Registers a Counter with Flink.
     * @param name name of the counter (as integer)
     * @param counter counter to register
     * @return the given counter
     */
    default <C extends Counter> C counter(int name, C counter) {
        return counter(String.valueOf(name), counter);
    }
    
    // Gauge registration methods
    /**
     * Registers a new Gauge with Flink.
     * @param name name of the gauge
     * @param gauge gauge to register
     * @return the given gauge
     */
    <T, G extends Gauge<T>> G gauge(String name, G gauge);
    
    /**
     * Registers a new Gauge with Flink.
     * @param name name of the gauge (as integer)
     * @param gauge gauge to register
     * @return the given gauge
     */
    default <T, G extends Gauge<T>> G gauge(int name, G gauge) {
        return gauge(String.valueOf(name), gauge);
    }
    
    // Histogram registration methods
    /**
     * Registers a new Histogram with Flink.
     * @param name name of the histogram
     * @param histogram histogram to register
     * @return the registered histogram
     */
    <H extends Histogram> H histogram(String name, H histogram);
    
    /**
     * Registers a new Histogram with Flink.
     * @param name name of the histogram (as integer)
     * @param histogram histogram to register
     * @return the registered histogram
     */
    default <H extends Histogram> H histogram(int name, H histogram) {
        return histogram(String.valueOf(name), histogram);
    }
    
    // Meter registration methods
    /**
     * Registers a new Meter with Flink.
     * @param name name of the meter
     * @param meter meter to register
     * @return the registered meter
     */
    <M extends Meter> M meter(String name, M meter);
    
    /**
     * Registers a new Meter with Flink.
     * @param name name of the meter (as integer)
     * @param meter meter to register
     * @return the registered meter
     */
    default <M extends Meter> M meter(int name, M meter) {
        return meter(String.valueOf(name), meter);
    }
    
    // Group creation methods
    /**
     * Creates a new MetricGroup and adds it to this group's sub-groups.
     * @param name name of the group
     * @return the created group
     */
    MetricGroup addGroup(String name);
    
    /**
     * Creates a new MetricGroup and adds it to this group's sub-groups.
     * @param name name of the group (as integer)
     * @return the created group
     */
    default MetricGroup addGroup(int name) {
        return addGroup(String.valueOf(name));
    }
    
    /**
     * Creates a new key-value MetricGroup pair. The key group is added to 
     * this group's sub-groups, while the value group is added to the key 
     * group's sub-groups. This method returns the value group.
     * @param key name of the first group
     * @param value name of the second group
     * @return the second created group
     */
    MetricGroup addGroup(String key, String value);
    
    // Scope and identification methods
    /**
     * Gets the scope as an array of the scope components.
     * @return scope components array
     */
    String[] getScopeComponents();
    
    /**
     * Returns a map of all variables and their associated value.
     * @return map of all variables and their associated value
     */
    Map<String, String> getAllVariables();
    
    /**
     * Returns the fully qualified metric name.
     * @param metricName metric name
     * @return fully qualified metric name
     */
    String getMetricIdentifier(String metricName);
    
    /**
     * Returns the fully qualified metric name with character filtering.
     * @param metricName metric name
     * @param filter character filter applied to scope components
     * @return fully qualified metric name
     */
    String getMetricIdentifier(String metricName, CharacterFilter filter);
    
    // Experimental span support
    /**
     * Adds a span to this metric group (experimental feature).
     * @param spanBuilder span builder to add
     */
    @Experimental
    default void addSpan(SpanBuilder spanBuilder) {}
}

Usage Examples:

// Basic metric registration
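// Assumes this code runs inside a RichFunction; queue, taskId, partitionId
// and numPartitions are placeholders defined elsewhere
MetricGroup rootGroup = getRuntimeContext().getMetricGroup();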
Counter totalEvents = rootGroup.counter("total-events");
Gauge<Integer> queueSize = rootGroup.gauge("queue-size", () -> queue.size());

// Hierarchical organization
MetricGroup operatorGroup = rootGroup.addGroup("operators");
MetricGroup mapOpGroup = operatorGroup.addGroup("map-operator");
Counter mapRecords = mapOpGroup.counter("records-processed");

// Key-value groups for dynamic naming
MetricGroup taskGroup = rootGroup.addGroup("task", taskId);
Counter taskCounter = taskGroup.counter("events");

// Multiple nesting levels
MetricGroup deepGroup = rootGroup
    .addGroup("processing")
    .addGroup("stage-1")  
    .addGroup("partition", String.valueOf(partitionId));

// One key-value group per partition index
// (addGroup(key, value) takes Strings, so the index is converted explicitly)
for (int i = 0; i < numPartitions; i++) {
    MetricGroup partitionGroup = rootGroup.addGroup("partition", String.valueOf(i));
    Counter partitionCounter = partitionGroup.counter("records");
}

Logical Scope Provider

Extension interface for metric groups that support logical scopes, providing additional scope formatting capabilities.

/**
 * Extension for metric groups that support logical scopes.
 * This interface removes the need for reporters to depend on flink-runtime 
 * to access the logical scope.
 */
public interface LogicalScopeProvider {
    /**
     * Returns the logical scope for the metric group with the given filter 
     * applied to all scope components.
     * @param filter filter to apply to all scope components
     * @return logical scope
     */
    String getLogicalScope(CharacterFilter filter);
    
    /**
     * Returns the logical scope for the metric group with the given filter 
     * applied and the given delimiter used to concatenate scope components.
     * @param filter filter to apply to all scope components
     * @param delimiter delimiter to use for concatenating scope components
     * @return logical scope
     */
    String getLogicalScope(CharacterFilter filter, char delimiter);
    
    /**
     * Returns the underlying metric group.
     * @return underlying metric group
     */
    MetricGroup getWrappedMetricGroup();
    
    /**
     * Casts the given metric group to a LogicalScopeProvider, if it 
     * implements the interface.
     * @param metricGroup metric group to cast
     * @return cast metric group
     * @throws IllegalStateException if the metric group did not implement 
     *         the LogicalScopeProvider interface
     */
    static LogicalScopeProvider castFrom(MetricGroup metricGroup) 
            throws IllegalStateException {
        if (metricGroup instanceof LogicalScopeProvider) {
            return (LogicalScopeProvider) metricGroup;
        } else {
            throw new IllegalStateException(
                "The given metric group does not implement the LogicalScopeProvider interface.");
        }
    }
}

Usage Examples:

// Check if group supports logical scopes
if (metricGroup instanceof LogicalScopeProvider) {
    LogicalScopeProvider provider = (LogicalScopeProvider) metricGroup;
    String logicalScope = provider.getLogicalScope(CharacterFilter.NO_OP_FILTER);
}

// Safe casting with error handling
try {
    LogicalScopeProvider provider = LogicalScopeProvider.castFrom(metricGroup);
    String scope = provider.getLogicalScope(characterFilter, '.');
} catch (IllegalStateException e) {
    // Handle groups that don't support logical scopes
}

// Custom delimiter usage
LogicalScopeProvider provider = LogicalScopeProvider.castFrom(metricGroup);
String underscoreScope = provider.getLogicalScope(filter, '_');
String colonScope = provider.getLogicalScope(filter, ':');
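
When a thrown exception is inconvenient, the instanceof check above can be wrapped in an Optional-returning helper. The sketch below is only an illustration of that pattern: `SketchMetricGroup`, `SketchLogicalScopeProvider`, `LogicalScopes`, `PlainGroup` and `ScopedGroup` are hypothetical stand-ins so the example compiles without Flink on the classpath, and the simplified `getLogicalScope(char)` signature is an assumption, not the real method.

```java
import java.util.Optional;

// Hypothetical stand-ins for MetricGroup and LogicalScopeProvider.
// In real code, use the interfaces from org.apache.flink.metrics instead.
interface SketchMetricGroup {}

interface SketchLogicalScopeProvider {
    String getLogicalScope(char delimiter);
}

final class LogicalScopes {
    private LogicalScopes() {}

    /** Returns the provider view of a group, or empty if the group does not support it. */
    static Optional<SketchLogicalScopeProvider> asProvider(SketchMetricGroup group) {
        return group instanceof SketchLogicalScopeProvider
                ? Optional.of((SketchLogicalScopeProvider) group)
                : Optional.empty();
    }

    /** Returns the logical scope if available, otherwise the supplied fallback. */
    static String logicalScopeOr(SketchMetricGroup group, char delimiter, String fallback) {
        return asProvider(group)
                .map(provider -> provider.getLogicalScope(delimiter))
                .orElse(fallback);
    }
}

// A group without logical scope support.
final class PlainGroup implements SketchMetricGroup {}

// A group that exposes a fixed, made-up logical scope.
final class ScopedGroup implements SketchMetricGroup, SketchLogicalScopeProvider {
    @Override
    public String getLogicalScope(char delimiter) {
        return "taskmanager" + delimiter + "job" + delimiter + "task";
    }
}
```

A caller could then write `LogicalScopes.logicalScopeOr(group, '_', "unknown")` and avoid the try/catch block entirely.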

Scope Management

A group's scope determines the fully qualified names of the metrics registered on it.

Scope Components:

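// Example scope hierarchy: root -> operators -> map-op -> subtask-0
// getRootGroup() is a placeholder; a group obtained from Flink usually
// already carries scope components of its own (host, job, task, ...)
MetricGroup rootGroup = getRootGroup();           // [] (assumed empty for illustration)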
MetricGroup opGroup = rootGroup.addGroup("operators");      // ["operators"]
MetricGroup mapGroup = opGroup.addGroup("map-op");          // ["operators", "map-op"]  
MetricGroup subtaskGroup = mapGroup.addGroup("subtask-0");  // ["operators", "map-op", "subtask-0"]

// Get scope information
String[] components = subtaskGroup.getScopeComponents();
// Result: ["operators", "map-op", "subtask-0"]

String identifier = subtaskGroup.getMetricIdentifier("records-processed");
// Result (with the default "." delimiter): "operators.map-op.subtask-0.records-processed"
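
The way scope components accumulate can be pictured with a toy model. The sketch below is not Flink's implementation: `ScopeSketch` is a hypothetical class, and passing the delimiter as a parameter is an assumption made to keep the example self-contained. It only illustrates that each `addGroup` call appends one component, that a key-value group appends two, and that the identifier is the components joined with the metric name.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of nested metric groups (hypothetical, not Flink code). */
final class ScopeSketch {
    private final List<String> components;

    /** Creates a root group with an empty scope. */
    ScopeSketch() {
        this(new ArrayList<>());
    }

    private ScopeSketch(List<String> components) {
        this.components = components;
    }

    /** Returns a child whose scope is this scope plus one more component. */
    ScopeSketch addGroup(String name) {
        List<String> child = new ArrayList<>(components);
        child.add(name);
        return new ScopeSketch(child);
    }

    /** A key-value group adds two levels: the key group, then the value group. */
    ScopeSketch addGroup(String key, String value) {
        return addGroup(key).addGroup(value);
    }

    String[] getScopeComponents() {
        return components.toArray(new String[0]);
    }

    /** Joins the scope components and the metric name with the delimiter. */
    String getMetricIdentifier(String metricName, char delimiter) {
        List<String> parts = new ArrayList<>(components);
        parts.add(metricName);
        return String.join(String.valueOf(delimiter), parts);
    }
}
```

Each call returns a new object and leaves the parent untouched, so sibling groups created from the same parent never see each other's components.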

Variable Substitution:

// Key-value groups create variables
MetricGroup taskGroup = rootGroup.addGroup("task", "task-123");
Map<String, String> variables = taskGroup.getAllVariables();
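// Result contains {"<task>": "task-123"}: Flink's runtime groups wrap the key
// in angle brackets, and the map also includes variables inherited from parent groups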

// Reporters can read these variables, for example to attach them as tags or labels.
// Note: according to the Flink documentation, user-defined variables cannot be
// used in scope formats such as "<host>.taskmanager.<tm_id>.<job_name>"
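
A reporter that supports tags might convert the variable map into plain tag names before sending it on. The sketch below shows one way to do that under stated assumptions: `VariableTags` and `toTags` are hypothetical names, and the helper assumes that variable keys may arrive wrapped in angle brackets (as in `<task>`), stripping them only when both brackets are present.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper for turning metric group variables into reporter tags. */
final class VariableTags {
    private VariableTags() {}

    /**
     * Copies the variables into a new map, removing surrounding angle
     * brackets from each key ("<task>" becomes "task"). Keys without
     * brackets are copied unchanged.
     */
    static Map<String, String> toTags(Map<String, String> variables) {
        Map<String, String> tags = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : variables.entrySet()) {
            String key = entry.getKey();
            if (key.length() > 2 && key.startsWith("<") && key.endsWith(">")) {
                key = key.substring(1, key.length() - 1);
            }
            tags.put(key, entry.getValue());
        }
        return tags;
    }
}
```

In a real reporter the input would come from `getAllVariables()` on the group passed to the reporter when a metric is added.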

Character Filtering:

// Define custom character filter
CharacterFilter dotToUnderscore = input -> input.replace('.', '_');

// Apply filter to metric identifier
String filtered = metricGroup.getMetricIdentifier("my.metric.name", dotToUnderscore);
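// The interface documents the filter as applying to the scope components;
// whether the metric name is filtered as well depends on the MetricGroup implementation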
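
Filtering matters most when a scope component contains the delimiter itself, such as a host name with dots. The sketch below illustrates the idea with plain Java: `FilteredIdentifier` is a hypothetical class, `UnaryOperator<String>` stands in for `CharacterFilter`, and leaving the metric name unfiltered is an assumption that follows the interface Javadoc rather than any particular implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

/** Hypothetical illustration of filtering scope components before joining them. */
final class FilteredIdentifier {
    private FilteredIdentifier() {}

    /**
     * Applies the filter to every scope component, then joins the filtered
     * components and the (unfiltered) metric name with the delimiter.
     */
    static String build(
            String[] scopeComponents,
            String metricName,
            UnaryOperator<String> filter,
            char delimiter) {
        List<String> parts = new ArrayList<>();
        for (String component : scopeComponents) {
            parts.add(filter.apply(component));
        }
        parts.add(metricName);
        return String.join(String.valueOf(delimiter), parts);
    }
}
```

With a filter that replaces `.` with `_`, the components `host.example.com` and `my-job` produce `host_example_com.my-job.records`, so the dots in the host name no longer look like extra hierarchy levels.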

Unregistered Metrics Group

No-operation implementation that doesn't register metrics with the metrics system, useful for testing or disabled metrics scenarios.

/**
 * A special MetricGroup that does not register any metrics at the 
 * metrics registry and any reporters.
 */
@Internal
public class UnregisteredMetricsGroup implements MetricGroup {
    // All methods return no-op implementations or the passed-in metrics
    // without actual registration
}

Usage Examples:

// Create unregistered group for testing
MetricGroup testGroup = new UnregisteredMetricsGroup();

// Metrics are created but not reported
Counter counter = testGroup.counter("test-counter");
counter.inc(); // Works locally but not reported

// Useful for unit tests (MyOperator is a placeholder class that takes a MetricGroup in its constructor)
public class MyOperatorTest {
    @Test
    public void testOperator() {
        MetricGroup metrics = new UnregisteredMetricsGroup();
        MyOperator operator = new MyOperator(metrics);
        // Test operator logic without metric reporting overhead
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-metrics-core
