# State Querying

The queryable state client provides asynchronous access to keyed state held by running Flink jobs. Every query returns an immutable, read-only state object.

## Capabilities

### Basic State Query

Query keyed state using `TypeInformation` for type safety.

```java { .api }
public <K, S extends State, V> CompletableFuture<S> getKvState(
    JobID jobId,
    String queryableStateName,
    K key,
    TypeInformation<K> keyTypeInfo,
    StateDescriptor<S, V> stateDescriptor
);
```

**Type Parameters:**
- `K` - Type of the key
- `S` - Type of the state (extends `State`)
- `V` - Type of the value stored in the state

**Parameters:**
- `jobId` - `JobID` of the job containing the queryable state
- `queryableStateName` - Name under which the state is queryable (set via `StateDescriptor.setQueryable()`)
- `key` - The key to query for
- `keyTypeInfo` - `TypeInformation` for the key type
- `stateDescriptor` - `StateDescriptor` matching the state configuration in the Flink job

**Returns:**
- `CompletableFuture<S>` - Future containing an immutable state object

**Usage Example:**
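```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.queryablestate.client.QueryableStateClient;

// Connect to the queryable state proxy; the constructor can throw UnknownHostException.
QueryableStateClient client = new QueryableStateClient("localhost", 9069);

// Placeholder job ID; a JobID is 16 bytes, i.e. 32 hex characters.
JobID jobId = JobID.fromHexString("a1b2c3d4e5f678901234567890abcdef");
String stateName = "userProfiles";
String userId = "user123";

// Must match the descriptor used by the job. UserProfile is an application class.
ValueStateDescriptor<UserProfile> descriptor =
    new ValueStateDescriptor<>("userProfiles", UserProfile.class);

CompletableFuture<ValueState<UserProfile>> future = client.getKvState(
    jobId,
    stateName,
    userId,
    TypeInformation.of(String.class),
    descriptor
);

future.thenAccept(state -> {
    try {
        // ValueState.value() declares IOException, so handle it inside the lambda.
        UserProfile profile = state.value();
        System.out.println("User profile: " + profile);
    } catch (IOException e) {
        System.err.println("Could not read state: " + e.getMessage());
    }
});
```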
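When a caller needs the result synchronously, the returned future can be awaited with a timeout instead of blocking indefinitely. The following is a minimal sketch under stated assumptions: it uses plain `CompletableFuture` instances as stand-ins for the result of `getKvState` so that it runs without a cluster, and `awaitOrDefault` is a hypothetical helper, not part of the client API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class QueryTimeoutSketch {

    /** Hypothetical helper: waits up to timeoutMillis and returns fallback on timeout or failure. */
    static <T> T awaitOrDefault(CompletableFuture<T> future, long timeoutMillis, T fallback) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // give up on the pending query
            return fallback;
        } catch (ExecutionException e) {
            return fallback; // the query itself failed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return fallback;
        }
    }

    public static void main(String[] args) {
        // Stand-ins for query results: one already answered, one that never completes.
        CompletableFuture<String> answered = CompletableFuture.completedFuture("profile-of-user123");
        CompletableFuture<String> pending = new CompletableFuture<>();

        System.out.println(awaitOrDefault(answered, 100, "n/a"));
        System.out.println(awaitOrDefault(pending, 100, "n/a"));
    }
}
```

Swallowing the failure in favour of a fallback is a simplification for the sketch; production code would usually log or rethrow the cause.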

### Type Hint-Based Query

Query keyed state using a `TypeHint`, which is convenient when the key type is generic.

```java { .api }
public <K, S extends State, V> CompletableFuture<S> getKvState(
    JobID jobId,
    String queryableStateName,
    K key,
    TypeHint<K> keyTypeHint,
    StateDescriptor<S, V> stateDescriptor
);
```

**Parameters:**
- Same as the basic query, except that `keyTypeHint` is a `TypeHint<K>` instead of a `TypeInformation<K>`

**Usage Example:**
```java
import org.apache.flink.api.common.typeinfo.TypeHint;

CompletableFuture<ValueState<String>> future = client.getKvState(
    jobId,
    stateName,
    "myKey",
    new TypeHint<String>() {}, // Type hint for String key
    new ValueStateDescriptor<>("state", String.class)
);
```
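The trailing `{}` in `new TypeHint<String>() {}` creates an anonymous subclass, and that is what lets a generic type argument survive erasure. The sketch below illustrates the general technique with a hypothetical `TypeCapture` class; it is not Flink's `TypeHint` implementation, only a stand-in that runs without Flink on the classpath.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Map;

/** Hypothetical stand-in for a type-hint class; not Flink's TypeHint. */
abstract class TypeCapture<T> {
    private final Type captured;

    protected TypeCapture() {
        // An anonymous subclass records its generic superclass, e.g. TypeCapture<String>.
        Type superType = getClass().getGenericSuperclass();
        if (!(superType instanceof ParameterizedType)) {
            throw new IllegalStateException("Create as an anonymous subclass with a type argument");
        }
        this.captured = ((ParameterizedType) superType).getActualTypeArguments()[0];
    }

    Type getCapturedType() {
        return captured;
    }
}

class TypeCaptureDemo {
    public static void main(String[] args) {
        // A simple key type and a generic key type, both recovered at runtime.
        Type simple = new TypeCapture<String>() {}.getCapturedType();
        Type nested = new TypeCapture<Map<String, Long>>() {}.getCapturedType();

        System.out.println(simple.getTypeName());
        System.out.println(nested.getTypeName());
    }
}
```

A plain `Class` object such as `Map.class` cannot carry the `<String, Long>` part, which is why a hint of this kind is useful for generic keys.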

### Supported State Types

The client supports querying the following keyed state types:

#### ValueState
```java
ValueStateDescriptor<String> descriptor =
    new ValueStateDescriptor<>("myValue", String.class);
CompletableFuture<ValueState<String>> future = client.getKvState(/*...*/);
```

#### ListState
```java
ListStateDescriptor<Integer> descriptor =
    new ListStateDescriptor<>("myList", Integer.class);
CompletableFuture<ListState<Integer>> future = client.getKvState(/*...*/);
```

#### MapState
```java
MapStateDescriptor<String, Long> descriptor =
    new MapStateDescriptor<>("myMap", String.class, Long.class);
CompletableFuture<MapState<String, Long>> future = client.getKvState(/*...*/);
```

#### ReducingState
```java
ReducingStateDescriptor<Integer> descriptor =
    new ReducingStateDescriptor<>("myReducing", Integer::sum, Integer.class);
CompletableFuture<ReducingState<Integer>> future = client.getKvState(/*...*/);
```

#### AggregatingState
```java
AggregatingStateDescriptor<Integer, Long, Double> descriptor =
    new AggregatingStateDescriptor<>("myAggregating", aggregateFunction, Long.class);
CompletableFuture<AggregatingState<Integer, Double>> future = client.getKvState(/*...*/);
```
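Whatever the state type, the object a query returns is immutable and read-only. The sketch below shows what that means for calling code, using a hypothetical `ReadOnlyMapSnapshot` class rather than any Flink type. It assumes that a query result is a point-in-time copy and that write attempts are rejected with `UnsupportedOperationException`; both are assumptions made for illustration.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical read-only snapshot of queried map state; not a Flink class. */
final class ReadOnlyMapSnapshot<K, V> {
    private final Map<K, V> entries;

    ReadOnlyMapSnapshot(Map<K, V> source) {
        // Copy first, so later changes to the source never show through the snapshot.
        this.entries = Collections.unmodifiableMap(new LinkedHashMap<>(source));
    }

    V get(K key) {
        return entries.get(key);
    }

    Iterable<Map.Entry<K, V>> entries() {
        return entries.entrySet();
    }

    void put(K key, V value) {
        // Assumption: writes through a query result are rejected rather than ignored.
        throw new UnsupportedOperationException("State is read-only");
    }
}

class ReadOnlyDemo {
    public static void main(String[] args) {
        Map<String, Long> live = new LinkedHashMap<>();
        live.put("clicks", 3L);
        ReadOnlyMapSnapshot<String, Long> snapshot = new ReadOnlyMapSnapshot<>(live);

        live.put("clicks", 4L); // the job keeps updating its own state
        System.out.println(snapshot.get("clicks")); // still the value at query time

        try {
            snapshot.put("clicks", 0L);
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The practical consequence is that a client cannot change job state through a query; to see newer values it has to query again.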

## Query Process

The state querying process involves several steps:

1. **Key Serialization**: The key and namespace are serialized using the provided type serializers
2. **Location Resolution**: The client proxy contacts the JobManager to resolve the location of the key group
3. **State Request**: A `KvStateRequest` is sent to the appropriate TaskManager
4. **State Retrieval**: The TaskManager retrieves the serialized state data
5. **Response Processing**: The serialized state is deserialized into an immutable state wrapper
6. **Future Completion**: The `CompletableFuture` is completed with the immutable state object

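The steps above can be sketched as a toy, single-process model. Everything in it is hypothetical: `QueryFlowSketch`, its helper methods, and the in-memory map standing in for a TaskManager are illustrations, not Flink classes, and location resolution is skipped because there is only one place the state can live.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Toy, single-process model of the query steps; every name here is hypothetical. */
class QueryFlowSketch {
    // Stand-in for state held on a TaskManager: serialized key -> serialized value.
    private final Map<ByteBuffer, byte[]> remoteState = new ConcurrentHashMap<>();

    static byte[] serializeKey(String key) {
        return key.getBytes(StandardCharsets.UTF_8);
    }

    static byte[] serializeValue(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    static long deserializeValue(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }

    /** Simulates the job writing state for a key. */
    void store(String key, long value) {
        remoteState.put(ByteBuffer.wrap(serializeKey(key)), serializeValue(value));
    }

    CompletableFuture<Long> query(String key) {
        // Step 1: serialize the key.
        byte[] serializedKey = serializeKey(key);
        // Step 2 (location resolution) is omitted: this toy has exactly one "TaskManager".
        return CompletableFuture
            // Steps 3-4: send the request and retrieve the serialized state.
            .supplyAsync(() -> remoteState.get(ByteBuffer.wrap(serializedKey)))
            // Step 5: deserialize the response; step 6: the returned future completes.
            .thenApply(bytes -> {
                if (bytes == null) {
                    throw new IllegalStateException("No state for key: " + key);
                }
                return deserializeValue(bytes);
            });
    }

    public static void main(String[] args) {
        QueryFlowSketch sketch = new QueryFlowSketch();
        sketch.store("user123", 42L);
        System.out.println(sketch.query("user123").join());
    }
}
```

The point of the model is the shape of the flow: only bytes cross the boundary, and the caller receives a future that completes after deserialization.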
## Error Handling

A state query can fail at any step of the process, in which case the returned future completes exceptionally. Common exceptions include:

- `UnknownKeyOrNamespaceException` - No state exists for the given key/namespace
- `UnknownKvStateIdException` - The state ID is not recognized
- `UnknownLocationException` - State location cannot be resolved
- `IOException` - Network or serialization errors
- `FlinkRuntimeException` - General runtime errors

**Error Handling Example:**
```java
future.whenComplete((state, throwable) -> {
    if (throwable != null) {
        if (throwable instanceof UnknownKeyOrNamespaceException) {
            System.out.println("No state found for key: " + key);
        } else if (throwable instanceof UnknownLocationException) {
            System.err.println("Could not resolve state location");
        } else {
            System.err.println("Query failed: " + throwable.getMessage());
        }
    } else {
        // Process successful result
        processState(state);
    }
});
```
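One caveat when checking exception types with `instanceof`: a `CompletableFuture` derived from another stage (for example via `thenApply`) reports the original failure wrapped in a `CompletionException`. Whether a given query future is such a derived stage is an assumption here, so unwrapping before the type check is a defensive measure. The sketch below demonstrates the general behaviour with plain JDK futures; `MissingKeyException` and `rootFailure` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

class FailureUnwrapSketch {

    /** Hypothetical stand-in for a query-specific exception. */
    static class MissingKeyException extends RuntimeException {
        MissingKeyException(String message) {
            super(message);
        }
    }

    /** Strips CompletionException/ExecutionException wrappers to reach the original failure. */
    static Throwable rootFailure(Throwable throwable) {
        Throwable current = throwable;
        while ((current instanceof CompletionException || current instanceof ExecutionException)
                && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        CompletableFuture<String> source = new CompletableFuture<>();
        // A dependent stage, similar to a client post-processing a raw response.
        CompletableFuture<Integer> dependent = source.thenApply(String::length);
        source.completeExceptionally(new MissingKeyException("user123"));

        dependent.whenComplete((value, failure) -> {
            System.out.println("received:  " + failure.getClass().getSimpleName());
            System.out.println("unwrapped: " + rootFailure(failure).getClass().getSimpleName());
        });
    }
}
```

Applying `rootFailure` first and then running the `instanceof` checks keeps the handler correct whether or not the failure arrives wrapped.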

## Performance Considerations

- **Location Caching**: Resolved state locations are cached to improve performance
- **Asynchronous Operations**: All queries are non-blocking and return a `CompletableFuture`
- **Serialization Overhead**: Every query serializes the key and deserializes the returned state, so large keys or state values add cost
- **Network Communication**: Each query involves network communication with the Flink cluster
- **Connection Reuse**: The client maintains persistent connections to minimize connection overhead

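To make the location-caching point concrete, here is a minimal sketch of a lookup cache. It is not the client's actual cache: `LocationCacheSketch`, the string-valued locations, and the `invalidate` method are assumptions chosen to show why repeated queries for the same state avoid a second resolution round trip.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical location cache; not the client's real implementation. */
class LocationCacheSketch {
    private final Map<String, String> cache = new ConcurrentHashMap<>();
    private final AtomicInteger lookups = new AtomicInteger();
    private final Function<String, String> resolver;

    LocationCacheSketch(Function<String, String> resolver) {
        this.resolver = resolver;
    }

    /** Returns the cached location, resolving it only on the first request. */
    String locate(String stateName) {
        return cache.computeIfAbsent(stateName, name -> {
            lookups.incrementAndGet(); // counts the expensive resolutions
            return resolver.apply(name);
        });
    }

    /** Drops a stale entry, for example after a failed request. */
    void invalidate(String stateName) {
        cache.remove(stateName);
    }

    int lookupCount() {
        return lookups.get();
    }

    public static void main(String[] args) {
        LocationCacheSketch locations = new LocationCacheSketch(name -> "taskmanager-1");
        locations.locate("userProfiles");
        locations.locate("userProfiles");
        System.out.println("resolutions: " + locations.lookupCount());
    }
}
```

A cache like this trades freshness for latency, so some invalidation path is needed when a cached location stops answering.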