Apache Spark Core - The foundational component of Apache Spark providing distributed computing capabilities including RDDs, transformations, actions, and cluster management.
—
Resilient Distributed Datasets (RDDs) are the fundamental data abstraction in Spark. They represent immutable, partitioned collections of elements that can be operated on in parallel. RDDs support two types of operations: transformations (which create new RDDs) and actions (which return values).
/**
 * API surface of Spark's Resilient Distributed Dataset.
 *
 * An RDD is an immutable, partitioned collection of elements processed in
 * parallel. Transformations are lazy (they only extend the lineage graph);
 * actions trigger job execution and return a value to the driver.
 *
 * @tparam T element type; the ClassTag is needed to build arrays of T at runtime
 */
abstract class RDD[T: ClassTag](
@transient private var _sc: SparkContext, // owning context; @transient: not serialized into task closures
@transient private var deps: Seq[Dependency[_]] // lineage: dependencies on parent RDDs
) {
// Transformations - Lazy operations that return new RDDs
def map[U: ClassTag](f: T => U): RDD[U]
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
def filter(f: T => Boolean): RDD[T]
def distinct(numPartitions: Int = partitions.length): RDD[T]
def union(other: RDD[T]): RDD[T]
def intersection(other: RDD[T]): RDD[T]
def intersection(other: RDD[T], partitioner: Partitioner): RDD[T]
def intersection(other: RDD[T], numPartitions: Int): RDD[T]
def subtract(other: RDD[T]): RDD[T]
def subtract(other: RDD[T], numPartitions: Int): RDD[T]
def subtract(other: RDD[T], p: Partitioner): RDD[T]
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
def zipWithIndex(): RDD[(T, Long)]
def zipWithUniqueId(): RDD[(T, Long)]
def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]
def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]]
// NOTE: takeSample returns an Array to the driver, so it behaves as an action
// despite being listed among the transformations here.
def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T]
def repartition(numPartitions: Int): RDD[T]
def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T]
def sortBy[K](f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
def glom(): RDD[Array[T]]
def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
def pipe(command: String): RDD[String]
def pipe(command: Seq[String], env: Map[String, String] = Map(), printPipeRDDInfo: Option[String => Unit] = None, propagateFailure: Boolean = false, checkExitCode: Boolean = true): RDD[String]
// Actions - Operations that trigger computation and return values
def collect(): Array[T]
def count(): Long
def countApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble]
def countApproxDistinct(relativeSD: Double = 0.05): Long
def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long]
def first(): T
def isEmpty(): Boolean
def take(num: Int): Array[T]
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
def top(num: Int)(implicit ord: Ordering[T]): Array[T]
def min()(implicit ord: Ordering[T]): T
def max()(implicit ord: Ordering[T]): T
def reduce(f: (T, T) => T): T
def treeReduce(f: (T, T) => T, depth: Int = 2): T
def fold(zeroValue: T)(op: (T, T) => T): T
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
def treeAggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U, depth: Int = 2): U
def foreach(f: T => Unit): Unit
def foreachPartition(f: Iterator[T] => Unit): Unit
// Persistence and Caching
def persist(): this.type
def persist(newLevel: StorageLevel): this.type
def cache(): this.type
def unpersist(blocking: Boolean = true): this.type
def checkpoint(): Unit
def isCheckpointed: Boolean
def getCheckpointFile: Option[String]
def localCheckpoint(): this.type
// Partition and Dependency Information
def partitions: Array[Partition]
def partitioner: Option[Partitioner]
def getNumPartitions: Int
def dependencies: Seq[Dependency[_]]
def preferredLocations(split: Partition): Seq[String]
def compute(split: Partition, context: TaskContext): Iterator[T]
// Metadata and Debugging
def id: Int
def name: String
def setName(name: String): this.type
def toDebugString: String
def getStorageLevel: StorageLevel
def context: SparkContext
}

Transformations are lazy operations that define new RDDs but don't trigger computation immediately.
import org.apache.spark.{SparkContext, SparkConf}

// Local-mode context for the examples; local[*] uses all available cores.
val sc = new SparkContext(new SparkConf().setAppName("RDD Examples").setMaster("local[*]"))

// Create sample data
val numbers = sc.parallelize(1 to 100)
val words = sc.parallelize(Seq("hello world", "spark is awesome", "big data processing"))

// map - Transform each element (exactly one output element per input element)
val squared = numbers.map(x => x * x)
val lengths = words.map(_.length)

// flatMap - Transform each element into zero or more elements
val allWords = words.flatMap(_.split(" "))
val digits = numbers.flatMap(n => n.toString.toCharArray.map(_.asDigit))

// filter - Keep only elements matching the predicate
val evenNumbers = numbers.filter(_ % 2 == 0)
val longWords = allWords.filter(_.length > 4)

// distinct - Remove duplicates
val uniqueWords = allWords.distinct()
val uniqueDigits = digits.distinct()

// sample - Random sampling; a fixed seed makes the sample reproducible
val sample = numbers.sample(withReplacement = false, fraction = 0.1, seed = 42)
val sampleWithReplacement = numbers.sample(withReplacement = true, fraction = 0.2)

val rdd1 = sc.parallelize(1 to 10)
val rdd2 = sc.parallelize(5 to 15)

// union - Combine RDDs (keeps duplicates, no shuffle)
val combined = rdd1.union(rdd2)

// intersection - Elements present in both RDDs (deduplicates; shuffles)
val common = rdd1.intersection(rdd2)

// subtract - Elements in the first RDD but not in the second
val difference = rdd1.subtract(rdd2)

// cartesian - Cartesian product of the two RDDs.
// Renamed from `pairs` to avoid colliding with the `pairs` val in the zip example below.
val crossPairs = rdd1.cartesian(rdd2)

val data = sc.parallelize(Seq("apple", "banana", "cherry"))
val scores = sc.parallelize(Seq(95, 87, 92))

// zip - Pair up elements at the same positions.
// NOTE(review): zip requires both RDDs to have the same number of partitions
// and the same number of elements per partition - confirm before relying on it.
val pairs = data.zip(scores)

// zipWithIndex - Add sequential indices
val indexed = data.zipWithIndex()

// zipWithUniqueId - Add unique IDs (not necessarily sequential)
val withIds = data.zipWithUniqueId()

val data = sc.parallelize(1 to 1000, numSlices = 8)
// repartition - Change the number of partitions (always shuffles data)
val repartitioned = data.repartition(4)

// coalesce - Reduce partitions without shuffling (when possible)
val coalesced = data.coalesce(2)

// sortBy - Sort RDD by key function.
// BUG FIX: the original used sortBy(x => -x, ascending = false), which negates
// the key AND reverses the order - the two cancel out and yield an ASCENDING
// sort. Negate the key or set ascending = false, not both.
val sorted = data.sortBy(x => x, ascending = false) // Sort descending

// glom - Collect all elements in each partition into arrays
val partitionArrays = numbers.glom()

// mapPartitions - Transform an entire partition at once
val partitionSums = numbers.mapPartitions(iter => Iterator(iter.sum))

// mapPartitionsWithIndex - Access the partition index alongside its data.
// BUG FIX: the original called iter.sum and then iter.size on the same
// Iterator. The first call exhausts the iterator, so the reported size was
// always 0. Materialize the partition once and derive both values from it.
val partitionInfo = numbers.mapPartitionsWithIndex { (index, iter) =>
  val elems = iter.toSeq
  Iterator((index, elems.sum, elems.size))
}

// pipe - Send data through external process
val upperCased = allWords.pipe("tr [a-z] [A-Z]")

Actions trigger computation and return results to the driver or save data to external systems.
val data = sc.parallelize(1 to 100)

// collect - Bring all elements to the driver (use with caution for large datasets)
val allElements: Array[Int] = data.collect()

// take - Get the first n elements
val first10: Array[Int] = data.take(10)

// takeOrdered - Get the n smallest elements
val smallest5: Array[Int] = data.takeOrdered(5)

// top - Get the n largest elements
val largest5: Array[Int] = data.top(5)

// takeSample - Random sample of exactly `num` elements
val randomSample: Array[Int] = data.takeSample(withReplacement = false, num = 20)

// first - Get the first element
val firstElement: Int = data.first()

// min/max - Find the extremes
val minimum: Int = data.min()
val maximum: Int = data.max()

// count - Count elements
val totalCount: Long = data.count()

// countByValue - Count occurrences of each value (result is materialized on the driver)
val wordCounts: Map[String, Long] = allWords.countByValue()

// reduce - Combine elements with an associative, commutative function
val sum: Int = data.reduce(_ + _)
// NOTE: the product of 1 to 100 vastly overflows Int; use BigInt for a real product.
val product: Int = data.reduce(_ * _)

// fold - Like reduce but with an initial value (must be the identity for op)
val sumWithZero: Int = data.fold(0)(_ + _)

// aggregate - More flexible reduction with different input/output types
val stats: (Int, Double) = data.aggregate((0, 0.0))(
  seqOp = { case ((count, sum), value) => (count + 1, sum + value) },
  combOp = { case ((c1, s1), (c2, s2)) => (c1 + c2, s1 + s2) }
)

// treeReduce/treeAggregate - Hierarchical reduction (reduces driver-side merge load)
val treeSum: Int = data.treeReduce(_ + _)

// foreach - Execute a side-effecting function on each element.
// FIX: this runs on the EXECUTORS, not the driver - println output goes to
// executor logs rather than the driver console (except in local mode).
data.foreach(println)

// foreachPartition - Run once per partition; amortizes expensive setup such as
// opening a database connection over all elements of the partition.
data.foreachPartition { partition =>
  val connection = createDatabaseConnection()
  try {
    partition.foreach(value => insertToDatabase(connection, value))
  } finally {
    // FIX: close in finally so a failed insert cannot leak the connection
    connection.close()
  }
}

// countApprox - Approximate count with timeout (milliseconds)
val approxCount = data.countApprox(timeout = 1000L, confidence = 0.95)

// countApproxDistinct - Approximate distinct count
val approxDistinct = data.countApproxDistinct(relativeSD = 0.05)

RDDs can be persisted in memory or disk for efficient reuse across multiple actions.
import org.apache.spark.storage.StorageLevel

val expensiveRDD = data
  .filter(_ > 50)
  .map(expensiveComputation)
  .filter(_.nonEmpty)

// Choose ONE storage level per RDD.
// BUG FIX: the original called cache() and then persist(MEMORY_AND_DISK_SER);
// Spark throws UnsupportedOperationException when the storage level of an RDD
// is changed after it has been assigned. Either use the in-memory shorthand:
//   expensiveRDD.cache()   // equivalent to persist(StorageLevel.MEMORY_ONLY)
// ...or pick an explicit level, as done here:
expensiveRDD.persist(StorageLevel.MEMORY_AND_DISK_SER)

// Reuse the persisted RDD across several actions without recomputation
val result1 = expensiveRDD.count()
val result2 = expensiveRDD.collect()
val result3 = expensiveRDD.take(10)

// Remove from cache when done
expensiveRDD.unpersist()

Checkpointing saves RDD data to reliable storage to truncate lineage and improve fault tolerance.
// Set checkpoint directory ("port" below is a placeholder for the real
// namenode port; on a cluster this must be a fault-tolerant filesystem)
sc.setCheckpointDir("hdfs://namenode:port/checkpoints")
val complexRDD = data
.map(complexTransformation1)
.filter(complexFilter)
.map(complexTransformation2)
.filter(anotherComplexFilter)
// Checkpoint the RDD. checkpoint() only marks it; data is written when the
// next action runs. NOTE(review): persisting the RDD before checkpointing
// avoids recomputing the whole lineage during the checkpoint write - confirm
// against the Spark version in use.
complexRDD.checkpoint()
// Trigger checkpointing with an action
complexRDD.count()
// Verify checkpointing
println(s"Is checkpointed: ${complexRDD.isCheckpointed}")
println(s"Checkpoint file: ${complexRDD.getCheckpointFile}")

Understanding and controlling partitions is crucial for performance optimization.
val data = sc.textFile("large-file.txt", minPartitions = 100)

// Check partition information
println(s"Number of partitions: ${data.getNumPartitions}")
println(s"Partitioner: ${data.partitioner}")

// View per-partition sizes (glom gathers each partition into a single array)
data.glom().collect().zipWithIndex.foreach { case (partitionData, index) =>
  println(s"Partition $index has ${partitionData.length} elements")
}

// Custom partitioning for key-value RDDs
val keyValueData = data.map(line => (line.length, line))
val hashPartitioned = keyValueData.partitionBy(new HashPartitioner(50))
val rangePartitioned = keyValueData.partitionBy(new RangePartitioner(50, keyValueData))

// Chain transformations efficiently
val pipeline = sc.textFile("input")
  .filter(_.nonEmpty)
  .map(_.toLowerCase.trim)
  .filter(_.length > 5)
  .map(processLine)
  .filter(_.isValid)
  .persist(StorageLevel.MEMORY_AND_DISK_SER) // Persist at strategic points

// Multiple actions on the same RDD.
// BUG FIX: the original called pipeline.cache() here, but the storage level
// was already set by persist() above; changing it again throws
// UnsupportedOperationException. The RDD is already persisted - just reuse it.
val validLines = pipeline
val count = validLines.count()
val sample = validLines.take(100)
val summary = validLines.map(_.summary).collect()

import scala.util.{Try, Success, Failure}
// Wrap risky per-element work in Try, log failures, and keep only successes.
val robustRDD = data.map { element =>
  Try(riskyOperation(element)) match {
    case Success(result) => Some(result)
    case Failure(exception) =>
      logError(s"Failed to process $element: ${exception.getMessage}")
      None
  }
}.flatMap(_.toList) // FIX: replaces .filter(_.isDefined).map(_.get) - never call .get on an Option

/**
 * Routes String keys by hash and Int keys by value; everything else goes to partition 0.
 *
 * BUG FIX: the original declared `def numPartitions: Int = numPartitions` - inside
 * the def, `numPartitions` refers to the def itself (it shadows the constructor
 * parameter), so every call recursed until StackOverflowError. Renaming the
 * constructor parameter breaks the cycle.
 */
class CustomPartitioner(partitions: Int) extends Partitioner {
  def numPartitions: Int = partitions
  def getPartition(key: Any): Int = {
    key match {
      // FIX: take the modulus first, then abs - a plain `i % n` is negative for
      // negative keys (and abs of a raw Int.MinValue hashCode stays negative),
      // which would produce an out-of-range partition index.
      case s: String => math.abs(s.hashCode % numPartitions)
      case i: Int => math.abs(i % numPartitions)
      case _ => 0
    }
  }
}
val customPartitioned = keyValueData.partitionBy(new CustomPartitioner(100))

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-core-2-11