or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

broadcast-accumulators.md context-management.md index.md java-api.md pair-rdd-operations.md rdd-operations.md storage-persistence.md

rdd-operations.mddocs/

0

# RDD Operations

1

2

The core RDD API provides transformations and actions for distributed data processing, including map, filter, and reduce operations as well as advanced transformations such as joins and aggregations.

3

4

## Capabilities

5

6

### RDD Base Class

7

8

The fundamental abstraction in Spark representing an immutable, partitioned collection of elements that can be operated on in parallel.

9

10

```scala { .api }

11

/**

12

* A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,

13

* partitioned collection of elements that can be operated on in parallel.

14

*

15

* @param _sc The SparkContext that created this RDD

16

* @param deps Dependencies on other RDDs

17

*/

18

abstract class RDD[T: ClassTag](

19

@transient private var _sc: SparkContext,

20

@transient private var deps: Seq[Dependency[_]]

21

) extends Serializable with Logging {

22

23

// TRANSFORMATIONS (lazy operations that return new RDDs)

24

25

/** Apply a function to each element and return a new RDD */

26

def map[U: ClassTag](f: T => U): RDD[U]

27

28

/** Apply a function to each element and flatten the results */

29

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]

30

31

/** Filter elements using a predicate function */

32

def filter(f: T => Boolean): RDD[T]

33

34

/** Return distinct elements (removes duplicates) */

35

def distinct(): RDD[T]

36

37

/** Return distinct elements with specific number of partitions */

38

def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

39

40

/** Union this RDD with another RDD of the same type */

41

def union(other: RDD[T]): RDD[T]

42

43

/** Return intersection with another RDD */

44

def intersection(other: RDD[T]): RDD[T]

45

46

/** Subtract elements found in another RDD */

47

def subtract(other: RDD[T]): RDD[T]

48

49

/** Return Cartesian product with another RDD */

50

def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]

51

52

/** Sample elements with or without replacement */

53

def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]

54

55

/** Split RDD into multiple RDDs using weights array */

56

def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]]

57

58

/** Group elements of each partition into an array */

59

def glom(): RDD[Array[T]]

60

61

/** Apply function to each partition */

62

def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]

63

64

/** Apply function to each partition with partition index */

65

def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]

66

67

/** Zip this RDD with another RDD (both must have the same number of partitions and the same number of elements in each partition) */

68

def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]

69

70

/** Zip elements with their indices */

71

def zipWithIndex(): RDD[(T, Long)]

72

73

/** Zip elements with unique IDs */

74

def zipWithUniqueId(): RDD[(T, Long)]

75

76

/** Pipe elements through external command */

77

def pipe(command: String): RDD[String]

78

79

/** Pipe elements through external command with environment */

80

def pipe(command: Seq[String], env: Map[String, String] = Map(), printPipeContext: (String => Unit) => Unit = null, printRDDElement: (T, String => Unit) => Unit = null): RDD[String]

81

82

/** Reduce number of partitions (no shuffle by default; pass shuffle = true to force one) */

83

def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T]

84

85

/** Repartition to different number of partitions (with shuffle) */

86

def repartition(numPartitions: Int): RDD[T]

87

88

/** Sort RDD elements */

89

def sortBy[K](f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

90

91

// ACTIONS (operations that return values to the driver)

92

93

/** Return all elements as an array */

94

def collect(): Array[T]

95

96

/** Return number of elements */

97

def count(): Long

98

99

/** Return first element */

100

def first(): T

101

102

/** Return first n elements */

103

def take(num: Int): Array[T]

104

105

/** Return the n largest elements, in descending order of the implicit ordering */

106

def top(num: Int)(implicit ord: Ordering[T]): Array[T]

107

108

/** Return smallest n elements by natural ordering */

109

def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]

110

111

/** Take a sample of elements and return as array */

112

def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T]

113

114

/** Apply function to each element (no return value) */

115

def foreach(f: T => Unit): Unit

116

117

/** Apply function to each partition */

118

def foreachPartition(f: Iterator[T] => Unit): Unit

119

120

/** Reduce elements using function */

121

def reduce(f: (T, T) => T): T

122

123

/** Aggregate elements with initial value */

124

def fold(zeroValue: T)(op: (T, T) => T): T

125

126

/** Aggregate elements using different input and output types */

127

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

128

129

/** Collect elements as a key-value map (defined on PairRDDFunctions and reached via implicit conversion for RDD[(K, V)]; not a member of RDD itself) */

130

def collectAsMap(): Map[K, V] // Available when T <: (K, V)

131

132

/** Count elements by value */

133

def countByValue()(implicit ord: Ordering[T]): Map[T, Long]

134

135

/** Check if RDD is empty */

136

def isEmpty(): Boolean

137

138

/** Save as text file */

139

def saveAsTextFile(path: String): Unit

140

141

/** Save as text file with codec */

142

def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit

143

144

/** Save as object file using serialization */

145

def saveAsObjectFile(path: String): Unit

146

147

// PERSISTENCE OPERATIONS

148

149

/** Persist RDD in memory */

150

def cache(): RDD.this.type

151

152

/** Persist RDD with specified storage level */

153

def persist(newLevel: StorageLevel): RDD.this.type

154

155

/** Remove RDD from cache */

156

def unpersist(blocking: Boolean = true): RDD.this.type

157

158

/** Mark RDD for checkpointing */

159

def checkpoint(): Unit

160

161

/** Check if RDD is checkpointed */

162

def isCheckpointed: Boolean

163

164

/** Get checkpoint file if available */

165

def getCheckpointFile: Option[String]

166

167

// INFORMATION METHODS

168

169

/** Get number of partitions */

170

def getNumPartitions: Int

171

172

/** Get partitions array */

173

def partitions: Array[Partition]

174

175

/** Get current storage level */

176

def getStorageLevel: StorageLevel

177

178

/** Get RDD name */

179

def name: String

180

181

/** Set RDD name for display purposes */

182

def setName(_name: String): RDD.this.type

183

184

/** Get RDD ID */

185

def id: Int

186

187

/** Convert to string representation */

188

override def toString: String

189

190

/** Get creation site information */

191

def creationSite: CallSite

192

}

193

```

194

195

**Usage Examples:**

196

197

```scala

198

import org.apache.spark.{SparkContext, SparkConf}

199

import org.apache.spark.storage.StorageLevel

200

201

val sc = new SparkContext(new SparkConf().setAppName("RDD Examples").setMaster("local[*]"))

202

203

// Create RDD from collection

204

val numbers = sc.parallelize(1 to 100, 4)

205

206

// Basic transformations

207

val doubled = numbers.map(_ * 2)

208

val evens = numbers.filter(_ % 2 == 0)

209

val squares = numbers.map(x => x * x)

210

211

// FlatMap example

212

val words = sc.parallelize(Array("hello world", "spark rdd", "distributed computing"))

213

val allWords = words.flatMap(_.split(" "))

214

215

// Distinct and sampling

216

val duplicates = sc.parallelize(Array(1, 2, 2, 3, 3, 3, 4))

217

val unique = duplicates.distinct()

218

val sample = numbers.sample(false, 0.1) // 10% sample without replacement

219

220

// Combining RDDs

221

val rdd1 = sc.parallelize(1 to 5)

222

val rdd2 = sc.parallelize(4 to 8)

223

val combined = rdd1.union(rdd2)

224

val intersection = rdd1.intersection(rdd2)

225

val difference = rdd1.subtract(rdd2)

226

227

// Partitioning operations

228

val repartitioned = numbers.repartition(8) // Force 8 partitions

229

val coalesced = numbers.coalesce(2) // Reduce to 2 partitions without shuffle

230

231

// Actions

232

val collected = numbers.collect() // Returns Array[Int]

233

val count = numbers.count() // Returns Long

234

val sum = numbers.reduce(_ + _) // Returns Int

235

val first10 = numbers.take(10) // Returns Array[Int]

236

237

// Aggregation

238

val stats = numbers.aggregate((0, 0, 0))(

239

seqOp = { case ((sum, count, max), x) => (sum + x, count + 1, math.max(max, x)) },

240

combOp = { case ((sum1, count1, max1), (sum2, count2, max2)) =>

241

(sum1 + sum2, count1 + count2, math.max(max1, max2)) }

242

)

243

244

// Persistence

245

numbers.cache() // Cache in memory

246

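numbers.persist(StorageLevel.MEMORY_AND_DISK_SER) // Custom storage level - an alternative to cache(), not a follow-up: Spark throws UnsupportedOperationException if the level of an already-persisted RDD is changed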

247

248

// Cleanup

249

numbers.unpersist()

250

sc.stop()

251

```

252

253

### Double RDD Functions

254

255

Additional operations available on RDDs of Double values for statistical computations.

256

257

```scala { .api }

258

/**

259

* Extra functions available on RDDs of Doubles through an implicit conversion.

260

*/

261

class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {

262

263

/** Compute the mean of RDD elements */

264

def mean(): Double

265

266

/** Compute the population variance of RDD elements (see sampleVariance for the bias-corrected estimate) */

267

def variance(): Double

268

269

/** Compute the population standard deviation of RDD elements (see sampleStdev for the bias-corrected estimate) */

270

def stdev(): Double

271

272

/** Compute the population variance of RDD elements */

273

def popVariance(): Double

274

275

/** Compute the population standard deviation of RDD elements */

276

def popStdev(): Double

277

278

/** Compute the sample variance of RDD elements */

279

def sampleVariance(): Double

280

281

/** Compute the sample standard deviation of RDD elements */

282

def sampleStdev(): Double

283

284

/** Compute statistics summary */

285

def stats(): StatCounter

286

287

/** Compute histogram with specified number of buckets */

288

def histogram(buckets: Int): (Array[Double], Array[Long])

289

290

/** Compute histogram with specified bucket boundaries */

291

def histogram(buckets: Array[Double]): Array[Long]

292

293

/** Sum all elements */

294

def sum(): Double

295

}

296

297

/**

298

* Statistics counter for computing mean, variance, etc.

299

*/

300

class StatCounter extends Serializable {

301

def count: Long

302

def mean: Double

303

def sum: Double

304

def min: Double

305

def max: Double

306

def variance: Double

307

def stdev: Double

308

def sampleVariance: Double

309

def sampleStdev: Double

310

}

311

```

312

313

**Usage Examples:**

314

315

```scala

316

val doubleRDD = sc.parallelize(Array(1.0, 2.5, 3.7, 4.2, 5.9, 6.1, 7.8))

317

318

// Basic statistics

319

val mean = doubleRDD.mean() // 4.457...

320

val variance = doubleRDD.variance()

321

val stdev = doubleRDD.stdev()

322

val sum = doubleRDD.sum()

323

324

// Statistics summary

325

val stats = doubleRDD.stats()

326

println(s"Count: ${stats.count}, Mean: ${stats.mean}, StdDev: ${stats.stdev}")

327

328

// Histogram

329

val (buckets, counts) = doubleRDD.histogram(5) // 5 equal-width buckets

330

val customCounts = doubleRDD.histogram(Array(0.0, 2.0, 4.0, 6.0, 8.0)) // Custom buckets

331

```

332

333

### SequenceFile Operations

334

335

Operations for RDDs that can be saved as Hadoop SequenceFiles.

336

337

```scala { .api }

338

/**

339

* Extra functions available on RDDs of (key, value) pairs to allow writing to SequenceFiles.

340

*/

341

class SequenceFileRDDFunctions[K <% Writable : ClassTag, V <% Writable : ClassTag](

342

self: RDD[(K, V)]) extends Logging with Serializable {

343

344

/** Save RDD as SequenceFile */

345

def saveAsSequenceFile(path: String): Unit

346

347

/** Save RDD as SequenceFile with compression */

348

def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]]): Unit

349

}

350

```

351

352

**Usage Examples:**

353

354

```scala

355

import org.apache.hadoop.io.{IntWritable, Text}

356

357

// Create RDD of key-value pairs

358

val pairs = sc.parallelize(Array(("key1", "value1"), ("key2", "value2")))

359

360

// Convert to Writable types for SequenceFile

361

val writablePairs = pairs.map { case (k, v) => (new Text(k), new Text(v)) }

362

363

// Save as SequenceFile

364

writablePairs.saveAsSequenceFile("hdfs://path/to/output")

365

366

// Save with compression

367

import org.apache.hadoop.io.compress.GzipCodec

368

writablePairs.saveAsSequenceFile("hdfs://path/to/compressed", Some(classOf[GzipCodec]))

369

```

370

371

### Async RDD Actions

372

373

Asynchronous versions of RDD actions that return FutureAction objects.

374

375

```scala { .api }

376

/**

377

* Asynchronous API for RDD actions.

378

*/

379

class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Logging {

380

381

/** Asynchronously returns all elements of the RDD */

382

def collectAsync(): FutureAction[Array[T]]

383

384

/** Asynchronously returns the number of elements */

385

def countAsync(): FutureAction[Long]

386

387

/** Asynchronously applies a function to all elements */

388

def foreachAsync(f: T => Unit): FutureAction[Unit]

389

390

/** Asynchronously applies function to each partition */

391

def foreachPartitionAsync(f: Iterator[T] => Unit): FutureAction[Unit]

392

393

/** Asynchronously returns first n elements */

394

def takeAsync(num: Int): FutureAction[Array[T]]

395

}

396

397

/**

398

* A future for the result of an action.

399

*/

400

trait FutureAction[T] extends Future[T] {

401

/** Cancel the action if possible */

402

def cancel(): Unit

403

404

/** Check if action was cancelled */

405

def isCancelled: Boolean

406

407

/** Get the IDs of the jobs run by the underlying asynchronous action */

408

def jobIds: Array[Int]

409

}

410

```

411

412

**Usage Examples:**

413

414

```scala

415

import scala.concurrent.{Await, ExecutionContext}

416

import scala.concurrent.duration._

417

418

implicit val ec = ExecutionContext.global

419

420

val largeRDD = sc.parallelize(1 to 1000000, 100)

421

422

// Asynchronous operations

423

val futureCount = largeRDD.countAsync()

424

val futureSum = largeRDD.map(_.toLong).reduce(_ + _) // NOTE: reduce is a blocking action; despite the name, this is a plain Long, not a FutureAction

425

426

// Handle results (onSuccess is deprecated since Scala 2.12 and removed in 2.13; use foreach or onComplete there)

427

futureCount.onSuccess {

428

case count => println(s"RDD has $count elements")

429

}

430

431

// Wait for completion

432

val count = Await.result(futureCount, 30.seconds)

433

434

// Cancel long-running operations

435

val futureResult = largeRDD.map(heavyComputation).collectAsync()

436

// ... later ...

437

futureResult.cancel()

438

```

439

440

## Performance Considerations

441

442

### Choosing Transformations vs Actions

443

- **Transformations** are lazy - they don't execute until an action is called

444

- Chain multiple transformations before calling actions to optimize execution

445

- Use `cache()` or `persist()` for RDDs that will be reused multiple times

446

447

### Partitioning Strategy

448

- More partitions = better parallelism but higher overhead

449

- Rule of thumb: 2-4 partitions per CPU core

450

- Use `coalesce()` to reduce partitions without shuffle when possible

451

- Use `repartition()` when you need more partitions or better distribution

452

453

### Memory Management

454

- Cache frequently accessed RDDs with appropriate storage levels

455

- Use serialized storage (`MEMORY_ONLY_SER`) to reduce memory usage

456

- Consider `MEMORY_AND_DISK` for large RDDs that don't fit in memory

457

- Call `unpersist()` when RDDs are no longer needed

458

459

### Common Performance Anti-patterns

460

- Avoid `collect()` on large RDDs - it brings all data to the driver

461

- Don't use `countByValue()` on RDDs with high cardinality - the resulting map of every distinct value is materialized on the driver

462

- Minimize data shuffling operations when possible

463

- Use broadcast variables for small lookup tables instead of joins