CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/npm-pg-copy-streams

Low-level streaming interfaces for PostgreSQL's COPY operations in Node.js

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/copy-from.md

COPY FROM Operations (Data Import)

Creates writable streams for importing data from external sources into PostgreSQL tables. The COPY FROM operation provides high-performance bulk data loading directly into PostgreSQL without the overhead of individual INSERT statements.

Capabilities

From Function

Creates a writable stream for COPY FROM operations.

/**
 * Creates a writable stream for COPY FROM operations
 * @param {string} text - SQL COPY FROM statement (e.g., 'COPY my_table FROM STDIN')
 * @param {object} [options] - Optional stream configuration
 * @returns {CopyFromQueryStream} CopyFromQueryStream instance
 */
function from(text, options);

Usage Examples:

const { from: copyFrom } = require('pg-copy-streams');
const { Pool } = require('pg');
const fs = require('fs');

// Basic table import
const pool = new Pool();
const client = await pool.connect();
const stream = client.query(copyFrom('COPY users FROM STDIN'));

// Import CSV with headers
const csvStream = client.query(copyFrom("COPY products FROM STDIN WITH (FORMAT CSV, HEADER)"));

// Import with custom delimiter
const tsvStream = client.query(copyFrom("COPY logs FROM STDIN WITH (FORMAT TEXT, DELIMITER E'\\t')"));

CopyFromQueryStream Class

Writable stream implementation for COPY FROM operations.

/**
 * CopyFromQueryStream - Writable stream for COPY FROM operations
 * @class
 * @extends {Writable}
 */
class CopyFromQueryStream extends Writable {
  /**
   * @param {string} text - The SQL COPY FROM statement
   * @param {object} [options] - Stream options
   */
  constructor(text, options) {}
  
  /** @type {string} The SQL COPY FROM statement */
  text;
  
  /** @type {number} Number of rows imported (available after completion) */
  rowCount;
  
  /** @type {object} PostgreSQL connection reference */
  connection;
  
  /** @type {Array} Internal buffer for data chunks */
  chunks;
}

Stream Submission

Submits the COPY FROM query to a PostgreSQL connection.

/**
 * Submits the COPY FROM query to a PostgreSQL connection
 * @param {object} connection - pg connection object
 */
submit(connection);

Usage Example:

const { Pool } = require('pg');
const { from: copyFrom } = require('pg-copy-streams');
const { pipeline } = require('stream/promises');
const fs = require('fs');

const pool = new Pool();
const client = await pool.connect();
try {
  const ingestStream = client.query(copyFrom('COPY users FROM STDIN'));
  const sourceStream = fs.createReadStream('users.csv');
  
  await pipeline(sourceStream, ingestStream);
  console.log(`Imported ${ingestStream.rowCount} rows`);
} finally {
  client.release();
}

Timeout Callback

Placeholder callback for pg timeout mechanism integration.

/**
 * Empty callback placeholder for pg timeout mechanism
 * This is overwritten by pg when query_timeout config is set
 */
callback();

Error Handling

Handles errors during the COPY FROM operation.

/**
 * Handles errors during stream processing
 * @param {Error} e - Error object
 */
handleError(e);

CopyIn Response Handler

Handles the CopyInResponse message from PostgreSQL backend.

/**
 * Handles CopyInResponse from PostgreSQL backend
 * @param {object} connection - pg connection object
 */
handleCopyInResponse(connection);

Command Completion

Processes the CommandComplete message and extracts row count.

/**
 * Processes CommandComplete message and extracts row count
 * @param {object} msg - Message object with text property containing row count
 */
handleCommandComplete(msg);

Ready for Query

Called when the connection is ready for the next query.

/**
 * Called when connection is ready for next query
 */
handleReadyForQuery();

Stream Events

As a writable stream, CopyFromQueryStream emits standard Node.js stream events:

/**
 * Standard writable stream events
 */
stream.on('drain', () => {
  // Stream is ready to accept more data after being paused
  console.log('Stream ready for more data');
});

stream.on('finish', () => {
  // All data has been written and processed by PostgreSQL
  console.log(`Import completed. Rows: ${stream.rowCount}`);
});

stream.on('error', (err) => {
  // Error occurred during streaming
  console.error('Stream error:', err.message);
});

stream.on('close', () => {
  // Stream has been closed
  console.log('Stream closed');
});

stream.on('pipe', (src) => {
  // A readable stream has been piped to this writable stream
  console.log('Source stream piped to copy stream');
});

Advanced Usage

Stream Destruction and Error Recovery

const { from: copyFrom } = require('pg-copy-streams');

const client = await pool.connect();
const ingestStream = client.query(copyFrom('COPY users FROM STDIN'));

// Handle stream destruction (sends CopyFail message)
ingestStream.on('error', (err) => {
  console.error('Import failed:', err.message);
  // Stream will automatically send CopyFail to rollback the operation
});

// Manually destroy stream if needed
setTimeout(() => {
  ingestStream.destroy(new Error('Timeout exceeded'));
}, 30000);

CSV Import with Transformation

const { Transform } = require('stream');
const { pipeline } = require('stream/promises');
const csvParser = require('csv-parser');

const client = await pool.connect();
const ingestStream = client.query(copyFrom('COPY processed_users FROM STDIN'));

// Transform CSV data before import
const transformStream = new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    // Convert parsed CSV object back to PostgreSQL text format
    const line = `${chunk.name}\t${chunk.email}\t${chunk.age}\n`;
    callback(null, line);
  }
});

await pipeline(
  fs.createReadStream('users.csv'),
  csvParser(),
  transformStream,
  ingestStream
);

Binary Format Import

const binaryStream = client.query(copyFrom("COPY data_table FROM STDIN WITH (FORMAT BINARY)"));

// Binary data must be properly formatted according to PostgreSQL binary protocol
const binaryData = Buffer.from([
  // PostgreSQL binary COPY format headers and data
  // This requires knowledge of PostgreSQL's binary encoding
]);

binaryStream.write(binaryData);
binaryStream.end();

Batch Processing with Multiple Files

const { glob } = require('glob');

const files = await glob('data/*.csv');
const client = await pool.connect();

try {
  await client.query('BEGIN');
  
  for (const file of files) {
    const ingestStream = client.query(copyFrom('COPY temp_import FROM STDIN WITH (FORMAT CSV)'));
    const sourceStream = fs.createReadStream(file);
    
    await pipeline(sourceStream, ingestStream);
    console.log(`Imported ${ingestStream.rowCount} rows from ${file}`);
  }
  
  await client.query('COMMIT');
} catch (error) {
  await client.query('ROLLBACK');
  throw error;
} finally {
  client.release();
}

Important Notes

  • Always use the 'finish' event (not 'end') to detect completion in versions 4.0.0+
  • The rowCount property is only available after the stream finishes
  • Streams automatically handle transaction rollback on errors via CopyFail messages
  • Stream destruction sends CopyFail to PostgreSQL for graceful error handling
  • Binary format imports require knowledge of PostgreSQL's binary encoding
  • All operations are transactional - errors will not leave data in inconsistent state

Install with Tessl CLI

npx tessl i tessl/npm-pg-copy-streams

docs

copy-both.md

copy-from.md

copy-to.md

index.md

tile.json