or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

accumulators.md application-context.md broadcast-variables.md index.md java-api.md partitioning.md rdd-operations.md serialization.md storage-persistence.md

docs/rdd-operations.md

0

# RDD Operations

1

2

The Resilient Distributed Dataset (RDD) API provides transformations and actions for fault-tolerant, distributed processing of large datasets.

3

4

## Capabilities

5

6

### Base RDD Class

7

8

Abstract base class for all RDDs. It provides the core distributed-dataset functionality, with automatic fault recovery through lineage tracking.

9

10

```scala { .api }

11

/**

12

* Resilient Distributed Dataset - immutable distributed collection

13

*/

14

abstract class RDD[T: ClassTag] extends Serializable {

15

// Transformations (lazy evaluation)

16

17

/** Transform each element using provided function */

18

def map[U: ClassTag](f: T => U): RDD[U]

19

20

/** Transform each element to sequence and flatten results */

21

def flatMap[U: ClassTag](f: T => IterableOnce[U]): RDD[U]

22

23

/** Filter elements matching predicate */

24

def filter(f: T => Boolean): RDD[T]

25

26

/** Map each partition with partition index */

27

def mapPartitionsWithIndex[U: ClassTag](

28

f: (Int, Iterator[T]) => Iterator[U],

29

preservesPartitioning: Boolean = false

30

): RDD[U]

31

32

/** Sample fraction of elements */

33

def sample(

34

withReplacement: Boolean,

35

fraction: Double,

36

seed: Long = Utils.random.nextLong

37

): RDD[T]

38

39

/** Return union of this RDD and another */

40

def union(other: RDD[T]): RDD[T]

41

42

/** Return intersection with another RDD */

43

def intersection(other: RDD[T]): RDD[T]

44

45

/** Return distinct elements */

46

def distinct(numPartitions: Int = partitions.length): RDD[T]

47

48

/** Group by key function */

49

def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]

50

51

/** Reduce partitions to specified number */

52

def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T]

53

54

/** Repartition to specified number */

55

def repartition(numPartitions: Int): RDD[T]

56

57

/** Sort RDD elements */

58

def sortBy[K](

59

f: T => K,

60

ascending: Boolean = true,

61

numPartitions: Int = this.partitions.length

62

)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

63

64

/** Zip with another RDD */

65

def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]

66

67

/** Zip with element indices */

68

def zipWithIndex(): RDD[(T, Long)]

69

70

/** Zip with unique IDs */

71

def zipWithUniqueId(): RDD[(T, Long)]

72

73

// Actions (trigger computation)

74

75

/** Collect all elements to driver */

76

def collect(): Array[T]

77

78

/** Count number of elements */

79

def count(): Long

80

81

/** Return first element */

82

def first(): T

83

84

/** Take first n elements */

85

def take(num: Int): Array[T]

86

87

/** Return the first `num` elements in ascending order, as defined by the implicit Ordering */

88

def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]

89

90

/** Return a fixed-size random sample of `num` elements as an array */

91

def takeSample(

92

withReplacement: Boolean,

93

num: Int,

94

seed: Long = Utils.random.nextLong

95

): Array[T]

96

97

/** Reduce elements using function */

98

def reduce(f: (T, T) => T): T

99

100

/** Fold elements with initial value */

101

def fold(zeroValue: T)(op: (T, T) => T): T

102

103

/** Aggregate with different types */

104

def aggregate[U: ClassTag](zeroValue: U)(

105

seqOp: (U, T) => U,

106

combOp: (U, U) => U

107

): U

108

109

/** Apply function to each element */

110

def foreach(f: T => Unit): Unit

111

112

/** Apply function to each partition */

113

def foreachPartition(f: Iterator[T] => Unit): Unit

114

115

/** Save as text file */

116

def saveAsTextFile(path: String): Unit

117

def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit

118

119

// Persistence

120

121

/** Persist RDD with storage level */

122

def persist(newLevel: StorageLevel): this.type

123

124

/** Persist RDD with default storage level (MEMORY_ONLY) */

125

def persist(): this.type

126

127

/** Cache RDD in memory */

128

def cache(): this.type

129

130

/** Remove persisted data */

131

def unpersist(blocking: Boolean = false): this.type

132

133

/** Mark RDD for checkpointing */

134

def checkpoint(): Unit

135

136

/** Check if RDD is checkpointed */

137

def isCheckpointed: Boolean

138

139

// Metadata

140

141

/** Get partitions */

142

def partitions: Array[Partition]

143

144

/** Get partitioner */

145

def partitioner: Option[Partitioner]

146

147

/** Check if RDD is empty */

148

def isEmpty(): Boolean

149

150

/** Get storage level */

151

def getStorageLevel: StorageLevel

152

}

153

```

154

155

### PairRDDFunctions

156

157

Additional operations available on RDDs of key-value pairs through implicit conversion.

158

159

```scala { .api }

160

/**

161

* Extra functions for RDDs of (key, value) pairs

162

*/

163

class PairRDDFunctions[K, V](self: RDD[(K, V)]) {

164

/** Group values by key */

165

def groupByKey(): RDD[(K, Iterable[V])]

166

def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]

167

def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]

168

169

/** Reduce values by key */

170

def reduceByKey(func: (V, V) => V): RDD[(K, V)]

171

def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]

172

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]

173

174

/** Aggregate values by key */

175

def aggregateByKey[U: ClassTag](zeroValue: U)(

176

seqOp: (U, V) => U,

177

combOp: (U, U) => U

178

): RDD[(K, U)]

179

def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(

180

seqOp: (U, V) => U,

181

combOp: (U, U) => U

182

): RDD[(K, U)]

183

184

/** Fold values by key */

185

def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]

186

def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]

187

188

/** Combine values by key */

189

def combineByKey[C](

190

createCombiner: V => C,

191

mergeValue: (C, V) => C,

192

mergeCombiners: (C, C) => C

193

): RDD[(K, C)]

194

195

/** Join with another pair RDD */

196

def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]

197

def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]

198

def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]

199

200

/** Left outer join */

201

def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]

202

203

/** Right outer join */

204

def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]

205

206

/** Full outer join */

207

def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]

208

209

/** Cogroup with other RDDs */

210

def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]

211

def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]):

212

RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]

213

214

/** Return all values associated with the given key */

215

def lookup(key: K): Seq[V]

216

217

/** Collect as map */

218

def collectAsMap(): Map[K, V]

219

220

/** Count by key */

221

def countByKey(): Map[K, Long]

222

223

/** Sort by key */

224

def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)

225

(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[(K, V)]

226

227

/** Get keys only */

228

def keys: RDD[K]

229

230

/** Get values only */

231

def values: RDD[V]

232

233

/** Subtract by key */

234

def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)]

235

236

/** Save as Hadoop file */

237

def saveAsHadoopFile[F <: OutputFormat[K, V]](

238

path: String,

239

keyClass: Class[_],

240

valueClass: Class[_],

241

outputFormatClass: Class[F]

242

): Unit

243

244

/** Save as new Hadoop API file */

245

def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](

246

path: String,

247

keyClass: Class[_],

248

valueClass: Class[_],

249

outputFormatClass: Class[F]

250

): Unit

251

}

252

```

253

254

### DoubleRDDFunctions

255

256

Statistical operations available on RDDs of numeric values through implicit conversion.

257

258

```scala { .api }

259

/**

260

* Extra functions for RDDs of doubles

261

*/

262

class DoubleRDDFunctions(self: RDD[Double]) {

263

/** Compute mean */

264

def mean(): Double

265

266

/** Compute variance */

267

def variance(): Double

268

269

/** Compute standard deviation */

270

def stdev(): Double

271

272

/** Compute sum */

273

def sum(): Double

274

275

/** Compute statistics */

276

def stats(): StatCounter

277

278

/** Compute histogram */

279

def histogram(buckets: Int): (Array[Double], Array[Long])

280

def histogram(buckets: Array[Double]): Array[Long]

281

}

282

283

/**

284

* Statistics counter for numeric RDDs

285

*/

286

class StatCounter extends Serializable {

287

def count: Long

288

def mean: Double

289

def sum: Double

290

def min: Double

291

def max: Double

292

def variance: Double

293

def stdev: Double

294

}

295

```

296

297

### Specialized RDD Types

298

299

```scala { .api }

300

/** RDD from parallel collection */

301

class ParallelCollectionRDD[T: ClassTag](

302

@transient sc: SparkContext,

303

@transient data: Seq[T],

304

numSlices: Int

305

) extends RDD[T]

306

307

/** RDD from Hadoop InputFormat */

308

class HadoopRDD[K, V](

309

sc: SparkContext,

310

conf: JobConf,

311

inputFormatClass: Class[_ <: InputFormat[K, V]],

312

keyClass: Class[K],

313

valueClass: Class[V],

314

minPartitions: Int

315

) extends RDD[(K, V)]

316

317

/** RDD from new Hadoop API */

318

class NewHadoopRDD[K, V](

319

sc: SparkContext,

320

inputFormatClass: Class[_ <: NewInputFormat[K, V]],

321

keyClass: Class[K],

322

valueClass: Class[V],

323

conf: Configuration

324

) extends RDD[(K, V)]

325

326

/** RDD from JDBC */

327

class JdbcRDD[T: ClassTag](

328

sc: SparkContext,

329

getConnection: () => Connection,

330

sql: String,

331

lowerBound: Long,

332

upperBound: Long,

333

numPartitions: Int,

334

mapRow: ResultSet => T

335

) extends RDD[T]

336

337

/** Empty RDD */

338

class EmptyRDD[T: ClassTag](sc: SparkContext) extends RDD[T]

339

340

/** Union of multiple RDDs */

341

class UnionRDD[T: ClassTag](sc: SparkContext, rdds: Seq[RDD[T]]) extends RDD[T]

342

343

/** Coalesced RDD with fewer partitions */

344

class CoalescedRDD[T: ClassTag](

345

prev: RDD[T],

346

maxPartitions: Int,

347

shuffle: Boolean = false

348

) extends RDD[T]

349

350

/** Shuffled RDD */

351

class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](

352

prev: RDD[_ <: Product2[K, V]],

353

part: Partitioner

354

) extends RDD[(K, C)]

355

```

356

357

**Usage Examples:**

358

359

```scala

360

import org.apache.spark.{SparkContext, SparkConf}

361

362

val sc = new SparkContext(new SparkConf().setAppName("RDD Example"))

363

364

// Basic transformations

365

val numbers = sc.parallelize(1 to 100)

366

val squares = numbers.map(x => x * x)

367

val evens = numbers.filter(_ % 2 == 0)

368

369

// Pair RDD operations

370

val pairs = sc.parallelize(List(("a", 1), ("b", 2), ("a", 3)))

371

val grouped = pairs.groupByKey()

372

val sums = pairs.reduceByKey(_ + _)

373

374

// Statistical operations

375

val doubles = sc.parallelize(Array(1.0, 2.0, 3.0, 4.0, 5.0))

376

val avg = doubles.mean()

377

val stats = doubles.stats()

378

379

// Actions

380

val result = squares.take(10)

381

val total = numbers.reduce(_ + _)

382

squares.saveAsTextFile("output/squares")

383

384

sc.stop()

385

```

386

387

## Partitioning

388

389

RDDs maintain partitioning information to optimize distributed operations and minimize data shuffling.

390

391

```scala { .api }

392

trait Partition extends Serializable {

393

def index: Int

394

}

395

396

case class TaskContext(

397

stageId: Int,

398

stageAttemptNumber: Int,

399

partitionId: Int,

400

taskAttemptId: Long,

401

attemptNumber: Int,

402

taskMemoryManager: TaskMemoryManager,

403

localProperties: Properties,

404

metricsSystem: MetricsSystem

405

)

406

```