# RDD Operations

RDD (Resilient Distributed Dataset) operations form the core of Spark's distributed computing model. Operations are divided into transformations, which are lazily evaluated and return new RDDs, and actions, which trigger computation and return results to the driver. All RDDs are immutable and fault-tolerant through lineage tracking.
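
To see the split in practice, the following minimal sketch (using the same implicit `sc` SparkContext convention as the examples below) builds a chain of transformations that executes only when an action is called:

```scala
// Transformations: nothing runs here, Spark only records the lineage
val evens = sc.parallelize(1 to 1000000).filter(_ % 2 == 0)

// Action: triggers the actual distributed computation
println(evens.count()) // 500000
```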

## Capabilities

### Core Transformations

Lazy operations that return new RDDs without triggering computation.

```scala { .api }
abstract class RDD[T: ClassTag] {
  // Element-wise transformations
  def map[U: ClassTag](f: T => U): RDD[U]
  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
  def filter(f: T => Boolean): RDD[T]
  def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U],
      preservesPartitioning: Boolean = false): RDD[U]
  def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U],
      preservesPartitioning: Boolean = false): RDD[U]
}
```

**Usage Examples:**
```scala
val numbers = sc.parallelize(1 to 10)

// Transform each element
val doubled = numbers.map(_ * 2)

// Filter elements
val evens = numbers.filter(_ % 2 == 0)

// Flat map: one-to-many transformation
val words = sc.parallelize(Seq("hello world", "scala spark"))
val allWords = words.flatMap(_.split(" "))

// Process entire partitions
val partitionSums = numbers.mapPartitions { iter =>
  Iterator(iter.sum)
}
```
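
`mapPartitionsWithIndex` appears in the API above but not in the examples; a minimal sketch that tags each element with the partition it lives in:

```scala
// Tag each element with the index of the partition that holds it
val tagged = sc.parallelize(1 to 6, 2).mapPartitionsWithIndex { (idx, iter) =>
  iter.map(x => s"partition $idx: $x")
}
tagged.collect().foreach(println)
// partition 0: 1 ... partition 1: 6
```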

### Set Operations

Mathematical set operations on RDDs.

```scala { .api }
// Set operations
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
def distinct(): RDD[T]
def union(other: RDD[T]): RDD[T]
def ++(other: RDD[T]): RDD[T] // Alias for union
def intersection(other: RDD[T]): RDD[T]
def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
def subtract(other: RDD[T]): RDD[T]
def subtract(other: RDD[T], numPartitions: Int): RDD[T]
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]
```

**Usage Examples:**
```scala
val rdd1 = sc.parallelize(Seq(1, 2, 3, 4, 5))
val rdd2 = sc.parallelize(Seq(4, 5, 6, 7, 8))

// Union: combine all elements, keeping duplicates
val combined = rdd1.union(rdd2) // [1,2,3,4,5,4,5,6,7,8]

// Intersection: common elements, duplicates removed
val common = rdd1.intersection(rdd2) // [4,5]

// Subtract: elements in rdd1 but not in rdd2
val difference = rdd1.subtract(rdd2) // [1,2,3]

// Remove duplicates
val unique = combined.distinct() // [1,2,3,4,5,6,7,8]

// Cartesian product: every pairing of elements
val cartesian = rdd1.cartesian(rdd2) // [(1,4), (1,5), (1,6), ...]
```
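
Note that `cartesian` output grows multiplicatively with the input sizes; counting before collecting is a cheap sanity check:

```scala
// 5 elements x 5 elements = 25 pairs; count() checks the size
// without pulling the pairs to the driver
val pairCount = rdd1.cartesian(rdd2).count() // 25
```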

### Partitioning Operations

Control data distribution across cluster nodes.

```scala { .api }
// Repartitioning
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
def coalesce(numPartitions: Int, shuffle: Boolean = false,
    partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
    (implicit ord: Ordering[T] = null): RDD[T]

// Partitioning info
def partitions: Array[Partition]
def getNumPartitions: Int
def partitioner: Option[Partitioner]
```

**Usage Examples:**
```scala
val data = sc.parallelize(1 to 100, 10) // 10 partitions

// Increase partitions (always causes a shuffle)
val morePartitions = data.repartition(20)

// Decrease partitions (avoids a shuffle when possible)
val fewerPartitions = data.coalesce(5)

// Check partition counts
println(s"Original: ${data.getNumPartitions} partitions")
println(s"Repartitioned: ${morePartitions.getNumPartitions} partitions")
```
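
`repartition(n)` is implemented as `coalesce(n, shuffle = true)`, so an explicit shuffle can also be requested through `coalesce` directly:

```scala
// Equivalent to data.repartition(20): a full shuffle, evenly rebalanced
val rebalanced = data.coalesce(20, shuffle = true)

// Without shuffle = true, coalesce can only reduce the partition count
val stillTen = data.coalesce(20) // still 10 partitions
```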

### Sampling Operations

Random sampling and data splitting operations.

```scala { .api }
// Sampling
def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]
def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T]
def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]]
```

**Usage Examples:**
```scala
val data = sc.parallelize(1 to 1000)

// Sample roughly 10% of the data without replacement
// (fraction is an expected proportion, not an exact count)
val sample10pct = data.sample(withReplacement = false, fraction = 0.1)

// Take exactly 50 random elements
val sample50 = data.takeSample(withReplacement = false, num = 50)

// Split into training (80%) and test (20%) sets
val Array(training, test) = data.randomSplit(Array(0.8, 0.2), seed = 12345)
```
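
Because every sampling method takes a seed, results can be made reproducible; a quick sketch:

```scala
// The same seed over the same RDD yields the same sample
val a = data.sample(withReplacement = false, fraction = 0.1, seed = 42)
val b = data.sample(withReplacement = false, fraction = 0.1, seed = 42)
assert(a.collect().sameElements(b.collect()))
```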

### Core Actions

Operations that trigger computation and return results to the driver.

```scala { .api }
// Collection actions
def collect(): Array[T]
def collectAsMap(): Map[K, V] // For RDD[(K, V)] via PairRDDFunctions
def toLocalIterator: Iterator[T]

// Reduction actions
def reduce(f: (T, T) => T): T
def fold(zeroValue: T)(op: (T, T) => T): T
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
def treeReduce(f: (T, T) => T, depth: Int = 2): T
def treeAggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U,
    depth: Int = 2): U
```

**Usage Examples:**
```scala
val numbers = sc.parallelize(1 to 100)

// Collect all elements (use with caution: the whole dataset is pulled
// into driver memory)
val allNumbers = numbers.collect()

// Reduce to a single value
val sum = numbers.reduce(_ + _)
val max = numbers.reduce(math.max)

// Fold with an initial value (the zero value should be neutral, since it
// is applied once per partition and once more in the final merge)
val sumWithInitial = numbers.fold(0)(_ + _)

// Complex aggregation: the result type may differ from the element type
case class Stats(count: Int, sum: Int)
val stats = numbers.aggregate(Stats(0, 0))(
  seqOp = (stats, value) => Stats(stats.count + 1, stats.sum + value),
  combOp = (s1, s2) => Stats(s1.count + s2.count, s1.sum + s2.sum)
)
```
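
`treeReduce`/`treeAggregate` and `toLocalIterator` are listed above but not demonstrated; a short sketch of both on the same `numbers` RDD:

```scala
// treeAggregate has the same semantics as aggregate, but merges partial
// results in a tree of the given depth instead of all at once on the driver
val treeSum = numbers.treeAggregate(0)(_ + _, _ + _, depth = 2)

// toLocalIterator streams results to the driver one partition at a time,
// so only one partition needs to fit in driver memory
numbers.toLocalIterator.take(5).foreach(println)
```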

### Element Access Actions

Retrieve specific elements without collecting the entire dataset.

```scala { .api }
// Element access
def first(): T
def take(num: Int): Array[T]
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
def top(num: Int)(implicit ord: Ordering[T]): Array[T]
def count(): Long
def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long]
def isEmpty(): Boolean
```

**Usage Examples:**
```scala
val data = sc.parallelize(Seq(5, 2, 8, 1, 9, 3))

// Get the first element
val firstElement = data.first() // 5

// Take the first n elements
val firstThree = data.take(3) // [5, 2, 8]

// Take the smallest/largest elements
val smallest = data.takeOrdered(3) // [1, 2, 3]
val largest = data.top(3) // [9, 8, 5]

// Count elements
val totalCount = data.count() // 6

// Count occurrences of each value (returned to the driver as a Map)
val valueCounts = data.countByValue() // Map(5->1, 2->1, 8->1, ...)
```
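
`isEmpty` is also listed above; it is cheaper than `count()` when only existence matters, since it stops after finding a single element:

```scala
val hasData = !data.isEmpty() // true
val empty = sc.parallelize(Seq.empty[Int]).isEmpty() // true
```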

### Side-Effect Actions

Perform operations on each element without returning values.

```scala { .api }
// Side effects
def foreach(f: T => Unit): Unit
def foreachPartition(f: Iterator[T] => Unit): Unit
```

**Usage Examples:**
```scala
val data = sc.parallelize(1 to 10)

// Print each element (runs on the executors: in cluster mode the output
// appears in executor logs, not on the driver console)
data.foreach(println)

// Process each partition (connectToDatabase is a placeholder for user code)
data.foreachPartition { partition =>
  // Set up expensive resources once per partition
  val database = connectToDatabase()
  partition.foreach { element =>
    database.write(element)
  }
  database.close()
}
```
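
Closures passed to `foreach` run on executors, so mutating a driver-side variable from inside one silently does nothing on the driver; an accumulator is the supported way to aggregate side effects. A minimal sketch:

```scala
// Wrong: each executor mutates its own copy; the driver's counter stays 0
var counter = 0
data.foreach(x => counter += x)

// Right: accumulator updates are merged back to the driver
val acc = sc.longAccumulator("sum")
data.foreach(x => acc.add(x))
println(acc.value) // 55
```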

### Zip Operations

Combine RDDs element-wise, or pair each element with an index or unique ID.

```scala { .api }
// Zip operations
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
def zipWithIndex(): RDD[(T, Long)]
def zipWithUniqueId(): RDD[(T, Long)]
def zipPartitions[B: ClassTag, V: ClassTag](rdd2: RDD[B])(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V]
def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag](rdd2: RDD[B], rdd3: RDD[C])(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V]
```

**Usage Examples:**
```scala
val rdd1 = sc.parallelize(Seq("a", "b", "c"))
val rdd2 = sc.parallelize(Seq(1, 2, 3))

// Zip two RDDs (both must have the same number of partitions and the
// same number of elements in each partition)
val zipped = rdd1.zip(rdd2) // [("a", 1), ("b", 2), ("c", 3)]

// Zip with element indices (0-based, in RDD order)
val withIndex = rdd1.zipWithIndex() // [("a", 0), ("b", 1), ("c", 2)]

// Zip with unique IDs (no extra Spark job, but IDs are not consecutive)
val withUniqueId = rdd1.zipWithUniqueId() // [("a", uniqueId1), ("b", uniqueId2), ...]

// Zip partitions with a custom function
val numbers1 = sc.parallelize(1 to 6, 2)         // 2 partitions: [1,2,3], [4,5,6]
val numbers2 = sc.parallelize(10 to 60 by 10, 2) // 2 partitions: [10,20,30], [40,50,60]
val sums = numbers1.zipPartitions(numbers2) { (iter1, iter2) =>
  iter1.zip(iter2).map { case (a, b) => a + b }
}
// Result: [11, 22, 33, 44, 55, 66]
```

### Utility Operations

Additional utility methods for RDD management and identification.

```scala { .api }
// RDD naming and identification
def setName(name: String): this.type
def name: String
def id: Int

// Key creation
def keyBy[K](f: T => K): RDD[(K, T)]

// External process integration
def pipe(command: String): RDD[String]
def pipe(command: Seq[String]): RDD[String]
def pipe(command: Seq[String], env: Map[String, String]): RDD[String]
```

**Usage Examples:**
```scala
val data = sc.parallelize(1 to 100)

// Set an RDD name for debugging (also shown in the web UI)
data.setName("MyNumbers")
println(s"RDD name: ${data.name}")

// Create key-value pairs
val keyValue = data.keyBy(_ % 10) // Key by remainder when divided by 10

// Pipe elements through an external command, one line per element
val lines = sc.textFile("input.txt")
val processed = lines.pipe("grep ERROR") // Keep only lines containing ERROR
```
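
The `env` overload of `pipe` passes environment variables to the external process; a sketch (the `PATTERN` variable here is just an illustration):

```scala
// The external shell sees PATTERN in its environment
val withEnv = lines.pipe(Seq("sh", "-c", "grep \"$PATTERN\""), Map("PATTERN" -> "ERROR"))
```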

## RDD Lineage and Dependencies

### Lineage Tracking

RDD lineage enables fault tolerance through automatic recomputation of lost partitions.

```scala { .api }
// Lineage information
def dependencies: Seq[Dependency[_]]
def toDebugString: String
def getCreationSite: String
```

**Usage Example:**
```scala
val data = sc.parallelize(1 to 100)
val processed = data.filter(_ > 50).map(_ * 2)

// View the computation lineage; the output shows the DAG of operations
println(processed.toDebugString)
```
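
The `dependencies` method makes the narrow-versus-shuffle distinction visible programmatically; a small sketch (`reduceByKey` comes from the standard pair-RDD implicits):

```scala
import org.apache.spark.ShuffleDependency

// map and filter create narrow dependencies; reduceByKey adds a shuffle
val shuffled = data.map(x => (x % 10, x)).reduceByKey(_ + _)
shuffled.dependencies.foreach {
  case _: ShuffleDependency[_, _, _] => println("shuffle dependency")
  case d => println(s"narrow dependency: ${d.getClass.getSimpleName}")
}
```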

### Advanced Execution Features

Advanced execution modes and resource management for specialized workloads.

```scala { .api }
// Barrier execution mode
def barrier(): RDDBarrier[T]

// Resource profiles
def withResources(rp: ResourceProfile): RDD[T]
```

**Barrier Execution:**
```scala
import org.apache.spark.BarrierTaskContext

// Barrier execution schedules all tasks in the stage together; useful for
// distributed deep learning and other synchronous algorithms
val data = sc.parallelize(1 to 100, 4)
val barriered = data.barrier().mapPartitions { iter =>
  val ctx = BarrierTaskContext.get()
  ctx.barrier() // wait until every task in the stage reaches this point
  iter.map(_ * 2) // placeholder for synchronized work such as model training
}
```

**Resource Profiles:**
```scala
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder}

// Define custom resource requirements
val rp = new ResourceProfileBuilder()
  .require(new ExecutorResourceRequests()
    .memory("4g")
    .cores(2)
    .resource("gpu", 1))
  .build()

// Apply the resource profile to specific RDD operations
// (processWithGPU is a placeholder for user code)
val gpuRDD = data.withResources(rp).map { item =>
  // This operation will run on executors with GPU resources
  processWithGPU(item)
}
```

## Types

```scala { .api }
// Core RDD abstraction
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
) extends Serializable

// Partition representation
trait Partition extends Serializable {
  def index: Int
}

// Dependency types
abstract class Dependency[T] extends Serializable
abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency[T]
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient _rdd: RDD[_ <: Product2[K, V]],
    partitioner: Partitioner
) extends Dependency[Product2[K, V]]

// Task context for advanced operations
abstract class TaskContext extends Serializable {
  def partitionId(): Int
  def stageId(): Int
  def taskAttemptId(): Long
}
```
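
`TaskContext` can be obtained from inside any running task via `TaskContext.get()`; a small sketch that reports where each element was processed:

```scala
import org.apache.spark.TaskContext

val described = sc.parallelize(1 to 4, 2).map { x =>
  val ctx = TaskContext.get() // context of the currently running task
  s"value $x processed in partition ${ctx.partitionId()} of stage ${ctx.stageId()}"
}
described.collect().foreach(println)
```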