0
# State Migration Testing
1
2
Comprehensive infrastructure for testing operator state migration and compatibility across Flink versions. This includes utilities for savepoint creation, restoration, and verification, as well as specialized source and sink implementations for migration testing scenarios.
3
4
## Capabilities
5
6
### MigrationTestUtils
7
8
Utility class containing specialized sources, sinks, and helper components for migration testing.
9
10
```java { .api }
11
/**
12
* Utility class containing common functions and classes for migration tests
13
* Provides standardized components for testing state migration scenarios
14
*/
15
public class MigrationTestUtils {
16
17
/**
18
* Non-parallel source with list state for migration testing
19
* Creates checkpointed state with predefined string values
20
*/
21
public static class CheckpointingNonParallelSourceWithListState
22
implements SourceFunction<Tuple2<Long, Long>>, CheckpointedFunction {
23
24
static final ListStateDescriptor<String> STATE_DESCRIPTOR =
25
new ListStateDescriptor<>("source-state", StringSerializer.INSTANCE);
26
27
static final String CHECKPOINTED_STRING = "Here be dragons!";
28
static final String CHECKPOINTED_STRING_1 = "Here be more dragons!";
29
static final String CHECKPOINTED_STRING_2 = "Here be yet more dragons!";
30
static final String CHECKPOINTED_STRING_3 = "Here be the mostest dragons!";
31
32
/**
33
* Create source with specified number of elements
34
* @param numElements Number of elements to emit
35
*/
36
public CheckpointingNonParallelSourceWithListState(int numElements);
37
38
@Override
39
public void snapshotState(FunctionSnapshotContext context) throws Exception;
40
41
@Override
42
public void initializeState(FunctionInitializationContext context) throws Exception;
43
44
@Override
45
public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception;
46
47
@Override
48
public void cancel();
49
}
50
51
/**
52
* Source for verifying restored state from CheckpointingNonParallelSourceWithListState
53
* Validates that list state was properly restored with expected values
54
*/
55
public static class CheckingNonParallelSourceWithListState
56
extends RichSourceFunction<Tuple2<Long, Long>> implements CheckpointedFunction {
57
58
static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR =
59
CheckingNonParallelSourceWithListState.class + "_RESTORE_CHECK";
60
61
/**
62
* Create checking source with specified number of elements
63
* @param numElements Number of elements to emit after verification
64
*/
65
public CheckingNonParallelSourceWithListState(int numElements);
66
67
@Override
68
public void initializeState(FunctionInitializationContext context) throws Exception;
69
70
@Override
71
public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception;
72
}
73
74
/**
75
* Parallel source with union list state for migration testing
76
* Distributes state across parallel instances using hash-based distribution
77
*/
78
public static class CheckpointingParallelSourceWithUnionListState
79
extends RichSourceFunction<Tuple2<Long, Long>> implements CheckpointedFunction {
80
81
static final ListStateDescriptor<String> STATE_DESCRIPTOR =
82
new ListStateDescriptor<>("source-state", StringSerializer.INSTANCE);
83
84
static final String[] CHECKPOINTED_STRINGS = {
85
"Here be dragons!",
86
"Here be more dragons!",
87
"Here be yet more dragons!",
88
"Here be the mostest dragons!" };
89
90
/**
91
* Create parallel source with specified number of elements
92
* @param numElements Number of elements to emit per parallel instance
93
*/
94
public CheckpointingParallelSourceWithUnionListState(int numElements);
95
96
@Override
97
public void snapshotState(FunctionSnapshotContext context) throws Exception;
98
99
@Override
100
public void initializeState(FunctionInitializationContext context) throws Exception;
101
}
102
103
/**
104
* Source for verifying restored union list state
105
* Validates that union list state contains all expected values from all parallel instances
106
*/
107
public static class CheckingParallelSourceWithUnionListState
108
extends RichParallelSourceFunction<Tuple2<Long, Long>> implements CheckpointedFunction {
109
110
static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR =
111
CheckingParallelSourceWithUnionListState.class + "_RESTORE_CHECK";
112
113
/**
114
* Create checking parallel source
115
* @param numElements Number of elements to emit after verification
116
*/
117
public CheckingParallelSourceWithUnionListState(int numElements);
118
119
@Override
120
public void initializeState(FunctionInitializationContext context) throws Exception;
121
}
122
123
/**
124
* Sink that counts elements using accumulators for coordination
125
* Used to signal completion of migration test phases
126
*/
127
public static class AccumulatorCountingSink<T> extends RichSinkFunction<T> {
128
129
static final String NUM_ELEMENTS_ACCUMULATOR =
130
AccumulatorCountingSink.class + "_NUM_ELEMENTS";
131
132
/**
133
* Open sink and initialize accumulator
134
*/
135
@Override
136
public void open(Configuration parameters) throws Exception;
137
138
/**
139
* Count element and update accumulator
140
* @param value Element to count
141
* @param context Sink context
142
*/
143
@Override
144
public void invoke(T value, Context context) throws Exception;
145
}
146
}
147
```
148
149
**Usage Example:**
150
151
```java
152
public class MyMigrationTest extends SavepointMigrationTestBase {
153
154
@Test
155
public void testListStateMigration() throws Exception {
156
final int NUM_ELEMENTS = 100;
157
158
// Phase 1: Create savepoint with list state
159
StreamExecutionEnvironment env1 = StreamExecutionEnvironment.getExecutionEnvironment();
160
env1.setParallelism(DEFAULT_PARALLELISM);
161
162
env1.addSource(new MigrationTestUtils.CheckpointingNonParallelSourceWithListState(NUM_ELEMENTS))
163
.addSink(new MigrationTestUtils.AccumulatorCountingSink<>());
164
165
executeAndSavepoint(env1, "migration-savepoint",
166
Tuple2.of(MigrationTestUtils.AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_ELEMENTS));
167
168
// Phase 2: Restore and verify state
169
StreamExecutionEnvironment env2 = StreamExecutionEnvironment.getExecutionEnvironment();
170
env2.setParallelism(DEFAULT_PARALLELISM);
171
172
env2.addSource(new MigrationTestUtils.CheckingNonParallelSourceWithListState(NUM_ELEMENTS))
173
.addSink(new MigrationTestUtils.AccumulatorCountingSink<>());
174
175
restoreAndExecute(env2, "migration-savepoint",
176
Tuple2.of(MigrationTestUtils.CheckingNonParallelSourceWithListState.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, 1),
177
Tuple2.of(MigrationTestUtils.AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_ELEMENTS));
178
}
179
}
180
```
181
182
### AbstractOperatorRestoreTestBase
183
184
Foundation class for testing operator state restoration with standardized savepoint handling and job creation patterns.
185
186
```java { .api }
187
/**
188
* Base class for testing operator state migration and restoration across Flink versions
189
* Provides standardized infrastructure for savepoint-based testing
190
*/
191
public abstract class AbstractOperatorRestoreTestBase {
192
193
/**
194
* Create job that generates migration savepoint
195
* Implementation should create a job that produces state to be migrated
196
* @param env StreamExecutionEnvironment for job creation
197
* @return JobGraph for the migration job
198
* @throws Exception if job creation fails
199
*/
200
public abstract JobGraph createMigrationJob(StreamExecutionEnvironment env) throws Exception;
201
202
/**
203
* Create job that restores from migration savepoint and verifies state
204
* Implementation should create a job that validates migrated state
205
* @param env StreamExecutionEnvironment for job creation
206
* @return JobGraph for the restoration job
207
* @throws Exception if job creation fails
208
*/
209
public abstract JobGraph createRestoredJob(StreamExecutionEnvironment env) throws Exception;
210
211
/**
212
* Get name of savepoint resource for this test
213
* Should return the filename of the savepoint resource in test resources
214
* @return Resource name for the savepoint
215
*/
216
public abstract String getMigrationSavepointName();
217
218
/**
219
* Execute the complete migration test cycle
220
* 1. Create migration job and generate savepoint
221
* 2. Create restored job and validate state
222
*/
223
@Test
224
public void testRestore() throws Exception;
225
}
226
```
227
228
**Usage Example:**
229
230
```java
231
public class MyOperatorRestoreTest extends AbstractOperatorRestoreTestBase {
232
233
@Override
234
public JobGraph createMigrationJob(StreamExecutionEnvironment env) throws Exception {
235
env.addSource(new SourceWithManagedState())
236
.keyBy(x -> x.key)
237
.map(new StatefulMapFunction())
238
.addSink(new DiscardingSink<>());
239
240
return env.getStreamGraph().getJobGraph();
241
}
242
243
@Override
244
public JobGraph createRestoredJob(StreamExecutionEnvironment env) throws Exception {
245
env.addSource(new VerifyingSource())
246
.keyBy(x -> x.key)
247
.map(new StatefulMapFunction()) // Same function, should restore state
248
.addSink(new ValidatingSink<>());
249
250
return env.getStreamGraph().getJobGraph();
251
}
252
253
@Override
254
public String getMigrationSavepointName() {
255
return "my-operator-flink1.4-savepoint";
256
}
257
}
258
```
259
260
### AbstractKeyedOperatorRestoreTestBase
261
262
Specialized base class for testing the restoration of operators that use keyed state.
263
264
```java { .api }
265
/**
266
* Specialized base class for testing keyed operator state restoration
267
* Provides additional infrastructure for keyed state scenarios
268
*/
269
public abstract class AbstractKeyedOperatorRestoreTestBase extends AbstractOperatorRestoreTestBase {
270
271
/**
272
* Key selector for keyed state tests; abstract, so each subclass must supply its own
273
* @return Function to extract keys from test data
274
*/
275
protected abstract KeySelector<?, ?> getKeySelector();
276
277
/**
278
* Create keyed stream for migration testing
279
* @param env StreamExecutionEnvironment
280
* @return Configured DataStream with keying applied
281
*/
282
protected abstract DataStream<?> createKeyedStream(StreamExecutionEnvironment env);
283
}
284
```
285
286
### AbstractNonKeyedOperatorRestoreTestBase
287
288
Specialized base class for testing non-keyed (operator) state restoration.
289
290
```java { .api }
291
/**
292
* Specialized base class for testing non-keyed operator state restoration
293
* Handles operator state scenarios without keying requirements
294
*/
295
public abstract class AbstractNonKeyedOperatorRestoreTestBase extends AbstractOperatorRestoreTestBase {
296
297
/**
298
* Create non-keyed stream for migration testing
299
* @param env StreamExecutionEnvironment
300
* @return Configured DataStream without keying
301
*/
302
protected abstract DataStream<?> createNonKeyedStream(StreamExecutionEnvironment env);
303
}
304
```
305
306
## Migration Testing Patterns
307
308
### List State Migration Pattern
309
310
Testing migration of list state across Flink versions:
311
312
```java
313
// Step 1: Create job with list state (older Flink version)
314
public class ListStateMigrationJob {
315
public static void main(String[] args) throws Exception {
316
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
317
env.addSource(new MigrationTestUtils.CheckpointingNonParallelSourceWithListState(1000))
318
.addSink(new MigrationTestUtils.AccumulatorCountingSink<>());
319
env.execute();
320
}
321
}
322
323
// Step 2: Restore and verify (newer Flink version)
324
public class ListStateRestorationJob {
325
public static void main(String[] args) throws Exception {
326
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
327
env.addSource(new MigrationTestUtils.CheckingNonParallelSourceWithListState(1000))
328
.addSink(new MigrationTestUtils.AccumulatorCountingSink<>());
329
env.execute();
330
}
331
}
332
```
333
334
### Union State Migration Pattern
335
336
Testing migration of union list state with parallel instances:
337
338
```java
339
// Create job with union state
340
env.addSource(new MigrationTestUtils.CheckpointingParallelSourceWithUnionListState(1000))
341
.setParallelism(4)
342
.addSink(new MigrationTestUtils.AccumulatorCountingSink<>());
343
344
// Restore and verify union state
345
env.addSource(new MigrationTestUtils.CheckingParallelSourceWithUnionListState(1000))
346
.setParallelism(4)
347
.addSink(new MigrationTestUtils.AccumulatorCountingSink<>());
348
```
349
350
### Accumulator Coordination Pattern
351
352
Using accumulators to coordinate test phases:
353
354
```java
355
// Wait for specific accumulator values before proceeding
356
Tuple2<String, Integer> completionSignal =
357
Tuple2.of(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, expectedCount);
358
359
Tuple2<String, Integer> verificationSignal =
360
Tuple2.of(CheckingNonParallelSourceWithListState.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, 1);
361
362
executeAndSavepoint(env, savepointPath, completionSignal);
363
restoreAndExecute(env, savepointPath, verificationSignal, completionSignal);
364
```
365
366
## State Validation
367
368
### List State Validation
369
370
The migration test utilities automatically validate that list state contains expected values:
371
372
```java
373
// CheckingNonParallelSourceWithListState validates:
374
assertThat(unionListState.get(), containsInAnyOrder(
375
CheckpointingNonParallelSourceWithListState.CHECKPOINTED_STRING,
376
CheckpointingNonParallelSourceWithListState.CHECKPOINTED_STRING_1,
377
CheckpointingNonParallelSourceWithListState.CHECKPOINTED_STRING_2,
378
CheckpointingNonParallelSourceWithListState.CHECKPOINTED_STRING_3
379
));
380
```
381
382
### Union State Validation
383
384
Union state validation ensures all parallel instances contribute their state:
385
386
```java
387
// CheckingParallelSourceWithUnionListState validates:
388
assertThat(unionListState.get(), containsInAnyOrder(
389
CheckpointingParallelSourceWithUnionListState.CHECKPOINTED_STRINGS
390
));
391
```
392
393
### Accumulator-based Verification
394
395
Accumulators provide coordination and verification signals:
396
397
```java
398
// Success accumulator indicates proper state restoration
399
getRuntimeContext().addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter());
400
getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
401
402
// Count accumulator tracks processing progress
403
getRuntimeContext().getAccumulator(NUM_ELEMENTS_ACCUMULATOR).add(1);
404
```