PostgreSQL patterns for Python with psycopg and asyncpg — connection pooling,
99
99%
Does it follow best practices?
Impact
99%
1.15xAverage score across 5 eval scenarios
Passed
No known issues
Connection pooling, parameterized queries, transactions, bulk operations, async patterns, and production configuration for PostgreSQL in Python. Every section shows WRONG vs RIGHT code. Always apply these patterns when writing PostgreSQL code in Python.
A connection pool MUST be used in any application that makes more than one database call. Never create connections per-request.
import psycopg
def get_orders():
conn = psycopg.connect("postgresql://localhost:5432/myapp") # New connection every call!
rows = conn.execute("SELECT * FROM orders").fetchall()
conn.close()
return rows

# app/db.py
import psycopg_pool
import os
import atexit
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://localhost:5432/myapp")
pool = psycopg_pool.ConnectionPool(
DATABASE_URL,
min_size=2,
max_size=10,
timeout=5.0, # Wait 5s for connection, then raise PoolTimeout
max_idle=300.0, # Close idle connections after 5 min
max_lifetime=3600.0, # Recycle connections after 1 hour (handles server-side timeouts)
)
def get_conn():
"""Use as: with get_conn() as conn: ..."""
return pool.connection()
# Always close pool on shutdown -- prevents leaked connections
atexit.register(pool.close)

conn = pool.getconn()  # Checked out but never returned!
rows = conn.execute("SELECT 1").fetchall()
# Missing: pool.putconn(conn) -- connection leaked

with get_conn() as conn:
rows = conn.execute(
"SELECT * FROM orders WHERE status = %s ORDER BY created_at DESC",
("pending",),
).fetchall()
# Connection automatically returned to pool on exit

import asyncpg
# Global pool created at import time -- fails if event loop isn't running
pool = asyncpg.create_pool("postgresql://localhost/myapp")  # This is a coroutine, not a pool!

# app/db.py
import asyncpg
import os
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://localhost:5432/myapp")
pool: asyncpg.Pool | None = None
async def init_pool():
global pool
pool = await asyncpg.create_pool(
DATABASE_URL,
min_size=2,
max_size=10,
command_timeout=30.0, # Per-query timeout
max_inactive_connection_lifetime=300.0, # Close idle after 5 min
)
async def close_pool():
if pool:
await pool.close()
# In FastAPI/Starlette:
# app.add_event_handler("startup", init_pool)
# app.add_event_handler("shutdown", close_pool)

conn = await pool.acquire()
row = await conn.fetchrow("SELECT * FROM users WHERE id = $1", user_id)
# If an exception occurs here, connection is never released!

async def get_user(user_id: int) -> dict | None:
async with pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT * FROM users WHERE id = $1", user_id
)
return dict(row) if row else None
# Connection automatically released back to pool

Never interpolate user input into SQL strings. The placeholder syntax differs between drivers.
# psycopg -- NEVER do this
conn.execute(f"SELECT * FROM users WHERE email = '{email}'")
# asyncpg -- NEVER do this
await conn.fetch(f"SELECT * FROM users WHERE name = '{name}'")

# psycopg 3: uses %s placeholders (positional), always pass tuple
conn.execute("SELECT * FROM users WHERE email = %s", (email,))
conn.execute(
"SELECT * FROM orders WHERE status = %s AND total_cents > %s",
(status, min_total),
)
# asyncpg: uses $1, $2, $N placeholders (numbered), pass as separate args
await conn.fetchrow("SELECT * FROM users WHERE email = $1", email)
await conn.fetch(
"SELECT * FROM orders WHERE status = $1 AND total_cents > $2",
status, min_total,
)

# asyncpg does NOT support %s -- this will error
await conn.fetch("SELECT * FROM users WHERE id = %s", user_id)
# psycopg does NOT support $1 -- this will error
conn.execute("SELECT * FROM users WHERE id = $1", (user_id,))

# psycopg -- if second INSERT fails, first INSERT is committed!
with get_conn() as conn:
conn.execute("INSERT INTO orders (...) VALUES (%s, %s)", (name, total))
conn.execute("INSERT INTO order_items (...) VALUES (%s, %s)", (order_id, item_id))
# NOTE: this failure mode applies when the connection runs in autocommit mode --
# each statement then commits immediately, so a failed second INSERT cannot roll
# back the first. Use an explicit transaction to make the pair atomic.

# psycopg 3: transaction context manager
with get_conn() as conn:
with conn.transaction():
cur = conn.execute(
"INSERT INTO orders (customer_name, total_cents) VALUES (%s, %s) RETURNING id",
(name, total),
)
order_id = cur.fetchone()[0]
conn.execute(
"INSERT INTO order_items (order_id, product_id, quantity) VALUES (%s, %s, %s)",
(order_id, product_id, quantity),
)
# Both INSERTs commit together or roll back together
# asyncpg: async transaction context manager
async def create_order(name: str, items: list[dict]) -> int:
async with pool.acquire() as conn:
async with conn.transaction():
order_id = await conn.fetchval(
"INSERT INTO orders (customer_name, total_cents) VALUES ($1, $2) RETURNING id",
name, total_cents,
)
await conn.executemany(
"INSERT INTO order_items (order_id, product_id, quantity, price_cents) "
"VALUES ($1, $2, $3, $4)",
[(order_id, i["product_id"], i["quantity"], i["price_cents"]) for i in items],
)
return order_id

# psycopg 3: nested transaction = savepoint
with get_conn() as conn:
with conn.transaction():
conn.execute("INSERT INTO audit_log (...) VALUES (%s)", (event,))
try:
with conn.transaction(): # Creates SAVEPOINT
conn.execute("INSERT INTO notifications (...) VALUES (%s)", (msg,))
except Exception:
pass  # Savepoint rolled back, audit_log INSERT still committed

For inserting large datasets (hundreds+ rows), COPY is 5-10x faster than INSERT. Always use COPY for bulk loading.
for row in large_dataset:
conn.execute(
"INSERT INTO events (name, timestamp, payload) VALUES (%s, %s, %s)",
(row["name"], row["timestamp"], row["payload"]),
)
# Very slow for large datasets -- one round trip per row

# psycopg 3: COPY with write_row
with get_conn() as conn:
with conn.transaction():
with conn.cursor().copy(
"COPY events (name, timestamp, payload) FROM STDIN"
) as copy:
for row in large_dataset:
copy.write_row((row["name"], row["timestamp"], row["payload"]))

# asyncpg: copy_records_to_table
async def bulk_insert_events(events: list[tuple]):
async with pool.acquire() as conn:
await conn.copy_records_to_table(
"events",
columns=["name", "timestamp", "payload"],
records=events,
)

# psycopg 3: executemany for multi-row INSERT
with get_conn() as conn:
with conn.transaction():
conn.executemany(
"INSERT INTO users (name, email) VALUES (%s, %s)",
[(u["name"], u["email"]) for u in users_batch],
)
# asyncpg: executemany
async with pool.acquire() as conn:
await conn.executemany(
"INSERT INTO users (name, email) VALUES ($1, $2)",
[(u["name"], u["email"]) for u in users_batch],
)

When querying rows that may not fit in memory, use server-side cursors to stream results.
rows = conn.execute("SELECT * FROM events").fetchall()  # May OOM on millions of rows

with get_conn() as conn:
with conn.cursor(name="events_cursor") as cur:
cur.execute("SELECT * FROM events WHERE created_at > %s", (cutoff_date,))
while batch := cur.fetchmany(1000):
process_batch(batch)

async with pool.acquire() as conn:
async with conn.transaction():
async for record in conn.cursor(
"SELECT * FROM events WHERE created_at > $1", cutoff_date
):
await process_record(record)

import psycopg.rows
with get_conn() as conn:
conn.row_factory = psycopg.rows.dict_row
orders = conn.execute("SELECT * FROM orders").fetchall()
# [{"id": 1, "customer_name": "Alice", ...}, ...]
# Or set per-cursor for specific queries:
with get_conn() as conn:
cur = conn.cursor(row_factory=psycopg.rows.dict_row)
cur.execute("SELECT * FROM orders")
orders = cur.fetchall()

row = await conn.fetchrow("SELECT * FROM orders WHERE id = $1", order_id)
row["customer_name"] # Access by column name
dict(row)  # Convert to plain dict

# psycopg 3: register custom type adapters
import psycopg.types.json
# JSON columns are auto-adapted. For custom types:
from psycopg.adapt import Loader, Dumper
# asyncpg: set_type_codec for custom types
async with pool.acquire() as conn:
await conn.set_type_codec(
"jsonb",
encoder=json.dumps,
decoder=json.loads,
schema="pg_catalog",
)

Always use SSL in production. Never disable SSL verification in production.
pool = psycopg_pool.ConnectionPool("postgresql://prod-server/myapp")  # No SSL!

import ssl
# Verify server certificate (production)
ssl_ctx = ssl.create_default_context(cafile="/path/to/ca-cert.pem")
pool = psycopg_pool.ConnectionPool(
"postgresql://prod-server/myapp",
kwargs={"sslmode": "verify-full", "sslrootcert": "/path/to/ca-cert.pem"},
min_size=2,
max_size=10,
)
# Or via connection string parameter
DATABASE_URL = "postgresql://user:pass@host/db?sslmode=require"

import ssl
ssl_ctx = ssl.create_default_context(cafile="/path/to/ca-cert.pem")
pool = await asyncpg.create_pool(
DATABASE_URL,
ssl=ssl_ctx,
min_size=2,
max_size=10,
)

# psycopg_pool: configure health checks
pool = psycopg_pool.ConnectionPool(
DATABASE_URL,
min_size=2,
max_size=10,
timeout=5.0,
max_idle=300.0,
max_lifetime=3600.0, # Recycle connections hourly
check=psycopg_pool.ConnectionPool.check_connection, # Periodic health checks
)

# FastAPI
from contextlib import asynccontextmanager
from fastapi import FastAPI
@asynccontextmanager
async def lifespan(app: FastAPI):
await init_pool()
yield
await close_pool()
app = FastAPI(lifespan=lifespan)
# Flask
import atexit
atexit.register(pool.close)

Checklist:
- DATABASE_URL from environment variable, never hardcoded credentials
- %s for psycopg, $1 for asyncpg -- NEVER f-strings or string concatenation
- Acquire connections with `with get_conn()` or `async with pool.acquire()`
- Use explicit transactions (`conn.transaction()`) for multi-statement writes

evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
skills
postgresql-python-best-practices