The Resilient Distributed Dataset (RDD) API provides transformations and actions for fault-tolerant, distributed processing of large datasets.
RDD is the abstract base class for all RDDs; it provides the core distributed-collection operations and recovers lost partitions automatically by recomputing them from the recorded lineage.
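As a quick orientation before the signature listing, the sketch below shows the split between lazy transformations and actions, and how an RDD's lineage (used for fault recovery) can be inspected; the application name, master URL, and input path are placeholders.
import org.apache.spark.{SparkConf, SparkContext}
val sc = new SparkContext(new SparkConf().setAppName("lineage-demo").setMaster("local[*]"))
val words = sc.textFile("words.txt")        // placeholder input path
  .flatMap(_.split("\\s+"))                 // transformation: recorded in the lineage, not yet run
  .filter(_.nonEmpty)                       // transformation: still lazy
println(words.toDebugString)                // prints the lineage used for recomputation on failure
val wordCount = words.count()               // action: triggers the distributed computation
sc.stop()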
/**
* Resilient Distributed Dataset - immutable distributed collection
*/
abstract class RDD[T: ClassTag] extends Serializable {
// Transformations (lazy evaluation)
/** Transform each element using provided function */
def map[U: ClassTag](f: T => U): RDD[U]
/** Transform each element to sequence and flatten results */
def flatMap[U: ClassTag](f: T => IterableOnce[U]): RDD[U]
/** Filter elements matching predicate */
def filter(f: T => Boolean): RDD[T]
/** Map each partition with partition index */
def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false
): RDD[U]
/** Sample fraction of elements */
def sample(
withReplacement: Boolean,
fraction: Double,
seed: Long = Utils.random.nextLong
): RDD[T]
/** Return union of this RDD and another */
def union(other: RDD[T]): RDD[T]
/** Return intersection with another RDD */
def intersection(other: RDD[T]): RDD[T]
/** Return distinct elements */
def distinct(numPartitions: Int = partitions.length): RDD[T]
/** Group by key function */
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
/** Reduce partitions to specified number */
def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T]
/** Repartition to specified number */
def repartition(numPartitions: Int): RDD[T]
/** Sort RDD elements */
def sortBy[K](
f: T => K,
ascending: Boolean = true,
numPartitions: Int = this.partitions.length
)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
/** Zip with another RDD */
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
/** Zip with element indices */
def zipWithIndex(): RDD[(T, Long)]
/** Zip with unique IDs */
def zipWithUniqueId(): RDD[(T, Long)]
// Actions (trigger computation)
/** Collect all elements to driver */
def collect(): Array[T]
/** Count number of elements */
def count(): Long
/** Return first element */
def first(): T
/** Take first n elements */
def take(num: Int): Array[T]
/** Take ordered elements */
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
/** Take sample */
def takeSample(
withReplacement: Boolean,
num: Int,
seed: Long = Utils.random.nextLong
): Array[T]
/** Reduce elements using function */
def reduce(f: (T, T) => T): T
/** Fold elements with initial value */
def fold(zeroValue: T)(op: (T, T) => T): T
/** Aggregate with different types */
def aggregate[U: ClassTag](zeroValue: U)(
seqOp: (U, T) => U,
combOp: (U, U) => U
): U
/** Apply function to each element */
def foreach(f: T => Unit): Unit
/** Apply function to each partition */
def foreachPartition(f: Iterator[T] => Unit): Unit
/** Save as text file */
def saveAsTextFile(path: String): Unit
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit
// Persistence
/** Persist RDD with storage level */
def persist(newLevel: StorageLevel): this.type
/** Persist RDD with default storage level (MEMORY_ONLY) */
def persist(): this.type
/** Cache RDD in memory */
def cache(): this.type
/** Remove persisted data */
def unpersist(blocking: Boolean = false): this.type
/** Mark RDD for checkpointing */
def checkpoint(): Unit
/** Check if RDD is checkpointed */
def isCheckpointed: Boolean
// Metadata
/** Get partitions */
def partitions: Array[Partition]
/** Get partitioner */
def partitioner: Option[Partitioner]
/** Check if RDD is empty */
def isEmpty(): Boolean
/** Get storage level */
def getStorageLevel: StorageLevel
}
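A minimal sketch of the persistence and checkpointing calls listed above, assuming an existing SparkContext `sc` and an illustrative input path:
import org.apache.spark.storage.StorageLevel
sc.setCheckpointDir("checkpoints")             // must be set before checkpoint() is called
val parsed = sc.textFile("events.log").map(_.split(","))
parsed.persist(StorageLevel.MEMORY_AND_DISK)   // keep computed partitions for reuse
parsed.checkpoint()                            // lineage is truncated once the data is materialized
val rowCount = parsed.count()                  // first action materializes both cache and checkpoint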
Additional operations available on RDDs of key-value pairs through implicit conversion.
/**
* Extra functions for RDDs of (key, value) pairs
*/
class PairRDDFunctions[K, V](self: RDD[(K, V)]) {
/** Group values by key */
def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
/** Reduce values by key */
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
/** Aggregate values by key */
def aggregateByKey[U: ClassTag](zeroValue: U)(
seqOp: (U, V) => U,
combOp: (U, U) => U
): RDD[(K, U)]
def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(
seqOp: (U, V) => U,
combOp: (U, U) => U
): RDD[(K, U)]
/** Fold values by key */
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]
/** Combine values by key */
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C
): RDD[(K, C)]
/** Join with another pair RDD */
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
/** Left outer join */
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
/** Right outer join */
def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
/** Full outer join */
def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]
/** Cogroup with other RDDs */
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]):
RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
/** Get values for keys */
def lookup(key: K): Seq[V]
/** Collect as map */
def collectAsMap(): Map[K, V]
/** Count by key */
def countByKey(): Map[K, Long]
/** Sort by key */
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[(K, V)]
/** Get keys only */
def keys: RDD[K]
/** Get values only */
def values: RDD[V]
/** Subtract by key */
def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)]
/** Save as Hadoop file */
def saveAsHadoopFile[F <: OutputFormat[K, V]](
path: String,
keyClass: Class[_],
valueClass: Class[_],
outputFormatClass: Class[F]
): Unit
/** Save as new Hadoop API file */
def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](
path: String,
keyClass: Class[_],
valueClass: Class[_],
outputFormatClass: Class[F]
): Unit
}
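To make the seqOp/combOp pattern above concrete, here is a sketch of a per-key average computed with aggregateByKey, assuming an existing SparkContext `sc` (mapValues is also part of PairRDDFunctions, though not listed above):
val scores = sc.parallelize(Seq(("a", 3.0), ("b", 4.0), ("a", 5.0)))
val avgByKey = scores
  .aggregateByKey((0.0, 0))(
    (acc, v) => (acc._1 + v, acc._2 + 1),      // seqOp: fold one value into a (sum, count) accumulator
    (x, y) => (x._1 + y._1, x._2 + y._2))      // combOp: merge accumulators across partitions
  .mapValues { case (sum, count) => sum / count }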
Statistical operations available on RDDs of numeric values through implicit conversion.
/**
* Extra functions for RDDs of doubles
*/
class DoubleRDDFunctions(self: RDD[Double]) {
/** Compute mean */
def mean(): Double
/** Compute variance */
def variance(): Double
/** Compute standard deviation */
def stdev(): Double
/** Compute sum */
def sum(): Double
/** Compute statistics */
def stats(): StatCounter
/** Compute histogram */
def histogram(buckets: Int): (Array[Double], Array[Long])
def histogram(buckets: Array[Double]): Array[Long]
}
/**
* Statistics counter for numeric RDDs
*/
class StatCounter extends Serializable {
def count: Long
def mean: Double
def sum: Double
def min: Double
def max: Double
def variance: Double
def stdev: Double
}
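A short sketch of obtaining and reading a StatCounter, assuming an existing SparkContext `sc`:
val s = sc.parallelize(Seq(1.0, 2.0, 3.0, 4.0)).stats()   // all statistics computed in a single pass
println(s"count=${s.count} mean=${s.mean} stdev=${s.stdev} min=${s.min} max=${s.max}")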
/** RDD from parallel collection */
class ParallelCollectionRDD[T: ClassTag](
@transient sc: SparkContext,
@transient data: Seq[T],
numSlices: Int
) extends RDD[T]
/** RDD from Hadoop InputFormat */
class HadoopRDD[K, V](
sc: SparkContext,
conf: JobConf,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int
) extends RDD[(K, V)]
/** RDD from new Hadoop API */
class NewHadoopRDD[K, V](
sc: SparkContext,
inputFormatClass: Class[_ <: NewInputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
conf: Configuration
) extends RDD[(K, V)]
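HadoopRDD and NewHadoopRDD are normally obtained through SparkContext helpers rather than constructed directly; a sketch, with a placeholder HDFS path and assuming an existing SparkContext `sc`:
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
val lines = sc.textFile("hdfs:///data/input.txt")          // backed by a HadoopRDD of text lines
val records = sc
  .newAPIHadoopFile[LongWritable, Text, TextInputFormat]("hdfs:///data/input.txt")
  .map { case (_, line) => line.toString }                 // NewHadoopRDD yields (key, value) pairs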
/** RDD from JDBC */
class JdbcRDD[T: ClassTag](
sc: SparkContext,
getConnection: () => Connection,
sql: String,
lowerBound: Long,
upperBound: Long,
numPartitions: Int,
mapRow: ResultSet => T
) extends RDD[T]
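A hedged sketch of JdbcRDD usage; the JDBC URL, table, and columns are hypothetical, and the SQL must contain exactly two `?` placeholders that receive each partition's bounds:
import java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD
val users = new JdbcRDD(
  sc,
  () => DriverManager.getConnection("jdbc:h2:mem:demo"),   // hypothetical datasource
  "SELECT id, name FROM users WHERE id >= ? AND id <= ?",
  1, 10000, 4,                                             // lower bound, upper bound, partitions
  rs => (rs.getLong(1), rs.getString(2))
)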
/** Empty RDD */
class EmptyRDD[T: ClassTag](sc: SparkContext) extends RDD[T]
/** Union of multiple RDDs */
class UnionRDD[T: ClassTag](sc: SparkContext, rdds: Seq[RDD[T]]) extends RDD[T]
/** Coalesced RDD with fewer partitions */
class CoalescedRDD[T: ClassTag](
prev: RDD[T],
maxPartitions: Int,
shuffle: Boolean = false
) extends RDD[T]
/** Shuffled RDD */
class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
prev: RDD[_ <: Product2[K, V]],
part: Partitioner
) extends RDD[(K, C)]
Usage Examples:
import org.apache.spark.{SparkContext, SparkConf}
val sc = new SparkContext(new SparkConf().setAppName("RDD Example"))
// Basic transformations
val numbers = sc.parallelize(1 to 100)
val squares = numbers.map(x => x * x)
val evens = numbers.filter(_ % 2 == 0)
// Pair RDD operations
val pairs = sc.parallelize(List(("a", 1), ("b", 2), ("a", 3)))
val grouped = pairs.groupByKey()
val sums = pairs.reduceByKey(_ + _)
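// Join with a second pair RDD (illustrative lookup table)
val labels = sc.parallelize(List(("a", "alpha"), ("b", "beta")))
val joined = sums.join(labels)   // RDD[(String, (Int, String))]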
// Statistical operations
val doubles = sc.parallelize(Array(1.0, 2.0, 3.0, 4.0, 5.0))
val avg = doubles.mean()
val stats = doubles.stats()
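// Histogram with four evenly spaced buckets: (bucket boundaries, counts per bucket)
val (edges, counts) = doubles.histogram(4)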
// Actions
val result = squares.take(10)
val total = numbers.reduce(_ + _)
squares.saveAsTextFile("output/squares")
sc.stop()
RDDs maintain partitioning information to optimize distributed operations and minimize data shuffling.
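As a sketch of that point (assuming a live SparkContext `sc`; partitionBy is part of PairRDDFunctions though not listed above), pre-partitioning a pair RDD with an explicit Partitioner lets later key-based operations reuse the layout instead of shuffling again:
import org.apache.spark.HashPartitioner
val events = sc.parallelize(Seq(("user1", 3), ("user2", 5), ("user1", 7)))
  .partitionBy(new HashPartitioner(4))   // fix the partitioning up front
  .persist()                             // keep the partitioned layout around
val totals = events.reduceByKey(_ + _)   // no extra shuffle: the existing partitioner is reused
val joinedTotals = totals.join(events)   // co-partitioned inputs join without a shuffle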
trait Partition extends Serializable {
def index: Int
}
abstract class TaskContext extends Serializable {
def stageId(): Int
def stageAttemptNumber(): Int
def partitionId(): Int
def taskAttemptId(): Long
def attemptNumber(): Int
def getLocalProperty(key: String): String
def isCompleted(): Boolean
def isInterrupted(): Boolean
}
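A short sketch of reading the TaskContext from inside a partition-level operation, assuming an existing SparkContext `sc` (output appears in the executor logs):
import org.apache.spark.TaskContext
sc.parallelize(1 to 100, numSlices = 4).foreachPartition { iter =>
  val ctx = TaskContext.get()            // context of the currently running task
  println(s"stage=${ctx.stageId()} partition=${ctx.partitionId()} elements=${iter.size}")
}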