0
# Test Base Classes
1
2
Abstract base classes providing common patterns and infrastructure for different testing scenarios including cancellation testing, fault tolerance testing, and recovery testing. These base classes standardize test setup, execution patterns, and verification procedures across different types of Flink tests.
3
4
## Capabilities
5
6
### Stream Fault Tolerance Test Base
7
8
Abstract base class for testing streaming applications under fault tolerance conditions.
9
10
```java { .api }
11
/**
12
* Abstract base class for fault tolerance testing of streaming applications
13
*/
14
public abstract class StreamFaultToleranceTestBase extends TestLogger {
15
16
// Test cluster configuration constants
17
public static final int NUM_TASK_MANAGERS = 2;
18
public static final int NUM_TASK_SLOTS = 8;
19
public static final int PARALLELISM = 4;
20
21
/**
22
* Define the streaming topology to be tested under fault conditions
23
* @param env Pre-configured StreamExecutionEnvironment
24
*/
25
public abstract void testProgram(StreamExecutionEnvironment env);
26
27
/**
28
* Verify test results after job completion
29
* Called after successful job execution to validate results
30
* @throws Exception if verification fails
31
*/
32
public abstract void postSubmit() throws Exception;
33
}
34
```
35
36
**Usage Example:**
37
38
```java
39
public class MyFaultToleranceTest extends StreamFaultToleranceTestBase {
40
41
private List<String> collectedResults = new ArrayList<>();
42
43
@Override
44
public void testProgram(StreamExecutionEnvironment env) {
45
// Configure environment for fault tolerance
46
env.setParallelism(PARALLELISM);
47
env.enableCheckpointing(100);
48
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 1000));
49
50
// Build fault-tolerant topology
51
env.addSource(new FaultTolerantSource())
52
.keyBy(value -> value.getKey())
53
.process(new StatefulProcessFunction())
54
.addSink(new CollectingSink(collectedResults));
55
}
56
57
@Override
58
public void postSubmit() throws Exception {
59
// Verify results after fault tolerance test
60
assertEquals(expectedResultCount, collectedResults.size());
61
assertTrue("Missing expected results", collectedResults.containsAll(expectedResults));
62
63
// Verify no duplicates after recovery
64
Set<String> uniqueResults = new HashSet<>(collectedResults);
65
assertEquals("Duplicate results detected", collectedResults.size(), uniqueResults.size());
66
}
67
68
@Test
69
public void testFaultTolerance() throws Exception {
70
// Test execution handled by base class infrastructure
71
runTest();
72
}
73
}
74
```
75
76
### Canceling Test Base
77
78
Abstract base class for testing job cancellation scenarios and cleanup behavior.
79
80
```java { .api }
81
/**
82
* Abstract base class for testing job cancellation and cleanup
83
*/
84
public abstract class CancelingTestBase extends TestLogger {
85
86
// Test execution constants
87
protected static final int PARALLELISM = 4;
88
protected static final Duration GET_FUTURE_TIMEOUT = Duration.ofSeconds(30);
89
90
/**
91
* Run a job and cancel it after specified time
92
* @param plan Job execution plan to run and cancel
93
* @param msecsTillCanceling Milliseconds to wait before canceling
94
* @param maxTimeTillCanceled Maximum time to wait for cancellation completion
95
* @throws Exception if job execution or cancellation fails
96
*/
97
protected void runAndCancelJob(
98
Plan plan,
99
int msecsTillCanceling,
100
int maxTimeTillCanceled) throws Exception;
101
}
102
```
103
104
**Usage Example:**
105
106
```java
107
public class MyCancellationTest extends CancelingTestBase {
108
109
@Test
110
public void testJobCancellation() throws Exception {
111
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
112
env.setParallelism(PARALLELISM);
113
114
// Create long-running job that can be cancelled
115
DataSet<String> input = env.fromElements("data");
116
input.map(new SlowMapper()) // Mapper that takes time to process
117
.output(new DiscardingOutputFormat<>());
118
119
Plan plan = env.createProgramPlan();
120
121
// Cancel job after 2 seconds, allow up to 10 seconds for cancellation
122
runAndCancelJob(plan, 2000, 10000);
123
124
// Test passes if job is successfully cancelled within time limit
125
}
126
}
127
```
128
129
### Recovery Test Base Classes
130
131
Base classes for testing different recovery strategies and failure scenarios.
132
133
```java { .api }
134
/**
135
* Base class for simple recovery testing scenarios
136
*/
137
public abstract class SimpleRecoveryITCaseBase extends TestLogger {
138
139
/**
140
* Execute recovery test with default configuration
141
* @throws Exception if recovery test fails
142
*/
143
protected abstract void executeRecoveryTest() throws Exception;
144
}
145
146
/**
147
* Base class for testing fixed delay restart strategy
148
*/
149
public abstract class SimpleRecoveryFixedDelayRestartStrategyITBase extends SimpleRecoveryITCaseBase {
150
151
/**
152
* Get restart strategy configuration for fixed delay
153
* @return RestartStrategy configured for fixed delay
154
*/
155
protected RestartStrategies.RestartStrategyConfiguration getRestartStrategy();
156
}
157
158
/**
159
* Base class for testing failure rate restart strategy
160
*/
161
public abstract class SimpleRecoveryFailureRateStrategyITBase extends SimpleRecoveryITCaseBase {
162
163
/**
164
* Get restart strategy configuration for failure rate limiting
165
* @return RestartStrategy configured for failure rate limiting
166
*/
167
protected RestartStrategies.RestartStrategyConfiguration getRestartStrategy();
168
}
169
```
170
171
### Prefix Count POJO
172
173
Common data type used in fault tolerance and recovery testing.
174
175
```java { .api }
176
/**
177
* POJO for prefix counting tests in fault tolerance scenarios
178
*/
179
public static class PrefixCount {
180
public String str;
181
public long count;
182
183
/**
184
* Default constructor
185
*/
186
public PrefixCount();
187
188
/**
189
* Constructor with string and count
190
* @param str String prefix
191
* @param count Count value
192
*/
193
public PrefixCount(String str, long count);
194
195
/**
196
* Check equality based on str and count fields
197
* @param obj Object to compare
198
* @return true if equal
199
*/
200
public boolean equals(Object obj);
201
202
/**
203
* Generate hash code based on str and count
204
* @return hash code
205
*/
206
public int hashCode();
207
208
/**
209
* String representation
210
* @return formatted string
211
*/
212
public String toString();
213
}
214
```
215
216
### Test Base Usage Patterns
217
218
Common patterns for implementing tests using base classes:
219
220
**Fault Tolerance Test Pattern:**
221
222
```java
223
public class StreamingJobFaultToleranceTest extends StreamFaultToleranceTestBase {
224
225
private TestListResultSink<PrefixCount> resultSink;
226
227
@Override
228
public void testProgram(StreamExecutionEnvironment env) {
229
// Configure for fault tolerance
230
env.setParallelism(PARALLELISM);
231
env.enableCheckpointing(50);
232
env.setStateBackend(new RocksDBStateBackend("file:///tmp/test"));
233
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 1000));
234
235
resultSink = new TestListResultSink<>();
236
237
// Create fault-tolerant streaming topology
238
env.addSource(new FailingSource(1000, 500, 3)) // Fail after 500 elements
239
.keyBy(value -> value.f0 % 4)
240
.process(new CountingProcessFunction())
241
.addSink(resultSink);
242
}
243
244
@Override
245
public void postSubmit() throws Exception {
246
List<PrefixCount> results = resultSink.getResult();
247
248
// Verify all expected results are present after recovery
249
assertEquals(4, results.size()); // One per key
250
251
long totalCount = results.stream().mapToLong(pc -> pc.count).sum();
252
assertEquals(1000, totalCount); // All elements processed exactly once
253
}
254
}
255
```
256
257
**Cancellation Test Pattern:**
258
259
```java
260
public class LongRunningJobCancellationTest extends CancelingTestBase {
261
262
@Test
263
public void testIterativeJobCancellation() throws Exception {
264
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
265
env.setParallelism(PARALLELISM);
266
267
// Create iterative job that runs indefinitely
268
DataSet<Long> initial = env.fromElements(1L);
269
IterativeDataSet<Long> iteration = initial.iterate(Integer.MAX_VALUE);
270
271
DataSet<Long> next = iteration.map(value -> value + 1);
272
DataSet<Long> result = iteration.closeWith(next);
273
274
result.output(new DiscardingOutputFormat<>());
275
276
Plan plan = env.createProgramPlan();
277
278
// Cancel after 3 seconds, allow up to 15 seconds for cancellation
279
runAndCancelJob(plan, 3000, 15000);
280
}
281
}
282
```
283
284
**Recovery Strategy Test Pattern:**
285
286
```java
287
public class FixedDelayRecoveryTest extends SimpleRecoveryFixedDelayRestartStrategyITBase {
288
289
@Override
290
protected void executeRecoveryTest() throws Exception {
291
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
292
env.setParallelism(2);
293
env.setRestartStrategy(getRestartStrategy());
294
env.enableCheckpointing(100);
295
296
TestListResultSink<String> sink = new TestListResultSink<>();
297
298
// Source that fails twice then succeeds
299
env.addSource(new RecoveringSource(3, 100))
300
.map(new StatelessMapper())
301
.addSink(sink);
302
303
env.execute("Recovery Test");
304
305
// Verify successful recovery
306
List<String> results = sink.getResult();
307
assertEquals(100, results.size());
308
assertTrue("Recovery failed", results.stream().allMatch(Objects::nonNull));
309
}
310
311
@Override
312
protected RestartStrategies.RestartStrategyConfiguration getRestartStrategy() {
313
// Fixed delay of 1 second, maximum 3 restart attempts
314
return RestartStrategies.fixedDelayRestart(3, 1000);
315
}
316
}
317
```
318
319
**Multi-Phase Test Pattern:**
320
321
```java
322
public class MultiPhaseRecoveryTest extends SimpleRecoveryITCaseBase {
323
324
@Override
325
protected void executeRecoveryTest() throws Exception {
326
// Phase 1: Generate savepoint
327
String savepointPath = runJobAndCreateSavepoint();
328
329
// Phase 2: Restore and verify
330
restoreJobAndVerifyResults(savepointPath);
331
332
// Phase 3: Test failure recovery
333
testFailureRecoveryFromSavepoint(savepointPath);
334
}
335
336
private String runJobAndCreateSavepoint() throws Exception {
337
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
338
// Implementation for savepoint generation
339
return savepointPath;
340
}
341
342
private void restoreJobAndVerifyResults(String savepointPath) throws Exception {
343
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
344
// Implementation for restoration and verification
345
}
346
347
private void testFailureRecoveryFromSavepoint(String savepointPath) throws Exception {
348
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
349
// Implementation for failure recovery testing
350
}
351
}
352
```
353
354
These test base classes provide standardized infrastructure for testing Flink applications under various failure and recovery scenarios, helping to verify that stream processing jobs recover correctly and reliably.