or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

context-config.md · index.md · java-api.md · rdd-operations.md · resource-management.md · serialization.md · shared-variables.md · storage-caching.md · task-context.md

docs/rdd-operations.md

0

# RDD Operations

1

2

## RDD Base Class

3

4

A Resilient Distributed Dataset (RDD) is the fundamental abstraction in Apache Spark: a fault-tolerant collection of elements that can be operated on in parallel.

5

6

```scala { .api }

7

abstract class RDD[T: ClassTag](

8

@transient private var _sc: SparkContext,

9

@transient private var deps: Seq[Dependency[_]]

10

) {

11

// Transformations (Lazy)

12

def map[U: ClassTag](f: T => U): RDD[U]

13

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]

14

def filter(f: T => Boolean): RDD[T]

15

def distinct(numPartitions: Int = partitions.length): RDD[T]

16

def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]

17

def union(other: RDD[T]): RDD[T]

18

def intersection(other: RDD[T]): RDD[T]

19

def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]

20

def groupBy[K: ClassTag](f: T => K): RDD[(K, Iterable[T])]

21

def pipe(command: String): RDD[String]

22

def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T]

23

def repartition(numPartitions: Int): RDD[T]

24

def sortBy[K](f: T => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

25

def keyBy[K](f: T => K): RDD[(K, T)]

26

27

// Actions (Eager)

28

def collect(): Array[T]

29

def count(): Long

30

def first(): T

31

def take(num: Int): Array[T]

32

def top(num: Int)(implicit ord: Ordering[T]): Array[T]

33

def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]

34

def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T]

35

def reduce(f: (T, T) => T): T

36

def fold(zeroValue: T)(op: (T, T) => T): T

37

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

38

def treeAggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U, depth: Int = 2): U

39

def foreach(f: T => Unit): Unit

40

def foreachPartition(f: Iterator[T] => Unit): Unit

41

42

// I/O Actions

43

def saveAsTextFile(path: String): Unit

44

def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit

45

def saveAsObjectFile(path: String): Unit

46

47

// Persistence

48

def persist(): RDD[T]

49

def persist(newLevel: StorageLevel): RDD[T]

50

def cache(): RDD[T]

51

def unpersist(blocking: Boolean = false): RDD[T]

52

def getStorageLevel: StorageLevel

53

def checkpoint(): Unit

54

def isCheckpointed: Boolean

55

def getCheckpointFile: Option[String]

56

57

// Metadata

58

def partitions: Array[Partition]

59

def partitioner: Option[Partitioner]

60

def getNumPartitions: Int

61

def dependencies: Seq[Dependency[_]]

62

def preferredLocations(split: Partition): Seq[String]

63

def context: SparkContext

64

def id: Int

65

def name: String

66

def setName(name: String): RDD[T]

67

}

68

```

69

70

## PairRDDFunctions

71

72

Operations available on RDDs of key-value pairs. They become available on any `RDD[(K, V)]` through an implicit conversion to `PairRDDFunctions`.

73

74

```scala { .api }

75

class PairRDDFunctions[K, V](self: RDD[(K, V)])(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) {

76

// Grouping Operations

77

def groupByKey(): RDD[(K, Iterable[V])]

78

def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]

79

def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]

80

81

// Reduction Operations

82

def reduceByKey(func: (V, V) => V): RDD[(K, V)]

83

def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]

84

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]

85

def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]

86

def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]

87

def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]

88

89

// Aggregation Operations

90

def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]

91

def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]

92

def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]

93

def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]

94

def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]

95

def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]

96

97

// Partitioning

98

def partitionBy(partitioner: Partitioner): RDD[(K, V)]

99

100

// Join Operations

101

def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]

102

def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]

103

def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]

104

def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]

105

def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))]

106

def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))]

107

def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]

108

def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))]

109

def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], W))]

110

def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]

111

def fullOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], Option[W]))]

112

def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], Option[W]))]

113

def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]

114

def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]

115

def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))]

116

117

// Set Operations

118

def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)]

119

def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)]

120

def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)]

121

122

// Lookups and Collection

123

def lookup(key: K): Seq[V]

124

def collectAsMap(): Map[K, V]

125

def countByKey(): Map[K, Long]

126

def countByKeyApprox(timeout: Long, confidence: Double = 0.95): PartialResult[Map[K, BoundedDouble]]

127

128

// Value Operations

129

def mapValues[U](f: V => U): RDD[(K, U)]

130

def flatMapValues[U](f: V => IterableOnce[U]): RDD[(K, U)]

131

def keys: RDD[K]

132

def values: RDD[V]

133

134

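// Sorting (note: in Spark, sortByKey is defined in OrderedRDDFunctions and requires an Ordering[K]; sortBy is defined on RDD itself)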

135

def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]

136

def sortBy[B](f: ((K, V)) => B, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[B], ctag: ClassTag[B]): RDD[(K, V)]

137

138

// I/O Operations

139

def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[F], codec: Class[_ <: CompressionCodec]): Unit

140

def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[F], conf: JobConf = new JobConf(self.context.hadoopConfiguration), codec: Option[Class[_ <: CompressionCodec]] = None): Unit

141

def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[F], conf: Configuration = self.context.hadoopConfiguration): Unit

142

}

143

```

144

145

## DoubleRDDFunctions

146

147

Statistical operations available on RDDs of Double values.

148

149

```scala { .api }

150

class DoubleRDDFunctions(self: RDD[Double]) {

151

// Statistical Operations

152

def sum(): Double

153

def stats(): StatCounter

154

def mean(): Double

155

def variance(): Double

156

def stdev(): Double

157

def sampleStdev(): Double

158

def sampleVariance(): Double

159

160

// Histogram Operations

161

def histogram(buckets: Array[Double]): Array[Long]

162

def histogram(buckets: Int): (Array[Double], Array[Long])

163

}

164

```

165

166

## SequenceFileRDDFunctions

167

168

Operations for saving RDDs as Hadoop SequenceFiles.

169

170

```scala { .api }

171

class SequenceFileRDDFunctions[K, V](self: RDD[(K, V)])(implicit kt: ClassTag[K], vt: ClassTag[V], keyWritableFactory: WritableFactory[K], valueWritableFactory: WritableFactory[V]) {

172

def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None): Unit

173

}

174

```

175

176

## Usage Examples

177

178

### Basic Transformations

179

```scala

180

val numbers = sc.parallelize(1 to 100)

181

182

// Map transformation

183

val squares = numbers.map(x => x * x)

184

185

// Filter transformation

186

val evenNumbers = numbers.filter(_ % 2 == 0)

187

188

// FlatMap transformation

189

val words = sc.parallelize(Array("hello world", "foo bar"))

190

val allWords = words.flatMap(_.split(" "))

191

```

192

193

### Key-Value Operations

194

```scala

195

val pairs = sc.parallelize(Array(("a", 1), ("b", 2), ("a", 3), ("b", 4)))

196

197

// Reduce by key

198

val sums = pairs.reduceByKey(_ + _) // ("a", 4), ("b", 6)

199

200

// Group by key

201

val grouped = pairs.groupByKey() // ("a", [1, 3]), ("b", [2, 4])

202

203

// Join operations

204

val other = sc.parallelize(Array(("a", "apple"), ("b", "banana")))

205

val joined = pairs.join(other) // ("a", (1, "apple")), ("a", (3, "apple")), etc.

206

```

207

208

### Actions

209

```scala

210

val data = sc.parallelize(1 to 100)

211

212

// Collect all data to driver

213

val collected = data.collect() // Array[Int]

214

215

// Count elements

216

val count = data.count() // 100

217

218

// Reduce

219

val sum = data.reduce(_ + _) // 5050

220

221

// Take first n elements

222

val first10 = data.take(10) // Array(1, 2, 3, ..., 10)

223

```

224

225

### Persistence

226

```scala

227

val expensiveRDD = data.map(complexComputation)

228

229

// Cache in memory for reuse

230

expensiveRDD.cache()

231

232

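// Or, instead of cache(), choose an explicit storage level (use one or the other: the level cannot be changed once it has been set)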

233

expensiveRDD.persist(StorageLevel.MEMORY_AND_DISK_SER)

234

235

// Use multiple times (computed only once due to caching)

236

val result1 = expensiveRDD.count()

237

val result2 = expensiveRDD.collect()

238

239

// Remove from cache when done

240

expensiveRDD.unpersist()

241

```

242

243

## Core Types

244

245

```scala { .api }

246

// Scala standard-library type (scala.reflect.ClassTag) that carries runtime type information

247

trait ClassTag[T] {

248

def runtimeClass: Class[_]

249

}

250

251

// RDD partition representation

252

trait Partition extends Serializable {

253

def index: Int

254

}

255

256

// Data partitioning strategy

257

abstract class Partitioner extends Serializable {

258

def numPartitions: Int

259

def getPartition(key: Any): Int

260

}

261

262

// RDD dependency representation

263

abstract class Dependency[T] extends Serializable

264

265

// Serialization framework

266

abstract class Serializer {

267

def newInstance(): SerializerInstance

268

}

269

270

abstract class SerializerInstance {

271

def serialize[T: ClassTag](t: T): ByteBuffer

272

def deserialize[T: ClassTag](bytes: ByteBuffer): T

273

}

274

```