or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration-properties.md · connection-configuration.md · error-handling.md · index.md · listener-configuration.md · message-operations.md · queue-exchange-management.md · stream-processing.md

docs/error-handling.md

0

# Error Handling and Retry Mechanisms

1

2

Configuration of retry policies, error handlers, and dead letter queue processing for robust message handling in AMQP messaging scenarios.

3

4

## Basic Error Handling

5

6

### Exception Types

7

8

```java { .api }

9

// Core AMQP exceptions

10

public class AmqpException extends RuntimeException {

11

public AmqpException(String message);

12

public AmqpException(String message, Throwable cause);

13

}

14

15

public class AmqpRejectAndDontRequeueException extends AmqpException {

16

public AmqpRejectAndDontRequeueException(String message);

17

public AmqpRejectAndDontRequeueException(String message, Throwable cause);

18

}

19

20

public class AmqpIOException extends AmqpException {

21

public AmqpIOException(String message);

22

public AmqpIOException(String message, Throwable cause);

23

}

24

25

public class AmqpTimeoutException extends AmqpException {

26

public AmqpTimeoutException(String message);

27

public AmqpTimeoutException(String message, Throwable cause);

28

}

29

30

// Connection exceptions

31

public class AmqpConnectException extends AmqpIOException {

32

public AmqpConnectException(String message);

33

public AmqpConnectException(String message, Throwable cause);

34

}

35

36

// Resource exceptions

37

public class AmqpResourceNotAvailableException extends AmqpException {

38

public AmqpResourceNotAvailableException(String message);

39

public AmqpResourceNotAvailableException(String message, Throwable cause);

40

}

41

```

42

43

### Basic Error Handling in Listeners

44

45

```java { .api }

46

@Component

47

public class ErrorHandlingListeners {

48

49

@RabbitListener(queues = "error.handling.queue")

50

public void handleWithBasicErrorHandling(String message) {

51

try {

52

processMessage(message);

53

} catch (BusinessException e) {

54

// Business logic error - don't requeue

55

log.error("Business error processing message: {}", message, e);

56

throw new AmqpRejectAndDontRequeueException("Business error", e);

57

} catch (TransientException e) {

58

// Transient error - allow requeue

59

log.warn("Transient error processing message: {}", message, e);

60

throw e; // Will be requeued by default

61

} catch (Exception e) {

62

// Unknown error - log and reject

63

log.error("Unknown error processing message: {}", message, e);

64

throw new AmqpRejectAndDontRequeueException("Unknown error", e);

65

}

66

}

67

68

@RabbitListener(queues = "manual.ack.queue", ackMode = "MANUAL")

69

public void handleWithManualAck(String message, Channel channel,

70

@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {

71

try {

72

processMessage(message);

73

channel.basicAck(deliveryTag, false);

74

} catch (RecoverableException e) {

75

// Negative acknowledgment with requeue

76

channel.basicNack(deliveryTag, false, true);

77

} catch (Exception e) {

78

// Negative acknowledgment without requeue

79

channel.basicNack(deliveryTag, false, false);

80

}

81

}

82

}

83

```

84

85

## Retry Configuration

86

87

### Template Retry Configuration

88

89

```java { .api }

90

@FunctionalInterface

91

public interface RabbitRetryTemplateCustomizer {

92

void customize(Target target, RetryTemplate retryTemplate);

93

94

enum Target { SENDER, LISTENER }

95

}

96

97

@Configuration

98

public class RetryConfig {

99

100

@Bean

101

public RabbitRetryTemplateCustomizer retryTemplateCustomizer() {

102

return (target, template) -> {

103

template.setRetryPolicy(createRetryPolicy(target));

104

template.setBackOffPolicy(createBackOffPolicy());

105

template.setListeners(new RetryListener[] { createRetryListener() });

106

};

107

}

108

109

private RetryPolicy createRetryPolicy(RabbitRetryTemplateCustomizer.Target target) {

110

SimpleRetryPolicy policy = new SimpleRetryPolicy();

111

policy.setMaxAttempts(3);

112

113

// Configure which exceptions to retry

114

Map<Class<? extends Throwable>, Boolean> retryableExceptions = new HashMap<>();

115

retryableExceptions.put(AmqpIOException.class, true);

116

retryableExceptions.put(AmqpTimeoutException.class, true);

117

retryableExceptions.put(AmqpResourceNotAvailableException.class, true);

118

retryableExceptions.put(AmqpRejectAndDontRequeueException.class, false);

119

120

policy.setRetryableExceptions(retryableExceptions);

121

return policy;

122

}

123

124

private BackOffPolicy createBackOffPolicy() {

125

ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();

126

backOffPolicy.setInitialInterval(1000L);

127

backOffPolicy.setMultiplier(2.0);

128

backOffPolicy.setMaxInterval(10000L);

129

return backOffPolicy;

130

}

131

132

private RetryListener createRetryListener() {

133

return new RetryListenerSupport() {

134

@Override

135

public <T, E extends Throwable> void onError(RetryContext context,

136

RetryCallback<T, E> callback,

137

Throwable throwable) {

138

log.warn("Retry attempt {} failed: {}",

139

context.getRetryCount(), throwable.getMessage());

140

}

141

};

142

}

143

}

144

```

145

146

### Listener Retry Configuration

147

148

```java { .api }

149

@Configuration

150

public class ListenerRetryConfig {

151

152

@Bean

153

public SimpleRabbitListenerContainerFactory retryContainerFactory(

154

ConnectionFactory connectionFactory,

155

MessageRecoverer messageRecoverer) {

156

157

SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();

158

factory.setConnectionFactory(connectionFactory);

159

160

// Enable retry

161

factory.setRetryTemplate(retryTemplate());

162

factory.setRecoveryCallback(recoveryCallback());

163

164

return factory;

165

}

166

167

@Bean

168

public RetryTemplate retryTemplate() {

169

RetryTemplate template = new RetryTemplate();

170

171

// Fixed delay retry policy

172

FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();

173

backOffPolicy.setBackOffPeriod(5000L); // 5 seconds

174

template.setBackOffPolicy(backOffPolicy);

175

176

// Simple retry policy

177

SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();

178

retryPolicy.setMaxAttempts(3);

179

template.setRetryPolicy(retryPolicy);

180

181

return template;

182

}

183

184

@Bean

185

public RecoveryCallback<Void> recoveryCallback() {

186

return context -> {

187

Message message = (Message) context.getAttribute("message");

188

Throwable lastException = context.getLastThrowable();

189

190

log.error("Message recovery after {} attempts: {}",

191

context.getRetryCount(), new String(message.getBody()), lastException);

192

193

// Send to dead letter queue or handle differently

194

handleFailedMessage(message, lastException);

195

return null;

196

};

197

}

198

199

private void handleFailedMessage(Message message, Throwable exception) {

200

// Implementation for handling failed messages

201

}

202

}

203

```

204

205

### Advanced Retry Policies

206

207

```java { .api }

208

@Configuration

209

public class AdvancedRetryConfig {

210

211

@Bean

212

public RetryTemplate advancedRetryTemplate() {

213

RetryTemplate template = new RetryTemplate();

214

215

// Composite retry policy

216

CompositeRetryPolicy compositePolicy = new CompositeRetryPolicy();

217

218

// Max attempts policy

219

SimpleRetryPolicy maxAttemptsPolicy = new SimpleRetryPolicy();

220

maxAttemptsPolicy.setMaxAttempts(5);

221

222

// Time-based policy

223

TimeoutRetryPolicy timeoutPolicy = new TimeoutRetryPolicy();

224

timeoutPolicy.setTimeout(30000L); // 30 seconds max

225

226

compositePolicy.setPolicies(new RetryPolicy[] { maxAttemptsPolicy, timeoutPolicy });

227

template.setRetryPolicy(compositePolicy);

228

229

// Exponential backoff with randomization

230

ExponentialRandomBackOffPolicy backOffPolicy = new ExponentialRandomBackOffPolicy();

231

backOffPolicy.setInitialInterval(1000L);

232

backOffPolicy.setMultiplier(2.0);

233

backOffPolicy.setMaxInterval(30000L);

234

template.setBackOffPolicy(backOffPolicy);

235

236

return template;

237

}

238

239

@Bean

240

public MessageRecoverer customMessageRecoverer(RabbitTemplate rabbitTemplate) {

241

return new RepublishMessageRecoverer(rabbitTemplate, "error.exchange", "error.routing.key") {

242

@Override

243

protected Map<String, Object> additionalHeaders(Message message, Throwable cause) {

244

Map<String, Object> headers = new HashMap<>();

245

headers.put("x-original-exchange", message.getMessageProperties().getReceivedExchange());

246

headers.put("x-original-routing-key", message.getMessageProperties().getReceivedRoutingKey());

247

headers.put("x-exception-message", cause.getMessage());

248

headers.put("x-exception-stacktrace", getStackTrace(cause));

249

headers.put("x-retry-count", getRetryCount(message));

250

return headers;

251

}

252

};

253

}

254

}

255

```

256

257

## Dead Letter Queue Configuration

258

259

### Basic Dead Letter Setup

260

261

```java { .api }

262

@Configuration

263

public class DeadLetterConfig {

264

265

// Main exchange and queue

266

@Bean

267

public TopicExchange mainExchange() {

268

return ExchangeBuilder.topicExchange("main.exchange").durable(true).build();

269

}

270

271

@Bean

272

public Queue mainQueue() {

273

return QueueBuilder.durable("main.queue")

274

.withArgument("x-dead-letter-exchange", "dlx.exchange")

275

.withArgument("x-dead-letter-routing-key", "failed")

276

.withArgument("x-message-ttl", 300000) // 5 minutes TTL

277

.build();

278

}

279

280

@Bean

281

public Binding mainQueueBinding() {

282

return BindingBuilder.bind(mainQueue()).to(mainExchange()).with("main.routing.key");

283

}

284

285

// Dead letter exchange and queue

286

@Bean

287

public DirectExchange deadLetterExchange() {

288

return ExchangeBuilder.directExchange("dlx.exchange").durable(true).build();

289

}

290

291

@Bean

292

public Queue deadLetterQueue() {

293

return QueueBuilder.durable("dead.letter.queue").build();

294

}

295

296

@Bean

297

public Binding deadLetterBinding() {

298

return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with("failed");

299

}

300

}

301

```

302

303

### Dead Letter Message Handler

304

305

```java { .api }

306

@Component

307

public class DeadLetterHandler {

308

309

@RabbitListener(queues = "dead.letter.queue")

310

public void handleDeadLetter(Message message,

311

@Header Map<String, Object> headers,

312

@Header(required = false, name = "x-death") List<Map<String, Object>> xDeath) {

313

314

String originalExchange = (String) headers.get("x-original-exchange");

315

String originalRoutingKey = (String) headers.get("x-original-routing-key");

316

String messageBody = new String(message.getBody());

317

318

log.error("Dead letter received - Original exchange: {}, routing key: {}, message: {}",

319

originalExchange, originalRoutingKey, messageBody);

320

321

// Analyze death information

322

if (xDeath != null && !xDeath.isEmpty()) {

323

Map<String, Object> firstDeath = xDeath.get(0);

324

String reason = (String) firstDeath.get("reason");

325

Long count = (Long) firstDeath.get("count");

326

String queue = (String) firstDeath.get("queue");

327

328

log.error("Death reason: {}, count: {}, from queue: {}", reason, count, queue);

329

330

// Handle based on death reason

331

handleDeadLetterByReason(message, reason, count);

332

}

333

}

334

335

private void handleDeadLetterByReason(Message message, String reason, Long count) {

336

switch (reason) {

337

case "rejected":

338

handleRejectedMessage(message);

339

break;

340

case "expired":

341

handleExpiredMessage(message);

342

break;

343

case "maxlen":

344

handleMaxLengthExceeded(message);

345

break;

346

default:

347

handleUnknownDeathReason(message, reason);

348

}

349

}

350

351

private void handleRejectedMessage(Message message) {

352

// Handle messages that were explicitly rejected

353

log.info("Handling rejected message: {}", new String(message.getBody()));

354

// Could send to human review queue, log to external system, etc.

355

}

356

357

private void handleExpiredMessage(Message message) {

358

// Handle messages that expired (TTL exceeded)

359

log.info("Handling expired message: {}", new String(message.getBody()));

360

}

361

362

private void handleMaxLengthExceeded(Message message) {

363

// Handle messages dropped due to queue length limit

364

log.info("Handling message dropped due to max length: {}", new String(message.getBody()));

365

}

366

367

private void handleUnknownDeathReason(Message message, String reason) {

368

log.warn("Unknown death reason '{}' for message: {}", reason, new String(message.getBody()));

369

}

370

}

371

```

372

373

### Advanced Dead Letter Scenarios

374

375

```java { .api }

376

@Configuration

377

public class AdvancedDeadLetterConfig {

378

379

// Retry queue with limited attempts

380

@Bean

381

public Queue retryQueue() {

382

return QueueBuilder.durable("retry.queue")

383

.withArgument("x-dead-letter-exchange", "processing.exchange")

384

.withArgument("x-dead-letter-routing-key", "retry.processing")

385

.withArgument("x-message-ttl", 30000) // 30 seconds delay

386

.build();

387

}

388

389

// Final dead letter queue after retries exhausted

390

@Bean

391

public Queue finalDeadLetterQueue() {

392

return QueueBuilder.durable("final.dead.letter.queue").build();

393

}

394

395

@Component

396

public static class RetryDeadLetterHandler {

397

398

private static final String RETRY_COUNT_HEADER = "x-retry-count";

399

private static final int MAX_RETRIES = 3;

400

401

@Autowired

402

private RabbitTemplate rabbitTemplate;

403

404

@RabbitListener(queues = "main.processing.queue")

405

public void processMessage(Message message) throws Exception {

406

try {

407

// Process message

408

processBusinessLogic(new String(message.getBody()));

409

} catch (RecoverableException e) {

410

handleRecoverableError(message, e);

411

} catch (Exception e) {

412

// Non-recoverable error - send to final dead letter

413

log.error("Non-recoverable error processing message", e);

414

throw new AmqpRejectAndDontRequeueException("Non-recoverable error", e);

415

}

416

}

417

418

private void handleRecoverableError(Message message, Exception e) {

419

Integer retryCount = (Integer) message.getMessageProperties().getHeaders().get(RETRY_COUNT_HEADER);

420

retryCount = retryCount == null ? 0 : retryCount;

421

422

if (retryCount < MAX_RETRIES) {

423

// Send to retry queue with incremented count

424

message.getMessageProperties().setHeader(RETRY_COUNT_HEADER, retryCount + 1);

425

rabbitTemplate.send("retry.exchange", "retry", message);

426

log.info("Sending message for retry attempt {}", retryCount + 1);

427

} else {

428

// Max retries exceeded - send to final dead letter

429

log.error("Max retries exceeded for message after {} attempts", retryCount);

430

throw new AmqpRejectAndDontRequeueException("Max retries exceeded", e);

431

}

432

}

433

434

private void processBusinessLogic(String messageBody) throws Exception {

435

// Business logic implementation

436

}

437

}

438

}

439

```

440

441

## Error Recovery Strategies

442

443

### Message Recovery Interface

444

445

```java { .api }

446

public interface MessageRecoverer {

447

void recover(Message message, Throwable cause);

448

}

449

450

// Built-in recoverers

451

public class RejectAndDontRequeueRecoverer implements MessageRecoverer {

452

@Override

453

public void recover(Message message, Throwable cause) {

454

throw new AmqpRejectAndDontRequeueException("Retry attempts exhausted", cause);

455

}

456

}

457

458

public class RepublishMessageRecoverer implements MessageRecoverer {

459

public RepublishMessageRecoverer(AmqpTemplate amqpTemplate, String exchange, String routingKey);

460

461

@Override

462

public void recover(Message message, Throwable cause) {

463

// Republish message to error queue with additional headers

464

}

465

466

protected Map<String, Object> additionalHeaders(Message message, Throwable cause) {

467

// Override to add custom headers

468

return new HashMap<>();

469

}

470

}

471

472

public class ImmediateRequeueMessageRecoverer implements MessageRecoverer {

473

@Override

474

public void recover(Message message, Throwable cause) {

475

throw new ImmediateRequeueAmqpException("Immediate requeue", cause);

476

}

477

}

478

```

479

480

### Custom Recovery Strategies

481

482

```java { .api }

483

@Component

484

public class CustomMessageRecoverer implements MessageRecoverer {

485

486

private final RabbitTemplate rabbitTemplate;

487

private final NotificationService notificationService;

488

private final MetricsService metricsService;

489

490

public CustomMessageRecoverer(RabbitTemplate rabbitTemplate,

491

NotificationService notificationService,

492

MetricsService metricsService) {

493

this.rabbitTemplate = rabbitTemplate;

494

this.notificationService = notificationService;

495

this.metricsService = metricsService;

496

}

497

498

@Override

499

public void recover(Message message, Throwable cause) {

500

String messageType = getMessageType(message);

501

String messageId = getMessageId(message);

502

503

// Record metrics

504

metricsService.incrementFailedMessageCount(messageType);

505

506

// Determine recovery strategy based on message type and error

507

RecoveryStrategy strategy = determineRecoveryStrategy(message, cause);

508

509

switch (strategy) {

510

case REPUBLISH_TO_ERROR_QUEUE:

511

republishToErrorQueue(message, cause);

512

break;

513

case REPUBLISH_WITH_DELAY:

514

republishWithDelay(message, cause);

515

break;

516

case NOTIFY_AND_DISCARD:

517

notifyAndDiscard(message, cause);

518

break;

519

case STORE_FOR_MANUAL_REVIEW:

520

storeForManualReview(message, cause);

521

break;

522

default:

523

rejectMessage(message, cause);

524

}

525

}

526

527

private RecoveryStrategy determineRecoveryStrategy(Message message, Throwable cause) {

528

if (cause instanceof BusinessValidationException) {

529

return RecoveryStrategy.STORE_FOR_MANUAL_REVIEW;

530

} else if (cause instanceof ExternalServiceException) {

531

return RecoveryStrategy.REPUBLISH_WITH_DELAY;

532

} else if (isCriticalMessage(message)) {

533

return RecoveryStrategy.NOTIFY_AND_DISCARD;

534

} else {

535

return RecoveryStrategy.REPUBLISH_TO_ERROR_QUEUE;

536

}

537

}

538

539

private void republishToErrorQueue(Message message, Throwable cause) {

540

Map<String, Object> headers = new HashMap<>(message.getMessageProperties().getHeaders());

541

headers.put("x-exception-message", cause.getMessage());

542

headers.put("x-exception-class", cause.getClass().getName());

543

headers.put("x-failed-timestamp", System.currentTimeMillis());

544

545

Message errorMessage = MessageBuilder.withBody(message.getBody())

546

.copyProperties(message.getMessageProperties())

547

.setHeaders(headers)

548

.build();

549

550

rabbitTemplate.send("error.exchange", "error", errorMessage);

551

}

552

553

private void republishWithDelay(Message message, Throwable cause) {

554

// Implement delayed republishing logic

555

}

556

557

private void notifyAndDiscard(Message message, Throwable cause) {

558

notificationService.sendAlert("Critical message processing failed",

559

getMessageId(message), cause.getMessage());

560

}

561

562

private void storeForManualReview(Message message, Throwable cause) {

563

// Store message in database for manual review

564

}

565

566

private void rejectMessage(Message message, Throwable cause) {

567

throw new AmqpRejectAndDontRequeueException("Message recovery failed", cause);

568

}

569

570

enum RecoveryStrategy {

571

REPUBLISH_TO_ERROR_QUEUE,

572

REPUBLISH_WITH_DELAY,

573

NOTIFY_AND_DISCARD,

574

STORE_FOR_MANUAL_REVIEW,

575

REJECT

576

}

577

}

578

```

579

580

## Circuit Breaker Pattern

581

582

### Circuit Breaker Implementation

583

584

```java { .api }

585

@Component

586

public class CircuitBreakerMessageHandler {

587

588

private final CircuitBreaker circuitBreaker;

589

private final RabbitTemplate rabbitTemplate;

590

591

public CircuitBreakerMessageHandler() {

592

this.circuitBreaker = CircuitBreaker.ofDefaults("messageProcessing");

593

configureCircuitBreaker();

594

}

595

596

private void configureCircuitBreaker() {

597

circuitBreaker.getEventPublisher().onStateTransition(event -> {

598

log.info("Circuit breaker state transition: {} -> {}",

599

event.getStateTransition().getFromState(),

600

event.getStateTransition().getToState());

601

});

602

603

circuitBreaker.getEventPublisher().onFailureRateExceeded(event -> {

604

log.warn("Circuit breaker failure rate exceeded: {}%",

605

event.getFailureRate());

606

});

607

}

608

609

@RabbitListener(queues = "circuit.breaker.queue")

610

public void handleWithCircuitBreaker(String message) {

611

Supplier<String> decoratedSupplier = CircuitBreaker

612

.decorateSupplier(circuitBreaker, () -> processMessage(message));

613

614

try {

615

String result = decoratedSupplier.get();

616

log.info("Message processed successfully: {}", result);

617

} catch (CallNotPermittedException e) {

618

// Circuit breaker is open

619

log.warn("Circuit breaker is open, message will be requeued: {}", message);

620

handleCircuitBreakerOpen(message);

621

} catch (Exception e) {

622

log.error("Error processing message: {}", message, e);

623

throw new AmqpRejectAndDontRequeueException("Processing failed", e);

624

}

625

}

626

627

private String processMessage(String message) {

628

// Message processing logic that might fail

629

if (message.contains("error")) {

630

throw new RuntimeException("Simulated processing error");

631

}

632

return "processed: " + message;

633

}

634

635

private void handleCircuitBreakerOpen(String message) {

636

// Send to delay queue for later retry when circuit breaker might be closed

637

rabbitTemplate.convertAndSend("delay.exchange", "delay.5min", message);

638

}

639

}