# Test Environments

Test environments manage the Flink cluster lifecycle and provide execution contexts for connector tests. They hide cluster management behind a common interface and support both in-process (MiniCluster) and containerized deployments.

## Capabilities

### Test Environment Interface

Base interface for all test environment implementations.

```java { .api }
/**
 * Test environment for running Flink jobs
 * Manages Flink cluster lifecycle and provides execution context
 */
public interface TestEnvironment extends TestResource {

    /**
     * Create StreamExecutionEnvironment for job building and execution
     * @param envOptions Environment configuration options
     * @return Configured StreamExecutionEnvironment bound to this cluster
     */
    StreamExecutionEnvironment createExecutionEnvironment(TestEnvironmentSettings envOptions);

    /**
     * Get REST endpoint for cluster communication
     * @return Endpoint with address and port for REST API access
     */
    Endpoint getRestEndpoint();

    /**
     * Get checkpoint/savepoint storage path
     * @return URI string for checkpoint and savepoint storage
     */
    String getCheckpointUri();

    /**
     * Endpoint configuration for REST API access
     */
    class Endpoint {
        public Endpoint(String address, int port);
        public String getAddress();
        public int getPort();
    }
}

/**
 * Base interface for test resource lifecycle management
 */
public interface TestResource {
    /**
     * Start up the test resource (idempotent operation)
     * @throws Exception if startup fails
     */
    void startUp() throws Exception;

    /**
     * Tear down the test resource
     * Should handle cleanup even if startup never occurred
     * @throws Exception if teardown fails
     */
    void tearDown() throws Exception;
}
```
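
The lifecycle contract documented on `TestResource` (an idempotent `startUp()` and a `tearDown()` that is safe even when startup never happened) could be honored along the following lines. This is a minimal sketch, not the framework's implementation: `SketchResource`, `InMemoryResource`, and `TestResourceSketch` are hypothetical stand-ins that use no Flink classes.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in mirroring the two TestResource methods documented above.
interface SketchResource {
    void startUp() throws Exception;
    void tearDown() throws Exception;
}

// Illustrative resource that counts how often real start/stop work is performed.
class InMemoryResource implements SketchResource {
    private boolean running = false;
    final AtomicInteger starts = new AtomicInteger();
    final AtomicInteger stops = new AtomicInteger();

    @Override
    public synchronized void startUp() {
        if (running) {
            return; // idempotent: a repeated call does nothing
        }
        starts.incrementAndGet();
        running = true;
    }

    @Override
    public synchronized void tearDown() {
        if (!running) {
            return; // safe even if startUp() was never called
        }
        stops.incrementAndGet();
        running = false;
    }

    synchronized boolean isRunning() {
        return running;
    }
}

class TestResourceSketch {
    public static void main(String[] args) {
        InMemoryResource resource = new InMemoryResource();
        resource.tearDown(); // no-op: nothing was started
        resource.startUp();
        resource.startUp();  // no-op: already running
        resource.tearDown();
        System.out.println(resource.starts.get() + " start, " + resource.stops.get() + " stop");
    }
}
```

Guarding both methods with the same `running` flag keeps repeated or out-of-order calls harmless, which matters when a shared environment is started and stopped by test infrastructure rather than by the test itself.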

**Usage Examples:**

```java
// Environment registration
@TestEnv
MiniClusterTestEnvironment testEnv = new MiniClusterTestEnvironment();

// Using environment in test
StreamExecutionEnvironment env = testEnv.createExecutionEnvironment(
    TestEnvironmentSettings.builder()
        .setConnectorJarPaths(externalContext.getConnectorJarPaths())
        .build()
);
```
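
`getRestEndpoint()` exposes only an address and a port, so a test that wants to call the cluster's REST API has to assemble the URL itself. The sketch below shows one way to do that, assuming plain HTTP. `SketchEndpoint` is a hypothetical stand-in for `TestEnvironment.Endpoint`, `RestUrls.restUri` is a hypothetical helper, and Flink's `/jobs/overview` REST path serves only as an example.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical stand-in for TestEnvironment.Endpoint (address and port only).
final class SketchEndpoint {
    private final String address;
    private final int port;

    SketchEndpoint(String address, int port) {
        this.address = address;
        this.port = port;
    }

    String getAddress() {
        return address;
    }

    int getPort() {
        return port;
    }
}

final class RestUrls {
    private RestUrls() {}

    // Builds http://<address>:<port><path>; the http scheme is an assumption.
    static URI restUri(SketchEndpoint endpoint, String path) {
        try {
            return new URI("http", null, endpoint.getAddress(), endpoint.getPort(), path, null, null);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Invalid REST endpoint or path", e);
        }
    }
}

class RestUrlSketch {
    public static void main(String[] args) {
        SketchEndpoint endpoint = new SketchEndpoint("localhost", 8081);
        System.out.println(RestUrls.restUri(endpoint, "/jobs/overview"));
    }
}
```

Using the multi-argument `URI` constructor instead of string concatenation lets the JDK handle quoting and IPv6 literals.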

### MiniCluster Test Environment

In-process test environment using Flink's MiniCluster for fast, lightweight testing.

```java { .api }
/**
 * Test environment using Flink MiniCluster for in-process testing
 */
public class MiniClusterTestEnvironment implements TestEnvironment {

    /**
     * Create MiniCluster environment with default configuration
     */
    public MiniClusterTestEnvironment();

    /**
     * Create MiniCluster environment with custom configuration
     * @param miniClusterConfig MiniCluster configuration
     */
    public MiniClusterTestEnvironment(MiniClusterConfiguration miniClusterConfig);

    @Override
    public StreamExecutionEnvironment createExecutionEnvironment(TestEnvironmentSettings envOptions);

    @Override
    public Endpoint getRestEndpoint();

    @Override
    public String getCheckpointUri();

    @Override
    public void startUp() throws Exception;

    @Override
    public void tearDown() throws Exception;
}
```

**Usage Examples:**

```java
// Default MiniCluster environment
@TestEnv
MiniClusterTestEnvironment testEnv = new MiniClusterTestEnvironment();

// Alternative: custom MiniCluster configuration (use instead of the default above)
@TestEnv
MiniClusterTestEnvironment testEnv = new MiniClusterTestEnvironment(
    new MiniClusterConfiguration(
        new Configuration(),
        2,                    // number of task managers
        2,                    // number of slots per task manager
        ResourceID.generate(),
        Time.minutes(10)      // timeout
    )
);
```

### Container Test Environment

Containerized test environment using Docker containers for isolated testing.

```java { .api }
/**
 * Test environment using Flink containers for isolated testing
 */
public class FlinkContainerTestEnvironment implements TestEnvironment {

    /**
     * Create container environment with specified settings
     * @param settings Container configuration settings
     */
    public FlinkContainerTestEnvironment(FlinkContainersSettings settings);

    /**
     * Create container environment with default settings
     */
    public FlinkContainerTestEnvironment();

    @Override
    public StreamExecutionEnvironment createExecutionEnvironment(TestEnvironmentSettings envOptions);

    @Override
    public Endpoint getRestEndpoint();

    @Override
    public String getCheckpointUri();

    @Override
    public void startUp() throws Exception;

    @Override
    public void tearDown() throws Exception;
}
```

**Usage Examples:**

```java
// Default container environment
@TestEnv
FlinkContainerTestEnvironment testEnv = new FlinkContainerTestEnvironment();

// Alternative: custom container configuration (use instead of the default above)
@TestEnv
FlinkContainerTestEnvironment testEnv = new FlinkContainerTestEnvironment(
    FlinkContainersSettings.builder()
        .setNumTaskManagers(2)
        .setNumSlotsPerTaskManager(2)
        .setJobManagerMemory("1g")
        .setTaskManagerMemory("1g")
        .build()
);
```

### Cluster Controllable Interface

Interface for environments that support cluster control operations like failover simulation.

```java { .api }
/**
 * Interface for test environments that support cluster control operations
 */
public interface ClusterControllable {

    /**
     * Trigger TaskManager failover during test execution
     * @param jobClient Current job client
     * @param afterFailAction Action to execute after triggering failover
     * @throws Exception if failover cannot be triggered
     */
    void triggerTaskManagerFailover(JobClient jobClient, Runnable afterFailAction) throws Exception;
}
```

**Usage Examples:**

```java
// In a test method whose environment implements ClusterControllable
@TestTemplate
public void testTaskManagerFailure(
    TestEnvironment testEnv,
    DataStreamSourceExternalContext<T> externalContext,
    ClusterControllable controller,  // Injected when environment supports it
    CheckpointingMode semantic
) throws Exception {
    // jobClient is the JobClient of the job submitted earlier in the test (submission not shown)
    controller.triggerTaskManagerFailover(jobClient, () -> {
        // Actions to perform after failover is triggered
    });
}
```
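
The `afterFailAction` callback gives a test a hook that runs once the failure has been injected, for example to write more test data so that recovery is exercised with fresh records. The sketch below illustrates only that ordering. `SketchControllable` and `FakeCluster` are hypothetical, omit the `JobClient` parameter of the documented interface, and assume the environment restarts the TaskManager after the callback returns.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified variant of ClusterControllable without the JobClient parameter.
interface SketchControllable {
    void triggerTaskManagerFailover(Runnable afterFailAction) throws Exception;
}

// Fake cluster that records events instead of stopping a real TaskManager.
class FakeCluster implements SketchControllable {
    final List<String> events = new ArrayList<>();

    @Override
    public void triggerTaskManagerFailover(Runnable afterFailAction) {
        events.add("taskmanager-stopped");
        afterFailAction.run(); // runs while the failure is in effect
        events.add("taskmanager-restarted");
    }
}

class FailoverSketch {
    public static void main(String[] args) {
        FakeCluster cluster = new FakeCluster();
        cluster.triggerTaskManagerFailover(() -> cluster.events.add("wrote-more-test-data"));
        System.out.println(cluster.events);
    }
}
```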

## Configuration

### Test Environment Settings

Configuration for test environment behavior and job submission.

```java { .api }
/**
 * Configuration settings for test environment setup
 */
public class TestEnvironmentSettings {

    public static Builder builder();

    public static class Builder {
        /**
         * Set connector JAR paths to attach to jobs
         * @param connectorJarPaths List of connector JAR URLs
         * @return Builder instance
         */
        public Builder setConnectorJarPaths(List<URL> connectorJarPaths);

        /**
         * Set savepoint path for job restoration
         * @param savepointRestorePath Path to savepoint for job restart
         * @return Builder instance
         */
        public Builder setSavepointRestorePath(String savepointRestorePath);

        /**
         * Build the settings instance
         * @return Configured TestEnvironmentSettings
         */
        public TestEnvironmentSettings build();
    }

    public List<URL> getConnectorJarPaths();
    public Optional<String> getSavepointRestorePath();
}
```

**Usage Examples:**

```java
// Basic environment settings
TestEnvironmentSettings settings = TestEnvironmentSettings.builder()
    .setConnectorJarPaths(externalContext.getConnectorJarPaths())
    .build();

// Settings that restore the job from a savepoint
TestEnvironmentSettings restartSettings = TestEnvironmentSettings.builder()
    .setConnectorJarPaths(externalContext.getConnectorJarPaths())
    .setSavepointRestorePath("/path/to/savepoint")
    .build();

// Pass either settings object when creating the execution environment
StreamExecutionEnvironment env = testEnv.createExecutionEnvironment(settings);
```

### Container Settings

Configuration for containerized test environments.

```java { .api }
/**
 * Configuration settings for Flink container environments
 */
public class FlinkContainersSettings {

    public static Builder builder();

    public static class Builder {
        /**
         * Set number of TaskManager containers
         * @param numTaskManagers Number of TaskManager containers to start
         * @return Builder instance
         */
        public Builder setNumTaskManagers(int numTaskManagers);

        /**
         * Set number of slots per TaskManager
         * @param numSlotsPerTaskManager Slots per TaskManager container
         * @return Builder instance
         */
        public Builder setNumSlotsPerTaskManager(int numSlotsPerTaskManager);

        /**
         * Set JobManager memory allocation
         * @param jobManagerMemory Memory allocation (e.g., "1g", "512m")
         * @return Builder instance
         */
        public Builder setJobManagerMemory(String jobManagerMemory);

        /**
         * Set TaskManager memory allocation
         * @param taskManagerMemory Memory allocation (e.g., "1g", "512m")
         * @return Builder instance
         */
        public Builder setTaskManagerMemory(String taskManagerMemory);

        /**
         * Build the settings instance
         * @return Configured FlinkContainersSettings
         */
        public FlinkContainersSettings build();
    }

    public int getNumTaskManagers();
    public int getNumSlotsPerTaskManager();
    public String getJobManagerMemory();
    public String getTaskManagerMemory();
}

/**
 * General Testcontainers configuration settings
 */
public class TestcontainersSettings {

    public static Builder builder();

    public static class Builder {
        /**
         * Set Docker network for container communication
         * @param network Testcontainers network instance
         * @return Builder instance
         */
        public Builder setNetwork(Network network);

        /**
         * Set log consumers for container output
         * @param logConsumers Map of container name to log consumer
         * @return Builder instance
         */
        public Builder setLogConsumers(Map<String, Consumer<OutputFrame>> logConsumers);

        /**
         * Build the settings instance
         * @return Configured TestcontainersSettings
         */
        public TestcontainersSettings build();
    }

    public Optional<Network> getNetwork();
    public Map<String, Consumer<OutputFrame>> getLogConsumers();
}
```

## Environment Lifecycle

### Startup Sequence

1. **Resource Allocation**: Allocate cluster resources (containers, processes)
2. **Cluster Initialization**: Start JobManager and TaskManagers
3. **Service Discovery**: Establish REST endpoints and communication
4. **Readiness Check**: Verify cluster is ready for job submission
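
A readiness check such as step 4 is commonly written as a poll-with-deadline loop. The sketch below shows that pattern in plain Java. `Readiness.waitUntilReady` and its parameters are hypothetical and not part of the testing framework, and the probe in `main` is simulated rather than a real cluster query.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

final class Readiness {
    private Readiness() {}

    // Polls the probe until it reports true; returns false on timeout or interruption.
    static boolean waitUntilReady(BooleanSupplier probe, Duration timeout, Duration interval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!probe.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // deadline passed without the probe succeeding
            }
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for callers
                return false;
            }
        }
        return true;
    }
}

class ReadinessSketch {
    public static void main(String[] args) {
        int[] attempts = {0};
        // Simulated probe: reports ready on the third attempt.
        boolean ready = Readiness.waitUntilReady(
                () -> ++attempts[0] >= 3, Duration.ofSeconds(5), Duration.ofMillis(10));
        System.out.println("ready=" + ready + " after " + attempts[0] + " probes");
    }
}
```

In a real environment the probe might query the REST endpoint; returning a boolean instead of throwing leaves the decision about how to fail to the caller.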

### Teardown Sequence

1. **Job Termination**: Cancel any running jobs
2. **Cluster Shutdown**: Stop TaskManagers and JobManager
3. **Resource Cleanup**: Clean up containers, temporary files
4. **Network Cleanup**: Remove Docker networks and volumes

### Lifecycle Management

Test environments follow a per-class lifecycle:

- **Single Instance**: One environment instance per test class
- **Shared Resources**: All test methods in the class share the same cluster
- **Performance Optimization**: Avoids expensive cluster startup/teardown per test
- **Resource Efficiency**: Reduces resource usage for test suites

```java
@TestEnv
MiniClusterTestEnvironment testEnv = new MiniClusterTestEnvironment();
// Started once before first test method
// Shared by all test methods in the class
// Torn down after last test method completes
```
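
The per-class lifecycle can be pictured as one start, many test methods, and one stop in a `finally` block. The sketch below is a plain-Java simulation of that shape, not the JUnit extension that actually drives `@TestEnv` fields. `CountingEnvironment` and `runTestClass` are hypothetical names.

```java
import java.util.List;

// Hypothetical environment that only counts lifecycle calls.
class CountingEnvironment {
    int startCount;
    int stopCount;
    int testsRun;

    void startUp() {
        startCount++;
    }

    void tearDown() {
        stopCount++;
    }
}

class PerClassLifecycleSketch {
    // Starts the environment once, runs every test against it, then stops it once.
    static void runTestClass(CountingEnvironment env, List<Runnable> testMethods) {
        env.startUp();
        try {
            for (Runnable test : testMethods) {
                test.run();
                env.testsRun++;
            }
        } finally {
            env.tearDown(); // runs even if a test method throws
        }
    }

    public static void main(String[] args) {
        CountingEnvironment env = new CountingEnvironment();
        runTestClass(env, List.<Runnable>of(() -> {}, () -> {}, () -> {}));
        System.out.println(env.startCount + " start, " + env.testsRun + " tests, " + env.stopCount + " stop");
    }
}
```

Because the cluster is shared, test methods should not rely on a fresh cluster state; anything a test leaves behind is visible to the next one.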

## Performance Considerations

### MiniCluster vs Containers

**MiniCluster Advantages:**
- Faster startup/teardown
- Lower resource usage
- Simpler debugging
- Better for unit-style tests

**Container Advantages:**
- Better isolation
- More realistic deployment
- Network isolation
- Better for integration tests

### Resource Configuration

```java
// Lightweight configuration for fast tests
@TestEnv
MiniClusterTestEnvironment lightEnv = new MiniClusterTestEnvironment(
    new MiniClusterConfiguration(
        new Configuration(),
        1,                    // single task manager
        1,                    // single slot
        ResourceID.generate(),
        Time.seconds(30)      // short timeout
    )
);

// Heavy configuration for complex tests
@TestEnv
FlinkContainerTestEnvironment heavyEnv = new FlinkContainerTestEnvironment(
    FlinkContainersSettings.builder()
        .setNumTaskManagers(4)
        .setNumSlotsPerTaskManager(4)
        .setJobManagerMemory("2g")
        .setTaskManagerMemory("2g")
        .build()
);
```

## Error Handling

### Common Failure Scenarios

- **Port Conflicts**: The environment allocates ports automatically
- **Resource Exhaustion**: Insufficient resources are reported with clear error messages
- **Network Issues**: Container networking is retried
- **Cleanup Failures**: Logged as warnings; they do not fail tests

### Best Practices

- **Timeout Configuration**: Set appropriate timeouts for cluster operations
- **Resource Limits**: Configure memory and CPU limits appropriately
- **Error Recovery**: Implement retry logic for transient failures
- **Logging**: Enable detailed logging for troubleshooting
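
The retry advice above can be made concrete with a small helper. This is a generic sketch using a fixed delay between attempts. `Retry.withRetries` is a hypothetical helper, not an API of the testing framework, and the failing action in `main` is simulated.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

final class Retry {
    private Retry() {}

    // Calls the action up to maxAttempts times, sleeping delayMillis after each failure.
    static <T> T withRetries(Callable<T> action, int maxAttempts, long delayMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e; // remember the failure and retry if attempts remain
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(delayMillis);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break; // stop retrying when interrupted
                }
            }
        }
        throw new IllegalStateException("Action did not succeed within " + maxAttempts + " attempts", last);
    }
}

class RetrySketch {
    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated transient failure: the first two calls throw, the third succeeds.
        String result = Retry.withRetries(() -> {
            if (++calls[0] < 3) {
                throw new IOException("transient failure");
            }
            return "ok";
        }, 5, 10);
        System.out.println(result + " after " + calls[0] + " calls");
    }
}
```

A bounded attempt count keeps a persistent failure from hanging the test suite; the last exception is preserved as the cause for troubleshooting.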