CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-kombu

Messaging library for Python that provides an idiomatic high-level interface for AMQP and other message brokers.

Pending
Overview
Eval results
Files

entities.mddocs/

Message Entities

AMQP entity declarations for exchanges, queues, and bindings that define message routing topology and behavior. These entities provide the foundation for message routing, persistence, and delivery guarantees in messaging systems.

Capabilities

Exchange

Represents AMQP exchange declarations that route messages to queues based on routing rules and exchange types.

class Exchange:
    def __init__(self, name='', type='direct', channel=None, durable=True, auto_delete=False, delivery_mode=None, arguments=None, **kwargs):
        """
        Create exchange declaration.

        Parameters:
        - name (str): Exchange name
        - type (str): Exchange type ('direct', 'topic', 'fanout', 'headers')
        - channel: AMQP channel to bind to
        - durable (bool): Survive broker restart
        - auto_delete (bool): Delete when no queues bound
        - delivery_mode (int): Default delivery mode (1=transient, 2=persistent)
        - arguments (dict): Additional exchange arguments
        """

    def declare(self, nowait=False, passive=None, channel=None):
        """
        Declare exchange on the broker.

        Parameters:
        - nowait (bool): Don't wait for confirmation
        - passive (bool): Only check if exchange exists
        - channel: Channel to use (uses bound channel if None)

        Returns:
        Exchange instance for chaining
        """

    def bind_to(self, exchange, routing_key='', arguments=None, nowait=False, channel=None):
        """
        Bind this exchange to another exchange.

        Parameters:
        - exchange (Exchange|str): Source exchange
        - routing_key (str): Binding routing key
        - arguments (dict): Binding arguments
        - nowait (bool): Don't wait for confirmation
        - channel: Channel to use

        Returns:
        Exchange instance for chaining
        """

    def unbind_from(self, source, routing_key='', arguments=None, nowait=False, channel=None):
        """
        Unbind exchange from source exchange.

        Parameters:
        - source (Exchange|str): Source exchange to unbind from
        - routing_key (str): Binding routing key that was used
        - arguments (dict): Binding arguments that were used
        - nowait (bool): Don't wait for confirmation
        - channel: Channel to use

        Returns:
        Exchange instance for chaining
        """

    def publish(self, message, routing_key=None, mandatory=False, immediate=False, **kwargs):
        """
        Publish message to this exchange.

        Parameters:
        - message: Message body (will be serialized)
        - routing_key (str): Message routing key
        - mandatory (bool): Return message if no route found
        - immediate (bool): Return message if no consumer ready
        - **kwargs: Additional publish arguments

        Returns:
        Message instance
        """

    def Message(self, body, delivery_mode=None, properties=None, **kwargs):
        """
        Create message bound to this exchange.

        Parameters:
        - body: Message body
        - delivery_mode (int): Delivery mode (1=transient, 2=persistent)
        - properties (dict): Message properties
        - **kwargs: Additional message arguments

        Returns:
        Message instance
        """

    def delete(self, if_unused=False, nowait=False, channel=None):
        """
        Delete exchange from broker.

        Parameters:
        - if_unused (bool): Only delete if no queues bound
        - nowait (bool): Don't wait for confirmation
        - channel: Channel to use
        """

    def binding(self, routing_key='', arguments=None, unbind_arguments=None):
        """
        Create binding object for this exchange.

        Parameters:
        - routing_key (str): Binding routing key
        - arguments (dict): Binding arguments
        - unbind_arguments (dict): Arguments for unbinding

        Returns:
        binding instance
        """

    # Properties
    @property
    def name(self):
        """str: Exchange name"""

    @property  
    def type(self):
        """str: Exchange type"""

    @property
    def durable(self):
        """bool: Durability flag"""

    @property
    def auto_delete(self):
        """bool: Auto-delete flag"""

    @property
    def delivery_mode(self):
        """int: Default delivery mode"""

    @property
    def arguments(self):
        """dict: Additional arguments"""

Queue

Represents AMQP queue declarations that store messages and define consumption parameters.

class Queue:
    def __init__(self, name='', exchange=None, routing_key='', channel=None, durable=True, exclusive=False, auto_delete=False, no_ack=None, alias=None, bindings=None, on_declared=None, expires=None, message_ttl=None, max_length=None, max_length_bytes=None, max_priority=None, queue_arguments=None, binding_arguments=None, consumer_arguments=None, **kwargs):
        """
        Create queue declaration.

        Parameters:
        - name (str): Queue name
        - exchange (Exchange): Exchange to bind to
        - routing_key (str): Routing key for binding
        - channel: AMQP channel to bind to
        - durable (bool): Survive broker restart
        - exclusive (bool): Only allow one connection
        - auto_delete (bool): Delete when no consumers
        - no_ack (bool): Disable acknowledgments
        - alias (str): Alias name for queue
        - bindings (list): Additional bindings
        - on_declared (callable): Callback when declared
        - expires (int): Queue expiry time in ms (RabbitMQ)
        - message_ttl (int): Message TTL in ms (RabbitMQ)
        - max_length (int): Max queue length (RabbitMQ)
        - max_length_bytes (int): Max queue size in bytes (RabbitMQ)
        - max_priority (int): Max message priority (RabbitMQ)
        - queue_arguments (dict): Queue-specific arguments
        - binding_arguments (dict): Binding-specific arguments
        - consumer_arguments (dict): Consumer-specific arguments
        """

    def declare(self, nowait=False, channel=None):
        """
        Declare queue and bindings on broker.

        Parameters:
        - nowait (bool): Don't wait for confirmation
        - channel: Channel to use

        Returns:
        Queue instance for chaining
        """

    def bind_to(self, exchange=None, routing_key=None, arguments=None, nowait=False, channel=None):
        """
        Bind queue to exchange.

        Parameters:
        - exchange (Exchange|str): Exchange to bind to
        - routing_key (str): Routing key for binding
        - arguments (dict): Binding arguments
        - nowait (bool): Don't wait for confirmation
        - channel: Channel to use

        Returns:
        Queue instance for chaining
        """

    def unbind_from(self, exchange, routing_key=None, arguments=None, nowait=False, channel=None):
        """
        Unbind queue from exchange.

        Parameters:
        - exchange (Exchange|str): Exchange to unbind from
        - routing_key (str): Routing key that was used
        - arguments (dict): Binding arguments that were used
        - nowait (bool): Don't wait for confirmation
        - channel: Channel to use

        Returns:
        Queue instance for chaining
        """

    def get(self, no_ack=None, accept=None):
        """
        Poll for single message from queue.

        Parameters:
        - no_ack (bool): Disable acknowledgment
        - accept (list): Accepted content types

        Returns:
        Message instance or None if queue empty
        """

    def purge(self, nowait=False):
        """
        Remove all ready messages from queue.

        Parameters:
        - nowait (bool): Don't wait for confirmation

        Returns:
        Number of messages purged
        """

    def consume(self, consumer_tag='', callback=None, no_ack=None, nowait=False):
        """
        Start consuming messages from queue.

        Parameters:
        - consumer_tag (str): Consumer identifier
        - callback (callable): Message callback function
        - no_ack (bool): Disable acknowledgments
        - nowait (bool): Don't wait for confirmation

        Returns:
        Consumer tag
        """

    def cancel(self, consumer_tag, nowait=False):
        """
        Cancel queue consumer.

        Parameters:
        - consumer_tag (str): Consumer to cancel
        - nowait (bool): Don't wait for confirmation
        """

    def delete(self, if_unused=False, if_empty=False, nowait=False):
        """
        Delete queue from broker.

        Parameters:
        - if_unused (bool): Only delete if no consumers
        - if_empty (bool): Only delete if no messages
        - nowait (bool): Don't wait for confirmation

        Returns:
        Number of messages deleted
        """

    # Properties
    @property
    def name(self):
        """str: Queue name"""

    @property
    def exchange(self):
        """Exchange: Associated exchange"""

    @property
    def routing_key(self):
        """str: Routing/binding key"""

    @property
    def durable(self):
        """bool: Durability flag"""

    @property
    def exclusive(self):
        """bool: Exclusivity flag"""

    @property
    def auto_delete(self):
        """bool: Auto-delete flag"""

    @property
    def expires(self):
        """int: Queue expiry time"""

    @property
    def message_ttl(self):
        """int: Message TTL"""

    @property
    def max_length(self):
        """int: Maximum queue length"""

    @property
    def max_priority(self):
        """int: Maximum message priority"""

Binding

Represents queue or exchange binding declarations that define routing relationships.

class binding:
    def __init__(self, exchange=None, routing_key='', arguments=None, unbind_arguments=None):
        """
        Create binding declaration.

        Parameters:
        - exchange (Exchange): Exchange to bind to
        - routing_key (str): Routing key for binding
        - arguments (dict): Binding arguments
        - unbind_arguments (dict): Arguments for unbinding
        """

    def declare(self, channel, nowait=False):
        """
        Declare the destination exchange.

        Parameters:
        - channel: AMQP channel to use
        - nowait (bool): Don't wait for confirmation

        Returns:
        binding instance for chaining
        """

    def bind(self, entity, nowait=False, channel=None):
        """
        Bind entity (queue/exchange) to this binding.

        Parameters:
        - entity (Queue|Exchange): Entity to bind
        - nowait (bool): Don't wait for confirmation
        - channel: Channel to use

        Returns:
        binding instance for chaining
        """

    def unbind(self, entity, nowait=False, channel=None):
        """
        Unbind entity from this binding.

        Parameters:
        - entity (Queue|Exchange): Entity to unbind
        - nowait (bool): Don't wait for confirmation
        - channel: Channel to use

        Returns:
        binding instance for chaining
        """

    # Properties
    @property
    def exchange(self):
        """Exchange: Target exchange"""

    @property
    def routing_key(self):
        """str: Binding routing key"""

    @property
    def arguments(self):
        """dict: Binding arguments"""

Usage Examples

Exchange Types and Routing

from kombu import Exchange, Queue

# Direct exchange - exact routing key match
direct_exchange = Exchange('logs', type='direct', durable=True)

# Topic exchange - pattern matching with wildcards
topic_exchange = Exchange('events', type='topic', durable=True)

# Fanout exchange - broadcast to all bound queues
fanout_exchange = Exchange('notifications', type='fanout', durable=True)

# Headers exchange - route by message headers
headers_exchange = Exchange('priority', type='headers', durable=True)

Queue Declaration and Binding

from kombu import Connection, Exchange, Queue

# Define entities
task_exchange = Exchange('tasks', type='direct', durable=True)
task_queue = Queue(
    'high_priority_tasks',
    exchange=task_exchange,
    routing_key='high',
    durable=True,
    message_ttl=300000,  # 5 minutes
    max_length=1000
)

with Connection('redis://localhost:6379/0') as conn:
    channel = conn.channel()
    
    # Declare exchange and queue
    task_exchange.declare(channel=channel)
    task_queue.declare(channel=channel)

Multiple Bindings

from kombu import Exchange, Queue, binding

# Create exchange and queue
log_exchange = Exchange('logs', type='topic', durable=True)
error_queue = Queue('error_logs', durable=True)

# Create multiple bindings for the queue
error_bindings = [
    binding(log_exchange, 'app.*.error'),
    binding(log_exchange, 'system.critical'),
    binding(log_exchange, 'database.failure')
]

# Apply bindings to queue
error_queue.bindings = error_bindings

with Connection('amqp://localhost') as conn:
    # Declare everything
    error_queue.declare(channel=conn.channel())

Queue Operations

from kombu import Connection, Queue, Exchange

task_exchange = Exchange('tasks', type='direct')
task_queue = Queue('task_queue', task_exchange, routing_key='task')

with Connection('redis://localhost:6379/0') as conn:
    channel = conn.channel()
    task_queue = task_queue.bind(channel)
    
    # Get single message
    message = task_queue.get()
    if message:
        print(f"Received: {message.payload}")
        message.ack()
    
    # Purge queue
    purged_count = task_queue.purge()
    print(f"Purged {purged_count} messages")
    
    # Get queue info (if supported by transport)
    try:
        # Some transports support queue inspection
        info = channel.queue_declare(task_queue.name, passive=True)
        print(f"Queue has {info.message_count} messages")
    except Exception:
        pass

Exchange-to-Exchange Binding

from kombu import Exchange

# Create exchanges
source_exchange = Exchange('source', type='topic', durable=True)
destination_exchange = Exchange('destination', type='direct', durable=True)

with Connection('amqp://localhost') as conn:
    channel = conn.channel()
    
    # Declare both exchanges
    source_exchange.declare(channel=channel)
    destination_exchange.declare(channel=channel)
    
    # Bind destination to source
    destination_exchange.bind_to(
        source_exchange,
        routing_key='important.*',
        channel=channel
    )

Dynamic Queue Management

from kombu import Connection, Exchange, Queue

def create_user_queue(user_id):
    """Create dedicated queue for user"""
    user_exchange = Exchange(f'user_{user_id}', type='direct', durable=True)
    user_queue = Queue(
        f'user_{user_id}_messages',
        exchange=user_exchange,
        routing_key='message',
        durable=True,
        auto_delete=True,  # Clean up when user disconnects
        expires=3600000    # Expire after 1 hour of inactivity
    )
    return user_exchange, user_queue

# Usage
with Connection('redis://localhost:6379/0') as conn:
    exchange, queue = create_user_queue('12345')
    
    # Declare entities
    exchange.declare(channel=conn.channel())
    queue.declare(channel=conn.channel())
    
    # Later, when user is done
    queue.delete(if_unused=True)
    exchange.delete(if_unused=True)

Install with Tessl CLI

npx tessl i tessl/pypi-kombu

docs

compression.md

connection.md

entities.md

exceptions.md

index.md

messaging.md

mixins.md

pools.md

serialization.md

simple.md

tile.json