# Migration Testing Framework

Complete framework for testing savepoint and checkpoint migration across different Flink versions. This framework provides infrastructure for validating that streaming applications can successfully restore from savepoints created with previous Flink versions, ensuring backward compatibility and state migration correctness.

## Capabilities

### Savepoint Migration Test Base

Abstract base class providing core functionality for savepoint migration testing.

```java { .api }
/**
 * Base class for savepoint migration tests providing utilities for creating,
 * restoring, and validating savepoints across different Flink versions.
 */
public abstract class SavepointMigrationTestBase extends TestBaseUtils {

    /**
     * Get the full path to a test resource file.
     *
     * @param filename Resource filename relative to test resources
     * @return Full path to the resource file
     */
    protected String getResourceFilename(String filename);

    /**
     * Execute a streaming job and create a savepoint at the specified path.
     *
     * @param env StreamExecutionEnvironment configured for the test
     * @param savepointPath Path where savepoint should be created
     * @param expectedAccumulators Expected accumulator values for verification
     * @throws Exception if job execution or savepoint creation fails
     */
    protected void executeAndSavepoint(
            StreamExecutionEnvironment env,
            String savepointPath,
            Tuple2<String, Integer>... expectedAccumulators) throws Exception;

    /**
     * Restore a streaming job from savepoint and execute to completion.
     *
     * @param env StreamExecutionEnvironment configured for the test
     * @param savepointPath Path to existing savepoint
     * @param expectedAccumulators Expected accumulator values for verification
     * @throws Exception if restoration or execution fails
     */
    protected void restoreAndExecute(
            StreamExecutionEnvironment env,
            String savepointPath,
            Tuple2<String, Integer>... expectedAccumulators) throws Exception;
}
```

**Usage Example:**

```java
public class MyMigrationTest extends SavepointMigrationTestBase {

    @Test
    public void testMigrationFromFlink17() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        // Configure your streaming topology
        env.addSource(new TestSource())
                .keyBy(value -> value.getKey())
                .map(new StatefulMapper())
                .addSink(new TestSink());

        // Restore from Flink 1.7 savepoint and verify
        String savepointPath = getResourceFilename("migration-test-flink1.7-savepoint");
        restoreAndExecute(env, savepointPath,
                Tuple2.of("elements-count", 1000),
                Tuple2.of("checkpoints-count", 10));
    }
}
```

### Migration Test Utilities

Utility classes and sources/sinks specifically designed for migration testing.

```java { .api }
/**
 * Utility class containing specialized sources and sinks for migration testing.
 */
public class MigrationTestUtils {

    /**
     * Source with list state for checkpointing tests (non-parallel).
     */
    public static class CheckpointingNonParallelSourceWithListState
            extends RichSourceFunction<Tuple2<Long, Long>>
            implements ListCheckpointed<Long> {

        public CheckpointingNonParallelSourceWithListState(int numElements);

        public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception;
        public void cancel();
        public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception;
        public void restoreState(List<Long> state) throws Exception;
    }

    /**
     * Source for verifying restored state (non-parallel).
     */
    public static class CheckingNonParallelSourceWithListState
            extends RichSourceFunction<Tuple2<Long, Long>>
            implements ListCheckpointed<Long> {

        public CheckingNonParallelSourceWithListState(int numElements);

        public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception;
        public void cancel();
        public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception;
        public void restoreState(List<Long> state) throws Exception;
    }

    /**
     * Parallel source with union list state for checkpointing tests.
     */
    public static class CheckpointingParallelSourceWithUnionListState
            extends RichParallelSourceFunction<Tuple2<Long, Long>>
            implements ListCheckpointed<Long> {

        public CheckpointingParallelSourceWithUnionListState(int numElements);

        public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception;
        public void cancel();
        public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception;
        public void restoreState(List<Long> state) throws Exception;
    }

    /**
     * Parallel source for verifying union list state restoration.
     */
    public static class CheckingParallelSourceWithUnionListState
            extends RichParallelSourceFunction<Tuple2<Long, Long>>
            implements ListCheckpointed<Long> {

        public CheckingParallelSourceWithUnionListState(int numElements);

        public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception;
        public void cancel();
        public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception;
        public void restoreState(List<Long> state) throws Exception;
    }

    /**
     * Sink that counts elements using an accumulator.
     */
    public static class AccumulatorCountingSink<T> extends RichSinkFunction<T> {

        public AccumulatorCountingSink(String accumulatorName);

        public void open(Configuration parameters) throws Exception;
        public void invoke(T value, Context context) throws Exception;
    }
}
```

### Failing Source for Fault Tolerance Testing

Specialized source that can introduce controlled failures for fault tolerance and migration testing.

```java { .api }
/**
 * Source that can introduce artificial failures for fault tolerance testing.
 */
public class FailingSource extends RichSourceFunction<Tuple2<Long, IntType>> {

    /**
     * Functional interface for event emission strategies.
     */
    @FunctionalInterface
    public interface EventEmittingGenerator {
        void emitEvent(SourceContext<Tuple2<Long, IntType>> ctx, int eventSequenceNo);
    }

    /**
     * Constructor for basic failing source.
     *
     * @param eventGenerator Generator function for creating events
     * @param numEvents Total number of events to generate
     * @param numElementsUntilFailure Number of elements before inducing failure
     * @param numSuccessfulCheckpoints Number of successful checkpoints before failure
     */
    public FailingSource(
            EventEmittingGenerator eventGenerator,
            int numEvents,
            int numElementsUntilFailure,
            int numSuccessfulCheckpoints);

    /**
     * Constructor with failure position control.
     *
     * @param eventGenerator Generator function for creating events
     * @param numEvents Total number of events to generate
     * @param failurePos Position at which to induce failure
     * @param numSuccessfulCheckpoints Number of successful checkpoints before failure
     * @param continueAfterFailure Whether to continue after failure
     */
    public FailingSource(
            EventEmittingGenerator eventGenerator,
            int numEvents,
            int failurePos,
            int numSuccessfulCheckpoints,
            boolean continueAfterFailure);

    public void run(SourceContext<Tuple2<Long, IntType>> ctx) throws Exception;
    public void cancel();
}
```

### Supporting Types

Supporting data types and utilities for migration testing.

```java { .api }
/**
 * Simple integer wrapper for testing.
 */
public class IntType {
    public int value;

    public IntType();
    public IntType(int value);

    public boolean equals(Object obj);
    public int hashCode();
    public String toString();
}

/**
 * Sink for result validation in migration tests.
 */
public class ValidatingSink<T> extends RichSinkFunction<T> {

    public ValidatingSink(List<T> expectedValues);

    public void open(Configuration parameters) throws Exception;
    public void invoke(T value, Context context) throws Exception;
    public void close() throws Exception;
}
```

### Migration Testing Patterns

Common patterns for implementing migration tests:

**Basic Migration Test Pattern:**

```java
public class StatefulJobMigrationTest extends SavepointMigrationTestBase {

    @Test
    public void testMigrationFromVersion14() throws Exception {
        // 1. Setup environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStateBackend(new RocksDBStateBackend("file:///tmp/test-backend"));
        env.enableCheckpointing(100);

        // 2. Create topology with stateful operators
        DataStream<Tuple2<Long, IntType>> source = env.addSource(
                new MigrationTestUtils.CheckingNonParallelSourceWithListState(100));

        source.keyBy(value -> value.f0)
                .map(new StatefulMapFunction())
                .addSink(new MigrationTestUtils.AccumulatorCountingSink<>("count"));

        // 3. Restore from savepoint and execute
        String savepointPath = getResourceFilename("stateful-job-flink1.4-savepoint");
        restoreAndExecute(env, savepointPath, Tuple2.of("count", 100));
    }
}
```

**Fault Tolerance Migration Test:**

```java
@Test
public void testFaultToleranceMigration() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(4);
    env.enableCheckpointing(50);

    // Source that will fail and recover
    FailingSource source = new FailingSource(
            (ctx, eventSeq) -> ctx.collect(Tuple2.of((long) eventSeq, new IntType(eventSeq))),
            1000, // total events
            500,  // fail after 500 events
            5     // after 5 successful checkpoints
    );

    env.addSource(source)
            .keyBy(value -> value.f0 % 4)
            .map(new RecoveringMapFunction())
            .addSink(new ValidatingSink<>(expectedResults));

    String savepointPath = getResourceFilename("fault-tolerance-test-savepoint");
    restoreAndExecute(env, savepointPath, Tuple2.of("processed", 1000));
}
```

**State Evolution Migration Test:**

```java
@Test
public void testStateEvolutionMigration() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // Test state schema evolution
    env.addSource(new MigrationTestUtils.CheckingNonParallelSourceWithListState(50))
            .keyBy(value -> value.f0)
            .process(new EvolvingProcessFunction()) // Function with evolved state schema
            .addSink(new MigrationTestUtils.AccumulatorCountingSink<>("evolved-count"));

    String savepointPath = getResourceFilename("state-evolution-flink1.6-savepoint");
    restoreAndExecute(env, savepointPath, Tuple2.of("evolved-count", 50));
}
```

This migration testing framework ensures that Flink applications maintain backward compatibility across version upgrades and that stateful streaming applications can successfully restore from savepoints created with previous versions.