Low-level streaming interfaces for PostgreSQL's COPY operations in Node.js
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Creates writable streams for importing data from external sources into PostgreSQL tables. The COPY FROM operation provides high-performance bulk data loading directly into PostgreSQL without the overhead of individual INSERT statements.
Creates a writable stream for COPY FROM operations.
/**
* Creates a writable stream for COPY FROM operations
* @param {string} text - SQL COPY FROM statement (e.g., 'COPY my_table FROM STDIN')
* @param {object} [options] - Optional stream configuration
* @returns {CopyFromQueryStream} CopyFromQueryStream instance
*/
function from(text, options);

Usage Examples:
const { from: copyFrom } = require('pg-copy-streams');
const { Pool } = require('pg');
const fs = require('fs');
// Basic table import
const pool = new Pool();
const client = await pool.connect();
const stream = client.query(copyFrom('COPY users FROM STDIN'));
// Import CSV with headers
const csvStream = client.query(copyFrom("COPY products FROM STDIN WITH (FORMAT CSV, HEADER)"));
// Import with custom delimiter
const tsvStream = client.query(copyFrom("COPY logs FROM STDIN WITH (FORMAT TEXT, DELIMITER E'\\t')"));

Writable stream implementation for COPY FROM operations.
/**
* CopyFromQueryStream - Writable stream for COPY FROM operations
* @class
* @extends {Writable}
*/
class CopyFromQueryStream {
/**
* @param {string} text - The SQL COPY FROM statement
* @param {object} [options] - Stream options
*/
constructor(text, options) {}
/** @type {string} The SQL COPY FROM statement */
text;
/** @type {number} Number of rows imported (available after completion) */
rowCount;
/** @type {object} PostgreSQL connection reference */
connection;
/** @type {Array} Internal buffer for data chunks */
chunks;
}

Submits the COPY FROM query to a PostgreSQL connection.
/**
* Submits the COPY FROM query to a PostgreSQL connection
* @param {object} connection - pg connection object
*/
submit(connection);

Usage Example:
const { Pool } = require('pg');
const { from: copyFrom } = require('pg-copy-streams');
const { pipeline } = require('stream/promises');
const fs = require('fs');
const pool = new Pool();
const client = await pool.connect();
try {
const ingestStream = client.query(copyFrom('COPY users FROM STDIN'));
const sourceStream = fs.createReadStream('users.csv');
await pipeline(sourceStream, ingestStream);
console.log(`Imported ${ingestStream.rowCount} rows`);
} finally {
client.release();
}

Placeholder callback for pg timeout mechanism integration.
/**
* Empty callback placeholder for pg timeout mechanism
* This is overwritten by pg when query_timeout config is set
*/
callback();

Handles errors during the COPY FROM operation.
/**
* Handles errors during stream processing
* @param {Error} e - Error object
*/
handleError(e);

Handles the CopyInResponse message from the PostgreSQL backend.
/**
* Handles CopyInResponse from PostgreSQL backend
* @param {object} connection - pg connection object
*/
handleCopyInResponse(connection);

Processes the CommandComplete message and extracts the row count.
/**
* Processes CommandComplete message and extracts row count
* @param {object} msg - Message object with text property containing row count
*/
handleCommandComplete(msg);

Called when the connection is ready for the next query.
/**
* Called when connection is ready for next query
*/
handleReadyForQuery();

As a writable stream, CopyFromQueryStream emits standard Node.js stream events:
/**
* Standard writable stream events
*/
stream.on('drain', () => {
// Stream is ready to accept more data after being paused
console.log('Stream ready for more data');
});
stream.on('finish', () => {
// All data has been written and processed by PostgreSQL
console.log(`Import completed. Rows: ${stream.rowCount}`);
});
stream.on('error', (err) => {
// Error occurred during streaming
console.error('Stream error:', err.message);
});
stream.on('close', () => {
// Stream has been closed
console.log('Stream closed');
});
stream.on('pipe', (src) => {
// A readable stream has been piped to this writable stream
console.log('Source stream piped to copy stream');
});

const { from: copyFrom } = require('pg-copy-streams');
const client = await pool.connect();
const ingestStream = client.query(copyFrom('COPY users FROM STDIN'));
// Handle stream destruction (sends CopyFail message)
ingestStream.on('error', (err) => {
console.error('Import failed:', err.message);
// Stream will automatically send CopyFail to rollback the operation
});
// Manually destroy stream if needed
setTimeout(() => {
ingestStream.destroy(new Error('Timeout exceeded'));
}, 30000);

const { Transform } = require('stream');
const { pipeline } = require('stream/promises');
const csvParser = require('csv-parser');
const client = await pool.connect();
const ingestStream = client.query(copyFrom('COPY processed_users FROM STDIN'));
// Transform CSV data before import
const transformStream = new Transform({
objectMode: true,
transform(chunk, encoding, callback) {
// Convert parsed CSV object back to PostgreSQL text format
const line = `${chunk.name}\t${chunk.email}\t${chunk.age}\n`;
callback(null, line);
}
});
await pipeline(
fs.createReadStream('users.csv'),
csvParser(),
transformStream,
ingestStream
);

const binaryStream = client.query(copyFrom("COPY data_table FROM STDIN WITH (FORMAT BINARY)"));
// Binary data must be properly formatted according to PostgreSQL binary protocol
const binaryData = Buffer.from([
// PostgreSQL binary COPY format headers and data
// This requires knowledge of PostgreSQL's binary encoding
]);
binaryStream.write(binaryData);
binaryStream.end();

const { glob } = require('glob');
const files = await glob('data/*.csv');
const client = await pool.connect();
try {
await client.query('BEGIN');
for (const file of files) {
const ingestStream = client.query(copyFrom('COPY temp_import FROM STDIN WITH (FORMAT CSV)'));
const sourceStream = fs.createReadStream(file);
await pipeline(sourceStream, ingestStream);
console.log(`Imported ${ingestStream.rowCount} rows from ${file}`);
}
await client.query('COMMIT');
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}

Note: the rowCount property is only available after the stream finishes.

Install with Tessl CLI
npx tessl i tessl/npm-pg-copy-streams