or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

application-launcher.mdcore-engine.mdgraph-processing.mdindex.mdmachine-learning.mdsql-dataframes.mdstream-processing.md

stream-processing.mddocs/

0

# Stream Processing

1

2

Spark Streaming provides scalable, high-throughput, fault-tolerant stream processing of live data streams. It supports both the legacy DStream API and the newer Structured Streaming approach.

3

4

## StreamingContext (Legacy DStream API)

5

6

The main entry point for Spark Streaming functionality using the DStream API.

7

8

```scala { .api }

9

class StreamingContext private[streaming] (

10

_sc: SparkContext,

11

_cp: Checkpoint,

12

_batchDur: Duration) extends Logging {

13

14

// Public auxiliary constructors (the primary constructor above is private[streaming])

15

// new StreamingContext(sparkContext: SparkContext, batchDuration: Duration)

16

// new StreamingContext(conf: SparkConf, batchDuration: Duration)

17

// new StreamingContext(path: String, hadoopConf: Configuration)

18

19

// Input stream creation

20

def socketTextStream(hostname: String, port: Int,

21

storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[String]

22

def socketStream[T: ClassTag](hostname: String, port: Int,

23

converter: (InputStream) => Iterator[T],

24

storageLevel: StorageLevel): ReceiverInputDStream[T]

25

def rawSocketStream[T: ClassTag](hostname: String, port: Int,

26

storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[T]

27

28

def textFileStream(directory: String): DStream[String]

29

def fileStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag](directory: String): InputDStream[(K, V)]

30

def binaryRecordsStream(directory: String, recordLength: Int): DStream[Array[Byte]]

31

32

def queueStream[T: ClassTag](queue: Queue[RDD[T]], oneAtATime: Boolean = true,

33

defaultRDD: RDD[T] = null): InputDStream[T]

34

def receiverStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T]

35

def actorStream[T: ClassTag](props: Props, name: String,

36

storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[T]

37

def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T]

38

def transform[T: ClassTag](dstreams: Seq[DStream[_]],

39

transformFunc: (Seq[RDD[_]], Time) => RDD[T]): DStream[T]

40

41

// Execution control

42

def start(): Unit

43

def stop(stopSparkContext: Boolean = conf.getBoolean("spark.streaming.stopSparkContextByDefault", true),

44

stopGracefully: Boolean = false): Unit

45

def awaitTermination(): Unit

46

def awaitTerminationOrTimeout(timeout: Long): Boolean

47

48

// Configuration and state

49

def remember(duration: Duration): Unit

50

def checkpoint(directory: String): Unit

51

def sparkContext: SparkContext

52

def graph: DStreamGraph

53

def getState(): StreamingContextState

54

55

// Properties

56

def batchDuration: Duration

57

}

58

59

object StreamingContext extends Logging {

60

def getOrCreate(checkpointPath: String, creatingFunc: () => StreamingContext,

61

hadoopConf: Configuration = SparkHadoopUtil.get.conf): StreamingContext

62

def getActiveOrCreate(checkpointPath: String, creatingFunc: () => StreamingContext,

63

hadoopConf: Configuration = SparkHadoopUtil.get.conf): StreamingContext

64

}

65

```

66

67

### Usage Examples

68

69

```scala

70

import org.apache.spark.streaming.{StreamingContext, Seconds}

71

import org.apache.spark.SparkConf

72

73

// Create StreamingContext

74

val conf = new SparkConf().setAppName("StreamingApp").setMaster("local[2]")

75

val ssc = new StreamingContext(conf, Seconds(1))

76

77

// Set checkpoint directory

78

ssc.checkpoint("hdfs://checkpoint")

79

80

// Create input streams

81

val lines = ssc.socketTextStream("localhost", 9999)

82

val fileStream = ssc.textFileStream("hdfs://input")

83

84

// Register an output operation on each input stream. A streaming context with no
// output operations has nothing to execute, and start() rejects it.
lines.print()
fileStream.print()

// Start processing and block the calling thread until the context is stopped
ssc.start()
ssc.awaitTermination()

87

```

88

89

## DStream (Discretized Stream)

90

91

The basic abstraction in Spark Streaming representing a continuous stream of data.

92

93

```scala { .api }

94

abstract class DStream[T: ClassTag](@transient private var ssc: StreamingContext) extends Serializable with Logging {

95

96

// Basic transformations

97

def map[U: ClassTag](mapFunc: T => U): DStream[U]

98

def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U]

99

def filter(filterFunc: T => Boolean): DStream[T]

100

def mapPartitions[U: ClassTag](mapPartFunc: Iterator[T] => Iterator[U],

101

preservePartitioning: Boolean = false): DStream[U]

102

def mapPartitionsWithIndex[U: ClassTag](mapPartFunc: (Int, Iterator[T]) => Iterator[U],

103

preservePartitioning: Boolean = false): DStream[U]

104

105

// Reduce operations

106

def reduce(reduceFunc: (T, T) => T): DStream[T]

107

def count(): DStream[Long]

108

def countByValue()(implicit ord: Ordering[T] = null): DStream[(T, Long)]

109

def countByValueAndWindow(windowDuration: Duration, slideDuration: Duration)

110

(implicit ord: Ordering[T] = null): DStream[(T, Long)]

111

112

// Pair DStream operations (available when T is (K, V))

113

def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] // when T = (K, V)

114

def reduceByKey(reduceFunc: (V, V) => V, numPartitions: Int): DStream[(K, V)]

115

def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)]

116

def groupByKey(): DStream[(K, Iterable[V])] // when T = (K, V)

117

def groupByKey(numPartitions: Int): DStream[(K, Iterable[V])]

118

def groupByKey(partitioner: Partitioner): DStream[(K, Iterable[V])]

119

def combineByKey[C: ClassTag](createCombiner: V => C, mergeValue: (C, V) => C,

120

mergeCombiners: (C, C) => C): DStream[(K, C)]

121

def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)]

122

def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S],

123

numPartitions: Int): DStream[(K, S)]

124

def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S],

125

partitioner: Partitioner,

126

initialRDD: RDD[(K, S)]): DStream[(K, S)]

127

128

// Join operations

129

def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))] // when T = (K, V)

130

def join[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (V, W))]

131

def join[W: ClassTag](other: DStream[(K, W)], partitioner: Partitioner): DStream[(K, (V, W))]

132

def leftOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))]

133

def rightOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], W))]

134

def fullOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], Option[W]))]

135

def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))]

136

137

// Set operations

138

def union(that: DStream[T]): DStream[T]

139

140

// Window operations

141

def window(windowDuration: Duration, slideDuration: Duration = this.slideDuration): DStream[T]

142

def reduceByWindow(reduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration): DStream[T]

143

def reduceByWindow(reduceFunc: (T, T) => T, invReduceFunc: (T, T) => T,

144

windowDuration: Duration, slideDuration: Duration): DStream[T]

145

def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long]

146

def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Iterable[V])] // when T = (K, V)

147

def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Iterable[V])]

148

def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration,

149

numPartitions: Int): DStream[(K, Iterable[V])]

150

def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration,

151

partitioner: Partitioner): DStream[(K, Iterable[V])]

152

def reduceByKeyAndWindow(reduceFunc: (V, V) => V, windowDuration: Duration): DStream[(K, V)]

153

def reduceByKeyAndWindow(reduceFunc: (V, V) => V, windowDuration: Duration,

154

slideDuration: Duration): DStream[(K, V)]

155

def reduceByKeyAndWindow(reduceFunc: (V, V) => V, invReduceFunc: (V, V) => V,

156

windowDuration: Duration, slideDuration: Duration): DStream[(K, V)]

157

158

// Transformation operations

159

def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U]

160

def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U]

161

def transformWith[U: ClassTag, W: ClassTag](other: DStream[U],

162

transformFunc: (RDD[T], RDD[U]) => RDD[W]): DStream[W]

163

def transformWith[U: ClassTag, W: ClassTag](other: DStream[U],

164

transformFunc: (RDD[T], RDD[U], Time) => RDD[W]): DStream[W]

165

166

// Output operations (actions)

167

def print(): Unit

168

def print(num: Int): Unit

169

def saveAsTextFiles(prefix: String, suffix: String = ""): Unit

170

def saveAsObjectFiles(prefix: String, suffix: String = ""): Unit

171

def saveAsHadoopFiles[F <: OutputFormat[K, V]](prefix: String, suffix: String = ""): Unit

172

def foreach(foreachFunc: RDD[T] => Unit): Unit

173

def foreachRDD(foreachFunc: RDD[T] => Unit): Unit

174

def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit

175

176

// Caching

177

def cache(): DStream[T]

178

def persist(): DStream[T]

179

def persist(level: StorageLevel): DStream[T]

180

181

// Checkpointing

182

def checkpoint(interval: Duration): DStream[T]

183

184

// Properties

185

def context: StreamingContext

186

def ssc: StreamingContext

187

def slideDuration: Duration

188

def dependencies: List[DStream[_]]

189

def generatedRDDs: HashMap[Time, RDD[T]]

190

def rememberDuration: Duration

191

}

192

```

193

194

### Usage Examples

195

196

```scala

197

import org.apache.spark.streaming.{StreamingContext, Seconds}

198

199

val ssc = new StreamingContext(conf, Seconds(1))

// updateStateByKey (used below) keeps state across batches and therefore needs a
// checkpoint directory; without one the context fails to start.
ssc.checkpoint("hdfs://checkpoint")

200

201

// Basic transformations

202

val lines = ssc.socketTextStream("localhost", 9999)

203

val words = lines.flatMap(_.split(" "))

204

val pairs = words.map(word => (word, 1))

205

val wordCounts = pairs.reduceByKey(_ + _)

206

207

// Window operations

208

val windowedWordCounts = pairs.reduceByKeyAndWindow(

209

reduceFunc = _ + _,

210

windowDuration = Seconds(30),

211

slideDuration = Seconds(10)

212

)

213

214

// Stateful operations

215

val runningCounts = pairs.updateStateByKey[Int] { (values: Seq[Int], state: Option[Int]) =>

216

val currentCount = values.sum

217

val previousCount = state.getOrElse(0)

218

Some(currentCount + previousCount)

219

}

220

221

// Output operations

222

wordCounts.print()

223

wordCounts.saveAsTextFiles("hdfs://output")

224

wordCounts.foreachRDD { rdd =>

225

rdd.foreach(println)

226

}

227

228

ssc.start()

229

ssc.awaitTermination()

230

```

231

232

## Structured Streaming (Recommended Approach)

233

234

Structured Streaming provides a scalable and fault-tolerant stream processing engine built on the Spark SQL engine.

235

236

### DataStreamReader

237

238

```scala { .api }

239

class DataStreamReader private[sql](sparkSession: SparkSession) {

240

// Format specification

241

def format(source: String): DataStreamReader

242

243

// Options

244

def option(key: String, value: String): DataStreamReader

245

def option(key: String, value: Boolean): DataStreamReader

246

def option(key: String, value: Long): DataStreamReader

247

def option(key: String, value: Double): DataStreamReader

248

def options(options: scala.collection.Map[String, String]): DataStreamReader

249

def options(options: java.util.Map[String, String]): DataStreamReader

250

251

// Schema

252

def schema(schema: StructType): DataStreamReader

253

def schema(schemaString: String): DataStreamReader

254

255

// Loading methods

256

def load(): DataFrame

257

def load(path: String): DataFrame

258

259

// Format-specific methods

260

def csv(path: String): DataFrame

261

def json(path: String): DataFrame

262

def parquet(path: String): DataFrame

263

def text(path: String): DataFrame

264

def textFile(path: String): Dataset[String]

265

def table(tableName: String): DataFrame

266

267

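// Source-specific methods
// NOTE(review): confirm these two exist on DataStreamReader; the usage example below reads a socket with format("socket"), "host"/"port" options and load()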

268

def socket(host: String, port: Int): DataFrame

269

def rate(rowsPerSecond: Long): DataFrame

270

}

271

```

272

273

### DataStreamWriter

274

275

```scala { .api }

276

class DataStreamWriter[T] private[sql](ds: Dataset[T]) {

277

// Output mode

278

def outputMode(outputMode: OutputMode): DataStreamWriter[T]

279

def outputMode(outputMode: String): DataStreamWriter[T]

280

281

// Trigger specification

282

def trigger(trigger: Trigger): DataStreamWriter[T]

283

284

// Format specification

285

def format(source: String): DataStreamWriter[T]

286

287

// Options

288

def option(key: String, value: String): DataStreamWriter[T]

289

def option(key: String, value: Boolean): DataStreamWriter[T]

290

def option(key: String, value: Long): DataStreamWriter[T]

291

def option(key: String, value: Double): DataStreamWriter[T]

292

def options(options: scala.collection.Map[String, String]): DataStreamWriter[T]

293

def options(options: java.util.Map[String, String]): DataStreamWriter[T]

294

295

// Partitioning

296

def partitionBy(colNames: String*): DataStreamWriter[T]

297

298

// Query naming and checkpointing

299

def queryName(queryName: String): DataStreamWriter[T]

300

// Checkpointing has no dedicated method: pass option("checkpointLocation", dir) using the option overloads listed under Options above

301

302

// Output methods

303

def start(): StreamingQuery

304

def start(path: String): StreamingQuery

305

def toTable(tableName: String): StreamingQuery

306

307

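// Format-specific methods
// NOTE(review): confirm these exist on DataStreamWriter; the usage examples below select the sink with format("console") / format("parquet") and then call start()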

308

def console(): StreamingQuery

309

def csv(path: String): StreamingQuery

310

def json(path: String): StreamingQuery

311

def parquet(path: String): StreamingQuery

312

def text(path: String): StreamingQuery

313

314

// Foreach operations

315

def foreach(writer: ForeachWriter[T]): DataStreamWriter[T]

316

def foreachBatch(function: (Dataset[T], Long) => Unit): DataStreamWriter[T]

317

def foreachBatch(function: VoidFunction2[Dataset[T], java.lang.Long]): DataStreamWriter[T]

318

}

319

```

320

321

### StreamingQuery

322

323

```scala { .api }

324

trait StreamingQuery {

325

def name: String

326

def id: UUID

327

def runId: UUID

328

def sparkSession: SparkSession

329

def isActive: Boolean

330

def exception: Option[StreamingQueryException]

331

def status: StreamingQueryStatus

332

def recentProgress: Array[StreamingQueryProgress]

333

def lastProgress: StreamingQueryProgress

334

def awaitTermination(): Unit

335

def awaitTermination(timeoutMs: Long): Boolean

336

def processAllAvailable(): Unit

337

def stop(): Unit

338

def explain(): Unit

339

def explain(extended: Boolean): Unit

340

}

341

```

342

343

### StreamingQueryManager

344

345

```scala { .api }

346

class StreamingQueryManager private[sql](sparkSession: SparkSession) {

347

def active: Array[StreamingQuery]

348

def get(id: UUID): StreamingQuery

349

def get(name: String): StreamingQuery

350

def awaitAnyTermination(): Unit

351

def awaitAnyTermination(timeoutMs: Long): Boolean

352

def resetTerminated(): Unit

353

def addListener(listener: StreamingQueryListener): Unit

354

def removeListener(listener: StreamingQueryListener): Unit

355

}

356

```

357

358

### Triggers

359

360

```scala { .api }

361

sealed trait Trigger

362

363

object Trigger {

364

def ProcessingTime(interval: String): Trigger

365

def ProcessingTime(interval: Long, unit: TimeUnit): Trigger

366

def ProcessingTime(interval: Duration): Trigger

367

def Once(): Trigger

368

def Continuous(interval: String): Trigger

369

def Continuous(interval: Long, unit: TimeUnit): Trigger

370

def Continuous(interval: Duration): Trigger

371

def AvailableNow(): Trigger

372

}

373

```

374

375

### Output Modes

376

377

```scala { .api }

378

sealed trait OutputMode

379

380

object OutputMode {

381

val Append: OutputMode

382

val Complete: OutputMode

383

val Update: OutputMode

384

}

385

```

386

387

### Usage Examples

388

389

```scala

390

import org.apache.spark.sql.SparkSession

391

import org.apache.spark.sql.streaming.Trigger

392

import org.apache.spark.sql.functions._

393

394

val spark = SparkSession.builder()

395

.appName("StructuredStreaming")

396

.master("local[*]")

397

.getOrCreate()

398

399

import spark.implicits._

400

401

// Read from socket

402

val lines = spark.readStream

403

.format("socket")

404

.option("host", "localhost")

405

.option("port", 9999)

406

.load()

407

408

// Word count with watermarking

409

val words = lines.as[String].flatMap(_.split(" "))

410

val wordCounts = words

411

.withColumn("timestamp", current_timestamp())

412

.withWatermark("timestamp", "10 minutes")

413

.groupBy(window($"timestamp", "10 minutes", "5 minutes"), $"value")

414

.count()

415

416

// Write to console

417

val query = wordCounts.writeStream

418

.outputMode("update")

419

.format("console")

420

.trigger(Trigger.ProcessingTime("10 seconds"))

421

.start()

422

423

query.awaitTermination()

424

425

// Read from files

426

// Schema handed to the CSV stream reader. The column names here are placeholders;
// replace them with the layout of your files.
val userSchema = new org.apache.spark.sql.types.StructType()
  .add("name", "string")
  .add("age", "integer")

// Stream every CSV file that appears under the directory, parsed with the schema above
val csvDF = spark.readStream
  .option("sep", ",")
  .schema(userSchema)
  .csv("path/to/directory")

430

431

// Write to parquet with checkpointing

432

val fileQuery = csvDF.writeStream

433

.format("parquet")

434

.option("checkpointLocation", "path/to/checkpoint/dir")

435

.option("path", "path/to/destination/dir")

436

.trigger(Trigger.ProcessingTime("1 minute"))

437

.start()

438

439

// Foreach batch processing

440

// DataFrame is written fully qualified because this snippet's imports do not bring it
// into scope. Each micro-batch is appended to the table as an ordinary batch write.
csvDF.writeStream
  .foreachBatch { (batchDF: org.apache.spark.sql.DataFrame, batchId: Long) =>
    batchDF.write
      .mode("append")
      .saveAsTable("my_table")
  }
  .start()

447

```

448

449

## Custom Receivers (Legacy DStream)

450

451

```scala { .api }

452

abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable {

453

def onStart(): Unit

454

def onStop(): Unit

455

def store(dataItem: T): Unit

456

def store(dataBuffer: ArrayBuffer[T]): Unit

457

def store(dataIterator: Iterator[T]): Unit

458

def store(dataBuffer: ArrayBuffer[T], metadata: Any): Unit

459

def store(dataIterator: Iterator[T], metadata: Any): Unit

460

def reportError(message: String, throwable: Throwable): Unit

461

def restart(message: String): Unit

462

def restart(message: String, error: Throwable): Unit

463

def restart(message: String, error: Throwable, millisecond: Int): Unit

464

def stop(message: String): Unit

465

def stop(message: String, error: Throwable): Unit

466

def isStarted(): Boolean

467

def isStopped(): Boolean

468

def hasError(): Boolean

469

def preferredLocation: Option[String]

470

}

471

```

472

473

### Usage Examples

474

475

```scala

476

import org.apache.spark.streaming.receiver.Receiver

477

import org.apache.spark.storage.StorageLevel

478

import java.io._

479

import java.net._

480

481

/**
 * Receiver that opens a TCP connection to `host:port`, reads it as UTF-8 text and
 * stores each line it receives.
 *
 * @param host host name to connect to
 * @param port TCP port to connect to
 */
class CustomReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  /** Runs [[receive]] on a separate thread, so this method returns without waiting for data. */
  def onStart(): Unit = {
    new Thread("Socket Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  /**
   * No explicit cleanup is done here: the receiving thread checks `isStopped()` between
   * lines and closes its own socket when it leaves the read loop.
   */
  def onStop(): Unit = {
    // Clean up resources
  }

  /**
   * Connects, stores lines until the stream ends or the receiver is stopped, then asks
   * for a restart so that a new connection is attempted.
   */
  private def receive(): Unit = {
    import scala.util.control.NonFatal

    try {
      val socket = new Socket(host, port)
      try {
        val reader =
          new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"))
        // readLine() returns null at end of stream; the stop flag is re-checked after
        // every line that has been read.
        Iterator
          .continually(reader.readLine())
          .takeWhile(line => !isStopped() && line != null)
          .foreach(line => store(line))
      } finally {
        // Closing the socket also closes its input stream. Doing it in finally releases
        // the connection even when readLine() or store() throws.
        socket.close()
      }
      restart("Trying to connect again")
    } catch {
      case e: ConnectException =>
        restart(s"Error connecting to $host:$port", e)
      // NonFatal lets JVM-fatal errors (e.g. OutOfMemoryError) and thread interruption
      // propagate instead of turning them into a restart request.
      case NonFatal(e) =>
        restart("Error receiving data", e)
    }
  }
}

514

515

// Use custom receiver

516

val customStream = ssc.receiverStream(new CustomReceiver("localhost", 9999))

517

```

518

519

## Duration

520

521

Time duration utilities for streaming applications.

522

523

```scala { .api }

524

case class Duration(private val millis: Long) extends Serializable {

525

def milliseconds: Long = millis

526

def +(that: Duration): Duration

527

def -(that: Duration): Duration

528

def *(times: Int): Duration

529

def /(that: Duration): Double

530

def <(that: Duration): Boolean

531

def <=(that: Duration): Boolean

532

def >(that: Duration): Boolean

533

def >=(that: Duration): Boolean

534

def isMultipleOf(that: Duration): Boolean

535

def min(that: Duration): Duration

536

def max(that: Duration): Duration

537

def isZero: Boolean

538

override def toString: String

539

def prettyPrint: String

540

}

541

542

object Duration {

543

val ZERO = new Duration(0)

544

def apply(length: Long, unit: TimeUnit): Duration

545

}

546

547

object Milliseconds {

548

def apply(milliseconds: Long): Duration

549

}

550

551

object Seconds {

552

def apply(seconds: Long): Duration

553

}

554

555

object Minutes {

556

def apply(minutes: Long): Duration

557

}

558

```

559

560

### Usage Examples

561

562

```scala

563

import org.apache.spark.streaming.{Duration, Seconds, Minutes, Milliseconds}

564

565

// Creating durations

566

val batchInterval = Seconds(1)

567

val windowDuration = Minutes(5)

568

val slideDuration = Seconds(30)

569

val timeout = Milliseconds(500)

570

571

// Duration arithmetic

572

val totalTime = batchInterval * 10

573

val remaining = windowDuration - slideDuration

574

val isMultiple = windowDuration.isMultipleOf(slideDuration)

575

576

// Using in streaming operations

577

val ssc = new StreamingContext(conf, batchInterval)

578

val windowedCounts = pairs.reduceByKeyAndWindow(_ + _, windowDuration, slideDuration)

579

```