CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-py-postgresql

PostgreSQL driver and tools library providing PG-API and DB-API 2.0 interfaces for Python.

Pending
Overview
Eval results
Files

transaction-management.mddocs/

Transaction Management

Transaction control with savepoints, context managers, and isolation level management for reliable database operations.

Capabilities

Transaction Context Manager

High-level transaction interface using Python context managers for automatic commit/rollback.

def xact():
    """
    Create a transaction context manager for automatic transaction control.
    
    Returns:
    Transaction: Context manager that commits on success, rolls back on exceptions
    
    Usage:
    with db.xact():
        # All operations in this block are part of one transaction
        # Automatic commit on success, rollback on exception
    """

Transaction Interface

Low-level transaction control interface for fine-grained transaction management.

class Transaction:
    """
    Transaction control interface providing commit, rollback, and savepoint operations.
    """
    
    def start():
        """
        Start a new transaction.
        
        Raises:
        TransactionError: If transaction cannot be started
        """
    
    def commit():
        """
        Commit the current transaction.
        
        Raises:
        TransactionError: If commit fails
        """
    
    def rollback():
        """
        Roll back the current transaction.
        
        Raises:
        TransactionError: If rollback fails
        """
    
    def savepoint(name=None):
        """
        Create a savepoint within the current transaction.
        
        Parameters:
        - name (str, optional): Savepoint name (auto-generated if not provided)
        
        Returns:
        str: Savepoint name
        
        Raises:
        TransactionError: If savepoint creation fails
        """
    
    def rollback_to_savepoint(name):
        """
        Roll back to a specific savepoint.
        
        Parameters:
        - name (str): Savepoint name
        
        Raises:
        TransactionError: If rollback to savepoint fails
        """
    
    def release_savepoint(name):
        """
        Release a savepoint (remove it without rolling back).
        
        Parameters:
        - name (str): Savepoint name
        
        Raises:
        TransactionError: If savepoint release fails
        """
    
    @property
    def state():
        """
        Get current transaction state.
        
        Returns:
        str: Transaction state ('idle', 'active', 'error', 'aborted')
        """

Database Transaction Methods

Transaction methods available on the main database connection interface.

class Database:
    """Database interface with transaction management methods."""
    
    def xact():
        """
        Create transaction context manager.
        
        Returns:
        Transaction: Transaction context manager
        """
    
    def begin():
        """
        Begin a new transaction explicitly.
        
        Returns:
        Transaction: Transaction object for manual control
        """
    
    def commit():
        """
        Commit current transaction (if any).
        """
    
    def rollback():  
        """
        Roll back current transaction (if any).
        """
    
    @property
    def in_transaction():
        """
        Check if currently in a transaction.
        
        Returns:
        bool: True if in transaction, False otherwise
        """

Isolation Level Control

Interface for controlling transaction isolation levels.

def set_isolation_level(level):
    """
    Set transaction isolation level.
    
    Parameters:
    - level (str): Isolation level ('READ UNCOMMITTED', 'READ COMMITTED', 
                   'REPEATABLE READ', 'SERIALIZABLE')
    
    Raises:
    ProgrammingError: If isolation level is invalid
    """

def get_isolation_level():
    """
    Get current transaction isolation level.
    
    Returns:
    str: Current isolation level
    """

Usage Examples

Basic Transaction Usage

import postgresql
import postgresql.exceptions as pg_exc

db = postgresql.open('pq://user:pass@localhost/mydb')

# Context manager approach (recommended)
try:
    with db.xact():
        # All operations in this block are part of one transaction
        insert_user = db.prepare("INSERT INTO users (name, email) VALUES ($1, $2)")
        insert_user("John Doe", "john@example.com")
        
        insert_profile = db.prepare("INSERT INTO profiles (user_id, bio) VALUES (currval('users_id_seq'), $1)")
        insert_profile("Software developer")
        
        # Transaction automatically commits here if no exceptions
        print("User and profile created successfully")
        
except pg_exc.ICVError as e:
    print(f"Integrity constraint violation: {e}")
    # Transaction automatically rolled back
    
except pg_exc.Error as e:
    print(f"Database error: {e}")
    # Transaction automatically rolled back

Manual Transaction Control

import postgresql
import postgresql.exceptions as pg_exc

db = postgresql.open('pq://user:pass@localhost/mydb')

# Manual transaction control
tx = db.begin()

try:
    # Execute operations
    db.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
    db.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
    
    # Check balances are valid
    check_balance = db.prepare("SELECT balance FROM accounts WHERE id = $1")
    balance1 = check_balance.first(1)['balance']
    balance2 = check_balance.first(2)['balance'] 
    
    if balance1 < 0:
        raise ValueError("Insufficient funds in account 1")
    
    # Manually commit
    tx.commit()
    print("Transfer completed successfully")
    
except Exception as e:
    # Manually rollback
    tx.rollback()
    print(f"Transfer failed: {e}")

Savepoint Usage

import postgresql
import postgresql.exceptions as pg_exc

db = postgresql.open('pq://user:pass@localhost/mydb')

with db.xact() as tx:
    # Insert initial user
    insert_user = db.prepare("INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id")
    user_id = insert_user.first("John Doe", "john@example.com")['id']
    
    # Create savepoint before risky operations
    sp1 = tx.savepoint("before_profile")
    
    try:
        # Try to insert profile with potentially problematic data
        insert_profile = db.prepare("INSERT INTO profiles (user_id, bio, avatar_url) VALUES ($1, $2, $3)")
        insert_profile(user_id, "Bio text", "http://invalid-image-url")
        
        # If we get here, release the savepoint
        tx.release_savepoint(sp1)
        print("Profile created successfully")
        
    except pg_exc.Error as e:
        # Roll back to savepoint (keeps user, removes profile attempt)
        tx.rollback_to_savepoint(sp1)
        print(f"Profile creation failed: {e}")
        
        # Insert minimal profile instead
        insert_minimal_profile = db.prepare("INSERT INTO profiles (user_id, bio) VALUES ($1, $2)")
        insert_minimal_profile(user_id, "Default bio")
        print("Created minimal profile instead")
    
    # Add additional data with another savepoint
    sp2 = tx.savepoint("before_preferences")
    
    try:
        insert_prefs = db.prepare("INSERT INTO user_preferences (user_id, theme, notifications) VALUES ($1, $2, $3)")
        insert_prefs(user_id, "dark", True)
        tx.release_savepoint(sp2)
        print("Preferences created successfully")
        
    except pg_exc.Error as e:
        tx.rollback_to_savepoint(sp2)
        print(f"Preferences creation failed: {e}")
    
    # Transaction commits here with user, profile, and possibly preferences
    print("User creation process completed")

Nested Transactions with Savepoints

import postgresql
import postgresql.exceptions as pg_exc

db = postgresql.open('pq://user:pass@localhost/mydb')

def create_order_with_items(db, customer_id, items):
    """Create order with multiple items using nested savepoints."""
    
    with db.xact() as tx:
        # Create order
        create_order = db.prepare("""
            INSERT INTO orders (customer_id, order_date, status) 
            VALUES ($1, NOW(), 'pending') 
            RETURNING id
        """)
        order_id = create_order.first(customer_id)['id']
        
        # Create savepoint before adding items
        main_savepoint = tx.savepoint("order_created")
        
        successful_items = []
        failed_items = []
        
        for item in items:
            # Create savepoint for each item
            item_sp = tx.savepoint(f"item_{item['product_id']}")
            
            try:
                # Check product availability
                check_stock = db.prepare("SELECT stock_quantity FROM products WHERE id = $1")
                stock = check_stock.first(item['product_id'])
                
                if not stock or stock['stock_quantity'] < item['quantity']:
                    raise ValueError(f"Insufficient stock for product {item['product_id']}")
                
                # Add order item
                add_item = db.prepare("""
                    INSERT INTO order_items (order_id, product_id, quantity, price)
                    VALUES ($1, $2, $3, $4)
                """)
                add_item(order_id, item['product_id'], item['quantity'], item['price'])
                
                # Update stock
                update_stock = db.prepare("""
                    UPDATE products 
                    SET stock_quantity = stock_quantity - $1 
                    WHERE id = $2
                """)
                update_stock(item['quantity'], item['product_id'])
                
                # Release savepoint - item successfully added
                tx.release_savepoint(item_sp)
                successful_items.append(item)
                
            except Exception as e:
                # Roll back this item only
                tx.rollback_to_savepoint(item_sp)
                failed_items.append((item, str(e)))
        
        # Check if any items were successfully added
        if not successful_items:
            # No items added, roll back entire order
            tx.rollback_to_savepoint(main_savepoint)
            raise ValueError("No items could be added to order")
        
        # Update order total
        total_query = db.prepare("""
            SELECT SUM(quantity * price) as total 
            FROM order_items 
            WHERE order_id = $1
        """)
        total = total_query.first(order_id)['total']
        
        update_total = db.prepare("UPDATE orders SET total = $1 WHERE id = $2")
        update_total(total, order_id)
        
        print(f"Order {order_id} created with {len(successful_items)} items")
        if failed_items:
            print(f"Failed to add {len(failed_items)} items:")
            for item, error in failed_items:
                print(f"  Product {item['product_id']}: {error}")
        
        return order_id

# Usage
customer_id = 123
items = [
    {'product_id': 1, 'quantity': 2, 'price': 29.99},
    {'product_id': 2, 'quantity': 1, 'price': 49.99},
    {'product_id': 3, 'quantity': 5, 'price': 9.99}  # May not have enough stock
]

try:
    order_id = create_order_with_items(db, customer_id, items)
    print(f"Order created successfully: {order_id}")
except Exception as e:
    print(f"Order creation failed: {e}")

Isolation Level Management

import postgresql

db = postgresql.open('pq://user:pass@localhost/mydb')

# Check current isolation level
current_level = db.query("SHOW transaction_isolation")[0][0]
print(f"Current isolation level: {current_level}")

# Set isolation level for specific operations
def transfer_money_serializable(db, from_account, to_account, amount):
    """Transfer money with SERIALIZABLE isolation to prevent phantom reads."""
    
    with db.xact():
        # Set isolation level for this transaction
        db.execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
        
        # Get account balances
        get_balance = db.prepare("SELECT balance FROM accounts WHERE id = $1")
        from_balance = get_balance.first(from_account)['balance']
        to_balance = get_balance.first(to_account)['balance']
        
        if from_balance < amount:
            raise ValueError("Insufficient funds")
        
        # Update balances
        update_balance = db.prepare("UPDATE accounts SET balance = $1 WHERE id = $2")
        update_balance(from_balance - amount, from_account)
        update_balance(to_balance + amount, to_account)
        
        print(f"Transferred ${amount} from account {from_account} to {to_account}")

# Usage with different isolation levels
def demonstrate_isolation_levels(db):
    """Demonstrate different isolation levels."""
    
    isolation_levels = [
        'READ UNCOMMITTED',
        'READ COMMITTED', 
        'REPEATABLE READ',
        'SERIALIZABLE'
    ]
    
    for level in isolation_levels:
        print(f"\nTesting {level}:")
        
        with db.xact():
            db.execute(f"SET TRANSACTION ISOLATION LEVEL {level}")
            
            # Perform operations that might behave differently at different levels
            result = db.query("SELECT COUNT(*) FROM users WHERE active = true")
            print(f"  Active users: {result[0][0]}")
            
            # Small delay to see effects of concurrent transactions
            import time
            time.sleep(0.1)
            
            # Second query - may show different results depending on isolation level
            result2 = db.query("SELECT COUNT(*) FROM users WHERE active = true")
            print(f"  Active users (second read): {result2[0][0]}")

# Run demonstration
demonstrate_isolation_levels(db)

Transaction State Monitoring

import postgresql
import postgresql.exceptions as pg_exc

db = postgresql.open('pq://user:pass@localhost/mydb')

def monitored_transaction(db, operations):
    """Execute operations with transaction state monitoring."""
    
    print(f"Starting transaction (in_transaction: {db.in_transaction})")
    
    tx = db.begin()
    
    try:
        print(f"Transaction state: {tx.state}")
        
        for i, operation in enumerate(operations):
            print(f"Executing operation {i+1}: {operation.__name__}")
            
            try:
                operation(db)
                print(f"  Success - Transaction state: {tx.state}")
                
            except Exception as e:
                print(f"  Failed: {e}")
                print(f"  Transaction state: {tx.state}")
                
                # If transaction is in error state, we must rollback
                if tx.state == 'error':
                    print("  Transaction in error state, rolling back")
                    tx.rollback()
                    return False
        
        # All operations successful
        tx.commit()
        print(f"Transaction committed - state: {tx.state}")
        return True
        
    except Exception as e:
        print(f"Transaction failed: {e}")
        if tx.state != 'aborted':  # Only rollback if not already aborted
            tx.rollback()
        print(f"Transaction rolled back - state: {tx.state}")
        return False

# Define test operations
def operation1(db):
    db.execute("INSERT INTO test_table (name) VALUES ('test1')")

def operation2(db):
    db.execute("INSERT INTO test_table (name) VALUES ('test2')")

def operation3(db):
    # This might fail due to constraints
    db.execute("INSERT INTO test_table (id, name) VALUES (999999999, 'test3')")

# Test with operations
operations = [operation1, operation2, operation3]
success = monitored_transaction(db, operations)
print(f"Overall success: {success}")

Install with Tessl CLI

npx tessl i tessl/pypi-py-postgresql

docs

advanced-features.md

cluster-management.md

connection-management.md

dbapi-interface.md

exception-handling.md

index.md

query-execution.md

transaction-management.md

type-system.md

tile.json