0
# Changes Feed
1
2
Stream database changes from remote CouchDB databases in real time, with filtering, continuous (live) monitoring, and a comprehensive set of options for tracking document modifications.
3
4
## Capabilities
5
6
### Changes Feed Monitoring
7
8
Streams database changes with extensive filtering and monitoring options.
9
10
```javascript { .api }
11
/**
12
* Stream database changes with various filtering options
13
* @param opts - Changes feed options including filters, continuous mode, and callbacks
14
* @returns Object with cancel method to stop the changes feed
15
*/
16
api._changes(opts): { cancel: function };
17
```
18
19
**Usage Examples:**
20
21
```javascript
22
// Basic changes monitoring: follow the feed live, starting from the current sequence
const liveFeedOptions = {
  since: 'now',
  live: true,
  include_docs: true
};

// Log the id and first listed revision of a change, plus the body when it was delivered
const reportChange = (change) => {
  console.log('Document changed:', change.id);
  console.log('New revision:', change.changes[0].rev);
  if (!change.doc) {
    return;
  }
  console.log('Document content:', change.doc);
};

const reportFeedError = (err) => {
  console.error('Changes feed error:', err);
};

const changes = db.changes(liveFeedOptions)
  .on('change', reportChange)
  .on('error', reportFeedError);

// Stop monitoring after 30 seconds
const stopFeed = () => {
  changes.cancel();
  console.log('Changes feed stopped');
};

setTimeout(stopFeed, 30000);
42
43
// One-time changes since last update
// Sequence to resume from. Declared up front so the snippet is self-contained;
// 0 requests every change from the start of the feed. Persist this value
// between runs if already-seen changes must not be replayed after a restart.
let lastUpdateSeq = 0;

db.changes({
  since: lastUpdateSeq,
  include_docs: true
}, (err, result) => {
  if (err) {
    console.error('Error getting changes:', err);
    return;
  }

  result.results.forEach(change => {
    console.log('Changed document:', change.id);
    console.log('Current revision:', change.changes[0].rev);
  });

  // Store the last sequence for next query
  lastUpdateSeq = result.last_seq;
});
61
```
62
63
### Filtered Changes
64
65
Monitor changes with document filtering and specific document sets.
66
67
```javascript { .api }
68
/**
69
* Monitor changes with filtering options
70
* @param opts - Options including filter functions and document IDs
71
* @returns Changes feed controller with cancel method
72
*/
73
api._changes(opts): { cancel: function };
74
```
75
76
**Usage Examples:**
77
78
```javascript
79
// Filter by document IDs
const watchedUserIds = ['user:alice', 'user:bob', 'user:charlie'];

// Print the name and email of a changed user when the document body was delivered
const reportUserChange = (change) => {
  console.log('User document changed:', change.id);
  if (!change.doc) {
    return;
  }
  const { name, email } = change.doc;
  console.log('User data:', { name, email });
};

const userChanges = db.changes({
  live: true,
  include_docs: true,
  doc_ids: watchedUserIds
}).on('change', reportUserChange);

// Filter by design document view
const reportActiveUserChange = (change) => {
  console.log('Active user changed:', change.id);
};

const filteredChanges = db.changes({
  live: true,
  include_docs: true,
  filter: '_view',
  view: 'users/active'
}).on('change', reportActiveUserChange);

// Custom server-side filter
const importantFilterParams = {
  priority: 'high'
};

const reportImportantChange = (change) => {
  console.log('Important document changed:', change.id);
};

const customFilterChanges = db.changes({
  live: true,
  include_docs: true,
  filter: 'myapp/important',
  query_params: importantFilterParams
}).on('change', reportImportantChange);
115
```
116
117
### Continuous Changes Monitoring
118
119
Set up persistent changes monitoring with reconnection and error handling.
120
121
```javascript { .api }
122
/**
123
* Set up continuous changes monitoring with reconnection
124
* @param opts - Continuous monitoring options
125
* @returns Changes feed controller
126
*/
127
api._changes(opts): { cancel: function };
128
```
129
130
**Usage Examples:**
131
132
```javascript
133
// Continuous monitoring with heartbeat
134
let changesController;
135
let reconnectTimeout;
136
137
function startChangesMonitoring() {
138
changesController = db.changes({
139
live: true,
140
continuous: true,
141
include_docs: true,
142
heartbeat: 10000, // 10 seconds
143
timeout: 60000, // 60 seconds
144
since: localStorage.getItem('lastSeq') || 'now'
145
}).on('change', (change) => {
146
console.log('Document updated:', change.id);
147
148
// Store sequence for recovery
149
localStorage.setItem('lastSeq', change.seq);
150
151
// Process the change
152
handleDocumentChange(change);
153
154
}).on('error', (err) => {
155
console.error('Changes feed error:', err);
156
157
// Attempt to reconnect after 5 seconds
158
if (!reconnectTimeout) {
159
reconnectTimeout = setTimeout(() => {
160
console.log('Attempting to reconnect changes feed...');
161
reconnectTimeout = null;
162
startChangesMonitoring();
163
}, 5000);
164
}
165
166
}).on('complete', (info) => {
167
console.log('Changes feed completed:', info);
168
});
169
}
170
171
function handleDocumentChange(change) {
172
if (change.deleted) {
173
console.log('Document deleted:', change.id);
174
removeFromLocalCache(change.id);
175
} else {
176
console.log('Document updated:', change.id);
177
updateLocalCache(change.id, change.doc);
178
}
179
}
180
181
function stopChangesMonitoring() {
182
if (changesController) {
183
changesController.cancel();
184
changesController = null;
185
}
186
187
if (reconnectTimeout) {
188
clearTimeout(reconnectTimeout);
189
reconnectTimeout = null;
190
}
191
}
192
193
// Start monitoring
194
startChangesMonitoring();
195
```
196
197
### Changes with Conflict Detection
198
199
Monitor changes while detecting and handling document conflicts.
200
201
```javascript { .api }
202
/**
203
* Monitor changes with conflict detection
204
* @param opts - Options including conflicts flag
205
* @returns Changes feed controller
206
*/
207
api._changes(opts): { cancel: function };
208
```
209
210
**Usage Examples:**
211
212
```javascript
213
// Monitor with conflict detection
// Route each change either to conflict handling or to the plain-update log,
// depending on whether the delivered document lists conflicting revisions.
const onConflictAwareChange = (change) => {
  console.log('Document changed:', change.id);

  const conflictRevs = change.doc._conflicts;
  if (!conflictRevs) {
    // Normal document update
    console.log('Clean document update:', change.doc);
    return;
  }

  console.warn('Document has conflicts:', conflictRevs);

  // Handle conflicts by fetching all conflicting revisions
  handleDocumentConflicts(change.id, conflictRevs);
};

const conflictAwareChanges = db.changes({
  live: true,
  include_docs: true,
  conflicts: true
}).on('change', onConflictAwareChange);
231
232
async function handleDocumentConflicts(docId, conflicts) {
233
console.log(`Resolving conflicts for ${docId}`);
234
235
try {
236
// Get all conflicting revisions
237
const conflictingDocs = await Promise.all(
238
conflicts.map(rev =>
239
new Promise((resolve, reject) => {
240
db.get(docId, { rev }, (err, doc) => {
241
if (err) reject(err);
242
else resolve(doc);
243
});
244
})
245
)
246
);
247
248
// Implement conflict resolution logic
249
const resolvedDoc = resolveConflicts(conflictingDocs);
250
251
// Save resolved document
252
db.put(resolvedDoc, (err, result) => {
253
if (err) {
254
console.error('Failed to resolve conflict:', err);
255
} else {
256
console.log('Conflict resolved:', result.rev);
257
}
258
});
259
260
} catch (err) {
261
console.error('Error handling conflicts:', err);
262
}
263
}
264
265
/**
 * Simple conflict resolution: pick the most recent document.
 *
 * Documents are compared by their `updated` timestamp when both carry a
 * parseable one and the timestamps differ. Otherwise the numeric generation
 * prefix of `_rev` (the "3" in "3-abc") decides. A `_rev` string is not a
 * date, so it is never passed to the Date constructor. On a complete tie the
 * earlier document in the array is kept.
 *
 * @param docs - Non-empty array of candidate revisions of one document
 * @returns The candidate considered most recent
 * @throws TypeError when `docs` is not a non-empty array
 */
function resolveConflicts(docs) {
  if (!Array.isArray(docs) || docs.length === 0) {
    throw new TypeError('resolveConflicts requires a non-empty array of documents');
  }

  // Milliseconds since the epoch, or NaN when `updated` is missing or unparseable
  const timestampOf = (doc) => (doc.updated ? new Date(doc.updated).getTime() : NaN);

  // Generation counter taken from the start of the revision id; 0 when absent
  const generationOf = (doc) => parseInt(String(doc._rev), 10) || 0;

  return docs.reduce((latest, current) => {
    const latestTime = timestampOf(latest);
    const currentTime = timestampOf(current);

    const comparableTimes = !Number.isNaN(latestTime) && !Number.isNaN(currentTime);
    if (comparableTimes && latestTime !== currentTime) {
      return currentTime > latestTime ? current : latest;
    }

    return generationOf(current) > generationOf(latest) ? current : latest;
  });
}
273
```
274
275
## Advanced Usage Patterns
276
277
### Batch Processing Changes
278
279
```javascript
280
// Process changes in batches for better performance
let changesBatch = [];
const BATCH_SIZE = 10;
const BATCH_TIMEOUT = 5000;
let batchTimeout;

const batchChanges = db.changes({
  live: true,
  include_docs: true
}).on('change', (change) => {
  changesBatch.push(change);

  // Process batch when it reaches target size
  if (changesBatch.length >= BATCH_SIZE) {
    processBatch();
  } else if (!batchTimeout) {
    // Start the partial-batch timer only when none is pending. Restarting it
    // on every change would let a steady trickle of changes keep postponing
    // the flush until the batch fills; this way the oldest buffered change
    // waits at most BATCH_TIMEOUT. processBatch() resets batchTimeout to null,
    // which re-arms this branch for the next batch.
    batchTimeout = setTimeout(processBatch, BATCH_TIMEOUT);
  }
});
303
304
function processBatch() {
305
if (changesBatch.length === 0) return;
306
307
console.log(`Processing batch of ${changesBatch.length} changes`);
308
309
// Process all changes in the batch
310
const batch = changesBatch.slice();
311
changesBatch = [];
312
313
if (batchTimeout) {
314
clearTimeout(batchTimeout);
315
batchTimeout = null;
316
}
317
318
// Handle batch processing
319
batch.forEach(change => {
320
console.log('Processing change:', change.id);
321
// Your batch processing logic here
322
});
323
}
324
```
325
326
### Changes Feed with Retry Logic
327
328
```javascript
329
class RobustChangesMonitor {
330
constructor(db, options = {}) {
331
this.db = db;
332
this.options = {
333
maxRetries: 5,
334
retryDelay: 1000,
335
backoffMultiplier: 2,
336
...options
337
};
338
this.currentChanges = null;
339
this.retryCount = 0;
340
this.isRunning = false;
341
}
342
343
start() {
344
if (this.isRunning) return;
345
346
this.isRunning = true;
347
this.connectChanges();
348
}
349
350
stop() {
351
this.isRunning = false;
352
353
if (this.currentChanges) {
354
this.currentChanges.cancel();
355
this.currentChanges = null;
356
}
357
}
358
359
connectChanges() {
360
if (!this.isRunning) return;
361
362
console.log('Starting changes feed...');
363
364
this.currentChanges = this.db.changes({
365
live: true,
366
include_docs: true,
367
since: this.options.since || 'now',
368
heartbeat: 30000
369
}).on('change', (change) => {
370
// Reset retry count on successful change
371
this.retryCount = 0;
372
this.handleChange(change);
373
374
}).on('error', (err) => {
375
console.error('Changes feed error:', err);
376
this.handleError(err);
377
378
}).on('complete', (info) => {
379
console.log('Changes feed completed');
380
if (this.isRunning) {
381
this.handleError(new Error('Changes feed completed unexpectedly'));
382
}
383
});
384
}
385
386
handleChange(change) {
387
console.log('Document changed:', change.id);
388
389
// Emit change event for external handlers
390
if (this.options.onChange) {
391
this.options.onChange(change);
392
}
393
}
394
395
handleError(err) {
396
if (!this.isRunning) return;
397
398
this.retryCount++;
399
400
if (this.retryCount <= this.options.maxRetries) {
401
const delay = this.options.retryDelay * Math.pow(this.options.backoffMultiplier, this.retryCount - 1);
402
403
console.log(`Retrying changes feed in ${delay}ms (attempt ${this.retryCount}/${this.options.maxRetries})`);
404
405
setTimeout(() => {
406
if (this.isRunning) {
407
this.connectChanges();
408
}
409
}, delay);
410
} else {
411
console.error(`Max retries (${this.options.maxRetries}) exceeded, stopping changes feed`);
412
this.stop();
413
414
if (this.options.onMaxRetriesReached) {
415
this.options.onMaxRetriesReached(err);
416
}
417
}
418
}
419
}
420
421
// Usage
// Callbacks handed to the monitor: one per delivered change, one for the
// moment the retry budget is exhausted.
const logRobustChange = (change) => {
  console.log('Robust change handler:', change.id);
};

const logPermanentFailure = (err) => {
  console.error('Changes monitoring failed permanently:', err);
};

const monitor = new RobustChangesMonitor(db, {
  since: 'now',
  maxRetries: 10,
  retryDelay: 2000,
  onChange: logRobustChange,
  onMaxRetriesReached: logPermanentFailure
});

monitor.start();
435
```
436
437
## Types
438
439
```typescript { .api }
440
/** Options accepted by the changes feed. Every field is optional. */
interface ChangesOptions {
  // What each change carries
  conflicts?: boolean;
  include_docs?: boolean;
  attachments?: boolean;

  // Which part of the feed is read
  descending?: boolean;
  since?: number | string;
  limit?: number;

  // Connection behaviour (the examples above pass both values in milliseconds)
  timeout?: number;
  heartbeat?: boolean | number;
  live?: boolean;
  continuous?: boolean;

  // Filtering
  filter?: string;
  view?: string;
  doc_ids?: Array<string>;
  query_params?: Record<string, any>;
  selector?: any;

  // Result shaping
  style?: 'main_only' | 'all_docs';
  seq_interval?: number;
  batch_size?: number;

  // Callback hooks
  onChange?: (change: Change) => void;
  onError?: (error: Error) => void;
  onComplete?: (result: ChangesResult) => void;
}

/** A single entry of the changes feed. */
interface Change {
  id: string;
  seq: number | string;
  changes: Array<ChangeRevision>;
  /** Document body; the examples above receive it when `include_docs` is set. */
  doc?: PouchDoc;
  deleted?: boolean;
}

/** Revision reference listed in `Change.changes`. */
interface ChangeRevision {
  rev: string;
}

/** Result handed to one-shot callers and to `complete` listeners. */
interface ChangesResult {
  results: Array<Change>;
  last_seq: number | string;
  pending?: number;
}

/** Handle for a running feed: cancel it or subscribe to its events. */
interface ChangesController {
  cancel(): void;
  on(event: 'change', callback: (change: Change) => void): ChangesController;
  on(event: 'error', callback: (error: Error) => void): ChangesController;
  on(event: 'complete', callback: (result: ChangesResult) => void): ChangesController;
}
492
```