Lightning-fast unified analytics engine for large-scale data processing with high-level APIs in Scala, Java, Python, and R
When working with RDDs of (key, value) pairs, Spark provides specialized operations through implicit conversions. These operations are available when you import org.apache.spark.SparkContext._ and work with RDDs of type RDD[(K, V)].
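The same mechanism can be mimicked in miniature with plain Scala collections; the `PairOps` class and `reduceByKeyLocal` method below are hypothetical stand-ins for illustration, not Spark API:

```scala
// Toy analogue of the PairRDDFunctions pattern: an implicit class
// that adds key-oriented operations to any Seq of pairs.
implicit class PairOps[K, V](self: Seq[(K, V)]) {
  // Local analogue of reduceByKey: merge the values that share a key
  def reduceByKeyLocal(f: (V, V) => V): Map[K, V] =
    self.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(f) }
}

val merged = Seq(("a", 1), ("b", 2), ("a", 3)).reduceByKeyLocal(_ + _)
// merged: Map("a" -> 4, "b" -> 2)
```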
class PairRDDFunctions[K, V](self: RDD[(K, V)]) extends Logging with Serializable {
  // Available through implicit conversion when importing org.apache.spark.SparkContext._
}

import org.apache.spark.SparkContext._ // Essential for PairRDDFunctions
import org.apache.spark.rdd.RDD

// Creating pair RDDs
val pairs: RDD[(String, Int)] = sc.parallelize(Seq(
  ("apple", 5), ("banana", 3), ("apple", 2), ("orange", 1)
))
val wordCounts: RDD[(String, Int)] = sc.textFile("file.txt")
  .flatMap(_.split(" "))
  .map(word => (word, 1))

The most general aggregation function; all other aggregation operations are built on top of it:
def combineByKey[C](
  createCombiner: V => C,
  mergeValue: (C, V) => C,
  mergeCombiners: (C, C) => C,
  partitioner: Partitioner,
  mapSideCombine: Boolean = true,
  serializer: Serializer = null
): RDD[(K, C)]
// Convenience methods with default partitioner
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]

// Calculate average per key using combineByKey
val data = sc.parallelize(Seq(("math", 85), ("english", 90), ("math", 92), ("english", 88)))
val averages = data.combineByKey(
  (score: Int) => (score, 1), // createCombiner: V => (sum, count)
  (acc: (Int, Int), score: Int) => (acc._1 + score, acc._2 + 1), // mergeValue
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) // mergeCombiners
).mapValues { case (sum, count) => sum.toDouble / count }
// Result: [("math", 88.5), ("english", 89.0)]

Combine values with the same key using an associative function:
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
def reduceByKey(numPartitions: Int, func: (V, V) => V): RDD[(K, V)]

val wordCounts = sc.textFile("document.txt")
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _) // Sum counts for each word
// With custom partitioner
val counts = data.reduceByKey(new HashPartitioner(4), _ + _)

Reduce by key and return the results to the driver as a Map:
def reduceByKeyLocally(func: (V, V) => V): Map[K, V]

val localCounts: Map[String, Int] = wordCounts.reduceByKeyLocally(_ + _)
// Returns a local Map instead of an RDD

Aggregate the values of each key, with a result type that may differ from the value type:
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]

// Calculate max, min, and count per key
val data = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 1), ("a", 2)))
val stats = data.aggregateByKey((Int.MinValue, Int.MaxValue, 0))(
  // seqOp: combine a value with the accumulator
  (acc, value) => (math.max(acc._1, value), math.min(acc._2, value), acc._3 + 1),
  // combOp: combine two accumulators
  (acc1, acc2) => (math.max(acc1._1, acc2._1), math.min(acc1._2, acc2._2), acc1._3 + acc2._3)
)
// Result: [("a", (3, 1, 3)), ("b", (2, 1, 2))] - (max, min, count)

Fold the values for each key, starting from a zero value:
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]

val data = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val sums = data.foldByKey(0)(_ + _)
// Result: [("a", 4), ("b", 2)]

Group the values for each key (use with caution on large datasets):
def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]

val data = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4)))
val grouped = data.groupByKey()
// Result: [("a", [1, 3]), ("b", [2, 4])]
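The memory caution below can be seen in miniature with plain Scala collections: grouping materializes every value per key before combining, while a fold-style merge keeps only one running accumulator per key (a local sketch, not Spark API):

```scala
val data = Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4))

// groupByKey-style: all values for a key are buffered at once
val grouped: Map[String, Seq[Int]] = data.groupBy(_._1).map {
  case (k, kvs) => k -> kvs.map(_._2)
}
val sumsViaGroup = grouped.map { case (k, vs) => k -> vs.sum }

// reduceByKey-style: one accumulator per key, no per-key buffer
val sumsViaFold = data.foldLeft(Map.empty[String, Int]) {
  case (acc, (k, v)) => acc.updated(k, acc.getOrElse(k, 0) + v)
}
// Both yield Map("a" -> 4, "b" -> 6)
```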
// Note: groupByKey can cause out-of-memory errors for keys with many values
// Consider using reduceByKey, aggregateByKey, or combineByKey instead

Inner join with another pair RDD; only keys present in both RDDs are kept:

def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]

val rdd1 = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
val rdd2 = sc.parallelize(Seq(("a", "x"), ("b", "y"), ("d", "z")))
val joined = rdd1.join(rdd2)
// Result: [("a", (1, "x")), ("b", (2, "y"))]
// Note: "c" and "d" are excluded (inner join)

Left outer join: keep every key from this RDD, with None where the other RDD has no match:

def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))]
def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))]

val leftJoined = rdd1.leftOuterJoin(rdd2)
// Result: [("a", (1, Some("x"))), ("b", (2, Some("y"))), ("c", (3, None))]

Right outer join: keep every key from the other RDD, with None where this RDD has no match:

def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], W))]
def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))]

val rightJoined = rdd1.rightOuterJoin(rdd2)
// Result: [("a", (Some(1), "x")), ("b", (Some(2), "y")), ("d", (None, "z"))]

Full outer join: keep every key from either RDD:

def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]
def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], Option[W]))]
def fullOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], Option[W]))]

val fullJoined = rdd1.fullOuterJoin(rdd2)
// Result: [("a", (Some(1), Some("x"))), ("b", (Some(2), Some("y"))),
//         ("c", (Some(3), None)), ("d", (None, Some("z")))]

Group the values from both RDDs by key, yielding an iterable of values from each side:

def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]
// Multi-way cogroup
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]

val rdd1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
val rdd2 = sc.parallelize(Seq(("a", "x"), ("c", "y")))
val cogrouped = rdd1.cogroup(rdd2)
// Result: [("a", ([1, 2], ["x"])), ("b", ([3], [])), ("c", ([], ["y"]))]
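cogroup is the building block for the join variants; a local sketch, with plain collections rather than Spark API, of deriving an inner join from cogrouped value lists:

```scala
val left  = Seq(("a", 1), ("a", 2), ("b", 3))
val right = Seq(("a", "x"), ("c", "y"))

// Local stand-in for cogroup: for each key, gather the values from both sides
// (collect here is the Scala collections method, not RDD.collect)
val ks = (left.map(_._1) ++ right.map(_._1)).distinct
val cogrouped = ks.map { k =>
  k -> (left.collect { case (`k`, v) => v }, right.collect { case (`k`, w) => w })
}

// Inner join = keep keys present on both sides and cross the value lists
val inner = cogrouped.flatMap { case (k, (vs, ws)) =>
  for (v <- vs; w <- ws) yield (k, (v, w))
}
// inner: List(("a", (1, "x")), ("a", (2, "x")))
```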
// Three-way cogroup
val rdd3 = sc.parallelize(Seq(("a", 10.0), ("b", 20.0)))
val threeway = rdd1.cogroup(rdd2, rdd3)

Extract all keys or all values as their own RDD:

def keys: RDD[K] // Extract all keys
def values: RDD[V] // Extract all values

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val allKeys = pairs.keys // RDD["a", "b", "a"]
val allValues = pairs.values // RDD[1, 2, 3]

Transform values while preserving keys and partitioning:
def mapValues[U](f: V => U): RDD[(K, U)]

val pairs = sc.parallelize(Seq(("alice", 25), ("bob", 30), ("charlie", 35)))
val incremented = pairs.mapValues(_ + 1)
// Result: [("alice", 26), ("bob", 31), ("charlie", 36)]
// mapValues preserves partitioning, unlike map
val transformed = pairs.mapValues(age => if (age >= 30) "adult" else "young")

FlatMap values while preserving keys:
def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)]

val sentences = sc.parallelize(Seq(
  ("doc1", "hello world"),
  ("doc2", "spark scala")
))
val words = sentences.flatMapValues(_.split(" "))
// Result: [("doc1", "hello"), ("doc1", "world"), ("doc2", "spark"), ("doc2", "scala")]

Partition the RDD according to a partitioner:
def partitionBy(partitioner: Partitioner): RDD[(K, V)]

import org.apache.spark.HashPartitioner
val data = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3), ("d", 4)))
val partitioned = data.partitionBy(new HashPartitioner(2))
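HashPartitioner places a key by its hashCode modulo the partition count, adjusted to stay non-negative (Java's % can return negative values). A local sketch of that arithmetic; the helper names are illustrative, not Spark API:

```scala
// Non-negative modulo, as required for a partition index
def nonNegativeMod(x: Int, mod: Int): Int = {
  val raw = x % mod
  if (raw < 0) raw + mod else raw
}

// Hash-style partition assignment for an arbitrary key
def partitionFor(key: Any, numPartitions: Int): Int =
  nonNegativeMod(key.hashCode(), numPartitions)

// Every key lands in [0, numPartitions)
val idx = partitionFor("spark", 4)
```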
// Custom partitioner
class CustomPartitioner(partitions: Int) extends Partitioner {
  // The constructor parameter must not shadow numPartitions,
  // or the method below would recurse on itself
  def numPartitions: Int = partitions
  def getPartition(key: Any): Int = {
    val raw = key.hashCode() % partitions
    if (raw < 0) raw + partitions else raw // hashCode can be negative
  }
}

Sort by key:
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]

val data = sc.parallelize(Seq(("c", 3), ("a", 1), ("b", 2)))
val sorted = data.sortByKey() // Ascending: [("a", 1), ("b", 2), ("c", 3)]
val descending = data.sortByKey(ascending = false) // Descending: [("c", 3), ("b", 2), ("a", 1)]

More efficient than a separate repartition followed by a sort:
def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)]

val data = sc.parallelize(Seq(("c", 3), ("a", 1), ("b", 2), ("d", 4)))
val repartitioned = data.repartitionAndSortWithinPartitions(new HashPartitioner(2))
// Partitions the data and sorts within each partition in one operation

Return the (key, value) pairs in this RDD whose keys are not in the other RDD:
def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)]
def subtractByKey[W: ClassTag](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, V)]
def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)]

val rdd1 = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
val rdd2 = sc.parallelize(Seq(("a", "x"), ("c", "y")))
val subtracted = rdd1.subtractByKey(rdd2)
// Result: [("b", 2)] - only pairs whose keys don't exist in rdd2

Count the number of elements for each key:
def countByKey(): Map[K, Long]

val data = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5)))
val counts = data.countByKey()
// Result: Map("a" -> 3, "b" -> 2)

Approximately count by key, within a timeout:
def countByKeyApprox(timeout: Long, confidence: Double = 0.95): PartialResult[Map[K, BoundedDouble]]

val largePairRDD = sc.parallelize(/* large dataset */)
val approxCounts = largePairRDD.countByKeyApprox(1000) // 1 second timeout

Return the key-value pairs as a Map (assumes unique keys):
def collectAsMap(): Map[K, V]

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
val map = pairs.collectAsMap()
// Result: Map("a" -> 1, "b" -> 2, "c" -> 3)
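The duplicate-key caveat mirrors plain Scala `toMap`, where later pairs overwrite earlier ones; a local illustration, not Spark API:

```scala
// Duplicate keys: toMap keeps only one value per key (the last one seen)
val withDupes = Seq(("a", 1), ("b", 2), ("a", 3))
val asMap = withDupes.toMap
// asMap: Map("a" -> 3, "b" -> 2); the first ("a", 1) is silently dropped
```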
// Warning: If there are duplicate keys, only one value per key is kept

Save as a Hadoop file with various output formats:
def saveAsHadoopFile[F <: OutputFormat[K, V]](
  path: String,
  keyClass: Class[_],
  valueClass: Class[_],
  outputFormatClass: Class[F],
  codec: Class[_ <: CompressionCodec]
): Unit
// Simplified versions
def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String): Unit
def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String, codec: Class[_ <: CompressionCodec]): Unit

Save with the new Hadoop API:
def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](
  path: String,
  keyClass: Class[_],
  valueClass: Class[_],
  outputFormatClass: Class[F],
  conf: Configuration = self.context.hadoopConfiguration
): Unit

Save as a Hadoop SequenceFile:
def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None): Unit

import org.apache.hadoop.io.{IntWritable, Text}
// Keys and values must be converted to Writable types;
// this assumes an RDD of (Int, String) pairs
val writablePairs: RDD[(IntWritable, Text)] = pairs.map {
  case (k, v) => (new IntWritable(k), new Text(v.toString))
}
writablePairs.saveAsSequenceFile("hdfs://path/to/output")

Save using a Hadoop JobConf:
def saveAsHadoopDataset(conf: JobConf): Unit

import org.apache.hadoop.mapred.{JobConf, TextOutputFormat}
val jobConf = new JobConf()
jobConf.setOutputFormat(classOf[TextOutputFormat[String, Int]])
jobConf.setOutputKeyClass(classOf[String])
jobConf.setOutputValueClass(classOf[Int])
pairs.saveAsHadoopDataset(jobConf)

Performance tips:
- reduceByKey over groupByKey: reduceByKey combines values locally before shuffling
- mapValues over map: preserves partitioning, more efficient
- collectAsMap on large datasets: avoid, it brings all data to the driver
- combineByKey for complex aggregations: most flexible and efficient

// Efficient pattern
val wordCounts = textFile
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _) // Combines locally before shuffle
// Less efficient pattern
val wordCountsSlow = textFile
  .flatMap(_.split(" "))
  .map((_, 1))
  .groupByKey() // Shuffles all values
  .mapValues(_.sum) // Then combines

Install with Tessl CLI
npx tessl i tessl/maven-apache-spark