or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

broadcast-accumulators.mdcontext-management.mdindex.mdjava-api.mdpair-rdd-operations.mdrdd-operations.mdstorage-persistence.md

pair-rdd-operations.mddocs/

0

# Pair RDD Operations

1

2

Advanced operations for key-value pair RDDs including grouping, joining, and aggregation operations essential for data processing workflows.

3

4

## Capabilities

5

6

### PairRDDFunctions

7

8

Extra operations available on RDDs of key-value pairs through implicit conversion from RDD[(K, V)].

9

10

```scala { .api }

11

/**

12

* Extra functions available on RDDs of (key, value) pairs through an implicit conversion.

13

* These functions are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)]).

14

*/

15

class PairRDDFunctions[K, V](self: RDD[(K, V)])

16

(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null)

17

extends Logging with Serializable {

18

19

// GROUPING OPERATIONS

20

21

/** Group values by key */

22

def groupByKey(): RDD[(K, Iterable[V])]

23

24

/** Group values by key with custom number of partitions */

25

def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]

26

27

/** Group values by key with custom partitioner */

28

def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]

29

30

// REDUCTION OPERATIONS

31

32

/** Combine values with the same key using a function */

33

def reduceByKey(func: (V, V) => V): RDD[(K, V)]

34

35

/** Combine values with the same key using function and custom partitions */

36

def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]

37

38

/** Combine values with the same key using function and custom partitioner */

39

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]

40

41

/** Fold values by key with zero value */

42

def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]

43

44

/** Fold values by key with zero value and custom partitioner */

45

def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]

46

47

/** Aggregate values by key using different types for intermediate and final results */

48

def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]

49

50

/** Aggregate values by key with custom partitioner */

51

def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]

52

53

/** Generic function to combine values by key using combiners */

54

def combineByKey[C](createCombiner: V => C,

55

mergeValue: (C, V) => C,

56

mergeCombiners: (C, C) => C): RDD[(K, C)]

57

58

/** Generic function to combine values by key with custom partitioner */

59

def combineByKey[C](createCombiner: V => C,

60

mergeValue: (C, V) => C,

61

mergeCombiners: (C, C) => C,

62

partitioner: Partitioner): RDD[(K, C)]

63

64

// JOIN OPERATIONS

65

66

/** Inner join with another RDD */

67

def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]

68

69

/** Inner join with custom partitioner */

70

def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]

71

72

/** Left outer join with another RDD */

73

def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]

74

75

/** Left outer join with custom partitioner */

76

def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))]

77

78

/** Right outer join with another RDD */

79

def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]

80

81

/** Right outer join with custom partitioner */

82

def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], W))]

83

84

/** Full outer join with another RDD */

85

def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]

86

87

/** Full outer join with custom partitioner */

88

def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], Option[W]))]

89

90

/** Cogroup with another RDD (group by key across both RDDs) */

91

def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]

92

93

/** Cogroup with custom partitioner */

94

def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))]

95

96

/** Cogroup with two other RDDs */

97

def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]

98

99

// SET OPERATIONS

100

101

/** Subtract by key (remove elements with keys present in other RDD) */

102

def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)]

103

104

/** Subtract by key with custom partitioner */

105

def subtractByKey[W: ClassTag](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, V)]

106

107

// SORTING AND PARTITIONING

108

109

/** Sort RDD by key (defined on OrderedRDDFunctions, not PairRDDFunctions; requires an implicit Ordering[K] in scope) */

110

def sortByKey(ascending: Boolean = true): RDD[(K, V)]

111

112

/** Sort RDD by key with custom number of partitions (also provided by OrderedRDDFunctions) */

113

def sortByKey(ascending: Boolean, numPartitions: Int): RDD[(K, V)]

114

115

/** Partition RDD by key using specified partitioner */

116

def partitionBy(partitioner: Partitioner): RDD[(K, V)]

117

118

// EXTRACTION OPERATIONS

119

120

/** Get RDD of keys only */

121

def keys: RDD[K]

122

123

/** Get RDD of values only */

124

def values: RDD[V]

125

126

/** Transform only the values (keep keys unchanged) */

127

def mapValues[U](f: V => U): RDD[(K, U)]

128

129

/** FlatMap only the values (keep keys unchanged) */

130

def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)]

131

132

// ACTIONS

133

134

/** Count number of elements for each key */

135

def countByKey(): Map[K, Long]

136

137

/** Collect as a map (key -> single value, not suitable for duplicate keys) */

138

def collectAsMap(): Map[K, V]

139

140

/** Look up values for a specific key */

141

def lookup(key: K): Seq[V]

142

143

// HADOOP OUTPUT OPERATIONS

144

145

/** Save as Hadoop file using old MapReduce API */

146

def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String,

147

keyClass: Class[_],

148

valueClass: Class[_],

149

outputFormatClass: Class[F],

150

conf: JobConf = new JobConf,

151

codec: Option[Class[_ <: CompressionCodec]] = None): Unit

152

153

/** Save as Hadoop file using new MapReduce API */

154

def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](path: String,

155

keyClass: Class[_],

156

valueClass: Class[_],

157

outputFormatClass: Class[F],

158

conf: Configuration = new Configuration): Unit

159

160

/** Save as Hadoop dataset using old API */

161

def saveAsHadoopDataset(conf: JobConf): Unit

162

163

/** Save as Hadoop dataset using new API */

164

def saveAsNewAPIHadoopDataset(conf: Configuration): Unit

165

}

166

```

167

168

**Usage Examples:**

169

170

```scala

171

import org.apache.spark.{SparkContext, SparkConf}

172

import org.apache.spark.HashPartitioner

173

174

val sc = new SparkContext(new SparkConf().setAppName("Pair RDD Examples").setMaster("local[*]"))

175

176

// Create pair RDDs

177

val scores = sc.parallelize(Array(

178

("Alice", 85), ("Bob", 90), ("Alice", 92), ("Charlie", 78), ("Bob", 95)

179

))

180

181

val grades = sc.parallelize(Array(

182

("Alice", "A"), ("Bob", "A+"), ("Charlie", "B"), ("David", "C")

183

))

184

185

// Grouping operations

186

val groupedScores = scores.groupByKey() // RDD[(String, Iterable[Int])]

187

// Result: ("Alice", [85, 92]), ("Bob", [90, 95]), ("Charlie", [78])

188

189

val avgScores = scores.mapValues(score => (score, 1)).reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2)).mapValues { case (sum, count) => sum.toDouble / count } // Average scores: carry (sum, count) so each key is divided by its own count

190

// Result: ("Alice", 88.5), ("Bob", 92.5), ("Charlie", 78.0)

191

192

val maxScores = scores.reduceByKey(math.max) // Maximum score per student

193

// Result: ("Alice", 92), ("Bob", 95), ("Charlie", 78)

194

195

// Aggregation with different types

196

val stats = scores.aggregateByKey((0, 0))(

197

seqOp = { case ((sum, count), score) => (sum + score, count + 1) },

198

combOp = { case ((sum1, count1), (sum2, count2)) => (sum1 + sum2, count1 + count2) }

199

).mapValues { case (sum, count) => sum.toDouble / count }

200

201

// Join operations

202

val joined = scores.join(grades) // Inner join

203

// Result: ("Alice", (85, "A")), ("Alice", (92, "A")), ("Bob", (90, "A+")), ("Bob", (95, "A+")), ("Charlie", (78, "B"))

204

205

val leftJoin = scores.leftOuterJoin(grades) // Left outer join

206

// Includes all scores, grades as Option[String]

207

208

val fullJoin = scores.fullOuterJoin(grades) // Full outer join

209

// Includes all keys from both RDDs

210

211

// Cogroup (group by key across multiple RDDs)

212

val cogrouped = scores.cogroup(grades)

213

// Result: ("Alice", ([85, 92], ["A"])), ("Bob", ([90, 95], ["A+"])), etc.

214

215

// Set operations

216

val students = sc.parallelize(Array(("Alice", 1), ("Bob", 2), ("Charlie", 3)))

217

val withdrawn = sc.parallelize(Array(("Bob", "withdrawn"), ("David", "withdrawn")))

218

val activeStudents = students.subtractByKey(withdrawn)

219

// Result: ("Alice", 1), ("Charlie", 3)

220

221

// Sorting

222

val sortedScores = scores.sortByKey() // Sort by student name

223

val sortedByScore = scores.map(_.swap).sortByKey(false).map(_.swap) // Sort by score descending

224

225

// Key/value extraction

226

val studentNames = scores.keys.distinct() // RDD[String] of unique student names

227

val allScores = scores.values // RDD[Int] of all scores

228

val upperCaseNames = scores.mapValues(score => s"Score: $score") // Transform values only

229

230

// Actions

231

val countsByStudent = scores.countByKey() // Map[String, Long]

232

val scoreMap = scores.collectAsMap() // Map[String, Int] - warning: loses duplicates!

233

val aliceScores = scores.lookup("Alice") // Seq[Int] = List(85, 92)

234

235

sc.stop()

236

```

237

238

### Advanced Aggregation with combineByKey

239

240

The `combineByKey` function is the most general aggregation function for pair RDDs.

241

242

```scala { .api }

243

/**

244

* Generic function to combine values by key using three functions:

245

* @param createCombiner function to create initial combiner from first value

246

* @param mergeValue function to merge a value into an existing combiner

247

* @param mergeCombiners function to merge two combiners

248

*/

249

def combineByKey[C](createCombiner: V => C,

250

mergeValue: (C, V) => C,

251

mergeCombiners: (C, C) => C): RDD[(K, C)]

252

```

253

254

**Usage Example:**

255

256

```scala

257

// Compute mean score per student using combineByKey

258

val meanScores = scores.combineByKey(

259

createCombiner = (score: Int) => (score, 1), // (sum, count)

260

mergeValue = (acc: (Int, Int), score: Int) => (acc._1 + score, acc._2 + 1),

261

mergeCombiners = (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)

262

).mapValues { case (sum, count) => sum.toDouble / count }

263

264

// Collect all scores per student into a list (same grouping as groupByKey, but NOT cheaper: lists do not shrink during map-side combine, so the shuffle volume is unchanged)

265

val scoresPerStudent = scores.combineByKey(

266

createCombiner = (score: Int) => List(score),

267

mergeValue = (acc: List[Int], score: Int) => score :: acc,

268

mergeCombiners = (acc1: List[Int], acc2: List[Int]) => acc1 ::: acc2

269

)

270

```

271

272

### Partitioning for Performance

273

274

Understanding and controlling partitioning is crucial for performance in pair RDD operations.

275

276

```scala { .api }

277

/**

278

* Hash partitioner that distributes keys using hash function

279

*/

280

class HashPartitioner(partitions: Int) extends Partitioner {

281

def numPartitions: Int = partitions

282

def getPartition(key: Any): Int

283

}

284

285

/**

286

* Range partitioner that distributes keys by ranges

287

*/

288

class RangePartitioner[K : Ordering : ClassTag, V](

289

partitions: Int,

290

rdd: RDD[_ <: Product2[K, V]],

291

ascending: Boolean = true) extends Partitioner {

292

def numPartitions: Int = partitions

293

def getPartition(key: Any): Int

294

}

295

```

296

297

**Usage Examples:**

298

299

```scala

300

// Partition data for better join performance

301

val partitioner = new HashPartitioner(4)

302

val partitionedScores = scores.partitionBy(partitioner).persist()

303

val partitionedGrades = grades.partitionBy(partitioner).persist()

304

305

// Now joins will be much faster as data is co-located

306

val efficientJoin = partitionedScores.join(partitionedGrades)

307

308

// Range partitioner for sorted data

309

val sortedData = scores.sortByKey() // Uses RangePartitioner internally

310

```

311

312

### Join Performance Optimizations

313

314

Different join strategies for different data size scenarios:

315

316

```scala

317

// Join against a pre-filtered, smaller RDD (still a shuffle join: the RDD API never broadcasts automatically; only Spark SQL does)

318

val smallGrades = grades.filter(_._2 == "A+") // Small RDD

319

val broadcastJoin = scores.join(smallGrades) // Both sides are shuffled; for a true map-side join use sc.broadcast as shown below

320

321

// Manual broadcast for very small lookup tables

322

val gradeMap = sc.broadcast(grades.collectAsMap())

323

val manualBroadcastJoin = scores.map { case (student, score) =>

324

val grade = gradeMap.value.get(student)

325

(student, (score, grade))

326

}

327

328

// Bucket join for large datasets with known partitioning

329

val bucketedScores = scores.partitionBy(new HashPartitioner(10)).persist()

330

val bucketedGrades = grades.partitionBy(new HashPartitioner(10)).persist()

331

val bucketJoin = bucketedScores.join(bucketedGrades) // No shuffle needed

332

```

333

334

## Performance Considerations

335

336

### Choosing the Right Operation

337

- **`reduceByKey`** vs **`groupByKey`**: Always prefer `reduceByKey` when possible as it reduces data before shuffling

338

- **`aggregateByKey`** vs **`combineByKey`**: Use `aggregateByKey` for simpler cases, `combineByKey` for complex transformations

339

- **`cogroup`** vs multiple **joins**: Use `cogroup` when you need to join multiple RDDs by the same key

340

341

### Partitioning Strategy

342

- Use consistent partitioners across RDDs that will be joined

343

- Persist partitioned RDDs when they'll be reused

344

- Consider the number of unique keys when choosing partition count

345

- Use `RangePartitioner` for sorted operations, `HashPartitioner` for general use

346

347

### Memory Management

348

- Be careful with `groupByKey` when some keys have very many values: all values for a single key must fit in memory on one executor (can cause OOM)

349

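- Use `mapValues` and `flatMapValues` instead of `map`/`flatMap` when keys are unchanged: they preserve the RDD's partitioner, so later key-based operations can avoid a re-shuffle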

350

- Consider using `aggregateByKey` with smaller intermediate types

351

352

### Common Anti-patterns

353

- Using `groupByKey().mapValues(_.sum)` instead of `reduceByKey(_ + _)`

354

- Not partitioning RDDs that will be joined multiple times

355

- Using `collectAsMap()` on RDDs with duplicate keys (data loss)

356

- Performing multiple joins without consistent partitioning