# Advanced Transformations and Windowing

Advanced streaming operations, including windowed computations, stateful transformations, and complex data processing patterns, for streaming analytics and temporal data analysis.

## Capabilities

### Window Operations

Apply operations over sliding windows of data for temporal aggregations and time-based analytics. Window and slide durations must be multiples of the stream's batch interval.

```scala { .api }
abstract class DStream[T] {
  /** Create a windowed DStream; the slide duration defaults to the batch interval */
  def window(windowDuration: Duration): DStream[T]

  /** Create a windowed DStream with a custom slide duration */
  def window(windowDuration: Duration, slideDuration: Duration): DStream[T]

  /** Reduce elements over a sliding window */
  def reduceByWindow(
      reduceFunc: (T, T) => T,
      windowDuration: Duration,
      slideDuration: Duration
  ): DStream[T]

  /** Incremental reduce using an inverse function for efficiency (requires checkpointing) */
  def reduceByWindow(
      reduceFunc: (T, T) => T,
      invReduceFunc: (T, T) => T,
      windowDuration: Duration,
      slideDuration: Duration
  ): DStream[T]

  /** Count elements in a sliding window */
  def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long]

  /** Count occurrences of each distinct value in a sliding window */
  def countByValueAndWindow(
      windowDuration: Duration,
      slideDuration: Duration
  ): DStream[(T, Long)]

  /** Count occurrences of each distinct value, with a custom partition count */
  def countByValueAndWindow(
      windowDuration: Duration,
      slideDuration: Duration,
      numPartitions: Int
  ): DStream[(T, Long)]
}
```

**Usage Examples:**

```scala
import org.apache.spark.streaming._

val numbers: DStream[Int] = // stream of integers

// Basic windowing - collect data over 30 seconds, slide every 10 seconds
val windowed = numbers.window(Seconds(30), Seconds(10))

// Reduce over window - sum all numbers in the window
val windowSum = numbers.reduceByWindow(_ + _, Seconds(30), Seconds(10))

// Incremental reduce for efficiency (requires ssc.checkpoint(...) to be set)
val efficientSum = numbers.reduceByWindow(
  _ + _,       // reduce function
  _ - _,       // inverse reduce function
  Seconds(30), // window duration
  Seconds(10)  // slide duration
)

// Count elements in the window
val windowCounts = numbers.countByWindow(Seconds(30), Seconds(10))

// Window operations on text
val words: DStream[String] = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))
val wordCounts = words.countByValueAndWindow(Seconds(30), Seconds(10))

wordCounts.print()
```
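
The examples above assume an already-running `StreamingContext` named `ssc`. A minimal setup sketch, assuming a local two-core master and a hypothetical checkpoint path (checkpointing is needed by `countByWindow`, `countByValueAndWindow`, and the inverse-function `reduceByWindow` variant):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// 5-second batch interval; the 30s/10s window and slide durations above are multiples of it
val conf = new SparkConf().setAppName("WindowingExamples").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(5))

// Required by the window operations that use an inverse reduce function
ssc.checkpoint("/tmp/streaming-checkpoint") // hypothetical path

// ... define DStreams and window operations here ...

ssc.start()
ssc.awaitTermination()
```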

### Windowed Operations on Pair DStreams

Windowed operations for key-value pairs, enabling temporal aggregations by key. These methods become available on any `DStream[(K, V)]` through the implicit conversion to `PairDStreamFunctions`.

```scala { .api }
class PairDStreamFunctions[K, V] {
  /** Group by key over a sliding window */
  def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Iterable[V])]

  /** Group by key with a custom slide duration */
  def groupByKeyAndWindow(
      windowDuration: Duration,
      slideDuration: Duration
  ): DStream[(K, Iterable[V])]

  /** Group by key with a custom partitioner */
  def groupByKeyAndWindow(
      windowDuration: Duration,
      slideDuration: Duration,
      partitioner: Partitioner
  ): DStream[(K, Iterable[V])]

  /** Group by key with a partition count */
  def groupByKeyAndWindow(
      windowDuration: Duration,
      slideDuration: Duration,
      numPartitions: Int
  ): DStream[(K, Iterable[V])]

  /** Reduce by key over a sliding window */
  def reduceByKeyAndWindow(
      reduceFunc: (V, V) => V,
      windowDuration: Duration,
      slideDuration: Duration
  ): DStream[(K, V)]

  /** Reduce by key with a custom partitioner */
  def reduceByKeyAndWindow(
      reduceFunc: (V, V) => V,
      windowDuration: Duration,
      slideDuration: Duration,
      partitioner: Partitioner
  ): DStream[(K, V)]

  /** Incremental reduce by key with an inverse function (requires checkpointing) */
  def reduceByKeyAndWindow(
      reduceFunc: (V, V) => V,
      invReduceFunc: (V, V) => V,
      windowDuration: Duration,
      slideDuration: Duration
  ): DStream[(K, V)]

  /** Incremental reduce with a custom partitioner */
  def reduceByKeyAndWindow(
      reduceFunc: (V, V) => V,
      invReduceFunc: (V, V) => V,
      windowDuration: Duration,
      slideDuration: Duration,
      partitioner: Partitioner
  ): DStream[(K, V)]

  /** Incremental reduce with a partition count and a filter for retained pairs */
  def reduceByKeyAndWindow(
      reduceFunc: (V, V) => V,
      invReduceFunc: (V, V) => V,
      windowDuration: Duration,
      slideDuration: Duration,
      numPartitions: Int,
      filterFunc: ((K, V)) => Boolean
  ): DStream[(K, V)]
}
```

**Usage Examples:**

```scala
val pairs: DStream[(String, Int)] = words.map(word => (word, 1))

// Group words by key over 1-minute windows, sliding every 10 seconds
val groupedByWindow = pairs.groupByKeyAndWindow(Minutes(1), Seconds(10))

// Word count over a sliding window
val wordCountsWindow = pairs.reduceByKeyAndWindow(_ + _, Minutes(1), Seconds(10))

// Efficient incremental word counting (requires ssc.checkpoint(...) to be set)
val efficientWordCounts = pairs.reduceByKeyAndWindow(
  _ + _,      // add counts entering the window
  _ - _,      // subtract counts leaving the window
  Minutes(1), // window duration
  Seconds(10) // slide duration
)

// Window with filtering - keep only words with count > 5
val filteredCounts = pairs.reduceByKeyAndWindow(
  _ + _,
  _ - _,
  Minutes(1),
  Seconds(10),
  10, // numPartitions
  { case (word, count) => count > 5 }
)
```
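
For aggregates that are not a single associative value, a common approach is to reduce over a (sum, count) pair and derive the final metric afterwards. A minimal sketch of a per-key windowed average, assuming a hypothetical `readings` stream of metric values:

```scala
// Hypothetical input: (metricName, reading)
val readings: DStream[(String, Double)] = // stream of metric readings

// Carry a (sum, count) pair through the window, then divide at the end
val avgByWindow: DStream[(String, Double)] = readings
  .mapValues(v => (v, 1L))
  .reduceByKeyAndWindow(
    (a: (Double, Long), b: (Double, Long)) => (a._1 + b._1, a._2 + b._2),
    Minutes(1),
    Seconds(10)
  )
  .mapValues { case (sum, count) => sum / count }
```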

### Stateful Transformations

Maintain state across batches for streaming computations that require memory of past events. Stateful transformations require a checkpoint directory, set via `ssc.checkpoint(...)`, so that state can be recovered after failures.

```scala { .api }
class PairDStreamFunctions[K, V] {
  /** Update state by key using an update function */
  def updateStateByKey[S](updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)]

  /** Update state with a custom partitioner */
  def updateStateByKey[S](
      updateFunc: (Seq[V], Option[S]) => Option[S],
      partitioner: Partitioner
  ): DStream[(K, S)]

  /** Update state with a partition count */
  def updateStateByKey[S](
      updateFunc: (Seq[V], Option[S]) => Option[S],
      numPartitions: Int
  ): DStream[(K, S)]

  /** Update state with an initial RDD */
  def updateStateByKey[S](
      updateFunc: (Seq[V], Option[S]) => Option[S],
      partitioner: Partitioner,
      initialRDD: RDD[(K, S)]
  ): DStream[(K, S)]

  /** Advanced stateful operations using mapWithState */
  def mapWithState[StateType, MappedType](
      spec: StateSpec[K, V, StateType, MappedType]
  ): MapWithStateDStream[K, V, StateType, MappedType]
}
```

**Usage Examples:**

```scala
// Stateful transformations need a checkpoint directory
ssc.checkpoint("/tmp/streaming-checkpoint") // example path

// Running word count with updateStateByKey
val runningCounts = pairs.updateStateByKey[Int] { (values: Seq[Int], state: Option[Int]) =>
  val currentCount = values.sum
  val previousCount = state.getOrElse(0)
  Some(currentCount + previousCount)
}

// Session tracking example
case class UserEvent(userId: String, timestamp: Long) // hypothetical event type for these examples
case class Session(startTime: Long, lastSeen: Long, eventCount: Int)

val userEvents: DStream[(String, UserEvent)] = // stream of events keyed by user ID

val sessionUpdates = userEvents.updateStateByKey[Session] {
  (events: Seq[UserEvent], session: Option[Session]) =>
    if (events.nonEmpty) {
      val now = System.currentTimeMillis()
      session match {
        case Some(s) =>
          // Update existing session
          Some(s.copy(lastSeen = now, eventCount = s.eventCount + events.length))
        case None =>
          // New session
          Some(Session(now, now, events.length))
      }
    } else {
      // No new events - drop the session after a 5-minute timeout
      session.filter(s => System.currentTimeMillis() - s.lastSeen < 300000)
    }
}

// Stateful stream with a custom partitioner
val partitioner = new HashPartitioner(8)
val updateFunc: (Seq[Int], Option[Int]) => Option[Int] =
  (values, state) => Some(values.sum + state.getOrElse(0))
val statefulStream = pairs.updateStateByKey(updateFunc, partitioner)
```
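
The `initialRDD` overload lets a restarted job seed its state from previously persisted results. A minimal sketch, reusing `updateFunc` and `partitioner` from above and assuming hypothetical saved counts:

```scala
import org.apache.spark.rdd.RDD

// Hypothetical counts persisted by a previous run, used to seed the state
val initialCounts: RDD[(String, Int)] = ssc.sparkContext.parallelize(
  Seq(("spark", 10), ("streaming", 5))
)

val seededCounts = pairs.updateStateByKey(updateFunc, partitioner, initialCounts)
```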

### MapWithState Operations

`mapWithState` offers more efficient and flexible stateful processing than `updateStateByKey`: it supports per-key timeouts, can emit records that differ from the stored state, and only invokes the mapping function for keys with new data or keys that are timing out.

```scala { .api }
/**
 * Specification for the mapWithState operation
 */
abstract class StateSpec[KeyType, ValueType, StateType, MappedType] {
  /** Set the initial state RDD */
  def initialState(rdd: RDD[(KeyType, StateType)]): StateSpec[KeyType, ValueType, StateType, MappedType]

  /** Set the number of partitions */
  def numPartitions(numPartitions: Int): StateSpec[KeyType, ValueType, StateType, MappedType]

  /** Set a custom partitioner */
  def partitioner(partitioner: Partitioner): StateSpec[KeyType, ValueType, StateType, MappedType]

  /** Set the timeout duration for idle keys */
  def timeout(idleDuration: Duration): StateSpec[KeyType, ValueType, StateType, MappedType]
}

object StateSpec {
  /** Create a StateSpec from a mapping function */
  def function[KeyType, ValueType, StateType, MappedType](
      mappingFunction: (KeyType, Option[ValueType], State[StateType]) => MappedType
  ): StateSpec[KeyType, ValueType, StateType, MappedType]
}

/**
 * Handle for reading and updating per-key state in mapWithState operations
 */
abstract class State[S] {
  /** Check whether state exists for the current key */
  def exists(): Boolean

  /** Get the current state value (throws if none exists) */
  def get(): S

  /** Get the current state value as an Option */
  def getOption(): Option[S]

  /** Update the state with a new value */
  def update(newState: S): Unit

  /** Remove the state for the current key */
  def remove(): Unit

  /** Check whether the state is being removed due to timeout */
  def isTimingOut(): Boolean
}

/**
 * DStream returned by the mapWithState operation
 */
class MapWithStateDStream[K, V, StateType, MappedType] extends DStream[MappedType] {
  /** Get snapshots of the current state as (key, state) pairs */
  def stateSnapshots(): DStream[(K, StateType)]
}
```

**Usage Examples:**

```scala
import org.apache.spark.streaming._

// mapWithState also requires ssc.checkpoint(...) to be set

// Efficient session tracking with mapWithState
val trackSessions = StateSpec.function(
  (userId: String, event: Option[UserEvent], state: State[Session]) => {
    val currentTime = System.currentTimeMillis()

    event match {
      case Some(_) =>
        // Update or create the session
        if (state.exists()) {
          val session = state.get()
          state.update(session.copy(lastSeen = currentTime, eventCount = session.eventCount + 1))
          (userId, session.eventCount + 1) // return the updated event count
        } else {
          state.update(Session(currentTime, currentTime, 1))
          (userId, 1)
        }
      case None =>
        // Called with no value only when the key times out; the state is removed
        // automatically, so it must not be updated or removed here
        if (state.isTimingOut()) {
          (userId, -1) // indicate session ended
        } else {
          (userId, 0) // no change
        }
    }
  }
).timeout(Minutes(30)) // 30-minute idle timeout

val sessionEvents: MapWithStateDStream[String, UserEvent, Session, (String, Int)] =
  userEvents.mapWithState(trackSessions)

// Word count with mapWithState
val wordCountSpec = StateSpec.function(
  (word: String, count: Option[Int], state: State[Int]) => {
    val newCount = count.getOrElse(0) + state.getOption().getOrElse(0)
    state.update(newCount)
    (word, newCount)
  }
).numPartitions(10)

val wordCounts: MapWithStateDStream[String, Int, Int, (String, Int)] =
  pairs.mapWithState(wordCountSpec)

// Get a snapshot of the current state
val currentState: DStream[(String, Int)] = wordCounts.stateSnapshots()
currentState.print()
```
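
The `initialState` setter plays the same role for `mapWithState` as the `initialRDD` overload does for `updateStateByKey`: it pre-populates the state store before the first batch. A minimal sketch, assuming hypothetical saved counts:

```scala
import org.apache.spark.rdd.RDD

// Hypothetical saved counts used to pre-populate the state store
val savedCounts: RDD[(String, Int)] = ssc.sparkContext.parallelize(
  Seq(("spark", 10), ("streaming", 5))
)

val seededSpec = StateSpec.function(
  (word: String, count: Option[Int], state: State[Int]) => {
    val newCount = count.getOrElse(0) + state.getOption().getOrElse(0)
    state.update(newCount)
    (word, newCount)
  }
).initialState(savedCounts)

val seededWordCounts = pairs.mapWithState(seededSpec)
```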

### Transform Operations

Apply arbitrary RDD transformations to DStreams for custom processing logic.

```scala { .api }
abstract class DStream[T] {
  /** Transform the DStream using arbitrary RDD operations */
  def transform[U](transformFunc: RDD[T] => RDD[U]): DStream[U]

  /** Transform with access to the batch time */
  def transform[U](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U]

  /** Transform together with another DStream */
  def transformWith[U, V](
      other: DStream[U],
      transformFunc: (RDD[T], RDD[U]) => RDD[V]
  ): DStream[V]

  /** Transform with another DStream and access to the batch time */
  def transformWith[U, V](
      other: DStream[U],
      transformFunc: (RDD[T], RDD[U], Time) => RDD[V]
  ): DStream[V]
}

class StreamingContext {
  /** Transform multiple DStreams together */
  def transform[T](
      dstreams: Seq[DStream[_]],
      transformFunc: (Seq[RDD[_]], Time) => RDD[T]
  ): DStream[T]
}
```

**Usage Examples:**

```scala
val lines: DStream[String] = ssc.textFileStream("/path/to/files")

// Custom transformation with RDD operations
val processed = lines.transform { rdd =>
  // Any RDD operation can be used here
  rdd.filter(_.nonEmpty)
    .map(_.split(","))
    .filter(_.length >= 3)
    .map(fields => (fields(0), fields(1).toInt))
    .reduceByKey(_ + _)
}

// Transform with batch time information
val timeAware = lines.transform { (rdd, time) =>
  println(s"Processing batch at time: $time")
  rdd.map(line => s"$time: $line")
}

// Transform two DStreams together
val stream1: DStream[String] = // first stream
val stream2: DStream[Int] = // second stream

val combined = stream1.transformWith(stream2, (rdd1: RDD[String], rdd2: RDD[Int]) => {
  val count1 = rdd1.count()
  val count2 = rdd2.count()
  rdd1.sparkContext.parallelize(Seq(s"Stream1: $count1, Stream2: $count2"))
})

// Multi-stream transformation
val multiTransform = ssc.transform(Seq(stream1, stream2), (rdds: Seq[RDD[_]], time: Time) => {
  val rdd1 = rdds(0).asInstanceOf[RDD[String]]
  val rdd2 = rdds(1).asInstanceOf[RDD[Int]]
  // Pair up elements by position across the two RDDs
  rdd1.zipWithIndex().map(_.swap)
    .join(rdd2.zipWithIndex().map(_.swap))
    .map(_._2)
})
```
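
A common use of `transform` is combining a stream with static reference data, since stream-to-RDD joins are not exposed directly on DStreams. A minimal sketch, reusing `pairs` from the word-count examples and assuming a hypothetical blacklist of words to drop:

```scala
import org.apache.spark.rdd.RDD

// Hypothetical static reference data: words to filter out
val blacklist: RDD[(String, Boolean)] = ssc.sparkContext.parallelize(
  Seq(("spam", true), ("junk", true))
)

// Drop any (word, count) pair whose key appears in the blacklist
val cleaned = pairs.transform { rdd =>
  rdd.leftOuterJoin(blacklist)
    .filter { case (_, (_, flagged)) => flagged.isEmpty }
    .mapValues { case (count, _) => count }
}
```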

### Advanced Data Processing Patterns

Complex streaming patterns for sophisticated data processing scenarios. These are helper functions built from the primitives above, not part of the Spark API.

```scala
import scala.reflect.ClassTag

/**
 * Pattern: deduplication over a time window
 */
def deduplicateOverWindow[T: ClassTag](
    stream: DStream[T],
    windowDuration: Duration,
    slideDuration: Duration
)(keyFunc: T => String): DStream[T] = {
  stream.window(windowDuration, slideDuration)
    .map(item => (keyFunc(item), item))
    .groupByKey()
    .map(_._2.head) // keep one occurrence per key
}

/**
 * Pattern: top-K elements per window
 */
def topKByWindow[T: ClassTag](
    stream: DStream[T],
    k: Int,
    windowDuration: Duration,
    slideDuration: Duration
)(implicit ord: Ordering[T]): DStream[Array[T]] = {
  stream.window(windowDuration, slideDuration)
    .transform { rdd =>
      // takeOrdered returns an Array, so wrap the result back into an RDD
      rdd.sparkContext.parallelize(Seq(rdd.takeOrdered(k)(ord.reverse)))
    }
}

/**
 * Pattern: threshold-based alerting
 */
def thresholdAlert[K, V](
    stream: DStream[(K, V)],
    threshold: V
)(implicit ord: Ordering[V]): DStream[(K, V)] = {
  stream.filter { case (_, value) => ord.gteq(value, threshold) }
}
```

**Usage Examples:**

```scala
// Deduplication example (Event is a hypothetical type with an id field)
case class Event(id: String, payload: String)
val events: DStream[Event] = // stream of events
val uniqueEvents = deduplicateOverWindow(events, Minutes(5), Seconds(30))(_.id)

// Top-K pattern
val scores: DStream[Int] = // stream of scores
val topScores = topKByWindow(scores, 10, Minutes(1), Seconds(10))

// Threshold alerting
val metrics: DStream[(String, Double)] = // stream of metrics
val alerts = thresholdAlert(metrics, 90.0) // alert when value >= 90

alerts.foreachRDD { rdd =>
  rdd.collect().foreach { case (metric, value) =>
    println(s"ALERT: $metric exceeded threshold with value $value")
  }
}
```
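
These patterns compose with the windowed operations shown earlier. A sketch that raises alerts whenever a word's count over the last minute crosses a threshold, reusing `pairs` and `thresholdAlert` from above:

```scala
// Windowed word counts, updated every 10 seconds
val countsLastMinute = pairs.reduceByKeyAndWindow(_ + _, Minutes(1), Seconds(10))

// Alert on any word seen at least 100 times within the window
val hotWords = thresholdAlert(countsLastMinute, 100)

hotWords.print()
```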
```