IBM Cloudant Python client library providing a comprehensive interface for Cloudant and CouchDB databases
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Monitor and query replication jobs and documents using the CouchDB/Cloudant scheduler API. The scheduler provides visibility into active, completed, and failed replications across the database instance.
Query replication documents and jobs for monitoring and troubleshooting replication processes.
class Scheduler:
"""
API for retrieving scheduler jobs and documents.
"""
def __init__(self, client):
"""
Initialize scheduler instance.
Parameters:
- client: CouchDB or Cloudant client instance
"""
def list_docs(self, limit=None, skip=None):
"""
Lists replication documents with status and metadata.
Includes information about all replication documents, even those
in completed and failed states. Returns document ID, database,
replication ID, source and target information.
Parameters:
- limit (int): Maximum number of results to return
- skip (int): Number of results to skip from beginning
Returns:
dict: Response containing replication documents list
Response format:
{
"total_rows": int,
"offset": int,
"docs": [
{
"database": str,
"doc_id": str,
"id": str,
"source": str,
"target": str,
"state": str,
"error_count": int,
"info": dict,
"last_updated": str,
"start_time": str
}
]
}
"""
def get_doc(self, doc_id):
"""
Get replication document state for specific replication.
Parameters:
- doc_id (str): Replication document ID
Returns:
dict: Detailed replication document state information
Response format:
{
"database": str,
"doc_id": str,
"id": str,
"source": str,
"target": str,
"state": str,
"error_count": int,
"info": dict,
"last_updated": str,
"start_time": str
}
Raises:
CloudantException: If replication document not found
"""
def list_jobs(self, limit=None, skip=None):
"""
Lists active replication jobs.
Includes replications created via /_replicate endpoint and those
created from replication documents. Does not include completed
or failed replications. Provides source/target information,
replication ID, recent event history, and job status.
Parameters:
- limit (int): Maximum number of results to return
- skip (int): Number of results to skip from beginning
Returns:
dict: Response containing active replication jobs list
Response format:
{
"total_rows": int,
"offset": int,
"jobs": [
{
"database": str,
"doc_id": str,
"id": str,
"source": str,
"target": str,
"pid": str,
"continuous": bool,
"checkpointed_source_seq": str,
"source_seq": str,
"revisions_checked": int,
"missing_revisions_found": int,
"docs_read": int,
"docs_written": int,
"doc_write_failures": int,
"bulk_get_docs": int,
"bulk_get_attempts": int,
"start_time": str,
"history": [
{
"type": str,
"timestamp": str,
"details": dict
}
]
}
]
}
"""

from cloudant import cloudant
from cloudant.scheduler import Scheduler
with cloudant(username, password, account=account_name) as client:
scheduler = Scheduler(client)
# List all replication documents
docs = scheduler.list_docs()
print(f"Total replication documents: {docs['total_rows']}")
for doc in docs['docs']:
print(f"Replication {doc['doc_id']}: {doc['state']}")
print(f" Source: {doc['source']}")
print(f" Target: {doc['target']}")
print(f" Last updated: {doc['last_updated']}")
if doc['error_count'] > 0:
print(f" Errors: {doc['error_count']}")

from cloudant import cloudant
from cloudant.scheduler import Scheduler
with cloudant(username, password, account=account_name) as client:
scheduler = Scheduler(client)
# List currently running replication jobs
jobs = scheduler.list_jobs()
print(f"Active replication jobs: {jobs['total_rows']}")
for job in jobs['jobs']:
print(f"Job {job['id']}:")
print(f" Source: {job['source']}")
print(f" Target: {job['target']}")
print(f" Continuous: {job['continuous']}")
print(f" Progress: {job['docs_written']}/{job['docs_read']} docs")
print(f" Started: {job['start_time']}")
# Show recent history
if 'history' in job:
recent_events = job['history'][-3:] # Last 3 events
for event in recent_events:
print(f" {event['timestamp']}: {event['type']}")

from cloudant import cloudant
from cloudant.scheduler import Scheduler
with cloudant(username, password, account=account_name) as client:
scheduler = Scheduler(client)
# Get specific replication document details
repl_doc_id = "my_replication_doc"
try:
doc_status = scheduler.get_doc(repl_doc_id)
print(f"Replication Status for {repl_doc_id}:")
print(f" State: {doc_status['state']}")
print(f" Source: {doc_status['source']}")
print(f" Target: {doc_status['target']}")
print(f" Error count: {doc_status['error_count']}")
print(f" Last updated: {doc_status['last_updated']}")
if 'info' in doc_status and doc_status['info']:
info = doc_status['info']
if 'error' in info:
print(f" Last error: {info['error']}")
if 'progress' in info:
print(f" Progress: {info['progress']}")
except Exception as e:
print(f"Failed to get replication status: {e}")

from cloudant import cloudant
from cloudant.scheduler import Scheduler
with cloudant(username, password, account=account_name) as client:
scheduler = Scheduler(client)
# Paginate through all replication documents
limit = 10
skip = 0
while True:
docs = scheduler.list_docs(limit=limit, skip=skip)
if not docs['docs']:
break
print(f"Page starting at {skip}:")
for doc in docs['docs']:
print(f" {doc['doc_id']}: {doc['state']}")
skip += limit
# Stop if we've seen all documents
if skip >= docs['total_rows']:
break

from cloudant import cloudant
from cloudant.replicator import Replicator
from cloudant.scheduler import Scheduler
with cloudant(username, password, account=account_name) as client:
replicator = Replicator(client)
scheduler = Scheduler(client)
# Start a replication
repl_doc = replicator.create_replication(
source_db='source_database',
target_db='target_database',
repl_id='my_monitored_replication',
continuous=True
)
print(f"Started replication: {repl_doc['id']}")
# Monitor the replication
import time
for _ in range(10): # Check 10 times
time.sleep(5)
# Check job status
jobs = scheduler.list_jobs()
active_job = None
for job in jobs['jobs']:
if job['doc_id'] == 'my_monitored_replication':
active_job = job
break
if active_job:
print(f"Replication progress: {active_job['docs_written']} docs written")
else:
# Check if it completed or failed
doc_status = scheduler.get_doc('my_monitored_replication')
print(f"Replication state: {doc_status['state']}")
if doc_status['state'] in ['completed', 'failed']:
break

from cloudant import cloudant
from cloudant.scheduler import Scheduler
with cloudant(username, password, account=account_name) as client:
scheduler = Scheduler(client)
# Find failed replications
docs = scheduler.list_docs()
failed_replications = [
doc for doc in docs['docs']
if doc['state'] == 'failed' or doc['error_count'] > 0
]
if failed_replications:
print(f"Found {len(failed_replications)} problematic replications:")
for doc in failed_replications:
print(f"\nReplication {doc['doc_id']}:")
print(f" State: {doc['state']}")
print(f" Errors: {doc['error_count']}")
# Get detailed error information
detailed = scheduler.get_doc(doc['doc_id'])
if 'info' in detailed and 'error' in detailed['info']:
print(f" Error details: {detailed['info']['error']}")
else:
print("No failed replications found")

# Replication document states
REPLICATION_STATES = ["initializing", "running", "completed", "pending", "crashing", "error", "failed"]
# Replication document structure
ReplicationDoc = dict[str, Any]
# Replication job structure
ReplicationJob = dict[str, Any]
# Scheduler responses
DocsResponse = dict[str, Any] # Contains total_rows, offset, docs
JobsResponse = dict[str, Any] # Contains total_rows, offset, jobs
# History event structure
HistoryEvent = dict[str, Any] # Contains type, timestamp, details

from cloudant.scheduler import Scheduler

Install with Tessl CLI
npx tessl i tessl/pypi-cloudant