or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

compression.mdconnection.mdentities.mdexceptions.mdindex.mdmessaging.mdmixins.mdpools.mdserialization.mdsimple.md
tile.json

tessl/pypi-kombu

Messaging library for Python that provides an idiomatic high-level interface for AMQP and other message brokers.

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/kombu@5.5.x

To install, run

npx @tessl/cli install tessl/pypi-kombu@5.5.0

index.mddocs/

Kombu

A comprehensive messaging library for Python that provides an idiomatic high-level interface for the Advanced Message Queuing Protocol (AMQP) and other message brokers. Kombu enables developers to build robust messaging systems with support for multiple message servers including RabbitMQ, Redis, MongoDB, Amazon SQS, and others.

Package Information

  • Package Name: kombu
  • Language: Python
  • Installation: pip install kombu
  • Version: 5.5.4
  • License: BSD-3-Clause

Core Imports

import kombu

Common patterns for working with messaging:

from kombu import Connection, BrokerConnection, Exchange, Queue, Producer, Consumer

For simple queue operations:

from kombu.simple import SimpleQueue

For consumer programs:

from kombu.mixins import ConsumerMixin

For connection and producer pooling:

from kombu import pools
from kombu.pools import connections, producers

For compression:

from kombu import compression

Basic Usage

from kombu import Connection, Exchange, Queue, Producer, Consumer

# Define message exchange and queue
task_exchange = Exchange('tasks', type='direct')
task_queue = Queue('task_queue', task_exchange, routing_key='task')

# Connect to broker
with Connection('redis://localhost:6379/0') as conn:
    # Publish a message
    with conn.Producer() as producer:
        producer.publish(
            {'hello': 'world'},
            exchange=task_exchange,
            routing_key='task',
            declare=[task_queue]
        )
    
    # Consume messages
    def process_message(body, message):
        print('Received message:', body)
        message.ack()
    
    with conn.Consumer(task_queue, callbacks=[process_message]) as consumer:
        # Process one message
        conn.drain_events()

Architecture

Kombu provides a comprehensive messaging abstraction built around these core concepts:

  • Connection: Manages broker connections with automatic reconnection and pooling
  • Producer/Consumer: High-level interfaces for publishing and consuming messages
  • Exchange/Queue: AMQP entity declarations for message routing
  • Message: Unified message handling with acknowledgment and error management
  • Transport: Pluggable backend support for different message brokers

The library abstracts transport-specific details while exposing rich AMQP features when available, making it suitable for both simple messaging scenarios and complex distributed systems. It serves as the foundation for distributed task processing systems like Celery.

Capabilities

Connection Management

Robust connection handling with pooling, retry logic, and failover support for connecting to message brokers across multiple transport backends.

class Connection:
    def __init__(self, hostname=None, userid=None, password=None, virtual_host=None, port=None, ssl=None, transport=None, **kwargs): ...
    def connect(self): ...
    def channel(self): ...
    def drain_events(self, **kwargs): ...
    def ensure_connection(self, errback=None, max_retries=None, **retry_policy): ...

def parse_url(url: str) -> dict: ...

Connection Management

Message Entities

AMQP entity declarations for exchanges, queues, and bindings that define message routing topology and behavior.

class Exchange:
    def __init__(self, name='', type='direct', channel=None, durable=True, auto_delete=False, **kwargs): ...
    def declare(self, nowait=False, passive=None, channel=None): ...
    def publish(self, message, routing_key=None, **kwargs): ...

class Queue:
    def __init__(self, name='', exchange=None, routing_key='', channel=None, durable=True, **kwargs): ...
    def declare(self, nowait=False, channel=None): ...
    def bind_to(self, exchange=None, routing_key=None, **kwargs): ...
    def get(self, no_ack=None, accept=None): ...

class binding:
    def __init__(self, exchange=None, routing_key='', arguments=None, **kwargs): ...

Message Entities

Messaging

High-level producer and consumer interfaces for publishing and receiving messages with comprehensive error handling and serialization support.

class Producer:
    def __init__(self, channel, exchange=None, routing_key='', serializer=None, **kwargs): ...
    def publish(self, body, routing_key=None, delivery_mode=None, mandatory=False, **kwargs): ...
    def declare(self): ...

class Consumer:
    def __init__(self, channel, queues=None, no_ack=None, auto_declare=True, callbacks=None, **kwargs): ...
    def consume(self, no_ack=None): ...
    def register_callback(self, callback): ...
    def add_queue(self, queue): ...

class Message:
    def ack(self, multiple=False): ...
    def reject(self, requeue=False): ...
    def decode(self): ...

Messaging

Simple Interface

Queue-like API for simple use cases that provides an easy-to-use interface similar to Python's queue module.

class SimpleQueue:
    def __init__(self, channel, name, no_ack=False, **kwargs): ...
    def get(self, block=True, timeout=None): ...
    def put(self, message, serializer=None, headers=None, **kwargs): ...
    def clear(self): ...
    def qsize(self): ...

class SimpleBuffer:
    def __init__(self, channel, name, no_ack=True, **kwargs): ...

Simple Interface

Consumer Mixins

Ready-to-use consumer frameworks that provide structured approaches for building consumer applications with connection management and error handling.

class ConsumerMixin:
    def get_consumers(self, Consumer, channel): ...
    def run(self, _tokens=1, **kwargs): ...
    def consume(self, limit=None, timeout=None, safety_interval=1, **kwargs): ...
    def on_connection_error(self, exc, interval): ...

class ConsumerProducerMixin(ConsumerMixin):
    @property
    def producer(self): ...
    @property
    def producer_connection(self): ...

Consumer Mixins

Serialization

Pluggable serialization system with security controls for encoding and decoding message payloads across different formats.

def dumps(data, serializer=None): ...
def loads(data, content_type, content_encoding='utf-8', accept=None, **kwargs): ...
def register(name, encoder, decoder, content_type, content_encoding='utf-8'): ...
def enable_insecure_serializers(choices=None): ...
def disable_insecure_serializers(allowed=None): ...

Serialization

Exception Handling

Comprehensive exception hierarchy for handling messaging errors, connection issues, and serialization problems.

class KombuError(Exception): ...
class OperationalError(KombuError): ...
class SerializationError(KombuError): ...
class NotBoundError(KombuError): ...
class MessageStateError(KombuError): ...
class LimitExceeded(KombuError): ...

Exception Handling

Connection and Producer Pooling

Resource pooling for connections and producers to optimize performance and manage resources efficiently in high-throughput applications.

class ProducerPool:
    def __init__(self, connections, *args, **kwargs): ...
    def acquire(self, block=False, timeout=None): ...
    def release(self, resource): ...

class PoolGroup:
    def __init__(self, limit=None, close_after_fork=True): ...
    def create(self, resource, limit): ...

class Connections(PoolGroup): ...
class Producers(PoolGroup): ...

connections: Connections  # Global connection pool group
producers: Producers     # Global producer pool group

def get_limit() -> int: ...
def set_limit(limit: int, force=False, reset_after=False, ignore_errors=False) -> int: ...
def reset(*args, **kwargs): ...

Connection and Producer Pooling

Compression

Message payload compression utilities supporting multiple compression algorithms for reducing message size.

def register(encoder, decoder, content_type, aliases=None): ...
def encoders() -> list: ...
def get_encoder(content_type): ...
def get_decoder(content_type): ...
def compress(body, content_type): ...
def decompress(body, content_type): ...

Compression

Transport Support

Kombu supports multiple message brokers through pluggable transports:

  • AMQP: RabbitMQ, Apache Qpid (pyamqp, librabbitmq)
  • Redis: Redis server with pub/sub support
  • Memory: In-memory transport for testing
  • Amazon SQS: Amazon Simple Queue Service
  • MongoDB: MongoDB as message broker
  • SQLAlchemy: Database-backed transport
  • Filesystem: File-based transport for development
  • Plus many others: Google Cloud Pub/Sub, Azure Service Bus, Kafka, etc.

Transport selection is automatic based on connection URL or can be specified explicitly.

Common Patterns

Connection Pooling

from kombu import pools

# Global connection pool
with pools.connections['redis://localhost:6379/0'].acquire() as conn:
    # Use connection
    pass

# Producer pool
with pools.producers['redis://localhost:6379/0'].acquire() as producer:
    producer.publish({'msg': 'data'}, routing_key='key')

Event Loop Processing

from kombu.common import eventloop

# Process events with timeout
for _ in eventloop(connection, limit=None, timeout=1.0):
    pass  # Events processed via consumers

URL-based Configuration

from kombu.utils.url import parse_url

# Parse broker URL
parsed = parse_url('redis://user:pass@localhost:6379/1')
# Returns: {'transport': 'redis', 'hostname': 'localhost', 'port': 6379, ...}