CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-metrics-core

Core metrics interfaces and implementations for Apache Flink stream processing framework

Pending
Overview
Eval results
Files

reporters.mddocs/

Reporter Framework

Pluggable system for exporting metrics to external monitoring systems. The reporter framework enables integration with various monitoring backends through configurable metric exporters that can operate in both push and pull patterns.

Capabilities

MetricReporter Interface

Core interface for implementing metric reporters that export metrics to external systems. Provides lifecycle management and metric notification callbacks.

/**
 * Metric reporters are used to export Metrics to an external backend.
 * Metric reporters are instantiated via a MetricReporterFactory.
 */
public interface MetricReporter {
    
    /**
     * Configures this reporter. If the reporter was instantiated generically 
     * and hence parameter-less, this method is the place where the reporter 
     * sets its basic fields based on configuration values.
     * This method is always called first on a newly instantiated reporter.
     * @param config A properties object that contains all parameters set for this reporter
     */
    void open(MetricConfig config);
    
    /**
     * Closes this reporter. Should be used to close channels, streams and release resources.
     */
    void close();
    
    /**
     * Called when a new Metric was added.
     * @param metric the metric that was added
     * @param metricName the name of the metric
     * @param group the group that contains the metric
     */
    void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group);
    
    /**
     * Called when a Metric was removed.
     * @param metric the metric that should be removed
     * @param metricName the name of the metric
     * @param group the group that contains the metric
     */
    void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group);
}

Usage Examples:

// Custom reporter implementation
public class ConsoleMetricReporter implements MetricReporter {
    private boolean isOpen = false;
    
    @Override
    public void open(MetricConfig config) {
        this.isOpen = true;
        System.out.println("Console reporter started");
    }
    
    @Override
    public void close() {
        this.isOpen = false;
        System.out.println("Console reporter stopped");
    }
    
    @Override
    public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
        String identifier = group.getMetricIdentifier(metricName);
        System.out.println("Added metric: " + identifier + " of type " + metric.getMetricType());
    }
    
    @Override
    public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
        String identifier = group.getMetricIdentifier(metricName);
        System.out.println("Removed metric: " + identifier);
    }
}

// Handle different metric types in reporter
@Override
public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
    String identifier = group.getMetricIdentifier(metricName);
    
    switch (metric.getMetricType()) {
        case COUNTER:
            Counter counter = (Counter) metric;
            registerCounter(identifier, counter);
            break;
        case GAUGE:
            Gauge<?> gauge = (Gauge<?>) metric;
            registerGauge(identifier, gauge);
            break;
        case METER:
            Meter meter = (Meter) metric;
            registerMeter(identifier, meter);
            break;
        case HISTOGRAM:
            Histogram histogram = (Histogram) metric;
            registerHistogram(identifier, histogram);
            break;
    }
}

MetricReporterFactory Interface

Factory interface for creating metric reporters, enabling plugin-based reporter loading and configuration.

/**
 * MetricReporter factory. Metric reporters that can be instantiated with 
 * a factory automatically qualify for being loaded as a plugin, so long as 
 * the reporter jar is self-contained and contains a 
 * META-INF/services/org.apache.flink.metrics.reporter.MetricReporterFactory 
 * file containing the qualified class name of the factory.
 */
public interface MetricReporterFactory {
    /**
     * Creates a new metric reporter.
     * @param properties configured properties for the reporter
     * @return created metric reporter
     */
    MetricReporter createMetricReporter(Properties properties);
}

Usage Examples:

// Factory implementation
public class ConsoleReporterFactory implements MetricReporterFactory {
    @Override
    public MetricReporter createMetricReporter(Properties properties) {
        return new ConsoleMetricReporter();
    }
}

// Factory with configuration
public class ConfigurableReporterFactory implements MetricReporterFactory {
    @Override
    public MetricReporter createMetricReporter(Properties properties) {
        String endpoint = properties.getProperty("endpoint", "localhost:8080");
        int interval = Integer.parseInt(properties.getProperty("interval", "60"));
        
        ConfigurableReporter reporter = new ConfigurableReporter();
        reporter.setEndpoint(endpoint);
        reporter.setReportInterval(interval);
        
        return reporter;
    }
}

// Plugin registration (in META-INF/services file)
// META-INF/services/org.apache.flink.metrics.reporter.MetricReporterFactory
// com.example.ConsoleReporterFactory
// com.example.ConfigurableReporterFactory

Scheduled Reporter Interface

Interface for reporters that actively send out data periodically rather than only responding to metric addition/removal events.

/**
 * Interface for reporters that actively send out data periodically.
 */
public interface Scheduled {
    /**
     * Report the current measurements. This method is called periodically 
     * by the metrics registry that uses the reporter. This method must not 
     * block for a significant amount of time, any reporter needing more time 
     * should instead run the operation asynchronously.
     */
    void report();
}

Usage Examples:

// Scheduled reporter implementation
public class PeriodicReporter implements MetricReporter, Scheduled {
    private final Map<String, Metric> registeredMetrics = new ConcurrentHashMap<>();
    private final HttpClient httpClient = HttpClient.newHttpClient();
    private String endpoint;
    
    @Override
    public void open(MetricConfig config) {
        this.endpoint = config.getString("endpoint", "http://localhost:8080/metrics");
    }
    
    @Override
    public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
        String identifier = group.getMetricIdentifier(metricName);
        registeredMetrics.put(identifier, metric);
    }
    
    @Override
    public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
        String identifier = group.getMetricIdentifier(metricName);
        registeredMetrics.remove(identifier);
    }
    
    // Called periodically by Flink
    @Override
    public void report() {
        Map<String, Object> metrics = new HashMap<>();
        
        for (Map.Entry<String, Metric> entry : registeredMetrics.entrySet()) {
            String name = entry.getKey();
            Metric metric = entry.getValue();
            
            Object value = extractValue(metric);
            if (value != null) {
                metrics.put(name, value);
            }
        }
        
        // Send metrics asynchronously to avoid blocking
        CompletableFuture.runAsync(() -> sendMetrics(metrics));
    }
    
    private Object extractValue(Metric metric) {
        switch (metric.getMetricType()) {
            case COUNTER:
                return ((Counter) metric).getCount();
            case GAUGE:
                return ((Gauge<?>) metric).getValue();
            case METER:
                Meter meter = (Meter) metric;
                Map<String, Object> meterData = new HashMap<>();
                meterData.put("rate", meter.getRate());
                meterData.put("count", meter.getCount());
                return meterData;
            case HISTOGRAM:
                Histogram histogram = (Histogram) metric;
                HistogramStatistics stats = histogram.getStatistics();
                Map<String, Object> histogramData = new HashMap<>();
                histogramData.put("count", histogram.getCount());
                histogramData.put("mean", stats.getMean());
                histogramData.put("p95", stats.getQuantile(0.95));
                histogramData.put("max", stats.getMax());
                histogramData.put("min", stats.getMin());
                return histogramData;
            default:
                return null;
        }
    }
    
    @Override
    public void close() {
        // Clean up resources
    }
}

AbstractReporter Base Class

Base implementation providing common functionality for metric reporters.

/**
 * Base implementation for metric reporters.
 */
public abstract class AbstractReporter implements MetricReporter {
    // Provides common functionality and convenience methods for reporter implementations
}

Reporter Annotations

Annotations for controlling reporter instantiation behavior.

/**
 * Marker annotation for factory-instantiated reporters.
 */
public @interface InstantiateViaFactory {
}

/**
 * Marker annotation for reflection-based instantiation.
 */
public @interface InterceptInstantiationViaReflection {
}

Usage Examples:

// Factory-based reporter
@InstantiateViaFactory
public class MyFactoryReporter implements MetricReporter {
    // Will be instantiated via MetricReporterFactory
}

// Reflection-based reporter
@InterceptInstantiationViaReflection  
public class MyReflectionReporter implements MetricReporter {
    // Will be instantiated via reflection with default constructor
}

Reporter Configuration Patterns

Common patterns for configuring reporters using MetricConfig.

Configuration Handling:

public class DatabaseReporter implements MetricReporter {
    private String jdbcUrl;
    private String username;
    private String password;
    private int batchSize;
    private long flushInterval;
    
    @Override
    public void open(MetricConfig config) {
        // Required configuration
        this.jdbcUrl = config.getString("jdbc.url", null);
        if (jdbcUrl == null) {
            throw new IllegalArgumentException("jdbc.url is required");
        }
        
        // Optional configuration with defaults
        this.username = config.getString("jdbc.username", "metrics");
        this.password = config.getString("jdbc.password", "");
        this.batchSize = config.getInteger("batch.size", 100);
        this.flushInterval = config.getLong("flush.interval", 30000); // 30 seconds
        
        // Initialize database connection
        initializeConnection();
    }
}

Error Handling in Reporters:

public class RobustReporter implements MetricReporter, Scheduled {
    private final AtomicBoolean isHealthy = new AtomicBoolean(true);
    
    @Override
    public void report() {
        if (!isHealthy.get()) {
            return; // Skip reporting if unhealthy
        }
        
        try {
            doReport();
        } catch (Exception e) {
            isHealthy.set(false);
            scheduleHealthCheck();
            log.warn("Reporter became unhealthy", e);
        }
    }
    
    private void scheduleHealthCheck() {
        CompletableFuture.delayedExecutor(30, TimeUnit.SECONDS)
            .execute(() -> {
                try {
                    if (checkHealth()) {
                        isHealthy.set(true);
                        log.info("Reporter recovered");
                    }
                } catch (Exception e) {
                    // Will retry on next scheduled check
                }
            });
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-metrics-core

docs

configuration.md

core-metrics.md

implementations.md

index.md

metric-groups.md

reporters.md

specialized-groups.md

tracing.md

tile.json