0
# State Reading
1
2
State reading lets you extract and process state data from existing savepoints. The API can read the different kinds of Flink state: operator state (list, union, and broadcast) and keyed state.
3
4
## Operator State Reading
5
6
### List State
7
8
Read operator list state as individual elements.
9
10
```java { .api }
11
/**
 * Reads the list state {@code name} of the operator identified by {@code uid}.
 *
 * @param uid      UID of the operator whose state is read
 * @param name     name of the list state
 * @param typeInfo type information of the list elements
 * @return a data source that emits every list element as an individual record
 * @throws IOException if the state cannot be read from the savepoint
 */
public <T> DataSource<T> readListState(
12
String uid,
13
String name,
14
TypeInformation<T> typeInfo
15
) throws IOException;
16
17
/**
 * Variant of {@code readListState} that deserializes the list elements with an explicitly
 * supplied {@code serializer} instead of one derived from {@code typeInfo} alone.
 *
 * @param serializer serializer used for the list elements
 */
public <T> DataSource<T> readListState(
18
String uid,
19
String name,
20
TypeInformation<T> typeInfo,
21
TypeSerializer<T> serializer
22
) throws IOException;
23
```
24
25
**Usage Example:**
26
27
```java
28
// Read list state with type information
29
DataSource<String> messages = savepoint.readListState(
30
"kafka-source",
31
"buffered-messages",
32
Types.STRING
33
);
34
35
// Read with custom serializer
36
TypeSerializer<MyCustomType> customSerializer = new MyCustomTypeSerializer();
37
DataSource<MyCustomType> customData = savepoint.readListState(
38
"custom-operator",
39
"custom-state",
40
TypeInformation.of(MyCustomType.class),
41
customSerializer
42
);
43
44
// Process the data
45
messages.map(msg -> "Processed: " + msg).print();
46
```
47
48
### Union State
49
50
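Read operator union state. It is read exactly like list state (one record per element); the two differ only in how the state is redistributed on restore, where every parallel instance of a union-state operator receives the complete list instead of a share of it.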
51
52
```java { .api }
53
/**
 * Reads the union state {@code name} of the operator identified by {@code uid}.
 * The result has the same shape as {@code readListState}: one record per state element.
 *
 * @param uid      UID of the operator whose state is read
 * @param name     name of the union state
 * @param typeInfo type information of the state elements
 * @return a data source that emits every state element as an individual record
 * @throws IOException if the state cannot be read from the savepoint
 */
public <T> DataSource<T> readUnionState(
54
String uid,
55
String name,
56
TypeInformation<T> typeInfo
57
) throws IOException;
58
59
/**
 * Variant of {@code readUnionState} that deserializes the elements with an explicitly
 * supplied {@code serializer}.
 *
 * @param serializer serializer used for the state elements
 */
public <T> DataSource<T> readUnionState(
60
String uid,
61
String name,
62
TypeInformation<T> typeInfo,
63
TypeSerializer<T> serializer
64
) throws IOException;
65
```
66
67
**Usage Example:**
68
69
```java
70
// Read union state
71
DataSource<Configuration> configs = savepoint.readUnionState(
72
"config-broadcaster",
73
"broadcast-config",
74
TypeInformation.of(Configuration.class)
75
);
76
77
configs.map(config -> processConfiguration(config)).collect();
78
```
79
80
### Broadcast State
81
82
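Read broadcast state as key-value pairs. Each entry is returned as a `Tuple2` whose `f0` field holds the key and whose `f1` field holds the value.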
83
84
```java { .api }
85
/**
 * Reads the broadcast state {@code name} of the operator identified by {@code uid}.
 *
 * @param uid           UID of the operator whose state is read
 * @param name          name of the broadcast state
 * @param keyTypeInfo   type information of the map keys
 * @param valueTypeInfo type information of the map values
 * @return a data source that emits one {@code Tuple2} per entry ({@code f0} = key, {@code f1} = value)
 * @throws IOException if the state cannot be read from the savepoint
 */
public <K, V> DataSource<Tuple2<K, V>> readBroadcastState(
86
String uid,
87
String name,
88
TypeInformation<K> keyTypeInfo,
89
TypeInformation<V> valueTypeInfo
90
) throws IOException;
91
92
/**
 * Variant of {@code readBroadcastState} that deserializes keys and values with explicitly
 * supplied serializers.
 *
 * @param keySerializer   serializer used for the map keys
 * @param valueSerializer serializer used for the map values
 */
public <K, V> DataSource<Tuple2<K, V>> readBroadcastState(
93
String uid,
94
String name,
95
TypeInformation<K> keyTypeInfo,
96
TypeInformation<V> valueTypeInfo,
97
TypeSerializer<K> keySerializer,
98
TypeSerializer<V> valueSerializer
99
) throws IOException;
100
```
101
102
**Usage Example:**
103
104
```java
105
// Read broadcast state as key-value pairs
106
DataSource<Tuple2<String, Rule>> rules = savepoint.readBroadcastState(
107
"rule-processor",
108
"rules-broadcast-state",
109
Types.STRING,
110
TypeInformation.of(Rule.class)
111
);
112
113
// Process the rules
114
rules.map(tuple -> "Rule " + tuple.f0 + ": " + tuple.f1.getDescription())
115
.print();
116
```
117
118
## Keyed State Reading
119
120
Read keyed state with a `KeyedStateReaderFunction`: a user-defined function that is invoked once for each key and may emit any number of output records for it.
121
122
```java { .api }
123
/**
 * Reads the keyed state of the operator identified by {@code uid}, calling
 * {@code function} for each key.
 *
 * @param uid      UID of the operator whose keyed state is read
 * @param function reader function that turns each key's state into output records
 * @return a data source of everything the function emits
 * @throws IOException if the state cannot be read from the savepoint
 */
public <K, OUT> DataSource<OUT> readKeyedState(
124
String uid,
125
KeyedStateReaderFunction<K, OUT> function
126
) throws IOException;
127
128
/**
 * Variant of {@code readKeyedState} with explicit key and output type information.
 * NOTE(review): presumably the two-argument variant derives these types from the
 * function's generic parameters; use this overload when that is not possible — confirm.
 *
 * @param keyTypeInfo type information of the keys
 * @param outTypeInfo type information of the emitted records
 */
public <K, OUT> DataSource<OUT> readKeyedState(
129
String uid,
130
KeyedStateReaderFunction<K, OUT> function,
131
TypeInformation<K> keyTypeInfo,
132
TypeInformation<OUT> outTypeInfo
133
) throws IOException;
134
```
135
136
### KeyedStateReaderFunction Interface
137
138
```java { .api }
139
/**
 * User-defined function for reading keyed state from a savepoint.
 *
 * <p>{@code K} is the key type; {@code OUT} is the type of the records emitted through the
 * {@link Collector} passed to {@link #readKey}.
 */
public abstract class KeyedStateReaderFunction<K, OUT> extends AbstractRichFunction {
139
/**
 * Registers state descriptors and obtains the state handles via {@code getRuntimeContext()}
 * (as the examples below do). NOTE(review): presumably invoked before the first readKey call.
 */
public abstract void open(Configuration parameters) throws Exception;
140
141
/**
 * Reads the state of a single key.
 *
 * @param key the key whose state the handles registered in open() currently refer to
 * @param ctx access to the timers registered for this key
 * @param out collector for zero or more output records
 */
public abstract void readKey(
142
K key,
143
Context ctx,
144
Collector<OUT> out
145
) throws Exception;
146
147
/** Per-key context handed to {@link #readKey}. */
public interface Context {
148
/** Returns the registered event-time timers of the current key (presumably their timestamps). */
Set<Long> registeredEventTimeTimers() throws Exception;
149
/** Returns the registered processing-time timers of the current key (presumably their timestamps). */
Set<Long> registeredProcessingTimeTimers() throws Exception;
150
}
151
}
153
```
154
155
**Usage Example:**
156
157
```java
158
public class MyKeyedStateReader extends KeyedStateReaderFunction<String, UserStats> {
159
private ValueState<Long> countState;
160
private ValueState<Double> avgState;
161
162
@Override
163
public void open(Configuration parameters) throws Exception {
164
// Register state descriptors in open()
165
ValueStateDescriptor<Long> countDesc = new ValueStateDescriptor<>(
166
"count", Long.class
167
);
168
countState = getRuntimeContext().getState(countDesc);
169
170
ValueStateDescriptor<Double> avgDesc = new ValueStateDescriptor<>(
171
"average", Double.class
172
);
173
avgState = getRuntimeContext().getState(avgDesc);
174
}
175
176
@Override
177
public void readKey(String key, Context ctx, Collector<UserStats> out) throws Exception {
178
Long count = countState.value();
179
Double average = avgState.value();
180
181
if (count != null && average != null) {
182
UserStats stats = new UserStats(key, count, average);
183
out.collect(stats);
184
}
185
186
// Access timer information if needed
187
Set<Long> eventTimers = ctx.registeredEventTimeTimers();
188
Set<Long> processingTimers = ctx.registeredProcessingTimeTimers();
189
}
190
}
191
192
// Use the reader function
193
DataSource<UserStats> userStats = savepoint.readKeyedState(
194
"user-processor",
195
new MyKeyedStateReader()
196
);
197
198
userStats.print();
199
```
200
201
### Reading Different Keyed State Types
202
203
**Value State:**
204
205
```java
206
public class ValueStateReader extends KeyedStateReaderFunction<String, Tuple2<String, String>> {
207
private ValueState<String> valueState;
208
209
@Override
210
public void open(Configuration parameters) throws Exception {
211
ValueStateDescriptor<String> desc = new ValueStateDescriptor<>("value", String.class);
212
valueState = getRuntimeContext().getState(desc);
213
}
214
215
@Override
216
public void readKey(String key, Context ctx, Collector<Tuple2<String, String>> out) throws Exception {
217
String value = valueState.value();
218
if (value != null) {
219
out.collect(Tuple2.of(key, value));
220
}
221
}
222
}
223
```
224
225
**List State:**
226
227
```java
228
public class ListStateReader extends KeyedStateReaderFunction<String, Tuple2<String, List<String>>> {
229
private ListState<String> listState;
230
231
@Override
232
public void open(Configuration parameters) throws Exception {
233
ListStateDescriptor<String> desc = new ListStateDescriptor<>("list", String.class);
234
listState = getRuntimeContext().getListState(desc);
235
}
236
237
@Override
238
public void readKey(String key, Context ctx, Collector<Tuple2<String, List<String>>> out) throws Exception {
239
List<String> items = new ArrayList<>();
240
for (String item : listState.get()) {
241
items.add(item);
242
}
243
if (!items.isEmpty()) {
244
out.collect(Tuple2.of(key, items));
245
}
246
}
247
}
248
```
249
250
**Map State:**
251
252
```java
253
public class MapStateReader extends KeyedStateReaderFunction<String, Tuple3<String, String, Integer>> {
254
private MapState<String, Integer> mapState;
255
256
@Override
257
public void open(Configuration parameters) throws Exception {
258
MapStateDescriptor<String, Integer> desc = new MapStateDescriptor<>(
259
"map", String.class, Integer.class
260
);
261
mapState = getRuntimeContext().getMapState(desc);
262
}
263
264
@Override
265
public void readKey(String key, Context ctx, Collector<Tuple3<String, String, Integer>> out) throws Exception {
266
for (Map.Entry<String, Integer> entry : mapState.entries()) {
267
out.collect(Tuple3.of(key, entry.getKey(), entry.getValue()));
268
}
269
}
270
}
271
```
272
273
## Advanced Reading Patterns
274
275
### Filtering State Data
276
277
```java
278
public class FilteringStateReader extends KeyedStateReaderFunction<String, FilteredData> {
279
private ValueState<MyData> dataState;
280
private final Predicate<MyData> filter;
281
282
public FilteringStateReader(Predicate<MyData> filter) {
283
this.filter = filter;
284
}
285
286
@Override
287
public void open(Configuration parameters) throws Exception {
288
ValueStateDescriptor<MyData> desc = new ValueStateDescriptor<>("data", MyData.class);
289
dataState = getRuntimeContext().getState(desc);
290
}
291
292
@Override
293
public void readKey(String key, Context ctx, Collector<FilteredData> out) throws Exception {
294
MyData data = dataState.value();
295
if (data != null && filter.test(data)) {
296
out.collect(new FilteredData(key, data));
297
}
298
}
299
}
300
301
// Usage
302
DataSource<FilteredData> filtered = savepoint.readKeyedState(
303
"data-processor",
304
new FilteringStateReader(data -> data.getScore() > 0.8)
305
);
306
```
307
308
### Combining Multiple States per Key
309
310
```java
311
public class AggregatingStateReader extends KeyedStateReaderFunction<String, KeyAggregate> {
312
private ValueState<Double> valueState;
313
private ListState<String> tagState;
314
315
@Override
316
public void open(Configuration parameters) throws Exception {
317
valueState = getRuntimeContext().getState(
318
new ValueStateDescriptor<>("value", Double.class)
319
);
320
tagState = getRuntimeContext().getListState(
321
new ListStateDescriptor<>("tags", String.class)
322
);
323
}
324
325
@Override
326
public void readKey(String key, Context ctx, Collector<KeyAggregate> out) throws Exception {
327
Double value = valueState.value();
328
List<String> tags = new ArrayList<>();
329
tagState.get().forEach(tags::add);
330
331
if (value != null) {
332
KeyAggregate aggregate = new KeyAggregate(key, value, tags);
333
out.collect(aggregate);
334
}
335
}
336
}
337
```
338
339
## Error Handling
340
341
### Common Reading Errors
342
343
```java
344
try {
    DataSource<MyData> data = savepoint.readListState(
        "operator-uid",
        "state-name",
        TypeInformation.of(MyData.class)
    );

    // collect() executes the reading job; it declares "throws Exception", not just IOException.
    data.collect();

} catch (IOException e) {
    // getMessage() may be null, so normalize it before matching.
    // NOTE(review): message matching is a heuristic; the exact wording depends on the Flink
    // version (a missing uid is reported as "... does not contain state with operator uid ...").
    // The uid check must come before the generic "state" check, which would otherwise match it.
    String message = e.getMessage() == null ? "" : e.getMessage();
    if (message.contains("does not exist") || message.contains("does not contain state")) {
        System.err.println("Operator UID not found in savepoint");
    } else if (message.contains("state")) {
        System.err.println("State descriptor not found");
    } else {
        System.err.println("Failed to read state: " + e.getMessage());
    }
} catch (Exception e) {
    // Failures raised while the reading job runs (e.g. during deserialization).
    System.err.println("Failed to execute state reading job: " + e);
}
362
```
363
364
### State Descriptor Registration Errors
365
366
```java
367
public class SafeStateReader extends KeyedStateReaderFunction<String, MyOutput> {
368
private ValueState<String> state;
369
370
@Override
371
public void open(Configuration parameters) throws Exception {
372
try {
373
ValueStateDescriptor<String> desc = new ValueStateDescriptor<>("myState", String.class);
374
state = getRuntimeContext().getState(desc);
375
} catch (Exception e) {
376
throw new RuntimeException("Failed to register state descriptor", e);
377
}
378
}
379
380
@Override
381
public void readKey(String key, Context ctx, Collector<MyOutput> out) throws Exception {
382
try {
383
String value = state.value();
384
if (value != null) {
385
out.collect(new MyOutput(key, value));
386
}
387
} catch (Exception e) {
388
System.err.println("Failed to read state for key " + key + ": " + e.getMessage());
389
// Could collect error record or skip
390
}
391
}
392
}
393
```
394
395
## Performance Considerations
396
397
### Parallelism Settings
398
399
```java
400
// Set appropriate parallelism for reading operations
401
env.setParallelism(8);
402
403
DataSource<MyData> data = savepoint.readKeyedState("operator", readerFunction);
404
405
// Can override parallelism for specific operations
406
data.setParallelism(4).map(processData).print();
407
```
408
409
### Memory Management
410
411
```java
412
// For large state, consider processing in batches
413
public class BatchingStateReader extends KeyedStateReaderFunction<String, List<MyData>> {
414
private ListState<MyData> listState;
415
416
@Override
417
public void readKey(String key, Context ctx, Collector<List<MyData>> out) throws Exception {
418
List<MyData> batch = new ArrayList<>();
419
int batchSize = 0;
420
421
for (MyData item : listState.get()) {
422
batch.add(item);
423
batchSize++;
424
425
// Emit batch when it reaches size limit
426
if (batchSize >= 1000) {
427
out.collect(new ArrayList<>(batch));
428
batch.clear();
429
batchSize = 0;
430
}
431
}
432
433
// Emit remaining items
434
if (!batch.isEmpty()) {
435
out.collect(batch);
436
}
437
}
438
}
439
```