0
# Test Utilities
1
2
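`TestBaseUtils` provides utilities for Flink tests: cluster management, result comparison, file I/O, and test data handling. This page also documents the related helpers `CheckedThread`, `RetryRule` (with `@RetryOnFailure` / `@RetryOnException`), `TestLogger`, and `CommonTestUtils`.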
3
4
## Cluster Management
5
6
### Starting and Stopping Clusters
7
8
```java { .api }
9
public class TestBaseUtils extends TestLogger {
10
public static FiniteDuration DEFAULT_TIMEOUT;
11
12
// Start a mini cluster with full configuration
13
public static LocalFlinkMiniCluster startCluster(int numTaskManagers, int taskManagerNumSlots,
14
boolean startWebserver, boolean startZooKeeper,
15
boolean singleActorSystem) throws Exception;
16
17
// Start cluster with configuration object
18
public static LocalFlinkMiniCluster startCluster(Configuration config, boolean singleActorSystem) throws Exception;
19
20
// Stop cluster with timeout
21
public static void stopCluster(LocalFlinkMiniCluster executor, FiniteDuration timeout) throws Exception;
22
}
23
```
24
25
**Usage Example:**
26
27
```java
28
// Start a cluster with 2 task managers, 4 slots each
29
LocalFlinkMiniCluster cluster = TestBaseUtils.startCluster(2, 4, false, false, true);
30
31
try {
32
// Use cluster for testing
33
TestEnvironment testEnv = new TestEnvironment(cluster, 4, false);
34
// ... run tests
35
} finally {
36
// Always clean up
37
TestBaseUtils.stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
38
}
39
```
40
41
## Result Reading and File I/O
42
43
### Reading Test Results
44
45
```java { .api }
46
// Get readers for result files
47
public static BufferedReader[] getResultReader(String resultPath) throws IOException;
48
public static BufferedReader[] getResultReader(String resultPath, String[] excludePrefixes,
49
boolean inOrderOfFiles) throws IOException;
50
51
// Get input streams for result files
52
public static BufferedInputStream[] getResultInputStream(String resultPath) throws IOException;
53
public static BufferedInputStream[] getResultInputStream(String resultPath, String[] excludePrefixes) throws IOException;
54
55
// Read all result lines into a list
56
public static void readAllResultLines(List<String> target, String resultPath) throws IOException;
57
public static void readAllResultLines(List<String> target, String resultPath, String[] excludePrefixes) throws IOException;
58
public static void readAllResultLines(List<String> target, String resultPath, String[] excludePrefixes,
59
boolean inOrderOfFiles) throws IOException;
60
```
61
62
**Usage Example:**
63
64
```java
65
// Read results from output directory
66
List<String> results = new ArrayList<>();
67
TestBaseUtils.readAllResultLines(results, "/path/to/output");
68
69
// Read results excluding log files
70
String[] excludePatterns = {"_logs", ".log"};
71
TestBaseUtils.readAllResultLines(results, "/path/to/output", excludePatterns);
72
73
// Process results
74
for (String line : results) {
75
System.out.println("Result: " + line);
76
}
77
```
78
79
## Result Comparison and Validation
80
81
### Text-based Result Comparison
82
83
```java { .api }
84
// Compare results line by line (order doesn't matter)
85
public static void compareResultsByLinesInMemory(String expectedResultStr, String resultPath) throws Exception;
86
public static void compareResultsByLinesInMemory(String expectedResultStr, String resultPath,
87
String[] excludePrefixes) throws Exception;
88
89
// Compare results with strict ordering
90
public static void compareResultsByLinesInMemoryWithStrictOrder(String expectedResultStr, String resultPath) throws Exception;
91
public static void compareResultsByLinesInMemoryWithStrictOrder(String expectedResultStr, String resultPath,
92
String[] excludePrefixes) throws Exception;
93
94
// Check results against regular expression
95
public static void checkLinesAgainstRegexp(String resultPath, String regexp);
96
```
97
98
**Usage Example:**
99
100
```java
101
// Compare unordered results
102
String expected = "apple\nbanana\ncherry";
103
TestBaseUtils.compareResultsByLinesInMemory(expected, "/path/to/results");
104
105
// Compare with strict ordering
106
TestBaseUtils.compareResultsByLinesInMemoryWithStrictOrder(expected, "/path/to/results");
107
108
// Validate format with regex
109
TestBaseUtils.checkLinesAgainstRegexp("/path/to/results", "\\d+,\\w+");
110
```
111
112
### Collection-based Result Comparison
113
114
```java { .api }
115
// Compare collected results as tuples
116
public static <T> void compareResultAsTuples(List<T> result, String expected);
117
118
// Compare collected results as text
119
public static <T> void compareResultAsText(List<T> result, String expected);
120
121
// Compare with strict ordering
122
public static <T> void compareOrderedResultAsText(List<T> result, String expected);
123
public static <T> void compareOrderedResultAsText(List<T> result, String expected, boolean asTuples);
124
125
// Check if results contain expected values
126
public static <T> void containsResultAsText(List<T> result, String expected);
127
128
// Generic comparison with custom comparator
129
public static <X> void compareResultCollections(List<X> expected, List<X> actual, Comparator<X> comparator);
130
```
131
132
**Usage Example:**
133
134
```java
135
// Test word count results
136
DataSet<Tuple2<String, Integer>> wordCounts = // ... your computation
137
List<Tuple2<String, Integer>> results = wordCounts.collect();
138
139
String expected = "apple,3\nbanana,1\ncherry,2";
140
TestBaseUtils.compareResultAsTuples(results, expected);
141
142
// Test simple string results
143
DataSet<String> words = // ... your computation
144
List<String> wordList = words.collect();
145
146
String expectedWords = "apple\nbanana\ncherry";
147
TestBaseUtils.compareResultAsText(wordList, expectedWords);
148
```
149
150
### Numerical Result Comparison
151
152
```java { .api }
153
// Compare key-value pairs with delta tolerance
154
public static void compareKeyValuePairsWithDelta(String expectedLines, String resultPath,
155
String delimiter, double maxDelta) throws Exception;
156
public static void compareKeyValuePairsWithDelta(String expectedLines, String resultPath, String[] excludePrefixes,
157
String delimiter, double maxDelta) throws Exception;
158
```
159
160
**Usage Example:**
161
162
```java
163
// Compare floating-point results with tolerance
164
String expected = "pi,3.14159\ne,2.71828";
165
TestBaseUtils.compareKeyValuePairsWithDelta(expected, "/path/to/results", ",", 0.001);
166
```
167
168
## Utility Methods
169
170
### Test Configuration
171
172
```java { .api }
173
// Convert configurations to parameterized test parameters
174
protected static Collection<Object[]> toParameterList(Configuration... testConfigs);
175
protected static Collection<Object[]> toParameterList(List<Configuration> testConfigs);
176
177
// Set environment variables for testing
178
public static void setEnv(Map<String, String> newenv);
179
```
180
181
**Usage Example:**
182
183
```java
184
// Create parameterized test configurations
185
Configuration config1 = new Configuration();
186
config1.setString("key1", "value1");
187
188
Configuration config2 = new Configuration();
189
config2.setString("key2", "value2");
190
191
@Parameterized.Parameters
192
public static Collection<Object[]> getConfigurations() {
193
return TestBaseUtils.toParameterList(config1, config2);
194
}
195
196
// Set test environment variables
197
Map<String, String> testEnv = new HashMap<>();
198
testEnv.put("FLINK_HOME", "/test/flink");
199
TestBaseUtils.setEnv(testEnv);
200
```
201
202
### Path and URL Utilities
203
204
```java { .api }
205
// Construct test resource paths
206
public static String constructTestPath(Class<?> forClass, String folder);
207
public static String constructTestURI(Class<?> forClass, String folder);
208
209
// Fetch content from HTTP endpoint
210
public static String getFromHTTP(String url) throws Exception;
211
```
212
213
**Usage Example:**
214
215
```java
216
// Get test data path relative to test class
217
String testDataPath = TestBaseUtils.constructTestPath(MyTest.class, "testdata");
218
String inputFile = testDataPath + "/input.txt";
219
220
// Construct test resource URI
221
String testDataURI = TestBaseUtils.constructTestURI(MyTest.class, "resources");
222
223
// Fetch test data from web endpoint (for integration tests)
224
String jsonData = TestBaseUtils.getFromHTTP("http://localhost:8080/test-data");
225
```
226
227
## Helper Classes
228
229
### TupleComparator
230
231
```java { .api }
232
public static class TupleComparator<T extends Tuple> implements Comparator<T> {
233
// Compares tuples field by field for consistent ordering
234
}
235
```
236
237
**Usage Example:**
238
239
```java
240
// Sort tuple results for consistent comparison
241
List<Tuple2<String, Integer>> results = wordCounts.collect();
242
results.sort(new TestBaseUtils.TupleComparator<>());
243
244
// Now compare with expected results
245
String expected = "apple,1\nbanana,2\ncherry,3";
246
TestBaseUtils.compareResultAsTuples(results, expected);
247
```
248
249
## Testing Utilities
250
251
### CheckedThread
252
253
Thread utility that captures any exception thrown in a background thread and rethrows it in the calling (test) thread when `sync()` is called.
254
255
```java { .api }
256
/**
257
* Thread that captures exceptions during execution and makes them available to the calling thread
258
*/
259
public abstract class CheckedThread extends Thread {
260
public CheckedThread();
261
public CheckedThread(String name);
262
263
// Main work method - implement this instead of run()
264
public abstract void go() throws Exception;
265
266
// Join thread and re-throw any exceptions that occurred
267
public void sync() throws Exception;
268
}
269
```
270
271
**Usage Example:**
272
273
```java
274
@Test
275
public void testConcurrentOperations() throws Exception {
276
CheckedThread worker = new CheckedThread("test-worker") {
277
@Override
278
public void go() throws Exception {
279
// This code runs in background thread
280
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
281
DataSet<String> result = env.fromElements("test").map(String::toUpperCase);
282
List<String> output = result.collect();
283
284
// Any exceptions thrown here will be propagated to main thread
285
assertEquals("Unexpected result", "TEST", output.get(0));
286
}
287
};
288
289
worker.start();
290
291
// This will throw any exception that occurred in the background thread
292
worker.sync();
293
}
294
```
295
296
### Retry Mechanisms
297
298
JUnit rules and annotations for automatically retrying failed tests.
299
300
#### RetryRule
301
302
```java { .api }
303
/**
304
* JUnit rule that enables automatic test retries based on annotations
305
*/
306
public class RetryRule implements TestRule {
307
public RetryRule();
308
public Statement apply(Statement base, Description description);
309
}
310
```
311
312
#### RetryOnFailure
313
314
```java { .api }
315
/**
316
* Annotation to retry tests that fail for any reason
317
*/
318
public @interface RetryOnFailure {
319
int times(); // Number of retry attempts
320
}
321
```
322
323
#### RetryOnException
324
325
```java { .api }
326
/**
327
* Annotation to retry tests that fail with specific exception types
328
*/
329
public @interface RetryOnException {
330
int times(); // Number of retry attempts
331
Class<? extends Throwable> exception(); // Exception type to retry on
332
}
333
```
334
335
**Usage Example:**
336
337
```java
338
public class FlakeyTest {
339
@Rule
340
public RetryRule retryRule = new RetryRule();
341
342
@Test
343
@RetryOnFailure(times = 3)
344
public void testUnstableOperation() throws Exception {
345
// Test that might fail due to timing issues
346
// Will be retried up to 3 times if it fails
347
performUnstableOperation();
348
}
349
350
@Test
351
@RetryOnException(times = 2, exception = ConnectException.class)
352
public void testNetworkOperation() throws Exception {
353
// Test that might fail due to network connectivity
354
// Will be retried up to 2 times if ConnectException occurs
355
connectToExternalService();
356
}
357
}
358
```
359
360
### TestLogger
361
362
Base class that provides automatic logging for test execution.
363
364
```java { .api }
365
/**
366
* Base test class with automatic test lifecycle logging
367
*/
368
public class TestLogger {
369
protected final Logger log; // Logger instance for test class
370
371
@Rule
372
public TestRule watchman; // Automatic test logging rule
373
}
374
```
375
376
**Usage Example:**
377
378
```java
379
public class MyTest extends TestLogger {
380
@Test
381
public void testWithLogging() {
382
// Logging is automatically available
383
log.info("Starting test execution");
384
385
// Test logic here
386
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
387
// ...
388
389
log.info("Test completed successfully");
390
// Test start/end automatically logged by watchman rule
391
}
392
}
393
```
394
395
### Common Test Utilities
396
397
```java { .api }
398
/**
399
* Common testing utilities for environment setup and validation
400
*/
401
public class CommonTestUtils {
402
// Environment utilities
403
public static String getTempDir();
404
public static void setEnv(Map<String, String> newenv);
405
public static void setEnv(Map<String, String> newenv, boolean clearPrevious);
406
407
// Serialization testing
408
public static <T extends Serializable> T createCopySerializable(T original);
409
410
// File utilities
411
public static String createTempFile(String contents);
412
413
// Threading utilities
414
public static void blockForeverNonInterruptibly();
415
416
// Environment checks
417
public static void assumeJava8();
418
419
// Exception utilities
420
public static boolean containsCause(Throwable throwable, Class<? extends Throwable> causeType);
421
}
422
```
423
424
**Usage Example:**
425
426
```java
427
@Test
428
public void testSerializationRoundtrip() throws Exception {
429
MySerializableClass original = new MySerializableClass("test");
430
431
// Test that object survives serialization/deserialization
432
MySerializableClass copy = CommonTestUtils.createCopySerializable(original);
433
assertEquals("Data should survive serialization", original.getData(), copy.getData());
434
}
435
436
@Test
437
public void testWithTempEnvironment() throws Exception {
438
// Set up temporary environment variables
439
Map<String, String> testEnv = new HashMap<>();
440
testEnv.put("TEST_MODE", "true");
441
testEnv.put("LOG_LEVEL", "DEBUG");
442
443
CommonTestUtils.setEnv(testEnv);
444
445
// Run test with modified environment
446
// NOTE: setEnv does not restore the previous environment automatically; reset it in test teardown if later tests depend on it
447
}
448
```
449
450
## Common Testing Patterns
451
452
### Complete Test Setup
453
454
```java
455
public class IntegrationTest {
456
private LocalFlinkMiniCluster cluster;
457
458
@Before
459
public void setup() throws Exception {
460
cluster = TestBaseUtils.startCluster(1, 4, false, false, true);
461
}
462
463
@After
464
public void teardown() throws Exception {
465
if (cluster != null) {
466
TestBaseUtils.stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
467
}
468
}
469
470
@Test
471
public void testJobExecution() throws Exception {
472
TestEnvironment testEnv = new TestEnvironment(cluster, 4, false);
473
testEnv.setAsContext();
474
475
try {
476
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
477
DataSet<String> result = env.fromElements("test").map(String::toUpperCase);
478
List<String> output = result.collect();
479
480
TestBaseUtils.compareResultAsText(output, "TEST");
481
} finally {
482
TestEnvironment.unsetAsContext();
483
}
484
}
485
}
486
```
487
488
### File-based Testing
489
490
```java
491
@Test
492
public void testFileProcessing() throws Exception {
493
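// Write test input (createTempFile(name, contents) and getTempDirPath(name) are assumed to be inherited from the test base class, e.g. AbstractTestBase; they are not part of TestBaseUtils)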
494
String inputPath = createTempFile("input.txt", "line1\nline2\nline3");
495
String outputPath = getTempDirPath("output");
496
497
// Run job
498
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
499
env.readTextFile(inputPath)
500
.map(String::toUpperCase)
501
.writeAsText(outputPath);
502
env.execute();
503
504
// Validate results
505
String expected = "LINE1\nLINE2\nLINE3";
506
TestBaseUtils.compareResultsByLinesInMemory(expected, outputPath);
507
}
508
```