tessl/npm-nestjs--microservices

Nest microservices framework providing scalable distributed systems with multiple transport layers and communication patterns.

  • Workspace: tessl
  • Visibility: Public
  • Describes: npmpkg:npm/@nestjs/microservices@11.1.x

To install, run

npx @tessl/cli install tessl/npm-nestjs--microservices@11.1.0

NestJS Microservices

NestJS Microservices (@nestjs/microservices) is a framework for building distributed systems on top of NestJS. It ships transports for TCP, Redis, RabbitMQ, MQTT, NATS, gRPC, and Kafka, and supports both request-response and event-driven communication, with message serialization, pattern-based routing, and error handling built in.

Package Information

  • Package Name: @nestjs/microservices
  • Package Type: npm
  • Language: TypeScript
  • Installation: npm install @nestjs/microservices
  • Peer Dependencies: @nestjs/common, @nestjs/core, rxjs, reflect-metadata

Core Imports

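// NestFactory is exported by @nestjs/core, not by @nestjs/microservices.
import { NestFactory } from '@nestjs/core';
import {
  ClientProxy,
  Transport,
  MessagePattern,
  EventPattern,
  Payload,
  Ctx
} from '@nestjs/microservices';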

For CommonJS:

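// NestFactory is exported by @nestjs/core, not by @nestjs/microservices.
const { NestFactory } = require('@nestjs/core');
const {
  ClientProxy,
  Transport,
  MessagePattern,
  EventPattern
} = require('@nestjs/microservices');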

Basic Usage

Creating a Microservice

import { NestFactory } from '@nestjs/core';
import { Transport, MicroserviceOptions } from '@nestjs/microservices';
import { AppModule } from './app.module';

async function bootstrap() {
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    {
      transport: Transport.TCP,
      options: {
        host: '127.0.0.1',
        port: 3001,
      },
    },
  );
  
  await app.listen();
}
bootstrap();

Message Handlers

import { Controller } from '@nestjs/common';
import { MessagePattern, EventPattern, Payload } from '@nestjs/microservices';

@Controller()
export class AppController {
  // Request-response handler: the return value is sent back to the caller.
  @MessagePattern({ cmd: 'sum' })
  accumulate(@Payload() data: number[]): number {
    return (data || []).reduce((a, b) => a + b, 0);
  }

  // Event handler: the caller does not wait for a response.
  @EventPattern('user_created')
  handleUserCreated(@Payload() data: Record<string, unknown>): void {
    console.log('User created:', data);
  }
}

Client Usage

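import { Injectable } from '@nestjs/common';
import { ClientProxy, ClientProxyFactory, Transport } from '@nestjs/microservices';
import { firstValueFrom } from 'rxjs';

@Injectable()
export class AppService {
  private readonly client: ClientProxy;

  constructor() {
    // Points at the TCP microservice started in the bootstrap example.
    this.client = ClientProxyFactory.create({
      transport: Transport.TCP,
      options: {
        host: '127.0.0.1',
        port: 3001,
      },
    });
  }

  // Request-response: send() returns an Observable; firstValueFrom replaces
  // the deprecated toPromise() from RxJS 7 onward.
  getSum(data: number[]): Promise<number> {
    return firstValueFrom(this.client.send<number>({ cmd: 'sum' }, data));
  }

  // Fire-and-forget: emit() publishes the event without waiting for a reply.
  emitEvent(data: Record<string, unknown>): void {
    this.client.emit('user_created', data);
  }
}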

Architecture

NestJS Microservices is built around several key components:

  • Transport Layer: Abstracted communication layer supporting multiple protocols (TCP, Redis, NATS, MQTT, gRPC, RabbitMQ, Kafka)
  • Client-Server Pattern: Separate client and server implementations for each transport with connection management
  • Context System: Transport-specific context objects providing access to underlying protocol details
  • Decorator System: Method and parameter decorators for message handling and payload extraction
  • Observable-Based: Built on RxJS observables for reactive programming patterns
  • Pattern Matching: Flexible pattern-based message routing with metadata support
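
The pattern-matching idea in the list above can be modeled in a few lines of plain TypeScript. This is a simplified illustration, not the library's implementation: normalizePattern and PatternRouter are hypothetical names, and the sorted-key serialization is an assumption chosen so that object patterns match regardless of key order.

```typescript
// Simplified model of pattern-based routing; not the @nestjs/microservices internals.
type Pattern = string | number | { [key: string]: Pattern };
type Handler = (data: unknown) => unknown;

// Serialize a pattern into a stable string key so that
// { cmd: 'sum', v: 1 } and { v: 1, cmd: 'sum' } resolve to the same route.
function normalizePattern(pattern: Pattern): string {
  if (typeof pattern === 'string') return pattern;
  if (typeof pattern === 'number') return String(pattern);
  const parts = Object.keys(pattern)
    .sort()
    .map((key) => `${JSON.stringify(key)}:${normalizePattern(pattern[key])}`);
  return `{${parts.join(',')}}`;
}

class PatternRouter {
  private readonly handlers = new Map<string, Handler>();

  // Store the handler under the normalized form of its pattern.
  register(pattern: Pattern, handler: Handler): void {
    this.handlers.set(normalizePattern(pattern), handler);
  }

  // Look up the handler for a pattern and call it; throw when none matches.
  dispatch(pattern: Pattern, data: unknown): unknown {
    const route = normalizePattern(pattern);
    const handler = this.handlers.get(route);
    if (!handler) {
      throw new Error(`No handler registered for pattern ${route}`);
    }
    return handler(data);
  }
}

const router = new PatternRouter();
router.register({ cmd: 'sum' }, (data) =>
  (data as number[]).reduce((a, b) => a + b, 0),
);
const sum = router.dispatch({ cmd: 'sum' }, [1, 2, 3]);
```

Dispatching { cmd: 'sum' } with [1, 2, 3] reaches the registered handler and yields 6, which mirrors the accumulate handler in the Message Handlers example.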

Capabilities

Microservice Application

Core microservice application class providing lifecycle management, registration of global filters, pipes, interceptors, and guards, and server initialization for distributed applications.

class NestMicroservice extends NestApplicationContext implements INestMicroservice {
  readonly status: Observable<any>;
  
  createServer(config: MicroserviceOptions | AsyncMicroserviceOptions): void;
  registerModules(): Promise<any>;
  registerListeners(): void;
  useWebSocketAdapter(adapter: WebSocketAdapter): this;
  useGlobalFilters(...filters: ExceptionFilter[]): this;
  useGlobalPipes(...pipes: PipeTransform<any>[]): this;
  useGlobalInterceptors(...interceptors: NestInterceptor[]): this;
  useGlobalGuards(...guards: CanActivate[]): this;
  init(): Promise<this>;
  listen(): Promise<any>;
  close(): Promise<any>;
  setIsInitialized(isInitialized: boolean): void;
  setIsTerminated(isTerminated: boolean): void;
  setIsInitHookCalled(isInitHookCalled: boolean): void;
  on(event: string | number | symbol, callback: Function): any;
  unwrap<T>(): T;
}

Server Infrastructure

Abstract server base class and factory for creating transport-specific server implementations with message handling and lifecycle management.

abstract class Server<EventsMap = Record<string, Function>, Status = string> {
  readonly status: Observable<Status>;
  transportId?: Transport | symbol;
  
  abstract on<EventKey, EventCallback>(event: EventKey, callback: EventCallback): any;
  abstract unwrap<T>(): T;
  abstract listen(callback: (...optionalParams: unknown[]) => any): any;
  abstract close(): any;
  setTransportId(transportId: Transport | symbol): void;
  setOnProcessingStartHook(hook: Function): void;
  setOnProcessingEndHook(hook: Function): void;
  addHandler(pattern: any, callback: MessageHandler, isEventHandler?: boolean, extras?: Record<string, any>): void;
  getHandlers(): Map<string, MessageHandler>;
  getHandlerByPattern(pattern: string): MessageHandler | null;
  send(stream$: Observable<any>, respond: (data: WritePacket) => Promise<unknown> | void): Subscription;
  handleEvent(pattern: string, packet: ReadPacket, context: BaseRpcContext): Promise<any>;
}

class ServerFactory {
  static create(microserviceOptions: MicroserviceOptions): Server;
}

Record Builders

Builders for transport-specific message records that attach per-message options to the payload: publish options for RabbitMQ, QoS and properties for MQTT, and headers for NATS.

class RmqRecord<TData = any> {
  constructor(data: TData, options?: RmqRecordOptions);
  readonly data: TData;
  options?: RmqRecordOptions;
}

class RmqRecordBuilder<TData> {
  constructor(data?: TData);
  setOptions(options: RmqRecordOptions): this;
  setData(data: TData): this;
  build(): RmqRecord;
}

class MqttRecord<TData = any> {
  constructor(data: TData, options?: MqttRecordOptions);
  readonly data: TData;
  options?: MqttRecordOptions;
}

class MqttRecordBuilder<TData> {
  constructor(data?: TData);
  setData(data: TData): this;
  setQoS(qos: 0 | 1 | 2): this;
  setRetain(retain: boolean): this;
  setDup(dup: boolean): this;
  setProperties(properties: Record<string, any>): this;
  build(): MqttRecord;
}

class NatsRecord<TData = any, THeaders = any> {
  constructor(data: TData, headers?: THeaders);
  readonly data: TData;
  readonly headers?: THeaders;
}

class NatsRecordBuilder<TData> {
  constructor(data?: TData);
  setHeaders<THeaders = any>(headers: THeaders): this;
  setData(data: TData): this;
  build(): NatsRecord;
}
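
The builders listed above share a fluent style: each setter returns the builder, and build() produces the record. The sketch below shows that flow with dependency-free stand-ins that copy the NatsRecord and NatsRecordBuilder signatures from the listing. The class bodies are assumptions written for illustration; in an application both classes would be imported from @nestjs/microservices, and the header name and payload are hypothetical.

```typescript
// Stand-ins mirroring the NatsRecord / NatsRecordBuilder signatures listed above.
// The bodies are illustrative assumptions, not the library source.
class NatsRecord<TData = any, THeaders = any> {
  readonly data: TData;
  readonly headers?: THeaders;

  constructor(data: TData, headers?: THeaders) {
    this.data = data;
    this.headers = headers;
  }
}

class NatsRecordBuilder<TData> {
  private data?: TData;
  private headers?: unknown;

  constructor(data?: TData) {
    this.data = data;
  }

  // Each setter returns the builder so calls can be chained.
  setHeaders<THeaders = any>(headers: THeaders): this {
    this.headers = headers;
    return this;
  }

  setData(data: TData): this {
    this.data = data;
    return this;
  }

  // Produce the record that would be passed to a client's send() or emit().
  build(): NatsRecord {
    return new NatsRecord(this.data, this.headers);
  }
}

// Hypothetical payload and header, chosen only for the example.
const record = new NatsRecordBuilder<{ userId: number }>()
  .setData({ userId: 42 })
  .setHeaders({ 'x-request-id': 'abc-123' })
  .build();
```

The resulting record carries the payload in data and the metadata in headers, so the handler on the other side can read them separately.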

Client Proxies

Client-side communication interface for sending messages and events to microservices with support for multiple transport protocols.

abstract class ClientProxy<EventsMap extends Record<never, Function> = Record<never, Function>, Status extends string = string> {
  connect(): Promise<any>;
  close(): any;
  send<TResult = any, TInput = any>(
    pattern: any,
    data: TInput
  ): Observable<TResult>;
  emit<TResult = any, TInput = any>(
    pattern: any,
    data: TInput
  ): Observable<TResult>;
}

class ClientProxyFactory {
  static create(clientOptions: ClientOptions): ClientProxy;
}

Transport Implementations

Support for multiple messaging protocols including TCP, Redis, NATS, MQTT, gRPC, RabbitMQ, and Kafka with protocol-specific features and configurations.

enum Transport {
  TCP = 0,
  REDIS = 1,
  NATS = 2,
  MQTT = 3,
  GRPC = 4,
  RMQ = 5,
  KAFKA = 6
}

interface MicroserviceOptions {
  transport?: Transport | CustomTransportStrategy;
  options?: any;
}

Message Patterns & Event Handling

Decorators and patterns for defining message handlers with request-response and event-driven communication models.

function MessagePattern<T = string>(
  metadata?: T,
  transport?: Transport | symbol,
  extras?: Record<string, any>
): MethodDecorator;

function EventPattern<T = string>(
  metadata?: T,
  transport?: Transport | symbol,
  extras?: Record<string, any>
): MethodDecorator;

function Payload(property?: string, ...pipes: PipeTransform[]): ParameterDecorator;
function Ctx(): ParameterDecorator;

Context Objects

Transport-specific context objects providing access to underlying protocol details and metadata for message handling.

abstract class BaseRpcContext<T = any[]> {
  getArgs(): T;
  getArgByIndex<T = any>(index: number): T;
}

class KafkaContext extends BaseRpcContext {
  getMessage(): any;
  getPartition(): number;
  getTopic(): string;
  getConsumer(): any;
  getProducer(): any;
}
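
A context object is essentially a typed wrapper around a tuple of transport arguments, with named getters on top of getArgByIndex. The following dependency-free sketch shows that shape for an imaginary queue transport. The BaseRpcContext stand-in copies the method names listed above, while QueueContext, its argument tuple, and the sample values are hypothetical and exist only for illustration.

```typescript
// Stand-in with the same method names as the BaseRpcContext listing above.
class BaseRpcContext<T extends unknown[] = unknown[]> {
  protected readonly args: T;

  constructor(args: T) {
    this.args = args;
  }

  getArgs(): T {
    return this.args;
  }

  getArgByIndex<R = unknown>(index: number): R {
    return this.args[index] as R;
  }
}

// Hypothetical context for an imaginary queue transport: [rawMessage, queueName].
type QueueContextArgs = [Record<string, unknown>, string];

class QueueContext extends BaseRpcContext<QueueContextArgs> {
  // Named getters give handlers readable access to each tuple slot.
  getMessage(): Record<string, unknown> {
    return this.args[0];
  }

  getQueue(): string {
    return this.args[1];
  }
}

const context = new QueueContext([{ deliveryTag: 7 }, 'orders']);
const queueName = context.getQueue();
const rawMessage = context.getArgByIndex<Record<string, unknown>>(0);
```

KafkaContext above follows the same idea, exposing the message, partition, topic, consumer, and producer through named getters.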

gRPC Integration

Specialized support for gRPC protocol with service definitions, streaming methods, and protocol buffer integration.

function GrpcMethod(service?: string, method?: string): MethodDecorator;
function GrpcStreamMethod(service?: string, method?: string): MethodDecorator;
function GrpcStreamCall(service?: string, method?: string): MethodDecorator;

interface ClientGrpc {
  getService<T = any>(name: string): T;
  getClientByServiceName<T = any>(name: string): T;
}

Exception Handling

Microservice-specific exception classes and error handling patterns for distributed system error management.

class RpcException extends Error {
  constructor(error: string | object);
  getError(): string | object;
}

class KafkaRetriableException extends RpcException {
  constructor(error: string | object);
}
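
One way to picture how an RpcException reaches the caller is as a response packet whose err field carries the value returned by getError(). The sketch below models that under stated assumptions: the RpcException class is a stand-in with the constructor and getError() signatures listed above, toWritePacket is a hypothetical helper, and the fallback message for unknown errors is an assumption rather than the library's wording.

```typescript
// Stand-in with the same constructor and getError() signature as listed above.
class RpcException extends Error {
  private readonly error: string | object;

  constructor(error: string | object) {
    super(typeof error === 'string' ? error : JSON.stringify(error));
    // Keeps instanceof working when the code is compiled to ES5.
    Object.setPrototypeOf(this, RpcException.prototype);
    this.error = error;
  }

  getError(): string | object {
    return this.error;
  }
}

// Subset of the WritePacket interface from the Types section.
interface WritePacket<T = any> {
  err?: any;
  response?: T;
  isDisposed?: boolean;
}

// Hypothetical helper: run a handler and fold its outcome into a response packet.
function toWritePacket<T>(handler: () => T): WritePacket<T> {
  try {
    return { response: handler(), isDisposed: true };
  } catch (caught) {
    // Known RPC errors expose their payload; anything else gets a generic message.
    const err =
      caught instanceof RpcException ? caught.getError() : 'Internal server error';
    return { err, isDisposed: true };
  }
}

const ok = toWritePacket(() => 1 + 2);
const failed = toWritePacket<number>(() => {
  throw new RpcException({ code: 'INVALID_INPUT', message: 'data must be an array' });
});
```

Passing an object to the constructor keeps the error structured, so a client can branch on a field such as code instead of parsing a message string.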

Module Integration

NestJS module integration for dependency injection and configuration management of microservice clients and servers.

class ClientsModule {
  static register(options: ClientsModuleOptions): DynamicModule;
  static registerAsync(options: ClientsModuleAsyncOptions): DynamicModule;
}

function Client(metadata?: ClientOptions): PropertyDecorator;

Types

interface ClientOptions {
  transport: Transport | CustomTransportStrategy;
  options?: any;
}

interface CustomTransportStrategy {
  readonly transportId?: Transport | symbol;
  listen(callback: (...optionalParams: unknown[]) => any): any;
  close(): any;
}

interface ReadPacket<T = any> {
  pattern: any;
  data: T;
}

interface WritePacket<T = any> {
  err?: any;
  response?: T;
  isDisposed?: boolean;
  status?: string;
}

interface PacketId {
  id: string;
}

type OutgoingRequest = ReadPacket & PacketId;
type IncomingRequest = ReadPacket & PacketId;
type OutgoingEvent = ReadPacket;
type IncomingEvent = ReadPacket;
type IncomingResponse = WritePacket & PacketId;
type OutgoingResponse = WritePacket & PacketId;
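
The packet types above pair a request with its response through the shared id field. The sketch below models that correlation in memory. It is an illustration under stated assumptions, not how any particular transport is implemented: PendingRequests is a hypothetical class, ids are a simple counter, and isDisposed is assumed to mark the final packet of a response.

```typescript
// Local copies of the packet types listed above so the sketch is dependency-free.
interface ReadPacket<T = any> {
  pattern: any;
  data: T;
}
interface WritePacket<T = any> {
  err?: any;
  response?: T;
  isDisposed?: boolean;
  status?: string;
}
interface PacketId {
  id: string;
}
type OutgoingRequest = ReadPacket & PacketId;
type IncomingResponse = WritePacket & PacketId;

// Hypothetical correlator: pending callbacks keyed by packet id.
class PendingRequests {
  private readonly pending = new Map<string, (packet: WritePacket) => void>();
  private nextId = 0;

  // Wrap a pattern and payload in a request packet and remember its callback.
  create(
    pattern: unknown,
    data: unknown,
    onReply: (packet: WritePacket) => void,
  ): OutgoingRequest {
    const id = String(this.nextId++);
    this.pending.set(id, onReply);
    return { id, pattern, data };
  }

  // Deliver a response to the matching callback; forget it once disposed.
  // Returns false when no request with that id is pending.
  resolve(response: IncomingResponse): boolean {
    const callback = this.pending.get(response.id);
    if (!callback) return false;
    callback(response);
    if (response.isDisposed) this.pending.delete(response.id);
    return true;
  }
}

const requests = new PendingRequests();
const replies: unknown[] = [];
const request = requests.create({ cmd: 'sum' }, [1, 2, 3], (packet) => {
  replies.push(packet.response);
});
const delivered = requests.resolve({ id: request.id, response: 6, isDisposed: true });
const deliveredAgain = requests.resolve({ id: request.id, response: 6 });
```

Events (OutgoingEvent, IncomingEvent) carry no id, which matches their fire-and-forget nature: there is no response to correlate.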

interface RmqRecordOptions {
  expiration?: string | number;
  userId?: string;
  CC?: string | string[];
  mandatory?: boolean;
  persistent?: boolean;
  deliveryMode?: boolean | number;
  BCC?: string | string[];
  contentType?: string;
  contentEncoding?: string;
  headers?: Record<string, string>;
  priority?: number;
  messageId?: string;
  timestamp?: number;
  type?: string;
  appId?: string;
}

interface MqttRecordOptions {
  qos?: 0 | 1 | 2;
  retain?: boolean;
  dup?: boolean;
  properties?: {
    payloadFormatIndicator?: boolean;
    messageExpiryInterval?: number;
    topicAlias?: number;
    responseTopic?: string;
    correlationData?: Buffer;
    userProperties?: Record<string, string | string[]>;
    subscriptionIdentifier?: number;
    contentType?: string;
  };
}