Official Python driver for MongoDB providing comprehensive tools for database operations, BSON handling, and GridFS file storage
—
Aggregation pipelines, advanced querying, sorting, pagination, and cursor operations for complex data retrieval patterns.
Execute complex data processing pipelines with multiple stages.
class Collection:
    def aggregate(self, pipeline, session=None, **kwargs):
        """
        Execute an aggregation pipeline on this collection.

        Parameters:
        - pipeline: list of aggregation stages
        - session: optional ClientSession

        Keyword options (not named in the signature, accepted through **kwargs):
        - allowDiskUse: enable disk usage for large operations
        - maxTimeMS: maximum execution time
        - batchSize: cursor batch size
        - collation: collation options
        - hint: index hint

        Returns:
        CommandCursor: Results cursor
        """

    def aggregate_raw_batches(self, pipeline, **kwargs):
        """
        Execute an aggregation pipeline, returning raw BSON batches.

        Parameters:
        - pipeline: list of aggregation stages
        - kwargs: same as aggregate()

        Returns:
        RawBSONDocument batches
        """

    def map_reduce(
        self,
        map,
        reduce,
        out,
        full_response=False,
        session=None,
        **kwargs
    ):
        """
        Execute a map-reduce operation (deprecated - use aggregation).

        NOTE(review): map_reduce is believed to have been removed entirely in
        PyMongo 4.0 - confirm against the driver version this page documents.

        Parameters:
        - map: JavaScript map function
        - reduce: JavaScript reduce function
        - out: output collection specification
        - full_response: return full response (defaults to False)
        - session: optional ClientSession

        Keyword options (not named in the signature, accepted through **kwargs):
        - query: optional query filter
        - sort: optional sort specification
        - limit: optional limit
        - finalize: optional finalize function
        - scope: optional JavaScript scope

        Returns:
        MapReduce results or Collection
        """

Create, manage, and optimize database indexes for query performance.
class Collection:
    def create_index(self, keys, session=None, **kwargs):
        """
        Create a single index on this collection.

        Parameters:
        - keys: index specification (field name or list of tuples)
        - session: optional ClientSession

        Keyword options (not named in the signature, accepted through **kwargs):
        - unique: create unique index
        - background: build index in background (deprecated)
        - sparse: create sparse index
        - expireAfterSeconds: TTL for documents
        - partialFilterExpression: partial index filter
        - collation: collation options

        Returns:
        str: Index name
        """

    def create_indexes(self, indexes, session=None, **kwargs):
        """
        Create multiple indexes in one call.

        Parameters:
        - indexes: list of IndexModel instances
        - session: optional ClientSession

        Returns:
        list: Created index names
        """

    def drop_index(self, index_or_name, session=None, **kwargs):
        """
        Drop a single index.

        Parameters:
        - index_or_name: index name or specification
        - session: optional ClientSession
        """

    def drop_indexes(self, session=None, **kwargs):
        """
        Drop all indexes except the one on _id.

        Parameters:
        - session: optional ClientSession
        """

    def list_indexes(self, session=None):
        """
        List the indexes of this collection.

        Parameters:
        - session: optional ClientSession

        Returns:
        CommandCursor: Index information
        """

    def index_information(self, session=None):
        """
        Get index information as a dictionary.

        Parameters:
        - session: optional ClientSession

        Returns:
        dict: Index information mapping
        """

    def reindex(self, session=None, **kwargs):
        """
        Rebuild all indexes.

        NOTE(review): reindex is believed to have been removed in PyMongo 4.0 -
        confirm against the driver version this page documents.

        Parameters:
        - session: optional ClientSession
        """

Full-text search capabilities with text indexes.
class Collection:
    def find(self, filter=None, **kwargs):
        """
        Find documents, with text search support.

        Text search parameters:
        - filter: can include {"$text": {"$search": "search terms"}}
        - projection: can include text score with {"score": {"$meta": "textScore"}}
        - sort: can sort by text score with [("score", {"$meta": "textScore"})]

        Only filter is named in this abbreviated signature; projection and
        sort are accepted through **kwargs here.
        NOTE(review): the usage example on this page passes the projection
        positionally - confirm against the driver's full find() signature.

        Returns:
        Cursor: Query results
        """

Query documents based on geospatial data and proximity.
class Collection:
    def find(self, filter=None, **kwargs):
        """
        Find documents, with geospatial query support.

        Geospatial operators that may appear in filter:
        - $near: find near a point
        - $nearSphere: spherical near query
        - $geoWithin: find within geometry
        - $geoIntersects: find intersecting geometry
        - $geometry: GeoJSON geometry specification

        Returns:
        Cursor: Query results
        """

    def create_index(self, keys, **kwargs):
        """
        Create geospatial indexes.

        Geospatial index types (used as the index type in keys):
        - "2d": legacy 2D index
        - "2dsphere": spherical geometry index
        - "geoHaystack": haystack index (deprecated)

        NOTE(review): geoHaystack indexes are believed to be unsupported on
        MongoDB 5.0 and later - confirm before relying on this type.

        Returns:
        str: Index name
        """

Complex query patterns and specialized operations.
class Collection:
    def find_raw_batches(self, filter=None, projection=None, **kwargs):
        """
        Find documents, returning raw BSON batches.

        Parameters:
        - filter: query criteria
        - projection: fields to return
        - kwargs: same as find()

        Returns:
        RawBSONDocument batches
        """

    def parallel_scan(self, num_cursors, session=None, **kwargs):
        """
        Scan the collection in parallel.

        NOTE(review): parallel_scan is believed to have been removed in
        PyMongo 4.0 - confirm against the driver version this page documents.

        Parameters:
        - num_cursors: number of parallel cursors
        - session: optional ClientSession

        Returns:
        list: List of CommandCursor instances
        """

    def options(self):
        """
        Get the options of this collection.

        Returns:
        dict: Collection options
        """

    def rename(self, new_name, session=None, **kwargs):
        """
        Rename this collection.

        Parameters:
        - new_name: new collection name
        - session: optional ClientSession

        Keyword options (not named in the signature, accepted through **kwargs):
        - dropTarget: drop target if exists
        """

Advanced cursor manipulation and optimization.
class Cursor:
    def hint(self, index):
        """
        Force use of a specific index.

        Parameters:
        - index: index name or specification

        Returns:
        Cursor: Modified cursor (chainable)
        """

    def max_time_ms(self, max_time_ms):
        """
        Set the maximum execution time.

        Parameters:
        - max_time_ms: maximum time in milliseconds

        Returns:
        Cursor: Modified cursor (chainable)
        """

    def max_scan(self, max_scan):
        """
        Set the maximum number of documents to scan (deprecated).

        Parameters:
        - max_scan: maximum documents to examine

        Returns:
        Cursor: Modified cursor (chainable)
        """

    def min(self, spec):
        """
        Set minimum index bounds.

        Parameters:
        - spec: minimum bound specification

        Returns:
        Cursor: Modified cursor (chainable)
        """

    def max(self, spec):
        """
        Set maximum index bounds.

        Parameters:
        - spec: maximum bound specification

        Returns:
        Cursor: Modified cursor (chainable)
        """

    def comment(self, comment):
        """
        Add a comment to the query for profiling.

        Parameters:
        - comment: query comment

        Returns:
        Cursor: Modified cursor (chainable)
        """

    def collation(self, collation):
        """
        Set the collation used for string comparison.

        Parameters:
        - collation: collation specification

        Returns:
        Cursor: Modified cursor (chainable)
        """

    def allow_disk_use(self, allow_disk_use):
        """
        Allow disk usage for large sorts.

        Parameters:
        - allow_disk_use: enable disk usage

        Returns:
        Cursor: Modified cursor (chainable)
        """

    def explain(self, verbosity='queryPlanner'):
        """
        Get the query execution plan.

        NOTE(review): confirm that the driver's Cursor.explain() accepts a
        verbosity argument; the usage example on this page calls it with none.

        Parameters:
        - verbosity: explanation verbosity level (defaults to 'queryPlanner')

        Returns:
        dict: Query execution plan
        """

from pymongo import MongoClient
# Connect with the driver defaults and pick the collection used by every
# example that follows.
client = MongoClient()
db = client.sales
collection = db.orders

# Group by category and calculate totals
# NOTE(review): $match compares `date` against a string, so this presumably
# expects ISO-8601 date strings in the documents - confirm; a BSON date field
# would need a datetime value here instead.
pipeline = [
    {"$match": {"date": {"$gte": "2023-01-01"}}},
    {"$group": {
        "_id": "$category",
        "total_sales": {"$sum": "$amount"},
        "order_count": {"$sum": 1},
        "avg_order": {"$avg": "$amount"}
    }},
    {"$sort": {"total_sales": -1}},
    {"$limit": 10}
]
results = collection.aggregate(pipeline)
for doc in results:
    # The loop body must be indented; at column 0 this is a syntax error.
    print(f"Category: {doc['_id']}, Sales: ${doc['total_sales']:.2f}")

# Complex pipeline with multiple stages: one document per order item, joined
# to its product, then rolled up per customer.
pipeline = [
    {"$unwind": "$items"},
    {"$lookup": {
        "from": "products",
        "localField": "items.product_id",
        "foreignField": "_id",
        "as": "product_info"
    }},
    {"$project": {
        "customer": 1,
        "item_total": {"$multiply": ["$items.quantity", "$items.price"]},
        "product_name": {"$arrayElemAt": ["$product_info.name", 0]}
    }},
    {"$group": {
        "_id": "$customer",
        "total_spent": {"$sum": "$item_total"},
        "products": {"$addToSet": "$product_name"}
    }}
]
customer_analysis = collection.aggregate(pipeline, allowDiskUse=True)

# Index-type constants used by the index management examples.
from pymongo import ASCENDING, DESCENDING, GEO2D, TEXT
# Create simple index
collection.create_index("email", unique=True)

# Create compound index
collection.create_index([
    ("category", ASCENDING),
    ("price", DESCENDING)
])

# Create text index for search
collection.create_index([
    ("title", TEXT),
    ("description", TEXT)
], default_language='english')

# Create geospatial index
collection.create_index([("location", GEO2D)])

# Create TTL index for expiration
collection.create_index("expire_at", expireAfterSeconds=3600)

# Create partial index
# NOTE(review): this reuses the "email" key already indexed (as unique) above,
# so both calls presumably resolve to the same default index name - confirm
# whether the partial index needs its own `name=` or a different field when
# the two snippets are run against the same collection.
collection.create_index(
    "email",
    partialFilterExpression={"email": {"$exists": True}}
)

# List all indexes
for index in collection.list_indexes():
    # The loop body must be indented; at column 0 this is a syntax error.
    print(f"Index: {index['name']}, Keys: {index['key']}")

# Create text index
collection.create_index([("title", TEXT), ("content", TEXT)])

# Search for documents, projecting the text score and sorting by it
results = collection.find(
    {"$text": {"$search": "python mongodb"}},
    {"score": {"$meta": "textScore"}}
).sort([("score", {"$meta": "textScore"})])
for doc in results:
    # The loop body must be indented; at column 0 this is a syntax error.
    print(f"Title: {doc['title']}, Score: {doc['score']}")

# Advanced text search
results = collection.find({
    "$text": {
        "$search": "\"exact phrase\" -excluded +required",
        "$language": "english",
        "$caseSensitive": False
    }
})

# Create 2dsphere index for GeoJSON
collection.create_index(
    [("location", "2dsphere")]
)

# Proximity query: documents whose location lies near the given GeoJSON point
nearby = collection.find(
    {
        "location": {
            "$near": {
                "$geometry": {
                    "type": "Point",
                    "coordinates": [-73.9857, 40.7484],  # NYC coordinates
                },
                "$maxDistance": 1000,  # meters
            },
        },
    }
)

# Containment query: documents whose location falls inside a closed polygon
# ring (the first and last vertices are the same point)
within_area = collection.find(
    {
        "location": {
            "$geoWithin": {
                "$geometry": {
                    "type": "Polygon",
                    "coordinates": [
                        [
                            [-74.0, 40.7],
                            [-73.9, 40.7],
                            [-73.9, 40.8],
                            [-74.0, 40.8],
                            [-74.0, 40.7],
                        ]
                    ],
                },
            },
        },
    }
)

# Use query hints and optimization
cursor = (
    collection.find({"category": "electronics"})
    .hint("category_1_price_-1")
    .max_time_ms(5000)
    .comment("Product search query")
    .allow_disk_use(True)
)

# Attach a collation to the cursor for string comparison
cursor = (
    collection.find({"name": {"$regex": "^a"}})
    .collation(
        {
            "locale": "en",
            "strength": 1,  # Case insensitive
            "caseLevel": False,
        }
    )
)

# Ask for the query execution plan instead of the documents
plan = (
    collection.find({"price": {"$gt": 100}})
    .explain()
)
print(f"Query plan: {plan['queryPlanner']['winningPlan']}")

Install with Tessl CLI
npx tessl i tessl/pypi-pymongo