Built-in web interface for visualizing streaming application metrics, batch processing status, receiver information, and performance analytics through an integrated dashboard.
Main web UI tab integration with Spark's web interface providing streaming-specific dashboard and navigation.
/**
* Web UI tab for streaming applications
* Integrates with Spark's main web UI
*/
class StreamingTab(parent: SparkUI) extends SparkUITab(parent, "streaming") {
/** Get the main streaming page */
def streamingPage: StreamingPage
/** Get batch detail pages */
def batchPage: BatchPage
/** Attach streaming tab to Spark UI */
def attachTab(): Unit
/** Detach streaming tab from Spark UI */
def detachTab(): Unit
}

Main streaming dashboard page displaying overall application metrics, active receivers, and batch processing statistics.
/**
* Main streaming web UI page showing application overview
*/
class StreamingPage(parent: StreamingTab) extends WebUIPage("") {
/** Render the streaming dashboard HTML */
def render(request: HttpServletRequest): Seq[Node]
/** Get streaming statistics for display */
def streamingStatistics: StreamingStatistics
/** Get current receiver information */
def receiverInfo: Seq[ReceiverInfo]
/** Get recent batch information */
def recentBatches: Seq[BatchInfo]
}

Detailed page for individual batch analysis showing input sources, processing stages, and output operations.
/**
* Web UI page for detailed batch information
*/
class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
/** Render batch detail page */
def render(request: HttpServletRequest): Seq[Node]
/** Get detailed batch information by batch time */
def getBatchInfo(batchTime: Time): Option[BatchInfo]
/** Get input stream details for batch */
def getInputStreamInfo(batchTime: Time): Map[Int, StreamInputInfo]
/** Get output operation details for batch */
def getOutputOperationInfo(batchTime: Time): Map[Int, OutputOperationInfo]
}

Core listener that collects streaming metrics and provides data for the web UI dashboard.
/**
* Listener that tracks streaming job progress for web UI
* Automatically added when streaming tab is enabled
*/
class StreamingJobProgressListener(conf: SparkConf) extends StreamingListener {
// Data retention settings
/** Maximum number of batches to retain */
def retainedBatches: Int
/** Maximum number of completed batches to show */
def numBatchInfos: Int
// Batch information access
/** Get all retained batch information */
def batchInfos: Seq[BatchInfo]
/** Get batch info by time */
def getBatchInfo(batchTime: Time): Option[BatchInfo]
/** Get last completed batch */
def lastCompletedBatch: Option[BatchInfo]
/** Get currently processing batch */
def processingBatch: Option[BatchInfo]
// Receiver information
/** Get all receiver information */
def receiverInfos: Map[Int, ReceiverInfo]
/** Get receiver info by stream ID */
def getReceiverInfo(streamId: Int): Option[ReceiverInfo]
// Statistics computation
/** Get average processing delay */
def avgProcessingDelay: Option[Double]
/** Get average scheduling delay */
def avgSchedulingDelay: Option[Double]
/** Get total processed records */
def totalProcessedRecords: Long
/** Get processing rate (records/second) */
def processingRate: Double
// Event handling (inherited from StreamingListener)
override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit
override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit
override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit
override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit
override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped): Unit
}

Data structures used by the web UI to display batch and streaming information.
/**
* UI data structure for batch display.
*
* Bundles one batch's core information with its per-stream input details and
* its output operations.
*
* @param batchInfo the batch's [[BatchInfo]]
* @param streamIdToInputInfo input details keyed by an Int (presumably the input stream ID, going by the field name; confirm)
* @param outputOperations the batch's output operations, one [[OutputOperationUIData]] per operation
*/
case class BatchUIData(
batchInfo: BatchInfo,
streamIdToInputInfo: Map[Int, StreamInputInfo],
outputOperations: Seq[OutputOperationUIData]
)
/**
* UI data structure for output operations.
*
* @param id numeric identifier of the output operation
* @param name name of the output operation
* @param description description of the output operation
* @param startTime optional start time (presumably None until the operation starts; unit not shown here, confirm)
* @param endTime optional end time (presumably None until the operation finishes; unit not shown here, confirm)
* @param failureReason optional failure description (presumably None when the operation did not fail; confirm)
*/
case class OutputOperationUIData(
id: Int,
name: String,
description: String,
startTime: Option[Long],
endTime: Option[Long],
failureReason: Option[String]
)
/**
* UI data structure for receiver display
*/
case class ReceiverUIData(
streamId: Int,
name: String,
active: Boolean,
location: String,
lastErrorMessage: String,
lastErrorTime: Long
)

Reusable UI components for displaying streaming data in tabular format.
/**
* Table component for displaying all batches.
*
* Only the member signatures are listed here; no implementation is shown.
*
* @param batches the batches to display, one [[BatchUIData]] per batch
* @param streaming flag defaulting to true; its effect is not shown in this listing (TODO confirm)
*/
class AllBatchesTable(
batches: Seq[BatchUIData],
streaming: Boolean = true
) {
/** Render table as HTML */
def toHtmlTable: Seq[Node]
/** Generate table headers (one String per column, presumably in display order; confirm) */
def headers: Seq[String]
/** Generate table rows (outer Seq is the rows; each inner Seq holds that row's nodes) */
def rows: Seq[Seq[Node]]
}
/**
* Table component for displaying receivers.
*
* Only the member signature is listed here; no implementation is shown.
*
* @param receivers the receivers to display, one [[ReceiverUIData]] per receiver
*/
class ReceiversTable(receivers: Seq[ReceiverUIData]) {
/** Render receiver table as HTML nodes */
def toHtmlTable: Seq[Node]
}
/**
* Component for displaying streaming statistics
*/
class StreamingStatisticsTable(stats: StreamingStatistics) {
/** Render statistics */
def toHtmlTable: Seq[Node]
}

Helper functions for formatting and displaying streaming data in the web interface.
/**
* Utility functions for streaming UI
*/
object UIUtils {
/** Format duration for display */
def formatDuration(milliseconds: Long): String
/** Format timestamp for display */
def formatDate(timestamp: Long): String
/** Format batch time for display */
def formatBatchTime(batchTime: Time): String
/** Generate progress bar HTML */
def progressBar(
completed: Int,
failed: Int,
total: Int
): Seq[Node]
/** Generate batch status badge */
def batchStatusBadge(batchInfo: BatchInfo): Seq[Node]
/** Generate receiver status indicator */
def receiverStatusIndicator(receiverInfo: ReceiverInfo): Seq[Node]
/** Format rate for display (records/sec) */
def formatRate(recordsPerSecond: Double): String
/** Format byte size for display */
def formatBytes(bytes: Long): String
}

The streaming web UI provides a comprehensive dashboard with:
Detailed view for each batch includes:
// The streaming web UI is automatically available when you create a StreamingContext
val ssc = new StreamingContext(conf, Seconds(2))
// Access the web UI at: http://<driver-host>:4040/streaming/
// The port is configurable via spark.ui.port (default 4040)

// Get the streaming job progress listener
// NOTE(review): confirm that `progressListener` is accessible from application code in the
// Spark version in use; it may be package-private.
val streamingListener = ssc.progressListener
// Access batch information
val recentBatches = streamingListener.batchInfos.take(10)
recentBatches.foreach { batch =>
  // processingDelay is an Option[Long]; unwrap it so the line reads "12ms" or "n/a"
  // rather than "Some(12)ms" or "Nonems".
  val processing = batch.processingDelay.fold("n/a")(ms => s"${ms}ms")
  // Total records across all input streams of this batch.
  val records = batch.streamIdToInputInfo.values.map(_.numRecords).sum
  println(s"Batch ${batch.batchTime}: processing=$processing, records=$records")
}
// Access receiver information
streamingListener.receiverInfos.foreach { case (streamId, receiverInfo) =>
  println(s"Stream $streamId: ${receiverInfo.name} - Active: ${receiverInfo.active}")
}
// Get performance statistics (fall back to 0.0 when no average is available)
val avgProcessing = streamingListener.avgProcessingDelay.getOrElse(0.0)
val avgScheduling = streamingListener.avgSchedulingDelay.getOrElse(0.0)
println(s"Average delays - Processing: ${avgProcessing}ms, Scheduling: ${avgScheduling}ms")

// Add custom listener that integrates with external monitoring
class CustomUIListener extends StreamingListener {

  /** Forwards the metrics of each completed batch to the external dashboard. */
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val completed = batchCompleted.batchInfo
    // Sum the record counts over every input stream of the batch.
    val totalInputRecords = completed.streamIdToInputInfo.values.map(_.numRecords).sum
    sendToCustomDashboard(completed.batchTime, completed.processingDelay, totalInputRecords)
  }

  /**
   * Hook for pushing one batch's metrics to an external dashboard.
   * Intentionally a no-op placeholder: supply the real integration here.
   */
  private def sendToCustomDashboard(
      batchTime: Time,
      processingDelay: Option[Long],
      inputRecords: Long
  ): Unit = ()
}
ssc.addStreamingListener(new CustomUIListener())

Key configuration options for the streaming web UI:
val conf = new SparkConf()
.set("spark.ui.port", "4041") // Change web UI port
.set("spark.streaming.ui.retainedBatches", "1000") // Number of batches to retain
.set("spark.ui.retainedJobs", "1000") // Spark jobs to retain
.set("spark.ui.enabled", "true") // Enable web UI

The web UI listener retains batch information in memory. Configure retention limits to manage memory usage:
// Limit number of retained batches to control memory
val conf = new SparkConf()
.set("spark.streaming.ui.retainedBatches", "100") // Retain last 100 batches

Configure security settings for production deployments:
val conf = new SparkConf()
.set("spark.ui.filters", "org.apache.spark.deploy.yarn.YarnProxyRedirectFilter")
.set("spark.authenticate", "true")
.set("spark.authenticate.secret", "secret-key")

REST API endpoints for programmatic access to streaming metrics and application status.
Classes providing HTTP endpoints for streaming application data.
/**
* Root resource for streaming REST API endpoints.
*
* Only the member signatures are listed here; no implementation or routing is shown.
*
* NOTE(review): the endpoint list in this document includes a per-receiver lookup
* (`/receivers/{stream-id}`), but no matching method is declared in this listing. Confirm.
*/
class ApiStreamingRootResource {
/** Get streaming application information */
def streamingApp(): ApiStreamingApp
/** Get streaming statistics */
def getStreamingStatistics(): StreamingStatistics
/** Get batch information */
def getBatches(): Seq[BatchInfo]
/**
* Get specific batch by time.
* The return type is not optional; what happens for an unknown batch time is not shown here (TODO confirm).
*/
def getBatch(batchTime: Long): BatchInfo
/** Get receiver information */
def getReceivers(): Seq[ReceiverInfo]
}
/**
* Streaming application resource for REST API
*/
class ApiStreamingApp {
/** Get application details */
def getApplicationInfo(): ApplicationInfo
/** Get streaming context status */
def getStatus(): String
}

Data transfer objects for REST API responses.
/**
* Streaming statistics for REST API responses.
*
* Field meanings below follow the field names; units are not stated in this listing
* (times are presumably epoch/elapsed milliseconds; TODO confirm).
*
* @param startTime start time of the streaming application
* @param batchDuration configured batch duration
* @param numReceivers total number of receivers
* @param numActiveReceivers number of receivers currently active
* @param numInactiveReceivers number of receivers currently inactive
* @param numTotalCompletedBatches number of batches completed in total
* @param numRetainedCompletedBatches number of completed batches still retained
* @param numActiveBatches number of batches currently active
* @param numProcessedRecords number of records processed
* @param numReceivedRecords number of records received
* @param avgInputSize average input size
* @param avgProcessingTime average processing time
* @param avgSchedulingDelay average scheduling delay
* @param avgTotalDelay average total delay
*/
case class StreamingStatistics(
startTime: Long,
batchDuration: Long,
numReceivers: Int,
numActiveReceivers: Int,
numInactiveReceivers: Int,
numTotalCompletedBatches: Long,
numRetainedCompletedBatches: Long,
numActiveBatches: Long,
numProcessedRecords: Long,
numReceivedRecords: Long,
avgInputSize: Double,
avgProcessingTime: Double,
avgSchedulingDelay: Double,
avgTotalDelay: Double
)
/**
* Batch information for REST API.
*
* NOTE(review): the listener usage examples in this document read `streamIdToInputInfo`
* from a `BatchInfo` and treat its `processingDelay` as an `Option[Long]`. Neither matches
* this definition, so those examples presumably refer to a different `BatchInfo` type
* with the same simple name. Confirm and disambiguate.
*
* @param batchId numeric identifier of the batch
* @param batchTime batch time as a Long (presumably epoch milliseconds; confirm)
* @param status batch status as a String (the set of possible values is not shown here)
* @param inputSize input size of the batch
* @param schedulingDelay scheduling delay (unit not shown here)
* @param processingDelay processing delay (unit not shown here)
* @param outputOperations the batch's output operations
*/
case class BatchInfo(
batchId: Long,
batchTime: Long,
status: String,
inputSize: Long,
schedulingDelay: Long,
processingDelay: Long,
outputOperations: Seq[OutputOperationInfo]
)
/**
* Output operation information for REST API.
*
* @param id numeric identifier of the output operation
* @param name name of the output operation
* @param description description of the output operation
* @param startTime start time (unit not shown here; presumably epoch milliseconds, confirm)
* @param endTime end time (same unit as startTime, presumably)
* @param duration duration of the operation (presumably endTime - startTime; confirm)
* @param status operation status as a String (the set of possible values is not shown here)
* @param errorMessage optional error message (presumably None when the operation succeeded; confirm)
*/
case class OutputOperationInfo(
id: Int,
name: String,
description: String,
startTime: Long,
endTime: Long,
duration: Long,
status: String,
errorMessage: Option[String]
)
/**
* Receiver information for REST API
*/
case class ReceiverInfo(
streamId: Int,
name: String,
status: String,
location: String,
executorId: String,
lastErrorMessage: String,
lastErrorTime: Long
)

Base URL: http://<driver-host>:<port>/api/v1/applications/<app-id>/streaming/
Available Endpoints:
- GET / - Application streaming information
- GET /batches - List of all batches
- GET /batches/{batch-time} - Specific batch details
- GET /receivers - List of all receivers
- GET /receivers/{stream-id} - Specific receiver details

Usage Examples:
# Get streaming statistics
curl http://localhost:4040/api/v1/applications/app-123/streaming/
# Get batch information
curl http://localhost:4040/api/v1/applications/app-123/streaming/batches
# Get specific batch
curl http://localhost:4040/api/v1/applications/app-123/streaming/batches/1609459200000
# Get receiver status
curl http://localhost:4040/api/v1/applications/app-123/streaming/receivers