# Partitioners

Custom partitioning logic for determining target Kafka partitions when producing messages. Partitioners enable control over message distribution across partitions, affecting parallelism, ordering guarantees, and load balancing.

## Capabilities

### FlinkKafkaPartitioner

Abstract base class for implementing custom Kafka partitioning logic within Flink.

```java { .api }
public abstract class FlinkKafkaPartitioner<T> implements Serializable {
    public void open(int parallelInstanceId, int parallelInstances);
    public abstract int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}
```

**Methods:**

- `open()` - Initialize the partitioner for a specific parallel instance
  - `parallelInstanceId` - 0-indexed ID of this parallel instance
  - `parallelInstances` - Total number of parallel instances
- `partition()` - Determine the target partition for a record
  - `record` - Original record object being sent
  - `key` - Serialized message key (may be null)
  - `value` - Serialized message value
  - `targetTopic` - Target topic name
  - `partitions` - Array of available partition IDs for the topic
  - **Returns:** Target partition ID (must be one of the values in the `partitions` array)

**Usage Example:**

```java
public class UserIdPartitioner<T extends UserEvent> extends FlinkKafkaPartitioner<T> {

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // Partition by user ID so that all events for the same user land in the same partition
        String userId = record.getUserId();
        if (userId == null) {
            return partitions[0]; // Default partition for null user IDs
        }

        // floorMod avoids a negative index when hashCode() returns a negative value
        return partitions[Math.floorMod(userId.hashCode(), partitions.length)];
    }
}
```
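
To take effect, the partitioner has to be handed to the producer when it is constructed. A minimal wiring sketch, assuming the producer constructor overload that accepts an `Optional<FlinkKafkaPartitioner<T>>` (overloads vary by connector version; see `producer-base.md`), with `UserEventSchema` and `userEventStream` as illustrative names:

```java
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");

// The partitioner defined above, typed as the base class
FlinkKafkaPartitioner<UserEvent> partitioner = new UserIdPartitioner<>();

FlinkKafkaProducer<UserEvent> producer = new FlinkKafkaProducer<>(
        "user-events",          // target topic
        new UserEventSchema(),  // an assumed SerializationSchema<UserEvent> implementation
        props,
        Optional.of(partitioner));

userEventStream.addSink(producer);
```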

### Built-in Partitioner Implementations

#### FlinkFixedPartitioner

Routes all records from a given parallel sink instance to one fixed Kafka partition (subtask `i` writes to `partitions[i % partitions.length]`), so with a sink parallelism of 1 every message goes to partition 0. Useful for single-partition topics or when a simple, static mapping of sink subtasks to partitions is sufficient.

```java { .api }
public class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> {
    public FlinkFixedPartitioner();
}
```

**Usage Example:**

```java
// Each parallel sink instance writes to one fixed partition;
// with a sink parallelism of 1, all messages go to partition 0
FlinkKafkaPartitioner<MyEvent> partitioner = new FlinkFixedPartitioner<>();
```
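
Conceptually, the behavior is equivalent to the following sketch (`FixedLikePartitioner` is an illustrative re-implementation based on the upstream Flink connector, not the actual source):

```java
public class FixedLikePartitioner<T> extends FlinkKafkaPartitioner<T> {

    private int parallelInstanceId;

    @Override
    public void open(int parallelInstanceId, int parallelInstances) {
        // Remember which parallel sink instance this is
        this.parallelInstanceId = parallelInstanceId;
    }

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // Every record from this subtask goes to the same partition
        return partitions[parallelInstanceId % partitions.length];
    }
}
```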

**Use Cases:**

- Single-partition topics
- Maintaining global ordering of all messages (with a sink parallelism of 1)
- Simple scenarios where per-record partition selection is not needed
- Testing and development with predictable partitioning

#### FlinkKafkaDelegatePartitioner

Deprecated adapter that wraps a legacy `KafkaPartitioner` (see below) and delegates partitioning decisions to it, so partitioners written against the older interface can still be used with the current API.

```java { .api }
public class FlinkKafkaDelegatePartitioner<T> extends FlinkKafkaPartitioner<T> {
    public FlinkKafkaDelegatePartitioner(KafkaPartitioner<T> kafkaPartitioner);
}
```

**Usage Example:**

```java
// Adapt an existing legacy KafkaPartitioner implementation (MyLegacyPartitioner is illustrative)
KafkaPartitioner<MyEvent> legacyPartitioner = new MyLegacyPartitioner();
FlinkKafkaPartitioner<MyEvent> partitioner = new FlinkKafkaDelegatePartitioner<>(legacyPartitioner);
```

**Kafka default partitioning behavior (used when partitioning is left to the Kafka client rather than a custom `FlinkKafkaPartitioner`):**

- If a key is present: hash-based partitioning on the key
- If the key is null: round-robin distribution across partitions
- Generally yields even distribution and good load balancing

### KafkaPartitioner (Legacy)

Abstract partitioner base class compatible with older versions of the connector.

```java { .api }
public abstract class KafkaPartitioner<T> implements Serializable {
    public abstract int partition(T record, byte[] key, byte[] value, int numPartitions);
}
```

This is maintained for backward compatibility with older connector versions.
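
For migration, an existing legacy partitioner can be wrapped with `FlinkKafkaDelegatePartitioner` rather than rewritten. A minimal sketch (`LegacyOrderPartitioner` is illustrative; `Order` and `getCustomerId()` are the same example types used in the next section):

```java
// Legacy-style partitioner: routes orders by customer ID
public class LegacyOrderPartitioner extends KafkaPartitioner<Order> {
    @Override
    public int partition(Order record, byte[] key, byte[] value, int numPartitions) {
        // floorMod avoids a negative index for negative hash codes
        return Math.floorMod(record.getCustomerId().hashCode(), numPartitions);
    }
}

// Adapt it to the current FlinkKafkaPartitioner-based API
FlinkKafkaPartitioner<Order> partitioner =
        new FlinkKafkaDelegatePartitioner<>(new LegacyOrderPartitioner());
```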

## Custom Partitioner Examples

### Hash-Based Partitioner

Partition based on a specific field to ensure related records go to the same partition:

```java
public class OrderPartitioner extends FlinkKafkaPartitioner<Order> {

    @Override
    public int partition(Order record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // Partition by customer ID to keep all orders for a customer together
        String customerId = record.getCustomerId();
        if (customerId == null) {
            return partitions[0];
        }

        // floorMod avoids a negative index for negative hash codes
        return partitions[Math.floorMod(customerId.hashCode(), partitions.length)];
    }
}
```

### Time-Based Partitioner

Partition based on timestamp ranges for time-series data:

```java
public class TimeBasedPartitioner extends FlinkKafkaPartitioner<TimestampedEvent> {

    private static final long HOUR_IN_MILLIS = 60 * 60 * 1000L;

    @Override
    public int partition(TimestampedEvent record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // Partition by hour to group events in time windows
        long timestamp = record.getTimestamp();
        long hourBucket = timestamp / HOUR_IN_MILLIS;

        return partitions[(int) (hourBucket % partitions.length)];
    }
}
```

### Round-Robin Partitioner

Distribute messages evenly across partitions using round-robin:

```java
public class RoundRobinPartitioner<T> extends FlinkKafkaPartitioner<T> {

    private int nextPartition = 0;

    @Override
    public void open(int parallelInstanceId, int parallelInstances) {
        // Start each parallel instance at a different offset so they do not all hit the same partition first
        this.nextPartition = parallelInstanceId;
    }

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // floorMod keeps the index valid even if the counter eventually overflows
        int targetPartition = partitions[Math.floorMod(nextPartition, partitions.length)];
        nextPartition++;
        return targetPartition;
    }
}
```

### Load-Aware Partitioner

A more sophisticated partitioner that tracks, per parallel instance, how many records it has sent to each partition and routes each record to the least-loaded one:

```java
public class LoadAwarePartitioner<T> extends FlinkKafkaPartitioner<T> {

    // Per-instance count of records sent to each partition
    private final Map<Integer, AtomicLong> partitionCounts = new ConcurrentHashMap<>();

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // Find the partition this instance has sent the fewest records to
        int targetPartition = partitions[0];
        long minCount = partitionCounts.computeIfAbsent(targetPartition, k -> new AtomicLong(0)).get();

        for (int partition : partitions) {
            long count = partitionCounts.computeIfAbsent(partition, k -> new AtomicLong(0)).get();
            if (count < minCount) {
                minCount = count;
                targetPartition = partition;
            }
        }

        partitionCounts.get(targetPartition).incrementAndGet();
        return targetPartition;
    }
}
```

## Partitioning Strategies and Trade-offs

### Key-Based Partitioning

- **Pros:** Maintains ordering per key, enables stateful processing
- **Cons:** Potential hotspots with skewed keys, uneven distribution
- **Use Cases:** User events, session data, transaction processing

### Round-Robin Partitioning

- **Pros:** Even distribution, good load balancing
- **Cons:** No ordering guarantees, no key locality
- **Use Cases:** Stateless processing, high-throughput scenarios

### Fixed Partitioning

- **Pros:** Simple, predictable, maintains global order
- **Cons:** No parallelism, potential bottleneck
- **Use Cases:** Single-partition topics, global ordering requirements

### Time-Based Partitioning

- **Pros:** Natural for time-series data, enables time-based processing
- **Cons:** Potential hotspots during high-activity periods
- **Use Cases:** Event streams, metrics, logs

## Best Practices

### Partition Key Selection

Choose partition keys that provide good distribution:

```java
// Good: user ID (assuming IDs are reasonably well distributed)
int partition = Math.floorMod(userId.hashCode(), partitions.length);

// Bad: boolean flag (only 2 possible values, so at most 2 partitions are ever used)
int partition = record.isActive() ? 0 : 1;

// Good: combination of fields for better distribution
String compositeKey = record.getRegion() + ":" + record.getUserId();
int partition = Math.floorMod(compositeKey.hashCode(), partitions.length);
```

### Error Handling

Always validate partition selection:

```java
@Override
public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
    if (partitions.length == 0) {
        throw new IllegalArgumentException("No partitions available for topic: " + targetTopic);
    }

    // selectPartition() stands in for the partitioner's actual selection logic
    int selectedPartition = selectPartition(record, partitions);

    // Validate that the selected partition is in the available partitions array
    boolean isValid = false;
    for (int partition : partitions) {
        if (partition == selectedPartition) {
            isValid = true;
            break;
        }
    }

    if (!isValid) {
        // Fall back to the first available partition
        return partitions[0];
    }

    return selectedPartition;
}
```

### Performance Considerations

- Keep partitioning logic simple and fast (it is called for every record)
- Avoid heavy computations or external calls in the `partition()` method
- Consider caching expensive computations when possible (see the sketch below)
- Be aware of memory usage in stateful partitioners
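
A minimal caching sketch; `TenantEvent`, `getTenantId()`, and `normalizeTenant()` are illustrative stand-ins for an expensive per-key computation, and the cache assumes the topic's partition count does not change while the job runs:

```java
public class CachingTenantPartitioner extends FlinkKafkaPartitioner<TenantEvent> {

    private static final int MAX_CACHE_SIZE = 10_000;

    // tenantId -> partition index, local to this parallel instance
    private final Map<String, Integer> cache = new HashMap<>();

    @Override
    public int partition(TenantEvent record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        String tenantId = record.getTenantId();
        Integer index = cache.get(tenantId);
        if (index == null) {
            // Expensive derivation runs at most once per tenant
            String normalized = normalizeTenant(tenantId);
            index = Math.floorMod(normalized.hashCode(), partitions.length);
            if (cache.size() < MAX_CACHE_SIZE) { // bound memory usage
                cache.put(tenantId, index);
            }
        }
        return partitions[index];
    }

    private String normalizeTenant(String tenantId) {
        // Placeholder for an expensive computation (lookup, parsing, etc.)
        return tenantId.trim().toLowerCase();
    }
}
```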

### Testing Partitioners

Always test partitioning behavior:

```java
@Test
public void testPartitionDistribution() {
    UserIdPartitioner<UserEvent> partitioner = new UserIdPartitioner<>();
    int[] partitions = {0, 1, 2, 3, 4};
    Map<Integer, Integer> distribution = new HashMap<>();

    // Test with various user IDs
    for (int i = 0; i < 1000; i++) {
        UserEvent event = new UserEvent("user" + i, "action");
        int partition = partitioner.partition(event, null, null, "test-topic", partitions);
        distribution.merge(partition, 1, Integer::sum);
    }

    // Verify reasonable distribution
    for (int count : distribution.values()) {
        assertTrue("Uneven distribution", count > 150 && count < 250);
    }
}
```