Messaging library for Python that provides an idiomatic high-level interface for AMQP and other message brokers.
—
Ready-to-use consumer frameworks that provide structured approaches for building consumer applications with connection management and error handling. The mixin classes offer a robust foundation for building long-running consumer services.
Convenience mixin for implementing consumer programs with automatic connection management, error handling, and graceful shutdown support.
class ConsumerMixin:
    """Signature reference for ``kombu.mixins.ConsumerMixin``.

    Only the call signatures and their contracts are listed here; the
    method bodies are omitted.
    """

    def get_consumers(self, Consumer, channel):
        """
        Abstract method that must be implemented by subclasses.

        Should return a list of Consumer instances.

        Parameters:
        - Consumer (class): Consumer class to instantiate
        - channel: AMQP channel to use

        Returns:
        list: List of Consumer instances
        """

    def run(self, _tokens=1, **kwargs):
        """
        Main run loop that handles connections and consumers.

        Parameters:
        - _tokens (int): Tokens taken from the restart rate limiter for each
          (re)start of the consume loop. NOTE(review): the leading underscore
          suggests this is internal -- callers normally keep the default.
        - **kwargs: Additional arguments passed to consume()

        Returns:
        None
        """

    def consume(self, limit=None, timeout=None, safety_interval=1, **kwargs):
        """
        Consume messages from queues.

        Parameters:
        - limit (int): Maximum number of messages to process
        - timeout (float): Maximum total time in seconds to go without
          receiving an event before the socket timeout is re-raised; None
          waits indefinitely
        - safety_interval (float): Timeout in seconds handed to each
          drain_events call, i.e. how often the loop wakes up to check
          should_stop and call on_iteration
        - **kwargs: Additional consume arguments

        Returns:
        NOTE(review): in kombu this method is a generator that yields once
        after each successful drain_events call, so it does nothing until
        iterated (run() iterates it) -- confirm for the installed version.
        """

    def on_connection_error(self, exc, interval):
        """
        Called when connection error occurs.

        Parameters:
        - exc (Exception): Connection exception
        - interval (float): Sleep interval before retry

        Override to customize error handling.
        """

    def on_connection_revived(self):
        """
        Called when connection is re-established after failure.

        Override to perform cleanup or reinitialization.
        """

    def on_consume_ready(self, connection, channel, consumers, **kwargs):
        """
        Called when consumers are ready to process messages.

        Parameters:
        - connection: Connection instance
        - channel: AMQP channel
        - consumers (list): List of Consumer instances
        - **kwargs: Additional context

        Override to perform setup before message processing.
        """

    def on_consume_end(self, connection, channel):
        """
        Called when consume loop ends.

        Parameters:
        - connection: Connection instance
        - channel: AMQP channel

        Override to perform cleanup after message processing.
        """

    def on_iteration(self):
        """
        Called on each iteration of the consume loop.

        Override to perform periodic tasks.
        """

    def on_decode_error(self, message, exc):
        """
        Called when message decode error occurs.

        Parameters:
        - message (Message): Message that failed to decode
        - exc (Exception): Decode exception

        Override to handle decode errors. NOTE(review): kombu's built-in
        handler logs the error and acknowledges (acks) the message rather
        than rejecting it -- confirm against the installed version.
        """

    def extra_context(self, connection, channel):
        """
        Extra context manager for consume loop.

        Parameters:
        - connection: Connection instance
        - channel: AMQP channel

        Returns:
        Context manager. The result is entered with a ``with`` statement
        around the consume loop, so an override must not return None.

        Override to provide additional context management.
        """

    # Properties and attributes
    #
    # Both names are plain class attributes rather than read-only
    # properties: the documented way to stop a consumer is to assign
    # ``worker.should_stop = True``, which a setter-less property rejects.

    # int | None: Maximum connection retry attempts. None places no limit on
    # retries. NOTE(review): kombu's own source declares None as the default
    # (not 5) -- confirm against the installed version.
    connect_max_retries = None

    # bool: Flag to stop the consumer (set to True to stop)
    should_stop = False


# Consumer and Producer mixin that provides separate producer connection for
# publishing messages while consuming, preventing deadlocks and improving
# performance.
class ConsumerProducerMixin(ConsumerMixin):
    """Signature reference for ``kombu.mixins.ConsumerProducerMixin``.

    Adds a producer that publishes over its own connection on top of the
    ConsumerMixin interface. Method bodies are omitted.
    """

    def get_consumers(self, Consumer, channel):
        """
        Abstract method inherited from ConsumerMixin.

        Must be implemented by subclasses.
        """

    @property
    def producer(self):
        """
        Producer instance with separate connection.

        Returns:
        Producer: Producer for publishing messages

        Automatically creates and manages separate connection.
        """

    @property
    def producer_connection(self):
        """
        Separate connection for producer.

        Returns:
        Connection: Producer connection instance

        Automatically created and managed.
        """

    # Inherits all other methods from ConsumerMixin


from kombu import Connection, Queue, Exchange
from kombu.mixins import ConsumerMixin
class TaskWorker(ConsumerMixin):
    """Example worker that consumes task messages and dispatches on their type."""

    def __init__(self, connection, queues):
        """Keep the broker connection and the queues this worker reads from."""
        self.queues = queues
        self.connection = connection

    def get_consumers(self, Consumer, channel):
        """Return a single consumer bound to every configured queue."""
        task_consumer = Consumer(
            queues=self.queues,
            callbacks=[self.process_message],
            # NOTE(review): 'pickle' payloads can execute arbitrary code when
            # decoded; only accept them from trusted producers.
            accept=['json', 'pickle']
        )
        return [task_consumer]

    def process_message(self, body, message):
        """Handle one task: dispatch it, ack on success, requeue on failure.

        Parameters:
        - body: Decoded payload; read with ``.get('type')`` and ``.get('id')``
        - message: Message object used to ack or reject the delivery
        """
        print(f"Processing task: {body}")
        try:
            # Simulate work
            kind = body.get('type')
            if kind == 'email':
                self.send_email(body)
            elif kind == 'report':
                self.generate_report(body)
            else:
                # Unknown types are reported but still acknowledged below.
                print(f"Unknown task type: {kind}")
            message.ack()
            task_id = body.get('id')
            print(f"Task completed: {task_id}")
        except Exception as exc:
            print(f"Task failed: {exc}")
            message.reject(requeue=True)

    def send_email(self, task):
        """Placeholder for the email sending logic."""
        print(f"Sending email: {task}")

    def generate_report(self, task):
        """Placeholder for the report generation logic."""
        print(f"Generating report: {task}")
# Usage
if __name__ == '__main__':
    # Define message routing: a durable direct exchange with one durable
    # queue bound under the 'task' routing key.
    task_exchange = Exchange('tasks', type='direct', durable=True)
    task_queue = Queue(
        'task_queue',
        exchange=task_exchange,
        routing_key='task',
        durable=True
    )
    # Create and run worker; the connection is released when the block exits.
    with Connection('redis://localhost:6379/0') as conn:
        worker = TaskWorker(conn, [task_queue])
        try:
            worker.run()
        except KeyboardInterrupt:
            # Ctrl-C is the expected way to stop the example.
            print('Stopping worker...')


from kombu import Connection, Queue, Exchange
from kombu.mixins import ConsumerMixin
import logging
import time
# Configure root logging at INFO level for this example and create the
# module-level logger used by the worker below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class RobustWorker(ConsumerMixin):
    """Example worker with retry handling, lifecycle logging and counters."""

    # Requeue attempts allowed before a failing message is dropped.
    MAX_RETRIES = 3
    # Emit a stats line each time processed_count reaches a multiple of this.
    STATS_INTERVAL = 100

    def __init__(self, connection, queues):
        """Store the connection and queues and reset the counters."""
        self.connection = connection
        self.queues = queues
        self.processed_count = 0
        self.error_count = 0
        # processed_count at the time of the last stats line; prevents
        # on_iteration from repeating the same line on every loop pass.
        self._last_stats_at = 0

    def get_consumers(self, Consumer, channel):
        """Return one JSON-only consumer with a prefetch window of 10."""
        return [
            Consumer(
                queues=self.queues,
                callbacks=[self.process_message],
                prefetch_count=10,
                accept=['json']
            )
        ]

    def process_message(self, body, message):
        """Process one message; ack on success, requeue or drop on failure.

        Parameters:
        - body: Decoded payload; reads 'id', 'processing_time', 'should_fail'
        - message: Message object used to ack or reject the delivery
        """
        start_time = time.time()
        try:
            logger.info(f"Processing message: {body.get('id')}")
            # Simulate variable processing time
            processing_time = body.get('processing_time', 1.0)
            time.sleep(processing_time)
            # Simulate occasional failures
            if body.get('should_fail', False):
                raise ValueError("Simulated processing failure")
            message.ack()
            self.processed_count += 1
            duration = time.time() - start_time
            logger.info(f"Message processed in {duration:.2f}s")
        except Exception as exc:
            self.error_count += 1
            logger.error(f"Processing failed: {exc}")
            # Check retry count
            retry_count = message.headers.get('x-retry-count', 0) if message.headers else 0
            if retry_count < self.MAX_RETRIES:
                logger.info(
                    f"Requeuing message (retry {retry_count + 1}/{self.MAX_RETRIES})"
                )
                # NOTE: nothing in this handler increments 'x-retry-count';
                # the limit only takes effect if something else (producer or
                # transport) maintains that header. Header manipulation
                # depends on transport support.
                message.reject(requeue=True)
            else:
                # Max retries exceeded, reject permanently
                logger.error("Max retries exceeded, rejecting message")
                message.reject(requeue=False)

    def on_connection_error(self, exc, interval):
        """Log a connection failure together with the retry delay."""
        logger.error(f"Connection error: {exc}, retrying in {interval}s")

    def on_connection_revived(self):
        """Log that the connection is available again."""
        logger.info("Connection re-established")

    def on_consume_ready(self, connection, channel, consumers, **kwargs):
        """Log how many queues the worker is about to consume from."""
        logger.info(f"Ready to consume from {len(self.queues)} queues")

    def on_consume_end(self, connection, channel):
        """Log the final processed and error counters."""
        logger.info(f"Consumer stopped. Processed: {self.processed_count}, Errors: {self.error_count}")

    def on_iteration(self):
        """Log stats once per STATS_INTERVAL processed messages.

        This hook runs on every pass of the consume loop, so the counter can
        sit on the same multiple for many passes; the last reported value is
        remembered to avoid logging the same line repeatedly.
        """
        count = self.processed_count
        if count <= 0 or count % self.STATS_INTERVAL != 0:
            return
        if count == self._last_stats_at:
            return
        self._last_stats_at = count
        logger.info(f"Stats - Processed: {self.processed_count}, Errors: {self.error_count}")

    def on_decode_error(self, message, exc):
        """Log the undecodable message and reject it without requeueing."""
        logger.error(f"Message decode error: {exc}")
        logger.error(f"Raw message: {message.body}")
        message.reject(requeue=False)
# Usage with graceful shutdown
if __name__ == '__main__':
    queue = Queue('robust_queue', durable=True)
    # The connection is released when the block exits.
    with Connection('redis://localhost:6379/0') as conn:
        worker = RobustWorker(conn, [queue])
        try:
            worker.run()
        except KeyboardInterrupt:
            logger.info('Received interrupt, stopping...')
            # Mark the worker as stopped so any later check of the flag
            # sees the shutdown request.
            worker.should_stop = True


from kombu import Connection, Queue, Exchange
from kombu.mixins import ConsumerProducerMixin
import json
import time
class RequestProcessor(ConsumerProducerMixin):
    """Example request/response worker that publishes a reply per request."""

    def __init__(self, connection, request_queue, response_exchange):
        """Keep the connection, the queue to read and the reply exchange."""
        self.response_exchange = response_exchange
        self.request_queue = request_queue
        self.connection = connection

    def get_consumers(self, Consumer, channel):
        """Return one consumer on the request queue (prefetch window of 5)."""
        request_consumer = Consumer(
            queues=[self.request_queue],
            callbacks=[self.process_request],
            prefetch_count=5
        )
        return [request_consumer]

    def _send_response(self, payload, routing_key):
        """Publish ``payload`` as JSON to the response exchange."""
        # self.producer is the mixin's producer on a separate connection.
        self.producer.publish(
            payload,
            exchange=self.response_exchange,
            routing_key=routing_key,
            serializer='json'
        )

    def process_request(self, body, message):
        """Handle one request and publish a success or error response.

        The message is acknowledged in both cases; a failed request is
        reported through the error response instead of being redelivered.
        """
        request_id = body.get('id')
        print(f"Processing request {request_id}")
        try:
            outcome = self.handle_request(body)
            success_payload = {
                'request_id': request_id,
                'status': 'success',
                'result': outcome,
                'processed_at': time.time()
            }
            self._send_response(success_payload, body.get('reply_to', 'default'))
            message.ack()
            print(f"Request {request_id} completed successfully")
        except Exception as exc:
            print(f"Request {request_id} failed: {exc}")
            failure_payload = {
                'request_id': request_id,
                'status': 'error',
                'error': str(exc),
                'processed_at': time.time()
            }
            self._send_response(failure_payload, body.get('reply_to', 'errors'))
            message.ack()  # Acknowledge even failed messages

    def handle_request(self, request):
        """Compute the result for a 'calculation' or 'lookup' request.

        Raises:
        ValueError: if the request type is anything else
        """
        request_type = request.get('type')
        if request_type == 'calculation':
            return {'result': request['a'] + request['b']}
        if request_type == 'lookup':
            return {'data': f"Data for {request['key']}"}
        raise ValueError(f"Unknown request type: {request_type}")
# Usage
if __name__ == '__main__':
    # Define routing: requests arrive on a durable queue, responses go out
    # through a durable direct exchange.
    request_queue = Queue('requests', durable=True)
    response_exchange = Exchange('responses', type='direct', durable=True)
    # The connection is released when the block exits.
    with Connection('redis://localhost:6379/0') as conn:
        processor = RequestProcessor(conn, request_queue, response_exchange)
        try:
            processor.run()
        except KeyboardInterrupt:
            # Ctrl-C is the expected way to stop the example.
            print('Stopping processor...')


from kombu import Connection, Queue, Exchange
from kombu.mixins import ConsumerMixin
import time  # used by the simulated work in the handlers below


class MultiQueueWorker(ConsumerMixin):
    """Example worker that consumes three priority queues with one consumer each."""

    # Print a stats line each time the total reaches a multiple of this.
    STATS_INTERVAL = 50

    def __init__(self, connection):
        """Store the connection and zero the per-queue counters."""
        self.connection = connection
        self.stats = {
            'high_priority': 0,
            'normal_priority': 0,
            'low_priority': 0
        }
        # Total at the time of the last stats line; prevents on_iteration
        # from printing the same line on every loop pass.
        self._last_reported_total = 0

    def get_consumers(self, Consumer, channel):
        """Return one consumer per priority queue, each with its own prefetch."""
        # Define different priority queues
        high_priority_queue = Queue('high_priority', durable=True)
        normal_priority_queue = Queue('normal_priority', durable=True)
        low_priority_queue = Queue('low_priority', durable=True)
        return [
            # High priority consumer with higher prefetch
            Consumer(
                queues=[high_priority_queue],
                callbacks=[self.process_high_priority],
                prefetch_count=20
            ),
            # Normal priority consumer
            Consumer(
                queues=[normal_priority_queue],
                callbacks=[self.process_normal_priority],
                prefetch_count=10
            ),
            # Low priority consumer with lower prefetch
            Consumer(
                queues=[low_priority_queue],
                callbacks=[self.process_low_priority],
                prefetch_count=5
            )
        ]

    def process_high_priority(self, body, message):
        """Handle a high-priority message immediately and ack it."""
        print(f"HIGH PRIORITY: {body}")
        # Fast processing for high priority
        self.stats['high_priority'] += 1
        message.ack()

    def process_normal_priority(self, body, message):
        """Handle a normal-priority message (0.1 s of simulated work) and ack it."""
        print(f"Normal priority: {body}")
        # Standard processing
        time.sleep(0.1)  # Simulate work
        self.stats['normal_priority'] += 1
        message.ack()

    def process_low_priority(self, body, message):
        """Handle a low-priority message (0.5 s of simulated work) and ack it."""
        print(f"low priority: {body}")
        # Slower processing for low priority
        time.sleep(0.5)  # Simulate slower work
        self.stats['low_priority'] += 1
        message.ack()

    def on_iteration(self):
        """Print the counters once per STATS_INTERVAL handled messages.

        This hook runs on every pass of the consume loop, so the total can
        sit on the same multiple for many passes; the last reported total is
        remembered to avoid printing the same line repeatedly.
        """
        total = sum(self.stats.values())
        if total <= 0 or total % self.STATS_INTERVAL != 0:
            return
        if total == self._last_reported_total:
            return
        self._last_reported_total = total
        print(f"Stats: {self.stats}")
# Usage
if __name__ == '__main__':
    # The connection is released when the block exits.
    with Connection('redis://localhost:6379/0') as conn:
        worker = MultiQueueWorker(conn)
        try:
            worker.run()
        except KeyboardInterrupt:
            # Report the per-queue counters gathered before the interrupt.
            print(f'Final stats: {worker.stats}')


from kombu import Connection, Queue
from kombu.mixins import ConsumerMixin
from contextlib import contextmanager
import redis
import logging
import json  # used to serialise cached results


class CacheIntegratedWorker(ConsumerMixin):
    """Example worker that skips tasks whose result is already cached in Redis."""

    def __init__(self, connection, queues, redis_url):
        """Store the broker connection, queues and Redis URL.

        The Redis client itself is created in redis_connection() and is None
        until that context has been entered.
        """
        self.connection = connection
        self.queues = queues
        self.redis_url = redis_url
        self.redis_client = None
        self.logger = logging.getLogger(__name__)

    def get_consumers(self, Consumer, channel):
        """Return a single consumer bound to every configured queue."""
        return [
            Consumer(
                queues=self.queues,
                callbacks=[self.process_with_cache]
            )
        ]

    def process_with_cache(self, body, message):
        """Process a task unless a cached result for its id already exists.

        Parameters:
        - body: Decoded payload; its 'id' value forms the cache key
        - message: Message object used to ack or reject the delivery
        """
        # Read the id once with .get() so a payload without 'id' cannot raise
        # KeyError in a log line after the message was already acknowledged.
        task_id = body.get('id')
        cache_key = f"task:{task_id}"
        # Check if already processed
        if self.redis_client.get(cache_key):
            self.logger.info(f"Task {task_id} already processed, skipping")
            message.ack()
            return
        try:
            # Process the task
            result = self.process_task(body)
            # Cache the result
            self.redis_client.setex(
                cache_key,
                3600,  # 1 hour TTL
                json.dumps(result)
            )
            message.ack()
            self.logger.info(f"Task {task_id} processed and cached")
        except Exception as exc:
            self.logger.error(f"Task processing failed: {exc}")
            message.reject(requeue=True)

    def process_task(self, task):
        """Placeholder for the actual task processing logic."""
        return {'processed': True, 'data': task}

    @contextmanager
    def redis_connection(self):
        """Context manager that opens the Redis client and publishes it on self.

        The client is assigned to ``self.redis_client`` for the lifetime of
        the context, because that attribute is what process_with_cache reads.
        It is cleared and closed again on exit.
        """
        client = redis.from_url(self.redis_url)
        self.redis_client = client
        try:
            yield client
        finally:
            self.redis_client = None
            client.close()

    def extra_context(self, connection, channel):
        """Provide Redis connection as extra context"""
        return self.redis_connection()

    def on_consume_ready(self, connection, channel, consumers, **kwargs):
        """Log readiness; optionally accept an explicitly supplied client.

        NOTE(review): in kombu the kwargs here are the ones forwarded from
        run()/consume(), not anything yielded by extra_context(), so the
        client normally comes from redis_connection(). A 'redis_client'
        keyword is still honoured when a caller passes one explicitly.
        """
        supplied_client = kwargs.get('redis_client')
        if supplied_client is not None:
            self.redis_client = supplied_client
        self.logger.info("Consumer ready with Redis integration")
# Usage would be:
# worker = CacheIntegratedWorker(conn, [queue], 'redis://localhost:6379/1')

Install with Tessl CLI:

npx tessl i tessl/pypi-kombu