0
# Streaming Queries
1
2
Spark Structured Streaming provides a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. It allows you to express streaming computations the same way you would express a batch computation on static data, using the same DataFrame and Dataset APIs.
3
4
## Core Streaming Concepts
5
6
### DataStreamReader
7
8
```scala { .api }
9
class DataStreamReader {
10
def format(source: String): DataStreamReader
11
def schema(schema: StructType): DataStreamReader
12
def schema(schemaString: String): DataStreamReader
13
def option(key: String, value: String): DataStreamReader
14
def option(key: String, value: Boolean): DataStreamReader
15
def option(key: String, value: Long): DataStreamReader
16
def option(key: String, value: Double): DataStreamReader
17
def options(options: scala.collection.Map[String, String]): DataStreamReader
18
def options(options: java.util.Map[String, String]): DataStreamReader
19
def load(): DataFrame
20
def load(path: String): DataFrame
21
}
22
```
23
24
### DataStreamWriter
25
26
```scala { .api }
27
class DataStreamWriter[T] {
28
def outputMode(outputMode: OutputMode): DataStreamWriter[T]
29
def outputMode(outputMode: String): DataStreamWriter[T]
30
def trigger(trigger: Trigger): DataStreamWriter[T]
31
def queryName(queryName: String): DataStreamWriter[T]
32
def format(source: String): DataStreamWriter[T]
33
def partitionBy(colNames: String*): DataStreamWriter[T]
34
def option(key: String, value: String): DataStreamWriter[T]
35
def option(key: String, value: Boolean): DataStreamWriter[T]
36
def option(key: String, value: Long): DataStreamWriter[T]
37
def option(key: String, value: Double): DataStreamWriter[T]
38
def options(options: scala.collection.Map[String, String]): DataStreamWriter[T]
39
def options(options: java.util.Map[String, String]): DataStreamWriter[T]
40
def start(): StreamingQuery
41
def start(path: String): StreamingQuery
42
def foreach(writer: ForeachWriter[T]): DataStreamWriter[T]
43
def foreachBatch(function: (Dataset[T], Long) => Unit): DataStreamWriter[T]
44
}
45
```
46
47
## Stream Sources
48
49
### File Sources
50
51
**Usage Examples:**
52
53
```scala
54
// JSON file stream
55
val jsonStream = spark.readStream
56
.format("json")
57
.schema(schema) // Schema is required for file sources
58
.option("path", "/path/to/json/files")
59
.load()
60
61
// CSV file stream
62
val csvStream = spark.readStream
63
.format("csv")
64
.schema(csvSchema)
65
.option("header", "true")
66
.option("path", "/path/to/csv/files")
67
.load()
68
69
// Parquet file stream
70
val parquetStream = spark.readStream
71
.format("parquet")
72
.schema(parquetSchema)
73
.load("/path/to/parquet/files")
74
75
// Text file stream
76
val textStream = spark.readStream
77
.format("text")
78
.option("path", "/path/to/text/files")
79
.load()
80
```
81
82
### Kafka Source
83
84
```scala
85
// Basic Kafka stream
86
val kafkaStream = spark.readStream
87
.format("kafka")
88
.option("kafka.bootstrap.servers", "localhost:9092")
89
.option("subscribe", "topic1,topic2")
90
.load()
91
92
// Kafka with pattern-based topic subscription
val kafkaPartitions = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  // Subscribe to every topic whose name matches the regex
  .option("subscribePattern", "topic.*")
  // Starting position for a brand-new query
  .option("startingOffsets", "latest")
  // "endingOffsets" is deliberately not set: it is a batch-read option and is
  // rejected when used with readStream
  .load()
100
101
// Kafka stream processing
102
val processedKafka = kafkaStream
103
.select(
104
col("key").cast("string"),
105
col("value").cast("string"),
106
col("topic"),
107
col("partition"),
108
col("offset"),
109
col("timestamp")
110
)
111
.filter(col("topic") === "important_topic")
112
```
113
114
### Socket Source (for testing)
115
116
```scala
117
// Socket stream (testing only)
118
val socketStream = spark.readStream
119
.format("socket")
120
.option("host", "localhost")
121
.option("port", 9999)
122
.load()
123
```
124
125
### Rate Source (for testing)
126
127
```scala
128
// Rate source for load testing
129
val rateStream = spark.readStream
130
.format("rate")
131
.option("rowsPerSecond", "100")
132
.option("numPartitions", "10")
133
.load()
134
```
135
136
## Output Modes
137
138
```scala { .api }
139
object OutputMode {
140
val Append: OutputMode
141
val Complete: OutputMode
142
val Update: OutputMode
143
}
144
```
145
146
### Output Mode Usage
147
148
```scala
149
// Append mode - only new rows added to result table
150
val appendQuery = df.writeStream
151
.outputMode(OutputMode.Append)
152
.format("console")
153
.start()
154
155
// Complete mode - entire result table is output
156
val completeQuery = df
157
.groupBy("category")
158
.count()
159
.writeStream
160
.outputMode(OutputMode.Complete)
161
.format("console")
162
.start()
163
164
// Update mode - only updated rows in result table
165
val updateQuery = df
166
.groupBy("id")
167
.agg(max("timestamp"))
168
.writeStream
169
.outputMode(OutputMode.Update)
170
.format("console")
171
.start()
172
```
173
174
## Triggers
175
176
```scala { .api }
177
object Trigger {
178
def ProcessingTime(interval: String): Trigger
179
def ProcessingTime(interval: Long, unit: TimeUnit): Trigger
180
def Once(): Trigger
181
def Continuous(interval: String): Trigger
182
def Continuous(interval: Long, unit: TimeUnit): Trigger
183
}
184
```
185
186
### Trigger Examples
187
188
```scala
189
import org.apache.spark.sql.streaming.Trigger
190
import java.util.concurrent.TimeUnit
191
192
// Micro-batch processing
193
val microBatchQuery = df.writeStream
194
.trigger(Trigger.ProcessingTime("30 seconds"))
195
.outputMode("append")
196
.format("console")
197
.start()
198
199
// One-time trigger (batch-like)
200
val onceQuery = df.writeStream
201
.trigger(Trigger.Once())
202
.outputMode("append")
203
.format("parquet")
204
.option("path", "/output/path")
205
.start()
206
207
// Continuous processing (experimental)
208
val continuousQuery = df.writeStream
209
.trigger(Trigger.Continuous("1 second"))
210
.outputMode("append")
211
.format("console")
212
.start()
213
```
214
215
## StreamingQuery Management
216
217
```scala { .api }
218
class StreamingQuery {
219
def id: UUID
220
def runId: UUID
221
def name: String
222
def sparkSession: SparkSession
223
def isActive: Boolean
224
def exception: Option[StreamingQueryException]
225
def status: StreamingQueryStatus
226
def recentProgress: Array[StreamingQueryProgress]
227
def lastProgress: StreamingQueryProgress
228
229
def awaitTermination(): Unit
230
def awaitTermination(timeoutMs: Long): Boolean
231
def processAllAvailable(): Unit
232
def stop(): Unit
233
def explain(): Unit
234
def explain(extended: Boolean): Unit
235
}
236
```
237
238
### Query Lifecycle Management
239
240
**Usage Examples:**
241
242
```scala
243
// Start a streaming query
244
val query = df
245
.groupBy("category")
246
.count()
247
.writeStream
248
.queryName("category_counts")
249
.outputMode("complete")
250
.format("memory")
251
.start()
252
253
// Monitor query status
254
println(s"Query ID: ${query.id}")
255
println(s"Is Active: ${query.isActive}")
256
println(s"Recent Progress: ${query.recentProgress.length} batches")
257
258
// Wait for termination
259
query.awaitTermination()
260
261
// Or wait with timeout
262
val finished = query.awaitTermination(60000) // 60 seconds
263
if (!finished) {
264
println("Query still running after timeout")
265
query.stop()
266
}
267
268
// Process all available data (testing)
269
query.processAllAvailable()
270
271
// Stop the query
272
query.stop()
273
274
// Exception handling
275
query.exception match {
276
case Some(e) => println(s"Query failed: ${e.getMessage}")
277
case None => println("Query completed successfully")
278
}
279
```
280
281
### Multiple Query Management
282
283
```scala
284
// Manage multiple streaming queries
285
val queries = mutable.ArrayBuffer[StreamingQuery]()

// Start multiple queries
queries += df1.writeStream.queryName("query1").format("console").start()
queries += df2.writeStream.queryName("query2").format("console").start()
queries += df3.writeStream.queryName("query3").format("console").start()

// Wait for all queries.
// Awaiting each query in turn would block on the first one and hide a failure
// in any other query until the first terminates, so wait on "any" instead.
try {
  while (queries.exists(_.isActive)) {
    // Returns when any query in the session terminates; throws if it failed
    spark.streams.awaitAnyTermination()
    // Forget already-terminated queries so the next call waits for a new termination
    spark.streams.resetTerminated()
    // Surface a failure that may have been cleared by the reset above
    queries.flatMap(_.exception).headOption.foreach(failure => throw failure)
  }
} catch {
  case scala.util.control.NonFatal(e) =>
    println(s"A query failed: ${e.getMessage}")
    queries.foreach(_.stop())
}
300
301
// Access all active queries
302
val allQueries = spark.streams.active
303
allQueries.foreach { query =>
304
println(s"Query: ${query.name}, Active: ${query.isActive}")
305
}
306
```
307
308
## Stream Sinks
309
310
### Console Sink
311
312
```scala
313
// Basic console output
314
val consoleQuery = df.writeStream
315
.format("console")
316
.outputMode("append")
317
.start()
318
319
// Console with options
320
val detailedConsole = df.writeStream
321
.format("console")
322
.option("numRows", "50")
323
.option("truncate", "false")
324
.outputMode("append")
325
.start()
326
```
327
328
### File Sinks
329
330
```scala
331
// Parquet sink
332
val parquetSink = df.writeStream
333
.format("parquet")
334
.option("path", "/output/parquet")
335
.option("checkpointLocation", "/checkpoint/parquet")
336
.outputMode("append")
337
.start()
338
339
// JSON sink with partitioning
340
val jsonSink = df.writeStream
341
.format("json")
342
.option("path", "/output/json")
343
.option("checkpointLocation", "/checkpoint/json")
344
.partitionBy("year", "month")
345
.outputMode("append")
346
.start()
347
348
// Delta sink (requires Delta Lake)
349
val deltaSink = df.writeStream
350
.format("delta")
351
.option("path", "/delta/table")
352
.option("checkpointLocation", "/checkpoint/delta")
353
.outputMode("append")
354
.start()
355
```
356
357
### Kafka Sink
358
359
```scala
360
// Kafka sink
361
val kafkaSink = df
362
.selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value")
363
.writeStream
364
.format("kafka")
365
.option("kafka.bootstrap.servers", "localhost:9092")
366
.option("topic", "output_topic")
367
.option("checkpointLocation", "/checkpoint/kafka")
368
.outputMode("append")
369
.start()
370
371
// Kafka sink with headers
372
val kafkaWithHeaders = df
  // Build key/value first so the headers column does not leak into the JSON payload
  .selectExpr(
    "CAST(id AS STRING) AS key",
    "to_json(struct(*)) AS value"
  )
  // The Kafka sink expects headers as array<struct<key: string, value: binary>>,
  // so the header value must be cast to binary
  .withColumn("headers",
    array(struct(
      lit("source").alias("key"),
      lit("spark").cast("binary").alias("value"))))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output_topic")
  // The Kafka sink needs a checkpoint location, same as the basic Kafka sink example
  .option("checkpointLocation", "/checkpoint/kafka_headers")
  .outputMode("append")
  .start()
386
```
387
388
### Memory Sink (for testing)
389
390
```scala
391
// Memory sink for testing
392
val memoryQuery = df
393
.groupBy("category")
394
.count()
395
.writeStream
396
.queryName("memory_table")
397
.format("memory")
398
.outputMode("complete")
399
.start()
400
401
// Query the memory table
402
spark.sql("SELECT * FROM memory_table").show()
403
```
404
405
## Custom Sinks with ForeachWriter
406
407
```scala { .api }
408
abstract class ForeachWriter[T] extends Serializable {
409
def open(partitionId: Long, epochId: Long): Boolean
410
def process(value: T): Unit
411
def close(errorOrNull: Throwable): Unit
412
}
413
```
414
415
### Custom ForeachWriter Example
416
417
```scala
418
import org.apache.spark.sql.ForeachWriter
419
420
// Custom writer to database
421
/**
 * Writes every streamed row into the `results` table over JDBC.
 *
 * `open` creates the connection and the prepared INSERT statement, `process`
 * executes one INSERT per row, and `close` releases both resources.
 */
class DatabaseWriter extends ForeachWriter[Row] {
  // Created in open() rather than at construction time; ForeachWriter is Serializable
  // and JDBC handles are not.
  var connection: java.sql.Connection = _
  var statement: java.sql.PreparedStatement = _

  /**
   * Opens the JDBC connection and prepares the INSERT statement.
   *
   * @param partitionId id of the partition being written
   * @param epochId     id of the epoch (batch) being written
   * @return always `true`, i.e. every partition is processed
   */
  override def open(partitionId: Long, epochId: Long): Boolean = {
    connection = java.sql.DriverManager.getConnection(
      "jdbc:postgresql://localhost/mydb", "user", "password")
    try {
      statement = connection.prepareStatement(
        "INSERT INTO results (id, value, timestamp) VALUES (?, ?, ?)")
    } catch {
      case scala.util.control.NonFatal(e) =>
        // Do not leak the connection when preparing the statement fails
        connection.close()
        connection = null
        throw e
    }
    true
  }

  /**
   * Inserts a single row.
   *
   * @param value row whose columns 0, 1 and 2 are read as Long, String and Timestamp
   */
  override def process(value: Row): Unit = {
    statement.setLong(1, value.getLong(0))
    statement.setString(2, value.getString(1))
    statement.setTimestamp(3, value.getTimestamp(2))
    statement.executeUpdate()
  }

  /**
   * Releases the statement and the connection.
   *
   * @param errorOrNull the error that stopped processing, or null (unused here)
   */
  override def close(errorOrNull: Throwable): Unit = {
    try {
      if (statement != null) statement.close()
    } finally {
      // Runs even if closing the statement throws, so the connection is never leaked
      if (connection != null) connection.close()
    }
  }
}
448
449
// Use custom writer
450
val customWriterQuery = df.writeStream
451
.foreach(new DatabaseWriter())
452
.outputMode("append")
453
.start()
454
```
455
456
### ForeachBatch for Custom Logic
457
458
```scala
459
// ForeachBatch for custom processing
460
val foreachBatchQuery = df.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    println(s"Processing batch $batchId")

    // The batch feeds several actions below; cache it so the input is not
    // recomputed for each of them
    batchDF.persist()
    try {
      // Custom logic per batch (isEmpty avoids a full count just to test for data)
      if (!batchDF.isEmpty) {
        // Write to multiple sinks
        batchDF.write
          .mode("append")
          .parquet(s"/output/batch_$batchId")

        // Also update a summary table
        val summary = batchDF
          .groupBy("category")
          .agg(sum("amount").alias("total"))
          .withColumn("batch_id", lit(batchId))

        summary.write
          .mode("append")
          .saveAsTable("batch_summaries")
      }
    } finally {
      // Always release the cached batch, even when a write fails
      batchDF.unpersist()
    }
  }
  .outputMode("append")
  .start()
484
```
485
486
## Windowed Aggregations
487
488
### Time Windows
489
490
```scala
491
import org.apache.spark.sql.functions._
492
493
// Tumbling window
494
val tumblingWindow = df
495
.withWatermark("timestamp", "10 minutes")
496
.groupBy(
497
window(col("timestamp"), "15 minutes"),
498
col("category")
499
)
500
.agg(
501
sum("amount").alias("total_amount"),
502
count("*").alias("count")
503
)
504
505
// Sliding window
506
val slidingWindow = df
507
.withWatermark("timestamp", "10 minutes")
508
.groupBy(
509
window(col("timestamp"), "15 minutes", "5 minutes"),
510
col("category")
511
)
512
.agg(
513
avg("value").alias("avg_value"),
514
max("value").alias("max_value")
515
)
516
517
// Session window (Spark 3.2+)
518
val sessionWindow = df
519
.withWatermark("timestamp", "10 minutes")
520
.groupBy(
521
session_window(col("timestamp"), "30 minutes"),
522
col("user_id")
523
)
524
.agg(
525
count("*").alias("events_in_session"),
526
sum("duration").alias("total_duration")
527
)
528
```
529
530
### Watermarks for Late Data
531
532
```scala
533
// Handle late data with watermarks
534
val lateDataQuery = df
535
.withWatermark("event_time", "10 minutes") // Allow 10 minutes of late data
536
.groupBy(
537
window(col("event_time"), "5 minutes"),
538
col("device_id")
539
)
540
.agg(
541
avg("temperature").alias("avg_temp"),
542
count("*").alias("reading_count")
543
)
544
.writeStream
545
.outputMode("append") // Only complete windows are output
546
.format("console")
547
.start()
548
```
549
550
## Stream-Stream Joins
551
552
```scala
553
// Inner join between two streams
554
val stream1 = spark.readStream.format("kafka")...
555
val stream2 = spark.readStream.format("kafka")...
556
557
val joined = stream1.alias("s1")
558
.join(
559
stream2.alias("s2"),
560
expr("s1.id = s2.id AND s1.timestamp BETWEEN s2.timestamp - INTERVAL 5 MINUTES AND s2.timestamp + INTERVAL 5 MINUTES")
561
)
562
563
// Stream-static join
564
val staticDF = spark.read.parquet("/path/to/static/data")
565
val streamStaticJoin = streamDF
566
.join(staticDF, "key") // No time constraints needed
567
568
// Left outer join with watermarks
569
val outerJoined = stream1
570
.withWatermark("timestamp", "10 minutes")
571
.alias("left")
572
.join(
573
stream2.withWatermark("timestamp", "20 minutes").alias("right"),
574
expr("left.id = right.id AND left.timestamp BETWEEN right.timestamp - INTERVAL 5 MINUTES AND right.timestamp + INTERVAL 5 MINUTES"),
575
"leftOuter"
576
)
577
```
578
579
## Stateful Processing
580
581
### mapGroupsWithState
582
583
```scala
584
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}
585
586
case class InputEvent(userId: String, action: String, timestamp: Long)
587
case class UserSession(userId: String, startTime: Long, endTime: Long, actionCount: Int)
588
589
/**
 * State-update function for `mapGroupsWithState`: folds a user's new events
 * into that user's running session.
 *
 * @param userId grouping key
 * @param events new events for this user; empty when the call is caused by a timeout
 * @param state  per-user session state managed by Spark
 * @return the updated session, or the final session when the state has timed out
 */
def updateUserSession(userId: String,
    events: Iterator[InputEvent],
    state: GroupState[UserSession]): UserSession = {
  if (state.hasTimedOut) {
    // Timeout invocation: there are no events, so min/max over them would throw.
    // Emit the final session and drop the state so it does not grow forever.
    val finishedSession = state.get
    state.remove()
    finishedSession
  } else {
    val previous = state.getOption.getOrElse(UserSession(userId, Long.MaxValue, 0L, 0))

    // Single pass over the events; also safe if the iterator happens to be empty
    val updatedSession = events.foldLeft(previous) { (session, event) =>
      session.copy(
        startTime = math.min(session.startTime, event.timestamp),
        endTime = math.max(session.endTime, event.timestamp),
        actionCount = session.actionCount + 1)
    }

    // Store the state first, then (re)arm the inactivity timeout
    state.update(updatedSession)
    state.setTimeoutDuration("30 minutes")

    updatedSession
  }
}
608
609
val sessionUpdates = inputStream
610
.as[InputEvent]
611
.groupByKey(_.userId)
612
.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(updateUserSession)
613
```
614
615
## Monitoring and Debugging
616
617
### Query Progress
618
619
```scala
620
// Monitor query progress
621
val query = df.writeStream...
622
623
// Get progress information
624
val progress = query.lastProgress
// lastProgress is null until the query has reported its first progress update
Option(progress).foreach { latest =>
  println(s"Batch ID: ${latest.batchId}")
  println(s"Input rows: ${latest.numInputRows}")
  println(s"Processing rate: ${latest.processedRowsPerSecond}")
  println(s"Batch duration: ${latest.batchDuration}")
}
629
630
// Historical progress
631
query.recentProgress.foreach { progress =>
632
println(s"Batch ${progress.batchId}: ${progress.inputRowsPerSecond} rows/sec")
633
}
634
```
635
636
### Streaming Query Status
637
638
```scala
639
val status = query.status
640
println(s"Message: ${status.message}")
641
println(s"Is trigger active: ${status.isTriggerActive}")
642
println(s"Is data available: ${status.isDataAvailable}")
643
```
644
645
### Error Handling
646
647
```scala
648
// Query with error handling
649
val resilientQuery = df.writeStream
  // Explicit parameter types keep the call unambiguous between the Scala and
  // Java overloads of foreachBatch (and match the earlier foreachBatch example)
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    try {
      batchDF.write.mode("append").saveAsTable("output_table")
    } catch {
      // NonFatal lets JVM-fatal errors and interrupts propagate.
      // NOTE: swallowing the failure means the batch counts as processed and is
      // not retried; rethrow after logging if losing the batch is not acceptable.
      case scala.util.control.NonFatal(e) =>
        println(s"Failed to process batch $batchId: ${e.getMessage}")
        // Log to error table or external system
        val errorDF = spark.createDataFrame(Seq(
          (batchId, e.getMessage, System.currentTimeMillis())
        )).toDF("batch_id", "error_message", "timestamp")
        errorDF.write.mode("append").saveAsTable("error_log")
    }
  }
  .start()
664
```