# Exception Handling

Specialized exception classes for RPC-specific error conditions, state management, and message handling in Flink's Pekko-based RPC system.

## Capabilities

### RpcInvalidStateException

Exception indicating that an RPC endpoint or service is in an invalid state for the requested operation.

```java { .api }
/**
12
* Exception indicating invalid RPC state.
13
* Thrown when RPC operations are attempted on endpoints or services
14
* that are not in the appropriate state (e.g., stopped, terminated, or not started).
15
*/
16
public class RpcInvalidStateException extends FlinkRuntimeException {
17
18
/**
19
* Constructor with descriptive message.
20
* @param message Description of the invalid state condition
21
*/
22
public RpcInvalidStateException(String message);
23
24
/**
25
* Constructor with underlying cause.
26
* @param cause The underlying exception that led to the invalid state
27
*/
28
public RpcInvalidStateException(Throwable cause);
29
30
/**
31
* Constructor with both message and cause.
32
* @param message Description of the invalid state condition
33
* @param cause The underlying exception that led to the invalid state
34
*/
35
public RpcInvalidStateException(String message, Throwable cause);
36
}
```

**Usage Examples:**

```java
import org.apache.flink.runtime.rpc.pekko.exceptions.RpcInvalidStateException;

public class RpcEndpointImpl extends RpcEndpoint {
45
private volatile boolean isStarted = false;
46
private volatile boolean isTerminated = false;
47
48
public void performOperation() throws RpcInvalidStateException {
49
if (!isStarted) {
50
throw new RpcInvalidStateException(
51
"Cannot perform operation: RPC endpoint has not been started"
52
);
53
}
54
55
if (isTerminated) {
56
throw new RpcInvalidStateException(
57
"Cannot perform operation: RPC endpoint has been terminated"
58
);
59
}
60
61
// Perform the operation
62
doActualWork();
63
}
64
65
public void connectToRemote(String address) throws RpcInvalidStateException {
66
try {
67
if (getActorSystem().whenTerminated().isCompleted()) {
68
throw new RpcInvalidStateException(
69
"Cannot connect to remote: Actor system has been terminated"
70
);
71
}
72
73
// Attempt connection
74
establishConnection(address);
75
76
} catch (Exception e) {
77
throw new RpcInvalidStateException(
78
"Failed to connect to remote endpoint due to invalid state",
79
e
80
);
81
}
82
}
83
84
// Error handling in service lifecycle
85
public void handleStateTransition() {
86
try {
87
transitionToNextState();
88
} catch (IllegalStateException e) {
89
// Convert to RPC-specific exception
90
throw new RpcInvalidStateException(
91
"Invalid state transition in RPC service",
92
e
93
);
94
}
95
}
96
}
```

### UnknownMessageException

Exception for unknown or unsupported message types received in RPC communication.

```java { .api }
/**
105
* Exception for unknown message types.
106
* Thrown when an RPC actor receives a message it cannot handle or recognize.
107
* Extends RpcRuntimeException to indicate RPC-specific runtime errors.
108
*/
109
public class UnknownMessageException extends RpcRuntimeException {
110
111
/**
112
* Constructor with descriptive message.
113
* @param message Description of the unknown message condition
114
*/
115
public UnknownMessageException(String message);
116
117
/**
118
* Constructor with message and underlying cause.
119
* @param message Description of the unknown message condition
120
* @param cause The underlying exception that occurred while processing
121
*/
122
public UnknownMessageException(String message, Throwable cause);
123
124
/**
125
* Constructor with underlying cause only.
126
* @param cause The underlying exception that occurred while processing
127
*/
128
public UnknownMessageException(Throwable cause);
129
}
```

**Usage Examples:**

```java
import org.apache.flink.runtime.rpc.pekko.exceptions.UnknownMessageException;
import org.apache.pekko.actor.AbstractActor;

public class RpcActorImpl extends AbstractActor {
139
140
@Override
141
public Receive createReceive() {
142
return receiveBuilder()
143
.match(String.class, this::handleStringMessage)
144
.match(Integer.class, this::handleIntegerMessage)
145
.match(StartMessage.class, this::handleStartMessage)
146
.match(StopMessage.class, this::handleStopMessage)
147
.matchAny(this::handleUnknownMessage)
148
.build();
149
}
150
151
private void handleStringMessage(String message) {
152
// Handle string messages
153
logger.debug("Received string message: {}", message);
154
}
155
156
private void handleIntegerMessage(Integer message) {
157
// Handle integer messages
158
logger.debug("Received integer message: {}", message);
159
}
160
161
private void handleStartMessage(StartMessage message) {
162
// Handle start control message
163
logger.info("Starting RPC actor");
164
}
165
166
private void handleStopMessage(StopMessage message) {
167
// Handle stop control message
168
logger.info("Stopping RPC actor");
169
}
170
171
private void handleUnknownMessage(Object message) {
172
String messageType = message != null ? message.getClass().getSimpleName() : "null";
173
String errorMsg = String.format(
174
"Received unknown message type '%s': %s",
175
messageType,
176
message
177
);
178
179
logger.warn(errorMsg);
180
181
// Send error response back to sender
182
getSender().tell(
183
new UnknownMessageException(errorMsg),
184
getSelf()
185
);
186
}
187
}
188
189
// Message handler with exception propagation
190
public class MessageProcessor {
191
192
public void processMessage(Object message) throws UnknownMessageException {
193
if (message instanceof CommandMessage) {
194
processCommand((CommandMessage) message);
195
} else if (message instanceof QueryMessage) {
196
processQuery((QueryMessage) message);
197
} else if (message instanceof EventMessage) {
198
processEvent((EventMessage) message);
199
} else {
200
throw new UnknownMessageException(
201
"Cannot process message of type: " + message.getClass().getName()
202
);
203
}
204
}
205
206
public void handleMessageWithRecovery(Object message) {
207
try {
208
processMessage(message);
209
} catch (UnknownMessageException e) {
210
logger.error("Failed to process unknown message", e);
211
212
// Attempt fallback processing
213
try {
214
processFallback(message);
215
} catch (Exception fallbackException) {
216
throw new UnknownMessageException(
217
"Message processing failed completely",
218
fallbackException
219
);
220
}
221
}
222
}
223
224
private void processCommand(CommandMessage cmd) { /* implementation */ }
225
private void processQuery(QueryMessage query) { /* implementation */ }
226
private void processEvent(EventMessage event) { /* implementation */ }
227
private void processFallback(Object message) { /* fallback implementation */ }
228
}
229
```

### Exception Handling Patterns

Common patterns for handling RPC exceptions in distributed environments: classifying failures as recoverable or terminal, and locating the root cause of a wrapped exception.

```java { .api }
/**
237
* Utility methods for common RPC exception handling patterns.
238
*/
239
public class RpcExceptionHandling {
240
241
/**
242
* Determines if an exception indicates a recoverable RPC error.
243
* @param exception Exception to analyze
244
* @return true if the error might be recoverable with retry
245
*/
246
public static boolean isRecoverableException(Throwable exception);
247
248
/**
249
* Determines if an exception indicates RPC endpoint termination.
250
* @param exception Exception to analyze
251
* @return true if the exception indicates endpoint termination
252
*/
253
public static boolean isEndpointTerminatedException(Throwable exception);
254
255
/**
256
* Extracts the root cause from a chain of RPC exceptions.
257
* @param exception Exception to analyze
258
* @return Root cause exception
259
*/
260
public static Throwable getRootCause(Throwable exception);
261
}
```

**Comprehensive Error Handling Examples:**

```java
import org.apache.flink.runtime.rpc.pekko.exceptions.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class RobustRpcClient {
272
private static final int MAX_RETRIES = 3;
273
private static final long RETRY_DELAY_MS = 1000;
274
275
public <T> CompletableFuture<T> callWithRetry(
276
String endpoint,
277
String methodName,
278
Object... args) {
279
280
return callWithRetryInternal(endpoint, methodName, 0, args);
281
}
282
283
private <T> CompletableFuture<T> callWithRetryInternal(
284
String endpoint,
285
String methodName,
286
int attempt,
287
Object... args) {
288
289
return makeRpcCall(endpoint, methodName, args)
290
.handle((result, throwable) -> {
291
if (throwable == null) {
292
return CompletableFuture.completedFuture(result);
293
}
294
295
// Unwrap CompletionException
296
Throwable actualException = throwable instanceof CompletionException
297
? throwable.getCause() : throwable;
298
299
if (shouldRetry(actualException, attempt)) {
300
logger.warn("RPC call failed (attempt {}), retrying: {}",
301
attempt + 1, actualException.getMessage());
302
303
return CompletableFuture
304
.delayedExecutor(RETRY_DELAY_MS, TimeUnit.MILLISECONDS)
305
.execute(() -> callWithRetryInternal(endpoint, methodName, attempt + 1, args));
306
} else {
307
logger.error("RPC call failed permanently after {} attempts", attempt + 1, actualException);
308
return CompletableFuture.<T>failedFuture(actualException);
309
}
310
})
311
.thenCompose(future -> future);
312
}
313
314
private boolean shouldRetry(Throwable exception, int attempt) {
315
if (attempt >= MAX_RETRIES) {
316
return false;
317
}
318
319
// Don't retry on invalid state - usually permanent
320
if (exception instanceof RpcInvalidStateException) {
321
return false;
322
}
323
324
// Don't retry on unknown message - usually a programming error
325
if (exception instanceof UnknownMessageException) {
326
return false;
327
}
328
329
// Retry on network issues, timeouts, etc.
330
return isRetryableException(exception);
331
}
332
333
private boolean isRetryableException(Throwable exception) {
334
return exception instanceof java.net.ConnectException ||
335
exception instanceof java.util.concurrent.TimeoutException ||
336
exception instanceof org.apache.pekko.actor.ActorNotFound ||
337
(exception.getMessage() != null &&
338
exception.getMessage().contains("connection refused"));
339
}
340
341
// Circuit breaker pattern for RPC calls
342
public class RpcCircuitBreaker {
343
private volatile State state = State.CLOSED;
344
private volatile int failureCount = 0;
345
private volatile long lastFailureTime = 0;
346
347
private static final int FAILURE_THRESHOLD = 5;
348
private static final long TIMEOUT_MS = 60000; // 1 minute
349
350
enum State { CLOSED, OPEN, HALF_OPEN }
351
352
public <T> CompletableFuture<T> execute(CompletableFuture<T> operation) {
353
if (state == State.OPEN) {
354
if (System.currentTimeMillis() - lastFailureTime > TIMEOUT_MS) {
355
state = State.HALF_OPEN;
356
} else {
357
return CompletableFuture.failedFuture(
358
new RpcInvalidStateException("Circuit breaker is OPEN")
359
);
360
}
361
}
362
363
return operation
364
.whenComplete((result, throwable) -> {
365
if (throwable == null) {
366
onSuccess();
367
} else {
368
onFailure();
369
}
370
});
371
}
372
373
private void onSuccess() {
374
failureCount = 0;
375
state = State.CLOSED;
376
}
377
378
private void onFailure() {
379
failureCount++;
380
lastFailureTime = System.currentTimeMillis();
381
382
if (failureCount >= FAILURE_THRESHOLD) {
383
state = State.OPEN;
384
}
385
}
386
}
387
}
```

**Error Recovery Strategies:**

```java
public class RpcErrorRecovery {
394
395
// Graceful degradation on RPC failures
396
public String getJobStatus(String jobId) {
397
try {
398
return jobManagerGateway.getJobStatus(jobId).get();
399
} catch (RpcInvalidStateException e) {
400
logger.warn("JobManager not available, returning cached status", e);
401
return getCachedJobStatus(jobId);
402
} catch (UnknownMessageException e) {
403
logger.error("JobManager doesn't support getJobStatus operation", e);
404
return "UNKNOWN";
405
} catch (Exception e) {
406
logger.error("Failed to get job status", e);
407
throw new RuntimeException("Job status unavailable", e);
408
}
409
}
410
411
// Failover to backup endpoints
412
public CompletableFuture<String> callWithFailover(List<String> endpoints, String method) {
413
if (endpoints.isEmpty()) {
414
return CompletableFuture.failedFuture(
415
new RpcInvalidStateException("No endpoints available")
416
);
417
}
418
419
return callEndpoint(endpoints.get(0), method)
420
.handle((result, throwable) -> {
421
if (throwable == null) {
422
return CompletableFuture.completedFuture(result);
423
}
424
425
if (endpoints.size() > 1) {
426
logger.warn("Primary endpoint failed, trying backup: {}", throwable.getMessage());
427
return callWithFailover(endpoints.subList(1, endpoints.size()), method);
428
} else {
429
return CompletableFuture.<String>failedFuture(throwable);
430
}
431
})
432
.thenCompose(future -> future);
433
}
434
435
private CompletableFuture<String> callEndpoint(String endpoint, String method) {
436
// Implementation for calling specific endpoint
437
return CompletableFuture.completedFuture("result");
438
}
439
440
private String getCachedJobStatus(String jobId) {
441
// Implementation for getting cached status
442
return "CACHED_STATUS";
443
}
444
}
445
```