# State Types and Operations

The changelog state backend provides changelog-enabled wrappers for every keyed state type in Apache Flink (Value, List, Map, Reducing, and Aggregating state) and for the key-grouped priority queues that hold timers. Each wrapper delegates to the underlying state and transparently logs every state mutation to the changelog. Reads are served by the delegated state and are not logged.

## Capabilities

### Value State Operations

Value state stores a single value per key and namespace. `update` and `clear` are logged to the changelog; `value()` only reads.

```java { .api }
/**
 * Changelog-enabled value state that logs all state changes.
 * Wraps Flink's InternalValueState with transparent logging.
 */
class ChangelogValueState<K, N, V> extends AbstractChangelogState<K, N, V, InternalValueState<K, N, V>>
        implements InternalValueState<K, N, V> {

    /**
     * Gets the current value for the current key and namespace.
     * Reads are not logged.
     *
     * @return The current value, or null if no value is set
     * @throws IOException if value retrieval fails
     */
    public V value() throws IOException;

    /**
     * Updates the value for the current key and namespace.
     * The change is automatically logged to the changelog.
     *
     * @param value The new value to set
     * @throws IOException if value update fails
     */
    public void update(V value) throws IOException;

    /**
     * Clears the value for the current key and namespace.
     * The clear operation is logged to the changelog.
     */
    public void clear();

    /**
     * Static factory method for creating changelog value state instances.
     * Used internally by the state backend factory system.
     *
     * @param valueState The underlying value state to wrap
     * @return Changelog-enabled value state instance
     */
    static <K, N, SV, S extends State, IS extends S> IS create(InternalKvState<K, N, SV> valueState);
}
```

**Usage Example:**

User code is unchanged: the operator works with the standard `ValueState` interface, and the backend supplies the changelog wrapper.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;

// Inside a rich function (e.g. in open()) applied to a keyed stream
ValueStateDescriptor<String> descriptor =
        new ValueStateDescriptor<>("my-value", String.class);
ValueState<String> valueState = getRuntimeContext().getState(descriptor);

// While processing an element for the current key:
String currentValue = valueState.value(); // read: not logged; null if unset
valueState.update("new value");           // logged to the changelog
valueState.clear();                       // logged to the changelog
```
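The delegation pattern behind `ChangelogValueState` can be illustrated with a small, framework-free sketch. It does not use Flink: `SimpleValueState`, `InMemoryValueState`, `LoggingValueState`, and the string log entries are hypothetical stand-ins. It shows the behavior described above. Reads pass straight through to the delegate, while `update` and `clear` are applied to the delegate and then recorded.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical minimal value-state contract (not Flink's ValueState). */
interface SimpleValueState<V> {
    V value();

    void update(V value);

    void clear();
}

/** Hypothetical in-memory state standing in for the delegated backend state. */
class InMemoryValueState<V> implements SimpleValueState<V> {
    private V current;

    @Override
    public V value() {
        return current;
    }

    @Override
    public void update(V value) {
        current = value;
    }

    @Override
    public void clear() {
        current = null;
    }
}

/** Decorator: forwards every call to the delegate and records only mutations. */
class LoggingValueState<V> implements SimpleValueState<V> {
    private final SimpleValueState<V> delegate;
    private final List<String> changelog; // hypothetical text-based log entries

    LoggingValueState(SimpleValueState<V> delegate, List<String> changelog) {
        this.delegate = delegate;
        this.changelog = changelog;
    }

    @Override
    public V value() {
        return delegate.value(); // read: nothing is logged
    }

    @Override
    public void update(V value) {
        delegate.update(value);        // apply to the delegate first...
        changelog.add("SET " + value); // ...then record the change
    }

    @Override
    public void clear() {
        delegate.clear();
        changelog.add("CLEAR");
    }
}

class LoggingValueStateDemo {
    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        SimpleValueState<String> state = new LoggingValueState<>(new InMemoryValueState<>(), log);
        state.value(); // read only, not logged
        state.update("new value");
        state.clear();
        System.out.println(log); // prints [SET new value, CLEAR]
    }
}
```

In this sketch the delegate is updated before the entry is appended; the real wrapper's ordering and entry format may differ.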

### List State Operations

List state holds a list of values per key and namespace. The mutations `add`, `addAll`, `update`, and `clear` are logged to the changelog; iterating over `get()` only reads.

```java { .api }
/**
 * Changelog-enabled list state that logs all list modifications.
 * Wraps Flink's InternalListState with transparent logging.
 */
class ChangelogListState<K, N, V> extends AbstractChangelogState<K, N, List<V>, InternalListState<K, N, V>>
        implements InternalListState<K, N, V> {

    /**
     * Gets the current list of values for the current key and namespace.
     *
     * @return Iterable over the current list values
     * @throws Exception if list retrieval fails
     */
    public Iterable<V> get() throws Exception;

    /**
     * Adds a value to the list for the current key and namespace.
     * The addition is logged to the changelog.
     *
     * @param value The value to add to the list
     * @throws Exception if value addition fails
     */
    public void add(V value) throws Exception;

    /**
     * Replaces the entire list with the provided values.
     * The update is logged to the changelog.
     *
     * @param values The new list of values
     * @throws Exception if list update fails
     */
    public void update(List<V> values) throws Exception;

    /**
     * Appends all values from the provided list to the current list.
     * The added values are logged to the changelog.
     *
     * @param values The values to add
     * @throws Exception if addition fails
     */
    public void addAll(List<V> values) throws Exception;

    /**
     * Clears all values from the list.
     * The clear operation is logged to the changelog.
     */
    public void clear();

    /**
     * Updates the list internally with the provided values.
     * This is an internal method used by the state backend.
     *
     * @param valueToStore The list of values to store internally
     * @throws Exception if internal update fails
     */
    public void updateInternal(List<V> valueToStore) throws Exception;

    /**
     * Gets the internal list representation.
     * This is an internal method used by the state backend.
     *
     * @return The internal list of values
     * @throws Exception if internal retrieval fails
     */
    public List<V> getInternal() throws Exception;

    /**
     * Merges the state of the current key for the given source namespaces
     * into the target namespace. Flink uses this when merging windows
     * (for example, session windows).
     *
     * @param target The target namespace to merge into
     * @param sources The source namespaces to merge from
     * @throws Exception if namespace merging fails
     */
    public void mergeNamespaces(N target, Collection<N> sources) throws Exception;

    /**
     * Static factory method for creating changelog list state instances.
     * Used internally by the state backend factory system.
     *
     * @param listState The underlying list state to wrap
     * @return Changelog-enabled list state instance
     */
    static <K, N, SV, S extends State, IS extends S> IS create(InternalKvState<K, N, SV> listState);
}
```

**Usage Example:**

```java
import java.util.Arrays;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;

// Inside a rich function applied to a keyed stream
ListStateDescriptor<String> descriptor =
        new ListStateDescriptor<>("my-list", String.class);
ListState<String> listState = getRuntimeContext().getListState(descriptor);

// Mutations are logged to the changelog; get() is a read
listState.add("item1");
listState.add("item2");
listState.update(Arrays.asList("new1", "new2", "new3")); // replaces item1 and item2

for (String item : listState.get()) {
    // Process each item (new1, new2, new3)
}
```
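Because every list mutation is logged in order, the list can in principle be rebuilt by replaying the log. The sketch below is a hypothetical, in-memory illustration of that idea, not Flink code: `ListChange`, `LoggingListState`, and `replay` are illustrative names, and the sketch's choice to log `addAll` and `update` as single entries carrying all of their values is an assumption.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical changelog record: an operation tag plus the values it carried. */
final class ListChange<V> {
    enum Op { ADD, ADD_ALL, UPDATE, CLEAR }

    final Op op;
    final List<V> values;

    ListChange(Op op, List<V> values) {
        this.op = op;
        this.values = new ArrayList<>(values); // defensive copy
    }
}

/** Hypothetical list-state wrapper: mutates an in-memory list and logs each mutation. */
class LoggingListState<V> {
    private final List<V> delegate = new ArrayList<>(); // stands in for the backend state
    private final List<ListChange<V>> changelog = new ArrayList<>();

    Iterable<V> get() {
        return Collections.unmodifiableList(delegate); // read: not logged
    }

    void add(V value) {
        delegate.add(value);
        changelog.add(new ListChange<>(ListChange.Op.ADD, Collections.singletonList(value)));
    }

    void addAll(List<V> values) {
        delegate.addAll(values);
        changelog.add(new ListChange<>(ListChange.Op.ADD_ALL, values));
    }

    void update(List<V> values) {
        delegate.clear();
        delegate.addAll(values);
        changelog.add(new ListChange<>(ListChange.Op.UPDATE, values));
    }

    void clear() {
        delegate.clear();
        changelog.add(new ListChange<>(ListChange.Op.CLEAR, Collections.<V>emptyList()));
    }

    List<ListChange<V>> changelog() {
        return Collections.unmodifiableList(changelog);
    }

    /** Rebuilds a list from scratch by replaying changelog entries in order. */
    static <T> List<T> replay(List<ListChange<T>> log) {
        List<T> rebuilt = new ArrayList<>();
        for (ListChange<T> change : log) {
            switch (change.op) {
                case ADD:
                case ADD_ALL:
                    rebuilt.addAll(change.values);
                    break;
                case UPDATE:
                    rebuilt.clear();
                    rebuilt.addAll(change.values);
                    break;
                case CLEAR:
                    rebuilt.clear();
                    break;
            }
        }
        return rebuilt;
    }
}

class LoggingListStateDemo {
    public static void main(String[] args) {
        LoggingListState<String> state = new LoggingListState<>();
        state.add("item1");
        state.add("item2");
        state.update(List.of("new1", "new2", "new3"));
        System.out.println(LoggingListState.replay(state.changelog())); // prints [new1, new2, new3]
    }
}
```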

### Map State Operations

Map state maintains key-value mappings per key, supporting put, get, remove, and iteration operations with changelog logging.

```java { .api }
/**
 * Changelog-enabled map state that logs all map modifications.
 * Wraps Flink's InternalMapState with transparent logging.
 */
class ChangelogMapState<K, N, UK, UV> extends AbstractChangelogState<K, N, Map<UK, UV>, InternalMapState<K, N, UK, UV>>
        implements InternalMapState<K, N, UK, UV> {

    /**
     * Gets the value for the specified map key.
     *
     * @param key The map key to look up
     * @return The value associated with the key, or null if not present
     * @throws Exception if value retrieval fails
     */
    public UV get(UK key) throws Exception;

    /**
     * Puts a key-value pair into the map.
     * The put operation is logged to the changelog.
     *
     * @param key The map key
     * @param value The value to associate with the key
     * @throws Exception if put operation fails
     */
    public void put(UK key, UV value) throws Exception;

    /**
     * Puts all entries from the provided map into the state map.
     * Each put operation is logged to the changelog.
     *
     * @param map The map containing entries to add
     * @throws Exception if put operations fail
     */
    public void putAll(Map<UK, UV> map) throws Exception;

    /**
     * Removes the entry for the specified key.
     * The remove operation is logged to the changelog.
     *
     * @param key The key to remove
     * @throws Exception if remove operation fails
     */
    public void remove(UK key) throws Exception;

    /**
     * Checks if the map contains the specified key.
     *
     * @param key The key to check
     * @return true if the key exists, false otherwise
     * @throws Exception if check operation fails
     */
    public boolean contains(UK key) throws Exception;

    /**
     * Gets all entries in the map.
     *
     * @return Iterable over all key-value entries
     * @throws Exception if entry retrieval fails
     */
    public Iterable<Map.Entry<UK, UV>> entries() throws Exception;

    /**
     * Gets all keys in the map.
     *
     * @return Iterable over all keys
     * @throws Exception if key retrieval fails
     */
    public Iterable<UK> keys() throws Exception;

    /**
     * Gets all values in the map.
     *
     * @return Iterable over all values
     * @throws Exception if value retrieval fails
     */
    public Iterable<UV> values() throws Exception;

    /**
     * Gets an iterator over all entries in the map.
     *
     * @return Iterator over key-value entries
     * @throws Exception if iterator creation fails
     */
    public Iterator<Map.Entry<UK, UV>> iterator() throws Exception;

    /**
     * Checks if the map is empty.
     *
     * @return true if the map contains no entries, false otherwise
     * @throws Exception if empty check fails
     */
    public boolean isEmpty() throws Exception;

    /**
     * Clears all entries from the map.
     * The clear operation is logged to the changelog.
     */
    public void clear();

    /**
     * Static factory method for creating changelog map state instances.
     * Used internally by the state backend factory system.
     *
     * @param mapState The underlying map state to wrap
     * @return Changelog-enabled map state instance
     */
    static <UK, UV, K, N, SV, S extends State, IS extends S> IS create(InternalKvState<K, N, SV> mapState);
}
```

**Usage Example:**

```java
import java.util.Map;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;

// Inside a rich function applied to a keyed stream
MapStateDescriptor<String, Long> descriptor =
        new MapStateDescriptor<>("my-map", String.class, Long.class);
MapState<String, Long> mapState = getRuntimeContext().getMapState(descriptor);

// put() and remove() are logged to the changelog; get() and entries() are reads
mapState.put("key1", 100L);
mapState.put("key2", 200L);

Long value = mapState.get("key1");
mapState.remove("key2");

for (Map.Entry<String, Long> entry : mapState.entries()) {
    // Process each entry
}
```
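A similar hypothetical sketch for map state shows mutation-only logging at the granularity of individual user keys. Following the API description of `putAll`, it expands a `putAll` into one `PUT` entry per key, while `get`, `contains`, `entries`, and `isEmpty` produce no entries. `LoggingMapState` and its text format are assumptions for illustration; a `LinkedHashMap` keeps iteration order deterministic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical map-state wrapper that writes one log entry per mutated user key. */
class LoggingMapState<UK, UV> {
    private final Map<UK, UV> delegate = new LinkedHashMap<>(); // stands in for backend state
    private final List<String> changelog = new ArrayList<>();   // hypothetical text entries

    // Reads: served by the delegate and never logged
    UV get(UK key) {
        return delegate.get(key);
    }

    boolean contains(UK key) {
        return delegate.containsKey(key);
    }

    Iterable<Map.Entry<UK, UV>> entries() {
        return Collections.unmodifiableMap(delegate).entrySet();
    }

    boolean isEmpty() {
        return delegate.isEmpty();
    }

    // Mutations: applied to the delegate, then logged
    void put(UK key, UV value) {
        delegate.put(key, value);
        changelog.add("PUT " + key + "=" + value);
    }

    void putAll(Map<UK, UV> map) {
        for (Map.Entry<UK, UV> e : map.entrySet()) {
            put(e.getKey(), e.getValue()); // one entry per user key
        }
    }

    void remove(UK key) {
        delegate.remove(key);
        changelog.add("REMOVE " + key);
    }

    void clear() {
        delegate.clear();
        changelog.add("CLEAR");
    }

    List<String> changelog() {
        return Collections.unmodifiableList(changelog);
    }
}

class LoggingMapStateDemo {
    public static void main(String[] args) {
        LoggingMapState<String, Long> state = new LoggingMapState<>();
        state.put("key1", 100L);
        state.put("key2", 200L);
        state.get("key1"); // read: not logged
        state.remove("key2");
        System.out.println(state.changelog()); // prints [PUT key1=100, PUT key2=200, REMOVE key2]
    }
}
```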

### Reducing State Operations

Reducing state maintains a single value that is updated using a reduce function, with changelog logging for all reductions.

```java { .api }
/**
 * Changelog-enabled reducing state that logs all reduce operations.
 * Wraps Flink's InternalReducingState with transparent logging.
 */
class ChangelogReducingState<K, N, V> extends AbstractChangelogState<K, N, V, InternalReducingState<K, N, V>>
        implements InternalReducingState<K, N, V> {

    /**
     * Gets the current reduced value.
     *
     * @return The current reduced value, or null if no value has been added
     * @throws Exception if value retrieval fails
     */
    public V get() throws Exception;

    /**
     * Adds a value to the reducing state, applying the reduce function.
     * The resulting state change is logged to the changelog.
     *
     * @param value The value to add/reduce
     * @throws Exception if add operation fails
     */
    public void add(V value) throws Exception;

    /**
     * Clears the reducing state.
     * The clear operation is logged to the changelog.
     */
    public void clear();

    /**
     * Static factory method for creating changelog reducing state instances.
     * Used internally by the state backend factory system.
     *
     * @param reducingState The underlying reducing state to wrap
     * @return Changelog-enabled reducing state instance
     */
    static <K, N, SV, S extends State, IS extends S> IS create(InternalKvState<K, N, SV> reducingState);
}
```

**Usage Example:**

```java
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;

// Inside a rich function applied to a keyed stream
ReducingStateDescriptor<Long> descriptor = new ReducingStateDescriptor<>(
        "sum-state",
        new ReduceFunction<Long>() {
            @Override
            public Long reduce(Long value1, Long value2) throws Exception {
                return value1 + value2;
            }
        },
        Long.class);
ReducingState<Long> reducingState = getRuntimeContext().getReducingState(descriptor);

// Each add() is logged to the changelog; get() is a read
reducingState.add(10L);
reducingState.add(20L);
Long sum = reducingState.get(); // 30L, assuming the state was empty for this key
```
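The API description says the resulting state change of each `add` is logged. The hypothetical sketch below takes that literally and logs the reduced value after every `add` rather than the raw input, which means replaying the log never needs the reduce function. `BinaryOperator` stands in for Flink's `ReduceFunction`, and `LoggingReducingState` is an illustrative name, not a Flink class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

/**
 * Hypothetical reducing-state wrapper. BinaryOperator stands in for Flink's
 * ReduceFunction; the log records the reduced result of each add.
 */
class LoggingReducingState<V> {
    private final BinaryOperator<V> reduceFunction;
    private final List<String> changelog = new ArrayList<>(); // hypothetical text entries
    private V current; // stands in for the delegated state; null means empty

    LoggingReducingState(BinaryOperator<V> reduceFunction) {
        this.reduceFunction = reduceFunction;
    }

    V get() {
        return current; // read: not logged
    }

    void add(V value) {
        // The first value is stored as-is; later values are folded in with the reduce function
        current = (current == null) ? value : reduceFunction.apply(current, value);
        changelog.add("SET " + current); // log the resulting state, not the raw input
    }

    void clear() {
        current = null;
        changelog.add("CLEAR");
    }

    List<String> changelog() {
        return changelog;
    }
}

class LoggingReducingStateDemo {
    public static void main(String[] args) {
        LoggingReducingState<Long> sum = new LoggingReducingState<>(Long::sum);
        sum.add(10L);
        sum.add(20L);
        System.out.println(sum.get());       // prints 30
        System.out.println(sum.changelog()); // prints [SET 10, SET 30]
    }
}
```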

### Aggregating State Operations

Aggregating state maintains an accumulator that is updated using aggregation functions, with changelog logging for all aggregations.

```java { .api }
/**
 * Changelog-enabled aggregating state that logs all aggregation operations.
 * Wraps Flink's InternalAggregatingState with transparent logging.
 */
class ChangelogAggregatingState<K, N, IN, ACC, OUT> extends AbstractChangelogState<K, N, ACC, InternalAggregatingState<K, N, IN, ACC, OUT>>
        implements InternalAggregatingState<K, N, IN, ACC, OUT> {

    /**
     * Gets the current aggregated result.
     *
     * @return The current aggregated output value
     * @throws Exception if value retrieval fails
     */
    public OUT get() throws Exception;

    /**
     * Adds an input value to the aggregating state.
     * The aggregation is performed and the state change is logged to the changelog.
     *
     * @param value The input value to aggregate
     * @throws Exception if add operation fails
     */
    public void add(IN value) throws Exception;

    /**
     * Clears the aggregating state.
     * The clear operation is logged to the changelog.
     */
    public void clear();

    /**
     * Static factory method for creating changelog aggregating state instances.
     * Used internally by the state backend factory system.
     *
     * @param aggregatingState The underlying aggregating state to wrap
     * @return Changelog-enabled aggregating state instance
     */
    static <T, K, N, SV, S extends State, IS extends S> IS create(InternalKvState<K, N, SV> aggregatingState);
}
```

**Usage Example:**

```java
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;

// Inside a rich function applied to a keyed stream
AggregatingStateDescriptor<Double, Tuple2<Double, Long>, Double> descriptor =
        new AggregatingStateDescriptor<>(
                "average-state",
                new AggregateFunction<Double, Tuple2<Double, Long>, Double>() {
                    @Override
                    public Tuple2<Double, Long> createAccumulator() {
                        return Tuple2.of(0.0, 0L);
                    }

                    @Override
                    public Tuple2<Double, Long> add(Double value, Tuple2<Double, Long> accumulator) {
                        return Tuple2.of(accumulator.f0 + value, accumulator.f1 + 1);
                    }

                    @Override
                    public Double getResult(Tuple2<Double, Long> accumulator) {
                        return accumulator.f1 == 0 ? 0.0 : accumulator.f0 / accumulator.f1;
                    }

                    @Override
                    public Tuple2<Double, Long> merge(Tuple2<Double, Long> a, Tuple2<Double, Long> b) {
                        return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
                    }
                },
                Types.TUPLE(Types.DOUBLE, Types.LONG));

AggregatingState<Double, Double> aggregatingState = getRuntimeContext().getAggregatingState(descriptor);

// Each add() is logged to the changelog; get() is a read
aggregatingState.add(10.0);
aggregatingState.add(20.0);
aggregatingState.add(30.0);
Double average = aggregatingState.get(); // 20.0, assuming no earlier input for this key
```
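Aggregating state differs from reducing state in that the stored state is the accumulator, while `get()` derives the output from it. The hypothetical sketch below logs the accumulator after each `add` and computes the average only on read, reusing the sum/count idea from the usage example. `SimpleAggregateFunction`, `SumCount`, `AverageFunction`, and `LoggingAggregatingState` are illustrative stand-ins, not Flink types, and `merge` is omitted for brevity.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for Flink's AggregateFunction (merge omitted for brevity). */
interface SimpleAggregateFunction<IN, ACC, OUT> {
    ACC createAccumulator();

    ACC add(IN value, ACC accumulator);

    OUT getResult(ACC accumulator);
}

/** Hypothetical accumulator for an average: running sum and count. */
final class SumCount {
    final double sum;
    final long count;

    SumCount(double sum, long count) {
        this.sum = sum;
        this.count = count;
    }

    @Override
    public String toString() {
        return "sum=" + sum + ",count=" + count;
    }
}

/** Average of Double inputs, using SumCount as the accumulator. */
class AverageFunction implements SimpleAggregateFunction<Double, SumCount, Double> {
    @Override
    public SumCount createAccumulator() {
        return new SumCount(0.0, 0L);
    }

    @Override
    public SumCount add(Double value, SumCount acc) {
        return new SumCount(acc.sum + value, acc.count + 1);
    }

    @Override
    public Double getResult(SumCount acc) {
        return acc.count == 0 ? 0.0 : acc.sum / acc.count;
    }
}

/** Hypothetical wrapper: stores and logs the accumulator; get() derives the output. */
class LoggingAggregatingState<IN, ACC, OUT> {
    private final SimpleAggregateFunction<IN, ACC, OUT> function;
    private final List<String> changelog = new ArrayList<>(); // hypothetical text entries
    private ACC accumulator; // stands in for the delegated state; null means empty

    LoggingAggregatingState(SimpleAggregateFunction<IN, ACC, OUT> function) {
        this.function = function;
    }

    OUT get() {
        // read: not logged; null when nothing has been added yet
        return accumulator == null ? null : function.getResult(accumulator);
    }

    void add(IN value) {
        ACC base = (accumulator == null) ? function.createAccumulator() : accumulator;
        accumulator = function.add(value, base);
        changelog.add("ACC " + accumulator); // the accumulator, not the output, is the state
    }

    void clear() {
        accumulator = null;
        changelog.add("CLEAR");
    }

    List<String> changelog() {
        return changelog;
    }
}

class LoggingAggregatingStateDemo {
    public static void main(String[] args) {
        LoggingAggregatingState<Double, SumCount, Double> avg =
                new LoggingAggregatingState<>(new AverageFunction());
        avg.add(10.0);
        avg.add(20.0);
        avg.add(30.0);
        System.out.println(avg.get()); // prints 20.0
    }
}
```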

### Priority Queue Operations

Key-grouped priority queues hold Flink's timers and other elements that must be processed in order. The changelog wrapper logs every modification (`add`, `addAll`, `poll`, `remove`); `peek`, `isEmpty`, `size`, and iteration only read.

```java { .api }
/**
 * Changelog-enabled priority queue that logs all queue operations.
 * Wraps Flink's KeyGroupedInternalPriorityQueue with transparent logging.
 */
class ChangelogKeyGroupedPriorityQueue<T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>>
        implements KeyGroupedInternalPriorityQueue<T> {

    /**
     * Adds an element to the priority queue.
     * The addition is logged to the changelog.
     *
     * @param element The element to add
     * @return true if the operation changed the head element, or if it is unclear
     *     whether the head changed; false only if the head element is unchanged
     */
    public boolean add(T element);

    /**
     * Removes and returns the element with the highest priority.
     * The removal is logged to the changelog.
     *
     * @return The highest priority element, or null if empty
     */
    public T poll();

    /**
     * Returns the element with the highest priority without removing it.
     *
     * @return The highest priority element, or null if empty
     */
    public T peek();

    /**
     * Removes the specified element from the queue.
     * The removal is logged to the changelog.
     *
     * @param element The element to remove
     * @return true if the operation changed the head element, or if it is unclear
     *     whether the head changed; false only if the head element is unchanged
     */
    public boolean remove(T element);

    /**
     * Checks if the queue is empty.
     *
     * @return true if the queue contains no elements
     */
    public boolean isEmpty();

    /**
     * Gets the number of elements in the queue.
     *
     * @return The number of elements
     */
    public int size();

    /**
     * Adds all elements from the provided collection to the queue.
     * The additions are logged to the changelog.
     *
     * @param toAdd Collection of elements to add (can be null)
     */
    public void addAll(@Nullable Collection<? extends T> toAdd);

    /**
     * Gets the subset of elements that belong to a specific key group.
     * Used for key group-based partitioning.
     *
     * @param keyGroupId The key group identifier
     * @return Set of elements belonging to the key group
     */
    public Set<T> getSubsetForKeyGroup(int keyGroupId);

    /**
     * Gets an iterator over all elements in the queue.
     *
     * @return CloseableIterator for traversing queue elements
     */
    public CloseableIterator<T> iterator();
}
```
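One detail of Flink's internal priority queue contract is easy to miss: `InternalPriorityQueue#add` and `#remove` return `true` when the operation changed the head element (or when it is unclear whether it did), not merely when the call succeeded. The sketch below illustrates that contract, together with mutation logging, on top of `java.util.PriorityQueue`. `LoggingPriorityQueue` and its log format are hypothetical, and the sketch assumes natural ordering with the smallest element (for example, the earliest timer timestamp) at the head.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

/**
 * Hypothetical logging priority queue over java.util.PriorityQueue (smallest element first).
 * add/remove report whether the head element changed.
 */
class LoggingPriorityQueue<T extends Comparable<T>> {
    private final PriorityQueue<T> delegate = new PriorityQueue<>();
    private final List<String> changelog = new ArrayList<>(); // hypothetical text entries

    boolean add(T element) {
        T headBefore = delegate.peek();
        delegate.add(element);
        changelog.add("ADD " + element);
        return headBefore != delegate.peek(); // reference check: did the head object change?
    }

    T poll() {
        T head = delegate.poll();
        if (head != null) {
            changelog.add("POLL " + head); // only log when something was removed
        }
        return head;
    }

    T peek() {
        return delegate.peek(); // read: not logged
    }

    boolean remove(T element) {
        T headBefore = delegate.peek();
        if (delegate.remove(element)) {
            changelog.add("REMOVE " + element);
        }
        return headBefore != delegate.peek();
    }

    int size() {
        return delegate.size();
    }

    boolean isEmpty() {
        return delegate.isEmpty();
    }

    List<String> changelog() {
        return Collections.unmodifiableList(changelog);
    }
}

class LoggingPriorityQueueDemo {
    public static void main(String[] args) {
        LoggingPriorityQueue<Long> timers = new LoggingPriorityQueue<>();
        System.out.println(timers.add(30L)); // prints true  (empty queue, new head)
        System.out.println(timers.add(10L)); // prints true  (10 becomes the head)
        System.out.println(timers.add(20L)); // prints false (head is still 10)
    }
}
```

Returning "head changed" rather than "added" lets a timer service decide cheaply whether it needs to reschedule its next wake-up.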

## Abstract State Base Class

All changelog state types extend a common abstract base class that provides shared functionality: access to the delegated state, its serializers, namespace handling, and serialized-value lookups.

```java { .api }
/**
 * Base class for all changelog state wrappers.
 * Provides common functionality for delegation and serialization.
 */
abstract class AbstractChangelogState<K, N, V, S extends InternalKvState<K, N, V>>
        implements InternalKvState<K, N, V> {

    /**
     * Gets the underlying delegated state instance.
     *
     * @return The wrapped state object
     */
    public S getDelegatedState();

    /**
     * Gets the key serializer.
     *
     * @return TypeSerializer for key type K
     */
    public TypeSerializer<K> getKeySerializer();

    /**
     * Gets the namespace serializer.
     *
     * @return TypeSerializer for namespace type N
     */
    public TypeSerializer<N> getNamespaceSerializer();

    /**
     * Gets the value serializer.
     *
     * @return TypeSerializer for value type V
     */
    public TypeSerializer<V> getValueSerializer();

    /**
     * Sets the current namespace for state operations.
     *
     * @param namespace The namespace to set as current
     */
    public void setCurrentNamespace(N namespace);

    /**
     * Gets the serialized value for the given serialized key and namespace.
     * Used for queryable state lookups.
     *
     * @param serializedKeyAndNamespace Serialized key and namespace
     * @param safeKeySerializer Safe key serializer
     * @param safeNamespaceSerializer Safe namespace serializer
     * @param safeValueSerializer Safe value serializer
     * @return Serialized value bytes, or null if no value is associated with the key and namespace
     * @throws Exception if serialization fails
     */
    public byte[] getSerializedValue(
            byte[] serializedKeyAndNamespace,
            TypeSerializer<K> safeKeySerializer,
            TypeSerializer<N> safeNamespaceSerializer,
            TypeSerializer<V> safeValueSerializer) throws Exception;
}
```
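The benefit of a shared base class is that each concrete wrapper only implements its type-specific mutation logic. The hypothetical sketch below shows that split with a per-namespace counter: the base class keeps the delegate, forwards `setCurrentNamespace`, and tags each log entry with the current namespace, while the subclass just mutates the delegate and calls `log`. `KvStateLike`, `InMemoryCounterState`, `AbstractLoggingState`, and `LoggingCounterState` are illustrative names, not Flink classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical minimal keyed-state contract (stands in for InternalKvState). */
interface KvStateLike<N> {
    void setCurrentNamespace(N namespace);
}

/** Hypothetical delegate: one counter per namespace (e.g. per window). */
class InMemoryCounterState<N> implements KvStateLike<N> {
    private final Map<N, Long> counts = new HashMap<>();
    private N namespace;

    @Override
    public void setCurrentNamespace(N namespace) {
        this.namespace = namespace;
    }

    long get() {
        return counts.getOrDefault(namespace, 0L);
    }

    void increment() {
        counts.merge(namespace, 1L, Long::sum);
    }
}

/** Hypothetical base class: owns the delegate and tags log entries with the namespace. */
abstract class AbstractLoggingState<N, S extends KvStateLike<N>> implements KvStateLike<N> {
    private final S delegate;
    private final List<String> changelog;
    private N currentNamespace;

    protected AbstractLoggingState(S delegate, List<String> changelog) {
        this.delegate = delegate;
        this.changelog = changelog;
    }

    S getDelegatedState() {
        return delegate;
    }

    @Override
    public void setCurrentNamespace(N namespace) {
        delegate.setCurrentNamespace(namespace); // keep the delegate in sync
        currentNamespace = namespace;            // remembered for subsequent log entries
    }

    /** Subclasses call this after each mutation of the delegate. */
    protected void log(String operation) {
        changelog.add(currentNamespace + ": " + operation);
    }
}

/** Concrete wrapper: only the type-specific mutation logic lives here. */
class LoggingCounterState<N> extends AbstractLoggingState<N, InMemoryCounterState<N>> {
    LoggingCounterState(InMemoryCounterState<N> delegate, List<String> changelog) {
        super(delegate, changelog);
    }

    long get() {
        return getDelegatedState().get(); // read: not logged
    }

    void increment() {
        getDelegatedState().increment();
        log("INCREMENT");
    }
}

class LoggingCounterStateDemo {
    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        LoggingCounterState<String> counter = new LoggingCounterState<>(new InMemoryCounterState<>(), log);
        counter.setCurrentNamespace("window-1");
        counter.increment();
        counter.setCurrentNamespace("window-2");
        counter.increment();
        System.out.println(log); // prints [window-1: INCREMENT, window-2: INCREMENT]
    }
}
```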

## State Factory System

The changelog state backend looks up a factory by the state descriptor's `StateDescriptor.Type` and uses it to wrap the state created by the underlying backend in the matching changelog wrapper.

```java { .api }
// Internal factory interface for creating changelog states.
// Its signature matches the one-argument create(...) methods documented above,
// so each of them can be registered via a method reference.
interface StateFactory {
    <K, N, SV, S extends State, IS extends S> IS create(InternalKvState<K, N, SV> kvState);
}

// Factory mapping for the different state types
// (a static field of the changelog keyed state backend)
private static final Map<StateDescriptor.Type, StateFactory> STATE_FACTORIES = Map.of(
        StateDescriptor.Type.VALUE, ChangelogValueState::create,
        StateDescriptor.Type.LIST, ChangelogListState::create,
        StateDescriptor.Type.REDUCING, ChangelogReducingState::create,
        StateDescriptor.Type.AGGREGATING, ChangelogAggregatingState::create,
        StateDescriptor.Type.MAP, ChangelogMapState::create
);
```
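The lookup-by-type pattern can be sketched without Flink using an `EnumMap`. In this hypothetical version, `StateKind` stands in for `StateDescriptor.Type`, `DescribedState` for the internal state interfaces, and `UnaryOperator` for `StateFactory`. Failing with an exception when no factory is registered is an assumption of the sketch, not documented behavior.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.function.UnaryOperator;

/** Hypothetical stand-in for StateDescriptor.Type. */
enum StateKind { VALUE, LIST, MAP, REDUCING, AGGREGATING }

/** Hypothetical minimal state: just enough to show which wrapper was applied. */
interface DescribedState {
    String describe();
}

/** Type-keyed registry of wrapper factories. */
final class StateFactories {
    private static final Map<StateKind, UnaryOperator<DescribedState>> FACTORIES =
            new EnumMap<>(StateKind.class);

    static {
        // Each factory wraps the original state in a type-specific decorator
        FACTORIES.put(StateKind.VALUE, s -> () -> "ChangelogValue(" + s.describe() + ")");
        FACTORIES.put(StateKind.LIST, s -> () -> "ChangelogList(" + s.describe() + ")");
        FACTORIES.put(StateKind.MAP, s -> () -> "ChangelogMap(" + s.describe() + ")");
        FACTORIES.put(StateKind.REDUCING, s -> () -> "ChangelogReducing(" + s.describe() + ")");
        FACTORIES.put(StateKind.AGGREGATING, s -> () -> "ChangelogAggregating(" + s.describe() + ")");
    }

    private StateFactories() {}

    static DescribedState wrap(StateKind kind, DescribedState original) {
        UnaryOperator<DescribedState> factory = FACTORIES.get(kind);
        if (factory == null) {
            // Fail loudly rather than silently returning an unlogged state
            throw new IllegalArgumentException("No changelog wrapper registered for " + kind);
        }
        return factory.apply(original);
    }
}

class StateFactoriesDemo {
    public static void main(String[] args) {
        DescribedState heapValue = () -> "heap-value";
        System.out.println(StateFactories.wrap(StateKind.VALUE, heapValue).describe()); // prints ChangelogValue(heap-value)
    }
}
```

Keeping all registrations in one map means supporting a new state type is a single entry, and a forgotten entry surfaces as an error instead of an unlogged state.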
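
## Changelog Integration

Every state mutation is transparently logged to the changelog; reads are served by the delegated state and are not logged.

- **Automatic Logging**: State mutations are logged without requiring any changes to user code
- **Operation Granularity**: Individual mutating operations (put, add, remove, clear, etc.) are logged as separate entries
- **Type Safety**: Changelog entries carry the type information (serializers) needed to deserialize them correctly
- **Performance**: Each mutation incurs the extra cost of serializing and appending a changelog entry; the wrappers aim to keep this per-operation overhead small
- **Consistency**: Changelog entries reflect exactly the mutations applied to the delegated state, so replaying them on top of the last materialized snapshot reconstructs that state during recovery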
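The consistency property can be illustrated with a small recovery sketch. It assumes, as a simplification, that recovery starts from a snapshot (materialized copy) of the delegated state and then replays only the changelog entries written after that snapshot. `MapChange`, `Snapshot`, and `ReplayableMapState` are hypothetical names, and a `null` value in a log entry marks a removal.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical typed changelog entry for a map state; value == null means "removed". */
final class MapChange {
    final String key;
    final Long value;

    MapChange(String key, Long value) {
        this.key = key;
        this.value = value;
    }
}

/** Hypothetical snapshot: a copy of the state plus the changelog position it covers. */
final class Snapshot {
    final Map<String, Long> state;
    final int changelogPosition;

    Snapshot(Map<String, Long> state, int changelogPosition) {
        this.state = new HashMap<>(state);
        this.changelogPosition = changelogPosition;
    }
}

/** Hypothetical map state with a changelog, a snapshot step, and replay-based recovery. */
class ReplayableMapState {
    private final Map<String, Long> live = new HashMap<>();
    private final List<MapChange> changelog = new ArrayList<>();

    void put(String key, Long value) {
        live.put(key, value);
        changelog.add(new MapChange(key, value));
    }

    void remove(String key) {
        live.remove(key);
        changelog.add(new MapChange(key, null));
    }

    Map<String, Long> live() {
        return live;
    }

    /** Copies the current state; entries written afterwards are not covered by the copy. */
    Snapshot materialize() {
        return new Snapshot(live, changelog.size());
    }

    /** Starts from the snapshot and replays only the entries written after it. */
    Map<String, Long> recover(Snapshot snapshot) {
        Map<String, Long> restored = new HashMap<>(snapshot.state);
        for (MapChange change : changelog.subList(snapshot.changelogPosition, changelog.size())) {
            if (change.value == null) {
                restored.remove(change.key);
            } else {
                restored.put(change.key, change.value);
            }
        }
        return restored;
    }
}

class ReplayDemo {
    public static void main(String[] args) {
        ReplayableMapState state = new ReplayableMapState();
        state.put("a", 1L);
        Snapshot snapshot = state.materialize();
        state.put("a", 10L);
        state.put("b", 2L);
        System.out.println(state.recover(snapshot).equals(state.live())); // prints true
    }
}
```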