Spec Registry

Help your agents use open-source better.

Find usage specs for your project’s dependencies


maven-apache-spark

Description
Lightning-fast unified analytics engine for large-scale data processing with high-level APIs in Scala, Java, Python, and R
Author
tessl
Last updated

How to use

npx @tessl/cli registry install tessl/maven-apache-spark@1.0.0

docs/core-rdd.md

# Core RDD Operations

The RDD (Resilient Distributed Dataset) is the fundamental abstraction in Apache Spark. It represents an immutable, partitioned collection of elements that can be operated on in parallel with fault-tolerance built-in.

## RDD Class

```scala { .api }
abstract class RDD[T] extends Serializable {
  // Core properties
  def partitions: Array[Partition]
  def compute(split: Partition, context: TaskContext): Iterator[T]
  def dependencies: Seq[Dependency[_]]
  def partitioner: Option[Partitioner] = None
  def preferredLocations(split: Partition): Seq[String] = Nil
}
```
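As a quick orientation, here is a minimal sketch (assuming a running `SparkContext` named `sc`, as in the other examples in this document) that creates an RDD and inspects these core properties:

```scala
val data = sc.parallelize(Seq(1, 2, 3, 4, 5), numSlices = 2)

println(data.partitions.length) // 2
println(data.dependencies)      // empty for a freshly parallelized RDD
println(data.partitioner)       // None unless a partitioner has been applied
```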
## Transformations

Transformations are lazy operations that create a new RDD from an existing one. They are not executed immediately; instead they build up a directed acyclic graph (DAG) of computations that is evaluated only when an action is invoked.
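To make the laziness concrete, the following sketch (assuming a `SparkContext` named `sc`) builds two transformations that do no work until the action at the end runs:

```scala
val words = sc.parallelize(Seq("a", "bb", "ccc"))

// Nothing executes here: map and filter only extend the DAG
val lengths = words.map(_.length)
val longWords = lengths.filter(_ > 1)

// The action forces evaluation of the whole lineage
println(longWords.count()) // 2
```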
### Basic Transformations

**map**: Apply a function to each element
```scala { .api }
def map[U: ClassTag](f: T => U): RDD[U]
```

```scala
val numbers = sc.parallelize(Array(1, 2, 3, 4, 5))
val squared = numbers.map(x => x * x)
// Result: RDD containing [1, 4, 9, 16, 25]
```

**flatMap**: Apply a function and flatten the results
```scala { .api }
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
```

```scala
val lines = sc.parallelize(Array("hello world", "spark rdd"))
val words = lines.flatMap(line => line.split(" "))
// Result: RDD containing ["hello", "world", "spark", "rdd"]
```

**filter**: Keep elements that satisfy a predicate
```scala { .api }
def filter(f: T => Boolean): RDD[T]
```

```scala
val numbers = sc.parallelize(Array(1, 2, 3, 4, 5, 6))
val evens = numbers.filter(_ % 2 == 0)
// Result: RDD containing [2, 4, 6]
```

**distinct**: Remove duplicate elements
```scala { .api }
def distinct(): RDD[T]
def distinct(numPartitions: Int): RDD[T]
```

```scala
val data = sc.parallelize(Array(1, 2, 2, 3, 3, 3))
val unique = data.distinct()
// Result: RDD containing [1, 2, 3]
```
### Sampling Transformations

**sample**: Return a sampled subset
```scala { .api }
def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]
```

```scala
val data = sc.parallelize(1 to 100)
val sampled = data.sample(withReplacement = false, fraction = 0.1)
// Result: RDD with approximately 10% of original elements
```

**randomSplit**: Split RDD randomly into multiple RDDs
```scala { .api }
def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]]
```

```scala
val data = sc.parallelize(1 to 100)
val Array(train, test) = data.randomSplit(Array(0.7, 0.3))
// Result: Two RDDs with ~70% and ~30% of data respectively
```
### Set Operations

**union**: Return the union of two RDDs
```scala { .api }
def union(other: RDD[T]): RDD[T]
```

**intersection**: Return the intersection of two RDDs
```scala { .api }
def intersection(other: RDD[T]): RDD[T]
def intersection(other: RDD[T], numPartitions: Int): RDD[T]
```

**subtract**: Return elements in this RDD but not in the other
```scala { .api }
def subtract(other: RDD[T]): RDD[T]
def subtract(other: RDD[T], numPartitions: Int): RDD[T]
def subtract(other: RDD[T], p: Partitioner): RDD[T]
```

```scala
val rdd1 = sc.parallelize(Array(1, 2, 3, 4))
val rdd2 = sc.parallelize(Array(3, 4, 5, 6))

val unionRDD = rdd1.union(rdd2)               // [1, 2, 3, 4, 3, 4, 5, 6]
val intersectionRDD = rdd1.intersection(rdd2) // [3, 4]
val subtractRDD = rdd1.subtract(rdd2)         // [1, 2]
```
### Pairing and Joining

**zip**: Zip this RDD with another one, pairing corresponding elements
```scala { .api }
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
```

**zipWithIndex**: Zip with the element indices
```scala { .api }
def zipWithIndex(): RDD[(T, Long)]
```

**zipWithUniqueId**: Zip with generated unique IDs
```scala { .api }
def zipWithUniqueId(): RDD[(T, Long)]
```
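A short sketch of the difference between `zipWithIndex` and `zipWithUniqueId` (the exact IDs from `zipWithUniqueId` depend on how elements fall into partitions, so the output shown is illustrative):

```scala
val letters = sc.parallelize(Seq("a", "b", "c", "d"), 2)

letters.zipWithIndex().collect()
// [("a", 0), ("b", 1), ("c", 2), ("d", 3)] -- consecutive indices

letters.zipWithUniqueId().collect()
// e.g. [("a", 0), ("b", 2), ("c", 1), ("d", 3)] -- unique, but not necessarily consecutive
```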
**cartesian**: Return Cartesian product with another RDD
```scala { .api }
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]
```

```scala
val rdd1 = sc.parallelize(Array("a", "b"))
val rdd2 = sc.parallelize(Array(1, 2))

val zipped = rdd1.zip(rdd2)          // [("a", 1), ("b", 2)]
val withIndex = rdd1.zipWithIndex()  // [("a", 0), ("b", 1)]
val cartesian = rdd1.cartesian(rdd2) // [("a", 1), ("a", 2), ("b", 1), ("b", 2)]
```
### Grouping and Sorting

**groupBy**: Group elements by a key function
```scala { .api }
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
def groupBy[K](f: T => K, numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
```

```scala
val data = sc.parallelize(Array(1, 2, 3, 4, 5, 6))
val grouped = data.groupBy(_ % 2) // Group by even/odd
// Result: [(0, [2, 4, 6]), (1, [1, 3, 5])]
```

**keyBy**: Create tuples by applying a function to generate keys
```scala { .api }
def keyBy[K](f: T => K): RDD[(K, T)]
```

```scala
val words = sc.parallelize(Array("apple", "banana", "apricot"))
val byFirstLetter = words.keyBy(_.charAt(0))
// Result: [('a', "apple"), ('b', "banana"), ('a', "apricot")]
```
### Partitioning Transformations

**repartition**: Increase or decrease partitions with shuffle
```scala { .api }
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
```

**coalesce**: Reduce the number of partitions (optionally with shuffle)
```scala { .api }
def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null): RDD[T]
```

```scala
val data = sc.parallelize(1 to 1000, 10) // 10 partitions
val repartitioned = data.repartition(5)  // 5 partitions with shuffle
val coalesced = data.coalesce(5)         // 5 partitions without shuffle
```
### Partition-wise Operations

**mapPartitions**: Apply function to each partition independently
```scala { .api }
def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
```

**mapPartitionsWithIndex**: Apply function with partition index
```scala { .api }
def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
```

**glom**: Return an array of all elements in each partition
```scala { .api }
def glom(): RDD[Array[T]]
```

```scala
val data = sc.parallelize(Array(1, 2, 3, 4, 5, 6), 2)

// Sum elements in each partition
val partitionSums = data.mapPartitions(iter => Iterator(iter.sum))

// Add partition index to each element
val withPartitionId = data.mapPartitionsWithIndex((index, iter) =>
  iter.map(value => (index, value)))

// Get arrays of elements per partition
val partitionArrays = data.glom() // [[1, 2, 3], [4, 5, 6]]
```
## Actions

Actions trigger the execution of transformations and return values to the driver program or save data to storage.
### Collection Actions

**collect**: Return all elements as an array (use with caution on large datasets)
```scala { .api }
def collect(): Array[T]
```

**take**: Return the first num elements
```scala { .api }
def take(num: Int): Array[T]
```

**takeOrdered**: Return the num smallest elements according to the implicit ordering
```scala { .api }
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
```

**top**: Return the num largest elements according to the implicit ordering
```scala { .api }
def top(num: Int)(implicit ord: Ordering[T]): Array[T]
```

**takeSample**: Return a random sample of elements
```scala { .api }
def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T]
```

**first**: Return the first element
```scala { .api }
def first(): T
```

```scala
val data = sc.parallelize(Array(5, 1, 3, 9, 2, 7))

val all = data.collect()               // [5, 1, 3, 9, 2, 7]
val first3 = data.take(3)              // [5, 1, 3]
val smallest2 = data.takeOrdered(2)    // [1, 2]
val largest2 = data.top(2)             // [9, 7]
val sample = data.takeSample(false, 3) // Random 3 elements
val firstElement = data.first()        // 5
```
### Aggregation Actions

**count**: Return the number of elements
```scala { .api }
def count(): Long
```

**countByValue**: Return count of each unique value
```scala { .api }
def countByValue()(implicit ord: Ordering[T]): Map[T, Long]
```

**reduce**: Reduce elements using an associative function
```scala { .api }
def reduce(f: (T, T) => T): T
```

**fold**: Aggregate with a zero value
```scala { .api }
def fold(zeroValue: T)(op: (T, T) => T): T
```

**aggregate**: Aggregate with different result type
```scala { .api }
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
```

```scala
val numbers = sc.parallelize(Array(1, 2, 3, 4, 5))

val count = numbers.count()              // 5
val sum = numbers.reduce(_ + _)          // 15
val sumWithZero = numbers.fold(0)(_ + _) // 15
val (sum2, count2) = numbers.aggregate((0, 0))( // (15, 5)
  (acc, value) => (acc._1 + value, acc._2 + 1),          // seqOp
  (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2) // combOp
)

val valueCounts = sc.parallelize(Array("a", "b", "a", "c", "b")).countByValue()
// Result: Map("a" -> 2, "b" -> 2, "c" -> 1)
```
### Statistical Actions

For RDDs containing numeric types, additional statistical operations are available through implicit conversions:

**min/max**: Find minimum/maximum values
```scala { .api }
def min()(implicit ord: Ordering[T]): T
def max()(implicit ord: Ordering[T]): T
```

```scala
val numbers = sc.parallelize(Array(5, 1, 9, 3, 7))
val minimum = numbers.min() // 1
val maximum = numbers.max() // 9
```
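Beyond `min`/`max`, an `RDD[Double]` gains numeric summaries through the implicit conversion to `DoubleRDDFunctions`; a brief sketch:

```scala
val values = sc.parallelize(Array(1.0, 2.0, 3.0, 4.0))

values.sum()   // 10.0
values.mean()  // 2.5
values.stdev() // population standard deviation
values.stats() // StatCounter with count, mean, stdev, min, max
```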
### Iterator Actions

**foreach**: Apply a function to each element (side effects only)
```scala { .api }
def foreach(f: T => Unit): Unit
```

**foreachPartition**: Apply a function to each partition
```scala { .api }
def foreachPartition(f: Iterator[T] => Unit): Unit
```

**toLocalIterator**: Return an iterator that consumes each partition sequentially
```scala { .api }
def toLocalIterator: Iterator[T]
```

```scala
val data = sc.parallelize(Array(1, 2, 3, 4, 5))

// Print each element (for debugging)
data.foreach(println)

// Process each partition (e.g., write to database)
data.foreachPartition { partition =>
  // Setup database connection
  partition.foreach { element =>
    // Insert element into database
  }
  // Close database connection
}
```
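Unlike `collect`, `toLocalIterator` only needs to hold one partition's worth of data on the driver at a time; a minimal sketch:

```scala
val numbers = sc.parallelize(1 to 10, 4)

// Iterate on the driver, fetching one partition at a time
numbers.toLocalIterator.foreach(println)
```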
## Save Operations

**saveAsTextFile**: Save RDD as text files
```scala { .api }
def saveAsTextFile(path: String): Unit
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit
```

**saveAsObjectFile**: Save as SequenceFile of serialized objects
```scala { .api }
def saveAsObjectFile(path: String): Unit
```

```scala
val data = sc.parallelize(Array(1, 2, 3, 4, 5))

// Save as text files
data.saveAsTextFile("hdfs://path/to/output")

// Save with compression
import org.apache.hadoop.io.compress.GzipCodec
data.saveAsTextFile("hdfs://path/to/compressed", classOf[GzipCodec])

// Save as object file
data.saveAsObjectFile("hdfs://path/to/objects")
```
## RDD Properties and Metadata

**Partition Information:**
```scala { .api }
def partitions: Array[Partition]     // Get partition array
def getNumPartitions: Int            // Get number of partitions
def partitioner: Option[Partitioner] // Get partitioner if any
```

**Dependencies and Lineage:**
```scala { .api }
def dependencies: Seq[Dependency[_]] // RDD dependencies
def toDebugString: String            // Debug string showing lineage
```

**Naming and Context:**
```scala { .api }
def setName(name: String): RDD[T] // Set RDD name for monitoring
def name: String                  // Get RDD name
def id: Int                       // Unique RDD identifier
def sparkContext: SparkContext    // The SparkContext that created this RDD
```

```scala
val data = sc.parallelize(Array(1, 2, 3, 4, 5), 3).setName("MyRDD")

println(s"Partitions: ${data.getNumPartitions}") // Partitions: 3
println(s"Name: ${data.name}")                   // Name: MyRDD
println(s"ID: ${data.id}")                       // ID: 0
println(data.toDebugString)                      // Shows RDD lineage
```
## Type Conversions

**toJavaRDD**: Convert to Java RDD
```scala { .api }
def toJavaRDD(): JavaRDD[T]
```
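A minimal sketch of converting to the Java-friendly API (assuming the standard `org.apache.spark.api.java.JavaRDD` import is available):

```scala
import org.apache.spark.api.java.JavaRDD

val data = sc.parallelize(Array(1, 2, 3))
val javaRdd: JavaRDD[Int] = data.toJavaRDD()

println(javaRdd.count()) // 3
```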
## Error Handling and Common Exceptions

RDD operations can fail for various reasons. Understanding common error scenarios helps with debugging and building robust applications.

### Common RDD Exceptions

**SparkException**: General Spark execution errors
```scala
// Task failure due to out of memory, serialization issues, etc.
try {
  val result = rdd.collect()
} catch {
  case e: SparkException => println(s"Spark execution failed: ${e.getMessage}")
}
```

**TaskResultLost**: Task results lost due to network or node failure
```scala
// Typically indicates node failure or network issues
// Spark automatically retries, but may eventually fail
```

**OutOfMemoryError**: Driver or executor runs out of memory
```scala
// Common with collect() on large datasets
try {
  val allData = largeRDD.collect() // Dangerous!
} catch {
  case _: OutOfMemoryError =>
    println("Dataset too large for collect(). Use take() or write to storage.")
}
```
### RDD Action Error Scenarios

**collect()**: Can cause OutOfMemoryError if result doesn't fit in driver memory
```scala
// Safe: Use take() for sampling
val sample = rdd.take(100)

// Dangerous: Collect entire large dataset
val all = rdd.collect() // May cause OOM
```

**reduce()**: Fails if RDD is empty
```scala
try {
  val sum = rdd.reduce(_ + _)
} catch {
  case e: UnsupportedOperationException =>
    println("Cannot reduce empty RDD")
}

// Safe alternative
val sum = rdd.fold(0)(_ + _) // Works with empty RDDs
```

**first()**: Throws exception if RDD is empty
```scala
try {
  val firstElement = rdd.first()
} catch {
  case e: UnsupportedOperationException =>
    println("RDD is empty")
}

// Safe alternative
val maybeFirst = rdd.take(1).headOption
```
### Serialization Errors

**NotSerializableException**: Objects must be serializable for distribution
```scala
class NotSerializable {
  def process(x: Int): Int = x * 2
}

val processor = new NotSerializable()

// This will fail at runtime
val result = rdd.map(x => processor.process(x)) // NotSerializableException

// Solution: Make objects serializable or use functions
val safeResult = rdd.map(x => x * 2) // Safe
```
### Partition and File Errors

**FileNotFoundException**: When reading non-existent files
```scala
import java.io.FileNotFoundException

try {
  val data = sc.textFile("hdfs://nonexistent/path")
  data.count() // Error occurs on action, not creation
} catch {
  case e: FileNotFoundException =>
    println(s"File not found: ${e.getMessage}")
}
```

**InvalidInputException**: Malformed input data
```scala
import org.apache.hadoop.mapred.InvalidInputException

// Handle corrupted or malformed files
try {
  val data = sc.sequenceFile[String, String]("path/to/corrupted/file")
  data.collect()
} catch {
  case e: InvalidInputException =>
    println(s"Invalid input format: ${e.getMessage}")
}
```
### Network and Cluster Errors

**Connection timeouts**: Network issues between nodes
- Configure `spark.network.timeout` for longer operations (see the configuration sketch below)
- Use `spark.sql.adaptive.coalescePartitions.enabled=true` to reduce network overhead

**Shuffle failures**: Data shuffle operations failing
- Common with operations like `groupByKey`, `join`, `distinct`
- Increase `spark.serializer.objectStreamReset` interval
- Consider using `reduceByKey` instead of `groupByKey`
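A hedged sketch of how the settings mentioned above might be applied when building the context (the application name and values are illustrative, not recommendations):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("resilient-rdd-job") // illustrative name
  .set("spark.network.timeout", "300s")             // default is 120s
  .set("spark.serializer.objectStreamReset", "200") // default is 100

val sc = new SparkContext(conf)
```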
### Best Practices for Error Handling

1. **Avoid collect() on large datasets**: Use `take()`, `sample()`, or write to storage
2. **Handle empty RDDs**: Use `fold()` instead of `reduce()`, check `isEmpty()` before actions
3. **Ensure serializability**: Keep closures simple, avoid capturing non-serializable objects
4. **Monitor resource usage**: Configure appropriate executor memory and cores
5. **Use checkpointing**: For long lineages, checkpoint intermediate results (see the sketch after this list)
6. **Handle file system errors**: Validate paths exist before reading, use try-catch for file operations
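A minimal checkpointing sketch for practice 5 (the checkpoint directory below is a placeholder; on a cluster it must point at reliable storage such as HDFS):

```scala
sc.setCheckpointDir("hdfs://path/to/checkpoints") // placeholder path

val base = sc.parallelize(1 to 1000000)
val derived = base.map(_ * 2).filter(_ % 3 == 0)

derived.cache()      // avoid recomputing the lineage when the checkpoint is written
derived.checkpoint() // truncates the lineage; materialized by the next action
derived.count()      // triggers both the computation and the checkpoint
```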
```scala
import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD

// Robust RDD processing pattern
def processRDDSafely[T](rdd: RDD[T]): Option[Array[T]] = {
  try {
    if (rdd.isEmpty()) {
      println("Warning: RDD is empty")
      None
    } else {
      // Use take() instead of collect() for safety
      val sample = rdd.take(1000)
      Some(sample)
    }
  } catch {
    case e: SparkException =>
      println(s"Spark processing failed: ${e.getMessage}")
      None
    case e: OutOfMemoryError =>
      println("Out of memory - dataset too large")
      None
  }
}
```
This comprehensive coverage of RDD operations provides the foundation for all data processing in Apache Spark. Understanding these operations and their potential failure modes is crucial for effective Spark programming.