tessl/npm-pg-promise

PostgreSQL interface for Node.js built on top of node-postgres, offering automatic connection management, promise-based API, automatic transaction handling with support for nested transactions and savepoints, advanced query formatting with named parameters and filtering capabilities, task and transaction management with conditional execution, query file loading and processing, custom type formatting support, and comprehensive error handling.

Tasks and Transactions

Tasks and transactions let multiple queries share a single connection. A task acquires a connection, runs every query issued through its callback on that connection, and releases it when the callback settles. A transaction does the same while wrapping the queries in automatic commit/rollback handling, with nested transactions implemented via savepoints.

Capabilities

Task Management

Tasks provide a shared connection context for executing multiple queries efficiently.

/**
 * Execute callback within a shared connection (task)
 * @param cb - Callback function receiving task object
 * @returns Promise resolving to callback result
 */
db.task(cb: (t: ITask) => Promise<any> | any): Promise<any>

/**
 * Execute tagged task with shared connection
 * @param tag - Task identifier for logging/monitoring
 * @param cb - Callback function receiving task object
 * @returns Promise resolving to callback result
 */
db.task(tag: string | number, cb: (t: ITask) => Promise<any> | any): Promise<any>

/**
 * Execute task with options and shared connection
 * @param options - Task configuration options
 * @param cb - Callback function receiving task object
 * @returns Promise resolving to callback result
 */
db.task(options: ITaskOptions, cb: (t: ITask) => Promise<any> | any): Promise<any>

interface ITaskOptions {
  tag?: any // Task identifier
}

Usage Examples:

// Basic task for multiple related queries
const result = await db.task(async t => {
  const users = await t.any('SELECT * FROM users');
  const profiles = await t.any('SELECT * FROM user_profiles');
  const settings = await t.any('SELECT * FROM user_settings');
  
  return { users, profiles, settings };
});

// Tagged task for monitoring
const summary = await db.task('user-summary', async t => {
  const activeUsers = await t.any('SELECT * FROM users WHERE active = true');
  const totalOrders = await t.one('SELECT COUNT(*) as count FROM orders', [], r => +r.count); // COUNT(*) is bigint, which the driver returns as a string
  
  return { activeUsers: activeUsers.length, totalOrders };
});

// Task with options
const data = await db.task({ tag: 'data-export' }, async t => {
  const users = await t.any('SELECT * FROM users WHERE created_at > $1', [lastExport]);
  const orders = await t.any('SELECT * FROM orders WHERE created_at > $1', [lastExport]);
  
  return { users, orders };
});
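
Because the callback receives a task object that exposes the same query methods as db, query helpers can take that object as a parameter and run on whichever connection the caller holds. The sketch below is an illustration, not part of the pg-promise API: the helper names (findUsersSince, findOrdersSince, exportSince) are hypothetical, and the tables are the ones used in the examples above.

```javascript
// Hypothetical helpers: `t` may be the root db object or any task/transaction object.
function findUsersSince(t, since) {
  return t.any('SELECT * FROM users WHERE created_at > $1', [since]);
}

function findOrdersSince(t, since) {
  return t.any('SELECT * FROM orders WHERE created_at > $1', [since]);
}

// Runs both helpers inside one tagged task, so they share a single connection.
function exportSince(db, since) {
  return db.task('data-export', async t => {
    const users = await findUsersSince(t, since);
    const orders = await findOrdersSince(t, since);
    return { users, orders };
  });
}
```

A call such as `await exportSince(db, lastExport)` would then behave like the "Task with options" example, while the two helpers stay reusable inside other tasks or transactions.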

Conditional Tasks

Conditional tasks start a new task only when a condition is met or when the call is made at the top level (on db); otherwise they reuse the current task or transaction context. The callback is executed in both cases.

/**
 * Execute conditional task - by default, a new task is started only at the top level
 * @param cb - Callback function receiving task object
 * @returns Promise resolving to callback result
 */
db.taskIf(cb: (t: ITask) => Promise<any> | any): Promise<any>

/**
 * Execute conditional tagged task
 * @param tag - Task identifier
 * @param cb - Callback function receiving task object
 * @returns Promise resolving to callback result
 */
db.taskIf(tag: string | number, cb: (t: ITask) => Promise<any> | any): Promise<any>

/**
 * Execute conditional task with options
 * @param options - Task configuration with condition
 * @param cb - Callback function receiving task object
 * @returns Promise resolving to callback result
 */
db.taskIf(options: ITaskIfOptions, cb: (t: ITask) => Promise<any> | any): Promise<any>

interface ITaskIfOptions {
  cnd?: boolean | ((t: ITask) => boolean) // Condition for starting a new task (default: only at the top level)
  tag?: any // Task identifier
}

Usage Examples:

// Conditional task with the default condition
const result = await db.taskIf(async t => {
  // Always runs; a new task is started here because the call is at the top level
  return await t.any('SELECT * FROM expensive_query');
});

// Conditional task with boolean condition
// The callback runs either way; cnd only decides whether a nested call gets its own task
const backupData = await db.taskIf(
  { cnd: isBackupTime, tag: 'backup' },
  async t => {
    const users = await t.any('SELECT * FROM users');
    const orders = await t.any('SELECT * FROM orders');
    return { users, orders };
  }
);

// Conditional task with function condition
const report = await db.taskIf(
  { 
    // The condition receives the calling context; the root db object has no ctx
    cnd: t => !t.ctx || t.ctx.level === 0,
    tag: 'top-level-report' 
  },
  async t => {
    return await t.one('SELECT generate_report() as report', [], r => r.report);
  }
);
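
Per the pg-promise API documentation, taskIf always runs its callback: called on the root db object it opens a task, and called on an existing task or transaction it reuses that context by default. That makes it a reasonable fit for functions that may be invoked from either place. A minimal sketch, in which loadUserWithOrders is a hypothetical name:

```javascript
// Hypothetical function: `t` may be db (a task is opened)
// or a task/transaction object (its connection is reused).
function loadUserWithOrders(t, userId) {
  return t.taskIf(async t1 => {
    const user = await t1.oneOrNone('SELECT * FROM users WHERE id = $1', [userId]);
    // Skip the second query when the user does not exist.
    const orders = user
      ? await t1.any('SELECT * FROM orders WHERE user_id = $1', [userId])
      : [];
    return { user, orders };
  });
}
```

Both `loadUserWithOrders(db, 1)` and `db.task(t => loadUserWithOrders(t, 1))` should then use exactly one connection for the two queries.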

Transaction Management

Transactions provide automatic transaction handling with commit/rollback semantics.

/**
 * Execute callback within a transaction
 * @param cb - Callback function receiving transaction object
 * @returns Promise resolving to callback result
 */
db.tx(cb: (t: ITask) => Promise<any> | any): Promise<any>

/**
 * Execute tagged transaction
 * @param tag - Transaction identifier for logging/monitoring
 * @param cb - Callback function receiving transaction object
 * @returns Promise resolving to callback result
 */
db.tx(tag: string | number, cb: (t: ITask) => Promise<any> | any): Promise<any>

/**
 * Execute transaction with options
 * @param options - Transaction configuration options
 * @param cb - Callback function receiving transaction object
 * @returns Promise resolving to callback result
 */
db.tx(options: ITxOptions, cb: (t: ITask) => Promise<any> | any): Promise<any>

interface ITxOptions {
  tag?: any // Transaction identifier
  mode?: TransactionMode | null // Transaction mode (isolation level, read-only, deferrable)
}

Usage Examples:

// Basic transaction
const newUserId = await db.tx(async t => {
  const userId = await t.one('INSERT INTO users(name, email) VALUES($1, $2) RETURNING id', 
    ['John Doe', 'john@example.com'], r => r.id);
    
  await t.none('INSERT INTO user_profiles(user_id, bio) VALUES($1, $2)', 
    [userId, 'Software Engineer']);
    
  await t.none('INSERT INTO user_settings(user_id, theme) VALUES($1, $2)', 
    [userId, 'dark']);
    
  return userId;
});

// Tagged transaction for monitoring
await db.tx('order-processing', async t => {
  await t.none('UPDATE inventory SET quantity = quantity - $1 WHERE product_id = $2', 
    [quantity, productId]);
    
  const orderId = await t.one('INSERT INTO orders(user_id, product_id, quantity) VALUES($1, $2, $3) RETURNING id',
    [userId, productId, quantity], r => r.id);
    
  await t.none('INSERT INTO order_audit(order_id, action) VALUES($1, $2)', 
    [orderId, 'created']);
});

// Transaction with isolation mode
await db.tx({ 
  mode: new pgp.txMode.TransactionMode({ 
    tiLevel: pgp.txMode.isolationLevel.serializable 
  }) 
}, async t => {
  // Critical section requiring serializable isolation
  const balance = await t.one('SELECT balance FROM accounts WHERE id = $1', [accountId], r => r.balance);
  
  if (balance >= amount) {
    await t.none('UPDATE accounts SET balance = balance - $1 WHERE id = $2', [amount, accountId]);
    await t.none('INSERT INTO transactions(account_id, amount, type) VALUES($1, $2, $3)', 
      [accountId, -amount, 'withdrawal']);
  } else {
    throw new Error('Insufficient funds');
  }
});
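
Under serializable isolation PostgreSQL may abort a transaction with a serialization failure (SQLSTATE 40001), and the usual remedy is to run the whole transaction again. The following is a minimal sketch of such a retry wrapper, not a pg-promise feature: the name txWithRetry and the default of three attempts are assumptions, and it relies on the rejection carrying the driver's `code` property.

```javascript
// Hypothetical wrapper: re-runs db.tx when PostgreSQL reports a serialization failure.
async function txWithRetry(db, options, cb, maxAttempts = 3) {
  for (let attempt = 1; ; attempt++) {
    try {
      return await db.tx(options, cb);
    } catch (err) {
      // 40001 = serialization_failure; anything else is rethrown immediately
      const retryable = err && err.code === '40001';
      if (!retryable || attempt >= maxAttempts) {
        throw err;
      }
    }
  }
}
```

The callback has to be safe to execute more than once, since each retry starts a fresh transaction and runs it from the beginning.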

Conditional Transactions

Conditional transactions start an actual transaction when a condition is met; otherwise the callback runs as a regular task. By default, a transaction is started only when the caller is not already inside one. The callback is executed in both cases.

/**
 * Execute conditional transaction - by default, a transaction is started only when not already in one
 * @param cb - Callback function receiving transaction object
 * @returns Promise resolving to callback result
 */
db.txIf(cb: (t: ITask) => Promise<any> | any): Promise<any>

/**
 * Execute conditional tagged transaction
 * @param tag - Transaction identifier
 * @param cb - Callback function receiving transaction object
 * @returns Promise resolving to callback result
 */
db.txIf(tag: string | number, cb: (t: ITask) => Promise<any> | any): Promise<any>

/**
 * Execute conditional transaction with options
 * @param options - Transaction configuration with condition
 * @param cb - Callback function receiving transaction object
 * @returns Promise resolving to callback result
 */
db.txIf(options: ITxIfOptions, cb: (t: ITask) => Promise<any> | any): Promise<any>

interface ITxIfOptions {
  cnd?: boolean | ((t: ITask) => boolean) // Condition for starting a transaction (default: not already in a transaction)
  tag?: any // Transaction identifier
  mode?: TransactionMode | null // Transaction mode (isolation level, read-only, deferrable)
  reusable?: boolean | ((t: ITask) => boolean) // When cnd is false: whether the fallback task reuses the current context (default: false)
}

Usage Examples:

// Conditional transaction based on flag
// The queries run either way: in a transaction when the flag is true, in a plain task otherwise
const result = await db.txIf(
  { cnd: shouldProcessPayment, tag: 'payment' },
  async t => {
    await t.none('UPDATE orders SET status = $1 WHERE id = $2', ['paid', orderId]);
    await t.none('INSERT INTO payments(order_id, amount) VALUES($1, $2)', [orderId, amount]);
  }
);

// Conditional transaction with dynamic condition
await db.txIf(
  { 
    // Start a transaction unless too deeply nested; the root db object has no ctx
    cnd: t => !t.ctx || t.ctx.level < 3,
    tag: 'nested-update' 
  },
  async t => {
    await t.none('UPDATE stats SET last_calculated = NOW()');
  }
);
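
Per the pg-promise API documentation, txIf with no cnd starts a transaction only when the caller is not already inside one, and otherwise continues as a task within the surrounding transaction. A function that needs atomicity can therefore accept either db or an outer transaction object. A minimal sketch, in which markOrderPaid is a hypothetical name:

```javascript
// Hypothetical function: atomic on its own, and part of the caller's
// transaction when `t` is already a transaction object.
function markOrderPaid(t, orderId, amount) {
  return t.txIf('payment', async tx => {
    await tx.none('UPDATE orders SET status = $1 WHERE id = $2', ['paid', orderId]);
    await tx.none('INSERT INTO payments(order_id, amount) VALUES($1, $2)', [orderId, amount]);
  });
}
```

Called as `markOrderPaid(db, orderId, amount)` the two statements get their own transaction; called inside `db.tx(t => markOrderPaid(t, orderId, amount))` they join the outer one, so no extra savepoint is created.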

Nested Transactions

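Calling tx on a transaction object starts a nested transaction, which is implemented as a savepoint. If the nested callback fails, only the work done since that savepoint is rolled back and the rejection propagates to the caller; catching it lets the outer transaction continue and commit.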

Usage Examples:

// Nested transactions with savepoint handling
await db.tx('outer-transaction', async t1 => {
  await t1.none('INSERT INTO audit_log(action) VALUES($1)', ['outer-start']);
  
  try {
    await t1.tx('inner-transaction', async t2 => {
      await t2.none('INSERT INTO users(name) VALUES($1)', ['Test User']);
      await t2.none('INSERT INTO invalid_table(data) VALUES($1)', ['test']); // Will fail
    });
  } catch (error) {
    // Inner transaction rolled back to savepoint
    console.log('Inner transaction failed, continuing with outer');
  }
  
  await t1.none('INSERT INTO audit_log(action) VALUES($1)', ['outer-complete']);
  // Outer transaction commits
});
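
The try/catch around a nested transaction can be factored into a small helper so that optional steps do not abort the outer transaction. This is a minimal sketch under the assumption that `t` is a transaction object (called on the root db, the step would run as an independent top-level transaction rather than as a savepoint); tryStep is a hypothetical name.

```javascript
// Hypothetical helper: runs `step` as a nested transaction of `t`
// and reports failure as a value instead of a rejection.
async function tryStep(t, tag, step) {
  try {
    return { ok: true, value: await t.tx(tag, step) };
  } catch (error) {
    return { ok: false, error };
  }
}
```

Inside an outer transaction, `const r = await tryStep(t1, 'inner-transaction', t2 => t2.none(...))` leaves the decision about `r.ok` to the caller while the outer transaction stays usable.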

Task Context

All tasks and transactions provide context information through the ctx property.

interface ITaskContext {
  readonly context: any // User-defined context
  readonly parent: ITaskContext | null // Parent task context
  readonly connected: boolean // Whether connection is active
  readonly inTransaction: boolean // Whether in transaction
  readonly level: number // Nesting level (0 = top-level)
  readonly useCount: number // Connection use count
  readonly isTX: boolean // Whether this is a transaction
  readonly start: Date // Task start time
  readonly tag: any // Task identifier
  readonly dc: any // Database context
  readonly serverVersion: string // PostgreSQL server version
  
  // Set at task completion
  readonly finish?: Date // Task completion time
  readonly duration?: number // Task duration in milliseconds
  readonly success?: boolean // Whether task succeeded
  readonly result?: any // Task result
  
  // Transaction-specific
  readonly txLevel?: number // Transaction nesting level
}

Usage Examples:

// Using task context
await db.task('data-processing', async t => {
  console.log('Task level:', t.ctx.level);
  console.log('Task tag:', t.ctx.tag);
  console.log('In transaction:', t.ctx.inTransaction);
  console.log('Server version:', t.ctx.serverVersion);
  
  const users = await t.any('SELECT * FROM users');
  
  console.log('Task start time:', t.ctx.start);
  return users;
});

// Conditional logic based on context
await db.tx(async t => {
  const isNested = t.ctx.level > 0;
  const logPrefix = isNested ? 'NESTED: ' : 'TOP: ';
  
  console.log(logPrefix + 'Starting transaction at level', t.ctx.level);
  
  if (!isNested) {
    // Only do expensive operations at top level
    await t.none('REFRESH MATERIALIZED VIEW expensive_view');
  }
});
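
The context fields combine naturally into log lines. The sketch below is a pure function over an object shaped like ITaskContext; the name describeContext and its output format are assumptions made for illustration.

```javascript
// Hypothetical formatter for an ITaskContext-shaped object.
function describeContext(ctx) {
  const kind = ctx.isTX ? 'tx' : 'task';
  const tag = ctx.tag === undefined ? '(untagged)' : String(ctx.tag);
  // finish, duration and success are set only once the task has completed
  const outcome = ctx.finish
    ? `${ctx.success ? 'ok' : 'failed'} in ${ctx.duration}ms`
    : 'running';
  return `${kind} ${tag} level=${ctx.level} ${outcome}`;
}
```

Inside a callback, `console.log(describeContext(t.ctx))` would report the task as running, since the completion fields are not yet populated at that point.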

Batch Operations

Tasks and transactions include batch operation methods from the spex library.

/**
 * Settle every value in an array of mixed values (promises, functions, plain values)
 * @param values - Array of mixed values to resolve
 * @param options - Batch processing options
 * @returns Promise resolving to array of results in input order; rejects if any value fails
 */
t.batch(values: any[], options?: IBatchOptions): Promise<any[]>

/**
 * Resolve a dynamic sequence of pages, each page being an array of mixed values
 * @param source - Function returning the next page; returning or resolving with undefined ends the sequence
 * @param options - Page processing options
 * @returns Promise resolving to { pages, total, duration }
 */
t.page(source: (index: number, data: any, delay: number) => any, options?: IPageOptions): Promise<any>

/**
 * Resolve a dynamic sequence of values one at a time
 * @param source - Function returning the next value; returning or resolving with undefined ends the sequence
 * @param options - Sequence processing options
 * @returns Promise resolving to { total, duration }, or to an array of resolved values when track is true
 */
t.sequence(source: (index: number, data: any, delay: number) => any, options?: ISequenceOptions): Promise<any>

Usage Examples:

// Batch insert multiple users
await db.tx(async t => {
  const userQueries = userData.map(user => {
    return () => t.none('INSERT INTO users(name, email) VALUES($1, $2)', [user.name, user.email]);
  });
  
  // All queries share the transaction's single connection, so they execute one after another
  await t.batch(userQueries);
});

// Process large dataset in pages
await db.task(async t => {
  await t.page(
    async index => {
      // index is the zero-based page number passed to the source function
      const rows = await t.any('SELECT * FROM large_table LIMIT 1000 OFFSET $1', [index * 1000]);
      return rows.length > 0 ? rows : undefined; // undefined ends the sequence
    },
    {
      limit: 10, // Process at most 10 pages
      dest: (index, data) => {
        console.log(`Processing page ${index}, ${data.length} records`);
      }
    }
  );
});
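
The examples above do not cover sequence. As a hedged sketch, the function below inserts rows one at a time through t.sequence and ends the sequence by returning undefined from the source function, which is the termination rule given in the spex documentation. The name insertUsersSequentially is hypothetical, and `rows` is assumed to be an in-memory array of objects with name and email properties.

```javascript
// Hypothetical helper: inserts `rows` strictly one after another.
function insertUsersSequentially(t, rows) {
  return t.sequence(index => {
    if (index < rows.length) {
      const { name, email } = rows[index];
      // none() resolves with null, which is not undefined, so the sequence continues
      return t.none('INSERT INTO users(name, email) VALUES($1, $2)', [name, email]);
    }
    // falling through returns undefined, which ends the sequence
  });
}
```

A typical call would be `await db.tx(t => insertUsersSequentially(t, userData))`, so that a failure on any row rolls back all previous inserts.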

Types

// Task interface extends all database methods
interface ITask {
  // All database query methods (none, one, many, etc.)
  // Plus context and batch operations
  readonly ctx: ITaskContext
  
  // Batch operations from spex
  batch(values: any[], options?: IBatchOptions): Promise<any[]>
  page(source: (index: number, data: any, delay: number) => any, options?: IPageOptions): Promise<any>
  sequence(source: (index: number, data: any, delay: number) => any, options?: ISequenceOptions): Promise<any>
}

// Transaction mode configuration
class TransactionMode {
  constructor(options?: ITransactionModeOptions)
  begin(cap?: boolean): string
}

interface ITransactionModeOptions {
  tiLevel?: isolationLevel // Transaction isolation level
  readOnly?: boolean // Read-only transaction
  deferrable?: boolean // Deferrable transaction
}

// Transaction isolation levels
enum isolationLevel {
  none = 0,
  serializable = 1,
  repeatableRead = 2,
  readCommitted = 3
}

// Batch operation options
interface IBatchOptions {
  cb?: (index: number, success: boolean, result: any, delay: number) => any // Notification callback for each settled value
}

interface IPageOptions {
  limit?: number // Maximum pages to process
  dest?: (index: number, data: any, delay: number) => any // Receives each resolved page
}

interface ISequenceOptions {
  limit?: number // Maximum values to process
  track?: boolean // Resolve with the array of resolved values instead of statistics
  dest?: (index: number, data: any, delay: number) => any // Receives each resolved value
}
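
The TransactionMode options listed above can be combined. For instance, the PostgreSQL documentation describes SERIALIZABLE READ ONLY DEFERRABLE as well suited to long-running reports. A minimal sketch, assuming `pgp` is the initialized pg-promise library object; readOnlySnapshotTx and the 'report' tag are hypothetical.

```javascript
// Hypothetical helper: runs `cb` in a serializable, read-only, deferrable transaction.
function readOnlySnapshotTx(db, pgp, cb) {
  const { TransactionMode, isolationLevel } = pgp.txMode;
  const mode = new TransactionMode({
    tiLevel: isolationLevel.serializable,
    readOnly: true,
    deferrable: true // in PostgreSQL this only has an effect with serializable + read only
  });
  return db.tx({ mode, tag: 'report' }, cb);
}
```

A call might look like `await readOnlySnapshotTx(db, pgp, t => t.any('SELECT * FROM orders'))`.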

Install with Tessl CLI

npx tessl i tessl/npm-pg-promise
