0
# Key-Value Operations
1
2
Key-value operations in Spark are performed on RDDs of type `RDD[(K, V)]`, where `K` is the key type and `V` is the value type. These operations are made available through an implicit conversion to `PairRDDFunctions` (and, for key-ordered operations such as `sortByKey`, to `OrderedRDDFunctions` when an implicit `Ordering[K]` is in scope), providing powerful aggregation, grouping, and join capabilities.
3
4
## PairRDDFunctions
5
6
```scala { .api }
7
class PairRDDFunctions[K, V](self: RDD[(K, V)])(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) {
  // Basic Key-Value Operations
  def keys: RDD[K]
  def values: RDD[V]
  def mapValues[U](f: V => U): RDD[(K, U)]
  def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)]
  // NOTE: Spark provides no dedicated key/value swap method; use rdd.map(_.swap) to obtain RDD[(V, K)].

  // Grouping Operations
  def groupByKey(): RDD[(K, Iterable[V])]
  def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
  def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]

  // Reduction Operations
  def reduceByKey(func: (V, V) => V): RDD[(K, V)]
  def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
  def reduceByKeyLocally(func: (V, V) => V): Map[K, V]

  def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
  def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]
  def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]

  def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
  def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
  def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]

  def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
  def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner): RDD[(K, C)]
  def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]

  // Join Operations
  def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
  def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
  def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]

  def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
  def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))]
  def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))]

  def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
  def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], W))]
  def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))]

  def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]
  def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], Option[W]))]
  def fullOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], Option[W]))]

  // Cogroup Operations
  def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
  def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))]
  def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]
  def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
  def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
  def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]
  def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]

  // Sorting Operations
  // NOTE: these two methods are defined on OrderedRDDFunctions rather than PairRDDFunctions.
  // They are available on any RDD[(K, V)] for which an implicit Ordering[K] is in scope.
  def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]
  def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)]

  // Partitioning Operations
  def partitionBy(partitioner: Partitioner): RDD[(K, V)]

  // Collection Operations
  def collectAsMap(): Map[K, V]
  def countByKey(): Map[K, Long]
  def countByKeyApprox(timeout: Long, confidence: Double = 0.95): PartialResult[Map[K, BoundedDouble]]

  // Lookup Operations
  def lookup(key: K): Seq[V]

  // Subtraction Operations
  def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)]
  def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)]
  def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)]

  // Sampling Operations
  def sampleByKey(withReplacement: Boolean, fractions: Map[K, Double], seed: Long = Utils.random.nextLong): RDD[(K, V)]
  def sampleByKeyExact(withReplacement: Boolean, fractions: Map[K, Double], seed: Long = Utils.random.nextLong): RDD[(K, V)]
}
89
```
90
91
## Basic Key-Value Operations
92
93
### Creating Key-Value RDDs
94
95
```scala
96
import org.apache.spark.{SparkContext, SparkConf}
97
98
// Local SparkContext shared by the examples in this document
val sc = new SparkContext(new SparkConf().setAppName("Key-Value Examples").setMaster("local[*]"))

// Create key-value RDD from collections
val pairs = sc.parallelize(Seq(
  ("apple", 5), ("banana", 3), ("apple", 2), ("orange", 8), ("banana", 1)
))

// Transform regular RDD to key-value RDD
val words = sc.parallelize(Seq("hello", "world", "hello", "spark", "world"))
val wordPairs = words.map(word => (word, 1))

// From text files: emit (url, 1) for every line, assuming the URL is the 7th space-separated field
val lines = sc.textFile("access.log")
val urlCounts = lines.flatMap { line =>
  val parts = line.split(" ")
  // Skip malformed lines with fewer than 7 fields instead of failing the whole job
  // with an ArrayIndexOutOfBoundsException
  if (parts.length > 6) Some((parts(6), 1)) else None
}
116
```
117
118
### Basic Operations
119
120
```scala
121
// Extract keys and values
val keys = pairs.keys.collect() // Array("apple", "banana", "apple", "orange", "banana")
val values = pairs.values.collect() // Array(5, 3, 2, 8, 1)

// Transform values while preserving keys
val discountedPrices = pairs.mapValues(_ * 0.9)

// Transform values to multiple values: each (key, value) becomes one pair per produced element
val inventory = sc.parallelize(Seq(
  ("electronics", "laptop,phone,tablet"),
  ("books", "fiction,non-fiction,textbook")
))
val expandedInventory = inventory.flatMapValues(_.split(","))

// Swap keys and values. Pair RDDs have no dedicated swap method, so map each tuple
// through Tuple2.swap; the result is an RDD[(Int, String)].
val swapped = pairs.map(_.swap)
137
```
138
139
## Aggregation Operations
140
141
### GroupByKey
142
143
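Groups all values for each key into a single `Iterable`. Use with caution on large datasets: every value is shuffled across the network and all values for a key must be held in memory on one executor, so a heavily skewed key can cause an out-of-memory failure. When the goal is an aggregate (sum, count, average), prefer `reduceByKey` or `aggregateByKey`.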
144
145
```scala
146
// Collect every value that shares a key into one Iterable
val grouped = pairs.groupByKey()
// Result: ("apple", Iterable(5, 2)), ("banana", Iterable(3, 1)), ("orange", Iterable(8))

// Derive (sum, count, average) from each key's grouped values
val processed = grouped.mapValues { vs =>
  val total = vs.sum
  val n = vs.size
  (total, n, total.toDouble / n)
}
157
```
158
159
### ReduceByKey
160
161
More efficient than `groupByKey` for aggregation: values are combined locally within each partition (map-side) before the shuffle, so far less data is sent across the network.
162
163
```scala
164
// Add up all values that share a key
val totals = pairs.reduceByKey((left, right) => left + right)
// Result: ("apple", 7), ("banana", 4), ("orange", 8)

// Keep only the largest value seen for each key
val maxValues = pairs.reduceByKey((left, right) => math.max(left, right))

// Join each key's strings with a single space
val textData = sc.parallelize(Seq(
  "user1" -> "hello", "user2" -> "hi", "user1" -> "world", "user2" -> "there"
))
val concatenated = textData.reduceByKey((left, right) => left + " " + right)
176
```
177
178
### FoldByKey
179
180
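Like `reduceByKey`, but each key's fold starts from the supplied zero value. The zero value may be applied more than once per key (for example, once for every partition that contains the key), so it must be a neutral element for the function — such as `0` for addition or `""` for concatenation.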
181
182
```scala
183
// Sum per key, starting every fold from 0
val foldedSums = pairs.foldByKey(0)((acc, v) => acc + v)

// Join the messages of each key, separated by "; "
val messages = sc.parallelize(Seq(
  "error" -> "Connection failed", "error" -> "Timeout", "info" -> "Started", "info" -> "Completed"
))
val logMessages = messages.foldByKey("") { (acc, msg) =>
  // An empty accumulator is the zero value: start with the message itself, without a separator
  if (acc.nonEmpty) acc + "; " + msg else msg
}
191
```
192
193
### AggregateByKey
194
195
Most flexible aggregation operation with different types for input and output.
196
197
```scala
198
// Per-key statistics built from a running (count, sum, sum of squares)
val numbers = sc.parallelize(Seq(
  "math" -> 85, "math" -> 92, "math" -> 78, "science" -> 88, "science" -> 95
))

// Accumulator from which mean and (population) variance are derived on demand
case class Stats(count: Int, sum: Double, sumSquares: Double) {
  def mean: Double = sum / count
  def variance: Double = (sumSquares / count) - (mean * mean)
}

val stats = numbers.aggregateByKey(Stats(0, 0.0, 0.0))(
  // seqOp: fold one more value into a partition-local accumulator
  (acc, v) => Stats(acc.count + 1, acc.sum + v, acc.sumSquares + v * v),
  // combOp: merge the accumulators produced by different partitions
  (a, b) => Stats(a.count + b.count, a.sum + b.sum, a.sumSquares + b.sumSquares)
)

// Distinct values per key, accumulated in an immutable Set
val data = sc.parallelize(Seq(
  "A" -> 1, "A" -> 2, "A" -> 1, "B" -> 3, "B" -> 4, "B" -> 3
))
val uniqueValues = data.aggregateByKey(Set.empty[Int])(
  (seen, v) => seen + v,
  (a, b) => a ++ b
)
229
```
230
231
### CombineByKey
232
233
The most general aggregation function - other aggregation functions are implemented using this.
234
235
```scala
236
// Average score per key, computed from a running (sum, count) pair
val scores = sc.parallelize(Seq(
  "Alice" -> 85, "Bob" -> 92, "Alice" -> 78, "Bob" -> 88, "Alice" -> 95
))

val averages = scores
  .combineByKey(
    // createCombiner: first value seen for a key in a partition
    (score: Int) => (score, 1),
    // mergeValue: add another value to a partition-local (sum, count)
    (acc: (Int, Int), score: Int) => (acc._1 + score, acc._2 + 1),
    // mergeCombiners: merge (sum, count) pairs coming from different partitions
    (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)
  )
  .mapValues { case (total, n) => total.toDouble / n }
246
```
247
248
## Join Operations
249
250
### Inner Join
251
252
Returns pairs where keys exist in both RDDs.
253
254
```scala
255
// (customerId, name)
val customers = sc.parallelize(Seq(
  1 -> "Alice", 2 -> "Bob", 3 -> "Charlie", 4 -> "David"
))

// (customerId, item)
val orders = sc.parallelize(Seq(
  1 -> "laptop", 2 -> "phone", 1 -> "mouse", 5 -> "tablet"
))

// Inner join - keeps only the keys that appear on both sides
val customerOrders = customers.join(orders)
// Result: (1, ("Alice", "laptop")), (1, ("Alice", "mouse")), (2, ("Bob", "phone"))
266
```
267
268
### Outer Joins
269
270
```scala
271
// Left outer join - every key of `customers` is kept; a missing order becomes None
val leftJoin = customers.leftOuterJoin(orders)
// Result includes: (4, ("David", None))

// Right outer join - every key of `orders` is kept; a missing customer becomes None
val rightJoin = customers.rightOuterJoin(orders)
// Result includes: (5, (None, "tablet"))

// Full outer join - keys of both RDDs are kept; both sides are wrapped in Option
val fullJoin = customers.fullOuterJoin(orders)
// Result includes both: (4, (Some("David"), None)) and (5, (None, Some("tablet")))
282
```
283
284
### Complex Join Example
285
286
```scala
287
// Multi-way joins: three datasets keyed by product id
val products = sc.parallelize(Seq(
  1 -> "Laptop", 2 -> "Phone", 3 -> "Tablet"
))

val prices = sc.parallelize(Seq(
  1 -> 999.99, 2 -> 499.99, 3 -> 299.99
))

val inventory = sc.parallelize(Seq(
  1 -> 50, 2 -> 100, 3 -> 25
))

// Join step by step, then flatten the nested tuples into one record per product
val productInfo = {
  val priced = products.join(prices)
  priced.join(inventory).map { case (id, ((name, price), stock)) =>
    (id, name, price, stock)
  }
}
307
```
308
309
## Cogroup Operations
310
311
Cogroup groups data from multiple RDDs by key.
312
313
```scala
314
val rdd1 = sc.parallelize(Seq(1 -> "a", 1 -> "b", 2 -> "c"))
val rdd2 = sc.parallelize(Seq(1 -> "x", 2 -> "y", 2 -> "z", 3 -> "w"))

// Cogroup two RDDs: for each key, one Iterable of values per input RDD
val cogrouped = rdd1.cogroup(rdd2)
// Result: (1, (Iterable("a", "b"), Iterable("x")))
//         (2, (Iterable("c"), Iterable("y", "z")))
//         (3, (Iterable(), Iterable("w")))

// Summarise each key: size of both sides plus all values concatenated
val processed = cogrouped.mapValues { case (left, right) =>
  val leftValues = left.toList
  val rightValues = right.toList
  (leftValues.size, rightValues.size, leftValues ++ rightValues)
}

// Three-way cogroup
val rdd3 = sc.parallelize(Seq(1 -> "p", 3 -> "q"))
val threeway = rdd1.cogroup(rdd2, rdd3)
333
```
334
335
## Sorting and Partitioning
336
337
### SortByKey
338
339
```scala
340
val unsorted = sc.parallelize(Seq(
  "banana" -> 3, "apple" -> 5, "orange" -> 1, "apple" -> 2
))

// Ascending key order is the default
val sortedAsc = unsorted.sortByKey()

// Descending key order
val sortedDesc = unsorted.sortByKey(ascending = false)

// Ascending key order, with the result spread over 4 partitions
val sortedPartitioned = unsorted.sortByKey(ascending = true, numPartitions = 4)
352
```
353
354
### Custom Partitioning
355
356
```scala
357
import org.apache.spark.{HashPartitioner, RangePartitioner}
358
359
// Hash partitioning: a key's partition is derived from its hashCode
val hashPartitioned = pairs.partitionBy(new HashPartitioner(4))

// Range partitioning: keys are split into contiguous, ordered ranges (requires an Ordering on the key)
val rangePartitioned = pairs.partitionBy(new RangePartitioner(4, pairs))

// Repartition and sort by key within each partition in a single shuffle. This is more efficient
// than repartitioning and then sorting each partition separately, but unlike sortByKey it does
// NOT produce a total ordering across partitions.
val repartitionedAndSorted = pairs.repartitionAndSortWithinPartitions(new HashPartitioner(4))
367
368
// Custom partitioner: assigns each key to a partition based on the hash of its string form.
//
// `numPartitions` is declared as an overriding constructor val: a plain constructor parameter
// with the same name as the `numPartitions` member would be a conflicting definition.
class DomainPartitioner(override val numPartitions: Int) extends org.apache.spark.Partitioner {
  require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")

  // Returns a partition id in [0, numPartitions). Null keys go to partition 0.
  override def getPartition(key: Any): Int = key match {
    case null => 0
    // floorMod keeps the result non-negative even when hashCode is negative
    case k => Math.floorMod(k.toString.hashCode, numPartitions)
  }

  // Two DomainPartitioners with the same partition count place keys identically; defining
  // equality lets Spark recognise RDDs partitioned by equal partitioners as co-partitioned.
  override def equals(other: Any): Boolean = other match {
    case that: DomainPartitioner => that.numPartitions == numPartitions
    case _ => false
  }

  override def hashCode: Int = numPartitions
}

val customPartitioned = pairs.partitionBy(new DomainPartitioner(8))
380
```
381
382
## Collection and Lookup Operations
383
384
```scala
385
// Bring the whole RDD to the driver as a Map (small datasets only).
// Note: the result is not a multimap - when a key occurs several times only one value is kept.
val asMap = pairs.collectAsMap()

// Number of elements per key, returned to the driver as a Map
val keyCounts = pairs.countByKey()

// All values stored under one specific key
val appleValues = pairs.lookup("apple") // Seq(5, 2)

// Approximate per-key counts, giving up after the timeout (in milliseconds)
val approxCounts = pairs.countByKeyApprox(timeout = 1000L)
396
```
397
398
## Advanced Patterns
399
400
### Window Operations
401
402
```scala
403
// Time-based window operations (assuming the keys are timestamps)
val timestampData = sc.parallelize(Seq(
  100L -> "event1", 150L -> "event2", 200L -> "event3", 250L -> "event4"
))

// Re-key every event by the start of its 100-unit window, then group per window
val windowed = timestampData
  .map { case (timestamp, event) =>
    val windowStart = timestamp - (timestamp % 100)
    windowStart -> event
  }
  .groupByKey()
413
```
414
415
### Top-K by Key
416
417
```scala
418
val keyValueScores = sc.parallelize(Seq(
  "user1" -> 95, "user1" -> 87, "user1" -> 92, "user1" -> 78,
  "user2" -> 88, "user2" -> 91, "user2" -> 85
))

// Top 2 scores per user: group, order each group from highest to lowest, keep the first two
val topScores = keyValueScores
  .groupByKey()
  .mapValues { scores =>
    val descending = scores.toSeq.sortWith(_ > _)
    descending.take(2)
  }
427
```
428
429
### Efficient Large Joins
430
431
```scala
432
// For large datasets, partition both sides with the SAME partitioner before joining
val partitioner = new HashPartitioner(100)

// persist() keeps the partitioned data around so it is not re-shuffled on every use
val largeRDD1 = customers.partitionBy(partitioner).persist()
val largeRDD2 = orders.partitionBy(partitioner).persist()

// Matching keys are already co-located, so the join itself is more efficient
val efficientJoin = largeRDD1.join(largeRDD2)
440
```
441
442
### Sampling Operations
443
444
```scala
445
// Stratified sampling: each key (stratum) is sampled at its own rate
val userData = sc.parallelize(Seq(
  "premium" -> "user1", "premium" -> "user2", "premium" -> "user3",
  "basic" -> "user4", "basic" -> "user5", "basic" -> "user6", "basic" -> "user7"
))

// Sampling fraction for every key
val sampleFractions = Map("premium" -> 0.8, "basic" -> 0.3)

// Approximate sampling
val approxSample =
  userData.sampleByKey(withReplacement = false, fractions = sampleFractions, seed = 42)

// Exact sampling (guarantees exact sample sizes)
val exactSample =
  userData.sampleByKeyExact(withReplacement = false, fractions = sampleFractions, seed = 42)
459
```
460
461
### Broadcast Hash Join Pattern
462
463
```scala
464
// When one side of a join is small, broadcast it to every executor and look keys up locally
// instead of shuffling the large side
val smallLookupTable = Map(1 -> "Category A", 2 -> "Category B", 3 -> "Category C")
val broadcastLookup = sc.broadcast(smallLookupTable)

// The large side of the join: (categoryId, payload) pairs (sample data for illustration)
val largeRDD = sc.parallelize(Seq((1, "laptop"), (2, "phone"), (4, "mouse")))

val enrichedData = largeRDD.map { case (id, data) =>
  // Ids missing from the lookup table fall back to "Unknown"
  val category = broadcastLookup.value.getOrElse(id, "Unknown")
  (id, data, category)
}
472
```