# Aggregation Pipeline

Mongoose's MongoDB aggregation pipeline builder, with chainable stage methods, type definitions, and execution options for complex data processing and analytics.

## Capabilities

### Pipeline Creation

Create aggregation pipelines from an array of stages or by chaining stage methods.

```javascript { .api }
/**
 * Create an aggregation pipeline over this model's collection.
 *
 * Pipeline stages are NOT cast against the model's schema (stages such as
 * `$project` can reshape documents), so values like string ids must already
 * have the right type (e.g. ObjectId) when passed in.
 *
 * @param pipeline - Initial pipeline stages; more can be chained on the result
 * @returns Aggregate builder; awaiting it (or calling `exec()`) yields the results
 */
Model.aggregate<T>(pipeline?: PipelineStage[]): Aggregate<T[]>;

interface Aggregate<T> {
  /**
   * Append stages to the end of the pipeline.
   * @param stages - Pipeline stages to append, in order
   * @returns this aggregate (for chaining)
   */
  append(...stages: PipelineStage[]): this;

  /**
   * Insert stages at the start of the pipeline.
   * NOTE(review): confirm this helper exists in the targeted Mongoose version.
   * @param stages - Pipeline stages to prepend, in order
   * @returns this aggregate (for chaining)
   */
  prepend(...stages: PipelineStage[]): this;

  /**
   * Get the pipeline built so far.
   * @returns Array of pipeline stages
   */
  pipeline(): PipelineStage[];
}
```

**Usage Examples:**

```javascript
const User = mongoose.model('User', userSchema);

// Basic aggregation: pass every stage up front
const results = await User.aggregate([
  { $match: { status: 'active' } },
  { $group: { _id: '$department', count: { $sum: 1 } } },
  { $sort: { count: -1 } }
]);

// Method chaining: the same pipeline, built stage by stage
const pipeline = User.aggregate()
  .match({ status: 'active' })
  .group({ _id: '$department', count: { $sum: 1 } })
  .sort({ count: -1 });

const chainedResults = await pipeline.exec();
```

### Filtering and Matching

Filter, sample, and paginate documents at any point in the pipeline.

```javascript { .api }
interface Aggregate<T> {
  /**
   * Filter documents with a `$match` stage (same query syntax as `find()`).
   * Unlike `find()`, conditions are not cast to the schema, so e.g. a string
   * `_id` is not converted to an ObjectId. Matching early lets MongoDB use indexes.
   * @param conditions - Match conditions
   * @returns this aggregate
   */
  match(conditions: FilterQuery<any>): this;

  /**
   * Randomly select documents ($sample).
   * @param size - Number of documents to sample
   * @returns this aggregate
   */
  sample(size: number): this;

  /**
   * Pass at most `num` documents to the next stage ($limit).
   * @param num - Maximum number of documents
   * @returns this aggregate
   */
  limit(num: number): this;

  /**
   * Skip the first `num` documents ($skip). Which documents are skipped is
   * only deterministic after a $sort.
   * @param num - Number of documents to skip
   * @returns this aggregate
   */
  skip(num: number): this;
}
```

**Usage Examples:**

```javascript
// Filter active adult users
const activeUsers = await User.aggregate()
  .match({ status: 'active', age: { $gte: 18 } })
  .exec();

// Sample 10 random documents
const randomUsers = await User.aggregate()
  .sample(10)
  .exec();

// Pagination in aggregation: sort first so page boundaries are stable
const page2Users = await User.aggregate()
  .match({ status: 'active' })
  .sort({ createdAt: -1 })
  .skip(10) // page size 10 -> skip page 1
  .limit(10)
  .exec();
```

### Grouping and Aggregation

Group documents and compute values (counts, sums, averages, buckets) for each group.

```javascript { .api }
interface Aggregate<T> {
  /**
   * Group documents by a key and compute accumulators per group ($group).
   * @param arg - Group specification: `_id` is the group key (`null` for a
   *   single group over all documents); other fields are accumulators such
   *   as `{ $sum: 1 }`
   * @returns this aggregate
   */
  group(arg: any): this;

  /**
   * Group documents by an expression and sort the groups by their count,
   * largest first ($sortByCount; shorthand for $group + $sort).
   * Each output document is `{ _id: <group value>, count: <n> }`.
   * @param arg - Expression to group by, e.g. '$department'
   * @returns this aggregate
   */
  sortByCount(arg: any): this;

  /**
   * Replace the input with one document `{ [field]: <number of input documents> }`
   * ($count). No document is produced when the input is empty.
   * @param field - Name of the output count field
   * @returns this aggregate
   */
  count(field: string): this;

  /**
   * Categorize documents into buckets with explicit boundaries ($bucket).
   * NOTE(review): confirm this helper exists in the targeted Mongoose version;
   * `append({ $bucket: options })` works regardless.
   * @param options - Bucket configuration
   * @returns this aggregate
   */
  bucket(options: BucketOptions): this;

  /**
   * Distribute documents into a requested number of roughly evenly filled
   * buckets whose boundaries are chosen automatically ($bucketAuto).
   * NOTE(review): confirm this helper exists in the targeted Mongoose version;
   * `append({ $bucketAuto: options })` works regardless.
   * @param options - Auto bucket configuration
   * @returns this aggregate
   */
  bucketAuto(options: BucketAutoOptions): this;
}

interface BucketOptions {
  /** Expression to bucket by, e.g. '$age' */
  groupBy: any;

  /**
   * Bucket boundaries, sorted ascending (at least two values). Each bucket
   * covers [lower, upper): lower bound inclusive, upper bound exclusive.
   */
  boundaries: any[];

  /**
   * `_id` of the catch-all bucket for values outside the boundaries. It must be
   * less than the lowest boundary, greater than or equal to the highest one, or
   * of a different type. Without it, an out-of-range value makes the stage fail.
   */
  default?: any;

  /** Accumulator fields computed per bucket; defaults to `{ count: { $sum: 1 } }` */
  output?: any;
}

interface BucketAutoOptions {
  /** Expression to bucket by, e.g. '$salary' */
  groupBy: any;

  /** Requested number of buckets; fewer are returned if there are too few distinct values */
  buckets: number;

  /** Accumulator fields computed per bucket; defaults to `{ count: { $sum: 1 } }` */
  output?: any;

  /**
   * Preferred-number series for boundaries (numeric groupBy values only):
   * 'R5', 'R10', 'R20', 'R40', 'R80', '1-2-5', 'E6', 'E12', 'E24', 'E48',
   * 'E96', 'E192', or 'POWERSOF2'.
   */
  granularity?: string;
}
```

**Usage Examples:**

```javascript
// Group by department with statistics
const departmentStats = await User.aggregate()
  .match({ status: 'active' })
  .group({
    _id: '$department',
    count: { $sum: 1 },
    avgAge: { $avg: '$age' },
    maxSalary: { $max: '$salary' },
    minSalary: { $min: '$salary' },
    employees: { $push: '$name' }
  })
  .sort({ count: -1 })
  .exec();

// Sort by count (shorthand): [{ _id: <department>, count: n }, ...], largest first
const popularDepartments = await User.aggregate()
  .sortByCount('$department')
  .exec();

// Count documents: the result is [{ totalUsers: n }], or [] when nothing matches
const countResult = await User.aggregate()
  .match({ status: 'active' })
  .count('totalUsers')
  .exec();
const totalCount = countResult[0]?.totalUsers ?? 0;

// Age buckets: [0, 25), [25, 35), [35, 50), [50, 100); other ages go to 'Other'
const ageBuckets = await User.aggregate()
  .bucket({
    groupBy: '$age',
    boundaries: [0, 25, 35, 50, 100],
    default: 'Other',
    output: {
      count: { $sum: 1 },
      avgSalary: { $avg: '$salary' }
    }
  })
  .exec();

// Auto buckets for salary ranges
const salaryBuckets = await User.aggregate()
  .bucketAuto({
    groupBy: '$salary',
    buckets: 5,
    output: {
      count: { $sum: 1 },
      avgAge: { $avg: '$age' }
    }
  })
  .exec();
```

### Data Transformation

Transform and reshape documents in the pipeline.

```javascript { .api }
interface Aggregate<T> {
  /**
   * Include, exclude, or compute fields ($project; like SELECT in SQL).
   * `_id` is included unless explicitly set to 0.
   * @param arg - Projection specification
   * @returns this aggregate
   */
  project(arg: any): this;

  /**
   * Add computed fields, keeping all existing ones ($addFields).
   * @param arg - Fields to add or overwrite
   * @returns this aggregate
   */
  addFields(arg: any): this;

  /**
   * Replace each document with an embedded document or expression result
   * ($replaceRoot). A string is treated as a field name. The new root must
   * resolve to a document, or the stage fails.
   * @param newRoot - New root specification
   * @returns this aggregate
   */
  replaceRoot(newRoot: any): this;

  /**
   * Output one document per element of an array field ($unwind). When a
   * string is passed, Mongoose adds the leading '$' if it is missing.
   * @param path - Array field path or unwind specification
   * @returns this aggregate
   */
  unwind(path: string | UnwindOptions): this;

  /**
   * Sort documents ($sort). Accepts `{ field: 1 | -1 }` or a Mongoose-style
   * space-delimited string such as '-count name'.
   * @param arg - Sort specification
   * @returns this aggregate
   */
  sort(arg: any): this;
}

interface UnwindOptions {
  /** Path of the array field to unwind, with a leading '$' (e.g. '$comments') */
  path: string;

  /** Name of a new field that receives each element's array index (no leading '$') */
  includeArrayIndex?: string;

  /**
   * When true, also output documents whose array field is null, missing, or
   * empty. Defaults to false, which drops those documents.
   */
  preserveNullAndEmptyArrays?: boolean;
}
```

**Usage Examples:**

```javascript
// Project specific fields with calculations
const userSummary = await User.aggregate()
  .project({
    name: 1,
    email: 1,
    age: 1,
    isAdult: { $gte: ['$age', 18] },
    // $concat yields null if firstName or lastName is null or missing
    fullName: { $concat: ['$firstName', ' ', '$lastName'] },
    // Approximate: ignores whether this year's birthday has passed
    yearBorn: { $subtract: [new Date().getFullYear(), '$age'] }
  })
  .exec();

// Add computed fields (existing fields are kept)
const enrichedUsers = await User.aggregate()
  .addFields({
    isAdult: { $gte: ['$age', 18] },
    fullName: { $concat: ['$firstName', ' ', '$lastName'] },
    salaryGrade: {
      $switch: {
        // A missing salary sorts below every number, so it lands in 'Junior'
        branches: [
          { case: { $lt: ['$salary', 30000] }, then: 'Junior' },
          { case: { $lt: ['$salary', 60000] }, then: 'Mid' }
        ],
        default: 'Senior'
      }
    }
  })
  .exec();

// Unwind array fields: one document per skill, then group by skill
const userSkills = await User.aggregate()
  .unwind('$skills')
  .group({
    _id: '$skills',
    users: { $addToSet: '$name' },
    count: { $sum: 1 }
  })
  .sort({ count: -1 })
  .exec();

// Unwind with options: keep posts without comments and record each comment's position
const postComments = await Post.aggregate()
  .unwind({
    path: '$comments',
    includeArrayIndex: 'commentIndex',
    preserveNullAndEmptyArrays: true
  })
  .exec();

// Replace root to flatten nested objects
const profiles = await User.aggregate()
  .replaceRoot({ $mergeObjects: ['$profile', { userId: '$_id' }] })
  .exec();
```

### Joins and Lookups

Join data from multiple collections using lookup operations.

```javascript { .api }
interface Aggregate<T> {
  /**
   * Left outer join with another collection ($lookup): adds an array field
   * holding the matching documents (an empty array when none match).
   * @param options - Lookup configuration
   * @returns this aggregate
   */
  lookup(options: LookupOptions): this;

  /**
   * Append the documents of another collection to the results ($unionWith,
   * MongoDB 4.4+). Duplicates are kept, like SQL UNION ALL.
   * @param options - Union options
   * @returns this aggregate
   */
  unionWith(options: UnionWithOptions): this;
}

/**
 * $lookup configuration. Use either the equality form (`localField` +
 * `foreignField`) or the sub-pipeline form (`pipeline`, usually with `let`).
 * MongoDB 5.0+ also accepts both together.
 */
interface LookupOptions {
  /** Collection to join with (the collection name, not the model name) */
  from: string;

  /** Field of the input documents to match on (equality form; pair with `foreignField`) */
  localField?: string;

  /** Field of the `from` documents to match on (equality form; pair with `localField`) */
  foreignField?: string;

  /** Name of the output array field */
  as: string;

  /** Pipeline to run on the joined collection (sub-pipeline form) */
  pipeline?: PipelineStage[];

  /** Variables from the input document for `pipeline`, referenced there as `$$name` */
  let?: any;
}

interface UnionWithOptions {
  /** Collection whose documents are appended to the results */
  coll: string;

  /** Pipeline applied to the `coll` documents before they are appended */
  pipeline?: PipelineStage[];
}
```

**Usage Examples:**

```javascript
// Basic lookup (join): each user gets a `posts` array ([] when there are none)
const usersWithPosts = await User.aggregate()
  .lookup({
    from: 'posts',
    localField: '_id',
    foreignField: 'author',
    as: 'posts'
  })
  .addFields({
    postCount: { $size: '$posts' }
  })
  .sort({ postCount: -1 })
  .exec();

// Advanced lookup with a sub-pipeline (no localField/foreignField):
// `let` exposes the user's _id as $$userId, and $expr is needed to compare
// a document field with that variable.
const usersWithRecentPosts = await User.aggregate()
  .lookup({
    from: 'posts',
    let: { userId: '$_id' },
    pipeline: [
      { $match: {
        $expr: { $eq: ['$author', '$$userId'] },
        createdAt: { $gte: new Date(Date.now() - 30 * 24 * 60 * 60 * 1000) }
      }},
      { $project: { title: 1, createdAt: 1 } },
      { $sort: { createdAt: -1 } },
      { $limit: 5 }
    ],
    as: 'recentPosts'
  })
  .exec();

// Multiple lookups
const completeUserData = await User.aggregate()
  .lookup({
    from: 'posts',
    localField: '_id',
    foreignField: 'author',
    as: 'posts'
  })
  .lookup({
    from: 'comments',
    localField: '_id',
    foreignField: 'author',
    as: 'comments'
  })
  .addFields({
    totalActivity: { $add: [{ $size: '$posts' }, { $size: '$comments' }] }
  })
  .exec();

// Union with another collection; $ifNull labels every document that has no
// `type` yet (the posts) as 'post'
const allContent = await Post.aggregate()
  .unionWith({
    coll: 'pages',
    pipeline: [
      { $match: { published: true } },
      { $addFields: { type: 'page' } }
    ]
  })
  .addFields({ type: { $ifNull: ['$type', 'post'] } })
  .sort({ createdAt: -1 })
  .exec();
```

### Advanced Operations

Advanced aggregation operations for complex data processing.

```javascript { .api }
interface Aggregate<T> {
  /**
   * Run several sub-pipelines over the same input documents in one stage
   * ($facet). Outputs a single document with one array field per facet.
   * Sub-pipelines cannot contain $facet, $out, or $merge.
   * @param facets - Map of output field name to sub-pipeline
   * @returns this aggregate
   */
  facet(facets: { [key: string]: PipelineStage[] }): this;

  /**
   * Restrict document content based on data in the documents themselves
   * ($redact). The expression must resolve to $$DESCEND, $$PRUNE, or $$KEEP.
   * @param expression - Redaction expression
   * @returns this aggregate
   */
  redact(expression: any): this;

  /**
   * Write the results to a collection ($out), replacing that collection if it
   * already exists. Must be the last stage of the pipeline.
   * NOTE(review): confirm this helper exists in the targeted Mongoose version;
   * `append({ $out: collection })` works regardless.
   * @param collection - Target collection name
   * @returns this aggregate
   */
  out(collection: string): this;

  /**
   * Insert or update the results into a collection ($merge) instead of
   * replacing it wholesale. Must be the last stage of the pipeline.
   * NOTE(review): confirm this helper exists in the targeted Mongoose version;
   * `append({ $merge: options })` works regardless.
   * @param options - Merge options
   * @returns this aggregate
   */
  merge(options: MergeOptions): this;

  /**
   * Add an Atlas Search `$search` stage (MongoDB Atlas only). It queries an
   * existing search index rather than creating one, and must be the first stage.
   * @param options - $search stage options (index name, operator, ...)
   * @returns this aggregate
   */
  search(options: any): this;
}

interface MergeOptions {
  /** Target collection, or `{ db, coll }` to write into another database */
  into: string | { db: string; coll: string };

  /**
   * Action when a result document matches an existing one (see `on`):
   * 'merge' (default), 'replace', 'keepExisting', 'fail', or an update pipeline.
   */
  whenMatched?: 'replace' | 'keepExisting' | 'merge' | 'fail' | PipelineStage[];

  /** Action when no existing document matches: 'insert' (default), 'discard', or 'fail' */
  whenNotMatched?: 'insert' | 'discard' | 'fail';

  /** Field(s) that identify a match; defaults to `_id`. Other fields need a unique index. */
  on?: string | string[];

  /** Variables available to a `whenMatched` pipeline */
  let?: any;
}
```

**Usage Examples:**

```javascript
// Multi-faceted aggregation: one pass, one result document with three arrays
const analytics = await User.aggregate()
  .facet({
    ageStats: [
      { $group: { _id: null, avgAge: { $avg: '$age' }, maxAge: { $max: '$age' } } }
    ],
    departmentCounts: [
      { $group: { _id: '$department', count: { $sum: 1 } } },
      { $sort: { count: -1 } }
    ],
    salaryBuckets: [
      { $bucket: {
        groupBy: '$salary',
        boundaries: [0, 30000, 60000, 100000, Infinity],
        default: 'Other',
        output: { count: { $sum: 1 } }
      }}
    ]
  })
  .exec();

// Keep only public users. At the top level $$KEEP returns the whole document
// and $$PRUNE drops it, so this filters documents rather than hiding fields.
const publicUsers = await User.aggregate()
  .redact({
    $cond: {
      if: { $eq: ['$privacy', 'public'] },
      then: '$$KEEP',
      else: '$$PRUNE'
    }
  })
  .exec();

// Output to collection (replaces departmentStats if it already exists)
await User.aggregate()
  .match({ status: 'active' })
  .group({
    _id: '$department',
    count: { $sum: 1 },
    avgSalary: { $avg: '$salary' }
  })
  .out('departmentStats')
  .exec();

// Merge results: upsert today's daily-active-user count into userMetrics.
// Matching from the start of the current UTC day (not a rolling 24h window)
// keeps each run to a single date key, so 'replace' never overwrites
// yesterday's total with a partial count.
const startOfToday = new Date();
startOfToday.setUTCHours(0, 0, 0, 0);

await User.aggregate()
  .match({ lastLogin: { $gte: startOfToday } })
  .group({
    _id: { $dateToString: { format: '%Y-%m-%d', date: '$lastLogin' } },
    dailyActiveUsers: { $sum: 1 }
  })
  .merge({
    into: 'userMetrics',
    whenMatched: 'replace',
    whenNotMatched: 'insert'
  })
  .exec();
```

### Execution and Options

Execute aggregation pipelines, stream results through a cursor, inspect the execution plan, and tune execution options.

```javascript { .api }
interface Aggregate<T> {
  /**
   * Run the pipeline.
   * @returns Promise resolving to the aggregation results (buffered in memory)
   */
  exec(): Promise<T>;

  /**
   * Run the pipeline and stream the results instead of buffering them.
   * T is the result array type (`Model.aggregate` returns `Aggregate<T[]>`),
   * so the cursor yields its element type, one document at a time.
   * @param options - Cursor options
   * @returns Aggregation cursor
   */
  cursor(options?: AggregateCursorOptions): AggregateCursor<T extends readonly (infer U)[] ? U : T>;

  /**
   * Get the server's execution plan for the pipeline instead of its results.
   * @returns Promise resolving to the explain output
   */
  explain(): Promise<any>;

  /**
   * Let stages that exceed the server's per-stage memory limit (e.g. a large
   * $group or $sort) write temporary files to disk.
   * @param val - Enable disk usage
   * @returns this aggregate
   */
  allowDiskUse(val: boolean): this;

  /**
   * Set the collation used for string comparisons in the pipeline.
   * @param collation - Collation options
   * @returns this aggregate
   */
  collation(collation: CollationOptions): this;

  /**
   * Run the aggregation in the given session (e.g. as part of a transaction).
   * @param session - MongoDB client session
   * @returns this aggregate
   */
  session(session: ClientSession): this;

  /**
   * Set aggregate command options.
   * @param options - Aggregation options
   * @returns this aggregate
   */
  option(options: AggregateOptions): this;
}

interface AggregateCursorOptions {
  /** Number of documents fetched from the server per batch */
  batchSize?: number;

  /** Client session to run the aggregation in */
  session?: ClientSession;
}

interface AggregateOptions {
  /** Let blocking stages spill to temporary disk files when they exceed the memory limit */
  allowDiskUse?: boolean;

  /** Number of documents per cursor batch */
  batchSize?: number;

  /** Server-side time limit in milliseconds; the operation is aborted when it is exceeded */
  maxTimeMS?: number;

  /** Collation for string comparisons */
  collation?: CollationOptions;

  /** Read preference mode, e.g. 'secondaryPreferred' */
  readPreference?: string;

  /** Comment attached to the operation, visible in the profiler and server logs */
  comment?: string;

  /** Index to use, given as an index name or a key pattern */
  hint?: any;
}
```

**Usage Examples:**

```javascript
// Basic execution
const results = await User.aggregate()
  .match({ status: 'active' })
  .group({ _id: '$department', count: { $sum: 1 } })
  .exec();

// Streaming large results: documents arrive in batches of 100 instead of
// being buffered in one array; next() resolves to null once exhausted
const cursor = User.aggregate()
  .match({ createdAt: { $gte: new Date('2023-01-01') } })
  .cursor({ batchSize: 100 });

for (let doc = await cursor.next(); doc != null; doc = await cursor.next()) {
  console.log('Processing:', doc);
}

// With options for performance (strength 2 makes string comparison case-insensitive)
const tunedResults = await User.aggregate()
  .match({ status: 'active' })
  .group({ _id: '$department', count: { $sum: 1 } })
  .allowDiskUse(true)
  .collation({ locale: 'en', strength: 2 })
  .option({
    maxTimeMS: 30000,
    comment: 'Department statistics query'
  })
  .exec();

// Execution plan analysis
const plan = await User.aggregate()
  .match({ age: { $gte: 18 } })
  .group({ _id: '$status', count: { $sum: 1 } })
  .explain();

console.log('Aggregation execution plan:', plan);
```

## Types

```javascript { .api }
/**
 * One aggregation stage object, e.g. `{ $match: { status: 'active' } }`.
 *
 * The final index-signature member accepts any object. The members above it
 * therefore document common stage shapes rather than restricting them, and
 * stages not listed here ($geoNear, $graphLookup, ...) are still accepted.
 */
type PipelineStage =
  | { $match: any }
  | { $group: any }
  | { $project: any }
  | { $addFields: any }
  | { $set: any }
  | { $unset: string | string[] }
  | { $sort: any }
  | { $limit: number }
  | { $skip: number }
  | { $unwind: string | UnwindOptions }
  | { $lookup: LookupOptions }
  | { $unionWith: string | UnionWithOptions }
  | { $sample: { size: number } }
  | { $sortByCount: any }
  | { $count: string }
  | { $bucket: BucketOptions }
  | { $bucketAuto: BucketAutoOptions }
  | { $facet: { [key: string]: PipelineStage[] } }
  | { $replaceRoot: any }
  | { $redact: any }
  | { $search: any }
  | { $out: string | { db: string; coll: string } }
  | { $merge: string | MergeOptions }
  | { [key: string]: any };

interface AggregateCursor<T> extends NodeJS.ReadableStream {
  /** Resolve to the next document, or null once the cursor is exhausted */
  next(): Promise<T | null>;

  /** Close the cursor */
  close(): Promise<void>;

  /** True once the cursor has been closed */
  closed: boolean;

  /** Return a cursor that passes each document through `fn` */
  map<U>(fn: (doc: T) => U): AggregateCursor<U>;

  /** Stream events: 'data' once per document, 'error', and 'end' when exhausted */
  on(event: 'data' | 'error' | 'end', listener: (...args: any[]) => void): this;
}

interface CollationOptions {
  /** ICU locale, e.g. 'en' or 'fr_CA'; 'simple' for binary comparison */
  locale: string;
  /** Include case in comparisons at strength 1 or 2 */
  caseLevel?: boolean;
  /** Case ordering: 'upper', 'lower', or 'off' (default) */
  caseFirst?: string;
  /** Comparison level 1-5: 1 ignores case and diacritics, 2 ignores case, 3 (default) is case-sensitive */
  strength?: number;
  /** Compare numeric substrings as numbers ('10' sorts after '9') */
  numericOrdering?: boolean;
  /** 'non-ignorable' (default) or 'shifted' (ignore spaces/punctuation per maxVariable) */
  alternate?: string;
  /** With alternate 'shifted': 'punct' (default) or 'space' */
  maxVariable?: string;
  /** Sort strings with diacritics from the back of the string (e.g. French) */
  backwards?: boolean;
}
```