# Core RDD Operations

The RDD (Resilient Distributed Dataset) is the fundamental abstraction in Apache Spark. It represents an immutable, partitioned collection of elements that can be operated on in parallel. Fault tolerance is built in: a lost partition is recomputed from the lineage of transformations that produced it.

## RDD Class

```scala { .api }
// Simplified view of the core members. The actual class is declared as
// RDD[T: ClassTag] and receives its SparkContext and parent dependencies
// as constructor arguments.
abstract class RDD[T] extends Serializable {
  // Core properties
  def partitions: Array[Partition]
  def compute(split: Partition, context: TaskContext): Iterator[T]
  def dependencies: Seq[Dependency[_]]
  val partitioner: Option[Partitioner] = None
  def preferredLocations(split: Partition): Seq[String] = Nil
}
```
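The following sketch shows how these members fit together. It defines a hypothetical `SimpleRangeRDD` that yields the integers `0 until n`. It assumes the Spark 2.x/3.x developer API, in which a subclass overrides the protected `getPartitions` hook (the public `partitions` method caches its result), implements `compute`, and passes `Nil` as its dependencies because it has no parent RDD. `RangeSlice` and `SimpleRangeRDD` are illustrative names, not Spark classes.

```scala
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// One partition covering the half-open range [start, end).
final case class RangeSlice(index: Int, start: Int, end: Int) extends Partition

// Hypothetical minimal RDD holding 0 until n, split into numSlices partitions.
// It has no parent RDD, so it passes Nil as its dependencies.
class SimpleRangeRDD(sc: SparkContext, n: Int, numSlices: Int) extends RDD[Int](sc, Nil) {

  // Runs once on the driver; RDD.partitions caches the result.
  override protected def getPartitions: Array[Partition] =
    Array.tabulate[Partition](numSlices) { i =>
      RangeSlice(i, (i.toLong * n / numSlices).toInt, ((i + 1).toLong * n / numSlices).toInt)
    }

  // Runs on an executor to produce the elements of one partition.
  override def compute(split: Partition, context: TaskContext): Iterator[Int] = {
    val slice = split.asInstanceOf[RangeSlice]
    (slice.start until slice.end).iterator
  }
}
```

The subclass inherits everything beyond these two methods, including `map`, `count`, and scheduling, from `RDD`.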

## Transformations

Transformations are lazy. Each one returns a new RDD that records how to derive its data from its parent, but computes nothing. Spark accumulates these steps into a directed acyclic graph (DAG) of computations and executes it only when an action requests a result.
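You can observe this laziness by counting how often the mapping function runs. The sketch below is an illustration and assumes two things: a `SparkContext` is passed in, and a `LongAccumulator` (available since Spark 2.0) serves only as an observation tool. Accumulator updates made inside transformations can be applied more than once if tasks are re-executed, so this pattern is suitable for demonstrations but not for production counts. `LazinessDemo` is a hypothetical name.

```scala
import org.apache.spark.SparkContext

object LazinessDemo {
  // Returns how many times map's function had run before and after an action.
  def mapCallsBeforeAndAfterAction(sc: SparkContext): (Long, Long) = {
    val calls = sc.longAccumulator("map calls")
    val squared = sc.parallelize(1 to 5).map { x =>
      calls.add(1L) // side effect used only to observe when map actually runs
      x * x
    }
    val before: Long = calls.value // no job has run yet, so this is 0
    squared.count()                // the action triggers execution of the map
    val after: Long = calls.value  // one call per element: 5
    (before, after)
  }
}
```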

### Basic Transformations

**map**: Apply a function to each element

```scala { .api }
def map[U: ClassTag](f: T => U): RDD[U]
```

```scala
val numbers = sc.parallelize(Array(1, 2, 3, 4, 5))
val squared = numbers.map(x => x * x)
// Result: RDD containing [1, 4, 9, 16, 25]
```

**flatMap**: Apply a function and flatten the results

```scala { .api }
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
```

```scala
val lines = sc.parallelize(Array("hello world", "spark rdd"))
val words = lines.flatMap(line => line.split(" "))
// Result: RDD containing ["hello", "world", "spark", "rdd"]
```

**filter**: Keep elements that satisfy a predicate

```scala { .api }
def filter(f: T => Boolean): RDD[T]
```

```scala
val numbers = sc.parallelize(Array(1, 2, 3, 4, 5, 6))
val evens = numbers.filter(_ % 2 == 0)
// Result: RDD containing [2, 4, 6]
```

**distinct**: Remove duplicate elements (requires a shuffle)

```scala { .api }
def distinct(): RDD[T]
def distinct(numPartitions: Int): RDD[T]
```

```scala
val data = sc.parallelize(Array(1, 2, 2, 3, 3, 3))
val unique = data.distinct()
// Result: RDD containing [1, 2, 3] (element order is not guaranteed)
```

### Sampling Transformations

**sample**: Return a sampled subset

```scala { .api }
def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]
```

```scala
val data = sc.parallelize(1 to 100)
val sampled = data.sample(withReplacement = false, fraction = 0.1)
// Result: RDD with approximately 10% of the original elements (the exact count varies)
```

**randomSplit**: Split an RDD randomly into multiple RDDs according to the given weights (normalized if they do not sum to 1)

```scala { .api }
def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]]
```

```scala
val data = sc.parallelize(1 to 100)
val Array(train, test) = data.randomSplit(Array(0.7, 0.3))
// Result: Two RDDs with ~70% and ~30% of the data respectively
```
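Both methods accept an explicit `seed`. A fixed seed makes results reproducible as long as the input and its partitioning stay the same. The sketch below is an illustrative `SamplingDemo` object that assumes an `RDD[Int]` built with `sc.parallelize`. It checks two properties: sampling twice with the same seed returns the same elements, and the splits from `randomSplit` together contain every element exactly once.

```scala
import org.apache.spark.rdd.RDD

object SamplingDemo {
  // Samples twice with the same seed; the two results should be identical.
  def sampleTwice(data: RDD[Int], seed: Long): (Array[Int], Array[Int]) = {
    def draw() = data.sample(withReplacement = false, fraction = 0.1, seed = seed).collect()
    (draw(), draw())
  }

  // Splits 70/30 and returns (train size, test size, number of shared elements).
  def splitSizes(data: RDD[Int], seed: Long): (Long, Long, Long) = {
    val Array(train, test) = data.randomSplit(Array(0.7, 0.3), seed)
    (train.count(), test.count(), train.intersection(test).count())
  }
}
```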

### Set Operations

**union**: Return the union of two RDDs (duplicates are kept and no shuffle is needed)

```scala { .api }
def union(other: RDD[T]): RDD[T]
```

**intersection**: Return the intersection of two RDDs (the output contains no duplicates; requires a shuffle)

```scala { .api }
def intersection(other: RDD[T]): RDD[T]
def intersection(other: RDD[T], numPartitions: Int): RDD[T]
```

**subtract**: Return elements in this RDD that are not in the other

```scala { .api }
def subtract(other: RDD[T]): RDD[T]
def subtract(other: RDD[T], numPartitions: Int): RDD[T]
def subtract(other: RDD[T], p: Partitioner): RDD[T]
```

```scala
val rdd1 = sc.parallelize(Array(1, 2, 3, 4))
val rdd2 = sc.parallelize(Array(3, 4, 5, 6))

val unionRDD = rdd1.union(rdd2) // [1, 2, 3, 4, 3, 4, 5, 6]
val intersectionRDD = rdd1.intersection(rdd2) // [3, 4] (order may vary)
val subtractRDD = rdd1.subtract(rdd2) // [1, 2] (order may vary)
```
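These operations treat duplicates differently, which matters when the inputs are not already distinct. The sketch below uses a hypothetical `SetSemanticsDemo` object on `RDD[Int]` and sorts each result locally so the output order is deterministic. `union` keeps every copy, `intersection` never returns duplicates, and `subtract` keeps every copy from the left side that has no match on the right.

```scala
import org.apache.spark.rdd.RDD

object SetSemanticsDemo {
  // union concatenates the inputs without a shuffle, so duplicates survive;
  // distinct() turns it into a set union at the cost of a shuffle.
  def bagUnion(a: RDD[Int], b: RDD[Int]): Array[Int] = a.union(b).collect().sorted
  def setUnion(a: RDD[Int], b: RDD[Int]): Array[Int] = a.union(b).distinct().collect().sorted

  // intersection deduplicates its output, even if both inputs repeat an element.
  def intersect(a: RDD[Int], b: RDD[Int]): Array[Int] = a.intersection(b).collect().sorted

  // subtract drops every element that appears in b but keeps repeated copies from a.
  def minus(a: RDD[Int], b: RDD[Int]): Array[Int] = a.subtract(b).collect().sorted
}
```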

### Pairing and Joining

**zip**: Zip this RDD with another one, returning key-value pairs. Both RDDs must have the same number of partitions and the same number of elements in each partition.

```scala { .api }
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
```

**zipWithIndex**: Zip each element with its index. Indices follow partition order and then element order within each partition. If the RDD has more than one partition, this triggers a Spark job to count the elements in each partition.

```scala { .api }
def zipWithIndex(): RDD[(T, Long)]
```

**zipWithUniqueId**: Zip with generated unique IDs. Elements in partition `k` of `n` get IDs `k`, `n + k`, `2n + k`, and so on. The IDs can therefore have gaps, but no Spark job is triggered.

```scala { .api }
def zipWithUniqueId(): RDD[(T, Long)]
```

**cartesian**: Return the Cartesian product with another RDD (its size is the product of the two RDDs' sizes)

```scala { .api }
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]
```

```scala
val rdd1 = sc.parallelize(Array("a", "b"))
val rdd2 = sc.parallelize(Array(1, 2))

val zipped = rdd1.zip(rdd2) // [("a", 1), ("b", 2)]
val withIndex = rdd1.zipWithIndex() // [("a", 0), ("b", 1)]
val cartesian = rdd1.cartesian(rdd2) // [("a", 1), ("a", 2), ("b", 1), ("b", 2)]
```
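When two RDDs come from different sources, their partition layouts rarely match, and `zip` fails. A common workaround, sketched below on the assumption that pairing by global position is what you want, is to index both sides with `zipWithIndex` and join on the index. `ZipDemo` is an illustrative name. The join shuffles both RDDs, so this approach is noticeably more expensive than `zip`. The second helper shows the ID layout that `zipWithUniqueId` produces.

```scala
import org.apache.spark.rdd.RDD

object ZipDemo {
  // Pairs elements by global position, regardless of how each side is partitioned.
  def zipByPosition(left: RDD[String], right: RDD[Int]): Array[(String, Int)] = {
    val l = left.zipWithIndex().map { case (v, i) => (i, v) }  // (index, value)
    val r = right.zipWithIndex().map { case (v, i) => (i, v) }
    l.join(r).sortByKey().values.collect()                      // restore positional order
  }

  // IDs from zipWithUniqueId: partition k of n assigns k, n + k, 2n + k, ...
  def uniqueIds(rdd: RDD[String]): Array[Long] = rdd.zipWithUniqueId().values.collect()
}
```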

### Grouping and Sorting

**groupBy**: Group elements by a key function. This shuffles every element; when you only need an aggregate per key, `reduceByKey` on a pair RDD is usually cheaper.

```scala { .api }
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
def groupBy[K](f: T => K, numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
```

```scala
val data = sc.parallelize(Array(1, 2, 3, 4, 5, 6))
val grouped = data.groupBy(_ % 2) // Group by even/odd
// Result (order may vary): [(0, [2, 4, 6]), (1, [1, 3, 5])]
```

**keyBy**: Create tuples by applying a function to generate keys

```scala { .api }
def keyBy[K](f: T => K): RDD[(K, T)]
```

```scala
val words = sc.parallelize(Array("apple", "banana", "apricot"))
val byFirstLetter = words.keyBy(_.charAt(0))
// Result: [('a', "apple"), ('b', "banana"), ('a', "apricot")]
```

### Partitioning Transformations

**repartition**: Increase or decrease the number of partitions (always shuffles)

```scala { .api }
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
```

**coalesce**: Reduce the number of partitions. Without `shuffle = true`, it only merges existing partitions and cannot increase their number.

```scala { .api }
def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null): RDD[T]
```

```scala
val data = sc.parallelize(1 to 1000, 10) // 10 partitions
val repartitioned = data.repartition(5) // 5 partitions with shuffle
val coalesced = data.coalesce(5) // 5 partitions without shuffle
```
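The difference between the two shows up when you ask for more partitions than you already have. The illustrative `PartitionCountDemo` below starts from 10 partitions. Without a shuffle, `coalesce` can only merge existing partitions, so asking it for 20 leaves the count at 10. `coalesce(20, shuffle = true)`, which is what `repartition(20)` calls, actually produces 20.

```scala
import org.apache.spark.SparkContext

object PartitionCountDemo {
  // Reports the partition count each resizing call produces from a 10-partition RDD.
  def counts(sc: SparkContext): Map[String, Int] = {
    val data = sc.parallelize(1 to 1000, 10)
    Map(
      "coalesce(5)" -> data.coalesce(5).getNumPartitions,
      // Without a shuffle, coalesce can only merge partitions: the count stays at 10.
      "coalesce(20)" -> data.coalesce(20).getNumPartitions,
      // With a shuffle, the data is redistributed into 20 new partitions.
      "coalesce(20, shuffle = true)" -> data.coalesce(20, shuffle = true).getNumPartitions,
      // repartition(n) is shorthand for coalesce(n, shuffle = true).
      "repartition(20)" -> data.repartition(20).getNumPartitions
    )
  }
}
```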

### Partition-wise Operations

**mapPartitions**: Apply a function to each partition's iterator. This is useful when per-partition setup, such as opening a connection, should happen once rather than once per element.

```scala { .api }
def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
```

**mapPartitionsWithIndex**: Like `mapPartitions`, but the function also receives the partition index

```scala { .api }
def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
```

**glom**: Turn each partition into an array of its elements

```scala { .api }
def glom(): RDD[Array[T]]
```

```scala
val data = sc.parallelize(Array(1, 2, 3, 4, 5, 6), 2)

// Sum elements in each partition
val partitionSums = data.mapPartitions(iter => Iterator(iter.sum)) // [6, 15]

// Add partition index to each element
val withPartitionId = data.mapPartitionsWithIndex((index, iter) =>
  iter.map(value => (index, value))) // [(0, 1), (0, 2), (0, 3), (1, 4), (1, 5), (1, 6)]

// Get arrays of elements per partition
val partitionArrays = data.glom() // [[1, 2, 3], [4, 5, 6]]
```
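Because the function receives an iterator rather than a single element, it has to cope with empty partitions. These are common after filtering or when an RDD has more partitions than elements. The hypothetical helper below computes a per-partition minimum and maximum in a single pass with `mapPartitionsWithIndex`. For an empty partition it emits nothing, instead of calling `iter.min`, which throws on an empty iterator.

```scala
import org.apache.spark.rdd.RDD

object PartitionStatsDemo {
  // Returns (partitionIndex, min, max) for every non-empty partition.
  def minMaxPerPartition(data: RDD[Int]): Array[(Int, Int, Int)] =
    data.mapPartitionsWithIndex[(Int, Int, Int)] { (index, iter) =>
      if (iter.isEmpty) {
        Iterator.empty // empty partition: emit nothing instead of failing
      } else {
        // An iterator can be traversed only once, so compute both bounds in one fold.
        val (lo, hi) = iter.foldLeft((Int.MaxValue, Int.MinValue)) {
          case ((mn, mx), x) => (math.min(mn, x), math.max(mx, x))
        }
        Iterator((index, lo, hi))
      }
    }.collect()
}
```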

## Actions

Actions trigger the execution of transformations and either return values to the driver program or save data to storage.

### Collection Actions

**collect**: Return all elements as an array on the driver (use with caution on large datasets)

```scala { .api }
def collect(): Array[T]
```

**take**: Return the first `num` elements

```scala { .api }
def take(num: Int): Array[T]
```

**takeOrdered**: Return the `num` smallest elements according to the implicit ordering, in ascending order

```scala { .api }
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
```

**top**: Return the `num` largest elements according to the implicit ordering, in descending order

```scala { .api }
def top(num: Int)(implicit ord: Ordering[T]): Array[T]
```

**takeSample**: Return a fixed-size random sample of elements as an array

```scala { .api }
def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T]
```

**first**: Return the first element

```scala { .api }
def first(): T
```

```scala
val data = sc.parallelize(Array(5, 1, 3, 9, 2, 7))

val all = data.collect() // [5, 1, 3, 9, 2, 7]
val first3 = data.take(3) // [5, 1, 3]
val smallest2 = data.takeOrdered(2) // [1, 2]
val largest2 = data.top(2) // [9, 7]
val sample = data.takeSample(false, 3) // 3 random elements
val firstElement = data.first() // 5
```
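`takeOrdered` and `top` take their `Ordering` as an implicit parameter, so you can pass one explicitly to rank elements by something other than their natural order. The sketch below is an illustrative `OrderingDemo` object that ranks hypothetical `(word, count)` pairs by count. `top` returns its result in descending order and `takeOrdered` in ascending order.

```scala
import org.apache.spark.rdd.RDD

object OrderingDemo {
  // Highest counts first.
  def mostFrequent(counts: RDD[(String, Int)], n: Int): Array[(String, Int)] =
    counts.top(n)(Ordering.by[(String, Int), Int](_._2))

  // Lowest counts first.
  def leastFrequent(counts: RDD[(String, Int)], n: Int): Array[(String, Int)] =
    counts.takeOrdered(n)(Ordering.by[(String, Int), Int](_._2))
}
```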

### Aggregation Actions

**count**: Return the number of elements

```scala { .api }
def count(): Long
```

**countByValue**: Return the count of each unique value as a local `Map` on the driver (use only when the number of distinct values is small)

```scala { .api }
def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long]
```

**reduce**: Reduce elements using a commutative and associative function

```scala { .api }
def reduce(f: (T, T) => T): T
```

**fold**: Aggregate with a zero value, which should be the identity element of `op` (for example `0` for addition)

```scala { .api }
def fold(zeroValue: T)(op: (T, T) => T): T
```

**aggregate**: Aggregate into a result type that can differ from the element type. `seqOp` folds elements into each partition's accumulator, and `combOp` merges accumulators across partitions.

```scala { .api }
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
```

```scala
val numbers = sc.parallelize(Array(1, 2, 3, 4, 5))

val count = numbers.count() // 5
val sum = numbers.reduce(_ + _) // 15
val sumWithZero = numbers.fold(0)(_ + _) // 15
val (sum2, count2) = numbers.aggregate((0, 0))( // (15, 5)
  (acc, value) => (acc._1 + value, acc._2 + 1), // seqOp
  (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2) // combOp
)

val valueCounts = sc.parallelize(Array("a", "b", "a", "c", "b")).countByValue()
// Result: Map("a" -> 2, "b" -> 2, "c" -> 1)
```
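The zero value matters because Spark applies it once in every partition and once more when it merges the partition results on the driver. The small experiment below uses an illustrative `FoldZeroDemo` object and assumes a `SparkContext` is passed in. It folds `1 to 5` over two partitions. With `0` as the zero value the result is `15`, but with `1` the zero is counted three times, which gives `18`. `aggregate` applies its `zeroValue` in the same way.

```scala
import org.apache.spark.SparkContext

object FoldZeroDemo {
  // Folds 1..5 split across two partitions with the given zero value.
  // The zero is used (numPartitions + 1) times: once per partition, once on the driver.
  def foldWithZero(sc: SparkContext, zero: Int): Int =
    sc.parallelize(1 to 5, 2).fold(zero)(_ + _)
}
```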

### Statistical Actions

`min` and `max` are available on any RDD whose element type has an implicit `Ordering`. RDDs of numeric values also gain statistical methods such as `sum()`, `mean()`, `stdev()`, and `stats()` through an implicit conversion to `DoubleRDDFunctions`.

**min/max**: Find minimum/maximum values

```scala { .api }
def min()(implicit ord: Ordering[T]): T
def max()(implicit ord: Ordering[T]): T
```

```scala
val numbers = sc.parallelize(Array(5, 1, 9, 3, 7))
val minimum = numbers.min() // 1
val maximum = numbers.max() // 9
```
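For numeric RDDs, `stats()` computes the count, mean, minimum, maximum, and variance in a single pass and returns them in a `StatCounter`. The sketch below is illustrative (`StatsDemo` is not a Spark class). It relies on the implicit conversion from `RDD[Int]` to `DoubleRDDFunctions` defined in the `RDD` companion object, so no extra import is needed. `variance` and `stdev` are population statistics, while `sampleVariance` and `sampleStdev` divide by `n - 1`.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.util.StatCounter

object StatsDemo {
  // One job: each partition builds a StatCounter, and the counters are merged.
  def summarize(numbers: RDD[Int]): StatCounter = numbers.stats()

  // Convenience methods from the same implicit conversion; each runs its own job.
  def meanAndSum(numbers: RDD[Int]): (Double, Double) = (numbers.mean(), numbers.sum())
}
```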

### Iterator Actions

**foreach**: Apply a function to each element for its side effects. The function runs on the executors, not on the driver.

```scala { .api }
def foreach(f: T => Unit): Unit
```

**foreachPartition**: Apply a function to each partition's iterator, for example to open one database connection per partition

```scala { .api }
def foreachPartition(f: Iterator[T] => Unit): Unit
```

**toLocalIterator**: Return an iterator over all elements that fetches one partition at a time, so the driver needs only enough memory for the largest partition

```scala { .api }
def toLocalIterator: Iterator[T]
```

```scala
val data = sc.parallelize(Array(1, 2, 3, 4, 5))

// Print each element (for debugging). The output goes to the executors' stdout;
// it appears in the driver console only in local mode.
data.foreach(println)

// Process each partition (e.g., write to a database)
data.foreachPartition { partition =>
  // Set up one database connection for the whole partition
  partition.foreach { element =>
    // Insert element into the database
  }
  // Close the database connection
}
```
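Because `foreach` runs on the executors, incrementing a driver-side variable inside it does not reliably change the driver's copy. The sketch below is an illustrative `DriverSideResultsDemo` object showing two supported alternatives. The first is a `LongAccumulator`: updates made to it inside an action are applied once per task, even if the task is retried. The second is `toLocalIterator`, which streams partitions to the driver so it can fold over all elements without collecting them all at once.

```scala
import org.apache.spark.rdd.RDD

object DriverSideResultsDemo {
  // Accumulator updates made inside an action such as foreach are applied
  // once per task, even if a task is retried.
  def sumWithAccumulator(data: RDD[Int]): Long = {
    val total = data.sparkContext.longAccumulator("total")
    data.foreach(x => total.add(x.toLong))
    total.value // java.lang.Long, unboxed to Long by the return type
  }

  // toLocalIterator fetches one partition at a time, so the driver needs memory
  // for the largest partition rather than for the whole RDD.
  def sumOnDriver(data: RDD[Int]): Long =
    data.toLocalIterator.foldLeft(0L)(_ + _)
}
```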

## Save Operations

**saveAsTextFile**: Save the RDD as text files, writing each element's string representation on its own line. The path names a directory that receives one part file per partition, and the save fails if the directory already exists.

```scala { .api }
def saveAsTextFile(path: String): Unit
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit
```

**saveAsObjectFile**: Save as a SequenceFile of serialized objects (read it back with `sc.objectFile`)

```scala { .api }
def saveAsObjectFile(path: String): Unit
```

```scala
import org.apache.hadoop.io.compress.GzipCodec

val data = sc.parallelize(Array(1, 2, 3, 4, 5))

// Save as text files
data.saveAsTextFile("hdfs://path/to/output")

// Save with compression
data.saveAsTextFile("hdfs://path/to/compressed", classOf[GzipCodec])

// Save as object file
data.saveAsObjectFile("hdfs://path/to/objects")
```
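The sketch below writes the same RDD in both formats and reads it back, which makes the on-disk layout concrete. It is illustrative: `SaveRoundTripDemo` is not a Spark class, and it assumes a local temporary directory, whereas a cluster would need a shared location such as HDFS. Each save creates a directory containing one part file per partition. `sc.textFile` returns strings, so the numbers have to be parsed back, while `sc.objectFile[Int]` restores the original type.

```scala
import java.nio.file.Files
import org.apache.spark.SparkContext

object SaveRoundTripDemo {
  // Saves 1..5 in both formats under a fresh temp directory and reads them back.
  def roundTrip(sc: SparkContext): (Array[Int], Array[Int]) = {
    val base = Files.createTempDirectory("rdd-save").toString
    val data = sc.parallelize(1 to 5, 2)

    // Each call creates a directory of part files; the target must not exist yet.
    data.saveAsTextFile(s"$base/text")
    data.saveAsObjectFile(s"$base/objects")

    // textFile yields lines of text, so parse them; objectFile restores Ints directly.
    val fromText = sc.textFile(s"$base/text").map(_.trim.toInt).collect().sorted
    val fromObjects = sc.objectFile[Int](s"$base/objects").collect().sorted
    (fromText, fromObjects)
  }
}
```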

## RDD Properties and Metadata

**Partition Information:**

```scala { .api }
def partitions: Array[Partition] // Get partition array
def getNumPartitions: Int // Get number of partitions
val partitioner: Option[Partitioner] // Get partitioner, if any
```

**Dependencies and Lineage:**

```scala { .api }
def dependencies: Seq[Dependency[_]] // RDD dependencies
def toDebugString: String // Debug string showing lineage
```

**Naming and Context:**

```scala { .api }
def setName(name: String): RDD[T] // Set RDD name for monitoring (shown in the Spark UI)
def name: String // Get RDD name (null if never set)
def id: Int // Identifier, unique within its SparkContext
def sparkContext: SparkContext // The SparkContext that created this RDD
```

```scala
val data = sc.parallelize(Array(1, 2, 3, 4, 5), 3).setName("MyRDD")

println(s"Partitions: ${data.getNumPartitions}") // Partitions: 3
println(s"Name: ${data.name}") // Name: MyRDD
println(s"ID: ${data.id}") // e.g. ID: 0 (IDs are assigned sequentially per SparkContext)
println(data.toDebugString) // Shows RDD lineage
```
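The `partitioner` and `dependencies` fields show where shuffles happen. The sketch below is an illustrative `LineageDemo` object that assumes a `SparkContext` is passed in. It checks a few properties that follow from Spark's design. `partitionBy` records its partitioner and adds a `ShuffleDependency`. `mapValues` keeps the partitioner because it cannot change keys, and `map` drops it because it might.

```scala
import org.apache.spark.{HashPartitioner, OneToOneDependency, ShuffleDependency, SparkContext}

object LineageDemo {
  // Returns named observations about partitioners and dependency types.
  def inspect(sc: SparkContext): Map[String, Boolean] = {
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), 2)
    val partitioned = pairs.partitionBy(new HashPartitioner(4))

    Map(
      // A freshly parallelized RDD has no partitioner.
      "source has partitioner" -> pairs.partitioner.isDefined,
      // partitionBy records the partitioner it used (HashPartitioner equality compares counts).
      "partitionBy sets HashPartitioner(4)" -> (partitioned.partitioner == Some(new HashPartitioner(4))),
      // mapValues cannot change keys, so the partitioner is kept...
      "mapValues keeps partitioner" -> partitioned.mapValues(_ * 10).partitioner.isDefined,
      // ...but map might change keys, so the partitioner is dropped.
      "map keeps partitioner" -> partitioned.map(kv => kv).partitioner.isDefined,
      // Element-wise transformations create a narrow, one-to-one dependency.
      "map is narrow" -> pairs.map(kv => kv).dependencies.head.isInstanceOf[OneToOneDependency[_]],
      // partitionBy creates a shuffle dependency, i.e. a stage boundary.
      "partitionBy shuffles" -> partitioned.dependencies.head.isInstanceOf[ShuffleDependency[_, _, _]]
    )
  }
}
```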

## Type Conversions

**toJavaRDD**: Convert to Java RDD

```scala { .api }
def toJavaRDD(): JavaRDD[T]
```
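`toJavaRDD` wraps the existing RDD rather than copying it, and `JavaRDD` exposes the wrapped Scala RDD through its `rdd` field, so converting in either direction is cheap. A minimal sketch (`JavaInteropDemo` is an illustrative name):

```scala
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.rdd.RDD

object JavaInteropDemo {
  // Wraps the RDD for Java callers; no data is copied.
  def toJava(rdd: RDD[String]): JavaRDD[String] = rdd.toJavaRDD()

  // JavaRDD keeps the wrapped Scala RDD in its `rdd` field.
  def backToScala(javaRdd: JavaRDD[String]): RDD[String] = javaRdd.rdd
}
```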

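## Error Handling and Common Exceptions

RDD operations can fail for various reasons. Because transformations are lazy, most failures, including exceptions thrown by your own functions on the executors, surface only when an action runs. Understanding common error scenarios helps with debugging and building robust applications.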

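### Common RDD Exceptions

**SparkException**: General Spark execution errors

```scala
import org.apache.spark.SparkException

// When a task keeps failing (out of memory, a bug in user code, a lost executor),
// Spark retries it up to spark.task.maxFailures times and then aborts the job;
// the action then throws a SparkException describing the underlying cause
try {
  val result = rdd.collect()
} catch {
  case e: SparkException => println(s"Spark execution failed: ${e.getMessage}")
}
```

**TaskResultLost**: Task results lost due to network or node failure

`TaskResultLost` is a task failure reason recorded by the scheduler, not an exception that your code catches. It typically indicates a node failure or a network problem. Spark retries the affected task automatically, but if the retries are exhausted, the job fails with a `SparkException`.

**OutOfMemoryError**: Driver or executor runs out of memory

```scala
// Common with collect() on large datasets. If the serialized results exceed
// spark.driver.maxResultSize, Spark aborts the job with a SparkException instead.
// Catching OutOfMemoryError is a last resort: the JVM may be unstable afterwards.
try {
  val allData = largeRDD.collect() // Dangerous!
} catch {
  case _: OutOfMemoryError =>
    println("Dataset too large for collect(). Use take() or write to storage.")
}
```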

### RDD Action Error Scenarios

**collect()**: Can cause an OutOfMemoryError if the result does not fit in driver memory

```scala
// Safe: use take() to inspect a bounded number of elements
val sample = rdd.take(100)

// Dangerous: collect the entire large dataset on the driver
val all = rdd.collect() // May cause OOM
```

**reduce()**: Throws `UnsupportedOperationException` if the RDD is empty

```scala
try {
  val sum = rdd.reduce(_ + _)
} catch {
  case e: UnsupportedOperationException =>
    println("Cannot reduce empty RDD")
}

// Safe alternative: fold returns the zero value for an empty RDD
val sum = rdd.fold(0)(_ + _)
```

**first()**: Throws `UnsupportedOperationException` if the RDD is empty

```scala
try {
  val firstElement = rdd.first()
} catch {
  case e: UnsupportedOperationException =>
    println("RDD is empty")
}

// Safe alternative: returns None for an empty RDD
val maybeFirst = rdd.take(1).headOption
```

### Serialization Errors

**NotSerializableException**: Spark ships the functions passed to transformations to the executors, so every object those functions capture must be serializable.

```scala
class NotSerializable {
  def process(x: Int): Int = x * 2
}

val processor = new NotSerializable()

// Fails as soon as map() is called: Spark checks the closure on the driver and throws
// SparkException("Task not serializable"), caused by java.io.NotSerializableException
val result = rdd.map(x => processor.process(x))

// Solutions: make the class extend Serializable, or avoid capturing the object
val safeResult = rdd.map(x => x * 2)
```
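When a closure needs an object that is not serializable, another option is to construct that object on the executors. The sketch below is an illustrative `PerPartitionResourceDemo` object that uses `java.security.MessageDigest`, which is not `Serializable`. It creates one instance per partition inside `mapPartitions`. As a result, nothing non-serializable is captured from the driver, and the setup cost is paid once per partition instead of once per element.

```scala
import java.security.MessageDigest
import org.apache.spark.rdd.RDD

object PerPartitionResourceDemo {
  // MessageDigest is not Serializable, so it must not be captured from the driver.
  // Creating it inside mapPartitions builds one instance per partition, on the executor.
  def sha256Hex(lines: RDD[String]): RDD[String] =
    lines.mapPartitions { iter =>
      val digest = MessageDigest.getInstance("SHA-256")
      // digest(bytes) also resets the instance, so it can be reused for the next element.
      iter.map(s => digest.digest(s.getBytes("UTF-8")).map("%02x".format(_)).mkString)
    }
}
```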

### Partition and File Errors

**InvalidInputException**: When reading paths that do not exist

```scala
import org.apache.hadoop.mapred.InvalidInputException

try {
  val data = sc.textFile("hdfs:///nonexistent/path")
  data.count() // The error occurs on the action, not on RDD creation
} catch {
  case e: InvalidInputException =>
    println(s"Input path does not exist: ${e.getMessage}")
}
```
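To validate a path up front, you can query the Hadoop `FileSystem` API directly. The helper below is a sketch with some limitations. `SafeReadDemo` is a hypothetical name. It uses the `SparkContext`'s Hadoop configuration so that the path resolves against the same filesystem `textFile` would use, and it does not expand glob patterns such as `/logs/*.txt`. The check is also not atomic, because a file can disappear between the check and the read, so a `try`/`catch` around the action is still worth keeping.

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object SafeReadDemo {
  // Returns Some(rdd) if the path exists, None otherwise.
  // Uses the SparkContext's Hadoop configuration, so "hdfs://", "file://" or
  // scheme-less paths resolve the same way textFile would resolve them.
  // Note: glob patterns (e.g. /logs/*.txt) are not expanded by exists().
  def textFileIfExists(sc: SparkContext, location: String): Option[RDD[String]] = {
    val path = new Path(location)
    val fs = path.getFileSystem(sc.hadoopConfiguration)
    if (fs.exists(path)) Some(sc.textFile(location)) else None
  }
}
```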

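**Corrupted input files**: Malformed or truncated data

Spark detects corrupt data only when a task reads it. The task fails with an `IOException` (for example, an `EOFException` for a truncated SequenceFile), and once its retries are exhausted, the action throws a `SparkException`. If skipping unreadable files is acceptable, setting `spark.files.ignoreCorruptFiles=true` makes Spark continue past them instead of failing the job.

```scala
import org.apache.spark.SparkException

// Handle corrupted or malformed files
try {
  val data = sc.sequenceFile[String, String]("path/to/corrupted/file")
  data.collect()
} catch {
  case e: SparkException =>
    println(s"Failed to read input: ${e.getMessage}")
}
```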

### Network and Cluster Errors

**Connection timeouts**: Network issues between nodes

- Raise `spark.network.timeout` (default `120s`) for long-running operations
- Keep `spark.executor.heartbeatInterval` significantly smaller than `spark.network.timeout`

**Shuffle failures**: Data shuffle operations failing

- Common with shuffle-heavy operations like `groupByKey`, `join`, `distinct`
- Increase `spark.shuffle.io.maxRetries` and `spark.shuffle.io.retryWait` to tolerate transient fetch failures
- Consider using `reduceByKey` instead of `groupByKey`
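The suggestion to prefer `reduceByKey` is easiest to see side by side. Both functions in the illustrative `WordCountDemo` below return the same counts. `groupByKey` ships every `(word, 1)` pair across the network. `reduceByKey` first sums within each partition (a map-side combine) and shuffles at most one record per distinct word per partition, which shrinks both the shuffle and the memory needed on the reduce side.

```scala
import org.apache.spark.rdd.RDD

object WordCountDemo {
  // Every (word, 1) pair crosses the network before being summed.
  def countWithGroupByKey(words: RDD[String]): Map[String, Int] =
    words.map(w => (w, 1)).groupByKey().mapValues(_.sum).collect().toMap

  // Pairs are summed within each partition first (map-side combine),
  // so at most one record per word per partition is shuffled.
  def countWithReduceByKey(words: RDD[String]): Map[String, Int] =
    words.map(w => (w, 1)).reduceByKey(_ + _).collect().toMap
}
```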

### Best Practices for Error Handling

1. **Avoid collect() on large datasets**: Use `take()`, `sample()`, or write to storage
2. **Handle empty RDDs**: Use `fold()` instead of `reduce()`, or check `isEmpty()` before actions
3. **Ensure serializability**: Keep closures simple and avoid capturing non-serializable objects
4. **Monitor resource usage**: Configure appropriate executor memory and cores
5. **Use checkpointing**: For long lineages, checkpoint intermediate results
6. **Handle file system errors**: Validate that paths exist before reading, and use try-catch for file operations

```scala
import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD

// Robust RDD processing pattern
def processRDDSafely[T](rdd: RDD[T]): Option[Array[T]] = {
  try {
    if (rdd.isEmpty()) {
      println("Warning: RDD is empty")
      None
    } else {
      // Use take() instead of collect() to bound the data sent to the driver
      val sample = rdd.take(1000)
      Some(sample)
    }
  } catch {
    case e: SparkException =>
      println(s"Spark processing failed: ${e.getMessage}")
      None
    case e: OutOfMemoryError =>
      // Last resort: the JVM may be unstable after an OutOfMemoryError
      println("Out of memory - dataset too large")
      None
  }
}
```
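Checkpointing writes an RDD to reliable storage and truncates its lineage, so recovering a lost partition no longer replays every earlier step. The sketch below is illustrative: `CheckpointDemo` is a hypothetical name, and a local temporary directory stands in for the shared location (such as HDFS) that a cluster would need. It follows the contract of `checkpoint()`: call it before the first action on the RDD, and persist the RDD first so the lineage is not computed a second time when the checkpoint is written.

```scala
import java.nio.file.Files
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object CheckpointDemo {
  // Returns (isCheckpointed after the first action, element count).
  def checkpointed(sc: SparkContext): (Boolean, Long) = {
    sc.setCheckpointDir(Files.createTempDirectory("rdd-checkpoints").toString)

    // Build a long lineage: each iteration stacks another map on top.
    var rdd: RDD[Int] = sc.parallelize(1 to 100, 4)
    for (_ <- 1 to 50) rdd = rdd.map(_ + 1)

    rdd.cache()         // avoid recomputing the lineage when the checkpoint is written
    rdd.checkpoint()    // must be called before the first action on this RDD
    val n = rdd.count() // runs the job, then writes the checkpoint files
    (rdd.isCheckpointed, n)
  }
}
```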

These operations form the foundation of data processing in Apache Spark. Knowing how each one behaves, and how it can fail, is essential for writing Spark jobs that are both correct and robust.