tessl/pypi-kombu

Messaging library for Python that provides an idiomatic high-level interface for AMQP and other message brokers.

Connection and Producer Pooling

Kombu pools connections and producers so that high-throughput applications can reuse them instead of creating new ones for every operation. Connection pools avoid the cost of establishing a broker connection each time, while producer pools keep ready-to-use producer instances on top of pooled connections.

Core Imports

from kombu import pools
from kombu.pools import connections, producers, ProducerPool, PoolGroup
from kombu.pools import get_limit, set_limit, reset

Capabilities

Global Pool Instances

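Pre-configured global pool groups for connections and producers that can be shared throughout an application. Each group is indexed by a kombu.Connection instance and returns the pool for that connection, creating it on first access.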

connections: Connections  # Global connection pool group
producers: Producers     # Global producer pool group

Usage Example:

from kombu import Connection, pools

connection = Connection('redis://localhost:6379/0')

# Pool groups are keyed by Connection instances, not URL strings
with pools.connections[connection].acquire(block=True) as conn:
    # Use the pooled connection
    with conn.channel() as channel:
        # Operate on channel
        pass

# Use global producer pool
with pools.producers[connection].acquire(block=True) as producer:
    producer.publish({'message': 'data'}, routing_key='task')

ProducerPool Class

Pool of Producer instances that share connections from a connection pool.

class ProducerPool:
    """Pool of kombu.Producer instances."""
    
    def __init__(self, connections, *args, **kwargs):
        """
        Initialize producer pool.
        
        Parameters:
        - connections: Connection pool to use for producers
        - Producer: Producer class to use, passed as a keyword argument (default: kombu.Producer)
        - *args, **kwargs: Passed to Resource base class
        """
    
    def acquire(self, block=False, timeout=None):
        """
        Acquire producer from pool.
        
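        Parameters:
        - block (bool): Whether to wait for a free resource instead of failing immediately
        - timeout (float): Maximum seconds to wait; only applies when block is True
        
        Returns:
        Producer instance from pool
        
        Raises:
        LimitExceeded if the pool limit is reached and no resource becomes available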
        """
    
    def release(self, resource):
        """
        Release producer back to pool.
        
        Parameters:
        - resource: Producer instance to release
        """
    
    def create_producer(self):
        """Create new producer instance."""
    
    def prepare(self, producer):
        """Prepare producer for use."""
    
    def close_resource(self, resource):
        """Close producer resource."""

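The acquire/release contract above can be modeled with a few lines of standard-library code. This is a simplified stand-in, not kombu's implementation: TinyPool and the local LimitExceeded class are hypothetical names, and the sketch assumes resources are built lazily the first time a slot is handed out.

```python
import queue


class LimitExceeded(Exception):
    """Hypothetical stand-in for kombu.exceptions.LimitExceeded."""


class TinyPool:
    """Simplified model of a bounded resource pool (not kombu's code)."""

    def __init__(self, factory, limit):
        self._factory = factory
        self.limit = limit
        self._slots = queue.LifoQueue()
        # One empty slot per allowed resource; a resource is only
        # constructed the first time its slot is acquired.
        for _ in range(limit):
            self._slots.put_nowait(None)

    def acquire(self, block=False, timeout=None):
        try:
            resource = self._slots.get(block=block, timeout=timeout)
        except queue.Empty:
            # Every slot is checked out (and the wait, if any, expired).
            raise LimitExceeded(self.limit) from None
        return self._factory() if resource is None else resource

    def release(self, resource):
        self._slots.put_nowait(resource)


pool = TinyPool(factory=dict, limit=2)
first = pool.acquire()
second = pool.acquire()
exhausted = False
try:
    pool.acquire()              # non-blocking: fails immediately
except LimitExceeded:
    exhausted = True
pool.release(first)
reused = pool.acquire()         # the released resource is handed out again
```

With block=False the third acquire fails at once; passing block=True with a timeout would wait that long for a release before raising the same error.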
PoolGroup Class

Base class for managing collections of resource pools, automatically creating pools for different resources as needed.

class PoolGroup:
    """Collection of resource pools."""
    
    def __init__(self, limit=None, close_after_fork=True):
        """
        Initialize pool group.
        
        Parameters:
        - limit (int): Default limit for created pools
        - close_after_fork (bool): Whether to close pools after fork
        """
    
    def create(self, resource, limit):
        """
        Create pool for resource (must be implemented by subclasses).
        
        Parameters:
        - resource: Resource to create pool for
        - limit (int): Pool size limit
        
        Returns:
        Pool instance for resource
        """
    
    def __getitem__(self, resource):
        """Get or create pool for resource."""
    
    def __missing__(self, resource):
        """Create new pool when resource not found."""

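The "create on first access" behaviour of a pool group follows from how Python dictionaries call __missing__. The sketch below illustrates the idea with hypothetical classes (LazyPoolGroup, FakePools) and string keys; kombu keys its groups by Connection objects and builds real pools, so treat this only as a model of the lookup logic.

```python
class LazyPoolGroup(dict):
    """Simplified model of a pool group (not kombu's code)."""

    def __init__(self, limit=None):
        super().__init__()
        self.limit = limit

    def create(self, resource, limit):
        raise NotImplementedError("subclasses must implement create()")

    def __missing__(self, resource):
        # dict.__getitem__ calls this only when the key is absent, so each
        # resource gets exactly one pool, built on first access.
        pool = self[resource] = self.create(resource, self.limit)
        return pool


class FakePools(LazyPoolGroup):
    """Hypothetical subclass whose 'pools' are plain dictionaries."""

    def create(self, resource, limit):
        return {"resource": resource, "limit": limit}


group = FakePools(limit=5)
pool_a = group["broker-a"]          # created now
pool_a_again = group["broker-a"]    # same object, no second create() call
pool_b = group["broker-b"]          # a separate pool for a different key
```

Because the created pool is stored under its key, repeated lookups for the same resource always return the same pool object.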
Concrete Pool Group Classes

Specific implementations of PoolGroup for connections and producers.

class Connections(PoolGroup):
    """Collection of connection pools."""
    
    def create(self, connection, limit):
        """Create connection pool for given connection."""

class Producers(PoolGroup):
    """Collection of producer pools."""
    
    def create(self, connection, limit):
        """Create producer pool for given connection."""

Pool Management Functions

Global functions for managing pool limits and state.

def get_limit() -> int:
    """Get current global connection pool limit."""

def set_limit(limit: int, force=False, reset_after=False, ignore_errors=False) -> int:
    """
    Set new global connection pool limit.
    
    Parameters:
    - limit (int): New pool size limit
    - force (bool): Force limit change even if same as current
    - reset_after (bool): Reset pools after setting limit
    - ignore_errors (bool): Ignore errors during pool resizing
    
    Returns:
    The set limit value
    """

def reset(*args, **kwargs):
    """Reset all pools by closing open resources and clearing groups."""

def register_group(group) -> PoolGroup:
    """Register pool group for management (can be used as decorator)."""

Advanced Usage

Custom Producer Pool

from kombu.pools import ProducerPool
from kombu import Connection

# Create custom producer pool
conn_pool = Connection('redis://localhost:6379/0').Pool(limit=10)
producer_pool = ProducerPool(conn_pool, limit=5)

# Use custom pool
with producer_pool.acquire() as producer:
    producer.publish({'custom': 'message'}, routing_key='custom')

Pool Limit Management

from kombu.pools import get_limit, set_limit, reset

# Check current limit
current = get_limit()
print(f"Current pool limit: {current}")

# Set new limit
set_limit(20)

# Reset all pools (useful for testing or cleanup)
reset()

Context Manager Usage

from kombu import Connection
from kombu.pools import connections, producers

connection = Connection('redis://localhost:6379/0')

# Connection pool context manager
with connections[connection].acquire(block=True) as conn:
    # Connection is automatically released when exiting context
    with conn.channel() as channel:
        # Use channel
        pass

# Producer pool context manager
with producers[connection].acquire(block=True) as producer:
    # Producer is automatically released when exiting context
    producer.publish({'msg': 'data'}, routing_key='task')

Error Handling

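Acquiring from a pool raises LimitExceeded when the limit is reached and no resource becomes available. Broker connection failures are typically reported as OperationalError:

from kombu import Connection
from kombu.pools import producers
from kombu.exceptions import LimitExceeded, OperationalError

connection = Connection('redis://localhost:6379/0')

try:
    # timeout only takes effect together with block=True
    with producers[connection].acquire(block=True, timeout=5.0) as producer:
        producer.publish({'data': 'message'}, routing_key='task')
except LimitExceeded:
    print("Pool limit exceeded, no resources available")
except OperationalError as e:
    print(f"Connection error: {e}")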
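
When a pool is briefly exhausted, retrying a non-blocking acquire with a short backoff is one possible alternative to a single long blocking wait. The helper below is a hypothetical sketch, not part of kombu: acquire_with_retry accepts any object with an acquire(block=...) method plus the exception class that signals exhaustion (for kombu pools that would be LimitExceeded), and BusyPool is a stand-in used only to exercise it.

```python
import time


def acquire_with_retry(pool, limit_error, attempts=3, delay=0.1):
    """Try a non-blocking acquire a few times before giving up.

    `pool` is any object with an acquire(block=...) method and `limit_error`
    is the exception it raises when no resource is free.
    """
    for attempt in range(attempts):
        try:
            return pool.acquire(block=False)
        except limit_error:
            if attempt == attempts - 1:
                raise                           # out of attempts: propagate
            time.sleep(delay * (2 ** attempt))  # exponential backoff


class BusyPool:
    """Demo stand-in: exhausted for the first two calls, then succeeds."""

    def __init__(self):
        self.calls = 0

    def acquire(self, block=False, timeout=None):
        self.calls += 1
        if self.calls < 3:
            raise RuntimeError("pool exhausted")
        return "resource"


demo_pool = BusyPool()
result = acquire_with_retry(demo_pool, RuntimeError, attempts=3, delay=0.01)
```

The resource returned by the helper still has to be released by the caller, for example by using it in a with block when it supports the context manager protocol.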

Install with Tessl CLI

npx tessl i tessl/pypi-kombu
