CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-asyncpg

An asyncio PostgreSQL driver for high-performance database connectivity with Python async/await syntax

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

transaction-management.mddocs/

Transaction Management

Comprehensive transaction support including transaction contexts, savepoints, isolation levels, read-only transactions, and nested transaction management with full PostgreSQL transaction semantics.

Capabilities

Transaction Context

Create and manage database transactions with automatic rollback on exceptions and explicit commit control.

def transaction(
    self, 
    *, 
    isolation: str = None, 
    readonly: bool = False, 
    deferrable: bool = False
) -> Transaction:
    """
    Create a Transaction object for this connection.

    The returned object is used with ``async with`` or driven manually
    through ``start()`` / ``commit()`` / ``rollback()``. All arguments are
    keyword-only.

    Parameters:
    isolation: Transaction isolation level: 'read_uncommitted', 'read_committed',
        'repeatable_read' or 'serializable'. PostgreSQL treats 'read_uncommitted'
        the same as 'read_committed'. NOTE(review): None presumably leaves the
        server/session default in effect - confirm.
    readonly: Make transaction read-only
    deferrable: Allow transaction to be deferred (only meaningful for a
        serializable, read-only transaction)

    Returns:
    Transaction context manager
    """

def is_in_transaction(self) -> bool:
    """
    Return True if connection is currently inside a transaction.

    This is a plain (non-async) method: call it without ``await``,
    e.g. ``if conn.is_in_transaction(): ...``.

    Returns:
    Boolean indicating transaction state
    """

Example Usage

# Basic transaction
async with conn.transaction():
    await conn.execute("INSERT INTO users(name) VALUES($1)", "Alice")
    await conn.execute("INSERT INTO orders(user_id, amount) VALUES($1, $2)", user_id, 100)
    # Automatic commit on success, rollback on exception

# Transaction with isolation level
async with conn.transaction(isolation='serializable'):
    # Strict consistency for critical operations: the balance check and the
    # debit below behave as if no other transaction ran concurrently.
    # Serializable transactions may fail with a serialization error and
    # then need to be retried by the caller.
    balance = await conn.fetchval("SELECT balance FROM accounts WHERE id = $1", account_id)
    if balance >= amount:
        await conn.execute("UPDATE accounts SET balance = balance - $1 WHERE id = $2", amount, account_id)
        await conn.execute("INSERT INTO transactions(account_id, amount) VALUES($1, $2)", account_id, -amount)

# Read-only transaction for reporting
async with conn.transaction(readonly=True):
    # Multiple consistent reads
    user_count = await conn.fetchval("SELECT COUNT(*) FROM users")
    order_count = await conn.fetchval("SELECT COUNT(*) FROM orders")
    revenue = await conn.fetchval("SELECT SUM(amount) FROM orders WHERE status = 'completed'")

Transaction Control

Explicit transaction control with manual commit and rollback operations.

class Transaction:
    """Database transaction handle.

    Used either as an async context manager (``async with conn.transaction():``)
    or driven manually through start() / commit() / rollback().
    """
    
    async def start(self) -> None:
        """Start the transaction explicitly (manual alternative to ``async with``)."""
    
    async def commit(self) -> None:
        """Commit the transaction."""
    
    async def rollback(self) -> None:
        """Roll back the transaction."""

Example Usage

# Manual transaction control
tx = conn.transaction()
await tx.start()

try:
    await conn.execute("INSERT INTO users(name) VALUES($1)", "Bob")
    user_id = await conn.fetchval("SELECT lastval()")
    
    await conn.execute("INSERT INTO profiles(user_id, email) VALUES($1, $2)", user_id, "bob@example.com")
    
    await tx.commit()
    print("Transaction committed successfully")
    
except Exception as e:
    await tx.rollback()
    print(f"Transaction rolled back: {e}")

Savepoints

Create nested transactions using PostgreSQL savepoints for complex transaction logic.

async def savepoint(self, name: str = None) -> Transaction:
    """
    Create a savepoint within the current transaction.

    NOTE(review): the usage examples in this document create savepoints by
    nesting ``conn.transaction()`` blocks and never call this method.
    asyncpg's public API does not appear to provide ``savepoint()`` -
    confirm it exists before relying on this signature.

    Parameters:
    name: Optional savepoint name

    Returns:
    Transaction context manager for the savepoint
    """

async def rollback_to(self, savepoint_name: str) -> None:
    """
    Rollback to a specific savepoint.

    NOTE(review): no example in this document calls this method; rolling
    back a savepoint is shown by letting an exception leave a nested
    ``conn.transaction()`` block. asyncpg's public API does not appear to
    provide ``rollback_to()`` - confirm it exists before relying on it.

    Parameters:
    savepoint_name: Name of the savepoint to rollback to
    """

Example Usage

async with conn.transaction():
    # Insert primary record
    await conn.execute("INSERT INTO orders(customer_id, total) VALUES($1, $2)", customer_id, total)
    order_id = await conn.fetchval("SELECT lastval()")
    
    # Try to process each item with individual error handling
    for item in order_items:
        async with conn.transaction():  # Nested savepoint
            try:
                await conn.execute(
                    "INSERT INTO order_items(order_id, product_id, quantity, price) VALUES($1, $2, $3, $4)",
                    order_id, item.product_id, item.quantity, item.price
                )
                await conn.execute(
                    "UPDATE products SET stock = stock - $1 WHERE id = $2",
                    item.quantity, item.product_id
                )
            except asyncpg.CheckViolationError:
                # Insufficient stock - this item will be skipped
                # but the order and other items will still be processed
                print(f"Insufficient stock for product {item.product_id}")
                continue

Transaction Isolation Levels

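PostgreSQL accepts all four standard SQL transaction isolation levels, but implements only three distinct behaviors: Read Uncommitted is treated as Read Committed. The levels offer different consistency guarantees.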

Read Uncommitted

async with conn.transaction(isolation='read_uncommitted'):
    # Lowest isolation level in the SQL standard. PostgreSQL treats it as
    # read_committed, so uncommitted changes are never actually visible.
    # Rarely used in practice
    result = await conn.fetch("SELECT * FROM volatile_data")

Read Committed (Default)

async with conn.transaction(isolation='read_committed'):
    # Default level - each statement sees the changes that other
    # transactions had committed before that statement started
    # Good balance of consistency and performance
    await conn.execute("UPDATE accounts SET balance = balance + $1 WHERE id = $2", amount, account_id)

Repeatable Read

async with conn.transaction(isolation='repeatable_read'):
    # Consistent snapshot of data throughout transaction
    # Good for analytical queries and reports
    initial_count = await conn.fetchval("SELECT COUNT(*) FROM orders")
    
    # Do some work...
    time.sleep(1)
    
    # Same count guaranteed even if other transactions added orders
    final_count = await conn.fetchval("SELECT COUNT(*) FROM orders")
    assert initial_count == final_count

Serializable

async with conn.transaction(isolation='serializable'):
    # Strongest isolation - equivalent to serial execution
    # May fail with serialization errors that require retry
    try:
        balance = await conn.fetchval("SELECT balance FROM accounts WHERE id = $1", from_account)
        if balance >= amount:
            await conn.execute("UPDATE accounts SET balance = balance - $1 WHERE id = $2", amount, from_account)
            await conn.execute("UPDATE accounts SET balance = balance + $1 WHERE id = $2", amount, to_account)
    except asyncpg.SerializationError:
        # Retry the transaction
        raise TransactionRetryError()

Error Handling and Retry Logic

Handle transaction-specific errors with appropriate retry strategies.

import asyncio
import random

async def retry_transaction(func, max_retries=3):
    """Run ``func`` and retry it when it fails with a serialization error.

    Parameters:
    func: Zero-argument callable returning an awaitable; called once per attempt
    max_retries: Maximum number of attempts

    Returns:
    Whatever the first successful ``await func()`` returns

    The serialization error from the final attempt is re-raised; any other
    exception propagates immediately without a retry.
    """
    final_attempt = max_retries - 1
    for attempt_no in range(max_retries):
        try:
            outcome = await func()
        except asyncpg.SerializationError:
            if attempt_no == final_attempt:
                # Out of attempts - surface the conflict to the caller
                raise
        else:
            return outcome
        # Exponential backoff (1s, 2s, 4s, ...) plus up to 1s of random
        # jitter so competing retries do not collide again in lockstep
        jitter = random.uniform(0, 1)
        await asyncio.sleep(2 ** attempt_no + jitter)

async def transfer_money(from_account, to_account, amount):
    """Move ``amount`` between two accounts inside one serializable transaction.

    Parameters:
    from_account: Id of the account to debit
    to_account: Id of the account to credit
    amount: Amount to transfer

    Raises:
    ValueError if the source account has no balance row or the balance is
    lower than ``amount``. Raising inside the ``async with`` block rolls the
    transaction back.
    """
    async with conn.transaction(isolation='serializable'):
        # Check balance
        balance = await conn.fetchval("SELECT balance FROM accounts WHERE id = $1", from_account)
        if balance is None:
            # fetchval() yields None when no row matches (or the value is
            # NULL); comparing None with a number would raise TypeError.
            raise ValueError("Source account not found or has no balance")
        if balance < amount:
            raise ValueError("Insufficient funds")

        # Transfer money
        await conn.execute("UPDATE accounts SET balance = balance - $1 WHERE id = $2", amount, from_account)
        await conn.execute("UPDATE accounts SET balance = balance + $1 WHERE id = $2", amount, to_account)

        # Log transaction
        await conn.execute(
            "INSERT INTO transfers(from_account, to_account, amount, timestamp) VALUES($1, $2, $3, $4)",
            from_account, to_account, amount, datetime.now()
        )

# Use with retry
try:
    # The lambda defers the call so each retry starts a fresh transfer
    await retry_transaction(lambda: transfer_money(1, 2, 100))
    print("Transfer completed successfully")
except asyncpg.SerializationError:
    # Reached only after retry_transaction ran out of attempts and
    # re-raised the last serialization error
    print("Transfer failed after retries due to conflicts")
except ValueError as e:
    # Raised by transfer_money (e.g. insufficient funds); not retried
    print(f"Transfer failed: {e}")

Transaction Status

Check transaction state and handle various transaction conditions.

# Check if in transaction
if conn.is_in_transaction():
    print("Already in transaction")
else:
    async with conn.transaction():
        # Transaction operations
        pass

# Handle nested transaction attempts
try:
    async with conn.transaction():
        # Some operations
        async with conn.transaction():  # This creates a savepoint
            # Nested operations
            pass
except asyncpg.InterfaceError as e:
    # NOTE(review): nesting conn.transaction() creates a savepoint rather
    # than failing, so this branch is not expected to fire for the code
    # above. The message text matched below should be confirmed against
    # the errors asyncpg actually raises.
    if "already in transaction" in str(e):
        print("Cannot start transaction - already in one")

Long-Running Transactions

Best practices for managing long-running transactions and avoiding locks.

# Break long operations into smaller transactions
async def process_large_dataset(records, batch_size=1000):
    """Insert ``records`` in fixed-size batches, one transaction per batch.

    Parameters:
    records: Sliceable sequence of JSON-serializable items
    batch_size: Number of records committed together

    Each batch commits on its own, so locks are held only for the duration
    of a single batch rather than for the whole dataset.
    """
    total = len(records)
    for start in range(0, total, batch_size):
        chunk = records[start:start + batch_size]

        async with conn.transaction():
            for row in chunk:
                payload = json.dumps(row)
                await conn.execute(
                    "INSERT INTO processed_data(data) VALUES($1)",
                    payload
                )

        # Yield briefly after every batch so other transactions can proceed
        await asyncio.sleep(0.1)

# Use advisory locks for coordination
async with conn.transaction():
    # Acquire advisory lock
    acquired = await conn.fetchval("SELECT pg_try_advisory_lock($1)", lock_id)
    
    if acquired:
        try:
            # Do exclusive work
            await conn.execute("UPDATE global_counter SET value = value + 1")
        finally:
            # Release advisory lock
            await conn.fetchval("SELECT pg_advisory_unlock($1)", lock_id)
    else:
        print("Could not acquire lock - another process is working")

Types

class Transaction:
    """Database transaction context manager (async context-manager protocol)."""
    
    async def __aenter__(self) -> 'Transaction':
        """Enter the transaction context; invoked by ``async with``."""
    
    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Exit the transaction context: commit if the block succeeded, roll back if it raised."""

Install with Tessl CLI

npx tessl i tessl/pypi-asyncpg

docs

connection-management.md

connection-pooling.md

copy-operations.md

cursor-operations.md

exception-handling.md

index.md

listeners-notifications.md

prepared-statements.md

query-execution.md

transaction-management.md

type-system.md

tile.json