
tessl/npm-pg-copy-streams

Low-level streaming interfaces for PostgreSQL's COPY operations in Node.js


docs/copy-to.md

COPY TO Operations (Data Export)

Creates readable streams for exporting data from PostgreSQL tables to external destinations. The COPY TO operation streams data directly from PostgreSQL without loading entire datasets into memory.

Capabilities

To Function

Creates a readable stream for COPY TO operations.

/**
 * Creates a readable stream for COPY TO operations
 * @param {string} text - SQL COPY TO statement (e.g., 'COPY my_table TO STDOUT')
 * @param {object} [options] - Optional stream configuration
 * @returns {CopyToQueryStream} CopyToQueryStream instance
 */
function to(text, options);

Usage Examples:

const { to: copyTo } = require('pg-copy-streams');
const { Pool } = require('pg');

// Basic table export
const pool = new Pool();
const client = await pool.connect();
const stream = client.query(copyTo('COPY users TO STDOUT'));

// Export with CSV format
const csvStream = client.query(copyTo("COPY products TO STDOUT WITH (FORMAT CSV, HEADER)"));

// Export with custom delimiter
const tsvStream = client.query(copyTo("COPY logs TO STDOUT WITH (FORMAT TEXT, DELIMITER E'\\t')"));

CopyToQueryStream Class

Readable stream implementation for COPY TO operations.

/**
 * CopyToQueryStream - Readable stream for COPY TO operations
 * @class
 * @extends {Readable}
 */
class CopyToQueryStream {
  /**
   * @param {string} text - The SQL COPY TO statement
   * @param {object} [options] - Stream options
   */
  constructor(text, options) {}
  
  /** @type {string} The SQL COPY TO statement */
  text;
  
  /** @type {number} Number of rows exported (available after completion) */
  rowCount;
  
  /** @type {object} PostgreSQL connection reference */
  connection;
}

Stream Submission

Submits the COPY TO query to a PostgreSQL connection. You do not normally call this yourself; node-postgres invokes it when the stream is passed to client.query().

/**
 * Submits the COPY TO query to a PostgreSQL connection
 * @param {object} connection - pg connection object
 */
submit(connection);

Usage Example:

const { Pool } = require('pg');
const { to: copyTo } = require('pg-copy-streams');
const fs = require('fs');

const pool = new Pool();
const client = await pool.connect();
try {
  const stream = client.query(copyTo("COPY my_table TO STDOUT WITH (FORMAT CSV)"));
  
  // Pipe to file
  const writeStream = fs.createWriteStream('export.csv');
  stream.pipe(writeStream);
  
  // Wait for completion
  await new Promise((resolve, reject) => {
    stream.on('end', () => {
      console.log(`Exported ${stream.rowCount} rows`);
      resolve();
    });
    stream.on('error', reject);
  });
} finally {
  client.release();
}

Error Handling

Handles errors during the COPY TO operation.

/**
 * Handles errors during stream processing
 * @param {Error} err - Error object
 */
handleError(err);

Command Completion

Processes the CommandComplete message and extracts row count.

/**
 * Processes CommandComplete message and extracts row count
 * @param {object} msg - Message object with text property containing row count
 */
handleCommandComplete(msg);
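For COPY, the CommandComplete payload is a command tag of the form "COPY 42", where the trailing integer is the number of rows copied. Extracting it is a one-line parse (rowCountFromTag is an illustrative helper, not part of the library's API):

```javascript
// PostgreSQL's CommandComplete tag for COPY looks like "COPY 42",
// where the trailing integer is the number of rows copied.
function rowCountFromTag(tag) {
  const match = /^COPY (\d+)$/.exec(tag);
  return match ? parseInt(match[1], 10) : null;
}
```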

Ready for Query

Called when the connection is ready for the next query.

/**
 * Called when connection is ready for next query
 */
handleReadyForQuery();

Stream Events

As a readable stream, CopyToQueryStream emits standard Node.js stream events:

/**
 * Standard readable stream events
 */
stream.on('data', (chunk) => {
  // Received data chunk from PostgreSQL
  console.log(`Received ${chunk.length} bytes`);
});

stream.on('end', () => {
  // All data has been read, stream is finished
  console.log(`Export completed. Rows: ${stream.rowCount}`);
});

stream.on('error', (err) => {
  // Error occurred during streaming
  console.error('Stream error:', err.message);
});

stream.on('close', () => {
  // Stream has been closed
  console.log('Stream closed');
});

Advanced Usage

Streaming with CSV Parser

const { pipeline } = require('stream/promises');
const csvParser = require('csv-parser');
const { Pool } = require('pg');
const { to: copyTo } = require('pg-copy-streams');

const pool = new Pool();
const client = await pool.connect();
const copyStream = client.query(copyTo("COPY products TO STDOUT WITH (FORMAT CSV, HEADER)"));

const results = [];
await pipeline(
  copyStream,
  csvParser(),
  async function* (source) {
    for await (const chunk of source) {
      // Process each parsed CSV row
      yield { ...chunk, processed: true };
    }
  },
  async function (source) {
    for await (const row of source) {
      results.push(row);
    }
  }
);

console.log(`Processed ${results.length} rows`);

Binary Format Export

const binaryStream = client.query(copyTo("COPY data_table TO STDOUT WITH (FORMAT BINARY)"));

// Binary data requires special handling
binaryStream.on('data', (chunk) => {
  // chunk contains binary-encoded PostgreSQL data
  console.log(`Received ${chunk.length} bytes`);
});
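A binary COPY stream begins with PostgreSQL's fixed 19-byte header: an 11-byte signature ("PGCOPY\n\xff\r\n\0"), a 32-bit flags word, and a 32-bit header-extension length. A sketch of validating it (parseBinaryHeader is illustrative, not part of the library):

```javascript
// PGCOPY binary signature: "PGCOPY\n" + 0xff + "\r\n" + NUL (11 bytes).
const SIGNATURE = Buffer.from('PGCOPY\n\xff\r\n\0', 'latin1');

function parseBinaryHeader(buf) {
  if (buf.length < 19 || !buf.subarray(0, 11).equals(SIGNATURE)) {
    throw new Error('not a PGCOPY binary stream');
  }
  const extensionLength = buf.readUInt32BE(15);
  return {
    flags: buf.readUInt32BE(11),
    // Tuple data starts after the header extension area.
    dataOffset: 19 + extensionLength,
  };
}
```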

Important Notes

  • PostgreSQL chunks data on 64kB boundaries, which may cut across row boundaries
  • The rowCount property is only available after the stream ends
  • Always handle both 'end' and 'error' events
  • Use proper backpressure handling when piping to slower destinations
  • Binary format exports require knowledge of PostgreSQL's binary encoding

Install with Tessl CLI

npx tessl i tessl/npm-pg-copy-streams
