Specialized operations for RDDs of (key, value) pairs, providing joins, grouping, and aggregation. These extra functions become available on any RDD of 2-tuples through an implicit conversion.
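For example, any RDD of 2-tuples picks these methods up automatically (Spark 1.3+ provides the conversion without an extra import; older versions need import org.apache.spark.SparkContext._). A minimal sketch, assuming a SparkContext named sc is in scope:
import org.apache.spark.rdd.RDD
val pairs: RDD[(String, Int)] = sc.parallelize(Seq(("a", 1), ("b", 2)))
pairs.reduceByKey(_ + _) // resolves via the implicit conversion to PairRDDFunctions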
/**
* Extra functions available on RDDs of (key, value) pairs through an implicit conversion
*/
class PairRDDFunctions[K, V](self: RDD[(K, V)]) {
// Grouping operations
def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
// Reduction operations
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
def reduceByKeyLocally(func: (V, V) => V): Map[K, V]
// Aggregation operations
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]
// Combine operations
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner): RDD[(K, C)]
// Join operations
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))]
def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))]
def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))]
def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], W))]
def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]
def fullOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], Option[W]))]
def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], Option[W]))]
// CoGroup operations
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
// Set operations
def subtractByKey[W](other: RDD[(K, W)]): RDD[(K, V)]
def subtractByKey[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)]
def subtractByKey[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, V)]
// Partitioning operations
def partitionBy(partitioner: Partitioner): RDD[(K, V)]
// Utility operations
def keys: RDD[K]
def values: RDD[V]
def mapValues[U](f: V => U): RDD[(K, U)]
def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)]
def collectAsMap(): Map[K, V]
def countByKey(): Map[K, Long]
def countByKeyApprox(timeout: Long, confidence: Double = 0.95): PartialResult[Map[K, BoundedDouble]]
// Lookup operations
def lookup(key: K): Seq[V]
}
Usage Examples:
import org.apache.spark.{SparkContext, SparkConf}
val sc = new SparkContext(new SparkConf().setAppName("KeyValue Examples").setMaster("local[*]"))
// Create key-value RDDs
val pairs1 = sc.parallelize(Array(("a", 1), ("b", 2), ("a", 3), ("c", 4)))
val pairs2 = sc.parallelize(Array(("a", "x"), ("b", "y"), ("d", "z")))
// Grouping operations
val grouped = pairs1.groupByKey()
// Result: ("a", [1, 3]), ("b", [2]), ("c", [4])
// Reduction operations
val sumByKey = pairs1.reduceByKey(_ + _)
// Result: ("a", 4), ("b", 2), ("c", 4)
val counts = sc.textFile("hdfs://input.txt")
.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_ + _)
// Aggregation operations
val avgByKey = pairs1.aggregateByKey((0, 0))(
(acc, value) => (acc._1 + value, acc._2 + 1),
(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
).mapValues(acc => acc._1.toDouble / acc._2)
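// foldByKey works like reduceByKey but supplies a zero value for each key.
// A minimal sketch (with zero value 0 this is equivalent to the reduceByKey sum above):
val foldedSums = pairs1.foldByKey(0)(_ + _)
// Result: ("a", 4), ("b", 2), ("c", 4)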
// Join operations
val innerJoin = pairs1.join(pairs2)
// Result: ("a", (1, "x")), ("a", (3, "x")), ("b", (2, "y"))
val leftJoin = pairs1.leftOuterJoin(pairs2)
// Result: ("a", (1, Some("x"))), ("a", (3, Some("x"))), ("b", (2, Some("y"))), ("c", (4, None))
val rightJoin = pairs1.rightOuterJoin(pairs2)
val fullJoin = pairs1.fullOuterJoin(pairs2)
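// cogroup gathers the values from both RDDs per key; subtractByKey drops keys that appear in the other RDD.
// A minimal sketch using pairs1 and pairs2 from above:
val cogrouped = pairs1.cogroup(pairs2)
// Result: ("a", ([1, 3], ["x"])), ("b", ([2], ["y"])), ("c", ([4], [])), ("d", ([], ["z"]))
val onlyInPairs1 = pairs1.subtractByKey(pairs2)
// Result: ("c", 4)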
// Utility operations
val keys = pairs1.keys // RDD["a", "b", "a", "c"]
val values = pairs1.values // RDD[1, 2, 3, 4]
val doubled = pairs1.mapValues(_ * 2) // ("a", 2), ("b", 4), ("a", 6), ("c", 8)
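// flatMapValues maps each value to zero or more values while keeping the original key
// (and, like mapValues, preserves any existing partitioning). A minimal sketch:
val expanded = pairs1.flatMapValues(v => Seq(v, v * 10))
// Result: ("a", 1), ("a", 10), ("b", 2), ("b", 20), ("a", 3), ("a", 30), ("c", 4), ("c", 40)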
// Collection operations
val asMap = pairs1.collectAsMap() // Map("a" -> 3, "b" -> 2, "c" -> 4); duplicate keys keep only one value
val keyCounts = pairs1.countByKey() // Map("a" -> 2, "b" -> 1, "c" -> 1)
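// countByKeyApprox returns, within the given timeout (in milliseconds), a PartialResult whose
// per-key counts carry confidence bounds. A minimal sketch; the 1000 ms timeout is illustrative:
val approxCounts = pairs1.countByKeyApprox(timeout = 1000, confidence = 0.95)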
// Lookup specific key
val aValues = pairs1.lookup("a") // Seq(1, 3)
Additional functions available on RDDs where the key is sortable.
/**
* Extra functions available on RDDs where the key is sortable
*/
class OrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag, P <: Product2[K, V] : ClassTag](
self: RDD[P]) {
/**
* Sort the RDD by key, with each partition containing a sorted range of elements
*/
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]
/**
* Repartition the RDD according to the given partitioner and sort records by their keys within each partition
*/
def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)]
/**
* Return an RDD containing only the elements whose keys fall in the inclusive range [lower, upper]
*/
def filterByRange(lower: K, upper: K): RDD[P]
}
Usage Examples:
val wordCounts = sc.textFile("hdfs://input.txt")
.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_ + _)
// Sort by key (alphabetically)
val sortedByWord = wordCounts.sortByKey(ascending = true)
// Sort by value (frequency) - need to swap key-value
val sortedByCount = wordCounts
.map(_.swap) // (count, word)
.sortByKey(ascending = false) // sort by count descending
.map(_.swap) // back to (word, count)
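// filterByRange keeps only pairs whose key falls in the inclusive range [lower, upper]; after
// sortByKey the data is range-partitioned, so partitions that cannot contain matching keys are skipped.
// A minimal sketch (the bounds "apple" and "mango" are just illustrative):
val inRange = sortedByWord.filterByRange("apple", "mango")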
// Efficient repartition and sort
import org.apache.spark.HashPartitioner
val partitioned = wordCounts.repartitionAndSortWithinPartitions(new HashPartitioner(4))
Functions for saving key-value RDDs to Hadoop-supported file systems, including SequenceFiles.
/**
* Extra functions for saving key-value RDDs to any Hadoop-supported file system
* (saveAsHadoopFile and saveAsNewAPIHadoopFile are defined on PairRDDFunctions;
* saveAsSequenceFile is provided by SequenceFileRDDFunctions)
*/
class SequenceFileRDDFunctions[K, V](self: RDD[(K, V)]) {
/**
* Output the RDD to any Hadoop-supported file system, using a Hadoop JobConf object for configuration
*/
def saveAsHadoopFile[F <: OutputFormat[K, V]](
path: String,
keyClass: Class[K],
valueClass: Class[V],
outputFormatClass: Class[F],
conf: JobConf = new JobConf(self.context.hadoopConfiguration),
codec: Option[Class[_ <: CompressionCodec]] = None): Unit
/**
* Output the RDD to any Hadoop-supported file system, using the new Hadoop API
*/
def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](
path: String,
keyClass: Class[K],
valueClass: Class[V],
outputFormatClass: Class[F],
conf: Configuration = self.context.hadoopConfiguration): Unit
/**
* Output the RDD as a Hadoop SequenceFile
*/
def saveAsSequenceFile(
path: String,
codec: Option[Class[_ <: CompressionCodec]] = None): Unit
}
case class Sale(product: String, amount: Double, quantity: Int)
val sales = sc.parallelize(Array(
Sale("laptop", 999.99, 1),
Sale("mouse", 29.99, 5),
Sale("laptop", 1199.99, 2),
Sale("mouse", 29.99, 3)
))
val salesByProduct = sales
.map(sale => (sale.product, sale))
.aggregateByKey((0.0, 0))(
// Sequence operation: aggregate within partition
(acc, sale) => (acc._1 + sale.amount * sale.quantity, acc._2 + sale.quantity),
// Combine operation: combine across partitions
(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)
.mapValues { case (totalRevenue, totalQuantity) =>
(totalRevenue, totalQuantity, if (totalQuantity > 0) totalRevenue / totalQuantity else 0.0)
}
// Result: (product, (totalRevenue, totalQuantity, avgPricePerUnit))
import org.apache.spark.broadcast.Broadcast
// Large dataset
val transactions = sc.textFile("hdfs://transactions.txt")
.map(_.split(","))
.map(fields => (fields(0), fields(1).toDouble)) // (userId, amount)
// Small lookup table
val userProfiles = sc.textFile("hdfs://users.txt")
.map(_.split(","))
.map(fields => (fields(0), fields(1))) // (userId, name)
.collectAsMap()
// Broadcast small dataset for efficient lookup
val broadcastProfiles: Broadcast[Map[String, String]] = sc.broadcast(userProfiles)
// Use broadcast variable instead of join
val enrichedTransactions = transactions.map { case (userId, amount) =>
val userName = broadcastProfiles.value.getOrElse(userId, "Unknown")
(userId, userName, amount)
}
import org.apache.spark.Partitioner
// Custom partitioner for geographic data
class RegionPartitioner(numPartitions: Int) extends Partitioner {
override def numPartitions: Int = numPartitions
override def getPartition(key: Any): Int = {
val region = key.asInstanceOf[String]
region match {
case r if r.startsWith("US") => 0
case r if r.startsWith("EU") => 1
case r if r.startsWith("ASIA") => 2
case _ => 3
}
}
}
val locationData = sc.parallelize(Array(
("US-CA", "data1"), ("EU-DE", "data2"), ("ASIA-JP", "data3")
))
// Apply custom partitioning
val partitioned = locationData.partitionBy(new RegionPartitioner(4))
// Subsequent joins will be more efficient
val moreLocationData = sc.parallelize(Array(
("US-CA", "moredata1"), ("EU-DE", "moredata2")
)).partitionBy(new RegionPartitioner(4))
val joined = partitioned.join(moreLocationData) // No shuffle needed: both sides share the same partitioner
case class Event(timestamp: Long, userId: String, action: String) // timestamps in milliseconds
val events = sc.parallelize(Array(
Event(1000, "user1", "login"),
Event(1010, "user1", "click"),
Event(1020, "user1", "logout"),
Event(1005, "user2", "login"),
Event(1015, "user2", "click")
))
// Group events by user and process in time windows
val userSessions = events
.map(event => (event.userId, event))
.groupByKey()
.mapValues { events =>
val sortedEvents = events.toList.sortBy(_.timestamp)
// Define session boundaries (30 second timeout)
val sessions = sortedEvents.foldLeft(List.empty[List[Event]]) { (sessions, event) =>
sessions match {
case Nil => List(List(event))
case currentSession :: otherSessions =>
if (event.timestamp - currentSession.last.timestamp <= 30000) {
// Add to current session
(currentSession :+ event) :: otherSessions
} else {
// Start new session
List(event) :: sessions
}
}
}
sessions.reverse.map(_.length) // Session lengths
}
// INEFFICIENT: groupByKey followed by reduction
val words = sc.textFile("hdfs://input.txt").flatMap(_.split(" "))
val wordCounts1 = words
.map((_, 1))
.groupByKey() // Shuffles all data
.mapValues(_.sum) // Reduces after shuffle
// EFFICIENT: Use reduceByKey instead
val wordCounts2 = words
.map((_, 1))
.reduceByKey(_ + _) // Reduces before the shuffle (map-side combine)
val data = sc.parallelize(Array(("a", 1), ("b", 2), ("a", 3)))
// Use combineByKey for complex aggregations
val stats = data.combineByKey(
(v: Int) => (v, 1), // Create combiner
(acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1), // Merge value
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) // Merge combiners
).mapValues(acc => (acc._1, acc._2, acc._1.toDouble / acc._2)) // (sum, count, avg)
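// Result: ("a", (4, 2, 2.0)), ("b", (2, 1, 2.0))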
// Use aggregateByKey when you need different types
val letterStats = data.aggregateByKey("")(
(acc, v) => if (acc.isEmpty) v.toString else acc + "," + v,
(acc1, acc2) => acc1 + ";" + acc2
)
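// Possible result: ("a", "1,3"), ("b", "2") when both "a" values land in the same partition,
// or ("a", "1;3") when they land in different partitions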