0
# RDD Operations
1
2
Resilient Distributed Datasets (RDDs) are the fundamental data abstraction in Spark. They represent immutable, partitioned collections of elements that can be operated on in parallel. RDDs support two types of operations: transformations (which create new RDDs) and actions (which return values).
3
4
## Core RDD Class
5
6
```scala { .api }
7
abstract class RDD[T: ClassTag](
8
@transient private var _sc: SparkContext,
9
@transient private var deps: Seq[Dependency[_]]
10
) {
11
// Transformations - Lazy operations that return new RDDs
12
def map[U: ClassTag](f: T => U): RDD[U]
13
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
14
def filter(f: T => Boolean): RDD[T]
15
def distinct(numPartitions: Int = partitions.length): RDD[T]
16
def union(other: RDD[T]): RDD[T]
17
def intersection(other: RDD[T]): RDD[T]
18
def intersection(other: RDD[T], partitioner: Partitioner): RDD[T]
19
def intersection(other: RDD[T], numPartitions: Int): RDD[T]
20
def subtract(other: RDD[T]): RDD[T]
21
def subtract(other: RDD[T], numPartitions: Int): RDD[T]
22
def subtract(other: RDD[T], p: Partitioner): RDD[T]
23
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]
24
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
25
def zipWithIndex(): RDD[(T, Long)]
26
def zipWithUniqueId(): RDD[(T, Long)]
27
def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]
28
def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]]
29
// NOTE: takeSample is an action, not a lazy transformation — its return type is Array[T], not RDD[T]
def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T]
30
def repartition(numPartitions: Int): RDD[T]
31
def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T]
32
def sortBy[K](f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
33
def glom(): RDD[Array[T]]
34
def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
35
def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
36
def pipe(command: String): RDD[String]
37
// Pipes each partition through an external process; optional callbacks customise what is written to its stdin
def pipe(command: Seq[String], env: Map[String, String] = Map(), printPipeContext: (String => Unit) => Unit = null, printRDDElement: (T, String => Unit) => Unit = null, separateWorkingDir: Boolean = false, bufferSize: Int = 8192, encoding: String = Codec.defaultCharsetCodec.name): RDD[String]
38
39
// Actions - Operations that trigger computation and return values
40
def collect(): Array[T]
41
def count(): Long
42
def countApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble]
43
def countApproxDistinct(relativeSD: Double = 0.05): Long
44
def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long]
45
def first(): T
46
def isEmpty(): Boolean
47
def take(num: Int): Array[T]
48
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
49
def top(num: Int)(implicit ord: Ordering[T]): Array[T]
50
def min()(implicit ord: Ordering[T]): T
51
def max()(implicit ord: Ordering[T]): T
52
def reduce(f: (T, T) => T): T
53
def treeReduce(f: (T, T) => T, depth: Int = 2): T
54
def fold(zeroValue: T)(op: (T, T) => T): T
55
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
56
def treeAggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U, depth: Int = 2): U
57
def foreach(f: T => Unit): Unit
58
def foreachPartition(f: Iterator[T] => Unit): Unit
59
60
// Persistence and Caching
61
def persist(): this.type
62
def persist(newLevel: StorageLevel): this.type
63
def cache(): this.type
64
def unpersist(blocking: Boolean = true): this.type
65
def checkpoint(): Unit
66
def isCheckpointed: Boolean
67
def getCheckpointFile: Option[String]
68
def localCheckpoint(): this.type
69
70
// Partition and Dependency Information
71
def partitions: Array[Partition]
72
def partitioner: Option[Partitioner]
73
def getNumPartitions: Int
74
def dependencies: Seq[Dependency[_]]
75
def preferredLocations(split: Partition): Seq[String]
76
def compute(split: Partition, context: TaskContext): Iterator[T]
77
78
// Metadata and Debugging
79
def id: Int
80
def name: String
81
def setName(name: String): this.type
82
def toDebugString: String
83
def getStorageLevel: StorageLevel
84
def context: SparkContext
85
}
86
```
87
88
## Transformations
89
90
Transformations are lazy operations that define new RDDs but don't trigger computation immediately.
91
92
### Basic Transformations
93
94
```scala
95
import org.apache.spark.{SparkContext, SparkConf}
96
97
val sc = new SparkContext(new SparkConf().setAppName("RDD Examples").setMaster("local[*]"))
98
99
// Create sample data
100
val numbers = sc.parallelize(1 to 100)
101
val words = sc.parallelize(Seq("hello world", "spark is awesome", "big data processing"))
102
103
// map - Transform each element
104
val squared = numbers.map(x => x * x)
105
val lengths = words.map(_.length)
106
107
// flatMap - Transform each element to multiple elements
108
val allWords = words.flatMap(_.split(" "))
109
val digits = numbers.flatMap(n => n.toString.toCharArray.map(_.asDigit))
110
111
// filter - Keep elements matching predicate
112
val evenNumbers = numbers.filter(_ % 2 == 0)
113
val longWords = allWords.filter(_.length > 4)
114
115
// distinct - Remove duplicates
116
val uniqueWords = allWords.distinct()
117
val uniqueDigits = digits.distinct()
118
119
// sample - Random sampling
120
val sample = numbers.sample(withReplacement = false, fraction = 0.1, seed = 42)
121
val sampleWithReplacement = numbers.sample(withReplacement = true, fraction = 0.2)
122
```
123
124
### Set Operations
125
126
```scala
127
val rdd1 = sc.parallelize(1 to 10)
128
val rdd2 = sc.parallelize(5 to 15)
129
130
// union - Combine RDDs (allows duplicates)
131
val combined = rdd1.union(rdd2)
132
133
// intersection - Elements in both RDDs
134
val common = rdd1.intersection(rdd2)
135
136
// subtract - Elements in first RDD but not second
137
val difference = rdd1.subtract(rdd2)
138
139
// cartesian - Cartesian product
140
val pairs = rdd1.cartesian(rdd2)
141
```
142
143
### Pairing and Zipping
144
145
```scala
146
val data = sc.parallelize(Seq("apple", "banana", "cherry"))
147
val scores = sc.parallelize(Seq(95, 87, 92))
148
149
// zip - Combine elements at same positions (both RDDs need the same number of partitions and the same number of elements per partition)
150
val pairs = data.zip(scores)
151
152
// zipWithIndex - Add sequential indices
153
val indexed = data.zipWithIndex()
154
155
// zipWithUniqueId - Add unique IDs (not necessarily sequential)
156
val withIds = data.zipWithUniqueId()
157
```
158
159
### Repartitioning
160
161
```scala
162
val data = sc.parallelize(1 to 1000, numSlices = 8)
163
164
// repartition - Change number of partitions (shuffles data)
165
val repartitioned = data.repartition(4)
166
167
// coalesce - Reduce partitions without shuffling (when possible)
168
val coalesced = data.coalesce(2)
169
170
// sortBy - Sort RDD by key function
171
// Negating the key AND passing ascending = false cancel out; use only one of them
val sorted = data.sortBy(x => x, ascending = false) // Sort descending
172
```
173
174
### Advanced Transformations
175
176
```scala
177
// glom - Collect all elements in each partition into arrays
178
val partitionArrays = numbers.glom()
179
180
// mapPartitions - Transform entire partitions
181
val partitionSums = numbers.mapPartitions(iter => Iterator(iter.sum))
182
183
// mapPartitionsWithIndex - Access partition index
184
val partitionInfo = numbers.mapPartitionsWithIndex { (index, iter) =>
  // An Iterator can be traversed only once: materialize it before computing two aggregates,
  // otherwise the size would be taken from an already exhausted iterator and always be 0.
  val values = iter.toArray
  Iterator((index, values.sum, values.length))
}
187
188
// pipe - Send data through external process
189
val upperCased = allWords.pipe("tr [a-z] [A-Z]")
190
```
191
192
## Actions
193
194
Actions trigger computation and return results to the driver or save data to external systems.
195
196
### Collection Actions
197
198
```scala
199
val data = sc.parallelize(1 to 100)
200
201
// collect - Bring all elements to driver (use with caution for large datasets)
202
val allElements: Array[Int] = data.collect()
203
204
// take - Get first n elements
205
val first10: Array[Int] = data.take(10)
206
207
// takeOrdered - Get n smallest elements
208
val smallest5: Array[Int] = data.takeOrdered(5)
209
210
// top - Get n largest elements
211
val largest5: Array[Int] = data.top(5)
212
213
// takeSample - Random sample
214
val randomSample: Array[Int] = data.takeSample(withReplacement = false, num = 20)
215
216
// first - Get first element
217
val firstElement: Int = data.first()
218
219
// min/max - Find extremes
220
val minimum: Int = data.min()
221
val maximum: Int = data.max()
222
```
223
224
### Aggregation Actions
225
226
```scala
227
// count - Count elements
228
val totalCount: Long = data.count()
229
230
// countByValue - Count occurrences of each value
231
// countByValue returns scala.collection.Map, which is not the default immutable Map
val wordCounts: scala.collection.Map[String, Long] = allWords.countByValue()
232
233
// reduce - Combine elements with associative function
234
val sum: Int = data.reduce(_ + _)
235
// The product of 1 to 100 overflows Int (and Long), so multiply as BigInt
val product: BigInt = data.map(BigInt(_)).reduce(_ * _)
236
237
// fold - Like reduce but with initial value
238
val sumWithZero: Int = data.fold(0)(_ + _)
239
240
// aggregate - More flexible reduction with different input/output types
241
val stats: (Int, Double) = data.aggregate((0, 0.0))(
242
seqOp = { case ((count, sum), value) => (count + 1, sum + value) },
243
combOp = { case ((c1, s1), (c2, s2)) => (c1 + c2, s1 + s2) }
244
)
245
246
// treeReduce/treeAggregate - Multi-level (tree) reduction; merges partial results on executors first, easing driver load when there are many partitions
247
val treeSum: Int = data.treeReduce(_ + _)
248
```
249
250
### Side-Effect Actions
251
252
```scala
253
// foreach - Execute function on each element (runs on the executors; on a cluster, println output goes to executor stdout, not the driver)
254
data.foreach(println)
255
256
// foreachPartition - Execute function on each partition
257
data.foreachPartition { partition =>
  // One connection per partition rather than one per element
  val connection = createDatabaseConnection()
  try {
    partition.foreach(value => insertToDatabase(connection, value))
  } finally {
    connection.close() // release the connection even if an insert throws
  }
}
262
```
263
264
### Approximate Actions
265
266
```scala
267
// countApprox - Approximate count with timeout
268
val approxCount = data.countApprox(timeout = 1000L, confidence = 0.95)
269
270
// countApproxDistinct - Approximate distinct count
271
val approxDistinct = data.countApproxDistinct(relativeSD = 0.05)
272
```
273
274
## Persistence and Caching
275
276
RDDs can be persisted in memory or disk for efficient reuse across multiple actions.
277
278
```scala
279
import org.apache.spark.storage.StorageLevel
280
281
val expensiveRDD = data
282
.filter(_ > 50)
283
.map(expensiveComputation)
284
.filter(_.nonEmpty)
285
286
// Cache in memory (shorthand for MEMORY_ONLY)
287
expensiveRDD.cache()
288
289
// Explicit persistence with storage level
290
// An RDD's storage level can only be assigned once: use persist(level) INSTEAD of cache(), not after it.
// Calling it after cache() throws UnsupportedOperationException.
// expensiveRDD.persist(StorageLevel.MEMORY_AND_DISK_SER)
291
292
// Use the cached RDD multiple times
293
val result1 = expensiveRDD.count()
294
val result2 = expensiveRDD.collect()
295
val result3 = expensiveRDD.take(10)
296
297
// Remove from cache when done
298
expensiveRDD.unpersist()
299
```
300
301
## Checkpointing
302
303
Checkpointing saves RDD data to reliable storage to truncate lineage and improve fault tolerance.
304
305
```scala
306
// Set checkpoint directory
307
sc.setCheckpointDir("hdfs://namenode:port/checkpoints")
308
309
val complexRDD = data
310
.map(complexTransformation1)
311
.filter(complexFilter)
312
.map(complexTransformation2)
313
.filter(anotherComplexFilter)
314
315
// Checkpoint the RDD (persist it first where possible; otherwise writing the checkpoint recomputes the RDD)
316
complexRDD.checkpoint()
317
318
// Trigger checkpointing with an action
319
complexRDD.count()
320
321
// Verify checkpointing
322
println(s"Is checkpointed: ${complexRDD.isCheckpointed}")
323
println(s"Checkpoint file: ${complexRDD.getCheckpointFile}")
324
```
325
326
## Partition Management
327
328
Understanding and controlling partitions is crucial for performance optimization.
329
330
```scala
331
val data = sc.textFile("large-file.txt", minPartitions = 100)
332
333
// Check partition information
334
println(s"Number of partitions: ${data.getNumPartitions}")
335
println(s"Partitioner: ${data.partitioner}")
336
337
// View partition content
338
data.glom().collect().zipWithIndex.foreach { case (partitionData, index) =>
339
println(s"Partition $index has ${partitionData.length} elements")
340
}
341
342
// Custom partitioning for key-value RDDs (requires: import org.apache.spark.{HashPartitioner, RangePartitioner})
343
val keyValueData = data.map(line => (line.length, line))
344
val hashPartitioned = keyValueData.partitionBy(new HashPartitioner(50))
345
val rangePartitioned = keyValueData.partitionBy(new RangePartitioner(50, keyValueData))
346
```
347
348
## Advanced Usage Patterns
349
350
### Pipeline Optimization
351
352
```scala
353
// Chain transformations efficiently
354
val pipeline = sc.textFile("input")
355
.filter(_.nonEmpty)
356
.map(_.toLowerCase.trim)
357
.filter(_.length > 5)
358
.map(processLine)
359
.filter(_.isValid)
360
.persist(StorageLevel.MEMORY_AND_DISK_SER) // Persist at strategic points
361
362
// Multiple actions on same RDD
363
// pipeline is already persisted as MEMORY_AND_DISK_SER; calling cache() on it would try to
// change the storage level to MEMORY_ONLY and throw UnsupportedOperationException.
val validLines = pipeline
364
val count = validLines.count()
365
val sample = validLines.take(100)
366
val summary = validLines.map(_.summary).collect()
367
```
368
369
### Error Handling
370
371
```scala
372
import scala.util.{Try, Success, Failure}
373
374
// flatMap keeps the successful results and drops the failures in one pass,
// avoiding the filter(_.isDefined).map(_.get) round trip and the unsafe Option.get.
val robustRDD = data.flatMap { element =>
  Try(riskyOperation(element)) match {
    case Success(result) => Some(result)
    case Failure(exception) =>
      logError(s"Failed to process $element: ${exception.getMessage}")
      None
  }
}
382
```
383
384
### Custom Partitioning for Performance
385
386
```scala
387
/**
 * Example partitioner: String and Int keys are spread by a non-negative modulus,
 * every other key type is sent to partition 0.
 *
 * @param numPartitions total number of partitions; must be positive
 */
class CustomPartitioner(override val numPartitions: Int) extends Partitioner {
  require(numPartitions > 0, s"numPartitions must be positive, got $numPartitions")

  /** Maps `key` to a partition index in `[0, numPartitions)`. */
  override def getPartition(key: Any): Int = key match {
    case s: String => nonNegativeMod(s.hashCode, numPartitions)
    case i: Int    => nonNegativeMod(i, numPartitions)
    case _         => 0 // unrecognised key types all land in the first partition
  }

  // Lets two instances with the same partition count compare as equal partitioners.
  override def equals(other: Any): Boolean = other match {
    case that: CustomPartitioner => that.numPartitions == numPartitions
    case _                       => false
  }

  override def hashCode: Int = numPartitions

  // `%` keeps the sign of the dividend and math.abs(Int.MinValue) is still negative,
  // so shift negative remainders into range instead of taking an absolute value.
  private def nonNegativeMod(value: Int, mod: Int): Int = {
    val remainder = value % mod
    if (remainder < 0) remainder + mod else remainder
  }
}
398
399
val customPartitioned = keyValueData.partitionBy(new CustomPartitioner(100))
400
```