Web-based monitoring and management interface for Apache Flink with REST APIs and Angular dashboard.
—
A modern TypeScript dashboard that provides real-time monitoring, job visualization, resource management, and developer tools for Apache Flink clusters. It is built with Angular 18 and NG-ZORRO (Ant Design) components.
// Angular core
import {
  Component,
  ElementRef,
  Injectable,
  Input,
  OnDestroy,
  OnInit,
  Pipe,
  PipeTransform,
  ViewChild
} from '@angular/core';
import { Router, ActivatedRoute } from '@angular/router';
import { HttpClient } from '@angular/common/http';
// Ng-Zorro components
import { NzTableModule } from 'ng-zorro-antd/table';
import { NzCardModule } from 'ng-zorro-antd/card';
import { NzButtonModule } from 'ng-zorro-antd/button';
// Dashboard services
import { JobService } from './services/job.service';
import { TaskManagerService } from './services/task-manager.service';
import { StatusService } from './services/status.service';
// Component setup
/**
 * Job list table.
 *
 * Renders one row per entry of `jobs` (name, state badge, start time) with a
 * "View" button that navigates to the job's detail route. Jobs are fetched
 * from `JobService.getJobs()` when the component initializes.
 */
@Component({
  selector: 'flink-job-list',
  template: `
<nz-table [nzData]="jobs" [nzLoading]="loading">
<thead>
<tr>
<th>Job Name</th>
<th>Status</th>
<th>Start Time</th>
<th>Actions</th>
</tr>
</thead>
<tbody>
<tr *ngFor="let job of jobs">
<td>{{ job.name }}</td>
<td><flink-job-badge [status]="job.state"></flink-job-badge></td>
<td>{{ job['start-time'] | humanizeDate }}</td>
<td>
<button nz-button (click)="viewJob(job.jid)">View</button>
</td>
</tr>
</tbody>
</nz-table>
`
})
export class JobListComponent implements OnInit {
  /** Rows currently shown in the table. */
  jobs: JobOverview[] = [];
  /** Drives the table's loading indicator while a fetch is in flight. */
  loading = false;

  constructor(private jobService: JobService, private router: Router) {}

  /** Triggers the initial job fetch. */
  ngOnInit() {
    this.loadJobs();
  }

  /**
   * Fetches the job list and replaces `jobs` with the result.
   *
   * `loading` is cleared on both success and failure; previously a failed
   * request left the table stuck in its loading state. On failure the
   * existing rows are kept.
   */
  loadJobs() {
    this.loading = true;
    this.jobService.getJobs().subscribe({
      next: jobs => {
        this.jobs = jobs;
        this.loading = false;
      },
      error: (err: unknown) => {
        this.loading = false;
        // Re-throw so the failure is still reported as an unhandled error,
        // exactly as it was before this handler existed.
        throw err;
      }
    });
  }

  /**
   * Navigates to the detail route of one job.
   * @param jobId Job id used as the second segment of the `/job/<id>` route.
   */
  viewJob(jobId: string) {
    this.router.navigate(['/job', jobId]);
  }
}
The Angular dashboard follows a modular architecture with:
// Route configuration
/**
 * Top-level route table.
 *
 * The empty path redirects to `/overview`; every other entry lazy-loads its
 * page through a dynamic `import()` (single components via `loadComponent`,
 * the job section via `loadChildren`).
 */
export const routes: Routes = [
  {
    path: '',
    redirectTo: '/overview',
    pathMatch: 'full'
  },
  {
    path: 'overview',
    loadComponent: () => import('./pages/overview/overview.component')
  },
  {
    path: 'submit',
    loadComponent: () => import('./pages/submit/submit.component')
  },
  {
    path: 'job-manager',
    loadComponent: () => import('./pages/job-manager/job-manager.component')
  },
  {
    path: 'task-manager',
    loadComponent: () => import('./pages/task-manager/task-manager.component')
  },
  {
    path: 'job',
    loadChildren: () => import('./pages/job/job.routes')
  }
];
/**
* Root application component with navigation sidebar
*/
// Template layout: a collapsible <nz-sider> holding the logo and the inline
// navigation menu (Overview, Submit Job), next to a nested layout whose header
// carries the collapse toggle icon and whose content hosts <router-outlet>.
@Component({
selector: 'flink-root',
template: `
<nz-layout class="main-layout">
<nz-sider [nzCollapsed]="isCollapsed" [nzTrigger]="null">
<div class="sidebar-logo">
<img src="assets/flink-logo.svg" alt="Apache Flink">
</div>
<ul nz-menu nzMode="inline" [nzInlineCollapsed]="isCollapsed">
<li nz-menu-item routerLink="/overview">
<i nz-icon nzType="dashboard"></i>
<span>Overview</span>
</li>
<li nz-menu-item routerLink="/submit">
<i nz-icon nzType="upload"></i>
<span>Submit Job</span>
</li>
<!-- Additional menu items -->
</ul>
</nz-sider>
<nz-layout>
<nz-header>
<i nz-icon [nzType]="isCollapsed ? 'menu-unfold' : 'menu-fold'"
(click)="isCollapsed = !isCollapsed"></i>
</nz-header>
<nz-content>
<router-outlet></router-outlet>
</nz-content>
</nz-layout>
</nz-layout>
`
})
export class AppComponent {
// Sidebar collapse flag. Bound to the sider's [nzCollapsed] and the menu's
// [nzInlineCollapsed]; flipped by clicking the header icon, which also switches
// the icon between 'menu-unfold' (collapsed) and 'menu-fold' (expanded).
isCollapsed = false;
}/**
* Comprehensive job operations and monitoring service
*/
// Registered with the root injector (providedIn: 'root').
// NOTE(review): this listing shows method signatures only; no method bodies are
// visible, so request URLs and error behavior cannot be confirmed from here.
@Injectable({
providedIn: 'root'
})
export class JobService {
// HttpClient is stored as a private member; presumably used by the method
// bodies that this listing omits.
constructor(private http: HttpClient) {}
/**
 * Get list of all jobs (running and completed).
 * Consumed in this file by JobListComponent.loadJobs().
 * @returns Observable of JobOverview entries.
 */
getJobs(): Observable<JobOverview[]>;
/**
 * Get detailed information for a specific job.
 * @param jobId Id of the job to look up.
 * @returns Observable of the job's JobDetail.
 */
getJobDetail(jobId: string): Observable<JobDetail>;
/**
 * Get job configuration.
 * @param jobId Id of the job to look up.
 * @returns Observable of the job's JobConfig.
 */
getJobConfig(jobId: string): Observable<JobConfig>;
/**
 * Get job execution plan/graph.
 * @param jobId Id of the job to look up.
 * @returns Observable of the job's Plan (nodes and edges).
 */
getJobPlan(jobId: string): Observable<Plan>;
/**
 * Get job metrics and performance data.
 * @param jobId Id of the job to look up.
 * @returns Observable of JobMetrics.
 */
getJobMetrics(jobId: string): Observable<JobMetrics>;
/**
 * Get job checkpoints information.
 * @param jobId Id of the job to look up.
 * @returns Observable of JobCheckpoint.
 */
getJobCheckpoints(jobId: string): Observable<JobCheckpoint>;
/**
 * Get job exceptions and error information.
 * @param jobId Id of the job to look up.
 * @returns Observable of JobException (type declared outside this listing).
 */
getJobExceptions(jobId: string): Observable<JobException>;
/**
 * Get backpressure information for job.
 * @param jobId Id of the job to look up.
 * @returns Observable of JobBackpressure.
 */
getJobBackpressure(jobId: string): Observable<JobBackpressure>;
/**
 * Get flame graph data for performance analysis.
 * Consumed in this file by FlameGraphComponent.loadFlameGraph().
 * @param jobId Id of the job that owns the vertex.
 * @param vertexId Id of the job vertex to sample.
 * @returns Observable of JobFlameGraph.
 */
getJobFlameGraph(jobId: string, vertexId: string): Observable<JobFlameGraph>;
/**
 * Cancel a running job.
 * @param jobId Id of the job to cancel.
 * @param mode Optional; either 'cancel' or 'stop'. NOTE(review): the behavior
 * when omitted is not visible in this listing.
 * @returns Observable with no payload.
 */
cancelJob(jobId: string, mode?: 'cancel' | 'stop'): Observable<void>;
/**
 * Trigger savepoint for job.
 * @param jobId Id of the job to snapshot.
 * @param targetDirectory Optional savepoint directory. NOTE(review): the
 * default location when omitted is not visible in this listing.
 * @returns Observable of SavepointInfo (type declared outside this listing).
 */
triggerSavepoint(jobId: string, targetDirectory?: string): Observable<SavepointInfo>;
}/**
* Task Manager monitoring and management service
*/
// Registered with the root injector (providedIn: 'root').
// NOTE(review): this listing shows method signatures only; request URLs and
// error behavior cannot be confirmed from here.
@Injectable({
providedIn: 'root'
})
export class TaskManagerService {
// HttpClient is stored as a private member; presumably used by the method
// bodies that this listing omits.
constructor(private http: HttpClient) {}
/**
 * Get list of all task managers.
 * @returns Observable of TaskManager entries.
 */
getTaskManagers(): Observable<TaskManager[]>;
/**
 * Get detailed task manager information.
 * Consumed in this file by TaskManagerDetailComponent.loadTaskManagerDetail().
 * @param taskManagerId Id of the task manager to look up.
 * @returns Observable of TaskManagerDetail (type declared outside this listing).
 */
getTaskManagerDetail(taskManagerId: string): Observable<TaskManagerDetail>;
/**
 * Get task manager metrics.
 * @param taskManagerId Id of the task manager to look up.
 * @returns Observable of TaskManagerMetrics (type declared outside this listing).
 */
getTaskManagerMetrics(taskManagerId: string): Observable<TaskManagerMetrics>;
/**
 * Get task manager logs.
 * @param taskManagerId Id of the task manager to look up.
 * @param logType Which log to fetch: 'out' or 'log'.
 * @returns Observable of the log content as a string.
 */
getTaskManagerLogs(taskManagerId: string, logType: 'out' | 'log'): Observable<string>;
/**
 * Get thread dump for task manager.
 * @param taskManagerId Id of the task manager to look up.
 * @returns Observable of ThreadDump (type declared outside this listing).
 */
getTaskManagerThreadDump(taskManagerId: string): Observable<ThreadDump>;
/**
 * Get profiling information.
 * @param taskManagerId Id of the task manager to look up.
 * @returns Observable of ProfilingResult (type declared outside this listing).
 */
getTaskManagerProfile(taskManagerId: string): Observable<ProfilingResult>;
}/**
* Cluster overview and statistics service
*/
// Registered with the root injector (providedIn: 'root').
// NOTE(review): this listing shows method signatures only; request URLs and
// error behavior cannot be confirmed from here.
@Injectable({
providedIn: 'root'
})
export class OverviewService {
// HttpClient is stored as a private member; presumably used by the method
// bodies that this listing omits.
constructor(private http: HttpClient) {}
/**
 * Get cluster overview with job and resource statistics.
 * @returns Observable of Overview (task manager, slot and job counters plus
 * version strings).
 */
getOverview(): Observable<Overview>;
/**
 * Get cluster configuration information.
 * @returns Observable of Configuration (type declared outside this listing).
 */
getConfiguration(): Observable<Configuration>;
}/**
* Application status, configuration, and error messaging service
*/
// Registered with the root injector (providedIn: 'root').
// NOTE(review): this listing shows method signatures only; how notifications
// are rendered cannot be confirmed from here.
@Injectable({
  providedIn: 'root'
})
export class StatusService {
  constructor(private http: HttpClient) {}

  /**
   * Get application configuration and feature flags.
   * @returns Observable of DashboardConfiguration (type declared outside this listing).
   */
  getConfig(): Observable<DashboardConfiguration>;

  /**
   * Handle and display error messages.
   *
   * The parameter is typed `unknown` rather than `any`: callers may still pass
   * anything (HTTP error responses, `Error` instances, strings), but an
   * implementation has to narrow the value before reading properties from it.
   * @param error The value that was thrown or emitted as an error.
   */
  handleError(error: unknown): void;

  /**
   * Show success notification.
   * @param message Text to display.
   */
  showSuccess(message: string): void;

  /**
   * Show error notification.
   * @param message Text to display.
   */
  showError(message: string): void;
}
/**
* Job overview information for listing
*/
export interface JobOverview {
// Job id; JobListComponent passes it to viewJob(), which navigates to ['/job', jid].
jid: string;
// Shown in the "Job Name" column of the job list.
name: string;
// Bound to <flink-job-badge [status]> in the job list.
state: JobState;
// Rendered through the humanizeDate pipe, which feeds the number to `new Date(value)`;
// presumably epoch milliseconds — TODO confirm.
'start-time': number;
// NOTE(review): unit and "not finished" representation are not visible in this listing.
'end-time': number;
// NOTE(review): unit is not visible in this listing.
duration: number;
// NOTE(review): unit is not visible in this listing.
'last-modification': number;
// Per-status task counters; TaskStatusCounts is declared outside this listing.
tasks: TaskStatusCounts;
}
/**
 * Detailed job information.
 * Extends JobOverview, so every listing field (jid, name, state, timestamps,
 * tasks) is present as well. Returned by JobService.getJobDetail() and accepted
 * by DagreComponent's `jobDetail` input.
 */
export interface JobDetail extends JobOverview {
// Vertices of the job; same element type FlameGraphComponent takes as `vertices`.
vertices: JobVertex[];
// Same type as the inherited `tasks` field; NOTE(review): how the two differ
// is not visible in this listing.
'status-counts': TaskStatusCounts;
// Execution plan (nodes and edges) of the job.
plan: Plan;
// JobTimestamps is declared outside this listing.
timestamps: JobTimestamps;
}
/**
 * Job vertex information for execution graph.
 */
export interface JobVertex {
// Vertex id; FlameGraphComponent uses it as the select option value and passes
// it as `vertexId` to JobService.getJobFlameGraph().
id: string;
// Used as the option label in FlameGraphComponent's vertex selector.
name: string;
parallelism: number;
// JobVertexState is declared outside this listing.
status: JobVertexState;
// NOTE(review): units of the three timing fields are not visible in this listing.
'start-time': number;
'end-time': number;
duration: number;
// Per-status task counters; TaskStatusCounts is declared outside this listing.
tasks: TaskStatusCounts;
// VertexMetrics is declared outside this listing.
metrics: VertexMetrics;
}
/**
 * Job execution plan/graph.
 * Returned by JobService.getJobPlan() and rendered by DagreComponent.
 */
export interface Plan {
jid: string;
name: string;
// DagreComponent reads `id` (graph node key) and `description` (node label)
// from each entry; PlanNode itself is declared outside this listing.
nodes: PlanNode[];
// DagreComponent reads `source` and `target` from each entry to connect nodes;
// PlanEdge itself is declared outside this listing.
edges: PlanEdge[];
}
/**
 * Job configuration.
 * Returned by JobService.getJobConfig().
 */
export interface JobConfig {
jid: string;
name: string;
// ExecutionConfig is declared outside this listing.
'execution-config': ExecutionConfig;
}/**
* Job metrics and performance data
*/
export interface JobMetrics {
// Returned by JobService.getJobMetrics(). The element types below
// (VertexMetrics, WatermarkInfo, ThroughputInfo) are declared outside this listing.
vertices: VertexMetrics[];
watermarks: WatermarkInfo[];
throughput: ThroughputInfo[];
}
/**
 * Job checkpoint information.
 * Returned by JobService.getJobCheckpoints().
 */
export interface JobCheckpoint {
// CheckpointCounts and CheckpointSummary are declared outside this listing.
counts: CheckpointCounts;
summary: CheckpointSummary;
history: CheckpointStatistics[];
// Most recent entry per category; each slot is null when there is no such entry.
latest: {
completed: CheckpointStatistics | null;
savepoint: CheckpointStatistics | null;
failed: CheckpointStatistics | null;
restored: CheckpointStatistics | null;
};
}
/**
 * Job backpressure monitoring data.
 * Returned by JobService.getJobBackpressure().
 */
export interface JobBackpressure {
// NOTE(review): `status` and 'backpressure-level' share the same three-value
// union; how they differ is not visible in this listing.
status: 'OK' | 'LOW' | 'HIGH';
'backpressure-level': 'OK' | 'LOW' | 'HIGH';
// NOTE(review): unit is not visible in this listing.
'end-timestamp': number;
// SubtaskBackpressure is declared outside this listing.
subtasks: SubtaskBackpressure[];
}
/**
 * Flame graph data for performance analysis.
 * Returned by JobService.getJobFlameGraph().
 */
export interface JobFlameGraph {
// Bound as the D3 datum in FlameGraphComponent.renderFlameGraph();
// FlameGraphNode is declared outside this listing.
data: FlameGraphNode[];
// NOTE(review): unit is not visible in this listing.
endTime: number;
maxStackDepth: number;
}/**
* Task Manager information and status
*/
export interface TaskManager {
// Element type of the list returned by TaskManagerService.getTaskManagers().
id: string;
path: string;
dataPort: number;
jmxPort: number;
// NOTE(review): unit is not visible in this listing.
timeSinceLastHeartbeat: number;
// Total and currently free slot counts.
slotsNumber: number;
freeSlots: number;
// ResourceProfile, HardwareDescription and MemoryConfiguration are declared
// outside this listing.
totalResource: ResourceProfile;
freeResource: ResourceProfile;
hardware: HardwareDescription;
memoryConfiguration: MemoryConfiguration;
}
/**
 * Cluster overview statistics.
 * Returned by OverviewService.getOverview() and held by OverviewComponent.
 */
export interface Overview {
// Shown on the "Task Managers" card of OverviewComponent.
taskmanagers: number;
// Shown as the suffix ("/ total") on the "Available Slots" card.
'slots-total': number;
// Shown as the value on the "Available Slots" card.
'slots-available': number;
// Shown on the "Running Jobs" card.
'jobs-running': number;
'jobs-finished': number;
'jobs-cancelled': number;
'jobs-failed': number;
'flink-version': string;
'flink-commit': string;
}Real-time cluster monitoring with key metrics and navigation.
/**
 * Overview dashboard component showing cluster statistics.
 *
 * Displays three statistic cards (task managers, available slots, running
 * jobs) fed by `overview`, followed by a job list.
 *
 * Changes from the earlier revision:
 * - `loadOverview()` and `loadRunningJobs()` are now defined; `ngOnInit`
 *   called them but the class did not declare them.
 * - The `[nzTotal]` binding was dropped from the slots card because
 *   `nz-statistic` has no such input; the total is still shown via `nzSuffix`.
 */
@Component({
  selector: 'flink-overview',
  template: `
<div class="overview-cards">
<nz-card nzTitle="Task Managers">
<nz-statistic [nzValue]="overview?.taskmanagers" nzSuffix="running"></nz-statistic>
</nz-card>
<nz-card nzTitle="Available Slots">
<nz-statistic
[nzValue]="overview?.['slots-available']"
nzSuffix="/ {{ overview?.['slots-total'] }}">
</nz-statistic>
</nz-card>
<nz-card nzTitle="Running Jobs">
<nz-statistic
[nzValue]="overview?.['jobs-running']"
[nzValueStyle]="{ color: '#52c41a' }">
</nz-statistic>
</nz-card>
</div>
<flink-job-list [jobs]="runningJobs"></flink-job-list>
`
})
export class OverviewComponent implements OnInit {
  /** Latest cluster statistics; null until the first response arrives. */
  overview: Overview | null = null;
  /** Jobs whose state is 'RUNNING', handed to the job list. */
  runningJobs: JobOverview[] = [];

  constructor(
    private overviewService: OverviewService,
    private jobService: JobService
  ) {}

  /** Starts both initial fetches. */
  ngOnInit() {
    this.loadOverview();
    this.loadRunningJobs();
  }

  /** Fetches the cluster statistics and stores them in `overview`. */
  loadOverview() {
    this.overviewService.getOverview().subscribe(overview => {
      this.overview = overview;
    });
  }

  /**
   * Fetches all jobs and keeps only those in the 'RUNNING' state.
   * `JobService.getJobs()` returns running and completed jobs alike, so the
   * filter happens here.
   */
  loadRunningJobs() {
    this.jobService.getJobs().subscribe(jobs => {
      this.runningJobs = jobs.filter(job => job.state === 'RUNNING');
    });
  }
}
Interactive job execution graph with detailed metrics and monitoring.
/**
 * Interactive job graph visualization component using D3/Dagre.
 *
 * Builds a Dagre graph from the `plan` input (one fixed-size node per plan
 * node, one edge per plan edge), lays it out and hands it to `drawGraph`.
 *
 * NOTE(review): `initializeSvg`, `setupZoom`, `drawGraph`, `zoomIn`, `zoomOut`,
 * `resetZoom` and `ngOnDestroy` are referenced or implied here but are not
 * part of this listing.
 */
@Component({
  selector: 'flink-dagre',
  template: `
<div class="dagre-container" #dagreContainer>
<svg #dagreSvg></svg>
<div class="dagre-controls">
<button nz-button (click)="zoomIn()">Zoom In</button>
<button nz-button (click)="zoomOut()">Zoom Out</button>
<button nz-button (click)="resetZoom()">Reset</button>
</div>
</div>
`
})
export class DagreComponent implements OnInit, OnDestroy {
  /** Execution plan to draw; nothing is rendered on init while this is null. */
  @Input() plan: Plan | null = null;
  /** Job detail supplied by the parent; not read in the code shown here. */
  @Input() jobDetail: JobDetail | null = null;

  private svg: d3.Selection<SVGElement, unknown, HTMLElement, any>;
  private zoom: d3.ZoomBehavior<SVGElement, unknown>;

  /** Prepares the SVG and zoom handling, then renders if a plan is already set. */
  ngOnInit() {
    this.initializeSvg();
    this.setupZoom();
    if (!this.plan) {
      return;
    }
    this.renderGraph();
  }

  /** Converts the current plan into a laid-out Dagre graph and draws it. */
  private renderGraph() {
    // D3/Dagre graph rendering logic
    const graph = new dagre.graphlib.Graph();
    graph.setGraph({});
    graph.setDefaultEdgeLabel(() => ({}));

    // Add nodes and edges from plan
    if (this.plan) {
      for (const planNode of this.plan.nodes) {
        const nodeBox = { label: planNode.description, width: 200, height: 50 };
        graph.setNode(planNode.id, nodeBox);
      }
    }
    if (this.plan) {
      for (const planEdge of this.plan.edges) {
        graph.setEdge(planEdge.source, planEdge.target);
      }
    }

    dagre.layout(graph);
    this.drawGraph(graph);
  }
}
Comprehensive performance analysis with flame graphs and metrics visualization.
/**
 * Flame graph component for performance analysis.
 *
 * Lets the user pick a job vertex, loads that vertex's flame graph through
 * `JobService.getJobFlameGraph()` and renders it with d3-flame-graph.
 *
 * Changes from the earlier revision:
 * - `flameGraphContainer` is now declared as a `@ViewChild`; it was read in
 *   `renderFlameGraph()` without ever being defined.
 * - `refreshFlameGraph()` is now defined; the Refresh button called it but the
 *   class did not declare it.
 */
@Component({
  selector: 'flink-flame-graph',
  template: `
<div class="flame-graph-container" #flameGraphContainer>
<div class="flame-graph-controls">
<nz-select [(ngModel)]="selectedVertex" (ngModelChange)="loadFlameGraph()">
<nz-option *ngFor="let vertex of vertices"
[nzValue]="vertex.id"
[nzLabel]="vertex.name">
</nz-option>
</nz-select>
<button nz-button (click)="refreshFlameGraph()">Refresh</button>
</div>
<div #flameGraph class="flame-graph"></div>
</div>
`
})
export class FlameGraphComponent implements OnInit {
  /** Id of the job whose vertices are profiled. */
  @Input() jobId: string = '';
  /** Vertices offered in the selector; the first one is selected on init. */
  @Input() vertices: JobVertex[] = [];

  /** Id of the vertex currently selected in the dropdown. */
  selectedVertex: string = '';
  /** Last flame graph received; null until the first response arrives. */
  flameGraphData: JobFlameGraph | null = null;

  /**
   * Outer container from the template. The element is not inside any
   * structural directive, so it can be resolved statically and is available
   * from `ngOnInit` onwards.
   */
  @ViewChild('flameGraphContainer', { static: true })
  private flameGraphContainer!: ElementRef<HTMLElement>;

  constructor(private jobService: JobService) {}

  /** Selects the first vertex (if any) and loads its flame graph. */
  ngOnInit() {
    if (this.vertices.length > 0) {
      this.selectedVertex = this.vertices[0].id;
      this.loadFlameGraph();
    }
  }

  /**
   * Loads and renders the flame graph for the selected vertex.
   * Does nothing while either the job id or the selected vertex is empty.
   */
  loadFlameGraph() {
    if (this.jobId && this.selectedVertex) {
      this.jobService.getJobFlameGraph(this.jobId, this.selectedVertex)
        .subscribe(data => {
          this.flameGraphData = data;
          this.renderFlameGraph();
        });
    }
  }

  /** Handler of the Refresh button: reloads the currently selected vertex. */
  refreshFlameGraph() {
    this.loadFlameGraph();
  }

  /** Draws `flameGraphData` into the `.flame-graph` element of the container. */
  private renderFlameGraph() {
    // D3 flame graph rendering using d3-flame-graph library
    const flameGraph = flamegraph()
      .width(960)
      .cellHeight(18)
      .transitionDuration(750)
      .minFrameSize(5)
      .transitionEase(d3.easeCubic)
      .sort(true)
      .title('')
      .onClick((d: any) => console.info('clicked:', d));
    d3.select(this.flameGraphContainer.nativeElement)
      .select('.flame-graph')
      .datum(this.flameGraphData?.data)
      .call(flameGraph);
  }
}
Task Manager and Job Manager monitoring with detailed metrics and logs.
/**
 * Task Manager detail component with metrics and management.
 *
 * Shows a description card (id, address, free/total slots, managed memory)
 * for one task manager and a tab set with its metrics, log viewer and thread
 * dump. The detail record is fetched once on init for `taskManagerId`.
 */
@Component({
  selector: 'flink-task-manager-detail',
  template: `
<nz-card nzTitle="Task Manager Details">
<nz-descriptions nzBordered>
<nz-descriptions-item nzTitle="ID">{{ taskManager?.id }}</nz-descriptions-item>
<nz-descriptions-item nzTitle="Address">{{ taskManager?.path }}</nz-descriptions-item>
<nz-descriptions-item nzTitle="Slots">
{{ taskManager?.freeSlots }} / {{ taskManager?.slotsNumber }}
</nz-descriptions-item>
<nz-descriptions-item nzTitle="Memory">
{{ taskManager?.hardware?.managedMemory | humanizeBytes }}
</nz-descriptions-item>
</nz-descriptions>
</nz-card>
<nz-tabset>
<nz-tab nzTitle="Metrics">
<flink-task-manager-metrics [taskManagerId]="taskManagerId"></flink-task-manager-metrics>
</nz-tab>
<nz-tab nzTitle="Logs">
<flink-log-viewer
[logUrl]="getLogUrl('log')"
[taskManagerId]="taskManagerId">
</flink-log-viewer>
</nz-tab>
<nz-tab nzTitle="Thread Dump">
<flink-thread-dump [taskManagerId]="taskManagerId"></flink-thread-dump>
</nz-tab>
</nz-tabset>
`
})
export class TaskManagerDetailComponent implements OnInit {
  /** Id of the task manager to display. */
  @Input() taskManagerId: string = '';

  /** Loaded detail record; null until the response arrives. */
  taskManager: TaskManagerDetail | null = null;

  constructor(private taskManagerService: TaskManagerService) {}

  /** Kicks off the detail fetch. */
  ngOnInit() {
    this.loadTaskManagerDetail();
  }

  /** Requests the detail record for `taskManagerId` and stores the result. */
  loadTaskManagerDetail() {
    const detail$ = this.taskManagerService.getTaskManagerDetail(this.taskManagerId);
    detail$.subscribe(detail => {
      this.taskManager = detail;
    });
  }

  /**
   * Builds the relative log URL for this task manager.
   * @param type Which log file to address: 'log' or 'out'.
   * @returns Path of the form `/taskmanagers/<id>/logs/<type>`.
   */
  getLogUrl(type: 'log' | 'out'): string {
    const base = '/taskmanagers/' + this.taskManagerId;
    return base + '/logs/' + type;
  }
}
/**
* Humanize date pipe for readable timestamps
*/
@Pipe({ name: 'humanizeDate' })
export class HumanizeDatePipe implements PipeTransform {
  /**
   * Formats a numeric timestamp with `Date.prototype.toLocaleString()`.
   *
   * @param value Timestamp handed to `new Date(value)`, i.e. interpreted as
   * milliseconds since the Unix epoch.
   * @returns The locale-formatted date, or `'-'` when there is nothing
   * meaningful to show: falsy input (0, NaN, missing), negative input (e.g. a
   * -1 "not set" sentinel — previously rendered as a 1969 date), or a number
   * outside the representable date range (previously "Invalid Date").
   */
  transform(value: number): string {
    if (!value || value < 0) return '-';
    const date = new Date(value);
    // Infinity and other out-of-range numbers yield a Date whose time is NaN.
    if (Number.isNaN(date.getTime())) return '-';
    return date.toLocaleString();
  }
}
/**
 * Humanize duration pipe for time intervals.
 *
 * Shows the two most significant units of a millisecond duration:
 * hours and minutes, minutes and seconds, or seconds alone.
 */
@Pipe({ name: 'humanizeDuration' })
export class HumanizeDurationPipe implements PipeTransform {
  /**
   * @param value Duration in milliseconds.
   * @returns `'-'` for a falsy value, otherwise `"<h>h <m>m"`, `"<m>m <s>s"`
   * or `"<s>s"` depending on the magnitude.
   */
  transform(value: number): string {
    if (!value) {
      return '-';
    }

    const totalSeconds = Math.floor(value / 1000);
    const totalMinutes = Math.floor(totalSeconds / 60);
    const totalHours = Math.floor(totalMinutes / 60);

    if (totalHours > 0) {
      return totalHours + 'h ' + (totalMinutes % 60) + 'm';
    }
    if (totalMinutes > 0) {
      return totalMinutes + 'm ' + (totalSeconds % 60) + 's';
    }
    return totalSeconds + 's';
  }
}
/**
 * Humanize bytes pipe for memory and storage sizes.
 *
 * Scales a byte count by powers of 1024 and appends the matching unit
 * (B, KB, MB, GB, TB), keeping at most two decimals.
 */
@Pipe({ name: 'humanizeBytes' })
export class HumanizeBytesPipe implements PipeTransform {
  /**
   * @param bytes Size in bytes.
   * @returns `'0 B'` for a falsy value and `'-'` for ±Infinity; otherwise the
   * scaled value with its unit, e.g. `1536` → `"1.5 KB"`. Negative sizes keep
   * their sign (`-1024` → `"-1 KB"`), values below one byte stay in `B`, and
   * values of 1024 TB or more stay in `TB`.
   */
  transform(bytes: number): string {
    if (!bytes) return '0 B';
    if (!Number.isFinite(bytes)) return '-';

    const k = 1024;
    const sizes = ['B', 'KB', 'MB', 'GB', 'TB'];

    // Work on the magnitude: Math.log of a negative number is NaN, which used
    // to produce "NaN undefined".
    const magnitude = Math.abs(bytes);
    const exponent = Math.floor(Math.log(magnitude) / Math.log(k));
    // Clamp to the unit table: the exponent is negative below one byte and
    // exceeds the last index from 1024 TB upwards, which used to index
    // outside `sizes` and print "undefined" as the unit.
    const i = Math.min(Math.max(exponent, 0), sizes.length - 1);

    const scaled = parseFloat((magnitude / Math.pow(k, i)).toFixed(2));
    return (bytes < 0 ? '-' : '') + scaled + ' ' + sizes[i];
  }
}
/**
* Job status badge component
*/
// Renders an <nz-badge> whose text is the raw job state and whose status
// (color) is derived from that state by getBadgeStatus().
@Component({
  selector: 'flink-job-badge',
  template: `
<nz-badge
[nzStatus]="getBadgeStatus()"
[nzText]="status">
</nz-badge>
`
})
export class JobBadgeComponent {
  /** Job state to display; also used verbatim as the badge text. */
  @Input() status: JobState = 'INITIALIZING';

  /**
   * Maps the current job state to a badge status.
   * @returns 'processing' for RUNNING, 'success' for FINISHED, 'error' for
   * FAILED, 'warning' for CANCELED and 'default' for every other state.
   */
  getBadgeStatus(): 'success' | 'processing' | 'error' | 'warning' | 'default' {
    const state = this.status;
    if (state === 'RUNNING') {
      return 'processing';
    }
    if (state === 'FINISHED') {
      return 'success';
    }
    if (state === 'FAILED') {
      return 'error';
    }
    if (state === 'CANCELED') {
      return 'warning';
    }
    return 'default';
  }
}
The Angular dashboard provides a comprehensive, modern web interface for Apache Flink monitoring and management with real-time updates, interactive visualizations, and extensive developer tools for troubleshooting and performance analysis.
Install with the Tessl CLI:
npx tessl i tessl/maven-org-apache-flink--flink-runtime-web