# Key-Value Operations

Specialized operations available on RDDs of key-value pairs (`RDD[(K, V)]`) through an implicit conversion to `PairRDDFunctions`. They group, join, and aggregate records by key, and many of them shuffle data so that all records sharing a key end up in the same partition. Together they form the foundation of many distributed data processing patterns.

## Capabilities

### Grouping Operations

Group all values that share a key into a single `Iterable` for subsequent processing. Every value is shuffled, and all values for a given key must fit in memory at once. When the goal is an aggregate such as a sum or an average, `reduceByKey` or `aggregateByKey` is much cheaper.

```scala { .api }
// Available on RDD[(K, V)] via implicit conversion
def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]

// Group elements by a key derived with a custom function (defined on RDD[T] itself)
def groupBy[K: ClassTag](f: T => K): RDD[(K, Iterable[T])]
def groupBy[K: ClassTag](f: T => K, numPartitions: Int): RDD[(K, Iterable[T])]
def groupBy[K: ClassTag](f: T => K, p: Partitioner): RDD[(K, Iterable[T])]
```

**Usage Examples:**
```scala
// Note: the order of keys, and of values within each group, is not guaranteed
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4), ("c", 5)))

// Group values by key
val grouped = pairs.groupByKey()
// Result: [("a", [1, 3]), ("b", [2, 4]), ("c", [5])]

// Group into a specific number of partitions
val groupedPartitioned = pairs.groupByKey(numPartitions = 3)

// Group a regular RDD by a derived key
val numbers = sc.parallelize(1 to 10)
val evenOdd = numbers.groupBy(_ % 2)
// Result: [(0, [2, 4, 6, 8, 10]), (1, [1, 3, 5, 7, 9])]
```
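
To make the data movement concrete, here is a plain-Scala model of `groupByKey` that runs without Spark. It is a sketch under stated assumptions, not Spark's implementation: partitions are modeled as nested `Seq`s, and the hypothetical helper `partitionFor` mimics the non-negative `hashCode` modulo used by `HashPartitioner`. Every record crosses the modeled shuffle, and values are grouped only on the reduce side.

```scala
// Plain-Scala model of groupByKey; no Spark dependency.
// Input partitions are modeled as Seq[Seq[(K, V)]].
object GroupByKeyModel {
  // Hypothetical helper: non-negative hashCode modulo, like HashPartitioner
  def partitionFor(key: Any, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // Every record is "shuffled" to the reducer chosen by its key,
  // and values are grouped only after they arrive there.
  def groupByKey[K, V](input: Seq[Seq[(K, V)]], numPartitions: Int): Seq[Map[K, Seq[V]]] = {
    val shuffled: Seq[(K, V)] = input.flatten
    (0 until numPartitions).map { p =>
      shuffled
        .filter { case (k, _) => partitionFor(k, numPartitions) == p }
        .groupBy(_._1)
        .map { case (k, kvs) => k -> kvs.map(_._2) }
    }
  }

  // Two input partitions holding the same records as the example above
  val input: Seq[Seq[(String, Int)]] =
    Seq(Seq(("a", 1), ("b", 2)), Seq(("a", 3), ("b", 4), ("c", 5)))
  val reducers: Seq[Map[String, Seq[Int]]] = groupByKey(input, numPartitions = 2)

  // Each key lives in exactly one reducer, so merging the reducer outputs loses nothing
  val merged: Map[String, Seq[Int]] = reducers.flatten.toMap
}
```

Because no values are combined before the modeled shuffle, the number of records moved equals the input size, which is the cost the aggregation operators below avoid.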
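
### Reduction Operations

Reduce the values for each key with an associative and commutative function. Unlike `groupByKey`, these operations merge values locally within each partition before the shuffle (a map-side combine), so no collection of all the values for a key is ever built. The `foldByKey` zero value may be applied an arbitrary number of times (once per key in each partition), so it must be a neutral element for `func`, such as `0` for addition or `Nil` for list concatenation.

```scala { .api }
// Key-wise reduction
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]

// Key-wise folding with a zero value
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]
```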

**Usage Examples:**
```scala
val sales = sc.parallelize(Seq(
  ("product1", 100), ("product2", 200), ("product1", 150),
  ("product2", 300), ("product3", 50)
))

// Sum sales by product
val totalSales = sales.reduceByKey(_ + _)
// Result: [("product1", 250), ("product2", 500), ("product3", 50)]

// Find maximum sale per product
val maxSales = sales.reduceByKey(math.max(_, _))
// Result: [("product1", 150), ("product2", 300), ("product3", 50)]

// Fold with a neutral zero value (0 for addition). A non-neutral value such as 10
// would be added once per key per partition, making the result partition-dependent.
val foldedSales = sales.foldByKey(0)(_ + _)
// Result: [("product1", 250), ("product2", 500), ("product3", 50)]
```
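
The map-side combine, and the reason `foldByKey` needs a neutral zero value, can be seen in a plain-Scala model. This is a sketch, not Spark's code: partitions are modeled as nested `Seq`s and the helper names `combineLocally` and `byKey` are hypothetical. Each partition first combines its own values per key (starting from the zero value when one is given), then the partial results are merged per key. With the two partitions below, a zero value of `10` is added twice for keys that occur in both.

```scala
// Plain-Scala model of reduceByKey / foldByKey; no Spark dependency.
object ReduceByKeyModel {
  type Partitions[K, V] = Seq[Seq[(K, V)]]

  // Map side: combine values per key inside one partition.
  // With a zero value (foldByKey) each key starts from it once per partition;
  // without one (reduceByKey) the first value is the starting point.
  def combineLocally[K, V](part: Seq[(K, V)], zero: Option[V], func: (V, V) => V): Map[K, V] =
    part.groupBy(_._1).map { case (k, kvs) =>
      val vs = kvs.map(_._2)
      k -> zero.fold(vs.reduce(func))(z => vs.foldLeft(z)(func))
    }

  // Reduce side: merge the per-partition partial results for each key.
  def byKey[K, V](input: Partitions[K, V], zero: Option[V], func: (V, V) => V): Map[K, V] =
    input
      .map(part => combineLocally(part, zero, func))
      .flatMap(_.toSeq)
      .groupBy(_._1)
      .map { case (k, kvs) => k -> kvs.map(_._2).reduce(func) }

  val sales: Partitions[String, Int] = Seq(
    Seq(("product1", 100), ("product2", 200)),
    Seq(("product1", 150), ("product2", 300), ("product3", 50))
  )

  val add: (Int, Int) => Int = _ + _
  val totals: Map[String, Int] = byKey(sales, None, add)            // like reduceByKey(_ + _)
  val foldedWithTen: Map[String, Int] = byKey(sales, Some(10), add) // like foldByKey(10)(_ + _)
}
```

In this model `foldedWithTen` is `product1 -> 270, product2 -> 520, product3 -> 60`: the intuitive "add 10 per product" answer only appears when every key's values happen to sit in a single partition.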

### Aggregation Operations

Aggregate the values for each key into an accumulator whose type (`U` or `C`) may differ from the value type `V`.

```scala { .api }
// General aggregation: zeroValue seeds each key once per partition, seqOp folds a value
// into the accumulator within a partition, combOp merges accumulators across partitions
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]

// Generic combine: createCombiner turns the first value seen for a key (within a partition)
// into a C, mergeValue adds further values, mergeCombiners merges combiners across partitions
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]
```

**Usage Examples:**
```scala
val scores = sc.parallelize(Seq(
  ("Alice", 85), ("Bob", 90), ("Alice", 92), ("Bob", 87), ("Alice", 78)
))

// Calculate average score per student
case class ScoreStats(sum: Int, count: Int) {
  def average: Double = sum.toDouble / count
}

val averages = scores.aggregateByKey(ScoreStats(0, 0))(
  seqOp = (stats, score) => ScoreStats(stats.sum + score, stats.count + 1),
  combOp = (s1, s2) => ScoreStats(s1.sum + s2.sum, s1.count + s2.count)
).mapValues(_.average)
// Result: [("Alice", 85.0), ("Bob", 88.5)]

// Using combineByKey for the same result
val averages2 = scores.combineByKey(
  createCombiner = (score: Int) => ScoreStats(score, 1),
  mergeValue = (stats: ScoreStats, score: Int) => ScoreStats(stats.sum + score, stats.count + 1),
  mergeCombiners = (s1: ScoreStats, s2: ScoreStats) => ScoreStats(s1.sum + s2.sum, s1.count + s2.count)
).mapValues(_.average)
```
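
The division of labor between the three `combineByKey` functions can be modeled in plain Scala. This is a sketch, not Spark's implementation: it assumes partitions modeled as nested `Seq`s and defines its own `ScoreStats` like the one above. `createCombiner` runs for the first value of a key within a partition, `mergeValue` for every later value in that partition, and `mergeCombiners` only when partial results from different partitions meet.

```scala
// Plain-Scala model of combineByKey; no Spark dependency.
object CombineByKeyModel {
  final case class ScoreStats(sum: Int, count: Int) {
    def average: Double = sum.toDouble / count
  }

  def combineByKey[K, V, C](
      partitions: Seq[Seq[(K, V)]],
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): Map[K, C] = {
    // Within each partition: the first value for a key creates a combiner,
    // later values for that key are merged into it
    val perPartition: Seq[Map[K, C]] = partitions.map { part =>
      part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
        acc.get(k) match {
          case Some(c) => acc.updated(k, mergeValue(c, v))
          case None    => acc.updated(k, createCombiner(v))
        }
      }
    }
    // Across partitions: combiners for the same key are merged pairwise
    perPartition.flatMap(_.toSeq).groupBy(_._1).map { case (k, kcs) =>
      k -> kcs.map(_._2).reduce(mergeCombiners)
    }
  }

  // The scores from the example above, split across two modeled partitions
  val scores: Seq[Seq[(String, Int)]] = Seq(
    Seq(("Alice", 85), ("Bob", 90)),
    Seq(("Alice", 92), ("Bob", 87), ("Alice", 78))
  )

  val averages: Map[String, Double] =
    combineByKey[String, Int, ScoreStats](
      scores,
      score => ScoreStats(score, 1),
      (s, score) => ScoreStats(s.sum + score, s.count + 1),
      (a, b) => ScoreStats(a.sum + b.sum, a.count + b.count)
    ).map { case (name, stats) => name -> stats.average }
}
```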

### Join Operations

Join two RDDs by key. For each key, a join emits one record per combination of matching values, so a key with `m` values on one side and `n` on the other yields `m * n` records. Outer joins additionally keep unmatched keys, filling the missing side with `None`.

```scala { .api }
// Inner joins
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]

// Outer joins
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))]
def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))]

def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))]
def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], W))]

def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]
def fullOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], Option[W]))]
def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], Option[W]))]
```

**Usage Examples:**
```scala
val users = sc.parallelize(Seq(("u1", "Alice"), ("u2", "Bob"), ("u3", "Charlie")))
val orders = sc.parallelize(Seq(("u1", "order1"), ("u2", "order2"), ("u4", "order3")))

// Inner join: only matching keys
val userOrders = users.join(orders)
// Result: [("u1", ("Alice", "order1")), ("u2", ("Bob", "order2"))]

// Left outer join: all users, matched orders
val allUsersWithOrders = users.leftOuterJoin(orders)
// Result: [("u1", ("Alice", Some("order1"))), ("u2", ("Bob", Some("order2"))), ("u3", ("Charlie", None))]

// Right outer join: all orders, matched users
val allOrdersWithUsers = users.rightOuterJoin(orders)
// Result: [("u1", (Some("Alice"), "order1")), ("u2", (Some("Bob"), "order2")), ("u4", (None, "order3"))]

// Full outer join: all users and orders
val fullJoin = users.fullOuterJoin(orders)
// Result: [("u1", (Some("Alice"), Some("order1"))), ("u2", (Some("Bob"), Some("order2"))),
//          ("u3", (Some("Charlie"), None)), ("u4", (None, Some("order3")))]
```
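
Because a join emits one record per matching pair, duplicate keys multiply the output. The plain-Scala model below is a sketch with hypothetical helper names, not Spark's implementation; it adds an assumed second order, `order4`, for `u1` to the data above and shows inner and left outer semantics on local `Seq`s.

```scala
// Plain-Scala model of inner and left outer join semantics; no Spark dependency.
object JoinModel {
  // Index the right-hand side: key -> all of its values, in input order
  private def index[K, W](right: Seq[(K, W)]): Map[K, Seq[W]] =
    right.groupBy(_._1).map { case (k, kws) => k -> kws.map(_._2) }

  // Inner join: one output record per (left value, right value) pair sharing a key
  def join[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, W))] = {
    val rightByKey = index(right)
    for {
      (k, v) <- left
      w <- rightByKey.getOrElse(k, Seq.empty[W])
    } yield (k, (v, w))
  }

  // Left outer join: left records without a match are kept, paired with None
  def leftOuterJoin[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, Option[W]))] = {
    val rightByKey = index(right)
    left.flatMap { case (k, v) =>
      val matches: Seq[Option[W]] = rightByKey.get(k) match {
        case Some(ws) => ws.map(w => Some(w))
        case None     => Seq(None)
      }
      matches.map(mw => (k, (v, mw)))
    }
  }

  val users = Seq(("u1", "Alice"), ("u2", "Bob"), ("u3", "Charlie"))
  // "order4" is an assumed extra order so that u1 has two matches
  val orders = Seq(("u1", "order1"), ("u1", "order4"), ("u2", "order2"), ("u4", "order3"))

  val inner: Seq[(String, (String, String))] = join(users, orders)
  val leftJoined: Seq[(String, (String, Option[String]))] = leftOuterJoin(users, orders)
}
```

Here `u1` appears twice in both results, while `u4` (an order with no user) is dropped by both joins.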

### Cogroup Operations

Group two or more RDDs by key: for every key present in any input, emit one `Iterable` of values from each RDD, which is empty when that RDD has no values for the key. Unlike a join, no cross product of values is formed; joins can be built on top of `cogroup`.

```scala { .api }
// Two RDD cogroup
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))]

// Three RDD cogroup
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]

// Four RDD cogroup
def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]
```

**Usage Examples:**
```scala
val rdd1 = sc.parallelize(Seq(("k1", "a"), ("k2", "b"), ("k1", "c")))
val rdd2 = sc.parallelize(Seq(("k1", 1), ("k3", 2), ("k1", 3)))

// Cogroup creates one grouped Iterable per input RDD for each key
val cogrouped = rdd1.cogroup(rdd2)
// Result: [("k1", (["a", "c"], [1, 3])), ("k2", (["b"], [])), ("k3", ([], [2]))]

// Custom inner join built on cogroup: a key with an empty side yields no records,
// because iterating over an empty Iterable produces nothing
val customJoin = cogrouped.flatMap { case (key, (values1, values2)) =>
  for (v1 <- values1; v2 <- values2) yield (key, s"$v1-$v2")
}
// Result (order may vary): [("k1", "a-1"), ("k1", "a-3"), ("k1", "c-1"), ("k1", "c-3")]
```
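
The relationship between `cogroup` and `join` can be sketched in plain Scala. The hypothetical `cogroup` helper below collects, for every key seen on either side, that key's values from each side (possibly empty); an inner join then falls out as the per-key cross product, which is also how Spark's `join` is built on `cogroup`. This is a local model, not Spark's code.

```scala
// Plain-Scala model of two-way cogroup and an inner join derived from it; no Spark dependency.
object CogroupModel {
  // For every key present on either side, gather that key's values from each side
  def cogroup[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Map[K, (Seq[V], Seq[W])] = {
    val keys = (left.map(_._1) ++ right.map(_._1)).distinct
    keys.map { k =>
      k -> ((left.filter(_._1 == k).map(_._2), right.filter(_._1 == k).map(_._2)))
    }.toMap
  }

  val rdd1 = Seq(("k1", "a"), ("k2", "b"), ("k1", "c"))
  val rdd2 = Seq(("k1", 1), ("k3", 2), ("k1", 3))

  val cogrouped: Map[String, (Seq[String], Seq[Int])] = cogroup(rdd1, rdd2)

  // Inner join = per-key cross product; keys with an empty side contribute nothing
  val joined: Seq[(String, (String, Int))] = cogrouped.toSeq.flatMap { case (k, (vs, ws)) =>
    for (v <- vs; w <- ws) yield (k, (v, w))
  }
}
```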

### Key and Value Operations

Extract keys or values, or transform values while leaving keys untouched. Because `mapValues` and `flatMapValues` cannot change keys, they preserve the parent RDD's partitioner, whereas an equivalent `map` does not.

```scala { .api }
// Key/value extraction
def keys: RDD[K]
def values: RDD[V]

// Value transformations (keys and partitioner preserved)
def mapValues[U](f: V => U): RDD[(K, U)]
def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)]
```

**Usage Examples:**
```scala
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))

// Extract keys and values
val keys = pairs.keys     // ["a", "b", "c"]
val values = pairs.values // [1, 2, 3]

// Transform values only
val doubled = pairs.mapValues(_ * 2)
// Result: [("a", 2), ("b", 4), ("c", 6)]

// Flat map values: each value may expand to zero or more values under the same key
val lines = sc.parallelize(Seq(("line1", "hello world"), ("line2", "scala spark")))
val words = lines.flatMapValues(_.split(" "))
// Result: [("line1", "hello"), ("line1", "world"), ("line2", "scala"), ("line2", "spark")]
```
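
Why a value-only transformation can keep the partitioner: hash placement depends only on the key. The plain-Scala model below (hypothetical helpers, partitions modeled as nested `Seq`s, not Spark code) checks whether each record still sits in the partition its key hashes to, first after a `mapValues`-style change and then after a key-rewriting `map`-style change.

```scala
// Plain-Scala model of why value-only transformations preserve hash partitioning; no Spark dependency.
object PartitionerPreservationModel {
  // Hypothetical helper: non-negative hashCode modulo, like HashPartitioner
  def partitionFor(key: Any, n: Int): Int = {
    val raw = key.hashCode % n
    if (raw < 0) raw + n else raw
  }

  // True if every record is located in the partition its key hashes to
  def isHashPartitioned[K, V](parts: Seq[Seq[(K, V)]]): Boolean =
    parts.zipWithIndex.forall { case (part, i) =>
      part.forall { case (k, _) => partitionFor(k, parts.size) == i }
    }

  val n = 2
  val pairs = Seq(("a", 1), ("b", 2), ("c", 3))

  // Place each record in the partition chosen by its key
  val parts: Seq[Seq[(String, Int)]] =
    (0 until n).map(i => pairs.filter { case (k, _) => partitionFor(k, n) == i })

  // mapValues-style: keys are untouched, so every record is still correctly placed
  val valuesDoubled: Seq[Seq[(String, Int)]] = parts.map(_.map { case (k, v) => (k, v * 2) })

  // map-style key rewrite: records stay where they are, but their new keys may hash elsewhere
  val keysRewritten: Seq[Seq[(String, Int)]] = parts.map(_.map { case (k, v) => (k + "!", v) })
}
```

Spark cannot inspect an arbitrary `map` function to learn whether it changes keys, so the result of `map` has no partitioner, while `mapValues` keeps it and lets later key-based operations skip a shuffle.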
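
### Collection Actions

Return per-key results to the driver, or estimate per-key distinct counts. `collectAsMap` and `countByKey` are actions that load their whole result into driver memory; `countApproxDistinctByKey` is a transformation that returns an RDD.

```scala { .api }
// Collect as a driver-side map (not a multimap: only one value is kept per key).
// Use with caution for large datasets.
def collectAsMap(): Map[K, V]

// Count records per key. For many distinct keys, mapValues(_ => 1L).reduceByKey(_ + _)
// keeps the result distributed instead of collecting it.
def countByKey(): Map[K, Long]

// Approximate number of distinct values per key (HyperLogLog-based); returns an RDD
def countApproxDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)]
def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)]
def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)]
```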

**Usage Examples:**
```scala
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("c", 4)))

// Convert to a driver-side map; only one value is kept for the duplicate key "a"
val asMap = pairs.collectAsMap()
// Result: Map("a" -> 3, "b" -> 2, "c" -> 4)

// Count occurrences of each key
val keyCounts = pairs.countByKey()
// Result: Map("a" -> 2, "b" -> 1, "c" -> 1)

// Approximate count of distinct values per key; the result is an RDD, so collect it to inspect
val distinctCounts = pairs.countApproxDistinctByKey(relativeSD = 0.1).collect()
```
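
The three quantities in this section are easy to confuse. The plain-Scala model below is not Spark code; it uses local data with an assumed repeated `("a", 1)` record and computes each quantity exactly: records per key (`countByKey`), distinct values per key (the quantity `countApproxDistinctByKey` estimates), and a map keeping one value per key (like `collectAsMap`, modeled here as last-write-wins).

```scala
// Plain-Scala model of the counting/collection semantics; no Spark dependency.
object CountingModel {
  // ("a", 1) appears twice to separate record counts from distinct-value counts
  val pairs: Seq[(String, Int)] = Seq(("a", 1), ("b", 2), ("a", 3), ("c", 4), ("a", 1))

  // countByKey: number of records per key, duplicates included
  val countByKey: Map[String, Long] =
    pairs.groupBy(_._1).map { case (k, kvs) => k -> kvs.size.toLong }

  // Exact number of distinct values per key (what countApproxDistinctByKey approximates)
  val distinctValuesByKey: Map[String, Long] =
    pairs.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).distinct.size.toLong }

  // collectAsMap-like: one value survives per key; here later records overwrite earlier ones
  val asMap: Map[String, Int] =
    pairs.foldLeft(Map.empty[String, Int]) { case (m, (k, v)) => m.updated(k, v) }
}
```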

### Sampling Operations

Stratified sampling for key-value RDDs: each key (stratum) is sampled at its own rate, given by the `fractions` map.

```scala { .api }
// Stratified sampling
def sampleByKey(withReplacement: Boolean, fractions: Map[K, Double], seed: Long = Utils.random.nextLong): RDD[(K, V)]
def sampleByKeyExact(withReplacement: Boolean, fractions: Map[K, Double], seed: Long = Utils.random.nextLong): RDD[(K, V)]
```

**Usage Examples:**
```scala
val data = sc.parallelize(Seq(
  ("cat", "fluffy"), ("cat", "whiskers"), ("cat", "mittens"),
  ("dog", "buddy"), ("dog", "max"), ("dog", "bella"),
  ("bird", "tweet"), ("bird", "chirp")
))

// Sample a different fraction of each key's records in one pass;
// per-key sample sizes are only approximately count * fraction
val fractions = Map("cat" -> 0.5, "dog" -> 0.3, "bird" -> 1.0)
val sampled = data.sampleByKey(withReplacement = false, fractions)

// Exact stratified sampling: returns math.ceil(count * fraction) records per key
// with high confidence, at the cost of additional passes over the data
val exactSampled = data.sampleByKeyExact(withReplacement = false, fractions)
```
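
Without replacement, per-key sampling amounts to an independent Bernoulli trial per record, with the probability chosen by the record's key. The plain-Scala sketch below models only that case (the with-replacement variant and the extra passes of `sampleByKeyExact` are not modeled), uses `scala.util.Random` as an assumed stand-in for Spark's sampler, and sets the `dog` fraction to `0.0` so its behavior is deterministic.

```scala
import scala.util.Random

// Plain-Scala model of stratified Bernoulli sampling (without replacement); no Spark dependency.
object SampleByKeyModel {
  // Keep each record independently with probability fractions(key)
  def sampleByKey[K, V](data: Seq[(K, V)], fractions: Map[K, Double], seed: Long): Seq[(K, V)] = {
    val rng = new Random(seed)
    data.filter { case (k, _) => rng.nextDouble() < fractions(k) }
  }

  val data = Seq(
    ("cat", "fluffy"), ("cat", "whiskers"), ("cat", "mittens"),
    ("dog", "buddy"), ("dog", "max"), ("dog", "bella"),
    ("bird", "tweet"), ("bird", "chirp")
  )

  // nextDouble() is in [0, 1): a fraction of 1.0 keeps every record, 0.0 keeps none
  val fractions = Map("cat" -> 0.5, "dog" -> 0.0, "bird" -> 1.0)
  val sampled: Seq[(String, String)] = sampleByKey(data, fractions, seed = 42L)
}
```

How many `cat` records survive depends on the seed, which is why `sampleByKey` only approximates the target size per key.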

## Partitioning for Key-Value RDDs

### Hash Partitioning

The default partitioning strategy. `HashPartitioner` sends `null` keys to partition 0 and every other key to `key.hashCode` modulo `numPartitions`, adjusted to be non-negative, so all records with equal keys land in the same partition.

```scala { .api }
class HashPartitioner(partitions: Int) extends Partitioner {
  def numPartitions: Int = partitions
  def getPartition(key: Any): Int
}

// Repartition a pair RDD with any Partitioner (hash, range, or custom)
def partitionBy(partitioner: Partitioner): RDD[(K, V)]
```

**Usage Example:**
```scala
import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3), ("a", 4)))

// Hash-partition into 4 partitions; both records with key "a" land in the same partition
val hashPartitioned = pairs.partitionBy(new HashPartitioner(4))

// Check partitioning (partitioner is an Option[Partitioner])
println(s"Partitioner: ${hashPartitioned.partitioner}")
```
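
A plain-Scala model of how `HashPartitioner` assigns keys: `null` keys go to partition 0, and any other key goes to its `hashCode` modulo the partition count, shifted to be non-negative because `%` keeps the sign of a negative hash code. The function name `hashPartition` is hypothetical, not a Spark API.

```scala
// Plain-Scala model of hash partitioning; no Spark dependency.
object HashPartitionModel {
  // Hypothetical helper mirroring HashPartitioner's assignment rule
  def hashPartition(key: Any, numPartitions: Int): Int = key match {
    case null => 0
    case _ =>
      val raw = key.hashCode % numPartitions
      // `%` keeps the sign of the dividend, so shift negative results into range
      if (raw < 0) raw + numPartitions else raw
  }

  // A negative Int key (-7) exercises the sign correction; "a" appears twice
  val keys: Seq[Any] = Seq("a", "b", "c", "a", -7, null)
  val assignment: Seq[(Any, Int)] = keys.map(k => (k, hashPartition(k, 4)))
}
```

Because the partition depends only on the key, equal keys always meet in the same partition; the flip side is that a few very frequent keys can make some partitions much larger than others.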
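
### Range Partitioning

Partition keys into roughly equal-sized, ordered ranges whose bounds are determined by sampling the RDD. With `ascending = true`, every key in partition `i` sorts no later than every key in partition `i + 1`, which is how `sortByKey` produces globally ordered output; keys within a single partition are not sorted by `partitionBy` alone.

```scala { .api }
class RangePartitioner[K: Ordering: ClassTag, V](
  partitions: Int,
  rdd: RDD[_ <: Product2[K, V]],
  ascending: Boolean = true,
  samplePointsPerPartitionHint: Int = 20
) extends Partitioner
```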

**Usage Example:**
```scala
import org.apache.spark.RangePartitioner

val pairs = sc.parallelize(Seq((1, "a"), (5, "b"), (3, "c"), (8, "d"), (2, "e")))

// Range-partition so that each partition holds a contiguous range of keys
// (constructing the partitioner runs a sampling job over `pairs`)
val rangePartitioned = pairs.partitionBy(new RangePartitioner(3, pairs))
```
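
A simplified plain-Scala model of range partitioning: choose `numPartitions - 1` upper bounds from a sorted sample, then send each key to the first partition whose bound is at least the key. Spark's `RangePartitioner` derives its bounds from a weighted sample of the RDD, so the even-spacing rule and the helper names below are assumptions for illustration only.

```scala
// Simplified plain-Scala model of range partitioning; no Spark dependency.
object RangePartitionModel {
  // Assumed rule: take evenly spaced points from the sorted sample as upper bounds
  def boundsFromSample(sample: Seq[Int], numPartitions: Int): Vector[Int] = {
    require(sample.nonEmpty && numPartitions > 0, "need a non-empty sample and at least one partition")
    val sorted = sample.sorted.toVector
    (1 until numPartitions).map { i =>
      // ceil(i * size / numPartitions) - 1, computed with integer arithmetic
      sorted((i * sorted.size + numPartitions - 1) / numPartitions - 1)
    }.toVector
  }

  // A key goes to the first partition whose upper bound is >= the key;
  // keys above every bound go to the last partition. Descending order flips the index.
  def getPartition(key: Int, bounds: Vector[Int], ascending: Boolean = true): Int = {
    var p = 0
    while (p < bounds.length && key > bounds(p)) p += 1
    if (ascending) p else bounds.length - p
  }

  // The keys from the example above
  val keys = Seq(1, 5, 3, 8, 2)
  val bounds: Vector[Int] = boundsFromSample(keys, numPartitions = 3)
  val assignment: Map[Int, Int] = keys.map(k => k -> getPartition(k, bounds)).toMap
}
```

With these keys the bounds are `2` and `5`, giving partitions `{1, 2}`, `{3, 5}`, and `{8}`: each partition holds a contiguous key range, and sorting within each partition would yield a globally sorted result.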

## Types

```scala { .api }
// Implicit conversion adds these methods to RDD[(K, V)]
class PairRDDFunctions[K, V](self: RDD[(K, V)])
  (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null)

// Partitioning strategies
abstract class Partitioner extends Serializable {
  def numPartitions: Int
  def getPartition(key: Any): Int
}

class HashPartitioner(partitions: Int) extends Partitioner

// Optional constructor parameters omitted
class RangePartitioner[K: Ordering: ClassTag, V](
  partitions: Int,
  rdd: RDD[_ <: Product2[K, V]]
) extends Partitioner

// Dependency for shuffle operations (further constructor parameters omitted)
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
  @transient _rdd: RDD[_ <: Product2[K, V]],
  partitioner: Partitioner
) extends Dependency[Product2[K, V]]
```