Messaging library for Python that provides an idiomatic high-level interface for AMQP and other message brokers.
—
AMQP entity declarations for exchanges, queues, and bindings that define message routing topology and behavior. These entities provide the foundation for message routing, persistence, and delivery guarantees in messaging systems.
Represents AMQP exchange declarations that route messages to queues based on routing rules and exchange types.
class Exchange:
def __init__(self, name='', type='direct', channel=None, durable=True, auto_delete=False, delivery_mode=None, arguments=None, **kwargs):
"""
Create exchange declaration.
Parameters:
- name (str): Exchange name
- type (str): Exchange type ('direct', 'topic', 'fanout', 'headers')
- channel: AMQP channel to bind to
- durable (bool): Survive broker restart
- auto_delete (bool): Delete when no queues bound
- delivery_mode (int): Default delivery mode (1=transient, 2=persistent)
- arguments (dict): Additional exchange arguments
"""
def declare(self, nowait=False, passive=None, channel=None):
"""
Declare exchange on the broker.
Parameters:
- nowait (bool): Don't wait for confirmation
- passive (bool): Only check if exchange exists
- channel: Channel to use (uses bound channel if None)
Returns:
Exchange instance for chaining
"""
def bind_to(self, exchange, routing_key='', arguments=None, nowait=False, channel=None):
"""
Bind this exchange to another exchange.
Parameters:
- exchange (Exchange|str): Source exchange
- routing_key (str): Binding routing key
- arguments (dict): Binding arguments
- nowait (bool): Don't wait for confirmation
- channel: Channel to use
Returns:
Exchange instance for chaining
"""
def unbind_from(self, source, routing_key='', arguments=None, nowait=False, channel=None):
"""
Unbind exchange from source exchange.
Parameters:
- source (Exchange|str): Source exchange to unbind from
- routing_key (str): Binding routing key that was used
- arguments (dict): Binding arguments that were used
- nowait (bool): Don't wait for confirmation
- channel: Channel to use
Returns:
Exchange instance for chaining
"""
def publish(self, message, routing_key=None, mandatory=False, immediate=False, **kwargs):
"""
Publish message to this exchange.
Parameters:
- message: Message body (will be serialized)
- routing_key (str): Message routing key
- mandatory (bool): Return message if no route found
- immediate (bool): Return message if no consumer ready
- **kwargs: Additional publish arguments
Returns:
Message instance
"""
def Message(self, body, delivery_mode=None, properties=None, **kwargs):
"""
Create message bound to this exchange.
Parameters:
- body: Message body
- delivery_mode (int): Delivery mode (1=transient, 2=persistent)
- properties (dict): Message properties
- **kwargs: Additional message arguments
Returns:
Message instance
"""
def delete(self, if_unused=False, nowait=False, channel=None):
"""
Delete exchange from broker.
Parameters:
- if_unused (bool): Only delete if no queues bound
- nowait (bool): Don't wait for confirmation
- channel: Channel to use
"""
def binding(self, routing_key='', arguments=None, unbind_arguments=None):
"""
Create binding object for this exchange.
Parameters:
- routing_key (str): Binding routing key
- arguments (dict): Binding arguments
- unbind_arguments (dict): Arguments for unbinding
Returns:
binding instance
"""
# Properties
@property
def name(self):
"""str: Exchange name"""
@property
def type(self):
"""str: Exchange type"""
@property
def durable(self):
"""bool: Durability flag"""
@property
def auto_delete(self):
"""bool: Auto-delete flag"""
@property
def delivery_mode(self):
"""int: Default delivery mode"""
@property
def arguments(self):
"""dict: Additional arguments"""Represents AMQP queue declarations that store messages and define consumption parameters.
class Queue:
def __init__(self, name='', exchange=None, routing_key='', channel=None, durable=True, exclusive=False, auto_delete=False, no_ack=None, alias=None, bindings=None, on_declared=None, expires=None, message_ttl=None, max_length=None, max_length_bytes=None, max_priority=None, queue_arguments=None, binding_arguments=None, consumer_arguments=None, **kwargs):
"""
Create queue declaration.
Parameters:
- name (str): Queue name
- exchange (Exchange): Exchange to bind to
- routing_key (str): Routing key for binding
- channel: AMQP channel to bind to
- durable (bool): Survive broker restart
- exclusive (bool): Only allow one connection
- auto_delete (bool): Delete when no consumers
- no_ack (bool): Disable acknowledgments
- alias (str): Alias name for queue
- bindings (list): Additional bindings
- on_declared (callable): Callback when declared
- expires (int): Queue expiry time in ms (RabbitMQ)
- message_ttl (int): Message TTL in ms (RabbitMQ)
- max_length (int): Max queue length (RabbitMQ)
- max_length_bytes (int): Max queue size in bytes (RabbitMQ)
- max_priority (int): Max message priority (RabbitMQ)
- queue_arguments (dict): Queue-specific arguments
- binding_arguments (dict): Binding-specific arguments
- consumer_arguments (dict): Consumer-specific arguments
"""
def declare(self, nowait=False, channel=None):
"""
Declare queue and bindings on broker.
Parameters:
- nowait (bool): Don't wait for confirmation
- channel: Channel to use
Returns:
Queue instance for chaining
"""
def bind_to(self, exchange=None, routing_key=None, arguments=None, nowait=False, channel=None):
"""
Bind queue to exchange.
Parameters:
- exchange (Exchange|str): Exchange to bind to
- routing_key (str): Routing key for binding
- arguments (dict): Binding arguments
- nowait (bool): Don't wait for confirmation
- channel: Channel to use
Returns:
Queue instance for chaining
"""
def unbind_from(self, exchange, routing_key=None, arguments=None, nowait=False, channel=None):
"""
Unbind queue from exchange.
Parameters:
- exchange (Exchange|str): Exchange to unbind from
- routing_key (str): Routing key that was used
- arguments (dict): Binding arguments that were used
- nowait (bool): Don't wait for confirmation
- channel: Channel to use
Returns:
Queue instance for chaining
"""
def get(self, no_ack=None, accept=None):
"""
Poll for single message from queue.
Parameters:
- no_ack (bool): Disable acknowledgment
- accept (list): Accepted content types
Returns:
Message instance or None if queue empty
"""
def purge(self, nowait=False):
"""
Remove all ready messages from queue.
Parameters:
- nowait (bool): Don't wait for confirmation
Returns:
Number of messages purged
"""
def consume(self, consumer_tag='', callback=None, no_ack=None, nowait=False):
"""
Start consuming messages from queue.
Parameters:
- consumer_tag (str): Consumer identifier
- callback (callable): Message callback function
- no_ack (bool): Disable acknowledgments
- nowait (bool): Don't wait for confirmation
Returns:
Consumer tag
"""
def cancel(self, consumer_tag, nowait=False):
"""
Cancel queue consumer.
Parameters:
- consumer_tag (str): Consumer to cancel
- nowait (bool): Don't wait for confirmation
"""
def delete(self, if_unused=False, if_empty=False, nowait=False):
"""
Delete queue from broker.
Parameters:
- if_unused (bool): Only delete if no consumers
- if_empty (bool): Only delete if no messages
- nowait (bool): Don't wait for confirmation
Returns:
Number of messages deleted
"""
# Properties
@property
def name(self):
"""str: Queue name"""
@property
def exchange(self):
"""Exchange: Associated exchange"""
@property
def routing_key(self):
"""str: Routing/binding key"""
@property
def durable(self):
"""bool: Durability flag"""
@property
def exclusive(self):
"""bool: Exclusivity flag"""
@property
def auto_delete(self):
"""bool: Auto-delete flag"""
@property
def expires(self):
"""int: Queue expiry time"""
@property
def message_ttl(self):
"""int: Message TTL"""
@property
def max_length(self):
"""int: Maximum queue length"""
@property
def max_priority(self):
"""int: Maximum message priority"""Represents queue or exchange binding declarations that define routing relationships.
class binding:
def __init__(self, exchange=None, routing_key='', arguments=None, unbind_arguments=None):
"""
Create binding declaration.
Parameters:
- exchange (Exchange): Exchange to bind to
- routing_key (str): Routing key for binding
- arguments (dict): Binding arguments
- unbind_arguments (dict): Arguments for unbinding
"""
def declare(self, channel, nowait=False):
"""
Declare the destination exchange.
Parameters:
- channel: AMQP channel to use
- nowait (bool): Don't wait for confirmation
Returns:
binding instance for chaining
"""
def bind(self, entity, nowait=False, channel=None):
"""
Bind entity (queue/exchange) to this binding.
Parameters:
- entity (Queue|Exchange): Entity to bind
- nowait (bool): Don't wait for confirmation
- channel: Channel to use
Returns:
binding instance for chaining
"""
def unbind(self, entity, nowait=False, channel=None):
"""
Unbind entity from this binding.
Parameters:
- entity (Queue|Exchange): Entity to unbind
- nowait (bool): Don't wait for confirmation
- channel: Channel to use
Returns:
binding instance for chaining
"""
# Properties
@property
def exchange(self):
"""Exchange: Target exchange"""
@property
def routing_key(self):
"""str: Binding routing key"""
@property
def arguments(self):
"""dict: Binding arguments"""from kombu import Exchange, Queue
# Direct exchange - exact routing key match
direct_exchange = Exchange('logs', type='direct', durable=True)
# Topic exchange - pattern matching with wildcards
topic_exchange = Exchange('events', type='topic', durable=True)
# Fanout exchange - broadcast to all bound queues
fanout_exchange = Exchange('notifications', type='fanout', durable=True)
# Headers exchange - route by message headers
headers_exchange = Exchange('priority', type='headers', durable=True)from kombu import Connection, Exchange, Queue
# Define entities
task_exchange = Exchange('tasks', type='direct', durable=True)
task_queue = Queue(
'high_priority_tasks',
exchange=task_exchange,
routing_key='high',
durable=True,
message_ttl=300000, # 5 minutes
max_length=1000
)
with Connection('redis://localhost:6379/0') as conn:
channel = conn.channel()
# Declare exchange and queue
task_exchange.declare(channel=channel)
task_queue.declare(channel=channel)from kombu import Exchange, Queue, binding
# Create exchange and queue
log_exchange = Exchange('logs', type='topic', durable=True)
error_queue = Queue('error_logs', durable=True)
# Create multiple bindings for the queue
error_bindings = [
binding(log_exchange, 'app.*.error'),
binding(log_exchange, 'system.critical'),
binding(log_exchange, 'database.failure')
]
# Apply bindings to queue
error_queue.bindings = error_bindings
with Connection('amqp://localhost') as conn:
# Declare everything
error_queue.declare(channel=conn.channel())from kombu import Connection, Queue, Exchange
task_exchange = Exchange('tasks', type='direct')
task_queue = Queue('task_queue', task_exchange, routing_key='task')
with Connection('redis://localhost:6379/0') as conn:
channel = conn.channel()
task_queue = task_queue.bind(channel)
# Get single message
message = task_queue.get()
if message:
print(f"Received: {message.payload}")
message.ack()
# Purge queue
purged_count = task_queue.purge()
print(f"Purged {purged_count} messages")
# Get queue info (if supported by transport)
try:
# Some transports support queue inspection
info = channel.queue_declare(task_queue.name, passive=True)
print(f"Queue has {info.message_count} messages")
except Exception:
passfrom kombu import Exchange
# Create exchanges
source_exchange = Exchange('source', type='topic', durable=True)
destination_exchange = Exchange('destination', type='direct', durable=True)
with Connection('amqp://localhost') as conn:
channel = conn.channel()
# Declare both exchanges
source_exchange.declare(channel=channel)
destination_exchange.declare(channel=channel)
# Bind destination to source
destination_exchange.bind_to(
source_exchange,
routing_key='important.*',
channel=channel
)from kombu import Connection, Exchange, Queue
def create_user_queue(user_id):
"""Create dedicated queue for user"""
user_exchange = Exchange(f'user_{user_id}', type='direct', durable=True)
user_queue = Queue(
f'user_{user_id}_messages',
exchange=user_exchange,
routing_key='message',
durable=True,
auto_delete=True, # Clean up when user disconnects
expires=3600000 # Expire after 1 hour of inactivity
)
return user_exchange, user_queue
# Usage
with Connection('redis://localhost:6379/0') as conn:
exchange, queue = create_user_queue('12345')
# Declare entities
exchange.declare(channel=conn.channel())
queue.declare(channel=conn.channel())
# Later, when user is done
queue.delete(if_unused=True)
exchange.delete(if_unused=True)Install with Tessl CLI
npx tessl i tessl/pypi-kombu