or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.md · io-operations.md · key-value-operations.md · partitioning-shuffling.md · rdd-operations.md · shared-variables.md · spark-context.md · storage-persistence.md

docs/rdd-operations.md

0

# RDD Operations

1

2

Resilient Distributed Datasets (RDDs) provide the core abstraction for distributed data processing in Spark, offering fault-tolerant distributed collections with transformations and actions.

3

4

## Capabilities

5

6

### RDD Base Class

7

8

Abstract base class representing an immutable, partitioned collection of elements that can be operated on in parallel.

9

10

```scala { .api }

11

/**

12

* Resilient Distributed Dataset (RDD) - basic abstraction in Spark

13

*/

14

abstract class RDD[T: ClassTag](

15

@transient private var _sc: SparkContext,

16

@transient private var deps: Seq[Dependency[_]]

17

) extends Serializable

18

19

// Core transformation methods (lazy evaluation)

20

def map[U: ClassTag](f: T => U): RDD[U]

21

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]

22

def filter(f: T => Boolean): RDD[T]

23

def distinct(): RDD[T]

24

def distinct(numPartitions: Int): RDD[T]

25

def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]

26

27

// Set operations

28

def union(other: RDD[T]): RDD[T]

29

def ++(other: RDD[T]): RDD[T] // alias for union

30

def intersection(other: RDD[T]): RDD[T]

31

def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]

32

def intersection(other: RDD[T], numPartitions: Int): RDD[T]

33

def subtract(other: RDD[T]): RDD[T]

34

def subtract(other: RDD[T], numPartitions: Int): RDD[T]

35

def subtract(other: RDD[T], p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]

36

def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]

37

38

// Grouping and sorting

39

def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]

40

def groupBy[K](f: T => K, numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]

41

def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null): RDD[(K, Iterable[T])]

42

def sortBy[K](f: T => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

43

44

// Advanced transformations

45

def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]

46

def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]

47

def glom(): RDD[Array[T]]

48

def keyBy[K](f: T => K): RDD[(K, T)]

49

def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]]

50

51

// Zipping operations

52

def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]

53

def zipPartitions[B: ClassTag, V: ClassTag](rdd2: RDD[B], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V]

54

def zipPartitions[B: ClassTag, V: ClassTag](rdd2: RDD[B])(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V]

55

def zipWithIndex(): RDD[(T, Long)]

56

def zipWithUniqueId(): RDD[(T, Long)]

57

58

// Partitioning operations

59

def repartition(numPartitions: Int): RDD[T]

60

def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T]

61

def partitionBy(partitioner: Partitioner): RDD[T] // only for pair RDDs, i.e. RDD[(K, V)]; provided through PairRDDFunctions rather than RDD itself

62

63

// Core action methods (trigger execution)

64

def collect(): Array[T]

65

def count(): Long

66

def first(): T

67

def take(num: Int): Array[T]

68

def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T]

69

def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]

70

def top(num: Int)(implicit ord: Ordering[T]): Array[T]

71

def max()(implicit ord: Ordering[T]): T

72

def min()(implicit ord: Ordering[T]): T

73

def isEmpty(): Boolean

74

def toLocalIterator: Iterator[T]

75

76

// Counting actions

77

def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long]

78

def countApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble]

79

def countByValueApprox(timeout: Long, confidence: Double = 0.95)(implicit ord: Ordering[T] = null): PartialResult[Map[T, BoundedDouble]]

80

def countApproxDistinct(p: Int, sp: Int): Long

81

def countApproxDistinct(relativeSD: Double = 0.05): Long

82

83

// Reduction operations

84

def reduce(f: (T, T) => T): T

85

def fold(zeroValue: T)(op: (T, T) => T): T

86

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

87

def treeReduce(f: (T, T) => T, depth: Int = 2): T

88

def treeAggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U, depth: Int = 2): U

89

90

// Side-effect operations

91

def foreach(f: T => Unit): Unit

92

def foreachPartition(f: Iterator[T] => Unit): Unit

93

94

// Persistence methods

95

def persist(): RDD[T]

96

def persist(storageLevel: StorageLevel): RDD[T]

97

def cache(): RDD[T] // equivalent to persist(MEMORY_ONLY)

98

def unpersist(blocking: Boolean = true): RDD[T]

99

def getStorageLevel: StorageLevel

100

101

// Checkpointing methods

102

def checkpoint(): Unit

103

def localCheckpoint(): RDD[T]

104

def isCheckpointed: Boolean

105

def getCheckpointFile: Option[String]

106

107

// File output operations

108

def saveAsTextFile(path: String): Unit

109

def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit

110

def saveAsObjectFile(path: String): Unit

111

112

// Piping operations

113

def pipe(command: String): RDD[String]

114

def pipe(command: String, env: Map[String, String]): RDD[String]

115

def pipe(command: Seq[String], env: Map[String, String] = Map(),

116

printPipeContext: (String => Unit) => Unit = null,

117

printRDDElement: (T, String => Unit) => Unit = null,

118

separateWorkingDir: Boolean = false): RDD[String]

119

120

// Metadata and utility methods

121

def setName(name: String): RDD[T]

122

val name: String

123

val id: Int

124

def partitions: Array[Partition]

125

def dependencies: Seq[Dependency[_]]

126

val partitioner: Option[Partitioner]

127

def getNumPartitions: Int

128

def sparkContext: SparkContext

129

def context: SparkContext // alias for sparkContext

130

def toJavaRDD(): JavaRDD[T]

131

def toDebugString: String

132

133

// Special collection methods

134

def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U]

135

```

136

137

**Usage Examples:**

138

139

```scala

140

import org.apache.spark.{SparkContext, SparkConf}

141

import org.apache.spark.storage.StorageLevel

142

143

val sc = new SparkContext(new SparkConf().setAppName("RDD Examples").setMaster("local[*]"))

144

145

// Create RDD from collection

146

val numbers = sc.parallelize(1 to 100)

147

148

// Transformations (lazy evaluation)

149

val evenNumbers = numbers.filter(_ % 2 == 0)

150

val squares = evenNumbers.map(x => x * x)

151

val distinct = squares.distinct()

152

153

// Actions (trigger execution)

154

val result = squares.collect()

155

val count = squares.count()

156

val sum = squares.reduce(_ + _)

157

val firstTen = squares.take(10)

158

159

// Complex transformations

160

val words = sc.textFile("hdfs://path/to/file.txt")

161

val wordCounts = words

162

.flatMap(_.split(" "))

163

.map(word => (word, 1))

164

.reduceByKey(_ + _)

165

166

// Persistence for reuse

167

val cachedRDD = numbers.map(_ * 2).cache()

168

val result1 = cachedRDD.sum()

169

val result2 = cachedRDD.max() // Uses cached data

170

171

// Advanced operations

172

val grouped = numbers.groupBy(_ % 10)

173

val sorted = numbers.sortBy(identity, ascending = false)

174

val sampled = numbers.sample(withReplacement = false, 0.1)

175

```

176

177

### Numeric RDD Operations (DoubleRDDFunctions)

178

179

Additional operations available for RDDs containing numeric values.

180

181

```scala { .api }

182

/**

183

* Extra functions available on RDDs of Doubles through an implicit conversion

184

*/

185

class DoubleRDDFunctions(self: RDD[Double]) {

186

/** Compute the mean of this RDD's elements */

187

def mean(): Double

188

189

/** Compute the sum of this RDD's elements */

190

def sum(): Double

191

192

/** Find the minimum element in this RDD */

193

def min(): Double

194

195

/** Find the maximum element in this RDD */

196

def max(): Double

197

198

/** Compute the variance of this RDD's elements */

199

def variance(): Double

200

201

/** Compute the standard deviation of this RDD's elements */

202

def stdev(): Double

203

204

/** Compute the sample standard deviation of this RDD's elements */

205

def sampleStdev(): Double

206

207

/** Compute the sample variance of this RDD's elements */

208

def sampleVariance(): Double

209

210

/** Compute a histogram of the RDD using buckets evenly spaced between the minimum and maximum */

211

def histogram(buckets: Int): (Array[Double], Array[Long])

212

213

/** Compute a histogram using provided bucket boundaries */

214

def histogram(buckets: Array[Double]): Array[Long]

215

216

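/** Return approximate quantiles at the given probabilities. NOTE(review): approxQuantile is documented on DataFrameStatFunctions; confirm it is actually available on DoubleRDDFunctions */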

217

def approxQuantile(probabilities: Array[Double], relativeError: Double): Array[Double]

218

219

/** Return a StatCounter capturing the count, mean and variance of this RDD's elements in one pass */

220

def stats(): StatCounter

221

}

222

```

223

224

**Usage Examples:**

225

226

```scala

227

val doubleRDD = sc.parallelize(Array(1.0, 2.5, 3.7, 4.2, 5.8, 6.1))

228

229

// Statistical operations (implicit conversion)

230

val mean = doubleRDD.mean()

231

val stdDev = doubleRDD.stdev()

232

val variance = doubleRDD.variance()

233

val summary = doubleRDD.stats()

234

235

// Histogram

236

val (buckets, counts) = doubleRDD.histogram(5)

237

println(s"Histogram: ${buckets.zip(counts).mkString(", ")}")

238

```

239

240

### Asynchronous RDD Actions

241

242

Asynchronous versions of RDD actions that return futures for non-blocking execution.

243

244

```scala { .api }

245

/**

246

* Extra functions for performing asynchronous operations on RDDs

247

*/

248

class AsyncRDDActions[T: ClassTag](self: RDD[T]) {

249

/** Returns a future for retrieving all elements of this RDD */

250

def collectAsync(): FutureAction[Array[T]]

251

252

/** Returns a future for counting the number of elements in the RDD */

253

def countAsync(): FutureAction[Long]

254

255

/** Returns a future for applying a function to every element of this RDD */

256

def foreachAsync(f: T => Unit): FutureAction[Unit]

257

258

/** Returns a future for applying a function to each partition of this RDD */

259

def foreachPartitionAsync(f: Iterator[T] => Unit): FutureAction[Unit]

260

261

/** Returns a future for retrieving the first num elements of the RDD */

262

def takeAsync(num: Int): FutureAction[Array[T]]

263

}

264

```

265

266

### Partition-Level Operations

267

268

Operations that work at the partition level for performance optimization.

269

270

```scala { .api }

271

// Partition-level transformation methods in RDD

272

def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]

273

def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]

274

def foreachPartition(f: Iterator[T] => Unit): Unit

275

def glom(): RDD[Array[T]] // convert each partition into an array

276

def pipe(command: String): RDD[String]

277

def pipe(command: Seq[String]): RDD[String]

278

```

279

280

**Usage Examples:**

281

282

```scala

283

val data = sc.parallelize(1 to 100, 4) // 4 partitions

284

285

// Process each partition independently

286

val partitionSums = data.mapPartitions { iter =>

287

val sum = iter.sum

288

Iterator(sum)

289

}

290

291

// Access partition index

292

val partitionInfo = data.mapPartitionsWithIndex { (index, iter) =>

293

val count = iter.size

294

Iterator((index, count))

295

}

296

297

// Convert partitions to arrays

298

val partitionArrays = data.glom()

299

val arrays = partitionArrays.collect() // Array of arrays

300

301

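// External command processing (pipe splits the command string on whitespace without shell parsing, so quote characters reach the program literally)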

302

val piped = data.pipe("grep '5'")

303

```

304

305

### Zip Operations

306

307

Operations for combining RDDs element-wise.

308

309

```scala { .api }

310

// Zip operations in RDD

311

def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]

312

def zipWithIndex(): RDD[(T, Long)]

313

def zipWithUniqueId(): RDD[(T, Long)]

314

def zipPartitions[B: ClassTag, V: ClassTag](rdd2: RDD[B])(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V]

315

def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag](rdd2: RDD[B], rdd3: RDD[C])(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V]

316

```

317

318

**Usage Examples:**

319

320

```scala

321

val rdd1 = sc.parallelize(Array("a", "b", "c"))

322

val rdd2 = sc.parallelize(Array(1, 2, 3))

323

324

// Zip two RDDs

325

val zipped = rdd1.zip(rdd2) // RDD[("a", 1), ("b", 2), ("c", 3)]

326

327

// Zip with index

328

val withIndex = rdd1.zipWithIndex() // RDD[("a", 0), ("b", 1), ("c", 2)]

329

330

// Zip with unique ID

331

val withUniqueId = rdd1.zipWithUniqueId()

332

333

// Custom zip operation on partitions

334

val customZip = rdd1.zipPartitions(rdd2) { (iter1, iter2) =>

335

iter1.zip(iter2).map { case (str, num) => s"$str-$num" }

336

}

337

```

338

339

## Common RDD Patterns

340

341

### Efficient Data Processing Pipeline

342

343

```scala

344

val input = sc.textFile("hdfs://input/data.txt")

345

346

val processed = input

347

.filter(_.nonEmpty) // Remove empty lines

348

.map(_.toLowerCase.trim) // Normalize

349

.flatMap(_.split("\\s+")) // Split into words

350

.filter(_.length > 3) // Drop short words (keep words longer than 3 characters)

351

.map((_, 1)) // Create pairs

352

.reduceByKey(_ + _) // Count occurrences

353

.filter(_._2 > 10) // Drop rare words (keep words seen more than 10 times)

354

.sortBy(_._2, ascending = false) // Sort by count

355

.cache() // Cache for reuse

356

357

// Multiple actions on cached RDD

358

val topWords = processed.take(100)

359

val totalUnique = processed.count()

360

processed.saveAsTextFile("hdfs://output/results")

361

```

362

363

### Error Handling and Debugging

364

365

```scala

366

import org.apache.spark.TaskContext

367

368

val dataRDD = sc.parallelize(1 to 1000)

369

370

// Add debugging information

371

val debugRDD = dataRDD.mapPartitionsWithIndex { (partitionId, iter) =>

372

val taskContext = TaskContext.get()

373

println(s"Processing partition $partitionId on ${taskContext.taskAttemptId()}")

374

375

iter.map { value =>

376

try {

377

// Some processing that might fail

378

if (value % 100 == 0) throw new RuntimeException(s"Error processing $value")

379

value * 2

380

} catch {

381

case e: Exception =>

382

println(s"Error in partition $partitionId: ${e.getMessage}")

383

-1 // Error marker

384

}

385

}

386

}

387

388

// Filter out error markers and get debug info

389

val cleanData = debugRDD.filter(_ != -1)

390

println(s"Debug info: ${dataRDD.toDebugString}")

391

```