or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

accumulators.mdapplication-context.mdbroadcast-variables.mdindex.mdjava-api.mdpartitioning.mdrdd-operations.mdserialization.mdstorage-persistence.md

partitioning.mddocs/

# Partitioning

Data partitioning strategies that control how the elements of an RDD are distributed across partitions (and therefore across the nodes of a cluster), to optimize performance and minimize data shuffling in distributed operations.

## Capabilities

### Partitioner Base Class

Abstract base class defining how keys are assigned to partitions in key-value (pair) RDDs.

```scala { .api }

/**
 * Object that defines how the elements of a key-value RDD are partitioned by key.
 *
 * Implementations map every key to a partition ID in the range `[0, numPartitions)`.
 * They should also override `equals` and `hashCode` (as [[HashPartitioner]] and
 * [[RangePartitioner]] do) so that Spark can tell when two RDDs share the same
 * partitioning and avoid an unnecessary shuffle. `equals`/`hashCode` are inherited
 * concretely from `AnyRef`, so they are not redeclared here.
 */
abstract class Partitioner extends Serializable {
  /** Number of partitions this partitioner produces. */
  def numPartitions: Int

  /**
   * Partition ID, in `[0, numPartitions)`, for the given key.
   * Implementations should be prepared to receive `null` keys.
   */
  def getPartition(key: Any): Int
}

```

### Hash Partitioner

Default partitioner: assigns each key to a partition based on the key's `hashCode`, modulo the number of partitions (`null` keys go to partition 0).

```scala { .api }

/**
 * Partitioner that assigns keys to partitions using Java's `Object.hashCode`.
 *
 * A `null` key always lands in partition 0. Any other key lands in the non-negative
 * remainder of `key.hashCode` by `numPartitions`, as computed by `Utils.nonNegativeMod`.
 *
 * @param partitions number of partitions; must not be negative
 */
class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  override def numPartitions: Int = partitions

  override def getPartition(key: Any): Int =
    if (key == null) 0
    else Utils.nonNegativeMod(key.hashCode, numPartitions)

  // Placement depends only on the partition count, so two hash partitioners
  // distribute keys identically exactly when their counts match.
  override def equals(other: Any): Boolean = other match {
    case that: HashPartitioner => numPartitions == that.numPartitions
    case _                     => false
  }

  override def hashCode: Int = numPartitions
}

```

### Range Partitioner

Partitioner for keys that have an ordering: it samples the RDD to split the key space into contiguous ranges of roughly equal size, one range per partition.

```scala { .api }

/**
 * Partitioner that partitions sortable records by range into roughly equal ranges.
 * The ranges are determined by sampling the content of the RDD passed in.
 *
 * @param partitions number of partitions
 * @param rdd RDD to sample for determining ranges
 * @param ascending `true` if partition 0 holds the smallest keys, `false` if it holds the largest
 * @param samplePointsPerPartitionHint presumably the number of sample points drawn per partition
 *                                     when computing the range bounds (the sampling code is
 *                                     elided in this summary) -- TODO confirm
 */
class RangePartitioner[K: Ordering: ClassTag, V](
    partitions: Int,
    rdd: RDD[_ <: Product2[K, V]],
    private var ascending: Boolean = true,
    val samplePointsPerPartitionHint: Int = 20
) extends Partitioner {

  // Give the context-bound Ordering a name usable in the method bodies below.
  private val ordering: Ordering[K] = implicitly[Ordering[K]]

  override def numPartitions: Int = partitions

  override def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[K]
    // Read the bounds once instead of re-evaluating `rangeBounds` for every comparison.
    val bounds = rangeBounds
    val partition =
      if (bounds.length <= 128) {
        // With 128 bounds or fewer a naive linear scan is faster than binary search.
        var p = 0
        while (p < bounds.length && ordering.gt(k, bounds(p))) {
          p += 1
        }
        p
      } else {
        val found = binarySearch(bounds, k)
        // A miss is encoded as -(insertionPoint) - 1; recover the insertion point.
        val p = if (found < 0) -found - 1 else found
        math.min(p, bounds.length)
      }
    // In descending mode the partition order is mirrored.
    if (ascending) partition else bounds.length - partition
  }

  override def equals(other: Any): Boolean = other match {
    case r: RangePartitioner[_, _] =>
      r.rangeBounds.sameElements(rangeBounds) && r.ascending == ascending
    case _ =>
      false
  }

  override def hashCode(): Int = {
    val prime = 31
    val boundsHash = rangeBounds.foldLeft(1)((acc, bound) => prime * acc + bound.hashCode)
    prime * boundsHash + ascending.hashCode
  }

  /**
   * Sorted, inclusive upper bounds of every partition except the last. A key greater
   * than every bound goes to the last partition. The sampling-based computation of
   * these bounds is elided in this summary.
   */
  def rangeBounds: Array[K] = ???

  /**
   * Binary search for `key` in the sorted `bounds` using `ordering`.
   * Returns the index of `key` if present, otherwise `-(insertionPoint) - 1`
   * (the same convention as `java.util.Arrays.binarySearch`).
   */
  private def binarySearch(bounds: Array[K], key: K): Int = {
    @scala.annotation.tailrec
    def loop(lo: Int, hi: Int): Int =
      if (lo > hi) -(lo + 1)
      else {
        val mid = (lo + hi) >>> 1
        val cmp = ordering.compare(bounds(mid), key)
        if (cmp < 0) loop(mid + 1, hi)
        else if (cmp > 0) loop(lo, mid - 1)
        else mid
      }
    loop(0, bounds.length - 1)
  }
}

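```

### Custom Partitioners

Create domain-specific partitioners for specialized data distribution patterns. A custom partitioner must return a partition index in `[0, numPartitions)` for every key it receives (including `null`). It should also override `equals` and `hashCode` so that Spark can recognize RDDs that share the same partitioning.

```scala { .api }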

/**
 * Example: partitioner for geographic data.
 *
 * Each entry of `regions` gets its own partition, in array order. If a name appears more
 * than once, its last position wins. String keys are mapped to a region name by
 * `extractRegion`. Keys whose region is not listed, and non-String (including `null`) keys,
 * fall back to partition 0.
 *
 * @param regions region names, one per partition; must be non-empty
 */
class GeographicPartitioner(private val regions: Array[String]) extends Partitioner {
  require(regions.nonEmpty, "GeographicPartitioner needs at least one region")

  private val regionToIndex: Map[String, Int] = regions.zipWithIndex.toMap

  override def numPartitions: Int = regions.length

  override def getPartition(key: Any): Int = key match {
    case location: String => regionToIndex.getOrElse(extractRegion(location), 0)
    case _                => 0
  }

  // Custom logic to determine the region from a location string.
  // NOTE(review): naive substring matching, e.g. "RUSSIA" contains "US"; real code
  // should parse locations properly.
  private def extractRegion(location: String): String =
    if (location.contains("US")) "North America"
    else if (location.contains("EU")) "Europe"
    else if (location.contains("AS")) "Asia"
    else "Other"

  // Equal region lists place keys identically, which lets Spark recognise co-partitioned RDDs.
  override def equals(other: Any): Boolean = other match {
    case that: GeographicPartitioner => regions.sameElements(that.regions)
    case _                           => false
  }

  override def hashCode: Int = scala.util.hashing.MurmurHash3.arrayHash(regions)
}

/**
 * Example: partitioner for time-series data.
 *
 * Partition `i` holds the `Long` timestamps in the half-open interval
 * `[timeRanges(i)._1, timeRanges(i)._2)`. When intervals overlap, the first matching one
 * wins. Timestamps outside every interval, and non-`Long` (including `null`) keys, go to
 * partition 0.
 *
 * @param timeRanges `(start, end)` pairs, one per partition; must be non-empty
 */
class TimePartitioner(private val timeRanges: Array[(Long, Long)]) extends Partitioner {
  require(timeRanges.nonEmpty, "TimePartitioner needs at least one time range")

  override def numPartitions: Int = timeRanges.length

  override def getPartition(key: Any): Int = key match {
    case timestamp: Long =>
      // indexWhere scans in place instead of allocating a zipped copy for every key.
      val idx = timeRanges.indexWhere { case (start, end) =>
        timestamp >= start && timestamp < end
      }
      if (idx >= 0) idx else 0
    case _ => 0
  }

  // Equal range lists place keys identically, which lets Spark recognise co-partitioned RDDs.
  override def equals(other: Any): Boolean = other match {
    case that: TimePartitioner => timeRanges.sameElements(that.timeRanges)
    case _                     => false
  }

  override def hashCode: Int = scala.util.hashing.MurmurHash3.arrayHash(timeRanges)
}

/**
 * Example: partitioner based on key prefix.
 *
 * A String key goes to the partition of the first prefix, in array order, that it starts
 * with. When prefixes overlap (e.g. "a" and "ab"), list the more specific one first.
 * Strings matching no prefix, and non-String (including `null`) keys, go to partition 0.
 *
 * @param prefixes one prefix per partition; must be non-empty
 */
class PrefixPartitioner(private val prefixes: Array[String]) extends Partitioner {
  require(prefixes.nonEmpty, "PrefixPartitioner needs at least one prefix")

  override def numPartitions: Int = prefixes.length

  override def getPartition(key: Any): Int = key match {
    case str: String =>
      // indexWhere scans in place instead of allocating a zipped copy for every key.
      val idx = prefixes.indexWhere(prefix => str.startsWith(prefix))
      if (idx >= 0) idx else 0
    case _ => 0
  }

  // Equal prefix lists place keys identically, which lets Spark recognise co-partitioned RDDs.
  override def equals(other: Any): Boolean = other match {
    case that: PrefixPartitioner => prefixes.sameElements(that.prefixes)
    case _                       => false
  }

  override def hashCode: Int = scala.util.hashing.MurmurHash3.arrayHash(prefixes)
}

```

### Partitioning RDD Operations

Methods for querying and controlling how an RDD is partitioned.

```scala { .api }

abstract class RDD[T: ClassTag] {
  /** The partitioner this RDD is partitioned by, if any; `None` if it is not partitioned by key. */
  def partitioner: Option[Partitioner]

  /** The partitions that make up this RDD. */
  def partitions: Array[Partition]

  /**
   * Returns an RDD with exactly `numPartitions` partitions.
   * This always shuffles, so it can increase or decrease parallelism. Elements are spread
   * across the new partitions without regard to keys, so the result has no partitioner.
   * To only reduce the partition count, prefer `coalesce`, which can avoid the shuffle.
   */
  def repartition(numPartitions: Int): RDD[T]

  /**
   * Returns an RDD reduced to `numPartitions` partitions.
   * With `shuffle = false` (the default), existing partitions are merged without a shuffle.
   * Requesting more partitions than currently exist then leaves the count unchanged.
   * With `shuffle = true`, the data is redistributed, which also allows increasing the
   * partition count.
   */
  def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T]

  /** Preferred locations (host names) for computing the given partition. */
  def preferredLocations(split: Partition): Seq[String]
}

class PairRDDFunctions[K, V](self: RDD[(K, V)]) {
  /**
   * Returns a copy of this RDD partitioned by `partitioner`.
   * If the RDD is already partitioned by an equal partitioner, it is returned without a shuffle.
   */
  def partitionBy(partitioner: Partitioner): RDD[(K, V)]

  /**
   * Groups the values for each key into a single sequence; the result is partitioned by
   * `partitioner`. The order of values within each group is not guaranteed.
   */
  def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]

  /**
   * Merges the values for each key with the associative and commutative `func`.
   * Values are combined locally on each map task before the shuffle, and the result is
   * partitioned by `partitioner`.
   */
  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]

  /**
   * Inner join: one `(k, (v, w))` pair for every key present in both this RDD and `other`.
   * The result is partitioned by `partitioner`.
   */
  def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]

  /**
   * Sorts by key, in ascending or descending order, into `numPartitions` partitions.
   * Each partition holds a contiguous range of keys, i.e. the result is range-partitioned;
   * no custom partitioner is accepted.
   * NOTE: in Spark this method is defined on `OrderedRDDFunctions`, which is available for
   * pair RDDs whose key type has an `Ordering`.
   */
  def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
               (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[(K, V)]
}

```

### Partition Information

Types that describe the individual partitions of an RDD.

```scala { .api }

/**
 * Identifier for a partition in an RDD.
 */
trait Partition extends Serializable {
  /** Index of this partition within its parent RDD. */
  def index: Int

  /** Cheap default hash code: the partition index. */
  override def hashCode(): Int = index

  /**
   * Identity equality.
   *
   * Partitions of different RDDs may share an index, so index-based equality would make
   * them equal. Subclasses such as `HadoopPartition` mix the RDD id into `hashCode`, and
   * index-based equality would then break the equals/hashCode contract. Identity equality
   * is always consistent with any hash code.
   */
  override def equals(other: Any): Boolean = super.equals(other)
}

/**
 * Partition for HadoopRDD, wrapping one Hadoop `InputSplit`.
 *
 * @param rddId id of the RDD this partition belongs to (mixed into the hash code)
 * @param index index of this partition within that RDD
 * @param inputSplit the Hadoop input split this partition was created from
 */
class HadoopPartition(rddId: Int, override val index: Int, val inputSplit: InputSplit)
    extends Partition {

  override def hashCode(): Int = 41 * (41 + rddId) + index

  // Identity equality keeps equals consistent with the rddId-based hashCode above.
  // An index-only comparison would equate partitions of different RDDs that hash differently.
  override def equals(other: Any): Boolean = other match {
    case that: AnyRef => this eq that
    case _            => false
  }
}

/**
 * Partition created from a range of a parallelized collection.
 *
 * @param index  position of this partition within its RDD
 * @param start  start of the range this partition covers (TODO confirm: inclusive?)
 * @param end    end of the range this partition covers (TODO confirm: exclusive?)
 * @param values the elements held by this partition
 */
case class ParallelCollectionPartition[T: ClassTag](
    override val index: Int,
    start: Int,
    end: Int,
    values: Seq[T])
  extends Partition

```

**Usage Examples:**

```scala

import org.apache.spark.{SparkContext, SparkConf, HashPartitioner, Partitioner, RangePartitioner}

// Custom partitioner: even Int keys go to partition 0, odd Int keys to partition 1,
// and any other key (including null) to partition 0.
class EvenOddPartitioner extends Partitioner {
  override def numPartitions: Int = 2

  override def getPartition(key: Any): Int = key match {
    case i: Int => if (i % 2 == 0) 0 else 1
    case _      => 0
  }

  // Every instance places keys identically, so all instances are equal. This lets Spark
  // recognise RDDs partitioned by different instances as co-partitioned.
  override def equals(other: Any): Boolean = other match {
    case _: EvenOddPartitioner => true
    case _                     => false
  }

  override def hashCode: Int = numPartitions
}

val sc = new SparkContext(new SparkConf().setAppName("Partitioning Example"))

try {
  // Create pair RDD
  val data = sc.parallelize(Array(
    ("apple", 1), ("banana", 2), ("cherry", 3), ("apple", 4),
    ("banana", 5), ("date", 6), ("elderberry", 7), ("apple", 8)
  ))

  println(s"Default partitions: ${data.partitions.length}")
  println(s"Default partitioner: ${data.partitioner}")

  // Hash partitioning
  val hashPartitioned = data.partitionBy(new HashPartitioner(4))
  println(s"Hash partitioner: ${hashPartitioned.partitioner}")

  // groupByKey reuses the existing partitioner, so no further shuffle is needed
  val grouped = hashPartitioned.groupByKey()
  println(s"Grouped partitioner: ${grouped.partitioner}")

  // Range partitioning for sortable keys
  val numbers = sc.parallelize(Array(
    (1, "one"), (5, "five"), (3, "three"), (9, "nine"),
    (2, "two"), (7, "seven"), (4, "four"), (8, "eight")
  ))

  val rangePartitioned = numbers.partitionBy(new RangePartitioner(3, numbers))
  println(s"Range partitioner: ${rangePartitioned.partitioner}")

  // Custom partitioner
  val evenOddPartitioned = numbers.partitionBy(new EvenOddPartitioner())
  println("Even/Odd partitioning:")
  evenOddPartitioned.glom().collect().zipWithIndex.foreach { case (partition, index) =>
    println(s"Partition $index: ${partition.mkString(", ")}")
  }

  // Coalescing partitions (merges partitions without a shuffle)
  val manyPartitions = sc.parallelize(1 to 100, 20)
  println(s"Many partitions: ${manyPartitions.partitions.length}")

  val coalesced = manyPartitions.coalesce(5)
  println(s"Coalesced partitions: ${coalesced.partitions.length}")

  // Repartitioning (always shuffles)
  val repartitioned = manyPartitions.repartition(8)
  println(s"Repartitioned: ${repartitioned.partitions.length}")

  // Partition-aware operations
  val partitionedData = sc.parallelize(Array(
    ("user1", "data1"), ("user2", "data2"), ("user1", "data3"), ("user3", "data4")
  ), 2).partitionBy(new HashPartitioner(2))

  // Both RDDs use equal partitioners (HashPartitioner(2)). The partitionBy calls already
  // shuffled them, so the join itself needs no additional shuffle.
  val otherData = sc.parallelize(Array(
    ("user1", "profile1"), ("user2", "profile2"), ("user3", "profile3")
  )).partitionBy(new HashPartitioner(2))

  val joined = partitionedData.join(otherData) // no additional shuffle
  println("Joined data:")
  joined.collect().foreach(println)
} finally {
  // Always release cluster resources, even if an operation above fails.
  sc.stop()
}

```

**Java Examples:**

```java

import org.apache.spark.HashPartitioner;

355

import org.apache.spark.Partitioner;

356

import org.apache.spark.SparkConf;

357

import org.apache.spark.api.java.JavaPairRDD;

358

import org.apache.spark.api.java.JavaSparkContext;

359

import scala.Tuple2;

360

361

import java.util.Arrays;

362

import java.util.List;

363

364

JavaSparkContext sc = new JavaSparkContext(

365

new SparkConf().setAppName("Java Partitioning Example")

366

);

367

368

// Create pair RDD

369

List<Tuple2<String, Integer>> data = Arrays.asList(

370

new Tuple2<>("apple", 1),

371

new Tuple2<>("banana", 2),

372

new Tuple2<>("apple", 3)

373

);

374

JavaPairRDD<String, Integer> pairRDD = sc.parallelizePairs(data);

375

376

// Hash partitioning

377

JavaPairRDD<String, Integer> partitioned = pairRDD.partitionBy(new HashPartitioner(2));

378

379

// Verify partitioning

380

System.out.println("Partitioner: " + partitioned.partitioner());

381

382

// Custom partitioner in Java

383

class CustomPartitioner extends Partitioner {

384

@Override

385

public int numPartitions() {

386

return 2;

387

}

388

389

@Override

390

public int getPartition(Object key) {

391

return key.toString().length() % 2;

392

}

393

}

394

395

JavaPairRDD<String, Integer> customPartitioned = pairRDD.partitionBy(new CustomPartitioner());

396

397

sc.close();

```

## Performance Considerations

### Choosing the Right Partitioner

- **HashPartitioner**: Good default choice for most use cases
- **RangePartitioner**: Best when keys have a natural ordering, e.g. for sorted output or range queries
- **Custom Partitioners**: Use for domain-specific data distribution

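### Partitioning Benefits

Partitioning pays off when a partitioned RDD is reused. `partitionBy` itself shuffles the data once; after that, joins and aggregations on RDDs that share the same partitioner need no further shuffle.

```scala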

// Without a shared partitioner: the join must shuffle both inputs
val rdd1 = sc.parallelize(data1)
val rdd2 = sc.parallelize(data2)
val joinedWithShuffle = rdd1.join(rdd2) // both sides are shuffled for this join

// With a shared partitioner: partitionBy shuffles each RDD once. Persisting the result
// lets this join (and any later ones) reuse that layout instead of shuffling again.
val partitioned1 = rdd1.partitionBy(new HashPartitioner(4)).persist()
val partitioned2 = rdd2.partitionBy(new HashPartitioner(4)).persist()
val joinedCoPartitioned = partitioned1.join(partitioned2) // no additional shuffle

```

### Best Practices

- Use the same partitioner (same type and number of partitions) for RDDs that are joined frequently
- Consider data distribution when choosing partition count
- Partition early in your processing pipeline
- Persist (cache) partitioned RDDs that are reused; otherwise the partitioning shuffle is repeated whenever they are recomputed
- Monitor partition sizes to avoid skew

### Avoiding Common Pitfalls

```scala

// Bad: map() drops the partitioner, even when it leaves the keys unchanged
val partitioned = rdd.partitionBy(new HashPartitioner(4))
val viaMap = partitioned.map { case (k, v) => (k, v.toUpperCase) } // partitioner is None

// Good: mapValues() cannot change keys, so it keeps the partitioner (same result as above)
val viaMapValues = partitioned.mapValues(_.toUpperCase) // partitioner preserved

// Bad: a partitioner that sends most keys to a few partitions causes skew
// (BadPartitioner is a placeholder for such a partitioner)
val skewed = rdd.partitionBy(new BadPartitioner())

// Good: hash partitioning spreads keys roughly evenly when no single key dominates
// (numPartitions is a placeholder; size it to your data and cluster)
val balanced = rdd.partitionBy(new HashPartitioner(numPartitions))

```

Effective partitioning is crucial for Spark performance: it reduces network overhead and enables efficient distributed operations across your cluster.