Testing utilities for Apache Flink stream processing framework providing test environments, base classes and sample data
npx @tessl/cli install tessl/maven-org-apache-flink--flink-test-utils_2-10@1.3.00
# Flink Test Utils

Flink Test Utils is a testing library for Apache Flink applications. It provides test environments, base classes, utilities, and sample data for unit and integration testing of both streaming and batch programs.

## Package Information

- **Package Name**: flink-test-utils_2.10
- **Package Type**: Maven
- **Language**: Java (with Scala 2.10 support)
- **Installation**:
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-test-utils_2.10</artifactId>
    <version>1.3.3</version>
    <scope>test</scope>
</dependency>
```

## Core Imports

```java
import org.apache.flink.test.util.*;
import org.apache.flink.streaming.util.*;
import org.apache.flink.test.testdata.*;
```

## Basic Usage

```java
import java.util.List;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.apache.flink.test.util.TestBaseUtils;

// Basic test setup
public class MyFlinkTest extends JavaProgramTestBase {
    @Override
    protected void testProgram() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Your test logic here
        DataSet<String> input = env.fromElements("hello", "world");
        DataSet<String> result = input.map(s -> s.toUpperCase());

        // Collect and verify results; expected lines are separated by a newline
        List<String> resultList = result.collect();
        TestBaseUtils.compareResultAsText(resultList, "HELLO\nWORLD");
    }
}
```

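Results collected from a parallel job do not arrive in a guaranteed order, so text comparisons in tests usually sort both sides before comparing. The sketch below illustrates that idea in plain Java. `UnorderedTextComparison` and `sameLines` are hypothetical names introduced here, and the sorting behavior is an assumption about how such a comparison can work, not the library's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in, not part of flink-test-utils.
class UnorderedTextComparison {

    // Returns true when both sides contain the same lines, ignoring order.
    static <T> boolean sameLines(List<T> result, String expected) {
        List<String> actual = new ArrayList<>();
        for (T element : result) {
            actual.add(String.valueOf(element));
        }
        List<String> wanted = new ArrayList<>(Arrays.asList(expected.split("\n")));
        Collections.sort(actual);
        Collections.sort(wanted);
        return actual.equals(wanted);
    }

    public static void main(String[] args) {
        // A parallel job may emit "WORLD" before "HELLO"; the check still passes.
        System.out.println(sameLines(Arrays.asList("WORLD", "HELLO"), "HELLO\nWORLD"));
        // A missing line makes the check fail.
        System.out.println(sameLines(Arrays.asList("HELLO"), "HELLO\nWORLD"));
    }
}
```

When element order matters, a strict-order comparison is the better fit, since sorting hides ordering bugs.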
## Architecture

The library is organized into four main areas:

- **Test Environments**: Execution environments for running Flink jobs in test contexts
- **Test Base Classes**: Abstract base classes providing standardized test patterns and cluster management
- **Test Utilities**: Helper classes for result validation, cluster management, and security testing
- **Sample Test Data**: Pre-built datasets for common algorithms and testing scenarios

## Capabilities

### Test Environment Management

Specialized execution environments that run Flink batch and streaming jobs on a local mini cluster during tests.

```java { .api }
// Batch job testing environment
public class TestEnvironment extends ExecutionEnvironment {
    public TestEnvironment(LocalFlinkMiniCluster miniCluster, int parallelism, boolean isObjectReuseEnabled);
    public JobExecutionResult execute(String jobName) throws Exception;
    public static void setAsContext(LocalFlinkMiniCluster miniCluster, int parallelism);
}

// Streaming job testing environment
public class TestStreamEnvironment extends StreamExecutionEnvironment {
    public TestStreamEnvironment(LocalFlinkMiniCluster miniCluster, int parallelism);
    public JobExecutionResult execute(String jobName) throws Exception;
    public static void setAsContext(LocalFlinkMiniCluster cluster, int parallelism);
}
```

[Test Environments](./test-environments.md)

### Test Base Classes

Abstract base classes that provide standardized testing patterns, cluster lifecycle management, and parameterized testing across multiple execution modes.

```java { .api }
// Base class for batch program tests
public abstract class JavaProgramTestBase extends AbstractTestBase {
    public JavaProgramTestBase();
    protected abstract void testProgram() throws Exception;
    public void setParallelism(int parallelism);
}

// Base class for streaming program tests
public abstract class StreamingProgramTestBase extends AbstractTestBase {
    protected abstract void testProgram() throws Exception;
    public void setParallelism(int parallelism);
}

// Base class for multiple program tests
public class MultipleProgramsTestBase extends TestBaseUtils {
    public MultipleProgramsTestBase(TestExecutionMode mode);
}
```

[Test Base Classes](./test-base-classes.md)

### Test Utilities and Result Validation

Utilities for cluster management, result comparison, test data handling, and other specialized testing tasks.

```java { .api }
public class TestBaseUtils extends TestLogger {
    // Cluster management
    public static LocalFlinkMiniCluster startCluster(int numTaskManagers, int taskManagerNumSlots,
                                                     boolean startWebserver, boolean startZooKeeper,
                                                     boolean singleActorSystem) throws Exception;

    // Result validation and comparison
    public static void compareResultsByLinesInMemory(String expectedResultStr, String resultPath) throws Exception;
    public static void compareResultsByLinesInMemoryWithStrictOrder(String expectedResultStr, String resultPath) throws Exception;
    public static <T> void compareResultAsTuples(List<T> result, String expected);
    public static <T> void compareResultAsText(List<T> result, String expected);
    public static void compareKeyValuePairsWithDelta(String expectedLines, String resultPath, String delimiter, double maxDelta) throws Exception;
    public static void checkLinesAgainstRegexp(String resultPath, String regexp);

    // File I/O utilities
    public static BufferedReader[] getResultReader(String resultPath) throws IOException;
    public static void readAllResultLines(List<String> target, String resultPath) throws IOException;

    // Utility methods
    public static String constructTestPath(Class<?> forClass, String folder);
    public static String getFromHTTP(String url) throws Exception;
}

// Specialized testing utilities
public abstract class CheckedThread extends Thread {
    public abstract void go() throws Exception;
    public void sync() throws Exception;
}

public class RetryRule implements TestRule {
    // JUnit rule for automatic test retries
}

public class CommonTestUtils {
    public static <T extends Serializable> T createCopySerializable(T original);
    public static void setEnv(Map<String, String> newenv);
}
```

[Test Utilities](./test-utilities.md)

### Security Testing

Support for testing Flink applications with security features enabled, including Kerberos authentication via MiniKDC.

```java { .api }
public class SecureTestEnvironment {
    public static void prepare(TemporaryFolder tempFolder);
    public static void cleanup();
    public static Configuration populateFlinkSecureConfigurations(Configuration flinkConf);
}

public class TestingSecurityContext {
    public static void install(SecurityConfiguration config,
                               Map<String, ClientSecurityConfiguration> clientSecurityConfigurationMap) throws Exception;
}
```

[Security Testing](./security-testing.md)

### Sample Test Data

Pre-built datasets and validation utilities for common Flink algorithms and testing scenarios.

```java { .api }
// PageRank test data
public class PageRankData {
    public static final String VERTICES;
    public static final String EDGES;
    public static final String RANKS_AFTER_3_ITERATIONS;
}

// K-Means clustering test data
public class KMeansData {
    public static final String DATAPOINTS;
    public static final String INITIAL_CENTERS;
    public static void checkResultsWithDelta(String expectedResults, List<String> resultLines, double maxDelta);
}
```

[Sample Test Data](./test-data.md)

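Iterative algorithms such as K-Means and PageRank produce floating-point results, which is why methods like `checkResultsWithDelta` and `compareKeyValuePairsWithDelta` accept a `maxDelta` tolerance. The sketch below shows one way a tolerance-based check on key/value lines can work. It is an illustration under assumptions: `DeltaComparison`, `parse`, and `matchesWithDelta` are hypothetical names, the input is taken as strings rather than read from a result path, and the line format (`key<delimiter>value`) is assumed.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;

// Hypothetical helper, not part of flink-test-utils.
class DeltaComparison {

    // Parses "key<delimiter>value" lines into a key -> numeric value map.
    static Map<String, Double> parse(String lines, String delimiter) {
        Map<String, Double> values = new TreeMap<>();
        for (String line : lines.split("\n")) {
            String[] fields = line.split(Pattern.quote(delimiter), 2);
            values.put(fields[0], Double.parseDouble(fields[1].trim()));
        }
        return values;
    }

    // Keys must match exactly; values may differ by at most maxDelta.
    static boolean matchesWithDelta(String expectedLines, String resultLines,
                                    String delimiter, double maxDelta) {
        Map<String, Double> expected = parse(expectedLines, delimiter);
        Map<String, Double> actual = parse(resultLines, delimiter);
        if (!expected.keySet().equals(actual.keySet())) {
            return false;
        }
        for (Map.Entry<String, Double> entry : expected.entrySet()) {
            double difference = Math.abs(entry.getValue() - actual.get(entry.getKey()));
            if (difference > maxDelta) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Small rounding differences pass; line order does not matter here.
        System.out.println(matchesWithDelta("a 1.0\nb 2.0", "b 2.004\na 0.999", " ", 0.01));
        // A difference larger than the tolerance fails.
        System.out.println(matchesWithDelta("a 1.0", "a 1.5", " ", 0.01));
    }
}
```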
## Common Types

```java { .api }
// Test execution modes
public enum TestExecutionMode {
    CLUSTER,
    CLUSTER_OBJECT_REUSE,
    COLLECTION
}

// Security configuration for testing
public static class ClientSecurityConfiguration {
    public String getPrincipal();
    public String getKeytab();
    public ClientSecurityConfiguration(String principal, String keytab);
}

// Test annotations for retry mechanisms
public @interface RetryOnFailure {
    int times();
}

public @interface RetryOnException {
    int times();
    Class<? extends Throwable> exception();
}

// Local Flink cluster for testing
public class LocalFlinkMiniCluster {
    // Mini cluster used by all test environments
}

// Duration utilities
public class FiniteDuration {
    // Timeout specifications for cluster operations
}
```
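
The retry annotations carry a `times` value that bounds how often a failing test is re-run. As a rough illustration of that control flow, the sketch below retries a task in plain Java. It assumes `times` counts retries after the first attempt, so the body runs at most `times + 1` times. The exact counting used by the library is not stated in this document, and `RetrySketch`, `runWithRetries`, and `attemptsUntilSuccess` are hypothetical names.

```java
import java.util.concurrent.Callable;

// Hypothetical illustration of retry-on-failure control flow.
class RetrySketch {

    // Runs the body once, then retries up to `times` more times on failure.
    static <T> T runWithRetries(int times, Callable<T> body) throws Exception {
        if (times < 0) {
            throw new IllegalArgumentException("times must not be negative");
        }
        Exception last = null;
        for (int attempt = 0; attempt <= times; attempt++) {
            try {
                return body.call();
            } catch (Exception e) {
                last = e; // keep the most recent failure
            }
        }
        throw last;
    }

    // Simulates a task that fails `failures` times before succeeding.
    // Returns the number of calls made, or -1 if retries were exhausted.
    static int attemptsUntilSuccess(int failures, int times) {
        int[] calls = {0};
        try {
            runWithRetries(times, () -> {
                calls[0]++;
                if (calls[0] <= failures) {
                    throw new IllegalStateException("flaky");
                }
                return null;
            });
            return calls[0];
        } catch (Exception e) {
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println(attemptsUntilSuccess(2, 3)); // prints 3
        System.out.println(attemptsUntilSuccess(5, 3)); // prints -1
    }
}
```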