# Key-Value Operations

Specialized operations available on RDDs of key-value pairs through implicit conversion to `PairRDDFunctions`. They cover grouping, joining, and aggregating data by key, the building blocks of many distributed data processing patterns.

## Capabilities

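### Grouping Operations

Group values by key for subsequent processing. `groupByKey` shuffles every value across the cluster, so for aggregations such as sums or counts prefer `reduceByKey` or `aggregateByKey`.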

```scala { .api }
// Available on RDD[(K, V)] via implicit conversion
def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]

// Defined on any RDD[T]: group elements by a key derived from each element
def groupBy[K: ClassTag](f: T => K): RDD[(K, Iterable[T])]
def groupBy[K: ClassTag](f: T => K, numPartitions: Int): RDD[(K, Iterable[T])]
def groupBy[K: ClassTag](f: T => K, p: Partitioner): RDD[(K, Iterable[T])]
```

**Usage Examples:**

```scala
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4), ("c", 5)))

// Group values by key
val grouped = pairs.groupByKey()
// Result: [("a", [1, 3]), ("b", [2, 4]), ("c", [5])]

// Group with specific number of partitions
val groupedPartitioned = pairs.groupByKey(numPartitions = 3)

// Group regular RDD by derived key
val numbers = sc.parallelize(1 to 10)
val evenOdd = numbers.groupBy(_ % 2)
// Result: [(0, [2, 4, 6, 8, 10]), (1, [1, 3, 5, 7, 9])]
```

### Reduction Operations

Efficiently reduce values by key without creating intermediate collections.

```scala { .api }
// Key-wise reduction
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]

// Key-wise folding
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]
```

**Usage Examples:**

```scala
val sales = sc.parallelize(Seq(
  ("product1", 100), ("product2", 200), ("product1", 150),
  ("product2", 300), ("product3", 50)
))

// Sum sales by product
val totalSales = sales.reduceByKey(_ + _)
// Result: [("product1", 250), ("product2", 500), ("product3", 50)]

// Find maximum sale per product
val maxSales = sales.reduceByKey(math.max)
// Result: [("product1", 150), ("product2", 300), ("product3", 50)]

// Fold with a zero value. The zero value may be applied more than once per key
// (once in each partition that holds the key), so use the neutral element of
// the function: 0 for addition. A non-neutral value such as 10 gives results
// that depend on how the data is partitioned.
val foldedSales = sales.foldByKey(0)(_ + _)
// Result: [("product1", 250), ("product2", 500), ("product3", 50)]
```
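The role of the zero value in `foldByKey` is easier to see in a small model that needs no cluster. The sketch below uses plain Scala collections, not the Spark API: `FoldByKeyModel` and `foldByKeyModel` are hypothetical names, and a list of lists stands in for an RDD's partitions. It assumes the zero value is folded in once per key per partition and that per-partition results are then merged with the same function.

```scala
object FoldByKeyModel {
  type Pair = (String, Int)

  // Hypothetical stand-in for an RDD: a sequence of partitions, each a sequence of pairs.
  def foldByKeyModel(partitions: Seq[Seq[Pair]], zero: Int)(f: (Int, Int) => Int): Map[String, Int] = {
    // Step 1: inside each partition, fold every key's values starting from `zero`.
    val perPartition: Seq[Map[String, Int]] = partitions.map { part =>
      part.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).foldLeft(zero)(f) }
    }
    // Step 2: merge the per-partition results with the same function, without `zero`.
    perPartition.flatten.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(f) }
  }

  // The same two records, laid out in one partition or split across two.
  val onePartition: Seq[Seq[Pair]] = Seq(Seq(("product1", 100), ("product1", 150)))
  val twoPartitions: Seq[Seq[Pair]] = Seq(Seq(("product1", 100)), Seq(("product1", 150)))

  val single: Map[String, Int] = foldByKeyModel(onePartition, 10)(_ + _) // Map("product1" -> 260)
  val split: Map[String, Int] = foldByKeyModel(twoPartitions, 10)(_ + _) // Map("product1" -> 270)
}
```

With a zero value of 10 the total changes with the partition layout, while a zero value of 0 would give 250 in both cases. That is why the zero value should be neutral for the function.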

### Aggregation Operations

Flexible aggregation with different input and output types.

```scala { .api }
// General aggregation
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]

// Generic combine: build a per-key combiner of type C from values of type V
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]
```

**Usage Examples:**

```scala
val scores = sc.parallelize(Seq(
  ("Alice", 85), ("Bob", 90), ("Alice", 92), ("Bob", 87), ("Alice", 78)
))

// Calculate average score per student
case class ScoreStats(sum: Int, count: Int) {
  def average: Double = sum.toDouble / count
}

val averages = scores.aggregateByKey(ScoreStats(0, 0))(
  seqOp = (stats, score) => ScoreStats(stats.sum + score, stats.count + 1),
  combOp = (s1, s2) => ScoreStats(s1.sum + s2.sum, s1.count + s2.count)
).mapValues(_.average)
// Result: [("Alice", 85.0), ("Bob", 88.5)]

// Using combineByKey for the same result
val averages2 = scores.combineByKey(
  createCombiner = (score: Int) => ScoreStats(score, 1),
  mergeValue = (stats: ScoreStats, score: Int) => ScoreStats(stats.sum + score, stats.count + 1),
  mergeCombiners = (s1: ScoreStats, s2: ScoreStats) => ScoreStats(s1.sum + s2.sum, s1.count + s2.count)
).mapValues(_.average)
```
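To make the division of labour between the three `combineByKey` functions concrete, here is a minimal model in plain Scala collections. It is a sketch, not Spark code: `CombineByKeyModel` and `combineByKeyModel` are hypothetical names, partitions are plain sequences, and the two-partition layout of the scores is an assumption made for illustration. `createCombiner` runs on the first value seen for a key in a partition, `mergeValue` on later values in that partition, and `mergeCombiners` when per-partition results are merged.

```scala
object CombineByKeyModel {
  // Hypothetical model of combineByKey over a sequence of partitions.
  def combineByKeyModel[K, V, C](partitions: Seq[Seq[(K, V)]])(
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): Map[K, C] = {
    // Within each partition: one combiner per key.
    val mapSide: Seq[Map[K, C]] = partitions.map { part =>
      part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
        val combiner = acc.get(k) match {
          case Some(c) => mergeValue(c, v)  // key already seen in this partition
          case None    => createCombiner(v) // first value for this key here
        }
        acc.updated(k, combiner)
      }
    }
    // Across partitions: merge the combiners that share a key.
    mapSide.flatten.foldLeft(Map.empty[K, C]) { case (acc, (k, c)) =>
      acc.updated(k, acc.get(k).map(mergeCombiners(_, c)).getOrElse(c))
    }
  }

  // Assumed layout: the five scores spread over two partitions.
  val partitions: Seq[Seq[(String, Int)]] = Seq(
    Seq(("Alice", 85), ("Bob", 90), ("Alice", 92)),
    Seq(("Bob", 87), ("Alice", 78)))

  // The combiner is a (sum, count) pair.
  val sumCounts: Map[String, (Int, Int)] = combineByKeyModel(partitions)(
    (v: Int) => (v, 1),
    (c: (Int, Int), v: Int) => (c._1 + v, c._2 + 1),
    (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2))

  val averages: Map[String, Double] =
    sumCounts.map { case (k, (sum, count)) => k -> sum.toDouble / count }
  // averages == Map("Alice" -> 85.0, "Bob" -> 88.5)
}
```

Because the combiner carries both sum and count, the average stays correct however the records are split across partitions.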

### Join Operations

Join RDDs by key using various join strategies.

```scala { .api }
// Inner joins
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]

// Outer joins
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))]
def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))]

def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))]
def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], W))]

def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]
def fullOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], Option[W]))]
def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], Option[W]))]
```

**Usage Examples:**

```scala
val users = sc.parallelize(Seq(("u1", "Alice"), ("u2", "Bob"), ("u3", "Charlie")))
val orders = sc.parallelize(Seq(("u1", "order1"), ("u2", "order2"), ("u4", "order3")))

// Inner join: only matching keys
val userOrders = users.join(orders)
// Result: [("u1", ("Alice", "order1")), ("u2", ("Bob", "order2"))]

// Left outer join: all users, matched orders
val allUsersWithOrders = users.leftOuterJoin(orders)
// Result: [("u1", ("Alice", Some("order1"))), ("u2", ("Bob", Some("order2"))), ("u3", ("Charlie", None))]

// Right outer join: all orders, matched users
val allOrdersWithUsers = users.rightOuterJoin(orders)
// Result: [("u1", (Some("Alice"), "order1")), ("u2", (Some("Bob"), "order2")), ("u4", (None, "order3"))]

// Full outer join: all users and orders
val fullJoin = users.fullOuterJoin(orders)
// Result: [("u1", (Some("Alice"), Some("order1"))), ("u2", (Some("Bob"), Some("order2"))),
//          ("u3", (Some("Charlie"), None)), ("u4", (None, Some("order3")))]
```

### Cogroup Operations

Group multiple RDDs by key. For each key, the result holds one collection of values per input RDD; a key that is missing from an RDD gets an empty collection for that RDD.

```scala { .api }
// Two RDD cogroup
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))]

// Three RDD cogroup
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]

// Four RDD cogroup
def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]
```

**Usage Examples:**

```scala
val rdd1 = sc.parallelize(Seq(("k1", "a"), ("k2", "b"), ("k1", "c")))
val rdd2 = sc.parallelize(Seq(("k1", 1), ("k3", 2), ("k1", 3)))

// Cogroup creates grouped iterables for each key
val cogrouped = rdd1.cogroup(rdd2)
// Result: [("k1", (["a", "c"], [1, 3])), ("k2", (["b"], [])), ("k3", ([], [2]))]

// Useful for implementing custom join logic
val customJoin = cogrouped.flatMap { case (key, (values1, values2)) =>
  if (values1.nonEmpty && values2.nonEmpty) {
    for (v1 <- values1; v2 <- values2) yield (key, s"$v1-$v2")
  } else {
    Seq.empty
  }
}
```

### Key and Value Operations

Extract and transform keys and values independently.

```scala { .api }
// Key/value extraction
def keys: RDD[K]
def values: RDD[V]

// Value transformations (preserving keys)
def mapValues[U](f: V => U): RDD[(K, U)]
def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)]
```

**Usage Examples:**

```scala
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))

// Extract keys and values
val keys = pairs.keys     // ["a", "b", "c"]
val values = pairs.values // [1, 2, 3]

// Transform values only
val doubled = pairs.mapValues(_ * 2)
// Result: [("a", 2), ("b", 4), ("c", 6)]

// Flat map values: split each line into words, keeping the line key
val lines = sc.parallelize(Seq(("line1", "hello world"), ("line2", "scala spark")))
val words = lines.flatMapValues(_.split(" "))
// Result: [("line1", "hello"), ("line1", "world"), ("line2", "scala"), ("line2", "spark")]
```

### Collection Actions

Collect key-value data to the driver as a map, and count entries by key.

```scala { .api }
// Collection as map (use with caution for large datasets)
def collectAsMap(): Map[K, V]

// Exact count of entries per key, returned to the driver
def countByKey(): Map[K, Long]

// Approximate number of distinct values per key; returns an RDD, not a Map
def countApproxDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)]
def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)]
def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)]
```

**Usage Examples:**

```scala
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("c", 4)))

// Convert to a map on the driver. Only one value is kept per key; do not rely
// on which of the duplicate values survives.
val asMap = pairs.collectAsMap()
// Possible result: Map("a" -> 3, "b" -> 2, "c" -> 4)

// Count occurrences of each key
val keyCounts = pairs.countByKey()
// Result: Map("a" -> 2, "b" -> 1, "c" -> 1)

// Approximate number of distinct values per key, for large datasets
val distinctCounts = pairs.countApproxDistinctByKey(relativeSD = 0.1)
```

### Sampling Operations

Stratified sampling operations for key-value RDDs.

```scala { .api }
// Stratified sampling
def sampleByKey(withReplacement: Boolean, fractions: Map[K, Double], seed: Long = Utils.random.nextLong): RDD[(K, V)]
def sampleByKeyExact(withReplacement: Boolean, fractions: Map[K, Double], seed: Long = Utils.random.nextLong): RDD[(K, V)]
```

**Usage Examples:**

```scala
val data = sc.parallelize(Seq(
  ("cat", "fluffy"), ("cat", "whiskers"), ("cat", "mittens"),
  ("dog", "buddy"), ("dog", "max"), ("dog", "bella"),
  ("bird", "tweet"), ("bird", "chirp")
))

// Sample different fractions for each key (sample sizes are approximate)
val fractions = Map("cat" -> 0.5, "dog" -> 0.3, "bird" -> 1.0)
val sampled = data.sampleByKey(withReplacement = false, fractions)

// Exact stratified sampling: ceil(count * fraction) items per key,
// at the cost of additional passes over the data
val exactSampled = data.sampleByKeyExact(withReplacement = false, fractions)
```

## Partitioning for Key-Value RDDs

### Hash Partitioning

Default partitioning strategy using key hash codes.

```scala { .api }
class HashPartitioner(partitions: Int) extends Partitioner {
  def numPartitions: Int = partitions
  def getPartition(key: Any): Int
}

// Apply a partitioner to a pair RDD
def partitionBy(partitioner: Partitioner): RDD[(K, V)]
```

**Usage Example:**

```scala
import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3), ("a", 4)))

// Partition by hash with 4 partitions
val hashPartitioned = pairs.partitionBy(new HashPartitioner(4))

// Check partitioning
println(s"Partitioner: ${hashPartitioned.partitioner}")
```
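How a hash partitioner picks a partition can be sketched in a few lines of plain Scala. This is a model of the idea, not Spark's source: `HashPartitionModel`, `nonNegativeMod`, and `partitionFor` are names chosen for this sketch. It assumes the partition is the key's `hashCode` modulo the partition count, shifted into the non-negative range, with `null` keys sent to partition 0.

```scala
object HashPartitionModel {
  // Scala's % keeps the sign of the dividend, so shift negative results into [0, mod).
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }

  // Model of getPartition: null keys go to partition 0, others by hash code.
  def partitionFor(key: Any, numPartitions: Int): Int = key match {
    case null => 0
    case k    => nonNegativeMod(k.hashCode, numPartitions)
  }
}
```

For example, `HashPartitionModel.partitionFor("a", 4)` is 1 because `"a".hashCode` is 97. Equal keys always land in the same partition, which is what lets key-based operations find all values for a key in one place.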

### Range Partitioning

Partition keys by ranges for ordered data.

```scala { .api }
class RangePartitioner[K: Ordering: ClassTag, V](
  partitions: Int,
  rdd: RDD[_ <: Product2[K, V]],
  ascending: Boolean = true,
  samplePointsPerPartitionHint: Int = 20
) extends Partitioner
```

**Usage Example:**

```scala
import org.apache.spark.RangePartitioner

val pairs = sc.parallelize(Seq((1, "a"), (5, "b"), (3, "c"), (8, "d"), (2, "e")))

// Range partition to keep keys in order across partitions
val rangePartitioned = pairs.partitionBy(new RangePartitioner(3, pairs))
```
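The lookup step of range partitioning can be modelled without Spark. In the sketch below, `RangePartitionModel` and `partitionFor` are hypothetical names and the bounds `Vector(2, 5)` are made up for illustration; a real `RangePartitioner` derives its bounds by sampling the RDD. The sketch assumes ascending order and that a key equal to a bound belongs to the lower partition.

```scala
object RangePartitionModel {
  // `bounds` holds the sorted upper bounds of every partition except the last.
  // A key goes to the first partition whose upper bound is >= the key;
  // keys above every bound go to the last partition.
  def partitionFor[K](key: K, bounds: IndexedSeq[K])(implicit ord: Ordering[K]): Int =
    bounds.indexWhere(b => ord.lteq(key, b)) match {
      case -1 => bounds.length
      case i  => i
    }

  // Illustrative bounds for 3 partitions: (..2], (2..5], (5..)
  val bounds: Vector[Int] = Vector(2, 5)

  // Partition index for each key of the usage example above.
  val assignment: Map[Int, Int] =
    Seq(1, 5, 3, 8, 2).map(k => k -> partitionFor(k, bounds)).toMap
  // assignment == Map(1 -> 0, 2 -> 0, 3 -> 1, 5 -> 1, 8 -> 2)
}
```

Because every key in partition `i` is no greater than any key in partition `i + 1`, sorting within each partition yields a globally ordered result.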

## Types

```scala { .api }
// Implicit conversion adds these methods to RDD[(K, V)]
class PairRDDFunctions[K, V](self: RDD[(K, V)])
    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null)

// Partitioning strategies
abstract class Partitioner extends Serializable {
  def numPartitions: Int
  def getPartition(key: Any): Int
}

class HashPartitioner(partitions: Int) extends Partitioner
class RangePartitioner[K: Ordering: ClassTag, V](
  partitions: Int,
  rdd: RDD[_ <: Product2[K, V]]
) extends Partitioner

// Dependency for shuffle operations
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
  @transient _rdd: RDD[_ <: Product2[K, V]],
  partitioner: Partitioner
) extends Dependency[Product2[K, V]]
```