# Event Monitoring

Event monitoring in Spark Streaming provides detailed insight into streaming application performance, batch processing, receiver status, and output operations through a listener API. This enables live monitoring, debugging, and performance tuning.

## Capabilities

### StreamingListener Interface

Core interface for receiving all streaming-related events during application execution.

```scala { .api }
/**
 * Base trait for receiving streaming system events.
 *
 * Every callback has an empty default implementation, so a listener only overrides the
 * events it is interested in. Callbacks are invoked one at a time on a single listener
 * thread, so they should return quickly and avoid blocking work.
 */
trait StreamingListener {

  /**
   * Called when the streaming context starts.
   *
   * @param streamingStarted event containing the start time
   */
  def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted): Unit = {}

  /**
   * Called when a batch is submitted for processing.
   *
   * @param batchSubmitted event containing the batch submission details
   */
  def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {}

  /**
   * Called when processing of a batch starts.
   *
   * @param batchStarted event containing the batch start details
   */
  def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {}

  /**
   * Called when processing of a batch completes.
   *
   * @param batchCompleted event containing the batch completion details and metrics
   */
  def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {}

  /**
   * Called when an output operation starts.
   *
   * @param outputOperationStarted event containing the output operation start details
   */
  def onOutputOperationStarted(
      outputOperationStarted: StreamingListenerOutputOperationStarted): Unit = {}

  /**
   * Called when an output operation completes.
   *
   * @param outputOperationCompleted event containing the output operation completion details
   */
  def onOutputOperationCompleted(
      outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = {}

  /**
   * Called when a receiver starts.
   *
   * @param receiverStarted event containing the receiver start information
   */
  def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = {}

  /**
   * Called when a receiver reports an error.
   *
   * @param receiverError event containing the receiver error details
   */
  def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {}

  /**
   * Called when a receiver stops.
   *
   * @param receiverStopped event containing the receiver stop information
   */
  def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped): Unit = {}
}
```
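
### Streaming Event Types

Event objects passed to the `StreamingListener` callbacks. `StreamingListenerStreamingStarted` carries only a start time. Every other event wraps the info object (`BatchInfo`, `OutputOperationInfo`, or `ReceiverInfo`) describing what changed.
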
```scala { .api }
/**
 * Base trait for all streaming listener events.
 */
sealed trait StreamingListenerEvent

/**
 * Event fired when the streaming context starts.
 *
 * @param time time at which streaming started
 */
case class StreamingListenerStreamingStarted(time: Long) extends StreamingListenerEvent

/**
 * Event fired when a batch is submitted.
 *
 * @param batchInfo information about the submitted batch
 */
case class StreamingListenerBatchSubmitted(batchInfo: BatchInfo) extends StreamingListenerEvent

/**
 * Event fired when processing of a batch starts.
 *
 * @param batchInfo information about the batch being processed
 */
case class StreamingListenerBatchStarted(batchInfo: BatchInfo) extends StreamingListenerEvent

/**
 * Event fired when processing of a batch completes.
 *
 * @param batchInfo complete information about the processed batch, including metrics
 */
case class StreamingListenerBatchCompleted(batchInfo: BatchInfo) extends StreamingListenerEvent

/**
 * Event fired when an output operation starts.
 *
 * @param outputOperationInfo information about the output operation
 */
case class StreamingListenerOutputOperationStarted(outputOperationInfo: OutputOperationInfo)
  extends StreamingListenerEvent

/**
 * Event fired when an output operation completes.
 *
 * @param outputOperationInfo complete information about the output operation
 */
case class StreamingListenerOutputOperationCompleted(outputOperationInfo: OutputOperationInfo)
  extends StreamingListenerEvent

/**
 * Event fired when a receiver starts.
 *
 * @param receiverInfo information about the started receiver
 */
case class StreamingListenerReceiverStarted(receiverInfo: ReceiverInfo)
  extends StreamingListenerEvent

/**
 * Event fired when a receiver reports an error.
 *
 * @param receiverInfo information about the receiver that reported the error
 */
case class StreamingListenerReceiverError(receiverInfo: ReceiverInfo)
  extends StreamingListenerEvent

/**
 * Event fired when a receiver stops.
 *
 * @param receiverInfo information about the stopped receiver
 */
case class StreamingListenerReceiverStopped(receiverInfo: ReceiverInfo)
  extends StreamingListenerEvent
```

### Batch Information

Per-batch information, including submission and processing timestamps and the delay and record-count metrics derived from them.

```scala { .api }
/**
 * Information about one batch of streaming data.
 *
 * NOTE(review): Apache Spark's Scala `org.apache.spark.streaming.scheduler.BatchInfo`
 * declares `schedulingDelay`, `processingDelay` and `totalDelay` as `Option[Long]`
 * (`None` while the value is unknown) rather than `Long` with a -1 sentinel; confirm which
 * convention the Spark version in use follows before relying on -1.
 *
 * @param batchTime            time of the batch
 * @param streamIdToInputInfo  input stream ID -> input information for this batch
 * @param submissionTime       when the batch was submitted for processing
 * @param processingStartTime  when processing actually started, if it has started
 * @param processingEndTime    when processing completed, if it has completed
 * @param outputOperationInfos output operation ID -> information about that operation
 */
case class BatchInfo(
    batchTime: Time,
    streamIdToInputInfo: Map[Int, StreamInputInfo],
    submissionTime: Long,
    processingStartTime: Option[Long],
    processingEndTime: Option[Long],
    outputOperationInfos: Map[Int, OutputOperationInfo]
) {

  /**
   * Time spent processing the batch: `processingEndTime - processingStartTime`.
   *
   * @return processing delay in milliseconds, or -1 if not available
   */
  def processingDelay: Long

  /**
   * Time the batch waited before processing started: `processingStartTime - submissionTime`.
   *
   * @return scheduling delay in milliseconds, or -1 if not available
   */
  def schedulingDelay: Long

  /**
   * End-to-end delay of the batch: scheduling delay plus processing delay.
   *
   * @return total delay in milliseconds, or -1 if not available
   */
  def totalDelay: Long

  /**
   * Number of input records in this batch, summed over all input streams.
   *
   * @return total number of input records
   */
  def numRecords: Long
}
```

### Output Operation Information

Details about individual output operations (for example, a `print()` or `foreachRDD` call) within a batch.

```scala { .api }
/**
 * Information about one output operation of a batch.
 *
 * NOTE(review): Apache Spark's Scala
 * `org.apache.spark.streaming.scheduler.OutputOperationInfo` declares `duration` as
 * `Option[Long]` (`None` unless both start and end times are known) rather than `Long` with
 * a -1 sentinel; confirm against the Spark version in use.
 *
 * @param batchTime     time of the batch this output operation belongs to
 * @param id            unique identifier of the output operation
 * @param name          human-readable name of the operation
 * @param description   detailed description of the operation
 * @param startTime     when the operation started, if it has started
 * @param endTime       when the operation completed, if it has completed
 * @param failureReason reason for failure, if the operation failed
 */
case class OutputOperationInfo(
    batchTime: Time,
    id: Int,
    name: String,
    description: String,
    startTime: Option[Long],
    endTime: Option[Long],
    failureReason: Option[String]
) {

  /**
   * Duration of this output operation: `endTime - startTime`.
   *
   * @return duration in milliseconds, or -1 if not available
   */
  def duration: Long
}
```

### Receiver Information

Information about streaming data receivers, including their status and most recent error.

```scala { .api }
/**
 * Information about a stream data receiver.
 *
 * NOTE(review): Apache Spark's `org.apache.spark.streaming.scheduler.ReceiverInfo` also has a
 * `location: String` field (between `active` and `executorId`) and declares the error fields
 * as plain values (`lastErrorMessage: String = ""`, `lastError: String = ""`,
 * `lastErrorTime: Long = -1L`) instead of options; confirm against the Spark version in use.
 *
 * @param streamId         ID of the input stream fed by this receiver
 * @param name             name of the receiver
 * @param active           whether the receiver is currently active
 * @param executorId       ID of the executor running the receiver
 * @param lastErrorMessage message of the last error reported by the receiver, if any
 * @param lastError        textual description of the last error (not an exception object), if any
 * @param lastErrorTime    time of the last error, if any
 */
case class ReceiverInfo(
    streamId: Int,
    name: String,
    active: Boolean,
    executorId: String,
    lastErrorMessage: Option[String] = None,
    lastError: Option[String] = None,
    lastErrorTime: Option[Long] = None
)
```

### Built-in Listener Implementations

Pre-built listeners for common monitoring scenarios.

```scala { .api }
/**
 * Listener that logs summary statistics about recently completed batches.
 *
 * @param numBatchInfos number of most recent completed batches tracked for the statistics
 */
class StatsReportListener(numBatchInfos: Int = 10) extends StreamingListener {

  /** Records the completed batch's information in the tracked window of recent batches. */
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit

  /**
   * Prints a summary of the statistics for the currently tracked batches.
   */
  def printStats(): Unit
}
```

**Usage Examples:**

```scala
// Custom listener implementation
/**
 * Prints a one-line summary of every completed batch and warns about slow batches and
 * receiver errors.
 *
 * @param processingDelayWarnMs processing delay (ms) above which a warning is printed
 */
class CustomStreamingListener(processingDelayWarnMs: Long = 5000L) extends StreamingListener {

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val batchInfo = batchCompleted.batchInfo
    val processingTime = batchInfo.processingDelay
    val schedulingDelay = batchInfo.schedulingDelay
    val numRecords = batchInfo.numRecords

    // Delays are -1 when not available; show "n/a" instead of a misleading "-1ms".
    def fmt(ms: Long): String = if (ms >= 0) s"${ms}ms" else "n/a"

    println(s"Batch ${batchInfo.batchTime}: " +
      s"processed $numRecords records in ${fmt(processingTime)} " +
      s"(scheduling delay: ${fmt(schedulingDelay)})")

    // Alert on high processing delay
    if (processingTime > processingDelayWarnMs) {
      println(s"WARNING: High processing delay detected: ${processingTime}ms")
    }
  }

  override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {
    val receiverInfo = receiverError.receiverInfo
    // lastErrorMessage is an Option; unwrap it so the log shows the text, not "Some(...)".
    val message = receiverInfo.lastErrorMessage.getOrElse("Unknown receiver error")
    println(s"Receiver error on stream ${receiverInfo.streamId}: $message")
  }
}

// Add listener to streaming context
val customListener = new CustomStreamingListener()
ssc.addStreamingListener(customListener)

// Use built-in stats listener, tracking the 20 most recent batches
val statsListener = new StatsReportListener(20)
ssc.addStreamingListener(statsListener)
```
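
### Java API for Listeners

Java-friendly listener base class for Java applications. `addStreamingListener` accepts a Scala `StreamingListener`, so a `JavaStreamingListener` must be wrapped in `JavaStreamingListenerWrapper` before it is registered.
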
```java { .api }
301
/**
302
* Abstract base class for Java streaming listeners
303
*/
304
public abstract class JavaStreamingListener {
305
public void onStreamingStarted(StreamingListenerStreamingStarted streamingStarted) {}
306
public void onBatchSubmitted(StreamingListenerBatchSubmitted batchSubmitted) {}
307
public void onBatchStarted(StreamingListenerBatchStarted batchStarted) {}
308
public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {}
309
public void onOutputOperationStarted(StreamingListenerOutputOperationStarted outputOperationStarted) {}
310
public void onOutputOperationCompleted(StreamingListenerOutputOperationCompleted outputOperationCompleted) {}
311
public void onReceiverStarted(StreamingListenerReceiverStarted receiverStarted) {}
312
public void onReceiverError(StreamingListenerReceiverError receiverError) {}
313
public void onReceiverStopped(StreamingListenerReceiverStopped receiverStopped) {}
314
}
315
316
/**
317
* Wrapper that converts Java listeners to Scala listeners
318
*/
319
class JavaStreamingListenerWrapper(javaStreamingListener: JavaStreamingListener) extends StreamingListener
320
```
321
322
**Java Usage Examples:**
323
324
```java
325
// Custom Java listener
326
class MyJavaStreamingListener extends JavaStreamingListener {
327
@Override
328
public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {
329
BatchInfo batchInfo = batchCompleted.batchInfo();
330
long processingDelay = batchInfo.processingDelay();
331
long numRecords = batchInfo.numRecords();
332
333
System.out.println(String.format(
334
"Batch completed: %d records processed in %dms",
335
numRecords, processingDelay
336
));
337
}
338
339
@Override
340
public void onReceiverError(StreamingListenerReceiverError receiverError) {
341
ReceiverInfo info = receiverError.receiverInfo();
342
System.err.println("Receiver error: " + info.lastErrorMessage().orElse("Unknown error"));
343
}
344
}
345
346
// Add to Java streaming context
347
JavaStreamingListener listener = new MyJavaStreamingListener();
348
jssc.addStreamingListener(listener);
349
```

## Advanced Monitoring Patterns

### Performance Monitoring

Track key performance metrics and identify bottlenecks:

```scala
/**
 * Keeps a bounded history of per-batch metrics and prints alerts for high scheduling delay
 * and low throughput, plus moving averages over the most recent batches.
 *
 * @param maxHistory             number of recent batches kept in memory
 * @param schedulingDelayAlertMs scheduling delay (ms) above which an alert is printed
 * @param minRecordsPerSecond    throughput (records/sec) below which an alert is printed
 * @param trendWindow            number of recent batches used for the moving averages
 */
class PerformanceMonitoringListener(
    maxHistory: Int = 100,
    schedulingDelayAlertMs: Long = 1000L,
    minRecordsPerSecond: Double = 100.0,
    trendWindow: Int = 10
) extends StreamingListener {
  // Only mutated from listener callbacks, which are delivered one at a time.
  private val batchMetrics = scala.collection.mutable.Queue[BatchMetrics]()

  case class BatchMetrics(
      batchTime: Time,
      schedulingDelay: Long,
      processingTime: Long,
      totalDelay: Long,
      numRecords: Long,
      recordsPerSecond: Double
  )

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val batch = batchCompleted.batchInfo
    val processingTime = batch.processingDelay
    // Throughput is only defined when the processing time is known (delays are -1 otherwise).
    val recordsPerSecond =
      if (processingTime > 0) batch.numRecords * 1000.0 / processingTime else 0.0

    val metrics = BatchMetrics(
      batch.batchTime,
      batch.schedulingDelay,
      processingTime,
      batch.totalDelay,
      batch.numRecords,
      recordsPerSecond
    )

    batchMetrics.enqueue(metrics)
    // Keep only the most recent metrics; dequeue is O(1), unlike ArrayBuffer.remove(0).
    while (batchMetrics.size > maxHistory) batchMetrics.dequeue()

    // Check for performance issues
    analyzePerformance(metrics)
  }

  private def analyzePerformance(metrics: BatchMetrics): Unit = {
    // Alert on high scheduling delay
    if (metrics.schedulingDelay > schedulingDelayAlertMs) {
      println(s"HIGH SCHEDULING DELAY: ${metrics.schedulingDelay}ms at ${metrics.batchTime}")
    }

    // Empty batches and batches without a known processing time report 0 records/sec;
    // skip them so idle periods do not raise false low-throughput alerts.
    if (metrics.numRecords > 0 && metrics.processingTime > 0 &&
        metrics.recordsPerSecond < minRecordsPerSecond) {
      println(s"LOW THROUGHPUT: ${metrics.recordsPerSecond} records/sec at ${metrics.batchTime}")
    }

    // Moving averages over recent batches; ignore -1 "unknown" processing times.
    val recent = batchMetrics.takeRight(trendWindow).filter(_.processingTime >= 0)
    if (batchMetrics.size >= trendWindow && recent.nonEmpty) {
      val avgProcessingTime = recent.map(_.processingTime).sum.toDouble / recent.size
      val avgThroughput = recent.map(_.recordsPerSecond).sum / recent.size
      println(f"Recent averages: $avgProcessingTime%.1fms processing, $avgThroughput%.1f records/sec")
    }
  }
}
```

### Error Tracking and Alerting

Monitor for errors and failures across the streaming application:

```scala
/**
 * Records output-operation failures and receiver errors, keeps a bounded list of recent
 * errors, and prints an alert when too many errors occur within a time window.
 *
 * @param maxRecentErrors maximum number of recent errors retained
 * @param alertWindowMs   length of the sliding alert window, in milliseconds
 * @param alertThreshold  alert when more than this many errors fall inside the window
 */
class ErrorTrackingListener(
    maxRecentErrors: Int = 50,
    alertWindowMs: Long = 60000L,
    alertThreshold: Int = 5
) extends StreamingListener {
  // Replaced (never mutated in place) from the listener thread, so readers on other threads
  // always see a consistent snapshot.
  @volatile private var errorCounts = Map.empty[String, Int]
  private val recentErrors = scala.collection.mutable.Queue[ErrorEvent]()

  case class ErrorEvent(timestamp: Long, source: String, message: String)

  /** Snapshot of the number of errors recorded so far, keyed by source. */
  def errorCountsBySource: Map[String, Int] = errorCounts

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val batch = batchCompleted.batchInfo

    // Check for failed output operations
    batch.outputOperationInfos.values.foreach { opInfo =>
      opInfo.failureReason.foreach { reason =>
        recordError("OutputOperation", s"${opInfo.name}: $reason")
      }
    }
  }

  override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {
    val receiver = receiverError.receiverInfo
    val errorMsg = receiver.lastErrorMessage.getOrElse("Unknown receiver error")
    recordError("Receiver", s"Stream ${receiver.streamId}: $errorMsg")
  }

  private def recordError(source: String, message: String): Unit = {
    val errorEvent = ErrorEvent(System.currentTimeMillis(), source, message)

    // Track error counts by source
    errorCounts = errorCounts.updated(source, errorCounts.getOrElse(source, 0) + 1)

    // Keep recent errors for analysis
    recentErrors.enqueue(errorEvent)
    while (recentErrors.size > maxRecentErrors) recentErrors.dequeue()

    // Log the error before any alert it triggers so the output reads in order.
    println(s"ERROR [$source]: $message")
    checkErrorPatterns(errorEvent.timestamp)
  }

  private def checkErrorPatterns(now: Long): Unit = {
    val windowStart = now - alertWindowMs
    val recentErrorCount = recentErrors.count(_.timestamp > windowStart)

    if (recentErrorCount > alertThreshold) {
      println(s"ALERT: $recentErrorCount errors in the last ${alertWindowMs / 1000} seconds!")
    }
  }
}
```
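
### Metrics Integration

Integration with external monitoring systems. `MetricsReporter` below is a placeholder for whatever metrics client your application uses; it only needs `gauge(name, value)` and `counter(name).increment()`:
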
```scala
/**
 * Forwards per-batch metrics and error counts to an external metrics system.
 *
 * NOTE(review): `MetricsReporter` is not defined in this document; it stands for the
 * application's metrics client and must provide `gauge(name, value)` and
 * `counter(name).increment()`.
 *
 * @param metricsReporter client used to publish gauges and counters
 */
class MetricsIntegrationListener(metricsReporter: MetricsReporter) extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val batch = batchCompleted.batchInfo
    val processingDelay = batch.processingDelay

    // Send metrics to external system; skip -1 ("not available") delays so the gauges are
    // not polluted with sentinel values.
    reportDelayIfKnown("streaming.batch.processing_delay", processingDelay)
    reportDelayIfKnown("streaming.batch.scheduling_delay", batch.schedulingDelay)
    reportDelayIfKnown("streaming.batch.total_delay", batch.totalDelay)
    metricsReporter.gauge("streaming.batch.num_records", batch.numRecords)

    // Calculate throughput
    if (processingDelay > 0) {
      val throughput = batch.numRecords * 1000.0 / processingDelay
      metricsReporter.gauge("streaming.batch.throughput", throughput)
    }

    // Every batch counts as completed; those with a failed output operation also count
    // as failed.
    val hasFailures = batch.outputOperationInfos.values.exists(_.failureReason.isDefined)
    metricsReporter.counter("streaming.batch.completed").increment()
    if (hasFailures) {
      metricsReporter.counter("streaming.batch.failed").increment()
    }
  }

  override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {
    metricsReporter.counter("streaming.receiver.errors").increment()
  }

  /** Publishes a delay gauge only when the value is known (non-negative). */
  private def reportDelayIfKnown(name: String, delayMs: Long): Unit =
    if (delayMs >= 0) metricsReporter.gauge(name, delayMs)
}
```
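
## Event System Architecture

The streaming listener system provides:

- **Near-real-time monitoring**: events are posted as batches, output operations, and receivers change state. They are delivered to listeners asynchronously, so a callback may run shortly after the operation it reports.
- **Comprehensive coverage**: all major streaming operations generate events.
- **Sequential delivery**: callbacks are invoked one at a time, in order, on a single listener thread.
  - State that is touched only from callbacks needs no extra locking. State that other threads also read does need it.
  - Slow or blocking callbacks delay later events. If the event queue fills up, they can cause later events to be lost.
- **Error isolation**: exceptions thrown by a listener are caught and logged; they do not affect stream processing.
- **Per-event snapshots**: `BatchInfo` and the other info objects describe the state at the time of the event. Listeners that need history or trends must keep it themselves, as `StatsReportListener` does.

This enables building sophisticated monitoring, alerting, and analytics systems on top of Spark Streaming applications.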