# Tasks and Transactions

Tasks and transactions let multiple queries share one connection. A task runs its callback against a single shared connection; a transaction is a task that also applies automatic `BEGIN`/`COMMIT`/`ROLLBACK` semantics and supports nested transactions via savepoints.

## Capabilities

### Task Management

Tasks provide a shared connection context, so multiple queries execute on one connection instead of acquiring a connection for each query.

```javascript { .api }
/**
 * Execute callback within a shared connection (task)
 * @param cb - Callback function receiving task object
 * @returns Promise resolving to callback result
 */
db.task(cb: (t: ITask) => Promise<any> | any): Promise<any>

/**
 * Execute tagged task with shared connection
 * @param tag - Task identifier for logging/monitoring
 * @param cb - Callback function receiving task object
 * @returns Promise resolving to callback result
 */
db.task(tag: string | number, cb: (t: ITask) => Promise<any> | any): Promise<any>

/**
 * Execute task with options and shared connection
 * @param options - Task configuration options
 * @param cb - Callback function receiving task object
 * @returns Promise resolving to callback result
 */
db.task(options: ITaskOptions, cb: (t: ITask) => Promise<any> | any): Promise<any>

interface ITaskOptions {
  tag?: any // Task identifier
}
```

**Usage Examples:**

```javascript
// Basic task for multiple related queries
const result = await db.task(async t => {
  const users = await t.any('SELECT * FROM users');
  const profiles = await t.any('SELECT * FROM user_profiles');
  const settings = await t.any('SELECT * FROM user_settings');

  return { users, profiles, settings };
});

// Tagged task for monitoring
const summary = await db.task('user-summary', async t => {
  const activeUsers = await t.any('SELECT * FROM users WHERE active = true');
  const totalOrders = await t.one('SELECT COUNT(*) as count FROM orders', [], r => r.count);

  return { activeUsers: activeUsers.length, totalOrders };
});

// Task with options
const data = await db.task({ tag: 'data-export' }, async t => {
  const users = await t.any('SELECT * FROM users WHERE created_at > $1', [lastExport]);
  const orders = await t.any('SELECT * FROM orders WHERE created_at > $1', [lastExport]);

  return { users, orders };
});
```

### Conditional Tasks

Conditional tasks create a new task only when a condition is met; otherwise the callback reuses the current connection context. The callback runs in either case. By default a new task is created only when the call is made outside any task or transaction, and on the root database object a task is always created because there is no context to reuse.

```javascript { .api }
/**
 * Execute conditional task - creates a new task only if the condition is met,
 * otherwise reuses the current task/transaction context
 * @param cb - Callback function receiving task object
 * @returns Promise resolving to callback result
 */
db.taskIf(cb: (t: ITask) => Promise<any> | any): Promise<any>

/**
 * Execute conditional tagged task
 * @param tag - Task identifier
 * @param cb - Callback function receiving task object
 * @returns Promise resolving to callback result
 */
db.taskIf(tag: string | number, cb: (t: ITask) => Promise<any> | any): Promise<any>

/**
 * Execute conditional task with options
 * @param options - Task configuration with condition
 * @param cb - Callback function receiving task object
 * @returns Promise resolving to callback result
 */
db.taskIf(options: ITaskIfOptions, cb: (t: ITask) => Promise<any> | any): Promise<any>

interface ITaskIfOptions {
  cnd?: boolean | ((t: ITask) => boolean) // Condition for creating a new task
  tag?: any // Task identifier
}
```

**Usage Examples:**

```javascript
// Default condition: a new task is created here because `db` is the root object;
// the same call made on a task or transaction object reuses that context
const result = await db.taskIf(async t => {
  return await t.any('SELECT * FROM expensive_query');
});

// Boolean condition: the callback always runs; `cnd` only decides whether a new
// task is created (on the root `db` object a task is created even when it is false)
const backupData = await db.taskIf(
  { cnd: isBackupTime, tag: 'backup' },
  async t => {
    const users = await t.any('SELECT * FROM users');
    const orders = await t.any('SELECT * FROM orders');
    return { users, orders };
  }
);

// Function condition: receives the object taskIf was called on,
// whose `ctx` is undefined on the root database object
const report = await db.taskIf(
  {
    cnd: t => !t.ctx || t.ctx.level === 0, // New task at the root or directly inside a top-level task
    tag: 'top-level-report'
  },
  async t => {
    return await t.one('SELECT generate_report() as report', [], r => r.report);
  }
);
```
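
The choice a conditional task makes between creating a task and reusing the current context can be modelled as a small pure function. This is only a sketch, not the library's code: `resolveTaskIf` and its return strings are hypothetical, and the rule it encodes assumes the default condition is "not inside a task or transaction" and that a task is always created when there is no current context.

```javascript
// Hypothetical model of the taskIf decision; not pg-promise source code.
// `current` stands for the object taskIf is called on: the root database
// object has no `ctx`, while a task or transaction object has one.
function resolveTaskIf(current, options = {}) {
  let cnd;
  if ('cnd' in options) {
    cnd = typeof options.cnd === 'function'
      ? Boolean(options.cnd(current))
      : Boolean(options.cnd);
  } else {
    cnd = !current.ctx; // assumed default: not inside a task or transaction
  }
  // Without a current context there is nothing to reuse, so a task is needed
  return cnd || !current.ctx ? 'new-task' : 'reuse-context';
}

const root = {};                          // stands in for `db`
const insideTask = { ctx: { level: 0 } }; // stands in for a task object

console.log(resolveTaskIf(root));                      // new-task
console.log(resolveTaskIf(insideTask));                // reuse-context
console.log(resolveTaskIf(insideTask, { cnd: true })); // new-task
console.log(resolveTaskIf(root, { cnd: false }));      // new-task
```

In every branch the callback would still run; only the context it runs in changes.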

### Transaction Management

Transactions wrap the callback in `BEGIN`/`COMMIT`: the transaction commits when the callback resolves, and rolls back when the callback throws or returns a rejected promise.

```javascript { .api }
/**
 * Execute callback within a transaction
 * @param cb - Callback function receiving transaction object
 * @returns Promise resolving to callback result
 */
db.tx(cb: (t: ITask) => Promise<any> | any): Promise<any>

/**
 * Execute tagged transaction
 * @param tag - Transaction identifier for logging/monitoring
 * @param cb - Callback function receiving transaction object
 * @returns Promise resolving to callback result
 */
db.tx(tag: string | number, cb: (t: ITask) => Promise<any> | any): Promise<any>

/**
 * Execute transaction with options
 * @param options - Transaction configuration options
 * @param cb - Callback function receiving transaction object
 * @returns Promise resolving to callback result
 */
db.tx(options: ITxOptions, cb: (t: ITask) => Promise<any> | any): Promise<any>

interface ITxOptions {
  tag?: any // Transaction identifier
  mode?: TransactionMode | null // Transaction isolation mode
}
```

**Usage Examples:**

```javascript
// Basic transaction
const newUserId = await db.tx(async t => {
  const userId = await t.one('INSERT INTO users(name, email) VALUES($1, $2) RETURNING id',
    ['John Doe', 'john@example.com'], r => r.id);

  await t.none('INSERT INTO user_profiles(user_id, bio) VALUES($1, $2)',
    [userId, 'Software Engineer']);

  await t.none('INSERT INTO user_settings(user_id, theme) VALUES($1, $2)',
    [userId, 'dark']);

  return userId;
});

// Tagged transaction for monitoring
await db.tx('order-processing', async t => {
  await t.none('UPDATE inventory SET quantity = quantity - $1 WHERE product_id = $2',
    [quantity, productId]);

  const orderId = await t.one('INSERT INTO orders(user_id, product_id, quantity) VALUES($1, $2, $3) RETURNING id',
    [userId, productId, quantity], r => r.id);

  await t.none('INSERT INTO order_audit(order_id, action) VALUES($1, $2)',
    [orderId, 'created']);
});

// Transaction with isolation mode
await db.tx({
  mode: new pgp.txMode.TransactionMode({
    tiLevel: pgp.txMode.isolationLevel.serializable
  })
}, async t => {
  // Critical section requiring serializable isolation
  const balance = await t.one('SELECT balance FROM accounts WHERE id = $1', [accountId], r => r.balance);

  if (balance >= amount) {
    await t.none('UPDATE accounts SET balance = balance - $1 WHERE id = $2', [amount, accountId]);
    await t.none('INSERT INTO transactions(account_id, amount, type) VALUES($1, $2, $3)',
      [accountId, -amount, 'withdrawal']);
  } else {
    throw new Error('Insufficient funds');
  }
});
```

### Conditional Transactions

Conditional transactions start a transaction only when a condition is met; otherwise the callback runs as a regular task. The callback runs in either case. By default a transaction is started only when the call is not already inside one.

```javascript { .api }
/**
 * Execute conditional transaction - starts a transaction only if the condition
 * is met, otherwise runs the callback as a task
 * @param cb - Callback function receiving transaction object
 * @returns Promise resolving to callback result
 */
db.txIf(cb: (t: ITask) => Promise<any> | any): Promise<any>

/**
 * Execute conditional tagged transaction
 * @param tag - Transaction identifier
 * @param cb - Callback function receiving transaction object
 * @returns Promise resolving to callback result
 */
db.txIf(tag: string | number, cb: (t: ITask) => Promise<any> | any): Promise<any>

/**
 * Execute conditional transaction with options
 * @param options - Transaction configuration with condition
 * @param cb - Callback function receiving transaction object
 * @returns Promise resolving to callback result
 */
db.txIf(options: ITxIfOptions, cb: (t: ITask) => Promise<any> | any): Promise<any>

interface ITxIfOptions {
  cnd?: boolean | ((t: ITask) => boolean) // Condition for starting a transaction
  tag?: any // Transaction identifier
  mode?: TransactionMode | null // Transaction isolation mode
  reusable?: boolean | ((t: ITask) => boolean) // When cnd is false, reuse the current task context instead of creating a new task
}
```

**Usage Examples:**

```javascript
// Boolean condition: a transaction is started when the flag is true;
// when it is false the callback still runs, as a task without a transaction
const result = await db.txIf(
  { cnd: shouldProcessPayment, tag: 'payment' },
  async t => {
    await t.none('UPDATE orders SET status = $1 WHERE id = $2', ['paid', orderId]);
    await t.none('INSERT INTO payments(order_id, amount) VALUES($1, $2)', [orderId, amount]);
  }
);

// Function condition: `ctx` is undefined on the root database object, so guard it
await db.txIf(
  {
    cnd: t => !t.ctx || t.ctx.level < 3, // Start a transaction only if not too deeply nested
    tag: 'nested-update'
  },
  async t => {
    await t.none('UPDATE stats SET last_calculated = NOW()');
  }
);
```
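
The outcome of a conditional transaction depends on `cnd`, on `reusable`, and on whether a task context already exists. The function below sketches that decision under stated assumptions: `resolveTxIf` and its return strings are hypothetical, the default condition is assumed to be "not already inside a transaction", and `reusable` is assumed to apply only when there is a current context.

```javascript
// Hypothetical model of the txIf decision; not pg-promise source code.
function resolveTxIf(current, options = {}) {
  // Options may be plain booleans or functions of the current object
  const evaluate = value =>
    typeof value === 'function' ? Boolean(value(current)) : Boolean(value);

  const cnd = 'cnd' in options
    ? evaluate(options.cnd)
    : !current.ctx || !current.ctx.inTransaction; // assumed default condition

  if (cnd) return 'transaction';
  // `reusable` only matters when there is a current context to reuse
  return current.ctx && evaluate(options.reusable) ? 'reuse-context' : 'new-task';
}

const root = {};                                  // stands in for `db`
const insideTask = { ctx: { inTransaction: false } };
const insideTx = { ctx: { inTransaction: true } };

console.log(resolveTxIf(root));                           // transaction
console.log(resolveTxIf(insideTask));                     // transaction
console.log(resolveTxIf(insideTx));                       // new-task
console.log(resolveTxIf(insideTx, { reusable: true }));   // reuse-context
console.log(resolveTxIf(root, { cnd: false, reusable: true })); // new-task
```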

### Nested Transactions

Nested transactions are supported through savepoints. Calling `tx` on a transaction object creates a savepoint instead of a new transaction, so a failure in the inner transaction rolls back only to that savepoint. The outer transaction can continue if it catches the error.

**Usage Examples:**

```javascript
// Nested transactions with savepoint handling
await db.tx('outer-transaction', async t1 => {
  await t1.none('INSERT INTO audit_log(action) VALUES($1)', ['outer-start']);

  try {
    await t1.tx('inner-transaction', async t2 => {
      await t2.none('INSERT INTO users(name) VALUES($1)', ['Test User']);
      await t2.none('INSERT INTO invalid_table(data) VALUES($1)', ['test']); // Will fail
    });
  } catch (error) {
    // Inner transaction rolled back to savepoint
    console.log('Inner transaction failed, continuing with outer');
  }

  await t1.none('INSERT INTO audit_log(action) VALUES($1)', ['outer-complete']);
  // Outer transaction commits
});
```
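
To make the savepoint mechanics concrete, the toy model below records the statements that a savepoint-based nested transaction would issue for the scenario above. It is a synchronous illustration with no database involved: `runTx` is a hypothetical helper, and the savepoint names (`sp_1`, ...) are made up rather than the names the library generates.

```javascript
// Toy model: logs the statements a savepoint-based nested transaction
// would issue. Not pg-promise code; names are illustrative only.
function runTx(log, level, body) {
  const name = `sp_${level}`;
  log.push(level === 0 ? 'BEGIN' : `SAVEPOINT ${name}`);
  try {
    // The body receives a function that opens a nested transaction
    const result = body(nested => runTx(log, level + 1, nested));
    log.push(level === 0 ? 'COMMIT' : `RELEASE SAVEPOINT ${name}`);
    return result;
  } catch (err) {
    log.push(level === 0 ? 'ROLLBACK' : `ROLLBACK TO SAVEPOINT ${name}`);
    throw err; // the caller decides whether to recover
  }
}

const statements = [];
runTx(statements, 0, tx => {
  statements.push('INSERT audit_log (outer-start)');
  try {
    tx(() => {
      statements.push('INSERT users');
      throw new Error('relation "invalid_table" does not exist');
    });
  } catch (err) {
    // Swallowing the error lets the outer transaction continue
  }
  statements.push('INSERT audit_log (outer-complete)');
});

console.log(statements.join('\n'));
```

The log ends with `COMMIT` because the outer body caught the inner failure; rethrowing it would end the log with `ROLLBACK` instead.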

### Task Context

All tasks and transactions expose read-only context information through the `ctx` property.

```javascript { .api }
interface ITaskContext {
  readonly context: any // User-defined context
  readonly parent: ITaskContext | null // Parent task context
  readonly connected: boolean // Whether the task acquired the connection itself (false when inherited)
  readonly inTransaction: boolean // Whether in transaction
  readonly level: number // Nesting level (0 = top-level)
  readonly useCount: number // Connection use count
  readonly isTX: boolean // Whether this is a transaction
  readonly start: Date // Task start time
  readonly tag: any // Task identifier
  readonly dc: any // Database context
  readonly serverVersion: string // PostgreSQL server version

  // Set at task completion
  readonly finish?: Date // Task completion time
  readonly duration?: number // Task duration in milliseconds
  readonly success?: boolean // Whether task succeeded
  readonly result?: any // Task result

  // Transaction-specific
  readonly txLevel?: number // Transaction nesting level
}
```

**Usage Examples:**

```javascript
// Using task context
await db.task('data-processing', async t => {
  console.log('Task level:', t.ctx.level);
  console.log('Task tag:', t.ctx.tag);
  console.log('In transaction:', t.ctx.inTransaction);
  console.log('Server version:', t.ctx.serverVersion);

  const users = await t.any('SELECT * FROM users');

  console.log('Task start time:', t.ctx.start);
  return users;
});

// Conditional logic based on context
await db.tx(async t => {
  const isNested = t.ctx.level > 0;
  const logPrefix = isNested ? 'NESTED: ' : 'TOP: ';

  console.log(logPrefix + 'Starting transaction at level', t.ctx.level);

  if (!isNested) {
    // Only do expensive operations at top level
    await t.none('REFRESH MATERIALIZED VIEW expensive_view');
  }
});
```
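
Because `finish`, `duration`, `success`, and `result` are set only at completion, code that reads the context has to handle both the running and the finished state. The helper below is a minimal sketch of that; `describeTask` is a hypothetical function, and it operates on plain objects shaped like `ITaskContext` rather than on a live task.

```javascript
// Hypothetical helper: builds a one-line summary from an object shaped like
// ITaskContext. Completion fields are optional because they are set only
// once the task or transaction has finished.
function describeTask(ctx) {
  const kind = ctx.isTX ? 'tx' : 'task';
  const tag = ctx.tag === undefined ? '' : `(${ctx.tag})`;
  const state = ctx.finish === undefined
    ? 'running'
    : `${ctx.success ? 'ok' : 'failed'} in ${ctx.duration}ms`;
  return `${kind}${tag} level=${ctx.level}: ${state}`;
}

// Sample contexts with made-up values
const running = { isTX: false, tag: 'data-processing', level: 0, start: new Date() };
const finished = {
  isTX: true, tag: 'order', level: 1,
  start: new Date(), finish: new Date(), duration: 12, success: true
};

console.log(describeTask(running));  // task(data-processing) level=0: running
console.log(describeTask(finished)); // tx(order) level=1: ok in 12ms
```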

### Batch Operations

Tasks and transactions expose the `batch`, `page`, and `sequence` methods from the spex library. Queries inside one task still execute on a single connection, so these methods organize how values are resolved and how results and errors are collected rather than running queries in parallel on the server.

```javascript { .api }
/**
 * Settle every value in an array (plain values, promises, or functions returning them)
 * @param values - Array of mixed values to resolve
 * @param options - Batch processing options
 * @returns Promise resolving to array of results; rejects if any value fails, after all values have settled
 */
t.batch(values: any[], options?: IBatchOptions): Promise<any[]>

/**
 * Request and resolve pages of data until the source returns undefined or the limit is reached
 * @param source - Function returning the next page (array of mixed values)
 * @param options - Page processing options
 * @returns Promise resolving to processing statistics
 */
t.page(source: (index: number, data: any, delay: number) => any, options?: IPageOptions): Promise<any>

/**
 * Resolve a sequence of values one at a time until the source returns undefined or the limit is reached
 * @param source - Function returning the next value
 * @param options - Sequence processing options
 * @returns Promise resolving to statistics, or to the resolved values when tracking is enabled
 */
t.sequence(source: (index: number, data: any, delay: number) => any, options?: ISequenceOptions): Promise<any>
```

**Usage Examples:**

```javascript
// Batch insert multiple users
await db.tx(async t => {
  const userQueries = userData.map(user => {
    return () => t.none('INSERT INTO users(name, email) VALUES($1, $2)', [user.name, user.email]);
  });

  // batch settles every query and rejects if any of them failed
  await t.batch(userQueries);
});

// Process large dataset in pages
await db.task(async t => {
  await t.page(
    // index is the zero-based page number; resolving with undefined ends the paging
    async index => {
      const rows = await t.any('SELECT * FROM large_table LIMIT 1000 OFFSET $1', [index * 1000]);
      return rows.length > 0 ? rows : undefined;
    },
    {
      limit: 10, // Request at most 10 pages
      dest: (index, data) => {
        console.log(`Processing page ${index}, ${data.length} records`);
      }
    }
  );
});
```
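
The paging contract (the source is asked for page after page, and the run ends when it yields `undefined` or a page limit is hit) can be sketched as a plain loop. This is an approximation for illustration, not the spex implementation: `pageThrough` is a hypothetical function, and the in-memory `rows` array stands in for a table.

```javascript
// Hypothetical stand-in for a paging loop; not the spex implementation.
async function pageThrough(source, { dest, limit = 0 } = {}) {
  let pages = 0;
  let total = 0;
  let previous; // data resolved for the previous page
  while (!limit || pages < limit) {
    const page = await source(pages, previous);
    if (page === undefined) break;      // the source signals the end
    previous = await Promise.all(page); // resolve every value in the page
    if (dest) await dest(pages, previous);
    total += previous.length;
    pages++;
  }
  return { pages, total };
}

// Usage with an in-memory "table" of 5 rows, read 2 rows per page
const rows = [1, 2, 3, 4, 5];
const seen = [];
const done = pageThrough(
  async index => {
    const chunk = rows.slice(index * 2, index * 2 + 2);
    return chunk.length > 0 ? chunk : undefined;
  },
  { dest: (index, data) => { seen.push(data); } }
);

done.then(stats => console.log(stats)); // { pages: 3, total: 5 }
```

Driving the offset from the `index` argument, rather than from a counter updated in `dest`, keeps the page number and the query in step.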

## Types

```javascript { .api }
// Task interface extends all database methods
interface ITask {
  // All database query methods (none, one, many, etc.)
  // Plus context and batch operations
  readonly ctx: ITaskContext

  // Batch operations from spex
  batch(values: any[], options?: IBatchOptions): Promise<any[]>
  page(source: (index: number, data: any, delay: number) => any, options?: IPageOptions): Promise<any>
  sequence(source: (index: number, data: any, delay: number) => any, options?: ISequenceOptions): Promise<any>
}

// Transaction mode configuration
class TransactionMode {
  constructor(options?: ITransactionModeOptions)
  begin(cap?: boolean): string
}

interface ITransactionModeOptions {
  tiLevel?: isolationLevel // Transaction isolation level
  readOnly?: boolean // Read-only transaction
  deferrable?: boolean // Deferrable transaction
}

// Transaction isolation levels
enum isolationLevel {
  none = 0,
  serializable = 1,
  repeatableRead = 2,
  readCommitted = 3
}

// Batch operation options
interface IBatchOptions {
  cb?: (index: number, success: boolean, result: any, delay: number) => any // Notified as each value settles
}

interface IPageOptions {
  limit?: number // Maximum pages to request
  dest?: (index: number, data: any, delay: number) => any // Receives each resolved page
}

interface ISequenceOptions {
  track?: boolean // Resolve with the array of resolved values instead of statistics
  limit?: number // Maximum values to request
  dest?: (index: number, data: any, delay: number) => any // Receives each resolved value
}
```
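
As a rough illustration of what the transaction mode options express, the function below maps them to a PostgreSQL `BEGIN` statement. It is a hypothetical approximation: `buildBegin` is not part of the library, and the exact text returned by `TransactionMode.begin()` may differ in casing and detail.

```javascript
// Hypothetical approximation of how transaction-mode options could map to a
// BEGIN statement; the text produced by TransactionMode.begin() may differ.
const isolationLevel = { none: 0, serializable: 1, repeatableRead: 2, readCommitted: 3 };

function buildBegin({ tiLevel = isolationLevel.none, readOnly, deferrable } = {}) {
  const levels = {
    [isolationLevel.serializable]: 'SERIALIZABLE',
    [isolationLevel.repeatableRead]: 'REPEATABLE READ',
    [isolationLevel.readCommitted]: 'READ COMMITTED'
  };
  const parts = ['BEGIN'];
  if (levels[tiLevel]) parts.push(`ISOLATION LEVEL ${levels[tiLevel]}`);
  if (readOnly !== undefined) parts.push(readOnly ? 'READ ONLY' : 'READ WRITE');
  // In PostgreSQL, DEFERRABLE only has an effect for SERIALIZABLE READ ONLY transactions
  if (deferrable && readOnly && tiLevel === isolationLevel.serializable) {
    parts.push('DEFERRABLE');
  }
  return parts.join(' ');
}

console.log(buildBegin());
// BEGIN
console.log(buildBegin({ tiLevel: isolationLevel.serializable, readOnly: true, deferrable: true }));
// BEGIN ISOLATION LEVEL SERIALIZABLE READ ONLY DEFERRABLE
```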