PostgreSQL client library for Node.js with both pure JavaScript and optional native libpq bindings
Node.js Readable stream interface for PostgreSQL query results, enabling memory-efficient processing of large result sets through standard stream APIs: rows are fetched from the server in batches with a cursor rather than buffered in memory all at once.
Creates a readable stream for PostgreSQL query results.
/**
* Create a new query stream
* @param text - SQL query text
* @param values - Optional parameter values
* @param config - Optional stream configuration
*/
class QueryStream extends Readable {
constructor(text: string, values?: any[], config?: QueryStreamConfig);
}
interface QueryStreamConfig {
/** Number of rows to fetch per batch (overrides highWaterMark) */
batchSize?: number;
/** Stream's high water mark (default: 100) */
highWaterMark?: number;
/** Return rows as arrays instead of objects */
rowMode?: 'array';
/** Custom type parser object */
types?: any;
}
Usage Examples:
const QueryStream = require('pg-query-stream');
const { Client } = require('pg');
const client = new Client();
await client.connect();
// Basic query stream
const stream1 = new QueryStream('SELECT * FROM users');
// Parameterized query stream
const stream2 = new QueryStream('SELECT * FROM orders WHERE user_id = $1', [123]);
// Configured stream
const stream3 = new QueryStream('SELECT * FROM large_table', [], {
batchSize: 1000,
rowMode: 'array'
});
// Execute stream
const queryStream = client.query(stream1);
Process query results using standard Node.js stream APIs.
// Standard Readable stream events
stream.on('data', (row) => {}); // Each row as it arrives
stream.on('end', () => {}); // All rows processed
stream.on('error', (err) => {}); // Stream error occurred
stream.on('close', () => {}); // Stream closed
stream.on('readable', () => {}); // Data available to read
// Stream control methods
stream.read(size?: number): any | null;
stream.pipe(destination: Writable): Writable;
stream.destroy(error?: Error): void;
stream.pause(): void;
stream.resume(): void;
Usage Examples:
// Event-driven processing
const stream = client.query(new QueryStream('SELECT * FROM users'));
stream.on('data', (row) => {
console.log('User:', row.name, row.email);
});
stream.on('end', () => {
console.log('Finished processing all users');
});
stream.on('error', (err) => {
console.error('Query error:', err);
});
// Manual reading
stream.on('readable', () => {
let row;
while ((row = stream.read()) !== null) {
processRow(row);
}
}
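When rows are consumed through 'data' events, pause() and resume() give manual backpressure control for per-row asynchronous work. A minimal sketch, assuming a saveRow helper of your own (not part of pg-query-stream):
const stream = client.query(new QueryStream('SELECT * FROM users'));
stream.on('data', (row) => {
  stream.pause();                 // Stop the flow while this row is handled
  saveRow(row)                    // Hypothetical async handler
    .catch((err) => console.error('Row failed:', err))
    .finally(() => stream.resume()); // Ask for the next row
});
stream.on('end', () => console.log('All rows handled'));
Async iteration, shown next, handles this backpressure automatically and usually reads more clearly.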
Use modern async iteration for clean streaming code (Node.js ≥10).
// Async iteration
const stream = client.query(new QueryStream('SELECT * FROM products'));
for await (const row of stream) {
console.log('Product:', row.name, row.price);
// Can break early if needed; exiting the loop destroys the stream
// (and closes the underlying cursor) via the async iterator
if (row.discontinued) break;
}
console.log('Done processing products');
Usage Examples:
// ETL processing with async iteration
async function processLargeDataset() {
const stream = client.query(new QueryStream(
'SELECT * FROM raw_data WHERE processed = false ORDER BY created_at'
));
let processedCount = 0;
try {
for await (const row of stream) {
// Transform data
const transformed = transformData(row);
// Load to destination
await insertIntoDestination(transformed);
// Mark as processed, using a second connection (writerClient, see the
// sketch after this example): queries issued on the client that is
// streaming queue behind the open cursor until the stream finishes
await writerClient.query('UPDATE raw_data SET processed = true WHERE id = $1', [row.id]);
processedCount++;
if (processedCount % 1000 === 0) {
console.log(`Processed ${processedCount} records...`);
}
}
} catch (err) {
console.error('Processing failed:', err);
}
console.log(`Total processed: ${processedCount} records`);
}
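The UPDATE above goes through a separate connection because queries submitted to the client that is streaming are queued behind the open cursor until the stream finishes. A minimal sketch of that two-connection setup using a pool; the readerClient and writerClient names are illustrative, not part of the library:
const { Pool } = require('pg');
const pool = new Pool();

const readerClient = await pool.connect(); // Streams the rows
const writerClient = await pool.connect(); // Runs the per-row writes
try {
  const stream = readerClient.query(
    new QueryStream('SELECT id FROM raw_data WHERE processed = false')
  );
  for await (const row of stream) {
    await writerClient.query(
      'UPDATE raw_data SET processed = true WHERE id = $1',
      [row.id]
    );
  }
} finally {
  readerClient.release();
  writerClient.release();
}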
Pipe query results to other streams for processing.
const fs = require('fs');
const { Transform } = require('stream');
// Create transform stream
const csvTransform = new Transform({
objectMode: true,
transform(row, encoding, callback) {
const csvLine = `${row.id},${row.name},${row.email}\n`;
callback(null, csvLine);
}
});
// Pipe query to CSV file
const stream = client.query(new QueryStream('SELECT id, name, email FROM users'));
const writeStream = fs.createWriteStream('users.csv');
// Write CSV header
writeStream.write('id,name,email\n');
// Pipe data
stream
.pipe(csvTransform)
.pipe(writeStream);
// Handle completion
writeStream.on('finish', () => {
console.log('CSV export completed');
});
Pipeline Processing: unlike chained .pipe() calls, stream.pipeline forwards errors from any stage and destroys all of the streams on failure.
const { pipeline } = require('stream');
const { promisify } = require('util');
const pipelineAsync = promisify(pipeline);
// Create processing pipeline
async function exportToCSV() {
const queryStream = client.query(new QueryStream('SELECT * FROM orders'));
const transformStream = new Transform({
objectMode: true,
transform(order, encoding, callback) {
const csvRow = `${order.id},${order.customer_id},${order.total},${order.created_at}\n`;
callback(null, csvRow);
}
});
const writeStream = fs.createWriteStream('orders.csv');
writeStream.write('id,customer_id,total,created_date\n');
try {
await pipelineAsync(queryStream, transformStream, writeStream);
console.log('Export completed successfully');
} catch (err) {
console.error('Export failed:', err);
}
}
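On Node.js 15 and later, a promise-returning pipeline is available directly from 'stream/promises', so the promisify step above is unnecessary. A minimal sketch, assuming the same queryStream, transformStream, and writeStream as in exportToCSV:
const { pipeline } = require('stream/promises');

// Await the pipeline directly; it rejects if any stage fails
await pipeline(queryStream, transformStream, writeStream);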
Control memory usage and performance characteristics.
// Large batch size for high-throughput scenarios
const fastStream = new QueryStream('SELECT * FROM logs', [], {
batchSize: 5000 // Fetch 5000 rows at a time
});
// Small batch size for real-time processing
const realtimeStream = new QueryStream('SELECT * FROM events', [], {
batchSize: 10 // Process rows as they arrive
});
// Direct high water mark control
const customStream = new QueryStream('SELECT * FROM data', [], {
highWaterMark: 50 // Buffer up to 50 rows
});
Control how rows are returned from the stream. Array mode skips building an object per row, which can reduce overhead on very large result sets.
// Object mode (default)
const objectStream = client.query(new QueryStream('SELECT id, name FROM users'));
// Emits: { id: 1, name: 'Alice' }
// Array mode
const arrayStream = client.query(new QueryStream('SELECT id, name, email FROM users', [], {
rowMode: 'array'
}));
// Emits: [1, 'Alice', 'alice@example.com']
// Array mode with ordered columns
for await (const [id, name, email] of arrayStream) {
console.log(`User ${id}: ${name} (${email})`);
}
Use custom type parsers for specialized data handling.
const customTypes = {
getTypeParser: (oid, format) => {
// Custom JSON parsing
if (oid === 114) { // JSON type
return (val) => {
try {
return JSON.parse(val);
} catch {
return val; // Return as string if parsing fails
}
};
}
// Custom timestamp parsing
if (oid === 1184) { // TIMESTAMPTZ
return (val) => new Date(val);
}
// Default parser
return require('pg').types.getTypeParser(oid, format);
}
};
const stream = new QueryStream('SELECT metadata, created_at FROM events', [], {
types: customTypes
});
Export large datasets without memory overflow.
async function exportUserData() {
const stream = client.query(new QueryStream(
'SELECT * FROM users ORDER BY created_at',
[],
{ batchSize: 2000 }
));
const writeStream = fs.createWriteStream('users_export.json');
writeStream.write('[\n');
let first = true;
for await (const user of stream) {
if (!first) writeStream.write(',\n');
writeStream.write(JSON.stringify(user));
first = false;
}
writeStream.write('\n]');
writeStream.end();
console.log('Export completed');
}
Process data in real-time with small batches.
async function processRecentEvents() {
const stream = client.query(new QueryStream(
'SELECT * FROM events WHERE created_at > NOW() - INTERVAL \'1 hour\'',
[],
{ batchSize: 1 } // Process immediately as available
));
for await (const event of stream) {
// Process each event immediately
await handleEvent(event);
// Update progress on a second connection (writerClient, as in the ETL
// example above); queries on the streaming client would queue behind
// the open cursor
await writerClient.query(
'UPDATE event_processing SET last_processed_at = $1',
[event.created_at]
);
}
}
Stream processing for data analysis.
async function analyzeUserBehavior() {
const stream = client.query(new QueryStream(
'SELECT user_id, action, timestamp FROM user_events ORDER BY timestamp'
));
const userSessions = new Map();
for await (const event of stream) {
const userId = event.user_id;
if (!userSessions.has(userId)) {
userSessions.set(userId, {
actions: [],
startTime: event.timestamp,
endTime: event.timestamp
});
}
const session = userSessions.get(userId);
session.actions.push(event.action);
session.endTime = event.timestamp;
// Finalize session if idle too long
const idleTime = Date.now() - new Date(event.timestamp).getTime();
if (idleTime > 30 * 60 * 1000) { // 30 minutes
await finalizeSession(userId, session);
userSessions.delete(userId);
}
}
// Finalize remaining sessions
for (const [userId, session] of userSessions) {
await finalizeSession(userId, session);
}
}
Handle errors with stream events or with try/catch around async iteration.
const stream = client.query(new QueryStream('SELECT * FROM users'));
stream.on('error', (err) => {
console.error('Stream error:', err);
// Connection is still usable after stream error
// Perform cleanup or retry logic here
});
// Promise-based error handling with async iteration
try {
for await (const row of stream) {
await processRow(row);
}
} catch (err) {
console.error('Processing error:', err);
}
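Because the client usually remains usable after a query error, one option is a small retry wrapper around the streaming read. This is a sketch only; the retry policy and the processRow handler are assumptions, not part of pg-query-stream:
async function streamWithRetry(text, values, attempts = 2) {
  for (let attempt = 1; attempt <= attempts; attempt++) {
    try {
      const stream = client.query(new QueryStream(text, values));
      for await (const row of stream) {
        await processRow(row);
      }
      return; // Finished without error
    } catch (err) {
      console.error(`Attempt ${attempt} failed:`, err);
      if (attempt === attempts) throw err; // Give up after the last attempt
    }
  }
}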
Destroy any in-flight stream during shutdown before closing the client.
let currentStream = null;
// Graceful shutdown handler
process.on('SIGTERM', () => {
if (currentStream) {
console.log('Destroying current stream...');
currentStream.destroy();
}
client.end();
});
// Usage with cleanup
async function processData() {
currentStream = client.query(new QueryStream('SELECT * FROM large_table'));
try {
for await (const row of currentStream) {
await processRow(row);
}
} finally {
currentStream = null;
}
}
Batch size trades memory use against throughput.
// Memory-efficient: Small batch sizes
const memoryEfficientStream = new QueryStream('SELECT * FROM huge_table', [], {
batchSize: 100 // Only 100 rows in memory at once
});
// High-throughput: Larger batch sizes
const highThroughputStream = new QueryStream('SELECT * FROM huge_table', [], {
batchSize: 10000 // Better performance, more memory usage
});
Query streams work with pooled connections as well.
const { Pool } = require('pg');
const pool = new Pool();
async function streamProcessing() {
const client = await pool.connect();
try {
const stream = client.query(new QueryStream('SELECT * FROM data'));
for await (const row of stream) {
await processRow(row);
}
} finally {
client.release(); // Return to pool
}
}
Install with Tessl CLI
npx tessl i tessl/npm-pg