tessl/npm-pg

PostgreSQL client library for Node.js with both pure JavaScript and optional native libpq bindings

Query Streaming

Node.js Readable stream interface for PostgreSQL query results, enabling memory-efficient processing of large result sets through standard stream APIs.
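
A minimal end-to-end sketch (assuming a reachable database configured through the standard PG* environment variables and a users table): construct a QueryStream, submit it with client.query(), then consume it like any other Readable.

const QueryStream = require('pg-query-stream');
const { Client } = require('pg');

async function main() {
  const client = new Client(); // connection settings come from PG* environment variables
  await client.connect();

  // client.query() returns the same stream once it has been submitted
  const stream = client.query(new QueryStream('SELECT * FROM users'));

  for await (const row of stream) {
    console.log(row); // rows arrive incrementally; the full result set is never buffered
  }

  await client.end();
}

main().catch(console.error);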

Capabilities

QueryStream Constructor

Creates a readable stream for PostgreSQL query results.

/**
 * Create a new query stream
 * @param text - SQL query text
 * @param values - Optional parameter values
 * @param config - Optional stream configuration
 */
class QueryStream extends Readable {
  constructor(text: string, values?: any[], config?: QueryStreamConfig);
}

interface QueryStreamConfig {
  /** Number of rows to fetch per batch (overrides highWaterMark) */
  batchSize?: number;
  /** Stream's high water mark (default: 100) */
  highWaterMark?: number;
  /** Return rows as arrays instead of objects */
  rowMode?: 'array';
  /** Custom type parser object */
  types?: any;
}

Usage Examples:

const QueryStream = require('pg-query-stream');
const { Client } = require('pg');

const client = new Client();
await client.connect();

// Basic query stream
const stream1 = new QueryStream('SELECT * FROM users');

// Parameterized query stream
const stream2 = new QueryStream('SELECT * FROM orders WHERE user_id = $1', [123]);

// Configured stream
const stream3 = new QueryStream('SELECT * FROM large_table', [], {
  batchSize: 1000,
  rowMode: 'array'
});

// Execute stream
const queryStream = client.query(stream1);

Stream Processing

Process query results using standard Node.js stream APIs.

// Standard Readable stream events
stream.on('data', (row) => {});     // Each row as it arrives
stream.on('end', () => {});         // All rows processed
stream.on('error', (err) => {});    // Stream error occurred
stream.on('close', () => {});       // Stream closed
stream.on('readable', () => {});    // Data available to read

// Stream control methods
stream.read(size?: number): any | null;
stream.pipe(destination: Writable): Writable;
stream.destroy(error?: Error): void;
stream.pause(): void;
stream.resume(): void;

Usage Examples:

// Event-driven processing
const stream = client.query(new QueryStream('SELECT * FROM users'));

stream.on('data', (row) => {
  console.log('User:', row.name, row.email);
});

stream.on('end', () => {
  console.log('Finished processing all users');
});

stream.on('error', (err) => {
  console.error('Query error:', err);
});

// Manual reading
stream.on('readable', () => {
  let row;
  while ((row = stream.read()) !== null) {
    processRow(row);
  }
});
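
The pause() and resume() methods listed above allow manual backpressure when row handling is asynchronous; a small sketch (handleRow is a hypothetical async handler, not part of the library):

// Pause while an async operation is in flight, resume when it settles
const pausableStream = client.query(new QueryStream('SELECT * FROM users'));

pausableStream.on('data', (row) => {
  pausableStream.pause();                         // stop emitting rows while we work
  handleRow(row)                                  // hypothetical async handler
    .then(() => pausableStream.resume())          // ask for the next rows
    .catch((err) => pausableStream.destroy(err)); // abort the stream on failure
});

pausableStream.on('end', () => console.log('All rows handled'));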

Async Iteration

Use modern async iteration for clean streaming code (Node.js ≥10).

// Async iteration
const stream = client.query(new QueryStream('SELECT * FROM products'));

for await (const row of stream) {
  console.log('Product:', row.name, row.price);
  
  // Can break early if needed
  if (row.discontinued) break;
}

console.log('Done processing products');

Usage Examples:

// ETL processing with async iteration
async function processLargeDataset() {
  const stream = client.query(new QueryStream(
    'SELECT * FROM raw_data WHERE processed = false ORDER BY created_at'
  ));
  
  let processedCount = 0;
  
  try {
    for await (const row of stream) {
      // Transform data
      const transformed = transformData(row);
      
      // Load to destination
      await insertIntoDestination(transformed);
      
      // Mark as processed
      await client.query('UPDATE raw_data SET processed = true WHERE id = $1', [row.id]);
      
      processedCount++;
      
      if (processedCount % 1000 === 0) {
        console.log(`Processed ${processedCount} records...`);
      }
    }
  } catch (err) {
    console.error('Processing failed:', err);
  }
  
  console.log(`Total processed: ${processedCount} records`);
}

Stream Piping

Pipe query results to other streams for processing.

const fs = require('fs');
const { Transform } = require('stream');

// Create transform stream
const csvTransform = new Transform({
  objectMode: true,
  transform(row, encoding, callback) {
    const csvLine = `${row.id},${row.name},${row.email}\n`;
    callback(null, csvLine);
  }
});

// Pipe query to CSV file
const stream = client.query(new QueryStream('SELECT id, name, email FROM users'));
const writeStream = fs.createWriteStream('users.csv');

// Write CSV header
writeStream.write('id,name,email\n');

// Pipe data
stream
  .pipe(csvTransform)
  .pipe(writeStream);

// Handle completion
writeStream.on('finish', () => {
  console.log('CSV export completed');
});

Pipeline Processing:

const { pipeline } = require('stream');
const { promisify } = require('util');
const pipelineAsync = promisify(pipeline);

// Create processing pipeline
async function exportToCSV() {
  const queryStream = client.query(new QueryStream('SELECT * FROM orders'));
  
  const transformStream = new Transform({
    objectMode: true,
    transform(order, encoding, callback) {
      const csvRow = `${order.id},${order.customer_id},${order.total},${order.created_at}\n`;
      callback(null, csvRow);
    }
  });
  
  const writeStream = fs.createWriteStream('orders.csv');
  writeStream.write('id,customer_id,total,created_at\n');
  
  try {
    await pipelineAsync(queryStream, transformStream, writeStream);
    console.log('Export completed successfully');
  } catch (err) {
    console.error('Export failed:', err);
  }
}
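
On Node.js 15 and later the promise-returning pipeline is also available directly from the stream/promises module, which removes the promisify step; a sketch equivalent to the function above (the column selection is illustrative):

const { pipeline } = require('stream/promises');
const { Transform } = require('stream');
const fs = require('fs');

async function exportOrdersCSV(client) {
  const queryStream = client.query(new QueryStream('SELECT id, total FROM orders'));
  const toCsv = new Transform({
    objectMode: true,
    transform(order, _enc, callback) {
      callback(null, `${order.id},${order.total}\n`);
    }
  });

  // pipeline wires up error propagation and cleanup for every stage
  await pipeline(queryStream, toCsv, fs.createWriteStream('orders.csv'));
  console.log('Export completed');
}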

Configuration Options

Batch Size and High Water Mark

Control memory usage and performance characteristics.

// Large batch size for high-throughput scenarios
const fastStream = new QueryStream('SELECT * FROM logs', [], {
  batchSize: 5000  // Fetch 5000 rows at a time
});

// Small batch size for real-time processing
const realtimeStream = new QueryStream('SELECT * FROM events', [], {
  batchSize: 10   // Process rows as they arrive
});

// Direct high water mark control
const customStream = new QueryStream('SELECT * FROM data', [], {
  highWaterMark: 50  // Buffer up to 50 rows
});

Row Mode Configuration

Control how rows are returned from the stream.

// Object mode (default)
const objectStream = new QueryStream('SELECT id, name FROM users');
// Emits: { id: 1, name: 'Alice' }

// Array mode
const arrayStream = new QueryStream('SELECT id, name, email FROM users', [], {
  rowMode: 'array'
});
// Emits: [1, 'Alice', 'alice@example.com']

// Array mode with ordered columns (submit the stream with client.query before iterating)
for await (const [id, name, email] of client.query(arrayStream)) {
  console.log(`User ${id}: ${name} (${email})`);
}

Custom Type Parsing

Use custom type parsers for specialized data handling.

const customTypes = {
  getTypeParser: (oid, format) => {
    // Custom JSON parsing
    if (oid === 114) { // JSON type
      return (val) => {
        try {
          return JSON.parse(val);
        } catch {
          return val; // Return as string if parsing fails
        }
      };
    }
    
    // Custom timestamp parsing
    if (oid === 1184) { // TIMESTAMPTZ
      return (val) => new Date(val);
    }
    
    // Default parser
    return require('pg').types.getTypeParser(oid, format);
  }
};

const stream = new QueryStream('SELECT metadata, created_at FROM events', [], {
  types: customTypes
});

Usage Patterns

Large Data Export

Export large datasets without memory overflow.

async function exportUserData() {
  const stream = client.query(new QueryStream(
    'SELECT * FROM users ORDER BY created_at',
    [],
    { batchSize: 2000 }
  ));
  
  const writeStream = fs.createWriteStream('users_export.json');
  writeStream.write('[\n');
  
  let first = true;
  
  for await (const user of stream) {
    if (!first) writeStream.write(',\n');
    writeStream.write(JSON.stringify(user));
    first = false;
  }
  
  writeStream.write('\n]');
  writeStream.end();
  
  console.log('Export completed');
}

Real-time Processing

Process data in real-time with small batches.

async function processRecentEvents() {
  const stream = client.query(new QueryStream(
    'SELECT * FROM events WHERE created_at > NOW() - INTERVAL \'1 hour\'',
    [],
    { batchSize: 1 }  // Process immediately as available
  ));
  
  for await (const event of stream) {
    // Process each event immediately
    await handleEvent(event);
    
    // Update progress
    await client.query(
      'UPDATE event_processing SET last_processed_at = $1',
      [event.created_at]
    );
  }
}

Aggregation and Analysis

Stream processing for data analysis.

async function analyzeUserBehavior() {
  const stream = client.query(new QueryStream(
    'SELECT user_id, action, timestamp FROM user_events ORDER BY timestamp'
  ));
  
  const userSessions = new Map();
  
  for await (const event of stream) {
    const userId = event.user_id;
    const existing = userSessions.get(userId);

    // Close out the previous session if the gap since its last event exceeds 30 minutes
    if (existing) {
      const idleTime = new Date(event.timestamp) - new Date(existing.endTime);
      if (idleTime > 30 * 60 * 1000) {
        await finalizeSession(userId, existing);
        userSessions.delete(userId);
      }
    }

    if (!userSessions.has(userId)) {
      userSessions.set(userId, {
        actions: [],
        startTime: event.timestamp,
        endTime: event.timestamp
      });
    }

    const session = userSessions.get(userId);
    session.actions.push(event.action);
    session.endTime = event.timestamp;
  }
  
  // Finalize remaining sessions
  for (const [userId, session] of userSessions) {
    await finalizeSession(userId, session);
  }
}

Error Handling and Cleanup

Stream Error Handling

const stream = client.query(new QueryStream('SELECT * FROM users'));

stream.on('error', (err) => {
  console.error('Stream error:', err);
  
  // Connection is still usable after stream error
  // Perform cleanup or retry logic here
});

// Promise-based error handling with async iteration
try {
  for await (const row of stream) {
    await processRow(row);
  }
} catch (err) {
  console.error('Processing error:', err);
}
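
Note that .pipe() does not propagate errors between stages, so when piping query results (as in the CSV examples above) either use stream.pipeline or attach an error handler to every stage; a sketch:

const { Transform } = require('stream');
const fs = require('fs');

const exportStream = client.query(new QueryStream('SELECT id, name FROM users'));
const toCsvLine = new Transform({
  objectMode: true,
  transform(row, _enc, callback) {
    callback(null, `${row.id},${row.name}\n`);
  }
});
const outFile = fs.createWriteStream('users.csv');

// pipe() does not forward errors, so each stage needs its own handler
exportStream.on('error', (err) => {
  console.error('Query failed:', err);
  toCsvLine.destroy(err); // tear down the rest of the chain
});
toCsvLine.on('error', (err) => outFile.destroy(err));
outFile.on('error', (err) => console.error('Write failed:', err));

exportStream.pipe(toCsvLine).pipe(outFile);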

Graceful Shutdown

let currentStream = null;

// Graceful shutdown handler
process.on('SIGTERM', () => {
  if (currentStream) {
    console.log('Destroying current stream...');
    currentStream.destroy();
  }
  
  client.end();
});

// Usage with cleanup
async function processData() {
  currentStream = client.query(new QueryStream('SELECT * FROM large_table'));
  
  try {
    for await (const row of currentStream) {
      await processRow(row);
    }
  } finally {
    currentStream = null;
  }
}

Performance Considerations

Memory Usage

// Memory-efficient: Small batch sizes
const memoryEfficientStream = new QueryStream('SELECT * FROM huge_table', [], {
  batchSize: 100  // Only 100 rows in memory at once
});

// High-throughput: Larger batch sizes
const highThroughputStream = new QueryStream('SELECT * FROM huge_table', [], {
  batchSize: 10000  // Better performance, more memory usage
});

Connection Pool Usage

const { Pool } = require('pg');
const pool = new Pool();

async function streamProcessing() {
  const client = await pool.connect();
  
  try {
    const stream = client.query(new QueryStream('SELECT * FROM data'));
    
    for await (const row of stream) {
      await processRow(row);
    }
    
  } finally {
    client.release(); // Return to pool
  }
}
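
If processing is aborted partway through (for example by an error thrown from the loop), you may prefer to discard the checked-out connection rather than reuse it; pg supports this by passing a truthy value to client.release(). A cautious sketch:

async function streamWithPool(pool) {
  const client = await pool.connect();
  let failed = false;

  try {
    const stream = client.query(new QueryStream('SELECT * FROM data'));
    for await (const row of stream) {
      await processRow(row);
    }
  } catch (err) {
    failed = true;
    throw err;
  } finally {
    // A truthy argument tells the pool to destroy this connection
    // instead of returning it, useful when its state is uncertain
    client.release(failed);
  }
}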
