0
# Apache Flink Connector Test Utils
1
2
The Apache Flink Connector Test Utils library provides a comprehensive testing framework for Apache Flink connectors. It enables developers to create standardized, robust tests for both source and sink connectors with support for various testing scenarios including failover, scaling, metrics validation, and external system integration.
3
4
## Package Information
5
6
- **Package Name**: flink-connector-test-utils
7
- **Package Type**: maven
8
- **Group ID**: org.apache.flink
9
- **Artifact ID**: flink-connector-test-utils
10
- **Language**: Java
11
- **Installation**:
12
```xml
13
<dependency>
14
<groupId>org.apache.flink</groupId>
15
<artifactId>flink-connector-test-utils</artifactId>
16
<version>2.1.0</version>
17
<scope>test</scope>
18
</dependency>
19
```
20
21
## Core Imports
22
23
```java
24
// Test suite base classes
25
import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase;
26
import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
27
28
// JUnit annotations
29
import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
30
import org.apache.flink.connector.testframe.junit.annotations.TestContext;
31
32
// Test utilities
33
import org.apache.flink.connector.testframe.utils.CollectIteratorAssertions;
34
import org.apache.flink.connector.testframe.utils.MetricQuerier;
35
36
// External context interfaces
37
import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV2ExternalContext;
38
import org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext;
39
```
40
41
## Basic Usage
42
43
### Creating a Sink Test Suite
44
45
```java
46
import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase;
47
import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
48
import org.apache.flink.connector.testframe.junit.annotations.TestContext;
49
import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
50
51
@ExtendWith(ConnectorTestingExtension.class)
52
public class MySinkTestSuite extends SinkTestSuiteBase<String> {
53
54
@TestEnv
55
MiniClusterTestEnvironment testEnv = new MiniClusterTestEnvironment();
56
57
@TestContext
58
ExternalContextFactory<MySinkExternalContext> sinkContextFactory =
59
(testName) -> new MySinkExternalContext(testName);
60
61
// Test methods are automatically provided by SinkTestSuiteBase
62
// Including: testBasicSink, testStartFromSavepoint, testScaleUp, etc.
63
}
64
65
// External context implementation
66
public class MySinkExternalContext extends DataStreamSinkV2ExternalContext<String> {
67
68
@Override
69
public Sink<String> createSink(TestingSinkSettings sinkSettings) {
70
// Return your sink implementation
71
return new MySink(/* configuration */);
72
}
73
74
@Override
75
public List<String> generateTestData(TestingSinkSettings sinkSettings, long seed) {
76
// Generate test data for your sink
77
return Arrays.asList("test1", "test2", "test3");
78
}
79
80
@Override
81
public ExternalSystemDataReader<String> createSinkDataReader(TestingSinkSettings sinkSettings) {
82
// Return a reader to validate data written to external system
83
return new MySinkDataReader(/* configuration */);
84
}
85
}
86
```
87
88
### Creating a Source Test Suite
89
90
```java
91
import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
92
93
public class MySourceTestSuite extends SourceTestSuiteBase<String> {
94
95
@TestEnv
96
MiniClusterTestEnvironment testEnv = new MiniClusterTestEnvironment();
97
98
@TestContext
99
ExternalContextFactory<MySourceExternalContext> sourceContextFactory =
100
(testName) -> new MySourceExternalContext(testName);
101
102
// Test methods are automatically provided by SourceTestSuiteBase
103
// Including: testSourceSingleSplit, testMultipleSplits, testSavepoint, etc.
104
}
105
```
106
107
## Architecture
108
109
The testing framework is built around several key components:
110
111
- **Test Suite Base Classes**: Provide pre-built test cases for common scenarios (basic functionality, failover, scaling, metrics)
112
- **External Context Framework**: Abstracts external system interactions for both sources and sinks
113
- **Test Environment**: Manages Flink cluster lifecycle (MiniCluster or containerized)
114
- **JUnit Integration**: Annotation-driven configuration with resource lifecycle management
115
- **Assertion Utilities**: Specialized assertions for validating connector behavior with different semantic guarantees
116
- **Container Support**: TestContainers integration for isolated testing environments
117
118
## Capabilities
119
120
### Test Suite Framework
121
122
Core test suite base classes providing standardized test scenarios for connector validation. Supports both sink and source connectors with comprehensive test coverage.
123
124
```java { .api }
125
public abstract class SinkTestSuiteBase<T extends Comparable<T>> {
126
// Test methods provided automatically via JUnit @TestTemplate
127
void testBasicSink(TestEnvironment testEnv, DataStreamSinkExternalContext<T> externalContext, CheckpointingMode semantic);
128
void testStartFromSavepoint(TestEnvironment testEnv, DataStreamSinkExternalContext<T> externalContext, CheckpointingMode semantic);
129
void testScaleUp(TestEnvironment testEnv, DataStreamSinkExternalContext<T> externalContext, CheckpointingMode semantic);
130
void testScaleDown(TestEnvironment testEnv, DataStreamSinkExternalContext<T> externalContext, CheckpointingMode semantic);
131
void testMetrics(TestEnvironment testEnv, DataStreamSinkExternalContext<T> externalContext, CheckpointingMode semantic);
132
}
133
134
public abstract class SourceTestSuiteBase<T> {
135
// Test methods provided automatically via JUnit @TestTemplate
136
void testSourceSingleSplit(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic);
137
void testMultipleSplits(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic);
138
void testSavepoint(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic);
139
void testScaleUp(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic);
140
void testScaleDown(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic);
141
void testSourceMetrics(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic);
142
void testIdleReader(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic);
143
void testTaskManagerFailure(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, ClusterControllable controller, CheckpointingMode semantic);
144
}
145
```
146
147
[Test Suites](./test-suites.md)
148
149
### External System Integration
150
151
Framework for integrating with external systems, providing abstractions for source and sink connectors to interact with their respective external systems.
152
153
```java { .api }
154
public interface ExternalContext extends AutoCloseable {
155
List<URL> getConnectorJarPaths();
156
}
157
158
public interface ExternalContextFactory<C extends ExternalContext> {
159
C createExternalContext(String testName);
160
}
161
162
public abstract class DataStreamSinkV2ExternalContext<T> extends ExternalContext {
163
public abstract Sink<T> createSink(TestingSinkSettings sinkSettings);
164
public abstract List<T> generateTestData(TestingSinkSettings sinkSettings, long seed);
165
public abstract ExternalSystemDataReader<T> createSinkDataReader(TestingSinkSettings sinkSettings);
166
public abstract TypeInformation<T> getProducedType();
167
}
168
169
public abstract class DataStreamSourceExternalContext<T> extends ExternalContext {
170
public abstract Source<T, ?, ?> createSource(TestingSourceSettings sourceSettings);
171
public abstract List<T> generateTestData(TestingSourceSettings sourceSettings, int splitIndex, long seed);
172
public abstract ExternalSystemSplitDataWriter<T> createSourceSplitDataWriter(TestingSourceSettings sourceSettings);
173
public abstract TypeInformation<T> getProducedType();
174
}
175
```
176
177
[External System Integration](./external-systems.md)
178
179
### Test Environment Management
180
181
Test environment abstractions for managing Flink cluster lifecycle, supporting both MiniCluster and containerized deployments.
182
183
```java { .api }
184
public interface TestEnvironment extends TestResource {
185
StreamExecutionEnvironment createExecutionEnvironment(TestEnvironmentSettings envOptions);
186
Endpoint getRestEndpoint();
187
String getCheckpointUri();
188
189
class Endpoint {
190
public Endpoint(String address, int port);
191
public String getAddress();
192
public int getPort();
193
}
194
}
195
196
public interface TestResource {
197
void startUp() throws Exception;
198
void tearDown() throws Exception;
199
}
200
```
201
202
[Test Environments](./test-environments.md)
203
204
### Assertion and Validation Utilities
205
206
Specialized assertion utilities for validating connector behavior with support for different semantic guarantees (exactly-once, at-least-once).
207
208
```java { .api }
209
public final class CollectIteratorAssertions {
210
public static <T> CollectIteratorAssert<T> assertThat(Iterator<T> actual);
211
public static <T> UnorderedCollectIteratorAssert<T> assertUnordered(Iterator<T> actual);
212
}
213
214
public class CollectIteratorAssert<T> {
215
public CollectIteratorAssert<T> matchesRecordsFromSource(List<List<T>> expected, CheckpointingMode semantic);
216
public CollectIteratorAssert<T> withNumRecordsLimit(int limit);
217
}
218
```
219
220
[Assertions and Validation](./assertions.md)
221
222
### Metrics and Monitoring
223
224
Utilities for querying and validating Flink job metrics via REST API, enabling performance and behavior validation.
225
226
```java { .api }
227
public class MetricQuerier {
228
public MetricQuerier(Configuration configuration) throws ConfigurationException;
229
230
public static JobDetailsInfo getJobDetails(RestClient client, TestEnvironment.Endpoint endpoint, JobID jobId) throws Exception;
231
232
public Double getAggregatedMetricsByRestAPI(
233
TestEnvironment.Endpoint endpoint,
234
JobID jobId,
235
String sourceOrSinkName,
236
String metricName,
237
String filter
238
) throws Exception;
239
240
public AggregatedMetricsResponseBody getMetricList(TestEnvironment.Endpoint endpoint, JobID jobId, JobVertexID vertexId) throws Exception;
241
public AggregatedMetricsResponseBody getMetrics(TestEnvironment.Endpoint endpoint, JobID jobId, JobVertexID vertexId, String filters) throws Exception;
242
}
243
```
244
245
[Metrics and Monitoring](./metrics.md)
246
247
### JUnit Integration
248
249
Annotation-driven JUnit 5 integration with automatic resource lifecycle management and test parameterization.
250
251
```java { .api }
252
@Target(ElementType.FIELD)
253
@Retention(RetentionPolicy.RUNTIME)
254
public @interface TestEnv {}
255
256
@Target(ElementType.FIELD)
257
@Retention(RetentionPolicy.RUNTIME)
258
public @interface TestContext {}
259
260
@Target(ElementType.FIELD)
261
@Retention(RetentionPolicy.RUNTIME)
262
public @interface TestExternalSystem {}
263
264
@Target(ElementType.FIELD)
265
@Retention(RetentionPolicy.RUNTIME)
266
public @interface TestSemantics {}
267
268
public class ConnectorTestingExtension implements BeforeAllCallback, AfterAllCallback,
269
TestTemplateInvocationContextProvider, ParameterResolver {
270
// Automatic lifecycle management and parameter injection
271
}
272
```
273
274
[JUnit Integration](./junit-integration.md)
275
276
### Container Support
277
278
TestContainers integration for running tests in isolated containerized environments with custom Flink clusters.
279
280
```java { .api }
281
public class FlinkContainers {
282
public static FlinkContainer jobManager();
283
public static FlinkContainer taskManager();
284
public static FlinkContainer cluster();
285
}
286
287
public class FlinkContainerTestEnvironment implements TestEnvironment {
288
public FlinkContainerTestEnvironment(FlinkContainersSettings settings);
289
// Implements TestEnvironment methods
290
}
291
292
public class FlinkImageBuilder {
293
public static DockerImageName buildImage(List<URL> jarPaths) throws ImageBuildException;
294
}
295
```
296
297
[Container Support](./containers.md)
298
299
## Types
300
301
### Core Configuration Types
302
303
```java { .api }
304
public class TestEnvironmentSettings {
305
public static Builder builder();
306
307
public static class Builder {
308
public Builder setConnectorJarPaths(List<URL> connectorJarPaths);
309
public Builder setSavepointRestorePath(String savepointRestorePath);
310
public TestEnvironmentSettings build();
311
}
312
}
313
314
public class TestingSinkSettings {
315
public static Builder builder();
316
317
public static class Builder {
318
public Builder setCheckpointingMode(CheckpointingMode checkpointingMode);
319
public TestingSinkSettings build();
320
}
321
}
322
323
public class TestingSourceSettings {
324
public static Builder builder();
325
326
public static class Builder {
327
public Builder setBoundedness(Boundedness boundedness);
328
public Builder setCheckpointingMode(CheckpointingMode checkpointingMode);
329
public TestingSourceSettings build();
330
}
331
}
332
```
333
334
### Data Interface Types
335
336
```java { .api }
337
public interface ExternalSystemDataReader<T> {
338
List<T> poll(Duration timeout);
339
}
340
341
public interface ExternalSystemSplitDataWriter<T> {
342
void writeRecords(List<T> records);
343
}
344
```
345
346
### Container Configuration Types
347
348
```java { .api }
349
public class FlinkContainersSettings {
350
public static Builder builder();
351
352
public static class Builder {
353
public Builder setNumTaskManagers(int numTaskManagers);
354
public Builder setNumSlotsPerTaskManager(int numSlotsPerTaskManager);
355
public Builder setJobManagerMemory(String jobManagerMemory);
356
public Builder setTaskManagerMemory(String taskManagerMemory);
357
public FlinkContainersSettings build();
358
}
359
}
360
361
public class TestcontainersSettings {
362
public static Builder builder();
363
364
public static class Builder {
365
public Builder setNetwork(Network network);
366
public Builder setLogConsumers(Map<String, Consumer<OutputFrame>> logConsumers);
367
public TestcontainersSettings build();
368
}
369
}
370
```