# DStream Transformations

Transformation operations for processing streaming data, including mapping, filtering, aggregation, partitioning, windowing, key-value operations, joins, unions, and RDD-level transforms.

## Basic Transformations

### Map Operations

Transform each element:

```scala { .api }
def map[U: ClassTag](mapFunc: T => U): DStream[U]
```

Transform each partition as a whole, receiving an iterator over its elements:

```scala { .api }
def mapPartitions[U: ClassTag](
  mapPartFunc: Iterator[T] => Iterator[U],
  preservePartitioning: Boolean = false
): DStream[U]
```

Example map operations:

```scala
val lines = ssc.socketTextStream("localhost", 9999)
val lengths = lines.map(_.length)
val upperCase = lines.map(_.toUpperCase)

// mapPartitions for batch processing: the function runs once per partition
// processBatch is a user-supplied function (not part of Spark)
val batchProcessed = lines.mapPartitions { iter =>
  val batch = iter.toList
  processBatch(batch).iterator
}
```
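
To make the once-per-partition behavior of `mapPartitions` concrete, here is a minimal plain-Scala sketch that models one batch as a list of partitions and counts how often the function is invoked. `MapPartitionsModel`, `mapPartitionsModel`, and the sample data are hypothetical names chosen for illustration; this is a model of the semantics, not Spark code.

```scala
// A plain-collections model of mapPartitions (illustrative only; not Spark code).
object MapPartitionsModel {
  // One batch modeled as a sequence of partitions.
  type Partitions[A] = Seq[Seq[A]]

  // Apply f once per partition, handing it an iterator over that partition.
  def mapPartitionsModel[A, B](parts: Partitions[A])(f: Iterator[A] => Iterator[B]): Partitions[B] =
    parts.map(p => f(p.iterator).toList)

  // Returns the transformed partitions and the number of times f was invoked.
  def demo(): (Partitions[Int], Int) = {
    var calls = 0
    val parts: Partitions[String] = Seq(Seq("a", "bb"), Seq("ccc"))
    val out = mapPartitionsModel(parts) { iter =>
      calls += 1 // any per-partition setup cost would be paid here, once
      iter.map(_.length)
    }
    (out, calls)
  }
}
```

With three elements spread over two partitions, the function is invoked twice rather than three times, which is why per-partition setup (opening a connection, building a parser) is usually placed there.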

### FlatMap Operations

Transform and flatten elements:

```scala { .api }
def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U]
```

Example word splitting:

```scala
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split("\\s+"))
val nonEmptyWords = lines.flatMap(_.split("\\s+").filter(_.nonEmpty))
```

### Filter Operations

Keep only the elements that satisfy a predicate:

```scala { .api }
def filter(filterFunc: T => Boolean): DStream[T]
```

Example filtering:

```scala
val numbers = ssc.socketTextStream("localhost", 9999).map(_.toInt)
val evenNumbers = numbers.filter(_ % 2 == 0)
val positiveNumbers = numbers.filter(_ > 0)
```

## Aggregation Operations

### Reduce Operations

Reduce the elements of each RDD to a single value:

```scala { .api }
def reduce(reduceFunc: (T, T) => T): DStream[T]
```

Count the elements in each RDD:

```scala { .api }
def count(): DStream[Long]
```

Count the occurrences of each distinct value in each RDD:

```scala { .api }
def countByValue(numPartitions: Int = ssc.sc.defaultParallelism): DStream[(T, Long)]
```

Example aggregations:

```scala
val numbers = ssc.socketTextStream("localhost", 9999).map(_.toInt)

val sum = numbers.reduce(_ + _)
val count = numbers.count()
val histogram = numbers.countByValue()

sum.print()
count.print()
histogram.print()
```
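
These aggregations work batch by batch: each batch interval produces its own result, and nothing is carried over between batches. The plain-Scala sketch below models a stream as a sequence of batches to show that. `BatchAggregationModel` and its sample data are hypothetical, and the handling of the empty batch (no `reduce` result, a count of 0) reflects my understanding of Spark's behavior rather than anything stated above.

```scala
// A plain-collections model of per-batch aggregation (illustrative only; not Spark code).
object BatchAggregationModel {
  // A stream modeled as a sequence of batches; the last batch is empty.
  val batches: Seq[Seq[Int]] = Seq(Seq(1, 2, 2), Seq(3), Seq.empty)

  // reduce: one result per non-empty batch, nothing for an empty batch.
  val sums: Seq[Option[Int]] = batches.map(_.reduceOption(_ + _))

  // count: one Long per batch.
  val counts: Seq[Long] = batches.map(_.size.toLong)

  // countByValue: (value, occurrences) pairs per batch.
  val histograms: Seq[Map[Int, Long]] =
    batches.map(_.groupBy(identity).map { case (v, vs) => (v, vs.size.toLong) })
}
```

A running total across batches needs a stateful or windowed operation instead.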

## Partitioning Operations

### Repartition

Change the number of partitions:

```scala { .api }
def repartition(numPartitions: Int): DStream[T]
```

### Glom

Collect the elements of each partition into a single array:

```scala { .api }
def glom(): DStream[Array[T]]
```

Example partitioning:

```scala
val lines = ssc.socketTextStream("localhost", 9999)
val repartitioned = lines.repartition(4)
val grouped = lines.glom() // One Array[String] per partition
```
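
As a rough illustration of what these two operations do to the layout of a batch, the sketch below models partitions as nested sequences. `PartitioningModel` is a hypothetical name, `Vector` stands in for `Array` so results can be compared by value, and the round-robin redistribution is an assumption made for the model (Spark performs a shuffle).

```scala
// A plain-collections model of glom and repartition (illustrative only; not Spark code).
object PartitioningModel {
  // glom: one collection per partition, so the result has as many elements as partitions.
  def glom[A](parts: Seq[Seq[A]]): Seq[Vector[A]] = parts.map(_.toVector)

  // repartition: the same elements spread over n partitions (round-robin in this model).
  def repartition[A](parts: Seq[Seq[A]], n: Int): Seq[Seq[A]] = {
    val all = parts.flatten
    (0 until n).map(i => all.zipWithIndex.collect { case (a, j) if j % n == i => a })
  }
}
```

Neither operation changes which elements are present; only their grouping differs.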

## Window Operations

### Basic Windowing

Create a windowed DStream. Both durations must be multiples of the source DStream's batch interval; the single-argument form slides by one batch interval:

```scala { .api }
def window(windowDuration: Duration): DStream[T]
def window(windowDuration: Duration, slideDuration: Duration): DStream[T]
```

Example windowing:

```scala
val lines = ssc.socketTextStream("localhost", 9999)

// 30-second windows sliding every 10 seconds
val windowedLines = lines.window(Seconds(30), Seconds(10))
val windowCounts = windowedLines.count()
```
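
To show which batches end up in each window, here is a small plain-Scala sketch that expresses window length and slide as a number of batches. With a 10-second batch interval (an assumption for this example), `window(Seconds(30), Seconds(10))` corresponds to `windowLen = 3` and `slide = 1`. `WindowModel` and `windows` are hypothetical names; this models the semantics and is not Spark code.

```scala
// A plain-collections model of window(windowDuration, slideDuration) (illustrative only).
object WindowModel {
  // windowLen and slide are measured in batches rather than in time.
  def windows[A](batches: Seq[Seq[A]], windowLen: Int, slide: Int): Seq[Seq[A]] =
    (slide to batches.length by slide).map { end =>
      // The window ending after batch `end` holds up to the last windowLen batches.
      batches.slice(math.max(0, end - windowLen), end).flatten
    }
}
```

Consecutive windows overlap whenever the slide is shorter than the window, so an element can appear in several windows.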

### Windowed Reductions

Reduce over windows:

```scala { .api }
def reduceByWindow(
  reduceFunc: (T, T) => T,
  windowDuration: Duration,
  slideDuration: Duration
): DStream[T]
```

Optimized windowed reduction with an inverse function. Each window is derived from the previous one by adding the batches that enter and removing the batches that leave, so `invReduceFunc` must undo `reduceFunc`. This variant requires checkpointing to be enabled:

```scala { .api }
def reduceByWindow(
  reduceFunc: (T, T) => T,
  invReduceFunc: (T, T) => T,
  windowDuration: Duration,
  slideDuration: Duration
): DStream[T]
```

Example windowed reductions:

```scala
val numbers = ssc.socketTextStream("localhost", 9999).map(_.toInt)

// Sum over 1-minute windows, sliding every 10 seconds
val windowSum = numbers.reduceByWindow(_ + _, Minutes(1), Seconds(10))

// Optimized version with an inverse function (subtraction)
// Requires a checkpoint directory to be set via ssc.checkpoint(...)
val optimizedSum = numbers.reduceByWindow(
  _ + _,       // Add function
  _ - _,       // Inverse (subtract) function
  Minutes(1),  // Window duration
  Seconds(10)  // Slide duration
)
```
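
The difference between the two `reduceByWindow` variants can be sketched in plain Scala. The model below works on per-batch partial sums, measures the window in batches, and slides one batch at a time; `InverseReduceModel`, `naive`, and `incremental` are hypothetical names, and this is an illustration of the idea rather than Spark's implementation.

```scala
// A plain-collections model of windowed reduction with and without an inverse function
// (illustrative only; not Spark code).
object InverseReduceModel {
  // Without an inverse: every window re-reduces all of its batches.
  def naive(batchSums: Seq[Int], windowLen: Int): Seq[Int] =
    batchSums.indices.map { i =>
      batchSums.slice(math.max(0, i + 1 - windowLen), i + 1).sum
    }

  // With an inverse: start from the previous window, add the entering batch
  // (the reduce function) and remove the leaving batch (the inverse function).
  def incremental(batchSums: Seq[Int], windowLen: Int): Seq[Int] =
    batchSums.indices.scanLeft(0) { (prev, i) =>
      val entering = batchSums(i)
      val leaving = if (i >= windowLen) batchSums(i - windowLen) else 0
      prev + entering - leaving
    }.tail
}
```

Both functions return the same sequence, but the incremental form does a constant amount of work per slide regardless of window length. That only holds when the inverse really undoes the reduction, which is true for addition but not for operations such as `max`.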

### Windowed Counting

Count elements over windows:

```scala { .api }
def countByWindow(
  windowDuration: Duration,
  slideDuration: Duration
): DStream[Long]
```

Count by value over windows:

```scala { .api }
def countByValueAndWindow(
  windowDuration: Duration,
  slideDuration: Duration,
  numPartitions: Int = ssc.sc.defaultParallelism
): DStream[(T, Long)]
```
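
This section has no example, so here is a plain-Scala sketch of what the two windowed counting operations compute. Window length and slide are given in batches; `WindowedCountModel` and its sample data are hypothetical, and the code models the results only. To my understanding, Spark computes both incrementally with an inverse function, so a checkpoint directory would need to be set when using the real operations.

```scala
// A plain-collections model of countByWindow and countByValueAndWindow
// (illustrative only; not Spark code).
object WindowedCountModel {
  // Elements visible in each window; windowLen and slide are measured in batches.
  private def windows[A](batches: Seq[Seq[A]], windowLen: Int, slide: Int): Seq[Seq[A]] =
    (slide to batches.length by slide).map { end =>
      batches.slice(math.max(0, end - windowLen), end).flatten
    }

  // Number of elements in each window.
  def countByWindow[A](batches: Seq[Seq[A]], windowLen: Int, slide: Int): Seq[Long] =
    windows(batches, windowLen, slide).map(_.size.toLong)

  // Occurrences of each distinct value in each window.
  def countByValueAndWindow[A](batches: Seq[Seq[A]], windowLen: Int, slide: Int): Seq[Map[A, Long]] =
    windows(batches, windowLen, slide).map { w =>
      w.groupBy(identity).map { case (v, vs) => (v, vs.size.toLong) }
    }
}
```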

## Pair DStream Operations

The operations in this section are available on `DStream[(K, V)]` through the implicit conversion to `PairDStreamFunctions`.

### Key-Value Transformations

Map values while preserving keys:

```scala { .api }
def mapValues[U: ClassTag](f: V => U): DStream[(K, U)] // On DStream[(K, V)]
```

FlatMap values:

```scala { .api }
def flatMapValues[U: ClassTag](f: V => TraversableOnce[U]): DStream[(K, U)]
```

Example key-value transformations:

```scala
// Assumes each input line has the form "key:value"
val pairs = ssc.socketTextStream("localhost", 9999)
  .map { line =>
    val parts = line.split(":") // Split once and reuse the result
    (parts(0), parts(1))
  }

val upperValues = pairs.mapValues(_.toUpperCase)
val wordValues = pairs.flatMapValues(_.split("\\s+"))
```

### Grouping Operations

Group by key:

```scala { .api }
def groupByKey(): DStream[(K, Iterable[V])] // On DStream[(K, V)]
def groupByKey(numPartitions: Int): DStream[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): DStream[(K, Iterable[V])]
```

Example grouping:

```scala
val keyValuePairs = ssc.socketTextStream("localhost", 9999)
  .filter(_.nonEmpty) // charAt(0) would throw on an empty line
  .map(line => (line.charAt(0).toString, line))

val grouped = keyValuePairs.groupByKey()
grouped.foreachRDD { rdd =>
  rdd.collect().foreach { case (key, values) =>
    println(s"Key: $key, Count: ${values.size}")
  }
}
```

### Reduction Operations

Reduce by key:

```scala { .api }
def reduceByKey(func: (V, V) => V): DStream[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): DStream[(K, V)]
def reduceByKey(func: (V, V) => V, partitioner: Partitioner): DStream[(K, V)]
```

Combine by key:

```scala { .api }
def combineByKey[C: ClassTag](
  createCombiner: V => C,
  mergeValue: (C, V) => C,
  mergeCombiner: (C, C) => C,
  partitioner: Partitioner,
  mapSideCombine: Boolean = true
): DStream[(K, C)]
```

Example reductions:

```scala
import org.apache.spark.HashPartitioner

val wordCounts = ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split("\\s+"))
  .map((_, 1))
  .reduceByKey(_ + _)

// Advanced combine operation for computing averages
val scores = ssc.socketTextStream("localhost", 9999)
  .map(line => (line.split(",")(0), line.split(",")(1).toDouble))

// The partitioner parameter has no default, so one is passed explicitly
val averages = scores.combineByKey(
  (score: Double) => (score, 1), // Create combiner
  (acc: (Double, Int), score: Double) => (acc._1 + score, acc._2 + 1), // Merge value
  (acc1: (Double, Int), acc2: (Double, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2), // Merge combiners
  new HashPartitioner(ssc.sparkContext.defaultParallelism)
).mapValues { case (sum, count) => sum / count }
```
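
The three functions passed to `combineByKey` play different roles, which the following plain-Scala sketch tries to make visible: values are first combined inside each partition, and the per-partition combiners are then merged. `CombineByKeyModel` and its helpers are hypothetical names, and the model ignores partitioners and map-side combine settings.

```scala
// A plain-collections model of combineByKey (illustrative only; not Spark code).
object CombineByKeyModel {
  // Within one partition: the first value of a key goes through createCombiner,
  // every later value of that key through mergeValue.
  def combinePartition[K, V, C](part: Seq[(K, V)])(create: V => C, mergeValue: (C, V) => C): Map[K, C] =
    part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
      acc.updated(k, acc.get(k).map(c => mergeValue(c, v)).getOrElse(create(v)))
    }

  // Across partitions: combiners of the same key are merged with mergeCombiners.
  def combineByKeyModel[K, V, C](parts: Seq[Seq[(K, V)]])(
      create: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): Map[K, C] =
    parts.map(p => combinePartition(p)(create, mergeValue)).foldLeft(Map.empty[K, C]) { (acc, m) =>
      m.foldLeft(acc) { case (a, (k, c)) =>
        a.updated(k, a.get(k).map(mergeCombiners(_, c)).getOrElse(c))
      }
    }

  // Per-key average using a (sum, count) combiner, as in the averages example.
  def averages(parts: Seq[Seq[(String, Double)]]): Map[String, Double] =
    combineByKeyModel[String, Double, (Double, Int)](parts)(
      v => (v, 1),
      (acc, v) => (acc._1 + v, acc._2 + 1),
      (a, b) => (a._1 + b._1, a._2 + b._2)
    ).map { case (k, (sum, n)) => (k, sum / n) }
}
```

Carrying `(sum, count)` instead of a running average is what makes the merge step correct: two partial averages cannot be combined without knowing how many values each one covers.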

### Windowed Key-Value Operations

Group by key over windows:

```scala { .api }
def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Iterable[V])]
def groupByKeyAndWindow(
  windowDuration: Duration,
  slideDuration: Duration
): DStream[(K, Iterable[V])]
def groupByKeyAndWindow(
  windowDuration: Duration,
  slideDuration: Duration,
  numPartitions: Int
): DStream[(K, Iterable[V])]
def groupByKeyAndWindow(
  windowDuration: Duration,
  slideDuration: Duration,
  partitioner: Partitioner
): DStream[(K, Iterable[V])]
```

Reduce by key over windows:

```scala { .api }
def reduceByKeyAndWindow(
  func: (V, V) => V,
  windowDuration: Duration
): DStream[(K, V)]

def reduceByKeyAndWindow(
  func: (V, V) => V,
  windowDuration: Duration,
  slideDuration: Duration
): DStream[(K, V)]

def reduceByKeyAndWindow(
  func: (V, V) => V,
  windowDuration: Duration,
  slideDuration: Duration,
  numPartitions: Int
): DStream[(K, V)]

def reduceByKeyAndWindow(
  func: (V, V) => V,
  windowDuration: Duration,
  slideDuration: Duration,
  partitioner: Partitioner
): DStream[(K, V)]
```

Optimized windowed reduction with an inverse function (requires checkpointing to be enabled):

```scala { .api }
def reduceByKeyAndWindow(
  reduceFunc: (V, V) => V,
  invReduceFunc: (V, V) => V,
  windowDuration: Duration,
  slideDuration: Duration
): DStream[(K, V)]
```
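
A plain-Scala sketch of the inverse-function variant, applied per key, may help here. It takes per-batch counts by key, measures the window in batches, and slides one batch at a time. `KeyedInverseReduceModel`, `incremental`, and `withoutZeros` are hypothetical names; the sketch models the idea and is not Spark's implementation.

```scala
// A plain-collections model of per-key windowed reduction with an inverse function
// (illustrative only; not Spark code).
object KeyedInverseReduceModel {
  def incremental(batches: Seq[Map[String, Int]], windowLen: Int): Seq[Map[String, Int]] =
    batches.indices.scanLeft(Map.empty[String, Int]) { (prev, i) =>
      // Reduce step: add the counts of the batch entering the window.
      val added = batches(i).foldLeft(prev) { case (m, (k, v)) =>
        m.updated(k, m.getOrElse(k, 0) + v)
      }
      // Inverse step: subtract the counts of the batch leaving the window.
      val leaving = if (i >= windowLen) batches(i - windowLen) else Map.empty[String, Int]
      leaving.foldLeft(added) { case (m, (k, v)) => m.updated(k, m(k) - v) }
    }.tail

  // Keys whose count dropped to zero stay in the state unless they are filtered out.
  def withoutZeros(windows: Seq[Map[String, Int]]): Seq[Map[String, Int]] =
    windows.map(_.filter { case (_, n) => n != 0 })
}
```

The model shows one consequence of the incremental approach: a key that has left the window lingers with a zero value. As far as I know, Spark's full signature for this overload accepts an optional filter function for that purpose, which the simplified signature above omits.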

## Join Operations

### Basic Joins

Joins are applied batch by batch: the RDDs that the two streams generate for the same batch time are joined with each other.

Inner join:

```scala { .api }
def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))]
def join[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (V, W))]
```

Left outer join:

```scala { .api }
def leftOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))]
def leftOuterJoin[W: ClassTag](
  other: DStream[(K, W)],
  numPartitions: Int
): DStream[(K, (V, Option[W]))]
```

Right outer join:

```scala { .api }
def rightOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], W))]
def rightOuterJoin[W: ClassTag](
  other: DStream[(K, W)],
  numPartitions: Int
): DStream[(K, (Option[V], W))]
```

Full outer join:

```scala { .api }
def fullOuterJoin[W: ClassTag](
  other: DStream[(K, W)]
): DStream[(K, (Option[V], Option[W]))]
def fullOuterJoin[W: ClassTag](
  other: DStream[(K, W)],
  numPartitions: Int
): DStream[(K, (Option[V], Option[W]))]
```

Example joins:

```scala
val userActions = ssc.socketTextStream("localhost", 9999)
  .map(line => (line.split(",")(0), line.split(",")(1))) // (userId, action)

val userProfiles = ssc.socketTextStream("localhost", 9998)
  .map(line => (line.split(",")(0), line.split(",")(1))) // (userId, profile)

val enrichedActions = userActions.join(userProfiles)
val optionalEnriched = userActions.leftOuterJoin(userProfiles)
```
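
The result types of the join variants are easier to read next to concrete data. The plain-Scala sketch below models the contents of one batch from each stream as a sequence of pairs and implements the inner and left outer joins on them. `JoinModel` and the sample records are hypothetical, and the output order is an artifact of the model (Spark does not guarantee an order).

```scala
// A plain-collections model of join and leftOuterJoin on one batch (illustrative only).
object JoinModel {
  type Pairs[K, V] = Seq[(K, V)]

  // Inner join: one output pair for every matching (left, right) combination.
  def join[K, V, W](l: Pairs[K, V], r: Pairs[K, W]): Pairs[K, (V, W)] =
    l.flatMap { case (k, v) => r.collect { case (rk, w) if rk == k => (k, (v, w)) } }

  // Left outer join: every left record is kept; unmatched ones get None.
  def leftOuterJoin[K, V, W](l: Pairs[K, V], r: Pairs[K, W]): Pairs[K, (V, Option[W])] =
    l.flatMap { case (k, v) =>
      val matches: Seq[Option[W]] = r.collect { case (rk, w) if rk == k => Some(w) }
      (if (matches.isEmpty) Seq(None) else matches).map(w => (k, (v, w)))
    }
}
```

In the model, a user with no profile disappears from the inner join but survives the left outer join with `None` as the profile.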

## Union Operations

Union two DStreams of the same element type:

```scala { .api }
def union(that: DStream[T]): DStream[T]
```

Union several DStreams at once via the `StreamingContext`:

```scala { .api }
def union[T](streams: Seq[DStream[T]]): DStream[T] // On StreamingContext
```

Example union:

```scala
val stream1 = ssc.socketTextStream("localhost", 9999)
val stream2 = ssc.socketTextStream("localhost", 9998)
val stream3 = ssc.textFileStream("/data/input")

// Union two streams
val combined = stream1.union(stream2)

// Union multiple streams
val allCombined = ssc.union(Seq(stream1, stream2, stream3))
```

## Transform Operations

### RDD-level Transformations

Apply arbitrary RDD transformations:

```scala { .api }
def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U]
def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U]
```

Transform with another DStream:

```scala { .api }
def transformWith[U: ClassTag, V: ClassTag](
  other: DStream[U],
  transformFunc: (RDD[T], RDD[U]) => RDD[V]
): DStream[V]

def transformWith[U: ClassTag, V: ClassTag](
  other: DStream[U],
  transformFunc: (RDD[T], RDD[U], Time) => RDD[V]
): DStream[V]
```

Example transforms:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time

val lines = ssc.socketTextStream("localhost", 9999)

// Apply complex RDD operations
val processed = lines.transform { rdd =>
  rdd.filter(_.nonEmpty)
    .map(_.toLowerCase)
    .zipWithIndex()
    .filter(_._2 % 2 == 0)
    .map(_._1)
}

// Transform with time information (explicit types disambiguate the overloads)
val timestamped = lines.transform { (rdd: RDD[String], time: Time) =>
  rdd.map(line => s"${time.milliseconds}: $line")
}

// Transform with another stream; the function is the second argument, not a curried block
val stream2 = ssc.socketTextStream("localhost", 9998)
val combined = lines.transformWith(stream2, (rdd1: RDD[String], rdd2: RDD[String]) =>
  rdd1.union(rdd2).distinct()
)
```

## Utility Transformations

### Cache and Persistence

Persist the RDDs of a DStream for reuse; `cache()` is equivalent to `persist()` with the default storage level:

```scala { .api }
def cache(): DStream[T]
def persist(): DStream[T]
def persist(level: StorageLevel): DStream[T]
```

### Checkpointing

Enable periodic checkpointing of the DStream's RDDs at the given interval:

```scala { .api }
def checkpoint(interval: Duration): DStream[T]
```

Example persistence:

```scala
val lines = ssc.socketTextStream("localhost", 9999)

// expensiveTransformation is a user-supplied function (not part of Spark)
val expensiveStream = lines
  .map(expensiveTransformation)
  .cache() // Cache for reuse

val checkpointedStream = expensiveStream
  .checkpoint(Seconds(10)) // Checkpoint every 10 seconds
```

### Slicing

Get the RDDs generated within a time range (both ends included):

```scala { .api }
def slice(interval: Interval): Seq[RDD[T]]
def slice(fromTime: Time, toTime: Time): Seq[RDD[T]]
```

Example slicing:

```scala
val stream = ssc.socketTextStream("localhost", 9999)

// Get RDDs from the last 30 seconds
// Note: slice needs an initialized DStream, so call it after ssc.start()
val currentTime = new Time(System.currentTimeMillis())
val past30Seconds = currentTime - Seconds(30)
val recentRDDs = stream.slice(past30Seconds, currentTime)
```
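
As a rough picture of what `slice` selects, the plain-Scala sketch below models a stream as batches keyed by their generation time in milliseconds and returns those inside an inclusive range. `SliceModel` and the sample timestamps are hypothetical; the real method returns RDDs and, as I understand it, aligns the requested times to the stream's slide duration, which this model does not attempt.

```scala
// A plain-collections model of slice(fromTime, toTime) (illustrative only; not Spark code).
object SliceModel {
  // Keep the batches whose generation time lies in [fromMs, toMs], both ends included.
  def slice[A](batches: Seq[(Long, Seq[A])], fromMs: Long, toMs: Long): Seq[Seq[A]] =
    batches.collect { case (t, data) if t >= fromMs && t <= toMs => data }
}
```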