0
# Error Handling and Retry Mechanisms
1
2
Configuration of retry policies, error handlers, and dead letter queue processing for robust message handling in AMQP messaging scenarios.
3
4
## Basic Error Handling
5
6
### Exception Types
7
8
```java { .api }
9
// Core AMQP exceptions
10
public class AmqpException extends RuntimeException {
11
public AmqpException(String message);
12
public AmqpException(String message, Throwable cause);
13
}
14
15
public class AmqpRejectAndDontRequeueException extends AmqpException {
16
public AmqpRejectAndDontRequeueException(String message);
17
public AmqpRejectAndDontRequeueException(String message, Throwable cause);
18
}
19
20
public class AmqpIOException extends AmqpException {
21
public AmqpIOException(String message);
22
public AmqpIOException(String message, Throwable cause);
23
}
24
25
public class AmqpTimeoutException extends AmqpException {
26
public AmqpTimeoutException(String message);
27
public AmqpTimeoutException(String message, Throwable cause);
28
}
29
30
// Connection exceptions
31
public class AmqpConnectException extends AmqpIOException {
32
public AmqpConnectException(String message);
33
public AmqpConnectException(String message, Throwable cause);
34
}
35
36
// Resource exceptions
37
public class AmqpResourceNotAvailableException extends AmqpException {
38
public AmqpResourceNotAvailableException(String message);
39
public AmqpResourceNotAvailableException(String message, Throwable cause);
40
}
41
```
42
43
### Basic Error Handling in Listeners
44
45
```java { .api }
46
@Component
47
public class ErrorHandlingListeners {
48
49
@RabbitListener(queues = "error.handling.queue")
50
public void handleWithBasicErrorHandling(String message) {
51
try {
52
processMessage(message);
53
} catch (BusinessException e) {
54
// Business logic error - don't requeue
55
log.error("Business error processing message: {}", message, e);
56
throw new AmqpRejectAndDontRequeueException("Business error", e);
57
} catch (TransientException e) {
58
// Transient error - allow requeue
59
log.warn("Transient error processing message: {}", message, e);
60
throw e; // Will be requeued by default
61
} catch (Exception e) {
62
// Unknown error - log and reject
63
log.error("Unknown error processing message: {}", message, e);
64
throw new AmqpRejectAndDontRequeueException("Unknown error", e);
65
}
66
}
67
68
@RabbitListener(queues = "manual.ack.queue", ackMode = "MANUAL")
69
public void handleWithManualAck(String message, Channel channel,
70
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
71
try {
72
processMessage(message);
73
channel.basicAck(deliveryTag, false);
74
} catch (RecoverableException e) {
75
// Negative acknowledgment with requeue
76
channel.basicNack(deliveryTag, false, true);
77
} catch (Exception e) {
78
// Negative acknowledgment without requeue
79
channel.basicNack(deliveryTag, false, false);
80
}
81
}
82
}
83
```
84
85
## Retry Configuration
86
87
### Template Retry Configuration
88
89
```java { .api }
90
@FunctionalInterface
91
public interface RabbitRetryTemplateCustomizer {
92
void customize(Target target, RetryTemplate retryTemplate);
93
94
enum Target { SENDER, LISTENER }
95
}
96
97
@Configuration
98
public class RetryConfig {
99
100
@Bean
101
public RabbitRetryTemplateCustomizer retryTemplateCustomizer() {
102
return (target, template) -> {
103
template.setRetryPolicy(createRetryPolicy(target));
104
template.setBackOffPolicy(createBackOffPolicy());
105
template.setListeners(new RetryListener[] { createRetryListener() });
106
};
107
}
108
109
private RetryPolicy createRetryPolicy(RabbitRetryTemplateCustomizer.Target target) {
110
SimpleRetryPolicy policy = new SimpleRetryPolicy();
111
policy.setMaxAttempts(3);
112
113
// Configure which exceptions to retry
114
Map<Class<? extends Throwable>, Boolean> retryableExceptions = new HashMap<>();
115
retryableExceptions.put(AmqpIOException.class, true);
116
retryableExceptions.put(AmqpTimeoutException.class, true);
117
retryableExceptions.put(AmqpResourceNotAvailableException.class, true);
118
retryableExceptions.put(AmqpRejectAndDontRequeueException.class, false);
119
120
policy.setRetryableExceptions(retryableExceptions);
121
return policy;
122
}
123
124
private BackOffPolicy createBackOffPolicy() {
125
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
126
backOffPolicy.setInitialInterval(1000L);
127
backOffPolicy.setMultiplier(2.0);
128
backOffPolicy.setMaxInterval(10000L);
129
return backOffPolicy;
130
}
131
132
private RetryListener createRetryListener() {
133
return new RetryListenerSupport() {
134
@Override
135
public <T, E extends Throwable> void onError(RetryContext context,
136
RetryCallback<T, E> callback,
137
Throwable throwable) {
138
log.warn("Retry attempt {} failed: {}",
139
context.getRetryCount(), throwable.getMessage());
140
}
141
};
142
}
143
}
144
```
145
146
### Listener Retry Configuration
147
148
```java { .api }
149
@Configuration
150
public class ListenerRetryConfig {
151
152
@Bean
153
public SimpleRabbitListenerContainerFactory retryContainerFactory(
154
ConnectionFactory connectionFactory,
155
MessageRecoverer messageRecoverer) {
156
157
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
158
factory.setConnectionFactory(connectionFactory);
159
160
// Enable retry
161
factory.setRetryTemplate(retryTemplate());
162
factory.setRecoveryCallback(recoveryCallback());
163
164
return factory;
165
}
166
167
@Bean
168
public RetryTemplate retryTemplate() {
169
RetryTemplate template = new RetryTemplate();
170
171
// Fixed delay retry policy
172
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
173
backOffPolicy.setBackOffPeriod(5000L); // 5 seconds
174
template.setBackOffPolicy(backOffPolicy);
175
176
// Simple retry policy
177
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
178
retryPolicy.setMaxAttempts(3);
179
template.setRetryPolicy(retryPolicy);
180
181
return template;
182
}
183
184
@Bean
185
public RecoveryCallback<Void> recoveryCallback() {
186
return context -> {
187
Message message = (Message) context.getAttribute("message");
188
Throwable lastException = context.getLastThrowable();
189
190
log.error("Message recovery after {} attempts: {}",
191
context.getRetryCount(), new String(message.getBody()), lastException);
192
193
// Send to dead letter queue or handle differently
194
handleFailedMessage(message, lastException);
195
return null;
196
};
197
}
198
199
private void handleFailedMessage(Message message, Throwable exception) {
200
// Implementation for handling failed messages
201
}
202
}
203
```
204
205
### Advanced Retry Policies
206
207
```java { .api }
208
@Configuration
209
public class AdvancedRetryConfig {
210
211
@Bean
212
public RetryTemplate advancedRetryTemplate() {
213
RetryTemplate template = new RetryTemplate();
214
215
// Composite retry policy
216
CompositeRetryPolicy compositePolicy = new CompositeRetryPolicy();
217
218
// Max attempts policy
219
SimpleRetryPolicy maxAttemptsPolicy = new SimpleRetryPolicy();
220
maxAttemptsPolicy.setMaxAttempts(5);
221
222
// Time-based policy
223
TimeoutRetryPolicy timeoutPolicy = new TimeoutRetryPolicy();
224
timeoutPolicy.setTimeout(30000L); // 30 seconds max
225
226
compositePolicy.setPolicies(new RetryPolicy[] { maxAttemptsPolicy, timeoutPolicy });
227
template.setRetryPolicy(compositePolicy);
228
229
// Exponential backoff with randomization
230
ExponentialRandomBackOffPolicy backOffPolicy = new ExponentialRandomBackOffPolicy();
231
backOffPolicy.setInitialInterval(1000L);
232
backOffPolicy.setMultiplier(2.0);
233
backOffPolicy.setMaxInterval(30000L);
234
template.setBackOffPolicy(backOffPolicy);
235
236
return template;
237
}
238
239
@Bean
240
public MessageRecoverer customMessageRecoverer(RabbitTemplate rabbitTemplate) {
241
return new RepublishMessageRecoverer(rabbitTemplate, "error.exchange", "error.routing.key") {
242
@Override
243
protected Map<String, Object> additionalHeaders(Message message, Throwable cause) {
244
Map<String, Object> headers = new HashMap<>();
245
headers.put("x-original-exchange", message.getMessageProperties().getReceivedExchange());
246
headers.put("x-original-routing-key", message.getMessageProperties().getReceivedRoutingKey());
247
headers.put("x-exception-message", cause.getMessage());
248
headers.put("x-exception-stacktrace", getStackTrace(cause));
249
headers.put("x-retry-count", getRetryCount(message));
250
return headers;
251
}
252
};
253
}
254
}
255
```
256
257
## Dead Letter Queue Configuration
258
259
### Basic Dead Letter Setup
260
261
```java { .api }
262
@Configuration
263
public class DeadLetterConfig {
264
265
// Main exchange and queue
266
@Bean
267
public TopicExchange mainExchange() {
268
return ExchangeBuilder.topicExchange("main.exchange").durable(true).build();
269
}
270
271
@Bean
272
public Queue mainQueue() {
273
return QueueBuilder.durable("main.queue")
274
.withArgument("x-dead-letter-exchange", "dlx.exchange")
275
.withArgument("x-dead-letter-routing-key", "failed")
276
.withArgument("x-message-ttl", 300000) // 5 minutes TTL
277
.build();
278
}
279
280
@Bean
281
public Binding mainQueueBinding() {
282
return BindingBuilder.bind(mainQueue()).to(mainExchange()).with("main.routing.key");
283
}
284
285
// Dead letter exchange and queue
286
@Bean
287
public DirectExchange deadLetterExchange() {
288
return ExchangeBuilder.directExchange("dlx.exchange").durable(true).build();
289
}
290
291
@Bean
292
public Queue deadLetterQueue() {
293
return QueueBuilder.durable("dead.letter.queue").build();
294
}
295
296
@Bean
297
public Binding deadLetterBinding() {
298
return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with("failed");
299
}
300
}
301
```
302
303
### Dead Letter Message Handler
304
305
```java { .api }
306
@Component
307
public class DeadLetterHandler {
308
309
@RabbitListener(queues = "dead.letter.queue")
310
public void handleDeadLetter(Message message,
311
@Header Map<String, Object> headers,
312
@Header(required = false, name = "x-death") List<Map<String, Object>> xDeath) {
313
314
String originalExchange = (String) headers.get("x-original-exchange");
315
String originalRoutingKey = (String) headers.get("x-original-routing-key");
316
String messageBody = new String(message.getBody());
317
318
log.error("Dead letter received - Original exchange: {}, routing key: {}, message: {}",
319
originalExchange, originalRoutingKey, messageBody);
320
321
// Analyze death information
322
if (xDeath != null && !xDeath.isEmpty()) {
323
Map<String, Object> firstDeath = xDeath.get(0);
324
String reason = (String) firstDeath.get("reason");
325
Long count = (Long) firstDeath.get("count");
326
String queue = (String) firstDeath.get("queue");
327
328
log.error("Death reason: {}, count: {}, from queue: {}", reason, count, queue);
329
330
// Handle based on death reason
331
handleDeadLetterByReason(message, reason, count);
332
}
333
}
334
335
private void handleDeadLetterByReason(Message message, String reason, Long count) {
336
switch (reason) {
337
case "rejected":
338
handleRejectedMessage(message);
339
break;
340
case "expired":
341
handleExpiredMessage(message);
342
break;
343
case "maxlen":
344
handleMaxLengthExceeded(message);
345
break;
346
default:
347
handleUnknownDeathReason(message, reason);
348
}
349
}
350
351
private void handleRejectedMessage(Message message) {
352
// Handle messages that were explicitly rejected
353
log.info("Handling rejected message: {}", new String(message.getBody()));
354
// Could send to human review queue, log to external system, etc.
355
}
356
357
private void handleExpiredMessage(Message message) {
358
// Handle messages that expired (TTL exceeded)
359
log.info("Handling expired message: {}", new String(message.getBody()));
360
}
361
362
private void handleMaxLengthExceeded(Message message) {
363
// Handle messages dropped due to queue length limit
364
log.info("Handling message dropped due to max length: {}", new String(message.getBody()));
365
}
366
367
private void handleUnknownDeathReason(Message message, String reason) {
368
log.warn("Unknown death reason '{}' for message: {}", reason, new String(message.getBody()));
369
}
370
}
371
```
372
373
### Advanced Dead Letter Scenarios
374
375
```java { .api }
376
@Configuration
377
public class AdvancedDeadLetterConfig {
378
379
// Retry queue with limited attempts
380
@Bean
381
public Queue retryQueue() {
382
return QueueBuilder.durable("retry.queue")
383
.withArgument("x-dead-letter-exchange", "processing.exchange")
384
.withArgument("x-dead-letter-routing-key", "retry.processing")
385
.withArgument("x-message-ttl", 30000) // 30 seconds delay
386
.build();
387
}
388
389
// Final dead letter queue after retries exhausted
390
@Bean
391
public Queue finalDeadLetterQueue() {
392
return QueueBuilder.durable("final.dead.letter.queue").build();
393
}
394
395
@Component
396
public static class RetryDeadLetterHandler {
397
398
private static final String RETRY_COUNT_HEADER = "x-retry-count";
399
private static final int MAX_RETRIES = 3;
400
401
@Autowired
402
private RabbitTemplate rabbitTemplate;
403
404
@RabbitListener(queues = "main.processing.queue")
405
public void processMessage(Message message) throws Exception {
406
try {
407
// Process message
408
processBusinessLogic(new String(message.getBody()));
409
} catch (RecoverableException e) {
410
handleRecoverableError(message, e);
411
} catch (Exception e) {
412
// Non-recoverable error - send to final dead letter
413
log.error("Non-recoverable error processing message", e);
414
throw new AmqpRejectAndDontRequeueException("Non-recoverable error", e);
415
}
416
}
417
418
private void handleRecoverableError(Message message, Exception e) {
419
Integer retryCount = (Integer) message.getMessageProperties().getHeaders().get(RETRY_COUNT_HEADER);
420
retryCount = retryCount == null ? 0 : retryCount;
421
422
if (retryCount < MAX_RETRIES) {
423
// Send to retry queue with incremented count
424
message.getMessageProperties().setHeader(RETRY_COUNT_HEADER, retryCount + 1);
425
rabbitTemplate.send("retry.exchange", "retry", message);
426
log.info("Sending message for retry attempt {}", retryCount + 1);
427
} else {
428
// Max retries exceeded - send to final dead letter
429
log.error("Max retries exceeded for message after {} attempts", retryCount);
430
throw new AmqpRejectAndDontRequeueException("Max retries exceeded", e);
431
}
432
}
433
434
private void processBusinessLogic(String messageBody) throws Exception {
435
// Business logic implementation
436
}
437
}
438
}
439
```
440
441
## Error Recovery Strategies
442
443
### Message Recovery Interface
444
445
```java { .api }
446
public interface MessageRecoverer {
447
void recover(Message message, Throwable cause);
448
}
449
450
// Built-in recoverers
451
public class RejectAndDontRequeueRecoverer implements MessageRecoverer {
452
@Override
453
public void recover(Message message, Throwable cause) {
454
throw new AmqpRejectAndDontRequeueException("Retry attempts exhausted", cause);
455
}
456
}
457
458
public class RepublishMessageRecoverer implements MessageRecoverer {
459
public RepublishMessageRecoverer(AmqpTemplate amqpTemplate, String exchange, String routingKey);
460
461
@Override
462
public void recover(Message message, Throwable cause) {
463
// Republish message to error queue with additional headers
464
}
465
466
protected Map<String, Object> additionalHeaders(Message message, Throwable cause) {
467
// Override to add custom headers
468
return new HashMap<>();
469
}
470
}
471
472
public class ImmediateRequeueMessageRecoverer implements MessageRecoverer {
473
@Override
474
public void recover(Message message, Throwable cause) {
475
throw new ImmediateRequeueAmqpException("Immediate requeue", cause);
476
}
477
}
478
```
479
480
### Custom Recovery Strategies
481
482
```java { .api }
483
@Component
484
public class CustomMessageRecoverer implements MessageRecoverer {
485
486
private final RabbitTemplate rabbitTemplate;
487
private final NotificationService notificationService;
488
private final MetricsService metricsService;
489
490
public CustomMessageRecoverer(RabbitTemplate rabbitTemplate,
491
NotificationService notificationService,
492
MetricsService metricsService) {
493
this.rabbitTemplate = rabbitTemplate;
494
this.notificationService = notificationService;
495
this.metricsService = metricsService;
496
}
497
498
@Override
499
public void recover(Message message, Throwable cause) {
500
String messageType = getMessageType(message);
501
String messageId = getMessageId(message);
502
503
// Record metrics
504
metricsService.incrementFailedMessageCount(messageType);
505
506
// Determine recovery strategy based on message type and error
507
RecoveryStrategy strategy = determineRecoveryStrategy(message, cause);
508
509
switch (strategy) {
510
case REPUBLISH_TO_ERROR_QUEUE:
511
republishToErrorQueue(message, cause);
512
break;
513
case REPUBLISH_WITH_DELAY:
514
republishWithDelay(message, cause);
515
break;
516
case NOTIFY_AND_DISCARD:
517
notifyAndDiscard(message, cause);
518
break;
519
case STORE_FOR_MANUAL_REVIEW:
520
storeForManualReview(message, cause);
521
break;
522
default:
523
rejectMessage(message, cause);
524
}
525
}
526
527
private RecoveryStrategy determineRecoveryStrategy(Message message, Throwable cause) {
528
if (cause instanceof BusinessValidationException) {
529
return RecoveryStrategy.STORE_FOR_MANUAL_REVIEW;
530
} else if (cause instanceof ExternalServiceException) {
531
return RecoveryStrategy.REPUBLISH_WITH_DELAY;
532
} else if (isCriticalMessage(message)) {
533
return RecoveryStrategy.NOTIFY_AND_DISCARD;
534
} else {
535
return RecoveryStrategy.REPUBLISH_TO_ERROR_QUEUE;
536
}
537
}
538
539
private void republishToErrorQueue(Message message, Throwable cause) {
540
Map<String, Object> headers = new HashMap<>(message.getMessageProperties().getHeaders());
541
headers.put("x-exception-message", cause.getMessage());
542
headers.put("x-exception-class", cause.getClass().getName());
543
headers.put("x-failed-timestamp", System.currentTimeMillis());
544
545
Message errorMessage = MessageBuilder.withBody(message.getBody())
546
.copyProperties(message.getMessageProperties())
547
.setHeaders(headers)
548
.build();
549
550
rabbitTemplate.send("error.exchange", "error", errorMessage);
551
}
552
553
private void republishWithDelay(Message message, Throwable cause) {
554
// Implement delayed republishing logic
555
}
556
557
private void notifyAndDiscard(Message message, Throwable cause) {
558
notificationService.sendAlert("Critical message processing failed",
559
getMessageId(message), cause.getMessage());
560
}
561
562
private void storeForManualReview(Message message, Throwable cause) {
563
// Store message in database for manual review
564
}
565
566
private void rejectMessage(Message message, Throwable cause) {
567
throw new AmqpRejectAndDontRequeueException("Message recovery failed", cause);
568
}
569
570
enum RecoveryStrategy {
571
REPUBLISH_TO_ERROR_QUEUE,
572
REPUBLISH_WITH_DELAY,
573
NOTIFY_AND_DISCARD,
574
STORE_FOR_MANUAL_REVIEW,
575
REJECT
576
}
577
}
578
```
579
580
## Circuit Breaker Pattern
581
582
### Circuit Breaker Implementation
583
584
```java { .api }
585
@Component
586
public class CircuitBreakerMessageHandler {
587
588
private final CircuitBreaker circuitBreaker;
589
private final RabbitTemplate rabbitTemplate;
590
591
public CircuitBreakerMessageHandler() {
592
this.circuitBreaker = CircuitBreaker.ofDefaults("messageProcessing");
593
configureCircuitBreaker();
594
}
595
596
private void configureCircuitBreaker() {
597
circuitBreaker.getEventPublisher().onStateTransition(event -> {
598
log.info("Circuit breaker state transition: {} -> {}",
599
event.getStateTransition().getFromState(),
600
event.getStateTransition().getToState());
601
});
602
603
circuitBreaker.getEventPublisher().onFailureRateExceeded(event -> {
604
log.warn("Circuit breaker failure rate exceeded: {}%",
605
event.getFailureRate());
606
});
607
}
608
609
@RabbitListener(queues = "circuit.breaker.queue")
610
public void handleWithCircuitBreaker(String message) {
611
Supplier<String> decoratedSupplier = CircuitBreaker
612
.decorateSupplier(circuitBreaker, () -> processMessage(message));
613
614
try {
615
String result = decoratedSupplier.get();
616
log.info("Message processed successfully: {}", result);
617
} catch (CallNotPermittedException e) {
618
// Circuit breaker is open
619
log.warn("Circuit breaker is open, message will be requeued: {}", message);
620
handleCircuitBreakerOpen(message);
621
} catch (Exception e) {
622
log.error("Error processing message: {}", message, e);
623
throw new AmqpRejectAndDontRequeueException("Processing failed", e);
624
}
625
}
626
627
private String processMessage(String message) {
628
// Message processing logic that might fail
629
if (message.contains("error")) {
630
throw new RuntimeException("Simulated processing error");
631
}
632
return "processed: " + message;
633
}
634
635
private void handleCircuitBreakerOpen(String message) {
636
// Send to delay queue for later retry when circuit breaker might be closed
637
rabbitTemplate.convertAndSend("delay.exchange", "delay.5min", message);
638
}
639
}