or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

data-collection.mdindex.mdmetrics-testing.mdsecurity-testing.mdtest-data-providers.mdtest-environments.md
tile.json

metrics-testing.mddocs/

Metrics Testing

Comprehensive utilities for testing metrics reporting in Flink applications, including metric collection, validation, and assertion utilities using Hamcrest matchers.

Capabilities

Metric Listener

MetricListener captures and stores metrics registered under a root metric group for retrieval and testing.

/**
 * Listens for metric and group registration under a root metric group and stores them for retrieval
 */
public class MetricListener {
    /**
     * Delimiter for metric identifiers
     */
    public static final String DELIMITER = ".";

    /**
     * Name of root metric group
     */
    public static final String ROOT_METRIC_GROUP_NAME = "rootMetricGroup";

    /**
     * Creates new metric listener with testing registry
     */
    public MetricListener();

    /**
     * Returns the root metric group for registering metrics
     * @return MetricGroup for metric registration
     */
    public MetricGroup getMetricGroup();

    /**
     * Gets registered metric by type and identifier
     * @param metricType Class of the metric type to retrieve
     * @param identifier Metric identifier components
     * @return Optional containing the metric if found
     */
    public <T extends Metric> Optional<T> getMetric(Class<T> metricType, String... identifier);

    /**
     * Gets registered Meter metric
     * @param identifier Metric identifier components
     * @return Optional containing the Meter if found
     */
    public Optional<Meter> getMeter(String... identifier);

    /**
     * Gets registered Counter metric
     * @param identifier Metric identifier components
     * @return Optional containing the Counter if found
     */
    public Optional<Counter> getCounter(String... identifier);

    /**
     * Gets registered Histogram metric
     * @param identifier Metric identifier components
     * @return Optional containing the Histogram if found
     */
    public Optional<Histogram> getHistogram(String... identifier);

    /**
     * Gets registered Gauge metric
     * @param identifier Metric identifier components
     * @return Optional containing the Gauge if found
     */
    public <T> Optional<Gauge<T>> getGauge(String... identifier);
}

Usage Example:

import org.apache.flink.metrics.testutils.MetricListener;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;

@Test
public void testMetricsReporting() throws Exception {
    // Create metric listener
    MetricListener metricListener = new MetricListener();
    MetricGroup metricGroup = metricListener.getMetricGroup();
    
    // Register a counter metric
    Counter myCounter = metricGroup.counter("test", "myCounter");
    myCounter.inc(5);
    
    // Retrieve and verify the metric
    Optional<Counter> retrievedCounter = metricListener.getCounter("test", "myCounter");
    assertTrue(retrievedCounter.isPresent());
    assertEquals(5L, retrievedCounter.get().getCount());
    
    // Test with subgroups
    MetricGroup subGroup = metricGroup.addGroup("subgroup");
    Counter subCounter = subGroup.counter("subCounter");
    subCounter.inc(10);
    
    Optional<Counter> retrievedSubCounter = metricListener.getCounter("subgroup", "subCounter");
    assertTrue(retrievedSubCounter.isPresent());
    assertEquals(10L, retrievedSubCounter.get().getCount());
}

Metric Matchers

MetricMatchers provides Hamcrest matchers for metric assertions in tests.

/**
 * Provides Hamcrest matchers for metric assertions in tests
 */
public class MetricMatchers {

    /**
     * Creates matcher for Gauge metrics
     * @param valueMatcher Matcher for the gauge value
     * @return Matcher that checks if metric is a Gauge with matching value
     */
    public static <V, T extends Metric> Matcher<T> isGauge(Matcher<V> valueMatcher);

    /**
     * Creates matcher for Counter metrics
     * @param valueMatcher Matcher for the counter value
     * @return Matcher that checks if metric is a Counter with matching value
     */
    public static <T extends Metric> Matcher<T> isCounter(Matcher<Long> valueMatcher);
}

Usage Example:

import org.apache.flink.metrics.testutils.MetricListener;
import org.apache.flink.metrics.testutils.MetricMatchers;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;

@Test
public void testMetricMatchers() throws Exception {
    MetricListener metricListener = new MetricListener();
    MetricGroup metricGroup = metricListener.getMetricGroup();
    
    // Test Counter matcher
    Counter counter = metricGroup.counter("testCounter");
    counter.inc(42);
    
    Optional<Counter> retrievedCounter = metricListener.getCounter("testCounter");
    assertTrue(retrievedCounter.isPresent());
    
    // Use metric matcher to assert counter value
    assertThat(retrievedCounter.get(), MetricMatchers.isCounter(equalTo(42L)));
    assertThat(retrievedCounter.get(), MetricMatchers.isCounter(greaterThan(40L)));
    
    // Test Gauge matcher
    Gauge<Integer> gauge = metricGroup.gauge("testGauge", () -> 100);
    
    Optional<Gauge<Integer>> retrievedGauge = metricListener.getGauge("testGauge");
    assertTrue(retrievedGauge.isPresent());
    
    // Use metric matcher to assert gauge value
    assertThat(retrievedGauge.get(), MetricMatchers.isGauge(equalTo(100)));
    assertThat(retrievedGauge.get(), MetricMatchers.isGauge(greaterThanOrEqualTo(100)));
}

Complete Metrics Testing Example

Here's a comprehensive example showing how to test metrics in a Flink application:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.testutils.MetricListener;
import org.apache.flink.metrics.testutils.MetricMatchers;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.AbstractTestBase;

public class MetricsTestExample extends AbstractTestBase {
    
    @Test
    public void testJobMetrics() throws Exception {
        MetricListener metricListener = new MetricListener();
        
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        // Create a function that reports metrics
        DataStream<String> input = env.fromElements("a", "b", "c", "d", "e");
        DataStream<String> result = input.map(new MetricsReportingMapFunction(metricListener));
        
        result.print();
        env.execute("Metrics Test Job");
        
        // Verify metrics were reported
        Optional<Counter> processedCounter = metricListener.getCounter("processed_elements");
        assertTrue("Counter should be present", processedCounter.isPresent());
        assertThat(processedCounter.get(), MetricMatchers.isCounter(equalTo(5L)));
        
        Optional<Gauge<Long>> lastProcessedTime = metricListener.getGauge("last_processed_time");
        assertTrue("Gauge should be present", lastProcessedTime.isPresent());
        assertThat(lastProcessedTime.get(), MetricMatchers.isGauge(greaterThan(0L)));
    }
    
    private static class MetricsReportingMapFunction extends RichMapFunction<String, String> {
        private final MetricListener metricListener;
        private Counter processedElements;
        private volatile long lastProcessedTime = 0;
        
        public MetricsReportingMapFunction(MetricListener metricListener) {
            this.metricListener = metricListener;
        }
        
        @Override
        public void open(Configuration parameters) {
            MetricGroup metricGroup = metricListener.getMetricGroup();
            processedElements = metricGroup.counter("processed_elements");
            metricGroup.gauge("last_processed_time", () -> lastProcessedTime);
        }
        
        @Override
        public String map(String value) {
            processedElements.inc();
            lastProcessedTime = System.currentTimeMillis();
            return value.toUpperCase();
        }
    }
}

Testing Different Metric Types

Testing Histograms

import org.apache.flink.metrics.Histogram;

@Test
public void testHistogramMetrics() throws Exception {
    MetricListener metricListener = new MetricListener();
    MetricGroup metricGroup = metricListener.getMetricGroup();
    
    // Register histogram
    Histogram histogram = metricGroup.histogram("response_time");
    
    // Report some values
    histogram.update(100);
    histogram.update(200);
    histogram.update(150);
    
    // Retrieve and verify
    Optional<Histogram> retrievedHistogram = metricListener.getHistogram("response_time");
    assertTrue(retrievedHistogram.isPresent());
    assertEquals(3, retrievedHistogram.get().getCount());
}

Testing Meters

import org.apache.flink.metrics.Meter;

@Test
public void testMeterMetrics() throws Exception {
    MetricListener metricListener = new MetricListener();
    MetricGroup metricGroup = metricListener.getMetricGroup();
    
    // Register meter
    Meter meter = metricGroup.meter("requests_per_second");
    
    // Mark some events
    meter.markEvent();
    meter.markEvent(5);
    
    // Retrieve and verify
    Optional<Meter> retrievedMeter = metricListener.getMeter("requests_per_second");
    assertTrue(retrievedMeter.isPresent());
    assertEquals(6, retrievedMeter.get().getCount());
}