or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

catalog.mddata-sources.mddata-types.mddataframe-dataset.mdindex.mdsession-management.mdsql-functions.mdstreaming.mdudfs.md

streaming.mddocs/

0

# Apache Spark SQL - Streaming

1

2

## Capabilities

3

4

### Structured Streaming Query Execution

5

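- Execute incremental processing of unbounded data with fault-tolerant, end-to-end exactly-once guarantees (given replayable sources and idempotent or transactional sinks), using micro-batch execution by default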

6

- Handle late-arriving data with configurable watermarking and event-time processing

7

- Support for stateful operations including aggregations, joins, and custom state management

8

- Process unbounded streams with automatic checkpointing and recovery mechanisms

9

10

### Stream Data Sources and Sinks

11

- Read from various streaming sources including Kafka, files, sockets, and rate sources for testing

12

- Write to multiple sink types including files, Kafka, console, memory, and foreach sinks

13

- Support for different output modes including append, complete, and update for different use cases

14

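- Apply explicit, user-supplied schemas to streaming sources (file sources require one unless schema inference is enabled); changing the schema or format of a running query is restricted and may require a new checkpoint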

15

16

### Trigger Management and Processing Control

17

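- Configure processing triggers including fixed processing-time intervals, available-now (process all available data, then stop), once (deprecated since Spark 3.4 in favor of available-now), and experimental continuous processing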

18

- Control micro-batch sizing and processing intervals for throughput and latency optimization

19

- Support for event-time processing with watermarks for handling out-of-order data

20

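- Bound micro-batch size with source rate-limit options such as `maxFilesPerTrigger` (file sources) and `maxOffsetsPerTrigger` (Kafka); unlike the legacy DStream API, Structured Streaming has no automatic, capacity-based backpressure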

21

22

### Stateful Stream Processing

23

- Maintain state across micro-batches for complex event processing and session analytics

24

- Support for arbitrary stateful processing using mapGroupsWithState and flatMapGroupsWithState

25

- Handle state expiration and cleanup with configurable timeout policies

26

- Enable stateful stream-stream joins whose buffered state is bounded by watermarks and event-time range conditions in the join predicate

27

28

## API Reference

29

30

### DataStreamReader Class

31

```scala { .api }

final class DataStreamReader private[sql](sparkSession: SparkSession) extends Logging {
  // Format specification. Built-in streaming sources: "json", "csv", "parquet", "orc",
  // "text", "socket", "rate" and "kafka" (the last needs the spark-sql-kafka-0-10 package).
  def format(source: String): DataStreamReader

  // Schema definition. File sources need an explicit schema unless
  // spark.sql.streaming.schemaInference is enabled.
  def schema(schema: StructType): DataStreamReader
  def schema(schemaString: String): DataStreamReader // DDL string, e.g. "id INT, name STRING"

  // Options configuration
  def option(key: String, value: String): DataStreamReader
  def option(key: String, value: Boolean): DataStreamReader
  def option(key: String, value: Long): DataStreamReader
  def option(key: String, value: Double): DataStreamReader
  def options(options: scala.collection.Map[String, String]): DataStreamReader
  def options(options: java.util.Map[String, String]): DataStreamReader

  // Load operations
  def load(): DataFrame
  def load(path: String): DataFrame
  def table(tableName: String): DataFrame // Spark 3.1+: stream from a catalog table

  // Format-specific loaders (shorthand for format(<name>).load(path))
  def json(path: String): DataFrame
  def csv(path: String): DataFrame
  def parquet(path: String): DataFrame
  def orc(path: String): DataFrame
  def text(path: String): DataFrame
  def textFile(path: String): Dataset[String]

  // There are no kafka(), socket() or rate() methods: select those sources with format()
  // and configure them through options, e.g.
  //   format("kafka").option("kafka.bootstrap.servers", "host:9092").option("subscribe", "topic").load()
  //   format("socket").option("host", "localhost").option("port", "9999").option("includeTimestamp", "true").load()
  //   format("rate").option("rowsPerSecond", "10").option("rampUpTime", "5s").option("numPartitions", "2").load()
}

```

71

72

### DataStreamWriter[T] Class

73

```scala { .api }

final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
  // Format specification: "parquet", "json", "csv", "orc", "text", "kafka", "console",
  // "memory", or any other streaming sink provider name
  def format(source: String): DataStreamWriter[T]

  // Output mode configuration
  def outputMode(outputMode: OutputMode): DataStreamWriter[T]
  def outputMode(outputMode: String): DataStreamWriter[T] // "append", "complete" or "update"

  // Trigger configuration (default: a new micro-batch as soon as the previous one finishes)
  def trigger(trigger: Trigger): DataStreamWriter[T]

  // Options configuration. The checkpoint directory is set here:
  //   option("checkpointLocation", "/path/to/checkpoint")
  def option(key: String, value: String): DataStreamWriter[T]
  def option(key: String, value: Boolean): DataStreamWriter[T]
  def option(key: String, value: Long): DataStreamWriter[T]
  def option(key: String, value: Double): DataStreamWriter[T]
  def options(options: scala.collection.Map[String, String]): DataStreamWriter[T]
  def options(options: java.util.Map[String, String]): DataStreamWriter[T]

  // Partitioning of file sink output
  def partitionBy(colNames: String*): DataStreamWriter[T]

  // Query naming: shown in the UI and used as the table name by the "memory" sink
  def queryName(queryName: String): DataStreamWriter[T]

  // Start operations (each returns the running query)
  def start(): StreamingQuery
  def start(path: String): StreamingQuery
  def toTable(tableName: String): StreamingQuery // Spark 3.1+

  // Custom sinks. These configure the writer and return it; call start() afterwards.
  def foreach(writer: ForeachWriter[T]): DataStreamWriter[T]
  def foreachBatch(function: (Dataset[T], Long) => Unit): DataStreamWriter[T]
  def foreachBatch(function: VoidFunction2[Dataset[T], java.lang.Long]): DataStreamWriter[T]

  // There are no format-specific writers (json(path), console(), memory(), kafka(), ...):
  // use format(<name>) plus options and then start(), e.g.
  //   format("console").option("numRows", "20").option("truncate", "false").start()
  //   format("memory").queryName("my_table").start()
}

```

125

126

### StreamingQuery Interface

127

```scala { .api }

trait StreamingQuery {
  // Query identification
  def id: UUID      // stable across restarts from the same checkpoint
  def runId: UUID   // unique to each run (start/restart) of the query
  def name: String  // user-specified name, or null if none was set
  def sparkSession: SparkSession

  // Query control. Queries are started through DataStreamWriter.start()/toTable(), not here.
  def stop(): Unit  // waits for execution threads to stop (bounded by spark.sql.streaming.stopTimeout)
  def processAllAvailable(): Unit // blocks until all currently available input is processed; meant for tests

  // Query state
  def isActive: Boolean
  def awaitTermination(): Unit                   // rethrows StreamingQueryException on failure
  def awaitTermination(timeoutMs: Long): Boolean // true if the query terminated within the timeout
  def exception: Option[StreamingQueryException]

  // Progress monitoring
  def lastProgress: StreamingQueryProgress   // null until the first progress update
  def recentProgress: Array[StreamingQueryProgress]
  def status: StreamingQueryStatus

  // Explain plans
  def explain(): Unit
  def explain(extended: Boolean): Unit
}

```

156

157

### StreamingQueryManager Class

158

```scala { .api }

// Obtained via spark.streams; not constructed by user code.
class StreamingQueryManager {
  // Active queries management
  def active: Array[StreamingQuery]
  def get(id: UUID): StreamingQuery   // null if no active query has this id
  def get(id: String): StreamingQuery // id as a UUID string -- NOT the query name
  // To look up by name: spark.streams.active.find(_.name == "my_query")

  // Termination handling
  def awaitAnyTermination(): Unit                   // rethrows the exception of a failed query
  def awaitAnyTermination(timeoutMs: Long): Boolean
  def resetTerminated(): Unit                       // forget past terminations before awaiting again

  // Listeners
  def addListener(listener: StreamingQueryListener): Unit
  def removeListener(listener: StreamingQueryListener): Unit
  def listListeners(): Array[StreamingQueryListener]
}

```

175

176

### Trigger Types

177

```scala { .api }

// Trigger is a Java class (org.apache.spark.sql.streaming.Trigger). Instances are obtained
// only through its static factory methods, shown here as a companion object; the concrete
// trigger classes live in Spark's internal execution packages and are not public API.
class Trigger

object Trigger {
  // Micro-batch every interval (ProcessingTime(0) runs batches back to back, the default)
  def ProcessingTime(intervalMs: Long): Trigger
  def ProcessingTime(interval: Long, timeUnit: TimeUnit): Trigger
  def ProcessingTime(interval: scala.concurrent.duration.Duration): Trigger
  def ProcessingTime(interval: String): Trigger // e.g. "10 seconds"

  // Process whatever is available in one batch, then stop.
  // Deprecated since Spark 3.4 in favor of AvailableNow.
  def Once(): Trigger

  // Spark 3.3+: process all available data (possibly in several batches, honoring
  // rate-limit options), then stop
  def AvailableNow(): Trigger

  // Experimental continuous processing; the interval is the checkpoint interval
  def Continuous(intervalMs: Long): Trigger
  def Continuous(interval: Long, timeUnit: TimeUnit): Trigger
  def Continuous(interval: scala.concurrent.duration.Duration): Trigger
  def Continuous(interval: String): Trigger
}

```

201

202

### Output Modes

203

```scala { .api }

// OutputMode is a Java class (org.apache.spark.sql.streaming.OutputMode) whose instances
// come from three static factory methods, shown here as a companion object. There is no
// public string-parsing factory: pass "append", "complete" or "update" to
// DataStreamWriter.outputMode(String) instead.
class OutputMode

object OutputMode {
  def Append(): OutputMode   // only rows added to the result table since the last trigger
  def Complete(): OutputMode // the entire result table on every trigger (aggregations only)
  def Update(): OutputMode   // only rows updated since the last trigger
}

```

214

215

### State Management

216

```scala { .api }

// Group state for stateful operations: handed to the user function of
// mapGroupsWithState / flatMapGroupsWithState, one instance per grouping key.
trait GroupState[S] {
  // State access
  def exists: Boolean
  def get: S                    // throws NoSuchElementException when no state exists
  def getOption: Option[S]
  def update(newState: S): Unit // newState must not be null
  def remove(): Unit

  // Timeout management
  def hasTimedOut: Boolean // true when the function was invoked because this key timed out

  // Processing-time timeouts (only with GroupStateTimeout.ProcessingTimeTimeout)
  def setTimeoutDuration(durationMs: Long): Unit
  def setTimeoutDuration(duration: String): Unit // e.g. "30 minutes"

  // Event-time timeouts (only with GroupStateTimeout.EventTimeTimeout; needs a watermark)
  def setTimeoutTimestamp(timestampMs: Long): Unit
  def setTimeoutTimestamp(timestampMs: Long, additionalDuration: String): Unit
  def setTimeoutTimestamp(timestamp: java.sql.Date): Unit
  def setTimeoutTimestamp(timestamp: java.sql.Date, additionalDuration: String): Unit

  def getCurrentWatermarkMs(): Long
  def getCurrentProcessingTimeMs(): Long
}

// State timeout configuration: a Java class whose instances come from static factory
// methods (shown as a companion object), passed to mapGroupsWithState/flatMapGroupsWithState.
class GroupStateTimeout

object GroupStateTimeout {
  def NoTimeout(): GroupStateTimeout
  def ProcessingTimeTimeout(): GroupStateTimeout
  def EventTimeTimeout(): GroupStateTimeout
}

```

242

243

### Streaming Query Progress and Status

244

```scala { .api }

// Query progress information, reported once per trigger (StreamingQuery.lastProgress,
// StreamingQuery.recentProgress, StreamingQueryListener.onQueryProgress)
class StreamingQueryProgress private[sql] (
  val id: UUID,          // stable across restarts from the same checkpoint
  val runId: UUID,       // unique to this run of the query
  val name: String,      // user-specified query name, or null
  val timestamp: String, // ISO-8601 start time of the trigger
  val batchId: Long,
  val batchDuration: Long,
  val durationMs: java.util.Map[String, java.lang.Long], // per-phase timings, e.g. "triggerExecution"
  val eventTime: java.util.Map[String, String],          // may contain "min", "max", "avg", "watermark"
  val stateOperators: Array[StateOperatorProgress],
  val sources: Array[SourceProgress],
  val sink: SinkProgress,
  val observedMetrics: java.util.Map[String, Row]) extends Serializable {

  def numInputRows: Long             // summed over all sources
  def inputRowsPerSecond: Double     // rate at which data arrived
  def processedRowsPerSecond: Double // rate at which Spark processed it
  def prettyJson: String
  def json: String
}

// Query status information
class StreamingQueryStatus private[sql] (
  val message: String,
  val isDataAvailable: Boolean,
  val isTriggerActive: Boolean) extends Serializable {

  def prettyJson: String
  def json: String
}

// State operator progress (one entry per stateful operator in the query)
class StateOperatorProgress private[sql] (
  val operatorName: String,
  val numRowsTotal: Long,
  val numRowsUpdated: Long,
  val allUpdatesTimeMs: Long,
  val numRowsRemoved: Long,
  val allRemovalsTimeMs: Long,
  val commitTimeMs: Long,
  val memoryUsedBytes: Long,
  val numRowsDroppedByWatermark: Long,
  val numShufflePartitions: Long,
  val numStateStoreInstances: Long,
  val customMetrics: java.util.Map[String, java.lang.Long]) extends Serializable

// Source progress (one entry per source)
class SourceProgress private[sql] (
  val description: String,
  val startOffset: String,
  val endOffset: String,
  val latestOffset: String,
  val numInputRows: Long,
  val inputRowsPerSecond: Double,
  val processedRowsPerSecond: Double,
  val metrics: java.util.Map[String, String]) extends Serializable

// Sink progress; numOutputRows is -1 when the sink does not report it
class SinkProgress private[sql] (
  val description: String,
  val numOutputRows: Long,
  val metrics: java.util.Map[String, String]) extends Serializable

```

296

297

### Custom Sinks

298

```scala { .api }

// ForeachWriter for custom row-by-row output. One instance is serialized to each task.
abstract class ForeachWriter[T] extends Serializable {
  // Lifecycle methods
  // Called per partition and epoch/batch before any rows; return false to skip the partition.
  def open(partitionId: Long, epochId: Long): Boolean
  def process(value: T): Unit
  // Called after processing finishes; errorOrNull is the failure, or null on success.
  def close(errorOrNull: Throwable): Unit
}

// There is no public writer class for foreachBatch. DataStreamWriter.foreachBatch takes a
// function that receives each micro-batch as an ordinary (batch) Dataset plus its batch id:
//   def foreachBatch(function: (Dataset[T], Long) => Unit): DataStreamWriter[T]

```

312

313

### Watermark Operations

314

```scala { .api }

// Watermark functions (in Dataset class)
class Dataset[T] {
  // eventTime: name of a timestamp column; delayThreshold: e.g. "10 minutes"
  def withWatermark(eventTime: String, delayThreshold: String): Dataset[T]
}

// Time-window functions (org.apache.spark.sql.functions -- note the lowercase object name)
object functions {
  def window(timeColumn: Column, windowDuration: String): Column
  def window(timeColumn: Column, windowDuration: String, slideDuration: String): Column
  def window(timeColumn: Column, windowDuration: String, slideDuration: String, startTime: String): Column
  def session_window(timeColumn: Column, gapDuration: String): Column // Spark 3.2+
  def session_window(timeColumn: Column, gapDuration: Column): Column // Spark 3.2+, per-row gap
  def window_time(windowColumn: Column): Column                       // Spark 3.4+, event time of a window
}

```

328

329

## Usage Examples

330

331

### Basic Streaming Setup

332

```scala

import org.apache.spark.sql.{SparkSession, Dataset, DataFrame}
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val spark = SparkSession.builder()
  .appName("Streaming Example")
  // Parent directory for checkpoints of queries that don't set option("checkpointLocation", ...)
  .config("spark.sql.streaming.checkpointLocation", "/path/to/checkpoint")
  .getOrCreate()

// Enables the $"column" syntax and the encoders used throughout these examples
import spark.implicits._

// Define schema for streaming data
val userActivitySchema = StructType(Array(
  StructField("user_id", StringType, nullable = false),
  StructField("action", StringType, nullable = false),
  StructField("timestamp", TimestampType, nullable = false),
  StructField("session_id", StringType, nullable = false),
  StructField("page_url", StringType, nullable = true),
  StructField("duration", IntegerType, nullable = true)
))

// Read from JSON files; maxFilesPerTrigger caps each micro-batch at one new file
val inputStream = spark.readStream
  .format("json")
  .schema(userActivitySchema)
  .option("maxFilesPerTrigger", "1")
  .load("/path/to/streaming/data")

// Basic transformations
val processedStream = inputStream
  .filter($"action" =!= "heartbeat")
  .withColumn("hour", hour($"timestamp"))
  .withColumn("date", to_date($"timestamp"))

```

366

367

### File-based Streaming Sources

368

```scala

// All three sources below reuse userActivitySchema from the setup example.

// CSV streaming: files carry a header row and are parsed with the fixed schema
val csvStream = spark.readStream
  .schema(userActivitySchema)
  .format("csv")
  .option("inferSchema", "false")
  .option("header", "true")
  .load("/path/to/csv/files")

// Parquet streaming
val parquetStream = spark.readStream
  .schema(userActivitySchema)
  .format("parquet")
  .load("/path/to/parquet/files")

// Text file streaming: each line arrives in a single "value" column, which is split into
// its leading word (capture group 1) and the rest of the line (capture group 2)
val logLinePattern = """(\w+)\s+(.+)"""
val textStream = spark.readStream
  .format("text")
  .load("/path/to/text/files")
  .select(
    regexp_extract($"value", logLinePattern, 1).as("level"),
    regexp_extract($"value", logLinePattern, 2).as("message"),
    current_timestamp().as("processed_time")
  )

```

393

394

### Kafka Integration

395

```scala

// Read from Kafka (requires the spark-sql-kafka-0-10 package on the classpath)
val kafkaStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "user-activity,page-views") // comma-separated topic list
  // "latest" or "earliest"; only applies when a query starts without a checkpoint
  .option("startingOffsets", "latest")
  // Don't fail the query when requested offsets are no longer available
  .option("failOnDataLoss", "false")
  .load()

// Parse Kafka messages: key and value arrive as binary columns
val parsedKafkaStream = kafkaStream.select(
  $"topic",
  $"partition",
  $"offset",
  $"timestamp",
  $"key".cast(StringType).as("message_key"),
  from_json($"value".cast(StringType), userActivitySchema).as("data")
).select($"topic", $"partition", $"offset", $"timestamp", $"message_key", $"data.*")

// Write to Kafka. The Kafka sink requires a "value" column (string or binary) and accepts an
// optional "key" column, so each row is serialized to JSON first.
val kafkaOutput = processedStream
  .select(
    $"user_id".as("key"),
    to_json(struct($"*")).as("value")
  )
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "processed-activity")
  .option("checkpointLocation", "/path/to/kafka/checkpoint")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()

```

424

425

### Aggregations and Windowing

426

```scala

// Simple aggregations. Without a watermark, state for every user_id is kept indefinitely
// and the result can only be written in complete or update mode.
val userCounts = inputStream
  .groupBy($"user_id")
  .count()

// Time-based windowing: 5-minute windows sliding every minute. Data arriving more than
// 10 minutes behind the latest event time seen may be dropped.
val windowedCounts = inputStream
  .withWatermark("timestamp", "10 minutes")
  .groupBy(
    window($"timestamp", "5 minutes", "1 minute"),
    $"action"
  )
  .agg(
    count("*").as("action_count"),
    // Exact distinct aggregations (countDistinct) are not supported on streaming
    // DataFrames; approx_count_distinct is the supported alternative.
    approx_count_distinct("user_id").as("unique_users"),
    avg("duration").as("avg_duration")
  )

// Session windows: a session closes after 10 minutes without activity
val sessionAnalysis = inputStream
  .withWatermark("timestamp", "30 minutes")
  .groupBy(
    $"user_id",
    session_window($"timestamp", "10 minutes")
  )
  .agg(
    count("*").as("actions_in_session"),
    sum("duration").as("total_session_time"),
    collect_list("action").as("session_actions")
  )

// Per-user, per-hour, per-action statistics. min/max are used instead of first/last,
// whose result depends on row order. A second streaming aggregation on top of this one is
// only allowed in limited cases (Spark 3.4+, append mode, itself event-time windowed), so the
// per-window roll-up is done on each micro-batch in a batch context instead (actionBreakdown).
val multiWindowAnalysis = inputStream
  .withWatermark("timestamp", "1 hour")
  .groupBy(
    $"user_id",
    window($"timestamp", "1 hour"),
    $"action"
  )
  .agg(
    count("*").as("hourly_count"),
    min("timestamp").as("first_action_time"),
    max("timestamp").as("last_action_time")
  )

// Roll up a batch of multiWindowAnalysis rows (e.g. inside foreachBatch) into per-window
// totals plus an action -> count map. There is no collect_map function; build the map from
// collected (key, value) structs instead.
def actionBreakdown(batch: DataFrame): DataFrame =
  batch
    .groupBy($"user_id", $"window")
    .agg(
      sum("hourly_count").as("total_actions"),
      map_from_entries(collect_list(struct($"action", $"hourly_count"))).as("action_breakdown")
    )

```

477

478

### Stateful Processing

479

```scala

import org.apache.spark.sql.Row
import spark.implicits._ // encoders for the String key and the (String, UserSession) result

case class UserSession(
  userId: String,
  startTime: Long,    // event time (epoch ms) of the first action in the session
  lastActivity: Long, // event time (epoch ms) of the latest action seen
  actionCount: Int,
  totalDuration: Int
)

// Stateful session tracking: one UserSession per user_id, expired after 30 minutes without
// new data (processing time). mapGroupsWithState queries must use the "update" output mode.
val sessionTracking = inputStream
  .groupByKey(row => row.getAs[String]("user_id"))
  .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout) {
    (userId: String, values: Iterator[Row], state: GroupState[UserSession]) =>
      if (state.hasTimedOut) {
        // Timeout invocations carry no new rows: emit the final session and drop its state
        val expired = state.get
        state.remove()
        ("session_expired", expired)
      } else {
        val events = values.toSeq
        def eventTimeMs(event: Row): Long =
          event.getAs[java.sql.Timestamp]("timestamp").getTime

        // Get or create session state; a new session starts at its earliest event
        val session = state.getOption.getOrElse {
          val firstSeen = events.map(eventTimeMs).reduceOption(_ min _)
            .getOrElse(System.currentTimeMillis())
          UserSession(userId, firstSeen, firstSeen, 0, 0)
        }

        // Update session with new events
        val updatedSession = events.foldLeft(session) { (s, event) =>
          val duration = Option(event.getAs[Integer]("duration")).map(_.toInt).getOrElse(0)
          s.copy(
            lastActivity = math.max(s.lastActivity, eventTimeMs(event)),
            actionCount = s.actionCount + 1,
            totalDuration = s.totalDuration + duration
          )
        }

        state.update(updatedSession)
        // Re-arm the 30-minute inactivity timeout whenever the user is active
        state.setTimeoutDuration("30 minutes")
        ("session_active", updatedSession)
      }
  }

```

529

530

### Stream-Stream Joins

531

```scala

// Payload schemas for the two topics (adjust to your data). Column names differ on each
// side so the join condition below can reference them unqualified.
val impressionSchema = new StructType()
  .add("impression_id", StringType)
  .add("campaign_id", StringType)
  .add("impression_timestamp", TimestampType)

val clickSchema = new StructType()
  .add("click_impression_id", StringType)
  .add("click_timestamp", TimestampType)

// Define two streams. Each watermark must be on the event-time column that the join
// condition uses; otherwise join state can never be cleaned up.
val clickStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "clicks")
  .load()
  .select(
    from_json($"value".cast("string"), clickSchema).as("data")
  )
  .select($"data.*")
  .withWatermark("click_timestamp", "2 hours")

val impressionStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "impressions")
  .load()
  .select(
    from_json($"value".cast("string"), impressionSchema).as("data")
  )
  .select($"data.*")
  .withWatermark("impression_timestamp", "3 hours")

// Join streams with time constraints. For a left outer join, impressions without a click
// are emitted (with null click columns) only once the watermark rules out a later match.
val joinedStream = impressionStream.join(
  clickStream,
  expr("""
    impression_id = click_impression_id AND
    click_timestamp >= impression_timestamp AND
    click_timestamp <= impression_timestamp + interval 1 hour
  """),
  joinType = "leftOuter"
)

// Aggregate joined data. impression_timestamp already carries the watermark from the input,
// and re-declaring a watermark after a stateful operator is rejected by recent Spark versions.
// Chaining an aggregation after a stream-stream join requires Spark 3.4+ and append mode.
val conversionAnalysis = joinedStream
  .groupBy(
    window($"impression_timestamp", "10 minutes"),
    $"campaign_id"
  )
  .agg(
    count("impression_id").as("impressions"),
    count("click_impression_id").as("clicks"),
    (count("click_impression_id") * 100.0 / count("impression_id")).as("ctr")
  )

```

579

580

### Output Modes and Triggers

581

```scala

// Append mode - only new rows
val appendQuery = processedStream.writeStream
  .outputMode(OutputMode.Append)
  .format("parquet")
  .option("path", "/path/to/output")
  .option("checkpointLocation", "/path/to/checkpoint")
  .trigger(Trigger.ProcessingTime("1 minute"))
  .start()

// Complete mode - entire result table (aggregations only). The memory sink exposes it as an
// in-memory table named after queryName, e.g. spark.sql("SELECT * FROM user_counts_table").
val completeQuery = userCounts.writeStream
  .outputMode(OutputMode.Complete)
  .format("memory")
  .queryName("user_counts_table")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()

// Update mode - only changed rows
val updateQuery = windowedCounts.writeStream
  .outputMode(OutputMode.Update)
  .format("console")
  .option("truncate", "false")
  .trigger(Trigger.ProcessingTime("2 minutes"))
  .start()

// Once trigger - single micro-batch, then stop.
// Deprecated since Spark 3.4: prefer Trigger.AvailableNow (below), which also honors
// rate-limit options such as maxFilesPerTrigger by splitting the backlog into batches.
val onceQuery = processedStream.writeStream
  .outputMode(OutputMode.Append)
  .format("json")
  .option("path", "/path/to/batch/output")
  .trigger(Trigger.Once)
  .start()

// Available now trigger - process all available data, then stop
// ("delta" requires the Delta Lake package)
val availableNowQuery = processedStream.writeStream
  .outputMode(OutputMode.Append)
  .format("delta")
  .option("path", "/path/to/delta/table")
  .trigger(Trigger.AvailableNow)
  .start()

// Continuous processing (experimental). It supports only map-like operations (select,
// filter, ...) and a few sources such as Kafka and rate -- not file sources -- so this
// reads the parsed Kafka stream rather than the JSON file stream.
val continuousQuery = parsedKafkaStream
  .filter($"action" === "purchase")
  .writeStream
  .outputMode(OutputMode.Append)
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start()

```

632

633

### Custom Sinks

634

```scala

// Custom ForeachWriter: one instance is serialized to each task, and open/process/close run
// on the executors, so the JDBC connection is created in open(), not in the constructor.
class DatabaseWriter extends ForeachWriter[Row] {
  var connection: java.sql.Connection = _
  var statement: java.sql.PreparedStatement = _

  override def open(partitionId: Long, epochId: Long): Boolean = {
    // Initialize database connection
    connection = java.sql.DriverManager.getConnection(
      "jdbc:postgresql://localhost/mydb", "user", "password"
    )
    statement = connection.prepareStatement(
      "INSERT INTO user_activity (user_id, action, timestamp) VALUES (?, ?, ?)"
    )
    true
  }

  override def process(value: Row): Unit = {
    // Look fields up by name so the writer keeps working if columns are reordered
    statement.setString(1, value.getAs[String]("user_id"))
    statement.setString(2, value.getAs[String]("action"))
    statement.setTimestamp(3, value.getAs[java.sql.Timestamp]("timestamp"))
    statement.executeUpdate()
  }

  override def close(errorOrNull: Throwable): Unit = {
    // Close the connection even if closing the statement fails
    try {
      if (statement != null) statement.close()
    } finally {
      if (connection != null) connection.close()
    }
  }
}

// Use custom writer
val customSinkQuery = processedStream.writeStream
  .foreach(new DatabaseWriter())
  .option("checkpointLocation", "/path/to/custom/checkpoint")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()

// ForeachBatch for batch processing. A failed batch can be re-run with the same batchId,
// so batch-keyed writes should be idempotent.
val foreachBatchQuery = processedStream.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // Cache before the first action: the batch is read three times below
    batchDF.cache()
    try {
      println(s"Processing batch $batchId with ${batchDF.count()} records")

      // Write to multiple sinks. Overwriting the per-batch directory keeps a retried batch
      // from duplicating archived files.
      batchDF.write
        .mode("overwrite")
        .parquet(s"/path/to/archive/batch_$batchId")

      batchDF.write
        .format("jdbc")
        .option("url", "jdbc:postgresql://localhost/mydb")
        .option("dbtable", "processed_activity")
        .option("user", "user")
        .option("password", "password")
        .mode("append")
        .save()
    } finally {
      batchDF.unpersist()
    }
  }
  .option("checkpointLocation", "/path/to/batch/checkpoint")
  .start()

```

696

697

### Query Management and Monitoring

698

```scala

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

// Add streaming query listener. Listeners only receive events raised after registration,
// so in an application register them before starting any query.
spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
    println(s"Query started: ${queryStarted.name}")
  }

  override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
    println(s"Query terminated: ${queryTerminated.id}")
  }

  override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
    val progress = queryProgress.progress
    println(s"Query ${progress.name}: processed ${progress.processedRowsPerSecond} rows/sec")
  }
})

// Queries started in the previous examples
val queries = Array(appendQuery, completeQuery, updateQuery)

// Monitor query status; lastProgress is null until the first progress update
queries.foreach { query =>
  println(s"Query ${query.name} - Active: ${query.isActive}")
  Option(query.lastProgress).foreach { progress =>
    println(s"Batch ${progress.batchId} processed ${progress.processedRowsPerSecond} rows/sec " +
      s"(input rate ${progress.inputRowsPerSecond} rows/sec)")
  }
}

// Query manager operations
val activeQueries = spark.streams.active
println(s"Number of active queries: ${activeQueries.length}")

// Get specific query by name. spark.streams.get(String) expects the query *id* as a UUID
// string, so names are looked up among the active queries instead.
val specificQuery: Option[StreamingQuery] =
  spark.streams.active.find(_.name == "user_counts_table")

// Graceful shutdown, registered before blocking below
sys.addShutdownHook {
  println("Stopping streaming queries...")
  spark.streams.active.foreach(_.stop())
  spark.stop()
}

// Wait for termination: blocks until any query stops and rethrows its exception if it
// failed, so it comes last
spark.streams.awaitAnyTermination()

```