or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

broadcast-accumulators.mdcontext-configuration.mdindex.mdjava-api.mdkey-value-operations.mdrdd-operations.mdstatus-monitoring.mdstorage-persistence.mdtask-context.md

rdd-operations.mddocs/

0

# RDD Operations

1

2

Resilient Distributed Datasets (RDDs) are the fundamental data abstraction in Spark. They represent immutable, partitioned collections of elements that can be operated on in parallel. RDDs support two types of operations: transformations, which lazily define a new RDD from an existing one, and actions, which trigger computation and either return a result to the driver or write data to external storage.

3

4

## Core RDD Class

5

6

```scala { .api }

// Abridged listing: some overloads and implicit parameters are omitted.
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
) {

  // Transformations - Lazy operations that return new RDDs
  def map[U: ClassTag](f: T => U): RDD[U]
  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
  def filter(f: T => Boolean): RDD[T]
  def distinct(numPartitions: Int = partitions.length): RDD[T]
  def union(other: RDD[T]): RDD[T]
  def intersection(other: RDD[T]): RDD[T]
  def intersection(other: RDD[T], partitioner: Partitioner): RDD[T]
  def intersection(other: RDD[T], numPartitions: Int): RDD[T]
  def subtract(other: RDD[T]): RDD[T]
  def subtract(other: RDD[T], numPartitions: Int): RDD[T]
  def subtract(other: RDD[T], p: Partitioner): RDD[T]
  def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]
  def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
  def zipWithIndex(): RDD[(T, Long)]
  def zipWithUniqueId(): RDD[(T, Long)]
  def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]
  def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]]
  def repartition(numPartitions: Int): RDD[T]
  def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T]
  def sortBy[K](
      f: (T) => K,
      ascending: Boolean = true,
      numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
  def glom(): RDD[Array[T]]
  def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
  def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
  def pipe(command: String): RDD[String]
  def pipe(command: String, env: Map[String, String]): RDD[String]
  def pipe(
      command: Seq[String],
      env: Map[String, String] = Map(),
      printPipeContext: (String => Unit) => Unit = null,
      printRDDElement: (T, String => Unit) => Unit = null,
      separateWorkingDir: Boolean = false,
      bufferSize: Int = 8192,
      encoding: String = Codec.defaultCharsetCodec.name): RDD[String]

  // Actions - Operations that trigger computation and either return values to the
  // driver or write data to external storage
  def collect(): Array[T]
  def count(): Long
  def countApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble]
  def countApproxDistinct(relativeSD: Double = 0.05): Long
  def countByValue()(implicit ord: Ordering[T] = null): scala.collection.Map[T, Long]
  def first(): T
  def isEmpty(): Boolean
  def take(num: Int): Array[T]
  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
  def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T]
  def top(num: Int)(implicit ord: Ordering[T]): Array[T]
  def min()(implicit ord: Ordering[T]): T
  def max()(implicit ord: Ordering[T]): T
  def reduce(f: (T, T) => T): T
  def treeReduce(f: (T, T) => T, depth: Int = 2): T
  def fold(zeroValue: T)(op: (T, T) => T): T
  def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
  def treeAggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U, depth: Int = 2): U
  def foreach(f: T => Unit): Unit
  def foreachPartition(f: Iterator[T] => Unit): Unit
  def saveAsTextFile(path: String): Unit
  def saveAsObjectFile(path: String): Unit

  // Persistence and Caching
  def persist(): this.type
  def persist(newLevel: StorageLevel): this.type
  def cache(): this.type
  def unpersist(blocking: Boolean = true): this.type // NOTE: the default is false since Spark 3.0
  def checkpoint(): Unit
  def isCheckpointed: Boolean
  def getCheckpointFile: Option[String]
  def localCheckpoint(): this.type

  // Partition and Dependency Information
  def partitions: Array[Partition]
  def partitioner: Option[Partitioner]
  def getNumPartitions: Int
  def dependencies: Seq[Dependency[_]]
  def preferredLocations(split: Partition): Seq[String]
  def compute(split: Partition, context: TaskContext): Iterator[T]

  // Metadata and Debugging
  def id: Int
  def name: String
  def setName(name: String): this.type
  def toDebugString: String
  def getStorageLevel: StorageLevel
  def context: SparkContext
}

```

87

88

## Transformations

89

90

Transformations are lazy operations that define new RDDs but don't trigger computation immediately.

91

92

### Basic Transformations

93

94

```scala

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("RDD Examples")
  .setMaster("local[*]")
val sc = new SparkContext(conf)

// Sample inputs: the integers 1..100 and a few short sentences
val numbers = sc.parallelize(1 to 100)
val words = sc.parallelize(Seq("hello world", "spark is awesome", "big data processing"))

// map - exactly one output element per input element
val squared = numbers.map(n => n * n)
val lengths = words.map(sentence => sentence.length)

// flatMap - zero or more output elements per input element
val allWords = words.flatMap(sentence => sentence.split(" "))
val digits = numbers.flatMap(n => n.toString.toCharArray.map(ch => ch.asDigit))

// filter - keep only the elements for which the predicate returns true
val evenNumbers = numbers.filter(n => n % 2 == 0)
val longWords = allWords.filter(word => word.length > 4)

// distinct - drop duplicate elements (this requires a shuffle)
val uniqueWords = allWords.distinct()
val uniqueDigits = digits.distinct()

// sample - random subset; fraction sets the expected sample size, not an exact count
val sample = numbers.sample(withReplacement = false, fraction = 0.1, seed = 42)
val sampleWithReplacement = numbers.sample(withReplacement = true, fraction = 0.2)

```

123

124

### Set Operations

125

126

```scala

val rdd1 = sc.parallelize(1 to 10)
val rdd2 = sc.parallelize(5 to 15)

// union - concatenate both RDDs without a shuffle; duplicates are kept (5..10 appear twice)
val combined = rdd1.union(rdd2)

// intersection - elements present in both RDDs (here 5..10); the result is de-duplicated
val common = rdd1.intersection(rdd2)

// subtract - elements of rdd1 that do not appear in rdd2 (here 1..4)
val difference = rdd1.subtract(rdd2)

// cartesian - every (a, b) pair with a from rdd1 and b from rdd2 (10 * 11 = 110 pairs)
val pairs = rdd1.cartesian(rdd2)

```

142

143

### Pairing and Zipping

144

145

```scala

val data = sc.parallelize(Seq("apple", "banana", "cherry"))
val scores = sc.parallelize(Seq(95, 87, 92))

// zip - pair elements by position. Both RDDs must have the same number of
// partitions and the same number of elements in each partition.
val pairs = data.zip(scores)

// zipWithIndex - pair each element with its 0-based position as a Long;
// this triggers a job when the RDD has more than one partition
val indexed = data.zipWithIndex()

// zipWithUniqueId - pair each element with a unique Long ID; IDs may have gaps
// and are not consecutive, but no job is triggered
val withIds = data.zipWithUniqueId()

```

158

159

### Repartitioning

160

161

```scala

val data = sc.parallelize(1 to 1000, numSlices = 8)

// repartition - change the number of partitions; always performs a full shuffle
val repartitioned = data.repartition(4)

// coalesce - merge existing partitions without a shuffle (the default shuffle = false);
// in that mode it can only decrease the partition count
val coalesced = data.coalesce(2)

// sortBy - sort by a key function. Sorting by the element itself with
// ascending = false yields 1000, 999, ..., 1. (Negating the key AND passing
// ascending = false would cancel out and sort ascending.)
val sorted = data.sortBy(x => x, ascending = false)

```

173

174

### Advanced Transformations

175

176

```scala

// glom - turn each partition into a single Array holding its elements
val partitionArrays = numbers.glom()

// mapPartitions - call the function once per partition with an Iterator over its elements
val partitionSums = numbers.mapPartitions(iter => Iterator(iter.sum))

// mapPartitionsWithIndex - like mapPartitions, but also receives the partition index.
// An Iterator can be traversed only once, so sum and size are computed in a single
// pass; calling iter.sum and then iter.size would always report a size of 0.
val partitionInfo = numbers.mapPartitionsWithIndex { (index, iter) =>
  val (sum, size) = iter.foldLeft((0, 0)) { case ((total, count), x) => (total + x, count + 1) }
  Iterator((index, sum, size))
}

// pipe - write each element as a line to an external command's stdin;
// each line of its stdout becomes an element of the result
val upperCased = allWords.pipe("tr [a-z] [A-Z]")

```

191

192

## Actions

193

194

Actions trigger computation and return results to the driver or save data to external systems.

195

196

### Collection Actions

197

198

```scala

val data = sc.parallelize(1 to 100)

// collect - ships every element to the driver; only safe when the result fits in driver memory
val allElements: Array[Int] = data.collect()

// take - the first `num` elements, in partition order
val first10: Array[Int] = data.take(10)

// takeOrdered - the `num` smallest elements under the implicit Ordering (here 1 to 5)
val smallest5: Array[Int] = data.takeOrdered(5)

// top - the `num` largest elements, largest first (here 100 down to 96)
val largest5: Array[Int] = data.top(5)

// takeSample - a fixed-size random sample returned to the driver
val randomSample: Array[Int] = data.takeSample(withReplacement = false, num = 20)

// first - the first element; throws on an empty RDD
val firstElement: Int = data.first()

// min / max - extremes under the implicit Ordering
val minimum: Int = data.min()
val maximum: Int = data.max()

```

223

224

### Aggregation Actions

225

226

```scala

// count - number of elements, as a Long
val totalCount: Long = data.count()

// countByValue - occurrences of each distinct value, gathered into a map on the driver.
// It returns scala.collection.Map, not the default immutable Map.
val wordCounts: scala.collection.Map[String, Long] = allWords.countByValue()

// reduce - combine elements with a commutative and associative function
val sum: Int = data.reduce(_ + _)
// 100! overflows Int (and Long), so widen each element to BigInt before multiplying
val product: BigInt = data.map(n => BigInt(n)).reduce(_ * _)

// fold - like reduce, but each partition (and the final merge) starts from zeroValue,
// so zeroValue must be the identity of the operation (0 for +)
val sumWithZero: Int = data.fold(0)(_ + _)

// aggregate - reduce to a different result type, here (count, sum)
val stats: (Int, Double) = data.aggregate((0, 0.0))(
  seqOp = { case ((count, sum), value) => (count + 1, sum + value) },
  combOp = { case ((c1, s1), (c2, s2)) => (c1 + c2, s1 + s2) }
)

// treeReduce / treeAggregate - merge partial results in a multi-level tree on the
// executors instead of all at once on the driver; this helps when there are many partitions
val treeSum: Int = data.treeReduce(_ + _)

```

249

250

### Side-Effect Actions

251

252

```scala

// foreach - run a function on every element. It executes on the executors, not the
// driver: in cluster mode this println output goes to the executor logs.
data.foreach(println)

// foreachPartition - run a function once per partition, e.g. to open one
// connection per partition instead of one per element
data.foreachPartition { partition =>
  val connection = createDatabaseConnection()
  try partition.foreach(value => insertToDatabase(connection, value))
  finally connection.close() // release the connection even if an insert fails
}

```

263

264

### Approximate Actions

265

266

```scala

// countApprox - waits at most `timeout` milliseconds and returns a partial result
val approxCount = data.countApprox(timeout = 1000L, confidence = 0.95)
// The result is a PartialResult[BoundedDouble]; initialValue is the estimate available at the timeout
val estimate = approxCount.initialValue
println(s"Count is about ${estimate.mean} (between ${estimate.low} and ${estimate.high})")

// countApproxDistinct - approximate number of distinct elements (HyperLogLog++);
// a smaller relativeSD gives higher accuracy at the cost of more memory
val approxDistinct = data.countApproxDistinct(relativeSD = 0.05)

```

273

274

## Persistence and Caching

275

276

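RDDs can be persisted in memory, on disk, or both, so they can be reused efficiently across multiple actions. An RDD's storage level can be assigned only once; to change it, call `unpersist()` first.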

277

278

```scala

import org.apache.spark.storage.StorageLevel

val expensiveRDD = data
  .filter(_ > 50)
  .map(expensiveComputation)
  .filter(_.nonEmpty)

// Assign a storage level exactly once. Two options:
//   expensiveRDD.cache()                                    // shorthand for MEMORY_ONLY
//   expensiveRDD.persist(StorageLevel.MEMORY_AND_DISK_SER)  // spill to disk, serialized
// Calling both would fail: Spark throws UnsupportedOperationException when an RDD
// that already has a storage level is given a different one.
expensiveRDD.persist(StorageLevel.MEMORY_AND_DISK_SER)

// Persistence is lazy: the first action computes and stores the partitions,
// and later actions reuse them
val result1 = expensiveRDD.count()
val result2 = expensiveRDD.collect()
val result3 = expensiveRDD.take(10)

// Release the persisted blocks once the RDD is no longer needed
expensiveRDD.unpersist()

```

300

301

## Checkpointing

302

303

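Checkpointing saves an RDD's data to reliable storage (such as HDFS) and truncates its lineage. After a failure, Spark can then read the saved data instead of recomputing every parent RDD. Call `checkpoint()` before any job has run on the RDD. Persist the RDD first, so that writing the checkpoint does not recompute it.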

304

305

```scala

// Set the checkpoint directory (it must be on reliable storage, such as HDFS, on a cluster)
sc.setCheckpointDir("hdfs://namenode:port/checkpoints")

val complexRDD = data
  .map(complexTransformation1)
  .filter(complexFilter)
  .map(complexTransformation2)
  .filter(anotherComplexFilter)

// Persist first: the checkpoint is written by a separate job after the first action,
// and without persistence that job recomputes the whole lineage from scratch
complexRDD.cache()

// Mark the RDD for checkpointing; this must happen before any job runs on it
complexRDD.checkpoint()

// The first action triggers the checkpoint write
complexRDD.count()

// Verify checkpointing
println(s"Is checkpointed: ${complexRDD.isCheckpointed}")
println(s"Checkpoint file: ${complexRDD.getCheckpointFile}")

```

325

326

## Partition Management

327

328

Understanding and controlling partitions is crucial for performance optimization.

329

330

```scala

import org.apache.spark.{HashPartitioner, RangePartitioner}

val data = sc.textFile("large-file.txt", minPartitions = 100)

// Check partition information
println(s"Number of partitions: ${data.getNumPartitions}")
println(s"Partitioner: ${data.partitioner}") // None: textFile does not assign a partitioner

// View partition sizes. Count on the executors and collect one number per partition;
// glom().collect() would instead ship the entire file to the driver.
data
  .mapPartitionsWithIndex((index, partition) => Iterator((index, partition.size)))
  .collect()
  .foreach { case (index, size) => println(s"Partition $index has $size elements") }

// Custom partitioning for key-value RDDs
val keyValueData = data.map(line => (line.length, line))
val hashPartitioned = keyValueData.partitionBy(new HashPartitioner(50))
val rangePartitioned = keyValueData.partitionBy(new RangePartitioner(50, keyValueData))

```

347

348

## Advanced Usage Patterns

349

350

### Pipeline Optimization

351

352

```scala

import org.apache.spark.storage.StorageLevel

// Chain narrow transformations; Spark pipelines them within a single stage
val pipeline = sc.textFile("input")
  .filter(_.nonEmpty)
  .map(_.toLowerCase.trim)
  .filter(_.length > 5)
  .map(processLine)
  .filter(_.isValid)
  .persist(StorageLevel.MEMORY_AND_DISK_SER) // Persist at strategic points

// Reuse the persisted RDD across several actions. It already has a storage level,
// so calling cache() on it here would throw UnsupportedOperationException.
val validLines = pipeline
val count = validLines.count()
val sample = validLines.take(100)
val summary = validLines.map(_.summary).collect()

```

368

369

### Error Handling

370

371

```scala

import scala.util.{Try, Success, Failure}

// Keep the results of elements that process successfully and drop the failures,
// logging each one. flatMap emits zero or one result per element in a single pass,
// avoiding a separate filter step and Option.get.
val robustRDD = data.flatMap { element =>
  Try(riskyOperation(element)) match {
    case Success(result) => Iterator.single(result)
    case Failure(exception) =>
      // Assumes a logger providing logError is in scope (TODO confirm); this code runs
      // on the executors, so messages go to the executor logs
      logError(s"Failed to process $element: ${exception.getMessage}")
      Iterator.empty
  }
}

```

383

384

### Custom Partitioning for Performance

385

386

```scala

import org.apache.spark.Partitioner

/** Routes String keys by hash, Int keys by value, and every other key to partition 0. */
class CustomPartitioner(override val numPartitions: Int) extends Partitioner {
  require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive")

  override def getPartition(key: Any): Int = key match {
    case s: String => nonNegativeMod(s.hashCode, numPartitions)
    case i: Int    => nonNegativeMod(i, numPartitions)
    case _         => 0
  }

  // x % n is negative for negative x (and math.abs(Int.MinValue) is still negative),
  // so shift the remainder into [0, n) to always return a valid partition id
  private def nonNegativeMod(x: Int, n: Int): Int = {
    val remainder = x % n
    if (remainder < 0) remainder + n else remainder
  }

  // Spark compares partitioners to decide whether an RDD is already partitioned
  // the same way and a shuffle can be skipped
  override def equals(other: Any): Boolean = other match {
    case that: CustomPartitioner => that.numPartitions == numPartitions
    case _                       => false
  }

  override def hashCode: Int = numPartitions
}

val customPartitioned = keyValueData.partitionBy(new CustomPartitioner(100))

```