0
# Program Execution
1
2
This module provides program packaging, classloader management, and execution utilities for submitting user applications to Flink clusters. It covers JAR-based programs and the management of their execution environments.
3
4
## Capabilities
5
6
### Packaged Program
7
8
Represents a program packaged in a JAR file with all necessary dependencies and configuration.
9
10
```java { .api }
11
/**
12
* Represents a program packaged in a JAR file
13
*/
14
public class PackagedProgram implements AutoCloseable {
15
16
// Manifest attribute constants
17
public static final String MANIFEST_ATTRIBUTE_ASSEMBLER_CLASS = "program-class";
18
public static final String MANIFEST_ATTRIBUTE_MAIN_CLASS = "Main-Class";
19
20
/**
21
* Gets savepoint restore settings for this program
22
* @return SavepointRestoreSettings instance
23
*/
24
public SavepointRestoreSettings getSavepointSettings();
25
26
/**
27
* Gets the program arguments
28
* @return Array of program arguments
29
*/
30
public String[] getArguments();
31
32
/**
33
* Gets the main class name for this program
34
* @return Fully qualified main class name
35
*/
36
public String getMainClassName();
37
38
/**
39
* Gets the program description by analyzing the JAR
40
* @return Program description string
41
* @throws ProgramInvocationException if description cannot be retrieved
42
*/
43
public String getDescription() throws ProgramInvocationException;
44
45
/**
46
* Invokes the program in interactive mode for execution
47
* @throws ProgramInvocationException if invocation fails
48
*/
49
public void invokeInteractiveModeForExecution() throws ProgramInvocationException;
50
51
/**
52
* Gets the required classpaths for this program
53
* @return List of classpath URLs
54
*/
55
public List<URL> getClasspaths();
56
57
/**
58
* Gets the user code classloader for this program
59
* @return ClassLoader for user code execution
60
*/
61
public ClassLoader getUserCodeClassLoader();
62
63
/**
64
* Gets the job JAR and all dependencies
65
* @return List of JAR and dependency URLs
66
*/
67
public List<URL> getJobJarAndDependencies();
68
69
/**
70
* Gets the job JAR and all dependencies for the given JAR file
71
* @param jarFile JAR file to analyze
72
* @param entryPointClassName Entry point class name
73
* @return List of JAR and dependency URLs
74
* @throws ProgramInvocationException if analysis fails
75
*/
76
public static List<URL> getJobJarAndDependencies(File jarFile, String entryPointClassName)
77
throws ProgramInvocationException;
78
79
/**
80
* Extracts nested libraries from a JAR file
81
* @param jarFile JAR file URL to extract from
82
* @return List of extracted library files
83
* @throws ProgramInvocationException if extraction fails
84
*/
85
public static List<File> extractContainedLibraries(URL jarFile)
86
throws ProgramInvocationException;
87
88
/**
89
* Creates a new builder for PackagedProgram
90
* @return Builder instance
91
*/
92
public static Builder newBuilder();
93
94
/**
95
* Closes the program and releases resources
96
*/
97
@Override
98
public void close();
99
100
/**
101
* Builder pattern for PackagedProgram construction
102
*/
103
public static class Builder {
104
/**
105
* Sets the JAR file for this program
106
* @param jarFile JAR file containing the program
107
* @return This builder instance
108
*/
109
public Builder setJarFile(File jarFile);
110
111
/**
112
* Sets the entry point class name
113
* @param entryPointClassName Fully qualified class name
114
* @return This builder instance
115
*/
116
public Builder setEntryPointClassName(String entryPointClassName);
117
118
/**
119
* Sets the program arguments
120
* @param args Variable arguments for the program
121
* @return This builder instance
122
*/
123
public Builder setArguments(String... args);
124
125
/**
126
* Sets additional user classpaths
127
* @param userClassPaths List of user classpath URLs
128
* @return This builder instance
129
*/
130
public Builder setUserClassPaths(List<URL> userClassPaths);
131
132
/**
133
* Sets the Flink configuration
134
* @param configuration Flink configuration instance
135
* @return This builder instance
136
*/
137
public Builder setConfiguration(Configuration configuration);
138
139
/**
140
* Sets savepoint restore settings
141
* @param savepointRestoreSettings Savepoint settings
142
* @return This builder instance
143
*/
144
public Builder setSavepointRestoreSettings(SavepointRestoreSettings savepointRestoreSettings);
145
146
/**
147
* Builds the PackagedProgram instance
148
* @return PackagedProgram instance
149
* @throws ProgramInvocationException if building fails
150
*/
151
public PackagedProgram build() throws ProgramInvocationException;
152
}
153
}
154
```
155
156
**Usage Example:**
157
158
```java
159
import org.apache.flink.client.program.PackagedProgram;
160
import java.io.File;
import java.net.URL;
import java.util.List;
161
162
// Create a packaged program from JAR
163
PackagedProgram program = PackagedProgram.newBuilder()
164
.setJarFile(new File("/path/to/my-flink-job.jar"))
165
.setEntryPointClassName("com.example.MyFlinkJob")
166
.setArguments("--input", "/data/input", "--output", "/data/output")
167
.build();
168
169
try {
170
System.out.println("Program: " + program.getDescription());
171
System.out.println("Main class: " + program.getMainClassName());
172
173
// Get classpath information
174
List<URL> dependencies = program.getJobJarAndDependencies();
175
System.out.println("Dependencies: " + dependencies.size());
176
177
// Execute the program
178
program.invokeInteractiveModeForExecution();
179
180
} finally {
181
program.close();
182
}
183
```
184
185
### Packaged Program Utilities
186
187
Utility functions for working with packaged programs.
188
189
```java { .api }
190
/**
191
* Utilities for packaged programs
192
*/
193
public class PackagedProgramUtils {
194
/**
195
* Checks if the program is a Python program
196
* @param entryPointClassName Entry point class name to check
197
* @return true if Python program, false otherwise
198
*/
199
public static boolean isPython(String entryPointClassName);
200
201
/**
202
* Gets the Python JAR URL for Python programs
203
* @return URL of the Python JAR
204
*/
205
public static URL getPythonJar();
206
}
207
```
208
209
### Program Retriever Interface
210
211
Interface for retrieving packaged programs from various sources.
212
213
```java { .api }
214
/**
215
* Interface for retrieving packaged programs
216
*/
217
public interface PackagedProgramRetriever {
218
/**
219
* Retrieves a packaged program
220
* @return PackagedProgram instance
221
* @throws ProgramRetrievalException if retrieval fails
222
*/
223
PackagedProgram getPackagedProgram() throws ProgramRetrievalException;
224
}
225
226
/**
227
* Default implementation for retrieving packaged programs
228
*/
229
public class DefaultPackagedProgramRetriever implements PackagedProgramRetriever {
230
@Override
231
public PackagedProgram getPackagedProgram() throws ProgramRetrievalException;
232
}
233
```
234
235
### Execution Environments
236
237
Specialized execution environments that provide context for program execution.
238
239
```java { .api }
240
/**
241
* Execution environment with context for DataSet programs
242
*/
243
public class ContextEnvironment extends ExecutionEnvironment {
244
// Provides execution context for DataSet API programs
245
// Automatically configured when running within Flink client
246
}
247
248
/**
249
* Execution environment for plan optimization
250
*/
251
public class OptimizerPlanEnvironment extends ExecutionEnvironment {
252
// Used for generating execution plans without actual execution
253
// Useful for plan analysis and optimization
254
}
255
256
/**
257
* Stream execution environment with context for DataStream programs
258
*/
259
public class StreamContextEnvironment extends StreamExecutionEnvironment {
260
// Provides execution context for DataStream API programs
261
// Automatically configured when running within Flink client
262
}
263
264
/**
265
* Environment for stream execution plans
266
*/
267
public class StreamPlanEnvironment extends StreamExecutionEnvironment {
268
// Used for generating stream execution plans
269
// Enables plan inspection without execution
270
}
271
```
272
273
**Usage Example:**
274
275
```java
276
import org.apache.flink.client.program.ContextEnvironment;
277
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
278
279
// The environment is automatically set up when running through client
280
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
281
282
// If running in client context, this will be a ContextEnvironment
283
if (env instanceof ContextEnvironment) {
284
System.out.println("Running in client context");
285
}
286
287
// Create and execute a DataSet program (Tokenizer is a user-supplied FlatMapFunction, not shown here)
288
DataSet<String> input = env.readTextFile("/path/to/input");
289
input.flatMap(new Tokenizer())
290
.groupBy(0)
291
.sum(1)
292
.writeAsCsv("/path/to/output");
293
294
env.execute("Word Count Example");
295
```
296
297
### Mini Cluster Factory
298
299
Factory for creating per-job mini clusters for testing and development.
300
301
```java { .api }
302
/**
303
* Factory for per-job mini clusters
304
*/
305
public class PerJobMiniClusterFactory {
306
/**
307
* Creates a mini cluster for running a single job
308
* @param configuration Cluster configuration
309
* @return MiniCluster instance configured for single job execution
310
*/
311
public static MiniCluster createMiniCluster(Configuration configuration);
312
}
313
```
314
315
## Exception Types
316
317
```java { .api }
318
/**
319
* Exception for program invocation errors
320
*/
321
public class ProgramInvocationException extends Exception {
322
/**
323
* Creates exception with message
324
* @param message Error message
325
*/
326
public ProgramInvocationException(String message);
327
328
/**
329
* Creates exception with message and job ID
330
* @param message Error message
331
* @param jobID Associated job ID
332
*/
333
public ProgramInvocationException(String message, JobID jobID);
334
335
/**
336
* Creates exception with cause
337
* @param cause Root cause throwable
338
*/
339
public ProgramInvocationException(Throwable cause);
340
341
/**
342
* Creates exception with message and cause
343
* @param message Error message
344
* @param cause Root cause throwable
345
*/
346
public ProgramInvocationException(String message, Throwable cause);
347
348
/**
349
* Creates exception with message, job ID and cause
350
* @param message Error message
351
* @param jobID Associated job ID
352
* @param cause Root cause throwable
353
*/
354
public ProgramInvocationException(String message, JobID jobID, Throwable cause);
355
}
356
357
/**
358
* Exception thrown when a program does not contain a job
359
*/
360
public class ProgramMissingJobException extends FlinkException {
361
/**
362
* Creates exception with message
363
* @param message Error message describing missing job
364
*/
365
public ProgramMissingJobException(String message);
366
}
367
368
/**
369
* Exception for program parameterization errors
370
*/
371
public class ProgramParametrizationException extends RuntimeException {
372
/**
373
* Creates exception with message
374
* @param message Error message
375
*/
376
public ProgramParametrizationException(String message);
377
378
/**
379
* Creates exception with message and cause
380
* @param message Error message
381
* @param cause Root cause throwable
382
*/
383
public ProgramParametrizationException(String message, Throwable cause);
384
}
385
386
/**
387
* Exception for program abortion (extends Error for immediate termination)
388
*/
389
public class ProgramAbortException extends Error {
390
/**
391
* Creates default abort exception
392
*/
393
public ProgramAbortException();
394
395
/**
396
* Creates abort exception with message
397
* @param message Abort message
398
*/
399
public ProgramAbortException(String message);
400
401
/**
402
* Creates abort exception with message and cause
403
* @param message Abort message
404
* @param cause Root cause throwable
405
*/
406
public ProgramAbortException(String message, Throwable cause);
407
}
408
409
/**
410
* Exception for program retrieval errors
411
*/
412
public class ProgramRetrievalException extends Exception {
413
public ProgramRetrievalException(String message);
414
public ProgramRetrievalException(String message, Throwable cause);
415
}
416
```
417
418
## Types
419
420
```java { .api }
421
public class SavepointRestoreSettings {
422
public static SavepointRestoreSettings none();
423
public static SavepointRestoreSettings forPath(String savepointPath);
424
public static SavepointRestoreSettings forPath(String savepointPath, boolean allowNonRestoredState);
425
426
public boolean restoreSavepoint();
427
public String getRestorePath();
428
public boolean allowNonRestoredState();
429
}
430
431
public abstract class ExecutionEnvironment {
432
public static ExecutionEnvironment getExecutionEnvironment();
433
public abstract JobExecutionResult execute(String jobName) throws Exception;
434
435
// DataSet API methods
436
public <X> DataSet<X> fromCollection(Collection<X> data);
437
public DataSet<String> readTextFile(String filePath);
438
}
439
440
public abstract class StreamExecutionEnvironment {
441
public static StreamExecutionEnvironment getExecutionEnvironment();
442
public JobExecutionResult execute(String jobName) throws Exception;
443
444
// DataStream API methods
445
public <T> DataStreamSource<T> fromCollection(Collection<T> data);
446
public DataStreamSource<String> socketTextStream(String hostname, int port);
447
}
448
449
public interface DataSet<T> {
450
<R> DataSet<R> map(MapFunction<T, R> mapper);
451
<R> DataSet<R> flatMap(FlatMapFunction<T, R> flatMapper);
452
DataSet<T> filter(FilterFunction<T> filter);
453
UnsortedGrouping<T> groupBy(int... fields);
454
DataSink<T> writeAsCsv(String filePath);
455
}
456
457
public interface DataStream<T> {
458
<R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper);
459
<R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper);
460
SingleOutputStreamOperator<T> filter(FilterFunction<T> filter);
461
DataStreamSink<T> print();
462
}
463
464
public class MiniCluster implements AutoCloseable {
465
public void start() throws Exception;
466
public void close() throws Exception;
467
public CompletableFuture<JobResult> executeJobBlocking(JobGraph job);
468
}
469
```