# Exception Handling

This section covers exception handling for failures in state querying operations. The queryable state client provides specific exception types that help identify and handle different error conditions.

## Capabilities

### Key/Namespace Not Found

Thrown when no state exists for the given key and namespace combination.

```java { .api }
public class UnknownKeyOrNamespaceException extends BadRequestException {
    public UnknownKeyOrNamespaceException(String serverName);
}
```

**Cause**: The specified key does not exist in the queryable state, or the namespace is invalid.

**Usage Example:**
```java
client.getKvState(jobId, stateName, key, keyTypeInfo, descriptor)
    .whenComplete((state, throwable) -> {
        if (throwable instanceof UnknownKeyOrNamespaceException) {
            System.out.println("No state found for key: " + key);
            // Handle missing key case - maybe return default value
        }
    });
```

### State ID Not Found

Thrown when the requested state ID is not recognized by the server.

```java { .api }
public class UnknownKvStateIdException extends BadRequestException {
    public UnknownKvStateIdException(String serverName, KvStateID kvStateId);
}
```

**Cause**: The state ID resolved from the job and state name is not valid, or the state has been removed.

**Usage Example:**
```java
client.getKvState(jobId, stateName, key, keyTypeInfo, descriptor)
    .whenComplete((state, throwable) -> {
        if (throwable instanceof UnknownKvStateIdException) {
            System.err.println("State ID not recognized - state may have been removed");
            // Handle state removal case
        }
    });
```

### Location Resolution Failed

Thrown when the location of the requested state cannot be determined.

```java { .api }
public class UnknownLocationException extends FlinkException {
    public UnknownLocationException(String msg);
}
```

**Cause**: The JobManager cannot resolve the location of the key group containing the requested key.

**Usage Example:**
```java
client.getKvState(jobId, stateName, key, keyTypeInfo, descriptor)
    .whenComplete((state, throwable) -> {
        if (throwable instanceof UnknownLocationException) {
            System.err.println("Could not resolve state location: " + throwable.getMessage());
            // Handle location resolution failure - maybe retry
        }
    });
```

### Key Group Location Not Found

Thrown when no location information is available for the key group.

```java { .api }
public class UnknownKvStateKeyGroupLocationException extends BadRequestException {
    public UnknownKvStateKeyGroupLocationException(String serverName);
}
```

**Cause**: The system cannot determine which TaskManager holds the key group for the requested key.

**Usage Example:**
```java
client.getKvState(jobId, stateName, key, keyTypeInfo, descriptor)
    .whenComplete((state, throwable) -> {
        if (throwable instanceof UnknownKvStateKeyGroupLocationException) {
            System.err.println("Key group location unknown");
            // Handle key group location failure
        }
    });
```

### Base Exception Types

#### BadRequestException

Base class for request-related exceptions.

```java { .api }
public class BadRequestException extends Exception {
    public BadRequestException(String serverName, String message);
}
```

**Properties:**
- Contains the server name where the error occurred
- Provides a descriptive error message

#### Request Failure

Protocol-level error information for debugging network issues.

```java { .api }
public class RequestFailure {
    public RequestFailure(long requestId, Throwable cause);
    public long getRequestId();
    public Throwable getCause();
}
```

**Usage**: Internal to the network layer, typically wrapped in other exceptions.

## Exception Hierarchy

```
Exception
├── FlinkException
│   └── UnknownLocationException
└── BadRequestException
    ├── UnknownKeyOrNamespaceException
    ├── UnknownKvStateIdException
    └── UnknownKvStateKeyGroupLocationException
```
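
Because the exceptions above sit in two separate branches, a single `instanceof BadRequestException` check does not cover `UnknownLocationException`. One way to keep the decisions in one place is a lookup table from exception class to action. The sketch below is a minimal illustration using only the JDK; `FailurePolicy` and its `Action` enum are hypothetical names, not part of the queryable state API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper; not part of the queryable state API.
final class FailurePolicy {
    enum Action { RETRY, FALLBACK, FAIL }

    // Insertion order matters: register more specific exception classes first.
    private final Map<Class<? extends Throwable>, Action> rules = new LinkedHashMap<>();

    /** Registers an action for an exception type and all of its subclasses. */
    FailurePolicy on(Class<? extends Throwable> type, Action action) {
        rules.put(type, action);
        return this;
    }

    /** Returns the action of the first registered type that matches, or FAIL if none does. */
    Action actionFor(Throwable failure) {
        for (Map.Entry<Class<? extends Throwable>, Action> rule : rules.entrySet()) {
            if (rule.getKey().isInstance(failure)) {
                return rule.getValue();
            }
        }
        return Action.FAIL;
    }
}
```

With Flink on the classpath, the registrations might map `UnknownKeyOrNamespaceException.class` to `FALLBACK` and `UnknownLocationException.class`, `UnknownKvStateKeyGroupLocationException.class`, and `IOException.class` to `RETRY`, leaving everything else at the default `FAIL`.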

## Common Error Handling Patterns

### Comprehensive Error Handling

```java
// handleXxx(...) and processState(...) are application-specific placeholders.
client.getKvState(jobId, stateName, key, keyTypeInfo, descriptor)
    .whenComplete((state, throwable) -> {
        if (throwable != null) {
            // Failures from dependent stages may arrive wrapped in a
            // java.util.concurrent.CompletionException, so inspect the cause.
            Throwable cause =
                (throwable instanceof CompletionException && throwable.getCause() != null)
                    ? throwable.getCause()
                    : throwable;

            if (cause instanceof UnknownKeyOrNamespaceException) {
                // Key doesn't exist - handle gracefully
                handleMissingKey(key);
            } else if (cause instanceof UnknownKvStateIdException) {
                // State removed or invalid - may need to refresh job info
                handleInvalidState(stateName);
            } else if (cause instanceof UnknownLocationException ||
                       cause instanceof UnknownKvStateKeyGroupLocationException) {
                // Location resolution failed - consider retry
                handleLocationError(cause);
            } else if (cause instanceof IOException) {
                // Network or serialization error
                handleNetworkError(cause);
            } else {
                // Other runtime errors
                handleGeneralError(cause);
            }
        } else {
            // Success case
            processState(state);
        }
    });
```
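
Type checks such as `throwable instanceof UnknownKeyOrNamespaceException` only match when the callback sees the original exception. When a `CompletableFuture` is a dependent stage (created with `thenApply`, `thenCompose`, and similar), an upstream failure reaches callbacks wrapped in `java.util.concurrent.CompletionException`. Whether the future returned by `getKvState` behaves this way depends on the client implementation, so unwrapping before the checks is the safer choice. The sketch below uses only the JDK; `Failures`, `rootCause`, and `seenByDependentStage` are hypothetical names introduced for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

// Hypothetical helper; not part of the queryable state API.
final class Failures {
    private Failures() {}

    /** Strips CompletionException/ExecutionException wrappers to reach the original failure. */
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while ((current instanceof CompletionException || current instanceof ExecutionException)
                && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    /** Demonstrates the wrapping: returns the throwable a callback sees on a dependent stage. */
    static Throwable seenByDependentStage(Throwable original) {
        CompletableFuture<String> source = new CompletableFuture<>();
        CompletableFuture<Integer> dependent = source.thenApply(String::length);
        source.completeExceptionally(original);

        Throwable[] seen = new Throwable[1];
        // The stage is already complete, so the callback runs immediately on this thread.
        dependent.whenComplete((value, throwable) -> seen[0] = throwable);
        return seen[0];
    }
}
```

Calling `Failures.rootCause(throwable)` at the top of a `whenComplete` or `handle` callback lets the same `instanceof` chain work whether or not the failure was wrapped.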

### Retry Logic with Exponential Backoff

```java
// Needs: java.io.IOException, java.util.function.Function, and from java.util.concurrent:
// CompletableFuture, CompletionException, Executor, TimeUnit.
// CompletableFuture.delayedExecutor requires Java 9 or later.
public CompletableFuture<ValueState<String>> queryWithRetry(
        QueryableStateClient client,
        JobID jobId,
        String stateName,
        String key,
        int maxRetries) {

    return queryWithRetryInternal(client, jobId, stateName, key, maxRetries, 0);
}

private CompletableFuture<ValueState<String>> queryWithRetryInternal(
        QueryableStateClient client,
        JobID jobId,
        String stateName,
        String key,
        int maxRetries,
        int attempt) {

    return client.getKvState(
            jobId, stateName, key,
            TypeInformation.of(String.class),
            new ValueStateDescriptor<>("state", String.class))
        .<CompletableFuture<ValueState<String>>>handle((state, throwable) -> {
            if (throwable == null) {
                return CompletableFuture.completedFuture(state);
            }

            // Failures may arrive wrapped in a CompletionException
            Throwable cause =
                (throwable instanceof CompletionException && throwable.getCause() != null)
                    ? throwable.getCause()
                    : throwable;

            // Retry on location errors and network issues
            boolean retryable = cause instanceof UnknownLocationException
                || cause instanceof UnknownKvStateKeyGroupLocationException
                || cause instanceof IOException;

            if (retryable && attempt < maxRetries) {
                long delay = Math.min(1000L * (1L << attempt), 10_000L); // Max 10s delay
                Executor delayed = CompletableFuture.delayedExecutor(delay, TimeUnit.MILLISECONDS);

                // Start the next attempt after the delay, then flatten the nested future
                return CompletableFuture
                    .supplyAsync(() -> queryWithRetryInternal(
                        client, jobId, stateName, key, maxRetries, attempt + 1), delayed)
                    .thenCompose(Function.identity());
            }

            // Not retryable, or retries exhausted: propagate the failure
            CompletableFuture<ValueState<String>> failed = new CompletableFuture<>();
            failed.completeExceptionally(cause);
            return failed;
        })
        .thenCompose(Function.identity());
}
```
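
The delay in the retry example doubles with each attempt and is capped at 10 seconds. To make that schedule easier to see and test in isolation, the formula can be pulled into a small pure function. This is a sketch under the assumption of a 0-based attempt counter and positive `baseMillis` and `maxMillis`; the `Backoff` class and `delayMillis` method are hypothetical names.

```java
// Hypothetical helper mirroring the delay formula used in the retry example.
final class Backoff {
    private Backoff() {}

    /** Delay before retry number {@code attempt} (0-based): base * 2^attempt, capped at maxMillis. */
    static long delayMillis(int attempt, long baseMillis, long maxMillis) {
        if (attempt < 0) {
            throw new IllegalArgumentException("attempt must be >= 0");
        }
        // Very large attempt numbers would overflow the shift; the cap applies anyway.
        if (attempt >= 62) {
            return maxMillis;
        }
        long factor = 1L << attempt;
        // Compare by division so the multiplication below cannot overflow.
        if (baseMillis > maxMillis / factor) {
            return maxMillis;
        }
        return baseMillis * factor;
    }
}
```

With a 1000 ms base and a 10000 ms cap, attempts 0 through 4 yield 1000, 2000, 4000, 8000, and 10000 ms, and every later attempt stays at the cap.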

### Default Value Handling

```java
public CompletableFuture<String> getValueWithDefault(
        QueryableStateClient client,
        JobID jobId,
        String stateName,
        String key,
        String defaultValue) {

    return client.getKvState(
            jobId, stateName, key,
            TypeInformation.of(String.class),
            new ValueStateDescriptor<>("state", String.class))
        .handle((state, throwable) -> {
            // Failures may arrive wrapped in a CompletionException
            Throwable cause =
                (throwable instanceof CompletionException && throwable.getCause() != null)
                    ? throwable.getCause()
                    : throwable;

            if (cause instanceof UnknownKeyOrNamespaceException) {
                // Key doesn't exist, return default
                return defaultValue;
            } else if (cause != null) {
                // Other errors should be propagated
                throw new RuntimeException("Query failed", cause);
            }

            try {
                // Success - return actual value
                return state.value();
            } catch (IOException e) {
                // ValueState.value() declares IOException, which this lambda cannot throw directly
                throw new UncheckedIOException(e);
            }
        });
}
```

## Debugging Tips

1. **Check Job Status**: Ensure the Flink job is running and the state is configured as queryable.
2. **Verify State Names**: Confirm the queryable state name matches the name passed to `StateDescriptor.setQueryable()`.
3. **Network Connectivity**: Verify network connectivity between the client and the Flink cluster.
4. **Serialization Issues**: Ensure the required serializers are configured in `ExecutionConfig`.
5. **Key Group Distribution**: Check that the key maps to an existing key group in the job.

## Error Recovery Strategies

- **Retry Logic**: Implement exponential backoff for transient failures
- **Circuit Breaker**: Prevent cascading failures in high-load scenarios
- **Fallback Values**: Provide default values for missing keys
- **State Verification**: Periodically verify state availability and configuration
- **Monitoring**: Log and monitor exception patterns to identify systemic issues
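
The circuit breaker strategy from the list above can be sketched in a few lines. This is a minimal illustration and not a production implementation: `SimpleCircuitBreaker`, its threshold, and its cool-down are hypothetical, and the clock is injected so the behavior can be tested without sleeping.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch; names and thresholds are illustrative only.
final class SimpleCircuitBreaker {
    private final int failureThreshold;
    private final long openMillis;
    private final LongSupplier clock;

    private int consecutiveFailures;
    private long openedAt = -1; // -1 means the breaker is closed

    SimpleCircuitBreaker(int failureThreshold, long openMillis, LongSupplier clock) {
        this.failureThreshold = failureThreshold;
        this.openMillis = openMillis;
        this.clock = clock;
    }

    /** Returns true if a query may be attempted now. */
    synchronized boolean allowRequest() {
        if (openedAt < 0) {
            return true;
        }
        // While open, reject until the cool-down has elapsed; after that, allow trial requests.
        return clock.getAsLong() - openedAt >= openMillis;
    }

    /** A successful query closes the breaker and resets the failure count. */
    synchronized void recordSuccess() {
        consecutiveFailures = 0;
        openedAt = -1;
    }

    /** Opens (or re-opens) the breaker once the failure threshold is reached. */
    synchronized void recordFailure() {
        consecutiveFailures++;
        if (consecutiveFailures >= failureThreshold) {
            openedAt = clock.getAsLong();
        }
    }
}
```

A caller would check `allowRequest()` before issuing a query, and call `recordSuccess()` or `recordFailure()` from the future's completion callback. In production code the clock would typically be `System::currentTimeMillis`.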