or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

data-streams.mdevent-monitoring.mdindex.mdinput-sources.mdjava-api.mdkey-value-ops.mdstate-management.mdstreaming-context.mdwindow-ops.md

event-monitoring.mddocs/

0

# Event Monitoring

1

2

Event monitoring in Spark Streaming provides detailed insights into streaming application performance, batch processing, receiver status, and output operations through a comprehensive listener system. This enables real-time monitoring, debugging, and performance optimization.

3

4

## Capabilities

5

6

### StreamingListener Interface

7

8

Core interface for receiving all streaming-related events during application execution.

9

10

```scala { .api }

/**
 * Base trait for receiving streaming system events.
 *
 * Every callback has an empty default implementation, so an implementation only needs to
 * override the events it is interested in.
 */
trait StreamingListener {

  /**
   * Called when the streaming context starts.
   *
   * @param streamingStarted event containing start time information
   */
  def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted): Unit = {}

  /**
   * Called when a batch is submitted for processing.
   *
   * @param batchSubmitted event containing batch submission details
   */
  def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {}

  /**
   * Called when batch processing starts.
   *
   * @param batchStarted event containing batch start details
   */
  def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {}

  /**
   * Called when batch processing completes.
   *
   * @param batchCompleted event containing batch completion details and metrics
   */
  def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {}

  /**
   * Called when an output operation starts.
   *
   * @param outputOperationStarted event containing output operation start details
   */
  def onOutputOperationStarted(
      outputOperationStarted: StreamingListenerOutputOperationStarted): Unit = {}

  /**
   * Called when an output operation completes.
   *
   * @param outputOperationCompleted event containing output operation completion details
   */
  def onOutputOperationCompleted(
      outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = {}

  /**
   * Called when a receiver starts.
   *
   * @param receiverStarted event containing receiver start information
   */
  def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = {}

  /**
   * Called when a receiver encounters an error.
   *
   * @param receiverError event containing receiver error details
   */
  def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {}

  /**
   * Called when a receiver stops.
   *
   * @param receiverStopped event containing receiver stop information
   */
  def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped): Unit = {}
}

```

70

71

### Streaming Event Types

72

73

Detailed event objects providing comprehensive information about streaming operations.

74

75

```scala { .api }

/**
 * Base trait for all streaming listener events.
 *
 * The trait is sealed: every event type must be declared in the same source file, which lets
 * the compiler check pattern matches over events for exhaustiveness.
 */
sealed trait StreamingListenerEvent

/**
 * Event fired when the streaming context starts.
 *
 * @param time time when streaming started
 */
case class StreamingListenerStreamingStarted(time: Long) extends StreamingListenerEvent

/**
 * Event fired when a batch is submitted.
 *
 * @param batchInfo information about the submitted batch
 */
case class StreamingListenerBatchSubmitted(batchInfo: BatchInfo) extends StreamingListenerEvent

/**
 * Event fired when batch processing starts.
 *
 * @param batchInfo information about the batch being processed
 */
case class StreamingListenerBatchStarted(batchInfo: BatchInfo) extends StreamingListenerEvent

/**
 * Event fired when batch processing completes.
 *
 * @param batchInfo complete information about the processed batch, including metrics
 */
case class StreamingListenerBatchCompleted(batchInfo: BatchInfo) extends StreamingListenerEvent

/**
 * Event fired when an output operation starts.
 *
 * @param outputOperationInfo information about the output operation
 */
case class StreamingListenerOutputOperationStarted(outputOperationInfo: OutputOperationInfo)
  extends StreamingListenerEvent

/**
 * Event fired when an output operation completes.
 *
 * @param outputOperationInfo complete information about the output operation
 */
case class StreamingListenerOutputOperationCompleted(outputOperationInfo: OutputOperationInfo)
  extends StreamingListenerEvent

/**
 * Event fired when a receiver starts.
 *
 * @param receiverInfo information about the started receiver
 */
case class StreamingListenerReceiverStarted(receiverInfo: ReceiverInfo)
  extends StreamingListenerEvent

/**
 * Event fired when a receiver encounters an error.
 *
 * @param receiverInfo information about the receiver that reported the error
 */
case class StreamingListenerReceiverError(receiverInfo: ReceiverInfo)
  extends StreamingListenerEvent

/**
 * Event fired when a receiver stops.
 *
 * @param receiverInfo information about the stopped receiver
 */
case class StreamingListenerReceiverStopped(receiverInfo: ReceiverInfo)
  extends StreamingListenerEvent

```

135

136

### Batch Information

137

138

Detailed information about batch processing including timing and performance metrics.

139

140

```scala { .api }

/**
 * Information about a batch of streaming data.
 *
 * Timing fields are in milliseconds. The processing timestamps are optional because they are
 * only known once the batch has actually started / finished processing. For the same reason
 * the derived delays are `Option[Long]` rather than sentinel values.
 *
 * @param batchTime            time of the batch
 * @param streamIdToInputInfo  map of input stream IDs to input information
 * @param submissionTime       when the batch was submitted for processing
 * @param processingStartTime  when batch processing actually started, if it has
 * @param processingEndTime    when batch processing completed, if it has
 * @param outputOperationInfos information about the output operations in this batch, by ID
 */
case class BatchInfo(
    batchTime: Time,
    streamIdToInputInfo: Map[Int, StreamInputInfo],
    submissionTime: Long,
    processingStartTime: Option[Long],
    processingEndTime: Option[Long],
    outputOperationInfos: Map[Int, OutputOperationInfo]
) {

  /**
   * Time spent processing this batch: `processingEndTime - processingStartTime`.
   *
   * @return processing delay in milliseconds, or `None` if either timestamp is not yet known
   */
  def processingDelay: Option[Long] =
    for (start <- processingStartTime; end <- processingEndTime) yield end - start

  /**
   * Time the batch waited before processing started: `processingStartTime - submissionTime`.
   *
   * @return scheduling delay in milliseconds, or `None` if processing has not started
   */
  def schedulingDelay: Option[Long] = processingStartTime.map(_ - submissionTime)

  /**
   * End-to-end delay for this batch: `schedulingDelay + processingDelay`.
   *
   * @return total delay in milliseconds, or `None` if either component is not yet known
   */
  def totalDelay: Option[Long] =
    for (scheduling <- schedulingDelay; processing <- processingDelay)
      yield scheduling + processing

  /**
   * Number of input records in this batch, summed over all input streams.
   *
   * @return total number of input records
   */
  def numRecords: Long = streamIdToInputInfo.values.map(_.numRecords).sum
}

```

183

184

### Output Operation Information

185

186

Details about individual output operations within batches.

187

188

```scala { .api }

/**
 * Information about an output operation.
 *
 * @param batchTime     time of the batch this output operation belongs to
 * @param id            identifier of the output operation
 * @param name          human-readable name of the operation
 * @param description   detailed description of the operation
 * @param startTime     when the operation started, if it has
 * @param endTime       when the operation completed, if it has
 * @param failureReason reason for the failure, if the operation failed
 */
case class OutputOperationInfo(
    batchTime: Time,
    id: Int,
    name: String,
    description: String,
    startTime: Option[Long],
    endTime: Option[Long],
    failureReason: Option[String]
) {

  /**
   * Duration of this output operation: `endTime - startTime`.
   *
   * @return duration in milliseconds, or `None` if the operation has not both started and ended
   */
  def duration: Option[Long] = for (start <- startTime; end <- endTime) yield end - start
}

```

214

215

### Receiver Information

216

217

Information about streaming data receivers including status and error details.

218

219

```scala { .api }

/**
 * Information about a stream data receiver.
 *
 * The error fields are plain values with "no error" defaults (empty strings and -1), not
 * `Option`s, so check for emptiness before using them.
 *
 * @param streamId         ID of the input stream the receiver feeds
 * @param name             name of the receiver
 * @param active           whether the receiver is currently active
 * @param location         host on which the receiver is running
 * @param executorId       ID of the executor running the receiver
 * @param lastErrorMessage last error message reported by the receiver; empty if none
 * @param lastError        last error reported by the receiver, as a string; empty if none
 * @param lastErrorTime    time of the last error, or -1 if none
 */
case class ReceiverInfo(
    streamId: Int,
    name: String,
    active: Boolean,
    location: String,
    executorId: String,
    lastErrorMessage: String = "",
    lastError: String = "",
    lastErrorTime: Long = -1L
)

```

240

241

### Built-in Listener Implementations

242

243

Pre-built listeners for common monitoring scenarios.

244

245

```scala { .api }

/**
 * Built-in listener that logs summary statistics about recently completed batches.
 *
 * Signature summary only: method bodies are omitted here.
 *
 * @param numBatchInfos number of recent batches to track for the statistics
 */
class StatsReportListener(numBatchInfos: Int = 10) extends StreamingListener {

  /**
   * Handles a completed batch; overrides [[StreamingListener.onBatchCompleted]].
   *
   * @param batchCompleted event carrying the completed batch's information
   */
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit

  /**
   * Prints the current statistics summary.
   */
  def printStats(): Unit
}

```

259

260

**Usage Examples:**

261

262

```scala

// Custom listener implementation
class CustomStreamingListener extends StreamingListener {

  /** Processing time above which a batch is reported as slow, in milliseconds. */
  private val slowBatchThresholdMs = 5000L

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val batchInfo = batchCompleted.batchInfo

    // Delays are Option[Long]: they are only defined once the batch has started/finished,
    // so render a missing value explicitly instead of printing "None".
    def fmt(delay: Option[Long]): String = delay.fold("n/a")(ms => s"${ms}ms")

    println(s"Batch ${batchInfo.batchTime}: " +
      s"processed ${batchInfo.numRecords} records in ${fmt(batchInfo.processingDelay)} " +
      s"(scheduling delay: ${fmt(batchInfo.schedulingDelay)})")

    // Alert on high processing delay
    batchInfo.processingDelay.filter(_ > slowBatchThresholdMs).foreach { processingTime =>
      println(s"WARNING: High processing delay detected: ${processingTime}ms")
    }
  }

  override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {
    val receiverInfo = receiverError.receiverInfo
    println(s"Receiver error on stream ${receiverInfo.streamId}: ${receiverInfo.lastErrorMessage}")
  }
}

// Add listener to streaming context
val customListener = new CustomStreamingListener()
ssc.addStreamingListener(customListener)

// Use built-in stats listener, tracking the 20 most recent batches
val statsListener = new StatsReportListener(20)
ssc.addStreamingListener(statsListener)

```

295

296

### Java API for Listeners

297

298

Java-friendly listener interface for Java applications.

299

300

```java { .api }

301

/**

302

* Abstract base class for Java streaming listeners

303

*/

304

public abstract class JavaStreamingListener {

305

public void onStreamingStarted(StreamingListenerStreamingStarted streamingStarted) {}

306

public void onBatchSubmitted(StreamingListenerBatchSubmitted batchSubmitted) {}

307

public void onBatchStarted(StreamingListenerBatchStarted batchStarted) {}

308

public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {}

309

public void onOutputOperationStarted(StreamingListenerOutputOperationStarted outputOperationStarted) {}

310

public void onOutputOperationCompleted(StreamingListenerOutputOperationCompleted outputOperationCompleted) {}

311

public void onReceiverStarted(StreamingListenerReceiverStarted receiverStarted) {}

312

public void onReceiverError(StreamingListenerReceiverError receiverError) {}

313

public void onReceiverStopped(StreamingListenerReceiverStopped receiverStopped) {}

314

}

/**
 * Wrapper that adapts a JavaStreamingListener to the Scala StreamingListener interface so it
 * can be registered wherever a StreamingListener is expected.
 *
 * @param javaStreamingListener the Java listener that the wrapped callbacks are delegated to
 */
class JavaStreamingListenerWrapper(javaStreamingListener: JavaStreamingListener) extends StreamingListener

```

321

322

**Java Usage Examples:**

323

324

```java

325

// Custom Java listener

326

class MyJavaStreamingListener extends JavaStreamingListener {

327

@Override

328

public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {

329

BatchInfo batchInfo = batchCompleted.batchInfo();

330

long processingDelay = batchInfo.processingDelay();

331

long numRecords = batchInfo.numRecords();

332

333

System.out.println(String.format(

334

"Batch completed: %d records processed in %dms",

335

numRecords, processingDelay

336

));

337

}

338

339

@Override

340

public void onReceiverError(StreamingListenerReceiverError receiverError) {

341

ReceiverInfo info = receiverError.receiverInfo();

342

System.err.println("Receiver error: " + info.lastErrorMessage().orElse("Unknown error"));

343

}

344

}

345

346

// Add to Java streaming context

347

JavaStreamingListener listener = new MyJavaStreamingListener();

348

jssc.addStreamingListener(listener);

349

```

350

351

## Advanced Monitoring Patterns

352

353

### Performance Monitoring

354

355

Track key performance metrics and identify bottlenecks:

356

357

```scala

/**
 * Listener that keeps a bounded history of per-batch performance metrics.
 *
 * It prints warnings for high scheduling delay or low throughput, and prints moving averages
 * over the most recent batches.
 */
class PerformanceMonitoringListener extends StreamingListener {
  /** Maximum number of batches kept in the metrics history. */
  private val maxHistory = 100
  /** Number of most recent batches used for the moving averages. */
  private val movingAverageWindow = 10
  /** Scheduling delay (ms) above which a warning is printed. */
  private val highSchedulingDelayMs = 1000L
  /** Throughput (records/sec) below which a warning is printed. */
  private val lowThroughputRecordsPerSec = 100.0

  private val batchMetrics = scala.collection.mutable.ArrayBuffer[BatchMetrics]()

  case class BatchMetrics(
    batchTime: Time,
    schedulingDelay: Long,
    processingTime: Long,
    totalDelay: Long,
    numRecords: Long,
    recordsPerSecond: Double
  )

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val batch = batchCompleted.batchInfo

    // Delays are Option[Long]; a batch whose timings are incomplete is skipped rather than
    // recorded with misleading placeholder values.
    for {
      schedulingDelay <- batch.schedulingDelay
      processingTime <- batch.processingDelay
      totalDelay <- batch.totalDelay
    } {
      val recordsPerSecond =
        if (processingTime > 0) batch.numRecords * 1000.0 / processingTime else 0.0

      val metrics = BatchMetrics(
        batch.batchTime,
        schedulingDelay,
        processingTime,
        totalDelay,
        batch.numRecords,
        recordsPerSecond
      )

      batchMetrics += metrics

      // Keep only recent metrics
      if (batchMetrics.size > maxHistory) {
        batchMetrics.remove(0)
      }

      // Check for performance issues
      analyzePerformance(metrics)
    }
  }

  private def analyzePerformance(metrics: BatchMetrics): Unit = {
    // Alert on high scheduling delay
    if (metrics.schedulingDelay > highSchedulingDelayMs) {
      println(s"HIGH SCHEDULING DELAY: ${metrics.schedulingDelay}ms at ${metrics.batchTime}")
    }

    // Alert on low throughput. Empty or zero-duration batches have no measurable throughput
    // (recordsPerSecond is 0.0 for them), so they must not trigger the alert.
    val hasMeasurableThroughput = metrics.numRecords > 0 && metrics.processingTime > 0
    if (hasMeasurableThroughput && metrics.recordsPerSecond < lowThroughputRecordsPerSec) {
      println(s"LOW THROUGHPUT: ${metrics.recordsPerSecond} records/sec at ${metrics.batchTime}")
    }

    // Calculate moving averages for trend analysis
    if (batchMetrics.size >= movingAverageWindow) {
      val recent = batchMetrics.takeRight(movingAverageWindow)
      // Divide as Double: Long / Int integer division would truncate the average.
      val avgProcessingTime = recent.map(_.processingTime).sum.toDouble / recent.size
      val avgThroughput = recent.map(_.recordsPerSecond).sum / recent.size

      println(f"Recent averages: ${avgProcessingTime}%.1fms processing, ${avgThroughput}%.1f records/sec")
    }
  }
}

```

417

418

### Error Tracking and Alerting

419

420

Monitor for errors and failures across the streaming application:

421

422

```scala

/**
 * Listener that records failed output operations and receiver errors.
 *
 * It keeps per-source error counts and a bounded history of recent errors, and prints an
 * alert when too many errors occur within the alert window.
 */
class ErrorTrackingListener extends StreamingListener {
  /** Maximum number of recent errors kept for pattern analysis. */
  private val maxRecentErrors = 50
  /** Length of the sliding window used for the error-rate alert, in milliseconds. */
  private val alertWindowMs = 60000L
  /** Number of errors inside the window above which an alert is printed. */
  private val alertThreshold = 5

  private val errorCounts = scala.collection.mutable.Map[String, Int]()
  private val recentErrors = scala.collection.mutable.Queue[ErrorEvent]()

  case class ErrorEvent(timestamp: Long, source: String, message: String)

  /** Snapshot of the number of errors recorded so far, keyed by error source. */
  def errorCountsBySource: Map[String, Int] = errorCounts.toMap

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val batch = batchCompleted.batchInfo

    // Check for failed output operations
    batch.outputOperationInfos.values.foreach { opInfo =>
      opInfo.failureReason.foreach { reason =>
        recordError("OutputOperation", s"${opInfo.name}: $reason")
      }
    }
  }

  override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {
    val receiver = receiverError.receiverInfo
    // ReceiverInfo.lastErrorMessage is a String that is empty when no message was reported.
    // Option(...) additionally guards against null.
    val errorMsg = Option(receiver.lastErrorMessage)
      .filter(_.nonEmpty)
      .getOrElse("Unknown receiver error")
    recordError("Receiver", s"Stream ${receiver.streamId}: $errorMsg")
  }

  private def recordError(source: String, message: String): Unit = {
    val errorEvent = ErrorEvent(System.currentTimeMillis(), source, message)

    // Track error counts by source
    errorCounts(source) = errorCounts.getOrElse(source, 0) + 1

    // Keep recent errors for analysis, bounded to the most recent ones
    recentErrors.enqueue(errorEvent)
    if (recentErrors.size > maxRecentErrors) {
      recentErrors.dequeue()
    }

    // Log the error before any alert it triggers, so the output reads in order.
    println(s"ERROR [$source]: $message")

    // Alert on error patterns
    checkErrorPatterns()
  }

  private def checkErrorPatterns(): Unit = {
    val windowStart = System.currentTimeMillis() - alertWindowMs

    val recentErrorCount = recentErrors.count(_.timestamp > windowStart)

    if (recentErrorCount > alertThreshold) {
      println(s"ALERT: $recentErrorCount errors in the last minute!")
    }
  }
}

```

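### Metrics Integration

Integration with external monitoring systems. `MetricsReporter` in the example below is not a Spark class: it stands for your own metrics client. Substitute the gauge/counter API of the system you report to.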

```scala

/**
 * Listener that forwards batch and receiver metrics to an external monitoring system.
 *
 * @param metricsReporter client for the external metrics system. `MetricsReporter` is a
 *                        placeholder for your own type; it is assumed to expose
 *                        `gauge(name, value)` for Long and Double values and
 *                        `counter(name).increment()`.
 */
class MetricsIntegrationListener(metricsReporter: MetricsReporter) extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val batch = batchCompleted.batchInfo

    // Send metrics to external system. Delays are Option[Long], so only known values are
    // reported; a fabricated 0 or -1 would corrupt the time series.
    batch.processingDelay.foreach { delay =>
      metricsReporter.gauge("streaming.batch.processing_delay", delay)
    }
    batch.schedulingDelay.foreach { delay =>
      metricsReporter.gauge("streaming.batch.scheduling_delay", delay)
    }
    batch.totalDelay.foreach { delay =>
      metricsReporter.gauge("streaming.batch.total_delay", delay)
    }
    metricsReporter.gauge("streaming.batch.num_records", batch.numRecords)

    // Calculate throughput (records/sec); undefined for unknown or zero processing time
    batch.processingDelay.filter(_ > 0).foreach { delayMs =>
      val throughput = batch.numRecords * 1000.0 / delayMs
      metricsReporter.gauge("streaming.batch.throughput", throughput)
    }

    // Track batch success/failure
    val hasFailures = batch.outputOperationInfos.values.exists(_.failureReason.isDefined)
    metricsReporter.counter("streaming.batch.completed").increment()
    if (hasFailures) {
      metricsReporter.counter("streaming.batch.failed").increment()
    }
  }

  override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {
    metricsReporter.counter("streaming.receiver.errors").increment()
  }
}

```

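## Event System Architecture

The streaming listener system provides:

- **Asynchronous Delivery**: Events are posted to a listener bus as operations progress. They are delivered to listeners asynchronously, shortly after they occur.
- **Comprehensive Coverage**: All major streaming operations generate events.
- **Ordered, Single-Threaded Delivery**: Listeners are called from a single listener-bus thread, in the order events were posted. Callbacks should therefore be fast and must not block. A slow listener delays every other listener, and can cause events to be dropped if the bus queue fills up.
- **Error Isolation**: Exceptions thrown by a listener are caught and logged, so they don't affect stream processing.
- **Historical Data**: `BatchInfo` and the other event payloads carry timing and status details that can be aggregated for historical context.

This enables building sophisticated monitoring, alerting, and analytics systems on top of Spark Streaming applications.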