tessl/npm-pg-query-stream

Postgres query result returned as readable stream

pg-query-stream

pg-query-stream provides memory-efficient streaming of PostgreSQL query results using server-side cursors. It extends Node.js Readable streams to handle massive result sets without loading all data into memory, making it ideal for ETL operations and processing large tables.

Package Information

  • Package Name: pg-query-stream
  • Package Type: npm
  • Language: TypeScript
  • Installation: npm install pg-query-stream
  • Peer Dependency: Requires pg (^8) - the PostgreSQL client library

Core Imports

import QueryStream from "pg-query-stream";

For CommonJS:

const QueryStream = require("pg-query-stream");

Basic Usage

import pg from "pg";
import QueryStream from "pg-query-stream";

const client = new pg.Client();
await client.connect();

// Stream large result set without memory issues
const query = new QueryStream('SELECT * FROM large_table', []);
const stream = client.query(query);

stream.on('data', (row) => {
  console.log(row); // Process each row as it arrives
});

stream.on('end', () => {
  console.log('Stream completed');
  client.end();
});

stream.on('error', (err) => {
  console.error('Stream error:', err);
});

Architecture

pg-query-stream is built on these key components:

  • Cursor-based Streaming: Uses PostgreSQL cursors via pg-cursor for efficient memory usage
  • Stream Integration: Extends Node.js Readable stream for standard stream operations
  • pg Client Integration: Implements Submittable interface for seamless pg client compatibility
  • Configurable Batching: Controls memory usage through batch size and high water mark settings
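The Submittable integration amounts to a simple handshake: client.query() calls submit() on the object it is given and returns that same object, which is why the stream can be consumed directly. A minimal sketch of that handshake using hypothetical mock objects (not the real pg internals):

```javascript
// Hypothetical mocks sketching the Submittable handshake; the real pg
// Connection and QueryStream are considerably more involved.
const mockConnection = {
  sent: [],
  send(text) { this.sent.push(text); }
};

const mockClient = {
  // pg's client.query(submittable) calls submittable.submit(connection)
  // and returns the submittable itself rather than a promise.
  query(submittable) {
    submittable.submit(mockConnection);
    return submittable;
  }
};

const fakeQueryStream = {
  submitted: false,
  submit(connection) {
    this.submitted = true;
    connection.send('cursor setup'); // illustrative only
  }
};

const returned = mockClient.query(fakeQueryStream);
console.log(returned === fakeQueryStream, fakeQueryStream.submitted); // true true
```

Because the same object goes in and comes out, attaching 'data' handlers to the return value of client.query() works exactly as in the examples below.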

Capabilities

Query Stream Creation

Create a streaming query that reads results in batches using PostgreSQL cursors.

class QueryStream extends Readable implements Submittable {
  constructor(
    text: string,
    values?: any[],
    config?: QueryStreamConfig
  );
}

Stream Configuration

Configure memory usage and result format for the query stream.

interface QueryStreamConfig {
  batchSize?: number;        // Number of rows to fetch per batch
  highWaterMark?: number;    // Stream buffer size (defaults to 100)
  rowMode?: 'array';         // Return rows as arrays instead of objects
  types?: any;               // Custom type parsers for result columns
}

Database Integration

Submit the query to a PostgreSQL connection for execution. The pg client calls this internally when the stream is passed to client.query(); application code rarely invokes it directly.

submit(connection: Connection): void;

Stream Properties

Access the underlying cursor and result metadata. Underscore-prefixed fields are internal and may change between releases.

cursor: any;      // The underlying pg-cursor instance
_result: any;     // Query result metadata from pg-cursor (internal)

Usage Examples

Basic Streaming

import pg from "pg";
import QueryStream from "pg-query-stream";

const pool = new pg.Pool();

pool.connect((err, client, done) => {
  if (err) throw err;

  const query = new QueryStream('SELECT * FROM users', []);
  const stream = client.query(query);

  stream.on('data', (row) => {
    // Process each user row
    console.log(`User: ${row.name}, Email: ${row.email}`);
  });

  stream.on('end', () => done());
  stream.on('error', (err) => {
    // Pass the error to done() so the pool discards this client
    // instead of returning it in an unknown state
    done(err);
    console.error('Stream error:', err);
  });
});

With Configuration Options

const query = new QueryStream(
  'SELECT id, data FROM large_table WHERE active = $1',
  [true],
  {
    batchSize: 50,        // Fetch 50 rows at a time
    highWaterMark: 25,    // Buffer up to 25 rows
    rowMode: 'array'      // Return [id, data] instead of {id, data}
  }
);

const stream = client.query(query);
stream.on('data', (row) => {
  const [id, data] = row; // Array destructuring
  console.log(`ID: ${id}, Data: ${data}`);
});

Async Iteration (Node.js 10+)

const query = new QueryStream('SELECT * FROM products', []);
const stream = client.query(query);

for await (const row of stream) {
  // Process each product
  console.log(`Product: ${row.name}, Price: ${row.price}`);
}
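The for await loop above works over any object-mode Readable, not just a query stream. A self-contained sketch using Readable.from as a stand-in for client.query(new QueryStream(...)), runnable without a database:

```javascript
import { Readable } from "stream";

// Readable.from stands in for the query stream here; both are
// object-mode Readables, so for await behaves identically.
const stream = Readable.from([
  { name: "widget", price: 9.99 },
  { name: "gadget", price: 19.5 },
]);

const names = [];
for await (const row of stream) {
  names.push(row.name); // rows arrive one at a time, in order
}
console.log(names); // [ 'widget', 'gadget' ]
```

If the loop body throws, iteration stops and the stream is destroyed, which in the real case closes the underlying cursor.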

Piping to Transform Streams

import { Transform } from 'stream';

const query = new QueryStream('SELECT * FROM orders', []);
const stream = client.query(query);

const transform = new Transform({
  objectMode: true,
  transform(row, encoding, callback) {
    // Transform each order
    const transformed = {
      orderId: row.id,
      total: parseFloat(row.total),
      date: new Date(row.created_at)
    };
    callback(null, transformed);
  }
});

// Note: pipe() does not forward errors between streams; attach 'error'
// handlers to both streams (or use stream.pipeline) in production code.
stream.pipe(transform).on('data', (order) => {
  console.log('Processed order:', order);
});

Custom Type Parsing

const customTypes = {
  getTypeParser: (oid: number) => {
    // Custom parser for specific PostgreSQL types
    if (oid === 1184) { // timestamptz
      return (value: string) => new Date(value);
    }
    return (value: string) => value;
  }
};

const query = new QueryStream(
  'SELECT created_at, data FROM events',
  [],
  { types: customTypes }
);
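The parsers above are plain functions, so the dispatch logic can be exercised directly without a database connection. A stand-alone sketch of the same customTypes object (OID 25 is PostgreSQL's text type):

```javascript
// Stand-alone version of the customTypes object above; parsers are
// plain value transformers, testable in isolation.
const customTypes = {
  getTypeParser(oid) {
    if (oid === 1184) { // timestamptz
      return (value) => new Date(value);
    }
    return (value) => value; // fall back to the raw text value
  },
};

const parseTimestamptz = customTypes.getTypeParser(1184);
const d = parseTimestamptz("2024-06-01T12:00:00Z");
console.log(d instanceof Date, d.getUTCFullYear()); // true 2024

const parseText = customTypes.getTypeParser(25);
console.log(parseText("hello")); // hello
```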

Stream Events

pg-query-stream emits all standard Node.js Readable stream events:

  • 'data': Emitted for each row returned from the query
  • 'end': Emitted when all rows have been read
  • 'error': Emitted when a query or cursor error occurs
  • 'close': Emitted when the stream is destroyed
  • 'readable': Emitted when data is available to read

Memory Management

The stream maintains a small memory footprint by:

  • Cursor-based fetching: Only keeps a small batch of rows in memory
  • Configurable batch size: Control how many rows are fetched at once
  • Stream backpressure: Automatically pauses fetching when downstream is slow
  • Automatic cleanup: Cursor is closed when stream ends or is destroyed
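The backpressure mechanics come from Node's Readable itself: push() returns false once the internal buffer reaches highWaterMark, which is the producer's signal to stop fetching. A self-contained sketch with a plain object-mode Readable standing in for the query stream (no database involved):

```javascript
import { Readable } from "stream";

// A plain object-mode Readable with a small buffer, standing in for
// the query stream's internal buffering.
const stream = new Readable({
  objectMode: true,
  highWaterMark: 3, // buffer at most 3 rows before signalling backpressure
  read() {},        // no-op; rows are pushed manually below
});

let pushed = 0;
let accepting = true;
while (accepting && pushed < 10) {
  pushed++;
  // push() returns false once the buffer holds highWaterMark items,
  // telling the producer (the cursor, in pg-query-stream) to pause.
  accepting = stream.push({ row: pushed });
}
console.log(pushed); // 3
```

Once a consumer drains the buffer, the stream calls read() again and fetching resumes; pg-query-stream issues its next cursor fetch at that point.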

Error Handling

const query = new QueryStream('SELECT * FROM table', []);
const stream = client.query(query);

stream.on('error', (err) => {
  console.error('Query error:', err.message);
  // Stream automatically destroys itself on error
});

// Handle connection errors separately
client.on('error', (err) => {
  console.error('Client error:', err.message);
});

Performance Considerations

  • Batch Size: Larger batches reduce network roundtrips but use more memory
  • High Water Mark: Controls internal buffering; tune based on processing speed
  • Row Mode: Array mode is slightly faster than object mode for simple data
  • Cursor Overhead: Small performance cost compared to loading all results in memory
  • Network Latency: Batch size becomes more important with high network latency
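The batch-size tradeoff is simple arithmetic: each batch costs roughly one cursor fetch roundtrip, so roundtrips scale with rows divided by batch size. A rough sketch:

```javascript
// Rough roundtrip count for a cursor-based read: about one fetch per batch.
const totalRows = 1_000_000;
const roundtrips = (batchSize) => Math.ceil(totalRows / batchSize);

console.log(roundtrips(100));   // 10000
console.log(roundtrips(1000));  // 1000
console.log(roundtrips(10000)); // 100
```

On a high-latency link each roundtrip adds fixed delay, so raising batchSize trades memory for wall-clock time; measure with your own row sizes before tuning.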

Limitations

  • JavaScript Client Only: Does not work with the native libpq bindings (pg-native)
  • Read-Only: Stream is read-only; no updates through the stream
  • Single Query: Each stream handles one query; create new instances for multiple queries
  • Connection Binding: Stream must complete before connection can be used for other queries
Describes: pkg:npm/pg-query-stream@4.10.x