0
# RDD Operations
1
2
Resilient Distributed Dataset API providing transformations and actions for fault-tolerant distributed data processing on large datasets.
3
4
## Capabilities
5
6
### Base RDD Class
7
8
Abstract base class for all RDDs providing core distributed dataset functionality with automatic fault recovery through lineage tracking.
9
10
```scala { .api }
11
/**
12
* Resilient Distributed Dataset (RDD): an immutable, distributed collection of
* elements of type `T`.
*
* Members are listed as signatures only and are grouped into transformations
* (each returns another RDD), actions (each returns a plain value, an array or
* `Unit`), persistence controls and metadata accessors.
13
*/
14
abstract class RDD[T: ClassTag] extends Serializable {
15
// Transformations (lazy evaluation): every method in this group returns an RDD
16
17
/** Apply `f` to each element, producing an RDD of the `U` results. */
18
def map[U: ClassTag](f: T => U): RDD[U]
19
20
/** Apply `f` to each element and flatten the returned sequences into one RDD. */
21
def flatMap[U: ClassTag](f: T => IterableOnce[U]): RDD[U]
22
23
/** Keep only the elements for which the predicate `f` returns true. */
24
def filter(f: T => Boolean): RDD[T]
25
26
/**
* Transform each partition's iterator with `f`, which receives the partition
* index as its first argument. `preservesPartitioning` defaults to `false`.
*/
27
def mapPartitionsWithIndex[U: ClassTag](
28
f: (Int, Iterator[T]) => Iterator[U],
29
preservesPartitioning: Boolean = false
30
): RDD[U]
31
32
/**
* Sample a `fraction` of the elements, with or without replacement.
* When `seed` is omitted it is drawn from `Utils.random.nextLong`.
*/
33
def sample(
34
withReplacement: Boolean,
35
fraction: Double,
36
seed: Long = Utils.random.nextLong
37
): RDD[T]
38
39
/** Return the union of this RDD and `other`. */
40
def union(other: RDD[T]): RDD[T]
41
42
/** Return the intersection of this RDD and `other`. */
43
def intersection(other: RDD[T]): RDD[T]
44
45
/** Return the distinct elements; `numPartitions` defaults to `partitions.length`. */
46
def distinct(numPartitions: Int = partitions.length): RDD[T]
47
48
/** Group elements by the key computed by `f`, one `(key, elements)` pair per key. */
49
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
50
51
/** Reduce the RDD to `numPartitions` partitions; `shuffle` defaults to `false`. */
52
def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T]
53
54
/** Redistribute the elements into `numPartitions` partitions. */
55
def repartition(numPartitions: Int): RDD[T]
56
57
/**
* Sort elements by the key computed by `f`, using the implicit `Ordering[K]`.
* Ascending by default; `numPartitions` defaults to `this.partitions.length`.
*/
58
def sortBy[K](
59
f: T => K,
60
ascending: Boolean = true,
61
numPartitions: Int = this.partitions.length
62
)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
63
64
/**
* Pair each element of this RDD with an element of `other`.
* NOTE(review): Spark expects both RDDs to have the same number of partitions
* and the same number of elements per partition - confirm and state it here.
*/
65
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
66
67
/** Pair each element with its `Long` index. */
68
def zipWithIndex(): RDD[(T, Long)]
69
70
/** Pair each element with a unique `Long` id. */
71
def zipWithUniqueId(): RDD[(T, Long)]
72
73
// Actions (trigger computation): every method in this group returns a non-RDD result
74
75
/** Collect all elements to the driver as an array. */
76
def collect(): Array[T]
77
78
/** Count the number of elements. */
79
def count(): Long
80
81
/** Return the first element. */
82
def first(): T
83
84
/** Return the first `num` elements as an array. */
85
def take(num: Int): Array[T]
86
87
/** Return the first `num` elements as ranked by the implicit `Ordering[T]`. */
88
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
89
90
/**
* Return a sample of `num` elements as an array, with or without replacement.
* When `seed` is omitted it is drawn from `Utils.random.nextLong`.
*/
91
def takeSample(
92
withReplacement: Boolean,
93
num: Int,
94
seed: Long = Utils.random.nextLong
95
): Array[T]
96
97
/** Combine all elements into one value with the binary function `f`. */
98
def reduce(f: (T, T) => T): T
99
100
/** Combine all elements with `op`, starting from `zeroValue`. */
101
def fold(zeroValue: T)(op: (T, T) => T): T
102
103
/**
* Aggregate the elements into a result of a different type `U`.
* `seqOp` folds one element into an accumulator, `combOp` merges two
* accumulators, and `zeroValue` is the initial accumulator.
*/
104
def aggregate[U: ClassTag](zeroValue: U)(
105
seqOp: (U, T) => U,
106
combOp: (U, U) => U
107
): U
108
109
/** Apply `f` to each element for its side effects. */
110
def foreach(f: T => Unit): Unit
111
112
/** Apply `f` once per partition, passing that partition's iterator. */
113
def foreachPartition(f: Iterator[T] => Unit): Unit
114
115
/** Save as a text file at `path`; the second overload also takes a compression codec class. */
116
def saveAsTextFile(path: String): Unit
117
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit
118
119
// Persistence
120
121
/** Persist the RDD with the storage level `newLevel`; returns this RDD. */
122
def persist(newLevel: StorageLevel): this.type
123
124
/** Persist the RDD with the default storage level (MEMORY_ONLY); returns this RDD. */
125
def persist(): this.type
126
127
/** Cache the RDD in memory; returns this RDD. */
128
def cache(): this.type
129
130
/** Remove persisted data; `blocking` defaults to `false`. Returns this RDD. */
131
def unpersist(blocking: Boolean = false): this.type
132
133
/** Mark the RDD for checkpointing. */
134
def checkpoint(): Unit
135
136
/** Check whether the RDD is checkpointed. */
137
def isCheckpointed: Boolean
138
139
// Metadata
140
141
/** Get the partitions of this RDD. */
142
def partitions: Array[Partition]
143
144
/** Get the partitioner, if there is one. */
145
def partitioner: Option[Partitioner]
146
147
/** Check whether the RDD is empty. */
148
def isEmpty(): Boolean
149
150
/** Get the current storage level. */
151
def getStorageLevel: StorageLevel
152
}
153
```
154
155
### PairRDDFunctions
156
157
Additional operations available on RDDs of key-value pairs through implicit conversion.
158
159
```scala { .api }
160
/**
161
* Extra functions for RDDs of (key, value) pairs.
*
* Wraps `self`, an `RDD[(K, V)]`. Several operations come in overloads that
* take no partitioning argument, a partition count, or an explicit `Partitioner`.
162
*/
163
class PairRDDFunctions[K, V](self: RDD[(K, V)]) {
164
/** Group the values of each key into an `Iterable[V]`. */
165
def groupByKey(): RDD[(K, Iterable[V])]
166
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
167
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
168
169
/** Merge the values of each key with the binary function `func`. */
170
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
171
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
172
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
173
174
/**
* Aggregate the values of each key into a result of type `U`.
* `seqOp` folds one value into an accumulator, `combOp` merges two
* accumulators, and `zeroValue` is the initial accumulator.
*/
175
def aggregateByKey[U: ClassTag](zeroValue: U)(
176
seqOp: (U, V) => U,
177
combOp: (U, U) => U
178
): RDD[(K, U)]
179
def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(
180
seqOp: (U, V) => U,
181
combOp: (U, U) => U
182
): RDD[(K, U)]
183
184
/** Fold the values of each key with `func`, starting from `zeroValue`. */
185
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
186
def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]
187
188
/**
* Combine the values of each key into a combiner of type `C`.
* `createCombiner` builds a combiner from a value, `mergeValue` adds a value
* to a combiner, and `mergeCombiners` merges two combiners.
*/
189
def combineByKey[C](
190
createCombiner: V => C,
191
mergeValue: (C, V) => C,
192
mergeCombiners: (C, C) => C
193
): RDD[(K, C)]
194
195
/** Join with another pair RDD on key, yielding `(key, (v, w))` pairs. */
196
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
197
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
198
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
199
200
/** Left outer join: the value from `other` is an `Option[W]`. */
201
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
202
203
/** Right outer join: the value from this RDD is an `Option[V]`. */
204
def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
205
206
/** Full outer join: both sides are wrapped in `Option`. */
207
def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]
208
209
/** Cogroup with one or two other pair RDDs, yielding per-key `Iterable`s from each RDD. */
210
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
211
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]):
212
RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
213
214
/** Return the values associated with the single key `key`. */
215
def lookup(key: K): Seq[V]
216
217
/** Collect the pairs as a map from key to value. */
218
def collectAsMap(): Map[K, V]
219
220
/** Count the number of elements per key, returned as a map. */
221
def countByKey(): Map[K, Long]
222
223
/**
* Sort by key using the implicit `Ordering[K]`. Ascending by default;
* `numPartitions` defaults to `self.partitions.length`.
* NOTE(review): in Spark this method is provided by OrderedRDDFunctions rather
* than PairRDDFunctions - confirm where it should be documented.
*/
224
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
225
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[(K, V)]
226
227
/** Get an RDD of the keys only. */
228
def keys: RDD[K]
229
230
/** Get an RDD of the values only. */
231
def values: RDD[V]
232
233
/**
* Subtract by key against `other`; the result keeps this RDD's `(K, V)` type.
* NOTE(review): presumably keeps the pairs whose key is absent from `other` - confirm.
*/
234
def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)]
235
236
/** Save as a Hadoop file at `path` using the given key, value and output format classes. */
237
def saveAsHadoopFile[F <: OutputFormat[K, V]](
238
path: String,
239
keyClass: Class[_],
240
valueClass: Class[_],
241
outputFormatClass: Class[F]
242
): Unit
243
244
/** Same as above, but for an output format of the new Hadoop API (`NewOutputFormat`). */
245
def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](
246
path: String,
247
keyClass: Class[_],
248
valueClass: Class[_],
249
outputFormatClass: Class[F]
250
): Unit
251
}
252
```
253
254
### DoubleRDDFunctions
255
256
Statistical operations available on RDDs of `Double` values (and, through implicit conversion, on RDDs of other numeric types).
257
258
```scala { .api }
259
/**
260
* Extra functions for RDDs of doubles.
*
* Wraps `self`, an `RDD[Double]`. Every method returns a plain value rather
* than an RDD.
261
*/
262
class DoubleRDDFunctions(self: RDD[Double]) {
263
/** Compute the mean of the elements. */
264
def mean(): Double
265
266
/**
* Compute the variance of the elements.
* NOTE(review): Spark reports the population variance here and exposes the
* sample variance separately - confirm and state which one this is.
*/
267
def variance(): Double
268
269
/** Compute the standard deviation of the elements. */
270
def stdev(): Double
271
272
/** Compute the sum of the elements. */
273
def sum(): Double
274
275
/** Compute summary statistics, returned as a `StatCounter`. */
276
def stats(): StatCounter
277
278
/**
* Compute a histogram. Given a bucket count, returns a pair of arrays
* (`Array[Double]`, `Array[Long]`); given an array of `Double` bucket values,
* returns only the `Array[Long]`.
* NOTE(review): presumably the arrays are bucket boundaries and per-bucket
* counts - confirm against the Spark API documentation.
*/
279
def histogram(buckets: Int): (Array[Double], Array[Long])
280
def histogram(buckets: Array[Double]): Array[Long]
281
}
282
283
/**
284
* Statistics counter for numeric RDDs.
*
* Exposes the element count, mean, sum, minimum, maximum, variance and
* standard deviation as parameterless accessors.
285
*/
286
class StatCounter extends Serializable {
287
def count: Long
288
def mean: Double
289
def sum: Double
290
def min: Double
291
def max: Double
292
def variance: Double
293
def stdev: Double
294
}
295
```
296
297
### Specialized RDD Types
298
299
```scala { .api }
300
/**
* RDD built from a local collection `data`, split into `numSlices` slices.
* `sc` and `data` are marked `@transient`.
*/
301
class ParallelCollectionRDD[T: ClassTag](
302
@transient sc: SparkContext,
303
@transient data: Seq[T],
304
numSlices: Int
305
) extends RDD[T]
306
307
/**
* RDD of `(K, V)` pairs read through a Hadoop `InputFormat`.
* Configured with a `JobConf`, the input format class, the key and value
* classes, and `minPartitions`.
*/
308
class HadoopRDD[K, V](
309
sc: SparkContext,
310
conf: JobConf,
311
inputFormatClass: Class[_ <: InputFormat[K, V]],
312
keyClass: Class[K],
313
valueClass: Class[V],
314
minPartitions: Int
315
) extends RDD[(K, V)]
316
317
/**
* RDD of `(K, V)` pairs read through an input format of the new Hadoop API
* (`NewInputFormat`). Configured with a `Configuration` and takes no
* minimum-partition argument.
*/
318
class NewHadoopRDD[K, V](
319
sc: SparkContext,
320
inputFormatClass: Class[_ <: NewInputFormat[K, V]],
321
keyClass: Class[K],
322
valueClass: Class[V],
323
conf: Configuration
324
) extends RDD[(K, V)]
325
326
/**
* RDD read over JDBC. `getConnection` supplies a `Connection`, `sql` is the
* query text, and `mapRow` converts a `ResultSet` into an element of type `T`.
* NOTE(review): presumably `sql` must contain two `?` placeholders that are
* bound to sub-ranges of [`lowerBound`, `upperBound`], one range per partition -
* confirm against the Spark JdbcRDD documentation.
*/
327
class JdbcRDD[T: ClassTag](
328
sc: SparkContext,
329
getConnection: () => Connection,
330
sql: String,
331
lowerBound: Long,
332
upperBound: Long,
333
numPartitions: Int,
334
mapRow: ResultSet => T
335
) extends RDD[T]
336
337
/** Empty RDD of element type `T`, created from a `SparkContext` alone. */
338
class EmptyRDD[T: ClassTag](sc: SparkContext) extends RDD[T]
339
340
/** Union of the RDDs in `rdds`, all sharing the element type `T`. */
341
class UnionRDD[T: ClassTag](sc: SparkContext, rdds: Seq[RDD[T]]) extends RDD[T]
342
343
/**
* Coalesced view of `prev` with at most `maxPartitions` partitions.
* NOTE(review): Spark's CoalescedRDD constructor takes an optional
* PartitionCoalescer rather than a `shuffle` flag (shuffling is requested
* through `RDD.coalesce`) - confirm this parameter against the documented
* Spark version.
*/
344
class CoalescedRDD[T: ClassTag](
345
prev: RDD[T],
346
maxPartitions: Int,
347
shuffle: Boolean = false
348
) extends RDD[T]
349
350
/**
* Shuffled RDD: takes a parent RDD `prev` of key-value products (`K`, `V`) and a
* `Partitioner`, and yields `(K, C)` pairs.
* NOTE(review): `C` is presumably the combiner type - confirm.
*/
351
class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
352
prev: RDD[_ <: Product2[K, V]],
353
part: Partitioner
354
) extends RDD[(K, C)]
355
```
356
357
**Usage Examples:**
358
359
```scala
360
import org.apache.spark.{SparkContext, SparkConf}
361
362
val sc = new SparkContext(new SparkConf().setAppName("RDD Example"))

// Run the jobs inside try/finally so the context is always stopped, even when
// one of the actions below throws (for example when the output path exists).
try {
  // Basic transformations (lazy: nothing runs until an action is called)
  val numbers = sc.parallelize(1 to 100)
  val squares = numbers.map(x => x * x)
  val evens = numbers.filter(_ % 2 == 0)

  // Pair RDD operations
  val pairs = sc.parallelize(List(("a", 1), ("b", 2), ("a", 3)))
  val grouped = pairs.groupByKey()
  val sums = pairs.reduceByKey(_ + _)

  // Statistical operations
  val doubles = sc.parallelize(Array(1.0, 2.0, 3.0, 4.0, 5.0))
  val avg = doubles.mean()
  val stats = doubles.stats()

  // Actions (these trigger the actual computation)
  val result = squares.take(10)
  val total = numbers.reduce(_ + _)
  squares.saveAsTextFile("output/squares")
} finally {
  sc.stop()
}
385
```
386
387
## Partitioning
388
389
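RDDs maintain partitioning information to optimize distributed operations and minimize data shuffling. The listing below shows the `Partition` trait and the `TaskContext` that describes the task processing a given partition.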
390
391
```scala { .api }
392
/** A partition of an RDD, identified by its integer `index`. */
trait Partition extends Serializable {
393
def index: Int
394
}
395
396
/**
* Context of a running task: stage id and stage attempt number, partition id,
* task attempt id and attempt number, plus the task's memory manager, local
* properties and metrics system.
* NOTE(review): Spark's public TaskContext is an abstract class obtained via
* `TaskContext.get()`, not a case class built by callers - confirm that this
* constructor shape matches the documented Spark version.
*/
case class TaskContext(
397
stageId: Int,
398
stageAttemptNumber: Int,
399
partitionId: Int,
400
taskAttemptId: Long,
401
attemptNumber: Int,
402
taskMemoryManager: TaskMemoryManager,
403
localProperties: Properties,
404
metricsSystem: MetricsSystem
405
)
406
```