or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core.mddeployment.mdgraphx.mdindex.mdml.mdsql.mdstreaming.md

streaming.mddocs/

0

# Stream Processing

1

2

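Structured Streaming provides scalable, fault-tolerant stream processing built on the Spark SQL engine. Streaming queries use the same DataFrame/Dataset API as batch queries. It provides end-to-end exactly-once guarantees when the sources are replayable, a checkpoint location is configured, and the sinks are idempotent or transactional. This page also covers the legacy Spark Streaming (DStreams) API, which processes data as micro-batches of RDDs.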

3

4

## Capabilities

5

6

### Structured Streaming

7

8

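Modern streaming API built on DataFrame/Dataset. By default, queries run as micro-batches with exactly-once guarantees, given replayable sources and idempotent sinks. The experimental continuous processing mode (`Trigger.Continuous`) lowers latency but provides only at-least-once guarantees.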

9

10

```scala { .api }

/**
 * Reader for streaming data sources, obtained via `spark.readStream`.
 *
 * Every builder method returns the same reader so calls can be chained;
 * nothing is read until `load()` (or a format-specific shortcut) is called.
 */
class DataStreamReader {
  /** Specify the source format, e.g. "kafka", "socket", "rate", "text", "json", "parquet". */
  def format(source: String): DataStreamReader

  /** Add a single source option; non-String values are converted to strings. */
  def option(key: String, value: String): DataStreamReader
  def option(key: String, value: Boolean): DataStreamReader
  def option(key: String, value: Long): DataStreamReader
  def option(key: String, value: Double): DataStreamReader

  /** Add several source options at once. */
  def options(options: scala.collection.Map[String, String]): DataStreamReader

  /**
   * Set the expected schema. File-based streaming sources require a schema
   * unless schema inference (spark.sql.streaming.schemaInference) is enabled.
   */
  def schema(schema: StructType): DataStreamReader
  def schema(schemaString: String): DataStreamReader

  /** Load a streaming DataFrame from the configured source. */
  def load(): DataFrame
  def load(path: String): DataFrame

  /** Shortcuts for file sources, equivalent to `format("<fmt>").load(path)`. */
  def json(path: String): DataFrame
  def csv(path: String): DataFrame
  def orc(path: String): DataFrame
  def parquet(path: String): DataFrame
  def text(path: String): DataFrame
  def textFile(path: String): Dataset[String]

  /** Read a table as a streaming source (Spark 3.1+). */
  def table(tableName: String): DataFrame

  // Kafka and socket sources have no dedicated reader methods; use
  // format("kafka") / format("socket") together with option(...) and load().
}

/**
 * Writer for streaming data sinks, obtained via `ds.writeStream`.
 *
 * Builder methods return the same writer; the query only begins running
 * when `start()` or `toTable()` is called.
 */
class DataStreamWriter[T] {
  /** Set the output mode: "append", "complete" or "update". */
  def outputMode(outputMode: OutputMode): DataStreamWriter[T]
  def outputMode(outputMode: String): DataStreamWriter[T]

  /**
   * Set the trigger policy. Without one, a new micro-batch starts as soon as
   * the previous one has finished.
   */
  def trigger(trigger: Trigger): DataStreamWriter[T]

  /** Specify the sink format, e.g. "console", "memory", "kafka", "parquet", "json", "csv". */
  def format(source: String): DataStreamWriter[T]

  /** Add a single sink option (e.g. "checkpointLocation", "path", "topic"). */
  def option(key: String, value: String): DataStreamWriter[T]
  def option(key: String, value: Boolean): DataStreamWriter[T]
  def option(key: String, value: Long): DataStreamWriter[T]
  def option(key: String, value: Double): DataStreamWriter[T]

  /** Add several sink options at once. */
  def options(options: scala.collection.Map[String, String]): DataStreamWriter[T]

  /** Partition file-sink output by the given columns. */
  def partitionBy(colNames: String*): DataStreamWriter[T]

  /** Name the query; the "memory" sink uses this name as its in-memory table name. */
  def queryName(queryName: String): DataStreamWriter[T]

  /** Write every row through a custom [[ForeachWriter]]. */
  def foreach(writer: ForeachWriter[T]): DataStreamWriter[T]

  /** Handle each micro-batch as an ordinary (batch) Dataset, together with its batch id. */
  def foreachBatch(function: (Dataset[T], Long) => Unit): DataStreamWriter[T]

  /** Start the streaming query (optionally with the sink path). */
  def start(): StreamingQuery
  def start(path: String): StreamingQuery

  /** Start the query writing into the given table (Spark 3.1+). */
  def toTable(tableName: String): StreamingQuery

  // There are no per-sink shortcut methods (console(), json(path), kafka(), memory(name), ...):
  // use format("console").start(), format("json").start(path), format("kafka")...start(), etc.
}

/**
 * Output modes for streaming queries.
 *
 * `OutputMode` is a Java class with static factory methods, called as
 * `OutputMode.Append()`. The strings "append", "complete" and "update"
 * accepted by `DataStreamWriter.outputMode(String)` are equivalent.
 */
object OutputMode {
  /** Write only rows appended to the result table since the last trigger. */
  def Append(): OutputMode

  /** Rewrite the entire result table on every trigger (aggregation queries only). */
  def Complete(): OutputMode

  /** Write only the result rows that changed since the last trigger. */
  def Update(): OutputMode
}

/**
 * Trigger policies for streaming queries (a Java class with static factory methods).
 *
 * If no trigger is set, a new micro-batch starts as soon as the previous one finishes.
 */
object Trigger {
  /**
   * Start micro-batches at a fixed interval, e.g. "10 seconds". If a batch
   * overruns the interval, the next one starts as soon as it completes.
   */
  def ProcessingTime(interval: String): Trigger
  def ProcessingTime(intervalMs: Long): Trigger
  def ProcessingTime(interval: scala.concurrent.duration.Duration): Trigger
  def ProcessingTime(interval: Long, unit: TimeUnit): Trigger

  /**
   * Process all currently available data in a single micro-batch, then stop.
   * Deprecated since Spark 3.4 in favour of `AvailableNow()`.
   */
  def Once(): Trigger

  /**
   * Process all currently available data, possibly in several micro-batches
   * (respecting source rate limits), then stop (Spark 3.3+).
   */
  def AvailableNow(): Trigger

  /**
   * Experimental continuous processing: low latency, but only at-least-once
   * guarantees. The interval is how often progress is checkpointed, not a batch interval.
   */
  def Continuous(interval: String): Trigger
  def Continuous(intervalMs: Long): Trigger
  def Continuous(interval: Long, unit: TimeUnit): Trigger
}

```

102

103

**Usage Examples:**

104

105

```scala

import org.apache.spark.sql.streaming._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
// Assumes `spark` is an active SparkSession (as in spark-shell); its implicits
// provide the $"col" syntax used below.
import spark.implicits._

// Read from file source: every new file in the directory is read line by line
val lines = spark.readStream
  .format("text")
  .option("path", "input-directory")
  .load()

// Word count example
val words = lines
  .select(explode(split($"value", " ")).as("word"))
  .groupBy("word")
  .count()

// Write to console (the console sink does not need a checkpoint location)
val query = words.writeStream
  .outputMode("complete")
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()

// Kafka source and sink
// Example schema of the JSON payload carried in the Kafka record values
val schema = new StructType()
  .add("category", StringType)
  .add("amount", DoubleType)

val kafkaDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1")
  .load()

val processedDF = kafkaDF
  .select(from_json($"value".cast("string"), schema).as("data"))
  .select("data.*")
  .groupBy("category")
  .count()

// The Kafka sink needs a string/binary `value` column (optionally `key`) and a
// checkpoint location (option or spark.sql.streaming.checkpointLocation).
val output = processedDF
  .select($"category".as("key"), to_json(struct($"category", $"count")).as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/tmp/checkpoints/kafka-output")
  .outputMode("update")
  .start()

// Block until any started query terminates (rethrowing its failure, if any).
// Calling query.awaitTermination() before this point would block forever and
// the Kafka query would never be started.
spark.streams.awaitAnyTermination()

```

150

151

### StreamingQuery Management

152

153

Interface for managing and monitoring streaming queries.

154

155

```scala { .api }

/**
 * Handle to a streaming query running continuously in the background,
 * returned by `DataStreamWriter.start()`. All methods are thread-safe.
 */
abstract class StreamingQuery {
  /** Identifier that persists across restarts from the same checkpoint. */
  def id: UUID

  /** Identifier of this particular run; it changes every time the query is (re)started. */
  def runId: UUID

  /** Name set via `queryName(...)`, or null if none was specified. */
  def name: String

  /** The SparkSession this query belongs to. */
  def sparkSession: SparkSession

  /** Whether the query is currently running. */
  def isActive: Boolean

  /** Current status of the query. */
  def status: StreamingQueryStatus

  /**
   * Most recent progress updates. The number retained is controlled by
   * spark.sql.streaming.numRecentProgressUpdates.
   */
  def recentProgress: Array[StreamingQueryProgress]

  /** The latest progress update, or null if none has been reported yet. */
  def lastProgress: StreamingQueryProgress

  /**
   * Block until the query terminates, either via `stop()` or with an error.
   * Throws StreamingQueryException if the query failed.
   */
  def awaitTermination(): Unit

  /** Like `awaitTermination()`, but returns false if still running after `timeoutMs` ms. */
  def awaitTermination(timeoutMs: Long): Boolean

  /** Block until all data available in the sources has been processed; meant for tests. */
  def processAllAvailable(): Unit

  /**
   * Stop the query. May throw TimeoutException if it does not stop within
   * spark.sql.streaming.stopTimeout (by default it waits indefinitely).
   */
  def stop(): Unit

  /** The exception that terminated the query, if any. */
  def exception: Option[StreamingQueryException]

  /** Print the physical plan; `extended = true` also prints the logical plans. */
  def explain(): Unit
  def explain(extended: Boolean): Unit
}

/**
 * Manager for all streaming queries of a SparkSession, available as `spark.streams`.
 */
class StreamingQueryManager {
  /** Currently active queries. */
  def active: Array[StreamingQuery]

  /** Look up an active query by its `id`; returns null if no active query has that id. */
  def get(id: UUID): StreamingQuery
  def get(id: String): StreamingQuery

  /**
   * Block until ANY query terminates (counting terminations since the manager was
   * created or `resetTerminated()` was last called). Throws StreamingQueryException
   * if that query failed.
   */
  def awaitAnyTermination(): Unit

  /** Like `awaitAnyTermination()`, but returns false if none terminated within `timeoutMs` ms. */
  def awaitAnyTermination(timeoutMs: Long): Boolean

  /** Forget past terminations so `awaitAnyTermination()` waits for new ones. */
  def resetTerminated(): Unit

  /** Register a listener for query started / progress / terminated events. */
  def addListener(listener: StreamingQueryListener): Unit

  /** Deregister a previously added listener. */
  def removeListener(listener: StreamingQueryListener): Unit

  /** All currently registered listeners. */
  def listListeners(): Array[StreamingQueryListener]

  // To stop every running query, iterate the active ones: active.foreach(_.stop())
}

/**
 * Progress information reported by a streaming query for each trigger.
 *
 * Spark constructs these objects; it is a regular class (not a case class).
 * Its map fields are Java maps: read them with e.g.
 * `progress.durationMs.get("triggerExecution")`, or convert them via
 * `scala.jdk.CollectionConverters._` and `.asScala`.
 */
class StreamingQueryProgress(
    val id: UUID,
    val runId: UUID,
    val name: String,
    val timestamp: String,
    val batchId: Long,
    val batchDuration: Long,
    val durationMs: java.util.Map[String, java.lang.Long],
    val eventTime: java.util.Map[String, String],
    val stateOperators: Array[StateOperatorProgress],
    val sources: Array[SourceProgress],
    val sink: SinkProgress,
    val observedMetrics: java.util.Map[String, Row]) {

  /** Total input rows across all sources in this trigger. */
  def numInputRows: Long

  /** Rate at which input data arrived, in rows per second. */
  def inputRowsPerSecond: Double

  /** Rate at which input data was processed, in rows per second. */
  def processedRowsPerSecond: Double

  /** Compact and indented JSON representations of this progress report. */
  def json: String
  def prettyJson: String
}

```

221

222

### Window Operations

223

224

Time-based windowing for streaming aggregations.

225

226

```scala { .api }

/**
 * Event-time windowing helpers from `org.apache.spark.sql.functions`.
 * They also work on batch DataFrames, but are mostly used in streaming aggregations.
 * Durations are interval strings such as "10 seconds" or "1 hour".
 */
object functions {
  /** Non-overlapping (tumbling) windows of length `windowDuration` over `timeColumn`. */
  def window(timeColumn: Column, windowDuration: String): Column

  /** Overlapping windows of length `windowDuration` that start every `slideDuration`. */
  def window(timeColumn: Column, windowDuration: String, slideDuration: String): Column

  /**
   * Sliding windows whose boundaries are shifted by `startTime`, measured from
   * 1970-01-01 00:00:00 UTC. For example, hourly windows starting at a quarter past
   * the hour use startTime = "15 minutes".
   */
  def window(
      timeColumn: Column,
      windowDuration: String,
      slideDuration: String,
      startTime: String): Column

  /** Gap-based session windows: a session closes after `gapDuration` without events. */
  def session_window(timeColumn: Column, gapDuration: String): Column
}

```

259

260

**Usage Examples:**

261

262

```scala

import org.apache.spark.sql.functions._
// Assumes an active SparkSession `spark` and a streaming DataFrame `events`
// with columns timestamp (TimestampType), category, userId and amount.
import spark.implicits._ // provides the $"col" syntax

// Tumbling window (10 minute windows)
val windowed = events
  .withWatermark("timestamp", "10 minutes")
  .groupBy(
    window($"timestamp", "10 minutes"),
    $"category"
  )
  .count()

// Sliding window (10 minute windows, sliding every 5 minutes)
val sliding = events
  .withWatermark("timestamp", "10 minutes")
  .groupBy(
    window($"timestamp", "10 minutes", "5 minutes"),
    $"userId"
  )
  .agg(sum("amount").as("total"))

// Session window (gap-based): a user's session ends after 20 minutes of inactivity
val sessions = events
  .withWatermark("timestamp", "30 minutes")
  .groupBy(
    $"userId",
    session_window($"timestamp", "20 minutes")
  )
  .count()

```

292

293

### Watermarking and Late Data

294

295

Handling late-arriving data with watermarks.

296

297

```scala { .api }

/**
 * Watermarking for handling late data.
 *
 * `withWatermark` is an ordinary method of `Dataset[T]`; no extra import or
 * implicit conversion is needed. It is shown here in isolation.
 */
class Dataset[T] {
  /**
   * Define an event-time watermark for this Dataset.
   *
   * The watermark is the maximum event time seen so far minus `delayThreshold`.
   * Data arriving within the threshold is guaranteed to be processed; older data
   * may be dropped. Stateful operators (event-time windows, deduplication,
   * stream-stream joins) use the watermark to evict old state. In append mode,
   * window results are emitted once the watermark passes the end of the window.
   *
   * @param eventTime      name of the timestamp column holding the event time
   * @param delayThreshold how late data may arrive, as an interval string such as "10 minutes"
   * @return a Dataset carrying the watermark definition
   */
  def withWatermark(eventTime: String, delayThreshold: String): Dataset[T]
}

```

306

307

**Usage Examples:**

308

309

```scala

import org.apache.spark.sql.functions._
// Assumes an active SparkSession `spark` (for $"col") and a streaming DataFrame
// `inputStream` with an eventTime timestamp column plus deviceId and sessionId.
import spark.implicits._

// Handle late data with watermarking: events up to 10 minutes behind the latest
// event time seen are still counted; anything later may be dropped.
val result = inputStream
  .withWatermark("eventTime", "10 minutes")
  .groupBy(
    window($"eventTime", "5 minutes"),
    $"deviceId"
  )
  .count()

// The watermark lets Spark evict aggregation state only when the grouping keys
// include the event-time column or a window over it. Grouping by $"sessionId"
// alone would keep state for every session forever, so group by a session
// window as well.
val stateStream = inputStream
  .withWatermark("eventTime", "1 hour")
  .groupBy($"sessionId", session_window($"eventTime", "30 minutes"))
  .agg(
    min("eventTime").as("sessionStart"),
    max("eventTime").as("sessionEnd"),
    count("*").as("eventCount")
  )

```

329

330

### Spark Streaming (Legacy DStreams)

331

332

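Original streaming API using discretized streams (DStreams), where each batch interval's data is an RDD. It is a legacy API that no longer receives new features; prefer Structured Streaming for new applications.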

333

334

```scala { .api }

/**
 * Main entry point for Spark Streaming (the legacy DStream API).
 *
 * Define all input streams, transformations and output operations before
 * `start()`; no new computations can be added to a started context.
 * It can also be built from an existing SparkContext:
 * `new StreamingContext(sparkContext, batchDuration)`.
 */
class StreamingContext(conf: SparkConf, batchDuration: Duration) {
  /** Receiver-based DStream of text lines read from a TCP socket. */
  def socketTextStream(
      hostname: String,
      port: Int,
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String]

  /** Watch a directory for new text files; every DStream element is one line. */
  def textFileStream(directory: String): DStream[String]

  /** Watch a directory for new files, read with the Hadoop input format `F`. */
  def fileStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag](
      directory: String
  ): InputDStream[(K, V)]

  /**
   * DStream fed from a queue of RDDs (mainly for testing). Each batch takes one RDD,
   * or all queued RDDs when `oneAtATime` is false. Queue streams cannot be checkpointed.
   */
  def queueStream[T: ClassTag](
      queue: Queue[RDD[T]],
      oneAtATime: Boolean = true
  ): InputDStream[T]

  /** Merge several DStreams of the same type and slide duration. */
  def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T]

  /** Start executing the defined streams. */
  def start(): Unit

  /**
   * Stop the streaming computation, optionally also stopping the SparkContext.
   * With `stopGracefully = true`, waits for all received data to be processed.
   */
  def stop(stopSparkContext: Boolean = true, stopGracefully: Boolean = false): Unit

  /** Block until stopped; errors raised during execution are rethrown here. */
  def awaitTermination(): Unit

  /**
   * Block for at most `timeout` milliseconds. Returns true if the context stopped,
   * false if the timeout elapsed first; execution errors are rethrown.
   */
  def awaitTerminationOrTimeout(timeout: Long): Boolean

  /** Keep the RDDs generated by each DStream for at least `duration`. */
  def remember(duration: Duration): Unit

  /**
   * Enable periodic checkpointing to `directory`, which should be on a
   * fault-tolerant filesystem such as HDFS.
   */
  def checkpoint(directory: String): Unit
}

/**
 * Discretized stream: a continuous sequence of RDDs, one per batch interval.
 * Transformations return new DStreams; nothing is computed unless an output
 * operation is registered and the StreamingContext is started.
 */
abstract class DStream[T: ClassTag] {

  // ---- Element-wise transformations ----

  /** Apply `mapFunc` to every element. */
  def map[U: ClassTag](mapFunc: T => U): DStream[U]

  /** Apply `flatMapFunc` to every element and flatten the results. */
  def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U]

  /** Keep only the elements for which `filterFunc` returns true. */
  def filter(filterFunc: T => Boolean): DStream[T]

  // ---- Per-batch operations ----

  /** Collapse each batch's RDD into one element by combining its elements with `reduceFunc`. */
  def reduce(reduceFunc: (T, T) => T): DStream[T]

  /** Collapse each batch's RDD into one element: its element count. */
  def count(): DStream[Long]

  /** Merge with another DStream that has the same slide duration. */
  def union(that: DStream[T]): DStream[T]

  /** Apply an arbitrary RDD-to-RDD function to every batch; the second form also gets the batch time. */
  def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U]
  def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U]

  // ---- Windowed operations (durations must be multiples of the batch interval) ----

  /** Group the last `windowDuration` of batches, sliding by this stream's interval or by `slideDuration`. */
  def window(windowDuration: Duration): DStream[T]
  def window(windowDuration: Duration, slideDuration: Duration): DStream[T]

  /** Combine all elements inside each sliding window with `reduceFunc`. */
  def reduceByWindow(
      reduceFunc: (T, T) => T,
      windowDuration: Duration,
      slideDuration: Duration
  ): DStream[T]

  /** Number of elements inside each sliding window. */
  def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long]

  // ---- Output operations ----

  /** Run `foreachFunc` on the driver for every batch RDD (optionally with the batch time). */
  def foreachRDD(foreachFunc: RDD[T] => Unit): Unit
  def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit

  /** Print the first `num` elements of every batch on the driver. */
  def print(num: Int = 10): Unit

  /** Save every batch as text files named "prefix-TIME_IN_MS[.suffix]". */
  def saveAsTextFiles(prefix: String, suffix: String = ""): Unit

  // ---- Persistence ----

  /** Persist generated RDDs at the default storage level (same as `persist()`). */
  def cache(): DStream[T]

  /** Persist generated RDDs at `level`. */
  def persist(level: StorageLevel = StorageLevel.MEMORY_ONLY_SER): DStream[T]
}

```

423

424

### DStream Operations for Key-Value Pairs

425

426

Additional operations for DStreams of key-value pairs.

427

428

```scala { .api }

/**
 * Extra operations available on any DStream of key/value pairs, `DStream[(K, V)]`.
 */
implicit class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K, V)]) {

  // ---- Per-batch keyed aggregations ----

  /** Gather each key's values within every batch (hash-partitioned, optional partition count). */
  def groupByKey(): DStream[(K, Iterable[V])]
  def groupByKey(numPartitions: Int): DStream[(K, Iterable[V])]

  /** Merge each key's values within every batch using an associative `func`. */
  def reduceByKey(func: (V, V) => V): DStream[(K, V)]
  def reduceByKey(func: (V, V) => V, numPartitions: Int): DStream[(K, V)]

  /** Like `reduceByKey`, but over a sliding window of batches. */
  def reduceByKeyAndWindow(
      reduceFunc: (V, V) => V,
      windowDuration: Duration,
      slideDuration: Duration
  ): DStream[(K, V)]

  // ---- Per-batch joins: RDDs from the same batch time are combined ----

  /** Inner join with another pair DStream (optionally with a partition count). */
  def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))]
  def join[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (V, W))]

  /** Left outer join; keys missing from `other` get None. */
  def leftOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))]

  /** For each key, gather the values from both streams. */
  def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))]

  // ---- Stateful operations across batches ----

  /**
   * Keep per-key state across batches. `updateFunc` receives the key's new values
   * in this batch and its previous state; returning None removes the key.
   * Requires a checkpoint directory (`ssc.checkpoint(...)`).
   */
  def updateStateByKey[S: ClassTag](
      updateFunc: (Seq[V], Option[S]) => Option[S]
  ): DStream[(K, S)]

  /**
   * Per-key state updates driven by a StateSpec: the mapping function plus
   * optional timeout, initial state and partitioning.
   */
  def mapWithState[StateType: ClassTag, MappedType: ClassTag](
      spec: StateSpec[K, V, StateType, MappedType]
  ): MapWithStateDStream[K, V, StateType, MappedType]
}

```

469

470

**Usage Examples:**

471

472

```scala

import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._

// Create StreamingContext. Use local[n] with n > number of receivers: the socket
// receiver permanently occupies one core, so at least one more is needed to process data.
val conf = new SparkConf().setAppName("StreamingApp").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1))
ssc.checkpoint("checkpoint") // required by updateStateByKey below

// Socket stream
val lines = ssc.socketTextStream("localhost", 9999)

// Word count
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()

// Windowed operations: counts over the last 30 seconds, recomputed every 10 seconds
val windowedWordCounts = pairs
  .reduceByKeyAndWindow(
    (a: Int, b: Int) => a + b,
    Seconds(30),
    Seconds(10)
  )
windowedWordCounts.print()

// Stateful operations: running total per word across all batches
val runningCounts = pairs.updateStateByKey[Int] { (values: Seq[Int], state: Option[Int]) =>
  val currentCount = values.sum
  val previousCount = state.getOrElse(0)
  Some(currentCount + previousCount)
}
runningCounts.print()

// File stream: textFileStream already yields one element per line
val textFiles = ssc.textFileStream("input-directory")
val processedFiles = textFiles
  .filter(_.nonEmpty)
  .map(line => (line.split(",")(0), 1))
  .reduceByKey(_ + _)
processedFiles.print()

// A DStream is only computed if it feeds an output operation
// (print, foreachRDD, saveAsTextFiles, ...), hence the print() calls above.
ssc.start()
ssc.awaitTermination()

```

516

517

### Time and Duration

518

519

Time handling utilities for streaming applications.

520

521

```scala { .api }

/**
 * Length of a streaming interval (batch, window or slide), held in milliseconds.
 */
case class Duration(private val millis: Long) {
  /** Duration arithmetic. */
  def +(other: Duration): Duration
  def -(other: Duration): Duration
  def *(times: Int): Duration

  /** Ratio of this duration to `that`. */
  def /(that: Duration): Double

  /** Ordering comparisons between durations. */
  def <(other: Duration): Boolean
  def <=(other: Duration): Boolean
  def >(other: Duration): Boolean
  def >=(other: Duration): Boolean

  /** The length in milliseconds. */
  def milliseconds: Long
}

/** Companion factory: `Duration(length)` takes a length in milliseconds. */
object Duration {
  def apply(length: Long): Duration
}

/** Builds a [[Duration]] from a number of milliseconds, e.g. `Milliseconds(500)`. */
object Milliseconds {
  def apply(milliseconds: Long): Duration
}

/** Builds a [[Duration]] from a number of seconds, e.g. `Seconds(10)`. */
object Seconds {
  def apply(seconds: Long): Duration
}

/** Builds a [[Duration]] from a number of minutes, e.g. `Minutes(5)`. */
object Minutes {
  def apply(minutes: Long): Duration
}

/**
 * An instant used by DStreams (e.g. a batch time), in milliseconds since the
 * Unix epoch. Shifting a Time by a Duration gives a Time; the difference of two
 * Times is a Duration.
 */
case class Time(private val millis: Long) {
  /** Shift this instant forwards or backwards by a duration. */
  def +(other: Duration): Time
  def -(other: Duration): Time

  /** Elapsed duration between `other` and this instant. */
  def -(other: Time): Duration

  /** Chronological comparisons. */
  def <(other: Time): Boolean
  def <=(other: Time): Boolean
  def >(other: Time): Boolean
  def >=(other: Time): Boolean

  /** The instant in milliseconds. */
  def milliseconds: Long
}

```

568

569

### Checkpointing and Fault Tolerance

570

571

Mechanisms for fault tolerance in streaming applications.

572

573

```scala { .api }

/**
 * Companion helpers for creating and recovering StreamingContexts.
 */
object StreamingContext {
  /**
   * Recreate a StreamingContext from the checkpoint in `checkpointPath` if one exists;
   * otherwise build a new one with `creatingFunc`. `creatingFunc` is NOT called when
   * recovering, so all DStream setup (and the `checkpoint(...)` call) belongs inside it.
   */
  def getOrCreate(
      checkpointPath: String,
      creatingFunc: () => StreamingContext
  ): StreamingContext

  /**
   * Same as above, reading the checkpoint with the given Hadoop configuration.
   * If `createOnError` is true, a new context is created when the checkpoint cannot
   * be read; otherwise the read error is thrown.
   */
  def getOrCreate(
      checkpointPath: String,
      creatingFunc: () => StreamingContext,
      hadoopConf: Configuration,
      createOnError: Boolean = false
  ): StreamingContext

  /** The currently active StreamingContext, if there is one. */
  def getActive(): Option[StreamingContext]

  /** The active StreamingContext, or a new one built with `creatingFunc`. */
  def getActiveOrCreate(creatingFunc: () => StreamingContext): StreamingContext
}

```

592

593

**Usage Examples:**

594

595

```scala

import org.apache.spark.SparkConf
import org.apache.spark.streaming._

// Checkpointing example
// Use a fault-tolerant filesystem and a well-formed URI. "hdfs:///path" (empty
// authority) resolves against the default namenode, whereas "hdfs://checkpoint"
// would treat "checkpoint" as the namenode host name.
val checkpointDir = "hdfs:///user/spark/checkpoints/CheckpointApp"

def createStreamingContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("CheckpointApp")
  val ssc = new StreamingContext(conf, Seconds(5))
  ssc.checkpoint(checkpointDir)

  // Define streaming computation. This only runs on a fresh start; on recovery
  // the DStream graph is restored from the checkpoint instead.
  val lines = ssc.socketTextStream("localhost", 9999)
  val words = lines.flatMap(_.split(" "))
  val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
  wordCounts.print()

  ssc
}

// Recovery from checkpoint: must use the same directory passed to ssc.checkpoint
val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext _)
ssc.start()
ssc.awaitTermination()

```

616

617

## Error Handling

618

619

Common streaming exceptions:

620

621

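- `StreamingQueryException` - A streaming query failed. It is rethrown by `awaitTermination()` / `awaitAnyTermination()` and exposed via `StreamingQuery.exception`.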

622

- `AnalysisException` - Invalid streaming operations or configurations

623

- `IllegalStateException` - Invalid streaming application state

624

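- `TimeoutException` - `StreamingQuery.stop()` did not finish within `spark.sql.streaming.stopTimeout`. The timed `awaitTermination(timeoutMs)` variants return `false` instead of throwing.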