0
# State Management
1
2
Comprehensive state management API supporting both synchronous and asynchronous operations. Flink provides different types of state for various use cases including value state, list state, map state, and specialized aggregating states for stateful stream processing applications.
3
4
## Capabilities
5
6
### State Interfaces (Synchronous API)
7
8
Core state interfaces for traditional synchronous state access.
9
10
```java { .api }
11
/**
12
* Base interface for all state types
13
*/
14
interface State {
15
/**
16
* Clear the state
17
*/
18
void clear();
19
}
20
21
/**
22
* State that holds a single value
23
* @param <T> Type of the value
24
*/
25
interface ValueState<T> extends State {
26
/**
27
* Get current value
28
* @return Current value, null if not set
29
* @throws Exception
30
*/
31
T value() throws Exception;
32
33
/**
34
* Update the state value
35
* @param value New value
36
* @throws Exception
37
*/
38
void update(T value) throws Exception;
39
}
40
41
/**
42
* State that holds a list of elements
43
* @param <T> Type of elements in the list
44
*/
45
interface ListState<T> extends State {
46
/**
47
* Get all elements in the list
48
* @return Iterable over all elements
49
* @throws Exception
50
*/
51
Iterable<T> get() throws Exception;
52
53
/**
54
* Add element to the list
55
* @param value Element to add
56
* @throws Exception
57
*/
58
void add(T value) throws Exception;
59
60
/**
61
* Add all elements from iterable to the list
62
* @param values Elements to add
63
* @throws Exception
64
*/
65
void addAll(List<T> values) throws Exception;
66
67
/**
68
* Replace all elements in the list
69
* @param values New elements
70
* @throws Exception
71
*/
72
void update(List<T> values) throws Exception;
73
}
74
75
/**
76
* State that holds a key-value map
77
* @param <UK> Type of user keys
78
* @param <UV> Type of user values
79
*/
80
interface MapState<UK, UV> extends State {
81
/**
82
* Get value for the given key
83
* @param key User key
84
* @return Value for the key, null if not present
85
* @throws Exception
86
*/
87
UV get(UK key) throws Exception;
88
89
/**
90
* Associate value with key
91
* @param key User key
92
* @param value User value
93
* @throws Exception
94
*/
95
void put(UK key, UV value) throws Exception;
96
97
/**
98
* Add all key-value pairs from map
99
* @param map Key-value pairs to add
100
* @throws Exception
101
*/
102
void putAll(Map<UK, UV> map) throws Exception;
103
104
/**
105
* Remove key-value pair
106
* @param key Key to remove
107
* @throws Exception
108
*/
109
void remove(UK key) throws Exception;
110
111
/**
112
* Check if key exists
113
* @param key Key to check
114
* @return true if key exists
115
* @throws Exception
116
*/
117
boolean contains(UK key) throws Exception;
118
119
/**
120
* Get all entries
121
* @return Iterable over all entries
122
* @throws Exception
123
*/
124
Iterable<Map.Entry<UK, UV>> entries() throws Exception;
125
126
/**
127
* Get all keys
128
* @return Iterable over all keys
129
* @throws Exception
130
*/
131
Iterable<UK> keys() throws Exception;
132
133
/**
134
* Get all values
135
* @return Iterable over all values
136
* @throws Exception
137
*/
138
Iterable<UV> values() throws Exception;
139
140
/**
141
* Check if state is empty
142
* @return true if no entries
143
* @throws Exception
144
*/
145
boolean isEmpty() throws Exception;
146
}
147
148
/**
149
* State for pre-aggregating values using AggregateFunction
150
* @param <IN> Input type
151
* @param <OUT> Output type
152
*/
153
interface AggregatingState<IN, OUT> extends State {
154
/**
155
* Get current aggregated result
156
* @return Aggregated result
157
* @throws Exception
158
*/
159
OUT get() throws Exception;
160
161
/**
162
* Add value to aggregation
163
* @param value Value to add
164
* @throws Exception
165
*/
166
void add(IN value) throws Exception;
167
}
168
169
/**
170
* State that reduces values on-the-fly using ReduceFunction
171
* @param <T> Element type
172
*/
173
interface ReducingState<T> extends State {
174
/**
175
* Get current reduced result
176
* @return Reduced result
177
* @throws Exception
178
*/
179
T get() throws Exception;
180
181
/**
182
* Add value to reduction
183
* @param value Value to add
184
* @throws Exception
185
*/
186
void add(T value) throws Exception;
187
}
188
189
/**
190
* State for broadcast patterns - read-only for non-broadcast stream
191
* @param <K> Key type
192
* @param <V> Value type
193
*/
194
interface BroadcastState<K, V> extends State {
195
/**
196
* Get value for key (read-only for non-broadcast stream)
197
* @param key Key to lookup
198
* @return Value for key
199
* @throws Exception
200
*/
201
V get(K key) throws Exception;
202
203
/**
204
* Check if key exists (read-only for non-broadcast stream)
205
* @param key Key to check
206
* @return true if key exists
207
* @throws Exception
208
*/
209
boolean contains(K key) throws Exception;
210
211
/**
212
* Get all entries (read-only for non-broadcast stream)
213
* @return Iterable over all entries
214
* @throws Exception
215
*/
216
Iterable<Map.Entry<K, V>> entries() throws Exception;
217
218
/**
219
* Get all keys (read-only for non-broadcast stream)
220
* @return Iterable over all keys
221
* @throws Exception
222
*/
223
Iterable<K> keys() throws Exception;
224
225
/**
226
* Get all values (read-only for non-broadcast stream)
227
* @return Iterable over all values
228
* @throws Exception
229
*/
230
Iterable<V> values() throws Exception;
231
232
// Write operations available only in broadcast stream processing functions
233
234
/**
235
* Associate value with key (broadcast stream only)
236
* @param key Key
237
* @param value Value
238
* @throws Exception
239
*/
240
void put(K key, V value) throws Exception;
241
242
/**
243
* Add all key-value pairs (broadcast stream only)
244
* @param map Key-value pairs to add
245
* @throws Exception
246
*/
247
void putAll(Map<K, V> map) throws Exception;
248
249
/**
250
* Remove key-value pair (broadcast stream only)
251
* @param key Key to remove
252
* @throws Exception
253
*/
254
void remove(K key) throws Exception;
255
}
256
```
257
258
### State Descriptors
259
260
Descriptors for creating and configuring state variables.
261
262
```java { .api }
263
/**
264
* Base descriptor for state variables
265
* @param <S> State type
266
* @param <T> Value type
267
*/
268
abstract class StateDescriptor<S extends State, T> {
269
/**
270
* Get state name
271
* @return State name
272
*/
273
public String getName();
274
275
/**
276
* Get type information
277
* @return Type information
278
*/
279
public TypeInformation<T> getTypeInformation();
280
281
/**
282
* Set default value
283
* @param defaultValue Default value
284
*/
285
public void setDefaultValue(T defaultValue);
286
287
/**
288
* Get default value
289
* @return Default value
290
*/
291
public T getDefaultValue();
292
}
293
294
/**
295
* Descriptor for ValueState
296
* @param <T> Value type
297
*/
298
class ValueStateDescriptor<T> extends StateDescriptor<ValueState<T>, T> {
299
/**
300
* Create descriptor with name and type information
301
* @param name State name
302
* @param typeInfo Type information
303
*/
304
public ValueStateDescriptor(String name, TypeInformation<T> typeInfo);
305
306
/**
307
* Create descriptor with name and type class
308
* @param name State name
309
* @param typeClass Type class
310
*/
311
public ValueStateDescriptor(String name, Class<T> typeClass);
312
}
313
314
/**
315
* Descriptor for ListState
316
* @param <T> Element type
317
*/
318
class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, List<T>> {
319
/**
320
* Create descriptor with name and element type information
321
* @param name State name
322
* @param elementTypeInfo Element type information
323
*/
324
public ListStateDescriptor(String name, TypeInformation<T> elementTypeInfo);
325
326
/**
327
* Create descriptor with name and element type class
328
* @param name State name
329
* @param elementTypeClass Element type class
330
*/
331
public ListStateDescriptor(String name, Class<T> elementTypeClass);
332
}
333
334
/**
335
* Descriptor for MapState
336
* @param <UK> User key type
337
* @param <UV> User value type
338
*/
339
class MapStateDescriptor<UK, UV> extends StateDescriptor<MapState<UK, UV>, Map<UK, UV>> {
340
/**
341
* Create descriptor with name and type information
342
* @param name State name
343
* @param keyTypeInfo Key type information
344
* @param valueTypeInfo Value type information
345
*/
346
public MapStateDescriptor(String name, TypeInformation<UK> keyTypeInfo, TypeInformation<UV> valueTypeInfo);
347
348
/**
349
* Create descriptor with name and type classes
350
* @param name State name
351
* @param keyTypeClass Key type class
352
* @param valueTypeClass Value type class
353
*/
354
public MapStateDescriptor(String name, Class<UK> keyTypeClass, Class<UV> valueTypeClass);
355
}
356
357
/**
358
* Descriptor for AggregatingState
359
* @param <IN> Input type
360
* @param <ACC> Accumulator type
361
* @param <OUT> Output type
362
*/
363
class AggregatingStateDescriptor<IN, ACC, OUT> extends StateDescriptor<AggregatingState<IN, OUT>, ACC> {
364
/**
365
* Create descriptor with name, aggregate function, and accumulator type
366
* @param name State name
367
* @param aggFunction Aggregate function
368
* @param accTypeInfo Accumulator type information
369
*/
370
public AggregatingStateDescriptor(String name, AggregateFunction<IN, ACC, OUT> aggFunction, TypeInformation<ACC> accTypeInfo);
371
}
372
373
/**
374
* Descriptor for ReducingState
375
* @param <T> Element type
376
*/
377
class ReducingStateDescriptor<T> extends StateDescriptor<ReducingState<T>, T> {
378
/**
379
* Create descriptor with name, reduce function, and type information
380
* @param name State name
381
* @param reduceFunction Reduce function
382
* @param typeInfo Type information
383
*/
384
public ReducingStateDescriptor(String name, ReduceFunction<T> reduceFunction, TypeInformation<T> typeInfo);
385
}
386
```
387
388
### State v2 API (Asynchronous)
389
390
Next-generation state API supporting asynchronous operations for improved performance.
391
392
```java { .api }
393
/**
394
* Base interface for async state API
395
*/
396
interface org.apache.flink.api.common.state.v2.State {
397
/**
398
* Clear the state asynchronously
399
* @return Future representing completion
400
*/
401
StateFuture<Void> asyncClear();
402
}
403
404
/**
405
* Async single value state
406
* @param <T> Value type
407
*/
408
interface org.apache.flink.api.common.state.v2.ValueState<T> extends org.apache.flink.api.common.state.v2.State {
409
/**
410
* Get current value asynchronously
411
* @return Future with current value
412
*/
413
StateFuture<T> asyncValue();
414
415
/**
416
* Update state value asynchronously
417
* @param value New value
418
* @return Future representing completion
419
*/
420
StateFuture<Void> asyncUpdate(T value);
421
}
422
423
/**
424
* Async list state
425
* @param <T> Element type
426
*/
427
interface org.apache.flink.api.common.state.v2.ListState<T> extends org.apache.flink.api.common.state.v2.State {
428
/**
429
* Get all elements asynchronously
430
* @return Future with iterable over elements
431
*/
432
StateFuture<Iterable<T>> asyncGet();
433
434
/**
435
* Add element asynchronously
436
* @param value Element to add
437
* @return Future representing completion
438
*/
439
StateFuture<Void> asyncAdd(T value);
440
441
/**
442
* Update list asynchronously
443
* @param values New elements
444
* @return Future representing completion
445
*/
446
StateFuture<Void> asyncUpdate(List<T> values);
447
}
448
449
/**
450
* Async map state
451
* @param <UK> User key type
452
* @param <UV> User value type
453
*/
454
interface org.apache.flink.api.common.state.v2.MapState<UK, UV> extends org.apache.flink.api.common.state.v2.State {
455
/**
456
* Get value for key asynchronously
457
* @param key User key
458
* @return Future with value
459
*/
460
StateFuture<UV> asyncGet(UK key);
461
462
/**
463
* Put key-value pair asynchronously
464
* @param key User key
465
* @param value User value
466
* @return Future representing completion
467
*/
468
StateFuture<Void> asyncPut(UK key, UV value);
469
470
/**
471
* Remove key asynchronously
472
* @param key Key to remove
473
* @return Future representing completion
474
*/
475
StateFuture<Void> asyncRemove(UK key);
476
477
/**
478
* Check if key exists asynchronously
479
* @param key Key to check
480
* @return Future with boolean result
481
*/
482
StateFuture<Boolean> asyncContains(UK key);
483
484
/**
485
* Get all entries asynchronously
486
* @return Future with iterable over entries
487
*/
488
StateFuture<Iterable<Map.Entry<UK, UV>>> asyncEntries();
489
}
490
491
/**
492
* Future type for async state operations
493
* @param <T> Result type
494
*/
495
interface StateFuture<T> {
496
/**
497
* Apply function when future completes
498
* @param fn Function to apply
499
* @param <U> Function result type
500
* @return New future with function result
501
*/
502
<U> StateFuture<U> thenApply(Function<T, U> fn);
503
504
/**
505
* Compose with another async operation
506
* @param fn Function returning another future
507
* @param <U> Composed result type
508
* @return Future representing composed operation
509
*/
510
<U> StateFuture<U> thenCompose(Function<T, StateFuture<U>> fn);
511
512
/**
513
* Handle completion or exception
514
* @param fn Handler function
515
* @param <U> Handler result type
516
* @return Future with handler result
517
*/
518
<U> StateFuture<U> handle(BiFunction<T, Throwable, U> fn);
519
}
520
```
521
522
### Watermark Management
523
524
Watermark system for event time processing.
525
526
```java { .api }
527
/**
528
* Base interface for watermarks
529
*/
530
interface Watermark {
531
/**
532
* Get watermark timestamp
533
* @return Timestamp
534
*/
535
long getTimestamp();
536
537
/**
538
* Check if this is a special watermark (e.g., MAX_WATERMARK)
539
* @return true if special watermark
540
*/
541
boolean isSpecial();
542
}
543
544
/**
545
* Long-based watermark implementation
546
*/
547
class LongWatermark implements Watermark {
548
/**
549
* Create watermark with timestamp
550
* @param timestamp Watermark timestamp
551
*/
552
public LongWatermark(long timestamp);
553
554
@Override
555
public long getTimestamp();
556
557
@Override
558
public boolean isSpecial();
559
560
/** Maximum possible watermark value */
561
public static final LongWatermark MAX_WATERMARK;
562
}
563
564
/**
565
* Boolean-based watermark implementation
566
*/
567
class BoolWatermark implements Watermark {
568
/**
569
* Create boolean watermark
570
* @param value Boolean value
571
*/
572
public BoolWatermark(boolean value);
573
574
/**
575
* Get boolean value
576
* @return Boolean value
577
*/
578
public boolean getValue();
579
}
580
581
/**
582
* Interface for watermark management
583
*/
584
interface WatermarkManager {
585
/**
586
* Update watermarks
587
* @param watermarks New watermarks
588
*/
589
void updateWatermarks(Collection<Watermark> watermarks);
590
591
/**
592
* Get current combined watermark
593
* @return Current watermark
594
*/
595
Watermark getCurrentWatermark();
596
}
597
```