or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

aggregations.md · catalog.md · data-io.md · dataframe-dataset.md · functions-expressions.md · index.md · session-management.md · streaming.md

streaming.mddocs/

0

# Streaming Queries

1

2

Spark Structured Streaming provides a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. It allows you to express streaming computations the same way you would express a batch computation on static data, using the same DataFrame and Dataset APIs.

3

4

## Core Streaming Concepts

5

6

### DataStreamReader

7

8

```scala { .api }

9

class DataStreamReader {

10

def format(source: String): DataStreamReader

11

def schema(schema: StructType): DataStreamReader

12

def schema(schemaString: String): DataStreamReader

13

def option(key: String, value: String): DataStreamReader

14

def option(key: String, value: Boolean): DataStreamReader

15

def option(key: String, value: Long): DataStreamReader

16

def option(key: String, value: Double): DataStreamReader

17

def options(options: scala.collection.Map[String, String]): DataStreamReader

18

def options(options: java.util.Map[String, String]): DataStreamReader

19

def load(): DataFrame

20

def load(path: String): DataFrame

21

}

22

```

23

24

### DataStreamWriter

25

26

```scala { .api }

27

class DataStreamWriter[T] {

28

def outputMode(outputMode: OutputMode): DataStreamWriter[T]

29

def outputMode(outputMode: String): DataStreamWriter[T]

30

def trigger(trigger: Trigger): DataStreamWriter[T]

31

def queryName(queryName: String): DataStreamWriter[T]

32

def format(source: String): DataStreamWriter[T]

33

def partitionBy(colNames: String*): DataStreamWriter[T]

34

def option(key: String, value: String): DataStreamWriter[T]

35

def option(key: String, value: Boolean): DataStreamWriter[T]

36

def option(key: String, value: Long): DataStreamWriter[T]

37

def option(key: String, value: Double): DataStreamWriter[T]

38

def options(options: scala.collection.Map[String, String]): DataStreamWriter[T]

39

def options(options: java.util.Map[String, String]): DataStreamWriter[T]

40

def start(): StreamingQuery

41

def start(path: String): StreamingQuery

42

def foreach(writer: ForeachWriter[T]): DataStreamWriter[T]

43

def foreachBatch(function: (Dataset[T], Long) => Unit): DataStreamWriter[T]

44

}

45

```

46

47

## Stream Sources

48

49

### File Sources

50

51

**Usage Examples:**

52

53

```scala

54

// JSON file stream

55

val jsonStream = spark.readStream

56

.format("json")

57

.schema(schema) // Schema is required for file sources

58

.option("path", "/path/to/json/files")

59

.load()

60

61

// CSV file stream

62

val csvStream = spark.readStream

63

.format("csv")

64

.schema(csvSchema)

65

.option("header", "true")

66

.option("path", "/path/to/csv/files")

67

.load()

68

69

// Parquet file stream

70

val parquetStream = spark.readStream

71

.format("parquet")

72

.schema(parquetSchema)

73

.load("/path/to/parquet/files")

74

75

// Text file stream

76

val textStream = spark.readStream

77

.format("text")

78

.option("path", "/path/to/text/files")

79

.load()

80

```

81

82

### Kafka Source

83

84

```scala

85

// Basic Kafka stream

86

val kafkaStream = spark.readStream

87

.format("kafka")

88

.option("kafka.bootstrap.servers", "localhost:9092")

89

.option("subscribe", "topic1,topic2")

90

.load()

91

92

// Kafka with a topic-pattern subscription
// (use the "assign" option instead to read specific topic partitions)
val kafkaPartitions = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribePattern", "topic.*")
  // Where to begin when the query starts without an existing checkpoint
  .option("startingOffsets", "latest")
  // Note: "endingOffsets" is a batch-query option and is not set on a
  // streaming read, which has no end by definition.
  .load()

100

101

// Kafka stream processing

102

val processedKafka = kafkaStream

103

.select(

104

col("key").cast("string"),

105

col("value").cast("string"),

106

col("topic"),

107

col("partition"),

108

col("offset"),

109

col("timestamp")

110

)

111

.filter(col("topic") === "important_topic")

112

```

113

114

### Socket Source (for testing)

115

116

```scala

117

// Socket stream (testing only)

118

val socketStream = spark.readStream

119

.format("socket")

120

.option("host", "localhost")

121

.option("port", 9999)

122

.load()

123

```

124

125

### Rate Source (for testing)

126

127

```scala

128

// Rate source for load testing

129

val rateStream = spark.readStream

130

.format("rate")

131

.option("rowsPerSecond", "100")

132

.option("numPartitions", "10")

133

.load()

134

```

135

136

## Output Modes

137

138

```scala { .api }

139

object OutputMode {
  // The modes are exposed as zero-argument factory methods. From Scala the
  // parentheses may be omitted at the call site, e.g. OutputMode.Append.

  // Only rows added to the result table since the last trigger are written.
  def Append(): OutputMode
  // The whole result table is written on every trigger.
  def Complete(): OutputMode
  // Only rows updated in the result table since the last trigger are written.
  def Update(): OutputMode
}

144

```

145

146

### Output Mode Usage

147

148

```scala

149

// Append mode - only new rows added to result table

150

val appendQuery = df.writeStream

151

.outputMode(OutputMode.Append)

152

.format("console")

153

.start()

154

155

// Complete mode - entire result table is output

156

val completeQuery = df

157

.groupBy("category")

158

.count()

159

.writeStream

160

.outputMode(OutputMode.Complete)

161

.format("console")

162

.start()

163

164

// Update mode - only updated rows in result table

165

val updateQuery = df

166

.groupBy("id")

167

.agg(max("timestamp"))

168

.writeStream

169

.outputMode(OutputMode.Update)

170

.format("console")

171

.start()

172

```

173

174

## Triggers

175

176

```scala { .api }

177

object Trigger {

178

def ProcessingTime(interval: String): Trigger

179

def ProcessingTime(interval: Long, unit: TimeUnit): Trigger

180

def Once(): Trigger

181

def Continuous(interval: String): Trigger

182

def Continuous(interval: Long, unit: TimeUnit): Trigger

183

}

184

```

185

186

### Trigger Examples

187

188

```scala

189

import org.apache.spark.sql.streaming.Trigger

190

import java.util.concurrent.TimeUnit

191

192

// Micro-batch processing

193

val microBatchQuery = df.writeStream

194

.trigger(Trigger.ProcessingTime("30 seconds"))

195

.outputMode("append")

196

.format("console")

197

.start()

198

199

// One-time trigger (batch-like)

200

val onceQuery = df.writeStream

201

.trigger(Trigger.Once())

202

.outputMode("append")

203

.format("parquet")

204

.option("path", "/output/path")

205

.start()

206

207

// Continuous processing (experimental)

208

val continuousQuery = df.writeStream

209

.trigger(Trigger.Continuous("1 second"))

210

.outputMode("append")

211

.format("console")

212

.start()

213

```

214

215

## StreamingQuery Management

216

217

```scala { .api }

218

// Handle to a running (or terminated) streaming query; returned by
// DataStreamWriter.start().
trait StreamingQuery {
  def id: UUID
  def runId: UUID
  def name: String
  def sparkSession: SparkSession
  def isActive: Boolean
  // Defined only when the query terminated because of an error.
  def exception: Option[StreamingQueryException]
  def status: StreamingQueryStatus
  def recentProgress: Array[StreamingQueryProgress]
  // May be null when no progress update has been reported yet.
  def lastProgress: StreamingQueryProgress

  // Blocks until the query terminates (stop() or failure).
  def awaitTermination(): Unit
  // Blocks for at most timeoutMs; returns whether the query has terminated.
  def awaitTermination(timeoutMs: Long): Boolean
  // Blocks until all currently available source data is processed (intended for testing).
  def processAllAvailable(): Unit
  def stop(): Unit
  def explain(): Unit
  def explain(extended: Boolean): Unit
}

236

```

237

238

### Query Lifecycle Management

239

240

**Usage Examples:**

241

242

```scala

243

// Start a streaming query

244

val query = df

245

.groupBy("category")

246

.count()

247

.writeStream

248

.queryName("category_counts")

249

.outputMode("complete")

250

.format("memory")

251

.start()

252

253

// Monitor query status

254

println(s"Query ID: ${query.id}")

255

println(s"Is Active: ${query.isActive}")

256

println(s"Recent Progress: ${query.recentProgress.length} batches")

257

258

// Wait for termination

259

query.awaitTermination()

260

261

// Or wait with timeout

262

val finished = query.awaitTermination(60000) // 60 seconds

263

if (!finished) {

264

println("Query still running after timeout")

265

query.stop()

266

}

267

268

// Process all available data (testing)

269

query.processAllAvailable()

270

271

// Stop the query

272

query.stop()

273

274

// Exception handling

275

query.exception match {

276

case Some(e) => println(s"Query failed: ${e.getMessage}")

277

case None => println("Query completed successfully")

278

}

279

```

280

281

### Multiple Query Management

282

283

```scala

284

// Manage multiple streaming queries

285

// Start multiple queries; the handles are kept so they can be stopped together
val queries = Seq(
  df1.writeStream.queryName("query1").format("console").start(),
  df2.writeStream.queryName("query2").format("console").start(),
  df3.writeStream.queryName("query3").format("console").start()
)

// Wait until ANY query terminates. Awaiting each query in turn would block on
// the first one and miss a failure in the others while it is still running.
try {
  spark.streams.awaitAnyTermination()
} catch {
  case scala.util.control.NonFatal(e) =>
    println(s"A query failed: ${e.getMessage}")
    queries.foreach(_.stop())
}

300

301

// Access all active queries

302

val allQueries = spark.streams.active

303

allQueries.foreach { query =>

304

println(s"Query: ${query.name}, Active: ${query.isActive}")

305

}

306

```

307

308

## Stream Sinks

309

310

### Console Sink

311

312

```scala

313

// Basic console output

314

val consoleQuery = df.writeStream

315

.format("console")

316

.outputMode("append")

317

.start()

318

319

// Console with options

320

val detailedConsole = df.writeStream

321

.format("console")

322

.option("numRows", "50")

323

.option("truncate", "false")

324

.outputMode("append")

325

.start()

326

```

327

328

### File Sinks

329

330

```scala

331

// Parquet sink

332

val parquetSink = df.writeStream

333

.format("parquet")

334

.option("path", "/output/parquet")

335

.option("checkpointLocation", "/checkpoint/parquet")

336

.outputMode("append")

337

.start()

338

339

// JSON sink with partitioning

340

val jsonSink = df.writeStream

341

.format("json")

342

.option("path", "/output/json")

343

.option("checkpointLocation", "/checkpoint/json")

344

.partitionBy("year", "month")

345

.outputMode("append")

346

.start()

347

348

// Delta sink (requires Delta Lake)

349

val deltaSink = df.writeStream

350

.format("delta")

351

.option("path", "/delta/table")

352

.option("checkpointLocation", "/checkpoint/delta")

353

.outputMode("append")

354

.start()

355

```

356

357

### Kafka Sink

358

359

```scala

360

// Kafka sink

361

val kafkaSink = df

362

.selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value")

363

.writeStream

364

.format("kafka")

365

.option("kafka.bootstrap.servers", "localhost:9092")

366

.option("topic", "output_topic")

367

.option("checkpointLocation", "/checkpoint/kafka")

368

.outputMode("append")

369

.start()

370

371

// Kafka sink with headers

372

val kafkaWithHeaders = df
  // Build key/value first so the JSON payload contains only the source
  // columns and not the headers column added below.
  .selectExpr(
    "CAST(id AS STRING) AS key",
    "to_json(struct(*)) AS value"
  )
  // Kafka header values are bytes, hence the cast to binary.
  .withColumn("headers",
    array(struct(
      lit("source").alias("key"),
      lit("spark").cast("binary").alias("value"))))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output_topic")
  // A checkpoint location is needed by the Kafka sink, as in the basic example
  .option("checkpointLocation", "/checkpoint/kafka_headers")
  .outputMode("append")
  .start()

386

```

387

388

### Memory Sink (for testing)

389

390

```scala

391

// Memory sink for testing

392

val memoryQuery = df

393

.groupBy("category")

394

.count()

395

.writeStream

396

.queryName("memory_table")

397

.format("memory")

398

.outputMode("complete")

399

.start()

400

401

// Query the memory table

402

spark.sql("SELECT * FROM memory_table").show()

403

```

404

405

## Custom Sinks with ForeachWriter

406

407

```scala { .api }

408

abstract class ForeachWriter[T] extends Serializable {

409

def open(partitionId: Long, epochId: Long): Boolean

410

def process(value: T): Unit

411

def close(errorOrNull: Throwable): Unit

412

}

413

```

414

415

### Custom ForeachWriter Example

416

417

```scala

418

import org.apache.spark.sql.ForeachWriter

419

420

// Custom writer to database

421

/**
 * ForeachWriter that inserts every streamed row into the `results` table over JDBC.
 *
 * A connection and a prepared statement are created in `open` and released in
 * `close`. Each row is expected to carry (Long, String, Timestamp) in its first
 * three positions, matching the INSERT statement's placeholders.
 */
class DatabaseWriter extends ForeachWriter[Row] {
  // Assigned in open(); remain null until then, hence the null checks in close().
  var connection: java.sql.Connection = _
  var statement: java.sql.PreparedStatement = _

  /** Opens the JDBC connection and prepares the INSERT; returns true to process the partition. */
  override def open(partitionId: Long, epochId: Long): Boolean = {
    connection = java.sql.DriverManager.getConnection(
      "jdbc:postgresql://localhost/mydb", "user", "password")
    try {
      statement = connection.prepareStatement(
        "INSERT INTO results (id, value, timestamp) VALUES (?, ?, ?)")
    } catch {
      // close() is only guaranteed after open() returns, so release the
      // connection here before propagating the failure.
      case scala.util.control.NonFatal(e) =>
        connection.close()
        connection = null
        throw e
    }
    true
  }

  /** Binds the row's first three fields to the prepared INSERT and executes it. */
  override def process(value: Row): Unit = {
    statement.setLong(1, value.getLong(0))
    statement.setString(2, value.getString(1))
    statement.setTimestamp(3, value.getTimestamp(2))
    statement.executeUpdate()
  }

  /** Releases the statement and the connection; the connection is closed even if closing the statement fails. */
  override def close(errorOrNull: Throwable): Unit = {
    try {
      if (statement != null) statement.close()
    } finally {
      if (connection != null) connection.close()
    }
  }
}

448

449

// Use custom writer

450

val customWriterQuery = df.writeStream

451

.foreach(new DatabaseWriter())

452

.outputMode("append")

453

.start()

454

```

455

456

### ForeachBatch for Custom Logic

457

458

```scala

459

// ForeachBatch for custom processing

460

val foreachBatchQuery = df.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    println(s"Processing batch $batchId")

    // The batch is consumed by several actions below (emptiness check and two
    // writes); cache it so it is not recomputed for each of them.
    batchDF.persist()
    try {
      // isEmpty avoids counting every row just to test for presence
      if (!batchDF.isEmpty) {
        // Write to multiple sinks
        batchDF.write
          .mode("append")
          .parquet(s"/output/batch_$batchId")

        // Also update a summary table
        val summary = batchDF
          .groupBy("category")
          .agg(sum("amount").alias("total"))
          .withColumn("batch_id", lit(batchId))

        summary.write
          .mode("append")
          .saveAsTable("batch_summaries")
      }
    } finally {
      // Release the cached batch whether or not the writes succeeded
      batchDF.unpersist()
    }
  }
  .outputMode("append")
  .start()

484

```

485

486

## Windowed Aggregations

487

488

### Time Windows

489

490

```scala

491

import org.apache.spark.sql.functions._

492

493

// Tumbling window

494

val tumblingWindow = df

495

.withWatermark("timestamp", "10 minutes")

496

.groupBy(

497

window(col("timestamp"), "15 minutes"),

498

col("category")

499

)

500

.agg(

501

sum("amount").alias("total_amount"),

502

count("*").alias("count")

503

)

504

505

// Sliding window

506

val slidingWindow = df

507

.withWatermark("timestamp", "10 minutes")

508

.groupBy(

509

window(col("timestamp"), "15 minutes", "5 minutes"),

510

col("category")

511

)

512

.agg(

513

avg("value").alias("avg_value"),

514

max("value").alias("max_value")

515

)

516

517

// Session window (Spark 3.2+)

518

val sessionWindow = df

519

.withWatermark("timestamp", "10 minutes")

520

.groupBy(

521

session_window(col("timestamp"), "30 minutes"),

522

col("user_id")

523

)

524

.agg(

525

count("*").alias("events_in_session"),

526

sum("duration").alias("total_duration")

527

)

528

```

529

530

### Watermarks for Late Data

531

532

```scala

533

// Handle late data with watermarks

534

val lateDataQuery = df

535

.withWatermark("event_time", "10 minutes") // Allow 10 minutes of late data

536

.groupBy(

537

window(col("event_time"), "5 minutes"),

538

col("device_id")

539

)

540

.agg(

541

avg("temperature").alias("avg_temp"),

542

count("*").alias("reading_count")

543

)

544

.writeStream

545

.outputMode("append") // Only complete windows are output

546

.format("console")

547

.start()

548

```

549

550

## Stream-Stream Joins

551

552

```scala

553

// Inner join between two streams

554

val stream1 = spark.readStream.format("kafka")...

555

val stream2 = spark.readStream.format("kafka")...

556

557

val joined = stream1.alias("s1")

558

.join(

559

stream2.alias("s2"),

560

expr("s1.id = s2.id AND s1.timestamp BETWEEN s2.timestamp - INTERVAL 5 MINUTES AND s2.timestamp + INTERVAL 5 MINUTES")

561

)

562

563

// Stream-static join

564

val staticDF = spark.read.parquet("/path/to/static/data")

565

val streamStaticJoin = streamDF

566

.join(staticDF, "key") // No time constraints needed

567

568

// Left outer join with watermarks

569

val outerJoined = stream1

570

.withWatermark("timestamp", "10 minutes")

571

.alias("left")

572

.join(

573

stream2.withWatermark("timestamp", "20 minutes").alias("right"),

574

expr("left.id = right.id AND left.timestamp BETWEEN right.timestamp - INTERVAL 5 MINUTES AND right.timestamp + INTERVAL 5 MINUTES"),

575

"leftOuter"

576

)

577

```

578

579

## Stateful Processing

580

581

### mapGroupsWithState

582

583

```scala

584

import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

585

586

case class InputEvent(userId: String, action: String, timestamp: Long)

587

case class UserSession(userId: String, startTime: Long, endTime: Long, actionCount: Int)

588

589

/**
 * State update function for mapGroupsWithState: folds a user's new events into
 * the running session kept in `state`.
 *
 * @param userId grouping key
 * @param events events received for this key in the current trigger; empty when
 *               the function is invoked because the state timed out
 * @param state  per-key session state
 * @return the updated session, or the final session when the state has timed out
 */
def updateUserSession(userId: String,
                      events: Iterator[InputEvent],
                      state: GroupState[UserSession]): UserSession = {

  // Long.MaxValue / 0L are neutral starting bounds for the min / max below
  val currentSession = state.getOption.getOrElse(UserSession(userId, Long.MaxValue, 0L, 0))

  if (state.hasTimedOut) {
    // Timeout invocation: there are no events. Emit the final session and drop
    // the state so it does not accumulate for inactive users.
    state.remove()
    currentSession
  } else {
    val timestamps = events.map(_.timestamp).toVector

    // A fold is safe for an empty batch, unlike .min / .max which would throw
    val (newStartTime, newEndTime) =
      timestamps.foldLeft((currentSession.startTime, currentSession.endTime)) {
        case ((start, end), ts) => (math.min(start, ts), math.max(end, ts))
      }

    val updatedSession = UserSession(
      userId, newStartTime, newEndTime, currentSession.actionCount + timestamps.size)

    // Store the state first, then (re)arm the inactivity timeout
    state.update(updatedSession)
    state.setTimeoutDuration("30 minutes")

    updatedSession
  }
}

608

609

val sessionUpdates = inputStream

610

.as[InputEvent]

611

.groupByKey(_.userId)

612

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(updateUserSession)

613

```

614

615

## Monitoring and Debugging

616

617

### Query Progress

618

619

```scala

620

// Monitor query progress

621

val query = df.writeStream...

622

623

// Get progress information

624

// lastProgress may be null before the first progress update is reported
val progress = query.lastProgress
if (progress != null) {
  println(s"Batch ID: ${progress.batchId}")
  // Row count of the batch (not a rate)
  println(s"Input rows: ${progress.numInputRows}")
  // Rate at which Spark processed the data, as opposed to the arrival rate
  println(s"Processing rate: ${progress.processedRowsPerSecond}")
  println(s"Batch duration: ${progress.batchDuration}")
}

629

630

// Historical progress

631

query.recentProgress.foreach { progress =>

632

println(s"Batch ${progress.batchId}: ${progress.inputRowsPerSecond} rows/sec")

633

}

634

```

635

636

### Streaming Query Status

637

638

```scala

639

val status = query.status

640

println(s"Message: ${status.message}")

641

println(s"Is trigger active: ${status.isTriggerActive}")

642

println(s"Is data available: ${status.isDataAvailable}")

643

```

644

645

### Error Handling

646

647

```scala

648

// Query with error handling

649

val resilientQuery = df.writeStream
  // Explicit parameter types select the Scala overload of foreachBatch
  // (untyped parameters are ambiguous with the Java overload on Scala 2.12).
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    try {
      batchDF.write.mode("append").saveAsTable("output_table")
    } catch {
      // NonFatal lets interrupts (e.g. from query.stop()) and fatal JVM errors propagate.
      // NOTE: because the error is not rethrown, the batch counts as processed
      // and will not be retried; the error_log row is the only trace of it.
      case scala.util.control.NonFatal(e) =>
        println(s"Failed to process batch $batchId: ${e.getMessage}")
        // Log to error table or external system
        val errorDF = spark.createDataFrame(Seq(
          (batchId, e.getMessage, System.currentTimeMillis())
        )).toDF("batch_id", "error_message", "timestamp")
        errorDF.write.mode("append").saveAsTable("error_log")
    }
  }
  .start()

664

```