CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-spark--spark-core-2-11

Apache Spark Core - The foundational component of Apache Spark providing distributed computing capabilities including RDDs, transformations, actions, and cluster management.

Pending
Overview
Eval results
Files

docs/rdd-operations.md

RDD Operations

Resilient Distributed Datasets (RDDs) are the fundamental data abstraction in Spark. They represent immutable, partitioned collections of elements that can be operated on in parallel. RDDs support two types of operations: transformations (which create new RDDs) and actions (which return values).

Core RDD Class

/**
 * Abridged signature reference for org.apache.spark.rdd.RDD (bodies omitted).
 * NOTE(review): this is a documentation summary, not the actual source —
 * verify individual signatures against the Spark release you target.
 */
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
) {
  // Transformations - Lazy operations that return new RDDs
  def map[U: ClassTag](f: T => U): RDD[U]
  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
  def filter(f: T => Boolean): RDD[T]
  def distinct(numPartitions: Int = partitions.length): RDD[T]
  def union(other: RDD[T]): RDD[T]
  def intersection(other: RDD[T]): RDD[T]
  def intersection(other: RDD[T], partitioner: Partitioner): RDD[T]
  def intersection(other: RDD[T], numPartitions: Int): RDD[T]
  def subtract(other: RDD[T]): RDD[T]
  def subtract(other: RDD[T], numPartitions: Int): RDD[T]
  def subtract(other: RDD[T], p: Partitioner): RDD[T]
  def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]
  // zip requires both RDDs to have the same number of partitions and the
  // same number of elements in each partition
  def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
  def zipWithIndex(): RDD[(T, Long)]
  def zipWithUniqueId(): RDD[(T, Long)]
  def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]
  def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]]
  // NOTE(review): takeSample is an action (returns an Array and triggers
  // computation) even though it is grouped with the transformations here
  def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T]
  def repartition(numPartitions: Int): RDD[T]
  def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T]
  def sortBy[K](f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
  def glom(): RDD[Array[T]]
  def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
  def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
  def pipe(command: String): RDD[String]
  // NOTE(review): the parameter list of this pipe overload differs across
  // Spark versions — confirm against the target version's scaladoc
  def pipe(command: Seq[String], env: Map[String, String] = Map(), printPipeRDDInfo: Option[String => Unit] = None, propagateFailure: Boolean = false, checkExitCode: Boolean = true): RDD[String]
  
  // Actions - Operations that trigger computation and return values
  // (results are returned to the driver; size them accordingly)
  def collect(): Array[T]
  def count(): Long
  def countApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble]
  def countApproxDistinct(relativeSD: Double = 0.05): Long
  def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long]
  def first(): T
  def isEmpty(): Boolean
  def take(num: Int): Array[T]
  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
  def top(num: Int)(implicit ord: Ordering[T]): Array[T]
  def min()(implicit ord: Ordering[T]): T
  def max()(implicit ord: Ordering[T]): T
  def reduce(f: (T, T) => T): T
  def treeReduce(f: (T, T) => T, depth: Int = 2): T
  def fold(zeroValue: T)(op: (T, T) => T): T
  def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
  def treeAggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U, depth: Int = 2): U
  def foreach(f: T => Unit): Unit
  def foreachPartition(f: Iterator[T] => Unit): Unit
  
  // Persistence and Caching
  // persist() defaults to MEMORY_ONLY; cache() is an alias for persist()
  def persist(): this.type
  def persist(newLevel: StorageLevel): this.type
  def cache(): this.type
  def unpersist(blocking: Boolean = true): this.type
  def checkpoint(): Unit
  def isCheckpointed: Boolean
  def getCheckpointFile: Option[String]
  def localCheckpoint(): this.type
  
  // Partition and Dependency Information
  def partitions: Array[Partition]
  def partitioner: Option[Partitioner]
  def getNumPartitions: Int
  def dependencies: Seq[Dependency[_]]
  def preferredLocations(split: Partition): Seq[String]
  // compute is implemented by concrete RDD subclasses to produce a partition's data
  def compute(split: Partition, context: TaskContext): Iterator[T]
  
  // Metadata and Debugging
  def id: Int
  def name: String
  def setName(name: String): this.type
  def toDebugString: String
  def getStorageLevel: StorageLevel
  def context: SparkContext
}

Transformations

Transformations are lazy operations that define new RDDs but don't trigger computation immediately.

Basic Transformations

import org.apache.spark.{SparkContext, SparkConf}

// local[*] runs Spark in-process using all available cores
val sc = new SparkContext(new SparkConf().setAppName("RDD Examples").setMaster("local[*]"))

// Create sample data by distributing local collections
val numbers = sc.parallelize(1 to 100)
val words = sc.parallelize(Seq("hello world", "spark is awesome", "big data processing"))

// map - Transform each element one-to-one
val squared = numbers.map(x => x * x)
val lengths = words.map(_.length)

// flatMap - Transform each element into zero or more elements
val allWords = words.flatMap(_.split(" "))
val digits = numbers.flatMap(n => n.toString.toCharArray.map(_.asDigit))

// filter - Keep only elements matching the predicate
val evenNumbers = numbers.filter(_ % 2 == 0)
val longWords = allWords.filter(_.length > 4)

// distinct - Remove duplicates (involves a shuffle)
val uniqueWords = allWords.distinct()
val uniqueDigits = digits.distinct()

// sample - Random sampling; fraction is the expected fraction of the data,
// not an exact element count
val sample = numbers.sample(withReplacement = false, fraction = 0.1, seed = 42)
val sampleWithReplacement = numbers.sample(withReplacement = true, fraction = 0.2)

Set Operations

val rdd1 = sc.parallelize(1 to 10)
val rdd2 = sc.parallelize(5 to 15)

// union - Combine RDDs (keeps duplicates; no shuffle)
val combined = rdd1.union(rdd2)

// intersection - Elements present in both RDDs
// (the result contains no duplicates, even if the inputs do)
val common = rdd1.intersection(rdd2)

// subtract - Elements in first RDD but not second
val difference = rdd1.subtract(rdd2)

// cartesian - Cartesian product: every pairing of an rdd1 element with an
// rdd2 element; result size is |rdd1| * |rdd2|, so use with care
val pairs = rdd1.cartesian(rdd2)

Pairing and Zipping

val data = sc.parallelize(Seq("apple", "banana", "cherry"))
val scores = sc.parallelize(Seq(95, 87, 92))

// zip - Combine elements at same positions
// NOTE(review): zip requires both RDDs to have the same number of partitions
// AND the same number of elements per partition — otherwise it fails at runtime
val pairs = data.zip(scores)

// zipWithIndex - Add sequential, 0-based indices
val indexed = data.zipWithIndex()

// zipWithUniqueId - Add unique IDs (not necessarily sequential or contiguous)
val withIds = data.zipWithUniqueId()

Repartitioning

val data = sc.parallelize(1 to 1000, numSlices = 8)

// repartition - Change number of partitions (always shuffles data)
val repartitioned = data.repartition(4)

// coalesce - Reduce partitions without shuffling (when possible)
val coalesced = data.coalesce(2)

// sortBy - Sort RDD by key function
// FIX: the original combined a negated key (x => -x) WITH ascending = false;
// the two negations cancel out and produce ascending order. Negate only once.
val sorted = data.sortBy(x => x, ascending = false) // Sort descending

Advanced Transformations

// glom - Collect all elements in each partition into arrays
val partitionArrays = numbers.glom()

// mapPartitions - Transform entire partitions
val partitionSums = numbers.mapPartitions(iter => Iterator(iter.sum))

// mapPartitionsWithIndex - Access partition index
// FIX: an Iterator can be traversed only once — the original called iter.sum
// and then iter.size, so size always came out 0. Materialize the partition
// first, then compute both values from the materialized sequence.
val partitionInfo = numbers.mapPartitionsWithIndex { (index, iter) =>
  val elems = iter.toSeq
  Iterator((index, elems.sum, elems.size))
}

// pipe - Send data through external process
val upperCased = allWords.pipe("tr [a-z] [A-Z]")

Actions

Actions trigger computation and return results to the driver or save data to external systems.

Collection Actions

val data = sc.parallelize(1 to 100)

// collect - Bring all elements to driver (use with caution for large datasets —
// everything must fit in driver memory)
val allElements: Array[Int] = data.collect()

// take - Get first n elements (scans as few partitions as possible)
val first10: Array[Int] = data.take(10)

// takeOrdered - Get n smallest elements (per the implicit Ordering)
val smallest5: Array[Int] = data.takeOrdered(5)

// top - Get n largest elements (per the implicit Ordering)
val largest5: Array[Int] = data.top(5)

// takeSample - Random sample of num elements (fewer if the RDD is smaller)
val randomSample: Array[Int] = data.takeSample(withReplacement = false, num = 20)

// first - Get first element (fails on an empty RDD)
val firstElement: Int = data.first()

// min/max - Find extremes (fail on an empty RDD)
val minimum: Int = data.min()
val maximum: Int = data.max()

Aggregation Actions

// count - Count elements
val totalCount: Long = data.count()

// countByValue - Count occurrences of each value (result map lives on the driver)
val wordCounts: Map[String, Long] = allWords.countByValue()

// reduce - Combine elements with an associative, commutative function
val sum: Int = data.reduce(_ + _)
// NOTE(review): 100! vastly overflows Int — this runs, but the value wraps
// around; use smaller data or BigInt for a meaningful product
val product: Int = data.reduce(_ * _)

// fold - Like reduce but with an initial (zero) value
val sumWithZero: Int = data.fold(0)(_ + _)

// aggregate - More flexible reduction with different input/output types
// (here: compute (count, sum) in a single pass)
val stats: (Int, Double) = data.aggregate((0, 0.0))(
  seqOp = { case ((count, sum), value) => (count + 1, sum + value) },
  combOp = { case ((c1, s1), (c2, s2)) => (c1 + c2, s1 + s2) }
)

// treeReduce/treeAggregate - Hierarchical reduction (combines partial results
// in multiple levels, reducing load on the driver)
val treeSum: Int = data.treeReduce(_ + _)

Side-Effect Actions

// foreach - Execute function on each element
// FIX: this runs on the executors, NOT the driver (the original comment said
// "driver side") — println output appears in executor logs, not the driver console
data.foreach(println)

// foreachPartition - Execute function once per partition, e.g. to amortize
// connection setup across all elements of the partition
data.foreachPartition { partition =>
  val connection = createDatabaseConnection()
  try {
    partition.foreach(value => insertToDatabase(connection, value))
  } finally {
    // FIX: close in a finally block so a failed insert cannot leak the connection
    connection.close()
  }
}

Approximate Actions

// countApprox - Approximate count, returning whatever estimate is available
// within the timeout (milliseconds) at the requested confidence
val approxCount = data.countApprox(timeout = 1000L, confidence = 0.95)

// countApproxDistinct - Approximate distinct count; relativeSD trades
// accuracy against memory used by the sketch
val approxDistinct = data.countApproxDistinct(relativeSD = 0.05)

Persistence and Caching

RDDs can be persisted in memory or disk for efficient reuse across multiple actions.

import org.apache.spark.storage.StorageLevel

val expensiveRDD = data
  .filter(_ > 50)
  .map(expensiveComputation)
  .filter(_.nonEmpty)

// Persist with an explicit storage level.
// FIX: the original called cache() and then persist(MEMORY_AND_DISK_SER) on
// the same RDD — an RDD's storage level cannot be changed once assigned, so
// the second call throws UnsupportedOperationException. Choose ONE of:
//   expensiveRDD.cache()   // shorthand for persist(StorageLevel.MEMORY_ONLY)
expensiveRDD.persist(StorageLevel.MEMORY_AND_DISK_SER)

// Use the persisted RDD multiple times — only the first action computes it
val result1 = expensiveRDD.count()
val result2 = expensiveRDD.collect()
val result3 = expensiveRDD.take(10)

// Remove from cache when done (blocking = true by default)
expensiveRDD.unpersist()

Checkpointing

Checkpointing saves RDD data to reliable storage to truncate lineage and improve fault tolerance.

// Set checkpoint directory (should be a reliable store such as HDFS in production)
sc.setCheckpointDir("hdfs://namenode:port/checkpoints")

val complexRDD = data
  .map(complexTransformation1)
  .filter(complexFilter)
  .map(complexTransformation2)
  .filter(anotherComplexFilter)

// Mark the RDD for checkpointing — must be called before any action on it.
// NOTE(review): consider persist()-ing first; otherwise the RDD may be
// recomputed a second time to write the checkpoint — confirm for your version.
complexRDD.checkpoint()

// Trigger checkpointing with an action
complexRDD.count()

// Verify checkpointing
println(s"Is checkpointed: ${complexRDD.isCheckpointed}")
println(s"Checkpoint file: ${complexRDD.getCheckpointFile}")

Partition Management

Understanding and controlling partitions is crucial for performance optimization.

// minPartitions is a lower-bound hint, not an exact partition count
val data = sc.textFile("large-file.txt", minPartitions = 100)

// Check partition information
println(s"Number of partitions: ${data.getNumPartitions}")
// partitioner is None here — RDDs loaded from text files carry no partitioner
println(s"Partitioner: ${data.partitioner}")

// View partition content (collects everything to the driver — debugging only)
data.glom().collect().zipWithIndex.foreach { case (partitionData, index) =>
  println(s"Partition $index has ${partitionData.length} elements")
}

// Custom partitioning for key-value RDDs (partitionBy comes from
// PairRDDFunctions, available implicitly on RDDs of pairs)
val keyValueData = data.map(line => (line.length, line))
val hashPartitioned = keyValueData.partitionBy(new HashPartitioner(50))
// RangePartitioner samples keyValueData to estimate key-range boundaries
val rangePartitioned = keyValueData.partitionBy(new RangePartitioner(50, keyValueData))

Advanced Usage Patterns

Pipeline Optimization

// Chain transformations efficiently
val pipeline = sc.textFile("input")
  .filter(_.nonEmpty)
  .map(_.toLowerCase.trim)
  .filter(_.length > 5)
  .map(processLine)
  .filter(_.isValid)
  .persist(StorageLevel.MEMORY_AND_DISK_SER) // Persist at strategic points

// Multiple actions on same RDD
val validLines = pipeline.cache()
val count = validLines.count()
val sample = validLines.take(100)
val summary = validLines.map(_.summary).collect()

Error Handling

import scala.util.{Try, Success, Failure}

// Skip elements that fail, logging each failure.
// FIX: the original ended with .filter(_.isDefined).map(_.get) — Option.get is
// an anti-pattern. flatMap over the Option achieves the same result in one
// step (None simply contributes no elements).
val robustRDD = data.flatMap { element =>
  Try(riskyOperation(element)) match {
    case Success(result) => Some(result)
    case Failure(exception) =>
      logError(s"Failed to process $element: ${exception.getMessage}")
      None
  }
}

Custom Partitioning for Performance

/**
 * Routes String and Int keys across numPartitions partitions; any other key
 * type falls back to partition 0.
 *
 * FIX: the original declared `def numPartitions: Int = numPartitions`, where
 * the method shadows the constructor parameter of the same name — the body
 * referred to the method itself, causing infinite recursion. Promoting the
 * parameter to an overriding val breaks the cycle without changing the
 * constructor signature.
 */
class CustomPartitioner(override val numPartitions: Int) extends Partitioner {
  require(numPartitions > 0, s"Number of partitions must be positive, got $numPartitions")

  def getPartition(key: Any): Int = {
    key match {
      // FIX: take abs of the remainder (not of hashCode) — abs(Int.MinValue)
      // is still negative, so abs-before-mod could return an invalid id
      case s: String => math.abs(s.hashCode % numPartitions)
      // FIX: negative Int keys made `i % numPartitions` negative, which is an
      // invalid partition id — normalize the remainder
      case i: Int => math.abs(i % numPartitions)
      case _ => 0
    }
  }
}

val customPartitioned = keyValueData.partitionBy(new CustomPartitioner(100))

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-core-2-11

docs

broadcast-accumulators.md

context-configuration.md

index.md

java-api.md

key-value-operations.md

rdd-operations.md

status-monitoring.md

storage-persistence.md

task-context.md

tile.json