# Web Server Infrastructure

Core HTTP request handling, server bootstrap, and pipeline management using the Netty framework. Provides the foundation for both the Runtime Web interface and History Server.

## Capabilities

### Web Frontend Bootstrap

Encapsulates Netty server bootstrap for web frontend with SSL support and configurable networking options.

```java { .api }
/**
 * Netty server bootstrap for web frontend with SSL and configuration support
 */
public class WebFrontendBootstrap {
    /**
     * Create web frontend bootstrap with full configuration
     * @param router request router for handling HTTP paths
     * @param log logger for server events
     * @param tmpDir temporary directory for file operations
     * @param sslHandlerFactory SSL handler factory for HTTPS (nullable)
     * @param configuredAddress configured server address
     * @param configuredPort configured server port
     * @param configuration Flink configuration
     * @throws IOException if server cannot bind to port
     * @throws InterruptedException if bootstrap is interrupted
     */
    public WebFrontendBootstrap(
        Router router,
        Logger log,
        File tmpDir,
        SSLHandlerFactory sslHandlerFactory,
        String configuredAddress,
        int configuredPort,
        Configuration configuration
    ) throws IOException, InterruptedException;

    /**
     * Get the actual bound server port
     * @return port number the server is listening on
     */
    public int getServerPort();

    /**
     * Get the REST API base address
     * @return server address for REST API access
     */
    public String getRestAddress();

    /**
     * Shutdown the server and cleanup resources
     */
    public void shutdown();
}
```

**Usage Example:**

```java
// Create and start web frontend
Router router = new Router();
Logger logger = LoggerFactory.getLogger(WebFrontendBootstrap.class);
File tmpDir = new File("/tmp/flink-web");

// Optional SSL configuration
SSLHandlerFactory sslFactory = null; // or configure for HTTPS

Configuration config = new Configuration();
WebFrontendBootstrap bootstrap = new WebFrontendBootstrap(
    router, logger, tmpDir, sslFactory, "localhost", 8081, config
);

System.out.println("Web server running on port: " + bootstrap.getServerPort());
System.out.println("REST API available at: " + bootstrap.getRestAddress());

// Cleanup when done
bootstrap.shutdown();
```
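
`getServerPort()` returns the port the server is actually listening on. In general socket programming that value can differ from the configured one, for instance when a port of 0 lets the operating system choose a free port. The following is a minimal sketch of that idea using only `java.net.ServerSocket`; it does not involve Flink or Netty, and `BoundPortDemo` and `bindAndGetPort` are hypothetical names introduced for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;

// Hypothetical demo class; not part of Flink.
final class BoundPortDemo {

    /** Binds to the requested port on the loopback address and returns the port actually bound. */
    static int bindAndGetPort(int configuredPort) {
        try (ServerSocket socket =
                new ServerSocket(configuredPort, 50, InetAddress.getLoopbackAddress())) {
            // For configuredPort == 0 the OS assigns a free ephemeral port.
            return socket.getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException("Could not bind to port " + configuredPort, e);
        }
    }

    public static void main(String[] args) {
        int actual = bindAndGetPort(0);
        System.out.println("Configured port: 0, bound port: " + actual);
    }
}
```

Reading the port back from the bound socket, rather than echoing the configured value, is what makes the reported address usable by clients.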

### HTTP Request Handler

Main HTTP request handler for file uploads, request delegation, and multipart form processing.

```java { .api }
/**
 * Main HTTP request handler for file uploads and request delegation
 */
public class HttpRequestHandler extends SimpleChannelInboundHandler<HttpObject> {
    /**
     * Create HTTP request handler with temporary directory
     * @param tmpDir directory for temporary file storage during uploads
     */
    public HttpRequestHandler(File tmpDir);

    /**
     * Process HTTP requests including file uploads and routing
     * @param ctx channel handler context
     * @param msg HTTP request object
     * @throws Exception if request processing fails
     */
    protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception;

    /**
     * Utility method to check and create upload directory
     * @param uploadDir directory to create for file uploads
     * @throws IOException if directory cannot be created
     */
    public static void checkAndCreateUploadDir(File uploadDir) throws IOException;
}
```

**Usage Example:**

```java
// Set up HTTP request handler in Netty pipeline
File tmpDir = new File("/tmp/flink-uploads");
HttpRequestHandler.checkAndCreateUploadDir(tmpDir);

HttpRequestHandler requestHandler = new HttpRequestHandler(tmpDir);

// Add to Netty pipeline
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast("http-request-handler", requestHandler);
```
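
To make the "check and create" step concrete, here is a minimal sketch of what such a directory check might look like with `java.nio.file`. It is an assumption about the general technique, not the body of `HttpRequestHandler.checkAndCreateUploadDir`; `UploadDirSketch` and `ensureUploadDir` are hypothetical names.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical stand-in; the real checkAndCreateUploadDir may behave differently.
final class UploadDirSketch {

    /** Creates the directory (and parents) if missing, then verifies it is a writable directory. */
    static void ensureUploadDir(Path uploadDir) throws IOException {
        if (Files.exists(uploadDir) && !Files.isDirectory(uploadDir)) {
            throw new IOException("Upload path exists but is not a directory: " + uploadDir);
        }
        // No-op if the directory already exists.
        Files.createDirectories(uploadDir);
        if (!Files.isWritable(uploadDir)) {
            throw new IOException("Upload directory is not writable: " + uploadDir);
        }
    }

    public static void main(String[] args) throws IOException {
        Path base = Files.createTempDirectory("flink-web-sketch");
        Path uploadDir = base.resolve("uploads");
        ensureUploadDir(uploadDir);
        System.out.println("Upload directory ready: " + Files.isDirectory(uploadDir));
    }
}
```

Running the check before the handler is constructed means a misconfigured path fails at startup rather than on the first upload.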

### Web Monitor Extension

Container for web submission handlers, configuring JAR upload, run, plan, and delete endpoints.

```java { .api }
/**
 * Web submission handlers container for JAR management endpoints
 */
public class WebSubmissionExtension implements WebMonitorExtension {
    /**
     * Create web submission extension with full configuration
     * @param configuration Flink configuration
     * @param leaderRetriever gateway retriever for cluster communication
     * @param responseHeaders HTTP response headers to include
     * @param localAddressFuture future for local server address
     * @param jarDir directory for storing uploaded JAR files
     * @param executor executor for async operations
     * @param timeout request timeout duration
     */
    public WebSubmissionExtension(
        Configuration configuration,
        GatewayRetriever<? extends DispatcherGateway> leaderRetriever,
        Map<String, String> responseHeaders,
        CompletableFuture<String> localAddressFuture,
        Path jarDir,
        Executor executor,
        Duration timeout
    );

    /**
     * Get all REST handlers provided by this extension
     * @return collection of handler specifications and channel handlers
     */
    public Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> getHandlers();

    /**
     * Async cleanup of extension resources
     * @return future that completes when cleanup is done
     */
    public CompletableFuture<Void> closeAsync();
}
```

### Pipeline Error Handler

Last handler in the Netty pipeline for error handling, logging, and unknown message processing.

```java { .api }
/**
 * Pipeline error handler for unknown messages and exception handling
 */
public class PipelineErrorHandler extends SimpleChannelInboundHandler<Object> {
    /**
     * Create pipeline error handler with logger
     * @param logger logger for error messages and unknown requests
     */
    public PipelineErrorHandler(Logger logger);

    /**
     * Handle unknown messages not processed by other handlers
     * @param ctx channel handler context
     * @param message unknown message object
     * @throws Exception if message handling fails
     */
    protected void channelRead0(ChannelHandlerContext ctx, Object message) throws Exception;

    /**
     * Handle exceptions caught in the pipeline
     * @param ctx channel handler context
     * @param cause exception that was caught
     * @throws Exception if exception handling fails
     */
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}
```

## Utility Classes

### Log URL Utilities

Generate URLs for TaskManager and JobManager log access with proper formatting and validation.

```java { .api }
/**
 * Utilities for generating log URLs for TaskManager and JobManager logs
 */
public class LogUrlUtil {
    /**
     * Create log URL for TaskManager logs
     * @param webInterfaceURL base web interface URL
     * @param taskManagerId TaskManager identifier
     * @param logName name of the log file
     * @return formatted log URL
     */
    public static String createTaskManagerLogUrl(
        String webInterfaceURL,
        ResourceID taskManagerId,
        String logName
    );

    /**
     * Create log URL for JobManager logs
     * @param webInterfaceURL base web interface URL
     * @param logName name of the log file
     * @return formatted log URL
     */
    public static String createJobManagerLogUrl(
        String webInterfaceURL,
        String logName
    );

    /**
     * Validate and format web interface URL
     * @param baseUrl base URL to validate
     * @return properly formatted URL
     * @throws IllegalArgumentException if URL is invalid
     */
    public static String validateWebInterfaceUrl(String baseUrl);
}
```

**Usage Example:**

```java
// Generate TaskManager log URLs
String webUrl = "http://localhost:8081";
ResourceID taskManagerId = ResourceID.generate();
String logUrl = LogUrlUtil.createTaskManagerLogUrl(webUrl, taskManagerId, "taskmanager.log");
String outUrl = LogUrlUtil.createTaskManagerLogUrl(webUrl, taskManagerId, "taskmanager.out");

// Generate JobManager log URLs
String jmLogUrl = LogUrlUtil.createJobManagerLogUrl(webUrl, "jobmanager.log");
String jmOutUrl = LogUrlUtil.createJobManagerLogUrl(webUrl, "jobmanager.out");

System.out.println("TaskManager log: " + logUrl);
System.out.println("JobManager log: " + jmLogUrl);
```
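
The "validate and format" contract of `validateWebInterfaceUrl` can be illustrated with a small standalone sketch built on `java.net.URI`. The rules below (require an `http` or `https` scheme and a host, strip trailing slashes) are assumptions chosen for the example, not the documented behavior of `LogUrlUtil`; `WebUrlSketch` and `normalizeBaseUrl` are hypothetical names.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper; the validation rules are assumptions, not LogUrlUtil's actual logic.
final class WebUrlSketch {

    /** Returns the base URL without trailing slashes, or throws if it is not an http(s) URL with a host. */
    static String normalizeBaseUrl(String baseUrl) {
        if (baseUrl == null || baseUrl.trim().isEmpty()) {
            throw new IllegalArgumentException("Base URL must not be empty");
        }
        URI uri;
        try {
            uri = new URI(baseUrl.trim());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Malformed URL: " + baseUrl, e);
        }
        String scheme = uri.getScheme();
        boolean httpLike = "http".equalsIgnoreCase(scheme) || "https".equalsIgnoreCase(scheme);
        if (!httpLike || uri.getHost() == null) {
            throw new IllegalArgumentException("Expected an http(s) URL with a host: " + baseUrl);
        }
        String normalized = uri.toString();
        // Drop trailing slashes so path segments can be appended with a single "/".
        while (normalized.endsWith("/")) {
            normalized = normalized.substring(0, normalized.length() - 1);
        }
        return normalized;
    }

    public static void main(String[] args) {
        System.out.println(normalizeBaseUrl("http://localhost:8081/"));
    }
}
```

Normalizing once up front keeps later string concatenation from producing double slashes in generated log links.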

## Server Configuration

### Netty Configuration Options

The web server is configured through Flink's `Configuration` object, for example:

```java
Configuration config = new Configuration();

// Server binding configuration
config.setString(RestOptions.BIND_PORT, "8081");
config.setString(RestOptions.BIND_ADDRESS, "0.0.0.0");

// SSL configuration
config.setBoolean(SecurityOptions.SSL_REST_ENABLED, true);
config.setString(SecurityOptions.SSL_REST_KEYSTORE, "/path/to/keystore.jks");
config.setString(SecurityOptions.SSL_REST_KEYSTORE_PASSWORD, "password");

// Upload configuration
config.setString(WebOptions.UPLOAD_DIR, "/tmp/flink-web-uploads");
config.setLong(WebOptions.MAX_UPLOAD_SIZE, 100 * 1024 * 1024); // 100MB

// Request size and timeout configuration
config.set(RestOptions.SERVER_MAX_CONTENT_LENGTH, MemorySize.ofMebiBytes(64));
config.set(RestOptions.CONNECTION_TIMEOUT, Duration.ofSeconds(30));
```

### SSL/TLS Support

```java { .api }
/**
 * SSL configuration for secure HTTPS connections
 */
public interface SSLHandlerFactory {
    /**
     * Create SSL handler for channel pipeline
     * @param alloc byte buffer allocator
     * @return SSL handler for secure connections
     * @throws SSLException if SSL configuration is invalid
     */
    SslHandler createSSLHandler(ByteBufAllocator alloc) throws SSLException;
}
```

## Integration Patterns

### Custom Web Extensions

```java
// Create custom web extension
public class MyWebExtension implements WebMonitorExtension {
    @Override
    public Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> getHandlers() {
        return Arrays.asList(
            Tuple2.of(MyCustomHeaders.getInstance(), new MyCustomHandler())
        );
    }

    @Override
    public CompletableFuture<Void> closeAsync() {
        // Cleanup resources
        return CompletableFuture.completedFuture(null);
    }
}
```
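
The `closeAsync()` above completes immediately because the example holds no resources. When an extension does own something, such as a working directory, one possible approach is to run the cleanup on an executor and return the resulting future. The sketch below shows that pattern with plain JDK classes; `AsyncCleanupSketch` and its members are hypothetical and do not implement `WebMonitorExtension`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.stream.Stream;

// Hypothetical class illustrating an asynchronous close; not a Flink type.
final class AsyncCleanupSketch {
    private final Path workDir;
    private final Executor executor;

    AsyncCleanupSketch(Path workDir, Executor executor) {
        this.workDir = workDir;
        this.executor = executor;
    }

    /** Deletes the working directory on the executor; the future completes when deletion is done. */
    CompletableFuture<Void> closeAsync() {
        return CompletableFuture.runAsync(() -> deleteRecursively(workDir), executor);
    }

    private static void deleteRecursively(Path root) {
        if (!Files.exists(root)) {
            return;
        }
        try (Stream<Path> paths = Files.walk(root)) {
            // Reverse order visits children before their parent directories.
            paths.sorted(Comparator.reverseOrder()).forEach(AsyncCleanupSketch::deleteOne);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static void deleteOne(Path path) {
        try {
            Files.delete(path);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("extension-work");
        Files.createFile(dir.resolve("uploaded.jar"));
        new AsyncCleanupSketch(dir, Runnable::run).closeAsync().join();
        System.out.println("Directory removed: " + !Files.exists(dir));
    }
}
```

A failure during deletion completes the returned future exceptionally, so callers can decide whether to log it or abort shutdown.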

### Server Lifecycle Management

```java
// Complete server setup and lifecycle
public class FlinkWebServer {
    private static final Logger logger = LoggerFactory.getLogger(FlinkWebServer.class);

    private final File tmpDir = new File("/tmp/flink-web");
    private final SSLHandlerFactory sslFactory = null; // no HTTPS in this example
    private WebFrontendBootstrap bootstrap;

    public void start(Configuration config) throws Exception {
        // Set up router and handlers
        Router router = new Router();

        // Add extensions
        WebSubmissionExtension submission = new WebSubmissionExtension(/*...*/);
        for (Tuple2<RestHandlerSpecification, ChannelInboundHandler> handler : submission.getHandlers()) {
            // Registration is shown schematically; the exact Router method may differ by version
            router.addHandler(handler.f0, handler.f1);
        }

        // Start server
        bootstrap = new WebFrontendBootstrap(
            router, logger, tmpDir, sslFactory, "localhost", 8081, config
        );

        System.out.println("Flink Web Server started on port: " + bootstrap.getServerPort());
    }

    public void stop() {
        if (bootstrap != null) {
            bootstrap.shutdown();
        }
    }
}
```

## Error Handling and Monitoring

### Request Processing Errors

The web server reports request failures as follows:

- HTTP 400 for malformed requests
- HTTP 404 for unknown endpoints
- HTTP 413 for oversized uploads
- HTTP 500 for internal server errors
- JSON error responses with detail messages
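
The mapping in the list above can be sketched as a small enum plus a JSON renderer. This is an illustrative assumption, not the server's actual error-handling code: `ErrorResponseSketch`, the `Failure` enum, and the `{"errors": [...]}` body shape are all introduced here for the example.

```java
// Hypothetical sketch; not Flink's actual error-handling code.
final class ErrorResponseSketch {

    /** Failure categories from the list above, each paired with its HTTP status code. */
    enum Failure {
        MALFORMED_REQUEST(400),
        UNKNOWN_ENDPOINT(404),
        UPLOAD_TOO_LARGE(413),
        INTERNAL_ERROR(500);

        final int status;

        Failure(int status) {
            this.status = status;
        }
    }

    /** Escapes backslashes, double quotes, and newlines; other control characters are not handled. */
    static String escapeJson(String text) {
        return text.replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n");
    }

    /** Renders a JSON body; the {"errors": [...]} shape is an assumption of this sketch. */
    static String toJsonBody(String detail) {
        return "{\"errors\":[\"" + escapeJson(detail) + "\"]}";
    }

    public static void main(String[] args) {
        Failure failure = Failure.UPLOAD_TOO_LARGE;
        System.out.println(failure.status + " " + toJsonBody("Upload exceeds the configured limit"));
    }
}
```

Keeping the status code and the detail message separate lets a handler set the HTTP status line from the enum and write the JSON string as the response body.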
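
### Monitoring Integration

```java
// Server metrics and monitoring
public class WebServerMetrics {
    private final Counter requestCounter;
    private final Histogram requestLatency;
    private final Gauge<Integer> activeConnections;

    // Metrics are injected so the final fields are initialized
    public WebServerMetrics(
            Counter requestCounter,
            Histogram requestLatency,
            Gauge<Integer> activeConnections) {
        this.requestCounter = requestCounter;
        this.requestLatency = requestLatency;
        this.activeConnections = activeConnections;
    }

    public void recordRequest(String endpoint, long duration) {
        requestCounter.inc();
        requestLatency.update(duration);
    }
}
```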

Together, these components give Flink's web interfaces a configurable foundation with support for file uploads, SSL/TLS, custom extensions, and error handling.