or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-engine.md · graph-processing.md · index.md · machine-learning.md · sql-dataframes.md · stream-processing.md

stream-processing.mddocs/

0

# Stream Processing

1

2

Apache Spark provides stream processing capabilities through two APIs: Structured Streaming (the modern API) built on DataFrames/Datasets, and the legacy DStreams API. Structured Streaming is the recommended approach for new applications.

3

4

## Package Information

5

6

Stream processing functionality is available through:

7

8

```scala

9

// Structured Streaming (recommended)

10

import org.apache.spark.sql.streaming._

11

import org.apache.spark.sql.functions._

12

13

// Legacy DStreams API (maintenance mode)

14

import org.apache.spark.streaming._

15

import org.apache.spark.streaming.dstream._

16

```

17

18

## Basic Usage

19

20

### Structured Streaming

21

22

```scala

23

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.functions._

// Local session for the example
val spark = SparkSession.builder()
  .appName("Structured Streaming Example")
  .master("local[*]")
  .getOrCreate()

// Brings the implicit Encoders into scope; without this import
// `lines.as[String]` and the `flatMap` below do not compile.
import spark.implicits._

// Read streaming data from a socket on localhost:9999
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Process streaming data: split each line on spaces, then count per word
val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()

// Write streaming results to the console, triggering every 10 seconds
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()

// Block until the query terminates
query.awaitTermination()

51

```

52

53

### DStreams (Legacy)

54

55

```scala

56

import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._

// `sc` is an already-created SparkContext; batches are 10 seconds long
val ssc = new StreamingContext(sc, Seconds(10))

// Create DStream from a text socket
val lines = ssc.socketTextStream("localhost", 9999)

// Process DStream: tokenize, pair each word with 1, sum per word
val words = lines.flatMap(line => line.split(" "))
val wordCounts = words
  .map(w => (w, 1))
  .reduceByKey((left, right) => left + right)

// Output results
wordCounts.print()

// Start streaming computation and wait for it to finish
ssc.start()
ssc.awaitTermination()

74

```

75

76

## Capabilities

77

78

### Structured Streaming

79

80

The modern streaming API built on the DataFrame/Dataset API. It can provide end-to-end exactly-once guarantees when used with replayable sources and idempotent sinks.

81

82

#### DataStreamReader

83

84

Interface for loading streaming DataFrames.

85

86

```scala { .api }

87

/** Interface for loading streaming DataFrames. */
class DataStreamReader {

  // ---- Configuration ----
  def format(source: String): DataStreamReader
  def schema(schema: StructType): DataStreamReader
  def schema(schemaString: String): DataStreamReader
  def option(key: String, value: String): DataStreamReader
  def option(key: String, value: Boolean): DataStreamReader
  def option(key: String, value: Long): DataStreamReader
  def option(key: String, value: Double): DataStreamReader
  def options(options: scala.collection.Map[String, String]): DataStreamReader
  def options(options: java.util.Map[String, String]): DataStreamReader

  // ---- Data sources ----
  def csv(path: String): DataFrame
  def json(path: String): DataFrame
  def parquet(path: String): DataFrame
  def orc(path: String): DataFrame
  def text(path: String): DataFrame
  def textFile(path: String): Dataset[String]
  def table(tableName: String): DataFrame

  // ---- Generic load ----
  def load(): DataFrame
  def load(path: String): DataFrame
}

112

```

113

114

#### DataStreamWriter

115

116

Interface for writing streaming DataFrames.

117

118

```scala { .api }

119

/** Interface for writing streaming DataFrames. */
class DataStreamWriter[T] {
  // Configuration
  def outputMode(outputMode: OutputMode): DataStreamWriter[T]
  def outputMode(outputMode: String): DataStreamWriter[T]
  def trigger(trigger: Trigger): DataStreamWriter[T]
  def format(source: String): DataStreamWriter[T]
  def option(key: String, value: String): DataStreamWriter[T]
  def option(key: String, value: Boolean): DataStreamWriter[T]
  def option(key: String, value: Long): DataStreamWriter[T]
  def option(key: String, value: Double): DataStreamWriter[T]
  def options(options: scala.collection.Map[String, String]): DataStreamWriter[T]
  def options(options: java.util.Map[String, String]): DataStreamWriter[T]
  def partitionBy(colNames: String*): DataStreamWriter[T]
  def queryName(queryName: String): DataStreamWriter[T]

  // Custom sinks
  def foreach(writer: ForeachWriter[T]): DataStreamWriter[T]
  def foreachBatch(function: (Dataset[T], Long) => Unit): DataStreamWriter[T]

  // Built-in sinks have no dedicated methods on DataStreamWriter (unlike
  // DataStreamReader). Select one with format("console" | "csv" | "json" |
  // "parquet" | "orc" | "text" | ...) and, for file sinks, pass the output
  // location to start(path).

  // Starting the query: each of these returns the running StreamingQuery
  def start(): StreamingQuery
  def start(path: String): StreamingQuery
  def toTable(tableName: String): StreamingQuery
}

156

```

157

158

#### StreamingQuery

159

160

Handle to a running streaming query.

161

162

```scala { .api }

163

/**
 * Handle to a running streaming query.
 *
 * A query is started through DataStreamWriter.start(); this handle has no
 * start() method of its own.
 */
trait StreamingQuery {
  // Query control
  def stop(): Unit
  def awaitTermination(): Unit
  def awaitTermination(timeoutMs: Long): Boolean
  def processAllAvailable(): Unit

  // Query properties
  def id: UUID
  def runId: UUID
  def name: String
  def explain(): Unit
  def explain(extended: Boolean): Unit

  // Query status
  def isActive: Boolean
  def exception: Option[StreamingQueryException]
  def status: StreamingQueryStatus
  def recentProgress: Array[StreamingQueryProgress]
  def lastProgress: StreamingQueryProgress
}

185

```

186

187

#### StreamingQueryManager

188

189

Manager for streaming queries.

190

191

```scala { .api }

192

/** Manager for streaming queries. */
class StreamingQueryManager {

  // ---- Query management ----
  def active: Array[StreamingQuery]
  def get(id: UUID): StreamingQuery
  def get(id: String): StreamingQuery
  def resetTerminated(): Unit

  // ---- Waiting ----
  def awaitAnyTermination(): Unit
  def awaitAnyTermination(timeoutMs: Long): Boolean

  // ---- Listeners ----
  def addListener(listener: StreamingQueryListener): Unit
  def removeListener(listener: StreamingQueryListener): Unit
}

207

```

208

209

#### Output Modes

210

211

```scala { .api }

212

// Factory methods for the output modes. These are nullary methods, not vals:
// call them as OutputMode.Append(), OutputMode.Complete(), OutputMode.Update().
object OutputMode {
  def Append(): OutputMode
  def Complete(): OutputMode
  def Update(): OutputMode
}

217

```

218

219

#### Triggers

220

221

```scala { .api }

222

/** Trigger policy passed to DataStreamWriter.trigger. */
sealed trait Trigger

/** Factory methods for the available triggers. */
object Trigger {
  // Processing-time triggers
  def ProcessingTime(interval: String): Trigger
  def ProcessingTime(interval: Duration): Trigger
  def ProcessingTime(interval: Long, unit: TimeUnit): Trigger

  // One-shot triggers
  def Once(): Trigger
  def AvailableNow(): Trigger

  // Continuous triggers
  def Continuous(interval: String): Trigger
  def Continuous(interval: Duration): Trigger
}

233

```

234

235

#### ForeachWriter

236

237

Custom sink for streaming data.

238

239

```scala { .api }

240

/** Custom sink for streaming data. */
abstract class ForeachWriter[T] extends Serializable {

  def open(partitionId: Long, epochId: Long): Boolean

  def process(value: T): Unit

  def close(errorOrNull: Throwable): Unit
}

245

```

246

247

Usage example:

248

249

```scala

250

val customWriter = new ForeachWriter[Row] {
  // Initialize connection; returns true unconditionally
  override def open(partitionId: Long, epochId: Long): Boolean = true

  // Process each row
  override def process(value: Row): Unit =
    println(s"Processing: ${value.toString}")

  // Clean up resources
  override def close(errorOrNull: Throwable): Unit = ()
}

val query = df.writeStream.foreach(customWriter).start()

269

```

270

271

#### Windowing Operations

272

273

```scala { .api }

274

// Available through functions object

// window overloads
def window(timeColumn: Column, windowDuration: String): Column
def window(timeColumn: Column, windowDuration: String, slideDuration: String): Column
def window(
    timeColumn: Column,
    windowDuration: String,
    slideDuration: String,
    startTime: String): Column

// session_window overloads: the gap is given as a String or as a Column
def session_window(timeColumn: Column, gapDuration: String): Column
def session_window(timeColumn: Column, gapDuration: Column): Column

281

```

282

283

Usage example:

284

285

```scala

286

import org.apache.spark.sql.functions._

// All three aggregations share the same 10-minute watermark on "timestamp"
val watermarked = df.withWatermark("timestamp", "10 minutes")
val eventTime = col("timestamp")

// Tumbling window
val windowedCounts =
  watermarked.groupBy(window(eventTime, "10 minutes")).count()

// Sliding window
val slidingWindowCounts =
  watermarked.groupBy(window(eventTime, "10 minutes", "5 minutes")).count()

// Session window
val sessionCounts =
  watermarked.groupBy(session_window(eventTime, "5 minutes")).count()

305

```

306

307

#### Watermarking

308

309

```scala { .api }

310

// Available on Dataset
def withWatermark(
    eventTime: String,
    delayThreshold: String): Dataset[T]

312

```

313

314

Usage example:

315

316

```scala

317

// Uses col(...) rather than $"..." so the snippet needs only
// org.apache.spark.sql.functions._ (already required for `window`),
// not spark.implicits._.
val watermarkedDF = df
  .withWatermark("eventTime", "2 minutes")
  .groupBy(window(col("eventTime"), "10 minutes"))
  .count()

321

```

322

323

### Legacy DStreams API

324

325

The original streaming API based on RDDs (now in maintenance mode).

326

327

#### StreamingContext

328

329

The main entry point for DStreams functionality.

330

331

```scala { .api }

332

/** The main entry point for DStreams functionality. */
class StreamingContext(sparkContext: SparkContext, batchDuration: Duration) {

  // ---- Auxiliary constructors ----
  def this(conf: SparkConf, batchDuration: Duration)
  def this(path: String, hadoopConf: Configuration = new Configuration())

  // ---- Input sources ----
  def socketTextStream(
      hostname: String,
      port: Int,
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[String]
  def textFileStream(directory: String): DStream[String]
  def fileStream[K, V, F <: NewInputFormat[K, V]](directory: String): InputDStream[(K, V)]
  def queueStream[T: ClassTag](queue: Queue[RDD[T]], oneAtATime: Boolean = true): InputDStream[T]
  def receiverStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T]

  // ---- Union ----
  def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T]

  // ---- Control ----
  def start(): Unit
  def stop(stopSparkContext: Boolean = true, stopGracefully: Boolean = false): Unit
  def awaitTermination(): Unit
  def awaitTerminationOrTimeout(timeout: Long): Boolean

  // ---- State ----
  def remember(duration: Duration): Unit
  def checkpoint(directory: String): Unit

  // ---- Properties ----
  def sparkContext: SparkContext
  def graph: DStreamGraph
  def getState(): StreamingContextState
}

361

362

/** Companion-object factory methods for StreamingContext. */
object StreamingContext {

  def getOrCreate(
      checkpointPath: String,
      creatingFunc: () => StreamingContext): StreamingContext

  def getActive(): Option[StreamingContext]

  def getActiveOrCreate(
      checkpointPath: String,
      creatingFunc: () => StreamingContext): StreamingContext
}

367

```

368

369

#### DStream

370

371

The fundamental abstraction in Spark Streaming representing a continuous stream of data.

372

373

```scala { .api }

374

/** The fundamental abstraction in Spark Streaming representing a continuous stream of data. */
abstract class DStream[T: ClassTag] extends Serializable {
  // Transformations
  def map[U: ClassTag](mapFunc: T => U): DStream[U]
  def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U]
  def filter(filterFunc: T => Boolean): DStream[T]
  def glom(): DStream[Array[T]]
  def repartition(numPartitions: Int): DStream[T]
  def union(that: DStream[T]): DStream[T]
  def count(): DStream[Long]
  def countByValue(): DStream[(T, Long)]
  def reduce(reduceFunc: (T, T) => T): DStream[T]
  def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long]
  def reduceByWindow(reduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration): DStream[T]
  def reduceByWindow(
      reduceFunc: (T, T) => T,
      invReduceFunc: (T, T) => T,
      windowDuration: Duration,
      slideDuration: Duration): DStream[T]

  // Window operations
  def window(windowDuration: Duration): DStream[T]
  def window(windowDuration: Duration, slideDuration: Duration): DStream[T]

  // Actions
  def print(): Unit
  def print(num: Int): Unit
  def saveAsTextFiles(prefix: String, suffix: String = ""): Unit
  def saveAsObjectFiles(prefix: String, suffix: String = ""): Unit
  def foreachRDD(foreachFunc: RDD[T] => Unit): Unit
  def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit

  // State operations
  // updateStateByKey is not declared here: it needs key/value type parameters
  // (K, V) that DStream[T] does not have. It is available on DStream[(K, V)]
  // through PairDStreamFunctions.

  // Persistence
  def cache(): DStream[T]
  def persist(): DStream[T]
  def persist(level: StorageLevel): DStream[T]

  // Output
  def register(): DStream[T]

  // Properties
  def context: StreamingContext
  def ssc: StreamingContext
  def slideDuration: Duration
}

419

```

420

421

#### PairDStreamFunctions

422

423

Additional operations for DStreams of key-value pairs.

424

425

```scala { .api }

426

/** Additional operations for DStreams of key-value pairs. */
class PairDStreamFunctions[K, V](self: DStream[(K, V)])(implicit kt: ClassTag[K], vt: ClassTag[V]) {

  // ---- Transformations ----
  def groupByKey(): DStream[(K, Iterable[V])]
  def groupByKey(numPartitions: Int): DStream[(K, Iterable[V])]
  def groupByKey(partitioner: Partitioner): DStream[(K, Iterable[V])]
  def reduceByKey(func: (V, V) => V): DStream[(K, V)]
  def reduceByKey(func: (V, V) => V, numPartitions: Int): DStream[(K, V)]
  def reduceByKey(func: (V, V) => V, partitioner: Partitioner): DStream[(K, V)]
  def combineByKey[C: ClassTag](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiner: (C, C) => C): DStream[(K, C)]
  def combineByKey[C: ClassTag](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiner: (C, C) => C,
      numPartitions: Int): DStream[(K, C)]
  def combineByKey[C: ClassTag](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiner: (C, C) => C,
      partitioner: Partitioner): DStream[(K, C)]

  // ---- Window operations ----
  def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Iterable[V])]
  def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Iterable[V])]
  def groupByKeyAndWindow(
      windowDuration: Duration,
      slideDuration: Duration,
      numPartitions: Int): DStream[(K, Iterable[V])]
  def groupByKeyAndWindow(
      windowDuration: Duration,
      slideDuration: Duration,
      partitioner: Partitioner): DStream[(K, Iterable[V])]
  def reduceByKeyAndWindow(
      reduceFunc: (V, V) => V,
      windowDuration: Duration,
      slideDuration: Duration): DStream[(K, V)]
  def reduceByKeyAndWindow(
      reduceFunc: (V, V) => V,
      invReduceFunc: (V, V) => V,
      windowDuration: Duration,
      slideDuration: Duration): DStream[(K, V)]
  def reduceByKeyAndWindow(
      reduceFunc: (V, V) => V,
      invReduceFunc: (V, V) => V,
      windowDuration: Duration,
      slideDuration: Duration,
      numPartitions: Int,
      filterFunc: ((K, V)) => Boolean = null): DStream[(K, V)]

  // ---- Join operations ----
  def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))]
  def join[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (V, W))]
  def join[W: ClassTag](other: DStream[(K, W)], partitioner: Partitioner): DStream[(K, (V, W))]
  def leftOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))]
  def rightOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], W))]
  def fullOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], Option[W]))]
  def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))]

  // ---- State operations ----
  def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)]
  def updateStateByKey[S: ClassTag](
      updateFunc: (Seq[V], Option[S]) => Option[S],
      numPartitions: Int): DStream[(K, S)]
  def updateStateByKey[S: ClassTag](
      updateFunc: (Seq[V], Option[S]) => Option[S],
      partitioner: Partitioner): DStream[(K, S)]
  def mapWithState[StateType: ClassTag, MappedType: ClassTag](
      spec: StateSpec[K, V, StateType, MappedType]): MapWithStateDStream[K, V, StateType, MappedType]

  // ---- Output ----
  def saveAsHadoopFiles[F <: OutputFormat[K, V]](prefix: String, suffix: String = ""): Unit
  def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]](prefix: String, suffix: String = ""): Unit
}

466

```

467

468

#### Input Sources

469

470

Various built-in input sources for DStreams.

471

472

```scala { .api }

473

// These are instance methods of StreamingContext (call them on an `ssc`
// value), not members of the companion object, so they are listed in a
// single class body instead of three separate `object` declarations.
class StreamingContext {
  // File-based sources
  def textFileStream(directory: String): DStream[String]
  def binaryRecordsStream(directory: String, recordLength: Int): DStream[Array[Byte]]

  // Network sources
  def socketTextStream(
      hostname: String,
      port: Int,
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[String]
  def socketStream[T: ClassTag](
      hostname: String,
      port: Int,
      converter: (InputStream) => Iterator[T],
      storageLevel: StorageLevel): ReceiverInputDStream[T]

  // Queue source (for testing)
  // Two overloads; only the two-argument form has a default for oneAtATime.
  def queueStream[T: ClassTag](queue: Queue[RDD[T]], oneAtATime: Boolean = true): InputDStream[T]
  def queueStream[T: ClassTag](
      queue: Queue[RDD[T]],
      oneAtATime: Boolean,
      defaultRDD: RDD[T]): InputDStream[T]
}

489

```

490

491

#### Custom Receivers

492

493

```scala { .api }

494

/** Base class for custom receivers; subclasses implement onStart() and onStop(). */
abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable {
  // Lifecycle hooks implemented by the subclass
  def onStart(): Unit
  def onStop(): Unit

  // Handing received data to Spark
  def store(dataItem: T): Unit
  def store(dataBuffer: ArrayBuffer[T]): Unit
  def store(dataIterator: Iterator[T]): Unit
  def store(dataBuffer: ArrayBuffer[T], metadata: Any): Unit
  def store(dataIterator: Iterator[T], metadata: Any): Unit

  // Error reporting and restart/stop control
  def reportError(message: String, throwable: Throwable): Unit
  def restart(message: String): Unit
  def restart(message: String, error: Throwable): Unit
  def restart(message: String, error: Throwable, millisecond: Int): Unit
  def stop(message: String): Unit
  def stop(message: String, error: Throwable): Unit

  // State queries (Receiver has no hasStopped(); use isStopped())
  def isStarted(): Boolean
  def isStopped(): Boolean
}

512

```

513

514

Usage examples:

515

516

```scala

517

// Structured Streaming: File source
import org.apache.spark.sql.types._

// Streaming file sources require an explicit schema unless
// spark.sql.streaming.schemaInference is enabled.
// The fields below are placeholders - replace them with your own.
val jsonSchema = new StructType()
  .add("id", StringType)
  .add("timestamp", TimestampType)

val fileStream = spark.readStream
  .format("json")
  .schema(jsonSchema)
  .option("path", "/path/to/json/files")
  .load()

val query = fileStream.writeStream
  .format("console")
  .outputMode("append")
  .start()

527

528

// Structured Streaming: Kafka source
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

val kafkaStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1")
  .load()

// Schema of the JSON payload carried in the Kafka message value.
// The fields below are placeholders - replace them with your own.
val schema = new StructType()
  .add("id", StringType)
  .add("timestamp", TimestampType)

// Cast the message value to a string, parse it as JSON, then flatten the struct
val parsedStream = kafkaStream
  .select(from_json(col("value").cast("string"), schema).as("data"))
  .select("data.*")

538

539

// DStreams: Custom receiver
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class CustomReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  // Start receiving data on a separate thread
  override def onStart(): Unit = {
    new Thread("Custom Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  // Stop receiving data; receive() polls isStopped() and leaves its loop
  override def onStop(): Unit = {
  }

  // Receive data and store it until the receiver is stopped
  private def receive(): Unit = {
    while (!isStopped()) {
      // Placeholder: replace ??? with a read of the next record from your source
      val data: String = ???
      store(data)
    }
  }
}

val customStream = ssc.receiverStream(new CustomReceiver())

562

```