0
# Advanced Queries and Aggregation
1
2
Aggregation pipelines, advanced querying, sorting, pagination, and cursor operations for complex data retrieval patterns.
3
4
## Capabilities
5
6
### Aggregation Pipelines
7
8
Execute complex data processing pipelines with multiple stages.
9
10
```python { .api }
11
class Collection:
12
def aggregate(self, pipeline, session=None, **kwargs):
13
"""
14
Execute aggregation pipeline.
15
16
Parameters:
17
- pipeline: list of aggregation stages
18
- allowDiskUse: enable disk usage for large operations
19
- maxTimeMS: maximum execution time
20
- batchSize: cursor batch size
21
- collation: collation options
22
- hint: index hint
23
- session: optional ClientSession
24
25
Returns:
26
CommandCursor: Results cursor
27
"""
28
29
def aggregate_raw_batches(self, pipeline, **kwargs):
30
"""
31
Execute aggregation returning raw BSON batches.
32
33
Parameters:
34
- pipeline: list of aggregation stages
35
- kwargs: same as aggregate()
36
37
Returns:
38
RawBSONDocument batches
39
"""
40
41
def map_reduce(
42
self,
43
map,
44
reduce,
45
out,
46
full_response=False,
47
session=None,
48
**kwargs
49
):
50
"""
51
Execute map-reduce operation (deprecated - use aggregation).
52
53
Parameters:
54
- map: JavaScript map function
55
- reduce: JavaScript reduce function
56
- out: output collection specification
57
- full_response: return full response
58
- query: optional query filter
59
- sort: optional sort specification
60
- limit: optional limit
61
- finalize: optional finalize function
62
- scope: optional JavaScript scope
63
- session: optional ClientSession
64
65
Returns:
66
MapReduce results or Collection
67
"""
68
```
69
70
### Index Management
71
72
Create, manage, and optimize database indexes for query performance.
73
74
```python { .api }
75
class Collection:
76
def create_index(self, keys, session=None, **kwargs):
77
"""
78
Create an index.
79
80
Parameters:
81
- keys: index specification (field name or list of tuples)
82
- unique: create unique index
83
- background: build index in background (deprecated)
84
- sparse: create sparse index
85
- expireAfterSeconds: TTL for documents
86
- partialFilterExpression: partial index filter
87
- collation: collation options
88
- session: optional ClientSession
89
90
Returns:
91
str: Index name
92
"""
93
94
def create_indexes(self, indexes, session=None, **kwargs):
95
"""
96
Create multiple indexes.
97
98
Parameters:
99
- indexes: list of IndexModel instances
100
- session: optional ClientSession
101
102
Returns:
103
list: Created index names
104
"""
105
106
def drop_index(self, index_or_name, session=None, **kwargs):
107
"""
108
Drop an index.
109
110
Parameters:
111
- index_or_name: index name or specification
112
- session: optional ClientSession
113
"""
114
115
def drop_indexes(self, session=None, **kwargs):
116
"""
117
Drop all indexes except _id.
118
119
Parameters:
120
- session: optional ClientSession
121
"""
122
123
def list_indexes(self, session=None):
124
"""
125
List collection indexes.
126
127
Parameters:
128
- session: optional ClientSession
129
130
Returns:
131
CommandCursor: Index information
132
"""
133
134
def index_information(self, session=None):
135
"""
136
Get index information as dictionary.
137
138
Parameters:
139
- session: optional ClientSession
140
141
Returns:
142
dict: Index information mapping
143
"""
144
145
def reindex(self, session=None, **kwargs):
146
"""
147
Rebuild all indexes.
148
149
Parameters:
150
- session: optional ClientSession
151
"""
152
```
153
154
### Text Search
155
156
Full-text search capabilities with text indexes.
157
158
```python { .api }
159
class Collection:
160
def find(self, filter=None, **kwargs):
161
"""
162
Find with text search support.
163
164
Text search parameters:
165
- filter: can include {"$text": {"$search": "search terms"}}
166
- projection: can include text score with {"score": {"$meta": "textScore"}}
167
- sort: can sort by text score with [("score", {"$meta": "textScore"})]
168
169
Returns:
170
Cursor: Query results
171
"""
172
```
173
174
### Geospatial Queries
175
176
Query documents based on geospatial data and proximity.
177
178
```python { .api }
179
class Collection:
180
def find(self, filter=None, **kwargs):
181
"""
182
Find with geospatial query support.
183
184
Geospatial operators in filter:
185
- $near: find near a point
186
- $nearSphere: spherical near query
187
- $geoWithin: find within geometry
188
- $geoIntersects: find intersecting geometry
189
- $geometry: GeoJSON geometry specification
190
191
Returns:
192
Cursor: Query results
193
"""
194
195
def create_index(self, keys, **kwargs):
196
"""
197
Create geospatial indexes.
198
199
Geospatial index types:
200
- "2d": legacy 2D index
201
- "2dsphere": spherical geometry index
202
- "geoHaystack": haystack index (deprecated)
203
204
Returns:
205
str: Index name
206
"""
207
```
208
209
### Advanced Query Operations
210
211
Complex query patterns and specialized operations.
212
213
```python { .api }
214
class Collection:
215
def find_raw_batches(self, filter=None, projection=None, **kwargs):
216
"""
217
Find returning raw BSON batches.
218
219
Parameters:
220
- filter: query criteria
221
- projection: fields to return
222
- kwargs: same as find()
223
224
Returns:
225
RawBSONDocument batches
226
"""
227
228
def parallel_scan(self, num_cursors, session=None, **kwargs):
229
"""
230
Scan collection in parallel.
231
232
Parameters:
233
- num_cursors: number of parallel cursors
234
- session: optional ClientSession
235
236
Returns:
237
list: List of CommandCursor instances
238
"""
239
240
def options(self):
241
"""
242
Get collection options.
243
244
Returns:
245
dict: Collection options
246
"""
247
248
def rename(self, new_name, session=None, **kwargs):
249
"""
250
Rename collection.
251
252
Parameters:
253
- new_name: new collection name
254
- dropTarget: drop target if exists
255
- session: optional ClientSession
256
"""
257
```
258
259
### Cursor Advanced Operations
260
261
Advanced cursor manipulation and optimization.
262
263
```python { .api }
264
class Cursor:
265
def hint(self, index):
266
"""
267
Force use of specific index.
268
269
Parameters:
270
- index: index name or specification
271
272
Returns:
273
Cursor: Modified cursor (chainable)
274
"""
275
276
def max_time_ms(self, max_time_ms):
277
"""
278
Set maximum execution time.
279
280
Parameters:
281
- max_time_ms: maximum time in milliseconds
282
283
Returns:
284
Cursor: Modified cursor (chainable)
285
"""
286
287
def max_scan(self, max_scan):
288
"""
289
Set maximum documents to scan (deprecated).
290
291
Parameters:
292
- max_scan: maximum documents to examine
293
294
Returns:
295
Cursor: Modified cursor (chainable)
296
"""
297
298
def min(self, spec):
299
"""
300
Set minimum index bounds.
301
302
Parameters:
303
- spec: minimum bound specification
304
305
Returns:
306
Cursor: Modified cursor (chainable)
307
"""
308
309
def max(self, spec):
310
"""
311
Set maximum index bounds.
312
313
Parameters:
314
- spec: maximum bound specification
315
316
Returns:
317
Cursor: Modified cursor (chainable)
318
"""
319
320
def comment(self, comment):
321
"""
322
Add comment to query for profiling.
323
324
Parameters:
325
- comment: query comment
326
327
Returns:
328
Cursor: Modified cursor (chainable)
329
"""
330
331
def collation(self, collation):
332
"""
333
Set collation for string comparison.
334
335
Parameters:
336
- collation: collation specification
337
338
Returns:
339
Cursor: Modified cursor (chainable)
340
"""
341
342
def allow_disk_use(self, allow_disk_use):
343
"""
344
Allow disk usage for large sorts.
345
346
Parameters:
347
- allow_disk_use: enable disk usage
348
349
Returns:
350
Cursor: Modified cursor (chainable)
351
"""
352
353
def explain(self, verbosity='queryPlanner'):
354
"""
355
Get query execution plan.
356
357
Parameters:
358
- verbosity: explanation verbosity level
359
360
Returns:
361
dict: Query execution plan
362
"""
363
```
364
365
## Usage Examples
366
367
### Aggregation Pipeline
368
369
```python
370
from pymongo import MongoClient
371
372
client = MongoClient()
373
db = client.sales
374
collection = db.orders
375
376
# Group by category and calculate totals
377
pipeline = [
378
{"$match": {"date": {"$gte": "2023-01-01"}}},
379
{"$group": {
380
"_id": "$category",
381
"total_sales": {"$sum": "$amount"},
382
"order_count": {"$sum": 1},
383
"avg_order": {"$avg": "$amount"}
384
}},
385
{"$sort": {"total_sales": -1}},
386
{"$limit": 10}
387
]
388
389
results = collection.aggregate(pipeline)
390
for doc in results:
391
print(f"Category: {doc['_id']}, Sales: ${doc['total_sales']:.2f}")
392
393
# Complex pipeline with multiple stages
394
pipeline = [
395
{"$unwind": "$items"},
396
{"$lookup": {
397
"from": "products",
398
"localField": "items.product_id",
399
"foreignField": "_id",
400
"as": "product_info"
401
}},
402
{"$project": {
403
"customer": 1,
404
"item_total": {"$multiply": ["$items.quantity", "$items.price"]},
405
"product_name": {"$arrayElemAt": ["$product_info.name", 0]}
406
}},
407
{"$group": {
408
"_id": "$customer",
409
"total_spent": {"$sum": "$item_total"},
410
"products": {"$addToSet": "$product_name"}
411
}}
412
]
413
414
customer_analysis = collection.aggregate(pipeline, allowDiskUse=True)
415
```
416
417
### Index Management
418
419
```python
420
from pymongo import ASCENDING, DESCENDING, GEO2D, TEXT
421
422
# Create simple index
423
collection.create_index("email", unique=True)
424
425
# Create compound index
426
collection.create_index([
427
("category", ASCENDING),
428
("price", DESCENDING)
429
])
430
431
# Create text index for search
432
collection.create_index([
433
("title", TEXT),
434
("description", TEXT)
435
], default_language='english')
436
437
# Create geospatial index
438
collection.create_index([("location", GEO2D)])
439
440
# Create TTL index for expiration
441
collection.create_index("expire_at", expireAfterSeconds=3600)
442
443
# Create partial index
444
collection.create_index(
445
"email",
446
partialFilterExpression={"email": {"$exists": True}}
447
)
448
449
# List all indexes
450
for index in collection.list_indexes():
451
print(f"Index: {index['name']}, Keys: {index['key']}")
452
```
453
454
### Text Search
455
456
```python
457
# Create text index
458
collection.create_index([("title", TEXT), ("content", TEXT)])
459
460
# Search for documents
461
results = collection.find(
462
{"$text": {"$search": "python mongodb"}},
463
{"score": {"$meta": "textScore"}}
464
).sort([("score", {"$meta": "textScore"})])
465
466
for doc in results:
467
print(f"Title: {doc['title']}, Score: {doc['score']}")
468
469
# Advanced text search
470
results = collection.find({
471
"$text": {
472
"$search": "\"exact phrase\" -excluded +required",
473
"$language": "english",
474
"$caseSensitive": False
475
}
476
})
477
```
478
479
### Geospatial Queries
480
481
```python
482
# Create 2dsphere index for GeoJSON
483
collection.create_index([("location", "2dsphere")])
484
485
# Find nearby locations
486
nearby = collection.find({
487
"location": {
488
"$near": {
489
"$geometry": {
490
"type": "Point",
491
"coordinates": [-73.9857, 40.7484] # NYC coordinates
492
},
493
"$maxDistance": 1000 # meters
494
}
495
}
496
})
497
498
# Find within a polygon
499
within_area = collection.find({
500
"location": {
501
"$geoWithin": {
502
"$geometry": {
503
"type": "Polygon",
504
"coordinates": [[
505
[-74.0, 40.7], [-73.9, 40.7],
506
[-73.9, 40.8], [-74.0, 40.8],
507
[-74.0, 40.7]
508
]]
509
}
510
}
511
}
512
})
513
```
514
515
### Advanced Cursor Operations
516
517
```python
518
# Use query hints and optimization
519
cursor = collection.find({"category": "electronics"}) \
520
.hint("category_1_price_-1") \
521
.max_time_ms(5000) \
522
.comment("Product search query") \
523
.allow_disk_use(True)
524
525
# Set collation for string comparison
526
cursor = collection.find({"name": {"$regex": "^a"}}) \
527
.collation({
528
"locale": "en",
529
"strength": 1, # Case insensitive
530
"caseLevel": False
531
})
532
533
# Get query execution plan
534
plan = collection.find({"price": {"$gt": 100}}).explain()
535
print(f"Query plan: {plan['queryPlanner']['winningPlan']}")
536
```