Comprehensive testing utilities for Apache Flink stream and batch processing applications
—
Fluent assertions and metric listeners for validating Flink metrics including counters, gauges, histograms, and meters with easy retrieval and verification capabilities. Enables comprehensive testing of application metrics and monitoring.
AssertJ-style fluent assertions for Flink metrics providing type-safe verification of metric values with readable error messages.
public class MetricAssertions {
public static CounterAssert assertThatCounter(Metric actual);
public static <T> GaugeAssert<T> assertThatGauge(Metric actual);
}
Assertions specifically for Counter metrics with value comparison capabilities.
public static class CounterAssert extends AbstractAssert<CounterAssert, Counter> {
public CounterAssert isEqualTo(Object expected);
}
Assertions for Gauge metrics supporting both exact value comparison and tolerance-based comparisons for numeric values.
public static class GaugeAssert<T> extends AbstractAssert<GaugeAssert<T>, Gauge<T>> {
public GaugeAssert<T> isEqualTo(Object expected);
public GaugeAssert<T> isCloseTo(long value, long epsilon);
}
import org.apache.flink.metrics.testutils.MetricAssertions;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
// Test counter metrics
Counter recordsProcessed = getCounterMetric("records_processed");
MetricAssertions.assertThatCounter(recordsProcessed)
.isEqualTo(1000L);
// Test gauge metrics with exact value
Gauge<Double> cpuUsage = getGaugeMetric("cpu_usage");
MetricAssertions.assertThatGauge(cpuUsage)
.isEqualTo(0.75);
// Test gauge metrics with tolerance
Gauge<Long> memoryUsage = getGaugeMetric("memory_usage");
MetricAssertions.assertThatGauge(memoryUsage)
.isCloseTo(1024L, 50L);
Comprehensive metric listener that captures metric registration events and provides convenient retrieval methods for all metric types.
public class MetricListener implements MetricReporter {
public MetricListener();
public MetricGroup getMetricGroup();
public <T extends Metric> Optional<T> getMetric(Class<T> metricType, String... identifier);
public Optional<Meter> getMeter(String... identifier);
public Optional<Counter> getCounter(String... identifier);
public Optional<Histogram> getHistogram(String... identifier);
public <T> Optional<Gauge<T>> getGauge(String... identifier);
}
import org.apache.flink.metrics.testutils.MetricListener;
import org.apache.flink.metrics.*;
// Create and register metric listener
MetricListener metricListener = new MetricListener();
// Register with Flink's metric system
Configuration config = new Configuration();
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." +
ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX,
MetricListener.class.getName());
// Retrieve specific metric types
Optional<Counter> counter = metricListener.getCounter("job", "task", "records_processed");
if (counter.isPresent()) {
long count = counter.get().getCount();
assertEquals(1000L, count);
}
Optional<Gauge<Double>> gauge = metricListener.getGauge("job", "task", "cpu_usage");
if (gauge.isPresent()) {
double usage = gauge.get().getValue();
assertTrue(usage >= 0.0 && usage <= 1.0);
}
Optional<Histogram> histogram = metricListener.getHistogram("job", "task", "latency");
if (histogram.isPresent()) {
long count = histogram.get().getCount();
assertTrue(count > 0);
}
Optional<Meter> meter = metricListener.getMeter("job", "task", "throughput");
if (meter.isPresent()) {
double rate = meter.get().getRate();
assertTrue(rate > 0.0);
}
Type-safe generic method for retrieving any metric type with compile-time type checking.
public <T extends Metric> Optional<T> getMetric(Class<T> metricType, String... identifier);
// Retrieve custom metric types
Optional<MyCustomMetric> customMetric =
metricListener.getMetric(MyCustomMetric.class, "job", "custom");
// Retrieve standard metrics with type safety
Optional<Counter> typedCounter =
metricListener.getMetric(Counter.class, "records", "processed");
Complete example of integrating metrics testing into a Flink test case with metric verification.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.metrics.testutils.MetricListener;
import org.apache.flink.metrics.testutils.MetricAssertions;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
public class MetricsIntegrationTest {
@RegisterExtension
static final MiniClusterExtension MINI_CLUSTER = new MiniClusterExtension();
@Test
void testJobMetrics() throws Exception {
MetricListener metricListener = new MetricListener();
StreamExecutionEnvironment env = MINI_CLUSTER.getTestStreamEnvironment();
// Configure metric reporting
env.getConfig().setGlobalJobParameters(
ParameterTool.fromMap(Collections.singletonMap("metrics.reporter", "test")));
// Create streaming job
env.fromElements(1, 2, 3, 4, 5)
.map(new CountingMapFunction())
.print();
// Execute job
JobExecutionResult result = env.execute("Metrics Test Job");
// Verify metrics were captured
Optional<Counter> processedRecords =
metricListener.getCounter("job", "map", "records_processed");
assertTrue(processedRecords.isPresent());
MetricAssertions.assertThatCounter(processedRecords.get())
.isEqualTo(5L);
}
}
Pattern for testing custom business metrics in Flink applications.
@Test
void testCustomBusinessMetrics() throws Exception {
MetricListener listener = new MetricListener();
// Execute job with custom metrics
executeJobWithCustomMetrics();
// Verify business-specific metrics
Optional<Gauge<Long>> orderCount = listener.getGauge("business", "orders", "total");
Optional<Counter> errorCount = listener.getCounter("business", "errors", "validation");
Optional<Histogram> processingTime = listener.getHistogram("business", "processing", "duration");
assertTrue(orderCount.isPresent());
assertTrue(errorCount.isPresent());
assertTrue(processingTime.isPresent());
// Verify metric values
assertTrue(orderCount.get().getValue() > 0);
assertEquals(0L, errorCount.get().getCount()); // No errors expected
assertTrue(processingTime.get().getCount() > 0);
}
Access to the metric group hierarchy for advanced metric organization verification.
@Test
void testMetricGroupStructure() {
MetricListener listener = new MetricListener();
MetricGroup rootGroup = listener.getMetricGroup();
// Verify metric group hierarchy
assertNotNull(rootGroup);
// Access nested metric groups and verify structure
// Implementation depends on your specific metric organization
}
The metrics testing utilities provide clear error messages when a metric assertion fails, for example when a counter or gauge does not hold the expected value.
This enables quick identification and resolution of metrics-related test failures, helping ensure your Flink applications properly expose and update their metrics during execution.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-test-utils