0
# Streaming Operations
1
2
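Real-time data processing with Spark Structured Streaming. It provides unified batch and streaming APIs, fault tolerance through checkpointing, end-to-end exactly-once processing guarantees (when sources are replayable and sinks are idempotent or transactional), and advanced streaming analytics capabilities.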
3
4
## Capabilities
5
6
### DataStreamReader
7
8
Interface for reading streaming data from various sources.
9
10
```scala { .api }
11
/**
12
* Interface for reading streaming data from various sources
13
*/
14
class DataStreamReader {
15
/** Specify streaming data source format */
16
def format(source: String): DataStreamReader
17
18
/** Set schema for the streaming data */
19
def schema(schema: StructType): DataStreamReader
20
def schema(schemaString: String): DataStreamReader
21
22
/** Set options for the streaming source */
23
def option(key: String, value: String): DataStreamReader
24
def option(key: String, value: Boolean): DataStreamReader
25
def option(key: String, value: Long): DataStreamReader
26
def option(key: String, value: Double): DataStreamReader
27
def options(options: scala.collection.Map[String, String]): DataStreamReader
28
def options(options: java.util.Map[String, String]): DataStreamReader
29
30
/** Load streaming data using generic interface */
31
def load(): DataFrame
32
def load(path: String): DataFrame
33
34
/** Built-in streaming sources */
35
def text(path: String): DataFrame
36
def textFile(path: String): Dataset[String]
37
def csv(path: String): DataFrame
38
def json(path: String): DataFrame
39
def parquet(path: String): DataFrame
40
def orc(path: String): DataFrame
41
42
/** Kafka streaming source */
43
def kafka(): DataFrame
44
45
/** Socket streaming source (for testing) */
46
def socket(host: String, port: Int): DataFrame
47
def socket(host: String, port: Int, includeTimestamp: Boolean): DataFrame
48
49
/** Rate streaming source (for testing) */
50
def rate(rowsPerSecond: Long): DataFrame
51
def rate(rowsPerSecond: Long, numPartitions: Int): DataFrame
52
}
53
```
54
55
**Usage Examples:**
56
57
```scala
58
// File-based streaming
59
val streamingDf = spark.readStream
60
.schema(schema)
61
.option("maxFilesPerTrigger", "10")
62
.json("path/to/streaming/json/files")
63
64
// Kafka streaming
65
val kafkaStream = spark.readStream
66
.format("kafka")
67
.option("kafka.bootstrap.servers", "localhost:9092")
68
.option("subscribe", "topic1,topic2")
69
.option("startingOffsets", "latest")
70
.load()
71
72
// Socket streaming (for testing)
73
val socketStream = spark.readStream
74
.format("socket")
75
.option("host", "localhost")
76
.option("port", 9999)
77
.load()
78
79
// Rate source (for testing)
80
val rateStream = spark.readStream
81
.format("rate")
82
.option("rowsPerSecond", 100)
83
.option("numPartitions", 4)
84
.load()
85
```
86
87
### DataStreamWriter
88
89
Interface for writing a streaming Dataset to various sinks.
90
91
```scala { .api }
92
/**
93
* Interface for writing a streaming Dataset to various sinks
94
* @tparam T Type of the streaming Dataset
95
*/
96
class DataStreamWriter[T] {
97
/** Specify output format */
98
def format(source: String): DataStreamWriter[T]
99
100
/** Set output mode */
101
def outputMode(outputMode: OutputMode): DataStreamWriter[T]
102
def outputMode(outputMode: String): DataStreamWriter[T]
103
104
/** Set trigger for micro-batch processing */
105
def trigger(trigger: Trigger): DataStreamWriter[T]
106
107
/** Set options for the streaming sink */
108
def option(key: String, value: String): DataStreamWriter[T]
109
def option(key: String, value: Boolean): DataStreamWriter[T]
110
def option(key: String, value: Long): DataStreamWriter[T]
111
def option(key: String, value: Double): DataStreamWriter[T]
112
def options(options: scala.collection.Map[String, String]): DataStreamWriter[T]
113
def options(options: java.util.Map[String, String]): DataStreamWriter[T]
114
115
/** Partition output by columns */
116
def partitionBy(colNames: String*): DataStreamWriter[T]
117
118
/** Query name for identification */
119
def queryName(queryName: String): DataStreamWriter[T]
120
121
/** Start streaming query */
122
def start(): StreamingQuery
123
def start(path: String): StreamingQuery
124
125
/** Built-in streaming sinks */
126
def console(): DataStreamWriter[T]
127
def console(numRows: Int): DataStreamWriter[T]
128
def console(numRows: Int, truncate: Boolean): DataStreamWriter[T]
129
130
/** File-based sinks */
131
def json(path: String): StreamingQuery
132
def parquet(path: String): StreamingQuery
133
def orc(path: String): StreamingQuery
134
def text(path: String): StreamingQuery
135
def csv(path: String): StreamingQuery
136
137
/** Custom processing: row-by-row (foreach) or once per micro-batch (foreachBatch) */
138
def foreach(writer: ForeachWriter[T]): DataStreamWriter[T]
139
def foreachBatch(function: (Dataset[T], Long) => Unit): DataStreamWriter[T]
140
141
/** Memory sink (for testing) */
142
def memory(queryName: String): DataStreamWriter[T]
143
}
144
```
145
146
**Usage Examples:**
147
148
```scala
149
// Console output
150
val query1 = streamingDf.writeStream
151
.outputMode("append")
152
.format("console")
153
.option("numRows", 20)
154
.option("truncate", false)
155
.start()
156
157
// File output with partitioning
158
val query2 = streamingDf.writeStream
159
.format("parquet")
160
.outputMode("append")
161
.option("path", "output/streaming")
162
.option("checkpointLocation", "checkpoints/streaming")
163
.partitionBy("year", "month")
164
.trigger(Trigger.ProcessingTime("30 seconds"))
165
.start()
166
167
// Kafka output
168
val query3 = processedStream
169
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
170
.writeStream
171
.format("kafka")
172
.option("kafka.bootstrap.servers", "localhost:9092")
173
.option("topic", "output-topic")
174
.option("checkpointLocation", "checkpoints/kafka-out")
175
.outputMode("append")
176
.start()
177
178
// Custom processing with foreachBatch
179
val query4 = streamingDf.writeStream
180
.foreachBatch { (batchDf: DataFrame, batchId: Long) =>
181
println(s"Processing batch $batchId with ${batchDf.count()} records")
182
batchDf.write
183
.mode(SaveMode.Append)
184
.saveAsTable("streaming_results")
185
}
186
.trigger(Trigger.ProcessingTime("1 minute"))
187
.start()
188
```
189
190
### Output Modes
191
192
Different output modes for streaming queries.
193
194
```scala { .api }
195
/**
196
* Output modes for streaming queries
197
*/
198
object OutputMode {
199
/** Only append new rows to the result table */
200
val Append: OutputMode = "append"
201
202
/** Output complete result table for each trigger */
203
val Complete: OutputMode = "complete"
204
205
/** Output only updated rows since last trigger */
206
val Update: OutputMode = "update"
207
}
208
```
209
210
### Triggers
211
212
Control timing and execution of streaming micro-batches.
213
214
```scala { .api }
215
/**
216
* Triggers for controlling streaming execution
217
*/
218
sealed trait Trigger
219
220
object Trigger {
221
/** Run micro-batches at a fixed processing-time interval */
222
def ProcessingTime(interval: String): Trigger
223
def ProcessingTime(interval: Long, unit: TimeUnit): Trigger
224
225
/** Process all available data in a single batch, then stop */
226
def Once(): Trigger
227
228
/** Process all data available at query start (possibly in multiple batches), then stop */
229
def AvailableNow(): Trigger
230
231
/** Continuous processing (experimental) */
232
def Continuous(interval: String): Trigger
233
def Continuous(interval: Long, unit: TimeUnit): Trigger
234
}
235
```
236
237
**Usage Examples:**
238
239
```scala
240
import java.util.concurrent.TimeUnit
241
242
// Process every 30 seconds
243
val trigger1 = Trigger.ProcessingTime("30 seconds")
244
val trigger2 = Trigger.ProcessingTime(30, TimeUnit.SECONDS)
245
246
// Process once and terminate
247
val trigger3 = Trigger.Once()
248
249
// Process all available data and terminate
250
val trigger4 = Trigger.AvailableNow()
251
252
// Continuous processing (low-latency); the argument is the checkpoint interval
253
val trigger5 = Trigger.Continuous("1 second")
254
```
255
256
### StreamingQuery
257
258
Handle to a running streaming query with control and monitoring capabilities.
259
260
```scala { .api }
261
/**
262
* Handle to a running streaming query
263
*/
264
trait StreamingQuery {
265
/** Unique identifier for the query */
266
def id: UUID
267
268
/** Name of the query */
269
def name: String
270
271
/** Check if query is currently active */
272
def isActive: Boolean
273
274
/** Block until query terminates */
275
def awaitTermination(): Unit
276
def awaitTermination(timeoutMs: Long): Boolean
277
278
/** Stop the query */
279
def stop(): Unit
280
281
/** Get the most recent progress update */
282
def lastProgress: StreamingQueryProgress
283
284
/** Get recent progress updates */
285
def recentProgress: Array[StreamingQueryProgress]
286
287
/** Get current status */
288
def status: StreamingQueryStatus
289
290
/** Get exception that caused query to stop (if any) */
291
def exception: Option[StreamingQueryException]
292
293
/** Explain the query plan */
294
def explain(): Unit
295
def explain(extended: Boolean): Unit
296
}
297
```
298
299
### StreamingQueryManager
300
301
Manager for all StreamingQueries in a SparkSession.
302
303
```scala { .api }
304
/**
305
* Manager for all StreamingQueries in a SparkSession
306
*/
307
class StreamingQueryManager {
308
/** Get currently active streaming queries */
309
def active: Array[StreamingQuery]
310
311
/** Get a query by id */
312
def get(id: UUID): StreamingQuery
313
def get(id: String): StreamingQuery
314
315
/** Block until any one of the queries terminates (or the timeout elapses) */
316
def awaitAnyTermination(): Unit
317
def awaitAnyTermination(timeoutMs: Long): Boolean
318
319
/** Forget previously terminated queries so awaitAnyTermination() can wait for new terminations */
320
def resetTerminated(): Unit
321
322
/** Add listener for streaming query events */
323
def addListener(listener: StreamingQueryListener): Unit
324
def removeListener(listener: StreamingQueryListener): Unit
325
}
326
```
327
328
### ForeachWriter
329
330
Custom sink for row-by-row processing.
331
332
```scala { .api }
333
/**
334
* Abstract class for custom streaming sinks
335
* @tparam T Type of rows to process
336
*/
337
abstract class ForeachWriter[T] extends Serializable {
338
/** Called when starting to process a partition */
339
def open(partitionId: Long, epochId: Long): Boolean
340
341
/** Called for each row */
342
def process(value: T): Unit
343
344
/** Called when finishing processing a partition */
345
def close(errorOrNull: Throwable): Unit
346
}
347
```
348
349
**Usage Example:**
350
351
```scala
352
import org.apache.spark.sql.ForeachWriter
353
354
val customWriter = new ForeachWriter[Row] {
355
def open(partitionId: Long, epochId: Long): Boolean = {
356
// Initialize resources (e.g., database connection)
357
println(s"Opening partition $partitionId for epoch $epochId")
358
true
359
}
360
361
def process(value: Row): Unit = {
362
// Process each row
363
val id = value.getAs[Long]("id")
364
val name = value.getAs[String]("name")
365
println(s"Processing record: $id, $name")
366
// Write to external system
367
}
368
369
def close(errorOrNull: Throwable): Unit = {
370
// Clean up resources
371
if (errorOrNull != null) {
372
println(s"Error occurred: ${errorOrNull.getMessage}")
373
}
374
println("Closing partition")
375
}
376
}
377
378
val query = streamingDf.writeStream
379
.foreach(customWriter)
380
.start()
381
```
382
383
### Streaming Aggregations
384
385
Advanced aggregation operations in streaming context.
386
387
**Time-based aggregations:**
388
389
```scala
390
import org.apache.spark.sql.functions._
391
392
// Sliding window: 5-minute windows that advance every 1 minute
393
val windowedCounts = streamingDf
394
.withWatermark("timestamp", "10 minutes")
395
.groupBy(
396
window(col("timestamp"), "5 minutes", "1 minute"),
397
col("category")
398
)
399
.count()
400
401
// Tumbling window
402
val tumblingWindow = streamingDf
403
.withWatermark("timestamp", "10 minutes")
404
.groupBy(window(col("timestamp"), "10 minutes"))
405
.agg(sum("amount").alias("total_amount"))
406
407
// Session window (event-time sessions)
408
val sessionWindow = streamingDf
409
.withWatermark("timestamp", "10 minutes")
410
.groupBy(
411
col("userId"),
412
session_window(col("timestamp"), "5 minutes")
413
)
414
.agg(count("*").alias("events_in_session"))
415
```
416
417
**Stateful operations:**
418
419
```scala
420
// Global aggregations (no watermark: require complete or update output mode; append is not supported)
421
val globalCounts = streamingDf
422
.groupBy("category")
423
.count()
424
425
// With watermarking for late data handling
426
val withWatermark = streamingDf
427
.withWatermark("eventTime", "1 hour")
428
.groupBy("userId", window(col("eventTime"), "30 minutes"))
429
.agg(sum("amount").alias("total"))
430
```
431
432
### Common Streaming Patterns
433
434
**Exactly-once processing:**
435
436
```scala
437
val query = streamingDf
438
.writeStream
439
.outputMode("append")
440
.option("checkpointLocation", "s3://bucket/checkpoints/query1")
441
.format("delta") // or other ACID-compliant sink
442
.start("s3://bucket/output/table")
443
```
444
445
**Error handling and monitoring:**
446
447
```scala
448
val query = streamingDf.writeStream
449
.foreachBatch { (batchDf: DataFrame, batchId: Long) =>
450
try {
451
batchDf.write
452
.mode(SaveMode.Append)
453
.saveAsTable("results")
454
} catch {
455
case e: Exception =>
456
println(s"Error processing batch $batchId: ${e.getMessage}")
457
// Log to monitoring system
458
throw e // Re-throw so the query fails visibly; Spark does not restart it automatically, so restart it from the checkpoint
459
}
460
}
461
.trigger(Trigger.ProcessingTime("30 seconds"))
462
.start()
463
464
// Monitor query progress
465
val progressInfo = query.lastProgress
466
println(s"Input rows/sec: ${progressInfo.inputRowsPerSecond}")
467
println(s"Processing time: ${progressInfo.durationMs}")
468
```
469
470
**Stream-stream joins:**
471
472
```scala
473
val stream1 = spark.readStream.format("kafka")...
474
val stream2 = spark.readStream.format("kafka")...
475
476
val joinedStream = stream1.alias("s1")
477
.join(stream2.alias("s2"),
478
expr("s1.key = s2.key AND s1.timestamp >= s2.timestamp - interval 1 hour AND s1.timestamp <= s2.timestamp + interval 1 hour"),
479
"inner"
480
)
481
```
482
483
**Stream-static joins:**
484
485
```scala
486
val staticDf = spark.read.table("reference_data")
487
val streamingDf = spark.readStream...
488
489
val enrichedStream = streamingDf
490
.join(staticDf, "key") // Stream-static join; Spark may broadcast the static side if it is small enough
491
```