Python Driver for ArangoDB, a scalable multi-model database that natively supports documents, graphs, and search.
—
Database transaction support providing ACID guarantees across multiple collections. Supports both JavaScript-based server-side transactions and managed transaction contexts with automatic commit/rollback.
Create managed transaction contexts that provide transactional access to collections with automatic resource cleanup.
class StandardDatabase:
    def begin_transaction(self, read=None, write=None, exclusive=None,
                          sync=None, allow_implicit=None, lock_timeout=None,
                          max_transaction_size=None) -> Result[TransactionDatabase]:
        """
        Begin a managed (stream) transaction.

        Parameters:
        - read: str or list, collections for read access
        - write: str or list, collections for write access
        - exclusive: str or list, collections for exclusive access
        - sync: bool, wait for sync to disk on commit
        - allow_implicit: bool, allow implicit collection access
        - lock_timeout: int, lock timeout in seconds
        - max_transaction_size: int, maximum transaction size in bytes
        Returns:
        Result[TransactionDatabase]: Transaction database context
"""Execute server-side JavaScript transactions for atomic multi-collection operations with full transaction guarantees.
    def execute_transaction(self, command: str, params=None, read=None, write=None,
                            sync=None, timeout=None, max_size=None, allow_implicit=None,
                            intermediate_commit_count=None, intermediate_commit_size=None,
                            allow_dirty_read: bool = False) -> Result:
        """
        Execute a server-side JavaScript transaction and return its result.

        Parameters:
        - command: str, JavaScript code to execute
        - params: dict, parameters passed to JavaScript function
        - read: Sequence[str], collections for read access
        - write: Sequence[str], collections for write access
        - sync: bool, wait for sync to disk
        - timeout: Number, timeout for waiting on collection locks
        - max_size: int, maximum transaction size in bytes
        - allow_implicit: bool, allow implicit collection access
        - intermediate_commit_count: int, intermediate commit count
        - intermediate_commit_size: int, intermediate commit size in bytes
        - allow_dirty_read: bool, allow reads from followers in cluster
        Returns:
        Result: Transaction execution result
        """
    def list_transactions(self) -> Result[List[Json]]:
        """
        Return the list of running stream transactions.

        Returns:
        Result[List[Json]]: List of transactions with id and state fields
"""The TransactionDatabase provides the same interface as StandardDatabase but within a transaction context.
class TransactionDatabase:
    """
    Database interface within transaction context.

    Inherits all methods from StandardDatabase but operations
    are executed within the transaction scope.
    """

    def commit(self) -> Result[bool]:
        """
        Commit the transaction.

        Returns:
        Result[bool]: True on successful commit
        """

    def abort(self) -> Result[bool]:
        """
        Abort/rollback the transaction.

        Returns:
        Result[bool]: True on successful abort
        """

    @property
    def transaction_id(self) -> str:
        """Get transaction ID."""

    def status(self) -> Result[Json]:
        """
        Get transaction status.

        Returns:
        Result[Json]: Transaction status information
"""from arango import ArangoClient, TransactionCommitError
client = ArangoClient()
db = client.db('banking', username='root', password='password')

# Simple transaction context
try:
    # Begin transaction with read/write access
    txn_db = db.begin_transaction(
        read=['accounts'],
        write=['accounts', 'transactions']
    )
    # Work within transaction
    accounts = txn_db.collection('accounts')
    transactions = txn_db.collection('transactions')
    # Check account balance
    sender = accounts.get('alice')
    if sender['balance'] < 100:
        # Abort discards all writes made within the transaction.
        txn_db.abort()
        print("Insufficient funds")
    else:
        # Update balances
        accounts.update({'_key': 'alice', 'balance': sender['balance'] - 100})
        accounts.update({'_key': 'bob', 'balance':
                         accounts.get('bob')['balance'] + 100})
        # Log transaction
        transactions.insert({
            'from': 'alice',
            'to': 'bob',
            'amount': 100,
            'timestamp': '2023-01-15T10:30:00Z'
        })
        # Commit all changes
        txn_db.commit()
        print("Transfer completed")
except TransactionCommitError as e:
print(f"Transaction failed: {e}")# Using transaction as context manager
try:
    with db.begin_transaction(write=['inventory', 'orders']) as txn_db:
        inventory = txn_db.collection('inventory')
        orders = txn_db.collection('orders')
        # Check inventory
        item = inventory.get('laptop_001')
        if item['quantity'] < 5:
            # Raising inside the context aborts the transaction.
            raise ValueError("Insufficient inventory")
        # Update inventory
        inventory.update({
            '_key': 'laptop_001',
            'quantity': item['quantity'] - 5
        })
        # Create order
        order = orders.insert({
            'customer': 'customer_123',
            'items': [{'sku': 'laptop_001', 'quantity': 5}],
            'total': 2500.00,
            'status': 'confirmed'
        })
        print(f"Order {order['_key']} created successfully")
        # Transaction commits automatically on context exit
except Exception as e:
    print(f"Order failed: {e}")
    # Transaction aborts automatically on exception

# Complex server-side transaction
transfer_script = """
function(params) {
var db = require('@arangodb').db;
var accounts = db.accounts;
var transactions = db.transactions;
// Get account documents
var sender = accounts.document(params.from_account);
var receiver = accounts.document(params.to_account);
// Validate transfer
if (sender.balance < params.amount) {
throw new Error('Insufficient funds');
}
if (sender.status !== 'active' || receiver.status !== 'active') {
throw new Error('Account not active');
}
// Calculate new balances
var new_sender_balance = sender.balance - params.amount;
var new_receiver_balance = receiver.balance + params.amount;
// Update accounts
accounts.update(sender._key, {
balance: new_sender_balance,
last_transaction: params.timestamp
});
accounts.update(receiver._key, {
balance: new_receiver_balance,
last_transaction: params.timestamp
});
// Log transaction
var txn_record = transactions.insert({
from_account: params.from_account,
to_account: params.to_account,
amount: params.amount,
timestamp: params.timestamp,
type: 'transfer',
status: 'completed'
});
return {
transaction_id: txn_record._key,
sender_balance: new_sender_balance,
receiver_balance: new_receiver_balance
};
}
"""
# Execute transaction
result = db.transaction(
command=transfer_script,
params={
'from_account': 'alice',
'to_account': 'bob',
'amount': 250.00,
'timestamp': '2023-01-15T14:22:00Z'
},
write=['accounts', 'transactions']
)
print(f"Transfer completed: {result['transaction_id']}")
print(f"Alice balance: ${result['sender_balance']}")
print(f"Bob balance: ${result['receiver_balance']}")# Process multiple orders atomically
def process_orders_batch(db, order_batch):
    """
    Atomically process a batch of orders inside one stream transaction.

    Parameters:
    - db: database handle providing begin_transaction()
    - order_batch: list of dicts with 'customer_id', 'items'
      (each item: {'sku', 'quantity'}) and optional 'timestamp'

    Returns:
    list of created order keys on success, or [] if any order fails
    (the raised exception aborts the whole transaction).
    """
    try:
        with db.begin_transaction(
            write=['orders', 'inventory', 'customers'],
            lock_timeout=30
        ) as txn_db:
            orders_col = txn_db.collection('orders')
            inventory_col = txn_db.collection('inventory')
            customers_col = txn_db.collection('customers')
            processed_orders = []
            for order_data in order_batch:
                # Validate customer
                customer = customers_col.get(order_data['customer_id'])
                if not customer or customer['status'] != 'active':
                    raise ValueError(f"Invalid customer: {order_data['customer_id']}")
                # Check and update inventory
                total_cost = 0
                for item in order_data['items']:
                    inventory_item = inventory_col.get(item['sku'])
                    # Guard unknown SKUs: .get() returns None for a missing
                    # document, which would otherwise raise TypeError below.
                    if inventory_item is None:
                        raise ValueError(f"Unknown SKU: {item['sku']}")
                    if inventory_item['quantity'] < item['quantity']:
                        raise ValueError(f"Insufficient inventory for {item['sku']}")
                    # Reserve inventory
                    inventory_col.update({
                        '_key': item['sku'],
                        'quantity': inventory_item['quantity'] - item['quantity']
                    })
                    total_cost += inventory_item['price'] * item['quantity']
                # Create order
                order = orders_col.insert({
                    'customer_id': order_data['customer_id'],
                    'items': order_data['items'],
                    'total_cost': total_cost,
                    'status': 'confirmed',
                    'created_at': order_data.get('timestamp')
                })
                processed_orders.append(order['_key'])
            print(f"Successfully processed {len(processed_orders)} orders")
            return processed_orders
    except Exception as e:
        print(f"Batch processing failed: {e}")
        return []
# Process batch: two sample orders, each naming a customer, the requested
# items, and a submission timestamp.
batch = [
    {
        'customer_id': 'cust_001',
        'items': [{'sku': 'item_001', 'quantity': 2}],
        'timestamp': '2023-01-15T15:00:00Z'
    },
    {
        'customer_id': 'cust_002',
        'items': [{'sku': 'item_002', 'quantity': 1}],
        'timestamp': '2023-01-15T15:01:00Z'
    }
]
processed = process_orders_batch(db, batch)

# Long-running transaction with monitoring
def monitored_data_migration(db):
    """
    Migrate documents from 'legacy_data' into 'new_schema' inside one large
    stream transaction, reporting progress after each inserted batch.

    Parameters:
    - db: database handle providing begin_transaction()
    """
    txn_db = None  # explicit sentinel; 'txn_db' in locals() was fragile
    try:
        txn_db = db.begin_transaction(
            read=['legacy_data'],
            write=['new_schema'],
            max_transaction_size=100*1024*1024,  # 100MB limit
            lock_timeout=300  # 5 minute timeout
        )
        legacy = txn_db.collection('legacy_data')
        new_schema = txn_db.collection('new_schema')
        # Get transaction info
        print(f"Transaction ID: {txn_db.transaction_id}")
        # Process data in batches
        batch_size = 1000
        processed = 0
        cursor = txn_db.aql.execute(
            "FOR doc IN legacy_data RETURN doc",
            batch_size=batch_size
        )
        batch = []
        for doc in cursor:
            # Transform document into the new schema shape
            transformed = {
                'id': doc['_key'],
                'data': doc['payload'],
                'created': doc['timestamp'],
                'migrated_at': '2023-01-15T16:00:00Z'
            }
            batch.append(transformed)
            if len(batch) >= batch_size:
                # Insert batch
                new_schema.insert_many(batch)
                processed += len(batch)
                batch = []
                # Check transaction status
                status = txn_db.status()
                print(f"Processed: {processed}, Status: {status['status']}")
        # Insert remaining documents
        if batch:
            new_schema.insert_many(batch)
            processed += len(batch)
        # Commit transaction
        txn_db.commit()
        print(f"Migration completed: {processed} documents")
    except Exception as e:
        print(f"Migration failed: {e}")
        # Abort only if the transaction was actually begun.
        if txn_db is not None:
            txn_db.abort()

# Advanced transaction configuration
# High-throughput configuration: trades durability (sync=False) for speed.
high_performance_txn = db.begin_transaction(
    write=['high_volume_collection'],
    sync=False,  # Don't wait for disk sync
    allow_implicit=False,  # Strict collection access
    lock_timeout=60,  # 1 minute lock timeout
    max_transaction_size=50*1024*1024  # 50MB size limit
)
# Exclusive access transaction
exclusive_txn = db.begin_transaction(
    exclusive=['critical_data'],  # Exclusive lock
    sync=True,  # Ensure durability
    lock_timeout=120  # 2 minute timeout
)
# Read-only transaction for consistent snapshots
readonly_txn = db.begin_transaction(
    read=['analytics_data', 'reference_tables']
)
# Generate report with consistent data view
with readonly_txn as txn_db:
    analytics = txn_db.collection('analytics_data')
    reference = txn_db.collection('reference_tables')
    # All reads see the same consistent snapshot
    report_data = txn_db.aql.execute("""
FOR analytics_record IN analytics_data
FOR ref IN reference_tables
FILTER analytics_record.category == ref.category
RETURN {
record: analytics_record,
category_info: ref
}
""")
# Process report...from arango import (
TransactionInitError,
TransactionCommitError,
TransactionAbortError,
TransactionStatusError
)
def safe_transaction_operation(db, operation_data):
txn_db = None
try:
# Begin transaction
txn_db = db.begin_transaction(
write=['orders', 'inventory']
)
# Perform operations
result = perform_business_logic(txn_db, operation_data)
# Commit
txn_db.commit()
return result
except TransactionInitError as e:
print(f"Failed to start transaction: {e}")
return None
except TransactionCommitError as e:
print(f"Failed to commit transaction: {e}")
if txn_db:
try:
txn_db.abort()
except TransactionAbortError:
print("Failed to abort transaction")
return None
except Exception as e:
print(f"Operation failed: {e}")
if txn_db:
try:
txn_db.abort()
print("Transaction rolled back successfully")
except TransactionAbortError as abort_error:
print(f"Failed to abort transaction: {abort_error}")
return None
def perform_business_logic(txn_db, data):
    """
    Example business logic executed within a transaction context.

    Parameters:
    - txn_db: TransactionDatabase handle
    - data: dict with 'order' (document to insert) and 'inventory_update'
      (partial document including '_key') entries
    """
    # Your business logic here
    orders = txn_db.collection('orders')
    inventory = txn_db.collection('inventory')
    # Simulate complex operations
    order = orders.insert(data['order'])
    inventory.update(data['inventory_update'])
    return {'order_id': order['_key']}

Install with Tessl CLI:
npx tessl i tessl/pypi-python-arango