CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-kombu

Messaging library for Python that provides an idiomatic high-level interface for AMQP and other message brokers.

Pending
Overview
Eval results
Files

docs/messaging.md

Messaging

High-level producer and consumer interfaces for publishing and receiving messages with comprehensive error handling and serialization support. The messaging API provides the primary interface for sending and receiving messages in Kombu applications.

Capabilities

Producer

Message producer for publishing messages to exchanges with serialization, compression, and delivery options.

class Producer:
    """Message producer for publishing messages to exchanges with
    serialization, compression, and delivery options.

    All mutator-style methods (declare, revive) return the Producer
    instance so calls can be chained.
    """

    def __init__(self, channel, exchange=None, routing_key='', serializer=None, compression=None, auto_declare=True, on_return=None, **kwargs):
        """
        Create message producer.

        Parameters:
        - channel: AMQP channel to use
        - exchange (Exchange): Default exchange for publishing
        - routing_key (str): Default routing key
        - serializer (str): Default serialization method
        - compression (str): Default compression method
        - auto_declare (bool): Automatically declare entities
        - on_return (callable): Callback for returned messages
        """

    def declare(self):
        """
        Declare the default exchange and any entities in auto_declare list.

        Returns:
        Producer instance for chaining
        """

    def maybe_declare(self, entity, retry=False, **retry_policy):
        """
        Declare entity if not already declared (cached).

        Parameters:
        - entity (Exchange|Queue): Entity to declare
        - retry (bool): Enable retry on failure
        - retry_policy: Retry policy parameters

        Returns:
        bool: True if entity was declared
        """

    def publish(self, body, routing_key=None, delivery_mode=None, mandatory=False, immediate=False, priority=0, content_type=None, content_encoding=None, serializer=None, headers=None, compression=None, exchange=None, retry=False, retry_policy=None, declare=None, expiration=None, timeout=None, confirm_timeout=None, **properties):
        """
        Publish message to exchange.

        Keyword overrides (routing_key, serializer, compression, exchange)
        take precedence over the defaults set in __init__ for this call only.

        Parameters:
        - body: Message body (will be serialized)
        - routing_key (str): Message routing key
        - delivery_mode (int): Delivery mode (1=transient, 2=persistent)
        - mandatory (bool): Return message if no route found
        - immediate (bool): Return message if no consumer ready
        - priority (int): Message priority (0-255)
        - content_type (str): Content type override
        - content_encoding (str): Content encoding override
        - serializer (str): Serializer override
        - headers (dict): Message headers
        - compression (str): Compression method override
        - exchange (Exchange): Exchange override
        - retry (bool): Enable retry on failure
        - retry_policy (dict): Retry policy parameters
        - declare (list): Entities to declare before publishing
        - expiration (str): Message expiration time
        - timeout (float): Operation timeout in seconds
        - confirm_timeout (float): Publisher confirmation timeout
        - **properties: Additional message properties

        Returns:
        None
        """

    def revive(self, channel):
        """
        Revive producer after connection re-establishment.

        Parameters:
        - channel: New channel to use

        Returns:
        Producer instance for chaining
        """

    def close(self):
        """Close producer and cleanup resources."""

    def release(self):
        """Release producer resources (alias for close)."""

    # Properties
    @property
    def channel(self):
        """Channel: AMQP channel"""

    @property
    def exchange(self):
        """Exchange: Default exchange"""

    @property
    def routing_key(self):
        """str: Default routing key"""

    @property
    def serializer(self):
        """str: Default serializer"""

    @property
    def compression(self):
        """str: Default compression method"""

    @property
    def auto_declare(self):
        """bool: Auto-declare flag"""

    @property
    def on_return(self):
        """callable: Basic return callback"""

Consumer

Message consumer for receiving messages from queues with callback handling, acknowledgment control, and quality of service management.

class Consumer:
    """Message consumer for receiving messages from queues with callback
    handling, acknowledgment control, and quality of service management.

    Most configuration methods return the Consumer instance so calls can
    be chained.
    """

    def __init__(self, channel, queues=None, no_ack=None, auto_declare=True, callbacks=None, on_decode_error=None, on_message=None, accept=None, prefetch_count=None, tag_prefix=None, **kwargs):
        """
        Create message consumer.

        Parameters:
        - channel: AMQP channel to use
        - queues (list): Queues to consume from
        - no_ack (bool): Disable message acknowledgments
        - auto_declare (bool): Automatically declare entities
        - callbacks (list): Message callback functions
        - on_decode_error (callable): Decode error callback
        - on_message (callable): Alternative message handler
        - accept (list): Accepted content types
        - prefetch_count (int): QoS prefetch count
        - tag_prefix (str): Consumer tag prefix
        """

    def revive(self, channel):
        """
        Revive consumer after connection re-establishment.

        Parameters:
        - channel: New channel to use

        Returns:
        Consumer instance for chaining
        """

    def declare(self):
        """
        Declare queues, exchanges and bindings.

        Returns:
        Consumer instance for chaining
        """

    def register_callback(self, callback):
        """
        Register new callback function.

        Parameters:
        - callback (callable): Function to call for each message

        Returns:
        Consumer instance for chaining
        """

    def add_queue(self, queue):
        """
        Add queue to consume from.

        Parameters:
        - queue (Queue): Queue to add

        Returns:
        Consumer instance for chaining
        """

    def consume(self, no_ack=None):
        """
        Start consuming messages from queues.

        Parameters:
        - no_ack (bool): Disable acknowledgments override

        Returns:
        Consumer instance for chaining
        """

    def cancel(self):
        """
        End all active queue consumers.

        Returns:
        Consumer instance for chaining
        """

    def cancel_by_queue(self, queue):
        """
        Cancel consumer for specific queue.

        Parameters:
        - queue (str|Queue): Queue to stop consuming

        Returns:
        Consumer instance for chaining
        """

    def consuming_from(self, queue):
        """
        Check if currently consuming from queue.

        Parameters:
        - queue (str|Queue): Queue to check

        Returns:
        bool: True if consuming from queue
        """

    def purge(self):
        """
        Purge messages from all queues.

        Returns:
        int: Total number of messages purged
        """

    def flow(self, active):
        """
        Enable/disable flow from peer.

        Parameters:
        - active (bool): Enable or disable flow

        Returns:
        Consumer instance for chaining
        """

    def qos(self, prefetch_size=0, prefetch_count=0, apply_global=False):
        """
        Set quality of service limits.

        Parameters:
        - prefetch_size (int): Prefetch window size
        - prefetch_count (int): Prefetch message count
        - apply_global (bool): Apply globally or per-consumer

        Returns:
        Consumer instance for chaining
        """

    def recover(self, requeue=False):
        """
        Redeliver unacknowledged messages.

        Parameters:
        - requeue (bool): Requeue messages to original position

        Returns:
        Consumer instance for chaining
        """

    def receive(self, body, message):
        """
        Handle received message by calling callbacks.

        Parameters:
        - body: Decoded message body
        - message (Message): Message instance

        Returns:
        None
        """

    # Properties
    @property
    def channel(self):
        """Channel: AMQP channel"""

    @property
    def queues(self):
        """list: Queues being consumed"""

    @property
    def no_ack(self):
        """bool: Automatic acknowledgment flag"""

    @property
    def auto_declare(self):
        """bool: Auto-declare entities flag"""

    @property
    def callbacks(self):
        """list: Message callback functions"""

    @property
    def on_message(self):
        """callable: Alternative message handler"""

    @property
    def on_decode_error(self):
        """callable: Decode error callback"""

    @property
    def accept(self):
        """list: Accepted content types"""

    @property
    def prefetch_count(self):
        """int: QoS prefetch count"""

Message

Base class for received messages with acknowledgment, rejection, and decoding capabilities.

class Message:
    """Base class for received messages with acknowledgment, rejection,
    and decoding capabilities.
    """

    def __init__(self, body=None, delivery_tag=None, content_type=None, content_encoding=None, delivery_info=None, properties=None, headers=None, **kwargs):
        """
        Create message instance.

        Parameters:
        - body: Raw message body
        - delivery_tag: Unique delivery identifier
        - content_type (str): Message content type
        - content_encoding (str): Content encoding
        - delivery_info (dict): Delivery information
        - properties (dict): Message properties
        - headers (dict): Message headers
        """

    def ack(self, multiple=False):
        """
        Acknowledge message processing.

        Parameters:
        - multiple (bool): Acknowledge all messages up to this one

        Raises:
        MessageStateError: If message already acknowledged
        """

    def ack_log_error(self, logger, errors, multiple=False):
        """
        Acknowledge message with error logging.

        Parameters:
        - logger: Logger instance
        - errors (tuple): Error types to catch and log
        - multiple (bool): Acknowledge multiple messages

        Returns:
        bool: True if acknowledgment succeeded
        """

    def reject(self, requeue=False):
        """
        Reject message.

        Parameters:
        - requeue (bool): Requeue message for redelivery

        Raises:
        MessageStateError: If message already acknowledged
        """

    def reject_log_error(self, logger, errors, requeue=False):
        """
        Reject message with error logging.

        Parameters:
        - logger: Logger instance
        - errors (tuple): Error types to catch and log
        - requeue (bool): Requeue message

        Returns:
        bool: True if rejection succeeded
        """

    def requeue(self):
        """
        Reject and requeue message (shortcut for reject(requeue=True)).

        Raises:
        MessageStateError: If message already acknowledged
        """

    def decode(self):
        """
        Deserialize message body (cached).

        Returns:
        Decoded message body
        """

    def _decode(self):
        """
        Force re-decode message body.

        Unlike decode(), this does not use the cached decoded value.

        Returns:
        Decoded message body
        """

    # Properties
    @property
    def acknowledged(self):
        """bool: True if message has been acknowledged"""

    @property
    def payload(self):
        """Decoded message body (cached)"""

    @property
    def body(self):
        """Raw message body"""

    @property
    def content_type(self):
        """str: Message content type"""

    @property
    def content_encoding(self):
        """str: Message content encoding"""

    @property
    def delivery_info(self):
        """dict: Delivery information"""

    @property
    def headers(self):
        """dict: Message headers"""

    @property
    def properties(self):
        """dict: Message properties"""

Usage Examples

Basic Producer Usage

from kombu import Connection, Exchange, Producer

# Declare the exchange that task messages are routed through.
tasks = Exchange('tasks', type='direct', durable=True)

with Connection('redis://localhost:6379/0') as connection:
    # Build a producer bound to the tasks exchange, defaulting to
    # JSON serialization and the 'default' routing key.
    publisher = Producer(
        connection.channel(),
        exchange=tasks,
        routing_key='default',
        serializer='json',
    )

    # Send a JSON-serialized task message with custom headers.
    publisher.publish(
        {'task': 'process_data', 'args': [1, 2, 3]},
        routing_key='high_priority',
        headers={'origin': 'web_app'},
        priority=5,
    )

    # Send raw bytes with pickle and an explicit content type.
    publisher.publish(
        b'binary data',
        routing_key='binary_task',
        serializer='pickle',
        content_type='application/x-python-serialize',
    )

Basic Consumer Usage

import socket  # needed: Connection.drain_events raises socket.timeout

from kombu import Connection, Queue, Consumer

def process_message(body, message):
    """Message processing callback.

    Sums the first two task args, then acknowledges the message on
    success or rejects-and-requeues it on failure so it is retried.
    """
    try:
        print(f"Processing: {body}")
        # Simulate work
        result = body['args'][0] + body['args'][1]
        print(f"Result: {result}")

        # Acknowledge successful processing
        message.ack()
    except Exception as exc:
        print(f"Processing failed: {exc}")
        # Reject and requeue for retry
        message.reject(requeue=True)

# Define queue
task_queue = Queue('task_queue', durable=True)

with Connection('redis://localhost:6379/0') as conn:
    # Create consumer
    consumer = Consumer(
        conn.channel(),
        queues=[task_queue],
        callbacks=[process_message],
        prefetch_count=10
    )

    # Start consuming
    consumer.consume()

    # Process messages until none arrive within the timeout.
    # The original example referenced socket.timeout without importing
    # socket, which raised NameError on the first idle second.
    while True:
        try:
            conn.drain_events(timeout=1.0)
        except socket.timeout:
            break

Advanced Producer Features

from kombu import Connection, Exchange, Queue, Producer

# Topic exchange plus a queue bound to all email.* routing keys.
notifications = Exchange('notifications', type='topic', durable=True)
email_queue = Queue('email_notifications', notifications, routing_key='email.*')

with Connection('amqp://localhost') as connection:
    publisher = Producer(connection.channel(), exchange=notifications)

    # Declare the queue as part of the publish, require a route, and
    # give the message a TTL.
    publisher.publish(
        {
            'to': 'user@example.com',
            'subject': 'Welcome!',
            'body': 'Welcome to our service'
        },
        routing_key='email.welcome',
        declare=[email_queue],  # Declare queue before publishing
        mandatory=True,   # Return if no route
        expiration='300000',  # 5 minute TTL
        headers={'priority': 'high'}
    )

    # Ship a large payload pickled and gzip-compressed.
    publisher.publish(
        {'large': 'data' * 1000},
        routing_key='email.report',
        compression='gzip',
        serializer='pickle'
    )

Advanced Consumer Features

from kombu import Connection, Queue, Consumer
import logging

logger = logging.getLogger(__name__)

def on_decode_failure(message, exc):
    """Log undecodable messages and discard them without requeueing."""
    logger.error(f"Failed to decode message: {exc}")
    # Log the raw message for debugging
    logger.error(f"Raw message body: {message.body}")
    # Reject without requeue to avoid infinite loop
    message.reject(requeue=False)

def handle_task(body, message):
    """Process a message, acknowledging or rejecting with error logging."""
    try:
        print(f"Processing message: {body}")

        # Simulate processing that might fail
        if body.get('fail'):
            raise ValueError("Simulated processing error")

        # Acknowledge successful processing
        message.ack_log_error(logger, (Exception,))

    except ValueError as exc:
        logger.error(f"Processing error: {exc}")
        # Reject and requeue for retry
        message.reject_log_error(logger, (Exception,), requeue=True)

# Two queues, one per priority level.
urgent_queue = Queue('high_priority', routing_key='high')
bulk_queue = Queue('low_priority', routing_key='low')

with Connection('redis://localhost:6379/0') as connection:
    worker = Consumer(
        connection.channel(),
        queues=[urgent_queue, bulk_queue],
        callbacks=[handle_task],
        on_decode_error=on_decode_failure,
        accept=['json', 'pickle'],  # Only accept these content types
        prefetch_count=5
    )

    # Set QoS limits
    worker.qos(prefetch_count=10, apply_global=True)

    # Start consuming
    worker.consume()

    # Drain events until interrupted, then cancel cleanly.
    try:
        while True:
            connection.drain_events(timeout=1.0)
    except KeyboardInterrupt:
        print("Shutting down...")
        worker.cancel()

Message Inspection and Handling

from kombu import Connection, Queue, Consumer

def inspect_message(body, message):
    """Inspect message metadata and dispatch to the matching handler."""

    # Dump the message metadata.
    print(f"Content type: {message.content_type}")
    print(f"Delivery info: {message.delivery_info}")
    print(f"Headers: {message.headers}")
    print(f"Properties: {message.properties}")

    # Urgent messages are processed straight away.
    if message.headers and message.headers.get('priority') == 'urgent':
        print("Processing urgent message immediately")
        process_urgent(body)
        message.ack()
        return

    # Redelivered messages take the careful path.
    if message.properties.get('redelivered'):
        print("Message was redelivered - handling carefully")
        if handle_redelivered(body):
            message.ack()
        else:
            # Dead letter or discard
            message.reject(requeue=False)
        return

    # Everything else takes the normal path.
    if process_normal(body):
        message.ack()
    else:
        message.requeue()

def process_urgent(body):
    """Urgent processing logic (stub)."""
    return True

def handle_redelivered(body):
    """Redelivered message logic (stub)."""
    return True

def process_normal(body):
    """Normal processing logic (stub)."""
    return True

inspection_queue = Queue('inspection_queue')

with Connection('redis://localhost:6379/0') as connection:
    watcher = Consumer(connection.channel(), [inspection_queue], callbacks=[inspect_message])
    watcher.consume()

    # Block until messages have been processed.
    connection.drain_events()

Producer with Return Handling

from kombu import Connection, Exchange, Producer

def on_basic_return(exception, exchange, routing_key, message):
    """Callback invoked when the broker hands back an unroutable message."""
    print(f"Message returned: {exception}")
    print(f"Exchange: {exchange}, Routing key: {routing_key}")
    print(f"Message: {message}")

    # Could implement retry logic, logging, etc.

routing_exchange = Exchange('optional_routing', type='direct')

with Connection('amqp://localhost') as connection:
    publisher = Producer(
        connection.channel(),
        exchange=routing_exchange,
        on_return=on_basic_return
    )

    # mandatory=True makes the broker return the message through the
    # on_return callback when no queue is bound to the routing key.
    publisher.publish(
        {'data': 'test'},
        routing_key='nonexistent_route',
        mandatory=True  # Return message if no queue bound
    )

Install with Tessl CLI

npx tessl i tessl/pypi-kombu

docs

compression.md

connection.md

entities.md

exceptions.md

index.md

messaging.md

mixins.md

pools.md

serialization.md

simple.md

tile.json