Apache Flink Web Dashboard - Provides a web-based user interface for monitoring and managing Apache Flink jobs and runtime.
—
Standalone server functionality for viewing archived job information and serving static files. The History Server provides long-term job monitoring and analysis capabilities, allowing users to explore completed jobs through a web interface.
Main class for the standalone history server that serves archived job information and web dashboard files.
/**
* Standalone history server for viewing archived Flink job information.
* Provides web interface for exploring completed jobs and their execution details.
*/
public class HistoryServer {
/**
* Create a history server with the specified configuration.
*
* @param configuration Flink configuration containing history server settings
*/
public HistoryServer(Configuration configuration);
/**
* Create a history server with configuration and event listener.
*
* @param configuration Flink configuration containing history server settings
* @param eventListener Consumer for archive events (job discovery, updates, etc.)
*/
public HistoryServer(Configuration configuration, Consumer<ArchiveEvent> eventListener);
/**
* Main entry point for starting the history server as a standalone application.
* Reads configuration from command line arguments and system properties.
*
* @param args Command line arguments for server configuration
*/
public static void main(String[] args);
/**
* Start the history server and begin serving requests.
* This method blocks until the server is shut down.
*/
public void run();
}Manages the fetching and processing of job archives for the history server.
/**
* Fetches and manages job archives for the history server.
* Handles discovery, downloading, and processing of archived job information.
*/
public class HistoryServerArchiveFetcher {
/**
* Create an archive fetcher with the specified configuration.
*
* @param refreshDirs List of refresh locations containing paths and file systems
* @param webDir Directory for storing processed web dashboard files
* @param jobArchiveEventListener Consumer for archive events (created/deleted)
* @param cleanupExpiredArchives Whether to clean up expired job archives
* @param maxHistorySize Maximum number of jobs to retain in history (-1 for unlimited)
*/
public HistoryServerArchiveFetcher(
List<HistoryServer.RefreshLocation> refreshDirs,
File webDir,
Consumer<ArchiveEvent> jobArchiveEventListener,
boolean cleanupExpiredArchives,
int maxHistorySize
);
/**
* Fetch archives from all configured directories.
* Scans refresh locations for new job archives and processes them.
*/
void fetchArchives();
/**
* Event representing archive operations in the history server.
*/
public static class ArchiveEvent {
public ArchiveEvent(String jobID, ArchiveEventType operation);
public String getJobID();
public ArchiveEventType getType();
}
/**
* Types of archive events that can occur.
*/
public enum ArchiveEventType {
/** Job archive was created in history server. */
CREATED,
/** Job archive was deleted from history server. */
DELETED
}
}Netty handler for serving static files from the web dashboard.
/**
* Netty channel handler for serving static files for the history server web interface.
* Handles HTTP requests for static assets like HTML, CSS, JavaScript, and images.
*/
public class HistoryServerStaticFileServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
/**
* Create a static file server handler.
*
* @param webRootDir Root directory containing static web files
*/
public HistoryServerStaticFileServerHandler(File webRootDir);
/**
* Handle incoming HTTP requests for static files.
* Serves files from the configured web root directory.
*
* @param ctx Netty channel handler context
* @param request HTTP request for static file
*/
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request);
}import org.apache.flink.runtime.webmonitor.history.HistoryServer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HistoryServerOptions;
// Configure history server
Configuration config = new Configuration();
config.setString(HistoryServerOptions.HISTORY_SERVER_WEB_ADDRESS, "0.0.0.0");
config.setInteger(HistoryServerOptions.HISTORY_SERVER_WEB_PORT, 8082);
config.setString(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS, "/path/to/archives");
config.setLong(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL, 10000L);
// Start history server
HistoryServer historyServer = new HistoryServer(config);
historyServer.run(); // Blocks until shutdownimport org.apache.flink.runtime.webmonitor.history.HistoryServer;
import java.util.function.Consumer;
// Create event listener for archive events
Consumer<ArchiveEvent> eventListener = event -> {
switch (event.getType()) {
case ARCHIVE_DISCOVERED:
System.out.println("New archive discovered: " + event.getArchivePath());
break;
case ARCHIVE_UPDATED:
System.out.println("Archive updated: " + event.getArchivePath());
break;
case ARCHIVE_REMOVED:
System.out.println("Archive removed: " + event.getArchivePath());
break;
}
};
// Create history server with event listener
HistoryServer historyServer = new HistoryServer(config, eventListener);
historyServer.run();// Run as standalone application
public class HistoryServerApplication {
public static void main(String[] args) {
// Pass configuration through command line arguments
// Example: --historyserver.web.address=0.0.0.0 --historyserver.web.port=8082
HistoryServer.main(args);
}
}import org.apache.flink.runtime.webmonitor.history.HistoryServerArchiveFetcher;
import java.io.File;
import java.util.Arrays;
import java.util.List;
// Configure archive directories
List<HistoryServerArchiveFetcher.ArchiveDirectory> archiveDirs = Arrays.asList(
new HistoryServerArchiveFetcher.ArchiveDirectory("/path/to/local/archives"),
new HistoryServerArchiveFetcher.ArchiveDirectory("hdfs://namenode:port/flink-archives"),
new HistoryServerArchiveFetcher.ArchiveDirectory("s3://bucket/flink-archives")
);
// Create archive fetcher
HistoryServerArchiveFetcher fetcher = new HistoryServerArchiveFetcher(
10000L, // Refresh every 10 seconds
archiveDirs, // Archive directories to scan
new File("/web/dashboard"), // Web dashboard files
eventListener // Event listener for archive changes
);
// Start fetcher
fetcher.start();
// Stop when done
fetcher.stop();import org.apache.flink.runtime.webmonitor.history.HistoryServerStaticFileServerHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline;
import java.io.File;
// Setup static file serving in Netty pipeline
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// Add static file handler for web dashboard assets
File webRoot = new File("/path/to/web-dashboard");
HistoryServerStaticFileServerHandler staticHandler =
new HistoryServerStaticFileServerHandler(webRoot);
pipeline.addLast("staticFileHandler", staticHandler);
}The History Server supports extensive configuration through Flink's configuration system:
// Common configuration options
Configuration config = new Configuration();
// Server binding
config.setString(HistoryServerOptions.HISTORY_SERVER_WEB_ADDRESS, "localhost");
config.setInteger(HistoryServerOptions.HISTORY_SERVER_WEB_PORT, 8082);
// Archive locations
config.setString(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS,
"/local/archives,hdfs://namenode/archives");
// Refresh settings
config.setLong(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL, 10000L);
// Web dashboard location
config.setString(HistoryServerOptions.HISTORY_SERVER_WEB_DIR, "/path/to/web-dashboard");
// SSL configuration (optional)
config.setBoolean(SecurityOptions.SSL_ENABLED, true);
config.setString(SecurityOptions.SSL_KEYSTORE, "/path/to/keystore");
config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password");The History Server supports event-driven architecture for archive management:
// Archive event types available to event listeners
public enum ArchiveEventType {
ARCHIVE_DISCOVERED, // New archive found
ARCHIVE_UPDATED, // Existing archive updated
ARCHIVE_REMOVED // Archive no longer available
}
// Event handling example
Consumer<ArchiveEvent> listener = event -> {
String archivePath = event.getArchivePath();
JobID jobId = event.getJobId();
ArchiveEventType type = event.getType();
// Custom handling based on event type
handleArchiveEvent(type, archivePath, jobId);
};The History Server includes robust error handling for common scenarios:
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-runtime-web-2-12