Messaging library for Python that provides an idiomatic high-level interface for AMQP and other message brokers.
—
Resource pooling for connections and producers to optimize performance and manage resources efficiently in high-throughput applications. Connection pools prevent the overhead of establishing connections for each operation, while producer pools maintain ready-to-use producer instances.
from kombu import pools
from kombu.pools import connections, producers, ProducerPool, PoolGroup
from kombu.pools import get_limit, set_limit, resetPre-configured global pools for connections and producers that can be used throughout an application.
connections: Connections # Global connection pool group
producers: Producers # Global producer pool groupUsage Example:
from kombu import pools
# Use global connection pool
with pools.connections['redis://localhost:6379/0'].acquire() as conn:
# Use connection
with conn.channel() as channel:
# Operate on channel
pass
# Use global producer pool
with pools.producers['redis://localhost:6379/0'].acquire() as producer:
producer.publish({'message': 'data'}, routing_key='task')Pool of Producer instances that share connections from a connection pool.
class ProducerPool:
"""Pool of kombu.Producer instances."""
def __init__(self, connections, *args, **kwargs):
"""
Initialize producer pool.
Parameters:
- connections: Connection pool to use for producers
- Producer: Producer class to use (default: kombu.Producer)
- *args, **kwargs: Passed to Resource base class
"""
def acquire(self, block=False, timeout=None):
"""
Acquire producer from pool.
Parameters:
- block (bool): Whether to block if no resource available
- timeout (float): Timeout for blocking acquire
Returns:
Producer instance from pool
"""
def release(self, resource):
"""
Release producer back to pool.
Parameters:
- resource: Producer instance to release
"""
def create_producer(self):
"""Create new producer instance."""
def prepare(self, producer):
"""Prepare producer for use."""
def close_resource(self, resource):
"""Close producer resource."""Base class for managing collections of resource pools, automatically creating pools for different resources as needed.
class PoolGroup:
"""Collection of resource pools."""
def __init__(self, limit=None, close_after_fork=True):
"""
Initialize pool group.
Parameters:
- limit (int): Default limit for created pools
- close_after_fork (bool): Whether to close pools after fork
"""
def create(self, resource, limit):
"""
Create pool for resource (must be implemented by subclasses).
Parameters:
- resource: Resource to create pool for
- limit (int): Pool size limit
Returns:
Pool instance for resource
"""
def __getitem__(self, resource):
"""Get or create pool for resource."""
def __missing__(self, resource):
"""Create new pool when resource not found."""Specific implementations of PoolGroup for connections and producers.
class Connections(PoolGroup):
"""Collection of connection pools."""
def create(self, connection, limit):
"""Create connection pool for given connection."""
class Producers(PoolGroup):
"""Collection of producer pools."""
def create(self, connection, limit):
"""Create producer pool for given connection."""Global functions for managing pool limits and state.
def get_limit() -> int:
"""Get current global connection pool limit."""
def set_limit(limit: int, force=False, reset_after=False, ignore_errors=False) -> int:
"""
Set new global connection pool limit.
Parameters:
- limit (int): New pool size limit
- force (bool): Force limit change even if same as current
- reset_after (bool): Reset pools after setting limit
- ignore_errors (bool): Ignore errors during pool resizing
Returns:
The set limit value
"""
def reset(*args, **kwargs):
"""Reset all pools by closing open resources and clearing groups."""
def register_group(group) -> PoolGroup:
"""Register pool group for management (can be used as decorator)."""from kombu.pools import ProducerPool
from kombu import Connection
# Create custom producer pool
conn_pool = Connection('redis://localhost:6379/0').Pool(limit=10)
producer_pool = ProducerPool(conn_pool, limit=5)
# Use custom pool
with producer_pool.acquire() as producer:
producer.publish({'custom': 'message'}, routing_key='custom')from kombu.pools import get_limit, set_limit, reset
# Check current limit
current = get_limit()
print(f"Current pool limit: {current}")
# Set new limit
set_limit(20)
# Reset all pools (useful for testing or cleanup)
reset()from kombu.pools import connections, producers
# Connection pool context manager
with connections['redis://localhost:6379/0'].acquire() as conn:
# Connection is automatically released when exiting context
with conn.channel() as channel:
# Use channel
pass
# Producer pool context manager
with producers['redis://localhost:6379/0'].acquire() as producer:
# Producer is automatically released when exiting context
producer.publish({'msg': 'data'}, routing_key='task')Pool operations can raise various exceptions:
from kombu.pools import producers
from kombu.exceptions import LimitExceeded, OperationalError
try:
with producers['redis://localhost:6379/0'].acquire(timeout=5.0) as producer:
producer.publish({'data': 'message'}, routing_key='task')
except LimitExceeded:
print("Pool limit exceeded, no resources available")
except OperationalError as e:
print(f"Connection error: {e}")Install with Tessl CLI
npx tessl i tessl/pypi-kombu