CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-runtime-web

Web-based monitoring and management interface for Apache Flink with REST APIs and Angular dashboard.

Pending
Overview
Eval results
Files

web-server.mddocs/

Web Server Infrastructure

Core HTTP request handling, server bootstrap, and pipeline management using the Netty framework. Provides the foundation for both the Runtime Web interface and History Server.

Capabilities

Web Frontend Bootstrap

Encapsulates Netty server bootstrap for web frontend with SSL support and configurable networking options.

/**
 * Netty server bootstrap for web frontend with SSL and configuration support
 */
public class WebFrontendBootstrap {
    /**
     * Create web frontend bootstrap with full configuration
     * @param router request router for handling HTTP paths
     * @param log logger for server events
     * @param tmpDir temporary directory for file operations
     * @param sslHandlerFactory SSL handler factory for HTTPS (nullable)
     * @param configuredAddress configured server address
     * @param configuredPort configured server port
     * @param configuration Flink configuration
     * @throws IOException if server cannot bind to port
     * @throws InterruptedException if bootstrap is interrupted
     */
    public WebFrontendBootstrap(
        Router router,
        Logger log,
        File tmpDir,
        SSLHandlerFactory sslHandlerFactory,
        String configuredAddress,
        int configuredPort,
        Configuration configuration
    ) throws IOException, InterruptedException;
    
    /**
     * Get the actual bound server port
     * @return port number the server is listening on
     */
    public int getServerPort();
    
    /**
     * Get the REST API base address
     * @return server address for REST API access
     */
    public String getRestAddress();
    
    /**
     * Shutdown the server and cleanup resources
     */
    public void shutdown();
}

Usage Example:

// Create and start web frontend
Router router = new Router();
Logger logger = LoggerFactory.getLogger(WebFrontendBootstrap.class);
File tmpDir = new File("/tmp/flink-web");

// Optional SSL configuration
SSLHandlerFactory sslFactory = null; // or configure for HTTPS

Configuration config = new Configuration();
WebFrontendBootstrap bootstrap = new WebFrontendBootstrap(
    router, logger, tmpDir, sslFactory, "localhost", 8081, config
);

System.out.println("Web server running on port: " + bootstrap.getServerPort());
System.out.println("REST API available at: " + bootstrap.getRestAddress());

// Cleanup when done
bootstrap.shutdown();

HTTP Request Handler

Main HTTP request handler for file uploads, request delegation, and multipart form processing.

/**
 * Main HTTP request handler for file uploads and request delegation
 */
public class HttpRequestHandler extends SimpleChannelInboundHandler<HttpObject> {
    /**
     * Create HTTP request handler with temporary directory
     * @param tmpDir directory for temporary file storage during uploads
     */
    public HttpRequestHandler(File tmpDir);
    
    /**
     * Process HTTP requests including file uploads and routing
     * @param ctx channel handler context
     * @param msg HTTP request object
     * @throws Exception if request processing fails
     */
    protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception;
    
    /**
     * Utility method to check and create upload directory
     * @param uploadDir directory to create for file uploads
     * @throws IOException if directory cannot be created
     */
    public static void checkAndCreateUploadDir(File uploadDir) throws IOException;
}

Usage Example:

// Set up HTTP request handler in Netty pipeline
File tmpDir = new File("/tmp/flink-uploads");
HttpRequestHandler.checkAndCreateUploadDir(tmpDir);

HttpRequestHandler requestHandler = new HttpRequestHandler(tmpDir);

// Add to Netty pipeline
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast("http-request-handler", requestHandler);

Web Monitor Extension

Container for web submission handlers, configuring JAR upload, run, plan, and delete endpoints.

/**
 * Web submission handlers container for JAR management endpoints
 */
public class WebSubmissionExtension implements WebMonitorExtension {
    /**
     * Create web submission extension with full configuration
     * @param configuration Flink configuration
     * @param leaderRetriever gateway retriever for cluster communication  
     * @param responseHeaders HTTP response headers to include
     * @param localAddressFuture future for local server address
     * @param jarDir directory for storing uploaded JAR files
     * @param executor executor for async operations
     * @param timeout request timeout duration
     */
    public WebSubmissionExtension(
        Configuration configuration,
        GatewayRetriever<? extends DispatcherGateway> leaderRetriever,
        Map<String, String> responseHeaders,
        CompletableFuture<String> localAddressFuture,
        Path jarDir,
        Executor executor,
        Duration timeout
    );
    
    /**
     * Get all REST handlers provided by this extension
     * @return collection of handler specifications and channel handlers
     */
    public Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> getHandlers();
    
    /**
     * Async cleanup of extension resources
     * @return future that completes when cleanup is done
     */
    public CompletableFuture<Void> closeAsync();
}

Pipeline Error Handler

Last handler in the Netty pipeline for error handling, logging, and unknown message processing.

/**
 * Pipeline error handler for unknown messages and exception handling
 */
public class PipelineErrorHandler extends SimpleChannelInboundHandler<Object> {
    /**
     * Create pipeline error handler with logger
     * @param logger logger for error messages and unknown requests
     */
    public PipelineErrorHandler(Logger logger);
    
    /**
     * Handle unknown messages not processed by other handlers
     * @param ctx channel handler context
     * @param message unknown message object
     * @throws Exception if message handling fails
     */
    protected void channelRead0(ChannelHandlerContext ctx, Object message) throws Exception;
    
    /**
     * Handle exceptions caught in the pipeline
     * @param ctx channel handler context
     * @param cause exception that was caught
     * @throws Exception if exception handling fails
     */
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}

Utility Classes

Log URL Utilities

Generate URLs for TaskManager and JobManager log access with proper formatting and validation.

/**
 * Utilities for generating log URLs for TaskManager and JobManager logs
 */
public class LogUrlUtil {
    /**
     * Create log URL for TaskManager logs
     * @param webInterfaceURL base web interface URL
     * @param taskManagerId TaskManager identifier
     * @param logName name of the log file
     * @return formatted log URL
     */
    public static String createTaskManagerLogUrl(
        String webInterfaceURL,
        ResourceID taskManagerId, 
        String logName
    );
    
    /**
     * Create log URL for JobManager logs
     * @param webInterfaceURL base web interface URL
     * @param logName name of the log file
     * @return formatted log URL
     */
    public static String createJobManagerLogUrl(
        String webInterfaceURL,
        String logName
    );
    
    /**
     * Validate and format web interface URL
     * @param baseUrl base URL to validate
     * @return properly formatted URL
     * @throws IllegalArgumentException if URL is invalid
     */
    public static String validateWebInterfaceUrl(String baseUrl);
}

Usage Example:

// Generate TaskManager log URLs
String webUrl = "http://localhost:8081";
ResourceID taskManagerId = ResourceID.generate();
String logUrl = LogUrlUtil.createTaskManagerLogUrl(webUrl, taskManagerId, "taskmanager.log");
String outUrl = LogUrlUtil.createTaskManagerLogUrl(webUrl, taskManagerId, "taskmanager.out");

// Generate JobManager log URLs  
String jmLogUrl = LogUrlUtil.createJobManagerLogUrl(webUrl, "jobmanager.log");
String jmOutUrl = LogUrlUtil.createJobManagerLogUrl(webUrl, "jobmanager.out");

System.out.println("TaskManager log: " + logUrl);
System.out.println("JobManager log: " + jmLogUrl);

Server Configuration

Netty Configuration Options

The web server supports extensive configuration through Flink's configuration system:

// Server binding configuration
config.setString(RestOptions.BIND_PORT, "8081");
config.setString(RestOptions.BIND_ADDRESS, "0.0.0.0");

// SSL configuration
config.setBoolean(SecurityOptions.SSL_REST_ENABLED, true);
config.setString(SecurityOptions.SSL_REST_KEYSTORE, "/path/to/keystore.jks");
config.setString(SecurityOptions.SSL_REST_KEYSTORE_PASSWORD, "password");

// Upload configuration
config.setString(WebOptions.UPLOAD_DIR, "/tmp/flink-web-uploads");
config.setLong(WebOptions.MAX_UPLOAD_SIZE, 100 * 1024 * 1024); // 100MB

// Timeout configuration
config.set(RestOptions.SERVER_MAX_CONTENT_LENGTH, MemorySize.ofMebiBytes(64));
config.set(RestOptions.CONNECTION_TIMEOUT, Duration.ofSeconds(30));

SSL/TLS Support

/**
 * SSL configuration for secure HTTPS connections
 */
public interface SSLHandlerFactory {
    /**
     * Create SSL handler for channel pipeline
     * @param alloc byte buffer allocator
     * @return SSL handler for secure connections
     * @throws SSLException if SSL configuration is invalid
     */
    SslHandler createSSLHandler(ByteBufAllocator alloc) throws SSLException;
}

Integration Patterns

Custom Web Extensions

// Create custom web extension
public class MyWebExtension implements WebMonitorExtension {
    @Override
    public Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> getHandlers() {
        return Arrays.asList(
            Tuple2.of(MyCustomHeaders.getInstance(), new MyCustomHandler())
        );
    }
    
    @Override 
    public CompletableFuture<Void> closeAsync() {
        // Cleanup resources
        return CompletableFuture.completedFuture(null);
    }
}

Server Lifecycle Management

// Complete server setup and lifecycle
public class FlinkWebServer {
    private WebFrontendBootstrap bootstrap;
    
    public void start(Configuration config) throws Exception {
        // Set up router and handlers
        Router router = new Router();
        
        // Add extensions
        WebSubmissionExtension submission = new WebSubmissionExtension(/*...*/);
        for (Tuple2<RestHandlerSpecification, ChannelInboundHandler> handler : submission.getHandlers()) {
            router.addHandler(handler.f0, handler.f1);
        }
        
        // Start server
        bootstrap = new WebFrontendBootstrap(
            router, logger, tmpDir, sslFactory, "localhost", 8081, config
        );
        
        System.out.println("Flink Web Server started on port: " + bootstrap.getServerPort());
    }
    
    public void stop() {
        if (bootstrap != null) {
            bootstrap.shutdown();
        }
    }
}

Error Handling and Monitoring

Request Processing Errors

The web server provides comprehensive error handling:

  • HTTP 400 for malformed requests
  • HTTP 404 for unknown endpoints
  • HTTP 413 for oversized uploads
  • HTTP 500 for internal server errors
  • Proper JSON error responses with detail messages

Monitoring Integration

// Server metrics and monitoring
public class WebServerMetrics {
    private final Counter requestCounter;
    private final Histogram requestLatency;
    private final Gauge activeConnections;
    
    public void recordRequest(String endpoint, long duration) {
        requestCounter.inc();
        requestLatency.update(duration);
    }
}

The web server infrastructure provides a robust, configurable foundation for Flink's web interfaces with comprehensive support for file uploads, SSL/TLS, custom extensions, and proper error handling.

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-runtime-web

docs

angular-dashboard.md

dto.md

history-server.md

index.md

jar-management.md

web-server.md

tile.json