maven-apache-spark
- Describes: maven/org.apache.spark/spark-parent
- Description: Lightning-fast unified analytics engine for large-scale data processing with high-level APIs in Scala, Java, Python, and R
- Author: tessl
- Spec file: docs/core-rdd.md

# Core RDD Operations

The RDD (Resilient Distributed Dataset) is the fundamental abstraction in Apache Spark. It represents an immutable, partitioned collection of elements that can be operated on in parallel, with fault tolerance built in.

## RDD Class

```scala { .api }
abstract class RDD[T] extends Serializable {
  // Core properties
  def partitions: Array[Partition]
  def compute(split: Partition, context: TaskContext): Iterator[T]
  def dependencies: Seq[Dependency[_]]
  def partitioner: Option[Partitioner] = None
  def preferredLocations(split: Partition): Seq[String] = Nil
}
```

## Transformations

Transformations are lazy operations that create a new RDD from an existing one. They are not executed immediately but build a directed acyclic graph (DAG) of computations.

### Basic Transformations

**map**: Apply a function to each element
```scala { .api }
def map[U: ClassTag](f: T => U): RDD[U]
```

```scala
val numbers = sc.parallelize(Array(1, 2, 3, 4, 5))
val squared = numbers.map(x => x * x)
// Result: RDD containing [1, 4, 9, 16, 25]
```

**flatMap**: Apply a function and flatten the results
```scala { .api }
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
```

```scala
val lines = sc.parallelize(Array("hello world", "spark rdd"))
val words = lines.flatMap(line => line.split(" "))
// Result: RDD containing ["hello", "world", "spark", "rdd"]
```

**filter**: Keep elements that satisfy a predicate
```scala { .api }
def filter(f: T => Boolean): RDD[T]
```

```scala
val numbers = sc.parallelize(Array(1, 2, 3, 4, 5, 6))
val evens = numbers.filter(_ % 2 == 0)
// Result: RDD containing [2, 4, 6]
```

**distinct**: Remove duplicate elements
```scala { .api }
def distinct(): RDD[T]
def distinct(numPartitions: Int): RDD[T]
```

```scala
val data = sc.parallelize(Array(1, 2, 2, 3, 3, 3))
val unique = data.distinct()
// Result: RDD containing [1, 2, 3]
```

### Sampling Transformations

**sample**: Return a sampled subset
```scala { .api }
def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]
```

```scala
val data = sc.parallelize(1 to 100)
val sampled = data.sample(withReplacement = false, fraction = 0.1)
// Result: RDD with approximately 10% of the original elements
```

**randomSplit**: Split an RDD randomly into multiple RDDs
```scala { .api }
def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]]
```

```scala
val data = sc.parallelize(1 to 100)
val Array(train, test) = data.randomSplit(Array(0.7, 0.3))
// Result: Two RDDs with ~70% and ~30% of the data respectively
```

### Set Operations

**union**: Return the union of two RDDs
```scala { .api }
def union(other: RDD[T]): RDD[T]
```

**intersection**: Return the intersection of two RDDs
```scala { .api }
def intersection(other: RDD[T]): RDD[T]
def intersection(other: RDD[T], numPartitions: Int): RDD[T]
```

**subtract**: Return elements in this RDD but not in the other
```scala { .api }
def subtract(other: RDD[T]): RDD[T]
def subtract(other: RDD[T], numPartitions: Int): RDD[T]
def subtract(other: RDD[T], p: Partitioner): RDD[T]
```

```scala
val rdd1 = sc.parallelize(Array(1, 2, 3, 4))
val rdd2 = sc.parallelize(Array(3, 4, 5, 6))

val unionRDD = rdd1.union(rdd2)               // [1, 2, 3, 4, 3, 4, 5, 6]
val intersectionRDD = rdd1.intersection(rdd2) // [3, 4]
val subtractRDD = rdd1.subtract(rdd2)         // [1, 2]
```
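None of the set operations above have run anything yet: like every transformation, they only extend the DAG described at the start of this section. A minimal sketch, reusing `rdd1` and `rdd2` from the example above, that inspects the pending lineage and then triggers a job:

```scala
// Still lazy: subtractRDD has only recorded its dependencies on rdd1 and rdd2.
println(subtractRDD.toDebugString) // prints the lineage (see "Dependencies and Lineage" below)

// count() is an action, so the actual job runs here.
val n = subtractRDD.count() // 2
```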
### Pairing and Joining

**zip**: Zip this RDD with another one, returning key-value pairs
```scala { .api }
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
```

**zipWithIndex**: Zip with the element indices
```scala { .api }
def zipWithIndex(): RDD[(T, Long)]
```

**zipWithUniqueId**: Zip with generated unique IDs
```scala { .api }
def zipWithUniqueId(): RDD[(T, Long)]
```

**cartesian**: Return the Cartesian product with another RDD
```scala { .api }
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]
```

```scala
val rdd1 = sc.parallelize(Array("a", "b"))
val rdd2 = sc.parallelize(Array(1, 2))

val zipped = rdd1.zip(rdd2)          // [("a", 1), ("b", 2)]
val withIndex = rdd1.zipWithIndex()  // [("a", 0), ("b", 1)]
val cartesian = rdd1.cartesian(rdd2) // [("a", 1), ("a", 2), ("b", 1), ("b", 2)]
```

### Grouping and Sorting

**groupBy**: Group elements by a key function
```scala { .api }
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
def groupBy[K](f: T => K, numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
```

```scala
val data = sc.parallelize(Array(1, 2, 3, 4, 5, 6))
val grouped = data.groupBy(_ % 2) // Group by even/odd
// Result: [(0, [2, 4, 6]), (1, [1, 3, 5])]
```

**keyBy**: Create tuples by applying a function to generate keys
```scala { .api }
def keyBy[K](f: T => K): RDD[(K, T)]
```

```scala
val words = sc.parallelize(Array("apple", "banana", "apricot"))
val byFirstLetter = words.keyBy(_.charAt(0))
// Result: [('a', "apple"), ('b', "banana"), ('a', "apricot")]
```

### Partitioning Transformations

**repartition**: Increase or decrease the number of partitions (always shuffles)
```scala { .api }
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
```

**coalesce**: Reduce the number of partitions (optionally with shuffle)
```scala { .api }
def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null): RDD[T]
```

```scala
val data = sc.parallelize(1 to 1000, 10) // 10 partitions
val repartitioned = data.repartition(5)  // 5 partitions, with a shuffle
val coalesced = data.coalesce(5)         // 5 partitions, without a shuffle
```

### Partition-wise Operations

**mapPartitions**: Apply a function to each partition independently
```scala { .api }
def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
```

**mapPartitionsWithIndex**: Apply a function that also receives the partition index
```scala { .api }
def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
```

**glom**: Return an RDD of arrays, one array per partition
```scala { .api }
def glom(): RDD[Array[T]]
```

```scala
val data = sc.parallelize(Array(1, 2, 3, 4, 5, 6), 2)

// Sum the elements in each partition
val partitionSums = data.mapPartitions(iter => Iterator(iter.sum))

// Add the partition index to each element
val withPartitionId = data.mapPartitionsWithIndex((index, iter) =>
  iter.map(value => (index, value)))

// Get arrays of elements per partition
val partitionArrays = data.glom() // [[1, 2, 3], [4, 5, 6]]
```
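A common reason to prefer `mapPartitions` over `map` is to pay a per-partition setup cost once rather than once per element. A minimal sketch of that pattern; `expensiveParser` is a hypothetical helper standing in for anything costly to construct:

```scala
// Hypothetical helper: pretend constructing the parser is expensive.
def expensiveParser(): String => Int = _.trim.toInt

val lines = sc.parallelize(Array(" 1", "2 ", " 3 "), 2)

// One parser per partition instead of one per element.
val parsed = lines.mapPartitions { iter =>
  val parse = expensiveParser()
  iter.map(parse)
}
// Result: RDD containing [1, 2, 3]
```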
## Actions

Actions trigger the execution of transformations and return values to the driver program or save data to storage.

### Collection Actions

**collect**: Return all elements as an array (use with caution on large datasets)
```scala { .api }
def collect(): Array[T]
```

**take**: Return the first n elements
```scala { .api }
def take(num: Int): Array[T]
```

**takeOrdered**: Return the k smallest elements
```scala { .api }
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
```

**top**: Return the k largest elements
```scala { .api }
def top(num: Int)(implicit ord: Ordering[T]): Array[T]
```

**takeSample**: Return a random sample of elements
```scala { .api }
def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T]
```

**first**: Return the first element
```scala { .api }
def first(): T
```

```scala
val data = sc.parallelize(Array(5, 1, 3, 9, 2, 7))

val all = data.collect()               // [5, 1, 3, 9, 2, 7]
val first3 = data.take(3)              // [5, 1, 3]
val smallest2 = data.takeOrdered(2)    // [1, 2]
val largest2 = data.top(2)             // [9, 7]
val sample = data.takeSample(false, 3) // 3 random elements
val firstElement = data.first()        // 5
```

### Aggregation Actions

**count**: Return the number of elements
```scala { .api }
def count(): Long
```

**countByValue**: Return the count of each unique value
```scala { .api }
def countByValue()(implicit ord: Ordering[T]): Map[T, Long]
```

**reduce**: Reduce the elements using an associative function
```scala { .api }
def reduce(f: (T, T) => T): T
```

**fold**: Aggregate with a zero value
```scala { .api }
def fold(zeroValue: T)(op: (T, T) => T): T
```

**aggregate**: Aggregate into a different result type
```scala { .api }
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
```

```scala
val numbers = sc.parallelize(Array(1, 2, 3, 4, 5))

val count = numbers.count()              // 5
val sum = numbers.reduce(_ + _)          // 15
val sumWithZero = numbers.fold(0)(_ + _) // 15
val (sum2, count2) = numbers.aggregate((0, 0))(          // (15, 5)
  (acc, value) => (acc._1 + value, acc._2 + 1),          // seqOp
  (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2) // combOp
)

val valueCounts = sc.parallelize(Array("a", "b", "a", "c", "b")).countByValue()
// Result: Map("a" -> 2, "b" -> 2, "c" -> 1)
```

### Statistical Actions

For RDDs of numeric types, additional statistical operations (such as `sum`, `mean`, and `stats`) become available through implicit conversions. `min` and `max` work on any RDD whose element type has an `Ordering`:

**min/max**: Find the minimum/maximum value
```scala { .api }
def min()(implicit ord: Ordering[T]): T
def max()(implicit ord: Ordering[T]): T
```

```scala
val numbers = sc.parallelize(Array(5, 1, 9, 3, 7))
val minimum = numbers.min() // 1
val maximum = numbers.max() // 9
```

### Iterator Actions

**foreach**: Apply a function to each element (side effects only)
```scala { .api }
def foreach(f: T => Unit): Unit
```

**foreachPartition**: Apply a function to each partition
```scala { .api }
def foreachPartition(f: Iterator[T] => Unit): Unit
```

**toLocalIterator**: Return an iterator that consumes each partition sequentially
```scala { .api }
def toLocalIterator: Iterator[T]
```

```scala
val data = sc.parallelize(Array(1, 2, 3, 4, 5))

// Print each element (for debugging; on a cluster the output appears in executor logs)
data.foreach(println)

// Process each partition (e.g., write to a database)
data.foreachPartition { partition =>
  // Set up a database connection
  partition.foreach { element =>
    // Insert the element into the database
  }
  // Close the database connection
}
```
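`toLocalIterator` has no example above. Unlike `collect()`, it pulls results back one partition at a time, so only the largest partition needs to fit in driver memory; a minimal sketch:

```scala
val big = sc.parallelize(1 to 1000000, 100)

// Partitions are fetched lazily as the iterator advances.
val it: Iterator[Int] = big.toLocalIterator
it.take(5).foreach(println) // 1 2 3 4 5
```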
## Save Operations

**saveAsTextFile**: Save the RDD as text files
```scala { .api }
def saveAsTextFile(path: String): Unit
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit
```

**saveAsObjectFile**: Save as a SequenceFile of serialized objects
```scala { .api }
def saveAsObjectFile(path: String): Unit
```

```scala
val data = sc.parallelize(Array(1, 2, 3, 4, 5))

// Save as text files
data.saveAsTextFile("hdfs://path/to/output")

// Save with compression
import org.apache.hadoop.io.compress.GzipCodec
data.saveAsTextFile("hdfs://path/to/compressed", classOf[GzipCodec])

// Save as an object file
data.saveAsObjectFile("hdfs://path/to/objects")
```

## RDD Properties and Metadata

**Partition Information:**
```scala { .api }
def partitions: Array[Partition]     // Get the partition array
def getNumPartitions: Int            // Get the number of partitions
def partitioner: Option[Partitioner] // Get the partitioner, if any
```

**Dependencies and Lineage:**
```scala { .api }
def dependencies: Seq[Dependency[_]] // RDD dependencies
def toDebugString: String            // Debug string showing the lineage
```

**Naming and Context:**
```scala { .api }
def setName(name: String): RDD[T] // Set the RDD name for monitoring
def name: String                  // Get the RDD name
def id: Int                       // Unique RDD identifier
def sparkContext: SparkContext    // The SparkContext that created this RDD
```

```scala
val data = sc.parallelize(Array(1, 2, 3, 4, 5), 3).setName("MyRDD")

println(s"Partitions: ${data.getNumPartitions}") // Partitions: 3
println(s"Name: ${data.name}")                   // Name: MyRDD
println(s"ID: ${data.id}")                       // ID: 0
println(data.toDebugString)                      // Shows the RDD lineage
```

## Type Conversions

**toJavaRDD**: Convert to a Java RDD
```scala { .api }
def toJavaRDD(): JavaRDD[T]
```
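The conversion above has no usage example; a minimal sketch of exposing the same data through the Java-friendly wrapper:

```scala
import org.apache.spark.api.java.JavaRDD

val scalaRdd = sc.parallelize(Array(1, 2, 3, 4, 5))

// The wrapper shares the same underlying data and partitions.
val javaRdd: JavaRDD[Int] = scalaRdd.toJavaRDD()
println(javaRdd.count()) // 5
```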
## Error Handling and Common Exceptions

RDD operations can fail for various reasons. Understanding the common error scenarios helps with debugging and with building robust applications.

### Common RDD Exceptions

**SparkException**: General Spark execution errors
```scala
// Task failure due to out-of-memory conditions, serialization issues, etc.
try {
  val result = rdd.collect()
} catch {
  case e: SparkException => println(s"Spark execution failed: ${e.getMessage}")
}
```

**TaskResultLost**: Task results lost due to network or node failure
```scala
// Typically indicates node failure or network issues.
// Spark retries automatically, but the job may eventually fail.
```

**OutOfMemoryError**: The driver or an executor runs out of memory
```scala
// Common with collect() on large datasets
try {
  val allData = largeRDD.collect() // Dangerous!
} catch {
  case _: OutOfMemoryError =>
    println("Dataset too large for collect(). Use take() or write to storage.")
}
```

### RDD Action Error Scenarios

**collect()**: Can cause an OutOfMemoryError if the result doesn't fit in driver memory
```scala
// Safe: use take() for sampling
val sample = rdd.take(100)

// Dangerous: collecting an entire large dataset
val all = rdd.collect() // May cause OOM
```

**reduce()**: Fails if the RDD is empty
```scala
try {
  val sum = rdd.reduce(_ + _)
} catch {
  case e: UnsupportedOperationException =>
    println("Cannot reduce an empty RDD")
}

// Safe alternative
val safeSum = rdd.fold(0)(_ + _) // Works with empty RDDs
```

**first()**: Throws an exception if the RDD is empty
```scala
try {
  val firstElement = rdd.first()
} catch {
  case e: UnsupportedOperationException =>
    println("RDD is empty")
}

// Safe alternative
val maybeFirst = rdd.take(1).headOption
```

### Serialization Errors

**NotSerializableException**: Objects captured in closures must be serializable for distribution
```scala
class NotSerializable {
  def process(x: Int): Int = x * 2
}

val processor = new NotSerializable()

// This will fail at runtime
val result = rdd.map(x => processor.process(x)) // NotSerializableException

// Solution: make the object serializable or use plain functions
val safeResult = rdd.map(x => x * 2) // Safe
```

### Partition and File Errors

**FileNotFoundException**: Raised when reading non-existent files
```scala
try {
  val data = sc.textFile("hdfs://nonexistent/path")
  data.count() // The error occurs on the action, not at creation
} catch {
  case e: FileNotFoundException =>
    println(s"File not found: ${e.getMessage}")
}
```

**InvalidInputException**: Malformed input data
```scala
// Handle corrupted or malformed files
try {
  val data = sc.sequenceFile[String, String]("path/to/corrupted/file")
  data.collect()
} catch {
  case e: InvalidInputException =>
    println(s"Invalid input format: ${e.getMessage}")
}
```

### Network and Cluster Errors

**Connection timeouts**: Network issues between nodes
- Configure `spark.network.timeout` for longer operations
- Use `spark.sql.adaptive.coalescePartitions.enabled=true` to reduce network overhead

**Shuffle failures**: Data shuffle operations failing
- Common with operations like `groupByKey`, `join`, and `distinct`
- Increase the `spark.serializer.objectStreamReset` interval
- Consider using `reduceByKey` instead of `groupByKey`

### Best Practices for Error Handling

1. **Avoid collect() on large datasets**: Use `take()`, `sample()`, or write to storage
2. **Handle empty RDDs**: Use `fold()` instead of `reduce()`, and check `isEmpty()` before actions
3. **Ensure serializability**: Keep closures simple and avoid capturing non-serializable objects
4. **Monitor resource usage**: Configure appropriate executor memory and cores
5. **Use checkpointing**: For long lineages, checkpoint intermediate results (see the sketch after this list)
6. **Handle file system errors**: Validate that paths exist before reading, and wrap file operations in try-catch
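Best practice 5 above references checkpointing without showing it; a minimal sketch, assuming a reachable checkpoint directory (the path is illustrative):

```scala
// Checkpointing writes the RDD to reliable storage and truncates its lineage,
// so recovery does not have to replay the whole transformation chain.
sc.setCheckpointDir("hdfs://path/to/checkpoints") // illustrative path

// Build an artificially long lineage: 50 chained map() transformations.
val longLineage = (1 to 50).foldLeft(sc.parallelize(1 to 1000)) { (rdd, _) => rdd.map(_ + 1) }

longLineage.checkpoint()           // only marks the RDD; nothing runs yet
longLineage.count()                // the next action materializes the checkpoint
println(longLineage.toDebugString) // lineage now stops at the checkpointed data
```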
Putting several of these practices together:

```scala
// Robust RDD processing pattern: check for emptiness, bound the result size,
// and turn failures into a None instead of crashing the driver.
def processRDDSafely[T](rdd: RDD[T]): Option[Array[T]] = {
  try {
    if (rdd.isEmpty()) {
      println("Warning: RDD is empty")
      None
    } else {
      // Use take() instead of collect() for safety
      val sample = rdd.take(1000)
      Some(sample)
    }
  } catch {
    case e: SparkException =>
      println(s"Spark processing failed: ${e.getMessage}")
      None
    case e: OutOfMemoryError =>
      println("Out of memory - dataset too large")
      None
  }
}
```

This coverage of RDD operations provides the foundation for all data processing in Apache Spark. Understanding these operations and their potential failure modes is crucial for effective Spark programming.