0
# Transaction Management
1
2
Comprehensive transaction support including transaction contexts, savepoints, isolation levels, read-only transactions, and nested transaction management with full PostgreSQL transaction semantics.
3
4
## Capabilities
5
6
### Transaction Context
7
8
Create and manage database transactions with automatic rollback on exceptions and explicit commit control.
9
10
```python { .api }
11
def transaction(
12
self,
13
*,
14
isolation: str = None,
15
readonly: bool = False,
16
deferrable: bool = False
17
) -> Transaction:
18
"""
19
Create a Transaction context manager.
20
21
Parameters:
22
isolation: Transaction isolation level ('read_uncommitted', 'read_committed', 'repeatable_read', 'serializable')
23
readonly: Make transaction read-only
24
deferrable: Allow transaction to be deferred (only for serializable read-only)
25
26
Returns:
27
Transaction context manager
28
"""
29
30
def is_in_transaction(self) -> bool:
31
"""
32
Return True if connection is currently inside a transaction.
33
34
Returns:
35
Boolean indicating transaction state
36
"""
37
```
38
39
#### Example Usage
40
41
```python
42
# Basic transaction
43
async with conn.transaction():
44
await conn.execute("INSERT INTO users(name) VALUES($1)", "Alice")
45
await conn.execute("INSERT INTO orders(user_id, amount) VALUES($1, $2)", user_id, 100)
46
# Automatic commit on success, rollback on exception
47
48
# Transaction with isolation level
49
async with conn.transaction(isolation='serializable'):
50
# Strict consistency for critical operations
51
balance = await conn.fetchval("SELECT balance FROM accounts WHERE id = $1", account_id)
52
if balance >= amount:
53
await conn.execute("UPDATE accounts SET balance = balance - $1 WHERE id = $2", amount, account_id)
54
await conn.execute("INSERT INTO transactions(account_id, amount) VALUES($1, $2)", account_id, -amount)
55
56
# Read-only transaction for reporting
57
async with conn.transaction(readonly=True):
58
# Multiple consistent reads
59
user_count = await conn.fetchval("SELECT COUNT(*) FROM users")
60
order_count = await conn.fetchval("SELECT COUNT(*) FROM orders")
61
revenue = await conn.fetchval("SELECT SUM(amount) FROM orders WHERE status = 'completed'")
62
```
63
64
### Transaction Control
65
66
Explicit transaction control with manual commit and rollback operations.
67
68
```python { .api }
69
class Transaction:
70
"""Database transaction context manager."""
71
72
async def start(self) -> None:
73
"""Start the transaction explicitly."""
74
75
async def commit(self) -> None:
76
"""Commit the transaction."""
77
78
async def rollback(self) -> None:
79
"""Rollback the transaction."""
80
```
81
82
#### Example Usage
83
84
```python
85
# Manual transaction control
86
tx = conn.transaction()
87
await tx.start()
88
89
try:
90
await conn.execute("INSERT INTO users(name) VALUES($1)", "Bob")
91
user_id = await conn.fetchval("SELECT lastval()")
92
93
await conn.execute("INSERT INTO profiles(user_id, email) VALUES($1, $2)", user_id, "bob@example.com")
94
95
await tx.commit()
96
print("Transaction committed successfully")
97
98
except Exception as e:
99
await tx.rollback()
100
print(f"Transaction rolled back: {e}")
101
```
102
103
### Savepoints
104
105
Create nested transactions using PostgreSQL savepoints for complex transaction logic.
106
107
```python { .api }
108
async def savepoint(self, name: str = None) -> Transaction:
109
"""
110
Create a savepoint within the current transaction.
111
112
Parameters:
113
name: Optional savepoint name
114
115
Returns:
116
Transaction context manager for the savepoint
117
"""
118
119
async def rollback_to(self, savepoint_name: str) -> None:
120
"""
121
Rollback to a specific savepoint.
122
123
Parameters:
124
savepoint_name: Name of the savepoint to rollback to
125
"""
126
```
127
128
#### Example Usage
129
130
```python
131
async with conn.transaction():
132
# Insert primary record
133
await conn.execute("INSERT INTO orders(customer_id, total) VALUES($1, $2)", customer_id, total)
134
order_id = await conn.fetchval("SELECT lastval()")
135
136
# Try to process each item with individual error handling
137
for item in order_items:
138
async with conn.transaction(): # Nested savepoint
139
try:
140
await conn.execute(
141
"INSERT INTO order_items(order_id, product_id, quantity, price) VALUES($1, $2, $3, $4)",
142
order_id, item.product_id, item.quantity, item.price
143
)
144
await conn.execute(
145
"UPDATE products SET stock = stock - $1 WHERE id = $2",
146
item.quantity, item.product_id
147
)
148
except asyncpg.CheckViolationError:
149
# Insufficient stock - this item will be skipped
150
# but the order and other items will still be processed
151
print(f"Insufficient stock for product {item.product_id}")
152
continue
153
```
154
155
### Transaction Isolation Levels
156
157
PostgreSQL supports four standard transaction isolation levels with different consistency guarantees.
158
159
#### Read Uncommitted
160
161
```python
162
async with conn.transaction(isolation='read_uncommitted'):
163
# Lowest isolation - can read uncommitted changes
164
# Rarely used in practice
165
result = await conn.fetch("SELECT * FROM volatile_data")
166
```
167
168
#### Read Committed (Default)
169
170
```python
171
async with conn.transaction(isolation='read_committed'):
172
# Default level - sees committed changes from other transactions
173
# Good balance of consistency and performance
174
await conn.execute("UPDATE accounts SET balance = balance + $1 WHERE id = $2", amount, account_id)
175
```
176
177
#### Repeatable Read
178
179
```python
180
async with conn.transaction(isolation='repeatable_read'):
181
# Consistent snapshot of data throughout transaction
182
# Good for analytical queries and reports
183
initial_count = await conn.fetchval("SELECT COUNT(*) FROM orders")
184
185
# Do some work...
186
time.sleep(1)
187
188
# Same count guaranteed even if other transactions added orders
189
final_count = await conn.fetchval("SELECT COUNT(*) FROM orders")
190
assert initial_count == final_count
191
```
192
193
#### Serializable
194
195
```python
196
async with conn.transaction(isolation='serializable'):
197
# Strongest isolation - equivalent to serial execution
198
# May fail with serialization errors that require retry
199
try:
200
balance = await conn.fetchval("SELECT balance FROM accounts WHERE id = $1", from_account)
201
if balance >= amount:
202
await conn.execute("UPDATE accounts SET balance = balance - $1 WHERE id = $2", amount, from_account)
203
await conn.execute("UPDATE accounts SET balance = balance + $1 WHERE id = $2", amount, to_account)
204
except asyncpg.SerializationError:
205
# Retry the transaction
206
raise TransactionRetryError()
207
```
208
209
### Error Handling and Retry Logic
210
211
Handle transaction-specific errors with appropriate retry strategies.
212
213
```python
214
import asyncio
215
import random
216
217
async def retry_transaction(func, max_retries=3):
218
"""Retry transaction with exponential backoff for serialization errors."""
219
for attempt in range(max_retries):
220
try:
221
return await func()
222
except asyncpg.SerializationError:
223
if attempt == max_retries - 1:
224
raise
225
# Exponential backoff with jitter
226
delay = 2 ** attempt + random.uniform(0, 1)
227
await asyncio.sleep(delay)
228
229
async def transfer_money(from_account, to_account, amount):
230
async with conn.transaction(isolation='serializable'):
231
# Check balance
232
balance = await conn.fetchval("SELECT balance FROM accounts WHERE id = $1", from_account)
233
if balance < amount:
234
raise ValueError("Insufficient funds")
235
236
# Transfer money
237
await conn.execute("UPDATE accounts SET balance = balance - $1 WHERE id = $2", amount, from_account)
238
await conn.execute("UPDATE accounts SET balance = balance + $1 WHERE id = $2", amount, to_account)
239
240
# Log transaction
241
await conn.execute(
242
"INSERT INTO transfers(from_account, to_account, amount, timestamp) VALUES($1, $2, $3, $4)",
243
from_account, to_account, amount, datetime.now()
244
)
245
246
# Use with retry
247
try:
248
await retry_transaction(lambda: transfer_money(1, 2, 100))
249
print("Transfer completed successfully")
250
except asyncpg.SerializationError:
251
print("Transfer failed after retries due to conflicts")
252
except ValueError as e:
253
print(f"Transfer failed: {e}")
254
```
255
256
### Transaction Status
257
258
Check transaction state and handle various transaction conditions.
259
260
```python
261
# Check if in transaction
262
if conn.is_in_transaction():
263
print("Already in transaction")
264
else:
265
async with conn.transaction():
266
# Transaction operations
267
pass
268
269
# Handle nested transaction attempts
270
try:
271
async with conn.transaction():
272
# Some operations
273
async with conn.transaction(): # This creates a savepoint
274
# Nested operations
275
pass
276
except asyncpg.InterfaceError as e:
277
if "already in transaction" in str(e):
278
print("Cannot start transaction - already in one")
279
```
280
281
### Long-Running Transactions
282
283
Best practices for managing long-running transactions and avoiding locks.
284
285
```python
286
# Break long operations into smaller transactions
287
async def process_large_dataset(records, batch_size=1000):
288
"""Process large dataset in small transactions to avoid long locks."""
289
290
for i in range(0, len(records), batch_size):
291
batch = records[i:i + batch_size]
292
293
async with conn.transaction():
294
for record in batch:
295
await conn.execute(
296
"INSERT INTO processed_data(data) VALUES($1)",
297
json.dumps(record)
298
)
299
300
# Brief pause between batches to allow other transactions
301
await asyncio.sleep(0.1)
302
303
# Use advisory locks for coordination
304
async with conn.transaction():
305
# Acquire advisory lock
306
acquired = await conn.fetchval("SELECT pg_try_advisory_lock($1)", lock_id)
307
308
if acquired:
309
try:
310
# Do exclusive work
311
await conn.execute("UPDATE global_counter SET value = value + 1")
312
finally:
313
# Release advisory lock
314
await conn.fetchval("SELECT pg_advisory_unlock($1)", lock_id)
315
else:
316
print("Could not acquire lock - another process is working")
317
```
318
319
## Types
320
321
```python { .api }
322
class Transaction:
323
"""Database transaction context manager."""
324
325
async def __aenter__(self) -> 'Transaction':
326
"""Enter transaction context."""
327
328
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
329
"""Exit transaction context with commit/rollback."""
330
```