CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-metrics-core

Core metrics interfaces and implementations for Apache Flink stream processing framework

Pending
Overview
Eval results
Files

docs/configuration.md

Configuration and Utilities

Configuration management and utility classes for metric system setup. Provides type-safe property access, character filtering for metric names, and supporting interfaces for metric system operation.

Capabilities

MetricConfig Class

Type-safe configuration class extending Properties with utility methods for extracting primitive values with defaults.

/**
 * A properties class with added utility method to extract primitives.
 *
 * <p>Every getter takes a property key together with a default value that is
 * returned when the key is not found. Method bodies are elided in this listing.
 *
 * <p>NOTE(review): what happens when a value is present but cannot be parsed as
 * the requested type is not visible here — confirm against the implementation.
 */
public class MetricConfig extends Properties {
    
    /**
     * Gets string property with default value.
     * @param key the property key
     * @param defaultValue value returned if the key is not found
     * @return property value or default
     */
    public String getString(String key, String defaultValue) { /* implementation */ }
    
    /**
     * Gets integer property with default value.
     * @param key the property key
     * @param defaultValue value returned if the key is not found
     * @return property value parsed as int or default
     */
    public int getInteger(String key, int defaultValue) { /* implementation */ }
    
    /**
     * Gets long property with default value.
     * @param key the property key
     * @param defaultValue value returned if the key is not found
     * @return property value parsed as long or default
     */
    public long getLong(String key, long defaultValue) { /* implementation */ }
    
    /**
     * Gets float property with default value.
     * @param key the property key
     * @param defaultValue value returned if the key is not found
     * @return property value parsed as float or default
     */
    public float getFloat(String key, float defaultValue) { /* implementation */ }
    
    /**
     * Gets double property with default value.
     * @param key the property key
     * @param defaultValue value returned if the key is not found
     * @return property value parsed as double or default
     */
    public double getDouble(String key, double defaultValue) { /* implementation */ }
    
    /**
     * Gets boolean property with default value.
     * @param key the property key
     * @param defaultValue value returned if the key is not found
     * @return property value parsed as boolean or default
     */
    public boolean getBoolean(String key, boolean defaultValue) { /* implementation */ }
}

Usage Examples:

// Start from an empty MetricConfig
MetricConfig config = new MetricConfig();

// Populate it through the setProperty method inherited from Properties;
// every value is stored as a string
config.setProperty("reporter.enabled", "true");
config.setProperty("reporter.host", "metrics.example.com");
config.setProperty("reporter.port", "8080");
config.setProperty("reporter.batch.size", "100");
config.setProperty("reporter.timeout", "30.5");

// Read the values back through the typed getters; the second argument is the
// fallback used when the key is not present
boolean enabled = config.getBoolean("reporter.enabled", false);
String host = config.getString("reporter.host", "localhost");
int port = config.getInteger("reporter.port", 9090);
int batchSize = config.getInteger("reporter.batch.size", 50);
double timeout = config.getDouble("reporter.timeout", 10.0);

// Keys that were never set yield the supplied default
int fallbackInt = config.getInteger("missing.int", 42);
String fallbackString = config.getString("non.existent", "default-value");

// Use in reporter configuration
/**
 * Example reporter that pulls its settings from a {@link MetricConfig} when opened.
 *
 * <p>Only {@code open} is shown; other {@code MetricReporter} methods and
 * {@code initializeReporter()} are not part of this snippet.
 */
public class ConfigurableReporter implements MetricReporter {
    private String endpoint;
    private int batchSize;
    private long flushInterval;
    private boolean compressionEnabled;
    
    /**
     * Reads and validates the reporter settings, then initializes the reporter.
     *
     * @param config reporter configuration; {@code endpoint} is required, the
     *     remaining keys fall back to defaults
     * @throws IllegalArgumentException if {@code endpoint} is missing or blank, or
     *     if {@code batch.size} or {@code flush.interval} is not positive
     */
    @Override
    public void open(MetricConfig config) {
        // Required configuration: a blank endpoint is as unusable as a missing one
        this.endpoint = config.getString("endpoint", null);
        if (endpoint == null || endpoint.trim().isEmpty()) {
            throw new IllegalArgumentException("endpoint is required");
        }
        
        // Optional configuration with sensible defaults
        this.batchSize = config.getInteger("batch.size", 100);
        this.flushInterval = config.getLong("flush.interval", 5000L);
        this.compressionEnabled = config.getBoolean("compression.enabled", true);
        
        // Validation: reject values that would make batching or flushing meaningless
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batch.size must be positive");
        }
        if (flushInterval <= 0) {
            throw new IllegalArgumentException("flush.interval must be positive");
        }
        
        initializeReporter();
    }
}

CharacterFilter Interface

Functional interface for filtering and transforming strings, commonly used to normalize metric names for different backends.

/**
 * Interface for a character filter function. The filter function is given 
 * a string which the filter can transform. The returned string is the 
 * transformation result.
 *
 * <p>The interface declares a single abstract method, so it can be implemented
 * with a lambda, as {@link #NO_OP_FILTER} does.
 */
public interface CharacterFilter {
    
    /** No-operation filter that returns input unchanged. */
    CharacterFilter NO_OP_FILTER = input -> input;
    
    /**
     * Filter the given string and generate a resulting string from it.
     * For example, one implementation could filter out invalid characters 
     * from the input string.
     * @param input Input string
     * @return Filtered result string
     */
    String filterCharacters(String input);
}

Usage Examples:

// Common character filters
CharacterFilter noOp = CharacterFilter.NO_OP_FILTER;
CharacterFilter dotToUnderscore = input -> input.replace('.', '_');
CharacterFilter spacesToDashes = input -> input.replace(' ', '-');
CharacterFilter alphanumericOnly = input -> input.replaceAll("[^a-zA-Z0-9]", "");

// Combining filters
CharacterFilter combined = input -> {
    // Lowercase with Locale.ROOT so the result does not depend on the JVM's
    // default locale (under a Turkish locale "I" would become a dotless i,
    // U+0131, and then be stripped by the ASCII-only pattern below)
    String result = input.toLowerCase(java.util.Locale.ROOT);
    result = result.replace(' ', '-');            // spaces to dashes
    result = result.replaceAll("[^a-z0-9-]", ""); // alphanumeric + dashes only
    return result;
};

// Usage with metric identifiers
MetricGroup group = getRootGroup().addGroup("My Operator");
String metricName = "Records Processed Per Second";

String defaultId = group.getMetricIdentifier(metricName);
// Result: "My Operator.Records Processed Per Second"  

String filteredId = group.getMetricIdentifier(metricName, combined);
// Result: "my-operator.records-processed-per-second"

// Custom filters for different backends
/**
 * Character filter that rewrites a string into lowercase ASCII letters, digits
 * and underscores.
 */
public class PrometheusCharacterFilter implements CharacterFilter {
    /**
     * Lowercases the input, replaces every character outside {@code [a-z0-9_]}
     * with an underscore, collapses runs of underscores and trims one leading
     * and one trailing underscore.
     *
     * @param input string to normalize
     * @return the normalized string
     */
    @Override
    public String filterCharacters(String input) {
        // Prometheus naming conventions
        // Locale.ROOT keeps the lowercasing locale-independent; with the default
        // locale, e.g. Turkish, "I" maps to a dotless i that the ASCII pattern
        // below would turn into an underscore.
        return input.toLowerCase(java.util.Locale.ROOT)
            .replaceAll("[^a-z0-9_]", "_")  // Replace invalid chars with underscores
            .replaceAll("_{2,}", "_")       // Collapse multiple underscores
            .replaceAll("^_|_$", "");       // Remove leading/trailing underscores
    }
}

/** Character filter that rewrites a string for use in Graphite-style names. */
public class GraphiteCharacterFilter implements CharacterFilter {
    /**
     * Turns spaces and colons into underscores, then removes every character
     * that is not a letter, digit, dot, underscore or dash.
     *
     * @param input string to normalize
     * @return the normalized string
     */
    @Override
    public String filterCharacters(String input) {
        // Step 1: substitute the two separators that get a replacement character
        String underscored = input.replace(' ', '_').replace(':', '_');
        // Step 2: drop whatever is still outside the allowed character set
        return underscored.replaceAll("[^a-zA-Z0-9._-]", "");
    }
}

// Use in reporters
/**
 * Example reporter that filters metric identifiers through a
 * {@link GraphiteCharacterFilter} before registering them.
 */
public class GraphiteReporter implements MetricReporter {
    private final CharacterFilter filter = new GraphiteCharacterFilter();
    
    /**
     * Builds the filtered identifier for the added metric and hands it, together
     * with the metric, to {@code registerWithGraphite}.
     */
    @Override
    public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
        registerWithGraphite(group.getMetricIdentifier(metricName, filter), metric);
    }
}

View Interface

Interface for metrics that require periodic background updates, enabling time-windowed calculations and derived metrics.

/**
 * An interface for metrics which should be updated in regular intervals 
 * by a background thread.
 *
 * <p>Implementations put their periodic recomputation in {@link #update()}.
 */
public interface View {
    /** The interval in which metrics are updated (5 seconds). */
    int UPDATE_INTERVAL_SECONDS = 5;
    
    /** This method will be called regularly to update the metric. */
    void update();
}

Usage Examples:

// Custom view implementation for rate calculation
/**
 * Gauge reporting an events-per-second rate that is recomputed on every
 * {@link #update()} call from the number of increments since the previous call.
 */
public class CustomRateGauge implements Gauge<Double>, View {
    private final AtomicLong events = new AtomicLong();
    private volatile long previousCount;
    private volatile double ratePerSecond;
    
    /** Records one event. */
    public void increment() {
        events.incrementAndGet();
    }
    
    /** Returns the rate computed by the most recent {@link #update()}. */
    @Override
    public Double getValue() {
        return ratePerSecond;
    }
    
    /**
     * Recomputes the rate as the count delta since the last call divided by
     * {@code UPDATE_INTERVAL_SECONDS}, then remembers the current count.
     */
    @Override
    public void update() {
        final long snapshot = events.get();
        final long delta = snapshot - previousCount;
        ratePerSecond = (double) delta / UPDATE_INTERVAL_SECONDS;
        previousCount = snapshot;
    }
}

// Moving average implementation
/**
 * Gauge exposing the average of the most recent {@code windowSize} samples.
 * The average is only recomputed in {@link #update()}; {@link #getValue()}
 * returns the last computed value.
 */
public class MovingAverageView implements Gauge<Double>, View {
    private final Queue<Double> samples = new LinkedList<>();
    private final int windowSize;
    private volatile double currentAverage = 0.0;
    
    /**
     * @param windowSize maximum number of samples kept; must be positive
     * @throws IllegalArgumentException if {@code windowSize} is not positive
     */
    public MovingAverageView(int windowSize) {
        // A non-positive window would evict every sample as soon as it is added,
        // leaving the gauge stuck at 0.0 forever, so fail fast instead.
        if (windowSize <= 0) {
            throw new IllegalArgumentException("windowSize must be positive, got " + windowSize);
        }
        this.windowSize = windowSize;
    }
    
    /** Appends a sample and evicts the oldest one once the window is exceeded. */
    public synchronized void addSample(double value) {
        samples.offer(value);
        if (samples.size() > windowSize) {
            samples.poll();
        }
    }
    
    /** Returns the average computed by the most recent {@link #update()}. */
    @Override
    public Double getValue() {
        return currentAverage;
    }
    
    /** Recomputes the average over the retained samples; a no-op while no samples exist. */
    @Override
    public synchronized void update() {
        if (!samples.isEmpty()) {
            currentAverage = samples.stream()
                .mapToDouble(Double::doubleValue)
                .average()
                .orElse(0.0);
        }
    }
}

// System resource monitoring
/**
 * Gauge exposing a map of JVM memory and processor figures that is rebuilt on
 * every {@link #update()} call.
 */
public class SystemResourceView implements Gauge<Map<String, Object>>, View {
    private volatile Map<String, Object> resourceInfo = new HashMap<>();
    
    /** Returns a copy of the most recently published figures. */
    @Override
    public Map<String, Object> getValue() {
        return new HashMap<>(resourceInfo);
    }
    
    /** Samples {@link Runtime} and publishes a freshly built map. */
    @Override
    public void update() {
        final Runtime rt = Runtime.getRuntime();
        
        // Memory figures as reported by the runtime
        final long total = rt.totalMemory();
        final long free = rt.freeMemory();
        final long used = total - free;
        
        final Map<String, Object> snapshot = new HashMap<>();
        snapshot.put("memory.total", total);
        snapshot.put("memory.free", free);
        snapshot.put("memory.used", used);
        snapshot.put("memory.usage.ratio", (double) used / total);
        
        // Processor count (simplified CPU information)
        snapshot.put("processors", rt.availableProcessors());
        
        // Publish the finished map with a single write to the volatile field
        resourceInfo = snapshot;
    }
}

// Register views for automatic updates
CustomRateGauge processingRate = new CustomRateGauge();
metricGroup.gauge("processing-rate", processingRate);
// Flink will automatically call update() every 5 seconds

MovingAverageView avgLatency = new MovingAverageView(20);
metricGroup.gauge("average-latency", avgLatency);
// Background thread updates the moving average

SystemResourceView systemMetrics = new SystemResourceView();
metricGroup.gauge("system-resources", systemMetrics);
// Periodically updates system resource information

MetricType Enumeration

Enumeration defining the standard metric types supported by the Flink metrics system.

/**
 * Enum describing the different metric types.
 *
 * <p>Declares four constants: {@code COUNTER}, {@code METER}, {@code GAUGE}
 * and {@code HISTOGRAM}.
 */
public enum MetricType {
    COUNTER,
    METER,
    GAUGE,
    HISTOGRAM
}

Usage Examples:

// Type checking and handling
/**
 * Prints a short summary of the metric, choosing the output by the metric's
 * {@link MetricType}.
 *
 * @param metric metric to describe; cast to the interface matching its type
 */
public void handleMetric(Metric metric) {
    switch (metric.getMetricType()) {
        case COUNTER:
            System.out.println("Counter value: " + ((Counter) metric).getCount());
            break;
            
        case METER: {
            final Meter m = (Meter) metric;
            System.out.println("Meter rate: " + m.getRate() + " events/sec");
            System.out.println("Meter count: " + m.getCount());
            break;
        }
            
        case GAUGE:
            System.out.println("Gauge value: " + ((Gauge<?>) metric).getValue());
            break;
            
        case HISTOGRAM: {
            final Histogram h = (Histogram) metric;
            // Take the statistics snapshot before printing, as a single read
            final HistogramStatistics snapshot = h.getStatistics();
            System.out.println("Histogram count: " + h.getCount());
            System.out.println("Histogram mean: " + snapshot.getMean());
            break;
        }
    }
}

// Metric type-specific processing in reporters
/**
 * Example reporter that dispatches newly added metrics to a per-type handler
 * looked up in an {@link EnumMap} keyed by {@link MetricType}.
 */
public class TypeAwareReporter implements MetricReporter {
    
    /** Callback invoked with the metric identifier and the metric itself. */
    @FunctionalInterface
    private interface MetricHandler {
        void handle(String name, Metric metric);
    }
    
    private final Map<MetricType, MetricHandler> handlers = new EnumMap<>(MetricType.class);
    
    /** Wires one handler method to each metric type. */
    public TypeAwareReporter() {
        handlers.put(MetricType.COUNTER, this::handleCounter);
        handlers.put(MetricType.METER, this::handleMeter);
        handlers.put(MetricType.GAUGE, this::handleGauge);
        handlers.put(MetricType.HISTOGRAM, this::handleHistogram);
    }
    
    /**
     * Looks up the handler for the metric's type and, when one is registered,
     * passes it the metric identifier and the metric.
     */
    @Override
    public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
        final MetricHandler handler = handlers.get(metric.getMetricType());
        if (handler == null) {
            // No handler for this type: nothing to do
            return;
        }
        handler.handle(group.getMetricIdentifier(metricName), metric);
    }
    
    private void handleCounter(String name, Metric metric) {
        Counter asCounter = (Counter) metric;
        // Register counter with monitoring system
    }
    
    private void handleGauge(String name, Metric metric) {
        Gauge<?> asGauge = (Gauge<?>) metric;
        // Register gauge with monitoring system
    }
    
    private void handleMeter(String name, Metric metric) {
        Meter asMeter = (Meter) metric;
        // Register meter with monitoring system, possibly as multiple metrics
    }
    
    private void handleHistogram(String name, Metric metric) {
        Histogram asHistogram = (Histogram) metric;
        // Register histogram with monitoring system, possibly as multiple metrics
    }
}

// Type validation
/** Static helpers for checking a metric's {@link MetricType}. */
public class MetricValidator {
    
    /**
     * Ensures the metric reports the expected type.
     *
     * @param metric metric to check
     * @param expectedType type the metric must report
     * @throws IllegalArgumentException if the reported type differs from {@code expectedType}
     */
    public static void validateMetricType(Metric metric, MetricType expectedType) {
        final MetricType actualType = metric.getMetricType();
        if (actualType == expectedType) {
            return;
        }
        throw new IllegalArgumentException(
            String.format("Expected metric type %s but got %s", expectedType, actualType));
    }
    
    /** Returns {@code true} when the metric reports {@code MetricType.COUNTER}. */
    public static boolean isCounterType(Metric metric) {
        return MetricType.COUNTER == metric.getMetricType();
    }
    
    /**
     * Returns {@code true} when the metric reports {@code COUNTER}, {@code METER}
     * or {@code HISTOGRAM}; any other type, including {@code GAUGE}, yields {@code false}.
     */
    public static boolean isNumericType(Metric metric) {
        final MetricType reported = metric.getMetricType();
        if (reported == MetricType.COUNTER) {
            return true;
        }
        if (reported == MetricType.METER) {
            return true;
        }
        return reported == MetricType.HISTOGRAM;
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-metrics-core

docs

configuration.md

core-metrics.md

implementations.md

index.md

metric-groups.md

reporters.md

specialized-groups.md

tracing.md

tile.json