or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

core-streaming.md, index.md, input-sources.md, java-api.md, monitoring-listeners.md, state-management.md, web-ui.md
tile.json

web-ui.mddocs/

Web UI

Built-in web interface for visualizing streaming application metrics, batch processing status, receiver information, and performance analytics through an integrated dashboard.

Capabilities

StreamingTab

Main web UI tab integration with Spark's web interface providing streaming-specific dashboard and navigation.

/**
 * Web UI tab for streaming applications.
 *
 * Integrates with Spark's main web UI by extending `SparkUITab`, passing on the
 * parent UI and the string "streaming" (presumably the tab's URL prefix, cf. the
 * `/streaming/` path shown in the usage section -- confirm).
 *
 * Members are listed as signatures only; no implementations are shown here.
 *
 * @param parent the `SparkUI` this tab is created for
 */
class StreamingTab(parent: SparkUI) extends SparkUITab(parent, "streaming") {
  
  /** Get the main streaming page (a `StreamingPage`) */
  def streamingPage: StreamingPage
  
  /** Get the batch detail page (a `BatchPage`) */
  def batchPage: BatchPage
  
  /** Attach streaming tab to Spark UI */
  def attachTab(): Unit
  
  /** Detach streaming tab from Spark UI */
  def detachTab(): Unit
}

StreamingPage

Main streaming dashboard page displaying overall application metrics, active receivers, and batch processing statistics.

/**
 * Main streaming web UI page showing the application overview.
 *
 * Extends `WebUIPage` with an empty prefix string. Members are listed as
 * signatures only; no implementations are shown here.
 *
 * @param parent the `StreamingTab` this page belongs to
 */
class StreamingPage(parent: StreamingTab) extends WebUIPage("") {
  
  /**
   * Render the streaming dashboard HTML.
   *
   * @param request the incoming servlet request
   * @return the page content as a sequence of XML nodes
   */
  def render(request: HttpServletRequest): Seq[Node]
  
  /** Get streaming statistics for display */
  def streamingStatistics: StreamingStatistics
  
  /** Get current receiver information */
  def receiverInfo: Seq[ReceiverInfo]
  
  /** Get recent batch information */
  def recentBatches: Seq[BatchInfo]
}

BatchPage

Detailed page for individual batch analysis showing input sources, processing stages, and output operations.

/**
 * Web UI page for detailed batch information.
 *
 * Extends `WebUIPage` with the prefix string "batch". Members are listed as
 * signatures only; no implementations are shown here.
 *
 * @param parent the `StreamingTab` this page belongs to
 */
class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
  
  /**
   * Render batch detail page.
   *
   * @param request the incoming servlet request
   * @return the page content as a sequence of XML nodes
   */
  def render(request: HttpServletRequest): Seq[Node]
  
  /**
   * Get detailed batch information by batch time.
   *
   * @param batchTime time identifying the batch
   * @return the batch information wrapped in an `Option`
   *         (presumably `None` when no batch matches -- confirm)
   */
  def getBatchInfo(batchTime: Time): Option[BatchInfo]
  
  /**
   * Get input stream details for batch.
   *
   * @param batchTime time identifying the batch
   * @return input information keyed by `Int` (presumably the stream ID -- confirm)
   */
  def getInputStreamInfo(batchTime: Time): Map[Int, StreamInputInfo]
  
  /**
   * Get output operation details for batch.
   *
   * @param batchTime time identifying the batch
   * @return output operation information keyed by `Int`
   *         (presumably the output operation ID -- confirm)
   */
  def getOutputOperationInfo(batchTime: Time): Map[Int, OutputOperationInfo]
}

StreamingJobProgressListener

Core listener that collects streaming metrics and provides data for the web UI dashboard.

/**
 * Listener that tracks streaming job progress for the web UI.
 * Automatically added when the streaming tab is enabled.
 *
 * Extends `StreamingListener` and overrides its batch and receiver callbacks.
 * Members are listed as signatures only; no implementations are shown here.
 *
 * @param conf the `SparkConf` the listener is constructed with (presumably the source
 *             of the retention settings below, e.g. `spark.streaming.ui.retainedBatches`
 *             -- confirm)
 */
class StreamingJobProgressListener(conf: SparkConf) extends StreamingListener {
  
  // Data retention settings
  /** Maximum number of batches to retain */
  def retainedBatches: Int
  
  /** Maximum number of completed batches to show */
  def numBatchInfos: Int
  
  // Batch information access
  /** Get all retained batch information */
  def batchInfos: Seq[BatchInfo]
  
  /**
   * Get batch info by time.
   *
   * @param batchTime time identifying the batch
   * @return the batch information wrapped in an `Option`
   */
  def getBatchInfo(batchTime: Time): Option[BatchInfo]
  
  /** Get last completed batch, wrapped in an `Option` */
  def lastCompletedBatch: Option[BatchInfo]
  
  /** Get currently processing batch, wrapped in an `Option` */
  def processingBatch: Option[BatchInfo]
  
  // Receiver information
  /** Get all receiver information, keyed by `Int` (presumably the stream ID -- confirm) */
  def receiverInfos: Map[Int, ReceiverInfo]
  
  /**
   * Get receiver info by stream ID.
   *
   * @param streamId ID of the input stream
   * @return the receiver information wrapped in an `Option`
   */
  def getReceiverInfo(streamId: Int): Option[ReceiverInfo]
  
  // Statistics computation
  /** Get average processing delay, wrapped in an `Option` (unit not stated here) */
  def avgProcessingDelay: Option[Double]
  
  /** Get average scheduling delay, wrapped in an `Option` (unit not stated here) */
  def avgSchedulingDelay: Option[Double]
  
  /** Get total processed records */
  def totalProcessedRecords: Long
  
  /** Get processing rate (records/second) */
  def processingRate: Double
  
  // Event handling (inherited from StreamingListener)
  override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit
  override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit  
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit
  override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit
  override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit
  override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped): Unit
}

Batch UI Data

Data structures used by the web UI to display batch and streaming information.

/**
 * UI data structure for batch display.
 *
 * @param batchInfo           the batch being displayed
 * @param streamIdToInputInfo input information keyed by stream ID
 * @param outputOperations    UI data for the batch's output operations
 */
case class BatchUIData(
  batchInfo: BatchInfo,
  streamIdToInputInfo: Map[Int, StreamInputInfo],
  outputOperations: Seq[OutputOperationUIData]
)

/**
 * UI data structure for output operations.
 *
 * `startTime`, `endTime` and `failureReason` are optional. Time units are not
 * stated here (presumably epoch milliseconds -- confirm).
 *
 * @param id            identifier of the output operation
 * @param name          name of the output operation
 * @param description   description of the output operation
 * @param startTime     start time, if present
 * @param endTime       end time, if present
 * @param failureReason failure reason, if present
 */
case class OutputOperationUIData(
  id: Int,
  name: String,
  description: String,
  startTime: Option[Long],
  endTime: Option[Long],
  failureReason: Option[String]
)

/**
 * UI data structure for receiver display.
 *
 * Unlike the optional fields of `OutputOperationUIData`, the error fields here
 * are plain `String` / `Long`; how "no error so far" is represented is not
 * stated -- TODO confirm.
 *
 * @param streamId         ID of the input stream the receiver belongs to
 * @param name             receiver name
 * @param active           whether the receiver is active
 * @param location         receiver location
 * @param lastErrorMessage message of the last error
 * @param lastErrorTime    time of the last error (unit not stated here)
 */
case class ReceiverUIData(
  streamId: Int,
  name: String,
  active: Boolean,
  location: String,
  lastErrorMessage: String,
  lastErrorTime: Long
)

UI Tables and Components

Reusable UI components for displaying streaming data in tabular format.

/**
 * Table component for displaying all batches.
 *
 * Members are listed as signatures only; no implementations are shown here.
 *
 * @param batches   the batches to display
 * @param streaming defaults to `true`; its effect is not described here -- TODO confirm
 */
class AllBatchesTable(
  batches: Seq[BatchUIData],
  streaming: Boolean = true
) {
  
  /** Render table as HTML, returned as a sequence of XML nodes */
  def toHtmlTable: Seq[Node]
  
  /** Generate table headers */
  def headers: Seq[String]
  
  /** Generate table rows, one inner sequence of nodes per row */
  def rows: Seq[Seq[Node]]
}

/**
 * Table component for displaying receivers.
 *
 * @param receivers the receivers to display
 */
class ReceiversTable(receivers: Seq[ReceiverUIData]) {
  
  /** Render receiver table as a sequence of XML nodes */  
  def toHtmlTable: Seq[Node]
}

/**
 * Component for displaying streaming statistics.
 *
 * @param stats the statistics to display
 */
class StreamingStatisticsTable(stats: StreamingStatistics) {
  
  /** Render statistics as a sequence of XML nodes */
  def toHtmlTable: Seq[Node]
}

UI Utility Functions

Helper functions for formatting and displaying streaming data in the web interface.

/**
 * Utility functions for streaming UI.
 *
 * Formatting helpers return `String`; HTML helpers return `Seq[Node]`.
 * Members are listed as signatures only; exact output formats are not shown here.
 */
object UIUtils {
  
  /**
   * Format duration for display.
   *
   * @param milliseconds duration in milliseconds
   */
  def formatDuration(milliseconds: Long): String
  
  /**
   * Format timestamp for display.
   *
   * @param timestamp the timestamp (presumably epoch milliseconds -- confirm)
   */
  def formatDate(timestamp: Long): String
  
  /**
   * Format batch time for display.
   *
   * @param batchTime time identifying the batch
   */
  def formatBatchTime(batchTime: Time): String
  
  /**
   * Generate progress bar HTML.
   *
   * @param completed number of completed items
   * @param failed    number of failed items
   * @param total     total number of items
   */
  def progressBar(
    completed: Int,
    failed: Int, 
    total: Int
  ): Seq[Node]
  
  /**
   * Generate batch status badge.
   *
   * @param batchInfo the batch whose status is shown
   */
  def batchStatusBadge(batchInfo: BatchInfo): Seq[Node]
  
  /**
   * Generate receiver status indicator.
   *
   * @param receiverInfo the receiver whose status is shown
   */
  def receiverStatusIndicator(receiverInfo: ReceiverInfo): Seq[Node]
  
  /**
   * Format rate for display (records/sec).
   *
   * @param recordsPerSecond rate in records per second
   */
  def formatRate(recordsPerSecond: Double): String
  
  /**
   * Format byte size for display.
   *
   * @param bytes size in bytes
   */
  def formatBytes(bytes: Long): String
}

Web UI Features

Dashboard Overview

The streaming web UI provides a comprehensive dashboard with:

  • Application Summary: Current status, uptime, total batches processed
  • Batch Timeline: Visual timeline of batch processing with status indicators
  • Performance Metrics: Processing delays, scheduling delays, throughput rates
  • Receiver Status: Active receivers, error states, data ingestion rates
  • Resource Utilization: Memory usage, CPU metrics, executor information

Batch Details

Detailed view for each batch includes:

  • Timing Information: Submission, start, and completion times
  • Input Sources: Data volume and sources for each input stream
  • Processing Stages: Breakdown of computation stages and dependencies
  • Output Operations: Status and performance of each output operation
  • Error Information: Detailed error messages and stack traces for failures

Interactive Features

  • Real-time Updates: Dashboard refreshes automatically to show current status
  • Historical Data: Browse historical batches and performance trends
  • Filtering: Filter batches by status, time range, or specific criteria
  • Drill-down Navigation: Click through from overview to detailed views
  • Export Capabilities: Download metrics data for external analysis

Usage Examples

Accessing the Web UI

// The streaming web UI is automatically available when you create a StreamingContext.
// `conf` is a SparkConf defined elsewhere; `Seconds(2)` is the context's second
// constructor argument (presumably the batch interval -- confirm).
val ssc = new StreamingContext(conf, Seconds(2))

// Access the web UI at: http://<driver-host>:4040/streaming/
// The port is configurable via spark.ui.port (default 4040)

Programmatic Access to UI Data

// Get the streaming job progress listener.
// NOTE(review): in upstream Spark, `StreamingContext.progressListener` appears to be
// declared private[streaming] -- confirm it is reachable from the calling package.
val streamingListener = ssc.progressListener

// Print a one-line summary for up to 10 retained batches
val recentBatches = streamingListener.batchInfos.take(10)
recentBatches.foreach { batch =>
  // processingDelay is an Option[Long]; interpolating it directly would print
  // "Some(123)ms" or "Nonems", so render the value explicitly.
  val processing = batch.processingDelay.fold("n/a")(delay => s"${delay}ms")
  // Total records across all input streams of this batch
  val records = batch.streamIdToInputInfo.values.map(_.numRecords).sum
  println(s"Batch ${batch.batchTime}: processing=$processing, records=$records")
}

// Access receiver information (map entries are streamId -> receiver info)
streamingListener.receiverInfos.foreach { case (streamId, receiverInfo) =>
  println(s"Stream $streamId: ${receiverInfo.name} - Active: ${receiverInfo.active}")
}

// Get performance statistics; both averages are Options and fall back to 0.0 when empty
val avgProcessing = streamingListener.avgProcessingDelay.getOrElse(0.0)
val avgScheduling = streamingListener.avgSchedulingDelay.getOrElse(0.0)
println(s"Average delays - Processing: ${avgProcessing}ms, Scheduling: ${avgScheduling}ms")

Custom UI Integration

// Custom listener that forwards completed-batch metrics to external monitoring
class CustomUIListener extends StreamingListener {

  /** Pushes the metrics of each completed batch to the custom dashboard. */
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    // Total number of records over all input streams of the batch
    val totalRecords = info.streamIdToInputInfo.values.map(_.numRecords).sum

    sendToCustomDashboard(info.batchTime, info.processingDelay, totalRecords)
  }

  /**
   * Integration point for the external dashboard.
   *
   * @param batchTime       time identifying the batch
   * @param processingDelay processing delay of the batch, if available
   * @param inputRecords    total number of input records in the batch
   */
  private def sendToCustomDashboard(
    batchTime: Time,
    processingDelay: Option[Long],
    inputRecords: Long
  ): Unit = {
    // Implementation for custom dashboard integration
  }
}

// Register the listener with the streaming context
ssc.addStreamingListener(new CustomUIListener())

Configuration

UI Settings

Key configuration options for the streaming web UI:

// Web UI related settings applied to a new SparkConf
val conf = new SparkConf()
  .set("spark.ui.port", "4041")  // Serve the web UI on port 4041 instead of the default 4040
  .set("spark.streaming.ui.retainedBatches", "1000") // Number of streaming batches to retain
  .set("spark.ui.retainedJobs", "1000")  // Number of Spark jobs to retain
  .set("spark.ui.enabled", "true")  // Enable the web UI

Memory Management

The web UI listener retains batch information in memory. Configure retention limits to manage memory usage:

// Limit the number of retained batches so the batch information the UI listener
// keeps in memory stays bounded
val conf = new SparkConf()
  .set("spark.streaming.ui.retainedBatches", "100")  // Retain last 100 batches

Security

Configure security settings for production deployments:

// Security-related settings applied to a new SparkConf.
// NOTE(review): "secret-key" is a placeholder literal; a real secret should presumably
// be supplied from a secure source rather than hard-coded -- confirm before reuse.
// NOTE(review): confirm that YarnProxyRedirectFilter is the intended UI filter here;
// its name suggests request redirection rather than authentication.
val conf = new SparkConf()
  .set("spark.ui.filters", "org.apache.spark.deploy.yarn.YarnProxyRedirectFilter")
  .set("spark.authenticate", "true")
  .set("spark.authenticate.secret", "secret-key")

REST API

REST API endpoints for programmatic access to streaming metrics and application status.

REST API Classes

Classes providing HTTP endpoints for streaming application data.

/**
 * Root resource for streaming REST API endpoints.
 *
 * Members are listed as signatures only; the mapping of each method to a URL
 * path is not shown here.
 */
class ApiStreamingRootResource {
  
  /** Get streaming application information */
  def streamingApp(): ApiStreamingApp
  
  /** Get streaming statistics */
  def getStreamingStatistics(): StreamingStatistics
  
  /** Get batch information */
  def getBatches(): Seq[BatchInfo]
  
  /**
   * Get specific batch by time.
   *
   * @param batchTime batch time as a `Long` (presumably epoch milliseconds, cf. the
   *                  `batches/1609459200000` request in the usage examples -- confirm)
   */
  def getBatch(batchTime: Long): BatchInfo
  
  /** Get receiver information */
  def getReceivers(): Seq[ReceiverInfo]
}

/**
 * Streaming application resource for REST API.
 *
 * Members are listed as signatures only; no implementations are shown here.
 */
class ApiStreamingApp {
  
  /** Get application details */
  def getApplicationInfo(): ApplicationInfo
  
  /** Get streaming context status as a `String` (possible values are not listed here) */
  def getStatus(): String
}

REST API Data Classes

Data transfer objects for REST API responses.

/**
 * Streaming statistics for REST API responses.
 *
 * Counts are `Int` / `Long`; averages are `Double`. Units of the time-valued
 * fields (`startTime`, `batchDuration`, the `avg*` delays) are not stated here
 * (presumably milliseconds -- TODO confirm).
 */
case class StreamingStatistics(
  startTime: Long,
  batchDuration: Long,
  numReceivers: Int,
  numActiveReceivers: Int,
  numInactiveReceivers: Int,
  numTotalCompletedBatches: Long,
  numRetainedCompletedBatches: Long,
  numActiveBatches: Long,
  numProcessedRecords: Long,
  numReceivedRecords: Long,
  avgInputSize: Double,
  avgProcessingTime: Double,
  avgSchedulingDelay: Double,
  avgTotalDelay: Double
)

/**
 * Batch information for REST API.
 *
 * NOTE(review): this shape differs from the `BatchInfo` used in the usage examples,
 * which read `batch.streamIdToInputInfo` and pass `batch.processingDelay` as an
 * `Option[Long]`. Here there is no `streamIdToInputInfo` field and `processingDelay`
 * is a plain `Long`. Presumably these are two distinct types sharing a name -- confirm.
 */
case class BatchInfo(
  batchId: Long,
  batchTime: Long,
  status: String,
  inputSize: Long,
  schedulingDelay: Long,
  processingDelay: Long,
  outputOperations: Seq[OutputOperationInfo]
)

/**
 * Output operation information for REST API.
 *
 * Unlike `OutputOperationUIData`, `startTime` and `endTime` are plain `Long`
 * values here; only `errorMessage` is optional. Time units are not stated
 * (presumably milliseconds -- TODO confirm).
 */
case class OutputOperationInfo(
  id: Int,
  name: String,
  description: String,
  startTime: Long,
  endTime: Long,
  duration: Long,
  status: String,
  errorMessage: Option[String]
)

/**
 * Receiver information for REST API.
 *
 * NOTE(review): the programmatic-access example reads `receiverInfo.active`, but this
 * shape exposes a `status: String` and no `active` field. Presumably the example
 * refers to a different `ReceiverInfo` type -- confirm.
 */
case class ReceiverInfo(
  streamId: Int,
  name: String,
  status: String,
  location: String,
  executorId: String,
  lastErrorMessage: String,
  lastErrorTime: Long
)

REST API Endpoints

Base URL: http://<driver-host>:<port>/api/v1/applications/<app-id>/streaming/

Available Endpoints:

  • GET /statistics - Streaming statistics for the application
  • GET /batches - List of all batches
  • GET /batches/{batch-time} - Specific batch details
  • GET /receivers - List of all receivers
  • GET /receivers/{stream-id} - Specific receiver details

Usage Examples:

# Get streaming statistics (served under the /statistics sub-path of the streaming API)
curl http://localhost:4040/api/v1/applications/app-123/streaming/statistics

# Get batch information (list of batches)
curl http://localhost:4040/api/v1/applications/app-123/streaming/batches

# Get a specific batch, identified by its batch time
curl http://localhost:4040/api/v1/applications/app-123/streaming/batches/1609459200000

# Get receiver status (list of receivers)
curl http://localhost:4040/api/v1/applications/app-123/streaming/receivers