Comprehensive utilities for testing metrics reporting in Flink applications, including metric collection, validation, and assertion utilities using Hamcrest matchers.
MetricListener captures and stores metrics registered under a root metric group for retrieval and testing.
/**
 * Listens for metric and group registration under a root metric group and stores them for
 * retrieval, so that tests can look up what was registered.
 */
public class MetricListener {
/**
 * Delimiter for metric identifiers.
 */
public static final String DELIMITER = ".";
/**
 * Name of the root metric group.
 */
public static final String ROOT_METRIC_GROUP_NAME = "rootMetricGroup";
/**
 * Creates a new metric listener with a testing registry.
 */
public MetricListener();
/**
 * Returns the root metric group for registering metrics.
 *
 * @return MetricGroup for metric registration
 */
public MetricGroup getMetricGroup();
/**
 * Gets a registered metric by type and identifier.
 *
 * @param <T> type of the metric to retrieve
 * @param metricType Class of the metric type to retrieve
 * @param identifier Metric identifier components, e.g. {@code "subgroup", "subCounter"} for a
 *     metric named {@code subCounter} registered in the group {@code subgroup}; the root group
 *     name is not part of the components passed in
 * @return Optional containing the metric if found
 */
public <T extends Metric> Optional<T> getMetric(Class<T> metricType, String... identifier);
/**
 * Gets a registered Meter metric.
 *
 * @param identifier Metric identifier components (group names followed by the metric name)
 * @return Optional containing the Meter if found
 */
public Optional<Meter> getMeter(String... identifier);
/**
 * Gets a registered Counter metric.
 *
 * @param identifier Metric identifier components (group names followed by the metric name)
 * @return Optional containing the Counter if found
 */
public Optional<Counter> getCounter(String... identifier);
/**
 * Gets a registered Histogram metric.
 *
 * @param identifier Metric identifier components (group names followed by the metric name)
 * @return Optional containing the Histogram if found
 */
public Optional<Histogram> getHistogram(String... identifier);
/**
 * Gets a registered Gauge metric.
 *
 * @param <T> type of the value reported by the gauge
 * @param identifier Metric identifier components (group names followed by the metric name)
 * @return Optional containing the Gauge if found
 */
public <T> Optional<Gauge<T>> getGauge(String... identifier);
}Usage Example:
import org.apache.flink.metrics.testutils.MetricListener;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
/**
 * Registers counters directly in a named group and in a subgroup of the listener's root metric
 * group, then looks them up again through the listener.
 */
@Test
public void testMetricsReporting() throws Exception {
    // Create metric listener; all metrics below are registered under its root group
    MetricListener metricListener = new MetricListener();
    MetricGroup metricGroup = metricListener.getMetricGroup();

    // Register a counter metric inside the "test" group. MetricGroup#counter(String) only takes
    // the metric name, so the enclosing group has to be created explicitly with addGroup.
    Counter myCounter = metricGroup.addGroup("test").counter("myCounter");
    myCounter.inc(5);

    // Retrieve and verify the metric: group name first, then the metric name
    Optional<Counter> retrievedCounter = metricListener.getCounter("test", "myCounter");
    assertTrue(retrievedCounter.isPresent());
    assertEquals(5L, retrievedCounter.get().getCount());

    // Test with subgroups
    MetricGroup subGroup = metricGroup.addGroup("subgroup");
    Counter subCounter = subGroup.counter("subCounter");
    subCounter.inc(10);

    Optional<Counter> retrievedSubCounter = metricListener.getCounter("subgroup", "subCounter");
    assertTrue(retrievedSubCounter.isPresent());
    assertEquals(10L, retrievedSubCounter.get().getCount());
}

MetricMatchers provides Hamcrest matchers for metric assertions in tests.
/**
 * Provides Hamcrest matchers for metric assertions in tests.
 */
public class MetricMatchers {
/**
 * Creates a matcher for Gauge metrics.
 *
 * @param <V> type accepted by the value matcher
 * @param <T> metric type the returned matcher is applied to
 * @param valueMatcher Matcher for the gauge value
 * @return Matcher that checks if metric is a Gauge with matching value
 */
public static <V, T extends Metric> Matcher<T> isGauge(Matcher<V> valueMatcher);
/**
 * Creates a matcher for Counter metrics.
 *
 * @param <T> metric type the returned matcher is applied to
 * @param valueMatcher Matcher for the counter value
 * @return Matcher that checks if metric is a Counter with matching value
 */
public static <T extends Metric> Matcher<T> isCounter(Matcher<Long> valueMatcher);
}Usage Example:
import org.apache.flink.metrics.testutils.MetricListener;
import org.apache.flink.metrics.testutils.MetricMatchers;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
/**
 * Shows the counter and gauge matchers being applied to metrics fetched back from the listener.
 */
@Test
public void testMetricMatchers() throws Exception {
    MetricListener listener = new MetricListener();
    MetricGroup rootGroup = listener.getMetricGroup();

    // --- Counter matcher ---
    Counter registeredCounter = rootGroup.counter("testCounter");
    registeredCounter.inc(42);

    Optional<Counter> counterLookup = listener.getCounter("testCounter");
    assertTrue(counterLookup.isPresent());

    // The same retrieved counter is checked against an exact and a lower-bound matcher
    Counter foundCounter = counterLookup.get();
    assertThat(foundCounter, MetricMatchers.isCounter(equalTo(42L)));
    assertThat(foundCounter, MetricMatchers.isCounter(greaterThan(40L)));

    // --- Gauge matcher ---
    Gauge<Integer> registeredGauge = rootGroup.gauge("testGauge", () -> 100);

    Optional<Gauge<Integer>> gaugeLookup = listener.getGauge("testGauge");
    assertTrue(gaugeLookup.isPresent());

    // The gauge value is matched the same way, through isGauge
    Gauge<Integer> foundGauge = gaugeLookup.get();
    assertThat(foundGauge, MetricMatchers.isGauge(equalTo(100)));
    assertThat(foundGauge, MetricMatchers.isGauge(greaterThanOrEqualTo(100)));
}

Here's a comprehensive example showing how to test metrics in a Flink application:
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.testutils.MetricListener;
import org.apache.flink.metrics.testutils.MetricMatchers;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.AbstractTestBase;
/**
 * Example test that runs a small streaming job whose map function reports a counter and a gauge
 * into a {@link MetricListener}, and then verifies both metrics after the job has finished.
 */
public class MetricsTestExample extends AbstractTestBase {

    /**
     * Listener shared between the test and the map function.
     *
     * <p>It is a static field rather than a constructor argument of the function: user functions
     * are {@code Serializable} and are shipped with the job, while MetricListener does not
     * implement {@code Serializable}. A serialized copy would also not be the instance this test
     * reads from. NOTE(review): this relies on the job running in the same JVM as the test, as
     * with a local mini cluster; confirm for your setup.
     */
    private static final MetricListener METRIC_LISTENER = new MetricListener();

    @Test
    public void testJobMetrics() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // A single subtask registers each metric exactly once under the shared listener
        env.setParallelism(1);

        // Create a function that reports metrics
        DataStream<String> input = env.fromElements("a", "b", "c", "d", "e");
        DataStream<String> result = input.map(new MetricsReportingMapFunction());
        result.print();
        env.execute("Metrics Test Job");

        // Verify metrics were reported
        Optional<Counter> processedCounter = METRIC_LISTENER.getCounter("processed_elements");
        assertTrue("Counter should be present", processedCounter.isPresent());
        assertThat(processedCounter.get(), MetricMatchers.isCounter(equalTo(5L)));

        Optional<Gauge<Long>> lastProcessedTime = METRIC_LISTENER.getGauge("last_processed_time");
        assertTrue("Gauge should be present", lastProcessedTime.isPresent());
        assertThat(lastProcessedTime.get(), MetricMatchers.isGauge(greaterThan(0L)));
    }

    /**
     * Upper-cases each element while counting processed elements and exposing the wall-clock time
     * of the most recently processed element as a gauge.
     */
    private static class MetricsReportingMapFunction extends RichMapFunction<String, String> {
        // Created in open(), i.e. after the function has been deserialized on the task
        private transient Counter processedElements;
        // volatile: written by the task thread in map(), read through the gauge
        private volatile long lastProcessedTime = 0;

        @Override
        public void open(Configuration parameters) {
            MetricGroup metricGroup = METRIC_LISTENER.getMetricGroup();
            processedElements = metricGroup.counter("processed_elements");
            metricGroup.gauge("last_processed_time", () -> lastProcessedTime);
        }

        @Override
        public String map(String value) {
            processedElements.inc();
            lastProcessedTime = System.currentTimeMillis();
            return value.toUpperCase();
        }
    }
}

import org.apache.flink.metrics.Histogram;
/**
 * Registers a histogram under the listener's root metric group, updates it three times and
 * verifies the update count through the listener.
 */
@Test
public void testHistogramMetrics() throws Exception {
    MetricListener metricListener = new MetricListener();
    MetricGroup metricGroup = metricListener.getMetricGroup();

    // Register histogram. MetricGroup#histogram takes the Histogram instance to register, so a
    // minimal counting implementation is supplied here; substitute the implementation your
    // code actually reports with.
    Histogram histogram = metricGroup.histogram("response_time", new Histogram() {
        private long count;

        @Override
        public void update(long value) {
            count++;
        }

        @Override
        public long getCount() {
            return count;
        }

        @Override
        public org.apache.flink.metrics.HistogramStatistics getStatistics() {
            throw new UnsupportedOperationException("Statistics are not needed in this example");
        }
    });

    // Report some values
    histogram.update(100);
    histogram.update(200);
    histogram.update(150);

    // Retrieve and verify
    Optional<Histogram> retrievedHistogram = metricListener.getHistogram("response_time");
    assertTrue(retrievedHistogram.isPresent());
    assertEquals(3L, retrievedHistogram.get().getCount());
}

import org.apache.flink.metrics.Meter;
/**
 * Registers a meter under the listener's root metric group, marks six events in total and
 * verifies the event count through the listener.
 */
@Test
public void testMeterMetrics() throws Exception {
    MetricListener metricListener = new MetricListener();
    MetricGroup metricGroup = metricListener.getMetricGroup();

    // Register meter. MetricGroup#meter takes the Meter instance to register; MeterView is
    // given the time span, in seconds, over which it averages its rate.
    Meter meter =
            metricGroup.meter("requests_per_second", new org.apache.flink.metrics.MeterView(60));

    // Mark some events: 1 + 5 = 6 in total
    meter.markEvent();
    meter.markEvent(5);

    // Retrieve and verify
    Optional<Meter> retrievedMeter = metricListener.getMeter("requests_per_second");
    assertTrue(retrievedMeter.isPresent());
    assertEquals(6L, retrievedMeter.get().getCount());
}