0
# Test Environments
1
2
Specialized execution environments for streaming and batch testing with multiple execution modes, object reuse configuration, and temporary file management. These environments provide isolated testing contexts for Flink applications.
3
4
## Capabilities
5
6
### Test Stream Environment
7
8
StreamExecutionEnvironment specifically designed for testing on MiniCluster with job execution tracking and context management.
9
10
```java { .api }
11
public class TestStreamEnvironment extends StreamExecutionEnvironment {
12
public TestStreamEnvironment(MiniCluster miniCluster, int parallelism);
13
public TestStreamEnvironment(
14
MiniCluster miniCluster,
15
Configuration config,
16
int parallelism,
17
Collection<Path> jarFiles,
18
Collection<URL> classPaths);
19
20
public static void setAsContext(MiniCluster miniCluster, int parallelism);
21
public static void setAsContext(
22
MiniCluster miniCluster,
23
int parallelism,
24
Collection<Path> jarFiles,
25
Collection<URL> classPaths);
26
public static void unsetAsContext();
27
28
public void setAsContext();
29
public JobExecutionResult getLastJobExecutionResult();
30
}
31
```
32
33
#### Usage Example
34
35
```java
36
import org.apache.flink.streaming.util.TestStreamEnvironment;
37
import org.apache.flink.runtime.minicluster.MiniCluster;
38
39
// Create test environment with specific parallelism
40
MiniCluster miniCluster = getMiniCluster();
41
TestStreamEnvironment env = new TestStreamEnvironment(miniCluster, 4);
42
43
// Set as global context for factory methods
44
TestStreamEnvironment.setAsContext(miniCluster, 4);
45
StreamExecutionEnvironment env2 = StreamExecutionEnvironment.getExecutionEnvironment();
46
47
// Execute job and get result
48
env.fromElements(1, 2, 3, 4, 5)
49
.map(x -> x * 2)
50
.print();
51
52
JobExecutionResult result = env.execute("Test Job");
53
JobExecutionResult lastResult = env.getLastJobExecutionResult();
54
```
55
56
### SQL Job Submission
57
58
Interface and implementations for different SQL job submission methods including embedded SQL clients, gateway clients, and REST clients.
59
60
```java { .api }
61
public interface SQLJobClientMode {
62
EmbeddedSqlClient getEmbeddedSqlClient();
63
GatewaySqlClient getGatewaySqlClient(String host, int port);
64
HiveJDBC getHiveJDBC(String host, int port);
65
RestClient getRestClient(String host, int port, String version);
66
}
67
```
68
69
#### SQL Client Types
70
71
```java { .api }
72
public static class EmbeddedSqlClient {
73
// Embedded SQL client implementation
74
}
75
76
public static class GatewaySqlClient {
77
// Gateway SQL client implementation
78
}
79
80
public static class HiveJDBC {
81
// Hive JDBC client implementation
82
}
83
84
public static class RestClient {
85
// REST client implementation
86
}
87
```
88
89
### Job Submission Utilities
90
91
Utilities for submitting and managing Flink jobs in test environments with support for both regular jobs and SQL jobs.
92
93
```java { .api }
94
public class JobSubmission {
95
// Job submission functionality
96
}
97
98
public class SQLJobSubmission {
99
// SQL job submission functionality
100
}
101
```
102
103
### Secure Test Environment
104
105
Test environment with Kerberos security support for testing secure Flink deployments.
106
107
```java { .api }
108
public class SecureTestEnvironment {
109
// Security context management for tests
110
}
111
```
112
113
### Testing Security Context
114
115
Security context management utilities for handling authentication and authorization in test environments.
116
117
```java { .api }
118
public class TestingSecurityContext {
119
// Security context utilities
120
}
121
```
122
123
### Process and Script Execution
124
125
Utilities for executing external processes and shell scripts within test environments.
126
127
```java { .api }
128
public class TestProcessBuilder {
129
// Process builder for test execution
130
}
131
132
public class ShellScript {
133
// Shell script execution utilities
134
}
135
```
136
137
### Pipeline Executor Service
138
139
Service loader for MiniCluster pipeline executor, enabling custom execution strategies for test environments.
140
141
```java { .api }
142
public class MiniClusterPipelineExecutorServiceLoader {
143
// Service loader for pipeline executors
144
}
145
```
146
147
## Execution Modes
148
149
### Test Execution Modes
150
151
Enumeration of available execution modes for parameterized testing with different cluster configurations.
152
153
```java { .api }
154
public enum TestExecutionMode {
155
CLUSTER, // Standard cluster execution
156
CLUSTER_OBJECT_REUSE // Cluster execution with object reuse optimization
157
}
158
```
159
160
### Collection Execution
161
162
Support for collection-based execution mode that runs jobs in-memory without cluster deployment, useful for unit testing individual operations.
163
164
```java { .api }
165
public boolean isCollectionExecution();
166
```
167
168
## Configuration and Utilities
169
170
### Test Utilities
171
172
General utility functions for test environment setup and management.
173
174
```java { .api }
175
public class TestUtils {
176
// Miscellaneous test utility functions
177
}
178
```
179
180
### File Utilities
181
182
File manipulation utilities specifically designed for test environments including temporary file management and path resolution.
183
184
```java { .api }
185
public class FileUtils {
186
// File manipulation utilities for tests
187
}
188
```
189
190
### Success Exception
191
192
Special exception type indicating successful test completion, used in specific testing scenarios where exceptions signal success rather than failure.
193
194
```java { .api }
195
public class SuccessException extends Exception {
196
// Exception indicating test success
197
}
198
```
199
200
## Usage Patterns
201
202
### Basic Streaming Test
203
204
```java
205
@Test
206
void testStreamingApplication() throws Exception {
207
TestStreamEnvironment env = new TestStreamEnvironment(miniCluster, 2);
208
209
DataStream<String> input = env.fromElements("a", "b", "c");
210
DataStream<String> result = input.map(String::toUpperCase);
211
212
result.print();
213
JobExecutionResult jobResult = env.execute("Upper Case Job");
214
215
// Verify execution was successful
216
assertNotNull(jobResult);
217
assertTrue(jobResult.getNetRuntime() > 0);
218
}
219
```
220
221
### Parameterized Execution Mode Testing
222
223
```java
224
@ParameterizedTest
225
@EnumSource(TestExecutionMode.class)
226
void testWithDifferentModes(TestExecutionMode mode) throws Exception {
227
// Configure environment based on execution mode
228
switch (mode) {
229
case CLUSTER:
230
// Standard cluster configuration
231
break;
232
case CLUSTER_OBJECT_REUSE:
233
// Object reuse optimization configuration
234
break;
235
}
236
237
// Run test with specified mode
238
}
239
```
240
241
### Context Management
242
243
```java
244
@BeforeEach
245
void setupTestContext() {
246
TestStreamEnvironment.setAsContext(miniCluster, 4);
247
}
248
249
@AfterEach
250
void cleanupTestContext() {
251
TestStreamEnvironment.unsetAsContext();
252
}
253
254
@Test
255
void testWithGlobalContext() {
256
// Use factory method - will return TestStreamEnvironment
257
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
258
// Test implementation
259
}
260
```