# Flink Queryable State Runtime

Apache Flink's queryable state runtime provides the server-side components that let external applications query the state of running Flink streaming jobs in real time. The library implements the infrastructure that exposes keyed state from Flink jobs to external clients without interrupting job execution.

## Package Information

- **Package Name**: flink-queryable-state-runtime_2.11
- **Package Type**: maven
- **Language**: Java
- **Group ID**: org.apache.flink
- **Artifact ID**: flink-queryable-state-runtime_2.11
- **Version**: 1.13.6
- **Installation**: Add it as a Maven dependency, or copy the JAR from `opt/` to `lib/` in the Flink installation

## Core Imports

```java
import org.apache.flink.queryablestate.server.KvStateServerImpl;
import org.apache.flink.queryablestate.server.KvStateServerHandler;
import org.apache.flink.queryablestate.client.proxy.KvStateClientProxyImpl;
import org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler;
import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
import org.apache.flink.queryablestate.network.AbstractServerHandler;
import org.apache.flink.queryablestate.network.messages.MessageSerializer;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.KvStateServer;
import org.apache.flink.runtime.query.KvStateClientProxy;
```

## Basic Usage

```java
import org.apache.flink.queryablestate.server.KvStateServerImpl;
import org.apache.flink.queryablestate.client.proxy.KvStateClientProxyImpl;
import org.apache.flink.queryablestate.network.stats.DisabledKvStateRequestStats;
import org.apache.flink.runtime.query.KvStateRegistry;
import java.util.Collections;

// Registry holding the queryable state instances. Inside a TaskManager the
// runtime supplies it; a fresh instance is created here only for illustration.
KvStateRegistry kvStateRegistry = new KvStateRegistry();

// Create and start a queryable state server
KvStateServerImpl stateServer = new KvStateServerImpl(
    "localhost",                            // bind address
    Collections.singleton(9069).iterator(), // port range
    1,                                      // event loop threads
    1,                                      // query threads
    kvStateRegistry,                        // state registry
    new DisabledKvStateRequestStats()       // stats collector
);

stateServer.start(); // declared to throw Throwable

// Create and start a client proxy
KvStateClientProxyImpl clientProxy = new KvStateClientProxyImpl(
    "localhost",                            // bind address
    Collections.singleton(9068).iterator(), // port range
    1,                                      // event loop threads
    1,                                      // query threads
    new DisabledKvStateRequestStats()       // stats collector
);

clientProxy.start(); // declared to throw Throwable
```

## Architecture

The queryable state runtime implements a two-tier client-server architecture:

1. **Client Proxy Layer** (`KvStateClientProxyImpl`) - Receives external client requests, handles state location lookups, and routes requests to the appropriate state servers
2. **State Server Layer** (`KvStateServerImpl`) - Handles internal requests from proxies, queries the actual state backends, and returns serialized state values

The message flow follows this pattern:

External Client → Client Proxy → State Server → State Backend → Response back through chain
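The flow above can be modeled in a few lines of plain Java. This is a toy sketch, not the Flink classes: `StateServer`, `ClientProxy`, and the hash-based `locate` method are hypothetical stand-ins. The real proxy resolves locations through a `KvStateLocationOracle` rather than by hashing.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Toy model of the proxy -> server -> backend flow; not the Flink classes. */
final class TwoTierFlowSketch {

    /** Stands in for a state server that owns a slice of the keyed state. */
    static final class StateServer {
        private final Map<String, String> backend = new ConcurrentHashMap<>();

        void put(String key, String value) {
            backend.put(key, value);
        }

        CompletableFuture<String> query(String key) {
            CompletableFuture<String> response = new CompletableFuture<>();
            String value = backend.get(key);
            if (value != null) {
                response.complete(value);
            } else {
                // Failures travel back through the future, not as a thrown exception.
                response.completeExceptionally(
                        new IllegalArgumentException("unknown key: " + key));
            }
            return response;
        }
    }

    /** Stands in for the client proxy: resolve the owning server, then forward. */
    static final class ClientProxy {
        private final StateServer[] servers;

        ClientProxy(StateServer... servers) {
            this.servers = servers;
        }

        // Hypothetical location lookup: hash the key onto a server index.
        StateServer locate(String key) {
            return servers[Math.floorMod(key.hashCode(), servers.length)];
        }

        CompletableFuture<String> handle(String key) {
            return locate(key).query(key);
        }
    }

    public static void main(String[] args) {
        ClientProxy proxy = new ClientProxy(new StateServer(), new StateServer());
        proxy.locate("user-42").put("user-42", "17");
        System.out.println(proxy.handle("user-42").join());
    }
}
```

The external client only ever talks to the proxy; which server holds the key stays an internal detail of the lookup step.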

## Capabilities

### State Server Implementation

The core server component. It answers the internal state requests that client proxies forward on behalf of external clients.

```java { .api }
public class KvStateServerImpl extends AbstractServerBase<KvStateInternalRequest, KvStateResponse>
        implements KvStateServer {

    public KvStateServerImpl(
        String bindAddress,
        Iterator<Integer> bindPortIterator,
        Integer numEventLoopThreads,
        Integer numQueryThreads,
        KvStateRegistry kvStateRegistry,
        KvStateRequestStats stats
    );

    public void start() throws Throwable;
    public InetSocketAddress getServerAddress();
    public void shutdown();
    public MessageSerializer<KvStateInternalRequest, KvStateResponse> getSerializer();
    public AbstractServerHandler<KvStateInternalRequest, KvStateResponse> initializeHandler();
}
```

**Usage Example:**
```java
// taskManagerKvStateRegistry and metricsCollector are assumed to be
// provided by the surrounding code.

// Initialize server with required dependencies
KvStateServerImpl server = new KvStateServerImpl(
    "0.0.0.0",                                  // Listen on all interfaces
    Arrays.asList(9069, 9070, 9071).iterator(), // Try ports in sequence
    4,                                          // Event loop threads for network I/O
    8,                                          // Query threads for state access
    taskManagerKvStateRegistry,                 // Registry containing state references
    metricsCollector                            // Statistics collector
);

// Start the server (start() is declared to throw Throwable)
server.start();
InetSocketAddress address = server.getServerAddress();
System.out.println("State server listening on " + address);

// Shutdown when done
server.shutdown();
```
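The "try ports in sequence" behaviour of a port iterator can be illustrated with plain sockets. The sketch below is an assumption-laden model using `java.net.ServerSocket`; the actual server binds through Netty, and `bindFirstAvailable` is a hypothetical helper name.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.util.Arrays;
import java.util.Iterator;

/** Illustrates "try each candidate port until one binds"; not Flink's Netty code. */
final class PortFallbackSketch {

    static ServerSocket bindFirstAvailable(String host, Iterator<Integer> ports)
            throws IOException {
        IOException last = null;
        while (ports.hasNext()) {
            int port = ports.next();
            ServerSocket socket = new ServerSocket();
            try {
                socket.bind(new InetSocketAddress(host, port));
                return socket; // the first port that binds wins
            } catch (IOException e) {
                socket.close();
                last = e; // remember the failure and try the next candidate
            }
        }
        throw new IOException("no candidate port could be bound", last);
    }

    public static void main(String[] args) throws IOException {
        Iterator<Integer> candidates = Arrays.asList(9069, 9070, 9071).iterator();
        try (ServerSocket socket = bindFirstAvailable("localhost", candidates)) {
            System.out.println("listening on " + socket.getLocalSocketAddress());
        }
    }
}
```

Passing several candidates lets multiple processes on one host share the same configuration without colliding on a single fixed port.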

### State Server Request Handler

Processes individual state requests and queries the state backend.

```java { .api }
public class KvStateServerHandler extends AbstractServerHandler<KvStateInternalRequest, KvStateResponse> {

    public KvStateServerHandler(
        KvStateServerImpl server,
        KvStateRegistry kvStateRegistry,
        MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer,
        KvStateRequestStats stats
    );

    public CompletableFuture<KvStateResponse> handleRequest(long requestId, KvStateInternalRequest request);
    public CompletableFuture<Void> shutdown();
}
```

### Client Proxy Implementation

The proxy server that receives external client requests and forwards them to the appropriate state servers.

```java { .api }
public class KvStateClientProxyImpl extends AbstractServerBase<KvStateRequest, KvStateResponse>
        implements KvStateClientProxy {

    public KvStateClientProxyImpl(
        String bindAddress,
        Iterator<Integer> bindPortIterator,
        Integer numEventLoopThreads,
        Integer numQueryThreads,
        KvStateRequestStats stats
    );

    public void start() throws Throwable;
    public InetSocketAddress getServerAddress();
    public void shutdown();
    public void updateKvStateLocationOracle(JobID jobId, KvStateLocationOracle kvStateLocationOracle);
    public KvStateLocationOracle getKvStateLocationOracle(JobID jobId);
    public AbstractServerHandler<KvStateRequest, KvStateResponse> initializeHandler();
}
```

**Usage Example:**
```java
// statsCollector and jobLocationOracle are assumed to be provided by the
// surrounding code.

// Initialize client proxy
KvStateClientProxyImpl proxy = new KvStateClientProxyImpl(
    "0.0.0.0",                            // Listen address
    Arrays.asList(9068, 9067).iterator(), // Port range
    2,                                    // Event loop threads
    4,                                    // Query executor threads
    statsCollector                        // Request statistics
);

// Start proxy server (start() is declared to throw Throwable)
proxy.start();

// Register location oracle for a job.
// A JobID is 16 bytes, so the hex string must have 32 characters.
JobID jobId = JobID.fromHexString("1234567890abcdef1234567890abcdef");
proxy.updateKvStateLocationOracle(jobId, jobLocationOracle);

// Remove oracle when job finishes
proxy.updateKvStateLocationOracle(jobId, null);

proxy.shutdown();
```
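The register-then-remove lifecycle in the example above behaves like a concurrent map in which a `null` value means "unregister". The following is a minimal model under that assumption; `OracleRegistrySketch` and its generic parameters are hypothetical and stand in for `JobID` and `KvStateLocationOracle`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Minimal model of a per-job oracle registry; J and O are placeholder types. */
final class OracleRegistrySketch<J, O> {

    private final Map<J, O> oracles = new ConcurrentHashMap<>();

    /** Registers the oracle for a job; a null oracle removes the registration. */
    void update(J jobId, O oracle) {
        if (oracle == null) {
            oracles.remove(jobId);
        } else {
            oracles.put(jobId, oracle);
        }
    }

    /** Returns the registered oracle, or null when the job is unknown. */
    O get(J jobId) {
        return oracles.get(jobId);
    }

    public static void main(String[] args) {
        OracleRegistrySketch<String, String> registry = new OracleRegistrySketch<>();
        registry.update("job-a", "oracle-a");
        System.out.println(registry.get("job-a"));
        registry.update("job-a", null); // the job finished
        System.out.println(registry.get("job-a"));
    }
}
```

Treating `null` as removal is needed here because `ConcurrentHashMap` rejects null values, so the two cases have to be handled separately.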

### Client Proxy Request Handler

Handles external client requests, performs state location lookups, and forwards requests to state servers.

```java { .api }
public class KvStateClientProxyHandler extends AbstractServerHandler<KvStateRequest, KvStateResponse> {

    public KvStateClientProxyHandler(
        KvStateClientProxyImpl proxy,
        int queryExecutorThreads,
        MessageSerializer<KvStateRequest, KvStateResponse> serializer,
        KvStateRequestStats stats
    );

    public CompletableFuture<KvStateResponse> handleRequest(long requestId, KvStateRequest request);
    public CompletableFuture<Void> shutdown();
}
```

### Internal Request Messages

Message format used for communication between the client proxy and the state server.

```java { .api }
public class KvStateInternalRequest extends MessageBody {

    public KvStateInternalRequest(KvStateID stateId, byte[] serializedKeyAndNamespace);

    public KvStateID getKvStateId();
    public byte[] getSerializedKeyAndNamespace();
    public byte[] serialize();

    // Contains the static nested class KvStateInternalRequestDeserializer
}
```

**Usage Example:**
```java
// Create internal request for forwarding to state server.
// serializeKeyAndNamespace(...) is a placeholder for your own helper, and
// key and namespace are assumed to be in scope.
KvStateID stateId = new KvStateID(123L, 456L);
byte[] keyAndNamespace = serializeKeyAndNamespace(key, namespace);

KvStateInternalRequest internalRequest = new KvStateInternalRequest(
    stateId,
    keyAndNamespace
);

// Request can be serialized for network transmission
byte[] serialized = internalRequest.serialize();
```

### Request Message Deserialization

Deserializer for internal request messages received over the network. It is a static nested class of `KvStateInternalRequest`.

```java { .api }
// Static nested class declared inside KvStateInternalRequest
public static class KvStateInternalRequestDeserializer
        implements MessageDeserializer<KvStateInternalRequest> {

    public KvStateInternalRequest deserializeMessage(ByteBuf buf);
}
```
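To make the serialize/deserialize pairing concrete, here is a round-trip sketch in plain Java. The byte layout (two longs for the state ID, a 4-byte length prefix, then the payload) is an assumption made for illustration and should be checked against the `KvStateInternalRequest` source. `RequestFramingSketch` is a hypothetical class that uses `java.nio.ByteBuffer` instead of Netty's `ByteBuf`.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Round-trip sketch of a length-prefixed request frame; layout is assumed. */
final class RequestFramingSketch {

    final long lowerPart;
    final long upperPart;
    final byte[] keyAndNamespace;

    RequestFramingSketch(long lowerPart, long upperPart, byte[] keyAndNamespace) {
        this.lowerPart = lowerPart;
        this.upperPart = upperPart;
        this.keyAndNamespace = keyAndNamespace;
    }

    /** Writes: state ID (2 longs), payload length (int), payload bytes. */
    byte[] serialize() {
        int size = 2 * Long.BYTES + Integer.BYTES + keyAndNamespace.length;
        return ByteBuffer.allocate(size)
                .putLong(lowerPart)
                .putLong(upperPart)
                .putInt(keyAndNamespace.length)
                .put(keyAndNamespace)
                .array();
    }

    /** Reads the fields back in the same order they were written. */
    static RequestFramingSketch deserialize(ByteBuffer buf) {
        long lower = buf.getLong();
        long upper = buf.getLong();
        int length = buf.getInt();
        if (length < 0 || length > buf.remaining()) {
            throw new IllegalArgumentException("invalid payload length: " + length);
        }
        byte[] payload = new byte[length];
        buf.get(payload);
        return new RequestFramingSketch(lower, upper, payload);
    }

    public static void main(String[] args) {
        byte[] wire = new RequestFramingSketch(123L, 456L, new byte[] {1, 2, 3}).serialize();
        RequestFramingSketch decoded = deserialize(ByteBuffer.wrap(wire));
        System.out.println(decoded.lowerPart + " " + decoded.upperPart + " "
                + Arrays.toString(decoded.keyAndNamespace));
    }
}
```

Validating the length prefix before allocating the payload array keeps a corrupt frame from triggering an oversized allocation.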

## Types

### Core Interfaces (from Flink Runtime)

```java { .api }
// Server interface implemented by KvStateServerImpl
interface KvStateServer {
    void start() throws Throwable;
    InetSocketAddress getServerAddress();
    void shutdown();
}

// Client proxy interface implemented by KvStateClientProxyImpl
interface KvStateClientProxy {
    void start() throws Throwable;
    InetSocketAddress getServerAddress();
    void shutdown();
    void updateKvStateLocationOracle(JobID jobId, KvStateLocationOracle oracle);
    KvStateLocationOracle getKvStateLocationOracle(JobID jobId);
}
```

### Dependency Types (from Flink Core/Runtime)

```java { .api }
// State registry containing references to queryable state instances
// (a concrete class, not an interface)
class KvStateRegistry {
    KvStateEntry<?, ?, ?> getKvState(KvStateID stateId);
}

// Statistics collector for monitoring request performance
interface KvStateRequestStats {
    void reportActiveConnection();
    void reportInactiveConnection();
    void reportRequest();
    void reportSuccessfulRequest(long durationTotalMillis);
    void reportFailedRequest();
}

// Unique identifier for state instances
class KvStateID {
    public KvStateID(long lowerPart, long upperPart);
    public long getLowerPart();
    public long getUpperPart();
}

// Provides location information for queryable state
interface KvStateLocationOracle {
    CompletableFuture<KvStateLocation> requestKvStateLocation(JobID jobId, String stateName);
}

// Job identifier
class JobID {
    public static JobID fromHexString(String hexString);
}
```

## Error Handling

The queryable state runtime can fail a request with several types of exceptions:

- **UnknownKvStateIdException** - When the requested state ID is not found in the registry
- **UnknownKeyOrNamespaceException** - When the requested key/namespace combination doesn't exist
- **UnknownKvStateKeyGroupLocationException** - When the key group location cannot be determined
- **UnknownLocationException** - When the state location oracle is unavailable
- **FlinkJobNotFoundException** - When the referenced job ID is not found

These exceptions are typically delivered by completing the returned `CompletableFuture` exceptionally, so client applications should handle them when they consume the future.
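One way to consume such a future is to unwrap the cause and decide which failures are expected. The sketch below uses only the JDK; `UnknownKeySketchException` is a hypothetical stand-in for an exception such as `UnknownKeyOrNamespaceException`, and `valueOrDefault` is an illustrative helper, not part of the library.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

/** Sketch of handling an exceptionally completed response future. */
final class FailureHandlingSketch {

    /** Hypothetical stand-in for a "key not found" style exception. */
    static final class UnknownKeySketchException extends RuntimeException {
        UnknownKeySketchException(String message) {
            super(message);
        }
    }

    /** Strips the wrappers that CompletableFuture adds around the real cause. */
    static Throwable unwrap(Throwable t) {
        while ((t instanceof CompletionException || t instanceof ExecutionException)
                && t.getCause() != null) {
            t = t.getCause();
        }
        return t;
    }

    /** Returns the value, a fallback for the expected failure, or rethrows. */
    static String valueOrDefault(CompletableFuture<String> response, String fallback) {
        return response.handle((value, failure) -> {
            if (failure == null) {
                return value;
            }
            Throwable cause = unwrap(failure);
            if (cause instanceof UnknownKeySketchException) {
                return fallback; // expected: the key has no state yet
            }
            throw new CompletionException(cause); // anything else propagates
        }).join();
    }

    public static void main(String[] args) {
        CompletableFuture<String> missing = new CompletableFuture<>();
        missing.completeExceptionally(new UnknownKeySketchException("no state for key"));
        System.out.println(valueOrDefault(missing, "0"));
    }
}
```

Unwrapping matters because a failure observed on a dependent stage arrives wrapped in `CompletionException`, while a failure observed on the original future does not.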

## Threading Model

Both the state server and client proxy use a multi-threaded architecture:

- **Event Loop Threads** - Handle network I/O operations (Netty event loops)
- **Query Threads** - Process actual state queries and location lookups
- **Async Processing** - All request handling returns a `CompletableFuture`, so callers are never blocked while a query runs

This design keeps network operations from blocking state access, and state access from blocking network operations.
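The hand-off between the two thread groups can be sketched with plain executors. This is an illustration under stated assumptions, not the Netty-based implementation: `pool` and `lookupThread` are hypothetical helpers, and the "query" only reports which thread executed it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch of handing work from an I/O thread to a separate query pool. */
final class ThreadingSketch {

    /** Creates a fixed pool of daemon threads named "<prefix>-<n>". */
    static ExecutorService pool(String prefix, int threads) {
        AtomicInteger counter = new AtomicInteger();
        return Executors.newFixedThreadPool(threads, runnable -> {
            Thread thread = new Thread(runnable, prefix + "-" + counter.incrementAndGet());
            thread.setDaemon(true);
            return thread;
        });
    }

    /** Decodes on the event loop, then runs the lookup on the query pool. */
    static CompletableFuture<String> lookupThread(
            ExecutorService eventLoop, ExecutorService queryPool, String key) {
        return CompletableFuture
                .supplyAsync(() -> key.trim(), eventLoop) // cheap network-side work
                .thenApplyAsync(k -> Thread.currentThread().getName(), queryPool);
    }

    public static void main(String[] args) {
        ExecutorService eventLoop = pool("event-loop", 1);
        ExecutorService queryPool = pool("query", 2);
        // Prints the name of the query-pool thread that ran the lookup.
        System.out.println(lookupThread(eventLoop, queryPool, " user-42 ").join());
        eventLoop.shutdown();
        queryPool.shutdown();
    }
}
```

Because the lookup stage is scheduled with `thenApplyAsync` on the query pool, a slow state access occupies a query thread while the event loop thread stays free for the next request.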

## Integration with Flink

This runtime library integrates with the broader Flink ecosystem:

- **State Backends** - Queries state from configured Flink state backends (RocksDB, HashMap, etc.)
- **Job Manager** - Coordinates with the JobManager for state location information
- **Task Managers** - Runs on TaskManager nodes to provide direct state access
- **Checkpointing** - Runs alongside Flink's checkpointing mechanism; queries read the live state of the running job rather than a checkpointed snapshot