Comprehensive test library for Apache Flink stream processing framework with integration tests, test utilities, and end-to-end validation tests.
npx @tessl/cli install tessl/maven-org-apache-flink--flink-tests@2.1.0
# Apache Flink Tests

Apache Flink Tests is a test library providing reusable testing infrastructure, utilities, and frameworks for testing Apache Flink stream processing functionality. The module is packaged as a test-jar, which makes its test utilities available to other Flink modules for validating streaming, batch processing, state management, and fault tolerance features.

## Package Information

- **Package Name**: flink-tests
- **Package Type**: maven
- **Language**: Java
- **Installation**: Add as test-jar dependency in Maven projects
- **Coordinates**: `org.apache.flink:flink-tests:2.1.0`

## Core Imports

```java
// Test utilities and base classes
import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase;
import org.apache.flink.test.checkpointing.utils.MigrationTestUtils;
import org.apache.flink.test.recovery.SimpleRecoveryITCaseBase;
import org.apache.flink.test.util.JobGraphRunningUtil;

// Test data and operators
import org.apache.flink.test.operators.util.CollectionDataStreams;
import org.apache.flink.runtime.operators.lifecycle.graph.TestJobBuilders;
```

Maven dependency (test-jar):

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-tests</artifactId>
    <version>2.1.0</version>
    <type>test-jar</type>
    <scope>test</scope>
</dependency>
```

## Basic Usage

```java
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase;
import org.apache.flink.test.operators.util.CollectionDataStreams;
import org.junit.jupiter.api.Test; // assumes JUnit 5; use org.junit.Test with JUnit 4

// Example: Using standard test data
public class MyFlinkTest {

    @Test
    public void testWithStandardData() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Tuple3<Integer, Long, String>> testData =
            CollectionDataStreams.get3TupleDataSet(env);

        // Use test data in your Flink job: keep the first tuple field, then filter
        DataStream<Integer> result = testData
            .map(t -> t.f0)
            .filter(x -> x > 5);

        // Add sink and execute
        result.print();
        env.execute("Test Job");
    }
}

// Example: Testing migration scenarios
public class MigrationTest extends SnapshotMigrationTestBase {

    @Test
    public void testStateMigration() throws Exception {
        // Create and execute job with state.
        // createJobWithOperatorState() and createUpdatedJob() are placeholders
        // for job-building helpers that the test class itself would define.
        JobGraph job = createJobWithOperatorState();
        SnapshotSpec snapshot = executeAndSnapshot(job);

        // Restore and validate in new version
        JobGraph restoredJob = createUpdatedJob();
        restoreAndExecute(restoredJob, snapshot);
    }
}
```

## Architecture

Apache Flink Tests is organized around several key testing frameworks:

- **Test Infrastructure**: Base classes and utilities for setting up test environments
- **Migration Framework**: Comprehensive support for testing snapshot migration across Flink versions
- **Fault Tolerance Framework**: Failure injection mechanisms and recovery testing utilities
- **Operator Lifecycle Framework**: Complete framework for testing streaming operator behavior and lifecycle events
- **State Management Framework**: Extensive support for testing state backends, checkpointing, and state migration
- **Test Data Framework**: Standardized datasets, POJOs, and data generators for consistent testing across modules

## Capabilities

### Checkpointing and Migration Testing

Comprehensive framework for testing snapshot migration across Flink versions with utilities for state validation and checkpoint management.

```java { .api }
public abstract class SnapshotMigrationTestBase {
    protected SnapshotSpec executeAndSnapshot(JobGraph job) throws Exception;
    protected void restoreAndExecute(JobGraph job, SnapshotSpec snapshot) throws Exception;

    public static class SnapshotSpec {
        public String getSnapshotPath();
        public String getSnapshotVersion();
    }
}
```

[Checkpointing and Migration](./checkpointing-migration.md)

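The create-then-verify pattern behind migration tests can be illustrated without Flink at all. The following is a minimal sketch under stated assumptions: `SnapshotModeSketch`, its `run` method, and the plain-text file format are hypothetical stand-ins for illustration, not part of flink-tests. Only the `CREATE_SNAPSHOT` and `VERIFY_SNAPSHOT` mode names are taken from the `ExecutionMode` type listed in this document.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class SnapshotModeSketch {

    /** Local stand-in; mirrors the mode names of the ExecutionMode type in this document. */
    enum ExecutionMode { CREATE_SNAPSHOT, VERIFY_SNAPSHOT }

    /**
     * Hypothetical helper: in CREATE_SNAPSHOT mode, persists the state to a file;
     * in VERIFY_SNAPSHOT mode, reads the file back and compares it to the expected state.
     */
    static boolean run(ExecutionMode mode, Path snapshotFile, long counterState) throws IOException {
        switch (mode) {
            case CREATE_SNAPSHOT:
                Files.write(snapshotFile,
                        Long.toString(counterState).getBytes(StandardCharsets.UTF_8));
                return true;
            case VERIFY_SNAPSHOT:
                String stored = new String(Files.readAllBytes(snapshotFile), StandardCharsets.UTF_8);
                return Long.parseLong(stored.trim()) == counterState;
            default:
                throw new IllegalArgumentException("unknown mode: " + mode);
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("snapshot-sketch", ".txt");
        try {
            // Phase 1: typically run once with the "old" version to produce the snapshot.
            run(ExecutionMode.CREATE_SNAPSHOT, file, 42L);
            // Phase 2: run with the "new" version to check that the state is still readable.
            System.out.println(run(ExecutionMode.VERIFY_SNAPSHOT, file, 42L));
        } finally {
            Files.deleteIfExists(file);
        }
    }
}
```

In a real migration test the two phases would run against different Flink versions and the snapshot would be a savepoint or checkpoint rather than a text file; the sketch only shows why the mode switch exists.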
### Fault Tolerance and Recovery Testing

Multiple failure injection mechanisms and recovery testing utilities for validating Flink's fault tolerance capabilities.

```java { .api }
public abstract class SimpleRecoveryITCaseBase {
    protected void runAndCancelJob(JobGraph jobGraph) throws Exception;

    public static class FailingMapper1 implements MapFunction<Integer, Integer>;
    public static class FailingMapper2 implements MapFunction<Integer, Integer>;
}
```

[Fault Tolerance and Recovery](./fault-tolerance-recovery.md)

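Failure injection usually means a function that fails deliberately on the first attempt and succeeds once the job is restarted. The sketch below illustrates that idea with plain JDK types. `FailOnceSketch`, `FailOnceMapper`, and `runWithRestarts` are hypothetical names; the mapper implements `java.util.function.Function` rather than Flink's `MapFunction`, and the retry loop is only a stand-in for a restart strategy. It is not the implementation of `FailingMapper1` or `FailingMapper2`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

public class FailOnceSketch {

    /** Hypothetical failure-injecting mapper: throws exactly once, after failAfter records. */
    static final class FailOnceMapper implements Function<Integer, Integer> {
        private final AtomicBoolean hasFailed; // shared across attempts, survives "restarts"
        private final int failAfter;
        private int seen = 0;

        FailOnceMapper(AtomicBoolean hasFailed, int failAfter) {
            this.hasFailed = hasFailed;
            this.failAfter = failAfter;
        }

        @Override
        public Integer apply(Integer value) {
            seen++;
            // Fail only if the threshold is passed and no earlier attempt has failed yet.
            if (seen > failAfter && hasFailed.compareAndSet(false, true)) {
                throw new IllegalStateException("injected failure after " + failAfter + " records");
            }
            return value * 2;
        }
    }

    /** Runs the mapper over the input, restarting from scratch after a failure. */
    static List<Integer> runWithRestarts(List<Integer> input, int maxAttempts) {
        AtomicBoolean hasFailed = new AtomicBoolean(false);
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            FailOnceMapper mapper = new FailOnceMapper(hasFailed, 2);
            List<Integer> output = new ArrayList<>();
            try {
                for (Integer value : input) {
                    output.add(mapper.apply(value));
                }
                return output;
            } catch (IllegalStateException e) {
                // Partial output is discarded; the next attempt reprocesses all input.
            }
        }
        throw new IllegalStateException("job failed after " + maxAttempts + " attempts");
    }

    public static void main(String[] args) {
        List<Integer> input = new ArrayList<>();
        for (int i = 1; i <= 4; i++) {
            input.add(i);
        }
        System.out.println(runWithRestarts(input, 2));
    }
}
```

With only one allowed attempt the injected failure is not recovered and `runWithRestarts` throws, which is the behavior a recovery test would assert for a job without a restart strategy.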
### Operator Lifecycle Testing

Complete framework for testing streaming operator behavior including startup, checkpointing, finishing, and shutdown phases.

```java { .api }
public class TestJobBuilders {
    public static final TestJobBuilder SIMPLE_GRAPH_BUILDER;
    public static final TestJobBuilder COMPLEX_GRAPH_BUILDER;
}

public class OneInputTestStreamOperator extends AbstractStreamOperator<TestDataElement>
    implements OneInputStreamOperator<TestDataElement, TestDataElement>;

public class TestEventQueue {
    public void add(TestEvent event);
    public List<TestEvent> getEvents();
}
```

[Operator Lifecycle Testing](./operator-lifecycle.md)

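Lifecycle tests typically collect events from operators into a queue and then assert on their order. The following sketch shows that pattern with local stand-in types: the `TestEvent` interface and the `add`/`getEvents` methods mirror the signatures listed above, while `LifecycleEventSketch`, `LabeledEvent`, `EventQueue`, `simulateLifecycle`, and `happensBefore` are hypothetical names introduced only for illustration. No Flink classes are used.

```java
import java.util.ArrayList;
import java.util.List;

public class LifecycleEventSketch {

    /** Local stand-in mirroring the TestEvent shape listed in this document. */
    interface TestEvent {
        String getOperatorId();
        long getTimestamp();
    }

    /** Hypothetical event carrying a kind label such as "STARTED". */
    static final class LabeledEvent implements TestEvent {
        final String kind;
        private final String operatorId;
        private final long timestamp;

        LabeledEvent(String kind, String operatorId, long timestamp) {
            this.kind = kind;
            this.operatorId = operatorId;
            this.timestamp = timestamp;
        }

        @Override
        public String getOperatorId() {
            return operatorId;
        }

        @Override
        public long getTimestamp() {
            return timestamp;
        }
    }

    /** Stand-in queue: thread-safe append plus a defensive copy on read. */
    static final class EventQueue {
        private final List<TestEvent> events = new ArrayList<>();

        synchronized void add(TestEvent event) {
            events.add(event);
        }

        synchronized List<TestEvent> getEvents() {
            return new ArrayList<>(events);
        }
    }

    /** Simulates one operator emitting its lifecycle events and returns the recorded kinds. */
    static List<String> simulateLifecycle(String operatorId) {
        EventQueue queue = new EventQueue();
        String[] phases = {
            "STARTED", "CHECKPOINT_STARTED", "CHECKPOINT_COMPLETED", "INPUT_ENDED", "FINISHED"
        };
        long timestamp = 0;
        for (String phase : phases) {
            queue.add(new LabeledEvent(phase, operatorId, timestamp++));
        }
        List<String> kinds = new ArrayList<>();
        for (TestEvent event : queue.getEvents()) {
            kinds.add(((LabeledEvent) event).kind);
        }
        return kinds;
    }

    /** True if both kinds were recorded and the first occurrence of `first` precedes `second`. */
    static boolean happensBefore(List<String> kinds, String first, String second) {
        int i = kinds.indexOf(first);
        int j = kinds.indexOf(second);
        return i >= 0 && j >= 0 && i < j;
    }

    public static void main(String[] args) {
        List<String> kinds = simulateLifecycle("op-1");
        System.out.println(happensBefore(kinds, "STARTED", "FINISHED"));
    }
}
```

Asserting on relative order rather than exact positions keeps such tests stable when additional events, for example extra checkpoints, appear between the phases of interest.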
### State Backend and Operator Restore Testing

Framework for testing state backend switching, operator restore scenarios, and state migration validation.

```java { .api }
public abstract class AbstractOperatorRestoreTestBase {
    protected void testRestore() throws Exception;
}

public abstract class SavepointStateBackendSwitchTestBase {
    protected void testSwitchingStateBackend() throws Exception;
}
```

[State Backend and Restore Testing](./state-backend-restore.md)

### Standardized Test Data and Utilities

Reusable datasets, POJOs, and runtime utilities for consistent testing across Flink modules.

```java { .api }
public class CollectionDataStreams {
    public static DataStreamSource<Tuple3<Integer, Long, String>> get3TupleDataSet(StreamExecutionEnvironment env);
    public static DataStreamSource<Tuple3<Integer, Long, String>> getSmall3TupleDataSet(StreamExecutionEnvironment env);
}

public class JobGraphRunningUtil {
    public static void execute(JobGraph jobGraph, MiniCluster miniCluster) throws Exception;
}
```

[Test Data and Utilities](./test-data-utilities.md)

### Cancellation Testing Framework

Framework for testing job cancellation scenarios and cleanup behavior.

```java { .api }
public abstract class CancelingTestBase {
    protected void runAndCancelJob(JobGraph jobGraph, long cancelAfterMs) throws Exception;
}
```

[Cancellation Testing](./cancellation-testing.md)

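The run-then-cancel-after-a-delay idea can be sketched with the JDK's `ExecutorService`. This is an illustration under stated assumptions, not how `CancelingTestBase` is implemented: `CancelAfterSketch` and its `runAndCancel` method are hypothetical, and a `Runnable` stands in for a `JobGraph` submitted to a cluster.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CancelAfterSketch {

    /**
     * Hypothetical helper: starts the job, waits up to cancelAfterMs, then cancels it.
     * Returns true if the job had to be cancelled, false if it finished on its own.
     */
    static boolean runAndCancel(Runnable job, long cancelAfterMs) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<?> future = executor.submit(job);
            try {
                future.get(cancelAfterMs, TimeUnit.MILLISECONDS);
                return false; // completed before the deadline, nothing to cancel
            } catch (TimeoutException e) {
                // Still running at the deadline: interrupt the worker thread.
                return future.cancel(true);
            }
        } finally {
            executor.shutdownNow(); // cleanup regardless of outcome
        }
    }

    public static void main(String[] args) throws Exception {
        Runnable longRunning = () -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // cooperative cancellation
            }
        };
        System.out.println(runAndCancel(longRunning, 100));
    }
}
```

A cancellation test would usually also check that the job released its resources after the interrupt; the sketch leaves that out and only shows the timing logic.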
### Session Window Testing Framework

Specialized testing framework for session window functionality with event generation and validation.

```java { .api }
public class EventGeneratorFactory {
    public static SessionEventGenerator create(SessionConfiguration config);
}

public class SessionEvent {
    public String getSessionId();
    public long getTimestamp();
    public TestEventPayload getPayload();
}
```

[Session Window Testing](./session-window-testing.md)

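To validate session window results, a test needs an independent way to compute the expected sessions from event timestamps. The sketch below shows one simple reference computation. `SessionGapSketch` and `splitIntoSessions` are hypothetical names, and the boundary rule used here (a new session starts only when the distance to the previous event is strictly greater than the gap) is an assumption of this sketch that should be checked against the window assigner under test.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SessionGapSketch {

    /**
     * Splits ascending timestamps into sessions. Assumption of this sketch:
     * a new session begins when the distance to the previous timestamp
     * is strictly greater than sessionGap.
     */
    static List<List<Long>> splitIntoSessions(List<Long> sortedTimestamps, long sessionGap) {
        List<List<Long>> sessions = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long timestamp : sortedTimestamps) {
            if (!current.isEmpty()) {
                long previous = current.get(current.size() - 1);
                if (timestamp - previous > sessionGap) {
                    sessions.add(current); // close the running session
                    current = new ArrayList<>();
                }
            }
            current.add(timestamp);
        }
        if (!current.isEmpty()) {
            sessions.add(current);
        }
        return sessions;
    }

    public static void main(String[] args) {
        List<Long> timestamps = Arrays.asList(0L, 10L, 20L, 100L, 105L);
        // With a gap of 30, the jump from 20 to 100 separates two sessions.
        System.out.println(splitIntoSessions(timestamps, 30L));
    }
}
```

The result can then be compared against the windows emitted by the job, for example by matching the first and last timestamp of each session.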
### Plugin Testing Framework

Framework for testing Flink's plugin system and service provider interface (SPI) implementations.

```java { .api }
public abstract class PluginTestBase {
    protected void testPluginLoading() throws Exception;
}
```

[Plugin Testing](./plugin-testing.md)

### Runtime Utilities

Job execution utilities, process management, and common testing operations for controlled test environments.

```java { .api }
public class JobGraphRunningUtil {
    public static void execute(JobGraph jobGraph, MiniCluster miniCluster) throws Exception;
    public static JobExecutionResult executeWithTimeout(JobGraph jobGraph, MiniCluster miniCluster, long timeoutMs) throws Exception;
}

public class TestEnvironmentUtil {
    public static Configuration createTestClusterConfig(int parallelism, int numTaskManagers);
    public static StreamExecutionEnvironment createTestStreamEnv(int parallelism, boolean checkpointingEnabled);
}
```

[Runtime Utilities](./runtime-utilities.md)

## Types

```java { .api }
// Core test data types
public class CustomType {
    public String myString;
    public int myInt;
    public CustomType(String myString, int myInt);
}

public class POJO {
    public int number;
    public String str;
    public POJO();
    public POJO(int number, String str);
}

public class NestedPojo {
    public POJO nested;
    public long longField;
    public NestedPojo(POJO nested, long longField);
}

public class CrazyNested {
    public NestedPojo nestedPojo;
    public POJO simplePojo;
    public String stringField;
    public CrazyNested(NestedPojo nestedPojo, POJO simplePojo, String stringField);
}

public class PojoWithDateAndEnum {
    public Date dateField;
    public TestEnum enumField;
    public String stringField;
    public PojoWithDateAndEnum(Date dateField, TestEnum enumField, String stringField);

    public enum TestEnum { VALUE1, VALUE2, VALUE3 }
}

public class PojoWithCollectionGeneric {
    public List<String> stringList;
    public Map<String, Integer> stringIntMap;
    public Set<Long> longSet;
    public PojoWithCollectionGeneric(List<String> stringList, Map<String, Integer> stringIntMap, Set<Long> longSet);
}

// Fault tolerance types
public class PrefixCount {
    public String prefix;
    public Integer value;
    public Long count;
    public PrefixCount(String prefix, Integer value, Long count);
}

// Migration testing types
public class SnapshotSpec {
    public String getSnapshotPath();
    public String getSnapshotVersion();
}

// Event system types
public interface TestEvent {
    String getOperatorId();
    long getTimestamp();
}

public class OperatorStartedEvent implements TestEvent;
public class OperatorFinishedEvent implements TestEvent;
public class CheckpointStartedEvent implements TestEvent;
public class CheckpointCompletedEvent implements TestEvent;
public class InputEndedEvent implements TestEvent;
public class WatermarkReceivedEvent implements TestEvent;
public class TestCommandAckEvent implements TestEvent;

// Command system types
public interface TestCommand {
    void execute(StreamOperator<?> operator);
    String getCommandType();
}

public enum TestCommandScope {
    ALL_SUBTASKS, SINGLE_SUBTASK
}

// Test data element
public class TestDataElement {
    public String value;
    public long timestamp;
    public TestDataElement(String value, long timestamp);
}

// Session window testing types
public class SessionEvent {
    public String getSessionId();
    public long getTimestamp();
    public TestEventPayload getPayload();
}

public class TestEventPayload {
    public String data;
    public Map<String, Object> properties;
}

// Enumeration types
public enum FailoverStrategy {
    RestartAllFailoverStrategy,
    RestartPipelinedRegionFailoverStrategy
}

public enum ExecutionMode {
    CREATE_SNAPSHOT,
    VERIFY_SNAPSHOT
}

public enum SnapshotType {
    SAVEPOINT_CANONICAL,
    SAVEPOINT_NATIVE,
    CHECKPOINT
}

// Configuration types
public class SessionConfiguration {
    public long sessionTimeout;
    public long sessionGap;
    public int maxConcurrentSessions;
}

public class TestConfiguration {
    public int parallelism;
    public long checkpointInterval;
    public boolean enableCheckpointing;
}
```