or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.md confirms-returns.md connection-channel.md consumer-api.md consuming.md error-recovery.md index.md observability.md publishing.md rpc.md

docs/error-recovery.md

0

# Error Handling and Recovery

1

2

Exception classes and automatic recovery mechanisms for handling network failures, protocol errors, and application-initiated shutdowns. The RabbitMQ Java client provides comprehensive error handling and automatic recovery capabilities for robust messaging applications.

3

4

## Capabilities

5

6

### Core Exception Types

7

8

Base exception classes for different types of errors in AMQP operations.

9

10

```java { .api }

11

/**

12

* Exception indicating a shutdown signal from connection or channel

13

*/

14

public class ShutdownSignalException extends RuntimeException {

15

/**

16

* Get the shutdown reason object

17

* @return AMQP method object indicating the reason

18

*/

19

public Object getReason();

20

21

/**

22

* Check if this is a hard error (connection-level)

23

* @return true for connection errors, false for channel errors

24

*/

25

public boolean isHardError();

26

27

/**

28

* Check if shutdown was initiated by the application

29

* @return true if application initiated, false if server initiated

30

*/

31

public boolean isInitiatedByApplication();

32

33

/**

34

* Get reference to the connection or channel that was shut down

35

* @return ShutdownNotifier object (Connection or Channel)

36

*/

37

public ShutdownNotifier getReference();

38

}

39

40

/**

41

* Exception for operations on already closed connections or channels

42

*/

43

public class AlreadyClosedException extends ShutdownSignalException {

44

/**

45

* Create exception with shutdown signal

46

* @param shutdownSignalException - Original shutdown signal

47

*/

48

public AlreadyClosedException(ShutdownSignalException shutdownSignalException);

49

}

50

51

/**

52

* Exception for malformed AMQP frames

53

*/

54

public class MalformedFrameException extends IOException {

55

/**

56

* Create exception with message

57

* @param message - Error message

58

*/

59

public MalformedFrameException(String message);

60

}

61

62

/**

63

* Exception for missed heartbeats from server

64

*/

65

public class MissedHeartbeatException extends IOException {

66

/**

67

* Create exception with message

68

* @param message - Error message

69

*/

70

public MissedHeartbeatException(String message);

71

}

72

```

73

74

**Usage Examples:**

75

76

```java

77

// Handling shutdown signals

78

try {

79

channel.basicPublish("exchange", "key", null, message.getBytes());

80

} catch (AlreadyClosedException e) {

81

System.out.println("Attempted operation on closed resource");

82

// Recreate connection/channel

83

recreateResources();

84

} catch (ShutdownSignalException e) {

85

if (e.isHardError()) {

86

System.out.println("Connection error: " + e.getReason());

87

// Handle connection-level error

88

reconnect();

89

} else {

90

System.out.println("Channel error: " + e.getReason());

91

// Handle channel-level error

92

reopenChannel();

93

}

94

}

95

```

96

97

### Authentication Exceptions

98

99

Exception types for authentication and authorization failures.

100

101

```java { .api }

102

/**

103

* Base class for possible authentication failures

104

*/

105

public class PossibleAuthenticationFailureException extends IOException {

106

/**

107

* Create exception with message

108

* @param message - Error message

109

*/

110

public PossibleAuthenticationFailureException(String message);

111

}

112

113

/**

114

* Exception for confirmed authentication failures

115

*/

116

public class AuthenticationFailureException extends PossibleAuthenticationFailureException {

117

/**

118

* Create exception with message

119

* @param message - Error message

120

*/

121

public AuthenticationFailureException(String message);

122

}

123

```

124

125

### Protocol and Frame Exceptions

126

127

Exceptions for protocol-level errors and unexpected conditions.

128

129

```java { .api }

130

/**

131

* Exception for protocol version mismatches

132

*/

133

public class ProtocolVersionMismatchException extends IOException {

134

/**

135

* Create exception with version information

136

* @param clientMajor - Client protocol major version

137

* @param clientMinor - Client protocol minor version

138

* @param serverMajor - Server protocol major version

139

* @param serverMinor - Server protocol minor version

140

*/

141

public ProtocolVersionMismatchException(int clientMajor, int clientMinor, int serverMajor, int serverMinor);

142

143

public int getClientMajor();

144

public int getClientMinor();

145

public int getServerMajor();

146

public int getServerMinor();

147

}

148

149

/**

150

* Error for unexpected AMQP frames

151

*/

152

public class UnexpectedFrameError extends Error {

153

/**

154

* Create error with frame information

155

* @param frame - Unexpected frame object

156

* @param expectedFrameType - Expected frame type

157

*/

158

public UnexpectedFrameError(Frame frame, int expectedFrameType);

159

}

160

161

/**

162

* Error for unexpected AMQP methods

163

*/

164

public class UnexpectedMethodError extends Error {

165

/**

166

* Create error with method information

167

* @param method - Unexpected method object

168

*/

169

public UnexpectedMethodError(Method method);

170

}

171

172

/**

173

* Exception for unknown class or method IDs

174

*/

175

public class UnknownClassOrMethodId extends IOException {

176

/**

177

* Create exception with class and method IDs

178

* @param classId - AMQP class ID

179

* @param methodId - AMQP method ID

180

*/

181

public UnknownClassOrMethodId(int classId, int methodId);

182

183

public int getClassId();

184

public int getMethodId();

185

}

186

```

187

188

### Channel and Operation Exceptions

189

190

Exceptions specific to channel operations and RPC timeouts.

191

192

```java { .api }

193

/**

194

* Exception for channel RPC operation timeouts

195

*/

196

public class ChannelContinuationTimeoutException extends TimeoutException {

197

/**

198

* Create timeout exception

199

*/

200

public ChannelContinuationTimeoutException();

201

202

/**

203

* Create timeout exception with message

204

* @param message - Error message

205

*/

206

public ChannelContinuationTimeoutException(String message);

207

}

208

209

/**

210

* Exception for consumer cancellation

211

*/

212

public class ConsumerCancelledException extends RuntimeException {

213

/**

214

* Create consumer cancellation exception

215

*/

216

public ConsumerCancelledException();

217

}

218

219

/**

220

* Exception for unroutable RPC requests

221

*/

222

public class UnroutableRpcRequestException extends IOException {

223

/**

224

* Create exception for unroutable RPC request

225

* @param message - Error message

226

*/

227

public UnroutableRpcRequestException(String message);

228

}

229

```

230

231

### Topology Recovery Exceptions

232

233

Exceptions related to automatic topology recovery.

234

235

```java { .api }

236

/**

237

* Exception during topology recovery process

238

*/

239

public class TopologyRecoveryException extends Exception {

240

/**

241

* Create recovery exception with cause

242

* @param cause - Underlying cause of recovery failure

243

*/

244

public TopologyRecoveryException(Throwable cause);

245

246

/**

247

* Create recovery exception with message and cause

248

* @param message - Error message

249

* @param cause - Underlying cause

250

*/

251

public TopologyRecoveryException(String message, Throwable cause);

252

}

253

```

254

255

## Recovery System

256

257

### Automatic Recovery

258

259

Interfaces and classes for automatic connection and topology recovery.

260

261

```java { .api }

262

/**

263

* Interface for objects that support recovery

264

*/

265

public interface Recoverable {

266

/**

267

* Add a recovery listener

268

* @param listener - Listener to notify on recovery events

269

*/

270

void addRecoveryListener(RecoveryListener listener);

271

272

/**

273

* Remove a recovery listener

274

* @param listener - Listener to remove

275

*/

276

void removeRecoveryListener(RecoveryListener listener);

277

}

278

279

/**

280

* Listener interface for recovery events

281

*/

282

public interface RecoveryListener {

283

/**

284

* Called when recovery completes successfully

285

* @param recoverable - Object that was recovered

286

*/

287

void handleRecovery(Recoverable recoverable);

288

289

/**

290

* Called when recovery process starts

291

* @param recoverable - Object being recovered

292

*/

293

void handleRecoveryStarted(Recoverable recoverable);

294

}

295

296

/**

297

* Interface for handling recovery delays

298

*/

299

public interface RecoveryDelayHandler {

300

/**

301

* Get delay before next recovery attempt

302

* @param recoveryAttempts - Number of recovery attempts so far

303

* @return Delay in milliseconds before next attempt

304

*/

305

long getDelay(int recoveryAttempts);

306

}

307

```

308

309

**Usage Examples:**

310

311

```java

312

// Custom recovery listener

313

RecoveryListener recoveryListener = new RecoveryListener() {

314

@Override

315

public void handleRecovery(Recoverable recoverable) {

316

if (recoverable instanceof Connection) {

317

System.out.println("Connection recovered successfully");

318

// Notify application components

319

notifyConnectionRecovered();

320

} else if (recoverable instanceof Channel) {

321

System.out.println("Channel recovered successfully");

322

// Restart consumers if needed

323

restartConsumers();

324

}

325

}

326

327

@Override

328

public void handleRecoveryStarted(Recoverable recoverable) {

329

System.out.println("Recovery started for: " + recoverable);

330

// Pause message processing during recovery

331

pauseProcessing();

332

}

333

};

334

335

// Add to recoverable connection

336

ConnectionFactory factory = new ConnectionFactory();

337

factory.setAutomaticRecoveryEnabled(true);

338

RecoverableConnection connection = (RecoverableConnection) factory.newConnection();

339

connection.addRecoveryListener(recoveryListener);

340

```

341

342

```java

343

// Custom recovery delay handler with exponential backoff

344

RecoveryDelayHandler delayHandler = new RecoveryDelayHandler() {

345

@Override

346

public long getDelay(int recoveryAttempts) {

347

// Exponential backoff: 1s, 2s, 4s, 8s, max 30s

348

long delay = Math.min(1000L << Math.min(recoveryAttempts, 5), 30000L);

349

System.out.println("Recovery attempt " + recoveryAttempts + ", waiting " + delay + "ms");

350

return delay;

351

}

352

};

353

354

ConnectionFactory factory = new ConnectionFactory();

355

factory.setRecoveryDelayHandler(delayHandler);

356

factory.setAutomaticRecoveryEnabled(true);

357

```

358

359

### Exception Handling

360

361

Interface for handling exceptions in consumers and other callback contexts.

362

363

```java { .api }

364

/**

365

* Interface for handling exceptions in consumers and connections

366

*/

367

public interface ExceptionHandler {

368

/**

369

* Handle unexpected exception in connection driver

370

* @param conn - Connection where exception occurred

371

* @param exception - Exception that occurred

372

*/

373

void handleUnexpectedConnectionDriverException(Connection conn, Throwable exception);

374

375

/**

376

* Handle exception in return listener

377

* @param channel - Channel where exception occurred

378

* @param exception - Exception that occurred

379

*/

380

void handleReturnListenerException(Channel channel, Throwable exception);

381

382

/**

383

* Handle exception in flow listener

384

* @param channel - Channel where exception occurred

385

* @param exception - Exception that occurred

386

*/

387

void handleFlowListenerException(Channel channel, Throwable exception);

388

389

/**

390

* Handle exception in confirm listener

391

* @param channel - Channel where exception occurred

392

* @param exception - Exception that occurred

393

*/

394

void handleConfirmListenerException(Channel channel, Throwable exception);

395

396

/**

397

* Handle exception in blocked listener

398

* @param connection - Connection where exception occurred

399

* @param exception - Exception that occurred

400

*/

401

void handleBlockedListenerException(Connection connection, Throwable exception);

402

403

/**

404

* Handle exception in consumer

405

* @param channel - Channel where exception occurred

406

* @param exception - Exception that occurred

407

* @param consumer - Consumer that threw exception

408

* @param consumerTag - Consumer tag

409

* @param methodName - Method where exception occurred

410

*/

411

void handleConsumerException(Channel channel, Throwable exception, Consumer consumer, String consumerTag, String methodName);

412

413

/**

414

* Handle exception during connection recovery

415

* @param conn - Connection being recovered

416

* @param exception - Exception that occurred

417

*/

418

void handleConnectionRecoveryException(Connection conn, Throwable exception);

419

420

/**

421

* Handle exception during channel recovery

422

* @param ch - Channel being recovered

423

* @param exception - Exception that occurred

424

*/

425

void handleChannelRecoveryException(Channel ch, Throwable exception);

426

427

/**

428

* Handle exception during topology recovery

429

* @param conn - Connection being recovered

430

* @param ch - Channel being recovered (may be null)

431

* @param exception - Exception that occurred

432

*/

433

void handleTopologyRecoveryException(Connection conn, Channel ch, TopologyRecoveryException exception);

434

}

435

```

436

437

**Usage Examples:**

438

439

```java

440

// Custom exception handler with logging and metrics

441

public class CustomExceptionHandler implements ExceptionHandler {

442

private static final Logger logger = LoggerFactory.getLogger(CustomExceptionHandler.class);

443

private final MetricsRegistry metrics;

444

445

public CustomExceptionHandler(MetricsRegistry metrics) {

446

this.metrics = metrics;

447

}

448

449

@Override

450

public void handleConsumerException(Channel channel, Throwable exception,

451

Consumer consumer, String consumerTag, String methodName) {

452

logger.error("Consumer exception in {}: {}", methodName, exception.getMessage(), exception);

453

metrics.counter("consumer.exceptions").increment();

454

455

// Optionally restart consumer

456

if (exception instanceof RuntimeException) {

457

restartConsumer(channel, consumer, consumerTag);

458

}

459

}

460

461

@Override

462

public void handleConnectionRecoveryException(Connection conn, Throwable exception) {

463

logger.error("Connection recovery failed: {}", exception.getMessage(), exception);

464

metrics.counter("connection.recovery.failures").increment();

465

466

// Send alert to monitoring system

467

alertingService.sendAlert("RabbitMQ connection recovery failed", exception);

468

}

469

470

@Override

471

public void handleTopologyRecoveryException(Connection conn, Channel ch, TopologyRecoveryException exception) {

472

logger.error("Topology recovery failed: {}", exception.getMessage(), exception);

473

metrics.counter("topology.recovery.failures").increment();

474

475

// Attempt manual topology recreation

476

scheduleTopologyRecreation(conn, ch);

477

}

478

479

// Implement other methods with appropriate logging and handling...

480

}

481

482

// Use custom exception handler

483

ConnectionFactory factory = new ConnectionFactory();

484

factory.setExceptionHandler(new CustomExceptionHandler(metricsRegistry));

485

```

486

487

### Error Handling Patterns

488

489

**Robust Consumer with Error Handling:**

490

491

```java

492

public class RobustConsumer extends DefaultConsumer {

493

private static final Logger logger = LoggerFactory.getLogger(RobustConsumer.class);

494

private final int maxRetries;

495

496

public RobustConsumer(Channel channel, int maxRetries) {

497

super(channel);

498

this.maxRetries = maxRetries;

499

}

500

501

@Override

502

public void handleDelivery(String consumerTag, Envelope envelope,

503

AMQP.BasicProperties properties, byte[] body) throws IOException {

504

try {

505

processMessage(new String(body, "UTF-8"), properties);

506

getChannel().basicAck(envelope.getDeliveryTag(), false);

507

508

} catch (Exception e) {

509

logger.error("Error processing message: {}", e.getMessage(), e);

510

handleProcessingError(envelope, properties, body, e);

511

}

512

}

513

514

private void handleProcessingError(Envelope envelope, AMQP.BasicProperties properties,

515

byte[] body, Exception error) throws IOException {

516

// Get retry count from headers

517

Map<String, Object> headers = properties.getHeaders();

518

int retryCount = headers != null && headers.containsKey("x-retry-count") ?

519

(Integer) headers.get("x-retry-count") : 0;

520

521

if (retryCount < maxRetries) {

522

// Republish for retry

523

republishForRetry(envelope, properties, body, retryCount + 1);

524

getChannel().basicAck(envelope.getDeliveryTag(), false);

525

} else {

526

// Send to dead letter queue or log as failed

527

logger.error("Message failed after {} retries, sending to DLQ", maxRetries);

528

getChannel().basicNack(envelope.getDeliveryTag(), false, false);

529

}

530

}

531

532

@Override

533

public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {

534

if (!sig.isInitiatedByApplication()) {

535

logger.error("Consumer {} received unexpected shutdown: {}", consumerTag, sig.getReason());

536

// Implement reconnection logic

537

scheduleReconnection();

538

}

539

}

540

}

541

```

542

543

**Connection Recovery with Custom Logic:**

544

545

```java

546

public class ResilientConnectionManager {

547

private ConnectionFactory factory;

548

private volatile Connection connection;

549

private final List<Channel> channels = new CopyOnWriteArrayList<>();

550

551

public ResilientConnectionManager() {

552

setupConnectionFactory();

553

}

554

555

private void setupConnectionFactory() {

556

factory = new ConnectionFactory();

557

factory.setAutomaticRecoveryEnabled(true);

558

factory.setNetworkRecoveryInterval(5000);

559

560

// Custom recovery delay handler (linear backoff, capped at 30s)

561

factory.setRecoveryDelayHandler(recoveryAttempts -> {

562

long delay = Math.min(1000L * recoveryAttempts, 30000L);

563

logger.info("Recovery attempt {}, waiting {}ms", recoveryAttempts, delay);

564

return delay;

565

});

566

567

// Custom exception handler

568

factory.setExceptionHandler(new ExceptionHandler() {

569

@Override

570

public void handleConnectionRecoveryException(Connection conn, Throwable exception) {

571

logger.error("Connection recovery failed", exception);

572

// Custom recovery logic

573

attemptManualRecovery();

574

}

575

576

// ... implement other methods

577

});

578

}

579

580

public synchronized Connection getConnection() throws IOException, TimeoutException {

581

if (connection == null || !connection.isOpen()) {

582

connection = factory.newConnection();

583

((RecoverableConnection) connection).addRecoveryListener(new RecoveryListener() {

584

@Override

585

public void handleRecovery(Recoverable recoverable) {

586

logger.info("Connection recovered successfully");

587

// Recreate channels and consumers

588

recreateChannels();

589

}

590

591

@Override

592

public void handleRecoveryStarted(Recoverable recoverable) {

593

logger.info("Connection recovery started");

594

}

595

});

596

}

597

return connection;

598

}

599

}

600

```