Event monitoring in Spark Streaming provides detailed insights into streaming application performance, batch processing, receiver status, and output operations through a comprehensive listener system. This enables real-time monitoring, debugging, and performance optimization.
Core interface for receiving all streaming-related events during application execution.
/**
 * Base trait for receiving streaming system events.
 *
 * Each callback has an empty default body, so an implementation overrides only
 * the events it is interested in.
 */
trait StreamingListener {

  // ---- Streaming context lifecycle ----

  /**
   * Invoked when the streaming context starts.
   * @param streamingStarted - Event carrying the start time
   */
  def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted): Unit = ()

  // ---- Batch lifecycle ----

  /**
   * Invoked when a batch has been submitted for processing.
   * @param batchSubmitted - Event carrying the batch submission details
   */
  def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = ()

  /**
   * Invoked when processing of a batch begins.
   * @param batchStarted - Event carrying the batch start details
   */
  def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = ()

  /**
   * Invoked when processing of a batch finishes.
   * @param batchCompleted - Event carrying the batch completion details and metrics
   */
  def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = ()

  // ---- Output operations ----

  /**
   * Invoked when an output operation begins.
   * @param outputOperationStarted - Event carrying the output operation start details
   */
  def onOutputOperationStarted(outputOperationStarted: StreamingListenerOutputOperationStarted): Unit = ()

  /**
   * Invoked when an output operation finishes.
   * @param outputOperationCompleted - Event carrying the output operation completion details
   */
  def onOutputOperationCompleted(outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = ()

  // ---- Receivers ----

  /**
   * Invoked when a receiver starts.
   * @param receiverStarted - Event carrying the receiver start information
   */
  def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = ()

  /**
   * Invoked when a receiver reports an error.
   * @param receiverError - Event carrying the receiver error details
   */
  def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = ()

  /**
   * Invoked when a receiver stops.
   * @param receiverStopped - Event carrying the receiver stop information
   */
  def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped): Unit = ()
}Detailed event objects providing comprehensive information about streaming operations.
/**
 * Common supertype of every event delivered to a streaming listener.
 * Sealed, so the set of event kinds is fixed to the case classes below.
 */
sealed trait StreamingListenerEvent

/**
 * Fired once the streaming context has started.
 * @param time - Time at which streaming started
 */
case class StreamingListenerStreamingStarted(time: Long)
  extends StreamingListenerEvent

/**
 * Fired when a batch is handed over for processing.
 * @param batchInfo - Details of the submitted batch
 */
case class StreamingListenerBatchSubmitted(batchInfo: BatchInfo)
  extends StreamingListenerEvent

/**
 * Fired when a batch begins processing.
 * @param batchInfo - Details of the batch now being processed
 */
case class StreamingListenerBatchStarted(batchInfo: BatchInfo)
  extends StreamingListenerEvent

/**
 * Fired when a batch finishes processing.
 * @param batchInfo - Full details of the processed batch, metrics included
 */
case class StreamingListenerBatchCompleted(batchInfo: BatchInfo)
  extends StreamingListenerEvent

/**
 * Fired when an output operation begins.
 * @param outputOperationInfo - Details of the output operation
 */
case class StreamingListenerOutputOperationStarted(outputOperationInfo: OutputOperationInfo)
  extends StreamingListenerEvent

/**
 * Fired when an output operation finishes.
 * @param outputOperationInfo - Full details of the output operation
 */
case class StreamingListenerOutputOperationCompleted(outputOperationInfo: OutputOperationInfo)
  extends StreamingListenerEvent

/**
 * Fired when a receiver starts.
 * @param receiverInfo - Details of the receiver that started
 */
case class StreamingListenerReceiverStarted(receiverInfo: ReceiverInfo)
  extends StreamingListenerEvent

/**
 * Fired when a receiver reports an error.
 * @param receiverInfo - Details of the receiver that reported the error
 */
case class StreamingListenerReceiverError(receiverInfo: ReceiverInfo)
  extends StreamingListenerEvent

/**
 * Fired when a receiver stops.
 * @param receiverInfo - Details of the receiver that stopped
 */
case class StreamingListenerReceiverStopped(receiverInfo: ReceiverInfo)
  extends StreamingListenerEventDetailed information about batch processing including timing and performance metrics.
/**
* Information about a batch of streaming data
* @param batchTime - Time of the batch
* @param streamIdToInputInfo - Map of input stream IDs to input information
* @param submissionTime - When the batch was submitted for processing
* @param processingStartTime - When batch processing actually started (optional)
* @param processingEndTime - When batch processing completed (optional)
* @param outputOperationInfos - Information about output operations in this batch, keyed by
*   an Int (presumably the output operation id — confirm)
*
* NOTE(review): the accessors below are declared without bodies, so this is an API
* signature listing rather than a compilable implementation. Upstream Spark's BatchInfo
* returns Option[Long] from schedulingDelay/processingDelay/totalDelay instead of a -1
* sentinel — confirm which API version this listing describes.
*/
case class BatchInfo(
batchTime: Time,
streamIdToInputInfo: Map[Int, StreamInputInfo],
submissionTime: Long,
processingStartTime: Option[Long],
processingEndTime: Option[Long],
outputOperationInfos: Map[Int, OutputOperationInfo]
) {
/**
* Get the processing delay for this batch: the time spent actually processing it
* (presumably processingEndTime - processingStartTime)
* @return Processing delay in milliseconds, or -1 if not available
*/
def processingDelay: Long
/**
* Get the scheduling delay for this batch: the time it waited before processing began
* (presumably processingStartTime - submissionTime)
* @return Scheduling delay in milliseconds, or -1 if not available
*/
def schedulingDelay: Long
/**
* Get the total delay for this batch (presumably scheduling delay + processing delay)
* @return Total delay in milliseconds, or -1 if not available
*/
def totalDelay: Long
/**
* Get number of records processed in this batch
* @return Total number of input records (presumably summed over streamIdToInputInfo)
*/
def numRecords: Long
}Details about individual output operations within batches.
/**
* Information about an output operation
* @param batchTime - Time of the batch this output operation belongs to
* @param id - Unique identifier for the output operation
* @param name - Human-readable name of the operation
* @param description - Detailed description of the operation
* @param startTime - When the operation started (optional)
* @param endTime - When the operation completed (optional)
* @param failureReason - Reason for failure if operation failed (optional; None means no failure was recorded)
*
* NOTE(review): duration is declared without a body, so this is an API signature listing
* rather than a compilable implementation. Upstream Spark returns Option[Long] from
* duration instead of a -1 sentinel — confirm which API version this describes.
*/
case class OutputOperationInfo(
batchTime: Time,
id: Int,
name: String,
description: String,
startTime: Option[Long],
endTime: Option[Long],
failureReason: Option[String]
) {
/**
* Get duration of this output operation (presumably endTime - startTime)
* @return Duration in milliseconds, or -1 if not available
*/
def duration: Long
}Information about streaming data receivers including status and error details.
/**
 * Status snapshot of a stream data receiver.
 *
 * The three `lastError*` fields default to `None`, meaning no error has been recorded.
 *
 * @param streamId - ID of the input stream the receiver belongs to
 * @param name - Name of the receiver
 * @param active - Whether the receiver is currently active
 * @param executorId - ID of the executor running the receiver
 * @param lastErrorMessage - Most recent error message from the receiver (optional)
 * @param lastError - Most recent error from the receiver, held as a String rather than a Throwable (optional)
 * @param lastErrorTime - Time of the most recent error (optional)
 */
case class ReceiverInfo(
  streamId: Int,
  name: String,
  active: Boolean,
  executorId: String,
  lastErrorMessage: Option[String] = None,
  lastError: Option[String] = None,
  lastErrorTime: Option[Long] = None)Pre-built listeners for common monitoring scenarios.
/**
* Listener that logs summary statistics about batches
* @param numBatchInfos - Number of recent batches to track for statistics (default 10)
*
* NOTE(review): both members are declared without bodies, so this is an API signature
* listing; a concrete class with abstract members will not compile as written.
*/
class StatsReportListener(numBatchInfos: Int = 10) extends StreamingListener {
/**
* Called for each completed batch; presumably records the batch's BatchInfo, retaining
* at most numBatchInfos of the most recent batches (body not shown — confirm)
*/
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit
/**
* Print current statistics summary
*/
def printStats(): Unit
}Usage Examples:
// Custom listener implementation
/**
 * Example listener that logs per-batch timing and receiver errors to stdout.
 *
 * BatchInfo reports a delay as -1 when it is not available, so such delays are
 * printed as "n/a" rather than as a misleading "-1ms".
 */
class CustomStreamingListener extends StreamingListener {

  /**
   * Logs the record count and delays of each completed batch, and warns when
   * processing took longer than 5000 ms.
   * @param batchCompleted - Event carrying the completed batch's info
   */
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val batchInfo = batchCompleted.batchInfo
    val processingTime = batchInfo.processingDelay
    val schedulingDelay = batchInfo.schedulingDelay
    val numRecords = batchInfo.numRecords
    println(s"Batch ${batchInfo.batchTime}: " +
      s"processed $numRecords records in ${formatDelay(processingTime)} " +
      s"(scheduling delay: ${formatDelay(schedulingDelay)})")
    // Alert on high processing delay (an unavailable -1 delay can never trip this)
    if (processingTime > 5000) {
      println(s"WARNING: High processing delay detected: ${processingTime}ms")
    }
  }

  /**
   * Logs the failing stream and the receiver's last error message.
   * @param receiverError - Event carrying the failing receiver's info
   */
  override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {
    val receiverInfo = receiverError.receiverInfo
    // lastErrorMessage is an Option[String]; interpolating it directly would print
    // "Some(...)" or "None", so unwrap it with a placeholder for the empty case.
    val message = receiverInfo.lastErrorMessage.getOrElse("Unknown receiver error")
    println(s"Receiver error on stream ${receiverInfo.streamId}: $message")
  }

  /** Renders a delay as "<n>ms", or "n/a" for a negative value such as the -1 "not available" sentinel. */
  private def formatDelay(delayMs: Long): String =
    if (delayMs >= 0) s"${delayMs}ms" else "n/a"
}
// Attach the custom listener to the streaming context
val customListener = new CustomStreamingListener
ssc.addStreamingListener(customListener)
// Attach the built-in stats listener, tracking statistics over the 20 most recent batches
val statsListener = new StatsReportListener(numBatchInfos = 20)
ssc.addStreamingListener(statsListener)Java-friendly listener interface for Java applications.
/**
* Abstract base class for Java streaming listeners
* <p>
* Every callback below has an empty body, so a subclass overrides only the events
* it needs. Because the class is abstract, even a listener that overrides nothing
* must be declared as a (possibly empty) subclass.
*/
public abstract class JavaStreamingListener {
// Streaming context lifecycle
public void onStreamingStarted(StreamingListenerStreamingStarted streamingStarted) {}
// Batch lifecycle
public void onBatchSubmitted(StreamingListenerBatchSubmitted batchSubmitted) {}
public void onBatchStarted(StreamingListenerBatchStarted batchStarted) {}
public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {}
// Output operations within a batch
public void onOutputOperationStarted(StreamingListenerOutputOperationStarted outputOperationStarted) {}
public void onOutputOperationCompleted(StreamingListenerOutputOperationCompleted outputOperationCompleted) {}
// Receiver lifecycle
public void onReceiverStarted(StreamingListenerReceiverStarted receiverStarted) {}
public void onReceiverError(StreamingListenerReceiverError receiverError) {}
public void onReceiverStopped(StreamingListenerReceiverStopped receiverStopped) {}
}
/**
* Wrapper that converts Java listeners to Scala listeners: it is itself a
* StreamingListener and is constructed from the JavaStreamingListener it adapts.
* NOTE(review): only the signature is shown here; presumably each StreamingListener
* callback is forwarded to javaStreamingListener — confirm against the implementation.
* @param javaStreamingListener - The Java listener being wrapped
*/
class JavaStreamingListenerWrapper(javaStreamingListener: JavaStreamingListener) extends StreamingListenerJava Usage Examples:
// Custom Java listener
class MyJavaStreamingListener extends JavaStreamingListener {
@Override
public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {
BatchInfo batchInfo = batchCompleted.batchInfo();
long processingDelay = batchInfo.processingDelay();
long numRecords = batchInfo.numRecords();
System.out.println(String.format(
"Batch completed: %d records processed in %dms",
numRecords, processingDelay
));
}
@Override
public void onReceiverError(StreamingListenerReceiverError receiverError) {
ReceiverInfo info = receiverError.receiverInfo();
System.err.println("Receiver error: " + info.lastErrorMessage().orElse("Unknown error"));
}
}
// Register the Java listener with the Java streaming context
final JavaStreamingListener listener = new MyJavaStreamingListener();
jssc.addStreamingListener(listener);Track key performance metrics and identify bottlenecks:
/**
 * Listener that records per-batch performance metrics and prints alerts for likely bottlenecks.
 *
 * @param maxRetainedBatches - Number of most recent batches whose metrics are kept (default 100)
 * @param schedulingDelayAlertMs - Alert when a batch's scheduling delay exceeds this many ms (default 1000)
 * @param minRecordsPerSecond - Alert when measurable throughput falls below this rate (default 100.0)
 * @param movingAverageWindow - Number of recent batches averaged for trend output (default 10);
 *   must not exceed maxRetainedBatches, otherwise the averages could never be printed
 */
class PerformanceMonitoringListener(
    maxRetainedBatches: Int = 100,
    schedulingDelayAlertMs: Long = 1000L,
    minRecordsPerSecond: Double = 100.0,
    movingAverageWindow: Int = 10
) extends StreamingListener {
  require(maxRetainedBatches > 0, "maxRetainedBatches must be positive")
  require(
    movingAverageWindow > 0 && movingAverageWindow <= maxRetainedBatches,
    "movingAverageWindow must be between 1 and maxRetainedBatches")

  // Oldest entries first; trimmed to maxRetainedBatches after each completed batch.
  private val batchMetrics = scala.collection.mutable.ArrayBuffer[BatchMetrics]()

  /** Metrics derived from one completed batch; delays are in ms and -1 when unavailable. */
  case class BatchMetrics(
    batchTime: Time,
    schedulingDelay: Long,
    processingTime: Long,
    totalDelay: Long,
    numRecords: Long,
    recordsPerSecond: Double
  )

  /**
   * Captures the completed batch's metrics, trims the history, then checks for performance issues.
   * @param batchCompleted - Event carrying the completed batch's info
   */
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val batch = batchCompleted.batchInfo
    val processingTime = batch.processingDelay
    // Throughput is only defined for a known, positive processing time; 0.0 is a placeholder.
    val recordsPerSecond =
      if (processingTime > 0) batch.numRecords * 1000.0 / processingTime else 0.0
    val metrics = BatchMetrics(
      batch.batchTime,
      batch.schedulingDelay,
      processingTime,
      batch.totalDelay,
      batch.numRecords,
      recordsPerSecond
    )
    batchMetrics += metrics
    // Keep only the most recent maxRetainedBatches entries
    val excess = batchMetrics.size - maxRetainedBatches
    if (excess > 0) {
      batchMetrics.remove(0, excess)
    }
    analyzePerformance(metrics)
  }

  /** Prints alerts for the given batch and, once enough history exists, recent moving averages. */
  private def analyzePerformance(metrics: BatchMetrics): Unit = {
    if (metrics.schedulingDelay > schedulingDelayAlertMs) {
      println(s"HIGH SCHEDULING DELAY: ${metrics.schedulingDelay}ms at ${metrics.batchTime}")
    }
    // An empty batch, or one without a positive processing time, carries the 0.0
    // placeholder rate; that is not evidence of a slowdown, so it must not alert.
    val throughputMeasurable = metrics.numRecords > 0 && metrics.processingTime > 0
    if (throughputMeasurable && metrics.recordsPerSecond < minRecordsPerSecond) {
      println(s"LOW THROUGHPUT: ${metrics.recordsPerSecond} records/sec at ${metrics.batchTime}")
    }
    if (batchMetrics.size >= movingAverageWindow) {
      val recent = batchMetrics.takeRight(movingAverageWindow)
      // Average in floating point so fractional milliseconds are not truncated away.
      val avgProcessingTime = recent.map(_.processingTime).sum.toDouble / recent.size
      val avgThroughput = recent.map(_.recordsPerSecond).sum / recent.size
      println(f"Recent averages: $avgProcessingTime%.1fms processing, $avgThroughput%.1f records/sec")
    }
  }
}Monitor for errors and failures across the streaming application:
/**
 * Listener that records failed output operations and receiver errors, keeps a
 * bounded history of the most recent errors, and prints an alert when more than
 * five errors were recorded within the last minute.
 */
class ErrorTrackingListener extends StreamingListener {
  // Upper bound on the number of retained ErrorEvents.
  private val MaxRecentErrors = 50
  // Look-back window (ms) and count threshold for the burst alert.
  private val AlertWindowMs = 60000L
  private val AlertThreshold = 5

  private val errorCounts = scala.collection.mutable.Map[String, Int]()
  private val recentErrors = scala.collection.mutable.Queue[ErrorEvent]()

  /** One recorded error: wall-clock timestamp in ms, source category, and message. */
  case class ErrorEvent(timestamp: Long, source: String, message: String)

  /** Immutable snapshot of the error count per source ("OutputOperation" or "Receiver"). */
  def errorCountsBySource: Map[String, Int] = errorCounts.toMap

  /** Immutable snapshot of the retained recent errors, oldest first. */
  def recentErrorEvents: List[ErrorEvent] = recentErrors.toList

  /**
   * Records one error for every output operation in the batch that reports a failure reason.
   * @param batchCompleted - Event carrying the completed batch's info
   */
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val batch = batchCompleted.batchInfo
    batch.outputOperationInfos.values.foreach { opInfo =>
      opInfo.failureReason.foreach { reason =>
        recordError("OutputOperation", s"${opInfo.name}: $reason")
      }
    }
  }

  /**
   * Records the receiver's last error message, or a placeholder if none was set.
   * @param receiverError - Event carrying the failing receiver's info
   */
  override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {
    val receiver = receiverError.receiverInfo
    val errorMsg = receiver.lastErrorMessage.getOrElse("Unknown receiver error")
    recordError("Receiver", s"Stream ${receiver.streamId}: $errorMsg")
  }

  /** Counts, stores, and logs an error, then checks whether errors are clustering. */
  private def recordError(source: String, message: String): Unit = {
    val now = System.currentTimeMillis()
    errorCounts(source) = errorCounts.getOrElse(source, 0) + 1
    recentErrors.enqueue(ErrorEvent(now, source, message))
    if (recentErrors.size > MaxRecentErrors) {
      recentErrors.dequeue()
    }
    // Log the error itself first so a burst alert appears after the error that triggered it.
    println(s"ERROR [$source]: $message")
    checkErrorPatterns(now)
  }

  /** Prints an alert when more than AlertThreshold retained errors fall within AlertWindowMs before `now`. */
  private def checkErrorPatterns(now: Long): Unit = {
    val windowStart = now - AlertWindowMs
    val recentErrorCount = recentErrors.count(_.timestamp > windowStart)
    if (recentErrorCount > AlertThreshold) {
      println(s"ALERT: $recentErrorCount errors in the last minute!")
    }
  }
}Integration with external monitoring systems:
/**
 * Listener that forwards per-batch metrics and receiver error counts to an
 * external metrics system.
 *
 * @param metricsReporter - Destination for the gauges and counters below
 */
class MetricsIntegrationListener(metricsReporter: MetricsReporter) extends StreamingListener {

  /**
   * Publishes delay, record-count and throughput gauges for the completed batch,
   * and bumps the completed/failed batch counters.
   * @param batchCompleted - Event carrying the completed batch's info
   */
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val batch = batchCompleted.batchInfo
    val processingDelay = batch.processingDelay

    // BatchInfo reports -1 for a delay that is not available; skip those instead of
    // publishing a bogus negative sample to the external gauge.
    reportDelay("streaming.batch.processing_delay", processingDelay)
    reportDelay("streaming.batch.scheduling_delay", batch.schedulingDelay)
    reportDelay("streaming.batch.total_delay", batch.totalDelay)
    metricsReporter.gauge("streaming.batch.num_records", batch.numRecords)

    // Throughput needs a known, positive processing time
    if (processingDelay > 0) {
      val throughput = batch.numRecords * 1000.0 / processingDelay
      metricsReporter.gauge("streaming.batch.throughput", throughput)
    }

    // Every completed batch is counted; batches with a failed output operation are also counted as failed.
    metricsReporter.counter("streaming.batch.completed").increment()
    if (batch.outputOperationInfos.values.exists(_.failureReason.isDefined)) {
      metricsReporter.counter("streaming.batch.failed").increment()
    }
  }

  /**
   * Counts receiver errors.
   * @param receiverError - Event carrying the failing receiver's info
   */
  override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {
    metricsReporter.counter("streaming.receiver.errors").increment()
  }

  /** Publishes a delay gauge only when the delay is available (non-negative). */
  private def reportDelay(name: String, delayMs: Long): Unit =
    if (delayMs >= 0) metricsReporter.gauge(name, delayMs)
}The streaming listener system provides:
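- Lifecycle callbacks on `StreamingListener` for streaming context start, batch submission/start/completion, output operation start/completion, and receiver start/error/stop.
- Event payloads (`BatchInfo`, `OutputOperationInfo`, `ReceiverInfo`) carrying timing, record-count, and error details.
- A built-in `StatsReportListener` for batch statistics, and a `JavaStreamingListener` base class for Java applications.

This enables building sophisticated monitoring, alerting, and analytics systems on top of Spark Streaming applications.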