0
# Change Streams
1
2
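Change streams provide real-time monitoring of data changes at the client (whole deployment), database, or collection level. Motor's change streams enable reactive applications that respond immediately to data modifications, with full support for both the asyncio and Tornado frameworks. Change streams require MongoDB to run as a replica set or sharded cluster.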
3
4
## Capabilities
5
6
### Change Stream Creation
7
8
Change streams can be created at client, database, or collection level to monitor different scopes of changes.
9
10
```python { .api }
11
# Client-level change streams (all databases)
12
def watch(
13
pipeline: Optional[List[Dict[str, Any]]] = None,
14
full_document: Optional[str] = None,
15
resume_after: Optional[Dict[str, Any]] = None,
16
max_await_time_ms: Optional[int] = None,
17
batch_size: Optional[int] = None,
18
collation: Optional[Dict[str, Any]] = None,
19
start_at_operation_time: Optional[Any] = None,
20
session: Optional[Any] = None,
21
start_after: Optional[Dict[str, Any]] = None,
22
**kwargs
23
) -> Union[AsyncIOMotorChangeStream, MotorChangeStream]:
24
"""
25
Watch for changes across all collections in all databases.
26
27
Parameters:
28
- pipeline: Aggregation pipeline to filter changes
29
- full_document: 'default', 'updateLookup', 'whenAvailable', or 'required'
30
- resume_after: Resume token to continue from a specific point
31
- max_await_time_ms: Maximum time in milliseconds the server waits for new changes before returning an empty batch
32
- batch_size: Number of changes to return in each batch
33
- collation: Collation options for string comparisons
34
- start_at_operation_time: Start watching from specific time
35
- session: Explicit client session to use for the change stream's operations
36
- start_after: Resume token to start after; unlike resume_after, it can open a new stream after an invalidate event
37
"""
38
39
# Database-level change streams (all collections in database)
40
def watch(
41
pipeline: Optional[List[Dict[str, Any]]] = None,
42
**kwargs
43
) -> Union[AsyncIOMotorChangeStream, MotorChangeStream]:
44
"""Watch for changes on all collections in the database."""
45
46
# Collection-level change streams (specific collection)
47
def watch(
48
pipeline: Optional[List[Dict[str, Any]]] = None,
49
**kwargs
50
) -> Union[AsyncIOMotorChangeStream, MotorChangeStream]:
51
"""Watch for changes on the collection."""
52
```
53
54
### AsyncIO Change Stream
55
56
Change stream implementation optimized for asyncio with native async/await support.
57
58
```python { .api }
59
class AsyncIOMotorChangeStream:
60
# Async Iterator Protocol
61
def __aiter__(self) -> AsyncIOMotorChangeStream:
62
"""Return self for async iteration."""
63
64
async def __anext__(self) -> Dict[str, Any]:
65
"""Get the next change event."""
66
67
# Manual Iteration
68
async def next(self) -> Dict[str, Any]:
69
"""
70
Get the next change event.
71
72
Returns:
73
Dictionary containing change event with fields like:
74
- _id: Resume token for this change
75
- operationType: Type of operation (insert, update, delete, etc.)
76
- ns: Namespace (database and collection)
77
- documentKey: Key of changed document
78
- fullDocument: Full document (if full_document option used)
79
- updateDescription: Description of update (for update operations)
80
- clusterTime: Timestamp of the change
81
"""
82
83
async def try_next(self) -> Optional[Dict[str, Any]]:
84
"""
85
Try to get the next change event without waiting indefinitely for one to arrive.
86
87
Returns:
88
Change event dict or None if no changes available
89
"""
90
91
# Change Stream Properties
92
@property
93
def resume_token(self) -> Optional[Dict[str, Any]]:
94
"""Current resume token for continuing the change stream."""
95
96
@property
97
def alive(self) -> bool:
98
"""Whether the change stream is still alive."""
99
100
# Change Stream Management
101
async def close(self) -> None:
102
"""Close the change stream."""
103
104
async def __aenter__(self) -> AsyncIOMotorChangeStream:
105
"""Async context manager entry."""
106
107
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
108
"""Async context manager exit."""
109
110
# Tornado Change Stream
111
class MotorChangeStream:
112
# Manual Iteration (returns Tornado Futures)
113
def next(self) -> tornado.concurrent.Future:
114
"""Get the next change event."""
115
116
def try_next(self) -> tornado.concurrent.Future:
117
"""Try to get the next change event without waiting indefinitely; the Future resolves to None if no change is available."""
118
119
# Properties (identical to AsyncIO version)
120
@property
121
def resume_token(self) -> Optional[Dict[str, Any]]: ...
122
@property
123
def alive(self) -> bool: ...
124
125
# Management
126
def close(self) -> tornado.concurrent.Future: ...
127
```
128
129
### Change Event Structure
130
131
Change events follow a standardized structure containing information about the modification.
132
133
```python { .api }
134
class ChangeEvent:
135
"""
136
Structure of change stream events.
137
Note: This is a conceptual type - actual events are dictionaries.
138
"""
139
_id: Dict[str, Any] # Resume token
140
operationType: str # 'insert', 'update', 'replace', 'delete', 'invalidate', etc.
141
clusterTime: Any # Timestamp when the change occurred
142
ns: Dict[str, str] # Namespace: {'db': 'database_name', 'coll': 'collection_name'}
143
documentKey: Dict[str, Any] # Key identifying the changed document
144
145
# Optional fields (depending on operation type and options)
146
fullDocument: Optional[Dict[str, Any]] # Full document (always present for insert/replace; for updates only when the full_document option requests a lookup)
147
fullDocumentBeforeChange: Optional[Dict[str, Any]] # Document before change (MongoDB 6.0+)
148
updateDescription: Optional[Dict[str, Any]] # Update details for update operations
149
txnNumber: Optional[int] # Transaction number (for transactional changes)
150
lsid: Optional[Dict[str, Any]] # Logical session identifier
151
```
152
153
## Usage Examples
154
155
### Basic Change Stream Monitoring
156
157
```python
158
import asyncio
159
import motor.motor_asyncio
160
161
async def basic_change_stream_example():
    """Watch a single collection and print the first three change events.

    A background task performs an insert, an update and a delete (each one
    preceded by a one-second pause) while this coroutine iterates the change
    stream and reports what it sees.
    """
    client = motor.motor_asyncio.AsyncIOMotorClient()
    collection = client.test_database.test_collection

    # Create the change stream for the collection.
    print("Starting change stream...")
    change_stream = collection.watch()

    async def make_changes():
        # Each step: pause, announce the write, then perform it.
        steps = (
            (
                "Inserting document...",
                lambda: collection.insert_one({"name": "Alice", "age": 30}),
            ),
            (
                "Updating document...",
                lambda: collection.update_one(
                    {"name": "Alice"}, {"$set": {"age": 31}}
                ),
            ),
            (
                "Deleting document...",
                lambda: collection.delete_one({"name": "Alice"}),
            ),
        )
        for message, write in steps:
            await asyncio.sleep(1)
            print(message)
            await write()

    # Run the writes concurrently with the watcher below.
    change_task = asyncio.create_task(make_changes())

    # Fields that only some event types carry, with their display labels.
    optional_fields = (
        ("fullDocument", "Full Document"),
        ("updateDescription", "Update"),
    )

    try:
        seen = 0
        async for event in change_stream:
            seen += 1
            print(f"Change {seen}:")
            print(f" Operation: {event['operationType']}")
            print(f" Document Key: {event['documentKey']}")

            for field, label in optional_fields:
                if field in event:
                    print(f" {label}: {event[field]}")

            # The writer performs exactly three operations.
            if seen >= 3:
                break
    finally:
        await change_stream.close()
        await change_task
        client.close()
212
213
asyncio.run(basic_change_stream_example())
214
```
215
216
### Advanced Change Stream Options
217
218
```python
219
import asyncio
220
import motor.motor_asyncio
221
from datetime import datetime
222
223
async def advanced_change_stream_example():
    """Watch ``users`` with full-document lookup until the writer is finished.

    The stream is polled with ``try_next()`` rather than ``async for`` so the
    example can terminate: ``async for`` on a change stream waits for further
    events indefinitely and never raises ``asyncio.TimeoutError``. The loop
    ends after a poll that started once the writer task had already finished
    comes back empty.
    """
    client = motor.motor_asyncio.AsyncIOMotorClient()
    db = client.test_database
    collection = db.users

    # Background task to generate changes
    async def generate_changes():
        users = [
            {"name": "Alice", "age": 30, "status": "active"},
            {"name": "Bob", "age": 25, "status": "active"},
            {"name": "Charlie", "age": 35, "status": "inactive"},
        ]

        # Insert users
        await collection.insert_many(users)
        await asyncio.sleep(2)

        # Update users
        await collection.update_many(
            {"status": "active"},
            {"$inc": {"age": 1}},
        )
        await asyncio.sleep(2)

        # Delete inactive users
        await collection.delete_many({"status": "inactive"})

    change_task = None
    try:
        # Enter the stream's context BEFORE starting the writer. The stream is
        # created lazily, and writes issued before it exists are not reported.
        # NOTE(review): relies on Motor opening the stream when `async with`
        # is entered - confirm for the Motor version in use.
        async with collection.watch(
            full_document='updateLookup',  # Include full document for updates
            max_await_time_ms=1000,        # Each poll waits at most 1 second
            batch_size=10,                 # Process changes in batches of 10
        ) as change_stream:
            change_task = asyncio.create_task(generate_changes())

            print("Watching for changes with full document lookup...")

            while change_stream.alive:
                # Sample the writer's state before polling: an empty poll only
                # proves the stream is drained if the writer had already
                # finished when that poll began.
                writer_finished = change_task.done()
                change = await change_stream.try_next()

                if change is None:
                    if writer_finished:
                        print("No more changes detected")
                        break
                    continue

                print("\nChange detected:")
                print(f" Type: {change['operationType']}")
                print(f" Time: {change['clusterTime']}")
                print(f" Namespace: {change['ns']}")

                if change['operationType'] == 'insert':
                    print(f" Inserted: {change['fullDocument']}")

                elif change['operationType'] == 'update':
                    print(f" Updated document: {change['fullDocument']}")
                    print(f" Updated fields: {change['updateDescription']['updatedFields']}")

                elif change['operationType'] == 'delete':
                    print(f" Deleted document key: {change['documentKey']}")

                # Store resume token for potential resumption
                resume_token = change['_id']
                print(f" Resume token: {resume_token}")
    finally:
        # The change stream is closed by the context manager.
        if change_task is not None:
            await change_task
        client.close()
290
291
asyncio.run(advanced_change_stream_example())
292
```
293
294
### Change Stream with Pipeline Filtering
295
296
```python
297
import asyncio
298
import motor.motor_asyncio
299
300
async def filtered_change_stream_example():
    """Watch ``products`` through a pipeline that keeps only Electronics changes.

    The pipeline keeps insert/update events whose looked-up document (or whose
    updated ``category`` field) is "Electronics", and adds two convenience
    fields to every event. The writer produces five matching events: two
    inserts, two price updates and one category change.
    """
    client = motor.motor_asyncio.AsyncIOMotorClient()
    db = client.test_database
    collection = db.products

    # Pipeline to filter only certain types of changes
    pipeline = [
        # Only watch for insert and update operations
        {"$match": {
            "operationType": {"$in": ["insert", "update"]}
        }},
        # Only watch for products in Electronics category
        {"$match": {
            "$or": [
                {"fullDocument.category": "Electronics"},
                {"updateDescription.updatedFields.category": "Electronics"}
            ]
        }},
        # Add custom fields to the change event.
        # "$clusterTime" is a field path into the event; the "$$" prefix would
        # instead reference an aggregation variable, and none is named
        # clusterTime.
        {"$addFields": {
            "changeTimestamp": "$clusterTime",
            "productName": "$fullDocument.name"
        }}
    ]

    async def make_product_changes():
        products = [
            {"name": "Laptop", "category": "Electronics", "price": 999},
            {"name": "Book", "category": "Literature", "price": 20},
            {"name": "Phone", "category": "Electronics", "price": 699},
            {"name": "Desk", "category": "Furniture", "price": 299}
        ]

        # Insert products
        await collection.insert_many(products)
        await asyncio.sleep(1)

        # Update electronics prices (should be detected)
        await collection.update_many(
            {"category": "Electronics"},
            {"$mul": {"price": 0.9}}  # 10% discount
        )
        await asyncio.sleep(1)

        # Update furniture prices (should NOT be detected due to filter)
        await collection.update_many(
            {"category": "Furniture"},
            {"$mul": {"price": 0.8}}  # 20% discount
        )
        await asyncio.sleep(1)

        # Change category (should be detected when changing TO Electronics)
        await collection.update_one(
            {"name": "Book"},
            {"$set": {"category": "Electronics"}}  # Now it's electronics
        )

    change_task = None
    try:
        # Enter the stream's context BEFORE starting the writer. The stream is
        # created lazily, and writes issued before it exists are not reported,
        # which would leave the loop below waiting for a fifth event forever.
        # NOTE(review): relies on Motor opening the stream when `async with`
        # is entered - confirm for the Motor version in use.
        async with collection.watch(
            pipeline=pipeline,
            full_document='updateLookup'
        ) as change_stream:
            change_task = asyncio.create_task(make_product_changes())

            print("Watching for Electronics product changes only...")

            change_count = 0
            async for change in change_stream:
                change_count += 1
                print(f"\nFiltered Change {change_count}:")
                print(f" Operation: {change['operationType']}")
                print(f" Product: {change.get('productName', 'Unknown')}")
                print(f" Category: {change['fullDocument']['category']}")
                print(f" Price: ${change['fullDocument']['price']}")

                # Stop after reasonable number of changes
                if change_count >= 5:
                    break
    finally:
        # The change stream is closed by the context manager.
        if change_task is not None:
            await change_task
        client.close()
384
385
asyncio.run(filtered_change_stream_example())
386
```
387
388
### Resume Token and Error Recovery
389
390
```python
391
import asyncio
392
import motor.motor_asyncio
393
import pymongo.errors
394
395
async def resume_token_example():
    """Interrupt a change stream and continue it from the saved resume token.

    Two watch sessions run back to back. Each one stops after three events
    with a simulated connection error; the second session is opened with
    ``resume_after`` so it continues right after the last event the first
    session processed.
    """
    client = motor.motor_asyncio.AsyncIOMotorClient()
    collection = client.test_database.events

    resume_token = None

    async def watch_with_resume():
        nonlocal resume_token

        # Create change stream, resuming from token if available
        if resume_token is not None:
            print(f"Resuming from token: {resume_token}")
            change_stream = collection.watch(resume_after=resume_token)
        else:
            print("Starting new change stream")
            change_stream = collection.watch()

        try:
            change_count = 0
            async for change in change_stream:
                change_count += 1
                print(f"Change {change_count}: {change['operationType']}")

                # Store resume token after each change
                resume_token = change['_id']

                # Simulate error after 3 changes
                if change_count == 3:
                    raise ConnectionError("Simulated connection error")

        # Handle only connection-style failures; anything else is a real bug
        # and is allowed to propagate instead of being swallowed.
        except (ConnectionError, pymongo.errors.PyMongoError) as e:
            print(f"Error occurred: {e}")
            print(f"Last resume token: {resume_token}")

        finally:
            await change_stream.close()

    # Background task to generate changes
    async def generate_events():
        for i in range(10):
            await asyncio.sleep(1)
            await collection.insert_one({"event": f"Event {i}", "timestamp": i})

    event_task = asyncio.create_task(generate_events())

    try:
        # First watch session (will be interrupted)
        await watch_with_resume()

        print("\nRestarting change stream from resume token...")
        await asyncio.sleep(2)

        # Second watch session (resumes from where we left off)
        await watch_with_resume()
    finally:
        # Always let the writer finish and release the client, even when a
        # session failed with an unexpected error.
        await event_task
        client.close()
457
458
asyncio.run(resume_token_example())
459
```
460
461
### Multi-Collection Change Monitoring
462
463
```python
464
import asyncio
465
import motor.motor_asyncio
466
467
async def multi_collection_example():
    """Watch several collections at once through a database-level stream.

    The stream is opened on the database and filtered by ``ns.coll`` so only
    events from ``users``, ``orders`` and ``products`` are delivered. The
    writer produces four such events: three inserts and one update.
    """
    client = motor.motor_asyncio.AsyncIOMotorClient()
    db = client.test_database

    async def make_changes():
        users = db.users
        orders = db.orders
        products = db.products

        # Changes across multiple collections
        await users.insert_one({"name": "Alice", "email": "alice@example.com"})
        await asyncio.sleep(0.5)

        await products.insert_one({"name": "Laptop", "price": 999})
        await asyncio.sleep(0.5)

        await orders.insert_one({
            "user": "alice@example.com",
            "product": "Laptop",
            "quantity": 1,
            "total": 999
        })
        await asyncio.sleep(0.5)

        # Update across collections
        await users.update_one(
            {"email": "alice@example.com"},
            {"$set": {"last_order": "Laptop"}}
        )

    change_task = None
    try:
        # Enter the stream's context BEFORE starting the writer. The stream is
        # created lazily, and the writer's first insert happens immediately;
        # if it were missed, the loop below would wait for a fourth event
        # forever.
        # NOTE(review): relies on Motor opening the stream when `async with`
        # is entered - confirm for the Motor version in use.
        async with db.watch(
            pipeline=[
                # Filter to only certain collections
                {"$match": {
                    "ns.coll": {"$in": ["users", "orders", "products"]}
                }}
            ]
        ) as change_stream:
            change_task = asyncio.create_task(make_changes())

            print("Watching for changes across multiple collections...")

            change_count = 0
            async for change in change_stream:
                change_count += 1
                collection_name = change['ns']['coll']
                operation = change['operationType']

                print(f"Change {change_count}:")
                print(f" Collection: {collection_name}")
                print(f" Operation: {operation}")

                if operation == 'insert':
                    print(f" Document: {change['fullDocument']}")
                elif operation == 'update':
                    print(f" Updated fields: {change['updateDescription']['updatedFields']}")

                if change_count >= 4:
                    break
    finally:
        # The change stream is closed by the context manager.
        if change_task is not None:
            await change_task
        client.close()
534
535
asyncio.run(multi_collection_example())
536
```
537
538
### Change Stream Context Manager
539
540
```python
541
import asyncio
542
import motor.motor_asyncio
543
544
async def context_manager_example():
    """Use a change stream as an async context manager.

    Three notification documents are inserted one second apart by a
    background task while the stream, scoped by ``async with``, reports each
    insert. Leaving the ``async with`` block closes the stream.
    """
    client = motor.motor_asyncio.AsyncIOMotorClient()
    collection = client.test_database.notifications

    # Documents the background task will insert, in order.
    pending = [
        {"type": "email", "recipient": "user1@example.com", "message": "Welcome!"},
        {"type": "sms", "recipient": "+1234567890", "message": "Code: 123"},
        {"type": "push", "recipient": "device123", "message": "New message"}
    ]

    async def send_notifications():
        for payload in pending:
            await asyncio.sleep(1)
            await collection.insert_one(payload)

    async with collection.watch() as change_stream:
        print("Change stream started with context manager")

        # Start the writer only once the stream's context has been entered.
        notify_task = asyncio.create_task(send_notifications())

        handled = 0
        async for event in change_stream:
            handled += 1
            inserted = event['fullDocument']
            print(f"Notification {handled}: {inserted['type']} to {inserted['recipient']}")

            if handled >= 3:
                break

        await notify_task

    # Leaving the block above has already closed the stream.
    print("Change stream closed by context manager")
    client.close()
581
582
asyncio.run(context_manager_example())
583
```
584
585
## Types
586
587
```python { .api }
588
from typing import Any, Optional, Union, Dict, List, AsyncIterator
589
import tornado.concurrent
590
591
# Change event structure
592
ChangeEvent = Dict[str, Any]
593
ResumeToken = Dict[str, Any]
594
OperationType = str # 'insert', 'update', 'replace', 'delete', 'invalidate', etc.
595
Namespace = Dict[str, str] # {'db': str, 'coll': str}
596
597
# Change stream options
598
FullDocumentOption = str # 'default', 'updateLookup', 'whenAvailable', 'required'
599
Pipeline = List[Dict[str, Any]]
600
```