# Spark Streaming

Spark Streaming enables scalable, high-throughput, fault-tolerant stream processing of live data streams. It ingests data from sources such as Kafka, Flume, Twitter, and TCP sockets, and processes it with high-level operations such as map, reduce, join, and windowing.

## Core Concepts

Spark Streaming discretizes a live data stream into micro-batches and represents it as a **DStream** (Discretized Stream), a continuous sequence of RDDs. Each micro-batch is processed as an ordinary RDD, so Spark's batch-processing APIs can be applied to streaming data.
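As a minimal sketch (assuming a netcat-style text source on `localhost:9999` and a local master), the classic streaming word count looks like this; the counting logic runs once per one-second batch:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

val conf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1))

// Each 1-second micro-batch of lines arrives as an RDD[String]
val lines = ssc.socketTextStream("localhost", 9999)
val counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
counts.print()

ssc.start()
ssc.awaitTermination()
```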
## StreamingContext

The main entry point for Spark Streaming functionality.

### StreamingContext Class

```scala { .api }
class StreamingContext(sparkContext: SparkContext, batchDuration: Duration) extends Logging {
  // Alternative constructors
  def this(conf: SparkConf, batchDuration: Duration)
  def this(master: String, appName: String, batchDuration: Duration, sparkHome: String = null, jars: Seq[String] = Nil, environment: Map[String, String] = Map())
  def this(path: String, hadoopConf: Configuration = new Configuration())
}
```

### Creating StreamingContext

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.streaming.{StreamingContext, Seconds, Minutes}
import org.apache.spark.{SparkContext, SparkConf}

// From a SparkConf
val conf = new SparkConf().setAppName("StreamingApp").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(2))

// From an existing SparkContext
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(1))

// With all parameters
val ssc = new StreamingContext(
  master = "local[*]",
  appName = "My Streaming App",
  batchDuration = Seconds(1),
  sparkHome = "/path/to/spark",
  jars = Seq("app.jar"),
  environment = Map("ENV_VAR" -> "value")
)

// From a checkpoint directory (recovery)
val ssc = new StreamingContext("hdfs://path/to/checkpoint", new Configuration())
```

### Duration Helper Objects

```scala { .api }
import org.apache.spark.streaming.{Milliseconds, Seconds, Minutes}

Milliseconds(500) // 500 milliseconds
Seconds(1)        // 1 second
Seconds(30)       // 30 seconds
Minutes(1)        // 1 minute
```
## Input DStream Creation

### Socket Streams

**socketTextStream**: Create a DStream from a TCP socket
```scala { .api }
def socketTextStream(hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[String]
```

```scala
// Connect to a TCP socket for text data
val lines = ssc.socketTextStream("localhost", 9999)

// Process the stream
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
wordCounts.print()
```

**socketStream**: Custom socket stream with a converter function
```scala { .api }
def socketStream[T: ClassTag](hostname: String, port: Int, converter: (InputStream) => Iterator[T], storageLevel: StorageLevel): ReceiverInputDStream[T]
```

**rawSocketStream**: Raw socket stream where data arrives as serialized blocks that can be pushed into Spark without deserialization
```scala { .api }
def rawSocketStream[T: ClassTag](hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[T]
```
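A minimal sketch of `socketStream` with a custom converter; the converter and storage level here are illustrative assumptions, turning the raw `InputStream` into an iterator of UTF-8 lines:

```scala
import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets
import org.apache.spark.storage.StorageLevel

// Hypothetical converter: read the socket as UTF-8 text, one line per element
def lineConverter(in: InputStream): Iterator[String] = {
  val reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))
  Iterator.continually(reader.readLine()).takeWhile(_ != null)
}

val customLines = ssc.socketStream("localhost", 9999, lineConverter _, StorageLevel.MEMORY_AND_DISK_SER_2)
```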
### File Streams

**textFileStream**: Monitor a directory for new text files
```scala { .api }
def textFileStream(directory: String): DStream[String]
```

```scala
// Monitor a directory for new files
val fileStream = ssc.textFileStream("hdfs://path/to/directory")

// Process new files as they arrive
val processed = fileStream
  .filter(_.nonEmpty)
  .map(_.toUpperCase)

processed.print()
```

**fileStream**: Generic file stream with a Hadoop InputFormat
```scala { .api }
def fileStream[K, V, F <: NewInputFormat[K, V]: ClassTag](directory: String, filter: Path => Boolean = _ => true, newFilesOnly: Boolean = true): InputDStream[(K, V)]
```

```scala
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}

val hadoopStream = ssc.fileStream[LongWritable, Text, TextInputFormat]("hdfs://input/dir")
val textStream = hadoopStream.map(_._2.toString)
```
### Queue Streams (for testing)

**queueStream**: Create a stream from a queue of RDDs
```scala { .api }
def queueStream[T: ClassTag](queue: Queue[RDD[T]], oneAtATime: Boolean = true, defaultRDD: RDD[T] = null): InputDStream[T]
```

```scala
import scala.collection.mutable.Queue
import org.apache.spark.rdd.RDD

val rddQueue = Queue[RDD[Int]]()

// Create a stream from the queue
val queueStream = ssc.queueStream(rddQueue)

// Add RDDs to the queue (simulates data arrival)
for (i <- 1 to 10) {
  rddQueue += ssc.sparkContext.parallelize(1 to 100)
}
```
### Custom Receiver Streams

**receiverStream**: Create a stream from a custom receiver
```scala { .api }
def receiverStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T]
```

```scala
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.storage.StorageLevel

// Custom receiver example
class CustomReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
  def onStart() {
    // Start receiving data on a dedicated thread
    new Thread("Custom Receiver") {
      override def run() { receive() }
    }.start()
  }

  def onStop() {
    // Stop receiving data
  }

  private def receive() {
    while (!isStopped()) {
      // Simulate data reception (generateData is a placeholder for the real source)
      val data = generateData()
      store(data)
      Thread.sleep(100)
    }
  }
}

val customStream = ssc.receiverStream(new CustomReceiver())
```

**actorStream**: Create a stream from an Akka actor
```scala { .api }
def actorStream[T: ClassTag](props: Props, name: String, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[T]
```
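A minimal actor-based receiver might look like the sketch below; it assumes the `ActorHelper` trait from `org.apache.spark.streaming.receiver` and an upstream system that sends `String` messages to the actor:

```scala
import akka.actor.{Actor, Props}
import org.apache.spark.streaming.receiver.ActorHelper

// Sketch of an actor that forwards every String message it receives into Spark Streaming
class StringReceiverActor extends Actor with ActorHelper {
  def receive = {
    case msg: String => store(msg) // hand the record to Spark Streaming
  }
}

val actorLines = ssc.actorStream[String](Props[StringReceiverActor], "StringReceiver")
```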
185
186
## DStream Transformations
187
188
DStreams support transformations similar to RDDs, applied to each batch.
189
190
### DStream Class
191
192
```scala { .api }
193
abstract class DStream[T: ClassTag] extends Serializable with Logging {
194
def ssc: StreamingContext
195
def slideDuration: Duration
196
def dependencies: List[DStream[_]]
197
def compute(time: Time): Option[RDD[T]]
198
}
199
```
200
201
### Basic Transformations
202
203
**map**: Apply function to each element in each batch
204
```scala { .api }
205
def map[U: ClassTag](mapFunc: T => U): DStream[U]
206
```
207
208
```scala
209
val numbers = ssc.socketTextStream("localhost", 9999)
210
val doubled = numbers.map(_.toInt * 2)
211
```
212
213
**flatMap**: Apply function and flatten results
214
```scala { .api }
215
def flatMap[U: ClassTag](flatMapFunc: T => Traversable[U]): DStream[U]
216
```
217
218
```scala
219
val lines = ssc.textFileStream("input/")
220
val words = lines.flatMap(_.split(" "))
221
```
222
223
**filter**: Keep elements matching predicate
224
```scala { .api }
225
def filter(filterFunc: T => Boolean): DStream[T]
226
```
227
228
```scala
229
val validLines = lines.filter(_.nonEmpty)
230
val longWords = words.filter(_.length > 5)
231
```
232
233
**glom**: Coalesce elements within each partition into arrays
234
```scala { .api }
235
def glom(): DStream[Array[T]]
236
```
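A small sketch of `glom`; each partition of each batch becomes one array:

```scala
val numbers = ssc.socketTextStream("localhost", 9999).map(_.toInt)

// One Array[Int] per partition, per batch
val partitionArrays = numbers.glom()
partitionArrays.map(_.sum).print() // per-partition sums for each batch
```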
### Stream Operations

**union**: Union with another DStream
```scala { .api }
def union(that: DStream[T]): DStream[T]
```

```scala
val stream1 = ssc.socketTextStream("host1", 9999)
val stream2 = ssc.socketTextStream("host2", 9999)
val combined = stream1.union(stream2)
```

### Aggregation Transformations

**count**: Count the elements in each batch
```scala { .api }
def count(): DStream[Long]
```

**countByValue**: Count occurrences of each value
```scala { .api }
def countByValue()(implicit ord: Ordering[T] = null): DStream[(T, Long)]
```

**reduce**: Reduce the elements in each batch
```scala { .api }
def reduce(reduceFunc: (T, T) => T): DStream[T]
```

```scala
val numbers = ssc.socketTextStream("localhost", 9999).map(_.toInt)

val counts = numbers.count()             // Count per batch
val sums = numbers.reduce(_ + _)         // Sum per batch
val maxValues = numbers.reduce(math.max) // Max per batch
```
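For completeness, a small `countByValue` sketch on the same `numbers` stream; it yields one (value, count) pair per distinct value in each batch:

```scala
// Occurrences of each distinct value within each batch
val valueCounts = numbers.countByValue()
valueCounts.print()
```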
### Advanced Transformations

**transform**: Apply an arbitrary RDD-to-RDD function
```scala { .api }
def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U]
def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U]
```

```scala
val enhanced = stream.transform { (rdd, time) =>
  // Access to both the RDD and the batch time
  val timeString = time.toString
  rdd.map(data => s"$timeString: $data")
    .filter(_.contains("important"))
}
```

**transformWith**: Transform together with another DStream
```scala { .api }
def transformWith[U: ClassTag, V: ClassTag](other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V]): DStream[V]
def transformWith[U: ClassTag, V: ClassTag](other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V]): DStream[V]
```

```scala
import org.apache.spark.rdd.RDD

val stream1 = ssc.socketTextStream("localhost", 9999)
val stream2 = ssc.socketTextStream("localhost", 8888)

// Join the RDDs from the two streams (extractKey is a user-defined function)
val joinFunc = (rdd1: RDD[String], rdd2: RDD[String]) => {
  val pairs1 = rdd1.map(line => (extractKey(line), line))
  val pairs2 = rdd2.map(line => (extractKey(line), line))
  pairs1.join(pairs2).map { case (key, (v1, v2)) => s"$v1 | $v2" }
}

val joined = stream1.transformWith(stream2, joinFunc)
```
## Window Operations

Window operations apply a transformation over a sliding window of data. Both the window duration and the slide duration must be multiples of the batch interval.

### Basic Windowing

**window**: Return a windowed DStream
```scala { .api }
def window(windowDuration: Duration): DStream[T]
def window(windowDuration: Duration, slideDuration: Duration): DStream[T]
```

```scala
val lines = ssc.socketTextStream("localhost", 9999)

// 30-second window, sliding every 10 seconds
val windowedLines = lines.window(Seconds(30), Seconds(10))
val windowCounts = windowedLines.count()
```

### Windowed Reductions

**reduceByWindow**: Reduce over a window
```scala { .api }
def reduceByWindow(reduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration): DStream[T]
def reduceByWindow(reduceFunc: (T, T) => T, invReduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration): DStream[T]
```

```scala
val numbers = ssc.socketTextStream("localhost", 9999).map(_.toInt)

// Sum over window
val windowSums = numbers.reduceByWindow(
  _ + _,       // Add new values
  Seconds(60), // Window duration
  Seconds(20)  // Slide duration
)

// Incremental windowed reduction with an inverse function
// (only processes data entering and leaving the window; requires checkpointing)
val efficientSums = numbers.reduceByWindow(
  _ + _, // Add function
  _ - _, // Inverse (subtract) function
  Seconds(60),
  Seconds(20)
)
```

**countByWindow**: Count elements over a window
```scala { .api }
def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long]
```
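A small `countByWindow` sketch; it maintains the count incrementally, so a checkpoint directory needs to be set with `ssc.checkpoint(...)`:

```scala
val lines = ssc.socketTextStream("localhost", 9999)

// Number of elements received in the last minute, reported every 10 seconds
val recentCount = lines.countByWindow(Minutes(1), Seconds(10))
recentCount.print()
```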
**countByValueAndWindow**: Count values over a window
```scala { .api }
def countByValueAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int = ssc.sc.defaultParallelism)(implicit ord: Ordering[T] = null): DStream[(T, Long)]
```

```scala
val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))

// Count words in a 2-minute window, sliding every 30 seconds
val wordCounts = words.countByValueAndWindow(Minutes(2), Seconds(30))
wordCounts.print()
```
## PairDStreamFunctions (Key-Value Operations)

Operations available on DStreams of (key, value) pairs through an implicit conversion.

### Key-Value Transformations

**keys and values**:
```scala { .api }
def keys: DStream[K]
def values: DStream[V]
```

**mapValues**: Transform values while preserving keys
```scala { .api }
def mapValues[U: ClassTag](mapValuesFunc: V => U): DStream[(K, U)]
```

**flatMapValues**: FlatMap values while preserving keys
```scala { .api }
def flatMapValues[U: ClassTag](flatMapValuesFunc: V => TraversableOnce[U]): DStream[(K, U)]
```

```scala
import org.apache.spark.streaming.StreamingContext._

val pairs = ssc.socketTextStream("localhost", 9999)
  .map(line => {
    val parts = line.split(",")
    (parts(0), parts(1).toInt)
  })

val doubled = pairs.mapValues(_ * 2)
val allKeys = pairs.keys
val allValues = pairs.values
```
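And a short `flatMapValues` sketch on the same `pairs` stream, expanding each value into several records under the original key:

```scala
// (key, v) becomes (key, v) and (key, v * 10)
val expanded = pairs.flatMapValues(v => Seq(v, v * 10))
```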
### Aggregation by Key

**groupByKey**: Group values by key in each batch
```scala { .api }
def groupByKey(): DStream[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): DStream[(K, Iterable[V])]
def groupByKey(numPartitions: Int): DStream[(K, Iterable[V])]
```

**reduceByKey**: Reduce values by key in each batch
```scala { .api }
def reduceByKey(func: (V, V) => V): DStream[(K, V)]
def reduceByKey(func: (V, V) => V, partitioner: Partitioner): DStream[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): DStream[(K, V)]
```

**combineByKey**: Generic combine by key
```scala { .api }
def combineByKey[C: ClassTag](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiner: (C, C) => C, partitioner: Partitioner): DStream[(K, C)]
```

```scala
val wordStream = ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split(" "))
  .map(word => (word, 1))

// Count words in each batch
val wordCounts = wordStream.reduceByKey(_ + _)

// Group all occurrences
val wordGroups = wordStream.groupByKey()
```
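A sketch of `combineByKey` on the same `wordStream`, building per-key (sum, count) pairs in each batch; the partitioner choice here is an illustrative assumption:

```scala
import org.apache.spark.HashPartitioner

val sumAndCount = wordStream.combineByKey(
  (v: Int) => (v, 1),                                           // createCombiner
  (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),        // mergeValue
  (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2), // mergeCombiner
  new HashPartitioner(4)
)

// Per-key average per batch
val averages = sumAndCount.mapValues { case (sum, count) => sum.toDouble / count }
```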
### Windowed Key-Value Operations

**groupByKeyAndWindow**: Group by key over a window
```scala { .api }
def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Iterable[V])]
def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Iterable[V])]
def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, partitioner: Partitioner): DStream[(K, Iterable[V])]
```

**reduceByKeyAndWindow**: Reduce by key over a window
```scala { .api }
def reduceByKeyAndWindow(func: (V, V) => V, windowDuration: Duration): DStream[(K, V)]
def reduceByKeyAndWindow(func: (V, V) => V, windowDuration: Duration, slideDuration: Duration): DStream[(K, V)]
def reduceByKeyAndWindow(func: (V, V) => V, invFunc: (V, V) => V, windowDuration: Duration, slideDuration: Duration): DStream[(K, V)]
```

**countByKeyAndWindow**: Count by key over a window
```scala { .api }
def countByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Long)]
```

```scala
val wordPairs = ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split(" "))
  .map((_, 1))

// Windowed word count (last 5 minutes, every 30 seconds)
val windowedWordCounts = wordPairs.reduceByKeyAndWindow(
  _ + _,      // Reduce function
  Minutes(5), // Window duration
  Seconds(30) // Slide duration
)

// Incremental version with an inverse function (requires checkpointing to be enabled)
val efficientWordCounts = wordPairs.reduceByKeyAndWindow(
  _ + _, // Add function
  _ - _, // Subtract function (inverse)
  Minutes(5),
  Seconds(30)
)
```
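A `groupByKeyAndWindow` sketch on the same `wordPairs` stream, collecting every value seen per key during the window:

```scala
// All the 1s recorded for each word during the last 5 minutes, updated every 30 seconds
val windowedGroups = wordPairs.groupByKeyAndWindow(Minutes(5), Seconds(30))
windowedGroups.mapValues(_.size).print()
```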
### Join Operations

**join**: Join with another DStream
```scala { .api }
def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))]
def join[W: ClassTag](other: DStream[(K, W)], partitioner: Partitioner): DStream[(K, (V, W))]
```

**leftOuterJoin**, **rightOuterJoin**, **fullOuterJoin**:
```scala { .api }
def leftOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))]
def rightOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], W))]
def fullOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], Option[W]))]
```

**cogroup**: Group together with another DStream
```scala { .api }
def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))]
```

```scala
val stream1 = ssc.socketTextStream("localhost", 9999)
  .map(line => (line.split(",")(0), line.split(",")(1)))

val stream2 = ssc.socketTextStream("localhost", 8888)
  .map(line => (line.split(",")(0), line.split(",")(1)))

// Inner join
val joined = stream1.join(stream2)

// Left outer join
val leftJoined = stream1.leftOuterJoin(stream2)

// Cogroup
val cogrouped = stream1.cogroup(stream2)
```
## Stateful Operations

### updateStateByKey

Maintain state across batches for each key. Stateful operations require a checkpoint directory to be set via `ssc.checkpoint(...)`.

```scala { .api }
def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)]
def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S], partitioner: Partitioner): DStream[(K, S)]
def updateStateByKey[S: ClassTag](updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)], partitioner: Partitioner, rememberPartitioner: Boolean): DStream[(K, S)]
```

```scala
// Running count of words
val wordPairs = ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split(" "))
  .map((_, 1))

val runningCounts = wordPairs.updateStateByKey { (values: Seq[Int], state: Option[Int]) =>
  val newCount = values.sum + state.getOrElse(0)
  Some(newCount)
}

// Advanced state management
case class WordStats(count: Int, lastSeen: Long)

val wordStats = wordPairs.updateStateByKey { (values: Seq[Int], state: Option[WordStats]) =>
  val currentTime = System.currentTimeMillis()
  val currentCount = values.sum

  state match {
    case Some(stats) => Some(WordStats(stats.count + currentCount, currentTime))
    case None => Some(WordStats(currentCount, currentTime))
  }
}
```
## DStream Actions

Output operations trigger the actual execution of the DStream transformations.

### Output Operations

**print**: Print the first elements of each batch (10 by default)
```scala { .api }
def print(): Unit
def print(num: Int): Unit
```

**foreachRDD**: Apply a function to each RDD
```scala { .api }
def foreachRDD(foreachFunc: RDD[T] => Unit): Unit
def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit
```

```scala
val processed = stream.map(process) // process is a user-defined function

// Print results
processed.print()
processed.print(20) // Print the first 20 elements

// Custom processing of each batch
processed.foreachRDD { rdd =>
  val count = rdd.count()
  if (count > 0) {
    println(s"Batch size: $count")
    rdd.take(10).foreach(println)
  }
}

// With time information
processed.foreachRDD { (rdd, time) =>
  println(s"Batch time: $time, Count: ${rdd.count()}")
}
```

### Save Operations

**saveAsTextFiles**: Save each batch as text files
```scala { .api }
def saveAsTextFiles(prefix: String, suffix: String = ""): Unit
```

**saveAsObjectFiles**: Save each batch as object files
```scala { .api }
def saveAsObjectFiles(prefix: String, suffix: String = ""): Unit
```

```scala
val processed = stream.map(_.toUpperCase)

// Save each batch - produces one output directory per batch, e.g. output-1414060920000.txt, output-1414060921000.txt, ...
processed.saveAsTextFiles("hdfs://path/to/output", ".txt")

// Save as object files
processed.saveAsObjectFiles("hdfs://path/to/objects")
```
## StreamingContext Control

### Starting and Stopping

**start**: Start the streaming computation
```scala { .api }
def start(): Unit
```

**stop**: Stop the streaming context
```scala { .api }
def stop(): Unit
def stop(stopSparkContext: Boolean): Unit
def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit
```

**awaitTermination** / **awaitTerminationOrTimeout**: Wait for termination
```scala { .api }
def awaitTermination(): Unit
def awaitTerminationOrTimeout(timeout: Long): Boolean
```

```scala
val ssc = new StreamingContext(conf, Seconds(1))

// Define the streaming computation
val stream = ssc.socketTextStream("localhost", 9999)
stream.print()

// Start the computation
ssc.start()

// Wait for termination
ssc.awaitTermination()

// Or wait with a timeout
val terminated = ssc.awaitTerminationOrTimeout(60000) // 60 seconds
if (!terminated) {
  println("Streaming did not terminate within 60 seconds")
  ssc.stop()
}
```
### Checkpointing

**checkpoint**: Set the checkpoint directory
```scala { .api }
def checkpoint(directory: String): Unit
```

**getOrCreate**: Get an existing context from a checkpoint or create a new one
```scala { .api }
def getOrCreate(checkpointPath: String, creatingFunc: () => StreamingContext, hadoopConf: Configuration = new Configuration()): StreamingContext
```

```scala
// Enable checkpointing
ssc.checkpoint("hdfs://path/to/checkpoints")

// Fault-tolerant pattern
def createStreamingContext(): StreamingContext = {
  val ssc = new StreamingContext(conf, Seconds(1))

  // Define the streaming computation
  val lines = ssc.socketTextStream("localhost", 9999)
  val words = lines.flatMap(_.split(" "))
  val wordCounts = words.map((_, 1)).reduceByKey(_ + _)
  wordCounts.print()

  ssc.checkpoint("hdfs://checkpoints")
  ssc
}

val context = StreamingContext.getOrCreate("hdfs://checkpoints", createStreamingContext _)
```
698
699
### Context Properties
700
701
**remember**: Set remember duration
702
```scala { .api }
703
def remember(duration: Duration): Unit
704
```
705
706
**sparkContext**: Access underlying SparkContext
707
```scala { .api }
708
def sparkContext: SparkContext
709
```
710
711
```scala
712
// Set how long to remember RDDs
713
ssc.remember(Minutes(10))
714
715
// Access SparkContext
716
val sc = ssc.sparkContext
717
val broadcast = sc.broadcast(lookupTable)
718
```
## Persistence and Caching

DStreams can be persisted in memory for faster access:

```scala { .api }
def persist(storageLevel: StorageLevel): DStream[T]
def persist(): DStream[T] // Uses MEMORY_ONLY_SER
def cache(): DStream[T]   // Uses MEMORY_ONLY_SER
```

```scala
import org.apache.spark.storage.StorageLevel

val expensiveStream = ssc.socketTextStream("localhost", 9999)
  .map(expensiveTransformation) // expensiveTransformation is a user-defined function
  .cache() // Cache for reuse

// Multiple operations on the cached stream
val count = expensiveStream.count()
val sample = expensiveStream.transform(_.sample(withReplacement = false, fraction = 0.1))
```
741
742
## Performance and Best Practices
743
744
### Batch Interval Selection
745
746
```scala
747
// For low latency (100ms - 1s)
748
val ssc = new StreamingContext(conf, Milliseconds(500))
749
750
// For high throughput (1s - 10s)
751
val ssc = new StreamingContext(conf, Seconds(5))
752
753
// For batch processing style (minutes)
754
val ssc = new StreamingContext(conf, Minutes(2))
755
```
756
757
### Parallelism and Partitioning
758
759
```scala
760
// Increase parallelism for receivers
761
val numReceivers = 4
762
val streams = (1 to numReceivers).map { i =>
763
ssc.socketTextStream(s"host$i", 9999)
764
}
765
val unifiedStream = ssc.union(streams)
766
767
// Repartition for better load balancing
768
val repartitioned = stream.transform(_.repartition(10))
769
```
770
771
### Memory Management
772
773
```scala
774
// Set appropriate storage levels
775
val persistedStream = stream
776
.map(expensiveOperation)
777
.persist(StorageLevel.MEMORY_AND_DISK_SER)
778
779
// Enable checkpointing for fault tolerance
780
ssc.checkpoint("hdfs://checkpoints")
781
782
// Use efficient serialization
783
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
784
```
785
786
## Error Handling and Fault Tolerance
787
788
Spark Streaming applications must handle various failure scenarios to ensure reliable operation.
789
790
### Common Streaming Errors
791
792
**StreamingContextException**: Invalid streaming context operations
793
```scala
794
try {
795
ssc.start()
796
ssc.start() // Error: context already started
797
} catch {
798
case e: IllegalStateException =>
799
println("Streaming context already started")
800
}
801
```
802
803
**Receiver Failures**: Input stream receivers failing
804
```scala
805
// Monitor receiver status
806
ssc.addStreamingListener(new StreamingListener {
807
override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {
808
println(s"Receiver error: ${receiverError.receiverInfo.name}")
809
// Implement recovery logic
810
}
811
})
812
```
813
814
**Batch Processing Delays**: When processing takes longer than batch interval
815
```scala
816
// Monitor batch processing times
817
ssc.addStreamingListener(new StreamingListener {
818
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
819
val processingTime = batchCompleted.batchInfo.processingDelay.getOrElse(0L)
820
val batchInterval = ssc.graph.batchDuration.milliseconds
821
822
if (processingTime > batchInterval) {
823
println(s"Warning: Processing time ($processingTime ms) > batch interval ($batchInterval ms)")
824
}
825
}
826
})
827
```
828
829
### Checkpoint Corruption
830
831
**Checkpoint Recovery Failures**: When checkpoint data is corrupted
832
```scala
833
def createStreamingContext(): StreamingContext = {
834
val ssc = new StreamingContext(conf, Seconds(1))
835
// Define streaming logic
836
ssc.checkpoint("hdfs://checkpoints")
837
ssc
838
}
839
840
try {
841
val ssc = StreamingContext.getOrCreate("hdfs://checkpoints", createStreamingContext _)
842
} catch {
843
case e: Exception =>
844
println(s"Checkpoint recovery failed: ${e.getMessage}")
845
// Fall back to creating new context
846
val ssc = createStreamingContext()
847
}
848
```
849
850
**Checkpoint Directory Management**:
851
```scala
852
// Clean up old checkpoints periodically
853
import java.io.File
854
import org.apache.hadoop.fs.{FileSystem, Path}
855
856
def cleanupCheckpoints(checkpointDir: String, retentionHours: Int): Unit = {
857
val fs = FileSystem.get(ssc.sparkContext.hadoopConfiguration)
858
val checkpointPath = new Path(checkpointDir)
859
860
try {
861
val cutoffTime = System.currentTimeMillis() - (retentionHours * 60 * 60 * 1000)
862
val files = fs.listStatus(checkpointPath)
863
864
files.foreach { fileStatus =>
865
if (fileStatus.getModificationTime < cutoffTime) {
866
fs.delete(fileStatus.getPath, true)
867
println(s"Deleted old checkpoint: ${fileStatus.getPath}")
868
}
869
}
870
} catch {
871
case e: Exception => println(s"Checkpoint cleanup failed: ${e.getMessage}")
872
}
873
}
874
```
875
876
### Memory and Resource Errors
877
878
**OutOfMemoryError in Streaming**:
879
```scala
880
// Monitor memory usage and adjust batch sizes
881
val memoryMonitoringStream = stream.transform { rdd =>
882
val memoryUsed = Runtime.getRuntime.totalMemory - Runtime.getRuntime.freeMemory
883
val memoryMax = Runtime.getRuntime.maxMemory
884
val memoryPercent = (memoryUsed.toDouble / memoryMax) * 100
885
886
if (memoryPercent > 80) {
887
println(s"Warning: Memory usage at ${memoryPercent.toInt}%")
888
// Reduce batch size or increase memory
889
}
890
891
rdd
892
}
893
```
894
895
**Backpressure Issues**: When input rate exceeds processing capacity
896
```scala
897
// Enable backpressure (Spark 1.5+)
898
conf.set("spark.streaming.backpressure.enabled", "true")
899
conf.set("spark.streaming.backpressure.initialRate", "1000")
900
901
// Manual rate limiting
902
conf.set("spark.streaming.kafka.maxRatePerPartition", "1000")
903
```
904
905
### Network and Connectivity Errors
906
907
**Socket Connection Failures**:
908
```scala
909
// Implement retry logic for socket connections
910
def createReliableSocketStream(hostname: String, port: Int, maxRetries: Int = 3): DStream[String] = {
911
var attempts = 0
912
var stream: DStream[String] = null
913
914
while (attempts < maxRetries && stream == null) {
915
try {
916
stream = ssc.socketTextStream(hostname, port)
917
println(s"Connected to $hostname:$port")
918
} catch {
919
case e: ConnectException =>
920
attempts += 1
921
println(s"Connection attempt $attempts failed: ${e.getMessage}")
922
if (attempts < maxRetries) {
923
Thread.sleep(5000) // Wait 5 seconds before retry
924
}
925
}
926
}
927
928
if (stream == null) {
929
throw new RuntimeException(s"Failed to connect after $maxRetries attempts")
930
}
931
932
stream
933
}
934
```
935
936
**Kafka Connection Issues**:
937
```scala
938
// Handle Kafka metadata refresh failures
939
val kafkaParams = Map[String, String](
940
"metadata.broker.list" -> "broker1:9092,broker2:9092",
941
"auto.offset.reset" -> "smallest",
942
"refresh.leader.backoff.ms" -> "1000",
943
"socket.timeout.ms" -> "30000",
944
"fetch.message.max.bytes" -> "1048576"
945
)
946
947
try {
948
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
949
ssc, kafkaParams, topics
950
)
951
} catch {
952
case e: TimeoutException =>
953
println("Kafka connection timeout - check broker availability")
954
case e: Exception =>
955
println(s"Kafka stream creation failed: ${e.getMessage}")
956
}
957
```
958
959
### Processing Errors and Recovery
960
961
**Exception Handling in Transformations**:
962
```scala
963
val robustStream = stream.map { record =>
964
try {
965
processRecord(record)
966
} catch {
967
case e: NumberFormatException =>
968
println(s"Invalid number format in record: $record")
969
null // or default value
970
case e: Exception =>
971
println(s"Processing error for record $record: ${e.getMessage}")
972
null
973
}
974
}.filter(_ != null) // Remove failed records
975
```
976
977
**Dead Letter Queue Pattern**:
978
```scala
979
val (successStream, errorStream) = stream.transform { rdd =>
980
val processed = rdd.map { record =>
981
try {
982
(Some(processRecord(record)), None)
983
} catch {
984
case e: Exception =>
985
(None, Some((record, e.getMessage)))
986
}
987
}.cache() // Cache to avoid recomputation
988
989
val successes = processed.filter(_._1.isDefined).map(_._1.get)
990
val errors = processed.filter(_._2.isDefined).map(_._2.get)
991
992
// Save errors to dead letter queue
993
errors.foreachPartition { partition =>
994
partition.foreach { case (record, error) =>
995
saveToDeadLetterQueue(record, error)
996
}
997
}
998
999
successes
1000
}
1001
```
### Best Practices for Error Handling

1. **Enable Checkpointing**: Always use checkpointing for production applications
2. **Monitor Batch Processing Times**: Ensure processing time < batch interval
3. **Implement Circuit Breakers**: Fail fast when external services are down
4. **Use Write-Ahead Logs**: Enable the WAL for reliable receivers
5. **Handle Partial Failures**: Process what you can, log what fails
6. **Set Up Monitoring**: Use the Spark UI and external monitoring tools

```scala
import scala.reflect.ClassTag

// Comprehensive error handling pattern
// (parseRecord and CustomStreamingListener are user-defined)
def createRobustStreamingApp(): StreamingContext = {
  // Enable fault tolerance features on the SparkConf before creating the context
  conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
  conf.set("spark.streaming.backpressure.enabled", "true")

  val ssc = new StreamingContext(conf, Seconds(1))
  ssc.checkpoint("hdfs://checkpoints")

  val stream = ssc.socketTextStream("localhost", 9999)
    .map(parseRecord)
    .filter(_.isDefined)
    .map(_.get)
    .handleErrors()
    .cache()

  // Multiple outputs for different purposes
  stream.print()
  stream.saveAsTextFiles("hdfs://output/data")

  // Add monitoring
  ssc.addStreamingListener(new CustomStreamingListener())

  ssc
}

// Must be in scope (e.g. defined in an enclosing object) wherever handleErrors() is called
implicit class RobustDStream[T: ClassTag](dstream: DStream[T]) {
  def handleErrors(): DStream[T] = {
    dstream.transform { rdd =>
      rdd.filter(_ != null) // drop records that failed upstream parsing
    }
  }
}
```

This guide covers the core Spark Streaming API along with error handling patterns for building scalable, fault-tolerant stream processing applications.