# COPY FROM Operations (Data Import)

Creates writable streams for importing data from external sources into PostgreSQL tables. The COPY FROM operation provides high-performance bulk data loading directly into PostgreSQL without the overhead of individual INSERT statements.

## Capabilities

### From Function

Creates a writable stream for COPY FROM operations.

```javascript { .api }
/**
 * Creates a writable stream for COPY FROM operations
 * @param {string} text - SQL COPY FROM statement (e.g., 'COPY my_table FROM STDIN')
 * @param {object} [options] - Optional stream configuration
 * @returns {CopyFromQueryStream} CopyFromQueryStream instance
 */
function from(text, options);
```

**Usage Examples:**

```javascript
const { from: copyFrom } = require('pg-copy-streams');
const { Pool } = require('pg');

// Basic table import
const pool = new Pool();
const client = await pool.connect();
const stream = client.query(copyFrom('COPY users FROM STDIN'));

// Import CSV with headers
const csvStream = client.query(copyFrom("COPY products FROM STDIN WITH (FORMAT CSV, HEADER)"));

// Import with custom delimiter
const tsvStream = client.query(copyFrom("COPY logs FROM STDIN WITH (FORMAT TEXT, DELIMITER E'\\t')"));
```
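
The optional second argument is presumably forwarded to the underlying stream implementation; a hedged sketch, assuming standard Writable options such as `highWaterMark` apply:

```javascript
// Assumption: options are passed through to the underlying stream, so a larger
// highWaterMark can reduce 'drain' pauses when the source is faster than the network
const bigBufferStream = client.query(copyFrom('COPY users FROM STDIN', { highWaterMark: 1 << 20 }));
```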

### CopyFromQueryStream Class

Writable stream implementation for COPY FROM operations.

```javascript { .api }
/**
 * CopyFromQueryStream - Writable stream for COPY FROM operations
 * @class
 * @extends {Writable}
 */
class CopyFromQueryStream {
  /**
   * @param {string} text - The SQL COPY FROM statement
   * @param {object} [options] - Stream options
   */
  constructor(text, options) {}

  /** @type {string} The SQL COPY FROM statement */
  text;

  /** @type {number} Number of rows imported (available after completion) */
  rowCount;

  /** @type {object} PostgreSQL connection reference */
  connection;

  /** @type {Array} Internal buffer for data chunks */
  chunks;
}
```

### Stream Submission

Submits the COPY FROM query to a PostgreSQL connection.

```javascript { .api }
/**
 * Submits the COPY FROM query to a PostgreSQL connection
 * @param {object} connection - pg connection object
 */
submit(connection);
```
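
You normally do not call `submit` yourself: when the stream is passed to `client.query`, pg recognizes the object's `submit` method, invokes it with the underlying connection, and returns the same stream instance.

```javascript
// client.query() detects submit() and wires the stream to its connection;
// the return value is the CopyFromQueryStream itself, ready to be written to
const stream = client.query(copyFrom('COPY users FROM STDIN'));
```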

**Usage Example:**

```javascript
const { Pool } = require('pg');
const { from: copyFrom } = require('pg-copy-streams');
const { pipeline } = require('stream/promises');
const fs = require('fs');

const pool = new Pool();
const client = await pool.connect();
try {
  const ingestStream = client.query(copyFrom('COPY users FROM STDIN'));
  const sourceStream = fs.createReadStream('users.csv');

  await pipeline(sourceStream, ingestStream);
  console.log(`Imported ${ingestStream.rowCount} rows`);
} finally {
  client.release();
}
```

### Timeout Callback

Placeholder callback for integrating with pg's query-timeout mechanism.

```javascript { .api }
/**
 * Empty callback placeholder for pg timeout mechanism
 * This is overwritten by pg when query_timeout config is set
 */
callback();
```
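
For context, `query_timeout` is a standard pg client configuration option; a minimal sketch of enabling it (the timeout value here is arbitrary):

```javascript
const { Pool } = require('pg');

// With query_timeout set, pg replaces callback() to enforce the timeout,
// so a stalled COPY fails instead of hanging indefinitely
const pool = new Pool({ query_timeout: 60000 }); // milliseconds
```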

### Error Handling

Handles errors during the COPY FROM operation.

```javascript { .api }
/**
 * Handles errors during stream processing
 * @param {Error} e - Error object
 */
handleError(e);
```
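
From the caller's side, errors routed through `handleError` surface as an `'error'` event on the stream, or as a rejection when using `pipeline`; a minimal sketch (reusing the `ingestStream` setup shown earlier):

```javascript
try {
  await pipeline(fs.createReadStream('users.csv'), ingestStream);
} catch (err) {
  // Errors from the connection or the COPY itself land here
  console.error('COPY failed:', err.message);
}
```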

### CopyIn Response Handler

Handles the CopyInResponse message from the PostgreSQL backend.

```javascript { .api }
/**
 * Handles CopyInResponse from PostgreSQL backend
 * @param {object} connection - pg connection object
 */
handleCopyInResponse(connection);
```
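
In practice this means you can begin writing immediately after calling `client.query`; a sketch, assuming (as the `chunks` property above implies) that early writes are buffered until the backend signals readiness:

```javascript
const ingest = client.query(copyFrom('COPY users FROM STDIN'));

// Written before CopyInResponse arrives: buffered internally, then flushed
ingest.write('alice\t30\n');
ingest.end();
```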

### Command Completion

Processes the CommandComplete message and extracts the row count.

```javascript { .api }
/**
 * Processes CommandComplete message and extracts row count
 * @param {object} msg - Message object with text property containing row count
 */
handleCommandComplete(msg);
```
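
For reference, the backend's CommandComplete tag for a COPY is a string like `COPY 42`; an illustrative sketch of the extraction (not necessarily the library's exact code):

```javascript
// msg.text is e.g. 'COPY 42' after a successful import
const match = /COPY (\d+)/.exec(msg.text);
const rowCount = match ? parseInt(match[1], 10) : 0;
```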

### Ready for Query

Called when the connection is ready for the next query.

```javascript { .api }
/**
 * Called when connection is ready for next query
 */
handleReadyForQuery();
```

## Stream Events

As a writable stream, CopyFromQueryStream emits standard Node.js stream events:

```javascript { .api }
/**
 * Standard writable stream events
 */
stream.on('drain', () => {
  // Stream is ready to accept more data after being paused
  console.log('Stream ready for more data');
});

stream.on('finish', () => {
  // All data has been written and processed by PostgreSQL
  console.log(`Import completed. Rows: ${stream.rowCount}`);
});

stream.on('error', (err) => {
  // Error occurred during streaming
  console.error('Stream error:', err.message);
});

stream.on('close', () => {
  // Stream has been closed
  console.log('Stream closed');
});

stream.on('pipe', (src) => {
  // A readable stream has been piped to this writable stream
  console.log('Source stream piped to copy stream');
});
```
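
When writing rows manually rather than piping, honor backpressure via `write()`'s return value and the `'drain'` event; this is standard Writable behavior, nothing specific to this library:

```javascript
async function writeRows(stream, rows) {
  for (const row of rows) {
    // write() returns false once the internal buffer is full
    if (!stream.write(row)) {
      await new Promise((resolve) => stream.once('drain', resolve));
    }
  }
  stream.end(); // 'finish' fires after PostgreSQL confirms the COPY
}
```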

## Advanced Usage

### Stream Destruction and Error Recovery

```javascript
const { Pool } = require('pg');
const { from: copyFrom } = require('pg-copy-streams');

const pool = new Pool();
const client = await pool.connect();
const ingestStream = client.query(copyFrom('COPY users FROM STDIN'));

// Handle stream destruction (sends CopyFail message)
ingestStream.on('error', (err) => {
  console.error('Import failed:', err.message);
  // The stream automatically sends CopyFail to roll back the operation
});

// Manually destroy the stream if needed
setTimeout(() => {
  ingestStream.destroy(new Error('Timeout exceeded'));
}, 30000);
```
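
One refinement: clear the watchdog once the stream settles, so a successful import is not destroyed after the fact (a minimal sketch building on the code above):

```javascript
const watchdog = setTimeout(() => {
  ingestStream.destroy(new Error('Timeout exceeded'));
}, 30000);

// Cancel the timer on success or failure so it cannot fire late
ingestStream.on('finish', () => clearTimeout(watchdog));
ingestStream.on('error', () => clearTimeout(watchdog));
```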

### CSV Import with Transformation

```javascript
const fs = require('fs');
const { Transform } = require('stream');
const { pipeline } = require('stream/promises');
const { from: copyFrom } = require('pg-copy-streams');
const csvParser = require('csv-parser');

const client = await pool.connect();
const ingestStream = client.query(copyFrom('COPY processed_users FROM STDIN'));

// Transform CSV data before import
const transformStream = new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    // Convert parsed CSV object back to PostgreSQL text format
    const line = `${chunk.name}\t${chunk.email}\t${chunk.age}\n`;
    callback(null, line);
  }
});

await pipeline(
  fs.createReadStream('users.csv'),
  csvParser(),
  transformStream,
  ingestStream
);
```
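
Note that COPY's default text format treats backslash, tab, and newline as structural characters; if field values may contain them, escape per PostgreSQL's text-format rules. A hypothetical helper (`escapeTextValue` is not part of this library):

```javascript
// Escape a value for COPY text format: backslash first, then control characters
function escapeTextValue(value) {
  return String(value)
    .replace(/\\/g, '\\\\')
    .replace(/\t/g, '\\t')
    .replace(/\n/g, '\\n')
    .replace(/\r/g, '\\r');
}
```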

### Binary Format Import

```javascript
const binaryStream = client.query(copyFrom("COPY data_table FROM STDIN WITH (FORMAT BINARY)"));

// Binary data must be properly formatted according to PostgreSQL binary protocol
const binaryData = Buffer.from([
  // PostgreSQL binary COPY format headers and data
  // This requires knowledge of PostgreSQL's binary encoding
]);

binaryStream.write(binaryData);
binaryStream.end();
```
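
For orientation, PostgreSQL documents the binary COPY framing as an 11-byte signature, a 32-bit flags field, a 32-bit header-extension length, the per-tuple data, and a 16-bit field count of -1 as trailer. A sketch of the fixed parts only (per-field encoding is type-specific and omitted):

```javascript
// Signature: 'PGCOPY\n' 0xFF '\r\n' 0x00
const signature = Buffer.from([0x50, 0x47, 0x43, 0x4f, 0x50, 0x59, 0x0a, 0xff, 0x0d, 0x0a, 0x00]);
const flags = Buffer.alloc(4);           // no OIDs included
const extensionLength = Buffer.alloc(4); // no header extension

binaryStream.write(Buffer.concat([signature, flags, extensionLength]));
// ...write tuples here: int16 field count, then per field an int32 length and the bytes...
binaryStream.write(Buffer.from([0xff, 0xff])); // field count of -1 terminates the data
binaryStream.end();
```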

### Batch Processing with Multiple Files

```javascript
const fs = require('fs');
const { glob } = require('glob');
const { pipeline } = require('stream/promises');
const { from: copyFrom } = require('pg-copy-streams');

const files = await glob('data/*.csv');
const client = await pool.connect();

try {
  await client.query('BEGIN');

  for (const file of files) {
    const ingestStream = client.query(copyFrom('COPY temp_import FROM STDIN WITH (FORMAT CSV)'));
    const sourceStream = fs.createReadStream(file);

    await pipeline(sourceStream, ingestStream);
    console.log(`Imported ${ingestStream.rowCount} rows from ${file}`);
  }

  await client.query('COMMIT');
} catch (error) {
  await client.query('ROLLBACK');
  throw error;
} finally {
  client.release();
}
```

## Important Notes

- Always use the 'finish' event (not 'end') to detect completion in versions 4.0.0+ (see the sketch after this list)
- The `rowCount` property is only available after the stream finishes
- Streams automatically handle transaction rollback on errors via CopyFail messages
- Stream destruction sends CopyFail to PostgreSQL for graceful error handling
- Binary format imports require knowledge of PostgreSQL's binary encoding
- All operations are transactional - errors will not leave data in an inconsistent state
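
A minimal completion sketch using the pattern from the first note:

```javascript
const stream = client.query(copyFrom('COPY users FROM STDIN'));

stream.on('finish', () => {
  // rowCount is populated once the COPY has completed
  console.log(`Done: ${stream.rowCount} rows imported`);
});
```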