0
# Query Cursors
1
2
Server-side cursors for efficient processing of large PostgreSQL result sets without loading all rows into memory at once.
3
4
## Capabilities
5
6
### Cursor Constructor
7
8
Creates a new query cursor for server-side result streaming.
9
10
```javascript { .api }
11
/**
12
* Create a new query cursor
13
* @param text - SQL query text
14
* @param values - Optional parameter values
15
* @param config - Optional cursor configuration
16
*/
17
class Cursor extends EventEmitter {
18
constructor(text: string, values?: any[], config?: CursorConfig);
19
}
20
21
interface CursorConfig {
22
/** Row output mode ('array' or 'object') */
23
rowMode?: 'array' | 'object';
24
/** Custom type parser object */
25
types?: any;
26
/** Promise constructor to use */
27
Promise?: typeof Promise;
28
}
29
```
30
31
**Usage Examples:**
32
33
```javascript
34
const Cursor = require('pg-cursor');
35
const { Client } = require('pg');
36
37
const client = new Client();
38
await client.connect();
39
40
// Basic cursor
41
const cursor1 = new Cursor('SELECT * FROM users');
42
43
// Parameterized cursor
44
const cursor2 = new Cursor('SELECT * FROM users WHERE age > $1', [25]);
45
46
// Cursor with configuration
47
const cursor3 = new Cursor('SELECT id, name FROM users', null, {
48
rowMode: 'array'
49
});
50
51
// Submit cursor to client
52
const query = client.query(cursor1);
53
```
54
55
### Read Rows
56
57
Read a specified number of rows from the cursor.
58
59
```javascript { .api }
60
/**
61
* Read rows from the cursor
62
* @param rows - Number of rows to read
63
* @param callback - Optional callback for result
64
* @returns Promise when no callback provided
65
*/
66
read(rows: number, callback?: (err: Error | null, rows: any[], result: QueryResult) => void): Promise<any[]>;
67
```
68
69
**Usage Examples:**
70
71
```javascript
72
// Callback-based reading
73
const cursor = client.query(new Cursor('SELECT * FROM large_table'));
74
75
cursor.read(100, (err, rows, result) => {
76
if (err) throw err;
77
console.log(`Read ${rows.length} rows`);
78
console.log(`Total processed: ${result.rows.length}`);
79
});
80
81
// Promise-based reading
82
const rows = await cursor.read(100);
83
console.log(`Retrieved ${rows.length} rows`);
84
85
// Read all remaining rows
86
const allRows = await cursor.read(1000000);
87
```
88
89
### Close Cursor
90
91
Close the cursor and release server resources.
92
93
```javascript { .api }
94
/**
95
* Close the cursor and cleanup resources
96
* @param callback - Optional callback for completion
97
* @returns Promise when no callback provided
98
*/
99
close(callback?: (err: Error | null) => void): Promise<void>;
100
```
101
102
**Usage Examples:**
103
104
```javascript
105
// Callback-based close
106
cursor.close((err) => {
107
if (err) throw err;
108
console.log('Cursor closed');
109
});
110
111
// Promise-based close
112
await cursor.close();
113
console.log('Cursor closed');
114
115
// Always close cursors in finally blocks
116
try {
117
const cursor = client.query(new Cursor('SELECT * FROM users'));
118
const rows = await cursor.read(100);
119
// Process rows...
120
} finally {
121
await cursor.close();
122
}
123
```
124
125
### Cursor Events
126
127
Cursors emit events during their lifecycle for event-driven processing.
128
129
```javascript { .api }
130
// Row processing event
131
cursor.on('row', (row: any, result: QueryResult) => {
132
// Emitted for each row during read operations
133
});
134
135
// Completion event
136
cursor.on('end', (result: QueryResult) => {
137
// Emitted when cursor reaches end or is closed
138
});
139
140
// Error event
141
cursor.on('error', (err: Error) => {
142
// Emitted when cursor encounters an error
143
});
144
```
145
146
**Usage Examples:**
147
148
```javascript
149
const cursor = client.query(new Cursor('SELECT * FROM large_table'));
150
151
// Process rows individually as they arrive
152
cursor.on('row', (row, result) => {
153
console.log('Processing user:', row.name);
154
// Add row to result accumulator
155
result.addRow(row);
156
});
157
158
cursor.on('end', (result) => {
159
console.log(`Processed ${result.rows.length} total rows`);
160
});
161
162
cursor.on('error', (err) => {
163
console.error('Cursor error:', err);
164
});
165
166
// Start reading (triggers row events)
167
await cursor.read(1000);
168
```
169
170
## Usage Patterns
171
172
### Batch Processing
173
174
Process large tables in manageable batches.
175
176
```javascript
177
const cursor = client.query(new Cursor('SELECT * FROM huge_table ORDER BY id'));
178
179
let batch;
180
let totalProcessed = 0;
181
182
do {
183
batch = await cursor.read(1000);
184
185
// Process this batch
186
for (const row of batch) {
187
await processRow(row);
188
}
189
190
totalProcessed += batch.length;
191
console.log(`Processed ${totalProcessed} rows so far...`);
192
193
} while (batch.length > 0);
194
195
await cursor.close();
196
console.log(`Finished processing ${totalProcessed} total rows`);
197
```
198
199
### Memory-Efficient ETL
200
201
Extract, transform, and load data without memory overflow.
202
203
```javascript
204
async function etlLargeTable() {
205
const sourceCursor = sourceClient.query(
206
new Cursor('SELECT * FROM source_table')
207
);
208
209
try {
210
let batch;
211
do {
212
batch = await sourceCursor.read(500);
213
214
// Transform batch
215
const transformedRows = batch.map(transformRow);
216
217
// Load into destination
218
if (transformedRows.length > 0) {
219
await insertBatch(targetClient, transformedRows);
220
}
221
222
} while (batch.length > 0);
223
224
} finally {
225
await sourceCursor.close();
226
}
227
}
228
```
229
230
### Real-time Processing
231
232
Process rows as they become available.
233
234
```javascript
235
const cursor = client.query(new Cursor(
236
'SELECT * FROM events WHERE created_at > $1 ORDER BY created_at',
237
[lastProcessedTime]
238
));
239
240
cursor.on('row', async (row) => {
241
await processEvent(row);
242
lastProcessedTime = row.created_at;
243
});
244
245
cursor.on('end', () => {
246
console.log('All events processed');
247
});
248
249
// Start streaming
250
await cursor.read(10000);
251
```
252
253
### Transaction Support
254
255
Use cursors within database transactions.
256
257
```javascript
258
await client.query('BEGIN');
259
260
try {
261
const cursor = client.query(new Cursor(
262
'SELECT * FROM accounts WHERE balance > 0 FOR UPDATE'
263
));
264
265
let batch;
266
do {
267
batch = await cursor.read(100);
268
269
for (const account of batch) {
270
// Update account within transaction
271
await client.query(
272
'UPDATE accounts SET balance = balance * $1 WHERE id = $2',
273
[1.05, account.id]
274
);
275
}
276
277
} while (batch.length > 0);
278
279
await cursor.close();
280
await client.query('COMMIT');
281
282
} catch (err) {
283
await client.query('ROLLBACK');
284
throw err;
285
}
286
```
287
288
### Connection Pool Usage
289
290
Use cursors with connection pools (requires dedicated client).
291
292
```javascript
293
const { Pool } = require('pg');
294
const pool = new Pool();
295
296
async function processCursorData() {
297
const client = await pool.connect();
298
299
try {
300
const cursor = client.query(new Cursor('SELECT * FROM large_table'));
301
302
let totalRows = 0;
303
let batch;
304
305
do {
306
batch = await cursor.read(1000);
307
totalRows += batch.length;
308
309
// Process batch...
310
await processBatch(batch);
311
312
} while (batch.length > 0);
313
314
await cursor.close();
315
return totalRows;
316
317
} finally {
318
client.release(); // Return client to pool
319
}
320
}
321
```
322
323
## Configuration Options
324
325
### Row Mode
326
327
Control how rows are returned from the cursor.
328
329
```javascript
330
// Object mode (default)
331
const objectCursor = new Cursor('SELECT id, name FROM users');
332
// Returns: [{ id: 1, name: 'Alice' }, { id: 2, name: 'Bob' }]
333
334
// Array mode
335
const arrayCursor = new Cursor('SELECT id, name FROM users', null, {
336
rowMode: 'array'
337
});
338
// Returns: [[1, 'Alice'], [2, 'Bob']]
339
```
340
341
### Custom Type Parsing
342
343
Use custom type parsers with cursors.
344
345
```javascript
346
const customTypes = {
347
getTypeParser: (oid, format) => {
348
if (oid === 1184) { // timestamptz
349
return (val) => new Date(val);
350
}
351
return types.getTypeParser(oid, format);
352
}
353
};
354
355
const cursor = new Cursor('SELECT created_at FROM events', null, {
356
types: customTypes
357
});
358
```
359
360
### Promise Constructor
361
362
Use custom Promise implementation.
363
364
```javascript
365
const BluebirdPromise = require('bluebird');
366
367
const cursor = new Cursor('SELECT * FROM users', null, {
368
Promise: BluebirdPromise
369
});
370
371
// All cursor operations return Bluebird promises
372
const rows = await cursor.read(100); // Returns Bluebird promise
373
```
374
375
## Error Handling
376
377
```javascript
378
try {
379
const cursor = client.query(new Cursor('SELECT * FROM users'));
380
381
cursor.on('error', (err) => {
382
console.error('Cursor error:', err);
383
});
384
385
const rows = await cursor.read(100);
386
await cursor.close();
387
388
} catch (err) {
389
console.error('Operation failed:', err);
390
}
391
```
392
393
## Best Practices
394
395
### Always Close Cursors
396
397
```javascript
398
// Good: Ensure cursors are always closed
399
const cursor = client.query(new Cursor('SELECT * FROM table'));
400
try {
401
const rows = await cursor.read(100);
402
// Process rows...
403
} finally {
404
await cursor.close(); // Always close
405
}
406
```
407
408
### Reasonable Batch Sizes
409
410
```javascript
411
// Good: Moderate batch sizes
412
const rows = await cursor.read(1000); // Reasonable batch
413
414
// Avoid: Very large batches
415
const rows = await cursor.read(1000000); // May cause memory issues
416
```
417
418
### Error Recovery
419
420
```javascript
421
const cursor = client.query(new Cursor('SELECT * FROM table'));
422
423
try {
424
let batch;
425
do {
426
try {
427
batch = await cursor.read(100);
428
await processBatch(batch);
429
} catch (batchError) {
430
console.error('Batch processing error:', batchError);
431
// Decide whether to continue or abort
432
break;
433
}
434
} while (batch && batch.length > 0);
435
436
} finally {
437
await cursor.close();
438
}
439
```