Testing utilities for Apache Flink connectors providing test frameworks, assertion utilities, and abstractions for comprehensive connector testing
---
The metrics framework provides utilities for querying and validating Flink job metrics via REST API. This enables performance testing, behavior validation, and monitoring of connector operations during test execution.
Main utility class for querying job metrics from Flink cluster via REST API.
/**
* Utility for querying job metrics via Flink REST API
*/
public class MetricQuerier {
/**
* Create metric querier with configuration
* @param configuration Flink configuration for REST client
* @throws ConfigurationException if configuration is invalid
*/
public MetricQuerier(Configuration configuration) throws ConfigurationException;
/**
* Get job details including vertex information
* @param client REST client instance
* @param endpoint Test environment REST endpoint
* @param jobId Job ID to query
* @return Job details with vertex information
* @throws Exception if query fails
*/
public static JobDetailsInfo getJobDetails(
RestClient client,
TestEnvironment.Endpoint endpoint,
JobID jobId
) throws Exception;
/**
* Get aggregated metrics for source/sink operator
* @param endpoint Test environment REST endpoint
* @param jobId Job ID to query
* @param sourceOrSinkName Name of source or sink operator
* @param metricName Name of metric to query (e.g., "numRecordsIn", "numRecordsOut")
* @param filter Optional filter for metric selection (e.g., "Writer" for Sink V2)
* @return Aggregated metric value across all subtasks
* @throws Exception if query fails or metric not found
*/
public Double getAggregatedMetricsByRestAPI(
TestEnvironment.Endpoint endpoint,
JobID jobId,
String sourceOrSinkName,
String metricName,
String filter
) throws Exception;
/**
* Get list of available metrics for job vertex
* @param endpoint Test environment REST endpoint
* @param jobId Job ID to query
* @param vertexId Vertex ID to query metrics for
* @return Response containing available metrics
* @throws Exception if query fails
*/
public AggregatedMetricsResponseBody getMetricList(
TestEnvironment.Endpoint endpoint,
JobID jobId,
JobVertexID vertexId
) throws Exception;
/**
* Get specific metrics with filtering
* @param endpoint Test environment REST endpoint
* @param jobId Job ID to query
* @param vertexId Vertex ID to query
* @param filters Comma-separated list of metric filters
* @return Response containing filtered metrics
* @throws Exception if query fails
*/
public AggregatedMetricsResponseBody getMetrics(
TestEnvironment.Endpoint endpoint,
JobID jobId,
JobVertexID vertexId,
String filters
) throws Exception;
}

Usage Examples:
// Create metric querier
MetricQuerier metricQuerier = new MetricQuerier(new Configuration());
// Query sink metrics
Double numRecordsOut = metricQuerier.getAggregatedMetricsByRestAPI(
testEnv.getRestEndpoint(),
jobClient.getJobID(),
"MySink", // sink operator name
MetricNames.NUM_RECORDS_SEND, // metric name
"Writer" // filter for Sink V2
);
// Validate expected record count
assertThat(numRecordsOut).isEqualTo(expectedRecordCount);

Standard metrics available for source and sink connectors.
/**
* Configuration constants for connector testing
*/
public class ConnectorTestConstants {
public static final long METRIC_FETCHER_UPDATE_INTERVAL_MS = 1000L;
public static final long SLOT_REQUEST_TIMEOUT_MS = 10_000L;
public static final long HEARTBEAT_TIMEOUT_MS = 5_000L;
public static final long HEARTBEAT_INTERVAL_MS = 1000L;
public static final Duration DEFAULT_COLLECT_DATA_TIMEOUT = Duration.ofSeconds(120L);
}

Common metric names used in connector testing (from Flink's MetricNames class):

- "numRecordsIn" - Number of records received by source
- "numRecordsOut" - Number of records emitted by source
- "numRecordsSend" - Number of records sent by sink
- "numBytesIn" - Number of bytes received
- "numBytesOut" - Number of bytes emitted

Usage Examples:
// Query different metric types using metric name strings
Double recordsIn = metricQuerier.getAggregatedMetricsByRestAPI(
endpoint, jobId, "MySource", "numRecordsIn", null);
Double recordsOut = metricQuerier.getAggregatedMetricsByRestAPI(
endpoint, jobId, "MySink", "numRecordsSend", "Writer");
Double bytesIn = metricQuerier.getAggregatedMetricsByRestAPI(
endpoint, jobId, "MySource", "numBytesIn", null);

Test suites include automatic metrics validation for sources and sinks.
// From SinkTestSuiteBase.testMetrics()
@TestTemplate
@DisplayName("Test sink metrics")
public void testMetrics(
TestEnvironment testEnv,
DataStreamSinkExternalContext<T> externalContext,
CheckpointingMode semantic
) throws Exception {
// Generate and send test data
List<T> testRecords = generateTestData(sinkSettings, externalContext);
// Create and execute job
StreamExecutionEnvironment env = testEnv.createExecutionEnvironment(settings);
// ... job setup and execution
// Validate metrics
MetricQuerier queryRestClient = new MetricQuerier(new Configuration());
waitUntilCondition(() -> {
try {
return compareSinkMetrics(
queryRestClient,
testEnv,
externalContext,
jobClient.getJobID(),
sinkName,
MetricNames.NUM_RECORDS_SEND,
testRecords.size()
);
} catch (Exception e) {
return false; // Retry on failure
}
});
}

Implement custom metrics validation in your test classes.
public class MyConnectorTestSuite extends SinkTestSuiteBase<String> {
@Test
public void testCustomMetrics() throws Exception {
// Execute connector job
JobClient jobClient = executeConnectorJob();
// Query custom metrics
MetricQuerier querier = new MetricQuerier(new Configuration());
// Validate throughput metrics
Double throughput = querier.getAggregatedMetricsByRestAPI(
testEnv.getRestEndpoint(),
jobClient.getJobID(),
"MyConnector",
"recordsPerSecond",
null
);
assertThat(throughput).isGreaterThan(1000.0); // Minimum throughput requirement
// Validate error metrics
Double errorRate = querier.getAggregatedMetricsByRestAPI(
testEnv.getRestEndpoint(),
jobClient.getJobID(),
"MyConnector",
"errorRate",
null
);
assertThat(errorRate).isLessThan(0.01); // Less than 1% error rate
}
}

Validate connector throughput meets performance requirements.
public void validateThroughput(JobClient jobClient, int expectedRecords, Duration testDuration) throws Exception {
MetricQuerier querier = new MetricQuerier(new Configuration());
// Wait for job to process all records
waitUntilCondition(() -> {
try {
Double processedRecords = querier.getAggregatedMetricsByRestAPI(
testEnv.getRestEndpoint(),
jobClient.getJobID(),
"MySource",
NUM_RECORDS_OUT,
null
);
return processedRecords >= expectedRecords;
} catch (Exception e) {
return false;
}
});
// Calculate and validate throughput
double throughput = expectedRecords / testDuration.toSeconds();
assertThat(throughput).isGreaterThan(100.0); // Records per second
}

Validate connector latency remains within acceptable bounds.
public void validateLatency(JobClient jobClient) throws Exception {
MetricQuerier querier = new MetricQuerier(new Configuration());
// Query latency metrics
Double avgLatency = querier.getAggregatedMetricsByRestAPI(
testEnv.getRestEndpoint(),
jobClient.getJobID(),
"MyConnector",
LATENCY,
null
);
// Validate latency is under 100ms
assertThat(avgLatency).isLessThan(100.0);
}

Validate memory and CPU usage patterns.
public void validateResourceUsage(JobClient jobClient) throws Exception {
MetricQuerier querier = new MetricQuerier(new Configuration());
// Query memory usage
Double heapUsed = querier.getAggregatedMetricsByRestAPI(
testEnv.getRestEndpoint(),
jobClient.getJobID(),
"MyConnector",
"memoryHeapUsed",
null
);
Double heapMax = querier.getAggregatedMetricsByRestAPI(
testEnv.getRestEndpoint(),
jobClient.getJobID(),
"MyConnector",
"memoryHeapMax",
null
);
// Validate memory usage is under 80% of max
double memoryUsageRatio = heapUsed / heapMax;
assertThat(memoryUsageRatio).isLessThan(0.8);
}

Query metrics across multiple operators in complex pipelines.
public void validatePipelineMetrics(JobClient jobClient) throws Exception {
MetricQuerier querier = new MetricQuerier(new Configuration());
// Query source metrics
Double sourceRecords = querier.getAggregatedMetricsByRestAPI(
testEnv.getRestEndpoint(), jobClient.getJobID(), "Source", NUM_RECORDS_OUT, null);
// Query transformation metrics
Double transformRecords = querier.getAggregatedMetricsByRestAPI(
testEnv.getRestEndpoint(), jobClient.getJobID(), "Transform", NUM_RECORDS_OUT, null);
// Query sink metrics
Double sinkRecords = querier.getAggregatedMetricsByRestAPI(
testEnv.getRestEndpoint(), jobClient.getJobID(), "Sink", NUM_RECORDS_SEND, "Writer");
// Validate record flow through pipeline
assertThat(sourceRecords).isEqualTo(transformRecords);
assertThat(transformRecords).isEqualTo(sinkRecords);
}

Compare metrics across test runs for performance regression detection.
public void compareWithBaseline(JobClient jobClient, MetricsBaseline baseline) throws Exception {
MetricQuerier querier = new MetricQuerier(new Configuration());
// Query current metrics
Double currentThroughput = querier.getAggregatedMetricsByRestAPI(
testEnv.getRestEndpoint(), jobClient.getJobID(), "MyConnector", "throughput", null);
Double currentLatency = querier.getAggregatedMetricsByRestAPI(
testEnv.getRestEndpoint(), jobClient.getJobID(), "MyConnector", LATENCY, null);
// Compare with baseline (allow 10% deviation)
assertThat(currentThroughput).isGreaterThan(baseline.getThroughput() * 0.9);
assertThat(currentLatency).isLessThan(baseline.getLatency() * 1.1);
}

Analyze metrics across parallel subtasks for load balancing validation.
public void validateLoadBalancing(JobClient jobClient) throws Exception {
MetricQuerier querier = new MetricQuerier(new Configuration());
// Get metrics for each subtask
JobDetailsInfo jobDetails = MetricQuerier.getJobDetails(
new RestClient(new Configuration(), Executors.newCachedThreadPool()),
testEnv.getRestEndpoint(),
jobClient.getJobID()
);
// Find source vertex
JobVertexID sourceVertexId = jobDetails.getJobVertexInfos().stream()
.filter(v -> v.getName().contains("Source"))
.findFirst()
.map(JobDetailsInfo.JobVertexDetailsInfo::getJobVertexID)
.orElseThrow();
// Get per-subtask metrics
AggregatedMetricsResponseBody metrics = querier.getMetrics(
testEnv.getRestEndpoint(),
jobClient.getJobID(),
sourceVertexId,
NUM_RECORDS_OUT
);
// Validate load distribution
List<Double> subtaskValues = metrics.getMetrics().stream()
.map(AggregatedMetric::getSum)
.collect(Collectors.toList());
double mean = subtaskValues.stream().mapToDouble(Double::doubleValue).average().orElse(0.0);
double maxDeviation = subtaskValues.stream()
.mapToDouble(value -> Math.abs(value - mean) / mean)
.max().orElse(0.0);
// Ensure load is balanced within 20%
assertThat(maxDeviation).isLessThan(0.2);
}

try {
Double metric = querier.getAggregatedMetricsByRestAPI(endpoint, jobId, operatorName, metricName, filter);
} catch (IllegalStateException e) {
// Metric not found - operator name or metric name incorrect
fail("Metric not found: " + e.getMessage());
} catch (Exception e) {
// Network or cluster issues
throw new AssumptionViolatedException("Unable to query metrics", e);
}

// Use waitUntilCondition for retry logic
waitUntilCondition(() -> {
try {
Double metric = querier.getAggregatedMetricsByRestAPI(endpoint, jobId, operatorName, metricName, filter);
return Precision.equals(expectedValue, metric);
} catch (Exception e) {
// Retry on failure
return false;
}
}, Duration.ofMinutes(2)); // 2 minute timeout

// List all available metrics for debugging
AggregatedMetricsResponseBody allMetrics = querier.getMetricList(endpoint, jobId, vertexId);
allMetrics.getMetrics().forEach(metric -> {
System.out.println("Available metric: " + metric.getId());
});
// Get job details for vertex information
JobDetailsInfo jobDetails = MetricQuerier.getJobDetails(restClient, endpoint, jobId);
jobDetails.getJobVertexInfos().forEach(vertex -> {
System.out.println("Vertex: " + vertex.getName() + " ID: " + vertex.getJobVertexID());
});

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-connector-test-utils