# RDD Operations and Transformations

Resilient Distributed Datasets (RDDs) are the fundamental abstraction in Spark, representing immutable, fault-tolerant collections that can be operated on in parallel across a cluster.

## Core RDD Properties

```scala { .api }
abstract class RDD[T: ClassTag] {
  def sparkContext: SparkContext
  def id: Int
  def partitions: Array[Partition]
  def getNumPartitions: Int
  def preferredLocations(split: Partition): Seq[String]
  def dependencies: Seq[Dependency[_]]
  def partitioner: Option[Partitioner]
  def getStorageLevel: StorageLevel
}
```

## Transformations

Transformations are lazy operations that create new RDDs from existing ones. They are not executed until an action is called.
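
The laziness described above can be observed with an accumulator. This is a minimal sketch, assuming a local master (`local[2]`) and an illustrative app name and accumulator name; none of these come from the original text.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: local master for illustration; reuses a running context if one exists.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("lazy-demo"))

// Accumulator used only to observe when the map function actually runs.
val calls = sc.longAccumulator("mapCalls")

val squares = sc.parallelize(1 to 10).map { x =>
  calls.add(1)
  x * x
}

// No action has run yet, so the map function has not been invoked.
val callsBeforeAction: Long = calls.value

// count() is an action: it schedules a job that evaluates the map.
val n = squares.count()
val callsAfterAction: Long = calls.value
```

Accumulator updates made inside transformations can be applied more than once if tasks are retried, so this pattern suits demonstrations rather than exact bookkeeping.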

### Basic Transformations

```scala { .api }
abstract class RDD[T] {
  def map[U: ClassTag](f: T => U): RDD[U]
  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
  def filter(f: T => Boolean): RDD[T]
  def distinct(): RDD[T]
  def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
  def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]
  // Note: takeSample is an action; it returns an Array to the driver rather than a new RDD
  def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T]
}
```
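
As an illustration of `flatMap` and `distinct`, the sketch below splits lines into words. The input strings, the local master, and the app name are assumptions made for the example.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: local master for illustration; reuses a running context if one exists.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("flatmap-demo"))

val lines = sc.parallelize(Seq("to be or", "not to be"))

// flatMap: each input line yields zero or more output words.
val words = lines.flatMap(_.split(" "))

// distinct removes duplicates; the result is sorted on the driver for stable output.
val uniqueWords = words.distinct().collect().sorted
```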

### Set Operations

```scala { .api }
abstract class RDD[T] {
  def union(other: RDD[T]): RDD[T]
  def intersection(other: RDD[T]): RDD[T]
  def intersection(other: RDD[T], partitioner: Partitioner): RDD[T]
  def intersection(other: RDD[T], numPartitions: Int): RDD[T]
  def subtract(other: RDD[T]): RDD[T]
  def subtract(other: RDD[T], numPartitions: Int): RDD[T]
  def subtract(other: RDD[T], p: Partitioner): RDD[T]
  def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]
}
```
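
The set operations above might be used as follows. This is a sketch with made-up input values, assuming a local master; note that `union` keeps duplicates while `intersection` does not.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: local master for illustration; reuses a running context if one exists.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("set-ops-demo"))

val a = sc.parallelize(Seq(1, 2, 3, 4))
val b = sc.parallelize(Seq(3, 4, 5))

val unionAll  = a.union(b).collect().sorted         // duplicates are kept
val common    = a.intersection(b).collect().sorted  // elements present in both
val onlyInA   = a.subtract(b).collect().sorted      // elements of a not in b
val pairCount = a.cartesian(b).count()              // |a| * |b| pairs
```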

### Grouping and Partitioning

```scala { .api }
abstract class RDD[T] {
  def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
  def groupBy[K](f: T => K, numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
  def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
}
```
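
A short sketch of `groupBy`, assuming a local master and an illustrative parity key function that is not part of the original text:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: local master for illustration; reuses a running context if one exists.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("groupby-demo"))

// Group the numbers 1..6 by a derived key.
val byParity = sc.parallelize(1 to 6)
  .groupBy(n => if (n % 2 == 0) "even" else "odd")

// Order within a group is not guaranteed, so sort each group before comparing.
val asMap = byParity.mapValues(_.toList.sorted).collectAsMap()
```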

### Partition Operations

```scala { .api }
abstract class RDD[T] {
  def mapPartitions[U: ClassTag](
    f: Iterator[T] => Iterator[U],
    preservesPartitioning: Boolean = false
  ): RDD[U]

  def mapPartitionsWithIndex[U: ClassTag](
    f: (Int, Iterator[T]) => Iterator[U],
    preservesPartitioning: Boolean = false
  ): RDD[U]

  // Note: foreachPartition is an action; it runs immediately and returns Unit
  def foreachPartition(f: Iterator[T] => Unit): Unit
  def glom(): RDD[Array[T]]
}
```

### Repartitioning

```scala { .api }
abstract class RDD[T] {
  def repartition(numPartitions: Int): RDD[T]
  def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T]
  def sortBy[K](
    f: T => K,
    ascending: Boolean = true,
    numPartitions: Int = this.partitions.length
  )(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
}
```

### Pairing & Zipping Operations

```scala { .api }
abstract class RDD[T] {
  def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
  def zipPartitions[B: ClassTag, V: ClassTag](rdd2: RDD[B])(
    f: (Iterator[T], Iterator[B]) => Iterator[V]
  ): RDD[V]
  def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag](rdd2: RDD[B], rdd3: RDD[C])(
    f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]
  ): RDD[V]
  def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag](
    rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D]
  )(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V]
  def zipWithIndex(): RDD[(T, Long)]
  def zipWithUniqueId(): RDD[(T, Long)]
  def keyBy[K](f: T => K): RDD[(K, T)]
}
```
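
The sketch below shows `zip`, `zipWithIndex`, and `keyBy` on small inputs. It assumes a local master and uses a single partition for both RDDs, because `zip` requires the same number of partitions and the same number of elements in each partition.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: local master for illustration; reuses a running context if one exists.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("zip-demo"))

// One partition each so that zip's per-partition alignment requirement holds.
val letters = sc.parallelize(Seq("a", "b", "c"), numSlices = 1)
val numbers = sc.parallelize(Seq(1, 2, 3), numSlices = 1)

val zipped  = letters.zip(numbers).collect()          // pairs elements positionally
val indexed = letters.zipWithIndex().collect()        // attaches a 0-based Long index
val keyed   = letters.keyBy(_.toUpperCase).collect()  // derives a key for each element
```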

### External Command Processing

```scala { .api }
abstract class RDD[T] {
  def pipe(command: String): RDD[String]
  def pipe(command: String, env: Map[String, String]): RDD[String]
  def pipe(
    command: Seq[String],
    env: Map[String, String] = Map(),
    printPipeContext: (String => Unit) => Unit = null,
    printRDDElement: (T, String => Unit) => Unit = null,
    separateWorkingDir: Boolean = false,
    bufferSize: Int = 8192,
    encoding: String = Codec.defaultCharsetCodec.name
  ): RDD[String]
}
```

## Actions

Actions trigger the execution of RDD transformations and return results to the driver or save data to storage.

### Collection Actions

```scala { .api }
abstract class RDD[T] {
  def collect(): Array[T]
  def toLocalIterator: Iterator[T]
  // Note: this collect overload is a lazy transformation (filter and map via a partial function)
  def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U]
  def take(num: Int): Array[T]
  def first(): T
  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
  def top(num: Int)(implicit ord: Ordering[T]): Array[T]
}
```
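
To contrast `takeOrdered`, `top`, and the partial-function form of `collect`, here is a small sketch with made-up values, assuming a local master:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: local master for illustration; reuses a running context if one exists.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("collect-demo"))

val nums = sc.parallelize(Seq(5, 3, 9, 1, 7))

val smallest = nums.takeOrdered(2)  // the 2 smallest, ascending
val largest  = nums.top(2)          // the 2 largest, descending

// collect with a partial function is a transformation: it keeps only the
// elements the function is defined for and maps them in one step.
val scaledBig = nums.collect { case x if x > 4 => x * 10 }
val scaledBigSorted = scaledBig.collect().sorted
```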

### Reduction Actions

```scala { .api }
abstract class RDD[T] {
  def reduce(f: (T, T) => T): T
  def treeReduce(f: (T, T) => T, depth: Int = 2): T
  def fold(zeroValue: T)(op: (T, T) => T): T
  def aggregate[U: ClassTag](zeroValue: U)(
    seqOp: (U, T) => U,
    combOp: (U, U) => U
  ): U
  def treeAggregate[U: ClassTag](zeroValue: U)(
    seqOp: (U, T) => U,
    combOp: (U, U) => U,
    depth: Int = 2
  ): U
}
```
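
`aggregate` differs from `reduce` and `fold` in that its result type can differ from the element type. The sketch below computes a sum and a count in one pass; the input range, partition count, and local master are assumptions for the example.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: local master for illustration; reuses a running context if one exists.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("aggregate-demo"))

val nums = sc.parallelize(1 to 10, numSlices = 4)

val total     = nums.reduce(_ + _)
val foldTotal = nums.fold(0)(_ + _)  // the zero value must be neutral for the operator

// seqOp folds elements into a per-partition (sum, count);
// combOp merges the per-partition results.
val (sum, count) = nums.aggregate((0, 0))(
  (acc, x) => (acc._1 + x, acc._2 + 1),
  (l, r) => (l._1 + r._1, l._2 + r._2)
)
val mean = sum.toDouble / count
```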

### Counting Actions

```scala { .api }
abstract class RDD[T] {
  def count(): Long
  def countApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble]
  def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long]
  def countByValueApprox(timeout: Long, confidence: Double = 0.95)(implicit ord: Ordering[T] = null): PartialResult[Map[T, BoundedDouble]]
  def countApproxDistinct(relativeSD: Double = 0.05): Long
  def countApproxDistinct(p: Int, sp: Int): Long
  def isEmpty(): Boolean
}
```
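
A brief sketch of the exact counting actions, with illustrative input and an assumed local master:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: local master for illustration; reuses a running context if one exists.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("count-demo"))

val words = sc.parallelize(Seq("a", "b", "a", "c", "a"))

val total = words.count()
// countByValue returns a local Map on the driver, so it suits a small number of distinct values.
val freq = words.countByValue()
val emptyCheck = sc.parallelize(Seq.empty[Int]).isEmpty()
```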

### Statistical Actions

```scala { .api }
abstract class RDD[T] {
  def max()(implicit ord: Ordering[T]): T
  def min()(implicit ord: Ordering[T]): T
}
```

### Iterative Actions

```scala { .api }
abstract class RDD[T] {
  def foreach(f: T => Unit): Unit
  // Note: randomSplit is a transformation; it returns an array of lazily evaluated RDDs
  def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]]
}
```

## Pair RDD Operations

When RDDs contain key-value pairs, additional operations become available through implicit conversions.

### Key-Value Transformations

```scala { .api }
// Available on RDD[(K, V)] through implicit conversion to PairRDDFunctions
class PairRDDFunctions[K, V](self: RDD[(K, V)]) {
  def keys: RDD[K]
  def values: RDD[V]
  def mapValues[U](f: V => U): RDD[(K, U)]
  def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)]
}
```
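
The value-only transformations leave keys untouched. The following sketch, with made-up input and an assumed local master, expands comma-separated values with `flatMapValues` and converts them with `mapValues`:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: local master for illustration; reuses a running context if one exists.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("values-demo"))

val raw = sc.parallelize(Seq(("a", "1,2"), ("b", "3")))

// flatMapValues emits one (key, value) pair per produced value.
val exploded = raw.flatMapValues(_.split(",").toSeq).mapValues(_.toInt)

val pairs = exploded.collect().sorted
val keys  = exploded.keys.distinct().collect().sorted
```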

### Grouping Operations

```scala { .api }
class PairRDDFunctions[K, V] {
  def groupByKey(): RDD[(K, Iterable[V])]
  def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
  def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]

  def reduceByKey(func: (V, V) => V): RDD[(K, V)]
  def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]

  def aggregateByKey[U: ClassTag](zeroValue: U)(
    seqOp: (U, V) => U,
    combOp: (U, U) => U
  ): RDD[(K, U)]
  def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(
    seqOp: (U, V) => U,
    combOp: (U, U) => U
  ): RDD[(K, U)]
  def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(
    seqOp: (U, V) => U,
    combOp: (U, U) => U
  ): RDD[(K, U)]

  def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
  def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]
  def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]

  def combineByKey[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C
  ): RDD[(K, C)]
  def combineByKey[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    numPartitions: Int
  ): RDD[(K, C)]
  def combineByKey[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner
  ): RDD[(K, C)]
}
```
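
`combineByKey` takes three functions: one to start a combiner from the first value of a key, one to merge a further value into it, and one to merge combiners across partitions. A per-key average is a common illustration; the data, names, and local master below are assumptions for the sketch.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: local master for illustration; reuses a running context if one exists.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("combine-demo"))

val scores = sc.parallelize(Seq(("alice", 90), ("bob", 70), ("alice", 80)))

// The combiner type is (sum, count); explicit types help overload resolution.
val sumCount = scores.combineByKey(
  (v: Int) => (v, 1),                                              // createCombiner
  (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),           // mergeValue
  (l: (Int, Int), r: (Int, Int)) => (l._1 + r._1, l._2 + r._2)     // mergeCombiners
)

val averages = sumCount.mapValues { case (s, c) => s.toDouble / c }.collectAsMap()
```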

### Join Operations

```scala { .api }
class PairRDDFunctions[K, V] {
  def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
  def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
  def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]

  def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
  def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))]
  def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))]

  def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
  def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))]
  def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], W))]

  def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]
  def fullOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], Option[W]))]
  def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], Option[W]))]

  def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
  def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]
  def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))]
  def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
  def cogroup[W1, W2, W3](
    other1: RDD[(K, W1)],
    other2: RDD[(K, W2)],
    other3: RDD[(K, W3)]
  ): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]
}
```
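
The join variants differ in how they treat keys missing from one side, which the `Option` types in the signatures reflect. A sketch with made-up data, assuming a local master and unique keys on each side so the results fit in a map:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: local master for illustration; reuses a running context if one exists.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("join-demo"))

val orders = sc.parallelize(Seq(("alice", 10), ("bob", 20), ("carol", 30)))
val cities = sc.parallelize(Seq(("alice", "Paris"), ("bob", "Rome"), ("dave", "Oslo")))

val inner = orders.join(cities).collectAsMap()           // keys present on both sides
val left  = orders.leftOuterJoin(cities).collectAsMap()  // all keys from orders
val full  = orders.fullOuterJoin(cities).collectAsMap()  // all keys from either side
```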

### Set Operations on Pairs

```scala { .api }
class PairRDDFunctions[K, V] {
  def subtractByKey[W](other: RDD[(K, W)]): RDD[(K, V)]
  def subtractByKey[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)]
  def subtractByKey[W](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)]
}
```

### Sorting Operations

```scala { .api }
// Available on RDD[(K, V)] when an implicit Ordering[K] is in scope
class OrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](self: RDD[(K, V)]) {
  def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]
  def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)]
}
```
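
A minimal sketch of `sortByKey` in both directions, using illustrative data and an assumed local master:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: local master for illustration; reuses a running context if one exists.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("sort-demo"))

val pairs = sc.parallelize(Seq((3, "c"), (1, "a"), (2, "b")))

// collect() returns partitions in order, so the global sort order is preserved.
val ascending      = pairs.sortByKey().collect()
val descendingKeys = pairs.sortByKey(ascending = false).keys.collect()
```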

### Partitioning Operations

```scala { .api }
class PairRDDFunctions[K, V] {
  def partitionBy(partitioner: Partitioner): RDD[(K, V)]
  // repartitionAndSortWithinPartitions is provided by OrderedRDDFunctions (see Sorting Operations)
}
```

### Lookup and Collection

```scala { .api }
class PairRDDFunctions[K, V] {
  def lookup(key: K): Seq[V]
  def collectAsMap(): Map[K, V]
  def reduceByKeyLocally(func: (V, V) => V): Map[K, V]
  def countByKey(): Map[K, Long]
}
```
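
These methods are all actions that return local collections to the driver. A sketch with made-up pairs, assuming a local master:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: local master for illustration; reuses a running context if one exists.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("lookup-demo"))

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

val valuesForA = pairs.lookup("a").sorted      // all values stored under key "a"
val perKey     = pairs.countByKey()            // number of pairs per key
val sums       = pairs.reduceByKeyLocally(_ + _)  // reduced per key, returned as a Map
```

Note that `collectAsMap()` keeps only one value per key, so it is best applied after duplicates have been reduced away.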

## Numeric RDD Operations

For RDDs containing numeric values, statistical operations are available.

```scala { .api }
// Available on RDD[Double] through implicit conversion
class DoubleRDDFunctions(self: RDD[Double]) {
  def sum(): Double
  def stats(): StatCounter
  def mean(): Double
  def variance(): Double
  def stdev(): Double
  def sampleStdev(): Double
  def sampleVariance(): Double
  def histogram(buckets: Array[Double]): Array[Long]
  def histogram(buckets: Int): (Array[Double], Array[Long])
}
```

## Output Operations

```scala { .api }
abstract class RDD[T] {
  def saveAsTextFile(path: String): Unit
  def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit
  def saveAsObjectFile(path: String): Unit
}

// For pair RDDs
class PairRDDFunctions[K, V] {
  def saveAsHadoopFile[F <: OutputFormat[_, _]](
    path: String,
    keyClass: Class[_],
    valueClass: Class[_],
    outputFormatClass: Class[F],
    conf: JobConf = new JobConf(self.context.hadoopConfiguration)
  ): Unit

  def saveAsHadoopDataset(conf: JobConf): Unit

  def saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]](
    path: String,
    keyClass: Class[_],
    valueClass: Class[_],
    outputFormatClass: Class[F],
    conf: Configuration = self.context.hadoopConfiguration
  ): Unit

  def saveAsNewAPIHadoopDataset(conf: Configuration): Unit
}

// For SequenceFile output
class SequenceFileRDDFunctions[K, V](self: RDD[(K, V)]) {
  def saveAsSequenceFile(
    path: String,
    codec: Option[Class[_ <: CompressionCodec]] = None
  ): Unit
}
```
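
As a sketch of the simplest output path, the example below writes an RDD with `saveAsTextFile` and reads it back with `sc.textFile`. The temporary directory, the `numbers` subdirectory name, and the local master are assumptions for illustration; the target path must not already exist, and local filesystem access through Hadoop may need extra setup on some platforms.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: local master for illustration; reuses a running context if one exists.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("save-demo"))

// Hypothetical output location: a fresh subdirectory inside a temp directory.
val outDir = Files.createTempDirectory("rdd-out").resolve("numbers").toString

// Writes one part file per partition under outDir.
sc.parallelize(1 to 5, numSlices = 2).saveAsTextFile(outDir)

// Reading the directory back picks up all part files.
val readBack = sc.textFile(outDir).map(_.toInt).collect().sorted
```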

## Usage Examples

### Basic Transformations and Actions

```scala
val rdd = sc.parallelize(1 to 100)

// Transformations (lazy)
val evenNumbers = rdd.filter(_ % 2 == 0)
val doubled = evenNumbers.map(_ * 2)
val distinctValues = doubled.distinct()

// Actions (trigger computation)
val result = distinctValues.collect()
val count = distinctValues.count()
val sum = distinctValues.reduce(_ + _)
```

### Working with Key-Value Pairs

```scala
val pairs = sc.parallelize(Seq(
  ("apple", 1), ("banana", 2), ("apple", 3), ("cherry", 1)
))

// Group by key
val grouped = pairs.groupByKey()
// Result (order not guaranteed): ("apple", [1, 3]), ("banana", [2]), ("cherry", [1])

// Reduce by key
val totals = pairs.reduceByKey(_ + _)
// Result (order not guaranteed): ("apple", 4), ("banana", 2), ("cherry", 1)

// Join with another RDD; keys missing from either side ("cherry") are dropped
val prices = sc.parallelize(Seq(("apple", 0.5), ("banana", 0.3)))
val joined = totals.join(prices)
// Result (order not guaranteed): ("apple", (4, 0.5)), ("banana", (2, 0.3))
```

### Advanced Aggregations

```scala
val sales = sc.parallelize(Seq(
  ("store1", 100), ("store2", 200), ("store1", 150), ("store2", 175)
))

// Aggregate by key into a result type that differs from the value type
case class SalesSummary(totalSales: Int, count: Int, avgSale: Double)

val summary = sales.aggregateByKey(SalesSummary(0, 0, 0.0))(
  // seqOp: fold one sale into the running summary within a partition
  seqOp = { (acc, sale) =>
    val newTotal = acc.totalSales + sale
    val newCount = acc.count + 1
    SalesSummary(newTotal, newCount, newTotal.toDouble / newCount)
  },
  // combOp: merge the summaries produced by different partitions
  combOp = { (left, right) =>
    val combinedTotal = left.totalSales + right.totalSales
    val combinedCount = left.count + right.count
    SalesSummary(combinedTotal, combinedCount, combinedTotal.toDouble / combinedCount)
  }
)
```

### Partition-Level Operations

```scala
val rdd = sc.parallelize(1 to 100, numSlices = 4)

// Process each partition independently
val partitionSums = rdd.mapPartitions { iter =>
  Iterator(iter.sum)
}

// Access partition index
val partitionInfo = rdd.mapPartitionsWithIndex { (index, iter) =>
  // iter.size consumes the iterator, so it cannot be traversed again afterwards
  val partitionSize = iter.size
  Iterator((s"partition-$index", partitionSize))
}

// Convert each partition to an array
val partitionArrays = rdd.glom()
```

### Statistical Operations

```scala
val numbers = sc.parallelize((1 to 1000).map(_.toDouble))

// Basic statistics
val stats = numbers.stats()
val mean = numbers.mean()
val variance = numbers.variance()
val stdev = numbers.stdev()

// Histogram
val (buckets, counts) = numbers.histogram(10)
```

### Repartitioning Examples

```scala
val rdd = sc.parallelize(1 to 100, numSlices = 10)

// Reduce number of partitions without shuffle
val coalesced = rdd.coalesce(5)

// Repartition with shuffle (can increase or decrease partitions)
val repartitioned = rdd.repartition(20)

// Sort by a key function; negating the key gives descending order
// (rdd.sortBy(identity, ascending = false) is an equivalent alternative)
val sorted = rdd.sortBy(x => -x)
```

## Performance Considerations

### Transformation Optimization

- **Use `mapPartitions` instead of `map`** when per-element work has a setup cost (for example, opening a connection) that can be paid once per partition
- **Prefer `reduceByKey` over `groupByKey`** followed by a reduce, because `reduceByKey` combines values within each partition before the shuffle
- **Use `coalesce` instead of `repartition`** when only reducing partition count, since `coalesce` avoids a full shuffle by default
- **Consider `combineByKey`** for complex aggregations that can't be expressed with other operations
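
The `reduceByKey` recommendation above can be illustrated by computing the same per-key sums both ways. This is a sketch on made-up data with an assumed local master; the two results are equal, but the `groupByKey` version sends every individual value through the shuffle before summing.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: local master for illustration; reuses a running context if one exists.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("reduce-vs-group"))

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5)))

// Shuffles all values for a key, then sums them.
val viaGroup = pairs.groupByKey().mapValues(_.sum).collectAsMap()

// Sums within each partition first, then shuffles only the partial sums.
val viaReduce = pairs.reduceByKey(_ + _).collectAsMap()
```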

### Memory Management

- **Call `unpersist()` on cached RDDs** when they're no longer needed
- **Use appropriate storage levels** for different access patterns (for example, `MEMORY_ONLY` versus `MEMORY_AND_DISK`)
- **Monitor partition sizes** to avoid out-of-memory errors and skewed tasks
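
A sketch of the persist and unpersist lifecycle mentioned above, assuming a local master and an arbitrarily chosen storage level (`MEMORY_AND_DISK`):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Assumption: local master for illustration; reuses a running context if one exists.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("persist-demo"))

// Mark the RDD for caching; nothing is stored until the first action runs.
val squares = sc.parallelize(1 to 1000).map(x => x * x)
  .persist(StorageLevel.MEMORY_AND_DISK)

val n       = squares.count()  // first action computes and caches the partitions
val biggest = squares.max()    // later actions can reuse the cached data

val levelWhileCached = squares.getStorageLevel

// Release the cached blocks once the RDD is no longer needed.
squares.unpersist()
val levelAfterUnpersist = squares.getStorageLevel
```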

### Shuffle Optimization

- **Use partitioners** to control data distribution and minimize shuffles
- **Co-partition related datasets** so that joins between them can avoid an extra shuffle
- **Tune shuffle parameters** like `spark.default.parallelism` (`spark.sql.shuffle.partitions` applies to Spark SQL, not to RDD operations)
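
Co-partitioning, as mentioned above, can be sketched by applying the same partitioner to both sides before a join. The data, the partition count of 4, and the local master are assumptions for the example; with default settings the join is expected to reuse the shared partitioner.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

// Assumption: local master for illustration; reuses a running context if one exists.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("copartition-demo"))

val part = new HashPartitioner(4)

// Partition both datasets the same way and cache them so the layout is reused.
val users  = sc.parallelize(Seq((1, "ann"), (2, "bo"))).partitionBy(part).cache()
val visits = sc.parallelize(Seq((1, "home"), (2, "cart"))).partitionBy(part).cache()

// Matching keys already live in the same partition index on both sides.
val joined = users.join(visits)
val result = joined.collectAsMap()
```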

## Important Notes

- **RDDs are immutable** - transformations create new RDDs
- **Transformations are lazy** - they're only executed when an action is called
- **Actions are eager** - they trigger execution of the lineage needed to produce their result
- **Lineage tracking** enables automatic recovery from failures by recomputing lost partitions
- **Partitioning matters** - well-partitioned data reduces shuffling and balances work across tasks
- **Avoid `collect()` on large datasets** - it brings all data to the driver node