0
# Error Handling and Recovery
1
2
Exception classes and automatic recovery mechanisms for handling network failures, protocol errors, and application-initiated shutdowns. The RabbitMQ Java client provides comprehensive error handling and automatic recovery capabilities for robust messaging applications.
3
4
## Capabilities
5
6
### Core Exception Types
7
8
Base exception classes for different types of errors in AMQP operations.
9
10
```java { .api }
11
/**
12
* Exception indicating a shutdown signal from connection or channel
13
*/
14
public class ShutdownSignalException extends RuntimeException {
15
/**
16
* Get the shutdown reason object
17
* @return AMQP method object indicating the reason
18
*/
19
public Object getReason();
20
21
/**
22
* Check if this is a hard error (connection-level)
23
* @return true for connection errors, false for channel errors
24
*/
25
public boolean isHardError();
26
27
/**
28
* Check if shutdown was initiated by the application
29
* @return true if application initiated, false if server initiated
30
*/
31
public boolean isInitiatedByApplication();
32
33
/**
34
* Get reference to the connection or channel that was shut down
35
* @return ShutdownNotifier object (Connection or Channel)
36
*/
37
public ShutdownNotifier getReference();
38
}
39
40
/**
41
* Exception for operations on already closed connections or channels
42
*/
43
public class AlreadyClosedException extends ShutdownSignalException {
44
/**
45
* Create exception with shutdown signal
46
* @param shutdownSignalException - Original shutdown signal
47
*/
48
public AlreadyClosedException(ShutdownSignalException shutdownSignalException);
49
}
50
51
/**
52
* Exception for malformed AMQP frames
53
*/
54
public class MalformedFrameException extends IOException {
55
/**
56
* Create exception with message
57
* @param message - Error message
58
*/
59
public MalformedFrameException(String message);
60
}
61
62
/**
63
* Exception for missed heartbeats from server
64
*/
65
public class MissedHeartbeatException extends IOException {
66
/**
67
* Create exception with message
68
* @param message - Error message
69
*/
70
public MissedHeartbeatException(String message);
71
}
72
```
73
74
**Usage Examples:**
75
76
```java
77
// Handling shutdown signals
78
try {
    channel.basicPublish("exchange", "key", null, message.getBytes());
} catch (AlreadyClosedException e) {
    // AlreadyClosedException extends ShutdownSignalException, so it has to be
    // caught first; listed after the superclass this handler would be
    // unreachable and the snippet would not compile.
    System.out.println("Attempted operation on closed resource");
    // Recreate connection/channel
    recreateResources();
} catch (ShutdownSignalException e) {
    if (e.isHardError()) {
        // Hard error: the shutdown happened at connection level
        System.out.println("Connection error: " + e.getReason());
        reconnect();
    } else {
        // Soft error: only the channel was shut down
        System.out.println("Channel error: " + e.getReason());
        reopenChannel();
    }
}
95
```
96
97
### Authentication Exceptions
98
99
Exception types for authentication and authorization failures.
100
101
```java { .api }
102
/**
103
* Base class for possible authentication failures
104
*/
105
public class PossibleAuthenticationFailureException extends IOException {
106
/**
107
* Create exception with message
108
* @param message - Error message
109
*/
110
public PossibleAuthenticationFailureException(String message);
111
}
112
113
/**
114
* Exception for confirmed authentication failures
115
*/
116
public class AuthenticationFailureException extends PossibleAuthenticationFailureException {
117
/**
118
* Create exception with message
119
* @param message - Error message
120
*/
121
public AuthenticationFailureException(String message);
122
}
123
```
124
125
### Protocol and Frame Exceptions
126
127
Exceptions for protocol-level errors and unexpected conditions.
128
129
```java { .api }
130
/**
131
* Exception for protocol version mismatches
132
*/
133
public class ProtocolVersionMismatchException extends IOException {
134
/**
135
* Create exception with version information
136
* @param clientMajor - Client protocol major version
137
* @param clientMinor - Client protocol minor version
138
* @param serverMajor - Server protocol major version
139
* @param serverMinor - Server protocol minor version
140
*/
141
public ProtocolVersionMismatchException(int clientMajor, int clientMinor, int serverMajor, int serverMinor);
142
143
public int getClientMajor();
144
public int getClientMinor();
145
public int getServerMajor();
146
public int getServerMinor();
147
}
148
149
/**
150
* Error for unexpected AMQP frames
151
*/
152
public class UnexpectedFrameError extends Error {
153
/**
154
* Create error with frame information
155
* @param frame - Unexpected frame object
156
* @param expectedFrameType - Expected frame type
157
*/
158
public UnexpectedFrameError(Frame frame, int expectedFrameType);
159
}
160
161
/**
162
* Error for unexpected AMQP methods
163
*/
164
public class UnexpectedMethodError extends Error {
165
/**
166
* Create error with method information
167
* @param method - Unexpected method object
168
*/
169
public UnexpectedMethodError(Method method);
170
}
171
172
/**
173
* Exception for unknown class or method IDs
174
*/
175
public class UnknownClassOrMethodId extends IOException {
176
/**
177
* Create exception with class and method IDs
178
* @param classId - AMQP class ID
179
* @param methodId - AMQP method ID
180
*/
181
public UnknownClassOrMethodId(int classId, int methodId);
182
183
public int getClassId();
184
public int getMethodId();
185
}
186
```
187
188
### Channel and Operation Exceptions
189
190
Exceptions specific to channel operations and RPC timeouts.
191
192
```java { .api }
193
/**
194
* Exception for channel RPC operation timeouts
195
*/
196
public class ChannelContinuationTimeoutException extends TimeoutException {
197
/**
198
* Create timeout exception
199
*/
200
public ChannelContinuationTimeoutException();
201
202
/**
203
* Create timeout exception with message
204
* @param message - Error message
205
*/
206
public ChannelContinuationTimeoutException(String message);
207
}
208
209
/**
210
* Exception for consumer cancellation
211
*/
212
public class ConsumerCancelledException extends RuntimeException {
213
/**
214
* Create consumer cancellation exception
215
*/
216
public ConsumerCancelledException();
217
}
218
219
/**
220
* Exception for unroutable RPC requests
221
*/
222
public class UnroutableRpcRequestException extends IOException {
223
/**
224
* Create exception for unroutable RPC request
225
* @param message - Error message
226
*/
227
public UnroutableRpcRequestException(String message);
228
}
229
```
230
231
### Topology Recovery Exceptions
232
233
Exceptions related to automatic topology recovery.
234
235
```java { .api }
236
/**
237
* Exception during topology recovery process
238
*/
239
public class TopologyRecoveryException extends Exception {
240
/**
241
* Create recovery exception with cause
242
* @param cause - Underlying cause of recovery failure
243
*/
244
public TopologyRecoveryException(Throwable cause);
245
246
/**
247
* Create recovery exception with message and cause
248
* @param message - Error message
249
* @param cause - Underlying cause
250
*/
251
public TopologyRecoveryException(String message, Throwable cause);
252
}
253
```
254
255
## Recovery System
256
257
### Automatic Recovery
258
259
Interfaces and classes for automatic connection and topology recovery.
260
261
```java { .api }
262
/**
263
* Interface for objects that support recovery
264
*/
265
public interface Recoverable {
266
/**
267
* Add a recovery listener
268
* @param listener - Listener to notify on recovery events
269
*/
270
void addRecoveryListener(RecoveryListener listener);
271
272
/**
273
* Remove a recovery listener
274
* @param listener - Listener to remove
275
*/
276
void removeRecoveryListener(RecoveryListener listener);
277
}
278
279
/**
280
* Listener interface for recovery events
281
*/
282
public interface RecoveryListener {
283
/**
284
* Called when recovery completes successfully
285
* @param recoverable - Object that was recovered
286
*/
287
void handleRecovery(Recoverable recoverable);
288
289
/**
290
* Called when recovery process starts
291
* @param recoverable - Object being recovered
292
*/
293
void handleRecoveryStarted(Recoverable recoverable);
294
}
295
296
/**
297
* Interface for handling recovery delays
298
*/
299
public interface RecoveryDelayHandler {
300
/**
301
* Get delay before next recovery attempt
302
* @param recoveryAttempts - Number of recovery attempts so far
303
* @return Delay in milliseconds before next attempt
304
*/
305
long getDelay(int recoveryAttempts);
306
}
307
```
308
309
**Usage Examples:**
310
311
```java
312
// Custom recovery listener
313
RecoveryListener recoveryListener = new RecoveryListener() {
    /**
     * Invoked when the recovery process begins; processing is paused
     * until recovery has finished.
     */
    @Override
    public void handleRecoveryStarted(Recoverable recoverable) {
        System.out.println("Recovery started for: " + recoverable);
        // Pause message processing during recovery
        pauseProcessing();
    }

    /**
     * Invoked once recovery has completed; reacts differently depending on
     * whether a connection or a channel was recovered.
     */
    @Override
    public void handleRecovery(Recoverable recoverable) {
        if (recoverable instanceof Connection) {
            System.out.println("Connection recovered successfully");
            // Notify application components
            notifyConnectionRecovered();
            return;
        }
        if (recoverable instanceof Channel) {
            System.out.println("Channel recovered successfully");
            // Restart consumers if needed
            restartConsumers();
        }
    }
};

// Enable automatic recovery, then register the listener on the resulting connection
ConnectionFactory factory = new ConnectionFactory();
factory.setAutomaticRecoveryEnabled(true);
RecoverableConnection connection = (RecoverableConnection) factory.newConnection();
connection.addRecoveryListener(recoveryListener);
340
```
341
342
```java
343
// Custom recovery delay handler with exponential backoff
344
RecoveryDelayHandler delayHandler = new RecoveryDelayHandler() {
    /**
     * Computes the wait before the next recovery attempt.
     *
     * @param recoveryAttempts number of recovery attempts so far
     * @return delay in milliseconds, doubling from 1s and capped at 30s
     */
    @Override
    public long getDelay(int recoveryAttempts) {
        // Exponential backoff: 1s, 2s, 4s, 8s, 16s, then capped at 30s.
        // The exponent is clamped before shifting: 1000ms << 5 already exceeds
        // the cap, and an unbounded shift would overflow long (and wrap for
        // shift distances >= 64), producing negative or bogus delays after
        // many consecutive failures.
        int exponent = Math.max(0, Math.min(recoveryAttempts, 5));
        long delay = Math.min(1000L << exponent, 30000L);
        System.out.println("Recovery attempt " + recoveryAttempts + ", waiting " + delay + "ms");
        return delay;
    }
};

ConnectionFactory factory = new ConnectionFactory();
factory.setRecoveryDelayHandler(delayHandler);
factory.setAutomaticRecoveryEnabled(true);
357
```
358
359
### Exception Handling
360
361
Interface for handling exceptions in consumers and other callback contexts.
362
363
```java { .api }
364
/**
365
* Interface for handling exceptions in consumers and connections
366
*/
367
public interface ExceptionHandler {
368
/**
369
* Handle unexpected exception in connection driver
370
* @param conn - Connection where exception occurred
371
* @param exception - Exception that occurred
372
*/
373
void handleUnexpectedConnectionDriverException(Connection conn, Throwable exception);
374
375
/**
376
* Handle exception in return listener
377
* @param channel - Channel where exception occurred
378
* @param exception - Exception that occurred
379
*/
380
void handleReturnListenerException(Channel channel, Throwable exception);
381
382
/**
383
* Handle exception in flow listener
384
* @param channel - Channel where exception occurred
385
* @param exception - Exception that occurred
386
*/
387
void handleFlowListenerException(Channel channel, Throwable exception);
388
389
/**
390
* Handle exception in confirm listener
391
* @param channel - Channel where exception occurred
392
* @param exception - Exception that occurred
393
*/
394
void handleConfirmListenerException(Channel channel, Throwable exception);
395
396
/**
397
* Handle exception in blocked listener
398
* @param connection - Connection where exception occurred
399
* @param exception - Exception that occurred
400
*/
401
void handleBlockedListenerException(Connection connection, Throwable exception);
402
403
/**
404
* Handle exception in consumer
405
* @param channel - Channel where exception occurred
406
* @param exception - Exception that occurred
407
* @param consumer - Consumer that threw exception
408
* @param consumerTag - Consumer tag
409
* @param methodName - Method where exception occurred
410
*/
411
void handleConsumerException(Channel channel, Throwable exception, Consumer consumer, String consumerTag, String methodName);
412
413
/**
414
* Handle exception during connection recovery
415
* @param conn - Connection being recovered
416
* @param exception - Exception that occurred
417
*/
418
void handleConnectionRecoveryException(Connection conn, Throwable exception);
419
420
/**
421
* Handle exception during channel recovery
422
* @param ch - Channel being recovered
423
* @param exception - Exception that occurred
424
*/
425
void handleChannelRecoveryException(Channel ch, Throwable exception);
426
427
/**
428
* Handle exception during topology recovery
429
* @param conn - Connection being recovered
430
* @param ch - Channel being recovered (may be null)
431
* @param exception - Exception that occurred
432
*/
433
void handleTopologyRecoveryException(Connection conn, Channel ch, TopologyRecoveryException exception);
434
}
435
```
436
437
**Usage Examples:**
438
439
```java
440
// Custom exception handler with logging and metrics
441
public class CustomExceptionHandler implements ExceptionHandler {
442
private static final Logger logger = LoggerFactory.getLogger(CustomExceptionHandler.class);
443
private final MetricsRegistry metrics;
444
445
public CustomExceptionHandler(MetricsRegistry metrics) {
446
this.metrics = metrics;
447
}
448
449
@Override
450
public void handleConsumerException(Channel channel, Throwable exception,
451
Consumer consumer, String consumerTag, String methodName) {
452
logger.error("Consumer exception in {}: {}", methodName, exception.getMessage(), exception);
453
metrics.counter("consumer.exceptions").increment();
454
455
// Optionally restart consumer
456
if (exception instanceof RuntimeException) {
457
restartConsumer(channel, consumer, consumerTag);
458
}
459
}
460
461
@Override
462
public void handleConnectionRecoveryException(Connection conn, Throwable exception) {
463
logger.error("Connection recovery failed: {}", exception.getMessage(), exception);
464
metrics.counter("connection.recovery.failures").increment();
465
466
// Send alert to monitoring system
467
alertingService.sendAlert("RabbitMQ connection recovery failed", exception);
468
}
469
470
@Override
471
public void handleTopologyRecoveryException(Connection conn, Channel ch, TopologyRecoveryException exception) {
472
logger.error("Topology recovery failed: {}", exception.getMessage(), exception);
473
metrics.counter("topology.recovery.failures").increment();
474
475
// Attempt manual topology recreation
476
scheduleTopologyRecreation(conn, ch);
477
}
478
479
// Implement other methods with appropriate logging and handling...
480
}
481
482
// Use custom exception handler
483
ConnectionFactory factory = new ConnectionFactory();
484
factory.setExceptionHandler(new CustomExceptionHandler(metricsRegistry));
485
```
486
487
### Error Handling Patterns
488
489
**Robust Consumer with Error Handling:**
490
491
```java
492
public class RobustConsumer extends DefaultConsumer {
493
private static final Logger logger = LoggerFactory.getLogger(RobustConsumer.class);
494
private final int maxRetries;
495
496
public RobustConsumer(Channel channel, int maxRetries) {
497
super(channel);
498
this.maxRetries = maxRetries;
499
}
500
501
@Override
502
public void handleDelivery(String consumerTag, Envelope envelope,
503
AMQP.Properties properties, byte[] body) throws IOException {
504
try {
505
processMessage(new String(body, "UTF-8"), properties);
506
getChannel().basicAck(envelope.getDeliveryTag(), false);
507
508
} catch (Exception e) {
509
logger.error("Error processing message: {}", e.getMessage(), e);
510
handleProcessingError(envelope, properties, body, e);
511
}
512
}
513
514
private void handleProcessingError(Envelope envelope, AMQP.BasicProperties properties,
515
byte[] body, Exception error) throws IOException {
516
// Get retry count from headers
517
Map<String, Object> headers = properties.getHeaders();
518
int retryCount = headers != null && headers.containsKey("x-retry-count") ?
519
(Integer) headers.get("x-retry-count") : 0;
520
521
if (retryCount < maxRetries) {
522
// Republish for retry
523
republishForRetry(envelope, properties, body, retryCount + 1);
524
getChannel().basicAck(envelope.getDeliveryTag(), false);
525
} else {
526
// Send to dead letter queue or log as failed
527
logger.error("Message failed after {} retries, sending to DLQ", maxRetries);
528
getChannel().basicNack(envelope.getDeliveryTag(), false, false);
529
}
530
}
531
532
@Override
533
public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
534
if (!sig.isInitiatedByApplication()) {
535
logger.error("Consumer {} received unexpected shutdown: {}", consumerTag, sig.getReason());
536
// Implement reconnection logic
537
scheduleReconnection();
538
}
539
}
540
}
541
```
542
543
**Connection Recovery with Custom Logic:**
544
545
```java
546
public class ResilientConnectionManager {
547
private ConnectionFactory factory;
548
private volatile Connection connection;
549
private final List<Channel> channels = new CopyOnWriteArrayList<>();
550
551
public ResilientConnectionManager() {
552
setupConnectionFactory();
553
}
554
555
private void setupConnectionFactory() {
556
factory = new ConnectionFactory();
557
factory.setAutomaticRecoveryEnabled(true);
558
factory.setNetworkRecoveryInterval(5000);
559
560
// Custom recovery listener
561
factory.setRecoveryDelayHandler(recoveryAttempts -> {
562
long delay = Math.min(1000L * recoveryAttempts, 30000L);
563
logger.info("Recovery attempt {}, waiting {}ms", recoveryAttempts, delay);
564
return delay;
565
});
566
567
// Custom exception handler
568
factory.setExceptionHandler(new ExceptionHandler() {
569
@Override
570
public void handleConnectionRecoveryException(Connection conn, Throwable exception) {
571
logger.error("Connection recovery failed", exception);
572
// Custom recovery logic
573
attemptManualRecovery();
574
}
575
576
// ... implement other methods
577
});
578
}
579
580
public synchronized Connection getConnection() throws IOException, TimeoutException {
581
if (connection == null || !connection.isOpen()) {
582
connection = factory.newConnection();
583
((RecoverableConnection) connection).addRecoveryListener(new RecoveryListener() {
584
@Override
585
public void handleRecovery(Recoverable recoverable) {
586
logger.info("Connection recovered successfully");
587
// Recreate channels and consumers
588
recreateChannels();
589
}
590
591
@Override
592
public void handleRecoveryStarted(Recoverable recoverable) {
593
logger.info("Connection recovery started");
594
}
595
});
596
}
597
return connection;
598
}
599
}
600
```