A Resilient Distributed Dataset (RDD) is the fundamental abstraction in Apache Spark: a fault-tolerant collection of elements, partitioned across the cluster, that can be operated on in parallel.
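For instance, an RDD can be created by distributing a local collection or by reading an external dataset. A minimal sketch, assuming an existing SparkContext named sc; the file path is illustrative:
val fromCollection = sc.parallelize(Seq(1, 2, 3, 4, 5))   // distribute a local collection across the cluster
val fromTextFile = sc.textFile("hdfs:///data/input.txt")  // RDD[String], one element per line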
abstract class RDD[T: ClassTag](
@transient private var _sc: SparkContext,
@transient private var deps: Seq[Dependency[_]]
) {
// Transformations (Lazy)
def map[U: ClassTag](f: T => U): RDD[U]
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
def filter(f: T => Boolean): RDD[T]
def distinct(numPartitions: Int = partitions.length): RDD[T]
def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]
def union(other: RDD[T]): RDD[T]
def intersection(other: RDD[T]): RDD[T]
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]
def groupBy[K: ClassTag](f: T => K): RDD[(K, Iterable[T])]
def pipe(command: String): RDD[String]
def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T]
def repartition(numPartitions: Int): RDD[T]
def sortBy[K](f: T => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
def keyBy[K](f: T => K): RDD[(K, T)]
// Actions (Eager)
def collect(): Array[T]
def count(): Long
def first(): T
def take(num: Int): Array[T]
def top(num: Int)(implicit ord: Ordering[T]): Array[T]
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T]
def reduce(f: (T, T) => T): T
def fold(zeroValue: T)(op: (T, T) => T): T
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
def treeAggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U, depth: Int = 2): U
def foreach(f: T => Unit): Unit
def foreachPartition(f: Iterator[T] => Unit): Unit
// I/O Actions
def saveAsTextFile(path: String): Unit
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit
def saveAsObjectFile(path: String): Unit
// Persistence
def persist(): RDD[T]
def persist(newLevel: StorageLevel): RDD[T]
def cache(): RDD[T]
def unpersist(blocking: Boolean = false): RDD[T]
def getStorageLevel: StorageLevel
def checkpoint(): Unit
def isCheckpointed: Boolean
def getCheckpointFile: Option[String]
// Metadata
def partitions: Array[Partition]
def partitioner: Option[Partitioner]
def getNumPartitions: Int
def dependencies: Seq[Dependency[_]]
def preferredLocations(split: Partition): Seq[String]
def context: SparkContext
def id: Int
def name: String
def setName(name: String): RDD[T]
}
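For example, aggregate (listed under the eager actions above) folds the elements of each partition with seqOp and merges the per-partition results with combOp. A minimal sketch computing a sum and a count in one pass, assuming an existing SparkContext named sc:
val nums = sc.parallelize(1 to 10)
val (total, cnt) = nums.aggregate((0, 0))(
  (acc, x) => (acc._1 + x, acc._2 + 1),   // seqOp: fold each element into the per-partition accumulator
  (a, b) => (a._1 + b._1, a._2 + b._2)    // combOp: merge accumulators from different partitions
)
val average = total.toDouble / cnt        // 5.5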
Operations available on RDDs of (key, value) pairs. They become available automatically through an implicit conversion on any RDD[(K, V)].
class PairRDDFunctions[K, V](self: RDD[(K, V)])(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) {
// Grouping Operations
def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
// Reduction Operations
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]
// Aggregation Operations
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]
// Partitioning
def partitionBy(partitioner: Partitioner): RDD[(K, V)]
// Join Operations
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))]
def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))]
def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))]
def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], W))]
def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]
def fullOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], Option[W]))]
def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], Option[W]))]
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))]
// Set Operations
def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)]
def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)]
def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)]
// Lookups and Collection
def lookup(key: K): Seq[V]
def collectAsMap(): Map[K, V]
def countByKey(): Map[K, Long]
def countByKeyApprox(timeout: Long, confidence: Double = 0.95): PartialResult[Map[K, BoundedDouble]]
// Value Operations
def mapValues[U](f: V => U): RDD[(K, U)]
def flatMapValues[U](f: V => IterableOnce[U]): RDD[(K, U)]
def keys: RDD[K]
def values: RDD[V]
// Sorting
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]
def sortBy[B](f: ((K, V)) => B, ascending: Boolean = true, numPartitions: Int = self.partitions.length)(implicit ord: Ordering[B], ctag: ClassTag[B]): RDD[(K, V)]
// I/O Operations
def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[F], codec: Class[_ <: CompressionCodec]): Unit
def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[F], conf: JobConf = new JobConf(self.context.hadoopConfiguration), codec: Option[Class[_ <: CompressionCodec]] = None): Unit
def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[F], conf: Configuration = self.context.hadoopConfiguration): Unit
}
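For example, a per-key average with aggregateByKey, which combines values map-side instead of materializing the full per-key value lists that groupByKey would build. A minimal sketch, assuming an existing SparkContext named sc:
val scores = sc.parallelize(Seq(("a", 3.0), ("a", 5.0), ("b", 4.0)))
val sumCount = scores.aggregateByKey((0.0, 0))(
  (acc, v) => (acc._1 + v, acc._2 + 1),   // seqOp: accumulate (sum, count) per key within a partition
  (x, y) => (x._1 + y._1, x._2 + y._2)    // combOp: merge per-partition accumulators
)
val averages = sumCount.mapValues { case (sum, n) => sum / n }   // ("a", 4.0), ("b", 4.0)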
Statistical operations available on RDDs of Double values.
class DoubleRDDFunctions(self: RDD[Double]) {
// Statistical Operations
def sum(): Double
def stats(): StatCounter
def mean(): Double
def variance(): Double
def stdev(): Double
def sampleStdev(): Double
def sampleVariance(): Double
// Histogram Operations
def histogram(buckets: Array[Double]): Array[Long]
def histogram(buckets: Int): (Array[Double], Array[Long])
}
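These methods become available implicitly on any RDD[Double]. A minimal sketch, assuming an existing SparkContext named sc:
val measurements = sc.parallelize(Seq(1.0, 2.0, 2.0, 3.0, 8.0))
val summary = measurements.stats()                          // StatCounter with count, mean, stdev, max, min
val avg = measurements.mean()                               // 3.2
val (edges, counts) = measurements.histogram(2)             // 2 evenly spaced buckets over [1.0, 8.0]
val fixed = measurements.histogram(Array(0.0, 5.0, 10.0))   // Array(4, 1): counts for [0, 5) and [5, 10]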
Operations for saving RDDs as Hadoop SequenceFiles.
class SequenceFileRDDFunctions[K, V](self: RDD[(K, V)])(implicit kt: ClassTag[K], vt: ClassTag[V], keyWritableFactory: WritableFactory[K], valueWritableFactory: WritableFactory[V]) {
def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None): Unit
}
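A minimal sketch of writing a pair RDD as a SequenceFile and reading it back; the path is illustrative:
val kv = sc.parallelize(Seq(("a", 1), ("b", 2)))
kv.saveAsSequenceFile("hdfs:///tmp/kv-seq")                         // keys and values are converted to Writables implicitly
val reloaded = sc.sequenceFile[String, Int]("hdfs:///tmp/kv-seq")   // read back as RDD[(String, Int)]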
// Basic transformations on an RDD of numbers
val numbers = sc.parallelize(1 to 100)
// Map transformation
val squares = numbers.map(x => x * x)
// Filter transformation
val evenNumbers = numbers.filter(_ % 2 == 0)
// FlatMap transformation
val words = sc.parallelize(Array("hello world", "foo bar"))
val allWords = words.flatMap(_.split(" "))
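A sketch of a few transformations not shown above, building on the numbers RDD; the commented results assume the data defined above:
val firstHalf = numbers.filter(_ <= 50)
val secondHalf = numbers.filter(_ > 40)
val combined = firstHalf.union(secondHalf)         // 110 elements; union keeps duplicates (41..50 appear twice)
val overlap = firstHalf.intersection(secondHalf)   // 41..50
val deduped = combined.distinct()                  // back to 100 distinct values
val descending = numbers.sortBy(x => x, ascending = false)   // 100, 99, 98, ...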
// Key-value pair operations
val pairs = sc.parallelize(Array(("a", 1), ("b", 2), ("a", 3), ("b", 4)))
// Reduce by key
val sums = pairs.reduceByKey(_ + _) // ("a", 4), ("b", 6)
// Group by key
val grouped = pairs.groupByKey() // ("a", [1, 3]), ("b", [2, 4])
// Join operations
val other = sc.parallelize(Array(("a", "apple"), ("b", "banana")))
val joined = pairs.join(other) // ("a", (1, "apple")), ("a", (3, "apple")), etc.
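The outer joins wrap the missing side in Option. A sketch continuing with pairs from above; extra is a hypothetical RDD introduced only for illustration:
val extra = sc.parallelize(Array(("a", true)))
val left = pairs.leftOuterJoin(extra)   // ("a", (1, Some(true))), ("a", (3, Some(true))), ("b", (2, None)), ("b", (4, None))
val full = pairs.fullOuterJoin(extra)   // every key from either side, with both sides wrapped in Option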
// Actions on an RDD of numbers
val data = sc.parallelize(1 to 100)
// Collect all data to driver
val collected = data.collect() // Array[Int]
// Count elements
val count = data.count() // 100
// Reduce
val sum = data.reduce(_ + _) // 5050
// Take first n elements
val first10 = data.take(10) // Array(1, 2, 3, ..., 10)
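A sketch of a few further actions on the same data RDD; the output path is illustrative:
val top5 = data.top(5)             // Array(100, 99, 98, 97, 96) -- largest elements, descending
val bottom5 = data.takeOrdered(5)  // Array(1, 2, 3, 4, 5) -- smallest elements, ascending
val folded = data.fold(0)(_ + _)   // 5050, like reduce but with a zero value applied per partition
data.saveAsTextFile("hdfs:///tmp/numbers-out")   // one text part file per partition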
// Caching and persistence
val expensiveRDD = data.map(complexComputation) // complexComputation stands in for any expensive function
// Cache in memory for reuse
expensiveRDD.cache()
// Or specify storage level
expensiveRDD.persist(StorageLevel.MEMORY_AND_DISK_SER)
// Use multiple times (computed only once due to caching)
val result1 = expensiveRDD.count()
val result2 = expensiveRDD.collect()
// Remove from cache when done
expensiveRDD.unpersist()
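checkpoint() complements caching: it writes the RDD to reliable storage and truncates its lineage, and it must be called before the first action on that RDD. A sketch; the checkpoint directory is illustrative and lineageHeavy is a hypothetical RDD:
sc.setCheckpointDir("hdfs:///tmp/checkpoints")
val lineageHeavy = data.map(complexComputation)
lineageHeavy.checkpoint()                 // marks the RDD for checkpointing
lineageHeavy.count()                      // the first action also writes the checkpoint
val done = lineageHeavy.isCheckpointed    // true once the checkpoint has been written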
// Supporting types referenced throughout the API above.
// ClassTag carries runtime type information; it comes from the Scala standard library (scala.reflect) and is required by many RDD operations
trait ClassTag[T] {
def runtimeClass: Class[_]
}
// RDD partition representation
trait Partition extends Serializable {
def index: Int
}
// Data partitioning strategy
abstract class Partitioner extends Serializable {
def numPartitions: Int
def getPartition(key: Any): Int
}
// RDD dependency representation
abstract class Dependency[T] extends Serializable
// Serialization framework
abstract class Serializer {
def newInstance(): SerializerInstance
}
abstract class SerializerInstance {
def serialize[T: ClassTag](t: T): ByteBuffer
def deserialize[T: ClassTag](bytes: ByteBuffer): T
}
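A minimal sketch of implementing the Partitioner contract shown above; Spark ships HashPartitioner and RangePartitioner, and ModuloPartitioner here is purely illustrative:
import org.apache.spark.Partitioner

class ModuloPartitioner(parts: Int) extends Partitioner {
  override def numPartitions: Int = parts
  override def getPartition(key: Any): Int = {
    val h = key.hashCode % parts
    if (h < 0) h + parts else h            // getPartition must return a non-negative index
  }
  override def equals(other: Any): Boolean = other match {
    case p: ModuloPartitioner => p.numPartitions == numPartitions
    case _ => false
  }
  override def hashCode: Int = numPartitions
}

// Usage: co-locate all values for a key in one partition, e.g. before repeated joins or lookups
val partitioned = pairs.partitionBy(new ModuloPartitioner(8)).persist()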