A complete framework for testing savepoint and checkpoint migration across Flink versions. It provides the infrastructure to validate that streaming applications can restore from savepoints created with previous Flink versions, ensuring backward compatibility and correct state migration.
Abstract base class providing core functionality for savepoint migration testing.
/**
* Base class for savepoint migration tests providing utilities for creating,
* restoring, and validating savepoints across different Flink versions
*/
public abstract class SavepointMigrationTestBase extends TestBaseUtils {
/**
* Get the full path to a test resource file
* @param filename Resource filename relative to test resources
* @return Full path to the resource file
*/
protected String getResourceFilename(String filename);
/**
* Execute a streaming job and create a savepoint at the specified path
* @param env StreamExecutionEnvironment configured for the test
* @param savepointPath Path where savepoint should be created
* @param expectedAccumulators Expected accumulator values for verification
* @throws Exception if job execution or savepoint creation fails
*/
protected void executeAndSavepoint(
StreamExecutionEnvironment env,
String savepointPath,
Tuple2<String, Integer>... expectedAccumulators) throws Exception;
/**
* Restore a streaming job from savepoint and execute to completion
* @param env StreamExecutionEnvironment configured for the test
* @param savepointPath Path to existing savepoint
* @param expectedAccumulators Expected accumulator values for verification
* @throws Exception if restoration or execution fails
*/
protected void restoreAndExecute(
StreamExecutionEnvironment env,
String savepointPath,
Tuple2<String, Integer>... expectedAccumulators) throws Exception;
}

Usage Example:
public class MyMigrationTest extends SavepointMigrationTestBase {
@Test
public void testMigrationFromFlink17() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
// Configure your streaming topology
env.addSource(new TestSource())
.keyBy(value -> value.getKey())
.map(new StatefulMapper())
.addSink(new TestSink());
// Restore from Flink 1.7 savepoint and verify
String savepointPath = getResourceFilename("migration-test-flink1.7-savepoint");
restoreAndExecute(env, savepointPath,
Tuple2.of("elements-count", 1000),
Tuple2.of("checkpoints-count", 10));
}
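    // Companion sketch (hypothetical): the counterpart test that produced the
    // savepoint in the first place. Run against the older Flink version, the same
    // topology is executed with executeAndSavepoint(), which writes the savepoint
    // that testMigrationFromFlink17 later restores. The target path and the
    // accumulator values here are illustrative assumptions.
    @Test
    public void testCreateSavepointForMigration() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        env.addSource(new TestSource())
            .keyBy(value -> value.getKey())
            .map(new StatefulMapper())
            .addSink(new TestSink());
        // Execute the job and trigger a savepoint once the expected accumulator
        // values have been observed.
        executeAndSavepoint(env, "file:///tmp/migration-test-flink1.7-savepoint",
            Tuple2.of("elements-count", 1000),
            Tuple2.of("checkpoints-count", 10));
    }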
}

Utility classes and sources/sinks specifically designed for migration testing.
/**
* Utility class containing specialized sources and sinks for migration testing
*/
public class MigrationTestUtils {
/**
* Source with list state for checkpointing tests (non-parallel)
*/
public static class CheckpointingNonParallelSourceWithListState
extends RichSourceFunction<Tuple2<Long, Long>>
implements ListCheckpointed<Long> {
public CheckpointingNonParallelSourceWithListState(int numElements);
public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception;
public void cancel();
public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception;
public void restoreState(List<Long> state) throws Exception;
}
/**
* Source for verifying restored state (non-parallel)
*/
public static class CheckingNonParallelSourceWithListState
extends RichSourceFunction<Tuple2<Long, Long>>
implements ListCheckpointed<Long> {
public CheckingNonParallelSourceWithListState(int numElements);
public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception;
public void cancel();
public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception;
public void restoreState(List<Long> state) throws Exception;
}
/**
* Parallel source with union list state for checkpointing tests
*/
public static class CheckpointingParallelSourceWithUnionListState
extends RichParallelSourceFunction<Tuple2<Long, Long>>
implements ListCheckpointed<Long> {
public CheckpointingParallelSourceWithUnionListState(int numElements);
public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception;
public void cancel();
public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception;
public void restoreState(List<Long> state) throws Exception;
}
/**
* Parallel source for verifying union list state restoration
*/
public static class CheckingParallelSourceWithUnionListState
extends RichParallelSourceFunction<Tuple2<Long, Long>>
implements ListCheckpointed<Long> {
public CheckingParallelSourceWithUnionListState(int numElements);
public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception;
public void cancel();
public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception;
public void restoreState(List<Long> state) throws Exception;
}
/**
* Sink that counts elements using accumulator
*/
public static class AccumulatorCountingSink<T> extends RichSinkFunction<T> {
public AccumulatorCountingSink(String accumulatorName);
public void open(Configuration parameters) throws Exception;
public void invoke(T value, Context context) throws Exception;
}
}
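The Checkpointing*/Checking* sources are intended to be used in pairs: the Checkpointing variant runs in the job that writes the savepoint, while the matching Checking variant takes its place in the restoring job and verifies that the restored list state is what was snapshotted. A minimal sketch of that pairing, assuming a hypothetical savepoint name and an illustrative element count:

public class ListStateMigrationTest extends SavepointMigrationTestBase {
    // Phase 1 (run on the old Flink version): produce the savepoint.
    @Test
    public void createSavepoint() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.addSource(new MigrationTestUtils.CheckpointingNonParallelSourceWithListState(100))
            .addSink(new MigrationTestUtils.AccumulatorCountingSink<>("count"));
        executeAndSavepoint(env, "file:///tmp/list-state-savepoint", Tuple2.of("count", 100));
    }

    // Phase 2 (run on the new Flink version): the checking source validates the
    // restored list state before re-emitting its elements.
    @Test
    public void verifyRestoredState() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.addSource(new MigrationTestUtils.CheckingNonParallelSourceWithListState(100))
            .addSink(new MigrationTestUtils.AccumulatorCountingSink<>("count"));
        restoreAndExecute(env, getResourceFilename("list-state-savepoint"), Tuple2.of("count", 100));
    }
}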
Specialized source that can introduce controlled failures for fault tolerance and migration testing.

/**
* Source that can introduce artificial failures for fault tolerance testing
*/
public class FailingSource extends RichSourceFunction<Tuple2<Long, IntType>> {
/**
* Functional interface for event emission strategies
*/
@FunctionalInterface
public interface EventEmittingGenerator {
void emitEvent(SourceContext<Tuple2<Long, IntType>> ctx, int eventSequenceNo);
}
/**
* Constructor for basic failing source
* @param eventGenerator Generator function for creating events
* @param numEvents Total number of events to generate
* @param numElementsUntilFailure Number of elements to emit before the failure is induced
* @param numSuccessfulCheckpoints Number of checkpoints that must complete before the failure is induced
*/
public FailingSource(
EventEmittingGenerator eventGenerator,
int numEvents,
int numElementsUntilFailure,
int numSuccessfulCheckpoints);
/**
* Constructor with failure position control
* @param eventGenerator Generator function for creating events
* @param numEvents Total number of events to generate
* @param failurePos Position at which to induce failure
* @param numSuccessfulCheckpoints Number of successful checkpoints before failure
* @param continueAfterFailure Whether to continue after failure
*/
public FailingSource(
EventEmittingGenerator eventGenerator,
int numEvents,
int failurePos,
int numSuccessfulCheckpoints,
boolean continueAfterFailure);
public void run(SourceContext<Tuple2<Long, IntType>> ctx) throws Exception;
public void cancel();
}
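The five-argument constructor separates the failure position from the checkpoint requirement and controls whether the source keeps emitting after recovery. A short sketch with illustrative values:

// Emit 1000 events in total; induce the failure at element 500 once 5 checkpoints
// have completed (presumably both conditions must hold), and keep emitting after
// recovery.
FailingSource source = new FailingSource(
    (ctx, eventSeq) -> ctx.collect(Tuple2.of((long) eventSeq, new IntType(eventSeq))),
    1000,  // numEvents
    500,   // failurePos
    5,     // numSuccessfulCheckpoints
    true); // continueAfterFailure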
Supporting data types and utilities for migration testing.

/**
* Simple integer wrapper for testing
*/
public class IntType {
public int value;
public IntType();
public IntType(int value);
public boolean equals(Object obj);
public int hashCode();
public String toString();
}
/**
* Sink for result validation in migration tests
*/
public class ValidatingSink<T> extends RichSinkFunction<T> {
public ValidatingSink(List<T> expectedValues);
public void open(Configuration parameters) throws Exception;
public void invoke(T value, Context context) throws Exception;
public void close() throws Exception;
}
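ValidatingSink checks each arriving record against the expected values, and its close() hook can be used to assert that everything expected actually arrived. A minimal wiring sketch, where stream stands for the DataStream under test and the expected records are illustrative:

// Build the expected output up front, then attach the sink.
List<Tuple2<Long, IntType>> expected = Arrays.asList(
    Tuple2.of(0L, new IntType(0)),
    Tuple2.of(1L, new IntType(1)),
    Tuple2.of(2L, new IntType(2)));
stream.addSink(new ValidatingSink<>(expected));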
Common patterns for implementing migration tests:

Basic Migration Test Pattern:
public class StatefulJobMigrationTest extends SavepointMigrationTestBase {
@Test
public void testMigrationFromVersion14() throws Exception {
// 1. Setup environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new RocksDBStateBackend("file:///tmp/test-backend"));
env.enableCheckpointing(100);
// 2. Create topology with stateful operators
DataStream<Tuple2<Long, IntType>> source = env.addSource(
new MigrationTestUtils.CheckingNonParallelSourceWithListState(100));
source.keyBy(value -> value.f0)
.map(new StatefulMapFunction())
.addSink(new MigrationTestUtils.AccumulatorCountingSink<>("count"));
// 3. Restore from savepoint and execute
String savepointPath = getResourceFilename("stateful-job-flink1.4-savepoint");
restoreAndExecute(env, savepointPath, Tuple2.of("count", 100));
}
}

Fault Tolerance Migration Test:
@Test
public void testFaultToleranceMigration() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
env.enableCheckpointing(50);
// Source that will fail and recover
FailingSource source = new FailingSource(
(ctx, eventSeq) -> ctx.collect(Tuple2.of((long) eventSeq, new IntType(eventSeq))),
1000, // total events
500, // fail after 500 events
5 // after 5 successful checkpoints
);
// Expected post-recovery output for the ValidatingSink (populate with the
// records the pipeline is expected to emit)
List<Tuple2<Long, IntType>> expectedResults = new ArrayList<>();
env.addSource(source)
.keyBy(value -> value.f0 % 4)
.map(new RecoveringMapFunction())
.addSink(new ValidatingSink<>(expectedResults));
String savepointPath = getResourceFilename("fault-tolerance-test-savepoint");
restoreAndExecute(env, savepointPath, Tuple2.of("processed", 1000));
}

State Evolution Migration Test:
@Test
public void testStateEvolutionMigration() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Test state schema evolution
env.addSource(new MigrationTestUtils.CheckingNonParallelSourceWithListState(50))
.keyBy(value -> value.f0)
.process(new EvolvingProcessFunction()) // Function with evolved state schema
.addSink(new MigrationTestUtils.AccumulatorCountingSink<>("evolved-count"));
String savepointPath = getResourceFilename("state-evolution-flink1.6-savepoint");
restoreAndExecute(env, savepointPath, Tuple2.of("evolved-count", 50));
}

This migration testing framework ensures that Flink applications maintain backward compatibility across version upgrades and that stateful streaming applications can successfully restore from savepoints created with previous versions.