0
# RDD Operations
1
2
Resilient Distributed Datasets (RDDs) provide the core abstraction for distributed data processing in Spark, offering fault-tolerant distributed collections with transformations and actions.
3
4
## Capabilities
5
6
### RDD Base Class
7
8
Abstract base class representing an immutable, partitioned collection of elements that can be operated on in parallel.
9
10
```scala { .api }
11
/**
12
* Resilient Distributed Dataset (RDD) - basic abstraction in Spark
13
*/
14
abstract class RDD[T: ClassTag](
15
@transient private var _sc: SparkContext,
16
@transient private var deps: Seq[Dependency[_]]
17
) extends Serializable
18
19
// Core transformation methods (lazy evaluation)
20
def map[U: ClassTag](f: T => U): RDD[U]
21
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
22
def filter(f: T => Boolean): RDD[T]
23
def distinct(): RDD[T]
24
def distinct(numPartitions: Int): RDD[T]
25
def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]
26
27
// Set operations
28
def union(other: RDD[T]): RDD[T]
29
def ++(other: RDD[T]): RDD[T] // alias for union
30
def intersection(other: RDD[T]): RDD[T]
31
def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
32
def intersection(other: RDD[T], numPartitions: Int): RDD[T]
33
def subtract(other: RDD[T]): RDD[T]
34
def subtract(other: RDD[T], numPartitions: Int): RDD[T]
35
def subtract(other: RDD[T], p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
36
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]
37
38
// Grouping and sorting
39
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
40
def groupBy[K](f: T => K, numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
41
def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null): RDD[(K, Iterable[T])]
42
def sortBy[K](f: T => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
43
44
// Advanced transformations
45
def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
46
def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
47
def glom(): RDD[Array[T]]
48
def keyBy[K](f: T => K): RDD[(K, T)]
49
def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]]
50
51
// Zipping operations
52
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
53
def zipPartitions[B: ClassTag, V: ClassTag](rdd2: RDD[B], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V]
54
def zipPartitions[B: ClassTag, V: ClassTag](rdd2: RDD[B])(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V]
55
def zipWithIndex(): RDD[(T, Long)]
56
def zipWithUniqueId(): RDD[(T, Long)]
57
58
// Partitioning operations
59
def repartition(numPartitions: Int): RDD[T]
60
def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T]
61
def partitionBy(partitioner: Partitioner): RDD[T] // provided by PairRDDFunctions via implicit conversion; only available when T is a (K, V) pair
62
63
// Core action methods (trigger execution)
64
def collect(): Array[T]
65
def count(): Long
66
def first(): T
67
def take(num: Int): Array[T]
68
def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T]
69
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
70
def top(num: Int)(implicit ord: Ordering[T]): Array[T]
71
def max()(implicit ord: Ordering[T]): T
72
def min()(implicit ord: Ordering[T]): T
73
def isEmpty(): Boolean
74
def toLocalIterator: Iterator[T]
75
76
// Counting actions
77
def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long]
78
def countApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble]
79
def countByValueApprox(timeout: Long, confidence: Double = 0.95)(implicit ord: Ordering[T] = null): PartialResult[Map[T, BoundedDouble]]
80
def countApproxDistinct(p: Int, sp: Int): Long
81
def countApproxDistinct(relativeSD: Double = 0.05): Long
82
83
// Reduction operations
84
def reduce(f: (T, T) => T): T
85
def fold(zeroValue: T)(op: (T, T) => T): T
86
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
87
def treeReduce(f: (T, T) => T, depth: Int = 2): T
88
def treeAggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U, depth: Int = 2): U
89
90
// Side-effect operations
91
def foreach(f: T => Unit): Unit
92
def foreachPartition(f: Iterator[T] => Unit): Unit
93
94
// Persistence methods
95
def persist(): RDD[T]
96
def persist(storageLevel: StorageLevel): RDD[T]
97
def cache(): RDD[T] // equivalent to persist(MEMORY_ONLY)
98
def unpersist(blocking: Boolean = true): RDD[T]
99
def getStorageLevel: StorageLevel
100
101
// Checkpointing methods
102
def checkpoint(): Unit
103
def localCheckpoint(): RDD[T]
104
def isCheckpointed: Boolean
105
def getCheckpointFile: Option[String]
106
107
// File output operations
108
def saveAsTextFile(path: String): Unit
109
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit
110
def saveAsObjectFile(path: String): Unit
111
112
// Piping operations
113
def pipe(command: String): RDD[String]
114
def pipe(command: String, env: Map[String, String]): RDD[String]
115
def pipe(command: Seq[String], env: Map[String, String] = Map(),
116
printPipeContext: (String => Unit) => Unit = null,
117
printRDDElement: (T, String => Unit) => Unit = null,
118
separateWorkingDir: Boolean = false): RDD[String]
119
120
// Metadata and utility methods
121
def setName(name: String): RDD[T]
122
val name: String
123
val id: Int
124
def partitions: Array[Partition]
125
def dependencies: Seq[Dependency[_]]
126
val partitioner: Option[Partitioner]
127
def getNumPartitions: Int
128
def sparkContext: SparkContext
129
def context: SparkContext // alias for sparkContext
130
def toJavaRDD(): JavaRDD[T]
131
def toDebugString: String
132
133
// Special collection methods
134
def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U]
135
```
136
137
**Usage Examples:**
138
139
```scala
140
import org.apache.spark.{SparkContext, SparkConf}
141
import org.apache.spark.storage.StorageLevel
142
143
val sc = new SparkContext(new SparkConf().setAppName("RDD Examples").setMaster("local[*]"))
144
145
// Create RDD from collection
146
val numbers = sc.parallelize(1 to 100)
147
148
// Transformations (lazy evaluation)
149
val evenNumbers = numbers.filter(_ % 2 == 0)
150
val squares = evenNumbers.map(x => x * x)
151
val distinct = squares.distinct()
152
153
// Actions (trigger execution)
154
val result = squares.collect()
155
val count = squares.count()
156
val sum = squares.reduce(_ + _)
157
val firstTen = squares.take(10)
158
159
// Complex transformations
160
val words = sc.textFile("hdfs://path/to/file.txt")
161
val wordCounts = words
162
.flatMap(_.split(" "))
163
.map(word => (word, 1))
164
.reduceByKey(_ + _)
165
166
// Persistence for reuse
167
val cachedRDD = numbers.map(_ * 2).cache()
168
val result1 = cachedRDD.sum()
169
val result2 = cachedRDD.max() // Uses cached data
170
171
// Advanced operations
172
val grouped = numbers.groupBy(_ % 10)
173
val sorted = numbers.sortBy(identity, ascending = false)
174
val sampled = numbers.sample(withReplacement = false, 0.1)
175
```
176
177
### Numeric RDD Operations (DoubleRDDFunctions)
178
179
Additional operations available for RDDs containing numeric values.
180
181
```scala { .api }
182
/**
183
* Extra functions available on RDDs of Doubles through an implicit conversion
184
*/
185
class DoubleRDDFunctions(self: RDD[Double]) {
186
/** Compute the mean of this RDD's elements */
187
def mean(): Double
188
189
/** Compute the sum of this RDD's elements */
190
def sum(): Double
191
192
/** Find the minimum element in this RDD */
193
def min(): Double
194
195
/** Find the maximum element in this RDD */
196
def max(): Double
197
198
/** Compute the variance of this RDD's elements */
199
def variance(): Double
200
201
/** Compute the standard deviation of this RDD's elements */
202
def stdev(): Double
203
204
/** Compute the sample standard deviation of this RDD's elements */
205
def sampleStdev(): Double
206
207
/** Compute the sample variance of this RDD's elements */
208
def sampleVariance(): Double
209
210
/** Compute a histogram of the RDD using buckets evenly spaced between the minimum and maximum */
211
def histogram(buckets: Int): (Array[Double], Array[Long])
212
213
/** Compute a histogram using provided bucket boundaries */
214
def histogram(buckets: Array[Double]): Array[Long]
215
216
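/** Return approximate percentiles. NOTE(review): approxQuantile is defined on DataFrameStatFunctions, not DoubleRDDFunctions — confirm availability before relying on it */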
217
def approxQuantile(probabilities: Array[Double], relativeError: Double): Array[Double]
218
219
/** Return a StatCounter capturing the count, mean and variance of this RDD's elements in a single pass */
220
def stats(): StatCounter
221
}
222
```
223
224
**Usage Examples:**
225
226
```scala
227
val doubleRDD = sc.parallelize(Array(1.0, 2.5, 3.7, 4.2, 5.8, 6.1))
228
229
// Statistical operations (implicit conversion)
230
val mean = doubleRDD.mean()
231
val stdDev = doubleRDD.stdev()
232
val variance = doubleRDD.variance()
233
val summary = doubleRDD.stats()
234
235
// Histogram
236
val (buckets, counts) = doubleRDD.histogram(5)
237
println(s"Histogram: ${buckets.zip(counts).mkString(", ")}")
238
```
239
240
### Asynchronous RDD Actions
241
242
Asynchronous versions of RDD actions that return futures for non-blocking execution.
243
244
```scala { .api }
245
/**
246
* Extra functions for performing asynchronous operations on RDDs
247
*/
248
class AsyncRDDActions[T: ClassTag](self: RDD[T]) {
249
/** Returns a future for retrieving all elements of this RDD */
250
def collectAsync(): FutureAction[Array[T]]
251
252
/** Returns a future for counting the number of elements in the RDD */
253
def countAsync(): FutureAction[Long]
254
255
/** Returns a future for applying a function to each element of this RDD */
256
def foreachAsync(f: T => Unit): FutureAction[Unit]
257
258
/** Returns a future for applying a function to each partition of this RDD */
259
def foreachPartitionAsync(f: Iterator[T] => Unit): FutureAction[Unit]
260
261
/** Returns a future for retrieving the first num elements of the RDD */
262
def takeAsync(num: Int): FutureAction[Array[T]]
263
}
264
```
265
266
### Partition-Level Operations
267
268
Operations that work at the partition level for performance optimization.
269
270
```scala { .api }
271
// Partition-level transformation methods in RDD
272
def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
273
def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
274
def foreachPartition(f: Iterator[T] => Unit): Unit
275
def glom(): RDD[Array[T]] // convert each partition into an array
276
def pipe(command: String): RDD[String]
277
def pipe(command: Seq[String]): RDD[String]
278
```
279
280
**Usage Examples:**
281
282
```scala
283
val data = sc.parallelize(1 to 100, 4) // 4 partitions
284
285
// Process each partition independently
286
val partitionSums = data.mapPartitions { iter =>
287
val sum = iter.sum
288
Iterator(sum)
289
}
290
291
// Access partition index
292
val partitionInfo = data.mapPartitionsWithIndex { (index, iter) =>
293
val count = iter.size
294
Iterator((index, count))
295
}
296
297
// Convert partitions to arrays
298
val partitionArrays = data.glom()
299
val arrays = partitionArrays.collect() // Array of arrays
300
301
// External command processing
302
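val piped = data.pipe("grep 5") // the command string is split on whitespace, not run through a shell, so shell quotes must not be used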
303
```
304
305
### Zip Operations
306
307
Operations for combining RDDs element-wise.
308
309
```scala { .api }
310
// Zip operations in RDD
311
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
312
def zipWithIndex(): RDD[(T, Long)]
313
def zipWithUniqueId(): RDD[(T, Long)]
314
def zipPartitions[B: ClassTag, V: ClassTag](rdd2: RDD[B])(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V]
315
def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag](rdd2: RDD[B], rdd3: RDD[C])(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V]
316
```
317
318
**Usage Examples:**
319
320
```scala
321
val rdd1 = sc.parallelize(Array("a", "b", "c"))
322
val rdd2 = sc.parallelize(Array(1, 2, 3))
323
324
// Zip two RDDs
325
val zipped = rdd1.zip(rdd2) // RDD[("a", 1), ("b", 2), ("c", 3)]
326
327
// Zip with index
328
val withIndex = rdd1.zipWithIndex() // RDD[("a", 0), ("b", 1), ("c", 2)]
329
330
// Zip with unique ID
331
val withUniqueId = rdd1.zipWithUniqueId()
332
333
// Custom zip operation on partitions
334
val customZip = rdd1.zipPartitions(rdd2) { (iter1, iter2) =>
335
iter1.zip(iter2).map { case (str, num) => s"$str-$num" }
336
}
337
```
338
339
## Common RDD Patterns
340
341
### Efficient Data Processing Pipeline
342
343
```scala
344
val input = sc.textFile("hdfs://input/data.txt")
345
346
val processed = input
347
.filter(_.nonEmpty) // Remove empty lines
348
.map(_.toLowerCase.trim) // Normalize
349
.flatMap(_.split("\\s+")) // Split into words
350
.filter(_.length > 3) // Filter short words
351
.map((_, 1)) // Create pairs
352
.reduceByKey(_ + _) // Count occurrences
353
.filter(_._2 > 10) // Filter rare words
354
.sortBy(_._2, ascending = false) // Sort by count
355
.cache() // Cache for reuse
356
357
// Multiple actions on cached RDD
358
val topWords = processed.take(100)
359
val totalUnique = processed.count()
360
processed.saveAsTextFile("hdfs://output/results")
361
```
362
363
### Error Handling and Debugging
364
365
```scala
366
import org.apache.spark.TaskContext
367
368
val dataRDD = sc.parallelize(1 to 1000)
369
370
// Add debugging information
371
val debugRDD = dataRDD.mapPartitionsWithIndex { (partitionId, iter) =>
372
val taskContext = TaskContext.get()
373
println(s"Processing partition $partitionId on ${taskContext.taskAttemptId()}")
374
375
iter.map { value =>
376
try {
377
// Some processing that might fail
378
if (value % 100 == 0) throw new RuntimeException(s"Error processing $value")
379
value * 2
380
} catch {
381
case e: Exception =>
382
println(s"Error in partition $partitionId: ${e.getMessage}")
383
-1 // Error marker
384
}
385
}
386
}
387
388
// Filter out error markers and get debug info
389
val cleanData = debugRDD.filter(_ != -1)
390
println(s"Debug info: ${cleanData.toDebugString}")
391
```