0
# Partitioning and Distribution
1
2
Control over data distribution and partitioning strategies across the cluster. These operations determine how data is distributed among parallel processing instances.
3
4
## Capabilities
5
6
### Hash Partitioning
7
8
Distribute data using hash-based partitioning so that all records with equal key values are routed to the same partition. (How evenly partitions fill depends on the key distribution.)
9
10
```scala { .api }
11
class DataSet[T] {
12
/**
13
* Partitions data by hash of specified fields
14
* @param fields Field positions to use for hashing
15
* @return DataSet with hash partitioning
16
*/
17
def partitionByHash(fields: Int*): DataSet[T]
18
19
/**
20
* Partitions data by hash of named fields
21
* @param firstField First field name
22
* @param otherFields Additional field names
23
* @return DataSet with hash partitioning
24
*/
25
def partitionByHash(firstField: String, otherFields: String*): DataSet[T]
26
27
/**
28
* Partitions data by hash of key selector result
29
* @param fun Key selector function
30
* @return DataSet with hash partitioning based on key
31
*/
32
def partitionByHash[K: TypeInformation](fun: T => K): DataSet[T]
33
}
34
```
35
36
**Usage Examples:**
37
38
```scala
39
import org.apache.flink.api.scala._
40
41
case class Customer(id: Int, name: String, region: String)
42
43
val env = ExecutionEnvironment.getExecutionEnvironment
44
val customers = env.fromElements(
45
Customer(1, "Alice", "North"),
46
Customer(2, "Bob", "South"),
47
Customer(3, "Charlie", "North"),
48
Customer(4, "Diana", "West")
49
)
50
51
// Partition by customer ID field
52
val partitionedById = customers.partitionByHash("id")
53
54
// Partition by multiple fields
55
val partitionedByRegionAndId = customers.partitionByHash("region", "id")
56
57
// Partition using key selector function
58
val partitionedByRegion = customers.partitionByHash(_.region)
59
```
60
61
### Range Partitioning
62
63
Distribute data using range-based partitioning for ordered distribution.
64
65
```scala { .api }
66
class DataSet[T] {
67
/**
68
* Partitions data by range of specified fields
69
* @param fields Field positions for range partitioning
70
* @return DataSet with range partitioning
71
*/
72
def partitionByRange(fields: Int*): DataSet[T]
73
74
/**
75
* Partitions data by range of named fields
76
* @param firstField First field name
77
* @param otherFields Additional field names
78
* @return DataSet with range partitioning
79
*/
80
def partitionByRange(firstField: String, otherFields: String*): DataSet[T]
81
82
/**
83
* Partitions data by range of key selector result
84
* @param fun Key selector function
85
* @return DataSet with range partitioning based on key
86
*/
87
def partitionByRange[K: TypeInformation](fun: T => K): DataSet[T]
88
}
89
```
90
91
### Range Partitioning with Data Distribution
92
93
Advanced range partitioning using custom data distribution.
94
95
```scala { .api }
96
implicit class DataSetUtils[T](dataSet: DataSet[T]) {
97
/**
98
* Partitions by range using custom data distribution
99
* @param distribution Custom data distribution
100
* @param fields Field positions for partitioning
101
* @return DataSet with custom range partitioning
102
*/
103
def partitionByRange(distribution: DataDistribution, fields: Int*): DataSet[T]
104
105
/**
106
* Partitions by range using custom data distribution and field names
107
* @param distribution Custom data distribution
108
* @param firstField First field name
109
* @param otherFields Additional field names
110
* @return DataSet with custom range partitioning
111
*/
112
def partitionByRange(distribution: DataDistribution, firstField: String, otherFields: String*): DataSet[T]
113
114
/**
115
* Partitions by range using custom data distribution and key selector
116
* @param distribution Custom data distribution
117
* @param fun Key selector function
118
* @return DataSet with custom range partitioning
119
*/
120
def partitionByRange[K: TypeInformation](distribution: DataDistribution, fun: T => K): DataSet[T]
121
}
122
```
123
124
**Usage Examples:**
125
126
```scala
127
// Range partition by age for ordered processing
128
case class Person(name: String, age: Int, salary: Double)
129
130
val people = env.fromElements(
131
Person("Alice", 25, 50000),
132
Person("Bob", 30, 60000),
133
Person("Charlie", 35, 70000)
134
)
135
136
// Simple range partitioning
137
val rangePartitioned = people.partitionByRange(_.age)
138
139
// Range partitioning with multiple fields
140
val rangeByAgeAndSalary = people.partitionByRange(p => (p.age, p.salary))
141
```
142
143
### Custom Partitioning
144
145
Use custom partitioning logic for specialized distribution requirements.
146
147
```scala { .api }
148
class DataSet[T] {
149
/**
150
* Partitions using custom partitioner on specified field
151
* @param partitioner Custom partitioner implementation
152
* @param field Field position for partitioning
153
* @return DataSet with custom partitioning
154
*/
155
def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], field: Int): DataSet[T]
156
157
/**
158
* Partitions using custom partitioner on named field
159
* @param partitioner Custom partitioner implementation
160
* @param field Field name for partitioning
161
* @return DataSet with custom partitioning
162
*/
163
def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], field: String): DataSet[T]
164
165
/**
166
* Partitions using custom partitioner on key selector result
167
* @param partitioner Custom partitioner implementation
168
* @param fun Key selector function
169
* @return DataSet with custom partitioning
170
*/
171
def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], fun: T => K): DataSet[T]
172
}
173
```
174
175
**Usage Examples:**
176
177
```scala
178
import org.apache.flink.api.common.functions.Partitioner
179
180
// Custom partitioner that groups regions together
181
class RegionPartitioner extends Partitioner[String] {
182
override def partition(key: String, numPartitions: Int): Int = {
183
key match {
184
case "North" | "South" => 0 % numPartitions
185
case "East" | "West" => 1 % numPartitions
186
case _ => 2 % numPartitions
187
}
188
}
189
}
190
191
val customPartitioned = customers.partitionCustom(new RegionPartitioner, _.region)
192
```
193
194
### Load Balancing
195
196
Rebalance data across all parallel instances for even load distribution.
197
198
```scala { .api }
199
class DataSet[T] {
200
/**
201
* Rebalances data across all parallel instances using round-robin
202
* @return DataSet with rebalanced distribution
203
*/
204
def rebalance(): DataSet[T]
205
}
206
```
207
208
**Usage Examples:**
209
210
```scala
211
// Rebalance after filtering to ensure even distribution
212
val filteredAndRebalanced = customers
213
.filter(_.region == "North")
214
.rebalance()
215
.map(processCustomer)
216
```
217
218
### Partition Sorting
219
220
Sort data within each partition for optimized processing.
221
222
```scala { .api }
223
class DataSet[T] {
224
/**
225
* Sorts elements within each partition by field position
226
* @param field Field position for sorting
227
* @param order Sort order (ASCENDING or DESCENDING)
228
* @return DataSet with sorted partitions
229
*/
230
def sortPartition(field: Int, order: Order): DataSet[T]
231
232
/**
233
* Sorts elements within each partition by field name
234
* @param field Field name for sorting
235
* @param order Sort order (ASCENDING or DESCENDING)
236
* @return DataSet with sorted partitions
237
*/
238
def sortPartition(field: String, order: Order): DataSet[T]
239
240
/**
241
* Sorts elements within each partition using key selector
242
* @param fun Key selector function for sorting
243
* @param order Sort order (ASCENDING or DESCENDING)
244
* @return DataSet with sorted partitions
245
*/
246
def sortPartition[K: TypeInformation](fun: T => K, order: Order): DataSet[T]
247
}
248
```
249
250
### Multi-Key Partition Sorting
251
252
Chain multiple sorting keys for complex partition sorting.
253
254
```scala { .api }
255
class PartitionSortedDataSet[T] extends DataSet[T] {
256
/**
257
* Adds secondary sort key by field position
258
* @param field Field position for secondary sorting
259
* @param order Sort order for secondary key
260
* @return DataSet with multi-key sorted partitions
261
*/
262
def sortPartition(field: Int, order: Order): DataSet[T]
263
264
/**
265
* Adds secondary sort key by field name
266
* @param field Field name for secondary sorting
267
* @param order Sort order for secondary key
268
* @return DataSet with multi-key sorted partitions
269
*/
270
def sortPartition(field: String, order: Order): DataSet[T]
271
272
/**
273
* Adds secondary sort key using key selector
274
* @param fun Key selector function for secondary sorting
275
* @param order Sort order for secondary key
276
* @return DataSet with multi-key sorted partitions
277
*/
278
def sortPartition[K: TypeInformation](fun: T => K, order: Order): DataSet[T]
279
}
280
```
281
282
**Usage Examples:**
283
284
```scala
285
import org.apache.flink.api.common.operators.Order
286
287
// Sort within partitions by age, then by salary
288
val sortedPartitions = people
289
.partitionByHash(_.region)
290
.sortPartition(_.age, Order.ASCENDING)
291
.sortPartition(_.salary, Order.DESCENDING)
292
293
// Sort by multiple fields using field names
294
val sortedByFields = people
295
.sortPartition("age", Order.ASCENDING)
296
.sortPartition("salary", Order.DESCENDING)
297
```
298
299
### Grouped Partitioning
300
301
Control partitioning for grouped DataSets to optimize group processing.
302
303
```scala { .api }
304
class GroupedDataSet[T] {
305
/**
306
* Uses custom partitioner for group distribution
307
* @param partitioner Custom partitioner for group keys
308
* @return GroupedDataSet with custom partitioning
309
*/
310
def withPartitioner[K: TypeInformation](partitioner: Partitioner[K]): GroupedDataSet[T]
311
}
312
```
313
314
### Binary Operation Partitioning
315
316
Control partitioning for join, cross, and coGroup operations.
317
318
```scala { .api }
319
trait JoinFunctionAssigner[L, R] {
320
/**
321
* Uses custom partitioner for join distribution
322
* @param partitioner Custom partitioner for join keys
323
* @return JoinFunctionAssigner with custom partitioning
324
*/
325
def withPartitioner[K: TypeInformation](partitioner: Partitioner[K]): JoinFunctionAssigner[L, R]
326
}
327
328
class CoGroupDataSet[L, R] {
329
/**
330
* Uses custom partitioner for coGroup distribution
331
* @param partitioner Custom partitioner for coGroup keys
332
* @return CoGroupDataSet with custom partitioning
333
*/
334
def withPartitioner[K: TypeInformation](partitioner: Partitioner[K]): CoGroupDataSet[L, R]
335
}
336
```
337
338
**Usage Examples:**
339
340
```scala
341
// Custom partitioning for joins
342
val joinResult = leftDataSet
343
.join(rightDataSet)
344
.where(_.key)
345
.equalTo(_.key)
346
.withPartitioner(new CustomKeyPartitioner)
347
.apply((left, right) => combineData(left, right))
348
```
349
350
### Distribution Strategies
351
352
Hints for optimizing data distribution in binary operations.
353
354
```scala { .api }
355
class DataSet[T] {
356
/**
357
* Hints that this DataSet is small for broadcast operations
358
* @param other Large DataSet to join with
359
* @return Join operation optimized for broadcasting this DataSet
360
*/
361
def joinWithHuge[O](other: DataSet[O]): UnfinishedJoinOperation[T, O]
362
363
/**
364
* Hints that other DataSet is small for broadcast operations
365
* @param other Small DataSet to broadcast
366
* @return Join operation optimized for broadcasting other DataSet
367
*/
368
def joinWithTiny[O](other: DataSet[O]): UnfinishedJoinOperation[T, O]
369
370
/**
371
* Cross with broadcasting hint for small DataSet
372
* @param other Small DataSet to broadcast
373
* @return Cross operation with broadcast optimization
374
*/
375
def crossWithTiny[O](other: DataSet[O]): CrossDataSet[T, O]
376
377
/**
378
* Cross with broadcasting hint for large DataSet
379
   * @param other Large DataSet (the receiving DataSet is assumed small and will be broadcast)
380
* @return Cross operation with broadcast optimization
381
*/
382
def crossWithHuge[O](other: DataSet[O]): CrossDataSet[T, O]
383
}
384
```
385
386
### Parallelism Control
387
388
Set parallelism at the operation level to control resource usage.
389
390
```scala { .api }
391
class DataSet[T] {
392
/**
393
* Sets parallelism for this operation
394
* @param parallelism Degree of parallelism
395
* @return DataSet with specified parallelism
396
*/
397
def setParallelism(parallelism: Int): DataSet[T]
398
399
/**
400
* Gets the parallelism of this operation
401
* @return Current parallelism setting
402
*/
403
def getParallelism: Int
404
}
405
```
406
407
**Usage Examples:**
408
409
```scala
410
// Set different parallelism for expensive operations
411
val result = data
412
.setParallelism(8) // Use 8 parallel instances
413
.map(expensiveTransformation)
414
.setParallelism(4) // Reduce to 4 for subsequent operations
415
.reduce(combineResults)
416
```
417
418
### Resource Requirements
419
420
Inspect the minimum and preferred resource requirements configured for operations.
421
422
```scala { .api }
423
class DataSet[T] {
424
/**
425
* Gets minimum resource requirements
426
* @return ResourceSpec with minimum requirements
427
*/
428
def minResources: ResourceSpec
429
430
/**
431
* Gets preferred resource requirements
432
* @return ResourceSpec with preferred requirements
433
*/
434
def preferredResources: ResourceSpec
435
}
436
```
437
438
## Types
439
440
```scala { .api }
441
abstract class Partitioner[T] {
442
/**
443
* Determines partition for given key
444
* @param key Key to partition
445
* @param numPartitions Total number of partitions
446
* @return Partition index (0 to numPartitions-1)
447
*/
448
def partition(key: T, numPartitions: Int): Int
449
}
450
451
sealed trait Order
452
object Order {
453
case object ASCENDING extends Order
454
case object DESCENDING extends Order
455
}
456
457
class PartitionSortedDataSet[T] extends DataSet[T] {
458
// Represents a DataSet with sorted partitions that allows chaining additional sort keys
459
}
460
461
trait DataDistribution {
462
/**
463
   * Gets the boundary record that separates the given bucket from the next one
   * @param bucketNum Index of the requested bucket (0 to totalNumBuckets-1)
   * @param totalNumBuckets Total number of buckets the data is split into
   * @return Upper boundary of the specified bucket
465
*/
466
def getBucketBoundary(bucketNum: Int, totalNumBuckets: Int): AnyRef
467
}
468
469
class ResourceSpec {
470
/**
471
* Gets CPU cores requirement
472
* @return Number of CPU cores
473
*/
474
def getCpuCores: Double
475
476
/**
477
* Gets heap memory requirement in MB
478
* @return Heap memory in megabytes
479
*/
480
def getHeapMemoryInMB: Int
481
482
/**
483
* Gets direct memory requirement in MB
484
* @return Direct memory in megabytes
485
*/
486
def getDirectMemoryInMB: Int
487
488
/**
489
* Gets native memory requirement in MB
490
* @return Native memory in megabytes
491
*/
492
def getNativeMemoryInMB: Int
493
494
/**
495
* Gets network memory requirement in MB
496
* @return Network memory in megabytes
497
*/
498
def getNetworkMemoryInMB: Int
499
}
500
501
object ResourceSpec {
502
/**
503
   * Returns the ResourceSpec holding default resource values
504
* @return Default ResourceSpec
505
*/
506
def DEFAULT: ResourceSpec
507
508
/**
509
* Creates ResourceSpec with unknown requirements
510
* @return Unknown ResourceSpec
511
*/
512
def UNKNOWN: ResourceSpec
513
514
/**
515
* Creates new ResourceSpec builder
516
* @return ResourceSpec builder
517
*/
518
def newBuilder(): ResourceSpec.Builder
519
}
520
```