or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

broadcast-accumulators.mdcontext-configuration.mdindex.mdjava-api.mdkey-value-operations.mdrdd-operations.mdstatus-monitoring.mdstorage-persistence.mdtask-context.md

key-value-operations.mddocs/

0

# Key-Value Operations

1

2

Key-value operations in Spark are performed on RDDs of type `RDD[(K, V)]` where K is the key type and V is the value type. These operations are made available through implicit conversions to `PairRDDFunctions`, providing powerful aggregation, grouping, and join capabilities.

3

4

## PairRDDFunctions

5

6

```scala { .api }

// Methods available on any RDD[(K, V)] through Spark's implicit conversion to PairRDDFunctions.
// Entries under "Sorting Operations" are provided by OrderedRDDFunctions (same implicit
// mechanism) and additionally require an implicit Ordering[K] in scope.
class PairRDDFunctions[K, V](self: RDD[(K, V)])(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) {
  // Basic Key-Value Operations
  def keys: RDD[K]
  def values: RDD[V]
  def mapValues[U](f: V => U): RDD[(K, U)]
  def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)]
  // Note: there is no swapKV method; swap keys and values with rdd.map(_.swap)

  // Grouping Operations
  def groupByKey(): RDD[(K, Iterable[V])]
  def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
  def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]

  // Reduction Operations
  def reduceByKey(func: (V, V) => V): RDD[(K, V)]
  def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
  def reduceByKeyLocally(func: (V, V) => V): Map[K, V]

  def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
  def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]
  def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]

  def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
  def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
  def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]

  def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
  def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner): RDD[(K, C)]
  def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]

  // Join Operations
  def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
  def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
  def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]

  def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
  def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))]
  def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))]

  def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
  def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], W))]
  def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))]

  def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]
  def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], Option[W]))]
  def fullOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], Option[W]))]

  // Cogroup Operations
  def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
  def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))]
  def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]
  def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
  def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
  def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]
  def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]

  // Sorting Operations (from OrderedRDDFunctions; require an implicit Ordering[K])
  // sortByKey is a single method: both parameters have defaults.
  def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]
  def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)]

  // Partitioning Operations
  def partitionBy(partitioner: Partitioner): RDD[(K, V)]

  // Collection Operations (results are returned to the driver)
  def collectAsMap(): Map[K, V]
  def countByKey(): Map[K, Long]
  def countByKeyApprox(timeout: Long, confidence: Double = 0.95): PartialResult[Map[K, BoundedDouble]]

  // Lookup Operations
  def lookup(key: K): Seq[V]

  // Subtraction Operations
  def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)]
  def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)]
  def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)]

  // Sampling Operations
  def sampleByKey(withReplacement: Boolean, fractions: Map[K, Double], seed: Long = Utils.random.nextLong): RDD[(K, V)]
  def sampleByKeyExact(withReplacement: Boolean, fractions: Map[K, Double], seed: Long = Utils.random.nextLong): RDD[(K, V)]
}

```

90

91

## Basic Key-Value Operations

92

93

### Creating Key-Value RDDs

94

95

```scala

import org.apache.spark.{SparkContext, SparkConf}

// Local SparkContext using all available cores
val sc = new SparkContext(new SparkConf().setAppName("Key-Value Examples").setMaster("local[*]"))

// Create key-value RDD from collections
val pairs = sc.parallelize(Seq(
  ("apple", 5), ("banana", 3), ("apple", 2), ("orange", 8), ("banana", 1)
))

// Transform regular RDD to key-value RDD
val words = sc.parallelize(Seq("hello", "world", "hello", "spark", "world"))
val wordPairs = words.map(word => (word, 1))

// From text files: emit (url, 1) per log line, assuming the URL is the 7th
// space-separated field (index 6). Lines with fewer fields are skipped instead of
// failing the job with an ArrayIndexOutOfBoundsException.
val lines = sc.textFile("access.log")
val urlCounts = lines
  .map(_.split(" "))
  .filter(_.length > 6)
  .map(parts => (parts(6), 1))

```

117

118

### Basic Operations

119

120

```scala

// Extract keys and values (collect() returns elements in the RDD's order)
val keys = pairs.keys.collect() // Array("apple", "banana", "apple", "orange", "banana")
val values = pairs.values.collect() // Array(5, 3, 2, 8, 1)

// Transform values while preserving keys (and the RDD's existing partitioning)
val discountedPrices = pairs.mapValues(_ * 0.9)

// Transform each value into zero or more values, each paired with the original key
val inventory = sc.parallelize(Seq(
  ("electronics", "laptop,phone,tablet"),
  ("books", "fiction,non-fiction,textbook")
))
val expandedInventory = inventory.flatMapValues(_.split(","))
// ("electronics", "laptop"), ("electronics", "phone"), ("electronics", "tablet"),
// ("books", "fiction"), ("books", "non-fiction"), ("books", "textbook")

// Swap keys and values. There is no swapKV method on PairRDDFunctions;
// map each tuple through Tuple2.swap instead.
val swapped = pairs.map(_.swap) // (5, "apple"), (3, "banana"), (2, "apple"), (8, "orange"), (1, "banana")

```

138

139

## Aggregation Operations

140

141

### GroupByKey

142

143

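Groups all values for each key into a single `Iterable`. Use with caution on large datasets: every value is shuffled, and all values for a single key must fit in memory at once, so a heavily skewed key can cause an `OutOfMemoryError`. When you only need an aggregate (sum, count, max, ...), prefer `reduceByKey`, `foldByKey`, or `aggregateByKey`.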

144

145

```scala

// Gather every value of a key into one Iterable:
// ("apple", Iterable(5, 2)), ("banana", Iterable(3, 1)), ("orange", Iterable(8))
val grouped = pairs.groupByKey()

// Per key, derive (sum, count, average) from the grouped values in a single pass
val processed = grouped.mapValues { vs =>
  val (sum, count) = vs.foldLeft((0, 0)) { case ((runningSum, n), v) => (runningSum + v, n + 1) }
  (sum, count, sum.toDouble / count)
}

```

158

159

### ReduceByKey

160

161

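More efficient than `groupByKey` for aggregation because values are combined locally within each partition before the shuffle, so less data crosses the network. The reduce function must be associative and commutative.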

162

163

```scala

// Sum per key: ("apple", 7), ("banana", 4), ("orange", 8)
val totals = pairs.reduceByKey((a, b) => a + b)

// Largest value per key: ("apple", 5), ("banana", 3), ("orange", 8)
val maxValues = pairs.reduceByKey((a, b) => math.max(a, b))

// Join each user's strings with a space. reduceByKey expects an associative and
// commutative function; concatenation is not commutative, so the word order inside
// each result can depend on partitioning (e.g. "hello world" or "world hello").
val textData = sc.parallelize(Seq(
  ("user1", "hello"), ("user2", "hi"), ("user1", "world"), ("user2", "there")
))
val concatenated = textData.reduceByKey((left, right) => left + " " + right)

```

177

178

### FoldByKey

179

180

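Like `reduceByKey`, but starts from a zero value. The zero value may be applied more than once (for example, once per partition), so it must be a neutral element for the function, such as `0` for addition or `""` for string concatenation.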

181

182

```scala

// Sum per key starting from the neutral element 0: ("apple", 7), ("banana", 4), ("orange", 8)
val foldedSums = pairs.foldByKey(0)((acc, v) => acc + v)

// Join each key's messages with "; ", using "" as the zero value. An empty
// accumulator is replaced rather than extended, so no leading separator appears.
// Message order within a key may depend on partitioning.
val messages = sc.parallelize(Seq(
  ("error", "Connection failed"), ("error", "Timeout"), ("info", "Started"), ("info", "Completed")
))
val logMessages = messages.foldByKey("") { (acc, msg) =>
  acc match {
    case "" => msg
    case _ => acc + "; " + msg
  }
}

```

192

193

### AggregateByKey

194

195

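Aggregates the values of each key into a result of type `U`, which may differ from the value type `V`. You supply three things: a zero value, a `seqOp` that folds one value into an accumulator within a partition, and a `combOp` that merges accumulators from different partitions.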

196

197

```scala

// Calculate statistics (count, sum, sum of squares) for each key
val numbers = sc.parallelize(Seq(
  ("math", 85), ("math", 92), ("math", 78), ("science", 88), ("science", 95)
))

// Running statistics for one key; mean and variance are only meaningful when count > 0.
case class Stats(count: Int, sum: Double, sumSquares: Double) {
  def mean: Double = sum / count
  // Population variance: E[x^2] - (E[x])^2
  def variance: Double = (sumSquares / count) - (mean * mean)
}

val stats = numbers.aggregateByKey(Stats(0, 0.0, 0.0))(
  // seqOp folds one value into a partition-local accumulator.
  // The square is computed in Double so large Int values cannot overflow.
  seqOp = (acc, value) => Stats(
    acc.count + 1,
    acc.sum + value,
    acc.sumSquares + value.toDouble * value
  ),
  // combOp merges accumulators produced on different partitions
  combOp = (left, right) => Stats(
    left.count + right.count,
    left.sum + right.sum,
    left.sumSquares + right.sumSquares
  )
)
// Result: ("math", Stats(3, 255.0, 21773.0)), ("science", Stats(2, 183.0, 16769.0))

// Collect unique values per key: ("A", Set(1, 2)), ("B", Set(3, 4))
val data = sc.parallelize(Seq(
  ("A", 1), ("A", 2), ("A", 1), ("B", 3), ("B", 4), ("B", 3)
))
val uniqueValues = data.aggregateByKey(Set.empty[Int])(
  seqOp = (set, value) => set + value,
  combOp = (set1, set2) => set1 ++ set2
)

```

230

231

### CombineByKey

232

233

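The most general key-based aggregation. You provide three functions: one that creates a combiner from the first value of a key, one that merges another value into a combiner, and one that merges two combiners. `reduceByKey`, `foldByKey`, and `aggregateByKey` are built on the same combiner machinery.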

234

235

```scala

// Average score per name, tracking a (sum, count) accumulator per key
val scores = sc.parallelize(Seq(
  ("Alice", 85), ("Bob", 92), ("Alice", 78), ("Bob", 88), ("Alice", 95)
))

// First score of a key within a partition starts a (sum, count) accumulator
val startSumCount = (score: Int) => (score, 1)
// Later scores from the same partition are folded into that accumulator
val addScore = (acc: (Int, Int), score: Int) => (acc._1 + score, acc._2 + 1)
// Accumulators built on different partitions are merged pairwise
val mergeSumCounts = (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)

// Result: ("Alice", 86.0), ("Bob", 90.0)
val averages = scores
  .combineByKey(startSumCount, addScore, mergeSumCounts)
  .mapValues { case (sum, count) => sum.toDouble / count }

```

247

248

## Join Operations

249

250

### Inner Join

251

252

Returns a `(k, (v, w))` element for every pair of values whose key `k` appears in both RDDs. Keys present in only one RDD are dropped.

253

254

```scala

// Customer id -> customer name
val customers = sc.parallelize(Seq(1 -> "Alice", 2 -> "Bob", 3 -> "Charlie", 4 -> "David"))

// Customer id -> ordered item (customer 1 ordered twice; id 5 has no customer)
val orders = sc.parallelize(Seq(1 -> "laptop", 2 -> "phone", 1 -> "mouse", 5 -> "tablet"))

// Inner join keeps only ids present on both sides, one record per matching pair:
// (1, ("Alice", "laptop")), (1, ("Alice", "mouse")), (2, ("Bob", "phone"))
val customerOrders = customers.join(orders)

```

267

268

### Outer Joins

269

270

```scala

// Complete results for the customers/orders data above (output order is not guaranteed).

// Left outer join: every customer; Some(order) or None when the customer has no order
// (1, ("Alice", Some("laptop"))), (1, ("Alice", Some("mouse"))), (2, ("Bob", Some("phone"))),
// (3, ("Charlie", None)), (4, ("David", None))
val leftJoin = customers.leftOuterJoin(orders)

// Right outer join: every order; Some(customer) or None when the id has no customer
// (1, (Some("Alice"), "laptop")), (1, (Some("Alice"), "mouse")), (2, (Some("Bob"), "phone")),
// (5, (None, "tablet"))
val rightJoin = customers.rightOuterJoin(orders)

// Full outer join: every id from either side, with both values wrapped in Option
// (1, (Some("Alice"), Some("laptop"))), (1, (Some("Alice"), Some("mouse"))),
// (2, (Some("Bob"), Some("phone"))), (3, (Some("Charlie"), None)),
// (4, (Some("David"), None)), (5, (None, Some("tablet")))
val fullJoin = customers.fullOuterJoin(orders)

```

283

284

### Complex Join Example

285

286

```scala

// Multi-way joins: three RDDs keyed by product id
val products = sc.parallelize(Seq((1, "Laptop"), (2, "Phone"), (3, "Tablet")))
val prices = sc.parallelize(Seq((1, 999.99), (2, 499.99), (3, 299.99)))
val inventory = sc.parallelize(Seq((1, 50), (2, 100), (3, 25)))

// Each join nests the previous value: (id, (name, price)), then (id, ((name, price), stock))
val namesWithPrices = products.join(prices)

// Flatten the nested tuples into one record per product:
// (1, "Laptop", 999.99, 50), (2, "Phone", 499.99, 100), (3, "Tablet", 299.99, 25)
val productInfo = namesWithPrices.join(inventory).map {
  case (id, ((name, price), stock)) => (id, name, price, stock)
}

```

308

309

## Cogroup Operations

310

311

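Cogroup groups data from multiple RDDs by key. For every key that appears in any of the RDDs, it returns one tuple holding an `Iterable` of that key's values from each RDD; the `Iterable` is empty when the key is absent from that RDD.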

312

313

```scala

val rdd1 = sc.parallelize(Seq((1, "a"), (1, "b"), (2, "c")))
val rdd2 = sc.parallelize(Seq((1, "x"), (2, "y"), (2, "z"), (3, "w")))

// Two-way cogroup: one record per key found in either RDD
// (1, (Iterable("a", "b"), Iterable("x")))
// (2, (Iterable("c"), Iterable("y", "z")))
// (3, (Iterable(), Iterable("w")))
val cogrouped = rdd1.cogroup(rdd2)

// Per key: how many values each side contributed, plus all values in one list
val processed = cogrouped.mapValues { case (fromFirst, fromSecond) =>
  val firstValues = fromFirst.toList
  val secondValues = fromSecond.toList
  (firstValues.size, secondValues.size, firstValues ::: secondValues)
}

// Three-way cogroup: each value is a triple of Iterables, one per input RDD
val rdd3 = sc.parallelize(Seq((1, "p"), (3, "q")))
val threeway = rdd1.cogroup(rdd2, rdd3)

```

334

335

## Sorting and Partitioning

336

337

### SortByKey

338

339

```scala

val unsorted = sc.parallelize(Seq(("banana", 3), ("apple", 5), ("orange", 1), ("apple", 2)))

// Ascending key order (the default): apple, apple, banana, orange.
// The relative order of the two "apple" records may vary.
val sortedAsc = unsorted.sortByKey(ascending = true)

// Descending key order: orange, banana, apple, apple
val sortedDesc = unsorted.sortByKey(ascending = false)

// Ascending, with the sorted output spread across 4 partitions
val sortedPartitioned = unsorted.sortByKey(ascending = true, numPartitions = 4)

```

353

354

### Custom Partitioning

355

356

```scala

import org.apache.spark.{HashPartitioner, RangePartitioner}

// Hash partitioning: each key goes to a partition chosen from its hashCode
val hashPartitioned = pairs.partitionBy(new HashPartitioner(4))

// Range partitioning: samples `pairs` to choose key ranges, so each of the
// 4 partitions holds a contiguous range of sorted keys
val rangePartitioned = pairs.partitionBy(new RangePartitioner(4, pairs))

// Repartition and sort by key within each new partition in a single shuffle.
// More efficient than repartitioning and then sorting each partition, but unlike
// sortByKey it does NOT produce a total order across partitions.
val repartitionedAndSorted = pairs.repartitionAndSortWithinPartitions(new HashPartitioner(4))

// Custom partitioner that assigns keys by the hash of their string form.
// The constructor parameter uses a distinct name: calling it `numPartitions`
// would collide with the `numPartitions` member implemented below.
class DomainPartitioner(partitions: Int) extends org.apache.spark.Partitioner {
  require(partitions > 0, s"Number of partitions ($partitions) must be positive.")

  override def numPartitions: Int = partitions

  override def getPartition(key: Any): Int = key match {
    case null => 0 // a null key has no toString; send it to partition 0
    case k =>
      // % is negative for negative hash codes; shift the result into [0, partitions)
      val mod = k.toString.hashCode % partitions
      if (mod < 0) mod + partitions else mod
  }

  // Spark compares partitioners with equals to decide whether two RDDs are already
  // co-partitioned (and can skip a shuffle), so equally configured instances must be equal.
  override def equals(other: Any): Boolean = other match {
    case that: DomainPartitioner => that.numPartitions == numPartitions
    case _ => false
  }

  override def hashCode: Int = numPartitions
}

val customPartitioned = pairs.partitionBy(new DomainPartitioner(8))

```

381

382

## Collection and Lookup Operations

383

384

```scala

// Collect as map (small datasets only: every element is sent to the driver).
// This is not a multimap: a key with several values keeps only one of them,
// so "apple" maps to either 5 or 2, not both.
val asMap = pairs.collectAsMap()

// Count by key: Map(apple -> 2, banana -> 2, orange -> 1)
val keyCounts = pairs.countByKey()

// Lookup values for specific key
val appleValues = pairs.lookup("apple") // Seq(5, 2)

// Approximate count by key: waits at most `timeout` milliseconds and returns a
// PartialResult; `initialValue` is the estimate available when the call returns
// (a Map from key to BoundedDouble).
val approxCounts = pairs.countByKeyApprox(timeout = 1000L)
val approxCountsNow = approxCounts.initialValue

```

397

398

## Advanced Patterns

399

400

### Window Operations

401

402

```scala

// Bucket events into fixed, non-overlapping time windows (keys are timestamps)
val timestampData = sc.parallelize(Seq(
  (100L, "event1"), (150L, "event2"), (200L, "event3"), (250L, "event4")
))

// Window width, in the same unit as the timestamps (e.g. milliseconds)
val windowSize = 100L

// Re-key each event by the start of its window, then gather each window's events:
// (100, Iterable("event1", "event2")), (200, Iterable("event3", "event4"))
val windowed = timestampData
  .map { case (timestamp, event) => (timestamp / windowSize * windowSize, event) }
  .groupByKey()

```

414

415

### Top-K by Key

416

417

```scala

val keyValueScores = sc.parallelize(Seq(
  ("user1", 95), ("user1", 87), ("user1", 92), ("user1", 78),
  ("user2", 88), ("user2", 91), ("user2", 85)
))

// Top `topK` scores per user, highest first. Each accumulator holds at most topK
// values, so unlike groupByKey this never materializes all of a key's scores at once.
val topK = 2
val topScores = keyValueScores.aggregateByKey(List.empty[Int])(
  seqOp = (top, score) => (score :: top).sorted(Ordering.Int.reverse).take(topK),
  combOp = (top1, top2) => (top1 ::: top2).sorted(Ordering.Int.reverse).take(topK)
)
// Result: ("user1", List(95, 92)), ("user2", List(91, 88))

```

428

429

### Efficient Large Joins

430

431

```scala

import org.apache.spark.HashPartitioner

// For large RDDs that are joined repeatedly, partition both sides with the same
// partitioner once and persist them, so later joins don't reshuffle either input.
val partitioner = new HashPartitioner(100)

val largeRDD1 = customers.partitionBy(partitioner).persist()
val largeRDD2 = orders.partitionBy(partitioner).persist()

// Both inputs share an equal partitioner, so matching keys already sit in
// corresponding partitions and the join needs no new shuffle of either side.
val efficientJoin = largeRDD1.join(largeRDD2)

```

441

442

### Sampling Operations

443

444

```scala

// Stratified sampling by key
val userData = sc.parallelize(Seq(
  ("premium", "user1"), ("premium", "user2"), ("premium", "user3"),
  ("basic", "user4"), ("basic", "user5"), ("basic", "user6"), ("basic", "user7")
))

// Sampling fraction per key; give a fraction for every key in the RDD
val sampleFractions = Map("premium" -> 0.8, "basic" -> 0.3)

// Approximate sampling: a single pass; each stratum's size only approximates
// count * fraction and varies with the seed
val approxSample = userData.sampleByKey(withReplacement = false, sampleFractions, seed = 42)

// Exact sampling: makes extra passes over the data so that each stratum yields
// exactly math.ceil(count * fraction) items with 99.99% confidence
// (here ceil(3 * 0.8) = 3 premium users and ceil(4 * 0.3) = 2 basic users)
val exactSample = userData.sampleByKeyExact(withReplacement = false, sampleFractions, seed = 42)

```

460

461

### Broadcast Hash Join Pattern

462

463

```scala

// When one side of a join is small, broadcast it to every executor and look keys up
// inside map(), instead of shuffling the large RDD for a regular join.
val smallLookupTable = Map(1 -> "Category A", 2 -> "Category B", 3 -> "Category C")
val broadcastLookup = sc.broadcast(smallLookupTable)

// Sample large RDD of (id, data) pairs
val largeRDD = sc.parallelize(Seq((1, "order-1001"), (2, "order-1002"), (4, "order-1003")))

// Behaves like a left outer join: ids missing from the lookup table get "Unknown"
val enrichedData = largeRDD.map { case (id, data) =>
  val category = broadcastLookup.value.getOrElse(id, "Unknown")
  (id, data, category)
}
// Result: (1, "order-1001", "Category A"), (2, "order-1002", "Category B"), (4, "order-1003", "Unknown")

```