Nest microservices framework providing scalable distributed systems with multiple transport layers and communication patterns.
npx @tessl/cli install tessl/npm-nestjs--microservices@11.1.0
NestJS Microservices is a comprehensive framework for building scalable distributed systems using various transport layers and communication patterns. It provides seamless integration with multiple messaging protocols, including Redis, RabbitMQ, MQTT, NATS, gRPC, and Kafka, and offers both request-response and event-driven communication models with automatic message serialization, pattern-based routing, and built-in error handling.
npm install @nestjs/microservices
Peer dependencies: @nestjs/common, @nestjs/core, rxjs, reflect-metadata
// NestFactory is exported by @nestjs/core, not by @nestjs/microservices.
import { NestFactory } from '@nestjs/core';
import {
  ClientProxy,
  Transport,
  MessagePattern,
  EventPattern,
  Payload,
  Ctx,
} from '@nestjs/microservices';
For CommonJS:
// NestFactory is exported by @nestjs/core, not by @nestjs/microservices.
const { NestFactory } = require('@nestjs/core');
const {
  ClientProxy,
  Transport,
  MessagePattern,
  EventPattern,
} = require('@nestjs/microservices');
import { NestFactory } from '@nestjs/core';
import { Transport, MicroserviceOptions } from '@nestjs/microservices';
import { AppModule } from './app.module';
/**
 * Creates a microservice from `AppModule` using the TCP transport on
 * 127.0.0.1:3001, then waits for it to start listening.
 */
async function bootstrap() {
  // Annotated so the literal is checked against MicroserviceOptions.
  const tcpOptions: MicroserviceOptions = {
    transport: Transport.TCP,
    options: { host: '127.0.0.1', port: 3001 },
  };

  const microservice = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    tcpOptions,
  );
  await microservice.listen();
}
bootstrap();import { Controller } from '@nestjs/common';
import { MessagePattern, EventPattern, Payload, Ctx } from '@nestjs/microservices';
/**
 * Example controller with one request-response handler and one event handler.
 */
@Controller()
export class AppController {
  /**
   * Handles the `{ cmd: 'sum' }` message pattern.
   *
   * @param data numbers to add; a falsy payload is treated as an empty list
   * @returns the sum of `data`, or 0 for an empty/falsy payload
   */
  @MessagePattern({ cmd: 'sum' })
  accumulate(@Payload() data: number[]): number {
    const values = data || [];
    return values.reduce((total, value) => total + value, 0);
  }

  /**
   * Handles the `user_created` event by logging its payload.
   *
   * @param data event payload
   */
  @EventPattern('user_created')
  handleUserCreated(@Payload() data: Record<string, unknown>) {
    console.log('User created:', data);
  }
}
import { Injectable } from '@nestjs/common';
import { ClientProxy, ClientProxyFactory, Transport } from '@nestjs/microservices';
/**
 * Example client-side service that talks to a TCP microservice at
 * 127.0.0.1:3001 through a `ClientProxy`.
 */
@Injectable()
export class AppService {
  // Created once per service instance; the options literal is kept inline so
  // it is contextually typed by ClientProxyFactory.create.
  private client: ClientProxy = ClientProxyFactory.create({
    transport: Transport.TCP,
    options: { host: '127.0.0.1', port: 3001 },
  });

  /**
   * Sends `data` with the `{ cmd: 'sum' }` pattern and resolves with the reply.
   *
   * NOTE(review): `toPromise()` is deprecated in RxJS 7+; consider
   * `firstValueFrom` from 'rxjs' once that import is added.
   *
   * @param data numbers to be summed by the remote handler
   * @returns a promise of the value produced by the response observable
   */
  async getSum(data: number[]): Promise<number> {
    const sum$ = this.client.send({ cmd: 'sum' }, data);
    return sum$.toPromise();
  }

  /**
   * Emits a `user_created` event carrying `data`; the returned observable is
   * not awaited or subscribed to here.
   *
   * @param data event payload
   */
  emitEvent(data: any): void {
    const eventName = 'user_created';
    this.client.emit(eventName, data);
  }
}
NestJS Microservices is built around several key components:
Core microservice application class providing lifecycle management, middleware registration, and server initialization for distributed applications.
/**
 * Core microservice application class, listed here as member signatures only
 * (no method bodies are shown). It extends `NestApplicationContext` and
 * implements `INestMicroservice`.
 */
class NestMicroservice extends NestApplicationContext implements INestMicroservice {
  /** Observable status stream; the element type is left as `any` here. */
  readonly status: Observable<any>;

  /** Accepts either plain or async microservice options. */
  createServer(config: MicroserviceOptions | AsyncMicroserviceOptions): void;
  registerModules(): Promise<any>;
  registerListeners(): void;

  // Every `use*` method returns `this`, so calls can be chained.
  useWebSocketAdapter(adapter: WebSocketAdapter): this;
  useGlobalFilters(...filters: ExceptionFilter[]): this;
  useGlobalPipes(...pipes: PipeTransform<any>[]): this;
  useGlobalInterceptors(...interceptors: NestInterceptor[]): this;
  useGlobalGuards(...guards: CanActivate[]): this;

  // Lifecycle methods; each returns a promise.
  init(): Promise<this>;
  listen(): Promise<any>;
  close(): Promise<any>;

  // Boolean state-flag setters.
  setIsInitialized(isInitialized: boolean): void;
  setIsTerminated(isTerminated: boolean): void;
  setIsInitHookCalled(isInitHookCalled: boolean): void;

  /** Registers `callback` for `event`; which events exist is not shown here. */
  on(event: string | number | symbol, callback: Function): any;
  /**
   * Returns a value typed by the caller as `T`.
   * NOTE(review): presumably the underlying transport server instance; confirm upstream.
   */
  unwrap<T>(): T;
}
Abstract server base class and factory for creating transport-specific server implementations with message handling and lifecycle management.
/**
 * Abstract base class for transport-specific servers (signatures only).
 *
 * @typeParam EventsMap - defaults to `Record<string, Function>`
 * @typeParam Status - element type of the `status` observable; defaults to `string`
 */
abstract class Server<EventsMap = Record<string, Function>, Status = string> {
  readonly status: Observable<Status>;
  /** Optional transport identifier: a `Transport` enum member or a symbol. */
  transportId?: Transport | symbol;

  // Members that every concrete server must implement.
  abstract on<EventKey, EventCallback>(event: EventKey, callback: EventCallback): any;
  abstract unwrap<T>(): T;
  abstract listen(callback: (...optionalParams: unknown[]) => any): any;
  abstract close(): any;

  setTransportId(transportId: Transport | symbol): void;
  setOnProcessingStartHook(hook: Function): void;
  setOnProcessingEndHook(hook: Function): void;

  /**
   * Adds a handler callback for `pattern`.
   *
   * @param isEventHandler optional flag; presumably marks event (vs. request) handlers — confirm upstream
   * @param extras optional free-form metadata
   */
  addHandler(pattern: any, callback: MessageHandler, isEventHandler?: boolean, extras?: Record<string, any>): void;
  /** Returns the handlers as a map keyed by string. */
  getHandlers(): Map<string, MessageHandler>;
  /** Returns the handler for `pattern`, or `null`. */
  getHandlerByPattern(pattern: string): MessageHandler | null;
  /** Takes a result stream and a `respond` callback that receives `WritePacket`s; returns the `Subscription`. */
  send(stream$: Observable<any>, respond: (data: WritePacket) => Promise<unknown> | void): Subscription;
  handleEvent(pattern: string, packet: ReadPacket, context: BaseRpcContext): Promise<any>;
}
/** Factory whose static `create` returns a `Server` for the given options. */
class ServerFactory {
  static create(microserviceOptions: MicroserviceOptions): Server;
}
Message record builders for constructing transport-specific message wrappers with metadata and options for enhanced message handling.
/** Message wrapper pairing `data` with optional `RmqRecordOptions`. */
class RmqRecord<TData = any> {
  constructor(data: TData, options?: RmqRecordOptions);
  readonly data: TData;
  options?: RmqRecordOptions;
}
/**
 * Builder for `RmqRecord`. The setters return `this` so calls can be
 * chained; `build()` returns the record.
 */
class RmqRecordBuilder<TData> {
  /** `data` is optional here and can be supplied later via `setData`. */
  constructor(data?: TData);
  setOptions(options: RmqRecordOptions): this;
  setData(data: TData): this;
  build(): RmqRecord;
}
/** Message wrapper pairing `data` with optional `MqttRecordOptions`. */
class MqttRecord<TData = any> {
  constructor(data: TData, options?: MqttRecordOptions);
  readonly data: TData;
  options?: MqttRecordOptions;
}
/**
 * Builder for `MqttRecord`. The setters return `this` so calls can be
 * chained; `build()` returns the record.
 */
class MqttRecordBuilder<TData> {
  /** `data` is optional here and can be supplied later via `setData`. */
  constructor(data?: TData);
  setData(data: TData): this;
  /** QoS is restricted to the literals 0, 1 or 2. */
  setQoS(qos: 0 | 1 | 2): this;
  setRetain(retain: boolean): this;
  setDup(dup: boolean): this;
  setProperties(properties: Record<string, any>): this;
  build(): MqttRecord;
}
/** Message wrapper pairing `data` with optional, read-only `headers`. */
class NatsRecord<TData = any, THeaders = any> {
  constructor(data: TData, headers?: THeaders);
  readonly data: TData;
  readonly headers?: THeaders;
}
/**
 * Builder for `NatsRecord`. The setters return `this` so calls can be
 * chained; `build()` returns the record.
 */
class NatsRecordBuilder<TData> {
  /** `data` is optional here and can be supplied later via `setData`. */
  constructor(data?: TData);
  /** The header type is chosen per call and defaults to `any`. */
  setHeaders<THeaders = any>(headers: THeaders): this;
  setData(data: TData): this;
  build(): NatsRecord;
}
Client-side communication interface for sending messages and events to microservices with support for multiple transport protocols.
/**
 * Abstract client used to talk to a microservice (signatures only).
 *
 * In the usage example, `send` carries the `{ cmd: 'sum' }` request and
 * `emit` carries the `'user_created'` event. Both return an `Observable`.
 */
abstract class ClientProxy<EventsMap extends Record<never, Function> = Record<never, Function>, Status extends string = string> {
  connect(): Promise<any>;
  close(): any;
  /**
   * @param pattern message pattern (untyped)
   * @param data payload sent with the pattern
   * @returns an observable of `TResult`
   */
  send<TResult = any, TInput = any>(
    pattern: any,
    data: TInput
  ): Observable<TResult>;
  /**
   * @param pattern event pattern (untyped)
   * @param data payload sent with the event
   * @returns an observable of `TResult`
   */
  emit<TResult = any, TInput = any>(
    pattern: any,
    data: TInput
  ): Observable<TResult>;
}
/** Factory whose static `create` returns a `ClientProxy` for the given options. */
class ClientProxyFactory {
  static create(clientOptions: ClientOptions): ClientProxy;
}
Support for multiple messaging protocols including TCP, Redis, NATS, MQTT, gRPC, RabbitMQ, and Kafka with protocol-specific features and configurations.
/**
 * Numeric identifiers for the built-in transports. The values are
 * explicit, running from 0 (TCP) to 6 (KAFKA).
 */
enum Transport {
  TCP = 0,
  REDIS = 1,
  NATS = 2,
  MQTT = 3,
  GRPC = 4,
  RMQ = 5,
  KAFKA = 6
}
/**
 * Options for a microservice server. Both fields are optional in this listing.
 * NOTE(review): `options` is untyped (`any`) here; the per-transport option
 * shapes are not shown in this document.
 */
interface MicroserviceOptions {
  /** A built-in `Transport` member or a custom strategy. */
  transport?: Transport | CustomTransportStrategy;
  options?: any;
}
Decorators and patterns for defining message handlers with request-response and event-driven communication models.
/**
 * Method decorator factory for message handlers; the usage example applies
 * it as `@MessagePattern({ cmd: 'sum' })`. All parameters are optional.
 *
 * @param metadata the pattern; typed as `T`, which defaults to `string`
 * @param transport a `Transport` member or a symbol
 * @param extras free-form additional metadata
 */
function MessagePattern<T = string>(
  metadata?: T,
  transport?: Transport | symbol,
  extras?: Record<string, any>
): MethodDecorator;
/**
 * Method decorator factory for event handlers; the usage example applies
 * it as `@EventPattern('user_created')`. All parameters are optional.
 *
 * @param metadata the pattern; typed as `T`, which defaults to `string`
 * @param transport a `Transport` member or a symbol
 * @param extras free-form additional metadata
 */
function EventPattern<T = string>(
  metadata?: T,
  transport?: Transport | symbol,
  extras?: Record<string, any>
): MethodDecorator;
/**
 * Parameter decorator factory; the usage example applies it as `@Payload()`
 * on a handler's data argument.
 *
 * @param property optional property name — presumably selects one field of the payload; confirm upstream
 * @param pipes optional pipes
 */
function Payload(property?: string, ...pipes: PipeTransform[]): ParameterDecorator;
/** Parameter decorator factory that takes no arguments. */
function Ctx(): ParameterDecorator;
Transport-specific context objects providing access to underlying protocol details and metadata for message handling.
/**
 * Abstract base for transport-specific context objects.
 *
 * @typeParam T - type of the argument collection; defaults to `any[]`
 */
abstract class BaseRpcContext<T = any[]> {
  /** Returns the whole argument collection. */
  getArgs(): T;
  /** Returns the argument at `index`, typed by the caller (this method's own `T` shadows the class-level one). */
  getArgByIndex<T = any>(index: number): T;
}
/**
 * Kafka-specific context. The message, consumer and producer accessors are
 * untyped (`any`) in this listing.
 */
class KafkaContext extends BaseRpcContext {
  getMessage(): any;
  getPartition(): number;
  getTopic(): string;
  getConsumer(): any;
  getProducer(): any;
}
Specialized support for gRPC protocol with service definitions, streaming methods, and protocol buffer integration.
// gRPC method decorator factories. All three share the same signature:
// optional `service` and `method` names, returning a MethodDecorator.
// NOTE(review): how the three variants differ at runtime is not shown here.
function GrpcMethod(service?: string, method?: string): MethodDecorator;
function GrpcStreamMethod(service?: string, method?: string): MethodDecorator;
function GrpcStreamCall(service?: string, method?: string): MethodDecorator;
/** gRPC client interface; both lookups take a name and return a caller-typed `T`. */
interface ClientGrpc {
  getService<T = any>(name: string): T;
  getClientByServiceName<T = any>(name: string): T;
}
Microservice-specific exception classes and error handling patterns for distributed system error management.
/** Microservice exception built from a string or an object. */
class RpcException extends Error {
  constructor(error: string | object);
  /** Returns a `string | object` — presumably the value given to the constructor; confirm upstream. */
  getError(): string | object;
}
/** `RpcException` subclass with the same constructor signature. */
class KafkaRetriableException extends RpcException {
  constructor(error: string | object);
}
NestJS module integration for dependency injection and configuration management of microservice clients and servers.
/**
 * Module for microservice clients. Both static methods return a
 * `DynamicModule`: `register` takes plain options, `registerAsync` takes
 * async options.
 */
class ClientsModule {
  static register(options: ClientsModuleOptions): DynamicModule;
  static registerAsync(options: ClientsModuleAsyncOptions): DynamicModule;
}
/** Property decorator factory that takes optional `ClientOptions`. */
function Client(metadata?: ClientOptions): PropertyDecorator;
/**
 * Options for a client. Unlike `MicroserviceOptions`, `transport` is
 * required in this listing.
 */
interface ClientOptions {
  transport: Transport | CustomTransportStrategy;
  options?: any;
}
/**
 * Custom transport strategy.
 * NOTE(review): both members are untyped (`any`) here; verify this shape
 * against the upstream `@nestjs/microservices` typings before relying on it.
 */
interface CustomTransportStrategy {
  server: any;
  client: any;
}
/** Inbound-style packet: a pattern plus its data. */
interface ReadPacket<T = any> {
  pattern: any;
  data: T;
}
/** Response-style packet; every field is optional. */
interface WritePacket<T = any> {
  err?: any;
  response?: T;
  isDisposed?: boolean;
  status?: string;
}
/** String identifier mixed into the request and response types below. */
interface PacketId {
  id: string;
}
// Requests and responses carry a PacketId; events are plain ReadPackets
// with no id.
type OutgoingRequest = ReadPacket & PacketId;
type IncomingRequest = ReadPacket & PacketId;
type OutgoingEvent = ReadPacket;
type IncomingEvent = ReadPacket;
type IncomingResponse = WritePacket & PacketId;
type OutgoingResponse = WritePacket & PacketId;
/**
 * Options accepted by `RmqRecord` / `RmqRecordBuilder.setOptions`.
 * Every field is optional.
 */
interface RmqRecordOptions {
  expiration?: string | number;
  userId?: string;
  CC?: string | string[];
  mandatory?: boolean;
  persistent?: boolean;
  deliveryMode?: boolean | number;
  BCC?: string | string[];
  contentType?: string;
  contentEncoding?: string;
  headers?: Record<string, string>;
  priority?: number;
  messageId?: string;
  timestamp?: number;
  type?: string;
  appId?: string;
}
/**
 * Options accepted by `MqttRecord`. Every field is optional; the first
 * three match the `MqttRecordBuilder` setters `setQoS`, `setRetain` and
 * `setDup`.
 */
interface MqttRecordOptions {
  qos?: 0 | 1 | 2;
  retain?: boolean;
  dup?: boolean;
  // Nested message properties; every member is optional.
  properties?: {
    payloadFormatIndicator?: boolean;
    messageExpiryInterval?: number;
    topicAlias?: number;
    responseTopic?: string;
    correlationData?: Buffer;
    userProperties?: Record<string, string | string[]>;
    subscriptionIdentifier?: number;
    contentType?: string;
  };
}