# COPY FROM Operations (Data Import)

Creates writable streams for importing data from external sources into PostgreSQL tables. The COPY FROM operation provides high-performance bulk data loading directly into PostgreSQL without the overhead of individual INSERT statements.

## Capabilities

### From Function

Creates a writable stream for COPY FROM operations.

```javascript { .api }
/**
 * Creates a writable stream for COPY FROM operations
 * @param {string} text - SQL COPY FROM statement (e.g., 'COPY my_table FROM STDIN')
 * @param {object} [options] - Optional stream configuration
 * @returns {CopyFromQueryStream} CopyFromQueryStream instance
 */
function from(text, options);
```

**Usage Examples:**

```javascript
const { from: copyFrom } = require('pg-copy-streams');
const { Pool } = require('pg');
const fs = require('fs');

// Basic table import
const pool = new Pool();
const client = await pool.connect();
const stream = client.query(copyFrom('COPY users FROM STDIN'));

// Import CSV with headers
const csvStream = client.query(copyFrom("COPY products FROM STDIN WITH (FORMAT CSV, HEADER)"));

// Import with custom delimiter
const tsvStream = client.query(copyFrom("COPY logs FROM STDIN WITH (FORMAT TEXT, DELIMITER E'\\t')"));
```

### CopyFromQueryStream Class

Writable stream implementation for COPY FROM operations.

```javascript { .api }
/**
 * CopyFromQueryStream - Writable stream for COPY FROM operations
 * @class
 * @extends {Writable}
 */
class CopyFromQueryStream {
  /**
   * @param {string} text - The SQL COPY FROM statement
   * @param {object} [options] - Stream options
   */
  constructor(text, options) {}

  /** @type {string} The SQL COPY FROM statement */
  text;

  /** @type {number} Number of rows imported (available after completion) */
  rowCount;

  /** @type {object} PostgreSQL connection reference */
  connection;

  /** @type {Array} Internal buffer for data chunks */
  chunks;
}
```

### Stream Submission

Submits the COPY FROM query to a PostgreSQL connection.

```javascript { .api }
/**
 * Submits the COPY FROM query to a PostgreSQL connection
 * @param {object} connection - pg connection object
 */
submit(connection);
```

**Usage Example:**

```javascript
const { Pool } = require('pg');
const { from: copyFrom } = require('pg-copy-streams');
const { pipeline } = require('stream/promises');
const fs = require('fs');

const pool = new Pool();
const client = await pool.connect();
try {
  const ingestStream = client.query(copyFrom('COPY users FROM STDIN'));
  const sourceStream = fs.createReadStream('users.csv');

  await pipeline(sourceStream, ingestStream);
  console.log(`Imported ${ingestStream.rowCount} rows`);
} finally {
  client.release();
}
```

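When writing to the stream directly instead of piping, respect backpressure: `write()` returns false when the internal buffer is full, and the 'drain' event signals readiness for more data. A minimal sketch (the `writeRow` helper, table, and row values are illustrative, not part of the API):

```javascript
const stream = client.query(copyFrom('COPY users FROM STDIN'));

// Illustrative helper: write one line, waiting for 'drain'
// whenever write() signals backpressure by returning false
function writeRow(line) {
  return new Promise((resolve) => {
    if (stream.write(line)) resolve();
    else stream.once('drain', resolve);
  });
}

await writeRow('alice\talice@example.com\t30\n');
await writeRow('bob\tbob@example.com\t25\n');

stream.end();
await new Promise((resolve, reject) => {
  stream.once('finish', resolve);
  stream.once('error', reject);
});
console.log(`Imported ${stream.rowCount} rows`);
```
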
### Timeout Callback

Placeholder callback used by pg's timeout mechanism.

```javascript { .api }
/**
 * Empty callback placeholder for pg's timeout mechanism.
 * Overwritten by pg when the query_timeout config option is set.
 */
callback();
```

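For example, setting pg's standard `query_timeout` client option causes pg to install its own callback in place of the empty placeholder above. A sketch (the timeout value is illustrative, and the exact timeout behavior during a long-running COPY depends on pg's handling):

```javascript
const { Pool } = require('pg');
const { from: copyFrom } = require('pg-copy-streams');

// query_timeout (milliseconds) is a standard pg client/pool option;
// when set, pg overwrites the placeholder callback on submitted queries
const pool = new Pool({ query_timeout: 60000 });

const client = await pool.connect();
const stream = client.query(copyFrom('COPY users FROM STDIN'));
```
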
### Error Handling

Handles errors during the COPY FROM operation.

```javascript { .api }
/**
 * Handles errors during stream processing
 * @param {Error} e - Error object
 */
handleError(e);
```

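Errors routed through handleError surface as 'error' events on the stream (see Stream Events below), so with `pipeline` they can be caught with an ordinary try/catch:

```javascript
const fs = require('fs');
const { pipeline } = require('stream/promises');

try {
  await pipeline(
    fs.createReadStream('users.csv'),
    client.query(copyFrom('COPY users FROM STDIN'))
  );
} catch (err) {
  // Rejections here include errors routed through handleError,
  // e.g. rows rejected by PostgreSQL for a type mismatch
  console.error('COPY failed:', err.message);
}
```
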
### CopyIn Response Handler

Handles the CopyInResponse message from the PostgreSQL backend.

```javascript { .api }
/**
 * Handles CopyInResponse from PostgreSQL backend
 * @param {object} connection - pg connection object
 */
handleCopyInResponse(connection);
```

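For context, these handler methods map onto the frontend/backend message flow for COPY FROM STDIN, simplified here from the PostgreSQL protocol documentation:

```javascript
// Simplified message flow (protocol-level, not user-facing API):
//
//   client: Query('COPY t FROM STDIN')
//   server: CopyInResponse             -> handleCopyInResponse(connection)
//   client: CopyData ... then CopyDone (or CopyFail on error/destroy)
//   server: CommandComplete('COPY n')  -> handleCommandComplete(msg)
//   server: ReadyForQuery              -> handleReadyForQuery()
```
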
### Command Completion

Processes the CommandComplete message and extracts the row count.

```javascript { .api }
/**
 * Processes CommandComplete message and extracts row count
 * @param {object} msg - Message object with text property containing row count
 */
handleCommandComplete(msg);
```

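PostgreSQL's CommandComplete message for a COPY carries a command tag such as `COPY 42`. A minimal sketch of the extraction (`parseRowCount` is a hypothetical helper; pg-copy-streams does the equivalent internally and exposes the result as `rowCount`):

```javascript
// Hypothetical helper mirroring the internal row-count extraction:
// the command tag for a completed COPY looks like 'COPY 42'
function parseRowCount(msg) {
  const match = /^COPY (\d+)$/.exec(msg.text);
  return match ? Number(match[1]) : null;
}

parseRowCount({ text: 'COPY 42' }); // => 42
```
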
### Ready for Query

Called when the connection is ready for the next query.

```javascript { .api }
/**
 * Called when connection is ready for next query
 */
handleReadyForQuery();
```

## Stream Events

As a writable stream, CopyFromQueryStream emits standard Node.js stream events:

```javascript { .api }
/**
 * Standard writable stream events
 */
stream.on('drain', () => {
  // Stream is ready to accept more data after being paused
  console.log('Stream ready for more data');
});

stream.on('finish', () => {
  // All data has been written and processed by PostgreSQL
  console.log(`Import completed. Rows: ${stream.rowCount}`);
});

stream.on('error', (err) => {
  // Error occurred during streaming
  console.error('Stream error:', err.message);
});

stream.on('close', () => {
  // Stream has been closed
  console.log('Stream closed');
});

stream.on('pipe', (src) => {
  // A readable stream has been piped to this writable stream
  console.log('Source stream piped to copy stream');
});
```

## Advanced Usage

### Stream Destruction and Error Recovery

```javascript
const { from: copyFrom } = require('pg-copy-streams');

const client = await pool.connect();
const ingestStream = client.query(copyFrom('COPY users FROM STDIN'));

// Listen for errors; a failed COPY is aborted on the backend
ingestStream.on('error', (err) => {
  console.error('Import failed:', err.message);
});

// Manually destroy the stream if needed; destroy() sends a CopyFail
// message so PostgreSQL rolls back the operation
setTimeout(() => {
  ingestStream.destroy(new Error('Timeout exceeded'));
}, 30000);
```

### CSV Import with Transformation

```javascript
const fs = require('fs');
const { Transform } = require('stream');
const { pipeline } = require('stream/promises');
const csvParser = require('csv-parser');
const { from: copyFrom } = require('pg-copy-streams');

const client = await pool.connect();
const ingestStream = client.query(copyFrom('COPY processed_users FROM STDIN'));

// Transform parsed CSV rows back into PostgreSQL's tab-delimited text format
const transformStream = new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    const line = `${chunk.name}\t${chunk.email}\t${chunk.age}\n`;
    callback(null, line);
  }
});

await pipeline(
  fs.createReadStream('users.csv'),
  csvParser(),
  transformStream,
  ingestStream
);
```

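Note that PostgreSQL's text format has its own escaping rules: embedded backslashes, tabs, newlines, and carriage returns must be escaped, and `\N` denotes NULL. A minimal, hypothetical helper for the transform above:

```javascript
// Hypothetical helper: escape one value for PostgreSQL's text COPY format
function escapeTextValue(value) {
  if (value === null || value === undefined) return '\\N'; // NULL marker
  return String(value)
    .replace(/\\/g, '\\\\')
    .replace(/\t/g, '\\t')
    .replace(/\n/g, '\\n')
    .replace(/\r/g, '\\r');
}

// e.g. `${escapeTextValue(chunk.name)}\t${escapeTextValue(chunk.email)}\n`
```
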
### Binary Format Import

```javascript
const binaryStream = client.query(copyFrom("COPY data_table FROM STDIN WITH (FORMAT BINARY)"));

// Binary data must be properly formatted according to PostgreSQL binary protocol
const binaryData = Buffer.from([
  // PostgreSQL binary COPY format headers and data
  // This requires knowledge of PostgreSQL's binary encoding
]);

binaryStream.write(binaryData);
binaryStream.end();
```

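For orientation, here is a sketch of a minimal valid payload following PostgreSQL's documented binary COPY format: an 11-byte signature, two 32-bit header fields, length-prefixed tuples, and a -1 trailer. It assumes a target table with a single int4 column; verify the layout against the COPY documentation for your PostgreSQL version:

```javascript
// Header: 'PGCOPY\n' + \xff \r \n \0 signature, then 32-bit flags (0)
// and 32-bit header-extension length (0)
const header = Buffer.concat([
  Buffer.from('PGCOPY\n\xff\r\n\0', 'latin1'),
  Buffer.alloc(4), // flags
  Buffer.alloc(4), // header extension length
]);

// One tuple: 16-bit field count, then per field a 32-bit byte length
// followed by the value bytes (here a single int4 with value 42)
const row = Buffer.alloc(2 + 4 + 4);
row.writeInt16BE(1, 0);  // field count
row.writeInt32BE(4, 2);  // field byte length
row.writeInt32BE(42, 6); // int4 value

// Trailer: 16-bit -1 marks end of data
const trailer = Buffer.alloc(2);
trailer.writeInt16BE(-1, 0);

binaryStream.write(Buffer.concat([header, row, trailer]));
binaryStream.end();
```
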
### Batch Processing with Multiple Files

```javascript
const fs = require('fs');
const { pipeline } = require('stream/promises');
const { glob } = require('glob');
const { from: copyFrom } = require('pg-copy-streams');

const files = await glob('data/*.csv');
const client = await pool.connect();

try {
  await client.query('BEGIN');

  for (const file of files) {
    const ingestStream = client.query(copyFrom('COPY temp_import FROM STDIN WITH (FORMAT CSV)'));
    const sourceStream = fs.createReadStream(file);

    await pipeline(sourceStream, ingestStream);
    console.log(`Imported ${ingestStream.rowCount} rows from ${file}`);
  }

  await client.query('COMMIT');
} catch (error) {
  await client.query('ROLLBACK');
  throw error;
} finally {
  client.release();
}
```

## Important Notes

- Always use the 'finish' event (not 'end') to detect completion in versions 4.0.0+
- The `rowCount` property is only available after the stream finishes
- Destroying the stream sends a CopyFail message so PostgreSQL aborts the COPY cleanly
- Binary format imports require knowledge of PostgreSQL's binary encoding
- A single COPY statement is atomic: if it fails, none of its rows are applied, so an error does not leave the table in an inconsistent state