# DStream Transformations

Transformation operations for processing streaming data with DStreams: mapping, filtering, aggregation, partitioning, windowing, key-value operations, joins, unions, and arbitrary RDD-level transforms.

## Basic Transformations

### Map Operations

Transform each element:
```scala { .api }
def map[U: ClassTag](mapFunc: T => U): DStream[U]
```

Transform each partition as a whole, receiving and returning an iterator over its elements:
```scala { .api }
def mapPartitions[U: ClassTag](
  mapPartFunc: Iterator[T] => Iterator[U],
  preservePartitioning: Boolean = false
): DStream[U]
```

Example map operations:
```scala
val lines = ssc.socketTextStream("localhost", 9999)
val lengths = lines.map(_.length)
val upperCase = lines.map(_.toUpperCase)

// mapPartitions for per-partition batch processing
// (processBatch is a user-supplied function, not part of Spark)
val batchProcessed = lines.mapPartitions { iter =>
  val batch = iter.toList // materializes the whole partition in memory
  processBatch(batch).iterator
}
```

### FlatMap Operations

Transform and flatten elements:
```scala { .api }
def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U]
```

Example word splitting:
```scala
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split("\\s+"))
val nonEmptyWords = lines.flatMap(_.split("\\s+").filter(_.nonEmpty))
```

### Filter Operations

Keep only the elements that satisfy a predicate:
```scala { .api }
def filter(filterFunc: T => Boolean): DStream[T]
```

Example filtering:
```scala
val numbers = ssc.socketTextStream("localhost", 9999).map(_.toInt)
val evenNumbers = numbers.filter(_ % 2 == 0)
val positiveNumbers = numbers.filter(_ > 0)
```

## Aggregation Operations

### Reduce Operations

Reduce elements in each RDD:
```scala { .api }
def reduce(reduceFunc: (T, T) => T): DStream[T]
```

Count elements in each RDD:
```scala { .api }
def count(): DStream[Long]
```

Count by value:
```scala { .api }
def countByValue(numPartitions: Int = ssc.sc.defaultParallelism): DStream[(T, Long)]
```

Example aggregations:
```scala
val numbers = ssc.socketTextStream("localhost", 9999).map(_.toInt)

val sum = numbers.reduce(_ + _)
val count = numbers.count()
val histogram = numbers.countByValue()

sum.print()
count.print()
histogram.print()
```

## Partitioning Operations

### Repartition

Change the number of partitions:
```scala { .api }
def repartition(numPartitions: Int): DStream[T]
```

### Glom

Coalesce all elements within each partition into a single array:
```scala { .api }
def glom(): DStream[Array[T]]
```

Example partitioning:
```scala
val lines = ssc.socketTextStream("localhost", 9999)
val repartitioned = lines.repartition(4)
val grouped = lines.glom() // One Array[String] per partition
```

## Window Operations

### Basic Windowing

Create windowed DStream:
```scala { .api }
def window(windowDuration: Duration): DStream[T]
def window(windowDuration: Duration, slideDuration: Duration): DStream[T]
```

Example windowing:
```scala
val lines = ssc.socketTextStream("localhost", 9999)

// 30-second windows sliding every 10 seconds
val windowedLines = lines.window(Seconds(30), Seconds(10))
val windowCounts = windowedLines.count()
```

### Windowed Reductions

Reduce over windows:
```scala { .api }
def reduceByWindow(
  reduceFunc: (T, T) => T,
  windowDuration: Duration,
  slideDuration: Duration
): DStream[T]
```

Optimized windowed reduction with an inverse function, which updates the previous window's result incrementally instead of re-reducing the whole window (requires an invertible `reduceFunc` and checkpointing to be enabled):
```scala { .api }
def reduceByWindow(
  reduceFunc: (T, T) => T,
  invReduceFunc: (T, T) => T,
  windowDuration: Duration,
  slideDuration: Duration
): DStream[T]
```

Example windowed reductions:
```scala
val numbers = ssc.socketTextStream("localhost", 9999).map(_.toInt)

// Sum over 1-minute windows, sliding every 10 seconds
val windowSum = numbers.reduceByWindow(_ + _, Minutes(1), Seconds(10))

// Optimized version: subtraction is the inverse of addition
// Requires checkpointing: call ssc.checkpoint(<directory>) first
val optimizedSum = numbers.reduceByWindow(
  _ + _,       // Add function
  _ - _,       // Inverse (subtract) function
  Minutes(1),  // Window duration
  Seconds(10)  // Slide duration
)
```
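
To make the inverse-function idea concrete, here is a minimal plain-Scala sketch that models it on a sequence of per-batch sums. It does not use Spark, the names `naiveWindows` and `incrementalWindows` are hypothetical, and it assumes a slide of exactly one batch. Each new window value is derived from the previous one by adding the batch that entered the window and subtracting the batch that left it.

```scala
// Plain-Scala model of windowed reduction (no Spark involved).
// Assumptions: one Int per batch, window = `w` batches, slide = 1 batch.

// Recompute every window from scratch (models the plain reduceFunc variant).
def naiveWindows(batches: Seq[Int], w: Int): Seq[Int] =
  batches.indices.map { i =>
    batches.slice(math.max(0, i - w + 1), i + 1).sum
  }

// Reuse the previous window: add the entering batch, subtract the leaving one
// (models the reduceFunc + invReduceFunc variant).
def incrementalWindows(batches: Seq[Int], w: Int): Seq[Int] =
  batches.indices.foldLeft(Vector.empty[Int]) { (acc, i) =>
    val previous = acc.lastOption.getOrElse(0)
    val entering = batches(i)
    val leaving  = if (i - w >= 0) batches(i - w) else 0
    acc :+ (previous + entering - leaving)
  }
```

Both functions produce the same window values, but the incremental one does a constant amount of work per slide regardless of window length. The approach only applies when the reduce function has an inverse: addition does, `max` does not.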

### Windowed Counting

Count elements over windows:
```scala { .api }
def countByWindow(
  windowDuration: Duration,
  slideDuration: Duration
): DStream[Long]
```

Count by value over windows:
```scala { .api }
def countByValueAndWindow(
  windowDuration: Duration,
  slideDuration: Duration,
  numPartitions: Int = ssc.sc.defaultParallelism
): DStream[(T, Long)]
```
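
The relationship between window length, slide interval, and the resulting counts can be illustrated with a small plain-Scala model. This is only a sketch of the semantics, not Spark code: each inner `Seq` stands for one batch, durations are expressed as a number of batches, and the function names (`windows`, `countByWindowModel`, `countByValueAndWindowModel`) are hypothetical.

```scala
// Plain-Scala model (no Spark): each inner Seq is one batch. A window covers
// the last `windowBatches` batches and is emitted every `slideBatches` batches.
def windows[T](batches: Seq[Seq[T]], windowBatches: Int, slideBatches: Int): Seq[Seq[T]] =
  (slideBatches to batches.length by slideBatches).map { end =>
    batches.slice(math.max(0, end - windowBatches), end).flatten
  }

// Models countByWindow: one count per emitted window.
def countByWindowModel[T](batches: Seq[Seq[T]], w: Int, s: Int): Seq[Long] =
  windows(batches, w, s).map(_.size.toLong)

// Models countByValueAndWindow: occurrences of each distinct value per window.
def countByValueAndWindowModel[T](batches: Seq[Seq[T]], w: Int, s: Int): Seq[Map[T, Long]] =
  windows(batches, w, s).map { window =>
    window.groupBy(identity).map { case (value, occurrences) => (value, occurrences.size.toLong) }
  }
```

In Spark Streaming itself, both durations must be multiples of the batch interval. As far as I know, these two operations are computed incrementally with an inverse reduction, so a checkpoint directory should be set with `ssc.checkpoint(...)` before using them.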

## Pair DStream Operations

### Key-Value Transformations

Map values while preserving keys:
```scala { .api }
def mapValues[U: ClassTag](f: V => U): DStream[(K, U)] // On DStream[(K, V)]
```

FlatMap values:
```scala { .api }
def flatMapValues[U: ClassTag](f: V => TraversableOnce[U]): DStream[(K, U)]
```

Example key-value transformations:
```scala
val pairs = ssc.socketTextStream("localhost", 9999)
  .map(line => (line.split(":")(0), line.split(":")(1)))

val upperValues = pairs.mapValues(_.toUpperCase)
val wordValues = pairs.flatMapValues(_.split("\\s+"))
```

### Grouping Operations

Group by key:
```scala { .api }
def groupByKey(): DStream[(K, Iterable[V])] // On DStream[(K, V)]
def groupByKey(numPartitions: Int): DStream[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): DStream[(K, Iterable[V])]
```

Example grouping:
```scala
val keyValuePairs = ssc.socketTextStream("localhost", 9999)
  .map(line => (line.charAt(0).toString, line))

val grouped = keyValuePairs.groupByKey()
grouped.foreachRDD { rdd =>
  rdd.collect().foreach { case (key, values) =>
    println(s"Key: $key, Count: ${values.size}")
  }
}
```

### Reduction Operations

Reduce by key:
```scala { .api }
def reduceByKey(func: (V, V) => V): DStream[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): DStream[(K, V)]
def reduceByKey(func: (V, V) => V, partitioner: Partitioner): DStream[(K, V)]
```

Combine by key:
```scala { .api }
def combineByKey[C: ClassTag](
  createCombiner: V => C,
  mergeValue: (C, V) => C,
  mergeCombiner: (C, C) => C,
  partitioner: Partitioner,
  mapSideCombine: Boolean = true
): DStream[(K, C)]
```

Example reductions:
```scala
import org.apache.spark.HashPartitioner

val wordCounts = ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split("\\s+"))
  .map((_, 1))
  .reduceByKey(_ + _)

// Advanced combine operation for computing averages
val scores = ssc.socketTextStream("localhost", 9999)
  .map(line => (line.split(",")(0), line.split(",")(1).toDouble))

val averages = scores.combineByKey(
  (score: Double) => (score, 1),                                       // Create combiner
  (acc: (Double, Int), score: Double) => (acc._1 + score, acc._2 + 1), // Merge value
  (acc1: (Double, Int), acc2: (Double, Int)) =>
    (acc1._1 + acc2._1, acc1._2 + acc2._2),                            // Merge combiners
  new HashPartitioner(ssc.sparkContext.defaultParallelism)             // Partitioner (required argument)
).mapValues { case (sum, count) => sum / count }
```
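
How the three `combineByKey` functions cooperate is easier to see outside Spark. The sketch below applies the same averaging functions to two hypothetical partitions that hold scores for a single key; the sample values and the helper `combinePartition` are assumptions made for illustration.

```scala
// Plain-Scala model of combineByKey for one key (no Spark involved).
type Acc = (Double, Int) // (running sum, running count)

val createCombiner: Double => Acc = score => (score, 1)
val mergeValue: (Acc, Double) => Acc = (acc, score) => (acc._1 + score, acc._2 + 1)
val mergeCombiners: (Acc, Acc) => Acc = (a, b) => (a._1 + b._1, a._2 + b._2)

// Within one partition: the first value creates the combiner,
// every later value is folded in with mergeValue.
def combinePartition(scores: Seq[Double]): Acc =
  scores.tail.foldLeft(createCombiner(scores.head))(mergeValue)

// Two hypothetical partitions holding scores for the same key.
val partition1 = Seq(80.0, 90.0)
val partition2 = Seq(70.0)

// Across partitions: partial results are merged with mergeCombiners.
val (sum, count) = mergeCombiners(combinePartition(partition1), combinePartition(partition2))
val average = sum / count // 240.0 / 3 = 80.0
```

Carrying a `(sum, count)` pair instead of a running average is what makes the merge step correct: averages of partitions cannot be combined directly, but sums and counts can.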

### Windowed Key-Value Operations

Group by key over windows:
```scala { .api }
def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Iterable[V])]
def groupByKeyAndWindow(
  windowDuration: Duration,
  slideDuration: Duration
): DStream[(K, Iterable[V])]
def groupByKeyAndWindow(
  windowDuration: Duration,
  slideDuration: Duration,
  numPartitions: Int
): DStream[(K, Iterable[V])]
def groupByKeyAndWindow(
  windowDuration: Duration,
  slideDuration: Duration,
  partitioner: Partitioner
): DStream[(K, Iterable[V])]
```

Reduce by key over windows:
```scala { .api }
def reduceByKeyAndWindow(
  func: (V, V) => V,
  windowDuration: Duration
): DStream[(K, V)]

def reduceByKeyAndWindow(
  func: (V, V) => V,
  windowDuration: Duration,
  slideDuration: Duration
): DStream[(K, V)]

def reduceByKeyAndWindow(
  func: (V, V) => V,
  windowDuration: Duration,
  slideDuration: Duration,
  numPartitions: Int
): DStream[(K, V)]

def reduceByKeyAndWindow(
  func: (V, V) => V,
  windowDuration: Duration,
  slideDuration: Duration,
  partitioner: Partitioner
): DStream[(K, V)]
```

Optimized windowed reduction with an inverse function (requires checkpointing to be enabled):
```scala { .api }
def reduceByKeyAndWindow(
  reduceFunc: (V, V) => V,
  invReduceFunc: (V, V) => V,
  windowDuration: Duration,
  slideDuration: Duration
): DStream[(K, V)]
```
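
As a rough illustration of what the keyed window reductions compute, the following plain-Scala sketch reduces values per key over the last few batches. It is a model of the semantics rather than Spark code: `reduceByKeyAndWindowModel` is a hypothetical name, the window is measured in batches, and a slide of one batch is assumed.

```scala
// Plain-Scala model (no Spark): each batch is a sequence of (key, value) pairs.
// For every batch index, reduce the values of each key over the last
// `windowBatches` batches. Assumes a slide of one batch.
def reduceByKeyAndWindowModel[K, V](
    batches: Seq[Seq[(K, V)]],
    windowBatches: Int
)(func: (V, V) => V): Seq[Map[K, V]] =
  batches.indices.map { i =>
    batches.slice(math.max(0, i - windowBatches + 1), i + 1).flatten
      .groupBy(_._1)
      .map { case (key, pairs) => key -> pairs.map(_._2).reduce(func) }
  }
```

Note that a key disappears from the result once none of the batches in the window contain it. The inverse-function overload is meant to yield the same per-window values while updating the previous window's result instead of re-reducing every batch.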

## Join Operations

### Basic Joins

Inner join:
```scala { .api }
def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))]
def join[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (V, W))]
```

Left outer join:
```scala { .api }
def leftOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))]
def leftOuterJoin[W: ClassTag](
  other: DStream[(K, W)],
  numPartitions: Int
): DStream[(K, (V, Option[W]))]
```

Right outer join:
```scala { .api }
def rightOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], W))]
def rightOuterJoin[W: ClassTag](
  other: DStream[(K, W)],
  numPartitions: Int
): DStream[(K, (Option[V], W))]
```

Full outer join:
```scala { .api }
def fullOuterJoin[W: ClassTag](
  other: DStream[(K, W)]
): DStream[(K, (Option[V], Option[W]))]
def fullOuterJoin[W: ClassTag](
  other: DStream[(K, W)],
  numPartitions: Int
): DStream[(K, (Option[V], Option[W]))]
```

Example joins:
```scala
val userActions = ssc.socketTextStream("localhost", 9999)
  .map(line => (line.split(",")(0), line.split(",")(1))) // (userId, action)

val userProfiles = ssc.socketTextStream("localhost", 9998)
  .map(line => (line.split(",")(0), line.split(",")(1))) // (userId, profile)

val enrichedActions = userActions.join(userProfiles)
val optionalEnriched = userActions.leftOuterJoin(userProfiles)
```
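
The result types of the join variants are easier to read with concrete data. The sketch below models an inner join and a left outer join on the contents of a single batch using plain Scala collections; `innerJoinModel` and `leftOuterJoinModel` are hypothetical helpers, not Spark APIs. It reflects the fact that DStream joins pair up the RDDs of the same batch interval.

```scala
// Plain-Scala model of per-batch join semantics (no Spark involved).

// Inner join: only keys present on both sides survive.
def innerJoinModel[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, W))] =
  for {
    (k, v)  <- left
    (k2, w) <- right
    if k == k2
  } yield (k, (v, w))

// Left outer join: every left element survives; a missing match becomes None.
def leftOuterJoinModel[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, Option[W]))] =
  left.flatMap { case (k, v) =>
    val matches: Seq[Option[W]] = right.collect { case (k2, w) if k2 == k => Some(w) }
    val padded = if (matches.isEmpty) Seq(None) else matches
    padded.map(w => (k, (v, w)))
  }
```

For instance, with actions `Seq("u1" -> "click", "u2" -> "view")` and profiles `Seq("u1" -> "admin")`, the inner join keeps only `u1`, while the left outer join also keeps `u2` paired with `None`.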

## Union Operations

Union with another DStream of the same type:
```scala { .api }
def union(that: DStream[T]): DStream[T]
```

Union a sequence of DStreams (a method on `StreamingContext`, not on `DStream`):
```scala { .api }
def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T] // On StreamingContext
```

Example union:
```scala
val stream1 = ssc.socketTextStream("localhost", 9999)
val stream2 = ssc.socketTextStream("localhost", 9998)
val stream3 = ssc.textFileStream("/data/input")

// Union two streams
val combined = stream1.union(stream2)

// Union multiple streams
val allCombined = ssc.union(Seq(stream1, stream2, stream3))
```

## Transform Operations

### RDD-level Transformations

Apply arbitrary RDD transformations:
```scala { .api }
def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U]
def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U]
```

Transform with another DStream:
```scala { .api }
def transformWith[U: ClassTag, V: ClassTag](
  other: DStream[U],
  transformFunc: (RDD[T], RDD[U]) => RDD[V]
): DStream[V]

def transformWith[U: ClassTag, V: ClassTag](
  other: DStream[U],
  transformFunc: (RDD[T], RDD[U], Time) => RDD[V]
): DStream[V]
```

Example transforms:
```scala
import org.apache.spark.rdd.RDD

val lines = ssc.socketTextStream("localhost", 9999)

// Apply complex RDD operations
val processed = lines.transform { rdd =>
  rdd.filter(_.nonEmpty)
    .map(_.toLowerCase)
    .zipWithIndex()
    .filter(_._2 % 2 == 0)
    .map(_._1)
}

// Transform with time information
val timestamped = lines.transform { (rdd, time) =>
  rdd.map(line => s"${time.milliseconds}: $line")
}

// Transform with another stream: both arguments go in one parameter list,
// and the function's parameter types are spelled out so they can be resolved
val stream2 = ssc.socketTextStream("localhost", 9998)
val combined = lines.transformWith(
  stream2,
  (rdd1: RDD[String], rdd2: RDD[String]) => rdd1.union(rdd2).distinct()
)
```

## Utility Transformations

### Cache and Persistence

Persist the RDDs of a DStream, in memory by default or at a chosen storage level:
```scala { .api }
def cache(): DStream[T]
def persist(): DStream[T]
def persist(level: StorageLevel): DStream[T]
```

### Checkpointing

Enable periodic checkpointing of the DStream's RDDs at the given interval:
```scala { .api }
def checkpoint(interval: Duration): DStream[T]
```

Example persistence:
```scala
val lines = ssc.socketTextStream("localhost", 9999)

// expensiveTransformation is a user-supplied function (not part of Spark)
val expensiveStream = lines
  .map(expensiveTransformation)
  .cache() // Cache for reuse

val checkpointedStream = expensiveStream
  .checkpoint(Seconds(10)) // Checkpoint every 10 seconds
```

### Slicing

Get the RDDs generated within a time range:
```scala { .api }
def slice(interval: Interval): Seq[RDD[T]]
def slice(fromTime: Time, toTime: Time): Seq[RDD[T]]
```

Example slicing:
```scala
val stream = ssc.socketTextStream("localhost", 9999)

// Get RDDs from the last 30 seconds.
// Call this only after the StreamingContext has been started;
// the given times are aligned to batch boundaries.
val currentTime = new Time(System.currentTimeMillis())
val past30Seconds = currentTime - Seconds(30)
val recentRDDs = stream.slice(past30Seconds, currentTime)
```