Test utilities library for Apache Flink applications providing streaming environments, test data sources, result collection mechanisms, and metrics testing utilities.
Comprehensive utilities for testing metrics reporting in Flink applications, including metric collection, validation, and assertion utilities using Hamcrest matchers.
MetricListener captures and stores metrics registered under a root metric group for retrieval and testing.
/**
* Listens for metric and group registration under a root metric group and stores them for retrieval
*/
public class MetricListener {
/**
* Delimiter for metric identifiers
*/
public static final String DELIMITER = ".";
/**
* Name of root metric group
*/
public static final String ROOT_METRIC_GROUP_NAME = "rootMetricGroup";
/**
* Creates new metric listener with testing registry
*/
public MetricListener();
/**
* Returns the root metric group for registering metrics
* @return MetricGroup for metric registration
*/
public MetricGroup getMetricGroup();
/**
* Gets registered metric by type and identifier
* @param metricType Class of the metric type to retrieve
* @param identifier Metric identifier components
* @return Optional containing the metric if found
*/
public <T extends Metric> Optional<T> getMetric(Class<T> metricType, String... identifier);
/**
* Gets registered Meter metric
* @param identifier Metric identifier components
* @return Optional containing the Meter if found
*/
public Optional<Meter> getMeter(String... identifier);
/**
* Gets registered Counter metric
* @param identifier Metric identifier components
* @return Optional containing the Counter if found
*/
public Optional<Counter> getCounter(String... identifier);
/**
* Gets registered Histogram metric
* @param identifier Metric identifier components
* @return Optional containing the Histogram if found
*/
public Optional<Histogram> getHistogram(String... identifier);
/**
* Gets registered Gauge metric
* @param identifier Metric identifier components
* @return Optional containing the Gauge if found
*/
public <T> Optional<Gauge<T>> getGauge(String... identifier);
}Usage Example:
import org.apache.flink.metrics.testutils.MetricListener;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
@Test
public void testMetricsReporting() throws Exception {
// Create metric listener
MetricListener metricListener = new MetricListener();
MetricGroup metricGroup = metricListener.getMetricGroup();
// Register a counter metric
Counter myCounter = metricGroup.counter("test", "myCounter");
myCounter.inc(5);
// Retrieve and verify the metric
Optional<Counter> retrievedCounter = metricListener.getCounter("test", "myCounter");
assertTrue(retrievedCounter.isPresent());
assertEquals(5L, retrievedCounter.get().getCount());
// Test with subgroups
MetricGroup subGroup = metricGroup.addGroup("subgroup");
Counter subCounter = subGroup.counter("subCounter");
subCounter.inc(10);
Optional<Counter> retrievedSubCounter = metricListener.getCounter("subgroup", "subCounter");
assertTrue(retrievedSubCounter.isPresent());
assertEquals(10L, retrievedSubCounter.get().getCount());
}MetricMatchers provides Hamcrest matchers for metric assertions in tests.
/**
* Provides Hamcrest matchers for metric assertions in tests
*/
public class MetricMatchers {
/**
* Creates matcher for Gauge metrics
* @param valueMatcher Matcher for the gauge value
* @return Matcher that checks if metric is a Gauge with matching value
*/
public static <V, T extends Metric> Matcher<T> isGauge(Matcher<V> valueMatcher);
/**
* Creates matcher for Counter metrics
* @param valueMatcher Matcher for the counter value
* @return Matcher that checks if metric is a Counter with matching value
*/
public static <T extends Metric> Matcher<T> isCounter(Matcher<Long> valueMatcher);
}Usage Example:
import org.apache.flink.metrics.testutils.MetricListener;
import org.apache.flink.metrics.testutils.MetricMatchers;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
@Test
public void testMetricMatchers() throws Exception {
MetricListener metricListener = new MetricListener();
MetricGroup metricGroup = metricListener.getMetricGroup();
// Test Counter matcher
Counter counter = metricGroup.counter("testCounter");
counter.inc(42);
Optional<Counter> retrievedCounter = metricListener.getCounter("testCounter");
assertTrue(retrievedCounter.isPresent());
// Use metric matcher to assert counter value
assertThat(retrievedCounter.get(), MetricMatchers.isCounter(equalTo(42L)));
assertThat(retrievedCounter.get(), MetricMatchers.isCounter(greaterThan(40L)));
// Test Gauge matcher
Gauge<Integer> gauge = metricGroup.gauge("testGauge", () -> 100);
Optional<Gauge<Integer>> retrievedGauge = metricListener.getGauge("testGauge");
assertTrue(retrievedGauge.isPresent());
// Use metric matcher to assert gauge value
assertThat(retrievedGauge.get(), MetricMatchers.isGauge(equalTo(100)));
assertThat(retrievedGauge.get(), MetricMatchers.isGauge(greaterThanOrEqualTo(100)));
}Here's a comprehensive example showing how to test metrics in a Flink application:
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.testutils.MetricListener;
import org.apache.flink.metrics.testutils.MetricMatchers;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.AbstractTestBase;
public class MetricsTestExample extends AbstractTestBase {
@Test
public void testJobMetrics() throws Exception {
MetricListener metricListener = new MetricListener();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Create a function that reports metrics
DataStream<String> input = env.fromElements("a", "b", "c", "d", "e");
DataStream<String> result = input.map(new MetricsReportingMapFunction(metricListener));
result.print();
env.execute("Metrics Test Job");
// Verify metrics were reported
Optional<Counter> processedCounter = metricListener.getCounter("processed_elements");
assertTrue("Counter should be present", processedCounter.isPresent());
assertThat(processedCounter.get(), MetricMatchers.isCounter(equalTo(5L)));
Optional<Gauge<Long>> lastProcessedTime = metricListener.getGauge("last_processed_time");
assertTrue("Gauge should be present", lastProcessedTime.isPresent());
assertThat(lastProcessedTime.get(), MetricMatchers.isGauge(greaterThan(0L)));
}
private static class MetricsReportingMapFunction extends RichMapFunction<String, String> {
private final MetricListener metricListener;
private Counter processedElements;
private volatile long lastProcessedTime = 0;
public MetricsReportingMapFunction(MetricListener metricListener) {
this.metricListener = metricListener;
}
@Override
public void open(Configuration parameters) {
MetricGroup metricGroup = metricListener.getMetricGroup();
processedElements = metricGroup.counter("processed_elements");
metricGroup.gauge("last_processed_time", () -> lastProcessedTime);
}
@Override
public String map(String value) {
processedElements.inc();
lastProcessedTime = System.currentTimeMillis();
return value.toUpperCase();
}
}
}import org.apache.flink.metrics.Histogram;
@Test
public void testHistogramMetrics() throws Exception {
MetricListener metricListener = new MetricListener();
MetricGroup metricGroup = metricListener.getMetricGroup();
// Register histogram
Histogram histogram = metricGroup.histogram("response_time");
// Report some values
histogram.update(100);
histogram.update(200);
histogram.update(150);
// Retrieve and verify
Optional<Histogram> retrievedHistogram = metricListener.getHistogram("response_time");
assertTrue(retrievedHistogram.isPresent());
assertEquals(3, retrievedHistogram.get().getCount());
}import org.apache.flink.metrics.Meter;
@Test
public void testMeterMetrics() throws Exception {
MetricListener metricListener = new MetricListener();
MetricGroup metricGroup = metricListener.getMetricGroup();
// Register meter
Meter meter = metricGroup.meter("requests_per_second");
// Mark some events
meter.markEvent();
meter.markEvent(5);
// Retrieve and verify
Optional<Meter> retrievedMeter = metricListener.getMeter("requests_per_second");
assertTrue(retrievedMeter.isPresent());
assertEquals(6, retrievedMeter.get().getCount());
}Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-test-utils-2-12