0
# Container Support
1
2
Container support provides TestContainers integration for running tests in isolated, containerized environments against custom Flink clusters. This enables realistic test scenarios with proper network isolation and resource constraints.
3
4
## Capabilities
5
6
### Flink Containers
7
8
Utilities for creating and managing Flink containers using TestContainers.
9
10
```java { .api }
11
/**
12
* Utility class for creating Flink containers
13
*/
14
public class FlinkContainers {
15
16
/**
17
* Create JobManager container with default configuration
18
* @return Configured JobManager container
19
*/
20
public static FlinkContainer jobManager();
21
22
/**
23
* Create TaskManager container with default configuration
24
* @return Configured TaskManager container
25
*/
26
public static FlinkContainer taskManager();
27
28
/**
29
* Create complete Flink cluster (JobManager + TaskManagers)
30
* @return Configured cluster containers
31
*/
32
public static FlinkContainer cluster();
33
34
/**
35
* Create JobManager with custom configuration
36
* @param configuration Flink configuration
37
* @return Configured JobManager container
38
*/
39
public static FlinkContainer jobManager(Configuration configuration);
40
41
/**
42
* Create TaskManager with custom configuration
43
* @param configuration Flink configuration
44
* @return Configured TaskManager container
45
*/
46
public static FlinkContainer taskManager(Configuration configuration);
47
}
48
```
49
50
**Usage Examples:**
51
52
```java
53
// Create simple cluster
54
FlinkContainer cluster = FlinkContainers.cluster();
55
cluster.start();
56
57
// Create custom JobManager
58
Configuration config = new Configuration();
59
config.setString(JobManagerOptions.ADDRESS, "jobmanager");
60
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 4);
61
62
FlinkContainer jobManager = FlinkContainers.jobManager(config);
63
jobManager.start();
64
```
65
66
### Container Test Environment
67
68
TestContainers-based test environment for isolated testing.
69
70
```java { .api }
71
/**
72
* Test environment using Flink containers for isolated testing
73
*/
74
public class FlinkContainerTestEnvironment implements TestEnvironment, ClusterControllable {
75
76
/**
77
* Create container environment with default settings
78
*/
79
public FlinkContainerTestEnvironment();
80
81
/**
82
* Create container environment with custom settings
83
* @param settings Container configuration settings
84
*/
85
public FlinkContainerTestEnvironment(FlinkContainersSettings settings);
86
87
/**
88
* Create container environment with custom settings and testcontainers config
89
* @param settings Container configuration settings
90
* @param testcontainersSettings TestContainers configuration
91
*/
92
public FlinkContainerTestEnvironment(
93
FlinkContainersSettings settings,
94
TestcontainersSettings testcontainersSettings
95
);
96
97
@Override
98
public StreamExecutionEnvironment createExecutionEnvironment(TestEnvironmentSettings envOptions);
99
100
@Override
101
public Endpoint getRestEndpoint();
102
103
@Override
104
public String getCheckpointUri();
105
106
@Override
107
public void startUp() throws Exception;
108
109
@Override
110
public void tearDown() throws Exception;
111
112
/**
113
* Trigger TaskManager failover by stopping and restarting TaskManager containers
114
* @param jobClient Current job client
115
* @param afterFailAction Action to execute after triggering failover
116
*/
117
@Override
118
public void triggerTaskManagerFailover(JobClient jobClient, Runnable afterFailAction) throws Exception;
119
}
120
```
121
122
**Usage Examples:**
123
124
```java
125
// Default container environment
126
@TestEnv
127
FlinkContainerTestEnvironment testEnv = new FlinkContainerTestEnvironment();
128
129
// Custom configuration
130
@TestEnv
131
FlinkContainerTestEnvironment testEnv = new FlinkContainerTestEnvironment(
132
FlinkContainersSettings.builder()
133
.setNumTaskManagers(3)
134
.setNumSlotsPerTaskManager(2)
135
.setJobManagerMemory("2g")
136
.setTaskManagerMemory("1g")
137
.build()
138
);
139
140
// With TestContainers settings
141
@TestEnv
142
FlinkContainerTestEnvironment testEnv = new FlinkContainerTestEnvironment(
143
FlinkContainersSettings.builder().build(),
144
TestcontainersSettings.builder()
145
.setNetwork(Network.newNetwork())
146
.build()
147
);
148
```
149
150
### Image Builder
151
152
Utility for building custom Flink images with connector JARs.
153
154
```java { .api }
155
/**
156
* Builder for custom Flink Docker images with connector JARs
157
*/
158
public class FlinkImageBuilder {
159
160
/**
161
* Build custom Flink image with connector JARs
162
* @param jarPaths List of JAR file URLs to include in image
163
* @return Docker image name for the built image
164
* @throws ImageBuildException if image build fails
165
*/
166
public static DockerImageName buildImage(List<URL> jarPaths) throws ImageBuildException;
167
168
/**
169
* Build custom Flink image with connector JARs and base image
170
* @param baseImage Base Flink image to extend
171
* @param jarPaths List of JAR file URLs to include in image
172
* @return Docker image name for the built image
173
* @throws ImageBuildException if image build fails
174
*/
175
public static DockerImageName buildImage(DockerImageName baseImage, List<URL> jarPaths) throws ImageBuildException;
176
177
/**
178
* Build custom Flink image with connector JARs and additional configuration
179
* @param baseImage Base Flink image to extend
180
* @param jarPaths List of JAR file URLs to include in image
181
* @param additionalFiles Additional files to copy into image
182
* @return Docker image name for the built image
183
* @throws ImageBuildException if image build fails
184
*/
185
public static DockerImageName buildImage(
186
DockerImageName baseImage,
187
List<URL> jarPaths,
188
Map<String, String> additionalFiles
189
) throws ImageBuildException;
190
}
191
192
/**
193
* Exception thrown when Docker image build fails
194
*/
195
public class ImageBuildException extends Exception {
196
public ImageBuildException(String message);
197
public ImageBuildException(String message, Throwable cause);
198
}
199
```
200
201
**Usage Examples:**
202
203
```java
204
// Build image with connector JARs
205
List<URL> connectorJars = Arrays.asList(
206
new File("target/my-connector.jar").toURI().toURL(),
207
new File("lib/dependency.jar").toURI().toURL()
208
);
209
210
try {
211
DockerImageName customImage = FlinkImageBuilder.buildImage(connectorJars);
212
213
// Use custom image in containers
214
FlinkContainer jobManager = FlinkContainers.jobManager()
215
.withDockerImageName(customImage);
216
217
} catch (ImageBuildException e) {
218
throw new TestAbortedException("Failed to build custom Flink image", e);
219
}
220
```
221
222
### TestContainers Configurator
223
224
Interface for configuring TestContainers behavior.
225
226
```java { .api }
227
/**
228
* Interface for configuring Flink TestContainers
229
*/
230
public interface FlinkTestcontainersConfigurator {
231
232
/**
233
* Configure JobManager container
234
* @param jobManager JobManager container to configure
235
*/
236
void configureJobManager(GenericContainer<?> jobManager);
237
238
/**
239
* Configure TaskManager container
240
* @param taskManager TaskManager container to configure
241
*/
242
void configureTaskManager(GenericContainer<?> taskManager);
243
244
/**
245
* Configure network settings
246
* @param network Network to configure
247
*/
248
void configureNetwork(Network network);
249
}
250
```
251
252
**Usage Examples:**
253
254
```java
255
public class CustomFlinkConfigurator implements FlinkTestcontainersConfigurator {
256
257
@Override
258
public void configureJobManager(GenericContainer<?> jobManager) {
259
jobManager
260
.withEnv("JVM_ARGS", "-Xmx2g -Xms2g")
261
.withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("JobManager")))
262
.withStartupTimeout(Duration.ofMinutes(5));
263
}
264
265
@Override
266
public void configureTaskManager(GenericContainer<?> taskManager) {
267
taskManager
268
.withEnv("JVM_ARGS", "-Xmx1g -Xms1g")
269
.withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("TaskManager")))
270
.withStartupTimeout(Duration.ofMinutes(3));
271
}
272
273
@Override
274
public void configureNetwork(Network network) {
275
// Custom network configuration
276
}
277
}
278
279
// Use custom configurator
280
FlinkContainerTestEnvironment testEnv = new FlinkContainerTestEnvironment(
281
FlinkContainersSettings.builder()
282
.setConfigurator(new CustomFlinkConfigurator())
283
.build()
284
);
285
```
286
287
## Configuration
288
289
### Flink Container Settings
290
291
Configuration for Flink container cluster setup.
292
293
```java { .api }
294
/**
295
* Configuration settings for Flink container environments
296
*/
297
public class FlinkContainersSettings {
298
299
public static Builder builder();
300
301
public static class Builder {
302
/**
303
* Set number of TaskManager containers to start
304
* @param numTaskManagers Number of TaskManager containers (default: 1)
305
* @return Builder instance
306
*/
307
public Builder setNumTaskManagers(int numTaskManagers);
308
309
/**
310
* Set number of slots per TaskManager container
311
* @param numSlotsPerTaskManager Task slots per TaskManager (default: 2)
312
* @return Builder instance
313
*/
314
public Builder setNumSlotsPerTaskManager(int numSlotsPerTaskManager);
315
316
/**
317
* Set JobManager memory allocation
318
* @param jobManagerMemory Memory setting (e.g., "1g", "512m") (default: "1g")
319
* @return Builder instance
320
*/
321
public Builder setJobManagerMemory(String jobManagerMemory);
322
323
/**
324
* Set TaskManager memory allocation
325
* @param taskManagerMemory Memory setting (e.g., "1g", "512m") (default: "1g")
326
* @return Builder instance
327
*/
328
public Builder setTaskManagerMemory(String taskManagerMemory);
329
330
/**
331
* Set base Docker image for containers
332
* @param baseImage Docker image name (default: "flink:1.18")
333
* @return Builder instance
334
*/
335
public Builder setBaseImage(DockerImageName baseImage);
336
337
/**
338
* Set custom TestContainers configurator
339
* @param configurator Custom configurator for container setup
340
* @return Builder instance
341
*/
342
public Builder setConfigurator(FlinkTestcontainersConfigurator configurator);
343
344
/**
345
* Enable/disable checkpoint recovery testing
346
* @param enableCheckpointRecovery Enable checkpoint recovery (default: true)
347
* @return Builder instance
348
*/
349
public Builder setEnableCheckpointRecovery(boolean enableCheckpointRecovery);
350
351
/**
352
* Build configured settings
353
* @return FlinkContainersSettings instance
354
*/
355
public FlinkContainersSettings build();
356
}
357
358
public int getNumTaskManagers();
359
public int getNumSlotsPerTaskManager();
360
public String getJobManagerMemory();
361
public String getTaskManagerMemory();
362
public DockerImageName getBaseImage();
363
public Optional<FlinkTestcontainersConfigurator> getConfigurator();
364
public boolean isCheckpointRecoveryEnabled();
365
}
366
```
367
368
### TestContainers Settings
369
370
General TestContainers configuration settings.
371
372
```java { .api }
373
/**
374
* General TestContainers configuration settings
375
*/
376
public class TestcontainersSettings {
377
378
public static Builder builder();
379
380
public static class Builder {
381
/**
382
* Set Docker network for container communication
383
* @param network Shared network for containers
384
* @return Builder instance
385
*/
386
public Builder setNetwork(Network network);
387
388
/**
389
* Set log consumers for container output
390
* @param logConsumers Map of container name to log consumer
391
* @return Builder instance
392
*/
393
public Builder setLogConsumers(Map<String, Consumer<OutputFrame>> logConsumers);
394
395
/**
396
* Set startup timeout for containers
397
* @param startupTimeout Maximum time to wait for container startup
398
* @return Builder instance
399
*/
400
public Builder setStartupTimeout(Duration startupTimeout);
401
402
/**
403
* Set container registry configuration
404
* @param registryConfig Docker registry configuration
405
* @return Builder instance
406
*/
407
public Builder setRegistryConfig(DockerClientConfig registryConfig);
408
409
/**
410
* Enable/disable container reuse between tests
411
* @param reuseContainers Enable container reuse (default: false)
412
* @return Builder instance
413
*/
414
public Builder setReuseContainers(boolean reuseContainers);
415
416
/**
417
* Build configured settings
418
* @return TestcontainersSettings instance
419
*/
420
public TestcontainersSettings build();
421
}
422
423
public Optional<Network> getNetwork();
424
public Map<String, Consumer<OutputFrame>> getLogConsumers();
425
public Duration getStartupTimeout();
426
public Optional<DockerClientConfig> getRegistryConfig();
427
public boolean isReuseContainers();
428
}
429
```
430
431
## Advanced Usage Patterns
432
433
### Multi-Container Test Setup
434
435
Set up complex test scenarios with multiple external systems.
436
437
```java
438
public class ComplexConnectorTestSuite extends SinkTestSuiteBase<String> {
439
440
// Shared network for all containers
441
private static final Network testNetwork = Network.newNetwork();
442
443
@TestEnv
444
FlinkContainerTestEnvironment testEnv = new FlinkContainerTestEnvironment(
445
FlinkContainersSettings.builder()
446
.setNumTaskManagers(2)
447
.setNumSlotsPerTaskManager(2)
448
.build(),
449
TestcontainersSettings.builder()
450
.setNetwork(testNetwork)
451
.setLogConsumers(Map.of(
452
"jobmanager", new Slf4jLogConsumer(LoggerFactory.getLogger("JobManager")),
453
"taskmanager", new Slf4jLogConsumer(LoggerFactory.getLogger("TaskManager"))
454
))
455
.build()
456
);
457
458
@TestContext
459
ExternalContextFactory<KafkaExternalContext> kafkaContextFactory = testName ->
460
new KafkaExternalContext(testName, testNetwork);
461
462
@TestContext
463
ExternalContextFactory<DatabaseExternalContext> dbContextFactory = testName ->
464
new DatabaseExternalContext(testName, testNetwork);
465
}
466
```
467
468
### Custom Image Building
469
470
Build custom Flink images for specific testing scenarios.
471
472
```java
473
public class CustomImageTestSuite extends SinkTestSuiteBase<String> {
474
475
private static DockerImageName customFlinkImage;
476
477
@BeforeAll
478
static void buildCustomImage() throws Exception {
479
List<URL> connectorJars = Arrays.asList(
480
new File("target/my-connector.jar").toURI().toURL(),
481
new File("lib/kafka-clients.jar").toURI().toURL(),
482
new File("lib/commons-lang3.jar").toURI().toURL()
483
);
484
485
Map<String, String> additionalFiles = Map.of(
486
"conf/log4j.properties", "/opt/flink/conf/log4j.properties",
487
"conf/flink-conf.yaml", "/opt/flink/conf/flink-conf.yaml"
488
);
489
490
customFlinkImage = FlinkImageBuilder.buildImage(
491
DockerImageName.parse("flink:1.18-java11"),
492
connectorJars,
493
additionalFiles
494
);
495
}
496
497
@TestEnv
498
FlinkContainerTestEnvironment testEnv = new FlinkContainerTestEnvironment(
499
FlinkContainersSettings.builder()
500
.setBaseImage(customFlinkImage)
501
.build()
502
);
503
}
504
```
505
506
### Failure Testing with Containers
507
508
Implement failure testing using container lifecycle control.
509
510
```java
511
public class FailoverTestSuite extends SourceTestSuiteBase<String> {
512
513
@TestEnv
514
FlinkContainerTestEnvironment testEnv = new FlinkContainerTestEnvironment(
515
FlinkContainersSettings.builder()
516
.setNumTaskManagers(3) // Multiple TaskManagers for failover testing
517
.setEnableCheckpointRecovery(true)
518
.build()
519
);
520
521
@TestTemplate
522
public void testTaskManagerFailover(
523
TestEnvironment testEnv,
524
DataStreamSourceExternalContext<String> externalContext,
525
ClusterControllable controller,
526
CheckpointingMode semantic
527
) throws Exception {
528
529
// Start job and validate initial results
530
JobClient jobClient = startTestJob(testEnv, externalContext, semantic);
531
validateInitialResults(jobClient);
532
533
// Trigger TaskManager failure
534
controller.triggerTaskManagerFailover(jobClient, () -> {
535
// Actions after failure triggered
536
LOG.info("TaskManager failure triggered, waiting for recovery...");
537
});
538
539
// Validate recovery and continued processing
540
validateRecoveryResults(jobClient);
541
}
542
}
543
```
544
545
### Resource Monitoring
546
547
Constrain container resources (memory and CPU quota) and attach log consumers to monitor containers during tests.
548
549
```java
550
public class PerformanceTestSuite extends SinkTestSuiteBase<String> {
551
552
@TestEnv
553
FlinkContainerTestEnvironment testEnv = new FlinkContainerTestEnvironment(
554
FlinkContainersSettings.builder()
555
.setJobManagerMemory("2g")
556
.setTaskManagerMemory("4g")
557
.setConfigurator(new ResourceMonitoringConfigurator())
558
.build()
559
);
560
561
private static class ResourceMonitoringConfigurator implements FlinkTestcontainersConfigurator {
562
563
@Override
564
public void configureJobManager(GenericContainer<?> jobManager) {
565
jobManager
566
.withCreateContainerCmdModifier(cmd -> {
567
// Set memory limits
568
cmd.getHostConfig()
569
.withMemory(2L * 1024 * 1024 * 1024) // 2GB
570
.withCpuQuota(100000L); // 1 CPU
571
})
572
.withLogConsumer(new ResourceUsageLogConsumer("JobManager"));
573
}
574
575
@Override
576
public void configureTaskManager(GenericContainer<?> taskManager) {
577
taskManager
578
.withCreateContainerCmdModifier(cmd -> {
579
// Set memory limits
580
cmd.getHostConfig()
581
.withMemory(4L * 1024 * 1024 * 1024) // 4GB
582
.withCpuQuota(200000L); // 2 CPUs
583
})
584
.withLogConsumer(new ResourceUsageLogConsumer("TaskManager"));
585
}
586
587
@Override
588
public void configureNetwork(Network network) {
589
// Network configuration
590
}
591
}
592
}
593
```
594
595
## Best Practices
596
597
### Resource Management
598
599
```java
600
// Use try-with-resources for automatic cleanup
601
try (Network network = Network.newNetwork()) {
602
FlinkContainerTestEnvironment testEnv = new FlinkContainerTestEnvironment(
603
FlinkContainersSettings.builder().build(),
604
TestcontainersSettings.builder().setNetwork(network).build()
605
);
606
607
// Test execution
608
} // Network automatically closed
609
```
610
611
### Performance Optimization
612
613
```java
614
// Reuse containers when possible
615
TestcontainersSettings settings = TestcontainersSettings.builder()
616
.setReuseContainers(true) // Enable container reuse
617
.setStartupTimeout(Duration.ofMinutes(2)) // Reasonable timeout
618
.build();
619
620
// Use appropriate resource allocation
621
FlinkContainersSettings flinkSettings = FlinkContainersSettings.builder()
622
.setJobManagerMemory("1g") // Don't over-allocate for simple tests
623
.setTaskManagerMemory("1g")
624
.setNumTaskManagers(1) // Start with minimal cluster
625
.build();
626
```
627
628
### Error Handling
629
630
```java
631
@TestEnv
632
FlinkContainerTestEnvironment testEnv = new FlinkContainerTestEnvironment() {
633
@Override
634
public void startUp() throws Exception {
635
try {
636
super.startUp();
637
} catch (Exception e) {
638
// Handle Docker not available
639
throw new TestAbortedException("Docker not available for container tests", e);
640
}
641
}
642
};
643
```
644
645
### CI/CD Integration
646
647
```yaml
648
# GitHub Actions example
649
jobs:
650
container-tests:
651
runs-on: ubuntu-latest
652
services:
653
docker:
654
image: docker:20.10
655
options: --privileged
656
steps:
657
- uses: actions/checkout@v3
658
- name: Set up JDK 11
659
uses: actions/setup-java@v3
660
with:
661
java-version: '11'
662
- name: Run container tests
663
run: ./mvnw test -Dtest=**/*ContainerTest*
664
```