# State Management Testing

Framework for testing operator state restoration and migration, including utilities for both keyed and non-keyed state scenarios. This framework validates that streaming operators can correctly restore their state after failures, checkpoints, and version migrations.

## Capabilities

### Execution Mode Control

Enumeration that controls how test functions behave in each test stage.

```java { .api }
/**
 * Selects which stage of a state restore/migration test is being executed.
 */
public enum ExecutionMode {

    /** Produce the initial state and data. */
    GENERATE,

    /** Convert existing state to the new format. */
    MIGRATE,

    /** Start again from the migrated state. */
    RESTORE
}
```

**Usage Example:**

```java
// Control test behavior based on execution mode.
// Fail with a usage hint instead of an ArrayIndexOutOfBoundsException when the
// mode argument is missing.
if (args.length == 0) {
    throw new IllegalArgumentException(
        "Missing execution mode argument; expected one of "
            + java.util.Arrays.toString(ExecutionMode.values()));
}

// valueOf rejects unknown names with an IllegalArgumentException.
ExecutionMode mode = ExecutionMode.valueOf(args[0]);

switch (mode) {
    case GENERATE:
        // Generate initial state and create savepoint
        runStateGenerationJob();
        break;
    case MIGRATE:
        // Migrate state format if needed
        runStateMigrationJob();
        break;
    case RESTORE:
        // Restore from savepoint and verify
        runStateRestorationJob();
        break;
    default:
        // Guards against enum constants that are added later without a handler.
        throw new IllegalStateException("Unhandled execution mode: " + mode);
}
```

### Operator Restore Test Base Classes

Abstract base classes for testing operator state restoration scenarios.

```java { .api }
/**
54
* Abstract base class for testing operator state restoration
55
*/
56
public abstract class AbstractOperatorRestoreTestBase {
57
58
/**
59
* Create the streaming topology for state testing
60
* @param env StreamExecutionEnvironment to configure
61
* @param mode ExecutionMode determining test behavior
62
*/
63
protected abstract void createRestorationTopology(
64
StreamExecutionEnvironment env,
65
ExecutionMode mode);
66
67
/**
68
* Verify state restoration was successful
69
* @param mode ExecutionMode that was executed
70
* @throws Exception if verification fails
71
*/
72
protected abstract void verifyRestorationResult(ExecutionMode mode) throws Exception;
73
}
```

### Keyed Operator Restore Testing

Framework for testing keyed operator state restoration.

```java { .api }
/**
82
* Abstract base class for testing keyed operator state restoration
83
*/
84
public abstract class AbstractKeyedOperatorRestoreTestBase
85
extends AbstractOperatorRestoreTestBase {
86
87
/**
88
* Create stateful keyed operators for testing state restoration
89
* @param env StreamExecutionEnvironment to configure
90
* @param mode ExecutionMode determining behavior
91
*/
92
protected abstract void createKeyedStateTopology(
93
StreamExecutionEnvironment env,
94
ExecutionMode mode);
95
96
/**
97
* Verify keyed state was correctly restored
98
* @param expectedStateValues Expected state values after restoration
99
* @throws Exception if verification fails
100
*/
101
protected void verifyKeyedState(Map<String, Object> expectedStateValues) throws Exception;
102
}
/**
105
* Standalone job for testing keyed state migration
106
*/
107
public class KeyedJob {
108
109
/**
110
* Main entry point for keyed state migration testing
111
* @param args Command line arguments: [mode] [savepointPath] [checkpointDir]
112
* @throws Exception if job execution fails
113
*/
114
public static void main(String[] args) throws Exception;
115
}
```

### Non-Keyed Operator Restore Testing

Framework for testing non-keyed operator state restoration.

```java { .api }
/**
124
* Abstract base class for testing non-keyed operator state restoration
125
*/
126
public abstract class AbstractNonKeyedOperatorRestoreTestBase
127
extends AbstractOperatorRestoreTestBase {
128
129
/**
130
* Create stateful non-keyed operators for testing state restoration
131
* @param env StreamExecutionEnvironment to configure
132
* @param mode ExecutionMode determining behavior
133
*/
134
protected abstract void createNonKeyedStateTopology(
135
StreamExecutionEnvironment env,
136
ExecutionMode mode);
137
138
/**
139
* Verify non-keyed state was correctly restored
140
* @param expectedGlobalState Expected global state after restoration
141
* @throws Exception if verification fails
142
*/
143
protected void verifyNonKeyedState(Object expectedGlobalState) throws Exception;
144
}
/**
147
* Standalone job for testing non-keyed state migration
148
*/
149
public class NonKeyedJob {
150
151
/**
152
* Main entry point for non-keyed state migration testing
153
* @param args Command line arguments: [mode] [savepointPath] [checkpointDir]
154
* @throws Exception if job execution fails
155
*/
156
public static void main(String[] args) throws Exception;
157
}
```

### State Management Test Patterns

Common patterns for implementing state management tests:

**Basic Keyed State Test:**

```java
public class KeyedStateRestorationTest extends AbstractKeyedOperatorRestoreTestBase {
168
169
private TestListResultSink<Tuple2<String, Integer>> resultSink;
170
171
@Override
172
protected void createKeyedStateTopology(StreamExecutionEnvironment env, ExecutionMode mode) {
173
env.setParallelism(2);
174
env.enableCheckpointing(100);
175
176
resultSink = new TestListResultSink<>();
177
178
DataStream<String> input;
179
180
if (mode == ExecutionMode.GENERATE) {
181
// Generate test data and state
182
input = env.fromElements("key1", "key1", "key2", "key2", "key1");
183
} else {
184
// Use minimal input for restoration testing
185
input = env.fromElements("key1", "key2");
186
}
187
188
input.keyBy(value -> value)
189
.process(new StatefulKeyedProcessFunction(mode))
190
.addSink(resultSink);
191
}
192
193
@Override
194
protected void verifyRestorationResult(ExecutionMode mode) throws Exception {
195
List<Tuple2<String, Integer>> results = resultSink.getResult();
196
197
if (mode == ExecutionMode.RESTORE) {
198
// Verify state was correctly restored
199
Map<String, Integer> stateMap = results.stream()
200
.collect(Collectors.toMap(t -> t.f0, t -> t.f1));
201
202
assertEquals(3, stateMap.get("key1").intValue()); // Previous count + 1
203
assertEquals(2, stateMap.get("key2").intValue()); // Previous count + 1
204
}
205
}
206
207
@Test
208
public void testKeyedStateRestoration() throws Exception {
209
// Phase 1: Generate state and create savepoint
210
String savepointPath = runTestPhase(ExecutionMode.GENERATE);
211
212
// Phase 2: Restore from savepoint and verify
213
runTestPhase(ExecutionMode.RESTORE, savepointPath);
214
}
215
}
```

**Non-Keyed State Test:**

```java
public class NonKeyedStateRestorationTest extends AbstractNonKeyedOperatorRestoreTestBase {
222
223
private TestListResultSink<Long> resultSink;
224
225
@Override
226
protected void createNonKeyedStateTopology(StreamExecutionEnvironment env, ExecutionMode mode) {
227
env.setParallelism(1); // Non-keyed operators typically use parallelism 1
228
env.enableCheckpointing(50);
229
230
resultSink = new TestListResultSink<>();
231
232
DataStream<Integer> input;
233
234
if (mode == ExecutionMode.GENERATE) {
235
input = env.fromElements(1, 2, 3, 4, 5);
236
} else {
237
input = env.fromElements(6, 7); // Additional elements for restore test
238
}
239
240
input.process(new StatefulNonKeyedProcessFunction(mode))
241
.addSink(resultSink);
242
}
243
244
@Override
245
protected void verifyRestorationResult(ExecutionMode mode) throws Exception {
246
List<Long> results = resultSink.getResult();
247
248
if (mode == ExecutionMode.RESTORE) {
249
// Verify global state was restored (running sum should continue)
250
long finalSum = results.get(results.size() - 1);
251
assertEquals(28, finalSum); // 15 (previous) + 6 + 7 = 28
252
}
253
}
254
}
```

**State Migration Test:**

```java
public class StateMigrationTest extends AbstractKeyedOperatorRestoreTestBase {
261
262
@Override
263
protected void createKeyedStateTopology(StreamExecutionEnvironment env, ExecutionMode mode) {
264
env.setParallelism(4);
265
env.enableCheckpointing(100);
266
267
DataStream<String> source = env.addSource(new TestDataSource(mode));
268
269
if (mode == ExecutionMode.MIGRATE) {
270
// Use evolved state schema
271
source.keyBy(value -> value)
272
.process(new EvolvedStateProcessFunction())
273
.addSink(new DiscardingSink<>());
274
} else {
275
// Use original state schema
276
source.keyBy(value -> value)
277
.process(new OriginalStateProcessFunction())
278
.addSink(new DiscardingSink<>());
279
}
280
}
281
282
@Test
283
public void testStateSchemaMigration() throws Exception {
284
// Generate with original schema
285
String originalSavepoint = runTestPhase(ExecutionMode.GENERATE);
286
287
// Migrate to new schema
288
String migratedSavepoint = runTestPhase(ExecutionMode.MIGRATE, originalSavepoint);
289
290
// Restore with new schema
291
runTestPhase(ExecutionMode.RESTORE, migratedSavepoint);
292
}
293
}
```

**Standalone Job Pattern:**

```java
// Using KeyedJob for state migration testing
300
public class KeyedStateMigrationITCase {
301
302
@Test
303
public void testKeyedStateMigration() throws Exception {
304
String checkpointDir = tempFolder.newFolder("checkpoints").getAbsolutePath();
305
String savepointPath = null;
306
307
// Generate phase
308
String[] generateArgs = {
309
ExecutionMode.GENERATE.toString(),
310
"null", // no input savepoint
311
checkpointDir
312
};
313
314
KeyedJob.main(generateArgs);
315
316
// Find generated savepoint
317
savepointPath = findLatestSavepoint(checkpointDir);
318
assertNotNull("Savepoint should be generated", savepointPath);
319
320
// Restore phase
321
String[] restoreArgs = {
322
ExecutionMode.RESTORE.toString(),
323
savepointPath,
324
checkpointDir
325
};
326
327
KeyedJob.main(restoreArgs);
328
// Test passes if restoration completes without exception
329
}
330
}
```

**Complex State Evolution Test:**

```java
public class ComplexStateEvolutionTest extends AbstractKeyedOperatorRestoreTestBase {
337
338
@Override
339
protected void createKeyedStateTopology(StreamExecutionEnvironment env, ExecutionMode mode) {
340
env.setParallelism(2);
341
env.enableCheckpointing(100);
342
343
DataStream<Tuple2<String, Integer>> source = createTestSource(mode);
344
345
source.keyBy(value -> value.f0)
346
.process(new MultiStateProcessFunction(mode))
347
.addSink(new TestResultSink());
348
}
349
350
/**
351
* Process function with multiple state types for evolution testing
352
*/
353
private static class MultiStateProcessFunction
354
extends KeyedProcessFunction<String, Tuple2<String, Integer>, String> {
355
356
private ValueState<Integer> countState;
357
private ListState<String> historyState;
358
private MapState<String, Long> timestampState;
359
360
@Override
361
public void open(Configuration parameters) throws Exception {
362
// Initialize state descriptors
363
countState = getRuntimeContext().getState(
364
new ValueStateDescriptor<>("count", Integer.class));
365
historyState = getRuntimeContext().getListState(
366
new ListStateDescriptor<>("history", String.class));
367
timestampState = getRuntimeContext().getMapState(
368
new MapStateDescriptor<>("timestamps", String.class, Long.class));
369
}
370
371
@Override
372
public void processElement(
373
Tuple2<String, Integer> value,
374
Context ctx,
375
Collector<String> out) throws Exception {
376
377
// Update multiple state types
378
Integer currentCount = countState.value();
379
countState.update(currentCount == null ? 1 : currentCount + 1);
380
381
historyState.add(value.toString());
382
timestampState.put(value.f0, ctx.timestamp());
383
384
// Emit result
385
out.collect(String.format("Key: %s, Count: %d",
386
value.f0, countState.value()));
387
}
388
}
389
}
```

This state management testing framework ensures that Flink operators correctly maintain and restore their state across various failure and migration scenarios, providing confidence in the reliability of stateful streaming applications.