Comprehensive testing infrastructure and utilities for the Apache Flink stream processing framework
—
Framework for testing operator state restoration and migration, including utilities for both keyed and non-keyed state scenarios. This framework validates that streaming operators correctly restore their state from checkpoints and savepoints, both after failures and across version migrations.
Enumeration controlling function behavior for different test stages.
/**
 * Stage of a multi-phase state-restoration test.
 *
 * <p>Tests run the same job once per stage and branch on this value to choose their input and
 * the operator/state-schema version to run.
 */
public enum ExecutionMode {
    /** Run the job on initial input to build up operator state and produce a savepoint. */
    GENERATE,
    /** Restore from an existing savepoint with an evolved state schema and produce a new savepoint. */
    MIGRATE,
    /** Restore from a previously produced savepoint (generated or migrated) and verify the state. */
    RESTORE
}
Usage Example:
// Control test behavior based on the execution mode passed as the first argument.
// Fail fast with a readable message instead of an ArrayIndexOutOfBoundsException when the
// mode is missing; ExecutionMode.valueOf already rejects unknown names with an
// IllegalArgumentException.
if (args.length == 0) {
    throw new IllegalArgumentException(
            "Missing execution mode argument; expected one of "
                    + java.util.Arrays.toString(ExecutionMode.values()));
}
ExecutionMode mode = ExecutionMode.valueOf(args[0]);
switch (mode) {
    case GENERATE:
        // Generate initial state and create savepoint
        runStateGenerationJob();
        break;
    case MIGRATE:
        // Migrate state format if needed
        runStateMigrationJob();
        break;
    case RESTORE:
        // Restore from savepoint and verify
        runStateRestorationJob();
        break;
}
Abstract base classes for testing operator state restoration scenarios.
/**
 * Abstract base class for testing operator state restoration.
 *
 * <p>Subclasses build the job topology for a given {@link ExecutionMode} stage and then check
 * that stage's outcome.
 */
public abstract class AbstractOperatorRestoreTestBase {
    /**
     * Creates the streaming topology for one test stage.
     *
     * @param env StreamExecutionEnvironment to configure (e.g. parallelism, checkpointing,
     *     sources and sinks)
     * @param mode stage being executed; implementations may select different inputs or operator
     *     versions per stage
     */
    protected abstract void createRestorationTopology(
            StreamExecutionEnvironment env,
            ExecutionMode mode);

    /**
     * Verifies the outcome of a stage (presumably called once that stage's job has finished —
     * confirm against the test driver).
     *
     * @param mode stage that was executed; the examples only assert when this is RESTORE
     * @throws Exception if verification fails
     */
    protected abstract void verifyRestorationResult(ExecutionMode mode) throws Exception;
}
Framework for testing keyed operator state restoration.
/**
 * Abstract base class for restoration tests whose operators use keyed state, i.e. state scoped
 * to the key selected by {@code keyBy}.
 */
public abstract class AbstractKeyedOperatorRestoreTestBase
        extends AbstractOperatorRestoreTestBase {
    /**
     * Creates a topology with stateful keyed operators for one test stage.
     *
     * @param env StreamExecutionEnvironment to configure
     * @param mode stage being executed; determines input and operator behavior
     */
    protected abstract void createKeyedStateTopology(
            StreamExecutionEnvironment env,
            ExecutionMode mode);

    /**
     * Verifies that keyed state was restored to the expected values.
     *
     * <p>NOTE(review): listed here as a signature only. A non-abstract method needs a body to
     * compile, and the implementation is not shown; confirm against the real class.
     *
     * @param expectedStateValues expected state values after restoration (presumably one entry
     *     per stream key — verify)
     * @throws Exception if verification fails
     */
    protected void verifyKeyedState(Map<String, Object> expectedStateValues) throws Exception;
}
/**
 * Standalone job for testing keyed state migration; a test can drive it by calling
 * {@link #main} once per stage.
 */
public class KeyedJob {
    /**
     * Runs one stage of the keyed state migration test.
     *
     * <p>NOTE(review): signature only; the body is not shown here.
     *
     * @param args command-line arguments {@code [mode] [savepointPath] [checkpointDir]}, where
     *     {@code mode} is an {@link ExecutionMode} name. The standalone example passes the
     *     literal string {@code "null"} as savepointPath for GENERATE (no input savepoint).
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception;
}
Framework for testing non-keyed operator state restoration.
/**
 * Abstract base class for restoration tests whose operators use non-keyed state, i.e. state
 * that is not partitioned by a stream key.
 */
public abstract class AbstractNonKeyedOperatorRestoreTestBase
        extends AbstractOperatorRestoreTestBase {
    /**
     * Creates a topology with stateful non-keyed operators for one test stage.
     *
     * @param env StreamExecutionEnvironment to configure
     * @param mode stage being executed; determines input and operator behavior
     */
    protected abstract void createNonKeyedStateTopology(
            StreamExecutionEnvironment env,
            ExecutionMode mode);

    /**
     * Verifies that non-keyed state was restored to the expected value.
     *
     * <p>NOTE(review): listed here as a signature only. A non-abstract method needs a body to
     * compile, and the implementation is not shown; confirm against the real class.
     *
     * @param expectedGlobalState expected global state after restoration
     * @throws Exception if verification fails
     */
    protected void verifyNonKeyedState(Object expectedGlobalState) throws Exception;
}
/**
 * Standalone job for testing non-keyed state migration; a test can drive it by calling
 * {@link #main} once per stage.
 */
public class NonKeyedJob {
    /**
     * Runs one stage of the non-keyed state migration test.
     *
     * <p>NOTE(review): signature only; the body is not shown here.
     *
     * @param args command-line arguments {@code [mode] [savepointPath] [checkpointDir]}
     *     (presumably the same format as {@code KeyedJob} — confirm)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception;
}
Common patterns for implementing state management tests:
Basic Keyed State Test:
public class KeyedStateRestorationTest extends AbstractKeyedOperatorRestoreTestBase {
    private TestListResultSink<Tuple2<String, Integer>> resultSink;

    /**
     * Builds a keyed topology whose operator keeps a per-key count.
     *
     * <p>Assumes StatefulKeyedProcessFunction emits (key, running count); confirm against its
     * implementation. GENERATE feeds key1 three times and key2 twice. Later stages feed each
     * key once, so the restored counts must continue from those totals.
     */
    @Override
    protected void createKeyedStateTopology(StreamExecutionEnvironment env, ExecutionMode mode) {
        env.setParallelism(2);
        env.enableCheckpointing(100);
        resultSink = new TestListResultSink<>();
        DataStream<String> input;
        if (mode == ExecutionMode.GENERATE) {
            // Generate test data and state: key1 x3, key2 x2
            input = env.fromElements("key1", "key1", "key2", "key2", "key1");
        } else {
            // Use minimal input for restoration testing: one more element per key
            input = env.fromElements("key1", "key2");
        }
        input.keyBy(value -> value)
                .process(new StatefulKeyedProcessFunction(mode))
                .addSink(resultSink);
    }

    /** Checks that, after RESTORE, each key's count continued from its GENERATE total. */
    @Override
    protected void verifyRestorationResult(ExecutionMode mode) throws Exception {
        if (mode != ExecutionMode.RESTORE) {
            return;
        }
        List<Tuple2<String, Integer>> results = resultSink.getResult();
        // Keep the highest count if a key shows up more than once; a toMap() without a merge
        // function would throw IllegalStateException on a duplicate key.
        Map<String, Integer> stateMap = results.stream()
                .collect(Collectors.toMap(t -> t.f0, t -> t.f1, Integer::max));
        // Compare boxed values so a missing key fails the assertion instead of throwing an NPE.
        assertEquals(Integer.valueOf(4), stateMap.get("key1")); // Previous count (3) + 1
        assertEquals(Integer.valueOf(3), stateMap.get("key2")); // Previous count (2) + 1
    }

    @Test
    public void testKeyedStateRestoration() throws Exception {
        // Phase 1: Generate state and create savepoint
        String savepointPath = runTestPhase(ExecutionMode.GENERATE);
        // Phase 2: Restore from savepoint and verify
        runTestPhase(ExecutionMode.RESTORE, savepointPath);
    }
}
Non-Keyed State Test:
public class NonKeyedStateRestorationTest extends AbstractNonKeyedOperatorRestoreTestBase {
    private TestListResultSink<Long> resultSink;

    /**
     * Builds a single-instance topology whose operator presumably keeps a running sum (see the
     * verification below).
     */
    @Override
    protected void createNonKeyedStateTopology(StreamExecutionEnvironment env, ExecutionMode mode) {
        // Parallelism 1 keeps a single operator instance, so the order of emitted values (and
        // therefore the last one) is deterministic.
        env.setParallelism(1);
        env.enableCheckpointing(50);
        resultSink = new TestListResultSink<>();
        DataStream<Integer> input;
        if (mode == ExecutionMode.GENERATE) {
            input = env.fromElements(1, 2, 3, 4, 5); // sums to 15
        } else {
            input = env.fromElements(6, 7); // Additional elements for restore test
        }
        input.process(new StatefulNonKeyedProcessFunction(mode))
                .addSink(resultSink);
    }

    /** Checks that, after RESTORE, the last emitted sum continued from the GENERATE total. */
    @Override
    protected void verifyRestorationResult(ExecutionMode mode) throws Exception {
        List<Long> results = resultSink.getResult();
        if (mode == ExecutionMode.RESTORE) {
            // Fail with a clear message rather than an IndexOutOfBoundsException below.
            if (results.isEmpty()) {
                throw new AssertionError("No results were emitted after restore");
            }
            // Verify global state was restored (running sum should continue)
            long finalSum = results.get(results.size() - 1);
            assertEquals(28, finalSum); // 15 (previous) + 6 + 7 = 28
        }
    }
}
State Migration Test:
public class StateMigrationTest extends AbstractKeyedOperatorRestoreTestBase {
    /**
     * Builds the topology for one stage of the schema-migration test.
     *
     * <p>GENERATE writes state with the original schema. MIGRATE and RESTORE both run the evolved
     * schema: MIGRATE reads the original savepoint with it, and RESTORE reads the migrated
     * savepoint, which MIGRATE wrote with the evolved schema. Previously RESTORE fell through to
     * the original schema, contradicting "Restore with new schema".
     */
    @Override
    protected void createKeyedStateTopology(StreamExecutionEnvironment env, ExecutionMode mode) {
        env.setParallelism(4);
        env.enableCheckpointing(100);
        DataStream<String> source = env.addSource(new TestDataSource(mode));
        if (mode == ExecutionMode.GENERATE) {
            // Use original state schema
            source.keyBy(value -> value)
                    .process(new OriginalStateProcessFunction())
                    .addSink(new DiscardingSink<>());
        } else {
            // Use evolved state schema (MIGRATE and RESTORE)
            source.keyBy(value -> value)
                    .process(new EvolvedStateProcessFunction())
                    .addSink(new DiscardingSink<>());
        }
    }

    @Test
    public void testStateSchemaMigration() throws Exception {
        // Generate with original schema
        String originalSavepoint = runTestPhase(ExecutionMode.GENERATE);
        // Migrate to new schema
        String migratedSavepoint = runTestPhase(ExecutionMode.MIGRATE, originalSavepoint);
        // Restore with new schema
        runTestPhase(ExecutionMode.RESTORE, migratedSavepoint);
    }
}
Standalone Job Pattern:
// Using KeyedJob for state migration testing
public class KeyedStateMigrationITCase {
    /** Scratch directory for checkpoints and savepoints; JUnit deletes it after each test. */
    @org.junit.Rule
    public final org.junit.rules.TemporaryFolder tempFolder =
            new org.junit.rules.TemporaryFolder();

    @Test
    public void testKeyedStateMigration() throws Exception {
        String checkpointDir = tempFolder.newFolder("checkpoints").getAbsolutePath();

        // Generate phase
        String[] generateArgs = {
            ExecutionMode.GENERATE.toString(),
            "null", // no input savepoint
            checkpointDir
        };
        KeyedJob.main(generateArgs);

        // Find generated savepoint.
        // NOTE(review): findLatestSavepoint is a helper not shown here; presumably it returns
        // null when no savepoint was written — confirm.
        String savepointPath = findLatestSavepoint(checkpointDir);
        assertNotNull("Savepoint should be generated", savepointPath);

        // Restore phase
        String[] restoreArgs = {
            ExecutionMode.RESTORE.toString(),
            savepointPath,
            checkpointDir
        };
        KeyedJob.main(restoreArgs);
        // Test passes if restoration completes without exception
    }
}
Complex State Evolution Test:
public class ComplexStateEvolutionTest extends AbstractKeyedOperatorRestoreTestBase {
    @Override
    protected void createKeyedStateTopology(StreamExecutionEnvironment env, ExecutionMode mode) {
        env.setParallelism(2);
        env.enableCheckpointing(100);
        DataStream<Tuple2<String, Integer>> source = createTestSource(mode);
        source.keyBy(value -> value.f0)
                .process(new MultiStateProcessFunction(mode))
                .addSink(new TestResultSink());
    }

    /**
     * Process function with multiple state types for evolution testing.
     *
     * <p>Every element updates value, list and map state, so one savepoint contains all three
     * state types.
     */
    private static class MultiStateProcessFunction
            extends KeyedProcessFunction<String, Tuple2<String, Integer>, String> {
        /** Stage this instance runs in; required by the call site and available for stage-specific logic. */
        private final ExecutionMode mode;

        // State handles are obtained in open(); transient because they are runtime objects that
        // must not be serialized with the function.
        private transient ValueState<Integer> countState;
        private transient ListState<String> historyState;
        private transient MapState<String, Long> timestampState;

        MultiStateProcessFunction(ExecutionMode mode) {
            this.mode = mode;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            // Initialize state descriptors. The names are what identify the state in a
            // savepoint, so keep them stable across versions.
            countState = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("count", Integer.class));
            historyState = getRuntimeContext().getListState(
                    new ListStateDescriptor<>("history", String.class));
            timestampState = getRuntimeContext().getMapState(
                    new MapStateDescriptor<>("timestamps", String.class, Long.class));
        }

        @Override
        public void processElement(
                Tuple2<String, Integer> value,
                Context ctx,
                Collector<String> out) throws Exception {
            // Update multiple state types
            Integer currentCount = countState.value();
            int newCount = currentCount == null ? 1 : currentCount + 1;
            countState.update(newCount);
            historyState.add(value.toString());
            // NOTE: ctx.timestamp() is null when the element carries no timestamp
            // (e.g. processing-time pipelines).
            timestampState.put(value.f0, ctx.timestamp());
            // Emit result (reuse the computed count instead of reading state again)
            out.collect(String.format("Key: %s, Count: %d", value.f0, newCount));
        }
    }
}
This state management testing framework ensures that Flink operators correctly maintain and restore their state across various failure and migration scenarios, providing confidence in the reliability of stateful streaming applications.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-tests-2-11