Low-level streaming interfaces for PostgreSQL's COPY operations in Node.js
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Creates duplex streams for bidirectional data operations, primarily used for replication scenarios and logical decoding. COPY BOTH allows simultaneous reading and writing within a single stream, enabling advanced PostgreSQL features like logical replication slots.
Creates a duplex stream for COPY BOTH operations.
/**
* Creates a duplex stream for COPY BOTH operations
* @param {string} text - SQL COPY statement for bidirectional operations
* @param {object} [options] - Optional stream configuration with COPY BOTH specific options
* @returns {CopyBothQueryStream} CopyBothQueryStream instance
*/
function both(text, options);
Usage Examples:
const { both: copyBoth } = require('pg-copy-streams');
const { Pool } = require('pg');
// Basic replication slot streaming
const pool = new Pool();
const client = await pool.connect();
const stream = client.query(copyBoth("START_REPLICATION SLOT my_slot LOGICAL 0/0"));
// With frame alignment option
const alignedStream = client.query(copyBoth(
"START_REPLICATION SLOT my_slot LOGICAL 0/0",
{ alignOnCopyDataFrame: true }
));
Duplex stream implementation combining readable and writable functionality for COPY BOTH operations.
/**
* CopyBothQueryStream - Duplex stream for COPY BOTH operations
* @class
* @extends {Duplex}
*/
class CopyBothQueryStream {
/**
* @param {string} text - The SQL COPY statement
* @param {object} [options] - Stream options
*/
constructor(text, options) {}
/** @type {string} The SQL COPY statement */
text;
/** @type {object} PostgreSQL connection reference */
connection;
/** @type {boolean} Whether to align data on COPY data frame boundaries */
alignOnCopyDataFrame;
/** @type {Array} Internal buffer for writable data chunks */
chunks;
}
Submits the COPY BOTH query to a PostgreSQL connection.
/**
* Submits the COPY BOTH query to a PostgreSQL connection
* @param {object} connection - pg connection object
*/
submit(connection);
Usage Example:
const { Pool } = require('pg');
const { both: copyBoth } = require('pg-copy-streams');
const pool = new Pool();
const client = await pool.connect();
try {
// Set up logical replication
await client.query("SELECT pg_create_logical_replication_slot('my_slot', 'test_decoding')");
const stream = client.query(copyBoth("START_REPLICATION SLOT my_slot LOGICAL 0/0"));
// Read replication data
stream.on('data', (chunk) => {
console.log('Replication data:', chunk.toString());
});
// Send replication feedback
const feedback = Buffer.from('feedback_data');
stream.write(feedback);
} finally {
client.release();
}
Handles errors during the COPY BOTH operation.
/**
* Handles errors during stream processing
* @param {Error} err - Error object
*/
handleError(err);
Handles out-of-band CopyData messages received after CopyDone.
/**
* Handles out-of-band CopyData messages
* @param {Buffer} chunk - Data chunk received
*/
handleCopyData(chunk);
Called when the command completes.
/**
* Called when command completes
*/
handleCommandComplete();
Called when the connection is ready for the next query.
/**
* Called when connection is ready for next query
*/
handleReadyForQuery();
As a duplex stream, CopyBothQueryStream emits both readable and writable stream events:
/**
* Duplex stream events (readable side)
*/
stream.on('data', (chunk) => {
// Received data from PostgreSQL (replication data, etc.)
console.log(`Received ${chunk.length} bytes from PostgreSQL`);
});
stream.on('end', () => {
// No more data will be received from PostgreSQL
console.log('Reading side ended');
});
/**
* Duplex stream events (writable side)
*/
stream.on('drain', () => {
// Stream is ready to accept more data after being paused
console.log('Stream ready for more data');
});
stream.on('finish', () => {
// All data has been written to PostgreSQL
console.log('Writing side finished');
});
/**
* Common stream events
*/
stream.on('error', (err) => {
// Error occurred during streaming
console.error('Stream error:', err.message);
});
stream.on('close', () => {
// Stream has been closed
console.log('Stream closed');
});
const { both: copyBoth } = require('pg-copy-streams');
const client = await pool.connect();
// Create replication slot
await client.query("SELECT pg_create_logical_replication_slot('my_app_slot', 'test_decoding')");
const stream = client.query(copyBoth("START_REPLICATION SLOT my_app_slot LOGICAL 0/0"));
let lastLSN = null;
stream.on('data', (chunk) => {
const data = chunk.toString();
console.log('Logical replication data:', data);
// Parse LSN from replication message (simplified)
const lsnMatch = data.match(/LSN: ([0-9A-F\/]+)/);
if (lsnMatch) {
lastLSN = lsnMatch[1];
}
// Send feedback to acknowledge processing
if (lastLSN) {
const feedback = createStandbyStatusUpdate(lastLSN);
stream.write(feedback);
}
});
function createStandbyStatusUpdate(lsn) {
// Create PostgreSQL standby status update message
// This is a simplified example - real implementation needs proper message formatting
const buffer = Buffer.alloc(34);
buffer.writeUInt8(0x72, 0); // 'r' message type
// ... additional message formatting
return buffer;
}
// With frame alignment enabled
const alignedStream = client.query(copyBoth(
"START_REPLICATION SLOT my_slot LOGICAL 0/0",
{ alignOnCopyDataFrame: true }
));
// Data will be aligned on COPY data frame boundaries
alignedStream.on('data', (chunk) => {
// chunk contains complete COPY data frames
console.log('Complete frame:', chunk.length);
});
// Without frame alignment (default)
const stream = client.query(copyBoth("START_REPLICATION SLOT my_slot LOGICAL 0/0"));
// Data may be received in partial frames
stream.on('data', (chunk) => {
// chunk may contain partial COPY data frames
console.log('Data chunk:', chunk.length);
});
const { both: copyBoth } = require('pg-copy-streams');
const { Transform } = require('stream');
const client = await pool.connect();
const stream = client.query(copyBoth("START_REPLICATION SLOT my_slot LOGICAL 0/0"));
// Process incoming replication data
const processor = new Transform({
transform(chunk, encoding, callback) {
const data = chunk.toString();
// Process replication messages
if (data.includes('BEGIN')) {
console.log('Transaction started');
} else if (data.includes('COMMIT')) {
console.log('Transaction committed');
// Send acknowledgment
const ack = createAckMessage();
this.push(ack);
}
callback();
}
});
// Set up bidirectional pipeline
stream.pipe(processor).pipe(stream, { end: false });
function createAckMessage() {
// Create acknowledgment message
return Buffer.from('ACK\n');
}
Controls whether data is aligned on COPY data frame boundaries.
/**
* @typedef {object} CopyBothOptions
* @property {boolean} [alignOnCopyDataFrame=false] - Whether to align data on COPY data frame boundaries
* @property {number} [highWaterMark] - Stream high water mark
* @property {boolean} [objectMode] - Object mode flag
* @property {string} [encoding] - Stream encoding
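*/
When alignOnCopyDataFrame is true: each 'data' chunk contains complete COPY data frames.
When alignOnCopyDataFrame is false (default): a 'data' chunk may contain partial COPY data frames.
Requirements (the original list was truncated in extraction): the PostgreSQL server presumably needs to be configured for logical replication (wal_level = logical).
Install with Tessl CLI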
npx tessl i tessl/npm-pg-copy-streams