CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-asyncpg

An asyncio PostgreSQL driver for high-performance database connectivity with Python async/await syntax

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/connection-pooling.md

Connection Pooling

Advanced connection pooling with configurable pool sizes, automatic connection lifecycle management, load balancing, and transparent query execution through pooled connections.

Capabilities

Pool Creation

Create connection pools with comprehensive configuration options for optimal resource utilization and performance.

def create_pool(
    dsn: str = None,
    *,
    min_size: int = 10,
    max_size: int = 10,
    max_queries: int = 50000,
    max_inactive_connection_lifetime: float = 300.0,
    connect: callable = None,
    setup: callable = None,
    init: callable = None,
    reset: callable = None,
    loop = None,
    connection_class: typing.Type[Connection] = Connection,
    record_class: typing.Type[Record] = Record,
    **connect_kwargs
) -> Pool:
    """
    Create a connection pool.

    The returned Pool has to be initialized before it is used, either by
    awaiting it (``pool = await asyncpg.create_pool(...)``) or by entering
    it with ``async with asyncpg.create_pool(...) as pool:``.

    Parameters:
    dsn: PostgreSQL connection string (postgres://user:pass@host:port/database?option=value)
    min_size: Number of connections the pool is initialized with
    max_size: Maximum number of connections in the pool
    max_queries: Number of queries after which a connection is closed and replaced
    max_inactive_connection_lifetime: Seconds after which an idle connection is closed (0 disables this)
    connect: Coroutine called instead of asyncpg.connect() whenever the pool needs a new connection
    setup: Coroutine called every time a connection is about to be returned from Pool.acquire()
    init: Coroutine called once per connection, when it is created (e.g. to register type codecs)
    reset: Coroutine called before a connection is returned to the pool by Pool.release()
    loop: Event loop to use (None selects the default event loop)
    connection_class: Class used for connections; must be a subclass of Connection
    record_class: Class used for query result records; must be a subclass of Record
    **connect_kwargs: Keyword arguments forwarded to asyncpg.connect()

    Returns:
    Pool instance
    """

Example Usage

# Basic pool
pool = asyncpg.create_pool('postgresql://user:pass@localhost/mydb')

# Advanced pool configuration
pool = asyncpg.create_pool(
    'postgresql://user:pass@localhost/mydb',
    min_size=5,
    max_size=20,
    max_queries=10000,
    max_inactive_connection_lifetime=600.0,
    command_timeout=60.0
)

# Pool with custom setup
async def setup_connection(conn):
    await conn.set_type_codec('json', encoder=json.dumps, decoder=json.loads)
    await conn.execute("SET timezone = 'UTC'")

pool = asyncpg.create_pool(
    dsn,
    setup=setup_connection,
    min_size=2,
    max_size=10
)

await pool  # Initialize the pool

Connection Acquisition

Acquire and release connections from the pool with automatic lifecycle management.

def acquire(self, *, timeout: float = None) -> PoolAcquireContext:
    """
    Acquire a database connection from the pool.

    The returned object can be used as an async context manager
    (``async with pool.acquire() as conn:``) or awaited directly
    (``conn = await pool.acquire()``). In the awaited form the caller is
    responsible for handing the connection back with ``release()``.
    
    Parameters:
    timeout: Maximum time to wait for a connection (keyword-only)
    
    Returns:
    Connection proxy context manager
    """

async def release(self, connection, *, timeout: float = None) -> None:
    """
    Release a database connection back to the pool.

    Needed only for connections obtained with ``await pool.acquire()``;
    the ``async with pool.acquire()`` form releases automatically.
    
    Parameters:
    connection: Connection to release (one previously acquired from this pool)
    timeout: Maximum time to wait for release (keyword-only)
    """

Example Usage

# Context manager (recommended)
async with pool.acquire() as conn:
    result = await conn.fetch("SELECT * FROM users")
    # Connection automatically released

# Manual acquisition/release
conn = await pool.acquire()
try:
    result = await conn.fetch("SELECT * FROM users")
finally:
    # Always hand the connection back, even when the query raised
    await pool.release(conn)

# With timeout
# NOTE: the try block covers the query as well as the acquire, so an
# asyncio.TimeoutError raised while the query runs is reported with the
# same "could not acquire" message.
try:
    async with pool.acquire(timeout=5.0) as conn:
        # "LONG RUNNING QUERY" is a placeholder, not executable SQL
        result = await conn.execute("LONG RUNNING QUERY")
except asyncio.TimeoutError:
    print("Could not acquire connection within timeout")

Pool Query Methods

Execute queries directly through the pool without explicit connection management.

async def execute(self, query: str, *args, timeout: float = None) -> str:
    """Execute SQL command using a pool connection.

    Parameters:
    query: SQL text; positional placeholders ($1, $2, ...) are bound from *args
    *args: Values for the query placeholders
    timeout: Optional time limit for the call (keyword-only)
    """

async def executemany(self, command: str, args, *, timeout: float = None) -> None:
    """Execute SQL command for multiple argument sets using a pool connection.

    Parameters:
    command: SQL text, executed once per argument set
    args: Collection of argument sequences, one per execution (e.g. a list of tuples)
    timeout: Optional time limit for the call (keyword-only)
    """

async def fetch(self, query: str, *args, timeout: float = None, record_class = None) -> list[Record]:
    """Fetch all results using a pool connection.

    Parameters:
    query: SQL text; positional placeholders ($1, $2, ...) are bound from *args
    *args: Values for the query placeholders
    timeout: Optional time limit for the call (keyword-only)
    record_class: Optional record class for the returned rows (presumably overrides the pool-wide record_class; confirm)

    Returns:
    List of Record objects
    """

async def fetchval(self, query: str, *args, column: int = 0, timeout: float = None):
    """Fetch single value using a pool connection.

    Parameters:
    query: SQL text; positional placeholders ($1, $2, ...) are bound from *args
    *args: Values for the query placeholders
    column: Index of the result column to return (defaults to the first column)
    timeout: Optional time limit for the call (keyword-only)
    """

async def fetchrow(self, query: str, *args, timeout: float = None, record_class = None) -> typing.Optional[Record]:
    """Fetch first row using a pool connection.

    Parameters:
    query: SQL text; positional placeholders ($1, $2, ...) are bound from *args
    *args: Values for the query placeholders
    timeout: Optional time limit for the call (keyword-only)
    record_class: Optional record class to use for the returned row

    Returns:
    The first row as a Record, or None when the query returns no rows
    """

async def fetchmany(self, query: str, args, *, timeout: float = None, record_class = None) -> list[Record]:
    """Execute query for multiple argument sets using a pool connection.

    Parameters:
    query: SQL text, executed once per argument set
    args: Collection of argument sequences, one per execution (e.g. a list of tuples)
    timeout: Optional time limit for the call (keyword-only)
    record_class: Optional record class to use for the returned rows

    Returns:
    A single flat list of Record objects collected from all executions
    (not one list per argument set)
    """

Example Usage

# Direct pool queries (connection handled automatically)
# Values after the SQL text are bound to the $1, $2, ... placeholders in order
users = await pool.fetch("SELECT * FROM users WHERE active = $1", True)

# fetchval returns a single value rather than a list of rows
count = await pool.fetchval("SELECT COUNT(*) FROM orders")

await pool.execute(
    "INSERT INTO logs(message, timestamp) VALUES($1, $2)",
    "User logged in", datetime.now()
)

# Batch operations
# Each tuple supplies ($1, $2) for one execution of the INSERT
orders = [(100, 'pending'), (200, 'shipped'), (300, 'delivered')]
await pool.executemany(
    "INSERT INTO orders(amount, status) VALUES($1, $2)",
    orders
)

Pool COPY Operations

High-performance bulk operations using the pool's COPY functionality.

async def copy_from_table(
    self,
    table_name: str,
    *,
    output,
    columns: list = None,
    schema_name: str = None,
    timeout: float = None,
    **kwargs
) -> str:
    """Copy table data to output using a pool connection.

    Parameters:
    table_name: Name of the table to copy data from
    output: Destination for the copied data (accepted types are not shown here; see the asyncpg reference)
    columns: Optional list of column names to copy
    schema_name: Optional schema that qualifies table_name
    timeout: Optional time limit for the operation
    **kwargs: Additional COPY options (presumably forwarded to the connection-level method; confirm)
    """

async def copy_to_table(
    self,
    table_name: str,
    *,
    source,
    columns: list = None,
    schema_name: str = None,
    timeout: float = None,
    **kwargs
) -> str:
    """Copy data from source to table using a pool connection.

    Parameters:
    table_name: Name of the table to copy data into
    source: Origin of the data to load (accepted types are not shown here; see the asyncpg reference)
    columns: Optional list of column names to populate
    schema_name: Optional schema that qualifies table_name
    timeout: Optional time limit for the operation
    **kwargs: Additional COPY options (presumably forwarded to the connection-level method; confirm)
    """

async def copy_records_to_table(
    self,
    table_name: str,
    *,
    records,
    columns: list = None,
    schema_name: str = None,
    timeout: float = None,
    where: str = None
) -> str:
    """Copy records to table using a pool connection.

    Parameters:
    table_name: Name of the table to copy data into
    records: The rows to insert (presumably an iterable of row tuples; confirm against the asyncpg reference)
    columns: Optional list of column names to populate
    schema_name: Optional schema that qualifies table_name
    timeout: Optional time limit for the operation
    where: Optional SQL condition string (presumably filters which records are inserted; confirm)
    """

Pool Management

Control pool lifecycle, monitor status, and manage configuration.

async def close(self) -> None:
    """Attempt to gracefully close all connections in the pool.

    Coroutine: call it as ``await pool.close()``.
    """

def terminate(self) -> None:
    """Terminate all connections in the pool immediately.

    Plain (non-async) method: call it without ``await``.
    """

async def expire_connections(self) -> None:
    """Expire all currently open connections.

    Coroutine: call it as ``await pool.expire_connections()``.
    """

def is_closing(self) -> bool:
    """Return True if the pool is closing or closed.

    Plain (non-async) method, suitable as a guard before issuing queries.
    """

def set_connect_args(self, dsn: str = None, **connect_kwargs) -> None:
    """Replace the connection arguments used for new connections.

    The given dsn and keyword arguments replace the previous ones entirely;
    they are not merged with the arguments passed to create_pool(). Pass the
    dsn again if it should stay in effect. Connections that are already open
    are not affected; call expire_connections() to have them replaced.

    Parameters:
    dsn: PostgreSQL connection string for new connections
    **connect_kwargs: Keyword arguments passed to asyncpg.connect() for new connections
    """

Example Usage

# Graceful shutdown
await pool.close()

# Force shutdown
pool.terminate()

# Expire old connections (useful after schema changes)
await pool.expire_connections()

# Update connection parameters
pool.set_connect_args(
    command_timeout=30.0,
    server_settings={'timezone': 'America/New_York'}
)

Pool Status Monitoring

Monitor pool health, connection usage, and performance metrics.

def get_size(self) -> int:
    """Return the current number of connections in the pool.

    Counts idle and in-use connections together; subtract get_idle_size()
    to get the number currently in use.
    """

def get_min_size(self) -> int:
    """Return the minimum pool size (the min_size the pool was created with)."""

def get_max_size(self) -> int:
    """Return the maximum pool size (the max_size the pool was created with)."""

def get_idle_size(self) -> int:
    """Return the number of idle connections (open but not currently acquired)."""

Example Usage

# Pool status monitoring
print(f"Pool size: {pool.get_size()}")
print(f"Idle connections: {pool.get_idle_size()}")
# In-use connections are the open ones that are not idle
print(f"Active connections: {pool.get_size() - pool.get_idle_size()}")

# Health check
# Saturated: the pool cannot grow any further and nothing is free
if pool.get_idle_size() == 0 and pool.get_size() == pool.get_max_size():
    print("Warning: Pool is at maximum capacity with no idle connections")

# Auto-scaling logic
# The threshold of 2 is an arbitrary example value; this only prints a hint
if pool.get_idle_size() < 2:
    print("Consider increasing pool size")

Pool Configuration Patterns

Common pool configuration patterns for different use cases.

Web Application Pool

# Web application with variable load
pool = asyncpg.create_pool(
    dsn,
    min_size=5,           # Always keep some connections ready
    max_size=50,          # Handle traffic spikes
    max_queries=10000,    # Prevent connection reuse issues
    max_inactive_connection_lifetime=300,  # 5 minutes
    command_timeout=30.0  # Prevent hanging requests
)

Batch Processing Pool

# Batch processing with long-running queries
pool = asyncpg.create_pool(
    dsn,
    min_size=2,           # Minimal overhead
    max_size=10,          # Limited parallelism
    max_queries=1000,     # More connection reuse
    max_inactive_connection_lifetime=1800,  # 30 minutes
    command_timeout=3600.0  # Long-running queries
)

High-Throughput Pool

# High-throughput OLTP system
pool = asyncpg.create_pool(
    dsn,
    min_size=20,          # Keep many connections ready
    max_size=100,         # High concurrency
    max_queries=50000,    # Longer connection lifetime
    max_inactive_connection_lifetime=60,   # Quick recycling
    command_timeout=5.0   # Fast queries only
)

Error Handling

Handle pool-specific errors and connection acquisition failures.

try:
    async with pool.acquire() as conn:
        result = await conn.fetch("SELECT * FROM users")
except asyncio.TimeoutError:
    print("Timeout acquiring connection from pool")
except asyncpg.TooManyConnectionsError:
    print("Pool has reached maximum size")
except asyncpg.PostgresConnectionError:
    print("Connection error in pool")

# Check pool state before operations
if pool.is_closing():
    print("Pool is shutting down")
else:
    result = await pool.fetchval("SELECT 1")

Types

class Pool:
    """A connection pool.

    Returned by create_pool(). Connections are obtained with acquire() and
    handed back with release(); query helpers such as fetch() and execute()
    can also be called on the pool directly.
    """
    
class PoolAcquireContext:
    """Context manager for acquiring pool connections.

    This is the object returned by Pool.acquire(); it is what makes
    ``async with pool.acquire() as conn:`` work.
    """
    
    async def __aenter__(self) -> Connection:
        """Acquire connection from pool and return it as the ``as`` target."""
    
    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Release connection back to pool when the ``async with`` block exits."""

Install with Tessl CLI

npx tessl i tessl/pypi-asyncpg

docs

connection-management.md

connection-pooling.md

copy-operations.md

cursor-operations.md

exception-handling.md

index.md

listeners-notifications.md

prepared-statements.md

query-execution.md

transaction-management.md

type-system.md

tile.json