or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core.md, exceptions.md, graphx.md, index.md, logging.md, mllib.md, sql.md, storage.md, streaming.md, utils.md

docs/streaming.md

0

# Structured Streaming

1

2

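Structured Streaming is a scalable, fault-tolerant stream processing engine built on the Spark SQL engine. You express a streaming computation the same way you would express a batch computation on static data, and the engine runs it incrementally as new data arrives. With replayable sources, checkpointing, and idempotent sinks, it provides fast, end-to-end exactly-once processing.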

3

4

## Capabilities

5

6

### DataStreamReader

7

8

Interface for reading streaming data from various sources.

9

10

```scala { .api }

11

/**
 * Interface used to load a streaming Dataset from external storage systems.
 * Obtained from `spark.readStream`.
 */
class DataStreamReader {
  // Format specification
  def format(source: String): DataStreamReader

  // Schema specification. Required for the csv, json, parquet and orc file sources
  // unless spark.sql.streaming.schemaInference is enabled. The text, Kafka, socket
  // and rate sources have fixed schemas.
  def schema(schema: StructType): DataStreamReader
  def schema(schemaString: String): DataStreamReader // DDL string, e.g. "a INT, b STRING"

  // Options configuration
  def option(key: String, value: String): DataStreamReader
  def option(key: String, value: Boolean): DataStreamReader
  def option(key: String, value: Long): DataStreamReader
  def option(key: String, value: Double): DataStreamReader
  def options(options: Map[String, String]): DataStreamReader
  def options(options: java.util.Map[String, String]): DataStreamReader

  // Built-in file sources
  def csv(path: String): DataFrame
  def json(path: String): DataFrame
  def orc(path: String): DataFrame
  def parquet(path: String): DataFrame
  def text(path: String): DataFrame
  def textFile(path: String): Dataset[String]

  // Generic load (format and options set beforehand)
  def load(): DataFrame
  def load(path: String): DataFrame

  // Table source (Spark 3.1+)
  def table(tableName: String): DataFrame
}

43

```

44

45

**Usage Examples:**

46

47

```scala

48

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder()
  .appName("StructuredStreaming")
  .getOrCreate()

// Explicit schema for the incoming JSON events (recommended for production)
val schema = new StructType()
  .add("timestamp", TimestampType, nullable = true)
  .add("user_id", StringType, nullable = true)
  .add("action", StringType, nullable = true)
  .add("value", DoubleType, nullable = true)

// File source: picks up at most one new file per micro-batch
val fileStream = spark.readStream
  .schema(schema)
  .option("maxFilesPerTrigger", "1")
  .json("path/to/streaming/json/files")

// Kafka source: connection and subscription options supplied as one map
val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> "localhost:9092",
  "subscribe" -> "events",
  "startingOffsets" -> "latest"
)
val kafkaStream = spark.readStream
  .format("kafka")
  .options(kafkaOptions)
  .load()

// Socket source: test-only, reads lines of text from a TCP socket
val socketStream = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Rate source: test-only, generates rows at a fixed rate
val rateStream = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "10")
  .option("numPartitions", "2")
  .load()

88

```

89

90

### Stream Processing Operations

91

92

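Most DataFrame/Dataset operations also work on streaming DataFrames. Some do not: `limit`/`take`, `distinct`, sorting outside of complete-mode aggregations, and actions such as `count()`, `show()` and `foreach()`. Use a streaming aggregation or a sink (`console`, `foreach`, `foreachBatch`) instead: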

93

94

```scala

95

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

// Basic transformations: parse the binary Kafka value as JSON using `schema`
val processedStream = kafkaStream
  .select(
    col("key").cast("string"),
    from_json(col("value").cast("string"), schema).as("data")
  )
  .select("data.*")
  .withColumn("processing_time", current_timestamp())
  .filter(col("value") > 0)

// Windowing operations
val windowedCounts = processedStream
  .withWatermark("timestamp", "10 minutes")
  .groupBy(
    window(col("timestamp"), "5 minutes", "1 minute"),
    col("user_id")
  )
  .agg(
    count("*").as("event_count"),
    sum("value").as("total_value"),
    avg("value").as("avg_value")
  )

// Stream-stream joins.
// Each side is parsed into columns (raw Kafka rows have no user_id), and both sides
// get a watermark so Spark can drop buffered rows that can no longer find a match.
def eventsFromTopic(topic: String): DataFrame = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // required by the Kafka source
  .option("subscribe", topic)
  .load()
  .select(from_json(col("value").cast("string"), schema).as("data"))
  .select("data.*")
  .withWatermark("timestamp", "10 minutes")

val stream1 = eventsFromTopic("topic1")
val stream2 = eventsFromTopic("topic2")

val joinedStream = stream1.alias("s1")
  .join(
    stream2.alias("s2"),
    expr("s1.user_id = s2.user_id AND s2.timestamp >= s1.timestamp AND s2.timestamp <= s1.timestamp + interval 1 hour"),
    "inner"
  )

// Stream-static joins
val staticDf = spark.read.parquet("path/to/static/data")
val enrichedStream = processedStream
  .join(staticDf, "user_id")

135

```

136

137

### DataStreamWriter

138

139

Interface for writing streaming results to various sinks.

140

141

```scala { .api }

142

/**
 * Interface used to write a streaming Dataset to external storage systems.
 * Obtained from `Dataset.writeStream`; nothing runs until `start()` or `toTable()` is called.
 */
class DataStreamWriter[T] {
  // Output format
  def format(source: String): DataStreamWriter[T]

  // Output mode: "append" (default), "complete" or "update"
  def outputMode(outputMode: String): DataStreamWriter[T]
  def outputMode(outputMode: OutputMode): DataStreamWriter[T]

  // Processing options
  def trigger(trigger: Trigger): DataStreamWriter[T]
  def option(key: String, value: String): DataStreamWriter[T]
  def option(key: String, value: Boolean): DataStreamWriter[T]
  def option(key: String, value: Long): DataStreamWriter[T]
  def option(key: String, value: Double): DataStreamWriter[T]
  def options(options: Map[String, String]): DataStreamWriter[T]

  // Checkpointing: there is no dedicated setter. Use
  // option("checkpointLocation", path), with a directory unique to each query.

  // Query naming (also the table name used by the memory sink)
  def queryName(queryName: String): DataStreamWriter[T]

  // Partitioning (for file sinks)
  def partitionBy(colNames: String*): DataStreamWriter[T]

  // Custom sinks
  def foreach(writer: ForeachWriter[T]): DataStreamWriter[T]
  def foreachBatch(function: (Dataset[T], Long) => Unit): DataStreamWriter[T]

  // Start query
  def start(): StreamingQuery
  def start(path: String): StreamingQuery // For file-based sinks

  // Table sink (Spark 3.1+)
  def toTable(tableName: String): StreamingQuery
}

177

178

// Trigger types (factory methods on org.apache.spark.sql.streaming.Trigger)
object Trigger {
  // Run micro-batches at a fixed interval, e.g. "10 seconds"
  def ProcessingTime(interval: String): Trigger
  def ProcessingTime(interval: Duration): Trigger // scala.concurrent.duration.Duration
  def ProcessingTime(intervalMs: Long): Trigger
  def ProcessingTime(interval: Long, timeUnit: TimeUnit): Trigger

  // Process all available data in a single batch, then stop.
  // Deprecated since Spark 3.4 in favor of AvailableNow().
  @deprecated("Use AvailableNow()", "3.4.0")
  def Once(): Trigger

  // Process all available data (possibly in several batches), then stop. Spark 3.3+.
  def AvailableNow(): Trigger

  // Experimental continuous processing; the interval is the checkpoint interval
  def Continuous(interval: String): Trigger
  def Continuous(interval: Duration): Trigger
  def Continuous(intervalMs: Long): Trigger
  def Continuous(interval: Long, timeUnit: TimeUnit): Trigger
}

187

```

188

189

**Usage Examples:**

190

191

```scala

192

import org.apache.spark.sql.{ForeachWriter, Row}
import org.apache.spark.sql.functions.{col, to_date}
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._

// Each query that uses a checkpoint needs its own directory; never share one between queries.

// Console sink (for debugging)
val consoleQuery = processedStream.writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", "false")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()

// File sink, partitioned by a date column derived from the event timestamp
// (partitionBy can only name columns that exist in the stream)
val fileQuery = processedStream
  .withColumn("date", to_date(col("timestamp")))
  .writeStream
  .outputMode("append")
  .format("parquet")
  .option("path", "path/to/output")
  .option("checkpointLocation", "path/to/checkpoint/file-sink")
  .partitionBy("date")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()

// Kafka sink: needs string/binary "key" and "value" columns
val kafkaSink = processedStream
  .selectExpr("CAST(user_id AS STRING) AS key", "to_json(struct(*)) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .option("checkpointLocation", "path/to/checkpoint/kafka-sink")
  .outputMode("append")
  .start()

// Delta Lake sink (if available)
val deltaQuery = processedStream.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "path/to/checkpoint/delta-sink")
  .toTable("events_table")

// Memory sink (for testing); results are queryable as the table "memory_table".
// Complete mode is only allowed for aggregations (e.g. windowedCounts), so this
// non-aggregated stream uses append.
val memoryQuery = processedStream.writeStream
  .outputMode("append")
  .format("memory")
  .queryName("memory_table")
  .start()

// Foreach sink (custom logic)
val foreachQuery = processedStream.writeStream
  .foreach(new ForeachWriter[Row] {
    def open(partitionId: Long, epochId: Long): Boolean = true
    def process(record: Row): Unit = {
      // Custom processing logic
      println(s"Processing record: $record")
    }
    def close(errorOrNull: Throwable): Unit = {}
  })
  .start()

249

```

250

251

### StreamingQuery Management

252

253

Interface for monitoring and controlling streaming queries.

254

255

```scala { .api }

256

/**
 * A handle to a query that is executing continuously in the background as new data arrives.
 * All of these methods are thread-safe.
 */
trait StreamingQuery {
  // Query identification
  def name: String   // user-specified name, or null if none was set
  def id: UUID       // persists across restarts from the same checkpoint
  def runId: UUID    // new for every start/restart

  // Query status
  def isActive: Boolean
  def awaitTermination(): Unit                   // blocks; rethrows the failure if the query failed
  def awaitTermination(timeoutMs: Long): Boolean // true if the query terminated within the timeout
  def processAllAvailable(): Unit                // blocks until available input is processed (testing)
  def stop(): Unit

  // Progress monitoring
  def lastProgress: StreamingQueryProgress       // null until the first progress update
  def recentProgress: Array[StreamingQueryProgress]
  def status: StreamingQueryStatus
  def explain(): Unit

  // Exception handling: Some(...) only if the query terminated with an exception
  def exception: Option[StreamingQueryException]

  // Spark SQL integration
  def sparkSession: SparkSession
}

282

283

/**
 * Information about progress made in the execution of a StreamingQuery during one trigger.
 * Instances are created by Spark (the constructor is not public).
 */
class StreamingQueryProgress(
    val id: UUID,
    val runId: UUID,
    val name: String,
    val timestamp: String,
    val batchId: Long,
    val batchDuration: Long,
    val durationMs: java.util.Map[String, java.lang.Long], // e.g. "triggerExecution"
    val eventTime: java.util.Map[String, String],          // e.g. "watermark", "min", "max"
    val stateOperators: Array[StateOperatorProgress],
    val sources: Array[SourceProgress],
    val sink: SinkProgress,
    val observedMetrics: java.util.Map[String, Row]) {

  // Aggregates across all sources for this trigger
  def numInputRows: Long
  def inputRowsPerSecond: Double
  def processedRowsPerSecond: Double

  // JSON renderings of this progress report
  def json: String
  def prettyJson: String
}

299

300

/**
 * Manager for all StreamingQuery instances in a SparkSession, available as `spark.streams`.
 */
abstract class StreamingQueryManager {
  def active: Array[StreamingQuery]

  // Look up an *active* query by its id (not its name); returns null if none matches.
  def get(id: UUID): StreamingQuery
  def get(id: String): StreamingQuery // the id as a UUID string

  def awaitAnyTermination(): Unit
  def awaitAnyTermination(timeoutMs: Long): Boolean

  // Forget past terminations so awaitAnyTermination() waits for new ones
  def resetTerminated(): Unit

  // Listeners receive start/progress/termination events for all queries
  def addListener(listener: StreamingQueryListener): Unit
  def removeListener(listener: StreamingQueryListener): Unit
  def listListeners(): Array[StreamingQueryListener]
}

311

```

312

313

**Usage Examples:**

314

315

```scala

316

// Start multiple named queries
val query1 = stream1.writeStream.queryName("query1").format("console").start()
val query2 = stream2.writeStream.queryName("query2").format("memory").start()

// Monitor queries
spark.streams.active.foreach { query =>
  println(s"Query ${query.name} is ${if (query.isActive) "active" else "inactive"}")
}

// Monitor progress. lastProgress is null until the first micro-batch completes,
// so wrap it in Option instead of dereferencing it directly.
Option(query1.lastProgress).fold(println("query1 has not reported progress yet")) { progress =>
  println(s"Batch ${progress.batchId}: ${progress.durationMs.get("triggerExecution")}ms")
  println(s"Input rate: ${progress.inputRowsPerSecond}")
  println(s"Processing rate: ${progress.processedRowsPerSecond}")
}

// Handle exceptions (non-blocking): exception is Some(...) only after a failure
query1.exception match {
  case Some(ex) =>
    println(s"Query failed: ${ex.getMessage}")
    println(s"Start offset: ${ex.startOffset}")
    println(s"End offset: ${ex.endOffset}")
  case None => println("Query running successfully")
}

// Wait for a specific query. Without a timeout, awaitTermination() blocks for as long as
// an unbounded stream runs, so bound the wait. It rethrows the query's exception if it failed.
if (!query1.awaitTermination(60000L)) {
  println("query1 is still running after 60 seconds")
}

// Stop all queries
spark.streams.active.foreach(_.stop())

347

```

348

349

### Window Operations and Event Time

350

351

Support for time-based operations with watermarking:

352

353

```scala

354

import org.apache.spark.sql.functions._

val eventTime = col("eventTime")

// Tumbling windows: fixed, non-overlapping 10-minute buckets
val tumblingWindow = eventsStream
  .withWatermark("eventTime", "2 minutes")
  .groupBy(window(eventTime, "10 minutes"))
  .agg(count("*").as("count"))

// Sliding windows: 10-minute buckets starting every 5 minutes, so each event falls in two
val slidingWindow = eventsStream
  .withWatermark("eventTime", "2 minutes")
  .groupBy(window(eventTime, "10 minutes", "5 minutes"))
  .agg(count("*").as("count"))

// Session windows (session_window needs Spark 3.2+): a user's session closes
// after a 30-minute gap with no events
val sessionWindows = eventsStream
  .withWatermark("eventTime", "5 minutes")
  .groupBy(col("userId"), session_window(eventTime, "30 minutes"))
  .agg(
    count("*").as("eventCount"),
    collect_list("action").as("actions"),
    min("eventTime").as("sessionStart"),
    max("eventTime").as("sessionEnd")
  )

381

```

382

383

### Stateful Operations

384

385

Support for maintaining state across streaming batches:

386

387

```scala

388

// Streaming deduplication: the watermark lets Spark drop dedup state older than 1 hour
val deduplicated = eventsStream
  .withWatermark("eventTime", "1 hour")
  .dropDuplicates("eventId", "eventTime")

// Arbitrary stateful operations
import org.apache.spark.sql.streaming.GroupState
import org.apache.spark.sql.streaming.GroupStateTimeout
import spark.implicits._ // encoders for .as[Event] and for UserSession state/output

// Assumed row shape of eventsStream; adjust the fields to the real schema.
case class Event(eventId: String, userId: String, eventTime: java.sql.Timestamp, action: String)
case class UserSession(userId: String, sessionStart: Long, eventCount: Int)

/**
 * Counts events per user session.
 *
 * With ProcessingTimeTimeout, Spark also calls this function with no events once a key
 * has been idle past its timeout (state.hasTimedOut). That call must close the session.
 * Otherwise the state is re-armed on every timeout and never released.
 */
def updateUserSession(userId: String, events: Iterator[Event], state: GroupState[UserSession]): UserSession = {
  if (state.hasTimedOut) {
    // Emit the final session and free its state
    val finished = state.get
    state.remove()
    finished
  } else {
    val currentSession = state.getOption.getOrElse(UserSession(userId, System.currentTimeMillis(), 0))
    val updatedSession = currentSession.copy(eventCount = currentSession.eventCount + events.size)
    state.update(updatedSession)
    // Session expires after 1 hour of processing time without new events
    state.setTimeoutDuration("1 hour")
    updatedSession
  }
}

// No watermark needed here: processing-time timeouts do not use event time
val userSessions = eventsStream
  .as[Event]
  .groupByKey(_.userId)
  .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(updateUserSession)

415

```

416

417

## Performance and Monitoring

418

419

### Optimization Strategies

420

421

1. **Schema Definition**: Specify schemas explicitly for file-based sources. Streaming file sources require one unless `spark.sql.streaming.schemaInference` is enabled.

422

2. **Watermarking**: Use watermarks for event-time processing and state cleanup

423

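3. **Checkpointing**: Give every query its own checkpoint location, set via `option("checkpointLocation", ...)`, on reliable HDFS-compatible storage. Never share a checkpoint location between queries.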

424

4. **Trigger Configuration**: Choose appropriate trigger intervals based on latency requirements

425

5. **Resource Allocation**: Allocate sufficient resources for streaming workloads

426

427

### Monitoring and Debugging

428

429

```scala

430

// Streaming metrics.
// The batch function is bound to a typed val first. Passing a lambda literal directly to
// foreachBatch can be ambiguous between its Scala and Java overloads under Scala 2.12.
val logBatch: (DataFrame, Long) => Unit = { (batchDF, batchId) =>
  // Two actions run on the same batch, so cache it to avoid computing it twice
  batchDF.persist()
  try {
    println(s"Processing batch $batchId with ${batchDF.count()} records")
    batchDF.show()
  } finally {
    batchDF.unpersist()
  }
}

// checkpointPath: a checkpoint directory unique to this query
val query = eventsStream.writeStream
  .option("checkpointLocation", checkpointPath)
  .foreachBatch(logBatch)
  .start()

// Query monitoring in Spark UI
// - Structured Streaming tab shows active and completed queries and their progress
// - SQL tab shows the SQL plans executed for each micro-batch
// - Jobs tab shows streaming job execution

/**
 * Prints the latest progress of `query` every `pollIntervalMs` until the query terminates.
 *
 * Waits with awaitTermination(timeout) rather than Thread.sleep, so the loop ends as soon as
 * the query stops. Note that awaitTermination rethrows the query's exception if it failed.
 *
 * @param query          the running query to watch
 * @param pollIntervalMs delay between progress reports (default 10 s)
 */
def monitorQuery(query: StreamingQuery, pollIntervalMs: Long = 10000L): Unit = {
  // lastProgress is null until the first micro-batch completes
  def report(): Unit = Option(query.lastProgress).foreach { progress =>
    println(s"Batch ${progress.batchId}: processed ${progress.numInputRows} rows in ${progress.batchDuration}ms")
  }

  report()
  while (!query.awaitTermination(pollIntervalMs)) {
    report()
  }
}

454

```

455

456

Structured Streaming provides a powerful, fault-tolerant stream processing engine that unifies batch and streaming semantics under a single programming model.