# COPY BOTH Operations (Bidirectional Streaming)

Creates duplex streams for bidirectional data operations, primarily for replication and logical decoding. COPY BOTH lets a client read from and write to the server within a single stream, which is the mode PostgreSQL uses for streaming replication, including streaming from logical replication slots.

## Capabilities

### Both Function

Creates a duplex stream for COPY BOTH operations.

```javascript { .api }
/**
 * Creates a duplex stream for COPY BOTH operations
 * @param {string} text - SQL statement that starts the COPY BOTH operation (for example, START_REPLICATION)
 * @param {object} [options] - Optional stream configuration with COPY BOTH specific options
 * @returns {CopyBothQueryStream} CopyBothQueryStream instance
 */
function both(text, options);
```
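
**Usage Examples:**

```javascript
const { both: copyBoth } = require('pg-copy-streams');
const { Pool } = require('pg');

async function main() {
  // START_REPLICATION is only accepted on a replication connection.
  // Assumption: the installed pg version supports the `replication` option.
  const pool = new Pool({ replication: 'database' });
  const client = await pool.connect();

  // Basic replication slot streaming
  const stream = client.query(copyBoth("START_REPLICATION SLOT my_slot LOGICAL 0/0"));

  // With frame alignment option (an alternative to the call above;
  // a connection serves one replication stream at a time)
  const alignedStream = client.query(copyBoth(
    "START_REPLICATION SLOT my_slot LOGICAL 0/0",
    { alignOnCopyDataFrame: true }
  ));
}

main().catch(console.error);
```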

### CopyBothQueryStream Class

Duplex stream implementation combining readable and writable functionality for COPY BOTH operations.

```javascript { .api }
/**
 * CopyBothQueryStream - Duplex stream for COPY BOTH operations
 * @class
 * @extends {Duplex}
 */
class CopyBothQueryStream {
  /**
   * @param {string} text - The SQL COPY statement
   * @param {object} [options] - Stream options
   */
  constructor(text, options) {}

  /** @type {string} The SQL COPY statement */
  text;

  /** @type {object} PostgreSQL connection reference */
  connection;

  /** @type {boolean} Whether to align data on COPY data frame boundaries */
  alignOnCopyDataFrame;

  /** @type {Array} Internal buffer for writable data chunks */
  chunks;
}
```

### Stream Submission

Submits the COPY BOTH query to a PostgreSQL connection.

```javascript { .api }
/**
 * Submits the COPY BOTH query to a PostgreSQL connection
 * @param {object} connection - pg connection object
 */
submit(connection);
```
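
**Usage Example:**

```javascript
const { Pool } = require('pg');
const { finished } = require('stream');
const { both: copyBoth } = require('pg-copy-streams');

async function main() {
  // Assumption: the installed pg version supports the `replication` option
  const pool = new Pool({ replication: 'database' });
  const client = await pool.connect();
  try {
    // Set up logical replication
    await client.query("SELECT pg_create_logical_replication_slot('my_slot', 'test_decoding')");

    const stream = client.query(copyBoth("START_REPLICATION SLOT my_slot LOGICAL 0/0"));

    // The stream is long-running: release the client only once it has ended or failed
    finished(stream, (err) => client.release(err));

    // Read replication data
    stream.on('data', (chunk) => {
      console.log('Replication data:', chunk.toString());
    });

    // Send replication feedback. Placeholder bytes only: a real payload must be
    // a Standby status update message as defined by the replication protocol.
    const feedback = Buffer.from('feedback_data');
    stream.write(feedback);
  } catch (err) {
    client.release(err);
    throw err;
  }
}

main().catch(console.error);
```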
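The chunks logged above are binary replication protocol messages rather than plain text. As a minimal sketch, assuming `alignOnCopyDataFrame: true` so that each chunk holds one complete message, the two message types the server sends could be decoded as follows. `parseReplicationMessage` and `formatLsn` are hypothetical helpers, not part of pg-copy-streams; the field layout follows the PostgreSQL streaming replication protocol.

```javascript
// Hypothetical helper (not part of pg-copy-streams): decodes one complete
// message received from the server after START_REPLICATION.
function parseReplicationMessage(chunk) {
  const type = String.fromCharCode(chunk[0]);
  if (type === 'w') {
    // XLogData: 'w', WAL start (Int64), WAL end (Int64), send time (Int64), payload
    return {
      type: 'XLogData',
      walStart: chunk.readBigUInt64BE(1),
      walEnd: chunk.readBigUInt64BE(9),
      sendTime: chunk.readBigInt64BE(17),
      payload: chunk.subarray(25),
    };
  }
  if (type === 'k') {
    // Primary keepalive: 'k', WAL end (Int64), send time (Int64), reply flag (1 byte)
    return {
      type: 'keepalive',
      walEnd: chunk.readBigUInt64BE(1),
      sendTime: chunk.readBigInt64BE(9),
      replyRequested: chunk[17] === 1,
    };
  }
  return { type: 'unknown', raw: chunk };
}

// Formats a 64-bit WAL position the way PostgreSQL prints it, e.g. "0/16B3748"
function formatLsn(lsn) {
  const hi = (lsn >> 32n).toString(16).toUpperCase();
  const lo = (lsn & 0xffffffffn).toString(16).toUpperCase();
  return `${hi}/${lo}`;
}
```

With the `test_decoding` plugin, `payload.toString()` yields the textual change description, while the WAL positions are only available from the binary header.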

### Error Handling

Handles errors during the COPY BOTH operation.

```javascript { .api }
/**
 * Handles errors during stream processing
 * @param {Error} err - Error object
 */
handleError(err);
```

### CopyData Handler

Handles out-of-band CopyData messages received after CopyDone.

```javascript { .api }
/**
 * Handles out-of-band CopyData messages
 * @param {Buffer} chunk - Data chunk received
 */
handleCopyData(chunk);
```

### Command Completion

Called when the command completes.

```javascript { .api }
/**
 * Called when command completes
 */
handleCommandComplete();
```

### Ready for Query

Called when the connection is ready for the next query.

```javascript { .api }
/**
 * Called when connection is ready for next query
 */
handleReadyForQuery();
```

## Stream Events

As a duplex stream, CopyBothQueryStream emits both readable and writable stream events:

```javascript { .api }
/**
 * Duplex stream events (readable side)
 */
stream.on('data', (chunk) => {
  // Received data from PostgreSQL (replication data, etc.)
  console.log(`Received ${chunk.length} bytes from PostgreSQL`);
});

stream.on('end', () => {
  // No more data will be received from PostgreSQL
  console.log('Reading side ended');
});

/**
 * Duplex stream events (writable side)
 */
stream.on('drain', () => {
  // Stream is ready to accept more data after write() returned false
  console.log('Stream ready for more data');
});

stream.on('finish', () => {
  // All data has been written to PostgreSQL
  console.log('Writing side finished');
});

/**
 * Common stream events
 */
stream.on('error', (err) => {
  // Error occurred during streaming
  console.error('Stream error:', err.message);
});

stream.on('close', () => {
  // Stream has been closed
  console.log('Stream closed');
});
```
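On the writable side, `write()` returning `false` signals that the internal buffer is full and that the caller should wait for `'drain'` before writing more. A minimal sketch of that pattern follows. `writeWithBackpressure` is a hypothetical helper that works with any Node.js writable stream; it is exercised here against a plain `Writable` stand-in so that it runs without a database.

```javascript
const { Writable } = require('stream');
const { once } = require('events');

// Hypothetical helper: writes one chunk and waits for 'drain' when the
// stream reports that its internal buffer is full.
async function writeWithBackpressure(stream, chunk) {
  if (!stream.write(chunk)) {
    await once(stream, 'drain');
  }
}

// Stand-in for a CopyBothQueryStream: a slow sink with a tiny buffer
function createSlowSink(received) {
  return new Writable({
    highWaterMark: 4,
    write(chunk, encoding, callback) {
      received.push(chunk.toString());
      setImmediate(callback); // simulate asynchronous I/O
    },
  });
}

async function demo() {
  const received = [];
  const sink = createSlowSink(received);
  for (const part of ['first', 'second', 'third']) {
    await writeWithBackpressure(sink, Buffer.from(part));
  }
  sink.end();
  await once(sink, 'finish');
  return received;
}
```

The same helper could be called with a COPY BOTH stream in place of the sink when sending feedback messages at a high rate.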

## Advanced Usage

### Logical Replication with Feedback

```javascript
const { Pool } = require('pg');
const { both: copyBoth } = require('pg-copy-streams');

async function main() {
  // Assumption: the installed pg version supports the `replication` option
  const pool = new Pool({ replication: 'database' });
  const client = await pool.connect();

  // Create replication slot
  await client.query("SELECT pg_create_logical_replication_slot('my_app_slot', 'test_decoding')");

  // Align on frames; each chunk is then assumed to hold one complete replication message
  const stream = client.query(copyBoth(
    "START_REPLICATION SLOT my_app_slot LOGICAL 0/0",
    { alignOnCopyDataFrame: true }
  ));

  let lastLSN = null;

  stream.on('data', (chunk) => {
    // Only XLogData messages ('w') carry decoded changes; skip keepalives ('k')
    if (chunk[0] !== 0x77) return;

    // XLogData header: type (1 byte), then WAL start, WAL end, send time (8 bytes each)
    lastLSN = chunk.readBigUInt64BE(1);
    console.log('Logical replication data:', chunk.subarray(25).toString());

    // Send feedback to acknowledge processing
    stream.write(createStandbyStatusUpdate(lastLSN));
  });
}

function createStandbyStatusUpdate(lsn) {
  // Create PostgreSQL standby status update message
  // This is a simplified example - real implementation needs proper message formatting
  const buffer = Buffer.alloc(34);
  buffer.writeUInt8(0x72, 0); // 'r' message type
  // ... additional message formatting
  return buffer;
}

main().catch(console.error);
```
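The `createStandbyStatusUpdate` stub above leaves the message body empty. A possible completion, based on the Standby status update layout in the PostgreSQL streaming replication protocol, is sketched below. `buildStandbyStatusUpdate` and `parseLsn` are hypothetical names, and reporting the same position as written, flushed, and applied is a simplifying assumption.

```javascript
// Microseconds between the Unix epoch and 2000-01-01, the epoch PostgreSQL uses
const PG_EPOCH_OFFSET_MICROS = 946684800000000n;

// Converts a textual LSN such as "0/16B3748" into a 64-bit integer
function parseLsn(text) {
  const [hi, lo] = text.split('/');
  return (BigInt(`0x${hi}`) << 32n) | BigInt(`0x${lo}`);
}

// Hypothetical builder for a Standby status update ('r') message.
// `lsn` is a BigInt; `nowMs` is injectable so the output is reproducible.
function buildStandbyStatusUpdate(lsn, replyRequested = false, nowMs = Date.now()) {
  const clock = BigInt(nowMs) * 1000n - PG_EPOCH_OFFSET_MICROS;
  const buffer = Buffer.alloc(34);
  buffer.writeUInt8(0x72, 0);                     // 'r' message type
  buffer.writeBigUInt64BE(lsn, 1);                // WAL position written
  buffer.writeBigUInt64BE(lsn, 9);                // WAL position flushed
  buffer.writeBigUInt64BE(lsn, 17);               // WAL position applied
  buffer.writeBigInt64BE(clock, 25);              // client clock, microseconds since 2000-01-01
  buffer.writeUInt8(replyRequested ? 1 : 0, 33);  // ask the server to reply immediately
  return buffer;
}
```

The resulting buffer can be passed to `stream.write()`. An application that persists changes asynchronously would likely track the flushed and applied positions separately instead of reusing one value.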

### Frame Alignment Usage

```javascript
// With frame alignment enabled
const alignedStream = client.query(copyBoth(
  "START_REPLICATION SLOT my_slot LOGICAL 0/0",
  { alignOnCopyDataFrame: true }
));

// Data will be aligned on COPY data frame boundaries
alignedStream.on('data', (chunk) => {
  // chunk contains complete COPY data frames
  console.log('Complete frame:', chunk.length);
});

// Without frame alignment (default)
const stream = client.query(copyBoth("START_REPLICATION SLOT my_slot LOGICAL 0/0"));

// Data may be received in partial frames
stream.on('data', (chunk) => {
  // chunk may contain partial COPY data frames
  console.log('Data chunk:', chunk.length);
});
```
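On the wire, each CopyData frame is a `d` byte followed by a 4-byte big-endian length that counts itself plus the payload. The sketch below illustrates what aligning on frame boundaries involves: it splits a raw byte buffer into complete payloads and a leftover tail that must wait for more bytes. It illustrates the wire format only and is not pg-copy-streams' implementation; `splitCopyDataFrames` and `frame` are hypothetical names.

```javascript
// Hypothetical illustration of CopyData framing (not the library's code)
function splitCopyDataFrames(buffer) {
  const payloads = [];
  let offset = 0;
  while (buffer.length - offset >= 5) {
    if (buffer[offset] !== 0x64) { // 'd' marks a CopyData message
      throw new Error(`Unexpected message type at offset ${offset}`);
    }
    const length = buffer.readUInt32BE(offset + 1); // includes the 4 length bytes
    const end = offset + 1 + length;
    if (end > buffer.length) break; // incomplete frame: wait for more bytes
    payloads.push(buffer.subarray(offset + 5, end));
    offset = end;
  }
  return { payloads, rest: buffer.subarray(offset) };
}

// Helper for the example: wraps a payload in a CopyData frame
function frame(payload) {
  const header = Buffer.alloc(5);
  header.writeUInt8(0x64, 0);
  header.writeUInt32BE(payload.length + 4, 1);
  return Buffer.concat([header, payload]);
}

// Two frames arrive, but the network cut the second one short
const wire = Buffer.concat([frame(Buffer.from('abc')), frame(Buffer.from('defgh'))]);
const first = splitCopyDataFrames(wire.subarray(0, wire.length - 2));
// The remaining bytes arrive later and complete the second frame
const second = splitCopyDataFrames(Buffer.concat([first.rest, wire.subarray(wire.length - 2)]));
```

Holding back `rest` until the frame completes is the memory cost that alignment trades for simpler parsing.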

### Bidirectional Communication

```javascript
const { both: copyBoth } = require('pg-copy-streams');
const { Transform } = require('stream');

// `pool` is assumed to be a pg Pool configured for replication connections
async function run(pool) {
  const client = await pool.connect();
  const stream = client.query(copyBoth("START_REPLICATION SLOT my_slot LOGICAL 0/0"));

  // Process incoming replication data
  const processor = new Transform({
    transform(chunk, encoding, callback) {
      const data = chunk.toString();

      // Process replication messages
      if (data.includes('BEGIN')) {
        console.log('Transaction started');
      } else if (data.includes('COMMIT')) {
        console.log('Transaction committed');

        // Send acknowledgment
        const ack = createAckMessage();
        this.push(ack);
      }

      callback();
    }
  });

  // Set up bidirectional pipeline; { end: false } keeps the writable side open
  stream.pipe(processor).pipe(stream, { end: false });
}

function createAckMessage() {
  // Placeholder acknowledgment: a real server expects a Standby status update message
  return Buffer.from('ACK\n');
}
```
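During replication the server expects protocol messages from the client, such as Standby status updates, rather than arbitrary bytes. The following sketch shows one way the transform in the pipeline above could be adapted to answer keepalive messages that request a reply. It assumes `alignOnCopyDataFrame: true` so that each chunk is one complete message, and `createKeepaliveResponder` is a hypothetical helper built only on Node.js streams.

```javascript
const { Transform } = require('stream');

// Microseconds between the Unix epoch and 2000-01-01 (PostgreSQL's epoch)
const PG_EPOCH_OFFSET_MICROS = 946684800000000n;

// Hypothetical helper: consumes complete replication messages and emits a
// Standby status update whenever the server asks for a reply.
function createKeepaliveResponder(onChange) {
  let lastLsn = 0n;
  return new Transform({
    transform(chunk, encoding, callback) {
      if (chunk[0] === 0x77) {
        // XLogData ('w'): remember the WAL position and hand over the payload
        lastLsn = chunk.readBigUInt64BE(1);
        onChange(chunk.subarray(25));
      } else if (chunk[0] === 0x6b && chunk[17] === 1) {
        // Keepalive ('k') with the reply flag set: acknowledge lastLsn
        const reply = Buffer.alloc(34);
        reply.writeUInt8(0x72, 0);            // 'r' message type
        reply.writeBigUInt64BE(lastLsn, 1);   // written
        reply.writeBigUInt64BE(lastLsn, 9);   // flushed
        reply.writeBigUInt64BE(lastLsn, 17);  // applied
        reply.writeBigInt64BE(BigInt(Date.now()) * 1000n - PG_EPOCH_OFFSET_MICROS, 25);
        this.push(reply);
      }
      callback();
    },
  });
}
```

It would slot into the pipeline as `stream.pipe(createKeepaliveResponder(handleChange)).pipe(stream, { end: false })`, where `handleChange` is the application's own callback. Acknowledging a position as soon as it is received is only safe when `handleChange` finishes its work synchronously.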

## Options

### alignOnCopyDataFrame

Controls whether data is aligned on COPY data frame boundaries.

```javascript { .api }
/**
 * @typedef {object} CopyBothOptions
 * @property {boolean} [alignOnCopyDataFrame=false] - Whether to align data on COPY data frame boundaries
 * @property {number} [highWaterMark] - Stream high water mark
 * @property {boolean} [objectMode] - Object mode flag
 * @property {string} [encoding] - Stream encoding
 */
```

When `alignOnCopyDataFrame` is `true`:
- Data is buffered until complete COPY data frames are available
- Each 'data' event contains complete frames
- Higher memory usage but easier parsing

When `alignOnCopyDataFrame` is `false` (default):
- Data is forwarded as soon as it arrives
- More efficient memory usage
- May require frame boundary handling in application code

## Important Notes

- COPY BOTH is primarily used for PostgreSQL replication scenarios
- This is an advanced feature requiring knowledge of PostgreSQL replication protocols
- Logical replication requires appropriate PostgreSQL configuration (`wal_level = logical`)
- Replication slots must be created before use and cleaned up after use
- The stream handles both readable and writable operations simultaneously
- Proper error handling is crucial as replication operations can be long-running
- Frame alignment affects memory usage and parsing complexity