Pure Python implementation of the AMQP 0.9.1 protocol including RabbitMQ's extensions
—
Comprehensive exception hierarchy for connection, channel, and protocol errors with detailed error information and recovery patterns for robust AMQP client error handling.
Core exception classes for AMQP-related errors.
class AMQPError(Exception):
    """Base class for all AMQP-related errors.

    The connection, channel and protocol exception families listed in this
    document derive from it.
    """


class AMQPConnectionError(AMQPError):
    """Base class for connection-related errors."""


class AMQPChannelError(AMQPError):
    """Base class for channel-related errors."""

Exceptions related to connection establishment, maintenance, and closure.
class ConnectionOpenAborted(AMQPConnectionError):
    """Client closed connection while opening."""


class StreamLostError(AMQPConnectionError):
    """Stream (TCP) connection lost."""


class IncompatibleProtocolError(AMQPConnectionError):
    """The protocol returned by the server is not supported."""


class AuthenticationError(AMQPConnectionError):
    """Server and client could not negotiate authentication mechanism."""


class ProbableAuthenticationError(AMQPConnectionError):
    """Client was disconnected at a connection stage indicating probable authentication error."""


class ProbableAccessDeniedError(AMQPConnectionError):
    """Client was disconnected indicating probable denial of access to virtual host."""


class NoFreeChannels(AMQPConnectionError):
    """The connection has run out of free channels."""


class ConnectionWrongStateError(AMQPConnectionError):
    """Connection is in wrong state for the requested operation."""
class ConnectionClosed(AMQPConnectionError):
    """Connection closed by broker or client.

    Base class of ConnectionClosedByBroker and ConnectionClosedByClient.
    """

    def __init__(self, reply_code: int, reply_text: str) -> None:
        """
        Parameters:
        - reply_code (int): AMQP reply code for closure
        - reply_text (str): Human-readable closure reason
        """

    @property
    def reply_code(self) -> int:
        """AMQP reply code for connection closure."""

    @property
    def reply_text(self) -> str:
        """Human-readable reason for connection closure."""
class ConnectionClosedByBroker(ConnectionClosed):
    """Connection.Close from broker."""


class ConnectionClosedByClient(ConnectionClosed):
    """Connection was closed at request of Pika client."""


class ConnectionBlockedTimeout(AMQPConnectionError):
    """RabbitMQ-specific: timed out waiting for connection.unblocked."""


class AMQPHeartbeatTimeout(AMQPConnectionError):
    """Connection was dropped as result of heartbeat timeout."""

Exceptions related to channel operations and state management.
class ChannelWrongStateError(AMQPChannelError):
    """Channel is in wrong state for the requested operation."""
class ChannelClosed(AMQPChannelError):
    """The channel closed by client or by broker.

    Base class of ChannelClosedByBroker and ChannelClosedByClient.
    """

    def __init__(self, reply_code: int, reply_text: str) -> None:
        """
        Parameters:
        - reply_code (int): AMQP reply code for channel closure
        - reply_text (str): Human-readable closure reason
        """

    @property
    def reply_code(self) -> int:
        """AMQP reply code for channel closure."""

    @property
    def reply_text(self) -> str:
        """Human-readable reason for channel closure."""
class ChannelClosedByBroker(ChannelClosed):
    """Channel.Close from broker; may be passed as reason to channel's on-closed callback."""


class ChannelClosedByClient(ChannelClosed):
    """Channel closed by client upon receipt of Channel.CloseOk."""


class DuplicateConsumerTag(AMQPChannelError):
    """The consumer tag specified already exists for this channel."""


class ConsumerCancelled(AMQPChannelError):
    """Server cancelled consumer."""
class UnroutableError(AMQPChannelError):
    """Exception containing one or more unroutable messages returned by broker via Basic.Return."""

    def __init__(self, messages: list) -> None:
        """
        Parameters:
        - messages (list): Sequence of returned unroutable messages
        """

    @property
    def messages(self) -> list:
        """List of unroutable returned messages."""
class NackError(AMQPChannelError):
    """Exception raised when a message published in publisher-acknowledgements mode is Nack'ed by the broker."""

    def __init__(self, messages: list) -> None:
        """
        Parameters:
        - messages (list): Sequence of nacked messages
        """

    @property
    def messages(self) -> list:
        """List of nacked messages."""

Exceptions related to AMQP protocol handling and frame processing.
class ProtocolSyntaxError(AMQPError):
    """An unspecified protocol syntax error occurred."""


class UnexpectedFrameError(ProtocolSyntaxError):
    """Received a frame out of sequence."""


class ProtocolVersionMismatch(ProtocolSyntaxError):
    """Protocol versions did not match."""


class BodyTooLongError(ProtocolSyntaxError):
    """Received too many bytes for a message delivery."""


class InvalidFrameError(ProtocolSyntaxError):
    """Invalid frame received."""


class InvalidFieldTypeException(ProtocolSyntaxError):
    """Unsupported field kind."""


class UnsupportedAMQPFieldException(ProtocolSyntaxError):
    """Unsupported AMQP field kind."""


class InvalidChannelNumber(AMQPError):
    """An invalid channel number has been specified."""


class MethodNotImplemented(AMQPError):
    """AMQP method not implemented."""


class ShortStringTooLong(AMQPError):
    """AMQP Short String can contain up to 255 bytes."""

Exceptions specific to BlockingConnection usage patterns.
class ChannelError(Exception):
    """An unspecified error occurred with the Channel.

    Derives directly from Exception, so handlers for AMQPError or
    AMQPChannelError do not catch it.
    """


class ReentrancyError(Exception):
    """The requested operation would result in unsupported recursion or reentrancy."""


class DuplicateGetOkCallback(ChannelError):
    """basic_get can only be called again after the callback for the previous basic_get is executed."""

Data structures for handling returned and nacked messages.
class ReturnedMessage:
    """Represents an unroutable message returned by the broker."""

    @property
    def method(self):
        """Basic.Return method frame."""

    @property
    def properties(self):
        """Message properties (BasicProperties)."""

    @property
    def body(self) -> bytes:
        """Message body."""

import pika
import pika.exceptions
# Attempt a connection and report which category of failure occurred.
try:
    target = pika.ConnectionParameters('nonexistent-host')
    connection = pika.BlockingConnection(target)
except pika.exceptions.AMQPConnectionError as connect_error:
    # Any connection-level failure reported by pika
    print(f"Failed to connect: {connect_error}")
except Exception as other_error:
    # Anything else that is not an AMQP connection error
    print(f"Unexpected error: {other_error}")

import pika
import pika.exceptions
import time
def connect_with_retry(parameters, max_retries=5):
    """Open a BlockingConnection, retrying failed attempts with exponential backoff.

    Parameters:
    - parameters: connection parameters passed to pika.BlockingConnection
    - max_retries (int): maximum number of connection attempts

    Returns:
    - the connection object of the first successful attempt

    Raises:
    - pika.exceptions.AMQPConnectionError: when no attempt succeeded; the last
      underlying error (if any) is attached as the exception's cause
    """
    # Failures that another attempt cannot fix, with the hint shown to the user.
    # The "probable" authentication variant is treated like the definite one.
    fatal_hints = (
        (pika.exceptions.AuthenticationError,
         "Authentication failed - check credentials"),
        (pika.exceptions.ProbableAuthenticationError,
         "Authentication failed - check credentials"),
        (pika.exceptions.ProbableAccessDeniedError,
         "Access denied - check virtual host permissions"),
    )
    last_error = None
    for attempt in range(max_retries):
        try:
            connection = pika.BlockingConnection(parameters)
        except pika.exceptions.AMQPConnectionError as e:
            last_error = e
            print(f"Connection attempt {attempt + 1} failed: {e}")
            hint = next(
                (text for exc_type, text in fatal_hints if isinstance(e, exc_type)),
                None,
            )
            if hint is not None:
                print(hint)
                break
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # Exponential backoff
        except Exception as e:
            last_error = e
            print(f"Unexpected error: {e}")
            break
        else:
            print("Connected successfully")
            return connection
    # Keep the root cause visible in the traceback instead of discarding it.
    raise pika.exceptions.AMQPConnectionError(
        "Failed to connect after retries"
    ) from last_error
# Usage: connect to a broker on localhost, retrying transient failures
parameters = pika.ConnectionParameters('localhost')
connection = connect_with_retry(parameters)

import pika
import pika.exceptions
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
try:
    channel = connection.channel()
    try:
        # This might fail if queue doesn't exist
        channel.queue_delete('nonexistent_queue')
    except pika.exceptions.ChannelClosedByBroker as e:
        print(f"Broker closed channel: {e.reply_code} - {e.reply_text}")
        # Create new channel to continue
        channel = connection.channel()
    except (pika.exceptions.AMQPChannelError, pika.exceptions.ChannelError) as e:
        # ChannelError is not part of the AMQPChannelError hierarchy, so both
        # families are listed to cover every channel-level failure.
        print(f"Channel error: {e}")
finally:
    # Always release the connection, even when an unexpected error propagates.
    connection.close()

import pika
import pika.exceptions
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
try:
    channel = connection.channel()
    # Enable publisher confirms
    channel.confirm_delivery()
    try:
        channel.basic_publish(
            exchange='nonexistent_exchange',
            routing_key='test',
            body='Hello World!',
            mandatory=True
        )
        print("Message published successfully")
    except pika.exceptions.UnroutableError as e:
        print(f"Message was unroutable: {len(e.messages)} messages returned")
        for returned_message in e.messages:
            print(f" Returned: {returned_message.body}")
    except pika.exceptions.NackError as e:
        print(f"Message was nacked: {len(e.messages)} messages")
        for nacked_message in e.messages:
            print(f" Nacked: {nacked_message.body}")
    except pika.exceptions.ChannelClosedByBroker as e:
        # Publishing to an exchange that does not exist makes the broker close
        # the channel (not-found) instead of returning or nacking the message.
        print(f"Broker closed channel: {e.reply_code} - {e.reply_text}")
finally:
    # Always release the connection, even when an unexpected error propagates.
    connection.close()

import pika
import pika.exceptions
import threading
import time
def heartbeat_timeout_handler(heartbeat=5, stall_seconds=10):
    """Demonstrate what happens when a blocked client stops servicing heartbeats.

    Opens a connection to localhost with a short heartbeat interval, blocks
    without servicing the connection, then tries to publish and reports the
    resulting error.

    Parameters:
    - heartbeat (int): heartbeat interval in seconds requested for the connection
    - stall_seconds (float): how long to block before publishing
    """
    parameters = pika.ConnectionParameters(
        'localhost',
        heartbeat=heartbeat
    )
    connection = None  # stays None if the connection attempt itself fails
    try:
        connection = pika.BlockingConnection(parameters)
        channel = connection.channel()
        # Simulate blocking operation that prevents heartbeat
        print("Simulating long operation (no heartbeats)...")
        time.sleep(stall_seconds)  # This will cause heartbeat timeout
        channel.basic_publish(exchange='', routing_key='test', body='Hello')
    except pika.exceptions.AMQPHeartbeatTimeout as e:
        print(f"Heartbeat timeout occurred: {e}")
    except pika.exceptions.StreamLostError as e:
        print(f"Connection lost: {e}")
    except Exception as e:
        print(f"Other error: {e}")
    finally:
        # Release the connection if it survived (e.g. the publish succeeded).
        if connection is not None and connection.is_open:
            connection.close()
# Run in thread to prevent blocking
thread = threading.Thread(target=heartbeat_timeout_handler)
thread.start()
# Wait for the handler to finish before continuing
thread.join()

import pika
import pika.exceptions
import logging
# Configure root logging so INFO-level records and above are emitted
logging.basicConfig(level=logging.INFO)
# Module-level logger used by RobustConnection below
logger = logging.getLogger(__name__)
class RobustConnection:
    """BlockingConnection wrapper that logs failures and reports them as booleans.

    Connecting and publishing return True/False instead of raising, and
    publish_safe() makes a bounded number of attempts when the channel or
    connection is closed underneath it.
    """

    # Upper bound on publish attempts per publish_safe() call. Republishing
    # after a broker-initiated close can fail the same way every time, so the
    # retry must not be unbounded.
    MAX_PUBLISH_ATTEMPTS = 2

    def __init__(self, parameters):
        """
        Parameters:
        - parameters: connection parameters passed to pika.BlockingConnection
        """
        self.parameters = parameters
        self.connection = None
        self.channel = None

    def connect(self):
        """Open the connection and a channel.

        Returns True on success; on any failure logs the reason and returns False.
        """
        try:
            self.connection = pika.BlockingConnection(self.parameters)
            self.channel = self.connection.channel()
            logger.info("Connected successfully")
            return True
        except pika.exceptions.AuthenticationError:
            logger.error("Authentication failed - check credentials")
        except pika.exceptions.ProbableAccessDeniedError:
            logger.error("Access denied - check virtual host permissions")
        except pika.exceptions.IncompatibleProtocolError:
            logger.error("Protocol version mismatch")
        except pika.exceptions.StreamLostError:
            logger.error("Network connection lost")
        except pika.exceptions.AMQPConnectionError as e:
            logger.error("Connection error: %s", e)
        except Exception as e:
            logger.error("Unexpected error: %s", e)
        return False

    def publish_safe(self, exchange, routing_key, body, properties=None):
        """Publish a message, recovering from a closed channel or connection.

        Parameters:
        - exchange: exchange to publish to
        - routing_key: routing key for the message
        - body: message body
        - properties: optional message properties

        Returns True when the publish call completed, False otherwise.
        At most MAX_PUBLISH_ATTEMPTS publish attempts are made.
        """
        for _ in range(self.MAX_PUBLISH_ATTEMPTS):
            if not self.channel or self.channel.is_closed:
                if not self.connect():
                    return False
            try:
                self.channel.basic_publish(
                    exchange=exchange,
                    routing_key=routing_key,
                    body=body,
                    properties=properties
                )
                return True
            except pika.exceptions.ChannelClosedByBroker as e:
                logger.error("Channel closed by broker: %s - %s",
                             e.reply_code, e.reply_text)
                # Try to create new channel, then retry on the next iteration
                try:
                    self.channel = self.connection.channel()
                except Exception:
                    return False
            except pika.exceptions.ConnectionClosed:
                logger.error("Connection closed, attempting reconnection")
                if not self.connect():
                    return False
            except pika.exceptions.AMQPError as e:
                logger.error("AMQP error during publish: %s", e)
                return False
            except Exception as e:
                logger.error("Unexpected error during publish: %s", e)
                return False
        logger.error("Giving up publish after %d attempts",
                     self.MAX_PUBLISH_ATTEMPTS)
        return False

    def close(self):
        """Close the connection if it is open; errors are logged, not raised."""
        try:
            if self.connection and not self.connection.is_closed:
                self.connection.close()
        except Exception as e:
            logger.error("Error closing connection: %s", e)
# Usage: publish one message through the wrapper and report the outcome
parameters = pika.ConnectionParameters('localhost')
robust_conn = RobustConnection(parameters)
print(
    "Message published successfully"
    if robust_conn.publish_safe('', 'test_queue', 'Hello World!')
    else "Failed to publish message"
)
robust_conn.close()

import pika
import pika.exceptions
def handle_connection_error(error):
    """Extract detailed information from connection errors.

    Prints a description of *error* chosen by its exception type. For
    ConnectionClosed errors the reply code, reply text and closing side are
    shown; any unrecognized type is reported by class name.

    Parameters:
    - error: the exception instance to describe
    """
    exc = pika.exceptions
    if isinstance(error, exc.ConnectionClosed):
        print("Connection closed:")
        print(f" Reply code: {error.reply_code}")
        print(f" Reply text: {error.reply_text}")
        if isinstance(error, exc.ConnectionClosedByBroker):
            print(" Closed by: Broker")
        elif isinstance(error, exc.ConnectionClosedByClient):
            print(" Closed by: Client")
    elif isinstance(error, exc.AuthenticationError):
        print(f"Authentication failed: {error}")
    elif isinstance(error, exc.ProbableAuthenticationError):
        # Separate class from AuthenticationError: the disconnect only
        # indicates a probable authentication problem.
        print(f"Probable authentication failure: {error}")
    elif isinstance(error, exc.ProbableAccessDeniedError):
        print(f"Probable access denial for virtual host: {error}")
    elif isinstance(error, exc.StreamLostError):
        print(f"Network connection lost: {error}")
    elif isinstance(error, exc.AMQPHeartbeatTimeout):
        print(f"Heartbeat timeout: {error}")
    else:
        print(f"Other connection error: {type(error).__name__}: {error}")
# Example usage
try:
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost', credentials=pika.PlainCredentials('wrong', 'creds'))
)
except pika.exceptions.AMQPConnectionError as e:
handle_connection_error(e)Install with Tessl CLI
npx tessl i tessl/pypi-pika