# Partitioning Strategies

Flexible partitioning strategies for distributing records across Kinesis shards and for mapping shards to Flink subtasks, enabling balanced load and even data distribution.

## Capabilities

### KinesisPartitioner

Abstract base class for implementing custom partitioning strategies that determine how records are distributed across Kinesis shards.

```java { .api }
@PublicEvolving
public abstract class KinesisPartitioner<T> implements Serializable {

    /**
     * Get the partition ID for a record.
     *
     * @param element Record to partition
     * @return Partition ID string
     */
    public abstract String getPartitionId(T element);

    /**
     * Get explicit hash key for fine-grained shard assignment.
     *
     * @param element Record to get hash key for
     * @return Explicit hash key or null for automatic assignment
     */
    public String getExplicitHashKey(T element) {
        return null; // Default: use automatic hash key derivation
    }

    /**
     * Initialize the partitioner with parallelism information.
     *
     * @param indexOfThisSubtask Index of current Flink subtask
     * @param numberOfParallelSubtasks Total number of parallel subtasks
     */
    public void initialize(int indexOfThisSubtask, int numberOfParallelSubtasks) {
        // Default: no initialization needed
    }
}
```

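Kinesis derives the target shard by hashing the partition key (MD5) into a 128-bit hash key space; a non-null value returned from `getExplicitHashKey` bypasses that hashing and must be a non-negative decimal integer within that range. A minimal sketch of an explicit-hash-key partitioner, assuming a hypothetical `Event` type with a `getKey()` accessor:

```java
import java.math.BigInteger;

// Sketch only: the Event type and its getKey() accessor are assumed for illustration.
public class ExplicitHashPartitioner extends KinesisPartitioner<Event> {

    // Upper bound of the Kinesis hash key space (2^128 - 1).
    private static final BigInteger HASH_KEY_SPACE =
            BigInteger.TWO.pow(128).subtract(BigInteger.ONE);
    private static final int BUCKETS = 1024;

    @Override
    public String getPartitionId(Event element) {
        return element.getKey();
    }

    @Override
    public String getExplicitHashKey(Event element) {
        // Place the record in one of BUCKETS evenly spaced positions across
        // the full hash key range (decimal integer string, as Kinesis requires).
        int bucket = Math.floorMod(element.getKey().hashCode(), BUCKETS);
        return HASH_KEY_SPACE.multiply(BigInteger.valueOf(bucket))
                .divide(BigInteger.valueOf(BUCKETS))
                .toString();
    }
}
```
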
### FixedKinesisPartitioner

Ensures that each internal Flink partition (producer subtask) always maps to the same Kinesis partition, providing deterministic routing based on the subtask index.

```java { .api }
@PublicEvolving
public class FixedKinesisPartitioner<T> extends KinesisPartitioner<T> {

    /**
     * Initialize with subtask information.
     *
     * @param indexOfThisSubtask Index of current Flink subtask
     * @param numberOfParallelSubtasks Total number of parallel subtasks
     */
    @Override
    public void initialize(int indexOfThisSubtask, int numberOfParallelSubtasks);

    /**
     * Get fixed partition ID based on subtask index.
     *
     * @param record Record (not used for partitioning decision)
     * @return Fixed partition ID for this subtask
     */
    @Override
    public String getPartitionId(T record);

    @Override
    public boolean equals(Object o);

    @Override
    public int hashCode();
}
```

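A minimal usage sketch, assuming the `schema` and `props` producer setup shown in the usage examples below and the partitioner's default no-argument constructor:

```java
// Every record emitted by a given subtask is sent with the same partition key,
// so each subtask's output lands on a stable Kinesis partition.
FlinkKinesisProducer<Event> producer = new FlinkKinesisProducer<>(schema, props);
producer.setCustomPartitioner(new FixedKinesisPartitioner<>());
```
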
### RandomKinesisPartitioner

Maps elements to random partition IDs for even distribution across all available shards.

```java { .api }
@PublicEvolving
public class RandomKinesisPartitioner<T> extends KinesisPartitioner<T> {

    /**
     * Get random partition ID for even distribution.
     *
     * @param element Record (not used for partitioning decision)
     * @return Random partition ID
     */
    @Override
    public String getPartitionId(T element);

    @Override
    public boolean equals(Object o);

    @Override
    public int hashCode();
}
```

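Random partitioning maximizes spread but gives up per-key ordering, since related records may land on different shards. Setting it up mirrors the fixed-partitioner sketch above (same assumptions about `schema` and `props`):

```java
// Each record is sent with a fresh random partition key.
FlinkKinesisProducer<Event> producer = new FlinkKinesisProducer<>(schema, props);
producer.setCustomPartitioner(new RandomKinesisPartitioner<>());
```
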
### KinesisShardAssigner

Interface for mapping Kinesis shards to Flink subtask indices, controlling load balancing across consumers.

```java { .api }
@PublicEvolving
public interface KinesisShardAssigner extends Serializable {

    /**
     * Returns the index of the target subtask that a specific shard should be assigned to.
     * If the returned index is out of range [0, numParallelSubtasks), a modulus operation will be applied.
     *
     * @param shard Kinesis shard to assign
     * @param numParallelSubtasks Total number of parallel subtasks
     * @return Target subtask index (0 to numParallelSubtasks - 1)
     */
    int assign(StreamShardHandle shard, int numParallelSubtasks);
}
```

## Usage Examples

### Custom Business Logic Partitioner

```java
public class UserPartitioner extends KinesisPartitioner<UserEvent> {

    @Override
    public String getPartitionId(UserEvent element) {
        // Partition by user ID to maintain ordering per user
        return String.valueOf(element.getUserId());
    }

    @Override
    public String getExplicitHashKey(UserEvent element) {
        // Use explicit hash for finer control over shard assignment
        return String.valueOf(element.getUserId());
    }
}

// Usage with producer
FlinkKinesisProducer<UserEvent> producer = new FlinkKinesisProducer<>(schema, props);
producer.setCustomPartitioner(new UserPartitioner());
```

### Tenant-Based Partitioning

```java
public class TenantPartitioner extends KinesisPartitioner<TenantEvent> {
    private int numberOfParallelSubtasks;

    @Override
    public void initialize(int indexOfThisSubtask, int numberOfParallelSubtasks) {
        this.numberOfParallelSubtasks = numberOfParallelSubtasks;
    }

    @Override
    public String getPartitionId(TenantEvent element) {
        String tenantId = element.getTenantId();

        // Ensure consistent routing for each tenant; floorMod avoids a negative
        // result when hashCode() returns Integer.MIN_VALUE
        int partition = Math.floorMod(tenantId.hashCode(), numberOfParallelSubtasks);
        return String.valueOf(partition);
    }

    @Override
    public String getExplicitHashKey(TenantEvent element) {
        // Derive a numeric hash key from the tenant ID; explicit hash keys
        // must be non-negative decimal integer strings
        return Long.toString(Integer.toUnsignedLong(element.getTenantId().hashCode()));
    }
}
```

### Geographic Partitioning

```java
public class GeographicPartitioner extends KinesisPartitioner<LocationEvent> {
    private static final Map<String, String> REGION_PARTITIONS = Map.of(
            "US_EAST", "0",
            "US_WEST", "1",
            "EU", "2",
            "ASIA", "3"
    );

    @Override
    public String getPartitionId(LocationEvent element) {
        String region = element.getRegion();
        return REGION_PARTITIONS.getOrDefault(region, "0");
    }

    @Override
    public String getExplicitHashKey(LocationEvent element) {
        // Sub-partition within a region by city; explicit hash keys must be
        // non-negative decimal integer strings, so hash the composite key
        int cityHash = (element.getRegion() + "_" + element.getCity()).hashCode();
        return Long.toString(Integer.toUnsignedLong(cityHash));
    }
}
```

### Time-Based Partitioning

```java
public class TimeBasedPartitioner extends KinesisPartitioner<TimeSeriesEvent> {
    private static final int PARTITIONS_PER_HOUR = 4;

    @Override
    public String getPartitionId(TimeSeriesEvent element) {
        long timestamp = element.getTimestamp();

        // Partition by 15-minute intervals
        long intervalIndex = (timestamp / (15 * 60 * 1000)) % PARTITIONS_PER_HOUR;
        return String.valueOf(intervalIndex);
    }

    @Override
    public String getExplicitHashKey(TimeSeriesEvent element) {
        // Explicit hash keys must be decimal integer strings; the raw timestamp
        // pins each record to a hash position derived from its event time
        return String.valueOf(element.getTimestamp());
    }
}
```

### Load-Aware Partitioning

```java
public class LoadAwarePartitioner extends KinesisPartitioner<WeightedEvent> {
    private int[] partitionWeights;
    private int totalWeight;

    @Override
    public void initialize(int indexOfThisSubtask, int numberOfParallelSubtasks) {
        // Initialize partition weights based on expected load
        partitionWeights = new int[numberOfParallelSubtasks];
        for (int i = 0; i < numberOfParallelSubtasks; i++) {
            partitionWeights[i] = 100; // Default weight
        }
        totalWeight = Arrays.stream(partitionWeights).sum();
    }

    @Override
    public String getPartitionId(WeightedEvent element) {
        // Pick a partition at random, weighted by the configured partition weights
        int targetPartition = selectWeightedPartition();
        return String.valueOf(targetPartition);
    }

    private int selectWeightedPartition() {
        int randomValue = ThreadLocalRandom.current().nextInt(totalWeight);
        int currentWeight = 0;

        for (int i = 0; i < partitionWeights.length; i++) {
            currentWeight += partitionWeights[i];
            if (randomValue < currentWeight) {
                return i;
            }
        }
        return 0; // Fallback
    }
}
```

### Custom Shard Assignment

```java
public class BalancedShardAssigner implements KinesisShardAssigner {

    @Override
    public int assign(StreamShardHandle shard, int numParallelSubtasks) {
        String shardId = shard.getShard().getShardId();

        // Hash-based assignment for a balanced spread of shards across subtasks
        return Math.floorMod(shardId.hashCode(), numParallelSubtasks);
    }
}

// Usage with consumer
FlinkKinesisConsumer<Event> consumer = new FlinkKinesisConsumer<>(stream, schema, props);
consumer.setShardAssigner(new BalancedShardAssigner());
```

### Priority-Based Shard Assignment

```java
public class PriorityShardAssigner implements KinesisShardAssigner {
    private final Map<String, Integer> shardPriorities;

    public PriorityShardAssigner(Map<String, Integer> shardPriorities) {
        this.shardPriorities = shardPriorities;
    }

    @Override
    public int assign(StreamShardHandle shard, int numParallelSubtasks) {
        String shardId = shard.getShard().getShardId();
        int priority = shardPriorities.getOrDefault(shardId, 0);

        // Assign high-priority shards to specific subtasks
        if (priority > 8) {
            return 0; // Dedicated high-priority subtask
        } else if (priority > 5) {
            return 1; // Medium-priority subtask
        } else if (numParallelSubtasks > 2) {
            // Distribute low-priority shards across the remaining subtasks
            return 2 + Math.floorMod(shardId.hashCode(), numParallelSubtasks - 2);
        } else {
            // Fallback for parallelism <= 2, where no extra subtasks remain
            return Math.floorMod(shardId.hashCode(), numParallelSubtasks);
        }
    }
}
```

### Stream-Aware Shard Assignment

```java
public class StreamAwareShardAssigner implements KinesisShardAssigner {

    @Override
    public int assign(StreamShardHandle shard, int numParallelSubtasks) {
        String streamName = shard.getStreamName();
        String shardId = shard.getShard().getShardId();

        // Assign shards from different streams to different subtasks when possible
        int streamHash = streamName.hashCode();
        int shardHash = shardId.hashCode();

        // Combine stream and shard information; floorMod keeps the result
        // non-negative even if the combined hash overflows
        return Math.floorMod(streamHash + shardHash, numParallelSubtasks);
    }
}
```

## Advanced Partitioning Patterns

### Composite Key Partitioning

```java
public class CompositeKeyPartitioner extends KinesisPartitioner<OrderEvent> {

    @Override
    public String getPartitionId(OrderEvent element) {
        // Composite key: customer_id + order_date
        String customerId = element.getCustomerId();
        String orderDate = element.getOrderDate().format(DateTimeFormatter.ISO_LOCAL_DATE);
        return customerId + "_" + orderDate;
    }

    @Override
    public String getExplicitHashKey(OrderEvent element) {
        // Fine-grained hash derived from the full order ID; explicit hash keys
        // must be non-negative decimal integer strings
        return Long.toString(Integer.toUnsignedLong(element.getOrderId().hashCode()));
    }
}
```

### Seasonal Partitioning

```java
public class SeasonalPartitioner extends KinesisPartitioner<SalesEvent> {

    @Override
    public String getPartitionId(SalesEvent element) {
        LocalDateTime timestamp = element.getTimestamp();

        // Adjust partitioning based on expected seasonal load
        if (isHolidaySeason(timestamp)) {
            // More partitions during high season
            return String.valueOf(timestamp.getHour() % 8);
        } else {
            // Fewer partitions during low season
            return String.valueOf(timestamp.getHour() % 4);
        }
    }

    private boolean isHolidaySeason(LocalDateTime timestamp) {
        int month = timestamp.getMonthValue();
        return month == 11 || month == 12; // November and December
    }
}
```

### Adaptive Partitioning

```java
public class AdaptivePartitioner extends KinesisPartitioner<MetricEvent> {
    private volatile int currentPartitions = 4;
    private final AtomicLong messageCount = new AtomicLong(0);
    private volatile long lastAdjustment = System.currentTimeMillis();

    @Override
    public String getPartitionId(MetricEvent element) {
        long count = messageCount.incrementAndGet();

        // Adjust partition count based on throughput
        if (count % 10000 == 0) {
            adjustPartitionCount();
        }

        // Use hash-based assignment with the current partition count
        return String.valueOf(Math.floorMod(element.getMetricId().hashCode(), currentPartitions));
    }

    private void adjustPartitionCount() {
        long now = System.currentTimeMillis();
        long elapsed = now - lastAdjustment;

        if (elapsed > 60000) { // Adjust every minute
            long currentRate = messageCount.get() * 1000 / elapsed;

            if (currentRate > 100000 && currentPartitions < 16) {
                currentPartitions *= 2; // Scale up
            } else if (currentRate < 10000 && currentPartitions > 2) {
                currentPartitions /= 2; // Scale down
            }

            lastAdjustment = now;
            messageCount.set(0);
        }
    }
}
```

## Performance Considerations

### Partition Distribution

```java
public class DistributionAwarePartitioner extends KinesisPartitioner<Event> {
    private final Map<String, AtomicLong> partitionCounts = new ConcurrentHashMap<>();
    private volatile int preferredPartitions;

    @Override
    public void initialize(int indexOfThisSubtask, int numberOfParallelSubtasks) {
        this.preferredPartitions = numberOfParallelSubtasks * 2; // 2x parallelism
    }

    @Override
    public String getPartitionId(Event element) {
        // Find least loaded partition
        String leastLoadedPartition = findLeastLoadedPartition();
        partitionCounts.computeIfAbsent(leastLoadedPartition, k -> new AtomicLong(0)).incrementAndGet();
        return leastLoadedPartition;
    }

    private String findLeastLoadedPartition() {
        long minCount = Long.MAX_VALUE;
        String minPartition = "0";

        for (int i = 0; i < preferredPartitions; i++) {
            String partition = String.valueOf(i);
            long count = partitionCounts.computeIfAbsent(partition, k -> new AtomicLong(0)).get();
            if (count < minCount) {
                minCount = count;
                minPartition = partition;
            }
        }

        return minPartition;
    }
}
```

## Best Practices

1. **Key Selection**: Choose partition keys that provide good distribution while satisfying your ordering requirements
2. **Hot Partitions**: Avoid keys that create hot partitions, e.g., timestamp-only keys (see the sketch after this list)
3. **Shard Limits**: Stay within the Kinesis per-shard write limits (1,000 records/second and 1 MB/second)
4. **Consistency**: Use consistent partitioning to maintain message ordering when required
5. **Monitoring**: Monitor partition distribution and shard utilization metrics
6. **Evolution**: Design partitioning strategies that can evolve with changing data patterns

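
A minimal sketch of the hot-partition guidance above, assuming a hypothetical `SensorReading` type with `getSensorId()` and `getTimestamp()` accessors:

```java
public class SensorPartitioner extends KinesisPartitioner<SensorReading> {

    @Override
    public String getPartitionId(SensorReading element) {
        // Good: many distinct sensor IDs spread records across shards while
        // preserving per-sensor ordering.
        return element.getSensorId();

        // Hot-partition anti-pattern (avoid): every record produced in the same
        // second would share one key and land on the same shard.
        // return String.valueOf(element.getTimestamp() / 1000);
    }
}
```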