0
# Monitoring and Change Streams
1
2
Change streams, monitoring capabilities, and event handling for real-time data updates and application performance monitoring.
3
4
## Capabilities
5
6
### Change Streams
7
8
Monitor real-time changes to collections, databases, or entire deployments.
9
10
```python { .api }
11
class Collection:
12
def watch(self, pipeline=None, full_document=None, resume_after=None, max_await_time_ms=None, batch_size=None, collation=None, start_at_operation_time=None, session=None, start_after=None, show_expanded_events=None):
13
"""
14
Open change stream to monitor collection changes.
15
16
Parameters:
17
- pipeline: aggregation pipeline to filter/transform change events
18
- full_document: when to return full document ('default', 'updateLookup', 'whenAvailable', 'required')
19
- resume_after: resume token to continue from specific point
20
- max_await_time_ms: maximum time to wait for changes
21
- batch_size: change event batch size
22
- collation: collation options
23
- start_at_operation_time: start watching from specific time
24
- session: optional ClientSession
25
- start_after: start after specific change event
26
- show_expanded_events: include additional change event types
27
28
Returns:
29
ChangeStream: Change stream cursor
30
"""
31
32
class Database:
33
def watch(self, pipeline=None, **kwargs):
34
"""
35
Open change stream to monitor database changes.
36
37
Parameters:
38
- pipeline: aggregation pipeline for filtering
39
- kwargs: same options as Collection.watch()
40
41
Returns:
42
ChangeStream: Change stream cursor
43
"""
44
45
class MongoClient:
46
def watch(self, pipeline=None, **kwargs):
47
"""
48
Open change stream to monitor all database changes.
49
50
Parameters:
51
- pipeline: aggregation pipeline for filtering
52
- kwargs: same options as Collection.watch()
53
54
Returns:
55
ChangeStream: Change stream cursor
56
"""
57
```
58
59
### Change Stream Operations
60
61
Handle and process change stream events.
62
63
```python { .api }
64
class ChangeStream:
65
def __iter__(self):
66
"""Iterate over change events."""
67
68
def __next__(self):
69
"""Get next change event."""
70
71
def next(self):
72
"""Get next change event (Python 2 compatibility)."""
73
74
def try_next(self):
75
"""
76
Try to get next change event without blocking.
77
78
Returns:
79
dict: Change event or None if no events available
80
"""
81
82
def close(self):
83
"""Close the change stream."""
84
85
@property
86
def alive(self):
87
"""
88
Check if change stream is alive.
89
90
Returns:
91
bool: True if stream is active
92
"""
93
94
@property
95
def resume_token(self):
96
"""
97
Get current resume token.
98
99
Returns:
100
dict: Resume token for stream continuation
101
"""
102
103
def __enter__(self):
104
"""Context manager entry."""
105
106
def __exit__(self, exc_type, exc_val, exc_tb):
107
"""Context manager exit."""
108
```
109
110
### Change Event Types
111
112
Structure of change stream events.
113
114
```python { .api }
115
# Change event document structure:
116
{
117
"_id": { # Resume token
118
"_data": "resume_token_string"
119
},
120
"operationType": "insert|update|replace|delete|drop|rename|dropDatabase|invalidate",
121
"clusterTime": "Timestamp(...)",
122
"ns": { # Namespace
123
"db": "database_name",
124
"coll": "collection_name"
125
},
126
"documentKey": { # Document identifier
127
"_id": "ObjectId(...)"
128
},
129
"fullDocument": {...}, # Full document (based on full_document option)
130
"fullDocumentBeforeChange": {...}, # Previous document state
131
"updateDescription": { # For update operations
132
"updatedFields": {...},
133
"removedFields": [...],
134
"truncatedArrays": [...]
135
}
136
}
137
```
138
139
### Command Monitoring
140
141
Monitor database commands for performance and debugging.
142
143
```python { .api }
144
from pymongo import monitoring
145
146
class CommandListener(monitoring.CommandListener):
147
def started(self, event):
148
"""
149
Handle command started event.
150
151
Parameters:
152
- event: CommandStartedEvent
153
"""
154
155
def succeeded(self, event):
156
"""
157
Handle command succeeded event.
158
159
Parameters:
160
- event: CommandSucceededEvent
161
"""
162
163
def failed(self, event):
164
"""
165
Handle command failed event.
166
167
Parameters:
168
- event: CommandFailedEvent
169
"""
170
171
class CommandStartedEvent:
172
@property
173
def command_name(self):
174
"""Command name."""
175
176
@property
177
def request_id(self):
178
"""Request identifier."""
179
180
@property
181
def connection_id(self):
182
"""Connection identifier."""
183
184
@property
185
def command(self):
186
"""Command document."""
187
188
class CommandSucceededEvent:
189
@property
190
def duration_micros(self):
191
"""Command duration in microseconds."""
192
193
@property
194
def reply(self):
195
"""Command reply document."""
196
197
@property
198
def command_name(self):
199
"""Command name."""
200
201
@property
202
def request_id(self):
203
"""Request identifier."""
204
205
class CommandFailedEvent:
206
@property
207
def duration_micros(self):
208
"""Command duration in microseconds."""
209
210
@property
211
def failure(self):
212
"""Failure details."""
213
214
@property
215
def command_name(self):
216
"""Command name."""
217
218
@property
219
def request_id(self):
220
"""Request identifier."""
221
222
# Register command listener
223
monitoring.register(CommandListener())
224
```
225
226
### Connection Pool Monitoring
227
228
Monitor connection pool events for performance tuning.
229
230
```python { .api }
231
from pymongo import monitoring
232
233
class PoolListener(monitoring.ConnectionPoolListener):
234
def pool_created(self, event):
235
"""
236
Handle pool created event.
237
238
Parameters:
239
- event: PoolCreatedEvent
240
"""
241
242
def pool_ready(self, event):
243
"""
244
Handle pool ready event.
245
246
Parameters:
247
- event: PoolReadyEvent
248
"""
249
250
def pool_cleared(self, event):
251
"""
252
Handle pool cleared event.
253
254
Parameters:
255
- event: PoolClearedEvent
256
"""
257
258
def pool_closed(self, event):
259
"""
260
Handle pool closed event.
261
262
Parameters:
263
- event: PoolClosedEvent
264
"""
265
266
def connection_created(self, event):
267
"""
268
Handle connection created event.
269
270
Parameters:
271
- event: ConnectionCreatedEvent
272
"""
273
274
def connection_ready(self, event):
275
"""
276
Handle connection ready event.
277
278
Parameters:
279
- event: ConnectionReadyEvent
280
"""
281
282
def connection_closed(self, event):
283
"""
284
Handle connection closed event.
285
286
Parameters:
287
- event: ConnectionClosedEvent
288
"""
289
290
def connection_check_out_started(self, event):
291
"""
292
Handle connection checkout started event.
293
294
Parameters:
295
- event: ConnectionCheckOutStartedEvent
296
"""
297
298
def connection_check_out_failed(self, event):
299
"""
300
Handle connection checkout failed event.
301
302
Parameters:
303
- event: ConnectionCheckOutFailedEvent
304
"""
305
306
def connection_checked_out(self, event):
307
"""
308
Handle connection checked out event.
309
310
Parameters:
311
- event: ConnectionCheckedOutEvent
312
"""
313
314
def connection_checked_in(self, event):
315
"""
316
Handle connection checked in event.
317
318
Parameters:
319
- event: ConnectionCheckedInEvent
320
"""
321
322
# Register pool listener
323
monitoring.register(PoolListener())
324
```
325
326
### Server Monitoring
327
328
Monitor server discovery and topology changes.
329
330
```python { .api }
331
from pymongo import monitoring
332
333
class ServerListener(monitoring.ServerListener):
334
def opened(self, event):
335
"""
336
Handle server opened event.
337
338
Parameters:
339
- event: ServerOpeningEvent
340
"""
341
342
def description_changed(self, event):
343
"""
344
Handle server description changed event.
345
346
Parameters:
347
- event: ServerDescriptionChangedEvent
348
"""
349
350
def closed(self, event):
351
"""
352
Handle server closed event.
353
354
Parameters:
355
- event: ServerClosedEvent
356
"""
357
358
class TopologyListener(monitoring.TopologyListener):
359
def opened(self, event):
360
"""
361
Handle topology opened event.
362
363
Parameters:
364
- event: TopologyOpenedEvent
365
"""
366
367
def description_changed(self, event):
368
"""
369
Handle topology description changed event.
370
371
Parameters:
372
- event: TopologyDescriptionChangedEvent
373
"""
374
375
def closed(self, event):
376
"""
377
Handle topology closed event.
378
379
Parameters:
380
- event: TopologyClosedEvent
381
"""
382
383
# Register server and topology listeners
384
monitoring.register(ServerListener())
385
monitoring.register(TopologyListener())
386
```
387
388
### Heartbeat Monitoring
389
390
Monitor server heartbeat events for connection health.
391
392
```python { .api }
393
from pymongo import monitoring
394
395
class HeartbeatListener(monitoring.ServerHeartbeatListener):
396
def started(self, event):
397
"""
398
Handle heartbeat started event.
399
400
Parameters:
401
- event: ServerHeartbeatStartedEvent
402
"""
403
404
def succeeded(self, event):
405
"""
406
Handle heartbeat succeeded event.
407
408
Parameters:
409
- event: ServerHeartbeatSucceededEvent
410
"""
411
412
def failed(self, event):
413
"""
414
Handle heartbeat failed event.
415
416
Parameters:
417
- event: ServerHeartbeatFailedEvent
418
"""
419
420
# Register heartbeat listener
421
monitoring.register(HeartbeatListener())
422
```
423
424
## Usage Examples
425
426
### Basic Change Streams
427
428
```python
429
from pymongo import MongoClient
430
import pymongo
431
432
client = MongoClient()
433
db = client.mydb
434
collection = db.orders
435
436
# Watch for all changes to collection
437
with collection.watch() as stream:
438
for change in stream:
439
print(f"Change detected: {change['operationType']}")
440
print(f"Document: {change.get('fullDocument', 'N/A')}")
441
print(f"Resume token: {change['_id']}")
442
443
# Watch with pipeline filter
444
pipeline = [
445
{"$match": {"operationType": {"$in": ["insert", "update"]}}},
446
{"$match": {"fullDocument.status": "urgent"}}
447
]
448
449
with collection.watch(pipeline) as stream:
450
for change in stream:
451
print(f"Urgent order change: {change['fullDocument']['_id']}")
452
453
# Watch with full document lookup
454
with collection.watch(full_document="updateLookup") as stream:
455
for change in stream:
456
if change["operationType"] == "update":
457
print(f"Updated document: {change['fullDocument']}")
458
print(f"Changed fields: {change['updateDescription']['updatedFields']}")
459
```
460
461
### Resumable Change Streams
462
463
```python
464
from pymongo import MongoClient
465
from pymongo.errors import PyMongoError
466
import time
467
468
client = MongoClient()
469
collection = client.mydb.inventory
470
471
resume_token = None
472
473
def process_changes():
    """Watch the collection and process change events, resuming after errors.

    Retries with a loop rather than the original recursive self-call: a
    recursive retry grows the call stack on every reconnect and will
    eventually hit the recursion limit on a long-running monitor. Resumes
    from the last captured resume token so no events are lost across
    reconnects.
    """
    global resume_token

    while True:
        try:
            # Resume from last token if available
            with collection.watch(resume_after=resume_token) as stream:
                for change in stream:
                    # Process change event
                    process_inventory_change(change)

                    # Save resume token for recovery
                    resume_token = stream.resume_token

        except PyMongoError as e:
            print(f"Change stream error: {e}")
            time.sleep(5)  # Wait before retry; loop resumes with last token
490
491
def process_inventory_change(change):
    """Print a human-readable summary of a single inventory change event.

    Handles insert, update (quantity changes only), and delete events;
    any other operation type is silently ignored.
    """
    op = change["operationType"]

    if op == "insert":
        print(f"New product added: {change['fullDocument']['name']}")
        return

    if op == "update":
        changed_fields = change["updateDescription"]["updatedFields"]
        if "quantity" in changed_fields:
            print(f"Quantity updated for {change['documentKey']['_id']}")
        return

    if op == "delete":
        print(f"Product deleted: {change['documentKey']['_id']}")
503
504
# Start monitoring
505
process_changes()
506
```
507
508
### Database and Client-level Change Streams
509
510
```python
511
from pymongo import MongoClient
512
513
client = MongoClient()
514
db = client.ecommerce
515
516
# Watch entire database
517
pipeline = [
518
{"$match": {"ns.coll": {"$in": ["orders", "inventory", "customers"]}}},
519
{"$project": {
520
"operationType": 1,
521
"ns": 1,
522
"documentKey": 1,
523
"fullDocument.status": 1
524
}}
525
]
526
527
with db.watch(pipeline) as stream:
528
for change in stream:
529
collection_name = change["ns"]["coll"]
530
print(f"Change in {collection_name}: {change['operationType']}")
531
532
# Watch all databases (requires appropriate permissions)
533
with client.watch() as stream:
534
for change in stream:
535
db_name = change["ns"]["db"]
536
coll_name = change["ns"]["coll"]
537
print(f"Change in {db_name}.{coll_name}")
538
```
539
540
### Command Monitoring
541
542
```python
543
import pymongo
544
from pymongo import monitoring
545
import logging
546
547
# Set up logging
548
logging.basicConfig(level=logging.INFO)
549
logger = logging.getLogger(__name__)
550
551
class CommandLogger(monitoring.CommandListener):
    """Log the start, success, and failure of every MongoDB command."""

    # CRUD commands whose full command document is worth logging.
    _DETAILED_COMMANDS = ("find", "insert", "update", "delete")

    def started(self, event):
        logger.info(f"Command {event.command_name} started on {event.connection_id}")
        if event.command_name in self._DETAILED_COMMANDS:
            logger.info(f"Command details: {event.command}")

    def succeeded(self, event):
        logger.info(f"Command {event.command_name} succeeded in {event.duration_micros}μs")

    def failed(self, event):
        logger.error(f"Command {event.command_name} failed after {event.duration_micros}μs: {event.failure}")
562
563
# Register the listener
564
monitoring.register(CommandLogger())
565
566
# Now all MongoDB commands will be logged
567
client = pymongo.MongoClient()
568
collection = client.mydb.mycollection
569
570
# These operations will generate log entries
571
collection.insert_one({"name": "test"})
572
collection.find_one({"name": "test"})
573
collection.update_one({"name": "test"}, {"$set": {"updated": True}})
574
```
575
576
### Connection Pool Monitoring
577
578
```python
579
import pymongo
580
from pymongo import monitoring
581
from datetime import datetime
582
583
class PoolMonitor(monitoring.ConnectionPoolListener):
    """Track and report per-address connection pool statistics.

    NOTE: the PyMongo base class is ``monitoring.ConnectionPoolListener``;
    ``monitoring.PoolListener`` (used in the original example) does not
    exist and would raise AttributeError at class-definition time.
    """

    def __init__(self):
        # address -> {"created": datetime, "connections": int, "checkouts": int}
        self.pool_stats = {}

    def pool_created(self, event):
        """Record creation time and zeroed counters for a new pool."""
        print(f"Pool created for {event.address}")
        self.pool_stats[event.address] = {
            "created": datetime.now(),
            "connections": 0,
            "checkouts": 0
        }

    def connection_created(self, event):
        """Count each connection the pool opens for this address."""
        stats = self.pool_stats.get(event.address, {})
        stats["connections"] = stats.get("connections", 0) + 1
        print(f"Connection created for {event.address} (total: {stats['connections']})")

    def connection_checked_out(self, event):
        """Count successful checkouts from the pool."""
        stats = self.pool_stats.get(event.address, {})
        stats["checkouts"] = stats.get("checkouts", 0) + 1
        print(f"Connection checked out from {event.address}")

    def connection_check_out_failed(self, event):
        """Report a failed checkout along with its reason."""
        print(f"Connection checkout failed for {event.address}: {event.reason}")

    def pool_cleared(self, event):
        """Report pool clears (e.g. after a server error resets the pool)."""
        print(f"Pool cleared for {event.address}")
610
611
# Register pool monitor
612
monitoring.register(PoolMonitor())
613
614
# Create client (will trigger pool creation)
615
client = pymongo.MongoClient(maxPoolSize=10, minPoolSize=2)
616
```
617
618
### Performance Monitoring
619
620
```python
621
import pymongo
622
from pymongo import monitoring
623
import time
624
from collections import defaultdict
625
626
class PerformanceMonitor(monitoring.CommandListener):
    """Collect per-command timing statistics and a record of slow commands."""

    SLOW_MS = 100  # duration threshold (ms) above which a command is "slow"

    def __init__(self):
        # command name -> list of observed durations in milliseconds
        self.command_times = defaultdict(list)
        # slow commands in arrival order; see get_slow_queries() for sorting
        self.slow_queries = []
        # request_id -> command document captured at start time.
        # PyMongo command events are immutable (__slots__), so per-request
        # state must live on the listener — the original example's
        # ``event.start_time = ...`` raises AttributeError.
        self._pending = {}

    def started(self, event):
        """Remember the command document so succeeded() can report it."""
        self._pending[event.request_id] = event.command

    def succeeded(self, event):
        """Accumulate the command's duration and record it if slow."""
        # CommandSucceededEvent has no .command attribute; correlate back
        # to the document captured in started() via request_id.
        command_doc = self._pending.pop(event.request_id, None)
        duration_ms = event.duration_micros / 1000
        command_name = event.command_name

        self.command_times[command_name].append(duration_ms)

        # Log slow queries (> SLOW_MS)
        if duration_ms > self.SLOW_MS:
            self.slow_queries.append({
                "command": command_name,
                "duration_ms": duration_ms,
                "details": command_doc
            })

    def failed(self, event):
        """Drop the pending entry so failed commands do not leak memory."""
        self._pending.pop(event.request_id, None)

    def get_stats(self):
        """Return {command: {count, avg_ms, max_ms, min_ms}} summaries."""
        stats = {}
        for cmd, times in self.command_times.items():
            stats[cmd] = {
                "count": len(times),
                "avg_ms": sum(times) / len(times),
                "max_ms": max(times),
                "min_ms": min(times)
            }
        return stats

    def get_slow_queries(self, limit=10):
        """Return up to *limit* slowest recorded commands, slowest first."""
        return sorted(
            self.slow_queries,
            key=lambda x: x["duration_ms"],
            reverse=True
        )[:limit]
667
668
# Set up monitoring
669
perf_monitor = PerformanceMonitor()
670
monitoring.register(perf_monitor)
671
672
# Run some operations
673
client = pymongo.MongoClient()
674
collection = client.testdb.testcoll
675
676
# Generate some operations
677
for i in range(100):
678
collection.insert_one({"index": i, "data": f"test_data_{i}"})
679
680
# Create index (slow operation)
681
collection.create_index("index")
682
683
# Run some queries
684
list(collection.find({"index": {"$gt": 50}}).limit(10))  # Cursor.to_list() requires PyMongo >= 4.7
685
686
# Get performance stats
687
stats = perf_monitor.get_stats()
688
print("Command Performance Stats:")
689
for cmd, stat in stats.items():
690
print(f"{cmd}: {stat['count']} ops, avg: {stat['avg_ms']:.2f}ms")
691
692
slow_queries = perf_monitor.get_slow_queries()
693
print(f"\nTop {len(slow_queries)} Slow Queries:")
694
for query in slow_queries:
695
print(f"{query['command']}: {query['duration_ms']:.2f}ms")
696
```