CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-apache-spark

Lightning-fast unified analytics engine for large-scale data processing with high-level APIs in Scala, Java, Python, and R

Pending
Overview
Eval results
Files

key-value-operations.mddocs/

Key-Value Operations

When working with RDDs of (key, value) pairs, Spark provides specialized operations through implicit conversions. These operations are available when you import org.apache.spark.SparkContext._ and work with RDDs of type RDD[(K, V)].

PairRDDFunctions

class PairRDDFunctions[K, V](self: RDD[(K, V)]) extends Logging with Serializable {
  // Available through implicit conversion when importing org.apache.spark.SparkContext._
}

Setup and Imports

import org.apache.spark.SparkContext._  // Essential for PairRDDFunctions
import org.apache.spark.rdd.RDD
// Creating pair RDDs
val pairs: RDD[(String, Int)] = sc.parallelize(Seq(
  ("apple", 5), ("banana", 3), ("apple", 2), ("orange", 1)
))

val wordCounts: RDD[(String, Int)] = sc.textFile("file.txt")
  .flatMap(_.split(" "))
  .map(word => (word, 1))

Aggregation Operations

combineByKey

The most general aggregation function - all other aggregation operations are built on top of this:

def combineByKey[C](
  createCombiner: V => C,
  mergeValue: (C, V) => C,
  mergeCombiners: (C, C) => C,
  partitioner: Partitioner,
  mapSideCombine: Boolean = true,
  serializer: Serializer = null
): RDD[(K, C)]

// Convenience methods with default partitioner
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
// Calculate average per key using combineByKey
val data = sc.parallelize(Seq(("math", 85), ("english", 90), ("math", 92), ("english", 88)))

val averages = data.combineByKey(
  (score: Int) => (score, 1),                    // createCombiner: V => (sum, count)
  (acc: (Int, Int), score: Int) => (acc._1 + score, acc._2 + 1),  // mergeValue
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)  // mergeCombiners
).mapValues { case (sum, count) => sum.toDouble / count }

// Result: [("math", 88.5), ("english", 89.0)]

reduceByKey

Combine values with the same key using an associative function:

def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
def reduceByKey(numPartitions: Int, func: (V, V) => V): RDD[(K, V)]
val wordCounts = sc.textFile("document.txt")
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)  // Sum counts for each word

// With custom partitioner
val counts = data.reduceByKey(new HashPartitioner(4), _ + _)

reduceByKeyLocally

Reduce by key and return results to driver as a Map:

def reduceByKeyLocally(func: (V, V) => V): Map[K, V]
val localCounts: Map[String, Int] = wordCounts.reduceByKeyLocally(_ + _)
// Returns a local Map instead of an RDD

aggregateByKey

Aggregate values of each key with different result type:

def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
// Calculate max, min, and count per key
val data = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 1), ("a", 2)))

val stats = data.aggregateByKey((Int.MinValue, Int.MaxValue, 0))(
  // seqOp: combine value with accumulator
  (acc, value) => (math.max(acc._1, value), math.min(acc._2, value), acc._3 + 1),
  // combOp: combine accumulators
  (acc1, acc2) => (math.max(acc1._1, acc2._1), math.min(acc1._2, acc2._2), acc1._3 + acc2._3)
)
// Result: [("a", (3, 1, 3)), ("b", (2, 1, 2))] - (max, min, count)

foldByKey

Fold values for each key with a zero value:

def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]
val data = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val sums = data.foldByKey(0)(_ + _)
// Result: [("a", 4), ("b", 2)]

groupByKey

Group values by key (use with caution for large datasets):

def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
val data = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4)))
val grouped = data.groupByKey()
// Result: [("a", [1, 3]), ("b", [2, 4])]

// Note: groupByKey can cause out-of-memory errors for keys with many values
// Consider using reduceByKey, aggregateByKey, or combineByKey instead

Join Operations

join (Inner Join)

def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
val rdd1 = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
val rdd2 = sc.parallelize(Seq(("a", "x"), ("b", "y"), ("d", "z")))

val joined = rdd1.join(rdd2)
// Result: [("a", (1, "x")), ("b", (2, "y"))]
// Note: "c" and "d" are excluded (inner join)

leftOuterJoin

def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))]
def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))]
val leftJoined = rdd1.leftOuterJoin(rdd2)
// Result: [("a", (1, Some("x"))), ("b", (2, Some("y"))), ("c", (3, None))]

rightOuterJoin

def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], W))]
def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))]
val rightJoined = rdd1.rightOuterJoin(rdd2)
// Result: [("a", (Some(1), "x")), ("b", (Some(2), "y")), ("d", (None, "z"))]

fullOuterJoin

def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]
def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], Option[W]))]
def fullOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], Option[W]))]
val fullJoined = rdd1.fullOuterJoin(rdd2)
// Result: [("a", (Some(1), Some("x"))), ("b", (Some(2), Some("y"))), 
//          ("c", (Some(3), None)), ("d", (None, Some("z")))]

Cogroup Operations

cogroup (Group Together)

def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]

// Multi-way cogroup
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]
val rdd1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
val rdd2 = sc.parallelize(Seq(("a", "x"), ("c", "y")))

val cogrouped = rdd1.cogroup(rdd2)
// Result: [("a", ([1, 2], ["x"])), ("b", ([3], [])), ("c", ([], ["y"]))]

// Three-way cogroup
val rdd3 = sc.parallelize(Seq(("a", 10.0), ("b", 20.0)))
val threeway = rdd1.cogroup(rdd2, rdd3)

Key and Value Transformations

keys and values

def keys: RDD[K]     // Extract all keys
def values: RDD[V]   // Extract all values
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val allKeys = pairs.keys      // RDD["a", "b", "a"]
val allValues = pairs.values  // RDD[1, 2, 3]

mapValues

Transform values while preserving keys and partitioning:

def mapValues[U](f: V => U): RDD[(K, U)]
val pairs = sc.parallelize(Seq(("alice", 25), ("bob", 30), ("charlie", 35)))
val incremented = pairs.mapValues(_ + 1)
// Result: [("alice", 26), ("bob", 31), ("charlie", 36)]

// mapValues preserves partitioning, unlike map
val transformed = pairs.mapValues(age => if (age >= 30) "adult" else "young")

flatMapValues

FlatMap values while preserving keys:

def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)]
val sentences = sc.parallelize(Seq(
  ("doc1", "hello world"), 
  ("doc2", "spark scala")
))

val words = sentences.flatMapValues(_.split(" "))
// Result: [("doc1", "hello"), ("doc1", "world"), ("doc2", "spark"), ("doc2", "scala")]

Partitioning and Sorting

partitionBy

Partition RDD according to a partitioner:

def partitionBy(partitioner: Partitioner): RDD[(K, V)]
import org.apache.spark.HashPartitioner

val data = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3), ("d", 4)))
val partitioned = data.partitionBy(new HashPartitioner(2))

// Custom partitioner
class CustomPartitioner(numPartitions: Int) extends Partitioner {
  def numPartitions: Int = numPartitions
  def getPartition(key: Any): Int = key.hashCode() % numPartitions
}

sortByKey

Sort by key:

def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]
val data = sc.parallelize(Seq(("c", 3), ("a", 1), ("b", 2)))
val sorted = data.sortByKey()                    // Ascending: [("a", 1), ("b", 2), ("c", 3)]
val descending = data.sortByKey(ascending = false) // Descending: [("c", 3), ("b", 2), ("a", 1)]

repartitionAndSortWithinPartitions

More efficient than separate repartition and sort:

def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)]
val data = sc.parallelize(Seq(("c", 3), ("a", 1), ("b", 2), ("d", 4)))
val repartitioned = data.repartitionAndSortWithinPartitions(new HashPartitioner(2))
// Partitions data and sorts within each partition in one operation

Set Operations

subtractByKey

Return (key, value) pairs in this RDD but not in other:

def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)]
def subtractByKey[W: ClassTag](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, V)]
def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)]
val rdd1 = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
val rdd2 = sc.parallelize(Seq(("a", "x"), ("c", "y")))

val subtracted = rdd1.subtractByKey(rdd2)
// Result: [("b", 2)] - only pairs whose keys don't exist in rdd2

Statistical Operations

countByKey

Count the number of elements for each key:

def countByKey(): Map[K, Long]
val data = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5)))
val counts = data.countByKey()
// Result: Map("a" -> 3, "b" -> 2)

countByKeyApprox

Approximate count by key:

def countByKeyApprox(timeout: Long, confidence: Double = 0.95): PartialResult[Map[K, BoundedDouble]]
val largePairRDD = sc.parallelize(/* large dataset */)
val approxCounts = largePairRDD.countByKeyApprox(1000) // 1 second timeout

collectAsMap

Return the key-value pairs as a Map (assumes unique keys):

def collectAsMap(): Map[K, V]
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
val map = pairs.collectAsMap()
// Result: Map("a" -> 1, "b" -> 2, "c" -> 3)

// Warning: If there are duplicate keys, only one value per key is kept

Save Operations

saveAsHadoopFile

Save as Hadoop file with various output formats:

def saveAsHadoopFile[F <: OutputFormat[K, V]](
  path: String,
  keyClass: Class[_],
  valueClass: Class[_],
  outputFormatClass: Class[F],
  codec: Class[_ <: CompressionCodec]
): Unit

// Simplified versions
def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String): Unit
def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String, codec: Class[_ <: CompressionCodec]): Unit

saveAsNewAPIHadoopFile

Save with new Hadoop API:

def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](
  path: String,
  keyClass: Class[_],
  valueClass: Class[_],
  outputFormatClass: Class[F],
  conf: Configuration = self.context.hadoopConfiguration
): Unit

saveAsSequenceFile

Save as Hadoop SequenceFile:

def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None): Unit
import org.apache.hadoop.io.{IntWritable, Text}

// For writable types
val writablePairs: RDD[(IntWritable, Text)] = pairs.map { 
  case (k, v) => (new IntWritable(k), new Text(v.toString)) 
}
writablePairs.saveAsSequenceFile("hdfs://path/to/output")

saveAsHadoopDataset

Save using Hadoop JobConf:

def saveAsHadoopDataset(conf: JobConf): Unit
import org.apache.hadoop.mapred.{JobConf, TextOutputFormat}

val jobConf = new JobConf()
jobConf.setOutputFormat(classOf[TextOutputFormat[String, Int]])
jobConf.setOutputKeyClass(classOf[String])
jobConf.setOutputValueClass(classOf[Int])

pairs.saveAsHadoopDataset(jobConf)

Performance Considerations

  1. Prefer reduceByKey over groupByKey: reduceByKey combines values locally before shuffling
  2. Use appropriate partitioners: Custom partitioners can reduce shuffle overhead
  3. Consider mapValues: Preserves partitioning, more efficient than map
  4. Avoid collectAsMap on large datasets: Brings all data to driver
  5. Use combineByKey for complex aggregations: Most flexible and efficient
// Efficient pattern
val wordCounts = textFile
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)  // Combines locally before shuffle

// Less efficient pattern
val wordCountsSlow = textFile
  .flatMap(_.split(" "))
  .map((_, 1))
  .groupByKey()        // Shuffles all values
  .mapValues(_.sum)    // Then combines

Install with Tessl CLI

npx tessl i tessl/maven-apache-spark

docs

caching-persistence.md

core-rdd.md

data-sources.md

graphx.md

index.md

java-api.md

key-value-operations.md

mllib.md

python-api.md

spark-context.md

sql.md

streaming.md

tile.json