# Tasks and Transactions

Tasks run multiple queries on one shared connection. Transactions build on tasks by adding automatic transaction handling, and support nested transactions via savepoints.

## Capabilities

### Task Management

Tasks provide a shared connection context for executing multiple queries efficiently.

```javascript { .api }
/**
 * Execute callback within a shared connection (task)
 * @param cb - Callback function receiving task object
 * @returns Promise resolving to callback result
 */
db.task(cb: (t: ITask) => Promise<any> | any): Promise<any>

/**
 * Execute tagged task with shared connection
 * @param tag - Task identifier for logging/monitoring
 * @param cb - Callback function receiving task object
 * @returns Promise resolving to callback result
 */
db.task(tag: string | number, cb: (t: ITask) => Promise<any> | any): Promise<any>

/**
 * Execute task with options and shared connection
 * @param options - Task configuration options
 * @param cb - Callback function receiving task object
 * @returns Promise resolving to callback result
 */
db.task(options: ITaskOptions, cb: (t: ITask) => Promise<any> | any): Promise<any>

interface ITaskOptions {
  tag?: any // Task identifier
}
```

**Usage Examples:**

```javascript
// Basic task for multiple related queries
const result = await db.task(async t => {
  const users = await t.any('SELECT * FROM users');
  const profiles = await t.any('SELECT * FROM user_profiles');
  const settings = await t.any('SELECT * FROM user_settings');

  return { users, profiles, settings };
});

// Tagged task for monitoring
const summary = await db.task('user-summary', async t => {
  const activeUsers = await t.any('SELECT * FROM users WHERE active = true');
  const totalOrders = await t.one('SELECT COUNT(*) as count FROM orders', [], r => r.count);

  return { activeUsers: activeUsers.length, totalOrders };
});

// Task with options
const data = await db.task({ tag: 'data-export' }, async t => {
  const users = await t.any('SELECT * FROM users WHERE created_at > $1', [lastExport]);
  const orders = await t.any('SELECT * FROM orders WHERE created_at > $1', [lastExport]);

  return { users, orders };
});
```

### Conditional Tasks

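`taskIf` decides whether to start a new task or reuse the caller's connection context. The callback runs in both cases: a new task is started when the condition is met or when the call is made on the root `db` object; otherwise the current task or transaction context is reused.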

```javascript { .api }
/**
 * Execute conditional task - starts a new task if the condition is met, otherwise reuses the current context
 * @param cb - Callback function receiving task object
 * @returns Promise resolving to callback result
 */
db.taskIf(cb: (t: ITask) => Promise<any> | any): Promise<any>

/**
 * Execute conditional tagged task
 * @param tag - Task identifier
 * @param cb - Callback function receiving task object
 * @returns Promise resolving to callback result
 */
db.taskIf(tag: string | number, cb: (t: ITask) => Promise<any> | any): Promise<any>

/**
 * Execute conditional task with options
 * @param options - Task configuration with condition
 * @param cb - Callback function receiving task object
 * @returns Promise resolving to callback result
 */
db.taskIf(options: ITaskIfOptions, cb: (t: ITask) => Promise<any> | any): Promise<any>

interface ITaskIfOptions {
  cnd?: boolean | ((t: ITask) => boolean) // true: start a new task; false: reuse the current context
  tag?: any // Task identifier
}
```

**Usage Examples:**

```javascript
// Called on the root db object: a new task is always started
const result = await db.taskIf(async t => {
  return await t.any('SELECT * FROM expensive_query');
});

// Boolean condition: the queries run either way;
// cnd only controls whether a new task is started or the current context is reused
const backupData = await db.taskIf(
  { cnd: forceNewTask, tag: 'backup' },
  async t => {
    const users = await t.any('SELECT * FROM users');
    const orders = await t.any('SELECT * FROM orders');
    return { users, orders };
  }
);

// Function condition
const report = await db.taskIf(
  {
    // New task only from the root or a top-level task;
    // the ctx guard covers calls where no task context exists yet
    cnd: t => !t.ctx || t.ctx.level === 0,
    tag: 'top-level-report'
  },
  async t => {
    return await t.one('SELECT generate_report() as report', [], r => r.report);
  }
);
```

### Transaction Management

`tx` wraps the callback in a transaction: it commits when the callback resolves and rolls back when the callback rejects or throws.

```javascript { .api }
/**
 * Execute callback within a transaction
 * @param cb - Callback function receiving transaction object
 * @returns Promise resolving to callback result
 */
db.tx(cb: (t: ITask) => Promise<any> | any): Promise<any>

/**
 * Execute tagged transaction
 * @param tag - Transaction identifier for logging/monitoring
 * @param cb - Callback function receiving transaction object
 * @returns Promise resolving to callback result
 */
db.tx(tag: string | number, cb: (t: ITask) => Promise<any> | any): Promise<any>

/**
 * Execute transaction with options
 * @param options - Transaction configuration options
 * @param cb - Callback function receiving transaction object
 * @returns Promise resolving to callback result
 */
db.tx(options: ITxOptions, cb: (t: ITask) => Promise<any> | any): Promise<any>

interface ITxOptions {
  tag?: any // Transaction identifier
  mode?: TransactionMode | null // Transaction mode (isolation level, read-only, deferrable)
}
```

**Usage Examples:**

```javascript
// Basic transaction
const newUserId = await db.tx(async t => {
  const userId = await t.one('INSERT INTO users(name, email) VALUES($1, $2) RETURNING id',
    ['John Doe', 'john@example.com'], r => r.id);

  await t.none('INSERT INTO user_profiles(user_id, bio) VALUES($1, $2)',
    [userId, 'Software Engineer']);

  await t.none('INSERT INTO user_settings(user_id, theme) VALUES($1, $2)',
    [userId, 'dark']);

  return userId;
});

// Tagged transaction for monitoring
await db.tx('order-processing', async t => {
  await t.none('UPDATE inventory SET quantity = quantity - $1 WHERE product_id = $2',
    [quantity, productId]);

  const orderId = await t.one('INSERT INTO orders(user_id, product_id, quantity) VALUES($1, $2, $3) RETURNING id',
    [userId, productId, quantity], r => r.id);

  await t.none('INSERT INTO order_audit(order_id, action) VALUES($1, $2)',
    [orderId, 'created']);
});

// Transaction with isolation mode
await db.tx({
  mode: new pgp.txMode.TransactionMode({
    tiLevel: pgp.txMode.isolationLevel.serializable
  })
}, async t => {
  // Critical section requiring serializable isolation
  const balance = await t.one('SELECT balance FROM accounts WHERE id = $1', [accountId], r => r.balance);

  if (balance >= amount) {
    await t.none('UPDATE accounts SET balance = balance - $1 WHERE id = $2', [amount, accountId]);
    await t.none('INSERT INTO transactions(account_id, amount, type) VALUES($1, $2, $3)',
      [accountId, -amount, 'withdrawal']);
  } else {
    // Throwing rejects the callback, which rolls the transaction back
    throw new Error('Insufficient funds');
  }
});
```
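
Serializable transactions can be rejected by PostgreSQL when they conflict with concurrent work, and the usual response is to run the whole transaction again. The sketch below shows one way to do that around `db.tx`. It assumes the rejection carries the PostgreSQL SQLSTATE in `error.code` (`40001` is `serialization_failure`); `txWithRetry` and `maxAttempts` are hypothetical names introduced here, not part of the library.

```javascript
// Hypothetical helper (not a library API): re-run a transaction when
// PostgreSQL reports a serialization failure (SQLSTATE 40001).
// Assumes the rejection reason exposes the SQLSTATE as `error.code`.
async function txWithRetry(db, options, cb, maxAttempts = 3) {
  for (let attempt = 1; ; attempt++) {
    try {
      // Each attempt is a fresh transaction; a failed one has been rolled back.
      return await db.tx(options, cb);
    } catch (error) {
      const retryable = Boolean(error) && error.code === '40001';
      if (!retryable || attempt >= maxAttempts) {
        throw error; // Application errors and exhausted retries propagate.
      }
    }
  }
}
```

The callback should be safe to run more than once, since every retry executes it again from the start.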

### Conditional Transactions

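`txIf` opens a transaction when the condition is met and runs the callback as a task otherwise. The callback runs in both cases. By default, the condition is met when the caller is not already inside a transaction.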

```javascript { .api }
/**
 * Execute conditional transaction - opens a transaction if the condition is met, otherwise runs as a task
 * @param cb - Callback function receiving transaction object
 * @returns Promise resolving to callback result
 */
db.txIf(cb: (t: ITask) => Promise<any> | any): Promise<any>

/**
 * Execute conditional tagged transaction
 * @param tag - Transaction identifier
 * @param cb - Callback function receiving transaction object
 * @returns Promise resolving to callback result
 */
db.txIf(tag: string | number, cb: (t: ITask) => Promise<any> | any): Promise<any>

/**
 * Execute conditional transaction with options
 * @param options - Transaction configuration with condition
 * @param cb - Callback function receiving transaction object
 * @returns Promise resolving to callback result
 */
db.txIf(options: ITxIfOptions, cb: (t: ITask) => Promise<any> | any): Promise<any>

interface ITxIfOptions {
  cnd?: boolean | ((t: ITask) => boolean) // true: open a transaction; false: run as a task
  tag?: any // Transaction identifier
  mode?: TransactionMode | null // Transaction mode (isolation level, read-only, deferrable)
  reusable?: boolean | ((t: ITask) => boolean) // When cnd is false: reuse the current context instead of starting a new task
}
```

**Usage Examples:**

```javascript
// Flag decides whether the updates run in a transaction (true) or in a plain task (false);
// the statements are executed either way
const result = await db.txIf(
  { cnd: useTransaction, tag: 'payment' },
  async t => {
    await t.none('UPDATE orders SET status = $1 WHERE id = $2', ['paid', orderId]);
    await t.none('INSERT INTO payments(order_id, amount) VALUES($1, $2)', [orderId, amount]);
  }
);

// Conditional transaction with dynamic condition
await db.txIf(
  {
    // Open a transaction only if not too deeply nested; otherwise run as a task.
    // The ctx guard covers calls where no task context exists yet
    cnd: t => !t.ctx || t.ctx.level < 3,
    tag: 'nested-update'
  },
  async t => {
    await t.none('UPDATE stats SET last_calculated = NOW()');
  }
);
```
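
A common reason to reach for `txIf` is a helper that must be atomic when called on its own but should join the caller's transaction when one is already open. The sketch below illustrates that pattern under the assumption that the default condition opens a transaction only when the caller is not already in one. `recordPayment` is a hypothetical function, and `ctx` stands for either the root `db` object or a task/transaction object.

```javascript
// Hypothetical helper (not a library API): accepts the root `db` object
// or a task/transaction object `t` as `ctx`.
// Assumption: with no `cnd` given, txIf opens a transaction only when the
// caller is not already inside one.
function recordPayment(ctx, orderId, amount) {
  return ctx.txIf({ tag: 'record-payment' }, async t => {
    await t.none('UPDATE orders SET status = $1 WHERE id = $2', ['paid', orderId]);
    await t.none('INSERT INTO payments(order_id, amount) VALUES($1, $2)', [orderId, amount]);
    return orderId;
  });
}

// Standalone call, intended to run in its own transaction:
//   await recordPayment(db, orderId, amount);
//
// Inside a larger transaction, intended to take part in it:
//   await db.tx(async t => {
//     await recordPayment(t, orderId, amount);
//     await t.none('UPDATE inventory SET quantity = quantity - 1 WHERE product_id = $1', [productId]);
//   });
```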

### Nested Transactions

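Calling `tx` on a transaction object starts a nested transaction, which is implemented as a savepoint. If the nested callback rejects, only the work done since that savepoint is rolled back, and the outer transaction can continue if it handles the error.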

**Usage Examples:**

```javascript
// Nested transactions with savepoint handling
await db.tx('outer-transaction', async t1 => {
  await t1.none('INSERT INTO audit_log(action) VALUES($1)', ['outer-start']);

  try {
    await t1.tx('inner-transaction', async t2 => {
      await t2.none('INSERT INTO users(name) VALUES($1)', ['Test User']);
      await t2.none('INSERT INTO invalid_table(data) VALUES($1)', ['test']); // Will fail
    });
  } catch (error) {
    // Inner transaction rolled back to savepoint
    console.log('Inner transaction failed, continuing with outer');
  }

  await t1.none('INSERT INTO audit_log(action) VALUES($1)', ['outer-complete']);
  // Outer transaction commits
});
```
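
When several optional steps share one outer transaction, the try/catch around each nested `tx` call can be factored out. The following is a minimal sketch of such a wrapper; `attemptNested` is a hypothetical helper, and it relies only on `t.tx(tag, cb)` as documented above.

```javascript
// Hypothetical helper (not a library API): run an optional step as a nested
// transaction and report the outcome instead of rejecting, so the outer
// transaction can decide how to proceed.
async function attemptNested(t, tag, cb) {
  try {
    const value = await t.tx(tag, cb);
    return { ok: true, value };
  } catch (error) {
    // The nested transaction has been rolled back to its savepoint.
    return { ok: false, error };
  }
}

// Usage inside an outer transaction:
//   await db.tx(async t => {
//     const step = await attemptNested(t, 'optional-step', t2 =>
//       t2.none('INSERT INTO users(name) VALUES($1)', ['Test User']));
//     if (!step.ok) console.log('Optional step skipped:', step.error.message);
//   });
```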

### Task Context

All tasks and transactions provide context information through the `ctx` property.

```javascript { .api }
interface ITaskContext {
  readonly context: any // User-defined context
  readonly parent: ITaskContext | null // Parent task context
  readonly connected: boolean // Whether connection is active
  readonly inTransaction: boolean // Whether in transaction
  readonly level: number // Nesting level (0 = top-level)
  readonly useCount: number // Connection use count
  readonly isTX: boolean // Whether this is a transaction
  readonly start: Date // Task start time
  readonly tag: any // Task identifier
  readonly dc: any // Database context
  readonly serverVersion: string // PostgreSQL server version

  // Set at task completion
  readonly finish?: Date // Task completion time
  readonly duration?: number // Task duration in milliseconds
  readonly success?: boolean // Whether task succeeded
  readonly result?: any // Task result

  // Transaction-specific
  readonly txLevel?: number // Transaction nesting level
}
```

**Usage Examples:**

```javascript
// Using task context
await db.task('data-processing', async t => {
  console.log('Task level:', t.ctx.level);
  console.log('Task tag:', t.ctx.tag);
  console.log('In transaction:', t.ctx.inTransaction);
  console.log('Server version:', t.ctx.serverVersion);

  const users = await t.any('SELECT * FROM users');

  console.log('Task start time:', t.ctx.start);
  return users;
});

// Conditional logic based on context
await db.tx(async t => {
  const isNested = t.ctx.level > 0;
  const logPrefix = isNested ? 'NESTED: ' : 'TOP: ';

  console.log(logPrefix + 'Starting transaction at level', t.ctx.level);

  if (!isNested) {
    // Only do expensive operations at top level
    await t.none('REFRESH MATERIALIZED VIEW expensive_view');
  }
});
```
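
For logging, it can help to reduce the context to a single line. The function below is a small sketch that reads only fields listed in `ITaskContext` above; `describeContext` and its output format are hypothetical choices made for this example. Because `duration` is set at completion, the function reports `running` while the callback is still executing.

```javascript
// Hypothetical helper (not a library API): format a task/transaction
// context as a one-line summary for logs.
function describeContext(ctx) {
  const kind = ctx.isTX ? 'tx' : 'task';
  const tag = ctx.tag === undefined ? '(untagged)' : String(ctx.tag);
  // `duration` is only present once the task has finished.
  const timing = ctx.duration === undefined ? 'running' : `${ctx.duration}ms`;
  return `${kind} ${tag} level=${ctx.level} ${timing}`;
}

// Usage inside a task:
//   await db.task('data-processing', async t => {
//     console.log(describeContext(t.ctx));
//   });
```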

### Batch Operations

Tasks and transactions include batch operation methods from the spex library.

```javascript { .api }
/**
 * Resolve an array of mixed values (plain values, promises, or functions returning either) as a batch
 * @param values - Array of values, promises, or functions to resolve
 * @param options - Batch processing options
 * @returns Promise resolving to array of results
 */
t.batch(values: any[], options?: IBatchOptions): Promise<any[]>

/**
 * Process data in pages with controlled iteration
 * @param source - Function that returns the next page of data
 * @param options - Page processing options, including the optional destination callback
 * @returns Promise resolving to processing result
 */
t.page(source: any, options?: IPageOptions): Promise<any>

/**
 * Process items in sequence with controlled flow
 * @param source - Function that returns the next item
 * @param options - Sequence processing options, including the optional destination callback
 * @returns Promise resolving to processing result
 */
t.sequence(source: any, options?: ISequenceOptions): Promise<any>
```

**Usage Examples:**

```javascript
// Batch insert multiple users
await db.tx(async t => {
  const userQueries = userData.map(user => {
    return () => t.none('INSERT INTO users(name, email) VALUES($1, $2)', [user.name, user.email]);
  });

  // All queries share the transaction's single connection,
  // so the server executes them one at a time
  await t.batch(userQueries);
});

// Process large dataset in pages
await db.task(async t => {
  await t.page(
    // The source function receives the zero-based page index
    index => t.any('SELECT * FROM large_table LIMIT 1000 OFFSET $1', [index * 1000]),
    {
      limit: 10, // Process at most 10 pages
      dest: (index, data) => {
        console.log(`Processing page ${index}, ${data.length} records`);
      }
    }
  );
});
```

## Types

```javascript { .api }
// Task interface extends all database methods
interface ITask {
  // All database query methods (none, one, many, etc.)
  // Plus context and batch operations
  readonly ctx: ITaskContext

  // Batch operations from spex
  batch(values: any[], options?: IBatchOptions): Promise<any[]>
  page(source: any, options?: IPageOptions): Promise<any>
  sequence(source: any, options?: ISequenceOptions): Promise<any>
}

// Transaction mode configuration
class TransactionMode {
  constructor(options?: ITransactionModeOptions)
  begin(cap?: boolean): string
}

interface ITransactionModeOptions {
  tiLevel?: isolationLevel // Transaction isolation level
  readOnly?: boolean // Read-only transaction
  deferrable?: boolean // Deferrable transaction
}

// Transaction isolation levels
enum isolationLevel {
  none = 0,
  serializable = 1,
  repeatableRead = 2,
  readCommitted = 3
}

// Batch operation options
interface IBatchOptions {
  cb?: (index: number, success: boolean, result: any, delay: number) => any // Called for each settled value
}

interface IPageOptions {
  limit?: number // Maximum pages to process
  dest?: (index: number, data: any, delay: number) => any // Page processor
}

interface ISequenceOptions {
  limit?: number // Maximum items to process
  track?: boolean // Collect resolved values into an array result
  dest?: (index: number, data: any, delay: number) => any // Item processor
}
```
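
The `readOnly` option of `ITransactionModeOptions` is not exercised elsewhere in this document, so here is a brief sketch of a reporting helper that combines it with an isolation level. `readOnlyTx` and the `read-only-report` tag are hypothetical; `pgp` is assumed to be the initialized library object that exposes `txMode`, as in the isolation-mode example earlier.

```javascript
// Hypothetical helper (not a library API): run a callback inside a
// read-only REPEATABLE READ transaction.
// `pgp` is assumed to be the initialized library object exposing `txMode`.
function readOnlyTx(pgp, db, cb) {
  const { TransactionMode, isolationLevel } = pgp.txMode;
  const mode = new TransactionMode({
    tiLevel: isolationLevel.repeatableRead,
    readOnly: true
  });
  return db.tx({ mode, tag: 'read-only-report' }, cb);
}

// Usage:
//   const totals = await readOnlyTx(pgp, db, t =>
//     t.any('SELECT product_id, SUM(quantity) AS total FROM orders GROUP BY product_id'));
```

Passing `pgp` and `db` as parameters keeps the helper free of module-level state, which also makes it easy to exercise with stand-in objects in tests.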