0
# Stream Processing
1
2
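Structured Streaming is a scalable, fault-tolerant stream processing engine built on the Spark SQL engine, so streaming queries use the same DataFrame/Dataset API as batch queries. It provides end-to-end exactly-once guarantees when used with replayable sources and idempotent or transactional sinks, backed by checkpointing and write-ahead logs. This document also covers the legacy Spark Streaming API (DStreams), which processes data as a sequence of micro-batches.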
3
4
## Capabilities
5
6
### Structured Streaming
7
8
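High-level streaming API built on DataFrame/Dataset. Queries run as micro-batches by default, with exactly-once guarantees. An experimental continuous processing mode (`Trigger.Continuous`) offers lower latency with at-least-once guarantees.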
9
10
```scala { .api }
11
/**
 * Reader for streaming data sources, obtained via `spark.readStream`.
 *
 * There are no dedicated `kafka(...)` or `socket(...)` methods: select such sources
 * with `format("kafka")` / `format("socket")` and configure them through `option(...)`.
 */
class DataStreamReader {
  /** Specify the streaming source format, e.g. "kafka", "socket", "rate", "json", "parquet". */
  def format(source: String): DataStreamReader

  /** Add a single source option; Boolean/Long/Double values are passed on as strings. */
  def option(key: String, value: String): DataStreamReader
  def option(key: String, value: Boolean): DataStreamReader
  def option(key: String, value: Long): DataStreamReader
  def option(key: String, value: Double): DataStreamReader

  /** Add multiple source options at once. */
  def options(options: Map[String, String]): DataStreamReader

  /**
   * Set the input schema. File-based streaming sources require an explicit schema
   * unless `spark.sql.streaming.schemaInference` is enabled.
   */
  def schema(schema: StructType): DataStreamReader
  /** Set the input schema from a DDL string, e.g. "id INT, name STRING". */
  def schema(schemaString: String): DataStreamReader

  /** Load the configured source as a streaming DataFrame. */
  def load(): DataFrame
  /** Load the configured source from `path` (file-based sources). */
  def load(path: String): DataFrame

  /** Shorthands for `format(<name>).load(path)` on file sources. */
  def json(path: String): DataFrame
  def csv(path: String): DataFrame
  def orc(path: String): DataFrame
  def parquet(path: String): DataFrame
  def text(path: String): DataFrame
  /** Like `text`, but returns the lines as a Dataset[String]. */
  def textFile(path: String): Dataset[String]

  /** Read a catalog table as a streaming source (Spark 3.1+). */
  def table(tableName: String): DataFrame
}
41
42
/**
 * Writer for streaming data sinks, obtained via `Dataset.writeStream`.
 *
 * The sink is selected with `format(...)` (e.g. "parquet", "kafka", "console", "memory")
 * and the query is launched with `start()`; there are no per-format shorthand methods
 * such as `console()` or `kafka()` on the writer.
 * Fault-tolerant sinks such as file and Kafka sinks need a checkpoint location, given via
 * `option("checkpointLocation", path)` or `spark.sql.streaming.checkpointLocation`.
 */
class DataStreamWriter[T] {
  /** Set the output mode: which rows of the result table are written on each trigger. */
  def outputMode(outputMode: OutputMode): DataStreamWriter[T]
  /** Set the output mode by name: "append", "complete" or "update". */
  def outputMode(outputMode: String): DataStreamWriter[T]

  /**
   * Set the trigger policy. Without one, a new micro-batch starts as soon as the
   * previous one has finished.
   */
  def trigger(trigger: Trigger): DataStreamWriter[T]

  /** Specify the output sink format. */
  def format(source: String): DataStreamWriter[T]

  /** Add a single sink option; Boolean/Long/Double values are passed on as strings. */
  def option(key: String, value: String): DataStreamWriter[T]
  def option(key: String, value: Boolean): DataStreamWriter[T]
  def option(key: String, value: Long): DataStreamWriter[T]
  def option(key: String, value: Double): DataStreamWriter[T]

  /** Add multiple sink options at once. */
  def options(options: Map[String, String]): DataStreamWriter[T]

  /** Partition output on the file system by the given columns (file sinks). */
  def partitionBy(colNames: String*): DataStreamWriter[T]

  /** Set the query name; the "memory" sink also uses it as the in-memory table name. */
  def queryName(queryName: String): DataStreamWriter[T]

  /** Write every output row with a custom ForeachWriter. */
  def foreach(writer: ForeachWriter[T]): DataStreamWriter[T]

  /**
   * Call `function` with the output of every micro-batch and its batch id.
   * Not supported with continuous triggers.
   */
  def foreachBatch(function: (Dataset[T], Long) => Unit): DataStreamWriter[T]

  /** Start the streaming query in the background and return a handle to it. */
  def start(): StreamingQuery
  /** Start the query, writing to `path` (file sinks). */
  def start(path: String): StreamingQuery

  /** Start the query, writing its output to the given table (Spark 3.1+). */
  def toTable(tableName: String): StreamingQuery
}
78
79
/**
 * Output modes for streaming queries: which rows of the result table are written to
 * the sink on each trigger. These are factory methods (`OutputMode.Append()`).
 */
object OutputMode {
  /** Write only rows appended to the result table since the last trigger (the default). */
  def Append(): OutputMode
  /** Write the entire result table on every trigger; supported only for aggregation queries. */
  def Complete(): OutputMode
  /** Write only rows that were updated since the last trigger. */
  def Update(): OutputMode
}
87
88
/**
 * Trigger policies for streaming queries.
 *
 * When no trigger is set, the query runs micro-batches back to back: a new batch
 * starts as soon as the previous one completes.
 */
object Trigger {
  /**
   * Fixed-interval micro-batches: a batch is started every `interval`. If a batch
   * overruns the interval, the next one starts as soon as it finishes.
   */
  def ProcessingTime(intervalMs: Long): Trigger
  /** Interval given as a string, e.g. "10 seconds". */
  def ProcessingTime(interval: String): Trigger
  def ProcessingTime(interval: Long, unit: TimeUnit): Trigger

  /**
   * Process all available data in a single micro-batch, then stop.
   * Deprecated since Spark 3.4 in favour of `AvailableNow()`.
   */
  def Once(): Trigger

  /** Process all available data, possibly in several micro-batches, then stop (Spark 3.3+). */
  def AvailableNow(): Trigger

  /**
   * Experimental continuous processing: low latency with at-least-once guarantees.
   * `interval` is the checkpoint interval, not a batch interval.
   */
  def Continuous(intervalMs: Long): Trigger
  def Continuous(interval: String): Trigger
  def Continuous(interval: Long, unit: TimeUnit): Trigger
}
101
```
102
103
**Usage Examples:**
104
105
```scala
106
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._ // enables the $"column" syntax; assumes a SparkSession named `spark`

// Read from file source
val lines = spark.readStream
  .format("text")
  .option("path", "input-directory")
  .load()

// Word count example
val words = lines
  .select(explode(split($"value", " ")).as("word"))
  .groupBy("word")
  .count()

// Write to console. start() returns immediately; the query runs in the background.
val query = words.writeStream
  .outputMode("complete")
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()

// Kafka source and sink
// Example schema of the JSON payload carried in each Kafka message value
val schema = new StructType()
  .add("category", StringType)
  .add("amount", DoubleType)

val kafkaDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1")
  .load()

val processedDF = kafkaDF
  .select(from_json($"value".cast("string"), schema).as("data"))
  .select("data.*")
  .groupBy("category")
  .count()

// The Kafka sink requires a string/binary "value" column (and optionally "key"),
// plus a checkpoint location for fault tolerance
val output = processedDF
  .select($"category".as("key"), to_json(struct($"category", $"count")).as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/tmp/checkpoints/kafka-output")
  .outputMode("update")
  .start()

// Block the driver only after all queries are started; returns when any of them
// stops, and rethrows its exception if it failed
spark.streams.awaitAnyTermination()
149
```
150
151
### StreamingQuery Management
152
153
Interface for managing and monitoring streaming queries.
154
155
```scala { .api }
156
/**
 * Handle to a streaming query running in the background, returned by
 * `DataStreamWriter.start()` and available through `spark.streams`.
 */
abstract class StreamingQuery {
  /** Identifier of the query; it persists across restarts from the same checkpoint. */
  def id: UUID
  /** Identifier of this run of the query; a new one is generated on every (re)start. */
  def runId: UUID
  /** Name set via `queryName(...)`, or null if none was given. */
  def name: String
  /** Whether the query is currently running. */
  def isActive: Boolean
  /** Current status of the query. */
  def status: StreamingQueryStatus
  /**
   * Most recent progress updates; how many are kept is controlled by
   * `spark.sql.streaming.numRecentProgressUpdates`.
   */
  def recentProgress: Array[StreamingQueryProgress]
  /** The most recent progress update. */
  def lastProgress: StreamingQueryProgress

  /** Block until the query stops; throws StreamingQueryException if it failed. */
  def awaitTermination(): Unit
  /**
   * Block for at most `timeoutMs` milliseconds. Returns whether the query has
   * terminated; throws StreamingQueryException if it failed.
   */
  def awaitTermination(timeoutMs: Long): Boolean
  /** Block until all data currently available in the sources is processed (intended for tests). */
  def processAllAvailable(): Unit
  /**
   * Stop the query. Since Spark 3.0 this may throw java.util.concurrent.TimeoutException
   * if the query does not stop within `spark.sql.streaming.stopTimeout`.
   */
  def stop(): Unit

  /** The exception that terminated the query, if it failed. */
  def exception: Option[StreamingQueryException]

  /** Print the query's physical plan (optionally including logical plans) to the console. */
  def explain(): Unit
  def explain(extended: Boolean): Unit
}
183
184
/**
 * Manager for all streaming queries of a SparkSession, available as `spark.streams`.
 *
 * There is no `stopAll()`; stop queries individually, e.g. `spark.streams.active.foreach(_.stop())`.
 */
class StreamingQueryManager {
  /** Queries of this session that are currently active. */
  def active: Array[StreamingQuery]

  /** The active query with the given id, or null if there is none. */
  def get(id: UUID): StreamingQuery
  def get(id: String): StreamingQuery

  /**
   * Block until ANY query of this session terminates, or return immediately if one has
   * terminated since the last `resetTerminated()`. Rethrows the query's exception if it failed.
   */
  def awaitAnyTermination(): Unit
  /** As above, but waits at most `timeoutMs`; returns whether a query has terminated. */
  def awaitAnyTermination(timeoutMs: Long): Boolean
  /** Forget previously terminated queries so that `awaitAnyTermination()` blocks again. */
  def resetTerminated(): Unit

  /** Register a listener for query started / progress / terminated events. */
  def addListener(listener: StreamingQueryListener): Unit
  /** Deregister a previously added listener. */
  def removeListener(listener: StreamingQueryListener): Unit
  /** All currently registered listeners. */
  def listListeners(): Array[StreamingQueryListener]
}
203
204
/**
 * Progress information for one micro-batch of a streaming query, as returned by
 * `StreamingQuery.lastProgress` / `recentProgress` and delivered to listeners.
 *
 * This is a plain class (not a case class) created by Spark; its constructor is not
 * public. Read its fields, or use `json` / `prettyJson` for a serialized form.
 */
class StreamingQueryProgress(
  val id: UUID,
  val runId: UUID,
  val name: String,
  /** Start time of the trigger, as an ISO-8601 UTC timestamp. */
  val timestamp: String,
  val batchId: Long,
  /** Processing time of the batch in milliseconds. */
  val batchDuration: Long,
  /** Time spent in each execution phase, in milliseconds. */
  val durationMs: java.util.Map[String, java.lang.Long],
  /** Event-time statistics such as "max", "min", "avg" and "watermark". */
  val eventTime: java.util.Map[String, String],
  val stateOperators: Array[StateOperatorProgress],
  val sources: Array[SourceProgress],
  val sink: SinkProgress,
  /** Metrics registered with `Dataset.observe` (Spark 3.0+). */
  val observedMetrics: java.util.Map[String, Row]
) {
  /** Rows read in this batch across all sources. */
  def numInputRows: Long
  /** Rate at which data arrived. */
  def inputRowsPerSecond: Double
  /** Rate at which Spark processed data. */
  def processedRowsPerSecond: Double
  /** Compact and pretty-printed JSON representations. */
  def json: String
  def prettyJson: String
}
220
```
221
222
### Window Operations
223
224
Event-time windowing (tumbling, sliding and session windows) for streaming aggregations.
225
226
```scala { .api }
227
/**
 * Event-time window functions from `org.apache.spark.sql.functions`.
 *
 * Each function returns a struct column with `start` and `end` timestamp fields, which
 * is typically used as a `groupBy` key. Durations are interval strings such as
 * "10 minutes" or "1 hour".
 */
object functions {
  /** Tumbling (fixed-size, non-overlapping) windows of `windowDuration`. */
  def window(
    timeColumn: Column,
    windowDuration: String
  ): Column

  /** Sliding windows of `windowDuration`, with a new window starting every `slideDuration`. */
  def window(
    timeColumn: Column,
    windowDuration: String,
    slideDuration: String
  ): Column

  /**
   * Sliding windows whose boundaries are offset by `startTime` relative to
   * 1970-01-01 00:00:00 UTC (e.g. "15 minutes" for windows starting at :15, :45, ...).
   */
  def window(
    timeColumn: Column,
    windowDuration: String,
    slideDuration: String,
    startTime: String
  ): Column

  /** Session windows that close after `gapDuration` without new events (Spark 3.2+). */
  def session_window(
    timeColumn: Column,
    gapDuration: String
  ): Column

  /** Session windows whose gap is computed per row by the `gapDuration` column (Spark 3.2+). */
  def session_window(
    timeColumn: Column,
    gapDuration: Column
  ): Column
}
258
```
259
260
**Usage Examples:**
261
262
```scala
263
import org.apache.spark.sql.functions._
import spark.implicits._ // for the $"column" syntax; assumes a SparkSession named `spark`

// `events` is assumed to be a streaming DataFrame with columns
// timestamp, category, userId and amount.

// Tumbling window (10 minute windows)
val windowed = events
  .withWatermark("timestamp", "10 minutes")
  .groupBy(
    window($"timestamp", "10 minutes"),
    $"category"
  )
  .count()

// Sliding window (10 minute windows, sliding every 5 minutes); each event
// falls into two overlapping windows
val sliding = events
  .withWatermark("timestamp", "10 minutes")
  .groupBy(
    window($"timestamp", "10 minutes", "5 minutes"),
    $"userId"
  )
  .agg(sum("amount").as("total"))

// Session window (gap-based): a session closes after 20 minutes without events
val sessions = events
  .withWatermark("timestamp", "30 minutes")
  .groupBy(
    $"userId",
    session_window($"timestamp", "20 minutes")
  )
  .count()
291
```
292
293
### Watermarking and Late Data
294
295
Handling late-arriving data with watermarks.
296
297
```scala { .api }
298
/**
 * Watermarking for handling late data.
 *
 * `withWatermark` is a regular method on `Dataset`; it is not an implicit extension and
 * needs no extra import.
 */
class Dataset[T] {
  /**
   * Define an event-time watermark on `eventTime`, which must be a timestamp column.
   * The watermark trails the maximum event time seen so far by `delayThreshold`
   * (e.g. "10 minutes").
   *
   * Stateful operators use the watermark to discard state for windows older than it.
   * Input rows arriving later than the watermark may be dropped. Batch queries ignore it.
   */
  def withWatermark(eventTime: String, delayThreshold: String): Dataset[T]
}
305
```
306
307
**Usage Examples:**
308
309
```scala
310
import org.apache.spark.sql.functions._
import spark.implicits._ // for the $"column" syntax; assumes a SparkSession named `spark`

// Handle late data with watermarking: the watermark trails the maximum event time by
// 10 minutes. Rows older than the watermark may be dropped, and state for windows
// that end before the watermark is discarded.
val result = inputStream
  .withWatermark("eventTime", "10 minutes")
  .groupBy(
    window($"eventTime", "5 minutes"),
    $"deviceId"
  )
  .count()

// The watermark can only clean up aggregation state when the grouping keys include the
// event-time column or a window over it. Grouping by sessionId alone would keep state for
// every session forever, so group by an event-time session window as well (Spark 3.2+).
val stateStream = inputStream
  .withWatermark("eventTime", "1 hour")
  .groupBy(
    $"sessionId",
    session_window($"eventTime", "30 minutes")
  )
  .agg(
    min("eventTime").as("sessionStart"),
    max("eventTime").as("sessionEnd"),
    count("*").as("eventCount")
  )
328
```
329
330
### Spark Streaming (Legacy DStreams)
331
332
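The original (legacy) streaming API, built on discretized streams (DStreams): sequences of RDDs produced in micro-batches. It no longer receives new features; new applications should prefer Structured Streaming.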
333
334
```scala { .api }
335
/**
 * Main entry point for Spark Streaming (DStreams).
 *
 * `batchDuration` is the interval at which incoming data is cut into batches.
 * All input streams and output operations must be defined before `start()`.
 */
class StreamingContext(conf: SparkConf, batchDuration: Duration) {
  /**
   * Create a receiver-based DStream of UTF-8, newline-delimited text read from a TCP socket.
   * The receiver permanently occupies one core, so run with more cores than receivers.
   */
  def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String]

  /**
   * Monitor `directory` for new text files, emitting their lines. Files must appear
   * atomically, e.g. by moving or renaming them into the directory.
   */
  def textFileStream(directory: String): DStream[String]

  /** Monitor `directory` for new files, read with the Hadoop InputFormat `F`. */
  def fileStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag](
    directory: String
  ): InputDStream[(K, V)]

  /**
   * Create a DStream from a queue of RDDs (mainly for testing). With `oneAtATime`,
   * one RDD is consumed per batch; otherwise all queued RDDs are consumed together.
   */
  def queueStream[T: ClassTag](
    queue: Queue[RDD[T]],
    oneAtATime: Boolean = true
  ): InputDStream[T]

  /** Union multiple DStreams of the same type and slide duration. */
  def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T]

  /** Start executing the defined streams. */
  def start(): Unit

  /**
   * Stop execution immediately. The default of `stopSparkContext` comes from
   * `spark.streaming.stopSparkContextByDefault` (true unless configured otherwise).
   */
  def stop(stopSparkContext: Boolean = true): Unit
  /** Stop execution; with `stopGracefully`, first finish processing all received data. */
  def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit

  /** Block until the context is stopped; rethrows any error from the streaming job. */
  def awaitTermination(): Unit
  /**
   * Block for at most `timeout` milliseconds. Returns true if the context stopped and
   * false if the timeout elapsed. Replaces the removed `awaitTermination(timeout)`.
   */
  def awaitTerminationOrTimeout(timeout: Long): Boolean

  /** Keep each DStream's generated RDDs for at least `duration` (e.g. for interactive queries). */
  def remember(duration: Duration): Unit
  /** Set the fault-tolerant directory for checkpoint data (required by stateful operations). */
  def checkpoint(directory: String): Unit
}
375
376
/**
 * Discretized stream: a continuous sequence of RDDs of type `T`, one per batch interval.
 * Transformations return new DStreams; output operations (print, foreachRDD,
 * saveAsTextFiles) cause the stream to be computed.
 */
abstract class DStream[T: ClassTag] {
  // ---- Element-wise and per-batch transformations ----

  /** Apply `mapFunc` to each element. */
  def map[U: ClassTag](mapFunc: T => U): DStream[U]
  /** Apply `flatMapFunc` to each element and flatten the results. */
  def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U]
  /** Keep only the elements for which `filterFunc` returns true. */
  def filter(filterFunc: T => Boolean): DStream[T]
  /** Collapse each batch's RDD into a single element with `reduceFunc`. */
  def reduce(reduceFunc: (T, T) => T): DStream[T]
  /** One element per batch: the number of elements in that batch. */
  def count(): DStream[Long]
  /** Merge this stream with `that`, batch by batch. */
  def union(that: DStream[T]): DStream[T]

  /** Apply an arbitrary RDD-to-RDD function to each batch. */
  def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U]
  /** As above, also passing the batch time. */
  def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U]

  // ---- Windowed transformations (durations must be multiples of the batch interval) ----

  /** All elements from the last `windowDuration`, recomputed every batch. */
  def window(windowDuration: Duration): DStream[T]
  /** All elements from the last `windowDuration`, recomputed every `slideDuration`. */
  def window(windowDuration: Duration, slideDuration: Duration): DStream[T]
  /** Reduce all elements in each window to a single element. */
  def reduceByWindow(
    reduceFunc: (T, T) => T,
    windowDuration: Duration,
    slideDuration: Duration
  ): DStream[T]
  /** Number of elements in each window. */
  def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long]

  // ---- Output operations ----

  /** Run `foreachFunc` on each batch's RDD; the function itself runs on the driver. */
  def foreachRDD(foreachFunc: RDD[T] => Unit): Unit
  /** As above, also passing the batch time. */
  def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit
  /** Print the first `num` elements of every batch on the driver. */
  def print(num: Int = 10): Unit
  /** Save every batch as text files named "prefix-TIME_IN_MS[.suffix]". */
  def saveAsTextFiles(prefix: String, suffix: String = ""): Unit

  // ---- Persistence ----

  /** Persist generated RDDs with the default DStream storage level. */
  def cache(): DStream[T]
  /** Persist generated RDDs with `level`. */
  def persist(level: StorageLevel = StorageLevel.MEMORY_ONLY_SER): DStream[T]
}
422
```
423
424
### DStream Operations for Key-Value Pairs
425
426
Additional operations for DStreams of key-value pairs.
427
428
```scala { .api }
429
/**
 * Extra operations on DStreams of key-value pairs, available implicitly on any
 * `DStream[(K, V)]`. Unless windowed or stateful, they operate on each batch separately.
 */
implicit class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K, V)]) {
  // ---- Per-batch aggregation ----

  /** Collect all values of each key within a batch. */
  def groupByKey(): DStream[(K, Iterable[V])]
  /** As above, with `numPartitions` output partitions. */
  def groupByKey(numPartitions: Int): DStream[(K, Iterable[V])]

  /** Combine the values of each key within a batch using `func`. */
  def reduceByKey(func: (V, V) => V): DStream[(K, V)]
  /** As above, with `numPartitions` output partitions. */
  def reduceByKey(func: (V, V) => V, numPartitions: Int): DStream[(K, V)]

  // ---- Windowed aggregation ----

  /** Combine the values of each key over a sliding window. */
  def reduceByKeyAndWindow(
    reduceFunc: (V, V) => V,
    windowDuration: Duration,
    slideDuration: Duration
  ): DStream[(K, V)]

  // ---- Joins between pair streams (batch by batch) ----

  /** Inner join with `other` on the key. */
  def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))]
  /** Inner join with `numPartitions` output partitions. */
  def join[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (V, W))]
  /** Left outer join: keys missing from `other` pair with None. */
  def leftOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))]
  /** Group the values of both streams by key. */
  def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))]

  // ---- Stateful operations (require StreamingContext.checkpoint) ----

  /**
   * Maintain per-key state across batches: `updateFunc` receives the key's new values
   * and previous state; returning None removes the key.
   */
  def updateStateByKey[S: ClassTag](
    updateFunc: (Seq[V], Option[S]) => Option[S]
  ): DStream[(K, S)]

  /** Maintain per-key state as described by a StateSpec and emit mapped records. */
  def mapWithState[StateType: ClassTag, MappedType: ClassTag](
    spec: StateSpec[K, V, StateType, MappedType]
  ): MapWithStateDStream[K, V, StateType, MappedType]
}
468
```
469
470
**Usage Examples:**
471
472
```scala
473
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._

// Create StreamingContext; local[2] because the socket receiver permanently uses one core
val conf = new SparkConf().setAppName("StreamingApp").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1))
ssc.checkpoint("checkpoint") // required by updateStateByKey below

// Socket stream
val lines = ssc.socketTextStream("localhost", 9999)

// Word count
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()

// Windowed operations: counts over the last 30 s, recomputed every 10 s
// (both multiples of the 1 s batch interval)
val windowedWordCounts = pairs
  .reduceByKeyAndWindow(
    (a: Int, b: Int) => a + b,
    Seconds(30),
    Seconds(10)
  )
windowedWordCounts.print() // DStreams without an output operation are never computed

// Stateful operations: running total per word across all batches
// (explicit parameter types avoid overload-resolution failures)
val runningCounts = pairs.updateStateByKey[Int] { (values: Seq[Int], state: Option[Int]) =>
  Some(values.sum + state.getOrElse(0))
}
runningCounts.print()

// File stream: textFileStream already emits one element per line
val textFiles = ssc.textFileStream("input-directory")
val processedFiles = textFiles
  .filter(_.nonEmpty)
  // First CSV field; unlike split(",")(0) this cannot throw on lines such as ","
  .map(line => (line.takeWhile(_ != ','), 1))
  .reduceByKey(_ + _)
processedFiles.print()

ssc.start()
ssc.awaitTermination()
515
```
516
517
### Time and Duration
518
519
Time handling utilities for streaming applications.
520
521
```scala { .api }
522
/**
 * A length of time, stored in milliseconds. Used for batch, window and slide intervals.
 */
case class Duration(private val millis: Long) {
  /** This duration in milliseconds. */
  def milliseconds: Long

  // Arithmetic
  /** Sum of the two durations. */
  def +(other: Duration): Duration
  /** Difference of the two durations. */
  def -(other: Duration): Duration
  /** This duration repeated `times` times. */
  def *(times: Int): Duration
  /** Ratio of this duration to `that`. */
  def /(that: Duration): Double

  // Ordering
  def <(other: Duration): Boolean
  def <=(other: Duration): Boolean
  def >(other: Duration): Boolean
  def >=(other: Duration): Boolean
}
536
537
/** Companion factory: `Duration(length)` is a duration of `length` milliseconds. */
object Duration {
  def apply(length: Long): Duration
}

/** `Milliseconds(n)`: a Duration of `n` milliseconds. */
object Milliseconds {
  def apply(milliseconds: Long): Duration
}

/** `Seconds(n)`: a Duration of `n` seconds. */
object Seconds {
  def apply(seconds: Long): Duration
}

/** `Minutes(n)`: a Duration of `n` minutes. */
object Minutes {
  def apply(minutes: Long): Duration
}
553
554
/**
 * An absolute instant in milliseconds, used to identify DStream batches
 * (for example the batch time passed to `foreachRDD` and `transform`).
 */
case class Time(private val millis: Long) {
  /** This instant in milliseconds. */
  def milliseconds: Long

  // Shifting and differences
  /** This instant moved forward by `other`. */
  def +(other: Duration): Time
  /** This instant moved back by `other`. */
  def -(other: Duration): Time
  /** Elapsed duration from `other` to this instant. */
  def -(other: Time): Duration

  // Ordering
  def <(other: Time): Boolean
  def <=(other: Time): Boolean
  def >(other: Time): Boolean
  def >=(other: Time): Boolean
}
567
```
568
569
### Checkpointing and Fault Tolerance
570
571
Mechanisms for fault tolerance in streaming applications.
572
573
```scala { .api }
574
/**
 * Factory methods on the StreamingContext companion for checkpoint-based recovery.
 */
object StreamingContext {
  /**
   * Recreate a StreamingContext from the checkpoint data in `checkpointPath` if it exists;
   * otherwise build a new one with `creatingFunc`.
   *
   * On recovery `creatingFunc` is NOT called, so it must define the entire DStream graph
   * and call `checkpoint(...)` itself.
   *
   * @param checkpointPath directory containing checkpoint data
   * @param creatingFunc   builds a fresh context when no checkpoint exists
   * @param hadoopConf     Hadoop configuration used to read the checkpoint files
   * @param createOnError  if true, create a new context when the checkpoint cannot be
   *                       read instead of throwing
   */
  def getOrCreate(
    checkpointPath: String,
    creatingFunc: () => StreamingContext,
    hadoopConf: Configuration = SparkHadoopUtil.get.conf,
    createOnError: Boolean = false
  ): StreamingContext

  /** The currently active StreamingContext, if any. */
  def getActive(): Option[StreamingContext]

  /** Return the active StreamingContext, or create one with `creatingFunc` if none is active. */
  def getActiveOrCreate(creatingFunc: () => StreamingContext): StreamingContext
}
591
```
592
593
**Usage Examples:**
594
595
```scala
596
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Checkpointing example
// Use a fully qualified URI: "hdfs://checkpoint" would treat "checkpoint" as the
// namenode host and specify no path. Adjust host, port and path for your cluster.
val checkpointDir = "hdfs://namenode:8020/user/spark/checkpoints/CheckpointApp"

// Only called when no checkpoint exists, so the whole DStream graph is defined here
def createStreamingContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("CheckpointApp")
  val ssc = new StreamingContext(conf, Seconds(5))
  ssc.checkpoint(checkpointDir)

  // Define streaming computation
  val lines = ssc.socketTextStream("localhost", 9999)
  val words = lines.flatMap(_.split(" "))
  val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
  wordCounts.print()

  ssc
}

// Recovery from checkpoint: reuses the checkpointed graph if present, otherwise creates it
val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext _)
ssc.start()
ssc.awaitTermination()
615
```
616
617
## Error Handling
618
619
Common streaming exceptions:
620
621
- `StreamingQueryException` - Streaming query execution failures
622
- `AnalysisException` - Invalid streaming operations or configurations
623
- `IllegalStateException` - Invalid streaming application state
624
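- `java.util.concurrent.TimeoutException` - Thrown by `StreamingQuery.stop()` (Spark 3.0+) when the query does not stop within `spark.sql.streaming.stopTimeout`. Note that `awaitTermination(timeoutMs)` does not throw on timeout; it returns `false`.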