# Test Base Classes

These abstract base classes provide standardized setup, execution patterns, and infrastructure for different kinds of Flink tests. They handle cluster configuration, coordinate test execution, and expose template methods for specific testing patterns.

## Capabilities

### StreamFaultToleranceTestBase

Base class for fault-tolerant streaming program tests with automatic checkpointing, failure recovery, and cluster management.

```java { .api }
/**
 * Base class for fault-tolerant streaming program tests.
 * Automatically sets up a MiniCluster with 3 task managers, 4 slots each
 * (PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS = 12).
 * Enables checkpointing every 500 ms with an infinite restart strategy.
 */
public abstract class StreamFaultToleranceTestBase extends TestLogger {

    protected static final int NUM_TASK_MANAGERS = 3;
    protected static final int NUM_TASK_SLOTS = 4;
    protected static final int PARALLELISM = 12;

    /**
     * Implementations define the test topology using the provided environment.
     * @param env StreamExecutionEnvironment with checkpointing and restarts configured
     */
    public abstract void testProgram(StreamExecutionEnvironment env);

    /**
     * Implementations provide test verification logic executed after job completion.
     * Use this to verify results, check state, or validate behavior.
     */
    public abstract void postSubmit() throws Exception;

    /**
     * Runs the complete fault tolerance test cycle:
     * sets up the environment, executes testProgram(), handles failures, runs postSubmit().
     */
    @Test
    public void runCheckpointedProgram() throws Exception;

    /**
     * POJO for storing prefix, value, and count in fault tolerance tests.
     * Declared as a nested class of StreamFaultToleranceTestBase.
     */
    public static class PrefixCount implements Serializable {
        public String prefix;
        public String value;
        public long count;

        public PrefixCount() {}
        public PrefixCount(String prefix, String value, long count);
    }
}
```
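
**Usage Example:**

```java
import static org.junit.Assert.assertFalse;

import java.util.List;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.streaming.runtime.util.TestListResultSink;

public class MyFaultToleranceTest extends StreamFaultToleranceTestBase {

    // Keep a reference to the sink so postSubmit() can read what it collected.
    private final TestListResultSink<String> sink = new TestListResultSink<>();

    @Override
    public void testProgram(StreamExecutionEnvironment env) {
        env.fromElements("a", "b", "c", "a", "b")
            .keyBy(x -> x)
            // StatefulMapper is a user-defined, stateful MapFunction<String, String>.
            .map(new StatefulMapper())
            .addSink(sink);
    }

    @Override
    public void postSubmit() throws Exception {
        // getResult() is an instance method on the sink, not a static call.
        List<String> results = sink.getResult();
        assertFalse("Results should contain processed elements", results.isEmpty());
    }
}
```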
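
The checkpoint-and-recover behavior that this base class exercises can be pictured with a small plain-Java model. This is only a sketch under stated assumptions: `CheckpointReplayModel`, its `run` method, and the single injected failure are hypothetical and use no Flink API. The model snapshots a running count at fixed intervals, fails once, and resumes from the last snapshot.

```java
import java.util.List;

/** Toy model of checkpoint-and-replay. Hypothetical names; this is not Flink code. */
class CheckpointReplayModel {

    /**
     * Counts every element of the input. A snapshot of (position, count) is taken
     * every checkpointInterval elements, and one failure is injected just before
     * the element at index failAt is processed (use -1 for no failure).
     *
     * @return a two-element array: {final count, number of restarts}
     */
    static long[] run(List<String> input, int checkpointInterval, int failAt) {
        if (checkpointInterval <= 0) {
            throw new IllegalArgumentException("checkpointInterval must be positive");
        }
        int checkpointPosition = 0;   // last snapshotted input position
        long checkpointCount = 0L;    // count as of that position
        boolean failureInjected = false;
        long restarts = 0L;

        while (true) {
            // Every attempt starts from the most recent snapshot.
            int position = checkpointPosition;
            long count = checkpointCount;
            try {
                while (position < input.size()) {
                    if (!failureInjected && position == failAt) {
                        failureInjected = true;
                        throw new IllegalStateException("injected failure");
                    }
                    count++;
                    position++;
                    if (position % checkpointInterval == 0) {
                        checkpointPosition = position;
                        checkpointCount = count;
                    }
                }
                return new long[] {count, restarts};
            } catch (IllegalStateException e) {
                // State built since the last snapshot is discarded and replayed.
                restarts++;
            }
        }
    }

    public static void main(String[] args) {
        long[] result = run(List.of("a", "b", "c", "a", "b"), 2, 3);
        System.out.println("count=" + result[0] + ", restarts=" + result[1]);
    }
}
```

In this model the final count equals the input size even though one attempt failed, which is the kind of property a `postSubmit()` implementation would typically assert.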

### SavepointMigrationTestBase

Base class for savepoint migration testing with support for creating savepoints from one job and restoring them in another job to test state compatibility.

```java { .api }
/**
 * Base class for savepoint migration testing
 * Provides infrastructure for savepoint creation, restoration, and verification
 */
public abstract class SavepointMigrationTestBase extends TestBaseUtils {

    protected static final int DEFAULT_PARALLELISM = 4;

    /**
     * Get resource file path for test savepoints
     * @param filename Resource filename to locate
     * @return Absolute path to resource file
     */
    protected static String getResourceFilename(String filename);

    /**
     * Execute job and create savepoint when specified accumulators reach expected values
     * @param env StreamExecutionEnvironment configured for the job
     * @param savepointPath Path where savepoint should be saved
     * @param expectedAccumulators Array of accumulator name/value pairs to wait for
     */
    @SafeVarargs
    protected final void executeAndSavepoint(
        StreamExecutionEnvironment env,
        String savepointPath,
        Tuple2<String, Integer>... expectedAccumulators) throws Exception;

    /**
     * Restore job from savepoint and execute until specified accumulators reach expected values
     * @param env StreamExecutionEnvironment configured for the restored job
     * @param savepointPath Path to savepoint for restoration
     * @param expectedAccumulators Array of accumulator name/value pairs to wait for
     */
    @SafeVarargs
    protected final void restoreAndExecute(
        StreamExecutionEnvironment env,
        String savepointPath,
        Tuple2<String, Integer>... expectedAccumulators) throws Exception;
}
```

**Usage Example:**

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.Test;

// TestSource, VerifyingTestSource, and StatefulFunction are user-defined placeholders.
public class MySavepointMigrationTest extends SavepointMigrationTestBase {

    @Test
    public void testMigration() throws Exception {
        // Create and execute the job that generates the savepoint
        StreamExecutionEnvironment env1 = StreamExecutionEnvironment.getExecutionEnvironment();
        env1.setParallelism(DEFAULT_PARALLELISM);
        env1.addSource(new TestSource())
            .keyBy(x -> x.f0)
            .map(new StatefulFunction())
            .addSink(new AccumulatorCountingSink<>());

        // Take the savepoint once the sink has counted 100 elements
        executeAndSavepoint(env1, "test-savepoint",
            Tuple2.of(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, 100));

        // Restore from the savepoint and verify state
        StreamExecutionEnvironment env2 = StreamExecutionEnvironment.getExecutionEnvironment();
        env2.setParallelism(DEFAULT_PARALLELISM);
        env2.addSource(new VerifyingTestSource())
            .keyBy(x -> x.f0)
            .map(new StatefulFunction())
            .addSink(new AccumulatorCountingSink<>());

        // Run the restored job until the sink has counted 50 elements
        restoreAndExecute(env2, "test-savepoint",
            Tuple2.of(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, 50));
    }
}
```
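
Both helper methods wait until named accumulators reach expected values. One way to picture that waiting step is a polling loop with a deadline, sketched here in plain Java. The names `AccumulatorWaitSketch`, `allReached`, and `awaitAccumulators` are hypothetical, the supplier stands in for querying a running job, and the exact-equality comparison is an assumption of this sketch rather than documented behavior of the base class.

```java
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Plain-Java model of "wait until accumulators reach expected values". Not Flink code. */
class AccumulatorWaitSketch {

    /** True when every expected accumulator currently holds exactly its expected value. */
    static boolean allReached(Map<String, Integer> current, Map<String, Integer> expected) {
        for (Map.Entry<String, Integer> entry : expected.entrySet()) {
            if (!entry.getValue().equals(current.get(entry.getKey()))) {
                return false;
            }
        }
        return true;
    }

    /** Polls the supplier until all expected values are seen or the timeout elapses. */
    static void awaitAccumulators(
            Supplier<Map<String, Integer>> accumulators,
            Map<String, Integer> expected,
            Duration timeout,
            Duration pollInterval) throws InterruptedException, TimeoutException {
        long deadlineNanos = System.nanoTime() + timeout.toNanos();
        while (!allReached(accumulators.get(), expected)) {
            if (System.nanoTime() - deadlineNanos >= 0) {
                throw new TimeoutException(
                    "Accumulators did not reach " + expected + " within " + timeout);
            }
            Thread.sleep(pollInterval.toMillis());
        }
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger processed = new AtomicInteger();
        // Stand-in for a running job: each poll observes one more processed element.
        awaitAccumulators(
            () -> Map.of("num-elements", processed.incrementAndGet()),
            Map.of("num-elements", 100),
            Duration.ofSeconds(5),
            Duration.ofMillis(1));
        System.out.println("reached " + processed.get());
    }
}
```

A bounded deadline keeps a test from hanging when the job never produces the expected count, and the timeout message names the values that were being waited for.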

### CancelingTestBase

Base class for testing job cancellation scenarios with controlled timing and proper cluster setup.

```java { .api }
/**
 * Base class for testing job cancellation.
 * Sets up a MiniCluster with 2 task managers, 4 slots each, and defines PARALLELISM = 4.
 */
public abstract class CancelingTestBase extends TestLogger {

    protected static final int PARALLELISM = 4;

    /**
     * Submit job, wait specified time, then cancel and verify proper cancellation
     * @param plan Flink execution plan to run and cancel
     * @param msecsTillCanceling Milliseconds to wait before canceling
     * @param maxTimeTillCanceled Maximum time in milliseconds to wait for cancellation to complete
     */
    protected void runAndCancelJob(Plan plan, int msecsTillCanceling, int maxTimeTillCanceled) throws Exception;
}
```
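
The submit, wait, cancel, and verify sequence described for `runAndCancelJob` can be modelled with `java.util.concurrent`. The following is a minimal sketch, not the Flink implementation: `RunAndCancelSketch` and its method are hypothetical, a `Runnable` stands in for the `Plan`, and treating a job that finishes before the cancel as an error is a choice made for this sketch.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Plain-Java model of "run, wait, cancel, verify". Hypothetical names; not Flink code. */
class RunAndCancelSketch {

    /**
     * Starts the job, waits msecsTillCanceling, cancels it, and checks that it
     * stops within maxTimeTillCanceled milliseconds.
     *
     * @return milliseconds between the cancel request and the job thread stopping
     */
    static long runAndCancel(Runnable job, int msecsTillCanceling, int maxTimeTillCanceled)
            throws InterruptedException, TimeoutException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<?> running = executor.submit(job);
            Thread.sleep(msecsTillCanceling);
            if (running.isDone()) {
                throw new IllegalStateException("job finished before it could be canceled");
            }
            long cancelStart = System.nanoTime();
            running.cancel(true); // interrupts the thread executing the job
            executor.shutdown();
            // The Future reports "done" immediately after cancel(), so wait for the
            // worker thread itself to confirm that the job really stopped.
            if (!executor.awaitTermination(maxTimeTillCanceled, TimeUnit.MILLISECONDS)) {
                throw new TimeoutException(
                    "job did not stop within " + maxTimeTillCanceled + " ms");
            }
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - cancelStart);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        // A job that would run for a minute unless it is interrupted.
        Runnable longRunning = () -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        long stoppedAfter = runAndCancel(longRunning, 100, 2_000);
        System.out.println("stopped within limit: " + (stoppedAfter <= 2_000));
    }
}
```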

### AbstractOperatorRestoreTestBase

Base class for testing operator state migration and restoration across Flink versions with standardized savepoint handling.

```java { .api }
/**
 * Base class for testing operator state migration and restoration
 * Provides infrastructure for savepoint-based migration testing
 */
public abstract class AbstractOperatorRestoreTestBase {

    /**
     * Create job that generates migration savepoint
     * @param env StreamExecutionEnvironment for job creation
     * @return JobGraph for the migration job
     */
    public abstract JobGraph createMigrationJob(StreamExecutionEnvironment env) throws Exception;

    /**
     * Create job that restores from migration savepoint and verifies state
     * @param env StreamExecutionEnvironment for job creation
     * @return JobGraph for the restoration job
     */
    public abstract JobGraph createRestoredJob(StreamExecutionEnvironment env) throws Exception;

    /**
     * Get name of savepoint resource for this test
     * @return Resource name for the savepoint
     */
    public abstract String getMigrationSavepointName();
}
```

### AbstractKeyedOperatorRestoreTestBase

Specialized base class for testing keyed operator state restoration.

```java { .api }
/**
 * Specialized base for testing keyed operator state restoration
 * Extends AbstractOperatorRestoreTestBase with keyed state specific functionality
 */
public abstract class AbstractKeyedOperatorRestoreTestBase extends AbstractOperatorRestoreTestBase {
    // Keyed state specific restoration testing
}
```

### AbstractNonKeyedOperatorRestoreTestBase

Specialized base class for testing non-keyed operator state restoration.

```java { .api }
/**
 * Specialized base for testing non-keyed operator state restoration
 * Extends AbstractOperatorRestoreTestBase with non-keyed state specific functionality
 */
public abstract class AbstractNonKeyedOperatorRestoreTestBase extends AbstractOperatorRestoreTestBase {
    // Non-keyed state specific restoration testing
}
```

### SimpleRecoveryITCaseBase

Base class for testing job recovery scenarios with failure simulation and restart strategies.

```java { .api }
/**
 * Base class for testing job recovery scenarios
 * Provides infrastructure for testing failed runs followed by successful runs
 */
public abstract class SimpleRecoveryITCaseBase {
    // Abstract methods for defining failing and successful execution plans
    // Built-in recovery and restart strategy testing
}
```
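
The "failed run followed by a successful run" scenario can be illustrated with a generic fixed-delay restart loop. This sketch assumes nothing about how `SimpleRecoveryITCaseBase` is implemented: `FixedDelayRestartSketch` and `runWithRestarts` are hypothetical names, and a `Callable` stands in for the job.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

/** Plain-Java model of a fixed-delay restart strategy. Hypothetical names; not Flink code. */
class FixedDelayRestartSketch {

    /**
     * Runs the job and restarts it after a failure, at most maxRestarts times,
     * sleeping delayMillis between attempts. When the restarts are used up,
     * the last failure is rethrown to the caller.
     */
    static <T> T runWithRestarts(Callable<T> job, int maxRestarts, long delayMillis)
            throws Exception {
        int restarts = 0;
        while (true) {
            try {
                return job.call();
            } catch (Exception e) {
                if (restarts >= maxRestarts) {
                    throw e;
                }
                restarts++;
                Thread.sleep(delayMillis);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger attempts = new AtomicInteger();
        // The job fails on its first attempt and succeeds on the second.
        String result = runWithRestarts(() -> {
            if (attempts.incrementAndGet() == 1) {
                throw new IllegalStateException("simulated task failure");
            }
            return "finished";
        }, 1, 10L);
        System.out.println(result + " after " + attempts.get() + " attempts");
    }
}
```

A recovery test built on this idea usually has two halves: one run with too few restarts that is expected to fail, and one run with enough restarts that is expected to finish.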

## Common Patterns

### Test Setup Pattern

All base classes follow a consistent pattern:

1. **Cluster Setup**: Standardized MiniClusterResource configuration
2. **Environment Configuration**: Proper timeouts, parallelism, and checkpointing
3. **Execution Coordination**: Template methods for test orchestration
4. **Result Verification**: Abstract methods for test-specific validation
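
The four steps above amount to the template method pattern. The sketch below shows the shape in plain Java; `TemplateTestBaseSketch` and its methods are hypothetical and only record the order in which each step runs, so nothing here starts a real cluster.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical base class modelling the four-step pattern. Not a Flink class. */
abstract class TemplateTestBaseSketch {

    /** Lifecycle events, recorded so that the order of the steps is observable. */
    protected final List<String> events = new ArrayList<>();

    /** Test-specific topology or plan, supplied by the subclass. */
    protected abstract void defineProgram();

    /** Test-specific validation, supplied by the subclass. */
    protected abstract void verify() throws Exception;

    /** Template method: the step order is fixed, and shutdown runs even on failure. */
    public final void run() throws Exception {
        events.add("cluster-setup");
        try {
            events.add("environment-configured");
            defineProgram();
            events.add("executed");
            verify();
        } finally {
            events.add("cluster-shutdown");
        }
    }

    public List<String> recordedEvents() {
        return events;
    }
}

class TemplateTestBaseDemo {
    public static void main(String[] args) throws Exception {
        TemplateTestBaseSketch test = new TemplateTestBaseSketch() {
            @Override
            protected void defineProgram() {
                events.add("program-defined");
            }

            @Override
            protected void verify() {
                events.add("verified");
            }
        };
        test.run();
        System.out.println(test.recordedEvents());
    }
}
```

Making `run()` final keeps subclasses from reordering the lifecycle, so a test author only fills in the program definition and the verification.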

### Error Handling

Base classes handle common error scenarios:

- **Job Submission Failures**: Automatic retry and proper error reporting
- **Timeout Handling**: Configurable deadlines with clear failure messages
- **Resource Cleanup**: Automatic cleanup of temporary files and cluster resources
- **Exception Propagation**: Proper exception handling and test failure reporting
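
Resource cleanup and exception propagation often come down to a `try`/`finally` around the test body. The following sketch shows that idea for a temporary working directory using only `java.nio.file`; `TempDirCleanupSketch`, `TestBody`, and `withTempDir` are hypothetical names and do not describe how the base classes manage their files.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Plain-Java model of temp-file cleanup around a test body. Hypothetical names. */
class TempDirCleanupSketch {

    /** A test body that receives a scratch directory to write into. */
    interface TestBody {
        void run(Path workDir) throws Exception;
    }

    /**
     * Creates a temporary directory, runs the body, and deletes the directory
     * afterwards, whether the body succeeded or threw. An exception from the
     * body propagates to the caller after cleanup.
     *
     * @return the path that was used (already deleted when this returns)
     */
    static Path withTempDir(TestBody body) throws Exception {
        Path dir = Files.createTempDirectory("test-base-sketch");
        try {
            body.run(dir);
        } finally {
            deleteRecursively(dir);
        }
        return dir;
    }

    /** Deletes a directory tree, children before parents. */
    static void deleteRecursively(Path root) throws IOException {
        List<Path> deepestFirst;
        try (Stream<Path> paths = Files.walk(root)) {
            deepestFirst = paths.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path path : deepestFirst) {
            Files.delete(path);
        }
    }

    public static void main(String[] args) throws Exception {
        Path used = withTempDir(dir -> Files.createFile(dir.resolve("savepoint.tmp")));
        System.out.println("still exists: " + Files.exists(used));
    }
}
```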