0
# Replication
1
2
Set up and manage database replication between Cloudant or CouchDB instances, with progress monitoring, state management, and configurable replication options.
3
4
## Capabilities
5
6
### Replicator Class
7
8
Manage replication documents and monitor replication progress.
9
10
```python { .api }
11
class Replicator:
12
"""
13
Database replication management for Cloudant and CouchDB.
14
"""
15
16
def __init__(self, client):
17
"""
18
Initialize replicator with client connection.
19
20
Parameters:
21
- client (Cloudant | CouchDB): Authenticated client instance
22
"""
23
24
def create_replication(self, source_db=None, target_db=None, repl_id=None, **kwargs):
25
"""
26
Create a replication document to replicate between databases.
27
28
Parameters:
29
- source_db (str | dict): Source database name or connection info
30
- target_db (str | dict): Target database name or connection info
31
- repl_id (str): Custom replication document ID
32
- continuous (bool): Continuous replication (default: False)
33
- create_target (bool): Create target database if missing
34
- doc_ids (list[str]): Replicate only specific documents
35
- filter (str): Filter function name from design document
36
- query_params (dict): Parameters for filter function
37
- selector (dict): Replicate documents matching selector
38
- since_seq (str | int): Start replication from sequence
39
- checkpoint_interval (int): Interval between checkpoints (milliseconds)
40
- batch_size (int): Number of documents per batch
41
- batch_timeout (int): Batch timeout (milliseconds)
42
- connection_timeout (int): Connection timeout (milliseconds)
43
- retries_per_request (int): Retry attempts per request
44
- use_checkpoints (bool): Use replication checkpoints
45
- source_proxy (str): HTTP proxy for source
46
- target_proxy (str): HTTP proxy for target
47
48
Returns:
49
dict: Replication document with replication ID
50
51
Raises:
52
CloudantReplicatorException: Replication creation failed
53
"""
54
55
def list_replications(self):
56
"""
57
List all replication documents.
58
59
Returns:
60
list[dict]: Active and completed replication documents
61
"""
62
63
def replication_state(self, repl_id):
64
"""
65
Get current state of replication.
66
67
Parameters:
68
- repl_id (str): Replication document ID
69
70
Returns:
71
dict: Replication status including state, progress, and errors
72
73
Raises:
74
CloudantReplicatorException: Replication not found
75
"""
76
77
def stop_replication(self, repl_id):
78
"""
79
Stop active replication.
80
81
Parameters:
82
- repl_id (str): Replication document ID to stop
83
84
Returns:
85
dict: Stop confirmation response
86
87
Raises:
88
CloudantReplicatorException: Stop operation failed
89
"""
90
91
def follow_replication(self, repl_id):
92
"""
93
Monitor replication progress with an iterator.
94
95
Parameters:
96
- repl_id (str): Replication document ID to monitor
97
98
Yields:
99
dict: Replication state updates until completion
100
101
Raises:
102
CloudantReplicatorException: Monitoring failed
103
"""
104
```
105
106
## Usage Examples
107
108
### Basic Replication Setup
109
110
```python
111
from cloudant import cloudant
112
from cloudant.replicator import Replicator
113
114
with cloudant('user', 'pass', account='myaccount') as client:
115
replicator = Replicator(client)
116
117
# One-time replication between databases
118
repl_doc = replicator.create_replication(
119
source_db='source_database',
120
target_db='target_database',
121
create_target=True # Create target if it doesn't exist
122
)
123
124
repl_id = repl_doc['_id']
125
print(f"Started replication: {repl_id}")
126
127
# Monitor replication progress
128
state = replicator.replication_state(repl_id)
129
print(f"Replication state: {state['_replication_state']}")
130
print(f"Progress: {state.get('_replication_stats', {})}")
131
```
132
133
### Continuous Replication
134
135
```python
136
from cloudant import cloudant
137
from cloudant.replicator import Replicator
138
139
with cloudant('user', 'pass', account='myaccount') as client:
140
replicator = Replicator(client)
141
142
# Set up continuous replication
143
repl_doc = replicator.create_replication(
144
source_db='live_database',
145
target_db='backup_database',
146
continuous=True,
147
create_target=True
148
)
149
150
repl_id = repl_doc['_id']
151
print(f"Continuous replication started: {repl_id}")
152
153
# Continuous replication runs until stopped
154
# Check status periodically
155
import time
156
157
for i in range(5):
158
time.sleep(10)
159
state = replicator.replication_state(repl_id)
160
stats = state.get('_replication_stats', {})
161
print(f"Docs written: {stats.get('docs_written', 0)}")
162
print(f"Docs read: {stats.get('docs_read', 0)}")
163
164
# Stop continuous replication
165
replicator.stop_replication(repl_id)
166
print("Continuous replication stopped")
167
```
168
169
### Cross-Instance Replication
170
171
```python
172
from cloudant import cloudant
173
from cloudant.replicator import Replicator
174
175
with cloudant('user', 'pass', account='source_account') as source_client:
176
replicator = Replicator(source_client)
177
178
# Replicate to different Cloudant account
179
target_config = {
180
'url': 'https://target_account.cloudant.com/target_database',
181
'auth': {
182
'username': 'target_user',
183
'password': 'target_password'
184
}
185
}
186
187
repl_doc = replicator.create_replication(
188
source_db='source_database',
189
target_db=target_config,
190
continuous=False,
191
create_target=True
192
)
193
194
print(f"Cross-instance replication: {repl_doc['_id']}")
195
```
196
197
### Filtered Replication
198
199
```python
200
from cloudant import cloudant
201
from cloudant.replicator import Replicator
202
203
with cloudant('user', 'pass', account='myaccount') as client:
204
replicator = Replicator(client)
205
206
# Replicate only specific document types using selector
207
repl_doc = replicator.create_replication(
208
source_db='main_database',
209
target_db='user_database',
210
selector={
211
'type': 'user',
212
'status': 'active'
213
}
214
)
215
216
# Replicate specific documents by ID
217
doc_ids = ['user123', 'user456', 'user789']
218
repl_doc = replicator.create_replication(
219
source_db='main_database',
220
target_db='selected_users',
221
doc_ids=doc_ids
222
)
223
224
# Replicate using filter function
225
repl_doc = replicator.create_replication(
226
source_db='main_database',
227
target_db='filtered_database',
228
filter='filters/by_department',
229
query_params={'department': 'engineering'}
230
)
231
```
232
233
### Replication with Custom Settings
234
235
```python
236
from cloudant import cloudant
237
from cloudant.replicator import Replicator
238
239
with cloudant('user', 'pass', account='myaccount') as client:
240
replicator = Replicator(client)
241
242
# High-performance replication configuration
243
repl_doc = replicator.create_replication(
244
source_db='large_database',
245
target_db='replica_database',
246
batch_size=1000, # Process 1000 docs per batch
247
batch_timeout=10000, # 10 second batch timeout
248
checkpoint_interval=5000, # Checkpoint every 5 seconds
249
connection_timeout=60000, # 60 second connection timeout
250
retries_per_request=5, # Retry failed requests 5 times
251
use_checkpoints=True # Enable checkpointing
252
)
253
254
repl_id = repl_doc['_id']
255
print(f"High-performance replication: {repl_id}")
256
```
257
258
### Incremental Replication
259
260
```python
261
from cloudant import cloudant
262
from cloudant.replicator import Replicator
263
264
with cloudant('user', 'pass', account='myaccount') as client:
265
replicator = Replicator(client)
266
267
# Get current sequence from target database
268
target_db = client['target_database']
269
if target_db.exists():
270
metadata = target_db.metadata()
271
last_seq = metadata.get('update_seq', 0)
272
else:
273
last_seq = 0
274
275
# Replicate only changes since last sequence
276
repl_doc = replicator.create_replication(
277
source_db='source_database',
278
target_db='target_database',
279
since_seq=last_seq,
280
create_target=True
281
)
282
283
print(f"Incremental replication from sequence {last_seq}")
284
```
285
286
### Monitoring Replication Progress
287
288
```python
289
from cloudant import cloudant
290
from cloudant.replicator import Replicator
291
import time
292
293
with cloudant('user', 'pass', account='myaccount') as client:
294
replicator = Replicator(client)
295
296
# Start replication
297
repl_doc = replicator.create_replication(
298
source_db='large_database',
299
target_db='backup_database'
300
)
301
302
repl_id = repl_doc['_id']
303
304
# Follow replication progress
305
print("Monitoring replication progress...")
306
307
try:
308
for state_update in replicator.follow_replication(repl_id):
309
repl_state = state_update.get('_replication_state', 'unknown')
310
stats = state_update.get('_replication_stats', {})
311
312
if repl_state == 'triggered':
313
print("Replication started")
314
elif repl_state == 'running':
315
docs_read = stats.get('docs_read', 0)
316
docs_written = stats.get('docs_written', 0)
317
progress = f"{docs_written}/{docs_read}"
318
print(f"Progress: {progress} documents")
319
elif repl_state == 'completed':
320
total_docs = stats.get('docs_written', 0)
321
print(f"Replication completed: {total_docs} documents")
322
break
323
elif repl_state == 'error':
324
error_msg = state_update.get('_replication_state_reason', 'Unknown error')
325
print(f"Replication failed: {error_msg}")
326
break
327
328
time.sleep(2) # Check every 2 seconds
329
330
except KeyboardInterrupt:
331
print("Stopping replication monitoring...")
332
replicator.stop_replication(repl_id)
333
```
334
335
### Managing Multiple Replications
336
337
```python
338
from cloudant import cloudant
339
from cloudant.replicator import Replicator
import time
340
341
with cloudant('user', 'pass', account='myaccount') as client:
342
replicator = Replicator(client)
343
344
# Set up multiple replication jobs
345
replications = [
346
('db1', 'backup_db1'),
347
('db2', 'backup_db2'),
348
('db3', 'backup_db3')
349
]
350
351
repl_ids = []
352
353
# Start all replications
354
for source, target in replications:
355
repl_doc = replicator.create_replication(
356
source_db=source,
357
target_db=target,
358
create_target=True
359
)
360
repl_ids.append(repl_doc['_id'])
361
print(f"Started replication: {source} -> {target}")
362
363
# Monitor all replications
364
while repl_ids:
365
completed_repls = []
366
367
for repl_id in repl_ids:
368
state = replicator.replication_state(repl_id)
369
repl_state = state.get('_replication_state')
370
371
if repl_state == 'completed':
372
stats = state.get('_replication_stats', {})
373
docs = stats.get('docs_written', 0)
374
print(f"Replication {repl_id} completed - {docs} documents")
375
completed_repls.append(repl_id)
376
elif repl_state == 'error':
377
error = state.get('_replication_state_reason', 'Unknown')
378
print(f"Replication {repl_id} failed: {error}")
379
completed_repls.append(repl_id)
380
381
# Remove completed replications from monitoring
382
for repl_id in completed_repls:
383
repl_ids.remove(repl_id)
384
385
if repl_ids:
386
time.sleep(5) # Check every 5 seconds
387
388
print("All replications completed")
389
```
390
391
### Replication with Authentication
392
393
```python
394
from cloudant import cloudant
395
from cloudant.replicator import Replicator
396
397
with cloudant('user', 'pass', account='myaccount') as client:
398
replicator = Replicator(client)
399
400
# Replicate to external CouchDB with basic auth
401
external_target = {
402
'url': 'https://external-couchdb.example.com/target_db',
403
'auth': {
404
'username': 'external_user',
405
'password': 'external_password'
406
}
407
}
408
409
repl_doc = replicator.create_replication(
410
source_db='local_database',
411
target_db=external_target
412
)
413
414
# Replicate to Cloudant with IAM authentication
415
iam_target = {
416
'url': 'https://target-account.cloudant.com/target_db',
417
'auth': {
418
'iam': {
419
'api_key': 'target_iam_api_key'
420
}
421
}
422
}
423
424
repl_doc = replicator.create_replication(
425
source_db='local_database',
426
target_db=iam_target
427
)
428
```
429
430
### Replication Error Handling
431
432
```python
433
from cloudant import cloudant
434
from cloudant.replicator import Replicator
435
from cloudant.error import CloudantReplicatorException
import time
436
437
with cloudant('user', 'pass', account='myaccount') as client:
438
replicator = Replicator(client)
439
440
try:
441
# Try to replicate to non-existent target without create_target
442
repl_doc = replicator.create_replication(
443
source_db='source_database',
444
target_db='non_existent_target',
445
create_target=False
446
)
447
448
repl_id = repl_doc['_id']
449
450
# Monitor for errors
451
for i in range(10): # Check up to 10 times
452
state = replicator.replication_state(repl_id)
453
repl_state = state.get('_replication_state')
454
455
if repl_state == 'error':
456
error_reason = state.get('_replication_state_reason')
457
print(f"Replication failed: {error_reason}")
458
break
459
elif repl_state == 'completed':
460
print("Replication completed successfully")
461
break
462
463
time.sleep(2)
464
465
except CloudantReplicatorException as e:
466
print(f"Replication setup failed: {e}")
467
468
# List all replications to check status
469
all_replications = replicator.list_replications()
470
for repl in all_replications:
471
repl_id = repl.get('_id', 'unknown')
472
state = repl.get('_replication_state', 'unknown')
473
print(f"Replication {repl_id}: {state}")
474
```
475
476
### Bidirectional Replication
477
478
```python
479
from cloudant import cloudant
480
from cloudant.replicator import Replicator
import time
481
482
with cloudant('user', 'pass', account='myaccount') as client:
483
replicator = Replicator(client)
484
485
# Set up bidirectional continuous replication
486
# Primary to secondary
487
repl1 = replicator.create_replication(
488
source_db='primary_database',
489
target_db='secondary_database',
490
continuous=True,
491
create_target=True,
492
repl_id='primary_to_secondary'
493
)
494
495
# Secondary to primary
496
repl2 = replicator.create_replication(
497
source_db='secondary_database',
498
target_db='primary_database',
499
continuous=True,
500
repl_id='secondary_to_primary'
501
)
502
503
print("Bidirectional replication established")
504
print(f"Primary->Secondary: {repl1['_id']}")
505
print(f"Secondary->Primary: {repl2['_id']}")
506
507
# Monitor both replications
508
repl_ids = [repl1['_id'], repl2['_id']]
509
510
try:
511
while True:
512
for repl_id in repl_ids:
513
state = replicator.replication_state(repl_id)
514
stats = state.get('_replication_stats', {})
515
docs_written = stats.get('docs_written', 0)
516
print(f"{repl_id}: {docs_written} docs replicated")
517
518
time.sleep(30) # Check every 30 seconds
519
520
except KeyboardInterrupt:
521
print("Stopping bidirectional replication...")
522
for repl_id in repl_ids:
523
replicator.stop_replication(repl_id)
524
```
525
526
## Error Handling
527
528
Replication operations can raise `CloudantReplicatorException`:
529
530
```python
531
from cloudant import cloudant
532
from cloudant.replicator import Replicator
533
from cloudant.error import CloudantReplicatorException
534
535
with cloudant('user', 'pass', account='myaccount') as client:
536
replicator = Replicator(client)
537
538
try:
539
# Invalid replication configuration
540
repl_doc = replicator.create_replication(
541
source_db='', # Empty source
542
target_db='target'
543
)
544
except CloudantReplicatorException as e:
545
print(f"Replication creation failed: {e}")
546
547
try:
548
# Check non-existent replication
549
state = replicator.replication_state('non_existent_repl_id')
550
except CloudantReplicatorException as e:
551
print(f"Replication not found: {e}")
552
553
try:
554
# Stop non-existent replication
555
replicator.stop_replication('non_existent_repl_id')
556
except CloudantReplicatorException as e:
557
print(f"Stop replication failed: {e}")
558
```