CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-cloudant

IBM Cloudant Python client library providing comprehensive interface for Cloudant and CouchDB databases

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

replication.mddocs/

Replication

Set up and manage database replication between Cloudant instances with comprehensive progress monitoring, state management, and replication configuration options.

Capabilities

Replicator Class

Manage replication documents and monitor replication progress.

class Replicator:
    """
    Database replication management for Cloudant and CouchDB.
    """
    
    def __init__(self, client):
        """
        Initialize replicator with client connection.
        
        Parameters:
        - client (Cloudant | CouchDB): Authenticated client instance
        """
    
    def create_replication(self, source_db=None, target_db=None, repl_id=None, **kwargs):
        """
        Create replication document to replicate between databases.
        
        Parameters:
        - source_db (str | dict): Source database name or connection info
        - target_db (str | dict): Target database name or connection info  
        - repl_id (str): Custom replication document ID
        - continuous (bool): Continuous replication (default: False)
        - create_target (bool): Create target database if missing
        - doc_ids (list[str]): Replicate only specific documents
        - filter (str): Filter function name from design document
        - query_params (dict): Parameters for filter function
        - selector (dict): Replicate documents matching selector
        - since_seq (str | int): Start replication from sequence
        - checkpoint_interval (int): Checkpoint frequency (milliseconds)
        - batch_size (int): Number of documents per batch
        - batch_timeout (int): Batch timeout (milliseconds)
        - connection_timeout (int): Connection timeout (milliseconds)
        - retries_per_request (int): Retry attempts per request
        - use_checkpoints (bool): Use replication checkpoints
        - source_proxy (str): HTTP proxy for source
        - target_proxy (str): HTTP proxy for target
        
        Returns:
        dict: Replication document with replication ID
        
        Raises:
        CloudantReplicatorException: Replication creation failed
        """
    
    def list_replications(self):
        """
        List all replication documents.
        
        Returns:
        list[dict]: Active and completed replication documents
        """
    
    def replication_state(self, repl_id):
        """
        Get current state of replication.
        
        Parameters:
        - repl_id (str): Replication document ID
        
        Returns:
        dict: Replication status including state, progress, and errors
        
        Raises:
        CloudantReplicatorException: Replication not found
        """
    
    def stop_replication(self, repl_id):
        """
        Stop active replication.
        
        Parameters:
        - repl_id (str): Replication document ID to stop
        
        Returns:
        dict: Stop confirmation response
        
        Raises:
        CloudantReplicatorException: Stop operation failed
        """
    
    def follow_replication(self, repl_id):
        """
        Monitor replication progress with iterator.
        
        Parameters:
        - repl_id (str): Replication document ID to monitor
        
        Yields:
        dict: Replication state updates until completion
        
        Raises:
        CloudantReplicatorException: Monitoring failed
        """

Usage Examples

Basic Replication Setup

from cloudant import cloudant
from cloudant.replicator import Replicator

with cloudant('user', 'pass', account='myaccount') as client:
    replicator = Replicator(client)
    
    # One-time replication between databases
    repl_doc = replicator.create_replication(
        source_db='source_database',
        target_db='target_database',
        create_target=True  # Create target if it doesn't exist
    )
    
    repl_id = repl_doc['_id']
    print(f"Started replication: {repl_id}")
    
    # Monitor replication progress
    state = replicator.replication_state(repl_id)
    print(f"Replication state: {state['_replication_state']}")
    print(f"Progress: {state.get('_replication_stats', {})}")

Continuous Replication

from cloudant import cloudant
from cloudant.replicator import Replicator

with cloudant('user', 'pass', account='myaccount') as client:
    replicator = Replicator(client)
    
    # Set up continuous replication
    repl_doc = replicator.create_replication(
        source_db='live_database',
        target_db='backup_database',
        continuous=True,
        create_target=True
    )
    
    repl_id = repl_doc['_id']
    print(f"Continuous replication started: {repl_id}")
    
    # Continuous replication runs until stopped
    # Check status periodically
    import time
    
    for i in range(5):
        time.sleep(10)
        state = replicator.replication_state(repl_id)
        stats = state.get('_replication_stats', {})
        print(f"Docs written: {stats.get('docs_written', 0)}")
        print(f"Docs read: {stats.get('docs_read', 0)}")
    
    # Stop continuous replication
    replicator.stop_replication(repl_id)
    print("Continuous replication stopped")

Cross-Instance Replication

from cloudant import cloudant
from cloudant.replicator import Replicator

with cloudant('user', 'pass', account='source_account') as source_client:
    replicator = Replicator(source_client)
    
    # Replicate to different Cloudant account
    target_config = {
        'url': 'https://target_account.cloudant.com/target_database',
        'auth': {
            'username': 'target_user',
            'password': 'target_password'
        }
    }
    
    repl_doc = replicator.create_replication(
        source_db='source_database',
        target_db=target_config,
        continuous=False,
        create_target=True
    )
    
    print(f"Cross-instance replication: {repl_doc['_id']}")

Filtered Replication

from cloudant import cloudant
from cloudant.replicator import Replicator

with cloudant('user', 'pass', account='myaccount') as client:
    replicator = Replicator(client)
    
    # Replicate only specific document types using selector
    repl_doc = replicator.create_replication(
        source_db='main_database',
        target_db='user_database', 
        selector={
            'type': 'user',
            'status': 'active'
        }
    )
    
    # Replicate specific documents by ID
    doc_ids = ['user123', 'user456', 'user789']
    repl_doc = replicator.create_replication(
        source_db='main_database',
        target_db='selected_users',
        doc_ids=doc_ids
    )
    
    # Replicate using filter function
    repl_doc = replicator.create_replication(
        source_db='main_database',
        target_db='filtered_database',
        filter='filters/by_department',
        query_params={'department': 'engineering'}
    )

Replication with Custom Settings

from cloudant import cloudant
from cloudant.replicator import Replicator

with cloudant('user', 'pass', account='myaccount') as client:
    replicator = Replicator(client)
    
    # High-performance replication configuration
    repl_doc = replicator.create_replication(
        source_db='large_database',
        target_db='replica_database',
        batch_size=1000,           # Process 1000 docs per batch
        batch_timeout=10000,       # 10 second batch timeout
        checkpoint_interval=5000,   # Checkpoint every 5 seconds
        connection_timeout=60000,   # 60 second connection timeout
        retries_per_request=5,     # Retry failed requests 5 times
        use_checkpoints=True       # Enable checkpointing
    )
    
    repl_id = repl_doc['_id']
    print(f"High-performance replication: {repl_id}")

Incremental Replication

from cloudant import cloudant
from cloudant.replicator import Replicator

with cloudant('user', 'pass', account='myaccount') as client:
    replicator = Replicator(client)
    
    # Get current sequence from target database
    target_db = client['target_database']
    if target_db.exists():
        metadata = target_db.metadata()
        last_seq = metadata.get('update_seq', 0)
    else:
        last_seq = 0
    
    # Replicate only changes since last sequence
    repl_doc = replicator.create_replication(
        source_db='source_database',
        target_db='target_database',
        since_seq=last_seq,
        create_target=True
    )
    
    print(f"Incremental replication from sequence {last_seq}")

Monitoring Replication Progress

from cloudant import cloudant
from cloudant.replicator import Replicator
import time

with cloudant('user', 'pass', account='myaccount') as client:
    replicator = Replicator(client)
    
    # Start replication
    repl_doc = replicator.create_replication(
        source_db='large_database',
        target_db='backup_database'
    )
    
    repl_id = repl_doc['_id']
    
    # Follow replication progress
    print("Monitoring replication progress...")
    
    try:
        for state_update in replicator.follow_replication(repl_id):
            repl_state = state_update.get('_replication_state', 'unknown')
            stats = state_update.get('_replication_stats', {})
            
            if repl_state == 'triggered':
                print("Replication started")
            elif repl_state == 'running':
                docs_read = stats.get('docs_read', 0)
                docs_written = stats.get('docs_written', 0)
                progress = f"{docs_written}/{docs_read}"
                print(f"Progress: {progress} documents")
            elif repl_state == 'completed':
                total_docs = stats.get('docs_written', 0)
                print(f"Replication completed: {total_docs} documents")
                break
            elif repl_state == 'error':
                error_msg = state_update.get('_replication_state_reason', 'Unknown error')
                print(f"Replication failed: {error_msg}")
                break
                
            time.sleep(2)  # Check every 2 seconds
            
    except KeyboardInterrupt:
        print("Stopping replication monitoring...")
        replicator.stop_replication(repl_id)

Managing Multiple Replications

from cloudant import cloudant
from cloudant.replicator import Replicator

with cloudant('user', 'pass', account='myaccount') as client:
    replicator = Replicator(client)
    
    # Set up multiple replication jobs
    replications = [
        ('db1', 'backup_db1'),
        ('db2', 'backup_db2'),
        ('db3', 'backup_db3')
    ]
    
    repl_ids = []
    
    # Start all replications
    for source, target in replications:
        repl_doc = replicator.create_replication(
            source_db=source,
            target_db=target,
            create_target=True
        )
        repl_ids.append(repl_doc['_id'])
        print(f"Started replication: {source} -> {target}")
    
    # Monitor all replications
    while repl_ids:
        completed_repls = []
        
        for repl_id in repl_ids:
            state = replicator.replication_state(repl_id)
            repl_state = state.get('_replication_state')
            
            if repl_state == 'completed':
                stats = state.get('_replication_stats', {})
                docs = stats.get('docs_written', 0)
                print(f"Replication {repl_id} completed - {docs} documents")
                completed_repls.append(repl_id)
            elif repl_state == 'error':
                error = state.get('_replication_state_reason', 'Unknown')
                print(f"Replication {repl_id} failed: {error}")
                completed_repls.append(repl_id)
        
        # Remove completed replications from monitoring
        for repl_id in completed_repls:
            repl_ids.remove(repl_id)
        
        if repl_ids:
            time.sleep(5)  # Check every 5 seconds
    
    print("All replications completed")

Replication with Authentication

from cloudant import cloudant
from cloudant.replicator import Replicator

with cloudant('user', 'pass', account='myaccount') as client:
    replicator = Replicator(client)
    
    # Replicate to external CouchDB with basic auth
    external_target = {
        'url': 'https://external-couchdb.example.com/target_db',
        'auth': {
            'username': 'external_user',
            'password': 'external_password'
        }
    }
    
    repl_doc = replicator.create_replication(
        source_db='local_database',
        target_db=external_target
    )
    
    # Replicate to Cloudant with IAM authentication
    iam_target = {
        'url': 'https://target-account.cloudant.com/target_db',
        'auth': {
            'iam': {
                'api_key': 'target_iam_api_key'
            }
        }
    }
    
    repl_doc = replicator.create_replication(
        source_db='local_database',
        target_db=iam_target
    )

Replication Error Handling

from cloudant import cloudant
from cloudant.replicator import Replicator
from cloudant.error import CloudantReplicatorException

with cloudant('user', 'pass', account='myaccount') as client:
    replicator = Replicator(client)
    
    try:
        # Try to replicate to non-existent target without create_target
        repl_doc = replicator.create_replication(
            source_db='source_database',
            target_db='non_existent_target',
            create_target=False
        )
        
        repl_id = repl_doc['_id']
        
        # Monitor for errors
        for i in range(10):  # Check up to 10 times
            state = replicator.replication_state(repl_id)
            repl_state = state.get('_replication_state')
            
            if repl_state == 'error':
                error_reason = state.get('_replication_state_reason')
                print(f"Replication failed: {error_reason}")
                break
            elif repl_state == 'completed':
                print("Replication completed successfully")
                break
            
            time.sleep(2)
            
    except CloudantReplicatorException as e:
        print(f"Replication setup failed: {e}")
    
    # List all replications to check status
    all_replications = replicator.list_replications()
    for repl in all_replications:
        repl_id = repl.get('_id', 'unknown')
        state = repl.get('_replication_state', 'unknown')
        print(f"Replication {repl_id}: {state}")

Bidirectional Replication

from cloudant import cloudant
from cloudant.replicator import Replicator

with cloudant('user', 'pass', account='myaccount') as client:
    replicator = Replicator(client)
    
    # Set up bidirectional continuous replication
    # Primary to secondary
    repl1 = replicator.create_replication(
        source_db='primary_database',
        target_db='secondary_database',
        continuous=True,
        create_target=True,
        repl_id='primary_to_secondary'
    )
    
    # Secondary to primary
    repl2 = replicator.create_replication(
        source_db='secondary_database', 
        target_db='primary_database',
        continuous=True,
        repl_id='secondary_to_primary'
    )
    
    print("Bidirectional replication established")
    print(f"Primary->Secondary: {repl1['_id']}")
    print(f"Secondary->Primary: {repl2['_id']}")
    
    # Monitor both replications
    repl_ids = [repl1['_id'], repl2['_id']]
    
    try:
        while True:
            for repl_id in repl_ids:
                state = replicator.replication_state(repl_id)
                stats = state.get('_replication_stats', {})
                docs_written = stats.get('docs_written', 0)
                print(f"{repl_id}: {docs_written} docs replicated")
            
            time.sleep(30)  # Check every 30 seconds
            
    except KeyboardInterrupt:
        print("Stopping bidirectional replication...")
        for repl_id in repl_ids:
            replicator.stop_replication(repl_id)

Error Handling

Replication operations can raise CloudantReplicatorException:

from cloudant import cloudant
from cloudant.replicator import Replicator
from cloudant.error import CloudantReplicatorException

with cloudant('user', 'pass', account='myaccount') as client:
    replicator = Replicator(client)
    
    try:
        # Invalid replication configuration
        repl_doc = replicator.create_replication(
            source_db='',  # Empty source
            target_db='target'
        )
    except CloudantReplicatorException as e:
        print(f"Replication creation failed: {e}")
    
    try:
        # Check non-existent replication
        state = replicator.replication_state('non_existent_repl_id')
    except CloudantReplicatorException as e:
        print(f"Replication not found: {e}")
    
    try:
        # Stop non-existent replication
        replicator.stop_replication('non_existent_repl_id')
    except CloudantReplicatorException as e:
        print(f"Stop replication failed: {e}")

Install with Tessl CLI

npx tessl i tessl/pypi-cloudant

docs

authentication.md

change-feeds.md

database-management.md

document-operations.md

error-handling.md

http-adapters.md

index.md

query-indexing.md

replication.md

scheduler-monitoring.md

security-document.md

views-design-documents.md

tile.json