CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-cloudant

IBM Cloudant Python client library providing a comprehensive interface for Cloudant and CouchDB databases

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/scheduler-monitoring.md

Scheduler Monitoring

Monitor and query replication jobs and documents using the CouchDB/Cloudant scheduler API. The scheduler provides visibility into active, completed, and failed replications across the database instance.

Capabilities

Scheduler Operations

Query replication documents and jobs for monitoring and troubleshooting replication processes.

class Scheduler:
    """
    Read-only view of the scheduler's replication jobs and documents.
    """

    def __init__(self, client):
        """
        Bind a scheduler to a client.

        Parameters:
        - client: CouchDB or Cloudant client instance
        """

    def list_docs(self, limit=None, skip=None):
        """
        Return the replication documents together with their status.

        Every replication document is reported, including those that have
        already completed or failed. Each entry carries the document ID,
        its database, the replication ID, and the source and target.

        Parameters:
        - limit (int): Upper bound on the number of results returned
        - skip (int): How many leading results to skip

        Returns:
        dict: Response holding the list of replication documents

        Response format:
        {
            "total_rows": int,
            "offset": int,
            "docs": [
                {
                    "database": str,
                    "doc_id": str,
                    "id": str,
                    "source": str,
                    "target": str,
                    "state": str,
                    "error_count": int,
                    "info": dict,
                    "last_updated": str,
                    "start_time": str
                }
            ]
        }
        """

    def get_doc(self, doc_id):
        """
        Return the scheduler state of a single replication document.

        Parameters:
        - doc_id (str): ID of the replication document to look up

        Returns:
        dict: Detailed state information for that replication document

        Response format:
        {
            "database": str,
            "doc_id": str,
            "id": str,
            "source": str,
            "target": str,
            "state": str,
            "error_count": int,
            "info": dict,
            "last_updated": str,
            "start_time": str
        }

        Raises:
        CloudantException: If replication document not found
        """

    def list_jobs(self, limit=None, skip=None):
        """
        Return the replication jobs that are currently active.

        Covers jobs started through the /_replicate endpoint as well as
        jobs created from replication documents. Completed and failed
        replications are not reported. Each entry carries source/target
        information, the replication ID, recent event history, and the
        job status.

        NOTE(review): CouchDB's /_scheduler/jobs reference appears to nest
        the replication counters (docs_read, docs_written, ...) under a
        per-job "info" object rather than at the top level; confirm the
        layout below against a live response.

        Parameters:
        - limit (int): Upper bound on the number of results returned
        - skip (int): How many leading results to skip

        Returns:
        dict: Response holding the list of active replication jobs

        Response format:
        {
            "total_rows": int,
            "offset": int,
            "jobs": [
                {
                    "database": str,
                    "doc_id": str,
                    "id": str,
                    "source": str,
                    "target": str,
                    "pid": str,
                    "continuous": bool,
                    "checkpointed_source_seq": str,
                    "source_seq": str,
                    "revisions_checked": int,
                    "missing_revisions_found": int,
                    "docs_read": int,
                    "docs_written": int,
                    "doc_write_failures": int,
                    "bulk_get_docs": int,
                    "bulk_get_attempts": int,
                    "start_time": str,
                    "history": [
                        {
                            "type": str,
                            "timestamp": str,
                            "details": dict
                        }
                    ]
                }
            ]
        }
        """

Usage Examples

Basic Scheduler Monitoring

from cloudant import cloudant
from cloudant.scheduler import Scheduler

with cloudant(username, password, account=account_name) as client:
    # Ask the scheduler for its replication documents and report the count.
    docs = Scheduler(client).list_docs()
    print(f"Total replication documents: {docs['total_rows']}")

    # Print a short summary for each replication document.
    for doc in docs['docs']:
        print(f"Replication {doc['doc_id']}: {doc['state']}")
        print(f"  Source: {doc['source']}")
        print(f"  Target: {doc['target']}")
        print(f"  Last updated: {doc['last_updated']}")

        # Only mention errors when at least one has been recorded.
        has_errors = doc['error_count'] > 0
        if has_errors:
            print(f"  Errors: {doc['error_count']}")

Monitor Active Replication Jobs

from cloudant import cloudant
from cloudant.scheduler import Scheduler

with cloudant(username, password, account=account_name) as client:
    scheduler = Scheduler(client)

    # List currently running replication jobs; completed and failed
    # replications are not reported by list_jobs().
    jobs = scheduler.list_jobs()
    print(f"Active replication jobs: {jobs['total_rows']}")

    for job in jobs['jobs']:
        # The CouchDB /_scheduler/jobs reference nests the replication
        # counters under a per-job "info" object (which can be null), so
        # look there first and fall back to top-level keys instead of
        # raising KeyError when a field is missing.
        stats = job.get('info') or {}
        docs_read = stats.get('docs_read', job.get('docs_read', 'n/a'))
        docs_written = stats.get('docs_written', job.get('docs_written', 'n/a'))

        # NOTE(review): no top-level "continuous" field appears in the
        # documented response; continuous jobs seem to carry "+continuous"
        # in the replication ID, so that is used as a fallback. Confirm.
        continuous = job.get('continuous', '+continuous' in (job.get('id') or ''))

        print(f"Job {job['id']}:")
        print(f"  Source: {job['source']}")
        print(f"  Target: {job['target']}")
        print(f"  Continuous: {continuous}")
        print(f"  Progress: {docs_written}/{docs_read} docs")
        print(f"  Started: {job['start_time']}")

        # Show recent history. The scheduler reports events newest-first,
        # so the three most recent events are at the front of the list.
        recent_events = (job.get('history') or [])[:3]
        for event in recent_events:
            print(f"  {event['timestamp']}: {event['type']}")

Detailed Replication Status

from cloudant import cloudant
from cloudant.scheduler import Scheduler

with cloudant(username, password, account=account_name) as client:
    scheduler = Scheduler(client)

    # Replication document whose scheduler state we want to inspect.
    repl_doc_id = "my_replication_doc"

    try:
        doc_status = scheduler.get_doc(repl_doc_id)

        # Headline fields of the replication document.
        print(f"Replication Status for {repl_doc_id}:")
        print(f"  State: {doc_status['state']}")
        print(f"  Source: {doc_status['source']}")
        print(f"  Target: {doc_status['target']}")
        print(f"  Error count: {doc_status['error_count']}")
        print(f"  Last updated: {doc_status['last_updated']}")

        # "info" may be missing or empty; only dig into it when populated.
        info = doc_status.get('info')
        if info:
            if 'error' in info:
                print(f"  Last error: {info['error']}")
            if 'progress' in info:
                print(f"  Progress: {info['progress']}")

    except Exception as e:
        # Any failure while fetching or printing is reported, not raised.
        print(f"Failed to get replication status: {e}")

Pagination Through Results

from cloudant import cloudant
from cloudant.scheduler import Scheduler

with cloudant(username, password, account=account_name) as client:
    scheduler = Scheduler(client)

    # Walk the replication documents one fixed-size page at a time.
    page_size = 10
    skip = 0
    more_pages = True

    while more_pages:
        docs = scheduler.list_docs(limit=page_size, skip=skip)

        # An empty page means there is nothing left to show.
        if not docs['docs']:
            break

        print(f"Page starting at {skip}:")
        for doc in docs['docs']:
            print(f"  {doc['doc_id']}: {doc['state']}")

        # Advance to the next page; stop once the offset reaches the total.
        skip += page_size
        more_pages = skip < docs['total_rows']

Integration with Replicator Class

from cloudant import cloudant
from cloudant.replicator import Replicator
from cloudant.scheduler import Scheduler

with cloudant(username, password, account=account_name) as client:
    replicator = Replicator(client)
    scheduler = Scheduler(client)

    # Single source of truth for the replication document ID used below.
    repl_id = 'my_monitored_replication'

    # Start a replication. create_replication() needs database objects
    # (it reads their URL and credentials), not bare database names, so
    # look the databases up through the client first.
    repl_doc = replicator.create_replication(
        source_db=client['source_database'],
        target_db=client['target_database'],
        repl_id=repl_id,
        continuous=True
    )

    # The returned replication document is keyed by "_id", not "id".
    print(f"Started replication: {repl_doc['_id']}")

    # Monitor the replication
    import time

    for _ in range(10):  # Check 10 times
        time.sleep(5)

        # Look for the scheduler job created from our replication document.
        jobs = scheduler.list_jobs()
        active_job = next(
            (job for job in jobs['jobs'] if job.get('doc_id') == repl_id),
            None,
        )

        if active_job:
            # Replication counters are nested under the job's "info" object
            # (which can be null); fall back to a top-level key if present.
            stats = active_job.get('info') or {}
            docs_written = stats.get(
                'docs_written', active_job.get('docs_written', 'n/a')
            )
            print(f"Replication progress: {docs_written} docs written")
            continue

        # No active job: check whether the replication completed or failed.
        doc_status = scheduler.get_doc(repl_id)
        print(f"Replication state: {doc_status['state']}")

        if doc_status['state'] in ('completed', 'failed'):
            break

Error Analysis

from cloudant import cloudant
from cloudant.scheduler import Scheduler

with cloudant(username, password, account=account_name) as client:
    scheduler = Scheduler(client)

    # Find failed replications: either in the terminal "failed" state or
    # with at least one recorded error.
    docs = scheduler.list_docs()

    failed_replications = [
        doc for doc in docs['docs']
        if doc['state'] == 'failed' or doc['error_count'] > 0
    ]

    if failed_replications:
        print(f"Found {len(failed_replications)} problematic replications:")

        for doc in failed_replications:
            print(f"\nReplication {doc['doc_id']}:")
            print(f"  State: {doc['state']}")
            print(f"  Errors: {doc['error_count']}")

            # Get detailed error information. "info" can be present but
            # null/empty, in which case a bare `'error' in detailed['info']`
            # would raise TypeError, so only inspect it when it is a dict.
            detailed = scheduler.get_doc(doc['doc_id'])
            info = detailed.get('info')
            if isinstance(info, dict) and 'error' in info:
                print(f"  Error details: {info['error']}")
    else:
        print("No failed replications found")

Types

# `Any` is used by every alias below; without this import the snippet
# fails with NameError as soon as it is executed.
from typing import Any

# Replication document states
REPLICATION_STATES = ["initializing", "running", "completed", "pending", "crashing", "error", "failed"]

# Replication document structure (one entry of a docs response)
ReplicationDoc = dict[str, Any]

# Replication job structure (one entry of a jobs response)
ReplicationJob = dict[str, Any]

# Scheduler responses
DocsResponse = dict[str, Any]  # Contains total_rows, offset, docs
JobsResponse = dict[str, Any]  # Contains total_rows, offset, jobs

# History event structure
HistoryEvent = dict[str, Any]  # Contains type, timestamp, details

Import Statements

from cloudant.scheduler import Scheduler

Install with Tessl CLI

npx tessl i tessl/pypi-cloudant

docs

authentication.md

change-feeds.md

database-management.md

document-operations.md

error-handling.md

http-adapters.md

index.md

query-indexing.md

replication.md

scheduler-monitoring.md

security-document.md

views-design-documents.md

tile.json