or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

copy-both.mdcopy-from.mdcopy-to.mdindex.md

copy-both.mddocs/

0

# COPY BOTH Operations (Bidirectional Streaming)

1

2

Creates duplex streams for bidirectional data operations, primarily used for replication scenarios and logical decoding. COPY BOTH allows simultaneous reading and writing within a single stream, enabling advanced PostgreSQL features like logical replication slots.

3

4

## Capabilities

5

6

### Both Function

7

8

Creates a duplex stream for COPY BOTH operations.

9

10

```javascript { .api }

11

/**

12

* Creates a duplex stream for COPY BOTH operations

13

* @param {string} text - SQL COPY statement for bidirectional operations

14

* @param {object} [options] - Optional stream configuration with COPY BOTH specific options

15

* @returns {CopyBothQueryStream} CopyBothQueryStream instance

16

*/

17

function both(text, options);

18

```

19

20

**Usage Examples:**

21

22

```javascript

23

const { both: copyBoth } = require('pg-copy-streams');

24

const { Pool } = require('pg');

25

26

// Basic replication slot streaming

27

const pool = new Pool();

28

const client = await pool.connect();

29

const stream = client.query(copyBoth("START_REPLICATION SLOT my_slot LOGICAL 0/0"));

30

31

// With frame alignment option

32

const alignedStream = client.query(copyBoth(

33

"START_REPLICATION SLOT my_slot LOGICAL 0/0",

34

{ alignOnCopyDataFrame: true }

35

));

36

```

37

38

### CopyBothQueryStream Class

39

40

Duplex stream implementation combining readable and writable functionality for COPY BOTH operations.

41

42

```javascript { .api }

43

/**

44

* CopyBothQueryStream - Duplex stream for COPY BOTH operations

45

* @class

46

* @extends {Duplex}

47

*/

48

class CopyBothQueryStream {

49

/**

50

* @param {string} text - The SQL COPY statement

51

* @param {object} [options] - Stream options

52

*/

53

constructor(text, options) {}

54

55

/** @type {string} The SQL COPY statement */

56

text;

57

58

/** @type {object} PostgreSQL connection reference */

59

connection;

60

61

/** @type {boolean} Whether to align data on COPY data frame boundaries */

62

alignOnCopyDataFrame;

63

64

/** @type {Array} Internal buffer for writable data chunks */

65

chunks;

66

}

67

```

68

69

### Stream Submission

70

71

Submits the COPY BOTH query to a PostgreSQL connection.

72

73

```javascript { .api }

74

/**

75

* Submits the COPY BOTH query to a PostgreSQL connection

76

* @param {object} connection - pg connection object

77

*/

78

submit(connection);

79

```

80

81

**Usage Example:**

82

83

```javascript

84

const { Pool } = require('pg');

85

const { both: copyBoth } = require('pg-copy-streams');

86

87

const pool = new Pool();

88

const client = await pool.connect();

89

try {

90

// Set up logical replication

91

await client.query("SELECT pg_create_logical_replication_slot('my_slot', 'test_decoding')");

92

93

const stream = client.query(copyBoth("START_REPLICATION SLOT my_slot LOGICAL 0/0"));

94

95

// Read replication data

96

stream.on('data', (chunk) => {

97

console.log('Replication data:', chunk.toString());

98

});

99

100

// Send replication feedback

101

const feedback = Buffer.from('feedback_data');

102

stream.write(feedback);

103

104

} finally {

105

client.release();

106

}

107

```

108

109

### Error Handling

110

111

Handles errors during the COPY BOTH operation.

112

113

```javascript { .api }

114

/**

115

* Handles errors during stream processing

116

* @param {Error} err - Error object

117

*/

118

handleError(err);

119

```

120

121

### CopyData Handler

122

123

Handles out-of-band CopyData messages received after CopyDone.

124

125

```javascript { .api }

126

/**

127

* Handles out-of-band CopyData messages

128

* @param {Buffer} chunk - Data chunk received

129

*/

130

handleCopyData(chunk);

131

```

132

133

### Command Completion

134

135

Called when the command completes.

136

137

```javascript { .api }

138

/**

139

* Called when command completes

140

*/

141

handleCommandComplete();

142

```

143

144

### Ready for Query

145

146

Called when the connection is ready for the next query.

147

148

```javascript { .api }

149

/**

150

* Called when connection is ready for next query

151

*/

152

handleReadyForQuery();

153

```

154

155

## Stream Events

156

157

As a duplex stream, CopyBothQueryStream emits both readable and writable stream events:

158

159

```javascript { .api }

160

/**

161

* Duplex stream events (readable side)

162

*/

163

stream.on('data', (chunk) => {

164

// Received data from PostgreSQL (replication data, etc.)

165

console.log(`Received ${chunk.length} bytes from PostgreSQL`);

166

});

167

168

stream.on('end', () => {

169

// No more data will be received from PostgreSQL

170

console.log('Reading side ended');

171

});

172

173

/**

174

* Duplex stream events (writable side)

175

*/

176

stream.on('drain', () => {

177

// Stream is ready to accept more data after being paused

178

console.log('Stream ready for more data');

179

});

180

181

stream.on('finish', () => {

182

// All data has been written to PostgreSQL

183

console.log('Writing side finished');

184

});

185

186

/**

187

* Common stream events

188

*/

189

stream.on('error', (err) => {

190

// Error occurred during streaming

191

console.error('Stream error:', err.message);

192

});

193

194

stream.on('close', () => {

195

// Stream has been closed

196

console.log('Stream closed');

197

});

198

```

199

200

## Advanced Usage

201

202

### Logical Replication with Feedback

203

204

```javascript

205

const { both: copyBoth } = require('pg-copy-streams');

206

207

const client = await pool.connect();

208

209

// Create replication slot

210

await client.query("SELECT pg_create_logical_replication_slot('my_app_slot', 'test_decoding')");

211

212

const stream = client.query(copyBoth("START_REPLICATION SLOT my_app_slot LOGICAL 0/0"));

213

214

let lastLSN = null;

215

216

stream.on('data', (chunk) => {

217

const data = chunk.toString();

218

console.log('Logical replication data:', data);

219

220

// Parse LSN from replication message (simplified)

221

const lsnMatch = data.match(/LSN: ([0-9A-F\/]+)/);

222

if (lsnMatch) {

223

lastLSN = lsnMatch[1];

224

}

225

226

// Send feedback to acknowledge processing

227

if (lastLSN) {

228

const feedback = createStandbyStatusUpdate(lastLSN);

229

stream.write(feedback);

230

}

231

});

232

233

function createStandbyStatusUpdate(lsn) {

234

// Create PostgreSQL standby status update message

235

// This is a simplified example - real implementation needs proper message formatting

236

const buffer = Buffer.alloc(34);

237

buffer.writeUInt8(0x72, 0); // 'r' message type

238

// ... additional message formatting

239

return buffer;

240

}

241

```

242

243

### Frame Alignment Usage

244

245

```javascript

246

// With frame alignment enabled

247

const alignedStream = client.query(copyBoth(

248

"START_REPLICATION SLOT my_slot LOGICAL 0/0",

249

{ alignOnCopyDataFrame: true }

250

));

251

252

// Data will be aligned on COPY data frame boundaries

253

alignedStream.on('data', (chunk) => {

254

// chunk contains complete COPY data frames

255

console.log('Complete frame:', chunk.length);

256

});

257

258

// Without frame alignment (default)

259

const stream = client.query(copyBoth("START_REPLICATION SLOT my_slot LOGICAL 0/0"));

260

261

// Data may be received in partial frames

262

stream.on('data', (chunk) => {

263

// chunk may contain partial COPY data frames

264

console.log('Data chunk:', chunk.length);

265

});

266

```

267

268

### Bidirectional Communication

269

270

```javascript

271

const { both: copyBoth } = require('pg-copy-streams');

272

const { Transform } = require('stream');

273

274

const client = await pool.connect();

275

const stream = client.query(copyBoth("START_REPLICATION SLOT my_slot LOGICAL 0/0"));

276

277

// Process incoming replication data

278

const processor = new Transform({

279

transform(chunk, encoding, callback) {

280

const data = chunk.toString();

281

282

// Process replication messages

283

if (data.includes('BEGIN')) {

284

console.log('Transaction started');

285

} else if (data.includes('COMMIT')) {

286

console.log('Transaction committed');

287

288

// Send acknowledgment

289

const ack = createAckMessage();

290

this.push(ack);

291

}

292

293

callback();

294

}

295

});

296

297

// Set up bidirectional pipeline

298

stream.pipe(processor).pipe(stream, { end: false });

299

300

function createAckMessage() {

301

// Create acknowledgment message

302

return Buffer.from('ACK\n');

303

}

304

```

305

306

## Options

307

308

### alignOnCopyDataFrame

309

310

Controls whether data is aligned on COPY data frame boundaries.

311

312

```javascript { .api }

313

/**

314

* @typedef {object} CopyBothOptions

315

* @property {boolean} [alignOnCopyDataFrame=false] - Whether to align data on COPY data frame boundaries

316

* @property {number} [highWaterMark] - Stream high water mark

317

* @property {boolean} [objectMode] - Object mode flag

318

* @property {string} [encoding] - Stream encoding

319

*/

320

```

321

322

When `alignOnCopyDataFrame` is `true`:

323

- Data is buffered until complete COPY data frames are available

324

- Each 'data' event contains complete frames

325

- Higher memory usage but easier parsing

326

327

When `alignOnCopyDataFrame` is `false` (default):

328

- Data is forwarded as soon as it arrives

329

- More efficient memory usage

330

- May require frame boundary handling in application code

331

332

## Important Notes

333

334

- COPY BOTH is primarily used for PostgreSQL replication scenarios

335

- This is an advanced feature requiring knowledge of PostgreSQL replication protocols

336

- Logical replication requires appropriate PostgreSQL configuration (`wal_level = logical`)

337

- Replication slots must be created before use and cleaned up after use

338

- The stream handles both readable and writable operations simultaneously

339

- Proper error handling is crucial as replication operations can be long-running

340

- Frame alignment affects memory usage and parsing complexity