
tessl/maven-org-apache-flink--flink-connector-test-utils

Testing utilities for Apache Flink connectors providing test frameworks, assertion utilities, and abstractions for comprehensive connector testing

Assertions and Validation

The assertion framework provides specialized utilities for validating connector behavior with support for different semantic guarantees (exactly-once, at-least-once). It integrates with Flink's collect sink mechanism to validate streaming results.

Capabilities

Collect Iterator Assertions

Entry point for creating assertion instances for iterator validation.

/**
 * Entry point for assertion methods for CollectIteratorAssert
 * Each method is a static factory for creating assertion instances
 */
public final class CollectIteratorAssertions {
    
    /**
     * Create ordered assertion instance for iterator validation
     * @param actual Iterator to validate
     * @param <T> Type of elements in iterator
     * @return CollectIteratorAssert instance for method chaining
     */
    public static <T> CollectIteratorAssert<T> assertThat(Iterator<T> actual);
    
    /**
     * Create unordered assertion instance for iterator validation
     * @param actual Iterator to validate  
     * @param <T> Type of elements in iterator
     * @return UnorderedCollectIteratorAssert instance for method chaining
     */
    public static <T> UnorderedCollectIteratorAssert<T> assertUnordered(Iterator<T> actual);
}

Usage Examples:

import static org.apache.flink.connector.testframe.utils.CollectIteratorAssertions.assertThat;
import static org.apache.flink.connector.testframe.utils.CollectIteratorAssertions.assertUnordered;

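// Basic ordered assertion
Iterator<String> results = collectResults();
assertThat(results)
    .matchesRecordsFromSource(expectedData, CheckpointingMode.EXACTLY_ONCE);

// Unordered assertion for parallel processing.
// An Iterator is single-pass, so collect a fresh one instead of reusing `results`.
Iterator<String> unorderedResults = collectResults();
assertUnordered(unorderedResults)
    .matchesRecordsFromSource(expectedData, CheckpointingMode.EXACTLY_ONCE);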
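
Both factory methods accept a plain java.util.Iterator, which can only be traversed once. The sketch below uses only the JDK (the class name SinglePassDemo is hypothetical and no Flink types are involved) to illustrate why an iterator that one assertion has already drained yields nothing for a second assertion.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical helper, not part of the Flink test utilities.
final class SinglePassDemo {

    /** Reads every remaining element, as a full-result assertion has to do. */
    static <T> List<T> drain(Iterator<T> iterator) {
        List<T> records = new ArrayList<>();
        while (iterator.hasNext()) {
            records.add(iterator.next());
        }
        return records;
    }
}
```

Calling drain twice on the same iterator returns all records the first time and an empty list the second time, so each assertion should receive its own iterator.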

Ordered Iterator Assertions

Assertion utilities for validating iterator results where the order of records is taken into account.

/**
 * Assertion utilities for ordered iterator validation
 * @param <T> Type of elements being validated
 */
public class CollectIteratorAssert<T> {
    
    /**
     * Validate iterator matches expected records from source with semantic guarantees
     * @param expected List of expected record collections (one per source split)
     * @param semantic Checkpointing semantic to validate against
     */
    public void matchesRecordsFromSource(
        List<List<T>> expected, 
        CheckpointingMode semantic
    );
    
    /**
     * Set limit on number of records to validate (for unbounded sources)
     * @param limit Maximum number of records to validate
     * @return Self for method chaining
     */
    public CollectIteratorAssert<T> withNumRecordsLimit(int limit);
}

Usage Examples:

// Source testing with multiple splits
List<List<String>> expectedBySplit = Arrays.asList(
    Arrays.asList("split1-record1", "split1-record2"),
    Arrays.asList("split2-record1", "split2-record2")
);

assertThat(resultIterator)
    .matchesRecordsFromSource(expectedBySplit, CheckpointingMode.EXACTLY_ONCE);

// Unbounded source testing with record limit
assertThat(resultIterator)
    .withNumRecordsLimit(100)
    .matchesRecordsFromSource(expectedBySplit, CheckpointingMode.AT_LEAST_ONCE);

Unordered Iterator Assertions

Assertion utilities for validating iterator results without order requirements.

/**
 * Assertion utilities for unordered iterator validation
 * @param <T> Type of elements being validated
 */
public class UnorderedCollectIteratorAssert<T> {
    
    /**
     * Set limit on number of records to validate (for unbounded sources)
     * @param limit Maximum number of records to validate
     * @return Self for method chaining
     */
    public UnorderedCollectIteratorAssert<T> withNumRecordsLimit(int limit);
    
    /**
     * Validate iterator matches expected records from source with semantic guarantees
     * @param expected List of expected record collections (one per source split)
     * @param semantic Checkpointing semantic to validate against
     */
    public void matchesRecordsFromSource(
        List<List<T>> expected, 
        CheckpointingMode semantic
    );
}

Usage Examples:

// Unordered validation for parallel processing
List<List<String>> expectedBySplit = Arrays.asList(
    Arrays.asList("split1-record1", "split1-record2"),
    Arrays.asList("split2-record1", "split2-record2")
);

assertUnordered(resultIterator)
    .matchesRecordsFromSource(expectedBySplit, CheckpointingMode.EXACTLY_ONCE);

// Unbounded source testing with record limit
assertUnordered(resultIterator)
    .withNumRecordsLimit(100)
    .matchesRecordsFromSource(expectedBySplit, CheckpointingMode.AT_LEAST_ONCE);

Semantic Guarantee Validation

Exactly-Once Semantics

Validates that each record appears exactly once in the results.

// Exactly-once validation
assertThat(resultIterator)
    .matchesRecordsFromSource(expectedData, CheckpointingMode.EXACTLY_ONCE);

// Behavior:
// - Each expected record must appear exactly once
// - No duplicates allowed
// - Missing records cause assertion failure
// - Extra records cause assertion failure
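
The four rules above amount to a multiset comparison. As a rough illustration only, the following JDK-only sketch checks them for a flat list of expected records; ExactlyOnceSketch and matchesExactlyOnce are hypothetical names, the check ignores ordering, and it is not the library's implementation.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of an exactly-once check (order is ignored).
final class ExactlyOnceSketch {

    static <T> boolean matchesExactlyOnce(Iterator<T> actual, List<T> expected) {
        // Count how many times each record is still owed.
        Map<T, Integer> remaining = new HashMap<>();
        for (T record : expected) {
            remaining.merge(record, 1, Integer::sum);
        }
        while (actual.hasNext()) {
            T record = actual.next();
            Integer count = remaining.get(record);
            if (count == null) {
                return false; // duplicate or unexpected record
            }
            if (count == 1) {
                remaining.remove(record);
            } else {
                remaining.put(record, count - 1);
            }
        }
        // Anything still owed was never delivered.
        return remaining.isEmpty();
    }
}
```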

At-Least-Once Semantics

Validates that each record appears at least once, allowing for duplicates.

// At-least-once validation
assertThat(resultIterator)
    .matchesRecordsFromSource(expectedData, CheckpointingMode.AT_LEAST_ONCE);

// Behavior:
// - Each expected record must appear at least once
// - Duplicates of expected records are allowed and ignored
// - Missing records cause assertion failure
// - Extra records are tolerated only when they duplicate an expected record
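
For comparison with exactly-once, here is a simplified JDK-only sketch of an at-least-once check over a flat list of expected records. The names AtLeastOnceSketch and matchesAtLeastOnce are hypothetical, ordering is ignored, and rejecting records that match no expected record is an assumption of this sketch rather than confirmed library behavior.

```java
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model of an at-least-once check (order is ignored).
final class AtLeastOnceSketch {

    static <T> boolean matchesAtLeastOnce(Iterator<T> actual, List<T> expected) {
        Set<T> allowed = new HashSet<>(expected);
        Set<T> seen = new HashSet<>();
        while (actual.hasNext()) {
            T record = actual.next();
            if (!allowed.contains(record)) {
                return false; // assumption: a record that was never written is an error
            }
            seen.add(record); // repeated records collapse into the set
        }
        // Every expected record must have shown up at least once.
        return seen.equals(allowed);
    }
}
```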

Integration with Test Framework

Automatic Result Collection

Test suites automatically set up result collection using Flink's collect sink.

/**
 * Helper class for building CollectResultIterator instances
 * @param <T> Type of collected elements
 */
protected static class CollectIteratorBuilder<T> {
    
    /**
     * Build CollectResultIterator bound to job client
     * @param jobClient Job client for result collection
     * @return CollectResultIterator for accessing results
     */
    protected CollectResultIterator<T> build(JobClient jobClient);
}

// Used internally by test suite base classes
protected CollectIteratorBuilder<T> addCollectSink(DataStream<T> stream);

Usage in Test Suites:

// Automatic setup in test suite base classes
DataStreamSource<T> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test Source");
CollectIteratorBuilder<T> iteratorBuilder = addCollectSink(stream);
JobClient jobClient = env.executeAsync("Test Job");

// Get results and validate
try (CollectResultIterator<T> resultIterator = iteratorBuilder.build(jobClient)) {
    assertThat(resultIterator)
        .matchesRecordsFromSource(expectedData, semantic);
}

Result Validation Patterns

Sink Testing Pattern

// In SinkTestSuiteBase
protected void checkResultWithSemantic(
    ExternalSystemDataReader<T> reader,
    List<T> testData,
    CheckpointingMode semantic
) throws Exception {
    
    final ArrayList<T> result = new ArrayList<>();
    waitUntilCondition(() -> {
        // Poll data from external system
        pollAndAppendResultData(result, reader, testData, 30, semantic);
        try {
            // Validate using assertions
            CollectIteratorAssertions.assertThat(sort(result).iterator())
                .matchesRecordsFromSource(Arrays.asList(sort(testData)), semantic);
            return true;
        } catch (Throwable t) {
            return false;
        }
    });
}
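
The sink pattern above turns an assertion into a boolean condition and retries it until it holds. As an illustration of that retry loop only, here is a JDK-only sketch; PollingSketch and pollUntil are hypothetical names and do not describe how waitUntilCondition itself is implemented.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical retry helper, not the Flink utility used above.
final class PollingSketch {

    /** Re-evaluates the condition until it returns true or the timeout elapses. */
    static boolean pollUntil(BooleanSupplier condition, Duration timeout, Duration interval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false; // gave up: the condition never held in time
            }
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for callers
                return false;
            }
        }
    }
}
```

One consequence of this design is that a condition which swallows every Throwable hides the reason for the last failure, so a timeout reports only that the condition never held.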

Source Testing Pattern

// In SourceTestSuiteBase
protected void checkResultWithSemantic(
    CloseableIterator<T> resultIterator,
    List<List<T>> testData,
    CheckpointingMode semantic,
    Integer limit
) {
    if (limit != null) {
        // Unbounded source with record limit
        Runnable runnable = () ->
            CollectIteratorAssertions.assertThat(resultIterator)
                .withNumRecordsLimit(limit)
                .matchesRecordsFromSource(testData, semantic);
        
        assertThatFuture(runAsync(runnable)).eventuallySucceeds();
    } else {
        // Bounded source
        CollectIteratorAssertions.assertThat(resultIterator)
            .matchesRecordsFromSource(testData, semantic);
    }
}
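
In the unbounded branch above the assertion runs on another thread so that a source which never ends cannot block the test forever. A minimal JDK-only sketch of that idea follows, assuming runAsync refers to CompletableFuture.runAsync; AsyncAssertionSketch and succeedsWithin are hypothetical names, and assertThatFuture(...).eventuallySucceeds() may behave differently in detail.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper showing the "run assertion asynchronously, bound the wait" idea.
final class AsyncAssertionSketch {

    static boolean succeedsWithin(Runnable assertion, long timeout, TimeUnit unit) {
        CompletableFuture<Void> future = CompletableFuture.runAsync(assertion);
        try {
            future.get(timeout, unit);
            return true; // the assertion finished without throwing
        } catch (TimeoutException e) {
            future.cancel(true);
            return false; // the assertion was still waiting for records
        } catch (ExecutionException e) {
            return false; // the assertion itself failed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```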

Advanced Validation Scenarios

Multi-Split Source Validation

Validates results from sources with multiple splits, ensuring each split's data is properly consumed.

// Test data organized by split
List<List<String>> testDataBySplit = Arrays.asList(
    Arrays.asList("split0-rec1", "split0-rec2", "split0-rec3"), // Split 0
    Arrays.asList("split1-rec1", "split1-rec2"),                 // Split 1  
    Arrays.asList("split2-rec1", "split2-rec2", "split2-rec3")  // Split 2
);

// Validation allows records from different splits to be interleaved
assertThat(resultIterator)
    .matchesRecordsFromSource(testDataBySplit, CheckpointingMode.EXACTLY_ONCE);
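
One way to picture a check that tolerates interleaving is a pointer per split that only ever moves forward. The JDK-only sketch below is a greedy illustration under two assumptions that the text above does not confirm: order is preserved within each split, and records are unique across splits. SplitInterleavingSketch and isInterleavingOf are hypothetical names.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Objects;

// Hypothetical greedy check: is `actual` an interleaving of the per-split lists?
final class SplitInterleavingSketch {

    static <T> boolean isInterleavingOf(Iterator<T> actual, List<List<T>> splits) {
        int[] next = new int[splits.size()]; // next expected index in each split
        while (actual.hasNext()) {
            T record = actual.next();
            boolean matched = false;
            for (int i = 0; i < splits.size() && !matched; i++) {
                List<T> split = splits.get(i);
                if (next[i] < split.size() && Objects.equals(split.get(next[i]), record)) {
                    next[i]++;
                    matched = true;
                }
            }
            if (!matched) {
                return false; // out of order within its split, duplicated, or unknown
            }
        }
        for (int i = 0; i < splits.size(); i++) {
            if (next[i] != splits.get(i).size()) {
                return false; // some split was not fully consumed
            }
        }
        return true;
    }
}
```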

Bounded vs Unbounded Source Validation

// Bounded source - validate all expected records
public void testBoundedSource() {
    assertThat(resultIterator)
        .matchesRecordsFromSource(expectedData, semantic);
    // Will wait for job to finish naturally
}

// Unbounded source - validate limited number of records  
public void testUnboundedSource() {
    assertThat(resultIterator)
        .withNumRecordsLimit(expectedSize)
        .matchesRecordsFromSource(expectedData, semantic);
    // Will validate first expectedSize records then succeed
}
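
The record limit matters because an unbounded source never reports the end of its results. As a sketch of the underlying idea only (RecordLimitSketch and takeFirst are hypothetical names, not what withNumRecordsLimit does internally), the following JDK-only helper stops reading once the limit is reached.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical helper: read at most `limit` records from a possibly endless iterator.
final class RecordLimitSketch {

    static <T> List<T> takeFirst(Iterator<T> source, int limit) {
        List<T> taken = new ArrayList<>();
        // The size check comes first so hasNext() is never called once the limit is
        // reached; on a live unbounded source that call could block indefinitely.
        while (taken.size() < limit && source.hasNext()) {
            taken.add(source.next());
        }
        return taken;
    }
}
```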

Failure Scenario Validation

// Validate recovery after failure.
// beforeFailureData and afterFailureData are assumed to be flat List<T> values for a single split,
// so each is wrapped in a one-element list and its size() serves as the record limit.
public void testTaskManagerFailure() {
    // Phase 1: Validate records before failure
    checkResultWithSemantic(
        iterator, Collections.singletonList(beforeFailureData), semantic, beforeFailureData.size());
    
    // Trigger failure
    controller.triggerTaskManagerFailover(jobClient, () -> {
        // Write additional data after failure
        writeTestData(afterFailureData);
    });
    
    // Phase 2: Validate records after recovery
    checkResultWithSemantic(
        iterator, Collections.singletonList(afterFailureData), semantic, afterFailureData.size());
}

Error Handling

Common Assertion Failures

// Illustrative messages; the exact wording may differ between versions.
// Record count mismatch
AssertionError: Expected 100 records but found 95
// Missing records in exactly-once
AssertionError: Expected record 'test-42' not found in results
// Unexpected records in exactly-once
AssertionError: Unexpected record 'duplicate-7' found in results

Timeout Configuration

// Configure timeouts for result collection
CollectResultIterator<T> iterator = new CollectResultIterator<>(
    operatorUid,
    serializer, 
    accumulatorName,
    checkpointConfig,
    Duration.ofMinutes(5).toMillis() // 5 minute timeout
);

Debugging Failed Assertions

// Enable detailed logging for debugging
Logger logger = LoggerFactory.getLogger(CollectIteratorAssert.class);
// Add logging to capture actual vs expected results
// Use breakpoints to inspect iterator contents
// Check external system state for sink validations
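
When logging actual versus expected results, it often helps to log only the difference. The JDK-only sketch below computes that difference for two already collected lists; ResultDiffSketch, missing and unexpected are hypothetical names, and the quadratic cost is acceptable only for small debugging data sets.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical debugging helper, not part of the Flink test utilities.
final class ResultDiffSketch {

    /** Expected records with no counterpart in actual (multiset difference). */
    static <T> List<T> missing(List<T> expected, List<T> actual) {
        List<T> rest = new ArrayList<>(expected);
        for (T record : actual) {
            rest.remove(record); // removes one matching occurrence, if any
        }
        return rest;
    }

    /** Actual records with no counterpart in expected, including surplus duplicates. */
    static <T> List<T> unexpected(List<T> expected, List<T> actual) {
        return missing(actual, expected);
    }
}
```

Logging both lists next to the failing assertion shows at a glance whether a failure comes from lost records, duplicates, or records that were never written.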

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-connector-test-utils
