0
# State Backend and Operator Restore Testing
1
2
Framework for testing state backend switching, operator restore scenarios, and state migration validation. This framework ensures state compatibility across different backends and validates operator restore behavior.
3
4
## Capabilities
5
6
### Abstract Operator Restore Test Base
7
8
Base class providing common functionality for testing operator restore scenarios across different state management configurations.
9
10
```java { .api }
11
/**
12
* Abstract base class for operator restore testing scenarios
13
*/
14
public abstract class AbstractOperatorRestoreTestBase {
15
16
/**
17
* Test operator restore from savepoint or checkpoint
18
* @throws Exception if restore testing fails
19
*/
20
protected abstract void testRestore() throws Exception;
21
22
/**
23
* Create savepoint from running job for restore testing
24
* @param jobGraph job to create savepoint from
25
* @return String path to created savepoint
26
* @throws Exception if savepoint creation fails
27
*/
28
protected String createSavepoint(JobGraph jobGraph) throws Exception;
29
30
/**
31
* Restore job from savepoint and validate state
32
* @param jobGraph job to restore
33
* @param savepointPath path to savepoint
34
* @throws Exception if restore or validation fails
35
*/
36
protected void restoreFromSavepoint(JobGraph jobGraph, String savepointPath) throws Exception;
37
38
/**
39
* Configure state backend for testing
40
* @param backend state backend configuration
41
*/
42
protected void configureStateBackend(StateBackend backend);
43
}
44
```
45
46
### Keyed Operator Restore Testing
47
48
Specialized base class for testing restore scenarios with keyed state operators.
49
50
```java { .api }
51
/**
52
* Base class for testing keyed operator restore scenarios
53
*/
54
public abstract class AbstractKeyedOperatorRestoreTestBase
55
extends AbstractOperatorRestoreTestBase {
56
57
/**
58
* Test keyed state restore with different key serializers
59
* @param keySerializer serializer for key type
60
* @param valueSerializer serializer for value type
61
* @throws Exception if keyed restore test fails
62
*/
63
protected <K, V> void testKeyedStateRestore(
64
TypeSerializer<K> keySerializer,
65
TypeSerializer<V> valueSerializer) throws Exception;
66
67
/**
68
* Validate keyed state after restore
69
* @param expectedState expected state values after restore
70
* @param actualState actual restored state
71
* @return boolean indicating state validity
72
*/
73
protected <K, V> boolean validateKeyedState(
74
Map<K, V> expectedState,
75
Map<K, V> actualState);
76
77
/**
78
* Create keyed state test job with configurable state
79
* @param initialState initial state values
80
* @return JobGraph configured for keyed state testing
81
*/
82
protected <K, V> JobGraph createKeyedStateJob(Map<K, V> initialState);
83
}
84
```
85
86
### Non-Keyed Operator Restore Testing
87
88
Base class for testing restore scenarios with non-keyed (operator) state.
89
90
```java { .api }
91
/**
92
* Base class for testing non-keyed operator restore scenarios
93
*/
94
public abstract class AbstractNonKeyedOperatorRestoreTestBase
95
extends AbstractOperatorRestoreTestBase {
96
97
/**
98
* Test operator state restore with list state
99
* @param initialListState initial list state values
100
* @throws Exception if non-keyed restore test fails
101
*/
102
protected <T> void testListStateRestore(List<T> initialListState) throws Exception;
103
104
/**
105
* Test operator state restore with union list state
106
* @param initialUnionState initial union state values
107
* @throws Exception if union state restore test fails
108
*/
109
protected <T> void testUnionListStateRestore(List<T> initialUnionState) throws Exception;
110
111
/**
112
* Test operator state restore with broadcast state
113
* @param initialBroadcastState initial broadcast state
114
* @throws Exception if broadcast state restore test fails
115
*/
116
protected <K, V> void testBroadcastStateRestore(
117
Map<K, V> initialBroadcastState) throws Exception;
118
119
/**
120
* Validate operator state after restore
121
* @param expectedState expected operator state
122
* @param actualState actual restored operator state
123
* @return boolean indicating state validity
124
*/
125
protected boolean validateOperatorState(
126
OperatorState expectedState,
127
OperatorState actualState);
128
}
129
```
130
131
### State Backend Switch Testing
132
133
Framework for testing state backend switching scenarios and compatibility validation.
134
135
```java { .api }
136
/**
137
* Base class for testing state backend switching scenarios
138
*/
139
public abstract class SavepointStateBackendSwitchTestBase {
140
141
/**
142
* Test switching state backend while preserving state correctness
143
* @throws Exception if state backend switching test fails
144
*/
145
protected abstract void testSwitchingStateBackend() throws Exception;
146
147
/**
148
* Test switch from memory state backend to filesystem
149
* @param filesystemPath path for filesystem state backend
150
* @throws Exception if backend switch fails
151
*/
152
protected void testMemoryToFilesystemSwitch(String filesystemPath) throws Exception;
153
154
/**
155
* Test switch from filesystem state backend to RocksDB
156
* @param rocksDbPath path for RocksDB state backend
157
* @throws Exception if backend switch fails
158
*/
159
protected void testFilesystemToRocksDbSwitch(String rocksDbPath) throws Exception;
160
161
/**
162
* Test switch from RocksDB state backend back to memory
163
* @throws Exception if backend switch fails
164
*/
165
protected void testRocksDbToMemorySwitch() throws Exception;
166
167
/**
168
* Validate state consistency after backend switch
169
* @param originalState state before switch
170
* @param restoredState state after switch
171
* @return boolean indicating state consistency
172
*/
173
protected boolean validateStateConsistency(
174
StateSnapshot originalState,
175
StateSnapshot restoredState);
176
}
177
178
/**
179
* Specifications for state backend switching test scenarios
180
*/
181
public class BackendSwitchSpecs {
182
183
/**
184
* Specification for memory to filesystem switch
185
*/
186
public static class MemoryToFilesystemSpec {
187
/**
188
* Constructor for memory to filesystem switch spec
189
* @param targetPath filesystem path for state storage
190
* @param asyncSnapshot enable asynchronous snapshots
191
*/
192
public MemoryToFilesystemSpec(String targetPath, boolean asyncSnapshot);
193
194
public String getTargetPath();
195
public boolean isAsyncSnapshot();
196
}
197
198
/**
199
* Specification for filesystem to RocksDB switch
200
*/
201
public static class FilesystemToRocksDbSpec {
202
/**
203
* Constructor for filesystem to RocksDB switch spec
204
* @param rocksDbPath RocksDB storage path
205
* @param incrementalCheckpoints enable incremental checkpoints
206
*/
207
public FilesystemToRocksDbSpec(String rocksDbPath, boolean incrementalCheckpoints);
208
209
public String getRocksDbPath();
210
public boolean isIncrementalCheckpoints();
211
}
212
213
/**
214
* Create specification for complete backend switch test
215
* @param memoryToFs memory to filesystem spec
216
* @param fsToRocksDb filesystem to RocksDB spec
217
* @return CompleteSwitchSpec for full backend switching test
218
*/
219
public static CompleteSwitchSpec createCompleteSwitchSpec(
220
MemoryToFilesystemSpec memoryToFs,
221
FilesystemToRocksDbSpec fsToRocksDb);
222
}
223
```
224
225
### State Restore Utilities
226
227
Utility classes for common state restore testing operations and validation.
228
229
```java { .api }
230
/**
231
* Utility class for state restore testing operations
232
*/
233
public class StateRestoreTestUtils {
234
235
/**
236
* Create test job with configurable state for restore testing
237
* @param stateConfig state configuration parameters
238
* @return JobGraph configured for state restore testing
239
*/
240
public static JobGraph createStatefulTestJob(StateConfiguration stateConfig);
241
242
/**
243
* Execute job and create savepoint at specified interval
244
* @param jobGraph job to execute
245
* @param savepointIntervalMs interval between savepoints
246
* @return List of savepoint paths created
247
* @throws Exception if execution or savepoint creation fails
248
*/
249
public static List<String> executeAndCreateSavepoints(
250
JobGraph jobGraph,
251
long savepointIntervalMs) throws Exception;
252
253
/**
254
* Compare state snapshots for consistency validation
255
* @param snapshot1 first state snapshot
256
* @param snapshot2 second state snapshot
257
* @return StateComparisonResult containing comparison details
258
*/
259
public static StateComparisonResult compareStateSnapshots(
260
StateSnapshot snapshot1,
261
StateSnapshot snapshot2);
262
263
/**
264
* Extract state from running job for validation
265
* @param jobId identifier of running job
266
* @return StateSnapshot containing current job state
267
* @throws Exception if state extraction fails
268
*/
269
public static StateSnapshot extractJobState(JobID jobId) throws Exception;
270
}
271
272
/**
273
* Configuration for stateful test jobs
274
*/
275
public class StateConfiguration {
276
277
/**
278
* Constructor for state configuration
279
* @param keyedStateSize number of keyed state entries
280
* @param operatorStateSize size of operator state
281
* @param checkpointInterval checkpoint interval in milliseconds
282
*/
283
public StateConfiguration(
284
int keyedStateSize,
285
int operatorStateSize,
286
long checkpointInterval);
287
288
public int getKeyedStateSize();
289
public int getOperatorStateSize();
290
public long getCheckpointInterval();
291
}
292
293
/**
294
* Result of state snapshot comparison
295
*/
296
public class StateComparisonResult {
297
298
/**
299
* Check if state snapshots are identical
300
* @return boolean indicating state identity
301
*/
302
public boolean isIdentical();
303
304
/**
305
* Get differences between state snapshots
306
* @return List of StateDifference objects
307
*/
308
public List<StateDifference> getDifferences();
309
310
/**
311
* Get summary of comparison results
312
* @return String summary of state comparison
313
*/
314
public String getComparisonSummary();
315
}
316
```
317
318
**Usage Examples:**
319
320
```java
321
import org.apache.flink.test.state.operator.restore.*;
322
323
// Basic keyed state restore test
324
public class KeyedStateRestoreTest extends AbstractKeyedOperatorRestoreTestBase {
325
326
@Test
327
public void testSimpleKeyedStateRestore() throws Exception {
328
// Create initial state
329
Map<String, Integer> initialState = new HashMap<>();
330
initialState.put("key1", 100);
331
initialState.put("key2", 200);
332
initialState.put("key3", 300);
333
334
// Create job with keyed state
335
JobGraph job = createKeyedStateJob(initialState);
336
337
// Test restore scenario
338
testKeyedStateRestore(
339
StringSerializer.INSTANCE,
340
IntSerializer.INSTANCE);
341
}
342
343
@Override
344
protected void testRestore() throws Exception {
345
// Create savepoint
346
String savepointPath = createSavepoint(createKeyedStateJob(getTestState()));
347
348
// Modify job configuration
349
JobGraph modifiedJob = createModifiedKeyedStateJob();
350
351
// Restore and validate
352
restoreFromSavepoint(modifiedJob, savepointPath);
353
}
354
}
355
356
// Non-keyed state restore test
357
public class OperatorStateRestoreTest extends AbstractNonKeyedOperatorRestoreTestBase {
358
359
@Test
360
public void testListStateRestore() throws Exception {
361
List<String> initialListState = Arrays.asList("item1", "item2", "item3");
362
testListStateRestore(initialListState);
363
}
364
365
@Test
366
public void testUnionListStateRestore() throws Exception {
367
List<Integer> initialUnionState = Arrays.asList(1, 2, 3, 4, 5);
368
testUnionListStateRestore(initialUnionState);
369
}
370
371
@Test
372
public void testBroadcastStateRestore() throws Exception {
373
Map<String, String> initialBroadcastState = new HashMap<>();
374
initialBroadcastState.put("config1", "value1");
375
initialBroadcastState.put("config2", "value2");
376
377
testBroadcastStateRestore(initialBroadcastState);
378
}
379
}
380
381
// State backend switching test
382
public class StateBackendSwitchTest extends SavepointStateBackendSwitchTestBase {
383
384
@Test
385
public void testCompleteBackendSwitching() throws Exception {
386
testSwitchingStateBackend();
387
}
388
389
@Override
390
protected void testSwitchingStateBackend() throws Exception {
391
// Start with memory backend
392
configureStateBackend(new MemoryStateBackend());
393
394
// Create job and run briefly
395
JobGraph job = StateRestoreTestUtils.createStatefulTestJob(
396
new StateConfiguration(1000, 500, 5000L));
397
String memoryState = createSavepoint(job);
398
399
// Switch to filesystem backend
400
testMemoryToFilesystemSwitch("/tmp/flink-state");
401
402
// Switch to RocksDB backend
403
testFilesystemToRocksDbSwitch("/tmp/rocksdb-state");
404
405
// Switch back to memory backend
406
testRocksDbToMemorySwitch();
407
}
408
409
@Test
410
public void testBackendSwitchWithComplexState() throws Exception {
411
// Create specifications for backend switching
412
MemoryToFilesystemSpec memToFs = new BackendSwitchSpecs.MemoryToFilesystemSpec(
413
"/tmp/fs-state", true);
414
FilesystemToRocksDbSpec fsToRocks = new BackendSwitchSpecs.FilesystemToRocksDbSpec(
415
"/tmp/rocks-state", true);
416
417
BackendSwitchSpecs.CompleteSwitchSpec switchSpec =
418
BackendSwitchSpecs.createCompleteSwitchSpec(memToFs, fsToRocks);
419
420
// Execute complete switching test
421
executeBackendSwitchTest(switchSpec);
422
}
423
}
424
425
// Comprehensive state restore validation
426
public class StateRestoreValidationTest {
427
428
@Test
429
public void testStateConsistencyAcrossRestores() throws Exception {
430
StateConfiguration config = new StateConfiguration(5000, 1000, 2000L);
431
JobGraph job = StateRestoreTestUtils.createStatefulTestJob(config);
432
433
// Create multiple savepoints
434
List<String> savepoints = StateRestoreTestUtils.executeAndCreateSavepoints(
435
job, 10000L);
436
437
// Validate state consistency across savepoints
438
for (int i = 0; i < savepoints.size() - 1; i++) {
439
StateSnapshot snapshot1 = loadStateSnapshot(savepoints.get(i));
440
StateSnapshot snapshot2 = loadStateSnapshot(savepoints.get(i + 1));
441
442
StateComparisonResult comparison =
443
StateRestoreTestUtils.compareStateSnapshots(snapshot1, snapshot2);
444
445
// Validate progressive state changes
446
assertTrue(comparison.getDifferences().size() > 0);
447
assertFalse(comparison.isIdentical());
448
}
449
}
450
}
451
```