PostgreSQL driver and tools library providing PG-API and DB-API 2.0 interfaces for Python.
—
Transaction control with savepoints, context managers, and isolation level management for reliable database operations.
High-level transaction interface using Python context managers for automatic commit/rollback.
def xact():
"""
Create a transaction context manager for automatic transaction control.
Returns:
Transaction: Context manager that commits on success, rolls back on exceptions
Usage:
with db.xact():
# All operations in this block are part of one transaction
# Automatic commit on success, rollback on exception
"""Low-level transaction control interface for fine-grained transaction management.
class Transaction:
"""
Transaction control interface providing commit, rollback, and savepoint operations.
"""
def start():
"""
Start a new transaction.
Raises:
TransactionError: If transaction cannot be started
"""
def commit():
"""
Commit the current transaction.
Raises:
TransactionError: If commit fails
"""
def rollback():
"""
Roll back the current transaction.
Raises:
TransactionError: If rollback fails
"""
def savepoint(name=None):
"""
Create a savepoint within the current transaction.
Parameters:
- name (str, optional): Savepoint name (auto-generated if not provided)
Returns:
str: Savepoint name
Raises:
TransactionError: If savepoint creation fails
"""
def rollback_to_savepoint(name):
"""
Roll back to a specific savepoint.
Parameters:
- name (str): Savepoint name
Raises:
TransactionError: If rollback to savepoint fails
"""
def release_savepoint(name):
"""
Release a savepoint (remove it without rolling back).
Parameters:
- name (str): Savepoint name
Raises:
TransactionError: If savepoint release fails
"""
@property
def state():
"""
Get current transaction state.
Returns:
str: Transaction state ('idle', 'active', 'error', 'aborted')
"""Transaction methods available on the main database connection interface.
class Database:
"""Database interface with transaction management methods."""
def xact():
"""
Create transaction context manager.
Returns:
Transaction: Transaction context manager
"""
def begin():
"""
Begin a new transaction explicitly.
Returns:
Transaction: Transaction object for manual control
"""
def commit():
"""
Commit current transaction (if any).
"""
def rollback():
"""
Roll back current transaction (if any).
"""
@property
def in_transaction():
"""
Check if currently in a transaction.
Returns:
bool: True if in transaction, False otherwise
"""Interface for controlling transaction isolation levels.
def set_isolation_level(level):
"""
Set transaction isolation level.
Parameters:
- level (str): Isolation level ('READ UNCOMMITTED', 'READ COMMITTED',
'REPEATABLE READ', 'SERIALIZABLE')
Raises:
ProgrammingError: If isolation level is invalid
"""
def get_isolation_level():
"""
Get current transaction isolation level.
Returns:
str: Current isolation level
"""import postgresql
import postgresql.exceptions as pg_exc
db = postgresql.open('pq://user:pass@localhost/mydb')
# Context manager approach (recommended)
try:
with db.xact():
# All operations in this block are part of one transaction
insert_user = db.prepare("INSERT INTO users (name, email) VALUES ($1, $2)")
insert_user("John Doe", "john@example.com")
insert_profile = db.prepare("INSERT INTO profiles (user_id, bio) VALUES (currval('users_id_seq'), $1)")
insert_profile("Software developer")
# Transaction automatically commits here if no exceptions
print("User and profile created successfully")
except pg_exc.ICVError as e:
print(f"Integrity constraint violation: {e}")
# Transaction automatically rolled back
except pg_exc.Error as e:
print(f"Database error: {e}")
# Transaction automatically rolled backimport postgresql
import postgresql.exceptions as pg_exc
db = postgresql.open('pq://user:pass@localhost/mydb')
# Manual transaction control
tx = db.begin()
try:
# Execute operations
db.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
db.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
# Check balances are valid
check_balance = db.prepare("SELECT balance FROM accounts WHERE id = $1")
balance1 = check_balance.first(1)['balance']
balance2 = check_balance.first(2)['balance']
if balance1 < 0:
raise ValueError("Insufficient funds in account 1")
# Manually commit
tx.commit()
print("Transfer completed successfully")
except Exception as e:
# Manually rollback
tx.rollback()
print(f"Transfer failed: {e}")import postgresql
import postgresql.exceptions as pg_exc
db = postgresql.open('pq://user:pass@localhost/mydb')
with db.xact() as tx:
# Insert initial user
insert_user = db.prepare("INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id")
user_id = insert_user.first("John Doe", "john@example.com")['id']
# Create savepoint before risky operations
sp1 = tx.savepoint("before_profile")
try:
# Try to insert profile with potentially problematic data
insert_profile = db.prepare("INSERT INTO profiles (user_id, bio, avatar_url) VALUES ($1, $2, $3)")
insert_profile(user_id, "Bio text", "http://invalid-image-url")
# If we get here, release the savepoint
tx.release_savepoint(sp1)
print("Profile created successfully")
except pg_exc.Error as e:
# Roll back to savepoint (keeps user, removes profile attempt)
tx.rollback_to_savepoint(sp1)
print(f"Profile creation failed: {e}")
# Insert minimal profile instead
insert_minimal_profile = db.prepare("INSERT INTO profiles (user_id, bio) VALUES ($1, $2)")
insert_minimal_profile(user_id, "Default bio")
print("Created minimal profile instead")
# Add additional data with another savepoint
sp2 = tx.savepoint("before_preferences")
try:
insert_prefs = db.prepare("INSERT INTO user_preferences (user_id, theme, notifications) VALUES ($1, $2, $3)")
insert_prefs(user_id, "dark", True)
tx.release_savepoint(sp2)
print("Preferences created successfully")
except pg_exc.Error as e:
tx.rollback_to_savepoint(sp2)
print(f"Preferences creation failed: {e}")
# Transaction commits here with user, profile, and possibly preferences
print("User creation process completed")import postgresql
import postgresql.exceptions as pg_exc
db = postgresql.open('pq://user:pass@localhost/mydb')
def create_order_with_items(db, customer_id, items):
"""Create order with multiple items using nested savepoints."""
with db.xact() as tx:
# Create order
create_order = db.prepare("""
INSERT INTO orders (customer_id, order_date, status)
VALUES ($1, NOW(), 'pending')
RETURNING id
""")
order_id = create_order.first(customer_id)['id']
# Create savepoint before adding items
main_savepoint = tx.savepoint("order_created")
successful_items = []
failed_items = []
for item in items:
# Create savepoint for each item
item_sp = tx.savepoint(f"item_{item['product_id']}")
try:
# Check product availability
check_stock = db.prepare("SELECT stock_quantity FROM products WHERE id = $1")
stock = check_stock.first(item['product_id'])
if not stock or stock['stock_quantity'] < item['quantity']:
raise ValueError(f"Insufficient stock for product {item['product_id']}")
# Add order item
add_item = db.prepare("""
INSERT INTO order_items (order_id, product_id, quantity, price)
VALUES ($1, $2, $3, $4)
""")
add_item(order_id, item['product_id'], item['quantity'], item['price'])
# Update stock
update_stock = db.prepare("""
UPDATE products
SET stock_quantity = stock_quantity - $1
WHERE id = $2
""")
update_stock(item['quantity'], item['product_id'])
# Release savepoint - item successfully added
tx.release_savepoint(item_sp)
successful_items.append(item)
except Exception as e:
# Roll back this item only
tx.rollback_to_savepoint(item_sp)
failed_items.append((item, str(e)))
# Check if any items were successfully added
if not successful_items:
# No items added, roll back entire order
tx.rollback_to_savepoint(main_savepoint)
raise ValueError("No items could be added to order")
# Update order total
total_query = db.prepare("""
SELECT SUM(quantity * price) as total
FROM order_items
WHERE order_id = $1
""")
total = total_query.first(order_id)['total']
update_total = db.prepare("UPDATE orders SET total = $1 WHERE id = $2")
update_total(total, order_id)
print(f"Order {order_id} created with {len(successful_items)} items")
if failed_items:
print(f"Failed to add {len(failed_items)} items:")
for item, error in failed_items:
print(f" Product {item['product_id']}: {error}")
return order_id
# Usage
customer_id = 123
items = [
{'product_id': 1, 'quantity': 2, 'price': 29.99},
{'product_id': 2, 'quantity': 1, 'price': 49.99},
{'product_id': 3, 'quantity': 5, 'price': 9.99} # May not have enough stock
]
try:
order_id = create_order_with_items(db, customer_id, items)
print(f"Order created successfully: {order_id}")
except Exception as e:
print(f"Order creation failed: {e}")import postgresql
db = postgresql.open('pq://user:pass@localhost/mydb')
# Check current isolation level
current_level = db.query("SHOW transaction_isolation")[0][0]
print(f"Current isolation level: {current_level}")
# Set isolation level for specific operations
def transfer_money_serializable(db, from_account, to_account, amount):
"""Transfer money with SERIALIZABLE isolation to prevent phantom reads."""
with db.xact():
# Set isolation level for this transaction
db.execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
# Get account balances
get_balance = db.prepare("SELECT balance FROM accounts WHERE id = $1")
from_balance = get_balance.first(from_account)['balance']
to_balance = get_balance.first(to_account)['balance']
if from_balance < amount:
raise ValueError("Insufficient funds")
# Update balances
update_balance = db.prepare("UPDATE accounts SET balance = $1 WHERE id = $2")
update_balance(from_balance - amount, from_account)
update_balance(to_balance + amount, to_account)
print(f"Transferred ${amount} from account {from_account} to {to_account}")
# Usage with different isolation levels
def demonstrate_isolation_levels(db):
"""Demonstrate different isolation levels."""
isolation_levels = [
'READ UNCOMMITTED',
'READ COMMITTED',
'REPEATABLE READ',
'SERIALIZABLE'
]
for level in isolation_levels:
print(f"\nTesting {level}:")
with db.xact():
db.execute(f"SET TRANSACTION ISOLATION LEVEL {level}")
# Perform operations that might behave differently at different levels
result = db.query("SELECT COUNT(*) FROM users WHERE active = true")
print(f" Active users: {result[0][0]}")
# Small delay to see effects of concurrent transactions
import time
time.sleep(0.1)
# Second query - may show different results depending on isolation level
result2 = db.query("SELECT COUNT(*) FROM users WHERE active = true")
print(f" Active users (second read): {result2[0][0]}")
# Run demonstration
demonstrate_isolation_levels(db)import postgresql
import postgresql.exceptions as pg_exc
db = postgresql.open('pq://user:pass@localhost/mydb')
def monitored_transaction(db, operations):
"""Execute operations with transaction state monitoring."""
print(f"Starting transaction (in_transaction: {db.in_transaction})")
tx = db.begin()
try:
print(f"Transaction state: {tx.state}")
for i, operation in enumerate(operations):
print(f"Executing operation {i+1}: {operation.__name__}")
try:
operation(db)
print(f" Success - Transaction state: {tx.state}")
except Exception as e:
print(f" Failed: {e}")
print(f" Transaction state: {tx.state}")
# If transaction is in error state, we must rollback
if tx.state == 'error':
print(" Transaction in error state, rolling back")
tx.rollback()
return False
# All operations successful
tx.commit()
print(f"Transaction committed - state: {tx.state}")
return True
except Exception as e:
print(f"Transaction failed: {e}")
if tx.state != 'aborted': # Only rollback if not already aborted
tx.rollback()
print(f"Transaction rolled back - state: {tx.state}")
return False
# Define test operations
def operation1(db):
db.execute("INSERT INTO test_table (name) VALUES ('test1')")
def operation2(db):
db.execute("INSERT INTO test_table (name) VALUES ('test2')")
def operation3(db):
# This might fail due to constraints
db.execute("INSERT INTO test_table (id, name) VALUES (999999999, 'test3')")
# Test with operations
operations = [operation1, operation2, operation3]
success = monitored_transaction(db, operations)
print(f"Overall success: {success}")Install with Tessl CLI
npx tessl i tessl/pypi-py-postgresql