Infrastructure for testing savepoint migration, state compatibility, and checkpoint recovery across Apache Flink versions. These utilities are essential for validating upgrade paths and ensuring state serialization compatibility.
Abstract base class providing infrastructure for testing savepoint migration between Flink versions.
/**
 * Base class for savepoint-migration tests: runs a job on a local mini
 * cluster, triggers a savepoint, and later restores a (possibly modified)
 * job from that savepoint while checking expected accumulator values.
 */
public abstract class SavepointMigrationTestBase extends TestBaseUtils {
    // JUnit 4 requires @Rule fields to be public and non-null when the runner
    // validates the test class; initialize inline so subclasses always get a
    // working temporary folder.
    @Rule
    public final TemporaryFolder tempFolder = new TemporaryFolder();
    // Local mini cluster the test jobs run on; lifecycle managed by setup()/cleanup().
    protected LocalFlinkMiniCluster cluster;
    /** Per-test initialization (cluster start-up). */
    @Before
    public void setup() throws Exception;
    /** Per-test teardown (cluster shutdown). */
    @After
    public void cleanup() throws Exception;
    // Core migration testing methods
    /**
     * Executes the job defined on {@code env}, takes a savepoint at
     * {@code savepointPath}, and verifies the given (name, value)
     * accumulator pairs.
     */
    protected void executeAndSavepoint(StreamExecutionEnvironment env, String savepointPath, Tuple2<String, Integer>... expectedAccumulators) throws Exception;
    /**
     * Restores the job defined on {@code env} from the savepoint at
     * {@code savepointPath}, executes it, and verifies the given
     * accumulator pairs.
     */
    protected void restoreAndExecute(StreamExecutionEnvironment env, String savepointPath, Tuple2<String, Integer>... expectedAccumulators) throws Exception;
    // Resource management
    // Instance method (not static): subclasses @Override it to map a filename
    // to their own resource location — a static method cannot be overridden.
    protected String getResourceFilename(String filename);
    // Cluster management
    protected void startCluster() throws Exception;
    protected void stopCluster() throws Exception;
}
Base class for testing operator state restoration across versions with two-step migration testing.
/**
 * Base class for two-step operator-restore tests: a "migration" job first
 * creates state, then a "restored" job is brought up from that state.
 * Subclasses supply both job graphs and the savepoint name.
 */
public abstract class AbstractOperatorRestoreTestBase {
// Cluster configuration applied when starting the local mini cluster.
protected Configuration config;
// Mini cluster both jobs run on; lifecycle managed by setup()/cleanup().
protected LocalFlinkMiniCluster cluster;
@Before
public void setup() throws Exception;
@After
public void cleanup() throws Exception;
// Migration workflow methods
/** Builds the job that generates the state to be migrated. */
protected abstract JobGraph createMigrationJob() throws Exception;
/** Builds the job that is restored from the migrated state. */
protected abstract JobGraph createRestoredJob() throws Exception;
/** Name of the savepoint to restore from. */
protected abstract String getMigrationSavepointName();
// Test execution
/** Drives the full migrate-then-restore workflow. */
@Test
public void testRestore() throws Exception;
}
Specialized base for testing keyed operator state restoration with parameterized testing.
/**
 * Keyed variant of the operator-restore base, parameterized over
 * (savepoint path, Flink version) pairs supplied by {@code getParameters()}.
 * NOTE(review): the runner annotation is not visible in this view —
 * presumably {@code @RunWith(Parameterized.class)} is declared on this
 * class or inherited; confirm.
 */
public abstract class AbstractKeyedOperatorRestoreTestBase extends AbstractOperatorRestoreTestBase {
@Parameterized.Parameters(name = "Savepoint: {0}")
public static Collection<Tuple2<String, String>> getParameters();
// Current parameter values: savepoint resource path and producing version.
protected String savepointPath;
protected String savepointVersion;
public AbstractKeyedOperatorRestoreTestBase(String savepointPath, String savepointVersion);
}
Specialized base for testing non-keyed operator state restoration.
/**
 * Non-keyed variant of the operator-restore base, parameterized over
 * (savepoint path, Flink version) pairs supplied by {@code getParameters()}.
 * NOTE(review): as with the keyed variant, the
 * {@code @RunWith(Parameterized.class)} declaration is not visible here —
 * confirm where it lives.
 */
public abstract class AbstractNonKeyedOperatorRestoreTestBase extends AbstractOperatorRestoreTestBase {
@Parameterized.Parameters(name = "Savepoint: {0}")
public static Collection<Tuple2<String, String>> getParameters();
// Current parameter values: savepoint resource path and producing version.
protected String savepointPath;
protected String savepointVersion;
public AbstractNonKeyedOperatorRestoreTestBase(String savepointPath, String savepointVersion);
}
Concrete test for validating savepoint migration from Flink 1.1.
/** Migration test restoring state from savepoints written by Flink 1.1. */
public class StatefulJobSavepointFrom11MigrationITCase extends SavepointMigrationTestBase {
// Number of elements emitted by the test source, per the constant's name.
private static final int NUM_SOURCE_ELEMENTS = 4;
@Test
public void testSavepoint() throws Exception;
// Job creation methods
// NOTE(review): two graph builders suggest restoring the 1.1 savepoint into
// both a 1.2-style (V2) and a 1.3-style (V3) topology — confirm against bodies.
private JobGraph createJobGraphV2() throws IOException;
private JobGraph createJobGraphV3() throws IOException;
@Override
protected String getResourceFilename(String filename);
}
Concrete test for validating savepoint migration from Flink 1.2.
/** Migration test restoring state from savepoints written by Flink 1.2. */
public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigrationTestBase {
// Number of elements emitted by the test source, per the constant's name.
private static final int NUM_SOURCE_ELEMENTS = 4;
@Test
public void testSavepoint() throws Exception;
// Job graph creation
private JobGraph createJobGraph() throws IOException;
@Override
protected String getResourceFilename(String filename);
}
Test for validating savepoint migration within Flink 1.3.
/** Migration test for savepoints taken within Flink 1.3. */
public class StatefulJobSavepointFrom13MigrationITCase extends SavepointMigrationTestBase {
@Test
public void testSavepoint() throws Exception;
@Override
protected String getResourceFilename(String filename);
}
Infrastructure for testing fault tolerance in streaming applications with multi-TaskManager clusters.
/**
 * Base for streaming fault-tolerance tests: runs the subclass's program on a
 * local mini cluster with {@code NUM_TASK_MANAGERS} task managers.
 */
public abstract class StreamFaultToleranceTestBase {
// Number of TaskManagers in the mini cluster.
protected static final int NUM_TASK_MANAGERS = 2;
// Mini cluster and its configuration; lifecycle managed by setup()/cleanup().
protected LocalFlinkMiniCluster cluster;
protected Configuration config;
@Before
public void setup() throws Exception;
@After
public void cleanup() throws Exception;
// Abstract methods for test implementation
/** Defines the streaming program under test on the given environment. */
protected abstract void testProgram(StreamExecutionEnvironment env);
/** Hook for validation after the job finishes; override as needed. */
protected void postSubmit() throws Exception;
// Execution control
/** Submits the checkpointed program and runs it, then calls postSubmit(). */
@Test
public void runCheckpointedProgram() throws Exception;
}
public class MySavepointMigrationTest extends SavepointMigrationTestBase {
@Test
public void testSavepointMigration() throws Exception {
    // Phase 1: run the original topology and capture its state in a savepoint.
    StreamExecutionEnvironment originalJob = createV1Environment();
    executeAndSavepoint(originalJob, "my-migration-savepoint");

    // Phase 2: restore that state into the modified topology and run it.
    StreamExecutionEnvironment modifiedJob = createV2Environment();
    restoreAndExecute(modifiedJob, "my-migration-savepoint");
}
/** Builds the original checkpointed job: parity-keyed stateful map into a discarding sink. */
private StreamExecutionEnvironment createV1Environment() {
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    environment.enableCheckpointing(500);
    environment
            .fromElements(1, 2, 3, 4, 5)
            .keyBy(value -> value % 2)
            .map(new StatefulMapper())
            .addSink(new DiscardingSink<>());
    return environment;
}
/** Builds the evolved job: the V1 pipeline with an extra doubling map before the sink. */
private StreamExecutionEnvironment createV2Environment() {
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    environment.enableCheckpointing(500);
    environment
            .fromElements(1, 2, 3, 4, 5)
            .keyBy(value -> value % 2)
            .map(new StatefulMapper())
            .map(value -> value * 2) // stage added after the savepoint was taken
            .addSink(new DiscardingSink<>());
    return environment;
}
@Override
protected String getResourceFilename(String filename) {
    // Savepoint fixtures live under the "savepoints/" resource directory.
    return String.format("savepoints/%s", filename);
}
}
public class MyOperatorRestoreTest extends AbstractKeyedOperatorRestoreTestBase {
/** Forwards the parameterized savepoint path and version to the base class. */
public MyOperatorRestoreTest(String savepointPath, String savepointVersion) {
super(savepointPath, savepointVersion);
}
@Parameterized.Parameters(name = "Savepoint: {0}")
public static Collection<Tuple2<String, String>> getParameters() {
    // Each tuple pairs a keyed-state savepoint resource with the Flink
    // version that produced it.
    Tuple2<String, String> fromFlink12 = new Tuple2<>("keyed-flink1.2", "1.2");
    Tuple2<String, String> fromFlink13 = new Tuple2<>("keyed-flink1.3", "1.3");
    return Arrays.asList(fromFlink12, fromFlink13);
}
@Override
protected JobGraph createMigrationJob() throws Exception {
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    environment.enableCheckpointing(500);
    // Identity-keyed pipeline whose stateful map populates the state to migrate.
    environment
            .fromElements(1, 2, 3, 4, 5)
            .keyBy(value -> value)
            .map(new StatefulKeyedFunction())
            .addSink(new DiscardingSink<>());
    return environment.getStreamGraph().getJobGraph();
}
@Override
protected JobGraph createRestoredJob() throws Exception {
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    // Runs fresh elements through a validating map against the restored state.
    environment
            .fromElements(6, 7, 8, 9, 10)
            .keyBy(value -> value)
            .map(new ValidatingKeyedFunction())
            .addSink(new DiscardingSink<>());
    return environment.getStreamGraph().getJobGraph();
}
@Override
protected String getMigrationSavepointName() {
// The parameterized savepoint path doubles as the savepoint name.
return savepointPath;
}
}
public class MyFaultToleranceTest extends StreamFaultToleranceTestBase {
@Override
protected void testProgram(StreamExecutionEnvironment env) {
    // Checkpoint every 500 ms and allow 3 restarts, 1 s apart, so the job
    // can survive the mapper's deliberate failure.
    env.enableCheckpointing(500);
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 1000));
    env
            .fromElements(1, 2, 3, 4, 5)
            .map(new FailingMapper()) // fails on purpose to exercise recovery
            .keyBy(value -> value % 2)
            .map(new RecoveringMapper()) // continues after the restart
            .addSink(new ValidatingSink());
}
@Override
protected void postSubmit() throws Exception {
// Validation logic after job completion
// NOTE(review): a fixed 2 s sleep is a fragile way to wait for job
// completion — consider polling the job status instead; confirm the
// timing assumption holds on slower CI machines.
Thread.sleep(2000); // Wait for completion
// Verify results through external validation
}
}