Flexible partitioning strategies for distributing records across Kinesis shards and for mapping shards to Flink subtasks, giving fine-grained control over load balancing and data distribution.
Abstract base class for implementing custom partitioning strategies that determine how records are distributed across Kinesis shards.
@PublicEvolving
public abstract class KinesisPartitioner<T> implements Serializable {
/**
* Get the partition ID for a record.
*
* @param element Record to partition
* @return Partition ID string
*/
public abstract String getPartitionId(T element);
/**
* Get explicit hash key for fine-grained shard assignment.
*
* @param element Record to get hash key for
* @return Explicit hash key or null for automatic assignment
*/
public String getExplicitHashKey(T element) {
return null; // Default: use automatic hash key derivation
}
/**
* Initialize the partitioner with parallelism information.
*
* @param indexOfThisSubtask Index of current Flink subtask
* @param numberOfParallelSubtasks Total number of parallel subtasks
*/
public void initialize(int indexOfThisSubtask, int numberOfParallelSubtasks) {
// Default: no initialization needed
}
}

Ensures each Flink partition maps to the same Kinesis partition, providing deterministic routing based on the subtask index.
@PublicEvolving
public class FixedKinesisPartitioner<T> extends KinesisPartitioner<T> {
/**
* Initialize with subtask information.
*
* @param indexOfThisSubtask Index of current Flink subtask
* @param numberOfParallelSubtasks Total number of parallel subtasks
*/
@Override
public void initialize(int indexOfThisSubtask, int numberOfParallelSubtasks);
/**
* Get fixed partition ID based on subtask index.
*
* @param record Record (not used for partitioning decision)
* @return Fixed partition ID for this subtask
*/
@Override
public String getPartitionId(T record);
@Override
public boolean equals(Object o);
@Override
public int hashCode();
}

Maps elements to random partition IDs for even distribution across all available shards.
@PublicEvolving
public class RandomKinesisPartitioner<T> extends KinesisPartitioner<T> {
/**
* Get random partition ID for even distribution.
*
* @param element Record (not used for partitioning decision)
* @return Random partition ID
*/
@Override
public String getPartitionId(T element);
@Override
public boolean equals(Object o);
@Override
public int hashCode();
}

Interface for mapping Kinesis shards to Flink subtask indices, controlling how shards are balanced across consumer subtasks.
@PublicEvolving
public interface KinesisShardAssigner extends Serializable {
/**
* Returns the index of the target subtask that a specific shard should be assigned to.
* If the returned index is out of range [0, numParallelSubtasks), a modulus operation will be applied.
*
* @param shard Kinesis shard to assign
* @param numParallelSubtasks Total number of parallel subtasks
* @return Target subtask index (0 to numParallelSubtasks - 1)
*/
int assign(StreamShardHandle shard, int numParallelSubtasks);
}

public class UserPartitioner extends KinesisPartitioner<UserEvent> {
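// Example partitioner: keys every record by user ID so all events for a given
// user share a partition key and per-user ordering is preserved.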
@Override
public String getPartitionId(UserEvent element) {
// Partition by user ID to maintain ordering per user
return String.valueOf(element.getUserId());
}
@Override
public String getExplicitHashKey(UserEvent element) {
// Use explicit hash for finer control over shard assignment
return String.valueOf(element.getUserId());
}
}
// Usage with producer
FlinkKinesisProducer<UserEvent> producer = new FlinkKinesisProducer<>(schema, props);
producer.setCustomPartitioner(new UserPartitioner());

public class TenantPartitioner extends KinesisPartitioner<TenantEvent> {
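// Example partitioner: hashes each tenant into a fixed number of partitions
// (one per parallel subtask, captured in initialize()) for consistent routing.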
private int numberOfParallelSubtasks;
@Override
public void initialize(int indexOfThisSubtask, int numberOfParallelSubtasks) {
this.numberOfParallelSubtasks = numberOfParallelSubtasks;
}
@Override
public String getPartitionId(TenantEvent element) {
String tenantId = element.getTenantId();
// Ensure consistent routing for each tenant
int partition = Math.floorMod(tenantId.hashCode(), numberOfParallelSubtasks);
return String.valueOf(partition);
}
@Override
public String getExplicitHashKey(TenantEvent element) {
// Use the tenant ID as the explicit hash key (Kinesis requires a decimal
// value here, so this assumes numeric tenant IDs)
return element.getTenantId();
}
}

public class GeographicPartitioner extends KinesisPartitioner<LocationEvent> {
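// Example partitioner: static region-to-partition mapping, with a numeric
// explicit hash key derived from region and city for finer spread.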
private static final Map<String, String> REGION_PARTITIONS = Map.of(
"US_EAST", "0",
"US_WEST", "1",
"EU", "2",
"ASIA", "3"
);
@Override
public String getPartitionId(LocationEvent element) {
String region = element.getRegion();
return REGION_PARTITIONS.getOrDefault(region, "0");
}
@Override
public String getExplicitHashKey(LocationEvent element) {
// Sub-partition within a region by city. Kinesis explicit hash keys must be
// decimal integers in [0, 2^128), so scale an unsigned 32-bit hash of the
// region and city into that range instead of returning raw text.
long cityHash = Integer.toUnsignedLong((element.getRegion() + "_" + element.getCity()).hashCode());
return java.math.BigInteger.valueOf(cityHash).shiftLeft(96).toString();
}
}

public class TimeBasedPartitioner extends KinesisPartitioner<TimeSeriesEvent> {
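// Example partitioner: buckets records into 15-minute intervals, cycling
// through four partition keys per hour.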
private static final int PARTITIONS_PER_HOUR = 4;
@Override
public String getPartitionId(TimeSeriesEvent element) {
long timestamp = element.getTimestamp();
// Partition by 15-minute intervals
long intervalIndex = (timestamp / (15 * 60 * 1000)) % PARTITIONS_PER_HOUR;
return String.valueOf(intervalIndex);
}
@Override
public String getExplicitHashKey(TimeSeriesEvent element) {
// Use the timestamp as the explicit hash key (a decimal value, as Kinesis
// requires); note the hash key controls shard placement, not record ordering
return String.valueOf(element.getTimestamp());
}
}

public class LoadAwarePartitioner extends KinesisPartitioner<WeightedEvent> {
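// Example partitioner: weighted random selection across one bucket per subtask;
// weights default to equal here but could reflect expected per-partition capacity.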
private int[] partitionWeights;
private int totalWeight;
@Override
public void initialize(int indexOfThisSubtask, int numberOfParallelSubtasks) {
// Initialize partition weights based on expected load
partitionWeights = new int[numberOfParallelSubtasks];
for (int i = 0; i < numberOfParallelSubtasks; i++) {
partitionWeights[i] = 100; // Default weight
}
totalWeight = Arrays.stream(partitionWeights).sum();
}
@Override
public String getPartitionId(WeightedEvent element) {
// Choose a partition with probability proportional to its configured weight
int targetPartition = selectPartitionByWeight();
return String.valueOf(targetPartition);
}
private int selectPartitionByWeight() {
int randomValue = ThreadLocalRandom.current().nextInt(totalWeight);
int currentWeight = 0;
for (int i = 0; i < partitionWeights.length; i++) {
currentWeight += partitionWeights[i];
if (randomValue < currentWeight) {
return i;
}
}
return 0; // Fallback
}
}

public class BalancedShardAssigner implements KinesisShardAssigner {
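// Example assigner: deterministic hash of the shard ID, spreading shards
// roughly evenly across all subtasks.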
@Override
public int assign(StreamShardHandle shard, int numParallelSubtasks) {
String shardId = shard.getShard().getShardId();
// Hash the shard ID for a deterministic, roughly even assignment
return Math.floorMod(shardId.hashCode(), numParallelSubtasks);
}
}
// Usage with consumer
FlinkKinesisConsumer<Event> consumer = new FlinkKinesisConsumer<>(stream, schema, props);
consumer.setShardAssigner(new BalancedShardAssigner());

public class PriorityShardAssigner implements KinesisShardAssigner {
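// Example assigner: pins high-priority shards to dedicated subtasks and spreads
// the rest across the remaining ones (assumes at least three parallel subtasks).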
private final Map<String, Integer> shardPriorities;
public PriorityShardAssigner(Map<String, Integer> shardPriorities) {
this.shardPriorities = shardPriorities;
}
@Override
public int assign(StreamShardHandle shard, int numParallelSubtasks) {
String shardId = shard.getShard().getShardId();
int priority = shardPriorities.getOrDefault(shardId, 0);
// Assign high-priority shards to specific subtasks
if (priority > 8) {
return 0; // Dedicated high-priority subtask
} else if (priority > 5) {
return 1; // Medium-priority subtask
} else {
// Distribute low-priority shards across the remaining subtasks
// (assumes at least three parallel subtasks)
return 2 + Math.floorMod(shardId.hashCode(), numParallelSubtasks - 2);
}
}
}

public class StreamAwareShardAssigner implements KinesisShardAssigner {
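// Example assigner: mixes the stream name into the hash so that shards from
// different streams tend to land on different subtasks.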
@Override
public int assign(StreamShardHandle shard, int numParallelSubtasks) {
String streamName = shard.getStreamName();
String shardId = shard.getShard().getShardId();
// Assign shards from different streams to different subtasks when possible
int streamHash = streamName.hashCode();
int shardHash = shardId.hashCode();
// Combine stream and shard information; floorMod keeps the result in [0, numParallelSubtasks)
return Math.floorMod(streamHash + shardHash, numParallelSubtasks);
}
}

public class CompositeKeyPartitioner extends KinesisPartitioner<OrderEvent> {
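// Example partitioner: builds the partition key from customer ID and order date
// so one customer's orders for a given day stay together.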
@Override
public String getPartitionId(OrderEvent element) {
// Composite key: customer_id + order_date
String customerId = element.getCustomerId();
String orderDate = element.getOrderDate().format(DateTimeFormatter.ISO_LOCAL_DATE);
return customerId + "_" + orderDate;
}
@Override
public String getExplicitHashKey(OrderEvent element) {
// Fine-grained placement based on the full order ID (Kinesis explicit hash
// keys must be decimal values, so this assumes numeric order IDs)
return element.getOrderId();
}
}

public class SeasonalPartitioner extends KinesisPartitioner<SalesEvent> {
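// Example partitioner: widens the set of partition keys during the holiday
// season to spread higher seasonal load across more shards.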
@Override
public String getPartitionId(SalesEvent element) {
LocalDateTime timestamp = element.getTimestamp();
// Adjust partitioning based on expected seasonal load
if (isHolidaySeason(timestamp)) {
// More partitions during high season
return String.valueOf(timestamp.getHour() % 8);
} else {
// Fewer partitions during low season
return String.valueOf(timestamp.getHour() % 4);
}
}
private boolean isHolidaySeason(LocalDateTime timestamp) {
int month = timestamp.getMonthValue();
return month == 11 || month == 12; // November and December
}
}

public class AdaptivePartitioner extends KinesisPartitioner<MetricEvent> {
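// Example partitioner: grows or shrinks the number of partition keys based on
// observed throughput; note that resizing remaps keys, so per-key routing is
// not stable across adjustments.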
private volatile int currentPartitions = 4;
private final AtomicLong messageCount = new AtomicLong(0);
private volatile long lastAdjustment = System.currentTimeMillis();
@Override
public String getPartitionId(MetricEvent element) {
long count = messageCount.incrementAndGet();
// Adjust partition count based on throughput
if (count % 10000 == 0) {
adjustPartitionCount();
}
// Use hash-based assignment with current partition count
return String.valueOf(Math.floorMod(element.getMetricId().hashCode(), currentPartitions));
}
private void adjustPartitionCount() {
long now = System.currentTimeMillis();
long elapsed = now - lastAdjustment;
if (elapsed > 60000) { // Adjust every minute
long currentRate = messageCount.get() * 1000 / elapsed;
if (currentRate > 100000 && currentPartitions < 16) {
currentPartitions *= 2; // Scale up
} else if (currentRate < 10000 && currentPartitions > 2) {
currentPartitions /= 2; // Scale down
}
lastAdjustment = now;
messageCount.set(0);
}
}
}

public class DistributionAwarePartitioner extends KinesisPartitioner<Event> {
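// Example partitioner: greedily routes each record to the currently least-loaded
// partition key; the counters are local to each parallel sink instance.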
private final Map<String, AtomicLong> partitionCounts = new ConcurrentHashMap<>();
private volatile int preferredPartitions;
@Override
public void initialize(int indexOfThisSubtask, int numberOfParallelSubtasks) {
this.preferredPartitions = numberOfParallelSubtasks * 2; // 2x parallelism
}
@Override
public String getPartitionId(Event element) {
// Find least loaded partition
String leastLoadedPartition = findLeastLoadedPartition();
partitionCounts.computeIfAbsent(leastLoadedPartition, k -> new AtomicLong(0)).incrementAndGet();
return leastLoadedPartition;
}
private String findLeastLoadedPartition() {
long minCount = Long.MAX_VALUE;
String minPartition = "0";
for (int i = 0; i < preferredPartitions; i++) {
String partition = String.valueOf(i);
long count = partitionCounts.computeIfAbsent(partition, k -> new AtomicLong(0)).get();
if (count < minCount) {
minCount = count;
minPartition = partition;
}
}
return minPartition;
}
}
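The examples above derive explicit hash keys directly from business fields; in practice Kinesis accepts an explicit hash key only as a decimal integer in the 128-bit hash key range, and the value selects the shard by falling into that shard's hash key range. The following is a minimal sketch of exploiting that range deliberately, assuming a stream whose shards evenly split the key space; ExplicitHashKeyPartitioner, Event, and getKey() are illustrative names, not part of the connector API.

import java.math.BigInteger;

public class ExplicitHashKeyPartitioner extends KinesisPartitioner<Event> {
    // Kinesis hash key space: explicit hash keys range over [0, 2^128 - 1]
    private static final BigInteger HASH_KEY_SPACE = BigInteger.ONE.shiftLeft(128);
    // Assumed number of evenly split shards in the target stream
    private static final int BUCKETS = 16;

    @Override
    public String getPartitionId(Event element) {
        // A partition key is still required, but the explicit hash key below is
        // what actually selects the shard
        return element.getKey();
    }

    @Override
    public String getExplicitHashKey(Event element) {
        // Pin the record to the start of one of BUCKETS equal slices of the key space
        int bucket = Math.floorMod(element.getKey().hashCode(), BUCKETS);
        return HASH_KEY_SPACE
                .divide(BigInteger.valueOf(BUCKETS))
                .multiply(BigInteger.valueOf(bucket))
                .toString();
    }
}

As with the other partitioners, an instance would be registered on the producer via setCustomPartitioner(...).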