CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/npm-nestjs--microservices

Nest microservices framework providing scalable distributed systems with multiple transport layers and communication patterns.

Pending
Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Security by Snyk

Pending

The risk profile of this skill

Overview
Eval results
Files

docs/context-objects.md

Context Objects

Transport-specific context objects providing access to underlying protocol details and metadata for message handling, enabling fine-grained control over microservice communication and transport-specific operations.

Capabilities

Base RPC Context

Abstract base class for all RPC context implementations providing common context operations.

/**
 * Base class for all RPC context implementations.
 *
 * This is a signature-only listing: the members below are declared without
 * bodies.
 *
 * @typeParam T - Type returned by `getArgs()`; defaults to `any[]`.
 */
abstract class BaseRpcContext<T = any[]> {
  /**
   * Returns the arguments array from the context
   * @returns Arguments value typed as the class-level `T`
   */
  getArgs(): T;
  
  /**
   * Returns specific argument by index position
   *
   * Note: the method-level type parameter `T` shadows the class-level `T`,
   * so the return type is whatever the caller supplies (default `any`) and is
   * not derived from the type of `getArgs()`.
   *
   * @param index - Zero-based index of the argument
   * @returns Argument value at the specified index
   */
  getArgByIndex<T = any>(index: number): T;
}

Usage Examples:

import { Controller } from '@nestjs/common';
import { MessagePattern, Ctx } from '@nestjs/microservices';
import { BaseRpcContext } from '@nestjs/microservices';

@Controller()
export class BaseContextController {
  /**
   * Reports how many arguments the RPC context carries together with the
   * values found at positions 0 and 1.
   *
   * @param context - RPC context injected by `@Ctx()`
   * @returns Object with `argCount`, `first` and `second`
   */
  @MessagePattern('process_args')
  processWithArgs(@Ctx() context: BaseRpcContext): any {
    return {
      argCount: context.getArgs().length,
      first: context.getArgByIndex(0),
      second: context.getArgByIndex(1)
    };
  }
}

Kafka Context

Kafka-specific context providing access to Kafka message metadata, consumer, and producer instances.

/**
 * Kafka-specific context implementation.
 *
 * Extends `BaseRpcContext` without a type argument, so the base class's
 * default type parameter applies. Signature-only listing (no method bodies).
 */
class KafkaContext extends BaseRpcContext {
  /**
   * Returns the original Kafka message object
   * @returns Kafka message with headers, key, value, etc. (typed as `any` here)
   */
  getMessage(): any;
  
  /**
   * Returns the partition number for the message
   * @returns Partition number
   */
  getPartition(): number;
  
  /**
   * Returns the topic name for the message
   * @returns Topic name string
   */
  getTopic(): string;
  
  /**
   * Returns reference to the Kafka consumer instance
   * @returns Kafka consumer for manual operations (typed as `any` here)
   */
  getConsumer(): any;
  
  /**
   * Returns the heartbeat callback function
   * @returns Zero-argument function returning `Promise<void>`, used for
   *   consumer group coordination
   */
  getHeartbeat(): () => Promise<void>;
  
  /**
   * Returns reference to the Kafka producer instance
   * @returns Kafka producer for sending messages (typed as `any` here)
   */
  getProducer(): any;
}

Usage Examples:

import { Controller, Logger } from '@nestjs/common';
import { EventPattern, MessagePattern, Payload, Ctx } from '@nestjs/microservices';
import { KafkaContext } from '@nestjs/microservices';

@Controller()
export class KafkaController {
  private readonly logger = new Logger(KafkaController.name);

  /**
   * Handles `user.events.created` events.
   *
   * Logs where the message came from, sends one manual heartbeat and then
   * delegates to `processUserCreation`.
   *
   * @param userData - Decoded event payload
   * @param context - Kafka context injected by `@Ctx()`
   */
  @EventPattern('user.events.created')
  async handleUserCreated(
    @Payload() userData: any,
    @Ctx() context: KafkaContext
  ): Promise<void> {
    const message = context.getMessage();
    const topic = context.getTopic();
    const partition = context.getPartition();
    
    this.logger.log(`Processing user creation from topic: ${topic}, partition: ${partition}`);
    this.logger.debug(`Message headers:`, message.headers);
    this.logger.debug(`Message key:`, message.key?.toString());
    
    // Manual heartbeat to prevent session timeout for long processing
    const heartbeat = context.getHeartbeat();
    await heartbeat();
    
    await this.processUserCreation(userData);
  }

  /**
   * Executes a user command and publishes the result to
   * `user.events.processed`.
   *
   * If execution or publishing fails, the offset following the current
   * message is committed before the error is rethrown.
   *
   * @param command - Decoded command payload
   * @param context - Kafka context injected by `@Ctx()`
   * @returns The value produced by `executeCommand`
   * @throws Rethrows whatever `executeCommand` or `producer.send` threw
   */
  @MessagePattern('user.commands.process')
  async processUserCommand(
    @Payload() command: any,
    @Ctx() context: KafkaContext
  ): Promise<any> {
    const producer = context.getProducer();
    const consumer = context.getConsumer();
    
    try {
      const result = await this.executeCommand(command);
      
      // Headers are optional on a Kafka message: guard the lookup and only
      // forward the correlation id when the incoming message carried one.
      const correlationId = context.getMessage().headers?.['correlation-id'];
      
      // Send result to another topic using the producer
      await producer.send({
        topic: 'user.events.processed',
        messages: [{
          key: command.userId,
          value: JSON.stringify(result),
          headers: {
            ...(correlationId === undefined ? {} : { 'correlation-id': correlationId }),
            'processed-at': Date.now().toString()
          }
        }]
      });
      
      return result;
    } catch (error) {
      // Manual offset management if needed.
      // Topic and partition come from the context accessors; the offset is
      // incremented with BigInt because Kafka offsets are 64-bit values that
      // can exceed Number.MAX_SAFE_INTEGER.
      const { offset } = context.getMessage();
      await consumer.commitOffsets([{
        topic: context.getTopic(),
        partition: context.getPartition(),
        offset: (BigInt(offset) + 1n).toString()
      }]);
      
      throw error;
    }
  }
}

TCP Context

TCP-specific context providing access to the underlying socket connection and pattern information.

/**
 * TCP-specific context implementation.
 *
 * Extends `BaseRpcContext` without a type argument, so the base class's
 * default type parameter applies. Signature-only listing (no method bodies).
 */
class TcpContext extends BaseRpcContext {
  /**
   * Returns reference to the underlying JSON socket
   * @returns JSON socket instance for direct socket operations (typed as
   *   `any` here)
   */
  getSocketRef(): any;
  
  /**
   * Returns the pattern name used for message routing
   * @returns Pattern identifier as a string
   */
  getPattern(): string;
}

Usage Examples:

import { Controller, Logger } from '@nestjs/common';
import { MessagePattern, EventPattern, Payload, Ctx } from '@nestjs/microservices';
import { TcpContext } from '@nestjs/microservices';

@Controller()
export class TcpController {
  private readonly logger = new Logger(TcpController.name);

  /**
   * Handles the `{ cmd: 'process_with_socket' }` command.
   *
   * Logs the pattern and the peer address, sends an `ack` message back over
   * the same connection and then delegates to `processData`.
   *
   * @param data - Decoded request payload
   * @param context - TCP context injected by `@Ctx()`
   * @returns Whatever `processData` returns
   */
  @MessagePattern({ cmd: 'process_with_socket' })
  processWithSocket(
    @Payload() data: any,
    @Ctx() context: TcpContext
  ): any {
    const socket = context.getSocketRef();
    const pattern = context.getPattern();
    
    // getPattern() already returns a string, so it is logged as-is;
    // stringifying it again would wrap it in an extra layer of quotes.
    this.logger.log(`Processing command: ${pattern}`);
    
    // Access socket properties (the raw net.Socket lives under `.socket`)
    const clientAddress = socket.socket.remoteAddress;
    const clientPort = socket.socket.remotePort;
    
    this.logger.log(`Request from client: ${clientAddress}:${clientPort}`);
    
    // Direct socket operations if needed.
    // The JSON socket wrapper sends through sendMessage(), which serialises
    // and frames the object itself; it has no write() method of its own.
    socket.sendMessage({ 
      type: 'ack', 
      pattern: pattern,
      timestamp: Date.now() 
    });
    
    return this.processData(data, { clientAddress, pattern });
  }

  /**
   * Handles `tcp_notification` events.
   *
   * Logs the peer address, sends a `notification_received` message back and
   * delegates to `processNotification`.
   *
   * @param notification - Decoded event payload; its `id` is echoed back
   * @param context - TCP context injected by `@Ctx()`
   */
  @EventPattern('tcp_notification')
  handleTcpNotification(
    @Payload() notification: any,
    @Ctx() context: TcpContext
  ): void {
    const socket = context.getSocketRef();
    const pattern = context.getPattern();
    
    // Log connection details
    this.logger.log(`TCP notification: ${pattern} from ${socket.socket.remoteAddress}`);
    
    // Send immediate acknowledgment through the framed JSON channel
    socket.sendMessage({ 
      type: 'notification_received',
      id: notification.id,
      timestamp: Date.now()
    });
    
    this.processNotification(notification);
  }
}

NATS Context

NATS-specific context providing access to subject and message headers.

/**
 * NATS-specific context implementation.
 *
 * Extends `BaseRpcContext` without a type argument, so the base class's
 * default type parameter applies. Signature-only listing (no method bodies).
 */
class NatsContext extends BaseRpcContext {
  /**
   * Returns the NATS subject name
   * @returns Subject string used for message routing
   */
  getSubject(): string;
  
  /**
   * Returns message headers if available
   * @returns Message headers object, declared here as `Record<string, any>`
   */
  getHeaders(): Record<string, any>;
}

Usage Examples:

import { Controller, Logger } from '@nestjs/common';
import { MessagePattern, EventPattern, Payload, Ctx } from '@nestjs/microservices';
import { NatsContext } from '@nestjs/microservices';

@Controller()
export class NatsController {
  private readonly logger = new Logger(NatsController.name);

  /**
   * Looks up a user whose type is encoded in the second token of the NATS
   * subject (e.g. `admin` in `user.admin.get`).
   *
   * @param request - Decoded request payload; `request.id` is the user id
   * @param context - NATS context injected by `@Ctx()`
   * @returns Whatever `userService.getByType` returns
   */
  @MessagePattern('user.*.get')
  getUserBySubject(
    @Payload() request: any,
    @Ctx() context: NatsContext
  ): any {
    // Second subject token carries the user type
    const [, userType] = context.getSubject().split('.');
    const headers = context.getHeaders();

    this.logger.log(`Getting ${userType} user with ID: ${request.id}`);
    this.logger.debug(`Request headers:`, headers);

    return this.userService.getByType(userType, request.id, {
      requestId: headers?.['request-id'],
      userAgent: headers?.['user-agent']
    });
  }

  /**
   * Reacts to `events.*.created`, where the second subject token names the
   * entity type.
   *
   * @param entity - Decoded event payload
   * @param context - NATS context injected by `@Ctx()`
   */
  @EventPattern('events.*.created')
  handleEntityCreated(
    @Payload() entity: any,
    @Ctx() context: NatsContext
  ): void {
    const subjectTokens = context.getSubject().split('.');
    const headers = context.getHeaders();
    const entityType = subjectTokens[1];

    this.logger.log(`${entityType} created with ID: ${entity.id}`);

    // Correlate the event only when the sender supplied a correlation id
    const correlationId = headers?.['correlation-id'];
    if (correlationId) {
      this.trackingService.correlateEvent(correlationId, entity);
    }

    this.processEntityCreation(entityType, entity);
  }
}

MQTT Context

MQTT-specific context providing access to topic and original MQTT packet information.

/**
 * MQTT-specific context implementation.
 *
 * Extends `BaseRpcContext` without a type argument, so the base class's
 * default type parameter applies. Signature-only listing (no method bodies).
 */
class MqttContext extends BaseRpcContext {
  /**
   * Returns the MQTT topic name
   * @returns Topic string used for message routing
   */
  getTopic(): string;
  
  /**
   * Returns the original MQTT packet
   * @returns MQTT packet with QoS, retain flag, etc. (typed as `any` here)
   */
  getPacket(): any;
}

Usage Examples:

import { Controller, Logger } from '@nestjs/common';
import { EventPattern, Payload, Ctx } from '@nestjs/microservices';
import { MqttContext } from '@nestjs/microservices';

@Controller()
export class MqttController {
  private readonly logger = new Logger(MqttController.name);

  /**
   * Handles temperature readings published on `sensors/+/temperature`.
   *
   * Retained messages update the last known value; all other messages are
   * treated as realtime readings.
   *
   * @param reading - Decoded payload; `reading.value` is logged
   * @param context - MQTT context injected by `@Ctx()`
   */
  @EventPattern('sensors/+/temperature')
  handleTemperatureReading(
    @Payload() reading: any,
    @Ctx() context: MqttContext
  ): void {
    const topic = context.getTopic();
    const packet = context.getPacket();

    // Second topic level is the sensor id (e.g. 'sensors/sensor001/temperature')
    const [, sensorId] = topic.split('/');

    this.logger.log(`Temperature reading from sensor ${sensorId}: ${reading.value}°C`);

    const { qos, retain } = packet;
    this.logger.debug(`MQTT QoS: ${qos}, Retain: ${retain}`);

    if (!retain) {
      this.sensorService.processRealtimeReading(sensorId, reading);
      return;
    }

    // Retained messages take the "last known value" path
    this.logger.log('Processing retained temperature message');
    this.sensorService.updateLastKnownValue(sensorId, reading);
  }

  /**
   * Handles device status updates published on `devices/+/status/#`.
   *
   * @param status - Decoded status payload
   * @param context - MQTT context injected by `@Ctx()`
   */
  @EventPattern('devices/+/status/#')
  handleDeviceStatus(
    @Payload() status: any,
    @Ctx() context: MqttContext
  ): void {
    // Level 1 is the device id; everything after 'status/' (level 3 onward)
    // forms the status type.
    const [, deviceId, , ...statusLevels] = context.getTopic().split('/');
    const packet = context.getPacket();
    const statusType = statusLevels.join('/');

    this.logger.log(`Device ${deviceId} status update: ${statusType}`);

    // QoS 2 messages are flagged as high priority
    const priority = packet.qos === 2 ? 'high' : 'normal';

    this.deviceService.updateStatus(deviceId, statusType, status, { priority });
  }
}

Redis Context

Redis-specific context providing access to channel information for pub/sub operations.

/**
 * Redis-specific context implementation.
 *
 * Extends `BaseRpcContext` without a type argument, so the base class's
 * default type parameter applies. Signature-only listing (no method bodies).
 */
class RedisContext extends BaseRpcContext {
  /**
   * Returns the Redis channel name
   * @returns Channel name string used for pub/sub
   */
  getChannel(): string;
}

Usage Examples:

import { Controller, Logger } from '@nestjs/common';
import { MessagePattern, EventPattern, Payload, Ctx } from '@nestjs/microservices';
import { RedisContext } from '@nestjs/microservices';

@Controller()
export class RedisController {
  private readonly logger = new Logger(RedisController.name);

  /**
   * Serves cache lookups; the cache key is the channel name with the
   * `cache:get:` prefix removed.
   *
   * @param request - Decoded request payload; `request.options` is forwarded
   * @param context - Redis context injected by `@Ctx()`
   * @returns Whatever `cacheService.get` returns
   */
  @MessagePattern('cache:get:*')
  getCacheValue(
    @Payload() request: any,
    @Ctx() context: RedisContext
  ): any {
    // Strip the fixed prefix to recover the cache key
    const cacheKey = context.getChannel().replace('cache:get:', '');

    this.logger.log(`Cache request for key: ${cacheKey}`);

    return this.cacheService.get(cacheKey, request.options);
  }

  /**
   * Routes a notification to the matching service based on the second
   * `:`-separated segment of the channel name.
   *
   * @param notification - Decoded event payload
   * @param context - Redis context injected by `@Ctx()`
   */
  @EventPattern('notifications:*')
  handleNotification(
    @Payload() notification: any,
    @Ctx() context: RedisContext
  ): void {
    const channel = context.getChannel();
    const [, notificationType] = channel.split(':');

    this.logger.log(`Notification received on channel: ${channel}`);

    if (notificationType === 'user') {
      this.userNotificationService.handle(notification);
    } else if (notificationType === 'system') {
      this.systemNotificationService.handle(notification);
    } else {
      this.logger.warn(`Unknown notification type: ${notificationType}`);
    }
  }
}

RabbitMQ Context

RabbitMQ-specific context providing access to message, channel reference, and pattern information.

/**
 * RabbitMQ-specific context implementation.
 *
 * Extends `BaseRpcContext` without a type argument, so the base class's
 * default type parameter applies. Signature-only listing (no method bodies).
 */
class RmqContext extends BaseRpcContext {
  /**
   * Returns the original RabbitMQ message
   * @returns RabbitMQ message with properties and fields (typed as `any` here)
   */
  getMessage(): any;
  
  /**
   * Returns reference to the RabbitMQ channel
   * @returns Channel instance for manual operations (typed as `any` here)
   */
  getChannelRef(): any;
  
  /**
   * Returns the pattern name used for routing
   * @returns Pattern/routing key string
   */
  getPattern(): string;
}

Usage Examples:

import { Controller, Logger } from '@nestjs/common';
import { MessagePattern, EventPattern, Payload, Ctx } from '@nestjs/microservices';
import { RmqContext } from '@nestjs/microservices';

@Controller()
export class RmqController {
  private readonly logger = new Logger(RmqController.name);

  /**
   * Processes a user command, acknowledges the message and publishes the
   * result to `results_exchange`.
   *
   * Each message is settled exactly once: it is nack'ed only when processing
   * itself fails and ack'ed only after processing succeeded. A failure while
   * publishing the result is logged and rethrown without touching the
   * already-acknowledged message.
   *
   * NOTE(review): manual ack/nack presumably requires the transport to be
   * configured with `noAck: false` — confirm against the service bootstrap.
   *
   * @param command - Decoded command payload
   * @param context - RabbitMQ context injected by `@Ctx()`
   * @returns The value produced by `commandService.process`
   * @throws Rethrows processing or publishing errors
   */
  @MessagePattern('user.commands.process')
  async processUserCommand(
    @Payload() command: any,
    @Ctx() context: RmqContext
  ): Promise<any> {
    const message = context.getMessage();
    const channel = context.getChannelRef();
    const pattern = context.getPattern();
    
    this.logger.log(`Processing command with routing key: ${pattern}`);
    this.logger.debug(`Message properties:`, message.properties);
    
    let result: any;
    try {
      result = await this.commandService.process(command);
    } catch (error) {
      this.logger.error(`Command processing failed:`, error);
      
      // Reject message and potentially requeue. The catch variable is
      // narrowed before reading `retryable` so non-object throws are safe.
      const requeue =
        (error as { retryable?: unknown } | null | undefined)?.retryable === true;
      channel.nack(message, false, requeue);
      
      throw error;
    }
    
    // Manual acknowledgment after successful processing. This sits outside
    // the try block above so a later publish failure can never lead to a
    // nack on a message that has already been acked.
    channel.ack(message);
    
    try {
      // Publish result to another exchange
      await channel.publish(
        'results_exchange',
        'user.results.processed',
        Buffer.from(JSON.stringify(result)),
        {
          correlationId: message.properties.correlationId,
          timestamp: Date.now(),
          headers: {
            'processed-by': 'user-service',
            'original-routing-key': pattern
          }
        }
      );
    } catch (publishError) {
      this.logger.error(`Result publishing failed:`, publishError);
      throw publishError;
    }
    
    return result;
  }

  /**
   * Handles `user.events.*` events; the last routing-key token is the event
   * type.
   *
   * The message is nack'ed (without requeue) only when the event handler
   * throws; otherwise it is ack'ed and, for `created` events, a welcome
   * notification is published in the background.
   *
   * @param event - Decoded event payload
   * @param context - RabbitMQ context injected by `@Ctx()`
   */
  @EventPattern('user.events.*')
  handleUserEvent(
    @Payload() event: any,
    @Ctx() context: RmqContext
  ): void {
    const message = context.getMessage();
    const channel = context.getChannelRef();
    const pattern = context.getPattern();
    
    const eventType = pattern.split('.').pop(); // Extract event type
    
    this.logger.log(`Handling user event: ${eventType}`);
    
    try {
      // Process event based on type
      this.eventService.handle(eventType, event);
    } catch (error) {
      this.logger.error(`Event handling failed:`, error);
      channel.nack(message, false, false); // Don't requeue events
      return;
    }
    
    // Manual acknowledgment (exactly once, after successful handling)
    channel.ack(message);
    
    // Publish follow-up events if needed
    if (eventType === 'created') {
      void this.publishWelcome(channel, event, message);
    }
  }

  /**
   * Publishes the welcome notification for a newly created user.
   * Never rejects: publish failures (sync or async) are logged instead, so
   * the fire-and-forget call site cannot produce an unhandled rejection.
   */
  private async publishWelcome(channel: any, event: any, message: any): Promise<void> {
    try {
      await channel.publish(
        'notifications_exchange',
        'notifications.welcome',
        Buffer.from(JSON.stringify({ userId: event.id })),
        { correlationId: message.properties.correlationId }
      );
    } catch (publishError) {
      this.logger.error(`Welcome notification publishing failed:`, publishError);
    }
  }
}

docs

client-proxies.md

context-objects.md

exceptions.md

grpc.md

index.md

message-patterns.md

modules.md

transports.md

tile.json