0
# Execution Environments
1
2
Executors for running Flink programs both locally for development and testing, and remotely on production clusters.
3
4
## Capabilities
5
6
### LocalExecutor
7
8
Executor for running Flink programs locally in the same JVM, ideal for development, testing, and debugging.
9
10
```java { .api }
11
/**
12
* Executor for running Flink programs locally in the same JVM
13
*/
14
public class LocalExecutor extends PlanExecutor {
15
/**
16
* Creates a local executor with default configuration
17
*/
18
public LocalExecutor();
19
20
/**
21
* Creates a local executor with specific configuration
22
* @param conf Configuration for local execution environment
23
*/
24
public LocalExecutor(Configuration conf);
25
26
/**
27
* Starts the local execution environment
28
* @throws Exception if startup fails
29
*/
30
public void start() throws Exception;
31
32
/**
33
* Stops the local execution environment and releases resources
34
* @throws Exception if shutdown fails
35
*/
36
public void stop() throws Exception;
37
38
/**
39
* Checks if the local executor is currently running
40
* @return true if the executor is running
41
*/
42
public boolean isRunning();
43
44
/**
45
* Executes a Flink execution plan locally
46
* @param plan The execution plan to run
47
* @return JobExecutionResult with execution details and results
48
* @throws Exception if execution fails
49
*/
50
public JobExecutionResult executePlan(Plan plan) throws Exception;
51
52
/**
53
* Gets the optimizer plan as JSON for visualization
54
* @param plan The execution plan to optimize
55
* @return JSON string representation of the optimized plan
56
* @throws Exception if plan optimization fails
57
*/
58
public String getOptimizerPlanAsJSON(Plan plan) throws Exception;
59
60
/**
61
* Ends a specific job session
62
* @param jobID The job ID to end
63
* @throws Exception if session termination fails
64
*/
65
public void endSession(JobID jobID) throws Exception;
66
67
/**
68
* Checks if files should be overwritten by default
69
* @return true if default overwrite is enabled
70
*/
71
public boolean isDefaultOverwriteFiles();
72
73
/**
74
* Sets whether files should be overwritten by default
75
* @param defaultOverwriteFiles true to enable default overwrite
76
*/
77
public void setDefaultOverwriteFiles(boolean defaultOverwriteFiles);
78
79
/**
80
* Gets the number of task slots per TaskManager
81
* @return Number of task slots
82
*/
83
public int getTaskManagerNumSlots();
84
85
/**
86
* Sets the number of task slots per TaskManager
87
* @param taskManagerNumSlots Number of task slots to configure
88
*/
89
public void setTaskManagerNumSlots(int taskManagerNumSlots);
90
91
/**
92
* Executes a program locally (static convenience method)
93
* @param program The program to execute
94
* @param args Command line arguments
95
* @return JobExecutionResult with execution details and results
96
* @throws Exception if execution fails
97
*/
98
public static JobExecutionResult execute(Program program, String... args) throws Exception;
99
100
/**
101
* Executes an execution plan locally (static convenience method)
102
* @param plan The execution plan to run
103
* @return JobExecutionResult with execution details and results
104
* @throws Exception if execution fails
105
*/
106
public static JobExecutionResult execute(Plan plan) throws Exception;
107
108
/**
109
* Gets the optimizer plan as JSON (static convenience method)
110
* @param plan The execution plan to optimize
111
* @return JSON string representation of the optimized plan
112
* @throws Exception if plan optimization fails
113
*/
114
public static String optimizerPlanAsJSON(Plan plan) throws Exception;
115
116
/**
117
* Gets the execution plan as JSON (static convenience method)
118
* @param plan The execution plan to convert
119
* @return JSON string representation of the plan
120
* @throws Exception if plan conversion fails
121
*/
122
public static String getPlanAsJSON(Plan plan) throws Exception;
123
}
124
```
125
126
### RemoteExecutor
127
128
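Executor for running Flink programs on remote clusters. The JobManager can be addressed by hostname and port, by a combined "hostname:port" string, or by an `InetSocketAddress`, optionally together with JAR files, a client configuration, and global classpaths.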
129
130
```java { .api }
131
/**
132
* Executor for running Flink programs on remote clusters
133
*/
134
public class RemoteExecutor extends PlanExecutor {
135
/**
136
* Creates a remote executor for a specific hostname and port
137
* @param hostname Hostname of the Flink cluster JobManager
138
* @param port Port of the Flink cluster JobManager
139
*/
140
public RemoteExecutor(String hostname, int port);
141
142
/**
143
* Creates a remote executor with a JAR file dependency
144
* @param hostname Hostname of the Flink cluster JobManager
145
* @param port Port of the Flink cluster JobManager
146
* @param jarFile JAR file URL to include in execution
147
*/
148
public RemoteExecutor(String hostname, int port, URL jarFile);
149
150
/**
151
* Creates a remote executor from a combined "hostname:port" address string
152
* @param hostport Hostname and port in "hostname:port" format
153
* @param jarFile JAR file URL to include in execution
154
*/
155
public RemoteExecutor(String hostport, URL jarFile);
156
157
/**
158
* Creates a remote executor with multiple JAR files
159
* @param hostname Hostname of the Flink cluster JobManager
160
* @param port Port of the Flink cluster JobManager
161
* @param jarFiles List of JAR file URLs to include in execution
162
*/
163
public RemoteExecutor(String hostname, int port, List<URL> jarFiles);
164
165
/**
166
* Creates a remote executor with custom configuration
167
* @param hostname Hostname of the Flink cluster JobManager
168
* @param port Port of the Flink cluster JobManager
169
* @param clientConfiguration Configuration for the client connection
170
*/
171
public RemoteExecutor(String hostname, int port, Configuration clientConfiguration);
172
173
/**
174
* Creates a remote executor with full configuration options
175
* @param inet Socket address of the JobManager
176
* @param clientConfiguration Configuration for the client connection
177
* @param jarFiles List of JAR file URLs to include in execution
178
* @param globalClasspaths List of global classpath URLs
179
*/
180
public RemoteExecutor(InetSocketAddress inet, Configuration clientConfiguration, List<URL> jarFiles, List<URL> globalClasspaths);
181
182
/**
183
* Starts the remote executor connection
184
* @throws Exception if connection startup fails
185
*/
186
public void start() throws Exception;
187
188
/**
189
* Stops the remote executor and closes connections
190
* @throws Exception if shutdown fails
191
*/
192
public void stop() throws Exception;
193
194
/**
195
* Checks if the remote executor is currently running
196
* @return true if the executor is running and connected
197
*/
198
public boolean isRunning();
199
200
/**
201
* Executes a Flink execution plan on the remote cluster
202
* @param plan The execution plan to run
203
* @return JobExecutionResult with execution details and results
204
* @throws Exception if execution fails
205
*/
206
public JobExecutionResult executePlan(Plan plan) throws Exception;
207
208
/**
209
* Executes a job with JARs on the remote cluster
210
* @param program The job with associated JAR files
211
* @return JobExecutionResult with execution details and results
212
* @throws Exception if execution fails
213
*/
214
public JobExecutionResult executePlanWithJars(JobWithJars program) throws Exception;
215
216
/**
217
* Gets the optimizer plan as JSON for visualization
218
* @param plan The execution plan to optimize
219
* @return JSON string representation of the optimized plan
220
* @throws Exception if plan optimization fails
221
*/
222
public String getOptimizerPlanAsJSON(Plan plan) throws Exception;
223
224
/**
225
* Ends a specific job session on the remote cluster
226
* @param jobID The job ID to end
227
* @throws Exception if session termination fails
228
*/
229
public void endSession(JobID jobID) throws Exception;
230
231
/**
232
* Sets the default parallelism for job execution
233
* @param defaultParallelism Default parallelism level
234
*/
235
public void setDefaultParallelism(int defaultParallelism);
236
237
/**
238
* Gets the default parallelism for job execution
239
* @return Default parallelism level
240
*/
241
public int getDefaultParallelism();
242
}
243
```
244
245
### Client Utilities
246
247
Utility methods for client operations and connection management.
248
249
```java { .api }
250
/**
251
* Common utility methods for client operations
252
*/
253
public class ClientUtils {
254
/**
255
* Parses a hostname:port string into an InetSocketAddress
256
* @param hostport String in "hostname:port" format
257
* @return InetSocketAddress parsed from the input string
258
* @throws IllegalArgumentException if the format is invalid
259
*/
260
public static InetSocketAddress parseHostPortAddress(String hostport);
261
}
262
```
263
264
**Usage Examples:**
265
266
```java
267
import org.apache.flink.client.LocalExecutor;
268
import org.apache.flink.client.RemoteExecutor;
269
import org.apache.flink.client.ClientUtils;
270
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
271
import org.apache.flink.configuration.Configuration;
272
import java.io.File;
import java.net.InetSocketAddress;
273
import java.net.URL;
274
import java.util.Arrays;
275
import java.util.List;
276
277
// Local execution for development/testing
278
LocalExecutor localExecutor = new LocalExecutor();
279
localExecutor.setTaskManagerNumSlots(4);
280
localExecutor.setDefaultOverwriteFiles(true);
281
282
try {
283
localExecutor.start();
284
285
// Execute a Flink plan locally
286
Plan myPlan = createMyFlinkPlan(); // Your plan creation logic
287
JobExecutionResult result = localExecutor.executePlan(myPlan);
288
289
System.out.println("Local execution completed in: " + result.getNetRuntime() + "ms");
290
291
// Get execution plan as JSON for visualization
292
String planJson = localExecutor.getOptimizerPlanAsJSON(myPlan);
293
System.out.println("Execution plan: " + planJson);
294
295
} finally {
296
localExecutor.stop();
297
}
298
299
// Remote execution on a cluster
300
String jobManagerHost = "flink-cluster.example.com";
301
int jobManagerPort = 6123;
302
303
// Basic remote executor
304
RemoteExecutor remoteExecutor = new RemoteExecutor(jobManagerHost, jobManagerPort);
305
306
// Remote executor with JAR dependencies
307
List<URL> jarFiles = Arrays.asList(
308
new File("my-job.jar").toURI().toURL(),
309
new File("dependencies.jar").toURI().toURL()
310
);
311
RemoteExecutor remoteExecutorWithJars = new RemoteExecutor(jobManagerHost, jobManagerPort, jarFiles);
312
313
// Remote executor with full configuration
314
Configuration config = new Configuration();
315
config.setString("jobmanager.rpc.address", jobManagerHost);
316
config.setInteger("jobmanager.rpc.port", jobManagerPort);
317
318
InetSocketAddress jobManagerAddress = ClientUtils.parseHostPortAddress("flink-cluster.example.com:6123");
319
RemoteExecutor configuredRemoteExecutor = new RemoteExecutor(
320
jobManagerAddress,
321
config,
322
jarFiles,
323
Arrays.asList() // global classpaths
324
);
325
326
try {
327
remoteExecutor.start();
328
remoteExecutor.setDefaultParallelism(8);
329
330
// Execute a plan on the remote cluster
331
JobExecutionResult remoteResult = remoteExecutor.executePlan(myPlan);
332
System.out.println("Remote execution completed in: " + remoteResult.getNetRuntime() + "ms");
333
334
} finally {
335
remoteExecutor.stop();
336
}
337
338
// Static convenience methods for quick execution
339
JobExecutionResult quickLocalResult = LocalExecutor.execute(myPlan);
340
String quickPlanJson = LocalExecutor.getPlanAsJSON(myPlan);
341
```
342
343
### PlanExecutor Base Class
344
345
Base class for all plan executors, providing a common execution interface.
346
347
```java { .api }
348
/**
349
* Base class for all plan executors
350
*/
351
public abstract class PlanExecutor {
352
/**
353
* Starts the executor
354
* @throws Exception if startup fails
355
*/
356
public abstract void start() throws Exception;
357
358
/**
359
* Stops the executor
360
* @throws Exception if shutdown fails
361
*/
362
public abstract void stop() throws Exception;
363
364
/**
365
* Checks if the executor is running
366
* @return true if the executor is active
367
*/
368
public abstract boolean isRunning();
369
370
/**
371
* Executes a Flink execution plan
372
* @param plan The execution plan to run
373
* @return JobExecutionResult with execution details
374
* @throws Exception if execution fails
375
*/
376
public abstract JobExecutionResult executePlan(Plan plan) throws Exception;
377
378
/**
379
* Gets the optimized execution plan as JSON
380
* @param plan The execution plan to optimize
381
* @return JSON representation of the optimized plan
382
* @throws Exception if optimization fails
383
*/
384
public abstract String getOptimizerPlanAsJSON(Plan plan) throws Exception;
385
386
/**
387
* Ends a job session
388
* @param jobID The job ID to terminate
389
* @throws Exception if session termination fails
390
*/
391
public abstract void endSession(JobID jobID) throws Exception;
392
}
393
```