Messaging library for Python that provides an idiomatic high-level interface for AMQP and other message brokers.
—
Robust connection handling with pooling, retry logic, and failover support for connecting to message brokers across multiple transport backends. Kombu's connection management provides automatic reconnection, resource pooling, and comprehensive error handling for reliable messaging operations.
Primary connection class for establishing and managing broker connections with support for automatic reconnection, channel management, and transport abstraction.
class Connection:
def __init__(self, hostname='localhost', userid=None, password=None, virtual_host=None, port=None, insist=False, ssl=False, transport=None, connect_timeout=5, transport_options=None, login_method=None, uri_prefix=None, heartbeat=0, failover_strategy='round-robin', alternates=None, **kwargs):
"""
Create connection to message broker.
Parameters:
- hostname (str): Broker hostname (default 'localhost')
- userid (str): Username for authentication
- password (str): Password for authentication
- virtual_host (str): Virtual host (AMQP concept)
- port (int): Broker port number
- insist (bool): Insist on connection (deprecated)
- ssl (bool|dict): SSL configuration
- transport (str): Transport backend name
- connect_timeout (float): Connection timeout in seconds
- transport_options (dict): Transport-specific options
- login_method (str): SASL login method
- uri_prefix (str): URI prefix for transport
- heartbeat (int): Heartbeat interval in seconds (0=disabled)
- failover_strategy (str): Strategy for multiple hosts
- alternates (list): Alternative broker URLs
- **kwargs: Additional connection parameters
"""
def connect(self):
"""
Establish connection to the broker immediately.
Returns:
Connection instance for chaining
"""
def channel(self):
"""
Create and return a new channel.
Returns:
Channel instance
"""
def drain_events(self, timeout=None):
"""
Wait for a single event from the server.
Parameters:
- timeout (float): Timeout in seconds
Returns:
Event data or raises socket.timeout
"""
def ensure_connection(self, errback=None, max_retries=None, interval_start=2, interval_step=2, interval_max=30):
"""
Ensure connection is established with retry logic.
Parameters:
- errback (callable): Error callback function
- max_retries (int): Maximum retry attempts
- interval_start (float): Initial retry interval
- interval_step (float): Interval increase per retry
- interval_max (float): Maximum retry interval
Returns:
Context manager for ensured connection
"""
def ensure(self, obj, fun, errback=None, max_retries=None, **retry_policy):
"""
Ensure operation completes despite connection errors.
Parameters:
- obj: Object to call method on
- fun (str): Method name to call
- errback (callable): Error callback
- max_retries (int): Maximum retries
- retry_policy: Additional retry parameters
Returns:
Result of the operation
"""
def heartbeat_check(self, rate=2):
"""
Check heartbeats at specified rate.
Parameters:
- rate (int): Check frequency in seconds
"""
def close(self):
"""Close the connection and cleanup resources."""
def release(self):
"""Release connection back to pool."""
# Connection factory methods
def Pool(self, limit=None, preload=None):
"""
Create connection pool.
Parameters:
- limit (int): Maximum pool size
- preload (int): Number of connections to preload
Returns:
ConnectionPool instance
"""
def ChannelPool(self, limit=None, preload=None):
"""
Create channel pool.
Parameters:
- limit (int): Maximum pool size
- preload (int): Number of channels to preload
Returns:
ChannelPool instance
"""
def Producer(self, channel=None, *args, **kwargs):
"""
Create Producer instance.
Parameters:
- channel: Channel to use (uses default_channel if None)
Returns:
Producer instance
"""
def Consumer(self, queues, channel=None, *args, **kwargs):
"""
Create Consumer instance.
Parameters:
- queues: Queues to consume from
- channel: Channel to use (uses default_channel if None)
Returns:
Consumer instance
"""
def SimpleQueue(self, name, no_ack=None, queue_opts=None, exchange_opts=None, channel=None, **kwargs):
"""
Create SimpleQueue instance.
Parameters:
- name (str): Queue name
- no_ack (bool): Disable acknowledgments
- queue_opts (dict): Queue options
- exchange_opts (dict): Exchange options
- channel: Channel to use
Returns:
SimpleQueue instance
"""
def SimpleBuffer(self, name, no_ack=True, queue_opts=None, exchange_opts=None, channel=None, **kwargs):
"""
Create SimpleBuffer instance (ephemeral queue).
Parameters:
- name (str): Queue name
- no_ack (bool): Disable acknowledgments (default True)
- queue_opts (dict): Queue options
- exchange_opts (dict): Exchange options
- channel: Channel to use
Returns:
SimpleBuffer instance
"""
# Properties
@property
def connected(self):
"""bool: True if connection is established"""
@property
def connection(self):
"""Transport-specific connection object"""
@property
def default_channel(self):
"""Default channel (created on first access)"""
@property
def transport(self):
"""Transport instance"""
@property
def recoverable_connection_errors(self):
"""Tuple of recoverable connection error types"""
@property
def connection_errors(self):
"""Tuple of connection error types"""Legacy alias for Connection class provided for backward compatibility.
BrokerConnection = ConnectionNote: BrokerConnection is an alias for Connection and provides identical functionality. New code should use Connection directly.
Global connection and producer pools for efficient resource management across applications.
# Pool management functions
def get_limit():
"""
Get current connection pool limit.
Returns:
int: Current pool limit
"""
def set_limit(limit, force=False, reset_after=False, ignore_errors=False):
"""
Set new connection pool limit.
Parameters:
- limit (int): New pool limit
- force (bool): Force limit change
- reset_after (bool): Reset pools after change
- ignore_errors (bool): Ignore errors during reset
"""
def reset(*args, **kwargs):
"""
Reset all pools by closing resources.
Parameters:
- *args, **kwargs: Arguments passed to pool reset
"""
# Pool classes
class ProducerPool:
"""Pool of Producer instances"""
def acquire(self, block=False, timeout=None):
"""
Acquire producer from pool.
Parameters:
- block (bool): Block if pool empty
- timeout (float): Acquisition timeout
Returns:
Producer instance
"""
def release(self, resource):
"""
Release producer back to pool.
Parameters:
- resource: Producer to release
"""
class ConnectionPool:
"""Pool of Connection instances"""
def acquire(self, block=False, timeout=None):
"""
Acquire connection from pool.
Parameters:
- block (bool): Block if pool empty
- timeout (float): Acquisition timeout
Returns:
Connection instance
"""
def release(self, resource):
"""
Release connection back to pool.
Parameters:
- resource: Connection to release
"""
class ChannelPool:
"""Pool of Channel instances bound to connection"""
def acquire(self, block=False, timeout=None):
"""
Acquire channel from pool.
Parameters:
- block (bool): Block if pool empty
- timeout (float): Acquisition timeout
Returns:
Channel instance
"""
def release(self, resource):
"""
Release channel back to pool.
Parameters:
- resource: Channel to release
"""
# Global pool instances
connections: Connections # Global connection pool group
producers: Producers # Global producer pool groupUtility for parsing broker URLs into connection parameters.
def parse_url(url):
"""
Parse URL into mapping of connection components.
Parameters:
- url (str): Broker URL to parse
Returns:
dict: Parsed URL components with keys:
- transport (str): Transport name
- hostname (str): Broker hostname
- port (int): Port number
- userid (str): Username
- password (str): Password
- virtual_host (str): Virtual host
"""from kombu import Connection
# Connect with URL
conn = Connection('redis://localhost:6379/0')
# Connect with parameters
conn = Connection(
hostname='localhost',
userid='guest',
password='guest',
virtual_host='/',
transport='pyamqp'
)
with conn:
# Use connection
channel = conn.channel()
# ... perform operationsfrom kombu import Connection
conn = Connection('redis://localhost:6379/0')
def on_connection_error(exc, interval):
print(f"Connection error: {exc}, retrying in {interval}s")
# Ensure connection with custom retry policy
with conn.ensure_connection(
errback=on_connection_error,
max_retries=5,
interval_start=1,
interval_step=2,
interval_max=10
):
# Connection guaranteed to be established
channel = conn.channel()from kombu import pools
# Use global connection pool
with pools.connections['redis://localhost:6379/0'].acquire() as conn:
producer = conn.Producer()
producer.publish({'msg': 'hello'}, routing_key='test')
# Use global producer pool
with pools.producers['redis://localhost:6379/0'].acquire() as producer:
producer.publish({'msg': 'hello'}, routing_key='test')
# Set pool limits
pools.set_limit(100) # Allow up to 100 connections per poolfrom kombu.utils.url import parse_url
# Parse Redis URL
parsed = parse_url('redis://user:pass@localhost:6379/1')
print(parsed)
# {'transport': 'redis', 'hostname': 'localhost', 'port': 6379,
# 'userid': 'user', 'password': 'pass', 'virtual_host': '1'}
# Parse AMQP URL
parsed = parse_url('amqp://guest:guest@localhost:5672//')
print(parsed)
# {'transport': 'pyamqp', 'hostname': 'localhost', 'port': 5672,
# 'userid': 'guest', 'password': 'guest', 'virtual_host': '/'}from kombu import Connection, Queue, Consumer
from kombu.common import eventloop
def process_message(body, message):
print(f"Processing: {body}")
message.ack()
conn = Connection('redis://localhost:6379/0')
queue = Queue('test_queue')
with conn:
consumer = conn.Consumer(queue, callbacks=[process_message])
consumer.consume()
# Process events with timeout
for _ in eventloop(conn, limit=10, timeout=5.0):
pass # Events processed via callbacksInstall with Tessl CLI
npx tessl i tessl/pypi-kombu