0
# Connector Testing Framework
1
2
Comprehensive testing framework for Flink connectors with support for external systems, multiple test environments, and automated test suites. This framework enables thorough testing of both source and sink connectors with real external systems and controlled test environments.
3
4
## Capabilities
5
6
### Test Framework Core
7
8
#### ConnectorTestingExtension
9
10
JUnit 5 extension that provides comprehensive testing infrastructure for Flink connectors.
11
12
```java { .api }
13
/**
14
* JUnit 5 extension for connector testing
15
* Manages test lifecycle, external systems, and test environments
16
*/
17
@ExtendWith(ConnectorTestingExtension.class)
18
class ConnectorTestingExtension implements BeforeAllCallback, AfterAllCallback {
19
void beforeAll(ExtensionContext context) throws Exception;
20
void afterAll(ExtensionContext context) throws Exception;
21
}
22
```
23
24
**Usage Examples:**
25
26
```java
27
import org.apache.flink.connector.testframe.junit.extensions.ConnectorTestingExtension;
28
import org.junit.jupiter.api.extension.ExtendWith;
29
30
@ExtendWith(ConnectorTestingExtension.class)
31
public class MyConnectorTest {
32
// Test methods will have access to managed test infrastructure
33
34
@Test
35
public void testSourceConnector() {
36
// Test implementation
37
}
38
}
39
```
40
41
### Test Environment Management
42
43
Core interfaces and implementations for managing test execution environments.
44
45
#### TestEnvironment
46
47
Base interface for test execution environments that can run Flink jobs.
48
49
```java { .api }
50
/**
51
* Test execution environment interface
52
* Provides abstraction over different Flink execution environments
53
*/
54
interface TestEnvironment extends TestResource {
55
/** Submit and execute a Flink job */
56
JobExecutionResult executeJob(JobGraph job) throws Exception;
57
58
/** Get cluster information */
59
ClusterClient<?> getClusterClient();
60
61
/** Get job manager REST address */
62
String getRestAddress();
63
64
/** Get web UI URL */
65
String getWebUIUrl();
66
}
67
68
/**
69
* Base interface for test resources with lifecycle management
70
*/
71
interface TestResource extends AutoCloseable {
72
/** Start/initialize the resource */
73
void startUp() throws Exception;
74
75
/** Stop/cleanup the resource */
76
void tearDown() throws Exception;
77
78
void close() throws Exception;
79
}
80
```
81
82
#### MiniClusterTestEnvironment
83
84
Test environment based on Flink's embedded MiniCluster for fast, lightweight testing.
85
86
```java { .api }
87
/**
88
* MiniCluster-based test environment
89
* Provides fast in-memory Flink execution for testing
90
*/
91
class MiniClusterTestEnvironment implements TestEnvironment, ClusterControllable {
92
/** Create with default configuration */
93
MiniClusterTestEnvironment();
94
95
/** Create with custom configuration */
96
MiniClusterTestEnvironment(MiniClusterConfiguration config);
97
98
/** Create builder for configuration */
99
static Builder builder();
100
101
JobExecutionResult executeJob(JobGraph job) throws Exception;
102
void triggerCheckpoint(long checkpointId) throws Exception;
103
void cancelJob(JobID jobId) throws Exception;
104
105
static class Builder {
106
Builder setParallelism(int parallelism);
107
Builder setCheckpointingEnabled(boolean enabled);
108
Builder setCheckpointInterval(Duration interval);
109
MiniClusterTestEnvironment build();
110
}
111
}
112
```
113
114
#### FlinkContainerTestEnvironment
115
116
Test environment using Docker containers for testing with a real Flink cluster.
117
118
```java { .api }
119
/**
120
* Docker container-based test environment
121
* Provides realistic Flink cluster environment for integration testing
122
*/
123
class FlinkContainerTestEnvironment implements TestEnvironment, ClusterControllable {
124
/** Create with default Flink image */
125
FlinkContainerTestEnvironment();
126
127
/** Create with custom Flink image */
128
FlinkContainerTestEnvironment(String flinkImageName);
129
130
/** Create builder for configuration */
131
static Builder builder();
132
133
JobExecutionResult executeJob(JobGraph job) throws Exception;
134
void restartTaskManager(int taskManagerIndex) throws Exception;
135
void stopTaskManager(int taskManagerIndex) throws Exception;
136
137
static class Builder {
138
Builder withFlinkImage(String imageName);
139
Builder withTaskManagers(int count);
140
Builder withTaskSlots(int slots);
141
Builder withJobManagerMemory(String memory);
142
Builder withTaskManagerMemory(String memory);
143
FlinkContainerTestEnvironment build();
144
}
145
}
146
```
147
148
**Usage Examples:**
149
150
```java
151
import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
152
import org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment;
153
154
// MiniCluster environment for fast tests
155
MiniClusterTestEnvironment miniEnv = MiniClusterTestEnvironment.builder()
156
.setParallelism(4)
157
.setCheckpointingEnabled(true)
158
.setCheckpointInterval(Duration.ofSeconds(1))
159
.build();
160
161
// Container environment for integration tests
162
FlinkContainerTestEnvironment containerEnv = FlinkContainerTestEnvironment.builder()
163
.withFlinkImage("flink:1.17")
164
.withTaskManagers(2)
165
.withTaskSlots(4)
166
.withJobManagerMemory("1g")
167
.withTaskManagerMemory("2g")
168
.build();
169
```
170
171
### External System Testing
172
173
Framework for testing connectors with real external systems like databases, message queues, etc.
174
175
#### ExternalContext
176
177
Base interface for managing external system lifecycle and interaction during tests.
178
179
```java { .api }
180
/**
181
* Context for external system interaction
182
* Manages lifecycle and provides access to external systems
183
*/
184
interface ExternalContext extends AutoCloseable {
185
/** Initialize external system for testing */
186
void setUp() throws Exception;
187
188
/** Clean up external system after testing */
189
void tearDown() throws Exception;
190
191
/** Get connection information for Flink connectors */
192
Properties getConnectionProperties();
193
194
/** Generate unique identifier for this test run */
195
String generateTestId();
196
}
197
198
/**
199
* Factory for creating external contexts
200
*/
201
interface ExternalContextFactory<C extends ExternalContext> {
202
/** Create external context for testing */
203
C createExternalContext(String testName);
204
205
/** Get display name for this external system */
206
String getDisplayName();
207
}
208
```
209
210
#### Data Reading and Writing
211
212
Interfaces for reading and writing test data to external systems.
213
214
```java { .api }
215
/**
216
* Read data from external systems for verification
217
*/
218
interface ExternalSystemDataReader<T> extends AutoCloseable {
219
/** Read all data from external system */
220
List<T> readData() throws Exception;
221
222
/** Read data with timeout */
223
List<T> readData(Duration timeout) throws Exception;
224
225
/** Read data matching criteria */
226
List<T> readData(Predicate<T> filter) throws Exception;
227
228
void close() throws Exception;
229
}
230
231
/**
232
* Write data to external systems in splits for parallel testing
233
*/
234
interface ExternalSystemSplitDataWriter<T> extends AutoCloseable {
235
/** Write split of data to external system */
236
void writeSplit(List<T> data, int splitIndex) throws Exception;
237
238
/** Write all splits and finalize */
239
void writeAndFinalize(List<List<T>> splits) throws Exception;
240
241
/** Get number of supported splits */
242
int getMaxParallelism();
243
244
void close() throws Exception;
245
}
246
```
247
248
**Usage Examples:**
249
250
```java
251
import org.apache.flink.connector.testframe.external.ExternalContext;
252
import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
253
254
// External context implementation
255
public class DatabaseExternalContext implements ExternalContext {
256
private Connection connection;
257
258
@Override
259
public void setUp() throws Exception {
260
connection = DriverManager.getConnection(getConnectionUrl());
261
// Initialize test database
262
}
263
264
@Override
265
public Properties getConnectionProperties() {
266
Properties props = new Properties();
267
props.setProperty("url", getConnectionUrl());
268
return props;
269
}
270
}
271
272
// Reading test data
273
ExternalSystemDataReader<MyRecord> reader = createDataReader();
274
List<MyRecord> results = reader.readData(Duration.ofSeconds(30));
275
assertEquals(expectedRecords, results);
276
```
277
278
### Source Testing Framework
279
280
Specialized testing framework for Flink source connectors.
281
282
#### Source Test Contexts
283
284
Contexts specifically designed for testing source connectors with different APIs.
285
286
```java { .api }
287
/**
288
* Context for DataStream source testing
289
* Provides source creation and data verification capabilities
290
*/
291
interface DataStreamSourceExternalContext<T> extends ExternalContext {
292
/** Create source for DataStream API */
293
SourceFunction<T> createSource(SourceSplitSerializer<?> splitSerializer);
294
295
/** Get data reader for verification */
296
ExternalSystemDataReader<T> createDataReader();
297
298
/** Generate test data splits */
299
List<List<T>> generateTestDataSplits(int numSplits);
300
}
301
302
/**
303
* Context for Table API source testing
304
*/
305
interface TableSourceExternalContext extends ExternalContext {
306
/** Create table source for Table API */
307
DynamicTableSource createTableSource(TableDescriptor descriptor);
308
309
/** Get table descriptor for source */
310
TableDescriptor getTableDescriptor();
311
312
/** Create catalog for table registration */
313
Catalog createCatalog();
314
}
315
```
316
317
#### SourceTestSuiteBase
318
319
Base class providing comprehensive test suite for source connectors.
320
321
```java { .api }
322
/**
323
* Base class for source test suites
324
* Provides standard test methods for source connector validation
325
*/
326
abstract class SourceTestSuiteBase<T> {
327
/** Test basic source functionality */
328
@TestTemplate
329
void testSourceReading() throws Exception;
330
331
/** Test source with checkpointing */
332
@TestTemplate
333
void testSourceWithCheckpointing() throws Exception;
334
335
/** Test source restart from checkpoint */
336
@TestTemplate
337
void testSourceRestartFromCheckpoint() throws Exception;
338
339
/** Test source parallelism handling */
340
@TestTemplate
341
void testSourceParallelism() throws Exception;
342
343
/** Test source idempotency */
344
@TestTemplate
345
void testSourceIdempotency() throws Exception;
346
347
/** Get external context for testing */
348
protected abstract DataStreamSourceExternalContext<T> getExternalContext();
349
350
/** Get test environment */
351
protected abstract TestEnvironment getTestEnvironment();
352
}
353
```
354
355
**Usage Examples:**
356
357
```java
358
import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
359
import org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext;
360
361
@ExtendWith(ConnectorTestingExtension.class)
362
public class MySourceConnectorTest extends SourceTestSuiteBase<MyRecord> {
363
364
@Override
365
protected DataStreamSourceExternalContext<MyRecord> getExternalContext() {
366
return new MySourceExternalContext();
367
}
368
369
@Override
370
protected TestEnvironment getTestEnvironment() {
371
return MiniClusterTestEnvironment.builder()
372
.setParallelism(2)
373
.build();
374
}
375
376
// All standard source tests are inherited and will run automatically
377
// testSourceReading(), testSourceWithCheckpointing(), etc.
378
}
379
```
380
381
### Sink Testing Framework
382
383
Specialized testing framework for Flink sink connectors.
384
385
#### Sink Test Contexts
386
387
Contexts for testing sink connectors with different APIs and semantics.
388
389
```java { .api }
390
/**
391
* Context for DataStream sink testing
392
* Supports both legacy SinkFunction and new Sink API
393
*/
394
interface DataStreamSinkExternalContext<T> extends ExternalContext, ResultTypeQueryable<T> {
395
/** Create sink function for legacy API */
396
SinkFunction<T> createSinkFunction();
397
398
/** Create data reader for result verification */
399
ExternalSystemDataReader<T> createDataReader();
400
401
/** Generate test data for sink testing */
402
List<T> generateTestData(int numRecords);
403
404
/** Get type information for records */
405
TypeInformation<T> getProducedType();
406
}
407
408
/**
409
* Context for Sink V2 API testing
410
*/
411
interface DataStreamSinkV2ExternalContext<T> extends DataStreamSinkExternalContext<T> {
412
/** Create sink for new Sink API */
413
Sink<T> createSink();
414
415
/** Test exactly-once semantics support */
416
boolean supportsExactlyOnce();
417
418
/** Test at-least-once semantics support */
419
boolean supportsAtLeastOnce();
420
}
421
422
/**
423
* Context for Table API sink testing
424
*/
425
interface TableSinkExternalContext extends ExternalContext {
426
/** Create table sink for Table API */
427
DynamicTableSink createTableSink(TableDescriptor descriptor);
428
429
/** Get table descriptor for sink */
430
TableDescriptor getTableDescriptor();
431
432
/** Create catalog for table registration */
433
Catalog createCatalog();
434
}
435
```
436
437
#### SinkTestSuiteBase
438
439
Base class providing comprehensive test suite for sink connectors.
440
441
```java { .api }
442
/**
443
* Base class for sink test suites
444
* Provides standard test methods for sink connector validation
445
*/
446
abstract class SinkTestSuiteBase<T> {
447
/** Test basic sink writing */
448
@TestTemplate
449
void testSinkWriting() throws Exception;
450
451
/** Test sink with checkpointing */
452
@TestTemplate
453
void testSinkWithCheckpointing() throws Exception;
454
455
/** Test sink exactly-once semantics */
456
@TestTemplate
457
void testSinkExactlyOnce() throws Exception;
458
459
/** Test sink at-least-once semantics */
460
@TestTemplate
461
void testSinkAtLeastOnce() throws Exception;
462
463
/** Test sink with multiple writers */
464
@TestTemplate
465
void testSinkParallelism() throws Exception;
466
467
/** Test sink failure recovery */
468
@TestTemplate
469
void testSinkFailureRecovery() throws Exception;
470
471
/** Get external context for testing */
472
protected abstract DataStreamSinkExternalContext<T> getExternalContext();
473
474
/** Get test environment */
475
protected abstract TestEnvironment getTestEnvironment();
476
}
477
```
478
479
**Usage Examples:**
480
481
```java
482
import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase;
483
import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV2ExternalContext;
484
485
@ExtendWith(ConnectorTestingExtension.class)
486
public class MySinkConnectorTest extends SinkTestSuiteBase<MyRecord> {
487
488
@Override
489
protected DataStreamSinkV2ExternalContext<MyRecord> getExternalContext() {
490
return new MySinkExternalContext();
491
}
492
493
@Override
494
protected TestEnvironment getTestEnvironment() {
495
return FlinkContainerTestEnvironment.builder()
496
.withTaskManagers(2)
497
.build();
498
}
499
500
// All standard sink tests are inherited
501
// testSinkWriting(), testSinkExactlyOnce(), etc.
502
}
503
```
504
505
### Test Configuration Annotations
506
507
Annotations for configuring connector test behavior and requirements.
508
509
```java { .api }
510
/**
511
* Configure test context
512
*/
513
@interface TestContext {
514
/** External context factory class */
515
Class<? extends ExternalContextFactory<?>> value();
516
}
517
518
/**
519
* Configure test environment
520
*/
521
@interface TestEnv {
522
/** Test environment class */
523
Class<? extends TestEnvironment> value();
524
}
525
526
/**
527
* Configure external system for testing
528
*/
529
@interface TestExternalSystem {
530
/** External system identifier */
531
String value();
532
}
533
534
/**
535
* Configure test semantics requirements
536
*/
537
@interface TestSemantics {
538
/** Required delivery guarantees */
539
DeliveryGuarantee[] value();
540
}
541
```
542
543
**Usage Examples:**
544
545
```java
546
import org.apache.flink.connector.testframe.junit.annotations.*;
547
548
@ExtendWith(ConnectorTestingExtension.class)
549
@TestContext(MyConnectorExternalContextFactory.class)
550
@TestEnv(MiniClusterTestEnvironment.class)
551
@TestExternalSystem("kafka")
552
@TestSemantics({DeliveryGuarantee.EXACTLY_ONCE, DeliveryGuarantee.AT_LEAST_ONCE})
553
public class ConfiguredConnectorTest extends SourceTestSuiteBase<String> {
554
// Test configuration provided by annotations
555
}
556
```
557
558
### Container Testing Support
559
560
Docker container management for realistic Flink cluster testing.
561
562
#### FlinkContainers
563
564
Utility for managing Flink Docker containers in tests.
565
566
```java { .api }
567
/**
568
* Manage Flink containers for testing
569
* Handles JobManager and TaskManager containers
570
*/
571
class FlinkContainers implements BeforeAllCallback, AfterAllCallback {
572
/** Create with default Flink image */
573
FlinkContainers();
574
575
/** Create with custom image */
576
FlinkContainers(String flinkImage);
577
578
/** Get JobManager container */
579
GenericContainer<?> getJobManagerContainer();
580
581
/** Get TaskManager containers */
582
List<GenericContainer<?>> getTaskManagerContainers();
583
584
/** Get Flink REST client */
585
RestClusterClient<String> getRestClient();
586
587
void beforeAll(ExtensionContext context) throws Exception;
588
void afterAll(ExtensionContext context) throws Exception;
589
}
590
591
/**
592
* Build custom Flink Docker images for testing
593
*/
594
class FlinkImageBuilder {
595
/** Create builder with base Flink image */
596
static FlinkImageBuilder fromBaseImage(String baseImage);
597
598
/** Add JAR file to image */
599
FlinkImageBuilder addJar(Path jarPath);
600
601
/** Add connector dependencies */
602
FlinkImageBuilder addConnectorDependencies(String... coordinates);
603
604
/** Set custom configuration */
605
FlinkImageBuilder withConfiguration(String key, String value);
606
607
/** Build the custom image */
608
String build();
609
}
610
```
611
612
**Usage Examples:**
613
614
```java
615
import org.apache.flink.connector.testframe.container.FlinkContainers;
616
import org.apache.flink.connector.testframe.container.FlinkImageBuilder;
617
618
// Custom Flink image with connector
619
String customImage = FlinkImageBuilder
620
.fromBaseImage("flink:1.17")
621
.addConnectorDependencies("org.apache.flink:flink-connector-kafka:1.17.0")
622
.withConfiguration("state.backend", "filesystem")
623
.build();
624
625
// Use containers in test
626
@RegisterExtension
627
static FlinkContainers flinkContainers = new FlinkContainers(customImage);
628
629
@Test
630
void testWithContainers() {
631
RestClusterClient<?> client = flinkContainers.getRestClient();
632
// Submit job to container cluster
633
}
634
```
635
636
### Utility Classes
637
638
#### CollectIteratorAssert
639
640
Specialized assertions for collected test results.
641
642
```java { .api }
643
/**
644
* Assertions for collected test results
645
* Provides convenient verification of streaming results
646
*/
647
class CollectIteratorAssert<T> extends AbstractIterableAssert<CollectIteratorAssert<T>, Iterable<T>, T, ObjectAssert<T>> {
648
/** Assert results match expected values in order */
649
CollectIteratorAssert<T> containsExactly(T... expected);
650
651
/** Assert results contain expected values in any order */
652
CollectIteratorAssert<T> containsExactlyInAnyOrder(T... expected);
653
654
/** Assert result count matches expected */
655
CollectIteratorAssert<T> hasSize(int expectedSize);
656
657
/** Assert all results match predicate */
658
CollectIteratorAssert<T> allMatch(Predicate<T> predicate);
659
}
660
```
661
662
#### MetricQuerier
663
664
Utility for querying and asserting on Flink metrics during tests.
665
666
```java { .api }
667
/**
668
* Query and assert on Flink metrics
669
* Enables verification of connector behavior through metrics
670
*/
671
class MetricQuerier {
672
/** Create querier for test environment */
673
static MetricQuerier forEnvironment(TestEnvironment environment);
674
675
/** Query counter metric value */
676
long getCounterValue(String metricName);
677
678
/** Query gauge metric value */
679
double getGaugeValue(String metricName);
680
681
/** Query histogram metric */
682
HistogramStatistics getHistogramValue(String metricName);
683
684
/** Wait for metric to reach expected value */
685
void waitForMetric(String metricName, long expectedValue, Duration timeout);
686
687
/** Assert metric has expected value */
688
void assertMetricEquals(String metricName, long expectedValue);
689
}
690
```
691
692
**Usage Examples:**
693
694
```java
695
import org.apache.flink.connector.testframe.utils.CollectIteratorAssert;
696
import org.apache.flink.connector.testframe.utils.MetricQuerier;
697
698
// Collect and assert results
699
CloseableIterator<String> results = // ... collect from job
700
CollectIteratorAssert.assertThat(results)
701
.hasSize(1000)
702
.containsExactlyInAnyOrder(expectedResults.toArray(new String[0]));
703
704
// Query metrics
705
MetricQuerier metrics = MetricQuerier.forEnvironment(testEnvironment);
706
metrics.waitForMetric("numRecordsIn", 1000, Duration.ofSeconds(30));
707
metrics.assertMetricEquals("numRecordsOut", 1000);
708
```