0
# Exception Handling
1
2
Pika provides a comprehensive exception hierarchy covering connection, channel, and protocol errors. Each exception carries detailed error information, and the usage examples below show recovery patterns for robust AMQP client error handling.
3
4
## Capabilities
5
6
### Base AMQP Exceptions
7
8
Core exception classes for AMQP-related errors.
9
10
```python { .api }
11
class AMQPError(Exception):
    """Base class for all AMQP-related errors.

    Connection, channel and protocol exceptions all derive from this class,
    so catching it handles any of them.
    """


class AMQPConnectionError(AMQPError):
    """Base class for connection-related errors."""


class AMQPChannelError(AMQPError):
    """Base class for channel-related errors."""
19
```
20
21
### Connection Exceptions
22
23
Exceptions related to connection establishment, maintenance, and closure.
24
25
```python { .api }
26
# API signatures only: method bodies are intentionally omitted.

class ConnectionOpenAborted(AMQPConnectionError):
    """Client closed connection while opening."""


class StreamLostError(AMQPConnectionError):
    """Stream (TCP) connection lost."""


class IncompatibleProtocolError(AMQPConnectionError):
    """The protocol returned by the server is not supported."""


class AuthenticationError(AMQPConnectionError):
    """Server and client could not negotiate authentication mechanism."""


class ProbableAuthenticationError(AMQPConnectionError):
    """Client was disconnected at a connection stage indicating probable authentication error."""


class ProbableAccessDeniedError(AMQPConnectionError):
    """Client was disconnected indicating probable denial of access to virtual host."""


class NoFreeChannels(AMQPConnectionError):
    """The connection has run out of free channels."""


class ConnectionWrongStateError(AMQPConnectionError):
    """Connection is in wrong state for the requested operation."""


class ConnectionClosed(AMQPConnectionError):
    """Connection closed by broker or client.

    Common base of ConnectionClosedByBroker and ConnectionClosedByClient;
    exposes the AMQP reply code and text describing the closure.
    """

    def __init__(self, reply_code: int, reply_text: str) -> None:
        """
        Parameters:
        - reply_code (int): AMQP reply code for closure
        - reply_text (str): Human-readable closure reason
        """

    @property
    def reply_code(self) -> int:
        """AMQP reply code for connection closure."""

    @property
    def reply_text(self) -> str:
        """Human-readable reason for connection closure."""


class ConnectionClosedByBroker(ConnectionClosed):
    """Connection.Close from broker."""


class ConnectionClosedByClient(ConnectionClosed):
    """Connection was closed at request of Pika client."""


class ConnectionBlockedTimeout(AMQPConnectionError):
    """RabbitMQ-specific: timed out waiting for connection.unblocked."""


class AMQPHeartbeatTimeout(AMQPConnectionError):
    """Connection was dropped as result of heartbeat timeout."""
79
```
80
81
### Channel Exceptions
82
83
Exceptions related to channel operations and state management.
84
85
```python { .api }
86
# API signatures only: method bodies are intentionally omitted.

class ChannelWrongStateError(AMQPChannelError):
    """Channel is in wrong state for the requested operation."""


class ChannelClosed(AMQPChannelError):
    """The channel closed by client or by broker.

    Common base of ChannelClosedByBroker and ChannelClosedByClient; exposes
    the AMQP reply code and text describing the closure.
    """

    def __init__(self, reply_code: int, reply_text: str) -> None:
        """
        Parameters:
        - reply_code (int): AMQP reply code for channel closure
        - reply_text (str): Human-readable closure reason
        """

    @property
    def reply_code(self) -> int:
        """AMQP reply code for channel closure."""

    @property
    def reply_text(self) -> str:
        """Human-readable reason for channel closure."""


class ChannelClosedByBroker(ChannelClosed):
    """Channel.Close from broker; may be passed as reason to channel's on-closed callback."""


class ChannelClosedByClient(ChannelClosed):
    """Channel closed by client upon receipt of Channel.CloseOk."""


class DuplicateConsumerTag(AMQPChannelError):
    """The consumer tag specified already exists for this channel."""


class ConsumerCancelled(AMQPChannelError):
    """Server cancelled consumer."""


class UnroutableError(AMQPChannelError):
    """Exception containing one or more unroutable messages returned by broker via Basic.Return."""

    def __init__(self, messages: list) -> None:
        """
        Parameters:
        - messages (list): Sequence of returned unroutable messages
        """

    @property
    def messages(self) -> list:
        """List of unroutable returned messages."""


class NackError(AMQPChannelError):
    """Exception raised when a message published in publisher-acknowledgements mode is Nack'ed by the broker."""

    def __init__(self, messages: list) -> None:
        """
        Parameters:
        - messages (list): Sequence of nacked messages
        """

    @property
    def messages(self) -> list:
        """List of nacked messages."""
144
```
145
146
### Protocol Exceptions
147
148
Exceptions related to AMQP protocol handling and frame processing.
149
150
```python { .api }
151
class ProtocolSyntaxError(AMQPError):
    """An unspecified protocol syntax error occurred.

    Base class for the frame- and field-level protocol errors below.
    """


class UnexpectedFrameError(ProtocolSyntaxError):
    """Received a frame out of sequence."""


class ProtocolVersionMismatch(ProtocolSyntaxError):
    """Protocol versions did not match."""


class BodyTooLongError(ProtocolSyntaxError):
    """Received too many bytes for a message delivery."""


class InvalidFrameError(ProtocolSyntaxError):
    """Invalid frame received."""


class InvalidFieldTypeException(ProtocolSyntaxError):
    """Unsupported field kind."""


class UnsupportedAMQPFieldException(ProtocolSyntaxError):
    """Unsupported AMQP field kind."""


# The remaining errors derive directly from AMQPError rather than from
# ProtocolSyntaxError.

class InvalidChannelNumber(AMQPError):
    """An invalid channel number has been specified."""


class MethodNotImplemented(AMQPError):
    """AMQP method not implemented."""


class ShortStringTooLong(AMQPError):
    """AMQP Short String can contain up to 255 bytes."""
180
```
181
182
### Blocking Connection Exceptions
183
184
Exceptions specific to BlockingConnection usage patterns.
185
186
```python { .api }
187
# Note: these classes derive from Exception directly, so an
# `except AMQPError` handler does not catch them.

class ChannelError(Exception):
    """An unspecified error occurred with the Channel."""


class ReentrancyError(Exception):
    """The requested operation would result in unsupported recursion or reentrancy."""


class DuplicateGetOkCallback(ChannelError):
    """basic_get can only be called again after the callback for the previous basic_get is executed."""
195
```
196
197
### Returned Message Types
198
199
Data structures for handling returned and nacked messages.
200
201
```python { .api }
202
class ReturnedMessage:
    """Represents an unroutable message returned by the broker.

    API signature only: the accessors below describe the available
    attributes; their bodies are intentionally omitted.
    """

    @property
    def method(self):
        """Basic.Return method frame."""

    @property
    def properties(self):
        """Message properties (BasicProperties)."""

    @property
    def body(self) -> bytes:
        """Message body."""
216
```
217
218
## Usage Examples
219
220
### Basic Exception Handling
221
222
```python
223
import pika
import pika.exceptions

try:
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('nonexistent-host')
    )
except pika.exceptions.AMQPConnectionError as e:
    # Any connection-level AMQP failure (refused, auth, protocol, ...).
    print(f"Failed to connect: {e}")
except Exception as e:
    # NOTE(review): depending on the Pika version, a host name that cannot be
    # resolved may surface as a plain socket error rather than an
    # AMQPConnectionError - confirm; the generic handler covers that case.
    print(f"Unexpected error: {e}")
234
```
235
236
### Connection Error Recovery
237
238
```python
239
import time

import pika
import pika.exceptions


def connect_with_retry(parameters, max_retries=5):
    """Open a BlockingConnection, retrying transient failures with backoff.

    Parameters:
    - parameters: Connection parameters passed to pika.BlockingConnection
    - max_retries (int): Maximum number of connection attempts

    Returns:
    - The open pika.BlockingConnection

    Raises:
    - pika.exceptions.AMQPConnectionError: when no attempt succeeded; the
      error that ended the retries is attached as ``__cause__``
    """
    last_error = None

    for attempt in range(max_retries):
        try:
            connection = pika.BlockingConnection(parameters)
        except pika.exceptions.AMQPConnectionError as e:
            last_error = e
            print(f"Connection attempt {attempt + 1} failed: {e}")

            # Credential and permission problems cannot be fixed by retrying,
            # so give up immediately instead of backing off.
            if isinstance(e, (pika.exceptions.AuthenticationError,
                              pika.exceptions.ProbableAuthenticationError)):
                print("Authentication failed - check credentials")
                break
            if isinstance(e, pika.exceptions.ProbableAccessDeniedError):
                print("Access denied - check virtual host permissions")
                break
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # Exponential backoff
        except Exception as e:
            last_error = e
            print(f"Unexpected error: {e}")
            break
        else:
            print("Connected successfully")
            return connection

    # Chain the underlying error so the root cause is not lost.
    raise pika.exceptions.AMQPConnectionError(
        "Failed to connect after retries"
    ) from last_error


# Usage
parameters = pika.ConnectionParameters('localhost')
connection = connect_with_retry(parameters)
271
```
272
273
### Channel Exception Handling
274
275
```python
276
import pika
import pika.exceptions

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

try:
    channel = connection.channel()

    try:
        # A passive declare only checks for existence; the broker closes the
        # channel with 404 NOT_FOUND when the queue doesn't exist.
        channel.queue_declare('nonexistent_queue', passive=True)

    except pika.exceptions.ChannelClosedByBroker as e:
        print(f"Broker closed channel: {e.reply_code} - {e.reply_text}")

        # The closed channel cannot be reused; create new channel to continue
        channel = connection.channel()

    except pika.exceptions.AMQPChannelError as e:
        # Any other channel-level AMQP error (wrong state, closed by client, ...)
        print(f"Channel error: {e}")

finally:
    # Always release the connection, even if an unexpected error propagates.
    if connection.is_open:
        connection.close()
296
```
297
298
### Publisher Confirms Error Handling
299
300
```python
301
import pika
import pika.exceptions

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

try:
    channel = connection.channel()

    # Enable publisher confirms
    channel.confirm_delivery()

    try:
        # Publish through the default exchange to a routing key with no queue
        # behind it: with mandatory=True the broker returns the message, which
        # surfaces as UnroutableError. (Publishing to an exchange that does not
        # exist would instead close the channel with 404 NOT_FOUND.)
        channel.basic_publish(
            exchange='',
            routing_key='nonexistent_queue',
            body='Hello World!',
            mandatory=True
        )
        print("Message published successfully")

    except pika.exceptions.UnroutableError as e:
        print(f"Message was unroutable: {len(e.messages)} messages returned")
        for returned_message in e.messages:
            print(f"  Returned: {returned_message.body}")

    except pika.exceptions.NackError as e:
        print(f"Message was nacked: {len(e.messages)} messages")
        for nacked_message in e.messages:
            print(f"  Nacked: {nacked_message.body}")

finally:
    # Always release the connection, even if an unexpected error propagates.
    if connection.is_open:
        connection.close()
330
```
331
332
### Heartbeat Timeout Handling
333
334
```python
335
import threading
import time

import pika
import pika.exceptions


def heartbeat_timeout_handler():
    """Demonstrate how a missed heartbeat surfaces as an exception.

    Opens a connection with a 5 second heartbeat, blocks for 10 seconds
    without servicing the connection, then publishes and reports whichever
    connection error results.
    """
    parameters = pika.ConnectionParameters(
        'localhost',
        heartbeat=5  # 5 second heartbeat
    )
    connection = None

    try:
        connection = pika.BlockingConnection(parameters)
        channel = connection.channel()

        # Simulate blocking operation that prevents heartbeat
        print("Simulating long operation (no heartbeats)...")
        # Sleeping longer than the heartbeat interval without servicing the
        # connection is expected to make the broker drop it.
        time.sleep(10)

        channel.basic_publish(exchange='', routing_key='test', body='Hello')

    except pika.exceptions.AMQPHeartbeatTimeout as e:
        print(f"Heartbeat timeout occurred: {e}")
    except pika.exceptions.StreamLostError as e:
        print(f"Connection lost: {e}")
    except Exception as e:
        print(f"Other error: {e}")
    finally:
        # Release the connection if it survived; closing a dead stream can
        # itself fail, which must not mask the outcome reported above.
        if connection is not None and connection.is_open:
            try:
                connection.close()
            except pika.exceptions.AMQPError as e:
                print(f"Error while closing connection: {e}")


# Run the demo in a worker thread; join() waits until it has finished.
thread = threading.Thread(target=heartbeat_timeout_handler)
thread.start()
thread.join()
367
```
368
369
### Comprehensive Error Handler
370
371
```python
372
import logging

import pika
import pika.exceptions

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class RobustConnection:
    """BlockingConnection wrapper that logs errors and recovers when it can.

    Holds one connection and one channel. ``publish_safe`` re-opens the
    channel or the whole connection as needed and reports success as a
    boolean instead of raising.
    """

    def __init__(self, parameters):
        """
        Parameters:
        - parameters: Connection parameters passed to pika.BlockingConnection
        """
        self.parameters = parameters
        self.connection = None
        self.channel = None

    def connect(self):
        """Open a fresh connection and channel.

        Returns:
        - bool: True when both connection and channel are open, False
          otherwise (the failure is logged)
        """
        # Close any previous connection first so it is not leaked.
        self.close()

        try:
            self.connection = pika.BlockingConnection(self.parameters)
            self.channel = self.connection.channel()
        except (pika.exceptions.AuthenticationError,
                pika.exceptions.ProbableAuthenticationError):
            logger.error("Authentication failed - check credentials")
        except pika.exceptions.ProbableAccessDeniedError:
            logger.error("Access denied - check virtual host permissions")
        except pika.exceptions.IncompatibleProtocolError:
            logger.error("Protocol version mismatch")
        except pika.exceptions.StreamLostError:
            logger.error("Network connection lost")
        except pika.exceptions.AMQPConnectionError as e:
            logger.error("Connection error: %s", e)
        except Exception as e:
            logger.error("Unexpected error: %s", e)
        else:
            logger.info("Connected successfully")
            return True

        # The connection may have opened before channel() failed.
        self.close()
        return False

    def _ensure_channel(self):
        """Make sure an open channel is available; return True on success."""
        if self.channel is not None and self.channel.is_open:
            return True

        # Only the channel is gone: reuse the live connection instead of
        # reconnecting (which would abandon a still-open connection).
        if self.connection is not None and self.connection.is_open:
            try:
                self.channel = self.connection.channel()
                return True
            except pika.exceptions.AMQPError as e:
                logger.error("Could not open channel: %s", e)

        return self.connect()

    def publish_safe(self, exchange, routing_key, body, properties=None,
                     max_retries=3):
        """Publish a message, recovering from closed channels/connections.

        Parameters:
        - exchange (str): Exchange to publish to
        - routing_key (str): Routing key for the message
        - body: Message body
        - properties: Optional message properties
        - max_retries (int): Retries after the first attempt; bounds the
          recovery loop so a persistently failing publish cannot loop forever

        Returns:
        - bool: True if basic_publish completed without error, else False
        """
        attempts = max_retries + 1

        for _ in range(attempts):
            if not self._ensure_channel():
                return False

            try:
                self.channel.basic_publish(
                    exchange=exchange,
                    routing_key=routing_key,
                    body=body,
                    properties=properties
                )
                return True

            except pika.exceptions.ChannelClosedByBroker as e:
                # Next iteration opens a new channel and retries.
                logger.error("Channel closed by broker: %s - %s",
                             e.reply_code, e.reply_text)

            except pika.exceptions.ConnectionClosed:
                # Next iteration reconnects and retries.
                logger.error("Connection closed, attempting reconnection")

            except pika.exceptions.AMQPError as e:
                logger.error("AMQP error during publish: %s", e)
                return False

            except Exception as e:
                logger.error("Unexpected error during publish: %s", e)
                return False

        logger.error("Giving up publish after %d attempts", attempts)
        return False

    def close(self):
        """Close the connection if it is open; errors are logged, not raised."""
        try:
            if self.connection and not self.connection.is_closed:
                self.connection.close()
        except Exception as e:
            logger.error("Error closing connection: %s", e)


# Usage
parameters = pika.ConnectionParameters('localhost')
robust_conn = RobustConnection(parameters)

if robust_conn.publish_safe('', 'test_queue', 'Hello World!'):
    print("Message published successfully")
else:
    print("Failed to publish message")

robust_conn.close()
461
```
462
463
### Exception Information Extraction
464
465
```python
466
import pika
import pika.exceptions


def handle_connection_error(error):
    """Extract detailed information from connection errors.

    Parameters:
    - error (Exception): The connection error to describe

    Prints a description chosen by the error's type; ConnectionClosed errors
    additionally report their reply code, reply text and who closed the
    connection.
    """
    if isinstance(error, pika.exceptions.ConnectionClosed):
        print("Connection closed:")
        print(f"  Reply code: {error.reply_code}")
        print(f"  Reply text: {error.reply_text}")

        if isinstance(error, pika.exceptions.ConnectionClosedByBroker):
            print("  Closed by: Broker")
        elif isinstance(error, pika.exceptions.ConnectionClosedByClient):
            print("  Closed by: Client")

    elif isinstance(error, pika.exceptions.AuthenticationError):
        print(f"Authentication failed: {error}")

    elif isinstance(error, pika.exceptions.ProbableAuthenticationError):
        # Raised when the broker drops the connection during the
        # authentication stage, e.g. for rejected credentials.
        print(f"Probable authentication failure: {error}")

    elif isinstance(error, pika.exceptions.ProbableAccessDeniedError):
        print(f"Probable access denial to virtual host: {error}")

    elif isinstance(error, pika.exceptions.StreamLostError):
        print(f"Network connection lost: {error}")

    elif isinstance(error, pika.exceptions.AMQPHeartbeatTimeout):
        print(f"Heartbeat timeout: {error}")

    else:
        print(f"Other connection error: {type(error).__name__}: {error}")


# Example usage
try:
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('localhost', credentials=pika.PlainCredentials('wrong', 'creds'))
    )
except pika.exceptions.AMQPConnectionError as e:
    handle_connection_error(e)
501
```