Framework for testing operator state restoration and migration, with utilities for both keyed and non-keyed state scenarios. It validates that streaming operators correctly restore their state from checkpoints and savepoints after failures and across version migrations.
Enumeration that controls function behavior in each test stage.
/**
 * Selects which stage of a state-restoration test is currently running.
 *
 * <p>Constants are declared in the order a full migration test runs them:
 * generate, then migrate, then restore.
 */
public enum ExecutionMode {

    /** Produce the initial state and input data. */
    GENERATE,

    /** Convert previously generated state into the new format. */
    MIGRATE,

    /** Start from the migrated state and check that it came back correctly. */
    RESTORE
}
// Usage Example:
// Control test behavior based on execution mode.
// Fail fast with a descriptive message instead of an ArrayIndexOutOfBoundsException
// when the mode argument is missing.
if (args.length == 0) {
    throw new IllegalArgumentException(
            "Missing execution mode argument; expected one of "
                    + java.util.Arrays.toString(ExecutionMode.values()));
}
// valueOf is case-sensitive and throws IllegalArgumentException for an unknown name.
ExecutionMode mode = ExecutionMode.valueOf(args[0]);
switch (mode) {
    case GENERATE:
        // Generate initial state and create savepoint
        runStateGenerationJob();
        break;
    case MIGRATE:
        // Migrate state format if needed
        runStateMigrationJob();
        break;
    case RESTORE:
        // Restore from savepoint and verify
        runStateRestorationJob();
        break;
    default:
        // Guards against a constant added to ExecutionMode without a handler here.
        throw new IllegalStateException("Unhandled execution mode: " + mode);
}
// Abstract base classes for testing operator state restoration scenarios.
/**
* Abstract base class for testing operator state restoration
*/
public abstract class AbstractOperatorRestoreTestBase {
/**
* Create the streaming topology for state testing
* @param env StreamExecutionEnvironment to configure
* @param mode ExecutionMode determining test behavior
*/
protected abstract void createRestorationTopology(
StreamExecutionEnvironment env,
ExecutionMode mode);
/**
* Verify state restoration was successful
* @param mode ExecutionMode that was executed
* @throws Exception if verification fails
*/
protected abstract void verifyRestorationResult(ExecutionMode mode) throws Exception;
}Framework for testing keyed operator state restoration.
/**
 * Base for restore tests whose operators keep keyed state.
 */
public abstract class AbstractKeyedOperatorRestoreTestBase
        extends AbstractOperatorRestoreTestBase {

    /**
     * Compares the keyed state seen after a restore with the expected values.
     *
     * <p>NOTE(review): this listing declares the method with neither a body nor
     * {@code abstract}. The implementation appears to be omitted here; confirm it against
     * the real source before relying on its behavior.
     *
     * @param expectedStateValues state values expected once restoration has finished
     * @throws Exception when the restored state does not match
     */
    protected void verifyKeyedState(Map<String, Object> expectedStateValues) throws Exception;

    /**
     * Wires up the stateful keyed operators exercised by the restore test.
     *
     * @param env the {@code StreamExecutionEnvironment} to add operators to
     * @param mode the {@link ExecutionMode} that decides how the operators behave
     */
    protected abstract void createKeyedStateTopology(
            StreamExecutionEnvironment env, ExecutionMode mode);
}
/**
* Standalone job for testing keyed state migration
*/
public class KeyedJob {
/**
* Main entry point for keyed state migration testing
* @param args Command line arguments: [mode] [savepointPath] [checkpointDir]
* @throws Exception if job execution fails
*/
public static void main(String[] args) throws Exception;
}Framework for testing non-keyed operator state restoration.
/**
 * Base for restore tests whose operators keep non-keyed state.
 */
public abstract class AbstractNonKeyedOperatorRestoreTestBase
        extends AbstractOperatorRestoreTestBase {

    /**
     * Compares the non-keyed state seen after a restore with the expected value.
     *
     * <p>NOTE(review): this listing declares the method with neither a body nor
     * {@code abstract}. The implementation appears to be omitted here; confirm it against
     * the real source before relying on its behavior.
     *
     * @param expectedGlobalState the global state expected once restoration has finished
     * @throws Exception when the restored state does not match
     */
    protected void verifyNonKeyedState(Object expectedGlobalState) throws Exception;

    /**
     * Wires up the stateful non-keyed operators exercised by the restore test.
     *
     * @param env the {@code StreamExecutionEnvironment} to add operators to
     * @param mode the {@link ExecutionMode} that decides how the operators behave
     */
    protected abstract void createNonKeyedStateTopology(
            StreamExecutionEnvironment env, ExecutionMode mode);
}
/**
* Standalone job for testing non-keyed state migration
*/
public class NonKeyedJob {
/**
* Main entry point for non-keyed state migration testing
* @param args Command line arguments: [mode] [savepointPath] [checkpointDir]
* @throws Exception if job execution fails
*/
public static void main(String[] args) throws Exception;
}Common patterns for implementing state management tests:
Basic Keyed State Test:
public class KeyedStateRestorationTest extends AbstractKeyedOperatorRestoreTestBase {
private TestListResultSink<Tuple2<String, Integer>> resultSink;
@Override
protected void createKeyedStateTopology(StreamExecutionEnvironment env, ExecutionMode mode) {
env.setParallelism(2);
env.enableCheckpointing(100);
resultSink = new TestListResultSink<>();
DataStream<String> input;
if (mode == ExecutionMode.GENERATE) {
// Generate test data and state
input = env.fromElements("key1", "key1", "key2", "key2", "key1");
} else {
// Use minimal input for restoration testing
input = env.fromElements("key1", "key2");
}
input.keyBy(value -> value)
.process(new StatefulKeyedProcessFunction(mode))
.addSink(resultSink);
}
@Override
protected void verifyRestorationResult(ExecutionMode mode) throws Exception {
List<Tuple2<String, Integer>> results = resultSink.getResult();
if (mode == ExecutionMode.RESTORE) {
// Verify state was correctly restored
Map<String, Integer> stateMap = results.stream()
.collect(Collectors.toMap(t -> t.f0, t -> t.f1));
assertEquals(3, stateMap.get("key1").intValue()); // Previous count + 1
assertEquals(2, stateMap.get("key2").intValue()); // Previous count + 1
}
}
@Test
public void testKeyedStateRestoration() throws Exception {
// Phase 1: Generate state and create savepoint
String savepointPath = runTestPhase(ExecutionMode.GENERATE);
// Phase 2: Restore from savepoint and verify
runTestPhase(ExecutionMode.RESTORE, savepointPath);
}
}Non-Keyed State Test:
public class NonKeyedStateRestorationTest extends AbstractNonKeyedOperatorRestoreTestBase {
private TestListResultSink<Long> resultSink;
@Override
protected void createNonKeyedStateTopology(StreamExecutionEnvironment env, ExecutionMode mode) {
env.setParallelism(1); // Non-keyed operators typically use parallelism 1
env.enableCheckpointing(50);
resultSink = new TestListResultSink<>();
DataStream<Integer> input;
if (mode == ExecutionMode.GENERATE) {
input = env.fromElements(1, 2, 3, 4, 5);
} else {
input = env.fromElements(6, 7); // Additional elements for restore test
}
input.process(new StatefulNonKeyedProcessFunction(mode))
.addSink(resultSink);
}
@Override
protected void verifyRestorationResult(ExecutionMode mode) throws Exception {
List<Long> results = resultSink.getResult();
if (mode == ExecutionMode.RESTORE) {
// Verify global state was restored (running sum should continue)
long finalSum = results.get(results.size() - 1);
assertEquals(28, finalSum); // 15 (previous) + 6 + 7 = 28
}
}
}State Migration Test:
public class StateMigrationTest extends AbstractKeyedOperatorRestoreTestBase {
@Override
protected void createKeyedStateTopology(StreamExecutionEnvironment env, ExecutionMode mode) {
env.setParallelism(4);
env.enableCheckpointing(100);
DataStream<String> source = env.addSource(new TestDataSource(mode));
if (mode == ExecutionMode.MIGRATE) {
// Use evolved state schema
source.keyBy(value -> value)
.process(new EvolvedStateProcessFunction())
.addSink(new DiscardingSink<>());
} else {
// Use original state schema
source.keyBy(value -> value)
.process(new OriginalStateProcessFunction())
.addSink(new DiscardingSink<>());
}
}
@Test
public void testStateSchemaMigration() throws Exception {
// Generate with original schema
String originalSavepoint = runTestPhase(ExecutionMode.GENERATE);
// Migrate to new schema
String migratedSavepoint = runTestPhase(ExecutionMode.MIGRATE, originalSavepoint);
// Restore with new schema
runTestPhase(ExecutionMode.RESTORE, migratedSavepoint);
}
}Standalone Job Pattern:
// Using KeyedJob for state migration testing
public class KeyedStateMigrationITCase {
@Test
public void testKeyedStateMigration() throws Exception {
String checkpointDir = tempFolder.newFolder("checkpoints").getAbsolutePath();
String savepointPath = null;
// Generate phase
String[] generateArgs = {
ExecutionMode.GENERATE.toString(),
"null", // no input savepoint
checkpointDir
};
KeyedJob.main(generateArgs);
// Find generated savepoint
savepointPath = findLatestSavepoint(checkpointDir);
assertNotNull("Savepoint should be generated", savepointPath);
// Restore phase
String[] restoreArgs = {
ExecutionMode.RESTORE.toString(),
savepointPath,
checkpointDir
};
KeyedJob.main(restoreArgs);
// Test passes if restoration completes without exception
}
}Complex State Evolution Test:
public class ComplexStateEvolutionTest extends AbstractKeyedOperatorRestoreTestBase {
@Override
protected void createKeyedStateTopology(StreamExecutionEnvironment env, ExecutionMode mode) {
env.setParallelism(2);
env.enableCheckpointing(100);
DataStream<Tuple2<String, Integer>> source = createTestSource(mode);
source.keyBy(value -> value.f0)
.process(new MultiStateProcessFunction(mode))
.addSink(new TestResultSink());
}
/**
* Process function with multiple state types for evolution testing
*/
private static class MultiStateProcessFunction
extends KeyedProcessFunction<String, Tuple2<String, Integer>, String> {
private ValueState<Integer> countState;
private ListState<String> historyState;
private MapState<String, Long> timestampState;
@Override
public void open(Configuration parameters) throws Exception {
// Initialize state descriptors
countState = getRuntimeContext().getState(
new ValueStateDescriptor<>("count", Integer.class));
historyState = getRuntimeContext().getListState(
new ListStateDescriptor<>("history", String.class));
timestampState = getRuntimeContext().getMapState(
new MapStateDescriptor<>("timestamps", String.class, Long.class));
}
@Override
public void processElement(
Tuple2<String, Integer> value,
Context ctx,
Collector<String> out) throws Exception {
// Update multiple state types
Integer currentCount = countState.value();
countState.update(currentCount == null ? 1 : currentCount + 1);
historyState.add(value.toString());
timestampState.put(value.f0, ctx.timestamp());
// Emit result
out.collect(String.format("Key: %s, Count: %d",
value.f0, countState.value()));
}
}
}This state management testing framework ensures that Flink operators correctly maintain and restore their state across various failure and migration scenarios, providing confidence in the reliability of stateful streaming applications.