tessl/npm-pg-copy-streams

Low-level streaming interfaces for PostgreSQL's COPY operations in Node.js

Workspace: tessl
Visibility: Public
Describes: npmpkg:npm/pg-copy-streams@7.0.x

To install, run

npx @tessl/cli install tessl/npm-pg-copy-streams@7.0.0


pg-copy-streams

pg-copy-streams provides low-level streaming interfaces for PostgreSQL's COPY TO and COPY FROM operations. It enables high-performance data transfer directly between Node.js applications and PostgreSQL databases through three specialized stream types: readable streams for exporting data, writable streams for importing data, and duplex streams for bidirectional operations.

Package Information

  • Package Name: pg-copy-streams
  • Package Type: npm
  • Language: JavaScript
  • Installation: npm install pg-copy-streams

Core Imports

CommonJS (Node.js default):

const { to, from, both } = require('pg-copy-streams');

// Alternative destructuring with aliases
const { to: copyTo, from: copyFrom, both: copyBoth } = require('pg-copy-streams');

// Import entire module
const pgCopyStreams = require('pg-copy-streams');
const copyTo = pgCopyStreams.to;

ES Modules (when using "type": "module" in package.json):

// pg-copy-streams is published as CommonJS; the default import is its module.exports object
import pgCopyStreams from 'pg-copy-streams';
const { to, from, both } = pgCopyStreams;

// Named imports work when Node.js can statically detect the CommonJS exports;
// if an import reports a missing export, use the default import above instead
import { to as copyTo, from as copyFrom, both as copyBoth } from 'pg-copy-streams';

Basic Usage

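const fs = require('fs');
const { Pool } = require('pg');
const { to: copyTo, from: copyFrom } = require('pg-copy-streams');

const pool = new Pool();

// Export data from PostgreSQL (COPY TO)
pool.connect((err, client, done) => {
  if (err) return console.error('Could not connect', err);
  const stream = client.query(copyTo('COPY my_table TO STDOUT'));
  stream.pipe(process.stdout);
  stream.on('end', done);
  stream.on('error', done); // done(err) discards the client instead of returning it to the pool
});

// Import data into PostgreSQL (COPY FROM)
pool.connect((err, client, done) => {
  if (err) return console.error('Could not connect', err);
  // CSV input needs FORMAT csv; a plain "FROM STDIN" expects tab-separated text
  const stream = client.query(copyFrom('COPY my_table FROM STDIN WITH (FORMAT csv)'));
  const fileStream = fs.createReadStream('data.csv');
  fileStream.on('error', done); // a missing or unreadable file also releases the client
  stream.on('error', done);
  stream.on('finish', done);
  fileStream.pipe(stream);
});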
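The callback examples above can also be written with promises. The sketch below is an illustration, not part of the pg-copy-streams API: `copyIntoTable` is a hypothetical helper, and it relies on Node.js's `stream/promises` `pipeline()` (Node.js 15+), which forwards errors from either stream and destroys both on failure. It assumes, per the type listing in this document, that `rowCount` is set on the COPY FROM stream by the time it finishes.

```javascript
const { pipeline } = require('stream/promises');

// Hypothetical helper (not part of pg-copy-streams): streams `source` into a
// COPY ... FROM STDIN statement. `client` is assumed to be a checked-out pg
// client and `copyFrom` the factory exported by pg-copy-streams.
async function copyIntoTable(client, copyFrom, sql, source) {
  const ingest = client.query(copyFrom(sql));
  // pipeline() forwards errors from either side and destroys both streams on
  // failure, so a broken source also tears down the COPY stream
  await pipeline(source, ingest);
  // rowCount is assumed to be populated once the COPY stream has finished
  return ingest.rowCount;
}
```

A possible call site with a pool: `const client = await pool.connect(); try { await copyIntoTable(client, copyFrom, 'COPY my_table FROM STDIN WITH (FORMAT csv)', fs.createReadStream('data.csv')); } finally { client.release(); }`. Passing `copyFrom` in as an argument keeps the helper testable with stand-in streams.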

Architecture

pg-copy-streams is built around four core components:

  • Stream Factory Functions: Entry-point functions (to, from, both) that create specialized stream instances
  • Stream Classes: Three stream implementations extending Node.js stream classes (Readable, Writable, Duplex)
  • Protocol Integration: Low-level handling of PostgreSQL's COPY sub-protocol, including message parsing and connection management
  • Node-postgres Integration: Each stream is a "submittable" object: client.query() calls its submit(connection) method, and the pg client then forwards COPY-related protocol events to the stream's handle* methods
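To illustrate the Protocol Integration layer, here is a minimal sketch of how the three frontend COPY messages are laid out on the wire, based on the PostgreSQL frontend/backend protocol documentation. It is not taken from pg-copy-streams' source; `frameMessage`, `copyData`, `copyDone` and `copyFail` are hypothetical names.

```javascript
// Frontend COPY messages share one layout: a 1-byte type code, an Int32 length
// that counts itself and the payload (but not the type byte), then the payload.
function frameMessage(type, payload) {
  const message = Buffer.alloc(5 + payload.length);
  message[0] = type.charCodeAt(0);
  message.writeInt32BE(4 + payload.length, 1);
  payload.copy(message, 5);
  return message;
}

// CopyData ('d'): one chunk of COPY data; a frontend may split the data
// stream arbitrarily across CopyData messages
function copyData(chunk) {
  return frameMessage('d', Buffer.from(chunk));
}

// CopyDone ('c'): the client has sent all COPY FROM STDIN data
function copyDone() {
  return frameMessage('c', Buffer.alloc(0));
}

// CopyFail ('f'): abort COPY FROM STDIN with a null-terminated error message
function copyFail(reason) {
  return frameMessage('f', Buffer.concat([Buffer.from(reason, 'utf8'), Buffer.from([0])]));
}
```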

Capabilities

COPY TO Operations (Data Export)

Creates readable streams for exporting data from PostgreSQL tables to external destinations. Ideal for backup operations, data extraction, and ETL processes.

/**
 * Creates a readable stream for COPY TO operations
 * @param {string} text - SQL COPY TO statement
 * @param {object} [options] - Optional stream configuration 
 * @returns {CopyToQueryStream} Readable stream instance
 */
function to(text, options);

See COPY TO Operations (copy-to.md) for the full reference.
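The `text` argument is an ordinary COPY statement ending in `TO STDOUT`; PostgreSQL also accepts a query in place of a table, as in `COPY (SELECT ...) TO STDOUT`. When table or column names come from configuration, they should be quoted rather than concatenated raw. The helpers below (`quoteIdent`, `buildCopyToSql`) are hypothetical and not exported by pg-copy-streams; they assume an unqualified table name and CSV output with a header row by default.

```javascript
// Hypothetical helper: double-quote an SQL identifier, doubling embedded quotes
function quoteIdent(name) {
  return '"' + String(name).replace(/"/g, '""') + '"';
}

// Hypothetical helper: build the COPY ... TO STDOUT text passed to copyTo().
// `table` must be an unqualified table name; `columns` is optional.
function buildCopyToSql(table, columns = [], { csv = true, header = true } = {}) {
  const columnList = columns.length > 0 ? ` (${columns.map(quoteIdent).join(', ')})` : '';
  const options = csv ? ` WITH (FORMAT csv${header ? ', HEADER' : ''})` : '';
  return `COPY ${quoteIdent(table)}${columnList} TO STDOUT${options}`;
}
```

For example, `client.query(copyTo(buildCopyToSql('my_table', ['id', 'name'])))` would stream `my_table` as CSV with a header line.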

COPY FROM Operations (Data Import)

Creates writable streams for importing data from external sources into PostgreSQL tables. Perfect for bulk data loading, CSV imports, and ETL pipelines.

/**
 * Creates a writable stream for COPY FROM operations
 * @param {string} text - SQL COPY FROM statement
 * @param {object} [options] - Optional stream configuration
 * @returns {CopyFromQueryStream} Writable stream instance
 */
function from(text, options);

See COPY FROM Operations (copy-from.md) for the full reference.
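Data written to a COPY FROM stream must already be in the format the statement names. In PostgreSQL's default text format, each row is one line of tab-separated values, `\N` stands for NULL, and backslashes, tabs, newlines and carriage returns inside a value must be backslash-escaped. The hypothetical helpers below (not part of pg-copy-streams) implement that encoding; they stringify values with `String()`, so types such as dates should be converted (for example to ISO strings) beforehand.

```javascript
// Hypothetical helpers (not part of pg-copy-streams): encode rows in PostgreSQL's
// default COPY text format so they can be written to a copyFrom() stream.
function encodeTextValue(value) {
  if (value === null || value === undefined) return '\\N'; // NULL marker
  return String(value)
    .replace(/\\/g, '\\\\') // escape backslashes first so later escapes are not doubled
    .replace(/\t/g, '\\t') // the default column delimiter
    .replace(/\n/g, '\\n') // the row terminator
    .replace(/\r/g, '\\r');
}

function encodeTextRow(values) {
  return values.map(encodeTextValue).join('\t') + '\n';
}
```

Encoded rows can then be streamed with backpressure handled by `pipe()`, e.g. `Readable.from(rows.map(encodeTextRow)).pipe(client.query(copyFrom('COPY my_table FROM STDIN')))`.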

COPY BOTH Operations (Bidirectional Streaming)

Creates duplex streams for bidirectional data operations, primarily used for replication and logical decoding scenarios.

/**
 * Creates a duplex stream for COPY BOTH operations
 * @param {string} text - Command that starts COPY BOTH mode (e.g. a START_REPLICATION replication command)
 * @param {object} [options] - Optional stream configuration
 * @returns {CopyBothQueryStream} Duplex stream instance
 */
function both(text, options);

See COPY BOTH Operations (copy-both.md) for the full reference.
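COPY BOTH mode is used by PostgreSQL's streaming replication protocol: after a command such as `START_REPLICATION`, the server sends replication messages inside CopyData frames and the client answers on the same connection. The sketch below decodes the two server message types ('w' XLogData and 'k' primary keepalive) and builds the client's 'r' standby status update, following the PostgreSQL streaming replication protocol documentation. The helper names are hypothetical, and it assumes the stream was created with `alignOnCopyDataFrame: true` so that each chunk holds exactly one CopyData payload.

```javascript
// Microseconds between the Unix epoch and PostgreSQL's epoch (2000-01-01 UTC)
const PG_EPOCH_OFFSET_US = 946684800000000n;

// Hypothetical helper: decode one replication message carried in a CopyData
// payload, assuming each chunk holds exactly one payload (alignOnCopyDataFrame)
function parseReplicationMessage(chunk) {
  const type = String.fromCharCode(chunk[0]);
  if (type === 'w') {
    // XLogData: WAL start, WAL end, server clock, then the WAL/plugin data
    return {
      type: 'xlogdata',
      walStart: chunk.readBigUInt64BE(1),
      walEnd: chunk.readBigUInt64BE(9),
      sendTime: chunk.readBigInt64BE(17),
      data: chunk.subarray(25),
    };
  }
  if (type === 'k') {
    // Primary keepalive: WAL end, server clock, reply-requested flag
    return {
      type: 'keepalive',
      walEnd: chunk.readBigUInt64BE(1),
      sendTime: chunk.readBigInt64BE(9),
      replyRequested: chunk[17] === 1,
    };
  }
  throw new Error(`Unexpected replication message type: ${type}`);
}

// Hypothetical helper: build a standby status update ('r') to write back on the
// duplex stream. For simplicity it reports one LSN as written, flushed and applied.
function standbyStatusUpdate(lsn, nowMs = Date.now()) {
  const message = Buffer.alloc(34);
  message[0] = 'r'.charCodeAt(0);
  message.writeBigUInt64BE(lsn, 1); // last WAL byte + 1 received and written
  message.writeBigUInt64BE(lsn, 9); // ... flushed to disk
  message.writeBigUInt64BE(lsn, 17); // ... applied
  message.writeBigInt64BE(BigInt(nowMs) * 1000n - PG_EPOCH_OFFSET_US, 25); // client clock
  message[33] = 0; // do not ask the server to reply immediately
  return message;
}
```

A consumer might call `parseReplicationMessage(chunk)` in a 'data' handler and, when a keepalive has `replyRequested` set, `write()` the result of `standbyStatusUpdate(lastLsn)` back to the duplex stream. Node.js 12+ is assumed for the BigInt buffer methods.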

Types

/**
 * @typedef {object} StreamOptions
 * @property {number} [highWaterMark] - Stream high water mark
 * @property {boolean} [objectMode] - Object mode flag
 * @property {string} [encoding] - Stream encoding
 */

/**
 * @typedef {StreamOptions} CopyBothOptions
 * @property {boolean} [alignOnCopyDataFrame] - Whether to align data on COPY data frame boundaries
 */

/**
 * CopyToQueryStream - Readable stream for COPY TO operations
 * @class
 * @extends {Readable}
 */
class CopyToQueryStream {
  /**
   * @param {string} text - The SQL COPY TO statement
   * @param {StreamOptions} [options] - Stream options
   */
  constructor(text, options) {}
  
  /** @type {string} The SQL COPY statement */
  text;
  
  /** @type {number} Number of rows processed */
  rowCount;
  
  /** @type {object} PostgreSQL connection reference */
  connection;
  
  /**
   * Submits the query to a PostgreSQL connection
   * @param {object} connection - pg connection object
   */
  submit(connection) {}
  
  /**
   * Handles errors during stream processing
   * @param {Error} err - Error object
   */
  handleError(err) {}
  
  /**
   * Processes CommandComplete message and extracts row count
   * @param {object} msg - Message object with text property
   */
  handleCommandComplete(msg) {}
  
  /**
   * Called when connection is ready for next query
   */
  handleReadyForQuery() {}
}

/**
 * CopyFromQueryStream - Writable stream for COPY FROM operations
 * @class
 * @extends {Writable}
 */
class CopyFromQueryStream {
  /**
   * @param {string} text - The SQL COPY FROM statement
   * @param {StreamOptions} [options] - Stream options
   */
  constructor(text, options) {}
  
  /** @type {string} The SQL COPY statement */
  text;
  
  /** @type {number} Number of rows processed */
  rowCount;
  
  /** @type {object} PostgreSQL connection reference */
  connection;
  
  /**
   * Submits the query to a PostgreSQL connection
   * @param {object} connection - pg connection object
   */
  submit(connection) {}
  
  /**
   * Callback placeholder for pg timeout mechanism
   */
  callback() {}
  
  /**
   * Handles errors during stream processing
   * @param {Error} e - Error object
   */
  handleError(e) {}
  
  /**
   * Handles CopyInResponse from PostgreSQL backend
   * @param {object} connection - pg connection object
   */
  handleCopyInResponse(connection) {}
  
  /**
   * Processes CommandComplete message and extracts row count
   * @param {object} msg - Message object with text property
   */
  handleCommandComplete(msg) {}
  
  /**
   * Called when connection is ready for next query
   */
  handleReadyForQuery() {}
}

/**
 * CopyBothQueryStream - Duplex stream for COPY BOTH operations
 * @class
 * @extends {Duplex}
 */
class CopyBothQueryStream {
  /**
   * @param {string} text - Command that starts COPY BOTH mode (e.g. START_REPLICATION)
   * @param {CopyBothOptions} [options] - Stream options
   */
  constructor(text, options) {}
  
  /** @type {string} The command that started COPY BOTH mode */
  text;
  
  /** @type {object} PostgreSQL connection reference */
  connection;
  
  /** @type {boolean} Whether to align data on COPY data frame boundaries */
  alignOnCopyDataFrame;
  
  /**
   * Submits the query to a PostgreSQL connection
   * @param {object} connection - pg connection object
   */
  submit(connection) {}
  
  /**
   * Handles errors during stream processing
   * @param {Error} err - Error object
   */
  handleError(err) {}
  
  /**
   * Handles out-of-band CopyData messages
   * @param {Buffer} chunk - Data chunk
   */
  handleCopyData(chunk) {}
  
  /**
   * Called when command completes
   */
  handleCommandComplete() {}
  
  /**
   * Called when connection is ready for next query
   */
  handleReadyForQuery() {}
}
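Both `handleCommandComplete` methods derive `rowCount` from the CommandComplete message, whose tag for a COPY is `COPY n`, where n is the number of rows copied. The following is a hypothetical re-implementation of that parsing step for illustration, not the library's own code; it returns `null` when the tag does not match.

```javascript
// Hypothetical re-implementation for illustration: PostgreSQL reports a finished
// COPY with the command tag "COPY <n>", where <n> is the number of rows copied
function rowCountFromCommandComplete(msg) {
  const match = /^COPY (\d+)$/.exec(msg && msg.text ? msg.text : '');
  return match ? parseInt(match[1], 10) : null;
}
```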

Error Handling

pg-copy-streams reports failures as 'error' events on the stream returned by client.query(), so every COPY stream needs an error handler (or must be wired up with stream.pipeline).

Common Error Types

  • Syntax Errors: Invalid SQL COPY statements
  • Connection Errors: Database connection failures
  • Data Format Errors: Malformed data that PostgreSQL cannot process
  • Permission Errors: Insufficient database permissions
  • Stream Errors: Network interruptions or stream corruption
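PostgreSQL-side failures reach the stream as pg errors whose `code` property holds the SQLSTATE, while socket failures carry Node.js error codes such as `ECONNRESET`. The hypothetical `classifyCopyError` helper below sketches one way to map those codes onto the categories above; the grouping (for instance treating constraint violations as data errors) is an illustrative choice, not something pg-copy-streams does.

```javascript
// Node.js socket-level error codes that indicate a lost or refused connection
const SOCKET_ERROR_CODES = new Set(['ECONNREFUSED', 'ECONNRESET', 'EPIPE', 'ETIMEDOUT', 'EHOSTUNREACH']);

// Hypothetical helper: map an error emitted by a COPY stream to one of the
// categories listed above, using the SQLSTATE code that pg puts on err.code
function classifyCopyError(err) {
  const code = err && typeof err.code === 'string' ? err.code : '';
  if (SOCKET_ERROR_CODES.has(code)) return 'connection';
  if (!/^[0-9A-Z]{5}$/.test(code)) return 'stream'; // no SQLSTATE: treat as a stream-level failure
  if (code === '42601') return 'syntax'; // syntax_error
  if (code === '42501') return 'permission'; // insufficient_privilege
  const sqlClass = code.slice(0, 2);
  if (sqlClass === '08') return 'connection'; // connection exception class
  if (sqlClass === '22' || sqlClass === '23') return 'data'; // data exception / integrity constraint violation
  return 'other';
}
```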

Error Recovery

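  • Automatic Rollback: PostgreSQL runs each COPY as a single statement, so a failed COPY FROM leaves no partially imported rows; inside an explicit transaction, the failure aborts the enclosing transaction until it is rolled back
  • CopyFail Messages: Destroying a COPY FROM stream sends a CopyFail message, which makes the server abort the COPY
  • Transaction Safety: A COPY issued between BEGIN and COMMIT commits or rolls back together with the rest of the transaction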

Compatibility and Performance

PostgreSQL Compatibility

  • PostgreSQL Versions: Compatible with PostgreSQL 9.0+
  • Protocol Support: Uses PostgreSQL's native COPY protocol for optimal performance
  • Pure JavaScript: Works only with the pure JavaScript pg client, not with pg.native

Performance Characteristics

  • High Throughput: COPY transfers rows in bulk within a single statement, avoiding the per-row parsing, planning and round-trip overhead of individual INSERT statements
  • Memory Efficient: Streaming operations avoid loading entire datasets into memory
  • Chunk Boundaries: Data is chunked on 64kB boundaries rather than on row boundaries, so a single row may be split across two chunks
  • Backpressure Handling: Proper Node.js stream backpressure support
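Because a chunk can end in the middle of a row, consumers that work row by row need to reassemble rows across chunks. The hypothetical `splitCopyRows` transform below does this for COPY TO output in the default text format, where a raw newline always ends a row; it would not be correct for CSV output, whose quoted values may contain newlines. It uses `StringDecoder` so that multi-byte UTF-8 characters split across chunks are decoded correctly.

```javascript
const { Transform } = require('stream');
const { StringDecoder } = require('string_decoder');

// Hypothetical helper: re-split COPY TO output (default text format) into one
// string per row. Newlines inside values are escaped as "\n" in text format,
// so a raw newline always ends a row. Not valid for CSV output.
function splitCopyRows() {
  const decoder = new StringDecoder('utf8'); // keeps multi-byte characters intact across chunks
  let pending = '';
  return new Transform({
    readableObjectMode: true,
    transform(chunk, _encoding, callback) {
      pending += decoder.write(chunk);
      const lines = pending.split('\n');
      pending = lines.pop(); // the last piece may be an incomplete row
      for (const line of lines) this.push(line);
      callback();
    },
    flush(callback) {
      pending += decoder.end();
      if (pending) this.push(pending);
      callback();
    },
  });
}
```

For example, `client.query(copyTo('COPY my_table TO STDOUT')).pipe(splitCopyRows())` yields one string per row on the transform's readable side.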

Node.js Requirements

  • Node.js Version: Requires Node.js 8.0+ (async/await support recommended)
  • Stream API: Built on Node.js streams for maximum compatibility
  • Buffer Handling: Uses modern Buffer allocation methods for security