Comprehensive testing utilities for the Apache Flink stream processing framework
npx @tessl/cli install tessl/maven-org-apache-flink--flink-test-utils-parent@2.1.0
# Apache Flink Test Utils Parent

Apache Flink Test Utils Parent is a comprehensive collection of testing utilities for applications built on the Apache Flink stream processing framework. This multi-module Maven project provides everything needed to test Flink applications effectively, from basic unit testing with synchronization utilities to complex connector testing, migration testing, and specialized testing for different Flink components.

## Package Information

- **Package Name**: flink-test-utils-parent
- **Package Type**: Maven (multi-module parent)
- **Language**: Java
- **Installation**:

```xml
11
<!-- Core testing utilities -->
12
<dependency>
13
<groupId>org.apache.flink</groupId>
14
<artifactId>flink-test-utils</artifactId>
15
<version>2.1.0</version>
16
<scope>test</scope>
17
</dependency>
18
19
<!-- JUnit integration -->
20
<dependency>
21
<groupId>org.apache.flink</groupId>
22
<artifactId>flink-test-utils-junit</artifactId>
23
<version>2.1.0</version>
24
<scope>test</scope>
25
</dependency>
26
27
<!-- Connector testing framework -->
28
<dependency>
29
<groupId>org.apache.flink</groupId>
30
<artifactId>flink-connector-test-utils</artifactId>
31
<version>2.1.0</version>
32
<scope>test</scope>
33
</dependency>
34
35
<!-- Migration testing -->
36
<dependency>
37
<groupId>org.apache.flink</groupId>
38
<artifactId>flink-migration-test-utils</artifactId>
39
<version>2.1.0</version>
40
<scope>test</scope>
41
</dependency>
42
43
<!-- Client testing -->
44
<dependency>
45
<groupId>org.apache.flink</groupId>
46
<artifactId>flink-clients-test-utils</artifactId>
47
<version>2.1.0</version>
48
<scope>test</scope>
49
</dependency>
50
51
<!-- Table filesystem testing -->
52
<dependency>
53
<groupId>org.apache.flink</groupId>
54
<artifactId>flink-table-filesystem-test-utils</artifactId>
55
<version>2.1.0</version>
56
<scope>test</scope>
57
</dependency>
58
```

## Core Imports

```java
63
// Core test synchronization (flink-test-utils-junit)
64
import org.apache.flink.core.testutils.OneShotLatch;
65
import org.apache.flink.core.testutils.CheckedThread;
66
import org.apache.flink.test.junit5.MiniClusterExtension;
67
68
// Test utilities (flink-test-utils)
69
import org.apache.flink.streaming.util.TestStreamEnvironment;
70
import org.apache.flink.streaming.util.FiniteTestSource;
71
import org.apache.flink.streaming.util.TestListResultSink;
72
import org.apache.flink.util.MetricListener;
73
import org.apache.flink.util.MetricAssertions;
74
75
// JUnit 5 integration (flink-test-utils-junit)
76
import org.apache.flink.testutils.junit.extensions.TestLoggerExtension;
77
import org.apache.flink.testutils.junit.extensions.RetryExtension;
78
import org.apache.flink.core.testutils.FlinkAssertions;
79
import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
80
81
// Connector testing framework (flink-connector-test-utils)
82
import org.apache.flink.connector.testframe.junit.extensions.ConnectorTestingExtension;
83
import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
84
import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase;
85
import org.apache.flink.connector.testframe.environment.TestEnvironment;
86
import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
87
88
// Migration testing (flink-migration-test-utils)
89
import org.apache.flink.test.migration.MigrationTest;
90
import org.apache.flink.test.migration.PublishedVersionUtils;
91
92
// Client testing (flink-clients-test-utils)
93
import org.apache.flink.client.testjar.TestUserClassLoaderJob;
94
95
// Table filesystem testing (flink-table-filesystem-test-utils)
96
import org.apache.flink.table.file.testutils.TestFileSystemTableFactory;
97
import org.apache.flink.table.file.testutils.catalog.TestFileSystemCatalog;
98
```

## Basic Usage

```java
103
import org.apache.flink.core.testutils.OneShotLatch;
104
import org.apache.flink.streaming.util.TestStreamEnvironment;
105
import org.apache.flink.streaming.util.FiniteTestSource;
106
import org.apache.flink.streaming.util.TestListResultSink;
107
import org.apache.flink.test.junit5.MiniClusterExtension;
108
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
109
import org.junit.jupiter.api.Test;
110
import org.junit.jupiter.api.extension.RegisterExtension;
111
112
public class FlinkTestExample {
113
@RegisterExtension
114
static final MiniClusterExtension MINI_CLUSTER = new MiniClusterExtension(
115
new MiniClusterResourceConfiguration.Builder()
116
.setNumberTaskManagers(1)
117
.setNumberSlotsPerTaskManager(4)
118
.build());
119
120
@Test
121
public void testWithSynchronization() throws InterruptedException {
122
OneShotLatch latch = new OneShotLatch();
123
124
// Start background task
125
Thread worker = new Thread(() -> {
126
// Do some work
127
latch.trigger(); // Signal completion
128
});
129
worker.start();
130
131
// Wait for completion
132
latch.await();
133
worker.join();
134
}
135
136
@Test
137
public void testStreamingPipeline() throws Exception {
138
// Set up test environment
139
TestStreamEnvironment.setAsContext(MINI_CLUSTER.getMiniCluster(), 1);
140
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
141
142
// Create test source with finite data
143
FiniteTestSource<String> source = new FiniteTestSource<>("hello", "world", "test");
144
DataStream<String> stream = env.addSource(source);
145
146
// Create test sink to collect results
147
TestListResultSink<String> sink = new TestListResultSink<>();
148
stream.addSink(sink);
149
150
// Execute and verify results
151
env.execute("Test Job");
152
List<String> results = sink.getResult();
153
assertEquals(3, results.size());
154
assertTrue(results.contains("hello"));
155
}
156
}
157
```

## Architecture

Flink Test Utils Parent is organized into six specialized modules. Together they cover the following areas of testing functionality:

- **Core Testing Foundation**: Basic synchronization primitives, assertion utilities, and JUnit extensions
- **Test Environment Management**: Stream environments, test data sources, and execution contexts
- **Connector Testing Framework**: Comprehensive testing infrastructure for Flink connectors
- **Migration & Compatibility**: Tools for testing state migration and version compatibility
- **Specialized Testing**: Table API filesystem testing and client application testing
- **Test Data & Utilities**: Pre-built test datasets and utility functions

## Capabilities

### Core Testing and Synchronization

Essential testing utilities, including thread synchronization primitives, test assertions, and JUnit integration. These provide the foundation for reliable Flink unit tests.
175
176
```java { .api }
177
// Thread synchronization (flink-test-utils-junit)
178
class OneShotLatch {
179
void trigger();
180
void await() throws InterruptedException;
181
boolean await(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException;
182
void awaitQuietly();
183
boolean isTriggered();
184
void reset();
185
}
186
187
abstract class CheckedThread extends Thread {
188
abstract void go() throws Exception;
189
void sync() throws Exception;
190
void sync(long timeoutMillis) throws Exception;
191
}
192
193
// Enhanced assertions (flink-test-utils-junit)
194
class FlinkAssertions {
195
static <T> FlinkCompletableFutureAssert<T> assertThatFuture(CompletableFuture<T> actual);
196
static Stream<Throwable> chainOfCauses(Throwable throwable);
197
}
198
199
class FlinkCompletableFutureAssert<T> extends AbstractCompletableFutureAssert<FlinkCompletableFutureAssert<T>, CompletableFuture<T>, T> {
200
FlinkCompletableFutureAssert<T> eventuallySucceeds();
201
FlinkCompletableFutureAssert<T> eventuallyFails();
202
FlinkCompletableFutureAssert<T> eventuallySucceeds(T expectedValue);
203
}
204
205
// Manual executor (flink-test-utils-junit)
206
class ManuallyTriggeredScheduledExecutorService implements ScheduledExecutorService {
207
void triggerAll();
208
void triggerScheduledTasks();
209
void triggerPeriodicScheduledTasks();
210
Collection<ScheduledTask<?>> getScheduledTasks();
211
int getNumQueuedRunnables();
212
}
213
```

[Core Testing and Synchronization](./core-testing.md)

### Test Environments and Data Sources

Test execution environments, data sources, and utilities for creating controlled testing scenarios in Flink applications.
220
221
```java { .api }
222
// Test environments (flink-test-utils)
223
class TestStreamEnvironment extends StreamExecutionEnvironment {
224
TestStreamEnvironment(MiniCluster miniCluster, int parallelism);
225
TestStreamEnvironment(MiniCluster miniCluster, Configuration config, int parallelism,
226
Collection<Path> jarFiles, Collection<URL> classPaths);
227
static void setAsContext(MiniCluster miniCluster, int parallelism);
228
static void setAsContext(MiniCluster miniCluster, int parallelism,
229
Collection<Path> jarFiles, Collection<URL> classpaths);
230
static void unsetAsContext();
231
JobExecutionResult getLastJobExecutionResult();
232
}
233
234
// MiniCluster extension (flink-test-utils)
235
class MiniClusterExtension implements BeforeAllCallback, AfterAllCallback, BeforeEachCallback,
236
AfterEachCallback, ParameterResolver {
237
MiniClusterExtension(MiniClusterResourceConfiguration configuration);
238
ClusterClient<?> getClusterClient();
239
URI getRestAddress();
240
MiniCluster getMiniCluster();
241
int getNumberSlots();
242
}
243
244
// Test data sources (flink-test-utils)
245
class FiniteTestSource<T> implements SourceFunction<T>, CheckpointListener {
246
FiniteTestSource(T... elements);
247
FiniteTestSource(Iterable<T> elements);
248
FiniteTestSource(BooleanSupplier couldExit, Iterable<T> elements);
249
void run(SourceContext<T> ctx) throws Exception;
250
void cancel();
251
void notifyCheckpointComplete(long checkpointId) throws Exception;
252
}
253
254
// Test sinks (flink-test-utils)
255
class TestListResultSink<T> extends RichSinkFunction<T> {
256
TestListResultSink();
257
void invoke(T value) throws Exception;
258
List<T> getResult();
259
List<T> getSortedResult();
260
}
261
```

[Test Environments and Data Sources](./test-environments.md)

### Connector Testing Framework

Comprehensive testing framework for Flink connectors with support for external systems, multiple test environments, and automated test suites.
268
269
```java { .api }
270
// Test framework extension (flink-connector-test-utils)
271
@ExtendWith(ConnectorTestingExtension.class)
272
class ConnectorTestingExtension implements TestTemplateInvocationContextProvider {
273
boolean supportsTestTemplate(ExtensionContext context);
274
Stream<TestTemplateInvocationContext> provideTestTemplateInvocationContexts(ExtensionContext context);
275
}
276
277
// Test environment interfaces (flink-connector-test-utils)
278
interface TestEnvironment extends TestResource {
279
JobExecutionResult executeJob(JobGraph job) throws Exception;
280
ClusterClient<?> getClusterClient();
281
String getRestAddress();
282
String getWebUIUrl();
283
}
284
285
interface TestResource extends AutoCloseable {
286
void startUp() throws Exception;
287
void tearDown() throws Exception;
288
}
289
290
// Test environment implementations (flink-connector-test-utils)
291
class MiniClusterTestEnvironment implements TestEnvironment, ClusterControllable {
292
MiniClusterTestEnvironment();
293
MiniClusterTestEnvironment(MiniClusterConfiguration config);
294
static Builder builder();
295
296
static class Builder {
297
Builder setParallelism(int parallelism);
298
Builder setCheckpointingEnabled(boolean enabled);
299
Builder setCheckpointInterval(Duration interval);
300
MiniClusterTestEnvironment build();
301
}
302
}
303
304
// Test suite base classes (flink-connector-test-utils)
305
abstract class SourceTestSuiteBase<T, SplitT extends SourceSplit> {
306
@TestTemplate void testSourceReading(TestEnvironment testEnv, ExternalContext<DataStreamSourceExternalContext<T>> externalContext);
307
@TestTemplate void testTaskManagerFailover(TestEnvironment testEnv, ExternalContext<DataStreamSourceExternalContext<T>> externalContext);
308
@TestTemplate void testJobManagerFailover(TestEnvironment testEnv, ExternalContext<DataStreamSourceExternalContext<T>> externalContext);
309
protected abstract ExternalContext<DataStreamSourceExternalContext<T>> sourceExternalContext();
310
protected abstract Source<T, SplitT, ?> source();
311
}
312
313
abstract class SinkTestSuiteBase<T> {
314
@TestTemplate void testSinkWriteWithSingleSubtask(TestEnvironment testEnv, ExternalContext<DataStreamSinkV2<T>> externalContext);
315
@TestTemplate void testSinkWriteWithMultipleSubtasks(TestEnvironment testEnv, ExternalContext<DataStreamSinkV2<T>> externalContext);
316
@TestTemplate void testScaleUpSinkWriter(TestEnvironment testEnv, ExternalContext<DataStreamSinkV2<T>> externalContext);
317
@TestTemplate void testScaleDownSinkWriter(TestEnvironment testEnv, ExternalContext<DataStreamSinkV2<T>> externalContext);
318
protected abstract ExternalContext<DataStreamSinkV2<T>> sinkExternalContext();
319
protected abstract List<T> generateTestData(TestingSinkSettings sinkSettings, ExternalContext<DataStreamSinkV2<T>> externalContext);
320
}
321
```

[Connector Testing Framework](./connector-testing.md)

### Migration and Compatibility Testing

Utilities for testing state migration between Flink versions and ensuring compatibility across version upgrades.
328
329
```java { .api }
330
// Migration testing (flink-migration-test-utils)
331
interface MigrationTest {
332
static FlinkVersion getMostRecentlyPublishedVersion();
333
334
@SnapshotsGenerator
335
@interface SnapshotsGenerator {}
336
337
@ParameterizedSnapshotsGenerator
338
@interface ParameterizedSnapshotsGenerator {
339
String value();
340
}
341
}
342
343
// Version utilities (flink-migration-test-utils)
344
class PublishedVersionUtils {
345
static FlinkVersion getMostRecentlyPublishedVersion();
346
static List<FlinkVersion> getPublishedVersions();
347
}
348
349
class SnapshotGeneratorUtils {
350
static void generateSnapshots(Class<?> testClass, FlinkVersion flinkVersion, String targetDir) throws Exception;
351
}
352
```

[Migration and Compatibility Testing](./migration-testing.md)

### Table API and Filesystem Testing

Specialized testing utilities for Flink Table API applications and filesystem-based connectors.
359
360
```java { .api }
361
// Filesystem table testing (flink-table-filesystem-test-utils)
362
class TestFileSystemTableFactory extends FileSystemTableFactory {
363
static final String IDENTIFIER = "test-filesystem";
364
String factoryIdentifier();
365
DynamicTableSource createDynamicTableSource(Context context);
366
DynamicTableSink createDynamicTableSink(Context context);
367
}
368
369
class TestFileSystemCatalog extends GenericInMemoryCatalog {
370
TestFileSystemCatalog(String catalogName, String defaultDatabaseName, String basePath);
371
static boolean isFileSystemTable(Map<String, String> options);
372
void createDatabase(String databaseName, CatalogDatabase database, boolean ignoreIfExists) throws Exception;
373
void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) throws Exception;
374
}
375
376
class TestFileSystemCatalogFactory implements CatalogFactory {
377
static final String IDENTIFIER = "test-filesystem-catalog";
378
String factoryIdentifier();
379
Catalog createCatalog(Context context);
380
}
381
```

[Table API and Filesystem Testing](./table-testing.md)

### Client Application Testing

Testing utilities specifically designed for Flink client applications, including classloader testing and job submission scenarios.
388
389
```java { .api }
390
// Client testing utilities (flink-clients-test-utils)
391
class TestUserClassLoaderJob {
392
static void main(String[] args) throws Exception;
393
static void executeWithCustomClassLoader(ClassLoader classLoader) throws Exception;
394
static boolean verifyClassLoading(String className, ClassLoader expectedLoader);
395
static JobExecutionResult getLastExecutionResult();
396
}
397
398
class TestUserClassLoaderAdditionalArtifact {
399
static void loadArtifact(String artifactPath, ClassLoader classLoader) throws Exception;
400
static boolean isArtifactAvailable(String artifactName, ClassLoader classLoader);
401
static ArtifactMetadata getArtifactMetadata(String artifactPath);
402
static List<String> resolveDependencies(String artifactPath);
403
}
404
405
class TestUserClassLoaderJobLib {
406
static void someLibMethod();
407
}
408
```

[Client Application Testing](./client-testing.md)

## Types

```java { .api }
415
// Core synchronization types (flink-test-utils-junit)
416
class OneShotLatch {
417
void trigger();
418
void await() throws InterruptedException;
419
boolean await(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException;
420
void awaitQuietly();
421
boolean isTriggered();
422
void reset();
423
}
424
425
abstract class CheckedThread extends Thread {
426
CheckedThread(String name);
427
abstract void go() throws Exception;
428
void sync() throws Exception;
429
Throwable getError();
430
}
431
432
// Test assertion types (flink-test-utils-junit)
433
class FlinkCompletableFutureAssert<T> extends AbstractCompletableFutureAssert<FlinkCompletableFutureAssert<T>, CompletableFuture<T>, T> {
434
FlinkCompletableFutureAssert<T> eventuallySucceeds();
435
FlinkCompletableFutureAssert<T> eventuallyFails();
436
FlinkCompletableFutureAssert<T> eventuallySucceeds(T expectedValue);
437
}
438
439
// Test environment interfaces (flink-connector-test-utils)
440
interface TestEnvironment extends TestResource {
441
JobExecutionResult executeJob(JobGraph job) throws Exception;
442
ClusterClient<?> getClusterClient();
443
String getRestAddress();
444
String getWebUIUrl();
445
}
446
447
interface TestResource extends AutoCloseable {
448
void startUp() throws Exception;
449
void tearDown() throws Exception;
450
void close() throws Exception;
451
}
452
453
interface ExternalContext<T> extends AutoCloseable {
454
void setUp() throws Exception;
455
void tearDown() throws Exception;
456
Properties getConnectionProperties();
457
String generateTestId();
458
}
459
460
// Test data interfaces (flink-connector-test-utils)
461
interface ExternalSystemDataReader<T> extends AutoCloseable {
462
List<T> readData() throws Exception;
463
List<T> readData(Duration timeout) throws Exception;
464
List<T> readData(Predicate<T> filter) throws Exception;
465
void close() throws Exception;
466
}
467
468
interface ExternalSystemSplitDataWriter<T> extends AutoCloseable {
469
void writeSplit(List<T> data, int splitIndex) throws Exception;
470
void writeAndFinalize(List<List<T>> splits) throws Exception;
471
int getMaxParallelism();
472
void close() throws Exception;
473
}
474
475
// Test source/sink types (flink-test-utils)
476
class FiniteTestSource<T> implements SourceFunction<T>, CheckpointListener {
477
FiniteTestSource(T... elements);
478
FiniteTestSource(Iterable<T> elements);
479
FiniteTestSource(BooleanSupplier couldExit, Iterable<T> elements);
480
FiniteTestSource(BooleanSupplier couldExit, long waitTimeOut, Iterable<T> elements);
481
void run(SourceContext<T> ctx) throws Exception;
482
void cancel();
483
void notifyCheckpointComplete(long checkpointId) throws Exception;
484
void notifyCheckpointAborted(long checkpointId);
485
}
486
487
class TestListResultSink<T> extends RichSinkFunction<T> {
488
TestListResultSink();
489
void invoke(T value) throws Exception;
490
List<T> getResult();
491
List<T> getSortedResult();
492
}
493
494
// Metric testing types (flink-test-utils)
495
class MetricListener {
496
MetricListener();
497
MetricGroup getMetricGroup();
498
<T extends Metric> Optional<T> getMetric(Class<T> metricType, String... identifier);
499
Optional<Meter> getMeter(String... identifier);
500
Optional<Counter> getCounter(String... identifier);
501
Optional<Histogram> getHistogram(String... identifier);
502
<T> Optional<Gauge<T>> getGauge(String... identifier);
503
}
504
505
// Migration testing types (flink-migration-test-utils)
506
interface MigrationTest {
507
static FlinkVersion getMostRecentlyPublishedVersion();
508
}
509
510
class FlinkVersion implements Comparable<FlinkVersion> {
511
int getMajor();
512
int getMinor();
513
int getPatch();
514
String toString();
515
boolean isNewerThan(FlinkVersion other);
516
int compareTo(FlinkVersion other);
517
}
518
519
// Client testing types (flink-clients-test-utils)
520
class ArtifactMetadata {
521
String getName();
522
String getVersion();
523
List<String> getDependencies();
524
Map<String, String> getManifestAttributes();
525
}
526
```