IBM Cloudant Python client library providing comprehensive interface for Cloudant and CouchDB databases
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
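Set up and manage replication between Cloudant or CouchDB databases, with progress monitoring, state inspection, and replication configuration options.
Manage replication documents in the server's `_replicator` database and monitor replication progress. Note that `create_replication()` takes database objects (for example `client['my_db']`), not database names.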
class Replicator:
    """
    Database replication management for Cloudant and CouchDB.

    Replications are managed as documents in the server's ``_replicator``
    database: creating a document starts a replication and deleting it
    stops the replication.
    """

    def __init__(self, client):
        """
        Initialize replicator with client connection.

        Parameters:
        - client (Cloudant | CouchDB): Authenticated client instance whose
          ``_replicator`` database is used to manage replications

        Raises:
            CloudantReplicatorException: The ``_replicator`` database could
            not be retrieved
        """

    def create_replication(self, source_db=None, target_db=None, repl_id=None, **kwargs):
        """
        Create a replication document to replicate between databases.

        Parameters:
        - source_db (CouchDatabase | CloudantDatabase): Database object to
          replicate from, e.g. ``client['source']``. A plain database name
          string is not accepted.
        - target_db (CouchDatabase | CloudantDatabase): Database object to
          replicate to. It may belong to a different client or account; the
          credentials of the client that owns it are used for the target.
        - repl_id (str): Custom replication document ID (generated when
          omitted)
        - **kwargs: Extra fields copied as-is into the replication document.
          Names must match the server's replication options; they are not
          validated here. Commonly used CouchDB options:
          - continuous (bool): Keep replicating new changes (default: False)
          - create_target (bool): Create target database if missing
          - doc_ids (list[str]): Replicate only specific documents
          - filter (str): Filter function, as "ddoc_name/filter_name"
          - query_params (dict): Parameters for filter function
          - selector (dict): Replicate documents matching selector
          - since_seq (str | int): Start from this sequence of the SOURCE
          - checkpoint_interval (int): Checkpoint frequency (milliseconds)
          - worker_batch_size (int): Number of documents per worker batch
          - connection_timeout (int): Connection timeout (milliseconds)
          - retries_per_request (int): Retry attempts per request
          - use_checkpoints (bool): Use replication checkpoints
          - source_proxy (str): HTTP proxy for source
          - target_proxy (str): HTTP proxy for target

        Returns:
            Document: The created replication document; its ``_id`` is the
            replication ID expected by the other methods.

        Raises:
            CloudantReplicatorException: No source or target database given
        """

    def list_replications(self):
        """
        List all replication documents.

        Returns:
            list[Document]: Replication documents from the ``_replicator``
            database (design documents excluded)
        """

    def replication_state(self, repl_id):
        """
        Get current state of replication.

        Parameters:
        - repl_id (str): Replication document ID

        Returns:
            str | None: The state as a plain string, e.g. ``'triggered'``,
            ``'running'``, ``'completed'`` or ``'error'``; ``None`` if the
            replication has not been triggered yet. The exact set of states
            depends on the server (servers with the replication scheduler
            report scheduler states). Statistics are not included; read
            them from the replication document.

        Raises:
            CloudantReplicatorException: Replication not found
        """

    def stop_replication(self, repl_id):
        """
        Stop a replication by deleting its replication document.

        Parameters:
        - repl_id (str): Replication document ID to stop

        Returns:
            None

        Raises:
            CloudantReplicatorException: Replication not found
        """

    def follow_replication(self, repl_id):
        """
        Block and stream status updates for a replication.

        The iterator waits on the ``_replicator`` changes feed, so callers do
        not need to poll or sleep between iterations.

        Parameters:
        - repl_id (str): Replication document ID to monitor

        Yields:
            The replication document each time the replication changes (on
            servers with the replication scheduler, the scheduler's document
            with ``state``/``info`` keys). Iteration ends once the state is
            ``'completed'`` or ``'error'``.
        """


from cloudant import cloudant
from cloudant.replicator import Replicator

with cloudant('user', 'pass', account='myaccount') as client:
    replicator = Replicator(client)

    # create_replication() expects database objects, not database names.
    source_db = client['source_database']  # raises KeyError if it does not exist
    # create_database() returns the database even if it already exists
    target_db = client.create_database('target_database')

    # One-time replication between databases
    repl_doc = replicator.create_replication(source_db, target_db)
    repl_id = repl_doc['_id']
    print(f"Started replication: {repl_id}")

    # Monitor replication progress: replication_state() returns a plain
    # string such as 'triggered' or 'completed' (None until triggered).
    state = replicator.replication_state(repl_id)
    print(f"Replication state: {state}")

    # Statistics are written to the replication document itself, so refresh it.
    # NOTE(review): servers with the replication scheduler may expose stats
    # only via the _scheduler/docs endpoint - confirm for your server.
    repl_doc.fetch()
    print(f"Progress: {repl_doc.get('_replication_stats', {})}")


from cloudant import cloudant
from cloudant.replicator import Replicator
import time

with cloudant('user', 'pass', account='myaccount') as client:
    replicator = Replicator(client)

    # Set up continuous replication (database objects, not names)
    source_db = client['live_database']
    target_db = client.create_database('backup_database')
    repl_doc = replicator.create_replication(source_db, target_db, continuous=True)
    repl_id = repl_doc['_id']
    print(f"Continuous replication started: {repl_id}")

    # Continuous replication runs until stopped; check status periodically
    for _ in range(5):
        time.sleep(10)
        # replication_state() returns a string, not a document
        print(f"Replication state: {replicator.replication_state(repl_id)}")
        # Counters live on the replication document; refresh before reading.
        # NOTE(review): some servers only write stats at completion - verify.
        repl_doc.fetch()
        stats = repl_doc.get('_replication_stats', {})
        print(f"Docs written: {stats.get('docs_written', 0)}")
        print(f"Docs read: {stats.get('docs_read', 0)}")

    # Stop continuous replication (deletes the replication document)
    replicator.stop_replication(repl_id)
    print("Continuous replication stopped")


from cloudant import cloudant
from cloudant.replicator import Replicator

with cloudant('user', 'pass', account='source_account') as source_client:
    replicator = Replicator(source_client)

    # Replicate to a different Cloudant account: open a client for the target
    # account and pass one of its database objects. create_replication() builds
    # the replication document from each database's URL and the credentials of
    # the client that owns it, so no hand-written URL/auth dict is needed.
    with cloudant('target_user', 'target_password',
                  account='target_account') as target_client:
        repl_doc = replicator.create_replication(
            source_client['source_database'],
            target_client.create_database('target_database'),
            continuous=False
        )

    print(f"Cross-instance replication: {repl_doc['_id']}")


from cloudant import cloudant
from cloudant.replicator import Replicator

with cloudant('user', 'pass', account='myaccount') as client:
    replicator = Replicator(client)
    # create_replication() expects database objects, not names
    main_db = client['main_database']

    # Replicate only specific document types using selector
    repl_doc = replicator.create_replication(
        main_db,
        client.create_database('user_database'),
        selector={
            'type': 'user',
            'status': 'active'
        }
    )

    # Replicate specific documents by ID
    doc_ids = ['user123', 'user456', 'user789']
    repl_doc = replicator.create_replication(
        main_db,
        client.create_database('selected_users'),
        doc_ids=doc_ids
    )

    # Replicate using filter function ("ddoc_name/filter_name" in main_database)
    repl_doc = replicator.create_replication(
        main_db,
        client.create_database('filtered_database'),
        filter='filters/by_department',
        query_params={'department': 'engineering'}
    )


from cloudant import cloudant
from cloudant.replicator import Replicator

with cloudant('user', 'pass', account='myaccount') as client:
    replicator = Replicator(client)
    source_db = client['large_database']
    target_db = client.create_database('replica_database')

    # High-performance replication configuration. Keyword arguments are copied
    # verbatim into the replication document, so they must use the server's
    # option names: the batch-size option is worker_batch_size, and CouchDB's
    # replicator has no batch-timeout option.
    repl_doc = replicator.create_replication(
        source_db,
        target_db,
        worker_batch_size=1000,    # Process 1000 docs per worker batch
        checkpoint_interval=5000,  # Checkpoint every 5 seconds
        connection_timeout=60000,  # 60 second connection timeout
        retries_per_request=5,     # Retry failed requests 5 times
        use_checkpoints=True       # Enable checkpointing
    )
    repl_id = repl_doc['_id']
    print(f"High-performance replication: {repl_id}")


from cloudant import cloudant
from cloudant.replicator import Replicator

with cloudant('user', 'pass', account='myaccount') as client:
    replicator = Replicator(client)
    source_db = client['source_database']
    target_db = client.create_database('target_database')

    # since_seq must be a sequence of the SOURCE database. The target's
    # update_seq counts the target's own changes and is meaningless to the
    # source. Capture the source's current sequence to replicate only
    # changes made from now on. Sequences are opaque values on CouchDB 2+ and
    # Cloudant, so store and pass them unchanged.
    # (Re-running the same replication without since_seq already resumes
    # from its last checkpoint; since_seq is for skipping history.)
    last_seq = source_db.metadata()['update_seq']

    # Replicate only changes since last sequence
    repl_doc = replicator.create_replication(
        source_db,
        target_db,
        since_seq=last_seq
    )
    print(f"Incremental replication from sequence {last_seq}")


from cloudant import cloudant
from cloudant.replicator import Replicator

with cloudant('user', 'pass', account='myaccount') as client:
    replicator = Replicator(client)

    # Start replication (database objects, not names)
    repl_doc = replicator.create_replication(
        client['large_database'],
        client.create_database('backup_database')
    )
    repl_id = repl_doc['_id']

    # Follow replication progress. follow_replication() blocks on the
    # _replicator changes feed and yields whenever the replication changes,
    # so no sleep/poll is needed; it ends once the state is 'completed' or
    # 'error'.
    print("Monitoring replication progress...")
    try:
        for state_update in replicator.follow_replication(repl_id):
            # Legacy servers put _replication_state/_replication_stats on the
            # replication document; servers with the replication scheduler
            # yield the scheduler document with 'state'/'info' instead.
            repl_state = (state_update.get('_replication_state')
                          or state_update.get('state', 'unknown'))
            stats = (state_update.get('_replication_stats')
                     or state_update.get('info') or {})

            if repl_state == 'triggered':
                print("Replication started")
            elif repl_state == 'running':
                docs_read = stats.get('docs_read', 0)
                docs_written = stats.get('docs_written', 0)
                progress = f"{docs_written}/{docs_read}"
                print(f"Progress: {progress} documents")
            elif repl_state == 'completed':
                total_docs = stats.get('docs_written', 0)
                print(f"Replication completed: {total_docs} documents")
                break
            elif repl_state in ('error', 'failed'):
                error_msg = (state_update.get('_replication_state_reason')
                             or stats.get('error', 'Unknown error'))
                print(f"Replication failed: {error_msg}")
                break
    except KeyboardInterrupt:
        print("Stopping replication monitoring...")
        replicator.stop_replication(repl_id)


from cloudant import cloudant
from cloudant.replicator import Replicator
import time

with cloudant('user', 'pass', account='myaccount') as client:
    replicator = Replicator(client)

    # Set up multiple replication jobs
    replications = [
        ('db1', 'backup_db1'),
        ('db2', 'backup_db2'),
        ('db3', 'backup_db3')
    ]
    # Replication documents still being monitored, keyed by replication ID
    pending = {}

    # Start all replications (database objects, not names)
    for source, target in replications:
        repl_doc = replicator.create_replication(
            client[source],
            client.create_database(target)
        )
        pending[repl_doc['_id']] = repl_doc
        print(f"Started replication: {source} -> {target}")

    # Monitor all replications
    while pending:
        # Iterate over a snapshot so finished entries can be removed in-loop
        for repl_id, repl_doc in list(pending.items()):
            # replication_state() returns a string, not a document
            repl_state = replicator.replication_state(repl_id)
            if repl_state == 'completed':
                repl_doc.fetch()  # refresh to read the final statistics
                stats = repl_doc.get('_replication_stats', {})
                docs = stats.get('docs_written', 0)
                print(f"Replication {repl_id} completed - {docs} documents")
            elif repl_state in ('error', 'failed'):
                repl_doc.fetch()
                error = repl_doc.get('_replication_state_reason', 'Unknown')
                print(f"Replication {repl_id} failed: {error}")
            else:
                continue  # still in progress
            # Remove completed replications from monitoring
            del pending[repl_id]
        if pending:
            time.sleep(5)  # Check every 5 seconds

    print("All replications completed")


from cloudant import cloudant
from cloudant.replicator import Replicator
from cloudant import couchdb, cloudant_iam

with cloudant('user', 'pass', account='myaccount') as client:
    replicator = Replicator(client)
    source_db = client['local_database']

    # create_replication() takes database objects, not URL/auth dicts.
    # Credentials for a remote target come from the client that owns the
    # target database object.

    # Replicate to external CouchDB with basic auth
    with couchdb('external_user', 'external_password',
                 url='https://external-couchdb.example.com') as external_client:
        repl_doc = replicator.create_replication(
            source_db,
            external_client.create_database('target_db')
        )

    # Replicate to Cloudant with IAM authentication: an IAM-authenticated
    # target client makes the replication document carry its IAM API key.
    with cloudant_iam('target-account', 'target_iam_api_key') as iam_client:
        repl_doc = replicator.create_replication(
            source_db,
            iam_client.create_database('target_db')
        )


from cloudant import cloudant
from cloudant.replicator import Replicator
from cloudant.error import CloudantReplicatorException
from cloudant.database import CloudantDatabase
import time

with cloudant('user', 'pass', account='myaccount') as client:
    replicator = Replicator(client)
    try:
        # Try to replicate to non-existent target without create_target.
        # client['name'] raises KeyError for a missing database, so build the
        # target database object directly instead of looking it up.
        repl_doc = replicator.create_replication(
            client['source_database'],
            CloudantDatabase(client, 'non_existent_target'),
            create_target=False
        )
        repl_id = repl_doc['_id']

        # Monitor for errors
        for _ in range(10):  # Check up to 10 times
            # replication_state() returns a string, not a document.
            # Servers with the replication scheduler may report 'crashing'
            # while they keep retrying the failing replication.
            repl_state = replicator.replication_state(repl_id)
            if repl_state in ('error', 'failed', 'crashing'):
                repl_doc.fetch()  # the reason is stored on the document
                error_reason = repl_doc.get('_replication_state_reason')
                print(f"Replication failed: {error_reason}")
                break
            elif repl_state == 'completed':
                print("Replication completed successfully")
                break
            time.sleep(2)
    except CloudantReplicatorException as e:
        print(f"Replication setup failed: {e}")

    # List all replications to check status
    all_replications = replicator.list_replications()
    for repl in all_replications:
        repl_id = repl.get('_id', 'unknown')
        state = repl.get('_replication_state', 'unknown')
        print(f"Replication {repl_id}: {state}")


from cloudant import cloudant
from cloudant.replicator import Replicator
with cloudant('user', 'pass', account='myaccount') as client:
replicator = Replicator(client)
# Set up bidirectional continuous replication
# Primary to secondary
repl1 = replicator.create_replication(
source_db='primary_database',
target_db='secondary_database',
continuous=True,
create_target=True,
repl_id='primary_to_secondary'
)
# Secondary to primary
repl2 = replicator.create_replication(
source_db='secondary_database',
target_db='primary_database',
continuous=True,
repl_id='secondary_to_primary'
)
print("Bidirectional replication established")
print(f"Primary->Secondary: {repl1['_id']}")
print(f"Secondary->Primary: {repl2['_id']}")
# Monitor both replications
repl_ids = [repl1['_id'], repl2['_id']]
try:
while True:
for repl_id in repl_ids:
state = replicator.replication_state(repl_id)
stats = state.get('_replication_stats', {})
docs_written = stats.get('docs_written', 0)
print(f"{repl_id}: {docs_written} docs replicated")
time.sleep(30) # Check every 30 seconds
except KeyboardInterrupt:
print("Stopping bidirectional replication...")
for repl_id in repl_ids:
replicator.stop_replication(repl_id)Replication operations can raise CloudantReplicatorException:
from cloudant import cloudant
from cloudant.replicator import Replicator
from cloudant.error import CloudantReplicatorException
with cloudant('user', 'pass', account='myaccount') as client:
replicator = Replicator(client)
try:
# Invalid replication configuration
repl_doc = replicator.create_replication(
source_db='', # Empty source
target_db='target'
)
except CloudantReplicatorException as e:
print(f"Replication creation failed: {e}")
try:
# Check non-existent replication
state = replicator.replication_state('non_existent_repl_id')
except CloudantReplicatorException as e:
print(f"Replication not found: {e}")
try:
# Stop non-existent replication
replicator.stop_replication('non_existent_repl_id')
except CloudantReplicatorException as e:
print(f"Stop replication failed: {e}")Install with Tessl CLI
npx tessl i tessl/pypi-cloudant