Fake pymongo stub for testing simple MongoDB-dependent code
—
Aggregation pipeline operations and data analysis functions including pipeline stages, aggregation operators, and distinct value queries. Supports comprehensive data transformation and analysis workflows.
Execute multi-stage aggregation pipelines for complex data transformations and analysis.
def aggregate(self, pipeline, session=None, **kwargs):
"""
Execute an aggregation pipeline.
Parameters:
- pipeline: list, list of aggregation pipeline stages
- session: ClientSession, session to use (ignored)
- **kwargs: additional aggregation options
Returns:
CommandCursor: cursor over aggregation results
Raises:
OperationFailure: if aggregation fails
"""

Usage Example:
collection = mongomock.MongoClient().db.orders
# Basic aggregation pipeline
pipeline = [
{'$match': {'status': 'completed'}},
{'$group': {
'_id': '$customer_id',
'total_amount': {'$sum': '$amount'},
'order_count': {'$sum': 1}
}},
{'$sort': {'total_amount': -1}},
{'$limit': 10}
]
results = list(collection.aggregate(pipeline))
for result in results:
print(f"Customer {result['_id']}: ${result['total_amount']}")
# Complex aggregation with multiple stages
advanced_pipeline = [
# Filter documents
{'$match': {'date': {'$gte': datetime(2023, 1, 1)}}},
# Add computed fields
{'$addFields': {
'month': {'$month': '$date'},
'profit_margin': {'$divide': ['$profit', '$revenue']}
}},
# Group by month
{'$group': {
'_id': '$month',
'total_revenue': {'$sum': '$revenue'},
'total_profit': {'$sum': '$profit'},
'avg_margin': {'$avg': '$profit_margin'},
'order_count': {'$sum': 1}
}},
# Sort by month
{'$sort': {'_id': 1}},
# Project final shape
{'$project': {
'month': '$_id',
'revenue': '$total_revenue',
'profit': '$total_profit',
'margin': {'$round': ['$avg_margin', 4]},
'orders': '$order_count',
'_id': 0
}}
]
monthly_stats = list(collection.aggregate(advanced_pipeline))

Support for standard MongoDB aggregation pipeline stages.
$match - Filtering:
# Filter stage
{'$match': {'status': 'active', 'age': {'$gte': 18}}}
# Complex filters
{'$match': {
'$and': [
{'category': {'$in': ['electronics', 'books']}},
{'price': {'$lt': 100}},
{'in_stock': True}
]
}}

$group - Grouping and Aggregation:
# Group with aggregation functions
{'$group': {
'_id': '$department',
'avg_salary': {'$avg': '$salary'},
'max_salary': {'$max': '$salary'},
'min_salary': {'$min': '$salary'},
'total_employees': {'$sum': 1},
'salary_sum': {'$sum': '$salary'}
}}
# Multiple grouping fields
{'$group': {
'_id': {
'department': '$department',
'level': '$level'
},
'count': {'$sum': 1},
'names': {'$push': '$name'}
}}

$project - Field Selection and Transformation:
# Select and rename fields
{'$project': {
'full_name': {'$concat': ['$first_name', ' ', '$last_name']},
'age_category': {
'$cond': {
'if': {'$gte': ['$age', 65]},
'then': 'senior',
'else': 'adult'
}
},
'email': 1,
'_id': 0
}}
# Mathematical operations
{'$project': {
'name': 1,
'bmi': {
'$divide': [
'$weight',
{'$pow': [{'$divide': ['$height', 100]}, 2]}
]
},
'score_percentage': {'$multiply': ['$score', 100]}
}}

$sort - Sorting:
# Single field sort
{'$sort': {'created_date': -1}}
# Multiple field sort
{'$sort': {'department': 1, 'salary': -1, 'name': 1}}

$limit and $skip - Pagination:
# Limit results
{'$limit': 100}
# Skip documents
{'$skip': 50}
# Pagination example
pipeline = [
{'$match': {'status': 'active'}},
{'$sort': {'created_date': -1}},
{'$skip': 20}, # Skip first 20
{'$limit': 10} # Get next 10
]

Usage Example:
collection = mongomock.MongoClient().db.employees
# Employee statistics by department
department_stats = list(collection.aggregate([
{'$match': {'status': 'active'}},
{'$group': {
'_id': '$department',
'avg_salary': {'$avg': '$salary'},
'employee_count': {'$sum': 1},
'total_salary_budget': {'$sum': '$salary'}
}},
{'$sort': {'avg_salary': -1}},
{'$project': {
'department': '$_id',
'avg_salary': {'$round': ['$avg_salary', 2]},
'employee_count': 1,
'total_budget': '$total_salary_budget',
'_id': 0
}}
]))

Get distinct values for fields with optional filtering.
def distinct(self, key, filter=None, session=None):
"""
Get distinct values for a field.
Parameters:
- key: str, field name to get distinct values for
- filter: dict, optional filter to apply before getting distinct values
- session: ClientSession, session to use (ignored)
Returns:
list: list of distinct values
Raises:
OperationFailure: if operation fails
"""

Usage Example:
collection = mongomock.MongoClient().db.products
# Get all distinct categories
categories = collection.distinct('category')
print(f"Categories: {categories}")
# Get distinct values with filter
active_brands = collection.distinct('brand', {'status': 'active'})
print(f"Active brands: {active_brands}")
# Get distinct values for nested fields
distinct_countries = collection.distinct('supplier.address.country')
# Complex filtering
recent_tags = collection.distinct('tags', {
'created_date': {'$gte': datetime(2023, 1, 1)},
'status': {'$in': ['published', 'featured']}
})

Count documents with various filtering and aggregation options.
def count_documents(self, filter, **kwargs):
"""
Count documents matching filter criteria.
Parameters:
- filter: dict, query filter
- **kwargs: additional count options
Returns:
int: number of matching documents
"""
def estimated_document_count(self, **kwargs):
"""
Get estimated total document count.
Parameters:
- **kwargs: additional estimation options
Returns:
int: estimated document count
"""

Usage Example:
collection = mongomock.MongoClient().db.users
# Count with filter
active_users = collection.count_documents({'status': 'active'})
premium_users = collection.count_documents({'subscription': 'premium'})
# Count with complex filter
recent_active = collection.count_documents({
'status': 'active',
'last_login': {'$gte': datetime.now() - timedelta(days=30)}
})
# Total document count
total_users = collection.estimated_document_count()
print(f"Total: {total_users}, Active: {active_users}, Premium: {premium_users}")

Support for MongoDB aggregation operators in pipeline stages.
Arithmetic Operators:
# Mathematical operations
{'$add': ['$price', '$tax']}
{'$subtract': ['$revenue', '$cost']}
{'$multiply': ['$quantity', '$unit_price']}
{'$divide': ['$total_score', '$test_count']}
{'$mod': ['$value', 10]}
{'$pow': ['$base', '$exponent']}

Comparison Operators:
# Comparison operations
{'$eq': ['$status', 'active']}
{'$ne': ['$type', 'deleted']}
{'$gt': ['$score', 80]}
{'$gte': ['$age', 18]}
{'$lt': ['$price', 100]}
{'$lte': ['$quantity', 0]}

Logical Operators:
# Logical operations
{'$and': [{'$gt': ['$age', 18]}, {'$lt': ['$age', 65]}]}
{'$or': [{'$eq': ['$status', 'premium']}, {'$gt': ['$score', 90]}]}
{'$not': {'$eq': ['$deleted', True]}}

String Operators:
# String operations
{'$concat': ['$first_name', ' ', '$last_name']}
{'$substr': ['$description', 0, 100]}
{'$toLower': '$email'}
{'$toUpper': '$code'}
{'$split': ['$tags', ',']}

Array Operators:
# Array operations
{'$size': '$items'}
{'$push': '$category'}  # accumulator: used inside a $group stage
{'$addToSet': '$tag'}  # accumulator: used inside a $group stage
{'$in': ['premium', '$memberships']}
{'$slice': ['$recent_orders', 5]}

Usage Example:
collection = mongomock.MongoClient().db.orders
# Complex aggregation with operators
pipeline = [
{'$addFields': {
'total_with_tax': {'$multiply': ['$subtotal', 1.08]},
'customer_name': {'$concat': ['$customer.first', ' ', '$customer.last']},
'order_month': {'$month': '$order_date'},
'item_count': {'$size': '$items'},
'is_large_order': {'$gte': ['$subtotal', 1000]}
}},
{'$match': {'is_large_order': True}},
{'$group': {
'_id': '$order_month',
'large_order_count': {'$sum': 1},
'avg_order_value': {'$avg': '$total_with_tax'},
'max_items': {'$max': '$item_count'}
}},
{'$sort': {'_id': 1}}
]
large_order_stats = list(collection.aggregate(pipeline))

Optimize aggregation pipeline performance through proper stage ordering and indexing.
Pipeline Optimization:
# Efficient pipeline (filter early, reduce data flow)
efficient_pipeline = [
{'$match': {'status': 'active'}}, # Filter first
{'$match': {'date': {'$gte': recent_date}}}, # Additional filters early
{'$project': {'needed_field1': 1, 'needed_field2': 1}}, # Select only needed fields
{'$group': {'_id': '$category', 'count': {'$sum': 1}}},
{'$sort': {'count': -1}},
{'$limit': 10}
]
# Index support for aggregation
collection.create_index([('status', 1), ('date', 1)]) # Supports initial $match

Usage Example:
collection = mongomock.MongoClient().db.large_dataset
# Create supporting indexes
collection.create_index('category')
collection.create_index([('status', 1), ('created_date', -1)])
# Optimized aggregation
optimized_results = list(collection.aggregate([
{'$match': {'status': 'published', 'created_date': {'$gte': recent_date}}},
{'$project': {'category': 1, 'views': 1, 'likes': 1}},
{'$group': {
'_id': '$category',
'total_views': {'$sum': '$views'},
'total_likes': {'$sum': '$likes'},
'article_count': {'$sum': 1}
}},
{'$addFields': {
'engagement_ratio': {'$divide': ['$total_likes', '$total_views']}
}},
{'$sort': {'engagement_ratio': -1}},
{'$limit': 5}
]))

Install with Tessl CLI
npx tessl i tessl/pypi-mongomock