or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

advanced-transformations.mdcore-streaming.mdindex.mdinput-sources.mdjava-api.md

advanced-transformations.mddocs/

# Advanced Transformations and Windowing

Advanced streaming operations, including windowed computations, stateful transformations, and reusable processing patterns for streaming analytics over time.

## Capabilities

### Window Operations

Apply operations over sliding windows of data for time-based aggregations. A window is described by a window duration (how much data it covers) and a slide duration (how often it is recomputed); both must be multiples of the source DStream's batch interval.

```scala { .api }

abstract class DStream[T] {

  /**
   * Return a DStream whose RDDs contain all elements seen in the last `windowDuration`.
   * The window slides at this DStream's own slide interval (the batch interval for an
   * input stream).
   */
  def window(windowDuration: Duration): DStream[T]

  /**
   * Return a windowed DStream that is recomputed every `slideDuration`.
   * Both durations must be multiples of this DStream's batch interval.
   */
  def window(windowDuration: Duration, slideDuration: Duration): DStream[T]

  /**
   * Reduce all elements in each sliding window to a single value.
   * `reduceFunc` should be associative and commutative.
   */
  def reduceByWindow(
      reduceFunc: (T, T) => T,
      windowDuration: Duration,
      slideDuration: Duration
  ): DStream[T]

  /**
   * Incremental window reduce: elements entering the window are combined with
   * `reduceFunc` and elements leaving it are removed with `invReduceFunc`, which must
   * be the inverse of `reduceFunc`. Requires checkpointing (`ssc.checkpoint(dir)`).
   */
  def reduceByWindow(
      reduceFunc: (T, T) => T,
      invReduceFunc: (T, T) => T,
      windowDuration: Duration,
      slideDuration: Duration
  ): DStream[T]

  /** Count the elements in each sliding window (computed incrementally; requires checkpointing). */
  def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long]

  /**
   * Count the occurrences of each distinct element in each sliding window
   * (computed incrementally; requires checkpointing).
   */
  def countByValueAndWindow(
      windowDuration: Duration,
      slideDuration: Duration
  ): DStream[(T, Long)]

  /** Same as the two-argument form, with an explicit number of partitions for the result. */
  def countByValueAndWindow(
      windowDuration: Duration,
      slideDuration: Duration,
      numPartitions: Int
  ): DStream[(T, Long)]
}

```

**Usage Examples:**

```scala

import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.DStream

// ssc: an existing StreamingContext.
// Incremental window operations (inverse reduce, countByWindow,
// countByValueAndWindow) fail at start-up without a checkpoint directory.
ssc.checkpoint("/path/to/checkpoint")

val numbers: DStream[Int] = ??? // stream of integers

// Basic windowing - collect data over 30 seconds, slide every 10 seconds
val windowed = numbers.window(Seconds(30), Seconds(10))

// Reduce over window - sum all numbers in window
val windowSum = numbers.reduceByWindow(_ + _, Seconds(30), Seconds(10))

// Incremental reduce: add values entering the window, subtract values leaving it
val efficientSum = numbers.reduceByWindow(
  _ + _,       // reduce function
  _ - _,       // inverse reduce function
  Seconds(30), // window duration
  Seconds(10)  // slide duration
)

// Count elements in window
val windowCounts = numbers.countByWindow(Seconds(30), Seconds(10))

// Count occurrences of each word in the window
val words: DStream[String] = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))
val wordCounts = words.countByValueAndWindow(Seconds(30), Seconds(10))

wordCounts.print()

```

### Windowed Operations on Pair DStreams

Windowed operations for key-value DStreams, enabling per-key aggregations over time. Variants that take an inverse reduce function update each window incrementally and require checkpointing to be enabled.

```scala { .api }

/** Key-value window operations, available on any DStream[(K, V)] via implicit conversion. */
class PairDStreamFunctions[K, V] {

  /** Group values by key over a window that slides at this DStream's own interval. */
  def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Iterable[V])]

  /** Group values by key over a window recomputed every `slideDuration`. */
  def groupByKeyAndWindow(
      windowDuration: Duration,
      slideDuration: Duration
  ): DStream[(K, Iterable[V])]

  /** Group values by key over a sliding window, using a custom partitioner. */
  def groupByKeyAndWindow(
      windowDuration: Duration,
      slideDuration: Duration,
      partitioner: Partitioner
  ): DStream[(K, Iterable[V])]

  /** Group values by key over a sliding window, with an explicit number of partitions. */
  def groupByKeyAndWindow(
      windowDuration: Duration,
      slideDuration: Duration,
      numPartitions: Int
  ): DStream[(K, Iterable[V])]

  /** Reduce values by key over a sliding window (recomputed from scratch each slide). */
  def reduceByKeyAndWindow(
      reduceFunc: (V, V) => V,
      windowDuration: Duration,
      slideDuration: Duration
  ): DStream[(K, V)]

  /** Reduce values by key over a sliding window, using a custom partitioner. */
  def reduceByKeyAndWindow(
      reduceFunc: (V, V) => V,
      windowDuration: Duration,
      slideDuration: Duration,
      partitioner: Partitioner
  ): DStream[(K, V)]

  /**
   * Incremental reduce by key: values entering the window are combined with `reduceFunc`
   * and values leaving it are removed with `invReduceFunc`. Requires checkpointing.
   */
  def reduceByKeyAndWindow(
      reduceFunc: (V, V) => V,
      invReduceFunc: (V, V) => V,
      windowDuration: Duration,
      slideDuration: Duration
  ): DStream[(K, V)]

  /**
   * Incremental reduce by key with a custom partitioner. Requires checkpointing.
   *
   * Only pairs for which `filterFunc` returns true are retained in the window state
   * (pass `null` to keep every key). Use it to drop keys whose value has returned to
   * "zero" (for example a count of 0), not as an output threshold: a key dropped while
   * it still has data in the window breaks the incremental computation.
   */
  def reduceByKeyAndWindow(
      reduceFunc: (V, V) => V,
      invReduceFunc: (V, V) => V,
      windowDuration: Duration,
      slideDuration: Duration,
      partitioner: Partitioner,
      filterFunc: ((K, V)) => Boolean
  ): DStream[(K, V)]

  /**
   * Incremental reduce by key with an explicit number of partitions and a retention
   * filter (same `filterFunc` semantics as the partitioner variant). Requires checkpointing.
   */
  def reduceByKeyAndWindow(
      reduceFunc: (V, V) => V,
      invReduceFunc: (V, V) => V,
      windowDuration: Duration,
      slideDuration: Duration,
      numPartitions: Int,
      filterFunc: ((K, V)) => Boolean
  ): DStream[(K, V)]
}

```

**Usage Examples:**

```scala

// Incremental (inverse-function) variants require a checkpoint directory
ssc.checkpoint("/path/to/checkpoint")

// words: a DStream[String] of words, as in the window operations example
val pairs: DStream[(String, Int)] = words.map(word => (word, 1))

// Group words by key over 1-minute windows, sliding every 10 seconds
val groupedByWindow = pairs.groupByKeyAndWindow(Minutes(1), Seconds(10))

// Word count over sliding window
val wordCountsWindow = pairs.reduceByKeyAndWindow(_ + _, Minutes(1), Seconds(10))

// Efficient incremental word counting
val efficientWordCounts = pairs.reduceByKeyAndWindow(
  _ + _,       // add counts entering the window
  _ - _,       // subtract counts leaving the window
  Minutes(1),  // window duration
  Seconds(10)  // slide duration
)

// Only words seen more than 5 times in the window.
// filterFunc decides which keys are *retained* in the incremental window state, so it
// must keep every key that still has a non-zero count; the output threshold is applied
// afterwards with a regular filter.
val filteredCounts = pairs.reduceByKeyAndWindow(
  _ + _,
  _ - _,
  Minutes(1),
  Seconds(10),
  10, // numPartitions
  { case (_, count) => count > 0 } // drop keys whose count fell to zero
).filter { case (_, count) => count > 5 }

```

### Stateful Transformations

Maintain per-key state across batches for computations that need to remember past events. Stateful operations require checkpointing to be enabled (`ssc.checkpoint(dir)`).

```scala { .api }

/** Key-value stateful operations, available on any DStream[(K, V)] via implicit conversion. */
class PairDStreamFunctions[K, V] {

  /**
   * Maintain arbitrary per-key state. In every batch `updateFunc` is called for every
   * existing key with that key's new values (possibly empty) and its previous state;
   * returning `None` removes the key. Requires checkpointing.
   */
  def updateStateByKey[S](updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)]

  /** Same as the single-argument form, using a custom partitioner for the state RDDs. */
  def updateStateByKey[S](
      updateFunc: (Seq[V], Option[S]) => Option[S],
      partitioner: Partitioner
  ): DStream[(K, S)]

  /** Same as the single-argument form, with an explicit number of state partitions. */
  def updateStateByKey[S](
      updateFunc: (Seq[V], Option[S]) => Option[S],
      numPartitions: Int
  ): DStream[(K, S)]

  /** Same as the partitioner form, seeding the state with the (key, state) pairs in `initialRDD`. */
  def updateStateByKey[S](
      updateFunc: (Seq[V], Option[S]) => Option[S],
      partitioner: Partitioner,
      initialRDD: RDD[(K, S)]
  ): DStream[(K, S)]

  /**
   * Stateful mapping configured by a `StateSpec`. Generally more efficient than
   * `updateStateByKey`, because the mapping function runs only for keys with new data
   * (or keys that are timing out). Requires checkpointing.
   */
  def mapWithState[StateType, MappedType](
      spec: StateSpec[K, V, StateType, MappedType]
  ): MapWithStateDStream[K, V, StateType, MappedType]
}

```

**Usage Examples:**

```scala

import org.apache.spark.HashPartitioner

// Stateful operations require a checkpoint directory
ssc.checkpoint("/path/to/checkpoint")

// pairs: DStream[(String, Int)] of (word, 1)

// Running word count with updateStateByKey
val runningCounts = pairs.updateStateByKey[Int] { (values: Seq[Int], state: Option[Int]) =>
  val currentCount = values.sum
  val previousCount = state.getOrElse(0)
  Some(currentCount + previousCount)
}

// Session tracking example
case class Session(startTime: Long, lastSeen: Long, eventCount: Int)

// userEvents: events keyed by user id (UserEvent is an application-defined type)
val userEvents: DStream[(String, UserEvent)] = ???

val sessionUpdates = userEvents.updateStateByKey[Session] {
  (events: Seq[UserEvent], session: Option[Session]) =>
    val now = System.currentTimeMillis()
    if (events.nonEmpty) {
      session match {
        case Some(s) =>
          // Update existing session
          Some(s.copy(lastSeen = now, eventCount = s.eventCount + events.length))
        case None =>
          // New session
          Some(Session(now, now, events.length))
      }
    } else {
      // No new events: keep the session only while it is within the timeout.
      // Returning None removes the key from the state.
      session.filter(s => now - s.lastSeen < 300000L) // 5 min timeout
    }
}

// Running count with a custom partitioner for the state
val updateFunc: (Seq[Int], Option[Int]) => Option[Int] =
  (values, state) => Some(values.sum + state.getOrElse(0))
val partitioner = new HashPartitioner(8)
val statefulStream = pairs.updateStateByKey[Int](updateFunc, partitioner)

```

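### MapWithState Operations

`mapWithState` is a more efficient and flexible alternative to `updateStateByKey`. The mapping function is invoked only for keys that receive new data (or are timing out), and idle keys can be expired automatically with a timeout.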

```scala { .api }

/**
 * Specification for a mapWithState operation: the mapping function plus optional
 * initial state, partitioning and idle timeout. Each setter returns the spec, so
 * calls can be chained.
 */
abstract class StateSpec[KeyType, ValueType, StateType, MappedType] {
  /** Seed the state with the (key, state) pairs in `rdd` */
  def initialState(rdd: RDD[(KeyType, StateType)]): StateSpec[KeyType, ValueType, StateType, MappedType]

  /** Set the number of partitions of the state RDDs */
  def numPartitions(numPartitions: Int): StateSpec[KeyType, ValueType, StateType, MappedType]

  /** Set a custom partitioner for the state RDDs */
  def partitioner(partitioner: Partitioner): StateSpec[KeyType, ValueType, StateType, MappedType]

  /**
   * Expire keys that receive no data for `idleDuration`. Before its state is removed,
   * the mapping function is called one last time for such a key with a `None` value,
   * and `state.isTimingOut()` returns true.
   */
  def timeout(idleDuration: Duration): StateSpec[KeyType, ValueType, StateType, MappedType]
}

object StateSpec {
  /** Create a StateSpec from a (key, value, state) => mapped-value function */
  def function[KeyType, ValueType, StateType, MappedType](
      mappingFunction: (KeyType, Option[ValueType], State[StateType]) => MappedType
  ): StateSpec[KeyType, ValueType, StateType, MappedType]

  /** Variant that also receives the batch time and may emit nothing by returning None */
  def function[KeyType, ValueType, StateType, MappedType](
      mappingFunction: (Time, KeyType, Option[ValueType], State[StateType]) => Option[MappedType]
  ): StateSpec[KeyType, ValueType, StateType, MappedType]
}

/**
 * Handle to the state of a single key inside a mapWithState mapping function.
 */
abstract class State[S] {
  /** Whether state exists for this key */
  def exists(): Boolean

  /** Get the current state; throws NoSuchElementException if it does not exist */
  def get(): S

  /** The current state as an Option (None if it does not exist) */
  final def getOption(): Option[S] = if (exists()) Some(get()) else None

  /** Set a new state; not allowed after remove() or while the key is timing out */
  def update(newState: S): Unit

  /** Remove the state; not allowed while the key is timing out */
  def remove(): Unit

  /** Whether the key is being removed because of a timeout (its state is removed automatically) */
  def isTimingOut(): Boolean
}

/**
 * DStream of mapped values returned by mapWithState
 */
abstract class MapWithStateDStream[K, V, StateType, MappedType] extends DStream[MappedType] {
  /** Snapshot of all (key, state) pairs after each batch */
  def stateSnapshots(): DStream[(K, StateType)]
}

```

**Usage Examples:**

```scala

import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.{DStream, MapWithStateDStream}

// mapWithState requires a checkpoint directory
ssc.checkpoint("/path/to/checkpoint")

// Session (case class Session(startTime: Long, lastSeen: Long, eventCount: Int)) and
// userEvents: DStream[(String, UserEvent)] are as in the updateStateByKey example.

// Efficient session tracking with mapWithState
val trackSessions = StateSpec.function(
  (userId: String, event: Option[UserEvent], state: State[Session]) => {
    val currentTime = System.currentTimeMillis()

    event match {
      case Some(_) =>
        // Update or create session
        val updated = state.getOption() match {
          case Some(session) =>
            session.copy(lastSeen = currentTime, eventCount = session.eventCount + 1)
          case None =>
            Session(currentTime, currentTime, 1)
        }
        state.update(updated)
        (userId, updated.eventCount) // return event count
      case None if state.isTimingOut() =>
        // Timeout: Spark removes the state automatically; calling state.remove()
        // or state.update() here would throw
        (userId, -1) // indicate session ended
      case None =>
        (userId, 0) // no change
    }
  }
).timeout(Minutes(30)) // 30-minute timeout

val sessionEvents: MapWithStateDStream[String, UserEvent, Session, (String, Int)] =
  userEvents.mapWithState(trackSessions)

// Word count with mapWithState (pairs: DStream[(String, Int)] of (word, 1))
val wordCountSpec = StateSpec.function(
  (word: String, count: Option[Int], state: State[Int]) => {
    // State has no getOrElse; go through getOption()
    val newCount = count.getOrElse(0) + state.getOption().getOrElse(0)
    state.update(newCount)
    (word, newCount)
  }
).numPartitions(10)

val wordCounts: MapWithStateDStream[String, Int, Int, (String, Int)] = pairs.mapWithState(wordCountSpec)

// Get snapshot of current state
val currentState: DStream[(String, Int)] = wordCounts.stateSnapshots()
currentState.print()

```

### Transform Operations

Apply arbitrary RDD-to-RDD transformations to each batch of a DStream for custom processing logic. The transformation function is invoked on the driver once per batch.

```scala { .api }

abstract class DStream[T] {
  /** Apply an arbitrary RDD-to-RDD function to every batch */
  def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U]

  /** Same as the single-argument form, also passing the batch time */
  def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U]

  /** Combine each batch of this DStream with the batch of `other` for the same time */
  def transformWith[U: ClassTag, V: ClassTag](
      other: DStream[U],
      transformFunc: (RDD[T], RDD[U]) => RDD[V]
  ): DStream[V]

  /** Same as the two-RDD form, also passing the batch time */
  def transformWith[U: ClassTag, V: ClassTag](
      other: DStream[U],
      transformFunc: (RDD[T], RDD[U], Time) => RDD[V]
  ): DStream[V]
}

class StreamingContext {
  /**
   * Combine the batches of several DStreams. `transformFunc` receives their RDDs in the
   * same order as `dstreams`, plus the batch time.
   */
  def transform[T: ClassTag](
      dstreams: Seq[DStream[_]],
      transformFunc: (Seq[RDD[_]], Time) => RDD[T]
  ): DStream[T]
}

```

**Usage Examples:**

```scala

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.dstream.DStream

val lines: DStream[String] = ssc.textFileStream("/path/to/files")

// Custom transformation with RDD operations
val processed = lines.transform { rdd =>
  // Use any RDD operation
  rdd.filter(_.nonEmpty)
    .map(_.split(","))
    .filter(_.length >= 3)
    .map(fields => (fields(0), fields(1).toInt)) // throws on a non-numeric second field
    .reduceByKey(_ + _)
}

// Transform with time information
val timeAware = lines.transform { (rdd: RDD[String], time: Time) =>
  println(s"Processing batch at time: $time")
  rdd.map(line => s"$time: $line")
}

// Transform two DStreams together
val stream1: DStream[String] = ??? // first stream
val stream2: DStream[Int] = ???    // second stream

val combined = stream1.transformWith(stream2, (rdd1: RDD[String], rdd2: RDD[Int]) => {
  val count1 = rdd1.count()
  val count2 = rdd2.count()
  rdd1.sparkContext.parallelize(Seq(s"Stream1: $count1, Stream2: $count2"))
})

// Multi-stream transformation: RDDs arrive in the same order as the DStream list
val multiTransform = ssc.transform(Seq(stream1, stream2), (rdds: Seq[RDD[_]], time: Time) => {
  // Element types are erased in Seq[RDD[_]], so cast back to the known types
  val rdd1 = rdds(0).asInstanceOf[RDD[String]]
  val rdd2 = rdds(1).asInstanceOf[RDD[Int]]
  // Pair elements by position: key each RDD by element index, then join on it
  val indexed1 = rdd1.zipWithIndex().map(_.swap)
  val indexed2 = rdd2.zipWithIndex().map(_.swap)
  indexed1.join(indexed2).values
})

```

### Advanced Data Processing Patterns

Reusable helper functions, built from the operations above, for common streaming patterns. These are example helpers for application code, not part of the Spark API.

```scala { .api }

/**
 * Pattern: deduplication over a time window.
 *
 * Within each window, keeps one element per key (an arbitrary representative, because
 * element order is not preserved across the shuffle). When slideDuration < windowDuration,
 * windows overlap, so the same element can be emitted again by later windows.
 */
def deduplicateOverWindow[T: ClassTag](
    stream: DStream[T],
    windowDuration: Duration,
    slideDuration: Duration
)(keyFunc: T => String): DStream[T] = {
  stream.window(windowDuration, slideDuration)
    .map(item => (keyFunc(item), item))
    // reduceByKey combines map-side instead of materializing every duplicate like groupByKey
    .reduceByKey((kept, _) => kept)
    .map(_._2)
}

/**
 * Pattern: top-K elements by window.
 *
 * Emits one array per window holding the k largest elements according to `ord`,
 * in descending order (empty when the window has no data).
 */
def topKByWindow[T](
    stream: DStream[T],
    k: Int,
    windowDuration: Duration,
    slideDuration: Duration
)(implicit ord: Ordering[T], ct: ClassTag[T]): DStream[Array[T]] = {
  stream.window(windowDuration, slideDuration)
    .transform { rdd =>
      // top() returns a local array on the driver; transform must return an RDD,
      // so wrap the array in a single-element RDD
      rdd.sparkContext.parallelize(Seq(rdd.top(k)), numSlices = 1)
    }
}

/**
 * Pattern: threshold-based alerting.
 *
 * Keeps only the pairs whose value is greater than or equal to `threshold`.
 */
def thresholdAlert[K, V](
    stream: DStream[(K, V)],
    threshold: V
)(implicit ord: Ordering[V]): DStream[(K, V)] = {
  stream.filter { case (_, value) => ord.gteq(value, threshold) }
}

```

**Usage Examples:**

```scala

// Deduplication example (Event is application-defined; assumes a String `id` field)
val events: DStream[Event] = ??? // stream of events
val uniqueEvents = deduplicateOverWindow(events, Minutes(5), Seconds(30))(_.id)

// Top-K pattern: the 10 highest scores in each 1-minute window
val scores: DStream[Int] = ??? // stream of scores
val topScores = topKByWindow(scores, 10, Minutes(1), Seconds(10))

// Threshold alerting
val metrics: DStream[(String, Double)] = ??? // stream of metrics
val alerts = thresholdAlert(metrics, 90.0) // alert when value >= 90

alerts.foreachRDD { rdd =>
  // collect() brings every alert to the driver; assumes alerts are rare
  rdd.collect().foreach { case (metric, value) =>
    println(s"ALERT: $metric exceeded threshold with value $value")
  }
}

```