CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-tests

Comprehensive test library for Apache Flink stream processing framework with integration tests, test utilities, and end-to-end validation tests.

Pending
Overview
Eval results
Files

checkpointing-migration.mddocs/

Checkpointing and Migration Testing

Comprehensive framework for testing snapshot migration across Flink versions with utilities for state validation and checkpoint management. This framework enables testing of state compatibility, migration correctness, and checkpoint recovery scenarios.

Capabilities

Snapshot Migration Test Base

Abstract base class for testing snapshot migration across Flink versions, providing structured approach to create snapshots in one version and restore them in another.

/**
 * Base class for testing snapshot migration across Flink versions
 */
public abstract class SnapshotMigrationTestBase {
    /**
     * Execute job and create snapshot for migration testing
     * @param job JobGraph to execute and snapshot
     * @return SnapshotSpec containing snapshot metadata
     * @throws Exception if execution or snapshotting fails
     */
    protected SnapshotSpec executeAndSnapshot(JobGraph job) throws Exception;
    
    /**
     * Restore from snapshot and execute job to validate migration
     * @param job JobGraph to execute with restored state
     * @param snapshot SnapshotSpec containing snapshot location and metadata
     * @throws Exception if restore or execution fails
     */
    protected void restoreAndExecute(JobGraph job, SnapshotSpec snapshot) throws Exception;
    
    /**
     * Snapshot specification containing metadata for migration testing
     */
    public static class SnapshotSpec {
        /**
         * Get the filesystem path to the snapshot
         * @return String path to snapshot directory
         */
        public String getSnapshotPath();
        
        /**
         * Get the Flink version that created this snapshot
         * @return String version identifier
         */
        public String getSnapshotVersion();
    }
}

Migration Test Utilities

Comprehensive utilities for migration testing including sources, sinks, and operators with state management capabilities.

/**
 * Utility class providing components for migration testing scenarios
 */
public class MigrationTestUtils {
    
    /**
     * Source function with operator list state for migration testing
     */
    public static class CheckpointingNonParallelSourceWithListState 
        implements SourceFunction<Integer> {
        
        /**
         * Constructor for checkpointing source with list state
         * @param numElements number of elements to emit
         */
        public CheckpointingNonParallelSourceWithListState(int numElements);
    }
    
    /**
     * Source function for validating restored list state after migration
     */
    public static class CheckingNonParallelSourceWithListState 
        implements SourceFunction<Integer> {
        
        /**
         * Constructor for validation source
         * @param numElements number of elements for validation
         */
        public CheckingNonParallelSourceWithListState(int numElements);
    }
    
    /**
     * Parallel source with union list state for migration testing
     */
    public static class CheckpointingParallelSourceWithUnionListState 
        implements SourceFunction<Integer> {
        
        /**
         * Constructor for parallel source with union state
         * @param numElements elements per subtask
         */
        public CheckpointingParallelSourceWithUnionListState(int numElements);
    }
    
    /**
     * Parallel source for validating union list state after migration
     */
    public static class CheckingParallelSourceWithUnionListState 
        implements SourceFunction<Integer> {
        
        /**
         * Constructor for parallel validation source
         * @param numElements number of elements for validation
         */
        public CheckingParallelSourceWithUnionListState(int numElements);
    }
    
    /**
     * Sink that counts elements using Flink accumulators
     */
    public static class AccumulatorCountingSink<T> implements SinkFunction<T> {
        
        /**
         * Constructor for accumulator counting sink
         * @param accumulatorName name of the accumulator to use
         */
        public AccumulatorCountingSink(String accumulatorName);
    }
    
    /**
     * Source with configurable failure injection for testing recovery
     */
    public static class FailingSource implements SourceFunction<Integer> {
        
        /**
         * Constructor for failing source
         * @param failAfterElements number of elements before failure
         * @param maxElements maximum elements to emit after recovery
         */
        public FailingSource(int failAfterElements, int maxElements);
    }
    
    /**
     * Source for testing job cancellation scenarios
     */
    public static class CancellingIntegerSource implements SourceFunction<Integer> {
        
        /**
         * Constructor for cancelling source
         * @param cancelAfterElements elements to emit before triggering cancellation
         */
        public CancellingIntegerSource(int cancelAfterElements);
    }
    
    /**
     * Sink that accumulates integer values for validation
     */
    public static class AccumulatingIntegerSink implements SinkFunction<Integer> {
        
        /**
         * Constructor for accumulating sink
         * @param outputList list to accumulate values into
         */
        public AccumulatingIntegerSink(List<Integer> outputList);
    }
    
    /**
     * Sink for validating output against expected values
     */
    public static class ValidatingSink<T> implements SinkFunction<T> {
        
        /**
         * Constructor for validating sink
         * @param expectedValues expected output values for validation
         */
        public ValidatingSink(List<T> expectedValues);
    }
}

Usage Examples:

import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase;
import org.apache.flink.test.checkpointing.utils.MigrationTestUtils.*;

// Basic migration test
public class StateMigrationTest extends SnapshotMigrationTestBase {
    
    @Test
    public void testListStateMigration() throws Exception {
        // Create job with stateful source
        JobGraph job = new JobGraph();
        job.addVertex(new JobVertex("source", 
            new CheckpointingNonParallelSourceWithListState(100)));
        job.addVertex(new JobVertex("sink", 
            new AccumulatorCountingSink<>("count")));
        
        // Execute and create snapshot
        SnapshotSpec snapshot = executeAndSnapshot(job);
        
        // Create validation job 
        JobGraph validationJob = new JobGraph();
        validationJob.addVertex(new JobVertex("validation-source", 
            new CheckingNonParallelSourceWithListState(
                Arrays.asList(1, 2, 3, 4, 5))));
        
        // Restore and validate
        restoreAndExecute(validationJob, snapshot);
    }
    
    @Test 
    public void testUnionStateMigration() throws Exception {
        // Test parallel source with union state
        JobGraph job = new JobGraph();
        JobVertex sourceVertex = new JobVertex("parallel-source",
            new CheckpointingParallelSourceWithUnionListState(50));
        sourceVertex.setParallelism(4);
        job.addVertex(sourceVertex);
        
        SnapshotSpec snapshot = executeAndSnapshot(job);
        
        // Validation with different parallelism
        JobGraph validationJob = new JobGraph();
        JobVertex validationVertex = new JobVertex("validation-source",
            new CheckingParallelSourceWithUnionListState(
                expectedUnionState));
        validationVertex.setParallelism(2);
        validationJob.addVertex(validationVertex);
        
        restoreAndExecute(validationJob, snapshot);
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-tests

docs

cancellation-testing.md

checkpointing-migration.md

fault-tolerance-recovery.md

index.md

operator-lifecycle.md

plugin-testing.md

runtime-utilities.md

session-window-testing.md

state-backend-restore.md

test-data-utilities.md

tile.json