0
# Pair RDD Operations
1
2
Advanced operations for key-value pair RDDs including grouping, joining, and aggregation operations essential for data processing workflows.
3
4
## Capabilities
5
6
### PairRDDFunctions
7
8
Extra operations available on RDDs of key-value pairs through implicit conversion from RDD[(K, V)].
9
10
```scala { .api }
11
/**
12
* Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
13
* These functions are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)]).
14
*/
15
class PairRDDFunctions[K, V](self: RDD[(K, V)])
16
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null)
17
extends Logging with Serializable {
18
19
// GROUPING OPERATIONS
20
21
/** Group values by key */
22
def groupByKey(): RDD[(K, Iterable[V])]
23
24
/** Group values by key with custom number of partitions */
25
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
26
27
/** Group values by key with custom partitioner */
28
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
29
30
// REDUCTION OPERATIONS
31
32
/** Combine values with the same key using a function */
33
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
34
35
/** Combine values with the same key using function and custom partitions */
36
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
37
38
/** Combine values with the same key using function and custom partitioner */
39
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
40
41
/** Fold values by key with zero value */
42
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
43
44
/** Fold values by key with zero value and custom partitioner */
45
def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]
46
47
/** Aggregate values by key into a result type U that may differ from the value type V */
48
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
49
50
/** Aggregate values by key with custom partitioner */
51
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
52
53
/** Generic function to combine values by key using combiners */
54
def combineByKey[C](createCombiner: V => C,
55
mergeValue: (C, V) => C,
56
mergeCombiners: (C, C) => C): RDD[(K, C)]
57
58
/** Generic function to combine values by key with custom partitioner */
59
def combineByKey[C](createCombiner: V => C,
60
mergeValue: (C, V) => C,
61
mergeCombiners: (C, C) => C,
62
partitioner: Partitioner): RDD[(K, C)]
63
64
// JOIN OPERATIONS
65
66
/** Inner join with another RDD */
67
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
68
69
/** Inner join with custom partitioner */
70
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
71
72
/** Left outer join with another RDD */
73
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
74
75
/** Left outer join with custom partitioner */
76
def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))]
77
78
/** Right outer join with another RDD */
79
def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
80
81
/** Right outer join with custom partitioner */
82
def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], W))]
83
84
/** Full outer join with another RDD */
85
def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]
86
87
/** Full outer join with custom partitioner */
88
def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], Option[W]))]
89
90
/** Cogroup with another RDD (group by key across both RDDs) */
91
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
92
93
/** Cogroup with custom partitioner */
94
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))]
95
96
/** Cogroup with two other RDDs */
97
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
98
99
// SET OPERATIONS
100
101
/** Subtract by key (remove elements with keys present in other RDD) */
102
def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)]
103
104
/** Subtract by key with custom partitioner */
105
def subtractByKey[W: ClassTag](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, V)]
106
107
// SORTING AND PARTITIONING (sortByKey is supplied by OrderedRDDFunctions and requires an implicit Ordering[K])
108
109
/** Sort RDD by key */
110
def sortByKey(ascending: Boolean = true): RDD[(K, V)]
111
112
/** Sort RDD by key with custom number of partitions */
113
def sortByKey(ascending: Boolean, numPartitions: Int): RDD[(K, V)]
114
115
/** Partition RDD by key using specified partitioner */
116
def partitionBy(partitioner: Partitioner): RDD[(K, V)]
117
118
// EXTRACTION OPERATIONS
119
120
/** Get RDD of keys only */
121
def keys: RDD[K]
122
123
/** Get RDD of values only */
124
def values: RDD[V]
125
126
/** Transform only the values (keep keys unchanged) */
127
def mapValues[U](f: V => U): RDD[(K, U)]
128
129
/** FlatMap only the values (keep keys unchanged) */
130
def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)]
131
132
// ACTIONS
133
134
/** Count number of elements for each key */
135
def countByKey(): Map[K, Long]
136
137
/** Collect as a map (key -> single value, not suitable for duplicate keys) */
138
def collectAsMap(): Map[K, V]
139
140
/** Look up values for a specific key */
141
def lookup(key: K): Seq[V]
142
143
// HADOOP OUTPUT OPERATIONS
144
145
/** Save as Hadoop file using old MapReduce API */
146
def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String,
147
keyClass: Class[_],
148
valueClass: Class[_],
149
outputFormatClass: Class[F],
150
conf: JobConf = new JobConf,
151
codec: Option[Class[_ <: CompressionCodec]] = None): Unit
152
153
/** Save as Hadoop file using new MapReduce API */
154
def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](path: String,
155
keyClass: Class[_],
156
valueClass: Class[_],
157
outputFormatClass: Class[F],
158
conf: Configuration = new Configuration): Unit
159
160
/** Save as Hadoop dataset using old API */
161
def saveAsHadoopDataset(conf: JobConf): Unit
162
163
/** Save as Hadoop dataset using new API */
164
def saveAsNewAPIHadoopDataset(conf: Configuration): Unit
165
}
166
```
167
168
**Usage Examples:**
169
170
```scala
171
import org.apache.spark.{SparkContext, SparkConf}
172
import org.apache.spark.HashPartitioner
173
174
val sc = new SparkContext(new SparkConf().setAppName("Pair RDD Examples").setMaster("local[*]"))
175
176
// Create pair RDDs
177
val scores = sc.parallelize(Array(
178
("Alice", 85), ("Bob", 90), ("Alice", 92), ("Charlie", 78), ("Bob", 95)
179
))
180
181
val grades = sc.parallelize(Array(
182
("Alice", "A"), ("Bob", "A+"), ("Charlie", "B"), ("David", "C")
183
))
184
185
// Grouping operations
186
val groupedScores = scores.groupByKey() // RDD[(String, Iterable[Int])]
187
// Result: ("Alice", [85, 92]), ("Bob", [90, 95]), ("Charlie", [78])
188
189
// Average score per student: carry (sum, count) per key so the divisor is the
// actual number of scores, and divide as Double to keep the fractional part.
val avgScores = scores
  .mapValues(score => (score, 1))
  .reduceByKey { case ((sum1, n1), (sum2, n2)) => (sum1 + sum2, n1 + n2) }
  .mapValues { case (sum, n) => sum.toDouble / n } // RDD[(String, Double)]
190
// Result: ("Alice", 88.5), ("Bob", 92.5), ("Charlie", 78)
191
192
val maxScores = scores.reduceByKey(math.max) // Maximum score per student
193
// Result: ("Alice", 92), ("Bob", 95), ("Charlie", 78)
194
195
// Aggregation with different types
196
val stats = scores.aggregateByKey((0, 0))(
197
seqOp = { case ((sum, count), score) => (sum + score, count + 1) },
198
combOp = { case ((sum1, count1), (sum2, count2)) => (sum1 + sum2, count1 + count2) }
199
).mapValues { case (sum, count) => sum.toDouble / count }
200
201
// Join operations
202
val joined = scores.join(grades) // Inner join
203
// Result: ("Alice", (85, "A")), ("Alice", (92, "A")), ("Bob", (90, "A+")), ("Bob", (95, "A+")), ("Charlie", (78, "B"))
204
205
val leftJoin = scores.leftOuterJoin(grades) // Left outer join
206
// Includes all scores, grades as Option[String]
207
208
val fullJoin = scores.fullOuterJoin(grades) // Full outer join
209
// Includes all keys from both RDDs
210
211
// Cogroup (group by key across multiple RDDs)
212
val cogrouped = scores.cogroup(grades)
213
// Result: ("Alice", ([85, 92], ["A"])), ("Bob", ([90, 95], ["A+"])), etc.
214
215
// Set operations
216
val students = sc.parallelize(Array(("Alice", 1), ("Bob", 2), ("Charlie", 3)))
217
val withdrawn = sc.parallelize(Array(("Bob", "withdrawn"), ("David", "withdrawn")))
218
val activeStudents = students.subtractByKey(withdrawn)
219
// Result: ("Alice", 1), ("Charlie", 3)
220
221
// Sorting
222
val sortedScores = scores.sortByKey() // Sort by student name
223
val sortedByScore = scores.map(_.swap).sortByKey(false).map(_.swap) // Sort by score descending
224
225
// Key/value extraction
226
val studentNames = scores.keys.distinct() // RDD[String] of unique student names
227
val allScores = scores.values // RDD[Int] of all scores
228
val upperCaseNames = scores.mapValues(score => s"Score: $score") // Transform values only
229
230
// Actions
231
val countsByStudent = scores.countByKey() // Map[String, Long]
232
val scoreMap = scores.collectAsMap() // Map[String, Int] - warning: loses duplicates!
233
val aliceScores = scores.lookup("Alice") // Seq[Int] = List(85, 92)
234
235
sc.stop()
236
```
237
238
### Advanced Aggregation with combineByKey
239
240
The `combineByKey` function is the most general aggregation function for pair RDDs.
241
242
```scala { .api }
243
/**
244
* Generic function to combine values by key using three functions:
245
* @param createCombiner function to create initial combiner from first value
246
* @param mergeValue function to merge a value into an existing combiner
247
* @param mergeCombiners function to merge two combiners
248
*/
249
def combineByKey[C](createCombiner: V => C,
250
mergeValue: (C, V) => C,
251
mergeCombiners: (C, C) => C): RDD[(K, C)]
252
```
253
254
**Usage Example:**
255
256
```scala
257
// Compute mean score per student using combineByKey
258
val meanScores = scores.combineByKey(
259
createCombiner = (score: Int) => (score, 1), // (sum, count)
260
mergeValue = (acc: (Int, Int), score: Int) => (acc._1 + score, acc._2 + 1),
261
mergeCombiners = (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
262
).mapValues { case (sum, count) => sum.toDouble / count }
263
264
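// Collect all scores per student into a list (for illustration only: this is equivalent to groupByKey and no more efficient, since building lists map-side does not reduce shuffled data)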
265
val scoresPerStudent = scores.combineByKey(
266
createCombiner = (score: Int) => List(score),
267
mergeValue = (acc: List[Int], score: Int) => score :: acc,
268
mergeCombiners = (acc1: List[Int], acc2: List[Int]) => acc1 ::: acc2
269
)
270
```
271
272
### Partitioning for Performance
273
274
Understanding and controlling partitioning is crucial for performance in pair RDD operations.
275
276
```scala { .api }
277
/**
278
* Hash partitioner that distributes keys using hash function
279
*/
280
class HashPartitioner(partitions: Int) extends Partitioner {
281
def numPartitions: Int = partitions
282
def getPartition(key: Any): Int
283
}
284
285
/**
286
* Range partitioner that distributes keys by ranges
287
*/
288
class RangePartitioner[K : Ordering : ClassTag, V](
289
partitions: Int,
290
rdd: RDD[_ <: Product2[K, V]],
291
ascending: Boolean = true) extends Partitioner {
292
// Derived from the sampled range bounds (rangeBounds.length + 1); may be smaller
// than the requested `partitions` when the RDD has few sampled/distinct keys.
def numPartitions: Int
293
def getPartition(key: Any): Int
294
}
295
```
296
297
**Usage Examples:**
298
299
```scala
300
// Partition data for better join performance
301
val partitioner = new HashPartitioner(4)
302
val partitionedScores = scores.partitionBy(partitioner).persist()
303
val partitionedGrades = grades.partitionBy(partitioner).persist()
304
305
// Now joins will be much faster as data is co-located
306
val efficientJoin = partitionedScores.join(partitionedGrades)
307
308
// Range partitioner for sorted data
309
val sortedData = scores.sortByKey() // Uses RangePartitioner internally
310
```
311
312
### Join Performance Optimizations
313
314
Different join strategies for different data size scenarios:
315
316
```scala
317
// Plain join against a small, filtered RDD. NOTE: the RDD API never broadcasts automatically
// (automatic broadcast joins are a Spark SQL/DataFrame feature), so this is a regular shuffle join.
318
val smallGrades = grades.filter(_._2 == "A+") // Small RDD
319
val broadcastJoin = scores.join(smallGrades) // Still a shuffle join despite the name; see the manual broadcast below
320
321
// Manual broadcast for very small lookup tables
322
val gradeMap = sc.broadcast(grades.collectAsMap())
323
val manualBroadcastJoin = scores.map { case (student, score) =>
324
val grade = gradeMap.value.get(student)
325
(student, (score, grade))
326
}
327
328
// Bucket join for large datasets with known partitioning
329
val bucketedScores = scores.partitionBy(new HashPartitioner(10)).persist()
330
val bucketedGrades = grades.partitionBy(new HashPartitioner(10)).persist()
331
val bucketJoin = bucketedScores.join(bucketedGrades) // No shuffle needed
332
```
333
334
## Performance Considerations
335
336
### Choosing the Right Operation
337
- **`reduceByKey`** vs **`groupByKey`**: Always prefer `reduceByKey` when possible as it reduces data before shuffling
338
- **`aggregateByKey`** vs **`combineByKey`**: Use `aggregateByKey` for simpler cases, `combineByKey` for complex transformations
339
- **`cogroup`** vs multiple **joins**: Use `cogroup` when you need to join multiple RDDs by the same key
340
341
### Partitioning Strategy
342
- Use consistent partitioners across RDDs that will be joined
343
- Persist partitioned RDDs when they'll be reused
344
- Consider the number of unique keys when choosing partition count
345
- Use `RangePartitioner` for sorted operations, `HashPartitioner` for general use
346
347
### Memory Management
348
- Be careful with `groupByKey` on skewed data: all values for a single key must fit in memory on one executor, so keys with very many values can cause OOM
349
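- Use `mapValues` and `flatMapValues` instead of `map`/`flatMap` when keys are unchanged: they preserve the RDD's partitioner, so later key-based operations can avoid a re-shuffle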
350
- Consider using `aggregateByKey` with smaller intermediate types
351
352
### Common Anti-patterns
353
- Using `groupByKey().mapValues(_.sum)` instead of `reduceByKey(_ + _)`
354
- Not partitioning RDDs that will be joined multiple times
355
- Using `collectAsMap()` on RDDs with duplicate keys (data loss)
356
- Performing multiple joins without consistent partitioning