Official Python driver for MongoDB providing comprehensive tools for database operations, BSON handling, and GridFS file storage
—
Change streams, monitoring capabilities, and event handling for real-time data updates and application performance monitoring.
Monitor real-time changes to collections, databases, or entire deployments.
class Collection:
    def watch(self, pipeline=None, full_document=None, resume_after=None,
              max_await_time_ms=None, batch_size=None, collation=None,
              start_at_operation_time=None, session=None, start_after=None,
              show_expanded_events=None):
        """Open a change stream that reports changes to this collection.

        Parameters:
        - pipeline: aggregation stages used to filter/transform change events
        - full_document: full-document mode ('default', 'updateLookup',
          'whenAvailable', 'required')
        - resume_after: resume token identifying the point to continue from
        - max_await_time_ms: maximum time to block waiting for new changes
        - batch_size: number of change events per batch
        - collation: collation options
        - start_at_operation_time: timestamp to begin watching from
        - session: optional ClientSession
        - start_after: change event token to start watching after
        - show_expanded_events: include additional change event types
        Returns:
        ChangeStream: Change stream cursor
        """
class Database:
    def watch(self, pipeline=None, **kwargs):
        """Open a change stream that reports changes across this database.

        Parameters:
        - pipeline: aggregation stages used to filter change events
        - kwargs: same keyword options as Collection.watch()
        Returns:
        ChangeStream: Change stream cursor
        """
class MongoClient:
def watch(self, pipeline=None, **kwargs):
"""
Open change stream to monitor all database changes.
Parameters:
- pipeline: aggregation pipeline for filtering
- kwargs: same options as Collection.watch()
Returns:
ChangeStream: Change stream cursor
"""Handle and process change stream events.
class ChangeStream:
def __iter__(self):
"""Iterate over change events."""
def __next__(self):
"""Get next change event."""
def next(self):
"""Get next change event (Python 2 compatibility)."""
def try_next(self):
"""
Try to get next change event without blocking.
Returns:
dict: Change event or None if no events available
"""
def close(self):
"""Close the change stream."""
@property
def alive(self):
"""
Check if change stream is alive.
Returns:
bool: True if stream is active
"""
@property
def resume_token(self):
"""
Get current resume token.
Returns:
dict: Resume token for stream continuation
"""
def __enter__(self):
"""Context manager entry."""
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit."""Structure of change stream events.
# Change event document structure:
{
"_id": { # Resume token
"_data": "resume_token_string"
},
"operationType": "insert|update|replace|delete|drop|rename|dropDatabase|invalidate",
"clusterTime": "Timestamp(...)",
"ns": { # Namespace
"db": "database_name",
"coll": "collection_name"
},
"documentKey": { # Document identifier
"_id": "ObjectId(...)"
},
"fullDocument": {...}, # Full document (based on full_document option)
"fullDocumentBeforeChange": {...}, # Previous document state
"updateDescription": { # For update operations
"updatedFields": {...},
"removedFields": [...],
"truncatedArrays": [...]
}
}Monitor database commands for performance and debugging.
from pymongo import monitoring

class CommandListener(monitoring.CommandListener):
    """Receives a notification for each phase of a server command."""

    def started(self, event):
        """Invoked when a command starts.

        Parameters:
        - event: CommandStartedEvent
        """

    def succeeded(self, event):
        """Invoked when a command completes successfully.

        Parameters:
        - event: CommandSucceededEvent
        """

    def failed(self, event):
        """Invoked when a command fails.

        Parameters:
        - event: CommandFailedEvent
        """
class CommandStartedEvent:
    """Details of a command that has been sent to the server."""

    @property
    def command_name(self):
        """Name of the command."""

    @property
    def request_id(self):
        """Identifier of the request."""

    @property
    def connection_id(self):
        """Identifier of the connection used."""

    @property
    def command(self):
        """The command document."""
class CommandSucceededEvent:
    """Details of a command that completed successfully."""

    @property
    def duration_micros(self):
        """How long the command took, in microseconds."""

    @property
    def reply(self):
        """The server's reply document."""

    @property
    def command_name(self):
        """Name of the command."""

    @property
    def request_id(self):
        """Identifier of the request."""
class CommandFailedEvent:
    """Details of a command that failed."""

    @property
    def duration_micros(self):
        """How long the command took, in microseconds."""

    @property
    def failure(self):
        """Details of the failure."""

    @property
    def command_name(self):
        """Name of the command."""

    @property
    def request_id(self):
        """Identifier of the request."""
# Register command listener
monitoring.register(CommandListener())Monitor connection pool events for performance tuning.
from pymongo import monitoring
class PoolListener(monitoring.PoolListener):
def pool_created(self, event):
"""
Handle pool created event.
Parameters:
- event: PoolCreatedEvent
"""
def pool_ready(self, event):
"""
Handle pool ready event.
Parameters:
- event: PoolReadyEvent
"""
def pool_cleared(self, event):
"""
Handle pool cleared event.
Parameters:
- event: PoolClearedEvent
"""
def pool_closed(self, event):
"""
Handle pool closed event.
Parameters:
- event: PoolClosedEvent
"""
def connection_created(self, event):
"""
Handle connection created event.
Parameters:
- event: ConnectionCreatedEvent
"""
def connection_ready(self, event):
"""
Handle connection ready event.
Parameters:
- event: ConnectionReadyEvent
"""
def connection_closed(self, event):
"""
Handle connection closed event.
Parameters:
- event: ConnectionClosedEvent
"""
def connection_check_out_started(self, event):
"""
Handle connection checkout started event.
Parameters:
- event: ConnectionCheckOutStartedEvent
"""
def connection_check_out_failed(self, event):
"""
Handle connection checkout failed event.
Parameters:
- event: ConnectionCheckOutFailedEvent
"""
def connection_checked_out(self, event):
"""
Handle connection checked out event.
Parameters:
- event: ConnectionCheckedOutEvent
"""
def connection_checked_in(self, event):
"""
Handle connection checked in event.
Parameters:
- event: ConnectionCheckedInEvent
"""
# Register pool listener
monitoring.register(PoolListener())Monitor server discovery and topology changes.
from pymongo import monitoring
class ServerListener(monitoring.ServerListener):
def opened(self, event):
"""
Handle server opened event.
Parameters:
- event: ServerOpeningEvent
"""
def description_changed(self, event):
"""
Handle server description changed event.
Parameters:
- event: ServerDescriptionChangedEvent
"""
def closed(self, event):
"""
Handle server closed event.
Parameters:
- event: ServerClosedEvent
"""
class TopologyListener(monitoring.TopologyListener):
def opened(self, event):
"""
Handle topology opened event.
Parameters:
- event: TopologyOpenedEvent
"""
def description_changed(self, event):
"""
Handle topology description changed event.
Parameters:
- event: TopologyDescriptionChangedEvent
"""
def closed(self, event):
"""
Handle topology closed event.
Parameters:
- event: TopologyClosedEvent
"""
# Register server and topology listeners
monitoring.register(ServerListener())
monitoring.register(TopologyListener())Monitor server heartbeat events for connection health.
from pymongo import monitoring

class HeartbeatListener(monitoring.ServerHeartbeatListener):
    """Receives server heartbeat (ismaster/hello) notifications."""

    def started(self, event):
        """Invoked when a heartbeat starts.

        Parameters:
        - event: ServerHeartbeatStartedEvent
        """

    def succeeded(self, event):
        """Invoked when a heartbeat succeeds.

        Parameters:
        - event: ServerHeartbeatSucceededEvent
        """

    def failed(self, event):
        """Invoked when a heartbeat fails.

        Parameters:
        - event: ServerHeartbeatFailedEvent
        """

# Activate the heartbeat listener
monitoring.register(HeartbeatListener())
from pymongo import MongoClient
import pymongo

client = MongoClient()
db = client.mydb
collection = db.orders

# Stream every change made to the collection
with collection.watch() as change_stream:
    for event in change_stream:
        print(f"Change detected: {event['operationType']}")
        print(f"Document: {event.get('fullDocument', 'N/A')}")
        print(f"Resume token: {event['_id']}")

# Restrict the stream with an aggregation pipeline
pipeline = [
    {"$match": {"operationType": {"$in": ["insert", "update"]}}},
    {"$match": {"fullDocument.status": "urgent"}},
]
with collection.watch(pipeline) as change_stream:
    for event in change_stream:
        print(f"Urgent order change: {event['fullDocument']['_id']}")

# Ask the server to look up the post-update document on update events
with collection.watch(full_document="updateLookup") as change_stream:
    for event in change_stream:
        if event["operationType"] == "update":
            print(f"Updated document: {event['fullDocument']}")
            print(f"Changed fields: {event['updateDescription']['updatedFields']}")
from pymongo import MongoClient
from pymongo.errors import PyMongoError
import time

client = MongoClient()
collection = client.mydb.inventory
resume_token = None

def process_changes():
    """Watch the inventory collection and process its change events.

    Resumes from the last saved token after a transient stream error.
    Uses a retry loop instead of recursion: the original version called
    itself from the except-handler, so repeated failures would grow the
    call stack without bound and eventually raise RecursionError.
    """
    global resume_token
    while True:
        try:
            # Resume from last token if available
            with collection.watch(resume_after=resume_token) as stream:
                for change in stream:
                    # Process change event
                    process_inventory_change(change)
                    # Save resume token so a retry can pick up where we left off
                    resume_token = stream.resume_token
            return  # stream ended without error
        except PyMongoError as e:
            print(f"Change stream error: {e}")
            time.sleep(5)  # Wait before retry; loop resumes with last token

def process_inventory_change(change):
    """Process a single inventory change event."""
    op_type = change["operationType"]
    if op_type == "insert":
        print(f"New product added: {change['fullDocument']['name']}")
    elif op_type == "update":
        updates = change["updateDescription"]["updatedFields"]
        if "quantity" in updates:
            print(f"Quantity updated for {change['documentKey']['_id']}")
    elif op_type == "delete":
        print(f"Product deleted: {change['documentKey']['_id']}")

# Start monitoring
process_changes()
from pymongo import MongoClient
client = MongoClient()
db = client.ecommerce

# Watch selected collections within one database
pipeline = [
    {"$match": {"ns.coll": {"$in": ["orders", "inventory", "customers"]}}},
    {"$project": {
        "operationType": 1,
        "ns": 1,
        "documentKey": 1,
        "fullDocument.status": 1,
    }},
]
with db.watch(pipeline) as change_stream:
    for event in change_stream:
        collection_name = event["ns"]["coll"]
        print(f"Change in {collection_name}: {event['operationType']}")

# Watch every database on the deployment (requires appropriate permissions)
with client.watch() as change_stream:
    for event in change_stream:
        db_name = event["ns"]["db"]
        coll_name = event["ns"]["coll"]
        print(f"Change in {db_name}.{coll_name}")
import pymongo
from pymongo import monitoring
import logging

# Configure logging once for the process
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class CommandLogger(monitoring.CommandListener):
    """Logs every command issued through clients created after registration."""

    def started(self, event):
        logger.info(f"Command {event.command_name} started on {event.connection_id}")
        # Only CRUD-style commands get their full payload logged
        if event.command_name in ["find", "insert", "update", "delete"]:
            logger.info(f"Command details: {event.command}")

    def succeeded(self, event):
        logger.info(f"Command {event.command_name} succeeded in {event.duration_micros}μs")

    def failed(self, event):
        logger.error(f"Command {event.command_name} failed after {event.duration_micros}μs: {event.failure}")

# Register the listener
monitoring.register(CommandLogger())

# Every MongoDB command issued from here on is logged
client = pymongo.MongoClient()
collection = client.mydb.mycollection

# Each of these operations produces log entries
collection.insert_one({"name": "test"})
collection.find_one({"name": "test"})
collection.update_one({"name": "test"}, {"$set": {"updated": True}})
import pymongo
from pymongo import monitoring
from datetime import datetime

class PoolMonitor(monitoring.PoolListener):
    """Collects simple per-address statistics about connection pool activity."""

    def __init__(self):
        # address -> {"created": datetime, "connections": int, "checkouts": int}
        self.pool_stats = {}

    def _stats(self, address):
        # setdefault keeps the counters inside self.pool_stats even when an
        # event arrives for an address with no prior pool_created. The
        # original `self.pool_stats.get(addr, {})` wrote counter updates into
        # a throwaway dict in that case, silently losing them.
        return self.pool_stats.setdefault(
            address, {"connections": 0, "checkouts": 0}
        )

    def pool_created(self, event):
        print(f"Pool created for {event.address}")
        self._stats(event.address)["created"] = datetime.now()

    def connection_created(self, event):
        stats = self._stats(event.address)
        stats["connections"] += 1
        print(f"Connection created for {event.address} (total: {stats['connections']})")

    def connection_checked_out(self, event):
        stats = self._stats(event.address)
        stats["checkouts"] += 1
        print(f"Connection checked out from {event.address}")

    def connection_check_out_failed(self, event):
        print(f"Connection checkout failed for {event.address}: {event.reason}")

    def pool_cleared(self, event):
        print(f"Pool cleared for {event.address}")

# Register pool monitor
monitoring.register(PoolMonitor())

# Create client (will trigger pool creation)
client = pymongo.MongoClient(maxPoolSize=10, minPoolSize=2)
import pymongo
from pymongo import monitoring
import time
from collections import defaultdict
class PerformanceMonitor(monitoring.CommandListener):
def __init__(self):
self.command_times = defaultdict(list)
self.slow_queries = []
def started(self, event):
event.start_time = time.time()
def succeeded(self, event):
duration_ms = event.duration_micros / 1000
command_name = event.command_name
self.command_times[command_name].append(duration_ms)
# Log slow queries (>100ms)
if duration_ms > 100:
self.slow_queries.append({
"command": command_name,
"duration_ms": duration_ms,
"details": event.command
})
def get_stats(self):
"""Get performance statistics."""
stats = {}
for cmd, times in self.command_times.items():
stats[cmd] = {
"count": len(times),
"avg_ms": sum(times) / len(times),
"max_ms": max(times),
"min_ms": min(times)
}
return stats
def get_slow_queries(self, limit=10):
"""Get slowest queries."""
return sorted(
self.slow_queries,
key=lambda x: x["duration_ms"],
reverse=True
)[:limit]
# Set up monitoring
perf_monitor = PerformanceMonitor()
monitoring.register(perf_monitor)
# Run some operations
client = pymongo.MongoClient()
collection = client.testdb.testcoll
# Generate some operations
for i in range(100):
collection.insert_one({"index": i, "data": f"test_data_{i}"})
# Create index (slow operation)
collection.create_index("index")
# Run some queries
collection.find({"index": {"$gt": 50}}).limit(10).to_list()
# Get performance stats
stats = perf_monitor.get_stats()
print("Command Performance Stats:")
for cmd, stat in stats.items():
print(f"{cmd}: {stat['count']} ops, avg: {stat['avg_ms']:.2f}ms")
slow_queries = perf_monitor.get_slow_queries()
print(f"\nTop {len(slow_queries)} Slow Queries:")
for query in slow_queries:
print(f"{query['command']}: {query['duration_ms']:.2f}ms")Install with Tessl CLI
npx tessl i tessl/pypi-pymongo