0
# Execution Context
1
2
Execution environments that provide programmatic APIs for submitting and managing Flink jobs within applications.
3
4
## Capabilities
5
6
### ContextEnvironment
7
8
Execution environment for remote cluster execution that provides programmatic job submission with full context awareness.
9
10
```java { .api }
11
/**
12
* Execution environment for remote cluster execution with context awareness
13
*/
14
public class ContextEnvironment extends ExecutionEnvironment {
15
/**
16
* Creates a context environment for remote execution
17
* @param remoteConnection Client connection to the remote cluster
18
* @param jarFiles List of JAR file URLs to include
19
* @param classpaths List of additional classpath URLs
20
* @param userCodeClassLoader Class loader for user code
21
* @param savepointSettings Settings for savepoint restoration
22
*/
23
public ContextEnvironment(
24
ClusterClient remoteConnection,
25
List<URL> jarFiles,
26
List<URL> classpaths,
27
ClassLoader userCodeClassLoader,
28
SavepointRestoreSettings savepointSettings
29
);
30
31
/**
32
* Executes the job with the specified name
33
* @param jobName Name for the job execution
34
* @return JobExecutionResult with execution details and results
35
* @throws Exception if execution fails
36
*/
37
public JobExecutionResult execute(String jobName) throws Exception;
38
39
/**
40
* Gets the execution plan without executing the job
41
* @return String representation of the execution plan
42
* @throws Exception if plan generation fails
43
*/
44
public String getExecutionPlan() throws Exception;
45
46
/**
47
* Starts a new session for job execution
48
* @throws Exception if session creation fails
49
*/
50
public void startNewSession() throws Exception;
51
52
/**
53
* Gets the cluster client used by this environment
54
* @return ClusterClient instance for cluster communication
55
*/
56
public ClusterClient getClient();
57
58
/**
59
* Gets the list of JAR files associated with this environment
60
* @return List of JAR file URLs
61
*/
62
public List<URL> getJars();
63
64
/**
65
* Gets the list of additional classpaths
66
* @return List of classpath URLs
67
*/
68
public List<URL> getClasspaths();
69
70
/**
71
* Gets the user code class loader
72
* @return ClassLoader for user code execution
73
*/
74
public ClassLoader getUserCodeClassLoader();
75
76
/**
77
* Gets the savepoint restore settings
78
* @return SavepointRestoreSettings for job restoration
79
*/
80
public SavepointRestoreSettings getSavepointRestoreSettings();
81
82
/**
83
* Sets the context environment factory as the active factory
84
* @param factory ContextEnvironmentFactory to use for creating environments
85
*/
86
public static void setAsContext(ContextEnvironmentFactory factory);
87
88
/**
89
* Removes the current context environment factory
90
*/
91
public static void unsetContext();
92
}
93
```
94
95
### DetachedEnvironment
96
97
Execution environment for detached (non-blocking) job execution that returns immediately after job submission.
98
99
```java { .api }
100
/**
101
* Execution environment for detached (non-blocking) execution
102
*/
103
public class DetachedEnvironment extends ContextEnvironment {
104
/**
105
* Creates a detached environment for non-blocking remote execution
106
* @param remoteConnection Client connection to the remote cluster
107
* @param jarFiles List of JAR file URLs to include
108
* @param classpaths List of additional classpath URLs
109
* @param userCodeClassLoader Class loader for user code
110
* @param savepointSettings Settings for savepoint restoration
111
*/
112
public DetachedEnvironment(
113
ClusterClient remoteConnection,
114
List<URL> jarFiles,
115
List<URL> classpaths,
116
ClassLoader userCodeClassLoader,
117
SavepointRestoreSettings savepointSettings
118
);
119
120
/**
121
* Executes the job in detached mode (non-blocking)
122
* @param jobName Name for the job execution
123
* @return JobExecutionResult with job submission details (not execution results)
124
* @throws Exception if job submission fails
125
*/
126
public JobExecutionResult execute(String jobName) throws Exception;
127
128
/**
129
* Sets the detached execution plan
130
* @param plan FlinkPlan to execute in detached mode
131
*/
132
public void setDetachedPlan(FlinkPlan plan);
133
134
/**
135
* Finalizes the execution and returns submission result
136
* @return JobSubmissionResult with job ID and submission status
137
* @throws Exception if finalization fails
138
*/
139
public JobSubmissionResult finalizeExecute() throws Exception;
140
}
141
```
142
143
### ContextEnvironmentFactory
144
145
Factory for creating context-appropriate execution environments based on configuration and requirements.
146
147
```java { .api }
148
/**
149
* Factory for creating context-appropriate execution environments
150
*/
151
public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
152
/**
153
* Creates a context environment factory with full configuration
154
* @param client Cluster client for remote communication
155
* @param jarFilesToAttach List of JAR files to attach to jobs
156
* @param classpathsToAttach List of classpaths to attach to jobs
157
* @param userCodeClassLoader Class loader for user code
158
* @param defaultParallelism Default parallelism for job execution
159
* @param isDetached Whether to create detached environments
160
* @param savepointSettings Settings for savepoint restoration
161
*/
162
public ContextEnvironmentFactory(
163
ClusterClient client,
164
List<URL> jarFilesToAttach,
165
List<URL> classpathsToAttach,
166
ClassLoader userCodeClassLoader,
167
int defaultParallelism,
168
boolean isDetached,
169
SavepointRestoreSettings savepointSettings
170
);
171
172
/**
173
* Creates an execution environment based on factory configuration
174
* @return ExecutionEnvironment instance (ContextEnvironment or DetachedEnvironment)
175
*/
176
public ExecutionEnvironment createExecutionEnvironment();
177
178
/**
179
* Gets the last environment created by this factory
180
* @return Last created ExecutionEnvironment instance
181
*/
182
public ExecutionEnvironment getLastEnvCreated();
183
}
184
```
185
186
### Preview and Optimization Environments
187
188
Specialized environments for plan preview and optimization without actual execution.
189
190
```java { .api }
191
/**
192
* Execution environment for generating plan previews without execution
193
*/
194
public class PreviewPlanEnvironment extends ExecutionEnvironment {
195
/**
196
* Generates a plan preview instead of executing
197
* @param jobName Name for the job (used in preview)
198
* @return JobExecutionResult (empty - no actual execution)
199
* @throws Exception if preview generation fails
200
*/
201
public JobExecutionResult execute(String jobName) throws Exception;
202
203
/**
204
* Gets the execution plan for preview
205
* @return String representation of the execution plan
206
* @throws Exception if plan generation fails
207
*/
208
public String getExecutionPlan() throws Exception;
209
210
/**
211
* Gets the preview plan as a formatted string
212
* @return Formatted preview of the execution plan
213
* @throws Exception if preview generation fails
214
*/
215
public String getPreviewPlan() throws Exception;
216
}
217
218
/**
219
* Execution environment for plan optimization preview
220
*/
221
public class OptimizerPlanEnvironment extends ExecutionEnvironment {
222
/**
223
* Generates an optimized plan instead of executing
224
* @param jobName Name for the job (used in optimization)
225
* @return JobExecutionResult (empty - no actual execution)
226
* @throws Exception if optimization fails
227
*/
228
public JobExecutionResult execute(String jobName) throws Exception;
229
230
/**
231
* Gets the optimized execution plan
232
* @return String representation of the optimized plan
233
* @throws Exception if optimization fails
234
*/
235
public String getExecutionPlan() throws Exception;
236
}
237
```
238
239
**Usage Examples:**
240
241
```java
242
import org.apache.flink.client.program.ContextEnvironment;
243
import org.apache.flink.client.program.DetachedEnvironment;
244
import org.apache.flink.client.program.ContextEnvironmentFactory;
245
import org.apache.flink.client.program.StandaloneClusterClient;
246
import org.apache.flink.api.java.ExecutionEnvironment;
247
import org.apache.flink.configuration.Configuration;
248
import java.net.URL;
249
import java.util.Arrays;
250
import java.util.List;
251
252
// Setup cluster connection
253
Configuration config = new Configuration();
254
StandaloneClusterClient client = new StandaloneClusterClient(config);
255
256
List<URL> jarFiles = Arrays.asList(
257
new File("my-job.jar").toURI().toURL()
258
);
259
List<URL> classpaths = Arrays.asList();
260
261
// Create context environment for synchronous execution
262
ContextEnvironment contextEnv = new ContextEnvironment(
263
client,
264
jarFiles,
265
classpaths,
266
Thread.currentThread().getContextClassLoader(),
267
SavepointRestoreSettings.none()
268
);
269
270
// Set as the active execution environment
271
ContextEnvironmentFactory factory = new ContextEnvironmentFactory(
272
client,
273
jarFiles,
274
classpaths,
275
Thread.currentThread().getContextClassLoader(),
276
4, // default parallelism
277
false, // not detached
278
SavepointRestoreSettings.none()
279
);
280
281
ContextEnvironment.setAsContext(factory);
282
283
try {
284
// Now your Flink program code can use ExecutionEnvironment.getExecutionEnvironment()
285
// and it will automatically use the context environment
286
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
287
288
// Build your Flink program
289
DataSet<String> data = env.fromElements("Hello", "World", "Flink");
290
data.print();
291
292
// Execute the job (this will use the context environment)
293
JobExecutionResult result = env.execute("My Context Job");
294
System.out.println("Job completed in: " + result.getNetRuntime() + "ms");
295
296
} finally {
297
ContextEnvironment.unsetContext();
298
client.shutdown();
299
}
300
301
// Detached execution for fire-and-forget jobs
302
DetachedEnvironment detachedEnv = new DetachedEnvironment(
303
client,
304
jarFiles,
305
classpaths,
306
Thread.currentThread().getContextClassLoader(),
307
SavepointRestoreSettings.none()
308
);
309
310
ContextEnvironmentFactory detachedFactory = new ContextEnvironmentFactory(
311
client,
312
jarFiles,
313
classpaths,
314
Thread.currentThread().getContextClassLoader(),
315
4, // default parallelism
316
true, // detached mode
317
SavepointRestoreSettings.none()
318
);
319
320
ContextEnvironment.setAsContext(detachedFactory);
321
322
try {
323
ExecutionEnvironment detachedEnvFromFactory = ExecutionEnvironment.getExecutionEnvironment();
324
325
// Build your Flink program
326
DataSet<String> data = detachedEnvFromFactory.fromElements("Detached", "Job", "Execution");
327
data.writeAsText("/path/to/output");
328
329
// This will submit the job and return immediately
330
JobExecutionResult detachedResult = detachedEnvFromFactory.execute("My Detached Job");
331
System.out.println("Job submitted with ID: " + detachedResult.getJobID());
332
333
} finally {
334
ContextEnvironment.unsetContext();
335
}
336
337
// Plan preview without execution
338
PreviewPlanEnvironment previewEnv = new PreviewPlanEnvironment();
339
previewEnv.setParallelism(4);
340
341
DataSet<String> previewData = previewEnv.fromElements("Preview", "Plan", "Generation");
342
previewData.map(s -> s.toUpperCase()).print();
343
344
// Get the execution plan without running the job
345
String executionPlan = previewEnv.getExecutionPlan();
346
System.out.println("Execution Plan Preview:");
347
System.out.println(executionPlan);
348
349
String previewPlan = previewEnv.getPreviewPlan();
350
System.out.println("Formatted Preview:");
351
System.out.println(previewPlan);
352
```
353
354
### Environment Management Interfaces
355
356
Base interfaces for execution environment management.
357
358
```java { .api }
359
/**
360
* Factory interface for creating execution environments
361
*/
362
public interface ExecutionEnvironmentFactory {
363
/**
364
* Creates an execution environment instance
365
* @return ExecutionEnvironment configured by this factory
366
*/
367
ExecutionEnvironment createExecutionEnvironment();
368
}
369
```