
tessl/maven-org-apache-flink--flink-connector-test-utils

Testing utilities for Apache Flink connectors providing test frameworks, assertion utilities, and abstractions for comprehensive connector testing


Metrics and Monitoring

The metrics framework provides utilities for querying and validating Flink job metrics via the Flink REST API. This enables performance testing, behavior validation, and monitoring of connector operations during test execution.

Capabilities

Metric Querier

The main utility class for querying job metrics from a Flink cluster via the REST API.

/**
 * Utility for querying job metrics via Flink REST API
 */
public class MetricQuerier {
    
    /**
     * Create metric querier with configuration
     * @param configuration Flink configuration for REST client
     * @throws ConfigurationException if configuration is invalid
     */
    public MetricQuerier(Configuration configuration) throws ConfigurationException;
    
    /**
     * Get job details including vertex information
     * @param client REST client instance
     * @param endpoint Test environment REST endpoint
     * @param jobId Job ID to query
     * @return Job details with vertex information
     * @throws Exception if query fails
     */
    public static JobDetailsInfo getJobDetails(
        RestClient client, 
        TestEnvironment.Endpoint endpoint, 
        JobID jobId
    ) throws Exception;
    
    /**
     * Get aggregated metrics for source/sink operator
     * @param endpoint Test environment REST endpoint
     * @param jobId Job ID to query
     * @param sourceOrSinkName Name of source or sink operator
     * @param metricName Name of metric to query (e.g., "numRecordsIn", "numRecordsOut")
     * @param filter Optional filter for metric selection (e.g., "Writer" for Sink V2)
     * @return Aggregated metric value across all subtasks
     * @throws Exception if query fails or metric not found
     */
    public Double getAggregatedMetricsByRestAPI(
        TestEnvironment.Endpoint endpoint,
        JobID jobId,
        String sourceOrSinkName,
        String metricName,
        String filter
    ) throws Exception;
    
    /**
     * Get list of available metrics for job vertex
     * @param endpoint Test environment REST endpoint
     * @param jobId Job ID to query
     * @param vertexId Vertex ID to query metrics for
     * @return Response containing available metrics
     * @throws Exception if query fails
     */
    public AggregatedMetricsResponseBody getMetricList(
        TestEnvironment.Endpoint endpoint,
        JobID jobId,
        JobVertexID vertexId
    ) throws Exception;
    
    /**
     * Get specific metrics with filtering
     * @param endpoint Test environment REST endpoint
     * @param jobId Job ID to query
     * @param vertexId Vertex ID to query
     * @param filters Comma-separated list of metric filters
     * @return Response containing filtered metrics
     * @throws Exception if query fails
     */
    public AggregatedMetricsResponseBody getMetrics(
        TestEnvironment.Endpoint endpoint,
        JobID jobId,
        JobVertexID vertexId,
        String filters
    ) throws Exception;
}

Usage Examples:

// Create metric querier
MetricQuerier metricQuerier = new MetricQuerier(new Configuration());

// Query sink metrics
Double numRecordsOut = metricQuerier.getAggregatedMetricsByRestAPI(
    testEnv.getRestEndpoint(),
    jobClient.getJobID(),
    "MySink", // sink operator name
    MetricNames.NUM_RECORDS_SEND, // metric name
    "Writer" // filter for Sink V2
);

// Validate expected record count
assertThat(numRecordsOut).isEqualTo((double) expectedRecordCount); // cast: the metric is a Double, so compare as double

Common Metrics

Standard metrics available for source and sink connectors.

/**
 * Configuration constants for connector testing
 */
public class ConnectorTestConstants {
    public static final long METRIC_FETCHER_UPDATE_INTERVAL_MS = 1000L;
    public static final long SLOT_REQUEST_TIMEOUT_MS = 10_000L;
    public static final long HEARTBEAT_TIMEOUT_MS = 5_000L;
    public static final long HEARTBEAT_INTERVAL_MS = 1000L;
    public static final Duration DEFAULT_COLLECT_DATA_TIMEOUT = Duration.ofSeconds(120L);
}
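
These constants typically drive a polling loop that re-queries metrics until they converge, since the metric fetcher only refreshes on an interval. A minimal, generic sketch of such a loop (the `pollUntil` helper and its parameters are illustrative, not part of the library; in the test suites this role is played by `waitUntilCondition`):

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

class MetricPolling {

    /**
     * Polls the condition at the given interval until it returns true or the
     * timeout elapses. Returns whether the condition was ever satisfied.
     */
    static boolean pollUntil(BooleanSupplier condition, Duration interval, Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() >= deadline) {
                return false;
            }
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

In practice the polling interval would correspond to METRIC_FETCHER_UPDATE_INTERVAL_MS and the overall timeout to DEFAULT_COLLECT_DATA_TIMEOUT.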

Common metric names used in connector testing (from Flink's MetricNames class):

  • "numRecordsIn" - Number of records received by source
  • "numRecordsOut" - Number of records emitted by source
  • "numRecordsSend" - Number of records sent by sink
  • "numBytesIn" - Number of bytes received
  • "numBytesOut" - Number of bytes emitted

Usage Examples:

// Query different metric types using metric name strings
Double recordsIn = metricQuerier.getAggregatedMetricsByRestAPI(
    endpoint, jobId, "MySource", "numRecordsIn", null);

Double recordsOut = metricQuerier.getAggregatedMetricsByRestAPI(
    endpoint, jobId, "MySink", "numRecordsSend", "Writer");

Double bytesIn = metricQuerier.getAggregatedMetricsByRestAPI(
    endpoint, jobId, "MySource", "numBytesIn", null);

Integration with Test Suites

Automatic Metrics Testing

Test suites include automatic metrics validation for sources and sinks.

// From SinkTestSuiteBase.testMetrics()
@TestTemplate
@DisplayName("Test sink metrics")
public void testMetrics(
    TestEnvironment testEnv,
    DataStreamSinkExternalContext<T> externalContext,
    CheckpointingMode semantic
) throws Exception {
    
    // Generate and send test data
    List<T> testRecords = generateTestData(sinkSettings, externalContext);
    
    // Create and execute job
    StreamExecutionEnvironment env = testEnv.createExecutionEnvironment(settings);
    // ... job setup and execution
    
    // Validate metrics
    MetricQuerier queryRestClient = new MetricQuerier(new Configuration());
    waitUntilCondition(() -> {
        try {
            return compareSinkMetrics(
                queryRestClient,
                testEnv,
                externalContext,
                jobClient.getJobID(),
                sinkName,
                MetricNames.NUM_RECORDS_SEND,
                testRecords.size()
            );
        } catch (Exception e) {
            return false; // Retry on failure
        }
    });
}

Custom Metrics Validation

Implement custom metrics validation in your test classes.

public class MyConnectorTestSuite extends SinkTestSuiteBase<String> {
    
    @Test
    public void testCustomMetrics() throws Exception {
        // Execute connector job
        JobClient jobClient = executeConnectorJob();
        
        // Query custom metrics
        MetricQuerier querier = new MetricQuerier(new Configuration());
        
        // Validate throughput metrics
        Double throughput = querier.getAggregatedMetricsByRestAPI(
            testEnv.getRestEndpoint(),
            jobClient.getJobID(),
            "MyConnector",
            "recordsPerSecond",
            null
        );
        
        assertThat(throughput).isGreaterThan(1000.0); // Minimum throughput requirement
        
        // Validate error metrics
        Double errorRate = querier.getAggregatedMetricsByRestAPI(
            testEnv.getRestEndpoint(),
            jobClient.getJobID(),
            "MyConnector", 
            "errorRate",
            null
        );
        
        assertThat(errorRate).isLessThan(0.01); // Less than 1% error rate
    }
}

Metric Validation Patterns

Throughput Validation

Validate connector throughput meets performance requirements.

public void validateThroughput(JobClient jobClient, int expectedRecords, Duration testDuration) throws Exception {
    MetricQuerier querier = new MetricQuerier(new Configuration());
    
    // Wait for job to process all records
    waitUntilCondition(() -> {
        try {
            Double processedRecords = querier.getAggregatedMetricsByRestAPI(
                testEnv.getRestEndpoint(),
                jobClient.getJobID(),
                "MySource",
                NUM_RECORDS_OUT,
                null
            );
            return processedRecords >= expectedRecords;
        } catch (Exception e) {
            return false;
        }
    });
    
    // Calculate and validate throughput
    double throughput = expectedRecords / (double) testDuration.toSeconds(); // floating-point division avoids integer truncation
    assertThat(throughput).isGreaterThan(100.0); // Records per second
}
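
The throughput arithmetic deserves care: dividing two integer types truncates sub-integer rates to zero, so the division must be performed in floating point. A self-contained sketch of the computation (the class name is illustrative):

```java
import java.time.Duration;

class ThroughputCheck {

    /**
     * Computes records per second, dividing in floating point so that
     * sub-integer rates are not truncated to zero.
     */
    static double recordsPerSecond(long records, Duration elapsed) {
        if (elapsed.isZero() || elapsed.isNegative()) {
            throw new IllegalArgumentException("elapsed must be positive");
        }
        // Use millisecond resolution to keep short test runs accurate
        return records / (double) elapsed.toMillis() * 1000.0;
    }
}
```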

Latency Validation

Validate connector latency remains within acceptable bounds.

public void validateLatency(JobClient jobClient) throws Exception {
    MetricQuerier querier = new MetricQuerier(new Configuration());
    
    // Query latency metrics
    Double avgLatency = querier.getAggregatedMetricsByRestAPI(
        testEnv.getRestEndpoint(),
        jobClient.getJobID(),
        "MyConnector",
        LATENCY,
        null
    );
    
    // Validate latency is under 100ms
    assertThat(avgLatency).isLessThan(100.0);
}

Resource Usage Validation

Validate memory and CPU usage patterns.

public void validateResourceUsage(JobClient jobClient) throws Exception {
    MetricQuerier querier = new MetricQuerier(new Configuration());
    
    // Query memory usage
    Double heapUsed = querier.getAggregatedMetricsByRestAPI(
        testEnv.getRestEndpoint(),
        jobClient.getJobID(),
        "MyConnector",
        "memoryHeapUsed",
        null
    );
    
    Double heapMax = querier.getAggregatedMetricsByRestAPI(
        testEnv.getRestEndpoint(),
        jobClient.getJobID(),
        "MyConnector",
        "memoryHeapMax",
        null
    );
    
    // Validate memory usage is under 80% of max
    double memoryUsageRatio = heapUsed / heapMax;
    assertThat(memoryUsageRatio).isLessThan(0.8);
}
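
A guarded version of the ratio computation can catch missing or zero-valued heap metrics before they produce NaN or infinite ratios; a minimal sketch (the class name is illustrative):

```java
class MemoryCheck {

    /**
     * Returns heap usage as a fraction of the maximum, failing fast when
     * either metric is missing or the maximum is not a positive value.
     */
    static double heapUsageRatio(Double heapUsed, Double heapMax) {
        if (heapUsed == null || heapMax == null || heapMax <= 0.0) {
            throw new IllegalStateException("heap metrics unavailable");
        }
        return heapUsed / heapMax;
    }
}
```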

Advanced Metrics Scenarios

Multi-Operator Metrics

Query metrics across multiple operators in complex pipelines.

public void validatePipelineMetrics(JobClient jobClient) throws Exception {
    MetricQuerier querier = new MetricQuerier(new Configuration());
    
    // Query source metrics
    Double sourceRecords = querier.getAggregatedMetricsByRestAPI(
        testEnv.getRestEndpoint(), jobClient.getJobID(), "Source", NUM_RECORDS_OUT, null);
        
    // Query transformation metrics  
    Double transformRecords = querier.getAggregatedMetricsByRestAPI(
        testEnv.getRestEndpoint(), jobClient.getJobID(), "Transform", NUM_RECORDS_OUT, null);
        
    // Query sink metrics
    Double sinkRecords = querier.getAggregatedMetricsByRestAPI(
        testEnv.getRestEndpoint(), jobClient.getJobID(), "Sink", NUM_RECORDS_SEND, "Writer");
    
    // Validate record flow through pipeline
    assertThat(sourceRecords).isEqualTo(transformRecords);
    assertThat(transformRecords).isEqualTo(sinkRecords);
}
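
The pairwise equality checks generalize to a helper that compares every stage against the first, with a small tolerance since the aggregated values arrive as Doubles; a sketch (the class name is illustrative):

```java
class PipelineCheck {

    /**
     * Verifies that each stage of the pipeline saw the same number of
     * records as the first stage, within the given tolerance.
     */
    static boolean recordsConserved(double tolerance, double... stageCounts) {
        for (int i = 1; i < stageCounts.length; i++) {
            if (Math.abs(stageCounts[i] - stageCounts[0]) > tolerance) {
                return false;
            }
        }
        return true;
    }
}
```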

Historical Metrics Comparison

Compare metrics across test runs for performance regression detection.

public void compareWithBaseline(JobClient jobClient, MetricsBaseline baseline) throws Exception {
    MetricQuerier querier = new MetricQuerier(new Configuration());
    
    // Query current metrics
    Double currentThroughput = querier.getAggregatedMetricsByRestAPI(
        testEnv.getRestEndpoint(), jobClient.getJobID(), "MyConnector", "throughput", null);
        
    Double currentLatency = querier.getAggregatedMetricsByRestAPI(
        testEnv.getRestEndpoint(), jobClient.getJobID(), "MyConnector", LATENCY, null);
    
    // Compare with baseline (allow 10% deviation)
    assertThat(currentThroughput).isGreaterThan(baseline.getThroughput() * 0.9);
    assertThat(currentLatency).isLessThan(baseline.getLatency() * 1.1);
}
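
The baseline comparison can be expressed as a reusable predicate that distinguishes higher-is-better metrics (throughput) from lower-is-better ones (latency); a sketch (the class name is illustrative, and `MetricsBaseline` above is assumed to be the test's own type):

```java
class RegressionCheck {

    /**
     * Returns true if the current value has not regressed beyond the allowed
     * fraction relative to the baseline. For higher-is-better metrics
     * (throughput) pass higherIsBetter = true; for lower-is-better metrics
     * (latency) pass false.
     */
    static boolean withinBaseline(double current, double baseline,
                                  double allowedDeviation, boolean higherIsBetter) {
        return higherIsBetter
                ? current >= baseline * (1.0 - allowedDeviation)
                : current <= baseline * (1.0 + allowedDeviation);
    }
}
```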

Parallel Subtask Metrics

Analyze metrics across parallel subtasks for load balancing validation.

public void validateLoadBalancing(JobClient jobClient) throws Exception {
    MetricQuerier querier = new MetricQuerier(new Configuration());
    
    // Get job details to locate the source vertex
    JobDetailsInfo jobDetails = MetricQuerier.getJobDetails(
        new RestClient(new Configuration(), Executors.newCachedThreadPool()),
        testEnv.getRestEndpoint(),
        jobClient.getJobID()
    );
    
    // Find source vertex
    JobVertexID sourceVertexId = jobDetails.getJobVertexInfos().stream()
        .filter(v -> v.getName().contains("Source"))
        .findFirst()
        .map(JobDetailsInfo.JobVertexDetailsInfo::getJobVertexID)
        .orElseThrow();
    
    // Query the numRecordsOut metric; the REST API returns it aggregated
    // (min/max/avg/sum) across the parallel subtasks
    AggregatedMetricsResponseBody metrics = querier.getMetrics(
        testEnv.getRestEndpoint(),
        jobClient.getJobID(), 
        sourceVertexId,
        NUM_RECORDS_OUT
    );
    
    AggregatedMetric recordsOut = metrics.getMetrics().stream()
        .filter(metric -> metric.getId().endsWith(NUM_RECORDS_OUT))
        .findFirst()
        .orElseThrow();
    
    // Estimate skew from the aggregates: the worst deviation of any subtask
    // from the mean is bounded by max and min against avg
    double mean = recordsOut.getAvg();
    double maxDeviation = Math.max(recordsOut.getMax() - mean, mean - recordsOut.getMin()) / mean;
        
    // Ensure load is balanced within 20%
    assertThat(maxDeviation).isLessThan(0.2);
}
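
Load-balance skew can be estimated from the aggregated min/max/avg values that the REST API reports for a vertex; a self-contained sketch of that computation (the class name is illustrative):

```java
class LoadBalanceCheck {

    /**
     * Estimates the worst relative deviation of any subtask from the mean,
     * using only the aggregated min/max/avg values reported by the REST API.
     */
    static double maxRelativeDeviation(double min, double max, double avg) {
        if (avg == 0.0) {
            throw new IllegalArgumentException("average must be non-zero");
        }
        return Math.max(max - avg, avg - min) / avg;
    }
}
```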

Error Handling

Metric Query Failures

try {
    Double metric = querier.getAggregatedMetricsByRestAPI(endpoint, jobId, operatorName, metricName, filter);
} catch (IllegalStateException e) {
    // Metric not found - operator name or metric name incorrect
    fail("Metric not found: " + e.getMessage());
} catch (Exception e) {
    // Network or cluster issues
    throw new AssumptionViolatedException("Unable to query metrics", e);
}

Timeout Handling

// Use waitUntilCondition for retry logic
waitUntilCondition(() -> {
    try {
        Double metric = querier.getAggregatedMetricsByRestAPI(endpoint, jobId, operatorName, metricName, filter);
        return Precision.equals(expectedValue, metric);
    } catch (Exception e) {
        // Retry on failure
        return false;
    }
}, Duration.ofMinutes(2)); // 2 minute timeout

Debugging Metrics Issues

// List all available metrics for debugging
AggregatedMetricsResponseBody allMetrics = querier.getMetricList(endpoint, jobId, vertexId);
allMetrics.getMetrics().forEach(metric -> {
    System.out.println("Available metric: " + metric.getId());
});

// Get job details for vertex information
JobDetailsInfo jobDetails = MetricQuerier.getJobDetails(restClient, endpoint, jobId);
jobDetails.getJobVertexInfos().forEach(vertex -> {
    System.out.println("Vertex: " + vertex.getName() + " ID: " + vertex.getJobVertexID());
});

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-connector-test-utils
