CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-pika

Pure Python implementation of the AMQP 0.9.1 protocol including RabbitMQ's extensions

Pending
Overview
Eval results
Files

docs/exception-handling.md

Exception Handling

Comprehensive exception hierarchy for connection, channel, and protocol errors with detailed error information and recovery patterns for robust AMQP client error handling.

Capabilities

Base AMQP Exceptions

Core exception classes for AMQP-related errors.

class AMQPError(Exception):
    """Base class for all AMQP-related errors."""

class AMQPConnectionError(AMQPError):
    """Base class for connection-related errors."""

class AMQPChannelError(AMQPError):
    """Base class for channel-related errors."""

Connection Exceptions

Exceptions related to connection establishment, maintenance, and closure.

class ConnectionOpenAborted(AMQPConnectionError):
    """Client closed connection while opening."""

class StreamLostError(AMQPConnectionError):
    """Stream (TCP) connection lost."""

class IncompatibleProtocolError(AMQPConnectionError):
    """The protocol returned by the server is not supported."""

class AuthenticationError(AMQPConnectionError):
    """Server and client could not negotiate authentication mechanism."""

class ProbableAuthenticationError(AMQPConnectionError):
    """Client was disconnected at a connection stage indicating probable authentication error."""

class ProbableAccessDeniedError(AMQPConnectionError):
    """Client was disconnected indicating probable denial of access to virtual host."""

class NoFreeChannels(AMQPConnectionError):
    """The connection has run out of free channels."""

class ConnectionWrongStateError(AMQPConnectionError):
    """Connection is in wrong state for the requested operation."""

class ConnectionClosed(AMQPConnectionError):
    """Connection closed by broker or client.

    Carries the AMQP reply code and the human-readable reply text that
    describe why the connection was closed. ConnectionClosedByBroker and
    ConnectionClosedByClient derive from this class, so catching
    ConnectionClosed handles both.
    """
    
    def __init__(self, reply_code: int, reply_text: str) -> None:
        """
        Parameters:
        - reply_code (int): AMQP reply code for closure
        - reply_text (str): Human-readable closure reason
        """
    
    @property
    def reply_code(self) -> int:
        """AMQP reply code for connection closure."""
    
    @property
    def reply_text(self) -> str:
        """Human-readable reason for connection closure."""

class ConnectionClosedByBroker(ConnectionClosed):
    """Connection.Close from broker."""

class ConnectionClosedByClient(ConnectionClosed):
    """Connection was closed at request of Pika client."""

class ConnectionBlockedTimeout(AMQPConnectionError):
    """RabbitMQ-specific: timed out waiting for connection.unblocked."""

class AMQPHeartbeatTimeout(AMQPConnectionError):
    """Connection was dropped as result of heartbeat timeout."""

Channel Exceptions

Exceptions related to channel operations and state management.

class ChannelWrongStateError(AMQPChannelError):
    """Channel is in wrong state for the requested operation."""

class ChannelClosed(AMQPChannelError):
    """The channel closed by client or by broker.

    Carries the AMQP reply code and the human-readable reply text that
    describe why the channel was closed. ChannelClosedByBroker and
    ChannelClosedByClient derive from this class, so catching ChannelClosed
    handles both.
    """
    
    def __init__(self, reply_code: int, reply_text: str) -> None:
        """
        Parameters:
        - reply_code (int): AMQP reply code for channel closure
        - reply_text (str): Human-readable closure reason
        """
    
    @property
    def reply_code(self) -> int:
        """AMQP reply code for channel closure."""
    
    @property
    def reply_text(self) -> str:
        """Human-readable reason for channel closure."""

class ChannelClosedByBroker(ChannelClosed):
    """Channel.Close from broker; may be passed as reason to channel's on-closed callback."""

class ChannelClosedByClient(ChannelClosed):
    """Channel closed by client upon receipt of Channel.CloseOk."""

class DuplicateConsumerTag(AMQPChannelError):
    """The consumer tag specified already exists for this channel."""

class ConsumerCancelled(AMQPChannelError):
    """Server cancelled consumer."""

class UnroutableError(AMQPChannelError):
    """Exception containing one or more unroutable messages returned by broker via Basic.Return.

    The returned messages are available through the ``messages`` property.
    """
    
    def __init__(self, messages: list) -> None:
        """
        Parameters:
        - messages (list): Sequence of returned unroutable messages
        """
    
    @property
    def messages(self) -> list:
        """List of unroutable returned messages."""

class NackError(AMQPChannelError):
    """Exception raised when a message published in publisher-acknowledgements mode is Nack'ed by the broker.

    The nacked messages are available through the ``messages`` property.
    """
    
    def __init__(self, messages: list) -> None:
        """
        Parameters:
        - messages (list): Sequence of nacked messages
        """
    
    @property
    def messages(self) -> list:
        """List of nacked messages."""

Protocol Exceptions

Exceptions related to AMQP protocol handling and frame processing.

class ProtocolSyntaxError(AMQPError):
    """An unspecified protocol syntax error occurred."""

class UnexpectedFrameError(ProtocolSyntaxError):
    """Received a frame out of sequence."""

class ProtocolVersionMismatch(ProtocolSyntaxError):
    """Protocol versions did not match."""

class BodyTooLongError(ProtocolSyntaxError):
    """Received too many bytes for a message delivery."""

class InvalidFrameError(ProtocolSyntaxError):
    """Invalid frame received."""

class InvalidFieldTypeException(ProtocolSyntaxError):
    """Unsupported field kind."""

class UnsupportedAMQPFieldException(ProtocolSyntaxError):
    """Unsupported AMQP field kind."""

class InvalidChannelNumber(AMQPError):
    """An invalid channel number has been specified."""

class MethodNotImplemented(AMQPError):
    """AMQP method not implemented."""

class ShortStringTooLong(AMQPError):
    """AMQP Short String can contain up to 255 bytes."""

Blocking Connection Exceptions

Exceptions specific to BlockingConnection usage patterns.

class ChannelError(Exception):
    """An unspecified error occurred with the Channel."""

class ReentrancyError(Exception):
    """The requested operation would result in unsupported recursion or reentrancy."""

class DuplicateGetOkCallback(ChannelError):
    """basic_get can only be called again after the callback for the previous basic_get is executed."""

Returned Message Types

Data structures for handling returned and nacked messages.

class ReturnedMessage:
    """Represents an unroutable message returned by the broker."""
    
    @property
    def method(self):
        """Basic.Return method frame."""
    
    @property
    def properties(self):
        """Message properties (BasicProperties)."""
    
    @property
    def body(self) -> bytes:
        """Message body."""

Usage Examples

Basic Exception Handling

import pika
import pika.exceptions

try:
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('nonexistent-host')
    )
except pika.exceptions.AMQPConnectionError as e:
    print(f"Failed to connect: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")

Connection Error Recovery

import pika
import pika.exceptions
import time

def connect_with_retry(parameters, max_retries=5):
    """Open a BlockingConnection, retrying transient failures with backoff.

    Parameters:
    - parameters: Connection parameters passed to pika.BlockingConnection
    - max_retries (int): Maximum number of connection attempts

    Returns:
    - The open pika.BlockingConnection

    Raises:
    - pika.exceptions.AMQPConnectionError: when every attempt failed, or when
      a failure that retrying cannot fix (credentials, virtual host access,
      unexpected error) was hit. The last underlying error is chained as the
      cause so the original reason is not lost.
    """
    # Bad credentials usually surface as ProbableAuthenticationError (the
    # broker simply drops the connection); AuthenticationError covers a failed
    # mechanism negotiation. Neither is fixed by retrying.
    credential_errors = (
        pika.exceptions.AuthenticationError,
        pika.exceptions.ProbableAuthenticationError,
    )
    last_error = None

    for attempt in range(max_retries):
        try:
            connection = pika.BlockingConnection(parameters)

        except pika.exceptions.AMQPConnectionError as e:
            last_error = e
            print(f"Connection attempt {attempt + 1} failed: {e}")

            if isinstance(e, credential_errors):
                print("Authentication failed - check credentials")
                break
            if isinstance(e, pika.exceptions.ProbableAccessDeniedError):
                print("Access denied - check virtual host permissions")
                break
            # Do not sleep after the final attempt; we are about to give up.
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # Exponential backoff

        except Exception as e:
            last_error = e
            print(f"Unexpected error: {e}")
            break

        else:
            print("Connected successfully")
            return connection

    raise pika.exceptions.AMQPConnectionError(
        "Failed to connect after retries"
    ) from last_error

# Usage
parameters = pika.ConnectionParameters('localhost')
connection = connect_with_retry(parameters)

Channel Exception Handling

import pika
import pika.exceptions

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

try:
    channel = connection.channel()

    try:
        # A passive declare only checks for existence: the broker answers a
        # missing queue with a 404 and closes the channel. (Deleting a missing
        # queue is not an error on RabbitMQ, so it would not demonstrate this.)
        channel.queue_declare(queue='nonexistent_queue', passive=True)

    except pika.exceptions.ChannelClosedByBroker as e:
        print(f"Broker closed channel: {e.reply_code} - {e.reply_text}")

        # The old channel is unusable once closed; create a new one to continue
        channel = connection.channel()

    except pika.exceptions.AMQPChannelError as e:
        # AMQPChannelError is the base of the channel exceptions
        # (ChannelClosed, ChannelWrongStateError, ...).
        print(f"Channel error: {e}")

finally:
    # Always release the connection, even if an unexpected error escaped.
    if connection.is_open:
        connection.close()

Publisher Confirms Error Handling

import pika
import pika.exceptions

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

try:
    channel = connection.channel()

    # Enable publisher confirms so basic_publish reports broker-side failures
    channel.confirm_delivery()

    try:
        # Publish through the default exchange to a queue that does not exist.
        # With mandatory=True the broker returns the message (Basic.Return),
        # which is raised as UnroutableError. Publishing to a non-existent
        # *exchange* would instead close the channel with a 404.
        channel.basic_publish(
            exchange='',
            routing_key='nonexistent_queue',
            body='Hello World!',
            mandatory=True
        )
        print("Message published successfully")

    except pika.exceptions.UnroutableError as e:
        print(f"Message was unroutable: {len(e.messages)} messages returned")
        for returned_message in e.messages:
            print(f"  Returned: {returned_message.body}")

    except pika.exceptions.NackError as e:
        print(f"Message was nacked: {len(e.messages)} messages")
        for nacked_message in e.messages:
            print(f"  Nacked: {nacked_message.body}")

finally:
    # Always release the connection, even if an unexpected error escaped.
    if connection.is_open:
        connection.close()

Heartbeat Timeout Handling

import pika
import pika.exceptions
import threading
import time

def heartbeat_timeout_handler():
    """Demonstrate what happens when a blocked client misses heartbeats.

    Opens a connection with a short heartbeat interval, blocks without
    servicing the connection, then tries to publish. The resulting error is
    printed, and the connection is closed on every path if it is still open.
    """
    parameters = pika.ConnectionParameters(
        'localhost',
        heartbeat=5  # 5 second heartbeat
    )

    connection = None
    try:
        connection = pika.BlockingConnection(parameters)
        channel = connection.channel()

        # Simulate blocking operation that prevents heartbeat
        print("Simulating long operation (no heartbeats)...")
        time.sleep(10)  # This will cause heartbeat timeout

        channel.basic_publish(exchange='', routing_key='test', body='Hello')

    except pika.exceptions.AMQPHeartbeatTimeout as e:
        print(f"Heartbeat timeout occurred: {e}")
    except pika.exceptions.StreamLostError as e:
        print(f"Connection lost: {e}")
    except Exception as e:
        print(f"Other error: {e}")
    finally:
        # Release the socket if the connection survived; a connection that
        # was already dropped reports is_open == False and needs no close.
        if connection is not None and connection.is_open:
            connection.close()

# Run the handler in a worker thread; join() then waits for it to finish
thread = threading.Thread(target=heartbeat_timeout_handler)
thread.start()
thread.join()

Comprehensive Error Handler

import pika
import pika.exceptions
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class RobustConnection:
    """BlockingConnection wrapper that reconnects and retries publishes.

    Holds one connection and one channel. ``publish_safe`` re-opens the
    channel or the whole connection when either has been closed and retries
    the publish a bounded number of times.
    """

    def __init__(self, parameters):
        """
        Parameters:
        - parameters: Connection parameters passed to pika.BlockingConnection
        """
        self.parameters = parameters
        self.connection = None
        self.channel = None

    def connect(self):
        """Open a fresh connection and channel.

        Any previously held connection is closed first so reconnecting does
        not leak sockets.

        Returns:
        - bool: True when connected, False when the attempt failed (the
          reason is logged).
        """
        self.close()
        self.connection = None
        self.channel = None

        try:
            self.connection = pika.BlockingConnection(self.parameters)
            self.channel = self.connection.channel()
            logger.info("Connected successfully")
            return True

        except (pika.exceptions.AuthenticationError,
                pika.exceptions.ProbableAuthenticationError):
            # Bad credentials normally surface as ProbableAuthenticationError.
            logger.error("Authentication failed - check credentials")
        except pika.exceptions.ProbableAccessDeniedError:
            logger.error("Access denied - check virtual host permissions")
        except pika.exceptions.IncompatibleProtocolError:
            logger.error("Protocol version mismatch")
        except pika.exceptions.StreamLostError:
            logger.error("Network connection lost")
        except pika.exceptions.AMQPConnectionError as e:
            logger.error("Connection error: %s", e)
        except Exception as e:
            logger.error("Unexpected error: %s", e)

        return False

    def _ensure_channel(self):
        """Make sure ``self.channel`` is usable; return True on success.

        Prefers opening a new channel on the existing connection and falls
        back to a full reconnect when the connection itself is gone.
        """
        if self.channel is not None and self.channel.is_open:
            return True

        if self.connection is not None and self.connection.is_open:
            try:
                self.channel = self.connection.channel()
                return True
            except pika.exceptions.AMQPError as e:
                logger.error("Could not open a new channel: %s", e)

        return self.connect()

    def publish_safe(self, exchange, routing_key, body, properties=None,
                     max_attempts=3):
        """Publish a message, recovering from closed channels/connections.

        Parameters:
        - exchange (str): Exchange to publish to
        - routing_key (str): Routing key for the message
        - body: Message body
        - properties: Optional message properties
        - max_attempts (int): Upper bound on publish attempts, so a broker
          that keeps rejecting the publish cannot cause endless retries

        Returns:
        - bool: True when the publish call succeeded, False otherwise.
        """
        for _ in range(max_attempts):
            if not self._ensure_channel():
                return False

            try:
                self.channel.basic_publish(
                    exchange=exchange,
                    routing_key=routing_key,
                    body=body,
                    properties=properties
                )
                return True

            except pika.exceptions.ChannelClosedByBroker as e:
                logger.error("Channel closed by broker: %s - %s",
                             e.reply_code, e.reply_text)
                # Drop the dead channel; the next iteration opens a new one.
                self.channel = None

            except pika.exceptions.AMQPConnectionError:
                # Covers ConnectionClosed as well as StreamLostError and
                # AMQPHeartbeatTimeout, which are not ConnectionClosed
                # subclasses but equally require a reconnect.
                logger.error("Connection closed, attempting reconnection")
                self.channel = None

            except pika.exceptions.AMQPError as e:
                logger.error("AMQP error during publish: %s", e)
                return False

            except Exception as e:
                logger.error("Unexpected error during publish: %s", e)
                return False

        logger.error("Giving up publish after %s attempts", max_attempts)
        return False

    def close(self):
        """Close the connection if one is open; errors are logged, not raised."""
        try:
            if self.connection and not self.connection.is_closed:
                self.connection.close()
        except Exception as e:
            logger.error("Error closing connection: %s", e)

# Usage
parameters = pika.ConnectionParameters('localhost')
robust_conn = RobustConnection(parameters)

if robust_conn.publish_safe('', 'test_queue', 'Hello World!'):
    print("Message published successfully")
else:
    print("Failed to publish message")

robust_conn.close()

Exception Information Extraction

import pika
import pika.exceptions

def handle_connection_error(error):
    """Print detailed, type-specific information about a connection error.

    Parameters:
    - error (Exception): The caught connection exception to describe
    """
    exc = pika.exceptions

    if isinstance(error, exc.ConnectionClosed):
        print("Connection closed:")
        print(f"  Reply code: {error.reply_code}")
        print(f"  Reply text: {error.reply_text}")

        if isinstance(error, exc.ConnectionClosedByBroker):
            print("  Closed by: Broker")
        elif isinstance(error, exc.ConnectionClosedByClient):
            print("  Closed by: Client")

    elif isinstance(error, (exc.AuthenticationError,
                            exc.ProbableAuthenticationError)):
        # Wrong credentials are normally reported as
        # ProbableAuthenticationError, so both types are treated as
        # authentication failures.
        print(f"Authentication failed: {error}")

    elif isinstance(error, exc.StreamLostError):
        print(f"Network connection lost: {error}")

    elif isinstance(error, exc.AMQPHeartbeatTimeout):
        print(f"Heartbeat timeout: {error}")

    else:
        print(f"Other connection error: {type(error).__name__}: {error}")

# Example usage
try:
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('localhost', credentials=pika.PlainCredentials('wrong', 'creds'))
    )
except pika.exceptions.AMQPConnectionError as e:
    handle_connection_error(e)

Install with Tessl CLI

npx tessl i tessl/pypi-pika

docs

authentication-security.md

channel-operations.md

connection-adapters.md

connection-management.md

exception-handling.md

index.md

message-properties-types.md

tile.json