Apache Spark Core - The foundational component of Apache Spark providing distributed computing capabilities including RDDs, transformations, actions, and cluster management.
—
Key-value operations in Spark are performed on RDDs of type RDD[(K, V)] where K is the key type and V is the value type. These operations are made available through implicit conversions to PairRDDFunctions, providing powerful aggregation, grouping, and join capabilities.
class PairRDDFunctions[K, V](self: RDD[(K, V)])(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) {
// Basic Key-Value Operations
def keys: RDD[K]
def values: RDD[V]
def mapValues[U](f: V => U): RDD[(K, U)]
def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)]
// Note: PairRDDFunctions has no dedicated swap method; swap keys and values with rdd.map(_.swap)
// Grouping Operations
def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
// Reduction Operations
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
def reduceByKeyLocally(func: (V, V) => V): Map[K, V]
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
// Join Operations
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))]
def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))]
def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], W))]
def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))]
def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]
def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], Option[W]))]
def fullOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], Option[W]))]
// Cogroup Operations
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]
def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]
// Sorting Operations
def sortByKey(ascending: Boolean = true): RDD[(K, V)]
def sortByKey(ascending: Boolean, numPartitions: Int): RDD[(K, V)]
def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)]
// Partitioning Operations
def partitionBy(partitioner: Partitioner): RDD[(K, V)]
// Collection Operations
def collectAsMap(): Map[K, V]
def countByKey(): Map[K, Long]
def countByKeyApprox(timeout: Long, confidence: Double = 0.95): PartialResult[Map[K, BoundedDouble]]
// Lookup Operations
def lookup(key: K): Seq[V]
// Subtraction Operations
def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)]
def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)]
def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)]
// Sampling Operations
def sampleByKey(withReplacement: Boolean, fractions: Map[K, Double], seed: Long = Utils.random.nextLong): RDD[(K, V)]
def sampleByKeyExact(withReplacement: Boolean, fractions: Map[K, Double], seed: Long = Utils.random.nextLong): RDD[(K, V)]
}
import org.apache.spark.{SparkContext, SparkConf}
val sc = new SparkContext(new SparkConf().setAppName("Key-Value Examples").setMaster("local[*]"))
// Create key-value RDD from collections
val pairs = sc.parallelize(Seq(
("apple", 5), ("banana", 3), ("apple", 2), ("orange", 8), ("banana", 1)
))
// Transform regular RDD to key-value RDD
val words = sc.parallelize(Seq("hello", "world", "hello", "spark", "world"))
val wordPairs = words.map(word => (word, 1))
// From text files
val lines = sc.textFile("access.log")
val urlCounts = lines.map { line =>
val parts = line.split(" ")
val url = parts(6) // Assuming URL is 7th field
(url, 1)
}
// Extract keys and values
val keys = pairs.keys.collect() // Array("apple", "banana", "apple", "orange", "banana")
val values = pairs.values.collect() // Array(5, 3, 2, 8, 1)
// Transform values while preserving keys
val discountedPrices = pairs.mapValues(_ * 0.9)
// Transform values to multiple values
val inventory = sc.parallelize(Seq(
("electronics", "laptop,phone,tablet"),
("books", "fiction,non-fiction,textbook")
))
val expandedInventory = inventory.flatMapValues(_.split(","))
// Swap keys and values
val swapped = pairs.map(_.swap) // swapKV does not exist in Spark; use map(_.swap)
Groups values by key. Use with caution for large datasets as it can cause memory issues.
// Group all values by key
val grouped = pairs.groupByKey()
// Result: ("apple", Iterable(5, 2)), ("banana", Iterable(3, 1)), ("orange", Iterable(8))
// Process grouped data
val processed = grouped.mapValues { values =>
val sum = values.sum
val count = values.size
val avg = sum.toDouble / count
(sum, count, avg)
}
More efficient than groupByKey for aggregation as it performs local reduction.
// Sum values by key
val totals = pairs.reduceByKey(_ + _)
// Result: ("apple", 7), ("banana", 4), ("orange", 8)
// Find maximum by key
val maxValues = pairs.reduceByKey(math.max)
// Concatenate strings by key
val textData = sc.parallelize(Seq(
("user1", "hello"), ("user2", "hi"), ("user1", "world"), ("user2", "there")
))
val concatenated = textData.reduceByKey(_ + " " + _)
Like reduceByKey but with an initial zero value.
// Sum with initial value
val foldedSums = pairs.foldByKey(0)(_ + _)
// Concatenate with separator
val messages = sc.parallelize(Seq(
("error", "Connection failed"), ("error", "Timeout"), ("info", "Started"), ("info", "Completed")
))
val logMessages = messages.foldByKey("")((acc, msg) => if (acc.isEmpty) msg else acc + "; " + msg)
Most flexible aggregation operation with different types for input and output.
// Calculate statistics (count, sum, sum of squares) for each key
val numbers = sc.parallelize(Seq(
("math", 85), ("math", 92), ("math", 78), ("science", 88), ("science", 95)
))
/** Running aggregate of a numeric stream: element count, sum, and sum of squares.
  * These are sufficient statistics for mean and population variance, and they
  * combine associatively, which makes them safe to merge across partitions
  * (as done by the combOp of aggregateByKey below).
  */
final case class Stats(count: Int, sum: Double, sumSquares: Double) {
  /** Arithmetic mean; yields NaN when count == 0 (0.0 / 0 in Double arithmetic). */
  def mean: Double = sum / count
  /** Population variance via E[X^2] - E[X]^2; yields NaN when count == 0. */
  def variance: Double = (sumSquares / count) - (mean * mean)
}
val stats = numbers.aggregateByKey(Stats(0, 0.0, 0.0))(
seqOp = (stats, value) => Stats(
stats.count + 1,
stats.sum + value,
stats.sumSquares + value * value
),
combOp = (stats1, stats2) => Stats(
stats1.count + stats2.count,
stats1.sum + stats2.sum,
stats1.sumSquares + stats2.sumSquares
)
)
// Collect unique values per key
val data = sc.parallelize(Seq(
("A", 1), ("A", 2), ("A", 1), ("B", 3), ("B", 4), ("B", 3)
))
val uniqueValues = data.aggregateByKey(Set.empty[Int])(
seqOp = (set, value) => set + value,
combOp = (set1, set2) => set1 ++ set2
)
The most general aggregation function - other aggregation functions are implemented using this.
// Calculate average by key
val scores = sc.parallelize(Seq(
("Alice", 85), ("Bob", 92), ("Alice", 78), ("Bob", 88), ("Alice", 95)
))
val averages = scores.combineByKey(
createCombiner = (score: Int) => (score, 1), // (sum, count)
mergeValue = (acc: (Int, Int), score: Int) => (acc._1 + score, acc._2 + 1),
mergeCombiners = (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
).mapValues { case (sum, count) => sum.toDouble / count }
Returns pairs where keys exist in both RDDs.
val customers = sc.parallelize(Seq(
(1, "Alice"), (2, "Bob"), (3, "Charlie"), (4, "David")
))
val orders = sc.parallelize(Seq(
(1, "laptop"), (2, "phone"), (1, "mouse"), (5, "tablet")
))
// Inner join - only matching keys
val customerOrders = customers.join(orders)
// Result: (1, ("Alice", "laptop")), (1, ("Alice", "mouse")), (2, ("Bob", "phone"))
// Left outer join - all keys from left RDD
val leftJoin = customers.leftOuterJoin(orders)
// Result includes: (4, ("David", None))
// Right outer join - all keys from right RDD
val rightJoin = customers.rightOuterJoin(orders)
// Result includes: (5, (None, "tablet"))
// Full outer join - all keys from both RDDs
val fullJoin = customers.fullOuterJoin(orders)
// Result includes both: (4, (Some("David"), None)) and (5, (None, Some("tablet")))
// Multi-way joins
val products = sc.parallelize(Seq(
(1, "Laptop"), (2, "Phone"), (3, "Tablet")
))
val prices = sc.parallelize(Seq(
(1, 999.99), (2, 499.99), (3, 299.99)
))
val inventory = sc.parallelize(Seq(
(1, 50), (2, 100), (3, 25)
))
// Chain joins for comprehensive product information
val productInfo = products
.join(prices)
.join(inventory)
.map { case (id, ((name, price), stock)) =>
(id, name, price, stock)
}
Cogroup groups data from multiple RDDs by key.
val rdd1 = sc.parallelize(Seq((1, "a"), (1, "b"), (2, "c")))
val rdd2 = sc.parallelize(Seq((1, "x"), (2, "y"), (2, "z"), (3, "w")))
// Cogroup two RDDs
val cogrouped = rdd1.cogroup(rdd2)
// Result: (1, (Iterable("a", "b"), Iterable("x")))
// (2, (Iterable("c"), Iterable("y", "z")))
// (3, (Iterable(), Iterable("w")))
// Process cogrouped data
val processed = cogrouped.mapValues { case (iter1, iter2) =>
val list1 = iter1.toList
val list2 = iter2.toList
(list1.size, list2.size, list1 ++ list2)
}
// Three-way cogroup
val rdd3 = sc.parallelize(Seq((1, "p"), (3, "q")))
val threeway = rdd1.cogroup(rdd2, rdd3)
val unsorted = sc.parallelize(Seq(
("banana", 3), ("apple", 5), ("orange", 1), ("apple", 2)
))
// Sort by key ascending (default)
val sortedAsc = unsorted.sortByKey()
// Sort by key descending
val sortedDesc = unsorted.sortByKey(ascending = false)
// Sort with custom number of partitions
val sortedPartitioned = unsorted.sortByKey(ascending = true, numPartitions = 4)
import org.apache.spark.{HashPartitioner, RangePartitioner}
// Hash partitioning
val hashPartitioned = pairs.partitionBy(new HashPartitioner(4))
// Range partitioning (for sorted data)
val rangePartitioned = pairs.partitionBy(new RangePartitioner(4, pairs))
// Repartition and sort within partitions (more efficient than sortByKey)
val repartitionedAndSorted = pairs.repartitionAndSortWithinPartitions(new HashPartitioner(4))
// Custom partitioner
// Custom partitioner that buckets keys by the non-negative hash of their string form.
// BUG FIX: the original declared the constructor parameter as `numPartitions`, so
// `def numPartitions: Int = numPartitions` referred to itself — infinite recursion
// (StackOverflowError) instead of returning the partition count. The parameter is
// renamed so the override returns the stored value.
class DomainPartitioner(partitions: Int) extends org.apache.spark.Partitioner {
  require(partitions > 0, s"Number of partitions must be positive, got $partitions")

  /** Total number of partitions this partitioner produces. */
  def numPartitions: Int = partitions

  /** Maps a key to a partition index in [0, numPartitions).
    * hashCode may be negative, so a negative remainder is shifted into range. */
  def getPartition(key: Any): Int = {
    key.toString.hashCode % partitions match {
      case partition if partition < 0 => partition + partitions
      case partition => partition
    }
  }
}
val customPartitioned = pairs.partitionBy(new DomainPartitioner(8))
// Collect as map (for small datasets)
val asMap = pairs.collectAsMap()
// Count by key
val keyCounts = pairs.countByKey()
// Lookup values for specific key
val appleValues = pairs.lookup("apple") // Seq(5, 2)
// Approximate count by key
val approxCounts = pairs.countByKeyApprox(timeout = 1000L)
// Time-based window operations (assuming timestamp keys)
val timestampData = sc.parallelize(Seq(
(100L, "event1"), (150L, "event2"), (200L, "event3"), (250L, "event4")
))
// Group by time windows (e.g., 100ms windows)
val windowed = timestampData.map { case (timestamp, event) =>
val window = (timestamp / 100) * 100
(window, event)
}.groupByKey()
val keyValueScores = sc.parallelize(Seq(
("user1", 95), ("user1", 87), ("user1", 92), ("user1", 78),
("user2", 88), ("user2", 91), ("user2", 85)
))
// Get top 2 scores per user
val topScores = keyValueScores
.groupByKey()
.mapValues(scores => scores.toSeq.sorted(Ordering.Int.reverse).take(2))
// For large datasets, consider pre-partitioning both RDDs
val partitioner = new HashPartitioner(100)
val largeRDD1 = customers.partitionBy(partitioner).persist()
val largeRDD2 = orders.partitionBy(partitioner).persist()
// Join will be more efficient as data is co-located
val efficientJoin = largeRDD1.join(largeRDD2)
// Stratified sampling by key
val userData = sc.parallelize(Seq(
("premium", "user1"), ("premium", "user2"), ("premium", "user3"),
("basic", "user4"), ("basic", "user5"), ("basic", "user6"), ("basic", "user7")
))
// Sample different fractions by key
val sampleFractions = Map("premium" -> 0.8, "basic" -> 0.3)
// Approximate sampling
val approxSample = userData.sampleByKey(withReplacement = false, sampleFractions, seed = 42)
// Exact sampling (guarantees exact sample sizes)
val exactSample = userData.sampleByKeyExact(withReplacement = false, sampleFractions, seed = 42)
// When one RDD is small, broadcast it for efficient joins
// (assumes largeRDD: RDD[(Int, String)] is defined elsewhere)
val smallLookupTable = Map(1 -> "Category A", 2 -> "Category B", 3 -> "Category C")
val broadcastLookup = sc.broadcast(smallLookupTable)
val enrichedData = largeRDD.map { case (id, data) =>
val category = broadcastLookup.value.getOrElse(id, "Unknown")
(id, data, category)
}
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-core-2-11