or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.mdconsumer.mddynamodb-streams.mdindex.mdpartitioning.mdproducer.mdserialization.mdtable-api.md

partitioning.mddocs/

0

# Partitioning Strategies

1

2

Flexible partitioning strategies for distributing data across Kinesis shards and mapping shards to Flink subtasks, enabling optimal load balancing and data distribution.

3

4

## Capabilities

5

6

### KinesisPartitioner

7

8

Abstract base class for implementing custom partitioning strategies that determine how records are distributed across Kinesis shards.

9

10

```java { .api }

11

@PublicEvolving

12

public abstract class KinesisPartitioner<T> implements Serializable {

13

14

/**

15

* Get the partition ID for a record.

16

*

17

* @param element Record to partition

18

* @return Partition ID string

19

*/

20

public abstract String getPartitionId(T element);

21

22

/**

23

* Get explicit hash key for fine-grained shard assignment.

24

*

25

* @param element Record to get hash key for

26

* @return Explicit hash key or null for automatic assignment

27

*/

28

public String getExplicitHashKey(T element) {

29

return null; // Default: use automatic hash key derivation

30

}

31

32

/**

33

* Initialize the partitioner with parallelism information.

34

*

35

* @param indexOfThisSubtask Index of current Flink subtask

36

* @param numberOfParallelSubtasks Total number of parallel subtasks

37

*/

38

public void initialize(int indexOfThisSubtask, int numberOfParallelSubtasks) {

39

// Default: no initialization needed

40

}

41

}

42

```

43

44

### FixedKinesisPartitioner

45

46

Routes every record written by a given Flink subtask to the same Kinesis partition. The partition is determined by the subtask index, not by the record's content.

47

48

```java { .api }

49

@PublicEvolving

50

public class FixedKinesisPartitioner<T> extends KinesisPartitioner<T> {

51

52

/**

53

* Initialize with subtask information.

54

*

55

* @param indexOfThisSubtask Index of current Flink subtask

56

* @param numberOfParallelSubtasks Total number of parallel subtasks

57

*/

58

@Override

59

public void initialize(int indexOfThisSubtask, int numberOfParallelSubtasks);

60

61

/**

62

* Get fixed partition ID based on subtask index.

63

*

64

* @param record Record (not used for partitioning decision)

65

* @return Fixed partition ID for this subtask

66

*/

67

@Override

68

public String getPartitionId(T record);

69

70

@Override

71

public boolean equals(Object o);

72

73

@Override

74

public int hashCode();

75

}

76

```

77

78

### RandomKinesisPartitioner

79

80

Maps elements to random partition IDs for even distribution across all available shards.

81

82

```java { .api }

83

@PublicEvolving

84

public class RandomKinesisPartitioner<T> extends KinesisPartitioner<T> {

85

86

/**

87

* Get random partition ID for even distribution.

88

*

89

* @param element Record (not used for partitioning decision)

90

* @return Random partition ID

91

*/

92

@Override

93

public String getPartitionId(T element);

94

95

@Override

96

public boolean equals(Object o);

97

98

@Override

99

public int hashCode();

100

}

101

```

102

103

### KinesisShardAssigner

104

105

Interface for mapping Kinesis shards to Flink subtask indices, controlling load balancing across consumers.

106

107

```java { .api }

108

@PublicEvolving

109

public interface KinesisShardAssigner extends Serializable {

110

111

/**

112

* Returns the index of the target subtask that a specific shard should be assigned to.

113

* If the returned index is out of range [0, numParallelSubtasks), a modulus operation will be applied.

114

*

115

* @param shard Kinesis shard to assign

116

* @param numParallelSubtasks Total number of parallel subtasks

117

* @return Target subtask index (0 to numParallelSubtasks - 1)

118

*/

119

int assign(StreamShardHandle shard, int numParallelSubtasks);

120

}

121

```

122

123

## Usage Examples

124

125

### Custom Business Logic Partitioner

126

127

```java

128

public class UserPartitioner extends KinesisPartitioner<UserEvent> {

129

130

@Override

131

public String getPartitionId(UserEvent element) {

132

// Partition by user ID to maintain ordering per user

133

return String.valueOf(element.getUserId());

134

}

135

136

@Override

137

public String getExplicitHashKey(UserEvent element) {

138

// Use explicit hash for finer control over shard assignment

139

return String.valueOf(element.getUserId());

140

}

141

}

142

143

// Usage with producer

144

FlinkKinesisProducer<UserEvent> producer = new FlinkKinesisProducer<>(schema, props);

145

producer.setCustomPartitioner(new UserPartitioner());

146

```

147

148

### Tenant-Based Partitioning

149

150

```java

151

public class TenantPartitioner extends KinesisPartitioner<TenantEvent> {

152

private int numberOfParallelSubtasks;

153

154

@Override

155

public void initialize(int indexOfThisSubtask, int numberOfParallelSubtasks) {

156

this.numberOfParallelSubtasks = numberOfParallelSubtasks;

157

}

158

159

@Override

160

public String getPartitionId(TenantEvent element) {

161

String tenantId = element.getTenantId();

162

163

// Ensure consistent routing for each tenant

164

int partition = Math.abs(tenantId.hashCode()) % numberOfParallelSubtasks;

165

return String.valueOf(partition);

166

}

167

168

@Override

169

public String getExplicitHashKey(TenantEvent element) {

170

// Use tenant ID as hash key for balanced distribution

171

return element.getTenantId();

172

}

173

}

174

```

175

176

### Geographic Partitioning

177

178

```java

179

public class GeographicPartitioner extends KinesisPartitioner<LocationEvent> {

180

private static final Map<String, String> REGION_PARTITIONS = Map.of(

181

"US_EAST", "0",

182

"US_WEST", "1",

183

"EU", "2",

184

"ASIA", "3"

185

);

186

187

@Override

188

public String getPartitionId(LocationEvent element) {

189

String region = element.getRegion();

190

return REGION_PARTITIONS.getOrDefault(region, "0");

191

}

192

193

@Override

194

public String getExplicitHashKey(LocationEvent element) {

195

// Sub-partition within region by city

196

return element.getRegion() + "_" + element.getCity();

197

}

198

}

199

```

200

201

### Time-Based Partitioning

202

203

```java

204

public class TimeBasedPartitioner extends KinesisPartitioner<TimeSeriesEvent> {

205

private static final int PARTITIONS_PER_HOUR = 4;

206

207

@Override

208

public String getPartitionId(TimeSeriesEvent element) {

209

long timestamp = element.getTimestamp();

210

211

// Partition by 15-minute intervals

212

long intervalIndex = (timestamp / (15 * 60 * 1000)) % PARTITIONS_PER_HOUR;

213

return String.valueOf(intervalIndex);

214

}

215

216

@Override

217

public String getExplicitHashKey(TimeSeriesEvent element) {

218

// Use timestamp for ordering within partition

219

return String.valueOf(element.getTimestamp());

220

}

221

}

222

```

223

224

### Load-Aware Partitioning

225

226

```java

227

public class LoadAwarePartitioner extends KinesisPartitioner<WeightedEvent> {

228

private int[] partitionWeights;

229

private int totalWeight;

230

231

@Override

232

public void initialize(int indexOfThisSubtask, int numberOfParallelSubtasks) {

233

// Initialize partition weights based on expected load

234

partitionWeights = new int[numberOfParallelSubtasks];

235

for (int i = 0; i < numberOfParallelSubtasks; i++) {

236

partitionWeights[i] = 100; // Default weight

237

}

238

totalWeight = Arrays.stream(partitionWeights).sum();

239

}

240

241

@Override

242

public String getPartitionId(WeightedEvent element) {

243

int eventWeight = element.getProcessingWeight();

244

int targetPartition = selectPartitionByWeight(eventWeight);

245

return String.valueOf(targetPartition);

246

}

247

248

private int selectPartitionByWeight(int eventWeight) {

249

int randomValue = ThreadLocalRandom.current().nextInt(totalWeight);

250

int currentWeight = 0;

251

252

for (int i = 0; i < partitionWeights.length; i++) {

253

currentWeight += partitionWeights[i];

254

if (randomValue < currentWeight) {

255

return i;

256

}

257

}

258

return 0; // Fallback

259

}

260

}

261

```

262

263

### Custom Shard Assignment

264

265

```java

266

public class BalancedShardAssigner implements KinesisShardAssigner {

267

268

@Override

269

public int assign(StreamShardHandle shard, int numParallelSubtasks) {

270

String shardId = shard.getShard().getShardId();

271

272

// Use consistent hashing for balanced assignment

273

return Math.abs(shardId.hashCode()) % numParallelSubtasks;

274

}

275

}

276

277

// Usage with consumer

278

FlinkKinesisConsumer<Event> consumer = new FlinkKinesisConsumer<>(stream, schema, props);

279

consumer.setShardAssigner(new BalancedShardAssigner());

280

```

281

282

### Priority-Based Shard Assignment

283

284

```java

285

public class PriorityShardAssigner implements KinesisShardAssigner {

286

private final Map<String, Integer> shardPriorities;

287

288

public PriorityShardAssigner(Map<String, Integer> shardPriorities) {

289

this.shardPriorities = shardPriorities;

290

}

291

292

@Override

293

public int assign(StreamShardHandle shard, int numParallelSubtasks) {

294

String shardId = shard.getShard().getShardId();

295

int priority = shardPriorities.getOrDefault(shardId, 0);

296

297

// Assign high-priority shards to specific subtasks

298

if (priority > 8) {

299

return 0; // Dedicated high-priority subtask

300

} else if (priority > 5) {

301

return 1; // Medium-priority subtask

302

} else {

303

// Distribute low-priority shards across remaining subtasks

304

return 2 + (Math.abs(shardId.hashCode()) % (numParallelSubtasks - 2));

305

}

306

}

307

}

308

```

309

310

### Stream-Aware Shard Assignment

311

312

```java

313

public class StreamAwareShardAssigner implements KinesisShardAssigner {

314

315

@Override

316

public int assign(StreamShardHandle shard, int numParallelSubtasks) {

317

String streamName = shard.getStreamName();

318

String shardId = shard.getShard().getShardId();

319

320

// Assign shards from different streams to different subtasks when possible

321

int streamHash = Math.abs(streamName.hashCode());

322

int shardHash = Math.abs(shardId.hashCode());

323

324

// Combine stream and shard information for assignment

325

return (streamHash + shardHash) % numParallelSubtasks;

326

}

327

}

328

```

329

330

## Advanced Partitioning Patterns

331

332

### Composite Key Partitioning

333

334

```java

335

public class CompositeKeyPartitioner extends KinesisPartitioner<OrderEvent> {

336

337

@Override

338

public String getPartitionId(OrderEvent element) {

339

// Composite key: customer_id + order_date

340

String customerId = element.getCustomerId();

341

String orderDate = element.getOrderDate().format(DateTimeFormatter.ISO_LOCAL_DATE);

342

return customerId + "_" + orderDate;

343

}

344

345

@Override

346

public String getExplicitHashKey(OrderEvent element) {

347

// Fine-grained hash based on full order ID

348

return element.getOrderId();

349

}

350

}

351

```

352

353

### Seasonal Partitioning

354

355

```java

356

public class SeasonalPartitioner extends KinesisPartitioner<SalesEvent> {

357

358

@Override

359

public String getPartitionId(SalesEvent element) {

360

LocalDateTime timestamp = element.getTimestamp();

361

362

// Adjust partitioning based on expected seasonal load

363

if (isHolidaySeason(timestamp)) {

364

// More partitions during high season

365

return String.valueOf(timestamp.getHour() % 8);

366

} else {

367

// Fewer partitions during low season

368

return String.valueOf(timestamp.getHour() % 4);

369

}

370

}

371

372

private boolean isHolidaySeason(LocalDateTime timestamp) {

373

int month = timestamp.getMonthValue();

374

return month == 11 || month == 12; // November and December

375

}

376

}

377

```

378

379

### Adaptive Partitioning

380

381

```java

382

public class AdaptivePartitioner extends KinesisPartitioner<MetricEvent> {

383

private volatile int currentPartitions = 4;

384

private final AtomicLong messageCount = new AtomicLong(0);

385

private volatile long lastAdjustment = System.currentTimeMillis();

386

387

@Override

388

public String getPartitionId(MetricEvent element) {

389

long count = messageCount.incrementAndGet();

390

391

// Adjust partition count based on throughput

392

if (count % 10000 == 0) {

393

adjustPartitionCount();

394

}

395

396

// Use hash-based assignment with current partition count

397

return String.valueOf(Math.abs(element.getMetricId().hashCode()) % currentPartitions);

398

}

399

400

private void adjustPartitionCount() {

401

long now = System.currentTimeMillis();

402

long elapsed = now - lastAdjustment;

403

404

if (elapsed > 60000) { // Adjust every minute

405

long currentRate = messageCount.get() * 1000 / elapsed;

406

407

if (currentRate > 100000 && currentPartitions < 16) {

408

currentPartitions *= 2; // Scale up

409

} else if (currentRate < 10000 && currentPartitions > 2) {

410

currentPartitions /= 2; // Scale down

411

}

412

413

lastAdjustment = now;

414

messageCount.set(0);

415

}

416

}

417

}

418

```

419

420

## Performance Considerations

421

422

### Partition Distribution

423

424

```java

425

public class DistributionAwarePartitioner extends KinesisPartitioner<Event> {

426

private final Map<String, AtomicLong> partitionCounts = new ConcurrentHashMap<>();

427

private volatile int preferredPartitions;

428

429

@Override

430

public void initialize(int indexOfThisSubtask, int numberOfParallelSubtasks) {

431

this.preferredPartitions = numberOfParallelSubtasks * 2; // 2x parallelism

432

}

433

434

@Override

435

public String getPartitionId(Event element) {

436

// Find least loaded partition

437

String leastLoadedPartition = findLeastLoadedPartition();

438

partitionCounts.computeIfAbsent(leastLoadedPartition, k -> new AtomicLong(0)).incrementAndGet();

439

return leastLoadedPartition;

440

}

441

442

private String findLeastLoadedPartition() {

443

long minCount = Long.MAX_VALUE;

444

String minPartition = "0";

445

446

for (int i = 0; i < preferredPartitions; i++) {

447

String partition = String.valueOf(i);

448

long count = partitionCounts.computeIfAbsent(partition, k -> new AtomicLong(0)).get();

449

if (count < minCount) {

450

minCount = count;

451

minPartition = partition;

452

}

453

}

454

455

return minPartition;

456

}

457

}

458

```

459

460

## Best Practices

461

462

1. **Key Selection**: Choose partition keys that provide good distribution and maintain ordering requirements

463

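2. **Hot Partitions**: Avoid keys that concentrate writes on one shard, such as timestamp-only keys or a small fixed set of bucket IDs. If you override `getExplicitHashKey`, return the decimal form of an integer in [0, 2^128 - 1], spread across that whole range. Kinesis rejects non-numeric strings, and small values such as raw IDs or epoch timestamps all fall into the first shard's hash key range.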

464

3. **Shard Limits**: Stay within per-shard write limits. Each shard accepts up to 1,000 records/second and up to 1 MB/second; writes are throttled when either limit is reached.

465

4. **Consistency**: Use consistent partitioning to maintain message ordering when required

466

5. **Monitoring**: Monitor partition distribution and shard utilization metrics

467

6. **Evolution**: Design partitioning strategies that can evolve with changing data patterns