Nest microservices framework providing scalable distributed systems with multiple transport layers and communication patterns.
—
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Pending
The risk profile of this skill
Decorators and patterns for defining message handlers with request-response and event-driven communication models, supporting pattern-based routing and payload extraction for distributed microservice architectures.
Decorator for subscribing to incoming messages with request-response pattern where a response is expected.
/**
* Subscribes to incoming messages with request-response pattern
* @param metadata - Pattern metadata for message routing
* @param transport - Specific transport to bind the pattern to
* @param extras - Additional configuration options
* @returns Method decorator
*/
function MessagePattern<T = string>(
metadata?: T,
transport?: Transport | symbol,
extras?: Record<string, any>
): MethodDecorator;Usage Examples:
import { Controller } from '@nestjs/common';
import { MessagePattern, Payload } from '@nestjs/microservices';
@Controller()
export class MathController {
@MessagePattern({ cmd: 'sum' })
accumulate(@Payload() data: number[]): number {
return (data || []).reduce((a, b) => a + b, 0);
}
@MessagePattern('get_user')
getUser(@Payload() id: number): Promise<User> {
return this.userService.findById(id);
}
// With transport-specific binding
@MessagePattern({ cmd: 'process' }, Transport.REDIS)
processOnRedis(@Payload() data: any): any {
return this.processData(data);
}
// With metadata and extras
@MessagePattern(
{ service: 'user', method: 'create' },
Transport.KAFKA,
{ version: '1.0' }
)
createUser(@Payload() userData: CreateUserDto): Promise<User> {
return this.userService.create(userData);
}
}Decorator for subscribing to incoming events with fire-and-forget pattern where no response is expected.
/**
* Subscribes to incoming events with fire-and-forget pattern
* @param metadata - Event pattern metadata for routing
* @param transport - Specific transport to bind the pattern to
* @param extras - Additional configuration options
* @returns Method decorator
*/
function EventPattern<T = string>(
metadata?: T,
transport?: Transport | symbol,
extras?: Record<string, any>
): MethodDecorator;Usage Examples:
import { Controller } from '@nestjs/common';
import { EventPattern, Payload, Ctx } from '@nestjs/microservices';
@Controller()
export class EventController {
@EventPattern('user_created')
handleUserCreated(@Payload() data: User): void {
console.log('New user created:', data.email);
// Perform side effects like sending welcome email
this.emailService.sendWelcomeEmail(data.email);
}
@EventPattern({ type: 'order', action: 'completed' })
handleOrderCompleted(@Payload() order: Order): void {
this.analyticsService.trackOrderCompletion(order);
this.inventoryService.updateStock(order.items);
}
// Transport-specific event handling
@EventPattern('inventory.updated', Transport.KAFKA)
handleInventoryUpdate(@Payload() data: InventoryUpdate): void {
this.cacheService.invalidateProductCache(data.productId);
}
}Parameter decorator for extracting payload data from incoming messages with optional property selection and pipe transformations.
/**
* Extracts payload from incoming microservice message
* @param property - Optional property name to extract from payload
* @param pipes - Optional transformation pipes
* @returns Parameter decorator
*/
function Payload(): ParameterDecorator;
function Payload(...pipes: PipeTransform[]): ParameterDecorator;
function Payload(property?: string, ...pipes: PipeTransform[]): ParameterDecorator;Usage Examples:
import { Controller, ParseIntPipe, ValidationPipe } from '@nestjs/common';
import { MessagePattern, Payload } from '@nestjs/microservices';
@Controller()
export class DataController {
// Extract entire payload
@MessagePattern('process_data')
processData(@Payload() data: ProcessDataDto): ProcessResult {
return this.dataService.process(data);
}
// Extract specific property from payload
@MessagePattern('get_user')
getUser(@Payload('id') userId: number): Promise<User> {
return this.userService.findById(userId);
}
// Apply validation pipe to payload
@MessagePattern('create_user')
createUser(@Payload(ValidationPipe) userData: CreateUserDto): Promise<User> {
return this.userService.create(userData);
}
// Extract property and apply transformation pipe
@MessagePattern('calculate')
calculate(@Payload('value', ParseIntPipe) value: number): number {
return value * 2;
}
// Multiple parameters with different extractions
@MessagePattern('update_user')
updateUser(
@Payload('id', ParseIntPipe) id: number,
@Payload('data', ValidationPipe) updateData: UpdateUserDto
): Promise<User> {
return this.userService.update(id, updateData);
}
}Parameter decorator for injecting transport-specific context object into message handler methods.
/**
* Injects RPC context into handler method parameter
* @returns Parameter decorator
*/
function Ctx(): ParameterDecorator;Usage Examples:
import { Controller } from '@nestjs/common';
import { MessagePattern, EventPattern, Payload, Ctx } from '@nestjs/microservices';
import { KafkaContext, TcpContext, RedisContext } from '@nestjs/microservices';
@Controller()
export class ContextController {
@MessagePattern('kafka_message')
handleKafkaMessage(
@Payload() data: any,
@Ctx() context: KafkaContext
): any {
const topic = context.getTopic();
const partition = context.getPartition();
const message = context.getMessage();
console.log(`Processing message from topic: ${topic}, partition: ${partition}`);
return this.processMessage(data, { topic, partition });
}
@EventPattern('tcp_event')
handleTcpEvent(
@Payload() data: any,
@Ctx() context: TcpContext
): void {
const socket = context.getSocketRef();
const pattern = context.getPattern();
console.log(`TCP event received with pattern: ${pattern}`);
// Access raw socket for additional operations
socket.write('ACK');
}
@MessagePattern('redis_request')
handleRedisRequest(
@Payload() data: any,
@Ctx() context: RedisContext
): any {
const channel = context.getChannel();
console.log(`Redis request on channel: ${channel}`);
return this.processRedisData(data);
}
}Interface for defining pattern metadata structure used in message and event patterns.
/**
* Pattern metadata interface for message routing
*/
interface PatternMetadata {
[key: string]: any;
}
/**
* Message handler interface for custom implementations
*/
interface MessageHandler<TInput = any, TResult = any, TContext = any> {
(data: TInput, ctx?: TContext): Observable<TResult> | Promise<TResult> | TResult;
}Pattern Matching Strategies:
// String patterns
@MessagePattern('user.get')
@EventPattern('user.created')
// Object patterns - exact match
@MessagePattern({ cmd: 'get', resource: 'user' })
@EventPattern({ type: 'user', action: 'created' })
// Nested object patterns
@MessagePattern({
service: 'user',
operation: { type: 'read', version: 'v1' }
})
// Array patterns (for some transports)
@MessagePattern(['user', 'get'])
@EventPattern(['events', 'user', 'created'])Transport-Specific Pattern Features:
// Kafka topic patterns
@MessagePattern('user.commands.create', Transport.KAFKA)
@EventPattern('user.events.created', Transport.KAFKA)
// MQTT topic patterns with wildcards (transport handles routing)
@EventPattern('sensors/+/temperature', Transport.MQTT)
@EventPattern('devices/+/status/#', Transport.MQTT)
// gRPC service method patterns
@MessagePattern({ service: 'UserService', method: 'GetUser' }, Transport.GRPC)
// RabbitMQ routing key patterns
@MessagePattern('user.commands.create', Transport.RMQ)
@EventPattern('user.events.*', Transport.RMQ)Error Handling in Message Handlers:
import { RpcException } from '@nestjs/microservices';
@Controller()
export class ErrorHandlingController {
@MessagePattern('risky_operation')
performRiskyOperation(@Payload() data: any): any {
try {
return this.processData(data);
} catch (error) {
throw new RpcException({
code: 'PROCESSING_ERROR',
message: error.message,
details: { input: data }
});
}
}
@EventPattern('cleanup_task')
async performCleanup(@Payload() data: any): Promise<void> {
try {
await this.cleanupService.cleanup(data);
} catch (error) {
// Log error but don't throw for fire-and-forget events
this.logger.error('Cleanup failed', error);
}
}
}