Web-based monitoring and management interface for Apache Flink with REST APIs and Angular dashboard.
Standalone server providing web interface and REST API for completed job analysis. The History Server allows operators to browse and analyze historical job execution data without requiring access to a running Flink cluster.
Entry point for the standalone history server with complete lifecycle management.
/**
* Main entry point for history server providing web interface for completed jobs
*/
public class HistoryServer {
/**
* Create history server with configuration
* @param config Flink configuration with history server settings
*/
public HistoryServer(Configuration config);
/**
* Start the history server and web interface
* @throws Exception if server fails to start
*/
public void start() throws Exception;
/**
* Stop the history server and cleanup resources
* @throws Exception if server fails to stop cleanly
*/
public void stop() throws Exception;
/**
* Get the bound web server port
* @return port number the server is listening on
*/
public int getWebPort();
/**
* Main entry point for standalone execution
* @param args command line arguments
* @throws Exception if server fails to start
*/
public static void main(String[] args) throws Exception;
}

Usage Example:
// Start a history server programmatically
Configuration config = new Configuration();
config.set(HistoryServerOptions.HISTORY_SERVER_WEB_PORT, 8082); // the port option is integer-typed
config.setString(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS, "/path/to/job/archives");
config.set(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL, Duration.ofMinutes(10));
HistoryServer historyServer = new HistoryServer(config);
historyServer.start();
System.out.println("History server running on port: " + historyServer.getWebPort());
// Stop when done
historyServer.stop();

# Start from command line
java -cp flink-dist.jar org.apache.flink.runtime.webmonitor.history.HistoryServer \
--configDir /path/to/flink/conf

Fetches and caches job archives from configured directories with automatic refresh capabilities.
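To make the refresh behavior concrete, the following is a minimal sketch of a polling loop that reports directory entries it has not seen before. It uses only the JDK and is not Flink's `HistoryServerArchiveFetcher`; the class name `ArchivePollerSketch` and its methods are hypothetical, and it assumes archives live on the local file system.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical polling loop for illustration; not part of Flink. */
final class ArchivePollerSketch {
    private final Path archiveDir;
    private final Set<String> seen = new HashSet<>();

    ArchivePollerSketch(Path archiveDir) {
        this.archiveDir = archiveDir;
    }

    /** Returns the names of entries that were not present in any earlier scan. */
    synchronized List<String> pollOnce() {
        List<String> fresh = new ArrayList<>();
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(archiveDir)) {
            for (Path entry : entries) {
                String name = entry.getFileName().toString();
                if (seen.add(name)) { // add() is true only for unseen names
                    fresh.add(name);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return fresh;
    }

    /** Runs pollOnce with a fixed delay; the caller must shut the executor down. */
    ScheduledExecutorService startPolling(long intervalMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleWithFixedDelay(() -> {
            try {
                for (String name : pollOnce()) {
                    System.out.println("New archive: " + name);
                }
            } catch (UncheckedIOException e) {
                // Swallow the error so the next scheduled scan still runs.
                System.err.println("Scan failed, will retry: " + e.getMessage());
            }
        }, 0, intervalMillis, TimeUnit.MILLISECONDS);
        return executor;
    }

    public static void main(String[] args) {
        Path dir = Paths.get(args.length > 0 ? args[0] : ".");
        System.out.println("Entries found on first scan: " + new ArchivePollerSketch(dir).pollOnce());
    }
}
```

A fixed delay (rather than a fixed rate) is used here so that a slow scan never overlaps with the next one; whether the real fetcher makes the same choice is not stated in this document.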
/**
* Fetches and caches job archives from configured directories
*/
public class HistoryServerArchiveFetcher {
/**
* Create archive fetcher for specified directories
* @param refreshInterval how often to check for new archives
* @param archiveDirs directories containing job archives
* @param webDir directory for extracted web files
* @param counters metrics counters for monitoring
*/
public HistoryServerArchiveFetcher(
Duration refreshInterval,
List<HistoryServer.ArchiveConfiguration> archiveDirs,
File webDir,
HistoryServer.Counters counters
);
/**
* Start the archive fetcher background process
*/
public void start();
/**
* Stop the archive fetcher and cleanup
*/
public void stop();
}

Serves static files for the history server web UI, including HTML, CSS, JavaScript, and other assets.
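A static file handler typically has to map a request path onto a file under the web directory without letting `..` segments escape it. The sketch below shows one common way to do that with `java.nio.file.Path`. It is an illustration only, not the logic of `HistoryServerStaticFileServerHandler`; the class `StaticPathResolverSketch` and the `index.html` fallback for the root path are assumptions.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Optional;

/** Hypothetical helper for illustration; not part of Flink. */
final class StaticPathResolverSketch {

    /**
     * Maps a request path to a file under webDir.
     * Returns an empty Optional when the path would leave webDir.
     */
    static Optional<Path> resolve(Path webDir, String requestPath) {
        String relative = requestPath;
        while (relative.startsWith("/")) { // drop leading slashes so resolve() treats it as relative
            relative = relative.substring(1);
        }
        if (relative.isEmpty()) {
            relative = "index.html"; // assumed default document
        }
        Path root = webDir.toAbsolutePath().normalize();
        Path candidate = root.resolve(relative).normalize();
        // Path.startsWith compares whole name elements, not string prefixes.
        return candidate.startsWith(root) ? Optional.of(candidate) : Optional.empty();
    }

    public static void main(String[] args) {
        Path webDir = Paths.get("/tmp/flink-web-history");
        System.out.println(resolve(webDir, "/index.html"));
        System.out.println(resolve(webDir, "/../etc/passwd"));
    }
}
```

Normalizing before the containment check matters: without it, a path such as `/assets/../../secret` would still begin with the web directory textually while pointing outside it.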
/**
* Serves static files for history server web UI
*/
public class HistoryServerStaticFileServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
/**
* Create static file server handler
* @param webDir directory containing web assets
*/
public HistoryServerStaticFileServerHandler(File webDir);
/**
* Handle HTTP request for static file
* @param ctx channel handler context
* @param request HTTP request
*/
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request);
}

/**
* Configuration options for History Server
*/
public class HistoryServerOptions {
/**
* Port for the history server web interface
* Default: 8082
*/
public static final ConfigOption<Integer> HISTORY_SERVER_WEB_PORT;
/**
* Comma-separated list of directories containing job archives
*/
public static final ConfigOption<String> HISTORY_SERVER_ARCHIVE_DIRS;
/**
* Interval for refreshing job archives
* Default: 10 seconds
*/
public static final ConfigOption<Duration> HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL;
/**
* Directory for caching extracted web files
*/
public static final ConfigOption<String> HISTORY_SERVER_WEB_DIR;
/**
* SSL keystore file path for HTTPS
*/
public static final ConfigOption<String> HISTORY_SERVER_WEB_SSL_KEYSTORE;
/**
* SSL keystore password
*/
public static final ConfigOption<String> HISTORY_SERVER_WEB_SSL_KEYSTORE_PASSWORD;
/**
* SSL key password
*/
public static final ConfigOption<String> HISTORY_SERVER_WEB_SSL_KEY_PASSWORD;
}

Configuration Example:
# flink-conf.yaml
historyserver.web.port: 8082
historyserver.archive.fs.dir: hdfs://namenode:port/path/to/archives,file:///local/path/to/archives
historyserver.archive.fs.refresh-interval: 10s
historyserver.web.tmpdir: /tmp/flink-web-history

The History Server provides the following REST endpoints for programmatic access:
GET /config

Returns the server configuration, including archive directories and refresh intervals.
Response Example:
{
"refresh-interval": 10000,
"archive-dirs": [
"/path/to/archives1",
"/path/to/archives2"
],
"timezone-name": "UTC",
"timezone-offset": 0
}

GET /jobs/overview

Returns an overview of all completed jobs available in the archives.
Response Example:
{
"jobs": [
{
"jid": "job-id-1",
"name": "My Flink Job",
"state": "FINISHED",
"start-time": 1609459200000,
"end-time": 1609462800000,
"duration": 3600000,
"last-modification": 1609462800000,
"tasks": {
"total": 4,
"created": 0,
"scheduled": 0,
"deploying": 0,
"running": 0,
"finished": 4,
"canceling": 0,
"canceled": 0,
"failed": 0,
"reconciling": 0
}
}
]
}

GET /jobs/:jobid
GET /jobs/:jobid/config
GET /jobs/:jobid/exceptions
GET /jobs/:jobid/accumulators
GET /jobs/:jobid/vertices/:vertexid
GET /jobs/:jobid/vertices/:vertexid/subtasks/:subtaskindex

Returns detailed information about a specific completed job, including configuration, exceptions, metrics, and execution details.
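As a rough illustration of calling these endpoints from code, the sketch below builds the per-job URIs listed above and fetches one with the JDK's `java.net.http.HttpClient` (Java 11+). The class `HistoryServerEndpointsSketch`, the host, the port, and the job and vertex IDs in `main` are all placeholders, and the sketch assumes plain HTTP and IDs that need no URL encoding.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Hypothetical client-side helper for illustration; not part of Flink. */
final class HistoryServerEndpointsSketch {
    private final URI base;

    HistoryServerEndpointsSketch(String host, int port) {
        this.base = URI.create("http://" + host + ":" + port);
    }

    URI config() {
        return base.resolve("/config");
    }

    URI job(String jobId) {
        return base.resolve("/jobs/" + jobId);
    }

    /** detail is one of "config", "exceptions" or "accumulators". */
    URI jobDetail(String jobId, String detail) {
        return base.resolve("/jobs/" + jobId + "/" + detail);
    }

    URI vertex(String jobId, String vertexId) {
        return base.resolve("/jobs/" + jobId + "/vertices/" + vertexId);
    }

    URI subtask(String jobId, String vertexId, int subtaskIndex) {
        return base.resolve("/jobs/" + jobId + "/vertices/" + vertexId + "/subtasks/" + subtaskIndex);
    }

    /** Performs a blocking GET and returns the response body as text. */
    static String fetch(URI uri) throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        return response.body();
    }

    public static void main(String[] args) {
        // Placeholder host, port and IDs; this only prints URIs and makes no network call.
        HistoryServerEndpointsSketch endpoints = new HistoryServerEndpointsSketch("localhost", 8082);
        System.out.println(endpoints.config());
        System.out.println(endpoints.jobDetail("job-id-1", "exceptions"));
        System.out.println(endpoints.subtask("job-id-1", "vertex-id-1", 0));
    }
}
```

Calling `HistoryServerEndpointsSketch.fetch(endpoints.config())` against a running server would return the JSON body as a string; parsing it is left to whichever JSON library the application already uses.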
# Start history server as standalone process
$FLINK_HOME/bin/historyserver.sh start
# Stop history server
$FLINK_HOME/bin/historyserver.sh stop
# Start with custom configuration
$FLINK_HOME/bin/historyserver.sh start-foreground \
--configDir /custom/flink/conf

// Embed history server in application
public class MyFlinkMonitor {
private HistoryServer historyServer;
public void startMonitoring() throws Exception {
Configuration config = new Configuration();
config.set(HistoryServerOptions.HISTORY_SERVER_WEB_PORT, 8082); // the port option is integer-typed
config.setString(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS,
"hdfs://namenode:9000/flink/completed-jobs");
historyServer = new HistoryServer(config);
historyServer.start();
System.out.println("History monitoring available at: http://localhost:" +
historyServer.getWebPort());
}
public void stopMonitoring() throws Exception {
if (historyServer != null) {
historyServer.stop();
}
}
}

The History Server expects job archives in the following structure:
archive-directory/
├── job-id-1/
│ ├── job_graph.json
│ ├── job_config.json
│ ├── job_exceptions.json
│ └── web/
│ ├── index.html
│ └── assets/
└── job-id-2/
├── job_graph.json
└── ...

The History Server maintains internal counters for monitoring. These can be accessed programmatically or through JMX when enabled.