Comprehensive testing infrastructure and utilities for Apache Flink stream processing framework
npx @tessl/cli install tessl/maven-org-apache-flink--flink-tests_2.11@1.8.00
# Flink Tests

The Flink Tests module provides comprehensive testing infrastructure and utilities for Apache Flink applications. This test-jar package offers data generators, test base classes, migration testing frameworks, and specialized utilities for testing stream processing applications with fault tolerance, state management, and performance validation.

## Package Information

- **Package Name**: flink-tests_2.11
- **Package Type**: maven
- **Language**: Java/Scala
- **Installation**:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-tests_2.11</artifactId>
    <version>1.8.3</version>
    <type>test-jar</type>
    <scope>test</scope>
</dependency>
```

## Core Imports

```java
// Test data generation
import org.apache.flink.test.operators.util.CollectionDataSets;
import org.apache.flink.test.operators.util.ValueCollectionDataSets;

// Input format utilities
import org.apache.flink.test.util.UniformIntTupleGeneratorInputFormat;
import org.apache.flink.test.util.InfiniteIntegerTupleInputFormat;
import org.apache.flink.test.util.InfiniteIntegerInputFormat;

// Migration testing
import org.apache.flink.test.checkpointing.utils.SavepointMigrationTestBase;
import org.apache.flink.test.checkpointing.utils.MigrationTestUtils;

// Streaming utilities and fault injection
import org.apache.flink.test.streaming.runtime.util.TestListResultSink;
import org.apache.flink.test.streaming.runtime.util.TestListWrapper;
import org.apache.flink.test.checkpointing.utils.FailingSource;
import org.apache.flink.test.checkpointing.utils.ValidatingSink;

// Test base classes
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase;
import org.apache.flink.test.recovery.SimpleRecoveryITCaseBase;

// State management testing
import org.apache.flink.test.state.operator.restore.AbstractOperatorRestoreTestBase;
import org.apache.flink.test.state.operator.restore.AbstractKeyedOperatorRestoreTestBase;
import org.apache.flink.test.state.operator.restore.AbstractNonKeyedOperatorRestoreTestBase;

// Test functions
import org.apache.flink.test.testfunctions.Tokenizer;
```

## Basic Usage

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.test.operators.util.CollectionDataSets;

// Create test data for batch processing tests
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple3<Integer, Long, String>> testData = CollectionDataSets.get3TupleDataSet(env);

// Use the data in your test; the lambda is a placeholder for your own transformation
testData.map(t -> t.f2).collect();
```

```java
import java.util.List;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.streaming.runtime.util.TestListResultSink;

// Collect results in streaming tests
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TestListResultSink<String> sink = new TestListResultSink<>();

// Placeholder input; substitute the stream under test
DataStream<String> dataStream = env.fromElements("a", "b", "c");
dataStream.addSink(sink);
env.execute();

List<String> results = sink.getResult();
```

## Architecture

The Flink Tests module is organized around several key components:

- **Data Generation**: Standardized test datasets for various data types and structures
- **Migration Framework**: Complete infrastructure for testing savepoint and checkpoint migration across Flink versions
- **Streaming Utilities**: Thread-safe result collection and streaming-specific test helpers
- **Test Base Classes**: Abstract base classes providing common test patterns for different scenarios
- **Fault Tolerance Testing**: Specialized sources and sinks for testing failure scenarios and recovery
- **State Management Testing**: Framework for validating operator state restoration and migration
- **Performance Testing**: Manual programs for benchmarking and scalability testing

## Capabilities

### Test Data Generation

Comprehensive data generators providing standardized test datasets, input format utilities, and test functions for DataSet and DataStream operations. Includes tuple data, POJO data, nested structures, custom types, infinite data generators, and common transformation functions.

```java { .api }
// 3-tuple dataset with 21 records
public static DataSet<Tuple3<Integer, Long, String>> get3TupleDataSet(ExecutionEnvironment env);

// Custom POJO dataset with 21 records
public static DataSet<CustomType> getCustomTypeDataSet(ExecutionEnvironment env);

// Small datasets for quick tests (3 records each)
public static DataSet<Tuple3<Integer, Long, String>> getSmall3TupleDataSet(ExecutionEnvironment env);

// Input format utilities for continuous data generation
public class UniformIntTupleGeneratorInputFormat extends GenericInputFormat<Tuple2<Integer, Integer>> {
    public UniformIntTupleGeneratorInputFormat(int numKeys, int numValsPerKey);
}

public class InfiniteIntegerInputFormat extends GenericInputFormat<Integer> {
    public InfiniteIntegerInputFormat(boolean addDelay);
}

// Test functions for common transformations
public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out);
}
```

[Test Data Generation](./test-data-generation.md)
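
To make the `Tokenizer` contract above more concrete, the following plain-Java sketch shows the kind of output a `FlatMapFunction<String, Tuple2<String, Integer>>` tokenizer produces. It assumes the test function lowercases its input and splits on non-word characters, as the usual Flink WordCount tokenizer does; that behavior is not confirmed by this document. `TokenizerSketch` and `tokenize` are hypothetical names, and `Map.Entry` stands in for Flink's `Tuple2` so the example runs without Flink on the classpath.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in, not part of flink-tests: mimics a WordCount-style
// tokenizer using Map.Entry instead of Flink's Tuple2.
final class TokenizerSketch {

    // Emits one (word, 1) pair per non-empty token.
    // Assumption: input is lowercased and split on runs of non-word characters.
    static List<Map.Entry<String, Integer>> tokenize(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String token : line.toLowerCase().split("\\W+")) {
            if (!token.isEmpty()) {
                out.add(new SimpleEntry<>(token, 1));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Prints the emitted pairs for a short sample sentence.
        System.out.println(tokenize("To be, or not to be"));
    }
}
```

In a real test, the module's `Tokenizer` would be passed to `flatMap` on a `DataSet<String>` or `DataStream<String>`, and the emitted pairs would typically be grouped by word and summed.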

### Migration Testing Framework

Complete framework for testing savepoint and checkpoint migration across different Flink versions. Provides base classes, utilities, and pre-configured sources/sinks for migration validation.

```java { .api }
public abstract class SavepointMigrationTestBase extends TestBaseUtils {
    protected String getResourceFilename(String filename);
    protected void executeAndSavepoint(StreamExecutionEnvironment env, String savepointPath,
        Tuple2<String, Integer>... expectedAccumulators) throws Exception;
    protected void restoreAndExecute(StreamExecutionEnvironment env, String savepointPath,
        Tuple2<String, Integer>... expectedAccumulators) throws Exception;
}
```

[Migration Testing Framework](./migration-testing.md)
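
Both `executeAndSavepoint` and `restoreAndExecute` take the accumulator values the job is expected to reach. One plausible reading is that the test polls the job's accumulators until every expectation is met or a deadline passes. The plain-Java sketch below illustrates only that waiting pattern. `AccumulatorWaitSketch`, `allReached`, `awaitAccumulators`, and the `Supplier`-based lookup are hypothetical, and treating "at least the expected value" as success is an assumption; the real base class queries a running Flink cluster.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical illustration of "wait until expected accumulators are reached".
final class AccumulatorWaitSketch {

    // True when every expected accumulator exists and has reached its target.
    // Assumption: reaching or exceeding the target counts as success.
    static boolean allReached(Map<String, Integer> current, Map<String, Integer> expected) {
        for (Map.Entry<String, Integer> e : expected.entrySet()) {
            Integer actual = current.get(e.getKey());
            if (actual == null || actual < e.getValue()) {
                return false;
            }
        }
        return true;
    }

    // Polls the supplier until the expectations hold or the timeout elapses.
    static boolean awaitAccumulators(Supplier<Map<String, Integer>> accumulators,
                                     Map<String, Integer> expected,
                                     long timeoutMillis,
                                     long pollMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (System.nanoTime() - deadline < 0) {
            if (allReached(accumulators.get(), expected)) {
                return true;
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return allReached(accumulators.get(), expected);
    }

    public static void main(String[] args) {
        Map<String, Integer> expected = new HashMap<>();
        expected.put("numElements", 3);
        int[] polls = {0};
        // Simulated job: the accumulator grows by one on every poll.
        boolean reached = awaitAccumulators(() -> {
            Map<String, Integer> snapshot = new HashMap<>();
            snapshot.put("numElements", ++polls[0]);
            return snapshot;
        }, expected, 2_000, 10);
        System.out.println("reached=" + reached);
    }
}
```

Waiting on accumulators rather than on job completion matters here because a savepoint can only be taken while the job is still running.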

### Streaming Test Utilities

Thread-safe utilities for collecting and validating results in streaming applications. Includes result sinks, test wrappers, fault injection sources, and streaming-specific helper functions.

```java { .api }
public class TestListResultSink<T> extends RichSinkFunction<T> {
    public List<T> getResult();
    public List<T> getSortedResult();
}

public class TestListWrapper {
    public static TestListWrapper getInstance();
    public int createList();
    public List<Object> getList(int listId);
}

public class FailingSource<T> extends RichSourceFunction<T> {
    public FailingSource(EventEmittingGenerator<T> generator, int failAfterElements);
    public static interface EventEmittingGenerator<T> extends Serializable {
        public void emitEvent(SourceContext<T> ctx, int eventSequenceNo);
    }
}

public class ValidatingSink<T> extends RichSinkFunction<T> {
    public ValidatingSink(ResultChecker<T> resultChecker, CountUpdater countUpdater);
    public static interface ResultChecker<T> extends Serializable {
        public boolean checkResult(T result);
    }
    public static interface CountUpdater extends Serializable {
        public void updateCount(long count);
    }
}
```

[Streaming Test Utilities](./streaming-utilities.md)
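
The fault-injection idea behind `FailingSource` (emit events through a generator callback, fail once after a given number of elements, then recover) can be sketched without Flink. The example below is a simplified illustration under stated assumptions, not the module's implementation: `FailOnceSourceSketch`, `EventGenerator`, `run`, and `runWithOneRestart` are hypothetical names, a `Consumer` replaces `SourceContext`, and checkpoint recovery is simulated by rolling a list back to a fixed offset.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration of a source that fails exactly once and is then replayed.
final class FailOnceSourceSketch {

    // Stand-in for a generator callback: emits the event with the given sequence number.
    interface EventGenerator<T> {
        void emitEvent(Consumer<T> out, int eventSequenceNo);
    }

    // Shared across attempts so the injected failure happens only on the first attempt.
    private static boolean hasFailed = false;

    // Emits events startAt..total-1; throws once after failAfterElements events were emitted.
    static <T> int run(EventGenerator<T> gen, Consumer<T> out,
                       int startAt, int total, int failAfterElements) {
        int emitted = startAt;
        while (emitted < total) {
            if (!hasFailed && emitted >= failAfterElements) {
                hasFailed = true;
                throw new RuntimeException("Artificial failure after " + emitted + " elements");
            }
            gen.emitEvent(out, emitted);
            emitted++;
        }
        return emitted;
    }

    // Runs the source; on the injected failure, restores the simulated checkpoint and replays.
    // Requires checkpointedOffset <= failAfterElements.
    static List<Integer> runWithOneRestart(int total, int failAfterElements, int checkpointedOffset) {
        hasFailed = false;
        List<Integer> sink = new ArrayList<>();
        EventGenerator<Integer> gen = (out, seq) -> out.accept(seq);
        try {
            run(gen, sink::add, 0, total, failAfterElements);
        } catch (RuntimeException injected) {
            // Roll the sink back to the checkpointed state, then resume from that offset.
            sink.subList(checkpointedOffset, sink.size()).clear();
            run(gen, sink::add, checkpointedOffset, total, failAfterElements);
        }
        return sink;
    }

    public static void main(String[] args) {
        // Fail after 5 elements, recover from a checkpoint taken at offset 3.
        System.out.println(runWithOneRestart(10, 5, 3));
    }
}
```

A validating sink then only needs to check that each element appears exactly once after recovery, which is the role `ResultChecker` and `CountUpdater` play for `ValidatingSink`.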

### Test Base Classes

Abstract base classes providing common patterns for different testing scenarios including cancellation testing, fault tolerance testing, and recovery testing.

```java { .api }
public abstract class StreamFaultToleranceTestBase extends TestLogger {
    public static final int NUM_TASK_MANAGERS = 2;
    public static final int NUM_TASK_SLOTS = 8;
    public static final int PARALLELISM = 4;

    public abstract void testProgram(StreamExecutionEnvironment env);
    public abstract void postSubmit() throws Exception;
}

public abstract class SimpleRecoveryITCaseBase extends TestLogger {
    protected abstract void executeRecoveryTest() throws Exception;
}
```

[Test Base Classes](./test-base-classes.md)

### Class Loading Test Programs

Complete programs for testing dynamic class loading, user code isolation, and class loading policies. Each program serves as a standalone test case for different class loading scenarios.

```java { .api }
public class StreamingProgram {
    public static void main(String[] args) throws Exception;
}

public class CheckpointedStreamingProgram {
    public static void main(String[] args) throws Exception;
}

public class KMeansForTest {
    public static void main(String[] args) throws Exception;
    public static class Point { /* 2D point representation */ }
    public static class Centroid extends Point { /* cluster center */ }
}
```

[Class Loading Test Programs](./class-loading-programs.md)

### State Management Testing

Framework for testing operator state restoration and migration, including utilities for both keyed and non-keyed state scenarios.

```java { .api }
public enum ExecutionMode {
    GENERATE, MIGRATE, RESTORE
}

public abstract class AbstractOperatorRestoreTestBase {
    protected abstract StreamExecutionEnvironment createMigrationJob(String savepointPath) throws Exception;
    protected abstract StreamExecutionEnvironment createRestoredJob(String savepointPath) throws Exception;
    protected abstract String getMigrationSavepointName();
}

public abstract class AbstractKeyedOperatorRestoreTestBase extends AbstractOperatorRestoreTestBase {
    // Specialized testing for keyed state operators
}

public abstract class AbstractNonKeyedOperatorRestoreTestBase extends AbstractOperatorRestoreTestBase {
    // Specialized testing for non-keyed state operators
}

public class KeyedJob {
    public static void main(String[] args) throws Exception;
}

public class NonKeyedJob {
    public static void main(String[] args) throws Exception;
}
```

[State Management Testing](./state-management-testing.md)

### Performance Testing Programs

Manual programs for performance benchmarking, scalability testing, and resource usage validation. These programs are designed for manual execution and analysis.

```java { .api }
public class MassiveStringSorting {
    public static void main(String[] args) throws Exception;
}

public class StreamingScalabilityAndLatency {
    public static void main(String[] args) throws Exception;
}

public class ReducePerformance {
    public static void main(String[] args) throws Exception;
}
```

[Performance Testing Programs](./performance-testing.md)

## Common Types

```java { .api }
// Test data POJO
public static class CustomType {
    public int myInt;
    public long myLong;
    public String myString;

    public CustomType();
    public CustomType(int i, long l, String s);
}

// Success indication exception
public class SuccessException extends RuntimeException {
    public SuccessException();
}

// Simple integer wrapper for testing
public class IntType {
    public int value;
    public IntType(int value);
}

// Test enum
public enum Category {
    CAT_A, CAT_B
}
```

## Error Handling

The testing framework includes specialized exceptions and error handling patterns:

- **SuccessException**: RuntimeException thrown to indicate successful test completion in scenarios where normal completion is not expected
- **Fault injection**: Controlled failure injection through FailingSource and related utilities
- **Validation failures**: Clear error reporting through ValidatingSink and migration test utilities
- **Timeout handling**: Configurable timeouts for long-running test operations

Tests should handle these exceptions appropriately and use the provided error injection mechanisms for fault tolerance testing.
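
The `SuccessException` pattern can be illustrated with a plain-Java sketch: a job that never finishes on its own throws a marker exception once the test condition is met, and the test searches the cause chain for that marker because the exception may arrive wrapped. Everything here is a hypothetical stand-in (`SuccessExceptionSketch`, the nested `SuccessException`, `containsSuccess`, `runEndlessJob`); the wrapping in `runEndlessJob` only mimics how a job client might report user-code failures.

```java
// Hypothetical illustration of ending an unbounded job with a success marker exception.
final class SuccessExceptionSketch {

    // Stand-in for the module's SuccessException.
    static final class SuccessException extends RuntimeException {
        private static final long serialVersionUID = 1L;
    }

    // Walks the cause chain, since the marker is usually wrapped by outer exceptions.
    static boolean containsSuccess(Throwable t) {
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            if (cur instanceof SuccessException) {
                return true;
            }
        }
        return false;
    }

    // Simulates an unbounded job that can only stop by throwing.
    static void runEndlessJob(int stopAfter) {
        try {
            for (int seen = 0; ; seen++) {
                if (seen == stopAfter) {
                    throw new SuccessException();
                }
            }
        } catch (RuntimeException e) {
            // Mimic the wrapping applied to user-code failures.
            throw new RuntimeException("Job execution failed", e);
        }
    }

    public static void main(String[] args) {
        try {
            runEndlessJob(1000);
            throw new AssertionError("job should not finish normally");
        } catch (RuntimeException e) {
            if (!containsSuccess(e)) {
                throw e; // a genuine failure: let the test fail
            }
            System.out.println("test passed");
        }
    }
}
```

Rethrowing anything that does not contain the marker keeps genuine failures visible instead of silently treating every job failure as success.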