# Transaction Management

Database transaction support providing ACID guarantees across multiple collections. Supports both JavaScript-based server-side transactions and managed transaction contexts with automatic commit/rollback.

## Capabilities

### Transaction Context

Create managed transaction contexts that provide transactional access to collections with automatic resource cleanup.

```python { .api }
class StandardDatabase:
    def begin_transaction(self, read=None, write=None, exclusive=None,
                          sync=None, allow_implicit=None, lock_timeout=None,
                          max_transaction_size=None) -> Result[TransactionDatabase]:
        """
        Begin a managed transaction.

        Parameters:
        - read: str or list, collections for read access
        - write: str or list, collections for write access
        - exclusive: str or list, collections for exclusive access
        - sync: bool, wait for sync to disk on commit
        - allow_implicit: bool, allow implicit collection access
        - lock_timeout: int, lock timeout in seconds
        - max_transaction_size: int, maximum transaction size in bytes

        Returns:
        Result[TransactionDatabase]: Transaction database context
        """
```

### JavaScript Transactions

Execute server-side JavaScript transactions for atomic multi-collection operations with full transaction guarantees.

```python { .api }
def execute_transaction(self, command: str, params=None, read=None, write=None,
                        sync=None, timeout=None, max_size=None, allow_implicit=None,
                        intermediate_commit_count=None, intermediate_commit_size=None,
                        allow_dirty_read: bool = False) -> Result:
    """
    Execute JavaScript transaction.

    Parameters:
    - command: str, JavaScript code to execute
    - params: dict, parameters passed to JavaScript function
    - read: Sequence[str], collections for read access
    - write: Sequence[str], collections for write access
    - sync: bool, wait for sync to disk
    - timeout: Number, timeout for waiting on collection locks
    - max_size: int, maximum transaction size in bytes
    - allow_implicit: bool, allow implicit collection access
    - intermediate_commit_count: int, intermediate commit count
    - intermediate_commit_size: int, intermediate commit size in bytes
    - allow_dirty_read: bool, allow reads from followers in cluster

    Returns:
    Result: Transaction execution result
    """

def list_transactions(self) -> Result[List[Json]]:
    """
    Return the list of running stream transactions.

    Returns:
    Result[List[Json]]: List of transactions with id and state fields
    """
```

### Transaction Database Context

The TransactionDatabase provides the same interface as StandardDatabase but within a transaction context.

```python { .api }
class TransactionDatabase:
    """
    Database interface within transaction context.
    Inherits all methods from StandardDatabase but operations
    are executed within the transaction scope.
    """

    def commit(self) -> Result[bool]:
        """
        Commit the transaction.

        Returns:
        Result[bool]: True on successful commit
        """

    def abort(self) -> Result[bool]:
        """
        Abort/rollback the transaction.

        Returns:
        Result[bool]: True on successful abort
        """

    @property
    def transaction_id(self) -> str:
        """Get transaction ID."""

    def status(self) -> Result[Json]:
        """
        Get transaction status.

        Returns:
        Result[Json]: Transaction status information
        """
```

## Usage Examples

### Managed Transactions

```python
from arango import ArangoClient, TransactionCommitError

client = ArangoClient()
db = client.db('banking', username='root', password='password')

# Simple transaction context
# Begin transaction with read/write access. It is started before the `try`
# so that the handlers below only ever run with a live `txn_db`.
txn_db = db.begin_transaction(
    read=['accounts'],
    write=['accounts', 'transactions']
)

try:
    # Work within transaction
    accounts = txn_db.collection('accounts')
    transactions = txn_db.collection('transactions')

    # Check account balance
    sender = accounts.get('alice')
    if sender['balance'] < 100:
        txn_db.abort()
        print("Insufficient funds")
    else:
        # Update balances
        accounts.update({'_key': 'alice', 'balance': sender['balance'] - 100})
        receiver = accounts.get('bob')
        accounts.update({'_key': 'bob', 'balance': receiver['balance'] + 100})

        # Log transaction
        transactions.insert({
            'from': 'alice',
            'to': 'bob',
            'amount': 100,
            'timestamp': '2023-01-15T10:30:00Z'
        })

        # Commit all changes
        txn_db.commit()
        print("Transfer completed")

except TransactionCommitError as e:
    print(f"Transaction failed: {e}")

except Exception:
    # Any other failure (for example a missing account document) must not
    # leave the transaction open: roll back, then let the error propagate.
    txn_db.abort()
    raise
```

### Context Manager Pattern

```python
# Using transaction as context manager
try:
    with db.begin_transaction(write=['inventory', 'orders']) as txn_db:
        inventory = txn_db.collection('inventory')
        orders = txn_db.collection('orders')

        # Check inventory
        item = inventory.get('laptop_001')
        if item['quantity'] < 5:
            raise ValueError("Insufficient inventory")

        # Update inventory
        inventory.update({
            '_key': 'laptop_001',
            'quantity': item['quantity'] - 5
        })

        # Create order
        order = orders.insert({
            'customer': 'customer_123',
            'items': [{'sku': 'laptop_001', 'quantity': 5}],
            'total': 2500.00,
            'status': 'confirmed'
        })
        # Transaction commits automatically on context exit

    # Report success only once the `with` block has exited, i.e. after the
    # automatic commit, so a failed commit is never announced as a success.
    print(f"Order {order['_key']} created successfully")

except Exception as e:
    print(f"Order failed: {e}")
    # Transaction aborts automatically on exception
```

### JavaScript Transactions

```python
# Complex server-side transaction
transfer_script = """
function(params) {
    var db = require('@arangodb').db;
    var accounts = db.accounts;
    var transactions = db.transactions;

    // Get account documents
    var sender = accounts.document(params.from_account);
    var receiver = accounts.document(params.to_account);

    // Validate transfer
    if (sender.balance < params.amount) {
        throw new Error('Insufficient funds');
    }

    if (sender.status !== 'active' || receiver.status !== 'active') {
        throw new Error('Account not active');
    }

    // Calculate new balances
    var new_sender_balance = sender.balance - params.amount;
    var new_receiver_balance = receiver.balance + params.amount;

    // Update accounts
    accounts.update(sender._key, {
        balance: new_sender_balance,
        last_transaction: params.timestamp
    });

    accounts.update(receiver._key, {
        balance: new_receiver_balance,
        last_transaction: params.timestamp
    });

    // Log transaction
    var txn_record = transactions.insert({
        from_account: params.from_account,
        to_account: params.to_account,
        amount: params.amount,
        timestamp: params.timestamp,
        type: 'transfer',
        status: 'completed'
    });

    return {
        transaction_id: txn_record._key,
        sender_balance: new_sender_balance,
        receiver_balance: new_receiver_balance
    };
}
"""

# Execute transaction
# `execute_transaction` is the method documented in the API section above;
# it takes the JavaScript source as `command` and passes `params` to it.
result = db.execute_transaction(
    command=transfer_script,
    params={
        'from_account': 'alice',
        'to_account': 'bob',
        'amount': 250.00,
        'timestamp': '2023-01-15T14:22:00Z'
    },
    write=['accounts', 'transactions']
)

print(f"Transfer completed: {result['transaction_id']}")
print(f"Alice balance: ${result['sender_balance']}")
print(f"Bob balance: ${result['receiver_balance']}")
```

### Batch Operations in Transactions

```python
# Process multiple orders atomically
def process_orders_batch(db, order_batch):
    """
    Process every order in `order_batch` inside one transaction.

    Parameters:
    - db: database object exposing begin_transaction() as a context manager
    - order_batch: iterable of dicts with 'customer_id' and 'items'
      (each item has 'sku' and 'quantity'); 'timestamp' is optional

    Returns:
    list: keys of the created orders, or [] if anything in the batch failed
    """
    try:
        with db.begin_transaction(
            write=['orders', 'inventory', 'customers'],
            lock_timeout=30
        ) as txn_db:
            orders_col = txn_db.collection('orders')
            inventory_col = txn_db.collection('inventory')
            customers_col = txn_db.collection('customers')

            processed_orders = []

            for order_data in order_batch:
                # Validate customer
                customer = customers_col.get(order_data['customer_id'])
                if not customer or customer['status'] != 'active':
                    raise ValueError(f"Invalid customer: {order_data['customer_id']}")

                # Check and update inventory
                total_cost = 0
                for item in order_data['items']:
                    inventory_item = inventory_col.get(item['sku'])
                    # Same guard as for the customer lookup: a missing document
                    # should fail with a clear message, not a TypeError.
                    if not inventory_item:
                        raise ValueError(f"Unknown inventory item: {item['sku']}")
                    if inventory_item['quantity'] < item['quantity']:
                        raise ValueError(f"Insufficient inventory for {item['sku']}")

                    # Reserve inventory
                    inventory_col.update({
                        '_key': item['sku'],
                        'quantity': inventory_item['quantity'] - item['quantity']
                    })

                    total_cost += inventory_item['price'] * item['quantity']

                # Create order
                order = orders_col.insert({
                    'customer_id': order_data['customer_id'],
                    'items': order_data['items'],
                    'total_cost': total_cost,
                    'status': 'confirmed',
                    'created_at': order_data.get('timestamp')
                })

                processed_orders.append(order['_key'])

        # Reached only after the context manager has exited without error,
        # so success is reported once the whole batch is final.
        print(f"Successfully processed {len(processed_orders)} orders")
        return processed_orders

    except Exception as e:
        # Any failure invalidates the whole batch; callers receive an empty list.
        print(f"Batch processing failed: {e}")
        return []


# Process batch
batch = [
    {
        'customer_id': 'cust_001',
        'items': [{'sku': 'item_001', 'quantity': 2}],
        'timestamp': '2023-01-15T15:00:00Z'
    },
    {
        'customer_id': 'cust_002',
        'items': [{'sku': 'item_002', 'quantity': 1}],
        'timestamp': '2023-01-15T15:01:00Z'
    }
]

# Returns the created order keys, or [] if the batch failed
processed = process_orders_batch(db, batch)
```

### Transaction Monitoring

```python
# Long-running transaction with monitoring
def monitored_data_migration(db):
    """
    Copy documents from 'legacy_data' into 'new_schema' inside one transaction.

    Documents are read through an AQL cursor, transformed, and inserted in
    batches of 1000; the transaction status is printed after each full batch.
    On any error the failure is printed and the transaction, if one was
    started, is aborted.

    Parameters:
    - db: database object exposing begin_transaction()
    """
    # Stays None when begin_transaction() itself fails, so the error handler
    # knows there is nothing to abort.
    txn_db = None
    try:
        txn_db = db.begin_transaction(
            read=['legacy_data'],
            write=['new_schema'],
            max_transaction_size=100*1024*1024,  # 100MB limit
            lock_timeout=300  # 5 minute timeout
        )

        new_schema = txn_db.collection('new_schema')

        # Get transaction info
        print(f"Transaction ID: {txn_db.transaction_id}")

        # Process data in batches
        batch_size = 1000
        processed = 0

        cursor = txn_db.aql.execute(
            "FOR doc IN legacy_data RETURN doc",
            batch_size=batch_size
        )

        batch = []
        for doc in cursor:
            # Transform document
            transformed = {
                'id': doc['_key'],
                'data': doc['payload'],
                'created': doc['timestamp'],
                'migrated_at': '2023-01-15T16:00:00Z'
            }
            batch.append(transformed)

            if len(batch) >= batch_size:
                # Insert batch
                new_schema.insert_many(batch)
                processed += len(batch)
                batch = []

                # Check transaction status
                status = txn_db.status()
                print(f"Processed: {processed}, Status: {status['status']}")

        # Insert remaining documents
        if batch:
            new_schema.insert_many(batch)
            processed += len(batch)

        # Commit transaction
        txn_db.commit()
        print(f"Migration completed: {processed} documents")

    except Exception as e:
        print(f"Migration failed: {e}")
        if txn_db is not None:
            txn_db.abort()
```

### Transaction Configuration

```python
# Advanced transaction configuration
# NOTE(review): the first two transactions below are only started to show the
# available options; this snippet never commits or aborts them — real code
# presumably should finish each one with commit() or abort().
high_performance_txn = db.begin_transaction(
    write=['high_volume_collection'],
    sync=False,  # Don't wait for disk sync
    allow_implicit=False,  # Strict collection access
    lock_timeout=60,  # 1 minute lock timeout
    max_transaction_size=50*1024*1024  # 50MB size limit
)

# Exclusive access transaction
exclusive_txn = db.begin_transaction(
    exclusive=['critical_data'],  # Exclusive lock
    sync=True,  # Ensure durability
    lock_timeout=120  # 2 minute timeout
)

# Read-only transaction for consistent snapshots
readonly_txn = db.begin_transaction(
    read=['analytics_data', 'reference_tables']
)

# Generate report with consistent data view
with readonly_txn as txn_db:
    analytics = txn_db.collection('analytics_data')
    reference = txn_db.collection('reference_tables')

    # All reads see the same consistent snapshot
    report_data = txn_db.aql.execute("""
        FOR analytics_record IN analytics_data
            FOR ref IN reference_tables
                FILTER analytics_record.category == ref.category
                RETURN {
                    record: analytics_record,
                    category_info: ref
                }
    """)

    # Process report...
```

### Error Handling

```python
from arango import (
    TransactionInitError,
    TransactionCommitError,
    TransactionAbortError,
    TransactionStatusError
)


def safe_transaction_operation(db, operation_data):
    """
    Run perform_business_logic() in a transaction, handling each failure stage.

    Parameters:
    - db: database object exposing begin_transaction()
    - operation_data: data passed through to perform_business_logic()

    Returns:
    The value returned by perform_business_logic() after a successful commit,
    or None if starting, running, or committing the transaction failed.
    """
    txn_db = None
    try:
        # Begin transaction
        txn_db = db.begin_transaction(
            write=['orders', 'inventory']
        )

        # Perform operations
        result = perform_business_logic(txn_db, operation_data)

        # Commit
        txn_db.commit()
        return result

    except TransactionInitError as e:
        # Nothing was started, so there is nothing to abort.
        print(f"Failed to start transaction: {e}")
        return None

    except TransactionCommitError as e:
        print(f"Failed to commit transaction: {e}")
        if txn_db is not None:
            try:
                txn_db.abort()
            except TransactionAbortError:
                print("Failed to abort transaction")
        return None

    except Exception as e:
        print(f"Operation failed: {e}")
        # txn_db is still None if begin_transaction() raised something other
        # than TransactionInitError.
        if txn_db is not None:
            try:
                txn_db.abort()
                print("Transaction rolled back successfully")
            except TransactionAbortError as abort_error:
                print(f"Failed to abort transaction: {abort_error}")
        return None


def perform_business_logic(txn_db, data):
    """
    Insert an order and apply an inventory update through `txn_db`.

    Parameters:
    - txn_db: object exposing collection(name) for 'orders' and 'inventory'
    - data: dict with 'order' (document to insert) and 'inventory_update'
      (document passed to inventory.update)

    Returns:
    dict: {'order_id': <key of the inserted order>}
    """
    # Your business logic here
    orders = txn_db.collection('orders')
    inventory = txn_db.collection('inventory')

    # Simulate complex operations
    order = orders.insert(data['order'])
    inventory.update(data['inventory_update'])

    return {'order_id': order['_key']}
```