CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-spark--spark-core-2-11

Apache Spark Core - The foundational component of Apache Spark providing distributed computing capabilities including RDDs, transformations, actions, and cluster management.

Pending
Overview
Eval results
Files

key-value-operations.mddocs/

Key-Value Operations

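Key-value operations in Spark are performed on RDDs of type RDD[(K, V)], where K is the key type and V is the value type. These operations are made available through an implicit conversion to PairRDDFunctions, which provides aggregation, grouping, and join capabilities. Sorting operations such as sortByKey come from a second implicit conversion, to OrderedRDDFunctions, which applies when an Ordering[K] is available.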

PairRDDFunctions

// API summary of the operations available on RDD[(K, V)] through implicit conversion.
// Signatures are abbreviated (no bodies); see the Spark Scaladoc for the authoritative list.
class PairRDDFunctions[K, V](self: RDD[(K, V)])(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) {
  // Basic Key-Value Operations
  def keys: RDD[K]
  def values: RDD[V]
  def mapValues[U](f: V => U): RDD[(K, U)]
  def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)]
  // NOTE: there is no dedicated method for swapping keys and values;
  // use the plain RDD transformation rdd.map(_.swap) instead.
  
  // Grouping Operations
  def groupByKey(): RDD[(K, Iterable[V])]
  def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
  def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
  
  // Reduction Operations
  def reduceByKey(func: (V, V) => V): RDD[(K, V)]
  def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
  def reduceByKeyLocally(func: (V, V) => V): Map[K, V]
  
  def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
  def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]
  def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]
  
  def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
  def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
  def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
  
  def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
  def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner): RDD[(K, C)]
  def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
  
  // Join Operations
  def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
  def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
  def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
  
  def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
  def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))]
  def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))]
  
  def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
  def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], W))]
  def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))]
  
  def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]
  def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], Option[W]))]
  def fullOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], Option[W]))]
  
  // Cogroup Operations
  def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
  def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))]
  def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]
  def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
  def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
  def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]
  def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]
  
  // Sorting Operations
  // NOTE: these are defined on OrderedRDDFunctions, a separate implicit conversion
  // that applies to RDD[(K, V)] when an Ordering[K] is in scope. They are listed here
  // because they are called on pair RDDs in exactly the same way.
  def sortByKey(ascending: Boolean = true): RDD[(K, V)]
  def sortByKey(ascending: Boolean, numPartitions: Int): RDD[(K, V)]
  def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)]
  
  // Partitioning Operations
  def partitionBy(partitioner: Partitioner): RDD[(K, V)]
  
  // Collection Operations
  def collectAsMap(): Map[K, V]
  def countByKey(): Map[K, Long]
  def countByKeyApprox(timeout: Long, confidence: Double = 0.95): PartialResult[Map[K, BoundedDouble]]
  
  // Lookup Operations
  def lookup(key: K): Seq[V]
  
  // Subtraction Operations
  def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)]
  def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)]
  def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)]
  
  // Sampling Operations
  def sampleByKey(withReplacement: Boolean, fractions: Map[K, Double], seed: Long = Utils.random.nextLong): RDD[(K, V)]
  def sampleByKeyExact(withReplacement: Boolean, fractions: Map[K, Double], seed: Long = Utils.random.nextLong): RDD[(K, V)]
}

Basic Key-Value Operations

Creating Key-Value RDDs

import org.apache.spark.{SparkContext, SparkConf}

// Local SparkContext used by the examples that follow
val sc = new SparkContext(new SparkConf().setAppName("Key-Value Examples").setMaster("local[*]"))

// Create key-value RDD from collections
val pairs = sc.parallelize(Seq(
  ("apple", 5), ("banana", 3), ("apple", 2), ("orange", 8), ("banana", 1)
))

// Transform regular RDD to key-value RDD
val words = sc.parallelize(Seq("hello", "world", "hello", "spark", "world"))
val wordPairs = words.map(word => (word, 1))

// From text files
val lines = sc.textFile("access.log")
// The URL is assumed to be the 7th space-separated field. Lines with fewer than
// seven fields are skipped rather than failing the whole job with an
// ArrayIndexOutOfBoundsException.
val urlCounts = lines.flatMap { line =>
  val parts = line.split(" ")
  if (parts.length > 6) Some((parts(6), 1)) else None
}

Basic Operations

// Extract keys and values
val keys = pairs.keys.collect()      // Array("apple", "banana", "apple", "orange", "banana")
val values = pairs.values.collect()  // Array(5, 3, 2, 8, 1)

// Transform values while preserving keys (mapValues also keeps the parent's partitioner)
val discountedPrices = pairs.mapValues(_ * 0.9)

// Transform values to multiple values: each key is repeated once per produced value
val inventory = sc.parallelize(Seq(
  ("electronics", "laptop,phone,tablet"),
  ("books", "fiction,non-fiction,textbook")
))
val expandedInventory = inventory.flatMapValues(_.split(","))

// Swap keys and values. PairRDDFunctions has no dedicated swap method,
// so use an ordinary map with Tuple2.swap.
val swapped = pairs.map(_.swap)

Aggregation Operations

GroupByKey

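Groups all values that share a key into a single Iterable. Use with caution on large datasets: every value is shuffled across the network, and all values for a given key must fit in memory on one executor. When you only need an aggregate per key, prefer reduceByKey or aggregateByKey.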

// Gather every value that shares a key into one Iterable
val grouped = pairs.groupByKey()
// Result: ("apple", Iterable(5, 2)), ("banana", Iterable(3, 1)), ("orange", Iterable(8))

// Derive (sum, count, average) from each key's grouped values
val processed = grouped.mapValues { vs =>
  val total = vs.sum
  val n = vs.size
  (total, n, total.toDouble / n)
}

ReduceByKey

More efficient than groupByKey for aggregation as it performs local reduction.

// Add up all values that share a key
val totals = pairs.reduceByKey((a, b) => a + b)
// Result: ("apple", 7), ("banana", 4), ("orange", 8)

// Keep only the largest value seen for each key
val maxValues = pairs.reduceByKey((a, b) => math.max(a, b))

// Join each user's strings with a single space
val textData = sc.parallelize(Seq(
  "user1" -> "hello",
  "user2" -> "hi",
  "user1" -> "world",
  "user2" -> "there"
))
val concatenated = textData.reduceByKey((left, right) => left + " " + right)

FoldByKey

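Like reduceByKey, but each key's values are folded starting from a zero value. The zero value may be applied more than once (for example, once per partition that contains the key), so it must be neutral for the function: 0 for addition, an empty string for concatenation, and so on.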

// Sum per key, starting each fold from 0
val foldedSums = pairs.foldByKey(0)((acc, n) => acc + n)

// Join messages per key with "; ", starting each fold from the empty string
val messages = sc.parallelize(Seq(
  "error" -> "Connection failed",
  "error" -> "Timeout",
  "info" -> "Started",
  "info" -> "Completed"
))
val logMessages = messages.foldByKey("") { (acc, msg) =>
  // An empty accumulator means nothing has been collected yet, so no separator is needed
  if (acc.nonEmpty) acc + "; " + msg else msg
}

AggregateByKey

Most flexible aggregation operation with different types for input and output.

// Input for the per-key statistics example: (subject, score) pairs
val numbers = sc.parallelize(Seq(
  "math" -> 85,
  "math" -> 92,
  "math" -> 78,
  "science" -> 88,
  "science" -> 95
))

/**
 * Running aggregate of a set of numeric observations.
 *
 * @param count      number of observations
 * @param sum        sum of the observations
 * @param sumSquares sum of the squared observations
 */
case class Stats(count: Int, sum: Double, sumSquares: Double) {
  /** Arithmetic mean of the observations. */
  def mean: Double = sum / count

  /** Population variance, computed as the mean of squares minus the squared mean. */
  def variance: Double = {
    val avg = mean
    sumSquares / count - avg * avg
  }
}

// seqOp folds one score into a partition-local Stats; combOp merges Stats across partitions
val stats = numbers.aggregateByKey(Stats(0, 0.0, 0.0))(
  seqOp = (acc, score) =>
    Stats(acc.count + 1, acc.sum + score, acc.sumSquares + score * score),
  combOp = (left, right) =>
    Stats(left.count + right.count, left.sum + right.sum, left.sumSquares + right.sumSquares)
)

// Distinct values per key, accumulated in an immutable Set
val data = sc.parallelize(Seq(
  "A" -> 1, "A" -> 2, "A" -> 1,
  "B" -> 3, "B" -> 4, "B" -> 3
))
val uniqueValues = data.aggregateByKey(Set.empty[Int])(
  seqOp = (seen, v) => seen + v,
  combOp = (left, right) => left ++ right
)

CombineByKey

The most general aggregation function - other aggregation functions are implemented using this.

// Per-key average, carried through the shuffle as a (sum, count) pair
val scores = sc.parallelize(Seq(
  "Alice" -> 85,
  "Bob" -> 92,
  "Alice" -> 78,
  "Bob" -> 88,
  "Alice" -> 95
))

val averages = scores
  .combineByKey(
    // First score seen for a key in a partition: start at (score, 1)
    (score: Int) => (score, 1),
    // Another score in the same partition: extend the running (sum, count)
    (acc: (Int, Int), score: Int) => acc match {
      case (total, n) => (total + score, n + 1)
    },
    // Merge partial (sum, count) pairs coming from different partitions
    (left: (Int, Int), right: (Int, Int)) => (left._1 + right._1, left._2 + right._2)
  )
  .mapValues { case (total, n) => total.toDouble / n }

Join Operations

Inner Join

Returns pairs where keys exist in both RDDs.

// Customer id -> customer name
val customers = sc.parallelize(Seq(
  1 -> "Alice",
  2 -> "Bob",
  3 -> "Charlie",
  4 -> "David"
))

// Customer id -> ordered item (customer 5 has no entry in `customers`)
val orders = sc.parallelize(Seq(
  1 -> "laptop",
  2 -> "phone",
  1 -> "mouse",
  5 -> "tablet"
))

// Inner join keeps only ids present on both sides, one row per matching pair
val customerOrders = customers.join(orders)
// Result: (1, ("Alice", "laptop")), (1, ("Alice", "mouse")), (2, ("Bob", "phone"))

Outer Joins

// Left outer join: every customer is kept; a missing order shows up as None
val leftJoin = customers leftOuterJoin orders
// Result includes: (4, ("David", None))

// Right outer join: every order is kept; a missing customer shows up as None
val rightJoin = customers rightOuterJoin orders
// Result includes: (5, (None, "tablet"))

// Full outer join: keys from either side are kept; both sides become Options
val fullJoin = customers fullOuterJoin orders
// Result includes both: (4, (Some("David"), None)) and (5, (None, Some("tablet")))

Complex Join Example

// Three datasets keyed by product id
val products = sc.parallelize(Seq(
  1 -> "Laptop",
  2 -> "Phone",
  3 -> "Tablet"
))

val prices = sc.parallelize(Seq(
  1 -> 999.99,
  2 -> 499.99,
  3 -> 299.99
))

val inventory = sc.parallelize(Seq(
  1 -> 50,
  2 -> 100,
  3 -> 25
))

// Each join nests the previous value one level deeper: (id, ((name, price), stock)).
// The final map flattens that nesting into a 4-tuple.
val productInfo = products
  .join(prices)
  .join(inventory)
  .map { case (id, (nameAndPrice, stock)) =>
    (id, nameAndPrice._1, nameAndPrice._2, stock)
  }

Cogroup Operations

Cogroup groups data from multiple RDDs by key.

val rdd1 = sc.parallelize(Seq(1 -> "a", 1 -> "b", 2 -> "c"))
val rdd2 = sc.parallelize(Seq(1 -> "x", 2 -> "y", 2 -> "z", 3 -> "w"))

// For every key in either RDD, collect that key's values from each side
val cogrouped = rdd1.cogroup(rdd2)
// Result: (1, (Iterable("a", "b"), Iterable("x")))
//         (2, (Iterable("c"), Iterable("y", "z")))
//         (3, (Iterable(), Iterable("w")))

// Summarise each key: size of each side plus all values concatenated
val processed = cogrouped.mapValues { groups =>
  val left = groups._1.toList
  val right = groups._2.toList
  (left.size, right.size, left ++ right)
}

// Cogroup also accepts two other RDDs, yielding a triple of Iterables per key
val rdd3 = sc.parallelize(Seq(1 -> "p", 3 -> "q"))
val threeway = rdd1.cogroup(rdd2, rdd3)

Sorting and Partitioning

SortByKey

val unsorted = sc.parallelize(Seq(
  "banana" -> 3,
  "apple" -> 5,
  "orange" -> 1,
  "apple" -> 2
))

// Ascending order is the default; spelled out here for clarity
val sortedAsc = unsorted.sortByKey(ascending = true)

// Descending key order
val sortedDesc = unsorted.sortByKey(ascending = false)

// Ascending, with the output spread over 4 partitions
val sortedPartitioned = unsorted.sortByKey(true, 4)

Custom Partitioning

import org.apache.spark.{HashPartitioner, RangePartitioner}

// Hash partitioning: a key's partition is derived from its hashCode
val hashPartitioned = pairs.partitionBy(new HashPartitioner(4))

// Range partitioning: keys are split into contiguous, roughly equal ranges
// (the ranges are determined by sampling the RDD passed to the constructor)
val rangePartitioned = pairs.partitionBy(new RangePartitioner(4, pairs))

// Repartition and sort by key within each resulting partition.
// This is more efficient than calling partitionBy/repartition and then sorting each
// partition, because the sort is pushed into the shuffle. Unlike sortByKey it does
// NOT produce a global ordering across partitions.
val repartitionedAndSorted = pairs.repartitionAndSortWithinPartitions(new HashPartitioner(4))

// Custom partitioner
/**
 * Assigns a key to a partition based on the hash of its string representation.
 *
 * @param numPartitions number of partitions to spread keys across
 */
class DomainPartitioner(override val numPartitions: Int) extends org.apache.spark.Partitioner {
  // The constructor parameter is declared as `override val` so that it implements
  // Partitioner.numPartitions directly; a plain parameter with the same name as the
  // method would not compile.

  /** Returns the partition index for `key`; null keys go to partition 0. */
  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ =>
      // hashCode may be negative, so shift a negative remainder back into range
      val rawMod = key.toString.hashCode % numPartitions
      if (rawMod < 0) rawMod + numPartitions else rawMod
  }

  // Spark compares partitioners with equals to decide whether two RDDs are already
  // co-partitioned and a shuffle can be skipped.
  override def equals(other: Any): Boolean = other match {
    case that: DomainPartitioner => that.numPartitions == numPartitions
    case _ => false
  }

  override def hashCode: Int = numPartitions
}

val customPartitioned = pairs.partitionBy(new DomainPartitioner(8))

Collection and Lookup Operations

// Bring the whole RDD to the driver as a Map (small datasets only).
// When a key occurs more than once, only one of its values is kept.
val asMap = pairs.collectAsMap()

// Number of elements per key, returned to the driver as a Map
val keyCounts = pairs.countByKey()

// All values stored under one key
val appleValues = pairs.lookup("apple")  // Seq(5, 2)

// Approximate per-key counts, waiting at most 1000 ms
val approxCounts = pairs.countByKeyApprox(1000L)

Advanced Patterns

Window Operations

// Events keyed by timestamp
val timestampData = sc.parallelize(Seq(
  100L -> "event1",
  150L -> "event2",
  200L -> "event3",
  250L -> "event4"
))

// Bucket events into 100-unit windows keyed by the window's start timestamp
val windowed = timestampData
  .map { case (timestamp, event) =>
    // Dropping the remainder rounds the timestamp down to its window start
    (timestamp - timestamp % 100, event)
  }
  .groupByKey()

Top-K by Key

val keyValueScores = sc.parallelize(Seq(
  "user1" -> 95, "user1" -> 87, "user1" -> 92, "user1" -> 78,
  "user2" -> 88, "user2" -> 91, "user2" -> 85
))

// Two highest scores per user: group, sort each group descending, keep the first two
val topScores = keyValueScores
  .groupByKey()
  .mapValues { scores =>
    scores.toSeq.sortWith(_ > _).take(2)
  }

Efficient Large Joins

// Partition both sides with the SAME partitioner and cache them, so the
// partitioned layout is computed once and reused
val partitioner = new HashPartitioner(100)

val largeRDD1 = customers.partitionBy(partitioner).cache()
val largeRDD2 = orders.partitionBy(partitioner).cache()

// Both inputs already share a partitioner, so matching keys are co-located
val efficientJoin = largeRDD1 join largeRDD2

Sampling Operations

// Users keyed by subscription tier (the stratum)
val userData = sc.parallelize(Seq(
  "premium" -> "user1", "premium" -> "user2", "premium" -> "user3",
  "basic" -> "user4", "basic" -> "user5", "basic" -> "user6", "basic" -> "user7"
))

// Sampling fraction for each tier
val sampleFractions = Map("premium" -> 0.8, "basic" -> 0.3)

// Approximate: sample sizes per key only match the fractions in expectation
val approxSample = userData.sampleByKey(
  withReplacement = false,
  fractions = sampleFractions,
  seed = 42
)

// Exact: guarantees the per-key sample sizes, at the cost of extra passes over the data
val exactSample = userData.sampleByKeyExact(
  withReplacement = false,
  fractions = sampleFractions,
  seed = 42
)

Broadcast Hash Join Pattern

// When one side of a join is small, ship it to every executor as a broadcast
// variable and look values up locally instead of shuffling the large side
val smallLookupTable = Map(1 -> "Category A", 2 -> "Category B", 3 -> "Category C")
val broadcastLookup = sc.broadcast(smallLookupTable)

// Stand-in for a large (id, payload) RDD; id 9 has no entry in the lookup table
val largeRDD = sc.parallelize(Seq((1, "item-a"), (2, "item-b"), (9, "item-z")))

// Map-side join: ids missing from the lookup table fall back to "Unknown"
val enrichedData = largeRDD.map { case (id, data) =>
  val category = broadcastLookup.value.getOrElse(id, "Unknown")
  (id, data, category)
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-core-2-11

docs

broadcast-accumulators.md

context-configuration.md

index.md

java-api.md

key-value-operations.md

rdd-operations.md

status-monitoring.md

storage-persistence.md

task-context.md

tile.json