RDD Operations

The Resilient Distributed Dataset (RDD) API provides transformations and actions for fault-tolerant, distributed processing of large datasets.

Capabilities

Base RDD Class

Abstract base class for all RDDs. It provides the core distributed dataset operations and recovers lost partitions automatically by recomputing them from their lineage.

/**
 * Resilient Distributed Dataset - immutable distributed collection
 */
abstract class RDD[T: ClassTag] extends Serializable {
  // Transformations (lazy evaluation)
  
  /** Transform each element using provided function */
  def map[U: ClassTag](f: T => U): RDD[U]
  
  /** Transform each element to sequence and flatten results */
  def flatMap[U: ClassTag](f: T => IterableOnce[U]): RDD[U]
  
  /** Filter elements matching predicate */
  def filter(f: T => Boolean): RDD[T]
  
  /** Map each partition with partition index */
  def mapPartitionsWithIndex[U: ClassTag](
    f: (Int, Iterator[T]) => Iterator[U],
    preservesPartitioning: Boolean = false
  ): RDD[U]
  
  /** Sample fraction of elements */
  def sample(
    withReplacement: Boolean,
    fraction: Double,
    seed: Long = Utils.random.nextLong
  ): RDD[T]
  
  /** Return union of this RDD and another */
  def union(other: RDD[T]): RDD[T]
  
  /** Return intersection with another RDD */
  def intersection(other: RDD[T]): RDD[T]
  
  /** Return distinct elements */
  def distinct(numPartitions: Int = partitions.length): RDD[T]
  
  /** Group by key function */
  def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
  
  /** Reduce partitions to specified number */
  def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T]
  
  /** Repartition to specified number */
  def repartition(numPartitions: Int): RDD[T]
  
  /** Sort RDD elements */
  def sortBy[K](
    f: T => K,
    ascending: Boolean = true,
    numPartitions: Int = this.partitions.length
  )(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
  
  /** Zip with another RDD */
  def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
  
  /** Zip with element indices */
  def zipWithIndex(): RDD[(T, Long)]
  
  /** Zip with unique IDs */
  def zipWithUniqueId(): RDD[(T, Long)]
  
  // Actions (trigger computation)
  
  /** Collect all elements to driver */
  def collect(): Array[T]
  
  /** Count number of elements */
  def count(): Long
  
  /** Return first element */
  def first(): T
  
  /** Take first n elements */
  def take(num: Int): Array[T]
  
  /** Take ordered elements */
  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
  
  /** Take sample */
  def takeSample(
    withReplacement: Boolean,
    num: Int,
    seed: Long = Utils.random.nextLong
  ): Array[T]
  
  /** Reduce elements using function */
  def reduce(f: (T, T) => T): T
  
  /** Fold elements with initial value */
  def fold(zeroValue: T)(op: (T, T) => T): T
  
  /** Aggregate with different types */
  def aggregate[U: ClassTag](zeroValue: U)(
    seqOp: (U, T) => U,
    combOp: (U, U) => U
  ): U
  
  /** Apply function to each element */
  def foreach(f: T => Unit): Unit
  
  /** Apply function to each partition */
  def foreachPartition(f: Iterator[T] => Unit): Unit
  
  /** Save as text file */
  def saveAsTextFile(path: String): Unit
  def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit
  
  // Persistence
  
  /** Persist RDD with storage level */
  def persist(newLevel: StorageLevel): this.type
  
  /** Persist RDD with default storage level (MEMORY_ONLY) */
  def persist(): this.type
  
  /** Cache RDD in memory */
  def cache(): this.type
  
  /** Remove persisted data */
  def unpersist(blocking: Boolean = false): this.type
  
  /** Mark RDD for checkpointing */
  def checkpoint(): Unit
  
  /** Check if RDD is checkpointed */
  def isCheckpointed: Boolean
  
  // Metadata
  
  /** Get partitions */
  def partitions: Array[Partition]
  
  /** Get partitioner */
  def partitioner: Option[Partitioner]
  
  /** Check if RDD is empty */
  def isEmpty(): Boolean
  
  /** Get storage level */
  def getStorageLevel: StorageLevel
}
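
A minimal sketch of how these pieces fit together, assuming a local master for illustration: transformations only record lineage, an action triggers computation, and persist/checkpoint control reuse and recovery. Paths and data are invented.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val sc = new SparkContext(new SparkConf().setAppName("lineage-sketch").setMaster("local[*]"))
sc.setCheckpointDir("/tmp/rdd-checkpoints")   // illustrative path

val words = sc.parallelize(Seq("spark makes rdds", "rdds are lazy"))
  .flatMap(_.split("\\s+"))
  .filter(_.nonEmpty)

// Nothing has executed yet: only the lineage (parallelize -> flatMap -> filter) is recorded.
words.persist(StorageLevel.MEMORY_AND_DISK)   // keep results across multiple actions
words.checkpoint()                            // truncate lineage when first computed

val total = words.count()                     // first action: computes, caches, checkpoints
val sample = words.take(3)                    // reuses the persisted data

sc.stop()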

PairRDDFunctions

Additional operations available on RDDs of key-value pairs through implicit conversion.

/**
 * Extra functions for RDDs of (key, value) pairs
 */
class PairRDDFunctions[K, V](self: RDD[(K, V)]) {
  /** Group values by key */
  def groupByKey(): RDD[(K, Iterable[V])]
  def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
  def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
  
  /** Reduce values by key */
  def reduceByKey(func: (V, V) => V): RDD[(K, V)]
  def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
  
  /** Aggregate values by key */
  def aggregateByKey[U: ClassTag](zeroValue: U)(
    seqOp: (U, V) => U,
    combOp: (U, U) => U
  ): RDD[(K, U)]
  def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(
    seqOp: (U, V) => U,
    combOp: (U, U) => U
  ): RDD[(K, U)]
  
  /** Fold values by key */
  def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
  def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]
  
  /** Combine values by key */
  def combineByKey[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C
  ): RDD[(K, C)]
  
  /** Join with another pair RDD */
  def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
  def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
  def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
  
  /** Left outer join */
  def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
  
  /** Right outer join */
  def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
  
  /** Full outer join */
  def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]
  
  /** Cogroup with other RDDs */
  def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
  def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]): 
    RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
  
  /** Get values for keys */
  def lookup(key: K): Seq[V]
  
  /** Collect as map */
  def collectAsMap(): Map[K, V]
  
  /** Count by key */
  def countByKey(): Map[K, Long]
  
  /** Sort by key */
  def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
              (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[(K, V)]
  
  /** Get keys only */
  def keys: RDD[K]
  
  /** Get values only */
  def values: RDD[V]
  
  /** Subtract by key */
  def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)]
  
  /** Save as Hadoop file */
  def saveAsHadoopFile[F <: OutputFormat[K, V]](
    path: String,
    keyClass: Class[_],
    valueClass: Class[_],
    outputFormatClass: Class[F]
  ): Unit
  
  /** Save as new Hadoop API file */
  def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](
    path: String,
    keyClass: Class[_],
    valueClass: Class[_],
    outputFormatClass: Class[F]
  ): Unit
}
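
A short sketch of the pair operations, assuming an existing SparkContext sc; the keys and values are invented for illustration. reduceByKey and aggregateByKey combine values within each partition before shuffling, which usually makes them preferable to groupByKey for aggregations.

val sales  = sc.parallelize(Seq(("apples", 3.0), ("pears", 2.0), ("apples", 5.0)))
val prices = sc.parallelize(Seq(("apples", 1.2), ("pears", 0.9)))

// Per-key (count, sum) without materializing all values for a key
val countAndSum = sales.aggregateByKey((0L, 0.0))(
  (acc, v) => (acc._1 + 1, acc._2 + v),       // seqOp: merge a value into the accumulator
  (a, b)   => (a._1 + b._1, a._2 + b._2)      // combOp: merge accumulators across partitions
)
val meanPerKey = countAndSum.mapValues { case (n, s) => s / n }

// Join summed quantities with unit prices and compute revenue per key
val revenue = sales
  .reduceByKey(_ + _)
  .join(prices)
  .mapValues { case (qty, price) => qty * price }

revenue.collectAsMap()                        // e.g. Map(apples -> 9.6, pears -> 1.8)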

DoubleRDDFunctions

Statistical operations available on RDDs of numeric values through implicit conversion.

/**
 * Extra functions for RDDs of doubles
 */
class DoubleRDDFunctions(self: RDD[Double]) {
  /** Compute mean */
  def mean(): Double
  
  /** Compute variance */
  def variance(): Double
  
  /** Compute standard deviation */
  def stdev(): Double
  
  /** Compute sum */
  def sum(): Double
  
  /** Compute statistics */
  def stats(): StatCounter
  
  /** Compute histogram */
  def histogram(buckets: Int): (Array[Double], Array[Long])
  def histogram(buckets: Array[Double]): Array[Long]
}

/**
 * Statistics counter for numeric RDDs
 */
class StatCounter extends Serializable {
  def count: Long
  def mean: Double
  def sum: Double
  def min: Double
  def max: Double
  def variance: Double
  def stdev: Double
}
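
A brief sketch of the numeric helpers, assuming an existing SparkContext sc and made-up values.

val latencies = sc.parallelize(Seq(12.0, 15.5, 9.8, 30.2, 22.1, 14.7))

val summary = latencies.stats()               // count, mean, min, max, variance in one pass
println(s"n=${summary.count} mean=${summary.mean} stdev=${summary.stdev}")

// Histogram with 3 evenly spaced buckets: returns (bucket boundaries, counts)
val (edges, counts) = latencies.histogram(3)

// Histogram with explicit bucket boundaries: returns counts only
val custom = latencies.histogram(Array(0.0, 10.0, 20.0, 40.0))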

Specialized RDD Types

/** RDD from parallel collection */
class ParallelCollectionRDD[T: ClassTag](
  @transient sc: SparkContext,
  @transient data: Seq[T],
  numSlices: Int
) extends RDD[T]

/** RDD from Hadoop InputFormat */
class HadoopRDD[K, V](
  sc: SparkContext,
  conf: JobConf,
  inputFormatClass: Class[_ <: InputFormat[K, V]],
  keyClass: Class[K],
  valueClass: Class[V],
  minPartitions: Int
) extends RDD[(K, V)]

/** RDD from new Hadoop API */
class NewHadoopRDD[K, V](
  sc: SparkContext,
  inputFormatClass: Class[_ <: NewInputFormat[K, V]],
  keyClass: Class[K],
  valueClass: Class[V],
  conf: Configuration
) extends RDD[(K, V)]

/** RDD from JDBC */
class JdbcRDD[T: ClassTag](
  sc: SparkContext,
  getConnection: () => Connection,
  sql: String,
  lowerBound: Long,
  upperBound: Long,
  numPartitions: Int,
  mapRow: ResultSet => T
) extends RDD[T]

/** Empty RDD */
class EmptyRDD[T: ClassTag](sc: SparkContext) extends RDD[T]

/** Union of multiple RDDs */
class UnionRDD[T: ClassTag](sc: SparkContext, rdds: Seq[RDD[T]]) extends RDD[T]

/** Coalesced RDD with fewer partitions */
class CoalescedRDD[T: ClassTag](
  prev: RDD[T],
  maxPartitions: Int,
  shuffle: Boolean = false
) extends RDD[T]

/** Shuffled RDD */
class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
  prev: RDD[_ <: Product2[K, V]],
  part: Partitioner
) extends RDD[(K, C)]
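
A hedged sketch of constructing a JdbcRDD, assuming an existing SparkContext sc; the JDBC URL, table, and column names are placeholders. The query must contain two ? parameters, which Spark binds to each partition's slice of the lowerBound..upperBound range.

import java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD

val users = new JdbcRDD(
  sc,
  () => DriverManager.getConnection("jdbc:postgresql://db.example.com/app"),  // placeholder URL
  "SELECT id, name FROM users WHERE ? <= id AND id <= ?",                     // bounds filled per partition
  1L,                                          // lowerBound
  100000L,                                     // upperBound
  10,                                          // numPartitions
  rs => (rs.getLong(1), rs.getString(2))       // mapRow: convert each ResultSet row
)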

Usage Examples

import org.apache.spark.{SparkContext, SparkConf}

val sc = new SparkContext(new SparkConf().setAppName("RDD Example").setMaster("local[*]")) // master is normally supplied by spark-submit

// Basic transformations
val numbers = sc.parallelize(1 to 100)
val squares = numbers.map(x => x * x)
val evens = numbers.filter(_ % 2 == 0)

// Pair RDD operations
val pairs = sc.parallelize(List(("a", 1), ("b", 2), ("a", 3)))
val grouped = pairs.groupByKey()
val sums = pairs.reduceByKey(_ + _)

// Statistical operations
val doubles = sc.parallelize(Array(1.0, 2.0, 3.0, 4.0, 5.0))
val avg = doubles.mean()
val stats = doubles.stats()

// Actions
val result = squares.take(10)
val total = numbers.reduce(_ + _)
squares.saveAsTextFile("output/squares")

sc.stop()

Partitioning

RDDs maintain partitioning information to optimize distributed operations and minimize data shuffling.

trait Partition extends Serializable {
  def index: Int
}

abstract class TaskContext extends Serializable {
  def stageId(): Int
  def stageAttemptNumber(): Int
  def partitionId(): Int
  def taskAttemptId(): Long
  def attemptNumber(): Int
  def getLocalProperty(key: String): String
}
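
A sketch of how partitioning information avoids shuffles, assuming an existing SparkContext sc and invented data. Pre-partitioning both sides of a join with the same HashPartitioner (via partitionBy, a PairRDDFunctions method not listed above) lets the join run without re-shuffling either input.

import org.apache.spark.HashPartitioner

val partitioner = new HashPartitioner(8)

val events = sc.parallelize(Seq(("user1", "click"), ("user2", "view")))
  .partitionBy(partitioner)
  .persist()                                   // reuse the partitioned layout across jobs

val profiles = sc.parallelize(Seq(("user1", 34), ("user2", 27)))
  .partitionBy(partitioner)

// Both sides share the same partitioner, so this join avoids a shuffle
val joined = events.join(profiles)

// Inspect how keys landed in partitions
events.mapPartitionsWithIndex { (idx, it) =>
  Iterator(s"partition $idx has ${it.size} records")
}.collect().foreach(println)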