An asyncio PostgreSQL driver for high-performance database connectivity with Python async/await syntax
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Advanced connection pooling with configurable pool sizes, automatic connection lifecycle management, load balancing, and transparent query execution through pooled connections.
Create connection pools with comprehensive configuration options for optimal resource utilization and performance.
def create_pool(
    dsn: typing.Optional[str] = None,
    *,
    min_size: int = 10,
    max_size: int = 10,
    max_queries: int = 50000,
    max_inactive_connection_lifetime: float = 300.0,
    connect: typing.Optional[typing.Callable] = None,
    setup: typing.Optional[typing.Callable] = None,
    init: typing.Optional[typing.Callable] = None,
    reset: typing.Optional[typing.Callable] = None,
    loop = None,
    connection_class: typing.Type[Connection] = Connection,
    record_class: typing.Type[Record] = Record,
    **connect_kwargs
) -> Pool:
    """
    Create a connection pool.

    The returned Pool must be awaited (or used as an async context
    manager) before connections can be acquired from it.

    Parameters:
        dsn: PostgreSQL connection string.
        min_size: Minimum number of connections held in the pool.
        max_size: Maximum number of connections the pool may hold.
        max_queries: Maximum queries served by a single connection
            before it is replaced.
        max_inactive_connection_lifetime: Maximum idle time, in seconds,
            before an inactive connection is closed.
        connect: Custom connection factory function.
        setup: Function called on each new connection.
        init: Function called on each acquired connection.
        reset: Function called when a connection is released.
        loop: Event loop to use.
        connection_class: Custom connection class (subclass of Connection).
        record_class: Custom record class used for query result rows.
        **connect_kwargs: Additional connection parameters forwarded to
            the underlying connection constructor.

    Returns:
        Pool instance.
    """
# Basic pool
pool = asyncpg.create_pool('postgresql://user:pass@localhost/mydb')
# Advanced pool configuration
pool = asyncpg.create_pool(
'postgresql://user:pass@localhost/mydb',
min_size=5,
max_size=20,
max_queries=10000,
max_inactive_connection_lifetime=600.0,
command_timeout=60.0
)
# Pool with custom setup
async def setup_connection(conn):
await conn.set_type_codec('json', encoder=json.dumps, decoder=json.loads)
await conn.execute("SET timezone = 'UTC'")
pool = asyncpg.create_pool(
dsn,
setup=setup_connection,
min_size=2,
max_size=10
)
await pool  # Initialize the pool

Acquire and release connections from the pool with automatic lifecycle management.
def acquire(self, *, timeout: typing.Optional[float] = None) -> PoolAcquireContext:
    """
    Acquire a database connection from the pool.

    Parameters:
        timeout: Maximum time, in seconds, to wait for a connection.

    Returns:
        A PoolAcquireContext connection-proxy wrapper; it can be awaited
        directly or used as an async context manager, in which case the
        connection is released automatically on exit.
    """
async def release(self, connection, *, timeout: typing.Optional[float] = None) -> None:
    """
    Release a database connection back to the pool.

    Not needed when the connection was obtained via the ``async with
    pool.acquire()`` context manager, which releases automatically.

    Parameters:
        connection: Connection to release (as returned by acquire()).
        timeout: Maximum time, in seconds, to wait for the release.
    """
# Context manager (recommended)
async with pool.acquire() as conn:
result = await conn.fetch("SELECT * FROM users")
# Connection automatically released
# Manual acquisition/release
conn = await pool.acquire()
try:
result = await conn.fetch("SELECT * FROM users")
finally:
await pool.release(conn)
# With timeout
try:
async with pool.acquire(timeout=5.0) as conn:
result = await conn.execute("LONG RUNNING QUERY")
except asyncio.TimeoutError:
print("Could not acquire connection within timeout")

Execute queries directly through the pool without explicit connection management.
async def execute(self, query: str, *args, timeout: typing.Optional[float] = None) -> str:
    """Execute an SQL command on a pooled connection; returns the command status string."""
async def executemany(self, command: str, args, *, timeout: typing.Optional[float] = None) -> None:
    """Execute an SQL command once for each argument sequence in *args*, using a pooled connection."""
async def fetch(self, query: str, *args, timeout: typing.Optional[float] = None, record_class = None) -> list[Record]:
    """Run the query on a pooled connection and return all result rows as a list of Record."""
async def fetchval(self, query: str, *args, column: int = 0, timeout: typing.Optional[float] = None):
    """Run the query on a pooled connection and return a single value from the first row (column selected by *column*)."""
async def fetchrow(self, query: str, *args, timeout: typing.Optional[float] = None, record_class = None) -> typing.Optional[Record]:
    """Run the query on a pooled connection and return the first row, or None if there are no rows."""
async def fetchmany(self, query: str, args, *, timeout: typing.Optional[float] = None, record_class = None) -> list[list[Record]]:
    """Run the query once per argument sequence in *args* on a pooled connection and return the results."""
# Direct pool queries (connection handled automatically)
users = await pool.fetch("SELECT * FROM users WHERE active = $1", True)
count = await pool.fetchval("SELECT COUNT(*) FROM orders")
await pool.execute(
"INSERT INTO logs(message, timestamp) VALUES($1, $2)",
"User logged in", datetime.now()
)
# Batch operations
orders = [(100, 'pending'), (200, 'shipped'), (300, 'delivered')]
await pool.executemany(
"INSERT INTO orders(amount, status) VALUES($1, $2)",
orders
)

High-performance bulk operations using the pool's COPY functionality.
async def copy_from_table(
    self,
    table_name: str,
    *,
    output,
    columns: typing.Optional[list] = None,
    schema_name: typing.Optional[str] = None,
    timeout: typing.Optional[float] = None,
    **kwargs
) -> str:
    """
    COPY table data to *output* using a pooled connection.

    Parameters:
        table_name: Name of the table to copy from.
        output: Destination for the copied data (path or file-like
            object — confirm accepted types against the driver docs).
        columns: Optional list of columns to copy; all columns if None.
        schema_name: Optional schema qualifying *table_name*.
        timeout: Maximum time, in seconds, for the operation.
        **kwargs: Additional COPY format options.

    Returns:
        The COPY command status string.
    """
async def copy_to_table(
    self,
    table_name: str,
    *,
    source,
    columns: typing.Optional[list] = None,
    schema_name: typing.Optional[str] = None,
    timeout: typing.Optional[float] = None,
    **kwargs
) -> str:
    """
    COPY data from *source* into the table using a pooled connection.

    Parameters:
        table_name: Name of the destination table.
        source: Data source to copy from (path or file-like object —
            confirm accepted types against the driver docs).
        columns: Optional list of target columns; all columns if None.
        schema_name: Optional schema qualifying *table_name*.
        timeout: Maximum time, in seconds, for the operation.
        **kwargs: Additional COPY format options.

    Returns:
        The COPY command status string.
    """
async def copy_records_to_table(
    self,
    table_name: str,
    *,
    records,
    columns: typing.Optional[list] = None,
    schema_name: typing.Optional[str] = None,
    timeout: typing.Optional[float] = None,
    where: typing.Optional[str] = None
) -> str:
    """
    COPY an iterable of records into the table using a pooled connection.

    Parameters:
        table_name: Name of the destination table.
        records: Iterable of row tuples to insert.
        columns: Optional list of target columns; all columns if None.
        schema_name: Optional schema qualifying *table_name*.
        timeout: Maximum time, in seconds, for the operation.
        where: Optional SQL condition filtering which records are copied.

    Returns:
        The COPY command status string.
    """
# Control pool lifecycle, monitor status, and manage configuration.
async def close(self) -> None:
    """Gracefully close the pool, waiting for all connections to be released and closed."""
def terminate(self) -> None:
    """Terminate all connections in the pool immediately, without waiting for release."""
async def expire_connections(self) -> None:
    """Expire all currently open connections (useful after schema or connect-argument changes)."""
def is_closing(self) -> bool:
    """Return True if the pool is closing or already closed."""
def set_connect_args(self, dsn: typing.Optional[str] = None, **connect_kwargs) -> None:
    """Update the DSN and/or keyword arguments used when establishing new connections."""
# Graceful shutdown
await pool.close()
# Force shutdown
pool.terminate()
# Expire old connections (useful after schema changes)
await pool.expire_connections()
# Update connection parameters
pool.set_connect_args(
command_timeout=30.0,
server_settings={'timezone': 'America/New_York'}
)

Monitor pool health, connection usage, and performance metrics.
def get_size(self) -> int:
    """Return the current number of connections (idle plus in-use) in the pool."""
def get_min_size(self) -> int:
    """Return the configured minimum pool size (see create_pool's min_size)."""
def get_max_size(self) -> int:
    """Return the configured maximum pool size (see create_pool's max_size)."""
def get_idle_size(self) -> int:
    """Return the number of currently idle (not acquired) connections in the pool."""
# Pool status monitoring
print(f"Pool size: {pool.get_size()}")
print(f"Idle connections: {pool.get_idle_size()}")
print(f"Active connections: {pool.get_size() - pool.get_idle_size()}")
# Health check
if pool.get_idle_size() == 0 and pool.get_size() == pool.get_max_size():
print("Warning: Pool is at maximum capacity with no idle connections")
# Auto-scaling logic
if pool.get_idle_size() < 2:
print("Consider increasing pool size")

Common pool configuration patterns for different use cases.
# Web application with variable load
pool = asyncpg.create_pool(
dsn,
min_size=5, # Always keep some connections ready
max_size=50, # Handle traffic spikes
max_queries=10000, # Prevent connection reuse issues
max_inactive_connection_lifetime=300, # 5 minutes
command_timeout=30.0 # Prevent hanging requests
)

# Batch processing with long-running queries
pool = asyncpg.create_pool(
dsn,
min_size=2, # Minimal overhead
max_size=10, # Limited parallelism
max_queries=1000, # More connection reuse
max_inactive_connection_lifetime=1800, # 30 minutes
command_timeout=3600.0 # Long-running queries
)

# High-throughput OLTP system
pool = asyncpg.create_pool(
dsn,
min_size=20, # Keep many connections ready
max_size=100, # High concurrency
max_queries=50000, # Longer connection lifetime
max_inactive_connection_lifetime=60, # Quick recycling
command_timeout=5.0 # Fast queries only
)

Handle pool-specific errors and connection acquisition failures.
try:
async with pool.acquire() as conn:
result = await conn.fetch("SELECT * FROM users")
except asyncio.TimeoutError:
print("Timeout acquiring connection from pool")
except asyncpg.TooManyConnectionsError:
print("Pool has reached maximum size")
except asyncpg.PostgresConnectionError:
print("Connection error in pool")
# Check pool state before operations
if pool.is_closing():
print("Pool is shutting down")
else:
result = await pool.fetchval("SELECT 1")

class Pool:
    """A connection pool."""
class PoolAcquireContext:
    """Awaitable async-context-manager returned by Pool.acquire()."""

    async def __aenter__(self) -> Connection:
        """Acquire a connection from the pool and return it."""

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Release the connection back to the pool, even if an exception occurred."""