Core metrics interfaces and implementations for Apache Flink stream processing framework
—
Configuration management and utility classes for metric system setup. Provides type-safe property access, character filtering for metric names, and supporting interfaces for metric system operation.
Type-safe configuration class extending Properties with utility methods for extracting primitive values with defaults.
/**
* A properties class with added utility method to extract primitives.
*/
public class MetricConfig extends Properties {
/**
* Gets string property with default value.
* @param key the property key
* @param defaultValue default value if key not found
* @return property value or default
*/
public String getString(String key, String defaultValue) { /* implementation */ }
/**
* Gets integer property with default value.
* @param key the property key
* @param defaultValue default value if key not found
* @return property value parsed as int or default
*/
public int getInteger(String key, int defaultValue) { /* implementation */ }
/**
* Gets long property with default value.
* @param key the property key
* @param defaultValue default value if key not found
* @return property value parsed as long or default
*/
public long getLong(String key, long defaultValue) { /* implementation */ }
/**
* Gets float property with default value.
* @param key the property key
* @param defaultValue default value if key not found
* @return property value parsed as float or default
*/
public float getFloat(String key, float defaultValue) { /* implementation */ }
/**
* Gets double property with default value.
* @param key the property key
* @param defaultValue default value if key not found
* @return property value parsed as double or default
*/
public double getDouble(String key, double defaultValue) { /* implementation */ }
/**
* Gets boolean property with default value.
* @param key the property key
* @param defaultValue default value if key not found
* @return property value parsed as boolean or default
*/
public boolean getBoolean(String key, boolean defaultValue) { /* implementation */ }
}Usage Examples:
// Creating and using MetricConfig
MetricConfig config = new MetricConfig();
// Set properties (inherited from Properties)
config.setProperty("reporter.host", "metrics.example.com");
config.setProperty("reporter.port", "8080");
config.setProperty("reporter.enabled", "true");
config.setProperty("reporter.batch.size", "100");
config.setProperty("reporter.timeout", "30.5");
// Type-safe property retrieval with defaults
String host = config.getString("reporter.host", "localhost");
int port = config.getInteger("reporter.port", 9090);
boolean enabled = config.getBoolean("reporter.enabled", false);
int batchSize = config.getInteger("reporter.batch.size", 50);
double timeout = config.getDouble("reporter.timeout", 10.0);
// Missing keys return defaults
String missingKey = config.getString("non.existent", "default-value");
int missingInt = config.getInteger("missing.int", 42);
// Use in reporter configuration
public class ConfigurableReporter implements MetricReporter {
private String endpoint;
private int batchSize;
private long flushInterval;
private boolean compressionEnabled;
@Override
public void open(MetricConfig config) {
// Required configuration
this.endpoint = config.getString("endpoint", null);
if (endpoint == null) {
throw new IllegalArgumentException("endpoint is required");
}
// Optional configuration with sensible defaults
this.batchSize = config.getInteger("batch.size", 100);
this.flushInterval = config.getLong("flush.interval", 5000L);
this.compressionEnabled = config.getBoolean("compression.enabled", true);
// Validation
if (batchSize <= 0) {
throw new IllegalArgumentException("batch.size must be positive");
}
initializeReporter();
}
}Function interface for filtering and transforming strings, commonly used for metric name normalization across different backends.
/**
* Interface for a character filter function. The filter function is given
* a string which the filter can transform. The returned string is the
* transformation result.
*/
public interface CharacterFilter {
/** No-operation filter that returns input unchanged. */
CharacterFilter NO_OP_FILTER = input -> input;
/**
* Filter the given string and generate a resulting string from it.
* For example, one implementation could filter out invalid characters
* from the input string.
* @param input Input string
* @return Filtered result string
*/
String filterCharacters(String input);
}Usage Examples:
// Common character filters
CharacterFilter noOp = CharacterFilter.NO_OP_FILTER;
CharacterFilter dotToUnderscore = input -> input.replace('.', '_');
CharacterFilter spacesToDashes = input -> input.replace(' ', '-');
CharacterFilter alphanumericOnly = input -> input.replaceAll("[^a-zA-Z0-9]", "");
// Combining filters
CharacterFilter combined = input -> {
String result = input.toLowerCase(); // lowercase
result = result.replace(' ', '-'); // spaces to dashes
result = result.replaceAll("[^a-z0-9-]", ""); // alphanumeric + dashes only
return result;
};
// Usage with metric identifiers
MetricGroup group = getRootGroup().addGroup("My Operator");
String metricName = "Records Processed Per Second";
String defaultId = group.getMetricIdentifier(metricName);
// Result: "My Operator.Records Processed Per Second"
String filteredId = group.getMetricIdentifier(metricName, combined);
// Result: "my-operator.records-processed-per-second"
// Custom filters for different backends
public class PrometheusCharacterFilter implements CharacterFilter {
@Override
public String filterCharacters(String input) {
// Prometheus naming conventions
return input.toLowerCase()
.replaceAll("[^a-z0-9_]", "_") // Replace invalid chars with underscores
.replaceAll("_{2,}", "_") // Collapse multiple underscores
.replaceAll("^_|_$", ""); // Remove leading/trailing underscores
}
}
public class GraphiteCharacterFilter implements CharacterFilter {
@Override
public String filterCharacters(String input) {
// Graphite naming conventions
return input.replace(' ', '_')
.replace(':', '_')
.replaceAll("[^a-zA-Z0-9._-]", "");
}
}
// Use in reporters
public class GraphiteReporter implements MetricReporter {
private final CharacterFilter filter = new GraphiteCharacterFilter();
@Override
public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
String graphiteMetricName = group.getMetricIdentifier(metricName, filter);
registerWithGraphite(graphiteMetricName, metric);
}
}Interface for metrics that require periodic background updates, enabling time-windowed calculations and derived metrics.
/**
* An interface for metrics which should be updated in regular intervals
* by a background thread.
*/
public interface View {
/** The interval in which metrics are updated (5 seconds). */
int UPDATE_INTERVAL_SECONDS = 5;
/** This method will be called regularly to update the metric. */
void update();
}Usage Examples:
// Custom view implementation for rate calculation
public class CustomRateGauge implements Gauge<Double>, View {
private final AtomicLong counter = new AtomicLong(0);
private volatile long lastCount = 0;
private volatile double currentRate = 0.0;
public void increment() {
counter.incrementAndGet();
}
@Override
public Double getValue() {
return currentRate;
}
@Override
public void update() {
long currentCount = counter.get();
long deltaCount = currentCount - lastCount;
// Calculate rate per second over the update interval
currentRate = (double) deltaCount / UPDATE_INTERVAL_SECONDS;
lastCount = currentCount;
}
}
// Moving average implementation
public class MovingAverageView implements Gauge<Double>, View {
private final Queue<Double> samples = new LinkedList<>();
private final int windowSize;
private volatile double currentAverage = 0.0;
public MovingAverageView(int windowSize) {
this.windowSize = windowSize;
}
public synchronized void addSample(double value) {
samples.offer(value);
if (samples.size() > windowSize) {
samples.poll();
}
}
@Override
public Double getValue() {
return currentAverage;
}
@Override
public synchronized void update() {
if (!samples.isEmpty()) {
currentAverage = samples.stream()
.mapToDouble(Double::doubleValue)
.average()
.orElse(0.0);
}
}
}
// System resource monitoring
public class SystemResourceView implements Gauge<Map<String, Object>>, View {
private volatile Map<String, Object> resourceInfo = new HashMap<>();
@Override
public Map<String, Object> getValue() {
return new HashMap<>(resourceInfo);
}
@Override
public void update() {
Runtime runtime = Runtime.getRuntime();
Map<String, Object> newInfo = new HashMap<>();
// Memory information
long totalMemory = runtime.totalMemory();
long freeMemory = runtime.freeMemory();
long usedMemory = totalMemory - freeMemory;
newInfo.put("memory.total", totalMemory);
newInfo.put("memory.free", freeMemory);
newInfo.put("memory.used", usedMemory);
newInfo.put("memory.usage.ratio", (double) usedMemory / totalMemory);
// CPU information (simplified)
newInfo.put("processors", runtime.availableProcessors());
// Update atomically
resourceInfo = newInfo;
}
}
// Register views for automatic updates
CustomRateGauge processingRate = new CustomRateGauge();
metricGroup.gauge("processing-rate", processingRate);
// Flink will automatically call update() every 5 seconds
MovingAverageView avgLatency = new MovingAverageView(20);
metricGroup.gauge("average-latency", avgLatency);
// Background thread updates the moving average
SystemResourceView systemMetrics = new SystemResourceView();
metricGroup.gauge("system-resources", systemMetrics);
// Periodically updates system resource informationEnumeration defining the standard metric types supported by the Flink metrics system.
/**
* Enum describing the different metric types.
*/
public enum MetricType {
COUNTER,
METER,
GAUGE,
HISTOGRAM
}Usage Examples:
// Type checking and handling
public void handleMetric(Metric metric) {
MetricType type = metric.getMetricType();
switch (type) {
case COUNTER:
Counter counter = (Counter) metric;
System.out.println("Counter value: " + counter.getCount());
break;
case GAUGE:
Gauge<?> gauge = (Gauge<?>) metric;
System.out.println("Gauge value: " + gauge.getValue());
break;
case METER:
Meter meter = (Meter) metric;
System.out.println("Meter rate: " + meter.getRate() + " events/sec");
System.out.println("Meter count: " + meter.getCount());
break;
case HISTOGRAM:
Histogram histogram = (Histogram) metric;
HistogramStatistics stats = histogram.getStatistics();
System.out.println("Histogram count: " + histogram.getCount());
System.out.println("Histogram mean: " + stats.getMean());
break;
}
}
// Metric type-specific processing in reporters
public class TypeAwareReporter implements MetricReporter {
private final Map<MetricType, MetricHandler> handlers = new EnumMap<>(MetricType.class);
public TypeAwareReporter() {
handlers.put(MetricType.COUNTER, this::handleCounter);
handlers.put(MetricType.GAUGE, this::handleGauge);
handlers.put(MetricType.METER, this::handleMeter);
handlers.put(MetricType.HISTOGRAM, this::handleHistogram);
}
@Override
public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
MetricType type = metric.getMetricType();
MetricHandler handler = handlers.get(type);
if (handler != null) {
String identifier = group.getMetricIdentifier(metricName);
handler.handle(identifier, metric);
}
}
private void handleCounter(String name, Metric metric) {
Counter counter = (Counter) metric;
// Register counter with monitoring system
}
private void handleGauge(String name, Metric metric) {
Gauge<?> gauge = (Gauge<?>) metric;
// Register gauge with monitoring system
}
private void handleMeter(String name, Metric metric) {
Meter meter = (Meter) metric;
// Register meter with monitoring system, possibly as multiple metrics
}
private void handleHistogram(String name, Metric metric) {
Histogram histogram = (Histogram) metric;
// Register histogram with monitoring system, possibly as multiple metrics
}
@FunctionalInterface
private interface MetricHandler {
void handle(String name, Metric metric);
}
}
// Type validation
public class MetricValidator {
public static void validateMetricType(Metric metric, MetricType expectedType) {
MetricType actualType = metric.getMetricType();
if (actualType != expectedType) {
throw new IllegalArgumentException(
String.format("Expected metric type %s but got %s", expectedType, actualType));
}
}
public static boolean isCounterType(Metric metric) {
return metric.getMetricType() == MetricType.COUNTER;
}
public static boolean isNumericType(Metric metric) {
MetricType type = metric.getMetricType();
return type == MetricType.COUNTER || type == MetricType.METER || type == MetricType.HISTOGRAM;
}
}Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-metrics-core