or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

binary-operations.md, data-transformations.md, execution-environment.md, grouping-aggregation.md, index.md, input-output.md, partitioning-distribution.md, type-system.md, utilities.md

docs/partitioning-distribution.md

0

# Partitioning and Distribution

1

2

Control over data distribution and partitioning strategies across the cluster. These operations determine how data is distributed among parallel processing instances.

3

4

## Capabilities

5

6

### Hash Partitioning

7

8

Distribute data using hash-based partitioning to ensure even distribution.

9

10

```scala { .api }

11

class DataSet[T] {

12

/**

13

* Partitions data by hash of specified fields

14

* @param fields Field positions to use for hashing

15

* @return DataSet with hash partitioning

16

*/

17

def partitionByHash(fields: Int*): DataSet[T]

18

19

/**

20

* Partitions data by hash of named fields

21

* @param firstField First field name

22

* @param otherFields Additional field names

23

* @return DataSet with hash partitioning

24

*/

25

def partitionByHash(firstField: String, otherFields: String*): DataSet[T]

26

27

/**

28

* Partitions data by hash of key selector result

29

* @param fun Key selector function

30

* @return DataSet with hash partitioning based on key

31

*/

32

def partitionByHash[K: TypeInformation](fun: T => K): DataSet[T]

33

}

34

```

35

36

**Usage Examples:**

37

38

```scala

39

import org.apache.flink.api.scala._

40

41

case class Customer(id: Int, name: String, region: String)

42

43

val env = ExecutionEnvironment.getExecutionEnvironment

44

val customers = env.fromElements(

45

Customer(1, "Alice", "North"),

46

Customer(2, "Bob", "South"),

47

Customer(3, "Charlie", "North"),

48

Customer(4, "Diana", "West")

49

)

50

51

// Partition by customer ID field

52

val partitionedById = customers.partitionByHash("id")

53

54

// Partition by multiple fields

55

val partitionedByRegionAndId = customers.partitionByHash("region", "id")

56

57

// Partition using key selector function

58

val partitionedByRegion = customers.partitionByHash(_.region)

59

```

60

61

### Range Partitioning

62

63

Distribute data using range-based partitioning for ordered distribution.

64

65

```scala { .api }

66

class DataSet[T] {

67

/**

68

* Partitions data by range of specified fields

69

* @param fields Field positions for range partitioning

70

* @return DataSet with range partitioning

71

*/

72

def partitionByRange(fields: Int*): DataSet[T]

73

74

/**

75

* Partitions data by range of named fields

76

* @param firstField First field name

77

* @param otherFields Additional field names

78

* @return DataSet with range partitioning

79

*/

80

def partitionByRange(firstField: String, otherFields: String*): DataSet[T]

81

82

/**

83

* Partitions data by range of key selector result

84

* @param fun Key selector function

85

* @return DataSet with range partitioning based on key

86

*/

87

def partitionByRange[K: TypeInformation](fun: T => K): DataSet[T]

88

}

89

```

90

91

### Range Partitioning with Data Distribution

92

93

Advanced range partitioning using custom data distribution.

94

95

```scala { .api }

96

implicit class DataSetUtils[T](dataSet: DataSet[T]) {

97

/**

98

* Partitions by range using custom data distribution

99

* @param distribution Custom data distribution

100

* @param fields Field positions for partitioning

101

* @return DataSet with custom range partitioning

102

*/

103

def partitionByRange(distribution: DataDistribution, fields: Int*): DataSet[T]

104

105

/**

106

* Partitions by range using custom data distribution and field names

107

* @param distribution Custom data distribution

108

* @param firstField First field name

109

* @param otherFields Additional field names

110

* @return DataSet with custom range partitioning

111

*/

112

def partitionByRange(distribution: DataDistribution, firstField: String, otherFields: String*): DataSet[T]

113

114

/**

115

* Partitions by range using custom data distribution and key selector

116

* @param distribution Custom data distribution

117

* @param fun Key selector function

118

* @return DataSet with custom range partitioning

119

*/

120

def partitionByRange[K: TypeInformation](distribution: DataDistribution, fun: T => K): DataSet[T]

121

}

122

```

123

124

**Usage Examples:**

125

126

```scala

127

// Range partition by age for ordered processing

128

case class Person(name: String, age: Int, salary: Double)

129

130

val people = env.fromElements(

131

Person("Alice", 25, 50000),

132

Person("Bob", 30, 60000),

133

Person("Charlie", 35, 70000)

134

)

135

136

// Simple range partitioning

137

val rangePartitioned = people.partitionByRange(_.age)

138

139

// Range partitioning with multiple fields

140

val rangeByAgeAndSalary = people.partitionByRange(p => (p.age, p.salary))

141

```

142

143

### Custom Partitioning

144

145

Use custom partitioning logic for specialized distribution requirements.

146

147

```scala { .api }

148

class DataSet[T] {

149

/**

150

* Partitions using custom partitioner on specified field

151

* @param partitioner Custom partitioner implementation

152

* @param field Field position for partitioning

153

* @return DataSet with custom partitioning

154

*/

155

def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], field: Int): DataSet[T]

156

157

/**

158

* Partitions using custom partitioner on named field

159

* @param partitioner Custom partitioner implementation

160

* @param field Field name for partitioning

161

* @return DataSet with custom partitioning

162

*/

163

def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], field: String): DataSet[T]

164

165

/**

166

* Partitions using custom partitioner on key selector result

167

* @param partitioner Custom partitioner implementation

168

* @param fun Key selector function

169

* @return DataSet with custom partitioning

170

*/

171

def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], fun: T => K): DataSet[T]

172

}

173

```

174

175

**Usage Examples:**

176

177

```scala

178

import org.apache.flink.api.common.functions.Partitioner

179

180

// Custom partitioner that groups regions together

181

class RegionPartitioner extends Partitioner[String] {

182

override def partition(key: String, numPartitions: Int): Int = {

183

key match {

184

case "North" | "South" => 0 % numPartitions

185

case "East" | "West" => 1 % numPartitions

186

case _ => 2 % numPartitions

187

}

188

}

189

}

190

191

val customPartitioned = customers.partitionCustom(new RegionPartitioner, _.region)

192

```

193

194

### Load Balancing

195

196

Rebalance data across all parallel instances for even load distribution.

197

198

```scala { .api }

199

class DataSet[T] {

200

/**

201

* Rebalances data across all parallel instances using round-robin

202

* @return DataSet with rebalanced distribution

203

*/

204

def rebalance(): DataSet[T]

205

}

206

```

207

208

**Usage Examples:**

209

210

```scala

211

// Rebalance after filtering to ensure even distribution

212

val filteredAndRebalanced = customers

213

.filter(_.region == "North")

214

.rebalance()

215

.map(processCustomer)

216

```

217

218

### Partition Sorting

219

220

Sort data within each partition for optimized processing.

221

222

```scala { .api }

223

class DataSet[T] {

224

/**

225

* Sorts elements within each partition by field position

226

* @param field Field position for sorting

227

* @param order Sort order (ASCENDING or DESCENDING)

228

* @return DataSet with sorted partitions

229

*/

230

def sortPartition(field: Int, order: Order): DataSet[T]

231

232

/**

233

* Sorts elements within each partition by field name

234

* @param field Field name for sorting

235

* @param order Sort order (ASCENDING or DESCENDING)

236

* @return DataSet with sorted partitions

237

*/

238

def sortPartition(field: String, order: Order): DataSet[T]

239

240

/**

241

* Sorts elements within each partition using key selector

242

* @param fun Key selector function for sorting

243

* @param order Sort order (ASCENDING or DESCENDING)

244

* @return DataSet with sorted partitions

245

*/

246

def sortPartition[K: TypeInformation](fun: T => K, order: Order): DataSet[T]

247

}

248

```

249

250

### Multi-Key Partition Sorting

251

252

Chain multiple sorting keys for complex partition sorting.

253

254

```scala { .api }

255

class PartitionSortedDataSet[T] extends DataSet[T] {

256

/**

257

* Adds secondary sort key by field position

258

* @param field Field position for secondary sorting

259

* @param order Sort order for secondary key

260

* @return DataSet with multi-key sorted partitions

261

*/

262

def sortPartition(field: Int, order: Order): DataSet[T]

263

264

/**

265

* Adds secondary sort key by field name

266

* @param field Field name for secondary sorting

267

* @param order Sort order for secondary key

268

* @return DataSet with multi-key sorted partitions

269

*/

270

def sortPartition(field: String, order: Order): DataSet[T]

271

272

/**

273

* Adds secondary sort key using key selector

274

* @param fun Key selector function for secondary sorting

275

* @param order Sort order for secondary key

276

* @return DataSet with multi-key sorted partitions

277

*/

278

def sortPartition[K: TypeInformation](fun: T => K, order: Order): DataSet[T]

279

}

280

```

281

282

**Usage Examples:**

283

284

```scala

285

import org.apache.flink.api.common.operators.Order

286

287

// Sort within partitions by age, then by salary

288

val sortedPartitions = people

289

.partitionByHash(_.region)

290

.sortPartition(_.age, Order.ASCENDING)

291

.sortPartition(_.salary, Order.DESCENDING)

292

293

// Sort by multiple fields using field names

294

val sortedByFields = people

295

.sortPartition("age", Order.ASCENDING)

296

.sortPartition("salary", Order.DESCENDING)

297

```

298

299

### Grouped Partitioning

300

301

Control partitioning for grouped DataSets to optimize group processing.

302

303

```scala { .api }

304

class GroupedDataSet[T] {

305

/**

306

* Uses custom partitioner for group distribution

307

* @param partitioner Custom partitioner for group keys

308

* @return GroupedDataSet with custom partitioning

309

*/

310

def withPartitioner[K: TypeInformation](partitioner: Partitioner[K]): GroupedDataSet[T]

311

}

312

```

313

314

### Binary Operation Partitioning

315

316

Control partitioning for join, cross, and coGroup operations.

317

318

```scala { .api }

319

trait JoinFunctionAssigner[L, R] {

320

/**

321

* Uses custom partitioner for join distribution

322

* @param partitioner Custom partitioner for join keys

323

* @return JoinFunctionAssigner with custom partitioning

324

*/

325

def withPartitioner[K: TypeInformation](partitioner: Partitioner[K]): JoinFunctionAssigner[L, R]

326

}

327

328

class CoGroupDataSet[L, R] {

329

/**

330

* Uses custom partitioner for coGroup distribution

331

* @param partitioner Custom partitioner for coGroup keys

332

* @return CoGroupDataSet with custom partitioning

333

*/

334

def withPartitioner[K: TypeInformation](partitioner: Partitioner[K]): CoGroupDataSet[L, R]

335

}

336

```

337

338

**Usage Examples:**

339

340

```scala

341

// Custom partitioning for joins

342

val joinResult = leftDataSet

343

.join(rightDataSet)

344

.where(_.key)

345

.equalTo(_.key)

346

.withPartitioner(new CustomKeyPartitioner)

347

.apply((left, right) => combineData(left, right))

348

```

349

350

### Distribution Strategies

351

352

Hints for optimizing data distribution in binary operations.

353

354

```scala { .api }

355

class DataSet[T] {

356

/**

357

* Hints that this DataSet is small for broadcast operations

358

* @param other Large DataSet to join with

359

* @return Join operation optimized for broadcasting this DataSet

360

*/

361

def joinWithHuge[O](other: DataSet[O]): UnfinishedJoinOperation[T, O]

362

363

/**

364

* Hints that other DataSet is small for broadcast operations

365

* @param other Small DataSet to broadcast

366

* @return Join operation optimized for broadcasting other DataSet

367

*/

368

def joinWithTiny[O](other: DataSet[O]): UnfinishedJoinOperation[T, O]

369

370

/**

371

* Cross with broadcasting hint for small DataSet

372

* @param other Small DataSet to broadcast

373

* @return Cross operation with broadcast optimization

374

*/

375

def crossWithTiny[O](other: DataSet[O]): CrossDataSet[T, O]

376

377

/**

378

* Cross with broadcasting hint for large DataSet

379

* @param other Large DataSet (this will be broadcast)

380

* @return Cross operation with broadcast optimization

381

*/

382

def crossWithHuge[O](other: DataSet[O]): CrossDataSet[T, O]

383

}

384

```

385

386

### Parallelism Control

387

388

Set parallelism at the operation level to control resource usage.

389

390

```scala { .api }

391

class DataSet[T] {

392

/**

393

* Sets parallelism for this operation

394

* @param parallelism Degree of parallelism

395

* @return DataSet with specified parallelism

396

*/

397

def setParallelism(parallelism: Int): DataSet[T]

398

399

/**

400

* Gets the parallelism of this operation

401

* @return Current parallelism setting

402

*/

403

def getParallelism: Int

404

}

405

```

406

407

**Usage Examples:**

408

409

```scala

410

// Set different parallelism for expensive operations

411

val result = data

412

.setParallelism(8) // Applies to the preceding operation that produced `data`

413

.map(expensiveTransformation)

414

.setParallelism(4) // Applies to the preceding map operation, not to subsequent ones

415

.reduce(combineResults)

416

```

417

418

### Resource Requirements

419

420

Specify minimum and preferred resource requirements for operations.

421

422

```scala { .api }

423

class DataSet[T] {

424

/**

425

* Gets minimum resource requirements

426

* @return ResourceSpec with minimum requirements

427

*/

428

def minResources: ResourceSpec

429

430

/**

431

* Gets preferred resource requirements

432

* @return ResourceSpec with preferred requirements

433

*/

434

def preferredResources: ResourceSpec

435

}

436

```

437

438

## Types

439

440

```scala { .api }

441

abstract class Partitioner[T] {

442

/**

443

* Determines partition for given key

444

* @param key Key to partition

445

* @param numPartitions Total number of partitions

446

* @return Partition index (0 to numPartitions-1)

447

*/

448

def partition(key: T, numPartitions: Int): Int

449

}

450

451

sealed trait Order

452

object Order {

453

case object ASCENDING extends Order

454

case object DESCENDING extends Order

455

}

456

457

class PartitionSortedDataSet[T] extends DataSet[T] {

458

// Represents a DataSet with sorted partitions that allows chaining additional sort keys

459

}

460

461

trait DataDistribution {

462

/**

463

* Gets the boundary of a specific bucket for range partitioning

464

* @param bucketNum Index of the bucket whose boundary is requested
* @param totalNumBuckets Total number of buckets in the distribution
* @return Boundary of the specified bucket

465

*/

466

def getBucketBoundary(bucketNum: Int, totalNumBuckets: Int): AnyRef

467

}

468

469

class ResourceSpec {

470

/**

471

* Gets CPU cores requirement

472

* @return Number of CPU cores

473

*/

474

def getCpuCores: Double

475

476

/**

477

* Gets heap memory requirement in MB

478

* @return Heap memory in megabytes

479

*/

480

def getHeapMemoryInMB: Int

481

482

/**

483

* Gets direct memory requirement in MB

484

* @return Direct memory in megabytes

485

*/

486

def getDirectMemoryInMB: Int

487

488

/**

489

* Gets native memory requirement in MB

490

* @return Native memory in megabytes

491

*/

492

def getNativeMemoryInMB: Int

493

494

/**

495

* Gets network memory requirement in MB

496

* @return Network memory in megabytes

497

*/

498

def getNetworkMemoryInMB: Int

499

}

500

501

object ResourceSpec {

502

/**

503

* Creates ResourceSpec with default values

504

* @return Default ResourceSpec

505

*/

506

def DEFAULT: ResourceSpec

507

508

/**

509

* Creates ResourceSpec with unknown requirements

510

* @return Unknown ResourceSpec

511

*/

512

def UNKNOWN: ResourceSpec

513

514

/**

515

* Creates new ResourceSpec builder

516

* @return ResourceSpec builder

517

*/

518

def newBuilder(): ResourceSpec.Builder

519

}

520

```