Lightning-fast unified analytics engine for large-scale data processing with high-level APIs in Scala, Java, Python, and R
—
Spark Streaming enables scalable, high-throughput, fault-tolerant stream processing of live data streams. It ingests data from sources like Kafka, Flume, Twitter, and TCP sockets, and processes it using high-level functions like map, reduce, join, and window operations.
Spark Streaming discretizes live data streams into micro-batches called DStreams (Discretized Streams). Each batch is processed as an RDD, enabling the use of Spark's batch processing APIs on streaming data.
The main entry point for Spark Streaming functionality.
class StreamingContext(sparkContext: SparkContext, batchDuration: Duration) extends Logging {
// Alternative constructors
def this(conf: SparkConf, batchDuration: Duration)
def this(master: String, appName: String, batchDuration: Duration, sparkHome: String = null, jars: Seq[String] = Nil, environment: Map[String, String] = Map())
def this(path: String, hadoopConf: Configuration = new Configuration())
}
import org.apache.spark.streaming.{StreamingContext, Seconds, Minutes}
import org.apache.spark.{SparkContext, SparkConf}
// From existing SparkContext
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(1))
// From SparkConf
val conf = new SparkConf().setAppName("StreamingApp").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(2))
// With all parameters
val ssc = new StreamingContext(
master = "local[*]",
appName = "My Streaming App",
batchDuration = Seconds(1),
sparkHome = "/path/to/spark",
jars = Seq("app.jar"),
environment = Map("ENV_VAR" -> "value")
)
// From checkpoint (recovery)
val ssc = new StreamingContext("hdfs://path/to/checkpoint", new Configuration())
import org.apache.spark.streaming.{Milliseconds, Seconds, Minutes}
Milliseconds(500) // 500 milliseconds
Seconds(1) // 1 second
Seconds(30) // 30 seconds
Minutes(1) // 1 minute
socketTextStream: Create DStream from TCP socket
def socketTextStream(hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[String]
// Connect to TCP socket for text data
val lines = ssc.socketTextStream("localhost", 9999)
// Process the stream
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
wordCounts.print()
socketStream: Custom socket stream with converter
def socketStream[T: ClassTag](hostname: String, port: Int, converter: (InputStream) => Iterator[T], storageLevel: StorageLevel): ReceiverInputDStream[T]
rawSocketStream: Raw socket stream returning byte arrays
def rawSocketStream[T: ClassTag](hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[T]
textFileStream: Monitor directory for new text files
def textFileStream(directory: String): DStream[String]
// Monitor directory for new files
val fileStream = ssc.textFileStream("hdfs://path/to/directory")
// Process new files as they arrive
val processed = fileStream
.filter(_.nonEmpty)
.map(_.toUpperCase)
processed.print()
fileStream: Generic file stream with InputFormat
def fileStream[K, V, F <: NewInputFormat[K, V]: ClassTag](directory: String, filter: Path => Boolean = _ => true, newFilesOnly: Boolean = true): InputDStream[(K, V)]
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}
val hadoopStream = ssc.fileStream[LongWritable, Text, TextInputFormat]("hdfs://input/dir")
val textStream = hadoopStream.map(_._2.toString)
queueStream: Create stream from queue of RDDs
def queueStream[T: ClassTag](queue: Queue[RDD[T]], oneAtATime: Boolean = true, defaultRDD: RDD[T] = null): InputDStream[T]
import scala.collection.mutable.Queue
val rddQueue = Queue[RDD[Int]]()
// Create stream from queue
val queueStream = ssc.queueStream(rddQueue)
// Add RDDs to queue (simulate data arrival)
for (i <- 1 to 10) {
rddQueue += ssc.sparkContext.parallelize(1 to 100)
}
receiverStream: Create stream from custom receiver
def receiverStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T]
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.storage.StorageLevel
// Custom receiver example
class CustomReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
def onStart() {
// Start receiving data
new Thread("Custom Receiver") {
override def run() { receive() }
}.start()
}
def onStop() {
// Stop receiving data
}
private def receive() {
while (!isStopped()) {
// Simulate data reception
val data = generateData()
store(data)
Thread.sleep(100)
}
}
}
val customStream = ssc.receiverStream(new CustomReceiver())
actorStream: Create stream from Akka Actor
def actorStream[T: ClassTag](props: Props, name: String, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[T]
DStreams support transformations similar to RDDs, applied to each batch.
abstract class DStream[T: ClassTag] extends Serializable with Logging {
def ssc: StreamingContext
def slideDuration: Duration
def dependencies: List[DStream[_]]
def compute(time: Time): Option[RDD[T]]
}
map: Apply function to each element in each batch
def map[U: ClassTag](mapFunc: T => U): DStream[U]
val numbers = ssc.socketTextStream("localhost", 9999)
val doubled = numbers.map(_.toInt * 2)
flatMap: Apply function and flatten results
def flatMap[U: ClassTag](flatMapFunc: T => Traversable[U]): DStream[U]
val lines = ssc.textFileStream("input/")
val words = lines.flatMap(_.split(" "))
filter: Keep elements matching predicate
def filter(filterFunc: T => Boolean): DStream[T]
val validLines = lines.filter(_.nonEmpty)
val longWords = words.filter(_.length > 5)
glom: Coalesce elements within each partition into arrays
def glom(): DStream[Array[T]]
union: Union with another DStream
def union(that: DStream[T]): DStream[T]
val stream1 = ssc.socketTextStream("host1", 9999)
val stream2 = ssc.socketTextStream("host2", 9999)
val combined = stream1.union(stream2)
count: Count elements in each batch
def count(): DStream[Long]
countByValue: Count occurrences of each value
def countByValue()(implicit ord: Ordering[T] = null): DStream[(T, Long)]
reduce: Reduce elements in each batch
def reduce(reduceFunc: (T, T) => T): DStream[T]
val numbers = ssc.socketTextStream("localhost", 9999).map(_.toInt)
val counts = numbers.count() // Count per batch
val sums = numbers.reduce(_ + _) // Sum per batch
val maxValues = numbers.reduce(math.max) // Max per batch
transform: Apply arbitrary RDD-to-RDD function
def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U]
def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U]
val enhanced = stream.transform { (rdd, time) =>
// Access to both RDD and batch time
val timeString = time.toString
rdd.map(data => s"$timeString: $data")
.filter(_.contains("important"))
}
transformWith: Transform with another DStream
def transformWith[U: ClassTag, V: ClassTag](other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V]): DStream[V]
def transformWith[U: ClassTag, V: ClassTag](other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V]): DStream[V]
val stream1 = ssc.socketTextStream("localhost", 9999)
val stream2 = ssc.socketTextStream("localhost", 8888)
val joined = stream1.transformWith(stream2) { (rdd1, rdd2) =>
// Join RDDs from different streams
val pairs1 = rdd1.map(line => (extractKey(line), line))
val pairs2 = rdd2.map(line => (extractKey(line), line))
pairs1.join(pairs2).map { case (key, (v1, v2)) => s"$v1 | $v2" }
}
Window operations allow you to apply transformations over a sliding window of data.
window: Return windowed DStream
def window(windowDuration: Duration): DStream[T]
def window(windowDuration: Duration, slideDuration: Duration): DStream[T]
val lines = ssc.socketTextStream("localhost", 9999)
// 30-second window, sliding every 10 seconds
val windowedLines = lines.window(Seconds(30), Seconds(10))
val windowCounts = windowedLines.count()
reduceByWindow: Reduce over a window
def reduceByWindow(reduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration): DStream[T]
def reduceByWindow(reduceFunc: (T, T) => T, invReduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration): DStream[T]
val numbers = ssc.socketTextStream("localhost", 9999).map(_.toInt)
// Sum over window
val windowSums = numbers.reduceByWindow(
_ + _, // Add new values
Seconds(60), // Window duration
Seconds(20) // Slide duration
)
// Efficient windowed reduction with inverse function
val efficientSums = numbers.reduceByWindow(
_ + _, // Add function
_ - _, // Inverse (subtract) function
Seconds(60),
Seconds(20)
)
countByWindow: Count elements over window
def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long]
countByValueAndWindow: Count values over window
def countByValueAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int = ssc.sc.defaultParallelism)(implicit ord: Ordering[T] = null): DStream[(T, Long)]
val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))
// Count words in 2-minute window, sliding every 30 seconds
val wordCounts = words.countByValueAndWindow(Minutes(2), Seconds(30))
wordCounts.print()
Operations available on DStreams of (key, value) pairs through implicit conversion.
keys and values:
def keys: DStream[K]
def values: DStream[V]
mapValues: Transform values while preserving keys
def mapValues[U: ClassTag](mapValuesFunc: V => U): DStream[(K, U)]
flatMapValues: FlatMap values while preserving keys
def flatMapValues[U: ClassTag](flatMapValuesFunc: V => TraversableOnce[U]): DStream[(K, U)]
import org.apache.spark.streaming.StreamingContext._
val pairs = ssc.socketTextStream("localhost", 9999)
.map(line => {
val parts = line.split(",")
(parts(0), parts(1).toInt)
})
val doubled = pairs.mapValues(_ * 2)
val allKeys = pairs.keys
val allValues = pairs.values
groupByKey: Group values by key in each batch
def groupByKey(): DStream[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): DStream[(K, Iterable[V])]
def groupByKey(numPartitions: Int): DStream[(K, Iterable[V])]
reduceByKey: Reduce values by key in each batch
def reduceByKey(func: (V, V) => V): DStream[(K, V)]
def reduceByKey(func: (V, V) => V, partitioner: Partitioner): DStream[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): DStream[(K, V)]
combineByKey: Generic combine by key
def combineByKey[C: ClassTag](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiner: (C, C) => C, partitioner: Partitioner): DStream[(K, C)]
val wordStream = ssc.socketTextStream("localhost", 9999)
.flatMap(_.split(" "))
.map(word => (word, 1))
// Count words in each batch
val wordCounts = wordStream.reduceByKey(_ + _)
// Group all occurrences
val wordGroups = wordStream.groupByKey()
groupByKeyAndWindow: Group by key over window
def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Iterable[V])]
def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Iterable[V])]
def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, partitioner: Partitioner): DStream[(K, Iterable[V])]
reduceByKeyAndWindow: Reduce by key over window
def reduceByKeyAndWindow(func: (V, V) => V, windowDuration: Duration): DStream[(K, V)]
def reduceByKeyAndWindow(func: (V, V) => V, windowDuration: Duration, slideDuration: Duration): DStream[(K, V)]
def reduceByKeyAndWindow(func: (V, V) => V, invFunc: (V, V) => V, windowDuration: Duration, slideDuration: Duration): DStream[(K, V)]
countByKeyAndWindow: Count by key over window
def countByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Long)]
val wordPairs = ssc.socketTextStream("localhost", 9999)
.flatMap(_.split(" "))
.map((_, 1))
// Windowed word count (last 5 minutes, every 30 seconds)
val windowedWordCounts = wordPairs.reduceByKeyAndWindow(
_ + _, // Reduce function
Minutes(5), // Window duration
Seconds(30) // Slide duration
)
// Efficient version with inverse function
val efficientWordCounts = wordPairs.reduceByKeyAndWindow(
_ + _, // Add function
_ - _, // Subtract function (inverse)
Minutes(5),
Seconds(30)
)
join: Join with another DStream
def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))]
def join[W: ClassTag](other: DStream[(K, W)], partitioner: Partitioner): DStream[(K, (V, W))]
leftOuterJoin, rightOuterJoin, fullOuterJoin:
def leftOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))]
def rightOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], W))]
def fullOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], Option[W]))]
cogroup: Group together with another DStream
def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))]
val stream1 = ssc.socketTextStream("localhost", 9999)
.map(line => (line.split(",")(0), line.split(",")(1)))
val stream2 = ssc.socketTextStream("localhost", 8888)
.map(line => (line.split(",")(0), line.split(",")(1)))
// Inner join
val joined = stream1.join(stream2)
// Left outer join
val leftJoined = stream1.leftOuterJoin(stream2)
// Cogroup
val cogrouped = stream1.cogroup(stream2)
Maintain state across batches for each key:
def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)]
def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S], partitioner: Partitioner): DStream[(K, S)]
def updateStateByKey[S: ClassTag](updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)], partitioner: Partitioner, rememberPartitioner: Boolean): DStream[(K, S)]
// Running count of words
val wordPairs = ssc.socketTextStream("localhost", 9999)
.flatMap(_.split(" "))
.map((_, 1))
val runningCounts = wordPairs.updateStateByKey { (values: Seq[Int], state: Option[Int]) =>
val newCount = values.sum + state.getOrElse(0)
Some(newCount)
}
// Advanced state management
case class WordStats(count: Int, lastSeen: Long)
val wordStats = wordPairs.updateStateByKey { (values: Seq[Int], state: Option[WordStats]) =>
val currentTime = System.currentTimeMillis()
val currentCount = values.sum
state match {
case Some(stats) => Some(WordStats(stats.count + currentCount, currentTime))
case None => Some(WordStats(currentCount, currentTime))
}
}
Actions trigger the execution of DStream transformations.
print: Print first 10 elements of each batch
def print(): Unit
def print(num: Int): Unit
foreachRDD: Apply function to each RDD
def foreachRDD(foreachFunc: RDD[T] => Unit): Unit
def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit
val processed = stream.map(process)
// Print results
processed.print()
processed.print(20) // Print first 20 elements
// Custom processing of each batch
processed.foreachRDD { rdd =>
val count = rdd.count()
if (count > 0) {
println(s"Batch size: $count")
rdd.take(10).foreach(println)
}
}
// With time information
processed.foreachRDD { (rdd, time) =>
println(s"Batch time: $time, Count: ${rdd.count()}")
}
saveAsTextFiles: Save each batch as text files
def saveAsTextFiles(prefix: String, suffix: String = ""): Unit
saveAsObjectFiles: Save each batch as object files
def saveAsObjectFiles(prefix: String, suffix: String = ""): Unit
val processed = stream.map(_.toUpperCase)
// Save each batch - creates files like output-1414060920000, output-1414060921000, etc.
processed.saveAsTextFiles("hdfs://path/to/output", ".txt")
// Save as object files
processed.saveAsObjectFiles("hdfs://path/to/objects")
start: Start the streaming computation
def start(): Unit
stop: Stop the streaming context
def stop(): Unit
def stop(stopSparkContext: Boolean): Unit
def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit
awaitTermination: Wait for termination
def awaitTermination(): Unit
def awaitTermination(timeout: Long): Boolean
val ssc = new StreamingContext(conf, Seconds(1))
// Define streaming computation
val stream = ssc.socketTextStream("localhost", 9999)
stream.print()
// Start the computation
ssc.start()
// Wait for termination
ssc.awaitTermination()
// Or wait with timeout
val terminated = ssc.awaitTermination(60000) // 60 seconds
if (!terminated) {
println("Streaming did not terminate within 60 seconds")
ssc.stop()
}
checkpoint: Set checkpoint directory
def checkpoint(directory: String): Unit
getOrCreate: Get existing context from checkpoint or create new one
def getOrCreate(checkpointPath: String, creatingFunc: () => StreamingContext, hadoopConf: Configuration = new Configuration()): StreamingContext
// Enable checkpointing
ssc.checkpoint("hdfs://path/to/checkpoints")
// Fault-tolerant pattern
def createStreamingContext(): StreamingContext = {
val ssc = new StreamingContext(conf, Seconds(1))
// Define streaming computation
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map((_, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.checkpoint("hdfs://checkpoints")
ssc
}
val ssc = StreamingContext.getOrCreate("hdfs://checkpoints", createStreamingContext _)
remember: Set remember duration
def remember(duration: Duration): Unit
sparkContext: Access underlying SparkContext
def sparkContext: SparkContext
// Set how long to remember RDDs
ssc.remember(Minutes(10))
// Access SparkContext
val sc = ssc.sparkContext
val broadcast = sc.broadcast(lookupTable)
DStreams can be persisted in memory for faster access:
def persist(storageLevel: StorageLevel): DStream[T]
def persist(): DStream[T] // Uses MEMORY_ONLY_SER
def cache(): DStream[T] // Uses MEMORY_ONLY_SER
import org.apache.spark.storage.StorageLevel
val expensiveStream = ssc.socketTextStream("localhost", 9999)
.map(expensiveTransformation)
.cache() // Cache for reuse
// Multiple operations on cached stream
val count = expensiveStream.count()
val sample = expensiveStream.sample(false, 0.1)
// For low latency (100ms - 1s)
val ssc = new StreamingContext(conf, Milliseconds(500))
// For high throughput (1s - 10s)
val ssc = new StreamingContext(conf, Seconds(5))
// For batch processing style (minutes)
val ssc = new StreamingContext(conf, Minutes(2))
// Increase parallelism for receivers
val numReceivers = 4
val streams = (1 to numReceivers).map { i =>
ssc.socketTextStream(s"host$i", 9999)
}
val unifiedStream = ssc.union(streams)
// Repartition for better load balancing
val repartitioned = stream.transform(_.repartition(10))
// Set appropriate storage levels
val persistedStream = stream
.map(expensiveOperation)
.persist(StorageLevel.MEMORY_AND_DISK_SER)
// Enable checkpointing for fault tolerance
ssc.checkpoint("hdfs://checkpoints")
// Use efficient serialization
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
Spark Streaming applications must handle various failure scenarios to ensure reliable operation.
StreamingContextException: Invalid streaming context operations
try {
ssc.start()
ssc.start() // Error: context already started
} catch {
case e: IllegalStateException =>
println("Streaming context already started")
}
Receiver Failures: Input stream receivers failing
// Monitor receiver status
ssc.addStreamingListener(new StreamingListener {
override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {
println(s"Receiver error: ${receiverError.receiverInfo.name}")
// Implement recovery logic
}
})
Batch Processing Delays: When processing takes longer than batch interval
// Monitor batch processing times
ssc.addStreamingListener(new StreamingListener {
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
val processingTime = batchCompleted.batchInfo.processingDelay.getOrElse(0L)
val batchInterval = ssc.graph.batchDuration.milliseconds
if (processingTime > batchInterval) {
println(s"Warning: Processing time ($processingTime ms) > batch interval ($batchInterval ms)")
}
}
})
Checkpoint Recovery Failures: When checkpoint data is corrupted
def createStreamingContext(): StreamingContext = {
val ssc = new StreamingContext(conf, Seconds(1))
// Define streaming logic
ssc.checkpoint("hdfs://checkpoints")
ssc
}
try {
val ssc = StreamingContext.getOrCreate("hdfs://checkpoints", createStreamingContext _)
} catch {
case e: Exception =>
println(s"Checkpoint recovery failed: ${e.getMessage}")
// Fall back to creating new context
val ssc = createStreamingContext()
}
Checkpoint Directory Management:
// Clean up old checkpoints periodically
import java.io.File
import org.apache.hadoop.fs.{FileSystem, Path}
def cleanupCheckpoints(checkpointDir: String, retentionHours: Int): Unit = {
val fs = FileSystem.get(ssc.sparkContext.hadoopConfiguration)
val checkpointPath = new Path(checkpointDir)
try {
val cutoffTime = System.currentTimeMillis() - (retentionHours * 60 * 60 * 1000)
val files = fs.listStatus(checkpointPath)
files.foreach { fileStatus =>
if (fileStatus.getModificationTime < cutoffTime) {
fs.delete(fileStatus.getPath, true)
println(s"Deleted old checkpoint: ${fileStatus.getPath}")
}
}
} catch {
case e: Exception => println(s"Checkpoint cleanup failed: ${e.getMessage}")
}
}
OutOfMemoryError in Streaming:
// Monitor memory usage and adjust batch sizes
val memoryMonitoringStream = stream.transform { rdd =>
val memoryUsed = Runtime.getRuntime.totalMemory - Runtime.getRuntime.freeMemory
val memoryMax = Runtime.getRuntime.maxMemory
val memoryPercent = (memoryUsed.toDouble / memoryMax) * 100
if (memoryPercent > 80) {
println(s"Warning: Memory usage at ${memoryPercent.toInt}%")
// Reduce batch size or increase memory
}
rdd
}
Backpressure Issues: When input rate exceeds processing capacity
// Enable backpressure (Spark 1.5+)
conf.set("spark.streaming.backpressure.enabled", "true")
conf.set("spark.streaming.backpressure.initialRate", "1000")
// Manual rate limiting
conf.set("spark.streaming.kafka.maxRatePerPartition", "1000")
Socket Connection Failures:
// Implement retry logic for socket connections
def createReliableSocketStream(hostname: String, port: Int, maxRetries: Int = 3): DStream[String] = {
var attempts = 0
var stream: DStream[String] = null
while (attempts < maxRetries && stream == null) {
try {
stream = ssc.socketTextStream(hostname, port)
println(s"Connected to $hostname:$port")
} catch {
case e: ConnectException =>
attempts += 1
println(s"Connection attempt $attempts failed: ${e.getMessage}")
if (attempts < maxRetries) {
Thread.sleep(5000) // Wait 5 seconds before retry
}
}
}
if (stream == null) {
throw new RuntimeException(s"Failed to connect after $maxRetries attempts")
}
stream
}
Kafka Connection Issues:
// Handle Kafka metadata refresh failures
val kafkaParams = Map[String, String](
"metadata.broker.list" -> "broker1:9092,broker2:9092",
"auto.offset.reset" -> "smallest",
"refresh.leader.backoff.ms" -> "1000",
"socket.timeout.ms" -> "30000",
"fetch.message.max.bytes" -> "1048576"
)
try {
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics
)
} catch {
case e: TimeoutException =>
println("Kafka connection timeout - check broker availability")
case e: Exception =>
println(s"Kafka stream creation failed: ${e.getMessage}")
}
Exception Handling in Transformations:
val robustStream = stream.map { record =>
try {
processRecord(record)
} catch {
case e: NumberFormatException =>
println(s"Invalid number format in record: $record")
null // or default value
case e: Exception =>
println(s"Processing error for record $record: ${e.getMessage}")
null
}
}.filter(_ != null) // Remove failed records
Dead Letter Queue Pattern:
val (successStream, errorStream) = stream.transform { rdd =>
val processed = rdd.map { record =>
try {
(Some(processRecord(record)), None)
} catch {
case e: Exception =>
(None, Some((record, e.getMessage)))
}
}.cache() // Cache to avoid recomputation
val successes = processed.filter(_._1.isDefined).map(_._1.get)
val errors = processed.filter(_._2.isDefined).map(_._2.get)
// Save errors to dead letter queue
errors.foreachPartition { partition =>
partition.foreach { case (record, error) =>
saveToDeadLetterQueue(record, error)
}
}
successes
}
// Comprehensive error handling pattern
def createRobustStreamingApp(): StreamingContext = {
val ssc = new StreamingContext(conf, Seconds(1))
// Enable fault tolerance features
ssc.checkpoint("hdfs://checkpoints")
conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
conf.set("spark.streaming.backpressure.enabled", "true")
val stream = ssc.socketTextStream("localhost", 9999)
.map(parseRecord)
.filter(_.isDefined)
.map(_.get)
.handleErrors()
.cache()
// Multiple outputs for different purposes
stream.print()
stream.saveAsTextFiles("hdfs://output/data")
// Add monitoring
ssc.addStreamingListener(new CustomStreamingListener())
ssc
}
implicit class RobustDStream[T](dstream: DStream[T]) {
def handleErrors(): DStream[T] = {
dstream.transform { rdd =>
rdd.filter(_ != null).handlePartitionErrors()
}
}
}
This comprehensive guide covers the complete Spark Streaming API along with robust error handling patterns for building scalable, fault-tolerant stream processing applications.
Install with Tessl CLI
npx tessl i tessl/maven-apache-spark