# Key-Value Operations

Specialized operations for RDDs of key-value pairs, including joins, grouping, and aggregation.

## Capabilities

### PairRDDFunctions

Extra functions available on RDDs of (key, value) pairs through an implicit conversion.

```scala { .api }
/**
 * Extra functions available on RDDs of (key, value) pairs through an implicit conversion
 */
class PairRDDFunctions[K, V](self: RDD[(K, V)]) {
  // Grouping operations
  def groupByKey(): RDD[(K, Iterable[V])]
  def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
  def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]

  // Reduction operations
  def reduceByKey(func: (V, V) => V): RDD[(K, V)]
  def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
  def reduceByKeyLocally(func: (V, V) => V): Map[K, V]

  // Aggregation operations
  def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
  def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
  def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
  def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
  def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]
  def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]

  // Combine operations
  def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
  def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
  def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner): RDD[(K, C)]

  // Join operations
  def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
  def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
  def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
  def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
  def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))]
  def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))]
  def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
  def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))]
  def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], W))]
  def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]
  def fullOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], Option[W]))]
  def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], Option[W]))]

  // CoGroup operations
  def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
  def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]
  def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))]
  def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
  def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
  def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]

  // Set operations
  def subtractByKey[W](other: RDD[(K, W)]): RDD[(K, V)]
  def subtractByKey[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)]
  def subtractByKey[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, V)]

  // Partitioning operations
  def partitionBy(partitioner: Partitioner): RDD[(K, V)]

  // Utility operations
  def keys: RDD[K]
  def values: RDD[V]
  def mapValues[U](f: V => U): RDD[(K, U)]
  def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)]
  def collectAsMap(): Map[K, V]
  def countByKey(): Map[K, Long]
  def countByKeyApprox(timeout: Long, confidence: Double = 0.95): PartialResult[Map[K, BoundedDouble]]

  // Lookup operations
  def lookup(key: K): Seq[V]
}
```

**Usage Examples:**

```scala
import org.apache.spark.{SparkContext, SparkConf}

val sc = new SparkContext(new SparkConf().setAppName("KeyValue Examples").setMaster("local[*]"))

// Create key-value RDDs
val pairs1 = sc.parallelize(Array(("a", 1), ("b", 2), ("a", 3), ("c", 4)))
val pairs2 = sc.parallelize(Array(("a", "x"), ("b", "y"), ("d", "z")))

// Grouping operations
val grouped = pairs1.groupByKey()
// Result: ("a", [1, 3]), ("b", [2]), ("c", [4])

// Reduction operations
val sumByKey = pairs1.reduceByKey(_ + _)
// Result: ("a", 4), ("b", 2), ("c", 4)

val counts = sc.textFile("hdfs://input.txt")
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)

// Aggregation operations
val avgByKey = pairs1.aggregateByKey((0, 0))(
  (acc, value) => (acc._1 + value, acc._2 + 1),
  (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
).mapValues(acc => acc._1.toDouble / acc._2)

// Join operations
val innerJoin = pairs1.join(pairs2)
// Result: ("a", (1, "x")), ("a", (3, "x")), ("b", (2, "y"))

val leftJoin = pairs1.leftOuterJoin(pairs2)
// Result: ("a", (1, Some("x"))), ("a", (3, Some("x"))), ("b", (2, Some("y"))), ("c", (4, None))

val rightJoin = pairs1.rightOuterJoin(pairs2)
val fullJoin = pairs1.fullOuterJoin(pairs2)

// Utility operations
val keys = pairs1.keys // RDD["a", "b", "a", "c"]
val values = pairs1.values // RDD[1, 2, 3, 4]
val doubled = pairs1.mapValues(_ * 2) // ("a", 2), ("b", 4), ("a", 6), ("c", 8)

// Collection operations
val asMap = pairs1.collectAsMap() // Map("a" -> 3, "b" -> 2, "c" -> 4) - keeps only one value per duplicate key
val keyCounts = pairs1.countByKey() // Map("a" -> 2, "b" -> 1, "c" -> 1)

// Lookup specific key
val aValues = pairs1.lookup("a") // Seq(1, 3)
```
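
`countByKeyApprox` returns a `PartialResult` rather than a plain map. A minimal sketch of how to consume it (reusing `pairs1` from above; the timeout and confidence values are illustrative):

```scala
// Approximate key counts: wait at most 1 second, with 95% confidence bounds
val approxCounts = pairs1.countByKeyApprox(1000L, confidence = 0.95)

// getFinalValue() blocks until the timeout expires or the job finishes
approxCounts.getFinalValue().foreach { case (key, bounded) =>
  println(s"$key: mean=${bounded.mean} in [${bounded.low}, ${bounded.high}]")
}
```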

### Sorting Operations (OrderedRDDFunctions)

Additional functions available on RDDs where the key is sortable.

```scala { .api }
/**
 * Extra functions available on RDDs where the key is sortable
 */
class OrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag, P <: Product2[K, V] : ClassTag](
    self: RDD[P]) {

  /**
   * Sort the RDD by key, with each partition containing a sorted range of elements
   */
  def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]

  /**
   * Repartition the RDD according to the given partitioner and sort records by their keys within each partition
   */
  def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)]

  /**
   * Return an RDD containing only the elements in the inclusive key range from lower to upper
   */
  def filterByRange(lower: K, upper: K): RDD[P]
}
```

**Usage Examples:**

```scala
val wordCounts = sc.textFile("hdfs://input.txt")
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)

// Sort by key (alphabetically)
val sortedByWord = wordCounts.sortByKey(ascending = true)

// Sort by value (frequency) - need to swap key and value
val sortedByCount = wordCounts
  .map(_.swap) // (count, word)
  .sortByKey(ascending = false) // sort by count descending
  .map(_.swap) // back to (word, count)

// Efficient repartition and sort
import org.apache.spark.HashPartitioner
val partitioned = wordCounts.repartitionAndSortWithinPartitions(new HashPartitioner(4))
```
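
Since `sortByKey` leaves the RDD range-partitioned, `filterByRange` can skip partitions that cannot contain the requested keys. A minimal sketch (the key bounds are illustrative):

```scala
// Keep only words in the inclusive key range "apple" to "mango";
// on a range-partitioned RDD this prunes whole partitions
val inRange = sortedByWord.filterByRange("apple", "mango")
```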

### SequenceFile Operations

Functions for saving RDDs as Hadoop SequenceFiles and other Hadoop-supported formats.

```scala { .api }
/**
 * Extra functions for saving RDDs as Hadoop SequenceFiles
 */
class SequenceFileRDDFunctions[K, V](self: RDD[(K, V)]) {
  /**
   * Output the RDD as a Hadoop SequenceFile
   */
  def saveAsSequenceFile(
      path: String,
      codec: Option[Class[_ <: CompressionCodec]] = None): Unit
}

// The general Hadoop output methods are defined on PairRDDFunctions
class PairRDDFunctions[K, V](self: RDD[(K, V)]) {
  /**
   * Output the RDD to any Hadoop-supported file system, using a Hadoop JobConf object for configuration
   */
  def saveAsHadoopFile[F <: OutputFormat[K, V]](
      path: String,
      keyClass: Class[K],
      valueClass: Class[V],
      outputFormatClass: Class[F],
      conf: JobConf = new JobConf(self.context.hadoopConfiguration),
      codec: Option[Class[_ <: CompressionCodec]] = None): Unit

  /**
   * Output the RDD to any Hadoop-supported file system, using the new Hadoop API
   */
  def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](
      path: String,
      keyClass: Class[K],
      valueClass: Class[V],
      outputFormatClass: Class[F],
      conf: Configuration = self.context.hadoopConfiguration): Unit
}
```
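
**Usage Example:**

A minimal sketch of saving a pair RDD as a SequenceFile (paths are placeholders; keys and values must have Writable conversions available, as `String` and `Int` do):

```scala
import org.apache.hadoop.io.compress.GzipCodec

val records = sc.parallelize(Seq(("a", 1), ("b", 2)))

// String/Int keys and values are converted to Text/IntWritable automatically
records.saveAsSequenceFile("hdfs://namenode/output/records-seq")

// Optionally compress the output with a codec
records.saveAsSequenceFile("hdfs://namenode/output/records-seq-gz", Some(classOf[GzipCodec]))
```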

## Advanced Key-Value Patterns

### Complex Aggregations

```scala
case class Sale(product: String, amount: Double, quantity: Int)

val sales = sc.parallelize(Array(
  Sale("laptop", 999.99, 1),
  Sale("mouse", 29.99, 5),
  Sale("laptop", 1199.99, 2),
  Sale("mouse", 29.99, 3)
))

val salesByProduct = sales
  .map(sale => (sale.product, sale))
  .aggregateByKey((0.0, 0))(
    // Sequence operation: aggregate within partition
    (acc, sale) => (acc._1 + sale.amount * sale.quantity, acc._2 + sale.quantity),
    // Combine operation: combine across partitions
    (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
  )
  .mapValues { case (totalRevenue, totalQuantity) =>
    (totalRevenue, totalQuantity, if (totalQuantity > 0) totalRevenue / totalQuantity else 0.0)
  }

// Result: (product, (totalRevenue, totalQuantity, avgPricePerUnit))
```

### Efficient Joins with Broadcasting

```scala
import org.apache.spark.broadcast.Broadcast

// Large dataset
val transactions = sc.textFile("hdfs://transactions.txt")
  .map(_.split(","))
  .map(fields => (fields(0), fields(1).toDouble)) // (userId, amount)

// Small lookup table
val userProfiles = sc.textFile("hdfs://users.txt")
  .map(_.split(","))
  .map(fields => (fields(0), fields(1))) // (userId, name)
  .collectAsMap()

// Broadcast small dataset for efficient lookup
// (collectAsMap returns a scala.collection.Map, so the annotation uses collection.Map)
val broadcastProfiles: Broadcast[collection.Map[String, String]] = sc.broadcast(userProfiles)

// Use broadcast variable instead of join
val enrichedTransactions = transactions.map { case (userId, amount) =>
  val userName = broadcastProfiles.value.getOrElse(userId, "Unknown")
  (userId, userName, amount)
}
```
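
This broadcast ("map-side") join avoids shuffling the large RDD entirely, but it only applies when the lookup table is small enough to be collected to the driver and held in memory on every executor; otherwise, fall back to a regular `join`.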

### Custom Partitioning for Performance

```scala
import org.apache.spark.Partitioner

// Custom partitioner for geographic data (assumes at least 4 partitions)
class RegionPartitioner(partitions: Int) extends Partitioner {
  override def numPartitions: Int = partitions

  override def getPartition(key: Any): Int = {
    val region = key.asInstanceOf[String]
    region match {
      case r if r.startsWith("US") => 0
      case r if r.startsWith("EU") => 1
      case r if r.startsWith("ASIA") => 2
      case _ => 3
    }
  }

  // Define equality so Spark can recognize co-partitioned RDDs and skip the shuffle
  override def equals(other: Any): Boolean = other match {
    case p: RegionPartitioner => p.numPartitions == numPartitions
    case _ => false
  }

  override def hashCode: Int = numPartitions
}

val locationData = sc.parallelize(Array(
  ("US-CA", "data1"), ("EU-DE", "data2"), ("ASIA-JP", "data3")
))

// Apply custom partitioning
val partitioned = locationData.partitionBy(new RegionPartitioner(4))

// Subsequent joins will be more efficient
val moreLocationData = sc.parallelize(Array(
  ("US-CA", "moredata1"), ("EU-DE", "moredata2")
)).partitionBy(new RegionPartitioner(4))

val joined = partitioned.join(moreLocationData) // No shuffle needed!
```

### Window Operations with groupByKey

```scala
case class Event(timestamp: Long, userId: String, action: String)

val events = sc.parallelize(Array(
  Event(1000, "user1", "login"),
  Event(1010, "user1", "click"),
  Event(1020, "user1", "logout"),
  Event(1005, "user2", "login"),
  Event(1015, "user2", "click")
))

// Group events by user and process in time windows
val userSessions = events
  .map(event => (event.userId, event))
  .groupByKey()
  .mapValues { userEvents =>
    val sortedEvents = userEvents.toList.sortBy(_.timestamp)

    // Define session boundaries (30 second timeout)
    val sessions = sortedEvents.foldLeft(List.empty[List[Event]]) { (sessions, event) =>
      sessions match {
        case Nil => List(List(event))
        case currentSession :: otherSessions =>
          if (event.timestamp - currentSession.last.timestamp <= 30000) {
            // Add to current session
            (currentSession :+ event) :: otherSessions
          } else {
            // Start new session
            List(event) :: sessions
          }
      }
    }

    sessions.reverse.map(_.length) // Session lengths
  }
```

## Performance Considerations

### Avoiding groupByKey

```scala
val words = sc.parallelize(Seq("spark", "rdd", "spark"))

// INEFFICIENT: groupByKey followed by reduction
val wordCounts1 = words
  .map((_, 1))
  .groupByKey() // Shuffles all data
  .mapValues(_.sum) // Reduces after shuffle

// EFFICIENT: Use reduceByKey instead
val wordCounts2 = words
  .map((_, 1))
  .reduceByKey(_ + _) // Combines map-side, reducing data before the shuffle
```

### Choosing the Right Operation

```scala
val data = sc.parallelize(Array(("a", 1), ("b", 2), ("a", 3)))

// Use combineByKey for complex aggregations
val stats = data.combineByKey(
  (v: Int) => (v, 1), // Create combiner
  (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1), // Merge value
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) // Merge combiners
).mapValues(acc => (acc._1, acc._2, acc._1.toDouble / acc._2)) // (sum, count, avg)

// Use aggregateByKey when the result type differs from the value type and a zero value exists
val letterStats = data.aggregateByKey("")(
  (acc, v) => if (acc.isEmpty) v.toString else acc + "," + v,
  (acc1, acc2) => acc1 + ";" + acc2
)
```