EdgeDB Python driver providing both blocking IO and asyncio implementations for connecting to and interacting with EdgeDB databases.
—
Transaction handling with context managers, retry logic, isolation level control, and automatic rollback on errors.
Create and manage database transactions using Python context managers for automatic commit/rollback handling.
def transaction(
*,
options: Optional[TransactionOptions] = None,
retry_options: Optional[RetryOptions] = None
) -> ContextManager:
"""
Create a transaction context manager for synchronous operations.
Parameters:
- options: Transaction configuration options
- retry_options: Retry policy for transaction conflicts
Returns:
Context manager that automatically commits on success or rolls back on error
Usage:
with client.transaction() as tx:
tx.execute("INSERT User { name := 'Alice' }")
tx.execute("INSERT User { name := 'Bob' }")
# Automatically commits if no exceptions
"""
def transaction(
*,
options: Optional[TransactionOptions] = None,
retry_options: Optional[RetryOptions] = None
) -> AsyncContextManager:
"""
Create an async transaction context manager.
Parameters: Same as synchronous version
Returns:
Async context manager for transaction handling
Usage:
async with client.transaction() as tx:
await tx.execute("INSERT User { name := 'Alice' }")
await tx.execute("INSERT User { name := 'Bob' }")
# Automatically commits if no exceptions
"""Configuration options for controlling transaction behavior.
class TransactionOptions:
"""
Transaction configuration options.
Controls isolation level, read-only mode, and deferrable transactions.
"""
def __init__(
self,
isolation: IsolationLevel = IsolationLevel.Serializable,
readonly: bool = False,
deferrable: bool = False
):
"""
Create transaction options.
Parameters:
- isolation: Transaction isolation level
- readonly: Whether transaction is read-only
- deferrable: Whether transaction can be deferred
"""
@classmethod
def defaults(cls) -> TransactionOptions:
"""Get default transaction options."""
def start_transaction_query(self) -> str:
"""Generate START TRANSACTION query with options."""
class IsolationLevel:
"""
Transaction isolation levels.
EdgeDB currently supports only Serializable isolation.
"""
Serializable = "SERIALIZABLE"Configuration for automatic retry of transactions on conflicts.
class RetryOptions:
"""
Configuration for transaction retry logic.
Defines retry attempts and backoff strategy for handling
transaction conflicts and temporary failures.
"""
def __init__(
self,
attempts: int,
backoff: Callable[[int], float]
):
"""
Create retry options.
Parameters:
- attempts: Maximum number of retry attempts
- backoff: Function that takes attempt number and returns delay in seconds
"""
@classmethod
def defaults(cls) -> RetryOptions:
"""Get default retry options (3 attempts with exponential backoff)."""
def with_rule(
self,
condition: RetryCondition,
attempts: Optional[int] = None,
backoff: Optional[Callable[[int], float]] = None
) -> RetryOptions:
"""
Create retry options with specific rule.
Parameters:
- condition: Condition that triggers retry
- attempts: Number of retry attempts for this condition
- backoff: Backoff function for this condition
Returns:
New RetryOptions with added rule
"""
def get_rule_for_exception(self, exc: Exception) -> Optional[Any]:
"""Get retry rule for specific exception."""
class RetryCondition:
"""
Conditions that trigger transaction retry.
"""
TransactionConflict = "transaction_conflict"
NetworkError = "network_error"
def default_backoff(attempt: int) -> float:
"""
Default exponential backoff algorithm.
Parameters:
- attempt: Attempt number (0-based)
Returns:
Delay in seconds before next retry
"""Transaction lifecycle state management.
class TransactionState(Enum):
"""
Transaction lifecycle states.
"""
NEW = "new"
STARTED = "started"
COMMITTED = "committed"
ROLLEDBACK = "rolledback"
FAILED = "failed"
class BaseTransaction:
"""
Base transaction implementation.
Provides transaction lifecycle management and state tracking.
"""
def start(self) -> None:
"""Start the transaction."""
def commit(self) -> None:
"""Commit the transaction."""
def rollback(self) -> None:
"""Roll back the transaction."""
@property
def state(self) -> TransactionState:
"""Current transaction state."""import edgedb
client = edgedb.create_client()
# Synchronous transaction
with client.transaction() as tx:
# All operations in this block are part of the transaction
tx.execute("INSERT User { name := 'Alice', email := 'alice@example.com' }")
tx.execute("INSERT User { name := 'Bob', email := 'bob@example.com' }")
# Query within transaction sees uncommitted changes
users = tx.query("SELECT User { name }")
print(f"Users in transaction: {len(users)}")
# Transaction automatically commits when exiting the block
print("Transaction committed successfully")import asyncio
import edgedb
async def main():
client = edgedb.create_async_client()
async with client.transaction() as tx:
await tx.execute("INSERT User { name := 'Alice', email := 'alice@example.com' }")
await tx.execute("INSERT User { name := 'Bob', email := 'bob@example.com' }")
users = await tx.query("SELECT User { name }")
print(f"Users in transaction: {len(users)}")
print("Async transaction committed successfully")
await client.aclose()
asyncio.run(main())import edgedb
from edgedb import TransactionOptions, IsolationLevel
client = edgedb.create_client()
# Read-only transaction
options = TransactionOptions(readonly=True)
with client.transaction(options=options) as tx:
# Can only perform read operations
users = tx.query("SELECT User { name, email }")
# tx.execute("INSERT User { name := 'Charlie' }") # Would raise error
# Deferrable transaction
options = TransactionOptions(deferrable=True)
with client.transaction(options=options) as tx:
# Transaction can be deferred by the database
report = tx.query("SELECT generate_monthly_report()")import edgedb
from edgedb import RetryOptions, RetryCondition, default_backoff
client = edgedb.create_client()
# Custom retry options
retry_options = RetryOptions(
attempts=5,
backoff=default_backoff
)
with client.transaction(retry_options=retry_options) as tx:
# This transaction will be retried up to 5 times on conflicts
tx.execute("""
UPDATE User
FILTER .email = 'alice@example.com'
SET { login_count := .login_count + 1 }
""")
# Retry with specific conditions
retry_options = (
RetryOptions.defaults()
.with_rule(RetryCondition.TransactionConflict, attempts=10)
.with_rule(RetryCondition.NetworkError, attempts=3)
)
with client.transaction(retry_options=retry_options) as tx:
# Different retry strategies for different error types
tx.execute("INSERT User { name := 'Dave', email := 'dave@example.com' }")import edgedb
client = edgedb.create_client()
try:
with client.transaction() as tx:
tx.execute("INSERT User { name := 'Alice', email := 'alice@example.com' }")
tx.execute("INSERT User { name := 'Alice', email := 'alice@example.com' }") # Duplicate
# Transaction is automatically rolled back on error
except edgedb.ConstraintViolationError:
print("Constraint violation - transaction rolled back")
except edgedb.TransactionError as e:
print(f"Transaction error: {e}")
print("Continuing after transaction failure")import edgedb
client = edgedb.create_client()
# Get transaction object without context manager
tx = client.transaction()
try:
tx.start()
tx.execute("INSERT User { name := 'Alice' }")
tx.execute("INSERT User { name := 'Bob' }")
tx.commit()
print("Transaction committed manually")
except Exception as e:
tx.rollback()
print(f"Transaction rolled back: {e}")
raiseimport edgedb
client = edgedb.create_client()
def create_user_with_profile(tx, name, email, bio):
"""Function that takes a transaction and performs multiple operations."""
user_id = tx.query_single(
"INSERT User { name := $name, email := $email } RETURNING .id",
name=name, email=email
)
tx.execute(
"INSERT Profile { user := $user_id, bio := $bio }",
user_id=user_id, bio=bio
)
return user_id
# Use function within transaction
with client.transaction() as tx:
user1_id = create_user_with_profile(
tx, "Alice", "alice@example.com", "Software developer"
)
user2_id = create_user_with_profile(
tx, "Bob", "bob@example.com", "Data scientist"
)
# Both users and profiles created atomically
print(f"Created users: {user1_id}, {user2_id}")import edgedb
client = edgedb.create_client()
# Access transaction state
tx = client.transaction()
print(f"Initial state: {tx.state}") # TransactionState.NEW
tx.start()
print(f"After start: {tx.state}") # TransactionState.STARTED
try:
tx.execute("INSERT User { name := 'Alice' }")
tx.commit()
print(f"After commit: {tx.state}") # TransactionState.COMMITTED
except Exception:
tx.rollback()
print(f"After rollback: {tx.state}") # TransactionState.ROLLEDBACKimport edgedb
from edgedb import RetryOptions, TransactionOptions
client = edgedb.create_client()
def transfer_credits(from_user_id, to_user_id, amount):
"""Transfer credits between users with retry logic."""
retry_options = RetryOptions(
attempts=10,
backoff=lambda attempt: min(2 ** attempt * 0.1, 1.0)
)
with client.transaction(retry_options=retry_options) as tx:
# Check sender balance
sender = tx.query_required_single(
"SELECT User { credits } FILTER .id = $id",
id=from_user_id
)
if sender.credits < amount:
raise ValueError("Insufficient credits")
# Perform transfer
tx.execute(
"UPDATE User FILTER .id = $id SET { credits := .credits - $amount }",
id=from_user_id, amount=amount
)
tx.execute(
"UPDATE User FILTER .id = $id SET { credits := .credits + $amount }",
id=to_user_id, amount=amount
)
# Log transaction
tx.execute("""
INSERT Transaction {
from_user := $from_user,
to_user := $to_user,
amount := $amount,
timestamp := datetime_current()
}
""", from_user=from_user_id, to_user=to_user_id, amount=amount)
# Use the transfer function
try:
transfer_credits(
"123e4567-e89b-12d3-a456-426614174000",
"987fcdeb-51d2-43a8-b456-426614174000",
100
)
print("Transfer completed successfully")
except ValueError as e:
print(f"Transfer failed: {e}")
except edgedb.TransactionConflictError:
print("Transfer failed due to conflict, but was retried automatically")Install with Tessl CLI
npx tessl i tessl/pypi-edgedb