Key-Value Operations

Specialized operations available on RDDs of key-value pairs, covering grouping, reduction, aggregation, joins, and partitioning.

Capabilities

PairRDDFunctions

Extra functions available on RDDs of (key, value) pairs through implicit conversion.

/**
 * Extra functions available on RDDs of (key, value) pairs through an implicit conversion
 */
class PairRDDFunctions[K, V](self: RDD[(K, V)]) {
  // Grouping operations
  def groupByKey(): RDD[(K, Iterable[V])]
  def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
  def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
  
  // Reduction operations
  def reduceByKey(func: (V, V) => V): RDD[(K, V)]
  def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
  def reduceByKeyLocally(func: (V, V) => V): Map[K, V]
  
  // Aggregation operations
  def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
  def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
  def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
  def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
  def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]
  def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]
  
  // Combine operations
  def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
  def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
  def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner): RDD[(K, C)]
  
  // Join operations
  def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
  def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
  def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
  def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
  def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))]
  def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))]
  def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
  def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))]
  def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], W))]
  def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]
  def fullOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], Option[W]))]
  def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], Option[W]))]
  
  // CoGroup operations
  def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
  def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]
  def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))]
  def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
  def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
  def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
  
  // Set operations
  def subtractByKey[W](other: RDD[(K, W)]): RDD[(K, V)]
  def subtractByKey[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)]
  def subtractByKey[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, V)]
  
  // Partitioning operations
  def partitionBy(partitioner: Partitioner): RDD[(K, V)]
  
  // Utility operations  
  def keys: RDD[K]
  def values: RDD[V]
  def mapValues[U](f: V => U): RDD[(K, U)]
  def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)]
  def collectAsMap(): Map[K, V]
  def countByKey(): Map[K, Long]
  def countByKeyApprox(timeout: Long, confidence: Double = 0.95): PartialResult[Map[K, BoundedDouble]]
  
  // Lookup operations
  def lookup(key: K): Seq[V]
}

Usage Examples:

import org.apache.spark.{SparkContext, SparkConf}

val sc = new SparkContext(new SparkConf().setAppName("KeyValue Examples").setMaster("local[*]"))

// Create key-value RDDs
val pairs1 = sc.parallelize(Array(("a", 1), ("b", 2), ("a", 3), ("c", 4)))
val pairs2 = sc.parallelize(Array(("a", "x"), ("b", "y"), ("d", "z")))

// Grouping operations
val grouped = pairs1.groupByKey()
// Result: ("a", [1, 3]), ("b", [2]), ("c", [4])

// Reduction operations
val sumByKey = pairs1.reduceByKey(_ + _)
// Result: ("a", 4), ("b", 2), ("c", 4)

val counts = sc.textFile("hdfs://input.txt")
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)

// Aggregation operations
val avgByKey = pairs1.aggregateByKey((0, 0))(
  (acc, value) => (acc._1 + value, acc._2 + 1),
  (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
).mapValues(acc => acc._1.toDouble / acc._2)

// Join operations
val innerJoin = pairs1.join(pairs2)
// Result: ("a", (1, "x")), ("a", (3, "x")), ("b", (2, "y"))

val leftJoin = pairs1.leftOuterJoin(pairs2)
// Result: ("a", (1, Some("x"))), ("a", (3, Some("x"))), ("b", (2, Some("y"))), ("c", (4, None))

val rightJoin = pairs1.rightOuterJoin(pairs2)
val fullJoin = pairs1.fullOuterJoin(pairs2)

// Utility operations
val keys = pairs1.keys // RDD["a", "b", "a", "c"]
val values = pairs1.values // RDD[1, 2, 3, 4]
val doubled = pairs1.mapValues(_ * 2) // ("a", 2), ("b", 4), ("a", 6), ("c", 8)

// Collection operations
val asMap = pairs1.collectAsMap() // Map("a" -> 3, "b" -> 2, "c" -> 4) - note: may lose duplicates
val keyCounts = pairs1.countByKey() // Map("a" -> 2, "b" -> 1, "c" -> 1)

// Lookup specific key
val aValues = pairs1.lookup("a") // Seq(1, 3)
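
For completeness, a brief sketch of the cogroup, set, fold, and flatMapValues operations listed in the signature above, reusing the same pairs1 and pairs2 RDDs:

// CoGroup: group the values from both RDDs by key
val cogrouped = pairs1.cogroup(pairs2)
// Result: ("a", ([1, 3], ["x"])), ("b", ([2], ["y"])), ("c", ([4], [])), ("d", ([], ["z"]))

// Set operations: keep only the keys of pairs1 that do not appear in pairs2
val subtracted = pairs1.subtractByKey(pairs2)
// Result: ("c", 4)

// foldByKey: like reduceByKey, but starting from a zero value
val folded = pairs1.foldByKey(0)(_ + _)
// Result: ("a", 4), ("b", 2), ("c", 4)

// flatMapValues: expand each value into zero or more values, keeping the key
val expanded = pairs1.flatMapValues(v => 1 to v)
// Result: ("a", 1), ("b", 1), ("b", 2), ("a", 1), ("a", 2), ("a", 3), ("c", 1), ..., ("c", 4)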

Sorting Operations (OrderedRDDFunctions)

Additional functions available on RDDs where the key is sortable.

/**
 * Extra functions available on RDDs where the key is sortable
 */
class OrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag, P <: Product2[K, V] : ClassTag](
  self: RDD[P]) {
  
  /**
   * Sort the RDD by key, with each partition containing a sorted range of elements
   */
  def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]
  
  /**
   * Repartition the RDD according to the given partitioner and sort records by their keys within each partition
   */
  def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)]
  
  /**
   * Return an RDD containing only the elements whose keys fall in the range [lower, upper]
   * (most efficient after sortByKey, when the RDD is range-partitioned)
   */
  def filterByRange(lower: K, upper: K): RDD[P]
}

Usage Examples:

val wordCounts = sc.textFile("hdfs://input.txt")
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)

// Sort by key (alphabetically)
val sortedByWord = wordCounts.sortByKey(ascending = true)

// Sort by value (frequency) - need to swap key-value
val sortedByCount = wordCounts
  .map(_.swap) // (count, word)
  .sortByKey(ascending = false) // sort by count descending
  .map(_.swap) // back to (word, count)

// Efficient repartition and sort
import org.apache.spark.HashPartitioner
val partitioned = wordCounts.repartitionAndSortWithinPartitions(new HashPartitioner(4))
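
filterByRange pairs naturally with sortByKey: on a range-partitioned RDD, partitions entirely outside the requested key range can be skipped rather than scanned. A brief sketch (the word boundaries here are illustrative):

// Keep only words between "apple" and "mango" (inclusive);
// the RangePartitioner created by sortByKey lets Spark prune whole partitions
val inRange = sortedByWord.filterByRange("apple", "mango")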

SequenceFile Operations

Functions for saving key-value RDDs to Hadoop-supported file systems, including SequenceFiles. saveAsHadoopFile and saveAsNewAPIHadoopFile are defined on PairRDDFunctions; saveAsSequenceFile is provided by SequenceFileRDDFunctions when the key and value types are convertible to Hadoop Writables.

/**
 * Hadoop output functions available on RDDs of (key, value) pairs
 */
class PairRDDFunctions[K, V](self: RDD[(K, V)]) {
  /**
   * Output the RDD to any Hadoop-supported file system, using a Hadoop JobConf object for configuration
   */
  def saveAsHadoopFile[F <: OutputFormat[K, V]](
    path: String,
    keyClass: Class[K],
    valueClass: Class[V],
    outputFormatClass: Class[F],
    conf: JobConf = new JobConf(self.context.hadoopConfiguration),
    codec: Option[Class[_ <: CompressionCodec]] = None): Unit
  
  /**
   * Output the RDD to any Hadoop-supported file system, using the new Hadoop API
   */
  def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](
    path: String,
    keyClass: Class[K],
    valueClass: Class[V],
    outputFormatClass: Class[F],
    conf: Configuration = self.context.hadoopConfiguration): Unit
}

/**
 * Extra functions for saving RDDs of (key, value) pairs whose types are convertible to Hadoop Writables
 */
class SequenceFileRDDFunctions[K, V](self: RDD[(K, V)]) {
  /**
   * Output the RDD as a Hadoop SequenceFile
   */
  def saveAsSequenceFile(
    path: String,
    codec: Option[Class[_ <: CompressionCodec]] = None): Unit
}
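
Usage Example (a minimal sketch; the HDFS paths below are placeholders):

// Save a pair RDD of Writable-convertible types as a SequenceFile
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2)))
pairs.saveAsSequenceFile("hdfs:///tmp/pairs-seq")

// Read it back with matching key/value types
val restored = sc.sequenceFile[String, Int]("hdfs:///tmp/pairs-seq")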

Advanced Key-Value Patterns

Complex Aggregations

case class Sale(product: String, amount: Double, quantity: Int)

val sales = sc.parallelize(Array(
  Sale("laptop", 999.99, 1),
  Sale("mouse", 29.99, 5),
  Sale("laptop", 1199.99, 2),
  Sale("mouse", 29.99, 3)
))

val salesByProduct = sales
  .map(sale => (sale.product, sale))
  .aggregateByKey((0.0, 0))(
    // Sequence operation: aggregate within partition
    (acc, sale) => (acc._1 + sale.amount * sale.quantity, acc._2 + sale.quantity),
    // Combine operation: combine across partitions
    (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
  )
  .mapValues { case (totalRevenue, totalQuantity) =>
    (totalRevenue, totalQuantity, if (totalQuantity > 0) totalRevenue / totalQuantity else 0.0)
  }

// Result: (product, (totalRevenue, totalQuantity, avgPricePerUnit))

Efficient Joins with Broadcasting

import org.apache.spark.broadcast.Broadcast

// Large dataset
val transactions = sc.textFile("hdfs://transactions.txt")
  .map(_.split(","))
  .map(fields => (fields(0), fields(1).toDouble)) // (userId, amount)

// Small lookup table
val userProfiles = sc.textFile("hdfs://users.txt")
  .map(_.split(","))
  .map(fields => (fields(0), fields(1))) // (userId, name)
  .collectAsMap()

// Broadcast the small dataset for efficient lookup
// (collectAsMap returns scala.collection.Map, so annotate the broadcast accordingly)
val broadcastProfiles: Broadcast[collection.Map[String, String]] = sc.broadcast(userProfiles)

// Use broadcast variable instead of join
val enrichedTransactions = transactions.map { case (userId, amount) =>
  val userName = broadcastProfiles.value.getOrElse(userId, "Unknown")
  (userId, userName, amount)
}
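
When the lookup table is no longer needed, the broadcast's cached copies on the executors can be released with Broadcast.unpersist(); a short follow-up to the example above:

// Remove the broadcast's cached copies from the executors once the actions
// that use enrichedTransactions have completed
broadcastProfiles.unpersist()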

Custom Partitioning for Performance

import org.apache.spark.Partitioner

// Custom partitioner for geographic data
class RegionPartitioner(partitions: Int) extends Partitioner {
  override def numPartitions: Int = partitions
  
  override def getPartition(key: Any): Int = {
    val region = key.asInstanceOf[String]
    region match {
      case r if r.startsWith("US") => 0
      case r if r.startsWith("EU") => 1
      case r if r.startsWith("ASIA") => 2
      case _ => 3
    }
  }
  
  // equals/hashCode let Spark treat two RegionPartitioner instances with the same
  // partition count as the same partitioning, so co-partitioned joins avoid a shuffle
  override def equals(other: Any): Boolean = other match {
    case p: RegionPartitioner => p.numPartitions == numPartitions
    case _ => false
  }
  
  override def hashCode: Int = numPartitions
}

val locationData = sc.parallelize(Array(
  ("US-CA", "data1"), ("EU-DE", "data2"), ("ASIA-JP", "data3")
))

// Apply custom partitioning
val partitioned = locationData.partitionBy(new RegionPartitioner(4))

// Subsequent joins will be more efficient
val moreLocationData = sc.parallelize(Array(
  ("US-CA", "moredata1"), ("EU-DE", "moredata2")
)).partitionBy(new RegionPartitioner(4))

val joined = partitioned.join(moreLocationData) // No shuffle needed: both sides share an equal RegionPartitioner

Window Operations with groupByKey

case class Event(timestamp: Long, userId: String, action: String)

val events = sc.parallelize(Array(
  Event(1000, "user1", "login"),
  Event(1010, "user1", "click"),
  Event(1020, "user1", "logout"),
  Event(1005, "user2", "login"),
  Event(1015, "user2", "click")
))

// Group events by user and process in time windows
val userSessions = events
  .map(event => (event.userId, event))
  .groupByKey()
  .mapValues { userEvents =>
    val sortedEvents = userEvents.toList.sortBy(_.timestamp)
    
    // Define session boundaries (30 second timeout)
    val sessions = sortedEvents.foldLeft(List.empty[List[Event]]) { (sessions, event) =>
      sessions match {
        case Nil => List(List(event))
        case currentSession :: otherSessions =>
          if (event.timestamp - currentSession.last.timestamp <= 30000) {
            // Add to current session
            (currentSession :+ event) :: otherSessions
          } else {
            // Start new session
            List(event) :: sessions
          }
      }
    }
    
    sessions.reverse.map(_.length) // Session lengths
  }

Performance Considerations

Avoiding groupByKey

// INEFFICIENT: groupByKey followed by a reduction
val words = sc.textFile("hdfs://input.txt").flatMap(_.split(" "))

val wordCounts1 = words
  .map((_, 1))
  .groupByKey() // Shuffles all data
  .mapValues(_.sum) // Reduces after shuffle

// EFFICIENT: Use reduceByKey instead
val wordCounts2 = words
  .map((_, 1))
  .reduceByKey(_ + _) // Reduces before shuffle

Choosing the Right Operation

val data = sc.parallelize(Array(("a", 1), ("b", 2), ("a", 3)))

// Use combineByKey for complex aggregations
val stats = data.combineByKey(
  (v: Int) => (v, 1), // Create combiner
  (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1), // Merge value
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) // Merge combiners
).mapValues(acc => (acc._1, acc._2, acc._1.toDouble / acc._2)) // (sum, count, avg)

// Use aggregateByKey when you need different types
val letterStats = data.aggregateByKey("")(
  (acc, v) => if (acc.isEmpty) v.toString else acc + "," + v,
  (acc1, acc2) => acc1 + ";" + acc2
)