CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-pymongo

Official Python driver for MongoDB providing comprehensive tools for database operations, BSON handling, and GridFS file storage

Pending
Overview
Eval results
Files

docs/advanced-queries.md

Advanced Queries and Aggregation

Aggregation pipelines, advanced querying, sorting, pagination, and cursor operations for complex data retrieval patterns.

Capabilities

Aggregation Pipelines

Execute complex data processing pipelines with multiple stages.

class Collection:
    def aggregate(self, pipeline, session=None, **kwargs):
        """
        Execute an aggregation pipeline on this collection.

        Parameters:
        - pipeline: list of aggregation stages
        - session: optional ClientSession

        Options passed through **kwargs:
        - allowDiskUse: enable disk usage for large operations
        - maxTimeMS: maximum execution time
        - batchSize: cursor batch size
        - collation: collation options
        - hint: index hint

        Returns:
        CommandCursor: Results cursor
        """

    def aggregate_raw_batches(self, pipeline, **kwargs):
        """
        Execute an aggregation pipeline, returning raw BSON batches
        instead of decoded documents.

        Parameters:
        - pipeline: list of aggregation stages
        - kwargs: same options as aggregate()

        Returns:
        RawBSONDocument batches
        """

    def map_reduce(
        self,
        map,
        reduce,
        out,
        full_response=False,
        session=None,
        **kwargs
    ):
        """
        Execute a map-reduce operation (deprecated - use aggregation).

        NOTE(review): this method appears to have been removed entirely in
        PyMongo 4.0 - confirm against the PyMongo version this page targets.

        Parameters:
        - map: JavaScript map function
        - reduce: JavaScript reduce function
        - out: output collection specification
        - full_response: return full response
        - session: optional ClientSession

        Options passed through **kwargs:
        - query: optional query filter
        - sort: optional sort specification
        - limit: optional limit
        - finalize: optional finalize function
        - scope: optional JavaScript scope

        Returns:
        MapReduce results or Collection
        """

Index Management

Create, manage, and optimize database indexes for query performance.

class Collection:
    def create_index(self, keys, session=None, **kwargs):
        """
        Create a single index on this collection.

        Parameters:
        - keys: index specification (field name or list of tuples)
        - session: optional ClientSession

        Options passed through **kwargs:
        - unique: create unique index
        - background: build index in background (deprecated)
        - sparse: create sparse index
        - expireAfterSeconds: TTL for documents
        - partialFilterExpression: partial index filter
        - collation: collation options

        Returns:
        str: Index name
        """

    def create_indexes(self, indexes, session=None, **kwargs):
        """
        Create several indexes in one call.

        Parameters:
        - indexes: list of IndexModel instances
        - session: optional ClientSession

        Returns:
        list: Created index names
        """

    def drop_index(self, index_or_name, session=None, **kwargs):
        """
        Drop a single index.

        Parameters:
        - index_or_name: index name or specification
        - session: optional ClientSession
        """

    def drop_indexes(self, session=None, **kwargs):
        """
        Drop all indexes except the one on _id.

        Parameters:
        - session: optional ClientSession
        """

    def list_indexes(self, session=None):
        """
        List the indexes of this collection.

        Parameters:
        - session: optional ClientSession

        Returns:
        CommandCursor: Index information
        """

    def index_information(self, session=None):
        """
        Get index information as a dictionary.

        Parameters:
        - session: optional ClientSession

        Returns:
        dict: Index information mapping
        """

    def reindex(self, session=None, **kwargs):
        """
        Rebuild all indexes.

        NOTE(review): this method appears to have been removed in
        PyMongo 4.0 - confirm against the PyMongo version this page targets.

        Parameters:
        - session: optional ClientSession
        """

Text Search

Full-text search capabilities with text indexes.

class Collection:
    def find(self, filter=None, **kwargs):
        """
        Query the collection; shown here for its text search support.

        Text search parameters:
        - filter: can include {"$text": {"$search": "search terms"}}
        - projection (via **kwargs): can include the text score with
          {"score": {"$meta": "textScore"}}
        - sort (via **kwargs): can sort by text score with
          [("score", {"$meta": "textScore"})]

        Returns:
        Cursor: Query results
        """

Geospatial Queries

Query documents based on geospatial data and proximity.

class Collection:
    def find(self, filter=None, **kwargs):
        """
        Query the collection; shown here for its geospatial query support.

        Geospatial operators usable inside filter:
        - $near: find near a point
        - $nearSphere: spherical near query
        - $geoWithin: find within geometry
        - $geoIntersects: find intersecting geometry
        - $geometry: GeoJSON geometry specification

        Returns:
        Cursor: Query results
        """

    def create_index(self, keys, **kwargs):
        """
        Create an index; shown here for geospatial index types.

        Geospatial index types usable in keys:
        - "2d": legacy 2D index
        - "2dsphere": spherical geometry index
        - "geoHaystack": haystack index (deprecated)

        NOTE(review): geoHaystack indexes appear to have been removed in
        MongoDB 5.0 - confirm against the server version in use.

        Returns:
        str: Index name
        """

Advanced Query Operations

Complex query patterns and specialized operations.

class Collection:
    def find_raw_batches(self, filter=None, projection=None, **kwargs):
        """
        Query the collection, returning raw BSON batches instead of
        decoded documents.

        Parameters:
        - filter: query criteria
        - projection: fields to return
        - kwargs: same options as find()

        Returns:
        RawBSONDocument batches
        """

    def parallel_scan(self, num_cursors, session=None, **kwargs):
        """
        Scan the collection in parallel.

        NOTE(review): this method appears to have been removed in
        PyMongo 4.0 - confirm against the PyMongo version this page targets.

        Parameters:
        - num_cursors: number of parallel cursors
        - session: optional ClientSession

        Returns:
        list: List of CommandCursor instances
        """

    def options(self):
        """
        Get the options set on this collection.

        Returns:
        dict: Collection options
        """

    def rename(self, new_name, session=None, **kwargs):
        """
        Rename this collection.

        Parameters:
        - new_name: new collection name
        - session: optional ClientSession

        Options passed through **kwargs:
        - dropTarget: drop target if exists
        """

Cursor Advanced Operations

Advanced cursor manipulation and optimization.

class Cursor:
    def hint(self, index):
        """
        Force the query to use a specific index.

        Parameters:
        - index: index name or specification

        Returns:
        Cursor: Modified cursor (chainable)
        """

    def max_time_ms(self, max_time_ms):
        """
        Set the maximum execution time for the query.

        Parameters:
        - max_time_ms: maximum time in milliseconds

        Returns:
        Cursor: Modified cursor (chainable)
        """

    def max_scan(self, max_scan):
        """
        Set the maximum number of documents to scan (deprecated).

        NOTE(review): this method appears to have been removed in
        PyMongo 4.0 - confirm against the PyMongo version this page targets.

        Parameters:
        - max_scan: maximum documents to examine

        Returns:
        Cursor: Modified cursor (chainable)
        """

    def min(self, spec):
        """
        Set the minimum index bounds for the query.

        NOTE(review): recent PyMongo versions appear to require hint() to
        be set when min() is used - confirm.

        Parameters:
        - spec: minimum bound specification

        Returns:
        Cursor: Modified cursor (chainable)
        """

    def max(self, spec):
        """
        Set the maximum index bounds for the query.

        NOTE(review): recent PyMongo versions appear to require hint() to
        be set when max() is used - confirm.

        Parameters:
        - spec: maximum bound specification

        Returns:
        Cursor: Modified cursor (chainable)
        """

    def comment(self, comment):
        """
        Attach a comment to the query for profiling.

        Parameters:
        - comment: query comment

        Returns:
        Cursor: Modified cursor (chainable)
        """

    def collation(self, collation):
        """
        Set the collation used for string comparison.

        Parameters:
        - collation: collation specification

        Returns:
        Cursor: Modified cursor (chainable)
        """

    def allow_disk_use(self, allow_disk_use):
        """
        Allow disk usage for large sorts.

        Parameters:
        - allow_disk_use: enable disk usage

        Returns:
        Cursor: Modified cursor (chainable)
        """

    def explain(self, verbosity='queryPlanner'):
        """
        Get the query execution plan.

        NOTE(review): PyMongo's Cursor.explain() appears to take no
        arguments; other verbosity levels are obtained by running the
        explain command directly - confirm this signature.

        Parameters:
        - verbosity: explanation verbosity level (defaults to
          'queryPlanner')

        Returns:
        dict: Query execution plan
        """

Usage Examples

Aggregation Pipeline

from pymongo import MongoClient

# Connect with default settings and pick the collection to analyze.
client = MongoClient()
db = client.sales
collection = db.orders

# --- Sales per category: filter, group, rank, keep the top ten ---
# NOTE(review): the filter compares "date" against an ISO-format string; if
# the field is stored as a BSON date this needs a datetime value - confirm
# against the collection's schema.
match_recent = {"$match": {"date": {"$gte": "2023-01-01"}}}
group_by_category = {
    "$group": {
        "_id": "$category",
        "total_sales": {"$sum": "$amount"},
        "order_count": {"$sum": 1},
        "avg_order": {"$avg": "$amount"},
    }
}
sort_by_sales_desc = {"$sort": {"total_sales": -1}}
top_ten = {"$limit": 10}

pipeline = [match_recent, group_by_category, sort_by_sales_desc, top_ten]

results = collection.aggregate(pipeline)
for doc in results:
    print(f"Category: {doc['_id']}, Sales: ${doc['total_sales']:.2f}")

# --- Spend per customer: one row per line item, joined to its product ---
unwind_items = {"$unwind": "$items"}
join_products = {
    "$lookup": {
        "from": "products",
        "localField": "items.product_id",
        "foreignField": "_id",
        "as": "product_info",
    }
}
shape_line_item = {
    "$project": {
        "customer": 1,
        "item_total": {"$multiply": ["$items.quantity", "$items.price"]},
        # $lookup yields an array; take the first matching product's name.
        "product_name": {"$arrayElemAt": ["$product_info.name", 0]},
    }
}
group_by_customer = {
    "$group": {
        "_id": "$customer",
        "total_spent": {"$sum": "$item_total"},
        "products": {"$addToSet": "$product_name"},
    }
}

pipeline = [unwind_items, join_products, shape_line_item, group_by_customer]

# allowDiskUse lets the server use disk for large operations.
customer_analysis = collection.aggregate(pipeline, allowDiskUse=True)

Index Management

from pymongo import ASCENDING, DESCENDING, GEO2D, TEXT

# Create simple unique index (auto-generated name: "email_1")
collection.create_index("email", unique=True)

# Create compound index: category ascending, then price descending
collection.create_index([
    ("category", ASCENDING),
    ("price", DESCENDING)
])

# Create text index for search
collection.create_index([
    ("title", TEXT),
    ("description", TEXT)
], default_language='english')

# Create legacy 2D geospatial index
collection.create_index([("location", GEO2D)])

# Create TTL index: documents expire 3600 seconds after their expire_at time
collection.create_index("expire_at", expireAfterSeconds=3600)

# Create partial index.
# An explicit name is required here: without it this index would get the
# same auto-generated name ("email_1") as the unique index above while
# carrying different options, and the server rejects that as a conflict.
# NOTE(review): older servers may reject a second index on the same key
# pattern even under a different name - confirm against the target server.
collection.create_index(
    "email",
    name="email_partial",
    partialFilterExpression={"email": {"$exists": True}}
)

# List all indexes
for index in collection.list_indexes():
    print(f"Index: {index['name']}, Keys: {index['key']}")

Text Search

# Create text index
# NOTE(review): a collection can hold only one text index; if a text index
# already exists on this collection, this call will fail - confirm the
# snippets are meant to run against separate collections.
collection.create_index([("title", TEXT), ("content", TEXT)])

# Search for documents, projecting and sorting by relevance score
results = collection.find(
    {"$text": {"$search": "python mongodb"}},
    {"score": {"$meta": "textScore"}}
).sort([("score", {"$meta": "textScore"})])

for doc in results:
    print(f"Title: {doc['title']}, Score: {doc['score']}")

# Advanced text search.
# $search supports quoted phrases and "-" negation only; there is no
# "+term" operator, so plain terms are written without a prefix.
results = collection.find({
    "$text": {
        "$search": "\"exact phrase\" term -excluded",
        "$language": "english",
        "$caseSensitive": False
    }
})

Geospatial Queries

# GeoJSON queries need a 2dsphere index on the queried field.
collection.create_index([("location", "2dsphere")])

# Documents within 1000 meters of a point.
nyc_point = {
    "type": "Point",
    "coordinates": [-73.9857, 40.7484],  # NYC coordinates
}
near_clause = {
    "$geometry": nyc_point,
    "$maxDistance": 1000,  # meters
}
nearby = collection.find({"location": {"$near": near_clause}})

# Documents inside a polygon; the ring is closed by repeating the first
# vertex as the last one.
area_ring = [
    [-74.0, 40.7], [-73.9, 40.7],
    [-73.9, 40.8], [-74.0, 40.8],
    [-74.0, 40.7],
]
area_polygon = {"type": "Polygon", "coordinates": [area_ring]}
within_area = collection.find(
    {"location": {"$geoWithin": {"$geometry": area_polygon}}}
)

Advanced Cursor Operations

# Use query hints and optimization.
# The chain is wrapped in parentheses rather than using backslash
# continuations, which break silently on trailing whitespace.
cursor = (
    collection.find({"category": "electronics"})
    .hint("category_1_price_-1")
    .max_time_ms(5000)
    .comment("Product search query")
    .allow_disk_use(True)
)

# Set collation for string comparison.
# An equality match is used because $regex is not collation-aware: a
# collation attached to a $regex filter has no effect on the match.
cursor = collection.find({"name": "alice"}).collation({
    "locale": "en",
    "strength": 1,  # Compare base characters only (ignores case)
    "caseLevel": False
})

# Get query execution plan
plan = collection.find({"price": {"$gt": 100}}).explain()
print(f"Query plan: {plan['queryPlanner']['winningPlan']}")

Install with Tessl CLI

npx tessl i tessl/pypi-pymongo

docs

advanced-queries.md

bson-handling.md

bulk-transactions.md

client-connection.md

database-collection.md

gridfs-storage.md

index.md

monitoring-events.md

tile.json