or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

assertions.mdcontainers.mdexternal-systems.mdindex.mdjunit-integration.mdmetrics.mdtest-environments.mdtest-suites.md

test-environments.mddocs/

0

# Test Environments

1

2

Test environments manage the Flink cluster lifecycle and provide execution contexts for connector tests. They abstract away the complexity of cluster management while supporting various deployment modes.

3

4

## Capabilities

5

6

### Test Environment Interface

7

8

Base interface for all test environment implementations.

9

10

```java { .api }

11

/**

12

* Test environment for running Flink jobs

13

* Manages Flink cluster lifecycle and provides execution context

14

*/

15

public interface TestEnvironment extends TestResource {

16

17

/**

18

* Create StreamExecutionEnvironment for job building and execution

19

* @param envOptions Environment configuration options

20

* @return Configured StreamExecutionEnvironment bound to this cluster

21

*/

22

StreamExecutionEnvironment createExecutionEnvironment(TestEnvironmentSettings envOptions);

23

24

/**

25

* Get REST endpoint for cluster communication

26

* @return Endpoint with address and port for REST API access

27

*/

28

Endpoint getRestEndpoint();

29

30

/**

31

* Get checkpoint/savepoint storage path

32

* @return URI string for checkpoint and savepoint storage

33

*/

34

String getCheckpointUri();

35

36

/**

37

* Endpoint configuration for REST API access

38

*/

39

class Endpoint {

40

public Endpoint(String address, int port);

41

public String getAddress();

42

public int getPort();

43

}

44

}

45

46

/**

47

* Base interface for test resource lifecycle management

48

*/

49

public interface TestResource {

50

/**

51

* Start up the test resource (idempotent operation)

52

* @throws Exception if startup fails

53

*/

54

void startUp() throws Exception;

55

56

/**

57

* Tear down the test resource

58

* Should handle cleanup even if startup never occurred

59

* @throws Exception if teardown fails

60

*/

61

void tearDown() throws Exception;

62

}

63

```

64

65

**Usage Examples:**

66

67

```java

68

// Environment registration

69

@TestEnv

70

MiniClusterTestEnvironment testEnv = new MiniClusterTestEnvironment();

71

72

// Using environment in test

73

StreamExecutionEnvironment env = testEnv.createExecutionEnvironment(

74

TestEnvironmentSettings.builder()

75

.setConnectorJarPaths(externalContext.getConnectorJarPaths())

76

.build()

77

);

78

```

79

80

### MiniCluster Test Environment

81

82

In-process test environment using Flink's MiniCluster for fast, lightweight testing.

83

84

```java { .api }

85

/**

86

* Test environment using Flink MiniCluster for in-process testing

87

*/

88

public class MiniClusterTestEnvironment implements TestEnvironment {

89

90

/**

91

* Create MiniCluster environment with default configuration

92

*/

93

public MiniClusterTestEnvironment();

94

95

/**

96

* Create MiniCluster environment with custom configuration

97

* @param miniClusterConfig MiniCluster configuration

98

*/

99

public MiniClusterTestEnvironment(MiniClusterConfiguration miniClusterConfig);

100

101

@Override

102

public StreamExecutionEnvironment createExecutionEnvironment(TestEnvironmentSettings envOptions);

103

104

@Override

105

public Endpoint getRestEndpoint();

106

107

@Override

108

public String getCheckpointUri();

109

110

@Override

111

public void startUp() throws Exception;

112

113

@Override

114

public void tearDown() throws Exception;

115

}

116

```

117

118

**Usage Examples:**

119

120

```java

121

// Default MiniCluster environment

122

@TestEnv

123

MiniClusterTestEnvironment testEnv = new MiniClusterTestEnvironment();

124

125

// Custom MiniCluster configuration

126

@TestEnv

127

MiniClusterTestEnvironment testEnv = new MiniClusterTestEnvironment(

128

new MiniClusterConfiguration(

129

new Configuration(),

130

2, // number of task managers

131

2, // number of slots per task manager

132

ResourceID.generate(),

133

Time.minutes(10) // timeout

134

)

135

);

136

```

137

138

### Container Test Environment

139

140

Containerized test environment using Docker containers for isolated testing.

141

142

```java { .api }

143

/**

144

* Test environment using Flink containers for isolated testing

145

*/

146

public class FlinkContainerTestEnvironment implements TestEnvironment {

147

148

/**

149

* Create container environment with specified settings

150

* @param settings Container configuration settings

151

*/

152

public FlinkContainerTestEnvironment(FlinkContainersSettings settings);

153

154

/**

155

* Create container environment with default settings

156

*/

157

public FlinkContainerTestEnvironment();

158

159

@Override

160

public StreamExecutionEnvironment createExecutionEnvironment(TestEnvironmentSettings envOptions);

161

162

@Override

163

public Endpoint getRestEndpoint();

164

165

@Override

166

public String getCheckpointUri();

167

168

@Override

169

public void startUp() throws Exception;

170

171

@Override

172

public void tearDown() throws Exception;

173

}

174

```

175

176

**Usage Examples:**

177

178

```java

179

// Default container environment

180

@TestEnv

181

FlinkContainerTestEnvironment testEnv = new FlinkContainerTestEnvironment();

182

183

// Custom container configuration

184

@TestEnv

185

FlinkContainerTestEnvironment testEnv = new FlinkContainerTestEnvironment(

186

FlinkContainersSettings.builder()

187

.setNumTaskManagers(2)

188

.setNumSlotsPerTaskManager(2)

189

.setJobManagerMemory("1g")

190

.setTaskManagerMemory("1g")

191

.build()

192

);

193

```

194

195

### Cluster Controllable Interface

196

197

Interface for environments that support cluster control operations like failover simulation.

198

199

```java { .api }

200

/**

201

* Interface for test environments that support cluster control operations

202

*/

203

public interface ClusterControllable {

204

205

/**

206

* Trigger TaskManager failover during test execution

207

* @param jobClient Current job client

208

* @param afterFailAction Action to execute after triggering failover

209

* @throws Exception if failover cannot be triggered

210

*/

211

void triggerTaskManagerFailover(JobClient jobClient, Runnable afterFailAction) throws Exception;

212

}

213

```

214

215

**Usage Examples:**

216

217

```java

218

// In test method that supports cluster controllable

219

@TestTemplate

220

public void testTaskManagerFailure(

221

TestEnvironment testEnv,

222

DataStreamSourceExternalContext<T> externalContext,

223

ClusterControllable controller, // Injected when environment supports it

224

CheckpointingMode semantic

225

) throws Exception {

226

// Test implementation uses controller to trigger failover

227

controller.triggerTaskManagerFailover(jobClient, () -> {

228

// Actions to perform after failover is triggered

229

});

230

}

231

```

232

233

## Configuration

234

235

### Test Environment Settings

236

237

Configuration for test environment behavior and job submission.

238

239

```java { .api }

240

/**

241

* Configuration settings for test environment setup

242

*/

243

public class TestEnvironmentSettings {

244

245

public static Builder builder();

246

247

public static class Builder {

248

/**

249

* Set connector JAR paths to attach to jobs

250

* @param connectorJarPaths List of connector JAR URLs

251

* @return Builder instance

252

*/

253

public Builder setConnectorJarPaths(List<URL> connectorJarPaths);

254

255

/**

256

* Set savepoint path for job restoration

257

* @param savepointRestorePath Path to savepoint for job restart

258

* @return Builder instance

259

*/

260

public Builder setSavepointRestorePath(String savepointRestorePath);

261

262

/**

263

* Build the settings instance

264

* @return Configured TestEnvironmentSettings

265

*/

266

public TestEnvironmentSettings build();

267

}

268

269

public List<URL> getConnectorJarPaths();

270

public Optional<String> getSavepointRestorePath();

271

}

272

```

273

274

**Usage Examples:**

275

276

```java

277

// Basic environment settings

278

TestEnvironmentSettings settings = TestEnvironmentSettings.builder()

279

.setConnectorJarPaths(externalContext.getConnectorJarPaths())

280

.build();

281

282

// Settings with savepoint restoration

283

TestEnvironmentSettings restartSettings = TestEnvironmentSettings.builder()

284

.setConnectorJarPaths(externalContext.getConnectorJarPaths())

285

.setSavepointRestorePath("/path/to/savepoint")

286

.build();

287

288

StreamExecutionEnvironment env = testEnv.createExecutionEnvironment(settings);

289

```

290

291

### Container Settings

292

293

Configuration for containerized test environments.

294

295

```java { .api }

296

/**

297

* Configuration settings for Flink container environments

298

*/

299

public class FlinkContainersSettings {

300

301

public static Builder builder();

302

303

public static class Builder {

304

/**

305

* Set number of TaskManager containers

306

* @param numTaskManagers Number of TaskManager containers to start

307

* @return Builder instance

308

*/

309

public Builder setNumTaskManagers(int numTaskManagers);

310

311

/**

312

* Set number of slots per TaskManager

313

* @param numSlotsPerTaskManager Slots per TaskManager container

314

* @return Builder instance

315

*/

316

public Builder setNumSlotsPerTaskManager(int numSlotsPerTaskManager);

317

318

/**

319

* Set JobManager memory allocation

320

* @param jobManagerMemory Memory allocation (e.g., "1g", "512m")

321

* @return Builder instance

322

*/

323

public Builder setJobManagerMemory(String jobManagerMemory);

324

325

/**

326

* Set TaskManager memory allocation

327

* @param taskManagerMemory Memory allocation (e.g., "1g", "512m")

328

* @return Builder instance

329

*/

330

public Builder setTaskManagerMemory(String taskManagerMemory);

331

332

/**

333

* Build the settings instance

334

* @return Configured FlinkContainersSettings

335

*/

336

public FlinkContainersSettings build();

337

}

338

339

public int getNumTaskManagers();

340

public int getNumSlotsPerTaskManager();

341

public String getJobManagerMemory();

342

public String getTaskManagerMemory();

343

}

344

345

/**

346

* General TestContainers configuration settings

347

*/

348

public class TestcontainersSettings {

349

350

public static Builder builder();

351

352

public static class Builder {

353

/**

354

* Set Docker network for container communication

355

* @param network TestContainers network instance

356

* @return Builder instance

357

*/

358

public Builder setNetwork(Network network);

359

360

/**

361

* Set log consumers for container output

362

* @param logConsumers Map of container name to log consumer

363

* @return Builder instance

364

*/

365

public Builder setLogConsumers(Map<String, Consumer<OutputFrame>> logConsumers);

366

367

/**

368

* Build the settings instance

369

* @return Configured TestcontainersSettings

370

*/

371

public TestcontainersSettings build();

372

}

373

374

public Optional<Network> getNetwork();

375

public Map<String, Consumer<OutputFrame>> getLogConsumers();

376

}

377

```

378

379

## Environment Lifecycle

380

381

### Startup Sequence

382

383

1. **Resource Allocation**: Allocate cluster resources (containers, processes)

384

2. **Cluster Initialization**: Start JobManager and TaskManagers

385

3. **Service Discovery**: Establish REST endpoints and communication

386

4. **Readiness Check**: Verify cluster is ready for job submission

387

388

### Teardown Sequence

389

390

1. **Job Termination**: Cancel any running jobs

391

2. **Cluster Shutdown**: Stop TaskManagers and JobManager

392

3. **Resource Cleanup**: Clean up containers, temporary files

393

4. **Network Cleanup**: Remove Docker networks and volumes

394

395

### Lifecycle Management

396

397

Test environments follow PER-CLASS lifecycle:

398

399

- **Single Instance**: One environment instance per test class

400

- **Shared Resources**: All test methods in class share the same cluster

401

- **Performance Optimization**: Avoids expensive cluster startup/teardown per test

402

- **Resource Efficiency**: Reduces resource usage for test suites

403

404

```java

405

@TestEnv

406

MiniClusterTestEnvironment testEnv = new MiniClusterTestEnvironment();

407

// Started once before first test method

408

// Shared by all test methods in the class

409

// Torn down after last test method completes

410

```

411

412

## Performance Considerations

413

414

### MiniCluster vs Containers

415

416

**MiniCluster Advantages:**

417

- Faster startup/teardown

418

- Lower resource usage

419

- Simpler debugging

420

- Better for unit-style tests

421

422

**Container Advantages:**

423

- Better isolation

424

- More realistic deployment

425

- Network isolation

426

- Better for integration tests

427

428

### Resource Configuration

429

430

```java

431

// Lightweight configuration for fast tests

432

@TestEnv

433

MiniClusterTestEnvironment lightEnv = new MiniClusterTestEnvironment(

434

new MiniClusterConfiguration(

435

new Configuration(),

436

1, // single task manager

437

1, // single slot

438

ResourceID.generate(),

439

Time.seconds(30) // short timeout

440

)

441

);

442

443

// Heavy configuration for complex tests

444

@TestEnv

445

FlinkContainerTestEnvironment heavyEnv = new FlinkContainerTestEnvironment(

446

FlinkContainersSettings.builder()

447

.setNumTaskManagers(4)

448

.setNumSlotsPerTaskManager(4)

449

.setJobManagerMemory("2g")

450

.setTaskManagerMemory("2g")

451

.build()

452

);

453

```

454

455

## Error Handling

456

457

### Common Failure Scenarios

458

459

- **Port Conflicts**: Environment handles port allocation automatically

460

- **Resource Exhaustion**: Clear error messages for insufficient resources

461

- **Network Issues**: Retry logic for container networking

462

- **Cleanup Failures**: Warnings logged but don't fail tests

463

464

### Best Practices

465

466

- **Timeout Configuration**: Set appropriate timeouts for cluster operations

467

- **Resource Limits**: Configure memory and CPU limits appropriately

468

- **Error Recovery**: Implement retry logic for transient failures

469

- **Logging**: Enable detailed logging for troubleshooting