or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

broadcasting-accumulators.md index.md java-api.md rdd-operations.md spark-context.md storage-persistence.md

rdd-operations.mddocs/

0

# RDD Operations and Transformations

1

2

Resilient Distributed Datasets (RDDs) are the fundamental abstraction in Spark, representing immutable, fault-tolerant collections that can be operated on in parallel across a cluster.

3

4

## Core RDD Properties

5

6

```scala { .api }

7

abstract class RDD[T: ClassTag] {

8

def sparkContext: SparkContext

9

def id: Int

10

def partitions: Array[Partition]

11

def getNumPartitions: Int

12

def preferredLocations(split: Partition): Seq[String]

13

def dependencies: Seq[Dependency[_]]

14

def partitioner: Option[Partitioner]

15

def getStorageLevel: StorageLevel

16

}

17

```

18

19

## Transformations

20

21

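Transformations are lazy operations that create new RDDs from existing ones. They are not executed until an action is called. Note that `takeSample` and `foreachPartition`, which are listed in this section next to their related transformations, are actions: they run immediately and return an `Array[T]` to the driver or execute side effects, rather than producing a new RDD.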

22

23

### Basic Transformations

24

25

```scala { .api }

26

abstract class RDD[T] {

27

def map[U: ClassTag](f: T => U): RDD[U]

28

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]

29

def filter(f: T => Boolean): RDD[T]

30

def distinct(): RDD[T]
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

31

def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]

32

def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T]

33

}

34

```

35

36

### Set Operations

37

38

```scala { .api }

39

abstract class RDD[T] {

40

def union(other: RDD[T]): RDD[T]

41

def intersection(other: RDD[T]): RDD[T]

42

def intersection(other: RDD[T], partitioner: Partitioner): RDD[T]

43

def intersection(other: RDD[T], numPartitions: Int): RDD[T]

44

def subtract(other: RDD[T]): RDD[T]

45

def subtract(other: RDD[T], numPartitions: Int): RDD[T]

46

def subtract(other: RDD[T], p: Partitioner): RDD[T]

47

def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]

48

}

49

```

50

51

### Grouping and Partitioning

52

53

```scala { .api }

54

abstract class RDD[T] {

55

def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]

56

def groupBy[K](f: T => K, numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]

57

def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]

58

}

59

```

60

61

### Partition Operations

62

63

```scala { .api }

64

abstract class RDD[T] {

65

def mapPartitions[U: ClassTag](

66

f: Iterator[T] => Iterator[U],

67

preservesPartitioning: Boolean = false

68

): RDD[U]

69

70

def mapPartitionsWithIndex[U: ClassTag](

71

f: (Int, Iterator[T]) => Iterator[U],

72

preservesPartitioning: Boolean = false

73

): RDD[U]

74

75

def foreachPartition(f: Iterator[T] => Unit): Unit

76

def glom(): RDD[Array[T]]

77

}

78

```

79

80

### Repartitioning

81

82

```scala { .api }

83

abstract class RDD[T] {

84

def repartition(numPartitions: Int): RDD[T]

85

def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T]

86

def sortBy[K](

87

f: T => K,

88

ascending: Boolean = true,

89

numPartitions: Int = this.partitions.length

90

)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

91

}

92

```

93

94

### Pairing & Zipping Operations

95

96

```scala { .api }

97

abstract class RDD[T] {

98

def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]

99

def zipPartitions[B: ClassTag, V: ClassTag](rdd2: RDD[B])(

100

f: (Iterator[T], Iterator[B]) => Iterator[V]

101

): RDD[V]

102

def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag](rdd2: RDD[B], rdd3: RDD[C])(

103

f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]

104

): RDD[V]

105

def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag](

106

rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D]

107

)(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V]

108

def zipWithIndex(): RDD[(T, Long)]

109

def zipWithUniqueId(): RDD[(T, Long)]

110

def keyBy[K](f: T => K): RDD[(K, T)]

111

}

112

```

113

114

### External Command Processing

115

116

```scala { .api }

117

abstract class RDD[T] {

118

def pipe(command: String): RDD[String]

119

def pipe(command: String, env: Map[String, String]): RDD[String]

120

def pipe(

121

command: Seq[String],

122

env: Map[String, String] = Map(),

123

printPipeContext: (String => Unit) => Unit = null,

124

printRDDElement: (T, String => Unit) => Unit = null,

125

separateWorkingDir: Boolean = false,

126

bufferSize: Int = 8192,

127

encoding: String = Codec.defaultCharsetCodec.name

128

): RDD[String]

129

}

130

```

131

132

## Actions

133

134

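Actions trigger the execution of RDD transformations and return results to the driver or save data to storage. Note that `collect(f: PartialFunction[T, U])` and `randomSplit`, which are listed in this section, return RDDs and are therefore lazy transformations, not actions.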

135

136

### Collection Actions

137

138

```scala { .api }

139

abstract class RDD[T] {

140

def collect(): Array[T]

141

def toLocalIterator: Iterator[T]

142

def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U]

143

def take(num: Int): Array[T]

144

def first(): T

145

def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]

146

def top(num: Int)(implicit ord: Ordering[T]): Array[T]

147

}

148

```

149

150

### Reduction Actions

151

152

```scala { .api }

153

abstract class RDD[T] {

154

def reduce(f: (T, T) => T): T

155

def treeReduce(f: (T, T) => T, depth: Int = 2): T

156

def fold(zeroValue: T)(op: (T, T) => T): T

157

def aggregate[U: ClassTag](zeroValue: U)(

158

seqOp: (U, T) => U,

159

combOp: (U, U) => U

160

): U

161

def treeAggregate[U: ClassTag](zeroValue: U)(

162

seqOp: (U, T) => U,

163

combOp: (U, U) => U,

164

depth: Int = 2

165

): U

166

}

167

```

168

169

### Counting Actions

170

171

```scala { .api }

172

abstract class RDD[T] {

173

def count(): Long

174

def countApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble]

175

def countByValue()(implicit ord: Ordering[T]): Map[T, Long]

176

def countByValueApprox(timeout: Long, confidence: Double = 0.95)(implicit ord: Ordering[T]): PartialResult[Map[T, BoundedDouble]]

177

def countApproxDistinct(relativeSD: Double = 0.05): Long

178

def countApproxDistinct(p: Int, sp: Int): Long

179

def isEmpty(): Boolean

180

}

181

```

182

183

### Statistical Actions

184

185

```scala { .api }

186

abstract class RDD[T] {

187

def max()(implicit ord: Ordering[T]): T

188

def min()(implicit ord: Ordering[T]): T

189

}

190

```

191

192

### Iterative Actions

193

194

```scala { .api }

195

abstract class RDD[T] {

196

def foreach(f: T => Unit): Unit

197

def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]]

198

}

199

```

200

201

## Pair RDD Operations

202

203

When RDDs contain key-value pairs, additional operations become available through implicit conversions.

204

205

### Key-Value Transformations

206

207

```scala { .api }

208

// Available on RDD[(K, V)] through implicit conversion to PairRDDFunctions

209

class PairRDDFunctions[K, V](self: RDD[(K, V)]) {

210

def keys: RDD[K]

211

def values: RDD[V]

212

def mapValues[U](f: V => U): RDD[(K, U)]

213

def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)]

214

}

215

```

216

217

### Grouping Operations

218

219

```scala { .api }

220

class PairRDDFunctions[K, V] {

221

def groupByKey(): RDD[(K, Iterable[V])]

222

def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]

223

def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]

224

225

def reduceByKey(func: (V, V) => V): RDD[(K, V)]

226

def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]

227

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]

228

229

def aggregateByKey[U: ClassTag](zeroValue: U)(

230

seqOp: (U, V) => U,

231

combOp: (U, U) => U

232

): RDD[(K, U)]

233

def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(

234

seqOp: (U, V) => U,

235

combOp: (U, U) => U

236

): RDD[(K, U)]

237

def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(

238

seqOp: (U, V) => U,

239

combOp: (U, U) => U

240

): RDD[(K, U)]

241

242

def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]

243

def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]

244

def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]

245

246

def combineByKey[C](

247

createCombiner: V => C,

248

mergeValue: (C, V) => C,

249

mergeCombiners: (C, C) => C

250

): RDD[(K, C)]

251

def combineByKey[C](

252

createCombiner: V => C,

253

mergeValue: (C, V) => C,

254

mergeCombiners: (C, C) => C,

255

numPartitions: Int

256

): RDD[(K, C)]

257

def combineByKey[C](

258

createCombiner: V => C,

259

mergeValue: (C, V) => C,

260

mergeCombiners: (C, C) => C,

261

partitioner: Partitioner

262

): RDD[(K, C)]

263

}

264

```

265

266

### Join Operations

267

268

```scala { .api }

269

class PairRDDFunctions[K, V] {

270

def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]

271

def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]

272

def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]

273

274

def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]

275

def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))]

276

def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))]

277

278

def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]

279

def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))]

280

def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], W))]

281

282

def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]

283

def fullOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], Option[W]))]

284

def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], Option[W]))]

285

286

def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]

287

def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]

288

def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))]

289

def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]

290

def cogroup[W1, W2, W3](

291

other1: RDD[(K, W1)],

292

other2: RDD[(K, W2)],

293

other3: RDD[(K, W3)]

294

): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]

295

}

296

```

297

298

### Set Operations on Pairs

299

300

```scala { .api }

301

class PairRDDFunctions[K, V] {

302

def subtractByKey[W](other: RDD[(K, W)]): RDD[(K, V)]

303

def subtractByKey[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)]

304

def subtractByKey[W](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)]

305

}

306

```

307

308

### Sorting Operations

309

310

```scala { .api }

311

// Available on RDD[(K, V)] through implicit conversion when an implicit Ordering[K] is in scope

312

class OrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](self: RDD[(K, V)]) {

313

def sortByKey(ascending: Boolean = true): RDD[(K, V)]

314

def sortByKey(ascending: Boolean, numPartitions: Int): RDD[(K, V)]

315

def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)]

316

}

317

```

318

319

### Partitioning Operations

320

321

```scala { .api }

322

class PairRDDFunctions[K, V] {

323

def partitionBy(partitioner: Partitioner): RDD[(K, V)]

324

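// Note: defined on OrderedRDDFunctions (requires an implicit Ordering[K]), not on PairRDDFunctions itself
def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)]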

325

}

326

```

327

328

### Lookup and Collection

329

330

```scala { .api }

331

class PairRDDFunctions[K, V] {

332

def lookup(key: K): Seq[V]

333

def collectAsMap(): Map[K, V]

334

def reduceByKeyLocally(func: (V, V) => V): Map[K, V]

335

def countByKey(): Map[K, Long]

336

}

337

```

338

339

## Numeric RDD Operations

340

341

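For RDDs of `Double` values, statistical operations are available. RDDs of other numeric types (any `T` with an implicit `Numeric[T]`) gain the same operations through an implicit conversion.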

342

343

```scala { .api }

344

// Available on RDD[Double] through implicit conversion

345

class DoubleRDDFunctions(self: RDD[Double]) {

346

def sum(): Double

347

def stats(): StatCounter

348

def mean(): Double

349

def variance(): Double

350

def stdev(): Double

351

def sampleStdev(): Double

352

def sampleVariance(): Double

353

def histogram(buckets: Array[Double]): Array[Long]

354

def histogram(buckets: Int): (Array[Double], Array[Long])

355

}

356

```

357

358

## Output Operations

359

360

```scala { .api }

361

abstract class RDD[T] {

362

def saveAsTextFile(path: String): Unit

363

def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit

364

def saveAsObjectFile(path: String): Unit

365

}

366

367

// For pair RDDs

368

class PairRDDFunctions[K, V] {

369

def saveAsHadoopFile[F <: OutputFormat[_, _]](

370

path: String,

371

keyClass: Class[_],

372

valueClass: Class[_],

373

outputFormatClass: Class[F],

374

conf: JobConf = new JobConf(self.context.hadoopConfiguration)

375

): Unit

376

377

def saveAsHadoopDataset(conf: JobConf): Unit

378

379

def saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]](

380

path: String,

381

keyClass: Class[_],

382

valueClass: Class[_],

383

outputFormatClass: Class[F],

384

conf: Configuration = self.context.hadoopConfiguration

385

): Unit

386

387

def saveAsNewAPIHadoopDataset(conf: Configuration): Unit

388

}

389

390

// For SequenceFile output

391

class SequenceFileRDDFunctions[K, V](self: RDD[(K, V)]) {

392

def saveAsSequenceFile(

393

path: String,

394

codec: Option[Class[_ <: CompressionCodec]] = None

395

): Unit

396

}

397

```

398

399

## Usage Examples

400

401

### Basic Transformations and Actions

402

403

```scala

404

val rdd = sc.parallelize(1 to 100)

405

406

// Transformations (lazy)

407

val evenNumbers = rdd.filter(_ % 2 == 0)

408

val doubled = evenNumbers.map(_ * 2)

409

val distinctValues = doubled.distinct()

410

411

// Actions (trigger computation)

412

val result = distinctValues.collect()

413

val count = distinctValues.count()

414

val sum = distinctValues.reduce(_ + _)

415

```

416

417

### Working with Key-Value Pairs

418

419

```scala

420

val pairs = sc.parallelize(Seq(

421

("apple", 1), ("banana", 2), ("apple", 3), ("cherry", 1)

422

))

423

424

// Group by key

425

val grouped = pairs.groupByKey()

426

// Result: ("apple", [1, 3]), ("banana", [2]), ("cherry", [1])

427

428

// Reduce by key

429

val totals = pairs.reduceByKey(_ + _)

430

// Result: ("apple", 4), ("banana", 2), ("cherry", 1)

431

432

// Join with another RDD

433

val prices = sc.parallelize(Seq(("apple", 0.5), ("banana", 0.3)))

434

val joined = totals.join(prices)

435

// Result: ("apple", (4, 0.5)), ("banana", (2, 0.3))

436

```

437

438

### Advanced Aggregations

439

440

```scala

441

val sales = sc.parallelize(Seq(

442

("store1", 100), ("store2", 200), ("store1", 150), ("store2", 175)

443

))

444

445

// Aggregate by key with different data types

446

case class SalesSummary(totalSales: Int, count: Int, avgSale: Double)

447

448

val summary = sales.aggregateByKey(SalesSummary(0, 0, 0.0))(

449

seqOp = { (summary, sale) =>

450

val newTotal = summary.totalSales + sale

451

val newCount = summary.count + 1

452

SalesSummary(newTotal, newCount, newTotal.toDouble / newCount)

453

},

454

combOp = { (sum1, sum2) =>

455

val combinedTotal = sum1.totalSales + sum2.totalSales

456

val combinedCount = sum1.count + sum2.count

457

SalesSummary(combinedTotal, combinedCount, combinedTotal.toDouble / combinedCount)

458

}

459

)

460

```

461

462

### Partition-Level Operations

463

464

```scala

465

val rdd = sc.parallelize(1 to 100, numSlices = 4)

466

467

// Process each partition independently

468

val partitionSums = rdd.mapPartitions { iter =>

469

Iterator(iter.sum)

470

}

471

472

// Access partition index

473

val partitionInfo = rdd.mapPartitionsWithIndex { (index, iter) =>

474

val partitionSize = iter.size

475

Iterator((s"partition-$index", partitionSize))

476

}

477

478

// Convert each partition to an array

479

val partitionArrays = rdd.glom()

480

```

481

482

### Statistical Operations

483

484

```scala

485

val numbers = sc.parallelize((1 to 1000).map(_.toDouble))

486

487

// Basic statistics

488

val stats = numbers.stats()

489

val mean = numbers.mean()

490

val variance = numbers.variance()

491

val stdev = numbers.stdev()

492

493

// Histogram

494

val (buckets, counts) = numbers.histogram(10)

495

```

496

497

### Repartitioning Examples

498

499

```scala

500

val rdd = sc.parallelize(1 to 100, numSlices = 10)

501

502

// Reduce number of partitions without shuffle

503

val coalesced = rdd.coalesce(5)

504

505

// Repartition with shuffle (can increase or decrease partitions)

506

val repartitioned = rdd.repartition(20)

507

508

// Sort by key function

509

val sorted = rdd.sortBy(x => -x) // Sort in descending order

510

```

511

512

## Performance Considerations

513

514

### Transformation Optimization

515

516

- **Use `mapPartitions` instead of `map`** when there is per-element setup cost (e.g., opening a connection or building a parser) that can be paid once per partition instead of once per element

517

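- **Prefer `reduceByKey` over `groupByKey`** followed by a reduce: `reduceByKey` combines values within each partition before the shuffle, so far less data crosses the network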

518

- **Use `coalesce` instead of `repartition`** when only reducing partition count

519

- **Consider `combineByKey`** for complex aggregations that can't be expressed with other operations

520

521

### Memory Management

522

523

- **Call `unpersist()` on cached RDDs** when they're no longer needed

524

- **Use appropriate storage levels** for different access patterns

525

- **Monitor partition sizes** to avoid memory issues

526

527

### Shuffle Optimization

528

529

- **Use partitioners** to control data distribution and minimize shuffles

530

- **Co-partition related datasets** (partition both with the same partitioner) so that joins between them do not need to shuffle either side

531

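- **Tune shuffle parameters** like `spark.default.parallelism`, or pass `numPartitions` explicitly to shuffle operations (`spark.sql.shuffle.partitions` applies only to DataFrame/Dataset queries, not to RDD operations)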

532

533

## Important Notes

534

535

- **RDDs are immutable** - transformations create new RDDs

536

- **Transformations are lazy** - they're only executed when an action is called

537

- **Actions are eager** - they trigger execution of the lineage (all upstream transformations) that the action's RDD depends on

538

- **Lineage tracking** enables automatic recovery from failures

539

- **Partitioning matters** - well-partitioned data improves performance significantly

540

- **Avoid collect() on large datasets** - it brings all data to the driver node