# Checkpointing and Migration Testing

A comprehensive framework for testing snapshot migration across Flink versions, with utilities for state validation and checkpoint management. It enables testing of state compatibility, migration correctness, and checkpoint recovery scenarios.

## Capabilities

### Snapshot Migration Test Base

Abstract base class for testing snapshot migration across Flink versions, providing a structured approach to creating snapshots in one version and restoring them in another.

```java { .api }
11
/**
12
* Base class for testing snapshot migration across Flink versions
13
*/
14
public abstract class SnapshotMigrationTestBase {
15
/**
16
* Execute job and create snapshot for migration testing
17
* @param job JobGraph to execute and snapshot
18
* @return SnapshotSpec containing snapshot metadata
19
* @throws Exception if execution or snapshotting fails
20
*/
21
protected SnapshotSpec executeAndSnapshot(JobGraph job) throws Exception;
22
23
/**
24
* Restore from snapshot and execute job to validate migration
25
* @param job JobGraph to execute with restored state
26
* @param snapshot SnapshotSpec containing snapshot location and metadata
27
* @throws Exception if restore or execution fails
28
*/
29
protected void restoreAndExecute(JobGraph job, SnapshotSpec snapshot) throws Exception;
30
31
/**
32
* Snapshot specification containing metadata for migration testing
33
*/
34
public static class SnapshotSpec {
35
/**
36
* Get the filesystem path to the snapshot
37
* @return String path to snapshot directory
38
*/
39
public String getSnapshotPath();
40
41
/**
42
* Get the Flink version that created this snapshot
43
* @return String version identifier
44
*/
45
public String getSnapshotVersion();
46
}
47
}
48
```

### Migration Test Utilities

Comprehensive utilities for migration testing, including sources, sinks, and operators with state management capabilities.

```java { .api }
55
/**
56
* Utility class providing components for migration testing scenarios
57
*/
58
public class MigrationTestUtils {
59
60
/**
61
* Source function with operator list state for migration testing
62
*/
63
public static class CheckpointingNonParallelSourceWithListState
64
implements SourceFunction<Integer> {
65
66
/**
67
* Constructor for checkpointing source with list state
68
* @param numElements number of elements to emit
69
*/
70
public CheckpointingNonParallelSourceWithListState(int numElements);
71
}
72
73
/**
74
* Source function for validating restored list state after migration
75
*/
76
public static class CheckingNonParallelSourceWithListState
77
implements SourceFunction<Integer> {
78
79
/**
80
* Constructor for validation source
81
* @param numElements number of elements for validation
82
*/
83
public CheckingNonParallelSourceWithListState(int numElements);
84
}
85
86
/**
87
* Parallel source with union list state for migration testing
88
*/
89
public static class CheckpointingParallelSourceWithUnionListState
90
implements SourceFunction<Integer> {
91
92
/**
93
* Constructor for parallel source with union state
94
* @param numElements elements per subtask
95
*/
96
public CheckpointingParallelSourceWithUnionListState(int numElements);
97
}
98
99
/**
100
* Parallel source for validating union list state after migration
101
*/
102
public static class CheckingParallelSourceWithUnionListState
103
implements SourceFunction<Integer> {
104
105
/**
106
* Constructor for parallel validation source
107
* @param numElements number of elements for validation
108
*/
109
public CheckingParallelSourceWithUnionListState(int numElements);
110
}
111
112
/**
113
* Sink that counts elements using Flink accumulators
114
*/
115
public static class AccumulatorCountingSink<T> implements SinkFunction<T> {
116
117
/**
118
* Constructor for accumulator counting sink
119
* @param accumulatorName name of the accumulator to use
120
*/
121
public AccumulatorCountingSink(String accumulatorName);
122
}
123
124
/**
125
* Source with configurable failure injection for testing recovery
126
*/
127
public static class FailingSource implements SourceFunction<Integer> {
128
129
/**
130
* Constructor for failing source
131
* @param failAfterElements number of elements before failure
132
* @param maxElements maximum elements to emit after recovery
133
*/
134
public FailingSource(int failAfterElements, int maxElements);
135
}
136
137
/**
138
* Source for testing job cancellation scenarios
139
*/
140
public static class CancellingIntegerSource implements SourceFunction<Integer> {
141
142
/**
143
* Constructor for cancelling source
144
* @param cancelAfterElements elements to emit before triggering cancellation
145
*/
146
public CancellingIntegerSource(int cancelAfterElements);
147
}
148
149
/**
150
* Sink that accumulates integer values for validation
151
*/
152
public static class AccumulatingIntegerSink implements SinkFunction<Integer> {
153
154
/**
155
* Constructor for accumulating sink
156
* @param outputList list to accumulate values into
157
*/
158
public AccumulatingIntegerSink(List<Integer> outputList);
159
}
160
161
/**
162
* Sink for validating output against expected values
163
*/
164
public static class ValidatingSink<T> implements SinkFunction<T> {
165
166
/**
167
* Constructor for validating sink
168
* @param expectedValues expected output values for validation
169
*/
170
public ValidatingSink(List<T> expectedValues);
171
}
172
}
173
```

**Usage Examples:**

```java
178
import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase;
179
import org.apache.flink.test.checkpointing.utils.MigrationTestUtils.*;
180
181
// Basic migration test
182
public class StateMigrationTest extends SnapshotMigrationTestBase {
183
184
@Test
185
public void testListStateMigration() throws Exception {
186
// Create job with stateful source
187
JobGraph job = new JobGraph();
188
job.addVertex(new JobVertex("source",
189
new CheckpointingNonParallelSourceWithListState(100)));
190
job.addVertex(new JobVertex("sink",
191
new AccumulatorCountingSink<>("count")));
192
193
// Execute and create snapshot
194
SnapshotSpec snapshot = executeAndSnapshot(job);
195
196
// Create validation job
197
JobGraph validationJob = new JobGraph();
198
validationJob.addVertex(new JobVertex("validation-source",
199
new CheckingNonParallelSourceWithListState(
200
Arrays.asList(1, 2, 3, 4, 5))));
201
202
// Restore and validate
203
restoreAndExecute(validationJob, snapshot);
204
}
205
206
@Test
207
public void testUnionStateMigration() throws Exception {
208
// Test parallel source with union state
209
JobGraph job = new JobGraph();
210
JobVertex sourceVertex = new JobVertex("parallel-source",
211
new CheckpointingParallelSourceWithUnionListState(50));
212
sourceVertex.setParallelism(4);
213
job.addVertex(sourceVertex);
214
215
SnapshotSpec snapshot = executeAndSnapshot(job);
216
217
// Validation with different parallelism
218
JobGraph validationJob = new JobGraph();
219
JobVertex validationVertex = new JobVertex("validation-source",
220
new CheckingParallelSourceWithUnionListState(
221
expectedUnionState));
222
validationVertex.setParallelism(2);
223
validationJob.addVertex(validationVertex);
224
225
restoreAndExecute(validationJob, snapshot);
226
}
227
}
228
```