0
# RDD Operations
1
2
## RDD Base Class
3
4
The Resilient Distributed Dataset (RDD) is the fundamental abstraction in Apache Spark. An RDD is an immutable, partitioned, fault-tolerant collection of elements that can be operated on in parallel.
5
6
```scala { .api }
7
abstract class RDD[T: ClassTag](
8
@transient private var _sc: SparkContext,
9
@transient private var deps: Seq[Dependency[_]]
10
) {
11
// Transformations (Lazy)
12
def map[U: ClassTag](f: T => U): RDD[U]
13
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
14
def filter(f: T => Boolean): RDD[T]
15
def distinct(numPartitions: Int = partitions.length): RDD[T]
16
def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]
17
def union(other: RDD[T]): RDD[T]
18
def intersection(other: RDD[T]): RDD[T]
19
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]
20
def groupBy[K: ClassTag](f: T => K): RDD[(K, Iterable[T])]
21
def pipe(command: String): RDD[String]
22
def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T]
23
def repartition(numPartitions: Int): RDD[T]
24
def sortBy[K](f: T => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
25
def keyBy[K](f: T => K): RDD[(K, T)]
26
27
// Actions (Eager)
28
def collect(): Array[T]
29
def count(): Long
30
def first(): T
31
def take(num: Int): Array[T]
32
def top(num: Int)(implicit ord: Ordering[T]): Array[T]
33
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
34
def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T]
35
def reduce(f: (T, T) => T): T
36
def fold(zeroValue: T)(op: (T, T) => T): T
37
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
38
def treeAggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U, depth: Int = 2): U
39
def foreach(f: T => Unit): Unit
40
def foreachPartition(f: Iterator[T] => Unit): Unit
41
42
// I/O Actions
43
def saveAsTextFile(path: String): Unit
44
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit
45
def saveAsObjectFile(path: String): Unit
46
47
// Persistence
48
def persist(): RDD[T]
49
def persist(newLevel: StorageLevel): RDD[T]
50
def cache(): RDD[T]
51
def unpersist(blocking: Boolean = false): RDD[T]
52
def getStorageLevel: StorageLevel
53
def checkpoint(): Unit
54
def isCheckpointed: Boolean
55
def getCheckpointFile: Option[String]
56
57
// Metadata
58
def partitions: Array[Partition]
59
def partitioner: Option[Partitioner]
60
def getNumPartitions: Int
61
def dependencies: Seq[Dependency[_]]
62
def preferredLocations(split: Partition): Seq[String]
63
def context: SparkContext
64
def id: Int
65
def name: String
66
def setName(name: String): RDD[T]
67
}
68
```
69
70
## PairRDDFunctions
71
72
Operations available on RDDs of key-value pairs. Any `RDD[(K, V)]` is implicitly converted to `PairRDDFunctions[K, V]`, so these methods can be called directly on a pair RDD without wrapping it explicitly.
73
74
```scala { .api }
75
class PairRDDFunctions[K, V](self: RDD[(K, V)])(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) {
76
// Grouping Operations
77
def groupByKey(): RDD[(K, Iterable[V])]
78
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
79
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
80
81
// Reduction Operations
82
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
83
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
84
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
85
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
86
def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]
87
def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]
88
89
// Aggregation Operations
90
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
91
def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
92
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
93
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
94
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
95
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]
96
97
// Partitioning
98
def partitionBy(partitioner: Partitioner): RDD[(K, V)]
99
100
// Join Operations
101
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
102
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
103
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
104
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
105
def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))]
106
def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))]
107
def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
108
def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))]
109
def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], W))]
110
def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]
111
def fullOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], Option[W]))]
112
def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], Option[W]))]
113
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
114
def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]
115
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))]
116
117
// Set Operations
118
def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)]
119
def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)]
120
def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)]
121
122
// Lookups and Collection
123
def lookup(key: K): Seq[V]
124
def collectAsMap(): Map[K, V]
125
def countByKey(): Map[K, Long]
126
def countByKeyApprox(timeout: Long, confidence: Double = 0.95): PartialResult[Map[K, BoundedDouble]]
127
128
// Value Operations
129
def mapValues[U](f: V => U): RDD[(K, U)]
130
def flatMapValues[U](f: V => IterableOnce[U]): RDD[(K, U)]
131
def keys: RDD[K]
132
def values: RDD[V]
133
134
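// Sorting (listed here for convenience: in Spark, sortByKey is defined by OrderedRDDFunctions and requires an Ordering[K]; sortBy is defined on RDD itself)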
135
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]
136
def sortBy[B](f: ((K, V)) => B, ascending: Boolean = true, numPartitions: Int = self.partitions.length)(implicit ord: Ordering[B], ctag: ClassTag[B]): RDD[(K, V)]
137
138
// I/O Operations
139
def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[F], codec: Class[_ <: CompressionCodec]): Unit
140
def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[F], conf: JobConf = new JobConf(self.context.hadoopConfiguration), codec: Option[Class[_ <: CompressionCodec]] = None): Unit
141
def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[F], conf: Configuration = self.context.hadoopConfiguration): Unit
142
}
143
```
144
145
## DoubleRDDFunctions
146
147
Statistical operations available on RDDs of Double values.
148
149
```scala { .api }
150
class DoubleRDDFunctions(self: RDD[Double]) {
151
// Statistical Operations
152
def sum(): Double
153
def stats(): StatCounter
154
def mean(): Double
155
def variance(): Double
156
def stdev(): Double
157
def sampleStdev(): Double
158
def sampleVariance(): Double
159
160
// Histogram Operations
161
def histogram(buckets: Array[Double]): Array[Long]
162
def histogram(buckets: Int): (Array[Double], Array[Long])
163
}
164
```
165
166
## SequenceFileRDDFunctions
167
168
Operations for saving RDDs as Hadoop SequenceFiles.
169
170
```scala { .api }
171
class SequenceFileRDDFunctions[K, V](self: RDD[(K, V)])(implicit kt: ClassTag[K], vt: ClassTag[V], keyWritableFactory: WritableFactory[K], valueWritableFactory: WritableFactory[V]) {
172
def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None): Unit
173
}
174
```
175
176
## Usage Examples
177
178
### Basic Transformations
179
```scala
180
val numbers = sc.parallelize(1 to 100)
181
182
// Map transformation
183
val squares = numbers.map(x => x * x)
184
185
// Filter transformation
186
val evenNumbers = numbers.filter(_ % 2 == 0)
187
188
// FlatMap transformation
189
val words = sc.parallelize(Array("hello world", "foo bar"))
190
val allWords = words.flatMap(_.split(" "))
191
```
192
193
### Key-Value Operations
194
```scala
195
val pairs = sc.parallelize(Array(("a", 1), ("b", 2), ("a", 3), ("b", 4)))
196
197
// Reduce by key
198
val sums = pairs.reduceByKey(_ + _) // ("a", 4), ("b", 6)
199
200
// Group by key
201
val grouped = pairs.groupByKey() // ("a", [1, 3]), ("b", [2, 4])
202
203
// Join operations
204
val other = sc.parallelize(Array(("a", "apple"), ("b", "banana")))
205
val joined = pairs.join(other) // ("a", (1, "apple")), ("a", (3, "apple")), etc.
206
```
207
208
### Actions
209
```scala
210
val data = sc.parallelize(1 to 100)
211
212
// Collect all data to driver
213
val collected = data.collect() // Array[Int]
214
215
// Count elements
216
val count = data.count() // 100
217
218
// Reduce
219
val sum = data.reduce(_ + _) // 5050
220
221
// Take first n elements
222
val first10 = data.take(10) // Array(1, 2, 3, ..., 10)
223
```
224
225
### Persistence
226
```scala
227
val expensiveRDD = data.map(complexComputation)
228
229
// Cache in memory for reuse
230
expensiveRDD.cache()
231
232
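// Or, instead of cache(), choose an explicit storage level. Use one call or the other, not both: an RDD's storage level cannot be changed once assigned, so calling persist(...) with a different level after cache() throws UnsupportedOperationException.
232
// expensiveRDD.persist(StorageLevel.MEMORY_AND_DISK_SER)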
234
235
// Use multiple times (computed only once due to caching)
236
val result1 = expensiveRDD.count()
237
val result2 = expensiveRDD.collect()
238
239
// Remove from cache when done
240
expensiveRDD.unpersist()
241
```
242
243
## Core Types
244
245
```scala { .api }
246
// Scala standard-library type (scala.reflect.ClassTag) that carries runtime class information for T; not defined by Spark
247
trait ClassTag[T] {
248
def runtimeClass: Class[_]
249
}
250
251
// RDD partition representation
252
trait Partition extends Serializable {
253
def index: Int
254
}
255
256
// Data partitioning strategy
257
abstract class Partitioner extends Serializable {
258
def numPartitions: Int
259
def getPartition(key: Any): Int
260
}
261
262
// RDD dependency representation
263
abstract class Dependency[T] extends Serializable
264
265
// Serialization framework
266
abstract class Serializer {
267
def newInstance(): SerializerInstance
268
}
269
270
abstract class SerializerInstance {
271
def serialize[T: ClassTag](t: T): ByteBuffer
272
def deserialize[T: ClassTag](bytes: ByteBuffer): T
273
}
274
```