# Stream Partitioning and Distribution

Stream partitioning controls how records are distributed across the parallel instances (subtasks) of downstream operators in a Flink streaming application. The chosen strategy affects load balancing, network traffic, and correctness: keyed operations rely on every record with the same key reaching the same instance.

## Key-Based Partitioning

### KeyBy Operations

```scala { .api }
class DataStream[T] {
  def keyBy[K: TypeInformation](fun: T => K): KeyedStream[T, K]
  def keyBy(fields: Int*): KeyedStream[T, _]                             // deprecated in newer releases
  def keyBy(firstField: String, otherFields: String*): KeyedStream[T, _] // deprecated in newer releases
}
```

Partition elements by key for stateful processing. All elements with the same key are routed to the same parallel instance, which is what keeps per-key state consistent:

```scala
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Key by function (preferred form)
case class User(id: String, name: String, age: Int)
val users = env.fromElements(
  User("1", "Alice", 25),
  User("2", "Bob", 30),
  User("1", "Alice", 26)
)

val keyedByUserId = users.keyBy(_.id)

// Key by field position (for tuples; deprecated, prefer salesData.keyBy(_._1))
val salesData = env.fromElements(("ProductA", 100), ("ProductB", 200), ("ProductA", 150))
val keyedBySalesPosition = salesData.keyBy(0) // Key by product name

// Key by field name (deprecated, prefer users.keyBy(_.id))
val keyedByUserField = users.keyBy("id")

// Key by multiple fields (deprecated; function form: users.keyBy(u => (u.id, u.age)))
val keyedByMultipleFields = users.keyBy("id", "age")
```

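How does `keyBy` pick an instance? Flink does not map keys to subtasks directly: it hashes each key into one of `maxParallelism` key groups and gives each parallel instance a contiguous range of key groups, so equal keys always meet on the same instance. The plain-Scala sketch below models that two-step mapping without Flink on the classpath. `KeyGroupSketch` and its methods are hypothetical names, `MurmurHash3.stringHash` is a stand-in for Flink's internal hash, and the max parallelism of 128 is an assumed value, so the concrete indices will not match a real job.

```scala
import scala.util.hashing.MurmurHash3

object KeyGroupSketch {
  // Step 1: hash the key into one of `maxParallelism` key groups.
  // Assumption: MurmurHash3.stringHash stands in for Flink's own hash of key.hashCode().
  def keyGroupFor(key: String, maxParallelism: Int): Int =
    Math.floorMod(MurmurHash3.stringHash(key), maxParallelism)

  // Step 2: map the key group to a subtask. Each subtask owns a contiguous
  // range of key groups, so this is a scaled integer division.
  def subtaskFor(key: String, maxParallelism: Int, parallelism: Int): Int =
    keyGroupFor(key, maxParallelism) * parallelism / maxParallelism

  def main(args: Array[String]): Unit = {
    val maxParallelism = 128 // assumed value for this sketch
    val parallelism = 4
    for (userId <- Seq("1", "2", "1")) {
      val subtask = subtaskFor(userId, maxParallelism, parallelism)
      println(s"user $userId -> key group ${keyGroupFor(userId, maxParallelism)} -> subtask $subtask")
    }
  }
}
```

Both occurrences of user `"1"` print the same subtask, which is the guarantee keyed state depends on.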
## Built-in Partitioning Strategies

### Broadcast Partitioning

```scala { .api }
class DataStream[T] {
  def broadcast: DataStream[T]
}
```

Send every element to every parallel instance of the downstream operator:

```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment
val configData = env.fromElements("config1", "config2", "config3")

// Broadcast configuration to all parallel instances
val broadcastedConfig = configData.broadcast

broadcastedConfig
  .map(config => s"Processing with config: $config")
  .setParallelism(4) // Each of the 4 instances receives all 3 elements: 12 records in total
  .print()
```

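Broadcasting multiplies traffic by the downstream parallelism. The plain-Scala sketch below models the delivery pattern of the example above without Flink: 3 elements sent to 4 instances give 12 deliveries. `BroadcastSketch` and `deliveries` are hypothetical names, not Flink APIs.

```scala
object BroadcastSketch {
  // Broadcast: every element is delivered to every downstream subtask.
  def deliveries[T](elements: Seq[T], parallelism: Int): Map[Int, Seq[T]] =
    (0 until parallelism).map(subtask => subtask -> elements).toMap

  def main(args: Array[String]): Unit = {
    val perSubtask = deliveries(Seq("config1", "config2", "config3"), parallelism = 4)
    for (subtask <- 0 until 4)
      println(s"subtask $subtask received: ${perSubtask(subtask).mkString(", ")}")
    println(s"total deliveries: ${perSubtask.values.toSeq.map(_.size).sum}")
  }
}
```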
### Global Partitioning

```scala { .api }
class DataStream[T] {
  def global: DataStream[T]
}
```

Send all elements to the first downstream instance (subtask index 0). The downstream operator keeps its configured parallelism, but its other instances receive no data, so this strategy can become a bottleneck:

```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// All elements go to subtask 0 of the map; the other map instances stay idle
stream.global
  .map(x => s"Processed on single instance: $x")
  .print()
```

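The cost of `global` is easiest to see as load per downstream instance. This plain-Scala sketch (hypothetical `GlobalSketch`, not Flink's partitioner class) counts how many of the ten example elements each of four downstream instances would receive.

```scala
object GlobalSketch {
  // Global partitioning: the channel choice ignores the element and is always 0.
  def selectChannel[T](element: T, numChannels: Int): Int = 0

  // Count how many elements each downstream subtask receives.
  def loadPerSubtask[T](elements: Seq[T], numChannels: Int): List[Int] = {
    val counts = Array.fill(numChannels)(0)
    elements.foreach(e => counts(selectChannel(e, numChannels)) += 1)
    counts.toList
  }

  def main(args: Array[String]): Unit =
    println(loadPerSubtask(1 to 10, numChannels = 4)) // prints List(10, 0, 0, 0)
}
```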
### Shuffle Partitioning

```scala { .api }
class DataStream[T] {
  def shuffle: DataStream[T]
}
```

Randomly distribute elements across downstream operators:

```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// Random distribution for load balancing
val shuffledStream = stream.shuffle
  .map(x => s"Randomly assigned: $x")
  .setParallelism(4)
  .print()
```

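Random assignment is only balanced on average: a short stream can land unevenly, while a long one converges toward equal shares. The sketch below is a plain-Scala model (hypothetical `ShuffleSketch`, with a seeded `scala.util.Random` for reproducibility), not Flink's own partitioner.

```scala
import scala.util.Random

object ShuffleSketch {
  // Shuffle: every element independently picks a downstream subtask uniformly at random.
  def loadPerSubtask(numElements: Int, numChannels: Int, seed: Long): List[Int] = {
    val random = new Random(seed) // seeded only to make the sketch reproducible
    val counts = Array.fill(numChannels)(0)
    for (_ <- 0 until numElements) counts(random.nextInt(numChannels)) += 1
    counts.toList
  }

  def main(args: Array[String]): Unit = {
    println(loadPerSubtask(numElements = 10, numChannels = 4, seed = 42L))     // can be noticeably uneven
    println(loadPerSubtask(numElements = 100000, numChannels = 4, seed = 42L)) // close to 25000 each
  }
}
```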
### Rebalance Partitioning

```scala { .api }
class DataStream[T] {
  def rebalance: DataStream[T]
}
```

Distribute elements evenly across downstream operators using round-robin:

```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// Even distribution using round-robin
val rebalancedStream = stream.rebalance
  .map(x => s"Evenly distributed: $x")
  .setParallelism(4)
  .print()
```

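Unlike shuffle, round-robin is deterministic: for each upstream instance, the counts per downstream instance differ by at most one, whatever the data looks like. The plain-Scala sketch below models a single upstream instance (hypothetical `RebalanceSketch`); the starting channel is a parameter because the sketch makes no claim about where Flink's partitioner starts.

```scala
object RebalanceSketch {
  // Round-robin: each element goes to the next channel, wrapping around.
  def assign(numElements: Int, numChannels: Int, startChannel: Int): List[Int] =
    List.tabulate(numElements)(i => (startChannel + i) % numChannels)

  // Count how many elements each downstream subtask receives.
  def loadPerSubtask(numElements: Int, numChannels: Int, startChannel: Int): List[Int] = {
    val counts = Array.fill(numChannels)(0)
    assign(numElements, numChannels, startChannel).foreach(c => counts(c) += 1)
    counts.toList
  }

  def main(args: Array[String]): Unit = {
    println(assign(10, 4, startChannel = 0))         // prints List(0, 1, 2, 3, 0, 1, 2, 3, 0, 1)
    println(loadPerSubtask(10, 4, startChannel = 0)) // prints List(3, 3, 2, 2)
  }
}
```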
### Rescale Partitioning

```scala { .api }
class DataStream[T] {
  def rescale: DataStream[T]
}
```

Distribute elements round-robin to a subset of the downstream instances. Each upstream instance sends only to its own group of downstream instances, which avoids an all-to-all network connection between the two operators:

```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment

// fromElements is a non-parallel source (parallelism 1), so a pass-through map
// with parallelism 2 provides the two upstream instances
val stream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
  .map(x => x)
  .setParallelism(2)

// Rescale to a subset of downstream instances
val rescaledStream = stream.rescale
  .map(x => s"Rescaled: $x")
  .setParallelism(6) // Each upstream instance sends to 3 of the 6 downstream instances
  .print()
```

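The 2-to-6 fan-out above can be modeled as fixed groups of downstream instances with round-robin inside each group. Flink derives the exact grouping from the pointwise connection between the two operators; the contiguous, equally sized groups in this plain-Scala sketch are an assumption that reproduces the example, and `RescaleSketch`, `targetsOf`, and `assign` are hypothetical names.

```scala
object RescaleSketch {
  // Assumption: downstream subtasks are split into contiguous, equally sized
  // groups, one per upstream subtask (this sketch requires an exact multiple).
  def targetsOf(upstreamIndex: Int, upstream: Int, downstream: Int): Seq[Int] = {
    require(downstream % upstream == 0, "sketch only handles downstream = k * upstream")
    val groupSize = downstream / upstream
    (upstreamIndex * groupSize) until ((upstreamIndex + 1) * groupSize)
  }

  // Within its group, an upstream subtask distributes elements round-robin.
  def assign(upstreamIndex: Int, upstream: Int, downstream: Int, numElements: Int): Seq[Int] = {
    val targets = targetsOf(upstreamIndex, upstream, downstream)
    (0 until numElements).map(i => targets(i % targets.size))
  }

  def main(args: Array[String]): Unit =
    for (u <- 0 until 2)
      println(s"upstream $u sends to downstream ${targetsOf(u, 2, 6).mkString(", ")}")
}
```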
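### Forward Partitioning

```scala { .api }
class DataStream[T] {
  def forward: DataStream[T]
}
```

Send each element to the downstream instance with the same subtask index. Upstream and downstream parallelism must be equal, otherwise Flink rejects the job. Because records never change subtask, forward partitioning is what allows Flink to chain the two operators into one task and skip serialization and network transfer between them:

```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.fromElements(1, 2, 3, 4, 5) // non-parallel source: parallelism 1

// Forward to the downstream instance with the same index. The map must also run
// with parallelism 1, otherwise building the job graph fails with an exception.
val forwardedStream = stream.forward
  .map(x => s"Locally forwarded: $x")
  .setParallelism(1)
  .print()
```
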
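The forward rule fits in a few lines: subtask `i` talks only to subtask `i`, which is only well defined when both sides have the same number of subtasks. The plain-Scala sketch below models that rule with hypothetical names (`ForwardSketch`, `targetOf`); it uses `require` where Flink throws its own exception type.

```scala
object ForwardSketch {
  // Forward: upstream subtask i always sends to downstream subtask i.
  def targetOf(upstreamIndex: Int, upstreamParallelism: Int, downstreamParallelism: Int): Int = {
    // Models Flink's rule that forward partitioning cannot change parallelism
    // (require throws IllegalArgumentException; Flink uses a different exception).
    require(
      upstreamParallelism == downstreamParallelism,
      s"forward needs equal parallelism, got $upstreamParallelism -> $downstreamParallelism")
    upstreamIndex
  }

  def main(args: Array[String]): Unit = {
    println((0 until 3).map(i => targetOf(i, 3, 3))) // each subtask maps to itself
    println(scala.util.Try(targetOf(0, 1, 4)))       // a Failure: parallelism 1 -> 4 is rejected
  }
}
```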
## Custom Partitioning

### Custom Partitioner

```scala { .api }
class DataStream[T] {
  def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], field: Int): DataStream[T]
  def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], field: String): DataStream[T]
  def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], fun: T => K): DataStream[T]
}
```

Implement custom partitioning logic. A `Partitioner[K]` receives the key extracted from each element and the number of downstream partitions, and must return an index in `[0, numPartitions)`. Unlike `keyBy`, `partitionCustom` returns a plain `DataStream`, so downstream operators cannot use keyed state:

```scala
import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.streaming.api.scala._

// Custom partitioner for even/odd distribution (uses at most 2 partitions)
class EvenOddPartitioner extends Partitioner[Int] {
  override def partition(key: Int, numPartitions: Int): Int = {
    // Even numbers to partition 0, odd to partition 1; "% numPartitions" keeps
    // the index valid when only one partition exists
    (if (key % 2 == 0) 0 else 1) % numPartitions
  }
}

// Range-based partitioner for keys expected in [0, 100)
class RangePartitioner extends Partitioner[Int] {
  override def partition(key: Int, numPartitions: Int): Int = {
    val range = Math.max(1, 100 / numPartitions)          // avoids division by zero above 100 partitions
    Math.min(Math.max(key, 0) / range, numPartitions - 1) // clamps negative keys and keys >= 100
  }
}

val env = StreamExecutionEnvironment.getExecutionEnvironment
val numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 50, 75, 99)

// Custom partition by key selector function (the element itself is the key)
numbers
  .partitionCustom(new EvenOddPartitioner, (x: Int) => x)
  .map(x => s"EvenOdd partitioned: $x")
  .print()

numbers
  .partitionCustom(new RangePartitioner, (x: Int) => x)
  .map(x => s"Range partitioned: $x")
  .print()

// Custom partition by field (for tuples/case classes)
case class ValueWithCategory(value: Int, category: String)
val categorizedData = env.fromElements(
  ValueWithCategory(10, "A"),
  ValueWithCategory(20, "B"),
  ValueWithCategory(30, "A")
)

class CategoryPartitioner extends Partitioner[String] {
  override def partition(key: String, numPartitions: Int): Int = {
    // floorMod stays non-negative when hashCode is negative (plain % would not)
    Math.floorMod(key.hashCode, numPartitions)
  }
}

val categoryPartitioned = categorizedData
  .partitionCustom(new CategoryPartitioner, "category")
  .print()
```

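An index outside `[0, numPartitions)` is not a valid target and leads to a runtime failure, and it is easy to produce one by accident: in Scala and Java, `%` keeps the sign of a negative dividend. The plain-Scala checks below exercise partitioner logic over many keys and partition counts before it reaches a cluster. `PartitionerChecks` and its local `Partitioner` trait are hypothetical stand-ins that mirror the shape of Flink's `partition(key, numPartitions)` method so the sketch runs without Flink.

```scala
object PartitionerChecks {
  // Stand-in for Flink's Partitioner interface so this sketch runs without Flink.
  trait Partitioner[K] { def partition(key: K, numPartitions: Int): Int }

  // Plain % keeps the sign of a negative hash code, producing invalid indices.
  val naiveHash: Partitioner[Int] = (key, n) => key.hashCode % n
  // floorMod always returns a value in [0, n).
  val safeHash: Partitioner[Int] = (key, n) => Math.floorMod(key.hashCode, n)

  // Range and even/odd logic guarded against division by zero, negative keys,
  // and a single partition.
  val rangePartitioner: Partitioner[Int] = (key, n) =>
    Math.min(Math.max(key, 0) / Math.max(1, 100 / n), n - 1)
  val evenOdd: Partitioner[Int] = (key, n) => (if (key % 2 == 0) 0 else 1) % n

  // True if every key maps to a valid index for every tested partition count.
  def alwaysInRange[K](p: Partitioner[K], keys: Seq[K], partitionCounts: Seq[Int]): Boolean =
    partitionCounts.forall { n =>
      keys.forall { k =>
        val i = p.partition(k, n)
        i >= 0 && i < n
      }
    }

  def main(args: Array[String]): Unit = {
    println(naiveHash.partition(-7, 4)) // prints -3: an invalid partition index
    println(safeHash.partition(-7, 4))  // prints 1
    println(alwaysInRange(rangePartitioner, -10 to 500, 1 to 200)) // prints true
  }
}
```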
## Partitioning Best Practices

### Load Balancing Considerations

A hot key concentrates load on the one instance that owns it. The snippet below spreads a known hot key across all partitions. This gives up the one-key-one-instance guarantee: the result is a plain `DataStream`, no single instance sees every record of the hot key, and any per-key aggregate has to be merged in a later step.

```scala
import java.util.concurrent.ThreadLocalRandom
import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Avoid skewed data distribution
case class Event(userId: String, data: String)
val events = env.fromElements(
  Event("popular_user", "data1"), // This user might cause skew
  Event("user2", "data2"),
  Event("popular_user", "data3")
)

// A simple keyBy sends every "popular_user" event to the same instance:
// val skewed = events.keyBy(_.userId)

// Custom partitioning that spreads the hot key over all partitions:
class SkewAwarePartitioner extends Partitioner[String] {
  override def partition(key: String, numPartitions: Int): Int = {
    if (key == "popular_user") {
      // Pick a random partition for each event of the hot key
      ThreadLocalRandom.current().nextInt(numPartitions)
    } else {
      // floorMod avoids negative indices for negative hash codes
      Math.floorMod(key.hashCode, numPartitions)
    }
  }
}

val balancedEvents = events
  .partitionCustom(new SkewAwarePartitioner, _.userId)
```

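Splitting a hot key means per-key results are no longer computed in one place. A common remedy, often called key salting, aggregates in two phases: partial results per `(key, salt)` pair, then a merge per key. The plain-Scala model below shows why the merge step is required. `SaltedAggregationSketch`, `partialCounts`, and `mergedCounts` are hypothetical names, and a Flink job would typically express the two phases as two `keyBy` stages rather than collection operations.

```scala
import scala.util.Random

object SaltedAggregationSketch {
  final case class Event(userId: String, data: String)

  // Phase 1: give each hot-key event a random salt in [0, salts) and count per (key, salt).
  // In a Flink job this phase would be keyed by (userId, salt) and run in parallel.
  def partialCounts(events: Seq[Event], hotKeys: Set[String], salts: Int, random: Random): Map[(String, Int), Int] =
    events
      .map { e =>
        val salt = if (hotKeys.contains(e.userId)) random.nextInt(salts) else 0
        (e.userId, salt)
      }
      .groupBy(identity)
      .map { case (saltedKey, occurrences) => saltedKey -> occurrences.size }

  // Phase 2: drop the salt and merge the partial counts into one total per key.
  def mergedCounts(partials: Map[(String, Int), Int]): Map[String, Int] =
    partials.toSeq
      .groupBy { case ((userId, _), _) => userId }
      .map { case (userId, parts) => userId -> parts.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    val events = Seq.fill(1000)(Event("popular_user", "x")) ++ Seq(Event("user2", "y"))
    val partials = partialCounts(events, Set("popular_user"), 4, new Random(7))
    println(partials)               // the hot key is split into several partial counts
    println(mergedCounts(partials)) // one total per user after the merge step
  }
}
```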
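### Network Efficiency

```scala
// Assumes `env` and the Scala API imports from the examples above
val lines: DataStream[String] = env.fromElements("important: disk full", "debug: tick", "important: cpu hot")

// forward() keeps records on the same subtask index so map and filter can be chained.
// Flink already uses forward between operators with equal parallelism, so the explicit
// call mainly documents intent (and fails fast if the parallelism differs).
val localProcessing = lines
  .map(_.toUpperCase)   // CPU-intensive operation
  .forward              // Same subtask downstream, no network shuffle
  .filter(_.length > 5) // Another local operation

// Hypothetical placeholder for an expensive per-record function
val complexProcessing: String => String = _.reverse

// Use rebalance() after operations that might cause imbalance
val afterFilter = lines
  .filter(_.contains("important")) // May drop far more records on some subtasks than others
  .rebalance                        // Redistribute the remaining records evenly
  .map(complexProcessing)
```
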
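Why rebalance helps after a selective filter: the filter's output stays on the subtask that produced it, so if one subtask's input happens to contain most of the matches, every later operator inherits that skew. The plain-Scala model below feeds hypothetical per-subtask survivor counts through round-robin redistribution. `RebalanceAfterFilterSketch` is a hypothetical name, and starting upstream subtask `i` at channel `i` is an assumption of the sketch.

```scala
object RebalanceAfterFilterSketch {
  // Each upstream subtask sends its surviving records round-robin over all
  // downstream subtasks, starting at its own index (an assumption of this sketch).
  def rebalance(survivorsPerUpstream: Seq[Int], downstream: Int): List[Int] = {
    val counts = Array.fill(downstream)(0)
    for {
      (survivors, upstreamIndex) <- survivorsPerUpstream.zipWithIndex
      i <- 0 until survivors
    } counts((upstreamIndex + i) % downstream) += 1
    counts.toList
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical filter output: one subtask kept far more records than the others
    val afterFilter = Seq(900, 20, 15, 5)
    println(s"without rebalance: $afterFilter")
    println(s"with rebalance:    ${rebalance(afterFilter, downstream = 4)}") // List(235, 234, 235, 236)
  }
}
```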
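### State and Partitioning

```scala
// `events` is the stream from the load-balancing example above
val perUserCounts: DataStream[(String, Int)] = events
  .keyBy(_.userId) // State is partitioned by userId
  .mapWithState[(String, Int), Int] { (event, count) =>
    val newCount = count.getOrElse(0) + 1 // `count` is this user's keyed state
    ((event.userId, newCount), Some(newCount))
  }

// Any operator on a KeyedStream returns a plain DataStream, and rebalance()
// ignores the key: the operators below have no keyed state access
val repartitioned = perUserCounts
  .rebalance
  .map(t => s"${t._1} has ${t._2} events")
// Key the stream again (e.g. perUserCounts.keyBy(_._1)) to regain keyed state
```
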
## Complete Example: Multi-Stage Processing Pipeline

```scala
import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.streaming.api.scala._

case class LogEntry(
  timestamp: Long,
  level: String,
  service: String,
  message: String,
  userId: Option[String]
)

case class ProcessingStats(
  service: String,
  errorCount: Int,
  warningCount: Int,
  totalCount: Int
)

object MultiStageProcessingPipeline {

  // Custom partitioner for log levels (wraps around if fewer than 4 partitions exist)
  class LogLevelPartitioner extends Partitioner[String] {
    override def partition(key: String, numPartitions: Int): Int = {
      val index = key match {
        case "ERROR" => 0
        case "WARN"  => 1
        case "INFO"  => 2
        case _       => 3
      }
      index % numPartitions
    }
  }

  // Load-balancing hash partitioner (used below for user IDs)
  class HashKeyPartitioner extends Partitioner[String] {
    override def partition(key: String, numPartitions: Int): Int = {
      // floorMod stays non-negative even for Int.MinValue, unlike Math.abs(hashCode) % n
      Math.floorMod(key.hashCode, numPartitions)
    }
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(8)

    // Sample log data
    val logs = env.fromElements(
      LogEntry(1000, "ERROR", "service-a", "Database connection failed", Some("user1")),
      LogEntry(1001, "INFO", "service-b", "Request processed", Some("user2")),
      LogEntry(1002, "WARN", "service-a", "High memory usage", None),
      LogEntry(1003, "ERROR", "service-c", "Authentication failed", Some("user3"))
    )

    // Stage 1: Broadcast configuration so every parallel instance sees every item.
    // It is not consumed below; to use it, connect it to a data stream
    // (for example logs.connect(config)) or use the broadcast state pattern.
    val config = env.fromElements("config-item-1", "config-item-2")
      .broadcast

    // Stage 2: Partition by log level for specialized processing
    val logsByLevel = logs
      .partitionCustom(new LogLevelPartitioner, _.level)
      .map { log =>
        // Process based on log level
        val priority = log.level match {
          case "ERROR" => 1
          case "WARN"  => 2
          case "INFO"  => 3
          case _       => 4
        }
        (log, priority)
      }

    // Stage 3: Key by service and aggregate the counts in keyed state
    val serviceStats = logsByLevel
      .map { entry =>
        val log = entry._1
        ProcessingStats(
          log.service,
          if (log.level == "ERROR") 1 else 0,
          if (log.level == "WARN") 1 else 0,
          1)
      }
      .keyBy(_.service)
      .reduce { (a, b) =>
        // Emits an updated running total for the service on every input
        ProcessingStats(
          a.service,
          a.errorCount + b.errorCount,
          a.warningCount + b.warningCount,
          a.totalCount + b.totalCount)
      }

    // Stage 4: Rebalance for final processing
    val finalResults = serviceStats
      .rebalance // Even distribution for final processing
      .map(stats => s"Service ${stats.service}: ${stats.totalCount} total, ${stats.errorCount} errors")

    // Stage 5: Global count. A non-keyed DataStream has no reduce(), so a constant
    // key routes every record to a single instance and keeps the count in keyed state
    val globalSummary = logs
      .map(_ => 1)
      .keyBy(_ => 0)
      .reduce(_ + _)
      .map(count => s"Total log entries processed: $count")

    // Stage 6: User-specific processing with custom partitioning. All entries of a
    // user reach the same instance, but the result is not keyed (no keyed state).
    val userLogs = logs
      .filter(_.userId.isDefined)
      .partitionCustom(new HashKeyPartitioner, (log: LogEntry) => log.userId.get)
      .map(log => s"${log.userId.get}: [${log.level}] ${log.service} - ${log.message}")

    // Print results
    finalResults.print("Service Stats")
    globalSummary.print("Global Summary")
    userLogs.print("User Activity")

    env.execute("Multi-Stage Processing Pipeline")
  }
}
```
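
The sample data is small enough to work out the per-service totals by hand, which is a useful check on the aggregation logic before running the job. This plain-Scala sketch applies a per-record mapping and a field-wise sum to an in-memory list; `ServiceStatsCheck`, `toStats`, `merge`, and `finalStats` are hypothetical names, and the case classes are local copies of the ones above. A keyed `reduce` in a streaming job emits an updated result for each input, so these would be the last values printed per service rather than the only ones.

```scala
object ServiceStatsCheck {
  final case class LogEntry(timestamp: Long, level: String, service: String, message: String, userId: Option[String])
  final case class ProcessingStats(service: String, errorCount: Int, warningCount: Int, totalCount: Int)

  // Per-record contribution: one count for the entry, plus one for ERROR or WARN.
  def toStats(log: LogEntry): ProcessingStats =
    ProcessingStats(
      log.service,
      if (log.level == "ERROR") 1 else 0,
      if (log.level == "WARN") 1 else 0,
      1)

  // Field-wise sum of two partial results for the same service.
  def merge(a: ProcessingStats, b: ProcessingStats): ProcessingStats =
    ProcessingStats(a.service, a.errorCount + b.errorCount, a.warningCount + b.warningCount, a.totalCount + b.totalCount)

  // Final totals per service, as a keyed aggregation would converge to them.
  def finalStats(logs: Seq[LogEntry]): Map[String, ProcessingStats] =
    logs.map(toStats).groupBy(_.service).map { case (service, stats) => service -> stats.reduce((a, b) => merge(a, b)) }

  val sampleLogs: Seq[LogEntry] = Seq(
    LogEntry(1000, "ERROR", "service-a", "Database connection failed", Some("user1")),
    LogEntry(1001, "INFO", "service-b", "Request processed", Some("user2")),
    LogEntry(1002, "WARN", "service-a", "High memory usage", None),
    LogEntry(1003, "ERROR", "service-c", "Authentication failed", Some("user3"))
  )

  def main(args: Array[String]): Unit =
    finalStats(sampleLogs).toSeq.sortBy(_._1).foreach { case (_, s) => println(s) }
}
```

For the four sample entries this yields 2 entries (1 error, 1 warning) for `service-a`, 1 entry for `service-b`, and 1 entry (1 error) for `service-c`.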