# External System Integration

The external system integration framework provides abstractions for connectors to interact with their respective external systems during testing. This includes data generation, reading, writing, and connector instantiation.

## Capabilities

### Base External Context

Foundation interface for all external system integrations.

```java { .api }
/**
 * Base contract for integrating an external system into the testing
 * framework. Extends {@link AutoCloseable} so the framework can release
 * external resources automatically once a test finishes.
 */
public interface ExternalContext extends AutoCloseable {

    /**
     * Provides the connector JAR files that must be attached to submitted
     * Flink jobs.
     *
     * @return list of connector JAR URLs to attach to Flink jobs
     */
    List<URL> getConnectorJarPaths();
}

/**
23
* Factory for creating external context instances
24
* @param <C> Type of external context to create
25
*/
26
public interface ExternalContextFactory<C extends ExternalContext> {
27
/**
28
* Create external context instance for test
29
* @param testName Name of the current test for resource isolation
30
* @return External context instance
31
*/
32
C createExternalContext(String testName);
33
}
34
```

**Usage Examples:**

```java
39
// External context factory registration
40
@TestContext
41
ExternalContextFactory<MyExternalContext> contextFactory =
42
testName -> new MyExternalContext(testName);
43
44
// Custom external context implementation
45
public class MyExternalContext implements ExternalContext {
46
private final String testName;
47
48
public MyExternalContext(String testName) {
49
this.testName = testName;
50
}
51
52
@Override
53
public List<URL> getConnectorJarPaths() {
54
return Arrays.asList(
55
new File("target/my-connector.jar").toURI().toURL()
56
);
57
}
58
59
@Override
60
public void close() throws Exception {
61
// Cleanup external resources
62
}
63
}
64
```

### Sink External Context

Abstract base class for sink connector external system integration.

```java { .api }
71
/**
72
* External context for DataStream sink testing
73
* @param <T> Type of data elements handled by the sink
74
*/
75
public abstract class DataStreamSinkExternalContext<T> extends ExternalContext {
76
77
/**
78
* Create sink instance for testing
79
* @param sinkSettings Configuration settings for the sink
80
* @return Configured sink instance
81
* @throws UnsupportedOperationException if settings combination not supported
82
*/
83
public abstract Sink<T> createSink(TestingSinkSettings sinkSettings);
84
85
/**
86
* Generate test data for sink validation
87
* @param sinkSettings Sink configuration settings
88
* @param seed Random seed for reproducible data generation
89
* @return List of test data elements
90
*/
91
public abstract List<T> generateTestData(TestingSinkSettings sinkSettings, long seed);
92
93
/**
94
* Create reader for validating data written to external system
95
* @param sinkSettings Sink configuration settings
96
* @return Data reader for external system validation
97
*/
98
public abstract ExternalSystemDataReader<T> createSinkDataReader(TestingSinkSettings sinkSettings);
99
100
/**
101
* Get type information for data elements
102
* @return TypeInformation for proper serialization
103
*/
104
public abstract TypeInformation<T> getProducedType();
105
}
106
107
/**
108
* External context specifically for Sink V2 API
109
* @param <T> Type of data elements
110
*/
111
public abstract class DataStreamSinkV2ExternalContext<T> extends DataStreamSinkExternalContext<T> {
112
// Inherits all methods from DataStreamSinkExternalContext
113
// Specifically for org.apache.flink.api.connector.sink2.Sink implementations
114
}
115
116
/**
117
* External context for Table API sink testing
118
* @param <T> Type of data elements
119
*/
120
public abstract class TableSinkExternalContext<T> extends ExternalContext {
121
// Table-specific sink testing methods
122
}
123
```

**Usage Examples:**

```java
128
public class KafkaSinkExternalContext extends DataStreamSinkV2ExternalContext<String> {
129
130
private final String topicName;
131
private final KafkaContainer kafkaContainer;
132
133
public KafkaSinkExternalContext(String testName) {
134
this.topicName = "test-topic-" + testName;
135
this.kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"));
136
this.kafkaContainer.start();
137
}
138
139
@Override
140
public Sink<String> createSink(TestingSinkSettings sinkSettings) {
141
Properties props = new Properties();
142
props.setProperty("bootstrap.servers", kafkaContainer.getBootstrapServers());
143
144
return KafkaSink.<String>builder()
145
.setBootstrapServers(kafkaContainer.getBootstrapServers())
146
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
147
.setTopic(topicName)
148
.setValueSerializationSchema(new SimpleStringSchema())
149
.build())
150
.build();
151
}
152
153
@Override
154
public List<String> generateTestData(TestingSinkSettings sinkSettings, long seed) {
155
Random random = new Random(seed);
156
return IntStream.range(0, 100)
157
.mapToObj(i -> "test-record-" + i + "-" + random.nextInt(1000))
158
.collect(Collectors.toList());
159
}
160
161
@Override
162
public ExternalSystemDataReader<String> createSinkDataReader(TestingSinkSettings sinkSettings) {
163
return new KafkaDataReader(kafkaContainer.getBootstrapServers(), topicName);
164
}
165
166
@Override
167
public TypeInformation<String> getProducedType() {
168
return Types.STRING;
169
}
170
171
@Override
172
public void close() throws Exception {
173
kafkaContainer.stop();
174
}
175
}
176
```

### Source External Context

Abstract base class for source connector external system integration.

```java { .api }
183
/**
184
* External context for DataStream source testing
185
* @param <T> Type of data elements produced by the source
186
*/
187
public abstract class DataStreamSourceExternalContext<T> extends ExternalContext {
188
189
/**
190
* Create source instance for testing
191
* @param sourceSettings Configuration settings for the source
192
* @return Configured source instance
193
* @throws UnsupportedOperationException if settings combination not supported
194
*/
195
public abstract Source<T, ?, ?> createSource(TestingSourceSettings sourceSettings);
196
197
/**
198
* Generate test data for specific split
199
* @param sourceSettings Source configuration settings
200
* @param splitIndex Index of the split (for multiple split scenarios)
201
* @param seed Random seed for reproducible data generation
202
* @return List of test data elements for the split
203
*/
204
public abstract List<T> generateTestData(TestingSourceSettings sourceSettings, int splitIndex, long seed);
205
206
/**
207
* Create writer for sending test data to external system split
208
* @param sourceSettings Source configuration settings
209
* @return Split data writer for external system
210
*/
211
public abstract ExternalSystemSplitDataWriter<T> createSourceSplitDataWriter(TestingSourceSettings sourceSettings);
212
213
/**
214
* Get type information for data elements
215
* @return TypeInformation for proper deserialization
216
*/
217
public abstract TypeInformation<T> getProducedType();
218
}
219
220
/**
221
* External context for Table API source testing
222
* @param <T> Type of data elements
223
*/
224
public abstract class TableSourceExternalContext<T> extends ExternalContext {
225
// Table-specific source testing methods
226
}
227
```

**Usage Examples:**

```java
232
public class KafkaSourceExternalContext extends DataStreamSourceExternalContext<String> {
233
234
private final KafkaContainer kafkaContainer;
235
private final Map<Integer, String> splitTopics = new HashMap<>();
236
237
public KafkaSourceExternalContext(String testName) {
238
this.kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"));
239
this.kafkaContainer.start();
240
}
241
242
@Override
243
public Source<String, ?, ?> createSource(TestingSourceSettings sourceSettings) {
244
return KafkaSource.<String>builder()
245
.setBootstrapServers(kafkaContainer.getBootstrapServers())
246
.setTopics(splitTopics.values())
247
.setValueOnlyDeserializer(new SimpleStringSchema())
248
.setStartingOffsets(OffsetsInitializer.earliest())
249
.setBounded(sourceSettings.getBoundedness() == Boundedness.BOUNDED ?
250
OffsetsInitializer.latest() : null)
251
.build();
252
}
253
254
@Override
255
public List<String> generateTestData(TestingSourceSettings sourceSettings, int splitIndex, long seed) {
256
Random random = new Random(seed);
257
return IntStream.range(0, 50)
258
.mapToObj(i -> "split-" + splitIndex + "-record-" + i + "-" + random.nextInt(1000))
259
.collect(Collectors.toList());
260
}
261
262
@Override
263
public ExternalSystemSplitDataWriter<T> createSourceSplitDataWriter(TestingSourceSettings sourceSettings) {
264
return new KafkaSplitDataWriter(kafkaContainer.getBootstrapServers());
265
}
266
267
@Override
268
public TypeInformation<String> getProducedType() {
269
return Types.STRING;
270
}
271
}
272
```

### Data Access Interfaces

Interfaces for reading and writing data to external systems.

```java { .api }
/**
 * Interface for reading data from external systems (used by sinks for
 * validation).
 *
 * <p>Annotated {@code @FunctionalInterface}: it has a single abstract
 * method, so test code can provide a lambda implementation.
 *
 * @param <T> type of data elements
 */
@FunctionalInterface
public interface ExternalSystemDataReader<T> {

    /**
     * Polls for available data from the external system.
     *
     * @param timeout maximum time to wait for data
     * @return list of available data elements (may be empty)
     */
    List<T> poll(Duration timeout);
}

/**
 * Interface for writing data to external system splits (used by sources for
 * test data setup).
 *
 * <p>Annotated {@code @FunctionalInterface}: it has a single abstract
 * method, so test code can provide a lambda implementation.
 *
 * @param <T> type of data elements
 */
@FunctionalInterface
public interface ExternalSystemSplitDataWriter<T> {

    /**
     * Writes records to an external system split.
     *
     * @param records list of records to write
     */
    void writeRecords(List<T> records);
}
```

**Usage Examples:**

```java
308
public class KafkaDataReader implements ExternalSystemDataReader<String> {
309
private final KafkaConsumer<String, String> consumer;
310
311
public KafkaDataReader(String bootstrapServers, String topic) {
312
Properties props = new Properties();
313
props.setProperty("bootstrap.servers", bootstrapServers);
314
props.setProperty("group.id", "test-consumer-" + UUID.randomUUID());
315
props.setProperty("key.deserializer", StringDeserializer.class.getName());
316
props.setProperty("value.deserializer", StringDeserializer.class.getName());
317
props.setProperty("auto.offset.reset", "earliest");
318
319
this.consumer = new KafkaConsumer<>(props);
320
this.consumer.subscribe(Arrays.asList(topic));
321
}
322
323
@Override
324
public List<String> poll(Duration timeout) {
325
ConsumerRecords<String, String> records = consumer.poll(timeout);
326
return StreamSupport.stream(records.spliterator(), false)
327
.map(ConsumerRecord::value)
328
.collect(Collectors.toList());
329
}
330
}
331
332
public class KafkaSplitDataWriter implements ExternalSystemSplitDataWriter<String> {
333
private final KafkaProducer<String, String> producer;
334
private final String topicPrefix;
335
336
public KafkaSplitDataWriter(String bootstrapServers) {
337
Properties props = new Properties();
338
props.setProperty("bootstrap.servers", bootstrapServers);
339
props.setProperty("key.serializer", StringSerializer.class.getName());
340
props.setProperty("value.serializer", StringSerializer.class.getName());
341
342
this.producer = new KafkaProducer<>(props);
343
this.topicPrefix = "test-source-";
344
}
345
346
@Override
347
public void writeRecords(List<String> records) {
348
String topic = topicPrefix + Thread.currentThread().getId();
349
records.forEach(record -> {
350
producer.send(new ProducerRecord<>(topic, record));
351
});
352
producer.flush();
353
}
354
}
355
```

### Containerized External Systems

Default implementation for containerized external systems using TestContainers.

```java { .api }
/**
 * Base class for external systems backed by TestContainers. Subclasses manage
 * the container lifecycle and expose connection details to connectors.
 */
public abstract class DefaultContainerizedExternalSystem {

    /** Starts all containers and prepares the external system for use. */
    protected abstract void startContainers();

    /** Stops all containers and releases any held resources. */
    protected abstract void stopContainers();

    /**
     * Supplies connection configuration for connectors under test.
     *
     * @return configuration properties for the connector
     */
    protected abstract Properties getConnectionProperties();
}
```

## Configuration Types

```java { .api }
388
/**
389
* Configuration settings for sink testing
390
*/
391
public class TestingSinkSettings {
392
public static Builder builder();
393
394
public static class Builder {
395
public Builder setCheckpointingMode(CheckpointingMode checkpointingMode);
396
public TestingSinkSettings build();
397
}
398
399
public CheckpointingMode getCheckpointingMode();
400
}
401
402
/**
403
* Configuration settings for source testing
404
*/
405
public class TestingSourceSettings {
406
public static Builder builder();
407
408
public static class Builder {
409
public Builder setBoundedness(Boundedness boundedness);
410
public Builder setCheckpointingMode(CheckpointingMode checkpointingMode);
411
public TestingSourceSettings build();
412
}
413
414
public Boundedness getBoundedness();
415
public CheckpointingMode getCheckpointingMode();
416
}
417
```

## Integration Patterns

### Lifecycle Management

External contexts follow a predictable lifecycle:

1. **Creation**: Context created via factory for each test case
2. **Setup**: External system resources initialized (containers, topics, etc.)
3. **Test Execution**: Context used throughout test execution
4. **Cleanup**: Context closed automatically after test completion

### Resource Isolation

Each test case gets its own external context instance, ensuring:

- **Test Isolation**: No interference between test cases
- **Resource Cleanup**: Automatic cleanup of external resources
- **Parallel Execution**: Tests can run in parallel safely

### Error Handling

External contexts should handle common error scenarios:

- **Resource Unavailability**: Throw clear exceptions when external systems are not available
- **Configuration Errors**: Validate configuration and provide helpful error messages
- **Cleanup Failures**: Log warnings but don't fail tests on cleanup issues