# Change Feeds and Monitoring

Monitor database changes in real time, with support for normal, long-poll, and continuous feeds, filtering, infinite (automatically reconnecting) monitoring, and change event processing.
3
4
## Capabilities
5
6
### Feed Classes
7
8
Iterator classes for consuming change feeds and database updates.
9
10
```python { .api }
11
class Feed:
    """
    Iterator for consuming continuous and non-continuous change feeds.
    """

    def __init__(self, source, raw_data=False, **options):
        """
        Initialize feed iterator.

        Parameters:
        - source: Client or database object that the feed is executed against
        - raw_data (bool): Return raw JSON response data
        - heartbeat (int): Heartbeat interval in milliseconds
        - timeout (int): Feed timeout in milliseconds
        - since (str | int): Start from specific sequence
        - limit (int): Maximum number of changes
        - descending (bool): Process changes in reverse order
        - include_docs (bool): Include document content in changes
        - attachments (bool): Include attachment info
        - att_encoding_info (bool): Include attachment encoding info
        - conflicts (bool): Include conflict information
        - filter (str): Filter function name
        - **options: Additional feed parameters
        """

    def __iter__(self):
        """
        Iterator protocol implementation.

        Yields:
            dict: Change events from the feed
        """

    def stop(self):
        """
        Stop feed iteration and close connection.

        Returns:
            None
        """

    @property
    def last_seq(self):
        """
        Last processed sequence identifier.

        Returns:
            str | int: Sequence identifier for resuming feed
        """
60
61
class InfiniteFeed(Feed):
    """
    Perpetually refreshed feed iterator that automatically reconnects.
    """
65
```
66
67
### Database Change Feed Methods
68
69
Database methods for accessing change feeds.
70
71
```python { .api }
72
class CouchDatabase(dict):
    """Database change feed methods."""

    def changes(self, raw_data=False, **kwargs):
        """
        Get changes feed for database.

        Parameters:
        - raw_data (bool): Return raw response data
        - feed (str): Feed type ('normal', 'continuous', 'longpoll')
        - style (str): Change format ('main_only', 'all_docs')
        - heartbeat (int): Heartbeat interval in milliseconds (continuous feeds)
        - timeout (int): Request timeout in milliseconds
        - since (str | int): Start sequence (0, 'now', or sequence ID)
        - limit (int): Maximum number of changes
        - descending (bool): Reverse chronological order
        - include_docs (bool): Include full document content
        - attachments (bool): Include attachment stubs
        - att_encoding_info (bool): Include attachment encoding info
        - conflicts (bool): Include conflict revisions
        - deleted_conflicts (bool): Include deleted conflict revisions
        - filter (str): Filter function name from design document
        - doc_ids (list[str]): Only changes for specific document IDs
        - selector (dict): Filter changes by document selector (Cloudant only)
        - **kwargs: Additional filter parameters

        Returns:
            Feed: Change feed iterator

        Raises:
            CloudantFeedException: Feed creation failed
        """

    def infinite_changes(self, **kwargs):
        """
        Get infinite changes feed that automatically reconnects.

        Parameters:
        - **kwargs: Same options as changes() method

        Returns:
            InfiniteFeed: Perpetual change feed iterator
        """
115
```
116
117
### Client Database Update Feeds
118
119
Monitor database creation, update, and deletion events at the client (account) level.
120
121
```python { .api }
122
class CouchDB(dict):
    """Database update monitoring."""

    def db_updates(self, raw_data=False, **kwargs):
        """
        Monitor database creation and deletion events.

        Parameters:
        - raw_data (bool): Return raw response data
        - feed (str): Feed type ('normal', 'continuous', 'longpoll')
        - timeout (int): Request timeout in milliseconds
        - heartbeat (int): Heartbeat interval in milliseconds (continuous feeds)
        - since (str): Start from specific sequence

        Returns:
            Feed: Database updates feed iterator
        """
139
140
class Cloudant(CouchDB):
    """Cloudant database update monitoring."""

    def infinite_db_updates(self, **kwargs):
        """
        Get infinite database updates feed.

        Parameters:
        - **kwargs: Same options as db_updates()

        Returns:
            InfiniteFeed: Perpetual database updates iterator
        """
153
```
154
155
## Usage Examples
156
157
### Basic Change Monitoring
158
159
```python
160
from cloudant import cloudant

with cloudant('user', 'pass', account='myaccount') as client:
    db = client['my_database']

    # Get recent changes (non-continuous)
    changes = db.changes(limit=10, include_docs=True)

    for change in changes:
        doc_id = change['id']

        if 'doc' in change:
            doc = change['doc']
            print(f"Changed document {doc_id}: {doc.get('name', 'N/A')}")
        else:
            print(f"Document {doc_id} changed (deleted: {change.get('deleted', False)})")

    # last_seq can be passed back as `since` to resume from this point
    print(f"Last sequence: {changes.last_seq}")
179
```
180
181
### Continuous Change Monitoring
182
183
```python
184
from cloudant import cloudant
import time
import threading

with cloudant('user', 'pass', account='myaccount') as client:
    db = client['my_database']

    # Create the feed in the outer scope so the main thread can stop it later
    changes = db.changes(
        feed='continuous',
        include_docs=True,
        heartbeat=30000,  # 30 second heartbeat
        timeout=60000     # 60 second timeout
    )

    def monitor_changes():
        """Consume the continuous feed until it ends or is stopped."""
        try:
            for change in changes:
                if not change:  # Skip heartbeat messages
                    continue

                doc_id = change['id']
                if change.get('deleted'):
                    print(f"Document deleted: {doc_id}")
                else:
                    doc = change.get('doc', {})
                    print(f"Document updated: {doc_id} - {doc.get('name', 'N/A')}")

        except Exception as e:
            # Top-level boundary of the worker thread: report instead of dying silently
            print(f"Change feed error: {e}")

    # Start monitoring in background
    monitor_thread = threading.Thread(target=monitor_changes)
    monitor_thread.daemon = True
    monitor_thread.start()

    # Do other work while monitoring changes
    print("Monitoring changes in background...")
    time.sleep(60)  # Monitor for 1 minute

    print("Stopping change monitoring")
    # Actually end the feed; without this the worker keeps iterating
    changes.stop()
223
```
224
225
### Infinite Change Feed
226
227
```python
228
from cloudant import cloudant

with cloudant('user', 'pass', account='myaccount') as client:
    db = client['my_database']

    # Infinite feed automatically reconnects on connection loss
    infinite_changes = db.infinite_changes(
        include_docs=True,
        heartbeat=15000,  # 15 second heartbeat
        since='now'       # Only report changes made from now on
    )

    try:
        for change in infinite_changes:
            if not change:  # Skip heartbeat messages
                continue

            doc_id = change['id']
            seq = change['seq']

            if change.get('deleted'):
                print(f"[{seq}] Document deleted: {doc_id}")
            else:
                doc = change.get('doc', {})
                doc_type = doc.get('type', 'unknown')
                print(f"[{seq}] {doc_type} document updated: {doc_id}")

    except KeyboardInterrupt:
        # An infinite feed never ends on its own, so stop it explicitly
        print("Stopping infinite change feed...")
        infinite_changes.stop()
256
```
257
258
### Filtered Change Feeds
259
260
```python
261
from cloudant import cloudant

with cloudant('user', 'pass', account='myaccount') as client:
    db = client['my_database']

    # Filter by document IDs
    doc_ids = ['user123', 'user456', 'user789']
    changes = db.changes(
        doc_ids=doc_ids,
        include_docs=True,
        since='0'  # From beginning
    )

    for change in changes:
        print(f"User document changed: {change['id']}")

    # Filter using selector (Cloudant only)
    # NOTE: a continuous feed keeps this loop running until the feed is
    # stopped, so the example below it is only reached after calling
    # changes.stop() (or when each example is run on its own).
    changes = db.changes(
        selector={'type': 'order', 'status': 'pending'},
        include_docs=True,
        feed='continuous'
    )

    for change in changes:
        if change:
            doc = change.get('doc', {})
            print(f"Pending order updated: {doc.get('order_id', 'N/A')}")

    # Filter using design document filter function
    changes = db.changes(
        filter='filters/by_user_type',
        user_type='admin',  # Parameter for filter function
        include_docs=True
    )

    for change in changes:
        print(f"Admin user changed: {change['id']}")
298
```
299
300
### Resumable Change Processing
301
302
```python
303
from cloudant import cloudant
304
import json
305
import os
306
307
CHECKPOINT_FILE = 'change_checkpoint.json'


def load_checkpoint():
    """Load last processed sequence from file.

    Returns:
        The stored 'last_seq' value, or '0' when the checkpoint file does
        not exist or has no 'last_seq' entry.
    """
    # EAFP: opening directly avoids the exists()/open() race
    try:
        with open(CHECKPOINT_FILE, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except FileNotFoundError:
        return '0'
    return data.get('last_seq', '0')


def save_checkpoint(seq):
    """Save current sequence to file.

    Parameters:
    - seq (str | int): Sequence identifier to persist
    """
    # Write to a temporary file and rename it into place so an interrupted
    # write cannot leave a truncated checkpoint behind
    tmp_path = CHECKPOINT_FILE + '.tmp'
    with open(tmp_path, 'w', encoding='utf-8') as f:
        json.dump({'last_seq': seq}, f)
    os.replace(tmp_path, CHECKPOINT_FILE)
321
322
with cloudant('user', 'pass', account='myaccount') as client:
    db = client['my_database']

    # Start from last checkpoint
    last_seq = load_checkpoint()
    print(f"Resuming from sequence: {last_seq}")

    changes = db.changes(
        since=last_seq,
        include_docs=True,
        limit=100  # Process in batches
    )

    processed_count = 0

    for change in changes:
        doc_id = change['id']
        current_seq = change['seq']

        # Process the change
        if change.get('deleted'):
            print(f"Processing deletion: {doc_id}")
        else:
            doc = change.get('doc', {})
            print(f"Processing update: {doc_id} - {doc.get('type', 'unknown')}")

        processed_count += 1

        # Save checkpoint periodically so a crash loses at most 10 changes
        if processed_count % 10 == 0:
            save_checkpoint(current_seq)
            print(f"Checkpoint saved at sequence: {current_seq}")

    # Save final checkpoint
    if changes.last_seq:
        save_checkpoint(changes.last_seq)
        print(f"Final checkpoint: {changes.last_seq}")

    print(f"Processed {processed_count} changes")
361
```
362
363
### Database Updates Monitoring
364
365
```python
366
from cloudant import cloudant

with cloudant('user', 'pass', account='myaccount') as client:
    # Monitor database creation/deletion events
    db_updates = client.db_updates(feed='continuous')

    try:
        for update in db_updates:
            if not update:  # Skip heartbeat messages
                continue

            db_name = update['db_name']
            update_type = update['type']

            # Only the known event types are reported; others are ignored
            if update_type in ('created', 'deleted', 'updated'):
                print(f"Database {update_type}: {db_name}")

    except KeyboardInterrupt:
        print("Stopping database updates monitoring")
        db_updates.stop()
388
```
389
390
### Change Feed with Error Handling
391
392
```python
393
from cloudant import cloudant
394
from cloudant.error import CloudantFeedException
395
import time
396
397
def robust_change_monitor(client, db_name, max_retries=5):
398
"""Monitor changes with automatic retry on errors."""
399
400
retry_count = 0
401
last_seq = '0'
402
403
while retry_count < max_retries:
404
try:
405
db = client[db_name]
406
407
print(f"Starting change feed from sequence: {last_seq}")
408
changes = db.changes(
409
since=last_seq,
410
feed='continuous',
411
include_docs=True,
412
heartbeat=30000,
413
timeout=60000
414
)
415
416
for change in changes:
417
if change:
418
doc_id = change['id']
419
last_seq = change['seq']
420
421
# Process change
422
if change.get('deleted'):
423
print(f"Document deleted: {doc_id}")
424
else:
425
doc = change.get('doc', {})
426
print(f"Document updated: {doc_id}")
427
428
# If we reach here, feed ended normally
429
print("Change feed ended normally")
430
break
431
432
except CloudantFeedException as e:
433
retry_count += 1
434
print(f"Feed error (attempt {retry_count}/{max_retries}): {e}")
435
436
if retry_count < max_retries:
437
# Exponential backoff
438
wait_time = 2 ** retry_count
439
print(f"Retrying in {wait_time} seconds...")
440
time.sleep(wait_time)
441
else:
442
print("Max retries reached, giving up")
443
raise
444
445
except KeyboardInterrupt:
446
print("Change monitoring interrupted by user")
447
break
448
except Exception as e:
449
print(f"Unexpected error: {e}")
450
break
451
452
# Usage
with cloudant('user', 'pass', account='myaccount') as client:
    robust_change_monitor(client, 'my_database')
455
```
456
457
### Multi-Database Change Monitoring
458
459
```python
460
from cloudant import cloudant
461
import threading
462
import time
463
464
def monitor_database_changes(client, db_name, callback):
    """Monitor changes for a specific database.

    Parameters:
    - client: Client object; client[db_name] must provide changes()
    - db_name (str): Name of the database to monitor
    - callback (callable): Invoked as callback(db_name, change) for every
      non-empty change

    Any exception raised while opening or reading the feed, or by the
    callback, is printed rather than propagated.
    """
    try:
        db = client[db_name]
        changes = db.changes(
            feed='continuous',
            include_docs=True,
            heartbeat=30000
        )

        for change in changes:
            if change:  # Skip heartbeat messages
                callback(db_name, change)

    except Exception as e:
        print(f"Error monitoring {db_name}: {e}")
480
481
def change_handler(db_name, change):
    """Handle change events from any database.

    Parameters:
    - db_name (str): Name of the database the change came from
    - change (dict): Change event; must contain 'id', may contain
      'deleted' and 'doc'
    """
    doc_id = change['id']

    if change.get('deleted'):
        print(f"[{db_name}] Document deleted: {doc_id}")
    else:
        doc = change.get('doc', {})
        doc_type = doc.get('type', 'unknown')
        print(f"[{db_name}] {doc_type} updated: {doc_id}")
492
493
with cloudant('user', 'pass', account='myaccount') as client:
    # Monitor multiple databases
    databases = ['users', 'orders', 'inventory', 'logs']
    threads = []

    for db_name in databases:
        thread = threading.Thread(
            target=monitor_database_changes,
            args=(client, db_name, change_handler)
        )
        # Daemon threads do not keep the process alive after the main thread exits
        thread.daemon = True
        thread.start()
        threads.append(thread)
        print(f"Started monitoring {db_name}")

    try:
        # Keep main thread alive
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("Stopping all change monitors...")
514
```
515
516
### Change Feed Performance Optimization
517
518
```python
519
from cloudant import cloudant
import time


def process_batch(db, batch):
    """Process one batch of change events, fetching documents only when needed."""
    print(f"Processing batch of {len(batch)} changes")

    for item in batch:
        doc_id = item['id']

        if item.get('deleted'):
            # Handle deletion
            print(f"Deleted: {doc_id}")
        else:
            # Fetch document only if needed
            doc = db.get(doc_id, remote=True)
            if doc and doc.exists():
                # Process document
                print(f"Updated: {doc_id}")


with cloudant('user', 'pass', account='myaccount') as client:
    db = client['my_database']

    # High-performance change processing
    changes = db.changes(
        feed='continuous',
        include_docs=False,  # Don't fetch full documents for performance
        heartbeat=5000,      # More frequent heartbeats
        timeout=30000,       # Shorter timeout for faster reconnection
        limit=1000,          # Process in larger batches
        since='now'          # Start from current time
    )

    batch = []
    batch_size = 50
    max_batch_age = 5  # seconds
    last_process_time = time.monotonic()

    for change in changes:
        if change:  # Heartbeat messages carry no change
            batch.append(change)

        # Checked on every iteration, heartbeats included, so a partial
        # batch is not held back until the next real change arrives
        batch_is_full = len(batch) >= batch_size
        batch_is_stale = time.monotonic() - last_process_time > max_batch_age

        if batch and (batch_is_full or batch_is_stale):
            process_batch(db, batch)

            # Clear batch
            batch = []
            last_process_time = time.monotonic()

    # Flush whatever is left once the feed ends
    if batch:
        process_batch(db, batch)
566
```
567
568
## Error Handling
569
570
Change feed operations can raise `CloudantFeedException`:
571
572
```python
573
from cloudant import cloudant
from cloudant.error import CloudantArgumentError, CloudantFeedException

with cloudant('user', 'pass', account='myaccount') as client:
    db = client['my_database']

    try:
        # Invalid feed parameters
        # Options are validated when iteration starts, so the loop must be
        # inside the try block. Invalid option values are reported as
        # CloudantArgumentError.
        changes = db.changes(feed='invalid_feed_type')
        for change in changes:
            print(change)
    except (CloudantArgumentError, CloudantFeedException) as e:
        print(f"Feed configuration error: {e}")

    try:
        # Network timeout during feed consumption
        changes = db.changes(
            feed='continuous',
            timeout=1000  # Very short timeout
        )

        for change in changes:
            print(change)

    except CloudantFeedException as e:
        print(f"Feed network error: {e}")

    try:
        # Invalid filter function
        changes = db.changes(filter='non_existent/filter')
        for change in changes:
            print(change)
    except CloudantFeedException as e:
        print(f"Filter error: {e}")
607
```