Apache Spark Core provides the foundational execution engine and API for distributed data processing with RDDs, task scheduling, and cluster management.
The Resilient Distributed Dataset (RDD) API provides transformations and actions for fault-tolerant distributed processing of large datasets.
RDD is the abstract base class for all RDDs, providing core distributed-dataset functionality with automatic fault recovery through lineage tracking.
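Lineage can be inspected directly. The sketch below (a minimal example assuming a local master and a hypothetical input.txt) builds a transformation chain and prints the dependency graph Spark would replay to recompute lost partitions:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("lineage").setMaster("local[*]"))
val counts = sc.textFile("input.txt")   // hypothetical input path
  .flatMap(_.split("\\s+"))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
// No job has run yet: transformations are lazy. toDebugString shows the
// recorded lineage used for fault recovery.
println(counts.toDebugString)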
/**
* Resilient Distributed Dataset - immutable distributed collection
*/
abstract class RDD[T: ClassTag] extends Serializable {
// Transformations (lazy evaluation)
/** Transform each element using provided function */
def map[U: ClassTag](f: T => U): RDD[U]
/** Transform each element to sequence and flatten results */
def flatMap[U: ClassTag](f: T => IterableOnce[U]): RDD[U]
/** Filter elements matching predicate */
def filter(f: T => Boolean): RDD[T]
/** Map each partition with partition index */
def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false
): RDD[U]
/** Sample fraction of elements */
def sample(
withReplacement: Boolean,
fraction: Double,
seed: Long = Utils.random.nextLong
): RDD[T]
/** Return union of this RDD and another */
def union(other: RDD[T]): RDD[T]
/** Return intersection with another RDD */
def intersection(other: RDD[T]): RDD[T]
/** Return distinct elements */
def distinct(numPartitions: Int = partitions.length): RDD[T]
/** Group by key function */
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
/** Reduce partitions to specified number */
def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T]
/** Repartition to specified number */
def repartition(numPartitions: Int): RDD[T]
/** Sort RDD elements */
def sortBy[K](
f: T => K,
ascending: Boolean = true,
numPartitions: Int = this.partitions.length
)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
/** Zip with another RDD */
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
/** Zip with element indices */
def zipWithIndex(): RDD[(T, Long)]
/** Zip with unique IDs */
def zipWithUniqueId(): RDD[(T, Long)]
// Actions (trigger computation)
/** Collect all elements to driver */
def collect(): Array[T]
/** Count number of elements */
def count(): Long
/** Return first element */
def first(): T
/** Take first n elements */
def take(num: Int): Array[T]
/** Take ordered elements */
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
/** Take sample */
def takeSample(
withReplacement: Boolean,
num: Int,
seed: Long = Utils.random.nextLong
): Array[T]
/** Reduce elements using function */
def reduce(f: (T, T) => T): T
/** Fold elements with initial value */
def fold(zeroValue: T)(op: (T, T) => T): T
/** Aggregate with different types */
def aggregate[U: ClassTag](zeroValue: U)(
seqOp: (U, T) => U,
combOp: (U, U) => U
): U
/** Apply function to each element */
def foreach(f: T => Unit): Unit
/** Apply function to each partition */
def foreachPartition(f: Iterator[T] => Unit): Unit
/** Save as text file */
def saveAsTextFile(path: String): Unit
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit
// Persistence
/** Persist RDD with storage level */
def persist(newLevel: StorageLevel): this.type
/** Persist RDD with default storage level (MEMORY_ONLY) */
def persist(): this.type
/** Cache RDD in memory */
def cache(): this.type
/** Remove persisted data */
def unpersist(blocking: Boolean = false): this.type
/** Mark RDD for checkpointing */
def checkpoint(): Unit
/** Check if RDD is checkpointed */
def isCheckpointed: Boolean
// Metadata
/** Get partitions */
def partitions: Array[Partition]
/** Get partitioner */
def partitioner: Option[Partitioner]
/** Check if RDD is empty */
def isEmpty(): Boolean
/** Get storage level */
def getStorageLevel: StorageLevel
}

Additional operations are available on RDDs of key-value pairs through an implicit conversion to PairRDDFunctions.
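The conversion applies automatically whenever an RDD's element type is a pair. As a minimal sketch (assuming an active SparkContext sc), the classic per-key average uses aggregateByKey with a (sum, count) accumulator:

val scores = sc.parallelize(Seq(("a", 1.0), ("a", 3.0), ("b", 2.0)))
val avgByKey = scores
  .aggregateByKey((0.0, 0))(
    (acc, v) => (acc._1 + v, acc._2 + 1),   // seqOp: fold one value into a partition-local accumulator
    (x, y) => (x._1 + y._1, x._2 + y._2)    // combOp: merge accumulators across partitions
  )
  .mapValues { case (sum, n) => sum / n }   // ("a", 2.0), ("b", 2.0)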
/**
* Extra functions for RDDs of (key, value) pairs
*/
class PairRDDFunctions[K, V](self: RDD[(K, V)]) {
/** Group values by key */
def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
/** Reduce values by key */
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
/** Aggregate values by key */
def aggregateByKey[U: ClassTag](zeroValue: U)(
seqOp: (U, V) => U,
combOp: (U, U) => U
): RDD[(K, U)]
def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(
seqOp: (U, V) => U,
combOp: (U, U) => U
): RDD[(K, U)]
/** Fold values by key */
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]
/** Combine values by key */
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C
): RDD[(K, C)]
/** Join with another pair RDD */
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
/** Left outer join */
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
/** Right outer join */
def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
/** Full outer join */
def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]
/** Cogroup with other RDDs */
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]):
RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
/** Get values for keys */
def lookup(key: K): Seq[V]
/** Collect as map */
def collectAsMap(): Map[K, V]
/** Count by key */
def countByKey(): Map[K, Long]
/** Sort by key */
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[(K, V)]
/** Get keys only */
def keys: RDD[K]
/** Get values only */
def values: RDD[V]
/** Subtract by key */
def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)]
/** Save as Hadoop file */
def saveAsHadoopFile[F <: OutputFormat[K, V]](
path: String,
keyClass: Class[_],
valueClass: Class[_],
outputFormatClass: Class[F]
): Unit
/** Save as new Hadoop API file */
def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](
path: String,
keyClass: Class[_],
valueClass: Class[_],
outputFormatClass: Class[F]
): Unit
}

Statistical operations are available on RDDs of numeric values through an implicit conversion to DoubleRDDFunctions.
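As a minimal sketch (assuming an active SparkContext sc), both histogram variants:

val values = sc.parallelize(Seq(1.0, 2.5, 3.0, 7.5, 9.0))
val (edges, histCounts) = values.histogram(4)          // 5 bucket boundaries, 4 counts
val custom = values.histogram(Array(0.0, 5.0, 10.0))   // counts for [0, 5) and [5, 10]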
/**
* Extra functions for RDDs of doubles
*/
class DoubleRDDFunctions(self: RDD[Double]) {
/** Compute mean */
def mean(): Double
/** Compute variance */
def variance(): Double
/** Compute standard deviation */
def stdev(): Double
/** Compute sum */
def sum(): Double
/** Compute statistics */
def stats(): StatCounter
/** Compute histogram */
def histogram(buckets: Int): (Array[Double], Array[Long])
def histogram(buckets: Array[Double]): Array[Long]
}
/**
* Statistics counter for numeric RDDs
*/
class StatCounter extends Serializable {
def count: Long
def mean: Double
def sum: Double
def min: Double
def max: Double
def variance: Double
def stdev: Double
}

/** RDD from parallel collection */
class ParallelCollectionRDD[T: ClassTag](
@transient sc: SparkContext,
@transient data: Seq[T],
numSlices: Int
) extends RDD[T]
/** RDD from Hadoop InputFormat */
class HadoopRDD[K, V](
sc: SparkContext,
conf: JobConf,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int
) extends RDD[(K, V)]
/** RDD from new Hadoop API */
class NewHadoopRDD[K, V](
sc: SparkContext,
inputFormatClass: Class[_ <: NewInputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
conf: Configuration
) extends RDD[(K, V)]
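HadoopRDD and NewHadoopRDD are usually created indirectly through SparkContext (for example, sc.textFile is backed by a HadoopRDD). A sketch of the explicit new-API form, with a hypothetical HDFS path:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

val lines = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat]("hdfs:///data/logs")
val text = lines.map(_._2.toString)   // extract the line content from each (offset, line) pair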
/** RDD from JDBC */
class JdbcRDD[T: ClassTag](
sc: SparkContext,
getConnection: () => Connection,
sql: String,
lowerBound: Long,
upperBound: Long,
numPartitions: Int,
mapRow: ResultSet => T
) extends RDD[T]
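A sketch of JdbcRDD usage (the JDBC URL and table are hypothetical). The query must contain exactly two '?' placeholders, which Spark binds to each partition's lower and upper bounds:

import java.sql.{DriverManager, ResultSet}
import org.apache.spark.rdd.JdbcRDD

val users = new JdbcRDD(
  sc,
  () => DriverManager.getConnection("jdbc:h2:mem:testdb"),   // hypothetical database
  "SELECT id, name FROM users WHERE id >= ? AND id <= ?",
  1L, 1000L,   // lowerBound, upperBound
  4,           // numPartitions
  (rs: ResultSet) => (rs.getLong(1), rs.getString(2))
)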
/** Empty RDD */
class EmptyRDD[T: ClassTag](sc: SparkContext) extends RDD[T]
/** Union of multiple RDDs */
class UnionRDD[T: ClassTag](sc: SparkContext, rdds: Seq[RDD[T]]) extends RDD[T]
/** Coalesced RDD with fewer partitions */
class CoalescedRDD[T: ClassTag](
prev: RDD[T],
maxPartitions: Int,
shuffle: Boolean = false
) extends RDD[T]
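CoalescedRDD is what coalesce and repartition build on; a sketch of the difference (assuming an active SparkContext sc):

val data = sc.parallelize(1 to 1000, 8)
val narrowed = data.coalesce(2)      // no shuffle: existing partitions are merged in place
val widened = data.repartition(16)   // always shuffles; equivalent to coalesce(16, shuffle = true)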
/** Shuffled RDD */
class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
prev: RDD[_ <: Product2[K, V]],
part: Partitioner
) extends RDD[(K, C)]Usage Examples:
import org.apache.spark.{SparkContext, SparkConf}
val sc = new SparkContext(new SparkConf().setAppName("RDD Example"))
// Basic transformations
val numbers = sc.parallelize(1 to 100)
val squares = numbers.map(x => x * x)
val evens = numbers.filter(_ % 2 == 0)
// Pair RDD operations
val pairs = sc.parallelize(List(("a", 1), ("b", 2), ("a", 3)))
val grouped = pairs.groupByKey()
val sums = pairs.reduceByKey(_ + _)
// Statistical operations
val doubles = sc.parallelize(Array(1.0, 2.0, 3.0, 4.0, 5.0))
val avg = doubles.mean()
val stats = doubles.stats()
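// Joins (an illustrative addition, reusing pairs from above)
val other = sc.parallelize(List(("a", "x"), ("c", "y")))
val joined = pairs.join(other)          // inner join: ("a", (1, "x")), ("a", (3, "x"))
val left = pairs.leftOuterJoin(other)   // keeps "b" as ("b", (2, None))
// Persistence: cache squares since several actions below reuse it
squares.cache()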
// Actions
val result = squares.take(10)
val total = numbers.reduce(_ + _)
squares.saveAsTextFile("output/squares")
sc.stop()

RDDs maintain partitioning information to optimize distributed operations and minimize data shuffling.
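As a sketch (assuming an active SparkContext sc), pre-partitioning a pair RDD records a partitioner that later operations can exploit to avoid re-shuffling:

import org.apache.spark.HashPartitioner

val events = sc.parallelize(Seq(("a", 1), ("b", 2))).partitionBy(new HashPartitioner(8)).cache()
println(events.partitioner)   // Some(org.apache.spark.HashPartitioner@...)
// A join against an RDD with the same partitioner will not re-shuffle events.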
trait Partition extends Serializable {
def index: Int
}
/** Contextual information about a task, available inside operations via TaskContext.get() */
abstract class TaskContext extends Serializable {
def stageId(): Int
def stageAttemptNumber(): Int
def partitionId(): Int
def taskAttemptId(): Long
def attemptNumber(): Int
def getLocalProperty(key: String): String
}
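A sketch of inspecting the running task from inside an operation (assuming an active SparkContext sc); TaskContext.get() returns the context of the task executing on the current thread:

import org.apache.spark.TaskContext

sc.parallelize(1 to 10, 2).foreachPartition { _ =>
  val tc = TaskContext.get()
  println(s"stage=${tc.stageId()} partition=${tc.partitionId()} attempt=${tc.attemptNumber()}")
}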
Install with Tessl CLI:

npx tessl i tessl/maven-org-apache-spark--spark-core-2-13