RDD Operations

Resilient Distributed Datasets (RDDs) are Spark's core abstraction for distributed data processing: immutable, fault-tolerant, partitioned collections that are manipulated through lazy transformations and eager actions.

Capabilities

RDD Base Class

Abstract base class representing an immutable, partitioned collection of elements that can be operated on in parallel.

/**
 * Resilient Distributed Dataset (RDD) - basic abstraction in Spark
 */
abstract class RDD[T: ClassTag](
  @transient private var _sc: SparkContext,
  @transient private var deps: Seq[Dependency[_]]
) extends Serializable

// Core transformation methods (lazy evaluation)
def map[U: ClassTag](f: T => U): RDD[U]
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
def filter(f: T => Boolean): RDD[T]
def distinct(): RDD[T]
def distinct(numPartitions: Int): RDD[T]
def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]

// Set operations
def union(other: RDD[T]): RDD[T]
def ++(other: RDD[T]): RDD[T] // alias for union
def intersection(other: RDD[T]): RDD[T]
def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
def intersection(other: RDD[T], numPartitions: Int): RDD[T]
def subtract(other: RDD[T]): RDD[T]
def subtract(other: RDD[T], numPartitions: Int): RDD[T]
def subtract(other: RDD[T], p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]

// Grouping and sorting  
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
def groupBy[K](f: T => K, numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null): RDD[(K, Iterable[T])]
def sortBy[K](f: T => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

// Advanced transformations
def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
def glom(): RDD[Array[T]]
def keyBy[K](f: T => K): RDD[(K, T)]
def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]]

// Zipping operations
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
def zipPartitions[B: ClassTag, V: ClassTag](rdd2: RDD[B], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V]
def zipPartitions[B: ClassTag, V: ClassTag](rdd2: RDD[B])(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V]
def zipWithIndex(): RDD[(T, Long)]
def zipWithUniqueId(): RDD[(T, Long)]

// Partitioning operations
def repartition(numPartitions: Int): RDD[T]
def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T]
def partitionBy(partitioner: Partitioner): RDD[(K, V)] // pair RDDs only (defined on PairRDDFunctions)

// Core action methods (trigger execution)
def collect(): Array[T]
def count(): Long
def first(): T
def take(num: Int): Array[T]
def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T]
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
def top(num: Int)(implicit ord: Ordering[T]): Array[T]
def max()(implicit ord: Ordering[T]): T
def min()(implicit ord: Ordering[T]): T
def isEmpty(): Boolean
def toLocalIterator: Iterator[T]

// Counting actions
def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long]
def countApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble]
def countByValueApprox(timeout: Long, confidence: Double = 0.95)(implicit ord: Ordering[T] = null): PartialResult[Map[T, BoundedDouble]]
def countApproxDistinct(p: Int, sp: Int): Long
def countApproxDistinct(relativeSD: Double = 0.05): Long

// Reduction operations
def reduce(f: (T, T) => T): T
def fold(zeroValue: T)(op: (T, T) => T): T
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
def treeReduce(f: (T, T) => T, depth: Int = 2): T
def treeAggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U, depth: Int = 2): U

// Side-effect operations
def foreach(f: T => Unit): Unit
def foreachPartition(f: Iterator[T] => Unit): Unit

// Persistence methods
def persist(): RDD[T]
def persist(storageLevel: StorageLevel): RDD[T]
def cache(): RDD[T] // equivalent to persist(MEMORY_ONLY)
def unpersist(blocking: Boolean = true): RDD[T]
def getStorageLevel: StorageLevel

// Checkpointing methods
def checkpoint(): Unit
def localCheckpoint(): RDD[T]
def isCheckpointed: Boolean
def getCheckpointFile: Option[String]

// File output operations  
def saveAsTextFile(path: String): Unit
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit
def saveAsObjectFile(path: String): Unit

// Piping operations
def pipe(command: String): RDD[String]
def pipe(command: String, env: Map[String, String]): RDD[String]
def pipe(command: Seq[String], env: Map[String, String] = Map(), 
         printPipeContext: (String => Unit) => Unit = null, 
         printRDDElement: (T, String => Unit) => Unit = null, 
         separateWorkingDir: Boolean = false): RDD[String]

// Metadata and utility methods
def setName(name: String): RDD[T]
val name: String
val id: Int
def partitions: Array[Partition]
def dependencies: Seq[Dependency[_]]
val partitioner: Option[Partitioner]  
def getNumPartitions: Int
def sparkContext: SparkContext
def context: SparkContext  // alias for sparkContext
def toJavaRDD(): JavaRDD[T]
def toDebugString: String

// Special collection methods
def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U]

Usage Examples:

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.storage.StorageLevel

val sc = new SparkContext(new SparkConf().setAppName("RDD Examples").setMaster("local[*]"))

// Create RDD from collection
val numbers = sc.parallelize(1 to 100)

// Transformations (lazy evaluation)
val evenNumbers = numbers.filter(_ % 2 == 0)
val squares = evenNumbers.map(x => x * x)
val distinct = squares.distinct()

// Actions (trigger execution)
val result = squares.collect()
val count = squares.count()
val sum = squares.reduce(_ + _)
val firstTen = squares.take(10)

// Complex transformations
val words = sc.textFile("hdfs://path/to/file.txt")
val wordCounts = words
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

// Persistence for reuse
val cachedRDD = numbers.map(_ * 2).cache()
val result1 = cachedRDD.sum()
val result2 = cachedRDD.max() // Uses cached data

// Advanced operations
val grouped = numbers.groupBy(_ % 10)
val sorted = numbers.sortBy(identity, ascending = false)
val sampled = numbers.sample(withReplacement = false, 0.1)
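
// Checkpointing and aggregation (a minimal sketch; the checkpoint directory is
// illustrative -- use any writable path on your cluster)
sc.setCheckpointDir("/tmp/spark-checkpoints")
val pruned = numbers.map(_ + 1).filter(_ % 3 == 0)
pruned.checkpoint()
pruned.count() // first action materializes the checkpoint and truncates the lineage

// aggregate: compute sum and element count in one pass, then derive the mean
val (total, n) = numbers.aggregate((0, 0))(
  (acc, v) => (acc._1 + v, acc._2 + 1),
  (a, b) => (a._1 + b._1, a._2 + b._2)
)
val average = total.toDouble / n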

Numeric RDD Operations (DoubleRDDFunctions)

Additional statistical operations available on RDDs of numeric values through an implicit conversion to DoubleRDDFunctions.

/**
 * Extra functions available on RDDs of Doubles through an implicit conversion
 */
class DoubleRDDFunctions(self: RDD[Double]) {
  /** Compute the mean of this RDD's elements */
  def mean(): Double
  
  /** Compute the sum of this RDD's elements */
  def sum(): Double
  
  /** Find the minimum element in this RDD */
  def min(): Double
  
  /** Find the maximum element in this RDD */
  def max(): Double
  
  /** Compute the variance of this RDD's elements */
  def variance(): Double
  
  /** Compute the standard deviation of this RDD's elements */
  def stdev(): Double
  
  /** Compute the sample standard deviation of this RDD's elements */
  def sampleStdev(): Double
  
  /** Compute the sample variance of this RDD's elements */
  def sampleVariance(): Double
  
  /** Compute a histogram of the RDD using buckets evenly spaced between the minimum and maximum */
  def histogram(buckets: Int): (Array[Double], Array[Long])
  
  /** Compute a histogram using provided bucket boundaries */
  def histogram(buckets: Array[Double]): Array[Long]
  
  /** Approximate mean within a timeout, returned as a PartialResult */
  def meanApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble]
  
  /** Approximate sum within a timeout, returned as a PartialResult */
  def sumApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble]
  
  /** Return a StatCounter with count, mean, variance, stdev, max, and min computed in one pass */
  def stats(): StatCounter
}

Usage Examples:

val doubleRDD = sc.parallelize(Array(1.0, 2.5, 3.7, 4.2, 5.8, 6.1))

// Statistical operations (implicit conversion)
val mean = doubleRDD.mean()
val stdDev = doubleRDD.stdev()
val variance = doubleRDD.variance()
val summary = doubleRDD.stats()

// Histogram
val (buckets, counts) = doubleRDD.histogram(5)
println(s"Histogram: ${buckets.zip(counts).mkString(", ")}")

Asynchronous RDD Actions

Asynchronous versions of RDD actions that return futures for non-blocking execution.

/**
 * Extra functions for performing asynchronous operations on RDDs
 */
class AsyncRDDActions[T: ClassTag](self: RDD[T]) {
  /** Returns a future for retrieving all elements of this RDD */
  def collectAsync(): FutureAction[Array[T]]
  
  /** Returns a future for counting the number of elements in the RDD */
  def countAsync(): FutureAction[Long]
  
  /** Returns a future for applying a function to all elements of this RDD */
  def foreachAsync(f: T => Unit): FutureAction[Unit]
  
  /** Returns a future for applying a function to each partition of this RDD */
  def foreachPartitionAsync(f: Iterator[T] => Unit): FutureAction[Unit]
  
  /** Returns a future for retrieving the first num elements of the RDD */
  def takeAsync(num: Int): FutureAction[Array[T]]
}
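
Usage Examples:

A minimal sketch of non-blocking actions; FutureAction extends scala.concurrent.Future, so the usual Future combinators apply (the timeout below is illustrative).

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Success, Failure}

val largeRDD = sc.parallelize(1 to 1000000)

// Kick off two jobs without blocking the driver thread
val countFuture = largeRDD.countAsync()
val takeFuture = largeRDD.takeAsync(10)

countFuture.onComplete {
  case Success(n) => println(s"Count finished: $n")
  case Failure(e) => println(s"Count failed: ${e.getMessage}")
}

// Block only when the result is actually needed
val firstTen = Await.result(takeFuture, 10.minutes)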

Partition-Level Operations

Operations that process an entire partition at a time, which amortizes per-element overhead such as repeated connection or parser setup.

// Partition-level transformation methods in RDD
def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
def foreachPartition(f: Iterator[T] => Unit): Unit
def glom(): RDD[Array[T]] // convert each partition into an array
def pipe(command: String): RDD[String]
def pipe(command: Seq[String]): RDD[String]

Usage Examples:

val data = sc.parallelize(1 to 100, 4) // 4 partitions

// Process each partition independently
val partitionSums = data.mapPartitions { iter =>
  val sum = iter.sum
  Iterator(sum)
}

// Access partition index
val partitionInfo = data.mapPartitionsWithIndex { (index, iter) =>
  val count = iter.size
  Iterator((index, count))
}

// Convert partitions to arrays
val partitionArrays = data.glom()
val arrays = partitionArrays.collect() // Array of arrays

// External command processing
val piped = data.pipe("grep 5") // pipe tokenizes on whitespace, so keep the command simple

Zip Operations

Operations for combining RDDs element-wise.

// Zip operations in RDD
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
def zipWithIndex(): RDD[(T, Long)]
def zipWithUniqueId(): RDD[(T, Long)]
def zipPartitions[B: ClassTag, V: ClassTag](rdd2: RDD[B])(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V]
def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag](rdd2: RDD[B], rdd3: RDD[C])(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V]

Usage Examples:

val rdd1 = sc.parallelize(Array("a", "b", "c"))
val rdd2 = sc.parallelize(Array(1, 2, 3))

// Zip two RDDs
val zipped = rdd1.zip(rdd2) // RDD[("a", 1), ("b", 2), ("c", 3)]

// Zip with index
val withIndex = rdd1.zipWithIndex() // RDD[("a", 0), ("b", 1), ("c", 2)]

// Zip with unique ID
val withUniqueId = rdd1.zipWithUniqueId()

// Custom zip operation on partitions
val customZip = rdd1.zipPartitions(rdd2) { (iter1, iter2) =>
  iter1.zip(iter2).map { case (str, num) => s"$str-$num" }
}

Common RDD Patterns

Efficient Data Processing Pipeline

val input = sc.textFile("hdfs://input/data.txt")

val processed = input
  .filter(_.nonEmpty) // Remove empty lines
  .map(_.toLowerCase.trim) // Normalize
  .flatMap(_.split("\\s+")) // Split into words
  .filter(_.length > 3) // Filter short words
  .map((_, 1)) // Create pairs
  .reduceByKey(_ + _) // Count occurrences
  .filter(_._2 > 10) // Filter rare words
  .sortBy(_._2, ascending = false) // Sort by count
  .cache() // Cache for reuse

// Multiple actions on cached RDD
val topWords = processed.take(100)
val totalUnique = processed.count()
processed.saveAsTextFile("hdfs://output/results")

Error Handling and Debugging

import org.apache.spark.TaskContext

val dataRDD = sc.parallelize(1 to 1000)

// Add debugging information
val debugRDD = dataRDD.mapPartitionsWithIndex { (partitionId, iter) =>
  val taskContext = TaskContext.get()
  println(s"Processing partition $partitionId on ${taskContext.taskAttemptId()}")
  
  iter.map { value =>
    try {
      // Some processing that might fail
      if (value % 100 == 0) throw new RuntimeException(s"Error processing $value")
      value * 2
    } catch {
      case e: Exception =>
        println(s"Error in partition $partitionId: ${e.getMessage}")
        -1 // Error marker
    }
  }
}

// Filter out error markers and inspect the lineage
val cleanData = debugRDD.filter(_ != -1)
println(s"Lineage: ${cleanData.toDebugString}")