0
# Key-Value Operations
1
2
Specialized operations for RDDs containing key-value pairs, providing powerful data processing capabilities including joins, grouping, and aggregation operations.
3
4
## Capabilities
5
6
### PairRDDFunctions
7
8
Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
9
10
```scala { .api }
11
/**
12
* Extra functions available on RDDs of (key, value) pairs through an implicit conversion
13
*/
14
class PairRDDFunctions[K, V](self: RDD[(K, V)]) {
15
// Grouping operations
16
def groupByKey(): RDD[(K, Iterable[V])]
17
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
18
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
19
20
// Reduction operations
21
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
22
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
23
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
24
def reduceByKeyLocally(func: (V, V) => V): Map[K, V]
25
26
// Aggregation operations
27
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
28
def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
29
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
30
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
31
def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]
32
def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]
33
34
// Combine operations
35
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
36
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
37
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner): RDD[(K, C)]
38
39
// Join operations
40
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
41
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
42
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
43
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
44
def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))]
45
def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))]
46
def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
47
def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))]
48
def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], W))]
49
def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]
50
def fullOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], Option[W]))]
51
def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], Option[W]))]
52
53
// CoGroup operations
54
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
55
def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]
56
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))]
57
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
58
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
59
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
60
61
// Set operations
62
def subtractByKey[W](other: RDD[(K, W)]): RDD[(K, V)]
63
def subtractByKey[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)]
64
def subtractByKey[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, V)]
65
66
// Partitioning operations
67
def partitionBy(partitioner: Partitioner): RDD[(K, V)]
68
69
// Utility operations
70
def keys: RDD[K]
71
def values: RDD[V]
72
def mapValues[U](f: V => U): RDD[(K, U)]
73
def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)]
74
def collectAsMap(): Map[K, V]
75
def countByKey(): Map[K, Long]
76
def countByKeyApprox(timeout: Long, confidence: Double = 0.95): PartialResult[Map[K, BoundedDouble]]
77
78
// Lookup operations
79
def lookup(key: K): Seq[V]
80
}
81
```
82
83
**Usage Examples:**
84
85
```scala
86
import org.apache.spark.{SparkContext, SparkConf}
87
88
val sc = new SparkContext(new SparkConf().setAppName("KeyValue Examples").setMaster("local[*]"))
89
90
// Create key-value RDDs
91
val pairs1 = sc.parallelize(Array(("a", 1), ("b", 2), ("a", 3), ("c", 4)))
92
val pairs2 = sc.parallelize(Array(("a", "x"), ("b", "y"), ("d", "z")))
93
94
// Grouping operations
95
val grouped = pairs1.groupByKey()
96
// Result: ("a", [1, 3]), ("b", [2]), ("c", [4])
97
98
// Reduction operations
99
val sumByKey = pairs1.reduceByKey(_ + _)
100
// Result: ("a", 4), ("b", 2), ("c", 4)
101
102
// Word count. Note the three slashes: "hdfs:///input.txt" resolves the path against
// the default NameNode, whereas "hdfs://input.txt" would be parsed as a NameNode
// host named "input.txt" with no path at all.
val counts = sc.textFile("hdfs:///input.txt")
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
106
107
// Aggregation operations
108
val avgByKey = pairs1.aggregateByKey((0, 0))(
109
(acc, value) => (acc._1 + value, acc._2 + 1),
110
(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
111
).mapValues(acc => acc._1.toDouble / acc._2)
112
113
// Join operations
114
val innerJoin = pairs1.join(pairs2)
115
// Result: ("a", (1, "x")), ("a", (3, "x")), ("b", (2, "y"))
116
117
val leftJoin = pairs1.leftOuterJoin(pairs2)
118
// Result: ("a", (1, Some("x"))), ("a", (3, Some("x"))), ("b", (2, Some("y"))), ("c", (4, None))
119
120
val rightJoin = pairs1.rightOuterJoin(pairs2)
121
val fullJoin = pairs1.fullOuterJoin(pairs2)
122
123
// Utility operations
124
val keys = pairs1.keys // RDD["a", "b", "a", "c"]
125
val values = pairs1.values // RDD[1, 2, 3, 4]
126
val doubled = pairs1.mapValues(_ * 2) // ("a", 2), ("b", 4), ("a", 6), ("c", 8)
127
128
// Collection operations
129
val asMap = pairs1.collectAsMap() // e.g. Map("a" -> 3, "b" -> 2, "c" -> 4) - note: not a multimap, only one value is kept per duplicate key
130
val keyCounts = pairs1.countByKey() // Map("a" -> 2, "b" -> 1, "c" -> 1)
131
132
// Lookup specific key
133
val aValues = pairs1.lookup("a") // Seq(1, 3)
134
```
135
136
### Sorting Operations (OrderedRDDFunctions)
137
138
Additional functions available on RDDs where the key is sortable.
139
140
```scala { .api }
141
/**
142
* Extra functions available on RDDs where the key is sortable
143
*/
144
class OrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag, P <: Product2[K, V] : ClassTag](
    self: RDD[P]) {

  /**
   * Sort the RDD by key, so that each partition contains a sorted range of the elements.
   * Collecting or saving the result yields records in key order.
   *
   * @param ascending     sort in ascending key order when true, descending when false
   * @param numPartitions number of partitions of the resulting RDD
   */
  def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]

  /**
   * Repartition the RDD according to the given partitioner and, within each resulting
   * partition, sort records by their keys. More efficient than repartitioning and then
   * sorting, because the sort is pushed down into the shuffle.
   *
   * @param partitioner partitioner that decides the target partition of every key
   */
  def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)]

  /**
   * Return an RDD containing only the elements whose keys lie in the inclusive range
   * `lower` to `upper`. If the RDD is range-partitioned, only the partitions that can
   * contain matching keys are scanned; otherwise a plain filter is applied.
   *
   * @param lower smallest key to keep (inclusive)
   * @param upper largest key to keep (inclusive)
   */
  def filterByRange(lower: K, upper: K): RDD[P]
}
162
```
163
164
**Usage Examples:**
165
166
```scala
167
// "hdfs:///input.txt" (three slashes) reads the path from the default NameNode;
// "hdfs://input.txt" would treat "input.txt" as the NameNode host.
val wordCounts = sc.textFile("hdfs:///input.txt")
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
171
172
// Sort by key (alphabetically)
173
val sortedByWord = wordCounts.sortByKey(ascending = true)
174
175
// Sort by value (frequency) - need to swap key-value
176
val sortedByCount = wordCounts
177
.map(_.swap) // (count, word)
178
.sortByKey(ascending = false) // sort by count descending
179
.map(_.swap) // back to (word, count)
180
181
// Efficient repartition and sort
182
import org.apache.spark.HashPartitioner
183
val partitioned = wordCounts.repartitionAndSortWithinPartitions(new HashPartitioner(4))
184
```
185
186
### SequenceFile Operations
187
188
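Functions for saving key-value RDDs as Hadoop SequenceFiles and other Hadoop output formats. Note: in Spark itself only `saveAsSequenceFile` is provided by `SequenceFileRDDFunctions`; `saveAsHadoopFile` and `saveAsNewAPIHadoopFile` are defined on `PairRDDFunctions` and are listed here because they are commonly used together.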
189
190
```scala { .api }
191
/**
192
* Extra functions for saving RDDs as Hadoop SequenceFiles
193
*/
194
class SequenceFileRDDFunctions[K, V](self: RDD[(K, V)]) {
195
/**
196
* Output the RDD to any Hadoop-supported file system, using a Hadoop JobConf object for configuration
197
*/
198
def saveAsHadoopFile[F <: OutputFormat[K, V]](
199
path: String,
200
keyClass: Class[K],
201
valueClass: Class[V],
202
outputFormatClass: Class[F],
203
conf: JobConf = new JobConf(self.context.hadoopConfiguration),
204
codec: Option[Class[_ <: CompressionCodec]] = None): Unit
205
206
/**
207
* Output the RDD to any Hadoop-supported file system, using the new Hadoop API
208
*/
209
def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](
210
path: String,
211
keyClass: Class[K],
212
valueClass: Class[V],
213
outputFormatClass: Class[F],
214
conf: Configuration = self.context.hadoopConfiguration): Unit
215
216
/**
217
* Output the RDD as a Hadoop SequenceFile
218
*/
219
def saveAsSequenceFile(
220
path: String,
221
codec: Option[Class[_ <: CompressionCodec]] = None): Unit
222
}
223
```
224
225
## Advanced Key-Value Patterns
226
227
### Complex Aggregations
228
229
```scala
230
case class Sale(product: String, amount: Double, quantity: Int)
231
232
val sales = sc.parallelize(Array(
233
Sale("laptop", 999.99, 1),
234
Sale("mouse", 29.99, 5),
235
Sale("laptop", 1199.99, 2),
236
Sale("mouse", 29.99, 3)
237
))
238
239
val salesByProduct = sales
240
.map(sale => (sale.product, sale))
241
.aggregateByKey((0.0, 0))(
242
// Sequence operation: aggregate within partition
243
(acc, sale) => (acc._1 + sale.amount * sale.quantity, acc._2 + sale.quantity),
244
// Combine operation: combine across partitions
245
(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
246
)
247
.mapValues { case (totalRevenue, totalQuantity) =>
248
(totalRevenue, totalQuantity, if (totalQuantity > 0) totalRevenue / totalQuantity else 0.0)
249
}
250
251
// Result: (product, (totalRevenue, totalQuantity, avgPricePerUnit))
252
```
253
254
### Efficient Joins with Broadcasting
255
256
```scala
257
import org.apache.spark.broadcast.Broadcast
258
259
// Large dataset
260
// Path is resolved against the default NameNode ("hdfs:///..."); with only two
// slashes the file name would be parsed as the NameNode host.
val transactions = sc.textFile("hdfs:///transactions.txt")
  .map(_.split(","))
  .map(fields => (fields(0), fields(1).toDouble)) // (userId, amount)
263
264
// Small lookup table
265
// Path is resolved against the default NameNode ("hdfs:///..."); with only two
// slashes the file name would be parsed as the NameNode host.
// collectAsMap() pulls the whole table to the driver, so it must be small.
val userProfiles = sc.textFile("hdfs:///users.txt")
  .map(_.split(","))
  .map(fields => (fields(0), fields(1))) // (userId, name)
  .collectAsMap()
269
270
// Broadcast small dataset for efficient lookup
271
// collectAsMap() returns scala.collection.Map (not the immutable Predef Map), and
// Broadcast[T] is invariant in T, so the annotation has to name that exact type.
val broadcastProfiles: Broadcast[scala.collection.Map[String, String]] = sc.broadcast(userProfiles)
272
273
// Use broadcast variable instead of join
274
val enrichedTransactions = transactions.map { case (userId, amount) =>
275
val userName = broadcastProfiles.value.getOrElse(userId, "Unknown")
276
(userId, userName, amount)
277
}
278
```
279
280
### Custom Partitioning for Performance
281
282
```scala
283
import org.apache.spark.Partitioner
284
285
// Custom partitioner for geographic data
286
/**
 * Partitioner that routes records by the region prefix of their String key.
 *
 * Keys starting with "US", "EU" and "ASIA" map to buckets 0, 1 and 2; every other
 * key (including null and non-String keys) maps to bucket 3. The bucket is folded
 * into `[0, numPartitions)` so the partitioner stays valid with fewer than four
 * partitions.
 *
 * `equals` and `hashCode` are overridden because Spark compares partitioners by
 * equality to decide whether two RDDs are co-partitioned; without them every
 * instance would be distinct and joins would still shuffle.
 *
 * @param numPartitions total number of partitions; must be positive
 */
class RegionPartitioner(override val numPartitions: Int) extends Partitioner {
  require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")

  /** Returns the partition index in `[0, numPartitions)` for the given key. */
  override def getPartition(key: Any): Int = {
    val bucket = key match {
      case region: String if region.startsWith("US")   => 0
      case region: String if region.startsWith("EU")   => 1
      case region: String if region.startsWith("ASIA") => 2
      case _                                           => 3
    }
    // Never hand Spark an index outside the configured partition count.
    bucket % numPartitions
  }

  /** Two region partitioners are interchangeable when their partition counts match. */
  override def equals(other: Any): Boolean = other match {
    case that: RegionPartitioner => that.numPartitions == numPartitions
    case _                       => false
  }

  override def hashCode: Int = numPartitions
}
299
300
val locationData = sc.parallelize(Array(
301
("US-CA", "data1"), ("EU-DE", "data2"), ("ASIA-JP", "data3")
302
))
303
304
// Apply custom partitioning
305
val partitioned = locationData.partitionBy(new RegionPartitioner(4))
306
307
// Subsequent joins will be more efficient
308
val moreLocationData = sc.parallelize(Array(
309
("US-CA", "moredata1"), ("EU-DE", "moredata2")
310
)).partitionBy(new RegionPartitioner(4))
311
312
val joined = partitioned.join(moreLocationData) // No shuffle needed, provided both RDDs carry equal partitioners (compared with equals)
313
```
314
315
### Window Operations with groupByKey
316
317
```scala
318
case class Event(timestamp: Long, userId: String, action: String)
319
320
val events = sc.parallelize(Array(
321
Event(1000, "user1", "login"),
322
Event(1010, "user1", "click"),
323
Event(1020, "user1", "logout"),
324
Event(1005, "user2", "login"),
325
Event(1015, "user2", "click")
326
))
327
328
// Group events by user and process in time windows
329
val userSessions = events
  .map(event => (event.userId, event))
  .groupByKey()
  .mapValues { userEvents =>
    // Maximum gap between two consecutive events of the same session
    // (30 seconds, assuming timestamps are in milliseconds).
    val sessionTimeout = 30000L

    // Sessions are accumulated newest-first and each session keeps its latest
    // event at the head, so every step is a constant-time prepend instead of
    // the O(n) `last` / `:+` operations on List.
    val sessions = userEvents.toList
      .sortBy(_.timestamp)
      .foldLeft(List.empty[List[Event]]) {
        case ((current @ (latest :: _)) :: earlier, event)
            if event.timestamp - latest.timestamp <= sessionTimeout =>
          // Within the timeout: add to the current session
          (event :: current) :: earlier
        case (acc, event) =>
          // First event, or the gap is too large: start a new session
          List(event) :: acc
      }

    sessions.reverse.map(_.length) // Session lengths, oldest session first
  }
352
```
353
354
## Performance Considerations
355
356
### Avoiding groupByKey
357
358
```scala
359
// INEFFICIENT: groupByKey followed by reduction
360
val wordCounts1 = words
361
.map((_, 1))
362
.groupByKey() // Shuffles all data
363
.mapValues(_.sum) // Reduces after shuffle
364
365
// EFFICIENT: Use reduceByKey instead
366
val wordCounts2 = words
367
.map((_, 1))
368
.reduceByKey(_ + _) // Reduces before shuffle
369
```
370
371
### Choosing the Right Operation
372
373
```scala
374
val data = sc.parallelize(Array(("a", 1), ("b", 2), ("a", 3)))
375
376
// Use combineByKey for complex aggregations
377
val stats = data.combineByKey(
378
(v: Int) => (v, 1), // Create combiner
379
(acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1), // Merge value
380
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) // Merge combiners
381
).mapValues(acc => (acc._1, acc._2, acc._1.toDouble / acc._2)) // (sum, count, avg)
382
383
// Use aggregateByKey when you need different types
384
val letterStats = data.aggregateByKey("")(
385
(acc, v) => if (acc.isEmpty) v.toString else acc + "," + v,
386
(acc1, acc2) => acc1 + ";" + acc2
387
)
388
```