0
# Test Base Classes
1
2
Test base classes provide standardized testing patterns, cluster lifecycle management, and support for parameterized testing across multiple execution modes. They handle the common setup and teardown required for Flink testing.
3
4
## Core Base Classes
5
6
### AbstractTestBase
7
8
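Base class for tests that run test programs in a Flink mini cluster. It exposes `startCluster()` and `stopCluster()` for controlling the cluster lifecycle, setters for the number of task managers and slots, and helpers for creating temporary files and directories.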
9
10
```java { .api }
11
public abstract class AbstractTestBase extends TestBaseUtils {
12
@ClassRule
13
public static final TemporaryFolder temporaryFolder;
14
15
public AbstractTestBase(Configuration config);
16
17
public void startCluster() throws Exception;
18
public void stopCluster() throws Exception;
19
20
public int getTaskManagerNumSlots();
21
public void setTaskManagerNumSlots(int taskManagerNumSlots);
22
public int getNumTaskManagers();
23
public void setNumTaskManagers(int numTaskManagers);
24
25
public String getTempDirPath(String dirName) throws IOException;
26
public String getTempFilePath(String fileName) throws IOException;
27
public String createTempFile(String fileName, String contents) throws IOException;
28
public File createAndRegisterTempFile(String fileName) throws IOException;
29
}
30
```
31
32
**Usage Example:**
33
34
```java
35
public class MyCustomTest extends AbstractTestBase {
36
public MyCustomTest() {
37
super(new Configuration());
38
setNumTaskManagers(2);
39
setTaskManagerNumSlots(4);
40
}
41
42
@Test
43
public void testMyApplication() throws Exception {
44
startCluster();
45
46
// Create temp input file
47
String inputPath = createTempFile("input.txt", "line1\nline2\nline3");
48
49
// Your test logic here
50
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
51
DataSet<String> input = env.readTextFile(inputPath);
52
// ... test logic
53
54
stopCluster();
55
}
56
}
57
```
58
59
## Batch Testing Base Classes
60
61
### JavaProgramTestBase
62
63
Base class for Java API program tests supporting multiple execution modes (cluster, collection, object reuse).
64
65
```java { .api }
66
public abstract class JavaProgramTestBase extends AbstractTestBase {
67
public JavaProgramTestBase();
68
public JavaProgramTestBase(Configuration config);
69
70
public void setParallelism(int parallelism);
71
public void setNumberOfTestRepetitions(int numberOfTestRepetitions);
72
public int getParallelism();
73
public JobExecutionResult getLatestExecutionResult();
74
public boolean isCollectionExecution();
75
76
protected abstract void testProgram() throws Exception;
77
protected void preSubmit() throws Exception;
78
protected void postSubmit() throws Exception;
79
protected boolean skipCollectionExecution();
80
81
@Test
82
public void testJobWithObjectReuse() throws Exception;
83
@Test
84
public void testJobWithoutObjectReuse() throws Exception;
85
@Test
86
public void testJobCollectionExecution() throws Exception;
87
}
88
```
89
90
**Usage Example:**
91
92
```java
93
public class WordCountTest extends JavaProgramTestBase {
94
95
@Override
96
protected void testProgram() throws Exception {
97
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
98
99
DataSet<String> text = env.fromElements(
100
"hello world",
101
"hello flink",
102
"world flink"
103
);
104
105
DataSet<Tuple2<String, Integer>> counts = text
106
.flatMap(new Tokenizer())
107
.groupBy(0)
108
.sum(1);
109
110
List<Tuple2<String, Integer>> result = counts.collect();
111
112
String expected = "flink,2\nhello,2\nworld,2";
113
compareResultAsTuples(result, expected);
114
}
115
116
@Override
117
protected void preSubmit() throws Exception {
118
// Setup before job execution
119
setParallelism(4);
120
}
121
122
@Override
123
protected boolean skipCollectionExecution() {
124
// Return true to skip collection execution; this test keeps it enabled
125
return false;
126
}
127
}
128
```
129
130
### MultipleProgramsTestBase
131
132
Base class for parameterized unit tests that run multiple tests and reuse the same Flink cluster across test methods.
133
134
```java { .api }
135
public class MultipleProgramsTestBase extends TestBaseUtils {
136
protected static final int DEFAULT_PARALLELISM = 4;
137
protected static boolean startWebServer = false;
138
protected static LocalFlinkMiniCluster cluster = null;
139
140
public MultipleProgramsTestBase(TestExecutionMode mode);
141
142
@Before
143
public void setupEnvironment();
144
@After
145
public void teardownEnvironment();
146
@BeforeClass
147
public static void setup() throws Exception;
148
@AfterClass
149
public static void teardown() throws Exception;
150
151
@Parameterized.Parameters(name = "Execution mode = {0}")
152
public static Collection<Object[]> executionModes();
153
}
154
155
public enum TestExecutionMode {
156
CLUSTER,
157
CLUSTER_OBJECT_REUSE,
158
COLLECTION
159
}
160
```
161
162
**Usage Example:**
163
164
```java
165
@RunWith(Parameterized.class)
166
public class ParameterizedTest extends MultipleProgramsTestBase {
167
168
public ParameterizedTest(TestExecutionMode mode) {
169
super(mode);
170
}
171
172
@Test
173
public void testWordCount() throws Exception {
174
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
175
176
// Test runs in the mode specified by constructor parameter
177
DataSet<String> input = env.fromElements("hello", "world", "hello");
178
List<String> result = input.distinct().collect();
179
180
compareResultAsText(result, "hello\nworld");
181
}
182
183
@Test
184
public void testFilter() throws Exception {
185
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
186
187
DataSet<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);
188
List<Integer> evenNumbers = numbers.filter(x -> x % 2 == 0).collect();
189
190
compareResultAsText(evenNumbers, "2\n4");
191
}
192
}
193
```
194
195
## Streaming Testing Base Classes
196
197
### StreamingProgramTestBase
198
199
Abstract base class for streaming program tests.
200
201
```java { .api }
202
public abstract class StreamingProgramTestBase extends AbstractTestBase {
203
protected static final int DEFAULT_PARALLELISM = 4;
204
205
public StreamingProgramTestBase();
206
207
public void setParallelism(int parallelism);
208
public int getParallelism();
209
210
protected abstract void testProgram() throws Exception;
211
protected void preSubmit() throws Exception;
212
protected void postSubmit() throws Exception;
213
214
@Test
215
public void testJob() throws Exception;
216
}
217
```
218
219
**Usage Example:**
220
221
```java
222
public class StreamingWordCountTest extends StreamingProgramTestBase {
223
224
@Override
225
protected void testProgram() throws Exception {
226
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
227
env.setParallelism(getParallelism());
228
229
DataStream<String> text = env.fromElements(
230
"hello world",
231
"hello flink"
232
);
233
234
DataStream<Tuple2<String, Integer>> counts = text
235
.flatMap(new Tokenizer())
236
.keyBy(0)
237
.sum(1);
238
239
counts.print();
240
env.execute("Streaming WordCount");
241
}
242
243
@Override
244
protected void preSubmit() throws Exception {
245
setParallelism(2);
246
}
247
}
248
```
249
250
### StreamingMultipleProgramsTestBase
251
252
Base class for streaming unit tests that run multiple tests and reuse the same Flink cluster.
253
254
```java { .api }
255
public class StreamingMultipleProgramsTestBase extends AbstractTestBase {
256
protected static final int DEFAULT_PARALLELISM = 4;
257
protected static LocalFlinkMiniCluster cluster;
258
protected static final Logger LOG;
259
260
public StreamingMultipleProgramsTestBase();
261
262
@BeforeClass
263
public static void setup() throws Exception;
264
@AfterClass
265
public static void teardown() throws Exception;
266
}
267
```
268
269
**Usage Example:**
270
271
```java
272
public class StreamingIntegrationTest extends StreamingMultipleProgramsTestBase {
273
274
@Test
275
public void testStreamingMap() throws Exception {
276
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
277
278
DataStream<String> input = env.fromElements("a", "b", "c");
279
input.map(String::toUpperCase).print();
280
281
env.execute("Map Test");
282
}
283
284
@Test
285
public void testStreamingFilter() throws Exception {
286
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
287
288
DataStream<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);
289
numbers.filter(x -> x > 3).print();
290
291
env.execute("Filter Test");
292
}
293
}
294
```
295
296
## Common Patterns
297
298
### Cluster Configuration
299
300
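Base classes that extend `AbstractTestBase` inherit its cluster settings (`setNumTaskManagers`, `setTaskManagerNumSlots`). Those that also expose a `Configuration` constructor, such as `JavaProgramTestBase`, can be configured like this: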
301
302
```java
303
public MyTest() {
304
super(new Configuration());
305
setNumTaskManagers(2);
306
setTaskManagerNumSlots(8);
307
}
308
```
309
310
### Temporary File Management
311
312
Use the provided temporary file utilities for test data:
313
314
```java
315
String inputFile = createTempFile("test-input.txt", "test data content");
316
String outputDir = getTempDirPath("test-output");
317
```
318
319
### Test Lifecycle Hooks
320
321
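`JavaProgramTestBase` and `StreamingProgramTestBase` declare `preSubmit()` and `postSubmit()` hooks; override them for custom setup before the job and validation after it: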
322
323
```java
324
@Override
325
protected void preSubmit() throws Exception {
326
// Setup before job execution
327
}
328
329
@Override
330
protected void postSubmit() throws Exception {
331
// Validation after job execution
332
}
333
```