Spec Registry

maven-apache-spark

Description
Lightning-fast unified analytics engine for large-scale data processing with high-level APIs in Scala, Java, Python, and R
Author
tessl
How to use

npx @tessl/cli registry install tessl/maven-apache-spark@1.0.0

docs/key-value-operations.md

# Key-Value Operations

When working with RDDs of (key, value) pairs, Spark provides specialized operations through implicit conversions. These operations are available when you import `org.apache.spark.SparkContext._` and work with RDDs of type `RDD[(K, V)]`.

## PairRDDFunctions

```scala { .api }
class PairRDDFunctions[K, V](self: RDD[(K, V)]) extends Logging with Serializable {
  // Available through implicit conversion when importing org.apache.spark.SparkContext._
}
```

## Setup and Imports

```scala { .api }
import org.apache.spark.SparkContext._ // Essential for PairRDDFunctions
import org.apache.spark.rdd.RDD
```

```scala
// Creating pair RDDs
val pairs: RDD[(String, Int)] = sc.parallelize(Seq(
  ("apple", 5), ("banana", 3), ("apple", 2), ("orange", 1)
))

val wordCounts: RDD[(String, Int)] = sc.textFile("file.txt")
  .flatMap(_.split(" "))
  .map(word => (word, 1))
```
## Aggregation Operations

### combineByKey

The most general aggregation function - all other aggregation operations are built on top of this:

```scala { .api }
def combineByKey[C](
  createCombiner: V => C,
  mergeValue: (C, V) => C,
  mergeCombiners: (C, C) => C,
  partitioner: Partitioner,
  mapSideCombine: Boolean = true,
  serializer: Serializer = null
): RDD[(K, C)]

// Convenience methods with default partitioner
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
```

```scala
// Calculate average per key using combineByKey
val data = sc.parallelize(Seq(("math", 85), ("english", 90), ("math", 92), ("english", 88)))

val averages = data.combineByKey(
  (score: Int) => (score, 1),                                    // createCombiner: V => (sum, count)
  (acc: (Int, Int), score: Int) => (acc._1 + score, acc._2 + 1), // mergeValue
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) // mergeCombiners
).mapValues { case (sum, count) => sum.toDouble / count }

// Result: [("math", 88.5), ("english", 89.0)]
```
### reduceByKey

Combine values with the same key using an associative, commutative function:

```scala { .api }
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
```

```scala
val wordCounts = sc.textFile("document.txt")
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _) // Sum counts for each word

// With a custom partitioner
val counts = data.reduceByKey(new HashPartitioner(4), _ + _)
```
### reduceByKeyLocally

Reduce by key and return results to the driver as a Map:

```scala { .api }
def reduceByKeyLocally(func: (V, V) => V): Map[K, V]
```

```scala
val localCounts: Map[String, Int] = wordCounts.reduceByKeyLocally(_ + _)
// Returns a local Map instead of an RDD
```
### aggregateByKey

Aggregate the values of each key, with a result type that can differ from the value type:

```scala { .api }
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
```

```scala
// Calculate max, min, and count per key
val data = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 1), ("a", 2)))

val stats = data.aggregateByKey((Int.MinValue, Int.MaxValue, 0))(
  // seqOp: combine a value with the accumulator
  (acc, value) => (math.max(acc._1, value), math.min(acc._2, value), acc._3 + 1),
  // combOp: combine two accumulators
  (acc1, acc2) => (math.max(acc1._1, acc2._1), math.min(acc1._2, acc2._2), acc1._3 + acc2._3)
)
// Result: [("a", (3, 1, 3)), ("b", (2, 1, 2))] - (max, min, count)
```
### foldByKey

Fold values for each key with a zero value:

```scala { .api }
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]
```

```scala
val data = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val sums = data.foldByKey(0)(_ + _)
// Result: [("a", 4), ("b", 2)]
```
### groupByKey

Group values by key (use with caution for large datasets):

```scala { .api }
def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
```

```scala
val data = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4)))
val grouped = data.groupByKey()
// Result: [("a", [1, 3]), ("b", [2, 4])]

// Note: groupByKey can cause out-of-memory errors for keys with many values
// Consider using reduceByKey, aggregateByKey, or combineByKey instead
```
## Join Operations

### join (Inner Join)

```scala { .api }
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
```

```scala
val rdd1 = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
val rdd2 = sc.parallelize(Seq(("a", "x"), ("b", "y"), ("d", "z")))

val joined = rdd1.join(rdd2)
// Result: [("a", (1, "x")), ("b", (2, "y"))]
// Note: "c" and "d" are excluded (inner join)
```
### leftOuterJoin

```scala { .api }
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))]
def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))]
```

```scala
val leftJoined = rdd1.leftOuterJoin(rdd2)
// Result: [("a", (1, Some("x"))), ("b", (2, Some("y"))), ("c", (3, None))]
```
### rightOuterJoin

```scala { .api }
def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], W))]
def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))]
```

```scala
val rightJoined = rdd1.rightOuterJoin(rdd2)
// Result: [("a", (Some(1), "x")), ("b", (Some(2), "y")), ("d", (None, "z"))]
```
### fullOuterJoin

```scala { .api }
def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]
def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], Option[W]))]
def fullOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], Option[W]))]
```

```scala
val fullJoined = rdd1.fullOuterJoin(rdd2)
// Result: [("a", (Some(1), Some("x"))), ("b", (Some(2), Some("y"))),
//          ("c", (Some(3), None)), ("d", (None, Some("z")))]
```
## Cogroup Operations

### cogroup (Group Together)

```scala { .api }
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]

// Multi-way cogroup
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]
```

```scala
val rdd1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
val rdd2 = sc.parallelize(Seq(("a", "x"), ("c", "y")))

val cogrouped = rdd1.cogroup(rdd2)
// Result: [("a", ([1, 2], ["x"])), ("b", ([3], [])), ("c", ([], ["y"]))]

// Three-way cogroup
val rdd3 = sc.parallelize(Seq(("a", 10.0), ("b", 20.0)))
val threeway = rdd1.cogroup(rdd2, rdd3)
```
## Key and Value Transformations

### keys and values

```scala { .api }
def keys: RDD[K]   // Extract all keys
def values: RDD[V] // Extract all values
```

```scala
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val allKeys = pairs.keys     // RDD["a", "b", "a"]
val allValues = pairs.values // RDD[1, 2, 3]
```
### mapValues

Transform values while preserving keys and partitioning:

```scala { .api }
def mapValues[U](f: V => U): RDD[(K, U)]
```

```scala
val pairs = sc.parallelize(Seq(("alice", 25), ("bob", 30), ("charlie", 35)))
val incremented = pairs.mapValues(_ + 1)
// Result: [("alice", 26), ("bob", 31), ("charlie", 36)]

// mapValues preserves partitioning, unlike map
val transformed = pairs.mapValues(age => if (age >= 30) "adult" else "young")
```
### flatMapValues

FlatMap values while preserving keys:

```scala { .api }
def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)]
```

```scala
val sentences = sc.parallelize(Seq(
  ("doc1", "hello world"),
  ("doc2", "spark scala")
))

val words = sentences.flatMapValues(_.split(" "))
// Result: [("doc1", "hello"), ("doc1", "world"), ("doc2", "spark"), ("doc2", "scala")]
```
## Partitioning and Sorting

### partitionBy

Partition the RDD according to a partitioner:

```scala { .api }
def partitionBy(partitioner: Partitioner): RDD[(K, V)]
```

```scala
import org.apache.spark.{HashPartitioner, Partitioner}

val data = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3), ("d", 4)))
val partitioned = data.partitionBy(new HashPartitioner(2))

// Custom partitioner
class CustomPartitioner(partitions: Int) extends Partitioner {
  override def numPartitions: Int = partitions
  override def getPartition(key: Any): Int = {
    val mod = key.hashCode() % partitions
    if (mod < 0) mod + partitions else mod // keep the partition index non-negative
  }
}
```
### sortByKey

Sort by key:

```scala { .api }
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]
```

```scala
val data = sc.parallelize(Seq(("c", 3), ("a", 1), ("b", 2)))
val sorted = data.sortByKey()                      // Ascending: [("a", 1), ("b", 2), ("c", 3)]
val descending = data.sortByKey(ascending = false) // Descending: [("c", 3), ("b", 2), ("a", 1)]
```
### repartitionAndSortWithinPartitions

More efficient than a separate repartition and sort:

```scala { .api }
def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)]
```

```scala
val data = sc.parallelize(Seq(("c", 3), ("a", 1), ("b", 2), ("d", 4)))
val repartitioned = data.repartitionAndSortWithinPartitions(new HashPartitioner(2))
// Partitions data and sorts within each partition in one operation
```
## Set Operations

### subtractByKey

Return the (key, value) pairs in this RDD whose keys are not present in the other RDD:

```scala { .api }
def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)]
def subtractByKey[W: ClassTag](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, V)]
def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)]
```

```scala
val rdd1 = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
val rdd2 = sc.parallelize(Seq(("a", "x"), ("c", "y")))

val subtracted = rdd1.subtractByKey(rdd2)
// Result: [("b", 2)] - only pairs whose keys don't exist in rdd2
```
## Statistical Operations

### countByKey

Count the number of elements for each key:

```scala { .api }
def countByKey(): Map[K, Long]
```

```scala
val data = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5)))
val counts = data.countByKey()
// Result: Map("a" -> 3, "b" -> 2)
```
### countByKeyApprox

Approximate version of `countByKey` that can return a partial result if it does not finish within the timeout:

```scala { .api }
def countByKeyApprox(timeout: Long, confidence: Double = 0.95): PartialResult[Map[K, BoundedDouble]]
```

```scala
val largePairRDD = sc.parallelize(/* large dataset */)
val approxCounts = largePairRDD.countByKeyApprox(1000) // 1 second timeout
```
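A minimal sketch of consuming the returned `PartialResult`, assuming the `approxCounts` value from the example above; `initialValue` gives the estimate available once the timeout expires, while `getFinalValue()` blocks until the exact result is ready:

```scala
// Each key maps to a BoundedDouble estimate with a mean and confidence bounds
approxCounts.initialValue.foreach { case (key, bounded) =>
  println(s"$key: ~${bounded.mean} in [${bounded.low}, ${bounded.high}]")
}
```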
### collectAsMap

Return the key-value pairs as a Map (assumes unique keys):

```scala { .api }
def collectAsMap(): Map[K, V]
```

```scala
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
val map = pairs.collectAsMap()
// Result: Map("a" -> 1, "b" -> 2, "c" -> 3)

// Warning: If there are duplicate keys, only one value per key is kept
```
## Save Operations

### saveAsHadoopFile

Save as a Hadoop file with various output formats:

```scala { .api }
def saveAsHadoopFile[F <: OutputFormat[K, V]](
  path: String,
  keyClass: Class[_],
  valueClass: Class[_],
  outputFormatClass: Class[F],
  codec: Class[_ <: CompressionCodec]
): Unit

// Simplified versions
def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String): Unit
def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String, codec: Class[_ <: CompressionCodec]): Unit
```
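A minimal usage sketch of the simplified overload, assuming the `pairs: RDD[(String, Int)]` created in Setup and a placeholder output path:

```scala
import org.apache.hadoop.mapred.TextOutputFormat

// Write each (key, value) pair as a line of text using the old Hadoop API
pairs.saveAsHadoopFile[TextOutputFormat[String, Int]]("hdfs://path/to/output")
```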
### saveAsNewAPIHadoopFile

Save with the new Hadoop API:

```scala { .api }
def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](
  path: String,
  keyClass: Class[_],
  valueClass: Class[_],
  outputFormatClass: Class[F],
  conf: Configuration = self.context.hadoopConfiguration
): Unit
```
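A hedged usage sketch of the explicit-class form, again assuming `pairs: RDD[(String, Int)]` and a placeholder path:

```scala
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}

// Save using the new Hadoop API (org.apache.hadoop.mapreduce)
pairs.saveAsNewAPIHadoopFile(
  "hdfs://path/to/output",
  classOf[String],
  classOf[Int],
  classOf[NewTextOutputFormat[String, Int]]
)
```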
### saveAsSequenceFile

Save as a Hadoop SequenceFile:

```scala { .api }
def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None): Unit
```

```scala
import org.apache.hadoop.io.{IntWritable, Text}

// Convert the (String, Int) pairs to Writable types
val writablePairs: RDD[(Text, IntWritable)] = pairs.map {
  case (k, v) => (new Text(k), new IntWritable(v))
}
writablePairs.saveAsSequenceFile("hdfs://path/to/output")
```
### saveAsHadoopDataset

Save using a Hadoop JobConf:

```scala { .api }
def saveAsHadoopDataset(conf: JobConf): Unit
```

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, TextOutputFormat}

val jobConf = new JobConf()
jobConf.setOutputFormat(classOf[TextOutputFormat[String, Int]])
jobConf.setOutputKeyClass(classOf[String])
jobConf.setOutputValueClass(classOf[Int])
FileOutputFormat.setOutputPath(jobConf, new Path("hdfs://path/to/output")) // an output path is required

pairs.saveAsHadoopDataset(jobConf)
```
## Performance Considerations

1. **Prefer `reduceByKey` over `groupByKey`**: `reduceByKey` combines values locally before shuffling
2. **Use appropriate partitioners**: Custom partitioners can reduce shuffle overhead (see the partitioning sketch below)
3. **Consider `mapValues`**: It preserves the parent's partitioning, unlike `map`
4. **Avoid `collectAsMap` on large datasets**: It brings all data to the driver
5. **Use `combineByKey` for complex aggregations**: It is the most flexible and efficient

```scala
// Efficient pattern
val wordCounts = textFile
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _) // Combines locally before the shuffle

// Less efficient pattern
val wordCountsSlow = textFile
  .flatMap(_.split(" "))
  .map((_, 1))
  .groupByKey()       // Shuffles all values
  .mapValues(_.sum)   // Then combines
```
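To illustrate point 2, here is a minimal sketch (with made-up data) of pre-partitioning and caching a pair RDD so that repeated joins against it reuse its partitioner instead of reshuffling it each time:

```scala
import org.apache.spark.HashPartitioner

// Hash-partition the lookup data once and cache it
val users = sc.parallelize(Seq((1, "alice"), (2, "bob"), (3, "carol")))
  .partitionBy(new HashPartitioner(8))
  .cache()

// The join adopts users' partitioner, so users is not reshuffled again
val events = sc.parallelize(Seq((1, "click"), (3, "view")))
val joined = users.join(events)
```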