Low-level streaming interfaces for PostgreSQL's COPY operations in Node.js
npx @tessl/cli install tessl/npm-pg-copy-streams@7.0.00
# pg-copy-streams

pg-copy-streams provides low-level streaming interfaces for PostgreSQL's COPY TO and COPY FROM operations. It enables high-performance data transfer directly between Node.js applications and PostgreSQL databases through three specialized stream types: readable streams for exporting data, writable streams for importing data, and duplex streams for bidirectional operations.

## Package Information

- **Package Name**: pg-copy-streams
- **Package Type**: npm
- **Language**: JavaScript
- **Installation**: `npm install pg-copy-streams`

## Core Imports

CommonJS (Node.js default):

```javascript
const { to, from, both } = require('pg-copy-streams');

// Alternative destructuring with aliases
const { to: copyTo, from: copyFrom, both: copyBoth } = require('pg-copy-streams');

// Import entire module
const pgCopyStreams = require('pg-copy-streams');
const copyToFactory = pgCopyStreams.to; // distinct name: copyTo is already declared above
```

ES Modules (when using "type": "module" in package.json):

```javascript
import { to, from, both } from 'pg-copy-streams';

// Alternative with aliases
import { to as copyTo, from as copyFrom, both as copyBoth } from 'pg-copy-streams';

// Default import: Node.js exposes a CommonJS package's module.exports as the
// default export, so this form is also available there; bundlers and
// TypeScript interop settings may behave differently.
// import pgCopyStreams from 'pg-copy-streams';
```

## Basic Usage

```javascript
const fs = require('fs');
const { Pool } = require('pg');
const { to: copyTo, from: copyFrom } = require('pg-copy-streams');

const pool = new Pool();

// Export data from PostgreSQL (COPY TO)
pool.connect((err, client, done) => {
  if (err) throw err; // could not obtain a client from the pool
  const stream = client.query(copyTo('COPY my_table TO STDOUT'));
  stream.pipe(process.stdout);
  stream.on('end', done);
  stream.on('error', done);
});

// Import data into PostgreSQL (COPY FROM)
pool.connect((err, client, done) => {
  if (err) throw err; // could not obtain a client from the pool
  const stream = client.query(copyFrom('COPY my_table FROM STDIN'));
  const fileStream = fs.createReadStream('data.csv');
  fileStream.on('error', done); // release the client if the file cannot be read
  fileStream.pipe(stream);
  stream.on('finish', done);
  stream.on('error', done);
});
```

## Architecture

pg-copy-streams is built around four core components:

- **Stream Factory Functions**: Entry-point functions (`to`, `from`, `both`) that create specialized stream instances
- **Stream Classes**: Three stream implementations extending Node.js stream classes (Readable, Writable, Duplex)
- **Protocol Integration**: Low-level PostgreSQL protocol handling for COPY operations with message parsing and connection management
- **Node-postgres Integration**: Seamless integration with the `pg` library connection and query system

## Capabilities

### COPY TO Operations (Data Export)

Creates readable streams for exporting data from PostgreSQL tables to external destinations. Ideal for backup operations, data extraction, and ETL processes.

```javascript { .api }
/**
 * Creates a readable stream for COPY TO operations
 * @param {string} text - SQL COPY TO statement
 * @param {object} [options] - Optional stream configuration
 * @returns {CopyToQueryStream} Readable stream instance
 */
function to(text, options);
```

[COPY TO Operations](./copy-to.md)
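
What the readable stream emits is raw COPY output, so turning it into values is left to the application. The following is a minimal sketch, assuming PostgreSQL's default text format (tab-separated fields, `\N` for NULL, backslash escapes) and assuming the input has already been split into complete lines without the trailing newline. `decodeCopyField` and `decodeCopyRow` are hypothetical helper names, not part of pg-copy-streams.

```javascript
// Hypothetical helpers (not part of pg-copy-streams): decode one line of
// PostgreSQL's default COPY text format into an array of field values.

// Named backslash escapes; octal and hex escapes are not handled here.
const NAMED_ESCAPES = { b: '\b', f: '\f', n: '\n', r: '\r', t: '\t', v: '\v' };

// Decode one field: the marker \N alone means NULL; otherwise a backslash
// followed by a named escape becomes that control character, and any other
// backslashed character stands for itself.
function decodeCopyField(raw) {
  if (raw === '\\N') return null;
  return raw.replace(/\\(.)/gs, (match, ch) =>
    Object.prototype.hasOwnProperty.call(NAMED_ESCAPES, ch) ? NAMED_ESCAPES[ch] : ch
  );
}

// Decode one row. Literal tabs inside values arrive escaped as \t, so
// splitting on raw tab characters is safe.
function decodeCopyRow(line) {
  return line.split('\t').map(decodeCopyField);
}

console.log(decodeCopyRow('1\talice\t\\N'));
```

All decoded values are strings or `null`; converting them to numbers, dates, or booleans depends on the column types and is left out of this sketch.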

### COPY FROM Operations (Data Import)

Creates writable streams for importing data from external sources into PostgreSQL tables. Perfect for bulk data loading, CSV imports, and ETL pipelines.

```javascript { .api }
/**
 * Creates a writable stream for COPY FROM operations
 * @param {string} text - SQL COPY FROM statement
 * @param {object} [options] - Optional stream configuration
 * @returns {CopyFromQueryStream} Writable stream instance
 */
function from(text, options);
```

[COPY FROM Operations](./copy-from.md)
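
The writable stream expects bytes that are already in the format named by the COPY statement. As a minimal sketch, assuming PostgreSQL's default text format (tab-separated fields, `\N` for NULL, backslash escapes), in-memory values could be serialized as shown below. `encodeCopyField` and `encodeCopyRow` are hypothetical helper names, not part of pg-copy-streams.

```javascript
// Hypothetical helpers (not part of pg-copy-streams): encode JavaScript
// values as rows in PostgreSQL's default COPY text format.

// Encode one field: null/undefined become \N; backslash, tab, newline and
// carriage return are backslash-escaped so they cannot be mistaken for
// field or row delimiters. The backslash is escaped first on purpose.
function encodeCopyField(value) {
  if (value === null || value === undefined) return '\\N';
  return String(value)
    .replace(/\\/g, '\\\\')
    .replace(/\t/g, '\\t')
    .replace(/\n/g, '\\n')
    .replace(/\r/g, '\\r');
}

// Encode one row: fields joined by tabs, terminated by a newline.
function encodeCopyRow(values) {
  return values.map(encodeCopyField).join('\t') + '\n';
}

const rows = [
  [1, 'alice', null],
  [2, 'tab\there', 'line\nbreak'],
];
const payload = rows.map(encodeCopyRow).join('');
console.log(JSON.stringify(payload));
```

The resulting string can then be written to the stream returned by `client.query(copyFrom('COPY my_table FROM STDIN'))`. Values such as dates should be converted to a representation PostgreSQL accepts before encoding, since `String(value)` is used as-is here.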

### COPY BOTH Operations (Bidirectional Streaming)

Creates duplex streams for bidirectional data operations, primarily used for replication and logical decoding scenarios.

```javascript { .api }
/**
 * Creates a duplex stream for COPY BOTH operations
 * @param {string} text - SQL COPY statement
 * @param {object} [options] - Optional stream configuration
 * @returns {CopyBothQueryStream} Duplex stream instance
 */
function both(text, options);
```

[COPY BOTH Operations](./copy-both.md)
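
The `alignOnCopyDataFrame` option listed under Types refers to CopyData frame boundaries. To illustrate what such a boundary is, here is a rough sketch of the PostgreSQL wire format for a CopyData message (a type byte `d`, a 4-byte big-endian length that counts itself plus the payload, then the payload) and of re-assembling frames from chunks that do not line up with them. `frameCopyData` and `createFrameSplitter` are hypothetical names; this is not the library's implementation.

```javascript
// Sketch of CopyData framing (hypothetical helpers, not the library's code).

// Wrap a payload in a CopyData frame: 'd' (0x64), Int32 length, payload.
function frameCopyData(payload) {
  const header = Buffer.alloc(5);
  header.writeUInt8(0x64, 0);
  header.writeInt32BE(payload.length + 4, 1); // length counts itself, not the type byte
  return Buffer.concat([header, payload]);
}

// Returns a function that accepts arbitrary chunks and returns the payloads
// of every frame completed so far, buffering any partial frame.
function createFrameSplitter() {
  let pending = Buffer.alloc(0);
  return function push(chunk) {
    pending = Buffer.concat([pending, chunk]);
    const payloads = [];
    while (pending.length >= 5) {
      if (pending[0] !== 0x64) {
        throw new Error('unexpected message type: ' + pending[0]);
      }
      const length = pending.readInt32BE(1);
      const frameEnd = 1 + length;
      if (pending.length < frameEnd) break; // wait for more bytes
      payloads.push(pending.subarray(5, frameEnd));
      pending = pending.subarray(frameEnd);
    }
    return payloads;
  };
}

// Two frames delivered in chunks that do not line up with frame boundaries.
const wire = Buffer.concat([
  frameCopyData(Buffer.from('first')),
  frameCopyData(Buffer.from('second')),
]);
const push = createFrameSplitter();
const out = [...push(wire.subarray(0, 7)), ...push(wire.subarray(7))];
console.log(out.map((b) => b.toString()));
```

A consumer that needs one message per chunk (for example a logical decoding reader) benefits from this kind of alignment; a consumer that only forwards bytes does not.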

## Types

```javascript { .api }
/**
 * @typedef {object} StreamOptions
 * @property {number} [highWaterMark] - Stream high water mark
 * @property {boolean} [objectMode] - Object mode flag
 * @property {string} [encoding] - Stream encoding
 */

/**
 * @typedef {StreamOptions} CopyBothOptions
 * @property {boolean} [alignOnCopyDataFrame] - Whether to align data on COPY data frame boundaries
 */

/**
 * CopyToQueryStream - Readable stream for COPY TO operations
 * @class
 * @extends {Readable}
 */
class CopyToQueryStream {
  /**
   * @param {string} text - The SQL COPY TO statement
   * @param {StreamOptions} [options] - Stream options
   */
  constructor(text, options) {}

  /** @type {string} The SQL COPY statement */
  text;

  /** @type {number} Number of rows processed */
  rowCount;

  /** @type {object} PostgreSQL connection reference */
  connection;

  /**
   * Submits the query to a PostgreSQL connection
   * @param {object} connection - pg connection object
   */
  submit(connection) {}

  /**
   * Handles errors during stream processing
   * @param {Error} err - Error object
   */
  handleError(err) {}

  /**
   * Processes CommandComplete message and extracts row count
   * @param {object} msg - Message object with text property
   */
  handleCommandComplete(msg) {}

  /**
   * Called when connection is ready for next query
   */
  handleReadyForQuery() {}
}

/**
 * CopyFromQueryStream - Writable stream for COPY FROM operations
 * @class
 * @extends {Writable}
 */
class CopyFromQueryStream {
  /**
   * @param {string} text - The SQL COPY FROM statement
   * @param {StreamOptions} [options] - Stream options
   */
  constructor(text, options) {}

  /** @type {string} The SQL COPY statement */
  text;

  /** @type {number} Number of rows processed */
  rowCount;

  /** @type {object} PostgreSQL connection reference */
  connection;

  /**
   * Submits the query to a PostgreSQL connection
   * @param {object} connection - pg connection object
   */
  submit(connection) {}

  /**
   * Callback placeholder for pg timeout mechanism
   */
  callback() {}

  /**
   * Handles errors during stream processing
   * @param {Error} e - Error object
   */
  handleError(e) {}

  /**
   * Handles CopyInResponse from PostgreSQL backend
   * @param {object} connection - pg connection object
   */
  handleCopyInResponse(connection) {}

  /**
   * Processes CommandComplete message and extracts row count
   * @param {object} msg - Message object with text property
   */
  handleCommandComplete(msg) {}

  /**
   * Called when connection is ready for next query
   */
  handleReadyForQuery() {}
}

/**
 * CopyBothQueryStream - Duplex stream for COPY BOTH operations
 * @class
 * @extends {Duplex}
 */
class CopyBothQueryStream {
  /**
   * @param {string} text - The SQL COPY statement
   * @param {CopyBothOptions} [options] - Stream options
   */
  constructor(text, options) {}

  /** @type {string} The SQL COPY statement */
  text;

  /** @type {object} PostgreSQL connection reference */
  connection;

  /** @type {boolean} Whether to align data on COPY data frame boundaries */
  alignOnCopyDataFrame;

  /**
   * Submits the query to a PostgreSQL connection
   * @param {object} connection - pg connection object
   */
  submit(connection) {}

  /**
   * Handles errors during stream processing
   * @param {Error} err - Error object
   */
  handleError(err) {}

  /**
   * Handles out-of-band CopyData messages
   * @param {Buffer} chunk - Data chunk
   */
  handleCopyData(chunk) {}

  /**
   * Called when command completes
   */
  handleCommandComplete() {}

  /**
   * Called when connection is ready for next query
   */
  handleReadyForQuery() {}
}
```
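
The `handleCommandComplete(msg)` methods above are described as extracting a row count from a message with a `text` property. For a COPY command, PostgreSQL's command tag has the form `COPY <rows>`, so the extraction could look roughly like the sketch below. `parseCopyRowCount` is a hypothetical helper used only for illustration; the library's own parsing may differ.

```javascript
// Hypothetical helper: extract the row count from a CommandComplete
// message whose `text` is a command tag such as "COPY 42".
// Returns null when the tag is not a COPY tag.
function parseCopyRowCount(msg) {
  const match = /^COPY (\d+)$/.exec(msg.text);
  return match ? parseInt(match[1], 10) : null;
}

console.log(parseCopyRowCount({ text: 'COPY 42' }));
console.log(parseCopyRowCount({ text: 'SELECT 1' }));
```

In application code the parsed value is normally read from the stream's `rowCount` property once the operation has completed, without parsing the tag by hand.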

## Error Handling

Failures during a COPY operation are reported through the stream's `error` event, as in the Basic Usage example. The sections below list the usual causes and what happens after a failure.

### Common Error Types

- **Syntax Errors**: Invalid SQL COPY statements
- **Connection Errors**: Database connection failures
- **Data Format Errors**: Malformed data that PostgreSQL cannot process
- **Permission Errors**: Insufficient database permissions
- **Stream Errors**: Network interruptions or stream corruption

### Error Recovery

- **Automatic Rollback**: Failed COPY operations are automatically rolled back by PostgreSQL
- **CopyFail Messages**: Stream destruction automatically sends CopyFail to abort operations
- **Transaction Safety**: Operations within transactions maintain ACID properties
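
For readers curious about what a CopyFail message looks like on the wire, the sketch below builds one according to the PostgreSQL frontend/backend protocol: a type byte `f`, a 4-byte big-endian length that counts itself plus the body, and a NUL-terminated error message. `buildCopyFail` is a hypothetical helper for illustration only; applications do not need to send this message themselves.

```javascript
// Sketch of the CopyFail wire format (hypothetical helper, not the
// library's code): 'f' (0x66), Int32 length, NUL-terminated reason string.
function buildCopyFail(reason) {
  const body = Buffer.from(reason + '\0', 'utf8');
  const header = Buffer.alloc(5);
  header.writeUInt8(0x66, 0);
  header.writeInt32BE(4 + body.length, 1); // length counts itself, not the type byte
  return Buffer.concat([header, body]);
}

const message = buildCopyFail('aborted by client');
console.log(message);
```

On receiving CopyFail, the server aborts the COPY with an error, which is what leads to the rollback behavior listed above.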

## Compatibility and Performance

### PostgreSQL Compatibility

- **PostgreSQL Versions**: Compatible with PostgreSQL 9.0+
- **Protocol Support**: Uses PostgreSQL's native COPY protocol for optimal performance
- **Pure JavaScript**: Works only with the pure JavaScript `pg` driver (not `pg.native`)

### Performance Characteristics

- **High Throughput**: Direct protocol integration bypasses query parsing overhead
- **Memory Efficient**: Streaming operations avoid loading entire datasets into memory
- **Chunk Boundaries**: PostgreSQL chunks data on 64kB boundaries (may split rows)
- **Backpressure Handling**: Proper Node.js stream backpressure support
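
Because chunks may end in the middle of a row, code that processes rows one at a time has to re-assemble them first. The following is a minimal sketch using only Node.js built-ins, assuming newline-terminated rows (as in the text format). `createRowAssembler` and `createRowSplitter` are hypothetical names, and the `Readable.from` source stands in for a COPY TO stream. `pipeline` is used so that backpressure and errors propagate across all stages.

```javascript
const { Transform, Readable, Writable, pipeline } = require('stream');
const { StringDecoder } = require('string_decoder');

// Hypothetical helper: buffers partial rows across chunk boundaries.
// StringDecoder keeps multi-byte UTF-8 characters intact when a chunk
// ends in the middle of one.
function createRowAssembler() {
  const decoder = new StringDecoder('utf8');
  let tail = '';
  return {
    // Returns the rows completed by this chunk (without trailing newline).
    push(chunk) {
      const lines = (tail + decoder.write(chunk)).split('\n');
      tail = lines.pop(); // the last piece is incomplete until a newline arrives
      return lines;
    },
    // Returns whatever is left when the input ends without a final newline.
    end() {
      const rest = tail + decoder.end();
      tail = '';
      return rest.length > 0 ? [rest] : [];
    },
  };
}

// Hypothetical helper: the same logic wrapped as a Transform that emits
// one string per row.
function createRowSplitter() {
  const assembler = createRowAssembler();
  return new Transform({
    readableObjectMode: true,
    transform(chunk, encoding, callback) {
      for (const row of assembler.push(chunk)) this.push(row);
      callback();
    },
    flush(callback) {
      for (const row of assembler.end()) this.push(row);
      callback();
    },
  });
}

// Stand-in source: two chunks that split the first row in the middle.
pipeline(
  Readable.from([Buffer.from('1\tal'), Buffer.from('ice\n2\tbob\n')]),
  createRowSplitter(),
  new Writable({
    objectMode: true,
    write(row, encoding, callback) {
      console.log(row);
      callback();
    },
  }),
  (err) => {
    if (err) console.error('pipeline failed:', err);
  }
);
```

With a real export, the first argument to `pipeline` would be the stream returned by `client.query(copyTo('COPY my_table TO STDOUT'))`.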

### Node.js Requirements

- **Node.js Version**: Requires Node.js 8.0+ (async/await support recommended)
- **Stream API**: Built on Node.js streams for maximum compatibility
- **Buffer Handling**: Uses modern Buffer allocation methods for security