Nest microservices framework providing scalable distributed systems with multiple transport layers and communication patterns.
—
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Pending
The risk profile of this skill
Transport-specific context objects providing access to underlying protocol details and metadata for message handling, enabling fine-grained control over microservice communication and transport-specific operations.
Abstract base class for all RPC context implementations providing common context operations.
/**
* Base class for all RPC context implementations.
* The class-level type parameter `T` describes the shape of the argument
* collection returned by `getArgs()` and defaults to `any[]`.
*/
abstract class BaseRpcContext<T = any[]> {
/**
* Returns the arguments array from the context
* @returns Arguments array with type information (the class-level `T`)
*/
getArgs(): T;
/**
* Returns specific argument by index position.
* Note: the method-level type parameter `T` shadows the class-level `T`;
* it types the single returned argument and defaults to `any`.
* @param index - Zero-based index of the argument
* @returns Argument value at the specified index
*/
getArgByIndex<T = any>(index: number): T;
}Usage Examples:
import { Controller } from '@nestjs/common';
import { MessagePattern, Ctx } from '@nestjs/microservices';
import { BaseRpcContext } from '@nestjs/microservices';
@Controller()
export class BaseContextController {
/**
 * Summarises the arguments carried by the RPC context.
 * @param context - RPC context injected through `@Ctx()`
 * @returns Object with the argument count (`argCount`) and the first two arguments (`first`, `second`)
 */
@MessagePattern('process_args')
processWithArgs(@Ctx() context: BaseRpcContext): any {
  const contextArgs = context.getArgs();
  const [first, second] = [context.getArgByIndex(0), context.getArgByIndex(1)];
  return { argCount: contextArgs.length, first, second };
}
}

Kafka-specific context providing access to Kafka message metadata, consumer, and producer instances.
/**
* Kafka-specific context implementation.
* Extends BaseRpcContext with accessors for the message, its topic and
* partition, the consumer and producer instances, and the heartbeat callback.
*/
class KafkaContext extends BaseRpcContext {
/**
* Returns the original Kafka message object
* @returns Kafka message with headers, key, value, etc. (typed as `any` in this declaration)
*/
getMessage(): any;
/**
* Returns the partition number for the message
* @returns Partition number
*/
getPartition(): number;
/**
* Returns the topic name for the message
* @returns Topic name string
*/
getTopic(): string;
/**
* Returns reference to the Kafka consumer instance
* @returns Kafka consumer for manual operations (typed as `any` in this declaration)
*/
getConsumer(): any;
/**
* Returns the heartbeat callback function
* @returns Heartbeat function for consumer group coordination; calling it yields a `Promise<void>`
*/
getHeartbeat(): () => Promise<void>;
/**
* Returns reference to the Kafka producer instance
* @returns Kafka producer for sending messages (typed as `any` in this declaration)
*/
getProducer(): any;
}Usage Examples:
import { Controller, Logger } from '@nestjs/common';
import { EventPattern, MessagePattern, Payload, Ctx } from '@nestjs/microservices';
import { KafkaContext } from '@nestjs/microservices';
@Controller()
export class KafkaController {
private readonly logger = new Logger(KafkaController.name);
/**
 * Handles `user.events.created` events: logs where the message came from,
 * sends one manual heartbeat, then delegates to `processUserCreation`.
 * @param userData - Event payload
 * @param context - Kafka context giving access to the raw message, topic, partition and heartbeat
 */
@EventPattern('user.events.created')
async handleUserCreated(
  @Payload() userData: any,
  @Ctx() context: KafkaContext
): Promise<void> {
  const kafkaMessage = context.getMessage();
  const sourceTopic = context.getTopic();
  const sourcePartition = context.getPartition();

  this.logger.log(`Processing user creation from topic: ${sourceTopic}, partition: ${sourcePartition}`);
  this.logger.debug('Message headers:', kafkaMessage.headers);
  this.logger.debug('Message key:', kafkaMessage.key?.toString());

  // Heartbeat manually before the (potentially long) processing step to avoid a session timeout
  const sendHeartbeat = context.getHeartbeat();
  await sendHeartbeat();

  await this.processUserCreation(userData);
}
/**
 * Executes a user command, publishes the outcome to `user.events.processed`
 * through the context's producer, and returns the outcome as the reply.
 *
 * If execution or publishing fails, the offset following the failed message is
 * committed manually and the original error is rethrown.
 *
 * @param command - Command payload; `command.userId` becomes the key of the outgoing message
 * @param context - Kafka context supplying producer, consumer, topic, partition and raw message
 * @returns The value produced by `executeCommand`
 * @throws Rethrows whatever error was raised while executing or publishing
 */
@MessagePattern('user.commands.process')
async processUserCommand(
  @Payload() command: any,
  @Ctx() context: KafkaContext
): Promise<any> {
  const producer = context.getProducer();
  const consumer = context.getConsumer();
  try {
    const result = await this.executeCommand(command);
    // Headers may be absent on the incoming message, so read the correlation id defensively
    const correlationId = context.getMessage().headers?.['correlation-id'];
    // Send result to another topic using the producer
    await producer.send({
      topic: 'user.events.processed',
      messages: [{
        key: command.userId,
        value: JSON.stringify(result),
        headers: {
          // Only forward the correlation id when the incoming message carried one
          ...(correlationId !== undefined && { 'correlation-id': correlationId }),
          'processed-at': Date.now().toString()
        }
      }]
    });
    return result;
  } catch (error) {
    // Manual offset management if needed
    const { offset } = context.getMessage();
    await consumer.commitOffsets([{
      // Topic and partition come from the dedicated context accessors
      topic: context.getTopic(),
      partition: context.getPartition(),
      // Offsets arrive as strings; BigInt avoids the rounding a parseInt -> number
      // conversion would introduce for very large offsets
      offset: (BigInt(offset) + 1n).toString()
    }]);
    throw error;
  }
}
}

TCP-specific context providing access to the underlying socket connection and pattern information.
/**
* TCP-specific context implementation.
* Extends BaseRpcContext with accessors for the socket reference and the
* pattern of the current message.
*/
class TcpContext extends BaseRpcContext {
/**
* Returns reference to the underlying JSON socket
* @returns JSON socket instance for direct socket operations (typed as `any` in this declaration)
*/
getSocketRef(): any;
/**
* Returns the pattern name used for message routing
* @returns Pattern identifier as a string
*/
getPattern(): string;
}Usage Examples:
import { Controller, Logger } from '@nestjs/common';
import { MessagePattern, EventPattern, Payload, Ctx } from '@nestjs/microservices';
import { TcpContext } from '@nestjs/microservices';
@Controller()
export class TcpController {
private readonly logger = new Logger(TcpController.name);
/**
 * Handles the `{ cmd: 'process_with_socket' }` command: logs the caller's
 * address, pushes an extra `ack` message over the connection, and returns the
 * result of `processData`.
 *
 * @param data - Request payload
 * @param context - TCP context exposing the JSON socket wrapper and the matched pattern
 * @returns Whatever `processData` returns for the payload
 */
@MessagePattern({ cmd: 'process_with_socket' })
processWithSocket(
  @Payload() data: any,
  @Ctx() context: TcpContext
): any {
  const socket = context.getSocketRef();
  const pattern = context.getPattern();
  this.logger.log(`Processing command: ${JSON.stringify(pattern)}`);
  // Access socket properties (the wrapped network socket lives on `.socket`)
  const clientAddress = socket.socket.remoteAddress;
  const clientPort = socket.socket.remotePort;
  this.logger.log(`Request from client: ${clientAddress}:${clientPort}`);
  // Direct socket operations if needed.
  // The JSON socket wrapper exposes sendMessage() rather than write(); it
  // serializes and frames the payload itself, so pass the object, not a JSON string.
  socket.sendMessage({
    type: 'ack',
    pattern,
    timestamp: Date.now()
  });
  return this.processData(data, { clientAddress, pattern });
}
/**
 * Handles `tcp_notification` events: logs the sender, pushes a
 * `notification_received` message back over the connection, and hands the
 * notification to `processNotification`.
 *
 * @param notification - Event payload; `notification.id` is echoed in the acknowledgment
 * @param context - TCP context exposing the JSON socket wrapper and the matched pattern
 */
@EventPattern('tcp_notification')
handleTcpNotification(
  @Payload() notification: any,
  @Ctx() context: TcpContext
): void {
  const socket = context.getSocketRef();
  const pattern = context.getPattern();
  // Log connection details
  this.logger.log(`TCP notification: ${pattern} from ${socket.socket.remoteAddress}`);
  // Send immediate acknowledgment.
  // Use the wrapper's sendMessage() (it has no write()); it serializes and
  // frames the payload itself, so pass the object, not a JSON string.
  socket.sendMessage({
    type: 'notification_received',
    id: notification.id,
    timestamp: Date.now()
  });
  this.processNotification(notification);
}
}

NATS-specific context providing access to subject and message headers.
/**
* NATS-specific context implementation.
* Extends BaseRpcContext with accessors for the subject and the message headers.
*/
class NatsContext extends BaseRpcContext {
/**
* Returns the NATS subject name
* @returns Subject string used for message routing
*/
getSubject(): string;
/**
* Returns message headers if available
* @returns Message headers object, declared here as a string-keyed record
*/
getHeaders(): Record<string, any>;
}Usage Examples:
import { Controller, Logger } from '@nestjs/common';
import { MessagePattern, EventPattern, Payload, Ctx } from '@nestjs/microservices';
import { NatsContext } from '@nestjs/microservices';
@Controller()
export class NatsController {
private readonly logger = new Logger(NatsController.name);
/**
 * Answers `user.*.get` requests by looking the user up through `userService`.
 * The user type is the second dot-separated token of the subject.
 * @param request - Request payload; `request.id` identifies the user
 * @param context - NATS context supplying the subject and headers
 * @returns Whatever `userService.getByType` returns
 */
@MessagePattern('user.*.get')
getUserBySubject(
  @Payload() request: any,
  @Ctx() context: NatsContext
): any {
  const natsSubject = context.getSubject();
  const natsHeaders = context.getHeaders();

  // e.g. 'admin' from 'user.admin.get'
  const [, userType] = natsSubject.split('.');

  this.logger.log(`Getting ${userType} user with ID: ${request.id}`);
  this.logger.debug('Request headers:', natsHeaders);

  return this.userService.getByType(userType, request.id, {
    requestId: natsHeaders?.['request-id'],
    userAgent: natsHeaders?.['user-agent'],
  });
}
/**
 * Handles `events.*.created` events. The entity type is the second
 * dot-separated token of the subject; when a `correlation-id` header is
 * present the event is also reported to `trackingService`.
 * @param entity - Event payload; `entity.id` is logged
 * @param context - NATS context supplying the subject and headers
 */
@EventPattern('events.*.created')
handleEntityCreated(
  @Payload() entity: any,
  @Ctx() context: NatsContext
): void {
  const natsSubject = context.getSubject();
  const natsHeaders = context.getHeaders();
  const [, entityType] = natsSubject.split('.');

  this.logger.log(`${entityType} created with ID: ${entity.id}`);

  // Correlate the event only when the sender supplied a correlation id
  const correlationId = natsHeaders?.['correlation-id'];
  if (correlationId) {
    this.trackingService.correlateEvent(correlationId, entity);
  }

  this.processEntityCreation(entityType, entity);
}
}

MQTT-specific context providing access to topic and original MQTT packet information.
/**
* MQTT-specific context implementation.
* Extends BaseRpcContext with accessors for the topic and the original packet.
*/
class MqttContext extends BaseRpcContext {
/**
* Returns the MQTT topic name
* @returns Topic string used for message routing
*/
getTopic(): string;
/**
* Returns the original MQTT packet
* @returns MQTT packet with QoS, retain flag, etc. (typed as `any` in this declaration)
*/
getPacket(): any;
}Usage Examples:
import { Controller, Logger } from '@nestjs/common';
import { EventPattern, Payload, Ctx } from '@nestjs/microservices';
import { MqttContext } from '@nestjs/microservices';
@Controller()
export class MqttController {
private readonly logger = new Logger(MqttController.name);
/**
 * Handles `sensors/+/temperature` readings. The sensor id is the second
 * slash-separated segment of the topic. Retained messages update the last
 * known value; all others are processed as realtime readings.
 * @param reading - Reading payload; `reading.value` is logged
 * @param context - MQTT context supplying the topic and the original packet
 */
@EventPattern('sensors/+/temperature')
handleTemperatureReading(
  @Payload() reading: any,
  @Ctx() context: MqttContext
): void {
  const mqttTopic = context.getTopic();
  const mqttPacket = context.getPacket();

  // e.g. 'sensor001' from 'sensors/sensor001/temperature'
  const [, sensorId] = mqttTopic.split('/');

  this.logger.log(`Temperature reading from sensor ${sensorId}: ${reading.value}°C`);
  this.logger.debug(`MQTT QoS: ${mqttPacket.qos}, Retain: ${mqttPacket.retain}`);

  // Non-retained messages take the realtime path and finish here
  if (!mqttPacket.retain) {
    this.sensorService.processRealtimeReading(sensorId, reading);
    return;
  }

  this.logger.log('Processing retained temperature message');
  this.sensorService.updateLastKnownValue(sensorId, reading);
}
/**
 * Handles `devices/+/status/#` updates. The device id is the second topic
 * segment and the status type is every segment from the fourth onward,
 * re-joined with '/'. A packet with QoS 2 is forwarded with 'high' priority,
 * anything else with 'normal'.
 * @param status - Status payload
 * @param context - MQTT context supplying the topic and the original packet
 */
@EventPattern('devices/+/status/#')
handleDeviceStatus(
  @Payload() status: any,
  @Ctx() context: MqttContext
): void {
  const mqttTopic = context.getTopic();
  const mqttPacket = context.getPacket();

  // Segment 1 is the device id; segments 3+ form the status type
  const [, deviceId, , ...statusSegments] = mqttTopic.split('/');
  const statusType = statusSegments.join('/');

  this.logger.log(`Device ${deviceId} status update: ${statusType}`);

  let priority = 'normal';
  if (mqttPacket.qos === 2) {
    priority = 'high';
  }
  this.deviceService.updateStatus(deviceId, statusType, status, { priority });
}
}

Redis-specific context providing access to channel information for pub/sub operations.
/**
* Redis-specific context implementation.
* Extends BaseRpcContext with a single accessor for the channel name.
*/
class RedisContext extends BaseRpcContext {
/**
* Returns the Redis channel name
* @returns Channel name used for pub/sub
*/
getChannel(): string;
}Usage Examples:
import { Controller, Logger } from '@nestjs/common';
import { MessagePattern, EventPattern, Payload, Ctx } from '@nestjs/microservices';
import { RedisContext } from '@nestjs/microservices';
@Controller()
export class RedisController {
private readonly logger = new Logger(RedisController.name);
/**
 * Answers `cache:get:*` requests. The cache key is the channel name with the
 * first occurrence of 'cache:get:' removed.
 * @param request - Request payload; `request.options` is forwarded to the cache service
 * @param context - Redis context supplying the channel name
 * @returns Whatever `cacheService.get` returns
 */
@MessagePattern('cache:get:*')
getCacheValue(
  @Payload() request: any,
  @Ctx() context: RedisContext
): any {
  const channelName = context.getChannel();
  const key = channelName.replace('cache:get:', '');

  this.logger.log(`Cache request for key: ${key}`);

  return this.cacheService.get(key, request.options);
}
/**
 * Routes `notifications:*` events by the second colon-separated token of the
 * channel name: 'user' and 'system' go to their dedicated services, anything
 * else is logged as a warning.
 * @param notification - Notification payload
 * @param context - Redis context supplying the channel name
 */
@EventPattern('notifications:*')
handleNotification(
  @Payload() notification: any,
  @Ctx() context: RedisContext
): void {
  const channelName = context.getChannel();
  const [, notificationType] = channelName.split(':');

  this.logger.log(`Notification received on channel: ${channelName}`);

  if (notificationType === 'user') {
    this.userNotificationService.handle(notification);
  } else if (notificationType === 'system') {
    this.systemNotificationService.handle(notification);
  } else {
    this.logger.warn(`Unknown notification type: ${notificationType}`);
  }
}
}

RabbitMQ-specific context providing access to message, channel reference, and pattern information.
/**
* RabbitMQ-specific context implementation.
* Extends BaseRpcContext with accessors for the original message, the channel
* reference, and the pattern of the current message.
*/
class RmqContext extends BaseRpcContext {
/**
* Returns the original RabbitMQ message
* @returns RabbitMQ message with properties and fields (typed as `any` in this declaration)
*/
getMessage(): any;
/**
* Returns reference to the RabbitMQ channel
* @returns Channel instance for manual operations (typed as `any` in this declaration)
*/
getChannelRef(): any;
/**
* Returns the pattern name used for routing
* @returns Pattern/routing key string
*/
getPattern(): string;
}Usage Examples:
import { Controller, Logger } from '@nestjs/common';
import { MessagePattern, EventPattern, Payload, Ctx } from '@nestjs/microservices';
import { RmqContext } from '@nestjs/microservices';
@Controller()
export class RmqController {
private readonly logger = new Logger(RmqController.name);
/**
 * Processes a user command with manual acknowledgment and publishes the
 * outcome to `results_exchange`.
 *
 * The delivery is settled exactly once: it is nacked when processing fails
 * (requeued only if the error carries `retryable === true`) and acked when
 * processing succeeds. Publishing happens after the ack and outside the
 * processing `try`, so a publish failure can no longer trigger a nack on a
 * delivery that has already been acked.
 *
 * @param command - Command payload handed to `commandService.process`
 * @param context - RabbitMQ context supplying the original message, channel and pattern
 * @returns The value produced by `commandService.process`
 * @throws Rethrows processing errors after nacking; publish errors propagate as-is
 */
@MessagePattern('user.commands.process')
async processUserCommand(
  @Payload() command: any,
  @Ctx() context: RmqContext
): Promise<any> {
  const message = context.getMessage();
  const channel = context.getChannelRef();
  const pattern = context.getPattern();
  this.logger.log(`Processing command with routing key: ${pattern}`);
  this.logger.debug(`Message properties:`, message.properties);

  let result: any;
  try {
    result = await this.commandService.process(command);
  } catch (error: unknown) {
    this.logger.error(`Command processing failed:`, error);
    // Reject message and potentially requeue; narrow the unknown error before reading its flag
    const requeue =
      typeof error === 'object' &&
      error !== null &&
      'retryable' in error &&
      error.retryable === true;
    channel.nack(message, false, requeue);
    throw error;
  }

  // Manual acknowledgment after successful processing
  channel.ack(message);

  // Publish result to another exchange
  await channel.publish(
    'results_exchange',
    'user.results.processed',
    Buffer.from(JSON.stringify(result)),
    {
      correlationId: message.properties.correlationId,
      timestamp: Date.now(),
      headers: {
        'processed-by': 'user-service',
        'original-routing-key': pattern
      }
    }
  );
  return result;
}
/**
 * Handles `user.events.*` events with manual acknowledgment.
 *
 * The event type is the last dot-separated token of the pattern. The delivery
 * is settled exactly once: nacked without requeue when handling fails, acked
 * when it succeeds. The follow-up publish for 'created' events runs after the
 * ack and in its own `try`, so a publish failure is logged rather than
 * nacking a delivery that has already been acked.
 *
 * @param event - Event payload; `event.id` is used as `userId` in the follow-up message
 * @param context - RabbitMQ context supplying the original message, channel and pattern
 */
@EventPattern('user.events.*')
handleUserEvent(
  @Payload() event: any,
  @Ctx() context: RmqContext
): void {
  const message = context.getMessage();
  const channel = context.getChannelRef();
  const pattern = context.getPattern();
  const eventType = pattern.split('.').pop(); // Extract event type
  this.logger.log(`Handling user event: ${eventType}`);

  try {
    // Process event based on type
    this.eventService.handle(eventType, event);
  } catch (error) {
    this.logger.error(`Event handling failed:`, error);
    channel.nack(message, false, false); // Don't requeue events
    return;
  }

  // Manual acknowledgment
  channel.ack(message);

  // Publish follow-up events if needed
  if (eventType === 'created') {
    try {
      channel.publish(
        'notifications_exchange',
        'notifications.welcome',
        Buffer.from(JSON.stringify({ userId: event.id })),
        { correlationId: message.properties.correlationId }
      );
    } catch (error) {
      // The delivery is already acked, so only report the failure
      this.logger.error(`Follow-up publish failed:`, error);
    }
  }
}
}