An asyncio PostgreSQL driver for high-performance database connectivity with Python async/await syntax
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Comprehensive transaction support including transaction contexts, savepoints, isolation levels, read-only transactions, and nested transaction management with full PostgreSQL transaction semantics.
Create and manage database transactions with automatic rollback on exceptions and explicit commit control.
def transaction(
    self,
    *,
    isolation: str | None = None,
    readonly: bool = False,
    deferrable: bool = False
) -> Transaction:
    """
    Create a Transaction context manager.

    Parameters:
        isolation: Transaction isolation level ('read_uncommitted',
            'read_committed', 'repeatable_read', 'serializable');
            None leaves the level unspecified.
        readonly: Make transaction read-only
        deferrable: Allow transaction to be deferred (only for serializable read-only)
    Returns:
        Transaction context manager
    """
def is_in_transaction(self) -> bool:
    """
    Return True if connection is currently inside a transaction.

    Returns:
        Boolean indicating transaction state
    """

# Basic transaction
async with conn.transaction():
await conn.execute("INSERT INTO users(name) VALUES($1)", "Alice")
await conn.execute("INSERT INTO orders(user_id, amount) VALUES($1, $2)", user_id, 100)
# Automatic commit on success, rollback on exception
# Transaction with isolation level
async with conn.transaction(isolation='serializable'):
# Strict consistency for critical operations
balance = await conn.fetchval("SELECT balance FROM accounts WHERE id = $1", account_id)
if balance >= amount:
await conn.execute("UPDATE accounts SET balance = balance - $1 WHERE id = $2", amount, account_id)
await conn.execute("INSERT INTO transactions(account_id, amount) VALUES($1, $2)", account_id, -amount)
# Read-only transaction for reporting
async with conn.transaction(readonly=True):
# Multiple consistent reads
user_count = await conn.fetchval("SELECT COUNT(*) FROM users")
order_count = await conn.fetchval("SELECT COUNT(*) FROM orders")
    revenue = await conn.fetchval("SELECT SUM(amount) FROM orders WHERE status = 'completed'")

Explicit transaction control with manual commit and rollback operations.
class Transaction:
    """Database transaction context manager."""

    async def start(self) -> None:
        """Start the transaction explicitly."""

    async def commit(self) -> None:
        """Commit the transaction."""

    async def rollback(self) -> None:
        """Rollback the transaction."""

# Manual transaction control
tx = conn.transaction()
await tx.start()
try:
await conn.execute("INSERT INTO users(name) VALUES($1)", "Bob")
user_id = await conn.fetchval("SELECT lastval()")
await conn.execute("INSERT INTO profiles(user_id, email) VALUES($1, $2)", user_id, "bob@example.com")
await tx.commit()
print("Transaction committed successfully")
except Exception as e:
await tx.rollback()
    print(f"Transaction rolled back: {e}")

Create nested transactions using PostgreSQL savepoints for complex transaction logic.
async def savepoint(self, name: str | None = None) -> Transaction:
    """
    Create a savepoint within the current transaction.

    Parameters:
        name: Optional savepoint name
    Returns:
        Transaction context manager for the savepoint
    """

async def rollback_to(self, savepoint_name: str) -> None:
    """
    Rollback to a specific savepoint.

    Parameters:
        savepoint_name: Name of the savepoint to rollback to
    """

async with conn.transaction():
# Insert primary record
await conn.execute("INSERT INTO orders(customer_id, total) VALUES($1, $2)", customer_id, total)
order_id = await conn.fetchval("SELECT lastval()")
# Try to process each item with individual error handling
for item in order_items:
async with conn.transaction(): # Nested savepoint
try:
await conn.execute(
"INSERT INTO order_items(order_id, product_id, quantity, price) VALUES($1, $2, $3, $4)",
order_id, item.product_id, item.quantity, item.price
)
await conn.execute(
"UPDATE products SET stock = stock - $1 WHERE id = $2",
item.quantity, item.product_id
)
except asyncpg.CheckViolationError:
# Insufficient stock - this item will be skipped
# but the order and other items will still be processed
print(f"Insufficient stock for product {item.product_id}")
            continue

PostgreSQL supports four standard transaction isolation levels with different consistency guarantees.
async with conn.transaction(isolation='read_uncommitted'):
# Lowest isolation - can read uncommitted changes
# Rarely used in practice
result = await conn.fetch("SELECT * FROM volatile_data")async with conn.transaction(isolation='read_committed'):
# Default level - sees committed changes from other transactions
# Good balance of consistency and performance
await conn.execute("UPDATE accounts SET balance = balance + $1 WHERE id = $2", amount, account_id)async with conn.transaction(isolation='repeatable_read'):
# Consistent snapshot of data throughout transaction
# Good for analytical queries and reports
initial_count = await conn.fetchval("SELECT COUNT(*) FROM orders")
# Do some work...
time.sleep(1)
# Same count guaranteed even if other transactions added orders
final_count = await conn.fetchval("SELECT COUNT(*) FROM orders")
assert initial_count == final_countasync with conn.transaction(isolation='serializable'):
# Strongest isolation - equivalent to serial execution
# May fail with serialization errors that require retry
try:
balance = await conn.fetchval("SELECT balance FROM accounts WHERE id = $1", from_account)
if balance >= amount:
await conn.execute("UPDATE accounts SET balance = balance - $1 WHERE id = $2", amount, from_account)
await conn.execute("UPDATE accounts SET balance = balance + $1 WHERE id = $2", amount, to_account)
except asyncpg.SerializationError:
# Retry the transaction
        raise TransactionRetryError()

Handle transaction-specific errors with appropriate retry strategies.
import asyncio
import json
import random
from datetime import datetime
async def retry_transaction(func, max_retries=3):
    """Run *func*, retrying on serialization conflicts with backoff.

    Parameters:
        func: Zero-argument callable returning an awaitable that performs
            the transaction.
        max_retries: Total number of attempts before giving up.
    Returns:
        The value produced by the first successful attempt.
    Raises:
        asyncpg.SerializationError: If every attempt conflicts.
    """
    last_attempt = max_retries - 1
    for attempt in range(max_retries):
        try:
            return await func()
        except asyncpg.SerializationError:
            if attempt == last_attempt:
                raise
            # Exponential backoff plus jitter to de-correlate retries.
            backoff = 2 ** attempt + random.uniform(0, 1)
            await asyncio.sleep(backoff)
async def transfer_money(from_account, to_account, amount):
    """Move *amount* between two accounts atomically.

    Runs under SERIALIZABLE isolation so concurrent transfers cannot
    observe or create inconsistent balances; callers should retry on
    asyncpg.SerializationError.
    """
    async with conn.transaction(isolation='serializable'):
        # Verify the source account can cover the transfer.
        balance = await conn.fetchval("SELECT balance FROM accounts WHERE id = $1", from_account)
        if balance < amount:
            raise ValueError("Insufficient funds")
        # Move the funds between the two rows.
        await conn.execute("UPDATE accounts SET balance = balance - $1 WHERE id = $2", amount, from_account)
        await conn.execute("UPDATE accounts SET balance = balance + $1 WHERE id = $2", amount, to_account)
        # Record an audit row for the transfer.
        await conn.execute(
            "INSERT INTO transfers(from_account, to_account, amount, timestamp) VALUES($1, $2, $3, $4)",
            from_account, to_account, amount, datetime.now()
        )
# Use with retry
try:
await retry_transaction(lambda: transfer_money(1, 2, 100))
print("Transfer completed successfully")
except asyncpg.SerializationError:
print("Transfer failed after retries due to conflicts")
except ValueError as e:
    print(f"Transfer failed: {e}")

Check transaction state and handle various transaction conditions.
# Check if in transaction
if conn.is_in_transaction():
print("Already in transaction")
else:
async with conn.transaction():
# Transaction operations
pass
# Handle nested transaction attempts
try:
async with conn.transaction():
# Some operations
async with conn.transaction(): # This creates a savepoint
# Nested operations
pass
except asyncpg.InterfaceError as e:
if "already in transaction" in str(e):
        print("Cannot start transaction - already in one")

Best practices for managing long-running transactions and avoiding locks.
# Break long operations into smaller transactions
async def process_large_dataset(records, batch_size=1000):
    """Process large dataset in small transactions to avoid long locks."""
    for start in range(0, len(records), batch_size):
        # Each batch commits independently, so locks are held briefly.
        async with conn.transaction():
            for record in records[start:start + batch_size]:
                await conn.execute(
                    "INSERT INTO processed_data(data) VALUES($1)",
                    json.dumps(record)
                )
        # Brief pause between batches to allow other transactions
        await asyncio.sleep(0.1)
# Use advisory locks for coordination
async with conn.transaction():
# Acquire advisory lock
acquired = await conn.fetchval("SELECT pg_try_advisory_lock($1)", lock_id)
if acquired:
try:
# Do exclusive work
await conn.execute("UPDATE global_counter SET value = value + 1")
finally:
# Release advisory lock
await conn.fetchval("SELECT pg_advisory_unlock($1)", lock_id)
else:
        print("Could not acquire lock - another process is working")

class Transaction:
    """Database transaction context manager."""

    async def __aenter__(self) -> 'Transaction':
        """Enter transaction context."""

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Exit transaction context with commit/rollback (rollback when an exception is propagating)."""