or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-streaming.mdindex.mdinput-sources.mdjava-api.mdmonitoring-listeners.mdstate-management.mdweb-ui.md

web-ui.mddocs/

# Web UI

Built-in web interface for visualizing streaming application metrics, batch processing status, receiver information, and performance analytics in an integrated dashboard.

## Capabilities

### StreamingTab

Tab that plugs into Spark's main web UI and provides the streaming-specific dashboard and navigation.

```scala { .api }

/**
 * Web UI tab for streaming applications.
 *
 * Registered with Spark's main web UI under the "streaming" prefix, so its pages are
 * served at `http://<driver-host>:<ui-port>/streaming/`.
 *
 * NOTE(review): in upstream Apache Spark this class is internal (`private[spark]`) and is
 * created by the StreamingContext itself; confirm before using it from application code.
 */
class StreamingTab(parent: SparkUI) extends SparkUITab(parent, "streaming") {

  /** The main streaming dashboard page. */
  def streamingPage: StreamingPage

  /** The page that shows details of a single batch. */
  def batchPage: BatchPage

  /** Attaches this tab (and its pages) to the parent Spark UI. */
  def attachTab(): Unit

  /** Detaches this tab (and its pages) from the parent Spark UI. */
  def detachTab(): Unit
}

```

### StreamingPage

Main dashboard page of the Streaming tab, showing overall application metrics, receiver status, and batch-processing statistics.

```scala { .api }

/**
 * Main page of the streaming tab, showing an overview of the application.
 *
 * Mounted at the tab's root (the empty prefix passed to `WebUIPage`), i.e. `/streaming/`.
 */
class StreamingPage(parent: StreamingTab) extends WebUIPage("") {

  /**
   * Renders the streaming dashboard.
   *
   * @param request the incoming HTTP request
   * @return the page body as markup nodes (presumably `scala.xml.Node`)
   */
  def render(request: HttpServletRequest): Seq[Node]

  /** Aggregate streaming statistics shown on the dashboard. */
  def streamingStatistics: StreamingStatistics

  /** Current information about each receiver. */
  def receiverInfo: Seq[ReceiverInfo]

  /** Information about recently processed batches. */
  def recentBatches: Seq[BatchInfo]
}

```

### BatchPage

Detail page for a single batch, showing its input sources, processing stages, and output operations.

```scala { .api }

/**
 * Detail page for a single batch.
 *
 * Mounted under the tab's "batch" path, i.e. `/streaming/batch`.
 * NOTE(review): the batch to show is presumably selected by a request parameter
 * (e.g. the batch time); confirm against the implementation.
 */
class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {

  /** Renders the detail page for the batch selected by `request`. */
  def render(request: HttpServletRequest): Seq[Node]

  /**
   * Looks up a batch by its batch time.
   *
   * @param batchTime time that identifies the batch
   * @return the batch information, or `None` if no such batch is available
   */
  def getBatchInfo(batchTime: Time): Option[BatchInfo]

  /** Input-stream details for the batch, keyed by stream ID. */
  def getInputStreamInfo(batchTime: Time): Map[Int, StreamInputInfo]

  /** Output-operation details for the batch, keyed (presumably) by output operation ID. */
  def getOutputOperationInfo(batchTime: Time): Map[Int, OutputOperationInfo]
}

```

### StreamingJobProgressListener

Listener that collects streaming metrics and supplies the data shown on the web UI dashboard.

```scala { .api }

/**
 * Listener that tracks streaming job progress for the web UI.
 *
 * Added automatically when the streaming tab is enabled. Batch information is kept in
 * memory, bounded by the retention settings below (see `spark.streaming.ui.retainedBatches`).
 *
 * NOTE(review): in upstream Apache Spark this listener is internal (`private[streaming]`)
 * and is constructed from a StreamingContext rather than a SparkConf; verify against the
 * Spark version in use before depending on it.
 */
class StreamingJobProgressListener(conf: SparkConf) extends StreamingListener {

  // ---- Data retention settings ----

  /** Maximum number of batches to retain in memory. */
  def retainedBatches: Int

  /** Maximum number of completed batches to show. */
  def numBatchInfos: Int

  // ---- Batch information ----

  /** All retained batch information (ordering not specified here). */
  def batchInfos: Seq[BatchInfo]

  /** The retained batch with the given batch time, or `None` if it is not retained. */
  def getBatchInfo(batchTime: Time): Option[BatchInfo]

  /** The most recently completed batch, if any. */
  def lastCompletedBatch: Option[BatchInfo]

  /** The batch currently being processed, if any. */
  def processingBatch: Option[BatchInfo]

  // ---- Receiver information ----

  /** Information for every known receiver, keyed by stream ID. */
  def receiverInfos: Map[Int, ReceiverInfo]

  /** Receiver information for the given stream ID, if known. */
  def getReceiverInfo(streamId: Int): Option[ReceiverInfo]

  // ---- Statistics ----

  /** Average processing delay in ms; `None` when there is nothing to average yet. */
  def avgProcessingDelay: Option[Double]

  /** Average scheduling delay in ms; `None` when there is nothing to average yet. */
  def avgSchedulingDelay: Option[Double]

  /** Total number of processed records. */
  def totalProcessedRecords: Long

  /** Processing rate in records per second. */
  def processingRate: Double

  // ---- Event handling (StreamingListener callbacks) ----

  override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit
  override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit
  override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit
  override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit
  override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped): Unit
}

```

### Batch UI Data

Data structures the web UI uses to display batch and streaming information.

```scala { .api }

/**
 * UI view of a single batch.
 *
 * @param batchInfo           information about the batch
 * @param streamIdToInputInfo input details for each input stream, keyed by stream ID
 * @param outputOperations    the batch's output operations
 */
case class BatchUIData(
    batchInfo: BatchInfo,
    streamIdToInputInfo: Map[Int, StreamInputInfo],
    outputOperations: Seq[OutputOperationUIData]
)

/**
 * UI view of a single output operation within a batch.
 *
 * @param id            output operation ID
 * @param name          short name of the operation
 * @param description   longer description of the operation
 * @param startTime     start time (presumably epoch ms), if the operation has started
 * @param endTime       end time (presumably epoch ms), if the operation has finished
 * @param failureReason failure message, if the operation failed
 */
case class OutputOperationUIData(
    id: Int,
    name: String,
    description: String,
    startTime: Option[Long],
    endTime: Option[Long],
    failureReason: Option[String]
) {

  /**
   * Elapsed time between start and end, or `None` unless both are known.
   * Mirrors the `duration` field exposed by the REST `OutputOperationInfo`.
   */
  def duration: Option[Long] =
    for {
      start <- startTime
      end <- endTime
    } yield end - start
}

/**
 * UI view of a single receiver.
 *
 * @param streamId         ID of the input stream this receiver feeds
 * @param name             display name of the receiver
 * @param active           whether the receiver is active
 * @param location         where the receiver runs (presumably a host name)
 * @param lastErrorMessage message of the most recent error
 *                         (NOTE(review): the value used when there has been no error is not shown here)
 * @param lastErrorTime    time of the most recent error (presumably epoch ms; no-error sentinel unknown)
 */
case class ReceiverUIData(
    streamId: Int,
    name: String,
    active: Boolean,
    location: String,
    lastErrorMessage: String,
    lastErrorTime: Long
)

```

### UI Tables and Components

Reusable UI components that display streaming data as tables.

```scala { .api }

/**
 * Table component that renders a list of batches.
 *
 * @param batches   the batches to display
 * @param streaming NOTE(review): purpose of this flag is not documented here; defaults to `true`
 */
class AllBatchesTable(
    batches: Seq[BatchUIData],
    streaming: Boolean = true
) {

  /** Renders the complete table (headers and rows) as HTML. */
  def toHtmlTable: Seq[Node]

  /** Column headers of the table. */
  def headers: Seq[String]

  /** Rendered table rows (presumably one per batch). */
  def rows: Seq[Seq[Node]]
}

/**
 * Table component that renders the given receivers.
 *
 * @param receivers the receivers to display
 */
class ReceiversTable(receivers: Seq[ReceiverUIData]) {

  /** Renders the receiver table as HTML. */
  def toHtmlTable: Seq[Node]
}

/**
 * Component that renders aggregate streaming statistics.
 *
 * @param stats the statistics to display
 */
class StreamingStatisticsTable(stats: StreamingStatistics) {

  /** Renders the statistics as HTML. */
  def toHtmlTable: Seq[Node]
}

```

### UI Utility Functions

Helper functions for formatting and displaying streaming data in the web interface.

```scala { .api }

/**
 * Formatting and rendering helpers for the streaming UI.
 *
 * NOTE(review): Spark core also defines `org.apache.spark.ui.UIUtils`; qualify the import
 * if both objects are in scope.
 */
object UIUtils {

  /** Formats a duration given in milliseconds for display. */
  def formatDuration(milliseconds: Long): String

  /** Formats a timestamp (presumably epoch ms) as a date for display. */
  def formatDate(timestamp: Long): String

  /** Formats a batch time for display. */
  def formatBatchTime(batchTime: Time): String

  /**
   * Renders a progress bar.
   *
   * @param completed number of completed units
   * @param failed    number of failed units
   * @param total     total number of units
   */
  def progressBar(
      completed: Int,
      failed: Int,
      total: Int
  ): Seq[Node]

  /** Renders a status badge for the given batch. */
  def batchStatusBadge(batchInfo: BatchInfo): Seq[Node]

  /** Renders a status indicator for the given receiver. */
  def receiverStatusIndicator(receiverInfo: ReceiverInfo): Seq[Node]

  /** Formats a rate given in records per second for display. */
  def formatRate(recordsPerSecond: Double): String

  /** Formats a size given in bytes for display. */
  def formatBytes(bytes: Long): String
}

```

## Web UI Features

### Dashboard Overview

The streaming web UI provides a dashboard with:

- **Application Summary**: Current status, uptime, and total batches processed
- **Batch Timeline**: Visual timeline of batch processing with status indicators
- **Performance Metrics**: Processing delays, scheduling delays, and throughput rates
- **Receiver Status**: Active receivers, error states, and data ingestion rates
- **Resource Utilization**: Memory usage, CPU metrics, and executor information

### Batch Details

The detail view for each batch includes:

- **Timing Information**: Submission, start, and completion times
- **Input Sources**: Data volume and source for each input stream
- **Processing Stages**: Breakdown of computation stages and their dependencies
- **Output Operations**: Status and performance of each output operation
- **Error Information**: Detailed error messages and stack traces for failures

### Interactive Features

- **Real-time Updates**: The dashboard shows the current status of the running application
- **Historical Data**: Browse retained batches and performance trends
- **Filtering**: Filter batches by status, time range, or other criteria
- **Drill-down Navigation**: Click through from the overview to detailed views
- **Export Capabilities**: Retrieve metrics as JSON through the REST API (see [REST API](#rest-api)) for external analysis

## Usage Examples

### Accessing the Web UI

```scala

// Any SparkConf works; the UI is enabled by default (spark.ui.enabled = true).
val conf = new SparkConf().setAppName("MyStreamingApp")
val ssc = new StreamingContext(conf, Seconds(2))

// ... define input streams and at least one output operation here ...

// The Streaming tab is attached to the Spark web UI when the context is started,
// not when the StreamingContext is created.
ssc.start()

// Access the web UI at: http://<driver-host>:4040/streaming/
// The port is configurable via spark.ui.port (default 4040). If that port is already
// in use, Spark tries the following ports (4041, 4042, ...), so check the driver log
// for the actual UI address.

```

### Programmatic Access to UI Data

```scala

// Get the streaming job progress listener.
// NOTE: in upstream Apache Spark, `StreamingContext.progressListener` is declared
// `private[streaming]`, so this line only compiles from code inside the
// `org.apache.spark.streaming` package. Application code should register its own
// StreamingListener instead (see "Custom UI Integration").
val streamingListener = ssc.progressListener

// Access the 10 most recent batches. Sort by batch time (newest first) instead of
// relying on whatever order `batchInfos` happens to return.
val recentBatches = streamingListener.batchInfos
  .sortBy(_.batchTime.milliseconds)(Ordering[Long].reverse)
  .take(10)
recentBatches.foreach { batch =>
  // processingDelay is an Option[Long] and may be empty, so format it explicitly
  // instead of interpolating it directly (which would print "Some(42)ms").
  val processing = batch.processingDelay.map(delay => s"${delay}ms").getOrElse("n/a")
  val records = batch.streamIdToInputInfo.values.map(_.numRecords).sum
  println(s"Batch ${batch.batchTime}: processing=$processing, records=$records")
}

// Access receiver information
streamingListener.receiverInfos.foreach { case (streamId, receiverInfo) =>
  println(s"Stream $streamId: ${receiverInfo.name} - Active: ${receiverInfo.active}")
}

// Get performance statistics. The averages are None when there is nothing to average;
// show "n/a" rather than a misleading 0.0ms.
def formatAvgMs(avg: Option[Double]): String = avg.map(ms => f"$ms%.1fms").getOrElse("n/a")
println(s"Average delays - Processing: ${formatAvgMs(streamingListener.avgProcessingDelay)}, " +
  s"Scheduling: ${formatAvgMs(streamingListener.avgSchedulingDelay)}")

```

### Custom UI Integration

```scala

// Custom listener that forwards per-batch metrics to an external monitoring system.
class CustomUIListener extends StreamingListener {

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val batch = batchCompleted.batchInfo

    // Records ingested by this batch, summed across all of its input streams.
    val inputRecords = batch.streamIdToInputInfo.values.map(_.numRecords).sum

    // Send data to the external dashboard
    sendToCustomDashboard(
      batchTime = batch.batchTime,
      processingDelay = batch.processingDelay,
      inputRecords = inputRecords
    )
  }

  /**
   * Pushes one batch's metrics to the external dashboard.
   *
   * NOTE: Spark delivers listener events on a shared listener-bus thread, so keep this
   * method fast; hand slow or blocking I/O off to a separate thread or queue.
   *
   * @param batchTime       time of the completed batch
   * @param processingDelay processing time in ms; empty if not available
   * @param inputRecords    number of records the batch received
   */
  private def sendToCustomDashboard(
      batchTime: Time,
      processingDelay: Option[Long],
      inputRecords: Long
  ): Unit = {
    // Implementation for custom dashboard integration
  }
}

ssc.addStreamingListener(new CustomUIListener())

```

## Configuration

### UI Settings

Key configuration options for the streaming web UI:

```scala

val conf = new SparkConf()
  .set("spark.ui.port", "4041")                      // Web UI port (default 4040)
  .set("spark.streaming.ui.retainedBatches", "1000") // Streaming batches to retain (default 1000)
  .set("spark.ui.retainedJobs", "1000")              // Spark jobs to retain (default 1000)
  .set("spark.ui.enabled", "true")                   // Enable the web UI (default true)

```

### Memory Management

The web UI listener keeps information about recent batches in memory. Lower the retention limit to reduce memory usage:

```scala

// Limit the number of retained batches to bound the listener's memory use
val conf = new SparkConf()
  .set("spark.streaming.ui.retainedBatches", "100") // Retain only the last 100 batches

```

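### Security

Configure security settings for production deployments. Note that `spark.authenticate` secures Spark's internal connections; access to the web UI itself is controlled with an authentication servlet filter plus ACLs: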

```scala

val conf = new SparkConf()
  // Authenticate Spark's internal connections with a shared secret. Never hard-code the
  // secret in source; load it from the environment or a secrets store (this lookup fails
  // fast if SPARK_AUTH_SECRET is unset). On YARN, Spark generates the secret automatically.
  .set("spark.authenticate", "true")
  .set("spark.authenticate.secret", sys.env("SPARK_AUTH_SECRET"))
  // spark.authenticate does not protect the web UI. Install a javax.servlet.Filter that
  // authenticates users, then enable ACLs to restrict who may view the UI.
  .set("spark.ui.filters", "com.example.MyUIAuthFilter") // placeholder: your auth filter class
  .set("spark.acls.enable", "true")
  .set("spark.ui.view.acls", "alice,bob")               // users allowed to view the UI

```

## REST API

REST API endpoints for programmatic access to streaming metrics and application status.

### REST API Classes

Classes that expose streaming application data over HTTP.

```scala { .api }

/**
 * Root resource for the streaming REST API, served under
 * `/api/v1/applications/<app-id>/streaming/`.
 *
 * In the paths below, a batch ID is the batch time in milliseconds.
 */
class ApiStreamingRootResource {

  /** Streaming application information. */
  def streamingApp(): ApiStreamingApp

  /** Statistics for the streaming context (`GET /statistics`). */
  def getStreamingStatistics(): StreamingStatistics

  /** All retained batches (`GET /batches`). */
  def getBatches(): Seq[BatchInfo]

  /**
   * A single batch (`GET /batches/{batch-id}`).
   *
   * @param batchTime batch time in milliseconds, which is also the batch ID in the URL
   */
  def getBatch(batchTime: Long): BatchInfo

  /** Output operations of a batch (`GET /batches/{batch-id}/operations`). */
  def getOutputOperations(batchTime: Long): Seq[OutputOperationInfo]

  /** One output operation of a batch (`GET /batches/{batch-id}/operations/{output-op-id}`). */
  def getOutputOperation(batchTime: Long, outputOpId: Int): OutputOperationInfo

  /** All receivers (`GET /receivers`). */
  def getReceivers(): Seq[ReceiverInfo]

  /** A single receiver (`GET /receivers/{stream-id}`). */
  def getReceiver(streamId: Int): ReceiverInfo
}

/**
 * Streaming application resource for the REST API.
 *
 * NOTE(review): in upstream Apache Spark this class is the JAX-RS entry point that routes
 * `applications/{appId}/streaming` to the root resource; verify the members listed here.
 */
class ApiStreamingApp {

  /** Details of the application. */
  def getApplicationInfo(): ApplicationInfo

  /** Status of the streaming context, as a string. */
  def getStatus(): String
}

```

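### REST API Data Classes

Data transfer objects returned by the REST API. Despite the shared names, these are not the `BatchInfo`/`ReceiverInfo` objects passed to `StreamingListener` callbacks in the listener examples above; those carry different fields (for example, `streamIdToInputInfo` and an `Option[Long]` processing delay).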

```scala { .api }

/**
 * Streaming statistics returned by `GET /statistics`.
 *
 * @param startTime                   start time of the streaming context (presumably epoch ms)
 * @param batchDuration               batch interval (presumably ms)
 * @param numReceivers                total number of receivers
 * @param numActiveReceivers          receivers that are currently active
 * @param numInactiveReceivers        receivers that are currently inactive
 * @param numTotalCompletedBatches    batches completed since start
 * @param numRetainedCompletedBatches completed batches still retained by the UI
 * @param numActiveBatches            batches queued or being processed
 * @param numProcessedRecords         records processed so far
 * @param numReceivedRecords          records received so far
 * @param avgInputSize                average input size per batch
 * @param avgProcessingTime           average processing time (presumably ms)
 * @param avgSchedulingDelay          average scheduling delay (presumably ms)
 * @param avgTotalDelay               average total delay (presumably ms)
 */
case class StreamingStatistics(
    startTime: Long,
    batchDuration: Long,
    numReceivers: Int,
    numActiveReceivers: Int,
    numInactiveReceivers: Int,
    numTotalCompletedBatches: Long,
    numRetainedCompletedBatches: Long,
    numActiveBatches: Long,
    numProcessedRecords: Long,
    numReceivedRecords: Long,
    avgInputSize: Double,
    avgProcessingTime: Double,
    avgSchedulingDelay: Double,
    avgTotalDelay: Double
)

/**
 * Batch information returned by `GET /batches` and `GET /batches/{batch-id}`.
 *
 * Not the same type as the `BatchInfo` delivered to `StreamingListener.onBatchCompleted`.
 *
 * @param batchId          batch identifier
 * @param batchTime        batch time (presumably epoch ms)
 * @param status           batch status as a string (allowed values not shown here)
 * @param inputSize        size of the batch's input
 * @param schedulingDelay  scheduling delay (presumably ms)
 * @param processingDelay  processing delay (presumably ms)
 * @param outputOperations the batch's output operations
 */
case class BatchInfo(
    batchId: Long,
    batchTime: Long,
    status: String,
    inputSize: Long,
    schedulingDelay: Long,
    processingDelay: Long,
    outputOperations: Seq[OutputOperationInfo]
)

/**
 * Output operation information returned by the REST API.
 *
 * @param id           output operation ID
 * @param name         short name of the operation
 * @param description  longer description of the operation
 * @param startTime    start time (presumably epoch ms)
 * @param endTime      end time (presumably epoch ms)
 * @param duration     elapsed time (presumably `endTime - startTime`, in ms)
 * @param status       status as a string (allowed values not shown here)
 * @param errorMessage failure message, if the operation failed
 */
case class OutputOperationInfo(
    id: Int,
    name: String,
    description: String,
    startTime: Long,
    endTime: Long,
    duration: Long,
    status: String,
    errorMessage: Option[String]
)

/**
 * Receiver information returned by `GET /receivers` and `GET /receivers/{stream-id}`.
 *
 * Not the same type as the `ReceiverInfo` delivered to `StreamingListener` receiver events.
 *
 * @param streamId         ID of the input stream the receiver feeds
 * @param name             display name of the receiver
 * @param status           receiver status as a string (allowed values not shown here)
 * @param location         where the receiver runs (presumably a host name)
 * @param executorId       ID of the executor running the receiver
 * @param lastErrorMessage message of the most recent error (no-error value not shown here)
 * @param lastErrorTime    time of the most recent error (presumably epoch ms)
 */
case class ReceiverInfo(
    streamId: Int,
    name: String,
    status: String,
    location: String,
    executorId: String,
    lastErrorMessage: String,
    lastErrorTime: Long
)

```

### REST API Endpoints

**Base URL**: `http://<driver-host>:<port>/api/v1/applications/<app-id>/streaming/`

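**Available Endpoints** (relative to the base URL; a batch ID is the batch time in milliseconds):

- `GET /statistics` - Statistics for the streaming context
- `GET /batches` - List of all retained batches
- `GET /batches/{batch-id}` - Details of the given batch
- `GET /batches/{batch-id}/operations` - List of the output operations of the given batch
- `GET /batches/{batch-id}/operations/{output-op-id}` - Details of the given output operation
- `GET /receivers` - List of all receivers
- `GET /receivers/{stream-id}` - Details of the given receiver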

**Usage Examples**:

```bash

# Look up the application ID (replace app-123 below with the "id" returned here)
curl http://localhost:4040/api/v1/applications

# Get streaming statistics
curl http://localhost:4040/api/v1/applications/app-123/streaming/statistics

# Get batch information
curl http://localhost:4040/api/v1/applications/app-123/streaming/batches

# Get a specific batch (batch ID = batch time in milliseconds)
curl http://localhost:4040/api/v1/applications/app-123/streaming/batches/1609459200000

# Get receiver status
curl http://localhost:4040/api/v1/applications/app-123/streaming/receivers

```