Messaging library for Python that provides an idiomatic high-level interface for AMQP and other message brokers.
—
High-level producer and consumer interfaces for publishing and receiving messages with comprehensive error handling and serialization support. The messaging API provides the primary interface for sending and receiving messages in Kombu applications.
Message producer for publishing messages to exchanges with serialization, compression, and delivery options.
class Producer:
def __init__(self, channel, exchange=None, routing_key='', serializer=None, compression=None, auto_declare=True, on_return=None, **kwargs):
"""
Create message producer.
Parameters:
- channel: AMQP channel to use
- exchange (Exchange): Default exchange for publishing
- routing_key (str): Default routing key
- serializer (str): Default serialization method
- compression (str): Default compression method
- auto_declare (bool): Automatically declare entities
- on_return (callable): Callback for returned messages
"""
def declare(self):
"""
Declare the default exchange and any entities in auto_declare list.
Returns:
Producer instance for chaining
"""
def maybe_declare(self, entity, retry=False, **retry_policy):
"""
Declare entity if not already declared (cached).
Parameters:
- entity (Exchange|Queue): Entity to declare
- retry (bool): Enable retry on failure
- retry_policy: Retry policy parameters
Returns:
bool: True if entity was declared
"""
def publish(self, body, routing_key=None, delivery_mode=None, mandatory=False, immediate=False, priority=0, content_type=None, content_encoding=None, serializer=None, headers=None, compression=None, exchange=None, retry=False, retry_policy=None, declare=None, expiration=None, timeout=None, confirm_timeout=None, **properties):
"""
Publish message to exchange.
Parameters:
- body: Message body (will be serialized)
- routing_key (str): Message routing key
- delivery_mode (int): Delivery mode (1=transient, 2=persistent)
- mandatory (bool): Return message if no route found
- immediate (bool): Return message if no consumer ready
- priority (int): Message priority (0-255)
- content_type (str): Content type override
- content_encoding (str): Content encoding override
- serializer (str): Serializer override
- headers (dict): Message headers
- compression (str): Compression method override
- exchange (Exchange): Exchange override
- retry (bool): Enable retry on failure
- retry_policy (dict): Retry policy parameters
- declare (list): Entities to declare before publishing
- expiration (str): Message expiration time
- timeout (float): Operation timeout in seconds
- confirm_timeout (float): Publisher confirmation timeout
- **properties: Additional message properties
Returns:
None
"""
def revive(self, channel):
"""
Revive producer after connection re-establishment.
Parameters:
- channel: New channel to use
Returns:
Producer instance for chaining
"""
def close(self):
"""Close producer and cleanup resources."""
def release(self):
"""Release producer resources (alias for close)."""
# Properties
@property
def channel(self):
"""Channel: AMQP channel"""
@property
def exchange(self):
"""Exchange: Default exchange"""
@property
def routing_key(self):
"""str: Default routing key"""
@property
def serializer(self):
"""str: Default serializer"""
@property
def compression(self):
"""str: Default compression method"""
@property
def auto_declare(self):
"""bool: Auto-declare flag"""
@property
def on_return(self):
"""callable: Basic return callback"""Message consumer for receiving messages from queues with callback handling, acknowledgment control, and quality of service management.
class Consumer:
def __init__(self, channel, queues=None, no_ack=None, auto_declare=True, callbacks=None, on_decode_error=None, on_message=None, accept=None, prefetch_count=None, tag_prefix=None, **kwargs):
"""
Create message consumer.
Parameters:
- channel: AMQP channel to use
- queues (list): Queues to consume from
- no_ack (bool): Disable message acknowledgments
- auto_declare (bool): Automatically declare entities
- callbacks (list): Message callback functions
- on_decode_error (callable): Decode error callback
- on_message (callable): Alternative message handler
- accept (list): Accepted content types
- prefetch_count (int): QoS prefetch count
- tag_prefix (str): Consumer tag prefix
"""
def revive(self, channel):
"""
Revive consumer after connection re-establishment.
Parameters:
- channel: New channel to use
Returns:
Consumer instance for chaining
"""
def declare(self):
"""
Declare queues, exchanges and bindings.
Returns:
Consumer instance for chaining
"""
def register_callback(self, callback):
"""
Register new callback function.
Parameters:
- callback (callable): Function to call for each message
Returns:
Consumer instance for chaining
"""
def add_queue(self, queue):
"""
Add queue to consume from.
Parameters:
- queue (Queue): Queue to add
Returns:
Consumer instance for chaining
"""
def consume(self, no_ack=None):
"""
Start consuming messages from queues.
Parameters:
- no_ack (bool): Disable acknowledgments override
Returns:
Consumer instance for chaining
"""
def cancel(self):
"""
End all active queue consumers.
Returns:
Consumer instance for chaining
"""
def cancel_by_queue(self, queue):
"""
Cancel consumer for specific queue.
Parameters:
- queue (str|Queue): Queue to stop consuming
Returns:
Consumer instance for chaining
"""
def consuming_from(self, queue):
"""
Check if currently consuming from queue.
Parameters:
- queue (str|Queue): Queue to check
Returns:
bool: True if consuming from queue
"""
def purge(self):
"""
Purge messages from all queues.
Returns:
int: Total number of messages purged
"""
def flow(self, active):
"""
Enable/disable flow from peer.
Parameters:
- active (bool): Enable or disable flow
Returns:
Consumer instance for chaining
"""
def qos(self, prefetch_size=0, prefetch_count=0, apply_global=False):
"""
Set quality of service limits.
Parameters:
- prefetch_size (int): Prefetch window size
- prefetch_count (int): Prefetch message count
- apply_global (bool): Apply globally or per-consumer
Returns:
Consumer instance for chaining
"""
def recover(self, requeue=False):
"""
Redeliver unacknowledged messages.
Parameters:
- requeue (bool): Requeue messages to original position
Returns:
Consumer instance for chaining
"""
def receive(self, body, message):
"""
Handle received message by calling callbacks.
Parameters:
- body: Decoded message body
- message (Message): Message instance
Returns:
None
"""
# Properties
@property
def channel(self):
"""Channel: AMQP channel"""
@property
def queues(self):
"""list: Queues being consumed"""
@property
def no_ack(self):
"""bool: Automatic acknowledgment flag"""
@property
def auto_declare(self):
"""bool: Auto-declare entities flag"""
@property
def callbacks(self):
"""list: Message callback functions"""
@property
def on_message(self):
"""callable: Alternative message handler"""
@property
def on_decode_error(self):
"""callable: Decode error callback"""
@property
def accept(self):
"""list: Accepted content types"""
@property
def prefetch_count(self):
"""int: QoS prefetch count"""Base class for received messages with acknowledgment, rejection, and decoding capabilities.
class Message:
def __init__(self, body=None, delivery_tag=None, content_type=None, content_encoding=None, delivery_info=None, properties=None, headers=None, **kwargs):
"""
Create message instance.
Parameters:
- body: Raw message body
- delivery_tag: Unique delivery identifier
- content_type (str): Message content type
- content_encoding (str): Content encoding
- delivery_info (dict): Delivery information
- properties (dict): Message properties
- headers (dict): Message headers
"""
def ack(self, multiple=False):
"""
Acknowledge message processing.
Parameters:
- multiple (bool): Acknowledge all messages up to this one
Raises:
MessageStateError: If message already acknowledged
"""
def ack_log_error(self, logger, errors, multiple=False):
"""
Acknowledge message with error logging.
Parameters:
- logger: Logger instance
- errors (tuple): Error types to catch and log
- multiple (bool): Acknowledge multiple messages
Returns:
bool: True if acknowledgment succeeded
"""
def reject(self, requeue=False):
"""
Reject message.
Parameters:
- requeue (bool): Requeue message for redelivery
Raises:
MessageStateError: If message already acknowledged
"""
def reject_log_error(self, logger, errors, requeue=False):
"""
Reject message with error logging.
Parameters:
- logger: Logger instance
- errors (tuple): Error types to catch and log
- requeue (bool): Requeue message
Returns:
bool: True if rejection succeeded
"""
def requeue(self):
"""
Reject and requeue message (shortcut for reject(requeue=True)).
Raises:
MessageStateError: If message already acknowledged
"""
def decode(self):
"""
Deserialize message body (cached).
Returns:
Decoded message body
"""
def _decode(self):
"""
Force re-decode message body.
Returns:
Decoded message body
"""
# Properties
@property
def acknowledged(self):
"""bool: True if message has been acknowledged"""
@property
def payload(self):
"""Decoded message body (cached)"""
@property
def body(self):
"""Raw message body"""
@property
def content_type(self):
"""str: Message content type"""
@property
def content_encoding(self):
"""str: Message content encoding"""
@property
def delivery_info(self):
"""dict: Delivery information"""
@property
def headers(self):
"""dict: Message headers"""
@property
def properties(self):
"""dict: Message properties"""from kombu import Connection, Exchange, Producer
# Define exchange
task_exchange = Exchange('tasks', type='direct', durable=True)
with Connection('redis://localhost:6379/0') as conn:
# Create producer
producer = Producer(
conn.channel(),
exchange=task_exchange,
routing_key='default',
serializer='json'
)
# Publish messages
producer.publish(
{'task': 'process_data', 'args': [1, 2, 3]},
routing_key='high_priority',
headers={'origin': 'web_app'},
priority=5
)
# Publish with different serializer
producer.publish(
b'binary data',
routing_key='binary_task',
serializer='pickle',
content_type='application/x-python-serialize'
)from kombu import Connection, Queue, Consumer
def process_message(body, message):
"""Message processing callback"""
try:
print(f"Processing: {body}")
# Simulate work
result = body['args'][0] + body['args'][1]
print(f"Result: {result}")
# Acknowledge successful processing
message.ack()
except Exception as exc:
print(f"Processing failed: {exc}")
# Reject and requeue for retry
message.reject(requeue=True)
# Define queue
task_queue = Queue('task_queue', durable=True)
with Connection('redis://localhost:6379/0') as conn:
# Create consumer
consumer = Consumer(
conn.channel(),
queues=[task_queue],
callbacks=[process_message],
prefetch_count=10
)
# Start consuming
consumer.consume()
# Process messages
while True:
try:
conn.drain_events(timeout=1.0)
except socket.timeout:
breakfrom kombu import Connection, Exchange, Queue, Producer
# Setup entities
exchange = Exchange('notifications', type='topic', durable=True)
queue = Queue('email_notifications', exchange, routing_key='email.*')
with Connection('amqp://localhost') as conn:
producer = Producer(conn.channel(), exchange=exchange)
# Publish with automatic declaration
producer.publish(
{
'to': 'user@example.com',
'subject': 'Welcome!',
'body': 'Welcome to our service'
},
routing_key='email.welcome',
declare=[queue], # Declare queue before publishing
mandatory=True, # Return if no route
expiration='300000', # 5 minute TTL
headers={'priority': 'high'}
)
# Publish compressed message
producer.publish(
{'large': 'data' * 1000},
routing_key='email.report',
compression='gzip',
serializer='pickle'
)from kombu import Connection, Queue, Consumer
import logging
logger = logging.getLogger(__name__)
def handle_decode_error(message, exc):
"""Handle message decode errors"""
logger.error(f"Failed to decode message: {exc}")
# Log the raw message for debugging
logger.error(f"Raw message body: {message.body}")
# Reject without requeue to avoid infinite loop
message.reject(requeue=False)
def process_message(body, message):
"""Process message with comprehensive error handling"""
try:
print(f"Processing message: {body}")
# Simulate processing that might fail
if body.get('fail'):
raise ValueError("Simulated processing error")
# Acknowledge successful processing
message.ack_log_error(logger, (Exception,))
except ValueError as exc:
logger.error(f"Processing error: {exc}")
# Reject and requeue for retry
message.reject_log_error(logger, (Exception,), requeue=True)
# Setup queues with different priorities
high_priority_queue = Queue('high_priority', routing_key='high')
low_priority_queue = Queue('low_priority', routing_key='low')
with Connection('redis://localhost:6379/0') as conn:
consumer = Consumer(
conn.channel(),
queues=[high_priority_queue, low_priority_queue],
callbacks=[process_message],
on_decode_error=handle_decode_error,
accept=['json', 'pickle'], # Only accept these content types
prefetch_count=5
)
# Set QoS limits
consumer.qos(prefetch_count=10, apply_global=True)
# Start consuming
consumer.consume()
# Process with graceful shutdown
try:
while True:
conn.drain_events(timeout=1.0)
except KeyboardInterrupt:
print("Shutting down...")
consumer.cancel()from kombu import Connection, Queue, Consumer
def inspect_message(body, message):
"""Inspect message properties and handle accordingly"""
# Check message properties
print(f"Content type: {message.content_type}")
print(f"Delivery info: {message.delivery_info}")
print(f"Headers: {message.headers}")
print(f"Properties: {message.properties}")
# Handle based on message properties
if message.headers and message.headers.get('priority') == 'urgent':
print("Processing urgent message immediately")
# Process immediately
process_urgent(body)
message.ack()
elif message.properties.get('redelivered'):
print("Message was redelivered - handling carefully")
# Special handling for redelivered messages
if handle_redelivered(body):
message.ack()
else:
# Dead letter or discard
message.reject(requeue=False)
else:
# Normal processing
if process_normal(body):
message.ack()
else:
message.requeue()
def process_urgent(body):
# Urgent processing logic
return True
def handle_redelivered(body):
# Redelivered message logic
return True
def process_normal(body):
# Normal processing logic
return True
queue = Queue('inspection_queue')
with Connection('redis://localhost:6379/0') as conn:
consumer = Consumer(conn.channel(), [queue], callbacks=[inspect_message])
consumer.consume()
# Process messages
conn.drain_events()from kombu import Connection, Exchange, Producer
def handle_returned_message(exception, exchange, routing_key, message):
"""Handle messages returned by broker"""
print(f"Message returned: {exception}")
print(f"Exchange: {exchange}, Routing key: {routing_key}")
print(f"Message: {message}")
# Could implement retry logic, logging, etc.
exchange = Exchange('optional_routing', type='direct')
with Connection('amqp://localhost') as conn:
producer = Producer(
conn.channel(),
exchange=exchange,
on_return=handle_returned_message
)
# Publish with mandatory flag - will be returned if no route exists
producer.publish(
{'data': 'test'},
routing_key='nonexistent_route',
mandatory=True # Return message if no queue bound
)Install with Tessl CLI
npx tessl i tessl/pypi-kombu