Lightning-fast unified analytics engine for large-scale data processing with high-level APIs in Scala, Java, Python, and R
---
The RDD (Resilient Distributed Dataset) is the fundamental abstraction in Apache Spark. It represents an immutable, partitioned collection of elements that can be operated on in parallel with fault-tolerance built-in.
```scala
abstract class RDD[T] extends Serializable {
  // Core properties
  def partitions: Array[Partition]
  def compute(split: Partition, context: TaskContext): Iterator[T]
  def dependencies: Seq[Dependency[_]]
  def partitioner: Option[Partitioner] = None
  def preferredLocations(split: Partition): Seq[String] = Nil
}
```

Transformations are lazy operations that create a new RDD from an existing one. They are not executed immediately but build a directed acyclic graph (DAG) of computations.
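Laziness can be observed directly: defining transformations schedules no work, and a job runs only when an action is invoked. A minimal sketch, assuming an active `SparkContext` named `sc`:

```scala
val rdd = sc.parallelize(1 to 10)
val doubled = rdd.map(_ * 2)       // no job yet: just extends the DAG
val large = doubled.filter(_ > 10) // still no job
val n = large.count()              // action: the whole DAG executes now
```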
map: Apply a function to each element

```scala
def map[U: ClassTag](f: T => U): RDD[U]
```

```scala
val numbers = sc.parallelize(Array(1, 2, 3, 4, 5))
val squared = numbers.map(x => x * x)
// Result: RDD containing [1, 4, 9, 16, 25]
```

flatMap: Apply a function and flatten the results
```scala
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
```

```scala
val lines = sc.parallelize(Array("hello world", "spark rdd"))
val words = lines.flatMap(line => line.split(" "))
// Result: RDD containing ["hello", "world", "spark", "rdd"]
```

filter: Keep elements that satisfy a predicate
```scala
def filter(f: T => Boolean): RDD[T]
```

```scala
val numbers = sc.parallelize(Array(1, 2, 3, 4, 5, 6))
val evens = numbers.filter(_ % 2 == 0)
// Result: RDD containing [2, 4, 6]
```

distinct: Remove duplicate elements
```scala
def distinct(): RDD[T]
def distinct(numPartitions: Int): RDD[T]
```

```scala
val data = sc.parallelize(Array(1, 2, 2, 3, 3, 3))
val unique = data.distinct()
// Result: RDD containing [1, 2, 3]
```

sample: Return a sampled subset
```scala
def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]
```

```scala
val data = sc.parallelize(1 to 100)
val sampled = data.sample(withReplacement = false, fraction = 0.1)
// Result: RDD with approximately 10% of original elements
```

randomSplit: Split RDD randomly into multiple RDDs
```scala
def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]]
```

```scala
val data = sc.parallelize(1 to 100)
val Array(train, test) = data.randomSplit(Array(0.7, 0.3))
// Result: Two RDDs with ~70% and ~30% of data respectively
```

union: Return the union of two RDDs
```scala
def union(other: RDD[T]): RDD[T]
```

intersection: Return the intersection of two RDDs

```scala
def intersection(other: RDD[T]): RDD[T]
def intersection(other: RDD[T], numPartitions: Int): RDD[T]
```

subtract: Return elements in this RDD but not in the other

```scala
def subtract(other: RDD[T]): RDD[T]
def subtract(other: RDD[T], numPartitions: Int): RDD[T]
def subtract(other: RDD[T], p: Partitioner): RDD[T]
```

```scala
val rdd1 = sc.parallelize(Array(1, 2, 3, 4))
val rdd2 = sc.parallelize(Array(3, 4, 5, 6))
val unionRDD = rdd1.union(rdd2) // [1, 2, 3, 4, 3, 4, 5, 6]
val intersectionRDD = rdd1.intersection(rdd2) // [3, 4]
val subtractRDD = rdd1.subtract(rdd2) // [1, 2]
```

zip: Zip this RDD with another one, returning key-value pairs
```scala
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
```

zipWithIndex: Zip with the element indices

```scala
def zipWithIndex(): RDD[(T, Long)]
```

zipWithUniqueId: Zip with generated unique IDs

```scala
def zipWithUniqueId(): RDD[(T, Long)]
```

cartesian: Return Cartesian product with another RDD

```scala
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]
```

```scala
val rdd1 = sc.parallelize(Array("a", "b"))
val rdd2 = sc.parallelize(Array(1, 2))
val zipped = rdd1.zip(rdd2) // [("a", 1), ("b", 2)]
val withIndex = rdd1.zipWithIndex() // [("a", 0), ("b", 1)]
val cartesian = rdd1.cartesian(rdd2) // [("a", 1), ("a", 2), ("b", 1), ("b", 2)]
```

groupBy: Group elements by a key function
```scala
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
def groupBy[K](f: T => K, numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
```

```scala
val data = sc.parallelize(Array(1, 2, 3, 4, 5, 6))
val grouped = data.groupBy(_ % 2) // Group by even/odd
// Result: [(0, [2, 4, 6]), (1, [1, 3, 5])]
```

keyBy: Create tuples by applying a function to generate keys

```scala
def keyBy[K](f: T => K): RDD[(K, T)]
```

```scala
val words = sc.parallelize(Array("apple", "banana", "apricot"))
val byFirstLetter = words.keyBy(_.charAt(0))
// Result: [('a', "apple"), ('b', "banana"), ('a', "apricot")]
```

repartition: Increase or decrease partitions with shuffle
```scala
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
```

coalesce: Reduce the number of partitions (optionally with shuffle)

```scala
def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null): RDD[T]
```

```scala
val data = sc.parallelize(1 to 1000, 10) // 10 partitions
val repartitioned = data.repartition(5) // 5 partitions with shuffle
val coalesced = data.coalesce(5) // 5 partitions without shuffle
```

mapPartitions: Apply function to each partition independently
```scala
def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
```

mapPartitionsWithIndex: Apply function with partition index

```scala
def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
```

glom: Return an array of all elements in each partition

```scala
def glom(): RDD[Array[T]]
```

```scala
val data = sc.parallelize(Array(1, 2, 3, 4, 5, 6), 2)
// Sum elements in each partition
val partitionSums = data.mapPartitions(iter => Iterator(iter.sum))
// Add partition index to each element
val withPartitionId = data.mapPartitionsWithIndex((index, iter) =>
  iter.map(value => (index, value)))
// Get arrays of elements per partition
val partitionArrays = data.glom() // [[1, 2, 3], [4, 5, 6]]
```

Actions trigger the execution of transformations and return values to the driver program or save data to storage.
collect: Return all elements as an array (use with caution on large datasets)

```scala
def collect(): Array[T]
```

take: Return first n elements

```scala
def take(num: Int): Array[T]
```

takeOrdered: Return the k smallest elements

```scala
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
```

top: Return the k largest elements

```scala
def top(num: Int)(implicit ord: Ordering[T]): Array[T]
```

takeSample: Return a random sample of elements

```scala
def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T]
```

first: Return the first element

```scala
def first(): T
```

```scala
val data = sc.parallelize(Array(5, 1, 3, 9, 2, 7))
val all = data.collect() // [5, 1, 3, 9, 2, 7]
val first3 = data.take(3) // [5, 1, 3]
val smallest2 = data.takeOrdered(2) // [1, 2]
val largest2 = data.top(2) // [9, 7]
val sample = data.takeSample(false, 3) // Random 3 elements
val firstElement = data.first() // 5
```

count: Return the number of elements
```scala
def count(): Long
```

countByValue: Return count of each unique value

```scala
def countByValue()(implicit ord: Ordering[T]): Map[T, Long]
```

reduce: Reduce elements using an associative function

```scala
def reduce(f: (T, T) => T): T
```

fold: Aggregate with a zero value

```scala
def fold(zeroValue: T)(op: (T, T) => T): T
```

aggregate: Aggregate with a different result type

```scala
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
```

```scala
val numbers = sc.parallelize(Array(1, 2, 3, 4, 5))
val count = numbers.count() // 5
val sum = numbers.reduce(_ + _) // 15
val sumWithZero = numbers.fold(0)(_ + _) // 15
val (sum2, count2) = numbers.aggregate((0, 0))( // (15, 5)
  (acc, value) => (acc._1 + value, acc._2 + 1), // seqOp
  (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2) // combOp
)
val valueCounts = sc.parallelize(Array("a", "b", "a", "c", "b")).countByValue()
// Result: Map("a" -> 2, "b" -> 2, "c" -> 1)
```

For RDDs containing numeric types, additional statistical operations are available through implicit conversions:
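For example, an RDD of doubles gains aggregate statistics through the implicit `DoubleRDDFunctions` conversion. A brief sketch, assuming an active `SparkContext` named `sc`:

```scala
val nums = sc.parallelize(Array(1.0, 2.0, 3.0, 4.0))
val total = nums.sum()  // 10.0
val avg = nums.mean()   // 2.5
val sd = nums.stdev()   // standard deviation
val st = nums.stats()   // count, mean, stdev, min, max in a single pass
```

`stats()` returns a `StatCounter`, which is cheaper than calling each statistic separately because it computes them all in one job.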
min/max: Find minimum/maximum values

```scala
def min()(implicit ord: Ordering[T]): T
def max()(implicit ord: Ordering[T]): T
```

```scala
val numbers = sc.parallelize(Array(5, 1, 9, 3, 7))
val minimum = numbers.min() // 1
val maximum = numbers.max() // 9
```

foreach: Apply a function to each element (side effects only)
```scala
def foreach(f: T => Unit): Unit
```

foreachPartition: Apply a function to each partition

```scala
def foreachPartition(f: Iterator[T] => Unit): Unit
```

toLocalIterator: Return an iterator that consumes each partition sequentially

```scala
def toLocalIterator: Iterator[T]
```

```scala
val data = sc.parallelize(Array(1, 2, 3, 4, 5))
// Print each element (for debugging)
data.foreach(println)
// Process each partition (e.g., write to database)
data.foreachPartition { partition =>
  // Setup database connection
  partition.foreach { element =>
    // Insert element into database
  }
  // Close database connection
}
```

saveAsTextFile: Save RDD as text files
```scala
def saveAsTextFile(path: String): Unit
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit
```

saveAsObjectFile: Save as a SequenceFile of serialized objects

```scala
def saveAsObjectFile(path: String): Unit
```

```scala
val data = sc.parallelize(Array(1, 2, 3, 4, 5))
// Save as text files
data.saveAsTextFile("hdfs://path/to/output")
// Save with compression
import org.apache.hadoop.io.compress.GzipCodec
data.saveAsTextFile("hdfs://path/to/compressed", classOf[GzipCodec])
// Save as object file
data.saveAsObjectFile("hdfs://path/to/objects")
```

Partition Information:
```scala
def partitions: Array[Partition] // Get partition array
def getNumPartitions: Int // Get number of partitions
def partitioner: Option[Partitioner] // Get partitioner if any
```

Dependencies and Lineage:

```scala
def dependencies: Seq[Dependency[_]] // RDD dependencies
def toDebugString: String // Debug string showing lineage
```

Naming and Context:

```scala
def setName(name: String): RDD[T] // Set RDD name for monitoring
def name: String // Get RDD name
def id: Int // Unique RDD identifier
def sparkContext: SparkContext // The SparkContext that created this RDD
```

```scala
val data = sc.parallelize(Array(1, 2, 3, 4, 5), 3).setName("MyRDD")
println(s"Partitions: ${data.getNumPartitions}") // Partitions: 3
println(s"Name: ${data.name}") // Name: MyRDD
println(s"ID: ${data.id}") // ID: 0
println(data.toDebugString) // Shows RDD lineage
```

toJavaRDD: Convert to a Java RDD

```scala
def toJavaRDD(): JavaRDD[T]
```

RDD operations can fail for various reasons. Understanding common error scenarios helps with debugging and building robust applications.
SparkException: General Spark execution errors

```scala
// Task failure due to out of memory, serialization issues, etc.
try {
  val result = rdd.collect()
} catch {
  case e: SparkException => println(s"Spark execution failed: ${e.getMessage}")
}
```

TaskResultLost: Task results lost due to network or node failure. This typically indicates node failure or network issues; Spark automatically retries, but the job may eventually fail.

OutOfMemoryError: Driver or executor runs out of memory
```scala
// Common with collect() on large datasets
try {
  val allData = largeRDD.collect() // Dangerous!
} catch {
  case _: OutOfMemoryError =>
    println("Dataset too large for collect(). Use take() or write to storage.")
}
```

collect(): Can cause OutOfMemoryError if the result doesn't fit in driver memory

```scala
// Safe: Use take() for sampling
val sample = rdd.take(100)
// Dangerous: Collect entire large dataset
val all = rdd.collect() // May cause OOM
```

reduce(): Fails if the RDD is empty
```scala
try {
  val sum = rdd.reduce(_ + _)
} catch {
  case e: UnsupportedOperationException =>
    println("Cannot reduce empty RDD")
}
// Safe alternative
val sum = rdd.fold(0)(_ + _) // Works with empty RDDs
```

first(): Throws an exception if the RDD is empty
```scala
try {
  val firstElement = rdd.first()
} catch {
  case e: UnsupportedOperationException =>
    println("RDD is empty")
}
// Safe alternative
val maybeFirst = rdd.take(1).headOption
```

NotSerializableException: Objects must be serializable for distribution
```scala
class NotSerializable {
  def process(x: Int): Int = x * 2
}
val processor = new NotSerializable()
// This will fail at runtime
val result = rdd.map(x => processor.process(x)) // NotSerializableException
// Solution: Make objects serializable or use plain functions
val safeResult = rdd.map(x => x * 2) // Safe
```

FileNotFoundException: When reading non-existent files
```scala
try {
  val data = sc.textFile("hdfs://nonexistent/path")
  data.count() // Error occurs on action, not creation
} catch {
  case e: FileNotFoundException =>
    println(s"File not found: ${e.getMessage}")
}
```

InvalidInputException: Malformed input data
```scala
// Handle corrupted or malformed files
try {
  val data = sc.sequenceFile[String, String]("path/to/corrupted/file")
  data.collect()
} catch {
  case e: InvalidInputException =>
    println(s"Invalid input format: ${e.getMessage}")
}
```

Connection timeouts: Network issues between nodes
- Increase `spark.network.timeout` for longer operations
- Set `spark.sql.adaptive.coalescePartitions.enabled=true` to reduce network overhead

Shuffle failures: Data shuffle operations failing
- Common with wide operations such as `groupByKey`, `join`, `distinct`
- Tune the `spark.serializer.objectStreamReset` interval
- Prefer `reduceByKey` over `groupByKey`
- Prefer `take()`, `sample()`, or writing to storage over collecting everything to the driver
- Use `fold()` instead of `reduce()`, and check `isEmpty()` before actions on possibly empty RDDs

```scala
// Robust RDD processing pattern
def processRDDSafely[T](rdd: RDD[T]): Option[Array[T]] = {
  try {
    if (rdd.isEmpty()) {
      println("Warning: RDD is empty")
      None
    } else {
      // Use take() instead of collect() for safety
      val sample = rdd.take(1000)
      Some(sample)
    }
  } catch {
    case e: SparkException =>
      println(s"Spark processing failed: ${e.getMessage}")
      None
    case e: OutOfMemoryError =>
      println("Out of memory - dataset too large")
      None
  }
}
```

This comprehensive coverage of RDD operations provides the foundation for all data processing in Apache Spark. Understanding these operations and their potential failure modes is crucial for effective Spark programming.
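The network and adaptive-coalescing settings mentioned above are applied on the `SparkConf` before the context is created. A hedged sketch (the values shown are illustrative, not recommendations):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("robust-rdd-app")
  .set("spark.network.timeout", "300s")
  .set("spark.sql.adaptive.coalescePartitions.enabled", "true")
val sc = new SparkContext(conf)
```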
Install with Tessl CLI

```shell
npx tessl i tessl/maven-apache-spark
```