# Core Testing and Synchronization

This section covers the essential testing utilities: thread synchronization, test assertions, and JUnit integration. Together they provide the foundation for deterministic, reliable Flink unit tests with richer assertions.

## Capabilities

### Thread Synchronization

Core synchronization primitives for coordinating test execution across multiple threads.

#### OneShotLatch

A synchronization latch that can be triggered once and lets any number of threads wait for that trigger event.

```java { .api }
/**
 * One-time synchronization latch for test coordination
 * Fires once only, then remains triggered until reset() is called
 */
class OneShotLatch {
    /** Fire the latch, releasing all waiting threads */
    void trigger();

    /** Wait until the latch is triggered */
    void await() throws InterruptedException;

    /** Wait until triggered; throws TimeoutException if the timeout expires first */
    void await(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException;

    /** Wait until triggered, converting InterruptedException to RuntimeException */
    void awaitQuietly();

    /** Check if the latch has been triggered */
    boolean isTriggered();

    /** Reset the latch to untriggered state */
    void reset();
}
```

**Usage Examples:**

```java
import org.apache.flink.core.testutils.OneShotLatch;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Basic synchronization
OneShotLatch latch = new OneShotLatch();

// Background task signals the test thread when it is done
Thread worker = new Thread(() -> {
    // Do some work
    latch.trigger(); // Signal completion
});
worker.start();
latch.await(); // Test thread waits for the worker to complete

// With timeout: the timed await() returns void and signals expiry by throwing
try {
    latch.await(5, TimeUnit.SECONDS);
} catch (TimeoutException e) {
    fail("Worker did not complete within timeout");
}
```
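To make the latch semantics concrete, here is a minimal sketch of how a one-shot latch could be built on a plain Java monitor. `SketchOneShotLatch` and `OneShotLatchDemo` are hypothetical names used only for illustration; this is not Flink's source, and it assumes that the timed wait reports expiry with a `TimeoutException`.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical stand-in for illustration; not Flink's OneShotLatch. */
final class SketchOneShotLatch {
    private final Object lock = new Object();
    private boolean triggered;

    /** Marks the latch as fired and wakes every waiting thread. */
    void trigger() {
        synchronized (lock) {
            triggered = true;
            lock.notifyAll();
        }
    }

    /** Blocks until the latch fires; the loop guards against spurious wakeups. */
    void await() throws InterruptedException {
        synchronized (lock) {
            while (!triggered) {
                lock.wait();
            }
        }
    }

    /** Blocks until the latch fires or the timeout elapses. */
    void await(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        synchronized (lock) {
            while (!triggered) {
                long remainingNanos = deadline - System.nanoTime();
                if (remainingNanos <= 0) {
                    throw new TimeoutException("latch was not triggered in time");
                }
                // Object.wait takes milliseconds; add 1 so we never call wait(0),
                // which would block without a time limit.
                lock.wait(remainingNanos / 1_000_000 + 1);
            }
        }
    }

    boolean isTriggered() {
        synchronized (lock) {
            return triggered;
        }
    }
}

final class OneShotLatchDemo {
    /** Returns true if a timed wait on an untriggered latch ends in a TimeoutException. */
    static boolean timesOutWhenNeverTriggered() {
        SketchOneShotLatch latch = new SketchOneShotLatch();
        try {
            latch.await(10, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Because the flag is never cleared by `await()`, a thread that arrives after `trigger()` returns immediately, which is what makes the latch safe against "signal sent before the waiter started" races in tests.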

#### MultiShotLatch

A reusable synchronization latch that automatically resets after each `await()` call.

```java { .api }
/**
 * Reusable synchronization latch that resets after each await
 * Useful for repeated coordination patterns
 */
class MultiShotLatch {
    /** Fire the latch for the next waiting thread */
    void trigger();

    /** Wait until triggered, then automatically reset */
    void await() throws InterruptedException;

    /** Check if the latch is currently triggered */
    boolean isTriggered();
}
```

**Usage Examples:**

```java
import org.apache.flink.core.testutils.MultiShotLatch;

MultiShotLatch latch = new MultiShotLatch();

// Producer-consumer pattern
Thread producer = new Thread(() -> {
    for (int i = 0; i < 10; i++) {
        // Produce item
        // Signal item ready; pair each trigger() with an await() if no signal may be lost
        latch.trigger();
    }
});

Thread consumer = new Thread(() -> {
    try {
        for (int i = 0; i < 10; i++) {
            latch.await(); // Wait for next item; the latch resets afterwards
            // Consume item
        }
    } catch (InterruptedException e) {
        // await() throws a checked exception that a Runnable cannot propagate
        Thread.currentThread().interrupt();
    }
});

producer.start();
consumer.start();
```
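The auto-reset behaviour can be sketched with a single boolean flag, as below. `SketchMultiShotLatch` and `MultiShotLatchDemo` are hypothetical names, and the flag-based design is an assumption made for illustration rather than a copy of Flink's implementation. Under that assumption, two `trigger()` calls with no `await()` in between collapse into one pending signal.

```java
/** Hypothetical stand-in for illustration; not Flink's MultiShotLatch. */
final class SketchMultiShotLatch {
    private final Object lock = new Object();
    private boolean triggered;

    /** Sets the pending signal. This is a flag, not a counter. */
    void trigger() {
        synchronized (lock) {
            triggered = true;
            lock.notifyAll();
        }
    }

    /** Waits for a pending signal, then consumes it so the latch can be reused. */
    void await() throws InterruptedException {
        synchronized (lock) {
            while (!triggered) {
                lock.wait();
            }
            triggered = false; // automatic reset
        }
    }

    boolean isTriggered() {
        synchronized (lock) {
            return triggered;
        }
    }
}

final class MultiShotLatchDemo {
    /** Triggers twice, awaits once, and reports whether a signal is still pending. */
    static boolean pendingAfterTwoTriggersAndOneAwait() {
        SketchMultiShotLatch latch = new SketchMultiShotLatch();
        latch.trigger();
        latch.trigger(); // collapses into the signal that is already pending
        try {
            latch.await(); // returns immediately and resets the flag
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return latch.isTriggered();
    }
}
```

With a flag-based latch, a producer that must not lose signals needs the consumer to acknowledge each item (for example with a second latch) before the next `trigger()`.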

#### CheckedThread

Thread wrapper that captures exceptions thrown during thread execution and re-throws them when the thread is joined.

```java { .api }
/**
 * Thread that catches exceptions and re-throws them on join
 * Useful for testing error conditions in background threads
 */
abstract class CheckedThread extends Thread {
    /** Override this method instead of run() - exceptions will be captured */
    abstract void go() throws Exception;

    /** Join the thread and re-throw any captured exceptions */
    void sync() throws Exception;

    /** Join with timeout and re-throw any captured exceptions */
    void sync(long timeoutMillis) throws Exception;
}
```

**Usage Examples:**

```java
import org.apache.flink.core.testutils.CheckedThread;

CheckedThread testThread = new CheckedThread() {
    @Override
    public void go() throws Exception {
        // Test logic that might throw exceptions
        if (someCondition) {
            throw new RuntimeException("Test failure");
        }
    }
};

testThread.start();
testThread.sync(); // Will re-throw any exceptions from go()
```
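The capture-and-rethrow idea can be sketched in a few lines of plain Java. `SketchCheckedThread` and `CheckedThreadDemo` are hypothetical names used for illustration only; the real class may differ in details such as timeout handling.

```java
/** Hypothetical stand-in for illustration; not Flink's CheckedThread. */
abstract class SketchCheckedThread extends Thread {
    private volatile Throwable error;

    /** Test logic goes here; anything it throws is remembered for sync(). */
    public abstract void go() throws Exception;

    @Override
    public final void run() {
        try {
            go();
        } catch (Throwable t) {
            error = t; // stored instead of being lost on the background thread
        }
    }

    /** Waits for the thread to finish, then rethrows whatever go() threw, if anything. */
    public void sync() throws Exception {
        join();
        Throwable t = error;
        if (t instanceof Exception) {
            throw (Exception) t;
        } else if (t instanceof Error) {
            throw (Error) t;
        } else if (t != null) {
            throw new RuntimeException(t);
        }
    }
}

final class CheckedThreadDemo {
    /** Returns the message of the exception that sync() surfaced, or null if none. */
    static String messageSeenBySync() {
        SketchCheckedThread thread = new SketchCheckedThread() {
            @Override
            public void go() {
                throw new IllegalStateException("boom");
            }
        };
        thread.start();
        try {
            thread.sync();
            return null;
        } catch (Exception e) {
            return e.getMessage();
        }
    }
}
```

Without this pattern, an exception on a background thread only reaches the thread's uncaught-exception handler, and the test that spawned it can pass even though the background logic failed.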

### Test Utilities

General utility methods for common testing scenarios and operations.

#### CommonTestUtils

Collection of static utility methods for common test operations.

```java { .api }
/**
 * General utility methods for unit tests
 */
class CommonTestUtils {
    /** Create a serialized copy of an object for testing serialization */
    static <T> T createCopySerializable(T original) throws IOException, ClassNotFoundException;

    /** Create a temporary file with the specified contents */
    static String createTempFile(String contents) throws IOException;

    /** Block the current thread permanently (for testing interruption) */
    static void blockForeverNonInterruptibly();

    /** Set environment variables for testing */
    static void setEnv(Map<String, String> newenv);

    /** Check if exception chain contains a specific cause type */
    static boolean containsCause(Throwable throwable, Class<? extends Throwable> cause);

    /** Wait until a condition becomes true or timeout expires */
    static void waitUtil(Supplier<Boolean> condition, Duration timeout, String errorMsg)
            throws Exception;
}
```

**Usage Examples:**

```java
import org.apache.flink.core.testutils.CommonTestUtils;

import java.io.IOException;
import java.time.Duration;

// Test serialization
MyObject original = new MyObject();
MyObject copy = CommonTestUtils.createCopySerializable(original);
assertEquals(original, copy);

// Wait for condition
CommonTestUtils.waitUtil(
    () -> service.isReady(),
    Duration.ofSeconds(10),
    "Service did not become ready"
);

// Check exception chain
try {
    riskyOperation();
    fail("Expected riskyOperation() to throw"); // otherwise the check below never runs
} catch (Exception e) {
    assertTrue(CommonTestUtils.containsCause(e, IOException.class));
}
```
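Two of these helpers are easy to picture as plain Java. The sketch below shows one way a polling wait and a cause-chain search could work; `SketchTestUtils`, `waitUntil`, and `WaitDemo` are hypothetical names, and the 1 ms polling interval is an arbitrary choice for illustration, not something taken from Flink.

```java
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/** Hypothetical stand-ins for illustration; not Flink's CommonTestUtils. */
final class SketchTestUtils {
    /** Polls the condition until it holds or the timeout elapses. */
    static void waitUntil(Supplier<Boolean> condition, Duration timeout, String errorMsg)
            throws TimeoutException, InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.get()) {
            if (System.nanoTime() - deadline >= 0) {
                throw new TimeoutException(errorMsg);
            }
            Thread.sleep(1); // polling interval chosen arbitrarily for the sketch
        }
    }

    /** Walks the getCause() links looking for an instance of the given type. */
    static boolean containsCause(Throwable throwable, Class<? extends Throwable> cause) {
        for (Throwable t = throwable; t != null; t = t.getCause()) {
            if (cause.isInstance(t)) {
                return true;
            }
        }
        return false;
    }
}

final class WaitDemo {
    /** Returns true if waiting on a condition that never holds ends in a timeout. */
    static boolean timesOutOnFalseCondition() {
        try {
            SketchTestUtils.waitUntil(() -> false, Duration.ofMillis(20), "never true");
            return false;
        } catch (TimeoutException e) {
            return "never true".equals(e.getMessage());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Polling with a deadline keeps a test from hanging forever when the condition never becomes true, while still returning as soon as it does.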

### Enhanced Assertions

Flink-specific assertion utilities that extend AssertJ with specialized testing capabilities.

#### FlinkAssertions

Static factory methods for creating Flink-specific assertions.

```java { .api }
/**
 * Enhanced AssertJ assertions for Flink testing
 */
class FlinkAssertions extends Assertions {
    /** Create enhanced assertions for CompletableFuture */
    static <T> FlinkCompletableFutureAssert<T> assertThatFuture(CompletableFuture<T> actual);

    /** Create assertions for exception cause chains */
    static ListAssert<Throwable> assertThatChainOfCauses(Throwable root);

    /** Extract chain of causes from exception */
    static Stream<Throwable> chainOfCauses(Throwable throwable);
}
```

#### FlinkCompletableFutureAssert

Enhanced `CompletableFuture` assertions that don't rely on timeouts.

```java { .api }
/**
 * Enhanced CompletableFuture assertions without timeout dependencies
 */
class FlinkCompletableFutureAssert<T>
        extends AbstractCompletableFutureAssert<FlinkCompletableFutureAssert<T>, CompletableFuture<T>, T> {

    /** Assert that the future eventually succeeds */
    FlinkCompletableFutureAssert<T> eventuallySucceeds();

    /** Assert that the future eventually fails */
    FlinkCompletableFutureAssert<T> eventuallyFails();

    /** Assert that the future completes with a specific value */
    FlinkCompletableFutureAssert<T> eventuallySucceeds(T expectedValue);
}
```

**Usage Examples:**

```java
import org.apache.flink.core.testutils.FlinkAssertions;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;

// Future assertions
CompletableFuture<String> result = asyncOperation();
FlinkAssertions.assertThatFuture(result)
    .eventuallySucceeds()
    .isEqualTo("expected result");

// Exception chain assertions
try {
    complexOperation();
} catch (Exception e) {
    FlinkAssertions.assertThatChainOfCauses(e)
        .hasSize(3)
        .extracting(Throwable::getClass)
        .contains(IOException.class, RuntimeException.class);
}
```
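The "chain of causes" that these assertions inspect is simply the throwable followed by each `getCause()` link. A minimal sketch of extracting it as a stream follows; `SketchCauseChain` is a hypothetical name, and the three-argument `Stream.iterate` used here requires Java 9 or later.

```java
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical stand-in for illustration; not Flink's FlinkAssertions. */
final class SketchCauseChain {
    /** Streams the throwable itself, then its cause, then that cause's cause, and so on. */
    static Stream<Throwable> chainOfCauses(Throwable throwable) {
        return Stream.iterate(throwable, t -> t != null, Throwable::getCause);
    }

    /** Collects the simple class names along the chain, outermost first. */
    static List<String> simpleNames(Throwable throwable) {
        return chainOfCauses(throwable)
                .map(t -> t.getClass().getSimpleName())
                .collect(Collectors.toList());
    }

    /** Builds a three-level chain and returns the class names found in it. */
    static List<String> demo() {
        Throwable failure = new RuntimeException("job failed",
                new IllegalStateException("task failed", new IOException("disk full")));
        return simpleNames(failure);
    }
}
```

A chain like the one in `demo()` has three elements, which is the kind of structure a `hasSize(3)` assertion on the cause chain would be checking.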

### JUnit Integration

Specialized JUnit rules and extensions for Flink testing scenarios.

#### RetryRule and Annotations

JUnit rule for automatically retrying failed tests with configurable conditions.

```java { .api }
/**
 * JUnit rule to retry failed tests
 * Use with @RetryOnFailure or @RetryOnException annotations
 */
class RetryRule implements TestRule {
    Statement apply(Statement base, Description description);
}

/**
 * Retry test on any failure
 */
@interface RetryOnFailure {
    /** Number of retry attempts */
    int times() default 3;
}

/**
 * Retry test on specific exception types
 */
@interface RetryOnException {
    /** Number of retry attempts */
    int times() default 3;

    /** Exception type to retry on */
    Class<? extends Throwable> exception();
}
```

**Usage Examples:**

```java
import org.apache.flink.testutils.junit.RetryRule;
import org.apache.flink.testutils.junit.RetryOnFailure;
import org.apache.flink.testutils.junit.RetryOnException;

import org.junit.Rule;
import org.junit.Test;

import java.io.IOException;

import static org.junit.Assert.fail;

public class FlinkRetryTest {
    @Rule
    public RetryRule retryRule = new RetryRule();

    @Test
    @RetryOnFailure(times = 5)
    public void testWithRetryOnAnyFailure() {
        // Test that might fail intermittently
        if (Math.random() < 0.5) {
            fail("Random failure");
        }
    }

    @Test
    @RetryOnException(times = 3, exception = IOException.class)
    public void testWithRetryOnSpecificException() throws IOException {
        // Test that might throw IOException
        if (networkUnavailable()) {
            throw new IOException("Network error");
        }
    }
}
```
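Conceptually, a retry rule wraps the test body in a loop. The sketch below shows that loop in isolation, without JUnit; `SketchRetry` and `RetryDemo` are hypothetical names, and treating `times` as the number of retries after the first attempt is an assumption made for this illustration, not a statement about `RetryRule`.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for illustration; not Flink's RetryRule. */
final class SketchRetry {
    /** Runs the body once, plus up to `times` retries when it throws the given type. */
    static <T> T retryOnException(
            int times, Class<? extends Throwable> retryOn, Callable<T> body) throws Exception {
        for (int attempt = 0; ; attempt++) {
            try {
                return body.call();
            } catch (Exception e) {
                // Give up on unrelated exceptions or once the retries are used up.
                if (!retryOn.isInstance(e) || attempt >= times) {
                    throw e;
                }
            }
        }
    }
}

final class RetryDemo {
    /** The body fails twice with IOException, then succeeds on the third attempt. */
    static int attemptsUntilSuccess() {
        AtomicInteger attempts = new AtomicInteger();
        try {
            Integer result = SketchRetry.<Integer>retryOnException(3, IOException.class, () -> {
                if (attempts.incrementAndGet() < 3) {
                    throw new IOException("transient");
                }
                return attempts.get();
            });
            return result;
        } catch (Exception e) {
            return -1;
        }
    }

    /** The body always fails; with 2 retries it runs 3 times before giving up. */
    static int attemptsBeforeGivingUp() {
        AtomicInteger attempts = new AtomicInteger();
        try {
            SketchRetry.<Void>retryOnException(2, IOException.class, () -> {
                attempts.incrementAndGet();
                throw new IOException("always fails");
            });
        } catch (Exception e) {
            // Expected: the last failure is propagated to the caller.
        }
        return attempts.get();
    }
}
```

Retrying hides flakiness rather than fixing it, so it tends to fit best for failures caused by the environment (ports, network) rather than by the code under test.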

#### SharedObjects

JUnit rule for sharing objects across test methods within a test class.

```java { .api }
/**
 * Share objects across test methods in the same test class
 */
class SharedObjects extends ExternalResource {
    /** Add an object to be shared across test methods */
    <T> SharedReference<T> add(T object);
}

/**
 * Reference to a shared object
 */
interface SharedReference<T> {
    T get();
}
```

**Usage Examples:**

```java
import org.apache.flink.testutils.junit.SharedObjects;
import org.apache.flink.testutils.junit.SharedReference;

import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

public class SharedObjectTest {
    @Rule
    public SharedObjects sharedObjects = new SharedObjects();

    private SharedReference<ExpensiveResource> resource;

    @Before
    public void setup() {
        if (resource == null) {
            resource = sharedObjects.add(new ExpensiveResource());
        }
    }

    @Test
    public void test1() {
        ExpensiveResource r = resource.get();
        // Use shared resource
    }

    @Test
    public void test2() {
        ExpensiveResource r = resource.get(); // Resolve the reference registered in setup()
        // Use shared resource
    }
}
```

### Executor Testing

Utilities for testing with controlled thread execution and scheduling.

#### ManuallyTriggeredScheduledExecutorService

Executor service that runs tasks only when the test triggers them, which makes execution order deterministic.

```java { .api }
/**
 * Manually controlled executor for deterministic testing
 * Tasks are queued but not executed until manually triggered
 */
class ManuallyTriggeredScheduledExecutorService implements ScheduledExecutorService {
    /** Trigger execution of all queued tasks */
    void triggerAll();

    /** Trigger execution of the next queued task */
    void triggerNext();

    /** Get number of queued tasks */
    int getQueueSize();

    /** Check if any tasks are queued */
    boolean hasQueuedTasks();
}
```

**Usage Examples:**

```java
import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;

import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;

ManuallyTriggeredScheduledExecutorService executor =
    new ManuallyTriggeredScheduledExecutorService();

// Schedule tasks
executor.schedule(() -> System.out.println("Task 1"), 1, TimeUnit.SECONDS);
executor.schedule(() -> System.out.println("Task 2"), 2, TimeUnit.SECONDS);

// Tasks are queued but not executed yet
assertEquals(2, executor.getQueueSize());

// Manually trigger execution
executor.triggerNext(); // Executes "Task 1"
executor.triggerAll();  // Executes remaining tasks
```
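The core idea of a manually triggered executor is a queue that is drained only on request. The following is a minimal sketch over the plain `Executor` interface; `SketchManualExecutor` and `ManualExecutorDemo` are hypothetical names, the sketch ignores delays and periodic scheduling, and it is not thread-safe.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Executor;

/** Hypothetical stand-in for illustration; not Flink's executor service. */
final class SketchManualExecutor implements Executor {
    private final Queue<Runnable> queue = new ArrayDeque<>();

    /** Queues the task instead of running it. */
    @Override
    public void execute(Runnable command) {
        queue.add(command);
    }

    /** Runs the oldest queued task on the calling thread. */
    void triggerNext() {
        Runnable next = queue.poll();
        if (next == null) {
            throw new IllegalStateException("no queued task");
        }
        next.run();
    }

    /** Runs queued tasks in submission order until the queue is empty. */
    void triggerAll() {
        Runnable next;
        while ((next = queue.poll()) != null) {
            next.run();
        }
    }

    int getQueueSize() {
        return queue.size();
    }
}

final class ManualExecutorDemo {
    /** Records the order in which events happen when the test controls execution. */
    static List<String> runInControlledOrder() {
        SketchManualExecutor executor = new SketchManualExecutor();
        List<String> log = new ArrayList<>();
        executor.execute(() -> log.add("task 1"));
        executor.execute(() -> log.add("task 2"));
        log.add("queued " + executor.getQueueSize());
        executor.triggerNext();
        log.add("after first");
        executor.triggerAll();
        return log;
    }
}
```

Because every task runs on the test thread at a point the test chooses, assertions can be placed between tasks without sleeps or latches.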

### Test Marker Annotations

Annotations for marking tests that have known compatibility issues with specific environments.

```java { .api }
/** Mark tests that fail on Java 11 */
@interface FailsOnJava11 {}

/** Mark tests that fail on Java 17 */
@interface FailsOnJava17 {}

/** Mark tests that fail with adaptive scheduler */
@interface FailsWithAdaptiveScheduler {}
```

**Usage Examples:**

```java
import org.apache.flink.testutils.junit.FailsOnJava11;
import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler;

import org.junit.Test;

public class CompatibilityTest {
    @Test
    @FailsOnJava11
    public void testThatFailsOnJava11() {
        // Test implementation
    }

    @Test
    @FailsWithAdaptiveScheduler
    public void testThatFailsWithAdaptiveScheduler() {
        // Test implementation
    }
}