Messaging library for Python that provides an idiomatic high-level interface for AMQP and other message brokers.
npx @tessl/cli install tessl/pypi-kombu@5.5.0

A comprehensive messaging library for Python that provides an idiomatic high-level interface for the Advanced Message Queuing Protocol (AMQP) and other message brokers. Kombu enables developers to build robust messaging systems with support for multiple message servers, including RabbitMQ, Redis, MongoDB, Amazon SQS, and others.
pip install kombu

import kombu

Common patterns for working with messaging:
from kombu import Connection, BrokerConnection, Exchange, Queue, Producer, Consumer

For simple queue operations:
from kombu.simple import SimpleQueue

For consumer programs:
from kombu.mixins import ConsumerMixin

For connection and producer pooling:
from kombu import pools
from kombu.pools import connections, producers

For compression:
from kombu import compression

from kombu import Connection, Exchange, Queue, Producer, Consumer
# Define message exchange and queue
task_exchange = Exchange('tasks', type='direct')
task_queue = Queue('task_queue', task_exchange, routing_key='task')

# Connect to broker
with Connection('redis://localhost:6379/0') as conn:
    # Publish a message
    with conn.Producer() as producer:
        producer.publish(
            {'hello': 'world'},
            exchange=task_exchange,
            routing_key='task',
            declare=[task_queue]
        )

    # Consume messages
    def process_message(body, message):
        print('Received message:', body)
        message.ack()

    with conn.Consumer(task_queue, callbacks=[process_message]) as consumer:
        # Process one message
        conn.drain_events()

Kombu provides a comprehensive messaging abstraction built around a few core concepts: connections, exchanges and queues, producers and consumers, and pluggable serialization, compression, and transports.
The library abstracts transport-specific details while exposing rich AMQP features when available, making it suitable for both simple messaging scenarios and complex distributed systems. It serves as the foundation for distributed task processing systems like Celery.
Robust connection handling with pooling, retry logic, and failover support for connecting to message brokers across multiple transport backends.
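A minimal sketch of connecting with a bounded retry; the broker URL is illustrative:

from kombu import Connection

with Connection('amqp://guest:guest@localhost:5672//') as conn:
    # Retry the initial connect a few times before giving up.
    conn.ensure_connection(max_retries=3)
    channel = conn.channel()
    # ... attach producers, consumers, or declarations to the channel ...
    channel.close()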
class Connection:
    def __init__(self, hostname=None, userid=None, password=None, virtual_host=None, port=None, ssl=None, transport=None, **kwargs): ...
    def connect(self): ...
    def channel(self): ...
    def drain_events(self, **kwargs): ...
    def ensure_connection(self, errback=None, max_retries=None, **retry_policy): ...

def parse_url(url: str) -> dict: ...

AMQP entity declarations for exchanges, queues, and bindings that define message routing topology and behavior.
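As an illustrative sketch (exchange, queue, and URL are made up), entities are bound to a channel and then declared on the broker:

from kombu import Connection, Exchange, Queue

media_exchange = Exchange('media', type='direct', durable=True)
video_queue = Queue('video', exchange=media_exchange, routing_key='video')

with Connection('amqp://guest:guest@localhost//') as conn:
    channel = conn.channel()
    # Binding an entity returns a copy attached to the channel.
    media_exchange(channel).declare()
    video_queue(channel).declare()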
class Exchange:
    def __init__(self, name='', type='direct', channel=None, durable=True, auto_delete=False, **kwargs): ...
    def declare(self, nowait=False, passive=None, channel=None): ...
    def publish(self, message, routing_key=None, **kwargs): ...

class Queue:
    def __init__(self, name='', exchange=None, routing_key='', channel=None, durable=True, **kwargs): ...
    def declare(self, nowait=False, channel=None): ...
    def bind_to(self, exchange=None, routing_key=None, **kwargs): ...
    def get(self, no_ack=None, accept=None): ...

class binding:
    def __init__(self, exchange=None, routing_key='', arguments=None, **kwargs): ...

High-level producer and consumer interfaces for publishing and receiving messages with comprehensive error handling and serialization support.
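A sketch of explicit Producer and Consumer usage on their own channels; the exchange, queue, and broker URL are illustrative:

from kombu import Connection, Exchange, Queue, Producer, Consumer

exchange = Exchange('events', type='direct')
queue = Queue('event_queue', exchange, routing_key='event')

with Connection('redis://localhost:6379/0') as conn:
    producer = Producer(conn.channel(), exchange=exchange, serializer='json')
    producer.publish({'event': 'created'}, routing_key='event', declare=[queue])

    def on_message(body, message):
        print('Received:', body)
        message.ack()  # or message.reject(requeue=True) on failure

    with Consumer(conn.channel(), queues=[queue], callbacks=[on_message]):
        conn.drain_events(timeout=2)  # raises socket.timeout if nothing arrives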
class Producer:
    def __init__(self, channel, exchange=None, routing_key='', serializer=None, **kwargs): ...
    def publish(self, body, routing_key=None, delivery_mode=None, mandatory=False, **kwargs): ...
    def declare(self): ...

class Consumer:
    def __init__(self, channel, queues=None, no_ack=None, auto_declare=True, callbacks=None, **kwargs): ...
    def consume(self, no_ack=None): ...
    def register_callback(self, callback): ...
    def add_queue(self, queue): ...

class Message:
    def ack(self, multiple=False): ...
    def reject(self, requeue=False): ...
    def decode(self): ...

Queue-like API for simple use cases that provides an easy-to-use interface similar to Python's queue module.
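A sketch using the Connection.SimpleQueue shortcut to this interface; the queue name is illustrative:

from kombu import Connection

with Connection('amqp://guest:guest@localhost//') as conn:
    simple_queue = conn.SimpleQueue('simple_tasks')
    simple_queue.put({'task': 'resize-image'})
    message = simple_queue.get(block=True, timeout=5)
    print(message.payload)
    message.ack()
    simple_queue.close()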
class SimpleQueue:
    def __init__(self, channel, name, no_ack=False, **kwargs): ...
    def get(self, block=True, timeout=None): ...
    def put(self, message, serializer=None, headers=None, **kwargs): ...
    def clear(self): ...
    def qsize(self): ...

class SimpleBuffer:
    def __init__(self, channel, name, no_ack=True, **kwargs): ...

Ready-to-use consumer frameworks that provide structured approaches for building consumer applications with connection management and error handling.
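A sketch of a long-running worker built on ConsumerMixin; the queue name and broker URL are illustrative:

from kombu import Connection, Exchange, Queue
from kombu.mixins import ConsumerMixin

task_queue = Queue('tasks', Exchange('tasks', type='direct'), routing_key='task')

class Worker(ConsumerMixin):
    def __init__(self, connection):
        self.connection = connection

    def get_consumers(self, Consumer, channel):
        return [Consumer(queues=[task_queue], callbacks=[self.on_task])]

    def on_task(self, body, message):
        print('Got task:', body)
        message.ack()

with Connection('redis://localhost:6379/0') as conn:
    Worker(conn).run()  # blocks, reconnecting automatically on connection errors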
class ConsumerMixin:
    def get_consumers(self, Consumer, channel): ...
    def run(self, _tokens=1, **kwargs): ...
    def consume(self, limit=None, timeout=None, safety_interval=1, **kwargs): ...
    def on_connection_error(self, exc, interval): ...

class ConsumerProducerMixin(ConsumerMixin):
    @property
    def producer(self): ...
    @property
    def producer_connection(self): ...

Pluggable serialization system with security controls for encoding and decoding message payloads across different formats.
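A sketch of a JSON round trip through the serializer registry:

from kombu.serialization import dumps, loads

content_type, content_encoding, body = dumps({'answer': 42}, serializer='json')
data = loads(body, content_type, content_encoding, accept=['application/json'])
print(data)  # {'answer': 42}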
def dumps(data, serializer=None): ...
def loads(data, content_type, content_encoding='utf-8', accept=None, **kwargs): ...
def register(name, encoder, decoder, content_type, content_encoding='utf-8'): ...
def enable_insecure_serializers(choices=None): ...
def disable_insecure_serializers(allowed=None): ...

Comprehensive exception hierarchy for handling messaging errors, connection issues, and serialization problems.
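A sketch of a broad error handler built on this hierarchy; safe_handle is a hypothetical helper, not part of Kombu:

from kombu.exceptions import KombuError, OperationalError, SerializationError

def safe_handle(fn):
    # All Kombu-specific errors share the KombuError base class,
    # so a catch-all handler can still let unrelated exceptions propagate.
    try:
        return fn()
    except SerializationError as exc:
        print('Could not encode/decode payload:', exc)
    except OperationalError as exc:
        print('Broker-level failure, retry later:', exc)
    except KombuError as exc:
        print('Other messaging error:', exc)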
class KombuError(Exception): ...
class OperationalError(KombuError): ...
class SerializationError(KombuError): ...
class NotBoundError(KombuError): ...
class MessageStateError(KombuError): ...
class LimitExceeded(KombuError): ...

Resource pooling for connections and producers to optimize performance and manage resources efficiently in high-throughput applications.
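A sketch of adjusting the shared pool limit (the value 10 is arbitrary); acquiring pooled resources is shown in the pooling example further below:

from kombu import pools

# Cap the number of resources each global pool group may hold per process.
pools.set_limit(10)
print(pools.get_limit())  # 10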
class ProducerPool:
    def __init__(self, connections, *args, **kwargs): ...
    def acquire(self, block=False, timeout=None): ...
    def release(self, resource): ...

class PoolGroup:
    def __init__(self, limit=None, close_after_fork=True): ...
    def create(self, resource, limit): ...

class Connections(PoolGroup): ...

class Producers(PoolGroup): ...

connections: Connections  # Global connection pool group
producers: Producers  # Global producer pool group

def get_limit() -> int: ...
def set_limit(limit: int, force=False, reset_after=False, ignore_errors=False) -> int: ...
def reset(*args, **kwargs): ...
Message payload compression utilities supporting multiple compression algorithms for reducing message size.
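A sketch of a gzip compression round trip:

from kombu import compression

body, content_type = compression.compress(b'x' * 1000, 'gzip')
assert compression.decompress(body, content_type) == b'x' * 1000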
def register(encoder, decoder, content_type, aliases=None): ...
def encoders() -> list: ...
def get_encoder(content_type): ...
def get_decoder(content_type): ...
def compress(body, content_type): ...
def decompress(body, content_type): ...

Kombu supports multiple message brokers through pluggable transports, including RabbitMQ (AMQP), Redis, MongoDB, and Amazon SQS.
Transport selection is automatic based on connection URL or can be specified explicitly.
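For example (a sketch; the hostnames are illustrative):

from kombu import Connection

# The URL scheme selects the transport...
redis_conn = Connection('redis://localhost:6379/0')
# ...or the transport can be named explicitly.
amqp_conn = Connection('localhost', transport='pyamqp')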
from kombu import Connection
from kombu import pools

conn = Connection('redis://localhost:6379/0')

# Global connection pool (pool groups are keyed by a Connection instance)
with pools.connections[conn].acquire(block=True) as connection:
    # Use connection
    pass

# Producer pool
with pools.producers[conn].acquire(block=True) as producer:
    producer.publish({'msg': 'data'}, routing_key='key')

from kombu.common import eventloop
# Process events with timeout
for _ in eventloop(connection, limit=None, timeout=1.0):
    pass  # Events processed via consumers

from kombu.utils.url import parse_url
# Parse broker URL
parsed = parse_url('redis://user:pass@localhost:6379/1')
# Returns: {'transport': 'redis', 'hostname': 'localhost', 'port': 6379, ...}