Comprehensive test library for the Apache Flink stream processing framework, providing integration tests, test utilities, and end-to-end validation tests.
—
Framework for testing state backend switching, operator restore scenarios, and state migration validation. This framework ensures state compatibility across different backends and validates operator restore behavior.
Base class providing common functionality for testing operator restore scenarios across different state management configurations.
/**
* Abstract base class for operator restore testing scenarios.
*
* <p>Only {@code testRestore()} is declared {@code abstract}; the other members
* are listed as signatures without bodies.
* NOTE(review): a non-abstract method without a body is not compilable Java, so
* this reads as an API summary of helper methods rather than real source —
* confirm against the actual class.
*/
public abstract class AbstractOperatorRestoreTestBase {
/**
* Tests operator restore from a savepoint or checkpoint.
* Abstract: each concrete subclass supplies the scenario.
*
* @throws Exception if restore testing fails
*/
protected abstract void testRestore() throws Exception;
/**
* Creates a savepoint from a running job for restore testing.
*
* @param jobGraph job to create the savepoint from
* @return path to the created savepoint
* @throws Exception if savepoint creation fails
*/
protected String createSavepoint(JobGraph jobGraph) throws Exception;
/**
* Restores a job from a savepoint and validates its state.
*
* @param jobGraph job to restore
* @param savepointPath path to the savepoint to restore from
* @throws Exception if restore or validation fails
*/
protected void restoreFromSavepoint(JobGraph jobGraph, String savepointPath) throws Exception;
/**
* Configures the state backend used for testing.
*
* @param backend state backend configuration
*/
protected void configureStateBackend(StateBackend backend);
}Specialized base class for testing restore scenarios with keyed state operators.
/**
* Base class for testing keyed operator restore scenarios.
*
* <p>Extends {@code AbstractOperatorRestoreTestBase} and declares no
* {@code testRestore()} of its own in this listing, so concrete subclasses still
* have to provide it. Members are listed as signatures without bodies.
*/
public abstract class AbstractKeyedOperatorRestoreTestBase
extends AbstractOperatorRestoreTestBase {
/**
* Tests keyed state restore with the given key and value serializers.
*
* @param <K> key type
* @param <V> value type
* @param keySerializer serializer for the key type
* @param valueSerializer serializer for the value type
* @throws Exception if the keyed restore test fails
*/
protected <K, V> void testKeyedStateRestore(
TypeSerializer<K> keySerializer,
TypeSerializer<V> valueSerializer) throws Exception;
/**
* Validates keyed state after restore.
*
* @param <K> key type
* @param <V> value type
* @param expectedState expected state values after restore
* @param actualState actual restored state
* @return boolean indicating state validity
*/
protected <K, V> boolean validateKeyedState(
Map<K, V> expectedState,
Map<K, V> actualState);
/**
* Creates a keyed state test job with configurable state.
*
* @param <K> key type
* @param <V> value type
* @param initialState initial state values
* @return JobGraph configured for keyed state testing
*/
protected <K, V> JobGraph createKeyedStateJob(Map<K, V> initialState);
}Base class for testing restore scenarios with non-keyed (operator) state.
/**
* Base class for testing non-keyed operator restore scenarios.
*
* <p>Covers three kinds of operator state: list state, union list state and
* broadcast state. Members are listed as signatures without bodies.
* NOTE(review): the {@code OperatorState} type used by
* {@code validateOperatorState} is not declared in this listing — confirm
* which class is meant.
*/
public abstract class AbstractNonKeyedOperatorRestoreTestBase
extends AbstractOperatorRestoreTestBase {
/**
* Tests operator state restore with list state.
*
* @param <T> element type of the list state
* @param initialListState initial list state values
* @throws Exception if the non-keyed restore test fails
*/
protected <T> void testListStateRestore(List<T> initialListState) throws Exception;
/**
* Tests operator state restore with union list state.
*
* @param <T> element type of the union list state
* @param initialUnionState initial union state values
* @throws Exception if the union state restore test fails
*/
protected <T> void testUnionListStateRestore(List<T> initialUnionState) throws Exception;
/**
* Tests operator state restore with broadcast state.
*
* @param <K> broadcast state key type
* @param <V> broadcast state value type
* @param initialBroadcastState initial broadcast state
* @throws Exception if the broadcast state restore test fails
*/
protected <K, V> void testBroadcastStateRestore(
Map<K, V> initialBroadcastState) throws Exception;
/**
* Validates operator state after restore.
*
* @param expectedState expected operator state
* @param actualState actual restored operator state
* @return boolean indicating state validity
*/
protected boolean validateOperatorState(
OperatorState expectedState,
OperatorState actualState);
}Framework for testing state backend switching scenarios and compatibility validation.
/**
* Base class for testing state backend switching scenarios.
*
* <p>Only {@code testSwitchingStateBackend()} is declared {@code abstract}; the
* per-transition helpers are listed as signatures without bodies.
* NOTE(review): the {@code StateSnapshot} type used by
* {@code validateStateConsistency} is not declared in this listing — confirm
* where it comes from.
*/
public abstract class SavepointStateBackendSwitchTestBase {
/**
* Tests switching the state backend while preserving state correctness.
* Abstract: each concrete subclass supplies the scenario.
*
* @throws Exception if the state backend switching test fails
*/
protected abstract void testSwitchingStateBackend() throws Exception;
/**
* Tests the switch from the memory state backend to the filesystem backend.
*
* @param filesystemPath path for the filesystem state backend
* @throws Exception if the backend switch fails
*/
protected void testMemoryToFilesystemSwitch(String filesystemPath) throws Exception;
/**
* Tests the switch from the filesystem state backend to RocksDB.
*
* @param rocksDbPath path for the RocksDB state backend
* @throws Exception if the backend switch fails
*/
protected void testFilesystemToRocksDbSwitch(String rocksDbPath) throws Exception;
/**
* Tests the switch from the RocksDB state backend back to memory.
*
* @throws Exception if the backend switch fails
*/
protected void testRocksDbToMemorySwitch() throws Exception;
/**
* Validates state consistency after a backend switch.
*
* @param originalState state before the switch
* @param restoredState state after the switch
* @return boolean indicating state consistency
*/
protected boolean validateStateConsistency(
StateSnapshot originalState,
StateSnapshot restoredState);
}
/**
* Specifications for state backend switching test scenarios.
*
* <p>Groups one nested spec class per backend transition plus a factory that
* combines them. Members are listed as signatures without bodies.
* NOTE(review): {@code CompleteSwitchSpec}, the return type of
* {@code createCompleteSwitchSpec}, is not declared in this listing — confirm
* where it is defined.
*/
public class BackendSwitchSpecs {
/**
* Specification for a memory to filesystem switch.
*/
public static class MemoryToFilesystemSpec {
/**
* Creates a memory to filesystem switch spec.
*
* @param targetPath filesystem path for state storage
* @param asyncSnapshot enable asynchronous snapshots
*/
public MemoryToFilesystemSpec(String targetPath, boolean asyncSnapshot);
/**
* @return presumably the {@code targetPath} given to the constructor — TODO confirm
*/
public String getTargetPath();
/**
* @return presumably the {@code asyncSnapshot} flag given to the constructor — TODO confirm
*/
public boolean isAsyncSnapshot();
}
/**
* Specification for a filesystem to RocksDB switch.
*/
public static class FilesystemToRocksDbSpec {
/**
* Creates a filesystem to RocksDB switch spec.
*
* @param rocksDbPath RocksDB storage path
* @param incrementalCheckpoints enable incremental checkpoints
*/
public FilesystemToRocksDbSpec(String rocksDbPath, boolean incrementalCheckpoints);
/**
* @return presumably the {@code rocksDbPath} given to the constructor — TODO confirm
*/
public String getRocksDbPath();
/**
* @return presumably the {@code incrementalCheckpoints} flag given to the constructor — TODO confirm
*/
public boolean isIncrementalCheckpoints();
}
/**
* Creates the specification for a complete backend switch test.
*
* @param memoryToFs memory to filesystem spec
* @param fsToRocksDb filesystem to RocksDB spec
* @return CompleteSwitchSpec for the full backend switching test
*/
public static CompleteSwitchSpec createCompleteSwitchSpec(
MemoryToFilesystemSpec memoryToFs,
FilesystemToRocksDbSpec fsToRocksDb);
}Utility classes for common state restore testing operations and validation.
/**
* Utility class for state restore testing operations.
*
* <p>All members are {@code static} and are listed as signatures without bodies.
*/
public class StateRestoreTestUtils {
/**
* Creates a test job with configurable state for restore testing.
*
* @param stateConfig state configuration parameters
* @return JobGraph configured for state restore testing
*/
public static JobGraph createStatefulTestJob(StateConfiguration stateConfig);
/**
* Executes a job and creates savepoints at the specified interval.
*
* @param jobGraph job to execute
* @param savepointIntervalMs interval between savepoints (presumably
* milliseconds, going by the parameter name — TODO confirm)
* @return list of the savepoint paths created
* @throws Exception if execution or savepoint creation fails
*/
public static List<String> executeAndCreateSavepoints(
JobGraph jobGraph,
long savepointIntervalMs) throws Exception;
/**
* Compares two state snapshots for consistency validation.
*
* @param snapshot1 first state snapshot
* @param snapshot2 second state snapshot
* @return StateComparisonResult containing comparison details
*/
public static StateComparisonResult compareStateSnapshots(
StateSnapshot snapshot1,
StateSnapshot snapshot2);
/**
* Extracts state from a running job for validation.
*
* @param jobId identifier of the running job
* @return StateSnapshot containing the current job state
* @throws Exception if state extraction fails
*/
public static StateSnapshot extractJobState(JobID jobId) throws Exception;
}
/**
* Configuration for stateful test jobs.
*
* <p>Members are listed as signatures without bodies.
*/
public class StateConfiguration {
/**
* Creates a state configuration.
*
* @param keyedStateSize number of keyed state entries
* @param operatorStateSize size of operator state
* @param checkpointInterval checkpoint interval in milliseconds
*/
public StateConfiguration(
int keyedStateSize,
int operatorStateSize,
long checkpointInterval);
/**
* @return presumably the {@code keyedStateSize} given to the constructor — TODO confirm
*/
public int getKeyedStateSize();
/**
* @return presumably the {@code operatorStateSize} given to the constructor — TODO confirm
*/
public int getOperatorStateSize();
/**
* @return presumably the {@code checkpointInterval} given to the constructor — TODO confirm
*/
public long getCheckpointInterval();
}
/**
* Result of a state snapshot comparison.
*
* <p>Members are listed as signatures without bodies.
* NOTE(review): the {@code StateDifference} element type is not declared in
* this listing — confirm where it is defined.
*/
public class StateComparisonResult {
/**
* Checks whether the compared state snapshots are identical.
*
* @return boolean indicating state identity
*/
public boolean isIdentical();
/**
* Gets the differences between the compared state snapshots.
*
* @return list of StateDifference objects
*/
public List<StateDifference> getDifferences();
/**
* Gets a summary of the comparison results.
*
* @return String summary of the state comparison
*/
public String getComparisonSummary();
}Usage Examples:
import org.apache.flink.test.state.operator.restore.*;
// Basic keyed state restore test
/**
 * Example: restoring keyed state with String keys and Integer values.
 *
 * NOTE(review): getTestState() and createModifiedKeyedStateJob() are not part of
 * the API listed for the base classes - presumably helpers the test author supplies.
 */
public class KeyedStateRestoreTest extends AbstractKeyedOperatorRestoreTestBase {

    /** Seeds three keyed entries, builds the keyed job, then runs the keyed restore test. */
    @Test
    public void testSimpleKeyedStateRestore() throws Exception {
        // Seed state: three String keys mapped to Integer values
        Map<String, Integer> seedState = new HashMap<>();
        seedState.put("key1", 100);
        seedState.put("key2", 200);
        seedState.put("key3", 300);

        // Build the keyed-state job from the seed state
        JobGraph keyedJob = createKeyedStateJob(seedState);

        // Run the restore scenario with serializers matching the key/value types
        testKeyedStateRestore(StringSerializer.INSTANCE, IntSerializer.INSTANCE);
    }

    /** Takes a savepoint of the original job, then restores a modified job from it. */
    @Override
    protected void testRestore() throws Exception {
        // Savepoint of the original job
        JobGraph originalJob = createKeyedStateJob(getTestState());
        String savepointPath = createSavepoint(originalJob);

        // Restore the modified job from that savepoint and validate
        restoreFromSavepoint(createModifiedKeyedStateJob(), savepointPath);
    }
}
// Non-keyed state restore test
/**
 * Example: one test per kind of non-keyed operator state (list, union list, broadcast).
 *
 * NOTE(review): this concrete class does not override the abstract testRestore()
 * declared on AbstractOperatorRestoreTestBase - confirm whether the intermediate
 * base class provides it.
 */
public class OperatorStateRestoreTest extends AbstractNonKeyedOperatorRestoreTestBase {

    /** Runs the list-state restore test with three String items. */
    @Test
    public void testListStateRestore() throws Exception {
        testListStateRestore(Arrays.asList("item1", "item2", "item3"));
    }

    /** Runs the union-list-state restore test with the Integers 1 through 5. */
    @Test
    public void testUnionListStateRestore() throws Exception {
        testUnionListStateRestore(Arrays.asList(1, 2, 3, 4, 5));
    }

    /** Runs the broadcast-state restore test with two String-to-String entries. */
    @Test
    public void testBroadcastStateRestore() throws Exception {
        Map<String, String> broadcastSeed = new HashMap<>();
        broadcastSeed.put("config1", "value1");
        broadcastSeed.put("config2", "value2");
        testBroadcastStateRestore(broadcastSeed);
    }
}
// State backend switching test
/**
 * Example: driving a job through the memory -> filesystem -> RocksDB -> memory
 * backend sequence, and building switch specifications via BackendSwitchSpecs.
 *
 * NOTE(review): configureStateBackend(...) and createSavepoint(...) are listed on
 * AbstractOperatorRestoreTestBase, not on SavepointStateBackendSwitchTestBase, and
 * executeBackendSwitchTest(...) is not listed at all - confirm these are reachable
 * from this class.
 */
public class StateBackendSwitchTest extends SavepointStateBackendSwitchTestBase {

    /** JUnit entry point that runs the full switching scenario. */
    @Test
    public void testCompleteBackendSwitching() throws Exception {
        testSwitchingStateBackend();
    }

    /**
     * Takes a savepoint under the memory backend, then runs each backend transition in turn.
     *
     * @throws Exception if any backend switch fails
     */
    @Override
    protected void testSwitchingStateBackend() throws Exception {
        // Start with memory backend
        configureStateBackend(new MemoryStateBackend());

        // Stateful job: 1000 keyed entries, operator state size 500, 5000 ms checkpoint interval
        JobGraph job = StateRestoreTestUtils.createStatefulTestJob(
                new StateConfiguration(1000, 500, 5000L));

        // createSavepoint returns the savepoint path; the later steps in this
        // example do not consume it.
        String memorySavepointPath = createSavepoint(job);

        // Switch to filesystem backend
        testMemoryToFilesystemSwitch("/tmp/flink-state");

        // Switch to RocksDB backend
        testFilesystemToRocksDbSwitch("/tmp/rocksdb-state");

        // Switch back to memory backend
        testRocksDbToMemorySwitch();
    }

    /**
     * Builds both transition specs, combines them, and runs the combined switching test.
     *
     * @throws Exception if the switching test fails
     */
    @Test
    public void testBackendSwitchWithComplexState() throws Exception {
        // The spec classes are nested in BackendSwitchSpecs; a package wildcard
        // import does not cover nested types, so they are referenced qualified.
        // Asynchronous snapshots enabled (second argument).
        BackendSwitchSpecs.MemoryToFilesystemSpec memToFs =
                new BackendSwitchSpecs.MemoryToFilesystemSpec("/tmp/fs-state", true);
        // Incremental checkpoints enabled (second argument).
        BackendSwitchSpecs.FilesystemToRocksDbSpec fsToRocks =
                new BackendSwitchSpecs.FilesystemToRocksDbSpec("/tmp/rocks-state", true);

        BackendSwitchSpecs.CompleteSwitchSpec switchSpec =
                BackendSwitchSpecs.createCompleteSwitchSpec(memToFs, fsToRocks);

        // Execute complete switching test
        executeBackendSwitchTest(switchSpec);
    }
}
// Comprehensive state restore validation
/**
 * Example: comparing consecutive savepoints taken from one stateful job.
 *
 * NOTE(review): loadStateSnapshot(...) is not part of the listed API - presumably
 * a helper the test author supplies.
 */
public class StateRestoreValidationTest {

    /** Creates a series of savepoints and asserts that each consecutive pair differs. */
    @Test
    public void testStateConsistencyAcrossRestores() throws Exception {
        // 5000 keyed entries, operator state size 1000, 2000 ms checkpoint interval
        StateConfiguration jobConfig = new StateConfiguration(5000, 1000, 2000L);
        JobGraph job = StateRestoreTestUtils.createStatefulTestJob(jobConfig);

        // Create multiple savepoints
        List<String> savepoints =
                StateRestoreTestUtils.executeAndCreateSavepoints(job, 10000L);

        // Compare every savepoint with its immediate predecessor
        for (int current = 1; current < savepoints.size(); current++) {
            StateSnapshot earlier = loadStateSnapshot(savepoints.get(current - 1));
            StateSnapshot later = loadStateSnapshot(savepoints.get(current));
            StateComparisonResult comparison =
                    StateRestoreTestUtils.compareStateSnapshots(earlier, later);

            // State is expected to have progressed between savepoints
            assertTrue(!comparison.getDifferences().isEmpty());
            assertFalse(comparison.isIdentical());
        }
    }
}Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-tests