0
# Apache Flink Tests
1
2
Apache Flink Tests provides comprehensive integration testing infrastructure for the Flink stream processing framework. This module contains test utilities, base classes, helper components, and testing patterns that enable thorough validation of Flink's streaming and batch processing capabilities, fault tolerance mechanisms, state management, and migration scenarios.
3
4
## Package Information
5
6
- **Package Name**: flink-tests_2.11
7
- **Package Type**: maven
8
- **Language**: Java with Scala support
9
- **Group ID**: org.apache.flink
10
- **Artifact ID**: flink-tests_2.11
11
- **Version**: 1.5.1
12
- **Installation**: Add Maven dependency to your test scope
13
14
## Core Imports
15
16
```java
17
import org.apache.flink.test.util.TestUtils;
18
import org.apache.flink.test.checkpointing.utils.SavepointMigrationTestBase;
19
import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase;
20
import org.apache.flink.test.cancelling.CancelingTestBase;
21
```
22
23
For Scala tests:
24
```scala
25
import org.apache.flink.api.scala.completeness.ScalaAPICompletenessTestBase
26
import org.apache.flink.api.scala.migration.MigrationTestTypes._
27
```
28
29
## Basic Usage
30
31
```java
32
import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase;
33
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
34
35
public class MyFlinkTest extends StreamFaultToleranceTestBase {
36
37
@Override
38
public void testProgram(StreamExecutionEnvironment env) {
39
// Define your test streaming topology
40
env.fromElements(1, 2, 3, 4, 5)
41
.map(x -> x * 2)
42
.addSink(new TestListResultSink<>());
43
}
44
45
@Override
46
public void postSubmit() throws Exception {
47
// Add verification logic after execution
48
List<Integer> results = TestListResultSink.getResults();
49
assertEquals(Arrays.asList(2, 4, 6, 8, 10), results);
50
}
51
}
52
```
53
54
## Architecture
55
56
The flink-tests module is built around several key testing patterns:
57
58
- **Test Base Classes**: Abstract base classes providing standardized setup for different testing scenarios (fault tolerance, cancellation, migration, recovery)
59
- **Utility Components**: Helper classes for common testing operations (TestUtils, TestListResultSink, MigrationTestUtils)
60
- **Mock and Stub Components**: Testing implementations of Flink functions and operators for controlled testing scenarios
61
- **Configuration Infrastructure**: Standardized cluster setup and configuration management for reproducible test environments
62
- **Coordination Mechanisms**: File-based and accumulator-based coordination between test orchestration and job execution
63
64
This architecture enables comprehensive testing across all aspects of the Flink framework while providing reusable components for custom testing scenarios.
65
66
## Capabilities
67
68
### Test Base Classes
69
70
Foundation classes that provide standardized setup, execution patterns, and infrastructure for different types of Flink testing scenarios including fault tolerance, state migration, job cancellation, and recovery testing.
71
72
```java { .api }
73
public abstract class StreamFaultToleranceTestBase extends TestLogger {
74
public abstract void testProgram(StreamExecutionEnvironment env);
75
public abstract void postSubmit() throws Exception;
76
}
77
78
public abstract class SavepointMigrationTestBase extends TestBaseUtils {
79
protected final void executeAndSavepoint(StreamExecutionEnvironment env, String savepointPath, Tuple2<String, Integer>... expectedAccumulators) throws Exception;
80
protected final void restoreAndExecute(StreamExecutionEnvironment env, String savepointPath, Tuple2<String, Integer>... expectedAccumulators) throws Exception;
81
}
82
83
public abstract class CancelingTestBase extends TestLogger {
84
protected void runAndCancelJob(Plan plan, int msecsTillCanceling, int maxTimeTillCanceled) throws Exception;
85
}
86
```
87
88
[Test Base Classes](./test-base-classes.md)
89
90
### Test Utilities
91
92
Core utility classes and helper functions for common testing operations including execution handling, result collection, and test coordination mechanisms.
93
94
```java { .api }
95
public class TestUtils {
96
public static JobExecutionResult tryExecute(StreamExecutionEnvironment see, String name) throws Exception;
97
}
98
99
public class TestListResultSink<T> extends RichSinkFunction<T> {
100
public List<T> getResult();
101
public List<T> getSortedResult();
102
}
103
104
public class TestListWrapper {
105
public static int createList();
106
public static <T> List<T> getList(int listId);
107
}
108
```
109
110
[Test Utilities](./test-utilities.md)
111
112
### State Migration Testing
113
114
Comprehensive infrastructure for testing operator state migration and compatibility across Flink versions, including utilities for savepoint creation, restoration, and verification.
115
116
```java { .api }
117
public class MigrationTestUtils {
118
public static class CheckpointingNonParallelSourceWithListState implements SourceFunction<Tuple2<Long, Long>>, CheckpointedFunction;
119
public static class CheckingNonParallelSourceWithListState extends RichSourceFunction<Tuple2<Long, Long>> implements CheckpointedFunction;
120
public static class AccumulatorCountingSink<T> extends RichSinkFunction<T>;
121
}
122
123
public abstract class AbstractOperatorRestoreTestBase {
124
public abstract JobGraph createMigrationJob(StreamExecutionEnvironment env) throws Exception;
125
public abstract JobGraph createRestoredJob(StreamExecutionEnvironment env) throws Exception;
126
}
127
```
128
129
[State Migration Testing](./state-migration.md)
130
131
### Recovery and Fault Tolerance
132
133
Testing infrastructure for job recovery scenarios, restart strategies, failure simulation, and fault tolerance validation including TaskManager process failures.
134
135
```java { .api }
136
public abstract class SimpleRecoveryITCaseBase {
137
// Abstract methods for defining failing and successful execution plans
138
}
139
140
public abstract class AbstractTaskManagerProcessFailureRecoveryTest {
141
// Constants for file-based coordination
142
protected static final String READY_MARKER_FILE_PREFIX = "ready-";
143
protected static final String PROCEED_MARKER_FILE = "proceed";
144
protected static final String FINISH_MARKER_FILE_PREFIX = "finish-";
145
}
146
```
147
148
[Recovery and Fault Tolerance](./recovery-fault-tolerance.md)
149
150
### Scala Testing Support
151
152
Scala-specific testing components including API completeness validation, migration test types, and Scala-specific utility functions for comprehensive Scala API testing.
153
154
```scala { .api }
155
object MigrationTestTypes {
156
case class CustomCaseClass(a: String, b: Long)
157
case class CustomCaseClassWithNesting(a: Long, nested: CustomCaseClass)
158
object CustomEnum extends Enumeration {
159
val ONE, TWO, THREE = Value
160
}
161
}
162
163
abstract class ScalaAPICompletenessTestBase {
164
// Base class for testing Scala API completeness
165
}
166
```
167
168
[Scala Testing Support](./scala-testing.md)
169
170
## Constants and Configuration
171
172
### Standard Test Configuration
173
- **PARALLELISM**: 4 (most base classes)
174
- **HIGH_PARALLELISM**: 12 (StreamFaultToleranceTestBase)
175
- **NUM_TASK_MANAGERS**: 3 (StreamFaultToleranceTestBase)
176
- **NUM_TASK_SLOTS**: 4 (StreamFaultToleranceTestBase)
177
178
### Cluster Configuration
179
All base classes provide standardized MiniClusterResource configurations optimized for reliable testing environments with proper timeouts, memory allocation, and task distribution.
180
181
## Testing Categories
182
183
The flink-tests module enables comprehensive testing across these functional areas:
184
185
- **Broadcast Variables Testing** (`org.apache.flink.test.broadcastvars`)
186
- **Iterative Algorithm Testing** (`org.apache.flink.test.iterative`)
187
- **Streaming API Testing** (`org.apache.flink.test.streaming`)
188
- **State Management Testing** (`org.apache.flink.test.state`)
189
- **Runtime Testing** (`org.apache.flink.test.runtime`)
190
- **Recovery and Fault Tolerance** (`org.apache.flink.test.recovery`)
191
- **Operator Testing** (`org.apache.flink.test.operators`)
192
- **Checkpointing Testing** (`org.apache.flink.test.checkpointing`)
193
- **Windowing Testing** (`org.apache.flink.test.windowing`)