Partitioners

Custom partitioning logic for determining target Kafka partitions when producing messages. Partitioners enable control over message distribution across partitions, affecting parallelism, ordering guarantees, and load balancing.

Capabilities

FlinkKafkaPartitioner

Abstract base class for implementing custom Kafka partitioning logic within Flink.

public abstract class FlinkKafkaPartitioner<T> implements Serializable {
    public void open(int parallelInstanceId, int parallelInstances);
    public abstract int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}

Methods:

  • open() - Initialize the partitioner for a specific parallel instance

    • parallelInstanceId - 0-indexed ID of this parallel instance
    • parallelInstances - Total number of parallel instances
  • partition() - Determine target partition for a record

    • record - Original record object being sent
    • key - Serialized message key (may be null)
    • value - Serialized message value
    • targetTopic - Target topic name
    • partitions - Array of available partition IDs for the topic
    • Returns: Target partition ID (must be one of the values in partitions array)

Usage Example:

public class UserIdPartitioner<T extends UserEvent> extends FlinkKafkaPartitioner<T> {
    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // Partition based on user ID to ensure events for same user go to same partition
        String userId = record.getUserId();
        if (userId == null) {
            return partitions[0]; // Default partition for null user IDs
        }
        
        // floorMod keeps the index non-negative even if hashCode() returns Integer.MIN_VALUE
        return partitions[Math.floorMod(userId.hashCode(), partitions.length)];
    }
}
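
To take effect, the partitioner has to be passed to the producer when it is constructed. A minimal sketch, assuming the producer constructor variant that accepts an Optional<FlinkKafkaPartitioner> (see producer-base.md for the available constructors); UserEventSerializationSchema and events are placeholders for your own serialization schema and DataStream<UserEvent>:

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");

// Register the custom partitioner when constructing the producer
FlinkKafkaPartitioner<UserEvent> partitioner = new UserIdPartitioner<>();
FlinkKafkaProducer<UserEvent> producer = new FlinkKafkaProducer<UserEvent>(
        "user-events",                        // target topic
        new UserEventSerializationSchema(),   // placeholder SerializationSchema<UserEvent>
        props,
        Optional.of(partitioner));

events.addSink(producer);                     // events is a DataStream<UserEvent>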

Built-in Partitioner Implementations

FlinkFixedPartitioner

Routes all records from a given sink subtask to one fixed Kafka partition (subtask i writes to partitions[i % partitions.length]; see the sketch at the end of this subsection). With a sink parallelism of 1, every message goes to partition 0, which suits single-partition topics or cases where ordering across all messages is required.

public class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> {
    public FlinkFixedPartitioner();
}

Usage Example:

// Each sink subtask writes to one fixed partition (all messages land in partition 0 when the sink parallelism is 1)
FlinkKafkaPartitioner<MyEvent> partitioner = new FlinkFixedPartitioner<>();

Use Cases:

  • Single-partition topics
  • Maintaining global ordering of all messages (with a sink parallelism of 1)
  • Simple scenarios where partition-level parallelism is not needed
  • Testing and development with predictable partitioning
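
For reference, its behavior can be sketched as follows (an illustrative equivalent, not the exact Flink source):

// Sketch: sink subtask i always writes to partitions[i % partitions.length]
public class FixedPartitionerSketch<T> extends FlinkKafkaPartitioner<T> {
    private int parallelInstanceId;

    @Override
    public void open(int parallelInstanceId, int parallelInstances) {
        this.parallelInstanceId = parallelInstanceId;
    }

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        return partitions[parallelInstanceId % partitions.length];
    }
}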

FlinkKafkaDelegatePartitioner

Adapter that delegates partitioning decisions to a wrapped legacy KafkaPartitioner (described below), allowing partitioners written against the older interface to be used wherever a FlinkKafkaPartitioner is expected.

public class FlinkKafkaDelegatePartitioner<T> extends FlinkKafkaPartitioner<T> {
    public FlinkKafkaDelegatePartitioner(KafkaPartitioner<T> kafkaPartitioner);
}

Usage Example:

// myLegacyPartitioner is an existing KafkaPartitioner<MyEvent> implementation
// (a complete wrapping example follows in the legacy KafkaPartitioner section below)
FlinkKafkaPartitioner<MyEvent> partitioner =
        new FlinkKafkaDelegatePartitioner<>(myLegacyPartitioner);

Kafka Default Partitioning Behavior:

To hand the partitioning decision to Kafka's own default partitioner, configure the producer without any custom Flink partitioner instead. Kafka's default partitioner then behaves as follows:

  • If a key is present: hash-based partitioning using the key
  • If the key is null: messages are spread across partitions (round-robin, or sticky partitioning in newer Kafka clients)
  • Ensures even distribution and good load balancing
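
A sketch of that configuration, analogous to the producer wiring shown earlier but passing an empty Optional instead of a custom partitioner (assumes the constructor variant that takes an Optional<FlinkKafkaPartitioner>; older variants accept null; producerProps and MyEventSerializationSchema are placeholders):

// No custom Flink partitioner: the Kafka producer's default partitioner decides
FlinkKafkaProducer<MyEvent> producer = new FlinkKafkaProducer<MyEvent>(
        "my-topic",
        new MyEventSerializationSchema(),   // placeholder SerializationSchema<MyEvent>
        producerProps,
        Optional.empty());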

KafkaPartitioner (Legacy)

Legacy abstract partitioner class retained from older versions of the connector.

public abstract class KafkaPartitioner<T> implements Serializable {
    public abstract int partition(T record, byte[] key, byte[] value, int numPartitions);
}

This is maintained for backward compatibility with older connector versions.
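
For example, a partitioner written against this interface can be kept unchanged and adapted with FlinkKafkaDelegatePartitioner (a sketch, assuming the wrapping constructor shown above; MyEvent.getId() is a hypothetical accessor):

// Partitioner implemented against the legacy interface
public class LegacyIdPartitioner extends KafkaPartitioner<MyEvent> {
    @Override
    public int partition(MyEvent record, byte[] key, byte[] value, int numPartitions) {
        // floorMod keeps the result non-negative; getId() is hypothetical
        return Math.floorMod(record.getId().hashCode(), numPartitions);
    }
}

// Adapt it for APIs that expect a FlinkKafkaPartitioner
FlinkKafkaPartitioner<MyEvent> adapted =
        new FlinkKafkaDelegatePartitioner<>(new LegacyIdPartitioner());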

Custom Partitioner Examples

Hash-Based Partitioner

Partition based on a specific field to ensure related records go to the same partition:

public class OrderPartitioner extends FlinkKafkaPartitioner<Order> {
    @Override
    public int partition(Order record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // Partition by customer ID to keep all orders for a customer together
        String customerId = record.getCustomerId();
        if (customerId == null) {
            return partitions[0];
        }
        
        // floorMod keeps the index non-negative even if hashCode() returns Integer.MIN_VALUE
        return partitions[Math.floorMod(customerId.hashCode(), partitions.length)];
    }
}

Time-Based Partitioner

Partition based on timestamp ranges for time-series data:

public class TimeBasedPartitioner extends FlinkKafkaPartitioner<TimestampedEvent> {
    private static final long HOUR_IN_MILLIS = 60 * 60 * 1000;
    
    @Override
    public int partition(TimestampedEvent record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // Partition by hour to group events in time windows
        long timestamp = record.getTimestamp();
        long hourBucket = timestamp / HOUR_IN_MILLIS;
        
        return partitions[(int) (hourBucket % partitions.length)];
    }
}

Round-Robin Partitioner

Distribute messages evenly across partitions using round-robin:

public class RoundRobinPartitioner<T> extends FlinkKafkaPartitioner<T> {
    private int nextPartition = 0;
    
    @Override
    public void open(int parallelInstanceId, int parallelInstances) {
        // Start each parallel instance at a different offset to avoid contention
        this.nextPartition = parallelInstanceId;
    }
    
    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // floorMod guards against the counter overflowing into negative values
        int targetPartition = partitions[Math.floorMod(nextPartition, partitions.length)];
        nextPartition++;
        return targetPartition;
    }
}

Load-Aware Partitioner

A more sophisticated partitioner that tracks how many records this subtask has sent to each partition and routes to the least-used one. Note that the counts are a per-subtask approximation, not actual broker-side load:

public class LoadAwarePartitioner<T> extends FlinkKafkaPartitioner<T> {
    private final Map<Integer, AtomicLong> partitionCounts = new ConcurrentHashMap<>();
    
    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // Find partition with lowest message count
        int targetPartition = partitions[0];
        long minCount = partitionCounts.computeIfAbsent(targetPartition, k -> new AtomicLong(0)).get();
        
        for (int partition : partitions) {
            long count = partitionCounts.computeIfAbsent(partition, k -> new AtomicLong(0)).get();
            if (count < minCount) {
                minCount = count;
                targetPartition = partition;
            }
        }
        
        partitionCounts.get(targetPartition).incrementAndGet();
        return targetPartition;
    }
}

Partitioning Strategies and Trade-offs

Key-Based Partitioning

  • Pros: Maintains ordering per key, enables stateful processing
  • Cons: Potential hotspots with skewed keys, uneven distribution
  • Use Cases: User events, session data, transaction processing

Round-Robin Partitioning

  • Pros: Even distribution, good load balancing
  • Cons: No ordering guarantees, no key locality
  • Use Cases: Stateless processing, high-throughput scenarios

Fixed Partitioning

  • Pros: Simple, predictable, maintains global order
  • Cons: No parallelism, potential bottleneck
  • Use Cases: Single-partition topics, global ordering requirements

Time-Based Partitioning

  • Pros: Natural for time-series data, enables time-based processing
  • Cons: Potential hotspots during high activity periods
  • Use Cases: Event streams, metrics, logs

Best Practices

Partition Key Selection

Choose partition keys that provide good distribution:

// Good: user ID (assuming a reasonably uniform distribution of IDs)
int partition = Math.floorMod(userId.hashCode(), partitions.length);

// Bad: boolean flag (only 2 possible values, so at most 2 partitions ever receive data)
int partition = record.isActive() ? 0 : 1;

// Good: combination of fields for better distribution
String compositeKey = record.getRegion() + ":" + record.getUserId();
int partition = Math.floorMod(compositeKey.hashCode(), partitions.length);

Error Handling

Always validate partition selection:

@Override
public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
    if (partitions.length == 0) {
        throw new IllegalArgumentException("No partitions available for topic: " + targetTopic);
    }
    
    int selectedPartition = selectPartition(record, partitions);
    
    // Validate that selected partition is in the available partitions array
    boolean isValid = false;
    for (int partition : partitions) {
        if (partition == selectedPartition) {
            isValid = true;
            break;
        }
    }
    
    if (!isValid) {
        // Fall back to first available partition
        return partitions[0];
    }
    
    return selectedPartition;
}

Performance Considerations

  • Keep partitioning logic simple and fast (called for every record)
  • Avoid heavy computations or external calls in partition method
  • Consider caching expensive computations when possible (see the sketch after this list)
  • Be aware of memory usage in stateful partitioners
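
As an illustration of the caching and memory points above, a partitioner can memoize an expensive per-key computation in a bounded, per-subtask cache (a sketch; RegionedEvent, getRegion(), and normalize() are hypothetical stand-ins):

public class CachingRegionPartitioner extends FlinkKafkaPartitioner<RegionedEvent> {
    private static final int MAX_CACHE_SIZE = 10_000;

    // Per-subtask cache: transient because partitioners are serialized with the
    // job, and rebuilt in open() on each parallel instance
    private transient Map<String, Integer> hashCache;

    @Override
    public void open(int parallelInstanceId, int parallelInstances) {
        this.hashCache = new HashMap<>();
    }

    @Override
    public int partition(RegionedEvent record, byte[] key, byte[] value,
                         String targetTopic, int[] partitions) {
        String region = record.getRegion();   // hypothetical accessor
        if (region == null) {
            return partitions[0];
        }
        if (hashCache.size() > MAX_CACHE_SIZE) {
            hashCache.clear();                // crude bound on memory use
        }
        int hash = hashCache.computeIfAbsent(region, r -> normalize(r).hashCode());
        return partitions[Math.floorMod(hash, partitions.length)];
    }

    // Stand-in for an expensive per-key computation (lookup, parsing, etc.)
    private String normalize(String region) {
        return region.trim().toLowerCase();
    }
}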

Testing Partitioners

Always test partitioning behavior:

@Test
public void testPartitionDistribution() {
    UserIdPartitioner<UserEvent> partitioner = new UserIdPartitioner<>();
    int[] partitions = {0, 1, 2, 3, 4};
    Map<Integer, Integer> distribution = new HashMap<>();
    
    // Test with various user IDs
    for (int i = 0; i < 1000; i++) {
        UserEvent event = new UserEvent("user" + i, "action");
        int partition = partitioner.partition(event, null, null, "test-topic", partitions);
        distribution.merge(partition, 1, Integer::sum);
    }
    
    // Verify reasonable distribution
    for (int count : distribution.values()) {
        assertTrue("Uneven distribution", count > 150 && count < 250);
    }
}