or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

- api-completeness.md
- checkpointing.md
- data-generation.md
- execution.md
- fault-tolerance.md
- index.md
- streaming.md
tile.json

docs/checkpointing.md

Checkpointing and State Management

Infrastructure for testing savepoint migration, state compatibility, and checkpoint recovery across Apache Flink versions. These utilities are essential for validating upgrade paths and ensuring state serialization compatibility.

Core Base Classes

SavepointMigrationTestBase

Abstract base class providing infrastructure for testing savepoint migration between Flink versions.

/**
 * Base class for savepoint-migration tests: runs a job on a local cluster, creates a
 * savepoint, and restores a (possibly modified) job from it.
 */
public abstract class SavepointMigrationTestBase extends TestBaseUtils {
    /**
     * Per-test temporary directory. JUnit only applies {@code @Rule} fields that are public
     * and hold an instance: a non-public field fails rule validation, and a null field is
     * never applied (leaving {@code tempFolder} null inside tests).
     */
    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();

    /** Local cluster the jobs are submitted to. */
    protected LocalFlinkMiniCluster cluster;
    
    @Before
    public void setup() throws Exception;
    
    @After  
    public void cleanup() throws Exception;
    
    // Core migration testing methods

    /**
     * Executes the job built in {@code env} and creates a savepoint at {@code savepointPath}.
     * NOTE(review): {@code expectedAccumulators} presumably lists (accumulator name, expected
     * value) pairs the job must reach before the savepoint is taken — confirm.
     */
    protected void executeAndSavepoint(StreamExecutionEnvironment env, String savepointPath, Tuple2<String, Integer>... expectedAccumulators) throws Exception;

    /**
     * Restores the job built in {@code env} from {@code savepointPath} and executes it.
     * NOTE(review): {@code expectedAccumulators} presumably has the same meaning as in
     * {@link #executeAndSavepoint} — confirm.
     */
    protected void restoreAndExecute(StreamExecutionEnvironment env, String savepointPath, Tuple2<String, Integer>... expectedAccumulators) throws Exception;
    
    // Resource management

    /**
     * Static helper; presumably resolves {@code filename} against the test resources.
     * Because it is static, subclasses cannot override it (an {@code @Override} instance
     * method does not compile); they can only hide it with another static method.
     */
    protected static String getResourceFilename(String filename);
    
    // Cluster management
    protected void startCluster() throws Exception;
    protected void stopCluster() throws Exception;
}

AbstractOperatorRestoreTestBase

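Base class for testing operator state restoration across Flink versions using a two-step workflow: a migration job produces operator state, and a second job is then restored from the resulting savepoint to check that state.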

/**
 * Base class for two-step operator-state restore tests. Concrete tests supply the job that
 * produces state ({@link #createMigrationJob()}), the job used after restoring
 * ({@link #createRestoredJob()}), and the savepoint to restore from
 * ({@link #getMigrationSavepointName()}).
 */
public abstract class AbstractOperatorRestoreTestBase {

    // ---- Shared fixtures ------------------------------------------------------

    /** Cluster configuration. */
    protected Configuration config;

    /** Local cluster the jobs are submitted to. */
    protected LocalFlinkMiniCluster cluster;

    // ---- Hooks implemented by concrete tests ----------------------------------

    /** Builds the job whose operator state is captured for migration. */
    protected abstract JobGraph createMigrationJob() throws Exception;

    /** Builds the job that is run after restoring from the migration savepoint. */
    protected abstract JobGraph createRestoredJob() throws Exception;

    /** Returns the name of the savepoint used for the restore. */
    protected abstract String getMigrationSavepointName();

    // ---- JUnit lifecycle and test entry point ---------------------------------

    @Before
    public void setup() throws Exception;

    @After
    public void cleanup() throws Exception;

    /** Runs the restore scenario built from the hooks above. */
    @Test
    public void testRestore() throws Exception;
}

AbstractKeyedOperatorRestoreTestBase

Specialized base class for testing keyed operator state restoration, parameterized over the savepoint path and the Flink version that produced it.

/**
 * Base class for keyed operator restore tests, run once per element of
 * {@link #getParameters()}. {@code @RunWith} is inherited, so concrete subclasses are run
 * by the Parameterized runner; without it the {@code @Parameters} method is ignored.
 */
@RunWith(Parameterized.class)
public abstract class AbstractKeyedOperatorRestoreTestBase extends AbstractOperatorRestoreTestBase {
    /**
     * Savepoints to test, as (savepoint path, Flink version) pairs.
     *
     * <p>NOTE(review): JUnit's Parameterized runner passes each element that is not an
     * {@code Object[]} as a single constructor argument, so a {@code Tuple2<String, String>}
     * element does not bind to the two-{@code String} constructor below. Concrete tests must
     * either accept a {@code Tuple2} in their constructor or this method must return
     * {@code Collection<Object[]>} — confirm against the actual source.
     */
    @Parameterized.Parameters(name = "Savepoint: {0}")
    public static Collection<Tuple2<String, String>> getParameters();
    
    /** Path (or name) of the savepoint under test. */
    protected String savepointPath;
    /** Flink version that wrote the savepoint, e.g. "1.2". */
    protected String savepointVersion;
    
    public AbstractKeyedOperatorRestoreTestBase(String savepointPath, String savepointVersion);
}

AbstractNonKeyedOperatorRestoreTestBase

Specialized base class for testing non-keyed operator state restoration, parameterized over the savepoint path and the Flink version that produced it.

/**
 * Base class for non-keyed operator restore tests, run once per element of
 * {@link #getParameters()}. {@code @RunWith} is inherited, so concrete subclasses are run
 * by the Parameterized runner; without it the {@code @Parameters} method is ignored.
 */
@RunWith(Parameterized.class)
public abstract class AbstractNonKeyedOperatorRestoreTestBase extends AbstractOperatorRestoreTestBase {
    /**
     * Savepoints to test, as (savepoint path, Flink version) pairs.
     *
     * <p>NOTE(review): JUnit's Parameterized runner passes each element that is not an
     * {@code Object[]} as a single constructor argument, so a {@code Tuple2<String, String>}
     * element does not bind to the two-{@code String} constructor below. Concrete tests must
     * either accept a {@code Tuple2} in their constructor or this method must return
     * {@code Collection<Object[]>} — confirm against the actual source.
     */
    @Parameterized.Parameters(name = "Savepoint: {0}")
    public static Collection<Tuple2<String, String>> getParameters();
    
    /** Path (or name) of the savepoint under test. */
    protected String savepointPath;
    /** Flink version that wrote the savepoint. */
    protected String savepointVersion;
    
    public AbstractNonKeyedOperatorRestoreTestBase(String savepointPath, String savepointVersion);
}

Savepoint Migration Test Cases

StatefulJobSavepointFrom11MigrationITCase

Concrete test for validating savepoint migration from Flink 1.1.

/** Restores savepoints written by Flink 1.1 into the current version. */
public class StatefulJobSavepointFrom11MigrationITCase extends SavepointMigrationTestBase {
    /** Number of elements emitted by the test source. */
    private static final int NUM_SOURCE_ELEMENTS = 4;
    
    @Test
    public void testSavepoint() throws Exception;
    
    // Job creation methods
    private JobGraph createJobGraphV2() throws IOException;
    private JobGraph createJobGraphV3() throws IOException;
    
    // getResourceFilename is static in SavepointMigrationTestBase, so it can only be hidden
    // by another static method; an @Override instance method would not compile.
    protected static String getResourceFilename(String filename);
}

StatefulJobSavepointFrom12MigrationITCase

Concrete test for validating savepoint migration from Flink 1.2.

/** Restores savepoints written by Flink 1.2 into the current version. */
public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigrationTestBase {
    /** Number of elements emitted by the test source. */
    private static final int NUM_SOURCE_ELEMENTS = 4;
    
    @Test
    public void testSavepoint() throws Exception;
    
    // Job graph creation
    private JobGraph createJobGraph() throws IOException;
    
    // getResourceFilename is static in SavepointMigrationTestBase, so it can only be hidden
    // by another static method; an @Override instance method would not compile.
    protected static String getResourceFilename(String filename);
}

StatefulJobSavepointFrom13MigrationITCase

Concrete test for validating savepoint migration from Flink 1.3.

/** Restores savepoints written by Flink 1.3 into the current version. */
public class StatefulJobSavepointFrom13MigrationITCase extends SavepointMigrationTestBase {
    @Test
    public void testSavepoint() throws Exception;
    
    // getResourceFilename is static in SavepointMigrationTestBase, so it can only be hidden
    // by another static method; an @Override instance method would not compile.
    protected static String getResourceFilename(String filename);
}

Stream Fault Tolerance Base

StreamFaultToleranceTestBase

Infrastructure for testing fault tolerance of streaming applications on a local cluster with multiple TaskManagers (NUM_TASK_MANAGERS = 2).

/**
 * Base class for streaming fault-tolerance tests on a local cluster with
 * {@link #NUM_TASK_MANAGERS} TaskManagers. Subclasses build the program in
 * {@link #testProgram} and may verify results in {@link #postSubmit()}.
 */
public abstract class StreamFaultToleranceTestBase {

    /** Number of TaskManagers in the test cluster. */
    protected static final int NUM_TASK_MANAGERS = 2;

    /** Cluster configuration. */
    protected Configuration config;

    /** Local cluster the program runs on. */
    protected LocalFlinkMiniCluster cluster;

    // ---- Hooks for concrete tests ---------------------------------------------

    /** Builds the streaming program under test on {@code env}. */
    protected abstract void testProgram(StreamExecutionEnvironment env);

    /** Verification hook; presumably invoked once the program has been run. */
    protected void postSubmit() throws Exception;

    // ---- JUnit lifecycle and test entry point ---------------------------------

    @Before
    public void setup() throws Exception;

    @After
    public void cleanup() throws Exception;

    /** Runs the program built by {@link #testProgram} on the cluster. */
    @Test
    public void runCheckpointedProgram() throws Exception;
}

Usage Examples

Basic Savepoint Migration Test

/**
 * Example migration test: runs a V1 job, creates a savepoint, then restores a modified V2
 * job from that savepoint.
 */
public class MySavepointMigrationTest extends SavepointMigrationTestBase {
    
    @Test
    public void testSavepointMigration() throws Exception {
        // Keep the savepoint inside the test's TemporaryFolder (deleted after the test)
        // instead of writing it into the working directory.
        String savepointPath =
            new java.io.File(tempFolder.getRoot(), "my-migration-savepoint").getAbsolutePath();

        // Execute the original job and create a savepoint
        executeAndSavepoint(createV1Environment(), savepointPath);
        
        // Restore the modified job from the savepoint and execute it.
        // To restore a savepoint checked into test resources instead, pass
        // getResourceFilename("savepoints/<name>"). That helper is static in the base
        // class, so it cannot be overridden here.
        restoreAndExecute(createV2Environment(), savepointPath);
    }
    
    /** Original job. Stateful operators get explicit uids so V2 can map their state. */
    private StreamExecutionEnvironment createV1Environment() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(500);
        
        // NOTE(review): fromElements is a finite source; if the job finishes before the
        // savepoint is triggered, savepointing fails. Consider a source that keeps running
        // until the savepoint is taken — confirm against executeAndSavepoint.
        env.fromElements(1, 2, 3, 4, 5)
           .uid("source")
           .keyBy(x -> x % 2)
           .map(new StatefulMapper())
           .uid("stateful-mapper")
           .addSink(new DiscardingSink<>());
           
        return env;
    }
    
    /**
     * Modified job with an extra transformation. Auto-generated operator IDs depend on the
     * job graph, so adding an operator can change them; the explicit uids must match V1.
     */
    private StreamExecutionEnvironment createV2Environment() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(500);
        
        env.fromElements(1, 2, 3, 4, 5)
           .uid("source")
           .keyBy(x -> x % 2)
           .map(new StatefulMapper())
           .uid("stateful-mapper")
           .map(x -> x * 2) // Additional transformation
           .addSink(new DiscardingSink<>());
           
        return env;
    }
}

Operator State Restoration Test

/**
 * Example keyed-operator restore test, run once per savepoint returned by
 * {@link #getParameters()}.
 */
@RunWith(Parameterized.class)
public class MyOperatorRestoreTest extends AbstractKeyedOperatorRestoreTestBase {
    
    /**
     * JUnit's Parameterized runner passes each non-array element of the parameters
     * collection as a single constructor argument. The constructor therefore takes the whole
     * (savepoint path, Flink version) tuple and unpacks it for the base class.
     */
    public MyOperatorRestoreTest(Tuple2<String, String> savepoint) {
        super(savepoint.f0, savepoint.f1);
    }
    
    /** Savepoints to restore, as (savepoint path, Flink version that wrote it). */
    @Parameterized.Parameters(name = "Savepoint: {0}")
    public static Collection<Tuple2<String, String>> getParameters() {
        return Arrays.asList(
            new Tuple2<>("keyed-flink1.2", "1.2"),
            new Tuple2<>("keyed-flink1.3", "1.3")
        );
    }
    
    /**
     * Builds the job that generates keyed state. No uids are set here, so the job must keep
     * the same topology as the one that wrote the checked-in savepoints.
     */
    @Override
    protected JobGraph createMigrationJob() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(500);
        
        env.fromElements(1, 2, 3, 4, 5)
           .keyBy(x -> x)
           .map(new StatefulKeyedFunction())
           .addSink(new DiscardingSink<>());
           
        return env.getStreamGraph().getJobGraph();
    }
    
    /** Builds the job that validates the restored state; same topology as the migration job. */
    @Override
    protected JobGraph createRestoredJob() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        env.fromElements(6, 7, 8, 9, 10)
           .keyBy(x -> x)
           .map(new ValidatingKeyedFunction())
           .addSink(new DiscardingSink<>());
           
        return env.getStreamGraph().getJobGraph();
    }
    
    /** Uses the savepoint path supplied by the current parameter. */
    @Override
    protected String getMigrationSavepointName() {
        return savepointPath;
    }
}

Stream Fault Tolerance Test

/** Example fault-tolerance test: injects a failure and checks that the job recovers. */
public class MyFaultToleranceTest extends StreamFaultToleranceTestBase {
    
    @Override
    protected void testProgram(StreamExecutionEnvironment env) {
        env.enableCheckpointing(500);
        // Allow up to 3 restarts, 1000 ms apart, so the injected failure can be recovered.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 1000));
        
        // NOTE(review): FailingMapper presumably fails only a bounded number of times;
        // if it fails on every attempt, the 3 restarts are exhausted and the job fails.
        env.fromElements(1, 2, 3, 4, 5)
           .map(new FailingMapper()) // Intentionally fails
           .keyBy(x -> x % 2)
           .map(new RecoveringMapper()) // Recovers from failure
           .addSink(new ValidatingSink());
    }
    
    /**
     * Validation logic after job completion. Because this hook runs once the job has
     * finished, no fixed sleep is needed; a sleep only slows the test and makes it
     * timing-dependent.
     */
    @Override
    protected void postSubmit() throws Exception {
        // Verify results through external validation (e.g. data recorded by ValidatingSink)
    }
}