# Query Streaming

Node.js Readable stream interface for PostgreSQL query results, enabling memory-efficient processing of large result sets through standard stream APIs.

## Capabilities

### QueryStream Constructor

Creates a readable stream for PostgreSQL query results.

```javascript { .api }
/**
 * Create a new query stream
 * @param text - SQL query text
 * @param values - Optional parameter values
 * @param config - Optional stream configuration
 */
class QueryStream extends Readable {
  constructor(text: string, values?: any[], config?: QueryStreamConfig);
}

interface QueryStreamConfig {
  /** Number of rows to fetch per batch (overrides highWaterMark) */
  batchSize?: number;
  /** Stream's high water mark (default: 100) */
  highWaterMark?: number;
  /** Return rows as arrays instead of objects */
  rowMode?: 'array';
  /** Custom type parser object */
  types?: any;
}
```

**Usage Examples:**

```javascript
const QueryStream = require('pg-query-stream');
const { Client } = require('pg');

const client = new Client();
await client.connect();

// Basic query stream
const stream1 = new QueryStream('SELECT * FROM users');

// Parameterized query stream
const stream2 = new QueryStream('SELECT * FROM orders WHERE user_id = $1', [123]);

// Configured stream
const stream3 = new QueryStream('SELECT * FROM large_table', [], {
  batchSize: 1000,
  rowMode: 'array'
});

// Execute stream
const queryStream = client.query(stream1);
```
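
A `QueryStream` produces no rows until it is submitted to a client; `client.query()` accepts the stream in place of a query text and returns the same stream instance. A minimal end-to-end sketch, reusing the connected `client` from above:

```javascript
// The stream stays idle until submitted via client.query(),
// which returns the same QueryStream instance for consumption.
const stream = client.query(new QueryStream('SELECT id FROM users LIMIT 5'));

stream.on('data', (row) => console.log(row.id));
stream.on('end', () => client.end()); // close the connection when done
```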

### Stream Processing

Process query results using standard Node.js stream APIs.

```javascript { .api }
// Standard Readable stream events
stream.on('data', (row) => {});     // Each row as it arrives
stream.on('end', () => {});         // All rows processed
stream.on('error', (err) => {});    // Stream error occurred
stream.on('close', () => {});       // Stream closed
stream.on('readable', () => {});    // Data available to read

// Stream control methods
stream.read(size?: number): any | null;
stream.pipe(destination: Writable): Writable;
stream.destroy(error?: Error): void;
stream.pause(): void;
stream.resume(): void;
```

**Usage Examples:**

```javascript
// Event-driven processing
const stream = client.query(new QueryStream('SELECT * FROM users'));

stream.on('data', (row) => {
  console.log('User:', row.name, row.email);
});

stream.on('end', () => {
  console.log('Finished processing all users');
});

stream.on('error', (err) => {
  console.error('Query error:', err);
});

// Manual reading (alternative to the 'data' event pattern above)
stream.on('readable', () => {
  let row;
  while ((row = stream.read()) !== null) {
    processRow(row);
  }
});
```
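
Where each row triggers asynchronous work inside a `'data'` handler, the `pause()`/`resume()` methods listed above can apply backpressure manually. A sketch (`saveRow` is a hypothetical async per-row handler):

```javascript
stream.on('data', (row) => {
  stream.pause();                          // no further 'data' events while paused
  saveRow(row)                             // hypothetical async handler
    .catch((err) => stream.destroy(err))   // surface failures as a stream error
    .finally(() => stream.resume());       // pull the next rows
});
```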

### Async Iteration

Use modern async iteration for clean streaming code (Node.js ≥10).

```javascript
// Async iteration
const stream = client.query(new QueryStream('SELECT * FROM products'));

for await (const row of stream) {
  console.log('Product:', row.name, row.price);

  // Can break early if needed
  if (row.discontinued) break;
}

console.log('Done processing products');
```
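
Per Node's async-iterator semantics for `Readable`, leaving the loop early (via `break`, `return`, or a thrown error) destroys the stream, which with `pg-query-stream` also closes the underlying cursor. A small sketch that samples only the first 100 rows:

```javascript
// Read only the first 100 rows, then stop; breaking out of the loop
// destroys the stream and releases the cursor.
const sample = [];
for await (const row of client.query(new QueryStream('SELECT * FROM products'))) {
  sample.push(row);
  if (sample.length >= 100) break;
}
```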

**Usage Examples:**

```javascript
// ETL processing with async iteration
async function processLargeDataset() {
  // Use a second connection for writes: queries issued on the streaming
  // client would queue behind the open cursor until the stream ends.
  const writer = new Client();
  await writer.connect();

  const stream = client.query(new QueryStream(
    'SELECT * FROM raw_data WHERE processed = false ORDER BY created_at'
  ));

  let processedCount = 0;

  try {
    for await (const row of stream) {
      // Transform data
      const transformed = transformData(row);

      // Load to destination
      await insertIntoDestination(transformed);

      // Mark as processed
      await writer.query('UPDATE raw_data SET processed = true WHERE id = $1', [row.id]);

      processedCount++;

      if (processedCount % 1000 === 0) {
        console.log(`Processed ${processedCount} records...`);
      }
    }
  } catch (err) {
    console.error('Processing failed:', err);
  } finally {
    await writer.end();
  }

  console.log(`Total processed: ${processedCount} records`);
}
```

### Stream Piping

Pipe query results to other streams for processing.

```javascript
const fs = require('fs');
const { Transform } = require('stream');

// Create transform stream
const csvTransform = new Transform({
  objectMode: true,
  transform(row, encoding, callback) {
    const csvLine = `${row.id},${row.name},${row.email}\n`;
    callback(null, csvLine);
  }
});

// Pipe query to CSV file
const stream = client.query(new QueryStream('SELECT id, name, email FROM users'));
const writeStream = fs.createWriteStream('users.csv');

// Write CSV header
writeStream.write('id,name,email\n');

// Pipe data
stream
  .pipe(csvTransform)
  .pipe(writeStream);

// Handle completion
writeStream.on('finish', () => {
  console.log('CSV export completed');
});
```

**Pipeline Processing:**

```javascript
const { pipeline } = require('stream');
const { promisify } = require('util');
const pipelineAsync = promisify(pipeline);

// Create processing pipeline
async function exportToCSV() {
  const queryStream = client.query(new QueryStream('SELECT * FROM orders'));

  const transformStream = new Transform({
    objectMode: true,
    transform(order, encoding, callback) {
      const csvRow = `${order.id},${order.customer_id},${order.total},${order.created_at}\n`;
      callback(null, csvRow);
    }
  });

  const writeStream = fs.createWriteStream('orders.csv');
  writeStream.write('id,customer_id,total,created_at\n');

  try {
    await pipelineAsync(queryStream, transformStream, writeStream);
    console.log('Export completed successfully');
  } catch (err) {
    console.error('Export failed:', err);
  }
}
```
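
On Node.js 15 and later, the `promisify` step can be dropped in favor of the promise-based `pipeline` from `stream/promises`. A sketch of the same export, reusing `fs`, `Transform`, and the connected `client` from the earlier examples:

```javascript
const { pipeline } = require('stream/promises');

async function exportOrdersCsv() {
  const queryStream = client.query(new QueryStream('SELECT * FROM orders'));

  const toCsv = new Transform({
    objectMode: true,
    transform(order, _encoding, callback) {
      callback(null, `${order.id},${order.customer_id},${order.total},${order.created_at}\n`);
    }
  });

  const writeStream = fs.createWriteStream('orders.csv');
  writeStream.write('id,customer_id,total,created_at\n');

  // Resolves once all data is flushed; rejects if any stage errors
  await pipeline(queryStream, toCsv, writeStream);
}
```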

## Configuration Options

### Batch Size and High Water Mark

Control memory usage and performance characteristics.

```javascript
// Large batch size for high-throughput scenarios
const fastStream = new QueryStream('SELECT * FROM logs', [], {
  batchSize: 5000 // Fetch 5000 rows at a time
});

// Small batch size for real-time processing
const realtimeStream = new QueryStream('SELECT * FROM events', [], {
  batchSize: 10 // Process rows as they arrive
});

// Direct high water mark control
const customStream = new QueryStream('SELECT * FROM data', [], {
  highWaterMark: 50 // Buffer up to 50 rows
});
```

### Row Mode Configuration

Control how rows are returned from the stream.

```javascript
// Object mode (default)
const objectStream = new QueryStream('SELECT id, name FROM users');
// Emits: { id: 1, name: 'Alice' }

// Array mode
const arrayStream = new QueryStream('SELECT id, name FROM users', [], {
  rowMode: 'array'
});
// Emits: [1, 'Alice']

// Array mode with ordered columns (submit the stream before iterating)
for await (const [id, name] of client.query(arrayStream)) {
  console.log(`User ${id}: ${name}`);
}
```

### Custom Type Parsing

Use custom type parsers for specialized data handling.

```javascript
const customTypes = {
  getTypeParser: (oid, format) => {
    // Custom JSON parsing
    if (oid === 114) { // JSON type
      return (val) => {
        try {
          return JSON.parse(val);
        } catch {
          return val; // Return as string if parsing fails
        }
      };
    }

    // Custom timestamp parsing
    if (oid === 1184) { // TIMESTAMPTZ
      return (val) => new Date(val);
    }

    // Default parser
    return require('pg').types.getTypeParser(oid, format);
  }
};

const stream = new QueryStream('SELECT metadata, created_at FROM events', [], {
  types: customTypes
});
```
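
Instead of hard-coded OIDs, recent versions of `pg` expose OID constants via `types.builtins`; assuming that is available in your version, the parser above can be written without magic numbers:

```javascript
const { types } = require('pg');

// Sketch using OID constants (assumes pg exposes types.builtins)
const customTypes = {
  getTypeParser: (oid, format) => {
    if (oid === types.builtins.TIMESTAMPTZ) {
      return (val) => new Date(val);
    }
    return types.getTypeParser(oid, format);
  }
};
```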

## Usage Patterns

### Large Data Export

Export large datasets without memory overflow.

```javascript
async function exportUserData() {
  const stream = client.query(new QueryStream(
    'SELECT * FROM users ORDER BY created_at',
    [],
    { batchSize: 2000 }
  ));

  const writeStream = fs.createWriteStream('users_export.json');
  writeStream.write('[\n');

  let first = true;

  for await (const user of stream) {
    if (!first) writeStream.write(',\n');
    writeStream.write(JSON.stringify(user));
    first = false;
  }

  writeStream.write('\n]');
  writeStream.end();

  console.log('Export completed');
}
```

### Real-time Processing

Process data in real-time with small batches.

```javascript
async function processRecentEvents() {
  // Progress updates go to a second connection so they are not queued
  // behind the streaming query on the same client.
  const writer = new Client();
  await writer.connect();

  const stream = client.query(new QueryStream(
    'SELECT * FROM events WHERE created_at > NOW() - INTERVAL \'1 hour\'',
    [],
    { batchSize: 1 } // Process immediately as available
  ));

  try {
    for await (const event of stream) {
      // Process each event immediately
      await handleEvent(event);

      // Update progress
      await writer.query(
        'UPDATE event_processing SET last_processed_at = $1',
        [event.created_at]
      );
    }
  } finally {
    await writer.end();
  }
}
```

### Aggregation and Analysis

Stream processing for data analysis.

```javascript
async function analyzeUserBehavior() {
  const stream = client.query(new QueryStream(
    'SELECT user_id, action, timestamp FROM user_events ORDER BY timestamp'
  ));

  const userSessions = new Map();

  for await (const event of stream) {
    const userId = event.user_id;
    let session = userSessions.get(userId);

    // Finalize the previous session if the gap since the user's last event is too long
    if (session) {
      const idleTime = new Date(event.timestamp).getTime() - new Date(session.endTime).getTime();
      if (idleTime > 30 * 60 * 1000) { // 30 minutes
        await finalizeSession(userId, session);
        session = null;
      }
    }

    if (!session) {
      session = {
        actions: [],
        startTime: event.timestamp,
        endTime: event.timestamp
      };
      userSessions.set(userId, session);
    }

    session.actions.push(event.action);
    session.endTime = event.timestamp;
  }

  // Finalize remaining sessions
  for (const [userId, session] of userSessions) {
    await finalizeSession(userId, session);
  }
}
```

## Error Handling and Cleanup

### Stream Error Handling

```javascript
const stream = client.query(new QueryStream('SELECT * FROM users'));

stream.on('error', (err) => {
  console.error('Stream error:', err);

  // Connection is still usable after stream error
  // Perform cleanup or retry logic here
});

// Promise-based error handling with async iteration
try {
  for await (const row of stream) {
    await processRow(row);
  }
} catch (err) {
  console.error('Processing error:', err);
}
```
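
As an alternative to wiring individual listeners, Node.js 15+ provides a promise-based `finished` helper that resolves when the stream ends and rejects if it errors. A sketch (`handleRow` is a hypothetical per-row handler):

```javascript
const { finished } = require('stream/promises');

const stream = client.query(new QueryStream('SELECT * FROM users'));
stream.on('data', (row) => handleRow(row)); // hypothetical per-row handler

try {
  await finished(stream); // resolves on 'end', rejects on 'error'
  console.log('Stream completed');
} catch (err) {
  console.error('Stream failed:', err);
}
```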

### Graceful Shutdown

```javascript
let currentStream = null;

// Graceful shutdown handler
process.on('SIGTERM', () => {
  if (currentStream) {
    console.log('Destroying current stream...');
    currentStream.destroy();
  }

  client.end();
});

// Usage with cleanup
async function processData() {
  currentStream = client.query(new QueryStream('SELECT * FROM large_table'));

  try {
    for await (const row of currentStream) {
      await processRow(row);
    }
  } finally {
    currentStream = null;
  }
}
```

## Performance Considerations

### Memory Usage

```javascript
// Memory-efficient: Small batch sizes
const memoryEfficientStream = new QueryStream('SELECT * FROM huge_table', [], {
  batchSize: 100 // Only 100 rows in memory at once
});

// High-throughput: Larger batch sizes
const highThroughputStream = new QueryStream('SELECT * FROM huge_table', [], {
  batchSize: 10000 // Better performance, more memory usage
});
```
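
The right batch size is workload-dependent; one way to compare settings is to sample heap usage while streaming. A rough sketch using `process.memoryUsage()`, assuming the connected `client` and a `huge_table` as above:

```javascript
async function measureBatchSize(batchSize) {
  const stream = client.query(new QueryStream('SELECT * FROM huge_table', [], { batchSize }));
  let rows = 0;
  let peakHeap = 0;

  for await (const _row of stream) {
    rows++;
    if (rows % 10000 === 0) {
      // Sample the heap periodically to approximate peak usage
      peakHeap = Math.max(peakHeap, process.memoryUsage().heapUsed);
    }
  }

  console.log(`batchSize=${batchSize}: ${rows} rows, peak heap ~${(peakHeap / 1e6).toFixed(1)} MB`);
}
```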

469

470

### Connection Pool Usage

471

472

```javascript

473

const { Pool } = require('pg');

474

const pool = new Pool();

475

476

async function streamProcessing() {

477

const client = await pool.connect();

478

479

try {

480

const stream = client.query(new QueryStream('SELECT * FROM data'));

481

482

for await (const row of stream) {

483

await processRow(row);

484

}

485

486

} finally {

487

client.release(); // Return to pool

488

}

489

}

490

```
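
Because a stream occupies its client's connection until it finishes, concurrent streams should each check out their own client from the pool. A sketch (table names here are trusted literals, and `processRow` is the hypothetical handler used above):

```javascript
// Each concurrent stream gets its own pooled client
async function streamTable(table) {
  const client = await pool.connect();
  try {
    const stream = client.query(new QueryStream(`SELECT * FROM ${table}`));
    for await (const row of stream) {
      await processRow(row);
    }
  } finally {
    client.release();
  }
}

await Promise.all([streamTable('orders'), streamTable('invoices')]);
```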