0
# Aggregation and Analysis
1
2
This section covers aggregation pipeline operations and data analysis functions, including pipeline stages, aggregation operators, distinct value queries, and document counting. Together they support comprehensive data transformation and analysis workflows.
3
4
## Capabilities
5
6
### Aggregation Pipeline
7
8
Execute multi-stage aggregation pipelines for complex data transformations and analysis.
9
10
```python { .api }
11
def aggregate(self, pipeline, session=None, **kwargs):
12
"""
13
Execute an aggregation pipeline.
14
15
Parameters:
16
- pipeline: list, list of aggregation pipeline stages
17
- session: ClientSession, session to use (ignored)
18
- **kwargs: additional aggregation options
19
20
Returns:
21
CommandCursor: cursor over aggregation results
22
23
Raises:
24
OperationFailure: if aggregation fails
25
"""
26
```
27
28
**Usage Example:**
29
30
```python
31
collection = mongomock.MongoClient().db.orders
32
33
# Basic aggregation pipeline
34
pipeline = [
35
{'$match': {'status': 'completed'}},
36
{'$group': {
37
'_id': '$customer_id',
38
'total_amount': {'$sum': '$amount'},
39
'order_count': {'$sum': 1}
40
}},
41
{'$sort': {'total_amount': -1}},
42
{'$limit': 10}
43
]
44
45
results = list(collection.aggregate(pipeline))
46
for result in results:
47
print(f"Customer {result['_id']}: ${result['total_amount']}")
48
49
# Complex aggregation with multiple stages
50
advanced_pipeline = [
51
# Filter documents
52
{'$match': {'date': {'$gte': datetime(2023, 1, 1)}}},
53
54
# Add computed fields
55
{'$addFields': {
56
'month': {'$month': '$date'},
57
'profit_margin': {'$divide': ['$profit', '$revenue']}
58
}},
59
60
# Group by month
61
{'$group': {
62
'_id': '$month',
63
'total_revenue': {'$sum': '$revenue'},
64
'total_profit': {'$sum': '$profit'},
65
'avg_margin': {'$avg': '$profit_margin'},
66
'order_count': {'$sum': 1}
67
}},
68
69
# Sort by month
70
{'$sort': {'_id': 1}},
71
72
# Project final shape
73
{'$project': {
74
'month': '$_id',
75
'revenue': '$total_revenue',
76
'profit': '$total_profit',
77
'margin': {'$round': ['$avg_margin', 4]},
78
'orders': '$order_count',
79
'_id': 0
80
}}
81
]
82
83
monthly_stats = list(collection.aggregate(advanced_pipeline))
84
```
85
86
### Common Aggregation Stages
87
88
The following standard MongoDB aggregation pipeline stages are supported.
89
90
**$match - Filtering:**
91
92
```python
93
# Filter stage
94
{'$match': {'status': 'active', 'age': {'$gte': 18}}}
95
96
# Complex filters
97
{'$match': {
98
'$and': [
99
{'category': {'$in': ['electronics', 'books']}},
100
{'price': {'$lt': 100}},
101
{'in_stock': True}
102
]
103
}}
104
```
105
106
**$group - Grouping and Aggregation:**
107
108
```python
109
# Group with aggregation functions
110
{'$group': {
111
'_id': '$department',
112
'avg_salary': {'$avg': '$salary'},
113
'max_salary': {'$max': '$salary'},
114
'min_salary': {'$min': '$salary'},
115
'total_employees': {'$sum': 1},
116
'salary_sum': {'$sum': '$salary'}
117
}}
118
119
# Multiple grouping fields
120
{'$group': {
121
'_id': {
122
'department': '$department',
123
'level': '$level'
124
},
125
'count': {'$sum': 1},
126
'names': {'$push': '$name'}
127
}}
128
```
129
130
**$project - Field Selection and Transformation:**
131
132
```python
133
# Select and rename fields
134
{'$project': {
135
'full_name': {'$concat': ['$first_name', ' ', '$last_name']},
136
'age_category': {
137
'$cond': {
138
'if': {'$gte': ['$age', 65]},
139
'then': 'senior',
140
'else': 'adult'
141
}
142
},
143
'email': 1,
144
'_id': 0
145
}}
146
147
# Mathematical operations
148
{'$project': {
149
'name': 1,
150
'bmi': {
151
'$divide': [
152
'$weight',
153
{'$pow': [{'$divide': ['$height', 100]}, 2]}
154
]
155
},
156
'score_percentage': {'$multiply': ['$score', 100]}
157
}}
158
```
159
160
**$sort - Sorting:**
161
162
```python
163
# Single field sort
164
{'$sort': {'created_date': -1}}
165
166
# Multiple field sort
167
{'$sort': {'department': 1, 'salary': -1, 'name': 1}}
168
```
169
170
**$limit and $skip - Pagination:**
171
172
```python
173
# Limit results
174
{'$limit': 100}
175
176
# Skip documents
177
{'$skip': 50}
178
179
# Pagination example
180
pipeline = [
181
{'$match': {'status': 'active'}},
182
{'$sort': {'created_date': -1}},
183
{'$skip': 20}, # Skip first 20
184
{'$limit': 10} # Get next 10
185
]
186
```
187
188
**Usage Example:**
189
190
```python
191
collection = mongomock.MongoClient().db.employees
192
193
# Employee statistics by department
194
department_stats = list(collection.aggregate([
195
{'$match': {'status': 'active'}},
196
{'$group': {
197
'_id': '$department',
198
'avg_salary': {'$avg': '$salary'},
199
'employee_count': {'$sum': 1},
200
'total_salary_budget': {'$sum': '$salary'}
201
}},
202
{'$sort': {'avg_salary': -1}},
203
{'$project': {
204
'department': '$_id',
205
'avg_salary': {'$round': ['$avg_salary', 2]},
206
'employee_count': 1,
207
'total_budget': '$total_salary_budget',
208
'_id': 0
209
}}
210
]))
211
```
212
213
### Distinct Values
214
215
Get the distinct values of a field, optionally restricted to documents that match a filter.
216
217
```python { .api }
218
def distinct(self, key, filter=None, session=None):
219
"""
220
Get distinct values for a field.
221
222
Parameters:
223
- key: str, field name to get distinct values for
224
- filter: dict, optional filter to apply before getting distinct values
225
- session: ClientSession, session to use (ignored)
226
227
Returns:
228
list: list of distinct values
229
230
Raises:
231
OperationFailure: if operation fails
232
"""
233
```
234
235
**Usage Example:**
236
237
```python
238
collection = mongomock.MongoClient().db.products
239
240
# Get all distinct categories
241
categories = collection.distinct('category')
242
print(f"Categories: {categories}")
243
244
# Get distinct values with filter
245
active_brands = collection.distinct('brand', {'status': 'active'})
246
print(f"Active brands: {active_brands}")
247
248
# Get distinct values for nested fields
249
distinct_countries = collection.distinct('supplier.address.country')
250
251
# Complex filtering
252
recent_tags = collection.distinct('tags', {
253
'created_date': {'$gte': datetime(2023, 1, 1)},
254
'status': {'$in': ['published', 'featured']}
255
})
256
```
257
258
### Document Counting
259
260
Count the documents that match a filter, or get an estimated count of all documents in the collection.
261
262
```python { .api }
263
def count_documents(self, filter, **kwargs):
264
"""
265
Count documents matching filter criteria.
266
267
Parameters:
268
- filter: dict, query filter
269
- **kwargs: additional count options
270
271
Returns:
272
int: number of matching documents
273
"""
274
275
def estimated_document_count(self, **kwargs):
276
"""
277
Get estimated total document count.
278
279
Parameters:
280
- **kwargs: additional estimation options
281
282
Returns:
283
int: estimated document count
284
"""
285
```
286
287
**Usage Example:**
288
289
```python
290
collection = mongomock.MongoClient().db.users
291
292
# Count with filter
293
active_users = collection.count_documents({'status': 'active'})
294
premium_users = collection.count_documents({'subscription': 'premium'})
295
296
# Count with complex filter
297
recent_active = collection.count_documents({
298
'status': 'active',
299
'last_login': {'$gte': datetime.now() - timedelta(days=30)}
300
})
301
302
# Total document count
303
total_users = collection.estimated_document_count()
304
305
print(f"Total: {total_users}, Active: {active_users}, Premium: {premium_users}")
306
```
307
308
### Aggregation Operators
309
310
The following MongoDB aggregation operators can be used in expressions within pipeline stages.
311
312
**Arithmetic Operators:**
313
314
```python
315
# Mathematical operations
316
{'$add': ['$price', '$tax']}
317
{'$subtract': ['$revenue', '$cost']}
318
{'$multiply': ['$quantity', '$unit_price']}
319
{'$divide': ['$total_score', '$test_count']}
320
{'$mod': ['$value', 10]}
321
{'$pow': ['$base', '$exponent']}
322
```
323
324
**Comparison Operators:**
325
326
```python
327
# Comparison operations
328
{'$eq': ['$status', 'active']}
329
{'$ne': ['$type', 'deleted']}
330
{'$gt': ['$score', 80]}
331
{'$gte': ['$age', 18]}
332
{'$lt': ['$price', 100]}
333
{'$lte': ['$quantity', 0]}
334
```
335
336
**Logical Operators:**
337
338
```python
339
# Logical operations
340
{'$and': [{'$gt': ['$age', 18]}, {'$lt': ['$age', 65]}]}
341
{'$or': [{'$eq': ['$status', 'premium']}, {'$gt': ['$score', 90]}]}
342
{'$not': {'$eq': ['$deleted', True]}}
343
```
344
345
**String Operators:**
346
347
```python
348
# String operations
349
{'$concat': ['$first_name', ' ', '$last_name']}
350
{'$substr': ['$description', 0, 100]}
351
{'$toLower': '$email'}
352
{'$toUpper': '$code'}
353
{'$split': ['$tags', ',']}
354
```
355
356
**Array Operators:**
357
358
```python
359
# Array operations ($push and $addToSet are accumulators, used inside $group)
360
{'$size': '$items'}
361
{'$push': '$category'}
362
{'$addToSet': '$tag'}
363
{'$in': ['premium', '$memberships']}
364
{'$slice': ['$recent_orders', 5]}
365
```
366
367
**Usage Example:**
368
369
```python
370
collection = mongomock.MongoClient().db.orders
371
372
# Complex aggregation with operators
373
pipeline = [
374
{'$addFields': {
375
'total_with_tax': {'$multiply': ['$subtotal', 1.08]},
376
'customer_name': {'$concat': ['$customer.first', ' ', '$customer.last']},
377
'order_month': {'$month': '$order_date'},
378
'item_count': {'$size': '$items'},
379
'is_large_order': {'$gte': ['$subtotal', 1000]}
380
}},
381
{'$match': {'is_large_order': True}},
382
{'$group': {
383
'_id': '$order_month',
384
'large_order_count': {'$sum': 1},
385
'avg_order_value': {'$avg': '$total_with_tax'},
386
'max_items': {'$max': '$item_count'}
387
}},
388
{'$sort': {'_id': 1}}
389
]
390
391
large_order_stats = list(collection.aggregate(pipeline))
392
```
393
394
### Aggregation Performance
395
396
Optimize aggregation pipeline performance by placing filtering stages first, so later stages process fewer documents, and by indexing the fields used in the initial `$match`.
397
398
**Pipeline Optimization:**
399
400
```python
401
# Efficient pipeline (filter early, reduce data flow)
402
efficient_pipeline = [
403
{'$match': {'status': 'active'}}, # Filter first
404
{'$match': {'date': {'$gte': recent_date}}}, # Additional filters early
405
{'$project': {'needed_field1': 1, 'needed_field2': 1}}, # Select only needed fields
406
{'$group': {'_id': '$category', 'count': {'$sum': 1}}},
407
{'$sort': {'count': -1}},
408
{'$limit': 10}
409
]
410
411
# Index support for aggregation
412
collection.create_index([('status', 1), ('date', 1)]) # Supports initial $match
413
```
414
415
**Usage Example:**
416
417
```python
418
collection = mongomock.MongoClient().db.large_dataset
419
420
# Create supporting indexes
421
collection.create_index('category')
422
collection.create_index([('status', 1), ('created_date', -1)])
423
424
# Optimized aggregation
425
optimized_results = list(collection.aggregate([
426
{'$match': {'status': 'published', 'created_date': {'$gte': recent_date}}},
427
{'$project': {'category': 1, 'views': 1, 'likes': 1}},
428
{'$group': {
429
'_id': '$category',
430
'total_views': {'$sum': '$views'},
431
'total_likes': {'$sum': '$likes'},
432
'article_count': {'$sum': 1}
433
}},
434
{'$addFields': {
435
'engagement_ratio': {'$divide': ['$total_likes', '$total_views']}
436
}},
437
{'$sort': {'engagement_ratio': -1}},
438
{'$limit': 5}
439
]))
440
```