Postgres query result returned as readable stream
npx @tessl/cli install tessl/npm-pg-query-stream@4.10.0
# pg-query-stream

pg-query-stream provides memory-efficient streaming of PostgreSQL query results using server-side cursors. It extends Node.js Readable streams to handle massive result sets without loading all data into memory, making it ideal for ETL operations and processing large tables.

## Package Information

- **Package Name**: pg-query-stream
- **Package Type**: npm
- **Language**: TypeScript
- **Installation**: `npm install pg-query-stream`
- **Peer Dependency**: Requires `pg` (^8), the PostgreSQL client library
## Core Imports

```typescript
import QueryStream from "pg-query-stream";
```

For CommonJS:

```javascript
const QueryStream = require("pg-query-stream");
```
## Basic Usage

```typescript
import pg from "pg";
import QueryStream from "pg-query-stream";

const client = new pg.Client();
await client.connect();

// Stream large result set without memory issues
const query = new QueryStream('SELECT * FROM large_table', []);
const stream = client.query(query);

stream.on('data', (row) => {
  console.log(row); // Process each row as it arrives
});

stream.on('end', () => {
  console.log('Stream completed');
  client.end();
});

stream.on('error', (err) => {
  console.error('Stream error:', err);
});
```
## Architecture

pg-query-stream is built on these key components (a small sketch follows the list):

- **Cursor-based Streaming**: Uses PostgreSQL cursors via `pg-cursor` for efficient memory usage
- **Stream Integration**: Extends the Node.js `Readable` stream, so standard stream operations all apply
- **pg Client Integration**: Implements the `Submittable` interface for seamless `pg` client compatibility
- **Configurable Batching**: Controls memory usage through batch size and high water mark settings
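As a quick illustration of the first three components, a minimal sketch: a `QueryStream` instance is simultaneously a standard `Readable` and a `Submittable`.

```typescript
import { Readable } from "node:stream";
import QueryStream from "pg-query-stream";

const qs = new QueryStream("SELECT 1 AS one", []);

// An ordinary object-mode Readable, so pipe(), pause(), async
// iteration, and backpressure all behave as usual...
console.log(qs instanceof Readable); // true

// ...and a Submittable, so pg's client.query() can drive the
// underlying pg-cursor over the connection.
console.log(typeof qs.submit); // "function"
```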
## Capabilities

### Query Stream Creation

Create a streaming query that reads results in batches using PostgreSQL cursors.

```typescript { .api }
class QueryStream extends Readable implements Submittable {
  constructor(
    text: string,
    values?: any[],
    config?: QueryStreamConfig
  );
}
```
### Stream Configuration

Configure memory usage and result format for the query stream.

```typescript { .api }
interface QueryStreamConfig {
  batchSize?: number; // Number of rows to fetch per batch
  highWaterMark?: number; // Stream buffer size (defaults to 100)
  rowMode?: 'array'; // Return rows as arrays instead of objects
  types?: any; // Custom type parsers for result columns
}
```
### Database Integration

Submit the query to a PostgreSQL connection for execution. You rarely call this yourself; `client.query()` invokes it when you pass the stream in.

```typescript { .api }
submit(connection: Connection): void;
```
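Because `client.query()` recognizes `Submittable` objects, it returns the same `QueryStream` instance you pass in; a small sketch:

```typescript
import pg from "pg";
import QueryStream from "pg-query-stream";

const client = new pg.Client();
await client.connect();

const qs = new QueryStream("SELECT generate_series(0, 99) AS n", []);

// client.query() spots the submit() method, wires the stream to the
// connection, and returns the stream itself rather than a Promise.
const stream = client.query(qs);
console.log(stream === qs); // true

stream.on("end", () => client.end());
stream.resume(); // drain the rows
```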
### Stream Properties

Access underlying cursor and result metadata.

```typescript { .api }
cursor: any; // The underlying pg-cursor instance
_result: any; // Query result metadata from pg-cursor
```
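A sketch of inspecting that metadata once the stream finishes, assuming a connected `client`; note that `_result` is an internal pg-cursor object, so its exact shape (the `fields` array below is an assumption) may vary between versions:

```typescript
const query = new QueryStream("SELECT id, name FROM users", []);
const stream = client.query(query);

stream.on("end", () => {
  // Assumed shape: _result.fields holds column descriptors
  // (name, dataTypeID, ...) as in a regular pg result.
  console.log(query._result?.fields?.map((f: any) => f.name));
});
stream.resume(); // consume the rows so 'end' fires
```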
## Usage Examples

### Basic Streaming

```typescript
import pg from "pg";
import QueryStream from "pg-query-stream";

const pool = new pg.Pool();

pool.connect((err, client, done) => {
  if (err) throw err;

  const query = new QueryStream('SELECT * FROM users', []);
  const stream = client.query(query);

  stream.on('data', (row) => {
    // Process each user row
    console.log(`User: ${row.name}, Email: ${row.email}`);
  });

  stream.on('end', done);
  stream.on('error', (err) => {
    done();
    throw err;
  });
});
```
### With Configuration Options

```typescript
const query = new QueryStream(
  'SELECT id, data FROM large_table WHERE active = $1',
  [true],
  {
    batchSize: 50, // Fetch 50 rows at a time
    highWaterMark: 25, // Buffer up to 25 rows
    rowMode: 'array' // Return [id, data] instead of {id, data}
  }
);

const stream = client.query(query);
stream.on('data', (row) => {
  const [id, data] = row; // Array destructuring
  console.log(`ID: ${id}, Data: ${data}`);
});
```
### Async Iteration (Node.js 10+)

```typescript
const query = new QueryStream('SELECT * FROM products', []);
const stream = client.query(query);

for await (const row of stream) {
  // Process each product
  console.log(`Product: ${row.name}, Price: ${row.price}`);
}
```
### Piping to Transform Streams

```typescript
import { Transform } from 'stream';

const query = new QueryStream('SELECT * FROM orders', []);
const stream = client.query(query);

const transform = new Transform({
  objectMode: true,
  transform(row, encoding, callback) {
    // Transform each order
    const transformed = {
      orderId: row.id,
      total: parseFloat(row.total),
      date: new Date(row.created_at)
    };
    callback(null, transformed);
  }
});

stream.pipe(transform).on('data', (order) => {
  console.log('Processed order:', order);
});
```
### Custom Type Parsing

```typescript
const customTypes = {
  getTypeParser: (oid: number) => {
    // Custom parser for specific PostgreSQL types
    if (oid === 1184) { // timestamptz
      return (value: string) => new Date(value);
    }
    return (value: string) => value;
  }
};

const query = new QueryStream(
  'SELECT created_at, data FROM events',
  [],
  { types: customTypes }
);
```
## Stream Events

pg-query-stream emits all standard Node.js Readable stream events; a pull-based consumption sketch follows the list:

- **'data'**: Emitted for each row returned from the query
- **'end'**: Emitted when all rows have been read
- **'error'**: Emitted when a query or cursor error occurs
- **'close'**: Emitted when the stream is destroyed
- **'readable'**: Emitted when data is available to read
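For completeness, a sketch of the pull-based style using `'readable'` and `read()`, assuming a connected `client`:

```typescript
const stream = client.query(new QueryStream("SELECT * FROM users", []));

stream.on("readable", () => {
  let row;
  // read() returns null once the internal buffer is drained;
  // another 'readable' fires when the next batch arrives.
  while ((row = stream.read()) !== null) {
    console.log(row);
  }
});

stream.on("end", () => console.log("done"));
```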
## Memory Management

The stream keeps its memory footprint small in several ways (a backpressure sketch follows the list):

- **Cursor-based fetching**: Only a small batch of rows is kept in memory at a time
- **Configurable batch size**: Control how many rows are fetched at once
- **Stream backpressure**: Fetching pauses automatically when the downstream consumer is slow
- **Automatic cleanup**: The cursor is closed when the stream ends or is destroyed
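A minimal backpressure sketch: the deliberately slow `Writable` below (a stand-in for any slow consumer) causes `pipeline()` to pause the query stream, so memory holds roughly one cursor batch plus the buffered rows rather than the whole table. `large_table` is a placeholder.

```typescript
import pg from "pg";
import { Writable } from "node:stream";
import { pipeline } from "node:stream/promises";
import QueryStream from "pg-query-stream";

const client = new pg.Client();
await client.connect();

// Simulated slow consumer: 50 ms per row.
const slowConsumer = new Writable({
  objectMode: true,
  highWaterMark: 10,
  write(_row, _encoding, callback) {
    setTimeout(callback, 50);
  },
});

const query = new QueryStream("SELECT * FROM large_table", [], {
  batchSize: 100, // at most ~100 rows fetched per cursor read
});

// pipeline() propagates backpressure and errors in both directions.
await pipeline(client.query(query), slowConsumer);
await client.end();
```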
## Error Handling

```typescript
const query = new QueryStream('SELECT * FROM table', []);
const stream = client.query(query);

stream.on('error', (err) => {
  console.error('Query error:', err.message);
  // Stream automatically destroys itself on error
});

// Handle connection errors separately
client.on('error', (err) => {
  console.error('Client error:', err.message);
});
```
## Performance Considerations

- **Batch Size**: Larger batches reduce network round trips but hold more rows in memory
- **High Water Mark**: Controls internal buffering; tune it to your processing speed
- **Row Mode**: Array mode is slightly faster than object mode for simple data
- **Cursor Overhead**: Cursors add a small per-batch cost compared to loading all results at once
- **Network Latency**: Batch size matters more as network latency grows (see the sketch below)
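To make the latency trade-off concrete, a back-of-envelope sketch with illustrative numbers (not benchmarks): the number of cursor round trips is roughly `rows / batchSize`, so latency cost falls as the batch grows, while peak row memory rises with it.

```typescript
// Illustrative numbers only: 1M rows over a 20 ms round-trip link.
const rows = 1_000_000;
const rttMs = 20;

for (const batchSize of [100, 1_000, 10_000]) {
  const roundTrips = Math.ceil(rows / batchSize);
  const latencySeconds = (roundTrips * rttMs) / 1_000;
  console.log(
    `batchSize=${batchSize}: ~${roundTrips} fetches, ` +
    `~${latencySeconds}s spent on round trips alone`
  );
}
```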
## Limitations

- **JavaScript Client Only**: Does not work with the native libpq bindings (`pg-native`)
- **Read-Only**: The stream is read-only; rows cannot be updated through it
- **Single Query**: Each stream handles one query; create a new instance per query
- **Connection Binding**: The stream must finish (or be destroyed) before the connection can run other queries; see the early-exit sketch below
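A sketch of working within that last limitation by exiting early, assuming a connected `client` and a placeholder `big_table`: destroying the stream closes the server-side cursor, after which the same connection can run other queries.

```typescript
import pg from "pg";
import QueryStream from "pg-query-stream";

const client = new pg.Client();
await client.connect();

const stream = client.query(new QueryStream("SELECT * FROM big_table", []));

let count = 0;
for await (const row of stream) {
  console.log(row);
  if (++count >= 10) {
    // Closes the underlying cursor and frees the connection;
    // breaking out of for await...of would destroy it as well.
    stream.destroy();
    break;
  }
}

// The connection is usable again once the stream has closed.
const { rows } = await client.query("SELECT now() AS ts");
console.log(rows[0].ts);
await client.end();
```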