or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

aggregation.md connection.md document.md errors.md index.md model.md query.md schema.md utilities.md

aggregation.md docs/

0

# Aggregation Pipeline

1

2

MongoDB aggregation pipeline builder with stage methods, type safety, and execution options for complex data processing and analytics.

3

4

## Capabilities

5

6

### Pipeline Creation

7

8

Create aggregation pipelines with method chaining for complex data transformations.

9

10

```javascript { .api }

11

/**

12

* Create aggregation pipeline

13

* @param pipeline - Initial pipeline stages

14

* @returns Aggregate instance

15

*/

16

Model.aggregate<T>(pipeline?: PipelineStage[]): Aggregate<T[]>;

17

18

interface Aggregate<T> {

19

/**

20

* Add pipeline stages

21

* @param stages - Pipeline stages to append

22

* @returns this aggregate

23

*/

24

append(...stages: PipelineStage[]): this;

25

26

/**

27

* Prepend pipeline stages

28

* @param stages - Pipeline stages to prepend

29

* @returns this aggregate

30

*/

31

prepend(...stages: PipelineStage[]): this;

32

33

/**

34

* Get current pipeline

35

* @returns Array of pipeline stages

36

*/

37

pipeline(): PipelineStage[];

38

}

39

```

40

41

**Usage Examples:**

42

43

```javascript

44

const User = mongoose.model('User', userSchema);

45

46

// Basic aggregation

47

const results = await User.aggregate([

48

{ $match: { status: 'active' } },

49

{ $group: { _id: '$department', count: { $sum: 1 } } },

50

{ $sort: { count: -1 } }

51

]);

52

53

// Method chaining

54

const pipeline = User.aggregate()

55

.match({ status: 'active' })

56

.group({ _id: '$department', count: { $sum: 1 } })

57

.sort({ count: -1 });

58

59

const chainedResults = await pipeline.exec();

60

```

61

62

### Filtering and Matching

63

64

Filter documents at various stages of the pipeline.

65

66

```javascript { .api }

67

interface Aggregate<T> {

68

/**

69

* Filter documents (equivalent to find conditions)

70

* @param conditions - Match conditions

71

* @returns this aggregate

72

*/

73

match(conditions: FilterQuery<any>): this;

74

75

/**

76

* Sample random documents

77

* @param size - Number of documents to sample

78

* @returns this aggregate

79

*/

80

sample(size: number): this;

81

82

/**

83

* Limit number of documents

84

* @param num - Maximum number of documents

85

* @returns this aggregate

86

*/

87

limit(num: number): this;

88

89

/**

90

* Skip number of documents

91

* @param num - Number of documents to skip

92

* @returns this aggregate

93

*/

94

skip(num: number): this;

95

}

96

```

97

98

**Usage Examples:**

99

100

```javascript

101

// Filter active users

102

const activeUsers = await User.aggregate()

103

.match({ status: 'active', age: { $gte: 18 } })

104

.exec();

105

106

// Sample random documents

107

const randomUsers = await User.aggregate()

108

.sample(10)

109

.exec();

110

111

// Pagination in aggregation

112

const page2Users = await User.aggregate()

113

.match({ status: 'active' })

114

.sort({ createdAt: -1 })

115

.skip(10)

116

.limit(10)

117

.exec();

118

```

119

120

### Grouping and Aggregation

121

122

Group documents and perform calculations on grouped data.

123

124

```javascript { .api }

125

interface Aggregate<T> {

126

/**

127

* Group documents by specified fields

128

* @param arg - Group specification object

129

* @returns this aggregate

130

*/

131

group(arg: any): this;

132

133

/**

134

* Group documents by an expression, then sort the groups by descending count (shorthand for $group + $sort)

135

* @param arg - Expression or field path (e.g. '$department') to group by before counting

136

* @returns this aggregate

137

*/

138

sortByCount(arg: any): this;

139

140

/**

141

* Count documents in pipeline

142

* @param field - Field name for count result

143

* @returns this aggregate

144

*/

145

count(field: string): this;

146

147

/**

148

* Categorize documents into buckets

149

* @param options - Bucket configuration

150

* @returns this aggregate

151

*/

152

bucket(options: BucketOptions): this;

153

154

/**

155

* Automatically categorize documents into buckets

156

* @param options - Auto bucket configuration

157

* @returns this aggregate

158

*/

159

bucketAuto(options: BucketAutoOptions): this;

160

}

161

162

interface BucketOptions {

163

/** Field to bucket by */

164

groupBy: any;

165

166

/** Bucket boundaries */

167

boundaries: any[];

168

169

/** Default bucket for out-of-range values */

170

default?: any;

171

172

/** Output specification */

173

output?: any;

174

}

175

176

interface BucketAutoOptions {

177

/** Field to bucket by */

178

groupBy: any;

179

180

/** Number of buckets */

181

buckets: number;

182

183

/** Output specification */

184

output?: any;

185

186

/** Granularity for bucket boundaries */

187

granularity?: string;

188

}

189

```

190

191

**Usage Examples:**

192

193

```javascript

194

// Group by department with statistics

195

const departmentStats = await User.aggregate()

196

.match({ status: 'active' })

197

.group({

198

_id: '$department',

199

count: { $sum: 1 },

200

avgAge: { $avg: '$age' },

201

maxSalary: { $max: '$salary' },

202

minSalary: { $min: '$salary' },

203

employees: { $push: '$name' }

204

})

205

.sort({ count: -1 })

206

.exec();

207

208

// Sort by count (shorthand)

209

const popularDepartments = await User.aggregate()

210

.sortByCount('$department')

211

.exec();

212

213

// Count total documents

214

const totalCount = await User.aggregate()

215

.match({ status: 'active' })

216

.count('totalUsers')

217

.exec();

218

219

// Age buckets

220

const ageBuckets = await User.aggregate()

221

.bucket({

222

groupBy: '$age',

223

boundaries: [0, 25, 35, 50, 100],

224

default: 'Other',

225

output: {

226

count: { $sum: 1 },

227

avgSalary: { $avg: '$salary' }

228

}

229

})

230

.exec();

231

232

// Auto buckets for salary ranges

233

const salaryBuckets = await User.aggregate()

234

.bucketAuto({

235

groupBy: '$salary',

236

buckets: 5,

237

output: {

238

count: { $sum: 1 },

239

avgAge: { $avg: '$age' }

240

}

241

})

242

.exec();

243

```

244

245

### Data Transformation

246

247

Transform and reshape documents in the pipeline.

248

249

```javascript { .api }

250

interface Aggregate<T> {

251

/**

252

* Select/transform fields (like SELECT in SQL)

253

* @param arg - Projection specification

254

* @returns this aggregate

255

*/

256

project(arg: any): this;

257

258

/**

259

* Add computed fields to documents

260

* @param arg - Fields to add

261

* @returns this aggregate

262

*/

263

addFields(arg: any): this;

264

265

/**

266

* Replace document root with specified field

267

* @param newRoot - New root specification

268

* @returns this aggregate

269

*/

270

replaceRoot(newRoot: any): this;

271

272

/**

273

* Deconstruct array fields into separate documents

274

* @param path - Array field path or unwind specification

275

* @returns this aggregate

276

*/

277

unwind(path: string | UnwindOptions): this;

278

279

/**

280

* Sort documents

281

* @param arg - Sort specification

282

* @returns this aggregate

283

*/

284

sort(arg: any): this;

285

}

286

287

interface UnwindOptions {

288

/** Path to array field */

289

path: string;

290

291

/** Field to store array index */

292

includeArrayIndex?: string;

293

294

/** Preserve documents with null/missing arrays */

295

preserveNullAndEmptyArrays?: boolean;

296

}

297

```

298

299

**Usage Examples:**

300

301

```javascript

302

// Project specific fields with calculations

303

const userSummary = await User.aggregate()

304

.project({

305

name: 1,

306

email: 1,

307

age: 1,

308

isAdult: { $gte: ['$age', 18] },

309

fullName: { $concat: ['$firstName', ' ', '$lastName'] },

310

yearBorn: { $subtract: [new Date().getFullYear(), '$age'] }

311

})

312

.exec();

313

314

// Add computed fields

315

const enrichedUsers = await User.aggregate()

316

.addFields({

317

isAdult: { $gte: ['$age', 18] },

318

fullName: { $concat: ['$firstName', ' ', '$lastName'] },

319

salaryGrade: {

320

$switch: {

321

branches: [

322

{ case: { $lt: ['$salary', 30000] }, then: 'Junior' },

323

{ case: { $lt: ['$salary', 60000] }, then: 'Mid' }

324

],

325

default: 'Senior'

326

}

327

}

328

})

329

.exec();

330

331

// Unwind array fields

332

const userSkills = await User.aggregate()

333

.unwind('$skills')

334

.group({

335

_id: '$skills',

336

users: { $addToSet: '$name' },

337

count: { $sum: 1 }

338

})

339

.sort({ count: -1 })

340

.exec();

341

342

// Unwind with options

343

const postComments = await Post.aggregate()

344

.unwind({

345

path: '$comments',

346

includeArrayIndex: 'commentIndex',

347

preserveNullAndEmptyArrays: true

348

})

349

.exec();

350

351

// Replace root to flatten nested objects

352

const profiles = await User.aggregate()

353

.replaceRoot({ $mergeObjects: ['$profile', { userId: '$_id' }] })

354

.exec();

355

```

356

357

### Joins and Lookups

358

359

Join data from multiple collections using lookup operations.

360

361

```javascript { .api }

362

interface Aggregate<T> {

363

/**

364

* Join with another collection (like JOIN in SQL)

365

* @param options - Lookup configuration

366

* @returns this aggregate

367

*/

368

lookup(options: LookupOptions): this;

369

370

/**

371

* Combine with another collection

372

* @param options - Union options

373

* @returns this aggregate

374

*/

375

unionWith(options: UnionWithOptions): this;

376

}

377

378

interface LookupOptions {

379

/** Collection to join with */

380

from: string;

381

382

/** Local field for join */

383

localField: string;

384

385

/** Foreign field for join */

386

foreignField: string;

387

388

/** Output array field name */

389

as: string;

390

391

/** Pipeline to run on joined collection (advanced) */

392

pipeline?: PipelineStage[];

393

394

/** Let variables for pipeline */

395

let?: any;

396

}

397

398

interface UnionWithOptions {

399

/** Collection to union with */

400

coll: string;

401

402

/** Pipeline to run on union collection */

403

pipeline?: PipelineStage[];

404

}

405

```

406

407

**Usage Examples:**

408

409

```javascript

410

// Basic lookup (join)

411

const usersWithPosts = await User.aggregate()

412

.lookup({

413

from: 'posts',

414

localField: '_id',

415

foreignField: 'author',

416

as: 'posts'

417

})

418

.addFields({

419

postCount: { $size: '$posts' }

420

})

421

.sort({ postCount: -1 })

422

.exec();

423

424

// Advanced lookup with pipeline

425

const usersWithRecentPosts = await User.aggregate()

426

.lookup({

427

from: 'posts',

428

let: { userId: '$_id' },

429

pipeline: [

430

{ $match: {

431

$expr: { $eq: ['$author', '$$userId'] },

432

createdAt: { $gte: new Date(Date.now() - 30 * 24 * 60 * 60 * 1000) }

433

}},

434

{ $project: { title: 1, createdAt: 1 } },

435

{ $sort: { createdAt: -1 } },

436

{ $limit: 5 }

437

],

438

as: 'recentPosts'

439

})

440

.exec();

441

442

// Multiple lookups

443

const completeUserData = await User.aggregate()

444

.lookup({

445

from: 'posts',

446

localField: '_id',

447

foreignField: 'author',

448

as: 'posts'

449

})

450

.lookup({

451

from: 'comments',

452

localField: '_id',

453

foreignField: 'author',

454

as: 'comments'

455

})

456

.addFields({

457

totalActivity: { $add: [{ $size: '$posts' }, { $size: '$comments' }] }

458

})

459

.exec();

460

461

// Union with another collection

462

const allContent = await Post.aggregate()

463

.unionWith({

464

coll: 'pages',

465

pipeline: [

466

{ $match: { published: true } },

467

{ $addFields: { type: 'page' } }

468

]

469

})

470

.addFields({ type: { $ifNull: ['$type', 'post'] } })

471

.sort({ createdAt: -1 })

472

.exec();

473

```

474

475

### Advanced Operations

476

477

Advanced aggregation operations for complex data processing.

478

479

```javascript { .api }

480

interface Aggregate<T> {

481

/**

482

* Multi-faceted aggregation (multiple pipelines)

483

* @param facets - Object with named pipeline facets

484

* @returns this aggregate

485

*/

486

facet(facets: { [key: string]: PipelineStage[] }): this;

487

488

/**

489

* Restrict document contents level by level: the expression resolves to $$KEEP, $$PRUNE, or $$DESCEND

490

* @param expression - Redaction expression

491

* @returns this aggregate

492

*/

493

redact(expression: any): this;

494

495

/**

496

* Write results to a collection, replacing its existing contents (must be the last pipeline stage)

497

* @param collection - Target collection name

498

* @returns this aggregate

499

*/

500

out(collection: string): this;

501

502

/**

503

* Merge results into a collection

504

* @param options - Merge options

505

* @returns this aggregate

506

*/

507

merge(options: MergeOptions): this;

508

509

/**

510

* Append a $search stage (Atlas Search full-text query; requires MongoDB Atlas and does not create an index)

511

* @param options - Search options

512

* @returns this aggregate

513

*/

514

search(options: any): this;

515

}

516

517

interface MergeOptions {

518

/** Target collection */

519

into: string;

520

521

/** When documents match */

522

whenMatched?: 'replace' | 'keepExisting' | 'merge' | 'fail' | PipelineStage[];

523

524

/** When documents don't match */

525

whenNotMatched?: 'insert' | 'discard' | 'fail';

526

527

/** Fields to match on */

528

on?: string | string[];

529

}

530

```

531

532

**Usage Examples:**

533

534

```javascript

535

// Multi-faceted aggregation

536

const analytics = await User.aggregate()

537

.facet({

538

ageStats: [

539

{ $group: { _id: null, avgAge: { $avg: '$age' }, maxAge: { $max: '$age' } } }

540

],

541

departmentCounts: [

542

{ $group: { _id: '$department', count: { $sum: 1 } } },

543

{ $sort: { count: -1 } }

544

],

545

salaryBuckets: [

546

{ $bucket: {

547

groupBy: '$salary',

548

boundaries: [0, 30000, 60000, 100000, Infinity],

549

default: 'Other',

550

output: { count: { $sum: 1 } }

551

}}

552

]

553

})

554

.exec();

555

556

// Redact sensitive information

557

const publicUsers = await User.aggregate()

558

.redact({

559

$cond: {

560

if: { $eq: ['$privacy', 'public'] },

561

then: '$$KEEP',

562

else: '$$PRUNE'

563

}

564

})

565

.exec();

566

567

// Output to collection

568

await User.aggregate()

569

.match({ status: 'active' })

570

.group({

571

_id: '$department',

572

count: { $sum: 1 },

573

avgSalary: { $avg: '$salary' }

574

})

575

.out('departmentStats')

576

.exec();

577

578

// Merge results

579

await User.aggregate()

580

.match({ lastLogin: { $gte: new Date(Date.now() - 24 * 60 * 60 * 1000) } })

581

.group({

582

_id: { $dateToString: { format: '%Y-%m-%d', date: '$lastLogin' } },

583

dailyActiveUsers: { $sum: 1 }

584

})

585

.merge({

586

into: 'userMetrics',

587

whenMatched: 'replace',

588

whenNotMatched: 'insert'

589

})

590

.exec();

591

```

592

593

### Execution and Options

594

595

Execute aggregation pipelines with various options and result formats.

596

597

```javascript { .api }

598

interface Aggregate<T> {

599

/**

600

* Execute the aggregation pipeline

601

* @returns Promise resolving to aggregation results

602

*/

603

exec(): Promise<T>;

604

605

/**

606

* Execute and return a cursor for streaming results

607

* @param options - Cursor options

608

* @returns Aggregation cursor

609

*/

610

cursor(options?: AggregateCursorOptions): AggregateCursor<T>;

611

612

/**

613

* Get aggregation execution plan

614

* @returns Promise resolving to execution plan

615

*/

616

explain(): Promise<any>;

617

618

/**

619

* Allow disk usage for large aggregations

620

* @param val - Enable disk usage

621

* @returns this aggregate

622

*/

623

allowDiskUse(val: boolean): this;

624

625

/**

626

* Set collation for string operations

627

* @param collation - Collation options

628

* @returns this aggregate

629

*/

630

collation(collation: CollationOptions): this;

631

632

/**

633

* Use database session

634

* @param session - MongoDB session

635

* @returns this aggregate

636

*/

637

session(session: ClientSession): this;

638

639

/**

640

* Set aggregation options

641

* @param options - Aggregation options

642

* @returns this aggregate

643

*/

644

option(options: AggregateOptions): this;

645

}

646

647

interface AggregateCursorOptions {

648

/** Batch size for cursor */

649

batchSize?: number;

650

651

/** Use session */

652

session?: ClientSession;

653

}

654

655

interface AggregateOptions {

656

/** Allow disk usage */

657

allowDiskUse?: boolean;

658

659

/** Cursor batch size */

660

batchSize?: number;

661

662

/** Maximum execution time */

663

maxTimeMS?: number;

664

665

/** Collation */

666

collation?: CollationOptions;

667

668

/** Read preference */

669

readPreference?: string;

670

671

/** Comment for profiling */

672

comment?: string;

673

674

/** Hint for index usage */

675

hint?: any;

676

}

677

```

678

679

**Usage Examples:**

680

681

```javascript

682

// Basic execution

683

const results = await User.aggregate()

684

.match({ status: 'active' })

685

.group({ _id: '$department', count: { $sum: 1 } })

686

.exec();

687

688

// Streaming large results

689

const cursor = User.aggregate()

690

.match({ createdAt: { $gte: new Date('2023-01-01') } })

691

.cursor({ batchSize: 100 });

692

693

for (let doc = await cursor.next(); doc != null; doc = await cursor.next()) {

694

console.log('Processing:', doc);

695

}

696

697

// With options for performance

698

const tunedResults = await User.aggregate()

699

.match({ status: 'active' })

700

.group({ _id: '$department', count: { $sum: 1 } })

701

.allowDiskUse(true)

702

.collation({ locale: 'en', strength: 2 })

703

.option({

704

maxTimeMS: 30000,

705

comment: 'Department statistics query'

706

})

707

.exec();

708

709

// Execution plan analysis

710

const plan = await User.aggregate()

711

.match({ age: { $gte: 18 } })

712

.group({ _id: '$status', count: { $sum: 1 } })

713

.explain();

714

715

console.log('Aggregation execution plan:', plan);

716

```

717

718

## Types

719

720

```javascript { .api }

721

type PipelineStage =

722

| { $match: any }

723

| { $group: any }

724

| { $project: any }

725

| { $addFields: any }

726

| { $sort: any }

727

| { $limit: number }

728

| { $skip: number }

729

| { $unwind: string | UnwindOptions }

730

| { $lookup: LookupOptions }

731

| { $sample: { size: number } }

732

| { $sortByCount: any }

733

| { $count: string }

734

| { $bucket: BucketOptions }

735

| { $bucketAuto: BucketAutoOptions }

736

| { $facet: { [key: string]: PipelineStage[] } }

737

| { $replaceRoot: any }

738

| { $redact: any }

739

| { $out: string }

740

| { $merge: MergeOptions }

741

| { [key: string]: any };

742

743

interface AggregateCursor<T> extends NodeJS.ReadableStream {

744

/** Get next document */

745

next(): Promise<T | null>;

746

747

/** Close cursor */

748

close(): Promise<void>;

749

750

/** Check if closed */

751

closed: boolean;

752

753

/** Transform documents */

754

map<U>(fn: (doc: T) => U): AggregateCursor<U>;

755

756

/** Event listeners */

757

on(event: 'data' | 'error' | 'end', listener: Function): this;

758

}

759

760

interface CollationOptions {

761

locale: string;

762

caseLevel?: boolean;

763

caseFirst?: string;

764

strength?: number;

765

numericOrdering?: boolean;

766

alternate?: string;

767

maxVariable?: string;

768

backwards?: boolean;

769

}

770

```