0
# Apache Spark SQL - Streaming
1
2
## Capabilities
3
4
### Structured Streaming Query Execution
5
- Execute continuous data processing with fault-tolerant, end-to-end exactly-once semantics (given replayable sources and idempotent sinks) using micro-batch processing
6
- Handle late-arriving data with configurable watermarking and event-time processing
7
- Support for stateful operations including aggregations, joins, and custom state management
8
- Process unbounded streams with automatic checkpointing and recovery mechanisms
9
10
### Stream Data Sources and Sinks
11
- Read from various streaming sources including Kafka, files, sockets, and rate sources for testing
12
- Write to multiple sink types including files, Kafka, console, memory, and foreach sinks
13
- Support for different output modes including append, complete, and update for different use cases
14
- Handle schema evolution and format changes in streaming data pipelines
15
16
### Trigger Management and Processing Control
17
- Configure processing triggers including fixed intervals, once triggers, continuous processing, and available now
18
- Control micro-batch sizing and processing intervals for throughput and latency optimization
19
- Support for event-time processing with watermarks for handling out-of-order data
20
- Bound the amount of data processed per micro-batch with source rate-limit options such as `maxFilesPerTrigger` (file sources) and `maxOffsetsPerTrigger` (Kafka)
21
22
### Stateful Stream Processing
23
- Maintain state across micro-batches for complex event processing and session analytics
24
- Support for arbitrary stateful processing using mapGroupsWithState and flatMapGroupsWithState
25
- Handle state expiration and cleanup with configurable timeout policies
26
- Enable stateful stream-stream joins with configurable state retention policies
27
28
## API Reference
29
30
### DataStreamReader Class
31
```scala { .api }
32
/**
 * Builder for loading a streaming Dataset from an external source.
 * Obtained through `SparkSession.readStream`.
 *
 * The source is always selected with `format(...)`. Kafka, socket and rate
 * sources have no dedicated loader methods: configure them with
 * `format("kafka")`, `format("socket")` or `format("rate")` plus `option(...)`
 * and finish with `load()`.
 */
class DataStreamReader private[sql](sparkSession: SparkSession) extends Logging {
  // Format specification
  def format(source: String): DataStreamReader

  // Schema definition (StructType or DDL-formatted string)
  def schema(schema: StructType): DataStreamReader
  def schema(schemaString: String): DataStreamReader

  // Options configuration
  def option(key: String, value: String): DataStreamReader
  def option(key: String, value: Boolean): DataStreamReader
  def option(key: String, value: Long): DataStreamReader
  def option(key: String, value: Double): DataStreamReader
  def options(options: scala.collection.Map[String, String]): DataStreamReader
  def options(options: java.util.Map[String, String]): DataStreamReader

  // Load operations
  def load(): DataFrame
  def load(path: String): DataFrame

  // Format-specific file loaders
  def json(path: String): DataFrame
  def csv(path: String): DataFrame
  def parquet(path: String): DataFrame
  def orc(path: String): DataFrame
  def text(path: String): DataFrame
  def textFile(path: String): Dataset[String]

  // Table source
  def table(tableName: String): DataFrame
}
70
```
71
72
### DataStreamWriter[T] Class
73
```scala { .api }
74
/**
 * Builder for writing a streaming Dataset to an external sink.
 * Obtained through `Dataset.writeStream`.
 *
 * Every configuration method returns the writer itself so calls can be chained;
 * only `start(...)` and `toTable(...)` launch the query and return a
 * `StreamingQuery`. Built-in sinks (files, Kafka, console, memory) are selected
 * with `format(...)`; there are no dedicated terminal methods for them.
 */
class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
  // Format specification
  def format(source: String): DataStreamWriter[T]

  // Output mode configuration ("append", "complete" or "update" for the String form)
  def outputMode(outputMode: OutputMode): DataStreamWriter[T]
  def outputMode(outputMode: String): DataStreamWriter[T]

  // Trigger configuration
  def trigger(trigger: Trigger): DataStreamWriter[T]

  // Options configuration
  def option(key: String, value: String): DataStreamWriter[T]
  def option(key: String, value: Boolean): DataStreamWriter[T]
  def option(key: String, value: Long): DataStreamWriter[T]
  def option(key: String, value: Double): DataStreamWriter[T]
  def options(options: scala.collection.Map[String, String]): DataStreamWriter[T]
  def options(options: java.util.Map[String, String]): DataStreamWriter[T]

  // Partitioning of file-based output
  def partitionBy(colNames: String*): DataStreamWriter[T]

  // Query naming
  def queryName(queryName: String): DataStreamWriter[T]

  // Custom sinks: these only configure the writer, start() must still be called
  def foreach(writer: ForeachWriter[T]): DataStreamWriter[T]
  def foreachBatch(function: (Dataset[T], Long) => Unit): DataStreamWriter[T]
  def foreachBatch(function: VoidFunction2[Dataset[T], java.lang.Long]): DataStreamWriter[T]

  // Start operations
  def start(): StreamingQuery
  def start(path: String): StreamingQuery
  def toTable(tableName: String): StreamingQuery
}
124
```
125
126
### StreamingQuery Interface
127
```scala { .api }
128
/**
 * Handle to a streaming query that is already running in the background.
 * A query is created and started by `DataStreamWriter.start()`, so this
 * interface offers no `start()` of its own.
 */
trait StreamingQuery {
  // Query identification
  def id: UUID        // stable across restarts from the same checkpoint
  def runId: UUID     // unique per (re)start
  def name: String    // user-specified name; null when none was set
  def sparkSession: SparkSession

  // Query control
  def stop(): Unit
  def processAllAvailable(): Unit   // blocks; intended for testing

  // Query state
  def isActive: Boolean
  def awaitTermination(): Unit
  def awaitTermination(timeoutMs: Long): Boolean
  def exception: Option[StreamingQueryException]

  // Progress monitoring
  def lastProgress: StreamingQueryProgress   // null until the first progress update
  def recentProgress: Array[StreamingQueryProgress]
  def status: StreamingQueryStatus

  // Explain plans
  def explain(): Unit
  def explain(extended: Boolean): Unit
}
155
```
156
157
### StreamingQueryManager Class
158
```scala { .api }
159
/**
 * Manages all streaming queries of a SparkSession.
 * Obtained through `SparkSession.streams`.
 */
abstract class StreamingQueryManager {
  // Active queries management
  def active: Array[StreamingQuery]

  // Lookup is by query id (UUID or its string form), not by query name.
  // To find a query by name, filter `active` on `StreamingQuery.name`.
  def get(id: UUID): StreamingQuery
  def get(id: String): StreamingQuery

  // Termination handling
  def awaitAnyTermination(): Unit
  def awaitAnyTermination(timeoutMs: Long): Boolean
  def resetTerminated(): Unit

  // Listeners
  def addListener(listener: StreamingQueryListener): Unit
  def removeListener(listener: StreamingQueryListener): Unit
}
174
```
175
176
### Trigger Types
177
```scala { .api }
178
// Base trigger type. Instances are obtained through the factory methods below.
sealed trait Trigger

object Trigger {
  // Micro-batch at a fixed processing-time interval
  def ProcessingTime(intervalMs: Long): Trigger
  def ProcessingTime(interval: Long, unit: TimeUnit): Trigger
  def ProcessingTime(interval: Duration): Trigger
  def ProcessingTime(interval: String): Trigger

  // Single micro-batch, then stop
  // NOTE(review): deprecated in recent Spark releases in favour of AvailableNow - confirm for the target version
  def Once(): Trigger

  // Process everything available at start (possibly in several batches), then stop
  def AvailableNow(): Trigger

  // Continuous processing with the given checkpoint interval (experimental)
  def Continuous(intervalMs: Long): Trigger
  def Continuous(interval: Long, unit: TimeUnit): Trigger
  def Continuous(interval: Duration): Trigger
  def Continuous(interval: String): Trigger
}

// Implementations behind the factories. These are internal to Spark
// (org.apache.spark.sql.execution.streaming); application code should use
// the Trigger factory methods instead of constructing them directly.
case class ProcessingTimeTrigger(intervalMs: Long) extends Trigger
case object OneTimeTrigger extends Trigger
case class ContinuousTrigger(intervalMs: Long) extends Trigger
case object AvailableNowTrigger extends Trigger
200
```
201
202
### Output Modes
203
```scala { .api }
204
// Describes which rows of the result table are written to the sink on each trigger.
sealed trait OutputMode

object OutputMode {
  // Only rows added to the result table since the last trigger
  def Append(): OutputMode

  // The entire result table on every trigger (aggregation queries only)
  def Complete(): OutputMode

  // Only rows that changed since the last trigger
  def Update(): OutputMode
}

// A mode can also be given by name through
// DataStreamWriter.outputMode("append" | "complete" | "update").
213
```
214
215
### State Management
216
```scala { .api }
217
// Per-key state handle passed to the user function of
// mapGroupsWithState / flatMapGroupsWithState.
trait GroupState[S] {
  // State access
  def exists: Boolean
  def get: S                     // throws NoSuchElementException when no state exists
  def getOption: Option[S]
  def update(newState: S): Unit
  def remove(): Unit

  // Timeout management
  def hasTimedOut: Boolean

  // Usable only with GroupStateTimeout.ProcessingTimeTimeout
  def setTimeoutDuration(durationMs: Long): Unit
  def setTimeoutDuration(duration: String): Unit

  // Usable only with GroupStateTimeout.EventTimeTimeout
  def setTimeoutTimestamp(timestampMs: Long): Unit
  def setTimeoutTimestamp(timestampMs: Long, additionalDuration: String): Unit
  def setTimeoutTimestamp(timestamp: java.sql.Date): Unit
  def setTimeoutTimestamp(timestamp: java.sql.Date, additionalDuration: String): Unit

  // Clocks of the current trigger
  def getCurrentWatermarkMs(): Long
  def getCurrentProcessingTimeMs(): Long
}

// State timeout configuration, selected through the GroupStateTimeout accessors,
// e.g. mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout) { ... }
sealed trait GroupStateTimeout

object GroupStateTimeout {
  def NoTimeout(): GroupStateTimeout
  def ProcessingTimeTimeout(): GroupStateTimeout
  def EventTimeTimeout(): GroupStateTimeout
}
241
```
242
243
### Streaming Query Progress and Status
244
```scala { .api }
245
// Progress of a single trigger of a streaming query.
// Map-typed fields are java.util.Map instances; convert with
// scala.jdk.CollectionConverters (or JavaConverters on Scala 2.12) for Scala collection APIs.
class StreamingQueryProgress private[sql] (
    val id: UUID,
    val runId: UUID,
    val name: String,
    val timestamp: String,
    val batchId: Long,
    val batchDuration: Long,
    val durationMs: java.util.Map[String, java.lang.Long],
    val eventTime: java.util.Map[String, String],
    val stateOperators: Array[StateOperatorProgress],
    val sources: Array[SourceProgress],
    val sink: SinkProgress,
    val observedMetrics: java.util.Map[String, Row]) extends Serializable {

  // Aggregates over all sources
  def numInputRows: Long
  def inputRowsPerSecond: Double
  def processedRowsPerSecond: Double

  // JSON renderings
  def json: String
  def prettyJson: String
}

// Current status of a query
class StreamingQueryStatus private[sql] (
    val message: String,
    val isDataAvailable: Boolean,
    val isTriggerActive: Boolean) extends Serializable {

  def json: String
  def prettyJson: String
}

// Progress of one stateful operator.
// NOTE(review): the real class exposes further counters (rows removed, commit time, ...);
// only the commonly used subset is listed here - confirm against the target Spark version.
class StateOperatorProgress private[sql] (
    val operatorName: String,
    val numRowsTotal: Long,
    val numRowsUpdated: Long,
    val memoryUsedBytes: Long,
    val customMetrics: java.util.Map[String, java.lang.Long] = new java.util.HashMap()
) extends Serializable

// Progress of one source
class SourceProgress private[sql] (
    val description: String,
    val startOffset: String,
    val endOffset: String,
    val latestOffset: String,
    val numInputRows: Long,
    val inputRowsPerSecond: Double,
    val processedRowsPerSecond: Double,
    val metrics: java.util.Map[String, String] = new java.util.HashMap()
) extends Serializable

// Progress of the sink
class SinkProgress private[sql] (
    val description: String,
    val numOutputRows: Long,
    val metrics: java.util.Map[String, String] = new java.util.HashMap()
) extends Serializable
295
```
296
297
### Custom Sinks
298
```scala { .api }
299
// Row-at-a-time custom sink, used with DataStreamWriter.foreach.
// One instance is serialized to the executors; per-partition resources must be
// created in open(), not in the constructor.
abstract class ForeachWriter[T] extends Serializable {
  // Called once per partition and epoch; return false to skip the partition
  def open(partitionId: Long, epochId: Long): Boolean

  // Called for every row when open() returned true
  def process(value: T): Unit

  // Called when processing of the partition stops; errorOrNull is null on success
  def close(errorOrNull: Throwable): Unit
}

// Batch-at-a-time custom output has no public base class to extend.
// Pass a function to DataStreamWriter.foreachBatch instead:
//
//   (batchDF: Dataset[T], batchId: Long) => Unit
311
```
312
313
### Watermark Operations
314
```scala { .api }
315
// Watermark definition (method of Dataset)
class Dataset[T] {
  // eventTime: name of the event-time column
  // delayThreshold: how late data may arrive, e.g. "10 minutes"
  def withWatermark(eventTime: String, delayThreshold: String): Dataset[T]
}

// Event-time window functions (org.apache.spark.sql.functions)
object functions {
  // Tumbling window
  def window(timeColumn: Column, windowDuration: String): Column

  // Sliding window
  def window(timeColumn: Column, windowDuration: String, slideDuration: String): Column

  // Sliding window with an offset for the window start
  def window(timeColumn: Column, windowDuration: String, slideDuration: String, startTime: String): Column

  // Session window with a static or per-row gap duration
  def session_window(timeColumn: Column, gapDuration: String): Column
  def session_window(timeColumn: Column, gapDuration: Column): Column
}
327
```
328
329
## Usage Examples
330
331
### Basic Streaming Setup
332
```scala
333
import org.apache.spark.sql.{SparkSession, Dataset, DataFrame}
334
import org.apache.spark.sql.streaming._
335
import org.apache.spark.sql.functions._
336
import org.apache.spark.sql.types._
337
338
val spark = SparkSession.builder()
  .appName("Streaming Example")
  // Default checkpoint root for queries that do not set their own checkpointLocation
  .config("spark.sql.streaming.checkpointLocation", "/path/to/checkpoint")
  .getOrCreate()

// Required for the $"column" syntax and for the encoders used by typed
// operations (groupByKey, mapGroupsWithState) in the examples below
import spark.implicits._

// Define schema for streaming data (file sources need an explicit schema)
val userActivitySchema = StructType(Seq(
  StructField("user_id", StringType, nullable = false),
  StructField("action", StringType, nullable = false),
  StructField("timestamp", TimestampType, nullable = false),
  StructField("session_id", StringType, nullable = false),
  StructField("page_url", StringType, nullable = true),
  StructField("duration", IntegerType, nullable = true)
))

// Read from JSON files, at most one new file per micro-batch
val inputStream = spark.readStream
  .format("json")
  .schema(userActivitySchema)
  .option("maxFilesPerTrigger", "1")
  .load("/path/to/streaming/data")

// Basic transformations
val processedStream = inputStream
  .filter($"action" =!= "heartbeat")
  .withColumn("hour", hour($"timestamp"))
  .withColumn("date", to_date($"timestamp"))
365
```
366
367
### File-based Streaming Sources
368
```scala
369
// CSV streaming (schema is supplied explicitly, so no inference is attempted)
val csvStream = spark.readStream
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "false")
  .schema(userActivitySchema)
  .load("/path/to/csv/files")

// Parquet streaming
val parquetStream = spark.readStream
  .format("parquet")
  .schema(userActivitySchema)
  .load("/path/to/parquet/files")

// Text file streaming: each line arrives in a single "value" column.
// The pattern captures the leading word (group 1) and the rest of the line (group 2).
val logLinePattern = """(\w+)\s+(.+)"""

val textStream = spark.readStream
  .format("text")
  .load("/path/to/text/files")
  .select(
    regexp_extract($"value", logLinePattern, 1).as("level"),
    regexp_extract($"value", logLinePattern, 2).as("message"),
    current_timestamp().as("processed_time")
  )
392
```
393
394
### Kafka Integration
395
```scala
396
// Read from Kafka
val kafkaStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "user-activity,page-views")
  .option("startingOffsets", "latest") // or "earliest"
  .option("failOnDataLoss", "false")
  .load()

// Parse Kafka messages: key and value arrive as binary and must be cast
val parsedKafkaStream = kafkaStream
  .select(
    $"topic",
    $"partition",
    $"offset",
    $"timestamp",
    $"key".cast(StringType).as("message_key"),
    from_json($"value".cast(StringType), userActivitySchema).as("data")
  )
  .select($"topic", $"partition", $"offset", $"timestamp", $"message_key", $"data.*")

// Write to Kafka.
// The Kafka sink requires a "value" column (string or binary) and accepts an
// optional "key" column, so each row is serialized to JSON first.
val kafkaOutput = processedStream
  .select(
    $"user_id".as("key"),
    to_json(struct($"*")).as("value")
  )
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "processed-activity")
  .option("checkpointLocation", "/path/to/kafka/checkpoint")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()
423
```
424
425
### Aggregations and Windowing
426
```scala
427
// Simple aggregation: running count per user
val userCounts = inputStream
  .groupBy($"user_id")
  .count()

// Time-based windowing: 5-minute windows sliding every minute
val windowedCounts = inputStream
  .withWatermark("timestamp", "10 minutes")
  .groupBy(
    window($"timestamp", "5 minutes", "1 minute"),
    $"action"
  )
  .agg(
    count("*").as("action_count"),
    // Exact distinct aggregations are not supported on streaming Datasets,
    // so the approximate variant is used
    approx_count_distinct("user_id").as("unique_users"),
    avg("duration").as("avg_duration")
  )

// Session windows: a session closes after 10 minutes without events
val sessionAnalysis = inputStream
  .withWatermark("timestamp", "30 minutes")
  .groupBy(
    $"user_id",
    session_window($"timestamp", "10 minutes")
  )
  .agg(
    count("*").as("actions_in_session"),
    sum("duration").as("total_session_time"),
    collect_list("action").as("session_actions")
  )

// Two-level aggregation: hourly counts per user and action ...
val hourlyActionCounts = inputStream
  .withWatermark("timestamp", "1 hour")
  .groupBy(
    $"user_id",
    window($"timestamp", "1 hour"),
    $"action"
  )
  .agg(
    count("*").as("hourly_count"),
    // min/max are order-independent, unlike first/last
    min("timestamp").as("first_action_time"),
    max("timestamp").as("last_action_time")
  )

// ... rolled up into one row per user and hour.
// NOTE(review): chaining stateful operators needs a Spark version that supports
// multiple streaming aggregations, and append output mode - confirm for the target version.
val multiWindowAnalysis = hourlyActionCounts
  .groupBy($"user_id", $"window")
  .agg(
    sum("hourly_count").as("total_actions"),
    // Build the action -> count map from collected (action, count) pairs
    map_from_entries(collect_list(struct($"action", $"hourly_count"))).as("action_breakdown")
  )
476
```
477
478
### Stateful Processing
479
```scala
480
// Per-user session state. Both times are event times in epoch milliseconds.
case class UserSession(
  userId: String,
  startTime: Long,
  lastActivity: Long,
  actionCount: Int,
  totalDuration: Int
)

// Stateful session tracking.
// Requires `import spark.implicits._` for the key, state and result encoders.
val sessionTracking = inputStream
  .groupByKey(_.getAs[String]("user_id"))
  .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout) {
    (userId: String, values: Iterator[Row], state: GroupState[UserSession]) =>

      if (state.hasTimedOut) {
        // Timeout invocation: `values` is empty, emit the final session and drop the state
        val expiredSession = state.get
        state.remove()
        ("session_expired", expiredSession)
      } else {
        // Fold the new events into the existing session, or into a fresh one
        // seeded from the first event so that only event time is recorded
        val folded = values.foldLeft(state.getOption) { (current, event) =>
          val eventTime = event.getAs[java.sql.Timestamp]("timestamp").getTime
          val duration = Option(event.getAs[Integer]("duration")).fold(0)(_.intValue)
          val base = current.getOrElse(UserSession(userId, eventTime, eventTime, 0, 0))

          Some(base.copy(
            startTime = math.min(base.startTime, eventTime),
            lastActivity = math.max(base.lastActivity, eventTime),
            actionCount = base.actionCount + 1,
            totalDuration = base.totalDuration + duration
          ))
        }

        // A non-timeout invocation normally carries at least one event; the
        // fallback only guards against an empty iterator
        val updatedSession = folded.getOrElse {
          val now = state.getCurrentProcessingTimeMs()
          UserSession(userId, now, now, 0, 0)
        }

        state.update(updatedSession)
        // Expire the session after 30 minutes without new events for this user
        state.setTimeoutDuration("30 minutes")
        ("session_active", updatedSession)
      }
  }
528
```
529
530
### Stream-Stream Joins
531
```scala
532
// Schemas of the JSON payloads carried in the Kafka message values
val clickSchema = new StructType()
  .add("click_impression_id", StringType)
  .add("click_timestamp", TimestampType)

val impressionSchema = new StructType()
  .add("impression_id", StringType)
  .add("campaign_id", StringType)
  .add("impression_timestamp", TimestampType)

// Define two streams. Each watermark is declared on the event-time column
// that the join condition below actually uses.
val clickStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "clicks")
  .load()
  .select(from_json($"value".cast("string"), clickSchema).as("data"))
  .select($"data.*")
  .withWatermark("click_timestamp", "2 hours")

val impressionStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "impressions")
  .load()
  .select(from_json($"value".cast("string"), impressionSchema).as("data"))
  .select($"data.*")
  .withWatermark("impression_timestamp", "3 hours")

// Join streams with time constraints.
// Outer joins need both the watermarks and the time-range condition so that
// Spark knows when an unmatched impression can be emitted and its state dropped.
val joinedStream = impressionStream.join(
  clickStream,
  expr("""
    impression_id = click_impression_id AND
    click_timestamp >= impression_timestamp AND
    click_timestamp <= impression_timestamp + interval 1 hour
  """),
  joinType = "leftOuter"
)

// Aggregate joined data; the watermark on impression_timestamp carries over from the join input.
// NOTE(review): an aggregation after a stream-stream outer join needs a Spark version
// that supports chained stateful operators - confirm for the target version.
val conversionAnalysis = joinedStream
  .groupBy(
    window($"impression_timestamp", "10 minutes"),
    $"campaign_id"
  )
  .agg(
    count("impression_id").as("impressions"),
    count("click_impression_id").as("clicks"),
    (count("click_impression_id") * 100.0 / count("impression_id")).as("ctr")
  )
578
```
579
580
### Output Modes and Triggers
581
```scala
582
// Append mode - only new rows
val appendQuery = processedStream.writeStream
  .outputMode(OutputMode.Append())
  .format("parquet")
  .option("path", "/path/to/output")
  .option("checkpointLocation", "/path/to/checkpoint")
  .trigger(Trigger.ProcessingTime("1 minute"))
  .start()

// Complete mode - entire result table (aggregation queries only)
val completeQuery = userCounts.writeStream
  .outputMode(OutputMode.Complete())
  .format("memory")
  .queryName("user_counts_table")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()

// Update mode - only changed rows
val updateQuery = windowedCounts.writeStream
  .outputMode(OutputMode.Update())
  .format("console")
  .option("truncate", "false")
  .trigger(Trigger.ProcessingTime("2 minutes"))
  .start()

// Once trigger - single micro-batch.
// No checkpointLocation is set here, so the session-level
// spark.sql.streaming.checkpointLocation default is used.
val onceQuery = processedStream.writeStream
  .outputMode(OutputMode.Append())
  .format("json")
  .option("path", "/path/to/batch/output")
  .trigger(Trigger.Once())
  .start()

// Available now trigger - process all available data, then stop.
// The "delta" format requires the Delta Lake library on the classpath.
val availableNowQuery = processedStream.writeStream
  .outputMode(OutputMode.Append())
  .format("delta")
  .option("path", "/path/to/delta/table")
  .trigger(Trigger.AvailableNow())
  .start()

// Continuous processing (experimental).
// Continuous mode supports only certain sources (Kafka, rate) and map-like
// operations, so this query reads the Kafka-backed stream rather than the file source.
val continuousQuery = parsedKafkaStream
  .filter($"action" === "purchase")
  .writeStream
  .outputMode(OutputMode.Append())
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start()
631
```
632
633
### Custom Sinks
634
```scala
635
// Custom ForeachWriter that inserts each row through JDBC.
// The JDBC handles are created in open() because the writer instance is
// serialized to the executors and connections are not serializable.
class DatabaseWriter extends ForeachWriter[Row] {
  var connection: java.sql.Connection = _
  var statement: java.sql.PreparedStatement = _

  override def open(partitionId: Long, epochId: Long): Boolean = {
    // Initialize database connection (placeholder credentials; load real ones from configuration)
    connection = java.sql.DriverManager.getConnection(
      "jdbc:postgresql://localhost/mydb", "user", "password"
    )
    statement = connection.prepareStatement(
      "INSERT INTO user_activity (user_id, action, timestamp) VALUES (?, ?, ?)"
    )
    true
  }

  override def process(value: Row): Unit = {
    // Positions follow the stream's column order: user_id, action, timestamp
    statement.setString(1, value.getString(0))
    statement.setString(2, value.getString(1))
    statement.setTimestamp(3, value.getTimestamp(2))
    statement.executeUpdate()
  }

  override def close(errorOrNull: Throwable): Unit = {
    // Close the connection even when closing the statement fails
    try {
      if (statement != null) statement.close()
    } finally {
      if (connection != null) connection.close()
    }
  }
}

// Use custom writer
val customSinkQuery = processedStream.writeStream
  .foreach(new DatabaseWriter())
  .option("checkpointLocation", "/path/to/custom/checkpoint")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()

// ForeachBatch for batch processing
val foreachBatchQuery = processedStream.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // Cache before the first action so the batch is computed once for the
    // count and both writes
    batchDF.persist()
    try {
      println(s"Processing batch $batchId with ${batchDF.count()} records")

      // Write to multiple sinks
      batchDF.write
        .mode("append")
        .parquet(s"/path/to/archive/batch_$batchId")

      batchDF.write
        .format("jdbc")
        .option("url", "jdbc:postgresql://localhost/mydb")
        .option("dbtable", "processed_activity")
        .mode("append")
        .save()
    } finally {
      batchDF.unpersist()
    }
    // Keep the function's result type Unit: a non-Unit last expression makes the
    // foreachBatch overloads ambiguous on Scala 2.12
    ()
  }
  .option("checkpointLocation", "/path/to/batch/checkpoint")
  .start()
695
```
696
697
### Query Management and Monitoring
698
```scala
699
// Event types are nested in the StreamingQueryListener companion object
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Add streaming query listener.
// Register listeners before starting queries if start events must be observed.
spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
    println(s"Query started: ${queryStarted.name}")
  }

  override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
    println(s"Query terminated: ${queryTerminated.id}")
  }

  override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
    val progress = queryProgress.progress
    println(s"Query ${progress.name}: processed ${progress.processedRowsPerSecond} rows/sec")
  }
})

// Queries started in the previous examples
val queries = Array(appendQuery, completeQuery, updateQuery)

// Monitor query status
queries.foreach { query =>
  println(s"Query ${query.name} - Active: ${query.isActive}")
  // lastProgress is null until the first progress update has been reported
  Option(query.lastProgress).foreach { progress =>
    println(s"Batch ${progress.batchId} processed ${progress.processedRowsPerSecond} rows/sec")
  }
}

// Query manager operations
val activeQueries = spark.streams.active
println(s"Number of active queries: ${activeQueries.length}")

// Get specific query by name.
// spark.streams.get(...) looks queries up by id, so filter the active ones by name.
val specificQuery = spark.streams.active.find(_.name == "user_counts_table")

// Graceful shutdown
sys.addShutdownHook {
  println("Stopping streaming queries...")
  spark.streams.active.foreach(_.stop())
  spark.stop()
}

// Wait for termination. This call blocks until a query stops, so it comes last.
spark.streams.awaitAnyTermination()
742
```