# Key-Value Operations

When working with RDDs of (key, value) pairs, Spark provides specialized operations through an implicit conversion to `PairRDDFunctions`. These operations become available on any `RDD[(K, V)]` once you import `org.apache.spark.SparkContext._`. From Spark 1.3 onward the conversion is found automatically and the import is optional.

## PairRDDFunctions

```scala { .api }
class PairRDDFunctions[K, V](self: RDD[(K, V)]) extends Logging with Serializable {
  // Available through implicit conversion when importing org.apache.spark.SparkContext._
}
```

## Setup and Imports

```scala { .api }
import org.apache.spark.SparkContext._ // Brings the PairRDDFunctions implicits into scope (required before Spark 1.3)
import org.apache.spark.rdd.RDD
```

```scala
// Creating pair RDDs
val pairs: RDD[(String, Int)] = sc.parallelize(Seq(
  ("apple", 5), ("banana", 3), ("apple", 2), ("orange", 1)
))

val wordCounts: RDD[(String, Int)] = sc.textFile("file.txt")
  .flatMap(_.split(" "))
  .map(word => (word, 1))
```

## Aggregation Operations

### combineByKey

The most general aggregation function. The other per-key aggregations (`reduceByKey`, `aggregateByKey`, `foldByKey`, `groupByKey`) are built on top of it:

```scala { .api }
def combineByKey[C](
  createCombiner: V => C,
  mergeValue: (C, V) => C,
  mergeCombiners: (C, C) => C,
  partitioner: Partitioner,
  mapSideCombine: Boolean = true,
  serializer: Serializer = null
): RDD[(K, C)]

// Convenience methods using a default or hash partitioner
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
```

```scala
// Calculate average per key using combineByKey
val data = sc.parallelize(Seq(("math", 85), ("english", 90), ("math", 92), ("english", 88)))

val averages = data.combineByKey(
  (score: Int) => (score, 1), // createCombiner: V => (sum, count)
  (acc: (Int, Int), score: Int) => (acc._1 + score, acc._2 + 1), // mergeValue
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) // mergeCombiners
).mapValues { case (sum, count) => sum.toDouble / count }

// Result: [("math", 88.5), ("english", 89.0)]
```
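
To make the roles of the three functions concrete, here is a minimal plain-Scala model of the semantics. It needs no Spark and is not how Spark implements the operation. The `CombineByKeyModel` object and its `partitions` argument are hypothetical stand-ins for an RDD and its partitions: `createCombiner` and `mergeValue` run inside each partition, and `mergeCombiners` merges the per-partition results.

```scala
// Plain-Scala model of combineByKey semantics (illustrative only, no Spark).
object CombineByKeyModel {
  def combineByKey[K, V, C](
      partitions: Seq[Seq[(K, V)]],
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): Map[K, C] = {
    // Map side: build one combiner per key inside each partition.
    val perPartition: Seq[Map[K, C]] = partitions.map { part =>
      part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
        val combined = acc.get(k) match {
          case Some(c) => mergeValue(c, v)  // key already seen in this partition
          case None    => createCombiner(v) // first value for this key here
        }
        acc.updated(k, combined)
      }
    }
    // Reduce side: merge the per-partition combiners of each key.
    perPartition.flatten.groupBy(_._1).map { case (k, keyed) =>
      k -> keyed.map(_._2).reduce(mergeCombiners)
    }
  }

  // The averaging example, with the four records split over two partitions.
  val partitions: Seq[Seq[(String, Int)]] = Seq(
    Seq(("math", 85), ("english", 90)),
    Seq(("math", 92), ("english", 88))
  )

  val sumsAndCounts: Map[String, (Int, Int)] =
    combineByKey[String, Int, (Int, Int)](
      partitions,
      score => (score, 1),
      (acc, score) => (acc._1 + score, acc._2 + 1),
      (a, b) => (a._1 + b._1, a._2 + b._2)
    )

  val averages: Map[String, Double] =
    sumsAndCounts.map { case (k, (sum, count)) => k -> (sum.toDouble / count) }
}
```

`CombineByKeyModel.averages` holds `88.5` for `"math"` and `89.0` for `"english"`, matching the Spark example above.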

### reduceByKey

Merge the values of each key using an associative and commutative function:

```scala { .api }
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
def reduceByKey(numPartitions: Int, func: (V, V) => V): RDD[(K, V)]
```

```scala
import org.apache.spark.HashPartitioner

val wordCounts = sc.textFile("document.txt")
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _) // Sum counts for each word

// With custom partitioner
val counts = data.reduceByKey(new HashPartitioner(4), _ + _)
```

### reduceByKeyLocally

Reduce by key and return the result to the driver as a `scala.collection.Map`. The full result must fit in driver memory:

```scala { .api }
def reduceByKeyLocally(func: (V, V) => V): Map[K, V]
```

```scala
// The returned Map is scala.collection.Map, not the default immutable Map
val localCounts: scala.collection.Map[String, Int] = wordCounts.reduceByKeyLocally(_ + _)
// Returns a local Map instead of an RDD
```

### aggregateByKey

Aggregate the values of each key into a result type `U` that may differ from the value type `V`:

```scala { .api }
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
```

```scala
// Calculate max, min, and count per key
val data = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 1), ("a", 2)))

val stats = data.aggregateByKey((Int.MinValue, Int.MaxValue, 0))(
  // seqOp: combine value with accumulator
  (acc, value) => (math.max(acc._1, value), math.min(acc._2, value), acc._3 + 1),
  // combOp: combine accumulators
  (acc1, acc2) => (math.max(acc1._1, acc2._1), math.min(acc1._2, acc2._2), acc1._3 + acc2._3)
)
// Result: [("a", (3, 1, 3)), ("b", (2, 1, 2))] - (max, min, count)
```

### foldByKey

Fold the values of each key using an associative function and a neutral zero value. The zero value may be applied more than once per key, so it must not change the result (for example `0` for addition):

```scala { .api }
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]
```

```scala
val data = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val sums = data.foldByKey(0)(_ + _)
// Result: [("a", 4), ("b", 2)]
```

### groupByKey

Group the values of each key into a single sequence. Every value is shuffled and all values of a key are held in memory together, so use it with caution on large datasets:

```scala { .api }
def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
```

```scala
val data = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4)))
val grouped = data.groupByKey()
// Result: [("a", [1, 3]), ("b", [2, 4])]

// Note: groupByKey can cause out-of-memory errors for keys with many values
// Consider using reduceByKey, aggregateByKey, or combineByKey instead
```

## Join Operations

### join (Inner Join)

```scala { .api }
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
```

```scala
val rdd1 = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
val rdd2 = sc.parallelize(Seq(("a", "x"), ("b", "y"), ("d", "z")))

val joined = rdd1.join(rdd2)
// Result: [("a", (1, "x")), ("b", (2, "y"))]
// Note: "c" and "d" are excluded (inner join)
```

### leftOuterJoin

```scala { .api }
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))]
def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))]
```

```scala
val leftJoined = rdd1.leftOuterJoin(rdd2)
// Result: [("a", (1, Some("x"))), ("b", (2, Some("y"))), ("c", (3, None))]
```

### rightOuterJoin

```scala { .api }
def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], W))]
def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))]
```

```scala
val rightJoined = rdd1.rightOuterJoin(rdd2)
// Result: [("a", (Some(1), "x")), ("b", (Some(2), "y")), ("d", (None, "z"))]
```

### fullOuterJoin

```scala { .api }
def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]
def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], Option[W]))]
def fullOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], Option[W]))]
```

```scala
val fullJoined = rdd1.fullOuterJoin(rdd2)
// Result: [("a", (Some(1), Some("x"))), ("b", (Some(2), Some("y"))),
//          ("c", (Some(3), None)), ("d", (None, Some("z")))]
```

## Cogroup Operations

### cogroup (Group Together)

```scala { .api }
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]

// Multi-way cogroup
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]
```

```scala
val rdd1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
val rdd2 = sc.parallelize(Seq(("a", "x"), ("c", "y")))

val cogrouped = rdd1.cogroup(rdd2)
// Result: [("a", ([1, 2], ["x"])), ("b", ([3], [])), ("c", ([], ["y"]))]

// Three-way cogroup
val rdd3 = sc.parallelize(Seq(("a", 10.0), ("b", 20.0)))
val threeway = rdd1.cogroup(rdd2, rdd3)
```

## Key and Value Transformations

### keys and values

```scala { .api }
def keys: RDD[K]   // Extract all keys
def values: RDD[V] // Extract all values
```

```scala
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val allKeys = pairs.keys     // RDD["a", "b", "a"]
val allValues = pairs.values // RDD[1, 2, 3]
```

### mapValues

Transform values while preserving keys and partitioning:

```scala { .api }
def mapValues[U](f: V => U): RDD[(K, U)]
```

```scala
val pairs = sc.parallelize(Seq(("alice", 25), ("bob", 30), ("charlie", 35)))
val incremented = pairs.mapValues(_ + 1)
// Result: [("alice", 26), ("bob", 31), ("charlie", 36)]

// mapValues preserves partitioning, unlike map
val transformed = pairs.mapValues(age => if (age >= 30) "adult" else "young")
```
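
The difference in partitioning can be observed through the RDD's `partitioner` field. The sketch below assumes a local `SparkContext` obtained with `SparkContext.getOrCreate` (available in more recent Spark releases), created here only to keep the example self-contained. The application name is arbitrary.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

// Assumption: a local context for illustration; reuse your own `sc` if you have one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("mapValues-partitioner-sketch"))

val partitioner = new HashPartitioner(2)
val ages = sc.parallelize(Seq(("alice", 25), ("bob", 30), ("charlie", 35)))
  .partitionBy(partitioner)

// mapValues cannot change keys, so the partitioner is carried over.
val viaMapValues = ages.mapValues(_ + 1)

// map may change keys, so Spark drops the partitioner.
val viaMap = ages.map { case (name, age) => (name, age + 1) }

println(viaMapValues.partitioner.isDefined) // true
println(viaMap.partitioner.isDefined)       // false
```

A later `reduceByKey` or `join` on `viaMapValues` can reuse the existing layout, while the same call on `viaMap` has to shuffle again.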

### flatMapValues

Map each value to zero or more output values, each paired with the original key. Keys and partitioning are preserved:

```scala { .api }
def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)]
```

```scala
val sentences = sc.parallelize(Seq(
  ("doc1", "hello world"),
  ("doc2", "spark scala")
))

val words = sentences.flatMapValues(_.split(" "))
// Result: [("doc1", "hello"), ("doc1", "world"), ("doc2", "spark"), ("doc2", "scala")]
```

## Partitioning and Sorting

### partitionBy

Partition RDD according to a partitioner:

```scala { .api }
def partitionBy(partitioner: Partitioner): RDD[(K, V)]
```

```scala
import org.apache.spark.{HashPartitioner, Partitioner}

val data = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3), ("d", 4)))
val partitioned = data.partitionBy(new HashPartitioner(2))

// Custom partitioner
class CustomPartitioner(partitions: Int) extends Partitioner {
  override def numPartitions: Int = partitions

  // hashCode may be negative, so shift the remainder into [0, partitions)
  override def getPartition(key: Any): Int = {
    val mod = key.hashCode % partitions
    if (mod < 0) mod + partitions else mod
  }
}
```

### sortByKey

Sort the RDD by key. This method comes from `OrderedRDDFunctions` and requires an implicit `Ordering[K]` in scope:

```scala { .api }
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]
```

```scala
val data = sc.parallelize(Seq(("c", 3), ("a", 1), ("b", 2)))
val sorted = data.sortByKey() // Ascending: [("a", 1), ("b", 2), ("c", 3)]
val descending = data.sortByKey(ascending = false) // Descending: [("c", 3), ("b", 2), ("a", 1)]
```

### repartitionAndSortWithinPartitions

Repartition the RDD with the given partitioner and sort the records by key within each resulting partition. This is more efficient than repartitioning and then sorting, because the sort is pushed into the shuffle:

```scala { .api }
def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)]
```

```scala
val data = sc.parallelize(Seq(("c", 3), ("a", 1), ("b", 2), ("d", 4)))
val repartitioned = data.repartitionAndSortWithinPartitions(new HashPartitioner(2))
// Partitions data and sorts within each partition in one operation
```
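
To see what "sorted within each partition" means, the partitions can be inspected with `glom()`. This is a small sketch that assumes a local `SparkContext` created via `SparkContext.getOrCreate`. The application name is arbitrary.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

// Assumption: a local context for illustration; reuse your own `sc` if you have one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("sort-within-partitions-sketch"))

val data = sc.parallelize(Seq(("c", 3), ("a", 1), ("b", 2), ("d", 4)))
val repartitioned = data.repartitionAndSortWithinPartitions(new HashPartitioner(2))

// glom() turns each partition into an array so its contents can be inspected.
val byPartition: List[List[(String, Int)]] =
  repartitioned.glom().collect().map(_.toList).toList

byPartition.zipWithIndex.foreach { case (records, i) =>
  println(s"partition $i: $records")
}
// partition 0: List((b,2), (d,4))
// partition 1: List((a,1), (c,3))
```

Each partition is ordered by key, but the RDD as a whole is not globally sorted. Use `sortByKey` when a total order is needed.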

## Set Operations

### subtractByKey

Return the (key, value) pairs of this RDD whose keys do not appear in `other`:

```scala { .api }
def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)]
def subtractByKey[W: ClassTag](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, V)]
def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)]
```

```scala
val rdd1 = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
val rdd2 = sc.parallelize(Seq(("a", "x"), ("c", "y")))

val subtracted = rdd1.subtractByKey(rdd2)
// Result: [("b", 2)] - only pairs whose keys don't exist in rdd2
```

## Statistical Operations

### countByKey

Count the number of elements for each key and return the counts to the driver as a local map:

```scala { .api }
def countByKey(): Map[K, Long]
```

```scala
val data = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5)))
val counts = data.countByKey()
// Result: Map("a" -> 3, "b" -> 2)
```

### countByKeyApprox

Approximate version of `countByKey` that returns a possibly partial result once `timeout` milliseconds have elapsed:

```scala { .api }
def countByKeyApprox(timeout: Long, confidence: Double = 0.95): PartialResult[Map[K, BoundedDouble]]
```

```scala
// Illustrative stand-in for a large pair RDD
val largePairRDD = sc.parallelize((0 until 1000000).map(i => (i % 100, i)))
val approxCounts = largePairRDD.countByKeyApprox(1000) // 1 second timeout
```
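
The returned `PartialResult` wraps per-key `BoundedDouble` estimates. The following sketch shows one way to read them. It assumes a local `SparkContext` created via `SparkContext.getOrCreate` and uses small synthetic data (10 keys with 1000 records each) chosen only for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: a local context for illustration; reuse your own `sc` if you have one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("countByKeyApprox-sketch"))

// Synthetic data: keys 0 to 9, 1000 records per key.
val pairs = sc.parallelize((0 until 10000).map(i => (i % 10, i)))

val partial = pairs.countByKeyApprox(timeout = 1000L, confidence = 0.95)

// getFinalValue() blocks until the whole job has finished.
val estimates = partial.getFinalValue()

estimates.toSeq.sortBy(_._1).foreach { case (key, bound) =>
  // Each BoundedDouble carries a point estimate and its bounds.
  println(s"key $key: mean=${bound.mean} low=${bound.low} high=${bound.high}")
}
```

`partial.initialValue` can be used instead when an estimate within the timeout matters more than waiting for the complete job.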

### collectAsMap

Return the key-value pairs as a Map (assumes unique keys):

```scala { .api }
def collectAsMap(): Map[K, V]
```

```scala
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
val map = pairs.collectAsMap()
// Result: Map("a" -> 1, "b" -> 2, "c" -> 3)

// Warning: If there are duplicate keys, only one value per key is kept
```

## Save Operations

### saveAsHadoopFile

Save as Hadoop file with various output formats:

```scala { .api }
def saveAsHadoopFile[F <: OutputFormat[K, V]](
  path: String,
  keyClass: Class[_],
  valueClass: Class[_],
  outputFormatClass: Class[F],
  codec: Class[_ <: CompressionCodec]
): Unit

// Simplified versions
def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String): Unit
def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String, codec: Class[_ <: CompressionCodec]): Unit
```
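
As a usage sketch for the simplified form, the example below writes a small pair RDD with the old-API `TextOutputFormat`. It assumes a local `SparkContext` created via `SparkContext.getOrCreate` and writes to a temporary directory chosen only for illustration. Values are converted to `String` first so that both key and value classes are reference types.

```scala
import java.nio.file.Files
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: a local context for illustration; reuse your own `sc` if you have one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("saveAsHadoopFile-sketch"))

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
val asText = pairs.mapValues(_.toString)

// Hypothetical output location; the target directory must not exist yet.
val outputDir = Files.createTempDirectory("pairs-old-api").resolve("out").toString

// TextOutputFormat writes one tab-separated "key value" line per record.
asText.saveAsHadoopFile[TextOutputFormat[String, String]](outputDir)

// Read the files back to check what was written.
val lines = sc.textFile(outputDir).collect().sorted.toList
```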

### saveAsNewAPIHadoopFile

Save using output formats from the new Hadoop API (`org.apache.hadoop.mapreduce`):

```scala { .api }
def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](
  path: String,
  keyClass: Class[_],
  valueClass: Class[_],
  outputFormatClass: Class[F],
  conf: Configuration = self.context.hadoopConfiguration
): Unit
```
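
A comparable sketch for the new API passes the key, value, and output format classes explicitly, as in the signature above. It assumes a local `SparkContext` created via `SparkContext.getOrCreate` and a temporary output directory chosen only for illustration.

```scala
import java.nio.file.Files
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: a local context for illustration; reuse your own `sc` if you have one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("saveAsNewAPIHadoopFile-sketch"))

val asText = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3))).mapValues(_.toString)

// Hypothetical output location; the target directory must not exist yet.
val outputDir = Files.createTempDirectory("pairs-new-api").resolve("out").toString

asText.saveAsNewAPIHadoopFile(
  outputDir,
  classOf[String],                           // key class
  classOf[String],                           // value class
  classOf[TextOutputFormat[String, String]]) // new-API output format

// Read the files back to check what was written.
val lines = sc.textFile(outputDir).collect().sorted.toList
```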

### saveAsSequenceFile

Save as Hadoop SequenceFile:

```scala { .api }
def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None): Unit
```

```scala
import org.apache.hadoop.io.{IntWritable, Text}

// For writable types: pairs is an RDD[(String, Int)], so keys become Text
// and values become IntWritable
val writablePairs: RDD[(Text, IntWritable)] = pairs.map {
  case (k, v) => (new Text(k), new IntWritable(v))
}
writablePairs.saveAsSequenceFile("hdfs://path/to/output")
```

### saveAsHadoopDataset

Save using Hadoop JobConf:

```scala { .api }
def saveAsHadoopDataset(conf: JobConf): Unit
```

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, TextOutputFormat}

val jobConf = new JobConf()
jobConf.setOutputFormat(classOf[TextOutputFormat[String, Int]])
jobConf.setOutputKeyClass(classOf[String])
// JobConf rejects primitive classes, so pass the boxed Integer class
jobConf.setOutputValueClass(classOf[java.lang.Integer])
// File-based output formats also need an output path (placeholder path)
FileOutputFormat.setOutputPath(jobConf, new Path("hdfs://path/to/output"))

pairs.saveAsHadoopDataset(jobConf)
```

## Performance Considerations

1. **Prefer `reduceByKey` over `groupByKey`**: `reduceByKey` combines values within each partition before the shuffle, so less data crosses the network
2. **Use appropriate partitioners**: Pre-partitioning RDDs that are joined or aggregated repeatedly lets later key-based operations avoid reshuffling them
3. **Consider `mapValues`**: It preserves the partitioner, whereas `map` discards it and can force a later reshuffle
4. **Avoid `collectAsMap` on large datasets**: It brings all data to the driver
5. **Use `combineByKey` for complex aggregations**: It is the most flexible option when the result type differs from the value type

```scala
val textFile = sc.textFile("document.txt")

// Efficient pattern
val wordCounts = textFile
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _) // Combines locally before shuffle

// Less efficient pattern
val wordCountsSlow = textFile
  .flatMap(_.split(" "))
  .map((_, 1))
  .groupByKey()     // Shuffles all values
  .mapValues(_.sum) // Then combines
```
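
The partitioner advice in the list above can be sketched as follows. The example assumes a local `SparkContext` created via `SparkContext.getOrCreate`. The `users` and `events` datasets are made up for illustration.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

// Assumption: a local context for illustration; reuse your own `sc` if you have one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("partitioner-reuse-sketch"))

val partitioner = new HashPartitioner(4)

// Partition the side that is reused once and keep it cached.
val users = sc.parallelize(Seq((1, "alice"), (2, "bob"), (3, "carol")))
  .partitionBy(partitioner)
  .persist()

val events = sc.parallelize(Seq((1, "login"), (3, "click"), (1, "logout")))

// Joining with the same partitioner lets Spark keep `users` where it is;
// only `events` has to be shuffled to match.
val joined = users.join(events, partitioner)

println(joined.partitioner == Some(partitioner)) // true
```

Because the result keeps the same partitioner, a following `reduceByKey` or another join keyed the same way can also proceed without moving `joined` again.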