or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

data-streams.md · event-monitoring.md · index.md · input-sources.md · java-api.md · key-value-ops.md · state-management.md · streaming-context.md · window-ops.md
tile.json

docs/event-monitoring.md

Event Monitoring

Event monitoring in Spark Streaming provides detailed insights into streaming application performance, batch processing, receiver status, and output operations through a comprehensive listener system. This enables real-time monitoring, debugging, and performance optimization.

Capabilities

StreamingListener Interface

Core interface for receiving all streaming-related events during application execution.

/**
 * Callback interface for streaming system events.
 *
 * Every callback has a no-op default body, so an implementation only needs to
 * override the events it cares about.
 */
trait StreamingListener {
  /**
   * Invoked when the streaming context starts.
   *
   * @param streamingStarted event carrying the start time information
   */
  def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted): Unit = ()

  /**
   * Invoked when a batch is submitted for processing.
   *
   * @param batchSubmitted event carrying the batch submission details
   */
  def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = ()

  /**
   * Invoked when processing of a batch starts.
   *
   * @param batchStarted event carrying the batch start details
   */
  def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = ()

  /**
   * Invoked when processing of a batch completes.
   *
   * @param batchCompleted event carrying the batch completion details and metrics
   */
  def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = ()

  /**
   * Invoked when an output operation starts.
   *
   * @param outputOperationStarted event carrying the output operation start details
   */
  def onOutputOperationStarted(outputOperationStarted: StreamingListenerOutputOperationStarted): Unit = ()

  /**
   * Invoked when an output operation completes.
   *
   * @param outputOperationCompleted event carrying the output operation completion details
   */
  def onOutputOperationCompleted(outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = ()

  /**
   * Invoked when a receiver starts.
   *
   * @param receiverStarted event carrying the receiver start information
   */
  def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = ()

  /**
   * Invoked when a receiver encounters an error.
   *
   * @param receiverError event carrying the receiver error details
   */
  def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = ()

  /**
   * Invoked when a receiver stops.
   *
   * @param receiverStopped event carrying the receiver stop information
   */
  def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped): Unit = ()
}

Streaming Event Types

Detailed event objects providing comprehensive information about streaming operations.

/**
 * Base trait for all streaming events.
 *
 * The trait is sealed, so the event types declared alongside it form the
 * complete set of events a listener can receive.
 */
sealed trait StreamingListenerEvent

/**
 * Event fired when the streaming context starts.
 *
 * @param time time when streaming started
 */
case class StreamingListenerStreamingStarted(time: Long) extends StreamingListenerEvent

/**
 * Event fired when a batch is submitted.
 *
 * @param batchInfo information about the submitted batch
 */
case class StreamingListenerBatchSubmitted(batchInfo: BatchInfo) extends StreamingListenerEvent

/**
 * Event fired when batch processing starts.
 *
 * @param batchInfo information about the batch being processed
 */
case class StreamingListenerBatchStarted(batchInfo: BatchInfo) extends StreamingListenerEvent

/**
 * Event fired when batch processing completes.
 *
 * @param batchInfo complete information about the processed batch, including metrics
 */
case class StreamingListenerBatchCompleted(batchInfo: BatchInfo) extends StreamingListenerEvent

/**
 * Event fired when an output operation starts.
 *
 * @param outputOperationInfo information about the output operation
 */
case class StreamingListenerOutputOperationStarted(outputOperationInfo: OutputOperationInfo) extends StreamingListenerEvent

/**
 * Event fired when an output operation completes.
 *
 * @param outputOperationInfo complete information about the output operation
 */
case class StreamingListenerOutputOperationCompleted(outputOperationInfo: OutputOperationInfo) extends StreamingListenerEvent

/**
 * Event fired when a receiver starts.
 *
 * @param receiverInfo information about the started receiver
 */
case class StreamingListenerReceiverStarted(receiverInfo: ReceiverInfo) extends StreamingListenerEvent

/**
 * Event fired when a receiver encounters an error.
 *
 * @param receiverInfo information about the receiver that errored
 */
case class StreamingListenerReceiverError(receiverInfo: ReceiverInfo) extends StreamingListenerEvent

/**
 * Event fired when a receiver stops.
 *
 * @param receiverInfo information about the stopped receiver
 */
case class StreamingListenerReceiverStopped(receiverInfo: ReceiverInfo) extends StreamingListenerEvent

Batch Information

Detailed information about batch processing including timing and performance metrics.

/**
 * Information about a batch of streaming data, including timing and
 * per-stream input details.
 *
 * NOTE(review): the delay accessors below are listed as returning `Long` with
 * -1 meaning "not available". The upstream Spark Scaladoc for BatchInfo
 * appears to declare them as `Option[Long]` instead — confirm against the
 * Spark version this page targets.
 *
 * @param batchTime time of the batch
 * @param streamIdToInputInfo map of input stream IDs to input information
 * @param submissionTime when the batch was submitted for processing
 * @param processingStartTime when batch processing actually started (optional)
 * @param processingEndTime when batch processing completed (optional)
 * @param outputOperationInfos information about output operations in this batch,
 *                             keyed by an Int (presumably the output operation id — confirm)
 */
case class BatchInfo(
  batchTime: Time,
  streamIdToInputInfo: Map[Int, StreamInputInfo],
  submissionTime: Long,
  processingStartTime: Option[Long],
  processingEndTime: Option[Long],
  outputOperationInfos: Map[Int, OutputOperationInfo]
) {
  /**
   * Total processing delay for this batch.
   *
   * @return processing delay in milliseconds, or -1 if not available
   */
  def processingDelay: Long
  
  /**
   * Scheduling delay for this batch.
   *
   * @return scheduling delay in milliseconds, or -1 if not available
   */
  def schedulingDelay: Long
  
  /**
   * Total delay for this batch.
   *
   * @return total delay in milliseconds, or -1 if not available
   */
  def totalDelay: Long
  
  /**
   * Number of records processed in this batch.
   *
   * @return total number of input records
   */
  def numRecords: Long
}

Output Operation Information

Details about individual output operations within batches.

/**
 * Information about a single output operation.
 *
 * @param batchTime batch time associated with this output operation
 *                  (presumably the batch it belongs to — confirm)
 * @param id unique identifier for the output operation
 * @param name human-readable name of the operation
 * @param description detailed description of the operation
 * @param startTime when the operation started (optional)
 * @param endTime when the operation completed (optional)
 * @param failureReason reason for failure if the operation failed (optional)
 */
case class OutputOperationInfo(
  batchTime: Time,
  id: Int,
  name: String,
  description: String,
  startTime: Option[Long],
  endTime: Option[Long],
  failureReason: Option[String]
) {
  /**
   * Duration of this output operation.
   *
   * @return duration in milliseconds, or -1 if not available
   */
  def duration: Long
}

Receiver Information

Information about streaming data receivers including status and error details.

/**
 * Information about a stream data receiver, including its status and the
 * most recent error details.
 *
 * The three error fields default to `None`.
 *
 * @param streamId ID of the input stream
 * @param name name of the receiver
 * @param active whether the receiver is currently active
 * @param executorId ID of the executor running the receiver
 * @param lastErrorMessage last error message from the receiver (optional)
 * @param lastError last error from the receiver, carried as a string rather
 *                  than as an exception object (optional)
 * @param lastErrorTime time of the last error (optional)
 */
case class ReceiverInfo(
  streamId: Int,
  name: String,
  active: Boolean,
  executorId: String,
  lastErrorMessage: Option[String] = None,
  lastError: Option[String] = None,
  lastErrorTime: Option[Long] = None
)

Built-in Listener Implementations

Pre-built listeners for common monitoring scenarios.

/**
 * Listener that logs summary statistics about batches.
 *
 * Only the signatures are listed here; the method bodies are not shown.
 *
 * @param numBatchInfos number of recent batches to track for statistics
 *                      (defaults to 10)
 */
class StatsReportListener(numBatchInfos: Int = 10) extends StreamingListener {
  /**
   * Batch-completion callback overridden by this listener.
   *
   * @param batchCompleted event carrying the completed batch's information
   */
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit
  
  /**
   * Print the current statistics summary.
   */
  def printStats(): Unit
}

Usage Examples:

// Custom listener implementation
/**
 * Example listener that prints a one-line summary for every completed batch,
 * warns when the processing delay is high, and reports receiver errors.
 */
class CustomStreamingListener extends StreamingListener {
  /**
   * Prints the batch time, record count, processing delay and scheduling
   * delay, followed by a warning when the processing delay exceeds 5000 ms.
   *
   * @param batchCompleted event carrying the completed batch's information
   */
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val batchInfo = batchCompleted.batchInfo
    val processingTime = batchInfo.processingDelay
    val schedulingDelay = batchInfo.schedulingDelay
    val numRecords = batchInfo.numRecords

    println(s"Batch ${batchInfo.batchTime}: " +
           s"processed $numRecords records in ${processingTime}ms " +
           s"(scheduling delay: ${schedulingDelay}ms)")

    // Alert on high processing delay
    if (processingTime > 5000) {
      println(s"WARNING: High processing delay detected: ${processingTime}ms")
    }
  }

  /**
   * Prints the stream ID and last error message of the failing receiver.
   *
   * @param receiverError event carrying the failing receiver's information
   */
  override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {
    val receiverInfo = receiverError.receiverInfo
    // lastErrorMessage is an Option[String]; unwrap it so the log shows the
    // message text instead of "Some(...)" or "None".
    val errorMessage = receiverInfo.lastErrorMessage.getOrElse("Unknown error")
    println(s"Receiver error on stream ${receiverInfo.streamId}: $errorMessage")
  }
}

// Register the custom listener with the streaming context.
// `ssc` is not defined in this snippet; it is assumed to be an existing
// streaming context — confirm against the surrounding application code.
val customListener = new CustomStreamingListener()
ssc.addStreamingListener(customListener)

// Register the built-in stats listener, tracking the 20 most recent batches
// (the constructor argument is numBatchInfos).
val statsListener = new StatsReportListener(20)
ssc.addStreamingListener(statsListener)

Java API for Listeners

Java-friendly listener interface for Java applications.

/**
 * Abstract base class for Java streaming listeners.
 *
 * Every callback has an empty body, so a subclass overrides only the events
 * it needs. The callbacks mirror those of the Scala StreamingListener trait.
 */
public abstract class JavaStreamingListener {
    /** Called when the streaming context starts. */
    public void onStreamingStarted(StreamingListenerStreamingStarted streamingStarted) {}
    /** Called when a batch is submitted for processing. */
    public void onBatchSubmitted(StreamingListenerBatchSubmitted batchSubmitted) {}
    /** Called when batch processing starts. */
    public void onBatchStarted(StreamingListenerBatchStarted batchStarted) {}
    /** Called when batch processing completes. */
    public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {}
    /** Called when an output operation starts. */
    public void onOutputOperationStarted(StreamingListenerOutputOperationStarted outputOperationStarted) {}
    /** Called when an output operation completes. */
    public void onOutputOperationCompleted(StreamingListenerOutputOperationCompleted outputOperationCompleted) {}
    /** Called when a receiver starts. */
    public void onReceiverStarted(StreamingListenerReceiverStarted receiverStarted) {}
    /** Called when a receiver encounters an error. */
    public void onReceiverError(StreamingListenerReceiverError receiverError) {}
    /** Called when a receiver stops. */
    public void onReceiverStopped(StreamingListenerReceiverStopped receiverStopped) {}
}

/**
 * Wrapper that converts a Java listener into a Scala [[StreamingListener]].
 *
 * Only the signature is listed here; the forwarding logic is not shown.
 *
 * @param javaStreamingListener the Java listener being wrapped
 */
class JavaStreamingListenerWrapper(javaStreamingListener: JavaStreamingListener) extends StreamingListener

Java Usage Examples:

// Custom Java listener
class MyJavaStreamingListener extends JavaStreamingListener {
    @Override
    public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {
        BatchInfo batchInfo = batchCompleted.batchInfo();
        long processingDelay = batchInfo.processingDelay();
        long numRecords = batchInfo.numRecords();
        
        System.out.println(String.format(
            "Batch completed: %d records processed in %dms", 
            numRecords, processingDelay
        ));
    }
    
    @Override
    public void onReceiverError(StreamingListenerReceiverError receiverError) {
        ReceiverInfo info = receiverError.receiverInfo();
        System.err.println("Receiver error: " + info.lastErrorMessage().orElse("Unknown error"));
    }
}

// Register the listener with the Java streaming context.
// `jssc` is not defined in this snippet; it is assumed to be an existing Java
// streaming context.
// NOTE(review): if addStreamingListener only accepts the Scala
// StreamingListener type, the listener must first be wrapped in a
// JavaStreamingListenerWrapper — confirm against the context's API.
JavaStreamingListener listener = new MyJavaStreamingListener();
jssc.addStreamingListener(listener);

Advanced Monitoring Patterns

Performance Monitoring

Track key performance metrics and identify bottlenecks:

/**
 * Listener that records per-batch performance metrics, keeps a bounded
 * history of the most recent batches, and prints alerts for high scheduling
 * delay and low throughput.
 */
class PerformanceMonitoringListener extends StreamingListener {
  // History of the most recent batches; trimmed to at most 100 entries.
  private val batchMetrics = scala.collection.mutable.ArrayBuffer[BatchMetrics]()

  /**
   * Snapshot of one completed batch.
   *
   * @param batchTime time of the batch
   * @param schedulingDelay scheduling delay in milliseconds
   * @param processingTime processing delay in milliseconds
   * @param totalDelay total delay in milliseconds
   * @param numRecords number of input records in the batch
   * @param recordsPerSecond throughput derived from numRecords and
   *                         processingTime; 0.0 when processingTime is not positive
   */
  case class BatchMetrics(
    batchTime: Time,
    schedulingDelay: Long,
    processingTime: Long,
    totalDelay: Long,
    numRecords: Long,
    recordsPerSecond: Double
  )

  /**
   * Records the metrics of the completed batch, trims the history, and runs
   * the performance checks.
   *
   * @param batchCompleted event carrying the completed batch's information
   */
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val batch = batchCompleted.batchInfo
    val processingTime = batch.processingDelay
    // No rate can be derived from a non-positive processing time, so 0.0 is
    // stored as a placeholder rather than a measured throughput.
    val recordsPerSecond = if (processingTime > 0) batch.numRecords * 1000.0 / processingTime else 0.0

    val metrics = BatchMetrics(
      batch.batchTime,
      batch.schedulingDelay,
      processingTime,
      batch.totalDelay,
      batch.numRecords,
      recordsPerSecond
    )

    batchMetrics += metrics

    // Keep only recent metrics
    if (batchMetrics.size > 100) {
      batchMetrics.remove(0)
    }

    // Check for performance issues
    analyzePerformance(metrics)
  }

  /**
   * Prints alerts for the given batch and, once at least 10 batches are
   * recorded, the averages over the 10 most recent ones.
   *
   * @param metrics metrics of the batch that has just completed
   */
  private def analyzePerformance(metrics: BatchMetrics): Unit = {
    // Alert on high scheduling delay
    if (metrics.schedulingDelay > 1000) {
      println(s"HIGH SCHEDULING DELAY: ${metrics.schedulingDelay}ms at ${metrics.batchTime}")
    }

    // Alert on low throughput, but only when a throughput was actually
    // measured: with a non-positive processing time recordsPerSecond is just
    // the 0.0 placeholder and would raise a false alarm.
    if (metrics.processingTime > 0 && metrics.recordsPerSecond < 100) {
      println(s"LOW THROUGHPUT: ${metrics.recordsPerSecond} records/sec at ${metrics.batchTime}")
    }

    // Calculate moving averages for trend analysis
    if (batchMetrics.size >= 10) {
      val recent = batchMetrics.takeRight(10)
      val avgProcessingTime = recent.map(_.processingTime).sum / recent.size
      val avgThroughput = recent.map(_.recordsPerSecond).sum / recent.size

      println(s"Recent averages: ${avgProcessingTime}ms processing, ${avgThroughput} records/sec")
    }
  }
}

Error Tracking and Alerting

Monitor for errors and failures across the streaming application:

/**
 * Listener that collects failures from output operations and receivers,
 * counts them per source, and prints an alert when more than five errors
 * were recorded within the last minute.
 */
class ErrorTrackingListener extends StreamingListener {
  // Number of errors recorded so far, keyed by error source.
  private val errorCounts = scala.collection.mutable.Map[String, Int]()
  // The most recent errors, oldest first; trimmed to at most 50 entries.
  private val recentErrors = scala.collection.mutable.Queue[ErrorEvent]()

  /** One recorded error: when it was recorded, where it came from, and its text. */
  case class ErrorEvent(timestamp: Long, source: String, message: String)

  /**
   * Records an error for every output operation of the batch that carries a
   * failure reason.
   *
   * @param batchCompleted event carrying the completed batch's information
   */
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    for {
      operation <- batchCompleted.batchInfo.outputOperationInfos.values
      reason <- operation.failureReason
    } recordError("OutputOperation", s"${operation.name}: $reason")
  }

  /**
   * Records the receiver's last error message, falling back to a generic
   * text when none is available.
   *
   * @param receiverError event carrying the failing receiver's information
   */
  override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {
    val info = receiverError.receiverInfo
    val text = info.lastErrorMessage.getOrElse("Unknown receiver error")
    recordError("Receiver", s"Stream ${info.streamId}: $text")
  }

  /**
   * Stores the error, updates the per-source counter, checks for error
   * bursts, and prints the error.
   *
   * @param source origin of the error
   * @param message description of the error
   */
  private def recordError(source: String, message: String): Unit = {
    val event = ErrorEvent(System.currentTimeMillis(), source, message)

    // Per-source running total.
    errorCounts.update(source, errorCounts.getOrElse(source, 0) + 1)

    // Bounded history: drop the oldest entry once more than 50 are held.
    recentErrors.enqueue(event)
    if (recentErrors.size > 50) recentErrors.dequeue()

    // The burst check runs before the error itself is printed.
    checkErrorPatterns()

    println(s"ERROR [$source]: $message")
  }

  /** Prints an alert when more than five stored errors fall within the last 60 seconds. */
  private def checkErrorPatterns(): Unit = {
    val cutoff = System.currentTimeMillis() - 60000 // Last minute
    val errorsInWindow = recentErrors.count(event => event.timestamp > cutoff)

    if (errorsInWindow > 5) println(s"ALERT: $errorsInWindow errors in the last minute!")
  }
}

Metrics Integration

Integration with external monitoring systems:

/**
 * Listener that forwards batch metrics and receiver error counts to an
 * external metrics reporter.
 *
 * @param metricsReporter reporter that receives the gauges and counters
 */
class MetricsIntegrationListener(metricsReporter: MetricsReporter) extends StreamingListener {
  /**
   * Publishes the delay and record-count gauges for the batch, a throughput
   * gauge when the processing delay is positive, and the completed/failed
   * batch counters.
   *
   * @param batchCompleted event carrying the completed batch's information
   */
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    val processingDelay = info.processingDelay
    val recordCount = info.numRecords

    // Raw per-batch gauges.
    metricsReporter.gauge("streaming.batch.processing_delay", processingDelay)
    metricsReporter.gauge("streaming.batch.scheduling_delay", info.schedulingDelay)
    metricsReporter.gauge("streaming.batch.total_delay", info.totalDelay)
    metricsReporter.gauge("streaming.batch.num_records", recordCount)

    // Throughput in records per second; skipped when no positive delay exists.
    if (processingDelay > 0) {
      metricsReporter.gauge("streaming.batch.throughput", recordCount * 1000.0 / processingDelay)
    }

    // Every batch counts as completed; it also counts as failed when any of
    // its output operations carries a failure reason.
    metricsReporter.counter("streaming.batch.completed").increment()
    val anyOperationFailed = info.outputOperationInfos.values.exists(_.failureReason.nonEmpty)
    if (anyOperationFailed) {
      metricsReporter.counter("streaming.batch.failed").increment()
    }
  }

  /**
   * Increments the receiver error counter.
   *
   * @param receiverError event carrying the failing receiver's information (unused)
   */
  override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit =
    metricsReporter.counter("streaming.receiver.errors").increment()
}

Event System Architecture

The streaming listener system provides:

  • Real-time Monitoring: Events are fired immediately as operations complete
  • Comprehensive Coverage: All major streaming operations generate events
  • Thread Safety: Listeners are called from a single thread in order
  • Error Isolation: Listener exceptions don't affect streaming processing
  • Historical Data: BatchInfo and other objects provide historical context

This enables building sophisticated monitoring, alerting, and analytics systems on top of Spark Streaming applications.