Pure Python implementation of the AMQP 0.9.1 protocol, including RabbitMQ's extensions.
—
Channel-based message operations including publishing, consuming, queue and exchange management, and transaction support with comprehensive callback handling for AMQP messaging.
Basic channel lifecycle and flow control operations.
class BlockingChannel:
"""Synchronous channel for message operations."""
def close(self, reply_code=200, reply_text='Normal shutdown'):
"""
Close the channel.
Parameters:
- reply_code (int): AMQP reply code (default: 200)
- reply_text (str): Human-readable close reason
"""
def flow(self, active):
"""
Enable or disable message flow.
Parameters:
- active (bool): True to enable flow, False to disable
Returns:
- bool: Current flow state
"""
def add_on_cancel_callback(self, callback):
"""
Add callback for consumer cancellation.
Parameters:
- callback (callable): Function called when consumer is cancelled
"""
def add_on_return_callback(self, callback):
"""
Add callback for returned messages.
Parameters:
- callback (callable): Function called with (channel, method, properties, body)
"""
# Channel properties
@property
def channel_number(self) -> int:
"""Channel number."""
@property
def connection(self):
"""Parent connection instance."""
@property
def is_closed(self) -> bool:
"""True if channel is closed."""
@property
def is_open(self) -> bool:
"""True if channel is open."""
@property
def consumer_tags(self) -> set:
"""Set of active consumer tags."""

Publish messages to exchanges with routing keys and properties.
def basic_publish(self, exchange, routing_key, body, properties=None, mandatory=False):
"""
Publish a message.
Parameters:
- exchange (str): Exchange name (empty string for default exchange)
- routing_key (str): Routing key for message routing
- body (bytes or str): Message body
- properties (BasicProperties): Message properties
- mandatory (bool): If True, message must be routable or returned
Returns:
- bool: True if message was published (or raises exception)
"""

Consume messages from queues with callback-based processing.
def basic_consume(self, queue, on_message_callback, auto_ack=False, exclusive=False,
consumer_tag=None, arguments=None):
"""
Start consuming messages from queue.
Parameters:
- queue (str): Queue name to consume from
- on_message_callback (callable): Function called for each message (ch, method, properties, body)
- auto_ack (bool): If True, automatically acknowledge messages
- exclusive (bool): If True, only this consumer can access the queue
- consumer_tag (str): Consumer identifier (auto-generated if None)
- arguments (dict): Additional arguments for consume
Returns:
- str: Consumer tag
"""
def basic_cancel(self, consumer_tag):
"""
Cancel a message consumer.
Parameters:
- consumer_tag (str): Consumer tag to cancel
Returns:
- str: Cancelled consumer tag
"""
def start_consuming(self):
"""Start consuming messages (blocking loop)."""
def stop_consuming(self, consumer_tag=None):
"""
Stop consuming messages.
Parameters:
- consumer_tag (str, optional): Specific consumer to stop (all if None)
"""
def consume(self, queue, no_ack=False, exclusive=False, arguments=None):
"""
Generator-based message consumption.
Parameters:
- queue (str): Queue name to consume from
- no_ack (bool): If True, don't require acknowledgments
- exclusive (bool): If True, exclusive access to queue
- arguments (dict): Additional consume arguments
Yields:
- tuple: (method, properties, body) for each message
"""Acknowledge, reject, or recover messages for reliable delivery.
def basic_ack(self, delivery_tag, multiple=False):
"""
Acknowledge message delivery.
Parameters:
- delivery_tag (int): Delivery tag of message to acknowledge
- multiple (bool): If True, acknowledge all messages up to delivery_tag
"""
def basic_nack(self, delivery_tag, multiple=False, requeue=True):
"""
Negative acknowledgment of message delivery.
Parameters:
- delivery_tag (int): Delivery tag of message to nack
- multiple (bool): If True, nack all messages up to delivery_tag
- requeue (bool): If True, requeue the message(s)
"""
def basic_reject(self, delivery_tag, requeue=True):
"""
Reject a single message.
Parameters:
- delivery_tag (int): Delivery tag of message to reject
- requeue (bool): If True, requeue the message
"""
def basic_recover(self, requeue=True):
"""
Recover unacknowledged messages.
Parameters:
- requeue (bool): If True, requeue unacknowledged messages
"""

Get individual messages from queues without setting up consumers.
def basic_get(self, queue, auto_ack=False):
"""
Get a single message from queue.
Parameters:
- queue (str): Queue name to get message from
- auto_ack (bool): If True, automatically acknowledge message
Returns:
- tuple or None: (method, properties, body) if message available, None otherwise
"""

Control message delivery rate and prefetch behavior.
def basic_qos(self, prefetch_size=0, prefetch_count=0, global_qos=False):
"""
Set quality of service parameters.
Parameters:
- prefetch_size (int): Prefetch window size in bytes (0 = no limit)
- prefetch_count (int): Number of messages to prefetch (0 = no limit)
- global_qos (bool): If True, apply QoS globally on connection
"""

Declare, delete, purge, and bind queues.
def queue_declare(self, queue='', passive=False, durable=False, exclusive=False,
auto_delete=False, arguments=None):
"""
Declare a queue.
Parameters:
- queue (str): Queue name (empty string for server-generated name)
- passive (bool): If True, only check if queue exists
- durable (bool): If True, queue survives broker restart
- exclusive (bool): If True, queue is exclusive to this connection
- auto_delete (bool): If True, queue deletes when last consumer disconnects
- arguments (dict): Additional queue arguments
Returns:
- QueueDeclareOk: Result with queue name, message count, consumer count
"""
def queue_delete(self, queue, if_unused=False, if_empty=False):
"""
Delete a queue.
Parameters:
- queue (str): Queue name to delete
- if_unused (bool): If True, only delete if no consumers
- if_empty (bool): If True, only delete if no messages
Returns:
- QueueDeleteOk: Result with message count
"""
def queue_purge(self, queue):
"""
Purge messages from queue.
Parameters:
- queue (str): Queue name to purge
Returns:
- QueuePurgeOk: Result with purged message count
"""
def queue_bind(self, queue, exchange, routing_key=None, arguments=None):
"""
Bind queue to exchange.
Parameters:
- queue (str): Queue name to bind
- exchange (str): Exchange name to bind to
- routing_key (str): Routing key for binding
- arguments (dict): Additional binding arguments
"""
def queue_unbind(self, queue, exchange, routing_key=None, arguments=None):
"""
Unbind queue from exchange.
Parameters:
- queue (str): Queue name to unbind
- exchange (str): Exchange name to unbind from
- routing_key (str): Routing key for binding
- arguments (dict): Additional binding arguments
"""

Declare, delete, and bind exchanges.
def exchange_declare(self, exchange, exchange_type='direct', passive=False,
durable=False, auto_delete=False, internal=False, arguments=None):
"""
Declare an exchange.
Parameters:
- exchange (str): Exchange name
- exchange_type (str): Exchange type ('direct', 'fanout', 'topic', 'headers')
- passive (bool): If True, only check if exchange exists
- durable (bool): If True, exchange survives broker restart
- auto_delete (bool): If True, exchange deletes when last queue unbinds
- internal (bool): If True, exchange is internal (cannot be published to directly)
- arguments (dict): Additional exchange arguments
"""
def exchange_delete(self, exchange, if_unused=False):
"""
Delete an exchange.
Parameters:
- exchange (str): Exchange name to delete
- if_unused (bool): If True, only delete if no queue bindings
"""
def exchange_bind(self, destination, source, routing_key='', arguments=None):
"""
Bind exchange to another exchange.
Parameters:
- destination (str): Destination exchange name
- source (str): Source exchange name
- routing_key (str): Routing key for binding
- arguments (dict): Additional binding arguments
"""
def exchange_unbind(self, destination, source, routing_key='', arguments=None):
"""
Unbind exchange from another exchange.
Parameters:
- destination (str): Destination exchange name
- source (str): Source exchange name
- routing_key (str): Routing key for binding
- arguments (dict): Additional binding arguments
"""

AMQP transaction support for atomic message operations.
def tx_select(self):
"""Start a transaction."""
def tx_commit(self):
"""Commit the current transaction."""
def tx_rollback(self):
"""Rollback the current transaction."""

Enable publisher confirmations for reliable message delivery.
def confirm_delivery(self):
"""
Enable publisher confirmations.
Returns:
- bool: True if confirmations enabled
"""
@property
def publisher_confirms(self) -> bool:
"""True if publisher confirmations are enabled."""

import pika
params = pika.ConnectionParameters('localhost')
connection = pika.BlockingConnection(params)
channel = connection.channel()

# Declare the target queue as durable before publishing to it.
channel.queue_declare(queue='task_queue', durable=True)

# Publish one message through the default exchange ('').
message = "Hello World!"
persistent = pika.BasicProperties(delivery_mode=2)  # Make message persistent
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=message,
    properties=persistent,
)
connection.close()

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)


def callback(ch, method, properties, body):
    """Print one delivered message, simulate work, then acknowledge it."""
    print(f"Received {body.decode()}")
    # Simulate work
    time.sleep(1)
    # Acknowledge the message
    ch.basic_ack(delivery_tag=method.delivery_tag)


# Set up consumer
channel.basic_qos(prefetch_count=1)  # Fair dispatch
channel.basic_consume(queue='task_queue', on_message_callback=callback)
print('Waiting for messages. Press CTRL+C to exit')
channel.start_consuming()

import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Enable publisher confirms
channel.confirm_delivery()

# With mandatory=True the two failure outcomes below are reported as
# exceptions rather than as a return value.
try:
    channel.basic_publish(
        exchange='',
        routing_key='test_queue',
        body='Hello World!',
        mandatory=True,
    )
    print("Message published successfully")
except pika.exceptions.UnroutableError:
    print("Message was returned as unroutable")
except pika.exceptions.NackError:
    print("Message was nacked by broker")
connection.close()

import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='test_queue')

# Consume messages using generator
for method, properties, body in channel.consume('test_queue', auto_ack=True):
    print(f"Received: {body.decode()}")
    # Process 10 messages then stop
    if method.delivery_tag == 10:
        channel.cancel()
        break
connection.close()

import pika
params = pika.ConnectionParameters('localhost')
connection = pika.BlockingConnection(params)
channel = connection.channel()

# Durable topic exchange named 'logs'.
channel.exchange_declare(exchange='logs', exchange_type='topic', durable=True)

# Exclusive queue with a 60-second message TTL; the empty name asks the
# server to generate one, which is read back from the declare result.
declared = channel.queue_declare(
    queue='',
    exclusive=True,
    arguments={'x-message-ttl': 60000},
)
queue_name = declared.method.queue

# Bind the queue to the exchange with a routing key pattern.
channel.queue_bind(queue=queue_name, exchange='logs', routing_key='app.*.error')
connection.close()

import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Start transaction
channel.tx_select()

try:
    # Publish multiple messages in transaction
    for i in range(5):
        channel.basic_publish(
            exchange='',
            routing_key='transactional_queue',
            body=f'Message {i}',
        )
    # Commit transaction
    channel.tx_commit()
    print("All messages published successfully")
except Exception as e:
    # Rollback on error
    channel.tx_rollback()
    print(f"Transaction rolled back: {e}")
connection.close()

Install with Tessl CLI:
npx tessl i tessl/pypi-pika