0
# Program Management
1
2
Utilities for packaging Flink programs as JAR files and managing their execution with dependencies and classpath handling.
3
4
## Capabilities
5
6
### PackagedProgram
7
8
Represents a Flink program packaged in a JAR file with its dependencies, providing comprehensive program metadata and execution support.
9
10
```java { .api }
11
/**
12
* This class encapsulates a program, packaged in a jar file. It supplies
13
* functionality to extract nested libraries, search for the program entry point, and extract
14
* a program plan.
15
*/
16
public class PackagedProgram {
17
18
// Public Constants
19
20
/**
21
* Property name of the entry in JAR manifest file that describes the Flink specific entry point.
22
*/
23
public static final String MANIFEST_ATTRIBUTE_ASSEMBLER_CLASS = "program-class";
24
25
/**
26
* Property name of the entry in JAR manifest file that describes the class with the main method.
27
*/
28
public static final String MANIFEST_ATTRIBUTE_MAIN_CLASS = "Main-Class";
29
30
// Constructors
31
32
/**
33
* Creates an instance that wraps the plan defined in the jar file using the given
34
* argument.
35
* @param jarFile The jar file which contains the plan and a Manifest which defines the program-class
36
* @param args Optional. The arguments used to create the pact plan, depend on implementation of the pact plan. See getDescription().
37
* @throws ProgramInvocationException This exception is thrown if the Program can't be properly loaded. Causes
38
* may be a missing / wrong class or manifest files.
39
*/
40
public PackagedProgram(File jarFile, String... args) throws ProgramInvocationException;
41
42
/**
43
* Creates an instance that wraps the plan defined in the jar file using the given
44
* argument.
45
* @param jarFile The jar file which contains the plan and a Manifest which defines the program-class
46
* @param classpaths Additional classpath URLs needed by the Program.
47
* @param args Optional. The arguments used to create the pact plan, depend on implementation of the pact plan. See getDescription().
48
* @throws ProgramInvocationException This exception is thrown if the Program can't be properly loaded. Causes
49
* may be a missing / wrong class or manifest files.
50
*/
51
public PackagedProgram(File jarFile, List<URL> classpaths, String... args) throws ProgramInvocationException;
52
53
/**
54
* Creates an instance that wraps the plan defined in the jar file using the given
55
* arguments. For generating the plan the class defined in the entryPointClassName parameter
56
* is used.
57
* @param jarFile The jar file which contains the plan.
58
* @param entryPointClassName Name of the class which generates the plan. Overrides the class defined in the jar file manifest
59
* @param args Optional. The arguments used to create the pact plan, depend on implementation of the pact plan. See getDescription().
60
* @throws ProgramInvocationException This exception is thrown if the Program can't be properly loaded. Causes
61
* may be a missing / wrong class or manifest files.
62
*/
63
public PackagedProgram(File jarFile, String entryPointClassName, String... args) throws ProgramInvocationException;
64
65
/**
66
* Creates an instance that wraps the plan defined in the jar file using the given
67
* arguments. For generating the plan the class defined in the entryPointClassName parameter
68
* is used.
69
* @param jarFile The jar file which contains the plan.
70
* @param classpaths Additional classpath URLs needed by the Program.
71
* @param entryPointClassName Name of the class which generates the plan. Overrides the class defined in the jar file manifest
72
* @param args Optional. The arguments used to create the pact plan, depend on implementation of the pact plan. See getDescription().
73
* @throws ProgramInvocationException This exception is thrown if the Program can't be properly loaded. Causes
74
* may be a missing / wrong class or manifest files.
75
*/
76
public PackagedProgram(File jarFile, List<URL> classpaths, String entryPointClassName, String... args) throws ProgramInvocationException;
77
78
// Configuration Methods
79
80
/**
81
* Sets the savepoint restore settings for this program.
82
* @param savepointSettings The savepoint restore settings
83
*/
84
public void setSavepointRestoreSettings(SavepointRestoreSettings savepointSettings);
85
86
/**
87
* Gets the savepoint restore settings for this program.
88
* @return The savepoint restore settings
89
*/
90
public SavepointRestoreSettings getSavepointSettings();
91
92
// Program Information Methods
93
94
/**
95
* Returns the arguments of this program.
96
* @return The arguments of this program
97
*/
98
public String[] getArguments();
99
100
/**
101
* Returns the main class name of this program.
102
* @return The main class name
103
*/
104
public String getMainClassName();
105
106
/**
107
* Returns true if this program uses interactive mode (main method).
108
* @return true if using interactive mode
109
*/
110
public boolean isUsingInteractiveMode();
111
112
/**
113
* Returns true if this program uses the program entry point.
114
* @return true if using program entry point
115
*/
116
public boolean isUsingProgramEntryPoint();
117
118
/**
119
* Returns the description provided by the Program class. This
120
* may contain a description of the plan itself and its arguments.
121
* @return The description of the PactProgram's input parameters.
122
* @throws ProgramInvocationException This exception is thrown if the Program can't be properly loaded. Causes
123
* may be a missing / wrong class or manifest files.
124
*/
125
public String getDescription() throws ProgramInvocationException;
126
127
// Plan Generation Methods
128
129
/**
130
* Returns the plan without the required jars when the files are already provided by the cluster.
131
* @return The plan without attached jar files.
132
* @throws ProgramInvocationException if plan generation fails
133
*/
134
public JobWithJars getPlanWithoutJars() throws ProgramInvocationException;
135
136
/**
137
* Returns the plan with all required jars.
138
* @return The plan with attached jar files.
139
* @throws ProgramInvocationException if plan generation fails
140
*/
141
public JobWithJars getPlanWithJars() throws ProgramInvocationException;
142
143
/**
144
* Returns the analyzed plan without any optimizations.
145
* @return the analyzed plan without any optimizations.
146
* @throws ProgramInvocationException Thrown if an error occurred in the user-provided pact assembler. This may indicate
147
* missing parameters for generation.
148
*/
149
public String getPreviewPlan() throws ProgramInvocationException;
150
151
// Execution Methods
152
153
/**
154
* This method assumes that the context environment is prepared, or the execution
155
* will be a local execution by default.
156
* @throws ProgramInvocationException if execution fails
157
*/
158
public void invokeInteractiveModeForExecution() throws ProgramInvocationException;
159
160
// Classpath and Library Methods
161
162
/**
163
* Returns the classpaths that are required by the program.
164
* @return List of URLs for classpaths
165
*/
166
public List<URL> getClasspaths();
167
168
/**
169
* Gets the ClassLoader that must be used to load user code classes.
170
* @return The user code ClassLoader.
171
*/
172
public ClassLoader getUserCodeClassLoader();
173
174
/**
175
* Returns all provided libraries needed to run the program.
176
* @return List of library URLs
177
*/
178
public List<URL> getAllLibraries();
179
180
/**
181
* Deletes all temporary files created for contained packaged libraries.
182
*/
183
public void deleteExtractedLibraries();
184
185
// Static Utility Methods
186
187
/**
188
* Takes all JAR files that are contained in this program's JAR file and extracts them
189
* to the system's temp directory.
190
* @param jarFile The JAR file to extract libraries from
191
* @return The file names of the extracted temporary files.
192
* @throws ProgramInvocationException Thrown, if the extraction process failed.
193
*/
194
public static List<File> extractContainedLibraries(URL jarFile) throws ProgramInvocationException;
195
196
/**
197
* Deletes the extracted temporary library files.
198
* @param tempLibraries List of temporary library files to delete
199
*/
200
public static void deleteExtractedLibraries(List<File> tempLibraries);
201
202
/**
203
* Gets the command line arguments for this program
204
* @return Array of command line arguments
205
*/
206
public String[] getArguments();
207
208
/**
209
* Gets the fully qualified main class name
210
* @return Main class name for program execution
211
*/
212
public String getMainClassName();
213
214
/**
215
* Checks if the program uses interactive mode for execution
216
* @return true if interactive mode is enabled
217
*/
218
public boolean isUsingInteractiveMode();
219
220
/**
221
* Checks if the program uses a specific entry point class
222
* @return true if using program entry point
223
*/
224
public boolean isUsingProgramEntryPoint();
225
226
/**
227
* Gets the execution plan with associated JAR files
228
* @return JobWithJars containing the execution plan and dependencies
229
* @throws ProgramInvocationException if plan generation fails
230
*/
231
public JobWithJars getPlanWithJars() throws ProgramInvocationException;
232
233
/**
234
* Gets the execution plan without JAR file dependencies
235
* @return JobWithJars containing only the execution plan
236
* @throws ProgramInvocationException if plan generation fails
237
*/
238
public JobWithJars getPlanWithoutJars() throws ProgramInvocationException;
239
240
/**
241
* Gets a preview of the execution plan as a string
242
* @return String representation of the execution plan
243
* @throws ProgramInvocationException if plan preview generation fails
244
*/
245
public String getPreviewPlan() throws ProgramInvocationException;
246
247
/**
248
* Gets a description of the program
249
* @return Program description string
250
* @throws ProgramInvocationException if description retrieval fails
251
*/
252
public String getDescription() throws ProgramInvocationException;
253
254
/**
255
* Invokes the program in interactive mode for execution
256
* @throws ProgramInvocationException if interactive execution fails
257
*/
258
public void invokeInteractiveModeForExecution() throws ProgramInvocationException;
259
260
/**
261
* Gets all library URLs associated with this program
262
* @return List of library URLs
263
*/
264
public List<URL> getAllLibraries();
265
266
/**
267
* Gets the additional classpath URLs
268
* @return List of classpath URLs
269
*/
270
public List<URL> getClasspaths();
271
272
/**
273
* Gets the user code class loader for this program
274
* @return ClassLoader for user code execution
275
*/
276
public ClassLoader getUserCodeClassLoader();
277
278
/**
279
* Deletes any temporarily extracted library files
280
*/
281
public void deleteExtractedLibraries();
282
283
/**
284
* Sets savepoint restore settings for this program
285
* @param savepointSettings Settings for savepoint restoration
286
*/
287
public void setSavepointRestoreSettings(SavepointRestoreSettings savepointSettings);
288
}
289
```
290
291
**Program Manifest Constants:**
292
293
```java { .api }
294
/**
295
* Manifest attribute for the assembler class
296
*/
297
public static final String MANIFEST_ATTRIBUTE_ASSEMBLER_CLASS = "program-class";
298
299
/**
300
* Manifest attribute for the main class
301
*/
302
public static final String MANIFEST_ATTRIBUTE_MAIN_CLASS = "Main-Class";
303
```
304
305
### JobWithJars
306
307
Represents a Flink dataflow plan with associated JAR files and classpaths for execution.
308
309
```java { .api }
310
/**
311
* Represents a Flink dataflow plan with associated JAR files and classpaths
312
*/
313
public class JobWithJars {
314
/**
315
* Creates a job with execution plan and dependencies
316
* @param plan The Flink execution plan
317
* @param jarFiles List of JAR file URLs
318
* @param classpaths List of additional classpath URLs
319
*/
320
public JobWithJars(Plan plan, List<URL> jarFiles, List<URL> classpaths);
321
322
/**
323
* Creates a job with execution plan and single JAR file
324
* @param plan The Flink execution plan
325
* @param jarFile Single JAR file URL
326
*/
327
public JobWithJars(Plan plan, URL jarFile);
328
329
/**
330
* Gets the Flink execution plan
331
* @return The execution plan for this job
332
*/
333
public Plan getPlan();
334
335
/**
336
* Gets the list of JAR file URLs
337
* @return List of JAR file URLs required for execution
338
*/
339
public List<URL> getJarFiles();
340
341
/**
342
* Gets the list of additional classpath URLs
343
* @return List of classpath URLs for dependencies
344
*/
345
public List<URL> getClasspaths();
346
347
/**
348
* Gets the user code class loader
349
* @return ClassLoader for user code execution
350
*/
351
public ClassLoader getUserCodeClassLoader();
352
353
/**
354
* Validates that a JAR file URL is valid and accessible
355
* @param jar The JAR file URL to validate
356
* @throws Exception if the JAR file is invalid or inaccessible
357
*/
358
public static void checkJarFile(URL jar) throws Exception;
359
360
/**
361
* Builds a user code class loader from JAR files and classpaths
362
* @param jars List of JAR file URLs
363
* @param classpaths List of classpath URLs
364
* @param parent Parent class loader
365
* @return Configured ClassLoader for user code
366
*/
367
public static ClassLoader buildUserCodeClassLoader(List<URL> jars, List<URL> classpaths, ClassLoader parent);
368
}
369
```
370
371
**Usage Examples:**
372
373
```java
374
import org.apache.flink.client.program.PackagedProgram;
375
import org.apache.flink.client.program.JobWithJars;
376
import java.io.File;
377
import java.net.URL;
378
import java.util.Arrays;
379
import java.util.List;
380
381
// Create a basic packaged program
382
File jarFile = new File("path/to/my-flink-job.jar");
383
PackagedProgram program = new PackagedProgram(jarFile, "arg1", "arg2", "arg3");
384
385
// Get program information
386
System.out.println("Main class: " + program.getMainClassName());
387
System.out.println("Arguments: " + Arrays.toString(program.getArguments()));
388
389
// Create a program with additional dependencies
390
List<URL> dependencies = Arrays.asList(
391
new File("lib/dependency1.jar").toURI().toURL(),
392
new File("lib/dependency2.jar").toURI().toURL()
393
);
394
395
PackagedProgram programWithDeps = new PackagedProgram(
396
jarFile,
397
dependencies,
398
"com.example.MyFlinkJob",
399
"arg1", "arg2"
400
);
401
402
// Get execution plan
403
JobWithJars jobWithJars = program.getPlanWithJars();
404
Plan executionPlan = jobWithJars.getPlan();
405
406
// Preview execution plan
407
String planPreview = program.getPreviewPlan();
408
System.out.println("Execution plan preview: " + planPreview);
409
410
// Access class loader for custom operations
411
ClassLoader userClassLoader = program.getUserCodeClassLoader();
412
413
// Clean up temporary files
414
program.deleteExtractedLibraries();
415
```
416
417
### Program Exception Types
418
419
Exception types for program-related errors.
420
421
```java { .api }
422
/**
423
* Exception indicating errors during Flink program invocation
424
*/
425
public class ProgramInvocationException extends Exception {
426
/**
427
* Creates exception with error message
428
* @param message Error description
429
*/
430
public ProgramInvocationException(String message);
431
432
/**
433
* Creates exception with underlying cause
434
* @param cause Root cause of the error
435
*/
436
public ProgramInvocationException(Throwable cause);
437
438
/**
439
* Creates exception with message and underlying cause
440
* @param message Error description
441
* @param cause Root cause of the error
442
*/
443
public ProgramInvocationException(String message, Throwable cause);
444
}
445
446
/**
447
* Runtime exception indicating errors in Flink program parametrization
448
*/
449
public class ProgramParametrizationException extends RuntimeException {
450
/**
451
* Creates exception with error message
452
* @param message Error description
453
*/
454
public ProgramParametrizationException(String message);
455
}
456
457
/**
458
* Exception indicating no job was executed during program invocation
459
*/
460
public class ProgramMissingJobException extends Exception {
461
public ProgramMissingJobException();
462
}
463
```
464
465
### Savepoint Restore Settings
466
467
Configuration for restoring jobs from savepoints.
468
469
```java { .api }
470
/**
471
* Settings for restoring jobs from savepoints
472
*/
473
public class SavepointRestoreSettings {
474
/**
475
* Creates settings for no savepoint restoration
476
* @return SavepointRestoreSettings with no restoration
477
*/
478
public static SavepointRestoreSettings none();
479
480
/**
481
* Creates settings for restoring from a savepoint path
482
* @param savepointPath Path to the savepoint directory
483
* @return SavepointRestoreSettings configured for the specified path
484
*/
485
public static SavepointRestoreSettings forPath(String savepointPath);
486
487
/**
488
* Creates settings for restoring from a savepoint with non-restored state handling
489
* @param savepointPath Path to the savepoint directory
490
* @param allowNonRestoredState Whether to allow state that cannot be restored
491
* @return SavepointRestoreSettings with specified configuration
492
*/
493
public static SavepointRestoreSettings forPath(String savepointPath, boolean allowNonRestoredState);
494
}
495
```