Apache Spark Core provides Spark's distributed computing foundation, including the RDD abstraction, task scheduling, memory management, and fault recovery.
The Resilient Distributed Dataset (RDD) is the fundamental abstraction in Apache Spark: a fault-tolerant, partitioned collection of elements that can be operated on in parallel.
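RDDs are typically created from a local collection or from external storage via the SparkContext. A minimal sketch, assuming an existing SparkContext named sc and a hypothetical input path:
val fromCollection = sc.parallelize(Seq(1, 2, 3, 4, 5)) // distribute a local collection
val fromFile = sc.textFile("hdfs:///data/input.txt")    // one String element per line of the file
The abstract class below summarizes the core RDD API.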
abstract class RDD[T: ClassTag](
@transient private var _sc: SparkContext,
@transient private var deps: Seq[Dependency[_]]
) {
// Transformations (Lazy)
def map[U: ClassTag](f: T => U): RDD[U]
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
def filter(f: T => Boolean): RDD[T]
def distinct(numPartitions: Int = partitions.length): RDD[T]
def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]
def union(other: RDD[T]): RDD[T]
def intersection(other: RDD[T]): RDD[T]
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]
def groupBy[K: ClassTag](f: T => K): RDD[(K, Iterable[T])]
def pipe(command: String): RDD[String]
def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T]
def repartition(numPartitions: Int): RDD[T]
def sortBy[K](f: T => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
def keyBy[K](f: T => K): RDD[(K, T)]
// Actions (Eager)
def collect(): Array[T]
def count(): Long
def first(): T
def take(num: Int): Array[T]
def top(num: Int)(implicit ord: Ordering[T]): Array[T]
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T]
def reduce(f: (T, T) => T): T
def fold(zeroValue: T)(op: (T, T) => T): T
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
def treeAggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U, depth: Int = 2): U
def foreach(f: T => Unit): Unit
def foreachPartition(f: Iterator[T] => Unit): Unit
// I/O Actions
def saveAsTextFile(path: String): Unit
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit
def saveAsObjectFile(path: String): Unit
// Persistence
def persist(): RDD[T]
def persist(newLevel: StorageLevel): RDD[T]
def cache(): RDD[T]
def unpersist(blocking: Boolean = false): RDD[T]
def getStorageLevel: StorageLevel
def checkpoint(): Unit
def isCheckpointed: Boolean
def getCheckpointFile: Option[String]
// Metadata
def partitions: Array[Partition]
def partitioner: Option[Partitioner]
def getNumPartitions: Int
def dependencies: Seq[Dependency[_]]
def preferredLocations(split: Partition): Seq[String]
def context: SparkContext
def id: Int
def name: String
def setName(name: String): RDD[T]
}
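Transformations such as map only record lineage; work happens when an action runs. A brief sketch of this split using aggregate, assuming an existing SparkContext sc:
val nums = sc.parallelize(1 to 10)
val doubled = nums.map(_ * 2) // lazy: no job runs yet
// aggregate combines elements within a partition (seqOp) and merges partial results across partitions (combOp)
val (sum, count) = doubled.aggregate((0, 0))(
  (acc, x) => (acc._1 + x, acc._2 + 1),
  (a, b) => (a._1 + b._1, a._2 + b._2)
) // the action triggers the job: (110, 10)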
Operations available on RDDs of key-value pairs. They become available automatically through an implicit conversion on any RDD[(K, V)].

class PairRDDFunctions[K, V](self: RDD[(K, V)])(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) {
// Grouping Operations
def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
// Reduction Operations
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]
// Aggregation Operations
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]
// Partitioning
def partitionBy(partitioner: Partitioner): RDD[(K, V)]
// Join Operations
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))]
def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))]
def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))]
def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], W))]
def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]
def fullOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], Option[W]))]
def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], Option[W]))]
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))]
// Set Operations
def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)]
def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)]
def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)]
// Lookups and Collection
def lookup(key: K): Seq[V]
def collectAsMap(): Map[K, V]
def countByKey(): Map[K, Long]
def countByKeyApprox(timeout: Long, confidence: Double = 0.95): PartialResult[Map[K, BoundedDouble]]
// Value Operations
def mapValues[U](f: V => U): RDD[(K, U)]
def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)]
def keys: RDD[K]
def values: RDD[V]
// Sorting
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]
def sortBy[B](f: ((K, V)) => B, ascending: Boolean = true, numPartitions: Int = self.partitions.length)(implicit ord: Ordering[B], ctag: ClassTag[B]): RDD[(K, V)]
// I/O Operations
def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[F], codec: Class[_ <: CompressionCodec]): Unit
def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[F], conf: JobConf = new JobConf(self.context.hadoopConfiguration), codec: Option[Class[_ <: CompressionCodec]] = None): Unit
def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[F], conf: Configuration = self.context.hadoopConfiguration): Unit
}
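As an illustration, aggregateByKey can compute a per-key average without building per-key collections the way groupByKey does. A sketch, assuming an existing SparkContext sc:
val scores = sc.parallelize(Seq(("a", 1.0), ("a", 3.0), ("b", 2.0)))
// Accumulate a (sum, count) pair per key, then merge partial pairs across partitions
val avgByKey = scores
  .aggregateByKey((0.0, 0))(
    (acc, v) => (acc._1 + v, acc._2 + 1),
    (x, y) => (x._1 + y._1, x._2 + y._2))
  .mapValues { case (sum, n) => sum / n } // ("a", 2.0), ("b", 2.0)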
Statistical operations available on RDDs of Double values.

class DoubleRDDFunctions(self: RDD[Double]) {
// Statistical Operations
def sum(): Double
def stats(): StatCounter
def mean(): Double
def variance(): Double
def stdev(): Double
def sampleStdev(): Double
def sampleVariance(): Double
// Histogram Operations
def histogram(buckets: Array[Double]): Array[Long]
def histogram(buckets: Int): (Array[Double], Array[Long])
}
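A brief sketch of the statistical helpers, assuming an existing SparkContext sc:
val xs = sc.parallelize(Seq(1.0, 2.0, 2.0, 3.0, 10.0))
val summary = xs.stats() // count, mean, stdev, min and max computed in a single pass
val m = xs.mean() // 3.6
val (bucketEdges, counts) = xs.histogram(3) // three evenly spaced buckets between min and max
val countsForEdges = xs.histogram(Array(0.0, 5.0, 10.0)) // counts for caller-supplied bucket edges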
Operations for saving RDDs as Hadoop SequenceFiles.

class SequenceFileRDDFunctions[K, V](self: RDD[(K, V)])(implicit kt: ClassTag[K], vt: ClassTag[V], keyWritableFactory: WritableFactory[K], valueWritableFactory: WritableFactory[V]) {
def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None): Unit
}
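For example, a pair RDD whose key and value types have Writable conversions can be written out directly. A sketch; the output path is hypothetical:
val kv = sc.parallelize(Seq(("a", 1), ("b", 2)))
// Keys and values are converted to Hadoop Writables via the implicit WritableFactory instances
kv.saveAsSequenceFile("hdfs:///tmp/kv-seq")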
val numbers = sc.parallelize(1 to 100)
// Map transformation
val squares = numbers.map(x => x * x)
// Filter transformation
val evenNumbers = numbers.filter(_ % 2 == 0)
// FlatMap transformation
val words = sc.parallelize(Array("hello world", "foo bar"))
val allWords = words.flatMap(_.split(" "))

val pairs = sc.parallelize(Array(("a", 1), ("b", 2), ("a", 3), ("b", 4)))
// Reduce by key
val sums = pairs.reduceByKey(_ + _) // ("a", 4), ("b", 6)
// Group by key
val grouped = pairs.groupByKey() // ("a", [1, 3]), ("b", [2, 4])
// Join operations
val other = sc.parallelize(Array(("a", "apple"), ("b", "banana")))
val joined = pairs.join(other) // ("a", (1, "apple")), ("a", (3, "apple")), etc.

val data = sc.parallelize(1 to 100)
// Collect all data to driver
val collected = data.collect() // Array[Int]
// Count elements
val count = data.count() // 100
// Reduce
val sum = data.reduce(_ + _) // 5050
// Take first n elements
val first10 = data.take(10) // Array(1, 2, 3, ..., 10)

val expensiveRDD = data.map(complexComputation) // complexComputation stands in for some costly user-defined function
// Cache in memory for reuse
expensiveRDD.cache()
// Or specify storage level
expensiveRDD.persist(StorageLevel.MEMORY_AND_DISK_SER)
// Use multiple times (computed only once due to caching)
val result1 = expensiveRDD.count()
val result2 = expensiveRDD.collect()
// Remove from cache when done
expensiveRDD.unpersist()
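Checkpointing complements caching: it writes the RDD to reliable storage and truncates its lineage, which helps long transformation chains. A sketch reusing data and complexComputation from the examples above; longLineage and the checkpoint directory are hypothetical:
sc.setCheckpointDir("hdfs:///tmp/checkpoints") // reliable storage for checkpoint files
val longLineage = data.map(complexComputation)
longLineage.cache() // recommended, so checkpointing does not recompute the lineage
longLineage.checkpoint() // mark for checkpointing before the first action on this RDD
longLineage.count() // the job materializes the data and writes the checkpoint
longLineage.isCheckpointed // true once the checkpoint has been written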
// Scala type (scala.reflect.ClassTag) that Spark uses for runtime type information
trait ClassTag[T] {
def runtimeClass: Class[_]
}
// RDD partition representation
trait Partition extends Serializable {
def index: Int
}
// Data partitioning strategy
abstract class Partitioner extends Serializable {
def numPartitions: Int
def getPartition(key: Any): Int
}
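As a sketch of the Partitioner contract (Spark itself ships HashPartitioner and RangePartitioner; the class below is purely illustrative):
class ModuloPartitioner(override val numPartitions: Int) extends Partitioner {
  def getPartition(key: Any): Int = {
    val h = key.hashCode % numPartitions
    if (h < 0) h + numPartitions else h // keep the index in [0, numPartitions)
  }
  // Production partitioners should also override equals and hashCode
}
val partitioned = sc.parallelize(Seq((1, "a"), (2, "b"))).partitionBy(new ModuloPartitioner(4))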
// RDD dependency representation
abstract class Dependency[T] extends Serializable
// Serialization framework
abstract class Serializer {
def newInstance(): SerializerInstance
}
abstract class SerializerInstance {
def serialize[T: ClassTag](t: T): ByteBuffer
def deserialize[T: ClassTag](bytes: ByteBuffer): T
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-core-2-12