
tessl/pypi-pymongo

Official Python driver for MongoDB providing comprehensive tools for database operations, BSON handling, and GridFS file storage

Monitoring and Change Streams

Change streams, monitoring listeners, and event handling for reacting to real-time data updates and observing application and driver performance.

Capabilities

Change Streams

Monitor real-time changes to collections, databases, or entire deployments.

class Collection:
    def watch(self, pipeline=None, full_document=None, resume_after=None,
              max_await_time_ms=None, batch_size=None, collation=None,
              start_at_operation_time=None, session=None, start_after=None,
              show_expanded_events=None):
        """
        Open change stream to monitor collection changes.

        Parameters:
        - pipeline: aggregation pipeline to filter/transform change events
        - full_document: when to return full document ('default', 'updateLookup', 'whenAvailable', 'required')
        - resume_after: resume token to continue from specific point
        - max_await_time_ms: maximum time to wait for changes
        - batch_size: change event batch size
        - collation: collation options
        - start_at_operation_time: start watching from specific time
        - session: optional ClientSession
        - start_after: start after specific change event
        - show_expanded_events: include additional change event types

        Returns:
        ChangeStream: Change stream cursor
        """

class Database:
    def watch(self, pipeline=None, **kwargs):
        """
        Open change stream to monitor database changes.

        Parameters:
        - pipeline: aggregation pipeline for filtering
        - kwargs: same options as Collection.watch()

        Returns:
        ChangeStream: Change stream cursor
        """

class MongoClient:
    def watch(self, pipeline=None, **kwargs):
        """
        Open change stream to monitor all database changes.

        Parameters:
        - pipeline: aggregation pipeline for filtering
        - kwargs: same options as Collection.watch()

        Returns:
        ChangeStream: Change stream cursor
        """

Change Stream Operations

Handle and process change stream events.

class ChangeStream:
    def __iter__(self):
        """Iterate over change events."""

    def __next__(self):
        """Get next change event."""

    def next(self):
        """Advance the stream and return the next change event (alias of __next__)."""

    def try_next(self):
        """
        Try to get next change event without blocking.

        Returns:
        dict: Change event or None if no events available
        """

    def close(self):
        """Close the change stream."""

    @property
    def alive(self):
        """
        Check if change stream is alive.

        Returns:
        bool: True if stream is active
        """

    @property
    def resume_token(self):
        """
        Get current resume token.

        Returns:
        dict: Resume token for stream continuation
        """

    def __enter__(self):
        """Context manager entry."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit."""

Change Event Types

Structure of change stream events.

# Change event document structure:
{
    "_id": {  # Resume token
        "_data": "resume_token_string"
    },
    "operationType": "insert|update|replace|delete|drop|rename|dropDatabase|invalidate",
    "clusterTime": "Timestamp(...)",
    "ns": {  # Namespace
        "db": "database_name",
        "coll": "collection_name"
    },
    "documentKey": {  # Document identifier
        "_id": "ObjectId(...)"
    },
    "fullDocument": {...},  # Full document (based on full_document option)
    "fullDocumentBeforeChange": {...},  # Previous document state
    "updateDescription": {  # For update operations
        "updatedFields": {...},
        "removedFields": [...],
        "truncatedArrays": [...]
    }
}
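
Note that after an invalidate event (emitted when the watched collection is dropped or renamed), the stream can no longer be resumed with resume_after; the documented recovery path is to pass the invalidate event's resume token to start_after. A minimal sketch (the mydb.orders namespace is illustrative):

from pymongo import MongoClient

client = MongoClient()
collection = client.mydb.orders  # illustrative namespace

last_token = None
with collection.watch() as stream:
    for change in stream:
        last_token = stream.resume_token
        if change["operationType"] == "invalidate":
            break  # the stream cannot continue past this point

# Reopen after the invalidate event using start_after
if last_token is not None:
    with collection.watch(start_after=last_token) as stream:
        for change in stream:
            print(change["operationType"])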

Command Monitoring

Monitor database commands for performance and debugging.

from pymongo import monitoring

class CommandListener(monitoring.CommandListener):
    def started(self, event):
        """
        Handle command started event.

        Parameters:
        - event: CommandStartedEvent
        """

    def succeeded(self, event):
        """
        Handle command succeeded event.

        Parameters:
        - event: CommandSucceededEvent
        """

    def failed(self, event):
        """
        Handle command failed event.

        Parameters:
        - event: CommandFailedEvent
        """

class CommandStartedEvent:
    @property
    def command_name(self):
        """Command name."""
    
    @property
    def request_id(self):
        """Request identifier."""
    
    @property
    def connection_id(self):
        """Connection identifier."""
    
    @property
    def command(self):
        """Command document."""

class CommandSucceededEvent:
    @property
    def duration_micros(self):
        """Command duration in microseconds."""
    
    @property
    def reply(self):
        """Command reply document."""
    
    @property
    def command_name(self):
        """Command name."""
    
    @property
    def request_id(self):
        """Request identifier."""

class CommandFailedEvent:
    @property
    def duration_micros(self):
        """Command duration in microseconds."""
    
    @property
    def failure(self):
        """Failure details."""
    
    @property
    def command_name(self):
        """Command name."""
    
    @property
    def request_id(self):
        """Request identifier."""

# Register command listener globally (applies to clients created after registration)
monitoring.register(CommandListener())
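
Listeners can also be scoped to a single client instead of registered globally, via the event_listeners option of MongoClient:

from pymongo import MongoClient

# Only this client publishes events to the listener;
# other clients in the process are unaffected.
client = MongoClient(event_listeners=[CommandListener()])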

Connection Pool Monitoring

Monitor connection pool events for performance tuning.

from pymongo import monitoring

class PoolListener(monitoring.ConnectionPoolListener):
    def pool_created(self, event):
        """
        Handle pool created event.

        Parameters:
        - event: PoolCreatedEvent
        """

    def pool_ready(self, event):
        """
        Handle pool ready event.

        Parameters:
        - event: PoolReadyEvent
        """

    def pool_cleared(self, event):
        """
        Handle pool cleared event.

        Parameters:
        - event: PoolClearedEvent
        """

    def pool_closed(self, event):
        """
        Handle pool closed event.

        Parameters:
        - event: PoolClosedEvent
        """

    def connection_created(self, event):
        """
        Handle connection created event.

        Parameters:
        - event: ConnectionCreatedEvent
        """

    def connection_ready(self, event):
        """
        Handle connection ready event.

        Parameters:
        - event: ConnectionReadyEvent
        """

    def connection_closed(self, event):
        """
        Handle connection closed event.

        Parameters:
        - event: ConnectionClosedEvent
        """

    def connection_check_out_started(self, event):
        """
        Handle connection checkout started event.

        Parameters:
        - event: ConnectionCheckOutStartedEvent
        """

    def connection_check_out_failed(self, event):
        """
        Handle connection checkout failed event.

        Parameters:
        - event: ConnectionCheckOutFailedEvent
        """

    def connection_checked_out(self, event):
        """
        Handle connection checked out event.

        Parameters:
        - event: ConnectionCheckedOutEvent
        """

    def connection_checked_in(self, event):
        """
        Handle connection checked in event.

        Parameters:
        - event: ConnectionCheckedInEvent
        """

# Register pool listener
monitoring.register(PoolListener())

Server Monitoring

Monitor server discovery and topology changes.

from pymongo import monitoring

class ServerListener(monitoring.ServerListener):
    def opened(self, event):
        """
        Handle server opened event.

        Parameters:
        - event: ServerOpeningEvent
        """

    def description_changed(self, event):
        """
        Handle server description changed event.

        Parameters:
        - event: ServerDescriptionChangedEvent
        """

    def closed(self, event):
        """
        Handle server closed event.

        Parameters:
        - event: ServerClosedEvent
        """

class TopologyListener(monitoring.TopologyListener):
    def opened(self, event):
        """
        Handle topology opened event.

        Parameters:
        - event: TopologyOpenedEvent
        """

    def description_changed(self, event):
        """
        Handle topology description changed event.

        Parameters:
        - event: TopologyDescriptionChangedEvent
        """

    def closed(self, event):
        """
        Handle topology closed event.

        Parameters:
        - event: TopologyClosedEvent
        """

# Register server and topology listeners
monitoring.register(ServerListener())
monitoring.register(TopologyListener())
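
A common use of these listeners is logging server type transitions, such as a secondary being elected primary. A minimal sketch using the previous_description/new_description properties of the change events:

import logging

from pymongo import monitoring

logging.basicConfig(level=logging.INFO)

class ServerChangeLogger(monitoring.ServerListener):
    def opened(self, event):
        logging.info("Server %s added to topology %s",
                     event.server_address, event.topology_id)

    def description_changed(self, event):
        # Log only genuine type transitions (e.g. RSSecondary -> RSPrimary)
        if event.new_description.server_type != event.previous_description.server_type:
            logging.info("Server %s changed type: %s -> %s",
                         event.server_address,
                         event.previous_description.server_type_name,
                         event.new_description.server_type_name)

    def closed(self, event):
        logging.warning("Server %s removed from topology %s",
                        event.server_address, event.topology_id)

monitoring.register(ServerChangeLogger())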

Heartbeat Monitoring

Monitor server heartbeat events for connection health.

from pymongo import monitoring

class HeartbeatListener(monitoring.ServerHeartbeatListener):
    def started(self, event):
        """
        Handle heartbeat started event.

        Parameters:
        - event: ServerHeartbeatStartedEvent
        """

    def succeeded(self, event):
        """
        Handle heartbeat succeeded event.

        Parameters:
        - event: ServerHeartbeatSucceededEvent
        """

    def failed(self, event):
        """
        Handle heartbeat failed event.

        Parameters:
        - event: ServerHeartbeatFailedEvent
        """

# Register heartbeat listener
monitoring.register(HeartbeatListener())
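
As a usage sketch, a heartbeat listener can surface per-server health and round-trip timing (event.duration reports the heartbeat's elapsed time; consult the driver documentation for its exact unit):

import logging

from pymongo import monitoring

logging.basicConfig(level=logging.INFO)

class HeartbeatLatencyLogger(monitoring.ServerHeartbeatListener):
    def started(self, event):
        pass  # nothing to record; timing is reported on completion

    def succeeded(self, event):
        logging.info("Heartbeat to %s succeeded (duration=%s)",
                     event.connection_id, event.duration)

    def failed(self, event):
        logging.warning("Heartbeat to %s failed: %s",
                        event.connection_id, event.reply)

monitoring.register(HeartbeatLatencyLogger())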

Usage Examples

Basic Change Streams

from pymongo import MongoClient

client = MongoClient()
db = client.mydb
collection = db.orders

# Watch for all changes to collection
with collection.watch() as stream:
    for change in stream:
        print(f"Change detected: {change['operationType']}")
        print(f"Document: {change.get('fullDocument', 'N/A')}")
        print(f"Resume token: {change['_id']}")

# Watch with pipeline filter
pipeline = [
    {"$match": {"operationType": {"$in": ["insert", "update"]}}},
    {"$match": {"fullDocument.status": "urgent"}}
]

# full_document="updateLookup" is needed so update events carry fullDocument
with collection.watch(pipeline, full_document="updateLookup") as stream:
    for change in stream:
        print(f"Urgent order change: {change['fullDocument']['_id']}")

# Watch with full document lookup
with collection.watch(full_document="updateLookup") as stream:
    for change in stream:
        if change["operationType"] == "update":
            print(f"Updated document: {change['fullDocument']}")
            print(f"Changed fields: {change['updateDescription']['updatedFields']}")

Resumable Change Streams

from pymongo import MongoClient
from pymongo.errors import PyMongoError
import time

client = MongoClient()
collection = client.mydb.inventory

resume_token = None

def process_changes():
    global resume_token

    while True:
        try:
            # Resume from the last token if available
            with collection.watch(resume_after=resume_token) as stream:
                for change in stream:
                    # Process change event
                    process_inventory_change(change)

                    # Save resume token for recovery
                    resume_token = stream.resume_token

        except PyMongoError as e:
            print(f"Change stream error: {e}")
            time.sleep(5)  # Wait, then retry with the last resume token

def process_inventory_change(change):
    """Process inventory change event."""
    op_type = change["operationType"]
    
    if op_type == "insert":
        print(f"New product added: {change['fullDocument']['name']}")
    elif op_type == "update":
        updates = change["updateDescription"]["updatedFields"]
        if "quantity" in updates:
            print(f"Quantity updated for {change['documentKey']['_id']}")
    elif op_type == "delete":
        print(f"Product deleted: {change['documentKey']['_id']}")

# Start monitoring
process_changes()

Database and Client-level Change Streams

from pymongo import MongoClient

client = MongoClient()
db = client.ecommerce

# Watch entire database
pipeline = [
    {"$match": {"ns.coll": {"$in": ["orders", "inventory", "customers"]}}},
    {"$project": {
        "operationType": 1,
        "ns": 1,
        "documentKey": 1,
        "fullDocument.status": 1
    }}
]

with db.watch(pipeline) as stream:
    for change in stream:
        collection_name = change["ns"]["coll"]
        print(f"Change in {collection_name}: {change['operationType']}")

# Watch all databases (requires appropriate permissions)
with client.watch() as stream:
    for change in stream:
        db_name = change["ns"]["db"]
        coll_name = change["ns"]["coll"]
        print(f"Change in {db_name}.{coll_name}")

Command Monitoring

import pymongo
from pymongo import monitoring
import logging

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class CommandLogger(monitoring.CommandListener):
    def started(self, event):
        logger.info(f"Command {event.command_name} started on {event.connection_id}")
        if event.command_name in ["find", "insert", "update", "delete"]:
            logger.info(f"Command details: {event.command}")

    def succeeded(self, event):
        logger.info(f"Command {event.command_name} succeeded in {event.duration_micros}μs")

    def failed(self, event):
        logger.error(f"Command {event.command_name} failed after {event.duration_micros}μs: {event.failure}")

# Register the listener
monitoring.register(CommandLogger())

# Now all MongoDB commands will be logged
client = pymongo.MongoClient()
collection = client.mydb.mycollection

# These operations will generate log entries
collection.insert_one({"name": "test"})
collection.find_one({"name": "test"})
collection.update_one({"name": "test"}, {"$set": {"updated": True}})

Connection Pool Monitoring

import pymongo
from pymongo import monitoring
from datetime import datetime

class PoolMonitor(monitoring.ConnectionPoolListener):
    def __init__(self):
        self.pool_stats = {}

    def pool_created(self, event):
        print(f"Pool created for {event.address}")
        self.pool_stats[event.address] = {
            "created": datetime.now(),
            "connections": 0,
            "checkouts": 0
        }

    def connection_created(self, event):
        stats = self.pool_stats.get(event.address, {})
        stats["connections"] = stats.get("connections", 0) + 1
        print(f"Connection created for {event.address} (total: {stats['connections']})")

    def connection_checked_out(self, event):
        stats = self.pool_stats.get(event.address, {})
        stats["checkouts"] = stats.get("checkouts", 0) + 1
        print(f"Connection checked out from {event.address}")

    def connection_check_out_failed(self, event):
        print(f"Connection checkout failed for {event.address}: {event.reason}")

    def pool_cleared(self, event):
        print(f"Pool cleared for {event.address}")

    # Remaining ConnectionPoolListener hooks, implemented as no-ops
    def pool_ready(self, event): pass
    def pool_closed(self, event): pass
    def connection_ready(self, event): pass
    def connection_closed(self, event): pass
    def connection_check_out_started(self, event): pass
    def connection_checked_in(self, event): pass

# Register pool monitor
monitoring.register(PoolMonitor())

# Create client (will trigger pool creation)
client = pymongo.MongoClient(maxPoolSize=10, minPoolSize=2)

Performance Monitoring

import pymongo
from pymongo import monitoring
from collections import defaultdict

class PerformanceMonitor(monitoring.CommandListener):
    def __init__(self):
        self.command_times = defaultdict(list)
        self.slow_queries = []
        self._started = {}  # request_id -> command document

    def started(self, event):
        # Command events are immutable, so track per-request state
        # keyed by request_id rather than setting attributes on the event
        self._started[event.request_id] = event.command

    def succeeded(self, event):
        duration_ms = event.duration_micros / 1000
        command_name = event.command_name
        command = self._started.pop(event.request_id, None)

        self.command_times[command_name].append(duration_ms)

        # Record slow commands (>100ms)
        if duration_ms > 100:
            self.slow_queries.append({
                "command": command_name,
                "duration_ms": duration_ms,
                "details": command
            })

    def failed(self, event):
        # Drop tracked state for failed commands
        self._started.pop(event.request_id, None)

    def get_stats(self):
        """Get performance statistics."""
        stats = {}
        for cmd, times in self.command_times.items():
            stats[cmd] = {
                "count": len(times),
                "avg_ms": sum(times) / len(times),
                "max_ms": max(times),
                "min_ms": min(times)
            }
        return stats

    def get_slow_queries(self, limit=10):
        """Get slowest queries."""
        return sorted(
            self.slow_queries,
            key=lambda x: x["duration_ms"],
            reverse=True
        )[:limit]

# Set up monitoring
perf_monitor = PerformanceMonitor()
monitoring.register(perf_monitor)

# Run some operations
client = pymongo.MongoClient()
collection = client.testdb.testcoll

# Generate some operations
for i in range(100):
    collection.insert_one({"index": i, "data": f"test_data_{i}"})

# Create index (slow operation)
collection.create_index("index")

# Run some queries
results = list(collection.find({"index": {"$gt": 50}}).limit(10))

# Get performance stats
stats = perf_monitor.get_stats()
print("Command Performance Stats:")
for cmd, stat in stats.items():
    print(f"{cmd}: {stat['count']} ops, avg: {stat['avg_ms']:.2f}ms")

slow_queries = perf_monitor.get_slow_queries()
print(f"\nTop {len(slow_queries)} Slow Queries:")
for query in slow_queries:
    print(f"{query['command']}: {query['duration_ms']:.2f}ms")

Install with Tessl CLI

npx tessl i tessl/pypi-pymongo
