0
# Structured Streaming
1
2
Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. It provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.
3
4
## Capabilities
5
6
### DataStreamReader
7
8
Interface for reading streaming data from various sources.
9
10
```scala { .api }
11
/**
12
* Interface used to load a streaming Dataset from external storage systems.
13
*/
14
class DataStreamReader {
15
// Format specification
16
def format(source: String): DataStreamReader
17
18
// Schema specification (recommended for production)
19
def schema(schema: StructType): DataStreamReader
20
def schema(schemaString: String): DataStreamReader
21
22
// Options configuration
23
def option(key: String, value: String): DataStreamReader
24
def option(key: String, value: Boolean): DataStreamReader
25
def option(key: String, value: Long): DataStreamReader
26
def option(key: String, value: Double): DataStreamReader
27
def options(options: Map[String, String]): DataStreamReader
28
29
// Built-in sources
30
def csv(path: String): DataFrame
31
def json(path: String): DataFrame
32
def parquet(path: String): DataFrame
33
def text(path: String): DataFrame
34
def textFile(path: String): Dataset[String]
35
36
// Generic load
37
def load(): DataFrame
38
def load(path: String): DataFrame
39
40
// Table sources
41
def table(tableName: String): DataFrame
42
}
43
```
44
45
**Usage Examples:**
46
47
```scala
48
import org.apache.spark.sql.SparkSession
49
import org.apache.spark.sql.types._
50
51
val spark = SparkSession.builder().appName("StructuredStreaming").getOrCreate()
52
53
// Define schema (recommended for production)
54
val schema = StructType(Array(
55
StructField("timestamp", TimestampType, true),
56
StructField("user_id", StringType, true),
57
StructField("action", StringType, true),
58
StructField("value", DoubleType, true)
59
))
60
61
// Read from file source
62
val fileStream = spark.readStream
63
.schema(schema)
64
.option("maxFilesPerTrigger", "1")
65
.json("path/to/streaming/json/files")
66
67
// Read from Kafka
68
val kafkaStream = spark.readStream
69
.format("kafka")
70
.option("kafka.bootstrap.servers", "localhost:9092")
71
.option("subscribe", "events")
72
.option("startingOffsets", "latest")
73
.load()
74
75
// Read from socket (for testing)
76
val socketStream = spark.readStream
77
.format("socket")
78
.option("host", "localhost")
79
.option("port", 9999)
80
.load()
81
82
// Read from rate source (for testing)
83
val rateStream = spark.readStream
84
.format("rate")
85
.option("rowsPerSecond", "10")
86
.option("numPartitions", "2")
87
.load()
88
```
89
90
### Stream Processing Operations
91
92
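Most DataFrame/Dataset operations (selection, projection, filtering, aggregation, joins) are available on streaming DataFrames; a few, such as sorting a stream that has not been aggregated, are not supported: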
93
94
```scala
95
import org.apache.spark.sql.functions._
96
97
// Basic transformations
98
val processedStream = kafkaStream
99
.select(
100
col("key").cast("string"),
101
from_json(col("value").cast("string"), schema).as("data")
102
)
103
.select("data.*")
104
.withColumn("processing_time", current_timestamp())
105
.filter(col("value") > 0)
106
107
// Windowing operations
108
val windowedCounts = processedStream
109
.withWatermark("timestamp", "10 minutes")
110
.groupBy(
111
window(col("timestamp"), "5 minutes", "1 minute"),
112
col("user_id")
113
)
114
.agg(
115
count("*").as("event_count"),
116
sum("value").as("total_value"),
117
avg("value").as("avg_value")
118
)
119
120
// Stream-stream joins
// Every Kafka source needs kafka.bootstrap.servers, and the raw Kafka columns (key, value, ...) must be
// parsed before user_id/timestamp can be referenced. Watermarks on both sides, together with the time
// range in the join condition, let Spark discard join state that can no longer match.
121
val stream1 = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "topic1").load()
.select(from_json(col("value").cast("string"), schema).as("data")).select("data.*")
.withWatermark("timestamp", "10 minutes")
122
val stream2 = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "topic2").load()
.select(from_json(col("value").cast("string"), schema).as("data")).select("data.*")
.withWatermark("timestamp", "10 minutes")
123
124
val joinedStream = stream1.alias("s1")
125
.join(
126
stream2.alias("s2"),
127
expr("s1.user_id = s2.user_id AND s2.timestamp >= s1.timestamp AND s2.timestamp <= s1.timestamp + interval 1 hour"),
128
"inner"
129
)
130
131
// Stream-static joins
132
val staticDf = spark.read.parquet("path/to/static/data")
133
val enrichedStream = processedStream
134
.join(staticDf, "user_id")
135
```
136
137
### DataStreamWriter
138
139
Interface for writing streaming results to various sinks.
140
141
```scala { .api }
142
/**
143
* Interface used to write a streaming DataFrame to external storage systems.
144
*/
145
class DataStreamWriter[T] {
146
// Output format
147
def format(source: String): DataStreamWriter[T]
148
149
// Output mode
150
def outputMode(outputMode: String): DataStreamWriter[T] // "append", "complete", "update"
151
def outputMode(outputMode: OutputMode): DataStreamWriter[T]
152
153
// Processing options
154
def trigger(trigger: Trigger): DataStreamWriter[T]
155
def option(key: String, value: String): DataStreamWriter[T]
156
def option(key: String, value: Boolean): DataStreamWriter[T]
157
def option(key: String, value: Long): DataStreamWriter[T]
158
def option(key: String, value: Double): DataStreamWriter[T]
159
def options(options: Map[String, String]): DataStreamWriter[T]
160
161
// Custom sinks (note: checkpointing has no method of its own; set it with option("checkpointLocation", path))
162
def foreach(writer: ForeachWriter[T]): DataStreamWriter[T]
def foreachBatch(function: (Dataset[T], Long) => Unit): DataStreamWriter[T]
163
164
// Query naming
165
def queryName(queryName: String): DataStreamWriter[T]
166
167
// Partitioning (for file sinks)
168
def partitionBy(colNames: String*): DataStreamWriter[T]
169
170
// Start query
171
def start(): StreamingQuery
172
def start(path: String): StreamingQuery // For file-based sinks
173
174
// Built-in sinks
175
def toTable(tableName: String): StreamingQuery
176
}
177
178
// Trigger types
179
object Trigger {
180
def ProcessingTime(interval: String): Trigger
181
def ProcessingTime(interval: FiniteDuration): Trigger
182
def Once(): Trigger
183
def Continuous(interval: String): Trigger
184
def Continuous(interval: FiniteDuration): Trigger
185
def AvailableNow(): Trigger
186
}
187
```
188
189
**Usage Examples:**
190
191
```scala
192
import org.apache.spark.sql.streaming.Trigger
193
import scala.concurrent.duration._
194
195
// Console sink (for debugging)
196
val consoleQuery = processedStream.writeStream
197
.outputMode("append")
198
.format("console")
199
.option("truncate", "false")
200
.trigger(Trigger.ProcessingTime("10 seconds"))
201
.start()
202
203
// File sink (derives the "date" partition column, which processedStream does not otherwise contain)
204
val fileQuery = processedStream.withColumn("date", processedStream("timestamp").cast("date")).writeStream
205
.outputMode("append")
206
.format("parquet")
207
.option("path", "path/to/output")
208
.option("checkpointLocation", "path/to/checkpoints/file-sink") // each query needs its own checkpoint directory
209
.partitionBy("date")
210
.trigger(Trigger.ProcessingTime("30 seconds"))
211
.start()
212
213
// Kafka sink (serializes each row to a JSON value keyed by user_id)
214
val kafkaSink = processedStream
215
.selectExpr("CAST(user_id AS STRING) AS key", "to_json(struct(*)) AS value")
216
.writeStream
217
.format("kafka")
218
.option("kafka.bootstrap.servers", "localhost:9092")
219
.option("topic", "output-topic")
220
.option("checkpointLocation", "path/to/checkpoints/kafka-sink") // checkpoint directories must not be shared between queries
221
.outputMode("append")
222
.start()
223
224
// Delta Lake sink (if available); toTable starts the query and returns its StreamingQuery handle
225
val deltaQuery = processedStream.writeStream
226
.format("delta")
227
.outputMode("append")
228
.option("checkpointLocation", "path/to/checkpoints/delta-sink") // checkpoint directories must not be shared between queries
229
.toTable("events_table")
230
231
// Memory sink (for testing). "complete" output mode is only accepted for aggregated streams,
// so this writes the windowed aggregation rather than the raw processedStream.
231
val memoryQuery = windowedCounts.writeStream
232
.outputMode("complete")
233
.format("memory")
234
.queryName("memory_table")
235
.start()
237
238
// Foreach sink (custom logic)
239
val foreachQuery = processedStream.writeStream
240
.foreach(new ForeachWriter[Row] {
241
def open(partitionId: Long, epochId: Long): Boolean = true
242
def process(record: Row): Unit = {
243
// Custom processing logic
244
println(s"Processing record: $record")
245
}
246
def close(errorOrNull: Throwable): Unit = {}
247
})
248
.start()
249
```
250
251
### StreamingQuery Management
252
253
Interface for monitoring and controlling streaming queries.
254
255
```scala { .api }
256
/**
257
* A handle to a query that is executing continuously in the background as new data arrives.
258
*/
259
abstract class StreamingQuery {
260
// Query identification
261
def name: String
262
def id: UUID
263
def runId: UUID
264
265
// Query status
266
def isActive: Boolean
267
def awaitTermination(): Unit
268
def awaitTermination(timeoutMs: Long): Boolean
269
def stop(): Unit
270
271
// Progress monitoring
272
def lastProgress: StreamingQueryProgress
273
def recentProgress: Array[StreamingQueryProgress]
274
def status: StreamingQueryStatus
275
276
// Exception handling
277
def exception: Option[StreamingQueryException]
278
279
// Spark SQL integration
280
def sparkSession: SparkSession
281
}
282
283
/**
284
* Information about progress made in the execution of a StreamingQuery.
285
*/
286
case class StreamingQueryProgress(
287
id: UUID,
288
runId: UUID,
289
name: String,
290
timestamp: String,
291
batchId: Long,
292
batchDuration: Long,
293
durationMs: java.util.Map[String, java.lang.Long],
294
eventTime: java.util.Map[String, String],
295
stateOperators: Array[StateOperatorProgress],
296
sources: Array[SourceProgress],
297
sink: SinkProgress
298
) {
// Input counters and rates aggregated across all sources (used by the monitoring examples)
def numInputRows: Long
def inputRowsPerSecond: Double
def processedRowsPerSecond: Double
}
299
300
/**
301
* Manager for all StreamingQuery instances in a SparkSession.
302
*/
303
abstract class StreamingQueryManager {
304
def active: Array[StreamingQuery] // queries currently active in this session
305
def get(id: UUID): StreamingQuery // look up an active query by its id
306
def get(id: String): StreamingQuery // same lookup with the id given as a UUID string; this is NOT the query name
307
def awaitAnyTermination(): Unit
308
def awaitAnyTermination(timeoutMs: Long): Boolean
309
def resetTerminated(): Unit
310
}
311
```
312
313
**Usage Examples:**
314
315
```scala
316
// Start multiple queries
317
val query1 = stream1.writeStream.queryName("query1").format("console").start()
318
val query2 = stream2.writeStream.queryName("query2").format("memory").start()
319
320
// Monitor queries
321
spark.streams.active.foreach { query =>
322
println(s"Query ${query.name} is ${if (query.isActive) "active" else "inactive"}")
323
}
324
325
// Wait for specific query
326
query1.awaitTermination()
327
328
// Monitor progress (lastProgress can be null when no progress has been reported yet)
329
Option(query1.lastProgress).foreach { progress =>
330
// Runs only when a progress report exists, so the field accesses below cannot hit a null
331
println(s"Batch ${progress.batchId}: ${progress.durationMs.get("triggerExecution")}ms")
332
println(s"Input rate: ${progress.inputRowsPerSecond}")
333
println(s"Processing rate: ${progress.processedRowsPerSecond}")
334
}
335
336
// Handle exceptions
337
query1.exception match {
338
case Some(ex) =>
339
println(s"Query failed: ${ex.getMessage}")
340
println(s"Start offset: ${ex.startOffset}")
341
println(s"End offset: ${ex.endOffset}")
342
case None => println("Query running successfully")
343
}
344
345
// Stop all queries
346
spark.streams.active.foreach(_.stop())
347
```
348
349
### Window Operations and Event Time
350
351
Support for time-based operations with watermarking:
352
353
```scala
354
import org.apache.spark.sql.functions._
355
356
// Tumbling windows
357
val tumblingWindow = eventsStream
358
.withWatermark("eventTime", "2 minutes")
359
.groupBy(window(col("eventTime"), "10 minutes"))
360
.agg(count("*").as("count"))
361
362
// Sliding windows
363
val slidingWindow = eventsStream
364
.withWatermark("eventTime", "2 minutes")
365
.groupBy(window(col("eventTime"), "10 minutes", "5 minutes"))
366
.agg(count("*").as("count"))
367
368
// Session windows with multiple aggregations
369
val sessionWindows = eventsStream
370
.withWatermark("eventTime", "5 minutes")
371
.groupBy(
372
col("userId"),
373
session_window(col("eventTime"), "30 minutes")
374
)
375
.agg(
376
count("*").as("eventCount"),
377
collect_list("action").as("actions"),
378
min("eventTime").as("sessionStart"),
379
max("eventTime").as("sessionEnd")
380
)
381
```
382
383
### Stateful Operations
384
385
Support for maintaining state across streaming batches:
386
387
```scala
388
// Streaming deduplication
389
val deduplicated = eventsStream
390
.withWatermark("eventTime", "1 hour")
391
.dropDuplicates("eventId", "eventTime")
392
393
// Arbitrary stateful operations
394
import org.apache.spark.sql.streaming.GroupState
395
import org.apache.spark.sql.streaming.GroupStateTimeout
396
397
case class UserSession(userId: String, sessionStart: Long, eventCount: Int)
398
399
/**
* Folds a user's newly arrived events into that user's running session state.
*
* @param userId grouping key the state belongs to
* @param events events for this key in the current trigger (empty when invoked for a timeout)
* @param state per-key session state managed by Spark
* @return the updated session, or the final session when the state has timed out
*/
def updateUserSession(userId: String, events: Iterator[Event], state: GroupState[UserSession]): UserSession = {
if (state.hasTimedOut) {
// Timeout invocation: emit the last known session and drop its state. Without this branch the
// code below would re-arm the timeout on every expiry and the state would never be released.
val expiredSession = state.get
state.remove()
expiredSession
} else {
400
val currentSession = state.getOption.getOrElse(UserSession(userId, System.currentTimeMillis(), 0))
401
val newEventCount = currentSession.eventCount + events.size
402
val updatedSession = currentSession.copy(eventCount = newEventCount)
403
404
state.update(updatedSession)
405
state.setTimeoutDuration("1 hour")
406
407
updatedSession
}
408
}
409
410
val userSessions = eventsStream
411
.withWatermark("eventTime", "10 minutes")
412
.as[Event]
413
.groupByKey(_.userId)
414
.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(updateUserSession)
415
```
416
417
## Performance and Monitoring
418
419
### Optimization Strategies
420
421
1. **Schema Definition**: Always define schemas for structured streaming sources
422
2. **Watermarking**: Use watermarks for event-time processing and state cleanup
423
3. **Checkpointing**: Configure appropriate checkpoint locations for fault tolerance
424
4. **Trigger Configuration**: Choose appropriate trigger intervals based on latency requirements
425
5. **Resource Allocation**: Allocate sufficient resources for streaming workloads
426
427
### Monitoring and Debugging
428
429
```scala
430
// Streaming metrics
431
val query = eventsStream.writeStream
432
.option("checkpointLocation", checkpointPath)
433
.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
434
println(s"Processing batch $batchId with ${batchDF.count()} records")
435
batchDF.show()
436
}
437
.start()
438
439
// Query monitoring in Spark UI
440
// - Streaming tab shows active queries and progress
441
// - SQL tab shows streaming SQL queries
442
// - Jobs tab shows streaming job execution
443
444
// Programmatic monitoring
445
/**
* Polls a streaming query every 10 seconds while it is active and prints its latest batch progress.
* Blocks the calling thread until the query stops being active.
*
* @param query the streaming query to observe
*/
def monitorQuery(query: StreamingQuery): Unit = {
446
while (query.isActive) {
447
// lastProgress may be null (no progress reported yet); Option(...) turns that into None
448
Option(query.lastProgress).foreach { progress =>
449
println(s"Batch ${progress.batchId}: processed ${progress.numInputRows} rows in ${progress.batchDuration}ms")
450
}
451
Thread.sleep(10000)
452
}
453
}
454
```
455
456
Structured Streaming provides a powerful, fault-tolerant stream processing engine that unifies batch and streaming semantics under a single programming model.