CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/npm-pg

PostgreSQL client library for Node.js with both pure JavaScript and optional native libpq bindings

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

cursor.mddocs/

Query Cursors

Server-side cursors for efficient processing of large PostgreSQL result sets without loading all rows into memory at once.

Capabilities

Cursor Constructor

Creates a new query cursor for server-side result streaming.

/**
 * Create a new query cursor
 * @param text - SQL query text
 * @param values - Optional parameter values
 * @param config - Optional cursor configuration
 */
class Cursor extends EventEmitter {
  constructor(text: string, values?: any[], config?: CursorConfig);
}

interface CursorConfig {
  /** Row output mode ('array' or 'object') */
  rowMode?: 'array' | 'object';
  /** Custom type parser object */
  types?: any;
  /** Promise constructor to use */
  Promise?: typeof Promise;
}

Usage Examples:

const Cursor = require('pg-cursor');
const { Client } = require('pg');

const client = new Client();
await client.connect();

// Basic cursor
const cursor1 = new Cursor('SELECT * FROM users');

// Parameterized cursor
const cursor2 = new Cursor('SELECT * FROM users WHERE age > $1', [25]);

// Cursor with configuration
const cursor3 = new Cursor('SELECT id, name FROM users', null, {
  rowMode: 'array'
});

// Submit cursor to client
const query = client.query(cursor1);

Read Rows

Read a specified number of rows from the cursor.

/**
 * Read rows from the cursor
 * @param rows - Number of rows to read
 * @param callback - Optional callback for result
 * @returns Promise when no callback provided
 */
read(rows: number, callback?: (err: Error | null, rows: any[], result: QueryResult) => void): Promise<any[]>;

Usage Examples:

// Callback-based reading
const cursor = client.query(new Cursor('SELECT * FROM large_table'));

cursor.read(100, (err, rows, result) => {
  if (err) throw err;
  console.log(`Read ${rows.length} rows`);
  console.log(`Total processed: ${result.rows.length}`);
});

// Promise-based reading
const rows = await cursor.read(100);
console.log(`Retrieved ${rows.length} rows`);

// Read all remaining rows
const allRows = await cursor.read(1000000);

Close Cursor

Close the cursor and release server resources.

/**
 * Close the cursor and cleanup resources
 * @param callback - Optional callback for completion
 * @returns Promise when no callback provided
 */
close(callback?: (err: Error | null) => void): Promise<void>;

Usage Examples:

// Callback-based close
cursor.close((err) => {
  if (err) throw err;
  console.log('Cursor closed');
});

// Promise-based close
await cursor.close();
console.log('Cursor closed');

// Always close cursors in finally blocks
try {
  const cursor = client.query(new Cursor('SELECT * FROM users'));
  const rows = await cursor.read(100);
  // Process rows...
} finally {
  await cursor.close();
}

Cursor Events

Cursors emit events during their lifecycle for event-driven processing.

// Row processing event
cursor.on('row', (row: any, result: QueryResult) => {
  // Emitted for each row during read operations
});

// Completion event
cursor.on('end', (result: QueryResult) => {
  // Emitted when cursor reaches end or is closed
});

// Error event
cursor.on('error', (err: Error) => {
  // Emitted when cursor encounters an error
});

Usage Examples:

const cursor = client.query(new Cursor('SELECT * FROM large_table'));

// Process rows individually as they arrive
cursor.on('row', (row, result) => {
  console.log('Processing user:', row.name);
  // Add row to result accumulator
  result.addRow(row);
});

cursor.on('end', (result) => {
  console.log(`Processed ${result.rows.length} total rows`);
});

cursor.on('error', (err) => {
  console.error('Cursor error:', err);
});

// Start reading (triggers row events)
await cursor.read(1000);

Usage Patterns

Batch Processing

Process large tables in manageable batches.

const cursor = client.query(new Cursor('SELECT * FROM huge_table ORDER BY id'));

let batch;
let totalProcessed = 0;

do {
  batch = await cursor.read(1000);
  
  // Process this batch
  for (const row of batch) {
    await processRow(row);
  }
  
  totalProcessed += batch.length;
  console.log(`Processed ${totalProcessed} rows so far...`);
  
} while (batch.length > 0);

await cursor.close();
console.log(`Finished processing ${totalProcessed} total rows`);

Memory-Efficient ETL

Extract, transform, and load data without memory overflow.

async function etlLargeTable() {
  const sourceCursor = sourceClient.query(
    new Cursor('SELECT * FROM source_table')
  );
  
  try {
    let batch;
    do {
      batch = await sourceCursor.read(500);
      
      // Transform batch
      const transformedRows = batch.map(transformRow);
      
      // Load into destination
      if (transformedRows.length > 0) {
        await insertBatch(targetClient, transformedRows);
      }
      
    } while (batch.length > 0);
    
  } finally {
    await sourceCursor.close();
  }
}

Real-time Processing

Process rows as they become available.

const cursor = client.query(new Cursor(
  'SELECT * FROM events WHERE created_at > $1 ORDER BY created_at',
  [lastProcessedTime]
));

cursor.on('row', async (row) => {
  await processEvent(row);
  lastProcessedTime = row.created_at;
});

cursor.on('end', () => {
  console.log('All events processed');
});

// Start streaming
await cursor.read(10000);

Transaction Support

Use cursors within database transactions.

await client.query('BEGIN');

try {
  const cursor = client.query(new Cursor(
    'SELECT * FROM accounts WHERE balance > 0 FOR UPDATE'
  ));
  
  let batch;
  do {
    batch = await cursor.read(100);
    
    for (const account of batch) {
      // Update account within transaction
      await client.query(
        'UPDATE accounts SET balance = balance * $1 WHERE id = $2',
        [1.05, account.id]
      );
    }
    
  } while (batch.length > 0);
  
  await cursor.close();
  await client.query('COMMIT');
  
} catch (err) {
  await client.query('ROLLBACK');
  throw err;
}

Connection Pool Usage

Use cursors with connection pools (requires dedicated client).

const { Pool } = require('pg');
const pool = new Pool();

async function processCursorData() {
  const client = await pool.connect();
  
  try {
    const cursor = client.query(new Cursor('SELECT * FROM large_table'));
    
    let totalRows = 0;
    let batch;
    
    do {
      batch = await cursor.read(1000);
      totalRows += batch.length;
      
      // Process batch...
      await processBatch(batch);
      
    } while (batch.length > 0);
    
    await cursor.close();
    return totalRows;
    
  } finally {
    client.release(); // Return client to pool
  }
}

Configuration Options

Row Mode

Control how rows are returned from the cursor.

// Object mode (default)
const objectCursor = new Cursor('SELECT id, name FROM users');
// Returns: [{ id: 1, name: 'Alice' }, { id: 2, name: 'Bob' }]

// Array mode
const arrayCursor = new Cursor('SELECT id, name FROM users', null, {
  rowMode: 'array'
});
// Returns: [[1, 'Alice'], [2, 'Bob']]

Custom Type Parsing

Use custom type parsers with cursors.

const customTypes = {
  getTypeParser: (oid, format) => {
    if (oid === 1184) { // timestamptz
      return (val) => new Date(val);
    }
    return types.getTypeParser(oid, format);
  }
};

const cursor = new Cursor('SELECT created_at FROM events', null, {
  types: customTypes
});

Promise Constructor

Use custom Promise implementation.

const BluebirdPromise = require('bluebird');

const cursor = new Cursor('SELECT * FROM users', null, {
  Promise: BluebirdPromise
});

// All cursor operations return Bluebird promises
const rows = await cursor.read(100); // Returns Bluebird promise

Error Handling

try {
  const cursor = client.query(new Cursor('SELECT * FROM users'));
  
  cursor.on('error', (err) => {
    console.error('Cursor error:', err);
  });
  
  const rows = await cursor.read(100);
  await cursor.close();
  
} catch (err) {
  console.error('Operation failed:', err);
}

Best Practices

Always Close Cursors

// Good: Ensure cursors are always closed
const cursor = client.query(new Cursor('SELECT * FROM table'));
try {
  const rows = await cursor.read(100);
  // Process rows...
} finally {
  await cursor.close(); // Always close
}

Reasonable Batch Sizes

// Good: Moderate batch sizes
const rows = await cursor.read(1000); // Reasonable batch

// Avoid: Very large batches
const rows = await cursor.read(1000000); // May cause memory issues

Error Recovery

const cursor = client.query(new Cursor('SELECT * FROM table'));

try {
  let batch;
  do {
    try {
      batch = await cursor.read(100);
      await processBatch(batch);
    } catch (batchError) {
      console.error('Batch processing error:', batchError);
      // Decide whether to continue or abort
      break;
    }
  } while (batch && batch.length > 0);
  
} finally {
  await cursor.close();
}

Install with Tessl CLI

npx tessl i tessl/npm-pg

docs

client.md

connection-string.md

cursor.md

index.md

pool.md

query-stream.md

query.md

types.md

utilities.md

tile.json