CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-kombu

Messaging library for Python that provides an idiomatic high-level interface for AMQP and other message brokers.

Pending
Overview
Eval results
Files

docs/mixins.md

Consumer Mixins

Ready-to-use consumer frameworks that provide structured approaches for building consumer applications with connection management and error handling. The mixin classes offer a robust foundation for building long-running consumer services.

Capabilities

ConsumerMixin

Convenience mixin for implementing consumer programs with automatic connection management, error handling, and graceful shutdown support.

class ConsumerMixin:
    """Convenience mixin for implementing consumer programs.

    Provides automatic connection management, error handling, and
    graceful shutdown support; subclasses only need to implement
    get_consumers().
    """

    # Maximum connection retry attempts (default: 5).
    # Plain class attribute (not a read-only property) so callers can
    # override it per instance.
    connect_max_retries = 5

    # Flag to stop the consumer: set to True to stop.  Declared as a
    # plain class attribute rather than a read-only property so callers
    # can assign it, e.g. ``worker.should_stop = True`` as shown in the
    # graceful-shutdown usage example below.
    should_stop = False

    def get_consumers(self, Consumer, channel):
        """
        Abstract method that must be implemented by subclasses.

        Should return a list of Consumer instances.

        Parameters:
        - Consumer (class): Consumer class to instantiate
        - channel: AMQP channel to use

        Returns:
        list: List of Consumer instances

        Raises:
        NotImplementedError: always, unless overridden by a subclass.
        """
        # Fail loudly instead of silently returning None when a subclass
        # forgets to implement this required hook.
        raise NotImplementedError(
            'get_consumers() must be implemented by subclasses')

    def run(self, _tokens=1, **kwargs):
        """
        Main run loop that handles connections and consumers.

        Parameters:
        - _tokens (int): Number of times to restart on connection failure
        - **kwargs: Additional arguments passed to consume()

        Returns:
        None
        """

    def consume(self, limit=None, timeout=None, safety_interval=1, **kwargs):
        """
        Consume messages from queues.

        Parameters:
        - limit (int): Maximum number of messages to process
        - timeout (float): Timeout for each drain_events call
        - safety_interval (float): Sleep interval between iterations
        - **kwargs: Additional consume arguments

        Returns:
        None
        """

    def on_connection_error(self, exc, interval):
        """
        Called when connection error occurs.

        Parameters:
        - exc (Exception): Connection exception
        - interval (float): Sleep interval before retry

        Override to customize error handling.
        """

    def on_connection_revived(self):
        """
        Called when connection is re-established after failure.

        Override to perform cleanup or reinitialization.
        """

    def on_consume_ready(self, connection, channel, consumers, **kwargs):
        """
        Called when consumers are ready to process messages.

        Parameters:
        - connection: Connection instance
        - channel: AMQP channel
        - consumers (list): List of Consumer instances
        - **kwargs: Additional context

        Override to perform setup before message processing.
        """

    def on_consume_end(self, connection, channel):
        """
        Called when consume loop ends.

        Parameters:
        - connection: Connection instance
        - channel: AMQP channel

        Override to perform cleanup after message processing.
        """

    def on_iteration(self):
        """
        Called on each iteration of the consume loop.

        Override to perform periodic tasks.
        """

    def on_decode_error(self, message, exc):
        """
        Called when message decode error occurs.

        Parameters:
        - message (Message): Message that failed to decode
        - exc (Exception): Decode exception

        Override to handle decode errors. Default rejects message.
        """

    def extra_context(self, connection, channel):
        """
        Extra context manager for consume loop.

        Parameters:
        - connection: Connection instance
        - channel: AMQP channel

        Returns:
        Context manager or None

        Override to provide additional context management.
        """

ConsumerProducerMixin

Consumer and Producer mixin that provides separate producer connection for publishing messages while consuming, preventing deadlocks and improving performance.

class ConsumerProducerMixin(ConsumerMixin):
    """Consumer mixin that also maintains a dedicated producer connection.

    Publishing on the same connection that is consuming can deadlock;
    this mixin lazily creates a second, separate connection used only
    for publishing, so messages can be published from inside consumer
    callbacks safely.
    """

    def get_consumers(self, Consumer, channel):
        """
        Abstract method inherited from ConsumerMixin.

        Parameters:
        - Consumer (class): Consumer class to instantiate
        - channel: AMQP channel to use

        Returns:
        list: List of Consumer instances

        Must be implemented by subclasses.
        """

    @property
    def producer(self):
        """
        Producer instance with separate connection.

        Returns:
        Producer: Producer for publishing messages

        Automatically creates and manages separate connection.
        """

    @property
    def producer_connection(self):
        """
        Separate connection for producer.

        Returns:
        Connection: Producer connection instance

        Automatically created and managed.
        """

    # Inherits all other methods (run, consume, error hooks) from ConsumerMixin

Usage Examples

Basic ConsumerMixin Implementation

from kombu import Connection, Queue, Exchange
from kombu.mixins import ConsumerMixin

class TaskWorker(ConsumerMixin):
    """Consume task messages and dispatch them by their 'type' field."""

    def __init__(self, connection, queues):
        self.connection = connection
        self.queues = queues

    def get_consumers(self, Consumer, channel):
        # A single consumer covering every configured queue.
        consumer = Consumer(
            queues=self.queues,
            callbacks=[self.process_message],
            accept=['json', 'pickle']
        )
        return [consumer]

    def process_message(self, body, message):
        """Handle one task message: dispatch, then ack or requeue."""
        print(f"Processing task: {body}")

        try:
            # Route the task to the handler registered for its type.
            task_type = body.get('type')
            dispatch = {
                'email': self.send_email,
                'report': self.generate_report,
            }
            handler = dispatch.get(task_type)
            if handler is not None:
                handler(body)
            else:
                print(f"Unknown task type: {task_type}")

            message.ack()
            print(f"Task completed: {body.get('id')}")

        except Exception as exc:
            # Requeue so the task can be retried by another worker.
            print(f"Task failed: {exc}")
            message.reject(requeue=True)

    def send_email(self, task):
        # Email sending logic
        print(f"Sending email: {task}")

    def generate_report(self, task):
        # Report generation logic
        print(f"Generating report: {task}")

# Usage
if __name__ == '__main__':
    # Declare the broker-side routing pieces up front.
    exchange = Exchange('tasks', type='direct', durable=True)
    queue = Queue(
        'task_queue',
        exchange=exchange,
        routing_key='task',
        durable=True,
    )

    # Run the worker until the user interrupts it.
    with Connection('redis://localhost:6379/0') as connection:
        worker = TaskWorker(connection, [queue])
        try:
            worker.run()
        except KeyboardInterrupt:
            print('Stopping worker...')

Advanced ConsumerMixin with Error Handling

from kombu import Connection, Queue, Exchange
from kombu.mixins import ConsumerMixin
import logging
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class RobustWorker(ConsumerMixin):
    """Consumer that tracks throughput, logs lifecycle events, and retries
    failed messages a bounded number of times before rejecting them."""

    # Maximum redelivery attempts before a message is dropped.
    MAX_RETRIES = 3

    def __init__(self, connection, queues):
        self.connection = connection
        self.queues = queues
        self.processed_count = 0
        self.error_count = 0
        # Last milestone reported by on_iteration(); prevents the same
        # "every 100 messages" stats line from being logged on every
        # loop pass while processed_count sits at a multiple of 100.
        self._last_logged_count = 0

    def get_consumers(self, Consumer, channel):
        """Consume JSON messages from all configured queues."""
        return [
            Consumer(
                queues=self.queues,
                callbacks=[self.process_message],
                prefetch_count=10,  # cap unacked messages in flight
                accept=['json']
            )
        ]

    def process_message(self, body, message):
        """Process one message, acking on success and retrying on failure.

        Parameters:
        - body (dict): decoded message payload
        - message (Message): kombu message, used for ack/reject
        """
        start_time = time.time()

        try:
            logger.info(f"Processing message: {body.get('id')}")

            # Simulate variable processing time
            processing_time = body.get('processing_time', 1.0)
            time.sleep(processing_time)

            # Simulate occasional failures
            if body.get('should_fail', False):
                raise ValueError("Simulated processing failure")

            message.ack()
            self.processed_count += 1

            duration = time.time() - start_time
            logger.info(f"Message processed in {duration:.2f}s")

        except Exception as exc:
            self.error_count += 1
            logger.error(f"Processing failed: {exc}")

            # Broker-supplied redelivery count, if the transport sets it.
            retry_count = message.headers.get('x-retry-count', 0) if message.headers else 0
            if retry_count < self.MAX_RETRIES:
                # Requeue for another attempt.
                logger.info(f"Requeuing message (retry {retry_count + 1}/{self.MAX_RETRIES})")
                # Note: Headers manipulation depends on transport support
                message.reject(requeue=True)
            else:
                # Max retries exceeded, reject permanently.
                logger.error("Max retries exceeded, rejecting message")
                message.reject(requeue=False)

    def on_connection_error(self, exc, interval):
        logger.error(f"Connection error: {exc}, retrying in {interval}s")

    def on_connection_revived(self):
        logger.info("Connection re-established")

    def on_consume_ready(self, connection, channel, consumers, **kwargs):
        logger.info(f"Ready to consume from {len(self.queues)} queues")

    def on_consume_end(self, connection, channel):
        logger.info(f"Consumer stopped. Processed: {self.processed_count}, Errors: {self.error_count}")

    def on_iteration(self):
        # Log stats once per 100-message milestone.  The consume loop
        # calls this hook on every iteration, so without the
        # _last_logged_count guard the same milestone would be logged
        # over and over while no new messages arrive.
        if self.processed_count > 0 and self.processed_count % 100 == 0:
            if self.processed_count != self._last_logged_count:
                self._last_logged_count = self.processed_count
                logger.info(f"Stats - Processed: {self.processed_count}, Errors: {self.error_count}")

    def on_decode_error(self, message, exc):
        # An undecodable payload can never succeed; drop it rather than
        # requeueing it into an infinite redelivery loop.
        logger.error(f"Message decode error: {exc}")
        logger.error(f"Raw message: {message.body}")
        message.reject(requeue=False)

# Usage with graceful shutdown
if __name__ == '__main__':
    queue = Queue('robust_queue', durable=True)

    with Connection('redis://localhost:6379/0') as connection:
        consumer = RobustWorker(connection, [queue])
        try:
            consumer.run()
        except KeyboardInterrupt:
            logger.info('Received interrupt, stopping...')
            consumer.should_stop = True

ConsumerProducerMixin Implementation

from kombu import Connection, Queue, Exchange
from kombu.mixins import ConsumerProducerMixin
import json
import time

class RequestProcessor(ConsumerProducerMixin):
    """Consume request messages and publish replies on the mixin's
    dedicated producer connection."""

    def __init__(self, connection, request_queue, response_exchange):
        self.connection = connection
        self.request_queue = request_queue
        self.response_exchange = response_exchange

    def get_consumers(self, Consumer, channel):
        consumer = Consumer(
            queues=[self.request_queue],
            callbacks=[self.process_request],
            prefetch_count=5
        )
        return [consumer]

    def process_request(self, body, message):
        """Handle one request: process it and publish a reply."""
        request_id = body.get('id')
        print(f"Processing request {request_id}")

        try:
            # Process the request, then send the success response using
            # the separate producer connection.
            result = self.handle_request(body)
            self._reply(
                {
                    'request_id': request_id,
                    'status': 'success',
                    'result': result,
                    'processed_at': time.time()
                },
                routing_key=body.get('reply_to', 'default'),
            )
            message.ack()
            print(f"Request {request_id} completed successfully")

        except Exception as exc:
            # Send error response instead.
            print(f"Request {request_id} failed: {exc}")
            self._reply(
                {
                    'request_id': request_id,
                    'status': 'error',
                    'error': str(exc),
                    'processed_at': time.time()
                },
                routing_key=body.get('reply_to', 'errors'),
            )
            message.ack()  # Acknowledge even failed messages

    def _reply(self, payload, routing_key):
        # Publish via the mixin's producer so publishing never shares
        # the consuming channel.
        self.producer.publish(
            payload,
            exchange=self.response_exchange,
            routing_key=routing_key,
            serializer='json'
        )

    def handle_request(self, request):
        """Dispatch a request dict by type; raise ValueError if unknown."""
        # Simulate request processing
        request_type = request.get('type')

        if request_type == 'calculation':
            return {'result': request['a'] + request['b']}
        if request_type == 'lookup':
            return {'data': f"Data for {request['key']}"}
        raise ValueError(f"Unknown request type: {request_type}")

# Usage
if __name__ == '__main__':
    # Broker-side routing: requests in via a queue, responses out via
    # a direct exchange.
    requests_queue = Queue('requests', durable=True)
    responses = Exchange('responses', type='direct', durable=True)

    with Connection('redis://localhost:6379/0') as connection:
        service = RequestProcessor(connection, requests_queue, responses)
        try:
            service.run()
        except KeyboardInterrupt:
            print('Stopping processor...')

Multi-Queue Consumer

import time

from kombu import Connection, Queue, Exchange
from kombu.mixins import ConsumerMixin

class MultiQueueWorker(ConsumerMixin):
    """Consume three priority queues, each with its own prefetch limit."""

    def __init__(self, connection):
        self.connection = connection
        self.stats = {
            'high_priority': 0,
            'normal_priority': 0,
            'low_priority': 0
        }

    def get_consumers(self, Consumer, channel):
        # (queue name, handler, prefetch) — higher priority gets a larger
        # prefetch window so it is drained faster.
        plan = [
            ('high_priority', self.process_high_priority, 20),
            ('normal_priority', self.process_normal_priority, 10),
            ('low_priority', self.process_low_priority, 5),
        ]
        return [
            Consumer(
                queues=[Queue(name, durable=True)],
                callbacks=[handler],
                prefetch_count=prefetch
            )
            for name, handler, prefetch in plan
        ]

    def process_high_priority(self, body, message):
        print(f"HIGH PRIORITY: {body}")
        # Fast processing for high priority
        self.stats['high_priority'] += 1
        message.ack()

    def process_normal_priority(self, body, message):
        print(f"Normal priority: {body}")
        # Standard processing
        time.sleep(0.1)  # Simulate work
        self.stats['normal_priority'] += 1
        message.ack()

    def process_low_priority(self, body, message):
        print(f"low priority: {body}")
        # Slower processing for low priority
        time.sleep(0.5)  # Simulate slower work
        self.stats['low_priority'] += 1
        message.ack()

    def on_iteration(self):
        # Emit the running totals at every 50-message multiple.
        total = sum(self.stats.values())
        if total > 0 and total % 50 == 0:
            print(f"Stats: {self.stats}")

# Usage
if __name__ == '__main__':
    with Connection('redis://localhost:6379/0') as connection:
        worker = MultiQueueWorker(connection)
        try:
            worker.run()
        except KeyboardInterrupt:
            # Report the totals gathered before the interrupt.
            print(f'Final stats: {worker.stats}')

Mixin with Custom Context Management

from kombu import Connection, Queue
from kombu.mixins import ConsumerMixin
from contextlib import contextmanager
import redis
import logging

class CacheIntegratedWorker(ConsumerMixin):
    """Consumer that uses Redis to de-duplicate tasks and cache results.

    A Redis client is opened for the lifetime of each consume loop via
    extra_context() and exposed to the message callbacks through
    self.redis_client.
    """

    def __init__(self, connection, queues, redis_url):
        self.connection = connection
        self.queues = queues
        self.redis_url = redis_url
        # Populated while the consume loop is running; see
        # redis_connection() below.
        self.redis_client = None
        self.logger = logging.getLogger(__name__)

    def get_consumers(self, Consumer, channel):
        return [
            Consumer(
                queues=self.queues,
                callbacks=[self.process_with_cache]
            )
        ]

    def process_with_cache(self, body, message):
        """Process a task unless its result is already cached in Redis."""
        import json  # stdlib; function-scoped because this snippet's header does not import it

        cache_key = f"task:{body.get('id')}"

        # Idempotency guard: skip tasks whose key is already cached.
        if self.redis_client.get(cache_key):
            self.logger.info(f"Task {body['id']} already processed, skipping")
            message.ack()
            return

        try:
            # Process the task
            result = self.process_task(body)

            # Cache the result
            self.redis_client.setex(
                cache_key,
                3600,  # 1 hour TTL
                json.dumps(result)
            )

            message.ack()
            self.logger.info(f"Task {body['id']} processed and cached")

        except Exception as exc:
            self.logger.error(f"Task processing failed: {exc}")
            message.reject(requeue=True)

    def process_task(self, task):
        # Actual task processing logic
        return {'processed': True, 'data': task}

    @contextmanager
    def redis_connection(self):
        """Context manager holding a Redis client for the consume loop.

        The client is also published on self.redis_client because the
        value yielded to extra_context() is not otherwise passed to the
        consumer callbacks.
        """
        client = redis.from_url(self.redis_url)
        self.redis_client = client
        try:
            yield client
        finally:
            self.redis_client = None
            client.close()

    def extra_context(self, connection, channel):
        """Provide the Redis connection as extra consume-loop context."""
        return self.redis_connection()

    def on_consume_ready(self, connection, channel, consumers, **kwargs):
        # self.redis_client is set by redis_connection().  Previously this
        # hook read kwargs.get('redis_client'), but nothing supplies that
        # keyword, so it reset the client to None and process_with_cache
        # crashed on self.redis_client.get(...).
        self.logger.info("Consumer ready with Redis integration")

# Usage would be:
# worker = CacheIntegratedWorker(conn, [queue], 'redis://localhost:6379/1')

Install with Tessl CLI

npx tessl i tessl/pypi-kombu

docs

compression.md

connection.md

entities.md

exceptions.md

index.md

messaging.md

mixins.md

pools.md

serialization.md

simple.md

tile.json