tessl/npm-pg-query-stream

Postgres query result returned as readable stream

Workspace: tessl
Visibility: Public
Describes: npmpkg:npm/pg-query-stream@4.10.x

To install, run

npx @tessl/cli install tessl/npm-pg-query-stream@4.10.0


pg-query-stream

pg-query-stream provides memory-efficient streaming of PostgreSQL query results using server-side cursors. It extends Node.js Readable streams to handle massive result sets without loading all data into memory, making it ideal for ETL operations and processing large tables.

Package Information

  • Package Name: pg-query-stream
  • Package Type: npm
  • Language: TypeScript
  • Installation: npm install pg-query-stream
  • Peer Dependency: Requires pg (^8) - the PostgreSQL client library

Core Imports

import QueryStream from "pg-query-stream";

For CommonJS:

const QueryStream = require("pg-query-stream");

Basic Usage

import pg from "pg";
import QueryStream from "pg-query-stream";

const client = new pg.Client();
await client.connect();

// Stream large result set without memory issues
const query = new QueryStream('SELECT * FROM large_table', []);
const stream = client.query(query);

stream.on('data', (row) => {
  console.log(row); // Process each row as it arrives
});

stream.on('end', () => {
  console.log('Stream completed');
  client.end();
});

stream.on('error', (err) => {
  console.error('Stream error:', err);
});

Architecture

pg-query-stream is built on these key components (a simplified sketch of how they fit together follows the list):

  • Cursor-based Streaming: Uses PostgreSQL cursors via pg-cursor for efficient memory usage
  • Stream Integration: Extends Node.js Readable stream for standard stream operations
  • pg Client Integration: Implements Submittable interface for seamless pg client compatibility
  • Configurable Batching: Controls memory usage through batch size and high water mark settings
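
The components above combine roughly as follows. This is a simplified sketch of the idea, not the package's actual source; it assumes pg-cursor's submit(connection) and read(rowCount, callback) methods and omits details such as close handling.

import { Readable } from "stream";
import Cursor from "pg-cursor";

// Simplified sketch: a Readable whose _read pulls the next batch from a PostgreSQL cursor
class SimplifiedQueryStream extends Readable {
  cursor: any;

  constructor(text: string, values?: any[], config: { batchSize?: number; highWaterMark?: number } = {}) {
    const { batchSize, highWaterMark = 100 } = config;
    // objectMode: each chunk is one row; highWaterMark bounds how many rows are buffered
    super({ objectMode: true, highWaterMark: batchSize || highWaterMark });
    this.cursor = new Cursor(text, values);
  }

  // pg's client.query(query) calls this with the live connection (Submittable interface)
  submit(connection: any) {
    this.cursor.submit(connection);
  }

  // Node calls _read when the internal buffer drains; fetch the next batch from the cursor
  _read(size: number) {
    this.cursor.read(size, (err: any, rows: any[]) => {
      if (err) return this.destroy(err);
      for (const row of rows) this.push(row);
      if (rows.length < size) this.push(null); // a short batch means the cursor is exhausted
    });
  }
}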

Capabilities

Query Stream Creation

Create a streaming query that reads results in batches using PostgreSQL cursors.

class QueryStream extends Readable implements Submittable {
  constructor(
    text: string,
    values?: any[],
    config?: QueryStreamConfig
  );
}
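
For example, a parameterized query with an explicit batch size (the table and column names are illustrative):

const query = new QueryStream(
  'SELECT id, payload FROM events WHERE created_at > $1',
  [new Date('2024-01-01')],
  { batchSize: 100 }
);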

Stream Configuration

Configure memory usage and result format for the query stream.

interface QueryStreamConfig {
  batchSize?: number;        // Number of rows to fetch per batch
  highWaterMark?: number;    // Stream buffer size (defaults to 100)
  rowMode?: 'array';         // Return rows as arrays instead of objects
  types?: any;               // Custom type parsers for result columns
}

Database Integration

Submit the query to a PostgreSQL connection for execution.

submit(connection: Connection): void;
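
You rarely call submit directly: pg's client.query() detects a Submittable (any object with a submit method), invokes submit with the active connection, and returns the same object, so the usual pattern is simply:

// client.query() calls query.submit(connection) internally and returns the stream
const stream = client.query(new QueryStream('SELECT 1 AS one', []));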

Stream Properties

Access underlying cursor and result metadata.

cursor: any;      // The underlying pg-cursor instance
_result: any;     // Query result metadata from pg-cursor

Usage Examples

Basic Streaming

import pg from "pg";
import QueryStream from "pg-query-stream";

const pool = new pg.Pool();

pool.connect((err, client, done) => {
  if (err) throw err;
  
  const query = new QueryStream('SELECT * FROM users', []);
  const stream = client.query(query);
  
  stream.on('data', (row) => {
    // Process each user row
    console.log(`User: ${row.name}, Email: ${row.email}`);
  });
  
  stream.on('end', done);
  stream.on('error', (err) => {
    console.error('Stream error:', err);
    done(err); // pass the error so the pool discards this client
  });
});

With Configuration Options

const query = new QueryStream(
  'SELECT id, data FROM large_table WHERE active = $1',
  [true],
  {
    batchSize: 50,        // Fetch 50 rows at a time
    highWaterMark: 25,    // Buffer up to 25 rows
    rowMode: 'array'      // Return [id, data] instead of {id, data}
  }
);

const stream = client.query(query);
stream.on('data', (row) => {
  const [id, data] = row; // Array destructuring
  console.log(`ID: ${id}, Data: ${data}`);
});

Async Iteration (Node.js 10+)

const query = new QueryStream('SELECT * FROM products', []);
const stream = client.query(query);

for await (const row of stream) {
  // Process each product
  console.log(`Product: ${row.name}, Price: ${row.price}`);
}
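
When the client comes from a pool, wrapping the loop in try/finally ensures the client is released even if iteration throws. A sketch using pg's promise-based pool API:

import pg from "pg";
import QueryStream from "pg-query-stream";

const pool = new pg.Pool();
const client = await pool.connect();

try {
  const stream = client.query(new QueryStream('SELECT * FROM products', []));
  for await (const row of stream) {
    console.log(`Product: ${row.name}, Price: ${row.price}`);
  }
} finally {
  client.release(); // always return the client to the pool
}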

Piping to Transform Streams

import { Transform } from 'stream';

const query = new QueryStream('SELECT * FROM orders', []);
const stream = client.query(query);

const transform = new Transform({
  objectMode: true,
  transform(row, encoding, callback) {
    // Transform each order
    const transformed = {
      orderId: row.id,
      total: parseFloat(row.total),
      date: new Date(row.created_at)
    };
    callback(null, transformed);
  }
});

stream.pipe(transform).on('data', (order) => {
  console.log('Processed order:', order);
});

Custom Type Parsing

import pg from "pg";

const customTypes = {
  getTypeParser: (oid: number) => {
    // Custom parser for specific PostgreSQL types
    if (oid === 1184) { // timestamptz
      return (value: string) => new Date(value);
    }
    // Fall back to pg's built-in parsers so other types keep their default conversion
    return pg.types.getTypeParser(oid);
  }
};

const query = new QueryStream(
  'SELECT created_at, data FROM events',
  [],
  { types: customTypes }
);

Stream Events

pg-query-stream emits all standard Node.js Readable stream events:

  • 'data': Emitted for each row returned from the query
  • 'end': Emitted when all rows have been read
  • 'error': Emitted when a query or cursor error occurs
  • 'close': Emitted when the stream is destroyed
  • 'readable': Emitted when data is available to read

Memory Management

The stream maintains a small memory footprint by:

  • Cursor-based fetching: Only keeps a small batch of rows in memory
  • Configurable batch size: Control how many rows are fetched at once
  • Stream backpressure: Automatically pauses fetching when the downstream consumer is slow (see the sketch after this list)
  • Automatic cleanup: Cursor is closed when stream ends or is destroyed
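
A quick way to observe backpressure is to pipe the query stream into a deliberately slow Writable: the cursor only fetches more rows as the consumer drains. The slow writer below is a contrived sketch, and client is assumed to be a connected pg client as in the earlier examples.

import { Writable } from "stream";
import { pipeline } from "stream/promises";

const slowWriter = new Writable({
  objectMode: true,
  highWaterMark: 10,
  write(row, _encoding, callback) {
    // Simulate slow downstream processing; while this buffer is full,
    // the query stream stops asking the cursor for more rows
    setTimeout(callback, 10);
  }
});

const query = new QueryStream('SELECT * FROM large_table', [], { batchSize: 100 });
await pipeline(client.query(query), slowWriter);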

Error Handling

const query = new QueryStream('SELECT * FROM table', []);
const stream = client.query(query);

stream.on('error', (err) => {
  console.error('Query error:', err.message);
  // Stream automatically destroys itself on error
});

// Handle connection errors separately
client.on('error', (err) => {
  console.error('Client error:', err.message);
});
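
If the client was checked out from a pool (as in the earlier examples), make sure the error path releases it too; passing the error to release() tells the pool to discard that client instead of reusing it. A minimal sketch:

const client = await pool.connect();
const stream = client.query(new QueryStream('SELECT * FROM table', []));

stream.on('error', (err) => {
  console.error('Query error:', err.message);
  client.release(err); // discard this client instead of returning it to the pool
});

stream.on('end', () => client.release());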

Performance Considerations

  • Batch Size: Larger batches reduce network roundtrips but use more memory
  • High Water Mark: Controls internal buffering; tune based on processing speed
  • Row Mode: Array mode is slightly faster than object mode for simple data
  • Cursor Overhead: Small performance cost compared to loading all results in memory
  • Network Latency: Batch size becomes more important with high network latency

Limitations

  • JavaScript Client Only: Does not work with native libpq bindings
  • Read-Only: Stream is read-only; no updates through the stream
  • Single Query: Each stream handles one query; create new instances for multiple queries
  • Connection Binding: The stream must complete before the connection can be used for other queries (see the sketch below)
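
The last point means streaming queries on the same client run strictly one after another. A sketch with two back-to-back streams (table names are illustrative):

// Finish consuming the first stream before issuing the next query on the same client
const orders = client.query(new QueryStream('SELECT * FROM orders', []));
for await (const order of orders) {
  // process each order
}

// The connection is free again once the first stream has ended
const invoices = client.query(new QueryStream('SELECT * FROM invoices', []));
for await (const invoice of invoices) {
  // process each invoice
}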