0
# Keyed Streams and State
1
2
A `KeyedStream` is a `DataStream` partitioned by key: all elements with the same key are processed by the same parallel task instance. This partitioning is what enables keyed state, rolling aggregations, windowing, and interval joins, all of which operate independently per key.
3
4
## Capabilities
5
6
### Stream Properties
7
8
Access the type information of the key the stream is partitioned by.
9
10
```scala { .api }
class KeyedStream[T, K] {
  /**
   * Get the type information of the key this stream is partitioned by.
   *
   * @return TypeInformation for key type K
   */
  def getKeyType: TypeInformation[K]
}
```
19
20
### Aggregation Operations
21
22
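Built-in rolling aggregations. Each incoming element updates the aggregate for its key, and the updated result is emitted immediately, so the output contains one record per input element.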
23
24
```scala { .api }
class KeyedStream[T, K] {
  /**
   * Rolling reduce: combine each element with the previously reduced value
   * of its key and emit the new reduced value.
   *
   * @param reducer Function that combines two elements into one
   * @return DataStream emitting the updated reduced value of the element's key after every element
   */
  def reduce(reducer: ReduceFunction[T]): DataStream[T]

  /**
   * Rolling reduce with a Scala function; same semantics as the
   * ReduceFunction overload.
   *
   * @param fun Function that combines two elements into one
   * @return DataStream emitting the updated reduced value of the element's key after every element
   */
  def reduce(fun: (T, T) => T): DataStream[T]

  /**
   * Rolling sum of the numeric field at the given (0-based) position of a
   * tuple or case class.
   *
   * @param position Position of the field to sum
   * @return DataStream emitting the updated sum per key after every element
   */
  def sum(position: Int): DataStream[T]

  /**
   * Rolling sum of the numeric field with the given name (field expression).
   *
   * @param field Name of the field to sum
   * @return DataStream emitting the updated sum per key after every element
   */
  def sum(field: String): DataStream[T]

  /**
   * Rolling maximum of the field at the given (0-based) position.
   * Only that field is guaranteed to hold the maximum; the other fields are
   * not taken from the element that holds it. Use `maxBy` for the whole element.
   *
   * @param position Position of the field to maximize
   * @return DataStream emitting the current maximum per key after every element
   */
  def max(position: Int): DataStream[T]

  /**
   * Rolling maximum of the field with the given name.
   * Only that field is guaranteed to hold the maximum; use `maxBy` for the whole element.
   *
   * @param field Name of the field to maximize
   * @return DataStream emitting the current maximum per key after every element
   */
  def max(field: String): DataStream[T]

  /**
   * Rolling minimum of the field at the given (0-based) position.
   * Only that field is guaranteed to hold the minimum; the other fields are
   * not taken from the element that holds it. Use `minBy` for the whole element.
   *
   * @param position Position of the field to minimize
   * @return DataStream emitting the current minimum per key after every element
   */
  def min(position: Int): DataStream[T]

  /**
   * Rolling minimum of the field with the given name.
   * Only that field is guaranteed to hold the minimum; use `minBy` for the whole element.
   *
   * @param field Name of the field to minimize
   * @return DataStream emitting the current minimum per key after every element
   */
  def min(field: String): DataStream[T]

  /**
   * Rolling "element with the maximum value": emits, per key, the complete
   * element whose field at the given (0-based) position is largest so far.
   *
   * @param position Position of the field to compare
   * @return DataStream emitting the current maximum element per key after every element
   */
  def maxBy(position: Int): DataStream[T]

  /**
   * Rolling "element with the maximum value": emits, per key, the complete
   * element whose named field is largest so far.
   *
   * @param field Name of the field to compare
   * @return DataStream emitting the current maximum element per key after every element
   */
  def maxBy(field: String): DataStream[T]

  /**
   * Rolling "element with the minimum value": emits, per key, the complete
   * element whose field at the given (0-based) position is smallest so far.
   *
   * @param position Position of the field to compare
   * @return DataStream emitting the current minimum element per key after every element
   */
  def minBy(position: Int): DataStream[T]

  /**
   * Rolling "element with the minimum value": emits, per key, the complete
   * element whose named field is smallest so far.
   *
   * @param field Name of the field to compare
   * @return DataStream emitting the current minimum element per key after every element
   */
  def minBy(field: String): DataStream[T]
}
```
111
112
**Usage Examples:**
113
114
```scala
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

case class SensorReading(sensorId: String, temperature: Double, timestamp: Long)

val readings = env.fromElements(
  SensorReading("sensor1", 20.0, 1000L),
  SensorReading("sensor1", 25.0, 2000L),
  SensorReading("sensor2", 15.0, 1500L),
  SensorReading("sensor2", 18.0, 2500L)
)

val keyedReadings = readings.keyBy(_.sensorId)

// Rolling sum of temperatures per sensor (one updated result per reading)
val totalTemps = keyedReadings.sum("temperature")

// Rolling maximum temperature per sensor; only the `temperature` field is
// guaranteed to be the maximum, the other fields may come from another reading
val maxTemps = keyedReadings.max("temperature")

// The complete reading holding the maximum temperature per sensor
val maxTempReadings = keyedReadings.maxBy("temperature")

// Custom reduction - running average temperature per sensor.
// Averaging pairwise ((a + b) / 2) would over-weight the most recent
// readings, so carry (sensorId, sum, count) through the reduce and divide
// only when emitting.
val avgTemps = readings
  .map(r => (r.sensorId, r.temperature, 1L))
  .keyBy(_._1)
  .reduce((a, b) => (a._1, a._2 + b._2, a._3 + b._3))
  .map(acc => (acc._1, acc._2 / acc._3))
```
142
143
### Windowing Operations
144
145
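Apply time- or count-based windows to keyed streams. Each key is windowed independently; event-time windows additionally require timestamps and watermarks on the stream.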
146
147
```scala { .api }
class KeyedStream[T, K] {
  /**
   * Tumbling time windows of the given size (deprecated).
   * Whether these are event-time or processing-time windows depends on the
   * environment's time characteristic, so callers should pick an explicit assigner instead.
   *
   * @param size Window size
   * @return WindowedStream for window aggregations
   */
  @deprecated("Use window(TumblingEventTimeWindows.of(size)) or window(TumblingProcessingTimeWindows.of(size))", "1.12.0")
  def timeWindow(size: Time): WindowedStream[T, K, TimeWindow]

  /**
   * Sliding time windows of the given size and slide (deprecated).
   * Whether these are event-time or processing-time windows depends on the
   * environment's time characteristic, so callers should pick an explicit assigner instead.
   *
   * @param size Window size
   * @param slide Interval at which a new window starts
   * @return WindowedStream for window aggregations
   */
  @deprecated("Use window(SlidingEventTimeWindows.of(size, slide)) or window(SlidingProcessingTimeWindows.of(size, slide))", "1.12.0")
  def timeWindow(size: Time, slide: Time): WindowedStream[T, K, TimeWindow]

  /**
   * Tumbling count windows: a key's window fires once it holds `size` elements.
   *
   * @param size Window size (number of elements)
   * @return WindowedStream for window aggregations
   */
  def countWindow(size: Long): WindowedStream[T, K, GlobalWindow]

  /**
   * Sliding count windows: every `slide` elements of a key, evaluate the
   * last `size` elements of that key.
   *
   * @param size Window size (number of elements)
   * @param slide Slide size (number of elements)
   * @return WindowedStream for window aggregations
   */
  def countWindow(size: Long, slide: Long): WindowedStream[T, K, GlobalWindow]

  /**
   * Window the stream with an explicit WindowAssigner (for example
   * TumblingEventTimeWindows or SlidingProcessingTimeWindows).
   *
   * @param assigner Assigns each element to its window(s)
   * @return WindowedStream for window aggregations
   */
  def window[W <: Window](assigner: WindowAssigner[_ >: T, W]): WindowedStream[T, K, W]
}
```
189
190
**Usage Examples:**
191
192
```scala
import java.time.Duration
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// `readings` is the SensorReading stream from the aggregation example above.
// Event-time windows need a timestamp on every element (without one,
// TumblingEventTimeWindows fails at runtime). The sample readings are
// slightly out of order across sensors, so allow 1 second of disorder.
val timestampedReadings = readings.assignTimestampsAndWatermarks(
  WatermarkStrategy
    .forBoundedOutOfOrderness[SensorReading](Duration.ofSeconds(1))
    .withTimestampAssigner(new SerializableTimestampAssigner[SensorReading] {
      override def extractTimestamp(r: SensorReading, recordTimestamp: Long): Long = r.timestamp
    })
)

val keyedReadings = timestampedReadings.keyBy(_.sensorId)

// Count-based window - fires once every 5 readings per sensor
// (the sample data has only 2 readings per sensor, so it never fires here)
val countWindow = keyedReadings
  .countWindow(5)
  .reduce((r1, r2) => SensorReading(r1.sensorId, math.max(r1.temperature, r2.temperature), r2.timestamp))

// Time-based window - hottest reading per sensor in each 1-minute tumbling window
val timeWindow = keyedReadings
  .window(TumblingEventTimeWindows.of(Time.minutes(1)))
  .reduce((r1, r2) => if (r1.temperature >= r2.temperature) r1 else r2)
```
209
210
### Stateful Operations
211
212
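Keep a single piece of state per key without writing a full process function. The function receives the key's current state as an `Option` (`None` for the first element of a key) and returns the new state; returning `None` clears the key's state.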
213
214
```scala { .api }
class KeyedStream[T, K] {
  /**
   * Map with a single value of per-key state.
   * `fun` receives the element and the key's current state (`None` for the
   * first element of a key) and returns the result plus the new state;
   * returning `None` as the new state clears the key's state.
   *
   * @param fun (value, state) => (result, newState)
   * @return DataStream with one result per input element
   */
  def mapWithState[R: TypeInformation, S: TypeInformation](
      fun: (T, Option[S]) => (R, Option[S])
  ): DataStream[R]

  /**
   * FlatMap with a single value of per-key state.
   * Same state contract as `mapWithState`, but each element may produce any
   * number of results.
   *
   * @param fun (value, state) => (results, newState)
   * @return DataStream with all results produced for each input element
   */
  def flatMapWithState[R: TypeInformation, S: TypeInformation](
      fun: (T, Option[S]) => (TraversableOnce[R], Option[S])
  ): DataStream[R]

  /**
   * Filter with a single value of per-key state.
   * Same state contract as `mapWithState`; the element is kept when the
   * returned Boolean is true.
   *
   * @param fun (value, state) => (keep, newState)
   * @return DataStream with the elements for which `keep` was true
   */
  def filterWithState[S: TypeInformation](
      fun: (T, Option[S]) => (Boolean, Option[S])
  ): DataStream[T]
}
```
244
245
**Usage Examples:**
246
247
```scala
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

case class Event(key: String, value: Int, timestamp: Long)

val events = env.fromElements(
  Event("A", 1, 1000L),
  Event("A", 2, 2000L),
  Event("B", 5, 1500L),
  Event("A", 3, 3000L)
)

val keyedEvents = events.keyBy(_.key)

// Count events per key; the state is None for the first event of each key
val eventCounts = keyedEvents.mapWithState[Int, Int] { (_, count) =>
  val newCount = count.getOrElse(0) + 1
  (newCount, Some(newCount))
}

// Running sum of event values per key
val runningSums = keyedEvents.mapWithState[Int, Int] { (event, sum) =>
  val newSum = sum.getOrElse(0) + event.value
  (newSum, Some(newSum))
}

// Keep only the first 3 events per key, tracking the count in state
val limitedEvents = keyedEvents.filterWithState[Int] { (_, count) =>
  val currentCount = count.getOrElse(0) + 1
  (currentCount <= 3, Some(currentCount))
}
```
279
280
### Processing Functions
281
282
Apply custom processing logic with access to timers and state.
283
284
```scala { .api }
class KeyedStream[T, K] {
  /**
   * Apply a KeyedProcessFunction, the lowest-level keyed operator. It is
   * invoked for every element with access to the current key, keyed state,
   * event-time and processing-time timers, and side outputs.
   *
   * @param keyedProcessFunction Function invoked per element and per firing timer
   * @return DataStream with the records the function emits
   */
  def process[R: TypeInformation](
      keyedProcessFunction: KeyedProcessFunction[K, T, R]
  ): DataStream[R]
}
```
296
297
### Queryable State
298
299
Make keyed state queryable from external applications.
300
301
```scala { .api }
class KeyedStream[T, K] {
  /**
   * Make the latest element of each key queryable under the given name,
   * using a default ValueStateDescriptor for the element type.
   *
   * @param queryableStateName Name external clients use to query the state
   * @return QueryableStateStream describing the queryable state
   */
  def asQueryableState(queryableStateName: String): QueryableStateStream[K, T]

  /**
   * Make the latest element of each key queryable, stored in value state
   * described by `stateDescriptor`.
   *
   * @param queryableStateName Name external clients use to query the state
   * @param stateDescriptor Descriptor of the value state holding the latest element
   * @return QueryableStateStream describing the queryable state
   */
  def asQueryableState(
      queryableStateName: String,
      stateDescriptor: ValueStateDescriptor[T]
  ): QueryableStateStream[K, T]

  /**
   * Make a per-key reduction of all elements queryable, stored in reducing
   * state described by `stateDescriptor`.
   *
   * @param queryableStateName Name external clients use to query the state
   * @param stateDescriptor Descriptor of the reducing state (includes the reduce function)
   * @return QueryableStateStream describing the queryable state
   */
  def asQueryableState(
      queryableStateName: String,
      stateDescriptor: ReducingStateDescriptor[T]
  ): QueryableStateStream[K, T]
}
```
333
334
### Interval Joins
335
336
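Join each element with the elements of another keyed stream that have the same key and whose timestamps lie within a time interval relative to it. Interval joins support event time only, so both inputs must carry timestamps.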
337
338
```scala { .api }
class KeyedStream[T, K] {
  /**
   * Start an interval join with another stream keyed by the same key type.
   * Only elements with equal keys are joined; the time interval is set on
   * the returned IntervalJoin via `between`.
   *
   * @param otherStream Keyed stream to join with (the right side)
   * @return IntervalJoin for configuring the join interval
   */
  def intervalJoin[OTHER](otherStream: KeyedStream[OTHER, K]): IntervalJoin[T, OTHER, K]
}

// IntervalJoin configuration
class IntervalJoin[IN1, IN2, KEY] {
  /**
   * Define the join interval relative to the left element's timestamp.
   * A pair (left, right) is joined when
   * left.timestamp + lowerBound <= right.timestamp <= left.timestamp + upperBound.
   * Both bounds are inclusive unless made exclusive on the result.
   *
   * @param lowerBound Lower bound of the interval (may be negative)
   * @param upperBound Upper bound of the interval; must not be smaller than lowerBound
   * @return IntervalJoined for bound and processing configuration
   */
  def between(lowerBound: Time, upperBound: Time): IntervalJoined[IN1, IN2, KEY]
}

class IntervalJoined[IN1, IN2, KEY] {
  /**
   * Exclude the lower bound from the join interval.
   *
   * @return IntervalJoined with an exclusive lower bound
   */
  def lowerBoundExclusive(): IntervalJoined[IN1, IN2, KEY]

  /**
   * Exclude the upper bound from the join interval.
   *
   * @return IntervalJoined with an exclusive upper bound
   */
  def upperBoundExclusive(): IntervalJoined[IN1, IN2, KEY]

  /**
   * Process every joined (left, right) pair with a ProcessJoinFunction.
   *
   * @param processJoinFunction Function invoked once per joined pair
   * @return DataStream with the records the function emits
   */
  def process[OUT: TypeInformation](
      processJoinFunction: ProcessJoinFunction[IN1, IN2, OUT]
  ): DataStream[OUT]
}
```
382
383
**Usage Examples:**
384
385
```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.util.Collector

val env = StreamExecutionEnvironment.getExecutionEnvironment

case class Order(id: String, customerId: String, amount: Double, timestamp: Long)
case class Payment(id: String, customerId: String, amount: Double, timestamp: Long)
case class OrderPayment(orderId: String, paymentId: String, customerId: String, timestamp: Long)

// Interval joins only work in event time, so both inputs need timestamps;
// elements without one make the join fail at runtime. Each sample stream is
// already ordered by time, so ascending timestamps are sufficient here.
val orders = env
  .fromElements(
    Order("o1", "c1", 100.0, 1000L),
    Order("o2", "c2", 200.0, 2000L)
  )
  .assignAscendingTimestamps(_.timestamp)
  .keyBy(_.customerId)

val payments = env
  .fromElements(
    Payment("p1", "c1", 100.0, 1100L),
    Payment("p2", "c2", 200.0, 2100L)
  )
  .assignAscendingTimestamps(_.timestamp)
  .keyBy(_.customerId)

// Join each order with payments of the same customer made up to 5 minutes
// before or after it (both bounds inclusive)
val joined = orders
  .intervalJoin(payments)
  .between(Time.minutes(-5), Time.minutes(5))
  .process(new ProcessJoinFunction[Order, Payment, OrderPayment] {
    override def processElement(
        left: Order,
        right: Payment,
        ctx: ProcessJoinFunction[Order, Payment, OrderPayment]#Context,
        out: Collector[OrderPayment]
    ): Unit = {
      out.collect(OrderPayment(left.id, right.id, left.customerId, math.max(left.timestamp, right.timestamp)))
    }
  })
```
420
421
## Types
422
423
```scala { .api }
424
// Reduce function interface
425
trait ReduceFunction[T] {
426
def reduce(value1: T, value2: T): T
427
}
428
429
// Process function for keyed streams
abstract class KeyedProcessFunction[K, I, O] {
  /** Called once per input element; may emit any number of results via `out`. */
  def processElement(value: I, ctx: Context, out: Collector[O]): Unit

  /** Called when a timer registered through the TimerService fires; no-op by default. */
  def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[O]): Unit = {}

  abstract class Context {
    /**
     * Timestamp of the element being processed (or of the firing timer).
     * Nullable, hence java.lang.Long: it is null when the element carries no timestamp.
     */
    def timestamp(): java.lang.Long

    /** Key of the element or timer currently being processed. */
    def getCurrentKey: K

    /** Access to current time and timer registration for the current key. */
    def timerService(): TimerService

    /** Emit a record to the side output identified by `outputTag`. */
    def output[X](outputTag: OutputTag[X], value: X): Unit
  }

  abstract class OnTimerContext extends Context {
    /** Whether the firing timer is an event-time or a processing-time timer. */
    def timeDomain(): TimeDomain
  }
}

// Process join function for interval joins
abstract class ProcessJoinFunction[IN1, IN2, OUT] {
  /** Called once for every (left, right) pair with equal keys that falls inside the join interval. */
  def processElement(left: IN1, right: IN2, ctx: Context, out: Collector[OUT]): Unit

  abstract class Context {
    /** Timestamp of the left element. */
    def getLeftTimestamp: Long

    /** Timestamp of the right element. */
    def getRightTimestamp: Long

    /** Timestamp of the joined pair: the larger of the two element timestamps. */
    def getTimestamp: Long

    /** Emit a record to the side output identified by `outputTag`. */
    def output[X](outputTag: OutputTag[X], value: X): Unit
  }
}

// Queryable state stream
class QueryableStateStream[K, V] {
  /** Name under which external clients query the state. */
  def getQueryableStateName: String

  /** Serializer for the keys, needed by clients to build queries. */
  def getKeySerializer: TypeSerializer[K]

  /** Descriptor of the state that was made queryable. */
  def getStateDescriptor: StateDescriptor[_, V]
}

// Timer service for processing functions
trait TimerService {
  /** Current processing time (wall-clock milliseconds). */
  def currentProcessingTime(): Long

  /** Current event-time watermark. */
  def currentWatermark(): Long

  /** Register a timer that fires when processing time passes `time`. */
  def registerProcessingTimeTimer(time: Long): Unit

  /** Register a timer that fires when the watermark passes `time`. */
  def registerEventTimeTimer(time: Long): Unit

  /** Delete a previously registered processing-time timer for `time`. */
  def deleteProcessingTimeTimer(time: Long): Unit

  /** Delete a previously registered event-time timer for `time`. */
  def deleteEventTimeTimer(time: Long): Unit
}

// Time domain enum: tells onTimer which kind of timer fired
sealed trait TimeDomain

object TimeDomain {
  /** Timer driven by the event-time watermark. */
  case object EVENT_TIME extends TimeDomain

  /** Timer driven by wall-clock processing time. */
  case object PROCESSING_TIME extends TimeDomain
}

// State descriptors for queryable state

/** Describes a value state holding a single value of type T per key. */
class ValueStateDescriptor[T](name: String, typeInfo: TypeInformation[T])

/** Describes a reducing state that folds all added values of type T per key with `reduceFunction`. */
class ReducingStateDescriptor[T](name: String, reduceFunction: ReduceFunction[T], typeInfo: TypeInformation[T])
```