# Utilities and Extensions

Helper utilities and extension points for JAR processing and web submission functionality. These components provide reusable functionality for custom implementations and internal processing operations.

## Capabilities

### JAR Handler Utilities

Core utility class providing JAR processing functionality and context management.

```java { .api }
/**
 * Utility class providing helper functions for JAR handlers.
 * Contains static methods and context classes for JAR processing operations.
 */
public class JarHandlerUtils {
    /**
     * Tokenize a program-arguments string into individual arguments.
     * Handles quoted arguments and proper escaping.
     *
     * @param args Program arguments as a single string
     * @return List of individual argument strings
     */
    public static List<String> tokenizeArguments(String args);

    /**
     * Context object for JAR processing operations.
     * Provides methods for converting between different representations of JAR configurations.
     */
    public static class JarHandlerContext {
        /**
         * Create a JAR handler context from an HTTP request.
         * Extracts the JAR configuration from the request body and parameters.
         *
         * @param request HandlerRequest containing the JAR configuration
         * @param jarDir Directory containing uploaded JAR files
         * @param log Logger instance for operation logging
         * @return JarHandlerContext with the extracted configuration
         * @throws RestHandlerException if request parameters are invalid
         */
        public static JarHandlerContext fromRequest(
                HandlerRequest<JarRequestBody, ?> request,
                Path jarDir,
                Logger log
        ) throws RestHandlerException;

        /**
         * Apply the JAR configuration to a Flink Configuration object.
         * Sets parallelism, program arguments, job ID, and other job settings.
         *
         * @param configuration Target Flink configuration to modify
         */
        public void applyToConfiguration(Configuration configuration);

        /**
         * Convert the JAR configuration to a Flink JobGraph.
         * Creates the execution graph for the job without running it.
         *
         * @param packagedProgram PackagedProgram containing the job
         * @param configuration Flink configuration for job execution
         * @param suppressOutput Whether to suppress program output during job graph creation
         * @return JobGraph representing the job execution plan
         * @throws ProgramInvocationException if job graph creation fails
         */
        public JobGraph toJobGraph(
                PackagedProgram packagedProgram,
                Configuration configuration,
                boolean suppressOutput
        ) throws ProgramInvocationException;

        /**
         * Convert the JAR configuration to a PackagedProgram.
         * Creates a packaged program that can be executed or analyzed.
         *
         * @param configuration Flink configuration for program creation
         * @return PackagedProgram ready for execution
         * @throws ProgramInvocationException if program creation fails
         */
        public PackagedProgram toPackagedProgram(Configuration configuration) throws ProgramInvocationException;

        /**
         * Get the JAR file path for this context.
         *
         * @return Path to the JAR file
         */
        public Path getJarFile();

        /**
         * Get the entry class name.
         *
         * @return Fully qualified entry class name, or null if not specified
         */
        public String getEntryClassName();

        /**
         * Get the program arguments.
         *
         * @return List of program arguments
         */
        public List<String> getProgramArguments();

        /**
         * Get the job parallelism.
         *
         * @return Parallelism value, or null for the default
         */
        public Integer getParallelism();

        /**
         * Get the job ID.
         *
         * @return Job ID, or null if not specified
         */
        public JobID getJobId();
    }
}
```
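
The exact quoting rules of `tokenizeArguments` are defined by its implementation; purely as an illustration of the general technique, a minimal quote-aware tokenizer (a hypothetical `tokenize` helper, not Flink's code) could look like this:

```java
import java.util.ArrayList;
import java.util.List;

public class ArgumentTokenizer {
    /**
     * Split an argument string on whitespace, keeping single- or
     * double-quoted substrings together and stripping the quotes.
     */
    public static List<String> tokenize(String args) {
        List<String> tokens = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        char quote = 0;          // active quote character, 0 if none
        boolean inToken = false; // true while building a token
        for (char c : args.toCharArray()) {
            if (quote != 0) {                  // inside quotes
                if (c == quote) {
                    quote = 0;                 // closing quote
                } else {
                    current.append(c);
                }
            } else if (c == '"' || c == '\'') {
                quote = c;                     // opening quote
                inToken = true;
            } else if (Character.isWhitespace(c)) {
                if (inToken) {                 // token boundary
                    tokens.add(current.toString());
                    current.setLength(0);
                    inToken = false;
                }
            } else {
                current.append(c);
                inToken = true;
            }
        }
        if (inToken) {
            tokens.add(current.toString());
        }
        return tokens;
    }

    public static void main(String[] args) {
        // A quoted path with a space stays a single token
        System.out.println(tokenize("--input \"/data/my dir\" --parallelism 4"));
    }
}
```

A quoted argument such as `"/data/my dir"` survives as one token here, which is the main property the real utility provides over a plain `String.split`.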

### Web Submission Extension

Extension point for adding JAR submission capabilities to the web interface.

```java { .api }
/**
 * Extension that provides JAR submission capabilities to the Flink web interface.
 * Implements WebMonitorExtension to integrate with the web server framework.
 */
public class WebSubmissionExtension implements WebMonitorExtension {
    /**
     * Create a web submission extension.
     *
     * @param configuration Flink configuration
     * @param leaderRetriever Gateway retriever for accessing the Flink cluster
     * @param responseHeaders HTTP headers to include in responses
     * @param leaderElectionService Service for leader election
     * @param jarDir Directory for storing uploaded JAR files
     * @param executor Executor for handling submission operations
     * @param timeout Request timeout for operations
     */
    public WebSubmissionExtension(
            Configuration configuration,
            GatewayRetriever<? extends RestfulGateway> leaderRetriever,
            Map<String, String> responseHeaders,
            CompletableFuture<String> leaderElectionService,
            Path jarDir,
            Executor executor,
            Time timeout
    );

    /**
     * Get the collection of REST handlers provided by this extension.
     * Returns handlers for JAR upload, list, run, delete, and plan operations.
     *
     * @return Collection of handler specifications and implementations
     */
    public Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> getHandlers();

    /**
     * Asynchronously close the extension and clean up resources.
     *
     * @return CompletableFuture that completes when cleanup is finished
     */
    public CompletableFuture<Void> closeAsync();
}
```

### HTTP Request Handler

Low-level HTTP request handler for file uploads and general request processing.

```java { .api }
/**
 * Netty channel handler for processing HTTP requests and file uploads.
 * Handles multipart/form-data uploads and basic HTTP request routing.
 */
public class HttpRequestHandler extends SimpleChannelInboundHandler<HttpObject> {
    /**
     * Create an HTTP request handler.
     *
     * @param uploadDir Directory for storing uploaded files
     */
    public HttpRequestHandler(File uploadDir);

    /**
     * Check the upload directory and create it if it doesn't exist.
     * Validates directory permissions and creates necessary parent directories.
     *
     * @param uploadDir Directory to check and create
     * @return The validated upload directory
     * @throws IOException if the directory cannot be created or accessed
     */
    public static File checkAndCreateUploadDir(File uploadDir) throws IOException;

    /**
     * Log the deletion of an external upload directory for cleanup tracking.
     * Used for auditing and debugging upload directory management.
     *
     * @param uploadDir Directory that was deleted
     */
    public static void logExternalUploadDirDeletion(File uploadDir);

    /**
     * Handle incoming HTTP request objects.
     * Processes both simple requests and multipart file uploads.
     *
     * @param ctx Netty channel handler context
     * @param httpObject HTTP request object (request, content, etc.)
     */
    protected void channelRead0(ChannelHandlerContext ctx, HttpObject httpObject);
}
```
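
The directory check performed by `checkAndCreateUploadDir` can be approximated with standard `java.nio.file` calls. The sketch below (a hypothetical `ensureUploadDir` helper, not Flink's actual implementation) shows the general pattern: create the directory and any missing parents, then verify it is a writable directory before returning it.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

public class UploadDirCheck {
    /**
     * Ensure the upload directory exists and is writable,
     * creating it (and any missing parents) if necessary.
     */
    public static File ensureUploadDir(File uploadDir) throws IOException {
        if (!uploadDir.exists()) {
            // Creates parent directories as needed; throws IOException on failure
            Files.createDirectories(uploadDir.toPath());
        }
        if (!uploadDir.isDirectory() || !uploadDir.canWrite()) {
            throw new IOException(
                "Upload directory is not a writable directory: " + uploadDir);
        }
        return uploadDir;
    }

    public static void main(String[] args) throws IOException {
        File dir = new File(System.getProperty("java.io.tmpdir"), "upload-dir-demo");
        System.out.println(ensureUploadDir(dir).isDirectory());
    }
}
```

Failing fast with an `IOException` here, rather than at first upload, surfaces permission problems at startup where they are easier to diagnose.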

### Pipeline Error Handler

Final error handler in the Netty pipeline for comprehensive error handling.

```java { .api }
/**
 * Final error handler in the Netty pipeline for unhandled exceptions.
 * Provides centralized error logging and response generation.
 */
public class PipelineErrorHandler extends SimpleChannelInboundHandler<HttpObject> {
    /**
     * Create a pipeline error handler.
     *
     * @param logger Logger for error reporting
     */
    public PipelineErrorHandler(Logger logger);

    /**
     * Handle exceptions that occurred during request processing.
     * Logs errors and sends appropriate HTTP error responses to clients.
     *
     * @param ctx Netty channel handler context
     * @param cause Exception that occurred
     */
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause);

    /**
     * Handle HTTP objects that reached the end of the pipeline.
     * Typically answers unprocessed requests with 404 responses.
     *
     * @param ctx Netty channel handler context
     * @param httpObject Unprocessed HTTP object
     */
    protected void channelRead0(ChannelHandlerContext ctx, HttpObject httpObject);
}
```
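
Before a terminal handler like this can write its error response, it has to map the caught exception to an HTTP status code. As a standalone illustration of that decision (not Flink's actual mapping), a hypothetical `statusFor` helper might look like:

```java
import java.util.NoSuchElementException;

public class ErrorStatusMapper {
    /**
     * Map an exception to an HTTP status code, mirroring the kind of
     * decision a terminal pipeline error handler makes before writing
     * its error response.
     */
    public static int statusFor(Throwable cause) {
        if (cause instanceof IllegalArgumentException) {
            return 400; // bad request: the client sent something invalid
        }
        if (cause instanceof NoSuchElementException) {
            return 404; // nothing matched the request
        }
        return 500;     // anything unexpected is an internal server error
    }

    public static void main(String[] args) {
        System.out.println(statusFor(new IllegalArgumentException("bad input")));
        System.out.println(statusFor(new RuntimeException("boom")));
    }
}
```

Defaulting unknown exceptions to 500 keeps internal details out of client-facing responses; the details still reach the logs via the handler's logger.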

## Usage Examples

### JAR Processing with Utilities

```java
import org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils;
import org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils.JarHandlerContext;
import java.nio.file.Paths;

// Tokenize program arguments
String argsString = "--input /data/input --output /data/output --parallelism 4";
List<String> args = JarHandlerUtils.tokenizeArguments(argsString);
// Result: ["--input", "/data/input", "--output", "/data/output", "--parallelism", "4"]

// Create a JAR context from a request
// (fromRequest also requires a Logger from the surrounding handler)
HandlerRequest<JarRunRequestBody, JarRunMessageParameters> request = createRequest();
Path jarDir = Paths.get("/tmp/flink-jars");
JarHandlerContext context = JarHandlerContext.fromRequest(request, jarDir, log);

// Apply the context's configuration
Configuration flinkConfig = new Configuration();
context.applyToConfiguration(flinkConfig);

// Create the packaged program, then the job graph (execution plan) from it
PackagedProgram program = context.toPackagedProgram(flinkConfig);
JobGraph jobGraph = context.toJobGraph(program, flinkConfig, true);
```

### Web Submission Extension Setup

```java
import org.apache.flink.runtime.webmonitor.WebSubmissionExtension;
import org.apache.flink.configuration.Configuration;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

// Set up the extension configuration
Configuration config = new Configuration();
Path jarDir = Paths.get("/tmp/flink-web-jars");
Executor executor = Executors.newCachedThreadPool();
Time timeout = Time.seconds(60);
Map<String, String> responseHeaders = new HashMap<>();
responseHeaders.put("Access-Control-Allow-Origin", "*");

// Create the web submission extension
// (leaderRetriever and leaderElectionService come from the surrounding cluster setup)
WebSubmissionExtension extension = new WebSubmissionExtension(
    config,
    leaderRetriever,
    responseHeaders,
    leaderElectionService,
    jarDir,
    executor,
    timeout
);

// Get the handlers for router registration
Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers =
    extension.getHandlers();

// Register the handlers with the router
for (Tuple2<RestHandlerSpecification, ChannelInboundHandler> handler : handlers) {
    RestHandlerSpecification spec = handler.f0;
    ChannelInboundHandler implementation = handler.f1;
    router.addHandler(spec, implementation);
}

// Clean up when shutting down
extension.closeAsync().get();
```

### HTTP Request Handler Integration

```java
import org.apache.flink.runtime.webmonitor.HttpRequestHandler;
import org.apache.flink.runtime.webmonitor.PipelineErrorHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
import java.io.File;

// Set up the upload directory
File uploadDir = new File("/tmp/flink-uploads");
File validatedDir = HttpRequestHandler.checkAndCreateUploadDir(uploadDir);

// Create the HTTP request handler
HttpRequestHandler httpHandler = new HttpRequestHandler(validatedDir);

// Add it to the Netty pipeline (inside a ChannelInitializer)
public void initChannel(SocketChannel ch) {
    ChannelPipeline pipeline = ch.pipeline();
    pipeline.addLast("httpRequestHandler", httpHandler);

    // Add the error handler as the final stage
    PipelineErrorHandler errorHandler = new PipelineErrorHandler(logger);
    pipeline.addLast("errorHandler", errorHandler);
}

// Record the deletion of an externally managed upload directory
// (this only logs the event; it does not delete anything itself)
HttpRequestHandler.logExternalUploadDirDeletion(uploadDir);
```
350
### Advanced JAR Context Usage
351
352
```java
353
// Custom JAR context processing
354
public class CustomJarProcessor {
355
public void processJar(JarHandlerContext context) throws Exception {
356
// Extract JAR information
357
Path jarFile = context.getJarFile();
358
String entryClass = context.getEntryClassName();
359
List<String> args = context.getProgramArguments();
360
Integer parallelism = context.getParallelism();
361
362
// Create custom configuration
363
Configuration config = new Configuration();
364
config.setInteger(CoreOptions.DEFAULT_PARALLELISM,
365
parallelism != null ? parallelism : 1);
366
367
// Apply JAR-specific settings
368
context.applyToConfiguration(config);
369
370
// Create and validate job graph
371
ClassLoader jarClassLoader = createJarClassLoader(jarFile);
372
JobGraph jobGraph = context.toJobGraph(jarClassLoader);
373
374
// Validate job graph
375
validateJobGraph(jobGraph);
376
377
// Create packaged program for execution
378
PackagedProgram program = context.toPackagedProgram(jarClassLoader);
379
380
// Execute or further process
381
executeProgram(program, config);
382
}
383
384
private ClassLoader createJarClassLoader(Path jarFile) {
385
// Custom class loader creation logic
386
return URLClassLoader.newInstance(
387
new URL[]{jarFile.toUri().toURL()},
388
Thread.currentThread().getContextClassLoader()
389
);
390
}
391
392
private void validateJobGraph(JobGraph jobGraph) {
393
// Custom validation logic
394
if (jobGraph.getNumberOfVertices() == 0) {
395
throw new IllegalArgumentException("Job graph is empty");
396
}
397
}
398
399
private void executeProgram(PackagedProgram program, Configuration config) {
400
// Custom execution logic
401
}
402
}
403
```
404
405
### Error Handling Integration
406
407
```java
408
// Comprehensive error handling setup
409
public class WebServerSetup {
410
public void setupPipeline(SocketChannel ch) {
411
ChannelPipeline pipeline = ch.pipeline();
412
413
// Add request processing handlers
414
pipeline.addLast("httpRequestHandler", new HttpRequestHandler(uploadDir));
415
416
// Add business logic handlers
417
pipeline.addLast("jarUploadHandler", jarUploadHandler);
418
pipeline.addLast("jarRunHandler", jarRunHandler);
419
420
// Add final error handler
421
PipelineErrorHandler errorHandler = new PipelineErrorHandler(logger);
422
pipeline.addLast("pipelineErrorHandler", errorHandler);
423
}
424
}
425
426
// Custom error handling in JAR operations
427
public void handleJarOperation() {
428
try {
429
JarHandlerContext context = JarHandlerContext.fromRequest(request, jarDir);
430
JobGraph jobGraph = context.toJobGraph(classLoader);
431
// Process job graph
432
} catch (ClassNotFoundException e) {
433
logger.error("Entry class not found in JAR", e);
434
throw new BadRequestException("Invalid entry class: " + e.getMessage());
435
} catch (Exception e) {
436
logger.error("Failed to process JAR", e);
437
throw new InternalServerErrorException("JAR processing failed");
438
}
439
}
440
```
441
442
These utilities and extensions provide the foundation for building custom web interfaces and extending Flink's web capabilities while maintaining consistency with the core framework.