# Partitioners

Custom partitioning logic for determining target Kafka partitions when producing messages. Partitioners enable control over message distribution across partitions, affecting parallelism, ordering guarantees, and load balancing.

## Capabilities
### FlinkKafkaPartitioner

Abstract base class for implementing custom Kafka partitioning logic within Flink.
```java { .api }
public abstract class FlinkKafkaPartitioner<T> implements Serializable {
    public void open(int parallelInstanceId, int parallelInstances);
    public abstract int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}
```

**Methods:**

- `open()` - Initialize the partitioner for a specific parallel instance
  - `parallelInstanceId` - 0-indexed ID of this parallel instance
  - `parallelInstances` - Total number of parallel instances
- `partition()` - Determine the target partition for a record
  - `record` - Original record object being sent
  - `key` - Serialized message key (may be null)
  - `value` - Serialized message value
  - `targetTopic` - Target topic name
  - `partitions` - Array of available partition IDs for the topic
  - **Returns:** Target partition ID (must be one of the values in the `partitions` array)

**Usage Example:**
```java
public class UserIdPartitioner<T extends UserEvent> extends FlinkKafkaPartitioner<T> {
    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // Partition based on user ID to ensure events for the same user go to the same partition
        String userId = record.getUserId();
        if (userId == null) {
            return partitions[0]; // Default partition for null user IDs
        }

        // floorMod avoids the negative index that Math.abs(hash) % n yields for Integer.MIN_VALUE
        return partitions[Math.floorMod(userId.hashCode(), partitions.length)];
    }
}
```
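A custom partitioner only takes effect once it is handed to the producer. The sketch below shows one way to wire it up, assuming the `FlinkKafkaProducer` constructor variant that accepts an `Optional<FlinkKafkaPartitioner>`; the topic name, bootstrap address, and the use of `FlinkFixedPartitioner` are illustrative (substitute your own partitioner, such as the `UserIdPartitioner` above, for a stream of its record type):

```java
import java.util.Optional;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;

public class PartitionerWiring {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> events = env.fromElements("a", "b", "c");

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // illustrative address

        // Optional.of(...) overrides the connector's default partitioner;
        // Optional.empty() would fall through to Kafka's own partitioner instead
        events.addSink(new FlinkKafkaProducer<>(
                "example-topic",              // illustrative topic name
                new SimpleStringSchema(),     // value-only serialization, so keys are null
                props,
                Optional.of(new FlinkFixedPartitioner<>())));

        env.execute("partitioner-wiring");
    }
}
```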
### Built-in Partitioner Implementations

#### FlinkFixedPartitioner

Routes all records of a given sink subtask to one fixed partition, chosen as `parallelInstanceId % partitions.length`. All messages land on partition 0 only when the sink runs with parallelism 1 (or the topic has a single partition); in general, each parallel instance writes to exactly one partition, preserving the order of that instance's output.
```java { .api }
public class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> {
    public FlinkFixedPartitioner();
}
```

**Usage Example:**
```java
// Each parallel sink instance sends all of its records to one fixed partition
FlinkKafkaPartitioner<MyEvent> partitioner = new FlinkFixedPartitioner<>();
```
**Use Cases:**
- Single-partition topics (every instance necessarily writes to partition 0)
- Maintaining global ordering when the sink runs with parallelism 1
- Keeping each subtask's output on a single partition so its ordering is preserved
- Testing and development with predictable partitioning

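The mapping described above is easy to reason about; a small illustration of the `parallelInstanceId % partitions.length` arithmetic (values are examples, and `parallelInstanceId` is the value passed to `open()`):

```java
int[] partitions = {0, 1, 2};

// subtask 0 -> partitions[0], subtask 1 -> partitions[1], subtask 2 -> partitions[2];
// subtask 3 wraps around to partitions[0] once subtasks outnumber partitions
int target = partitions[parallelInstanceId % partitions.length];
```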
#### FlinkKafkaDelegatePartitioner

Adapter that delegates partitioning decisions to a legacy `KafkaPartitioner` implementation (see below), allowing partitioners written against the older interface to be used where a `FlinkKafkaPartitioner` is expected.
```java { .api }
public class FlinkKafkaDelegatePartitioner<T> extends FlinkKafkaPartitioner<T> {
    public FlinkKafkaDelegatePartitioner(KafkaPartitioner<T> kafkaPartitioner);
}
```

**Usage Example:**
```java
// Wrap an existing legacy KafkaPartitioner for use with the current API
FlinkKafkaPartitioner<MyEvent> partitioner =
        new FlinkKafkaDelegatePartitioner<>(myLegacyPartitioner);
```
**Kafka Default Partitioning Behavior** (what applies when no custom partitioner is supplied at all; see the sketch below):
- If a key is present: hash-based partitioning using the key
- If the key is null: round-robin distribution across partitions (newer Kafka clients use sticky partitioning instead)
- Ensures even distribution and good load balancing

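To get this default behavior from the connector, supply no custom partitioner at all. A sketch, reusing the names from the wiring example earlier and assuming the same constructor variant:

```java
// Optional.empty() disables the connector-side partitioner entirely, so the
// Kafka client decides: key hash when a key is attached, round-robin otherwise
events.addSink(new FlinkKafkaProducer<>(
        "example-topic",
        new SimpleStringSchema(),
        props,
        Optional.empty()));
```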
### KafkaPartitioner (Legacy)

Abstract partitioner class compatible with older Kafka clients.
```java { .api }
public abstract class KafkaPartitioner<T> implements Serializable {
    public abstract int partition(T record, byte[] key, byte[] value, int numPartitions);
}
```
This is maintained for backward compatibility with older connector versions; wrap implementations in `FlinkKafkaDelegatePartitioner` to use them with the current API.

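A minimal sketch of a legacy-style partitioner and its adaptation, written against the signatures shown above; the class name and hashing choice are illustrative:

```java
import java.util.Arrays;

// Legacy-style partitioner: receives only the partition count, not the partition ID array
public class LegacyKeyHashPartitioner<T> extends KafkaPartitioner<T> {
    @Override
    public int partition(T record, byte[] key, byte[] value, int numPartitions) {
        // Hash the serialized key when present; fall back to partition 0 otherwise
        return key == null ? 0 : Math.floorMod(Arrays.hashCode(key), numPartitions);
    }
}

// Adapting the legacy partitioner to the current API
FlinkKafkaPartitioner<MyEvent> adapted =
        new FlinkKafkaDelegatePartitioner<>(new LegacyKeyHashPartitioner<>());
```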
## Custom Partitioner Examples

### Hash-Based Partitioner

Partition based on a specific field to ensure related records go to the same partition:
```java
public class OrderPartitioner extends FlinkKafkaPartitioner<Order> {
    @Override
    public int partition(Order record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // Partition by customer ID to keep all orders for a customer together
        String customerId = record.getCustomerId();
        if (customerId == null) {
            return partitions[0];
        }

        // floorMod guards against the negative result Math.abs produces for Integer.MIN_VALUE
        return partitions[Math.floorMod(customerId.hashCode(), partitions.length)];
    }
}
```
### Time-Based Partitioner

Partition based on timestamp ranges for time-series data:
```java
public class TimeBasedPartitioner extends FlinkKafkaPartitioner<TimestampedEvent> {
    private static final long HOUR_IN_MILLIS = 60 * 60 * 1000;

    @Override
    public int partition(TimestampedEvent record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // Partition by hour to group events in time windows
        long timestamp = record.getTimestamp();
        long hourBucket = timestamp / HOUR_IN_MILLIS;

        return partitions[(int) (hourBucket % partitions.length)];
    }
}
```
### Round-Robin Partitioner

Distribute messages evenly across partitions using round-robin:
```java
public class RoundRobinPartitioner<T> extends FlinkKafkaPartitioner<T> {
    private int nextPartition = 0;

    @Override
    public void open(int parallelInstanceId, int parallelInstances) {
        // Start each parallel instance at a different offset so the
        // instances do not all begin with the same partition
        this.nextPartition = parallelInstanceId;
    }

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // floorMod keeps the index valid even after the counter overflows
        int targetPartition = partitions[Math.floorMod(nextPartition, partitions.length)];
        nextPartition++;
        return targetPartition;
    }
}
```
### Load-Aware Partitioner

A more sophisticated partitioner that routes each record to the partition with the lowest send count. Note that each parallel instance only tracks the records it has sent itself:
```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class LoadAwarePartitioner<T> extends FlinkKafkaPartitioner<T> {
    // Per-partition send counts, local to this parallel instance
    private final Map<Integer, AtomicLong> partitionCounts = new ConcurrentHashMap<>();

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // Find the partition this instance has sent the fewest records to
        int targetPartition = partitions[0];
        long minCount = partitionCounts.computeIfAbsent(targetPartition, k -> new AtomicLong(0)).get();

        for (int partition : partitions) {
            long count = partitionCounts.computeIfAbsent(partition, k -> new AtomicLong(0)).get();
            if (count < minCount) {
                minCount = count;
                targetPartition = partition;
            }
        }

        partitionCounts.get(targetPartition).incrementAndGet();
        return targetPartition;
    }
}
```
## Partitioning Strategies and Trade-offs

### Key-Based Partitioning
- **Pros:** Maintains ordering per key, enables stateful processing
- **Cons:** Potential hotspots with skewed keys, uneven distribution
- **Use Cases:** User events, session data, transaction processing

### Round-Robin Partitioning
- **Pros:** Even distribution, good load balancing
- **Cons:** No ordering guarantees, no key locality
- **Use Cases:** Stateless processing, high-throughput scenarios

### Fixed Partitioning
- **Pros:** Simple, predictable, maintains global order
- **Cons:** No parallelism, potential bottleneck
- **Use Cases:** Single-partition topics, global ordering requirements

### Time-Based Partitioning
- **Pros:** Natural for time-series data, enables time-based processing
- **Cons:** Potential hotspots during high activity periods
- **Use Cases:** Event streams, metrics, logs
## Best Practices

### Partition Key Selection

Choose partition keys that provide good distribution:
```java
// Good: User ID (assuming reasonable distribution);
// floorMod avoids negative indexes when hashCode() is Integer.MIN_VALUE
int partition = Math.floorMod(userId.hashCode(), partitions.length);

// Bad: Boolean flag (only 2 possible values)
int partition = record.isActive() ? 0 : 1;

// Good: Combination of fields for better distribution
String compositeKey = record.getRegion() + ":" + record.getUserId();
int partition = Math.floorMod(compositeKey.hashCode(), partitions.length);
```
### Error Handling

Always validate partition selection:
```java
@Override
public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
    if (partitions.length == 0) {
        throw new IllegalArgumentException("No partitions available for topic: " + targetTopic);
    }

    // selectPartition() stands in for your own partitioning logic
    int selectedPartition = selectPartition(record, partitions);

    // Validate that the selected partition is in the available partitions array
    boolean isValid = false;
    for (int partition : partitions) {
        if (partition == selectedPartition) {
            isValid = true;
            break;
        }
    }

    if (!isValid) {
        // Fall back to the first available partition
        return partitions[0];
    }

    return selectedPartition;
}
```
### Performance Considerations

- Keep partitioning logic simple and fast (it is called for every record)
- Avoid heavy computations or external calls in the `partition()` method
- Consider caching expensive computations when possible (see the sketch below)
- Be aware of memory usage in stateful partitioners

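One way to apply the caching advice is to memoize an expensive per-key computation. In this sketch, `extractKey` and `expensiveNormalize` are hypothetical stand-ins for your own logic:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class CachingPartitioner<T> extends FlinkKafkaPartitioner<T> {
    // Memoized results of the expensive computation; bound or evict in production,
    // since this map grows with the number of distinct keys
    private final Map<String, Integer> hashCache = new ConcurrentHashMap<>();

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        String recordKey = extractKey(record);
        int hash = hashCache.computeIfAbsent(recordKey, k -> expensiveNormalize(k).hashCode());
        return partitions[Math.floorMod(hash, partitions.length)];
    }

    // Hypothetical: substitute a real, cheap key accessor for your record type
    private String extractKey(T record) {
        return record.toString();
    }

    // Hypothetical placeholder for a genuinely expensive computation
    private String expensiveNormalize(String k) {
        return k.trim().toLowerCase();
    }
}
```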
### Testing Partitioners

Always test partitioning behavior:
```java
@Test
public void testPartitionDistribution() {
    UserIdPartitioner<UserEvent> partitioner = new UserIdPartitioner<>();
    int[] partitions = {0, 1, 2, 3, 4};
    Map<Integer, Integer> distribution = new HashMap<>();

    // Test with various user IDs
    for (int i = 0; i < 1000; i++) {
        UserEvent event = new UserEvent("user" + i, "action");
        int partition = partitioner.partition(event, null, null, "test-topic", partitions);
        distribution.merge(partition, 1, Integer::sum);
    }

    // Verify reasonable distribution (expected ~200 records per partition)
    for (int count : distribution.values()) {
        assertTrue("Uneven distribution", count > 150 && count < 250);
    }
}
```