0
# Scheduler Monitoring
1
2
Monitor and query replication jobs and documents using the CouchDB/Cloudant scheduler API. The scheduler provides visibility into active, completed, and failed replications across the database instance.
3
4
## Capabilities
5
6
### Scheduler Operations
7
8
Query replication documents and jobs for monitoring and troubleshooting replication processes.
9
10
```python { .api }
11
class Scheduler:
12
"""
13
API for retrieving scheduler jobs and documents.
14
"""
15
16
def __init__(self, client):
17
"""
18
Initialize scheduler instance.
19
20
Parameters:
21
- client: CouchDB or Cloudant client instance
22
"""
23
24
def list_docs(self, limit=None, skip=None):
25
"""
26
Lists replication documents with status and metadata.
27
28
Includes information about all replication documents, even those
29
in completed and failed states. Returns document ID, database,
30
replication ID, source and target information.
31
32
Parameters:
33
- limit (int): Maximum number of results to return
34
- skip (int): Number of results to skip from beginning
35
36
Returns:
37
dict: Response containing replication documents list
38
39
Response format:
40
{
41
"total_rows": int,
42
"offset": int,
43
"docs": [
44
{
45
"database": str,
46
"doc_id": str,
47
"id": str,
48
"source": str,
49
"target": str,
50
"state": str,
51
"error_count": int,
52
"info": dict,
53
"last_updated": str,
54
"start_time": str
55
}
56
]
57
}
58
"""
59
60
def get_doc(self, doc_id):
61
"""
62
Get replication document state for specific replication.
63
64
Parameters:
65
- doc_id (str): Replication document ID
66
67
Returns:
68
dict: Detailed replication document state information
69
70
Response format:
71
{
72
"database": str,
73
"doc_id": str,
74
"id": str,
75
"source": str,
76
"target": str,
77
"state": str,
78
"error_count": int,
79
"info": dict,
80
"last_updated": str,
81
"start_time": str
82
}
83
84
Raises:
85
CloudantException: If replication document not found
86
"""
87
88
def list_jobs(self, limit=None, skip=None):
89
"""
90
Lists active replication jobs.
91
92
Includes replications created via /_replicate endpoint and those
93
created from replication documents. Does not include completed
94
or failed replications. Provides source/target information,
95
replication ID, recent event history, and job status.
96
97
Parameters:
98
- limit (int): Maximum number of results to return
99
- skip (int): Number of results to skip from beginning
100
101
Returns:
102
dict: Response containing active replication jobs list
103
104
Response format:
105
{
106
"total_rows": int,
107
"offset": int,
108
"jobs": [
109
{
110
"database": str,
111
"doc_id": str,
112
"id": str,
113
"source": str,
114
"target": str,
115
"pid": str,
116
"continuous": bool,
117
"checkpointed_source_seq": str,
118
"source_seq": str,
119
"revisions_checked": int,
120
"missing_revisions_found": int,
121
"docs_read": int,
122
"docs_written": int,
123
"doc_write_failures": int,
124
"bulk_get_docs": int,
125
"bulk_get_attempts": int,
126
"start_time": str,
127
"history": [
128
{
129
"type": str,
130
"timestamp": str,
131
"details": dict
132
}
133
]
134
}
135
]
136
}
137
"""
138
```
139
140
## Usage Examples
141
142
### Basic Scheduler Monitoring
143
144
```python
145
from cloudant import cloudant
146
from cloudant.scheduler import Scheduler
147
148
with cloudant(username, password, account=account_name) as client:
149
scheduler = Scheduler(client)
150
151
# List all replication documents
152
docs = scheduler.list_docs()
153
print(f"Total replication documents: {docs['total_rows']}")
154
155
for doc in docs['docs']:
156
print(f"Replication {doc['doc_id']}: {doc['state']}")
157
print(f" Source: {doc['source']}")
158
print(f" Target: {doc['target']}")
159
print(f" Last updated: {doc['last_updated']}")
160
161
if doc['error_count'] > 0:
162
print(f" Errors: {doc['error_count']}")
163
```
164
165
### Monitor Active Replication Jobs
166
167
```python
168
from cloudant import cloudant
169
from cloudant.scheduler import Scheduler
170
171
with cloudant(username, password, account=account_name) as client:
172
scheduler = Scheduler(client)
173
174
# List currently running replication jobs
175
jobs = scheduler.list_jobs()
176
print(f"Active replication jobs: {jobs['total_rows']}")
177
178
for job in jobs['jobs']:
179
print(f"Job {job['id']}:")
180
print(f" Source: {job['source']}")
181
print(f" Target: {job['target']}")
182
print(f" Continuous: {job['continuous']}")
183
print(f" Progress: {job['docs_written']}/{job['docs_read']} docs")
184
print(f" Started: {job['start_time']}")
185
186
# Show recent history
187
if 'history' in job:
188
recent_events = job['history'][-3:] # Last 3 events
189
for event in recent_events:
190
print(f" {event['timestamp']}: {event['type']}")
191
```
192
193
### Detailed Replication Status
194
195
```python
196
from cloudant import cloudant
197
from cloudant.scheduler import Scheduler
198
199
with cloudant(username, password, account=account_name) as client:
200
scheduler = Scheduler(client)
201
202
# Get specific replication document details
203
repl_doc_id = "my_replication_doc"
204
205
try:
206
doc_status = scheduler.get_doc(repl_doc_id)
207
208
print(f"Replication Status for {repl_doc_id}:")
209
print(f" State: {doc_status['state']}")
210
print(f" Source: {doc_status['source']}")
211
print(f" Target: {doc_status['target']}")
212
print(f" Error count: {doc_status['error_count']}")
213
print(f" Last updated: {doc_status['last_updated']}")
214
215
if 'info' in doc_status and doc_status['info']:
216
info = doc_status['info']
217
if 'error' in info:
218
print(f" Last error: {info['error']}")
219
if 'progress' in info:
220
print(f" Progress: {info['progress']}")
221
222
except Exception as e:
223
print(f"Failed to get replication status: {e}")
224
```
225
226
### Pagination Through Results
227
228
```python
229
from cloudant import cloudant
230
from cloudant.scheduler import Scheduler
231
232
with cloudant(username, password, account=account_name) as client:
233
scheduler = Scheduler(client)
234
235
# Paginate through all replication documents
236
limit = 10
237
skip = 0
238
239
while True:
240
docs = scheduler.list_docs(limit=limit, skip=skip)
241
242
if not docs['docs']:
243
break
244
245
print(f"Page starting at {skip}:")
246
for doc in docs['docs']:
247
print(f" {doc['doc_id']}: {doc['state']}")
248
249
skip += limit
250
251
# Stop if we've seen all documents
252
if skip >= docs['total_rows']:
253
break
254
```
255
256
### Integration with Replicator Class
257
258
```python
259
from cloudant import cloudant
260
from cloudant.replicator import Replicator
261
from cloudant.scheduler import Scheduler
262
263
with cloudant(username, password, account=account_name) as client:
264
replicator = Replicator(client)
265
scheduler = Scheduler(client)
266
267
# Start a replication
268
repl_doc = replicator.create_replication(
269
source_db='source_database',
270
target_db='target_database',
271
repl_id='my_monitored_replication',
272
continuous=True
273
)
274
275
print(f"Started replication: {repl_doc['id']}")
276
277
# Monitor the replication
278
import time
279
280
for _ in range(10): # Check 10 times
281
time.sleep(5)
282
283
# Check job status
284
jobs = scheduler.list_jobs()
285
active_job = None
286
287
for job in jobs['jobs']:
288
if job['doc_id'] == 'my_monitored_replication':
289
active_job = job
290
break
291
292
if active_job:
293
print(f"Replication progress: {active_job['docs_written']} docs written")
294
else:
295
# Check if it completed or failed
296
doc_status = scheduler.get_doc('my_monitored_replication')
297
print(f"Replication state: {doc_status['state']}")
298
299
if doc_status['state'] in ['completed', 'failed']:
300
break
301
```
302
303
### Error Analysis
304
305
```python
306
from cloudant import cloudant
307
from cloudant.scheduler import Scheduler
308
309
with cloudant(username, password, account=account_name) as client:
310
scheduler = Scheduler(client)
311
312
# Find failed replications
313
docs = scheduler.list_docs()
314
315
failed_replications = [
316
doc for doc in docs['docs']
317
if doc['state'] == 'failed' or doc['error_count'] > 0
318
]
319
320
if failed_replications:
321
print(f"Found {len(failed_replications)} problematic replications:")
322
323
for doc in failed_replications:
324
print(f"\nReplication {doc['doc_id']}:")
325
print(f" State: {doc['state']}")
326
print(f" Errors: {doc['error_count']}")
327
328
# Get detailed error information
329
detailed = scheduler.get_doc(doc['doc_id'])
330
if 'info' in detailed and 'error' in detailed['info']:
331
print(f" Error details: {detailed['info']['error']}")
332
else:
333
print("No failed replications found")
334
```
335
336
## Types
337
338
```python { .api }
339
# Replication document states
340
REPLICATION_STATES = ["initializing", "running", "completed", "pending", "crashing", "error", "failed"]
341
342
# Replication document structure
343
ReplicationDoc = dict[str, Any]
344
345
# Replication job structure
346
ReplicationJob = dict[str, Any]
347
348
# Scheduler responses
349
DocsResponse = dict[str, Any] # Contains total_rows, offset, docs
350
JobsResponse = dict[str, Any] # Contains total_rows, offset, jobs
351
352
# History event structure
353
HistoryEvent = dict[str, Any] # Contains type, timestamp, details
354
```
355
356
## Import Statements
357
358
```python
359
from cloudant.scheduler import Scheduler
360
```