0
# Program Packaging and Execution
1
2
Functionality for packaging Flink programs in JAR files, managing classpaths, handling user applications, and executing programs with proper isolation and resource management.
3
4
## Capabilities
5
6
### Packaged Program
7
8
Core class representing a Flink program packaged in a JAR file with full support for classpath management, argument handling, and program execution.
9
10
```java { .api }
11
/**
12
* Represents a Flink program packaged in a JAR file
13
*/
14
public class PackagedProgram implements AutoCloseable {
15
/**
16
* Create a new packaged program builder
17
* @return Builder instance for constructing PackagedProgram
18
*/
19
public static Builder newBuilder();
20
21
/**
22
* Get the main class of the packaged program
23
* @return Main class object
24
* @throws ProgramInvocationException if main class cannot be determined
25
*/
26
public Class<?> getMainClass() throws ProgramInvocationException;
27
28
/**
29
* Get program arguments
30
* @return Array of program arguments
31
*/
32
public String[] getArguments();
33
34
/**
35
* Get JAR file URL
36
* @return URL of the JAR file
37
*/
38
public URL getJarFile();
39
40
/**
41
* Get user code class loader
42
* @return Class loader for user code
43
*/
44
public URLClassLoader getUserCodeClassLoader();
45
46
/**
47
* Get savepoint restore settings
48
* @return Savepoint restore settings
49
*/
50
public SavepointRestoreSettings getSavepointRestoreSettings();
51
52
/**
53
* Check if this is a Python program
54
* @return true if Python program, false otherwise
55
*/
56
public boolean isPython();
57
58
/**
59
* Invoke the program's main method for interactive execution
60
* @throws ProgramInvocationException if execution fails
61
*/
62
public void invokeInteractiveModeForExecution() throws ProgramInvocationException;
63
64
/**
65
* Get list of classpath URLs
66
* @return List of classpath URLs
67
*/
68
public List<URL> getClasspaths();
69
70
/**
71
* Check if program has main method
72
* @return true if main method exists
73
*/
74
public boolean hasMainMethod();
75
76
@Override
77
public void close();
78
79
/**
80
* Builder for creating PackagedProgram instances
81
*/
82
public static class Builder {
83
/**
84
* Set JAR file for the program
85
* @param jarFile JAR file containing the program
86
* @return Builder instance
87
*/
88
public Builder setJarFile(File jarFile);
89
90
/**
91
* Set program arguments
92
* @param arguments Array of arguments
93
* @return Builder instance
94
*/
95
public Builder setArguments(String... arguments);
96
97
/**
98
* Set entry point class name
99
* @param entryPointClassName Fully qualified class name
100
* @return Builder instance
101
*/
102
public Builder setEntryPointClassName(@Nullable String entryPointClassName);
103
104
/**
105
* Set savepoint restore settings
106
* @param savepointRestoreSettings Savepoint settings
107
* @return Builder instance
108
*/
109
public Builder setSavepointRestoreSettings(SavepointRestoreSettings savepointRestoreSettings);
110
111
/**
112
* Set additional classpath URLs
113
* @param classpaths List of classpath URLs
114
* @return Builder instance
115
*/
116
public Builder setClasspaths(List<URL> classpaths);
117
118
/**
119
* Set user code class loader
120
* @param userCodeClassLoader Class loader for user code
121
* @return Builder instance
122
*/
123
public Builder setUserCodeClassLoader(URLClassLoader userCodeClassLoader);
124
125
/**
126
* Build the PackagedProgram instance
127
* @return Configured PackagedProgram
128
* @throws ProgramInvocationException if construction fails
129
*/
130
public PackagedProgram build() throws ProgramInvocationException;
131
}
132
}
133
```
134
135
**Usage Examples:**
136
137
```java
138
import org.apache.flink.client.program.PackagedProgram;
139
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
140
141
// Create a simple packaged program
142
PackagedProgram program = PackagedProgram.newBuilder()
143
.setJarFile(new File("/path/to/flink-job.jar"))
144
.setArguments("--input", "input.txt", "--output", "output.txt")
145
.build();
146
147
// Create program with custom entry point
148
PackagedProgram programWithEntryPoint = PackagedProgram.newBuilder()
149
.setJarFile(new File("/path/to/job.jar"))
150
.setEntryPointClassName("com.mycompany.MyFlinkJob")
151
.setArguments("--parallelism", "4")
152
.build();
153
154
// Create program with savepoint restore
155
SavepointRestoreSettings savepointSettings = SavepointRestoreSettings
156
.forPath("/path/to/savepoint", false);
157
158
PackagedProgram programWithSavepoint = PackagedProgram.newBuilder()
159
.setJarFile(new File("/path/to/job.jar"))
160
.setSavepointRestoreSettings(savepointSettings)
161
.build();
162
163
// Use the program
164
System.out.println("Main class: " + program.getMainClass().getName());
165
System.out.println("Arguments: " + Arrays.toString(program.getArguments()));
166
167
// Clean up resources
168
program.close();
169
```
170
171
### Packaged Program Utilities
172
173
Utility functions for working with packaged programs, including Python program detection and program creation helpers.
174
175
```java { .api }
176
/**
177
* Utilities for working with packaged programs
178
*/
179
public class PackagedProgramUtils {
180
/**
181
* Check if the program is a Python program
182
* @param program Packaged program to check
183
* @return true if Python program
184
*/
185
public static boolean isPython(PackagedProgram program);
186
187
/**
188
* Check if file path represents a Python program
189
* @param file File to check
190
* @return true if Python file
191
*/
192
public static boolean isPython(File file);
193
194
/**
195
* Get program description from packaged program
196
* @param program Packaged program
197
* @return Program description or null
198
*/
199
@Nullable
200
public static String getProgramDescription(PackagedProgram program);
201
202
/**
203
* Extract nested libraries from JAR file
204
* @param jarFile JAR file to extract from
205
* @param extractionDir Target directory for extraction
206
* @return List of extracted library files
207
* @throws IOException if extraction fails
208
*/
209
public static List<File> extractNestedLibraries(File jarFile, File extractionDir)
210
throws IOException;
211
}
212
```
213
214
### Packaged Program Retriever
215
216
Interface and implementation for retrieving packaged programs, enabling flexible program loading strategies.
217
218
```java { .api }
219
/**
220
* Interface for retrieving packaged programs
221
*/
222
public interface PackagedProgramRetriever {
223
/**
224
* Retrieve a packaged program
225
* @return PackagedProgram instance
226
* @throws ProgramInvocationException if retrieval fails
227
*/
228
PackagedProgram getPackagedProgram() throws ProgramInvocationException;
229
}
230
231
/**
232
* Default implementation of packaged program retriever
233
*/
234
public class DefaultPackagedProgramRetriever implements PackagedProgramRetriever {
235
/**
236
* Create retriever with JAR file and arguments
237
* @param jarFile JAR file containing the program
238
* @param entryPointClassName Optional entry point class name
239
* @param programArguments Program arguments
240
* @param savepointRestoreSettings Savepoint restore settings
241
* @param classpaths Additional classpath URLs
242
*/
243
public DefaultPackagedProgramRetriever(
244
File jarFile,
245
@Nullable String entryPointClassName,
246
String[] programArguments,
247
SavepointRestoreSettings savepointRestoreSettings,
248
List<URL> classpaths
249
);
250
251
@Override
252
public PackagedProgram getPackagedProgram() throws ProgramInvocationException;
253
}
254
```
255
256
### Stream Environment Classes
257
258
Specialized execution environments for stream processing contexts and plan generation.
259
260
```java { .api }
261
/**
262
* Execution environment for stream processing contexts
263
*/
264
public class StreamContextEnvironment extends StreamExecutionEnvironment {
265
/**
266
* Create stream context environment
267
* @param client Cluster client for job submission
268
* @param parallelism Default parallelism
269
* @param userCodeClassLoader User code class loader
270
* @param savepointRestoreSettings Savepoint restore settings
271
*/
272
public StreamContextEnvironment(
273
ClusterClient<?> client,
274
int parallelism,
275
ClassLoader userCodeClassLoader,
276
SavepointRestoreSettings savepointRestoreSettings
277
);
278
279
@Override
280
public JobExecutionResult execute(String jobName) throws Exception;
281
282
@Override
283
public JobClient executeAsync(String jobName) throws Exception;
284
}
285
286
/**
287
* Environment for stream plan generation
288
*/
289
public class StreamPlanEnvironment extends StreamExecutionEnvironment {
290
/**
291
* Create stream plan environment
292
* @param executorServiceLoader Executor service loader
293
* @param configuration Flink configuration
294
* @param userCodeClassLoader User code class loader
295
* @param savepointRestoreSettings Savepoint restore settings
296
*/
297
public StreamPlanEnvironment(
298
PipelineExecutorServiceLoader executorServiceLoader,
299
Configuration configuration,
300
ClassLoader userCodeClassLoader,
301
SavepointRestoreSettings savepointRestoreSettings
302
);
303
304
@Override
305
public JobExecutionResult execute(String jobName) throws Exception;
306
307
/**
308
* Get the execution plan as JSON
309
* @return Execution plan JSON string
310
*/
311
public String getExecutionPlan();
312
}
313
```
314
315
### Mini Cluster Factory
316
317
Factory for creating per-job mini clusters for testing and local development.
318
319
```java { .api }
320
/**
321
* Factory for per-job mini clusters
322
*/
323
public class PerJobMiniClusterFactory {
324
/**
325
* Create per-job mini cluster factory
326
* @param miniClusterConfiguration Mini cluster configuration
327
*/
328
public PerJobMiniClusterFactory(MiniClusterConfiguration miniClusterConfiguration);
329
330
/**
331
* Create mini cluster for job execution
332
* @param jobGraph Job graph to execute
333
* @return Mini cluster instance
334
* @throws Exception if creation fails
335
*/
336
public MiniCluster create(JobGraph jobGraph) throws Exception;
337
}
338
```
339
340
## Types
341
342
```java { .api }
343
/**
344
* Settings for savepoint restoration
345
*/
346
public class SavepointRestoreSettings {
347
/**
348
* Create settings for restoring from savepoint path
349
* @param savepointPath Path to savepoint
350
* @param allowNonRestoredState Whether to allow non-restored state
351
* @return Savepoint restore settings
352
*/
353
public static SavepointRestoreSettings forPath(
354
String savepointPath,
355
boolean allowNonRestoredState
356
);
357
358
/**
359
* Create settings with no savepoint restoration
360
* @return Settings for no restoration
361
*/
362
public static SavepointRestoreSettings none();
363
364
/**
365
* Check if restoration is enabled
366
* @return true if restoration enabled
367
*/
368
public boolean restoreSavepoint();
369
370
/**
371
* Get savepoint path
372
* @return Savepoint path or null
373
*/
374
@Nullable
375
public String getRestorePath();
376
377
/**
378
* Check if non-restored state is allowed
379
* @return true if allowed
380
*/
381
public boolean allowNonRestoredState();
382
}
383
```
384
385
## Exception Handling
386
387
Program packaging operations handle various error conditions:
388
389
- **File Errors**: `FileNotFoundException` for missing JAR files, `IOException` for file access issues
390
- **Class Loading Errors**: `ClassNotFoundException` for missing main classes, `NoClassDefFoundError` for dependencies
391
- **Program Errors**: `ProgramInvocationException` for program execution failures
392
- **Configuration Errors**: `ProgramParametrizationException` for invalid program parameters
393
- **Missing Job Errors**: `ProgramMissingJobException` when program doesn't define a Flink job
394
395
**Common Error Patterns:**
396
397
```java
398
try {
399
PackagedProgram program = PackagedProgram.newBuilder()
400
.setJarFile(new File("job.jar"))
401
.build();
402
403
// Use program...
404
program.close();
405
406
} catch (ProgramInvocationException e) {
407
// Handle program construction/execution errors
408
System.err.println("Program error: " + e.getMessage());
409
} catch (FileNotFoundException e) {
410
// Handle missing JAR file
411
System.err.println("JAR file not found: " + e.getMessage());
412
}
413
```