0
# Stream Processing
1
2
Apache Spark provides stream processing capabilities through two APIs: Structured Streaming (the modern API) built on DataFrames/Datasets, and the legacy DStreams API. Structured Streaming is the recommended approach for new applications.
3
4
## Package Information
5
6
Stream processing functionality is available through:
7
8
```scala
9
// Structured Streaming (recommended)
10
import org.apache.spark.sql.streaming._
11
import org.apache.spark.sql.functions._
12
13
// Legacy DStreams API (maintenance mode)
14
import org.apache.spark.streaming._
15
import org.apache.spark.streaming.dstream._
16
```
17
18
## Basic Usage
19
20
### Structured Streaming
21
22
```scala
23
import org.apache.spark.sql.SparkSession
24
import org.apache.spark.sql.streaming.Trigger
25
import org.apache.spark.sql.functions._
26
27
// Create (or reuse) the session; local[*] runs Spark in-process on all available cores.
val spark = SparkSession.builder()
  .appName("Structured Streaming Example")
  .master("local[*]")
  .getOrCreate()

// Required for `.as[String]` below: brings the implicit Encoder[String] into scope.
import spark.implicits._

// Read streaming data: the socket source yields one row per received text line,
// in a single string column named "value".
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Process streaming data: split every line into words and count each distinct word.
val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()

// Write streaming results: "complete" mode prints the full counts table on every
// trigger, which fires every 10 seconds.
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()

// Block the calling thread until the query is stopped or fails.
query.awaitTermination()
51
```
52
53
### DStreams (Legacy)
54
55
```scala
56
import org.apache.spark.streaming._
57
import org.apache.spark.streaming.dstream._
58
59
// Streaming context over the existing SparkContext `sc`, using 10-second batches.
val ssc = new StreamingContext(sc, Seconds(10))

// Text lines received from localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)

// Tokenize each line, then sum a count of 1 per occurrence of every word
val words = lines.flatMap(line => line.split(" "))
val wordCounts = words
  .map(word => (word, 1))
  .reduceByKey((left, right) => left + right)

// Print the first elements of every batch to stdout
wordCounts.print()

// Nothing runs until start() is called; afterwards block until termination.
ssc.start()
ssc.awaitTermination()
74
```
75
76
## Capabilities
77
78
### Structured Streaming
79
80
The modern streaming API built on the DataFrame/Dataset API. Combined with checkpointing, replayable sources, and idempotent sinks, it provides end-to-end exactly-once fault-tolerance guarantees.
81
82
#### DataStreamReader
83
84
Interface for loading streaming DataFrames.
85
86
```scala { .api }
87
/** Builder for streaming DataFrames, obtained from `spark.readStream`. */
class DataStreamReader {
  // ---- Configuration: every method returns the reader, so calls can be chained ----

  /** Name of the input source, e.g. "socket", "kafka", "json". */
  def format(source: String): DataStreamReader

  /** Input schema, given as a StructType or as a schema string. */
  def schema(schema: StructType): DataStreamReader
  def schema(schemaString: String): DataStreamReader

  /** A single source option; overloaded for the common value types. */
  def option(key: String, value: String): DataStreamReader
  def option(key: String, value: Boolean): DataStreamReader
  def option(key: String, value: Long): DataStreamReader
  def option(key: String, value: Double): DataStreamReader

  /** Several source options at once, from a Scala or a Java map. */
  def options(options: scala.collection.Map[String, String]): DataStreamReader
  def options(options: java.util.Map[String, String]): DataStreamReader

  // ---- Format-specific loaders ----

  def csv(path: String): DataFrame
  def json(path: String): DataFrame
  def parquet(path: String): DataFrame
  def orc(path: String): DataFrame
  def text(path: String): DataFrame
  /** Like `text`, but typed as a Dataset of strings. */
  def textFile(path: String): Dataset[String]
  def table(tableName: String): DataFrame

  // ---- Generic loaders, used together with format(...) ----

  def load(): DataFrame
  def load(path: String): DataFrame
}
112
```
113
114
#### DataStreamWriter
115
116
Interface for writing streaming DataFrames.
117
118
```scala { .api }
119
/** Builder for writing a streaming Dataset to a sink, obtained from `dataset.writeStream`. */
class DataStreamWriter[T] {
  // ---- Configuration: every method returns the writer, so calls can be chained ----

  /** How result rows are emitted; see OutputMode ("append", "complete", "update"). */
  def outputMode(outputMode: OutputMode): DataStreamWriter[T]
  def outputMode(outputMode: String): DataStreamWriter[T]

  /** When the query processes new data; see Trigger. */
  def trigger(trigger: Trigger): DataStreamWriter[T]

  /** Name of the sink, e.g. "console", "parquet", "kafka", "memory". */
  def format(source: String): DataStreamWriter[T]

  /** A single sink option; overloaded for the common value types. */
  def option(key: String, value: String): DataStreamWriter[T]
  def option(key: String, value: Boolean): DataStreamWriter[T]
  def option(key: String, value: Long): DataStreamWriter[T]
  def option(key: String, value: Double): DataStreamWriter[T]

  /** Several sink options at once, from a Scala or a Java map. */
  def options(options: scala.collection.Map[String, String]): DataStreamWriter[T]
  def options(options: java.util.Map[String, String]): DataStreamWriter[T]

  def partitionBy(colNames: String*): DataStreamWriter[T]
  def queryName(queryName: String): DataStreamWriter[T]

  // ---- Custom sinks ----

  /** Row-at-a-time custom sink; see ForeachWriter. */
  def foreach(writer: ForeachWriter[T]): DataStreamWriter[T]

  /** Custom sink invoked once per micro-batch with the batch data and the batch id. */
  def foreachBatch(function: (Dataset[T], Long) => Unit): DataStreamWriter[T]

  // ---- Built-in sinks ----
  // DataStreamWriter has no per-format shortcut methods (unlike DataStreamReader).
  // Built-in sinks are selected with format(...) and configured with option(...):
  //   console sink: .format("console").option("numRows", 20).option("truncate", false)
  //   file sinks:   .format("csv" | "json" | "parquet" | "orc" | "text").start("/output/dir")

  // ---- Starting the query: these end the builder chain and return the running query ----

  def start(): StreamingQuery
  def start(path: String): StreamingQuery

  /** Starts the query and writes its output to the named table. */
  def toTable(tableName: String): StreamingQuery
}
156
```
157
158
#### StreamingQuery
159
160
Handle to a running streaming query.
161
162
```scala { .api }
163
/**
 * Handle to a streaming query. A query is started with DataStreamWriter.start() or
 * toTable(); the handle itself has no start() method and cannot restart a stopped query.
 */
trait StreamingQuery {
  // ---- Query control ----

  /** Stops the query. */
  def stop(): Unit

  /** Blocks until the query terminates. */
  def awaitTermination(): Unit

  /** Blocks for at most `timeoutMs` milliseconds; returns whether the query terminated. */
  def awaitTermination(timeoutMs: Long): Boolean

  /** Blocks until all data available at call time has been processed (intended for tests). */
  def processAllAvailable(): Unit

  // ---- Query properties ----

  /** Identifier that stays the same across restarts from the same checkpoint. */
  def id: UUID

  /** Identifier of this particular run; changes on every (re)start. */
  def runId: UUID

  /** Name set via DataStreamWriter.queryName, or null if none was set. */
  def name: String

  /** The session this query belongs to. */
  def sparkSession: SparkSession

  /** Prints the physical plan (optionally extended) for debugging. */
  def explain(): Unit
  def explain(extended: Boolean): Unit

  // ---- Query status ----

  def isActive: Boolean

  /** The failure that terminated the query, if any. */
  def exception: Option[StreamingQueryException]

  def status: StreamingQueryStatus

  /** The most recent progress updates, oldest first. */
  def recentProgress: Array[StreamingQueryProgress]

  /** The latest progress update, or null if there has been none yet. */
  def lastProgress: StreamingQueryProgress
}
185
```
186
187
#### StreamingQueryManager
188
189
Manages all the streaming queries of a `SparkSession`; obtain it with `spark.streams`.
190
191
```scala { .api }
192
/** Tracks the streaming queries of a session and lets callers wait on or observe them. */
class StreamingQueryManager {
  // ---- Lookup ----

  /** All currently active queries. */
  def active: Array[StreamingQuery]

  /** Looks up a query by its id, given as a UUID or as its string form. */
  def get(id: UUID): StreamingQuery
  def get(id: String): StreamingQuery

  // ---- Waiting for termination ----

  /** Blocks until any query terminates. */
  def awaitAnyTermination(): Unit

  /** As above, but waits for at most `timeoutMs` milliseconds. */
  def awaitAnyTermination(timeoutMs: Long): Boolean

  /** Forgets already-terminated queries so that awaitAnyTermination() can be used again. */
  def resetTerminated(): Unit

  // ---- Listeners ----

  def addListener(listener: StreamingQueryListener): Unit
  def removeListener(listener: StreamingQueryListener): Unit
}
207
```
208
209
#### Output Modes
210
211
```scala { .api }
212
// The output modes are exposed as factory methods, so call them with parentheses,
// e.g. .outputMode(OutputMode.Append()). The string forms "append", "complete" and
// "update" are accepted by DataStreamWriter.outputMode(String).
object OutputMode {
  /** Only rows added to the result since the last trigger are written to the sink. */
  def Append(): OutputMode

  /** The entire result table is written to the sink on every trigger. */
  def Complete(): OutputMode

  /** Only rows that changed since the last trigger are written to the sink. */
  def Update(): OutputMode
}
217
```
218
219
#### Triggers
220
221
```scala { .api }
222
// Trigger is a plain class (not a sealed hierarchy); instances are created only through
// the factory methods below, e.g. Trigger.ProcessingTime("10 seconds").
class Trigger

object Trigger {
  /** Micro-batches at a fixed processing-time interval, e.g. "10 seconds". */
  def ProcessingTime(interval: String): Trigger
  def ProcessingTime(interval: Duration): Trigger              // scala.concurrent.duration.Duration
  def ProcessingTime(interval: Long, unit: TimeUnit): Trigger  // java.util.concurrent.TimeUnit

  /** Processes all available data in a single batch, then stops. Deprecated as of Spark 3.4.0 in favor of AvailableNow(). */
  def Once(): Trigger

  /** Processes all available data, possibly in multiple batches, then stops. */
  def AvailableNow(): Trigger

  /** Continuous processing with the given checkpoint interval. */
  def Continuous(interval: String): Trigger
  def Continuous(interval: Duration): Trigger
  def Continuous(interval: Long, unit: TimeUnit): Trigger
}
233
```
234
235
#### ForeachWriter
236
237
Custom sink for streaming data.
238
239
```scala { .api }
240
/** Row-at-a-time custom sink, passed to DataStreamWriter.foreach. */
abstract class ForeachWriter[T] extends Serializable {
  /** Called first for a given partition and epoch; the Boolean result says whether to process its rows. */
  def open(partitionId: Long, epochId: Long): Boolean

  /** Called once for every row. */
  def process(value: T): Unit

  /** Called last; `errorOrNull` is the failure that occurred, or null on success. */
  def close(errorOrNull: Throwable): Unit
}
245
```
246
247
Usage example:
248
249
```scala
250
// A ForeachWriter that prints every row it receives.
val customWriter = new ForeachWriter[Row] {
  // Set up per-partition resources here (e.g. open a connection);
  // returning true means the rows of this partition/epoch should be processed.
  override def open(partitionId: Long, epochId: Long): Boolean = true

  // Invoked for each row of the partition.
  override def process(value: Row): Unit =
    println(s"Processing: ${value.toString}")

  // Release whatever open() acquired; errorOrNull carries the failure, if any.
  override def close(errorOrNull: Throwable): Unit = ()
}

// Attach the writer as the sink of the streaming DataFrame `df` and start the query.
val query = df.writeStream
  .foreach(customWriter)
  .start()
269
```
270
271
#### Windowing Operations
272
273
```scala { .api }
274
// Defined in org.apache.spark.sql.functions

/** Tumbling window: fixed-size, non-overlapping windows of `windowDuration`. */
def window(timeColumn: Column, windowDuration: String): Column

/** Sliding window: windows of `windowDuration`, a new one starting every `slideDuration`. */
def window(timeColumn: Column, windowDuration: String, slideDuration: String): Column

/** Sliding window whose window boundaries are additionally shifted by `startTime`. */
def window(
    timeColumn: Column,
    windowDuration: String,
    slideDuration: String,
    startTime: String): Column

/** Session window: closes once no event arrives within `gapDuration`. */
def session_window(timeColumn: Column, gapDuration: String): Column

/** Session window whose gap is computed per row from a column expression. */
def session_window(timeColumn: Column, gapDuration: Column): Column
281
```
282
283
Usage example:
284
285
```scala
286
import org.apache.spark.sql.functions._
287
288
// All three aggregations tolerate events arriving up to 10 minutes late.
val withLateness = df.withWatermark("timestamp", "10 minutes")
val eventTime = col("timestamp")

// Tumbling window: consecutive, non-overlapping 10-minute buckets
val windowedCounts = withLateness
  .groupBy(window(eventTime, "10 minutes"))
  .count()

// Sliding window: 10-minute buckets, a new one starting every 5 minutes
val slidingWindowCounts = withLateness
  .groupBy(window(eventTime, "10 minutes", "5 minutes"))
  .count()

// Session window: a session ends after a 5-minute gap without events
val sessionCounts = withLateness
  .groupBy(session_window(eventTime, "5 minutes"))
  .count()
305
```
306
307
#### Watermarking
308
309
```scala { .api }
310
// Method of Dataset[T]
/**
 * Declares `eventTime` as the event-time column and `delayThreshold` (e.g. "10 minutes")
 * as how late data may arrive.
 */
def withWatermark(eventTime: String, delayThreshold: String): Dataset[T]
312
```
313
314
Usage example:
315
316
```scala
317
// Accept events up to 2 minutes late, then count events per 10-minute window.
// col(...) comes from org.apache.spark.sql.functions, like window(...); the $"..."
// column syntax would additionally require `import spark.implicits._`.
val watermarkedDF = df
  .withWatermark("eventTime", "2 minutes")
  .groupBy(window(col("eventTime"), "10 minutes"))
  .count()
321
```
322
323
### Legacy DStreams API
324
325
The original streaming API based on RDDs (now in maintenance mode).
326
327
#### StreamingContext
328
329
The main entry point for DStreams functionality.
330
331
```scala { .api }
332
/** Main entry point of the DStreams API; `batchDuration` is the interval at which data is batched. */
class StreamingContext(sparkContext: SparkContext, batchDuration: Duration) {
  /** Creates the underlying SparkContext from the given configuration. */
  def this(conf: SparkConf, batchDuration: Duration)

  /** Re-creates a context from checkpoint data stored under `path`. */
  def this(path: String)
  def this(path: String, hadoopConf: Configuration)

  // ---- Input sources ----

  def socketTextStream(
      hostname: String,
      port: Int,
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[String]

  /** Monitors `directory` for new files and reads them as text lines. */
  def textFileStream(directory: String): DStream[String]

  /** Monitors `directory` for new files and reads them with input format `F`. */
  def fileStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag](
      directory: String): InputDStream[(K, V)]

  /** Stream fed from a queue of RDDs; with `oneAtATime`, one RDD is consumed per batch. */
  def queueStream[T: ClassTag](queue: Queue[RDD[T]], oneAtATime: Boolean = true): InputDStream[T]

  /** Stream backed by a user-defined Receiver. */
  def receiverStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T]

  // ---- Union ----

  def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T]

  // ---- Control ----

  def start(): Unit

  // stop has two overloads; only the one-argument form has a default, so
  // `stop(stopGracefully = true)` alone does not compile -- pass both arguments.
  def stop(stopSparkContext: Boolean = true): Unit
  def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit

  def awaitTermination(): Unit

  /** Waits for at most `timeout` milliseconds; returns whether the context has stopped. */
  def awaitTerminationOrTimeout(timeout: Long): Boolean

  // ---- State ----

  /** Makes every DStream of this context keep its generated RDDs for `duration`. */
  def remember(duration: Duration): Unit

  /** Enables checkpointing into `directory`. */
  def checkpoint(directory: String): Unit

  // ---- Properties ----

  def sparkContext: SparkContext

  // NOTE(review): `graph` appears to be internal (private[streaming]) in the Spark
  // sources -- confirm before relying on it from application code.
  def graph: DStreamGraph

  def getState(): StreamingContextState
}
361
362
/** Factory helpers for obtaining a StreamingContext. */
object StreamingContext {
  /** The currently active context, if there is one. */
  def getActive(): Option[StreamingContext]

  /** Restores a context from `checkpointPath`, or builds a new one with `creatingFunc`. */
  def getOrCreate(
      checkpointPath: String,
      creatingFunc: () => StreamingContext): StreamingContext

  /** The active context if there is one; otherwise behaves like getOrCreate. */
  def getActiveOrCreate(
      checkpointPath: String,
      creatingFunc: () => StreamingContext): StreamingContext
}
367
```
368
369
#### DStream
370
371
The fundamental abstraction in Spark Streaming representing a continuous stream of data.
372
373
```scala { .api }
374
/** A continuous stream of data, represented as a sequence of RDDs of element type `T`. */
abstract class DStream[T: ClassTag] extends Serializable {
  // ---- Transformations ----

  def map[U: ClassTag](mapFunc: T => U): DStream[U]
  def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U]
  def filter(filterFunc: T => Boolean): DStream[T]

  /** Turns each partition of every RDD into a single array. */
  def glom(): DStream[Array[T]]

  def repartition(numPartitions: Int): DStream[T]
  def union(that: DStream[T]): DStream[T]

  /** Per-batch aggregations; each resulting RDD holds the aggregate of one batch. */
  def count(): DStream[Long]
  def countByValue(): DStream[(T, Long)]
  def reduce(reduceFunc: (T, T) => T): DStream[T]

  /** Aggregations over a sliding window of batches. */
  def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long]
  def reduceByWindow(
      reduceFunc: (T, T) => T,
      windowDuration: Duration,
      slideDuration: Duration): DStream[T]

  /** Incremental variant: `invReduceFunc` removes the values that leave the window. */
  def reduceByWindow(
      reduceFunc: (T, T) => T,
      invReduceFunc: (T, T) => T,
      windowDuration: Duration,
      slideDuration: Duration): DStream[T]

  // ---- Window operations ----

  def window(windowDuration: Duration): DStream[T]
  def window(windowDuration: Duration, slideDuration: Duration): DStream[T]

  // ---- Output operations ----

  /** Prints the first elements (`num`, when given) of every batch. */
  def print(): Unit
  def print(num: Int): Unit

  def saveAsTextFiles(prefix: String, suffix: String = ""): Unit
  def saveAsObjectFiles(prefix: String, suffix: String = ""): Unit

  /** Applies a function to every generated RDD, optionally with its batch time. */
  def foreachRDD(foreachFunc: RDD[T] => Unit): Unit
  def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit

  // ---- State operations ----
  // updateStateByKey needs key and value types, so it is not a member of DStream[T].
  // It is available on DStream[(K, V)] through PairDStreamFunctions (documented below).

  // ---- Persistence ----

  def cache(): DStream[T]
  def persist(): DStream[T]
  def persist(level: StorageLevel): DStream[T]

  // ---- Registration ----

  // NOTE(review): `register` appears to be internal (private[streaming]) in the Spark
  // sources -- confirm before relying on it from application code.
  def register(): DStream[T]

  // ---- Properties ----

  /** The StreamingContext this stream belongs to. */
  def context: StreamingContext

  // NOTE(review): `ssc` appears to be internal (private[streaming]); prefer `context`.
  def ssc: StreamingContext

  /** Interval at which this stream generates RDDs. */
  def slideDuration: Duration
}
419
```
420
421
#### PairDStreamFunctions
422
423
Additional operations for DStreams of key-value pairs.
424
425
```scala { .api }
426
/** Extra operations for key-value streams, i.e. for DStream[(K, V)]. */
class PairDStreamFunctions[K, V](self: DStream[(K, V)])(
    implicit kt: ClassTag[K], vt: ClassTag[V]) {

  // ---- Per-batch transformations ----
  // Each operation comes in three forms: default partitioning, an explicit number
  // of partitions, or an explicit Partitioner.

  def groupByKey(): DStream[(K, Iterable[V])]
  def groupByKey(numPartitions: Int): DStream[(K, Iterable[V])]
  def groupByKey(partitioner: Partitioner): DStream[(K, Iterable[V])]

  def reduceByKey(func: (V, V) => V): DStream[(K, V)]
  def reduceByKey(func: (V, V) => V, numPartitions: Int): DStream[(K, V)]
  def reduceByKey(func: (V, V) => V, partitioner: Partitioner): DStream[(K, V)]

  def combineByKey[C: ClassTag](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiner: (C, C) => C): DStream[(K, C)]
  def combineByKey[C: ClassTag](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiner: (C, C) => C,
      numPartitions: Int): DStream[(K, C)]
  def combineByKey[C: ClassTag](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiner: (C, C) => C,
      partitioner: Partitioner): DStream[(K, C)]

  // ---- Window operations ----

  def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Iterable[V])]
  def groupByKeyAndWindow(
      windowDuration: Duration,
      slideDuration: Duration): DStream[(K, Iterable[V])]
  def groupByKeyAndWindow(
      windowDuration: Duration,
      slideDuration: Duration,
      numPartitions: Int): DStream[(K, Iterable[V])]
  def groupByKeyAndWindow(
      windowDuration: Duration,
      slideDuration: Duration,
      partitioner: Partitioner): DStream[(K, Iterable[V])]

  def reduceByKeyAndWindow(
      reduceFunc: (V, V) => V,
      windowDuration: Duration,
      slideDuration: Duration): DStream[(K, V)]

  /** Incremental variants: `invReduceFunc` removes the values that leave the window. */
  def reduceByKeyAndWindow(
      reduceFunc: (V, V) => V,
      invReduceFunc: (V, V) => V,
      windowDuration: Duration,
      slideDuration: Duration): DStream[(K, V)]
  def reduceByKeyAndWindow(
      reduceFunc: (V, V) => V,
      invReduceFunc: (V, V) => V,
      windowDuration: Duration,
      slideDuration: Duration,
      numPartitions: Int,
      filterFunc: ((K, V)) => Boolean = null): DStream[(K, V)]

  // ---- Joins with another key-value stream ----

  def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))]
  def join[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (V, W))]
  def join[W: ClassTag](other: DStream[(K, W)], partitioner: Partitioner): DStream[(K, (V, W))]
  def leftOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))]
  def rightOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], W))]
  def fullOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], Option[W]))]
  def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))]

  // ---- Stateful operations ----

  /** `updateFunc` receives a key's new values and its previous state, and returns the next state. */
  def updateStateByKey[S: ClassTag](
      updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)]
  def updateStateByKey[S: ClassTag](
      updateFunc: (Seq[V], Option[S]) => Option[S],
      numPartitions: Int): DStream[(K, S)]
  def updateStateByKey[S: ClassTag](
      updateFunc: (Seq[V], Option[S]) => Option[S],
      partitioner: Partitioner): DStream[(K, S)]

  /** Stateful mapping configured through a StateSpec. */
  def mapWithState[StateType: ClassTag, MappedType: ClassTag](
      spec: StateSpec[K, V, StateType, MappedType]): MapWithStateDStream[K, V, StateType, MappedType]

  // ---- Output ----

  def saveAsHadoopFiles[F <: OutputFormat[K, V]](prefix: String, suffix: String = ""): Unit
  def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]](prefix: String, suffix: String = ""): Unit
}
466
```
467
468
#### Input Sources
469
470
Various built-in input sources for DStreams.
471
472
```scala { .api }
473
// All built-in sources are instance methods of StreamingContext: call them on a
// context object (e.g. ssc.textFileStream(...)), not on the companion object.
class StreamingContext {
  // ---- File-based sources ----

  /** Monitors `directory` for new files and reads them as text lines. */
  def textFileStream(directory: String): DStream[String]

  /** Monitors `directory` for new binary files made of fixed-length records of `recordLength` bytes. */
  def binaryRecordsStream(directory: String, recordLength: Int): DStream[Array[Byte]]

  // ---- Network sources ----

  /** Text lines received over a TCP socket. */
  def socketTextStream(
      hostname: String,
      port: Int,
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[String]

  /** Socket data decoded into records by `converter`. */
  def socketStream[T: ClassTag](
      hostname: String,
      port: Int,
      converter: (InputStream) => Iterator[T],
      storageLevel: StorageLevel): ReceiverInputDStream[T]

  // ---- Queue source (for testing) ----

  def queueStream[T: ClassTag](queue: Queue[RDD[T]], oneAtATime: Boolean = true): InputDStream[T]

  /** `defaultRDD` is used for a batch when the queue is empty. This overload takes no default arguments. */
  def queueStream[T: ClassTag](
      queue: Queue[RDD[T]],
      oneAtATime: Boolean,
      defaultRDD: RDD[T]): InputDStream[T]
}
489
```
490
491
#### Custom Receivers
492
493
```scala { .api }
494
/**
 * Base class for custom receivers. Implement onStart() and onStop(), and hand received
 * data to Spark with one of the store(...) methods.
 */
abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable {
  // ---- To be implemented by subclasses ----

  /** Called when the receiver is started; set up whatever is needed to receive data. */
  def onStart(): Unit

  /** Called when the receiver is stopped; release whatever onStart() set up. */
  def onStop(): Unit

  /** Optional: preferred host for running this receiver. */
  def preferredLocation: Option[String]

  // ---- Handing received data to Spark ----

  def store(dataItem: T): Unit
  def store(dataBuffer: ArrayBuffer[T]): Unit
  def store(dataIterator: Iterator[T]): Unit

  /** Variants that attach `metadata` to the stored block. */
  def store(dataBuffer: ArrayBuffer[T], metadata: Any): Unit
  def store(dataIterator: Iterator[T], metadata: Any): Unit

  // ---- Error reporting and lifecycle control ----

  def reportError(message: String, throwable: Throwable): Unit

  /** Restarts the receiver (onStop() followed by onStart()). */
  def restart(message: String): Unit
  def restart(message: String, error: Throwable): Unit

  /** As above, waiting `millisecond` ms between stopping and starting. */
  def restart(message: String, error: Throwable, millisecond: Int): Unit

  /** Stops the receiver completely. */
  def stop(message: String): Unit
  def stop(message: String, error: Throwable): Unit

  // ---- State ----

  def isStarted(): Boolean

  /** Poll this in the receiving loop to know when to stop receiving. */
  def isStopped(): Boolean

  /** Id of the input stream this receiver is associated with. */
  def streamId: Int
}
512
```
513
514
Usage examples:
515
516
```scala
517
// Structured Streaming: File source
import org.apache.spark.sql.types.{StringType, StructType, TimestampType}

// Streaming file sources require an explicit schema (schema inference is disabled by
// default for streams); without it, load() fails. The fields here are illustrative.
val jsonSchema = new StructType()
  .add("id", StringType)
  .add("timestamp", TimestampType)

val fileStream = spark.readStream
  .format("json")
  .schema(jsonSchema)
  .option("path", "/path/to/json/files")
  .load()

// Print every newly arrived row to the console.
val query = fileStream.writeStream
  .format("console")
  .outputMode("append")
  .start()
527
528
// Structured Streaming: Kafka source
val kafkaStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1")
  .load()

// Schema of the JSON payload carried in the Kafka record value (illustrative fields).
val schema = org.apache.spark.sql.types.StructType.fromDDL("id STRING, message STRING")

// The Kafka `value` column is binary: cast it to a string, parse it as JSON,
// then flatten the parsed struct into top-level columns.
val parsedStream = kafkaStream
  .select(from_json(col("value").cast("string"), schema).as("data"))
  .select("data.*")
538
539
// DStreams: Custom receiver
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class CustomReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  /** Starts receiving on a separate thread, so that onStart() itself returns immediately. */
  override def onStart(): Unit = {
    new Thread("Custom Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  /** Nothing to clean up: the receiving loop ends by itself once isStopped() is true. */
  override def onStop(): Unit = ()

  /** Placeholder for the actual data source; replace `???` with code that reads one record. */
  private def fetchData(): String = ???

  /** Reads records and hands them to Spark until the receiver is stopped. */
  private def receive(): Unit = {
    while (!isStopped()) {
      val data = fetchData()
      store(data)
    }
  }
}

// Create an input stream backed by the custom receiver.
val customStream = ssc.receiverStream(new CustomReceiver())
562
```