CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/npm-mongoose

Mongoose is a comprehensive MongoDB object modeling tool designed for asynchronous environments with schema-based validation, query building, and business logic hooks.

Pending
Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Security by Snyk

Pending

The risk profile of this skill

Overview
Eval results
Files

docs/aggregation.md

Aggregation Pipeline

MongoDB aggregation pipeline builder with stage methods, type safety, and execution options for complex data processing and analytics.

Capabilities

Pipeline Creation

Create aggregation pipelines with method chaining for complex data transformations.

/**
 * Create an aggregation pipeline on a model.
 *
 * The call returns a chainable `Aggregate` builder, not the results
 * themselves; the usage examples obtain results by awaiting the builder
 * or by calling `exec()` on it.
 *
 * @typeParam T - Element type of the result array (the builder is typed `Aggregate<T[]>`)
 * @param pipeline - Optional initial pipeline stages; may be omitted and stages chained instead
 * @returns Aggregate instance
 */
Model.aggregate<T>(pipeline?: PipelineStage[]): Aggregate<T[]>;

/** Pipeline-building portion of the `Aggregate` interface. */
interface Aggregate<T> {
  /**
   * Add pipeline stages to the end of the pipeline.
   * @param stages - Pipeline stages to append
   * @returns this aggregate, so calls can be chained
   */
  append(...stages: PipelineStage[]): this;
  
  /**
   * Add pipeline stages to the front of the pipeline.
   * @param stages - Pipeline stages to prepend
   * @returns this aggregate, so calls can be chained
   */
  prepend(...stages: PipelineStage[]): this;
  
  /**
   * Get the stages accumulated so far.
   * @returns Array of pipeline stages
   */
  pipeline(): PipelineStage[];
}

Usage Examples:

const User = mongoose.model('User', userSchema);

// Basic aggregation: pass the stages as an array and await the aggregate directly
const results = await User.aggregate([
  { $match: { status: 'active' } },
  { $group: { _id: '$department', count: { $sum: 1 } } },
  { $sort: { count: -1 } }
]);

// Method chaining: build the same three stages one call at a time
const pipeline = User.aggregate()
  .match({ status: 'active' })
  .group({ _id: '$department', count: { $sum: 1 } })
  .sort({ count: -1 });

// Uses its own name because `results` is already declared above and a
// `const` cannot be redeclared in the same scope
const chainedResults = await pipeline.exec();

Filtering and Matching

Filter documents at various stages of the pipeline.

/** Filtering portion of the `Aggregate` interface. */
interface Aggregate<T> {
  /**
   * Filter documents (equivalent to find conditions).
   * Chained counterpart of a `$match` stage.
   * @param conditions - Match conditions
   * @returns this aggregate, so calls can be chained
   */
  match(conditions: FilterQuery<any>): this;
  
  /**
   * Sample random documents.
   * Chained counterpart of a `$sample` stage.
   * @param size - Number of documents to sample
   * @returns this aggregate, so calls can be chained
   */
  sample(size: number): this;
  
  /**
   * Limit the number of documents.
   * Chained counterpart of a `$limit` stage.
   * @param num - Maximum number of documents
   * @returns this aggregate, so calls can be chained
   */
  limit(num: number): this;
  
  /**
   * Skip a number of documents.
   * Chained counterpart of a `$skip` stage.
   * @param num - Number of documents to skip
   * @returns this aggregate, so calls can be chained
   */
  skip(num: number): this;
}

Usage Examples:

// Filter active users whose age is at least 18
const activeUsers = await User.aggregate()
  .match({ status: 'active', age: { $gte: 18 } })
  .exec();

// Sample 10 random documents
const randomUsers = await User.aggregate()
  .sample(10)
  .exec();

// Pagination in aggregation: sort, skip the first 10 matches, keep the
// next 10 (i.e. page 2 at 10 documents per page)
const page2Users = await User.aggregate()
  .match({ status: 'active' })
  .sort({ createdAt: -1 })
  .skip(10)
  .limit(10)
  .exec();

Grouping and Aggregation

Group documents and perform calculations on grouped data.

/** Grouping portion of the `Aggregate` interface. */
interface Aggregate<T> {
  /**
   * Group documents by specified fields.
   * Chained counterpart of a `$group` stage.
   * @param arg - Group specification object (`_id` plus accumulator fields, as in the usage examples)
   * @returns this aggregate, so calls can be chained
   */
  group(arg: any): this;
  
  /**
   * Sort documents by count (shorthand for group + sort).
   * Chained counterpart of a `$sortByCount` stage.
   * @param arg - Field to sort by count (e.g. `'$department'`)
   * @returns this aggregate, so calls can be chained
   */
  sortByCount(arg: any): this;
  
  /**
   * Count documents in the pipeline.
   * Chained counterpart of a `$count` stage.
   * @param field - Field name for count result
   * @returns this aggregate, so calls can be chained
   */
  count(field: string): this;
  
  /**
   * Categorize documents into buckets with explicit boundaries.
   * Chained counterpart of a `$bucket` stage.
   * @param options - Bucket configuration
   * @returns this aggregate, so calls can be chained
   */
  bucket(options: BucketOptions): this;
  
  /**
   * Automatically categorize documents into a given number of buckets.
   * Chained counterpart of a `$bucketAuto` stage.
   * @param options - Auto bucket configuration
   * @returns this aggregate, so calls can be chained
   */
  bucketAuto(options: BucketAutoOptions): this;
}

/** Configuration accepted by `bucket()` and by a `$bucket` stage. */
interface BucketOptions {
  /** Field to bucket by (e.g. `'$age'`) */
  groupBy: any;
  
  /** Bucket boundaries (e.g. `[0, 25, 35, 50, 100]`) */
  boundaries: any[];
  
  /** Default bucket for out-of-range values */
  default?: any;
  
  /** Output specification: accumulator fields computed per bucket */
  output?: any;
}

/** Configuration accepted by `bucketAuto()` and by a `$bucketAuto` stage. */
interface BucketAutoOptions {
  /** Field to bucket by (e.g. `'$salary'`) */
  groupBy: any;
  
  /** Number of buckets */
  buckets: number;
  
  /** Output specification: accumulator fields computed per bucket */
  output?: any;
  
  /** Granularity for bucket boundaries */
  granularity?: string;
}

Usage Examples:

// Group active users by department with per-department statistics,
// largest department first
const departmentStats = await User.aggregate()
  .match({ status: 'active' })
  .group({
    _id: '$department',
    count: { $sum: 1 },
    avgAge: { $avg: '$age' },
    maxSalary: { $max: '$salary' },
    minSalary: { $min: '$salary' },
    employees: { $push: '$name' }
  })
  .sort({ count: -1 })
  .exec();

// Sort by count (shorthand for the group + sort above)
const popularDepartments = await User.aggregate()
  .sortByCount('$department')
  .exec();

// Count active documents; the count is reported under the `totalUsers` field
const totalCount = await User.aggregate()
  .match({ status: 'active' })
  .count('totalUsers')
  .exec();

// Age buckets with explicit boundaries; out-of-range values go to 'Other'
const ageBuckets = await User.aggregate()
  .bucket({
    groupBy: '$age',
    boundaries: [0, 25, 35, 50, 100],
    default: 'Other',
    output: {
      count: { $sum: 1 },
      avgSalary: { $avg: '$salary' }
    }
  })
  .exec();

// Auto buckets for salary ranges: ask for 5 buckets instead of listing boundaries
const salaryBuckets = await User.aggregate()
  .bucketAuto({
    groupBy: '$salary',
    buckets: 5,
    output: {
      count: { $sum: 1 },
      avgAge: { $avg: '$age' }
    }
  })
  .exec();

Data Transformation

Transform and reshape documents in the pipeline.

/** Transformation portion of the `Aggregate` interface. */
interface Aggregate<T> {
  /**
   * Select/transform fields (like SELECT in SQL).
   * Chained counterpart of a `$project` stage.
   * @param arg - Projection specification
   * @returns this aggregate, so calls can be chained
   */
  project(arg: any): this;
  
  /**
   * Add computed fields to documents.
   * Chained counterpart of an `$addFields` stage.
   * @param arg - Fields to add, keyed by output field name
   * @returns this aggregate, so calls can be chained
   */
  addFields(arg: any): this;
  
  /**
   * Replace document root with specified field.
   * Chained counterpart of a `$replaceRoot` stage.
   * @param newRoot - New root specification
   * @returns this aggregate, so calls can be chained
   */
  replaceRoot(newRoot: any): this;
  
  /**
   * Deconstruct array fields into separate documents.
   * Chained counterpart of an `$unwind` stage.
   * @param path - Array field path (e.g. `'$skills'`) or unwind specification
   * @returns this aggregate, so calls can be chained
   */
  unwind(path: string | UnwindOptions): this;
  
  /**
   * Sort documents.
   * Chained counterpart of a `$sort` stage.
   * @param arg - Sort specification (e.g. `{ count: -1 }`)
   * @returns this aggregate, so calls can be chained
   */
  sort(arg: any): this;
}

/** Object form of the argument to `unwind()` and of a `$unwind` stage. */
interface UnwindOptions {
  /** Path to array field (e.g. `'$comments'`) */
  path: string;
  
  /** Name of the field in which to store the array index */
  includeArrayIndex?: string;
  
  /** Preserve documents with null/missing arrays */
  preserveNullAndEmptyArrays?: boolean;
}

Usage Examples:

// Project specific fields alongside computed ones
const userSummary = await User.aggregate()
  .project({
    name: 1,
    email: 1,
    age: 1,
    isAdult: { $gte: ['$age', 18] },
    fullName: { $concat: ['$firstName', ' ', '$lastName'] },
    // Current year is evaluated in JavaScript when the pipeline is built
    yearBorn: { $subtract: [new Date().getFullYear(), '$age'] }
  })
  .exec();

// Add computed fields
const enrichedUsers = await User.aggregate()
  .addFields({
    isAdult: { $gte: ['$age', 18] },
    fullName: { $concat: ['$firstName', ' ', '$lastName'] },
    // Salary below 30000 -> 'Junior', below 60000 -> 'Mid', otherwise 'Senior'
    salaryGrade: {
      $switch: {
        branches: [
          { case: { $lt: ['$salary', 30000] }, then: 'Junior' },
          { case: { $lt: ['$salary', 60000] }, then: 'Mid' }
        ],
        default: 'Senior'
      }
    }
  })
  .exec();

// Unwind array fields, then group by individual skill, most common first
const userSkills = await User.aggregate()
  .unwind('$skills')
  .group({
    _id: '$skills',
    users: { $addToSet: '$name' },
    count: { $sum: 1 }
  })
  .sort({ count: -1 })
  .exec();

// Unwind with options (object form instead of a bare path)
const postComments = await Post.aggregate()
  .unwind({
    path: '$comments',
    includeArrayIndex: 'commentIndex',
    preserveNullAndEmptyArrays: true
  })
  .exec();

// Replace root to flatten nested objects: merge `profile` with a `userId` taken from `_id`
const profiles = await User.aggregate()
  .replaceRoot({ $mergeObjects: ['$profile', { userId: '$_id' }] })
  .exec();

Joins and Lookups

Join data from multiple collections using lookup operations.

/** Join portion of the `Aggregate` interface. */
interface Aggregate<T> {
  /**
   * Join with another collection (like JOIN in SQL).
   * Chained counterpart of a `$lookup` stage.
   * @param options - Lookup configuration
   * @returns this aggregate, so calls can be chained
   */
  lookup(options: LookupOptions): this;
  
  /**
   * Combine with another collection.
   * @param options - Union options: the collection to union with and an optional pipeline to run on it
   * @returns this aggregate, so calls can be chained
   */
  unionWith(options: UnionWithOptions): this;
}

/**
 * Configuration accepted by `lookup()` and by a `$lookup` stage.
 *
 * Two forms are supported: an equality join (`localField` + `foreignField`)
 * and a pipeline join (`let` + `pipeline`). The join fields are optional
 * because the pipeline form does not supply them.
 */
interface LookupOptions {
  /** Collection to join with */
  from: string;

  /** Local field for an equality join; omitted in the pipeline form */
  localField?: string;

  /** Foreign field for an equality join; omitted in the pipeline form */
  foreignField?: string;

  /** Output array field name */
  as: string;

  /** Pipeline to run on joined collection (advanced) */
  pipeline?: PipelineStage[];

  /** Variables made available to `pipeline`, referenced there as `$$name` */
  let?: any;
}

/** Configuration accepted by `unionWith()`. */
interface UnionWithOptions {
  /** Collection to union with */
  coll: string;
  
  /** Pipeline to run on union collection before its documents are combined */
  pipeline?: PipelineStage[];
}

Usage Examples:

// Basic lookup (join): attach each user's posts, then rank users by post count
const usersWithPosts = await User.aggregate()
  .lookup({
    from: 'posts',
    localField: '_id',
    foreignField: 'author',
    as: 'posts'
  })
  .addFields({
    postCount: { $size: '$posts' }
  })
  .sort({ postCount: -1 })
  .exec();

// Advanced lookup with pipeline: the five newest posts per user from the last 30 days
const usersWithRecentPosts = await User.aggregate()
  .lookup({
    from: 'posts',
    // Expose the user's _id to the sub-pipeline as $$userId
    let: { userId: '$_id' },
    pipeline: [
      { $match: {
        $expr: { $eq: ['$author', '$$userId'] },
        // 30 days expressed in milliseconds
        createdAt: { $gte: new Date(Date.now() - 30 * 24 * 60 * 60 * 1000) }
      }},
      { $project: { title: 1, createdAt: 1 } },
      { $sort: { createdAt: -1 } },
      { $limit: 5 }
    ],
    as: 'recentPosts'
  })
  .exec();

// Multiple lookups: join posts and comments, then sum both array sizes
const completeUserData = await User.aggregate()
  .lookup({
    from: 'posts',
    localField: '_id',
    foreignField: 'author',
    as: 'posts'
  })
  .lookup({
    from: 'comments',
    localField: '_id',
    foreignField: 'author',
    as: 'comments'
  })
  .addFields({
    totalActivity: { $add: [{ $size: '$posts' }, { $size: '$comments' }] }
  })
  .exec();

// Union with another collection: published pages are tagged type 'page'
const allContent = await Post.aggregate()
  .unionWith({
    coll: 'pages',
    pipeline: [
      { $match: { published: true } },
      { $addFields: { type: 'page' } }
    ]
  })
  // Documents without a `type` (the posts) default to 'post'
  .addFields({ type: { $ifNull: ['$type', 'post'] } })
  .sort({ createdAt: -1 })
  .exec();

Advanced Operations

Advanced aggregation operations for complex data processing.

/** Advanced-stage portion of the `Aggregate` interface. */
interface Aggregate<T> {
  /**
   * Multi-faceted aggregation (multiple pipelines).
   * Chained counterpart of a `$facet` stage.
   * @param facets - Object with named pipeline facets
   * @returns this aggregate, so calls can be chained
   */
  facet(facets: { [key: string]: PipelineStage[] }): this;

  /**
   * Conditionally exclude documents from output.
   * Chained counterpart of a `$redact` stage.
   * @param expression - Redaction expression
   * @returns this aggregate, so calls can be chained
   */
  redact(expression: any): this;

  /**
   * Output results to a collection.
   * Chained counterpart of an `$out` stage.
   * @param collection - Target collection name
   * @returns this aggregate, so calls can be chained
   */
  out(collection: string): this;

  /**
   * Merge results into a collection.
   * Chained counterpart of a `$merge` stage.
   * @param options - Merge options
   * @returns this aggregate, so calls can be chained
   */
  merge(options: MergeOptions): this;

  /**
   * Append a `$search` stage (MongoDB Atlas Search) to the pipeline.
   * This only adds a query stage; it does not create a search index.
   * @param options - `$search` stage specification
   * @returns this aggregate, so calls can be chained
   */
  search(options: any): this;
}

/** Configuration accepted by `merge()` and by a `$merge` stage. */
interface MergeOptions {
  /**
   * Target collection: either a collection name in the current database,
   * or a `{ db, coll }` pair naming a collection in another database.
   */
  into: string | { db: string; coll: string };

  /** Action when a result document matches an existing one; a pipeline may be given for a custom update */
  whenMatched?: 'replace' | 'keepExisting' | 'merge' | 'fail' | PipelineStage[];

  /** Action when a result document matches no existing one */
  whenNotMatched?: 'insert' | 'discard' | 'fail';

  /** Field or fields used to decide whether documents match */
  on?: string | string[];
}

Usage Examples:

// Multi-faceted aggregation: three independent sub-pipelines, one named result each
const analytics = await User.aggregate()
  .facet({
    ageStats: [
      { $group: { _id: null, avgAge: { $avg: '$age' }, maxAge: { $max: '$age' } } }
    ],
    departmentCounts: [
      { $group: { _id: '$department', count: { $sum: 1 } } },
      { $sort: { count: -1 } }
    ],
    salaryBuckets: [
      { $bucket: {
        groupBy: '$salary',
        boundaries: [0, 30000, 60000, 100000, Infinity],
        default: 'Other',
        output: { count: { $sum: 1 } }
      }}
    ]
  })
  .exec();

// Redact sensitive information: keep when `privacy` is 'public', prune otherwise
const publicUsers = await User.aggregate()
  .redact({
    $cond: {
      if: { $eq: ['$privacy', 'public'] },
      then: '$$KEEP',
      else: '$$PRUNE'
    }
  })
  .exec();

// Output to collection: write per-department stats to `departmentStats`
await User.aggregate()
  .match({ status: 'active' })
  .group({
    _id: '$department',
    count: { $sum: 1 },
    avgSalary: { $avg: '$salary' }
  })
  .out('departmentStats')
  .exec();

// Merge results: count users whose lastLogin is within the past 24 hours,
// grouped by calendar day, into `userMetrics`
await User.aggregate()
  .match({ lastLogin: { $gte: new Date(Date.now() - 24 * 60 * 60 * 1000) } })
  .group({
    _id: { $dateToString: { format: '%Y-%m-%d', date: '$lastLogin' } },
    dailyActiveUsers: { $sum: 1 }
  })
  .merge({
    into: 'userMetrics',
    whenMatched: 'replace',
    whenNotMatched: 'insert'
  })
  .exec();

Execution and Options

Execute aggregation pipelines with various options and result formats.

/** Execution portion of the `Aggregate` interface. */
interface Aggregate<T> {
  /**
   * Execute the aggregation pipeline.
   * @returns Promise resolving to aggregation results
   */
  exec(): Promise<T>;

  /**
   * Execute and return a cursor for streaming results.
   *
   * `Model.aggregate<T>()` yields an `Aggregate<T[]>`, so within this
   * interface `T` is the whole result array. A cursor hands out one
   * document at a time, so it is typed over the array's element type
   * (or over `T` itself when `T` is not an array).
   *
   * @param options - Cursor options
   * @returns Aggregation cursor
   */
  cursor(options?: AggregateCursorOptions): AggregateCursor<T extends ReadonlyArray<infer U> ? U : T>;

  /**
   * Get aggregation execution plan instead of results.
   * @returns Promise resolving to execution plan
   */
  explain(): Promise<any>;

  /**
   * Allow disk usage for large aggregations.
   * @param val - Enable disk usage
   * @returns this aggregate, so calls can be chained
   */
  allowDiskUse(val: boolean): this;

  /**
   * Set collation for string operations.
   * @param collation - Collation options
   * @returns this aggregate, so calls can be chained
   */
  collation(collation: CollationOptions): this;

  /**
   * Use database session.
   * @param session - MongoDB session
   * @returns this aggregate, so calls can be chained
   */
  session(session: ClientSession): this;

  /**
   * Set aggregation options.
   * @param options - Aggregation options
   * @returns this aggregate, so calls can be chained
   */
  option(options: AggregateOptions): this;
}

/** Options accepted by `Aggregate.cursor()`. */
interface AggregateCursorOptions {
  /** Batch size for cursor (e.g. `{ batchSize: 100 }`) */
  batchSize?: number;
  
  /** Session to use for the cursor */
  session?: ClientSession;
}

/** Options accepted by `Aggregate.option()`. */
interface AggregateOptions {
  /** Allow disk usage */
  allowDiskUse?: boolean;
  
  /** Cursor batch size */
  batchSize?: number;
  
  /** Maximum execution time, in milliseconds as the `MS` suffix indicates */
  maxTimeMS?: number;
  
  /** Collation */
  collation?: CollationOptions;
  
  /** Read preference */
  readPreference?: string;
  
  /** Comment for profiling */
  comment?: string;
  
  /** Hint for index usage */
  hint?: any;
}

Usage Examples:

// Basic execution
const results = await User.aggregate()
  .match({ status: 'active' })
  .group({ _id: '$department', count: { $sum: 1 } })
  .exec();

// Streaming large results: fetch in batches of 100 instead of loading everything
const cursor = User.aggregate()
  .match({ createdAt: { $gte: new Date('2023-01-01') } })
  .cursor({ batchSize: 100 });

// next() resolves to null once the cursor is exhausted
for (let doc = await cursor.next(); doc != null; doc = await cursor.next()) {
  console.log('Processing:', doc);
}

// With options for performance. Uses its own name because `results` is
// already declared above and a `const` cannot be redeclared in the same scope.
const tunedResults = await User.aggregate()
  .match({ status: 'active' })
  .group({ _id: '$department', count: { $sum: 1 } })
  .allowDiskUse(true)
  .collation({ locale: 'en', strength: 2 })
  .option({
    maxTimeMS: 30000,
    comment: 'Department statistics query'
  })
  .exec();

// Execution plan analysis: explain() returns the plan rather than the results
const plan = await User.aggregate()
  .match({ age: { $gte: 18 } })
  .group({ _id: '$status', count: { $sum: 1 } })
  .explain();

console.log('Aggregation execution plan:', plan);

Types

/**
 * A single aggregation pipeline stage.
 *
 * The named members list the stages this document covers. The final
 * index-signature member accepts any other stage object, so the named
 * members act as documentation and editor hints rather than a closed set.
 */
type PipelineStage =
  | { $match: any }
  | { $group: any }
  | { $project: any }
  | { $addFields: any }
  | { $sort: any }
  | { $limit: number }
  | { $skip: number }
  | { $unwind: string | UnwindOptions }
  | { $lookup: LookupOptions }
  | { $unionWith: string | UnionWithOptions }
  | { $sample: { size: number } }
  | { $sortByCount: any }
  | { $count: string }
  | { $bucket: BucketOptions }
  | { $bucketAuto: BucketAutoOptions }
  | { $facet: { [key: string]: PipelineStage[] } }
  | { $replaceRoot: any }
  | { $redact: any }
  | { $search: any }
  | { $out: string }
  | { $merge: MergeOptions }
  | { [key: string]: any };

/**
 * Cursor returned by `Aggregate.cursor()`. It can be consumed by pulling
 * with `next()` or through stream events via `on()`.
 */
interface AggregateCursor<T> extends NodeJS.ReadableStream {
  /** Get next document; resolves to `null` when there are no more */
  next(): Promise<T | null>;

  /** Close cursor */
  close(): Promise<void>;

  /** Check if closed */
  closed: boolean;

  /** Transform documents, producing a cursor over the mapped values */
  map<U>(fn: (doc: T) => U): AggregateCursor<U>;

  /**
   * Event listeners.
   * The listener is typed as a callable signature rather than the unsafe
   * `Function` type; this matches the listener shape used by Node's streams.
   */
  on(event: 'data' | 'error' | 'end', listener: (...args: any[]) => void): this;
}

/** Language-specific string comparison rules, mirroring MongoDB's collation document. */
interface CollationOptions {
  /** ICU locale identifier (e.g. `'en'`) */
  locale: string;

  /** Whether to include case comparison at strength level 1 or 2 */
  caseLevel?: boolean;

  /** Sort order of case differences: `'upper'`, `'lower'` or `'off'` */
  caseFirst?: string;

  /** Comparison level, from 1 (base characters only) to 5 */
  strength?: number;

  /** Whether to compare numeric strings as numbers */
  numericOrdering?: boolean;

  /** Whether whitespace and punctuation count as base characters: `'non-ignorable'` or `'shifted'` */
  alternate?: string;

  /** Which characters are ignorable when `alternate` is `'shifted'`: `'punct'` or `'space'` */
  maxVariable?: string;

  /** Whether strings with diacritics sort from the back of the string */
  backwards?: boolean;

  /** Whether to check if text requires normalization and to perform it */
  normalization?: boolean;
}

docs

aggregation.md

connection.md

document.md

errors.md

index.md

model.md

query.md

schema.md

utilities.md

tile.json