IBM Cloudant Python client library providing a comprehensive interface for Cloudant and CouchDB databases.

Quality: Pending — does it follow best practices?
Impact: Pending — no eval scenarios have been run.

Monitor database changes in real time with comprehensive support for continuous feeds, filtering, infinite monitoring, and change event processing.
Iterator classes for consuming change feeds and database updates.
class Feed:
    """
    Iterator for consuming continuous and non-continuous change feeds.
    """

    def __init__(self, source, raw_data=False, **options):
        """
        Initialize feed iterator.

        Parameters:
        - source (callable): Function that returns feed data
        - raw_data (bool): Return raw JSON response data
        - heartbeat (int): Heartbeat interval in milliseconds
        - timeout (int): Feed timeout in milliseconds
        - since (str | int): Start from specific sequence
        - limit (int): Maximum number of changes
        - descending (bool): Process changes in reverse order
        - include_docs (bool): Include document content in changes
        - attachments (bool): Include attachment info
        - att_encoding_info (bool): Include attachment encoding info
        - conflicts (bool): Include conflict information
        - filter (str): Filter function name
        - **options: Additional feed parameters
        """

    def __iter__(self):
        """
        Iterator protocol implementation.

        Yields:
        dict: Change events from the feed
        """

    def stop(self):
        """
        Stop feed iteration and close connection.

        Returns:
        None
        """

    @property
    def last_seq(self):
        """
        Last processed sequence identifier.

        Returns:
        str | int: Sequence identifier for resuming feed
        """
class InfiniteFeed(Feed):
    """
    Perpetually refreshed feed iterator that automatically reconnects.
    """


# Database methods for accessing change feeds.
class CouchDatabase(dict):
    """Database change feed methods."""

    def changes(self, raw_data=False, **kwargs):
        """
        Get changes feed for database.

        Parameters:
        - raw_data (bool): Return raw response data
        - feed (str): Feed type ('normal', 'continuous', 'longpoll')
        - style (str): Change format ('main_only', 'all_docs')
        - heartbeat (int): Heartbeat interval (continuous feeds)
        - timeout (int): Request timeout in milliseconds
        - since (str | int): Start sequence (0, 'now', or sequence ID)
        - limit (int): Maximum number of changes
        - descending (bool): Reverse chronological order
        - include_docs (bool): Include full document content
        - attachments (bool): Include attachment stubs
        - att_encoding_info (bool): Include attachment encoding info
        - conflicts (bool): Include conflict revisions
        - deleted_conflicts (bool): Include deleted conflict revisions
        - filter (str): Filter function name from design document
        - doc_ids (list[str]): Only changes for specific document IDs
        - selector (dict): Filter changes by document selector (Cloudant only)
        - **kwargs: Additional filter parameters

        Returns:
        Feed: Change feed iterator

        Raises:
        CloudantFeedException: Feed creation failed
        """

    def infinite_changes(self, **kwargs):
        """
        Get infinite changes feed that automatically reconnects.

        Parameters:
        - **kwargs: Same options as changes() method

        Returns:
        InfiniteFeed: Perpetual change feed iterator
        """

# Monitor database creation and deletion events.
class CouchDB(dict):
    """Database update monitoring."""

    def db_updates(self, raw_data=False, **kwargs):
        """
        Monitor database creation and deletion events.

        Parameters:
        - raw_data (bool): Return raw response data
        - feed (str): Feed type ('normal', 'continuous', 'longpoll')
        - timeout (int): Request timeout
        - heartbeat (int): Heartbeat interval (continuous feeds)
        - since (str): Start from specific sequence

        Returns:
        Feed: Database updates feed iterator
        """
class Cloudant(CouchDB):
    """Cloudant database update monitoring."""

    def infinite_db_updates(self, **kwargs):
        """
        Get infinite database updates feed.

        Parameters:
        - **kwargs: Same options as db_updates()

        Returns:
        InfiniteFeed: Perpetual database updates iterator
        """


# Example: read a batch of recent changes (non-continuous feed).
from cloudant import cloudant
with cloudant('user', 'pass', account='myaccount') as client:
    db = client['my_database']

    # Get recent changes (non-continuous)
    changes = db.changes(limit=10, include_docs=True)
    for change in changes:
        doc_id = change['id']
        seq = change['seq']
        if 'doc' in change:
            doc = change['doc']
            print(f"Changed document {doc_id}: {doc.get('name', 'N/A')}")
        else:
            print(f"Document {doc_id} changed (deleted: {change.get('deleted', False)})")

    print(f"Last sequence: {changes.last_seq}")

# Example: continuous change feed consumed in a background thread.
from cloudant import cloudant
import time
import threading

with cloudant('user', 'pass', account='myaccount') as client:
    db = client['my_database']

    # Start continuous feed in background thread
    def monitor_changes():
        try:
            changes = db.changes(
                feed='continuous',
                include_docs=True,
                heartbeat=30000,  # 30 second heartbeat
                timeout=60000     # 60 second timeout
            )
            for change in changes:
                if change:  # Skip heartbeat messages
                    doc_id = change['id']
                    if change.get('deleted'):
                        print(f"Document deleted: {doc_id}")
                    else:
                        doc = change.get('doc', {})
                        print(f"Document updated: {doc_id} - {doc.get('name', 'N/A')}")
        except Exception as e:
            print(f"Change feed error: {e}")

    # Start monitoring in background
    monitor_thread = threading.Thread(target=monitor_changes)
    monitor_thread.daemon = True
    monitor_thread.start()

    # Do other work while monitoring changes
    print("Monitoring changes in background...")
    time.sleep(60)  # Monitor for 1 minute
    print("Stopping change monitoring")

# Example: infinite feed that survives connection loss.
from cloudant import cloudant
with cloudant('user', 'pass', account='myaccount') as client:
    db = client['my_database']

    # Infinite feed automatically reconnects on connection loss
    infinite_changes = db.infinite_changes(
        include_docs=True,
        heartbeat=15000,  # 15 second heartbeat
        since='now'       # Start from current time
    )
    try:
        for change in infinite_changes:
            if change:  # Skip heartbeat messages
                doc_id = change['id']
                seq = change['seq']
                if change.get('deleted'):
                    print(f"[{seq}] Document deleted: {doc_id}")
                else:
                    doc = change.get('doc', {})
                    doc_type = doc.get('type', 'unknown')
                    print(f"[{seq}] {doc_type} document updated: {doc_id}")
    except KeyboardInterrupt:
        print("Stopping infinite change feed...")
        infinite_changes.stop()

# Example: filtered change feeds (doc_ids, selector, design-doc filter).
from cloudant import cloudant
with cloudant('user', 'pass', account='myaccount') as client:
    db = client['my_database']

    # Filter by document IDs
    doc_ids = ['user123', 'user456', 'user789']
    changes = db.changes(
        doc_ids=doc_ids,
        include_docs=True,
        since='0'  # From beginning
    )
    for change in changes:
        print(f"User document changed: {change['id']}")

    # Filter using selector (Cloudant only)
    changes = db.changes(
        selector={'type': 'order', 'status': 'pending'},
        include_docs=True,
        feed='continuous'
    )
    for change in changes:
        if change:
            doc = change.get('doc', {})
            print(f"Pending order updated: {doc.get('order_id', 'N/A')}")

    # Filter using design document filter function
    changes = db.changes(
        filter='filters/by_user_type',
        user_type='admin',  # Parameter for filter function
        include_docs=True
    )
    for change in changes:
        print(f"Admin user changed: {change['id']}")

# Example: resumable processing with sequence checkpoints.
from cloudant import cloudant
import json
import os

CHECKPOINT_FILE = 'change_checkpoint.json'

def load_checkpoint():
    """Load last processed sequence from file."""
    if os.path.exists(CHECKPOINT_FILE):
        with open(CHECKPOINT_FILE, 'r') as f:
            data = json.load(f)
            return data.get('last_seq', '0')
    return '0'

def save_checkpoint(seq):
    """Save current sequence to file."""
    with open(CHECKPOINT_FILE, 'w') as f:
        json.dump({'last_seq': seq}, f)

with cloudant('user', 'pass', account='myaccount') as client:
    db = client['my_database']

    # Start from last checkpoint
    last_seq = load_checkpoint()
    print(f"Resuming from sequence: {last_seq}")

    changes = db.changes(
        since=last_seq,
        include_docs=True,
        limit=100  # Process in batches
    )

    processed_count = 0
    for change in changes:
        doc_id = change['id']
        current_seq = change['seq']

        # Process the change
        if change.get('deleted'):
            print(f"Processing deletion: {doc_id}")
        else:
            doc = change.get('doc', {})
            print(f"Processing update: {doc_id} - {doc.get('type', 'unknown')}")

        processed_count += 1

        # Save checkpoint periodically
        if processed_count % 10 == 0:
            save_checkpoint(current_seq)
            print(f"Checkpoint saved at sequence: {current_seq}")

    # Save final checkpoint
    if changes.last_seq:
        save_checkpoint(changes.last_seq)
        print(f"Final checkpoint: {changes.last_seq}")

    print(f"Processed {processed_count} changes")

# Example: monitoring database-level create/delete events.
from cloudant import cloudant
with cloudant('user', 'pass', account='myaccount') as client:
    # Monitor database creation/deletion events
    db_updates = client.db_updates(feed='continuous')
    try:
        for update in db_updates:
            if update:  # Skip heartbeat messages
                db_name = update['db_name']
                update_type = update['type']
                if update_type == 'created':
                    print(f"Database created: {db_name}")
                elif update_type == 'deleted':
                    print(f"Database deleted: {db_name}")
                elif update_type == 'updated':
                    print(f"Database updated: {db_name}")
    except KeyboardInterrupt:
        print("Stopping database updates monitoring")
        db_updates.stop()

# Example: change monitoring with retry and exponential backoff.
from cloudant import cloudant
from cloudant.error import CloudantFeedException
import time

def robust_change_monitor(client, db_name, max_retries=5):
    """Monitor changes with automatic retry on errors."""
    retry_count = 0
    last_seq = '0'
    while retry_count < max_retries:
        try:
            db = client[db_name]
            print(f"Starting change feed from sequence: {last_seq}")
            changes = db.changes(
                since=last_seq,
                feed='continuous',
                include_docs=True,
                heartbeat=30000,
                timeout=60000
            )
            for change in changes:
                if change:
                    doc_id = change['id']
                    last_seq = change['seq']
                    # Process change
                    if change.get('deleted'):
                        print(f"Document deleted: {doc_id}")
                    else:
                        doc = change.get('doc', {})
                        print(f"Document updated: {doc_id}")
            # If we reach here, feed ended normally
            print("Change feed ended normally")
            break
        except CloudantFeedException as e:
            retry_count += 1
            print(f"Feed error (attempt {retry_count}/{max_retries}): {e}")
            if retry_count < max_retries:
                # Exponential backoff
                wait_time = 2 ** retry_count
                print(f"Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
            else:
                print("Max retries reached, giving up")
                raise
        except KeyboardInterrupt:
            print("Change monitoring interrupted by user")
            break
        except Exception as e:
            print(f"Unexpected error: {e}")
            break

# Usage
with cloudant('user', 'pass', account='myaccount') as client:
    robust_change_monitor(client, 'my_database')

# Example: monitoring several databases on worker threads.
from cloudant import cloudant
import threading
import time

def monitor_database_changes(client, db_name, callback):
    """Monitor changes for a specific database."""
    try:
        db = client[db_name]
        changes = db.changes(
            feed='continuous',
            include_docs=True,
            heartbeat=30000
        )
        for change in changes:
            if change:
                callback(db_name, change)
    except Exception as e:
        print(f"Error monitoring {db_name}: {e}")

def change_handler(db_name, change):
    """Handle change events from any database."""
    doc_id = change['id']
    seq = change['seq']
    if change.get('deleted'):
        print(f"[{db_name}] Document deleted: {doc_id}")
    else:
        doc = change.get('doc', {})
        doc_type = doc.get('type', 'unknown')
        print(f"[{db_name}] {doc_type} updated: {doc_id}")

with cloudant('user', 'pass', account='myaccount') as client:
    # Monitor multiple databases
    databases = ['users', 'orders', 'inventory', 'logs']
    threads = []
    for db_name in databases:
        thread = threading.Thread(
            target=monitor_database_changes,
            args=(client, db_name, change_handler)
        )
        thread.daemon = True
        thread.start()
        threads.append(thread)
        print(f"Started monitoring {db_name}")
    try:
        # Keep main thread alive
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("Stopping all change monitors...")

# Example: high-throughput batch processing of a change feed.
from cloudant import cloudant
import time

with cloudant('user', 'pass', account='myaccount') as client:
    db = client['my_database']

    # High-performance change processing
    changes = db.changes(
        feed='continuous',
        include_docs=False,  # Don't fetch full documents for performance
        heartbeat=5000,      # More frequent heartbeats
        timeout=30000,       # Shorter timeout for faster reconnection
        limit=1000,          # Process in larger batches
        since='now'          # Start from current time
    )

    batch = []
    batch_size = 50
    last_process_time = time.time()

    for change in changes:
        if change:
            batch.append(change)

            # Process batch when full or after timeout
            if (len(batch) >= batch_size or
                    time.time() - last_process_time > 5):
                # Process batch efficiently
                print(f"Processing batch of {len(batch)} changes")
                for change in batch:
                    doc_id = change['id']
                    if change.get('deleted'):
                        # Handle deletion
                        print(f"Deleted: {doc_id}")
                    else:
                        # Fetch document only if needed
                        doc = db.get(doc_id, remote=True)
                        if doc and doc.exists():
                            # Process document
                            print(f"Updated: {doc_id}")

                # Clear batch
                batch = []
                last_process_time = time.time()

# Change feed operations can raise CloudantFeedException:
from cloudant import cloudant
from cloudant.error import CloudantFeedException

with cloudant('user', 'pass', account='myaccount') as client:
    db = client['my_database']

    try:
        # Invalid feed parameters
        changes = db.changes(feed='invalid_feed_type')
        for change in changes:
            print(change)
    except CloudantFeedException as e:
        print(f"Feed configuration error: {e}")

    try:
        # Network timeout during feed consumption
        changes = db.changes(
            feed='continuous',
            timeout=1000  # Very short timeout
        )
        for change in changes:
            print(change)
    except CloudantFeedException as e:
        print(f"Feed network error: {e}")

    try:
        # Invalid filter function
        changes = db.changes(filter='non_existent/filter')
        for change in changes:
            print(change)
    except CloudantFeedException as e:
        print(f"Filter error: {e}")

# Install with Tessl CLI
npx tessl i tessl/pypi-cloudant