or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

connection-management.md connection-pooling.md copy-operations.md cursor-operations.md exception-handling.md index.md listeners-notifications.md prepared-statements.md query-execution.md transaction-management.md type-system.md

docs/transaction-management.md

0

# Transaction Management

1

2

Comprehensive transaction support including transaction contexts, savepoints, isolation levels, read-only transactions, and nested transaction management with full PostgreSQL transaction semantics.

3

4

## Capabilities

5

6

### Transaction Context

7

8

Create and manage database transactions with automatic rollback on exceptions and explicit commit control.

9

10

```python { .api }

11

def transaction(

12

self,

13

*,

14

isolation: str = None,

15

readonly: bool = False,

16

deferrable: bool = False

17

) -> Transaction:

18

"""

19

Create a Transaction context manager.

20

21

Parameters:

22

isolation: Transaction isolation level ('read_uncommitted', 'read_committed', 'repeatable_read', 'serializable')

23

readonly: Make transaction read-only

24

deferrable: Make the transaction deferrable (only has an effect when the transaction is both serializable and read-only)

25

26

Returns:

27

Transaction context manager

28

"""

29

30

def is_in_transaction(self) -> bool:

31

"""

32

Return True if connection is currently inside a transaction.

33

34

Returns:

35

Boolean indicating transaction state

36

"""

37

```

38

39

#### Example Usage

40

41

```python

42

# Basic transaction

43

async with conn.transaction():

44

await conn.execute("INSERT INTO users(name) VALUES($1)", "Alice")

45

await conn.execute("INSERT INTO orders(user_id, amount) VALUES($1, $2)", user_id, 100)

46

# Automatic commit on success, rollback on exception

47

48

# Transaction with isolation level

49

async with conn.transaction(isolation='serializable'):

50

# Strict consistency for critical operations

51

balance = await conn.fetchval("SELECT balance FROM accounts WHERE id = $1", account_id)

52

if balance >= amount:

53

await conn.execute("UPDATE accounts SET balance = balance - $1 WHERE id = $2", amount, account_id)

54

await conn.execute("INSERT INTO transactions(account_id, amount) VALUES($1, $2)", account_id, -amount)

55

56

# Read-only transaction for reporting

57

async with conn.transaction(readonly=True):

58

# Multiple consistent reads

59

user_count = await conn.fetchval("SELECT COUNT(*) FROM users")

60

order_count = await conn.fetchval("SELECT COUNT(*) FROM orders")

61

revenue = await conn.fetchval("SELECT SUM(amount) FROM orders WHERE status = 'completed'")

62

```

63

64

### Transaction Control

65

66

Explicit transaction control with manual commit and rollback operations.

67

68

```python { .api }

69

class Transaction:

70

"""Database transaction context manager."""

71

72

async def start(self) -> None:

73

"""Start the transaction explicitly."""

74

75

async def commit(self) -> None:

76

"""Commit the transaction."""

77

78

async def rollback(self) -> None:

79

"""Rollback the transaction."""

80

```

81

82

#### Example Usage

83

84

```python

85

# Manual transaction control

86

tx = conn.transaction()

87

await tx.start()

88

89

try:

90

await conn.execute("INSERT INTO users(name) VALUES($1)", "Bob")

91

user_id = await conn.fetchval("SELECT lastval()")

92

93

await conn.execute("INSERT INTO profiles(user_id, email) VALUES($1, $2)", user_id, "bob@example.com")

94

95

await tx.commit()

96

print("Transaction committed successfully")

97

98

except Exception as e:

99

await tx.rollback()

100

print(f"Transaction rolled back: {e}")

101

```

102

103

### Savepoints

104

105

Create nested transactions using PostgreSQL savepoints for complex transaction logic.

106

107

```python { .api }

108

async def savepoint(self, name: str = None) -> Transaction:

109

"""

110

Create a savepoint within the current transaction.

111

112

Parameters:

113

name: Optional savepoint name

114

115

Returns:

116

Transaction context manager for the savepoint

117

"""

118

119

async def rollback_to(self, savepoint_name: str) -> None:

120

"""

121

Rollback to a specific savepoint.

122

123

Parameters:

124

savepoint_name: Name of the savepoint to rollback to

125

"""

126

```

127

128

#### Example Usage

129

130

```python

131

async with conn.transaction():

132

# Insert primary record

133

await conn.execute("INSERT INTO orders(customer_id, total) VALUES($1, $2)", customer_id, total)

134

order_id = await conn.fetchval("SELECT lastval()")

135

136

# Try to process each item with individual error handling

137

for item in order_items:

138

async with conn.transaction(): # Nested savepoint

139

try:

140

await conn.execute(

141

"INSERT INTO order_items(order_id, product_id, quantity, price) VALUES($1, $2, $3, $4)",

142

order_id, item.product_id, item.quantity, item.price

143

)

144

await conn.execute(

145

"UPDATE products SET stock = stock - $1 WHERE id = $2",

146

item.quantity, item.product_id

147

)

148

except asyncpg.CheckViolationError:

149

# Insufficient stock - this item will be skipped

150

# but the order and other items will still be processed

151

print(f"Insufficient stock for product {item.product_id}")

152

continue

153

```

154

155

### Transaction Isolation Levels

156

157

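PostgreSQL accepts all four standard SQL transaction isolation levels, but internally implements only three distinct ones: Read Uncommitted behaves exactly like Read Committed, so dirty reads are never possible.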

158

159

#### Read Uncommitted

160

161

```python

162

async with conn.transaction(isolation='read_uncommitted'):

163

# Accepted for SQL compatibility, but PostgreSQL treats it as Read Committed - uncommitted changes are never visible

164

# Rarely used in practice

165

result = await conn.fetch("SELECT * FROM volatile_data")

166

```

167

168

#### Read Committed (Default)

169

170

```python

171

async with conn.transaction(isolation='read_committed'):

172

# Default level - sees committed changes from other transactions

173

# Good balance of consistency and performance

174

await conn.execute("UPDATE accounts SET balance = balance + $1 WHERE id = $2", amount, account_id)

175

```

176

177

#### Repeatable Read

178

179

```python

180

async with conn.transaction(isolation='repeatable_read'):

181

# Consistent snapshot of data throughout transaction

182

# Good for analytical queries and reports

183

initial_count = await conn.fetchval("SELECT COUNT(*) FROM orders")

184

185

# Do some work...

186

time.sleep(1)

187

188

# Same count guaranteed even if other transactions added orders

189

final_count = await conn.fetchval("SELECT COUNT(*) FROM orders")

190

assert initial_count == final_count

191

```

192

193

#### Serializable

194

195

```python

196

async with conn.transaction(isolation='serializable'):

197

# Strongest isolation - equivalent to serial execution

198

# May fail with serialization errors that require retry

199

try:

200

balance = await conn.fetchval("SELECT balance FROM accounts WHERE id = $1", from_account)

201

if balance >= amount:

202

await conn.execute("UPDATE accounts SET balance = balance - $1 WHERE id = $2", amount, from_account)

203

await conn.execute("UPDATE accounts SET balance = balance + $1 WHERE id = $2", amount, to_account)

204

except asyncpg.SerializationError:

205

# Retry the transaction

206

raise TransactionRetryError()

207

```

208

209

### Error Handling and Retry Logic

210

211

Handle transaction-specific errors with appropriate retry strategies.

212

213

```python

214

import asyncio

215

import random

216

217

async def retry_transaction(func, max_retries=3):

218

"""Retry transaction with exponential backoff for serialization errors."""

219

for attempt in range(max_retries):

220

try:

221

return await func()

222

except asyncpg.SerializationError:

223

if attempt == max_retries - 1:

224

raise

225

# Exponential backoff with jitter

226

delay = 2 ** attempt + random.uniform(0, 1)

227

await asyncio.sleep(delay)

228

229

async def transfer_money(from_account, to_account, amount):

230

async with conn.transaction(isolation='serializable'):

231

# Check balance

232

balance = await conn.fetchval("SELECT balance FROM accounts WHERE id = $1", from_account)

233

if balance < amount:

234

raise ValueError("Insufficient funds")

235

236

# Transfer money

237

await conn.execute("UPDATE accounts SET balance = balance - $1 WHERE id = $2", amount, from_account)

238

await conn.execute("UPDATE accounts SET balance = balance + $1 WHERE id = $2", amount, to_account)

239

240

# Log transaction

241

await conn.execute(

242

"INSERT INTO transfers(from_account, to_account, amount, timestamp) VALUES($1, $2, $3, $4)",

243

from_account, to_account, amount, datetime.now()

244

)

245

246

# Use with retry

247

try:

248

await retry_transaction(lambda: transfer_money(1, 2, 100))

249

print("Transfer completed successfully")

250

except asyncpg.SerializationError:

251

print("Transfer failed after retries due to conflicts")

252

except ValueError as e:

253

print(f"Transfer failed: {e}")

254

```

255

256

### Transaction Status

257

258

Check transaction state and handle various transaction conditions.

259

260

```python

261

# Check if in transaction

262

if conn.is_in_transaction():

263

print("Already in transaction")

264

else:

265

async with conn.transaction():

266

# Transaction operations

267

pass

268

269

# Handle nested transaction attempts

270

try:

271

async with conn.transaction():

272

# Some operations

273

async with conn.transaction(): # This creates a savepoint

274

# Nested operations

275

pass

276

except asyncpg.InterfaceError as e:

277

if "already in transaction" in str(e):

278

print("Cannot start transaction - already in one")

279

```

280

281

### Long-Running Transactions

282

283

Best practices for managing long-running transactions and avoiding locks.

284

285

```python

286

# Break long operations into smaller transactions

287

async def process_large_dataset(records, batch_size=1000):

288

"""Process large dataset in small transactions to avoid long locks."""

289

290

for i in range(0, len(records), batch_size):

291

batch = records[i:i + batch_size]

292

293

async with conn.transaction():

294

for record in batch:

295

await conn.execute(

296

"INSERT INTO processed_data(data) VALUES($1)",

297

json.dumps(record)

298

)

299

300

# Brief pause between batches to allow other transactions

301

await asyncio.sleep(0.1)

302

303

# Use advisory locks for coordination

304

async with conn.transaction():

305

# Acquire advisory lock

306

acquired = await conn.fetchval("SELECT pg_try_advisory_lock($1)", lock_id)

307

308

if acquired:

309

try:

310

# Do exclusive work

311

await conn.execute("UPDATE global_counter SET value = value + 1")

312

finally:

313

# Release advisory lock

314

await conn.fetchval("SELECT pg_advisory_unlock($1)", lock_id)

315

else:

316

print("Could not acquire lock - another process is working")

317

```

318

319

## Types

320

321

```python { .api }

322

class Transaction:

323

"""Database transaction context manager."""

324

325

async def __aenter__(self) -> 'Transaction':

326

"""Enter transaction context."""

327

328

async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:

329

"""Exit transaction context with commit/rollback."""

330

```