0
# Checkpointing and State Management
1
2
Infrastructure for testing savepoint migration, state compatibility, and checkpoint recovery across Apache Flink versions. These utilities are essential for validating upgrade paths and ensuring state serialization compatibility.
3
4
## Core Base Classes
5
6
### SavepointMigrationTestBase
7
8
Abstract base class providing infrastructure for testing savepoint migration between Flink versions.
9
10
```java { .api }
11
public abstract class SavepointMigrationTestBase extends TestBaseUtils {
12
@Rule
13
public TemporaryFolder tempFolder;
14
protected LocalFlinkMiniCluster cluster;
15
16
@Before
17
public void setup() throws Exception;
18
19
@After
20
public void cleanup() throws Exception;
21
22
// Core migration testing methods
23
protected void executeAndSavepoint(StreamExecutionEnvironment env, String savepointPath, Tuple2<String, Integer>... expectedAccumulators) throws Exception;
24
protected void restoreAndExecute(StreamExecutionEnvironment env, String savepointPath, Tuple2<String, Integer>... expectedAccumulators) throws Exception;
25
26
// Resource management
27
protected static String getResourceFilename(String filename);
28
29
// Cluster management
30
protected void startCluster() throws Exception;
31
protected void stopCluster() throws Exception;
32
}
33
```
34
35
### AbstractOperatorRestoreTestBase
36
37
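Base class for testing operator state restoration across versions using a two-step workflow: a migration job (`createMigrationJob`) paired with the savepoint named by `getMigrationSavepointName`, followed by a restored job (`createRestoredJob`).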
38
39
```java { .api }
40
public abstract class AbstractOperatorRestoreTestBase {
41
protected Configuration config;
42
protected LocalFlinkMiniCluster cluster;
43
44
@Before
45
public void setup() throws Exception;
46
47
@After
48
public void cleanup() throws Exception;
49
50
// Migration workflow methods
51
protected abstract JobGraph createMigrationJob() throws Exception;
52
protected abstract JobGraph createRestoredJob() throws Exception;
53
protected abstract String getMigrationSavepointName();
54
55
// Test execution
56
@Test
57
public void testRestore() throws Exception;
58
}
59
```
60
61
### AbstractKeyedOperatorRestoreTestBase
62
63
Specialized base for testing keyed operator state restoration with parameterized testing.
64
65
```java { .api }
66
public abstract class AbstractKeyedOperatorRestoreTestBase extends AbstractOperatorRestoreTestBase {
67
@Parameterized.Parameters(name = "Savepoint: {0}")
68
public static Collection<Tuple2<String, String>> getParameters();
69
70
protected String savepointPath;
71
protected String savepointVersion;
72
73
public AbstractKeyedOperatorRestoreTestBase(String savepointPath, String savepointVersion);
74
}
75
```
76
77
### AbstractNonKeyedOperatorRestoreTestBase
78
79
Specialized base for testing non-keyed operator state restoration.
80
81
```java { .api }
82
public abstract class AbstractNonKeyedOperatorRestoreTestBase extends AbstractOperatorRestoreTestBase {
83
@Parameterized.Parameters(name = "Savepoint: {0}")
84
public static Collection<Tuple2<String, String>> getParameters();
85
86
protected String savepointPath;
87
protected String savepointVersion;
88
89
public AbstractNonKeyedOperatorRestoreTestBase(String savepointPath, String savepointVersion);
90
}
91
```
92
93
## Savepoint Migration Test Cases
94
95
### StatefulJobSavepointFrom11MigrationITCase
96
97
Concrete test for validating savepoint migration from Flink 1.1.
98
99
```java { .api }
100
public class StatefulJobSavepointFrom11MigrationITCase extends SavepointMigrationTestBase {
101
private static final int NUM_SOURCE_ELEMENTS = 4;
102
103
@Test
104
public void testSavepoint() throws Exception;
105
106
// Job creation methods
107
private JobGraph createJobGraphV2() throws IOException;
108
private JobGraph createJobGraphV3() throws IOException;
109
110
@Override
111
protected String getResourceFilename(String filename);
112
}
113
```
114
115
### StatefulJobSavepointFrom12MigrationITCase
116
117
Concrete test for validating savepoint migration from Flink 1.2.
118
119
```java { .api }
120
public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigrationTestBase {
121
private static final int NUM_SOURCE_ELEMENTS = 4;
122
123
@Test
124
public void testSavepoint() throws Exception;
125
126
// Job graph creation
127
private JobGraph createJobGraph() throws IOException;
128
129
@Override
130
protected String getResourceFilename(String filename);
131
}
132
```
133
134
### StatefulJobSavepointFrom13MigrationITCase
135
136
Test for validating savepoint migration within Flink 1.3.
137
138
```java { .api }
139
public class StatefulJobSavepointFrom13MigrationITCase extends SavepointMigrationTestBase {
140
@Test
141
public void testSavepoint() throws Exception;
142
143
@Override
144
protected String getResourceFilename(String filename);
145
}
146
```
147
148
## Stream Fault Tolerance Base
149
150
### StreamFaultToleranceTestBase
151
152
Infrastructure for testing fault tolerance in streaming applications with multi-TaskManager clusters.
153
154
```java { .api }
155
public abstract class StreamFaultToleranceTestBase {
156
protected static final int NUM_TASK_MANAGERS = 2;
157
protected LocalFlinkMiniCluster cluster;
158
protected Configuration config;
159
160
@Before
161
public void setup() throws Exception;
162
163
@After
164
public void cleanup() throws Exception;
165
166
// Hooks for test implementation (testProgram is abstract; postSubmit is a regular overridable method)
167
protected abstract void testProgram(StreamExecutionEnvironment env);
168
protected void postSubmit() throws Exception;
169
170
// Execution control
171
@Test
172
public void runCheckpointedProgram() throws Exception;
173
}
174
```
175
176
## Usage Examples
177
178
### Basic Savepoint Migration Test
179
180
```java
181
public class MySavepointMigrationTest extends SavepointMigrationTestBase {
182
183
@Test
184
public void testSavepointMigration() throws Exception {
185
// Create initial streaming environment and execute
186
StreamExecutionEnvironment env1 = createV1Environment();
187
188
// Execute and create savepoint
189
executeAndSavepoint(env1, "my-migration-savepoint");
190
191
// Create updated streaming environment
192
StreamExecutionEnvironment env2 = createV2Environment();
193
194
// Restore from savepoint and execute
195
restoreAndExecute(env2, "my-migration-savepoint");
196
}
197
198
private StreamExecutionEnvironment createV1Environment() {
199
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
200
env.enableCheckpointing(500);
201
202
env.fromElements(1, 2, 3, 4, 5)
203
.keyBy(x -> x % 2)
204
.map(new StatefulMapper())
205
.addSink(new DiscardingSink<>());
206
207
return env;
208
}
209
210
private StreamExecutionEnvironment createV2Environment() {
211
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
212
env.enableCheckpointing(500);
213
214
// Modified job with additional transformation
215
env.fromElements(1, 2, 3, 4, 5)
216
.keyBy(x -> x % 2)
217
.map(new StatefulMapper())
218
.map(x -> x * 2) // Additional transformation
219
.addSink(new DiscardingSink<>());
220
221
return env;
222
}
223
224
@Override
225
protected String getResourceFilename(String filename) {
226
return "savepoints/" + filename;
227
}
228
}
229
```
230
231
### Operator State Restoration Test
232
233
```java
234
public class MyOperatorRestoreTest extends AbstractKeyedOperatorRestoreTestBase {
235
236
public MyOperatorRestoreTest(String savepointPath, String savepointVersion) {
237
super(savepointPath, savepointVersion);
238
}
239
240
@Parameterized.Parameters(name = "Savepoint: {0}")
241
public static Collection<Tuple2<String, String>> getParameters() {
242
return Arrays.asList(
243
new Tuple2<>("keyed-flink1.2", "1.2"),
244
new Tuple2<>("keyed-flink1.3", "1.3")
245
);
246
}
247
248
@Override
249
protected JobGraph createMigrationJob() throws Exception {
250
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
251
env.enableCheckpointing(500);
252
253
// Create job that generates state
254
env.fromElements(1, 2, 3, 4, 5)
255
.keyBy(x -> x)
256
.map(new StatefulKeyedFunction())
257
.addSink(new DiscardingSink<>());
258
259
return env.getStreamGraph().getJobGraph();
260
}
261
262
@Override
263
protected JobGraph createRestoredJob() throws Exception {
264
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
265
266
// Create job that validates restored state
267
env.fromElements(6, 7, 8, 9, 10)
268
.keyBy(x -> x)
269
.map(new ValidatingKeyedFunction())
270
.addSink(new DiscardingSink<>());
271
272
return env.getStreamGraph().getJobGraph();
273
}
274
275
@Override
276
protected String getMigrationSavepointName() {
277
return savepointPath;
278
}
279
}
280
```
281
282
### Stream Fault Tolerance Test
283
284
```java
285
public class MyFaultToleranceTest extends StreamFaultToleranceTestBase {
286
287
@Override
288
protected void testProgram(StreamExecutionEnvironment env) {
289
env.enableCheckpointing(500);
290
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 1000));
291
292
env.fromElements(1, 2, 3, 4, 5)
293
.map(new FailingMapper()) // Intentionally fails
294
.keyBy(x -> x % 2)
295
.map(new RecoveringMapper()) // Recovers from failure
296
.addSink(new ValidatingSink());
297
}
298
299
@Override
300
protected void postSubmit() throws Exception {
301
// Validation logic after job completion
302
Thread.sleep(2000); // Wait for completion
303
// Verify results through external validation
304
}
305
}
306
```