PostgreSQL client library for Node.js with both pure JavaScript and optional native libpq bindings
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Server-side cursors for efficient processing of large PostgreSQL result sets without loading all rows into memory at once.
Creates a new query cursor for server-side result streaming.
/**
* Create a new query cursor
* @param text - SQL query text
* @param values - Optional parameter values
* @param config - Optional cursor configuration
*/
class Cursor extends EventEmitter {
constructor(text: string, values?: any[], config?: CursorConfig);
}
interface CursorConfig {
/** Row output mode ('array' or 'object') */
rowMode?: 'array' | 'object';
/** Custom type parser object */
types?: any;
/** Promise constructor to use */
Promise?: typeof Promise;
}

Usage Examples:
const Cursor = require('pg-cursor');
const { Client } = require('pg');
const client = new Client();
await client.connect();
// Basic cursor
const cursor1 = new Cursor('SELECT * FROM users');
// Parameterized cursor
const cursor2 = new Cursor('SELECT * FROM users WHERE age > $1', [25]);
// Cursor with configuration
const cursor3 = new Cursor('SELECT id, name FROM users', null, {
rowMode: 'array'
});
// Submit cursor to client
const query = client.query(cursor1);

Read a specified number of rows from the cursor.
/**
* Read rows from the cursor
* @param rows - Number of rows to read
* @param callback - Optional callback for result
* @returns Promise when no callback provided
*/
read(rows: number, callback?: (err: Error | null, rows: any[], result: QueryResult) => void): Promise<any[]>;

Usage Examples:
// Callback-based reading
const cursor = client.query(new Cursor('SELECT * FROM large_table'));
cursor.read(100, (err, rows, result) => {
if (err) throw err;
console.log(`Read ${rows.length} rows`);
console.log(`Total processed: ${result.rows.length}`);
});
// Promise-based reading
const rows = await cursor.read(100);
console.log(`Retrieved ${rows.length} rows`);
// Read all remaining rows
const allRows = await cursor.read(1000000);

Close the cursor and release server resources.
/**
* Close the cursor and cleanup resources
* @param callback - Optional callback for completion
* @returns Promise when no callback provided
*/
close(callback?: (err: Error | null) => void): Promise<void>;

Usage Examples:
// Callback-based close
cursor.close((err) => {
if (err) throw err;
console.log('Cursor closed');
});
// Promise-based close
await cursor.close();
console.log('Cursor closed');
// Always close cursors in finally blocks
// (declare the cursor outside the try block so it is in scope in finally)
const cursor = client.query(new Cursor('SELECT * FROM users'));
try {
  const rows = await cursor.read(100);
  // Process rows...
} finally {
  await cursor.close();
}

Cursors emit events during their lifecycle for event-driven processing.
// Row processing event
cursor.on('row', (row: any, result: QueryResult) => {
// Emitted for each row during read operations
});
// Completion event
cursor.on('end', (result: QueryResult) => {
// Emitted when cursor reaches end or is closed
});
// Error event
cursor.on('error', (err: Error) => {
// Emitted when cursor encounters an error
});

Usage Examples:
const cursor = client.query(new Cursor('SELECT * FROM large_table'));
// Process rows individually as they arrive
cursor.on('row', (row, result) => {
console.log('Processing user:', row.name);
// Add row to result accumulator
result.addRow(row);
});
cursor.on('end', (result) => {
console.log(`Processed ${result.rows.length} total rows`);
});
cursor.on('error', (err) => {
console.error('Cursor error:', err);
});
// Start reading (triggers row events)
await cursor.read(1000);

Process large tables in manageable batches.
const cursor = client.query(new Cursor('SELECT * FROM huge_table ORDER BY id'));
let batch;
let totalProcessed = 0;
do {
batch = await cursor.read(1000);
// Process this batch
for (const row of batch) {
await processRow(row);
}
totalProcessed += batch.length;
console.log(`Processed ${totalProcessed} rows so far...`);
} while (batch.length > 0);
await cursor.close();
console.log(`Finished processing ${totalProcessed} total rows`);

Extract, transform, and load data without memory overflow.
async function etlLargeTable() {
const sourceCursor = sourceClient.query(
new Cursor('SELECT * FROM source_table')
);
try {
let batch;
do {
batch = await sourceCursor.read(500);
// Transform batch
const transformedRows = batch.map(transformRow);
// Load into destination
if (transformedRows.length > 0) {
await insertBatch(targetClient, transformedRows);
}
} while (batch.length > 0);
} finally {
await sourceCursor.close();
}
}

Process rows as they become available.
const cursor = client.query(new Cursor(
'SELECT * FROM events WHERE created_at > $1 ORDER BY created_at',
[lastProcessedTime]
));
cursor.on('row', async (row) => {
  // NOTE: EventEmitter does not await async listeners, so rows may be
  // emitted faster than processEvent completes and rejections are unhandled.
  await processEvent(row);
  lastProcessedTime = row.created_at;
});
cursor.on('end', () => {
console.log('All events processed');
});
// Start streaming
await cursor.read(10000);

Use cursors within database transactions.
await client.query('BEGIN');
try {
const cursor = client.query(new Cursor(
'SELECT * FROM accounts WHERE balance > 0 FOR UPDATE'
));
let batch;
do {
batch = await cursor.read(100);
for (const account of batch) {
// Update account within transaction
await client.query(
'UPDATE accounts SET balance = balance * $1 WHERE id = $2',
[1.05, account.id]
);
}
} while (batch.length > 0);
await cursor.close();
await client.query('COMMIT');
} catch (err) {
await client.query('ROLLBACK');
throw err;
}

Use cursors with connection pools (requires a dedicated client).
const { Pool } = require('pg');
const pool = new Pool();
async function processCursorData() {
const client = await pool.connect();
try {
const cursor = client.query(new Cursor('SELECT * FROM large_table'));
let totalRows = 0;
let batch;
do {
batch = await cursor.read(1000);
totalRows += batch.length;
// Process batch...
await processBatch(batch);
} while (batch.length > 0);
await cursor.close();
return totalRows;
} finally {
client.release(); // Return client to pool
}
}

Control how rows are returned from the cursor.
// Object mode (default)
const objectCursor = new Cursor('SELECT id, name FROM users');
// Returns: [{ id: 1, name: 'Alice' }, { id: 2, name: 'Bob' }]
// Array mode
const arrayCursor = new Cursor('SELECT id, name FROM users', null, {
rowMode: 'array'
});
// Returns: [[1, 'Alice'], [2, 'Bob']]

Use custom type parsers with cursors.
const customTypes = {
getTypeParser: (oid, format) => {
if (oid === 1184) { // timestamptz
return (val) => new Date(val);
}
return types.getTypeParser(oid, format);
}
};
const cursor = new Cursor('SELECT created_at FROM events', null, {
types: customTypes
});

Use a custom Promise implementation.
const BluebirdPromise = require('bluebird');
const cursor = new Cursor('SELECT * FROM users', null, {
Promise: BluebirdPromise
});
// All cursor operations return Bluebird promises
const rows = await cursor.read(100); // Returns a Bluebird promise

try {
const cursor = client.query(new Cursor('SELECT * FROM users'));
cursor.on('error', (err) => {
console.error('Cursor error:', err);
});
const rows = await cursor.read(100);
await cursor.close();
} catch (err) {
console.error('Operation failed:', err);
}

// Good: Ensure cursors are always closed
const cursor = client.query(new Cursor('SELECT * FROM table'));
try {
const rows = await cursor.read(100);
// Process rows...
} finally {
await cursor.close(); // Always close
}

// Good: Moderate batch sizes
const rows = await cursor.read(1000); // Reasonable batch
// Avoid: Very large batches
const allRows = await cursor.read(1000000); // May cause memory issues

const cursor = client.query(new Cursor('SELECT * FROM table'));
try {
let batch;
do {
try {
batch = await cursor.read(100);
await processBatch(batch);
} catch (batchError) {
console.error('Batch processing error:', batchError);
// Decide whether to continue or abort
break;
}
} while (batch && batch.length > 0);
} finally {
await cursor.close();
}

Install with Tessl CLI
npx tessl i tessl/npm-pg