CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-cloudant

IBM Cloudant Python client library providing comprehensive interface for Cloudant and CouchDB databases

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/change-feeds.md

Change Feeds and Monitoring

Monitor database changes in real-time with comprehensive support for continuous feeds, filtering, infinite monitoring, and change event processing.

Capabilities

Feed Classes

Iterator classes for consuming change feeds and database updates.

class Feed:
    """
    Iterator for consuming continuous and non-continuous change feeds.

    Wraps a feed-producing callable and yields one change event per
    iteration. Continuous feeds also emit falsy heartbeat entries that
    callers are expected to skip (see the usage examples below).
    """

    def __init__(self, source, raw_data=False, **options):
        """
        Initialize feed iterator.

        Parameters:
        - source (callable): Function that returns feed data
        - raw_data (bool): Return raw JSON response data instead of
          decoded dicts
        - heartbeat (int): Heartbeat interval in milliseconds
        - timeout (int): Feed timeout in milliseconds
        - since (str | int): Start from specific sequence
        - limit (int): Maximum number of changes
        - descending (bool): Process changes in reverse order
        - include_docs (bool): Include document content in changes
        - attachments (bool): Include attachment info
        - att_encoding_info (bool): Include attachment encoding info
        - conflicts (bool): Include conflict information
        - filter (str): Filter function name
        - **options: Additional feed parameters
        """

    def __iter__(self):
        """
        Iterator protocol implementation.

        Yields:
        dict: Change events from the feed; continuous feeds may yield
        falsy entries for heartbeat messages
        """

    def stop(self):
        """
        Stop feed iteration and close connection.

        Ends the iteration loop; for a continuous feed this is the only
        way to terminate consumption deliberately.

        Returns:
        None
        """

    @property
    def last_seq(self):
        """
        Last processed sequence identifier.

        Useful for checkpointing, so a later feed can resume from this
        point via the ``since`` option.

        Returns:
        str | int: Sequence identifier for resuming feed
        """

class InfiniteFeed(Feed):
    """
    Perpetually refreshed feed iterator that automatically reconnects.

    Connection loss triggers a transparent reconnect rather than ending
    iteration, so the loop only ends when stop() is called.
    """
Database Change Feed Methods

Database methods for accessing change feeds.

class CouchDatabase(dict):
    """Database change feed methods."""

    def changes(self, raw_data=False, **kwargs):
        """
        Get changes feed for database.

        A 'continuous' feed iterates until stop() is called and yields
        falsy heartbeat entries that callers should skip; 'normal' and
        'longpoll' feeds end on their own.

        Parameters:
        - raw_data (bool): Return raw response data
        - feed (str): Feed type ('normal', 'continuous', 'longpoll')
        - style (str): Change format ('main_only', 'all_docs')
        - heartbeat (int): Heartbeat interval (continuous feeds)
        - timeout (int): Request timeout in milliseconds
        - since (str | int): Start sequence (0, 'now', or sequence ID)
        - limit (int): Maximum number of changes
        - descending (bool): Reverse chronological order
        - include_docs (bool): Include full document content
        - attachments (bool): Include attachment stubs
        - att_encoding_info (bool): Include attachment encoding info
        - conflicts (bool): Include conflict revisions
        - deleted_conflicts (bool): Include deleted conflict revisions
        - filter (str): Filter function name from design document
        - doc_ids (list[str]): Only changes for specific document IDs
        - selector (dict): Filter changes by document selector (Cloudant only)
        - **kwargs: Additional filter parameters (passed through to the
          design-document filter function, e.g. user_type='admin')

        Returns:
        Feed: Change feed iterator

        Raises:
        CloudantFeedException: Feed creation failed
        """

    def infinite_changes(self, **kwargs):
        """
        Get infinite changes feed that automatically reconnects.

        Parameters:
        - **kwargs: Same options as changes() method

        Returns:
        InfiniteFeed: Perpetual change feed iterator; iteration only
        ends when stop() is called
        """

Client Database Update Feeds

Monitor database creation and deletion events.

class CouchDB(dict):
    """Database update monitoring."""

    def db_updates(self, raw_data=False, **kwargs):
        """
        Monitor database creation and deletion events.

        Each yielded event carries a 'db_name' key and a 'type' key
        ('created', 'deleted', or 'updated'); continuous feeds also
        yield falsy heartbeat entries.

        Parameters:
        - raw_data (bool): Return raw response data
        - feed (str): Feed type ('normal', 'continuous', 'longpoll')
        - timeout (int): Request timeout
        - heartbeat (int): Heartbeat interval (continuous feeds)
        - since (str): Start from specific sequence

        Returns:
        Feed: Database updates feed iterator
        """

class Cloudant(CouchDB):
    """Cloudant database update monitoring."""

    def infinite_db_updates(self, **kwargs):
        """
        Get infinite database updates feed.

        Parameters:
        - **kwargs: Same options as db_updates()

        Returns:
        InfiniteFeed: Perpetual database updates iterator that
        automatically reconnects; iteration only ends via stop()
        """

Usage Examples

Basic Change Monitoring

from cloudant import cloudant

# Fetch a bounded, one-shot batch of recent changes (non-continuous feed).
with cloudant('user', 'pass', account='myaccount') as client:
    database = client['my_database']

    # At most 10 events, each carrying its full document body.
    feed = database.changes(limit=10, include_docs=True)

    for event in feed:
        changed_id = event['id']
        seq = event['seq']

        if 'doc' in event:
            body = event['doc']
            print(f"Changed document {changed_id}: {body.get('name', 'N/A')}")
        else:
            print(f"Document {changed_id} changed (deleted: {event.get('deleted', False)})")

    # last_seq can be persisted to resume this feed later via since=.
    print(f"Last sequence: {feed.last_seq}")

Continuous Change Monitoring

from cloudant import cloudant
import time
import threading

with cloudant('user', 'pass', account='myaccount') as client:
    db = client['my_database']

    # Holder so the main thread can reach the feed object created inside
    # the worker. Without this, "stopping" only printed a message: the
    # daemon thread was killed mid-request on exit, leaking the
    # connection instead of closing the feed cleanly.
    feed_holder = {}

    def monitor_changes():
        """Consume a continuous change feed and print each event."""
        try:
            changes = db.changes(
                feed='continuous',
                include_docs=True,
                heartbeat=30000,  # 30 second heartbeat
                timeout=60000     # 60 second timeout
            )
            feed_holder['feed'] = changes

            for change in changes:
                if change:  # Skip heartbeat messages
                    doc_id = change['id']
                    if change.get('deleted'):
                        print(f"Document deleted: {doc_id}")
                    else:
                        doc = change.get('doc', {})
                        print(f"Document updated: {doc_id} - {doc.get('name', 'N/A')}")

        except Exception as e:
            print(f"Change feed error: {e}")

    # Start monitoring in background
    monitor_thread = threading.Thread(target=monitor_changes)
    monitor_thread.daemon = True
    monitor_thread.start()

    # Do other work while monitoring changes
    print("Monitoring changes in background...")
    time.sleep(60)  # Monitor for 1 minute

    print("Stopping change monitoring")
    # Actually stop the feed so the worker thread can exit cleanly.
    feed = feed_holder.get('feed')
    if feed is not None:
        feed.stop()
    monitor_thread.join(timeout=5)

Infinite Change Feed

from cloudant import cloudant

with cloudant('user', 'pass', account='myaccount') as client:
    database = client['my_database']

    # An infinite feed transparently re-establishes the connection when
    # it drops, so iteration only ends via stop().
    feed = database.infinite_changes(
        include_docs=True,
        heartbeat=15000,  # 15 second heartbeat
        since='now'       # Start from current time
    )

    try:
        for event in feed:
            if not event:  # Heartbeats arrive as falsy entries
                continue

            changed_id = event['id']
            seq = event['seq']

            if event.get('deleted'):
                print(f"[{seq}] Document deleted: {changed_id}")
            else:
                body = event.get('doc', {})
                kind = body.get('type', 'unknown')
                print(f"[{seq}] {kind} document updated: {changed_id}")

    except KeyboardInterrupt:
        print("Stopping infinite change feed...")
        feed.stop()

Filtered Change Feeds

from cloudant import cloudant

with cloudant('user', 'pass', account='myaccount') as client:
    db = client['my_database']
    
    # Filter by document IDs
    doc_ids = ['user123', 'user456', 'user789']
    changes = db.changes(
        doc_ids=doc_ids,
        include_docs=True,
        since='0'  # From beginning
    )
    
    for change in changes:
        print(f"User document changed: {change['id']}")
    
    # Filter using selector (Cloudant only)
    # NOTE(review): feed='continuous' iterates until stop() is called,
    # so as written the loop below never finishes and the design-document
    # filter example after it is unreachable — confirm this is intended.
    changes = db.changes(
        selector={'type': 'order', 'status': 'pending'},
        include_docs=True,
        feed='continuous'
    )
    
    for change in changes:
        if change:  # Skip falsy heartbeat entries
            doc = change.get('doc', {})
            print(f"Pending order updated: {doc.get('order_id', 'N/A')}")
    
    # Filter using design document filter function; extra keyword
    # arguments are passed through as parameters to the filter.
    changes = db.changes(
        filter='filters/by_user_type',
        user_type='admin',  # Parameter for filter function
        include_docs=True
    )
    
    for change in changes:
        print(f"Admin user changed: {change['id']}")

Resumable Change Processing

from cloudant import cloudant
import json
import os

CHECKPOINT_FILE = 'change_checkpoint.json'

def load_checkpoint():
    """Return the last processed sequence, or '0' if none is recorded.

    Uses EAFP (open and catch the error) rather than an exists() check,
    avoiding the race between the check and the open. A corrupt or
    partially written checkpoint is treated the same as a missing one
    instead of crashing the monitor on startup.
    """
    try:
        with open(CHECKPOINT_FILE, 'r') as f:
            data = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        return '0'
    return data.get('last_seq', '0')

def save_checkpoint(seq):
    """Persist *seq* as the last processed sequence."""
    with open(CHECKPOINT_FILE, 'w') as f:
        json.dump({'last_seq': seq}, f)

with cloudant('user', 'pass', account='myaccount') as client:
    database = client['my_database']

    # Resume from the sequence recorded by the previous run.
    start_seq = load_checkpoint()
    print(f"Resuming from sequence: {start_seq}")

    feed = database.changes(
        since=start_seq,
        include_docs=True,
        limit=100  # Process in batches
    )

    handled = 0

    for event in feed:
        event_id = event['id']
        event_seq = event['seq']

        # Process the change
        if event.get('deleted'):
            print(f"Processing deletion: {event_id}")
        else:
            body = event.get('doc', {})
            print(f"Processing update: {event_id} - {body.get('type', 'unknown')}")

        handled += 1

        # Persist progress every 10 events so a crash loses little work.
        if handled % 10 == 0:
            save_checkpoint(event_seq)
            print(f"Checkpoint saved at sequence: {event_seq}")

    # Record where this run stopped so the next run resumes from here.
    if feed.last_seq:
        save_checkpoint(feed.last_seq)
        print(f"Final checkpoint: {feed.last_seq}")

    print(f"Processed {handled} changes")

Database Updates Monitoring

from cloudant import cloudant

with cloudant('user', 'pass', account='myaccount') as client:
    # Watch the server-wide feed of database lifecycle events.
    updates_feed = client.db_updates(feed='continuous')

    try:
        for event in updates_feed:
            if not event:  # Heartbeats arrive as falsy entries
                continue

            name = event['db_name']
            kind = event['type']

            if kind == 'created':
                print(f"Database created: {name}")
            elif kind == 'deleted':
                print(f"Database deleted: {name}")
            elif kind == 'updated':
                print(f"Database updated: {name}")

    except KeyboardInterrupt:
        print("Stopping database updates monitoring")
        updates_feed.stop()

Change Feed with Error Handling

from cloudant import cloudant
from cloudant.error import CloudantFeedException
import time

def robust_change_monitor(client, db_name, max_retries=5):
    """
    Monitor changes with automatic retry on feed errors.

    Parameters:
    - client: CouchDB/Cloudant client with dict-style database access
    - db_name (str): Name of the database to monitor
    - max_retries (int): Maximum number of consecutive failed attempts

    The feed resumes from the last sequence seen. The retry counter is
    reset once data flows again, so occasional transient errors spread
    over a long run no longer accumulate until the budget is exhausted.
    """

    retry_count = 0
    last_seq = '0'

    while retry_count < max_retries:
        try:
            db = client[db_name]

            print(f"Starting change feed from sequence: {last_seq}")
            changes = db.changes(
                since=last_seq,
                feed='continuous',
                include_docs=True,
                heartbeat=30000,
                timeout=60000
            )

            for change in changes:
                if change:
                    doc_id = change['id']
                    last_seq = change['seq']
                    # Data is flowing again: this attempt succeeded, so
                    # reset the consecutive-failure counter.
                    retry_count = 0

                    # Process change
                    if change.get('deleted'):
                        print(f"Document deleted: {doc_id}")
                    else:
                        doc = change.get('doc', {})
                        print(f"Document updated: {doc_id}")

            # If we reach here, feed ended normally
            print("Change feed ended normally")
            break

        except CloudantFeedException as e:
            retry_count += 1
            print(f"Feed error (attempt {retry_count}/{max_retries}): {e}")

            if retry_count < max_retries:
                # Exponential backoff
                wait_time = 2 ** retry_count
                print(f"Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
            else:
                print("Max retries reached, giving up")
                raise

        except KeyboardInterrupt:
            print("Change monitoring interrupted by user")
            break
        except Exception as e:
            print(f"Unexpected error: {e}")
            break

# Usage
with cloudant('user', 'pass', account='myaccount') as client:
    robust_change_monitor(client, 'my_database')

Multi-Database Change Monitoring

from cloudant import cloudant
import threading
import time

def monitor_database_changes(client, db_name, callback):
    """Feed every change event from *db_name* into *callback*."""
    try:
        feed = client[db_name].changes(
            feed='continuous',
            include_docs=True,
            heartbeat=30000
        )

        for event in feed:
            if event:
                callback(db_name, event)

    except Exception as e:
        print(f"Error monitoring {db_name}: {e}")

def change_handler(db_name, change):
    """Print a one-line summary of a change event from any database."""
    doc_id = change['id']
    seq = change['seq']

    if change.get('deleted'):
        print(f"[{db_name}] Document deleted: {doc_id}")
    else:
        doc = change.get('doc', {})
        doc_type = doc.get('type', 'unknown')
        print(f"[{db_name}] {doc_type} updated: {doc_id}")

with cloudant('user', 'pass', account='myaccount') as client:
    # One background worker per monitored database.
    watched = ['users', 'orders', 'inventory', 'logs']
    workers = []

    for name in watched:
        worker = threading.Thread(
            target=monitor_database_changes,
            args=(client, name, change_handler)
        )
        worker.daemon = True
        worker.start()
        workers.append(worker)
        print(f"Started monitoring {name}")

    try:
        # Keep main thread alive
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("Stopping all change monitors...")

Change Feed Performance Optimization

from cloudant import cloudant
import time

with cloudant('user', 'pass', account='myaccount') as client:
    db = client['my_database']

    # High-performance change processing
    changes = db.changes(
        feed='continuous',
        include_docs=False,  # Don't fetch full documents for performance
        heartbeat=5000,      # More frequent heartbeats
        timeout=30000,       # Shorter timeout for faster reconnection
        limit=1000,          # Process in larger batches
        since='now'          # Start from current time
    )

    batch = []
    batch_size = 50
    last_process_time = time.time()

    def process_batch(pending):
        """Handle one accumulated batch of change events."""
        print(f"Processing batch of {len(pending)} changes")

        # Local name 'item' avoids shadowing the outer feed variable
        # 'change' (the original reused the same name in both loops).
        for item in pending:
            doc_id = item['id']

            if item.get('deleted'):
                # Handle deletion
                print(f"Deleted: {doc_id}")
            else:
                # Fetch document only if needed
                doc = db.get(doc_id, remote=True)
                if doc and doc.exists():
                    # Process document
                    print(f"Updated: {doc_id}")

    try:
        for change in changes:
            if change:
                batch.append(change)

                # Process batch when full or after timeout
                if (len(batch) >= batch_size or
                        time.time() - last_process_time > 5):
                    process_batch(batch)
                    batch = []
                    last_process_time = time.time()
    finally:
        # Flush whatever is still buffered when the feed ends or the
        # loop is interrupted, so no received change is silently dropped
        # (the original discarded a partial final batch).
        if batch:
            process_batch(batch)

Error Handling

Change feed operations can raise CloudantFeedException:

from cloudant import cloudant
from cloudant.error import CloudantFeedException

with cloudant('user', 'pass', account='myaccount') as client:
    db = client['my_database']

    # Invalid feed parameters
    try:
        for change in db.changes(feed='invalid_feed_type'):
            print(change)
    except CloudantFeedException as e:
        print(f"Feed configuration error: {e}")

    # Network timeout during feed consumption
    try:
        feed = db.changes(
            feed='continuous',
            timeout=1000  # Very short timeout
        )

        for change in feed:
            print(change)

    except CloudantFeedException as e:
        print(f"Feed network error: {e}")

    # Invalid filter function
    try:
        for change in db.changes(filter='non_existent/filter'):
            print(change)
    except CloudantFeedException as e:
        print(f"Filter error: {e}")

Install with Tessl CLI

npx tessl i tessl/pypi-cloudant

docs

authentication.md

change-feeds.md

database-management.md

document-operations.md

error-handling.md

http-adapters.md

index.md

query-indexing.md

replication.md

scheduler-monitoring.md

security-document.md

views-design-documents.md

tile.json