0
# State Management
1
2
Apache Flink provides comprehensive state management capabilities for stateful stream processing applications. The state APIs enable functions to maintain state across events while providing fault tolerance through checkpointing and recovery mechanisms.
3
4
## State Types
5
6
### Value State
7
8
Store and update a single value per key.
9
10
```java { .api }
11
import org.apache.flink.api.common.state.ValueState;
12
import org.apache.flink.api.common.state.ValueStateDescriptor;
13
import org.apache.flink.api.common.functions.RichMapFunction;
14
import org.apache.flink.api.common.functions.OpenContext;
15
16
public class CountingMapFunction extends RichMapFunction<String, Tuple2<String, Integer>> {
17
private ValueState<Integer> countState;
18
19
@Override
20
public void open(OpenContext openContext) throws Exception {
21
// Create state descriptor
22
ValueStateDescriptor<Integer> descriptor =
23
new ValueStateDescriptor<>("count", Integer.class, 0);
24
25
// Get state handle
26
countState = getRuntimeContext().getState(descriptor);
27
}
28
29
@Override
30
public Tuple2<String, Integer> map(String value) throws Exception {
31
// Read current state
32
Integer currentCount = countState.value();
33
34
// Update state
35
currentCount++;
36
countState.update(currentCount);
37
38
return new Tuple2<>(value, currentCount);
39
}
40
}
41
```
42
43
### List State
44
45
Maintain a list of values per key.
46
47
```java { .api }
48
import org.apache.flink.api.common.state.ListState;
49
import org.apache.flink.api.common.state.ListStateDescriptor;
50
51
public class BufferingMapFunction extends RichMapFunction<String, List<String>> {
52
private ListState<String> bufferState;
53
54
@Override
55
public void open(OpenContext openContext) throws Exception {
56
ListStateDescriptor<String> descriptor =
57
new ListStateDescriptor<>("buffer", String.class);
58
bufferState = getRuntimeContext().getListState(descriptor);
59
}
60
61
@Override
62
public List<String> map(String value) throws Exception {
63
// Add to list state
64
bufferState.add(value);
65
66
// Read all values
67
List<String> allValues = new ArrayList<>();
68
for (String item : bufferState.get()) {
69
allValues.add(item);
70
}
71
72
// Clear state if buffer is full
73
if (allValues.size() > 100) {
74
bufferState.clear();
75
}
76
77
return allValues;
78
}
79
}
80
```
81
82
### Map State
83
84
Store key-value pairs as state.
85
86
```java { .api }
87
import org.apache.flink.api.common.state.MapState;
88
import org.apache.flink.api.common.state.MapStateDescriptor;
89
90
public class UserSessionFunction extends RichMapFunction<Event, SessionInfo> {
91
private MapState<String, Long> sessionStartTimes;
92
93
@Override
94
public void open(OpenContext openContext) throws Exception {
95
MapStateDescriptor<String, Long> descriptor =
96
new MapStateDescriptor<>("sessions", String.class, Long.class);
97
sessionStartTimes = getRuntimeContext().getMapState(descriptor);
98
}
99
100
@Override
101
public SessionInfo map(Event event) throws Exception {
102
String sessionId = event.getSessionId();
103
104
// Check if session exists
105
if (!sessionStartTimes.contains(sessionId)) {
106
// New session
107
sessionStartTimes.put(sessionId, event.getTimestamp());
108
}
109
110
long startTime = sessionStartTimes.get(sessionId);
111
long duration = event.getTimestamp() - startTime;
112
113
// Remove expired sessions
114
Iterator<Map.Entry<String, Long>> iterator = sessionStartTimes.iterator();
115
while (iterator.hasNext()) {
116
Map.Entry<String, Long> entry = iterator.next();
117
if (event.getTimestamp() - entry.getValue() > 3600000) { // 1 hour
118
iterator.remove();
119
}
120
}
121
122
return new SessionInfo(sessionId, startTime, duration);
123
}
124
}
125
```
126
127
### Reducing State
128
129
Aggregate values using a reduce function.
130
131
```java { .api }
132
import org.apache.flink.api.common.state.ReducingState;
133
import org.apache.flink.api.common.state.ReducingStateDescriptor;
134
import org.apache.flink.api.common.functions.ReduceFunction;
135
136
public class SumAccumulatorFunction extends RichMapFunction<Integer, Integer> {
137
private ReducingState<Integer> sumState;
138
139
@Override
140
public void open(OpenContext openContext) throws Exception {
141
ReducingStateDescriptor<Integer> descriptor =
142
new ReducingStateDescriptor<>(
143
"sum",
144
new ReduceFunction<Integer>() {
145
@Override
146
public Integer reduce(Integer value1, Integer value2) throws Exception {
147
return value1 + value2;
148
}
149
},
150
Integer.class
151
);
152
153
sumState = getRuntimeContext().getReducingState(descriptor);
154
}
155
156
@Override
157
public Integer map(Integer value) throws Exception {
158
// Add to reducing state
159
sumState.add(value);
160
161
// Get current sum
162
return sumState.get();
163
}
164
}
165
```
166
167
### Aggregating State
168
169
Use custom aggregate functions for complex aggregations.
170
171
```java { .api }
172
import org.apache.flink.api.common.state.AggregatingState;
173
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
174
import org.apache.flink.api.common.functions.AggregateFunction;
175
176
public class AverageAccumulatorFunction extends RichMapFunction<Double, Double> {
177
private AggregatingState<Double, Double> avgState;
178
179
// Average aggregate function
180
public static class AverageAggregate implements AggregateFunction<Double, Tuple2<Double, Long>, Double> {
181
@Override
182
public Tuple2<Double, Long> createAccumulator() {
183
return new Tuple2<>(0.0, 0L);
184
}
185
186
@Override
187
public Tuple2<Double, Long> add(Double value, Tuple2<Double, Long> accumulator) {
188
return new Tuple2<>(accumulator.f0 + value, accumulator.f1 + 1L);
189
}
190
191
@Override
192
public Double getResult(Tuple2<Double, Long> accumulator) {
193
return accumulator.f1 == 0 ? 0.0 : accumulator.f0 / accumulator.f1;
194
}
195
196
@Override
197
public Tuple2<Double, Long> merge(Tuple2<Double, Long> a, Tuple2<Double, Long> b) {
198
return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
199
}
200
}
201
202
@Override
203
public void open(OpenContext openContext) throws Exception {
204
AggregatingStateDescriptor<Double, Tuple2<Double, Long>, Double> descriptor =
205
new AggregatingStateDescriptor<>("average", new AverageAggregate(),
206
TypeInformation.of(new TypeHint<Tuple2<Double, Long>>(){}));
207
208
avgState = getRuntimeContext().getAggregatingState(descriptor);
209
}
210
211
@Override
212
public Double map(Double value) throws Exception {
213
avgState.add(value);
214
return avgState.get();
215
}
216
}
217
```
218
219
## State TTL (Time To Live)
220
221
Configure automatic state cleanup based on time.
222
223
```java { .api }
224
import org.apache.flink.api.common.state.StateTtlConfig;
225
import org.apache.flink.api.common.time.Time;
226
227
public class TTLEnabledFunction extends RichMapFunction<String, String> {
228
private ValueState<String> ttlState;
229
230
@Override
231
public void open(OpenContext openContext) throws Exception {
232
// Configure state TTL
233
StateTtlConfig ttlConfig = StateTtlConfig
234
.newBuilder(Time.hours(1)) // TTL of 1 hour
235
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
236
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
237
.cleanupFullSnapshot() // Cleanup on full snapshots
238
.cleanupIncrementally(10, true) // Incremental cleanup
239
.build();
240
241
ValueStateDescriptor<String> descriptor =
242
new ValueStateDescriptor<>("ttl-state", String.class);
243
descriptor.enableTimeToLive(ttlConfig);
244
245
ttlState = getRuntimeContext().getState(descriptor);
246
}
247
248
@Override
249
public String map(String value) throws Exception {
250
String currentValue = ttlState.value();
251
ttlState.update(value);
252
return currentValue != null ? currentValue : "first-value";
253
}
254
}
255
```
256
257
### Advanced TTL Configuration
258
259
```java { .api }
260
// Different cleanup strategies
261
StateTtlConfig incrementalCleanup = StateTtlConfig
262
.newBuilder(Time.minutes(30))
263
.cleanupIncrementally(5, true) // Clean 5 entries per access
264
.build();
265
266
StateTtlConfig fullSnapshotCleanup = StateTtlConfig
267
.newBuilder(Time.days(1))
268
.cleanupFullSnapshot() // Clean during full snapshots
269
.build();
270
271
StateTtlConfig rocksDBCleanup = StateTtlConfig
272
.newBuilder(Time.hours(2))
273
.cleanupInRocksdbCompactFilter(1000) // RocksDB compaction filter
274
.build();
275
276
// Combined cleanup strategies
277
StateTtlConfig combinedCleanup = StateTtlConfig
278
.newBuilder(Time.hours(6))
279
.cleanupIncrementally(10, true)
280
.cleanupFullSnapshot()
281
.cleanupInRocksdbCompactFilter(500)
282
.build();
283
```
284
285
## Operator State
286
287
Operator state is not partitioned by key; each parallel operator instance maintains its own state.
288
289
```java { .api }
290
import org.apache.flink.api.common.state.ListState;
291
import org.apache.flink.api.common.state.ListStateDescriptor;
292
import org.apache.flink.runtime.state.FunctionInitializationContext;
293
import org.apache.flink.runtime.state.FunctionSnapshotContext;
294
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
295
296
public class BufferingMapFunction extends RichMapFunction<String, String>
297
implements CheckpointedFunction {
298
299
private List<String> bufferedElements;
300
private ListState<String> checkpointedState;
301
302
@Override
303
public String map(String value) throws Exception {
304
bufferedElements.add(value);
305
306
if (bufferedElements.size() >= 10) {
307
String result = String.join(",", bufferedElements);
308
bufferedElements.clear();
309
return result;
310
}
311
312
return null; // Buffer not full yet
313
}
314
315
@Override
316
public void snapshotState(FunctionSnapshotContext context) throws Exception {
317
checkpointedState.clear();
318
for (String element : bufferedElements) {
319
checkpointedState.add(element);
320
}
321
}
322
323
@Override
324
public void initializeState(FunctionInitializationContext context) throws Exception {
325
ListStateDescriptor<String> descriptor =
326
new ListStateDescriptor<>("buffered-elements", String.class);
327
328
checkpointedState = context.getOperatorStateStore().getListState(descriptor);
329
330
if (context.isRestored()) {
331
// Restore state after failure
332
bufferedElements = new ArrayList<>();
333
for (String element : checkpointedState.get()) {
334
bufferedElements.add(element);
335
}
336
} else {
337
bufferedElements = new ArrayList<>();
338
}
339
}
340
}
341
```
342
343
### Union List State
344
345
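Union list state is operator list state that is redistributed on restore or rescaling by handing every subtask the complete list of state elements.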
346
347
```java { .api }
348
import org.apache.flink.api.common.state.ListState;
349
import org.apache.flink.runtime.state.FunctionInitializationContext;
350
351
public class RedistributingFunction extends RichMapFunction<Integer, Integer>
352
implements CheckpointedFunction {
353
354
private List<Integer> localBuffer;
355
private ListState<Integer> unionState;
356
357
@Override
358
public void initializeState(FunctionInitializationContext context) throws Exception {
359
ListStateDescriptor<Integer> descriptor =
360
new ListStateDescriptor<>("union-state", Integer.class);
361
362
// Union list state for redistribution during rescaling
363
unionState = context.getOperatorStateStore().getUnionListState(descriptor);
364
365
localBuffer = new ArrayList<>();
366
367
if (context.isRestored()) {
368
// All subtasks receive all state elements
369
for (Integer element : unionState.get()) {
370
localBuffer.add(element);
371
}
372
}
373
}
374
375
@Override
376
public Integer map(Integer value) throws Exception {
377
localBuffer.add(value);
378
379
// Process and return result
380
return localBuffer.size();
381
}
382
383
@Override
384
public void snapshotState(FunctionSnapshotContext context) throws Exception {
385
unionState.clear();
386
for (Integer element : localBuffer) {
387
unionState.add(element);
388
}
389
}
390
}
391
```
392
393
## Broadcast State
394
395
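Share state across all parallel instances. Broadcast state is writable only while processing broadcast elements and is read-only while processing regular elements.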
396
397
```java { .api }
398
import org.apache.flink.api.common.state.MapStateDescriptor;
399
import org.apache.flink.api.common.state.BroadcastState;
400
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
401
402
public class RuleBasedProcessFunction extends BroadcastProcessFunction<Event, Rule, Alert> {
403
404
// Broadcast state descriptor
405
private static final MapStateDescriptor<String, Rule> RULE_STATE_DESCRIPTOR =
406
new MapStateDescriptor<>("rules", String.class, Rule.class);
407
408
@Override
409
public void processElement(Event event, ReadOnlyContext ctx, Collector<Alert> out)
410
throws Exception {
411
412
// Read from broadcast state (read-only in processElement)
413
ReadOnlyBroadcastState<String, Rule> broadcastState =
414
ctx.getBroadcastState(RULE_STATE_DESCRIPTOR);
415
416
// Apply rules to event
417
for (Map.Entry<String, Rule> entry : broadcastState.immutableEntries()) {
418
Rule rule = entry.getValue();
419
if (rule.matches(event)) {
420
out.collect(new Alert(event, rule));
421
}
422
}
423
}
424
425
@Override
426
public void processBroadcastElement(Rule rule, Context ctx, Collector<Alert> out)
427
throws Exception {
428
429
// Update broadcast state (writable in processBroadcastElement)
430
BroadcastState<String, Rule> broadcastState =
431
ctx.getBroadcastState(RULE_STATE_DESCRIPTOR);
432
433
// Add or update rule
434
broadcastState.put(rule.getId(), rule);
435
}
436
}
437
```
438
439
## Queryable State
440
441
Make state queryable from external applications.
442
443
```java { .api }
444
import org.apache.flink.api.common.state.ValueState;
445
import org.apache.flink.api.common.state.ValueStateDescriptor;
446
import org.apache.flink.queryablestate.client.QueryableStateClient;
447
448
public class QueryableStateFunction extends RichMapFunction<Tuple2<String, Integer>, String> {
449
private ValueState<Integer> queryableState;
450
451
@Override
452
public void open(OpenContext openContext) throws Exception {
453
ValueStateDescriptor<Integer> descriptor =
454
new ValueStateDescriptor<>("queryable-count", Integer.class, 0);
455
456
// Make state queryable
457
descriptor.setQueryable("count-query");
458
459
queryableState = getRuntimeContext().getState(descriptor);
460
}
461
462
@Override
463
public String map(Tuple2<String, Integer> value) throws Exception {
464
Integer currentCount = queryableState.value();
465
currentCount += value.f1;
466
queryableState.update(currentCount);
467
468
return "Updated count for " + value.f0 + ": " + currentCount;
469
}
470
}
471
472
// Client code to query state
473
public class StateQueryClient {
474
public static void queryState() throws Exception {
475
QueryableStateClient client = new QueryableStateClient("localhost", 9069);
476
477
CompletableFuture<ValueState<Integer>> future =
478
client.getKvState(
479
JobID.generate(),
480
"count-query",
481
"key",
482
BasicTypeInfo.STRING_TYPE_INFO,
483
new ValueStateDescriptor<>("queryable-count", Integer.class)
484
);
485
486
ValueState<Integer> state = future.get();
487
Integer count = state.value();
488
System.out.println("Current count: " + count);
489
490
client.close();
491
}
492
}
493
```
494
495
## Checkpoint Listeners
496
497
React to checkpoint events.
498
499
```java { .api }
500
import org.apache.flink.api.common.state.CheckpointListener;
501
502
public class CheckpointAwareFunction extends RichMapFunction<String, String>
503
implements CheckpointListener {
504
505
private transient DatabaseConnection connection;
506
507
@Override
508
public void open(OpenContext openContext) throws Exception {
509
connection = new DatabaseConnection();
510
}
511
512
@Override
513
public String map(String value) throws Exception {
514
// Regular processing
515
return process(value);
516
}
517
518
@Override
519
public void notifyCheckpointComplete(long checkpointId) throws Exception {
520
// Checkpoint completed successfully
521
connection.commitTransaction();
522
System.out.println("Checkpoint " + checkpointId + " completed");
523
}
524
525
@Override
526
public void notifyCheckpointAborted(long checkpointId) throws Exception {
527
// Checkpoint was aborted
528
connection.rollbackTransaction();
529
System.out.println("Checkpoint " + checkpointId + " aborted");
530
}
531
}
532
```
533
534
## State Migration
535
536
Handle state evolution during application updates.
537
538
```java { .api }
539
// Version 1 of the state
540
/**
 * First version of the state type: a name and a counter.
 */
public class StateV1 {
    /** Name stored in the state. */
    public String name;

    /** Counter stored in the state. */
    public int count;

    // Constructor, getters, setters
}
546
547
// Version 2 of the state (evolved)
548
public class StateV2 {
549
public String name;
550
public int count;
551
public long timestamp; // New field
552
553
public StateV2(StateV1 oldState) {
554
this.name = oldState.name;
555
this.count = oldState.count;
556
this.timestamp = System.currentTimeMillis();
557
}
558
}
559
560
// State migration function
561
public class MigratableStateFunction extends RichMapFunction<String, String> {
562
private ValueState<StateV2> state;
563
564
@Override
565
public void open(OpenContext openContext) throws Exception {
566
ValueStateDescriptor<StateV2> descriptor =
567
new ValueStateDescriptor<>("evolved-state", StateV2.class);
568
569
state = getRuntimeContext().getState(descriptor);
570
}
571
572
@Override
573
public String map(String value) throws Exception {
574
StateV2 currentState = state.value();
575
576
if (currentState == null) {
577
currentState = new StateV2();
578
currentState.name = value;
579
currentState.count = 1;
580
currentState.timestamp = System.currentTimeMillis();
581
} else {
582
currentState.count++;
583
currentState.timestamp = System.currentTimeMillis();
584
}
585
586
state.update(currentState);
587
return currentState.toString();
588
}
589
}
590
```
591
592
## Best Practices
593
594
### State Design Patterns
595
596
```java { .api }
597
// Use appropriate state types for your use case
598
public class StatePatternExamples extends RichMapFunction<Event, Result> {
599
600
// Single values per key
601
private ValueState<String> lastValue;
602
603
// Collections that grow over time
604
private ListState<Event> eventHistory;
605
606
// Key-value mappings
607
private MapState<String, Counter> categoryCounters;
608
609
// Aggregations
610
private ReducingState<Long> totalSum;
611
612
@Override
613
public void open(OpenContext openContext) throws Exception {
614
// Configure TTL for all states
615
StateTtlConfig ttlConfig = StateTtlConfig
616
.newBuilder(Time.hours(24))
617
.cleanupIncrementally(5, true)
618
.build();
619
620
ValueStateDescriptor<String> lastValueDesc =
621
new ValueStateDescriptor<>("last-value", String.class);
622
lastValueDesc.enableTimeToLive(ttlConfig);
623
lastValue = getRuntimeContext().getState(lastValueDesc);
624
625
// Other state descriptors with TTL...
626
}
627
628
@Override
629
public Result map(Event event) throws Exception {
630
// Efficient state operations
631
String previous = lastValue.value();
632
lastValue.update(event.getValue());
633
634
return new Result(previous, event.getValue());
635
}
636
}
637
638
// Minimize state size
639
public class EfficientStateFunction extends RichMapFunction<Event, Summary> {
640
private ValueState<CompactSummary> compactState;
641
642
// Use compact data structures
643
public static class CompactSummary {
644
public long count;
645
public double sum;
646
public long lastTimestamp;
647
648
// Compact representation instead of storing all events
649
}
650
651
@Override
652
public void open(OpenContext openContext) throws Exception {
653
ValueStateDescriptor<CompactSummary> descriptor =
654
new ValueStateDescriptor<>("compact-summary", CompactSummary.class);
655
compactState = getRuntimeContext().getState(descriptor);
656
}
657
658
@Override
659
public Summary map(Event event) throws Exception {
660
CompactSummary summary = compactState.value();
661
if (summary == null) {
662
summary = new CompactSummary();
663
}
664
665
// Update compact state
666
summary.count++;
667
summary.sum += event.getValue();
668
summary.lastTimestamp = event.getTimestamp();
669
670
compactState.update(summary);
671
672
return new Summary(summary.count, summary.sum / summary.count);
673
}
674
}
675
```
676
677
Apache Flink's state management provides powerful capabilities for building stateful stream processing applications with fault tolerance guarantees. By understanding the different state types and following best practices, you can build efficient and reliable stateful applications.