npx @tessl/cli install tessl/maven-org-apache-flink--flink-test-utils@2.1.0
# Flink Test Utils

Comprehensive testing utilities for Apache Flink stream and batch processing applications. The library provides MiniCluster lifecycle management with JUnit integration, result verification utilities, metrics testing, specialized test connectors, and sample data sets. These components are intended for reuse across application tests, integration testing frameworks, performance benchmarks, and continuous integration pipelines.

## Package Information

- **Package Name**: flink-test-utils
- **Package Type**: maven
- **Language**: Java
- **Group ID**: org.apache.flink
- **Artifact ID**: flink-test-utils
- **Version**: 2.1.0
- **Installation**: Maven dependency in pom.xml:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-test-utils</artifactId>
    <version>2.1.0</version>
    <scope>test</scope>
</dependency>
```

## Core Imports

Basic test base classes:

```java
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.test.util.TestBaseUtils;
```

JUnit 5 extensions:

```java
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.test.junit5.InjectClusterClient;
```

Streaming test utilities:

```java
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.streaming.util.FiniteTestSource;
```

## Basic Usage

Setting up a basic Flink test with MiniCluster:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

public class MyFlinkTest {

    @RegisterExtension
    static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
            new MiniClusterExtension();

    @Test
    void testStreamingJob() throws Exception {
        // The extension registers its MiniCluster as the context environment,
        // so the standard factory method returns an environment bound to it.
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Create your streaming job
        env.fromElements(1, 2, 3, 4, 5)
                .map(x -> x * 2)
                .print();

        // Execute the job; print() writes the doubled values to stdout
        env.execute("Test Job");
    }
}
```

Verifying test results:

```java
import org.apache.flink.test.util.TestBaseUtils;
import java.util.Arrays;
import java.util.List;

// Compare collections
List<String> expected = Arrays.asList("a", "b", "c");
// getJobResults() is a placeholder for however your test collects job output
List<String> actual = getJobResults();
TestBaseUtils.compareResultCollections(expected, actual, String::compareTo);

// Compare with file output
// outputFilePath is a placeholder for the path the job wrote its results to
TestBaseUtils.compareResultsByLinesInMemory("a\nb\nc", outputFilePath);
```

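The comparator argument in the collection comparison suggests that both lists are brought into a common order before being checked, which matters because a parallel job may emit the same records in a different order on each run. The following is a minimal stdlib-only sketch of that idea, assuming this is the intent. `SortedCompareSketch` and `sameElements` are hypothetical names for illustration and are not part of flink-test-utils.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Illustration only: order-insensitive list comparison (not the library's code). */
final class SortedCompareSketch {

    /** Returns true when both lists hold the same elements, ignoring order. */
    static <T> boolean sameElements(
            List<T> expected, List<T> actual, Comparator<T> comparator) {
        if (expected.size() != actual.size()) {
            return false;
        }
        // Sort copies so the caller's lists are left untouched.
        List<T> sortedExpected = new ArrayList<>(expected);
        List<T> sortedActual = new ArrayList<>(actual);
        sortedExpected.sort(comparator);
        sortedActual.sort(comparator);

        // After sorting, equal multisets line up position by position.
        for (int i = 0; i < sortedExpected.size(); i++) {
            if (comparator.compare(sortedExpected.get(i), sortedActual.get(i)) != 0) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> expected = List.of("a", "b", "c");
        // Same records, different arrival order.
        List<String> reordered = List.of("c", "a", "b");
        // One record differs.
        List<String> different = List.of("a", "b", "d");

        System.out.println(sameElements(expected, reordered, String::compareTo)); // prints true
        System.out.println(sameElements(expected, different, String::compareTo)); // prints false
    }
}
```

The sketch returns a boolean so it can be run without a test framework; in a real test you would typically assert on the result instead.
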
## Architecture

The flink-test-utils library is organized around five components:

- **Test Base Classes**: Provide MiniCluster integration and temporary file management for both JUnit 4 and JUnit 5
- **Result Verification**: Comprehensive utilities for comparing results across various formats (text, tuples, collections)
- **Metrics Testing**: Fluent assertions and listeners for validating Flink metrics
- **Test Environments**: Specialized environments for streaming and batch testing with configurable execution modes
- **Sample Data**: Predefined datasets for common algorithm testing scenarios

This design enables thorough testing of complex distributed streaming applications while ensuring compatibility with Flink's runtime components and execution models.

## Capabilities

### MiniCluster Management

JUnit integration for managing Flink MiniCluster lifecycle with support for both JUnit 4 and JUnit 5, including parameter injection for cluster clients and configuration randomization.

```java { .api }
@RegisterExtension
static final MiniClusterExtension MINI_CLUSTER_EXTENSION = new MiniClusterExtension();

@Test
void testWithCluster(@InjectClusterClient ClusterClient<?> client) { }
```

[MiniCluster Management](./minicluster-management.md)

### Result Verification

Comprehensive utilities for comparing and validating test results across various formats including text files, collections, tuples, and numeric data with delta comparisons.

```java { .api }
public static <T> void compareResultCollections(
        List<T> expected, List<T> actual, Comparator<T> comparator);

public static void compareResultsByLinesInMemory(
        String expected, String resultPath);

public static void compareKeyValuePairsWithDelta(
        String expected, String resultPath, String delimiter, double maxDelta);
```

[Result Verification](./result-verification.md)

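A tolerance such as `maxDelta` is useful when results are floating-point values that rarely match expected numbers bit for bit. The sketch below illustrates the general idea of a key-value comparison with a delta using only the standard library. It assumes each line has the form `key<delimiter>value` with a numeric value, and it works on in-memory lines rather than a result path. `DeltaCompareSketch`, `parse`, and `matchesWithinDelta` are hypothetical names and do not reflect the library's implementation.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustration only: key-value comparison with a numeric tolerance. */
final class DeltaCompareSketch {

    /** Parses "key<delimiter>value" lines into a map of key to numeric value. */
    static Map<String, Double> parse(List<String> lines, String delimiter) {
        Map<String, Double> result = new HashMap<>();
        for (String line : lines) {
            int split = line.indexOf(delimiter);
            if (split < 0) {
                throw new IllegalArgumentException("Missing delimiter in line: " + line);
            }
            String key = line.substring(0, split);
            String value = line.substring(split + delimiter.length()).trim();
            result.put(key, Double.parseDouble(value));
        }
        return result;
    }

    /** Returns true when both sides have the same keys and every value differs by at most maxDelta. */
    static boolean matchesWithinDelta(
            List<String> expected, List<String> actual, String delimiter, double maxDelta) {
        Map<String, Double> expectedValues = parse(expected, delimiter);
        Map<String, Double> actualValues = parse(actual, delimiter);
        if (!expectedValues.keySet().equals(actualValues.keySet())) {
            return false;
        }
        for (Map.Entry<String, Double> entry : expectedValues.entrySet()) {
            double difference = Math.abs(entry.getValue() - actualValues.get(entry.getKey()));
            if (difference > maxDelta) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> expected = List.of("1 0.25", "2 0.75");
        // Same keys in a different order, values off by roughly 0.0001.
        List<String> actual = List.of("2 0.7501", "1 0.2499");

        System.out.println(matchesWithinDelta(expected, actual, " ", 0.001));   // prints true
        System.out.println(matchesWithinDelta(expected, actual, " ", 0.00001)); // prints false
    }
}
```

Keying the comparison on the first field makes it independent of line order, which is why the second list can be shuffled without affecting the result.
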
### Test Environments

Specialized execution environments for streaming and batch testing with multiple execution modes, object reuse configuration, and temporary file management.

```java { .api }
public class TestStreamEnvironment extends StreamExecutionEnvironment {
    public TestStreamEnvironment(MiniCluster miniCluster, int parallelism);
    public static void setAsContext(MiniCluster miniCluster, int parallelism);
    public JobExecutionResult getLastJobExecutionResult();
}
```

[Test Environments](./test-environments.md)

### Metrics Testing

Fluent assertions and metric listeners for validating Flink metrics including counters, gauges, histograms, and meters with easy retrieval and verification capabilities.

```java { .api }
public static CounterAssert assertThatCounter(Metric actual);
public static <T> GaugeAssert<T> assertThatGauge(Metric actual);

public class MetricListener implements MetricReporter {
    public <T extends Metric> Optional<T> getMetric(Class<T> metricType, String... identifier);
    public Optional<Counter> getCounter(String... identifier);
}
```

[Metrics Testing](./metrics-testing.md)

### Test Data Sources

Finite test sources for controlled data emission with checkpoint coordination and sample datasets for common algorithms like PageRank, K-means, and Connected Components.

```java { .api }
public class FiniteTestSource<T> implements SourceFunction<T> {
    public FiniteTestSource(T... elements);
    public FiniteTestSource(Iterable<T> elements);
    public FiniteTestSource(BooleanSupplier couldExit, Iterable<T> elements);
}
```

[Test Data Sources](./test-data-sources.md)

### Specialized Test Connectors

Test connectors for specific use cases including upsert operations testing with configurable serialization schemas and output file management.

```java { .api }
public static <IN> UpsertTestSinkBuilder<IN> builder();

public class UpsertTestSinkBuilder<IN> {
    public UpsertTestSinkBuilder<IN> setOutputFile(File outputFile);
    public UpsertTestSinkBuilder<IN> setKeySerializationSchema(SerializationSchema<IN> schema);
    public UpsertTestSink<IN> build();
}
```

[Specialized Test Connectors](./specialized-connectors.md)

### Validation Utilities

Utilities for POJO serialization verification, JAR packaging validation, resource discovery, and parameter property handling to ensure comprehensive testing coverage.

```java { .api }
public static <T> void assertSerializedAsPojo(Class<T> clazz);
public static void assertJarContainsOnlyFilesMatching(Path jarPath, Collection<String> allowedPaths);
public static Path getResource(String resourceNameRegex);
```

[Validation Utilities](./validation-utilities.md)
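
To make the idea of JAR packaging validation concrete, here is a minimal stdlib-only sketch that lists JAR entries falling outside a set of allowed paths. It assumes the allowed paths are treated as prefixes, which the signature above does not state. `JarContentsSketch`, `findUnexpectedEntries`, and `writeJar` are hypothetical names for illustration and are not part of flink-test-utils.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Enumeration;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.jar.JarOutputStream;

/** Illustration only: checking JAR contents against allowed path prefixes. */
final class JarContentsSketch {

    /** Returns the non-directory entries that start with none of the allowed prefixes. */
    static List<String> findUnexpectedEntries(Path jarPath, Collection<String> allowedPrefixes) {
        List<String> unexpected = new ArrayList<>();
        try (JarFile jar = new JarFile(jarPath.toFile())) {
            Enumeration<JarEntry> entries = jar.entries();
            while (entries.hasMoreElements()) {
                JarEntry entry = entries.nextElement();
                if (entry.isDirectory()) {
                    continue;
                }
                String name = entry.getName();
                if (allowedPrefixes.stream().noneMatch(name::startsWith)) {
                    unexpected.add(name);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return unexpected;
    }

    /** Writes a small temporary JAR with the given entry names, for demonstration only. */
    static Path writeJar(String... entryNames) {
        try {
            Path jarPath = Files.createTempFile("jar-contents-sketch", ".jar");
            try (OutputStream out = Files.newOutputStream(jarPath);
                    JarOutputStream jar = new JarOutputStream(out)) {
                for (String name : entryNames) {
                    jar.putNextEntry(new JarEntry(name));
                    jar.write(new byte[] {1});
                    jar.closeEntry();
                }
            }
            return jarPath;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path jar = writeJar("org/example/Job.class", "META-INF/NOTICE", "com/other/Leak.class");
        // Only entries under org/example/ and META-INF/ are allowed.
        System.out.println(findUnexpectedEntries(jar, List.of("org/example/", "META-INF/")));
        Files.delete(jar);
    }
}
```

Returning the offending entries rather than a boolean gives a test failure message that names exactly which files leaked into the artifact.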