Low-level streaming interfaces for PostgreSQL's COPY operations in Node.js
—
Quality: Pending — Does it follow best practices?
Impact: Pending — No eval scenarios have been run.
Creates readable streams for exporting data from PostgreSQL tables to external destinations. The COPY TO operation streams data directly from PostgreSQL without loading entire datasets into memory.
Creates a readable stream for COPY TO operations.
/**
* Creates a readable stream for COPY TO operations
* @param {string} text - SQL COPY TO statement (e.g., 'COPY my_table TO STDOUT')
* @param {object} [options] - Optional stream configuration
* @returns {CopyToQueryStream} CopyToQueryStream instance
*/
function to(text, options);

Usage Examples:
const { to: copyTo } = require('pg-copy-streams');
const { Pool } = require('pg');
// Basic table export
const pool = new Pool();
const client = await pool.connect();
const stream = client.query(copyTo('COPY users TO STDOUT'));
// Export with CSV format
const csvStream = client.query(copyTo("COPY products TO STDOUT WITH (FORMAT CSV, HEADER)"));
// Export with custom delimiter
const tsvStream = client.query(copyTo("COPY logs TO STDOUT WITH (FORMAT TEXT, DELIMITER E'\\t')"));

Readable stream implementation for COPY TO operations.
/**
* CopyToQueryStream - Readable stream for COPY TO operations
* @class
* @extends {Readable}
*/
class CopyToQueryStream {
/**
* @param {string} text - The SQL COPY TO statement
* @param {object} [options] - Stream options
*/
constructor(text, options) {}
/** @type {string} The SQL COPY TO statement */
text;
/** @type {number} Number of rows exported (available after completion) */
rowCount;
/** @type {object} PostgreSQL connection reference */
connection;
}

Submits the COPY TO query to a PostgreSQL connection.
/**
* Submits the COPY TO query to a PostgreSQL connection
* @param {object} connection - pg connection object
*/
submit(connection);

Usage Example:
const { Pool } = require('pg');
const { to: copyTo } = require('pg-copy-streams');
const pool = new Pool();
const client = await pool.connect();
try {
const stream = client.query(copyTo('COPY my_table TO STDOUT'));
// Pipe to file
const fs = require('fs');
const writeStream = fs.createWriteStream('export.csv');
stream.pipe(writeStream);
// Wait for completion
await new Promise((resolve, reject) => {
stream.on('end', () => {
console.log(`Exported ${stream.rowCount} rows`);
resolve();
});
stream.on('error', reject);
});
} finally {
client.release();
}

Handles errors during the COPY TO operation.
/**
* Handles errors during stream processing
* @param {Error} err - Error object
*/
handleError(err);

Processes the CommandComplete message and extracts row count.
/**
* Processes CommandComplete message and extracts row count
* @param {object} msg - Message object with text property containing row count
*/
handleCommandComplete(msg);

Called when the connection is ready for the next query.
/**
* Called when connection is ready for next query
*/
handleReadyForQuery();

As a readable stream, CopyToQueryStream emits standard Node.js stream events:
/**
* Standard readable stream events
*/
stream.on('data', (chunk) => {
// Received data chunk from PostgreSQL
console.log(`Received ${chunk.length} bytes`);
});
stream.on('end', () => {
// All data has been read, stream is finished
console.log(`Export completed. Rows: ${stream.rowCount}`);
});
stream.on('error', (err) => {
// Error occurred during streaming
console.error('Stream error:', err.message);
});
stream.on('close', () => {
// Stream has been closed
console.log('Stream closed');
});

Transform pipeline example — parsing CSV output with `stream/promises`:

const { pipeline } = require('stream/promises');
const csvParser = require('csv-parser');
const { to: copyTo } = require('pg-copy-streams');
const client = await pool.connect();
const copyStream = client.query(copyTo("COPY products TO STDOUT WITH (FORMAT CSV, HEADER)"));
const results = [];
await pipeline(
copyStream,
csvParser(),
async function* (source) {
for await (const chunk of source) {
// Process each parsed CSV row
yield { ...chunk, processed: true };
}
},
async function (source) {
for await (const row of source) {
results.push(row);
}
}
);
console.log(`Processed ${results.length} rows`);

Binary format example:

const binaryStream = client.query(copyTo("COPY data_table TO STDOUT WITH (FORMAT BINARY)"));
// Binary data requires special handling
binaryStream.on('data', (chunk) => {
// chunk contains binary-encoded PostgreSQL data
console.log(`Received ${chunk.length} bytes`);
});

Note: the rowCount property is only available after the stream ends.

Install with Tessl CLI:
npx tessl i tessl/npm-pg-copy-streams