Low-level streaming interfaces for PostgreSQL's COPY operations in Node.js
npx @tessl/cli install tessl/npm-pg-copy-streams@7.0.0

pg-copy-streams provides low-level streaming interfaces for PostgreSQL's COPY TO and COPY FROM operations. It enables high-performance data transfer directly between Node.js applications and PostgreSQL databases through three specialized stream types: readable streams for exporting data, writable streams for importing data, and duplex streams for bidirectional operations.
npm install pg-copy-streams

CommonJS (Node.js default):
const { to, from, both } = require('pg-copy-streams');
// Alternative destructuring with aliases
const { to: copyTo, from: copyFrom, both: copyBoth } = require('pg-copy-streams');
// Import entire module
const pgCopyStreams = require('pg-copy-streams');
const copyTo = pgCopyStreams.to;

ES Modules (when using "type": "module" in package.json):
import { to, from, both } from 'pg-copy-streams';
// Alternative with aliases
import { to as copyTo, from as copyFrom, both as copyBoth } from 'pg-copy-streams';
// Default import (not available - module uses named exports only)
// import pgCopyStreams from 'pg-copy-streams'; // ❌ This won't work

const { Pool } = require('pg');
const { to: copyTo, from: copyFrom } = require('pg-copy-streams');
const pool = new Pool();

// Export data from PostgreSQL (COPY TO)
pool.connect((err, client, done) => {
  // Without this guard a failed checkout would fall through to
  // `client.query` and surface as a TypeError that hides the real cause.
  if (err) throw err;

  const stream = client.query(copyTo('COPY my_table TO STDOUT'));
  stream.pipe(process.stdout);
  // `done` is invoked both when the export ends and when it errors.
  stream.on('end', done);
  stream.on('error', done);
});

// Import data into PostgreSQL (COPY FROM)
const fs = require('fs');
pool.connect((err, client, done) => {
  // Same guard as above: report the connection error itself.
  if (err) throw err;

  const stream = client.query(copyFrom('COPY my_table FROM STDIN'));
  const fileStream = fs.createReadStream('data.csv');
  fileStream.pipe(stream);
  // A writable stream signals completion with 'finish' rather than 'end'.
  stream.on('finish', done);
  stream.on('error', done);
});

pg-copy-streams is built around three core components:
- Factory functions (`to`, `from`, `both`) that create specialized stream instances
- The `pg` library connection and query system

Creates readable streams for exporting data from PostgreSQL tables to external destinations. Ideal for backup operations, data extraction, and ETL processes.
/**
* Creates a readable stream for COPY TO operations
* @param {string} text - SQL COPY TO statement
* @param {object} [options] - Optional stream configuration
* @returns {CopyToQueryStream} Readable stream instance
*/
function to(text, options);

Creates writable streams for importing data from external sources into PostgreSQL tables. Perfect for bulk data loading, CSV imports, and ETL pipelines.
/**
* Creates a writable stream for COPY FROM operations
* @param {string} text - SQL COPY FROM statement
* @param {object} [options] - Optional stream configuration
* @returns {CopyFromQueryStream} Writable stream instance
*/
function from(text, options);

Creates duplex streams for bidirectional data operations, primarily used for replication and logical decoding scenarios.
/**
* Creates a duplex stream for COPY BOTH operations
* @param {string} text - SQL COPY statement
* @param {object} [options] - Optional stream configuration
* @returns {CopyBothQueryStream} Duplex stream instance
*/
function both(text, options);

/**
* @typedef {object} StreamOptions
* @property {number} [highWaterMark] - Stream high water mark
* @property {boolean} [objectMode] - Object mode flag
* @property {string} [encoding] - Stream encoding
*/
/**
* @typedef {StreamOptions} CopyBothOptions
* @property {boolean} [alignOnCopyDataFrame] - Whether to align data on COPY data frame boundaries
*/
/**
 * CopyToQueryStream: the readable stream used for COPY TO operations.
 *
 * Signature-only declaration; every method body is empty in this listing.
 * @class
 * @extends {Readable}
 */
class CopyToQueryStream {
  /**
   * @param {string} text - SQL COPY TO statement to run
   * @param {StreamOptions} [options] - Optional stream settings
   */
  constructor(text, options) {}

  /** @type {string} SQL COPY statement */
  text;

  /** @type {number} Count of rows processed */
  rowCount;

  /** @type {object} Reference to the PostgreSQL connection */
  connection;

  /**
   * Submit the query on a PostgreSQL connection.
   * @param {object} connection - pg connection object
   */
  submit(connection) {}

  /**
   * Error hook used while the stream is being processed.
   * @param {Error} err - The error that occurred
   */
  handleError(err) {}

  /**
   * CommandComplete hook; the row count is taken from the message.
   * @param {object} msg - Message object carrying a `text` property
   */
  handleCommandComplete(msg) {}

  /**
   * Hook invoked once the connection can accept the next query.
   */
  handleReadyForQuery() {}
}
/**
 * CopyFromQueryStream: the writable stream used for COPY FROM operations.
 *
 * Signature-only declaration; every method body is empty in this listing.
 * @class
 * @extends {Writable}
 */
class CopyFromQueryStream {
  /**
   * @param {string} text - SQL COPY FROM statement to run
   * @param {StreamOptions} [options] - Optional stream settings
   */
  constructor(text, options) {}

  /** @type {string} SQL COPY statement */
  text;

  /** @type {number} Count of rows processed */
  rowCount;

  /** @type {object} Reference to the PostgreSQL connection */
  connection;

  /**
   * Submit the query on a PostgreSQL connection.
   * @param {object} connection - pg connection object
   */
  submit(connection) {}

  /**
   * Placeholder callback for the pg timeout mechanism.
   */
  callback() {}

  /**
   * Error hook used while the stream is being processed.
   * @param {Error} e - The error that occurred
   */
  handleError(e) {}

  /**
   * Hook for the CopyInResponse sent by the PostgreSQL backend.
   * @param {object} connection - pg connection object
   */
  handleCopyInResponse(connection) {}

  /**
   * CommandComplete hook; the row count is taken from the message.
   * @param {object} msg - Message object carrying a `text` property
   */
  handleCommandComplete(msg) {}

  /**
   * Hook invoked once the connection can accept the next query.
   */
  handleReadyForQuery() {}
}
/**
* CopyBothQueryStream - Duplex stream for COPY BOTH operations
* @class
* @extends {Duplex}
*/
class CopyBothQueryStream {
/**
* @param {string} text - The SQL COPY statement
* @param {CopyBothOptions} [options] - Stream options
*/
constructor(text, options) {}
/** @type {string} The SQL COPY statement */
text;
/** @type {object} PostgreSQL connection reference */
connection;
/** @type {boolean} Whether to align data on COPY data frame boundaries */
alignOnCopyDataFrame;
/**
* Submits the query to a PostgreSQL connection
* @param {object} connection - pg connection object
*/
submit(connection) {}
/**
* Handles errors during stream processing
* @param {Error} err - Error object
*/
handleError(err) {}
/**
* Handles out-of-band CopyData messages
* @param {Buffer} chunk - Data chunk
*/
handleCopyData(chunk) {}
/**
* Called when command completes
*/
handleCommandComplete() {}
/**
* Called when connection is ready for next query
*/
handleReadyForQuery() {}
}pg-copy-streams provides comprehensive error handling for various failure scenarios:
pg driver (not pg.native)