# pg-copy-streams

pg-copy-streams provides low-level streaming interfaces for PostgreSQL's COPY TO and COPY FROM operations. It enables high-performance data transfer directly between Node.js applications and PostgreSQL databases through three specialized stream types: readable streams for exporting data, writable streams for importing data, and duplex streams for bidirectional operations.

## Package Information

- **Package Name**: pg-copy-streams
- **Package Type**: npm
- **Language**: JavaScript
- **Installation**: `npm install pg-copy-streams`

## Core Imports

CommonJS (Node.js default):

```javascript
const { to, from, both } = require('pg-copy-streams');

// Alternative destructuring with aliases
const { to: copyTo, from: copyFrom, both: copyBoth } = require('pg-copy-streams');

// Import entire module and access the factories as properties
const pgCopyStreams = require('pg-copy-streams');
const createCopyTo = pgCopyStreams.to; // distinct name: `copyTo` is already declared above
```

ES Modules (when using `"type": "module"` in package.json):

```javascript
import { to, from, both } from 'pg-copy-streams';

// Alternative with aliases
import { to as copyTo, from as copyFrom, both as copyBoth } from 'pg-copy-streams';

// Default import: this documentation covers the named exports only. Whether a
// default import resolves depends on how your runtime or bundler handles
// CommonJS interop, so prefer the named imports above.
// import pgCopyStreams from 'pg-copy-streams';
```

## Basic Usage

```javascript
const fs = require('fs');
const { Pool } = require('pg');
const { to: copyTo, from: copyFrom } = require('pg-copy-streams');

const pool = new Pool();

// Export data from PostgreSQL (COPY TO)
pool.connect((err, client, done) => {
  if (err) throw err; // could not acquire a client from the pool
  const stream = client.query(copyTo('COPY my_table TO STDOUT'));
  stream.pipe(process.stdout);
  stream.on('end', done); // release the client once all rows are read
  stream.on('error', done); // passing the error tells the pool to discard the client
});

// Import data into PostgreSQL (COPY FROM)
pool.connect((err, client, done) => {
  if (err) throw err;
  const stream = client.query(copyFrom('COPY my_table FROM STDIN'));
  const fileStream = fs.createReadStream('data.csv');
  // Abort the COPY if the file cannot be read
  fileStream.on('error', (readErr) => stream.destroy(readErr));
  fileStream.pipe(stream);
  stream.on('finish', done); // release the client once the import has finished
  stream.on('error', done);
});
```
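
The callback-style handlers above release the client from individual events. As a minimal sketch, the same export can be wrapped in a promise with Node's `stream.pipeline`, which forwards an error from either stream and tears both down. The helper name `exportTable` and its parameters are illustrative and not part of pg-copy-streams; `pool` is assumed to be a `pg` Pool and `copyTo` the `to` factory.

```javascript
const { pipeline } = require('stream');
const { promisify } = require('util');

const pipelineAsync = promisify(pipeline);

// Hypothetical helper: streams the result of a COPY ... TO STDOUT statement
// into `destination` and always releases the pooled client.
// `pool` is assumed to be a pg Pool; `copyTo` is assumed to be the `to` factory.
async function exportTable(pool, copyTo, sql, destination) {
  const client = await pool.connect();
  let failure;
  try {
    const source = client.query(copyTo(sql));
    await pipelineAsync(source, destination);
  } catch (err) {
    failure = err;
    throw err;
  } finally {
    // Passing an error to release() asks the pool to discard the client.
    client.release(failure);
  }
}

// Usage (requires `pg`, `pg-copy-streams`, and a reachable database):
// const fs = require('fs');
// const { Pool } = require('pg');
// const { to: copyTo } = require('pg-copy-streams');
// exportTable(new Pool(), copyTo, 'COPY my_table TO STDOUT',
//   fs.createWriteStream('my_table.tsv'));
```

Injecting `pool` and `copyTo` keeps the helper independent of how the application creates its connections, and makes it easy to exercise with stand-in streams.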

## Architecture

pg-copy-streams is built around four core components:

- **Stream Factory Functions**: Entry-point functions (`to`, `from`, `both`) that create specialized stream instances
- **Stream Classes**: Three stream implementations extending Node.js stream classes (Readable, Writable, Duplex)
- **Protocol Integration**: Low-level PostgreSQL protocol handling for COPY operations with message parsing and connection management
- **Node-postgres Integration**: Seamless integration with the `pg` library connection and query system

## Capabilities

### COPY TO Operations (Data Export)

Creates readable streams for exporting data from PostgreSQL tables to external destinations. Ideal for backup operations, data extraction, and ETL processes.

```javascript { .api }
/**
 * Creates a readable stream for COPY TO operations
 * @param {string} text - SQL COPY TO statement
 * @param {object} [options] - Optional stream configuration
 * @returns {CopyToQueryStream} Readable stream instance
 */
function to(text, options);
```

[COPY TO Operations](./copy-to.md)

### COPY FROM Operations (Data Import)

Creates writable streams for importing data from external sources into PostgreSQL tables. Perfect for bulk data loading, CSV imports, and ETL pipelines.

```javascript { .api }
/**
 * Creates a writable stream for COPY FROM operations
 * @param {string} text - SQL COPY FROM statement
 * @param {object} [options] - Optional stream configuration
 * @returns {CopyFromQueryStream} Writable stream instance
 */
function from(text, options);
```

[COPY FROM Operations](./copy-from.md)
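
When the data to load lives in memory rather than in a file, rows have to be serialized before they are written to the stream. The sketch below encodes JavaScript values in PostgreSQL's default COPY text format (tab-separated columns, `\N` for NULL, backslash escapes). `toCopyTextRow` is a hypothetical helper, not part of pg-copy-streams, and it assumes plain string, number, boolean, or null values.

```javascript
// Hypothetical helper: encodes one row in PostgreSQL's default COPY text format.
// Columns are tab-separated, NULL is written as \N, and backslash, tab,
// newline, and carriage return inside a value are backslash-escaped.
function toCopyTextRow(values) {
  const escapes = { '\\': '\\\\', '\t': '\\t', '\n': '\\n', '\r': '\\r' };
  const fields = values.map((value) => {
    if (value === null || value === undefined) return '\\N';
    return String(value).replace(/[\\\t\n\r]/g, (ch) => escapes[ch]);
  });
  return fields.join('\t') + '\n';
}

// Usage with a COPY FROM stream (assumes `stream` came from
// client.query(copyFrom('COPY my_table FROM STDIN'))):
// stream.write(toCopyTextRow([1, 'first\tvalue', null]));
// stream.end();
```

CSV input needs different quoting rules, so this helper only applies when the COPY statement uses the default text format.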

### COPY BOTH Operations (Bidirectional Streaming)

Creates duplex streams for bidirectional data operations, primarily used for replication and logical decoding scenarios.

```javascript { .api }
/**
 * Creates a duplex stream for COPY BOTH operations
 * @param {string} text - SQL COPY statement
 * @param {object} [options] - Optional stream configuration
 * @returns {CopyBothQueryStream} Duplex stream instance
 */
function both(text, options);
```

[COPY BOTH Operations](./copy-both.md)

## Types

```javascript { .api }
/**
 * @typedef {object} StreamOptions
 * @property {number} [highWaterMark] - Stream high water mark
 * @property {boolean} [objectMode] - Object mode flag
 * @property {string} [encoding] - Stream encoding
 */

/**
 * @typedef {StreamOptions} CopyBothOptions
 * @property {boolean} [alignOnCopyDataFrame] - Whether to align data on COPY data frame boundaries
 */

/**
 * CopyToQueryStream - Readable stream for COPY TO operations
 * @class
 * @extends {Readable}
 */
class CopyToQueryStream {
  /**
   * @param {string} text - The SQL COPY TO statement
   * @param {StreamOptions} [options] - Stream options
   */
  constructor(text, options) {}

  /** @type {string} The SQL COPY statement */
  text;

  /** @type {number} Number of rows processed */
  rowCount;

  /** @type {object} PostgreSQL connection reference */
  connection;

  /**
   * Submits the query to a PostgreSQL connection
   * @param {object} connection - pg connection object
   */
  submit(connection) {}

  /**
   * Handles errors during stream processing
   * @param {Error} err - Error object
   */
  handleError(err) {}

  /**
   * Processes CommandComplete message and extracts row count
   * @param {object} msg - Message object with text property
   */
  handleCommandComplete(msg) {}

  /**
   * Called when connection is ready for next query
   */
  handleReadyForQuery() {}
}

/**
 * CopyFromQueryStream - Writable stream for COPY FROM operations
 * @class
 * @extends {Writable}
 */
class CopyFromQueryStream {
  /**
   * @param {string} text - The SQL COPY FROM statement
   * @param {StreamOptions} [options] - Stream options
   */
  constructor(text, options) {}

  /** @type {string} The SQL COPY statement */
  text;

  /** @type {number} Number of rows processed */
  rowCount;

  /** @type {object} PostgreSQL connection reference */
  connection;

  /**
   * Submits the query to a PostgreSQL connection
   * @param {object} connection - pg connection object
   */
  submit(connection) {}

  /**
   * Callback placeholder for pg timeout mechanism
   */
  callback() {}

  /**
   * Handles errors during stream processing
   * @param {Error} e - Error object
   */
  handleError(e) {}

  /**
   * Handles CopyInResponse from PostgreSQL backend
   * @param {object} connection - pg connection object
   */
  handleCopyInResponse(connection) {}

  /**
   * Processes CommandComplete message and extracts row count
   * @param {object} msg - Message object with text property
   */
  handleCommandComplete(msg) {}

  /**
   * Called when connection is ready for next query
   */
  handleReadyForQuery() {}
}

/**
 * CopyBothQueryStream - Duplex stream for COPY BOTH operations
 * @class
 * @extends {Duplex}
 */
class CopyBothQueryStream {
  /**
   * @param {string} text - The SQL COPY statement
   * @param {CopyBothOptions} [options] - Stream options
   */
  constructor(text, options) {}

  /** @type {string} The SQL COPY statement */
  text;

  /** @type {object} PostgreSQL connection reference */
  connection;

  /** @type {boolean} Whether to align data on COPY data frame boundaries */
  alignOnCopyDataFrame;

  /**
   * Submits the query to a PostgreSQL connection
   * @param {object} connection - pg connection object
   */
  submit(connection) {}

  /**
   * Handles errors during stream processing
   * @param {Error} err - Error object
   */
  handleError(err) {}

  /**
   * Handles out-of-band CopyData messages
   * @param {Buffer} chunk - Data chunk
   */
  handleCopyData(chunk) {}

  /**
   * Called when command completes
   */
  handleCommandComplete() {}

  /**
   * Called when connection is ready for next query
   */
  handleReadyForQuery() {}
}
```
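
`handleCommandComplete` is described above as extracting the row count from the CommandComplete message. PostgreSQL reports a finished COPY with a command tag of the form `COPY <rows>`, so the extraction can be sketched as follows. `parseCopyRowCount` is an illustrative name, and this is not necessarily how the library implements it.

```javascript
// Illustrative sketch: pull the row count out of a CommandComplete tag
// such as "COPY 42". Returns null when the tag is not a COPY tag with a count.
function parseCopyRowCount(commandTag) {
  const match = /^COPY (\d+)$/.exec(commandTag);
  return match ? Number.parseInt(match[1], 10) : null;
}

console.log(parseCopyRowCount('COPY 42')); // 42
console.log(parseCopyRowCount('SELECT 1')); // null
```

In application code the parsed value does not need to be computed by hand: it is exposed as the `rowCount` property listed on `CopyToQueryStream` and `CopyFromQueryStream`.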

## Error Handling

Failures in a COPY operation surface through the stream's `error` event. The common failure scenarios and the recovery behavior are listed below.

### Common Error Types

- **Syntax Errors**: Invalid SQL COPY statements
- **Connection Errors**: Database connection failures
- **Data Format Errors**: Malformed data that PostgreSQL cannot process
- **Permission Errors**: Insufficient database permissions
- **Stream Errors**: Network interruptions or stream corruption

### Error Recovery

- **Automatic Rollback**: Failed COPY operations are automatically rolled back by PostgreSQL
- **CopyFail Messages**: Destroying a stream automatically sends CopyFail to abort the operation
- **Transaction Safety**: Operations within transactions maintain ACID properties
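
Because a failed COPY is rolled back and stream destruction aborts the operation, an import can be combined with other statements in a single transaction. The following is a minimal sketch under stated assumptions: `importInTransaction` is a hypothetical helper, `client` is assumed to be a connected `pg` client, and `copyFrom` is assumed to be the `from` factory.

```javascript
const { pipeline } = require('stream');
const { promisify } = require('util');

const pipelineAsync = promisify(pipeline);

// Hypothetical helper: runs a COPY ... FROM STDIN inside an explicit transaction.
// `client` is assumed to be a connected pg client, `copyFrom` the `from` factory,
// and `source` any readable stream producing data in the format the COPY expects.
async function importInTransaction(client, copyFrom, sql, source) {
  await client.query('BEGIN');
  try {
    const target = client.query(copyFrom(sql));
    // If `source` fails, pipeline destroys `target`, which aborts the COPY.
    await pipelineAsync(source, target);
    await client.query('COMMIT');
  } catch (err) {
    await client.query('ROLLBACK');
    throw err;
  }
}

// Usage (requires `pg`, `pg-copy-streams`, and a reachable database):
// await importInTransaction(client, copyFrom, 'COPY my_table FROM STDIN',
//   fs.createReadStream('data.csv'));
```

The caller still sees the original error, because the helper rethrows it after issuing `ROLLBACK`.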

## Compatibility and Performance

### PostgreSQL Compatibility

- **PostgreSQL Versions**: Compatible with PostgreSQL 9.0+
- **Protocol Support**: Uses PostgreSQL's native COPY protocol for optimal performance
- **Pure JavaScript**: Works only with the pure JavaScript `pg` driver (not `pg.native`)

### Performance Characteristics

- **High Throughput**: Direct protocol integration bypasses query parsing overhead
- **Memory Efficient**: Streaming operations avoid loading entire datasets into memory
- **Chunk Boundaries**: PostgreSQL chunks data on 64kB boundaries, so a chunk boundary may fall in the middle of a row
- **Backpressure Handling**: Proper Node.js stream backpressure support
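
Because chunk boundaries may split rows, a consumer that processes a COPY TO stream row by row has to reassemble lines first. The sketch below does this with a small `Transform`. `LineSplitter` is a hypothetical class, not part of pg-copy-streams, and it assumes text-format output in which every row ends with a newline.

```javascript
const { Transform } = require('stream');
const { StringDecoder } = require('string_decoder');

// Hypothetical helper: re-chunks an arbitrary byte stream into complete lines.
// Assumes text-format COPY output, where every row ends with "\n".
class LineSplitter extends Transform {
  constructor() {
    super({ readableObjectMode: true }); // emit one string per row
    this.decoder = new StringDecoder('utf8'); // keeps multi-byte characters intact
    this.pending = ''; // partial row carried over from the previous chunk
  }

  _transform(chunk, encoding, callback) {
    const lines = (this.pending + this.decoder.write(chunk)).split('\n');
    this.pending = lines.pop(); // last piece is incomplete (or empty)
    for (const line of lines) this.push(line);
    callback();
  }

  _flush(callback) {
    const rest = this.pending + this.decoder.end();
    if (rest) this.push(rest);
    callback();
  }
}

// Usage (assumes `client` and `copyTo` as in Basic Usage):
// client.query(copyTo('COPY my_table TO STDOUT'))
//   .pipe(new LineSplitter())
//   .on('data', (row) => console.log(row.split('\t')));
```

Since the splitter is an ordinary `Transform`, it participates in the same backpressure handling as the rest of the pipeline. It is not suitable for binary-format COPY output.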

### Node.js Requirements

- **Node.js Version**: Requires Node.js 8.0+ (async/await support recommended)
- **Stream API**: Built on Node.js streams for maximum compatibility
- **Buffer Handling**: Uses modern Buffer allocation methods for security