0
# JAR Management
1
2
Complete JAR lifecycle management for Flink job submission including upload, execution, execution planning, and deletion capabilities.
3
4
## Capabilities
5
6
### JAR Upload
7
8
Upload JAR files to the Flink cluster for later execution. Supports multipart form data uploads with automatic validation.
9
10
```java { .api }
11
/**
12
* Handles JAR file uploads via multipart form data
13
* REST Endpoint: POST /jars/upload
14
*/
15
public class JarUploadHandler extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, JarUploadResponseBody, EmptyMessageParameters> {
16
public JarUploadHandler(
17
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
18
Duration timeout,
19
Map<String, String> responseHeaders,
20
MessageHeaders<EmptyRequestBody, JarUploadResponseBody, EmptyMessageParameters> messageHeaders,
21
Path jarDir,
22
Executor executor
23
);
24
25
public CompletableFuture<JarUploadResponseBody> handleRequest(
26
HandlerRequest<EmptyRequestBody> request,
27
RestfulGateway gateway
28
) throws RestHandlerException;
29
}
30
31
/**
32
* Message headers for JAR upload endpoint
33
*/
34
public class JarUploadHeaders implements RuntimeMessageHeaders<EmptyRequestBody, JarUploadResponseBody, EmptyMessageParameters> {
35
public static JarUploadHeaders getInstance();
36
public boolean acceptsFileUploads();
37
public String getDescription();
38
}
39
```
40
41
**Usage Example:**
42
43
```bash
44
# Upload a JAR file via REST API
45
curl -X POST -F "jarfile=@/path/to/job.jar" http://localhost:8081/jars/upload
46
```
47
48
### JAR List
49
50
List all uploaded JAR files with metadata including entry points, upload timestamps, and file information.
51
52
```java { .api }
53
/**
54
* Lists all uploaded JAR files with metadata and entry points
55
* REST Endpoint: GET /jars
56
*/
57
public class JarListHandler extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, JarListInfo, EmptyMessageParameters> {
58
public JarListHandler(
59
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
60
Duration timeout,
61
Map<String, String> responseHeaders,
62
MessageHeaders<EmptyRequestBody, JarListInfo, EmptyMessageParameters> messageHeaders,
63
CompletableFuture<String> localAddressFuture,
64
File jarDir,
65
Configuration configuration,
66
Executor executor
67
);
68
69
public CompletableFuture<JarListInfo> handleRequest(
70
HandlerRequest<EmptyRequestBody> request,
71
RestfulGateway gateway
72
) throws RestHandlerException;
73
}
74
```
75
76
**Usage Example:**
77
78
```bash
79
# List all uploaded JARs
80
curl http://localhost:8081/jars
81
```
82
83
### JAR Execution
84
85
Execute previously uploaded JAR files with configurable parameters including parallelism, program arguments, entry class, and savepoint restoration.
86
87
```java { .api }
88
/**
89
* Submits and runs jobs from uploaded JAR files
90
* REST Endpoint: POST /jars/:jarid/run
91
*/
92
public class JarRunHandler extends AbstractRestHandler<DispatcherGateway, JarRunRequestBody, JarRunResponseBody, JarRunMessageParameters> {
93
public JarRunHandler(
94
GatewayRetriever<? extends DispatcherGateway> leaderRetriever,
95
Duration timeout,
96
Map<String, String> responseHeaders,
97
MessageHeaders<JarRunRequestBody, JarRunResponseBody, JarRunMessageParameters> messageHeaders,
98
Path jarDir,
99
Configuration configuration,
100
Executor executor,
101
java.util.function.Supplier<ApplicationRunner> applicationRunnerSupplier
102
);
103
104
public CompletableFuture<JarRunResponseBody> handleRequest(
105
HandlerRequest<JarRunRequestBody> request,
106
DispatcherGateway gateway
107
) throws RestHandlerException;
108
}
109
110
/**
111
* Request body for JAR execution with savepoint and configuration options
112
*/
113
public class JarRunRequestBody extends JarRequestBody implements RequestBody {
114
public JarRunRequestBody();
115
116
public JarRunRequestBody(
117
String entryClassName,
118
List<String> programArgumentsList,
119
Integer parallelism,
120
JobID jobId,
121
Boolean allowNonRestoredState,
122
String savepointPath,
123
RecoveryClaimMode recoveryClaimMode,
124
Map<String, String> flinkConfiguration
125
);
126
127
public Boolean getAllowNonRestoredState();
128
public String getSavepointPath();
129
public RecoveryClaimMode getRecoveryClaimMode();
130
public boolean isDeprecatedRestoreModeHasValue();
131
}
132
```
133
134
**Usage Example:**
135
136
```bash
137
# Execute a JAR with parameters
138
curl -X POST http://localhost:8081/jars/your-jar-id/run \
139
-H "Content-Type: application/json" \
140
-d '{
141
"entryClass": "com.example.FlinkJob",
142
"programArgs": ["--input", "hdfs://input", "--output", "hdfs://output"],
143
"parallelism": 4,
144
"savepointPath": "hdfs://savepoints/savepoint-123",
145
"allowNonRestoredState": false
146
}'
147
```
148
149
### JAR Execution Planning
150
151
Generate execution plans for JAR files without actually running them. Useful for validation and visualization of job graphs.
152
153
```java { .api }
154
/**
155
* Generates execution plans for JAR files without running them
156
* REST Endpoints: GET /jars/:jarid/plan, POST /jars/:jarid/plan
157
*/
158
public class JarPlanHandler extends AbstractRestHandler<RestfulGateway, JarPlanRequestBody, JobPlanInfo, JarPlanMessageParameters> {
159
public JarPlanHandler(
160
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
161
Duration timeout,
162
Map<String, String> responseHeaders,
163
MessageHeaders<JarPlanRequestBody, JobPlanInfo, JarPlanMessageParameters> messageHeaders,
164
Path jarDir,
165
Configuration configuration,
166
Executor executor
167
);
168
169
public CompletableFuture<JobPlanInfo> handleRequest(
170
HandlerRequest<JarPlanRequestBody> request,
171
RestfulGateway gateway
172
) throws RestHandlerException;
173
}
174
175
/**
176
* Abstract base class for JAR plan message headers
177
*/
178
public abstract class AbstractJarPlanHeaders implements RuntimeMessageHeaders<JarPlanRequestBody, JobPlanInfo, JarPlanMessageParameters> {
179
public Class<JobPlanInfo> getResponseClass();
180
public HttpResponseStatus getResponseStatusCode();
181
public Class<JarPlanRequestBody> getRequestClass();
182
public JarPlanMessageParameters getUnresolvedMessageParameters();
183
public String getTargetRestEndpointURL();
184
public String operationId();
185
public String getDescription();
186
}
187
188
/**
189
* Message headers for GET /jars/:jarid/plan endpoint
190
*/
191
public class JarPlanGetHeaders extends AbstractJarPlanHeaders {
192
public static JarPlanGetHeaders getInstance();
193
public HttpMethodWrapper getHttpMethod();
194
}
195
196
/**
197
* Message headers for POST /jars/:jarid/plan endpoint
198
*/
199
public class JarPlanPostHeaders extends AbstractJarPlanHeaders {
200
public static JarPlanPostHeaders getInstance();
201
public HttpMethodWrapper getHttpMethod();
202
}
203
204
/**
205
* Request body for JAR execution plan generation
206
*/
207
public class JarPlanRequestBody extends JarRequestBody implements RequestBody {
208
// Inherits entryClass, programArgs, parallelism from JarRequestBody
209
}
210
```
211
212
**Usage Example:**
213
214
```bash
215
# Get execution plan for a JAR
216
curl "http://localhost:8081/jars/your-jar-id/plan?entryClass=com.example.FlinkJob¶llelism=4"
217
218
# Or with POST body
219
curl -X POST http://localhost:8081/jars/your-jar-id/plan \
220
-H "Content-Type: application/json" \
221
-d '{
222
"entryClass": "com.example.FlinkJob",
223
"programArgs": ["--input", "test"],
224
"parallelism": 2
225
}'
226
```
227
228
### JAR Deletion
229
230
Delete uploaded JAR files from the cluster to free up storage space.
231
232
```java { .api }
233
/**
234
* Deletes uploaded JAR files
235
* REST Endpoint: DELETE /jars/:jarid
236
*/
237
public class JarDeleteHandler extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, EmptyResponseBody, JarDeleteMessageParameters> {
238
public CompletableFuture<EmptyResponseBody> handleRequest(
239
HandlerRequest<EmptyRequestBody> request,
240
RestfulGateway gateway
241
) throws RestHandlerException;
242
}
243
```
244
245
**Usage Example:**
246
247
```bash
248
# Delete an uploaded JAR
249
curl -X DELETE http://localhost:8081/jars/your-jar-id
250
```
251
252
### Message Parameters and Path Variables
253
254
```java { .api }
255
/**
256
* Base message parameters for JAR operations
257
*/
258
public class JarMessageParameters extends MessageParameters {
259
public final JarIdPathParameter jarIdPathParameter = new JarIdPathParameter();
260
}
261
262
/**
263
* Message parameters for JAR run operations
264
*/
265
public class JarRunMessageParameters extends JarMessageParameters {
266
public final EntryClassQueryParameter entryClassQueryParameter = new EntryClassQueryParameter();
267
public final ParallelismQueryParameter parallelismQueryParameter = new ParallelismQueryParameter();
268
public final ProgramArgQueryParameter programArgQueryParameter = new ProgramArgQueryParameter();
269
public final SavepointPathQueryParameter savepointPathQueryParameter = new SavepointPathQueryParameter();
270
public final AllowNonRestoredStateQueryParameter allowNonRestoredStateQueryParameter = new AllowNonRestoredStateQueryParameter();
271
}
272
273
/**
274
* Message parameters for JAR plan operations
275
*/
276
public class JarPlanMessageParameters extends JarMessageParameters {
277
public final EntryClassQueryParameter entryClassQueryParameter = new EntryClassQueryParameter();
278
public final ParallelismQueryParameter parallelismQueryParameter = new ParallelismQueryParameter();
279
public final ProgramArgQueryParameter programArgQueryParameter = new ProgramArgQueryParameter();
280
}
281
282
/**
283
* Message parameters for JAR delete operations
284
*/
285
public class JarDeleteMessageParameters extends JarMessageParameters {
286
// Only inherits jarIdPathParameter
287
}
288
289
/**
290
* Path parameter for identifying JAR files by ID
291
*/
292
public class JarIdPathParameter extends MessagePathParameter<String> {
293
public static final String KEY = "jarid";
294
295
protected String convertFromString(String value) throws ConversionException;
296
protected String convertToString(String value);
297
public String getDescription();
298
}
299
300
/**
301
* Base class for string query parameters
302
*/
303
public abstract class StringQueryParameter extends MessageQueryParameter<String> {
304
public StringQueryParameter(String key, MessageParameterRequisiteness requisiteness);
305
public final String convertStringToValue(String value);
306
public final String convertValueToString(String value);
307
}
308
309
/**
310
* Query parameters for JAR operations
311
*/
312
public class ParallelismQueryParameter extends MessageQueryParameter<Integer> {
313
public static final String KEY = "parallelism";
314
}
315
316
public class EntryClassQueryParameter extends StringQueryParameter {
317
public static final String KEY = "entryClass";
318
}
319
320
public class ProgramArgQueryParameter extends MessageQueryParameter<List<String>> {
321
public static final String KEY = "programArgs";
322
}
323
324
public class SavepointPathQueryParameter extends StringQueryParameter {
325
public static final String KEY = "savepointPath";
326
}
327
328
public class AllowNonRestoredStateQueryParameter extends MessageQueryParameter<Boolean> {
329
public static final String KEY = "allowNonRestoredState";
330
}
331
```
332
333
## Request and Response Models
334
335
### Base Request Body
336
337
```java { .api }
338
/**
339
* Base request body for JAR operations
340
*/
341
public abstract class JarRequestBody implements RequestBody {
342
/**
343
* Get the entry class name for JAR execution
344
*/
345
public String getEntryClassName();
346
347
/**
348
* Get the program arguments as a list
349
*/
350
public List<String> getProgramArgumentsList();
351
352
/**
353
* Get the parallelism setting for job execution
354
*/
355
public Integer getParallelism();
356
357
/**
358
* Get the job ID for the execution
359
*/
360
public JobID getJobId();
361
362
/**
363
* Get Flink configuration overrides
364
*/
365
public Configuration getFlinkConfiguration();
366
}
367
```
368
369
### Response Bodies
370
371
```java { .api }
372
/**
373
* Response for JAR upload operations
374
*/
375
public class JarUploadResponseBody implements ResponseBody {
376
public JarUploadResponseBody(String filename);
377
public String getFilename();
378
public UploadStatus getStatus();
379
380
public enum UploadStatus {
381
success
382
}
383
}
384
385
/**
386
* Response for JAR execution operations
387
*/
388
public class JarRunResponseBody implements ResponseBody {
389
public JarRunResponseBody(JobID jobId);
390
public JobID getJobId();
391
}
392
393
/**
394
* Response containing list of uploaded JARs with metadata
395
*/
396
public class JarListInfo implements ResponseBody {
397
public JarListInfo(String address, List<JarFileInfo> jarFileList);
398
public String getAddress();
399
public List<JarFileInfo> getFiles();
400
401
/**
402
* Individual JAR file metadata
403
*/
404
public static class JarFileInfo {
405
public JarFileInfo(String id, String name, long uploaded, List<JarEntryInfo> jarEntryList);
406
407
public String getId();
408
public String getName();
409
public long getUploaded();
410
public List<JarEntryInfo> getEntry();
411
412
// Public fields for JSON serialization
413
public String id;
414
public String name;
415
public long uploaded;
416
public List<JarEntryInfo> entry;
417
}
418
419
/**
420
* Entry point information for JAR files
421
*/
422
public static class JarEntryInfo {
423
public JarEntryInfo(String name, String description);
424
425
public String getName();
426
public String getDescription();
427
}
428
}
429
```
430
431
## Utility Classes
432
433
```java { .api }
434
/**
435
* JAR handling utilities for upload and execution
436
*/
437
public class JarHandlerUtils {
438
/**
439
* Tokenizes program arguments string into list of arguments
440
*/
441
static List<String> tokenizeArguments(@Nullable String args);
442
443
/**
444
* Context for JAR handler operations containing standard parameters
445
*/
446
public static class JarHandlerContext {
447
public static <R extends JarRequestBody> JarHandlerContext fromRequest(
448
HandlerRequest<R> request,
449
Path jarDir,
450
Logger log
451
) throws RestHandlerException;
452
453
/**
454
* Apply JAR request configuration to Flink configuration
455
*/
456
public void applyToConfiguration(
457
Configuration configuration,
458
HandlerRequest<? extends JarRequestBody> request
459
);
460
461
/**
462
* Create job graph from packaged program
463
*/
464
public JobGraph toJobGraph(
465
PackagedProgram packagedProgram,
466
Configuration configuration,
467
boolean suppressOutput
468
) throws Exception;
469
470
/**
471
* Create packaged program from configuration
472
*/
473
public PackagedProgram toPackagedProgram(Configuration configuration)
474
throws Exception;
475
}
476
}