or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

catalog.md · columns-functions.md · data-io.md · dataset-dataframe.md · index.md · session-management.md · streaming.md · types-encoders.md · udfs.md

streaming.mddocs/

0

# Streaming Operations

1

2

Real-time data processing with structured streaming. Provides unified batch and streaming APIs with exactly-once processing guarantees, fault tolerance, and advanced streaming analytics capabilities.

3

4

## Capabilities

5

6

### DataStreamReader

7

8

Interface for reading streaming data from various sources.

9

10

```scala { .api }

11

/**

12

* Interface for reading streaming data from various sources

13

*/

14

class DataStreamReader {

15

/** Specify streaming data source format */

16

def format(source: String): DataStreamReader

17

18

/** Set schema for the streaming data */

19

def schema(schema: StructType): DataStreamReader

20

def schema(schemaString: String): DataStreamReader

21

22

/** Set options for the streaming source */

23

def option(key: String, value: String): DataStreamReader

24

def option(key: String, value: Boolean): DataStreamReader

25

def option(key: String, value: Long): DataStreamReader

26

def option(key: String, value: Double): DataStreamReader

27

def options(options: scala.collection.Map[String, String]): DataStreamReader

28

def options(options: java.util.Map[String, String]): DataStreamReader

29

30

/** Load streaming data using generic interface */

31

def load(): DataFrame

32

def load(path: String): DataFrame

33

34

/** Built-in streaming sources */

35

def text(path: String): DataFrame

36

def textFile(path: String): Dataset[String]

37

def csv(path: String): DataFrame

38

def json(path: String): DataFrame

39

def parquet(path: String): DataFrame

40

def orc(path: String): DataFrame

41

42

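/** Kafka streaming source. NOTE(review): Apache Spark's DataStreamReader has no kafka() shortcut; use format("kafka") with options and load() — confirm this signature before relying on it */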

43

def kafka(): DataFrame

44

45

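/** Socket streaming source (for testing). NOTE(review): in Apache Spark this is selected with format("socket") plus "host"/"port" options — confirm these shortcuts exist */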

46

def socket(host: String, port: Int): DataFrame

47

def socket(host: String, port: Int, includeTimestamp: Boolean): DataFrame

48

49

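/** Rate streaming source (for testing). NOTE(review): in Apache Spark this is selected with format("rate") plus "rowsPerSecond"/"numPartitions" options — confirm these shortcuts exist */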

50

def rate(rowsPerSecond: Long): DataFrame

51

def rate(rowsPerSecond: Long, numPartitions: Int): DataFrame

52

}

53

```

54

55

**Usage Examples:**

56

57

```scala

58

// File-based streaming

59

val streamingDf = spark.readStream

60

.schema(schema)

61

.option("maxFilesPerTrigger", "10")

62

.json("path/to/streaming/json/files")

63

64

// Kafka streaming

65

val kafkaStream = spark.readStream

66

.format("kafka")

67

.option("kafka.bootstrap.servers", "localhost:9092")

68

.option("subscribe", "topic1,topic2")

69

.option("startingOffsets", "latest")

70

.load()

71

72

// Socket streaming (for testing)

73

val socketStream = spark.readStream

74

.format("socket")

75

.option("host", "localhost")

76

.option("port", 9999)

77

.load()

78

79

// Rate source (for testing)

80

val rateStream = spark.readStream

81

.format("rate")

82

.option("rowsPerSecond", 100)

83

.option("numPartitions", 4)

84

.load()

85

```

86

87

### DataStreamWriter

88

89

Interface for writing streaming Dataset to various sinks.

90

91

```scala { .api }

92

/**

93

* Interface for writing streaming Dataset to various sinks

94

* @tparam T Type of the streaming Dataset

95

*/

96

class DataStreamWriter[T] {

97

/** Specify output format */

98

def format(source: String): DataStreamWriter[T]

99

100

/** Set output mode */

101

def outputMode(outputMode: OutputMode): DataStreamWriter[T]

102

def outputMode(outputMode: String): DataStreamWriter[T]

103

104

/** Set trigger for micro-batch processing */

105

def trigger(trigger: Trigger): DataStreamWriter[T]

106

107

/** Set options for the streaming sink */

108

def option(key: String, value: String): DataStreamWriter[T]

109

def option(key: String, value: Boolean): DataStreamWriter[T]

110

def option(key: String, value: Long): DataStreamWriter[T]

111

def option(key: String, value: Double): DataStreamWriter[T]

112

def options(options: scala.collection.Map[String, String]): DataStreamWriter[T]

113

def options(options: java.util.Map[String, String]): DataStreamWriter[T]

114

115

/** Partition output by columns */

116

def partitionBy(colNames: String*): DataStreamWriter[T]

117

118

/** Query name for identification */

119

def queryName(queryName: String): DataStreamWriter[T]

120

121

/** Start streaming query */

122

def start(): StreamingQuery

123

def start(path: String): StreamingQuery

124

125

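/** Built-in console sink. NOTE(review): in Apache Spark this is selected with format("console") plus "numRows"/"truncate" options — confirm these shortcuts exist */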

126

def console(): DataStreamWriter[T]

127

def console(numRows: Int): DataStreamWriter[T]

128

def console(numRows: Int, truncate: Boolean): DataStreamWriter[T]

129

130

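/** File-based sinks. NOTE(review): in Apache Spark these are selected with format("json" | "parquet" | "orc" | "text" | "csv") followed by start(path) — confirm these shortcuts exist */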

131

def json(path: String): StreamingQuery

132

def parquet(path: String): StreamingQuery

133

def orc(path: String): StreamingQuery

134

def text(path: String): StreamingQuery

135

def csv(path: String): StreamingQuery

136

137

/** Custom row-by-row processing */

138

def foreach(writer: ForeachWriter[T]): DataStreamWriter[T]

139

def foreachBatch(function: (Dataset[T], Long) => Unit): DataStreamWriter[T]

140

141

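/** Memory sink (for testing). NOTE(review): in Apache Spark this is selected with format("memory") together with queryName(...) — confirm this shortcut exists */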

142

def memory(queryName: String): DataStreamWriter[T]

143

}

144

```

145

146

**Usage Examples:**

147

148

```scala

149

// Console output

150

val query1 = streamingDf.writeStream

151

.outputMode("append")

152

.format("console")

153

.option("numRows", 20)

154

.option("truncate", false)

155

.start()

156

157

// File output with partitioning

158

val query2 = streamingDf.writeStream

159

.format("parquet")

160

.outputMode("append")

161

.option("path", "output/streaming")

162

.option("checkpointLocation", "checkpoints/streaming")

163

.partitionBy("year", "month")

164

.trigger(Trigger.ProcessingTime("30 seconds"))

165

.start()

166

167

// Kafka output

168

val query3 = processedStream

169

.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

170

.writeStream

171

.format("kafka")

172

.option("kafka.bootstrap.servers", "localhost:9092")

173

.option("topic", "output-topic")

174

.option("checkpointLocation", "checkpoints/kafka-out")

175

.outputMode("append")

176

.start()

177

178

// Custom processing with foreachBatch

179

val query4 = streamingDf.writeStream

180

.foreachBatch { (batchDf: DataFrame, batchId: Long) =>

181

println(s"Processing batch $batchId with ${batchDf.count()} records")

182

batchDf.write

183

.mode(SaveMode.Append)

184

.saveAsTable("streaming_results")

185

}

186

.trigger(Trigger.ProcessingTime("1 minute"))

187

.start()

188

```

189

190

### Output Modes

191

192

Different output modes for streaming queries.

193

194

```scala { .api }

195

/**

196

* Output modes for streaming queries

197

*/

198

object OutputMode {

199

/** Only append new rows to the result table (string form: "append") */

200

def Append(): OutputMode

201

202

/** Output the complete result table on every trigger (string form: "complete") */

203

def Complete(): OutputMode

204

205

/** Output only the rows updated since the last trigger (string form: "update") */

206

def Update(): OutputMode

207

}

208

```

209

210

### Triggers

211

212

Control timing and execution of streaming micro-batches.

213

214

```scala { .api }

215

/**

216

* Triggers for controlling streaming execution

217

*/

218

sealed trait Trigger

219

220

object Trigger {

221

/** Run micro-batches at a fixed processing-time interval (an interval of 0 runs batches as fast as possible) */

222

def ProcessingTime(interval: String): Trigger

223

def ProcessingTime(interval: Long, unit: TimeUnit): Trigger

224

225

/** Process data once and stop */

226

def Once(): Trigger

227

228

/** Process all data available at query start, possibly in multiple batches, then stop */

229

def AvailableNow(): Trigger

230

231

/** Continuous processing (experimental) */

232

def Continuous(interval: String): Trigger

233

def Continuous(interval: Long, unit: TimeUnit): Trigger

234

}

235

```

236

237

**Usage Examples:**

238

239

```scala

240

import java.util.concurrent.TimeUnit

241

242

// Process every 30 seconds

243

val trigger1 = Trigger.ProcessingTime("30 seconds")

244

val trigger2 = Trigger.ProcessingTime(30, TimeUnit.SECONDS)

245

246

// Process once and terminate

247

val trigger3 = Trigger.Once()

248

249

// Process all available data and terminate

250

val trigger4 = Trigger.AvailableNow()

251

252

// Continuous processing (low-latency)

253

val trigger5 = Trigger.Continuous("1 second")

254

```

255

256

### StreamingQuery

257

258

Handle to a running streaming query with control and monitoring capabilities.

259

260

```scala { .api }

261

/**

262

* Handle to a running streaming query

263

*/

264

trait StreamingQuery {

265

/** Unique identifier for the query */

266

def id: UUID

267

268

/** Name of the query */

269

def name: String

270

271

/** Check if query is currently active */

272

def isActive: Boolean

273

274

/** Block until query terminates */

275

def awaitTermination(): Unit

276

def awaitTermination(timeoutMs: Long): Boolean

277

278

/** Stop the query */

279

def stop(): Unit

280

281

/** Get the most recent progress update */

282

def lastProgress: StreamingQueryProgress

283

284

/** Get recent progress updates */

285

def recentProgress: Array[StreamingQueryProgress]

286

287

/** Get current status */

288

def status: StreamingQueryStatus

289

290

/** Get exception that caused query to stop (if any) */

291

def exception: Option[StreamingQueryException]

292

293

/** Explain the query plan */

294

def explain(): Unit

295

def explain(extended: Boolean): Unit

296

}

297

```

298

299

### StreamingQueryManager

300

301

Manager for all StreamingQueries in a SparkSession.

302

303

```scala { .api }

304

/**

305

* Manager for all StreamingQueries in a SparkSession

306

*/

307

class StreamingQueryManager {

308

/** Get currently active streaming queries */

309

def active: Array[StreamingQuery]

310

311

/** Get a query by id */

312

def get(id: UUID): StreamingQuery

313

def get(id: String): StreamingQuery

314

315

/** Block until any one of the active queries terminates (normally or with an exception) */

316

def awaitAnyTermination(): Unit

317

def awaitAnyTermination(timeoutMs: Long): Boolean

318

319

/** Forget previously terminated queries so that awaitAnyTermination() can wait for new terminations; does not stop any query */

320

def resetTerminated(): Unit

321

322

/** Add listener for streaming query events */

323

def addListener(listener: StreamingQueryListener): Unit

324

def removeListener(listener: StreamingQueryListener): Unit

325

}

326

```

327

328

### ForeachWriter

329

330

Custom sink for row-by-row processing.

331

332

```scala { .api }

333

/**

334

* Abstract class for custom streaming sinks

335

* @tparam T Type of rows to process

336

*/

337

abstract class ForeachWriter[T] extends Serializable {

338

/** Called when starting to process a partition */

339

def open(partitionId: Long, epochId: Long): Boolean

340

341

/** Called for each row */

342

def process(value: T): Unit

343

344

/** Called when finishing processing a partition */

345

def close(errorOrNull: Throwable): Unit

346

}

347

```

348

349

**Usage Example:**

350

351

```scala

352

import org.apache.spark.sql.ForeachWriter

353

354

val customWriter = new ForeachWriter[Row] {

355

def open(partitionId: Long, epochId: Long): Boolean = {

356

// Initialize resources (e.g., database connection)

357

println(s"Opening partition $partitionId for epoch $epochId")

358

true

359

}

360

361

def process(value: Row): Unit = {

362

// Process each row

363

val id = value.getAs[Long]("id")

364

val name = value.getAs[String]("name")

365

println(s"Processing record: $id, $name")

366

// Write to external system

367

}

368

369

def close(errorOrNull: Throwable): Unit = {

370

// Clean up resources

371

if (errorOrNull != null) {

372

println(s"Error occurred: ${errorOrNull.getMessage}")

373

}

374

println("Closing partition")

375

}

376

}

377

378

val query = streamingDf.writeStream

379

.foreach(customWriter)

380

.start()

381

```

382

383

### Streaming Aggregations

384

385

Advanced aggregation operations in streaming context.

386

387

**Time-based aggregations:**

388

389

```scala

390

import org.apache.spark.sql.functions._

391

392

// Sliding window: 5-minute windows that advance every 1 minute

393

val windowedCounts = streamingDf

394

.withWatermark("timestamp", "10 minutes")

395

.groupBy(

396

window(col("timestamp"), "5 minutes", "1 minute"),

397

col("category")

398

)

399

.count()

400

401

// Tumbling window

402

val tumblingWindow = streamingDf

403

.withWatermark("timestamp", "10 minutes")

404

.groupBy(window(col("timestamp"), "10 minutes"))

405

.agg(sum("amount").alias("total_amount"))

406

407

// Session window (event-time sessions)

408

val sessionWindow = streamingDf

409

.withWatermark("timestamp", "10 minutes")

410

.groupBy(

411

col("userId"),

412

session_window(col("timestamp"), "5 minutes")

413

)

414

.agg(count("*").alias("events_in_session"))

415

```

416

417

**Stateful operations:**

418

419

```scala

420

// Global aggregations without a watermark (require complete or update output mode; append is not supported)

421

val globalCounts = streamingDf

422

.groupBy("category")

423

.count()

424

425

// With watermarking for late data handling

426

val withWatermark = streamingDf

426

.withWatermark("eventTime", "1 hour") // tolerate events arriving up to 1 hour late

427

.groupBy(col("userId"), window(col("eventTime"), "30 minutes")) // groupBy takes all Columns or all Strings, never a mix

428

.agg(sum("amount").alias("total"))

430

```

431

432

### Common Streaming Patterns

433

434

**Exactly-once processing:**

435

436

```scala

437

val query = streamingDf

438

.writeStream

439

.outputMode("append")

440

.option("checkpointLocation", "s3://bucket/checkpoints/query1")

441

.format("delta") // or other ACID-compliant sink

442

.start("s3://bucket/output/table")

443

```

444

445

**Error handling and monitoring:**

446

447

```scala

448

val query = streamingDf.writeStream

449

.foreachBatch { (batchDf: DataFrame, batchId: Long) =>

450

try {

451

batchDf.write

452

.mode(SaveMode.Append)

453

.saveAsTable("results")

454

} catch {

455

case e: Exception =>

456

println(s"Error processing batch $batchId: ${e.getMessage}")

457

// Log to monitoring system

458

throw e // Re-throw so the query fails instead of silently skipping the batch; restarting the query is up to the caller

459

}

460

}

461

.trigger(Trigger.ProcessingTime("30 seconds"))

462

.start()

463

464

// Monitor query progress

465

val progressInfo = Option(query.lastProgress) // lastProgress is null until the first progress update is reported

466

progressInfo.foreach(p => println(s"Input rows/sec: ${p.inputRowsPerSecond}"))

467

progressInfo.foreach(p => println(s"Processing time: ${p.durationMs}"))

468

```

469

470

**Stream-stream joins:**

471

472

```scala

473

val stream1 = spark.readStream.format("kafka")...

474

val stream2 = spark.readStream.format("kafka")...

475

476

val joinedStream = stream1.alias("s1")

477

.join(stream2.alias("s2"),

478

expr("s1.key = s2.key AND s1.timestamp >= s2.timestamp - interval 1 hour AND s1.timestamp <= s2.timestamp + interval 1 hour"),

479

"inner"

480

)

481

```

482

483

**Stream-static joins:**

484

485

```scala

486

val staticDf = spark.read.table("reference_data")

487

val streamingDf = spark.readStream...

488

489

val enrichedStream = streamingDf

489

.join(staticDf, "key") // Stream-static inner join on "key"; Spark may broadcast staticDf if it is small enough (or hinted with broadcast(...))

491

```