0
# JAR Management
1
2
Complete JAR file lifecycle management including upload, listing, execution, and deletion. This is the core functionality for submitting and managing Flink jobs through the web interface, providing REST endpoints for all JAR operations.
3
4
## Capabilities
5
6
### JAR Upload Handler
7
8
Handles multipart file uploads for JAR files via POST /jars/upload endpoint.
9
10
```java { .api }
11
/**
12
* Handler for uploading JAR files to the Flink web server.
13
* Supports multipart/form-data uploads and stores JARs in the configured directory.
14
*/
15
public class JarUploadHandler extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, JarUploadResponseBody, EmptyMessageParameters> {
16
/**
17
* Create a new JAR upload handler.
18
*
19
* @param leaderRetriever Gateway retriever for accessing Flink cluster
20
* @param timeout Request timeout for upload operations
21
* @param responseHeaders HTTP headers to include in responses
22
* @param messageHeaders Message header specification for this endpoint
23
* @param jarDir Directory where uploaded JARs will be stored
24
* @param executor Executor for handling upload operations
25
*/
26
public JarUploadHandler(
27
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
28
Time timeout,
29
Map<String, String> responseHeaders,
30
MessageHeaders<EmptyRequestBody, JarUploadResponseBody, EmptyMessageParameters> messageHeaders,
31
Path jarDir,
32
Executor executor
33
);
34
}
35
```
36
37
### JAR List Handler
38
39
Provides listing of all uploaded JAR files with entry point information via GET /jars endpoint.
40
41
```java { .api }
42
/**
43
* Handler for listing uploaded JAR files and their available entry points.
44
* Returns detailed information about each JAR including main classes and program arguments.
45
*/
46
public class JarListHandler extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, JarListInfo, EmptyMessageParameters> {
47
/**
48
* Create a new JAR list handler.
49
*
50
* @param leaderRetriever Gateway retriever for accessing Flink cluster
51
* @param timeout Request timeout for listing operations
52
* @param responseHeaders HTTP headers to include in responses
53
* @param messageHeaders Message header specification for this endpoint
54
* @param localAddressFuture Future containing the local server address
55
* @param jarDir Directory containing uploaded JARs
56
* @param configuration Flink configuration
57
* @param executor Executor for JAR analysis operations
58
*/
59
public JarListHandler(
60
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
61
Time timeout,
62
Map<String, String> responseHeaders,
63
MessageHeaders<EmptyRequestBody, JarListInfo, EmptyMessageParameters> messageHeaders,
64
CompletableFuture<String> localAddressFuture,
65
File jarDir,
66
Configuration configuration,
67
Executor executor
68
);
69
}
70
```
71
72
### JAR Run Handler
73
74
Executes uploaded JAR files as Flink jobs via POST /jars/:jarId/run endpoint.
75
76
```java { .api }
77
/**
78
* Handler for executing uploaded JAR files as Flink jobs.
79
* Supports job configuration including parallelism, savepoints, and program arguments.
80
*/
81
public class JarRunHandler extends AbstractRestHandler<DispatcherGateway, JarRunRequestBody, JarRunResponseBody, JarRunMessageParameters> {
82
/**
83
* Create a new JAR run handler.
84
*
85
* @param leaderRetriever Gateway retriever for accessing Flink cluster
86
* @param timeout Request timeout for job submission
87
* @param responseHeaders HTTP headers to include in responses
88
* @param messageHeaders Message header specification for this endpoint
89
* @param jarDir Directory containing uploaded JARs
90
* @param configuration Flink configuration
91
* @param executor Executor for job submission operations
92
* @param applicationRunnerSupplier Supplier for creating application runners
93
*/
94
public JarRunHandler(
95
GatewayRetriever<? extends DispatcherGateway> leaderRetriever,
96
Time timeout,
97
Map<String, String> responseHeaders,
98
MessageHeaders<JarRunRequestBody, JarRunResponseBody, JarRunMessageParameters> messageHeaders,
99
Path jarDir,
100
Configuration configuration,
101
Executor executor,
102
Supplier<ApplicationRunner> applicationRunnerSupplier
103
);
104
}
105
```
106
107
### JAR Delete Handler
108
109
Deletes uploaded JAR files via DELETE /jars/:jarId endpoint.
110
111
```java { .api }
112
/**
113
* Handler for deleting uploaded JAR files from the server.
114
* Removes both the JAR file and any associated metadata.
115
*/
116
public class JarDeleteHandler extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, EmptyResponseBody, JarDeleteMessageParameters> {
117
/**
118
* Create a new JAR delete handler.
119
*
120
* @param leaderRetriever Gateway retriever for accessing Flink cluster
121
* @param timeout Request timeout for delete operations
122
* @param responseHeaders HTTP headers to include in responses
123
* @param messageHeaders Message header specification for this endpoint
124
* @param jarDir Directory containing uploaded JARs
125
* @param executor Executor for delete operations
126
*/
127
public JarDeleteHandler(
128
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
129
Time timeout,
130
Map<String, String> responseHeaders,
131
MessageHeaders<EmptyRequestBody, EmptyResponseBody, JarDeleteMessageParameters> messageHeaders,
132
Path jarDir,
133
Executor executor
134
);
135
}
136
```
137
138
### JAR Plan Handler
139
140
Shows execution plan for JAR files without running them via GET and POST /jars/:jarId/plan endpoints.
141
142
```java { .api }
143
/**
144
* Handler for generating execution plans from uploaded JAR files.
145
* Shows the job graph that would be executed without actually running the job.
146
*/
147
public class JarPlanHandler extends AbstractRestHandler<RestfulGateway, JarPlanRequestBody, JobPlanInfo, JarPlanMessageParameters> {
148
/**
149
* Create a new JAR plan handler with default plan generator.
150
*
151
* @param leaderRetriever Gateway retriever for accessing Flink cluster
152
* @param timeout Request timeout for plan generation
153
* @param responseHeaders HTTP headers to include in responses
154
* @param messageHeaders Message header specification for this endpoint
155
* @param jarDir Directory containing uploaded JARs
156
* @param configuration Flink configuration
157
* @param executor Executor for plan generation operations
158
*/
159
public JarPlanHandler(
160
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
161
Time timeout,
162
Map<String, String> responseHeaders,
163
MessageHeaders<JarPlanRequestBody, JobPlanInfo, JarPlanMessageParameters> messageHeaders,
164
Path jarDir,
165
Configuration configuration,
166
Executor executor
167
);
168
169
/**
170
* Create a new JAR plan handler with custom plan generator.
171
*
172
* @param leaderRetriever Gateway retriever for accessing Flink cluster
173
* @param timeout Request timeout for plan generation
174
* @param responseHeaders HTTP headers to include in responses
175
* @param messageHeaders Message header specification for this endpoint
176
* @param jarDir Directory containing uploaded JARs
177
* @param configuration Flink configuration
178
* @param executor Executor for plan generation operations
179
* @param planGenerator Custom function to generate JobPlanInfo from JobGraph
180
*/
181
public JarPlanHandler(
182
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
183
Time timeout,
184
Map<String, String> responseHeaders,
185
MessageHeaders<JarPlanRequestBody, JobPlanInfo, JarPlanMessageParameters> messageHeaders,
186
Path jarDir,
187
Configuration configuration,
188
Executor executor,
189
Function<JobGraph, JobPlanInfo> planGenerator
190
);
191
}
192
```
193
194
## Usage Examples
195
196
### Complete JAR Management Workflow
197
198
```java
199
import org.apache.flink.runtime.webmonitor.handlers.*;
200
import org.apache.flink.configuration.Configuration;
201
import java.nio.file.Paths;
202
203
// Setup configuration
204
Configuration config = new Configuration();
205
Path jarDir = Paths.get("/tmp/flink-jars");
206
Time timeout = Time.seconds(30);
207
Map<String, String> headers = Collections.emptyMap();
208
209
// Create handlers
210
JarUploadHandler uploadHandler = new JarUploadHandler(
211
leaderRetriever,
212
timeout,
213
headers,
214
JarUploadHeaders.getInstance(),
215
jarDir,
216
executor
217
);
218
219
JarListHandler listHandler = new JarListHandler(
220
leaderRetriever,
221
timeout,
222
headers,
223
JarListHeaders.getInstance(),
224
jarDir,
225
config,
226
executor
227
);
228
229
JarRunHandler runHandler = new JarRunHandler(
230
leaderRetriever,
231
timeout,
232
headers,
233
JarRunHeaders.getInstance(),
234
jarDir,
235
config,
236
executor,
237
applicationRunnerSupplier
238
);
239
```
240
241
### Programmatic JAR Operations
242
243
```java
244
// Upload a JAR (would typically be handled by HTTP multipart upload)
245
// The actual upload is handled by the JarUploadHandler via HTTP POST
246
247
// List uploaded JARs
248
HandlerRequest<EmptyRequestBody, EmptyMessageParameters> listRequest =
249
HandlerRequest.create(EmptyRequestBody.getInstance(), EmptyMessageParameters.getInstance());
250
CompletableFuture<JarListInfo> jarList = listHandler.handleRequest(listRequest, restfulGateway);
251
252
// Run a JAR
253
JarRunRequestBody runRequest = new JarRunRequestBody();
254
runRequest.setEntryClassName("com.example.MyFlinkJob");
255
runRequest.setProgramArguments(Arrays.asList("--input", "/path/to/input"));
256
runRequest.setParallelism(4);
257
258
HandlerRequest<JarRunRequestBody, JarRunMessageParameters> runHandlerRequest =
259
HandlerRequest.create(runRequest, jarRunMessageParameters, pathParameters, queryParameters);
260
CompletableFuture<JarRunResponseBody> jobResult = runHandler.handleRequest(runHandlerRequest, restfulGateway);
261
```
262
263
### Error Handling in JAR Operations
264
265
All JAR handlers include comprehensive error handling for common scenarios:
266
267
- **File not found**: When specified JAR ID doesn't exist
268
- **Invalid JAR files**: When uploaded files are not valid JAR files
269
- **Class loading errors**: When entry classes cannot be loaded
270
- **Job submission failures**: When jobs fail to submit to the cluster
271
- **Timeout handling**: When operations exceed specified timeout limits
272
273
```java
274
// Typical error handling pattern in handler usage
275
jarRunHandler.handleRequest(request, gateway)
276
.whenComplete((result, throwable) -> {
277
if (throwable != null) {
278
if (throwable instanceof NotFoundException) {
279
// Handle JAR not found
280
} else if (throwable instanceof BadRequestException) {
281
// Handle invalid request parameters
282
} else {
283
// Handle other errors
284
}
285
} else {
286
// Process successful result
287
JobID jobId = result.getJobId();
288
}
289
});
290
```
291
292
## REST Endpoint Specifications
293
294
The JAR management handlers correspond to the following REST endpoints:
295
296
- **POST /jars/upload** - Upload JAR files (multipart/form-data)
297
- **GET /jars** - List uploaded JARs with entry point information
298
- **POST /jars/{jarId}/run** - Execute JAR as Flink job
299
- **DELETE /jars/{jarId}** - Delete uploaded JAR
300
- **GET /jars/{jarId}/plan** - Show execution plan without running
301
- **POST /jars/{jarId}/plan** - Show execution plan with custom parameters
302
303
All endpoints support proper HTTP status codes, error responses, and JSON request/response bodies where applicable.