tessl-labs/postgresql-python-best-practices

Quality: 99% -- "Does it follow best practices?"
Impact: 1.15x (average score across 5 eval scenarios)
Security (by Snyk): Passed -- no known issues
SKILL.md -- skills/postgresql-python-best-practices/

name: postgresql-python-best-practices
description: PostgreSQL patterns for Python with psycopg 3 and asyncpg -- connection pooling, parameterized queries, transactions, bulk inserts, async context managers, server-side cursors, SSL config, and connection cleanup. Always apply when building or reviewing Python apps with PostgreSQL using raw database drivers (not an ORM), or when debugging connection pool or query issues.
keywords: postgresql python, psycopg, asyncpg, psycopg3, psycopg_pool, postgres python, connection pool, parameterized queries, postgres transactions, async postgres, python database, postgres driver, bulk insert, COPY, server-side cursor, ssl postgres, notify listen, asyncpg pool
license: MIT

PostgreSQL + Python (psycopg 3 / asyncpg)

Connection pooling, parameterized queries, transactions, bulk operations, async patterns, and production configuration for PostgreSQL in Python. Every section shows WRONG vs RIGHT code. Always apply these patterns when writing PostgreSQL code in Python.


1. Connection Pool (psycopg 3)

A connection pool MUST be used in any application that makes more than one database call. Never create connections per-request.

WRONG -- new connection per request (exhausts server connections)

import psycopg

def get_orders():
    conn = psycopg.connect("postgresql://localhost:5432/myapp")  # New connection every call!
    rows = conn.execute("SELECT * FROM orders").fetchall()
    conn.close()
    return rows

RIGHT -- connection pool with min/max size, timeout, and cleanup

# app/db.py
import psycopg_pool
import os
import atexit

DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://localhost:5432/myapp")

pool = psycopg_pool.ConnectionPool(
    DATABASE_URL,
    min_size=2,
    max_size=10,
    timeout=5.0,         # Wait 5s for connection, then raise PoolTimeout
    max_idle=300.0,      # Close idle connections after 5 min
    max_lifetime=3600.0, # Recycle connections after 1 hour (handles server-side timeouts)
)

def get_conn():
    """Use as: with get_conn() as conn: ..."""
    return pool.connection()

# Always close pool on shutdown -- prevents leaked connections
atexit.register(pool.close)

WRONG -- forgetting to return connection to pool

conn = pool.getconn()  # Checked out but never returned!
rows = conn.execute("SELECT 1").fetchall()
# Missing: pool.putconn(conn) -- connection leaked

RIGHT -- context manager always returns connection to pool

with get_conn() as conn:
    rows = conn.execute(
        "SELECT * FROM orders WHERE status = %s ORDER BY created_at DESC",
        ("pending",),
    ).fetchall()
# Connection automatically returned to pool on exit

2. Connection Pool (asyncpg)

WRONG -- creating pool without proper lifecycle management

import asyncpg

# Global pool created at import time -- fails if event loop isn't running
pool = asyncpg.create_pool("postgresql://localhost/myapp")  # This is a coroutine, not a pool!

RIGHT -- async pool with lifecycle hooks

# app/db.py
import asyncpg
import os

DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://localhost:5432/myapp")

pool: asyncpg.Pool | None = None

async def init_pool():
    global pool
    pool = await asyncpg.create_pool(
        DATABASE_URL,
        min_size=2,
        max_size=10,
        command_timeout=30.0,     # Per-query timeout
        max_inactive_connection_lifetime=300.0,  # Close idle after 5 min
    )

async def close_pool():
    if pool:
        await pool.close()

# In FastAPI/Starlette:
# app.add_event_handler("startup", init_pool)
# app.add_event_handler("shutdown", close_pool)

WRONG -- acquiring connection without context manager

conn = await pool.acquire()
row = await conn.fetchrow("SELECT * FROM users WHERE id = $1", user_id)
# If an exception occurs here, connection is never released!

RIGHT -- async context manager for connection acquisition

async def get_user(user_id: int) -> dict | None:
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            "SELECT * FROM users WHERE id = $1", user_id
        )
        return dict(row) if row else None
# Connection automatically released back to pool

3. Parameterized Queries

Never interpolate user input into SQL strings. The placeholder syntax differs between drivers.

WRONG -- string interpolation (SQL injection vulnerability)

# psycopg -- NEVER do this
conn.execute(f"SELECT * FROM users WHERE email = '{email}'")

# asyncpg -- NEVER do this
await conn.fetch(f"SELECT * FROM users WHERE name = '{name}'")

RIGHT -- parameterized queries (safe from SQL injection)

# psycopg 3: uses %s placeholders (positional), always pass tuple
conn.execute("SELECT * FROM users WHERE email = %s", (email,))
conn.execute(
    "SELECT * FROM orders WHERE status = %s AND total_cents > %s",
    (status, min_total),
)

# asyncpg: uses $1, $2, $N placeholders (numbered), pass as separate args
await conn.fetchrow("SELECT * FROM users WHERE email = $1", email)
await conn.fetch(
    "SELECT * FROM orders WHERE status = $1 AND total_cents > $2",
    status, min_total,
)

WRONG -- using psycopg %s syntax with asyncpg or vice versa

# asyncpg does NOT support %s -- this will error
await conn.fetch("SELECT * FROM users WHERE id = %s", user_id)

# psycopg does NOT support $1 -- this will error
conn.execute("SELECT * FROM users WHERE id = $1", (user_id,))
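Both placeholder styles are purely positional, so porting a query between the two drivers is mechanical. A toy stdlib-only helper (not part of either driver) that rewrites %s placeholders as $1..$N illustrates the mapping:

```python
import re

def to_dollar_placeholders(query: str) -> str:
    """Rewrite psycopg-style %s placeholders as asyncpg-style $1..$N.

    Illustration only: this does not parse SQL, so it would also rewrite
    a %s inside a string literal. Don't run it on untrusted queries.
    """
    n = 0

    def number(_match: re.Match) -> str:
        nonlocal n
        n += 1
        return f"${n}"

    return re.sub(r"%s", number, query)

# to_dollar_placeholders("SELECT * FROM orders WHERE status = %s AND total_cents > %s")
# -> "SELECT * FROM orders WHERE status = $1 AND total_cents > $2"
```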

4. Transactions

WRONG -- multiple writes without transaction (partial failure leaves inconsistent data)

# psycopg -- with autocommit enabled, each statement commits immediately,
# so if the second INSERT fails, the first is already committed!
with get_conn() as conn:
    conn.autocommit = True
    conn.execute("INSERT INTO orders (...) VALUES (%s, %s)", (name, total))
    conn.execute("INSERT INTO order_items (...) VALUES (%s, %s)", (order_id, item_id))
    # Even in psycopg 3's default (non-autocommit) mode, relying on the
    # implicit transaction is fragile -- make the boundary explicit.

RIGHT -- explicit transaction block for multi-statement writes

# psycopg 3: transaction context manager
with get_conn() as conn:
    with conn.transaction():
        cur = conn.execute(
            "INSERT INTO orders (customer_name, total_cents) VALUES (%s, %s) RETURNING id",
            (name, total),
        )
        order_id = cur.fetchone()[0]
        conn.execute(
            "INSERT INTO order_items (order_id, product_id, quantity) VALUES (%s, %s, %s)",
            (order_id, product_id, quantity),
        )
# Both INSERTs commit together or roll back together

# asyncpg: async transaction context manager
async def create_order(name: str, items: list[dict]) -> int:
    total_cents = sum(i["price_cents"] * i["quantity"] for i in items)
    async with pool.acquire() as conn:
        async with conn.transaction():
            order_id = await conn.fetchval(
                "INSERT INTO orders (customer_name, total_cents) VALUES ($1, $2) RETURNING id",
                name, total_cents,
            )
            await conn.executemany(
                "INSERT INTO order_items (order_id, product_id, quantity, price_cents) "
                "VALUES ($1, $2, $3, $4)",
                [(order_id, i["product_id"], i["quantity"], i["price_cents"]) for i in items],
            )
            return order_id

Savepoints for partial rollback

# psycopg 3: nested transaction = savepoint
with get_conn() as conn:
    with conn.transaction():
        conn.execute("INSERT INTO audit_log (...) VALUES (%s)", (event,))
        try:
            with conn.transaction():  # Creates SAVEPOINT
                conn.execute("INSERT INTO notifications (...) VALUES (%s)", (msg,))
        except Exception:
            pass  # Savepoint rolled back; the audit_log INSERT still commits with the outer transaction

5. Bulk Inserts with COPY

For inserting large datasets (hundreds+ rows), COPY is 5-10x faster than INSERT. Always use COPY for bulk loading.

WRONG -- inserting rows one at a time in a loop

for row in large_dataset:
    conn.execute(
        "INSERT INTO events (name, timestamp, payload) VALUES (%s, %s, %s)",
        (row["name"], row["timestamp"], row["payload"]),
    )
# Very slow for large datasets -- one round trip per row

RIGHT -- psycopg 3 COPY for bulk insert

# psycopg 3: COPY with write_row
with get_conn() as conn:
    with conn.transaction():
        with conn.cursor().copy(
            "COPY events (name, timestamp, payload) FROM STDIN"
        ) as copy:
            for row in large_dataset:
                copy.write_row((row["name"], row["timestamp"], row["payload"]))

RIGHT -- asyncpg copy_records_to_table for bulk insert

# asyncpg: copy_records_to_table
async def bulk_insert_events(events: list[tuple]):
    async with pool.acquire() as conn:
        await conn.copy_records_to_table(
            "events",
            columns=["name", "timestamp", "payload"],
            records=events,
        )

RIGHT -- executemany for moderate batch sizes (10-500 rows)

# psycopg 3: executemany
with get_conn() as conn:
    with conn.transaction():
        conn.executemany(
            "INSERT INTO users (name, email) VALUES (%s, %s)",
            [(u["name"], u["email"]) for u in users_batch],
        )

# asyncpg: executemany
async with pool.acquire() as conn:
    await conn.executemany(
        "INSERT INTO users (name, email) VALUES ($1, $2)",
        [(u["name"], u["email"]) for u in users_batch],
    )
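executemany still submits the whole input as one batch; for very large inputs you may want to cap batch size. A generic chunking helper (a stdlib sketch, not part of either driver) keeps each executemany call bounded:

```python
from itertools import islice
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")

def chunked(rows: Iterable[T], size: int = 500) -> Iterator[list[T]]:
    """Yield successive lists of at most `size` items from `rows`."""
    it = iter(rows)
    while batch := list(islice(it, size)):
        yield batch

# for batch in chunked(user_rows, 500):
#     conn.executemany("INSERT INTO users (name, email) VALUES (%s, %s)", batch)
```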

6. Server-Side Cursors for Large Result Sets

When querying rows that may not fit in memory, use server-side cursors to stream results.

WRONG -- loading entire large table into memory

rows = conn.execute("SELECT * FROM events").fetchall()  # May OOM on millions of rows

RIGHT -- psycopg 3 server-side cursor with name parameter

with get_conn() as conn:
    with conn.cursor(name="events_cursor") as cur:
        cur.execute("SELECT * FROM events WHERE created_at > %s", (cutoff_date,))
        while batch := cur.fetchmany(1000):
            process_batch(batch)

RIGHT -- asyncpg cursor with async iteration

async with pool.acquire() as conn:
    async with conn.transaction():
        async for record in conn.cursor(
            "SELECT * FROM events WHERE created_at > $1", cutoff_date
        ):
            await process_record(record)

7. Row Factories and Type Adaptation

RIGHT -- psycopg 3 dict rows (avoid tuple indexing)

import psycopg.rows

with get_conn() as conn:
    conn.row_factory = psycopg.rows.dict_row
    orders = conn.execute("SELECT * FROM orders").fetchall()
    # [{"id": 1, "customer_name": "Alice", ...}, ...]
    # Note: row_factory set here persists after the connection is returned
    # to the pool -- for pooled code, prefer the per-cursor form.

# Or set per-cursor for specific queries:
with get_conn() as conn:
    cur = conn.cursor(row_factory=psycopg.rows.dict_row)
    cur.execute("SELECT * FROM orders")
    orders = cur.fetchall()

RIGHT -- asyncpg rows are already Record objects (dict-like)

row = await conn.fetchrow("SELECT * FROM orders WHERE id = $1", order_id)
row["customer_name"]  # Access by column name
dict(row)             # Convert to plain dict

Type adaptation for custom types

# psycopg 3: JSON/JSONB columns are adapted out of the box (psycopg.types.json).
# For a custom Python type, subclass Dumper and register it
# (sketch -- Point is a hypothetical type):
from psycopg.adapt import Dumper

class PointDumper(Dumper):
    def dump(self, obj):
        return f"({obj.x},{obj.y})".encode()  # Postgres point text format

# conn.adapters.register_dumper(Point, PointDumper)

# asyncpg: set_type_codec for custom types (decode jsonb straight to dicts)
import json

async with pool.acquire() as conn:
    await conn.set_type_codec(
        "jsonb",
        encoder=json.dumps,
        decoder=json.loads,
        schema="pg_catalog",
    )

8. SSL/TLS Configuration

Always use SSL in production. Never disable SSL verification in production.

WRONG -- connecting without SSL in production

pool = psycopg_pool.ConnectionPool("postgresql://prod-server/myapp")  # No SSL!

RIGHT -- SSL for psycopg 3

# Verify the server certificate (production)
pool = psycopg_pool.ConnectionPool(
    "postgresql://prod-server/myapp",
    kwargs={"sslmode": "verify-full", "sslrootcert": "/path/to/ca-cert.pem"},
    min_size=2,
    max_size=10,
)

# Or via connection string parameter
DATABASE_URL = "postgresql://user:pass@host/db?sslmode=require"

RIGHT -- SSL for asyncpg

import ssl

ssl_ctx = ssl.create_default_context(cafile="/path/to/ca-cert.pem")
pool = await asyncpg.create_pool(
    DATABASE_URL,
    ssl=ssl_ctx,
    min_size=2,
    max_size=10,
)

9. Connection Health and Cleanup

RIGHT -- connection health checks in pool

# psycopg_pool: configure health checks
pool = psycopg_pool.ConnectionPool(
    DATABASE_URL,
    min_size=2,
    max_size=10,
    timeout=5.0,
    max_idle=300.0,
    max_lifetime=3600.0,     # Recycle connections hourly
    check=psycopg_pool.ConnectionPool.check_connection,  # Periodic health checks
)

RIGHT -- proper shutdown in web frameworks

# FastAPI
from contextlib import asynccontextmanager
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    await init_pool()
    yield
    await close_pool()

app = FastAPI(lifespan=lifespan)

# Flask
import atexit
atexit.register(pool.close)

Checklist -- always apply these rules

  • Connection pool (psycopg_pool or asyncpg.create_pool) with min_size, max_size, and timeout -- NEVER per-request connections
  • DATABASE_URL from environment variable, never hardcoded credentials
  • All queries parameterized: %s for psycopg, $1 for asyncpg -- NEVER f-strings or string concatenation
  • Context managers for ALL connections: with get_conn() or async with pool.acquire()
  • Explicit transactions (conn.transaction()) for multi-statement writes
  • COPY or executemany for bulk inserts (100+ rows) -- never single-row INSERT in a loop
  • Server-side cursors for large result sets that may not fit in memory
  • Pool closed on shutdown (atexit, lifespan handler, or app lifecycle hook)
  • max_lifetime set on pool to recycle connections (handles server-side timeouts)
  • Dict row factory (psycopg) for readable results -- avoid tuple indexing
  • SSL/sslmode configured for production connections
  • Migrations in version-controlled files (Alembic or yoyo-migrations)
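Several checklist items are properties of the DATABASE_URL itself and can be linted at startup. A minimal stdlib sketch (check_database_url is a hypothetical helper, not a library function):

```python
from urllib.parse import urlparse, parse_qs

SECURE_SSLMODES = {"require", "verify-ca", "verify-full"}

def check_database_url(url: str, production: bool = True) -> list[str]:
    """Return a list of problems found in a PostgreSQL connection URL."""
    problems: list[str] = []
    parts = urlparse(url)
    if parts.scheme not in {"postgresql", "postgres"}:
        problems.append(f"unexpected scheme {parts.scheme!r}")
    sslmode = parse_qs(parts.query).get("sslmode", ["disable"])[0]
    if production and sslmode not in SECURE_SSLMODES:
        problems.append("production URLs should set sslmode=require or stricter")
    return problems

# check_database_url("postgresql://app@db.internal/myapp?sslmode=verify-full") -> []
```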

Verifiers

  • pg-pool-config -- Connection pool setup with proper sizing, timeouts, and lifecycle management
  • pg-query-safety -- Parameterized queries, transactions, and bulk operations
  • pg-async-patterns -- Async connection pool, context managers, and cursor patterns
