CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-apache-spark

Lightning-fast unified analytics engine for large-scale data processing with high-level APIs in Scala, Java, Python, and R

Pending
Overview
Eval results
Files

core-rdd.mddocs/

Core RDD Operations

The RDD (Resilient Distributed Dataset) is the fundamental abstraction in Apache Spark. It represents an immutable, partitioned collection of elements that can be operated on in parallel with fault-tolerance built-in.

RDD Class

abstract class RDD[T] extends Serializable {
  // Core properties
  def partitions: Array[Partition]
  def compute(split: Partition, context: TaskContext): Iterator[T]
  def dependencies: Seq[Dependency[_]]
  def partitioner: Option[Partitioner] = None
  def preferredLocations(split: Partition): Seq[String] = Nil
}

Transformations

Transformations are lazy operations that create a new RDD from an existing one. They are not executed immediately but build a directed acyclic graph (DAG) of computations.

Basic Transformations

map: Apply a function to each element

def map[U: ClassTag](f: T => U): RDD[U]
val numbers = sc.parallelize(Array(1, 2, 3, 4, 5))
val squared = numbers.map(x => x * x)
// Result: RDD containing [1, 4, 9, 16, 25]

flatMap: Apply a function and flatten the results

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
val lines = sc.parallelize(Array("hello world", "spark rdd"))
val words = lines.flatMap(line => line.split(" "))
// Result: RDD containing ["hello", "world", "spark", "rdd"]

filter: Keep elements that satisfy a predicate

def filter(f: T => Boolean): RDD[T]
val numbers = sc.parallelize(Array(1, 2, 3, 4, 5, 6))
val evens = numbers.filter(_ % 2 == 0)
// Result: RDD containing [2, 4, 6]

distinct: Remove duplicate elements

def distinct(): RDD[T]
def distinct(numPartitions: Int): RDD[T]
val data = sc.parallelize(Array(1, 2, 2, 3, 3, 3))
val unique = data.distinct()
// Result: RDD containing [1, 2, 3]

Sampling Transformations

sample: Return a sampled subset

def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]
val data = sc.parallelize(1 to 100)
val sampled = data.sample(withReplacement = false, fraction = 0.1)
// Result: RDD with approximately 10% of original elements

randomSplit: Split RDD randomly into multiple RDDs

def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]]
val data = sc.parallelize(1 to 100)
val Array(train, test) = data.randomSplit(Array(0.7, 0.3))
// Result: Two RDDs with ~70% and ~30% of data respectively

Set Operations

union: Return the union of two RDDs

def union(other: RDD[T]): RDD[T]

intersection: Return the intersection of two RDDs

def intersection(other: RDD[T]): RDD[T]
def intersection(other: RDD[T], numPartitions: Int): RDD[T]

subtract: Return elements in this RDD but not in the other

def subtract(other: RDD[T]): RDD[T]
def subtract(other: RDD[T], numPartitions: Int): RDD[T]
def subtract(other: RDD[T], p: Partitioner): RDD[T]
val rdd1 = sc.parallelize(Array(1, 2, 3, 4))
val rdd2 = sc.parallelize(Array(3, 4, 5, 6))

val unionRDD = rdd1.union(rdd2)          // [1, 2, 3, 4, 3, 4, 5, 6]
val intersectionRDD = rdd1.intersection(rdd2) // [3, 4]
val subtractRDD = rdd1.subtract(rdd2)    // [1, 2]

Pairing and Joining

zip: Zip this RDD with another one, returning key-value pairs

def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]

zipWithIndex: Zip with the element indices

def zipWithIndex(): RDD[(T, Long)]

zipWithUniqueId: Zip with generated unique IDs

def zipWithUniqueId(): RDD[(T, Long)]

cartesian: Return Cartesian product with another RDD

def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]
val rdd1 = sc.parallelize(Array("a", "b"))
val rdd2 = sc.parallelize(Array(1, 2))

val zipped = rdd1.zip(rdd2)              // [("a", 1), ("b", 2)]
val withIndex = rdd1.zipWithIndex()      // [("a", 0), ("b", 1)]
val cartesian = rdd1.cartesian(rdd2)     // [("a", 1), ("a", 2), ("b", 1), ("b", 2)]

Grouping and Sorting

groupBy: Group elements by a key function

def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
def groupBy[K](f: T => K, numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
val data = sc.parallelize(Array(1, 2, 3, 4, 5, 6))
val grouped = data.groupBy(_ % 2)  // Group by even/odd
// Result: [(0, [2, 4, 6]), (1, [1, 3, 5])]

keyBy: Create tuples by applying a function to generate keys

def keyBy[K](f: T => K): RDD[(K, T)]
val words = sc.parallelize(Array("apple", "banana", "apricot"))
val byFirstLetter = words.keyBy(_.charAt(0))
// Result: [('a', "apple"), ('b', "banana"), ('a', "apricot")]

Partitioning Transformations

repartition: Increase or decrease partitions with shuffle

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

coalesce: Reduce the number of partitions (optionally with shuffle)

def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null): RDD[T]
val data = sc.parallelize(1 to 1000, 10)  // 10 partitions
val repartitioned = data.repartition(5)   // 5 partitions with shuffle
val coalesced = data.coalesce(5)          // 5 partitions without shuffle

Partition-wise Operations

mapPartitions: Apply function to each partition independently

def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]

mapPartitionsWithIndex: Apply function with partition index

def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]

glom: Return an array of all elements in each partition

def glom(): RDD[Array[T]]
val data = sc.parallelize(Array(1, 2, 3, 4, 5, 6), 2)

// Sum elements in each partition
val partitionSums = data.mapPartitions(iter => Iterator(iter.sum))

// Add partition index to each element  
val withPartitionId = data.mapPartitionsWithIndex((index, iter) => 
  iter.map(value => (index, value)))

// Get arrays of elements per partition
val partitionArrays = data.glom()  // [[1, 2, 3], [4, 5, 6]]

Actions

Actions trigger the execution of transformations and return values to the driver program or save data to storage.

Collection Actions

collect: Return all elements as an array (use with caution on large datasets)

def collect(): Array[T]

take: Return first n elements

def take(num: Int): Array[T]

takeOrdered: Return K smallest elements

def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]

top: Return K largest elements

def top(num: Int)(implicit ord: Ordering[T]): Array[T]

takeSample: Return a random sample of elements

def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T]

first: Return the first element

def first(): T
val data = sc.parallelize(Array(5, 1, 3, 9, 2, 7))

val all = data.collect()                    // [5, 1, 3, 9, 2, 7]
val first3 = data.take(3)                   // [5, 1, 3]  
val smallest2 = data.takeOrdered(2)         // [1, 2]
val largest2 = data.top(2)                  // [9, 7]
val sample = data.takeSample(false, 3)      // Random 3 elements
val firstElement = data.first()             // 5

Aggregation Actions

count: Return the number of elements

def count(): Long

countByValue: Return count of each unique value

def countByValue()(implicit ord: Ordering[T]): Map[T, Long]

reduce: Reduce elements using an associative function

def reduce(f: (T, T) => T): T

fold: Aggregate with a zero value

def fold(zeroValue: T)(op: (T, T) => T): T

aggregate: Aggregate with different result type

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
val numbers = sc.parallelize(Array(1, 2, 3, 4, 5))

val count = numbers.count()                          // 5
val sum = numbers.reduce(_ + _)                      // 15
val sumWithZero = numbers.fold(0)(_ + _)             // 15
val (sum2, count2) = numbers.aggregate((0, 0))(      // (15, 5)
  (acc, value) => (acc._1 + value, acc._2 + 1),     // seqOp
  (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2) // combOp
)

val valueCounts = sc.parallelize(Array("a", "b", "a", "c", "b")).countByValue()
// Result: Map("a" -> 2, "b" -> 2, "c" -> 1)

Statistical Actions

For RDDs containing numeric types, additional statistical operations are available through implicit conversions:

min/max: Find minimum/maximum values

def min()(implicit ord: Ordering[T]): T
def max()(implicit ord: Ordering[T]): T
val numbers = sc.parallelize(Array(5, 1, 9, 3, 7))
val minimum = numbers.min()  // 1
val maximum = numbers.max()  // 9

Iterator Actions

foreach: Apply a function to each element (side effects only)

def foreach(f: T => Unit): Unit

foreachPartition: Apply a function to each partition

def foreachPartition(f: Iterator[T] => Unit): Unit

toLocalIterator: Return an iterator that consumes each partition sequentially

def toLocalIterator: Iterator[T]
val data = sc.parallelize(Array(1, 2, 3, 4, 5))

// Print each element (for debugging)
data.foreach(println)

// Process each partition (e.g., write to database)
data.foreachPartition { partition =>
  // Setup database connection
  partition.foreach { element =>
    // Insert element into database
  }
  // Close database connection
}

Save Operations

saveAsTextFile: Save RDD as text files

def saveAsTextFile(path: String): Unit
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit

saveAsObjectFile: Save as SequenceFile of serialized objects

def saveAsObjectFile(path: String): Unit
val data = sc.parallelize(Array(1, 2, 3, 4, 5))

// Save as text files
data.saveAsTextFile("hdfs://path/to/output")

// Save with compression
import org.apache.hadoop.io.compress.GzipCodec
data.saveAsTextFile("hdfs://path/to/compressed", classOf[GzipCodec])

// Save as object file
data.saveAsObjectFile("hdfs://path/to/objects")

RDD Properties and Metadata

Partition Information:

def partitions: Array[Partition]          // Get partition array
def getNumPartitions: Int                 // Get number of partitions
def partitioner: Option[Partitioner]      // Get partitioner if any

Dependencies and Lineage:

def dependencies: Seq[Dependency[_]]      // RDD dependencies
def toDebugString: String                 // Debug string showing lineage

Naming and Context:

def setName(name: String): RDD[T]        // Set RDD name for monitoring
def name: String                         // Get RDD name
def id: Int                              // Unique RDD identifier
def sparkContext: SparkContext           // The SparkContext that created this RDD
val data = sc.parallelize(Array(1, 2, 3, 4, 5), 3).setName("MyRDD")

println(s"Partitions: ${data.getNumPartitions}")  // Partitions: 3
println(s"Name: ${data.name}")                    // Name: MyRDD
println(s"ID: ${data.id}")                        // ID: 0
println(data.toDebugString)                       // Shows RDD lineage

Type Conversions

toJavaRDD: Convert to Java RDD

def toJavaRDD(): JavaRDD[T]

Error Handling and Common Exceptions

RDD operations can fail for various reasons. Understanding common error scenarios helps with debugging and building robust applications.

Common RDD Exceptions

SparkException: General Spark execution errors

// Task failure due to out of memory, serialization issues, etc.
try {
  val result = rdd.collect()
} catch {
  case e: SparkException => println(s"Spark execution failed: ${e.getMessage}")
}

TaskResultLost: Task results lost due to network or node failure

// Typically indicates node failure or network issues
// Spark automatically retries, but may eventually fail

OutOfMemoryError: Driver or executor runs out of memory

// Common with collect() on large datasets
try {
  val allData = largeRDD.collect()  // Dangerous!
} catch {
  case _: OutOfMemoryError => 
    println("Dataset too large for collect(). Use take() or write to storage.")
}

RDD Action Error Scenarios

collect(): Can cause OutOfMemoryError if result doesn't fit in driver memory

// Safe: Use take() for sampling
val sample = rdd.take(100)

// Dangerous: Collect entire large dataset
val all = rdd.collect()  // May cause OOM

reduce(): Fails if RDD is empty

try {
  val sum = rdd.reduce(_ + _)
} catch {
  case e: UnsupportedOperationException => 
    println("Cannot reduce empty RDD")
}

// Safe alternative
val sum = rdd.fold(0)(_ + _)  // Works with empty RDDs

first(): Throws exception if RDD is empty

try {
  val firstElement = rdd.first()
} catch {
  case e: UnsupportedOperationException =>
    println("RDD is empty")
}

// Safe alternative
val maybeFirst = rdd.take(1).headOption

Serialization Errors

NotSerializableException: Objects must be serializable for distribution

class NotSerializable {
  def process(x: Int): Int = x * 2
}

val processor = new NotSerializable()

// This will fail at runtime
val result = rdd.map(x => processor.process(x))  // NotSerializableException

// Solution: Make objects serializable or use functions
val result = rdd.map(x => x * 2)  // Safe

Partition and File Errors

FileNotFoundException: When reading non-existent files

try {
  val data = sc.textFile("hdfs://nonexistent/path")
  data.count()  // Error occurs on action, not creation
} catch {
  case e: FileNotFoundException =>
    println(s"File not found: ${e.getMessage}")
}

InvalidInputException: Malformed input data

// Handle corrupted or malformed files
try {
  val data = sc.sequenceFile[String, String]("path/to/corrupted/file")
  data.collect()
} catch {
  case e: InvalidInputException =>
    println(s"Invalid input format: ${e.getMessage}")
}

Network and Cluster Errors

Connection timeouts: Network issues between nodes

  • Configure spark.network.timeout for longer operations
  • Use spark.sql.adaptive.coalescePartitions.enabled=true to reduce network overhead

Shuffle failures: Data shuffle operations failing

  • Common with operations like groupByKey, join, distinct
  • Increase spark.serializer.objectStreamReset interval
  • Consider using reduceByKey instead of groupByKey

Best Practices for Error Handling

  1. Avoid collect() on large datasets: Use take(), sample(), or write to storage
  2. Handle empty RDDs: Use fold() instead of reduce(), check isEmpty() before actions
  3. Ensure serializability: Keep closures simple, avoid capturing non-serializable objects
  4. Monitor resource usage: Configure appropriate executor memory and cores
  5. Use checkpointing: For long lineages, checkpoint intermediate results
  6. Handle file system errors: Validate paths exist before reading, use try-catch for file operations
// Robust RDD processing pattern
def processRDDSafely[T](rdd: RDD[T]): Option[Array[T]] = {
  try {
    if (rdd.isEmpty()) {
      println("Warning: RDD is empty")
      None
    } else {
      // Use take() instead of collect() for safety
      val sample = rdd.take(1000)
      Some(sample)
    }
  } catch {
    case e: SparkException =>
      println(s"Spark processing failed: ${e.getMessage}")
      None
    case e: OutOfMemoryError =>
      println("Out of memory - dataset too large")
      None
  }
}

This comprehensive coverage of RDD operations provides the foundation for all data processing in Apache Spark. Understanding these operations and their potential failure modes is crucial for effective Spark programming.

Install with Tessl CLI

npx tessl i tessl/maven-apache-spark

docs

caching-persistence.md

core-rdd.md

data-sources.md

graphx.md

index.md

java-api.md

key-value-operations.md

mllib.md

python-api.md

spark-context.md

sql.md

streaming.md

tile.json