CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-motor

Non-blocking MongoDB driver for Python asyncio and Tornado applications

Pending
Overview
Eval results
Files

docs/change-streams.md

Change Streams

Real-time monitoring of changes to an entire deployment, a single database, or a single collection. Motor's change streams let applications react to inserts, updates, replacements, and deletes as they happen, with full support for both asyncio and Tornado. Change streams are only available on replica sets and sharded clusters, not on a standalone server.

Capabilities

Change Stream Creation

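Change streams can be opened at the client, database, or collection level, depending on how broad a scope of changes you want to monitor. `watch()` is called without `await`: it returns a change stream object, which you then iterate.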

# Client-level change streams (all databases)
def watch(
    pipeline: Optional[List[Dict[str, Any]]] = None,
    full_document: Optional[str] = None,
    resume_after: Optional[Dict[str, Any]] = None,
    max_await_time_ms: Optional[int] = None,
    batch_size: Optional[int] = None,
    collation: Optional[Dict[str, Any]] = None,
    start_at_operation_time: Optional[Any] = None,
    session: Optional[Any] = None,
    start_after: Optional[Dict[str, Any]] = None,
    **kwargs
) -> "Union[AsyncIOMotorChangeStream, MotorChangeStream]":
    """
    Watch for changes across all collections in all databases.

    ``watch()`` is a regular (non-async) method: call it without ``await``
    and iterate the change stream object it returns. The result is an
    AsyncIOMotorChangeStream for asyncio clients and a MotorChangeStream for
    Tornado clients.

    Parameters:
    - pipeline: Aggregation pipeline stages (e.g. ``$match``, ``$addFields``)
      used to filter or reshape change events
    - full_document: 'default', 'updateLookup', 'whenAvailable', or
      'required'; 'updateLookup' adds the current version of the document
      to update events (NOTE(review): 'whenAvailable' and 'required' need
      MongoDB 6.0+ -- confirm for your server/driver versions)
    - resume_after: Resume token (an event's ``_id``) after which to resume
    - max_await_time_ms: Maximum time, in milliseconds, to wait for new
      changes on each round trip to the server
    - batch_size: Maximum number of changes to return in each batch
    - collation: Collation options for string comparisons
    - start_at_operation_time: Only report changes at or after this
      cluster operation time
    - session: Optional client session to run the underlying command in
    - start_after: Resume token after which to start; similar to
      ``resume_after`` (NOTE(review): per the MongoDB docs it can also start
      a new stream after an 'invalidate' event -- confirm)
    - **kwargs: Additional driver options not enumerated here

    NOTE: change streams require a replica set or sharded cluster.
    """

# Database-level change streams (all collections in database)
def watch(
    pipeline: Optional[List[Dict[str, Any]]] = None,
    **kwargs
) -> "Union[AsyncIOMotorChangeStream, MotorChangeStream]":
    """
    Watch for changes on all collections in the database.

    Parameters:
    - pipeline: Aggregation pipeline used to filter changes; for example a
      ``$match`` on ``ns.coll`` restricts events to particular collections
    - **kwargs: Same keyword options as the client-level ``watch()``
      (full_document, resume_after, max_await_time_ms, batch_size, ...)

    Returns:
    A change stream object; ``watch()`` itself is called without ``await``.
    """

# Collection-level change streams (specific collection)
def watch(
    pipeline: Optional[List[Dict[str, Any]]] = None,
    **kwargs
) -> "Union[AsyncIOMotorChangeStream, MotorChangeStream]":
    """
    Watch for changes on the collection.

    Parameters:
    - pipeline: Aggregation pipeline used to filter or reshape change events
    - **kwargs: Same keyword options as the client-level ``watch()``, e.g.
      full_document='updateLookup', max_await_time_ms, batch_size,
      resume_after

    Returns:
    A change stream object; ``watch()`` itself is called without ``await``.
    """

AsyncIO Change Stream

Change stream implementation optimized for asyncio with native async/await support.

class AsyncIOMotorChangeStream:
    """
    asyncio change stream, as returned by ``watch()`` on an asyncio client,
    database, or collection.

    Consume it with ``async for``, with explicit ``next()`` / ``try_next()``
    calls, or as an async context manager that closes it on exit.

    The self-referencing return annotations are strings because the class
    name is not yet bound while the class body executes; an unquoted name
    would raise NameError when the class is defined.
    """

    # Async Iterator Protocol
    def __aiter__(self) -> "AsyncIOMotorChangeStream":
        """Return self for async iteration."""

    async def __anext__(self) -> Dict[str, Any]:
        """Wait for and return the next change event (drives ``async for``)."""

    # Manual Iteration
    async def next(self) -> Dict[str, Any]:
        """
        Wait for the next change event and return it.

        Returns:
        Dictionary containing change event with fields like:
        - _id: Resume token for this change
        - operationType: Type of operation (insert, update, delete, etc.)
        - ns: Namespace (database and collection)
        - documentKey: Key of changed document
        - fullDocument: The document; present for inserts, and for updates
          only when requested via the full_document option
        - updateDescription: Description of update (for update operations)
        - clusterTime: Timestamp of the change
        """

    async def try_next(self) -> Optional[Dict[str, Any]]:
        """
        Get the next change event without waiting indefinitely.

        Unlike ``next()``, which keeps waiting until a change arrives, this
        waits at most one server round trip (bounded by
        ``max_await_time_ms``) and then gives up, so callers can poll and do
        other work in between.

        Returns:
        Change event dict, or None if no change is currently available
        """

    # Change Stream Properties
    @property
    def resume_token(self) -> Optional[Dict[str, Any]]:
        """
        Current resume token; pass it as ``resume_after`` (or
        ``start_after``) to a later ``watch()`` call to continue from here.
        """

    @property
    def alive(self) -> bool:
        """Whether the change stream is still alive."""

    # Change Stream Management
    async def close(self) -> None:
        """Close the change stream."""

    async def __aenter__(self) -> "AsyncIOMotorChangeStream":
        """Async context manager entry; yields the change stream."""

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Async context manager exit; closes the change stream."""

# Tornado Change Stream
class MotorChangeStream:
    """
    Tornado counterpart of AsyncIOMotorChangeStream.

    Offers the same operations, but ``next``, ``try_next`` and ``close``
    return Tornado Futures instead of being ``async def`` coroutines.
    """

    # Manual Iteration (returns Tornado Futures)
    def next(self) -> tornado.concurrent.Future:
        """Get the next change event; the Future resolves to the event dict."""
    
    def try_next(self) -> tornado.concurrent.Future:
        """
        Try to get the next change event without waiting indefinitely.

        The Future resolves to the event dict, or to None if no change is
        currently available.
        """
    
    # Properties (identical to AsyncIO version)
    # Current resume token for continuing the change stream.
    @property
    def resume_token(self) -> Optional[Dict[str, Any]]: ...
    # Whether the change stream is still alive.
    @property
    def alive(self) -> bool: ...
    
    # Management
    # Close the change stream; returns a Future to wait on.
    def close(self) -> tornado.concurrent.Future: ...

Change Event Structure

Change events follow a standardized structure containing information about the modification.

class ChangeEvent:
    """
    Structure of change stream events.
    Note: This is a conceptual type - actual events are dictionaries.

    Which keys are present depends on the operation type and on the options
    passed to ``watch()``, so code that handles several operation types
    should test for optional keys (``'fullDocument' in change``) rather than
    index them unconditionally.
    """
    # Resume token; pass it as resume_after to continue after this event
    _id: Dict[str, Any]
    operationType: str  # 'insert', 'update', 'replace', 'delete', 'invalidate', etc.
    # Timestamp when the change occurred (presumably a BSON Timestamp -- confirm)
    clusterTime: Any
    ns: Dict[str, str]  # Namespace: {'db': 'database_name', 'coll': 'collection_name'}
    # Key identifying the changed document
    # NOTE(review): not every event type carries this (e.g. 'invalidate') -- confirm
    documentKey: Dict[str, Any]
    
    # Optional fields (depending on operation type and options)
    # Full document: present for inserts; for updates only when requested via
    # the full_document option (e.g. 'updateLookup'), in which case it may be
    # None if the document no longer exists
    fullDocument: Optional[Dict[str, Any]]
    # Document before change (MongoDB 6.0+; NOTE(review): also requires
    # pre-images to be enabled and requested -- confirm)
    fullDocumentBeforeChange: Optional[Dict[str, Any]]
    # Update details for update operations, e.g. 'updatedFields'
    updateDescription: Optional[Dict[str, Any]]
    txnNumber: Optional[int]  # Transaction number (for transactional changes)
    lsid: Optional[Dict[str, Any]]  # Logical session identifier

Usage Examples

Basic Change Stream Monitoring

import asyncio
import motor.motor_asyncio

async def basic_change_stream_example():
    """
    Print the first three change events seen on test_database.test_collection.

    A background task inserts, updates, and then deletes one document, one
    second apart; the main coroutine prints each resulting event, stops after
    three, and then closes the stream, waits for the task, and closes the
    client.
    """
    client = motor.motor_asyncio.AsyncIOMotorClient()
    coll = client.test_database.test_collection

    print("Starting change stream...")
    stream = coll.watch()

    async def make_changes():
        # (progress message, operation) pairs, executed one second apart; the
        # first pause also gives the stream time to start listening.
        steps = (
            ("Inserting document...",
             lambda: coll.insert_one({"name": "Alice", "age": 30})),
            ("Updating document...",
             lambda: coll.update_one({"name": "Alice"}, {"$set": {"age": 31}})),
            ("Deleting document...",
             lambda: coll.delete_one({"name": "Alice"})),
        )
        for message, operation in steps:
            await asyncio.sleep(1)
            print(message)
            await operation()

    writer = asyncio.create_task(make_changes())

    # Optional event keys and the label each one is printed under.
    optional_fields = (("fullDocument", "Full Document"), ("updateDescription", "Update"))

    try:
        seen = 0
        async for event in stream:
            seen += 1
            print(f"Change {seen}:")
            print(f"  Operation: {event['operationType']}")
            print(f"  Document Key: {event['documentKey']}")
            for key, label in optional_fields:
                if key in event:
                    print(f"  {label}: {event[key]}")
            if seen >= 3:
                break
    finally:
        await stream.close()
        await writer
        client.close()

asyncio.run(basic_change_stream_example())

Advanced Change Stream Options

import asyncio
import motor.motor_asyncio
from datetime import datetime

async def advanced_change_stream_example():
    """
    Watch test_database.users with full-document lookup and print each
    insert, update, and delete produced by a background task.

    ``async for`` over a change stream never finishes on its own and does
    not raise ``asyncio.TimeoutError`` when ``max_await_time_ms`` elapses; it
    just keeps waiting. This example therefore polls with ``try_next()``,
    which returns None after an empty wait, and stops once the writer task
    is done and the stream has gone quiet.
    """
    client = motor.motor_asyncio.AsyncIOMotorClient()
    db = client.test_database
    collection = db.users
    
    # Change stream with full document lookup
    change_stream = collection.watch(
        full_document='updateLookup',  # Include full document for updates
        max_await_time_ms=1000,        # Wait max 1 second per poll for changes
        batch_size=10                  # Process changes in batches of 10
    )
    
    # Background task to generate changes
    async def generate_changes():
        # Give the change stream a moment to open before writing; the stream
        # may not be listening yet when this task starts, and changes made
        # before it opens are not reported.
        await asyncio.sleep(1)

        users = [
            {"name": "Alice", "age": 30, "status": "active"},
            {"name": "Bob", "age": 25, "status": "active"},
            {"name": "Charlie", "age": 35, "status": "inactive"}
        ]
        
        # Insert users
        await collection.insert_many(users)
        await asyncio.sleep(2)
        
        # Update users
        await collection.update_many(
            {"status": "active"},
            {"$inc": {"age": 1}}
        )
        await asyncio.sleep(2)
        
        # Delete inactive users
        await collection.delete_many({"status": "inactive"})
    
    change_task = asyncio.create_task(generate_changes())
    
    print("Watching for changes with full document lookup...")
    
    try:
        while change_stream.alive:
            # Sample the writer's state *before* polling: if it had already
            # finished and a whole max_await_time_ms wait still comes back
            # empty, its changes have (almost certainly) all been delivered.
            writer_finished = change_task.done()
            change = await change_stream.try_next()
            if change is None:
                if writer_finished:
                    print("No more changes detected")
                    break
                continue

            print("\nChange detected:")
            print(f"  Type: {change['operationType']}")
            print(f"  Time: {change['clusterTime']}")
            print(f"  Namespace: {change['ns']}")
            
            if change['operationType'] == 'insert':
                print(f"  Inserted: {change['fullDocument']}")
            
            elif change['operationType'] == 'update':
                # With updateLookup this is the document's current version;
                # it can be None if the document was deleted in the meantime.
                print(f"  Updated document: {change['fullDocument']}")
                print(f"  Updated fields: {change['updateDescription']['updatedFields']}")
            
            elif change['operationType'] == 'delete':
                print(f"  Deleted document key: {change['documentKey']}")
            
            # Store resume token for potential resumption
            resume_token = change['_id']
            print(f"  Resume token: {resume_token}")
    
    finally:
        await change_stream.close()
        await change_task
        client.close()

asyncio.run(advanced_change_stream_example())

Change Stream with Pipeline Filtering

import asyncio
import motor.motor_asyncio

async def filtered_change_stream_example():
    """
    Watch test_database.products through a server-side pipeline that keeps
    only inserts and updates of Electronics products.

    On a fresh collection the writer below produces exactly five matching
    events (2 inserts, 2 price updates, 1 category change), which is why the
    loop stops after five.
    """
    client = motor.motor_asyncio.AsyncIOMotorClient()
    db = client.test_database
    collection = db.products
    
    # Pipeline to filter only certain types of changes
    pipeline = [
        # Only watch for insert and update operations
        {"$match": {
            "operationType": {"$in": ["insert", "update"]}
        }},
        # Only watch for products in Electronics category
        {"$match": {
            "$or": [
                {"fullDocument.category": "Electronics"},
                {"updateDescription.updatedFields.category": "Electronics"}
            ]
        }},
        # Add custom fields to the change event. "$clusterTime" is a field
        # path to the event's own clusterTime; "$$clusterTime" would refer to
        # an undefined aggregation variable and the server would reject the
        # pipeline.
        {"$addFields": {
            "changeTimestamp": "$clusterTime",
            "productName": "$fullDocument.name"
        }}
    ]
    
    change_stream = collection.watch(
        pipeline=pipeline,
        full_document='updateLookup'
    )
    
    async def make_product_changes():
        # Give the change stream a moment to open before writing. The stream
        # may not be listening yet when this task starts, and a missed insert
        # would leave the loop below waiting forever for its fifth event.
        await asyncio.sleep(1)

        products = [
            {"name": "Laptop", "category": "Electronics", "price": 999},
            {"name": "Book", "category": "Literature", "price": 20},
            {"name": "Phone", "category": "Electronics", "price": 699},
            {"name": "Desk", "category": "Furniture", "price": 299}
        ]
        
        # Insert products
        await collection.insert_many(products)
        await asyncio.sleep(1)
        
        # Update electronics prices (should be detected)
        await collection.update_many(
            {"category": "Electronics"},
            {"$mul": {"price": 0.9}}  # 10% discount
        )
        await asyncio.sleep(1)
        
        # Update furniture prices (should NOT be detected due to filter)
        await collection.update_many(
            {"category": "Furniture"},
            {"$mul": {"price": 0.8}}  # 20% discount
        )
        await asyncio.sleep(1)
        
        # Change category (should be detected when changing TO Electronics)
        await collection.update_one(
            {"name": "Book"},
            {"$set": {"category": "Electronics"}}  # Now it's electronics
        )
    
    change_task = asyncio.create_task(make_product_changes())
    
    print("Watching for Electronics product changes only...")
    
    try:
        change_count = 0
        async for change in change_stream:
            change_count += 1
            print(f"\nFiltered Change {change_count}:")
            print(f"  Operation: {change['operationType']}")
            print(f"  Product: {change.get('productName', 'Unknown')}")
            print(f"  Category: {change['fullDocument']['category']}")
            print(f"  Price: ${change['fullDocument']['price']}")
            
            # Stop once all five expected events have been seen
            if change_count >= 5:
                break
    
    finally:
        await change_stream.close()
        await change_task
        client.close()

asyncio.run(filtered_change_stream_example())

Resume Token and Error Recovery

import asyncio
import motor.motor_asyncio
import pymongo.errors

class SimulatedDisconnect(Exception):
    """Raised by the example below to imitate a dropped connection mid-stream."""


async def resume_token_example():
    """
    Demonstrate resuming a change stream from a saved resume token.

    The first session records the ``_id`` (resume token) of every event and
    is interrupted by a simulated failure after three changes. The second
    session passes that token as ``resume_after`` and continues with the
    next event, so changes made while no stream was open are not lost.
    """
    client = motor.motor_asyncio.AsyncIOMotorClient()
    collection = client.test_database.events
    
    resume_token = None
    
    async def watch_with_resume(max_changes=3, simulate_failure=False):
        """Watch up to max_changes events, resuming from resume_token if set."""
        nonlocal resume_token
        
        # Create change stream, resuming from token if available
        if resume_token is not None:
            print(f"Resuming from token: {resume_token}")
            change_stream = collection.watch(resume_after=resume_token)
        else:
            print("Starting new change stream")
            change_stream = collection.watch()
        
        try:
            change_count = 0
            async for change in change_stream:
                change_count += 1
                print(f"Change {change_count}: {change['operationType']}")
                
                # Store resume token after each change
                resume_token = change['_id']
                
                if change_count >= max_changes:
                    if simulate_failure:
                        raise SimulatedDisconnect("Simulated connection error")
                    break
        
        # Only the simulated failure and driver errors are reported and
        # survived; anything else (e.g. a programming error) propagates
        # instead of being silently hidden.
        except (SimulatedDisconnect, pymongo.errors.PyMongoError) as e:
            print(f"Error occurred: {e}")
            print(f"Last resume token: {resume_token}")
            
        finally:
            await change_stream.close()
    
    # Background task to generate changes
    async def generate_events():
        for i in range(10):
            await asyncio.sleep(1)
            await collection.insert_one({"event": f"Event {i}", "timestamp": i})
    
    event_task = asyncio.create_task(generate_events())
    
    try:
        # First watch session (interrupted by the simulated failure)
        await watch_with_resume(simulate_failure=True)
        
        print("\nRestarting change stream from resume token...")
        await asyncio.sleep(2)
        
        # Second watch session (resumes from where we left off)
        await watch_with_resume()
        
        await event_task
    finally:
        # No-op if the producer already finished; stops it if we bailed out early.
        event_task.cancel()
        client.close()

asyncio.run(resume_token_example())

Multi-Collection Change Monitoring

import asyncio
import motor.motor_asyncio

async def multi_collection_example():
    """
    Watch every collection in test_database from one database-level change
    stream, keeping only events for the users, orders, and products
    collections.

    The writer below produces four events (three inserts and one update),
    so the loop stops after four.
    """
    client = motor.motor_asyncio.AsyncIOMotorClient()
    db = client.test_database
    
    # Watch at database level to see changes across all collections
    change_stream = db.watch(
        pipeline=[
            # Filter to only certain collections
            {"$match": {
                "ns.coll": {"$in": ["users", "orders", "products"]}
            }}
        ]
    )
    
    async def make_changes():
        # Give the change stream a moment to open before writing. The stream
        # may not be listening yet when this task starts, and a missed insert
        # would leave the loop below waiting forever for its fourth event.
        await asyncio.sleep(1)

        users = db.users
        orders = db.orders
        products = db.products
        
        # Changes across multiple collections
        await users.insert_one({"name": "Alice", "email": "alice@example.com"})
        await asyncio.sleep(0.5)
        
        await products.insert_one({"name": "Laptop", "price": 999})
        await asyncio.sleep(0.5)
        
        await orders.insert_one({
            "user": "alice@example.com",
            "product": "Laptop",
            "quantity": 1,
            "total": 999
        })
        await asyncio.sleep(0.5)
        
        # Update across collections
        await users.update_one(
            {"email": "alice@example.com"},
            {"$set": {"last_order": "Laptop"}}
        )
    
    change_task = asyncio.create_task(make_changes())
    
    print("Watching for changes across multiple collections...")
    
    try:
        change_count = 0
        async for change in change_stream:
            change_count += 1
            collection_name = change['ns']['coll']
            operation = change['operationType']
            
            print(f"Change {change_count}:")
            print(f"  Collection: {collection_name}")
            print(f"  Operation: {operation}")
            
            if operation == 'insert':
                print(f"  Document: {change['fullDocument']}")
            elif operation == 'update':
                print(f"  Updated fields: {change['updateDescription']['updatedFields']}")
            
            # Stop once all four expected events have been seen
            if change_count >= 4:
                break
    
    finally:
        await change_stream.close()
        await change_task
        client.close()

asyncio.run(multi_collection_example())

Change Stream Context Manager

import asyncio
import motor.motor_asyncio

async def context_manager_example():
    """
    Watch test_database.notifications inside ``async with`` so the change
    stream is closed automatically, printing the first three inserted
    notifications.
    """
    client = motor.motor_asyncio.AsyncIOMotorClient()
    notifications_coll = client.test_database.notifications

    # Sample notifications the producer writes, one per second.
    pending = [
        {"type": "email", "recipient": "user1@example.com", "message": "Welcome!"},
        {"type": "sms", "recipient": "+1234567890", "message": "Code: 123"},
        {"type": "push", "recipient": "device123", "message": "New message"}
    ]

    async def send_notifications():
        for item in pending:
            await asyncio.sleep(1)
            await notifications_coll.insert_one(item)

    async with notifications_coll.watch() as stream:
        print("Change stream started with context manager")
        producer = asyncio.create_task(send_notifications())

        received = 0
        async for event in stream:
            received += 1
            note = event['fullDocument']
            print(f"Notification {received}: {note['type']} to {note['recipient']}")
            if received >= 3:
                break

        await producer

    # Leaving the ``async with`` block has already closed the stream.
    print("Change stream closed by context manager")
    client.close()

asyncio.run(context_manager_example())

Types

from typing import Any, Optional, Union, Dict, List, AsyncIterator
import tornado.concurrent

# Change event structure
ChangeEvent = Dict[str, Any]
ResumeToken = Dict[str, Any]
OperationType = str  # 'insert', 'update', 'replace', 'delete', 'invalidate', etc.
Namespace = Dict[str, str]  # {'db': str, 'coll': str}

# Change stream options
FullDocumentOption = str  # 'default', 'updateLookup', 'whenAvailable'
Pipeline = List[Dict[str, Any]]

Install with Tessl CLI

npx tessl i tessl/pypi-motor

docs

asyncio-operations.md

change-streams.md

client-encryption.md

cursor-operations.md

gridfs-operations.md

index.md

tornado-operations.md

web-integration.md

tile.json