# Web UI

Built-in web interface for visualizing streaming application metrics, batch processing status, receiver information, and performance analytics through an integrated dashboard.

## Capabilities

### StreamingTab

Top-level tab that integrates with Spark's web interface and provides the streaming-specific dashboard and navigation.

```scala { .api }
/**
 * Web UI tab for streaming applications
 * Integrates with Spark's main web UI
 */
class StreamingTab(parent: SparkUI) extends SparkUITab(parent, "streaming") {

  /** Get the main streaming page */
  def streamingPage: StreamingPage

  /** Get batch detail pages */
  def batchPage: BatchPage

  /** Attach streaming tab to Spark UI */
  def attachTab(): Unit

  /** Detach streaming tab from Spark UI */
  def detachTab(): Unit
}
```

### StreamingPage

Main streaming dashboard page displaying overall application metrics, active receivers, and batch processing statistics.

```scala { .api }
/**
 * Main streaming web UI page showing application overview
 */
class StreamingPage(parent: StreamingTab) extends WebUIPage("") {

  /** Render the streaming dashboard HTML */
  def render(request: HttpServletRequest): Seq[Node]

  /** Get streaming statistics for display */
  def streamingStatistics: StreamingStatistics

  /** Get current receiver information */
  def receiverInfo: Seq[ReceiverInfo]

  /** Get recent batch information */
  def recentBatches: Seq[BatchInfo]
}
```

### BatchPage

Detailed page for individual batch analysis showing input sources, processing stages, and output operations.

```scala { .api }
/**
 * Web UI page for detailed batch information
 */
class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {

  /** Render batch detail page */
  def render(request: HttpServletRequest): Seq[Node]

  /** Get detailed batch information by batch time */
  def getBatchInfo(batchTime: Time): Option[BatchInfo]

  /** Get input stream details for batch */
  def getInputStreamInfo(batchTime: Time): Map[Int, StreamInputInfo]

  /** Get output operation details for batch */
  def getOutputOperationInfo(batchTime: Time): Map[Int, OutputOperationInfo]
}
```

### StreamingJobProgressListener

Core listener that collects streaming metrics and provides data for the web UI dashboard.

```scala { .api }
/**
 * Listener that tracks streaming job progress for the web UI
 * Automatically added when the streaming tab is enabled
 */
class StreamingJobProgressListener(conf: SparkConf) extends StreamingListener {

  // Data retention settings
  /** Maximum number of batches to retain */
  def retainedBatches: Int

  /** Maximum number of completed batches to show */
  def numBatchInfos: Int

  // Batch information access
  /** Get all retained batch information */
  def batchInfos: Seq[BatchInfo]

  /** Get batch info by time */
  def getBatchInfo(batchTime: Time): Option[BatchInfo]

  /** Get last completed batch */
  def lastCompletedBatch: Option[BatchInfo]

  /** Get currently processing batch */
  def processingBatch: Option[BatchInfo]

  // Receiver information
  /** Get all receiver information */
  def receiverInfos: Map[Int, ReceiverInfo]

  /** Get receiver info by stream ID */
  def getReceiverInfo(streamId: Int): Option[ReceiverInfo]

  // Statistics computation
  /** Get average processing delay */
  def avgProcessingDelay: Option[Double]

  /** Get average scheduling delay */
  def avgSchedulingDelay: Option[Double]

  /** Get total processed records */
  def totalProcessedRecords: Long

  /** Get processing rate (records/second) */
  def processingRate: Double

  // Event handling (inherited from StreamingListener)
  override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit
  override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit
  override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit
  override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit
  override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped): Unit
}
```
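The statistics members above summarize the retained batches. The following is a minimal, Spark-free sketch of how such averages can be computed. It assumes each retained batch carries optional scheduling and processing delays in milliseconds. `BatchRecord` and `DelayStats` are hypothetical names, not Spark classes. Batches that have not finished have no delay yet, so the sketch skips them instead of counting them as zero, which would drag the average down.

```scala
// Hypothetical stand-in for one retained batch; delays are None until known (milliseconds)
final case class BatchRecord(
    batchTime: Long,
    schedulingDelay: Option[Long],
    processingDelay: Option[Long],
    numRecords: Long)

object DelayStats {
  // Mean of the defined values, or None when no batch has reported one yet
  private def mean(values: Seq[Long]): Option[Double] =
    if (values.isEmpty) None else Some(values.sum.toDouble / values.size)

  def avgSchedulingDelay(batches: Seq[BatchRecord]): Option[Double] =
    mean(batches.flatMap(_.schedulingDelay))

  def avgProcessingDelay(batches: Seq[BatchRecord]): Option[Double] =
    mean(batches.flatMap(_.processingDelay))

  // Sum of input records across all retained batches
  def totalRecords(batches: Seq[BatchRecord]): Long =
    batches.map(_.numRecords).sum
}
```

Returning `Option[Double]` rather than `0.0` lets the caller distinguish "no data yet" from "zero delay".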
139
140
### Batch UI Data
141
142
Data structures used by the web UI to display batch and streaming information.
143
144
```scala { .api }
145
/**
146
* UI data structure for batch display
147
*/
148
case class BatchUIData(
149
batchInfo: BatchInfo,
150
streamIdToInputInfo: Map[Int, StreamInputInfo],
151
outputOperations: Seq[OutputOperationUIData]
152
)
153
154
/**
155
* UI data structure for output operations
156
*/
157
case class OutputOperationUIData(
158
id: Int,
159
name: String,
160
description: String,
161
startTime: Option[Long],
162
endTime: Option[Long],
163
failureReason: Option[String]
164
)
165
166
/**
167
* UI data structure for receiver display
168
*/
169
case class ReceiverUIData(
170
streamId: Int,
171
name: String,
172
active: Boolean,
173
location: String,
174
lastErrorMessage: String,
175
lastErrorTime: Long
176
)
177
```
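`OutputOperationUIData` stores raw start and end times rather than a duration or a status. The sketch below derives both for display, assuming times are epoch milliseconds. The case class mirrors the shape above. `OutputOps` and its status labels are hypothetical.

```scala
// Mirrors the OutputOperationUIData shape above (times in epoch milliseconds)
final case class OutputOperationUIData(
    id: Int,
    name: String,
    description: String,
    startTime: Option[Long],
    endTime: Option[Long],
    failureReason: Option[String])

object OutputOps {
  // Known only once the operation has both started and finished
  def duration(op: OutputOperationUIData): Option[Long] =
    for (s <- op.startTime; e <- op.endTime) yield e - s

  // Hypothetical display labels; a recorded failure takes precedence over timing
  def status(op: OutputOperationUIData): String =
    if (op.failureReason.isDefined) "Failed"
    else if (op.endTime.isDefined) "Succeeded"
    else if (op.startTime.isDefined) "Running"
    else "Queued"
}
```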

### UI Tables and Components

Reusable UI components for displaying streaming data in tabular format.

```scala { .api }
/**
 * Table component for displaying all batches
 */
class AllBatchesTable(
  batches: Seq[BatchUIData],
  streaming: Boolean = true
) {

  /** Render table as HTML */
  def toHtmlTable: Seq[Node]

  /** Generate table headers */
  def headers: Seq[String]

  /** Generate table rows */
  def rows: Seq[Seq[Node]]
}

/**
 * Table component for displaying receivers
 */
class ReceiversTable(receivers: Seq[ReceiverUIData]) {

  /** Render receiver table */
  def toHtmlTable: Seq[Node]
}

/**
 * Component for displaying streaming statistics
 */
class StreamingStatisticsTable(stats: StreamingStatistics) {

  /** Render statistics */
  def toHtmlTable: Seq[Node]
}
```
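The table components above return `Seq[Node]`, which is markup built as Scala XML. To stay dependency-free, the following sketch renders the same header-plus-rows structure as an HTML string instead. `HtmlTable` is a hypothetical helper. Escaping cell text matters because descriptions and error messages may contain characters such as `<`.

```scala
object HtmlTable {
  // Escape characters that are significant in HTML so cell text cannot inject markup
  def escape(text: String): String = {
    val sb = new StringBuilder
    text.foreach {
      case '&' => sb.append("&amp;")
      case '<' => sb.append("&lt;")
      case '>' => sb.append("&gt;")
      case '"' => sb.append("&quot;")
      case c   => sb.append(c)
    }
    sb.toString
  }

  // One <th> per header, one <tr> per row, one <td> per cell
  def render(headers: Seq[String], rows: Seq[Seq[String]]): String = {
    val head = headers.map(h => s"<th>${escape(h)}</th>").mkString
    val body = rows.map { row =>
      row.map(cell => s"<td>${escape(cell)}</td>").mkString("<tr>", "", "</tr>")
    }.mkString
    s"<table><thead><tr>$head</tr></thead><tbody>$body</tbody></table>"
  }
}
```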

### UI Utility Functions

Helper functions for formatting and displaying streaming data in the web interface.

```scala { .api }
/**
 * Utility functions for streaming UI
 */
object UIUtils {

  /** Format duration for display */
  def formatDuration(milliseconds: Long): String

  /** Format timestamp for display */
  def formatDate(timestamp: Long): String

  /** Format batch time for display */
  def formatBatchTime(batchTime: Time): String

  /** Generate progress bar HTML */
  def progressBar(
    completed: Int,
    failed: Int,
    total: Int
  ): Seq[Node]

  /** Generate batch status badge */
  def batchStatusBadge(batchInfo: BatchInfo): Seq[Node]

  /** Generate receiver status indicator */
  def receiverStatusIndicator(receiverInfo: ReceiverInfo): Seq[Node]

  /** Format rate for display (records/sec) */
  def formatRate(recordsPerSecond: Double): String

  /** Format byte size for display */
  def formatBytes(bytes: Long): String
}
```
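Here is one possible set of formatting rules for the helpers above, sketched in plain Scala. The unit thresholds, the binary (1024-based) byte units, and the progress-bar markup and CSS class names are assumptions for illustration. They are not guaranteed to match Spark's own output. `Locale.ROOT` keeps the decimal separator stable regardless of the JVM's default locale.

```scala
import java.util.Locale

object FormatSketch {
  // Milliseconds below 1 s, then seconds, minutes, and hours with one decimal place
  def formatDuration(ms: Long): String =
    if (ms < 1000L) s"$ms ms"
    else if (ms < 60000L) "%.1f s".formatLocal(Locale.ROOT, ms / 1000.0)
    else if (ms < 3600000L) "%.1f min".formatLocal(Locale.ROOT, ms / 60000.0)
    else "%.1f h".formatLocal(Locale.ROOT, ms / 3600000.0)

  def formatRate(recordsPerSecond: Double): String =
    "%.2f records/sec".formatLocal(Locale.ROOT, recordsPerSecond)

  // Binary units: 1 KiB = 1024 bytes
  def formatBytes(bytes: Long): String = {
    val units = Vector("B", "KiB", "MiB", "GiB", "TiB")
    var value = bytes.toDouble
    var unit = 0
    while (value >= 1024 && unit < units.size - 1) { value /= 1024; unit += 1 }
    if (unit == 0) s"$bytes B" else "%.1f %s".formatLocal(Locale.ROOT, value, units(unit))
  }

  // Completed and failed segments sized as percentages of the total
  def progressBar(completed: Int, failed: Int, total: Int): String = {
    def pct(n: Int): String =
      if (total <= 0) "0.0" else "%.1f".formatLocal(Locale.ROOT, 100.0 * n / total)
    val failedNote = if (failed > 0) s" ($failed failed)" else ""
    s"""<div class="progress"><div class="bar completed" style="width: ${pct(completed)}%"></div>""" +
      s"""<div class="bar failed" style="width: ${pct(failed)}%"></div>""" +
      s"""<span>${completed}/${total}${failedNote}</span></div>"""
  }
}
```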

## Web UI Features

### Dashboard Overview

The streaming web UI dashboard includes:

- **Application Summary**: Current status, uptime, total batches processed
- **Batch Timeline**: Visual timeline of batch processing with status indicators
- **Performance Metrics**: Processing delays, scheduling delays, throughput rates
- **Receiver Status**: Active receivers, error states, data ingestion rates
- **Resource Utilization**: Memory usage, CPU metrics, and executor information, which Spark reports on the Executors tab rather than on the Streaming tab itself
273
### Batch Details
274
275
Detailed view for each batch includes:
276
277
- **Timing Information**: Submission, start, and completion times
278
- **Input Sources**: Data volume and sources for each input stream
279
- **Processing Stages**: Breakdown of computation stages and dependencies
280
- **Output Operations**: Status and performance of each output operation
281
- **Error Information**: Detailed error messages and stack traces for failures
282
283
### Interactive Features
284
285
- **Real-time Updates**: Dashboard refreshes automatically to show current status
286
- **Historical Data**: Browse historical batches and performance trends
287
- **Filtering**: Filter batches by status, time range, or specific criteria
288
- **Drill-down Navigation**: Click through from overview to detailed views
289
- **Export Capabilities**: Download metrics data for external analysis
290
291
## Usage Examples
292
293
### Accessing the Web UI
294
295
```scala
296
// The streaming web UI is automatically available when you create a StreamingContext
297
val ssc = new StreamingContext(conf, Seconds(2))
298
299
// Access the web UI at: http://<driver-host>:4040/streaming/
300
// The port is configurable via spark.ui.port (default 4040)
301
```

### Programmatic Access to UI Data

```scala
// progressListener is private[streaming] in Apache Spark, so this compiles only from code in
// the org.apache.spark.streaming package; application code should register its own
// StreamingListener instead (see Custom UI Integration)
val streamingListener = ssc.progressListener

// Access batch information; delays are Option[Long] because unfinished batches have none
val recentBatches = streamingListener.batchInfos.take(10)
recentBatches.foreach { batch =>
  val processing = batch.processingDelay.map(d => s"${d}ms").getOrElse("n/a")
  println(s"Batch ${batch.batchTime}: processing=$processing, " +
    s"records=${batch.streamIdToInputInfo.values.map(_.numRecords).sum}")
}

// Access receiver information
streamingListener.receiverInfos.foreach { case (streamId, receiverInfo) =>
  println(s"Stream $streamId: ${receiverInfo.name} - Active: ${receiverInfo.active}")
}

// Get performance statistics (milliseconds; 0.0 when no batch has completed yet)
val avgProcessing = streamingListener.avgProcessingDelay.getOrElse(0.0)
val avgScheduling = streamingListener.avgSchedulingDelay.getOrElse(0.0)
println(s"Average delays - Processing: ${avgProcessing}ms, Scheduling: ${avgScheduling}ms")
```

### Custom UI Integration

```scala
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Custom listener that forwards batch metrics to an external monitoring system.
// Callbacks run on Spark's listener bus thread, so keep them fast and non-blocking.
class CustomUIListener extends StreamingListener {

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val batch = batchCompleted.batchInfo

    // Send data to external dashboard
    sendToCustomDashboard(
      batchTime = batch.batchTime,
      processingDelay = batch.processingDelay,
      inputRecords = batch.streamIdToInputInfo.values.map(_.numRecords).sum
    )
  }

  private def sendToCustomDashboard(
      batchTime: Time,
      processingDelay: Option[Long],
      inputRecords: Long
  ): Unit = {
    // Implementation for custom dashboard integration
  }
}

ssc.addStreamingListener(new CustomUIListener())
```
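The `sendToCustomDashboard` stub above is left empty. One possible body is sketched below without Spark so it can be tested on its own. It serializes the three values as a single JSON object and passes it to an injected sink. The field names, the `null` encoding for a missing delay, and `DashboardPayload` itself are assumptions. Inside the listener, `batchTime.milliseconds` would supply the epoch-millisecond batch time.

```scala
object DashboardPayload {
  // JSON for one completed batch; processingDelayMs is null when the delay is unknown
  def toJson(batchTimeMs: Long, processingDelayMs: Option[Long], inputRecords: Long): String = {
    val delay = processingDelayMs.map(_.toString).getOrElse("null")
    s"""{"batchTimeMs":$batchTimeMs,"processingDelayMs":$delay,"inputRecords":$inputRecords}"""
  }

  // The transport is injected so the logic stays testable; a real sink might POST
  // to an HTTP endpoint or publish to a message queue (not shown here)
  def send(sink: String => Unit)(batchTimeMs: Long, processingDelayMs: Option[Long], inputRecords: Long): Unit =
    sink(toJson(batchTimeMs, processingDelayMs, inputRecords))
}
```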
356
357
## Configuration
358
359
### UI Settings
360
361
Key configuration options for the streaming web UI:
362
363
```scala
364
val conf = new SparkConf()
365
.set("spark.ui.port", "4041") // Change web UI port
366
.set("spark.streaming.ui.retainedBatches", "1000") // Number of batches to retain
367
.set("spark.ui.retainedJobs", "1000") // Spark jobs to retain
368
.set("spark.ui.enabled", "true") // Enable web UI
369
```

### Memory Management

The web UI listener keeps batch information in memory, so the retention limit directly bounds its memory use:

```scala
import org.apache.spark.SparkConf

// Limit number of retained batches to control memory
val conf = new SparkConf()
  .set("spark.streaming.ui.retainedBatches", "100") // Retain the last 100 batches
```
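Retention works like a bounded queue. Each completed batch is appended, and the oldest entry is dropped once the limit is exceeded. Memory therefore stays proportional to the limit rather than to the application's uptime. The following is a minimal generic sketch of that idea. `RetainedBuffer` is hypothetical and is not the class Spark uses internally.

```scala
import scala.collection.mutable

// Keeps at most `limit` entries, evicting the oldest first
final class RetainedBuffer[A](limit: Int) {
  require(limit > 0, "limit must be positive")
  private val items = mutable.Queue.empty[A]
  private var evicted = 0L

  // Synchronized because listener events and UI reads happen on different threads
  def add(item: A): Unit = synchronized {
    items.enqueue(item)
    while (items.size > limit) { items.dequeue(); evicted += 1 }
  }

  def snapshot: Seq[A] = synchronized { items.toList }
  def totalEvicted: Long = synchronized { evicted }
}
```

An eviction counter kept alongside the buffer is one way to report a total number of completed batches that exceeds the number still displayed. The `numTotalCompletedBatches` and `numRetainedCompletedBatches` fields of `StreamingStatistics` make the same distinction.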
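
### Security

Configure security settings for production deployments. `spark.authenticate` secures connections between Spark processes, not browser access to the UI. UI access is controlled with servlet filters and view ACLs:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Filter applied to UI requests; com.example.AuthFilter is a placeholder for your own authentication filter
  .set("spark.ui.filters", "com.example.AuthFilter")
  .set("spark.acls.enable", "true")       // Enforce view ACLs once a filter has authenticated the user
  .set("spark.ui.view.acls", "alice,bob") // Example users allowed to view the UI
  // Authenticates Spark's internal RPC; avoid hard-coding the secret in source code
  .set("spark.authenticate", "true")
  .set("spark.authenticate.secret", "secret-key")
```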

## REST API

REST API endpoints for programmatic access to streaming metrics and application status.

### REST API Classes

Classes providing HTTP endpoints for streaming application data.

```scala { .api }
/**
 * Root resource for streaming REST API endpoints
 */
class ApiStreamingRootResource {

  /** Get streaming application information */
  def streamingApp(): ApiStreamingApp

  /** Get streaming statistics */
  def getStreamingStatistics(): StreamingStatistics

  /** Get batch information */
  def getBatches(): Seq[BatchInfo]

  /** Get specific batch by time */
  def getBatch(batchTime: Long): BatchInfo

  /** Get receiver information */
  def getReceivers(): Seq[ReceiverInfo]
}

/**
 * Streaming application resource for REST API
 */
class ApiStreamingApp {

  /** Get application details */
  def getApplicationInfo(): ApplicationInfo

  /** Get streaming context status */
  def getStatus(): String
}
```

### REST API Data Classes

Data transfer objects for REST API responses. In Spark these live in `org.apache.spark.status.api.v1.streaming`. They are distinct from the same-named `BatchInfo`, `ReceiverInfo`, and `OutputOperationInfo` classes in `org.apache.spark.streaming.scheduler` that listeners receive.

```scala { .api }
/**
 * Streaming statistics for REST API responses
 */
case class StreamingStatistics(
  startTime: Long,
  batchDuration: Long,
  numReceivers: Int,
  numActiveReceivers: Int,
  numInactiveReceivers: Int,
  numTotalCompletedBatches: Long,
  numRetainedCompletedBatches: Long,
  numActiveBatches: Long,
  numProcessedRecords: Long,
  numReceivedRecords: Long,
  avgInputSize: Double,
  avgProcessingTime: Double,
  avgSchedulingDelay: Double,
  avgTotalDelay: Double
)

/**
 * Batch information for REST API
 */
case class BatchInfo(
  batchId: Long,
  batchTime: Long,
  status: String,
  inputSize: Long,
  schedulingDelay: Long,
  processingDelay: Long,
  outputOperations: Seq[OutputOperationInfo]
)

/**
 * Output operation information for REST API
 */
case class OutputOperationInfo(
  id: Int,
  name: String,
  description: String,
  startTime: Long,
  endTime: Long,
  duration: Long,
  status: String,
  errorMessage: Option[String]
)

/**
 * Receiver information for REST API
 */
case class ReceiverInfo(
  streamId: Int,
  name: String,
  status: String,
  location: String,
  executorId: String,
  lastErrorMessage: String,
  lastErrorTime: Long
)
```
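A batch's `status` string can be derived from which of its timestamps are set. The sketch below assumes three states, rendered in upper case:

- **Queued**: submitted but not started.
- **Processing**: started but not finished.
- **Completed**: finished.

`BatchStatusSketch` is hypothetical. In this model, failures are reported per output operation through `OutputOperationInfo` rather than as a separate batch state.

```scala
import java.util.Locale

object BatchStatusSketch {
  sealed trait BatchStatus
  case object Queued extends BatchStatus
  case object Processing extends BatchStatus
  case object Completed extends BatchStatus

  // Completed once an end time exists; processing if only a start time exists
  def statusOf(processingStartTime: Option[Long], processingEndTime: Option[Long]): BatchStatus =
    (processingStartTime, processingEndTime) match {
      case (_, Some(_))    => Completed
      case (Some(_), None) => Processing
      case (None, None)    => Queued
    }

  // Locale.ROOT avoids locale-specific casing (e.g. the Turkish dotted I)
  def label(status: BatchStatus): String = status.toString.toUpperCase(Locale.ROOT)
}
```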

### REST API Endpoints

**Base URL**: `http://<driver-host>:<port>/api/v1/applications/<app-id>/streaming/`

**Available Endpoints**:

- `GET /statistics` - Streaming statistics for the application
- `GET /batches` - List of all retained batches
- `GET /batches/{batch-id}` - Specific batch details (the batch ID is the batch time in epoch milliseconds)
- `GET /batches/{batch-id}/operations` - Output operations of a specific batch
- `GET /batches/{batch-id}/operations/{output-op-id}` - Specific output operation details
- `GET /receivers` - List of all receivers
- `GET /receivers/{stream-id}` - Specific receiver details

**Usage Examples**:

```bash
# Get streaming statistics (replace app-123 with an ID listed by /api/v1/applications)
curl http://localhost:4040/api/v1/applications/app-123/streaming/statistics

# Get batch information
curl http://localhost:4040/api/v1/applications/app-123/streaming/batches

# Get specific batch (batch ID = batch time in epoch milliseconds)
curl http://localhost:4040/api/v1/applications/app-123/streaming/batches/1609459200000

# Get receiver status
curl http://localhost:4040/api/v1/applications/app-123/streaming/receivers
```
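To call the endpoints from Scala rather than `curl`, a minimal client can use the JDK's built-in `java.net.http.HttpClient` (Java 11 or later). `StreamingRestClient` is a hypothetical helper. The host, port, and application ID are placeholders, and JSON parsing is left to whatever library the project already uses. Spark serves streaming statistics under the `statistics` path, so the path argument is appended after `/streaming/`.

```scala
import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

object StreamingRestClient {
  // Builds http://<host>:<port>/api/v1/applications/<appId>/streaming/<path>
  def endpoint(host: String, port: Int, appId: String, path: String): URI =
    URI.create(s"http://$host:$port/api/v1/applications/$appId/streaming/$path")

  private val client = HttpClient.newHttpClient()

  // Returns the raw JSON body, failing on any non-200 response
  def get(uri: URI): String = {
    val request = HttpRequest.newBuilder(uri).GET().build()
    val response = client.send(request, HttpResponse.BodyHandlers.ofString())
    if (response.statusCode() != 200)
      throw new RuntimeException(s"GET $uri returned HTTP ${response.statusCode()}")
    response.body()
  }
}
```

For example, `StreamingRestClient.get(StreamingRestClient.endpoint("localhost", 4040, "app-123", "statistics"))` fetches the statistics JSON of a running application.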