# Core RDD Operations

The RDD (Resilient Distributed Dataset) is the fundamental abstraction in Apache Spark. It represents an immutable, partitioned collection of elements that can be operated on in parallel, with fault tolerance built in.

## RDD Class

```scala { .api }
abstract class RDD[T] extends Serializable {
  // Core properties
  def partitions: Array[Partition]
  def compute(split: Partition, context: TaskContext): Iterator[T]
  def dependencies: Seq[Dependency[_]]
  def partitioner: Option[Partitioner] = None
  def preferredLocations(split: Partition): Seq[String] = Nil
}
```

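A quick sketch of how these properties look on a concrete RDD; the exact dependency and partitioner values depend on how the RDD was built:

```scala
val rdd = sc.parallelize(1 to 100, 4).map(_ * 2)

println(rdd.partitions.length) // 4
println(rdd.dependencies)      // e.g. List(org.apache.spark.OneToOneDependency@...)
println(rdd.partitioner)       // None (no key-based partitioner for this RDD)
```
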
## Transformations

Transformations are lazy operations that create a new RDD from an existing one. They are not executed immediately; instead, they build up a directed acyclic graph (DAG) of computations that runs only when an action is invoked.

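A minimal sketch of this laziness: the function passed to map below does not run until the count() action triggers the job (in cluster mode, the println output appears in executor logs, not on the driver):

```scala
val data = sc.parallelize(1 to 10)
val mapped = data.map { x =>
  println(s"processing $x") // not printed yet: map only records the step in the DAG
  x * 2
}
val n = mapped.count() // the action triggers execution; now the map function runs
```
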
### Basic Transformations

**map**: Apply a function to each element
```scala { .api }
def map[U: ClassTag](f: T => U): RDD[U]
```

```scala
val numbers = sc.parallelize(Array(1, 2, 3, 4, 5))
val squared = numbers.map(x => x * x)
// Result: RDD containing [1, 4, 9, 16, 25]
```

**flatMap**: Apply a function and flatten the results
```scala { .api }
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
```

```scala
val lines = sc.parallelize(Array("hello world", "spark rdd"))
val words = lines.flatMap(line => line.split(" "))
// Result: RDD containing ["hello", "world", "spark", "rdd"]
```

**filter**: Keep elements that satisfy a predicate
```scala { .api }
def filter(f: T => Boolean): RDD[T]
```

```scala
val numbers = sc.parallelize(Array(1, 2, 3, 4, 5, 6))
val evens = numbers.filter(_ % 2 == 0)
// Result: RDD containing [2, 4, 6]
```

**distinct**: Remove duplicate elements
```scala { .api }
def distinct(): RDD[T]
def distinct(numPartitions: Int): RDD[T]
```

```scala
val data = sc.parallelize(Array(1, 2, 2, 3, 3, 3))
val unique = data.distinct()
// Result: RDD containing [1, 2, 3]
```

### Sampling Transformations

**sample**: Return a sampled subset
```scala { .api }
def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]
```

```scala
val data = sc.parallelize(1 to 100)
val sampled = data.sample(withReplacement = false, fraction = 0.1)
// Result: RDD with approximately 10% of original elements
```

**randomSplit**: Split RDD randomly into multiple RDDs
```scala { .api }
def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]]
```

```scala
val data = sc.parallelize(1 to 100)
val Array(train, test) = data.randomSplit(Array(0.7, 0.3))
// Result: Two RDDs with ~70% and ~30% of data respectively
```

### Set Operations

**union**: Return the union of two RDDs
```scala { .api }
def union(other: RDD[T]): RDD[T]
```

**intersection**: Return the intersection of two RDDs
```scala { .api }
def intersection(other: RDD[T]): RDD[T]
def intersection(other: RDD[T], numPartitions: Int): RDD[T]
```

**subtract**: Return elements in this RDD but not in the other
```scala { .api }
def subtract(other: RDD[T]): RDD[T]
def subtract(other: RDD[T], numPartitions: Int): RDD[T]
def subtract(other: RDD[T], p: Partitioner): RDD[T]
```

```scala
val rdd1 = sc.parallelize(Array(1, 2, 3, 4))
val rdd2 = sc.parallelize(Array(3, 4, 5, 6))

val unionRDD = rdd1.union(rdd2)               // [1, 2, 3, 4, 3, 4, 5, 6] (keeps duplicates)
val intersectionRDD = rdd1.intersection(rdd2) // [3, 4]
val subtractRDD = rdd1.subtract(rdd2)         // [1, 2]
```

### Pairing and Joining

**zip**: Zip this RDD with another, pairing corresponding elements (both RDDs must have the same number of partitions and the same number of elements per partition)
```scala { .api }
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
```

**zipWithIndex**: Zip with the element indices
```scala { .api }
def zipWithIndex(): RDD[(T, Long)]
```

**zipWithUniqueId**: Zip with generated unique IDs
```scala { .api }
def zipWithUniqueId(): RDD[(T, Long)]
```

**cartesian**: Return the Cartesian product with another RDD
```scala { .api }
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]
```

```scala
val rdd1 = sc.parallelize(Array("a", "b"))
val rdd2 = sc.parallelize(Array(1, 2))

val zipped = rdd1.zip(rdd2)          // [("a", 1), ("b", 2)]
val withIndex = rdd1.zipWithIndex()  // [("a", 0), ("b", 1)]
val cartesian = rdd1.cartesian(rdd2) // [("a", 1), ("a", 2), ("b", 1), ("b", 2)]
```

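zipWithUniqueId differs from zipWithIndex in a useful way: it does not launch the extra Spark job that zipWithIndex needs to compute per-partition offsets, but the IDs it assigns are unique rather than consecutive. A small sketch:

```scala
val rdd = sc.parallelize(Array("a", "b", "c", "d"), 2)
val withIds = rdd.zipWithUniqueId()
// With n = 2 partitions, partition k gets ids k, k + n, k + 2n, ...
// Result here: [("a", 0), ("b", 2), ("c", 1), ("d", 3)]
```
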
### Grouping and Sorting

**groupBy**: Group elements by a key function
```scala { .api }
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
def groupBy[K](f: T => K, numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
```

```scala
val data = sc.parallelize(Array(1, 2, 3, 4, 5, 6))
val grouped = data.groupBy(_ % 2) // Group by even/odd
// Result: [(0, [2, 4, 6]), (1, [1, 3, 5])]
```

**keyBy**: Create tuples by applying a function to generate keys
```scala { .api }
def keyBy[K](f: T => K): RDD[(K, T)]
```

```scala
val words = sc.parallelize(Array("apple", "banana", "apricot"))
val byFirstLetter = words.keyBy(_.charAt(0))
// Result: [('a', "apple"), ('b', "banana"), ('a', "apricot")]
```

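The sorting half of this section is handled by sortBy, which orders an RDD by a key function; a brief sketch:

```scala
val nums = sc.parallelize(Array(3, 1, 4, 1, 5))
val asc = nums.sortBy(identity)                     // [1, 1, 3, 4, 5]
val desc = nums.sortBy(identity, ascending = false) // [5, 4, 3, 1, 1]
val byLength = sc.parallelize(Array("bb", "a", "ccc")).sortBy(_.length) // ["a", "bb", "ccc"]
```
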
### Partitioning Transformations

**repartition**: Increase or decrease partitions with shuffle
```scala { .api }
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
```

**coalesce**: Reduce the number of partitions (optionally with shuffle)
```scala { .api }
def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null): RDD[T]
```

```scala
val data = sc.parallelize(1 to 1000, 10) // 10 partitions
val repartitioned = data.repartition(5)  // 5 partitions with shuffle
val coalesced = data.coalesce(5)         // 5 partitions without shuffle
```

### Partition-wise Operations

**mapPartitions**: Apply a function to each partition independently
```scala { .api }
def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
```

**mapPartitionsWithIndex**: Apply a function with the partition index
```scala { .api }
def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
```

**glom**: Return an array of all elements in each partition
```scala { .api }
def glom(): RDD[Array[T]]
```

```scala
val data = sc.parallelize(Array(1, 2, 3, 4, 5, 6), 2)

// Sum elements in each partition
val partitionSums = data.mapPartitions(iter => Iterator(iter.sum))

// Add partition index to each element
val withPartitionId = data.mapPartitionsWithIndex((index, iter) =>
  iter.map(value => (index, value)))

// Get arrays of elements per partition
val partitionArrays = data.glom() // [[1, 2, 3], [4, 5, 6]]
```

## Actions

Actions trigger the execution of transformations and return values to the driver program or save data to storage.

### Collection Actions

**collect**: Return all elements as an array (use with caution on large datasets)
```scala { .api }
def collect(): Array[T]
```

**take**: Return the first n elements
```scala { .api }
def take(num: Int): Array[T]
```

**takeOrdered**: Return the k smallest elements
```scala { .api }
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
```

**top**: Return the k largest elements
```scala { .api }
def top(num: Int)(implicit ord: Ordering[T]): Array[T]
```

**takeSample**: Return a random sample of elements
```scala { .api }
def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T]
```

**first**: Return the first element
```scala { .api }
def first(): T
```

```scala
val data = sc.parallelize(Array(5, 1, 3, 9, 2, 7))

val all = data.collect()               // [5, 1, 3, 9, 2, 7]
val first3 = data.take(3)              // [5, 1, 3]
val smallest2 = data.takeOrdered(2)    // [1, 2]
val largest2 = data.top(2)             // [9, 7]
val sample = data.takeSample(false, 3) // 3 random elements
val firstElement = data.first()        // 5
```

### Aggregation Actions

**count**: Return the number of elements
```scala { .api }
def count(): Long
```

**countByValue**: Return the count of each unique value
```scala { .api }
def countByValue()(implicit ord: Ordering[T]): Map[T, Long]
```

**reduce**: Reduce elements using an associative and commutative function
```scala { .api }
def reduce(f: (T, T) => T): T
```

**fold**: Aggregate with a zero value
```scala { .api }
def fold(zeroValue: T)(op: (T, T) => T): T
```

**aggregate**: Aggregate into a different result type
```scala { .api }
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
```

```scala
val numbers = sc.parallelize(Array(1, 2, 3, 4, 5))

val count = numbers.count()              // 5
val sum = numbers.reduce(_ + _)          // 15
val sumWithZero = numbers.fold(0)(_ + _) // 15
val (sum2, count2) = numbers.aggregate((0, 0))( // (15, 5)
  (acc, value) => (acc._1 + value, acc._2 + 1),          // seqOp
  (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2) // combOp
)

val valueCounts = sc.parallelize(Array("a", "b", "a", "c", "b")).countByValue()
// Result: Map("a" -> 2, "b" -> 2, "c" -> 1)
```

### Statistical Actions

min and max are available on any RDD whose element type has an Ordering. RDDs of numeric values additionally gain statistics such as sum, mean, and stdev through the DoubleRDDFunctions implicit conversion:

**min/max**: Find minimum/maximum values
```scala { .api }
def min()(implicit ord: Ordering[T]): T
def max()(implicit ord: Ordering[T]): T
```

```scala
val numbers = sc.parallelize(Array(5, 1, 9, 3, 7))
val minimum = numbers.min() // 1
val maximum = numbers.max() // 9
```

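A short sketch of the numeric helpers on an RDD of doubles (provided by DoubleRDDFunctions via implicit conversion):

```scala
val doubles = sc.parallelize(Array(1.0, 2.0, 3.0, 4.0))

val total = doubles.sum()     // 10.0
val avg = doubles.mean()      // 2.5
val sd = doubles.stdev()      // population standard deviation
val summary = doubles.stats() // StatCounter with count, mean, stdev, max, min
```
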
### Iterator Actions

**foreach**: Apply a function to each element (side effects only)
```scala { .api }
def foreach(f: T => Unit): Unit
```

**foreachPartition**: Apply a function to each partition
```scala { .api }
def foreachPartition(f: Iterator[T] => Unit): Unit
```

**toLocalIterator**: Return an iterator that consumes each partition sequentially
```scala { .api }
def toLocalIterator: Iterator[T]
```

```scala
val data = sc.parallelize(Array(1, 2, 3, 4, 5))

// Print each element (for debugging; on a cluster, output goes to executor logs)
data.foreach(println)

// Process each partition (e.g., write to a database)
data.foreachPartition { partition =>
  // Set up a database connection
  partition.foreach { element =>
    // Insert element into the database
  }
  // Close the database connection
}
```

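toLocalIterator is the memory-friendly alternative to collect() when results must reach the driver: it fetches one partition at a time, so the driver only needs enough memory for the largest partition. A minimal sketch:

```scala
// Streams results to the driver one partition at a time
data.toLocalIterator.foreach(println)
```
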
## Save Operations

**saveAsTextFile**: Save RDD as text files
```scala { .api }
def saveAsTextFile(path: String): Unit
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit
```

**saveAsObjectFile**: Save as SequenceFile of serialized objects
```scala { .api }
def saveAsObjectFile(path: String): Unit
```

```scala
val data = sc.parallelize(Array(1, 2, 3, 4, 5))

// Save as text files
data.saveAsTextFile("hdfs://path/to/output")

// Save with compression
import org.apache.hadoop.io.compress.GzipCodec
data.saveAsTextFile("hdfs://path/to/compressed", classOf[GzipCodec])

// Save as object file
data.saveAsObjectFile("hdfs://path/to/objects")
```

## RDD Properties and Metadata

**Partition Information:**
```scala { .api }
def partitions: Array[Partition]     // Get partition array
def getNumPartitions: Int            // Get number of partitions
def partitioner: Option[Partitioner] // Get partitioner if any
```

**Dependencies and Lineage:**
```scala { .api }
def dependencies: Seq[Dependency[_]] // RDD dependencies
def toDebugString: String            // Debug string showing lineage
```

**Naming and Context:**
```scala { .api }
def setName(name: String): RDD[T] // Set RDD name for monitoring
def name: String                  // Get RDD name
def id: Int                       // Unique RDD identifier
def sparkContext: SparkContext    // The SparkContext that created this RDD
```

```scala
val data = sc.parallelize(Array(1, 2, 3, 4, 5), 3).setName("MyRDD")

println(s"Partitions: ${data.getNumPartitions}") // Partitions: 3
println(s"Name: ${data.name}")                   // Name: MyRDD
println(s"ID: ${data.id}")                       // Unique ID, assigned in creation order
println(data.toDebugString)                      // Shows RDD lineage
```

## Type Conversions

**toJavaRDD**: Convert to a Java RDD
```scala { .api }
def toJavaRDD(): JavaRDD[T]
```

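A one-line sketch of the conversion, useful when handing an RDD to Java-based APIs:

```scala
import org.apache.spark.api.java.JavaRDD

val javaRDD: JavaRDD[Int] = sc.parallelize(Array(1, 2, 3)).toJavaRDD()
```
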
## Error Handling and Common Exceptions

RDD operations can fail for various reasons. Understanding common error scenarios helps with debugging and building robust applications.

### Common RDD Exceptions

**SparkException**: General Spark execution errors
```scala
// Task failure due to out of memory, serialization issues, etc.
try {
  val result = rdd.collect()
} catch {
  case e: SparkException => println(s"Spark execution failed: ${e.getMessage}")
}
```

**TaskResultLost**: Task results lost due to network or node failure
```scala
// Typically indicates node failure or network issues
// Spark automatically retries, but may eventually fail
```

**OutOfMemoryError**: Driver or executor runs out of memory
```scala
// Common with collect() on large datasets
try {
  val allData = largeRDD.collect() // Dangerous!
} catch {
  case _: OutOfMemoryError =>
    println("Dataset too large for collect(). Use take() or write to storage.")
}
```

### RDD Action Error Scenarios

**collect()**: Can cause OutOfMemoryError if the result doesn't fit in driver memory
```scala
// Safe: Use take() for sampling
val sample = rdd.take(100)

// Dangerous: Collect entire large dataset
val all = rdd.collect() // May cause OOM
```

**reduce()**: Fails if the RDD is empty
```scala
try {
  val sum = rdd.reduce(_ + _)
} catch {
  case e: UnsupportedOperationException =>
    println("Cannot reduce empty RDD")
}

// Safe alternative
val sum = rdd.fold(0)(_ + _) // Works with empty RDDs
```

**first()**: Throws an exception if the RDD is empty
```scala
try {
  val firstElement = rdd.first()
} catch {
  case e: UnsupportedOperationException =>
    println("RDD is empty")
}

// Safe alternative
val maybeFirst = rdd.take(1).headOption
```

### Serialization Errors

**NotSerializableException**: Objects captured in closures must be serializable for distribution
```scala
class NotSerializable {
  def process(x: Int): Int = x * 2
}

val processor = new NotSerializable()

// This will fail at runtime with SparkException: Task not serializable,
// caused by java.io.NotSerializableException
val result = rdd.map(x => processor.process(x))

// Solution: Make objects serializable or use simple functions
val doubled = rdd.map(x => x * 2) // Safe: captures nothing
```

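A sketch of the serializable-helper fix (the class name here is illustrative):

```scala
class SerializableProcessor extends Serializable {
  def process(x: Int): Int = x * 2
}

val safeProcessor = new SerializableProcessor()
val processed = rdd.map(x => safeProcessor.process(x)) // OK: the closure can be serialized
```
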
### Partition and File Errors

**FileNotFoundException**: When reading non-existent files
```scala
try {
  val data = sc.textFile("hdfs://nonexistent/path")
  data.count() // Error occurs on the action, not at RDD creation
} catch {
  case e: FileNotFoundException =>
    println(s"File not found: ${e.getMessage}")
}
```

**InvalidInputException**: Malformed input data
```scala
// Handle corrupted or malformed files
import org.apache.hadoop.mapred.InvalidInputException

try {
  val data = sc.sequenceFile[String, String]("path/to/corrupted/file")
  data.collect()
} catch {
  case e: InvalidInputException =>
    println(s"Invalid input format: ${e.getMessage}")
}
```

### Network and Cluster Errors

**Connection timeouts**: Network issues between nodes
- Configure `spark.network.timeout` for longer operations
- For Spark SQL jobs, `spark.sql.adaptive.coalescePartitions.enabled=true` reduces the number of shuffle partitions and network overhead

**Shuffle failures**: Data shuffle operations failing
- Common with operations like `groupByKey`, `join`, `distinct`
- Increase the `spark.serializer.objectStreamReset` interval
- Consider using `reduceByKey` instead of `groupByKey` (see the sketch below)

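A sketch of two of these mitigations; the configuration key is standard, while the timeout value and sample data are illustrative:

```scala
import org.apache.spark.SparkConf

// Raise the network timeout for long-running stages (value is an example)
val conf = new SparkConf().set("spark.network.timeout", "600s")

// reduceByKey combines values map-side before the shuffle,
// so far less data crosses the network than with groupByKey
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val summed = pairs.reduceByKey(_ + _)                // [("a", 4), ("b", 2)]
val groupedSum = pairs.groupByKey().mapValues(_.sum) // same result, shuffles every value
```
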
### Best Practices for Error Handling

1. **Avoid collect() on large datasets**: Use `take()`, `sample()`, or write to storage
2. **Handle empty RDDs**: Use `fold()` instead of `reduce()`, check `isEmpty()` before actions
3. **Ensure serializability**: Keep closures simple, avoid capturing non-serializable objects
4. **Monitor resource usage**: Configure appropriate executor memory and cores
5. **Use checkpointing**: For long lineages, checkpoint intermediate results (see the sketch after the code below)
6. **Handle file system errors**: Validate paths exist before reading, use try-catch for file operations

```scala
import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD

// Robust RDD processing pattern
def processRDDSafely[T](rdd: RDD[T]): Option[Array[T]] = {
  try {
    if (rdd.isEmpty()) {
      println("Warning: RDD is empty")
      None
    } else {
      // Use take() instead of collect() for safety
      val sample = rdd.take(1000)
      Some(sample)
    }
  } catch {
    case e: SparkException =>
      println(s"Spark processing failed: ${e.getMessage}")
      None
    case e: OutOfMemoryError =>
      println("Out of memory - dataset too large")
      None
  }
}
```

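For best practice 5, a minimal checkpointing sketch; the checkpoint directory is a placeholder path:

```scala
sc.setCheckpointDir("hdfs://path/to/checkpoints") // placeholder directory

val longLineage = sc.parallelize(1 to 1000)
  .map(_ + 1)
  .filter(_ % 2 == 0) // imagine many more transformations here

longLineage.checkpoint() // truncate the lineage at this point
longLineage.count()      // checkpoint data is materialized on the first action
```
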
This comprehensive coverage of RDD operations provides the foundation for all data processing in Apache Spark. Understanding these operations and their potential failure modes is crucial for effective Spark programming.