RDD Operations

RDD Base Class

A Resilient Distributed Dataset (RDD) is the fundamental abstraction in Apache Spark: an immutable, fault-tolerant collection of elements, partitioned across the cluster, that can be operated on in parallel.

abstract class RDD[T: ClassTag](
  @transient private var _sc: SparkContext,
  @transient private var deps: Seq[Dependency[_]]
) {
  // Transformations (Lazy)
  def map[U: ClassTag](f: T => U): RDD[U]
  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
  def filter(f: T => Boolean): RDD[T]
  def distinct(numPartitions: Int = partitions.length): RDD[T]
  def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]
  def union(other: RDD[T]): RDD[T]
  def intersection(other: RDD[T]): RDD[T]
  def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]
  def groupBy[K: ClassTag](f: T => K): RDD[(K, Iterable[T])]
  def pipe(command: String): RDD[String]
  def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T]
  def repartition(numPartitions: Int): RDD[T]
  def sortBy[K](f: T => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
  def keyBy[K](f: T => K): RDD[(K, T)]
  
  // Actions (Eager)
  def collect(): Array[T]
  def count(): Long
  def first(): T
  def take(num: Int): Array[T]
  def top(num: Int)(implicit ord: Ordering[T]): Array[T]
  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
  def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T]
  def reduce(f: (T, T) => T): T
  def fold(zeroValue: T)(op: (T, T) => T): T
  def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
  def treeAggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U, depth: Int = 2): U
  def foreach(f: T => Unit): Unit
  def foreachPartition(f: Iterator[T] => Unit): Unit
  
  // I/O Actions
  def saveAsTextFile(path: String): Unit
  def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit
  def saveAsObjectFile(path: String): Unit
  
  // Persistence
  def persist(): RDD[T]
  def persist(newLevel: StorageLevel): RDD[T]
  def cache(): RDD[T]
  def unpersist(blocking: Boolean = false): RDD[T]
  def getStorageLevel: StorageLevel
  def checkpoint(): Unit
  def isCheckpointed: Boolean
  def getCheckpointFile: Option[String]
  
  // Metadata
  def partitions: Array[Partition]
  def partitioner: Option[Partitioner]
  def getNumPartitions: Int
  def dependencies: Seq[Dependency[_]]
  def preferredLocations(split: Partition): Seq[String]
  def context: SparkContext
  def id: Int
  def name: String
  def setName(name: String): RDD[T]
}
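
Transformations only record lineage; nothing executes until an action runs. A minimal sketch, assuming an existing SparkContext named sc and a hypothetical input file data.txt:

val lines = sc.textFile("data.txt")   // lazy: nothing is read yet
val lengths = lines.map(_.length)     // transformation: recorded, not executed
val nonEmpty = lengths.filter(_ > 0)  // still lazy
val total = nonEmpty.reduce(_ + _)    // action: triggers the whole job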

PairRDDFunctions

Extra operations available on RDDs of key-value pairs, made accessible automatically through an implicit conversion (RDD.rddToPairRDDFunctions).

class PairRDDFunctions[K, V](self: RDD[(K, V)])(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) {
  // Grouping Operations
  def groupByKey(): RDD[(K, Iterable[V])]
  def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
  def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
  
  // Reduction Operations
  def reduceByKey(func: (V, V) => V): RDD[(K, V)]
  def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
  def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
  def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]
  def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]
  
  // Aggregation Operations
  def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
  def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
  def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
  def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
  def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
  def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]
  
  // Partitioning
  def partitionBy(partitioner: Partitioner): RDD[(K, V)]
  
  // Join Operations
  def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
  def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
  def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
  def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
  def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))]
  def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))]
  def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
  def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))]
  def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], W))]
  def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]
  def fullOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], Option[W]))]
  def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], Option[W]))]
  def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
  def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]
  def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))]
  
  // Set Operations
  def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)]
  def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)]
  def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)]
  
  // Lookups and Collection
  def lookup(key: K): Seq[V]
  def collectAsMap(): Map[K, V]
  def countByKey(): Map[K, Long]
  def countByKeyApprox(timeout: Long, confidence: Double = 0.95): PartialResult[Map[K, BoundedDouble]]
  
  // Value Operations
  def mapValues[U](f: V => U): RDD[(K, U)]
  def flatMapValues[U](f: V => IterableOnce[U]): RDD[(K, U)]
  def keys: RDD[K]
  def values: RDD[V]
  
  // Sorting (sortByKey is provided by OrderedRDDFunctions, available when an Ordering[K] exists)
  def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]
  def sortBy[B](f: ((K, V)) => B, ascending: Boolean = true, numPartitions: Int = self.partitions.length)(implicit ord: Ordering[B], ctag: ClassTag[B]): RDD[(K, V)]
  
  // I/O Operations
  def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[F], codec: Class[_ <: CompressionCodec]): Unit
  def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[F], conf: JobConf = new JobConf(self.context.hadoopConfiguration), codec: Option[Class[_ <: CompressionCodec]] = None): Unit
  def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[F], conf: Configuration = self.context.hadoopConfiguration): Unit
}
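
For example, combineByKey can compute a per-key average in a single pass. Unlike groupByKey, it (like reduceByKey and aggregateByKey) merges values map-side before the shuffle, which usually reduces network traffic. A sketch, assuming an existing SparkContext sc:

val scores = sc.parallelize(Seq(("a", 10.0), ("b", 4.0), ("a", 20.0)))

val averages = scores
  .combineByKey(
    (v: Double) => (v, 1L),                                              // createCombiner: first value seen for a key
    (acc: (Double, Long), v: Double) => (acc._1 + v, acc._2 + 1L),       // mergeValue: fold a value into a combiner
    (a: (Double, Long), b: (Double, Long)) => (a._1 + b._1, a._2 + b._2) // mergeCombiners: merge across partitions
  )
  .mapValues { case (sum, count) => sum / count }

averages.collect() // Array(("a", 15.0), ("b", 4.0))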

DoubleRDDFunctions

Statistical operations available on RDDs of Double values.

class DoubleRDDFunctions(self: RDD[Double]) {
  // Statistical Operations
  def sum(): Double
  def stats(): StatCounter
  def mean(): Double
  def variance(): Double
  def stdev(): Double
  def sampleStdev(): Double
  def sampleVariance(): Double
  
  // Histogram Operations
  def histogram(buckets: Array[Double]): Array[Long]
  def histogram(buckets: Int): (Array[Double], Array[Long])
}
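
These become available on any RDD[Double] through the same implicit-conversion mechanism. A short sketch, assuming sc:

val values = sc.parallelize(Seq(1.0, 2.0, 2.0, 3.0, 4.0))

val summary = values.stats() // single pass: count, mean, stdev, min, max
summary.mean                 // 2.4

// Two equal-width buckets over [min, max]
val (bucketEdges, counts) = values.histogram(2)
// bucketEdges: Array(1.0, 2.5, 4.0), counts: Array(3, 2)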

SequenceFileRDDFunctions

Operations for saving RDDs as Hadoop SequenceFiles.

class SequenceFileRDDFunctions[K, V](self: RDD[(K, V)])(implicit kt: ClassTag[K], vt: ClassTag[V], keyWritableFactory: WritableFactory[K], valueWritableFactory: WritableFactory[V]) {
  def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None): Unit
}
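
Writable conversions for common types (e.g. String to Text, Int to IntWritable) are supplied implicitly, so plain Scala pairs can be written directly. A sketch, using a hypothetical output path:

val records = sc.parallelize(Seq(("a", 1), ("b", 2)))
records.saveAsSequenceFile("/tmp/records-seq")

// Read it back; the type parameters select the reverse Writable conversions
val restored = sc.sequenceFile[String, Int]("/tmp/records-seq")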

Usage Examples

Basic Transformations

val numbers = sc.parallelize(1 to 100)

// Map transformation
val squares = numbers.map(x => x * x)

// Filter transformation  
val evenNumbers = numbers.filter(_ % 2 == 0)

// FlatMap transformation
val words = sc.parallelize(Array("hello world", "foo bar"))
val allWords = words.flatMap(_.split(" "))

Key-Value Operations

val pairs = sc.parallelize(Array(("a", 1), ("b", 2), ("a", 3), ("b", 4)))

// Reduce by key
val sums = pairs.reduceByKey(_ + _) // ("a", 4), ("b", 6)

// Group by key
val grouped = pairs.groupByKey() // ("a", [1, 3]), ("b", [2, 4])

// Join operations
val other = sc.parallelize(Array(("a", "apple"), ("b", "banana")))
val joined = pairs.join(other) // ("a", (1, "apple")), ("a", (3, "apple")), etc.
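
// Cogroup keeps both value collections grouped by key instead of flattening to pairs
val cogrouped = pairs.cogroup(other)
// ("a", ([1, 3], ["apple"])), ("b", ([2, 4], ["banana"]))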

Actions

val data = sc.parallelize(1 to 100)

// Collect all data to driver
val collected = data.collect() // Array[Int]

// Count elements
val count = data.count() // 100

// Reduce
val sum = data.reduce(_ + _) // 5050

// Take first n elements
val first10 = data.take(10) // Array(1, 2, 3, ..., 10)
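
aggregate generalizes reduce and fold to a result type that differs from the element type; for instance, a sum and a count over the same data in one pass:

// seqOp folds elements into a per-partition accumulator; combOp merges accumulators
val (total, n) = data.aggregate((0, 0))(
  (acc, x) => (acc._1 + x, acc._2 + 1),
  (a, b)   => (a._1 + b._1, a._2 + b._2)
)
// total == 5050, n == 100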

Persistence

val expensiveRDD = data.map(complexComputation) // complexComputation: any costly function

// Cache in memory for reuse
expensiveRDD.cache()

// Or specify storage level
expensiveRDD.persist(StorageLevel.MEMORY_AND_DISK_SER)

// Use multiple times (computed only once due to caching)
val result1 = expensiveRDD.count()
val result2 = expensiveRDD.collect()

// Remove from cache when done
expensiveRDD.unpersist()
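
For long lineage chains, checkpoint() complements caching: it writes the RDD to reliable storage and truncates the dependency graph. A sketch, assuming a writable checkpoint directory:

sc.setCheckpointDir("/tmp/spark-checkpoints")

val longLineage = data.map(_ + 1).map(_ * 2)
longLineage.checkpoint()    // marks the RDD; written out by the next action
longLineage.count()         // triggers the job and the checkpoint write
longLineage.isCheckpointed  // true once the write has completed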

Core Types

// Runtime type information from the Scala standard library (scala.reflect.ClassTag), required by many RDD methods
trait ClassTag[T] {
  def runtimeClass: Class[_]
}

// RDD partition representation
trait Partition extends Serializable {
  def index: Int
}

// Data partitioning strategy
abstract class Partitioner extends Serializable {
  def numPartitions: Int
  def getPartition(key: Any): Int
}
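
A custom Partitioner controls which partition each key lands in. A hypothetical sketch that routes string keys by their first character:

class FirstLetterPartitioner(override val numPartitions: Int) extends Partitioner {
  def getPartition(key: Any): Int = key match {
    case s: String if s.nonEmpty => math.abs(s.head.toInt) % numPartitions
    case _                       => 0
  }
  // In practice, also override equals/hashCode so Spark can tell when two
  // RDDs are already co-partitioned and skip an unnecessary shuffle.
}

val byLetter = pairs.partitionBy(new FirstLetterPartitioner(4))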

// RDD dependency representation
abstract class Dependency[T] extends Serializable

// Serialization framework
abstract class Serializer {
  def newInstance(): SerializerInstance
}

abstract class SerializerInstance {
  def serialize[T: ClassTag](t: T): ByteBuffer
  def deserialize[T: ClassTag](bytes: ByteBuffer): T
}