0
# Transaction Management
1
2
Transaction control with savepoints, context managers, and isolation level management for reliable database operations.
3
4
## Capabilities
5
6
### Transaction Context Manager
7
8
High-level transaction interface using Python context managers for automatic commit/rollback.
9
10
```python { .api }
11
def xact():
12
"""
13
Create a transaction context manager for automatic transaction control.
14
15
Returns:
16
Transaction: Context manager that commits on success, rolls back on exceptions
17
18
Usage:
19
with db.xact():
20
# All operations in this block are part of one transaction
21
# Automatic commit on success, rollback on exception
22
"""
23
```
24
25
### Transaction Interface
26
27
Low-level transaction control interface for fine-grained transaction management.
28
29
```python { .api }
30
class Transaction:
31
"""
32
Transaction control interface providing commit, rollback, and savepoint operations.
33
"""
34
35
def start():
36
"""
37
Start a new transaction.
38
39
Raises:
40
TransactionError: If transaction cannot be started
41
"""
42
43
def commit():
44
"""
45
Commit the current transaction.
46
47
Raises:
48
TransactionError: If commit fails
49
"""
50
51
def rollback():
52
"""
53
Roll back the current transaction.
54
55
Raises:
56
TransactionError: If rollback fails
57
"""
58
59
def savepoint(name=None):
60
"""
61
Create a savepoint within the current transaction.
62
63
Parameters:
64
- name (str, optional): Savepoint name (auto-generated if not provided)
65
66
Returns:
67
str: Savepoint name
68
69
Raises:
70
TransactionError: If savepoint creation fails
71
"""
72
73
def rollback_to_savepoint(name):
74
"""
75
Roll back to a specific savepoint.
76
77
Parameters:
78
- name (str): Savepoint name
79
80
Raises:
81
TransactionError: If rollback to savepoint fails
82
"""
83
84
def release_savepoint(name):
85
"""
86
Release a savepoint (remove it without rolling back).
87
88
Parameters:
89
- name (str): Savepoint name
90
91
Raises:
92
TransactionError: If savepoint release fails
93
"""
94
95
@property
96
def state():
97
"""
98
Get current transaction state.
99
100
Returns:
101
str: Transaction state ('idle', 'active', 'error', 'aborted')
102
"""
103
```
104
105
### Database Transaction Methods
106
107
Transaction methods available on the main database connection interface.
108
109
```python { .api }
110
class Database:
111
"""Database interface with transaction management methods."""
112
113
def xact():
114
"""
115
Create transaction context manager.
116
117
Returns:
118
Transaction: Transaction context manager
119
"""
120
121
def begin():
122
"""
123
Begin a new transaction explicitly.
124
125
Returns:
126
Transaction: Transaction object for manual control
127
"""
128
129
def commit():
130
"""
131
Commit current transaction (if any).
132
"""
133
134
def rollback():
135
"""
136
Roll back current transaction (if any).
137
"""
138
139
@property
140
def in_transaction():
141
"""
142
Check if currently in a transaction.
143
144
Returns:
145
bool: True if in transaction, False otherwise
146
"""
147
```
148
149
### Isolation Level Control
150
151
Interface for controlling transaction isolation levels.
152
153
```python { .api }
154
def set_isolation_level(level):
155
"""
156
Set transaction isolation level.
157
158
Parameters:
159
- level (str): Isolation level ('READ UNCOMMITTED', 'READ COMMITTED',
160
'REPEATABLE READ', 'SERIALIZABLE')
161
162
Raises:
163
ProgrammingError: If isolation level is invalid
164
"""
165
166
def get_isolation_level():
167
"""
168
Get current transaction isolation level.
169
170
Returns:
171
str: Current isolation level
172
"""
173
```
174
175
## Usage Examples
176
177
### Basic Transaction Usage
178
179
```python
180
import postgresql
181
import postgresql.exceptions as pg_exc
182
183
db = postgresql.open('pq://user:pass@localhost/mydb')
184
185
# Context manager approach (recommended)
186
try:
187
with db.xact():
188
# All operations in this block are part of one transaction
189
insert_user = db.prepare("INSERT INTO users (name, email) VALUES ($1, $2)")
190
insert_user("John Doe", "john@example.com")
191
192
insert_profile = db.prepare("INSERT INTO profiles (user_id, bio) VALUES (currval('users_id_seq'), $1)")
193
insert_profile("Software developer")
194
195
# Transaction automatically commits here if no exceptions
196
print("User and profile created successfully")
197
198
except pg_exc.ICVError as e:
199
print(f"Integrity constraint violation: {e}")
200
# Transaction automatically rolled back
201
202
except pg_exc.Error as e:
203
print(f"Database error: {e}")
204
# Transaction automatically rolled back
205
```
206
207
### Manual Transaction Control
208
209
```python
210
import postgresql
211
import postgresql.exceptions as pg_exc
212
213
db = postgresql.open('pq://user:pass@localhost/mydb')
214
215
# Manual transaction control
216
tx = db.begin()
217
218
try:
219
# Execute operations
220
db.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
221
db.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
222
223
# Check balances are valid
224
check_balance = db.prepare("SELECT balance FROM accounts WHERE id = $1")
225
balance1 = check_balance.first(1)['balance']
226
balance2 = check_balance.first(2)['balance']
227
228
if balance1 < 0:
229
raise ValueError("Insufficient funds in account 1")
230
231
# Manually commit
232
tx.commit()
233
print("Transfer completed successfully")
234
235
except Exception as e:
236
# Manually rollback
237
tx.rollback()
238
print(f"Transfer failed: {e}")
239
```
240
241
### Savepoint Usage
242
243
```python
244
import postgresql
245
import postgresql.exceptions as pg_exc
246
247
db = postgresql.open('pq://user:pass@localhost/mydb')
248
249
with db.xact() as tx:
250
# Insert initial user
251
insert_user = db.prepare("INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id")
252
user_id = insert_user.first("John Doe", "john@example.com")['id']
253
254
# Create savepoint before risky operations
255
sp1 = tx.savepoint("before_profile")
256
257
try:
258
# Try to insert profile with potentially problematic data
259
insert_profile = db.prepare("INSERT INTO profiles (user_id, bio, avatar_url) VALUES ($1, $2, $3)")
260
insert_profile(user_id, "Bio text", "http://invalid-image-url")
261
262
# If we get here, release the savepoint
263
tx.release_savepoint(sp1)
264
print("Profile created successfully")
265
266
except pg_exc.Error as e:
267
# Roll back to savepoint (keeps user, removes profile attempt)
268
tx.rollback_to_savepoint(sp1)
269
print(f"Profile creation failed: {e}")
270
271
# Insert minimal profile instead
272
insert_minimal_profile = db.prepare("INSERT INTO profiles (user_id, bio) VALUES ($1, $2)")
273
insert_minimal_profile(user_id, "Default bio")
274
print("Created minimal profile instead")
275
276
# Add additional data with another savepoint
277
sp2 = tx.savepoint("before_preferences")
278
279
try:
280
insert_prefs = db.prepare("INSERT INTO user_preferences (user_id, theme, notifications) VALUES ($1, $2, $3)")
281
insert_prefs(user_id, "dark", True)
282
tx.release_savepoint(sp2)
283
print("Preferences created successfully")
284
285
except pg_exc.Error as e:
286
tx.rollback_to_savepoint(sp2)
287
print(f"Preferences creation failed: {e}")
288
289
# Transaction commits here with user, profile, and possibly preferences
290
print("User creation process completed")
291
```
292
293
### Nested Transactions with Savepoints
294
295
```python
296
import postgresql
297
import postgresql.exceptions as pg_exc
298
299
db = postgresql.open('pq://user:pass@localhost/mydb')
300
301
def create_order_with_items(db, customer_id, items):
302
"""Create order with multiple items using nested savepoints."""
303
304
with db.xact() as tx:
305
# Create order
306
create_order = db.prepare("""
307
INSERT INTO orders (customer_id, order_date, status)
308
VALUES ($1, NOW(), 'pending')
309
RETURNING id
310
""")
311
order_id = create_order.first(customer_id)['id']
312
313
# Create savepoint before adding items
314
main_savepoint = tx.savepoint("order_created")
315
316
successful_items = []
317
failed_items = []
318
319
for item in items:
320
# Create savepoint for each item
321
item_sp = tx.savepoint(f"item_{item['product_id']}")
322
323
try:
324
# Check product availability
325
check_stock = db.prepare("SELECT stock_quantity FROM products WHERE id = $1")
326
stock = check_stock.first(item['product_id'])
327
328
if not stock or stock['stock_quantity'] < item['quantity']:
329
raise ValueError(f"Insufficient stock for product {item['product_id']}")
330
331
# Add order item
332
add_item = db.prepare("""
333
INSERT INTO order_items (order_id, product_id, quantity, price)
334
VALUES ($1, $2, $3, $4)
335
""")
336
add_item(order_id, item['product_id'], item['quantity'], item['price'])
337
338
# Update stock
339
update_stock = db.prepare("""
340
UPDATE products
341
SET stock_quantity = stock_quantity - $1
342
WHERE id = $2
343
""")
344
update_stock(item['quantity'], item['product_id'])
345
346
# Release savepoint - item successfully added
347
tx.release_savepoint(item_sp)
348
successful_items.append(item)
349
350
except Exception as e:
351
# Roll back this item only
352
tx.rollback_to_savepoint(item_sp)
353
failed_items.append((item, str(e)))
354
355
# Check if any items were successfully added
356
if not successful_items:
357
# No items added, roll back entire order
358
tx.rollback_to_savepoint(main_savepoint)
359
raise ValueError("No items could be added to order")
360
361
# Update order total
362
total_query = db.prepare("""
363
SELECT SUM(quantity * price) as total
364
FROM order_items
365
WHERE order_id = $1
366
""")
367
total = total_query.first(order_id)['total']
368
369
update_total = db.prepare("UPDATE orders SET total = $1 WHERE id = $2")
370
update_total(total, order_id)
371
372
print(f"Order {order_id} created with {len(successful_items)} items")
373
if failed_items:
374
print(f"Failed to add {len(failed_items)} items:")
375
for item, error in failed_items:
376
print(f" Product {item['product_id']}: {error}")
377
378
return order_id
379
380
# Usage
381
customer_id = 123
382
items = [
383
{'product_id': 1, 'quantity': 2, 'price': 29.99},
384
{'product_id': 2, 'quantity': 1, 'price': 49.99},
385
{'product_id': 3, 'quantity': 5, 'price': 9.99} # May not have enough stock
386
]
387
388
try:
389
order_id = create_order_with_items(db, customer_id, items)
390
print(f"Order created successfully: {order_id}")
391
except Exception as e:
392
print(f"Order creation failed: {e}")
393
```
394
395
### Isolation Level Management
396
397
```python
398
import postgresql
399
400
db = postgresql.open('pq://user:pass@localhost/mydb')
401
402
# Check current isolation level
403
current_level = db.query("SHOW transaction_isolation")[0][0]
404
print(f"Current isolation level: {current_level}")
405
406
# Set isolation level for specific operations
407
def transfer_money_serializable(db, from_account, to_account, amount):
408
"""Transfer money with SERIALIZABLE isolation to prevent phantom reads."""
409
410
with db.xact():
411
# Set isolation level for this transaction
412
db.execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
413
414
# Get account balances
415
get_balance = db.prepare("SELECT balance FROM accounts WHERE id = $1")
416
from_balance = get_balance.first(from_account)['balance']
417
to_balance = get_balance.first(to_account)['balance']
418
419
if from_balance < amount:
420
raise ValueError("Insufficient funds")
421
422
# Update balances
423
update_balance = db.prepare("UPDATE accounts SET balance = $1 WHERE id = $2")
424
update_balance(from_balance - amount, from_account)
425
update_balance(to_balance + amount, to_account)
426
427
print(f"Transferred ${amount} from account {from_account} to {to_account}")
428
429
# Usage with different isolation levels
430
def demonstrate_isolation_levels(db):
431
"""Demonstrate different isolation levels."""
432
433
isolation_levels = [
434
'READ UNCOMMITTED',
435
'READ COMMITTED',
436
'REPEATABLE READ',
437
'SERIALIZABLE'
438
]
439
440
for level in isolation_levels:
441
print(f"\nTesting {level}:")
442
443
with db.xact():
444
db.execute(f"SET TRANSACTION ISOLATION LEVEL {level}")
445
446
# Perform operations that might behave differently at different levels
447
result = db.query("SELECT COUNT(*) FROM users WHERE active = true")
448
print(f" Active users: {result[0][0]}")
449
450
# Small delay to see effects of concurrent transactions
451
import time
452
time.sleep(0.1)
453
454
# Second query - may show different results depending on isolation level
455
result2 = db.query("SELECT COUNT(*) FROM users WHERE active = true")
456
print(f" Active users (second read): {result2[0][0]}")
457
458
# Run demonstration
459
demonstrate_isolation_levels(db)
460
```
461
462
### Transaction State Monitoring
463
464
```python
465
import postgresql
466
import postgresql.exceptions as pg_exc
467
468
db = postgresql.open('pq://user:pass@localhost/mydb')
469
470
def monitored_transaction(db, operations):
471
"""Execute operations with transaction state monitoring."""
472
473
print(f"Starting transaction (in_transaction: {db.in_transaction})")
474
475
tx = db.begin()
476
477
try:
478
print(f"Transaction state: {tx.state}")
479
480
for i, operation in enumerate(operations):
481
print(f"Executing operation {i+1}: {operation.__name__}")
482
483
try:
484
operation(db)
485
print(f" Success - Transaction state: {tx.state}")
486
487
except Exception as e:
488
print(f" Failed: {e}")
489
print(f" Transaction state: {tx.state}")
490
491
# If transaction is in error state, we must rollback
492
if tx.state == 'error':
493
print(" Transaction in error state, rolling back")
494
tx.rollback()
495
return False
496
497
# All operations successful
498
tx.commit()
499
print(f"Transaction committed - state: {tx.state}")
500
return True
501
502
except Exception as e:
503
print(f"Transaction failed: {e}")
504
if tx.state != 'aborted': # Only rollback if not already aborted
505
tx.rollback()
506
print(f"Transaction rolled back - state: {tx.state}")
507
return False
508
509
# Define test operations
510
def operation1(db):
511
db.execute("INSERT INTO test_table (name) VALUES ('test1')")
512
513
def operation2(db):
514
db.execute("INSERT INTO test_table (name) VALUES ('test2')")
515
516
def operation3(db):
517
# This might fail due to constraints
518
db.execute("INSERT INTO test_table (id, name) VALUES (999999999, 'test3')")
519
520
# Test with operations
521
operations = [operation1, operation2, operation3]
522
success = monitored_transaction(db, operations)
523
print(f"Overall success: {success}")
524
```