tessl/npm-pg-copy-streams

Low-level streaming interfaces for PostgreSQL's COPY operations in Node.js

COPY BOTH Operations (Bidirectional Streaming)

Creates duplex streams for bidirectional data transfer, used primarily for streaming replication and logical decoding. In COPY BOTH mode the server and the client can both send CopyData messages on the same connection, which is how a client consumes changes from a replication slot while reporting its progress back to the server.

Capabilities

Both Function

Creates a duplex stream for COPY BOTH operations.

/**
 * Creates a duplex stream for COPY BOTH operations
 * @param {string} text - Command that switches the connection to COPY BOTH mode (for example START_REPLICATION)
 * @param {object} [options] - Optional stream configuration with COPY BOTH specific options
 * @returns {CopyBothQueryStream} CopyBothQueryStream instance
 */
function both(text, options);

Usage Examples:

const { both: copyBoth } = require('pg-copy-streams');
const { Pool } = require('pg');

// START_REPLICATION is only accepted on a connection opened in replication mode
const pool = new Pool({ replication: 'database' });
const client = await pool.connect();

// Basic replication slot streaming
const stream = client.query(copyBoth("START_REPLICATION SLOT my_slot LOGICAL 0/0"));

// Alternatively, with the frame alignment option
// (a connection runs one COPY BOTH stream at a time)
const alignedStream = client.query(copyBoth(
  "START_REPLICATION SLOT my_slot LOGICAL 0/0",
  { alignOnCopyDataFrame: true }
));
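
The command text in the examples above combines a slot name with a start position written as an LSN (`0/0`). As a minimal sketch, the helpers below validate both parts before building the command. `parseLsn`, `formatLsn`, and `buildStartReplication` are hypothetical names for illustration and are not part of pg-copy-streams.

```javascript
// Hypothetical helpers for illustration; not part of pg-copy-streams.

// Convert the textual "hi/lo" LSN form (two hex numbers) to a 64-bit BigInt.
function parseLsn(text) {
  const match = /^([0-9A-Fa-f]{1,8})\/([0-9A-Fa-f]{1,8})$/.exec(text);
  if (!match) throw new Error(`Invalid LSN: ${text}`);
  return (BigInt(`0x${match[1]}`) << 32n) | BigInt(`0x${match[2]}`);
}

// Convert a 64-bit BigInt back to the textual "hi/lo" form.
function formatLsn(value) {
  const hi = (value >> 32n).toString(16).toUpperCase();
  const lo = (value & 0xffffffffn).toString(16).toUpperCase();
  return `${hi}/${lo}`;
}

// Build a START_REPLICATION command. Slot names may only contain
// lower-case letters, digits, and underscores, so anything else is rejected
// rather than interpolated into the command text.
function buildStartReplication(slotName, startLsn = '0/0') {
  if (!/^[a-z0-9_]+$/.test(slotName)) {
    throw new Error(`Invalid slot name: ${slotName}`);
  }
  return `START_REPLICATION SLOT ${slotName} LOGICAL ${formatLsn(parseLsn(startLsn))}`;
}
```

The resulting string can be passed to `copyBoth(...)` in place of the literal command used above.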

CopyBothQueryStream Class

Duplex stream implementation combining readable and writable functionality for COPY BOTH operations.

/**
 * CopyBothQueryStream - Duplex stream for COPY BOTH operations
 * @class
 * @extends {Duplex}
 */
class CopyBothQueryStream {
  /**
   * @param {string} text - The SQL COPY statement
   * @param {object} [options] - Stream options
   */
  constructor(text, options) {}
  
  /** @type {string} The SQL COPY statement */
  text;
  
  /** @type {object} PostgreSQL connection reference */
  connection;
  
  /** @type {boolean} Whether to align data on COPY data frame boundaries */
  alignOnCopyDataFrame;
  
  /** @type {Array} Internal buffer for writable data chunks */
  chunks;
}

Stream Submission

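Submits the COPY BOTH query to a PostgreSQL connection. pg calls this method when the stream is passed to client.query(), so application code does not normally call it directly.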

/**
 * Submits the COPY BOTH query to a PostgreSQL connection
 * @param {object} connection - pg connection object
 */
submit(connection);

Usage Example:

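const { Pool } = require('pg');
const { both: copyBoth } = require('pg-copy-streams');

// Replication commands need a connection opened in replication mode
const pool = new Pool({ replication: 'database' });
const client = await pool.connect();

// Set up logical replication
await client.query("SELECT pg_create_logical_replication_slot('my_slot', 'test_decoding')");

const stream = client.query(copyBoth("START_REPLICATION SLOT my_slot LOGICAL 0/0"));

// Read replication data
stream.on('data', (chunk) => {
  console.log('Replication data:', chunk.toString());
});

// Send replication feedback. The server expects a Standby Status Update
// message; the bytes below are a placeholder and not a valid message.
const feedback = Buffer.from('feedback_data');
stream.write(feedback);

// Release the client only after the stream has closed, not while it is still running
stream.on('error', (err) => console.error('Stream error:', err.message));
stream.on('close', () => client.release());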

Error Handling

Handles errors during the COPY BOTH operation.

/**
 * Handles errors during stream processing
 * @param {Error} err - Error object
 */
handleError(err);

CopyData Handler

Handles out-of-band CopyData messages received after CopyDone.

/**
 * Handles out-of-band CopyData messages
 * @param {Buffer} chunk - Data chunk received
 */
handleCopyData(chunk);

Command Completion

Called when the command completes.

/**
 * Called when command completes
 */
handleCommandComplete();

Ready for Query

Called when the connection is ready for the next query.

/**
 * Called when connection is ready for next query
 */
handleReadyForQuery();

Stream Events

As a duplex stream, CopyBothQueryStream emits both readable and writable stream events:

/**
 * Duplex stream events (readable side)
 */
stream.on('data', (chunk) => {
  // Received data from PostgreSQL (replication data, etc.)
  console.log(`Received ${chunk.length} bytes from PostgreSQL`);
});

stream.on('end', () => {
  // No more data will be received from PostgreSQL
  console.log('Reading side ended');
});

/**
 * Duplex stream events (writable side)
 */
stream.on('drain', () => {
  // Stream is ready to accept more data after being paused
  console.log('Stream ready for more data');
});

stream.on('finish', () => {
  // All data has been written to PostgreSQL
  console.log('Writing side finished');
});

/**
 * Common stream events
 */
stream.on('error', (err) => {
  // Error occurred during streaming
  console.error('Stream error:', err.message);
});

stream.on('close', () => {
  // Stream has been closed
  console.log('Stream closed');
});
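
The 'drain' event above matters when feedback is written faster than the connection can accept it. The following is a minimal sketch of honoring backpressure on the writable side using only standard Node.js stream behavior. `writeWithBackpressure` is a hypothetical helper name, not part of pg-copy-streams.

```javascript
// Hypothetical helper for illustration; works with any Node.js writable stream.
// Writes one chunk, then calls `next` either immediately (buffer has room)
// or after the stream emits 'drain' (buffer was full).
// Returns the value of stream.write() so callers can tell which path was taken.
function writeWithBackpressure(stream, chunk, next) {
  const hasRoom = stream.write(chunk);
  if (hasRoom) {
    next();
  } else {
    stream.once('drain', next);
  }
  return hasRoom;
}
```

With a COPY BOTH stream, this could wrap each `stream.write(feedback)` call so that the next message is only produced once the previous one has been accepted.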

Advanced Usage

Logical Replication with Feedback

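const { Pool } = require('pg');
const { both: copyBoth } = require('pg-copy-streams');

// Replication commands need a connection opened in replication mode
const pool = new Pool({ replication: 'database' });
const client = await pool.connect();

// Create replication slot
await client.query("SELECT pg_create_logical_replication_slot('my_app_slot', 'test_decoding')");

// Align on frame boundaries so that each chunk starts with a message type byte
const stream = client.query(copyBoth(
  "START_REPLICATION SLOT my_app_slot LOGICAL 0/0",
  { alignOnCopyDataFrame: true }
));

let lastLSN = null;

stream.on('data', (chunk) => {
  // Only XLogData messages ('w') carry changes.
  // Simplified: this assumes each chunk holds exactly one message.
  if (chunk[0] !== 0x77) return;

  // XLogData header: type byte, WAL start (8 bytes), WAL end (8), server clock (8)
  lastLSN = chunk.readBigUInt64BE(1);
  console.log('Logical replication data:', chunk.subarray(25).toString());

  // Send feedback to acknowledge processing
  stream.write(createStandbyStatusUpdate(lastLSN));
});

function createStandbyStatusUpdate(lsn) {
  // Standby Status Update: 'r', then the written, flushed, and applied positions,
  // the client clock (microseconds since 2000-01-01), and a reply-request flag.
  // Simplified: the same LSN is reported for all three positions.
  const clock = BigInt(Date.now()) * 1000n - 946684800000000n;
  const buffer = Buffer.alloc(34);
  buffer.writeUInt8(0x72, 0); // 'r' message type
  buffer.writeBigUInt64BE(lsn, 1); // written
  buffer.writeBigUInt64BE(lsn, 9); // flushed
  buffer.writeBigUInt64BE(lsn, 17); // applied
  buffer.writeBigInt64BE(clock, 25);
  buffer.writeUInt8(0, 33); // do not request an immediate reply
  return buffer;
}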
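
Besides change data, the server also sends primary keepalive messages on the same stream, and a keepalive can ask the client to reply promptly. The sketch below distinguishes the two server message types defined by the PostgreSQL streaming replication protocol. `parseReplicationMessage` is a hypothetical helper, and it assumes each chunk starts on a frame boundary and holds exactly one message (for example with `alignOnCopyDataFrame: true`).

```javascript
// Hypothetical helper for illustration; not part of pg-copy-streams.
// Assumes `chunk` is a Buffer holding exactly one replication message.
function parseReplicationMessage(chunk) {
  switch (chunk[0]) {
    case 0x77: // 'w' = XLogData: 25-byte header followed by the decoded change
      if (chunk.length < 25) throw new Error('Truncated XLogData message');
      return {
        type: 'xlogdata',
        walStart: chunk.readBigUInt64BE(1), // start of this WAL data
        walEnd: chunk.readBigUInt64BE(9), // current end of WAL on the server
        serverClock: chunk.readBigInt64BE(17), // microseconds since 2000-01-01
        payload: chunk.subarray(25),
      };
    case 0x6b: // 'k' = primary keepalive: fixed 18-byte message
      if (chunk.length < 18) throw new Error('Truncated keepalive message');
      return {
        type: 'keepalive',
        walEnd: chunk.readBigUInt64BE(1),
        serverClock: chunk.readBigInt64BE(9),
        replyRequested: chunk[17] === 1, // 1 means "reply as soon as possible"
      };
    default:
      return { type: 'unknown' };
  }
}
```

A 'data' handler could call this first and send a status update whenever `replyRequested` is true, which keeps the server from timing out an otherwise idle connection.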

Frame Alignment Usage

// With frame alignment enabled
const alignedStream = client.query(copyBoth(
  "START_REPLICATION SLOT my_slot LOGICAL 0/0",
  { alignOnCopyDataFrame: true }
));

// Data will be aligned on COPY data frame boundaries
alignedStream.on('data', (chunk) => {
  // chunk contains complete COPY data frames
  console.log('Complete frame:', chunk.length);
});

// Without frame alignment (default)
const stream = client.query(copyBoth("START_REPLICATION SLOT my_slot LOGICAL 0/0"));

// Data may be received in partial frames
stream.on('data', (chunk) => {
  // chunk may contain partial COPY data frames
  console.log('Data chunk:', chunk.length);
});

Bidirectional Communication

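const { both: copyBoth } = require('pg-copy-streams');
const { Pool } = require('pg');
const { Transform } = require('stream');

// Replication commands need a connection opened in replication mode
const pool = new Pool({ replication: 'database' });
const client = await pool.connect();
const stream = client.query(copyBoth("START_REPLICATION SLOT my_slot LOGICAL 0/0"));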

// Process incoming replication data
const processor = new Transform({
  transform(chunk, encoding, callback) {
    const data = chunk.toString();
    
    // Process replication messages
    if (data.includes('BEGIN')) {
      console.log('Transaction started');
    } else if (data.includes('COMMIT')) {
      console.log('Transaction committed');
      
      // Send acknowledgment
      const ack = createAckMessage();
      this.push(ack);
    }
    
    callback();
  }
});

// Set up bidirectional pipeline
stream.pipe(processor).pipe(stream, { end: false });

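function createAckMessage() {
  // Placeholder only: a real acknowledgment must be a Standby Status Update
  // message ('r' followed by WAL positions, a timestamp, and a reply flag)
  return Buffer.from('ACK\n');
}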

Options

alignOnCopyDataFrame

Controls whether data is aligned on COPY data frame boundaries.

/**
 * @typedef {object} CopyBothOptions
 * @property {boolean} [alignOnCopyDataFrame=false] - Whether to align data on COPY data frame boundaries
 * @property {number} [highWaterMark] - Stream high water mark
 * @property {boolean} [objectMode] - Object mode flag
 * @property {string} [encoding] - Stream encoding
 */

When alignOnCopyDataFrame is true:

  • Data is buffered until complete COPY data frames are available
  • Each 'data' event contains complete frames
  • Higher memory usage but easier parsing

When alignOnCopyDataFrame is false (default):

  • Data is forwarded as soon as it arrives
  • More efficient memory usage
  • May require frame boundary handling in application code
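
To make the trade-off above concrete, the following sketch shows the general technique of frame alignment: buffering raw bytes until a whole CopyData frame (type byte 'd', a 4-byte length that counts itself, then the payload) is available. It illustrates the idea only and is not the library's actual implementation. `CopyDataFramer` is a hypothetical class name.

```javascript
// Hypothetical illustration of frame alignment; not pg-copy-streams internals.
class CopyDataFramer {
  constructor() {
    this.pending = Buffer.alloc(0); // bytes received but not yet part of a full frame
  }

  // Feed raw wire bytes; returns the payloads of all frames completed by this call.
  push(chunk) {
    this.pending = Buffer.concat([this.pending, chunk]);
    const payloads = [];
    // Frame header: 1 type byte + 4 length bytes (length excludes the type byte)
    while (this.pending.length >= 5) {
      if (this.pending[0] !== 0x64) throw new Error('Expected a CopyData frame');
      const frameEnd = 1 + this.pending.readUInt32BE(1);
      if (this.pending.length < frameEnd) break; // wait for the rest of the frame
      payloads.push(this.pending.subarray(5, frameEnd));
      this.pending = this.pending.subarray(frameEnd);
    }
    return payloads;
  }
}
```

The `pending` buffer is the source of the extra memory use: a frame that spans several network reads stays buffered until its last byte arrives.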

Important Notes

  • COPY BOTH is primarily used for PostgreSQL replication scenarios
  • This is an advanced feature requiring knowledge of PostgreSQL replication protocols
  • Logical replication requires appropriate PostgreSQL configuration (wal_level = logical)
  • Replication slots must be created before use and cleaned up after use
  • The stream handles both readable and writable operations simultaneously
  • Proper error handling is crucial as replication operations can be long-running
  • Frame alignment affects memory usage and parsing complexity

Install with Tessl CLI

npx tessl i tessl/npm-pg-copy-streams
