or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

assertions.md containers.md external-systems.md index.md junit-integration.md metrics.md test-environments.md test-suites.md

containers.md docs/

0

# Container Support

1

2

Container support integrates TestContainers so that tests can run against custom Flink clusters in isolated, containerized environments. This enables realistic test scenarios with proper network isolation and resource constraints.

3

4

## Capabilities

5

6

### Flink Containers

7

8

Utilities for creating and managing Flink containers using TestContainers.

9

10

```java { .api }

11

/**

12

* Utility class for creating Flink containers

13

*/

14

public class FlinkContainers {

15

16

/**

17

* Create JobManager container with default configuration

18

* @return Configured JobManager container

19

*/

20

public static FlinkContainer jobManager();

21

22

/**

23

* Create TaskManager container with default configuration

24

* @return Configured TaskManager container

25

*/

26

public static FlinkContainer taskManager();

27

28

/**

29

* Create complete Flink cluster (JobManager + TaskManagers)

30

* @return Configured cluster containers

31

*/

32

public static FlinkContainer cluster();

33

34

/**

35

* Create JobManager with custom configuration

36

* @param configuration Flink configuration

37

* @return Configured JobManager container

38

*/

39

public static FlinkContainer jobManager(Configuration configuration);

40

41

/**

42

* Create TaskManager with custom configuration

43

* @param configuration Flink configuration

44

* @return Configured TaskManager container

45

*/

46

public static FlinkContainer taskManager(Configuration configuration);

47

}

48

```

49

50

**Usage Examples:**

51

52

```java

53

// Create simple cluster

54

FlinkContainer cluster = FlinkContainers.cluster();

55

cluster.start();

56

57

// Create custom JobManager

58

Configuration config = new Configuration();

59

config.setString(JobManagerOptions.ADDRESS, "jobmanager");

60

config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 4);

61

62

FlinkContainer jobManager = FlinkContainers.jobManager(config);

63

jobManager.start();

64

```

65

66

### Container Test Environment

67

68

TestContainers-based test environment for isolated testing.

69

70

```java { .api }

71

/**

72

* Test environment using Flink containers for isolated testing

73

*/

74

public class FlinkContainerTestEnvironment implements TestEnvironment, ClusterControllable {

75

76

/**

77

* Create container environment with default settings

78

*/

79

public FlinkContainerTestEnvironment();

80

81

/**

82

* Create container environment with custom settings

83

* @param settings Container configuration settings

84

*/

85

public FlinkContainerTestEnvironment(FlinkContainersSettings settings);

86

87

/**

88

* Create container environment with custom settings and testcontainers config

89

* @param settings Container configuration settings

90

* @param testcontainersSettings TestContainers configuration

91

*/

92

public FlinkContainerTestEnvironment(

93

FlinkContainersSettings settings,

94

TestcontainersSettings testcontainersSettings

95

);

96

97

@Override

98

public StreamExecutionEnvironment createExecutionEnvironment(TestEnvironmentSettings envOptions);

99

100

@Override

101

public Endpoint getRestEndpoint();

102

103

@Override

104

public String getCheckpointUri();

105

106

@Override

107

public void startUp() throws Exception;

108

109

@Override

110

public void tearDown() throws Exception;

111

112

/**

113

* Trigger TaskManager failover by stopping and restarting TaskManager containers

114

* @param jobClient Current job client

115

* @param afterFailAction Action to execute after triggering failover

116

*/

117

@Override

118

public void triggerTaskManagerFailover(JobClient jobClient, Runnable afterFailAction) throws Exception;

119

}

120

```

121

122

**Usage Examples:**

123

124

```java

125

// Default container environment

126

@TestEnv

127

FlinkContainerTestEnvironment testEnv = new FlinkContainerTestEnvironment();

128

129

// Custom configuration

130

@TestEnv

131

FlinkContainerTestEnvironment testEnv = new FlinkContainerTestEnvironment(

132

FlinkContainersSettings.builder()

133

.setNumTaskManagers(3)

134

.setNumSlotsPerTaskManager(2)

135

.setJobManagerMemory("2g")

136

.setTaskManagerMemory("1g")

137

.build()

138

);

139

140

// With TestContainers settings

141

@TestEnv

142

FlinkContainerTestEnvironment testEnv = new FlinkContainerTestEnvironment(

143

FlinkContainersSettings.builder().build(),

144

TestcontainersSettings.builder()

145

.setNetwork(Network.newNetwork())

146

.build()

147

);

148

```

149

150

### Image Builder

151

152

Utility for building custom Flink images with connector JARs.

153

154

```java { .api }

155

/**

156

* Builder for custom Flink Docker images with connector JARs

157

*/

158

public class FlinkImageBuilder {

159

160

/**

161

* Build custom Flink image with connector JARs

162

* @param jarPaths List of JAR file URLs to include in image

163

* @return Docker image name for the built image

164

* @throws ImageBuildException if image build fails

165

*/

166

public static DockerImageName buildImage(List<URL> jarPaths) throws ImageBuildException;

167

168

/**

169

* Build custom Flink image with connector JARs and base image

170

* @param baseImage Base Flink image to extend

171

* @param jarPaths List of JAR file URLs to include in image

172

* @return Docker image name for the built image

173

* @throws ImageBuildException if image build fails

174

*/

175

public static DockerImageName buildImage(DockerImageName baseImage, List<URL> jarPaths) throws ImageBuildException;

176

177

/**

178

* Build custom Flink image with connector JARs and additional configuration

179

* @param baseImage Base Flink image to extend

180

* @param jarPaths List of JAR file URLs to include in image

181

* @param additionalFiles Additional files to copy into image

182

* @return Docker image name for the built image

183

* @throws ImageBuildException if image build fails

184

*/

185

public static DockerImageName buildImage(

186

DockerImageName baseImage,

187

List<URL> jarPaths,

188

Map<String, String> additionalFiles

189

) throws ImageBuildException;

190

}

191

192

/**

193

* Exception thrown when Docker image build fails

194

*/

195

public class ImageBuildException extends Exception {

196

public ImageBuildException(String message);

197

public ImageBuildException(String message, Throwable cause);

198

}

199

```

200

201

**Usage Examples:**

202

203

```java

204

// Build image with connector JARs

205

List<URL> connectorJars = Arrays.asList(

206

new File("target/my-connector.jar").toURI().toURL(),

207

new File("lib/dependency.jar").toURI().toURL()

208

);

209

210

try {

211

DockerImageName customImage = FlinkImageBuilder.buildImage(connectorJars);

212

213

// Use custom image in containers

214

FlinkContainer jobManager = FlinkContainers.jobManager()

215

.withDockerImageName(customImage);

216

217

} catch (ImageBuildException e) {

218

throw new TestAbortedException("Failed to build custom Flink image", e);

219

}

220

```

221

222

### TestContainers Configurator

223

224

Interface for configuring TestContainers behavior.

225

226

```java { .api }

227

/**

228

* Interface for configuring Flink TestContainers

229

*/

230

public interface FlinkTestcontainersConfigurator {

231

232

/**

233

* Configure JobManager container

234

* @param jobManager JobManager container to configure

235

*/

236

void configureJobManager(GenericContainer<?> jobManager);

237

238

/**

239

* Configure TaskManager container

240

* @param taskManager TaskManager container to configure

241

*/

242

void configureTaskManager(GenericContainer<?> taskManager);

243

244

/**

245

* Configure network settings

246

* @param network Network to configure

247

*/

248

void configureNetwork(Network network);

249

}

250

```

251

252

**Usage Examples:**

253

254

```java

255

public class CustomFlinkConfigurator implements FlinkTestcontainersConfigurator {

256

257

@Override

258

public void configureJobManager(GenericContainer<?> jobManager) {

259

jobManager

260

.withEnv("JVM_ARGS", "-Xmx2g -Xms2g")

261

.withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("JobManager")))

262

.withStartupTimeout(Duration.ofMinutes(5));

263

}

264

265

@Override

266

public void configureTaskManager(GenericContainer<?> taskManager) {

267

taskManager

268

.withEnv("JVM_ARGS", "-Xmx1g -Xms1g")

269

.withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("TaskManager")))

270

.withStartupTimeout(Duration.ofMinutes(3));

271

}

272

273

@Override

274

public void configureNetwork(Network network) {

275

// Custom network configuration

276

}

277

}

278

279

// Use custom configurator

280

FlinkContainerTestEnvironment testEnv = new FlinkContainerTestEnvironment(

281

FlinkContainersSettings.builder()

282

.setConfigurator(new CustomFlinkConfigurator())

283

.build()

284

);

285

```

286

287

## Configuration

288

289

### Flink Container Settings

290

291

Configuration for Flink container cluster setup.

292

293

```java { .api }

294

/**

295

* Configuration settings for Flink container environments

296

*/

297

public class FlinkContainersSettings {

298

299

public static Builder builder();

300

301

public static class Builder {

302

/**

303

* Set number of TaskManager containers to start

304

* @param numTaskManagers Number of TaskManager containers (default: 1)

305

* @return Builder instance

306

*/

307

public Builder setNumTaskManagers(int numTaskManagers);

308

309

/**

310

* Set number of slots per TaskManager container

311

* @param numSlotsPerTaskManager Task slots per TaskManager (default: 2)

312

* @return Builder instance

313

*/

314

public Builder setNumSlotsPerTaskManager(int numSlotsPerTaskManager);

315

316

/**

317

* Set JobManager memory allocation

318

* @param jobManagerMemory Memory setting (e.g., "1g", "512m") (default: "1g")

319

* @return Builder instance

320

*/

321

public Builder setJobManagerMemory(String jobManagerMemory);

322

323

/**

324

* Set TaskManager memory allocation

325

* @param taskManagerMemory Memory setting (e.g., "1g", "512m") (default: "1g")

326

* @return Builder instance

327

*/

328

public Builder setTaskManagerMemory(String taskManagerMemory);

329

330

/**

331

* Set base Docker image for containers

332

* @param baseImage Docker image name (default: "flink:1.18")

333

* @return Builder instance

334

*/

335

public Builder setBaseImage(DockerImageName baseImage);

336

337

/**

338

* Set custom TestContainers configurator

339

* @param configurator Custom configurator for container setup

340

* @return Builder instance

341

*/

342

public Builder setConfigurator(FlinkTestcontainersConfigurator configurator);

343

344

/**

345

* Enable/disable checkpoint recovery testing

346

* @param enableCheckpointRecovery Enable checkpoint recovery (default: true)

347

* @return Builder instance

348

*/

349

public Builder setEnableCheckpointRecovery(boolean enableCheckpointRecovery);

350

351

/**

352

* Build configured settings

353

* @return FlinkContainersSettings instance

354

*/

355

public FlinkContainersSettings build();

356

}

357

358

public int getNumTaskManagers();

359

public int getNumSlotsPerTaskManager();

360

public String getJobManagerMemory();

361

public String getTaskManagerMemory();

362

public DockerImageName getBaseImage();

363

public Optional<FlinkTestcontainersConfigurator> getConfigurator();

364

public boolean isCheckpointRecoveryEnabled();

365

}

366

```

367

368

### TestContainers Settings

369

370

General TestContainers configuration settings.

371

372

```java { .api }

373

/**

374

* General TestContainers configuration settings

375

*/

376

public class TestcontainersSettings {

377

378

public static Builder builder();

379

380

public static class Builder {

381

/**

382

* Set Docker network for container communication

383

* @param network Shared network for containers

384

* @return Builder instance

385

*/

386

public Builder setNetwork(Network network);

387

388

/**

389

* Set log consumers for container output

390

* @param logConsumers Map of container name to log consumer

391

* @return Builder instance

392

*/

393

public Builder setLogConsumers(Map<String, Consumer<OutputFrame>> logConsumers);

394

395

/**

396

* Set startup timeout for containers

397

* @param startupTimeout Maximum time to wait for container startup

398

* @return Builder instance

399

*/

400

public Builder setStartupTimeout(Duration startupTimeout);

401

402

/**

403

* Set container registry configuration

404

* @param registryConfig Docker registry configuration

405

* @return Builder instance

406

*/

407

public Builder setRegistryConfig(DockerClientConfig registryConfig);

408

409

/**

410

* Enable/disable container reuse between tests

411

* @param reuseContainers Enable container reuse (default: false)

412

* @return Builder instance

413

*/

414

public Builder setReuseContainers(boolean reuseContainers);

415

416

/**

417

* Build configured settings

418

* @return TestcontainersSettings instance

419

*/

420

public TestcontainersSettings build();

421

}

422

423

public Optional<Network> getNetwork();

424

public Map<String, Consumer<OutputFrame>> getLogConsumers();

425

public Duration getStartupTimeout();

426

public Optional<DockerClientConfig> getRegistryConfig();

427

public boolean isReuseContainers();

428

}

429

```

430

431

## Advanced Usage Patterns

432

433

### Multi-Container Test Setup

434

435

Set up complex test scenarios with multiple external systems.

436

437

```java

438

public class ComplexConnectorTestSuite extends SinkTestSuiteBase<String> {

439

440

// Shared network for all containers

441

private static final Network testNetwork = Network.newNetwork();

442

443

@TestEnv

444

FlinkContainerTestEnvironment testEnv = new FlinkContainerTestEnvironment(

445

FlinkContainersSettings.builder()

446

.setNumTaskManagers(2)

447

.setNumSlotsPerTaskManager(2)

448

.build(),

449

TestcontainersSettings.builder()

450

.setNetwork(testNetwork)

451

.setLogConsumers(Map.of(

452

"jobmanager", new Slf4jLogConsumer(LoggerFactory.getLogger("JobManager")),

453

"taskmanager", new Slf4jLogConsumer(LoggerFactory.getLogger("TaskManager"))

454

))

455

.build()

456

);

457

458

@TestContext

459

ExternalContextFactory<KafkaExternalContext> kafkaContextFactory = testName ->

460

new KafkaExternalContext(testName, testNetwork);

461

462

@TestContext

463

ExternalContextFactory<DatabaseExternalContext> dbContextFactory = testName ->

464

new DatabaseExternalContext(testName, testNetwork);

465

}

466

```

467

468

### Custom Image Building

469

470

Build custom Flink images for specific testing scenarios.

471

472

```java

473

public class CustomImageTestSuite extends SinkTestSuiteBase<String> {

474

475

private static DockerImageName customFlinkImage;

476

477

@BeforeAll

478

static void buildCustomImage() throws Exception {

479

List<URL> connectorJars = Arrays.asList(

480

new File("target/my-connector.jar").toURI().toURL(),

481

new File("lib/kafka-clients.jar").toURI().toURL(),

482

new File("lib/commons-lang3.jar").toURI().toURL()

483

);

484

485

Map<String, String> additionalFiles = Map.of(

486

"conf/log4j.properties", "/opt/flink/conf/log4j.properties",

487

"conf/flink-conf.yaml", "/opt/flink/conf/flink-conf.yaml"

488

);

489

490

customFlinkImage = FlinkImageBuilder.buildImage(

491

DockerImageName.parse("flink:1.18-java11"),

492

connectorJars,

493

additionalFiles

494

);

495

}

496

497

@TestEnv

498

FlinkContainerTestEnvironment testEnv = new FlinkContainerTestEnvironment(

499

FlinkContainersSettings.builder()

500

.setBaseImage(customFlinkImage)

501

.build()

502

);

503

}

504

```

505

506

### Failure Testing with Containers

507

508

Implement failure testing using container lifecycle control.

509

510

```java

511

public class FailoverTestSuite extends SourceTestSuiteBase<String> {

512

513

@TestEnv

514

FlinkContainerTestEnvironment testEnv = new FlinkContainerTestEnvironment(

515

FlinkContainersSettings.builder()

516

.setNumTaskManagers(3) // Multiple TaskManagers for failover testing

517

.setEnableCheckpointRecovery(true)

518

.build()

519

);

520

521

@TestTemplate

522

public void testTaskManagerFailover(

523

TestEnvironment testEnv,

524

DataStreamSourceExternalContext<String> externalContext,

525

ClusterControllable controller,

526

CheckpointingMode semantic

527

) throws Exception {

528

529

// Start job and validate initial results

530

JobClient jobClient = startTestJob(testEnv, externalContext, semantic);

531

validateInitialResults(jobClient);

532

533

// Trigger TaskManager failure

534

controller.triggerTaskManagerFailover(jobClient, () -> {

535

// Actions after failure triggered

536

LOG.info("TaskManager failure triggered, waiting for recovery...");

537

});

538

539

// Validate recovery and continued processing

540

validateRecoveryResults(jobClient);

541

}

542

}

543

```

544

545

### Resource Monitoring

546

547

Monitor container resource usage during tests.

548

549

```java

550

public class PerformanceTestSuite extends SinkTestSuiteBase<String> {

551

552

@TestEnv

553

FlinkContainerTestEnvironment testEnv = new FlinkContainerTestEnvironment(

554

FlinkContainersSettings.builder()

555

.setJobManagerMemory("2g")

556

.setTaskManagerMemory("4g")

557

.setConfigurator(new ResourceMonitoringConfigurator())

558

.build()

559

);

560

561

private static class ResourceMonitoringConfigurator implements FlinkTestcontainersConfigurator {

562

563

@Override

564

public void configureJobManager(GenericContainer<?> jobManager) {

565

jobManager

566

.withCreateContainerCmdModifier(cmd -> {

567

// Set memory and CPU limits

568

cmd.getHostConfig()

569

.withMemory(2L * 1024 * 1024 * 1024) // 2GB

570

.withCpuQuota(100000L); // 1 CPU

571

})

572

.withLogConsumer(new ResourceUsageLogConsumer("JobManager"));

573

}

574

575

@Override

576

public void configureTaskManager(GenericContainer<?> taskManager) {

577

taskManager

578

.withCreateContainerCmdModifier(cmd -> {

579

// Set memory and CPU limits

580

cmd.getHostConfig()

581

.withMemory(4L * 1024 * 1024 * 1024) // 4GB

582

.withCpuQuota(200000L); // 2 CPUs

583

})

584

.withLogConsumer(new ResourceUsageLogConsumer("TaskManager"));

585

}

586

587

@Override

588

public void configureNetwork(Network network) {

589

// Network configuration

590

}

591

}

592

}

593

```

594

595

## Best Practices

596

597

### Resource Management

598

599

```java

600

// Use try-with-resources for automatic cleanup

601

try (Network network = Network.newNetwork()) {

602

FlinkContainerTestEnvironment testEnv = new FlinkContainerTestEnvironment(

603

FlinkContainersSettings.builder().build(),

604

TestcontainersSettings.builder().setNetwork(network).build()

605

);

606

607

// Test execution

608

} // Network automatically closed

609

```

610

611

### Performance Optimization

612

613

```java

614

// Reuse containers when possible

615

TestcontainersSettings settings = TestcontainersSettings.builder()

616

.setReuseContainers(true) // Enable container reuse

617

.setStartupTimeout(Duration.ofMinutes(2)) // Reasonable timeout

618

.build();

619

620

// Use appropriate resource allocation

621

FlinkContainersSettings flinkSettings = FlinkContainersSettings.builder()

622

.setJobManagerMemory("1g") // Don't over-allocate for simple tests

623

.setTaskManagerMemory("1g")

624

.setNumTaskManagers(1) // Start with minimal cluster

625

.build();

626

```

627

628

### Error Handling

629

630

```java

631

@TestEnv

632

FlinkContainerTestEnvironment testEnv = new FlinkContainerTestEnvironment() {

633

@Override

634

public void startUp() throws Exception {

635

try {

636

super.startUp();

637

} catch (Exception e) {

638

// Abort (rather than fail) the test on any startup failure, e.g. when Docker is not available

639

throw new TestAbortedException("Docker not available for container tests", e);

640

}

641

}

642

};

643

```

644

645

### CI/CD Integration

646

647

```yaml

648

# GitHub Actions example

649

jobs:

650

container-tests:

651

runs-on: ubuntu-latest

652

services:

653

docker:

654

image: docker:20.10

655

options: --privileged

656

steps:

657

- uses: actions/checkout@v3

658

- name: Set up JDK 11

659

uses: actions/setup-java@v3

660

with:

661

java-version: '11'

662

- name: Run container tests

663

run: ./mvnw test -Dtest=**/*ContainerTest*

664

```