# RDD Operations

The core RDD API provides transformations and actions for distributed data processing, including map, filter, and reduce operations as well as advanced transformations such as joins and aggregations.

## Capabilities

### RDD Base Class

The fundamental abstraction in Spark: an immutable, partitioned collection of elements that can be operated on in parallel.

```scala { .api }
/**
 * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
 * partitioned collection of elements that can be operated on in parallel.
 *
 * @param _sc The SparkContext that created this RDD
 * @param deps Dependencies on other RDDs
 */
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {

  // TRANSFORMATIONS (lazy operations that return new RDDs)

  /** Apply a function to each element and return a new RDD */
  def map[U: ClassTag](f: T => U): RDD[U]

  /** Apply a function to each element and flatten the results */
  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]

  /** Filter elements using a predicate function */
  def filter(f: T => Boolean): RDD[T]

  /** Return distinct elements (removes duplicates) */
  def distinct(): RDD[T]

  /** Return distinct elements with specific number of partitions */
  def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

  /** Union this RDD with another RDD of the same type (duplicates are kept) */
  def union(other: RDD[T]): RDD[T]

  /** Return intersection with another RDD */
  def intersection(other: RDD[T]): RDD[T]

  /** Subtract elements found in another RDD */
  def subtract(other: RDD[T]): RDD[T]

  /** Return Cartesian product with another RDD */
  def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]

  /** Sample elements with or without replacement */
  def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]

  /** Split RDD into multiple RDDs using weights array */
  def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]]

  /** Group elements of each partition into an array */
  def glom(): RDD[Array[T]]

  /** Apply function to each partition */
  def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]

  /** Apply function to each partition with partition index */
  def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]

  /** Zip this RDD with another RDD (must have same number of partitions and elements) */
  def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]

  /** Zip elements with their indices */
  def zipWithIndex(): RDD[(T, Long)]

  /** Zip elements with unique IDs */
  def zipWithUniqueId(): RDD[(T, Long)]

  /** Pipe elements through external command */
  def pipe(command: String): RDD[String]

  /** Pipe elements through external command with environment */
  def pipe(command: Seq[String], env: Map[String, String] = Map(), printPipeContext: (String => Unit) => Unit = null, printRDDElement: (T, String => Unit) => Unit = null): RDD[String]

  /** Reduce number of partitions (no shuffle unless shuffle = true) */
  def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T]

  /** Repartition to different number of partitions (with shuffle) */
  def repartition(numPartitions: Int): RDD[T]

  /** Sort RDD elements by a key function */
  def sortBy[K](f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

  // ACTIONS (operations that return values to the driver)

  /** Return all elements as an array */
  def collect(): Array[T]

  /** Return number of elements */
  def count(): Long

  /** Return first element */
  def first(): T

  /** Return first n elements */
  def take(num: Int): Array[T]

  /** Return top n elements by natural ordering */
  def top(num: Int)(implicit ord: Ordering[T]): Array[T]

  /** Return smallest n elements by natural ordering */
  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]

  /** Take a sample of elements and return as array */
  def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T]

  /** Apply function to each element (no return value) */
  def foreach(f: T => Unit): Unit

  /** Apply function to each partition (no return value) */
  def foreachPartition(f: Iterator[T] => Unit): Unit

  /** Reduce elements using function */
  def reduce(f: (T, T) => T): T

  /** Aggregate elements with initial value */
  def fold(zeroValue: T)(op: (T, T) => T): T

  /** Aggregate elements using different input and output types */
  def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

  /** Collect elements as key-value map (defined in PairRDDFunctions, reached via implicit conversion) */
  def collectAsMap(): Map[K, V] // Available when T <: (K, V)

  /** Count elements by value */
  def countByValue()(implicit ord: Ordering[T]): Map[T, Long]

  /** Check if RDD is empty */
  def isEmpty(): Boolean

  /** Save as text file */
  def saveAsTextFile(path: String): Unit

  /** Save as text file with codec */
  def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit

  /** Save as object file using serialization */
  def saveAsObjectFile(path: String): Unit

  // PERSISTENCE OPERATIONS

  /** Persist RDD with the default storage level (MEMORY_ONLY) */
  def cache(): RDD.this.type

  /** Persist RDD with specified storage level */
  def persist(newLevel: StorageLevel): RDD.this.type

  /** Remove RDD from cache */
  def unpersist(blocking: Boolean = true): RDD.this.type

  /** Mark RDD for checkpointing */
  def checkpoint(): Unit

  /** Check if RDD is checkpointed */
  def isCheckpointed: Boolean

  /** Get checkpoint file if available */
  def getCheckpointFile: Option[String]

  // INFORMATION METHODS

  /** Get number of partitions */
  def getNumPartitions: Int

  /** Get partitions array */
  def partitions: Array[Partition]

  /** Get current storage level */
  def getStorageLevel: StorageLevel

  /** Get RDD name */
  def name: String

  /** Set RDD name for display purposes */
  def setName(_name: String): RDD.this.type

  /** Get RDD ID */
  def id: Int

  /** Convert to string representation */
  override def toString: String

  /** Get creation site information */
  def creationSite: CallSite
}
```

**Usage Examples:**

```scala
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.storage.StorageLevel

val sc = new SparkContext(new SparkConf().setAppName("RDD Examples").setMaster("local[*]"))

// Create RDD from collection
val numbers = sc.parallelize(1 to 100, 4)

// Basic transformations
val doubled = numbers.map(_ * 2)
val evens = numbers.filter(_ % 2 == 0)
val squares = numbers.map(x => x * x)

// FlatMap example
val words = sc.parallelize(Array("hello world", "spark rdd", "distributed computing"))
val allWords = words.flatMap(_.split(" "))

// Distinct and sampling
val duplicates = sc.parallelize(Array(1, 2, 2, 3, 3, 3, 4))
val unique = duplicates.distinct()
val sample = numbers.sample(false, 0.1)  // Roughly 10% sample without replacement

// Combining RDDs
val rdd1 = sc.parallelize(1 to 5)
val rdd2 = sc.parallelize(4 to 8)
val combined = rdd1.union(rdd2)              // Keeps duplicates (4 and 5 appear twice)
val intersection = rdd1.intersection(rdd2)
val difference = rdd1.subtract(rdd2)

// Partitioning operations
val repartitioned = numbers.repartition(8)  // Force 8 partitions
val coalesced = numbers.coalesce(2)         // Reduce to 2 partitions without shuffle

// Actions
val collected = numbers.collect()           // Returns Array[Int]
val count = numbers.count()                 // Returns Long
val sum = numbers.reduce(_ + _)             // Returns Int
val first10 = numbers.take(10)              // Returns Array[Int]

// Aggregation: accumulate (sum, count, max) in one pass
val stats = numbers.aggregate((0, 0, Int.MinValue))(
  seqOp = { case ((sum, count, max), x) => (sum + x, count + 1, math.max(max, x)) },
  combOp = { case ((sum1, count1, max1), (sum2, count2, max2)) =>
    (sum1 + sum2, count1 + count2, math.max(max1, max2)) }
)

// Persistence
numbers.cache()  // Cache in memory (MEMORY_ONLY)
// A storage level cannot be changed once assigned, so persist a different RDD here
doubled.persist(StorageLevel.MEMORY_AND_DISK_SER)  // Custom storage level

// Cleanup
numbers.unpersist()
doubled.unpersist()
sc.stop()
```
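
The partition-level methods listed in the API (`glom`, `mapPartitionsWithIndex`, `zipWithIndex`) are not covered by the examples above. The following is a minimal sketch of how they might be used to inspect how elements are laid out across partitions. The sample data and the names `letters`, `layout`, `tagged`, and `indexed` are illustrative assumptions, and a local master is assumed.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: a local context is acceptable for this illustration
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("partition-inspection").setMaster("local[2]"))

// Hypothetical sample data spread over 3 partitions
val letters = sc.parallelize(Seq("a", "b", "c", "d", "e", "f"), 3)

// glom() turns each partition into one array, so collect() exposes the layout
val layout: Array[Array[String]] = letters.glom().collect()
layout.zipWithIndex.foreach { case (part, i) =>
  println(s"partition $i: ${part.mkString(", ")}")
}

// mapPartitionsWithIndex passes the partition index alongside the partition's iterator
val tagged: Array[String] = letters
  .mapPartitionsWithIndex((idx, it) => it.map(x => s"$idx:$x"))
  .collect()
println(tagged.mkString(" "))

// zipWithIndex assigns consecutive Long indices following partition order
val indexed: Array[(String, Long)] = letters.zipWithIndex().collect()
println(indexed.mkString(" "))

sc.stop()
```

Calling `glom().collect()` pulls every element to the driver, so it is only reasonable for small RDDs or debugging sessions.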

### Double RDD Functions

Additional operations available on RDDs of Double values for statistical computations.

```scala { .api }
/**
 * Extra functions available on RDDs of Doubles through an implicit conversion.
 */
class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {

  /** Compute the mean of RDD elements */
  def mean(): Double

  /** Compute the population variance of RDD elements (same as popVariance) */
  def variance(): Double

  /** Compute the population standard deviation of RDD elements (same as popStdev) */
  def stdev(): Double

  /** Compute the population variance of RDD elements */
  def popVariance(): Double

  /** Compute the population standard deviation of RDD elements */
  def popStdev(): Double

  /** Compute the sample variance of RDD elements (divides by N - 1) */
  def sampleVariance(): Double

  /** Compute the sample standard deviation of RDD elements (divides by N - 1) */
  def sampleStdev(): Double

  /** Compute statistics summary in a single pass */
  def stats(): StatCounter

  /** Compute histogram with specified number of evenly spaced buckets */
  def histogram(buckets: Int): (Array[Double], Array[Long])

  /** Compute histogram with specified bucket boundaries */
  def histogram(buckets: Array[Double]): Array[Long]

  /** Sum all elements */
  def sum(): Double
}

/**
 * Statistics counter for computing mean, variance, etc.
 */
class StatCounter extends Serializable {
  def count: Long
  def mean: Double
  def sum: Double
  def min: Double
  def max: Double
  def variance: Double
  def stdev: Double
  def sampleVariance: Double
  def sampleStdev: Double
}
```

**Usage Examples:**

```scala
// Assumes an active SparkContext named sc
val doubleRDD = sc.parallelize(Array(1.0, 2.5, 3.7, 4.2, 5.9, 6.1, 7.8))

// Basic statistics
val mean = doubleRDD.mean()  // 4.457...
val variance = doubleRDD.variance()
val stdev = doubleRDD.stdev()
val sum = doubleRDD.sum()

// Statistics summary
val stats = doubleRDD.stats()
println(s"Count: ${stats.count}, Mean: ${stats.mean}, StdDev: ${stats.stdev}")

// Histogram
val (buckets, counts) = doubleRDD.histogram(5)  // 5 equal-width buckets
val customCounts = doubleRDD.histogram(Array(0.0, 2.0, 4.0, 6.0, 8.0))  // Custom buckets
```
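
Because the API exposes both population and sample statistics, it can help to see how the two relate. The sketch below uses a small hypothetical dataset (not from the examples above) and assumes a local master. It reads both variances from one `StatCounter` and compares their ratio with n / (n - 1).

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: a local context is acceptable for this illustration
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("variance-sketch").setMaster("local[2]"))

// Hypothetical values chosen so the arithmetic is easy to check by hand
val values = sc.parallelize(Seq(1.0, 2.0, 3.0, 4.0))

// stats() gathers count, mean, and variance in a single pass over the data
val summary = values.stats()

val n = summary.count                   // number of elements
val popVar = summary.variance           // sum of squared deviations divided by n
val sampleVar = summary.sampleVariance  // sum of squared deviations divided by n - 1

println(f"population variance = $popVar%.4f, sample variance = $sampleVar%.4f")
println(f"ratio = ${sampleVar / popVar}%.4f, n / (n - 1) = ${n.toDouble / (n - 1)}%.4f")

sc.stop()
```

For these four values the mean is 2.5 and the squared deviations sum to 5, so the population variance is 5 / 4 and the sample variance is 5 / 3.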

### Sequential File Operations

Operations for saving key-value RDDs as Hadoop SequenceFiles.

```scala { .api }
/**
 * Extra functions available on RDDs of (key, value) pairs to allow writing to SequenceFiles.
 */
class SequenceFileRDDFunctions[K <% Writable : ClassTag, V <% Writable : ClassTag](
    self: RDD[(K, V)]) extends Logging with Serializable {

  /** Save RDD as SequenceFile */
  def saveAsSequenceFile(path: String): Unit

  /** Save RDD as SequenceFile with compression */
  def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]]): Unit
}
```

**Usage Examples:**

```scala
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.compress.GzipCodec

// Assumes an active SparkContext named sc
// Create RDD of key-value pairs
val pairs = sc.parallelize(Array(("key1", "value1"), ("key2", "value2")))

// Convert to Writable types for SequenceFile
val writablePairs = pairs.map { case (k, v) => (new Text(k), new Text(v)) }

// Save as SequenceFile
writablePairs.saveAsSequenceFile("hdfs://path/to/output")

// Save with compression
writablePairs.saveAsSequenceFile("hdfs://path/to/compressed", Some(classOf[GzipCodec]))
```

### Async RDD Actions

Asynchronous versions of RDD actions that return FutureAction objects.

```scala { .api }
/**
 * Asynchronous API for RDD actions.
 */
class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Logging {

  /** Asynchronously returns all elements of the RDD */
  def collectAsync(): FutureAction[Array[T]]

  /** Asynchronously returns the number of elements */
  def countAsync(): FutureAction[Long]

  /** Asynchronously applies a function to all elements */
  def foreachAsync(f: T => Unit): FutureAction[Unit]

  /** Asynchronously applies function to each partition */
  def foreachPartitionAsync(f: Iterator[T] => Unit): FutureAction[Unit]

  /** Asynchronously returns first n elements */
  def takeAsync(num: Int): FutureAction[Array[T]]
}

/**
 * A future for the result of an action.
 */
trait FutureAction[T] extends Future[T] {
  /** Cancel the action if possible */
  def cancel(): Unit

  /** Check if action was cancelled */
  def isCancelled: Boolean

  /** Get the IDs of the jobs run by this action */
  def jobIds: Seq[Int]
}
```

**Usage Examples:**

```scala
import scala.concurrent.{Await, ExecutionContext}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

// Assumes an active SparkContext named sc
implicit val ec: ExecutionContext = ExecutionContext.global

val largeRDD = sc.parallelize(1 to 1000000, 100)

// Asynchronous operations
val futureCount = largeRDD.countAsync()

// reduce has no async variant; this call blocks until the job finishes
val sum = largeRDD.map(_.toLong).reduce(_ + _)

// Handle results without blocking
futureCount.onComplete {
  case Success(count) => println(s"RDD has $count elements")
  case Failure(e) => println(s"Count failed: ${e.getMessage}")
}

// Wait for completion
val count = Await.result(futureCount, 30.seconds)

// Cancel long-running operations
// heavyComputation is a hypothetical stand-in for an expensive per-element function
def heavyComputation(x: Int): Int = { Thread.sleep(1); x }
val futureResult = largeRDD.map(heavyComputation).collectAsync()
// ... later ...
futureResult.cancel()
```

## Performance Considerations

### Choosing Transformations vs Actions
- **Transformations** are lazy: they don't execute until an action is called
- Chain multiple transformations before calling an action so Spark can pipeline narrow transformations into a single stage
- Use `cache()` or `persist()` for RDDs that are reused across multiple actions
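
Laziness and the effect of caching can be observed directly. The sketch below counts how often a `map` function runs by incrementing an accumulator inside it. It assumes Spark 2.0 or later (for `longAccumulator`) and a local master with no task retries. Accumulator updates made inside transformations are not guaranteed to be applied exactly once in general, so this is a teaching device and not a reliable metric.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: a local context is acceptable for this illustration
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("lazy-eval-sketch").setMaster("local[2]"))

// Counts invocations of the map function (longAccumulator requires Spark 2.0+)
val calls = sc.longAccumulator("map calls")
val mapped = sc.parallelize(1 to 5, 2).map { x => calls.add(1); x * 2 }

val beforeAction = calls.sum  // only a transformation so far: nothing has run
mapped.count()
val afterFirst = calls.sum    // the action triggered the map
mapped.count()
val afterSecond = calls.sum   // without caching, the second action recomputes the map

// Same pipeline, but marked for caching before the first action
val cachedCalls = sc.longAccumulator("cached map calls")
val cached = sc.parallelize(1 to 5, 2).map { x => cachedCalls.add(1); x * 2 }.cache()
cached.count()                // computes the partitions and stores them
cached.count()                // served from the cache
val cachedTotal = cachedCalls.sum

println(s"uncached: $beforeAction -> $afterFirst -> $afterSecond, cached total: $cachedTotal")
sc.stop()
```

In a local run without retries, the uncached counter should grow with every action while the cached counter stops after the first one.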

### Partitioning Strategy
- More partitions give better parallelism but add scheduling overhead per task
- Rule of thumb: 2-4 partitions per CPU core
- Use `coalesce()` to reduce the partition count without a shuffle when possible
- Use `repartition()` when you need more partitions or a more even distribution; it always shuffles
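
The difference between `coalesce()` and `repartition()` shows up in the resulting partition counts. A minimal sketch, assuming a local master and an illustrative RDD with 4 partitions:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: a local context is acceptable for this illustration
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("partitioning-sketch").setMaster("local[2]"))

val numbers = sc.parallelize(1 to 100, 4)

// Shrinking without a shuffle: existing partitions are merged
val fewer = numbers.coalesce(2).getNumPartitions

// Without a shuffle, coalesce cannot increase the count; it stays at 4
val notMore = numbers.coalesce(16).getNumPartitions

// repartition always shuffles, so it can grow the count
val more = numbers.repartition(16).getNumPartitions

// coalesce with shuffle = true behaves like repartition
val moreViaCoalesce = numbers.coalesce(16, shuffle = true).getNumPartitions

println(s"coalesce(2)=$fewer coalesce(16)=$notMore repartition(16)=$more " +
  s"coalesce(16, shuffle)=$moreViaCoalesce")
sc.stop()
```

A target count could be derived from `sc.defaultParallelism` multiplied by a factor in the 2-4 range, in line with the rule of thumb above.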
452
453
### Memory Management
454
- Cache frequently accessed RDDs with appropriate storage levels
455
- Use serialized storage (`MEMORY_ONLY_SER`) to reduce memory usage
456
- Consider `MEMORY_AND_DISK` for large RDDs that don't fit in memory
457
- Call `unpersist()` when RDDs are no longer needed
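
The life cycle of a storage level can be checked with `getStorageLevel`. The sketch below assumes a local master and uses a hypothetical RDD named `data`. It also illustrates that an RDD's storage level cannot be changed while one is assigned: the second `persist` call is wrapped in `Try` because it is expected to throw.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
import scala.util.Try

// Assumption: a local context is acceptable for this illustration
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("storage-level-sketch").setMaster("local[2]"))

val data = sc.parallelize(1 to 1000, 4).map(_ * 2)

val levelBefore = data.getStorageLevel      // no level assigned yet

data.persist(StorageLevel.MEMORY_ONLY_SER)  // serialized, memory only
data.count()                                // first action materializes the cache
val levelWhileCached = data.getStorageLevel

// Assigning a different level while one is already set is rejected
val changeAttempt = Try(data.persist(StorageLevel.MEMORY_AND_DISK))

data.unpersist(blocking = true)             // free the cached blocks
val levelAfter = data.getStorageLevel

println(s"before=$levelBefore cached=$levelWhileCached " +
  s"changeFailed=${changeAttempt.isFailure} after=$levelAfter")
sc.stop()
```

After `unpersist()` the level resets, so a different level can then be assigned with a fresh `persist` call.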

### Common Performance Anti-patterns
- Avoid `collect()` on large RDDs: it brings all data to the driver
- Avoid `countByValue()` on high-cardinality RDDs: the resulting map is built on the driver
- Minimize data shuffling operations when possible
- Use broadcast variables for small lookup tables instead of joins
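
Two of these points can be sketched together: replacing a join against a small table with a broadcast lookup, and previewing results with `take()` instead of `collect()`. The country codes, visit counts, and the names `countryNames`, `lookup`, and `visits` are hypothetical, and a local master is assumed.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: a local context is acceptable for this illustration
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("broadcast-lookup-sketch").setMaster("local[2]"))

// Hypothetical small lookup table, shipped once to each executor
val countryNames = Map("de" -> "Germany", "fr" -> "France", "jp" -> "Japan")
val lookup = sc.broadcast(countryNames)

// Hypothetical (country code, visit count) records
val visits = sc.parallelize(Seq(("de", 3), ("jp", 5), ("xx", 1), ("fr", 2)), 2)

// Map-side lookup: no shuffle, unlike joining against a second RDD
val named = visits.map { case (code, n) =>
  (lookup.value.getOrElse(code, "unknown"), n)
}

// take(n) bounds how much data reaches the driver
val preview = named.take(2)

// collect() is acceptable here only because the dataset is tiny
val all = named.collect()

println(preview.mkString(", "))
println(all.mkString(", "))
sc.stop()
```

This approach only makes sense when the lookup table comfortably fits in each executor's memory; for two large datasets a regular join is still the appropriate tool.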