maven-apache-spark

- Describes: maven/org.apache.spark/spark-parent
- Description: Lightning-fast unified analytics engine for large-scale data processing with high-level APIs in Scala, Java, Python, and R
- Author: tessl
- Spec files: docs/streaming.md

# Spark Streaming

Spark Streaming enables scalable, high-throughput, fault-tolerant stream processing of live data streams. It ingests data from sources such as Kafka, Flume, Twitter, and TCP sockets, and processes it with high-level functions such as map, reduce, join, and window operations.

## Core Concepts

Spark Streaming discretizes a live data stream into micro-batches represented as a **DStream** (Discretized Stream). A DStream is a continuous sequence of RDDs, one per batch interval, so Spark's batch processing APIs can be applied directly to streaming data.

## StreamingContext

The main entry point for Spark Streaming functionality.

### StreamingContext Class

```scala { .api }
class StreamingContext(sparkContext: SparkContext, batchDuration: Duration) extends Logging {
  // Alternative constructors
  def this(conf: SparkConf, batchDuration: Duration)
  def this(master: String, appName: String, batchDuration: Duration, sparkHome: String = null, jars: Seq[String] = Nil, environment: Map[String, String] = Map())
  def this(path: String, hadoopConf: Configuration = new Configuration())
}
```

### Creating StreamingContext

```scala
import org.apache.spark.streaming.{StreamingContext, Seconds, Minutes}
import org.apache.spark.{SparkContext, SparkConf}

// From SparkConf
val conf = new SparkConf().setAppName("StreamingApp").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(2))

// From an existing SparkContext
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(1))

// With all parameters
val ssc = new StreamingContext(
  master = "local[*]",
  appName = "My Streaming App",
  batchDuration = Seconds(1),
  sparkHome = "/path/to/spark",
  jars = Seq("app.jar"),
  environment = Map("ENV_VAR" -> "value")
)

// From checkpoint (recovery)
val ssc = new StreamingContext("hdfs://path/to/checkpoint", new Configuration())
```

### Duration Helper Objects

```scala { .api }
import org.apache.spark.streaming.{Milliseconds, Seconds, Minutes}

Milliseconds(500)  // 500 milliseconds
Seconds(1)         // 1 second
Seconds(30)        // 30 seconds
Minutes(1)         // 1 minute
```

## Input DStream Creation

### Socket Streams

**socketTextStream**: Create DStream from TCP socket
```scala { .api }
def socketTextStream(hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[String]
```

```scala
// Connect to TCP socket for text data
val lines = ssc.socketTextStream("localhost", 9999)

// Process the stream
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
wordCounts.print()
```

**socketStream**: Custom socket stream with converter
```scala { .api }
def socketStream[T: ClassTag](hostname: String, port: Int, converter: (InputStream) => Iterator[T], storageLevel: StorageLevel): ReceiverInputDStream[T]
```

**rawSocketStream**: Raw socket stream returning byte arrays
```scala { .api }
def rawSocketStream[T: ClassTag](hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[T]
```
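Neither `socketStream` nor `rawSocketStream` is shown in use above. A minimal sketch of `socketStream` with a custom converter, assuming a UTF-8 line-oriented protocol (the `bytesToLines` helper is illustrative, not part of the API):

```scala
import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets
import org.apache.spark.storage.StorageLevel

// Illustrative converter: turn the raw socket InputStream into an iterator of lines
def bytesToLines(input: InputStream): Iterator[String] = {
  val reader = new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8))
  Iterator.continually(reader.readLine()).takeWhile(_ != null)
}

val customLines = ssc.socketStream(
  "localhost",
  9999,
  bytesToLines,
  StorageLevel.MEMORY_AND_DISK_SER_2
)
```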
### File Streams

**textFileStream**: Monitor directory for new text files
```scala { .api }
def textFileStream(directory: String): DStream[String]
```

```scala
// Monitor directory for new files
val fileStream = ssc.textFileStream("hdfs://path/to/directory")

// Process new files as they arrive
val processed = fileStream
  .filter(_.nonEmpty)
  .map(_.toUpperCase)

processed.print()
```

**fileStream**: Generic file stream with InputFormat
```scala { .api }
def fileStream[K, V, F <: NewInputFormat[K, V]: ClassTag](directory: String, filter: Path => Boolean = _ => true, newFilesOnly: Boolean = true): InputDStream[(K, V)]
```

```scala
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}

val hadoopStream = ssc.fileStream[LongWritable, Text, TextInputFormat]("hdfs://input/dir")
val textStream = hadoopStream.map(_._2.toString)
```

### Queue Streams (for testing)

**queueStream**: Create stream from queue of RDDs
```scala { .api }
def queueStream[T: ClassTag](queue: Queue[RDD[T]], oneAtATime: Boolean = true, defaultRDD: RDD[T] = null): InputDStream[T]
```

```scala
import scala.collection.mutable.Queue

val rddQueue = Queue[RDD[Int]]()

// Create stream from queue
val queueStream = ssc.queueStream(rddQueue)

// Add RDDs to queue (simulate data arrival)
for (i <- 1 to 10) {
  rddQueue += ssc.sparkContext.parallelize(1 to 100)
}
```

### Custom Receiver Streams

**receiverStream**: Create stream from custom receiver
```scala { .api }
def receiverStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T]
```

```scala
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.storage.StorageLevel

// Custom receiver example
class CustomReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
  def onStart() {
    // Start receiving data
    new Thread("Custom Receiver") {
      override def run() { receive() }
    }.start()
  }

  def onStop() {
    // Stop receiving data
  }

  private def receive() {
    while (!isStopped()) {
      // Simulate data reception; generateData() is an application-specific helper
      val data = generateData()
      store(data)
      Thread.sleep(100)
    }
  }
}

val customStream = ssc.receiverStream(new CustomReceiver())
```

**actorStream**: Create stream from Akka Actor
```scala { .api }
def actorStream[T: ClassTag](props: Props, name: String, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[T]
```

## DStream Transformations

DStreams support transformations similar to RDDs, applied to each batch.

### DStream Class

```scala { .api }
abstract class DStream[T: ClassTag] extends Serializable with Logging {
  def ssc: StreamingContext
  def slideDuration: Duration
  def dependencies: List[DStream[_]]
  def compute(time: Time): Option[RDD[T]]
}
```

### Basic Transformations

**map**: Apply function to each element in each batch
```scala { .api }
def map[U: ClassTag](mapFunc: T => U): DStream[U]
```

```scala
val numbers = ssc.socketTextStream("localhost", 9999)
val doubled = numbers.map(_.toInt * 2)
```

**flatMap**: Apply function and flatten results
```scala { .api }
def flatMap[U: ClassTag](flatMapFunc: T => Traversable[U]): DStream[U]
```

```scala
val lines = ssc.textFileStream("input/")
val words = lines.flatMap(_.split(" "))
```

**filter**: Keep elements matching predicate
```scala { .api }
def filter(filterFunc: T => Boolean): DStream[T]
```

```scala
val validLines = lines.filter(_.nonEmpty)
val longWords = words.filter(_.length > 5)
```

**glom**: Coalesce elements within each partition into arrays
```scala { .api }
def glom(): DStream[Array[T]]
```
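`glom` has no example above; a minimal sketch that reports per-partition element counts for each batch, reusing the `words` stream defined earlier:

```scala
// DStream[Array[String]] -> DStream[Int]: one count per partition per batch
val partitionSizes = words.glom().map(_.length)
partitionSizes.print()
```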
### Stream Operations

**union**: Union with another DStream
```scala { .api }
def union(that: DStream[T]): DStream[T]
```

```scala
val stream1 = ssc.socketTextStream("host1", 9999)
val stream2 = ssc.socketTextStream("host2", 9999)
val combined = stream1.union(stream2)
```

### Aggregation Transformations

**count**: Count elements in each batch
```scala { .api }
def count(): DStream[Long]
```

**countByValue**: Count occurrences of each value
```scala { .api }
def countByValue()(implicit ord: Ordering[T] = null): DStream[(T, Long)]
```

**reduce**: Reduce elements in each batch
```scala { .api }
def reduce(reduceFunc: (T, T) => T): DStream[T]
```

```scala
val numbers = ssc.socketTextStream("localhost", 9999).map(_.toInt)

val counts = numbers.count()              // Count per batch
val sums = numbers.reduce(_ + _)          // Sum per batch
val maxValues = numbers.reduce(math.max)  // Max per batch
```

### Advanced Transformations

**transform**: Apply arbitrary RDD-to-RDD function
```scala { .api }
def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U]
def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U]
```

```scala
val enhanced = stream.transform { (rdd, time) =>
  // Access to both RDD and batch time
  val timeString = time.toString
  rdd.map(data => s"$timeString: $data")
    .filter(_.contains("important"))
}
```

**transformWith**: Transform with another DStream
```scala { .api }
def transformWith[U: ClassTag, V: ClassTag](other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V]): DStream[V]
def transformWith[U: ClassTag, V: ClassTag](other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V]): DStream[V]
```

```scala
import org.apache.spark.rdd.RDD

val stream1 = ssc.socketTextStream("localhost", 9999)
val stream2 = ssc.socketTextStream("localhost", 8888)

val joined = stream1.transformWith(stream2, (rdd1: RDD[String], rdd2: RDD[String]) => {
  // Join RDDs from the two streams; extractKey is an application-specific helper
  val pairs1 = rdd1.map(line => (extractKey(line), line))
  val pairs2 = rdd2.map(line => (extractKey(line), line))
  pairs1.join(pairs2).map { case (key, (v1, v2)) => s"$v1 | $v2" }
})
```

## Window Operations

Window operations apply transformations over a sliding window of data.

### Basic Windowing

**window**: Return windowed DStream
```scala { .api }
def window(windowDuration: Duration): DStream[T]
def window(windowDuration: Duration, slideDuration: Duration): DStream[T]
```

```scala
val lines = ssc.socketTextStream("localhost", 9999)

// 30-second window, sliding every 10 seconds
val windowedLines = lines.window(Seconds(30), Seconds(10))
val windowCounts = windowedLines.count()
```

### Windowed Reductions

**reduceByWindow**: Reduce over a window
```scala { .api }
def reduceByWindow(reduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration): DStream[T]
def reduceByWindow(reduceFunc: (T, T) => T, invReduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration): DStream[T]
```

```scala
val numbers = ssc.socketTextStream("localhost", 9999).map(_.toInt)

// Sum over window
val windowSums = numbers.reduceByWindow(
  _ + _,        // Add new values
  Seconds(60),  // Window duration
  Seconds(20)   // Slide duration
)

// Efficient windowed reduction with inverse function
val efficientSums = numbers.reduceByWindow(
  _ + _,        // Add function
  _ - _,        // Inverse (subtract) function
  Seconds(60),
  Seconds(20)
)
```
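The inverse-function variant incrementally maintains the windowed value across batches instead of recomputing the full window; this keeps intermediate state, which generally requires checkpointing to be enabled (the directory below is a placeholder):

```scala
// Required for the incremental (inverse-function) window reductions
ssc.checkpoint("hdfs://path/to/checkpoints")
```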
**countByWindow**: Count elements over window
```scala { .api }
def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long]
```

**countByValueAndWindow**: Count values over window
```scala { .api }
def countByValueAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int = ssc.sc.defaultParallelism)(implicit ord: Ordering[T] = null): DStream[(T, Long)]
```

```scala
val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))

// Count words in 2-minute window, sliding every 30 seconds
val wordCounts = words.countByValueAndWindow(Minutes(2), Seconds(30))
wordCounts.print()
```

## PairDStreamFunctions (Key-Value Operations)

Operations available on DStreams of (key, value) pairs through implicit conversion.

### Key-Value Transformations

**keys and values**:
```scala { .api }
def keys: DStream[K]
def values: DStream[V]
```

**mapValues**: Transform values while preserving keys
```scala { .api }
def mapValues[U: ClassTag](mapValuesFunc: V => U): DStream[(K, U)]
```

**flatMapValues**: FlatMap values while preserving keys
```scala { .api }
def flatMapValues[U: ClassTag](flatMapValuesFunc: V => TraversableOnce[U]): DStream[(K, U)]
```

```scala
import org.apache.spark.streaming.StreamingContext._

val pairs = ssc.socketTextStream("localhost", 9999)
  .map(line => {
    val parts = line.split(",")
    (parts(0), parts(1).toInt)
  })

val doubled = pairs.mapValues(_ * 2)
val allKeys = pairs.keys
val allValues = pairs.values
```

### Aggregation by Key

**groupByKey**: Group values by key in each batch
```scala { .api }
def groupByKey(): DStream[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): DStream[(K, Iterable[V])]
def groupByKey(numPartitions: Int): DStream[(K, Iterable[V])]
```

**reduceByKey**: Reduce values by key in each batch
```scala { .api }
def reduceByKey(func: (V, V) => V): DStream[(K, V)]
def reduceByKey(func: (V, V) => V, partitioner: Partitioner): DStream[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): DStream[(K, V)]
```

**combineByKey**: Generic combine by key
```scala { .api }
def combineByKey[C: ClassTag](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiner: (C, C) => C, partitioner: Partitioner): DStream[(K, C)]
```

```scala
val wordStream = ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split(" "))
  .map(word => (word, 1))

// Count words in each batch
val wordCounts = wordStream.reduceByKey(_ + _)

// Group all occurrences
val wordGroups = wordStream.groupByKey()
```
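`combineByKey` has no dedicated example above. A sketch that computes a per-key average, reusing the `pairs` stream of `(String, Int)` from the key-value example (the partitioner choice is illustrative):

```scala
import org.apache.spark.HashPartitioner

// Combiner is a (sum, count) pair per key
val sumAndCount = pairs.combineByKey[(Int, Int)](
  value => (value, 1),                                              // createCombiner
  (acc: (Int, Int), value: Int) => (acc._1 + value, acc._2 + 1),    // mergeValue
  (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2),     // mergeCombiner
  new HashPartitioner(ssc.sparkContext.defaultParallelism)
)

val averages = sumAndCount.mapValues { case (sum, count) => sum.toDouble / count }
```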
### Windowed Key-Value Operations

**groupByKeyAndWindow**: Group by key over window
```scala { .api }
def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Iterable[V])]
def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Iterable[V])]
def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, partitioner: Partitioner): DStream[(K, Iterable[V])]
```

**reduceByKeyAndWindow**: Reduce by key over window
```scala { .api }
def reduceByKeyAndWindow(func: (V, V) => V, windowDuration: Duration): DStream[(K, V)]
def reduceByKeyAndWindow(func: (V, V) => V, windowDuration: Duration, slideDuration: Duration): DStream[(K, V)]
def reduceByKeyAndWindow(func: (V, V) => V, invFunc: (V, V) => V, windowDuration: Duration, slideDuration: Duration): DStream[(K, V)]
```

**countByKeyAndWindow**: Count by key over window
```scala { .api }
def countByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Long)]
```

```scala
val wordPairs = ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split(" "))
  .map((_, 1))

// Windowed word count (last 5 minutes, every 30 seconds)
val windowedWordCounts = wordPairs.reduceByKeyAndWindow(
  _ + _,        // Reduce function
  Minutes(5),   // Window duration
  Seconds(30)   // Slide duration
)

// Efficient version with inverse function
val efficientWordCounts = wordPairs.reduceByKeyAndWindow(
  _ + _,   // Add function
  _ - _,   // Subtract function (inverse)
  Minutes(5),
  Seconds(30)
)
```

### Join Operations

**join**: Join with another DStream
```scala { .api }
def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))]
def join[W: ClassTag](other: DStream[(K, W)], partitioner: Partitioner): DStream[(K, (V, W))]
```

**leftOuterJoin**, **rightOuterJoin**, **fullOuterJoin**:
```scala { .api }
def leftOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))]
def rightOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], W))]
def fullOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], Option[W]))]
```

**cogroup**: Group together with another DStream
```scala { .api }
def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))]
```

```scala
val stream1 = ssc.socketTextStream("localhost", 9999)
  .map(line => (line.split(",")(0), line.split(",")(1)))

val stream2 = ssc.socketTextStream("localhost", 8888)
  .map(line => (line.split(",")(0), line.split(",")(1)))

// Inner join
val joined = stream1.join(stream2)

// Left outer join
val leftJoined = stream1.leftOuterJoin(stream2)

// Cogroup
val cogrouped = stream1.cogroup(stream2)
```

## Stateful Operations

### updateStateByKey

Maintain state across batches for each key:

```scala { .api }
def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)]
def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S], partitioner: Partitioner): DStream[(K, S)]
def updateStateByKey[S: ClassTag](updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)], partitioner: Partitioner, rememberPartitioner: Boolean): DStream[(K, S)]
```

```scala
// Running count of words
val wordPairs = ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split(" "))
  .map((_, 1))

val runningCounts = wordPairs.updateStateByKey { (values: Seq[Int], state: Option[Int]) =>
  val newCount = values.sum + state.getOrElse(0)
  Some(newCount)
}

// Advanced state management
case class WordStats(count: Int, lastSeen: Long)

val wordStats = wordPairs.updateStateByKey { (values: Seq[Int], state: Option[WordStats]) =>
  val currentTime = System.currentTimeMillis()
  val currentCount = values.sum

  state match {
    case Some(stats) => Some(WordStats(stats.count + currentCount, currentTime))
    case None => Some(WordStats(currentCount, currentTime))
  }
}
```
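`updateStateByKey` requires checkpointing to be enabled (`ssc.checkpoint(...)`), and returning `None` from the update function removes that key from the state. A sketch of one way to expire idle keys, building on the `WordStats` example above:

```scala
// Drop keys that have not been seen for 10 minutes by returning None
val activeWordStats = wordPairs.updateStateByKey { (values: Seq[Int], state: Option[WordStats]) =>
  val now = System.currentTimeMillis()
  val updated = state match {
    case Some(stats) => WordStats(stats.count + values.sum, if (values.nonEmpty) now else stats.lastSeen)
    case None        => WordStats(values.sum, now)
  }
  if (now - updated.lastSeen > 10 * 60 * 1000) None else Some(updated)
}
```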
## DStream Actions

Actions trigger the execution of DStream transformations.

### Output Operations

**print**: Print first 10 elements of each batch
```scala { .api }
def print(): Unit
def print(num: Int): Unit
```

**foreachRDD**: Apply function to each RDD
```scala { .api }
def foreachRDD(foreachFunc: RDD[T] => Unit): Unit
def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit
```

```scala
val processed = stream.map(process)

// Print results
processed.print()
processed.print(20)  // Print first 20 elements

// Custom processing of each batch
processed.foreachRDD { rdd =>
  val count = rdd.count()
  if (count > 0) {
    println(s"Batch size: $count")
    rdd.take(10).foreach(println)
  }
}

// With time information
processed.foreachRDD { (rdd, time) =>
  println(s"Batch time: $time, Count: ${rdd.count()}")
}
```

### Save Operations

**saveAsTextFiles**: Save each batch as text files
```scala { .api }
def saveAsTextFiles(prefix: String, suffix: String = ""): Unit
```

**saveAsObjectFiles**: Save each batch as object files
```scala { .api }
def saveAsObjectFiles(prefix: String, suffix: String = ""): Unit
```

```scala
val processed = stream.map(_.toUpperCase)

// Save each batch - creates files like output-1414060920000, output-1414060921000, etc.
processed.saveAsTextFiles("hdfs://path/to/output", ".txt")

// Save as object files
processed.saveAsObjectFiles("hdfs://path/to/objects")
```

## StreamingContext Control

### Starting and Stopping

**start**: Start the streaming computation
```scala { .api }
def start(): Unit
```

**stop**: Stop the streaming context
```scala { .api }
def stop(): Unit
def stop(stopSparkContext: Boolean): Unit
def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit
```

**awaitTermination**: Wait for termination
```scala { .api }
def awaitTermination(): Unit
def awaitTermination(timeout: Long): Boolean
```

```scala
val ssc = new StreamingContext(conf, Seconds(1))

// Define streaming computation
val stream = ssc.socketTextStream("localhost", 9999)
stream.print()

// Start the computation
ssc.start()

// Wait for termination
ssc.awaitTermination()

// Or wait with timeout
val terminated = ssc.awaitTermination(60000)  // 60 seconds
if (!terminated) {
  println("Streaming did not terminate within 60 seconds")
  ssc.stop()
}
```

### Checkpointing

**checkpoint**: Set checkpoint directory
```scala { .api }
def checkpoint(directory: String): Unit
```

**getOrCreate**: Get existing context from checkpoint or create new one
```scala { .api }
def getOrCreate(checkpointPath: String, creatingFunc: () => StreamingContext, hadoopConf: Configuration = new Configuration()): StreamingContext
```

```scala
// Enable checkpointing
ssc.checkpoint("hdfs://path/to/checkpoints")

// Fault-tolerant pattern
def createStreamingContext(): StreamingContext = {
  val ssc = new StreamingContext(conf, Seconds(1))

  // Define streaming computation
  val lines = ssc.socketTextStream("localhost", 9999)
  val words = lines.flatMap(_.split(" "))
  val wordCounts = words.map((_, 1)).reduceByKey(_ + _)
  wordCounts.print()

  ssc.checkpoint("hdfs://checkpoints")
  ssc
}

val ssc = StreamingContext.getOrCreate("hdfs://checkpoints", createStreamingContext _)
```

### Context Properties

**remember**: Set remember duration
```scala { .api }
def remember(duration: Duration): Unit
```

**sparkContext**: Access underlying SparkContext
```scala { .api }
def sparkContext: SparkContext
```

```scala
// Set how long to remember RDDs
ssc.remember(Minutes(10))

// Access SparkContext
val sc = ssc.sparkContext
val broadcast = sc.broadcast(lookupTable)
```
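A sketch of putting the underlying SparkContext to work: broadcasting a driver-side lookup table and using it inside `transform` to enrich a stream. Here `lookupTable` and the `stream` of keys are hypothetical placeholders:

```scala
// Hypothetical driver-side reference data
val lookupTable = Map("user1" -> "premium", "user2" -> "basic")
val lookup = ssc.sparkContext.broadcast(lookupTable)

// Enrich a DStream[String] of user IDs with the broadcast values
val enriched = stream.transform { rdd =>
  rdd.map(userId => (userId, lookup.value.getOrElse(userId, "unknown")))
}
```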
## Persistence and Caching

DStreams can be persisted in memory for faster access:

```scala { .api }
def persist(storageLevel: StorageLevel): DStream[T]
def persist(): DStream[T]  // Uses MEMORY_ONLY_SER
def cache(): DStream[T]    // Uses MEMORY_ONLY_SER
```

```scala
import org.apache.spark.storage.StorageLevel

val expensiveStream = ssc.socketTextStream("localhost", 9999)
  .map(expensiveTransformation)
  .cache()  // Cache for reuse

// Multiple operations on cached stream
val count = expensiveStream.count()
val sample = expensiveStream.transform(_.sample(false, 0.1))  // DStream has no sample(); sample each batch's RDD instead
```

## Performance and Best Practices

### Batch Interval Selection

```scala
// For low latency (100ms - 1s)
val ssc = new StreamingContext(conf, Milliseconds(500))

// For high throughput (1s - 10s)
val ssc = new StreamingContext(conf, Seconds(5))

// For batch processing style (minutes)
val ssc = new StreamingContext(conf, Minutes(2))
```

### Parallelism and Partitioning

```scala
// Increase parallelism with multiple receivers
val numReceivers = 4
val streams = (1 to numReceivers).map { i =>
  ssc.socketTextStream(s"host$i", 9999)
}
val unifiedStream = ssc.union(streams)

// Repartition for better load balancing
val repartitioned = stream.transform(_.repartition(10))
```

### Memory Management

```scala
// Set appropriate storage levels
val persistedStream = stream
  .map(expensiveOperation)
  .persist(StorageLevel.MEMORY_AND_DISK_SER)

// Enable checkpointing for fault tolerance
ssc.checkpoint("hdfs://checkpoints")

// Use efficient serialization
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
```
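When Kryo is enabled as above, application classes can also be registered so they serialize more compactly. A small sketch reusing the `WordStats` case class from the stateful example (the registration list is illustrative):

```scala
// Register frequently shuffled or cached classes with Kryo
conf.registerKryoClasses(Array(classOf[WordStats]))
```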
## Error Handling and Fault Tolerance

Spark Streaming applications must handle various failure scenarios to ensure reliable operation.

### Common Streaming Errors

**Invalid StreamingContext operations**: For example, starting a context that is already running throws `IllegalStateException`
```scala
try {
  ssc.start()
  ssc.start()  // Error: context already started
} catch {
  case e: IllegalStateException =>
    println("Streaming context already started")
}
```

**Receiver Failures**: Input stream receivers failing
```scala
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerReceiverError}

// Monitor receiver status
ssc.addStreamingListener(new StreamingListener {
  override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {
    println(s"Receiver error: ${receiverError.receiverInfo.name}")
    // Implement recovery logic
  }
})
```

**Batch Processing Delays**: When processing takes longer than the batch interval
```scala
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Monitor batch processing times
ssc.addStreamingListener(new StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val processingTime = batchCompleted.batchInfo.processingDelay.getOrElse(0L)
    val batchIntervalMs = 1000L  // the batch duration this context was created with

    if (processingTime > batchIntervalMs) {
      println(s"Warning: Processing time ($processingTime ms) > batch interval ($batchIntervalMs ms)")
    }
  }
})
```

### Checkpoint Corruption

**Checkpoint Recovery Failures**: When checkpoint data is corrupted
```scala
def createStreamingContext(): StreamingContext = {
  val ssc = new StreamingContext(conf, Seconds(1))
  // Define streaming logic
  ssc.checkpoint("hdfs://checkpoints")
  ssc
}

try {
  val ssc = StreamingContext.getOrCreate("hdfs://checkpoints", createStreamingContext _)
} catch {
  case e: Exception =>
    println(s"Checkpoint recovery failed: ${e.getMessage}")
    // Fall back to creating a new context
    val ssc = createStreamingContext()
}
```

**Checkpoint Directory Management**:
```scala
// Clean up old checkpoints periodically
import org.apache.hadoop.fs.{FileSystem, Path}

def cleanupCheckpoints(checkpointDir: String, retentionHours: Int): Unit = {
  val fs = FileSystem.get(ssc.sparkContext.hadoopConfiguration)
  val checkpointPath = new Path(checkpointDir)

  try {
    val cutoffTime = System.currentTimeMillis() - (retentionHours * 60 * 60 * 1000L)
    val files = fs.listStatus(checkpointPath)

    files.foreach { fileStatus =>
      if (fileStatus.getModificationTime < cutoffTime) {
        fs.delete(fileStatus.getPath, true)
        println(s"Deleted old checkpoint: ${fileStatus.getPath}")
      }
    }
  } catch {
    case e: Exception => println(s"Checkpoint cleanup failed: ${e.getMessage}")
  }
}
```

### Memory and Resource Errors

**OutOfMemoryError in Streaming**:
```scala
// Monitor memory usage and adjust batch sizes
// Note: this check runs on the driver when each batch's job is generated
val memoryMonitoringStream = stream.transform { rdd =>
  val memoryUsed = Runtime.getRuntime.totalMemory - Runtime.getRuntime.freeMemory
  val memoryMax = Runtime.getRuntime.maxMemory
  val memoryPercent = (memoryUsed.toDouble / memoryMax) * 100

  if (memoryPercent > 80) {
    println(s"Warning: Memory usage at ${memoryPercent.toInt}%")
    // Reduce batch size or increase memory
  }

  rdd
}
```

**Backpressure Issues**: When input rate exceeds processing capacity
```scala
// Enable backpressure (Spark 1.5+)
conf.set("spark.streaming.backpressure.enabled", "true")
conf.set("spark.streaming.backpressure.initialRate", "1000")

// Manual rate limiting
conf.set("spark.streaming.kafka.maxRatePerPartition", "1000")
```

### Network and Connectivity Errors

**Socket Connection Failures**:
```scala
import java.net.ConnectException

// Implement retry logic for socket connections
def createReliableSocketStream(hostname: String, port: Int, maxRetries: Int = 3): DStream[String] = {
  var attempts = 0
  var stream: DStream[String] = null

  while (attempts < maxRetries && stream == null) {
    try {
      stream = ssc.socketTextStream(hostname, port)
      println(s"Connected to $hostname:$port")
    } catch {
      case e: ConnectException =>
        attempts += 1
        println(s"Connection attempt $attempts failed: ${e.getMessage}")
        if (attempts < maxRetries) {
          Thread.sleep(5000)  // Wait 5 seconds before retry
        }
    }
  }

  if (stream == null) {
    throw new RuntimeException(s"Failed to connect after $maxRetries attempts")
  }

  stream
}
```

**Kafka Connection Issues**:
```scala
import java.util.concurrent.TimeoutException
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Handle Kafka metadata refresh failures
val kafkaParams = Map[String, String](
  "metadata.broker.list" -> "broker1:9092,broker2:9092",
  "auto.offset.reset" -> "smallest",
  "refresh.leader.backoff.ms" -> "1000",
  "socket.timeout.ms" -> "30000",
  "fetch.message.max.bytes" -> "1048576"
)

val topics = Set("events")  // hypothetical topic set for this application

try {
  val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topics
  )
} catch {
  case e: TimeoutException =>
    println("Kafka connection timeout - check broker availability")
  case e: Exception =>
    println(s"Kafka stream creation failed: ${e.getMessage}")
}
```

### Processing Errors and Recovery

**Exception Handling in Transformations**:
```scala
val robustStream = stream.map { record =>
  try {
    processRecord(record)
  } catch {
    case e: NumberFormatException =>
      println(s"Invalid number format in record: $record")
      null  // or a default value
    case e: Exception =>
      println(s"Processing error for record $record: ${e.getMessage}")
      null
  }
}.filter(_ != null)  // Remove failed records
```
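An alternative to null markers is to emit zero-or-one results per record with `flatMap`, so failed records are simply dropped (`processRecord` is the same hypothetical helper as above):

```scala
val cleanedStream = stream.flatMap { record =>
  try {
    Seq(processRecord(record))
  } catch {
    case e: Exception =>
      println(s"Skipping bad record: ${e.getMessage}")
      Seq.empty
  }
}
```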
**Dead Letter Queue Pattern**:
```scala
// transform returns a single DStream; errors are routed to the dead letter queue within each batch
val successStream = stream.transform { rdd =>
  val processed = rdd.map { record =>
    try {
      (Some(processRecord(record)), None)
    } catch {
      case e: Exception =>
        (None, Some((record, e.getMessage)))
    }
  }.cache()  // Cache to avoid recomputation

  val successes = processed.filter(_._1.isDefined).map(_._1.get)
  val errors = processed.filter(_._2.isDefined).map(_._2.get)

  // Save errors to dead letter queue (saveToDeadLetterQueue is application-specific)
  errors.foreachPartition { partition =>
    partition.foreach { case (record, error) =>
      saveToDeadLetterQueue(record, error)
    }
  }

  successes
}
```

### Best Practices for Error Handling

1. **Enable Checkpointing**: Always use checkpointing for production applications
2. **Monitor Batch Processing Times**: Ensure processing time < batch interval
3. **Implement Circuit Breakers**: Fail fast when external services are down
4. **Use Write-Ahead Logs**: Enable WAL for reliable receivers
5. **Handle Partial Failures**: Process what you can, log what fails
6. **Set Up Monitoring**: Use the Spark UI and external monitoring tools

```scala
// Comprehensive error handling pattern
def createRobustStreamingApp(): StreamingContext = {
  // Fault-tolerance settings must be applied to the conf before the context is created
  conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
  conf.set("spark.streaming.backpressure.enabled", "true")

  val ssc = new StreamingContext(conf, Seconds(1))
  ssc.checkpoint("hdfs://checkpoints")

  val stream = ssc.socketTextStream("localhost", 9999)
    .map(parseRecord)       // parseRecord: String => Option[...] (application-specific)
    .filter(_.isDefined)
    .map(_.get)
    .handleErrors()
    .cache()

  // Multiple outputs for different purposes
  stream.print()
  stream.saveAsTextFiles("hdfs://output/data")

  // Add monitoring (CustomStreamingListener is application-specific)
  ssc.addStreamingListener(new CustomStreamingListener())

  ssc
}

implicit class RobustDStream[T: ClassTag](dstream: DStream[T]) {
  def handleErrors(): DStream[T] = {
    dstream.transform { rdd =>
      rdd.filter(_ != null)  // drop records that earlier stages marked as failed
    }
  }
}
```

This guide covers the core Spark Streaming API along with error handling patterns for building scalable, fault-tolerant stream processing applications.