Resilient Distributed Datasets (RDDs) provide Spark's core abstraction for distributed data processing: fault-tolerant, partitioned collections that are built up through lazy transformations and computed when an action is invoked.
Abstract base class representing an immutable, partitioned collection of elements that can be operated on in parallel.
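A minimal sketch of the model (assuming a SparkContext named sc, as created in the usage examples below): transformations only record lineage, and the job runs when an action is called.
val doubled = sc.parallelize(1 to 10).map(_ * 2) // transformation: lazy, nothing executes yet
val total = doubled.reduce(_ + _)                // action: runs the job and returns 110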
/**
* Resilient Distributed Dataset (RDD) - basic abstraction in Spark
*/
abstract class RDD[T: ClassTag](
@transient private var _sc: SparkContext,
@transient private var deps: Seq[Dependency[_]]
) extends Serializable
// Core transformation methods (lazy evaluation)
def map[U: ClassTag](f: T => U): RDD[U]
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
def filter(f: T => Boolean): RDD[T]
def distinct(): RDD[T]
def distinct(numPartitions: Int): RDD[T]
def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]
// Set operations
def union(other: RDD[T]): RDD[T]
def ++(other: RDD[T]): RDD[T] // alias for union
def intersection(other: RDD[T]): RDD[T]
def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
def intersection(other: RDD[T], numPartitions: Int): RDD[T]
def subtract(other: RDD[T]): RDD[T]
def subtract(other: RDD[T], numPartitions: Int): RDD[T]
def subtract(other: RDD[T], p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]
// Grouping and sorting
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
def groupBy[K](f: T => K, numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null): RDD[(K, Iterable[T])]
def sortBy[K](f: T => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
// Advanced transformations
def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
def glom(): RDD[Array[T]]
def keyBy[K](f: T => K): RDD[(K, T)]
def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]]
// Zipping operations
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
def zipPartitions[B: ClassTag, V: ClassTag](rdd2: RDD[B], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V]
def zipPartitions[B: ClassTag, V: ClassTag](rdd2: RDD[B])(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V]
def zipWithIndex(): RDD[(T, Long)]
def zipWithUniqueId(): RDD[(T, Long)]
// Partitioning operations
def repartition(numPartitions: Int): RDD[T]
def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T]
def partitionBy(partitioner: Partitioner): RDD[(K, V)] // defined on PairRDDFunctions; available only on RDDs of key-value pairs
// Core action methods (trigger execution)
def collect(): Array[T]
def count(): Long
def first(): T
def take(num: Int): Array[T]
def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T]
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
def top(num: Int)(implicit ord: Ordering[T]): Array[T]
def max()(implicit ord: Ordering[T]): T
def min()(implicit ord: Ordering[T]): T
def isEmpty(): Boolean
def toLocalIterator: Iterator[T]
// Counting actions
def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long]
def countApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble]
def countByValueApprox(timeout: Long, confidence: Double = 0.95)(implicit ord: Ordering[T] = null): PartialResult[Map[T, BoundedDouble]]
def countApproxDistinct(p: Int, sp: Int): Long
def countApproxDistinct(relativeSD: Double = 0.05): Long
// Reduction operations
def reduce(f: (T, T) => T): T
def fold(zeroValue: T)(op: (T, T) => T): T
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
def treeReduce(f: (T, T) => T, depth: Int = 2): T
def treeAggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U, depth: Int = 2): U
// Side-effect operations
def foreach(f: T => Unit): Unit
def foreachPartition(f: Iterator[T] => Unit): Unit
// Persistence methods
def persist(): RDD[T]
def persist(storageLevel: StorageLevel): RDD[T]
def cache(): RDD[T] // equivalent to persist(MEMORY_ONLY)
def unpersist(blocking: Boolean = true): RDD[T]
def getStorageLevel: StorageLevel
// Checkpointing methods
def checkpoint(): Unit
def localCheckpoint(): RDD[T]
def isCheckpointed: Boolean
def getCheckpointFile: Option[String]
// File output operations
def saveAsTextFile(path: String): Unit
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit
def saveAsObjectFile(path: String): Unit
// Piping operations
def pipe(command: String): RDD[String]
def pipe(command: String, env: Map[String, String]): RDD[String]
def pipe(command: Seq[String], env: Map[String, String] = Map(),
printPipeContext: (String => Unit) => Unit = null,
printRDDElement: (T, String => Unit) => Unit = null,
separateWorkingDir: Boolean = false): RDD[String]
// Metadata and utility methods
def setName(name: String): RDD[T]
val name: String
val id: Int
def partitions: Array[Partition]
def dependencies: Seq[Dependency[_]]
val partitioner: Option[Partitioner]
def getNumPartitions: Int
def sparkContext: SparkContext
def context: SparkContext // alias for sparkContext
def toJavaRDD(): JavaRDD[T]
def toDebugString: String
// Special collection methods
def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U]
Usage Examples:
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.storage.StorageLevel
val sc = new SparkContext(new SparkConf().setAppName("RDD Examples").setMaster("local[*]"))
// Create RDD from collection
val numbers = sc.parallelize(1 to 100)
// Transformations (lazy evaluation)
val evenNumbers = numbers.filter(_ % 2 == 0)
val squares = evenNumbers.map(x => x * x)
val distinct = squares.distinct()
// Actions (trigger execution)
val result = squares.collect()
val count = squares.count()
val sum = squares.reduce(_ + _)
val firstTen = squares.take(10)
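// A sketch of the other reduction and counting actions listed above, reusing the squares RDD
val foldedSum = squares.fold(0)(_ + _)                 // like reduce, but with a zero value
val (sumAgg, cnt) = squares.aggregate((0, 0))(         // aggregate into a different result type: (sum, count)
  (acc, x) => (acc._1 + x, acc._2 + 1),
  (a, b) => (a._1 + b._1, a._2 + b._2))
val topFive = squares.top(5)                           // largest five by natural ordering
val approxDistinct = squares.countApproxDistinct(0.05) // approximate distinct count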
// Complex transformations
val words = sc.textFile("hdfs://path/to/file.txt")
val wordCounts = words
.flatMap(_.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _)
// Persistence for reuse
val cachedRDD = numbers.map(_ * 2).cache()
val result1 = cachedRDD.sum()
val result2 = cachedRDD.max() // Uses cached data
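// Explicit storage levels, RDD naming, and checkpointing (a sketch; the checkpoint directory is an illustrative path)
val persisted = numbers.map(_ * 3).setName("tripled").persist(StorageLevel.MEMORY_AND_DISK)
sc.setCheckpointDir("/tmp/spark-checkpoints")
persisted.checkpoint()   // must be called before the first action on this RDD
persisted.count()        // materializes the cache and writes the checkpoint
persisted.unpersist()    // release cached blocks when no longer needed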
// Advanced operations
val grouped = numbers.groupBy(_ % 10)
val sorted = numbers.sortBy(identity, ascending = false)
val sampled = numbers.sample(withReplacement = false, 0.1)
Additional operations are available for RDDs of numeric values through an implicit conversion to DoubleRDDFunctions.
/**
* Extra functions available on RDDs of Doubles through an implicit conversion
*/
class DoubleRDDFunctions(self: RDD[Double]) {
/** Compute the mean of this RDD's elements */
def mean(): Double
/** Compute the sum of this RDD's elements */
def sum(): Double
/** Find the minimum element in this RDD */
def min(): Double
/** Find the maximum element in this RDD */
def max(): Double
/** Compute the variance of this RDD's elements */
def variance(): Double
/** Compute the standard deviation of this RDD's elements */
def stdev(): Double
/** Compute the sample standard deviation of this RDD's elements */
def sampleStdev(): Double
/** Compute the sample variance of this RDD's elements */
def sampleVariance(): Double
/** Compute a histogram of the RDD using buckets evenly spaced between the minimum and maximum */
def histogram(buckets: Int): (Array[Double], Array[Long])
/** Compute a histogram using provided bucket boundaries */
def histogram(buckets: Array[Double]): Array[Long]
/** Approximate version of mean(), returning a partial result within a timeout */
def meanApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble]
/** Approximate version of sum(), returning a partial result within a timeout */
def sumApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble]
/** Return a StatCounter with count, mean, variance, stdev, max, and min computed in one pass */
def stats(): StatCounter
}
Usage Examples:
val doubleRDD = sc.parallelize(Array(1.0, 2.5, 3.7, 4.2, 5.8, 6.1))
// Statistical operations (implicit conversion)
val mean = doubleRDD.mean()
val stdDev = doubleRDD.stdev()
val variance = doubleRDD.variance()
val summary = doubleRDD.stats()
// Histogram
val (buckets, counts) = doubleRDD.histogram(5)
println(s"Histogram: ${buckets.zip(counts).mkString(", ")}")Asynchronous versions of RDD actions that return futures for non-blocking execution.
Asynchronous versions of RDD actions that return futures for non-blocking execution.
/**
* Extra functions for performing asynchronous operations on RDDs
*/
class AsyncRDDActions[T: ClassTag](self: RDD[T]) {
/** Returns a future for retrieving all elements of this RDD */
def collectAsync(): FutureAction[Array[T]]
/** Returns a future for counting the number of elements in the RDD */
def countAsync(): FutureAction[Long]
/** Returns a future for applying a function f to all elements of this RDD */
def foreachAsync(f: T => Unit): FutureAction[Unit]
/** Returns a future for applying a function to each partition of this RDD */
def foreachPartitionAsync(f: Iterator[T] => Unit): FutureAction[Unit]
/** Returns a future for retrieving the first num elements of the RDD */
def takeAsync(num: Int): FutureAction[Array[T]]
}
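Usage example (a sketch): FutureAction extends Scala's Future, so a result can be awaited with get() or handled with standard Future callbacks; the async methods are available on any RDD via an implicit conversion.
val bigRDD = sc.parallelize(1 to 1000000)
val futureCount = bigRDD.countAsync()    // submits the job and returns immediately
val futureTake = bigRDD.takeAsync(10)    // a second job can run concurrently
println(s"Count: ${futureCount.get()}")  // FutureAction.get() blocks until the job completes
println(s"First 10: ${futureTake.get().mkString(", ")}")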
Operations that work at the partition level for performance optimization.
// Partition-level transformation methods in RDD
def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
def foreachPartition(f: Iterator[T] => Unit): Unit
def glom(): RDD[Array[T]] // convert each partition into an array
def pipe(command: String): RDD[String]
def pipe(command: Seq[String]): RDD[String]
Usage Examples:
val data = sc.parallelize(1 to 100, 4) // 4 partitions
// Process each partition independently
val partitionSums = data.mapPartitions { iter =>
val sum = iter.sum
Iterator(sum)
}
// Access partition index
val partitionInfo = data.mapPartitionsWithIndex { (index, iter) =>
val count = iter.size
Iterator((index, count))
}
// Convert partitions to arrays
val partitionArrays = data.glom()
val arrays = partitionArrays.collect() // Array of arrays
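// foreachPartition: run side effects once per partition (e.g. open one connection per partition).
// A minimal sketch that just reports how many elements each partition holds
data.foreachPartition { iter =>
  val batch = iter.toArray // materialize this partition's elements
  println(s"Writing batch of ${batch.length} elements")
}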
// External command processing
val piped = data.pipe("grep '5'")Operations for combining RDDs element-wise.
Operations for combining RDDs element-wise.
// Zip operations in RDD
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
def zipWithIndex(): RDD[(T, Long)]
def zipWithUniqueId(): RDD[(T, Long)]
def zipPartitions[B: ClassTag, V: ClassTag](rdd2: RDD[B])(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V]
def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag](rdd2: RDD[B], rdd3: RDD[C])(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V]
Usage Examples:
val rdd1 = sc.parallelize(Array("a", "b", "c"))
val rdd2 = sc.parallelize(Array(1, 2, 3))
// Zip two RDDs
val zipped = rdd1.zip(rdd2) // RDD[("a", 1), ("b", 2), ("c", 3)]
// Zip with index
val withIndex = rdd1.zipWithIndex() // RDD[("a", 0), ("b", 1), ("c", 2)]
// Zip with unique ID
val withUniqueId = rdd1.zipWithUniqueId() // IDs are unique but not consecutive: items in partition k get k, n+k, 2n+k, ... where n = number of partitions
// Custom zip operation on partitions
val customZip = rdd1.zipPartitions(rdd2) { (iter1, iter2) =>
iter1.zip(iter2).map { case (str, num) => s"$str-$num" }
}
A typical end-to-end pipeline chains transformations, caches the result, and runs several actions on it:
val input = sc.textFile("hdfs://input/data.txt")
val processed = input
.filter(_.nonEmpty) // Remove empty lines
.map(_.toLowerCase.trim) // Normalize
.flatMap(_.split("\\s+")) // Split into words
.filter(_.length > 3) // Filter short words
.map((_, 1)) // Create pairs
.reduceByKey(_ + _) // Count occurrences
.filter(_._2 > 10) // Filter rare words
.sortBy(_._2, ascending = false) // Sort by count
.cache() // Cache for reuse
// Multiple actions on cached RDD
val topWords = processed.take(100)
val totalUnique = processed.count()
processed.saveAsTextFile("hdfs://output/results")
Partition indices, TaskContext, and toDebugString can be combined to instrument and debug RDD computations:
import org.apache.spark.TaskContext
val dataRDD = sc.parallelize(1 to 1000)
// Add debugging information
val debugRDD = dataRDD.mapPartitionsWithIndex { (partitionId, iter) =>
val taskContext = TaskContext.get()
println(s"Processing partition $partitionId on ${taskContext.taskAttemptId()}")
iter.map { value =>
try {
// Some processing that might fail
if (value % 100 == 0) throw new RuntimeException(s"Error processing $value")
value * 2
} catch {
case e: Exception =>
println(s"Error in partition $partitionId: ${e.getMessage}")
-1 // Error marker
}
}
}
// Filter out error markers and inspect the lineage
val cleanData = debugRDD.filter(_ != -1)
println(s"Lineage:\n${cleanData.toDebugString}")