# RDD Operations

RDD (Resilient Distributed Dataset) operations form the core of Spark's distributed computing model. Operations are divided into transformations, which are evaluated lazily and return a new RDD, and actions, which trigger computation and return a result to the driver. All RDDs are immutable, and they are fault-tolerant because Spark can recompute lost partitions from their lineage.
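
The split between lazy transformations and eager actions can be observed directly. The following is a minimal sketch, assuming a local two-thread `SparkContext` created with `SparkContext.getOrCreate` (in `spark-shell` the existing `sc` is returned instead); the accumulator name `mapCalls` is only illustrative.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: a local context stands in for the `sc` used elsewhere on this page.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("lazy-demo"))

// Counts how many times the map function actually runs.
val calls = sc.longAccumulator("mapCalls")

// map is a transformation: this line only records the operation.
val doubled = sc.parallelize(1 to 5).map { x =>
  calls.add(1)
  x * 2
}

// No action has run yet, so the map function has not been invoked.
val callsBeforeAction: Long = calls.value

// collect() is an action: it schedules a job and evaluates the map.
val result = doubled.collect()
val callsAfterAction: Long = calls.value
```

Accumulators updated inside a transformation can be incremented more than once if a task is retried, so this counting trick is suitable for illustration rather than for exact bookkeeping.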

## Capabilities

### Core Transformations

Lazy operations that return new RDDs without triggering computation.

```scala { .api }
abstract class RDD[T: ClassTag] {
  // Element-wise transformations
  def map[U: ClassTag](f: T => U): RDD[U]
  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
  def filter(f: T => Boolean): RDD[T]
  def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U],
                                 preservesPartitioning: Boolean = false): RDD[U]
  def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U],
                                          preservesPartitioning: Boolean = false): RDD[U]
}
```

**Usage Examples:**

```scala
val numbers = sc.parallelize(1 to 10)

// Transform each element
val doubled = numbers.map(_ * 2)

// Filter elements
val evens = numbers.filter(_ % 2 == 0)

// Flat map: one-to-many transformation
val words = sc.parallelize(Seq("hello world", "scala spark"))
val allWords = words.flatMap(_.split(" "))

// Process entire partitions
val partitionSums = numbers.mapPartitions { iter =>
  Iterator(iter.sum)
}
```
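
`mapPartitionsWithIndex` is listed in the API above but not shown in the examples. The sketch below, assuming a local `SparkContext` obtained through `SparkContext.getOrCreate`, tags each element with its partition index and then uses the index to skip the first element of partition 0 only (a common way to drop a header row).

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: a local context stands in for the `sc` used elsewhere on this page.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("partition-index-demo"))

// A range split into 3 slices yields the partitions [1, 2], [3, 4], [5, 6].
val numbers = sc.parallelize(1 to 6, 3)

// Pair every element with the index of the partition that holds it.
val tagged = numbers.mapPartitionsWithIndex { (index, iter) =>
  iter.map(value => (index, value))
}.collect()

// Drop the first element of partition 0 and leave other partitions untouched.
val withoutFirst = numbers.mapPartitionsWithIndex { (index, iter) =>
  if (index == 0) iter.drop(1) else iter
}.collect()
```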

### Set Operations

Mathematical set operations on RDDs.

```scala { .api }
// Set operations
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
def distinct(): RDD[T]
def union(other: RDD[T]): RDD[T]
def ++(other: RDD[T]): RDD[T] // Alias for union
def intersection(other: RDD[T]): RDD[T]
def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
def subtract(other: RDD[T]): RDD[T]
def subtract(other: RDD[T], numPartitions: Int): RDD[T]
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]
```

**Usage Examples:**

```scala
val rdd1 = sc.parallelize(Seq(1, 2, 3, 4, 5))
val rdd2 = sc.parallelize(Seq(4, 5, 6, 7, 8))

// Union: combine all elements (duplicates are kept)
val combined = rdd1.union(rdd2) // [1,2,3,4,5,4,5,6,7,8]

// Intersection: common elements (shuffles, so output order is not guaranteed)
val common = rdd1.intersection(rdd2) // [4,5]

// Subtract: elements in rdd1 but not in rdd2 (output order is not guaranteed)
val difference = rdd1.subtract(rdd2) // [1,2,3]

// Remove duplicates (output order is not guaranteed)
val unique = combined.distinct() // [1,2,3,4,5,6,7,8]

// Cartesian product
val cartesian = rdd1.cartesian(rdd2) // [(1,4), (1,5), (1,6), ...]
```

### Partitioning Operations

Control data distribution across cluster nodes.

```scala { .api }
// Repartitioning
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
def coalesce(numPartitions: Int, shuffle: Boolean = false,
             partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
            (implicit ord: Ordering[T] = null): RDD[T]

// Partitioning info
def partitions: Array[Partition]
def getNumPartitions: Int
def partitioner: Option[Partitioner]
```

**Usage Examples:**

```scala
val data = sc.parallelize(1 to 100, 10) // 10 partitions

// Increase partitions (causes shuffle)
val morePartitions = data.repartition(20)

// Decrease partitions (avoid shuffle when possible)
val fewerPartitions = data.coalesce(5)

// Check partition count
println(s"Original: ${data.getNumPartitions} partitions")
println(s"Repartitioned: ${morePartitions.getNumPartitions} partitions")
```
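
The difference between `coalesce` with and without a shuffle is easy to miss. The sketch below, assuming a local `SparkContext` from `SparkContext.getOrCreate`, shows that `coalesce` without `shuffle = true` can reduce the partition count but cannot raise it, and uses `glom()` to inspect how many elements each partition holds.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: a local context stands in for the `sc` used elsewhere on this page.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("coalesce-demo"))

val data = sc.parallelize(1 to 100, 10) // 10 partitions

// Merging partitions needs no shuffle.
val narrowed = data.coalesce(5)

// Asking for more partitions without a shuffle leaves the count unchanged.
val notWidened = data.coalesce(20)

// With shuffle = true the data is redistributed, like repartition(20).
val widened = data.coalesce(20, shuffle = true)

// glom() turns each partition into an array, exposing per-partition sizes.
val sizes = narrowed.glom().map(_.length).collect()
```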

### Sampling Operations

Random sampling and data splitting operations.

```scala { .api }
// Sampling
def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]
def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T]
def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]]
```

**Usage Examples:**

```scala
val data = sc.parallelize(1 to 1000)

// Sample roughly 10% of the data without replacement
// (fraction is a per-element probability, so the sample size is not exact)
val sample10pct = data.sample(withReplacement = false, fraction = 0.1)

// Take exactly 50 random elements (an action: the result is returned to the driver)
val sample50 = data.takeSample(withReplacement = false, num = 50)

// Split into training (about 80%) and test (about 20%) sets
val Array(training, test) = data.randomSplit(Array(0.8, 0.2), seed = 12345)
```
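
Passing an explicit `seed` makes sampling repeatable, which matters for tests and reproducible experiments. The following sketch, assuming a local `SparkContext` from `SparkContext.getOrCreate` and an arbitrary seed of `42`, checks three properties: the same seed gives the same sample, `takeSample` returns exactly the requested number of elements, and the parts of a `randomSplit` together cover the whole dataset.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: a local context stands in for the `sc` used elsewhere on this page.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("sampling-demo"))

val data = sc.parallelize(1 to 1000)

// Two samples with the same seed over the same RDD select the same elements.
val first = data.sample(withReplacement = false, fraction = 0.1, seed = 42L).collect()
val second = data.sample(withReplacement = false, fraction = 0.1, seed = 42L).collect()

// takeSample returns an exact number of elements to the driver.
val exactly50 = data.takeSample(withReplacement = false, num = 50, seed = 42L)

// The splits are disjoint, so their sizes add up to the original count.
val Array(training, test) = data.randomSplit(Array(0.8, 0.2), seed = 42L)
val splitTotal = training.count() + test.count()
```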

### Core Actions

Operations that trigger computation and return results to the driver.

```scala { .api }
// Collection actions
def collect(): Array[T]
def collectAsMap(): Map[K, V] // Available on RDD[(K, V)] through PairRDDFunctions
def toLocalIterator: Iterator[T]

// Reduction actions
def reduce(f: (T, T) => T): T
def fold(zeroValue: T)(op: (T, T) => T): T
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
def treeReduce(f: (T, T) => T, depth: Int = 2): T
def treeAggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U,
                                               depth: Int = 2): U
```

**Usage Examples:**

```scala
val numbers = sc.parallelize(1 to 100)

// Collect all elements (use with caution for large datasets)
val allNumbers = numbers.collect()

// Reduce to single value
val sum = numbers.reduce(_ + _)
val max = numbers.reduce(math.max)

// Fold with initial value
val sumWithInitial = numbers.fold(0)(_ + _)

// Complex aggregation: count and sum in a single pass
case class Stats(count: Int, sum: Int)
val stats = numbers.aggregate(Stats(0, 0))(
  // seqOp folds one element into the per-partition accumulator
  seqOp = (acc, value) => Stats(acc.count + 1, acc.sum + value),
  // combOp merges the accumulators of two partitions
  combOp = (s1, s2) => Stats(s1.count + s2.count, s1.sum + s2.sum)
)
```
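
The `zeroValue` passed to `fold` and `aggregate` is applied once per partition and once more when the partition results are merged, so it should be the neutral element of the operation. The sketch below, assuming a local `SparkContext` from `SparkContext.getOrCreate`, contrasts a neutral and a non-neutral zero value and then computes a mean with `aggregate`.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: a local context stands in for the `sc` used elsewhere on this page.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("fold-demo"))

// Two partitions: [1, 2] and [3, 4].
val nums = sc.parallelize(1 to 4, 2)

// 0 is neutral for addition, so the result is the plain sum.
val neutral = nums.fold(0)(_ + _)

// 10 is added in each partition and again in the final merge:
// 10 + (10 + 1 + 2) + (10 + 3 + 4) = 40
val skewed = nums.fold(10)(_ + _)

// aggregate can change the result type: here a (sum, count) pair.
val (total, count) = nums.aggregate((0, 0))(
  (acc, x) => (acc._1 + x, acc._2 + 1),
  (a, b) => (a._1 + b._1, a._2 + b._2))
val mean = total.toDouble / count
```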

### Element Access Actions

Retrieve specific elements without collecting the entire dataset.

```scala { .api }
// Element access
def first(): T
def take(num: Int): Array[T]
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
def top(num: Int)(implicit ord: Ordering[T]): Array[T]
def count(): Long
def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long]
def isEmpty(): Boolean
```

**Usage Examples:**

```scala
val data = sc.parallelize(Seq(5, 2, 8, 1, 9, 3))

// Get first element
val firstElement = data.first() // 5

// Take first n elements
val firstThree = data.take(3) // [5, 2, 8]

// Take smallest/largest elements
val smallest = data.takeOrdered(3) // [1, 2, 3]
val largest = data.top(3) // [9, 8, 5]

// Count elements
val totalCount = data.count() // 6

// Count each value
val valueCounts = data.countByValue() // Map(5->1, 2->1, 8->1, ...)
```
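
Both `takeOrdered` and `top` accept an explicit `Ordering` in their second parameter list. The sketch below, assuming a local `SparkContext` from `SparkContext.getOrCreate` and made-up sample data, reverses the natural ordering and then orders strings by length with an alphabetical tie-break.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: a local context stands in for the `sc` used elsewhere on this page.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("ordering-demo"))

val data = sc.parallelize(Seq(5, 2, 8, 1, 9, 3))

// Reversing the ordering makes takeOrdered behave like top.
val largestThree = data.takeOrdered(3)(Ordering[Int].reverse)

// Order words by (length, word) so that ties on length are broken alphabetically.
val words = sc.parallelize(Seq("spark", "rdd", "partition", "job"))
val byLength = Ordering.by[String, (Int, String)](w => (w.length, w))
val shortestTwo = words.takeOrdered(2)(byLength)
val longestOne = words.top(1)(byLength)
```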

### Side-Effect Actions

Perform operations on each element without returning values.

```scala { .api }
// Side effects
def foreach(f: T => Unit): Unit
def foreachPartition(f: Iterator[T] => Unit): Unit
```

**Usage Examples:**

```scala
val data = sc.parallelize(1 to 10)

// Print each element (on a cluster the output goes to the executors' stdout, not the driver)
data.foreach(println)

// Process each partition
data.foreachPartition { partition =>
  // Setup expensive resources once per partition
  // (connectToDatabase() is a placeholder for your own connection factory)
  val database = connectToDatabase()
  try {
    partition.foreach { element =>
      database.write(element)
    }
  } finally {
    // Release the connection even if a write fails
    database.close()
  }
}
```
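
Because `foreach` and `foreachPartition` return `Unit` and run on the executors, a plain local variable cannot carry results back to the driver. One option is an accumulator, sketched below under the assumption of a local `SparkContext` from `SparkContext.getOrCreate`; the accumulator names are illustrative.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: a local context stands in for the `sc` used elsewhere on this page.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("foreach-demo"))

val data = sc.parallelize(1 to 10, 4)

// Executors add to the accumulator; the driver reads the merged value.
val total = sc.longAccumulator("total")
data.foreach(x => total.add(x))
val totalValue: Long = total.value

// foreachPartition runs its function once per partition.
val partitionsSeen = sc.longAccumulator("partitionsSeen")
data.foreachPartition(_ => partitionsSeen.add(1))
val partitionCount: Long = partitionsSeen.value
```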

### Zip Operations

Combine RDDs element by element, or pair each element with an index or a unique ID.

```scala { .api }
// Zip operations
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
def zipWithIndex(): RDD[(T, Long)]
def zipWithUniqueId(): RDD[(T, Long)]
def zipPartitions[B: ClassTag, V: ClassTag](rdd2: RDD[B])(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V]
def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag](rdd2: RDD[B], rdd3: RDD[C])(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V]
```

**Usage Examples:**

```scala
val rdd1 = sc.parallelize(Seq("a", "b", "c"))
val rdd2 = sc.parallelize(Seq(1, 2, 3))

// Zip two RDDs (they must have the same number of partitions
// and the same number of elements in each partition)
val zipped = rdd1.zip(rdd2) // [("a", 1), ("b", 2), ("c", 3)]

// Zip with element indices
val withIndex = rdd1.zipWithIndex() // [("a", 0), ("b", 1), ("c", 2)]

// Zip with unique IDs (unique, but not necessarily consecutive)
val withUniqueId = rdd1.zipWithUniqueId() // [("a", uniqueId1), ("b", uniqueId2), ...]

// Zip partitions with custom function
val numbers1 = sc.parallelize(1 to 6, 2) // 2 partitions: [1,2,3], [4,5,6]
val numbers2 = sc.parallelize(10 to 60 by 10, 2) // 2 partitions: [10,20,30], [40,50,60]
val sums = numbers1.zipPartitions(numbers2) { (iter1, iter2) =>
  iter1.zip(iter2).map { case (a, b) => a + b }
}
// Result: [11, 22, 33, 44, 55, 66]
```
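
`zipWithIndex` and `zipWithUniqueId` return the same type but assign different numbers. The sketch below, assuming a local `SparkContext` from `SparkContext.getOrCreate`, uses four elements in two partitions so the difference is visible: indices are consecutive across partitions, while unique IDs in partition `k` of `n` follow the pattern `k`, `n + k`, `2n + k`, and so on.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: a local context stands in for the `sc` used elsewhere on this page.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("zip-demo"))

// Two partitions: [a, b] and [c, d].
val letters = sc.parallelize(Seq("a", "b", "c", "d"), 2)

// Consecutive positions across the whole RDD.
val indexed = letters.zipWithIndex().collect()

// Partition 0 gets IDs 0 and 2, partition 1 gets IDs 1 and 3.
val uniqueIds = letters.zipWithUniqueId().collect()
```

`zipWithUniqueId` avoids the extra job that `zipWithIndex` needs to count the elements of earlier partitions, at the cost of gaps in the numbering when partitions have different sizes.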

### Utility Operations

Additional utility methods for RDD management and identification.

```scala { .api }
// RDD naming and identification
def setName(name: String): this.type
def name: String
def id: Int

// Key creation
def keyBy[K](f: T => K): RDD[(K, T)]

// External process integration
def pipe(command: String): RDD[String]
def pipe(command: Seq[String]): RDD[String]
def pipe(command: Seq[String], env: Map[String, String]): RDD[String]
```

**Usage Examples:**

```scala
val data = sc.parallelize(1 to 100)

// Set RDD name for debugging
data.setName("MyNumbers")
println(s"RDD name: ${data.name}")

// Create key-value pairs
val keyValue = data.keyBy(_ % 10) // Key by remainder when divided by 10

// Pipe through external command (each element is written to the command's stdin as one line)
val lines = sc.textFile("input.txt")
// Note: grep exits with a non-zero status when a partition contains no match,
// which is expected to surface as a task failure
val processed = lines.pipe("grep ERROR") // Use external grep command
```

## RDD Lineage and Dependencies

### Lineage Tracking

Each RDD records the dependencies on its parent RDDs. This lineage enables fault tolerance: Spark recomputes lost partitions automatically instead of replicating the data.

```scala { .api }
// Lineage information
def dependencies: Seq[Dependency[_]]
def toDebugString: String
def getCreationSite: String
```

**Usage Example:**

```scala
val data = sc.parallelize(1 to 100)
val processed = data.filter(_ > 50).map(_ * 2)

// View the computation lineage
println(processed.toDebugString)
// Output shows the DAG of operations
```
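
The `dependencies` method exposes the same lineage programmatically. The sketch below, assuming a local `SparkContext` from `SparkContext.getOrCreate`, checks that a `filter`/`map` chain is linked to its parent by a narrow `OneToOneDependency`, while `reduceByKey` on an unpartitioned RDD introduces a `ShuffleDependency`.

```scala
import org.apache.spark.{OneToOneDependency, ShuffleDependency, SparkConf, SparkContext}

// Assumption: a local context stands in for the `sc` used elsewhere on this page.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("lineage-demo"))

// filter and map keep each output partition tied to one parent partition.
val narrow = sc.parallelize(1 to 100).filter(_ > 50).map(_ * 2)
val isNarrow = narrow.dependencies.head.isInstanceOf[OneToOneDependency[_]]

// reduceByKey must bring equal keys together, which requires a shuffle.
val wide = sc.parallelize(1 to 100).map(x => (x % 10, x)).reduceByKey(_ + _)
val isShuffle = wide.dependencies.head.isInstanceOf[ShuffleDependency[_, _, _]]

// Walking one step up the lineage returns the parent RDD.
val parentOfWide = wide.dependencies.head.rdd
```

Shuffle dependencies mark stage boundaries, which is why `toDebugString` indents its output at those points.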

### Advanced Execution Features

Advanced execution modes and resource management for specialized workloads.

```scala { .api }
// Barrier execution mode
def barrier(): RDDBarrier[T]

// Resource profiles
def withResources(rp: ResourceProfile): this.type
```

**Barrier Execution:**

```scala
// Barrier execution launches all tasks of the stage together
// Useful for distributed deep learning and synchronous processing
val data = sc.parallelize(1 to 100, 4)
val barriered = data.barrier().mapPartitions { iterator =>
  // All partitions start processing at the same time
  // (processWithBarrier is a placeholder for your own function)
  processWithBarrier(iterator)
}
```

**Resource Profiles:**

```scala
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder}

// Define custom resource requirements
// (ExecutorResourceRequests is a regular class, so it needs `new`)
val rp = new ResourceProfileBuilder()
  .require(new ExecutorResourceRequests()
    .memory("4g")
    .cores(2)
    .resource("gpu", 1))
  .build()

// Apply resource profile to specific RDD operations
// (processWithGPU is a placeholder for your own function)
val gpuRDD = data.withResources(rp).map { item =>
  // This operation will run on executors with GPU resources
  processWithGPU(item)
}
```

## Types

```scala { .api }
// Core RDD abstraction
abstract class RDD[T: ClassTag](
  @transient private var _sc: SparkContext,
  @transient private var deps: Seq[Dependency[_]]
) extends Serializable

// Partition representation
trait Partition extends Serializable {
  def index: Int
}

// Dependency types
abstract class Dependency[T] extends Serializable
abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency[T]
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
  @transient _rdd: RDD[_ <: Product2[K, V]],
  partitioner: Partitioner
) extends Dependency[Product2[K, V]]

// Task context for advanced operations
abstract class TaskContext extends Serializable {
  def partitionId(): Int
  def stageId(): Int
  def taskAttemptId(): Long
}
```
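
`TaskContext` is normally obtained inside a running task with `TaskContext.get()`. The sketch below, assuming a local `SparkContext` from `SparkContext.getOrCreate`, reads the partition ID and attempt number within `mapPartitions` and reports them together with the number of elements in each partition.

```scala
import org.apache.spark.{SparkConf, SparkContext, TaskContext}

// Assumption: a local context stands in for the `sc` used elsewhere on this page.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("task-context-demo"))

// Two partitions: [1, 2] and [3, 4].
val summary = sc.parallelize(1 to 4, 2).mapPartitions { iter =>
  // TaskContext.get() is only meaningful on the executor side, inside a task.
  val ctx = TaskContext.get()
  Iterator((ctx.partitionId(), ctx.attemptNumber(), iter.size))
}.collect()
```

The attempt number is 0 for the first attempt of a task, so any other value here would indicate a retry.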