
# Spark Streaming

Spark Streaming enables scalable, high-throughput, fault-tolerant processing of live data streams. It ingests data from sources such as Kafka, Flume, Twitter, and TCP sockets, and processes it with high-level functions such as `map`, `reduce`, `join`, and window operations.

## Core Concepts

Spark Streaming divides a live data stream into micro-batches. The resulting sequence of batches is represented as a **DStream** (Discretized Stream), and each batch is an RDD, so Spark's batch-processing APIs apply directly to streaming data.

## StreamingContext

`StreamingContext` is the main entry point for Spark Streaming functionality.

### StreamingContext Class

```scala { .api }
class StreamingContext(sparkContext: SparkContext, batchDuration: Duration) extends Logging {
  // Alternative constructors
  def this(conf: SparkConf, batchDuration: Duration)
  def this(master: String, appName: String, batchDuration: Duration, sparkHome: String = null, jars: Seq[String] = Nil, environment: Map[String, String] = Map())
  def this(path: String, hadoopConf: Configuration = new Configuration())
}
```

### Creating StreamingContext

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

// The variants below are alternatives: only one active SparkContext
// (and StreamingContext) is allowed per JVM, so pick one.
// local[2]: a receiver occupies one thread, so at least two are needed locally
val conf = new SparkConf().setAppName("StreamingApp").setMaster("local[2]")

// From an existing SparkContext
val sc = new SparkContext(conf)
val sscFromSc = new StreamingContext(sc, Seconds(1))

// From a SparkConf (the SparkContext is created internally)
val sscFromConf = new StreamingContext(conf, Seconds(2))

// With all parameters
val sscFull = new StreamingContext(
  master = "local[*]",
  appName = "My Streaming App",
  batchDuration = Seconds(1),
  sparkHome = "/path/to/spark",
  jars = Seq("app.jar"),
  environment = Map("ENV_VAR" -> "value")
)

// From checkpoint data (recovery)
val sscRecovered = new StreamingContext("hdfs://path/to/checkpoint", new Configuration())
```

### Duration Helper Objects

```scala { .api }
import org.apache.spark.streaming.{Milliseconds, Seconds, Minutes}

Milliseconds(500) // 500 milliseconds
Seconds(1)        // 1 second
Seconds(30)       // 30 seconds
Minutes(1)        // 1 minute
```
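
Window and slide durations must be multiples of the batch interval; Spark throws when a windowed DStream violates this. A small sketch of checking it up front with `Duration`'s helpers (`isMultipleOf`, `*`). The helper name `validateWindow` is hypothetical, not a Spark API:

```scala
import org.apache.spark.streaming.{Duration, Milliseconds, Minutes, Seconds}

// Hypothetical helper: fail fast if a window/slide pair does not line up
// with the batch interval
def validateWindow(batch: Duration, window: Duration, slide: Duration): Unit = {
  require(window.isMultipleOf(batch), s"window $window is not a multiple of batch $batch")
  require(slide.isMultipleOf(batch), s"slide $slide is not a multiple of batch $batch")
}

val batch = Seconds(10)
validateWindow(batch, Minutes(1), batch * 2) // OK: 60 s and 20 s are multiples of 10 s

// Durations compare by their millisecond value
println(Minutes(1) == Seconds(60))                          // true
println(Milliseconds(1500).isMultipleOf(Milliseconds(500))) // true
```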

## Input DStream Creation

### Socket Streams

**socketTextStream**: Create a DStream from a TCP socket; the received bytes are decoded as UTF-8, newline-delimited text lines
```scala { .api }
def socketTextStream(hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[String]
```

```scala
// Connect to a TCP socket for text data (one record per line)
val lines = ssc.socketTextStream("localhost", 9999)

// Process the stream
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
wordCounts.print()
// Nothing is received or processed until ssc.start() is called
```
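
The snippet above assumes an existing `ssc` and never starts it. A minimal self-contained application might look like the sketch below; the object name, host/port, and the `tokenize` helper are illustrative assumptions. The `local[2]` master matters: the socket receiver occupies one thread, so at least one more is needed for processing.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NetworkWordCount {
  // Split a line into non-empty words (kept separate so it can be tested without Spark)
  def tokenize(line: String): Seq[String] =
    line.split("\\s+").filter(_.nonEmpty).toSeq

  def main(args: Array[String]): Unit = {
    // At least 2 local threads: one for the socket receiver, one for processing
    val conf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))

    val lines = ssc.socketTextStream("localhost", 9999)
    val wordCounts = lines
      .flatMap(line => tokenize(line))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()            // start receiving and processing
    ssc.awaitTermination() // block until the context is stopped or fails
  }
}
```

For local experiments, a tool such as `nc -lk 9999` can act as the text source.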

**socketStream**: Create a DStream from a TCP socket, using a custom `converter` that turns the socket's `InputStream` into an iterator of records
```scala { .api }
def socketStream[T: ClassTag](hostname: String, port: Int, converter: (InputStream) => Iterator[T], storageLevel: StorageLevel): ReceiverInputDStream[T]
```
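
The converter runs inside the receiver and is given the raw socket stream. A sketch of a converter for newline-delimited integers; the name `intLines` and the input format are assumptions for illustration:

```scala
import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets

// Converter for socketStream: one integer per line, blank lines skipped.
// The iterator reads lazily, so records are produced as they arrive.
def intLines(in: InputStream): Iterator[Int] = {
  val reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))
  Iterator.continually(reader.readLine())
    .takeWhile(_ != null) // null signals end of stream
    .map(_.trim)
    .filter(_.nonEmpty)
    .map(_.toInt)         // a malformed line throws NumberFormatException
}
```

It would be passed as `ssc.socketStream("localhost", 9999, intLines _, StorageLevel.MEMORY_AND_DISK_SER_2)`.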

**rawSocketStream**: Receive blocks of data that the sender has already serialized with Spark's serializer; the blocks are pushed into the block manager without being deserialized
```scala { .api }
def rawSocketStream[T: ClassTag](hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[T]
```

### File Streams

**textFileStream**: Monitor a Hadoop-compatible directory for new files and read them as text lines. Files must appear in the directory atomically (for example, by moving or renaming them into it); data appended to a file after it has been picked up is not read.
```scala { .api }
def textFileStream(directory: String): DStream[String]
```

```scala
// Monitor a directory for new files
val fileStream = ssc.textFileStream("hdfs://path/to/directory")

// Process new files as they arrive
val processed = fileStream
  .filter(_.nonEmpty)
  .map(_.toUpperCase)

processed.print()
```
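
Because new files must appear atomically, a producer typically writes to a staging location and then moves the finished file into the monitored directory. A sketch using `java.nio.file`, assuming a local (or locally mounted) directory with the staging directory on the same file system; on HDFS the equivalent step would be a rename through Hadoop's `FileSystem` API. `publishFile` is a hypothetical name:

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, StandardCopyOption}

// Write the content to a staging file, then atomically move it into the
// directory watched by textFileStream, so the stream never sees a partial file
def publishFile(stagingDir: Path, watchedDir: Path, name: String, lines: Seq[String]): Path = {
  val staged = stagingDir.resolve(name)
  Files.write(staged, lines.mkString("", "\n", "\n").getBytes(StandardCharsets.UTF_8))
  // ATOMIC_MOVE requires both directories to be on the same file system
  Files.move(staged, watchedDir.resolve(name), StandardCopyOption.ATOMIC_MOVE)
}
```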

**fileStream**: Generic file stream that reads new files with a Hadoop `InputFormat` (new MapReduce API)
```scala { .api }
def fileStream[K, V, F <: NewInputFormat[K, V]: ClassTag](directory: String, filter: Path => Boolean = _ => true, newFilesOnly: Boolean = true): InputDStream[(K, V)]
```

```scala
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}

// Keys are byte offsets, values are lines
val hadoopStream = ssc.fileStream[LongWritable, Text, TextInputFormat]("hdfs://input/dir")
val textStream = hadoopStream.map(_._2.toString)
```
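
The `filter` argument decides which new files are picked up. A sketch that accepts only visible `.csv` files, so hidden files, marker files such as `_SUCCESS`, and temporary copies such as `data.csv._COPYING_` (the suffix `hdfs dfs -put` uses while copying) are skipped; the naming rules are assumptions about your data:

```scala
import org.apache.hadoop.fs.Path

// Accept only visible .csv files; names starting with "." or "_" and
// files with any other extension are ignored
def csvOnly(path: Path): Boolean = {
  val name = path.getName
  !name.startsWith(".") && !name.startsWith("_") && name.endsWith(".csv")
}
```

It would be passed as `ssc.fileStream[LongWritable, Text, TextInputFormat]("hdfs://input/dir", csvOnly _, newFilesOnly = true)`.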

### Queue Streams (for testing)

**queueStream**: Create a stream from a queue of RDDs. With `oneAtATime = true` (the default), one RDD is taken from the queue per batch interval.
```scala { .api }
def queueStream[T: ClassTag](queue: Queue[RDD[T]], oneAtATime: Boolean = true, defaultRDD: RDD[T] = null): InputDStream[T]
```

```scala
import scala.collection.mutable.Queue
import org.apache.spark.rdd.RDD

val rddQueue = Queue[RDD[Int]]()

// Create a stream from the queue and register an output operation
val queueStream = ssc.queueStream(rddQueue)
queueStream.count().print()

// Add RDDs to the queue (simulates data arrival; may also happen after ssc.start())
for (_ <- 1 to 10) {
  rddQueue.synchronized {
    rddQueue += ssc.sparkContext.parallelize(1 to 100)
  }
}
```

### Custom Receiver Streams

**receiverStream**: Create a stream from a custom `Receiver` implementation
```scala { .api }
def receiverStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T]
```

```scala
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.storage.StorageLevel

// Custom receiver that simulates an external source by emitting one
// synthetic record every 100 ms until the receiver is stopped
class CustomReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
  def onStart(): Unit = {
    // onStart must not block: receive data on a separate thread
    new Thread("Custom Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  def onStop(): Unit = {
    // Nothing to clean up: the receiving thread exits once isStopped() is true
  }

  private def receive(): Unit = {
    var counter = 0L
    while (!isStopped()) {
      // Simulate data reception
      counter += 1
      store(s"record-$counter")
      Thread.sleep(100)
    }
  }
}

val customStream = ssc.receiverStream(new CustomReceiver())
```
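
A receiver for a real source also has to cope with lost connections. The custom-receiver example in the Spark programming guide handles this with `restart`, which stops the receiver and starts it again asynchronously after the delay configured by `spark.streaming.receiverRestartDelay`. A condensed sketch of that pattern for a newline-delimited TCP source; `SocketLineReceiver` is a hypothetical name:

```scala
import java.io.{BufferedReader, InputStreamReader}
import java.net.{ConnectException, Socket}
import java.nio.charset.StandardCharsets
import scala.util.control.NonFatal
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class SocketLineReceiver(host: String, port: Int)
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    // Receive on a separate thread so onStart returns immediately
    new Thread("Socket Line Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  // The receive loop checks isStopped() and exits on its own
  def onStop(): Unit = {}

  private def receive(): Unit = {
    try {
      val socket = new Socket(host, port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      var line = reader.readLine()
      while (!isStopped() && line != null) {
        store(line)
        line = reader.readLine()
      }
      reader.close()
      socket.close()
      // The server closed the connection: schedule a reconnect
      restart("Connection closed, trying to connect again")
    } catch {
      case e: ConnectException => restart(s"Error connecting to $host:$port", e)
      case NonFatal(e)         => restart("Error receiving data", e)
    }
  }
}
```

It is used like any other receiver: `ssc.receiverStream(new SocketLineReceiver("localhost", 9999))`.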

**actorStream**: Create a stream from an Akka actor (Spark 1.x only; removed from Spark in 2.0, where Akka support moved to the separate Apache Bahir project)
```scala { .api }
def actorStream[T: ClassTag](props: Props, name: String, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[T]
```

## DStream Transformations

DStreams support transformations similar to those on RDDs; each transformation is applied to every batch (RDD) of the stream.

### DStream Class

```scala { .api }
abstract class DStream[T: ClassTag] extends Serializable with Logging {
  def ssc: StreamingContext
  def slideDuration: Duration
  def dependencies: List[DStream[_]]
  def compute(time: Time): Option[RDD[T]]
}
```

### Basic Transformations

**map**: Apply a function to each element in each batch
```scala { .api }
def map[U: ClassTag](mapFunc: T => U): DStream[U]
```

```scala
val numbers = ssc.socketTextStream("localhost", 9999)
val doubled = numbers.map(_.toInt * 2)
```

**flatMap**: Apply a function that returns a collection, then flatten the results
```scala { .api }
def flatMap[U: ClassTag](flatMapFunc: T => Traversable[U]): DStream[U]
```

```scala
val lines = ssc.textFileStream("input/")
val words = lines.flatMap(_.split(" "))
```

**filter**: Keep elements matching a predicate
```scala { .api }
def filter(filterFunc: T => Boolean): DStream[T]
```

```scala
val validLines = lines.filter(_.nonEmpty)
val longWords = words.filter(_.length > 5)
```

**glom**: Coalesce the elements within each partition of every batch into an array
```scala { .api }
def glom(): DStream[Array[T]]
```

### Stream Operations

**union**: Union with another DStream of the same type
```scala { .api }
def union(that: DStream[T]): DStream[T]
```

```scala
val stream1 = ssc.socketTextStream("host1", 9999)
val stream2 = ssc.socketTextStream("host2", 9999)
val combined = stream1.union(stream2)
```

### Aggregation Transformations

**count**: Count the elements in each batch
```scala { .api }
def count(): DStream[Long]
```

**countByValue**: Count the occurrences of each distinct value in each batch
```scala { .api }
def countByValue()(implicit ord: Ordering[T] = null): DStream[(T, Long)]
```

**reduce**: Reduce the elements in each batch to a single value
```scala { .api }
def reduce(reduceFunc: (T, T) => T): DStream[T]
```

```scala
val numbers = ssc.socketTextStream("localhost", 9999).map(_.toInt)

val counts = numbers.count()                 // Count per batch
val sums = numbers.reduce(_ + _)             // Sum per batch
val maxValues = numbers.reduce(math.max(_, _)) // Max per batch
```

### Advanced Transformations

**transform**: Apply an arbitrary RDD-to-RDD function to each batch
```scala { .api }
def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U]
def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U]
```

```scala
val stream = ssc.socketTextStream("localhost", 9999)

val enhanced = stream.transform { (rdd, time) =>
  // Runs on the driver once per batch, with access to both the RDD and the batch time
  val timeString = time.toString
  rdd.map(data => s"$timeString: $data")
    .filter(_.contains("important"))
}
```

**transformWith**: Combine each batch with the corresponding batch of another DStream
```scala { .api }
def transformWith[U: ClassTag, V: ClassTag](other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V]): DStream[V]
def transformWith[U: ClassTag, V: ClassTag](other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V]): DStream[V]
```

```scala
import org.apache.spark.rdd.RDD

val stream1 = ssc.socketTextStream("localhost", 9999)
val stream2 = ssc.socketTextStream("localhost", 8888)

// transformWith takes the other stream and the function in one parameter list
val joined = stream1.transformWith(stream2, (rdd1: RDD[String], rdd2: RDD[String]) => {
  // Key each line by its first comma-separated field, then join the two batches
  val pairs1 = rdd1.map(line => (line.split(",")(0), line))
  val pairs2 = rdd2.map(line => (line.split(",")(0), line))
  pairs1.join(pairs2).map { case (_, (v1, v2)) => s"$v1 | $v2" }
})
```

## Window Operations

Window operations apply transformations over a sliding window of data. Both the window duration and the slide duration must be multiples of the source DStream's batch interval.

### Basic Windowing

**window**: Return a DStream whose batches contain all elements within the window (the one-argument form slides by the source's batch interval)
```scala { .api }
def window(windowDuration: Duration): DStream[T]
def window(windowDuration: Duration, slideDuration: Duration): DStream[T]
```

```scala
val lines = ssc.socketTextStream("localhost", 9999)

// 30-second window, sliding every 10 seconds
val windowedLines = lines.window(Seconds(30), Seconds(10))
val windowCounts = windowedLines.count()
windowCounts.print()
```

### Windowed Reductions

**reduceByWindow**: Reduce all elements in a sliding window. The variant with `invReduceFunc` updates each window incrementally and requires checkpointing to be enabled.
```scala { .api }
def reduceByWindow(reduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration): DStream[T]
def reduceByWindow(reduceFunc: (T, T) => T, invReduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration): DStream[T]
```

```scala
// The inverse-function variant below needs a checkpoint directory
ssc.checkpoint("hdfs://path/to/checkpoints")

val numbers = ssc.socketTextStream("localhost", 9999).map(_.toInt)

// Sum over the window, recomputed from all batches in the window
val windowSums = numbers.reduceByWindow(
  _ + _,       // Reduce function
  Seconds(60), // Window duration
  Seconds(20)  // Slide duration
)

// Incremental windowed sum with an inverse function
val efficientSums = numbers.reduceByWindow(
  _ + _, // Add values entering the window
  _ - _, // Subtract values leaving the window
  Seconds(60),
  Seconds(20)
)
```
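
The inverse-function variant avoids re-reducing the whole window on every slide: it starts from the previous window's result, adds the batches that entered, and subtracts the batches that left. This is only valid when the inverse really undoes the reduce, as subtraction does for sums. A plain-Scala model of that arithmetic (not Spark's implementation), with one number per batch standing in for each batch's reduced value and `w`/`s` counted in batches:

```scala
// Window sums computed incrementally: window ending at `end` (exclusive)
// covers batches [end - w, end); windows end every `s` batches
def incrementalWindowSums(batchSums: Vector[Int], w: Int, s: Int): Vector[Int] = {
  require(w > 0 && s > 0 && batchSums.length >= s, "invalid window/slide")
  // Sum of batches in [from, until), clamped at the start of the stream
  def range(from: Int, until: Int): Int =
    batchSums.slice(math.max(from, 0), math.max(until, 0)).sum

  val ends = (s to batchSums.length by s).toVector
  ends.tail.scanLeft(range(ends.head - w, ends.head)) { (previous, end) =>
    val entered = range(end - s, end)      // batches that slid into the window
    val left = range(end - s - w, end - w) // batches that slid out of it
    previous + entered - left              // reduceFunc, then invReduceFunc
  }
}

// Reference: re-reduce every window from scratch
def naiveWindowSums(batchSums: Vector[Int], w: Int, s: Int): Vector[Int] =
  (s to batchSums.length by s).toVector
    .map(end => batchSums.slice(math.max(end - w, 0), end).sum)
```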

**countByWindow**: Count the elements in a sliding window (implemented with an inverse reduce, so checkpointing must be enabled)
```scala { .api }
def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long]
```

**countByValueAndWindow**: Count the occurrences of each value in a sliding window (also requires checkpointing)
```scala { .api }
def countByValueAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int = ssc.sc.defaultParallelism)(implicit ord: Ordering[T] = null): DStream[(T, Long)]
```

```scala
ssc.checkpoint("hdfs://path/to/checkpoints")

val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))

// Count words in a 2-minute window, sliding every 30 seconds
val wordCounts = words.countByValueAndWindow(Minutes(2), Seconds(30))
wordCounts.print()
```

## PairDStreamFunctions (Key-Value Operations)

Operations available on DStreams of (key, value) pairs through an implicit conversion to `PairDStreamFunctions`.

### Key-Value Transformations

**keys and values**: Extract the keys or the values of each pair
```scala { .api }
def keys: DStream[K]
def values: DStream[V]
```

**mapValues**: Transform values while preserving keys
```scala { .api }
def mapValues[U: ClassTag](mapValuesFunc: V => U): DStream[(K, U)]
```

**flatMapValues**: FlatMap values while preserving keys
```scala { .api }
def flatMapValues[U: ClassTag](flatMapValuesFunc: V => TraversableOnce[U]): DStream[(K, U)]
```

```scala
// Needed only before Spark 1.3; later versions apply the conversion automatically
import org.apache.spark.streaming.StreamingContext._

// Expects "key,number" lines
val pairs = ssc.socketTextStream("localhost", 9999)
  .map { line =>
    val parts = line.split(",")
    (parts(0), parts(1).toInt)
  }

val doubled = pairs.mapValues(_ * 2)
val allKeys = pairs.keys
val allValues = pairs.values
```

### Aggregation by Key

**groupByKey**: Group values by key in each batch
```scala { .api }
def groupByKey(): DStream[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): DStream[(K, Iterable[V])]
def groupByKey(numPartitions: Int): DStream[(K, Iterable[V])]
```

**reduceByKey**: Reduce values by key in each batch
```scala { .api }
def reduceByKey(func: (V, V) => V): DStream[(K, V)]
def reduceByKey(func: (V, V) => V, partitioner: Partitioner): DStream[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): DStream[(K, V)]
```

**combineByKey**: Generic combine by key
```scala { .api }
def combineByKey[C: ClassTag](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiner: (C, C) => C, partitioner: Partitioner): DStream[(K, C)]
```

```scala
val wordStream = ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split(" "))
  .map(word => (word, 1))

// Count words in each batch
val wordCounts = wordStream.reduceByKey(_ + _)

// Group all occurrences
val wordGroups = wordStream.groupByKey()
```
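
`combineByKey` is the general form behind `reduceByKey`: it needs a function that starts a combiner from a key's first value, one that folds another value into a combiner, and one that merges combiners from different partitions. A sketch computing the per-batch average value per key with a `(sum, count)` combiner; the input `DStream[(String, Double)]` and the helper names are assumptions:

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.streaming.dstream.DStream

// Combiner = (running sum, number of values)
val createCombiner: Double => (Double, Long) = v => (v, 1L)
val mergeValue: ((Double, Long), Double) => (Double, Long) = {
  case ((sum, n), v) => (sum + v, n + 1)
}
val mergeCombiners: ((Double, Long), (Double, Long)) => (Double, Long) = {
  case ((s1, n1), (s2, n2)) => (s1 + s2, n1 + n2)
}

// Average value per key within each batch
def averageByKey(pairs: DStream[(String, Double)], numPartitions: Int): DStream[(String, Double)] =
  pairs
    .combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions))
    .mapValues { case (sum, n) => sum / n }
```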

### Windowed Key-Value Operations

**groupByKeyAndWindow**: Group values by key over a sliding window
```scala { .api }
def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Iterable[V])]
def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Iterable[V])]
def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, partitioner: Partitioner): DStream[(K, Iterable[V])]
```

**reduceByKeyAndWindow**: Reduce values by key over a sliding window. The variant with `invFunc` updates each window incrementally (reducing new data in and "inverse reducing" departing data out) and requires checkpointing to be enabled.
```scala { .api }
def reduceByKeyAndWindow(func: (V, V) => V, windowDuration: Duration): DStream[(K, V)]
def reduceByKeyAndWindow(func: (V, V) => V, windowDuration: Duration, slideDuration: Duration): DStream[(K, V)]
def reduceByKeyAndWindow(func: (V, V) => V, invFunc: (V, V) => V, windowDuration: Duration, slideDuration: Duration): DStream[(K, V)]
```

**countByKeyAndWindow**: `PairDStreamFunctions` does not provide this method in the Scala API. The same per-key windowed count can be computed from the keys with `countByValueAndWindow` (which also requires checkpointing):
```scala
// DStream[(K, Long)]: occurrences of each key within the window
val keyCountsInWindow = pairs.keys.countByValueAndWindow(windowDuration, slideDuration)
```

```scala
// Required by the inverse-function variant below
ssc.checkpoint("hdfs://path/to/checkpoints")

val wordPairs = ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split(" "))
  .map((_, 1))

// Windowed word count (last 5 minutes, every 30 seconds).
// Explicit parameter types help Scala choose among the overloads.
val windowedWordCounts = wordPairs.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b, // Reduce function
  Minutes(5),                // Window duration
  Seconds(30)                // Slide duration
)

// Efficient version with inverse function
val efficientWordCounts = wordPairs.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b, // Add counts entering the window
  (a: Int, b: Int) => a - b, // Subtract counts leaving the window (inverse)
  Minutes(5),
  Seconds(30)
)
```
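
With the inverse-function variant, keys whose windowed count drops to zero are not removed automatically; they stay in the result with value 0. `reduceByKeyAndWindow` also has an overload that takes `numPartitions` and a `filterFunc` for this. A sketch, assuming checkpointing is already enabled and the `(word, count)` pairs from the example above; the helper names are hypothetical:

```scala
import org.apache.spark.streaming.{Minutes, Seconds}
import org.apache.spark.streaming.dstream.DStream

// Keep only keys that still occur in the current window
val nonZero: ((String, Int)) => Boolean = { case (_, count) => count > 0 }

def windowedWordCounts(wordPairs: DStream[(String, Int)], numPartitions: Int): DStream[(String, Int)] =
  wordPairs.reduceByKeyAndWindow(
    (a: Int, b: Int) => a + b, // add counts entering the window
    (a: Int, b: Int) => a - b, // subtract counts leaving the window
    Minutes(5),
    Seconds(30),
    numPartitions,
    nonZero                    // drop keys whose windowed count fell to 0
  )
```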

### Join Operations

**join**: Join with another DStream (batch by batch)
```scala { .api }
def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))]
def join[W: ClassTag](other: DStream[(K, W)], partitioner: Partitioner): DStream[(K, (V, W))]
```

**leftOuterJoin**, **rightOuterJoin**, **fullOuterJoin**:
```scala { .api }
def leftOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))]
def rightOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], W))]
def fullOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], Option[W]))]
```

**cogroup**: Group together with another DStream
```scala { .api }
def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))]
```

```scala
val stream1 = ssc.socketTextStream("localhost", 9999)
  .map(line => (line.split(",")(0), line.split(",")(1)))

val stream2 = ssc.socketTextStream("localhost", 8888)
  .map(line => (line.split(",")(0), line.split(",")(1)))

// Inner join
val joined = stream1.join(stream2)

// Left outer join
val leftJoined = stream1.leftOuterJoin(stream2)

// Cogroup
val cogrouped = stream1.cogroup(stream2)
```

## Stateful Operations

### updateStateByKey

Maintain state across batches for each key. The update function is called for every key that has new values or existing state, and returning `None` removes the key. A checkpoint directory must be set with `ssc.checkpoint(...)` before the context starts.

```scala { .api }
def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)]
def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S], partitioner: Partitioner): DStream[(K, S)]
def updateStateByKey[S: ClassTag](updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)], partitioner: Partitioner, rememberPartitioner: Boolean): DStream[(K, S)]
```

```scala
// Running count of words (requires a checkpoint directory)
val wordPairs = ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split(" "))
  .map((_, 1))

val runningCounts = wordPairs.updateStateByKey { (values: Seq[Int], state: Option[Int]) =>
  val newCount = values.sum + state.getOrElse(0)
  Some(newCount)
}

// Advanced state management
case class WordStats(count: Int, lastSeen: Long)

val wordStats = wordPairs.updateStateByKey { (values: Seq[Int], state: Option[WordStats]) =>
  val currentTime = System.currentTimeMillis()
  val currentCount = values.sum

  state match {
    // Called even when the key got no new values: keep the previous state unchanged
    case Some(stats) if values.isEmpty => Some(stats)
    case Some(stats) => Some(WordStats(stats.count + currentCount, currentTime))
    case None => Some(WordStats(currentCount, currentTime))
  }
}
```
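
Because returning `None` removes a key, the update function can also expire idle state. A sketch that drops words not seen for 10 minutes, written as a plain function so it can be unit-tested without Spark; `WordState`, `WordStateUpdate`, and the timeout are illustrative assumptions, and the current time is passed in by the caller:

```scala
case class WordState(count: Long, lastSeenMs: Long)

object WordStateUpdate {
  val TimeoutMs: Long = 10 * 60 * 1000L // expire words unseen for 10 minutes

  // Pure update logic; `nowMs` is supplied by the caller
  def update(nowMs: Long)(values: Seq[Int], state: Option[WordState]): Option[WordState] =
    (values, state) match {
      case (vs, Some(s)) if vs.isEmpty =>
        // No new occurrences: keep the state until it times out
        if (nowMs - s.lastSeenMs > TimeoutMs) None else Some(s)
      case (vs, prev) =>
        val previousCount = prev.map(_.count).getOrElse(0L)
        Some(WordState(previousCount + vs.sum, nowMs))
    }
}
```

In a streaming job it would be wired in as `wordPairs.updateStateByKey[WordState]((values: Seq[Int], state: Option[WordState]) => WordStateUpdate.update(System.currentTimeMillis())(values, state))`.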

## DStream Actions

Output operations (the streaming counterpart of RDD actions) push data to external systems and trigger the execution of DStream transformations. At least one output operation must be registered before `ssc.start()`.

### Output Operations

**print**: Print the first 10 elements (or `num` elements) of each batch on the driver
```scala { .api }
def print(): Unit
def print(num: Int): Unit
```

**foreachRDD**: Apply a function to the RDD of each batch; the function itself runs on the driver
```scala { .api }
def foreachRDD(foreachFunc: RDD[T] => Unit): Unit
def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit
```

```scala
// `stream` is an existing DStream; `process` is a placeholder for your own function
val processed = stream.map(process)

// Print results
processed.print()
processed.print(20) // Print first 20 elements

// Custom processing of each batch
processed.foreachRDD { rdd =>
  val count = rdd.count()
  if (count > 0) {
    println(s"Batch size: $count")
    rdd.take(10).foreach(println)
  }
}

// With time information
processed.foreachRDD { (rdd, time) =>
  println(s"Batch time: $time, Count: ${rdd.count()}")
}
```
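
When `foreachRDD` writes to an external system, the Spark programming guide recommends opening connections inside `foreachPartition` on the executors: creating them on the driver would require serializing the connection, and creating one per record is expensive. A sketch of that pattern; `RecordSink` and `PartitionWriter` are hypothetical stand-ins for a real client library:

```scala
// Hypothetical client interface for an external store
trait RecordSink {
  def write(record: String): Unit
  def close(): Unit
}

object PartitionWriter {
  // Runs on an executor: one connection per partition, always closed
  def writePartition(records: Iterator[String], openSink: () => RecordSink): Int = {
    val sink = openSink()
    try {
      var written = 0
      records.foreach { r => sink.write(r); written += 1 }
      written
    } finally {
      sink.close()
    }
  }
}

// Usage with a DStream[String] named `processed` (openMySink is hypothetical):
// processed.foreachRDD { rdd =>
//   rdd.foreachPartition(records => PartitionWriter.writePartition(records, () => openMySink()))
// }
```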

### Save Operations

**saveAsTextFiles**: Save each batch as text files, one output directory per batch named `prefix-TIME_IN_MS[.suffix]`
```scala { .api }
def saveAsTextFiles(prefix: String, suffix: String = ""): Unit
```

**saveAsObjectFiles**: Save each batch as SequenceFiles of serialized objects, using the same naming scheme
```scala { .api }
def saveAsObjectFiles(prefix: String, suffix: String = ""): Unit
```

```scala
val processed = stream.map(_.toUpperCase)

// Pass the suffix without a leading dot: Spark inserts the "." itself.
// Creates output-1414060920000.txt, output-1414060921000.txt, etc.
processed.saveAsTextFiles("hdfs://path/to/output", "txt")

// Save as object files
processed.saveAsObjectFiles("hdfs://path/to/objects")
```
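
Downstream jobs often need to know where a given batch was written. A small helper mirroring the `prefix-TIME_IN_MS[.suffix]` naming rule described above; `batchOutputPath` is a hypothetical name, not a Spark API:

```scala
// Mirrors the naming rule: prefix-TIME_IN_MS[.suffix]
def batchOutputPath(prefix: String, suffix: String, batchTimeMs: Long): String =
  if (suffix == null || suffix.isEmpty) s"$prefix-$batchTimeMs"
  else s"$prefix-$batchTimeMs.$suffix"
```

Inside `foreachRDD { (rdd, time) => ... }`, `batchOutputPath(prefix, suffix, time.milliseconds)` gives the directory written for that batch.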

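## StreamingContext Control

### Starting and Stopping

**start**: Start the streaming computation. All input streams, transformations, and output operations must be defined before this call.
```scala { .api }
def start(): Unit
```

**stop**: Stop the streaming context. By default this also stops the underlying SparkContext; with `stopGracefully = true`, Spark first waits for all received data to be processed. A stopped context cannot be restarted.
```scala { .api }
def stop(): Unit
def stop(stopSparkContext: Boolean): Unit
def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit
```

**awaitTermination**: Block until the context is stopped or an error is thrown. To wait with a timeout (in milliseconds), use `awaitTerminationOrTimeout`, which returns `true` if the context terminated in time; the older `awaitTermination(timeout)` is deprecated since Spark 1.3 and returns no result.
```scala { .api }
def awaitTermination(): Unit
def awaitTermination(timeout: Long): Unit // deprecated
def awaitTerminationOrTimeout(timeout: Long): Boolean
```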

```scala
val ssc = new StreamingContext(conf, Seconds(1))

// Define the streaming computation (at least one output operation is required)
val stream = ssc.socketTextStream("localhost", 9999)
stream.print()

// Start the computation
ssc.start()

// Either block until the context is stopped or fails...
ssc.awaitTermination()

// ...or, instead of the call above, wait with a timeout
val terminated = ssc.awaitTerminationOrTimeout(60000) // 60 seconds
if (!terminated) {
  println("Streaming did not terminate within 60 seconds")
  ssc.stop(stopSparkContext = true, stopGracefully = true)
}
```

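### Checkpointing

**checkpoint**: Set a directory on fault-tolerant storage (such as HDFS) for checkpoint data. Checkpointing is required for stateful operations such as `updateStateByKey` and for windowed operations that use an inverse function.
```scala { .api }
def checkpoint(directory: String): Unit
```

**getOrCreate** (on the `StreamingContext` companion object): Recreate a context from checkpoint data if it exists, otherwise create a new one with `creatingFunc`. With `createOnError = true`, a new context is also created when the checkpoint data cannot be read.
```scala { .api }
def getOrCreate(checkpointPath: String, creatingFunc: () => StreamingContext, hadoopConf: Configuration = new Configuration(), createOnError: Boolean = false): StreamingContext
```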

```scala
val checkpointDir = "hdfs://path/to/checkpoints"

// Enable checkpointing on an existing context
ssc.checkpoint(checkpointDir)

// Fault-tolerant pattern: all DStream setup happens inside the creating function
def createStreamingContext(): StreamingContext = {
  val ssc = new StreamingContext(conf, Seconds(1))

  // Define streaming computation
  val lines = ssc.socketTextStream("localhost", 9999)
  val words = lines.flatMap(_.split(" "))
  val wordCounts = words.map((_, 1)).reduceByKey(_ + _)
  wordCounts.print()

  ssc.checkpoint(checkpointDir)
  ssc
}

// Recovers from checkpoint data if present; otherwise calls createStreamingContext
val context = StreamingContext.getOrCreate(checkpointDir, createStreamingContext _)
context.start()
context.awaitTermination()
```
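
Stateful and inverse-windowed DStreams are checkpointed periodically, and the checkpoint interval must be a multiple of the DStream's slide interval. The Spark programming guide suggests about 5 to 10 sliding intervals as a starting point, set per DStream with `DStream.checkpoint(interval)`. A sketch that derives the interval from the slide duration; the helper name and the default factor of 10 are assumptions:

```scala
import org.apache.spark.streaming.{Duration, Seconds}

// Checkpoint interval as a whole multiple of the DStream's slide interval
def checkpointInterval(slide: Duration, factor: Int = 10): Duration = {
  require(factor >= 1, "factor must be at least 1")
  slide * factor
}

// e.g. with 1-second batches: runningCounts.checkpoint(checkpointInterval(Seconds(1)))
```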

### Context Properties

**remember**: Make every DStream keep its generated RDDs for at least the given duration (useful when querying older batches)
```scala { .api }
def remember(duration: Duration): Unit
```

**sparkContext**: Access the underlying SparkContext
```scala { .api }
def sparkContext: SparkContext
```

```scala
// Keep generated RDDs for at least 10 minutes
ssc.remember(Minutes(10))

// Access the SparkContext, e.g. to broadcast a lookup table
val sc = ssc.sparkContext
val lookupTable = Map("a" -> 1, "b" -> 2) // placeholder data
val broadcast = sc.broadcast(lookupTable)
```

## Persistence and Caching

DStreams can be persisted so that several operations on the same data do not recompute it. Unlike RDDs, the default storage level keeps the data serialized in memory:

```scala { .api }
def persist(storageLevel: StorageLevel): DStream[T]
def persist(): DStream[T] // Uses MEMORY_ONLY_SER
def cache(): DStream[T]   // Uses MEMORY_ONLY_SER
```

```scala
import org.apache.spark.storage.StorageLevel

// expensiveTransformation is a placeholder for your own function
val expensiveStream = ssc.socketTextStream("localhost", 9999)
  .map(expensiveTransformation)
  .cache() // Cache for reuse

// Multiple operations on the cached stream
val count = expensiveStream.count()
// DStream has no sample method: sample each batch RDD through transform
val sample = expensiveStream.transform(_.sample(withReplacement = false, fraction = 0.1))

count.print()
sample.print()
```

## Performance and Best Practices

### Batch Interval Selection

```scala
import org.apache.spark.streaming.{Milliseconds, Minutes, Seconds, StreamingContext}

// Choose one batch interval; each batch must be processed in less time than the interval
val lowLatency = Milliseconds(500) // For low latency (roughly 100 ms - 1 s)
val highThroughput = Seconds(5)    // For high throughput (roughly 1 s - 10 s)
val batchStyle = Minutes(2)        // For batch-processing style (minutes)

val ssc = new StreamingContext(conf, highThroughput)
```

### Parallelism and Partitioning

```scala
// Increase ingestion parallelism with several receivers (each receiver occupies one core)
val numReceivers = 4
val streams = (1 to numReceivers).map { i =>
  ssc.socketTextStream(s"host$i", 9999)
}
val unifiedStream = ssc.union(streams)

// Repartition each batch for better load balancing
val repartitioned = unifiedStream.repartition(10)
```

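### Memory Management

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Use efficient serialization: set it on the SparkConf before creating the context
val conf = new SparkConf()
  .setAppName("StreamingApp")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val ssc = new StreamingContext(conf, Seconds(1))

// Set appropriate storage levels (serialized, spilling to disk when memory is short)
val persistedStream = ssc.socketTextStream("localhost", 9999)
  .map(expensiveOperation) // placeholder for your own function
  .persist(StorageLevel.MEMORY_AND_DISK_SER)

// Enable checkpointing for fault tolerance
ssc.checkpoint("hdfs://path/to/checkpoints")
```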

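## Error Handling and Fault Tolerance

Spark Streaming applications must handle several kinds of failure to run reliably: misuse of the context lifecycle, receiver failures, processing delays, checkpoint problems, resource exhaustion, and connectivity errors.

### Common Streaming Errors

**IllegalStateException**: Invalid StreamingContext lifecycle operations. A stopped context cannot be started again, and new DStream operations cannot be added after `start()`. (In recent Spark versions, calling `start()` on a context that is already running only logs a warning.)
```scala
ssc.start()
ssc.stop(stopSparkContext = false)

try {
  ssc.start() // Error: a stopped context cannot be restarted
} catch {
  case e: IllegalStateException =>
    println(s"Invalid StreamingContext operation: ${e.getMessage}")
}
```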

**Receiver Failures**: Spark restarts failed receivers automatically; a `StreamingListener` makes these failures visible
```scala
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerReceiverError}

// Monitor receiver status
ssc.addStreamingListener(new StreamingListener {
  override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {
    val info = receiverError.receiverInfo
    println(s"Receiver error on ${info.name}: ${info.lastErrorMessage}")
    // Implement alerting or recovery logic here
  }
})
```

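**Batch Processing Delays**: When processing takes longer than the batch interval, batches queue up and the scheduling delay keeps growing
```scala
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Must match the batch duration passed to the StreamingContext
// (the context's internal DStream graph is not part of the public API)
val batchIntervalMs = 1000L

// Monitor batch processing times
ssc.addStreamingListener(new StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    val processingTime = info.processingDelay.getOrElse(0L)
    val schedulingDelay = info.schedulingDelay.getOrElse(0L)

    if (processingTime > batchIntervalMs) {
      println(s"Warning: processing time ($processingTime ms) > batch interval ($batchIntervalMs ms), " +
        s"scheduling delay $schedulingDelay ms")
    }
  }
})
```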

### Checkpoint Corruption

**Checkpoint Recovery Failures**: When checkpoint data is corrupted or unreadable
```scala
val checkpointDir = "hdfs://path/to/checkpoints"

def createStreamingContext(): StreamingContext = {
  val ssc = new StreamingContext(conf, Seconds(1))
  // Define streaming logic
  ssc.checkpoint(checkpointDir)
  ssc
}

val ssc = try {
  StreamingContext.getOrCreate(checkpointDir, createStreamingContext _)
} catch {
  case e: Exception =>
    println(s"Checkpoint recovery failed: ${e.getMessage}")
    // Fall back to creating a new context; state stored in the checkpoint is lost
    createStreamingContext()
}
// Built-in equivalent:
// StreamingContext.getOrCreate(checkpointDir, createStreamingContext _, createOnError = true)
```

**Checkpoint Directory Management**: Spark already removes old checkpoint files of a running application, so manual cleanup should only target data that no running (or to-be-recovered) application still needs.
```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Delete entries in a checkpoint directory older than the retention period
def cleanupCheckpoints(checkpointDir: String, retentionHours: Int, hadoopConf: Configuration): Unit = {
  val checkpointPath = new Path(checkpointDir)
  val fs = checkpointPath.getFileSystem(hadoopConf)

  try {
    // Long arithmetic avoids Int overflow for long retention periods
    val cutoffTime = System.currentTimeMillis() - retentionHours.toLong * 60 * 60 * 1000
    val files = fs.listStatus(checkpointPath)

    files.foreach { fileStatus =>
      if (fileStatus.getModificationTime < cutoffTime) {
        fs.delete(fileStatus.getPath, true)
        println(s"Deleted old checkpoint: ${fileStatus.getPath}")
      }
    }
  } catch {
    case e: Exception => println(s"Checkpoint cleanup failed: ${e.getMessage}")
  }
}

// Usage: cleanupCheckpoints("hdfs://path/to/old-checkpoints", 48, ssc.sparkContext.hadoopConfiguration)
```

### Memory and Resource Errors

**OutOfMemoryError in Streaming**:
```scala
// Monitor memory usage and adjust batch sizes.
// The body of transform runs on the driver once per batch, so this measures
// driver JVM memory only; check executor memory in the Spark UI.
val memoryMonitoringStream = stream.transform { rdd =>
  val runtime = Runtime.getRuntime
  val memoryUsed = runtime.totalMemory - runtime.freeMemory
  val memoryMax = runtime.maxMemory
  val memoryPercent = (memoryUsed.toDouble / memoryMax) * 100

  if (memoryPercent > 80) {
    println(s"Warning: driver memory usage at ${memoryPercent.toInt}%")
    // Reduce the input rate or batch size, or increase driver memory
  }

  rdd
}
```

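**Backpressure Issues**: When the input rate exceeds processing capacity. These settings must be on the SparkConf before the StreamingContext is created.
```scala
// Enable backpressure (Spark 1.5+): the ingestion rate adapts to processing speed
conf.set("spark.streaming.backpressure.enabled", "true")
// Maximum rate (records/s per receiver) for the first batch, before backpressure adapts
conf.set("spark.streaming.backpressure.initialRate", "1000")

// Manual rate limiting for receiver-based streams (records/s per receiver)
conf.set("spark.streaming.receiver.maxRate", "1000")
// Manual rate limiting for direct Kafka streams (records/s per Kafka partition)
conf.set("spark.streaming.kafka.maxRatePerPartition", "1000")
```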

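### Network and Connectivity Errors

**Socket Connection Failures**: `socketTextStream` does not connect when the DStream is defined. Its receiver connects from an executor after `ssc.start()`, and when the connection fails or closes, the receiver restarts itself after a delay and tries again. A retry loop around stream creation therefore never sees a `ConnectException`; observe connection problems on the running receiver instead.
```scala
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerReceiverError, StreamingListenerReceiverStopped}

// Define the stream once; no network connection is opened here
def createSocketStream(hostname: String, port: Int): DStream[String] = {
  val stream = ssc.socketTextStream(hostname, port)

  // Receiver errors and restarts are reported to streaming listeners
  ssc.addStreamingListener(new StreamingListener {
    override def onReceiverError(error: StreamingListenerReceiverError): Unit =
      println(s"Receiver for $hostname:$port reported: ${error.receiverInfo.lastErrorMessage}")

    override def onReceiverStopped(stopped: StreamingListenerReceiverStopped): Unit =
      println(s"Receiver ${stopped.receiverInfo.name} stopped: ${stopped.receiverInfo.lastErrorMessage}")
  })

  stream
}
```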

**Kafka Connection Issues**: With the direct Kafka stream of the `spark-streaming-kafka-0-8` integration (deprecated in Spark 2.3 and removed in 3.0), partition leaders and starting offsets are looked up on the driver when the stream is created, so unreachable brokers surface at that point as a `SparkException`.
```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkException
import org.apache.spark.streaming.kafka.KafkaUtils

// Kafka 0.8 consumer settings for metadata refresh, timeouts, and fetch size
val kafkaParams = Map[String, String](
  "metadata.broker.list" -> "broker1:9092,broker2:9092",
  "auto.offset.reset" -> "smallest",
  "refresh.leader.backoff.ms" -> "1000",
  "socket.timeout.ms" -> "30000",
  "fetch.message.max.bytes" -> "1048576"
)
val topics = Set("events") // placeholder topic name

val kafkaStream = try {
  KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topics
  )
} catch {
  case e: SparkException =>
    println(s"Kafka stream creation failed, check broker availability: ${e.getMessage}")
    throw e
}
```

### Processing Errors and Recovery

**Exception Handling in Transformations**:
```scala
import scala.util.{Failure, Success, Try}

// processRecord is a placeholder for your own String => String function
val robustStream = stream.flatMap { record =>
  Try(processRecord(record)) match {
    case Success(result) => List(result)
    case Failure(_: NumberFormatException) =>
      // Runs on the executors: messages appear in the executor logs
      println(s"Invalid number format in record: $record")
      Nil
    case Failure(e) =>
      println(s"Processing error for record $record: ${e.getMessage}")
      Nil // Failed records are dropped
  }
}
```

**Dead Letter Queue Pattern**: Process each record once, keep the outcome, and route failures to a separate output
```scala
import scala.util.control.NonFatal

// processRecord: String => String and saveToDeadLetterQueue: (String, String) => Unit
// are placeholders for your own processing and error-sink functions
val tagged = stream.map { record =>
  val outcome: Either[(String, String), String] =
    try Right(processRecord(record))
    catch { case NonFatal(e) => Left((record, e.getMessage)) }
  outcome
}.cache() // Both branches below reuse the same batch data instead of recomputing it

val successStream = tagged.flatMap {
  case Right(result) => List(result)
  case Left(_) => Nil
}
val errorStream = tagged.flatMap {
  case Left(failure) => List(failure)
  case Right(_) => Nil
}

// Output operation: save errors to the dead letter queue from the executors
errorStream.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    partition.foreach { case (record, error) =>
      saveToDeadLetterQueue(record, error)
    }
  }
}
```

### Best Practices for Error Handling

1. **Enable Checkpointing**: Always use checkpointing for production applications
2. **Monitor Batch Processing Times**: Ensure processing time < batch interval
3. **Implement Circuit Breakers**: Fail fast when external services are down
4. **Use Write-Ahead Logs**: Enable the WAL so that data received by receivers survives driver failures
5. **Handle Partial Failures**: Process what you can, log what fails
6. **Set Up Monitoring**: Use the Spark UI and external monitoring tools

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

object RobustDStreamSyntax {
  implicit class RobustDStream[T](dstream: DStream[T]) {
    // Drop null records produced by upstream error handling
    def handleErrors(): DStream[T] = dstream.filter(_ != null)
  }
}
import RobustDStreamSyntax._

// Comprehensive error handling pattern
def createRobustStreamingApp(checkpointDir: String): StreamingContext = {
  // Fault tolerance settings must be on the SparkConf before the context is created
  val conf = new SparkConf()
    .setAppName("RobustStreamingApp")
    .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    .set("spark.streaming.backpressure.enabled", "true")
  val ssc = new StreamingContext(conf, Seconds(1))

  // The write-ahead log is stored in the checkpoint directory
  ssc.checkpoint(checkpointDir)

  // With the WAL enabled, in-memory replication of received data is unnecessary
  val stream = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
    .flatMap(line => parseRecord(line).toList) // parseRecord: String => Option[Record] (placeholder)
    .handleErrors()
    .cache()

  // Multiple outputs for different purposes
  stream.print()
  stream.saveAsTextFiles("hdfs://path/to/output/data")

  // Add monitoring (CustomStreamingListener: your own StreamingListener implementation)
  ssc.addStreamingListener(new CustomStreamingListener())

  ssc
}
```

This guide covers the core Spark Streaming (DStream) API along with error handling patterns for building scalable, fault-tolerant stream processing applications.