
# Stream Partitioning and Distribution

Stream partitioning controls how records are distributed from the parallel instances (subtasks) of one operator to those of the next operator in a Flink streaming application. The right strategy is crucial for load balancing and performance. For stateful operators it also determines correctness, because keyed state is only visible on the instance that owns the key.

## Key-Based Partitioning

### KeyBy Operations

```scala { .api }
class DataStream[T] {
  def keyBy[K: TypeInformation](fun: T => K): KeyedStream[T, K]
  // Deprecated variants; their key type is org.apache.flink.api.java.tuple.Tuple (JavaTuple)
  def keyBy(fields: Int*): KeyedStream[T, JavaTuple]
  def keyBy(firstField: String, otherFields: String*): KeyedStream[T, JavaTuple]
}
```

Partition elements by key for stateful processing. Every element with a given key is routed to the same parallel instance, so that instance holds all keyed state for the key:

```scala
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Key by function (preferred)
case class User(id: String, name: String, age: Int)
val users = env.fromElements(
  User("1", "Alice", 25),
  User("2", "Bob", 30),
  User("1", "Alice", 26)
)

val keyedByUserId = users.keyBy(_.id)

// Key by field position (tuples only; deprecated, equivalent to salesData.keyBy(_._1))
val salesData = env.fromElements(("ProductA", 100), ("ProductB", 200), ("ProductA", 150))
val keyedBySalesPosition = salesData.keyBy(0) // Key by product name

// Key by field name (deprecated)
val keyedByUserField = users.keyBy("id")

// Key by multiple fields (deprecated, equivalent to users.keyBy(u => (u.id, u.age)))
val keyedByMultipleFields = users.keyBy("id", "age")
```
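
Conceptually, `keyBy` assigns each key to a *key group* and each key group to one parallel subtask, which is why all records with the same key (and their keyed state) end up on one instance. The plain-Scala sketch below mimics that two-step mapping. It is an approximation under stated assumptions: it substitutes `scala.util.hashing.byteswap32` for Flink's internal murmur hash and assumes a max parallelism of 128, so the concrete subtask numbers will differ from a real job. `KeyGroupSketch` is a hypothetical name.

```scala
import scala.util.hashing.byteswap32

object KeyGroupSketch {
  // Assumed max parallelism (the number of key groups)
  val MaxParallelism = 128

  // Step 1: key -> key group in [0, MaxParallelism).
  // byteswap32 is a stand-in for Flink's internal murmur hash.
  def keyGroupFor(key: Any): Int =
    Math.floorMod(byteswap32(key.hashCode), MaxParallelism)

  // Step 2: key group -> subtask index in [0, parallelism)
  def subtaskForKeyGroup(keyGroup: Int, parallelism: Int): Int =
    keyGroup * parallelism / MaxParallelism

  def subtaskFor(key: Any, parallelism: Int): Int =
    subtaskForKeyGroup(keyGroupFor(key), parallelism)
}

// Both records with id "1" (Alice, 25 and Alice, 26) map to the same subtask
val p = 4
val aliceSubtask = KeyGroupSketch.subtaskFor("1", p)
```

Because whole key groups, not individual keys, are what move when the parallelism changes, the max parallelism also caps how far a job with keyed state can scale out.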

## Built-in Partitioning Strategies

### Broadcast Partitioning

```scala { .api }
class DataStream[T] {
  def broadcast: DataStream[T]
}
```

Send every element to every parallel instance of the downstream operator. Because the data volume is multiplied by the downstream parallelism, reserve this for small streams such as configuration data:

```scala
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val configData = env.fromElements("config1", "config2", "config3")

// Broadcast configuration to all parallel instances
val broadcastedConfig = configData.broadcast

broadcastedConfig
  .map(config => s"Processing with config: $config")
  .setParallelism(4) // Each of the 4 parallel instances receives all 3 elements
  .print()
```
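
To make the fan-out concrete, here is a plain-Scala model (not Flink's implementation) of what `broadcast` delivers: every downstream instance receives the full sequence, so three config elements at parallelism 4 become 12 deliveries. The function name `broadcastTo` is hypothetical.

```scala
// Plain-Scala model of broadcast partitioning (not Flink's implementation):
// every downstream subtask receives every element.
def broadcastTo[T](elements: Seq[T], downstreamParallelism: Int): Map[Int, Seq[T]] =
  (0 until downstreamParallelism).map(subtask => subtask -> elements).toMap

val delivered = broadcastTo(Seq("config1", "config2", "config3"), 4)
// Every subtask 0..3 holds Seq("config1", "config2", "config3")
```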

### Global Partitioning

```scala { .api }
class DataStream[T] {
  def global: DataStream[T]
}
```

Send all elements to the first parallel instance (subtask 0) of the downstream operator:

```scala
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// All elements go to subtask 0 of the map. The map keeps its configured
// parallelism, but its other instances receive nothing and stay idle.
stream.global
  .map(x => s"Processed on single instance: $x")
  .print()
```
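
The same kind of plain-Scala model shows why `global` makes the downstream operator a bottleneck: with a downstream parallelism of 4, subtask 0 gets all ten elements and subtasks 1, 2 and 3 stay idle. `globalTo` is a hypothetical name used only for this illustration.

```scala
// Plain-Scala model of global partitioning (not Flink's implementation):
// subtask 0 receives everything, the remaining subtasks receive nothing.
def globalTo[T](elements: Seq[T], downstreamParallelism: Int): Map[Int, Seq[T]] =
  (0 until downstreamParallelism).map { subtask =>
    subtask -> (if (subtask == 0) elements else Seq.empty[T])
  }.toMap

val routed = globalTo(1 to 10, 4)
```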

### Shuffle Partitioning

```scala { .api }
class DataStream[T] {
  def shuffle: DataStream[T]
}
```

Distribute elements uniformly at random across the downstream parallel instances:

```scala
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// Random distribution for load balancing
stream.shuffle
  .map(x => s"Randomly assigned: $x")
  .setParallelism(4)
  .print()
```
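
A plain-Scala model of `shuffle`: each element draws its downstream subtask independently, so bucket sizes are only approximately equal, unlike round-robin. The seeded `Random` (for reproducibility) and the `shuffleTo` name are assumptions of this sketch.

```scala
import scala.util.Random

// Plain-Scala model of shuffle partitioning (not Flink's implementation):
// each element independently picks a uniformly random downstream subtask.
def shuffleTo[T](elements: Seq[T], downstreamParallelism: Int, rng: Random): Map[Int, Seq[T]] =
  elements.groupBy(_ => rng.nextInt(downstreamParallelism))

// Fixed seed so the run is reproducible
val assignment = shuffleTo(1 to 10000, 4, new Random(42))
val sizes = assignment.values.map(_.size)
```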

### Rebalance Partitioning

```scala { .api }
class DataStream[T] {
  def rebalance: DataStream[T]
}
```

Distribute elements evenly across the downstream parallel instances using round-robin, which helps when upstream instances carry uneven load:

```scala
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// Even distribution using round-robin
stream.rebalance
  .map(x => s"Evenly distributed: $x")
  .setParallelism(4)
  .print()
```
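
Round-robin can be modeled in a few lines of plain Scala: element `k` of one upstream instance goes to subtask `(start + k) % p`, so per-subtask counts never differ by more than one. Each upstream instance runs its own round-robin. The `start` parameter is an assumption of this sketch, since Flink chooses its own starting subtask; `rebalanceTo` is a hypothetical name.

```scala
// Plain-Scala model of rebalance (round-robin) for ONE upstream subtask.
// Assumption: the starting subtask is a parameter; Flink picks its own start.
def rebalanceTo[T](elements: Seq[T], downstreamParallelism: Int, start: Int = 0): Map[Int, Seq[T]] =
  elements.zipWithIndex
    .groupBy { case (_, k) => (start + k) % downstreamParallelism }
    .map { case (subtask, pairs) => subtask -> pairs.map(_._1) }

val rr = rebalanceTo(1 to 10, 4)
// rr(0) == Seq(1, 5, 9), rr(1) == Seq(2, 6, 10), rr(2) == Seq(3, 7), rr(3) == Seq(4, 8)
```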

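### Rescale Partitioning

```scala { .api }
class DataStream[T] {
  def rescale: DataStream[T]
}
```

Distribute elements round-robin to a subset of the downstream parallel instances. The subset depends on both parallelisms: with an upstream parallelism of 2 and a downstream parallelism of 6, each upstream instance sends to 3 downstream instances; in the reverse case (6 to 2), 3 upstream instances send to each downstream instance. Because only these local connections are created instead of an all-to-all exchange, rescale can avoid network transfers, depending on how instances are placed in TaskManager slots:

```scala
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// fromElements is a non-parallel source (setting its parallelism to 2 fails),
// so an identity map provides the upstream operator with parallelism 2
val stream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
  .map(x => x)
  .setParallelism(2)

// Rescale to subset of downstream operators
stream.rescale
  .map(x => s"Rescaled: $x")
  .setParallelism(6) // Each upstream instance sends to 3 downstream instances
  .print()
```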
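
The wiring for the 2-to-6 case can be modeled in plain Scala: upstream subtask `u` feeds the contiguous slice of `fanOut = 6 / 2 = 3` downstream subtasks starting at `u * fanOut`, and then round-robins within that slice, as with rebalance. This is a simplified sketch (not Flink's implementation) that only handles a downstream parallelism that is an exact multiple of the upstream one; `rescaleTargets` is a hypothetical name.

```scala
// Plain-Scala model of the rescale wiring (not Flink's implementation).
// Assumption: the downstream parallelism is an exact multiple of the upstream one.
def rescaleTargets(upstreamSubtask: Int, upstreamParallelism: Int, downstreamParallelism: Int): Seq[Int] = {
  require(downstreamParallelism % upstreamParallelism == 0,
    "this sketch only handles exact multiples")
  val fanOut = downstreamParallelism / upstreamParallelism
  (upstreamSubtask * fanOut) until ((upstreamSubtask + 1) * fanOut)
}

// Parallelism 2 -> 6
val wiring = (0 until 2).map(u => u -> rescaleTargets(u, 2, 6)).toMap
// wiring(0) == Seq(0, 1, 2), wiring(1) == Seq(3, 4, 5)
```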

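### Forward Partitioning

```scala { .api }
class DataStream[T] {
  def forward: DataStream[T]
}
```

Forward each element to the downstream instance with the same subtask index. Upstream and downstream must therefore run with the same parallelism (Flink rejects the job otherwise). This one-to-one connection typically lets Flink chain both operators into a single task, avoiding serialization and network transfer:

```scala
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.fromElements(1, 2, 3, 4, 5) // Non-parallel source: parallelism 1

// Forward to the downstream instance with the same index,
// so the map must also run with parallelism 1
stream.forward
  .map(x => s"Locally forwarded: $x")
  .setParallelism(1)
  .print()
```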
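
In the same plain-Scala modeling style, forward partitioning is an identity mapping from upstream subtask `i` to downstream subtask `i`, guarded by the equal-parallelism requirement. `forwardTo` is a hypothetical name, and `require` stands in for the check Flink performs when it builds the job graph.

```scala
// Plain-Scala model of forward partitioning (not Flink's implementation):
// the output of upstream subtask i goes only to downstream subtask i.
def forwardTo[T](perUpstreamSubtask: Map[Int, Seq[T]],
                 upstreamParallelism: Int,
                 downstreamParallelism: Int): Map[Int, Seq[T]] = {
  // Flink rejects forward connections whose two sides differ in parallelism
  require(upstreamParallelism == downstreamParallelism,
    "forward partitioning requires equal parallelism")
  perUpstreamSubtask // Identity: nothing moves between subtasks
}

val upstreamOutput = Map(0 -> Seq(1, 3, 5), 1 -> Seq(2, 4))
val forwarded = forwardTo(upstreamOutput, 2, 2)
```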

## Custom Partitioning

### Custom Partitioner

```scala { .api }
class DataStream[T] {
  def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], field: Int): DataStream[T]
  def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], field: String): DataStream[T]
  def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], fun: T => K): DataStream[T]
}
```

Implement custom partitioning logic. A `Partitioner` must return an index in `[0, numPartitions)`. Note that `partitionCustom` returns a plain `DataStream`, not a `KeyedStream`, so downstream operators cannot use keyed state:

```scala
import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.streaming.api.scala._

// Custom partitioner for even/odd distribution
class EvenOddPartitioner extends Partitioner[Int] {
  override def partition(key: Int, numPartitions: Int): Int = {
    // Even numbers to partition 0, odd to partition 1; the modulo keeps
    // the index valid when there is only one partition
    (if (key % 2 == 0) 0 else 1) % numPartitions
  }
}

// Range-based partitioner for keys in [0, 100)
class RangePartitioner extends Partitioner[Int] {
  override def partition(key: Int, numPartitions: Int): Int = {
    val range = math.max(1, 100 / numPartitions) // Avoid division by zero if numPartitions > 100
    math.min(math.max(key, 0) / range, numPartitions - 1) // Clamp out-of-range keys
  }
}

val env = StreamExecutionEnvironment.getExecutionEnvironment
val numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 50, 75, 99)

// Custom partition by key selector function (explicit parameter type avoids overload ambiguity)
numbers
  .partitionCustom(new EvenOddPartitioner, (x: Int) => x)
  .map(x => s"EvenOdd partitioned: $x")
  .print()

// Custom partition by field name (for tuples/case classes)
case class ValueWithCategory(value: Int, category: String)
val categorizedData = env.fromElements(
  ValueWithCategory(10, "A"),
  ValueWithCategory(20, "B"),
  ValueWithCategory(30, "A")
)

class CategoryPartitioner extends Partitioner[String] {
  override def partition(key: String, numPartitions: Int): Int = {
    // floorMod stays non-negative even when hashCode is negative
    Math.floorMod(key.hashCode, numPartitions)
  }
}

categorizedData
  .partitionCustom(new CategoryPartitioner, "category")
  .print()
```
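
Why the partitioners above use `Math.floorMod`: `hashCode % n` is negative for negative hash codes, and `Math.abs(Int.MinValue)` is still negative, so both naive forms can return an invalid partition index. The sketch below checks this in plain Scala. So that it runs without Flink on the classpath, it declares a local `Partitioner` trait with the same single-method shape as `org.apache.flink.api.common.functions.Partitioner`; that trait and the two class names are stand-ins, not Flink types.

```scala
// Local stand-in with the same method shape as Flink's Partitioner interface
trait Partitioner[K] extends Serializable {
  def partition(key: K, numPartitions: Int): Int
}

// Buggy: negative hashCodes produce negative partition indices
class NaiveHashPartitioner extends Partitioner[String] {
  def partition(key: String, numPartitions: Int): Int = key.hashCode % numPartitions
}

// Fixed: floorMod always returns a value in [0, numPartitions)
class SafeHashPartitioner extends Partitioner[String] {
  def partition(key: String, numPartitions: Int): Int = Math.floorMod(key.hashCode, numPartitions)
}

// Find a sample key whose hashCode is negative and not a multiple of 4
val negativeKey = Iterator.from(0).map(i => s"key-$i")
  .find(k => k.hashCode < 0 && k.hashCode % 4 != 0).get
val naive = new NaiveHashPartitioner().partition(negativeKey, 4)
val safe = new SafeHashPartitioner().partition(negativeKey, 4)
```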

## Partitioning Best Practices

### Load Balancing Considerations

```scala
import java.util.concurrent.ThreadLocalRandom
import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Avoid skewed data distribution
case class Event(userId: String, data: String)
val events = env.fromElements(
  Event("popular_user", "data1"), // This user might cause skew
  Event("user2", "data2"),
  Event("popular_user", "data3")
)

// A plain keyBy sends every event of a hot key to a single instance:
// val skewed = events.keyBy(_.userId)

// A custom partitioner can spread the hot key instead. The result is not a
// KeyedStream and "popular_user" events land on several instances, so any
// per-user result must be merged downstream.
class SkewAwarePartitioner extends Partitioner[String] {
  override def partition(key: String, numPartitions: Int): Int = {
    if (key == "popular_user") {
      // Distribute the popular user randomly across all partitions
      ThreadLocalRandom.current().nextInt(numPartitions)
    } else {
      Math.floorMod(key.hashCode, numPartitions)
    }
  }
}

val balancedEvents = events
  .partitionCustom(new SkewAwarePartitioner, (e: Event) => e.userId)
```
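
The custom partitioner above spreads a hot key but gives up keyed state. A commonly used alternative, swapped in here for illustration, is *key salting* with two-phase aggregation: append a random suffix to hot keys, aggregate per salted key (which a `keyBy` on the salted key would spread across instances), then strip the suffix and merge the partial results. The plain-Scala sketch below models only the arithmetic with collections. The `#` separator, the bucket count and all names are assumptions, and it assumes user IDs never contain `#`.

```scala
import scala.util.Random

object SaltingSketch {
  case class Event(userId: String, data: String)

  // Phase 1 key: hot keys get a random salt in [0, buckets), others stay unchanged
  def saltedKey(e: Event, hotKeys: Set[String], buckets: Int, rng: Random): String =
    if (hotKeys.contains(e.userId)) s"${e.userId}#${rng.nextInt(buckets)}" else e.userId

  // Phase 1 aggregate: partial counts per salted key (spread across instances in a real job)
  def partialCounts(events: Seq[Event], hotKeys: Set[String], buckets: Int, rng: Random): Map[String, Int] =
    events.groupBy(e => saltedKey(e, hotKeys, buckets, rng)).map { case (k, es) => k -> es.size }

  // Phase 2: strip the salt and merge partial counts per original key
  def finalCounts(partials: Map[String, Int]): Map[String, Int] =
    partials.toSeq
      .groupBy { case (k, _) => k.takeWhile(_ != '#') }
      .map { case (user, parts) => user -> parts.map(_._2).sum }
}

import SaltingSketch._

val events = Seq.fill(1000)(Event("popular_user", "x")) :+ Event("user2", "y")
val partials = partialCounts(events, Set("popular_user"), 4, new Random(7))
val totals = finalCounts(partials)
// totals == Map("popular_user" -> 1000, "user2" -> 1)
```

The design trade-off: the hot key's work is split across up to `buckets` instances, at the cost of a second aggregation step.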

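### Network Efficiency

```scala
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.fromElements("an important event", "routine check", "another important one")
def complexProcessing(s: String): String = s.reverse // Hypothetical expensive step

// forward() connects subtask i to subtask i. Between operators with equal
// parallelism this is already the default, so the call mainly documents intent
val localProcessing = stream
  .map(_.toUpperCase) // CPU-intensive operation
  .forward // One-to-one connection; lets Flink chain map and filter
  .filter(_.length > 5) // Another local operation

// Use rebalance() after operations that might cause imbalance
val afterFilter = stream
  .filter(_.contains("important")) // May leave very different volumes per subtask
  .rebalance // Redistribute remaining elements round-robin
  .map(s => complexProcessing(s))
```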
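
A plain-Scala model of why `rebalance` after a selective `filter` helps: suppose the filter leaves 90, 4, 3 and 3 records on four upstream subtasks. If each upstream subtask round-robins its own survivors over four downstream subtasks (simplifying assumption: all start at subtask 0), the downstream load becomes 26, 26, 25 and 23. The function name and the counts are illustrative only.

```scala
// Records each downstream subtask receives when every upstream subtask
// round-robins its own records, starting at subtask 0 (simplifying assumption)
def roundRobinLoads(upstreamLoads: Seq[Int], downstreamParallelism: Int): Seq[Int] =
  (0 until downstreamParallelism).map { d =>
    upstreamLoads.map(n => (0 until n).count(_ % downstreamParallelism == d)).sum
  }

val afterFilter = Seq(90, 4, 3, 3) // Skewed per-subtask counts after a filter
val afterRebalance = roundRobinLoads(afterFilter, 4)
// afterRebalance == Seq(26, 26, 25, 23)
```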

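### State and Partitioning

```scala
import org.apache.flink.streaming.api.scala._

case class Event(userId: String, data: String)
val env = StreamExecutionEnvironment.getExecutionEnvironment
val events = env.fromElements(Event("user1", "a"), Event("user2", "b"))
def processEvent(e: Event): String = s"${e.userId}:${e.data}" // Hypothetical helper
def postProcess(s: String): String = s.toUpperCase // Hypothetical helper

// keyBy partitions both the records and their keyed state by userId
val keyedStream = events.keyBy(_.userId)

// The output of any operator on a KeyedStream is a plain DataStream again
val repartitioned = keyedStream
  .map(e => processEvent(e)) // Keyed state is accessible here (in a rich function)
  .rebalance // Redistributes the already un-keyed output
  .map(s => postProcess(s)) // No keyed state access; call keyBy again if needed
```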
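
To illustrate what "state is partitioned by userId" means, this plain-Scala model keeps one `key -> count` map per subtask, the way keyed state is scoped to a key on the subtask that owns it. It is a simplification: the hypothetical `ownerOf` uses `floorMod` of `hashCode` instead of Flink's key-group assignment.

```scala
import scala.collection.mutable

// Hypothetical routing function standing in for Flink's key-group assignment
def ownerOf(key: String, parallelism: Int): Int = Math.floorMod(key.hashCode, parallelism)

// Emits a running count per key. State lives in a per-subtask map, and a
// key's entry only ever exists on the subtask that owns that key.
def runningCounts(keys: Seq[String], parallelism: Int): (Seq[(String, Int)], Map[Int, Map[String, Int]]) = {
  val stateBySubtask = mutable.Map.empty[Int, Map[String, Int]]
  val output = keys.map { key =>
    val subtask = ownerOf(key, parallelism)
    val local = stateBySubtask.getOrElse(subtask, Map.empty[String, Int])
    val count = local.getOrElse(key, 0) + 1
    stateBySubtask(subtask) = local.updated(key, count)
    key -> count
  }
  (output, stateBySubtask.toMap)
}

val (emitted, state) = runningCounts(Seq("user1", "user2", "user1", "user1"), 3)
// emitted == Seq("user1" -> 1, "user2" -> 1, "user1" -> 2, "user1" -> 3)
```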

## Complete Example: Multi-Stage Processing Pipeline

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.common.functions.Partitioner

case class LogEntry(
  timestamp: Long,
  level: String,
  service: String,
  message: String,
  userId: Option[String]
)

case class ProcessingStats(
  service: String,
  errorCount: Int,
  warningCount: Int,
  totalCount: Int
)

object MultiStageProcessingPipeline {

  // Custom partitioner for log levels
  class LogLevelPartitioner extends Partitioner[String] {
    override def partition(key: String, numPartitions: Int): Int = {
      val index = key match {
        case "ERROR" => 0
        case "WARN" => 1
        case "INFO" => 2
        case _ => 3
      }
      index % numPartitions // Stay in range when fewer than 4 partitions exist
    }
  }

  // Hash-based partitioner for even distribution
  class ServicePartitioner extends Partitioner[String] {
    override def partition(key: String, numPartitions: Int): Int = {
      // floorMod avoids negative indices (Math.abs(Int.MinValue) is still negative)
      Math.floorMod(key.hashCode, numPartitions)
    }
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(8)

    // Sample log data
    val logs = env.fromElements(
      LogEntry(1000, "ERROR", "service-a", "Database connection failed", Some("user1")),
      LogEntry(1001, "INFO", "service-b", "Request processed", Some("user2")),
      LogEntry(1002, "WARN", "service-a", "High memory usage", None),
      LogEntry(1003, "ERROR", "service-c", "Authentication failed", Some("user3"))
    )

    // Stage 1: Broadcast configuration to all instances. Illustrative only:
    // this stream is not connected to any operator or sink below.
    val config = env.fromElements("config-item-1", "config-item-2")
      .broadcast

    // Stage 2: Partition by log level for specialized processing
    val logsByLevel = logs
      .partitionCustom(new LogLevelPartitioner, (log: LogEntry) => log.level)
      .map { log =>
        // Process based on log level
        val priority = log.level match {
          case "ERROR" => 1
          case "WARN" => 2
          case "INFO" => 3
          case _ => 4
        }
        (log, priority)
      }

    // Stage 3: Key by service for a stateful running aggregation
    val serviceStats = logsByLevel
      .map(_._1) // Extract log entry
      .map(log => ProcessingStats(
        log.service,
        if (log.level == "ERROR") 1 else 0,
        if (log.level == "WARN") 1 else 0,
        1))
      .keyBy(_.service) // Each service's running totals live in keyed state
      .reduce((a, b) => ProcessingStats(
        a.service,
        a.errorCount + b.errorCount,
        a.warningCount + b.warningCount,
        a.totalCount + b.totalCount))

    // Stage 4: Rebalance for final processing
    val finalResults = serviceStats
      .rebalance // Even distribution for final processing
      .map(stats => s"Service ${stats.service}: ${stats.totalCount} total, ${stats.errorCount} errors")

    // Stage 5: Global count (all data to a single instance). reduce is only
    // defined on a KeyedStream, so a constant key routes everything to one instance.
    val globalSummary = logs
      .map(_ => 1L)
      .keyBy(_ => 0)
      .reduce(_ + _)
      .map(count => s"Total log entries processed: $count")

    // Stage 6: User-specific processing with custom partitioning. partitionCustom
    // keeps each user's entries on one instance but does not create a KeyedStream;
    // use keyBy + reduce if you need per-user counts.
    val userLogs = logs
      .filter(_.userId.isDefined)
      .partitionCustom(new ServicePartitioner, (log: LogEntry) => log.userId.get)
      .map(log => s"User ${log.userId.get}: ${log.message}")

    // Print results
    finalResults.print("Service Stats")
    globalSummary.print("Global Summary")
    userLogs.print("User Activity")

    env.execute("Multi-Stage Processing Pipeline")
  }
}
```
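
For checking the "Service Stats" output, the plain-Scala computation below derives the totals each service should reach for the four sample entries. It is a sketch using collections, not Flink, and it repeats the two case classes so it runs on its own; `expectedStats` is a hypothetical helper. If the pipeline aggregates per service with a running `reduce`, the last value emitted for each service should match these numbers.

```scala
case class LogEntry(timestamp: Long, level: String, service: String, message: String, userId: Option[String])
case class ProcessingStats(service: String, errorCount: Int, warningCount: Int, totalCount: Int)

// Final per-service totals for a bounded set of log entries
def expectedStats(logs: Seq[LogEntry]): Map[String, ProcessingStats] =
  logs.groupBy(_.service).map { case (service, entries) =>
    service -> ProcessingStats(
      service,
      entries.count(_.level == "ERROR"),
      entries.count(_.level == "WARN"),
      entries.size)
  }

val sample = Seq(
  LogEntry(1000, "ERROR", "service-a", "Database connection failed", Some("user1")),
  LogEntry(1001, "INFO", "service-b", "Request processed", Some("user2")),
  LogEntry(1002, "WARN", "service-a", "High memory usage", None),
  LogEntry(1003, "ERROR", "service-c", "Authentication failed", Some("user3"))
)
val stats = expectedStats(sample)
// stats("service-a") == ProcessingStats("service-a", 1, 1, 2)
```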