# Query Streaming

Node.js Readable stream interface for PostgreSQL query results, enabling memory-efficient processing of large result sets through standard stream APIs.

## Capabilities

### QueryStream Constructor

Creates a readable stream for PostgreSQL query results.
```javascript { .api }
/**
 * Create a new query stream
 * @param text - SQL query text
 * @param values - Optional parameter values
 * @param config - Optional stream configuration
 */
class QueryStream extends Readable {
  constructor(text: string, values?: any[], config?: QueryStreamConfig);
}

interface QueryStreamConfig {
  /** Number of rows to fetch per batch (overrides highWaterMark) */
  batchSize?: number;
  /** Stream's high water mark (default: 100) */
  highWaterMark?: number;
  /** Return rows as arrays instead of objects */
  rowMode?: 'array';
  /** Custom type parser object */
  types?: any;
}
```
**Usage Examples:**

```javascript
const QueryStream = require('pg-query-stream');
const { Client } = require('pg');

const client = new Client();
await client.connect();

// Basic query stream
const stream1 = new QueryStream('SELECT * FROM users');

// Parameterized query stream
const stream2 = new QueryStream('SELECT * FROM orders WHERE user_id = $1', [123]);

// Configured stream
const stream3 = new QueryStream('SELECT * FROM large_table', [], {
  batchSize: 1000,
  rowMode: 'array'
});

// Execute stream
const queryStream = client.query(stream1);
```
### Stream Processing

Process query results using standard Node.js stream APIs.
```javascript { .api }
// Standard Readable stream events
stream.on('data', (row) => {});     // Each row as it arrives
stream.on('end', () => {});         // All rows processed
stream.on('error', (err) => {});    // Stream error occurred
stream.on('close', () => {});       // Stream closed
stream.on('readable', () => {});    // Data available to read

// Stream control methods
stream.read(size?: number): any | null;
stream.pipe(destination: Writable): Writable;
stream.destroy(error?: Error): void;
stream.pause(): void;
stream.resume(): void;
```
**Usage Examples:**

```javascript
// Event-driven processing
const stream = client.query(new QueryStream('SELECT * FROM users'));

stream.on('data', (row) => {
  console.log('User:', row.name, row.email);
});

stream.on('end', () => {
  console.log('Finished processing all users');
});

stream.on('error', (err) => {
  console.error('Query error:', err);
});

// Manual reading in paused mode (use either 'data' or 'readable' listeners
// on a given stream, not both)
stream.on('readable', () => {
  let row;
  while ((row = stream.read()) !== null) {
    processRow(row);
  }
});
```
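The control methods listed above also allow manual backpressure: pause row delivery while slow per-row work is in flight, then resume. A minimal sketch, assuming a hypothetical async `saveRow` persistence helper:

```javascript
// Manual backpressure with pause()/resume().
// `saveRow` is a hypothetical async persistence helper.
const stream = client.query(new QueryStream('SELECT * FROM users'));

stream.on('data', async (row) => {
  stream.pause();                 // no more 'data' events until resume()
  try {
    await saveRow(row);           // slow per-row work
    stream.resume();              // ask for more rows
  } catch (err) {
    stream.destroy(err);          // abort the stream on failure
  }
});

stream.on('end', () => console.log('All rows handled'));
stream.on('error', (err) => console.error('Stream failed:', err));
```

The async iteration form shown in the next section achieves the same effect with less ceremony, since `for await` only requests the next row after the loop body has finished.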
### Async Iteration

Use modern async iteration for clean streaming code (Node.js ≥10).
```javascript
// Async iteration
const stream = client.query(new QueryStream('SELECT * FROM products'));

for await (const row of stream) {
  console.log('Product:', row.name, row.price);

  // Can break early if needed
  if (row.discontinued) break;
}

console.log('Done processing products');
```
**Usage Examples:**

```javascript
// ETL processing with async iteration
async function processLargeDataset() {
  const stream = client.query(new QueryStream(
    'SELECT * FROM raw_data WHERE processed = false ORDER BY created_at'
  ));

  let processedCount = 0;

  try {
    for await (const row of stream) {
      // Transform data
      const transformed = transformData(row);

      // Load to destination
      await insertIntoDestination(transformed);

      // Mark as processed. This must run on a different connection (a pg Pool
      // here): `client` is occupied by the streaming cursor until the stream
      // ends, so another client.query() would queue behind it and deadlock
      // this loop.
      await pool.query('UPDATE raw_data SET processed = true WHERE id = $1', [row.id]);

      processedCount++;

      if (processedCount % 1000 === 0) {
        console.log(`Processed ${processedCount} records...`);
      }
    }
  } catch (err) {
    console.error('Processing failed:', err);
  }

  console.log(`Total processed: ${processedCount} records`);
}
```
### Stream Piping

Pipe query results to other streams for processing.
```javascript
const fs = require('fs');
const { Transform } = require('stream');

// Create transform stream
const csvTransform = new Transform({
  objectMode: true,
  transform(row, encoding, callback) {
    const csvLine = `${row.id},${row.name},${row.email}\n`;
    callback(null, csvLine);
  }
});

// Pipe query to CSV file
const stream = client.query(new QueryStream('SELECT id, name, email FROM users'));
const writeStream = fs.createWriteStream('users.csv');

// Write CSV header
writeStream.write('id,name,email\n');

// Pipe data
stream
  .pipe(csvTransform)
  .pipe(writeStream);

// Handle completion
writeStream.on('finish', () => {
  console.log('CSV export completed');
});
```
**Pipeline Processing:**

```javascript
const fs = require('fs');
const { Transform, pipeline } = require('stream');
const { promisify } = require('util');
const pipelineAsync = promisify(pipeline);

// Create processing pipeline
async function exportToCSV() {
  const queryStream = client.query(new QueryStream('SELECT * FROM orders'));

  const transformStream = new Transform({
    objectMode: true,
    transform(order, encoding, callback) {
      const csvRow = `${order.id},${order.customer_id},${order.total},${order.created_at}\n`;
      callback(null, csvRow);
    }
  });

  const writeStream = fs.createWriteStream('orders.csv');
  writeStream.write('id,customer_id,total,created_at\n');

  try {
    await pipelineAsync(queryStream, transformStream, writeStream);
    console.log('Export completed successfully');
  } catch (err) {
    console.error('Export failed:', err);
  }
}
```
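On Node.js 15 and later, the `promisify` wrapper can be dropped in favor of the built-in `stream/promises` module. A sketch of the same export written that way (the `exportToCSVModern` name and `toCSV` transform are illustrative, not part of the library):

```javascript
const fs = require('fs');
const { Transform } = require('stream');
const { pipeline } = require('stream/promises'); // promise-based pipeline, Node.js >= 15

async function exportToCSVModern() {
  const queryStream = client.query(new QueryStream('SELECT * FROM orders'));

  const toCSV = new Transform({
    objectMode: true,
    transform(order, encoding, callback) {
      callback(null, `${order.id},${order.customer_id},${order.total},${order.created_at}\n`);
    }
  });

  const writeStream = fs.createWriteStream('orders.csv');
  writeStream.write('id,customer_id,total,created_at\n');

  // pipeline() wires the streams together, propagates errors from any stage,
  // and resolves once the write stream has finished.
  await pipeline(queryStream, toCSV, writeStream);
}
```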
## Configuration Options

### Batch Size and High Water Mark

Control memory usage and performance characteristics.
```javascript
// Large batch size for high-throughput scenarios
const fastStream = new QueryStream('SELECT * FROM logs', [], {
  batchSize: 5000 // Fetch 5000 rows at a time
});

// Small batch size for real-time processing
const realtimeStream = new QueryStream('SELECT * FROM events', [], {
  batchSize: 10 // Process rows as they arrive
});

// Direct high water mark control
const customStream = new QueryStream('SELECT * FROM data', [], {
  highWaterMark: 50 // Buffer up to 50 rows
});
```
### Row Mode Configuration

Control how rows are returned from the stream.
```javascript
// Object mode (default)
const objectStream = new QueryStream('SELECT id, name FROM users');
// Emits: { id: 1, name: 'Alice' }

// Array mode
const arrayStream = new QueryStream('SELECT id, name FROM users', [], {
  rowMode: 'array'
});
// Emits: [1, 'Alice']

// Array mode with ordered columns (the stream only produces rows once it
// has been submitted to a client)
for await (const [id, name] of client.query(arrayStream)) {
  console.log(`User ${id}: ${name}`);
}
```
### Custom Type Parsing

Use custom type parsers for specialized data handling.
```javascript
const customTypes = {
  getTypeParser: (oid, format) => {
    // Custom JSON parsing
    if (oid === 114) { // JSON type
      return (val) => {
        try {
          return JSON.parse(val);
        } catch {
          return val; // Return as string if parsing fails
        }
      };
    }

    // Custom timestamp parsing
    if (oid === 1184) { // TIMESTAMPTZ
      return (val) => new Date(val);
    }

    // Default parser
    return require('pg').types.getTypeParser(oid, format);
  }
};

const stream = new QueryStream('SELECT metadata, created_at FROM events', [], {
  types: customTypes
});
```
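As elsewhere, the configured stream only produces rows once it is submitted to a client. A short consumption sketch, assuming the `events` table has a JSON `metadata` column as in the query above:

```javascript
// Rows arrive with the custom parsers already applied: `metadata` is a parsed
// object and `created_at` is a Date rather than a raw string.
for await (const row of client.query(stream)) {
  console.log(row.created_at.toISOString(), row.metadata);
}
```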
## Usage Patterns

### Large Data Export

Export large datasets without memory overflow.
```javascript
async function exportUserData() {
  const stream = client.query(new QueryStream(
    'SELECT * FROM users ORDER BY created_at',
    [],
    { batchSize: 2000 }
  ));

  const writeStream = fs.createWriteStream('users_export.json');
  writeStream.write('[\n');

  let first = true;

  for await (const user of stream) {
    if (!first) writeStream.write(',\n');
    writeStream.write(JSON.stringify(user));
    first = false;
  }

  writeStream.write('\n]');
  writeStream.end();

  console.log('Export completed');
}
```
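The sketch above ignores write-side backpressure: `writeStream.write()` returns `false` when its buffer is full, and the final log fires before the file has been flushed. One way to handle both, shown here as a sketch using the built-in `events.once` helper:

```javascript
const fs = require('fs');
const { once } = require('events');

async function exportUserDataWithBackpressure() {
  const stream = client.query(new QueryStream(
    'SELECT * FROM users ORDER BY created_at', [], { batchSize: 2000 }
  ));

  const writeStream = fs.createWriteStream('users_export.json');
  writeStream.write('[\n');

  let first = true;

  for await (const user of stream) {
    const chunk = (first ? '' : ',\n') + JSON.stringify(user);
    first = false;

    // write() returning false means the file buffer is full; wait for
    // 'drain' before pulling more rows from the query stream.
    if (!writeStream.write(chunk)) {
      await once(writeStream, 'drain');
    }
  }

  writeStream.end('\n]');
  await once(writeStream, 'finish'); // everything flushed to disk
  console.log('Export completed');
}
```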
### Real-time Processing

Process data in real-time with small batches.
```javascript
async function processRecentEvents() {
  const stream = client.query(new QueryStream(
    'SELECT * FROM events WHERE created_at > NOW() - INTERVAL \'1 hour\'',
    [],
    { batchSize: 1 } // Fetch one row at a time
  ));

  for await (const event of stream) {
    // Process each event immediately
    await handleEvent(event);

    // Update progress on a separate pooled connection: `client` is busy
    // serving the streaming cursor until the stream ends, so a second
    // client.query() here would queue behind it and never resolve.
    await pool.query(
      'UPDATE event_processing SET last_processed_at = $1',
      [event.created_at]
    );
  }
}
```
### Aggregation and Analysis

Stream processing for data analysis.
```javascript
async function analyzeUserBehavior() {
  const stream = client.query(new QueryStream(
    'SELECT user_id, action, timestamp FROM user_events ORDER BY timestamp'
  ));

  const userSessions = new Map();
  const SESSION_GAP_MS = 30 * 60 * 1000; // 30 minutes of inactivity ends a session

  for await (const event of stream) {
    const userId = event.user_id;
    const previous = userSessions.get(userId);

    // Finalize the previous session if the gap since the user's last event
    // exceeds the idle threshold, then start a fresh one
    if (previous &&
        new Date(event.timestamp) - new Date(previous.endTime) > SESSION_GAP_MS) {
      await finalizeSession(userId, previous);
      userSessions.delete(userId);
    }

    if (!userSessions.has(userId)) {
      userSessions.set(userId, {
        actions: [],
        startTime: event.timestamp,
        endTime: event.timestamp
      });
    }

    const session = userSessions.get(userId);
    session.actions.push(event.action);
    session.endTime = event.timestamp;
  }

  // Finalize remaining sessions
  for (const [userId, session] of userSessions) {
    await finalizeSession(userId, session);
  }
}
```
## Error Handling and Cleanup

### Stream Error Handling
```javascript
const stream = client.query(new QueryStream('SELECT * FROM users'));

stream.on('error', (err) => {
  console.error('Stream error:', err);

  // Connection is still usable after stream error
  // Perform cleanup or retry logic here
});

// Promise-based error handling with async iteration
try {
  for await (const row of stream) {
    await processRow(row);
  }
} catch (err) {
  console.error('Processing error:', err);
}
```
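Event-based consumers can also stop a query early by destroying the stream, which ends row delivery and lets the stream clean up its underlying cursor; `for await` loops get the same behavior automatically when they `break` or `throw`. A minimal sketch (the row limit is illustrative):

```javascript
// Stop after the first 1000 rows.
const stream = client.query(new QueryStream('SELECT * FROM big_table'));
let seen = 0;

stream.on('data', (row) => {
  seen++;
  if (seen >= 1000) {
    stream.destroy(); // no error argument: a deliberate early stop
  }
});

stream.on('close', () => {
  console.log(`Stopped after ${seen} rows`);
});

stream.on('error', (err) => console.error('Stream failed:', err));
```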
### Graceful Shutdown

```javascript
let currentStream = null;

// Graceful shutdown handler
process.on('SIGTERM', () => {
  if (currentStream) {
    console.log('Destroying current stream...');
    currentStream.destroy();
  }

  client.end();
});

// Usage with cleanup
async function processData() {
  currentStream = client.query(new QueryStream('SELECT * FROM large_table'));

  try {
    for await (const row of currentStream) {
      await processRow(row);
    }
  } finally {
    currentStream = null;
  }
}
```
## Performance Considerations

### Memory Usage

```javascript
// Memory-efficient: small batch sizes
const memoryEfficientStream = new QueryStream('SELECT * FROM huge_table', [], {
  batchSize: 100 // Roughly 100 rows buffered at a time
});

// High-throughput: larger batch sizes
const highThroughputStream = new QueryStream('SELECT * FROM huge_table', [], {
  batchSize: 10000 // Better throughput, more memory usage
});
```
### Connection Pool Usage

```javascript
const { Pool } = require('pg');
const pool = new Pool();

async function streamProcessing() {
  const client = await pool.connect();

  try {
    const stream = client.query(new QueryStream('SELECT * FROM data'));

    for await (const row of stream) {
      await processRow(row);
    }

  } finally {
    client.release(); // Return to pool
  }
}
```
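If processing fails partway through, one defensive option (a sketch, not a requirement of the library) is to destroy the stream and pass the error to `release()`, which tells the pool to discard the connection instead of reusing it:

```javascript
async function streamProcessingSafe() {
  const client = await pool.connect();
  const stream = client.query(new QueryStream('SELECT * FROM data'));

  try {
    for await (const row of stream) {
      await processRow(row);
    }
    client.release();      // healthy connection goes back to the pool
  } catch (err) {
    stream.destroy();      // stop the query stream (a no-op if already destroyed)
    client.release(err);   // passing an error removes the connection from the pool
    throw err;
  }
}
```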