CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-apache-spark

Lightning-fast unified analytics engine for large-scale data processing with high-level APIs in Scala, Java, Python, and R

Pending
Overview
Eval results
Files

streaming.mddocs/

Spark Streaming

Spark Streaming enables scalable, high-throughput, fault-tolerant stream processing of live data streams. It ingests data from sources like Kafka, Flume, Twitter, and TCP sockets, and processes it using high-level functions like map, reduce, join, and window operations.

Core Concepts

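Spark Streaming discretizes a live data stream into a sequence of micro-batches, one per batch interval. The resulting abstraction is the DStream (Discretized Stream): a continuous series of RDDs, one for each batch. Because every batch is an ordinary RDD, Spark's batch processing APIs can be applied to streaming data.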

StreamingContext

The main entry point for Spark Streaming functionality.

StreamingContext Class

class StreamingContext(sparkContext: SparkContext, batchDuration: Duration) extends Logging {
  // Alternative constructors
  def this(conf: SparkConf, batchDuration: Duration)
  def this(master: String, appName: String, batchDuration: Duration, sparkHome: String = null, jars: Seq[String] = Nil, environment: Map[String, String] = Map())
  def this(path: String, hadoopConf: Configuration = new Configuration())
}

Creating StreamingContext

import org.apache.spark.streaming.{StreamingContext, Seconds, Minutes}
import org.apache.spark.{SparkContext, SparkConf}

// Configuration shared by the examples below; it has to exist before any context is built from it
val conf = new SparkConf().setAppName("StreamingApp").setMaster("local[2]")

// From existing SparkContext
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(1))

// From SparkConf (alternative to the above)
val ssc = new StreamingContext(conf, Seconds(2))

// With all parameters
val ssc = new StreamingContext(
  master = "local[*]",
  appName = "My Streaming App", 
  batchDuration = Seconds(1),
  sparkHome = "/path/to/spark",
  jars = Seq("app.jar"),
  environment = Map("ENV_VAR" -> "value")
)

// From checkpoint (recovery)
val ssc = new StreamingContext("hdfs://path/to/checkpoint", new Configuration())

Duration Helper Objects

import org.apache.spark.streaming.{Milliseconds, Seconds, Minutes}

Milliseconds(500)  // 500 milliseconds
Seconds(1)         // 1 second  
Seconds(30)        // 30 seconds
Minutes(1)         // 1 minute

Input DStream Creation

Socket Streams

socketTextStream: Create DStream from TCP socket

def socketTextStream(hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[String]
// Connect to TCP socket for text data
val lines = ssc.socketTextStream("localhost", 9999)

// Process the stream
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
wordCounts.print()

socketStream: Custom socket stream with converter

def socketStream[T: ClassTag](hostname: String, port: Int, converter: (InputStream) => Iterator[T], storageLevel: StorageLevel): ReceiverInputDStream[T]

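rawSocketStream: Raw socket stream whose data arrives as blocks already serialized with Spark's serializer, so they can be stored without being deserialized first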

def rawSocketStream[T: ClassTag](hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[T]

File Streams

textFileStream: Monitor directory for new text files

def textFileStream(directory: String): DStream[String]
// Monitor directory for new files
val fileStream = ssc.textFileStream("hdfs://path/to/directory")

// Process new files as they arrive
val processed = fileStream
  .filter(_.nonEmpty)
  .map(_.toUpperCase)

processed.print()

fileStream: Generic file stream with InputFormat

def fileStream[K, V, F <: NewInputFormat[K, V]: ClassTag](directory: String, filter: Path => Boolean = _ => true, newFilesOnly: Boolean = true): InputDStream[(K, V)]
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}

val hadoopStream = ssc.fileStream[LongWritable, Text, TextInputFormat]("hdfs://input/dir")
val textStream = hadoopStream.map(_._2.toString)

Queue Streams (for testing)

queueStream: Create stream from queue of RDDs

def queueStream[T: ClassTag](queue: Queue[RDD[T]], oneAtATime: Boolean = true, defaultRDD: RDD[T] = null): InputDStream[T]
import scala.collection.mutable.Queue

val rddQueue = Queue[RDD[Int]]()

// Create stream from queue
val queueStream = ssc.queueStream(rddQueue)

// Add RDDs to queue (simulate data arrival)
for (i <- 1 to 10) {
  rddQueue += ssc.sparkContext.parallelize(1 to 100)
}

Custom Receiver Streams

receiverStream: Create stream from custom receiver

def receiverStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T]
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.storage.StorageLevel

// Custom receiver example
/**
 * Example receiver that produces String records on a background thread.
 *
 * Records are handed to Spark through `store`, using the
 * `StorageLevel.MEMORY_AND_DISK_2` level passed to the `Receiver` constructor.
 * `generateData()` is a placeholder for the application-specific input source
 * and must be supplied by the application.
 */
class CustomReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  /** Launches the receiving thread and returns without waiting for it. */
  override def onStart(): Unit = {
    // Start receiving data
    new Thread("Custom Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  /** Nothing to release: the receive loop ends on its own once isStopped() returns true. */
  override def onStop(): Unit = {
    // Stop receiving data
  }

  /** Stores one record roughly every 100 ms until the receiver is stopped. */
  private def receive(): Unit = {
    while (!isStopped()) {
      // Simulate data reception
      val data = generateData()
      store(data)
      Thread.sleep(100)
    }
  }
}

val customStream = ssc.receiverStream(new CustomReceiver())

actorStream: Create stream from Akka Actor

def actorStream[T: ClassTag](props: Props, name: String, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[T]

DStream Transformations

DStreams support transformations similar to RDDs, applied to each batch.

DStream Class

abstract class DStream[T: ClassTag] extends Serializable with Logging {
  def ssc: StreamingContext
  def slideDuration: Duration
  def dependencies: List[DStream[_]]
  def compute(time: Time): Option[RDD[T]]
}

Basic Transformations

map: Apply function to each element in each batch

def map[U: ClassTag](mapFunc: T => U): DStream[U]
val numbers = ssc.socketTextStream("localhost", 9999)
val doubled = numbers.map(_.toInt * 2)

flatMap: Apply function and flatten results

def flatMap[U: ClassTag](flatMapFunc: T => Traversable[U]): DStream[U]
val lines = ssc.textFileStream("input/")
val words = lines.flatMap(_.split(" "))

filter: Keep elements matching predicate

def filter(filterFunc: T => Boolean): DStream[T]
val validLines = lines.filter(_.nonEmpty)
val longWords = words.filter(_.length > 5)

glom: Coalesce elements within each partition into arrays

def glom(): DStream[Array[T]]

Stream Operations

union: Union with another DStream

def union(that: DStream[T]): DStream[T]
val stream1 = ssc.socketTextStream("host1", 9999)
val stream2 = ssc.socketTextStream("host2", 9999)
val combined = stream1.union(stream2)

Aggregation Transformations

count: Count elements in each batch

def count(): DStream[Long]

countByValue: Count occurrences of each value

def countByValue()(implicit ord: Ordering[T] = null): DStream[(T, Long)]

reduce: Reduce elements in each batch

def reduce(reduceFunc: (T, T) => T): DStream[T]
val numbers = ssc.socketTextStream("localhost", 9999).map(_.toInt)

val counts = numbers.count()           // Count per batch
val sums = numbers.reduce(_ + _)       // Sum per batch
val maxValues = numbers.reduce(math.max) // Max per batch

Advanced Transformations

transform: Apply arbitrary RDD-to-RDD function

def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U]
def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U]
val enhanced = stream.transform { (rdd, time) =>
  // Access to both RDD and batch time
  val timeString = time.toString
  rdd.map(data => s"$timeString: $data")
    .filter(_.contains("important"))
}

transformWith: Transform with another DStream

def transformWith[U: ClassTag, V: ClassTag](other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V]): DStream[V]
def transformWith[U: ClassTag, V: ClassTag](other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V]): DStream[V]
import org.apache.spark.rdd.RDD

val stream1 = ssc.socketTextStream("localhost", 9999)
val stream2 = ssc.socketTextStream("localhost", 8888)

// transformWith takes the other stream and the function in ONE parameter list, so the
// function is passed as the second argument and its parameter types are written out.
val joined = stream1.transformWith(stream2, (rdd1: RDD[String], rdd2: RDD[String]) => {
  // Join RDDs from different streams
  val pairs1 = rdd1.map(line => (extractKey(line), line))
  val pairs2 = rdd2.map(line => (extractKey(line), line))
  pairs1.join(pairs2).map { case (_, (v1, v2)) => s"$v1 | $v2" }
})

Window Operations

Window operations allow you to apply transformations over a sliding window of data.

Basic Windowing

window: Return windowed DStream

def window(windowDuration: Duration): DStream[T]
def window(windowDuration: Duration, slideDuration: Duration): DStream[T]
val lines = ssc.socketTextStream("localhost", 9999)

// 30-second window, sliding every 10 seconds
val windowedLines = lines.window(Seconds(30), Seconds(10))
val windowCounts = windowedLines.count()

Windowed Reductions

reduceByWindow: Reduce over a window

def reduceByWindow(reduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration): DStream[T]
def reduceByWindow(reduceFunc: (T, T) => T, invReduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration): DStream[T]
val numbers = ssc.socketTextStream("localhost", 9999).map(_.toInt)

// Sum over window
val windowSums = numbers.reduceByWindow(
  _ + _,                    // Add new values
  Seconds(60),             // Window duration
  Seconds(20)              // Slide duration
)

// Efficient windowed reduction with inverse function
val efficientSums = numbers.reduceByWindow(
  _ + _,                    // Add function
  _ - _,                    // Inverse (subtract) function
  Seconds(60),
  Seconds(20)
)

countByWindow: Count elements over window

def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long]

countByValueAndWindow: Count values over window

def countByValueAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int = ssc.sc.defaultParallelism)(implicit ord: Ordering[T] = null): DStream[(T, Long)]
val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))

// Count words in 2-minute window, sliding every 30 seconds
val wordCounts = words.countByValueAndWindow(Minutes(2), Seconds(30))
wordCounts.print()

PairDStreamFunctions (Key-Value Operations)

Operations available on DStreams of (key, value) pairs through implicit conversion.

Key-Value Transformations

keys and values:

def keys: DStream[K]
def values: DStream[V]

mapValues: Transform values while preserving keys

def mapValues[U: ClassTag](mapValuesFunc: V => U): DStream[(K, U)]

flatMapValues: FlatMap values while preserving keys

def flatMapValues[U: ClassTag](flatMapValuesFunc: V => TraversableOnce[U]): DStream[(K, U)]
import org.apache.spark.streaming.StreamingContext._

val pairs = ssc.socketTextStream("localhost", 9999)
  .map(line => {
    val parts = line.split(",")
    (parts(0), parts(1).toInt)
  })

val doubled = pairs.mapValues(_ * 2)
val allKeys = pairs.keys
val allValues = pairs.values

Aggregation by Key

groupByKey: Group values by key in each batch

def groupByKey(): DStream[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): DStream[(K, Iterable[V])]
def groupByKey(numPartitions: Int): DStream[(K, Iterable[V])]

reduceByKey: Reduce values by key in each batch

def reduceByKey(func: (V, V) => V): DStream[(K, V)]
def reduceByKey(func: (V, V) => V, partitioner: Partitioner): DStream[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): DStream[(K, V)]

combineByKey: Generic combine by key

def combineByKey[C: ClassTag](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiner: (C, C) => C, partitioner: Partitioner): DStream[(K, C)]
val wordStream = ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split(" "))
  .map(word => (word, 1))

// Count words in each batch
val wordCounts = wordStream.reduceByKey(_ + _)

// Group all occurrences
val wordGroups = wordStream.groupByKey()

Windowed Key-Value Operations

groupByKeyAndWindow: Group by key over window

def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Iterable[V])]
def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Iterable[V])]
def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, partitioner: Partitioner): DStream[(K, Iterable[V])]

reduceByKeyAndWindow: Reduce by key over window

def reduceByKeyAndWindow(func: (V, V) => V, windowDuration: Duration): DStream[(K, V)]
def reduceByKeyAndWindow(func: (V, V) => V, windowDuration: Duration, slideDuration: Duration): DStream[(K, V)]
def reduceByKeyAndWindow(func: (V, V) => V, invFunc: (V, V) => V, windowDuration: Duration, slideDuration: Duration): DStream[(K, V)]

countByKeyAndWindow: Count by key over window

def countByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Long)]
val wordPairs = ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split(" "))
  .map((_, 1))

// Windowed word count (last 5 minutes, every 30 seconds)
val windowedWordCounts = wordPairs.reduceByKeyAndWindow(
  _ + _,           // Reduce function
  Minutes(5),      // Window duration  
  Seconds(30)      // Slide duration
)

// Efficient version with inverse function
val efficientWordCounts = wordPairs.reduceByKeyAndWindow(
  _ + _,           // Add function
  _ - _,           // Subtract function (inverse)
  Minutes(5),
  Seconds(30)
)

Join Operations

join: Join with another DStream

def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))]
def join[W: ClassTag](other: DStream[(K, W)], partitioner: Partitioner): DStream[(K, (V, W))]

leftOuterJoin, rightOuterJoin, fullOuterJoin:

def leftOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))]
def rightOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], W))]  
def fullOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], Option[W]))]

cogroup: Group together with another DStream

def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))]
val stream1 = ssc.socketTextStream("localhost", 9999)
  .map(line => (line.split(",")(0), line.split(",")(1)))

val stream2 = ssc.socketTextStream("localhost", 8888)  
  .map(line => (line.split(",")(0), line.split(",")(1)))

// Inner join
val joined = stream1.join(stream2)

// Left outer join  
val leftJoined = stream1.leftOuterJoin(stream2)

// Cogroup
val cogrouped = stream1.cogroup(stream2)

Stateful Operations

updateStateByKey

Maintain state across batches for each key:

def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)]
def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S], partitioner: Partitioner): DStream[(K, S)]
def updateStateByKey[S: ClassTag](updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)], partitioner: Partitioner, rememberPartitioner: Boolean): DStream[(K, S)]
// Running count of words
val wordPairs = ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split(" "))
  .map((_, 1))

val runningCounts = wordPairs.updateStateByKey { (values: Seq[Int], state: Option[Int]) =>
  val newCount = values.sum + state.getOrElse(0)
  Some(newCount)
}

// Advanced state management
case class WordStats(count: Int, lastSeen: Long)

val wordStats = wordPairs.updateStateByKey { (values: Seq[Int], state: Option[WordStats]) =>
  val currentTime = System.currentTimeMillis()
  val currentCount = values.sum
  
  state match {
    case Some(stats) => Some(WordStats(stats.count + currentCount, currentTime))
    case None => Some(WordStats(currentCount, currentTime))
  }
}

DStream Actions

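Output operations (the DStream counterpart of RDD actions) register work to be run on every batch. Transformations without an output operation are never executed, and nothing runs at all until `ssc.start()` is called.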

Output Operations

print: Print first 10 elements of each batch

def print(): Unit
def print(num: Int): Unit

foreachRDD: Apply function to each RDD

def foreachRDD(foreachFunc: RDD[T] => Unit): Unit
def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit
val processed = stream.map(process)

// Print results
processed.print()
processed.print(20)  // Print first 20 elements

// Custom processing of each batch
processed.foreachRDD { rdd =>
  val count = rdd.count()
  if (count > 0) {
    println(s"Batch size: $count")
    rdd.take(10).foreach(println)
  }
}

// With time information
processed.foreachRDD { (rdd, time) =>
  println(s"Batch time: $time, Count: ${rdd.count()}")
}

Save Operations

saveAsTextFiles: Save each batch as text files

def saveAsTextFiles(prefix: String, suffix: String = ""): Unit

saveAsObjectFiles: Save each batch as object files

def saveAsObjectFiles(prefix: String, suffix: String = ""): Unit
val processed = stream.map(_.toUpperCase)

// Save each batch. Output names follow "prefix-TIME_IN_MS.suffix", so the suffix is given
// WITHOUT a leading dot: output-1414060920000.txt, output-1414060921000.txt, etc.
processed.saveAsTextFiles("hdfs://path/to/output", "txt")

// Save as object files
processed.saveAsObjectFiles("hdfs://path/to/objects")

StreamingContext Control

Starting and Stopping

start: Start the streaming computation

def start(): Unit

stop: Stop the streaming context

def stop(): Unit
def stop(stopSparkContext: Boolean): Unit
def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit

awaitTermination: Wait for termination

def awaitTermination(): Unit
// Timeout is in milliseconds; returns true if the context stopped within the timeout.
// (The older awaitTermination(timeout: Long) is deprecated since Spark 1.3 and returns Unit.)
def awaitTerminationOrTimeout(timeout: Long): Boolean
val ssc = new StreamingContext(conf, Seconds(1))

// Define streaming computation
val stream = ssc.socketTextStream("localhost", 9999)
stream.print()

// Start the computation
ssc.start()

// Option 1: block until the context is stopped or fails
ssc.awaitTermination()

// Option 2 (use INSTEAD of option 1, which never returns while the stream is running):
// wait with a timeout
val terminated = ssc.awaitTerminationOrTimeout(60000) // 60 seconds
if (!terminated) {
  println("Streaming did not terminate within 60 seconds")
  ssc.stop()
}

Checkpointing

checkpoint: Set checkpoint directory

def checkpoint(directory: String): Unit

getOrCreate: Get existing context from checkpoint or create new one

def getOrCreate(checkpointPath: String, creatingFunc: () => StreamingContext, hadoopConf: Configuration = new Configuration()): StreamingContext
// Enable checkpointing
ssc.checkpoint("hdfs://path/to/checkpoints")

// Fault-tolerant pattern
def createStreamingContext(): StreamingContext = {
  val ssc = new StreamingContext(conf, Seconds(1))
  
  // Define streaming computation
  val lines = ssc.socketTextStream("localhost", 9999)
  val words = lines.flatMap(_.split(" "))
  val wordCounts = words.map((_, 1)).reduceByKey(_ + _)
  wordCounts.print()
  
  ssc.checkpoint("hdfs://checkpoints")
  ssc
}

val ssc = StreamingContext.getOrCreate("hdfs://checkpoints", createStreamingContext _)

Context Properties

remember: Set remember duration

def remember(duration: Duration): Unit

sparkContext: Access underlying SparkContext

def sparkContext: SparkContext
// Set how long to remember RDDs
ssc.remember(Minutes(10))

// Access SparkContext
val sc = ssc.sparkContext
val broadcast = sc.broadcast(lookupTable)

Persistence and Caching

DStreams can be persisted in memory for faster access:

def persist(storageLevel: StorageLevel): DStream[T]
def persist(): DStream[T]  // Uses MEMORY_ONLY_SER
def cache(): DStream[T]    // Uses MEMORY_ONLY_SER
import org.apache.spark.storage.StorageLevel

val expensiveStream = ssc.socketTextStream("localhost", 9999)
  .map(expensiveTransformation)
  .cache()  // Cache for reuse

// Multiple operations on cached stream
val count = expensiveStream.count()
// DStream has no sample(); apply the RDD operation to every batch through transform
val sample = expensiveStream.transform(_.sample(withReplacement = false, fraction = 0.1))

Performance and Best Practices

Batch Interval Selection

// For low latency (100ms - 1s)
val ssc = new StreamingContext(conf, Milliseconds(500))

// For high throughput (1s - 10s) 
val ssc = new StreamingContext(conf, Seconds(5))

// For batch processing style (minutes)
val ssc = new StreamingContext(conf, Minutes(2))

Parallelism and Partitioning

// Increase parallelism for receivers
val numReceivers = 4
val streams = (1 to numReceivers).map { i =>
  ssc.socketTextStream(s"host$i", 9999)
}
val unifiedStream = ssc.union(streams)

// Repartition for better load balancing
val repartitioned = stream.transform(_.repartition(10))

Memory Management

// Set appropriate storage levels
val persistedStream = stream
  .map(expensiveOperation)
  .persist(StorageLevel.MEMORY_AND_DISK_SER)

// Enable checkpointing for fault tolerance
ssc.checkpoint("hdfs://checkpoints")

// Use efficient serialization
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

Error Handling and Fault Tolerance

Spark Streaming applications must handle various failure scenarios to ensure reliable operation.

Common Streaming Errors

IllegalStateException: Invalid streaming context lifecycle operations

try {
  ssc.start()
  ssc.start()  // No error on Spark 1.4+: a second start() on an active context only logs a warning
  ssc.stop(stopSparkContext = false)
  ssc.start()  // Error: a stopped context cannot be restarted
} catch {
  case e: IllegalStateException =>
    println(s"Invalid streaming context operation: ${e.getMessage}")
}

Receiver Failures: Input stream receivers failing

// Monitor receiver status
ssc.addStreamingListener(new StreamingListener {
  override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {
    println(s"Receiver error: ${receiverError.receiverInfo.name}")
    // Implement recovery logic
  }
})

Batch Processing Delays: When processing takes longer than batch interval

// Monitor batch processing times
// Keep your own copy of the batch interval: ssc.graph is private[streaming] and cannot be
// read from application code. The value must match the Duration given to the StreamingContext.
val batchInterval = Seconds(1).milliseconds

ssc.addStreamingListener(new StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    // processingDelay is an Option; a missing value is treated as 0 ms
    val processingTime = batchCompleted.batchInfo.processingDelay.getOrElse(0L)

    if (processingTime > batchInterval) {
      println(s"Warning: Processing time ($processingTime ms) > batch interval ($batchInterval ms)")
    }
  }
})

Checkpoint Corruption

Checkpoint Recovery Failures: When checkpoint data is corrupted

def createStreamingContext(): StreamingContext = {
  val ssc = new StreamingContext(conf, Seconds(1))
  // Define streaming logic
  ssc.checkpoint("hdfs://checkpoints")
  ssc
}

// Bind the result of the whole try/catch so the context is still in scope afterwards.
// (getOrCreate also accepts createOnError = true, which performs this fallback itself.)
val ssc = try {
  StreamingContext.getOrCreate("hdfs://checkpoints", createStreamingContext _)
} catch {
  case e: Exception =>
    println(s"Checkpoint recovery failed: ${e.getMessage}")
    // Fall back to creating new context
    createStreamingContext()
}

Checkpoint Directory Management:

// Clean up old checkpoints periodically
import java.io.File
import org.apache.hadoop.fs.{FileSystem, Path}

/**
 * Deletes the entries directly under `checkpointDir` whose modification time is more
 * than `retentionHours` hours in the past. Listing or deletion failures are reported
 * on stdout and are not propagated to the caller.
 *
 * @param checkpointDir  directory whose immediate children are inspected
 * @param retentionHours age threshold, in hours
 */
def cleanupCheckpoints(checkpointDir: String, retentionHours: Int): Unit = {
  import java.util.concurrent.TimeUnit
  import scala.util.control.NonFatal

  val checkpointPath = new Path(checkpointDir)
  // Resolve the file system from the path itself so a non-default scheme is honoured
  val fs = checkpointPath.getFileSystem(ssc.sparkContext.hadoopConfiguration)

  try {
    // Long arithmetic: retentionHours * 60 * 60 * 1000 overflows Int above 596 hours
    val cutoffTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(retentionHours.toLong)

    fs.listStatus(checkpointPath)
      .filter(_.getModificationTime < cutoffTime)
      .foreach { fileStatus =>
        // delete returns false when nothing was removed; only report real deletions
        if (fs.delete(fileStatus.getPath, true)) {
          println(s"Deleted old checkpoint: ${fileStatus.getPath}")
        }
      }
  } catch {
    case NonFatal(e) => println(s"Checkpoint cleanup failed: ${e.getMessage}")
  }
}

Memory and Resource Errors

OutOfMemoryError in Streaming:

// Monitor memory usage and adjust batch sizes
val memoryMonitoringStream = stream.transform { rdd =>
  val memoryUsed = Runtime.getRuntime.totalMemory - Runtime.getRuntime.freeMemory
  val memoryMax = Runtime.getRuntime.maxMemory
  val memoryPercent = (memoryUsed.toDouble / memoryMax) * 100
  
  if (memoryPercent > 80) {
    println(s"Warning: Memory usage at ${memoryPercent.toInt}%")
    // Reduce batch size or increase memory
  }
  
  rdd
}

Backpressure Issues: When input rate exceeds processing capacity

// Enable backpressure (Spark 1.5+)
conf.set("spark.streaming.backpressure.enabled", "true")
conf.set("spark.streaming.backpressure.initialRate", "1000")

// Manual rate limiting
conf.set("spark.streaming.kafka.maxRatePerPartition", "1000")

Network and Connectivity Errors

Socket Connection Failures:

// Implement retry logic for socket connections
/**
 * Creates a socket text stream, retrying when creation fails with a ConnectException.
 *
 * NOTE(review): socketTextStream appears to only declare the input stream; the actual
 * connection is presumably opened by the receiver after ssc.start(). Confirm that the
 * ConnectException branch is reachable before relying on this retry loop.
 *
 * @param hostname   host to connect to
 * @param port       TCP port to connect to
 * @param maxRetries maximum number of creation attempts
 * @return the created stream
 * @throws RuntimeException if no attempt succeeded (including when maxRetries <= 0)
 */
def createReliableSocketStream(hostname: String, port: Int, maxRetries: Int = 3): DStream[String] = {
  import scala.annotation.tailrec

  // attempt is 1-based; recursion replaces the mutable counter and null sentinel
  @tailrec
  def attemptConnect(attempt: Int): DStream[String] = {
    if (attempt > maxRetries) {
      throw new RuntimeException(s"Failed to connect after $maxRetries attempts")
    }

    val created: Option[DStream[String]] =
      try {
        val stream = ssc.socketTextStream(hostname, port)
        println(s"Connected to $hostname:$port")
        Some(stream)
      } catch {
        case e: ConnectException =>
          println(s"Connection attempt $attempt failed: ${e.getMessage}")
          None
      }

    created match {
      case Some(stream) => stream
      case None =>
        if (attempt < maxRetries) {
          Thread.sleep(5000) // Wait 5 seconds before retry
        }
        attemptConnect(attempt + 1)
    }
  }

  attemptConnect(1)
}

Kafka Connection Issues:

// Handle Kafka metadata refresh failures
val kafkaParams = Map[String, String](
  "metadata.broker.list" -> "broker1:9092,broker2:9092",
  "auto.offset.reset" -> "smallest",
  "refresh.leader.backoff.ms" -> "1000",
  "socket.timeout.ms" -> "30000",
  "fetch.message.max.bytes" -> "1048576"
)

try {
  val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topics
  )
} catch {
  case e: TimeoutException =>
    println("Kafka connection timeout - check broker availability")
  case e: Exception =>
    println(s"Kafka stream creation failed: ${e.getMessage}")
}

Processing Errors and Recovery

Exception Handling in Transformations:

// flatMap over an Option drops failed records directly, with no null sentinel to filter out.
// Option(...) also turns a null result from processRecord into None, as the null filter did.
val robustStream = stream.flatMap { record =>
  try {
    Option(processRecord(record))
  } catch {
    case e: NumberFormatException =>
      println(s"Invalid number format in record: $record")
      None // or Some(defaultValue)
    case e: Exception =>
      println(s"Processing error for record $record: ${e.getMessage}")
      None
  }
}

Dead Letter Queue Pattern:

// Attempt every record once and keep the outcome, so both derived streams share the work.
// Each element is (Some(result), None) on success or (None, Some((record, message))) on failure.
val processed = stream.map { record =>
  try {
    (Some(processRecord(record)), None)
  } catch {
    case e: Exception =>
      (None, Some((record, e.getMessage)))
  }
}.cache() // Cache to avoid recomputation

// A transformation returns a single DStream, so the two sides are derived separately
val successStream = processed.flatMap { case (result, _) => result }
val errorStream = processed.flatMap { case (_, failure) => failure }

// Save errors to dead letter queue (an output operation, so it runs once per batch)
errorStream.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    partition.foreach { case (record, error) =>
      saveToDeadLetterQueue(record, error)
    }
  }
}

Best Practices for Error Handling

  1. Enable Checkpointing: Always use checkpointing for production applications
  2. Monitor Batch Processing Times: Ensure processing time < batch interval
  3. Implement Circuit Breakers: Fail fast when external services are down
  4. Use Write-Ahead Logs: Enable WAL for reliable receivers
  5. Handle Partial Failures: Process what you can, log what fails
  6. Set Up Monitoring: Use Spark UI and external monitoring tools
// Comprehensive error handling pattern
/**
 * Builds a StreamingContext with checkpointing, write-ahead logs and backpressure
 * enabled, wires a socket source through parsing and null filtering, registers two
 * outputs (print and text files) and attaches a monitoring listener.
 *
 * @return the configured context; the caller is responsible for calling start()
 */
def createRobustStreamingApp(): StreamingContext = {
  // Enable fault tolerance features.
  // These must be set BEFORE the context is created: settings applied to conf
  // after construction are not picked up by the running context.
  conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
  conf.set("spark.streaming.backpressure.enabled", "true")

  val ssc = new StreamingContext(conf, Seconds(1))
  ssc.checkpoint("hdfs://checkpoints")

  val stream = ssc.socketTextStream("localhost", 9999)
    // assumes parseRecord returns an Option; flatMap keeps only the defined results
    .flatMap(line => parseRecord(line))
    .handleErrors()
    .cache()

  // Multiple outputs for different purposes
  stream.print()
  stream.saveAsTextFiles("hdfs://output/data")

  // Add monitoring
  ssc.addStreamingListener(new CustomStreamingListener())

  ssc
}

/**
 * Adds `handleErrors()` to any DStream.
 *
 * The original body called `handlePartitionErrors()`, which is not an RDD method, and went
 * through `transform`, which needs a ClassTag for T that is not available here. Filtering
 * the DStream directly has the intended effect and compiles for any T.
 */
implicit class RobustDStream[T](dstream: DStream[T]) {
  /** Drops null records from every batch. */
  def handleErrors(): DStream[T] = dstream.filter(_ != null)
}

This comprehensive guide covers the complete Spark Streaming API along with robust error handling patterns for building scalable, fault-tolerant stream processing applications.

Install with Tessl CLI

npx tessl i tessl/maven-apache-spark

docs

caching-persistence.md

core-rdd.md

data-sources.md

graphx.md

index.md

java-api.md

key-value-operations.md

mllib.md

python-api.md

spark-context.md

sql.md

streaming.md

tile.json