CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-python-arango

Python Driver for ArangoDB, a scalable multi-model database that natively supports documents, graphs, and search.

Pending
Overview
Eval results
Files

docs/transactions.md

Transaction Management

Database transaction support providing ACID guarantees across multiple collections. Supports both JavaScript-based server-side transactions and managed transaction contexts with automatic commit/rollback.

Capabilities

Transaction Context

Create managed transaction contexts that provide transactional access to collections with automatic resource cleanup.

class StandardDatabase:
    def begin_transaction(self, read=None, write=None, exclusive=None,
                          sync=None, allow_implicit=None, lock_timeout=None,
                          max_transaction_size=None) -> Result[TransactionDatabase]:
        """
        Begin a managed transaction and return a database handle bound to it.

        Every argument is optional and defaults to None.

        Parameters:
        - read: str or list, collection name(s) opened for read access
        - write: str or list, collection name(s) opened for write access
        - exclusive: str or list, collection name(s) opened for exclusive access
        - sync: bool, wait for sync to disk on commit
        - allow_implicit: bool, allow access to collections that were not
          declared in read/write/exclusive
        - lock_timeout: int, lock timeout in seconds
        - max_transaction_size: int, maximum transaction size in bytes
          (NOTE(review): the upstream python-arango signature may name this
          parameter ``max_size`` - confirm against the installed version)

        Returns:
        Result[TransactionDatabase]: Transaction database context; finish it
        with its commit() or abort() method
        """

JavaScript Transactions

Execute server-side JavaScript transactions for atomic multi-collection operations with full transaction guarantees.

def execute_transaction(self, command: str, params=None, read=None, write=None,
                        sync=None, timeout=None, max_size=None, allow_implicit=None,
                        intermediate_commit_count=None, intermediate_commit_size=None,
                        allow_dirty_read: bool = False) -> Result:
    """
    Execute a JavaScript transaction on the server.

    Only ``command`` is required; every other argument is optional.

    Parameters:
    - command: str, JavaScript code to execute
    - params: dict, parameters passed to the JavaScript function
    - read: Sequence[str], collections for read access
    - write: Sequence[str], collections for write access
    - sync: bool, wait for sync to disk
    - timeout: Number, timeout for waiting on collection locks
    - max_size: int, maximum transaction size in bytes
    - allow_implicit: bool, allow implicit collection access
    - intermediate_commit_count: int, intermediate commit count
    - intermediate_commit_size: int, intermediate commit size in bytes
    - allow_dirty_read: bool, allow reads from followers in cluster
      (defaults to False)

    Returns:
    Result: Transaction execution result
    """

def list_transactions(self) -> Result[List[Json]]:
    """
    Return the list of running stream transactions.

    Takes no arguments besides ``self``.

    Returns:
    Result[List[Json]]: One entry per transaction, each with id and state fields
    """

Transaction Database Context

The TransactionDatabase provides the same interface as StandardDatabase but within a transaction context.

class TransactionDatabase:
    """
    Database interface within transaction context.
    Inherits all methods from StandardDatabase but operations
    are executed within the transaction scope.

    NOTE(review): upstream python-arango may expose these operations as
    commit_transaction(), abort_transaction() and transaction_status()
    rather than the names listed here - confirm against the installed
    version before relying on them.
    """
    
    def commit(self) -> Result[bool]:
        """
        Commit the transaction.
        
        Returns:
        Result[bool]: True on successful commit
        """
        
    def abort(self) -> Result[bool]:
        """
        Abort/rollback the transaction.
        
        Returns:
        Result[bool]: True on successful abort
        """
        
    @property
    def transaction_id(self) -> str:
        """Get transaction ID (read-only property, returned as a string)."""
        
    def status(self) -> Result[Json]:
        """
        Get transaction status.
        
        Returns:
        Result[Json]: Transaction status information
        """

Usage Examples

Managed Transactions

from arango import ArangoClient, TransactionCommitError

client = ArangoClient()
db = client.db('banking', username='root', password='password')

# Simple transaction context
transfer_amount = 100

# Begin transaction with read/write access. It is opened before the ``try``
# so the handlers below only ever deal with a transaction that really started.
txn_db = db.begin_transaction(
    read=['accounts'],
    write=['accounts', 'transactions']
)

try:
    # Work within transaction
    accounts = txn_db.collection('accounts')
    transactions = txn_db.collection('transactions')

    # Fetch both parties once, before anything is modified.
    # NOTE(review): assumes get() yields None for a missing document - confirm.
    sender = accounts.get('alice')
    receiver = accounts.get('bob')

    if sender is None or receiver is None:
        txn_db.abort()
        print("Account not found")
    elif sender['balance'] < transfer_amount:
        # Check account balance
        txn_db.abort()
        print("Insufficient funds")
    else:
        # Update balances
        accounts.update({
            '_key': 'alice',
            'balance': sender['balance'] - transfer_amount
        })
        accounts.update({
            '_key': 'bob',
            'balance': receiver['balance'] + transfer_amount
        })

        # Log transaction
        transactions.insert({
            'from': 'alice',
            'to': 'bob',
            'amount': transfer_amount,
            'timestamp': '2023-01-15T10:30:00Z'
        })

        # Commit all changes
        txn_db.commit()
        print("Transfer completed")

except TransactionCommitError as e:
    print(f"Transaction failed: {e}")
except Exception:
    # Any other failure (bad document, network error, ...) must not leave the
    # transaction open and holding its locks: roll back, then re-raise.
    txn_db.abort()
    raise

Context Manager Pattern

# Using transaction as context manager
# NOTE(review): this depends on the object returned by begin_transaction()
# supporting the ``with`` protocol - confirm against the installed driver.
sku = 'laptop_001'
units = 5

try:
    with db.begin_transaction(write=['inventory', 'orders']) as txn_db:
        stock_col = txn_db.collection('inventory')
        order_col = txn_db.collection('orders')

        # Refuse the order when fewer than ``units`` items are in stock
        stock = stock_col.get(sku)
        if stock['quantity'] < units:
            raise ValueError("Insufficient inventory")

        # Take the ordered units out of stock
        stock_col.update({'_key': sku, 'quantity': stock['quantity'] - units})

        # Record the order itself
        new_order = {
            'customer': 'customer_123',
            'items': [{'sku': sku, 'quantity': units}],
            'total': 2500.00,
            'status': 'confirmed'
        }
        created = order_col.insert(new_order)

        print(f"Order {created['_key']} created successfully")
        # Transaction commits automatically on context exit

except Exception as e:
    # Transaction aborts automatically on exception
    print(f"Order failed: {e}")

JavaScript Transactions

# Complex server-side transaction
#
# JavaScript source for a funds transfer, kept as a plain string so it can be
# shipped to the server. The function receives ``params`` (from_account,
# to_account, amount, timestamp) and:
#   * throws when the sender's balance is below the amount,
#   * throws when either account's status is not 'active',
#   * writes the new balance and last_transaction to both accounts,
#   * inserts a 'transfer' record into ``transactions``,
#   * returns transaction_id, sender_balance and receiver_balance.
transfer_script = """
function(params) {
    var db = require('@arangodb').db;
    var accounts = db.accounts;
    var transactions = db.transactions;
    
    // Get account documents
    var sender = accounts.document(params.from_account);
    var receiver = accounts.document(params.to_account);
    
    // Validate transfer
    if (sender.balance < params.amount) {
        throw new Error('Insufficient funds');
    }
    
    if (sender.status !== 'active' || receiver.status !== 'active') {
        throw new Error('Account not active');
    }
    
    // Calculate new balances
    var new_sender_balance = sender.balance - params.amount;
    var new_receiver_balance = receiver.balance + params.amount;
    
    // Update accounts
    accounts.update(sender._key, {
        balance: new_sender_balance,
        last_transaction: params.timestamp
    });
    
    accounts.update(receiver._key, {  
        balance: new_receiver_balance,
        last_transaction: params.timestamp
    });
    
    // Log transaction
    var txn_record = transactions.insert({
        from_account: params.from_account,
        to_account: params.to_account,
        amount: params.amount,
        timestamp: params.timestamp,
        type: 'transfer',
        status: 'completed'
    });
    
    return {
        transaction_id: txn_record._key,
        sender_balance: new_sender_balance,
        receiver_balance: new_receiver_balance
    };
}
"""

# Execute transaction
# The JavaScript entry point is ``execute_transaction``; ``command`` carries
# the script source and ``params`` becomes the argument of its function.
result = db.execute_transaction(
    command=transfer_script,
    params={
        'from_account': 'alice',
        'to_account': 'bob',
        'amount': 250.00,
        'timestamp': '2023-01-15T14:22:00Z'
    },
    write=['accounts', 'transactions']
)

# ``result`` is whatever the script returned (see its final ``return``).
print(f"Transfer completed: {result['transaction_id']}")
print(f"Alice balance: ${result['sender_balance']}")
print(f"Bob balance: ${result['receiver_balance']}")

Batch Operations in Transactions

# Process multiple orders atomically
def process_orders_batch(db, order_batch, lock_timeout=30):
    """Create every order in ``order_batch`` inside a single transaction.

    Parameters:
    - db: database handle providing begin_transaction()
    - order_batch: iterable of dicts with 'customer_id', 'items' (a list of
      {'sku', 'quantity'} dicts) and an optional 'timestamp'
    - lock_timeout: lock timeout in seconds handed to begin_transaction()
      (default 30, the value this example always used)

    Returns:
    list: keys of the created order documents, or [] when anything failed
    (the failure reason is printed, not raised)
    """
    try:
        # NOTE(review): relies on the transaction handle supporting ``with``
        # and rolling back when the block raises - confirm against the
        # installed driver.
        with db.begin_transaction(
            write=['orders', 'inventory', 'customers'],
            lock_timeout=lock_timeout
        ) as txn_db:

            orders_col = txn_db.collection('orders')
            inventory_col = txn_db.collection('inventory')
            customers_col = txn_db.collection('customers')

            processed_orders = []

            for order_data in order_batch:
                # Validate customer
                customer_id = order_data['customer_id']
                customer = customers_col.get(customer_id)
                if not customer or customer['status'] != 'active':
                    raise ValueError(f"Invalid customer: {customer_id}")

                # Check and update inventory
                total_cost = 0
                for item in order_data['items']:
                    sku = item['sku']
                    inventory_item = inventory_col.get(sku)
                    # A missing document would otherwise surface as an opaque
                    # "'NoneType' object is not subscriptable" message.
                    if not inventory_item:
                        raise ValueError(f"Unknown inventory item: {sku}")
                    if inventory_item['quantity'] < item['quantity']:
                        raise ValueError(f"Insufficient inventory for {sku}")

                    # Reserve inventory
                    inventory_col.update({
                        '_key': sku,
                        'quantity': inventory_item['quantity'] - item['quantity']
                    })

                    total_cost += inventory_item['price'] * item['quantity']

                # Create order
                order = orders_col.insert({
                    'customer_id': customer_id,
                    'items': order_data['items'],
                    'total_cost': total_cost,
                    'status': 'confirmed',
                    'created_at': order_data.get('timestamp')
                })

                processed_orders.append(order['_key'])

            print(f"Successfully processed {len(processed_orders)} orders")
            return processed_orders

    except Exception as e:
        # Example-level boundary: report the reason and signal failure with [].
        print(f"Batch processing failed: {e}")
        return []

# Process batch: two single-item orders from two different customers
first_order = {
    'customer_id': 'cust_001',
    'items': [{'sku': 'item_001', 'quantity': 2}],
    'timestamp': '2023-01-15T15:00:00Z',
}
second_order = {
    'customer_id': 'cust_002',
    'items': [{'sku': 'item_002', 'quantity': 1}],
    'timestamp': '2023-01-15T15:01:00Z',
}
batch = [first_order, second_order]

# Holds the keys of the created orders, or [] when the batch failed
processed = process_orders_batch(db, batch)

Transaction Monitoring

# Long-running transaction with monitoring
def monitored_data_migration(db, batch_size=1000):
    """Copy ``legacy_data`` into ``new_schema`` inside one monitored transaction.

    Documents are read through an AQL cursor, reshaped, and written in chunks
    of ``batch_size``; the transaction status is printed after each full chunk.

    Parameters:
    - db: database handle providing begin_transaction()
    - batch_size: number of documents per insert and per cursor fetch
      (default 1000, the value this example always used)

    Returns:
    None. Failures during the migration are printed rather than raised, and
    the transaction is aborted if it had been started.

    Raises:
    ValueError: if ``batch_size`` is smaller than 1
    """
    # Local import keeps this snippet self-contained.
    from datetime import datetime, timezone

    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    # Initialised up front so the handler can tell whether a transaction
    # exists without inspecting locals().
    txn_db = None
    try:
        txn_db = db.begin_transaction(
            read=['legacy_data'],
            write=['new_schema'],
            max_transaction_size=100*1024*1024,  # 100MB limit
            lock_timeout=300  # 5 minute timeout
        )

        new_schema = txn_db.collection('new_schema')

        # Get transaction info
        print(f"Transaction ID: {txn_db.transaction_id}")

        # One timestamp for the whole run, taken when the migration starts
        migrated_at = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')

        # Process data in batches
        processed = 0

        cursor = txn_db.aql.execute(
            "FOR doc IN legacy_data RETURN doc",
            batch_size=batch_size
        )

        batch = []
        for doc in cursor:
            # Transform document
            batch.append({
                'id': doc['_key'],
                'data': doc['payload'],
                'created': doc['timestamp'],
                'migrated_at': migrated_at
            })

            if len(batch) >= batch_size:
                # Insert batch
                new_schema.insert_many(batch)
                processed += len(batch)
                batch = []

                # Check transaction status
                status = txn_db.status()
                print(f"Processed: {processed}, Status: {status['status']}")

        # Insert remaining documents
        if batch:
            new_schema.insert_many(batch)
            processed += len(batch)

        # Commit transaction
        txn_db.commit()
        print(f"Migration completed: {processed} documents")

    except Exception as e:
        print(f"Migration failed: {e}")
        if txn_db is not None:
            # This function reports errors instead of raising them, so a
            # failing rollback is reported the same way.
            try:
                txn_db.abort()
            except Exception as abort_error:
                print(f"Failed to abort transaction: {abort_error}")

Transaction Configuration

# Advanced transaction configuration
# Each call below opens its own transaction on ``db``. Note that this snippet
# never commits or aborts ``high_performance_txn`` or ``exclusive_txn``; real
# code must finish every transaction it begins.
high_performance_txn = db.begin_transaction(
    write=['high_volume_collection'],
    sync=False,                    # Don't wait for disk sync
    allow_implicit=False,          # Strict collection access
    lock_timeout=60,               # 1 minute lock timeout
    max_transaction_size=50*1024*1024  # 50MB size limit
)

# Exclusive access transaction
exclusive_txn = db.begin_transaction(
    exclusive=['critical_data'],   # Exclusive lock
    sync=True,                     # Ensure durability
    lock_timeout=120               # 2 minute timeout
)

# Read-only transaction for consistent snapshots
# (only ``read`` is given, so no collection is opened for writing)
readonly_txn = db.begin_transaction(
    read=['analytics_data', 'reference_tables']
)

# Generate report with consistent data view
# NOTE(review): relies on the transaction handle supporting ``with`` - confirm
# against the installed driver.
with readonly_txn as txn_db:
    # These two handles are not used by the query below; they are available
    # for the report-processing step.
    analytics = txn_db.collection('analytics_data')
    reference = txn_db.collection('reference_tables')
    
    # All reads see the same consistent snapshot
    # The query pairs every analytics record with each reference row that has
    # the same ``category``.
    report_data = txn_db.aql.execute("""
        FOR analytics_record IN analytics_data
            FOR ref IN reference_tables
                FILTER analytics_record.category == ref.category
                RETURN {
                    record: analytics_record,
                    category_info: ref
                }
    """)
    
    # Process report...

Error Handling

from arango import (
    TransactionInitError,
    TransactionCommitError, 
    TransactionAbortError,
    TransactionStatusError
)

def safe_transaction_operation(db, operation_data):
    """Run the business logic in a transaction and report failures by stage.

    Parameters:
    - db: database handle providing begin_transaction()
    - operation_data: payload forwarded to perform_business_logic()

    Returns:
    The value produced by perform_business_logic() once the commit succeeded,
    or None when starting, running or committing the transaction failed.
    """
    txn_db = None
    try:
        # Open the transaction, do the work, then make it permanent
        txn_db = db.begin_transaction(write=['orders', 'inventory'])
        outcome = perform_business_logic(txn_db, operation_data)
        txn_db.commit()

    except TransactionInitError as e:
        # Nothing was started, so there is nothing to roll back
        print(f"Failed to start transaction: {e}")
        return None

    except TransactionCommitError as e:
        print(f"Failed to commit transaction: {e}")
        if not txn_db:
            return None
        try:
            txn_db.abort()
        except TransactionAbortError:
            print("Failed to abort transaction")
        return None

    except Exception as e:
        print(f"Operation failed: {e}")
        if not txn_db:
            return None
        try:
            txn_db.abort()
            print("Transaction rolled back successfully")
        except TransactionAbortError as abort_error:
            print(f"Failed to abort transaction: {abort_error}")
        return None

    else:
        return outcome

def perform_business_logic(txn_db, data):
    """Insert ``data['order']`` and apply ``data['inventory_update']``.

    Both writes go through ``txn_db``, so they belong to whatever transaction
    that handle represents.

    Returns:
    dict: {'order_id': <_key of the inserted order>}
    """
    # Your business logic here
    order_col = txn_db.collection('orders')
    stock_col = txn_db.collection('inventory')

    # Simulate complex operations
    created = order_col.insert(data['order'])
    stock_col.update(data['inventory_update'])

    order_id = created['_key']
    return {'order_id': order_id}

Install with Tessl CLI

npx tessl i tessl/pypi-python-arango

docs

aql.md

client-database.md

collections.md

errors-types.md

graphs.md

index.md

transactions.md

tile.json