0
# Partitioning
1
2
Data partitioning strategies for controlling how RDD elements are distributed across cluster nodes to optimize performance and minimize data shuffling in distributed operations.
3
4
## Capabilities
5
6
### Partitioner Base Class
7
8
Abstract base class defining how keys are distributed across partitions in key-value RDDs.
9
10
```scala { .api }
11
/**
 * Object that defines how keys are distributed across partitions.
 *
 * The same key must always map to the same partition index.
 */
abstract class Partitioner extends Serializable {
  /** Number of partitions */
  def numPartitions: Int

  /** Get partition index for given key; the result must lie in [0, numPartitions) */
  def getPartition(key: Any): Int

  /**
   * Whether `other` is an equivalent partitioner, i.e. one that sends every key to the
   * same partition as this one. Spark compares partitioners to decide whether two RDDs
   * are already co-partitioned.
   */
  def equals(other: Any): Boolean

  /** Hash code for this partitioner; must be consistent with `equals` */
  def hashCode(): Int
}
27
```
28
29
### Hash Partitioner
30
31
Spark's default partitioner: it assigns each key to a partition by taking the key's `hashCode` modulo the number of partitions (`null` keys go to partition 0).
32
33
```scala { .api }
34
/**
 * Partitioner that partitions using the Java object `hashCode` of each key.
 *
 * `null` keys are always assigned to partition 0. Two HashPartitioners are equal
 * when they have the same number of partitions.
 *
 * @param partitions number of partitions; must not be negative
 */
class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  override def numPartitions: Int = partitions

  override def getPartition(key: Any): Int = key match {
    case null => 0
    // hashCode may be negative; Utils.nonNegativeMod (defined outside this listing)
    // is used instead of a plain % -- presumably to keep the index non-negative.
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner => h.numPartitions == numPartitions
    case _ => false
  }

  override def hashCode: Int = numPartitions
}
55
```
56
57
### Range Partitioner
58
59
Partitioner that distributes keys roughly evenly across partitions based on key ranges.
60
61
```scala { .api }
62
/**
 * Partitioner that partitions sortable records by range into roughly equal ranges.
 *
 * NOTE: `ordering` and `binarySearch` are used below but are not defined in this
 * listing; they belong to the full implementation. `rangeBounds` is a stub here.
 *
 * @param partitions requested number of partitions. In Spark the effective count is
 *                   derived from the computed range bounds and can be smaller than
 *                   requested when few distinct keys are sampled.
 * @param rdd RDD to sample for determining ranges
 * @param ascending whether to sort keys in ascending order
 * @param samplePointsPerPartitionHint sampling hint, default 20 (the sampling logic is
 *                                     not shown in this listing)
 */
class RangePartitioner[K: Ordering: ClassTag, V](
    partitions: Int,
    rdd: RDD[_ <: Product2[K, V]],
    private var ascending: Boolean = true,
    val samplePointsPerPartitionHint: Int = 20
) extends Partitioner {

  override def numPartitions: Int = partitions

  /** Index of the range that `key` falls into; mirrored when `ascending` is false. */
  override def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[K]
    var partition = 0
    if (rangeBounds.length <= 128) {
      // With at most 128 range bounds a linear scan is faster than binary search
      while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
        partition += 1
      }
    } else {
      // Use binary search for larger partition counts
      partition = binarySearch(rangeBounds, k)
      // A negative result encodes the insertion point as -(insertionPoint) - 1
      if (partition < 0) {
        partition = -partition - 1
      }
      if (partition > rangeBounds.length) {
        partition = rangeBounds.length
      }
    }

    if (ascending) {
      partition
    } else {
      rangeBounds.length - partition
    }
  }

  /** Equal when both the range bounds and the sort direction match. */
  override def equals(other: Any): Boolean = other match {
    case r: RangePartitioner[_, _] =>
      r.rangeBounds.sameElements(rangeBounds) && r.ascending == ascending
    case _ =>
      false
  }

  /** Combines the hash of every range bound with the sort direction. */
  override def hashCode(): Int = {
    val prime = 31
    var result = 1
    var i = 0
    while (i < rangeBounds.length) {
      result = prime * result + rangeBounds(i).hashCode
      i += 1
    }
    result = prime * result + ascending.hashCode
    result
  }

  /** Range bounds array (stubbed in this listing) */
  def rangeBounds: Array[K] = ???
}
125
```
126
127
### Custom Partitioners
128
129
Create domain-specific partitioners for specialized data distribution patterns.
130
131
```scala { .api }
132
/**
 * Example: Partitioner for geographic data.
 *
 * String keys are mapped to a region name by `extractRegion`, and each region in
 * `regions` gets the partition matching its array index. Non-String keys and
 * regions missing from `regions` fall back to partition 0.
 *
 * @param regions region names; the index of a name is its partition number
 */
class GeographicPartitioner(regions: Array[String]) extends Partitioner {
  private val regionToIndex = regions.zipWithIndex.toMap

  override def numPartitions: Int = regions.length

  override def getPartition(key: Any): Int = key match {
    case location: String =>
      // Extract region from location string
      val region = extractRegion(location)
      regionToIndex.getOrElse(region, 0)
    case _ => 0
  }

  // Without equals/hashCode two partitioners built from the same regions would
  // compare as different, so Spark could not treat their RDDs as co-partitioned.
  override def equals(other: Any): Boolean = other match {
    case that: GeographicPartitioner =>
      that.numPartitions == numPartitions && that.regionToIndex == regionToIndex
    case _ => false
  }

  override def hashCode: Int = regionToIndex.hashCode

  /** Maps a location string to a region name by substring match; first match wins. */
  private def extractRegion(location: String): String = {
    // Custom logic to determine region from location
    if (location.contains("US")) "North America"
    else if (location.contains("EU")) "Europe"
    else if (location.contains("AS")) "Asia"
    else "Other"
  }
}
156
157
/**
 * Example: Partitioner for time-series data.
 *
 * A Long key goes to the first range whose half-open interval [start, end) contains
 * it. Keys that match no range, and non-Long keys, fall back to partition 0.
 *
 * @param timeRanges (start, end) pairs; the index of a pair is its partition number
 */
class TimePartitioner(timeRanges: Array[(Long, Long)]) extends Partitioner {
  // Immutable snapshot: gives structural equality for equals/hashCode and is not
  // affected if the caller's array is mutated later.
  private val ranges: Vector[(Long, Long)] = timeRanges.toVector

  override def numPartitions: Int = ranges.length

  override def getPartition(key: Any): Int = key match {
    case timestamp: Long =>
      // indexWhere avoids building a zipped copy of the ranges on every call;
      // it returns -1 when nothing matches, which max() turns into partition 0.
      val idx = ranges.indexWhere { case (start, end) =>
        timestamp >= start && timestamp < end
      }
      math.max(idx, 0)
    case _ => 0
  }

  // Lets Spark recognise two partitioners over the same ranges as equivalent.
  override def equals(other: Any): Boolean = other match {
    case that: TimePartitioner => that.ranges == ranges
    case _ => false
  }

  override def hashCode: Int = ranges.hashCode
}
171
172
/**
 * Example: Partitioner based on key prefix.
 *
 * A String key goes to the partition of the first prefix it starts with. Keys that
 * match no prefix, and non-String keys, fall back to partition 0.
 *
 * @param prefixes prefixes to match; the index of a prefix is its partition number
 */
class PrefixPartitioner(prefixes: Array[String]) extends Partitioner {
  // Immutable snapshot: gives structural equality for equals/hashCode and is not
  // affected if the caller's array is mutated later.
  private val knownPrefixes: Vector[String] = prefixes.toVector

  override def numPartitions: Int = knownPrefixes.length

  override def getPartition(key: Any): Int = key match {
    case str: String =>
      // indexWhere avoids building a zipped copy of the prefixes on every call;
      // it returns -1 when nothing matches, which max() turns into partition 0.
      math.max(knownPrefixes.indexWhere(prefix => str.startsWith(prefix)), 0)
    case _ => 0
  }

  // Lets Spark recognise two partitioners over the same prefixes as equivalent.
  override def equals(other: Any): Boolean = other match {
    case that: PrefixPartitioner => that.knownPrefixes == knownPrefixes
    case _ => false
  }

  override def hashCode: Int = knownPrefixes.hashCode
}
187
```
188
189
### Partitioning RDD Operations
190
191
Methods for controlling and querying RDD partitioning.
192
193
```scala { .api }
194
abstract class RDD[T: ClassTag] {
  /** Get the partitioner if this RDD has one */
  def partitioner: Option[Partitioner]

  /** Get array of partitions */
  def partitions: Array[Partition]

  /**
   * Return a new RDD with exactly `numPartitions` partitions. Always performs a
   * shuffle; in Spark this is `coalesce(numPartitions, shuffle = true)`.
   */
  def repartition(numPartitions: Int): RDD[T]

  /**
   * Reduce the RDD to `numPartitions` partitions. With the default `shuffle = false`
   * no shuffle is performed; pass `shuffle = true` to redistribute the data with a
   * shuffle.
   */
  def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T]

  /** Get preferred locations for the given partition */
  def preferredLocations(split: Partition): Seq[String]
}
210
211
class PairRDDFunctions[K, V](self: RDD[(K, V)]) {
  /** Partition RDD using specified partitioner */
  def partitionBy(partitioner: Partitioner): RDD[(K, V)]

  /** Group by key with custom partitioner */
  def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]

  /** Reduce by key with custom partitioner */
  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]

  /** Join with custom partitioner */
  def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]

  /**
   * Sort by key into `numPartitions` partitions. This takes a partition count and an
   * implicit key ordering, not a custom partitioner. In Spark this method is provided
   * by `OrderedRDDFunctions` and range-partitions the data internally.
   */
  def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
               (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[(K, V)]
}
228
```
229
230
### Partition Information
231
232
Classes providing metadata about RDD partitions.
233
234
```scala { .api }
235
/**
 * Identifier for a partition in an RDD.
 *
 * Equality and hashing are both based solely on `index`.
 */
trait Partition extends Serializable {
  /** Partition index within its parent RDD */
  def index: Int

  /** Hash code based on index */
  override def hashCode(): Int = index

  /** Equality based on index */
  override def equals(other: Any): Boolean = other match {
    case that: Partition => this.index == that.index
    case _ => false
  }
}
251
252
/**
 * Partition for HadoopRDD.
 *
 * @param rddId id of the owning RDD; mixed into the hash code
 * @param index partition index within the RDD
 * @param s the Hadoop input split backing this partition. Named `s` so that it does
 *          not clash with the `inputSplit` member below.
 */
class HadoopPartition(rddId: Int, override val index: Int, s: InputSplit)
  extends Partition {

  /** The input split for this partition (stubbed in this listing) */
  def inputSplit: InputSplit = ???

  // NOTE(review): equals is inherited from Partition and compares index only, while
  // this hash also mixes in rddId, so equal partitions can hash differently.
  override def hashCode(): Int = 41 * (41 + rddId) + index
}
262
263
/**
 * Partition created from a range.
 *
 * @param index partition index within the RDD
 * @param start start of the range (bound semantics are not shown in this listing)
 * @param end end of the range (bound semantics are not shown in this listing)
 * @param values the elements held by this partition
 */
case class ParallelCollectionPartition[T: ClassTag](
    override val index: Int,
    start: Int,
    end: Int,
    values: Seq[T]
) extends Partition
272
```
273
274
**Usage Examples:**
275
276
```scala
277
// Partitioner is imported as well: EvenOddPartitioner below extends it.
import org.apache.spark.{HashPartitioner, Partitioner, RangePartitioner, SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("Partitioning Example"))

// Create pair RDD
val data = sc.parallelize(Array(
  ("apple", 1), ("banana", 2), ("cherry", 3), ("apple", 4),
  ("banana", 5), ("date", 6), ("elderberry", 7), ("apple", 8)
))

println(s"Default partitions: ${data.partitions.length}")
println(s"Default partitioner: ${data.partitioner}")

// Hash partitioning
val hashPartitioned = data.partitionBy(new HashPartitioner(4))
println(s"Hash partitioner: ${hashPartitioned.partitioner}")

// Verify partitioning is preserved through transformations
val grouped = hashPartitioned.groupByKey() // No shuffle needed!
println(s"Grouped partitioner: ${grouped.partitioner}")

// Range partitioning for sortable keys
val numbers = sc.parallelize(Array(
  (1, "one"), (5, "five"), (3, "three"), (9, "nine"),
  (2, "two"), (7, "seven"), (4, "four"), (8, "eight")
))

val rangePartitioned = numbers.partitionBy(new RangePartitioner(3, numbers))
println(s"Range partitioner: ${rangePartitioned.partitioner}")

// Custom partitioner example: even Int keys -> partition 0, odd -> partition 1,
// anything that is not an Int -> partition 0
class EvenOddPartitioner extends Partitioner {
  override def numPartitions: Int = 2

  override def getPartition(key: Any): Int = key match {
    case i: Int => if (i % 2 == 0) 0 else 1
    case _ => 0
  }

  // All instances behave identically, so they are all equal; this lets Spark
  // recognise RDDs partitioned by separate instances as co-partitioned.
  override def equals(other: Any): Boolean = other.isInstanceOf[EvenOddPartitioner]

  override def hashCode: Int = numPartitions
}

val evenOddPartitioned = numbers.partitionBy(new EvenOddPartitioner())
println("Even/Odd partitioning:")
evenOddPartitioned.glom().collect().zipWithIndex.foreach { case (partition, index) =>
  println(s"Partition $index: ${partition.mkString(", ")}")
}

// Coalescing partitions
val manyPartitions = sc.parallelize(1 to 100, 20)
println(s"Many partitions: ${manyPartitions.partitions.length}")

val coalesced = manyPartitions.coalesce(5)
println(s"Coalesced partitions: ${coalesced.partitions.length}")

// Repartitioning
val repartitioned = manyPartitions.repartition(8)
println(s"Repartitioned: ${repartitioned.partitions.length}")

// Partition-aware operations
val partitionedData = sc.parallelize(Array(
  ("user1", "data1"), ("user2", "data2"), ("user1", "data3"), ("user3", "data4")
), 2).partitionBy(new HashPartitioner(2))

// This join will not cause a shuffle since both RDDs use same partitioner
val otherData = sc.parallelize(Array(
  ("user1", "profile1"), ("user2", "profile2"), ("user3", "profile3")
)).partitionBy(new HashPartitioner(2))

val joined = partitionedData.join(otherData) // No shuffle!
println("Joined data:")
joined.collect().foreach(println)

sc.stop()
349
```
350
351
**Java Examples:**
352
353
```java
354
import org.apache.spark.HashPartitioner;
355
import org.apache.spark.Partitioner;
356
import org.apache.spark.SparkConf;
357
import org.apache.spark.api.java.JavaPairRDD;
358
import org.apache.spark.api.java.JavaSparkContext;
359
import scala.Tuple2;
360
361
import java.util.Arrays;
362
import java.util.List;
363
364
JavaSparkContext sc = new JavaSparkContext(
365
new SparkConf().setAppName("Java Partitioning Example")
366
);
367
368
// Create pair RDD
369
List<Tuple2<String, Integer>> data = Arrays.asList(
370
new Tuple2<>("apple", 1),
371
new Tuple2<>("banana", 2),
372
new Tuple2<>("apple", 3)
373
);
374
JavaPairRDD<String, Integer> pairRDD = sc.parallelizePairs(data);
375
376
// Hash partitioning
377
JavaPairRDD<String, Integer> partitioned = pairRDD.partitionBy(new HashPartitioner(2));
378
379
// Verify partitioning
380
System.out.println("Partitioner: " + partitioned.partitioner());
381
382
// Custom partitioner in Java
383
class CustomPartitioner extends Partitioner {
384
@Override
385
public int numPartitions() {
386
return 2;
387
}
388
389
@Override
390
public int getPartition(Object key) {
391
return key.toString().length() % 2;
392
}
393
}
394
395
JavaPairRDD<String, Integer> customPartitioned = pairRDD.partitionBy(new CustomPartitioner());
396
397
sc.close();
398
```
399
400
## Performance Considerations
401
402
### Choosing the Right Partitioner
403
404
- **HashPartitioner**: Good default choice for most use cases
405
- **RangePartitioner**: Best for sorted data and range queries
406
- **Custom Partitioners**: Use for domain-specific data distribution
407
408
### Partitioning Benefits
409
410
```scala
411
// Without partitioning - the join has to shuffle its inputs
val rdd1 = sc.parallelize(data1)
val rdd2 = sc.parallelize(data2)
val joined = rdd1.join(rdd2) // Shuffle occurs

// With co-partitioning - each partitionBy shuffles its RDD once up front,
// after which the join itself needs no further shuffle
val partitioned1 = rdd1.partitionBy(new HashPartitioner(4))
val partitioned2 = rdd2.partitionBy(new HashPartitioner(4))
// Named differently from `joined` above so the snippet compiles as one unit
val joinedCoPartitioned = partitioned1.join(partitioned2) // No shuffle in the join!
420
```
421
422
### Best Practices
423
424
- Use same partitioner for RDDs that will be joined frequently
425
- Consider data distribution when choosing partition count
426
- Partition early in your processing pipeline
427
- Cache (`persist`) partitioned RDDs that are reused; otherwise the `partitionBy` shuffle is recomputed each time the RDD is used
428
- Monitor partition sizes to avoid skew
429
430
### Avoiding Common Pitfalls
431
432
```scala
433
// Bad: Losing partitioning
val partitioned = rdd.partitionBy(new HashPartitioner(4))
// map may change the keys, so Spark drops the partitioner from the result
val mappedKeys = partitioned.map(x => (x._1.toUpperCase, x._2)) // Loses partitioning!

// Good: Preserving partitioning
// mapValues cannot touch the keys, so the partitioner is kept
val mappedValues = partitioned.mapValues(_.toUpperCase) // Preserves partitioning

// Bad: Uneven partitions
val skewed = rdd.partitionBy(new BadPartitioner()) // Some partitions much larger

// Good: Balanced partitions
val balanced = rdd.partitionBy(new HashPartitioner(numPartitions))
445
```
446
447
Effective partitioning is crucial for Spark performance, reducing network overhead and enabling efficient distributed operations across your cluster.