# Key-Value Operations

When working with RDDs of (key, value) pairs, Spark provides specialized operations through implicit conversions. These operations are available on any RDD of type `RDD[(K, V)]`. In Spark versions before 1.3 you also need to import `org.apache.spark.SparkContext._` to bring the conversions into scope. From Spark 1.3 onward they are picked up automatically and the import is no longer required.

## PairRDDFunctions

```scala { .api }
class PairRDDFunctions[K, V](self: RDD[(K, V)]) extends Logging with Serializable {
  // Obtained from RDD[(K, V)] via implicit conversion
  // (requires importing org.apache.spark.SparkContext._ before Spark 1.3)
}
```
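
The implicit-conversion mechanism itself is ordinary Scala. The following is a rough, Spark-free sketch: a local `Seq[(K, V)]` stands in for `RDD[(K, V)]`, and `PairOps`, `sumByKey`, and `seqToPairOps` are hypothetical names playing the roles of `PairRDDFunctions`, one of its methods, and Spark's own conversion (which, as far as we know, is called `rddToPairRDDFunctions`).

```scala
import scala.language.implicitConversions

object ImplicitPairOpsSketch {
  // Hypothetical wrapper playing the role of PairRDDFunctions for a local Seq of pairs.
  class PairOps[K, V](self: Seq[(K, V)]) {
    // Toy key-based method that only makes sense for (key, value) data.
    def sumByKey(implicit num: Numeric[V]): Map[K, V] =
      self.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).sum }
  }

  // The implicit conversion: any Seq[(K, V)] silently gains PairOps' methods.
  implicit def seqToPairOps[K, V](s: Seq[(K, V)]): PairOps[K, V] = new PairOps(s)

  val pairs: Seq[(String, Int)] = Seq(("apple", 5), ("banana", 3), ("apple", 2), ("orange", 1))

  // Seq has no sumByKey, so the compiler rewrites this to seqToPairOps(pairs).sumByKey
  val sums: Map[String, Int] = pairs.sumByKey

  def main(args: Array[String]): Unit = println(sums)
}
```

Because the conversion only accepts a sequence of pairs, calling `sumByKey` on a `Seq[Int]` would not compile, which is the same reason pair operations only appear on `RDD[(K, V)]`.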

## Setup and Imports

```scala { .api }
import org.apache.spark.SparkContext._ // Needed for PairRDDFunctions before Spark 1.3
import org.apache.spark.rdd.RDD
```

```scala
// Creating pair RDDs (assumes an existing SparkContext named `sc`, as in spark-shell)
val pairs: RDD[(String, Int)] = sc.parallelize(Seq(
  ("apple", 5), ("banana", 3), ("apple", 2), ("orange", 1)
))

// (word, 1) pairs, not yet summed; ready for reduceByKey and friends
val wordPairs: RDD[(String, Int)] = sc.textFile("file.txt")
  .flatMap(_.split(" "))
  .map(word => (word, 1))
```

## Aggregation Operations

### combineByKey

The most general aggregation function; the other aggregations in this section (`reduceByKey`, `aggregateByKey`, `foldByKey`, `groupByKey`) are built on top of it. `createCombiner` turns the first value seen for a key within a partition into an accumulator of type `C`, `mergeValue` folds further values from the same partition into that accumulator, and `mergeCombiners` merges the accumulators for one key that were produced by different partitions:

```scala { .api }
def combineByKey[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null
): RDD[(K, C)]

// Convenience overloads: default partitioner, or a HashPartitioner with numPartitions
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
```

```scala
// Calculate average per key using combineByKey
val data = sc.parallelize(Seq(("math", 85), ("english", 90), ("math", 92), ("english", 88)))

val averages = data.combineByKey(
  (score: Int) => (score, 1),                                                    // createCombiner: V => (sum, count)
  (acc: (Int, Int), score: Int) => (acc._1 + score, acc._2 + 1),                 // mergeValue
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) // mergeCombiners
).mapValues { case (sum, count) => sum.toDouble / count }

// Result (order may vary): [("math", 88.5), ("english", 89.0)]
```
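
To see how the three functions interact, here is a minimal local model of `combineByKey` in plain Scala with no Spark dependency. It assumes each inner `Seq` represents one partition and uses a hypothetical helper, `combineByKeyLocal`. It mirrors the semantics described above, not Spark's actual implementation (which also handles spilling, serialization, and the shuffle itself).

```scala
object CombineByKeySketch {
  // Local stand-in for combineByKey: each inner Seq plays the role of one partition.
  // Phase 1 (map side): within a partition, the first value seen for a key goes
  // through createCombiner and later values are folded in with mergeValue.
  // Phase 2 (after the "shuffle"): per-partition accumulators for the same key
  // are merged with mergeCombiners.
  def combineByKeyLocal[K, V, C](partitions: Seq[Seq[(K, V)]])(
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): Map[K, C] = {
    val perPartition: Seq[Map[K, C]] = partitions.map { part =>
      part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
        acc.get(k) match {
          case Some(c) => acc.updated(k, mergeValue(c, v))
          case None    => acc.updated(k, createCombiner(v))
        }
      }
    }
    perPartition.foldLeft(Map.empty[K, C]) { (acc, partial) =>
      partial.foldLeft(acc) { case (merged, (k, c)) =>
        merged.updated(k, merged.get(k).map(mergeCombiners(_, c)).getOrElse(c))
      }
    }
  }

  // Same scores as the example above, split across two "partitions".
  val partitions: Seq[Seq[(String, Int)]] = Seq(
    Seq(("math", 85), ("english", 90)),
    Seq(("math", 92), ("english", 88))
  )

  val averages: Map[String, Double] =
    combineByKeyLocal(partitions)(
      (score: Int) => (score, 1),
      (acc: (Int, Int), score: Int) => (acc._1 + score, acc._2 + 1),
      (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)
    ).map { case (k, (sum, count)) => k -> sum.toDouble / count }

  def main(args: Array[String]): Unit = println(averages)
}
```

In this model a key that lives in a single partition never reaches `mergeCombiners`, which is why all three functions must agree on what an accumulator represents.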

### reduceByKey

Combine values with the same key using an associative and commutative function. Like a combiner in MapReduce, values are merged locally within each partition before the shuffle, then merged again across partitions:

```scala { .api }
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
def reduceByKey(numPartitions: Int, func: (V, V) => V): RDD[(K, V)]
```

```scala
import org.apache.spark.HashPartitioner

val wordCounts = sc.textFile("document.txt")
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _) // Sum counts for each word

// With an explicit partitioner (`pairs` is the RDD from Setup and Imports)
val counts = pairs.reduceByKey(new HashPartitioner(4), _ + _)
```
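
The associativity requirement matters because Spark applies `func` within each partition first and then across partitions, so how values get grouped depends on how the data happens to be split. The following plain-Scala sketch, using a hypothetical `reduceByKeyLocal` helper on local collections rather than Spark, reduces the same four values under two different splits:

```scala
object ReduceByKeyAssociativitySketch {
  // Local model: reduce each key within every partition, then reduce the
  // per-partition results. Each inner Seq stands in for one partition.
  def reduceByKeyLocal[K, V](partitions: Seq[Seq[(K, V)]])(func: (V, V) => V): Map[K, V] = {
    val partials: Seq[Map[K, V]] = partitions.map { part =>
      part.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(func) }
    }
    partials.flatten.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(func) }
  }

  val values: Seq[(String, Int)] = Seq(("a", 10), ("a", 3), ("a", 2), ("a", 1))

  // The same data split two different ways.
  val splitOne = Seq(values.take(2), values.drop(2)) // [10, 3] and [2, 1]
  val splitTwo = Seq(values.take(1), values.drop(1)) // [10] and [3, 2, 1]

  // Addition is associative and commutative: both splits agree.
  val sumOne = reduceByKeyLocal(splitOne)(_ + _)
  val sumTwo = reduceByKeyLocal(splitTwo)(_ + _)

  // Subtraction is neither: the answer depends on the split.
  val diffOne = reduceByKeyLocal(splitOne)(_ - _)
  val diffTwo = reduceByKeyLocal(splitTwo)(_ - _)

  def main(args: Array[String]): Unit =
    println(s"sum: $sumOne vs $sumTwo; difference: $diffOne vs $diffTwo")
}
```

Addition yields 16 either way, while subtraction yields 6 for one split and 10 for the other. On a real cluster the order in which partition results are merged is not fixed either, which is why the function should also be commutative.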

### reduceByKeyLocally

Reduce by key and return the results to the driver as a `scala.collection.Map` (so the result must fit in driver memory):

```scala { .api }
def reduceByKeyLocally(func: (V, V) => V): Map[K, V]
```

```scala
// Using the `pairs` RDD from Setup and Imports
val localCounts: scala.collection.Map[String, Int] = pairs.reduceByKeyLocally(_ + _)
// Returns a local Map instead of an RDD, containing apple -> 7, banana -> 3, orange -> 1
```

### aggregateByKey

Aggregate the values of each key into a result of type `U`, which may differ from the value type `V`. Within each partition, `seqOp` folds values into an accumulator that starts at `zeroValue`; `combOp` then merges accumulators from different partitions:

```scala { .api }
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
```

```scala
// Calculate max, min, and count per key
val data = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 1), ("a", 2)))

val stats = data.aggregateByKey((Int.MinValue, Int.MaxValue, 0))(
  // seqOp: fold one value into the accumulator
  (acc, value) => (math.max(acc._1, value), math.min(acc._2, value), acc._3 + 1),
  // combOp: merge two accumulators
  (acc1, acc2) => (math.max(acc1._1, acc2._1), math.min(acc1._2, acc2._2), acc1._3 + acc2._3)
)
// Result (order may vary), as (max, min, count): [("a", (3, 1, 3)), ("b", (2, 1, 2))]
```
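
A subtlety of `aggregateByKey` (and likewise `foldByKey`): `zeroValue` is applied once per key in every partition that holds that key, not once per key overall. It should therefore be a neutral element for the operations, such as `0` for addition or an empty collection for concatenation. The sketch below is a local, Spark-free model with a hypothetical `aggregateByKeyLocal` helper that makes the effect visible:

```scala
object AggregateByKeyZeroSketch {
  // Local model: every partition starts each key from zeroValue, folds values in
  // with seqOp, and then the per-partition results are merged with combOp.
  def aggregateByKeyLocal[K, V, U](partitions: Seq[Seq[(K, V)]], zeroValue: U)(
      seqOp: (U, V) => U,
      combOp: (U, U) => U): Map[K, U] = {
    val partials: Seq[Map[K, U]] = partitions.map { part =>
      part.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).foldLeft(zeroValue)(seqOp) }
    }
    partials.flatten.groupBy(_._1).map { case (k, kus) => k -> kus.map(_._2).reduce(combOp) }
  }

  val onePartition: Seq[Seq[(String, Int)]]  = Seq(Seq(("a", 1), ("a", 3), ("a", 2)))
  val twoPartitions: Seq[Seq[(String, Int)]] = Seq(Seq(("a", 1)), Seq(("a", 3), ("a", 2)))

  // Neutral zero (0 for addition): the partitioning does not matter.
  val neutralOne = aggregateByKeyLocal(onePartition, 0)(_ + _, _ + _)
  val neutralTwo = aggregateByKeyLocal(twoPartitions, 0)(_ + _, _ + _)

  // Non-neutral zero (100): it is added once per partition holding the key.
  val skewedOne = aggregateByKeyLocal(onePartition, 100)(_ + _, _ + _)
  val skewedTwo = aggregateByKeyLocal(twoPartitions, 100)(_ + _, _ + _)

  def main(args: Array[String]): Unit =
    println(s"neutral: $neutralOne / $neutralTwo, skewed: $skewedOne / $skewedTwo")
}
```

With the neutral zero both layouts give 6; with a zero of 100, one partition gives 106 and two partitions give 206.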

### foldByKey

Fold the values for each key using an associative function and a zero value. The zero value may be applied more than once per key (once per partition), so it must not change the result, e.g. `0` for addition:

```scala { .api }
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]
```

```scala
val data = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val sums = data.foldByKey(0)(_ + _)
// Result (order may vary): [("a", 4), ("b", 2)]
```

### groupByKey

Group all values for each key into a single collection. Use with caution on large datasets: every value is shuffled, and all values for one key must fit in memory on a single executor:

```scala { .api }
def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
```

```scala
val data = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4)))
val grouped = data.groupByKey()
// Result: [("a", [1, 3]), ("b", [2, 4])]

// Note: groupByKey can cause out-of-memory errors for keys with many values
// Consider using reduceByKey, aggregateByKey, or combineByKey instead
```

## Join Operations

### join (Inner Join)

```scala { .api }
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
```

```scala
val rdd1 = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
val rdd2 = sc.parallelize(Seq(("a", "x"), ("b", "y"), ("d", "z")))

val joined = rdd1.join(rdd2)
// Result: [("a", (1, "x")), ("b", (2, "y"))]
// Note: "c" and "d" are excluded (inner join)
```
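
When a key occurs more than once on either side, `join` emits every pairing of matching values for that key (a per-key Cartesian product), so the output can be larger than both inputs. Here is a plain-Scala sketch of those semantics on local collections, using a hypothetical `innerJoinLocal` helper and made-up data; it is not Spark's implementation:

```scala
object JoinSemanticsSketch {
  // Local model of inner-join semantics: for each key present on both sides,
  // emit every (v, w) combination.
  def innerJoinLocal[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, W))] = {
    val rightByKey: Map[K, Seq[W]] =
      right.groupBy(_._1).map { case (k, kws) => k -> kws.map(_._2) }
    for {
      (k, v) <- left
      w      <- rightByKey.getOrElse(k, Seq.empty)
    } yield (k, (v, w))
  }

  val orders = Seq(("alice", "book"), ("alice", "pen"), ("bob", "lamp"))
  val emails = Seq(("alice", "a@home"), ("alice", "a@work"), ("carol", "c@home"))

  // "alice" has 2 orders x 2 emails = 4 rows; "bob" and "carol" have no match.
  val joined = innerJoinLocal(orders, emails)

  def main(args: Array[String]): Unit = joined.foreach(println)
}
```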

### leftOuterJoin

```scala { .api }
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))]
def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))]
```

```scala
val leftJoined = rdd1.leftOuterJoin(rdd2)
// Result: [("a", (1, Some("x"))), ("b", (2, Some("y"))), ("c", (3, None))]
```

### rightOuterJoin

```scala { .api }
def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], W))]
def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))]
```

```scala
val rightJoined = rdd1.rightOuterJoin(rdd2)
// Result: [("a", (Some(1), "x")), ("b", (Some(2), "y")), ("d", (None, "z"))]
```

### fullOuterJoin

```scala { .api }
def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]
def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], Option[W]))]
def fullOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], Option[W]))]
```

```scala
val fullJoined = rdd1.fullOuterJoin(rdd2)
// Result: [("a", (Some(1), Some("x"))), ("b", (Some(2), Some("y"))),
//          ("c", (Some(3), None)), ("d", (None, Some("z")))]
```

## Cogroup Operations

### cogroup (Group Together)

```scala { .api }
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]

// Multi-way cogroup
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]
```

```scala
val rdd1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
val rdd2 = sc.parallelize(Seq(("a", "x"), ("c", "y")))

val cogrouped = rdd1.cogroup(rdd2)
// Result: [("a", ([1, 2], ["x"])), ("b", ([3], [])), ("c", ([], ["y"]))]

// Three-way cogroup
val rdd3 = sc.parallelize(Seq(("a", 10.0), ("b", 20.0)))
val threeway = rdd1.cogroup(rdd2, rdd3)
// Result: [("a", ([1, 2], ["x"], [10.0])), ("b", ([3], [], [20.0])), ("c", ([], ["y"], []))]
```
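
`cogroup` is the building block behind the join family: with both sides grouped per key, an inner join is the per-key product of the two groups, and a left outer join substitutes `None` when the right group is empty. (As far as we know, Spark's own `join` and `leftOuterJoin` are written this way, as `cogroup` followed by `flatMapValues`.) The sketch below reproduces the idea on local collections; `cogroupLocal`, `joinViaCogroup`, and `leftOuterJoinViaCogroup` are hypothetical helpers, and the data matches the join examples above:

```scala
object CogroupJoinSketch {
  // Local model of cogroup: for every key seen on either side, gather that key's
  // values from each side (a side with no entries yields an empty Seq).
  def cogroupLocal[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Map[K, (Seq[V], Seq[W])] = {
    val keys = (left.map(_._1) ++ right.map(_._1)).distinct
    keys.map { k =>
      (k, (left.collect { case (`k`, v) => v }, right.collect { case (`k`, w) => w }))
    }.toMap
  }

  // Inner join derived from cogroup: the per-key product of the two groups.
  def joinViaCogroup[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, W))] =
    cogroupLocal(left, right).toSeq.flatMap { case (k, (vs, ws)) =>
      for (v <- vs; w <- ws) yield (k, (v, w))
    }

  // Left outer join: keep left values even when the right group is empty.
  def leftOuterJoinViaCogroup[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, Option[W]))] =
    cogroupLocal(left, right).toSeq.flatMap { case (k, (vs, ws)) =>
      if (ws.isEmpty) vs.map(v => (k, (v, Option.empty[W])))
      else for (v <- vs; w <- ws) yield (k, (v, Option(w)))
    }

  val left  = Seq(("a", 1), ("b", 2), ("c", 3))
  val right = Seq(("a", "x"), ("b", "y"), ("d", "z"))

  val inner     = joinViaCogroup(left, right)
  val leftOuter = leftOuterJoinViaCogroup(left, right)

  def main(args: Array[String]): Unit = {
    println(cogroupLocal(left, right))
    println(inner)
    println(leftOuter)
  }
}
```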

## Key and Value Transformations

### keys and values

```scala { .api }
def keys: RDD[K]   // Extract all keys
def values: RDD[V] // Extract all values
```

```scala
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val allKeys = pairs.keys     // RDD[String] containing "a", "b", "a"
val allValues = pairs.values // RDD[Int] containing 1, 2, 3
```

### mapValues

Transform values while preserving keys and partitioning:

```scala { .api }
def mapValues[U](f: V => U): RDD[(K, U)]
```

```scala
val pairs = sc.parallelize(Seq(("alice", 25), ("bob", 30), ("charlie", 35)))
val incremented = pairs.mapValues(_ + 1)
// Result: [("alice", 26), ("bob", 31), ("charlie", 36)]

// mapValues keeps the parent's partitioner; map discards it because it could change the keys
val transformed = pairs.mapValues(age => if (age >= 30) "adult" else "young")
```

### flatMapValues

Apply a function that returns a collection to each value, emitting one pair per element under the original key (partitioning is preserved):

```scala { .api }
def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)]
```

```scala
val sentences = sc.parallelize(Seq(
  ("doc1", "hello world"),
  ("doc2", "spark scala")
))

val words = sentences.flatMapValues(_.split(" "))
// Result: [("doc1", "hello"), ("doc1", "world"), ("doc2", "spark"), ("doc2", "scala")]
```

## Partitioning and Sorting

### partitionBy

Partition the RDD with the given partitioner:

```scala { .api }
def partitionBy(partitioner: Partitioner): RDD[(K, V)]
```

```scala
import org.apache.spark.{HashPartitioner, Partitioner}

val data = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3), ("d", 4)))
val partitioned = data.partitionBy(new HashPartitioner(2))

// Custom partitioner (the constructor parameter must not share the name numPartitions)
class CustomPartitioner(partitions: Int) extends Partitioner {
  require(partitions > 0, "number of partitions must be positive")
  override def numPartitions: Int = partitions

  // hashCode can be negative and % keeps the sign, so shift negatives into range
  override def getPartition(key: Any): Int = key match {
    case null => 0
    case _ =>
      val mod = key.hashCode % partitions
      if (mod < 0) mod + partitions else mod
  }
}

val customPartitioned = data.partitionBy(new CustomPartitioner(3))
```
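
Under the hood, hash partitioning is a non-negative modulo of the key's `hashCode`; Spark's `HashPartitioner` also sends `null` keys to partition 0. This plain-Scala sketch mirrors that rule with hypothetical helpers `nonNegativeMod` and `hashPartition`, and shows where the four keys from the example above would land with two partitions:

```scala
object HashPartitionSketch {
  // Non-negative modulo: `%` keeps the sign of the dividend, so a negative
  // hashCode would otherwise produce a negative partition index.
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }

  // Mirrors the hash-partitioning assignment rule; null keys go to partition 0.
  def hashPartition(key: Any, numPartitions: Int): Int =
    if (key == null) 0 else nonNegativeMod(key.hashCode, numPartitions)

  val keys = Seq("a", "b", "c", "d")
  val assignment: Map[String, Int] = keys.map(k => k -> hashPartition(k, 2)).toMap

  def main(args: Array[String]): Unit = {
    println(assignment)
    println(-7 % 3)                // plain % can be negative
    println(nonNegativeMod(-7, 3)) // shifted into [0, 3)
  }
}
```

Single-character strings hash to their character code (`"a"` is 97), so `a` and `c` land in partition 1 and `b` and `d` in partition 0.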

### sortByKey

Sort the RDD by key. The key type needs an implicit `Ordering[K]` (this method comes from `OrderedRDDFunctions`, reached through a separate implicit conversion), and the result is range-partitioned, so the partitions themselves are in key order:

```scala { .api }
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]
```

```scala
val data = sc.parallelize(Seq(("c", 3), ("a", 1), ("b", 2)))
val sorted = data.sortByKey()                       // Ascending: [("a", 1), ("b", 2), ("c", 3)]
val descending = data.sortByKey(ascending = false) // Descending: [("c", 3), ("b", 2), ("a", 1)]
```
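
`sortByKey` achieves a total order across partitions by range partitioning: Spark's `RangePartitioner` samples the keys to pick boundaries so that every key in partition `i` sorts before every key in partition `i + 1`, and each partition is then sorted locally. The following simplified local model uses fixed, hand-picked boundaries instead of sampling; `rangePartition` and `sortByKeyLocal` are hypothetical helpers:

```scala
object RangePartitionSortSketch {
  // Assign a key to the first range whose upper bound is >= key; keys above
  // every bound go to the last partition. Bounds must be sorted ascending.
  def rangePartition[K](key: K, upperBounds: Seq[K])(implicit ord: Ordering[K]): Int = {
    val idx = upperBounds.indexWhere(bound => ord.lteq(key, bound))
    if (idx == -1) upperBounds.size else idx
  }

  // Local model of sortByKey: range-partition, then sort each partition by key.
  def sortByKeyLocal[K, V](data: Seq[(K, V)], upperBounds: Seq[K])(
      implicit ord: Ordering[K]): Seq[Seq[(K, V)]] = {
    val numPartitions = upperBounds.size + 1
    val grouped = data.groupBy { case (k, _) => rangePartition(k, upperBounds) }
    (0 until numPartitions).map(i => grouped.getOrElse(i, Seq.empty).sortBy(_._1))
  }

  val data = Seq(("m", 1), ("c", 2), ("x", 3), ("a", 4), ("q", 5))

  // Two bounds give three partitions: keys <= "f", keys in ("f", "p"], keys > "p".
  val partitions = sortByKeyLocal(data, Seq("f", "p"))

  // Reading the partitions back to back yields a globally sorted sequence.
  val concatenated = partitions.flatten

  def main(args: Array[String]): Unit = partitions.zipWithIndex.foreach(println)
}
```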

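### repartitionAndSortWithinPartitions

Repartition the RDD with the given partitioner and sort records by key within each resulting partition. This is more efficient than calling `repartition` and then sorting each partition, because the sort is pushed down into the shuffle machinery. Like `sortByKey`, it requires an implicit `Ordering[K]`:

```scala { .api }
def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)]
```

```scala
import org.apache.spark.HashPartitioner

val data = sc.parallelize(Seq(("c", 3), ("a", 1), ("b", 2), ("d", 4)))
val repartitioned = data.repartitionAndSortWithinPartitions(new HashPartitioner(2))
// Partitions the data and sorts within each partition in one operation
```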
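
Unlike `sortByKey`, this operation sorts only within each partition, so reading the partitions back to back does not give a globally sorted result when a hash partitioner is used. Here is a local, Spark-free model (hypothetical helper `repartitionAndSortLocal`, using the same non-negative hash rule as `HashPartitioner`) applied to the four records from the example above:

```scala
object RepartitionAndSortSketch {
  // Hash-partition each record by key (non-negative modulo, null keys to 0),
  // then sort every partition by key.
  def repartitionAndSortLocal[K, V](data: Seq[(K, V)], numPartitions: Int)(
      implicit ord: Ordering[K]): Seq[Seq[(K, V)]] = {
    def partitionOf(key: K): Int =
      if (key == null) 0
      else {
        val raw = key.hashCode % numPartitions
        if (raw < 0) raw + numPartitions else raw
      }
    val grouped = data.groupBy { case (k, _) => partitionOf(k) }
    (0 until numPartitions).map(i => grouped.getOrElse(i, Seq.empty).sortBy(_._1))
  }

  val data = Seq(("c", 3), ("a", 1), ("b", 2), ("d", 4))
  val partitions = repartitionAndSortLocal(data, 2)

  def main(args: Array[String]): Unit = partitions.zipWithIndex.foreach(println)
}
```

Partition 0 holds `b` and `d` and partition 1 holds `a` and `c`; each partition is sorted, but the concatenation `b, d, a, c` is not.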

## Set Operations

### subtractByKey

Return the (key, value) pairs in this RDD whose key does not appear in `other` (the value types of the two RDDs may differ):

```scala { .api }
def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)]
def subtractByKey[W: ClassTag](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, V)]
def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)]
```

```scala
val rdd1 = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
val rdd2 = sc.parallelize(Seq(("a", "x"), ("c", "y")))

val subtracted = rdd1.subtractByKey(rdd2)
// Result: [("b", 2)], the only pair whose key does not exist in rdd2
```

## Statistical Operations

### countByKey

Count the number of elements for each key and return the counts to the driver as a Map (use only when the number of distinct keys is small):

```scala { .api }
def countByKey(): Map[K, Long]
```

```scala
val data = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5)))
val counts = data.countByKey()
// Result: Map("a" -> 3, "b" -> 2)
```

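### countByKeyApprox

Approximate version of `countByKey` that can return a partial result if the job does not finish within `timeout` milliseconds. `confidence` is the probability that the reported error bounds contain the true count:

```scala { .api }
def countByKeyApprox(timeout: Long, confidence: Double = 0.95): PartialResult[Map[K, BoundedDouble]]
```

```scala
val largePairRDD = sc.parallelize(1 to 1000000).map(i => (i % 10, i)) // stand-in for a large dataset
val approxCounts = largePairRDD.countByKeyApprox(1000) // timeout in milliseconds (1 second)
val estimates = approxCounts.initialValue // per-key BoundedDouble with mean, low, and high
```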

### collectAsMap

Return the key-value pairs to the driver as a Map. Intended for RDDs with unique keys that are small enough to fit in driver memory:

```scala { .api }
def collectAsMap(): Map[K, V]
```

```scala
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
val map = pairs.collectAsMap()
// Result: Map("a" -> 1, "b" -> 2, "c" -> 3)

// Warning: If there are duplicate keys, only one value per key is kept
```

## Save Operations

### saveAsHadoopFile

Save as a Hadoop file using an old-API (`org.apache.hadoop.mapred`) `OutputFormat`:

```scala { .api }
def saveAsHadoopFile(
    path: String,
    keyClass: Class[_],
    valueClass: Class[_],
    outputFormatClass: Class[_ <: OutputFormat[_, _]],
    codec: Class[_ <: CompressionCodec]
): Unit

// Simplified versions (output format given as a type parameter)
def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]): Unit
def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String, codec: Class[_ <: CompressionCodec])(implicit fm: ClassTag[F]): Unit
```

### saveAsNewAPIHadoopFile

Save using the new Hadoop API (an `org.apache.hadoop.mapreduce` `OutputFormat`, shown here as `NewOutputFormat`):

```scala { .api }
def saveAsNewAPIHadoopFile(
    path: String,
    keyClass: Class[_],
    valueClass: Class[_],
    outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
    conf: Configuration = self.context.hadoopConfiguration
): Unit
```

### saveAsSequenceFile

Save as a Hadoop SequenceFile. Keys and values must be Hadoop `Writable` types, or types Spark can implicitly convert to `Writable` (such as `Int`, `Double`, and `String`):

```scala { .api }
def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None): Unit
```

```scala
import org.apache.hadoop.io.{IntWritable, Text}

// Explicit Writable types, built from the `pairs` RDD[(String, Int)] from Setup and Imports
val writablePairs: RDD[(Text, IntWritable)] = pairs.map {
  case (word, count) => (new Text(word), new IntWritable(count))
}
writablePairs.saveAsSequenceFile("hdfs://path/to/output")
```

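### saveAsHadoopDataset

Save using a Hadoop `JobConf` that specifies the output format, the key and value classes, and (for file-based formats) the output path:

```scala { .api }
def saveAsHadoopDataset(conf: JobConf): Unit
```

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, TextOutputFormat}

val jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.setOutputFormat(classOf[TextOutputFormat[Text, IntWritable]])
jobConf.setOutputKeyClass(classOf[Text])
jobConf.setOutputValueClass(classOf[IntWritable])
FileOutputFormat.setOutputPath(jobConf, new Path("hdfs://path/to/output")) // required by file-based formats

pairs.map { case (word, count) => (new Text(word), new IntWritable(count)) }
  .saveAsHadoopDataset(jobConf)
```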

## Performance Considerations

1. **Prefer `reduceByKey` over `groupByKey`**: `reduceByKey` combines values within each partition before shuffling, so much less data crosses the network
2. **Use appropriate partitioners**: Partitioning an RDD once with `partitionBy` and persisting it lets later joins and key-based aggregations reuse that partitioning instead of reshuffling
3. **Consider `mapValues`**: It preserves the parent's partitioner (unlike `map`), so downstream key-based operations can avoid a shuffle
4. **Avoid `collectAsMap` on large datasets**: It brings all data to the driver
5. **Use `combineByKey` for complex aggregations**: It is the most flexible option and, like `reduceByKey`, combines values on the map side by default

```scala
val textFile = sc.textFile("document.txt")

// Efficient pattern
val wordCounts = textFile
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _) // Combines locally before shuffle

// Less efficient pattern
val wordCountsSlow = textFile
  .flatMap(_.split(" "))
  .map((_, 1))
  .groupByKey()     // Shuffles all values
  .mapValues(_.sum) // Then combines
```
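
To put a number on the first point, the sketch below counts how many records each pattern would send through the shuffle in a toy setting: each input line is treated as one map-side partition, and `shuffledRecordsGroupByKey` and `shuffledRecordsReduceByKey` are hypothetical helpers. It models map-side combining in plain Scala; it is not a measurement of Spark itself.

```scala
object ShuffleVolumeSketch {
  type Partition = Seq[(String, Int)]

  // groupByKey-style: every (word, 1) record is shuffled as-is.
  def shuffledRecordsGroupByKey(partitions: Seq[Partition]): Int =
    partitions.map(_.size).sum

  // reduceByKey-style: each partition first sums its own values per key,
  // so only one record per distinct key per partition is shuffled.
  def shuffledRecordsReduceByKey(partitions: Seq[Partition]): Int =
    partitions.map(_.map(_._1).distinct.size).sum

  val lines = Seq("to be or not to be", "to be is to do")

  // One "partition" per line, holding (word, 1) pairs.
  val partitions: Seq[Partition] = lines.map(_.split(" ").toSeq.map(w => (w, 1)))

  def main(args: Array[String]): Unit =
    println(s"groupByKey shuffles ${shuffledRecordsGroupByKey(partitions)} records, " +
      s"reduceByKey shuffles ${shuffledRecordsReduceByKey(partitions)}")
}
```

For these two lines, the `groupByKey` pattern moves 11 records and the `reduceByKey` pattern moves 8 (one per distinct word per partition); the gap grows with how often keys repeat within a partition.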