tessl/maven-org-apache-flink--flink-runtime-web-2-12

Apache Flink Web Dashboard - Provides a web-based user interface for monitoring and managing Apache Flink jobs and runtime.

Utilities and Extensions

Helper utilities and extension points for JAR processing and web submission. These components back the built-in JAR handlers and can be reused in custom handler implementations and internal processing operations.

Capabilities

JAR Handler Utilities

Core utility class providing JAR processing functionality and context management.

/**
 * Utility class providing helper functions for JAR handlers.
 * Contains static methods and context classes for JAR processing operations.
 */
public class JarHandlerUtils {
    /**
     * Tokenize program arguments string into individual arguments.
     * Handles quoted arguments and proper escaping.
     * 
     * @param args Program arguments as a single string
     * @return List of individual argument strings
     */
    public static List<String> tokenizeArguments(String args);
    
    /**
     * Context object for JAR processing operations.
     * Provides methods for converting between different representations of JAR configurations.
     */
    public static class JarHandlerContext {
        /**
         * Create a JAR handler context from an HTTP request.
         * Extracts JAR configuration from request body and parameters.
         * 
         * @param request HandlerRequest containing JAR configuration
         * @param jarDir Directory containing uploaded JAR files
         * @param log Logger instance for operation logging
         * @return JarHandlerContext with extracted configuration
         * @throws RestHandlerException if request parameters are invalid
         */
        public static JarHandlerContext fromRequest(
            HandlerRequest<JarRequestBody, ?> request, 
            Path jarDir,
            Logger log
        ) throws RestHandlerException;
        
        /**
         * Apply JAR configuration to a Flink Configuration object.
         * Sets parallelism, program arguments, job ID, and other job settings.
         * 
         * @param configuration Target Flink configuration to modify
         */
        public void applyToConfiguration(Configuration configuration);
        
        /**
         * Convert JAR configuration to a Flink JobGraph.
         * Creates the execution graph for the job without running it.
         * 
         * @param packagedProgram PackagedProgram containing the job
         * @param configuration Flink configuration for job execution
         * @param suppressOutput Whether to suppress program output during job graph creation
         * @return JobGraph representing the job execution plan
         * @throws ProgramInvocationException if job graph creation fails
         */
        public JobGraph toJobGraph(
            PackagedProgram packagedProgram,
            Configuration configuration,
            boolean suppressOutput
        ) throws ProgramInvocationException;
        
        /**
         * Convert JAR configuration to a PackagedProgram.
         * Creates a packaged program that can be executed or analyzed.
         * 
         * @param configuration Flink configuration for program creation
         * @return PackagedProgram ready for execution
         * @throws ProgramInvocationException if program creation fails
         */
        public PackagedProgram toPackagedProgram(Configuration configuration) throws ProgramInvocationException;
        
        /**
         * Get the JAR file path for this context.
         * 
         * @return Path to the JAR file
         */
        public Path getJarFile();
        
        /**
         * Get the entry class name.
         * 
         * @return Fully qualified entry class name, or null if not specified
         */
        public String getEntryClassName();
        
        /**
         * Get the program arguments.
         * 
         * @return List of program arguments
         */
        public List<String> getProgramArguments();
        
        /**
         * Get the job parallelism.
         * 
         * @return Parallelism value, or null for default
         */
        public Integer getParallelism();
        
        /**
         * Get the job ID.
         * 
         * @return Job ID, or null if not specified
         */
        public JobID getJobId();
    }
}
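The exact quoting rules of `tokenizeArguments` are Flink's own; as a rough illustration of what quote-aware argument splitting involves, here is a minimal standalone sketch (the class `ArgumentTokenizerSketch` is hypothetical, not the Flink implementation):

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative quote-aware tokenizer sketch; not Flink's actual implementation. */
public class ArgumentTokenizerSketch {
    public static List<String> tokenize(String args) {
        List<String> tokens = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        char quote = 0;                          // active quote character, 0 if none
        for (char c : args.toCharArray()) {
            if (quote != 0) {
                if (c == quote) {
                    quote = 0;                   // closing quote ends the quoted region
                } else {
                    current.append(c);           // keep everything inside quotes, including spaces
                }
            } else if (c == '"' || c == '\'') {
                quote = c;                       // opening quote
            } else if (Character.isWhitespace(c)) {
                if (current.length() > 0) {      // whitespace outside quotes ends a token
                    tokens.add(current.toString());
                    current.setLength(0);
                }
            } else {
                current.append(c);
            }
        }
        if (current.length() > 0) {
            tokens.add(current.toString());
        }
        return tokens;
    }
}
```

With this sketch, `tokenize("--name \"my job\"")` yields `["--name", "my job"]`: quoted segments survive as single arguments while unquoted whitespace separates tokens.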

Web Submission Extension

Extension point for adding JAR submission capabilities to the web interface.

/**
 * Extension that provides JAR submission capabilities to the Flink web interface.
 * Implements WebMonitorExtension to integrate with the web server framework.
 */
public class WebSubmissionExtension implements WebMonitorExtension {
    /**
     * Create a web submission extension.
     * 
     * @param configuration Flink configuration
     * @param leaderRetriever Gateway retriever for accessing Flink cluster
     * @param responseHeaders HTTP headers to include in responses
     * @param leaderElectionService Future that completes with the leader address
     * @param jarDir Directory for storing uploaded JAR files
     * @param executor Executor for handling submission operations
     * @param timeout Request timeout for operations
     */
    public WebSubmissionExtension(
        Configuration configuration,
        GatewayRetriever<? extends RestfulGateway> leaderRetriever,
        Map<String, String> responseHeaders,
        CompletableFuture<String> leaderElectionService,
        Path jarDir,
        Executor executor,
        Time timeout
    );
    
    /**
     * Get the collection of REST handlers provided by this extension.
     * Returns handlers for JAR upload, list, run, delete, and plan operations.
     * 
     * @return Collection of handler specifications and implementations
     */
    public Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> getHandlers();
    
    /**
     * Asynchronously close the extension and clean up resources.
     * 
     * @return CompletableFuture that completes when cleanup is finished
     */
    public CompletableFuture<Void> closeAsync();
}

HTTP Request Handler

Low-level HTTP request handler for file uploads and general request processing.

/**
 * Netty channel handler for processing HTTP requests and file uploads.
 * Handles multipart/form-data uploads and basic HTTP request routing.
 */
public class HttpRequestHandler extends SimpleChannelInboundHandler<HttpObject> {
    /**
     * Create an HTTP request handler.
     * 
     * @param uploadDir Directory for storing uploaded files
     */
    public HttpRequestHandler(File uploadDir);
    
    /**
     * Check and create upload directory if it doesn't exist.
     * Validates directory permissions and creates necessary parent directories.
     * 
     * @param uploadDir Directory to check and create
     * @return The validated upload directory
     * @throws IOException if directory cannot be created or accessed
     */
    public static File checkAndCreateUploadDir(File uploadDir) throws IOException;
    
    /**
     * Log external upload directory deletion for cleanup tracking.
     * Used for auditing and debugging upload directory management.
     * 
     * @param uploadDir Directory that was deleted
     */
    public static void logExternalUploadDirDeletion(File uploadDir);
    
    /**
     * Handle incoming HTTP request objects.
     * Processes both simple requests and multipart file uploads.
     * 
     * @param ctx Netty channel handler context
     * @param httpObject HTTP request object (request, content, etc.)
     */
    protected void channelRead0(ChannelHandlerContext ctx, HttpObject httpObject);
}
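To illustrate what directory validation along the lines of `checkAndCreateUploadDir` involves, here is a self-contained sketch (the class `UploadDirCheckSketch` is a hypothetical stand-in, not the Flink method):

```java
import java.io.File;
import java.io.IOException;

/** Illustrative upload-directory validation sketch, assuming semantics similar to checkAndCreateUploadDir. */
public class UploadDirCheckSketch {
    public static File ensureUploadDir(File uploadDir) throws IOException {
        // Create the directory (and any missing parents) if it does not exist yet
        if (!uploadDir.exists() && !uploadDir.mkdirs()) {
            throw new IOException("Could not create upload directory " + uploadDir);
        }
        // Reject paths that exist but are not writable directories
        if (!uploadDir.isDirectory() || !uploadDir.canWrite()) {
            throw new IOException("Upload directory " + uploadDir + " is not a writable directory");
        }
        return uploadDir;
    }
}
```

The check-then-create pattern above mirrors the documented contract: the caller always gets back a directory it can write uploads into, or an `IOException` explaining why not.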

Pipeline Error Handler

Final handler in the Netty pipeline, catching exceptions and requests that no earlier handler processed.

/**
 * Final error handler in the Netty pipeline for unhandled exceptions.
 * Provides centralized error logging and response generation.
 */
public class PipelineErrorHandler extends SimpleChannelInboundHandler<HttpObject> {
    /**
     * Create a pipeline error handler.
     * 
     * @param logger Logger for error reporting
     */
    public PipelineErrorHandler(Logger logger);
    
    /**
     * Handle exceptions that occurred during request processing.
     * Logs errors and sends appropriate HTTP error responses to clients.
     * 
     * @param ctx Netty channel handler context
     * @param cause Exception that occurred
     */
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause);
    
    /**
     * Handle HTTP objects that reached the end of the pipeline.
     * Typically handles unprocessed requests with 404 responses.
     * 
     * @param ctx Netty channel handler context
     * @param httpObject Unprocessed HTTP object
     */
    protected void channelRead0(ChannelHandlerContext ctx, HttpObject httpObject);
}
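To illustrate the kind of centralized error-to-status mapping such a handler performs, here is a Netty-free sketch (the class and the specific mapping are assumptions for illustration, not the Flink handler's actual logic):

```java
/** Hypothetical mapping from pipeline exceptions to HTTP status codes. */
public class ErrorStatusSketch {
    public static int statusFor(Throwable cause) {
        if (cause instanceof IllegalArgumentException) {
            return 400;   // client sent a malformed request
        }
        if (cause instanceof java.io.FileNotFoundException) {
            return 404;   // requested resource does not exist
        }
        return 500;       // anything unexpected is an internal server error
    }
}
```

A terminal handler typically combines a mapping like this with logging of the full stack trace, so clients get a safe status line while operators keep the details.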

Usage Examples

JAR Processing with Utilities

import org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils;
import org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils.JarHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;

// Tokenize program arguments
String argsString = "--input /data/input --output /data/output --parallelism 4";
List<String> args = JarHandlerUtils.tokenizeArguments(argsString);
// Result: ["--input", "/data/input", "--output", "/data/output", "--parallelism", "4"]

// Create JAR context from request (the declared signature also takes a logger)
HandlerRequest<JarRunRequestBody, JarRunMessageParameters> request = createRequest();
Path jarDir = Paths.get("/tmp/flink-jars");
Logger log = LoggerFactory.getLogger("jar-handler");
JarHandlerContext context = JarHandlerContext.fromRequest(request, jarDir, log);

// Apply configuration
Configuration flinkConfig = new Configuration();
context.applyToConfiguration(flinkConfig);

// Create packaged program for execution or analysis
PackagedProgram program = context.toPackagedProgram(flinkConfig);

// Create the job graph (execution plan) without running the job
JobGraph jobGraph = context.toJobGraph(program, flinkConfig, /* suppressOutput */ false);

Web Submission Extension Setup

import org.apache.flink.runtime.webmonitor.WebSubmissionExtension;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.api.common.time.Time;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

// Setup extension configuration
Configuration config = new Configuration();
Path jarDir = Paths.get("/tmp/flink-web-jars");
Executor executor = Executors.newCachedThreadPool();
Time timeout = Time.seconds(60);
Map<String, String> responseHeaders = new HashMap<>();
responseHeaders.put("Access-Control-Allow-Origin", "*");

// Create web submission extension
WebSubmissionExtension extension = new WebSubmissionExtension(
    config,
    leaderRetriever,
    responseHeaders,
    leaderElectionService,
    jarDir,
    executor,
    timeout
);

// Get handlers for router registration
Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = 
    extension.getHandlers();

// Register handlers with a router (illustrative; registration depends on your router's API)
for (Tuple2<RestHandlerSpecification, ChannelInboundHandler> handler : handlers) {
    RestHandlerSpecification spec = handler.f0;
    ChannelInboundHandler implementation = handler.f1;
    router.addHandler(spec, implementation);
}

// Cleanup when shutting down
extension.closeAsync().get();

HTTP Request Handler Integration

import org.apache.flink.runtime.webmonitor.HttpRequestHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline;
import java.io.File;

// Setup upload directory
File uploadDir = new File("/tmp/flink-uploads");
File validatedDir = HttpRequestHandler.checkAndCreateUploadDir(uploadDir);

// Add handlers to the Netty pipeline; non-sharable handlers need one instance per channel
public void initChannel(SocketChannel ch) {
    ChannelPipeline pipeline = ch.pipeline();
    pipeline.addLast("httpRequestHandler", new HttpRequestHandler(validatedDir));
    
    // Add error handler as the final stage
    pipeline.addLast("errorHandler", new PipelineErrorHandler(logger));
}

// Record deletion of an externally managed upload directory (logging only; does not delete)
HttpRequestHandler.logExternalUploadDirDeletion(uploadDir);

Advanced JAR Context Usage

// Custom JAR context processing
public class CustomJarProcessor {
    public void processJar(JarHandlerContext context) throws Exception {
        // Extract JAR information
        Path jarFile = context.getJarFile();
        String entryClass = context.getEntryClassName();
        List<String> args = context.getProgramArguments();
        Integer parallelism = context.getParallelism();
        
        // Create custom configuration
        Configuration config = new Configuration();
        config.setInteger(CoreOptions.DEFAULT_PARALLELISM, 
            parallelism != null ? parallelism : 1);
        
        // Apply JAR-specific settings
        context.applyToConfiguration(config);
        
        // Create the packaged program, then derive the job graph without running it
        PackagedProgram program = context.toPackagedProgram(config);
        JobGraph jobGraph = context.toJobGraph(program, config, /* suppressOutput */ false);
        
        // Validate job graph
        validateJobGraph(jobGraph);
        
        // Execute or further process
        executeProgram(program, config);
    }
    
    private ClassLoader createJarClassLoader(Path jarFile) throws MalformedURLException {
        // Custom class loader creation logic, e.g. for inspecting JAR contents
        return URLClassLoader.newInstance(
            new URL[]{jarFile.toUri().toURL()},
            Thread.currentThread().getContextClassLoader()
        );
    }
    
    private void validateJobGraph(JobGraph jobGraph) {
        // Custom validation logic
        if (jobGraph.getNumberOfVertices() == 0) {
            throw new IllegalArgumentException("Job graph is empty");
        }
    }
    
    private void executeProgram(PackagedProgram program, Configuration config) {
        // Custom execution logic
    }
}

Error Handling Integration

// Comprehensive error handling setup
public class WebServerSetup {
    public void setupPipeline(SocketChannel ch) {
        ChannelPipeline pipeline = ch.pipeline();
        
        // Add request processing handlers
        pipeline.addLast("httpRequestHandler", new HttpRequestHandler(uploadDir));
        
        // Add business logic handlers
        pipeline.addLast("jarUploadHandler", jarUploadHandler);
        pipeline.addLast("jarRunHandler", jarRunHandler);
        
        // Add final error handler
        PipelineErrorHandler errorHandler = new PipelineErrorHandler(logger);
        pipeline.addLast("pipelineErrorHandler", errorHandler);
    }
}

// Custom error handling in JAR operations
public void handleJarOperation() {
    try {
        JarHandlerContext context = JarHandlerContext.fromRequest(request, jarDir, logger);
        PackagedProgram program = context.toPackagedProgram(config);
        JobGraph jobGraph = context.toJobGraph(program, config, /* suppressOutput */ false);
        // Process job graph
    } catch (ProgramInvocationException e) {
        logger.error("Failed to create program or job graph from JAR", e);
        throw new BadRequestException("Invalid JAR or entry class: " + e.getMessage());
    } catch (Exception e) {
        logger.error("Failed to process JAR", e);
        throw new InternalServerErrorException("JAR processing failed");
    }
}

These utilities and extensions provide the foundation for building custom web interfaces and extending Flink's web capabilities while maintaining consistency with the core framework.

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-runtime-web-2-12
