CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/npm-nestjs--microservices

Nest microservices framework providing scalable distributed systems with multiple transport layers and communication patterns.

Pending
Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Security by Snyk

Pending

The risk profile of this skill

Overview
Eval results
Files

docs/message-patterns.md

Message Patterns & Event Handling

Decorators and patterns for defining message handlers with request-response and event-driven communication models, supporting pattern-based routing and payload extraction for distributed microservice architectures.

Capabilities

Message Pattern Decorator

Decorator for subscribing to incoming messages using the request-response pattern, where a response is expected.

/**
 * Subscribes a handler method to incoming messages using the
 * request-response pattern (a response is expected from the handler).
 *
 * @param metadata - Pattern used for message routing. The usage examples
 *   pass either a string (`'get_user'`) or a plain object (`{ cmd: 'sum' }`).
 * @param transport - Optional transport to bind the pattern to
 *   (e.g. `Transport.REDIS`), given as a `Transport` member or a symbol.
 * @param extras - Optional additional configuration, as a free-form
 *   key/value record (e.g. `{ version: '1.0' }`).
 * @returns Method decorator
 */
function MessagePattern<T = string>(
  metadata?: T,
  transport?: Transport | symbol,
  extras?: Record<string, any>
): MethodDecorator;

Usage Examples:

import { Controller } from '@nestjs/common';
import { MessagePattern, Payload, Transport } from '@nestjs/microservices';

/**
 * Example request-response handlers.
 *
 * `userService` and `processData` are referenced but not declared in this
 * snippet; they are assumed to be supplied by the host application
 * (e.g. via constructor injection) and are omitted for brevity.
 */
@Controller()
export class MathController {
  // Object pattern; a missing payload is treated as an empty list, so the sum is 0.
  @MessagePattern({ cmd: 'sum' })
  accumulate(@Payload() data: number[]): number {
    return (data || []).reduce((a, b) => a + b, 0);
  }

  // String pattern
  @MessagePattern('get_user')
  getUser(@Payload() id: number): Promise<User> {
    return this.userService.findById(id);
  }

  // With transport-specific binding
  @MessagePattern({ cmd: 'process' }, Transport.REDIS)
  processOnRedis(@Payload() data: any): any {
    return this.processData(data);
  }

  // With metadata and extras
  @MessagePattern(
    { service: 'user', method: 'create' },
    Transport.KAFKA,
    { version: '1.0' }
  )
  createUser(@Payload() userData: CreateUserDto): Promise<User> {
    return this.userService.create(userData);
  }
}

Event Pattern Decorator

Decorator for subscribing to incoming events using the fire-and-forget pattern, where no response is expected.

/**
 * Subscribes a handler method to incoming events using the
 * fire-and-forget pattern (no response is expected from the handler).
 *
 * @param metadata - Event pattern used for routing. The usage examples
 *   pass either a string (`'user_created'`) or a plain object
 *   (`{ type: 'order', action: 'completed' }`).
 * @param transport - Optional transport to bind the pattern to
 *   (e.g. `Transport.KAFKA`), given as a `Transport` member or a symbol.
 * @param extras - Optional additional configuration, as a free-form
 *   key/value record.
 * @returns Method decorator
 */
function EventPattern<T = string>(
  metadata?: T,
  transport?: Transport | symbol,
  extras?: Record<string, any>
): MethodDecorator;

Usage Examples:

import { Controller } from '@nestjs/common';
import { Ctx, EventPattern, Payload, Transport } from '@nestjs/microservices';

/**
 * Example fire-and-forget event handlers.
 *
 * The services used below (`emailService`, `analyticsService`,
 * `inventoryService`, `cacheService`) are referenced but not declared in
 * this snippet; they are assumed to be supplied by the host application
 * and are omitted for brevity.
 */
@Controller()
export class EventController {
  @EventPattern('user_created')
  handleUserCreated(@Payload() data: User): void {
    console.log('New user created:', data.email);
    // Perform side effects like sending welcome email
    this.emailService.sendWelcomeEmail(data.email);
  }

  @EventPattern({ type: 'order', action: 'completed' })
  handleOrderCompleted(@Payload() order: Order): void {
    this.analyticsService.trackOrderCompletion(order);
    this.inventoryService.updateStock(order.items);
  }

  // Transport-specific event handling
  @EventPattern('inventory.updated', Transport.KAFKA)
  handleInventoryUpdate(@Payload() data: InventoryUpdate): void {
    this.cacheService.invalidateProductCache(data.productId);
  }
}

Payload Decorator

Parameter decorator for extracting payload data from incoming messages, with optional property selection and pipe transformations.

/**
 * Extracts the payload from an incoming microservice message.
 *
 * Three call forms are supported:
 * - `@Payload()` - inject the entire payload;
 * - `@Payload(...pipes)` - inject the entire payload after running it through the pipes;
 * - `@Payload(property, ...pipes)` - inject a single payload property,
 *   optionally run through the pipes.
 *
 * Pipes may be passed either as instances or as pipe classes, as in
 * `@Payload(ValidationPipe)` and `@Payload('value', ParseIntPipe)`. The
 * inline constructor type below has the same shape as Nest's
 * `Type<PipeTransform>` helper from `@nestjs/common`.
 *
 * @param property - Optional property name to extract from payload
 * @param pipes - Optional transformation pipes (instances or classes)
 * @returns Parameter decorator
 */
function Payload(): ParameterDecorator;
function Payload(
  ...pipes: (PipeTransform | (new (...args: any[]) => PipeTransform))[]
): ParameterDecorator;
function Payload(
  property?: string,
  ...pipes: (PipeTransform | (new (...args: any[]) => PipeTransform))[]
): ParameterDecorator;

Usage Examples:

import { Controller, ParseIntPipe, ValidationPipe } from '@nestjs/common';
import { MessagePattern, Payload } from '@nestjs/microservices';

/**
 * Example handlers showing the different ways `@Payload()` can be used:
 * whole payload, single property, pipes only, and property plus pipes.
 *
 * `dataService` and `userService` are referenced but not declared in this
 * snippet; they are assumed to be supplied by the host application and
 * are omitted for brevity.
 */
@Controller()
export class DataController {
  // Extract entire payload
  @MessagePattern('process_data')
  processData(@Payload() data: ProcessDataDto): ProcessResult {
    return this.dataService.process(data);
  }

  // Extract specific property from payload
  @MessagePattern('get_user')
  getUser(@Payload('id') userId: number): Promise<User> {
    return this.userService.findById(userId);
  }

  // Apply validation pipe to payload (the pipe is passed as a class, not an instance)
  @MessagePattern('create_user')
  createUser(@Payload(ValidationPipe) userData: CreateUserDto): Promise<User> {
    return this.userService.create(userData);
  }

  // Extract property and apply transformation pipe
  @MessagePattern('calculate')
  calculate(@Payload('value', ParseIntPipe) value: number): number {
    return value * 2;
  }

  // Multiple parameters with different extractions: each parameter pulls
  // its own property from the same payload and applies its own pipe
  @MessagePattern('update_user')
  updateUser(
    @Payload('id', ParseIntPipe) id: number,
    @Payload('data', ValidationPipe) updateData: UpdateUserDto
  ): Promise<User> {
    return this.userService.update(id, updateData);
  }
}

Context Decorator

Parameter decorator for injecting the transport-specific context object into message handler methods.

/**
 * Injects the transport-specific RPC context object into a handler method
 * parameter. The usage examples type the injected value as `KafkaContext`,
 * `TcpContext` or `RedisContext`, depending on the transport.
 *
 * @returns Parameter decorator
 */
function Ctx(): ParameterDecorator;

Usage Examples:

import { Controller } from '@nestjs/common';
import {
  Ctx,
  EventPattern,
  KafkaContext,
  MessagePattern,
  Payload,
  RedisContext,
  TcpContext,
} from '@nestjs/microservices';

/**
 * Example handlers that read transport-specific details from the context
 * object injected with `@Ctx()`.
 *
 * `processMessage` and `processRedisData` are referenced but not declared
 * in this snippet; they are assumed to be supplied by the host application
 * and are omitted for brevity.
 */
@Controller()
export class ContextController {
  @MessagePattern('kafka_message')
  handleKafkaMessage(
    @Payload() data: any,
    @Ctx() context: KafkaContext
  ): any {
    const topic = context.getTopic();
    const partition = context.getPartition();
    // The raw Kafka message carries per-record metadata such as the offset.
    const { offset } = context.getMessage();

    console.log(`Processing message from topic: ${topic}, partition: ${partition}`);
    return this.processMessage(data, { topic, partition, offset });
  }

  @EventPattern('tcp_event')
  handleTcpEvent(
    @Payload() data: any,
    @Ctx() context: TcpContext
  ): void {
    const socket = context.getSocketRef();
    const pattern = context.getPattern();

    console.log(`TCP event received with pattern: ${pattern}`);
    // getSocketRef() returns Nest's socket wrapper, which has no write();
    // the underlying net.Socket is exposed through `netSocket`.
    // NOTE(review): a raw write bypasses the transport's own message
    // serialization - confirm the peer expects unframed data.
    socket.netSocket.write('ACK');
  }

  @MessagePattern('redis_request')
  handleRedisRequest(
    @Payload() data: any,
    @Ctx() context: RedisContext
  ): any {
    const channel = context.getChannel();
    console.log(`Redis request on channel: ${channel}`);
    return this.processRedisData(data);
  }
}

Pattern Metadata Interface

Interface for defining pattern metadata structure used in message and event patterns.

/**
 * Pattern metadata interface for message routing.
 *
 * An open-ended object: any string key is allowed and values are untyped,
 * which is what permits object patterns such as `{ cmd: 'sum' }` or
 * `{ type: 'order', action: 'completed' }`.
 */
interface PatternMetadata {
  [key: string]: any;
}

/**
 * Message handler interface for custom implementations.
 *
 * Describes a callable that receives the message data and, optionally, a
 * context object, and produces its result either synchronously, as a
 * Promise, or as an Observable.
 *
 * @typeParam TInput - Type of the incoming message data
 * @typeParam TResult - Type of the value the handler produces
 * @typeParam TContext - Type of the optional context argument
 */
interface MessageHandler<TInput = any, TResult = any, TContext = any> {
  (data: TInput, ctx?: TContext): Observable<TResult> | Promise<TResult> | TResult;
}

Advanced Pattern Usage

Pattern Matching Strategies:

// Illustrative fragments only: each decorator line below stands for a
// decorator applied to its own handler method (the methods are omitted).

// String patterns
@MessagePattern('user.get')
@EventPattern('user.created')

// Object patterns - exact match
@MessagePattern({ cmd: 'get', resource: 'user' })
@EventPattern({ type: 'user', action: 'created' })

// Nested object patterns
@MessagePattern({ 
  service: 'user', 
  operation: { type: 'read', version: 'v1' } 
})

// Array patterns (for some transports)
// NOTE(review): which transports accept array patterns is not stated here - confirm before relying on it.
@MessagePattern(['user', 'get'])
@EventPattern(['events', 'user', 'created'])

Transport-Specific Pattern Features:

// Illustrative fragments only: each decorator line below stands for a
// decorator applied to its own handler method (the methods are omitted).
// The second argument binds the pattern to one specific transport.

// Kafka topic patterns
@MessagePattern('user.commands.create', Transport.KAFKA)
@EventPattern('user.events.created', Transport.KAFKA)

// MQTT topic patterns with wildcards (transport handles routing)
@EventPattern('sensors/+/temperature', Transport.MQTT)
@EventPattern('devices/+/status/#', Transport.MQTT)

// gRPC service method patterns
@MessagePattern({ service: 'UserService', method: 'GetUser' }, Transport.GRPC)

// RabbitMQ routing key patterns
// NOTE(review): wildcard routing keys may need transport-level configuration - confirm for the RMQ setup in use.
@MessagePattern('user.commands.create', Transport.RMQ)
@EventPattern('user.events.*', Transport.RMQ)

Error Handling in Message Handlers:

import { Controller } from '@nestjs/common';
import {
  EventPattern,
  MessagePattern,
  Payload,
  RpcException,
} from '@nestjs/microservices';

/**
 * Example error-handling strategies for the two handler kinds:
 * request-response handlers rethrow as `RpcException`, while
 * fire-and-forget event handlers log and swallow the failure.
 *
 * `processData`, `cleanupService` and `logger` are referenced but not
 * declared in this snippet; they are assumed to be supplied by the host
 * application and are omitted for brevity.
 */
@Controller()
export class ErrorHandlingController {
  @MessagePattern('risky_operation')
  performRiskyOperation(@Payload() data: any): any {
    try {
      return this.processData(data);
    } catch (error: unknown) {
      // A thrown value is not guaranteed to be an Error, so narrow before
      // reading `.message` (required under `useUnknownInCatchVariables`).
      const message = error instanceof Error ? error.message : String(error);
      throw new RpcException({
        code: 'PROCESSING_ERROR',
        message,
        details: { input: data }
      });
    }
  }

  @EventPattern('cleanup_task')
  async performCleanup(@Payload() data: any): Promise<void> {
    try {
      await this.cleanupService.cleanup(data);
    } catch (error) {
      // Log error but don't throw for fire-and-forget events
      this.logger.error('Cleanup failed', error);
    }
  }
}

docs

client-proxies.md

context-objects.md

exceptions.md

grpc.md

index.md

message-patterns.md

modules.md

transports.md

tile.json