Core functionality for Apache Spark, providing RDDs, SparkContext, and the fundamental distributed computing engine for big data processing.
Resilient Distributed Datasets (RDDs) provide the core abstraction for distributed data processing in Spark, offering fault-tolerant distributed collections with transformations and actions.
Abstract base class representing an immutable, partitioned collection of elements that can be operated on in parallel.
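As a quick orientation before the full signature listing: transformations such as map only record lineage, and no computation happens until an action is invoked. A minimal sketch, assuming a SparkContext named sc is already available (one is constructed in the usage examples further down):
// Given an existing SparkContext `sc`
val linesRDD = sc.parallelize(Seq("spark", "rdd", "lazy evaluation"))
// Transformations only record lineage; nothing runs yet
val upperRDD = linesRDD.map(_.toUpperCase)
// The first action launches a job that evaluates the recorded lineage
println(upperRDD.count()) // 3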
/**
* Resilient Distributed Dataset (RDD) - basic abstraction in Spark
*/
abstract class RDD[T: ClassTag](
@transient private var _sc: SparkContext,
@transient private var deps: Seq[Dependency[_]]
) extends Serializable
// Core transformation methods (lazy evaluation)
def map[U: ClassTag](f: T => U): RDD[U]
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
def filter(f: T => Boolean): RDD[T]
def distinct(): RDD[T]
def distinct(numPartitions: Int): RDD[T]
def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]
// Set operations
def union(other: RDD[T]): RDD[T]
def ++(other: RDD[T]): RDD[T] // alias for union
def intersection(other: RDD[T]): RDD[T]
def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
def intersection(other: RDD[T], numPartitions: Int): RDD[T]
def subtract(other: RDD[T]): RDD[T]
def subtract(other: RDD[T], numPartitions: Int): RDD[T]
def subtract(other: RDD[T], p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]
// Grouping and sorting
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
def groupBy[K](f: T => K, numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null): RDD[(K, Iterable[T])]
def sortBy[K](f: T => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
// Advanced transformations
def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
def glom(): RDD[Array[T]]
def keyBy[K](f: T => K): RDD[(K, T)]
def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]]
// Zipping operations
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
def zipPartitions[B: ClassTag, V: ClassTag](rdd2: RDD[B], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V]
def zipPartitions[B: ClassTag, V: ClassTag](rdd2: RDD[B])(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V]
def zipWithIndex(): RDD[(T, Long)]
def zipWithUniqueId(): RDD[(T, Long)]
// Partitioning operations
def repartition(numPartitions: Int): RDD[T]
def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T]
def partitionBy(partitioner: Partitioner): RDD[(K, V)] // only on pair RDDs, via the implicit PairRDDFunctions conversion
// Core action methods (trigger execution)
def collect(): Array[T]
def count(): Long
def first(): T
def take(num: Int): Array[T]
def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T]
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
def top(num: Int)(implicit ord: Ordering[T]): Array[T]
def max()(implicit ord: Ordering[T]): T
def min()(implicit ord: Ordering[T]): T
def isEmpty(): Boolean
def toLocalIterator: Iterator[T]
// Counting actions
def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long]
def countApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble]
def countByValueApprox(timeout: Long, confidence: Double = 0.95)(implicit ord: Ordering[T] = null): PartialResult[Map[T, BoundedDouble]]
def countApproxDistinct(p: Int, sp: Int): Long
def countApproxDistinct(relativeSD: Double = 0.05): Long
// Reduction operations
def reduce(f: (T, T) => T): T
def fold(zeroValue: T)(op: (T, T) => T): T
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
def treeReduce(f: (T, T) => T, depth: Int = 2): T
def treeAggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U, depth: Int = 2): U
// Side-effect operations
def foreach(f: T => Unit): Unit
def foreachPartition(f: Iterator[T] => Unit): Unit
// Persistence methods
def persist(): RDD[T]
def persist(storageLevel: StorageLevel): RDD[T]
def cache(): RDD[T] // equivalent to persist(MEMORY_ONLY)
def unpersist(blocking: Boolean = true): RDD[T]
def getStorageLevel: StorageLevel
// Checkpointing methods
def checkpoint(): Unit
def localCheckpoint(): RDD[T]
def isCheckpointed: Boolean
def getCheckpointFile: Option[String]
// File output operations
def saveAsTextFile(path: String): Unit
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit
def saveAsObjectFile(path: String): Unit
// Piping operations
def pipe(command: String): RDD[String]
def pipe(command: String, env: Map[String, String]): RDD[String]
def pipe(command: Seq[String], env: Map[String, String] = Map(),
printPipeContext: (String => Unit) => Unit = null,
printRDDElement: (T, String => Unit) => Unit = null,
separateWorkingDir: Boolean = false): RDD[String]
// Metadata and utility methods
def setName(name: String): RDD[T]
val name: String
val id: Int
def partitions: Array[Partition]
def dependencies: Seq[Dependency[_]]
val partitioner: Option[Partitioner]
def getNumPartitions: Int
def sparkContext: SparkContext
def context: SparkContext // alias for sparkContext
def toJavaRDD(): JavaRDD[T]
def toDebugString: String
// Special collection methods
def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U]

Usage Examples:
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.storage.StorageLevel
val sc = new SparkContext(new SparkConf().setAppName("RDD Examples").setMaster("local[*]"))
// Create RDD from collection
val numbers = sc.parallelize(1 to 100)
// Transformations (lazy evaluation)
val evenNumbers = numbers.filter(_ % 2 == 0)
val squares = evenNumbers.map(x => x * x)
val distinct = squares.distinct()
// Actions (trigger execution)
val result = squares.collect()
val count = squares.count()
val sum = squares.reduce(_ + _)
val firstTen = squares.take(10)
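aggregate is worth a short illustration because its result type can differ from the element type; a minimal sketch computing the sum and count of the numbers RDD in one pass:
// aggregate folds into a different result type: here (sum, count) in a single pass
val (totalSum, totalCount) = numbers.aggregate((0, 0))(
  (acc, v) => (acc._1 + v, acc._2 + 1),  // seqOp: fold one element into the accumulator
  (a, b) => (a._1 + b._1, a._2 + b._2)   // combOp: merge accumulators from different partitions
)
val average = totalSum.toDouble / totalCount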
// Complex transformations
val words = sc.textFile("hdfs://path/to/file.txt")
val wordCounts = words
.flatMap(_.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _)
// Persistence for reuse
val cachedRDD = numbers.map(_ * 2).cache()
val result1 = cachedRDD.sum()
val result2 = cachedRDD.max() // Uses cached data
// Advanced operations
val grouped = numbers.groupBy(_ % 10)
val sorted = numbers.sortBy(identity, ascending = false)
val sampled = numbers.sample(withReplacement = false, 0.1)
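The examples above use cache(); here is a brief sketch of an explicit storage level plus checkpointing (StorageLevel is imported at the top of these examples; the checkpoint directory below is a hypothetical path):
// Spill partitions to disk when they do not fit in memory
val persisted = numbers.map(_ * 3).persist(StorageLevel.MEMORY_AND_DISK)
// Checkpointing writes partitions to reliable storage and truncates the lineage
sc.setCheckpointDir("hdfs://tmp/checkpoints") // hypothetical directory
persisted.checkpoint() // call before the first action on this RDD
persisted.count()      // the action materializes both the persisted data and the checkpoint
println(persisted.isCheckpointed)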
Additional operations available for RDDs containing numeric values.

/**
* Extra functions available on RDDs of Doubles through an implicit conversion
*/
class DoubleRDDFunctions(self: RDD[Double]) {
/** Compute the mean of this RDD's elements */
def mean(): Double
/** Compute the sum of this RDD's elements */
def sum(): Double
/** Find the minimum element in this RDD */
def min(): Double
/** Find the maximum element in this RDD */
def max(): Double
/** Compute the variance of this RDD's elements */
def variance(): Double
/** Compute the standard deviation of this RDD's elements */
def stdev(): Double
/** Compute the sample standard deviation of this RDD's elements */
def sampleStdev(): Double
/** Compute the sample variance of this RDD's elements */
def sampleVariance(): Double
/** Compute a histogram of the RDD using buckets evenly spaced between the minimum and maximum */
def histogram(buckets: Int): (Array[Double], Array[Long])
/** Compute a histogram using provided bucket boundaries */
def histogram(buckets: Array[Double]): Array[Long]
/** Approximate operation to return the mean within a timeout */
def meanApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble]
/** Approximate operation to return the sum within a timeout */
def sumApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble]
/** Return a StatCounter capturing the count, mean, sum, max, min, variance, and stdev of the RDD's elements in one pass */
def stats(): StatCounter
}

Usage Examples:
val doubleRDD = sc.parallelize(Array(1.0, 2.5, 3.7, 4.2, 5.8, 6.1))
// Statistical operations (implicit conversion)
val mean = doubleRDD.mean()
val stdDev = doubleRDD.stdev()
val variance = doubleRDD.variance()
val summary = doubleRDD.stats()
// Histogram
val (buckets, counts) = doubleRDD.histogram(5)
println(s"Histogram: ${buckets.zip(counts).mkString(", ")}")Asynchronous versions of RDD actions that return futures for non-blocking execution.
Asynchronous versions of RDD actions that return futures for non-blocking execution.

/**
* Extra functions for performing asynchronous operations on RDDs
*/
class AsyncRDDActions[T: ClassTag](self: RDD[T]) {
/** Returns a future for retrieving all elements of this RDD */
def collectAsync(): FutureAction[Array[T]]
/** Returns a future for counting the number of elements in the RDD */
def countAsync(): FutureAction[Long]
/** Returns a future for applying a function f to all elements of this RDD */
def foreachAsync(f: T => Unit): FutureAction[Unit]
/** Returns a future for applying a function to each partition of this RDD */
def foreachPartitionAsync(f: Iterator[T] => Unit): FutureAction[Unit]
/** Returns a future for retrieving the first num elements of the RDD */
def takeAsync(num: Int): FutureAction[Array[T]]
}
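Usage Examples:
A minimal sketch, assuming the implicit RDD-to-AsyncRDDActions conversion is in scope (automatic in recent Spark versions; otherwise import org.apache.spark.SparkContext._):
import scala.concurrent.Await
import scala.concurrent.duration._
val bigRDD = sc.parallelize(1 to 1000000)
// Submit two jobs without blocking the driver thread
val countFuture = bigRDD.countAsync()
val headFuture = bigRDD.takeAsync(10)
// FutureAction extends scala.concurrent.Future, so the usual combinators apply
val total = Await.result(countFuture, 60.seconds)
val head = Await.result(headFuture, 60.seconds)
// A running job can also be cancelled through its FutureAction handle:
// countFuture.cancel()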
Operations that work at the partition level for performance optimization.

// Partition-level transformation methods in RDD
def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
def foreachPartition(f: Iterator[T] => Unit): Unit
def glom(): RDD[Array[T]] // convert each partition into an array
def pipe(command: String): RDD[String]
def pipe(command: Seq[String]): RDD[String]

Usage Examples:
val data = sc.parallelize(1 to 100, 4) // 4 partitions
// Process each partition independently
val partitionSums = data.mapPartitions { iter =>
val sum = iter.sum
Iterator(sum)
}
// Access partition index
val partitionInfo = data.mapPartitionsWithIndex { (index, iter) =>
val count = iter.size
Iterator((index, count))
}
// Convert partitions to arrays
val partitionArrays = data.glom()
val arrays = partitionArrays.collect() // Array of arrays
// External command processing
val piped = data.pipe("grep '5'")
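A common reason to drop to the partition level is to amortize expensive per-record setup such as database connections or HTTP clients. A sketch using foreachPartition with a hypothetical Sink type standing in for such a resource:
// Hypothetical resource; stands in for a JDBC connection, HTTP client, etc.
class Sink { def write(v: Int): Unit = println(v); def close(): Unit = () }
data.foreachPartition { iter =>
  val sink = new Sink() // one expensive setup per partition, not per element
  try iter.foreach(sink.write)
  finally sink.close()
}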
Operations for combining RDDs element-wise.

// Zip operations in RDD
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
def zipWithIndex(): RDD[(T, Long)]
def zipWithUniqueId(): RDD[(T, Long)]
def zipPartitions[B: ClassTag, V: ClassTag](rdd2: RDD[B])(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V]
def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag](rdd2: RDD[B], rdd3: RDD[C])(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V]

Usage Examples:
val rdd1 = sc.parallelize(Array("a", "b", "c"))
val rdd2 = sc.parallelize(Array(1, 2, 3))
// Zip two RDDs
val zipped = rdd1.zip(rdd2) // RDD[("a", 1), ("b", 2), ("c", 3)]
// Zip with index
val withIndex = rdd1.zipWithIndex() // RDD[("a", 0), ("b", 1), ("c", 2)]
// Zip with unique ID
val withUniqueId = rdd1.zipWithUniqueId()
// Custom zip operation on partitions
val customZip = rdd1.zipPartitions(rdd2) { (iter1, iter2) =>
iter1.zip(iter2).map { case (str, num) => s"$str-$num" }
}

val input = sc.textFile("hdfs://input/data.txt")
val processed = input
.filter(_.nonEmpty) // Remove empty lines
.map(_.toLowerCase.trim) // Normalize
.flatMap(_.split("\\s+")) // Split into words
.filter(_.length > 3) // Filter short words
.map((_, 1)) // Create pairs
.reduceByKey(_ + _) // Count occurrences
.filter(_._2 > 10) // Filter rare words
.sortBy(_._2, ascending = false) // Sort by count
.cache() // Cache for reuse
// Multiple actions on cached RDD
val topWords = processed.take(100)
val totalUnique = processed.count()
processed.saveAsTextFile("hdfs://output/results")

import org.apache.spark.TaskContext
val dataRDD = sc.parallelize(1 to 1000)
// Add debugging information
val debugRDD = dataRDD.mapPartitionsWithIndex { (partitionId, iter) =>
val taskContext = TaskContext.get()
println(s"Processing partition $partitionId on ${taskContext.taskAttemptId()}")
iter.map { value =>
try {
// Some processing that might fail
if (value % 100 == 0) throw new RuntimeException(s"Error processing $value")
value * 2
} catch {
case e: Exception =>
println(s"Error in partition $partitionId: ${e.getMessage}")
-1 // Error marker
}
}
}
// Filter out error markers and get debug info
val cleanData = debugRDD.filter(_ != -1)
println(s"Debug info: ${dataRDD.toDebugString}")Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-core-2-10