0
# Test Environments
1
2
Test environments provide specialized execution environments for running Flink jobs in test contexts. They execute jobs on LocalFlinkMiniCluster instances and can be configured as global execution contexts.
3
4
## Batch Testing Environment
5
6
### TestEnvironment
7
8
ExecutionEnvironment implementation that executes jobs on LocalFlinkMiniCluster for batch processing tests.
9
10
```java { .api }
11
public class TestEnvironment extends ExecutionEnvironment {
12
public TestEnvironment(LocalFlinkMiniCluster miniCluster, int parallelism, boolean isObjectReuseEnabled,
13
Collection<Path> jarFiles, Collection<URL> classPaths);
14
public TestEnvironment(LocalFlinkMiniCluster executor, int parallelism, boolean isObjectReuseEnabled);
15
16
public JobExecutionResult getLastJobExecutionResult();
17
public void startNewSession() throws Exception;
18
public JobExecutionResult execute(String jobName) throws Exception;
19
public String getExecutionPlan() throws Exception;
20
21
public void setAsContext();
22
public static void setAsContext(LocalFlinkMiniCluster miniCluster, int parallelism,
23
Collection<Path> jarFiles, Collection<URL> classPaths);
24
public static void setAsContext(LocalFlinkMiniCluster miniCluster, int parallelism);
25
public static void unsetAsContext();
26
}
27
```
28
29
**Usage Example:**
30
31
```java
32
// Create and configure test environment
33
LocalFlinkMiniCluster cluster = TestBaseUtils.startCluster(1, 4, false, false, true);
34
TestEnvironment testEnv = new TestEnvironment(cluster, 4, false);
35
36
// Set as global context
37
testEnv.setAsContext();
38
39
// Now ExecutionEnvironment.getExecutionEnvironment() returns this test environment
40
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
41
DataSet<String> result = env.fromElements("test").map(s -> s.toUpperCase());
42
JobExecutionResult jobResult = env.execute("test job");
43
44
// Clean up
45
TestEnvironment.unsetAsContext();
46
```
47
48
### CollectionTestEnvironment
49
50
Collection-based test environment for lightweight local testing without a cluster.
51
52
```java { .api }
53
public class CollectionTestEnvironment extends CollectionEnvironment {
54
public JobExecutionResult getLastJobExecutionResult();
55
public JobExecutionResult execute() throws Exception;
56
public JobExecutionResult execute(String jobName) throws Exception;
57
58
protected void setAsContext();
59
protected static void unsetAsContext();
60
}
61
```
62
63
**Usage Example:**
64
65
```java
66
// Use collection environment for simple tests
67
CollectionTestEnvironment collectionEnv = new CollectionTestEnvironment();
68
collectionEnv.setAsContext();
69
70
// Jobs will execute locally using Java collections
71
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
72
DataSet<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);
73
List<Integer> result = numbers.map(x -> x * 2).collect();
74
75
CollectionTestEnvironment.unsetAsContext();
76
```
77
78
## Streaming Testing Environment
79
80
### TestStreamEnvironment
81
82
StreamExecutionEnvironment implementation that executes streaming jobs on LocalFlinkMiniCluster.
83
84
```java { .api }
85
public class TestStreamEnvironment extends StreamExecutionEnvironment {
86
public TestStreamEnvironment(LocalFlinkMiniCluster miniCluster, int parallelism,
87
Collection<Path> jarFiles, Collection<URL> classPaths);
88
public TestStreamEnvironment(LocalFlinkMiniCluster miniCluster, int parallelism);
89
90
public JobExecutionResult execute(String jobName) throws Exception;
91
92
public static void setAsContext(LocalFlinkMiniCluster cluster, int parallelism,
93
Collection<Path> jarFiles, Collection<URL> classpaths);
94
public static void setAsContext(LocalFlinkMiniCluster cluster, int parallelism);
95
public static void unsetAsContext();
96
}
97
```
98
99
**Usage Example:**
100
101
```java
102
// Create streaming test environment
103
LocalFlinkMiniCluster cluster = TestBaseUtils.startCluster(1, 4, false, false, true);
104
TestStreamEnvironment.setAsContext(cluster, 4);
105
106
// Now StreamExecutionEnvironment.getExecutionEnvironment() returns test environment
107
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
108
DataStream<String> stream = env.fromElements("hello", "world")
109
.map(s -> s.toUpperCase());
110
111
stream.print();
112
JobExecutionResult result = env.execute("streaming test");
113
114
// Clean up
115
TestStreamEnvironment.unsetAsContext();
116
TestBaseUtils.stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
117
```
118
119
## Environment Configuration
120
121
All test environments support configuration of:
122
123
- **Parallelism**: Number of parallel slots for job execution
124
- **Object Reuse**: Whether to reuse objects to reduce garbage collection (batch only)
125
- **JAR Files**: Additional JAR files to include in the classpath
126
- **Class Paths**: Additional URLs to include in the classpath
127
- **Cluster Configuration**: LocalFlinkMiniCluster settings
128
129
## Context Management
130
131
Test environments can be set as global contexts using static methods:
132
133
- `setAsContext()` - Makes the test environment the default for `ExecutionEnvironment.getExecutionEnvironment()`
134
- `unsetAsContext()` - Restores the previous execution environment
135
136
This allows existing code that calls `ExecutionEnvironment.getExecutionEnvironment()` to automatically use the test environment without modification.