Spec Registry

maven-apache-spark

Description: Lightning-fast unified analytics engine for large-scale data processing with high-level APIs in Scala, Java, Python, and R
Author: tessl

How to use

npx @tessl/cli registry install tessl/maven-apache-spark@1.0.0

docs/streaming.md

# Spark Streaming

Spark Streaming enables scalable, high-throughput, fault-tolerant stream processing of live data streams. It ingests data from sources such as Kafka, Flume, Twitter, and TCP sockets, and processes it with high-level operations such as map, reduce, join, and window.

## Core Concepts

Spark Streaming discretizes live data streams into micro-batches, exposed as **DStreams** (Discretized Streams). Each batch is processed as an RDD, so Spark's batch processing APIs can be applied to streaming data.
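
As a minimal sketch of this model (the local master, port, and batch interval below are illustrative choices, not requirements), each one-second micro-batch arrives as an RDD that can be handled with the regular RDD API:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("DStreamBasics").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1))

// Each 1-second micro-batch of socket lines is exposed as an RDD
val lines = ssc.socketTextStream("localhost", 9999)
lines.foreachRDD { rdd =>
  println(s"Received ${rdd.count()} lines in this batch")
}

ssc.start()
ssc.awaitTermination()
```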

## StreamingContext

The main entry point for Spark Streaming functionality.

### StreamingContext Class

```scala { .api }
class StreamingContext(sparkContext: SparkContext, batchDuration: Duration) extends Logging {
  // Alternative constructors
  def this(conf: SparkConf, batchDuration: Duration)
  def this(master: String, appName: String, batchDuration: Duration, sparkHome: String = null, jars: Seq[String] = Nil, environment: Map[String, String] = Map())
  def this(path: String, hadoopConf: Configuration = new Configuration())
}
```

### Creating StreamingContext

```scala
import org.apache.spark.streaming.{StreamingContext, Seconds, Minutes}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.hadoop.conf.Configuration

// From SparkConf
val conf = new SparkConf().setAppName("StreamingApp").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(2))

// From an existing SparkContext
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(1))

// With all parameters
val ssc = new StreamingContext(
  master = "local[*]",
  appName = "My Streaming App",
  batchDuration = Seconds(1),
  sparkHome = "/path/to/spark",
  jars = Seq("app.jar"),
  environment = Map("ENV_VAR" -> "value")
)

// From checkpoint (recovery)
val ssc = new StreamingContext("hdfs://path/to/checkpoint", new Configuration())
```

### Duration Helper Objects

```scala { .api }
import org.apache.spark.streaming.{Milliseconds, Seconds, Minutes}

Milliseconds(500) // 500 milliseconds
Seconds(1)        // 1 second
Seconds(30)       // 30 seconds
Minutes(1)        // 1 minute
```

## Input DStream Creation

### Socket Streams

**socketTextStream**: Create DStream from TCP socket
```scala { .api }
def socketTextStream(hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[String]
```

```scala
// Connect to TCP socket for text data
val lines = ssc.socketTextStream("localhost", 9999)

// Process the stream
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
wordCounts.print()
```

**socketStream**: Custom socket stream with converter
```scala { .api }
def socketStream[T: ClassTag](hostname: String, port: Int, converter: (InputStream) => Iterator[T], storageLevel: StorageLevel): ReceiverInputDStream[T]
```
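
socketStream needs a converter from the raw socket InputStream to an iterator of records. A minimal sketch (the line-based converter below is illustrative, not part of the API):

```scala
import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets
import org.apache.spark.storage.StorageLevel

// Illustrative converter: decode the socket bytes as UTF-8 lines
def lineConverter(in: InputStream): Iterator[String] = {
  val reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))
  Iterator.continually(reader.readLine()).takeWhile(_ != null)
}

val customLines = ssc.socketStream[String](
  "localhost", 9999, lineConverter, StorageLevel.MEMORY_AND_DISK_SER_2)
```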

**rawSocketStream**: Raw socket stream returning byte arrays
```scala { .api }
def rawSocketStream[T: ClassTag](hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[T]
```

### File Streams

**textFileStream**: Monitor directory for new text files
```scala { .api }
def textFileStream(directory: String): DStream[String]
```

```scala
// Monitor directory for new files
val fileStream = ssc.textFileStream("hdfs://path/to/directory")

// Process new files as they arrive
val processed = fileStream
  .filter(_.nonEmpty)
  .map(_.toUpperCase)

processed.print()
```

**fileStream**: Generic file stream with InputFormat
```scala { .api }
def fileStream[K, V, F <: NewInputFormat[K, V]: ClassTag](directory: String, filter: Path => Boolean = _ => true, newFilesOnly: Boolean = true): InputDStream[(K, V)]
```

```scala
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}

val hadoopStream = ssc.fileStream[LongWritable, Text, TextInputFormat]("hdfs://input/dir")
val textStream = hadoopStream.map(_._2.toString)
```

### Queue Streams (for testing)

**queueStream**: Create stream from queue of RDDs
```scala { .api }
def queueStream[T: ClassTag](queue: Queue[RDD[T]], oneAtATime: Boolean = true, defaultRDD: RDD[T] = null): InputDStream[T]
```

```scala
import scala.collection.mutable.Queue
import org.apache.spark.rdd.RDD

val rddQueue = Queue[RDD[Int]]()

// Create stream from queue
val queueStream = ssc.queueStream(rddQueue)

// Add RDDs to queue (simulate data arrival)
for (i <- 1 to 10) {
  rddQueue += ssc.sparkContext.parallelize(1 to 100)
}
```

### Custom Receiver Streams

**receiverStream**: Create stream from custom receiver
```scala { .api }
def receiverStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T]
```

```scala
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.storage.StorageLevel

// Custom receiver example
class CustomReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
  def onStart() {
    // Start receiving data on a separate thread
    new Thread("Custom Receiver") {
      override def run() { receive() }
    }.start()
  }

  def onStop() {
    // Stop receiving data
  }

  private def receive() {
    while (!isStopped()) {
      // Simulate data reception; generateData() stands in for a real data source
      val data = generateData()
      store(data)
      Thread.sleep(100)
    }
  }
}

val customStream = ssc.receiverStream(new CustomReceiver())
```

**actorStream**: Create stream from Akka Actor
```scala { .api }
def actorStream[T: ClassTag](props: Props, name: String, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[T]
```

## DStream Transformations

DStreams support transformations similar to RDDs, applied to each batch.

### DStream Class

```scala { .api }
abstract class DStream[T: ClassTag] extends Serializable with Logging {
  def ssc: StreamingContext
  def slideDuration: Duration
  def dependencies: List[DStream[_]]
  def compute(time: Time): Option[RDD[T]]
}
```

### Basic Transformations

**map**: Apply function to each element in each batch
```scala { .api }
def map[U: ClassTag](mapFunc: T => U): DStream[U]
```

```scala
val numbers = ssc.socketTextStream("localhost", 9999)
val doubled = numbers.map(_.toInt * 2)
```

**flatMap**: Apply function and flatten results
```scala { .api }
def flatMap[U: ClassTag](flatMapFunc: T => Traversable[U]): DStream[U]
```

```scala
val lines = ssc.textFileStream("input/")
val words = lines.flatMap(_.split(" "))
```

**filter**: Keep elements matching predicate
```scala { .api }
def filter(filterFunc: T => Boolean): DStream[T]
```

```scala
val validLines = lines.filter(_.nonEmpty)
val longWords = words.filter(_.length > 5)
```

**glom**: Coalesce elements within each partition into arrays
```scala { .api }
def glom(): DStream[Array[T]]
```
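
For example, glom can be used to compute a per-partition aggregate in every batch (a sketch, assuming a socket source of integers):

```scala
val numbers = ssc.socketTextStream("localhost", 9999).map(_.toInt)

// One array per partition per batch; take the max of each non-empty partition
val partitionMaxes = numbers.glom()
  .filter(_.nonEmpty)
  .map(_.max)
partitionMaxes.print()
```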

### Stream Operations

**union**: Union with another DStream
```scala { .api }
def union(that: DStream[T]): DStream[T]
```

```scala
val stream1 = ssc.socketTextStream("host1", 9999)
val stream2 = ssc.socketTextStream("host2", 9999)
val combined = stream1.union(stream2)
```

### Aggregation Transformations

**count**: Count elements in each batch
```scala { .api }
def count(): DStream[Long]
```

**countByValue**: Count occurrences of each value
```scala { .api }
def countByValue()(implicit ord: Ordering[T] = null): DStream[(T, Long)]
```
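
For instance, a per-batch word frequency (sketch):

```scala
val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))
val wordFrequencies = words.countByValue() // (word, count) pairs for each batch
wordFrequencies.print()
```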

**reduce**: Reduce elements in each batch
```scala { .api }
def reduce(reduceFunc: (T, T) => T): DStream[T]
```

```scala
val numbers = ssc.socketTextStream("localhost", 9999).map(_.toInt)

val counts = numbers.count()             // Count per batch
val sums = numbers.reduce(_ + _)         // Sum per batch
val maxValues = numbers.reduce(math.max) // Max per batch
```

### Advanced Transformations

**transform**: Apply arbitrary RDD-to-RDD function
```scala { .api }
def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U]
def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U]
```

```scala
val enhanced = stream.transform { (rdd, time) =>
  // Access to both the RDD and the batch time
  val timeString = time.toString
  rdd.map(data => s"$timeString: $data")
    .filter(_.contains("important"))
}
```

**transformWith**: Transform with another DStream
```scala { .api }
def transformWith[U: ClassTag, V: ClassTag](other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V]): DStream[V]
def transformWith[U: ClassTag, V: ClassTag](other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V]): DStream[V]
```

```scala
import org.apache.spark.rdd.RDD

val stream1 = ssc.socketTextStream("localhost", 9999)
val stream2 = ssc.socketTextStream("localhost", 8888)

// extractKey is a user-supplied function that derives a join key from a line
val joined = stream1.transformWith(stream2, (rdd1: RDD[String], rdd2: RDD[String]) => {
  // Join RDDs from the two streams
  val pairs1 = rdd1.map(line => (extractKey(line), line))
  val pairs2 = rdd2.map(line => (extractKey(line), line))
  pairs1.join(pairs2).map { case (key, (v1, v2)) => s"$v1 | $v2" }
})
```

## Window Operations

Window operations allow you to apply transformations over a sliding window of data.

### Basic Windowing

**window**: Return windowed DStream
```scala { .api }
def window(windowDuration: Duration): DStream[T]
def window(windowDuration: Duration, slideDuration: Duration): DStream[T]
```

```scala
val lines = ssc.socketTextStream("localhost", 9999)

// 30-second window, sliding every 10 seconds
val windowedLines = lines.window(Seconds(30), Seconds(10))
val windowCounts = windowedLines.count()
```

### Windowed Reductions

**reduceByWindow**: Reduce over a window
```scala { .api }
def reduceByWindow(reduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration): DStream[T]
def reduceByWindow(reduceFunc: (T, T) => T, invReduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration): DStream[T]
```

```scala
val numbers = ssc.socketTextStream("localhost", 9999).map(_.toInt)

// Sum over window
val windowSums = numbers.reduceByWindow(
  _ + _,       // Add new values
  Seconds(60), // Window duration
  Seconds(20)  // Slide duration
)

// Efficient windowed reduction with inverse function
val efficientSums = numbers.reduceByWindow(
  _ + _, // Add function
  _ - _, // Inverse (subtract) function
  Seconds(60),
  Seconds(20)
)
```

**countByWindow**: Count elements over window
```scala { .api }
def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long]
```
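
A sketch counting all events seen in the last minute, refreshed every 10 seconds (a checkpoint directory is assumed to be set, since the windowed count is typically maintained incrementally with an inverse reduce):

```scala
ssc.checkpoint("hdfs://path/to/checkpoints") // assumed: needed for incremental window counting

val events = ssc.socketTextStream("localhost", 9999)
val eventCounts = events.countByWindow(Seconds(60), Seconds(10))
eventCounts.print()
```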

**countByValueAndWindow**: Count values over window
```scala { .api }
def countByValueAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int = ssc.sc.defaultParallelism)(implicit ord: Ordering[T] = null): DStream[(T, Long)]
```

```scala
val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))

// Count words in a 2-minute window, sliding every 30 seconds
val wordCounts = words.countByValueAndWindow(Minutes(2), Seconds(30))
wordCounts.print()
```

## PairDStreamFunctions (Key-Value Operations)

Operations available on DStreams of (key, value) pairs through implicit conversion.

### Key-Value Transformations

**keys and values**:
```scala { .api }
def keys: DStream[K]
def values: DStream[V]
```

**mapValues**: Transform values while preserving keys
```scala { .api }
def mapValues[U: ClassTag](mapValuesFunc: V => U): DStream[(K, U)]
```

**flatMapValues**: FlatMap values while preserving keys
```scala { .api }
def flatMapValues[U: ClassTag](flatMapValuesFunc: V => TraversableOnce[U]): DStream[(K, U)]
```

```scala
import org.apache.spark.streaming.StreamingContext._

val pairs = ssc.socketTextStream("localhost", 9999)
  .map { line =>
    val parts = line.split(",")
    (parts(0), parts(1).toInt)
  }

val doubled = pairs.mapValues(_ * 2)
val allKeys = pairs.keys
val allValues = pairs.values
```
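
Building on the `pairs` stream above, flatMapValues expands each value into zero or more values while keeping the key (sketch):

```scala
// ("a", 3) becomes ("a", 1), ("a", 2), ("a", 3)
val expanded = pairs.flatMapValues(n => 1 to n)
```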

### Aggregation by Key

**groupByKey**: Group values by key in each batch
```scala { .api }
def groupByKey(): DStream[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): DStream[(K, Iterable[V])]
def groupByKey(numPartitions: Int): DStream[(K, Iterable[V])]
```

**reduceByKey**: Reduce values by key in each batch
```scala { .api }
def reduceByKey(func: (V, V) => V): DStream[(K, V)]
def reduceByKey(func: (V, V) => V, partitioner: Partitioner): DStream[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): DStream[(K, V)]
```

**combineByKey**: Generic combine by key
```scala { .api }
def combineByKey[C: ClassTag](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiner: (C, C) => C, partitioner: Partitioner): DStream[(K, C)]
```
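
combineByKey is the most general per-batch aggregation. A sketch that computes the average value per key in each batch (the comma-separated input format and the partition count are assumptions for illustration):

```scala
import org.apache.spark.HashPartitioner

// Input lines such as "sensor-1,23.5" become (key, reading) pairs
val readings = ssc.socketTextStream("localhost", 9999)
  .map { line =>
    val parts = line.split(",")
    (parts(0), parts(1).toDouble)
  }

val averages = readings.combineByKey[(Double, Int)](
  (v: Double) => (v, 1),                                              // createCombiner
  (acc: (Double, Int), v: Double) => (acc._1 + v, acc._2 + 1),        // mergeValue
  (a: (Double, Int), b: (Double, Int)) => (a._1 + b._1, a._2 + b._2), // mergeCombiner
  new HashPartitioner(4)
).mapValues { case (sum, count) => sum / count }
```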

```scala
val wordStream = ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split(" "))
  .map(word => (word, 1))

// Count words in each batch
val wordCounts = wordStream.reduceByKey(_ + _)

// Group all occurrences
val wordGroups = wordStream.groupByKey()
```

### Windowed Key-Value Operations

**groupByKeyAndWindow**: Group by key over window
```scala { .api }
def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Iterable[V])]
def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Iterable[V])]
def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, partitioner: Partitioner): DStream[(K, Iterable[V])]
```
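
For example, collecting every occurrence of a key over the last minute, refreshed every 10 seconds (reusing the `wordStream` pairs defined above):

```scala
// All per-batch (word, 1) pairs from the last 60 seconds, grouped by word
val recentOccurrences = wordStream.groupByKeyAndWindow(Seconds(60), Seconds(10))
recentOccurrences.mapValues(_.size).print()
```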

**reduceByKeyAndWindow**: Reduce by key over window
```scala { .api }
def reduceByKeyAndWindow(func: (V, V) => V, windowDuration: Duration): DStream[(K, V)]
def reduceByKeyAndWindow(func: (V, V) => V, windowDuration: Duration, slideDuration: Duration): DStream[(K, V)]
def reduceByKeyAndWindow(func: (V, V) => V, invFunc: (V, V) => V, windowDuration: Duration, slideDuration: Duration): DStream[(K, V)]
```

**countByKeyAndWindow**: Count by key over window
```scala { .api }
def countByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Long)]
```

```scala
val wordPairs = ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split(" "))
  .map((_, 1))

// Windowed word count (last 5 minutes, every 30 seconds)
val windowedWordCounts = wordPairs.reduceByKeyAndWindow(
  _ + _,      // Reduce function
  Minutes(5), // Window duration
  Seconds(30) // Slide duration
)

// Efficient version with inverse function (requires checkpointing to be enabled)
val efficientWordCounts = wordPairs.reduceByKeyAndWindow(
  _ + _, // Add function
  _ - _, // Subtract function (inverse)
  Minutes(5),
  Seconds(30)
)
```

### Join Operations

**join**: Join with another DStream
```scala { .api }
def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))]
def join[W: ClassTag](other: DStream[(K, W)], partitioner: Partitioner): DStream[(K, (V, W))]
```

**leftOuterJoin**, **rightOuterJoin**, **fullOuterJoin**:
```scala { .api }
def leftOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))]
def rightOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], W))]
def fullOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], Option[W]))]
```

**cogroup**: Group together with another DStream
```scala { .api }
def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))]
```

```scala
val stream1 = ssc.socketTextStream("localhost", 9999)
  .map(line => (line.split(",")(0), line.split(",")(1)))

val stream2 = ssc.socketTextStream("localhost", 8888)
  .map(line => (line.split(",")(0), line.split(",")(1)))

// Inner join
val joined = stream1.join(stream2)

// Left outer join
val leftJoined = stream1.leftOuterJoin(stream2)

// Cogroup
val cogrouped = stream1.cogroup(stream2)
```

## Stateful Operations

### updateStateByKey

Maintain state across batches for each key:

```scala { .api }
def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)]
def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S], partitioner: Partitioner): DStream[(K, S)]
def updateStateByKey[S: ClassTag](updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)], partitioner: Partitioner, rememberPartitioner: Boolean): DStream[(K, S)]
```

```scala
// Stateful operations require a checkpoint directory
ssc.checkpoint("hdfs://path/to/checkpoints")

// Running count of words
val wordPairs = ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split(" "))
  .map((_, 1))

val runningCounts = wordPairs.updateStateByKey { (values: Seq[Int], state: Option[Int]) =>
  val newCount = values.sum + state.getOrElse(0)
  Some(newCount)
}

// Advanced state management
case class WordStats(count: Int, lastSeen: Long)

val wordStats = wordPairs.updateStateByKey { (values: Seq[Int], state: Option[WordStats]) =>
  val currentTime = System.currentTimeMillis()
  val currentCount = values.sum

  state match {
    case Some(stats) => Some(WordStats(stats.count + currentCount, currentTime))
    case None        => Some(WordStats(currentCount, currentTime))
  }
}
```

## DStream Actions

Actions trigger the execution of DStream transformations.

### Output Operations

**print**: Print first 10 elements of each batch
```scala { .api }
def print(): Unit
def print(num: Int): Unit
```

**foreachRDD**: Apply function to each RDD
```scala { .api }
def foreachRDD(foreachFunc: RDD[T] => Unit): Unit
def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit
```

```scala
val processed = stream.map(process)

// Print results
processed.print()
processed.print(20) // Print first 20 elements

// Custom processing of each batch
processed.foreachRDD { rdd =>
  val count = rdd.count()
  if (count > 0) {
    println(s"Batch size: $count")
    rdd.take(10).foreach(println)
  }
}

// With time information
processed.foreachRDD { (rdd, time) =>
  println(s"Batch time: $time, Count: ${rdd.count()}")
}
```

### Save Operations

**saveAsTextFiles**: Save each batch as text files
```scala { .api }
def saveAsTextFiles(prefix: String, suffix: String = ""): Unit
```

**saveAsObjectFiles**: Save each batch as object files
```scala { .api }
def saveAsObjectFiles(prefix: String, suffix: String = ""): Unit
```

```scala
val processed = stream.map(_.toUpperCase)

// Save each batch - creates files like output-1414060920000, output-1414060921000, etc.
processed.saveAsTextFiles("hdfs://path/to/output", ".txt")

// Save as object files
processed.saveAsObjectFiles("hdfs://path/to/objects")
```

## StreamingContext Control

### Starting and Stopping

**start**: Start the streaming computation
```scala { .api }
def start(): Unit
```

**stop**: Stop the streaming context
```scala { .api }
def stop(): Unit
def stop(stopSparkContext: Boolean): Unit
def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit
```
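
For example, to shut down after in-flight batches finish while keeping the SparkContext alive for other jobs (sketch):

```scala
// Finish processing received data, then stop streaming but keep the SparkContext
ssc.stop(stopSparkContext = false, stopGracefully = true)
```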

**awaitTermination**: Wait for termination
```scala { .api }
def awaitTermination(): Unit
def awaitTermination(timeout: Long): Boolean
```

```scala
val ssc = new StreamingContext(conf, Seconds(1))

// Define streaming computation
val stream = ssc.socketTextStream("localhost", 9999)
stream.print()

// Start the computation
ssc.start()

// Wait for termination
ssc.awaitTermination()

// Or wait with timeout
val terminated = ssc.awaitTermination(60000) // 60 seconds
if (!terminated) {
  println("Streaming did not terminate within 60 seconds")
  ssc.stop()
}
```

### Checkpointing

**checkpoint**: Set checkpoint directory
```scala { .api }
def checkpoint(directory: String): Unit
```

**getOrCreate**: Get existing context from checkpoint or create new one
```scala { .api }
def getOrCreate(checkpointPath: String, creatingFunc: () => StreamingContext, hadoopConf: Configuration = new Configuration()): StreamingContext
```

```scala
// Enable checkpointing
ssc.checkpoint("hdfs://path/to/checkpoints")

// Fault-tolerant pattern
def createStreamingContext(): StreamingContext = {
  val ssc = new StreamingContext(conf, Seconds(1))

  // Define streaming computation
  val lines = ssc.socketTextStream("localhost", 9999)
  val words = lines.flatMap(_.split(" "))
  val wordCounts = words.map((_, 1)).reduceByKey(_ + _)
  wordCounts.print()

  ssc.checkpoint("hdfs://checkpoints")
  ssc
}

val ssc = StreamingContext.getOrCreate("hdfs://checkpoints", createStreamingContext _)
```

### Context Properties

**remember**: Set remember duration
```scala { .api }
def remember(duration: Duration): Unit
```

**sparkContext**: Access underlying SparkContext
```scala { .api }
def sparkContext: SparkContext
```

```scala
// Set how long to remember RDDs
ssc.remember(Minutes(10))

// Access SparkContext
val sc = ssc.sparkContext
val broadcast = sc.broadcast(lookupTable)
```

## Persistence and Caching

DStreams can be persisted in memory for faster access:

```scala { .api }
def persist(storageLevel: StorageLevel): DStream[T]
def persist(): DStream[T] // Uses MEMORY_ONLY_SER
def cache(): DStream[T]   // Uses MEMORY_ONLY_SER
```

```scala
import org.apache.spark.storage.StorageLevel

val expensiveStream = ssc.socketTextStream("localhost", 9999)
  .map(expensiveTransformation)
  .cache() // Cache for reuse

// Multiple operations on the cached stream
val count = expensiveStream.count()
// DStream has no sample method; sample each batch through transform
val sample = expensiveStream.transform(_.sample(withReplacement = false, 0.1))
```

## Performance and Best Practices

### Batch Interval Selection

```scala
// For low latency (100ms - 1s)
val ssc = new StreamingContext(conf, Milliseconds(500))

// For high throughput (1s - 10s)
val ssc = new StreamingContext(conf, Seconds(5))

// For batch-processing-style workloads (minutes)
val ssc = new StreamingContext(conf, Minutes(2))
```

### Parallelism and Partitioning

```scala
// Increase parallelism with multiple receivers
val numReceivers = 4
val streams = (1 to numReceivers).map { i =>
  ssc.socketTextStream(s"host$i", 9999)
}
val unifiedStream = ssc.union(streams)

// Repartition for better load balancing
val repartitioned = stream.transform(_.repartition(10))
```

### Memory Management

```scala
// Set appropriate storage levels
val persistedStream = stream
  .map(expensiveOperation)
  .persist(StorageLevel.MEMORY_AND_DISK_SER)

// Enable checkpointing for fault tolerance
ssc.checkpoint("hdfs://checkpoints")

// Use efficient serialization
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
```

## Error Handling and Fault Tolerance

Spark Streaming applications must handle various failure scenarios to ensure reliable operation.

### Common Streaming Errors

**StreamingContextException**: Invalid streaming context operations
```scala
try {
  ssc.start()
  ssc.start() // Error: context already started
} catch {
  case e: IllegalStateException =>
    println("Streaming context already started")
}
```

**Receiver Failures**: Input stream receivers failing
```scala
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerReceiverError}

// Monitor receiver status
ssc.addStreamingListener(new StreamingListener {
  override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {
    println(s"Receiver error: ${receiverError.receiverInfo.name}")
    // Implement recovery logic
  }
})
```

**Batch Processing Delays**: When processing takes longer than the batch interval
```scala
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Monitor batch processing times; use the batch interval the context was created with
val batchIntervalMs = Seconds(1).milliseconds

ssc.addStreamingListener(new StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val processingTime = batchCompleted.batchInfo.processingDelay.getOrElse(0L)

    if (processingTime > batchIntervalMs) {
      println(s"Warning: Processing time ($processingTime ms) > batch interval ($batchIntervalMs ms)")
    }
  }
})
```

### Checkpoint Corruption

**Checkpoint Recovery Failures**: When checkpoint data is corrupted
```scala
def createStreamingContext(): StreamingContext = {
  val ssc = new StreamingContext(conf, Seconds(1))
  // Define streaming logic
  ssc.checkpoint("hdfs://checkpoints")
  ssc
}

try {
  val ssc = StreamingContext.getOrCreate("hdfs://checkpoints", createStreamingContext _)
} catch {
  case e: Exception =>
    println(s"Checkpoint recovery failed: ${e.getMessage}")
    // Fall back to creating a new context
    val ssc = createStreamingContext()
}
```

**Checkpoint Directory Management**:
```scala
// Clean up old checkpoints periodically
import org.apache.hadoop.fs.{FileSystem, Path}

def cleanupCheckpoints(checkpointDir: String, retentionHours: Int): Unit = {
  val fs = FileSystem.get(ssc.sparkContext.hadoopConfiguration)
  val checkpointPath = new Path(checkpointDir)

  try {
    val cutoffTime = System.currentTimeMillis() - (retentionHours * 60L * 60 * 1000)
    val files = fs.listStatus(checkpointPath)

    files.foreach { fileStatus =>
      if (fileStatus.getModificationTime < cutoffTime) {
        fs.delete(fileStatus.getPath, true)
        println(s"Deleted old checkpoint: ${fileStatus.getPath}")
      }
    }
  } catch {
    case e: Exception => println(s"Checkpoint cleanup failed: ${e.getMessage}")
  }
}
```

### Memory and Resource Errors

**OutOfMemoryError in Streaming**:
```scala
// Monitor driver memory usage and adjust batch sizes
// (the transform closure runs on the driver when each batch is scheduled)
val memoryMonitoringStream = stream.transform { rdd =>
  val memoryUsed = Runtime.getRuntime.totalMemory - Runtime.getRuntime.freeMemory
  val memoryMax = Runtime.getRuntime.maxMemory
  val memoryPercent = (memoryUsed.toDouble / memoryMax) * 100

  if (memoryPercent > 80) {
    println(s"Warning: Memory usage at ${memoryPercent.toInt}%")
    // Reduce batch size or increase memory
  }

  rdd
}
```

**Backpressure Issues**: When the input rate exceeds processing capacity
```scala
// Enable backpressure (Spark 1.5+)
conf.set("spark.streaming.backpressure.enabled", "true")
conf.set("spark.streaming.backpressure.initialRate", "1000")

// Manual rate limiting
conf.set("spark.streaming.kafka.maxRatePerPartition", "1000")
```

### Network and Connectivity Errors

**Socket Connection Failures**:
```scala
import java.net.ConnectException

// Implement retry logic when setting up socket connections
def createReliableSocketStream(hostname: String, port: Int, maxRetries: Int = 3): DStream[String] = {
  var attempts = 0
  var stream: DStream[String] = null

  while (attempts < maxRetries && stream == null) {
    try {
      stream = ssc.socketTextStream(hostname, port)
      println(s"Connected to $hostname:$port")
    } catch {
      case e: ConnectException =>
        attempts += 1
        println(s"Connection attempt $attempts failed: ${e.getMessage}")
        if (attempts < maxRetries) {
          Thread.sleep(5000) // Wait 5 seconds before retrying
        }
    }
  }

  if (stream == null) {
    throw new RuntimeException(s"Failed to connect after $maxRetries attempts")
  }

  stream
}
```

**Kafka Connection Issues**:
```scala
import java.util.concurrent.TimeoutException
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Handle Kafka metadata refresh failures
val kafkaParams = Map[String, String](
  "metadata.broker.list" -> "broker1:9092,broker2:9092",
  "auto.offset.reset" -> "smallest",
  "refresh.leader.backoff.ms" -> "1000",
  "socket.timeout.ms" -> "30000",
  "fetch.message.max.bytes" -> "1048576"
)

val topics = Set("events") // example topic set

try {
  val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topics
  )
} catch {
  case e: TimeoutException =>
    println("Kafka connection timeout - check broker availability")
  case e: Exception =>
    println(s"Kafka stream creation failed: ${e.getMessage}")
}
```

### Processing Errors and Recovery

**Exception Handling in Transformations**:
```scala
val robustStream = stream.map { record =>
  try {
    processRecord(record)
  } catch {
    case e: NumberFormatException =>
      println(s"Invalid number format in record: $record")
      null // or a default value
    case e: Exception =>
      println(s"Processing error for record $record: ${e.getMessage}")
      null
  }
}.filter(_ != null) // Remove failed records
```

**Dead Letter Queue Pattern**:
```scala
// transform returns a single DStream, so emit only the successes downstream
// and write failures to a dead letter queue as a side effect
val successStream = stream.transform { rdd =>
  val processed = rdd.map { record =>
    try {
      (Some(processRecord(record)), None)
    } catch {
      case e: Exception =>
        (None, Some((record, e.getMessage)))
    }
  }.cache() // Cache to avoid recomputation

  val successes = processed.filter(_._1.isDefined).map(_._1.get)
  val errors = processed.filter(_._2.isDefined).map(_._2.get)

  // Save errors to the dead letter queue (saveToDeadLetterQueue is user-supplied)
  errors.foreachPartition { partition =>
    partition.foreach { case (record, error) =>
      saveToDeadLetterQueue(record, error)
    }
  }

  successes
}
```

### Best Practices for Error Handling

1. **Enable Checkpointing**: Always use checkpointing for production applications
2. **Monitor Batch Processing Times**: Ensure processing time < batch interval
3. **Implement Circuit Breakers**: Fail fast when external services are down
4. **Use Write-Ahead Logs**: Enable WAL for reliable receivers
5. **Handle Partial Failures**: Process what you can, log what fails
6. **Set Up Monitoring**: Use the Spark UI and external monitoring tools

```scala
// Comprehensive error handling pattern
def createRobustStreamingApp(): StreamingContext = {
  // Enable fault tolerance features before creating the context
  conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
  conf.set("spark.streaming.backpressure.enabled", "true")

  val ssc = new StreamingContext(conf, Seconds(1))
  ssc.checkpoint("hdfs://checkpoints")

  // parseRecord and CustomStreamingListener are user-supplied
  val stream = ssc.socketTextStream("localhost", 9999)
    .map(parseRecord)
    .filter(_.isDefined)
    .map(_.get)
    .handleErrors()
    .cache()

  // Multiple outputs for different purposes
  stream.print()
  stream.saveAsTextFiles("hdfs://output/data")

  // Add monitoring
  ssc.addStreamingListener(new CustomStreamingListener())

  ssc
}

implicit class RobustDStream[T](dstream: DStream[T]) {
  def handleErrors(): DStream[T] = {
    // Drop records that failed upstream parsing or processing
    dstream.filter(_ != null)
  }
}
```

This guide covers the core Spark Streaming API along with error handling patterns for building scalable, fault-tolerant stream processing applications.