0
# Program Execution
1
2
The Apache Flink Program Execution module (`org.apache.flink.client.program.*`) covers program packaging, execution environments, and job lifecycle management. It packages user programs, provides cluster client implementations, and manages execution contexts for both batch and streaming applications.
3
4
## Core Program Interfaces
5
6
### ClusterClient { .api }
7
8
Main interface for interacting with Flink clusters, providing job submission and management capabilities.
9
10
```java
11
public interface ClusterClient<T> extends AutoCloseable {
12
// Cluster information
13
T getClusterId();
14
Configuration getFlinkConfiguration();
15
String getWebInterfaceURL();
16
17
// Job listing and status
18
CompletableFuture<Collection<JobStatusMessage>> listJobs();
19
CompletableFuture<JobStatus> getJobStatus(JobID jobId);
20
CompletableFuture<JobResult> requestJobResult(JobID jobId);
21
22
// Job submission and management
23
CompletableFuture<JobID> submitJob(JobGraph jobGraph);
24
CompletableFuture<Acknowledge> cancel(JobID jobId);
25
CompletableFuture<String> cancelWithSavepoint(JobID jobId, String savepointDirectory);
26
CompletableFuture<String> stopWithSavepoint(JobID jobId, boolean advanceToEndOfEventTime, String savepointDirectory);
27
28
// Savepoint operations
29
CompletableFuture<String> triggerSavepoint(JobID jobId, String savepointDirectory);
30
CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath);
31
32
// Accumulators and metrics
33
default CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID) { }
34
CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID, ClassLoader loader);
35
36
// Coordination requests
37
CompletableFuture<CoordinationResponse> sendCoordinationRequest(JobID jobId,
38
OperatorID operatorId,
39
CoordinationRequest request);
40
41
// Cluster management
42
void shutDownCluster();
43
void close();
44
}
45
```
46
47
### PackagedProgram { .api }
48
49
Represents a packaged Flink program with all necessary metadata and dependencies.
50
51
```java
52
public class PackagedProgram implements AutoCloseable {
53
// Constants
54
public static final String MANIFEST_ATTRIBUTE_ASSEMBLER_CLASS = "program-class";
55
public static final String MANIFEST_ATTRIBUTE_MAIN_CLASS = "Main-Class";
56
57
// Program information
58
public String getMainClassName() { }
59
public String[] getArguments() { }
60
public ClassLoader getUserCodeClassLoader() { }
61
public Configuration getConfiguration() { }
62
public SavepointRestoreSettings getSavepointSettings() { }
63
64
// Dependencies and resources
65
public List<URL> getJobJarAndDependencies() { }
66
public static List<URL> getJobJarAndDependencies(File jarFile, String entryPointClassName) { }
67
68
// Program execution
69
public void invokeInteractiveModeForExecution() { }
70
public String getDescription() { }
71
public boolean isPython() { }
72
73
// Resource management
74
public void close() { }
75
76
// Builder pattern
77
public static Builder newBuilder() { }
78
79
public static class Builder {
80
public Builder setJarFile(File jarFile) { }
81
public Builder setUserClassPaths(List<URL> userClassPaths) { }
82
public Builder setEntryPointClassName(String entryPointClassName) { }
83
public Builder setConfiguration(Configuration configuration) { }
84
public Builder setSavepointRestoreSettings(SavepointRestoreSettings savepointRestoreSettings) { }
85
public Builder setArguments(String... arguments) { }
86
public PackagedProgram build() { }
87
}
88
}
89
```
90
91
### ClusterClientProvider { .api }
92
93
Provider interface for obtaining cluster clients.
94
95
```java
96
public interface ClusterClientProvider<T> {
97
ClusterClient<T> getClusterClient();
98
}
99
```
100
101
## Execution Environment Classes
102
103
### ContextEnvironment { .api }
104
105
Execution environment that delegates to a configured pipeline executor.
106
107
```java
108
public class ContextEnvironment extends ExecutionEnvironment {
109
// Static context management
110
public static void setAsContext(PipelineExecutorServiceLoader executorServiceLoader,
111
Configuration configuration,
112
ClassLoader userCodeClassLoader,
113
boolean enforceSingleJobExecution,
114
boolean suppressSysout) { }
115
116
public static void unsetAsContext() { }
117
118
// ExecutionEnvironment implementation with delegation to executor
119
}
120
```
121
122
### StreamContextEnvironment { .api }
123
124
Stream execution environment that is installed as the active context through `setAsContext` and removed again through `unsetAsContext`.
125
126
```java
127
public class StreamContextEnvironment extends StreamExecutionEnvironment {
128
// Static context management
129
public static void setAsContext(PipelineExecutorServiceLoader executorServiceLoader,
130
Configuration configuration,
131
ClassLoader userCodeClassLoader,
132
boolean enforceSingleJobExecution,
133
boolean suppressSysout) { }
134
135
public static void unsetAsContext() { }
136
137
// StreamExecutionEnvironment implementation with delegation
138
}
139
```
140
141
### OptimizerPlanEnvironment { .api }
142
143
Execution environment that generates execution plans without actually running the job.
144
145
```java
146
public class OptimizerPlanEnvironment extends ExecutionEnvironment {
147
// Constructor
148
public OptimizerPlanEnvironment(Optimizer optimizer, int defaultParallelism) { }
149
150
// Plan generation methods
151
public OptimizedPlan getOptimizedPlan(Program program) { }
152
public OptimizedPlan getOptimizedPlan(List<DataSinkNode> sinks) { }
153
public String getExecutionPlan() { }
154
155
// ExecutionEnvironment overrides for optimization
156
public JobExecutionResult execute() { }
157
public JobExecutionResult execute(String jobName) { }
158
159
// Plan access
160
public Plan createProgramPlan() { }
161
public Plan createProgramPlan(String jobName) { }
162
}
163
```
164
165
### StreamPlanEnvironment { .api }
166
167
Stream execution environment for plan generation.
168
169
```java
170
public class StreamPlanEnvironment extends StreamExecutionEnvironment {
171
// Constructor
172
public StreamPlanEnvironment(PipelineExecutorServiceLoader executorServiceLoader,
173
Configuration configuration,
174
ClassLoader userClassLoader) { }
175
176
// Stream plan generation methods
177
public StreamGraph getStreamGraph() { }
178
public StreamGraph getStreamGraph(boolean clearTransformations) { }
179
public String getExecutionPlan() { }
180
181
// StreamExecutionEnvironment overrides for planning
182
public JobExecutionResult execute() { }
183
public JobExecutionResult execute(String jobName) { }
184
public JobExecutionResult execute(StreamGraph streamGraph) { }
185
186
// Plan conversion utilities
187
public JobGraph getJobGraph() { }
188
public JobGraph getJobGraph(String jobName) { }
189
}
190
```
191
192
## Cluster Client Implementations
193
194
### MiniClusterClient { .api }
195
196
Cluster client implementation for local mini clusters.
197
198
```java
199
public class MiniClusterClient implements ClusterClient<MiniClusterClient.MiniClusterId> {
200
// ClusterClient interface implementation for mini clusters
201
202
// Nested cluster ID class
203
public static class MiniClusterId {
204
// Mini cluster identification
205
}
206
}
207
```
208
209
## Program Utilities
210
211
### PackagedProgramRetriever { .api }
212
213
Interface for retrieving packaged programs.
214
215
```java
216
public interface PackagedProgramRetriever {
217
PackagedProgram getPackagedProgram();
218
}
219
```
220
221
### DefaultPackagedProgramRetriever { .api }
222
223
Default implementation of packaged program retriever.
224
225
```java
226
public class DefaultPackagedProgramRetriever implements PackagedProgramRetriever {
227
public PackagedProgram getPackagedProgram() { }
228
}
229
```
230
231
### PackagedProgramUtils { .api }
232
233
Utility enum with static methods for working with packaged programs.
234
235
```java
236
public enum PackagedProgramUtils {
237
// Static utility methods
238
public static boolean isPython(String entryPointClassName) { }
239
public static Pipeline getPipelineFromProgram(PackagedProgram program,
240
Configuration configuration,
241
int parallelism,
242
boolean suppressOutput) { }
243
public static URI resolveURI(String path) { }
244
}
245
```
246
247
### PerJobMiniClusterFactory { .api }
248
249
Factory for creating per-job mini clusters.
250
251
```java
252
public class PerJobMiniClusterFactory {
253
// Constructor
254
public PerJobMiniClusterFactory(Configuration configuration) { }
255
256
// Mini cluster creation methods for per-job execution
257
public MiniCluster createMiniCluster(JobGraph jobGraph,
258
Configuration configuration) { }
259
260
public MiniClusterConfiguration createMiniClusterConfiguration(JobGraph jobGraph,
261
Configuration configuration) { }
262
263
// Resource calculation methods
264
public int calculateNumberOfTaskManagers(JobGraph jobGraph) { }
265
public int calculateTaskSlotsPerTaskManager(Configuration configuration) { }
266
267
// Configuration setup
268
public Configuration setupConfiguration(Configuration originalConfig,
269
JobGraph jobGraph) { }
270
}
271
```
272
273
## Exception Classes
274
275
### ProgramInvocationException { .api }
276
277
Exception thrown during program invocation.
278
279
```java
280
public class ProgramInvocationException extends Exception {
281
// Constructors
282
public ProgramInvocationException(String message) { }
283
public ProgramInvocationException(String message, Throwable cause) { }
284
public ProgramInvocationException(Throwable cause) { }
285
}
286
```
287
288
### ProgramParametrizationException { .api }
289
290
Runtime exception for program parametrization errors.
291
292
```java
293
public class ProgramParametrizationException extends RuntimeException {
294
// Constructors
295
public ProgramParametrizationException(String message) { }
296
public ProgramParametrizationException(String message, Throwable cause) { }
297
public ProgramParametrizationException(Throwable cause) { }
298
}
299
```
300
301
### ProgramMissingJobException { .api }
302
303
Exception thrown when a program doesn't define any jobs.
304
305
```java
306
public class ProgramMissingJobException extends FlinkException {
307
// Constructors
308
public ProgramMissingJobException(String message) { }
309
public ProgramMissingJobException(String message, Throwable cause) { }
310
}
311
```
312
313
### ProgramAbortException { .api }
314
315
Error thrown to abort program execution.
316
317
```java
318
public class ProgramAbortException extends Error {
319
// Constructors
320
public ProgramAbortException(String message) { }
321
public ProgramAbortException(String message, Throwable cause) { }
322
}
323
```
324
325
## Client Utilities
326
327
### ClientUtils Core Methods { .api }
328
329
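Signatures and behavior summaries of the core `ClientUtils` methods for program execution and job management. The method bodies below are shown as comment outlines, not the actual implementation.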
330
331
#### buildUserCodeClassLoader Method
332
333
```java
334
public static URLClassLoader buildUserCodeClassLoader(List<URL> jars,
335
List<URL> classpaths,
336
ClassLoader parent,
337
Configuration configuration) {
338
// Combines JAR files and classpath URLs into a single URLClassLoader
339
// Ensures proper isolation of user code from system classes
340
// Handles parent-first or child-first delegation based on configuration
341
// Returns URLClassLoader configured for user code execution
342
}
343
```
344
345
**Parameters:**
346
- `jars`: List of JAR file URLs containing user program code
347
- `classpaths`: Additional classpath URLs for dependencies
348
- `parent`: Parent ClassLoader for delegation
349
- `configuration`: Flink configuration containing classloader settings
350
351
**Returns:** `URLClassLoader` configured for user code isolation
352
353
#### executeProgram Method
354
355
```java
356
public static void executeProgram(PipelineExecutorServiceLoader executorServiceLoader,
357
Configuration configuration,
358
PackagedProgram program,
359
boolean enforceSingleJobExecution,
360
boolean suppressSysout) {
361
// Sets up execution environment contexts
362
// Loads and executes the packaged program
363
// Handles both batch and streaming execution modes
364
// Manages job lifecycle including submission and monitoring
365
// Cleans up resources after execution completion
366
}
367
```
368
369
**Parameters:**
370
- `executorServiceLoader`: Service loader for pipeline executors
371
- `configuration`: Execution configuration
372
- `program`: Packaged program to execute
373
- `enforceSingleJobExecution`: Whether to enforce single job execution
374
- `suppressSysout`: Whether to suppress system output during execution
375
376
#### waitUntilJobInitializationFinished Method
377
378
```java
379
public static void waitUntilJobInitializationFinished(SupplierWithException<JobStatus, Exception> jobStatusSupplier,
380
SupplierWithException<JobResult, Exception> jobResultSupplier,
381
ClassLoader userCodeClassloader) {
382
// Polls job status until initialization is complete
383
// Handles various job states during startup phase
384
// Manages timeout and retry logic for status checks
385
// Switches context to user classloader for status operations
386
// Throws appropriate exceptions for initialization failures
387
}
388
```
389
390
**Parameters:**
391
- `jobStatusSupplier`: Supplier for retrieving current job status
392
- `jobResultSupplier`: Supplier for retrieving job result when available
393
- `userCodeClassloader`: User code classloader for context switching
394
395
## Usage Examples
396
397
### Basic Program Execution
398
399
```java
400
// Create packaged program
401
PackagedProgram program = PackagedProgram.newBuilder()
402
.setJarFile(new File("my-flink-job.jar"))
403
.setEntryPointClassName("com.example.MyFlinkJob")
404
.setArguments("--input", "/data/input", "--output", "/data/output")
405
.setConfiguration(new Configuration())
406
.build();
407
408
// Execute program using ClientUtils
409
Configuration config = new Configuration();
410
config.setString("execution.target", "local");
411
412
PipelineExecutorServiceLoader executorLoader =
413
PipelineExecutorServiceLoader.fromConfiguration(config);
414
415
try {
416
ClientUtils.executeProgram(executorLoader, config, program, false, false);
417
} finally {
418
program.close();
419
}
420
```
421
422
### Cluster Client Usage
423
424
```java
425
// Get cluster client through factory
426
Configuration config = new Configuration();
427
config.setString("rest.address", "localhost");
428
config.setInteger("rest.port", 8081);
429
430
ClusterClientServiceLoader serviceLoader = new DefaultClusterClientServiceLoader();
431
ClusterClientFactory<StandaloneClusterId> factory = serviceLoader.getClusterClientFactory(config);
432
433
try (ClusterDescriptor<StandaloneClusterId> descriptor = factory.createClusterDescriptor(config)) {
434
StandaloneClusterId clusterId = factory.getClusterId(config);
435
436
try (ClusterClient<StandaloneClusterId> client = descriptor.retrieve(clusterId).getClusterClient()) {
437
// Submit job
438
JobGraph jobGraph = /* create job graph */;
439
CompletableFuture<JobID> jobIdFuture = client.submitJob(jobGraph);
440
JobID jobId = jobIdFuture.get();
441
442
// Monitor job status
443
CompletableFuture<JobStatus> statusFuture = client.getJobStatus(jobId);
444
JobStatus status = statusFuture.get();
445
System.out.println("Job status: " + status);
446
447
// Get job result
448
CompletableFuture<JobResult> resultFuture = client.requestJobResult(jobId);
449
JobResult result = resultFuture.get();
450
}
451
}
452
```
453
454
### Context Environment Usage
455
456
```java
457
// Set up context environment
458
Configuration config = new Configuration();
459
PipelineExecutorServiceLoader executorLoader =
460
PipelineExecutorServiceLoader.fromConfiguration(config);
461
ClassLoader userClassLoader = Thread.currentThread().getContextClassLoader();
462
463
ContextEnvironment.setAsContext(executorLoader, config, userClassLoader, false, false);
464
StreamContextEnvironment.setAsContext(executorLoader, config, userClassLoader, false, false);
465
466
try {
467
// Your Flink program code here
468
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
469
// ... define your job ...
470
env.execute("My Flink Job");
471
} finally {
472
// Clean up context
473
ContextEnvironment.unsetAsContext();
474
StreamContextEnvironment.unsetAsContext();
475
}
476
```
477
478
### Mini Cluster Usage
479
480
```java
481
// Create mini cluster configuration
482
Configuration config = new Configuration();
483
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 4);
484
485
// Create and start mini cluster
486
MiniCluster miniCluster = new MiniCluster(new MiniClusterConfiguration(
487
config,
488
1, // number of task managers
489
RpcServiceUtils.createRemoteRpcService(/* configuration */),
490
"localhost"
491
));
492
493
miniCluster.start();
494
495
try {
496
// Create mini cluster client
497
MiniClusterClient client = new MiniClusterClient(config, miniCluster);
498
499
// Use client for job operations
500
CompletableFuture<Collection<JobStatusMessage>> jobs = client.listJobs();
501
502
} finally {
503
miniCluster.close();
504
}
505
```
506
507
### Savepoint Operations
508
509
```java
510
// Trigger savepoint
511
try (ClusterClient<StandaloneClusterId> client = /* get cluster client */) {
512
JobID jobId = /* your job ID */;
513
String savepointDirectory = "hdfs://namenode:port/savepoints";
514
515
CompletableFuture<String> savepointFuture =
516
client.triggerSavepoint(jobId, savepointDirectory);
517
String savepointPath = savepointFuture.get();
518
519
System.out.println("Savepoint created at: " + savepointPath);
520
521
// Later, dispose the savepoint if no longer needed
522
CompletableFuture<Acknowledge> disposeFuture = client.disposeSavepoint(savepointPath);
523
disposeFuture.get();
524
}
525
```
526
527
### Pipeline from Program
528
529
```java
530
// Extract pipeline from packaged program
531
PackagedProgram program = /* create packaged program */;
532
Configuration config = new Configuration();
533
int parallelism = 4;
534
boolean suppressOutput = true;
535
536
Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(
537
program, config, parallelism, suppressOutput);
538
539
// Use pipeline with executor
540
PipelineExecutor executor = /* get executor */;
541
CompletableFuture<JobClient> jobClientFuture =
542
executor.execute(pipeline, config, program.getUserCodeClassLoader());
543
```
544
545
## Required Imports
546
547
```java
548
import org.apache.flink.client.program.ClusterClient;
549
import org.apache.flink.client.program.PackagedProgram;
550
import org.apache.flink.client.program.ClusterClientProvider;
551
import org.apache.flink.client.program.ContextEnvironment;
552
import org.apache.flink.client.program.StreamContextEnvironment;
553
import org.apache.flink.client.program.OptimizerPlanEnvironment;
554
import org.apache.flink.client.program.StreamPlanEnvironment;
555
import org.apache.flink.client.program.MiniClusterClient;
556
import org.apache.flink.client.program.PackagedProgramRetriever;
557
import org.apache.flink.client.program.DefaultPackagedProgramRetriever;
558
import org.apache.flink.client.program.PackagedProgramUtils;
559
import org.apache.flink.client.program.PerJobMiniClusterFactory;
560
import org.apache.flink.client.program.ProgramInvocationException;
561
import org.apache.flink.client.program.ProgramParametrizationException;
562
import org.apache.flink.client.program.ProgramMissingJobException;
563
import org.apache.flink.client.program.ProgramAbortException;
564
import org.apache.flink.client.ClientUtils;
565
import org.apache.flink.optimizer.Optimizer;
566
import org.apache.flink.optimizer.plan.OptimizedPlan;
567
import org.apache.flink.optimizer.dag.DataSinkNode;
568
import org.apache.flink.api.common.Plan;
569
import org.apache.flink.streaming.api.graph.StreamGraph;
570
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
571
import org.apache.flink.util.function.SupplierWithException;
572
import org.apache.flink.api.common.ExecutionConfig;
573
import org.apache.flink.api.java.ExecutionEnvironment;
574
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
575
import org.apache.flink.api.common.JobID;
576
import org.apache.flink.api.common.JobStatus;
577
import org.apache.flink.runtime.jobgraph.JobGraph;
578
import org.apache.flink.runtime.jobmaster.JobResult;
579
import org.apache.flink.runtime.messages.Acknowledge;
580
import org.apache.flink.runtime.jobgraph.OperatorID;
581
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
582
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
583
import org.apache.flink.core.execution.JobClient;
584
import org.apache.flink.core.execution.PipelineExecutor;
585
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
586
import org.apache.flink.api.dag.Pipeline;
587
import org.apache.flink.configuration.Configuration;
588
import org.apache.flink.runtime.state.StateBackend;
589
import org.apache.flink.runtime.minicluster.MiniCluster;
590
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
591
import org.apache.flink.util.FlinkException;
592
import java.util.concurrent.CompletableFuture;
593
import java.util.Collection;
594
import java.util.Map;
595
import java.util.List;
596
import java.io.File;
597
import java.net.URL;
598
import java.net.URI;
599
import java.net.URLClassLoader;
600
```