CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-runtime-web

Web-based monitoring and management interface for Apache Flink with REST APIs and Angular dashboard.

Pending
Overview
Eval results
Files

angular-dashboard.mddocs/

Angular Web Dashboard

Modern TypeScript dashboard providing real-time monitoring, job visualization, resource management, and developer tools for Apache Flink clusters. Built with Angular 18 and Ng-Zorro (Ant Design) components.

Package Information

  • Framework: Angular 18
  • Language: TypeScript
  • UI Library: Ng-Zorro (Ant Design)
  • Build Tool: Angular CLI
  • Visualization: D3.js, G2, Dagre for charts and graphs

Core Imports

// Angular core
import { Component, Injectable, OnInit } from '@angular/core';
import { Router, ActivatedRoute } from '@angular/router';
import { HttpClient } from '@angular/common/http';

// Ng-Zorro components
import { NzTableModule } from 'ng-zorro-antd/table';
import { NzCardModule } from 'ng-zorro-antd/card';
import { NzButtonModule } from 'ng-zorro-antd/button';

// Dashboard services
import { JobService } from './services/job.service';
import { TaskManagerService } from './services/task-manager.service';
import { StatusService } from './services/status.service';

Basic Usage

// Component setup
@Component({
  selector: 'flink-job-list',
  template: `
    <nz-table [nzData]="jobs" [nzLoading]="loading">
      <thead>
        <tr>
          <th>Job Name</th>
          <th>Status</th>
          <th>Start Time</th>
          <th>Actions</th>
        </tr>
      </thead>
      <tbody>
        <tr *ngFor="let job of jobs">
          <td>{{ job.name }}</td>
          <td><flink-job-badge [status]="job.state"></flink-job-badge></td>
          <td>{{ job['start-time'] | humanizeDate }}</td>
          <td>
            <button nz-button (click)="viewJob(job.jid)">View</button>
          </td>
        </tr>
      </tbody>
    </nz-table>
  `
})
export class JobListComponent implements OnInit {
  jobs: JobOverview[] = [];
  loading = false;

  constructor(private jobService: JobService, private router: Router) {}

  ngOnInit() {
    this.loadJobs();
  }

  loadJobs() {
    this.loading = true;
    this.jobService.getJobs().subscribe(jobs => {
      this.jobs = jobs;
      this.loading = false;
    });
  }

  viewJob(jobId: string) {
    this.router.navigate(['/job', jobId]);
  }
}

Architecture

The Angular dashboard follows a modular architecture with:

  • Standalone Components: Modern Angular approach with individual component modules
  • Service Layer: Injectable services for API communication and state management
  • Route-based Lazy Loading: Feature modules loaded on demand for performance
  • Type-safe Models: Complete TypeScript interfaces for all data structures
  • Reactive Programming: RxJS observables for async data handling

Application Structure

Main Routes

// Route configuration
export const routes: Routes = [
  { path: '', redirectTo: '/overview', pathMatch: 'full' },
  { path: 'overview', loadComponent: () => import('./pages/overview/overview.component') },
  { path: 'submit', loadComponent: () => import('./pages/submit/submit.component') },
  { path: 'job-manager', loadComponent: () => import('./pages/job-manager/job-manager.component') },
  { path: 'task-manager', loadComponent: () => import('./pages/task-manager/task-manager.component') },
  { path: 'job', loadChildren: () => import('./pages/job/job.routes') }
];

Core Layout

/**
 * Root application component with navigation sidebar
 */
@Component({
  selector: 'flink-root',
  template: `
    <nz-layout class="main-layout">
      <nz-sider [nzCollapsed]="isCollapsed" [nzTrigger]="null">
        <div class="sidebar-logo">
          <img src="assets/flink-logo.svg" alt="Apache Flink">
        </div>
        <ul nz-menu nzMode="inline" [nzInlineCollapsed]="isCollapsed">
          <li nz-menu-item routerLink="/overview">
            <i nz-icon nzType="dashboard"></i>
            <span>Overview</span>
          </li>
          <li nz-menu-item routerLink="/submit">
            <i nz-icon nzType="upload"></i>
            <span>Submit Job</span>
          </li>
          <!-- Additional menu items -->
        </ul>
      </nz-sider>
      <nz-layout>
        <nz-header>
          <i nz-icon [nzType]="isCollapsed ? 'menu-unfold' : 'menu-fold'"
             (click)="isCollapsed = !isCollapsed"></i>
        </nz-header>
        <nz-content>
          <router-outlet></router-outlet>
        </nz-content>
      </nz-layout>
    </nz-layout>
  `
})
export class AppComponent {
  isCollapsed = false;
}

Services and API Communication

Core Services

JobService

/**
 * Comprehensive job operations and monitoring service
 */
@Injectable({
  providedIn: 'root'
})
export class JobService {
  constructor(private http: HttpClient) {}

  /**
   * Get list of all jobs (running and completed)
   */
  getJobs(): Observable<JobOverview[]>;

  /**
   * Get detailed information for a specific job
   */
  getJobDetail(jobId: string): Observable<JobDetail>;

  /**
   * Get job configuration
   */
  getJobConfig(jobId: string): Observable<JobConfig>;

  /**
   * Get job execution plan/graph
   */
  getJobPlan(jobId: string): Observable<Plan>;

  /**
   * Get job metrics and performance data
   */
  getJobMetrics(jobId: string): Observable<JobMetrics>;

  /**
   * Get job checkpoints information
   */
  getJobCheckpoints(jobId: string): Observable<JobCheckpoint>;

  /**
   * Get job exceptions and error information
   */
  getJobExceptions(jobId: string): Observable<JobException>;

  /**
   * Get backpressure information for job
   */
  getJobBackpressure(jobId: string): Observable<JobBackpressure>;

  /**
   * Get flame graph data for performance analysis
   */
  getJobFlameGraph(jobId: string, vertexId: string): Observable<JobFlameGraph>;

  /**
   * Cancel a running job
   */
  cancelJob(jobId: string, mode?: 'cancel' | 'stop'): Observable<void>;

  /**
   * Trigger savepoint for job
   */
  triggerSavepoint(jobId: string, targetDirectory?: string): Observable<SavepointInfo>;
}

TaskManagerService

/**
 * Task Manager monitoring and management service
 */
@Injectable({
  providedIn: 'root'
})
export class TaskManagerService {
  constructor(private http: HttpClient) {}

  /**
   * Get list of all task managers
   */
  getTaskManagers(): Observable<TaskManager[]>;

  /**
   * Get detailed task manager information
   */
  getTaskManagerDetail(taskManagerId: string): Observable<TaskManagerDetail>;

  /**
   * Get task manager metrics
   */
  getTaskManagerMetrics(taskManagerId: string): Observable<TaskManagerMetrics>;

  /**
   * Get task manager logs
   */
  getTaskManagerLogs(taskManagerId: string, logType: 'out' | 'log'): Observable<string>;

  /**
   * Get thread dump for task manager
   */
  getTaskManagerThreadDump(taskManagerId: string): Observable<ThreadDump>;

  /**
   * Get profiling information
   */
  getTaskManagerProfile(taskManagerId: string): Observable<ProfilingResult>;
}

OverviewService

/**
 * Cluster overview and statistics service
 */
@Injectable({
  providedIn: 'root'
})
export class OverviewService {
  constructor(private http: HttpClient) {}

  /**
   * Get cluster overview with job and resource statistics
   */
  getOverview(): Observable<Overview>;

  /**
   * Get cluster configuration information
   */
  getConfiguration(): Observable<Configuration>;
}

StatusService

/**
 * Application status, configuration, and error messaging service
 */
@Injectable({
  providedIn: 'root'
})
export class StatusService {
  constructor(private http: HttpClient) {}

  /**
   * Get application configuration and feature flags
   */
  getConfig(): Observable<DashboardConfiguration>;

  /**
   * Handle and display error messages
   */
  handleError(error: any): void;

  /**
   * Show success notification
   */
  showSuccess(message: string): void;

  /**
   * Show error notification
   */
  showError(message: string): void;
}

Data Models and Interfaces

Job Models

/**
 * Job overview information for listing
 */
export interface JobOverview {
  jid: string;
  name: string;
  state: JobState;
  'start-time': number;
  'end-time': number;
  duration: number;
  'last-modification': number;
  tasks: TaskStatusCounts;
}

/**
 * Detailed job information
 */
export interface JobDetail extends JobOverview {
  vertices: JobVertex[];
  'status-counts': TaskStatusCounts;
  plan: Plan;
  timestamps: JobTimestamps;
}

/**
 * Job vertex information for execution graph
 */
export interface JobVertex {
  id: string;
  name: string;
  parallelism: number;
  status: JobVertexState;
  'start-time': number;
  'end-time': number;
  duration: number;
  tasks: TaskStatusCounts;
  metrics: VertexMetrics;
}

/**
 * Job execution plan/graph
 */
export interface Plan {
  jid: string;
  name: string;
  nodes: PlanNode[];
  edges: PlanEdge[];
}

/**
 * Job configuration
 */
export interface JobConfig {
  jid: string;
  name: string;
  'execution-config': ExecutionConfig;
}

Monitoring Models

/**
 * Job metrics and performance data
 */
export interface JobMetrics {
  vertices: VertexMetrics[];
  watermarks: WatermarkInfo[];
  throughput: ThroughputInfo[];
}

/**
 * Job checkpoint information
 */
export interface JobCheckpoint {
  counts: CheckpointCounts;
  summary: CheckpointSummary;
  history: CheckpointStatistics[];
  latest: {
    completed: CheckpointStatistics | null;
    savepoint: CheckpointStatistics | null;
    failed: CheckpointStatistics | null;
    restored: CheckpointStatistics | null;
  };
}

/**
 * Job backpressure monitoring data
 */
export interface JobBackpressure {
  status: 'OK' | 'LOW' | 'HIGH';
  'backpressure-level': 'OK' | 'LOW' | 'HIGH';
  'end-timestamp': number;
  subtasks: SubtaskBackpressure[];
}

/**
 * Flame graph data for performance analysis
 */
export interface JobFlameGraph {
  data: FlameGraphNode[];
  endTime: number;
  maxStackDepth: number;
}

Resource Models

/**
 * Task Manager information and status
 */
export interface TaskManager {
  id: string;
  path: string;
  dataPort: number;
  jmxPort: number;
  timeSinceLastHeartbeat: number;
  slotsNumber: number;
  freeSlots: number;
  totalResource: ResourceProfile;
  freeResource: ResourceProfile;
  hardware: HardwareDescription;
  memoryConfiguration: MemoryConfiguration;
}

/**
 * Cluster overview statistics
 */
export interface Overview {
  taskmanagers: number;
  'slots-total': number;
  'slots-available': number;
  'jobs-running': number;
  'jobs-finished': number;
  'jobs-cancelled': number;
  'jobs-failed': number;
  'flink-version': string;
  'flink-commit': string;
}

Major Dashboard Features

1. Cluster Overview Dashboard

Real-time cluster monitoring with key metrics and navigation.

/**
 * Overview dashboard component showing cluster statistics
 */
@Component({
  selector: 'flink-overview',
  template: `
    <div class="overview-cards">
      <nz-card nzTitle="Task Managers">
        <nz-statistic [nzValue]="overview?.taskmanagers" nzSuffix="running"></nz-statistic>
      </nz-card>
      <nz-card nzTitle="Available Slots">
        <nz-statistic 
          [nzValue]="overview?.['slots-available']" 
          [nzTotal]="overview?.['slots-total']"
          nzSuffix="/ {{ overview?.['slots-total'] }}">
        </nz-statistic>
      </nz-card>
      <nz-card nzTitle="Running Jobs">
        <nz-statistic 
          [nzValue]="overview?.['jobs-running']" 
          [nzValueStyle]="{ color: '#52c41a' }">
        </nz-statistic>
      </nz-card>
    </div>
    
    <flink-job-list [jobs]="runningJobs"></flink-job-list>
  `
})
export class OverviewComponent implements OnInit {
  overview: Overview | null = null;
  runningJobs: JobOverview[] = [];

  constructor(
    private overviewService: OverviewService,
    private jobService: JobService
  ) {}

  ngOnInit() {
    this.loadOverview();
    this.loadRunningJobs();
  }
}

2. Job Visualization and Monitoring

Interactive job execution graph with detailed metrics and monitoring.

/**
 * Interactive job graph visualization component using D3/Dagre
 */
@Component({
  selector: 'flink-dagre',
  template: `
    <div class="dagre-container" #dagreContainer>
      <svg #dagreSvg></svg>
      <div class="dagre-controls">
        <button nz-button (click)="zoomIn()">Zoom In</button>
        <button nz-button (click)="zoomOut()">Zoom Out</button>
        <button nz-button (click)="resetZoom()">Reset</button>
      </div>
    </div>
  `
})
export class DagreComponent implements OnInit, OnDestroy {
  @Input() plan: Plan | null = null;
  @Input() jobDetail: JobDetail | null = null;
  
  private svg: d3.Selection<SVGElement, unknown, HTMLElement, any>;
  private zoom: d3.ZoomBehavior<SVGElement, unknown>;

  ngOnInit() {
    this.initializeSvg();
    this.setupZoom();
    if (this.plan) {
      this.renderGraph();
    }
  }

  private renderGraph() {
    // D3/Dagre graph rendering logic
    const g = new dagre.graphlib.Graph();
    g.setGraph({});
    g.setDefaultEdgeLabel(() => ({}));

    // Add nodes and edges from plan
    this.plan?.nodes.forEach(node => {
      g.setNode(node.id, {
        label: node.description,
        width: 200,
        height: 50
      });
    });

    this.plan?.edges.forEach(edge => {
      g.setEdge(edge.source, edge.target);
    });

    dagre.layout(g);
    this.drawGraph(g);
  }
}

3. Performance Monitoring

Comprehensive performance analysis with flame graphs and metrics visualization.

/**
 * Flame graph component for performance analysis
 */
@Component({
  selector: 'flink-flame-graph',
  template: `
    <div class="flame-graph-container" #flameGraphContainer>
      <div class="flame-graph-controls">
        <nz-select [(ngModel)]="selectedVertex" (ngModelChange)="loadFlameGraph()">
          <nz-option *ngFor="let vertex of vertices" 
                     [nzValue]="vertex.id" 
                     [nzLabel]="vertex.name">
          </nz-option>
        </nz-select>
        <button nz-button (click)="refreshFlameGraph()">Refresh</button>
      </div>
      <div #flameGraph class="flame-graph"></div>
    </div>
  `
})
export class FlameGraphComponent implements OnInit {
  @Input() jobId: string = '';
  @Input() vertices: JobVertex[] = [];
  
  selectedVertex: string = '';
  flameGraphData: JobFlameGraph | null = null;

  constructor(private jobService: JobService) {}

  ngOnInit() {
    if (this.vertices.length > 0) {
      this.selectedVertex = this.vertices[0].id;
      this.loadFlameGraph();
    }
  }

  loadFlameGraph() {
    if (this.jobId && this.selectedVertex) {
      this.jobService.getJobFlameGraph(this.jobId, this.selectedVertex)
        .subscribe(data => {
          this.flameGraphData = data;
          this.renderFlameGraph();
        });
    }
  }

  private renderFlameGraph() {
    // D3 flame graph rendering using d3-flame-graph library
    const flameGraph = flamegraph()
      .width(960)
      .cellHeight(18)
      .transitionDuration(750)
      .minFrameSize(5)
      .transitionEase(d3.easeCubic)
      .sort(true)
      .title('')
      .onClick((d: any) => console.info('clicked:', d));

    d3.select(this.flameGraphContainer.nativeElement)
      .select('.flame-graph')
      .datum(this.flameGraphData?.data)
      .call(flameGraph);
  }
}

4. Resource Management

Task Manager and Job Manager monitoring with detailed metrics and logs.

/**
 * Task Manager detail component with metrics and management
 */
@Component({
  selector: 'flink-task-manager-detail',
  template: `
    <nz-card nzTitle="Task Manager Details">
      <nz-descriptions nzBordered>
        <nz-descriptions-item nzTitle="ID">{{ taskManager?.id }}</nz-descriptions-item>
        <nz-descriptions-item nzTitle="Address">{{ taskManager?.path }}</nz-descriptions-item>
        <nz-descriptions-item nzTitle="Slots">
          {{ taskManager?.freeSlots }} / {{ taskManager?.slotsNumber }}
        </nz-descriptions-item>
        <nz-descriptions-item nzTitle="Memory">
          {{ taskManager?.hardware?.managedMemory | humanizeBytes }}
        </nz-descriptions-item>
      </nz-descriptions>
    </nz-card>

    <nz-tabset>
      <nz-tab nzTitle="Metrics">
        <flink-task-manager-metrics [taskManagerId]="taskManagerId"></flink-task-manager-metrics>
      </nz-tab>
      <nz-tab nzTitle="Logs">
        <flink-log-viewer 
          [logUrl]="getLogUrl('log')"
          [taskManagerId]="taskManagerId">
        </flink-log-viewer>
      </nz-tab>
      <nz-tab nzTitle="Thread Dump">
        <flink-thread-dump [taskManagerId]="taskManagerId"></flink-thread-dump>
      </nz-tab>
    </nz-tabset>
  `
})
export class TaskManagerDetailComponent implements OnInit {
  @Input() taskManagerId: string = '';
  
  taskManager: TaskManagerDetail | null = null;

  constructor(private taskManagerService: TaskManagerService) {}

  ngOnInit() {
    this.loadTaskManagerDetail();
  }

  loadTaskManagerDetail() {
    this.taskManagerService.getTaskManagerDetail(this.taskManagerId)
      .subscribe(tm => this.taskManager = tm);
  }

  getLogUrl(type: 'log' | 'out'): string {
    return `/taskmanagers/${this.taskManagerId}/logs/${type}`;
  }
}

Shared Utilities and Pipes

Custom Pipes for Data Formatting

/**
 * Humanize date pipe for readable timestamps
 */
@Pipe({ name: 'humanizeDate' })
export class HumanizeDatePipe implements PipeTransform {
  transform(value: number): string {
    if (!value) return '-';
    return new Date(value).toLocaleString();
  }
}

/**
 * Humanize duration pipe for time intervals
 */
@Pipe({ name: 'humanizeDuration' })
export class HumanizeDurationPipe implements PipeTransform {
  transform(value: number): string {
    if (!value) return '-';
    const seconds = Math.floor(value / 1000);
    const minutes = Math.floor(seconds / 60);
    const hours = Math.floor(minutes / 60);
    
    if (hours > 0) return `${hours}h ${minutes % 60}m`;
    if (minutes > 0) return `${minutes}m ${seconds % 60}s`;
    return `${seconds}s`;
  }
}

/**
 * Humanize bytes pipe for memory and storage sizes
 */
@Pipe({ name: 'humanizeBytes' })
export class HumanizeBytesPipe implements PipeTransform {
  transform(bytes: number): string {
    if (!bytes) return '0 B';
    const k = 1024;
    const sizes = ['B', 'KB', 'MB', 'GB', 'TB'];
    const i = Math.floor(Math.log(bytes) / Math.log(k));
    return parseFloat((bytes / Math.pow(k, i)).toFixed(2)) + ' ' + sizes[i];
  }
}

Badge Components for Status Indicators

/**
 * Job status badge component
 */
@Component({
  selector: 'flink-job-badge',
  template: `
    <nz-badge 
      [nzStatus]="getBadgeStatus()" 
      [nzText]="status">
    </nz-badge>
  `
})
export class JobBadgeComponent {
  @Input() status: JobState = 'INITIALIZING';

  getBadgeStatus(): 'success' | 'processing' | 'error' | 'warning' | 'default' {
    switch (this.status) {
      case 'RUNNING': return 'processing';
      case 'FINISHED': return 'success';
      case 'FAILED': return 'error';
      case 'CANCELED': return 'warning';
      default: return 'default';
    }
  }
}

The Angular dashboard provides a comprehensive, modern web interface for Apache Flink monitoring and management with real-time updates, interactive visualizations, and extensive developer tools for troubleshooting and performance analysis.

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-runtime-web

docs

angular-dashboard.md

dto.md

history-server.md

index.md

jar-management.md

web-server.md

tile.json