CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-test-utils

Comprehensive testing utilities for Apache Flink stream and batch processing applications

Pending
Overview
Eval results
Files

docs/metrics-testing.md

Metrics Testing

Fluent assertions and metric listeners for validating Flink metrics including counters, gauges, histograms, and meters with easy retrieval and verification capabilities. Enables comprehensive testing of application metrics and monitoring.

Capabilities

Metric Assertions

AssertJ-style fluent assertions for Flink metrics providing type-safe verification of metric values with readable error messages.

/**
 * Static entry points for fluent assertions on Flink metrics.
 *
 * <p>Both factory methods accept the base {@code Metric} type and return a
 * metric-specific assertion object.
 * NOTE(review): because the parameter is the base type, a wrong metric kind is
 * presumably rejected at runtime rather than at compile time — confirm.
 */
public class MetricAssertions {
    /** Starts an assertion chain that treats {@code actual} as a counter. */
    public static CounterAssert assertThatCounter(Metric actual);

    /**
     * Starts an assertion chain that treats {@code actual} as a gauge.
     *
     * @param <T> value type reported by the gauge
     */
    public static <T> GaugeAssert<T> assertThatGauge(Metric actual);
}

Counter Assertions

Assertions specifically for Counter metrics with value comparison capabilities.

/** Fluent assertion object for a {@code Counter}. */
public static class CounterAssert extends AbstractAssert<CounterAssert, Counter> {
    /**
     * Asserts equality with {@code expected} and returns this assertion for chaining.
     * NOTE(review): presumably compares the counter's current count rather than the
     * Counter object itself (the usage example passes a long literal) — confirm.
     */
    public CounterAssert isEqualTo(Object expected);
}

Gauge Assertions

Assertions for Gauge metrics supporting both exact value comparison and tolerance-based comparison. The tolerance-based check, isCloseTo, takes long arguments, so it is intended for long-valued gauges.

/**
 * Fluent assertion object for a {@code Gauge}.
 *
 * @param <T> value type reported by the gauge
 */
public static class GaugeAssert<T> extends AbstractAssert<GaugeAssert<T>, Gauge<T>> {
    /**
     * Asserts equality with {@code expected} and returns this assertion for chaining.
     * NOTE(review): presumably compares the gauge's reported value — confirm.
     */
    public GaugeAssert<T> isEqualTo(Object expected);

    /**
     * Asserts that the gauge is within {@code epsilon} of {@code value} and returns
     * this assertion for chaining.
     * NOTE(review): whether the bounds are inclusive is not shown here — confirm
     * before relying on edge values.
     */
    public GaugeAssert<T> isCloseTo(long value, long epsilon);
}

Usage Example

import org.apache.flink.metrics.testutils.MetricAssertions;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;

// Counter: assert against an exact expected value
Counter processedCounter = getCounterMetric("records_processed");
MetricAssertions.assertThatCounter(processedCounter).isEqualTo(1000L);

// Gauge: assert against an exact expected value
Gauge<Double> cpuGauge = getGaugeMetric("cpu_usage");
MetricAssertions.assertThatGauge(cpuGauge).isEqualTo(0.75);

// Gauge: assert within a tolerance (expected value, epsilon)
Gauge<Long> memoryGauge = getGaugeMetric("memory_usage");
MetricAssertions.assertThatGauge(memoryGauge).isCloseTo(1024L, 50L);

Metric Listener

Comprehensive metric listener that captures metric registration events and provides convenient retrieval methods for all metric types.

/**
 * Test helper that exposes a metric group and typed lookups for metrics.
 *
 * <p>NOTE(review): this listing declares {@code implements MetricReporter}; verify
 * against the Flink version in use that the class really is a reporter and not a
 * standalone helper whose metrics are registered through {@link #getMetricGroup()}.
 */
public class MetricListener implements MetricReporter {
    /** Creates a new listener. */
    public MetricListener();
    
    /** Returns the metric group associated with this listener. */
    public MetricGroup getMetricGroup();
    
    /**
     * Looks up a metric of the requested type.
     *
     * @param metricType class of the metric to return
     * @param identifier path components that identify the metric, for example
     *     {@code "job", "task", "records_processed"}
     * @return the metric wrapped in an Optional; presumably empty when nothing is
     *     registered under the identifier — confirm
     */
    public <T extends Metric> Optional<T> getMetric(Class<T> metricType, String... identifier);

    /** Typed lookup returning a {@code Meter}; see {@code getMetric} for the identifier format. */
    public Optional<Meter> getMeter(String... identifier);

    /** Typed lookup returning a {@code Counter}; see {@code getMetric} for the identifier format. */
    public Optional<Counter> getCounter(String... identifier);

    /** Typed lookup returning a {@code Histogram}; see {@code getMetric} for the identifier format. */
    public Optional<Histogram> getHistogram(String... identifier);

    /**
     * Typed lookup returning a {@code Gauge}; see {@code getMetric} for the identifier format.
     *
     * @param <T> value type the caller expects the gauge to report
     */
    public <T> Optional<Gauge<T>> getGauge(String... identifier);
}

Usage Example

import org.apache.flink.metrics.testutils.MetricListener;
import org.apache.flink.metrics.*;

// Create the listener that the test will query
MetricListener metricListener = new MetricListener();

// Register the listener class as a reporter named "test" in Flink's configuration.
// NOTE(review): the class is registered by name, so Flink would presumably create
// its own reporter instance instead of using metricListener above — confirm how
// this instance is expected to receive metrics in your setup.
Configuration config = new Configuration();
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." +
    ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX,
    MetricListener.class.getName());

// Retrieve specific metric types. A missing metric fails the test immediately:
// guarding the assertions with isPresent() would let the test pass without
// checking anything when the metric was never registered.
Counter counter = metricListener.getCounter("job", "task", "records_processed")
    .orElseThrow(() -> new AssertionError("counter job.task.records_processed not found"));
assertEquals(1000L, counter.getCount());

// The explicit <Double> is required: without a target type on the chained call
// the gauge's value type cannot be inferred.
Gauge<Double> gauge = metricListener.<Double>getGauge("job", "task", "cpu_usage")
    .orElseThrow(() -> new AssertionError("gauge job.task.cpu_usage not found"));
double usage = gauge.getValue();
assertTrue(usage >= 0.0 && usage <= 1.0);

Histogram histogram = metricListener.getHistogram("job", "task", "latency")
    .orElseThrow(() -> new AssertionError("histogram job.task.latency not found"));
assertTrue(histogram.getCount() > 0);

Meter meter = metricListener.getMeter("job", "task", "throughput")
    .orElseThrow(() -> new AssertionError("meter job.task.throughput not found"));
assertTrue(meter.getRate() > 0.0);

Generic Metric Retrieval

Type-safe generic method for retrieving any metric type with compile-time type checking.

/**
 * Looks up a metric of the requested type.
 *
 * @param metricType class of the metric to return; it fixes the element type of the result
 * @param identifier path components that identify the metric
 * @return the metric wrapped in an Optional
 */
public <T extends Metric> Optional<T> getMetric(Class<T> metricType, String... identifier);

Usage Example

// A user-defined metric type is looked up by passing its class token
Optional<MyCustomMetric> custom = metricListener.getMetric(MyCustomMetric.class, "job", "custom");

// The same call works for built-in metric types such as Counter
Optional<Counter> recordsCounter = metricListener.getMetric(Counter.class, "records", "processed");

Integration Patterns

Test Case Integration

Complete example of integrating metrics testing into a Flink test case with metric verification.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.metrics.testutils.MetricListener;
import org.apache.flink.metrics.testutils.MetricAssertions;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/**
 * Example integration test: runs a small streaming job on a mini cluster and then
 * checks a counter through a {@code MetricListener}.
 */
public class MetricsIntegrationTest {

    @RegisterExtension
    static final MiniClusterExtension MINI_CLUSTER = new MiniClusterExtension();

    @Test
    void testJobMetrics() throws Exception {
        MetricListener metricListener = new MetricListener();

        StreamExecutionEnvironment env = MINI_CLUSTER.getTestStreamEnvironment();

        // Configure metric reporting.
        // NOTE(review): nothing visible here connects metricListener to the job, and
        // global job parameters are presumably user parameters rather than reporter
        // configuration — confirm the wiring, otherwise the lookup below cannot succeed.
        env.getConfig().setGlobalJobParameters(
            ParameterTool.fromMap(Collections.singletonMap("metrics.reporter", "test")));

        // Create streaming job
        env.fromElements(1, 2, 3, 4, 5)
           .map(new CountingMapFunction())
           .print();

        // Execute job; the returned result is not needed, so it is not stored
        env.execute("Metrics Test Job");

        // Fail with a descriptive message when the counter was never captured,
        // instead of asserting presence and then calling get() separately
        Counter processedRecords =
            metricListener.getCounter("job", "map", "records_processed")
                .orElseThrow(() -> new AssertionError(
                    "counter job.map.records_processed was not captured"));

        MetricAssertions.assertThatCounter(processedRecords)
            .isEqualTo(5L);
    }
}

Custom Metric Verification

Pattern for testing custom business metrics in Flink applications.

/**
 * Runs a job that emits business metrics, then checks that each expected metric
 * exists and carries a plausible value.
 */
@Test
void testCustomBusinessMetrics() throws Exception {
    MetricListener metrics = new MetricListener();

    // Run the job under test
    executeJobWithCustomMetrics();

    // Look up one metric of each kind
    Optional<Gauge<Long>> totalOrders = metrics.getGauge("business", "orders", "total");
    Optional<Counter> validationErrors = metrics.getCounter("business", "errors", "validation");
    Optional<Histogram> durations = metrics.getHistogram("business", "processing", "duration");

    // Presence checks come first so the value checks below can call get()
    assertTrue(totalOrders.isPresent());
    assertTrue(validationErrors.isPresent());
    assertTrue(durations.isPresent());

    // Value checks: some orders, no validation errors, some timing samples
    assertTrue(totalOrders.get().getValue() > 0);
    assertEquals(0L, validationErrors.get().getCount());
    assertTrue(durations.get().getCount() > 0);
}

Metric Group Access

Access to the metric group hierarchy for advanced metric organization verification.

/** Checks that a freshly created listener exposes a metric group. */
@Test
void testMetricGroupStructure() {
    MetricListener metrics = new MetricListener();
    MetricGroup root = metrics.getMetricGroup();

    // The group must exist before any hierarchy checks make sense
    assertNotNull(root);

    // Further checks on nested groups depend on how the application
    // organizes its metrics and are left to the concrete test
}

Error Handling and Debugging

The metrics testing utilities provide clear error messages when:

  • Metrics are not found at the specified path
  • Metric types don't match expected types
  • Assertion values don't match actual metric values
  • Tolerance-based comparisons fail for numeric metrics

This enables quick identification and resolution of metrics-related test failures, helping ensure your Flink applications properly expose and update their metrics during execution.

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-test-utils

docs

index.md

metrics-testing.md

minicluster-management.md

result-verification.md

specialized-connectors.md

test-data-sources.md

test-environments.md

validation-utilities.md

tile.json