# pg-query-stream

Postgres query result returned as a readable stream.
Install via the Tessl CLI:

```sh
npx @tessl/cli install tessl/npm-pg-query-stream@4.10.0
```

pg-query-stream provides memory-efficient streaming of PostgreSQL query results using server-side cursors. It extends the Node.js Readable stream to handle massive result sets without loading all data into memory, making it ideal for ETL operations and processing large tables.
Or install from npm:

```sh
npm install pg-query-stream
```

Requires pg (^8), the PostgreSQL client library.

```js
import QueryStream from "pg-query-stream";
```

For CommonJS:
```js
const QueryStream = require("pg-query-stream");
```

Quick start:

```js
import pg from "pg";
import QueryStream from "pg-query-stream";
const client = new pg.Client();
await client.connect();
// Stream large result set without memory issues
const query = new QueryStream('SELECT * FROM large_table', []);
const stream = client.query(query);
stream.on('data', (row) => {
  console.log(row); // Process each row as it arrives
});

stream.on('end', () => {
  console.log('Stream completed');
  client.end();
});

stream.on('error', (err) => {
  console.error('Stream error:', err);
});
```

pg-query-stream is built on these key components:
- pg-cursor for efficient memory usage
- Readable stream for standard stream operations
- Submittable interface for seamless pg client compatibility

Create a streaming query that reads results in batches using PostgreSQL cursors.
```ts
class QueryStream extends Readable implements Submittable {
  constructor(
    text: string,
    values?: any[],
    config?: QueryStreamConfig
  );
}
```

Configure memory usage and result format for the query stream.
```ts
interface QueryStreamConfig {
  batchSize?: number;      // Number of rows to fetch per batch
  highWaterMark?: number;  // Stream buffer size (defaults to 100)
  rowMode?: 'array';       // Return rows as arrays instead of objects
  types?: any;             // Custom type parsers for result columns
}
```

Submit the query to a PostgreSQL connection for execution.
```ts
submit(connection: Connection): void;
```
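You rarely call submit() yourself: pg's client.query() detects an object with a .submit method (the Submittable contract) and returns that object instead of a Promise. A minimal sketch of what that looks like:

```js
import pg from "pg";
import QueryStream from "pg-query-stream";

const client = new pg.Client();
await client.connect();

const query = new QueryStream('SELECT 1 AS one', []);
// client.query() sees query.submit and hands back the same instance
const stream = client.query(query);
console.log(stream === query); // true

stream.on('end', () => client.end());
stream.resume(); // switch to flowing mode and drain the single row
```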
Access underlying cursor and result metadata.

```ts
cursor: any;    // The underlying pg-cursor instance
_result: any;   // Query result metadata from pg-cursor
```
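Both are internal handles rather than stable API. A hedged sketch that inspects column metadata once the first row arrives, assuming _result is the pg Result object that pg-cursor populates, with a fields array of column descriptors:

```js
stream.once('data', () => {
  // Column descriptors carry name, dataTypeID, etc.
  console.log(stream._result.fields.map((f) => f.name));
});
```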
Streaming from a pooled connection:

```js
import pg from "pg";
import QueryStream from "pg-query-stream";

const pool = new pg.Pool();
pool.connect((err, client, done) => {
  if (err) throw err;

  const query = new QueryStream('SELECT * FROM users', []);
  const stream = client.query(query);

  stream.on('data', (row) => {
    // Process each user row
    console.log(`User: ${row.name}, Email: ${row.email}`);
  });
  stream.on('end', done);
  stream.on('error', (err) => {
    done(err); // remove the failed client from the pool
    console.error('Stream error:', err);
  });
});
```
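If you prefer promises, pool.connect() also resolves to a client, whose release() plays the role of done. A sketch of the same query in async/await style:

```js
import pg from "pg";
import QueryStream from "pg-query-stream";

const pool = new pg.Pool();
const client = await pool.connect();
try {
  const stream = client.query(new QueryStream('SELECT * FROM users', []));
  for await (const row of stream) {
    console.log(`User: ${row.name}, Email: ${row.email}`);
  }
} finally {
  client.release(); // always return the client to the pool
}
```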
Configure batching and row format:

```js
const query = new QueryStream(
  'SELECT id, data FROM large_table WHERE active = $1',
  [true],
  {
    batchSize: 50,      // Fetch 50 rows at a time
    highWaterMark: 25,  // Buffer up to 25 rows
    rowMode: 'array'    // Return [id, data] instead of {id, data}
  }
);
const stream = client.query(query);
stream.on('data', (row) => {
  const [id, data] = row; // Array destructuring
  console.log(`ID: ${id}, Data: ${data}`);
});
```

Because QueryStream is a standard Readable, you can also consume it with async iteration:

```js
const query = new QueryStream('SELECT * FROM products', []);
const stream = client.query(query);
for await (const row of stream) {
  // Process each product
  console.log(`Product: ${row.name}, Price: ${row.price}`);
}
```
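If the loop exits early (break, return, or throw), Node's async iterator destroys the stream, which closes the server-side cursor. A sketch, on a fresh stream, that stops after ten rows:

```js
let count = 0;
for await (const row of stream) {
  console.log(row);
  if (++count >= 10) break; // destroys the stream and closes the cursor
}
```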
Piping into a Transform stream:

```js
import { Transform } from 'stream';

const query = new QueryStream('SELECT * FROM orders', []);
const stream = client.query(query);
const transform = new Transform({
  objectMode: true,
  transform(row, encoding, callback) {
    // Transform each order
    const transformed = {
      orderId: row.id,
      total: parseFloat(row.total),
      date: new Date(row.created_at)
    };
    callback(null, transformed);
  }
});
stream.pipe(transform).on('data', (order) => {
  console.log('Processed order:', order);
});
```
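Note that pipe() does not propagate errors from the source into the destination. Node's stream.pipeline (here the promise variant) surfaces a failure at any stage in one place; a sketch reusing the transform above:

```js
import { pipeline } from 'stream/promises';

try {
  await pipeline(
    client.query(new QueryStream('SELECT * FROM orders', [])),
    transform,
    async (source) => {
      for await (const order of source) {
        console.log('Processed order:', order);
      }
    }
  );
} catch (err) {
  console.error('Pipeline failed:', err);
}
```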
Custom type parsers can be supplied via the types option:

```ts
const customTypes = {
  getTypeParser: (oid: number) => {
    // Custom parser for specific PostgreSQL types
    if (oid === 1184) { // timestamptz
      return (value: string) => new Date(value);
    }
    return (value: string) => value;
  }
};

const query = new QueryStream(
  'SELECT created_at, data FROM events',
  [],
  { types: customTypes }
);
```
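Returning the identity parser for every other oid, as above, disables pg's default parsing. The pg-types package (which pg itself depends on) exports getTypeParser; delegating to it keeps the defaults intact. A sketch:

```ts
import { getTypeParser } from 'pg-types';

const customTypes = {
  getTypeParser: (oid: number, format?: string) => {
    if (oid === 1184) { // timestamptz
      return (value: string) => new Date(value);
    }
    return getTypeParser(oid, format); // fall back to pg defaults
  }
};
```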
pg-query-stream emits all standard Node.js Readable stream events: 'data', 'end', 'error', 'close', and 'readable'.

The stream maintains a small memory footprint by fetching rows in cursor batches of batchSize and buffering at most highWaterMark rows internally; when the consumer slows down, reads pause and backpressure propagates to the server-side cursor.

Handle query errors on the stream and connection errors on the client:
```js
const query = new QueryStream('SELECT * FROM table', []);
const stream = client.query(query);
stream.on('error', (err) => {
  console.error('Query error:', err.message);
  // Stream automatically destroys itself on error
});

// Handle connection errors separately
client.on('error', (err) => {
  console.error('Client error:', err.message);
});
```