Custom partitioning logic for determining target Kafka partitions when producing messages. Partitioners enable control over message distribution across partitions, affecting parallelism, ordering guarantees, and load balancing.
Abstract base class for implementing custom Kafka partitioning logic within Flink.
public abstract class FlinkKafkaPartitioner<T> implements Serializable {
    public void open(int parallelInstanceId, int parallelInstances);
    public abstract int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}
Methods:
open() - Initialize the partitioner for a specific parallel instance
- parallelInstanceId - 0-indexed ID of this parallel instance
- parallelInstances - Total number of parallel instances
partition() - Determine target partition for a record
- record - Original record object being sent
- key - Serialized message key (may be null)
- value - Serialized message value
- targetTopic - Target topic name
- partitions - Array of available partition IDs for the topic
- Returns: the selected partition ID (must be a value from the partitions array)
Usage Example:
public class UserIdPartitioner<T extends UserEvent> extends FlinkKafkaPartitioner<T> {
    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // Partition by user ID so all events for the same user land in the same partition
        String userId = record.getUserId();
        if (userId == null) {
            return partitions[0]; // Default partition for null user IDs
        }
        // floorMod avoids the negative-index edge case: Math.abs(Integer.MIN_VALUE)
        // is still negative, so Math.abs(hash) % length can produce a negative index
        return partitions[Math.floorMod(userId.hashCode(), partitions.length)];
    }
}
Routes all records from a given sink subtask to a single, fixed Kafka partition (subtask i writes to partitions[i % partitions.length]). With a sink parallelism of 1 this sends every message to partition 0, which is useful for single-partition topics or when a total order across all messages is required.
public class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> {
    public FlinkFixedPartitioner();
}
Usage Example:
// With a sink parallelism of 1, all messages go to partition 0
FlinkKafkaPartitioner<MyEvent> partitioner = new FlinkFixedPartitioner<>();
Use Cases:
- Single-partition topics, where every record lands in partition 0 regardless
- Preserving a total message order (requires a sink parallelism of 1)
- Keeping each sink subtask's output confined to one partition, so per-subtask ordering is preserved
Adapter that wraps a legacy KafkaPartitioner (described below) so it can be used wherever a FlinkKafkaPartitioner is expected; the wrapped partitioner makes the actual partitioning decision.
public class FlinkKafkaDelegatePartitioner<T> extends FlinkKafkaPartitioner<T> {
    public FlinkKafkaDelegatePartitioner(KafkaPartitioner<T> kafkaPartitioner);
}
Usage Example:
// Wrap a legacy KafkaPartitioner for use with the current connector API
FlinkKafkaPartitioner<MyEvent> partitioner = new FlinkKafkaDelegatePartitioner<>(legacyPartitioner);
To rely on Kafka's built-in default partitioner instead, construct the producer without a custom partitioner; records are then sent without an explicit partition and the Kafka producer chooses one.
Kafka Default Partitioning Behavior:
- Records with a non-null key: the partition is chosen by hashing the serialized key (murmur2 hash modulo the partition count), so equal keys always land in the same partition
- Records with a null key: spread across partitions (round-robin in older clients, sticky partitioning since Kafka clients 2.4)
Abstract partitioner interface compatible with older Kafka clients.
public abstract class KafkaPartitioner<T> implements Serializable {
    public abstract int partition(T record, byte[] key, byte[] value, int numPartitions);
}
This class is maintained for backward compatibility with older connector versions.
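For illustration, a minimal legacy-style implementation; the class name and the key-hashing strategy here are assumptions, not part of the connector:
import java.util.Arrays;

public class LegacyKeyHashPartitioner<T> extends KafkaPartitioner<T> {
    @Override
    public int partition(T record, byte[] key, byte[] value, int numPartitions) {
        // Records without a key all go to partition 0
        if (key == null) {
            return 0;
        }
        // floorMod keeps the index non-negative even for negative hash codes
        return Math.floorMod(Arrays.hashCode(key), numPartitions);
    }
}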
Partition based on a specific field to ensure related records go to the same partition:
public class OrderPartitioner extends FlinkKafkaPartitioner<Order> {
    @Override
    public int partition(Order record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // Partition by customer ID to keep all orders for a customer together
        String customerId = record.getCustomerId();
        if (customerId == null) {
            return partitions[0];
        }
        // floorMod avoids a negative index when hashCode() is Integer.MIN_VALUE
        return partitions[Math.floorMod(customerId.hashCode(), partitions.length)];
    }
}
Partition based on timestamp ranges for time-series data:
public class TimeBasedPartitioner extends FlinkKafkaPartitioner<TimestampedEvent> {
    private static final long HOUR_IN_MILLIS = 60 * 60 * 1000;

    @Override
    public int partition(TimestampedEvent record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // Partition by hour so events from the same time window are grouped together
        long timestamp = record.getTimestamp();
        long hourBucket = timestamp / HOUR_IN_MILLIS;
        // floorMod guards against negative buckets from pre-epoch timestamps
        return partitions[(int) Math.floorMod(hourBucket, (long) partitions.length)];
    }
}
Distribute messages evenly across partitions using round-robin:
public class RoundRobinPartitioner<T> extends FlinkKafkaPartitioner<T> {
    private int nextPartition = 0;

    @Override
    public void open(int parallelInstanceId, int parallelInstances) {
        // Start each parallel instance at a different offset to avoid contention
        this.nextPartition = parallelInstanceId;
    }

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        int targetPartition = partitions[nextPartition % partitions.length];
        // Wrap the counter so integer overflow never turns it negative
        nextPartition = (nextPartition + 1) % partitions.length;
        return targetPartition;
    }
}
A more sophisticated partitioner that considers partition load:
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class LoadAwarePartitioner<T> extends FlinkKafkaPartitioner<T> {
    // Counts are local to this parallel instance, so the balancing is
    // approximate rather than cluster-wide
    private final Map<Integer, AtomicLong> partitionCounts = new ConcurrentHashMap<>();

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // Find the partition this instance has sent the fewest messages to
        int targetPartition = partitions[0];
        long minCount = partitionCounts.computeIfAbsent(targetPartition, k -> new AtomicLong(0)).get();
        for (int partition : partitions) {
            long count = partitionCounts.computeIfAbsent(partition, k -> new AtomicLong(0)).get();
            if (count < minCount) {
                minCount = count;
                targetPartition = partition;
            }
        }
        partitionCounts.get(targetPartition).incrementAndGet();
        return targetPartition;
    }
}
Choose partition keys that provide good distribution:
// Good: user ID (assuming a reasonably even spread of IDs)
int partition = Math.floorMod(userId.hashCode(), partitions.length);
// Bad: boolean flag (only 2 possible values, so at most 2 partitions are used)
int partition = record.isActive() ? 0 : 1;
// Good: composite of several fields for better distribution
String compositeKey = record.getRegion() + ":" + record.getUserId();
int partition = Math.floorMod(compositeKey.hashCode(), partitions.length);
Always validate partition selection:
@Override
public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
    if (partitions.length == 0) {
        throw new IllegalArgumentException("No partitions available for topic: " + targetTopic);
    }
    // selectPartition() stands in for whatever selection logic the partitioner uses
    int selectedPartition = selectPartition(record, partitions);
    // Validate that the selected partition is in the available partitions array
    boolean isValid = false;
    for (int partition : partitions) {
        if (partition == selectedPartition) {
            isValid = true;
            break;
        }
    }
    if (!isValid) {
        // Fall back to the first available partition
        return partitions[0];
    }
    return selectedPartition;
}
Always test partitioning behavior:
@Test
public void testPartitionDistribution() {
    UserIdPartitioner<UserEvent> partitioner = new UserIdPartitioner<>();
    int[] partitions = {0, 1, 2, 3, 4};
    Map<Integer, Integer> distribution = new HashMap<>();
    // Test with various user IDs
    for (int i = 0; i < 1000; i++) {
        UserEvent event = new UserEvent("user" + i, "action");
        int partition = partitioner.partition(event, null, null, "test-topic", partitions);
        distribution.merge(partition, 1, Integer::sum);
    }
    // Verify a reasonable distribution: 1000 events over 5 partitions averages 200 each
    for (int count : distribution.values()) {
        assertTrue("Uneven distribution", count > 150 && count < 250);
    }
}
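A complementary check worth adding is determinism: the same user ID must always map to the same partition, or the per-user ordering guarantee breaks. A sketch reusing the hypothetical UserEvent from above:
@Test
public void testPartitionDeterminism() {
    UserIdPartitioner<UserEvent> partitioner = new UserIdPartitioner<>();
    int[] partitions = {0, 1, 2, 3, 4};
    UserEvent event = new UserEvent("user42", "action");
    // Repeated calls for the same user ID must return the same partition
    int expected = partitioner.partition(event, null, null, "test-topic", partitions);
    for (int i = 0; i < 100; i++) {
        assertEquals(expected, partitioner.partition(event, null, null, "test-topic", partitions));
    }
}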