# Stream Processing

Spark Streaming provides scalable, high-throughput, fault-tolerant stream processing of live data streams. It supports both the legacy DStream API and the newer Structured Streaming approach.

## StreamingContext (Legacy DStream API)

The main entry point for Spark Streaming functionality using the DStream API.

```scala { .api }
class StreamingContext private[streaming] (
    _sc: SparkContext,
    _cp: Checkpoint,
    _batchDur: Duration) extends Logging {

  // Constructors (via companion object)
  // new StreamingContext(sparkContext: SparkContext, batchDuration: Duration)
  // new StreamingContext(conf: SparkConf, batchDuration: Duration)
  // new StreamingContext(path: String, hadoopConf: Configuration)

  // Input stream creation
  def socketTextStream(hostname: String, port: Int,
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[String]
  def socketStream[T: ClassTag](hostname: String, port: Int,
      converter: (InputStream) => Iterator[T],
      storageLevel: StorageLevel): ReceiverInputDStream[T]
  def rawSocketStream[T: ClassTag](hostname: String, port: Int,
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[T]

  def textFileStream(directory: String): DStream[String]
  def fileStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag](directory: String): InputDStream[(K, V)]
  def binaryRecordsStream(directory: String, recordLength: Int): DStream[Array[Byte]]

  def queueStream[T: ClassTag](queue: Queue[RDD[T]], oneAtATime: Boolean = true,
      defaultRDD: RDD[T] = null): InputDStream[T]
  def receiverStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T]
  // actorStream exists only in Spark 1.x; it was removed in Spark 2.0
  // (Akka-based receivers moved to Apache Bahir's streaming-akka)
  def actorStream[T: ClassTag](props: Props, name: String,
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[T]
  def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T]
  def transform[T: ClassTag](dstreams: Seq[DStream[_]],
      transformFunc: (Seq[RDD[_]], Time) => RDD[T]): DStream[T]

  // Execution control
  def start(): Unit
  def stop(stopSparkContext: Boolean = conf.getBoolean("spark.streaming.stopSparkContextByDefault", true),
      stopGracefully: Boolean = false): Unit
  def awaitTermination(): Unit
  def awaitTerminationOrTimeout(timeout: Long): Boolean

  // Configuration and state
  def remember(duration: Duration): Unit
  def checkpoint(directory: String): Unit
  def sparkContext: SparkContext
  def graph: DStreamGraph
  def getState(): StreamingContextState

  // Properties
  def batchDuration: Duration
}

object StreamingContext extends Logging {
  def getOrCreate(checkpointPath: String, creatingFunc: () => StreamingContext,
      hadoopConf: Configuration = SparkHadoopUtil.get.conf): StreamingContext
  def getActiveOrCreate(checkpointPath: String, creatingFunc: () => StreamingContext,
      hadoopConf: Configuration = SparkHadoopUtil.get.conf): StreamingContext
}
```

### Usage Examples

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Create a StreamingContext with a 1-second batch interval
val conf = new SparkConf().setAppName("StreamingApp").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1))

// Set the checkpoint directory (required for stateful operations and driver recovery)
ssc.checkpoint("hdfs:///checkpoints/streaming-app")

// Create input streams
val lines = ssc.socketTextStream("localhost", 9999)
val fileLines = ssc.textFileStream("hdfs:///input")

// Start processing and block until the context is stopped
ssc.start()
ssc.awaitTermination()
```

## DStream (Discretized Stream)

The basic abstraction in Spark Streaming, representing a continuous stream of data as a series of RDDs. The key/value operations listed below are provided by `PairDStreamFunctions` through an implicit conversion and are available only when the element type is a pair `(K, V)`.

```scala { .api }
abstract class DStream[T: ClassTag](@transient private var ssc: StreamingContext) extends Serializable with Logging {

  // Basic transformations
  def map[U: ClassTag](mapFunc: T => U): DStream[U]
  def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U]
  def filter(filterFunc: T => Boolean): DStream[T]
  def mapPartitions[U: ClassTag](mapPartFunc: Iterator[T] => Iterator[U],
      preservePartitioning: Boolean = false): DStream[U]
  // Note: DStream has no mapPartitionsWithIndex; use transform(_.mapPartitionsWithIndex(...))

  // Reduce operations
  def reduce(reduceFunc: (T, T) => T): DStream[T]
  def count(): DStream[Long]
  def countByValue()(implicit ord: Ordering[T] = null): DStream[(T, Long)]
  def countByValueAndWindow(windowDuration: Duration, slideDuration: Duration)
      (implicit ord: Ordering[T] = null): DStream[(T, Long)]

  // Pair DStream operations (via PairDStreamFunctions, when T = (K, V))
  def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)]
  def reduceByKey(reduceFunc: (V, V) => V, numPartitions: Int): DStream[(K, V)]
  def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)]
  def groupByKey(): DStream[(K, Iterable[V])]
  def groupByKey(numPartitions: Int): DStream[(K, Iterable[V])]
  def groupByKey(partitioner: Partitioner): DStream[(K, Iterable[V])]
  def combineByKey[C: ClassTag](createCombiner: V => C, mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): DStream[(K, C)]
  def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)]
  def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S],
      numPartitions: Int): DStream[(K, S)]
  def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S],
      partitioner: Partitioner,
      initialRDD: RDD[(K, S)]): DStream[(K, S)]

  // Join operations (when T = (K, V))
  def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))]
  def join[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (V, W))]
  def join[W: ClassTag](other: DStream[(K, W)], partitioner: Partitioner): DStream[(K, (V, W))]
  def leftOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))]
  def rightOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], W))]
  def fullOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], Option[W]))]
  def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))]

  // Set operations
  def union(that: DStream[T]): DStream[T]

  // Window operations
  def window(windowDuration: Duration, slideDuration: Duration = this.slideDuration): DStream[T]
  def reduceByWindow(reduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration): DStream[T]
  def reduceByWindow(reduceFunc: (T, T) => T, invReduceFunc: (T, T) => T,
      windowDuration: Duration, slideDuration: Duration): DStream[T]
  def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long]
  def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Iterable[V])] // when T = (K, V)
  def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Iterable[V])]
  def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration,
      numPartitions: Int): DStream[(K, Iterable[V])]
  def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration,
      partitioner: Partitioner): DStream[(K, Iterable[V])]
  def reduceByKeyAndWindow(reduceFunc: (V, V) => V, windowDuration: Duration): DStream[(K, V)]
  def reduceByKeyAndWindow(reduceFunc: (V, V) => V, windowDuration: Duration,
      slideDuration: Duration): DStream[(K, V)]
  def reduceByKeyAndWindow(reduceFunc: (V, V) => V, invReduceFunc: (V, V) => V,
      windowDuration: Duration, slideDuration: Duration): DStream[(K, V)]

  // Transformation operations
  def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U]
  def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U]
  def transformWith[U: ClassTag, W: ClassTag](other: DStream[U],
      transformFunc: (RDD[T], RDD[U]) => RDD[W]): DStream[W]
  def transformWith[U: ClassTag, W: ClassTag](other: DStream[U],
      transformFunc: (RDD[T], RDD[U], Time) => RDD[W]): DStream[W]

  // Output operations (actions)
  def print(): Unit
  def print(num: Int): Unit
  def saveAsTextFiles(prefix: String, suffix: String = ""): Unit
  def saveAsObjectFiles(prefix: String, suffix: String = ""): Unit
  def saveAsHadoopFiles[F <: OutputFormat[K, V]](prefix: String, suffix: String = ""): Unit // when T = (K, V)
  def foreach(foreachFunc: RDD[T] => Unit): Unit // deprecated alias; use foreachRDD
  def foreachRDD(foreachFunc: RDD[T] => Unit): Unit
  def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit

  // Caching
  def cache(): DStream[T]
  def persist(): DStream[T]
  def persist(level: StorageLevel): DStream[T]

  // Checkpointing
  def checkpoint(interval: Duration): DStream[T]

  // Properties
  def context: StreamingContext
  def slideDuration: Duration
  def dependencies: List[DStream[_]]
  def generatedRDDs: HashMap[Time, RDD[T]]
  def rememberDuration: Duration
}
```

### Usage Examples

```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(conf, Seconds(1))
ssc.checkpoint("hdfs:///checkpoints/streaming-app") // required by updateStateByKey below

// Basic transformations
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Window operations: counts over the last 30 seconds, recomputed every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b, // reduce function
  Seconds(30),               // window duration
  Seconds(10)                // slide duration
)

// Stateful operations: keep a running count per word
val runningCounts = pairs.updateStateByKey[Int] { (values: Seq[Int], state: Option[Int]) =>
  Some(values.sum + state.getOrElse(0))
}

// Output operations
wordCounts.print()
wordCounts.saveAsTextFiles("hdfs:///output/wordcounts") // prefix for per-batch output directories
wordCounts.foreachRDD { rdd =>
  rdd.foreach(println) // runs on the executors, not the driver
}

ssc.start()
ssc.awaitTermination()
```

## Structured Streaming (Recommended Approach)

Structured Streaming provides a scalable, fault-tolerant stream processing engine built on the Spark SQL engine. Streaming computations are expressed with the same DataFrame/Dataset API as batch queries and executed incrementally.

### DataStreamReader

The entry point for streaming reads, obtained via `spark.readStream`.

```scala { .api }
class DataStreamReader private[sql](sparkSession: SparkSession) {
  // Format specification (e.g. "kafka", "socket", "rate", "parquet")
  def format(source: String): DataStreamReader

  // Options
  def option(key: String, value: String): DataStreamReader
  def option(key: String, value: Boolean): DataStreamReader
  def option(key: String, value: Long): DataStreamReader
  def option(key: String, value: Double): DataStreamReader
  def options(options: scala.collection.Map[String, String]): DataStreamReader
  def options(options: java.util.Map[String, String]): DataStreamReader

  // Schema
  def schema(schema: StructType): DataStreamReader
  def schema(schemaString: String): DataStreamReader

  // Loading methods
  def load(): DataFrame
  def load(path: String): DataFrame

  // Format-specific shortcuts
  def csv(path: String): DataFrame
  def json(path: String): DataFrame
  def parquet(path: String): DataFrame
  def text(path: String): DataFrame
  def textFile(path: String): Dataset[String]
  def table(tableName: String): DataFrame

  // Note: the socket and rate test sources have no dedicated methods;
  // select them with format("socket") / format("rate") plus options.
}
```
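
Sources without a dedicated shortcut are selected by name. A minimal sketch using the built-in `rate` source, which generates `(timestamp, value)` rows and is handy for local testing (the session setup here is illustrative):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("RateSourceDemo")
  .master("local[*]")
  .getOrCreate()

// The rate source emits monotonically increasing (timestamp, value) rows
val rateDF = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 5) // built-in option of the rate source
  .load()

rateDF.printSchema() // timestamp: timestamp, value: long
```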

### DataStreamWriter

The entry point for streaming writes, obtained via `Dataset.writeStream`.

```scala { .api }
class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
  // Output mode
  def outputMode(outputMode: OutputMode): DataStreamWriter[T]
  def outputMode(outputMode: String): DataStreamWriter[T]

  // Trigger specification
  def trigger(trigger: Trigger): DataStreamWriter[T]

  // Format specification (e.g. "kafka", "console", "parquet")
  def format(source: String): DataStreamWriter[T]

  // Options (checkpointing is configured via option("checkpointLocation", ...))
  def option(key: String, value: String): DataStreamWriter[T]
  def option(key: String, value: Boolean): DataStreamWriter[T]
  def option(key: String, value: Long): DataStreamWriter[T]
  def option(key: String, value: Double): DataStreamWriter[T]
  def options(options: scala.collection.Map[String, String]): DataStreamWriter[T]
  def options(options: java.util.Map[String, String]): DataStreamWriter[T]

  // Partitioning
  def partitionBy(colNames: String*): DataStreamWriter[T]

  // Query naming
  def queryName(queryName: String): DataStreamWriter[T]

  // Starting the query; unlike DataStreamReader there are no csv/json/parquet
  // shortcuts -- choose the sink with format(...) and call start()
  def start(): StreamingQuery
  def start(path: String): StreamingQuery
  def toTable(tableName: String): StreamingQuery

  // Foreach operations (both return the writer; call start() afterwards)
  def foreach(writer: ForeachWriter[T]): DataStreamWriter[T]
  def foreachBatch(function: (Dataset[T], Long) => Unit): DataStreamWriter[T]
  def foreachBatch(function: VoidFunction2[Dataset[T], java.lang.Long]): DataStreamWriter[T]
}
```
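
As a sketch of a non-file sink, writing to Kafka via `format("kafka")`: this assumes the `spark-sql-kafka-0-10` connector is on the classpath, and the broker address and topic name are placeholders.

```scala
// Use the rate source as stand-in input; the Kafka sink expects a
// string or binary "value" column (and optionally "key" and "topic")
val events = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 10)
  .load()
  .selectExpr("CAST(value AS STRING) AS value")

val kafkaQuery = events.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092") // placeholder address
  .option("topic", "events")                        // placeholder topic
  .option("checkpointLocation", "/tmp/checkpoints/kafka-sink")
  .start()

kafkaQuery.awaitTermination()
```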

### StreamingQuery

A handle to a running streaming query, returned by `DataStreamWriter.start()`.

```scala { .api }
trait StreamingQuery {
  def name: String
  def id: UUID
  def runId: UUID
  def sparkSession: SparkSession
  def isActive: Boolean
  def exception: Option[StreamingQueryException]
  def status: StreamingQueryStatus
  def recentProgress: Array[StreamingQueryProgress]
  def lastProgress: StreamingQueryProgress
  def awaitTermination(): Unit
  def awaitTermination(timeoutMs: Long): Boolean
  def processAllAvailable(): Unit
  def stop(): Unit
  def explain(): Unit
  def explain(extended: Boolean): Unit
}
```
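
A short sketch of monitoring a running query, reusing the `spark` session from the earlier sketch. Note that `lastProgress` is null until the first batch completes:

```scala
// Start a throwaway query against the rate source
val query = spark.readStream
  .format("rate")
  .load()
  .writeStream
  .format("console")
  .start()

println(query.id)     // stable across restarts from the same checkpoint
println(query.runId)  // unique to this run
println(query.status) // current activity, e.g. waiting for the next trigger

// Guard against the null lastProgress before the first completed batch
Option(query.lastProgress).foreach { p =>
  println(s"batch=${p.batchId} rows/s=${p.processedRowsPerSecond}")
}

query.processAllAvailable() // block until all available input is processed (testing only)
query.stop()
```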

### StreamingQueryManager

Manages all active queries on a session; available as `spark.streams`.

```scala { .api }
class StreamingQueryManager private[sql](sparkSession: SparkSession) {
  def active: Array[StreamingQuery]
  def get(id: UUID): StreamingQuery
  def get(id: String): StreamingQuery // looks up by id string, not by query name
  def awaitAnyTermination(): Unit
  def awaitAnyTermination(timeoutMs: Long): Boolean
  def resetTerminated(): Unit
  def addListener(listener: StreamingQueryListener): Unit
  def removeListener(listener: StreamingQueryListener): Unit
}
```
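
A minimal sketch of working with the manager through `spark.streams` (query name and source are illustrative):

```scala
// Start a named query so it is easy to spot in the active list
val query = spark.readStream
  .format("rate")
  .load()
  .writeStream
  .queryName("rate-to-console")
  .format("console")
  .start()

// Enumerate all active queries on this session
spark.streams.active.foreach(q => println(s"${q.name} -> ${q.id}"))

// Look up a query by its id (there is no lookup by name)
val same = spark.streams.get(query.id)

// Block until any active query terminates; resetTerminated() clears
// already-terminated queries so the call can be reused
spark.streams.awaitAnyTermination()
```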

### Triggers

```scala { .api }
sealed trait Trigger

object Trigger {
  def ProcessingTime(interval: String): Trigger
  def ProcessingTime(interval: Long, unit: TimeUnit): Trigger
  def ProcessingTime(interval: Duration): Trigger
  def Once(): Trigger // deprecated since Spark 3.4; prefer AvailableNow()
  def Continuous(interval: String): Trigger
  def Continuous(interval: Long, unit: TimeUnit): Trigger
  def Continuous(interval: Duration): Trigger
  def AvailableNow(): Trigger // Spark 3.3+
}
```
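
A sketch of the common trigger choices on a console sink; the rate source stands in for real input, and only one trigger applies per query:

```scala
import org.apache.spark.sql.streaming.Trigger

val input = spark.readStream.format("rate").load()

val query = input.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("30 seconds")) // micro-batch every 30 seconds
  // .trigger(Trigger.AvailableNow())            // process everything available, then stop
  // .trigger(Trigger.Continuous("1 second"))    // experimental low-latency mode
  .start()
```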

### Output Modes

```scala { .api }
// OutputMode exposes factory methods (not vals)
abstract class OutputMode

object OutputMode {
  def Append(): OutputMode   // only new result rows are written (default)
  def Complete(): OutputMode // the entire result table is written each trigger (aggregations)
  def Update(): OutputMode   // only rows updated since the last trigger are written
}
```
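
Complete mode, for instance, only makes sense with an aggregation; a minimal sketch with the rate source as stand-in input:

```scala
import org.apache.spark.sql.functions.col

// Running counts per bucket; Complete mode rewrites the full result each trigger
val counts = spark.readStream
  .format("rate")
  .load()
  .groupBy(col("value") % 10)
  .count()

val query = counts.writeStream
  .outputMode("complete") // "append" would be rejected for this aggregation without a watermark
  .format("console")
  .start()
```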

### Usage Examples

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

val spark = SparkSession.builder()
  .appName("StructuredStreaming")
  .master("local[*]")
  .getOrCreate()

import spark.implicits._

// Read from socket
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Word count with watermarking (current_timestamp() stands in for a real event-time column)
val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words
  .withColumn("timestamp", current_timestamp())
  .withWatermark("timestamp", "10 minutes")
  .groupBy(window($"timestamp", "10 minutes", "5 minutes"), $"value")
  .count()

// Write to console
val query = wordCounts.writeStream
  .outputMode("update")
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()

query.awaitTermination() // blocks; the snippets below are independent examples

// Read from files (streaming file sources require a user-defined schema)
val userSchema = new StructType()
  .add("name", StringType)
  .add("age", IntegerType)

val csvDF = spark.readStream
  .option("sep", ",")
  .schema(userSchema)
  .csv("path/to/directory")

// Write to parquet with checkpointing
val fileQuery = csvDF.writeStream
  .format("parquet")
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .option("path", "path/to/destination/dir")
  .trigger(Trigger.ProcessingTime("1 minute"))
  .start()

// Foreach batch processing
csvDF.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.write
      .mode("append")
      .saveAsTable("my_table")
  }
  .start()
```

## Custom Receivers (Legacy DStream)

```scala { .api }
abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable {
  def onStart(): Unit
  def onStop(): Unit
  def store(dataItem: T): Unit
  def store(dataBuffer: ArrayBuffer[T]): Unit
  def store(dataIterator: Iterator[T]): Unit
  def store(dataBuffer: ArrayBuffer[T], metadata: Any): Unit
  def store(dataIterator: Iterator[T], metadata: Any): Unit
  def reportError(message: String, throwable: Throwable): Unit
  def restart(message: String): Unit
  def restart(message: String, error: Throwable): Unit
  def restart(message: String, error: Throwable, millisecond: Int): Unit
  def stop(message: String): Unit
  def stop(message: String, error: Throwable): Unit
  def isStarted(): Boolean
  def isStopped(): Boolean
  def hasError(): Boolean
  def preferredLocation: Option[String]
}
```

### Usage Examples

```scala
import java.io._
import java.net._

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class CustomReceiver(host: String, port: Int)
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    // Start a thread that receives data over a socket connection
    new Thread("Socket Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  def onStop(): Unit = {
    // Nothing to do: the receive() loop checks isStopped, so the thread exits on its own
  }

  private def receive(): Unit = {
    var socket: Socket = null
    var userInput: String = null
    try {
      socket = new Socket(host, port)
      val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"))
      userInput = reader.readLine()
      while (!isStopped && userInput != null) {
        store(userInput)
        userInput = reader.readLine()
      }
      reader.close()
      socket.close()
      restart("Trying to connect again")
    } catch {
      case e: java.net.ConnectException =>
        restart("Error connecting to " + host + ":" + port, e)
      case t: Throwable =>
        restart("Error receiving data", t)
    }
  }
}

// Use the custom receiver
val customStream = ssc.receiverStream(new CustomReceiver("localhost", 9999))
```

## Duration

Time duration utilities for streaming applications.

```scala { .api }
case class Duration(private val millis: Long) extends Serializable {
  def milliseconds: Long = millis
  def +(that: Duration): Duration
  def -(that: Duration): Duration
  def *(times: Int): Duration
  def /(that: Duration): Double
  def <(that: Duration): Boolean
  def <=(that: Duration): Boolean
  def >(that: Duration): Boolean
  def >=(that: Duration): Boolean
  def isMultipleOf(that: Duration): Boolean
  def min(that: Duration): Duration
  def max(that: Duration): Duration
  def isZero: Boolean
  override def toString: String
  def prettyPrint: String
}

object Duration {
  val ZERO = new Duration(0)
  def apply(length: Long, unit: TimeUnit): Duration
}

object Milliseconds {
  def apply(milliseconds: Long): Duration
}

object Seconds {
  def apply(seconds: Long): Duration
}

object Minutes {
  def apply(minutes: Long): Duration
}
```

### Usage Examples

```scala
import org.apache.spark.streaming.{Milliseconds, Minutes, Seconds, StreamingContext}

// Creating durations
val batchInterval = Seconds(1)
val windowDuration = Minutes(5)
val slideDuration = Seconds(30)
val timeout = Milliseconds(500)

// Duration arithmetic
val totalTime = batchInterval * 10
val remaining = windowDuration - slideDuration
val isMultiple = windowDuration.isMultipleOf(slideDuration) // true: 5 min is a multiple of 30 s

// Using durations in streaming operations (pairs is the DStream[(String, Int)] from earlier)
val ssc = new StreamingContext(conf, batchInterval)
val windowedCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, windowDuration, slideDuration)
```