Testing utilities for Apache Flink connectors providing test frameworks, assertion utilities, and abstractions for comprehensive connector testing
—
The assertion framework provides specialized utilities for validating connector behavior with support for different semantic guarantees (exactly-once, at-least-once). It integrates with Flink's collect sink mechanism to validate streaming results.
Entry point for creating assertion instances for iterator validation.
/**
* Entry point for assertion methods for CollectIteratorAssert
* Each method is a static factory for creating assertion instances
*/
public final class CollectIteratorAssertions {
/**
* Create ordered assertion instance for iterator validation
* @param actual Iterator to validate
* @param <T> Type of elements in iterator
* @return CollectIteratorAssert instance for method chaining
*/
public static <T> CollectIteratorAssert<T> assertThat(Iterator<T> actual);
/**
* Create unordered assertion instance for iterator validation
* @param actual Iterator to validate
* @param <T> Type of elements in iterator
* @return UnorderedCollectIteratorAssert instance for method chaining
*/
public static <T> UnorderedCollectIteratorAssert<T> assertUnordered(Iterator<T> actual);
}

Usage Examples:
import static org.apache.flink.connector.testframe.utils.CollectIteratorAssertions.assertThat;
import static org.apache.flink.connector.testframe.utils.CollectIteratorAssertions.assertUnordered;
// Basic ordered assertion
Iterator<String> results = collectResults();
assertThat(results)
.matchesRecordsFromSource(expectedData, CheckpointingMode.EXACTLY_ONCE);
// Unordered assertion for parallel processing
assertUnordered(results)
.matchesRecordsFromSource(expectedData, CheckpointingMode.EXACTLY_ONCE);

Assertion utilities for validating iterator results with order considerations.
/**
* Assertion utilities for ordered iterator validation
* @param <T> Type of elements being validated
*/
public class CollectIteratorAssert<T> {
/**
* Validate iterator matches expected records from source with semantic guarantees
* @param expected List of expected record collections (one per source split)
* @param semantic Checkpointing semantic to validate against
*/
public void matchesRecordsFromSource(
List<List<T>> expected,
CheckpointingMode semantic
);
/**
* Set limit on number of records to validate (for unbounded sources)
* @param limit Maximum number of records to validate
* @return Self for method chaining
*/
public CollectIteratorAssert<T> withNumRecordsLimit(int limit);
}

Usage Examples:
// Source testing with multiple splits
List<List<String>> expectedBySplit = Arrays.asList(
Arrays.asList("split1-record1", "split1-record2"),
Arrays.asList("split2-record1", "split2-record2")
);
assertThat(resultIterator)
.matchesRecordsFromSource(expectedBySplit, CheckpointingMode.EXACTLY_ONCE);
// Unbounded source testing with record limit
assertThat(resultIterator)
.withNumRecordsLimit(100)
.matchesRecordsFromSource(expectedBySplit, CheckpointingMode.AT_LEAST_ONCE);

Assertion utilities for validating iterator results without order requirements.
/**
* Assertion utilities for unordered iterator validation
* @param <T> Type of elements being validated
*/
public class UnorderedCollectIteratorAssert<T> {
/**
* Set limit on number of records to validate (for unbounded sources)
* @param limit Maximum number of records to validate
* @return Self for method chaining
*/
public UnorderedCollectIteratorAssert<T> withNumRecordsLimit(int limit);
/**
* Validate iterator matches expected records from source with semantic guarantees
* @param expected List of expected record collections (one per source split)
* @param semantic Checkpointing semantic to validate against
*/
public void matchesRecordsFromSource(
List<List<T>> expected,
CheckpointingMode semantic
);
}

Usage Examples:
// Unordered validation for parallel processing
List<List<String>> expectedBySplit = Arrays.asList(
Arrays.asList("split1-record1", "split1-record2"),
Arrays.asList("split2-record1", "split2-record2")
);
assertUnordered(resultIterator)
.matchesRecordsFromSource(expectedBySplit, CheckpointingMode.EXACTLY_ONCE);
// Unbounded source testing with record limit
assertUnordered(resultIterator)
.withNumRecordsLimit(100)
.matchesRecordsFromSource(expectedBySplit, CheckpointingMode.AT_LEAST_ONCE);

Validates that each record appears exactly once in the results.
// Exactly-once validation
assertThat(resultIterator)
.matchesRecordsFromSource(expectedData, CheckpointingMode.EXACTLY_ONCE);
// Behavior:
// - Each expected record must appear exactly once
// - No duplicates allowed
// - Missing records cause assertion failure
// - Extra records cause assertion failure

Validates that each record appears at least once, allowing for duplicates.
// At-least-once validation
assertThat(resultIterator)
.matchesRecordsFromSource(expectedData, CheckpointingMode.AT_LEAST_ONCE);
// Behavior:
// - Each expected record must appear at least once
// - Duplicates are allowed and ignored
// - Missing records cause assertion failure
// - Extra records that match expected records are allowed

Test suites automatically set up result collection using Flink's collect sink.
/**
* Helper class for building CollectResultIterator instances
* @param <T> Type of collected elements
*/
protected static class CollectIteratorBuilder<T> {
/**
* Build CollectResultIterator bound to job client
* @param jobClient Job client for result collection
* @return CollectResultIterator for accessing results
*/
protected CollectResultIterator<T> build(JobClient jobClient);
}
// Used internally by test suite base classes
protected CollectIteratorBuilder<T> addCollectSink(DataStream<T> stream);

Usage in Test Suites:
// Automatic setup in test suite base classes
DataStreamSource<T> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test Source");
CollectIteratorBuilder<T> iteratorBuilder = addCollectSink(stream);
JobClient jobClient = env.executeAsync("Test Job");
// Get results and validate
try (CollectResultIterator<T> resultIterator = iteratorBuilder.build(jobClient)) {
assertThat(resultIterator)
.matchesRecordsFromSource(expectedData, semantic);
}

// In SinkTestSuiteBase
protected void checkResultWithSemantic(
ExternalSystemDataReader<T> reader,
List<T> testData,
CheckpointingMode semantic
) throws Exception {
final ArrayList<T> result = new ArrayList<>();
waitUntilCondition(() -> {
// Poll data from external system
pollAndAppendResultData(result, reader, testData, 30, semantic);
try {
// Validate using assertions
CollectIteratorAssertions.assertThat(sort(result).iterator())
.matchesRecordsFromSource(Arrays.asList(sort(testData)), semantic);
return true;
} catch (Throwable t) {
return false;
}
});
}

// In SourceTestSuiteBase
protected void checkResultWithSemantic(
CloseableIterator<T> resultIterator,
List<List<T>> testData,
CheckpointingMode semantic,
Integer limit
) {
if (limit != null) {
// Unbounded source with record limit
Runnable runnable = () ->
CollectIteratorAssertions.assertThat(resultIterator)
.withNumRecordsLimit(limit)
.matchesRecordsFromSource(testData, semantic);
assertThatFuture(runAsync(runnable)).eventuallySucceeds();
} else {
// Bounded source
CollectIteratorAssertions.assertThat(resultIterator)
.matchesRecordsFromSource(testData, semantic);
}
}

Validates results from sources with multiple splits, ensuring each split's data is properly consumed.
// Test data organized by split
List<List<String>> testDataBySplit = Arrays.asList(
Arrays.asList("split0-rec1", "split0-rec2", "split0-rec3"), // Split 0
Arrays.asList("split1-rec1", "split1-rec2"), // Split 1
Arrays.asList("split2-rec1", "split2-rec2", "split2-rec3") // Split 2
);
// Validation allows records from different splits to be interleaved
assertThat(resultIterator)
.matchesRecordsFromSource(testDataBySplit, CheckpointingMode.EXACTLY_ONCE);

// Bounded source - validate all expected records
public void testBoundedSource() {
assertThat(resultIterator)
.matchesRecordsFromSource(expectedData, semantic);
// Will wait for job to finish naturally
}
// Unbounded source - validate limited number of records
public void testUnboundedSource() {
assertThat(resultIterator)
.withNumRecordsLimit(expectedSize)
.matchesRecordsFromSource(expectedData, semantic);
// Will validate first expectedSize records then succeed
}

// Validate recovery after failure
public void testTaskManagerFailure() {
// Phase 1: Validate records before failure
checkResultWithSemantic(iterator, beforeFailureData, semantic, beforeFailureData.size());
// Trigger failure
controller.triggerTaskManagerFailover(jobClient, () -> {
// Write additional data after failure
writeTestData(afterFailureData);
});
// Phase 2: Validate records after recovery
checkResultWithSemantic(iterator, afterFailureData, semantic, afterFailureData.size());
}

// Record count mismatch
AssertionError: Expected 100 records but found 95
// Missing records in exactly-once
AssertionError: Expected record 'test-42' not found in results
// Unexpected records in exactly-once
AssertionError: Unexpected record 'duplicate-7' found in results

// Configure timeouts for result collection
CollectResultIterator<T> iterator = new CollectResultIterator<>(
operatorUid,
serializer,
accumulatorName,
checkpointConfig,
Duration.ofMinutes(5).toMillis() // 5 minute timeout
);

// Enable detailed logging for debugging
Logger logger = LoggerFactory.getLogger(CollectIteratorAssert.class);
// Add logging to capture actual vs expected results
// Use breakpoints to inspect iterator contents
// Check external system state for sink validations

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-connector-test-utils