CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-kombu

Messaging library for Python that provides an idiomatic high-level interface for AMQP and other message brokers.

Pending
Overview
Eval results
Files

docs/connection.md

Connection Management

Robust connection handling with pooling, retry logic, and failover support for connecting to message brokers across multiple transport backends. Kombu's connection management provides automatic reconnection, resource pooling, and comprehensive error handling for reliable messaging operations.

Capabilities

Connection Class

Primary connection class for establishing and managing broker connections with support for automatic reconnection, channel management, and transport abstraction.

class Connection:
    def __init__(
        self,
        hostname='localhost',
        userid=None,
        password=None,
        virtual_host=None,
        port=None,
        insist=False,
        ssl=False,
        transport=None,
        connect_timeout=5,
        transport_options=None,
        login_method=None,
        uri_prefix=None,
        heartbeat=0,
        failover_strategy='round-robin',
        alternates=None,
        **kwargs
    ):
        """
        Create connection to message broker.

        Creating the object does not open a socket; the connection is
        established on first use or by calling connect().

        Parameters:
        - hostname (str): Broker hostname (default 'localhost'), or a full
          broker URL such as 'redis://localhost:6379/0'
        - userid (str): Username for authentication
        - password (str): Password for authentication
        - virtual_host (str): Virtual host (AMQP concept)
        - port (int): Broker port number
        - insist (bool): Insist on connection (deprecated)
        - ssl (bool|dict): SSL configuration
        - transport (str): Transport backend name
        - connect_timeout (float): Connection timeout in seconds
        - transport_options (dict): Transport-specific options
        - login_method (str): SASL login method
        - uri_prefix (str): URI prefix for transport
        - heartbeat (int): Heartbeat interval in seconds (0=disabled)
        - failover_strategy (str): Strategy for multiple hosts
        - alternates (list): Alternative broker URLs
        - **kwargs: Additional connection parameters
        """

    def connect(self):
        """
        Establish connection to the broker immediately.

        Returns:
        The underlying transport-specific connection object (the same
        object exposed by the `connection` property), not this
        Connection instance
        """

    def channel(self):
        """
        Create and return a new channel.

        Returns:
        Channel instance
        """

    def drain_events(self, timeout=None):
        """
        Wait for a single event from the server.

        Messages are delivered to the registered consumer callbacks
        rather than handed back to the caller.

        Parameters:
        - timeout (float): Timeout in seconds

        Raises:
        socket.timeout: If no event arrives before the timeout expires
        """

    def ensure_connection(self, errback=None, max_retries=None, interval_start=2, interval_step=2, interval_max=30):
        """
        Ensure connection is established with retry logic.

        Parameters:
        - errback (callable): Called as errback(exc, interval) each time
          a connection attempt fails
        - max_retries (int): Maximum retry attempts
        - interval_start (float): Initial retry interval in seconds
        - interval_step (float): Interval increase per retry in seconds
        - interval_max (float): Maximum retry interval in seconds

        Returns:
        This Connection instance. Because Connection is itself a context
        manager the result can be used in a `with` statement; leaving
        the block releases (closes) the connection.
        """

    def ensure(self, obj, fun, errback=None, max_retries=None, **retry_policy):
        """
        Wrap an operation so it completes despite connection errors.

        The operation is not executed by this call. A wrapper is returned;
        calling the wrapper runs `fun` and, on a connection or channel
        error, re-establishes the connection, revives `obj` on the new
        channel and applies `fun` again.

        Parameters:
        - obj: Object the operation acts on (e.g. a Producer); it must
          support revive(channel)
        - fun (callable): The function or bound method to apply, e.g.
          producer.publish. Pass the callable itself, not its name
        - errback (callable): Called as errback(exc, interval) each time
          the connection cannot be established
        - max_retries (int): Maximum retries; when exceeded the
          connection error is re-raised
        - retry_policy: Additional retry parameters (interval_start,
          interval_step, interval_max, on_revive, retry_errors)

        Returns:
        Callable taking the same arguments as `fun` and returning
        its result
        """

    def heartbeat_check(self, rate=2):
        """
        Let the transport perform the periodic work heartbeats require.

        Intended to be called regularly (about once per second). It is a
        no-op for transports without heartbeat support.

        Parameters:
        - rate (int): How often this check is called relative to the
          heartbeat interval, not a number of seconds. With a 3 second
          heartbeat and a call every 3 / 2 seconds, the rate is 2
        """

    def close(self):
        """Close the connection and cleanup resources."""

    def release(self):
        """
        Release the connection.

        A connection acquired from a pool is returned to that pool; a
        standalone connection is closed.
        """

    # Connection factory methods
    def Pool(self, limit=None, preload=None):
        """
        Create connection pool.

        Parameters:
        - limit (int): Maximum pool size
        - preload (int): Number of connections to preload

        Returns:
        ConnectionPool instance
        """

    def ChannelPool(self, limit=None, preload=None):
        """
        Create channel pool.

        Parameters:
        - limit (int): Maximum pool size
        - preload (int): Number of channels to preload

        Returns:
        ChannelPool instance
        """

    def Producer(self, channel=None, *args, **kwargs):
        """
        Create Producer instance.

        Parameters:
        - channel: Channel to use (uses default_channel if None)
        - *args, **kwargs: Passed on to the Producer constructor

        Returns:
        Producer instance
        """

    def Consumer(self, queues, channel=None, *args, **kwargs):
        """
        Create Consumer instance.

        Parameters:
        - queues: Queues to consume from
        - channel: Channel to use (uses default_channel if None)
        - *args, **kwargs: Passed on to the Consumer constructor
          (e.g. callbacks=[...])

        Returns:
        Consumer instance
        """

    def SimpleQueue(self, name, no_ack=None, queue_opts=None, exchange_opts=None, channel=None, **kwargs):
        """
        Create SimpleQueue instance.

        Parameters:
        - name (str): Queue name
        - no_ack (bool): Disable acknowledgments
        - queue_opts (dict): Queue options
        - exchange_opts (dict): Exchange options
        - channel: Channel to use

        Returns:
        SimpleQueue instance
        """

    def SimpleBuffer(self, name, no_ack=True, queue_opts=None, exchange_opts=None, channel=None, **kwargs):
        """
        Create SimpleBuffer instance (ephemeral queue).

        Parameters:
        - name (str): Queue name
        - no_ack (bool): Disable acknowledgments (default True)
        - queue_opts (dict): Queue options
        - exchange_opts (dict): Exchange options
        - channel: Channel to use

        Returns:
        SimpleBuffer instance
        """

    # Properties
    @property
    def connected(self):
        """bool: True if connection is established"""

    @property
    def connection(self):
        """Transport-specific connection object"""

    @property
    def default_channel(self):
        """Default channel (created on first access)"""

    @property
    def transport(self):
        """Transport instance"""

    @property
    def recoverable_connection_errors(self):
        """Tuple of recoverable connection error types"""

    @property
    def connection_errors(self):
        """Tuple of connection error types"""

BrokerConnection Class

Legacy alias for Connection class provided for backward compatibility.

# Backward-compatibility alias: both names are bound to the same class object.
BrokerConnection = Connection

Note: BrokerConnection is an alias for Connection and provides identical functionality. New code should use Connection directly.

Connection Pooling

Global connection and producer pools for efficient resource management across applications.

# Pool management functions
def get_limit():
    """
    Get current connection pool limit.

    This is the value that set_limit() changes.

    Returns:
    int: Current pool limit
    """

def set_limit(limit, force=False, reset_after=False, ignore_errors=False):
    """
    Set new connection pool limit.

    The limit applies to each pool individually (the usage example sets
    100 to allow up to 100 connections per pool).

    Parameters:
    - limit (int): New pool limit
    - force (bool): Force limit change
    - reset_after (bool): Reset pools after change
    - ignore_errors (bool): Ignore errors during reset

    NOTE(review): the return value and the conditions under which a
    change is refused without force=True are not shown here; confirm
    against the kombu.pools implementation.
    """

def reset(*args, **kwargs):
    """
    Reset all pools by closing resources.

    Parameters:
    - *args, **kwargs: Arguments passed to pool reset

    NOTE(review): which arguments the underlying pool reset accepts is
    not shown here; confirm before passing any.
    """

# Pool classes
class ProducerPool:
    """Pool of Producer instances"""

    def acquire(self, block=False, timeout=None):
        """
        Acquire producer from pool.

        The usage examples use the acquired producer as a context
        manager (`with pool.acquire() as producer:`).

        Parameters:
        - block (bool): Whether to wait for a producer when the pool is
          empty (default False)
        - timeout (float): Acquisition timeout

        Returns:
        Producer instance

        NOTE(review): behaviour when block is False and the pool is
        exhausted is not shown here (presumably an exception is raised);
        confirm against kombu.
        """

    def release(self, resource):
        """
        Release producer back to pool.

        Parameters:
        - resource: Producer to release
        """

class ConnectionPool:
    """Pool of Connection instances"""

    def acquire(self, block=False, timeout=None):
        """
        Acquire connection from pool.

        The usage examples use the acquired connection as a context
        manager (`with pool.acquire() as conn:`).

        Parameters:
        - block (bool): Whether to wait for a connection when the pool
          is empty (default False)
        - timeout (float): Acquisition timeout

        Returns:
        Connection instance

        NOTE(review): behaviour when block is False and the pool is
        exhausted is not shown here (presumably an exception is raised);
        confirm against kombu.
        """

    def release(self, resource):
        """
        Release connection back to pool.

        Parameters:
        - resource: Connection to release
        """

class ChannelPool:
    """Pool of Channel instances bound to connection"""

    def acquire(self, block=False, timeout=None):
        """
        Acquire channel from pool.

        Parameters:
        - block (bool): Whether to wait for a channel when the pool is
          empty (default False)
        - timeout (float): Acquisition timeout

        Returns:
        Channel instance

        NOTE(review): behaviour when block is False and the pool is
        exhausted is not shown here (presumably an exception is raised);
        confirm against kombu.
        """

    def release(self, resource):
        """
        Release channel back to pool.

        Parameters:
        - resource: Channel to release
        """

# Global pool instances
# These are declarations of the module-level pool groups, not assignments.
# Subscripting a group (connections[...], producers[...]) yields the pool
# for one broker, whose acquire() hands out a connection / producer.
connections: Connections  # Global connection pool group
producers: Producers      # Global producer pool group

URL Parsing

Utility for parsing broker URLs into connection parameters.

def parse_url(url):
    """
    Parse URL into mapping of connection components.

    Parameters:
    - url (str): Broker URL to parse, e.g.
      'redis://user:pass@localhost:6379/1'

    Returns:
    dict: Parsed URL components with keys:
        - transport (str): Transport name
        - hostname (str): Broker hostname
        - port (int): Port number
        - userid (str): Username
        - password (str): Password
        - virtual_host (str): Virtual host; the URL path without its
          leading slash ('1' for the example URL above)

    NOTE(review): URL query parameters may surface as additional keys in
    the returned dict; confirm against kombu.utils.url.
    """

Usage Examples

Basic Connection

from kombu import Connection

# Connect with URL
conn = Connection('redis://localhost:6379/0')

# Connect with parameters
# (an alternative to the URL form above; this rebinds `conn`, so only the
# pyamqp connection is used below)
conn = Connection(
    hostname='localhost',
    userid='guest',
    password='guest',
    virtual_host='/',
    transport='pyamqp'
)

# The connection is used as a context manager around the channel work
with conn:
    # Use connection
    channel = conn.channel()
    # ... perform operations

Connection with Retry Logic

from kombu import Connection

conn = Connection('redis://localhost:6379/0')

def on_connection_error(exc, interval):
    """Retry callback: report the error and the delay before the next attempt."""
    print(f"Connection error: {exc}, retrying in {interval}s")

# Ensure connection with custom retry policy:
# at most 5 retries, starting at a 1s interval that grows by 2s per retry
# and is capped at 10s
with conn.ensure_connection(
    errback=on_connection_error,
    max_retries=5,
    interval_start=1,
    interval_step=2,
    interval_max=10
):
    # Connection guaranteed to be established
    channel = conn.channel()

Connection Pooling

from kombu import Connection, pools

# The global pool groups are keyed by a Connection object, not by a URL
# string: on first access the group builds the pool from the key itself
# (it calls the key's Pool() method), so a plain string key fails.
connection = Connection('redis://localhost:6379/0')

# Use global connection pool
# block=True waits for a free connection instead of failing when the
# pool limit has been reached
with pools.connections[connection].acquire(block=True) as conn:
    producer = conn.Producer()
    producer.publish({'msg': 'hello'}, routing_key='test')

# Use global producer pool
with pools.producers[connection].acquire(block=True) as producer:
    producer.publish({'msg': 'hello'}, routing_key='test')

# Set pool limits
pools.set_limit(100)  # Allow up to 100 connections per pool

URL Parsing

from kombu.utils.url import parse_url

# Parse Redis URL
parsed = parse_url('redis://user:pass@localhost:6379/1')
print(parsed)
# {'transport': 'redis', 'hostname': 'localhost', 'port': 6379,
#  'userid': 'user', 'password': 'pass', 'virtual_host': '1'}

# Parse AMQP URL
# ('transport' is the URL scheme as written; the path after the first
# slash becomes the virtual host, so '//' yields '/')
parsed = parse_url('amqp://guest:guest@localhost:5672//')
print(parsed)
# {'transport': 'amqp', 'hostname': 'localhost', 'port': 5672,
#  'userid': 'guest', 'password': 'guest', 'virtual_host': '/'}

Event Processing

import socket

from kombu import Connection, Queue, Consumer
from kombu.common import eventloop

def process_message(body, message):
    """Consumer callback: print the decoded body, then acknowledge it."""
    print(f"Processing: {body}")
    message.ack()

conn = Connection('redis://localhost:6379/0')
queue = Queue('test_queue')

with conn:
    # Entering the consumer starts consuming; leaving the block cancels it
    # again, even if an error is raised while draining events.
    with conn.Consumer(queue, callbacks=[process_message]):
        try:
            # Process at most 10 events, waiting up to 5s for each one
            for _ in eventloop(conn, limit=10, timeout=5.0):
                pass  # Events processed via callbacks
        except socket.timeout:
            # No event arrived within the timeout: the queue is idle,
            # so stop instead of ending the example with a traceback.
            pass

Install with Tessl CLI

npx tessl i tessl/pypi-kombu

docs

compression.md

connection.md

entities.md

exceptions.md

index.md

messaging.md

mixins.md

pools.md

serialization.md

simple.md

tile.json