Spec Registry
Help your agents use open-source better. Learn more.
Find usage specs for your project’s dependencies
- Author
- tessl
- Last updated
- Spec files
maven-apache-spark
Describes: maven/org.apache.spark/spark-parent
- Description
- Lightning-fast unified analytics engine for large-scale data processing with high-level APIs in Scala, Java, Python, and R
- Author
- tessl
- Last updated
key-value-operations.md docs/
1# Key-Value Operations23When working with RDDs of (key, value) pairs, Spark provides specialized operations through implicit conversions. These operations are available when you import `org.apache.spark.SparkContext._` and work with RDDs of type `RDD[(K, V)]`.45## PairRDDFunctions67```scala { .api }8class PairRDDFunctions[K, V](self: RDD[(K, V)]) extends Logging with Serializable {9// Available through implicit conversion when importing org.apache.spark.SparkContext._10}11```1213## Setup and Imports1415```scala { .api }16import org.apache.spark.SparkContext._ // Essential for PairRDDFunctions17import org.apache.spark.rdd.RDD18```1920```scala21// Creating pair RDDs22val pairs: RDD[(String, Int)] = sc.parallelize(Seq(23("apple", 5), ("banana", 3), ("apple", 2), ("orange", 1)24))2526val wordCounts: RDD[(String, Int)] = sc.textFile("file.txt")27.flatMap(_.split(" "))28.map(word => (word, 1))29```3031## Aggregation Operations3233### combineByKey3435The most general aggregation function - all other aggregation operations are built on top of this:3637```scala { .api }38def combineByKey[C](39createCombiner: V => C,40mergeValue: (C, V) => C,41mergeCombiners: (C, C) => C,42partitioner: Partitioner,43mapSideCombine: Boolean = true,44serializer: Serializer = null45): RDD[(K, C)]4647// Convenience methods with default partitioner48def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]49def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]50```5152```scala53// Calculate average per key using combineByKey54val data = sc.parallelize(Seq(("math", 85), ("english", 90), ("math", 92), ("english", 88)))5556val averages = data.combineByKey(57(score: Int) => (score, 1), // createCombiner: V => (sum, count)58(acc: (Int, Int), score: Int) => (acc._1 + score, acc._2 + 1), // mergeValue59(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) // mergeCombiners60).mapValues { case (sum, count) => sum.toDouble / count }6162// Result: [("math", 88.5), ("english", 89.0)]63```6465### reduceByKey6667Combine values with the same key using an associative function:6869```scala { .api }70def reduceByKey(func: (V, V) => V): RDD[(K, V)]71def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]72def reduceByKey(numPartitions: Int, func: (V, V) => V): RDD[(K, V)]73```7475```scala76val wordCounts = sc.textFile("document.txt")77.flatMap(_.split(" "))78.map(word => (word, 1))79.reduceByKey(_ + _) // Sum counts for each word8081// With custom partitioner82val counts = data.reduceByKey(new HashPartitioner(4), _ + _)83```8485### reduceByKeyLocally8687Reduce by key and return results to driver as a Map:8889```scala { .api }90def reduceByKeyLocally(func: (V, V) => V): Map[K, V]91```9293```scala94val localCounts: Map[String, Int] = wordCounts.reduceByKeyLocally(_ + _)95// Returns a local Map instead of an RDD96```9798### aggregateByKey99100Aggregate values of each key with different result type:101102```scala { .api }103def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]104def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]105def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]106```107108```scala109// Calculate max, min, and count per key110val data = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 1), ("a", 2)))111112val stats = data.aggregateByKey((Int.MinValue, Int.MaxValue, 0))(113// seqOp: combine value with accumulator114(acc, value) => (math.max(acc._1, value), math.min(acc._2, value), acc._3 + 1),115// combOp: combine accumulators116(acc1, acc2) => (math.max(acc1._1, acc2._1), math.min(acc1._2, acc2._2), acc1._3 + acc2._3)117)118// Result: [("a", (3, 1, 3)), ("b", (2, 1, 2))] - (max, min, count)119```120121### foldByKey122123Fold values for each key with a zero value:124125```scala { .api }126def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]127def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]128def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]129```130131```scala132val data = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))133val sums = data.foldByKey(0)(_ + _)134// Result: [("a", 4), ("b", 2)]135```136137### groupByKey138139Group values by key (use with caution for large datasets):140141```scala { .api }142def groupByKey(): RDD[(K, Iterable[V])]143def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]144def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]145```146147```scala148val data = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4)))149val grouped = data.groupByKey()150// Result: [("a", [1, 3]), ("b", [2, 4])]151152// Note: groupByKey can cause out-of-memory errors for keys with many values153// Consider using reduceByKey, aggregateByKey, or combineByKey instead154```155156## Join Operations157158### join (Inner Join)159160```scala { .api }161def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]162def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]163def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]164```165166```scala167val rdd1 = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))168val rdd2 = sc.parallelize(Seq(("a", "x"), ("b", "y"), ("d", "z")))169170val joined = rdd1.join(rdd2)171// Result: [("a", (1, "x")), ("b", (2, "y"))]172// Note: "c" and "d" are excluded (inner join)173```174175### leftOuterJoin176177```scala { .api }178def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]179def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))]180def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))]181```182183```scala184val leftJoined = rdd1.leftOuterJoin(rdd2)185// Result: [("a", (1, Some("x"))), ("b", (2, Some("y"))), ("c", (3, None))]186```187188### rightOuterJoin189190```scala { .api }191def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]192def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], W))]193def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))]194```195196```scala197val rightJoined = rdd1.rightOuterJoin(rdd2)198// Result: [("a", (Some(1), "x")), ("b", (Some(2), "y")), ("d", (None, "z"))]199```200201### fullOuterJoin202203```scala { .api }204def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]205def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], Option[W]))]206def fullOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], Option[W]))]207```208209```scala210val fullJoined = rdd1.fullOuterJoin(rdd2)211// Result: [("a", (Some(1), Some("x"))), ("b", (Some(2), Some("y"))),212// ("c", (Some(3), None)), ("d", (None, Some("z")))]213```214215## Cogroup Operations216217### cogroup (Group Together)218219```scala { .api }220def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]221def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))]222def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]223224// Multi-way cogroup225def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]226def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]227```228229```scala230val rdd1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))231val rdd2 = sc.parallelize(Seq(("a", "x"), ("c", "y")))232233val cogrouped = rdd1.cogroup(rdd2)234// Result: [("a", ([1, 2], ["x"])), ("b", ([3], [])), ("c", ([], ["y"]))]235236// Three-way cogroup237val rdd3 = sc.parallelize(Seq(("a", 10.0), ("b", 20.0)))238val threeway = rdd1.cogroup(rdd2, rdd3)239```240241## Key and Value Transformations242243### keys and values244245```scala { .api }246def keys: RDD[K] // Extract all keys247def values: RDD[V] // Extract all values248```249250```scala251val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))252val allKeys = pairs.keys // RDD["a", "b", "a"]253val allValues = pairs.values // RDD[1, 2, 3]254```255256### mapValues257258Transform values while preserving keys and partitioning:259260```scala { .api }261def mapValues[U](f: V => U): RDD[(K, U)]262```263264```scala265val pairs = sc.parallelize(Seq(("alice", 25), ("bob", 30), ("charlie", 35)))266val incremented = pairs.mapValues(_ + 1)267// Result: [("alice", 26), ("bob", 31), ("charlie", 36)]268269// mapValues preserves partitioning, unlike map270val transformed = pairs.mapValues(age => if (age >= 30) "adult" else "young")271```272273### flatMapValues274275FlatMap values while preserving keys:276277```scala { .api }278def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)]279```280281```scala282val sentences = sc.parallelize(Seq(283("doc1", "hello world"),284("doc2", "spark scala")285))286287val words = sentences.flatMapValues(_.split(" "))288// Result: [("doc1", "hello"), ("doc1", "world"), ("doc2", "spark"), ("doc2", "scala")]289```290291## Partitioning and Sorting292293### partitionBy294295Partition RDD according to a partitioner:296297```scala { .api }298def partitionBy(partitioner: Partitioner): RDD[(K, V)]299```300301```scala302import org.apache.spark.HashPartitioner303304val data = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3), ("d", 4)))305val partitioned = data.partitionBy(new HashPartitioner(2))306307// Custom partitioner308class CustomPartitioner(numPartitions: Int) extends Partitioner {309def numPartitions: Int = numPartitions310def getPartition(key: Any): Int = key.hashCode() % numPartitions311}312```313314### sortByKey315316Sort by key:317318```scala { .api }319def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]320```321322```scala323val data = sc.parallelize(Seq(("c", 3), ("a", 1), ("b", 2)))324val sorted = data.sortByKey() // Ascending: [("a", 1), ("b", 2), ("c", 3)]325val descending = data.sortByKey(ascending = false) // Descending: [("c", 3), ("b", 2), ("a", 1)]326```327328### repartitionAndSortWithinPartitions329330More efficient than separate repartition and sort:331332```scala { .api }333def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)]334```335336```scala337val data = sc.parallelize(Seq(("c", 3), ("a", 1), ("b", 2), ("d", 4)))338val repartitioned = data.repartitionAndSortWithinPartitions(new HashPartitioner(2))339// Partitions data and sorts within each partition in one operation340```341342## Set Operations343344### subtractByKey345346Return (key, value) pairs in this RDD but not in other:347348```scala { .api }349def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)]350def subtractByKey[W: ClassTag](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, V)]351def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)]352```353354```scala355val rdd1 = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))356val rdd2 = sc.parallelize(Seq(("a", "x"), ("c", "y")))357358val subtracted = rdd1.subtractByKey(rdd2)359// Result: [("b", 2)] - only pairs whose keys don't exist in rdd2360```361362## Statistical Operations363364### countByKey365366Count the number of elements for each key:367368```scala { .api }369def countByKey(): Map[K, Long]370```371372```scala373val data = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5)))374val counts = data.countByKey()375// Result: Map("a" -> 3, "b" -> 2)376```377378### countByKeyApprox379380Approximate count by key:381382```scala { .api }383def countByKeyApprox(timeout: Long, confidence: Double = 0.95): PartialResult[Map[K, BoundedDouble]]384```385386```scala387val largePairRDD = sc.parallelize(/* large dataset */)388val approxCounts = largePairRDD.countByKeyApprox(1000) // 1 second timeout389```390391### collectAsMap392393Return the key-value pairs as a Map (assumes unique keys):394395```scala { .api }396def collectAsMap(): Map[K, V]397```398399```scala400val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))401val map = pairs.collectAsMap()402// Result: Map("a" -> 1, "b" -> 2, "c" -> 3)403404// Warning: If there are duplicate keys, only one value per key is kept405```406407## Save Operations408409### saveAsHadoopFile410411Save as Hadoop file with various output formats:412413```scala { .api }414def saveAsHadoopFile[F <: OutputFormat[K, V]](415path: String,416keyClass: Class[_],417valueClass: Class[_],418outputFormatClass: Class[F],419codec: Class[_ <: CompressionCodec]420): Unit421422// Simplified versions423def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String): Unit424def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String, codec: Class[_ <: CompressionCodec]): Unit425```426427### saveAsNewAPIHadoopFile428429Save with new Hadoop API:430431```scala { .api }432def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](433path: String,434keyClass: Class[_],435valueClass: Class[_],436outputFormatClass: Class[F],437conf: Configuration = self.context.hadoopConfiguration438): Unit439```440441### saveAsSequenceFile442443Save as Hadoop SequenceFile:444445```scala { .api }446def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None): Unit447```448449```scala450import org.apache.hadoop.io.{IntWritable, Text}451452// For writable types453val writablePairs: RDD[(IntWritable, Text)] = pairs.map {454case (k, v) => (new IntWritable(k), new Text(v.toString))455}456writablePairs.saveAsSequenceFile("hdfs://path/to/output")457```458459### saveAsHadoopDataset460461Save using Hadoop JobConf:462463```scala { .api }464def saveAsHadoopDataset(conf: JobConf): Unit465```466467```scala468import org.apache.hadoop.mapred.{JobConf, TextOutputFormat}469470val jobConf = new JobConf()471jobConf.setOutputFormat(classOf[TextOutputFormat[String, Int]])472jobConf.setOutputKeyClass(classOf[String])473jobConf.setOutputValueClass(classOf[Int])474475pairs.saveAsHadoopDataset(jobConf)476```477478## Performance Considerations4794801. **Prefer `reduceByKey` over `groupByKey`**: `reduceByKey` combines values locally before shuffling4812. **Use appropriate partitioners**: Custom partitioners can reduce shuffle overhead4823. **Consider `mapValues`**: Preserves partitioning, more efficient than `map`4834. **Avoid `collectAsMap` on large datasets**: Brings all data to driver4845. **Use `combineByKey` for complex aggregations**: Most flexible and efficient485486```scala487// Efficient pattern488val wordCounts = textFile489.flatMap(_.split(" "))490.map((_, 1))491.reduceByKey(_ + _) // Combines locally before shuffle492493// Less efficient pattern494val wordCountsSlow = textFile495.flatMap(_.split(" "))496.map((_, 1))497.groupByKey() // Shuffles all values498.mapValues(_.sum) // Then combines499```