or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

authentication-security.md · channel-operations.md · connection-adapters.md · connection-management.md · exception-handling.md · index.md · message-properties-types.md

docs/exception-handling.md

0

# Exception Handling

1

2

Comprehensive exception hierarchy for connection, channel, and protocol errors with detailed error information and recovery patterns for robust AMQP client error handling.

3

4

## Capabilities

5

6

### Base AMQP Exceptions

7

8

Core exception classes for AMQP-related errors.

9

10

```python { .api }

11

class AMQPError(Exception):
    """Base class for all AMQP-related errors."""


class AMQPConnectionError(AMQPError):
    """Base class for connection-related errors."""


class AMQPChannelError(AMQPError):
    """Base class for channel-related errors."""

19

```

20

21

### Connection Exceptions

22

23

Exceptions related to connection establishment, maintenance, and closure.

24

25

```python { .api }

26

class ConnectionOpenAborted(AMQPConnectionError):
    """Client closed connection while opening."""


class StreamLostError(AMQPConnectionError):
    """Stream (TCP) connection lost."""


class IncompatibleProtocolError(AMQPConnectionError):
    """The protocol returned by the server is not supported."""


class AuthenticationError(AMQPConnectionError):
    """Server and client could not negotiate authentication mechanism."""


class ProbableAuthenticationError(AMQPConnectionError):
    """Client was disconnected at a connection stage indicating probable authentication error."""


class ProbableAccessDeniedError(AMQPConnectionError):
    """Client was disconnected indicating probable denial of access to virtual host."""


class NoFreeChannels(AMQPConnectionError):
    """The connection has run out of free channels."""


class ConnectionWrongStateError(AMQPConnectionError):
    """Connection is in wrong state for the requested operation."""


class ConnectionClosed(AMQPConnectionError):
    """Connection closed by broker or client."""

    def __init__(self, reply_code, reply_text):
        """
        Parameters:
        - reply_code (int): AMQP reply code for closure
        - reply_text (str): Human-readable closure reason
        """

    @property
    def reply_code(self) -> int:
        """AMQP reply code for connection closure."""

    @property
    def reply_text(self) -> str:
        """Human-readable reason for connection closure."""


class ConnectionClosedByBroker(ConnectionClosed):
    """Connection.Close from broker."""


class ConnectionClosedByClient(ConnectionClosed):
    """Connection was closed at request of Pika client."""


class ConnectionBlockedTimeout(AMQPConnectionError):
    """RabbitMQ-specific: timed out waiting for connection.unblocked."""


class AMQPHeartbeatTimeout(AMQPConnectionError):
    """Connection was dropped as result of heartbeat timeout."""

79

```

80

81

### Channel Exceptions

82

83

Exceptions related to channel operations and state management.

84

85

```python { .api }

86

class ChannelWrongStateError(AMQPChannelError):
    """Channel is in wrong state for the requested operation."""


class ChannelClosed(AMQPChannelError):
    """The channel closed by client or by broker."""

    def __init__(self, reply_code, reply_text):
        """
        Parameters:
        - reply_code (int): AMQP reply code for channel closure
        - reply_text (str): Human-readable closure reason
        """

    @property
    def reply_code(self) -> int:
        """AMQP reply code for channel closure."""

    @property
    def reply_text(self) -> str:
        """Human-readable reason for channel closure."""


class ChannelClosedByBroker(ChannelClosed):
    """Channel.Close from broker; may be passed as reason to channel's on-closed callback."""


class ChannelClosedByClient(ChannelClosed):
    """Channel closed by client upon receipt of Channel.CloseOk."""


class DuplicateConsumerTag(AMQPChannelError):
    """The consumer tag specified already exists for this channel."""


class ConsumerCancelled(AMQPChannelError):
    """Server cancelled consumer."""


class UnroutableError(AMQPChannelError):
    """Exception containing one or more unroutable messages returned by broker via Basic.Return."""

    def __init__(self, messages):
        """
        Parameters:
        - messages (list): Sequence of returned unroutable messages
        """

    @property
    def messages(self) -> list:
        """List of unroutable returned messages."""


class NackError(AMQPChannelError):
    """Exception raised when a message published in publisher-acknowledgements mode is Nack'ed by the broker."""

    def __init__(self, messages):
        """
        Parameters:
        - messages (list): Sequence of nacked messages
        """

    @property
    def messages(self) -> list:
        """List of nacked messages."""

144

```

145

146

### Protocol Exceptions

147

148

Exceptions related to AMQP protocol handling and frame processing.

149

150

```python { .api }

151

class ProtocolSyntaxError(AMQPError):
    """An unspecified protocol syntax error occurred."""


class UnexpectedFrameError(ProtocolSyntaxError):
    """Received a frame out of sequence."""


class ProtocolVersionMismatch(ProtocolSyntaxError):
    """Protocol versions did not match."""


class BodyTooLongError(ProtocolSyntaxError):
    """Received too many bytes for a message delivery."""


class InvalidFrameError(ProtocolSyntaxError):
    """Invalid frame received."""


class InvalidFieldTypeException(ProtocolSyntaxError):
    """Unsupported field kind."""


class UnsupportedAMQPFieldException(ProtocolSyntaxError):
    """Unsupported AMQP field kind."""


class InvalidChannelNumber(AMQPError):
    """An invalid channel number has been specified."""


class MethodNotImplemented(AMQPError):
    """AMQP method not implemented."""


class ShortStringTooLong(AMQPError):
    """AMQP Short String can contain up to 255 bytes."""

180

```

181

182

### Blocking Connection Exceptions

183

184

Exceptions specific to BlockingConnection usage patterns.

185

186

```python { .api }

187

class ChannelError(Exception):
    """An unspecified error occurred with the Channel."""


class ReentrancyError(Exception):
    """The requested operation would result in unsupported recursion or reentrancy."""


class DuplicateGetOkCallback(ChannelError):
    """basic_get can only be called again after the callback for the previous basic_get is executed."""

195

```

196

197

### Returned Message Types

198

199

Data structures for handling returned and nacked messages.

200

201

```python { .api }

202

class ReturnedMessage:
    """Represents an unroutable message returned by the broker."""

    @property
    def method(self):
        """Basic.Return method frame."""

    @property
    def properties(self):
        """Message properties (BasicProperties)."""

    @property
    def body(self) -> bytes:
        """Message body."""

216

```

217

218

## Usage Examples

219

220

### Basic Exception Handling

221

222

```python

223

import pika

224

import pika.exceptions

225

226

try:
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('nonexistent-host')
    )
except pika.exceptions.AMQPConnectionError as e:
    print(f"Failed to connect: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")
else:
    # Only reached when the connection was opened; release it so the
    # example does not leak a socket.
    connection.close()

234

```

235

236

### Connection Error Recovery

237

238

```python

239

import pika

240

import pika.exceptions

241

import time

242

243

def connect_with_retry(parameters, max_retries=5):
    """Open a BlockingConnection, retrying failed attempts with exponential backoff.

    Parameters:
    - parameters: Connection parameters passed to pika.BlockingConnection
    - max_retries (int): Maximum number of connection attempts

    Returns:
    - The open pika.BlockingConnection

    Raises:
    - pika.exceptions.AMQPConnectionError: if no attempt succeeded; the last
      underlying error (if any) is chained as the cause
    """
    last_error = None

    for attempt in range(max_retries):
        try:
            connection = pika.BlockingConnection(parameters)

        except pika.exceptions.AMQPConnectionError as e:
            last_error = e
            print(f"Connection attempt {attempt + 1} failed: {e}")

            # Credential and permission problems will not fix themselves,
            # so retrying only delays the failure.
            if isinstance(e, (pika.exceptions.AuthenticationError,
                              pika.exceptions.ProbableAuthenticationError)):
                print("Authentication failed - check credentials")
                break
            if isinstance(e, pika.exceptions.ProbableAccessDeniedError):
                print("Access denied - check virtual host permissions")
                break
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # Exponential backoff

        except Exception as e:
            last_error = e
            print(f"Unexpected error: {e}")
            break

        else:
            print("Connected successfully")
            return connection

    raise pika.exceptions.AMQPConnectionError(
        "Failed to connect after retries"
    ) from last_error

267

268

# Usage
parameters = pika.ConnectionParameters('localhost')
connection = connect_with_retry(parameters)
# Release the connection once the example is done with it.
connection.close()

271

```

272

273

### Channel Exception Handling

274

275

```python

276

import pika

277

import pika.exceptions

278

279

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

try:
    channel = connection.channel()

    try:
        # This might fail if queue doesn't exist
        channel.queue_delete('nonexistent_queue')

    except pika.exceptions.ChannelClosedByBroker as e:
        print(f"Broker closed channel: {e.reply_code} - {e.reply_text}")

        # Create new channel to continue
        channel = connection.channel()

    except pika.exceptions.ChannelError as e:
        print(f"Channel error: {e}")

finally:
    # Close the connection even when an unexpected error escapes above.
    if connection.is_open:
        connection.close()

296

```

297

298

### Publisher Confirms Error Handling

299

300

```python

301

import pika

302

import pika.exceptions

303

304

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

try:
    channel = connection.channel()

    # Enable publisher confirms
    channel.confirm_delivery()

    try:
        channel.basic_publish(
            exchange='nonexistent_exchange',
            routing_key='test',
            body='Hello World!',
            mandatory=True
        )
        print("Message published successfully")

    except pika.exceptions.UnroutableError as e:
        print(f"Message was unroutable: {len(e.messages)} messages returned")
        for returned_message in e.messages:
            print(f" Returned: {returned_message.body}")

    except pika.exceptions.NackError as e:
        print(f"Message was nacked: {len(e.messages)} messages")
        for nacked_message in e.messages:
            print(f" Nacked: {nacked_message.body}")

finally:
    # Close the connection even when an error other than the two handled
    # above (for example the broker closing the channel) escapes.
    if connection.is_open:
        connection.close()

330

```

331

332

### Heartbeat Timeout Handling

333

334

```python

335

import pika

336

import pika.exceptions

337

import threading

338

import time

339

340

def heartbeat_timeout_handler():
    """Connect with a short heartbeat, stall, then publish and report the error.

    The sleep stands in for a long blocking operation during which the
    client sends no heartbeats; the publish afterwards is where the
    resulting failure is expected to surface.
    """
    parameters = pika.ConnectionParameters(
        'localhost',
        heartbeat=5  # 5 second heartbeat
    )
    connection = None

    try:
        connection = pika.BlockingConnection(parameters)
        channel = connection.channel()

        # Simulate blocking operation that prevents heartbeat
        print("Simulating long operation (no heartbeats)...")
        time.sleep(10)  # This will cause heartbeat timeout

        channel.basic_publish(exchange='', routing_key='test', body='Hello')

    except pika.exceptions.AMQPHeartbeatTimeout as e:
        print(f"Heartbeat timeout occurred: {e}")
    except pika.exceptions.StreamLostError as e:
        print(f"Connection lost: {e}")
    except Exception as e:
        print(f"Other error: {e}")
    finally:
        # Release the connection if it was opened and is still open.
        if connection is not None and connection.is_open:
            connection.close()

362

363

# Run the demonstration in a worker thread, then wait for it to finish
thread = threading.Thread(target=heartbeat_timeout_handler)
thread.start()
thread.join()

367

```

368

369

### Comprehensive Error Handler

370

371

```python

372

import pika

373

import pika.exceptions

374

import logging

375

376

logging.basicConfig(level=logging.INFO)

377

logger = logging.getLogger(__name__)

378

379

class RobustConnection:
    """Wrapper around a BlockingConnection that logs errors and retries publishes.

    Attributes are set by connect(): ``connection`` holds the current
    BlockingConnection and ``channel`` the channel used for publishing.
    """

    def __init__(self, parameters):
        """
        Parameters:
        - parameters: Connection parameters passed to pika.BlockingConnection
        """
        self.parameters = parameters
        self.connection = None
        self.channel = None

    def connect(self):
        """Open a fresh connection and channel.

        Any connection left over from an earlier call is closed first so
        that reconnecting does not leak it.

        Returns:
        - bool: True when connected, False when the attempt failed (the
          reason is logged)
        """
        self.close()
        self.connection = None
        self.channel = None

        try:
            self.connection = pika.BlockingConnection(self.parameters)
            self.channel = self.connection.channel()
            logger.info("Connected successfully")
            return True

        except (pika.exceptions.AuthenticationError,
                pika.exceptions.ProbableAuthenticationError):
            logger.error("Authentication failed - check credentials")
        except pika.exceptions.ProbableAccessDeniedError:
            logger.error("Access denied - check virtual host permissions")
        except pika.exceptions.IncompatibleProtocolError:
            logger.error("Protocol version mismatch")
        except pika.exceptions.StreamLostError:
            logger.error("Network connection lost")
        except pika.exceptions.AMQPConnectionError as e:
            logger.error(f"Connection error: {e}")
        except Exception as e:
            logger.error(f"Unexpected error: {e}")

        return False

    def publish_safe(self, exchange, routing_key, body, properties=None,
                     max_attempts=3):
        """Publish a message, recovering from closed channels and connections.

        Parameters:
        - exchange (str): Exchange to publish to
        - routing_key (str): Routing key for the message
        - body: Message body
        - properties: Optional message properties
        - max_attempts (int): Upper bound on publish attempts, so that a
          broker which keeps closing the channel cannot cause endless retries

        Returns:
        - bool: True if a publish call completed without error, else False
        """
        for _ in range(max_attempts):
            if not self.channel or self.channel.is_closed:
                if not self.connect():
                    return False

            try:
                self.channel.basic_publish(
                    exchange=exchange,
                    routing_key=routing_key,
                    body=body,
                    properties=properties
                )
                return True

            except pika.exceptions.ChannelClosedByBroker as e:
                logger.error(f"Channel closed by broker: {e.reply_code} - {e.reply_text}")
                # Try to create new channel, then retry on the next pass
                try:
                    self.channel = self.connection.channel()
                except Exception:
                    return False

            except pika.exceptions.AMQPConnectionError:
                # Covers ConnectionClosed as well as lost streams and
                # heartbeat timeouts, all of which need a new connection.
                logger.error("Connection closed, attempting reconnection")
                if not self.connect():
                    return False

            except pika.exceptions.AMQPError as e:
                logger.error(f"AMQP error during publish: {e}")
                return False

            except Exception as e:
                logger.error(f"Unexpected error during publish: {e}")
                return False

        logger.error(f"Giving up publish after {max_attempts} attempts")
        return False

    def close(self):
        """Close the connection if one is open; errors are logged, not raised."""
        try:
            if self.connection and not self.connection.is_closed:
                self.connection.close()
        except Exception as e:
            logger.error(f"Error closing connection: {e}")

450

451

# Usage
parameters = pika.ConnectionParameters('localhost')
robust_conn = RobustConnection(parameters)

try:
    if robust_conn.publish_safe('', 'test_queue', 'Hello World!'):
        print("Message published successfully")
    else:
        print("Failed to publish message")
finally:
    # Always release the connection, whatever the publish outcome.
    robust_conn.close()

461

```

462

463

### Exception Information Extraction

464

465

```python

466

import pika

467

import pika.exceptions

468

469

def handle_connection_error(error):
    """Extract detailed information from connection errors.

    Parameters:
    - error: The exception to describe; a summary is printed according to
      its type
    """
    if isinstance(error, pika.exceptions.ConnectionClosed):
        print(f"Connection closed:")
        print(f" Reply code: {error.reply_code}")
        print(f" Reply text: {error.reply_text}")

        if isinstance(error, pika.exceptions.ConnectionClosedByBroker):
            print(" Closed by: Broker")
        elif isinstance(error, pika.exceptions.ConnectionClosedByClient):
            print(" Closed by: Client")

    # ProbableAuthenticationError is included so that a disconnect during
    # the authentication stage is reported as an authentication failure
    # rather than falling through to the generic branch.
    elif isinstance(error, (pika.exceptions.AuthenticationError,
                            pika.exceptions.ProbableAuthenticationError)):
        print(f"Authentication failed: {error}")

    elif isinstance(error, pika.exceptions.StreamLostError):
        print(f"Network connection lost: {error}")

    elif isinstance(error, pika.exceptions.AMQPHeartbeatTimeout):
        print(f"Heartbeat timeout: {error}")

    else:
        print(f"Other connection error: {type(error).__name__}: {error}")

493

494

# Example usage
try:
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('localhost', credentials=pika.PlainCredentials('wrong', 'creds'))
    )
except pika.exceptions.AMQPConnectionError as e:
    handle_connection_error(e)
else:
    # The credentials were accepted after all; do not leak the connection.
    connection.close()

501

```