Nest microservices framework providing scalable distributed systems with multiple transport layers and communication patterns.
—
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Pending
The risk profile of this skill
Microservice-specific exception classes and error handling patterns for distributed system error management, providing structured error responses and transport-specific error handling capabilities.
Base exception class for RPC operations providing structured error handling across all transport types.
/**
 * Base exception class for RPC operations.
 *
 * Wraps either a plain message string or a structured error object; the
 * wrapped value is read back with `getError()`. Only the signatures are
 * listed here — method bodies are omitted in this API summary.
 */
class RpcException extends Error {
  /**
   * Creates a new RPC exception.
   * @param error - Error message string or structured error object
   */
  constructor(error: string | object);

  /**
   * Returns the error data.
   * @returns Original error message or object
   */
  getError(): string | object;
}
Usage Examples:
import { Controller } from '@nestjs/common';
import { MessagePattern, RpcException } from '@nestjs/microservices';
/**
 * Example controller showing how message handlers report failures with
 * `RpcException`, using either a plain message or a structured payload.
 *
 * NOTE(review): `this.userService` and `this.validateUserData` are not
 * declared in this snippet; they are assumed to be provided elsewhere
 * (e.g. via constructor injection) — confirm before copying.
 */
@Controller()
export class UserController {
  /**
   * Looks up a user by ID.
   * @param data - Request payload carrying the user ID
   * @returns The value returned by `userService.findById`
   * @throws RpcException when the ID is missing/falsy or no user is found
   */
  @MessagePattern({ cmd: 'get_user' })
  getUser(data: { id: number }): any {
    if (!data.id) {
      throw new RpcException('User ID is required');
    }

    const user = this.userService.findById(data.id);
    if (!user) {
      throw new RpcException({
        code: 'USER_NOT_FOUND',
        message: `User with ID ${data.id} not found`,
        statusCode: 404,
      });
    }
    return user;
  }

  /**
   * Validates the payload and creates a user.
   * @param data - Raw user data to validate and persist
   * @returns The value returned by `userService.create`
   * @throws RpcException with code `VALIDATION_ERROR` when validation throws
   */
  @MessagePattern({ cmd: 'create_user' })
  createUser(data: any): any {
    // Only validation sits inside the try block, so a failure thrown by
    // `userService.create` is not misreported as a VALIDATION_ERROR (400).
    try {
      this.validateUserData(data);
    } catch (validationError) {
      throw new RpcException({
        code: 'VALIDATION_ERROR',
        message: 'Invalid user data provided',
        details: validationError.details,
        statusCode: 400,
      });
    }
    return this.userService.create(data);
  }

  /**
   * Applies updates to a user.
   * @param data - Target user ID and the updates to apply
   * @returns The value resolved by `userService.update`
   * @throws RpcException with code `CONFLICT` on a concurrent modification,
   *   or `INTERNAL_ERROR` for any other failure
   */
  @MessagePattern({ cmd: 'update_user' })
  async updateUser(data: { id: number; updates: any }): Promise<any> {
    try {
      // `await` is kept inside the try so rejections reach the catch below.
      return await this.userService.update(data.id, data.updates);
    } catch (error) {
      if (error.code === 'CONCURRENT_MODIFICATION') {
        throw new RpcException({
          code: 'CONFLICT',
          message: 'User was modified by another process',
          timestamp: new Date().toISOString(),
          statusCode: 409,
        });
      }

      // Re-throw as RPC exception
      throw new RpcException({
        code: 'INTERNAL_ERROR',
        message: 'Failed to update user',
        originalError: error.message,
      });
    }
  }
}
Specialized exception for Kafka operations that can be retried, enabling proper error handling in streaming scenarios.
/**
 * Kafka-specific exception for retriable errors.
 *
 * Extends `RpcException` and takes the same constructor argument. Only the
 * signature is listed here — the body is omitted in this API summary.
 */
class KafkaRetriableException extends RpcException {
  /**
   * Creates a new Kafka retriable exception.
   * @param error - Error message string or structured error object
   */
  constructor(error: string | object);
}
Usage Examples:
import { Controller, Logger } from '@nestjs/common';
import { EventPattern, MessagePattern, Payload, Ctx } from '@nestjs/microservices';
import { KafkaRetriableException, KafkaContext } from '@nestjs/microservices';
/**
 * Example Kafka controller that separates retriable failures (re-thrown as
 * `KafkaRetriableException`) from permanent ones.
 *
 * NOTE(review): `RpcException` is thrown in `processUserCommand` but is not
 * in this snippet's import list; it also needs importing from
 * '@nestjs/microservices'. `processEvent`, `deadLetterService` and
 * `commandService` are not declared here either and are assumed to exist
 * elsewhere — confirm before copying.
 */
@Controller()
export class KafkaErrorController {
  private readonly logger = new Logger(KafkaErrorController.name);

  /**
   * Handles `user.events.process` events.
   * @param event - Event payload
   * @param context - Kafka context used to read topic, partition and headers
   * @throws KafkaRetriableException when `isRetriableError` accepts the failure
   */
  @EventPattern('user.events.process')
  async processUserEvent(
    @Payload() event: any,
    @Ctx() context: KafkaContext
  ): Promise<void> {
    try {
      await this.processEvent(event);
    } catch (error) {
      if (this.isRetriableError(error)) {
        // Throw retriable exception - Kafka will retry
        throw new KafkaRetriableException({
          code: 'TEMPORARY_FAILURE',
          message: 'Temporary processing failure, will retry',
          originalError: error.message,
          retryCount: this.getRetryCount(context),
          topic: context.getTopic(),
          partition: context.getPartition(),
        });
      } else {
        // Non-retriable error - log and continue
        this.logger.error('Non-retriable error processing event:', error);
        this.deadLetterService.send(event, error);
      }
    }
  }

  /**
   * Handles `user.commands.process` commands with a bounded retry budget.
   * @param command - Command payload
   * @param context - Kafka context used to read the current retry count
   * @returns The value resolved by `commandService.process`
   * @throws KafkaRetriableException while retries remain and `shouldRetry`
   *   accepts the failure
   * @throws RpcException once retries are exhausted or the failure is permanent
   */
  @MessagePattern('user.commands.process')
  async processUserCommand(
    @Payload() command: any,
    @Ctx() context: KafkaContext
  ): Promise<any> {
    const maxRetries = 3;
    const retryCount = this.getRetryCount(context);

    try {
      return await this.commandService.process(command);
    } catch (error) {
      if (retryCount < maxRetries && this.shouldRetry(error)) {
        throw new KafkaRetriableException({
          code: 'PROCESSING_FAILED',
          message: `Command processing failed, retry ${retryCount + 1}/${maxRetries}`,
          commandId: command.id,
          retryCount,
          maxRetries,
          error: error.message,
        });
      } else {
        // Max retries reached or non-retriable error
        throw new RpcException({
          code: 'COMMAND_FAILED',
          message: 'Command processing failed permanently',
          commandId: command.id,
          retryCount,
          finalError: error.message,
        });
      }
    }
  }

  /**
   * Decides whether an event failure is retriable: connection reset,
   * timeout, or an HTTP-style 503/429 status.
   */
  private isRetriableError(error: any): boolean {
    // Define logic for determining if error is retriable
    return error.code === 'ECONNRESET' ||
      error.code === 'ETIMEDOUT' ||
      error.statusCode === 503 ||
      error.statusCode === 429;
  }

  /**
   * Decides whether a command failure is worth retrying: flagged temporary,
   * a status code of 500 or above, or a `NETWORK_ERROR` code.
   */
  private shouldRetry(error: any): boolean {
    return error.temporary === true ||
      error.statusCode >= 500 ||
      error.code === 'NETWORK_ERROR';
  }

  /**
   * Reads the `retry-count` message header.
   *
   * NOTE(review): nothing in this snippet writes that header; presumably the
   * producer or a retry pipeline sets it — confirm.
   *
   * @returns The header parsed as a base-10 integer, or 0 when the header is
   *   absent or does not parse to a number
   */
  private getRetryCount(context: KafkaContext): number {
    const retryHeader = context.getMessage().headers?.['retry-count'];
    if (retryHeader == null) {
      return 0;
    }
    // Explicit radix, and a NaN guard so a malformed header cannot make the
    // `retryCount < maxRetries` comparison silently evaluate to false.
    const parsed = Number.parseInt(retryHeader.toString(), 10);
    return Number.isNaN(parsed) ? 0 : parsed;
  }
}
Base exception filter for handling RPC exceptions across different transport types.
/**
 * Base exception filter for RPC operations.
 *
 * Only the signature is listed here — the body is omitted in this API
 * summary, and `host` / the return value are typed loosely as `any`.
 */
class BaseRpcExceptionFilter {
  /**
   * Catches and processes RPC exceptions.
   * @param exception - The thrown exception
   * @param host - Execution context host
   */
  catch(exception: RpcException, host: any): any;
}
Usage Examples:
import { Catch, RpcExceptionFilter, ArgumentsHost } from '@nestjs/common';
import { Observable, throwError } from 'rxjs';
import { RpcException } from '@nestjs/microservices';
/**
 * Exception filter that logs every `RpcException` together with the request
 * payload and re-emits it as a uniform error envelope.
 */
@Catch(RpcException)
export class CustomRpcExceptionFilter implements RpcExceptionFilter<RpcException> {
  /**
   * Logs the exception and converts it into an error observable.
   * @param exception - The caught RPC exception
   * @param host - Arguments host; switched to the RPC context to read the payload
   * @returns An observable that errors with
   *   `{ success: false, error, timestamp, requestId }`
   */
  catch(exception: RpcException, host: ArgumentsHost): Observable<any> {
    const requestData = host.switchToRpc().getData();
    const error = exception.getError();
    // Computed once so the log entry and the response carry the same timestamp.
    const timestamp = new Date().toISOString();

    // Log the error
    console.error('RPC Exception:', {
      error,
      requestData,
      timestamp,
    });

    // Format error response; a plain string is wrapped as `{ message }`
    const errorResponse = {
      success: false,
      error: typeof error === 'string' ? { message: error } : error,
      timestamp,
      requestId: this.generateRequestId(),
    };

    return throwError(() => errorResponse);
  }

  /**
   * Builds an identifier of the form `req_<epoch ms>_<random base-36 chars>`.
   * `Math.random()` is used, so the value is not cryptographically secure.
   */
  private generateRequestId(): string {
    // `slice(2, 11)` yields the same 9-character window as the deprecated
    // `substr(2, 9)`.
    return `req_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
  }
}
// Usage in main.ts or module
import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
/**
 * Creates a TCP microservice for `AppModule` on port 3001, registers
 * `CustomRpcExceptionFilter` as a global filter, and starts listening.
 */
async function bootstrap() {
  const tcpOptions: MicroserviceOptions = {
    transport: Transport.TCP,
    options: { port: 3001 },
  };

  const app = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    tcpOptions,
  );

  const rpcFilter = new CustomRpcExceptionFilter();
  app.useGlobalFilters(rpcFilter);

  await app.listen();
}
Different error handling patterns for various transport types.
TCP Error Handling:
import { Controller } from '@nestjs/common';
import { MessagePattern, RpcException, Ctx, Payload, TcpContext } from '@nestjs/microservices';
/**
 * Example TCP controller that enriches RPC errors with details of the
 * client socket.
 *
 * NOTE(review): `this.processData` is not declared in this snippet and is
 * assumed to exist elsewhere — confirm before copying.
 */
@Controller()
export class TcpErrorController {
  /**
   * Runs the operation and wraps any failure in an `RpcException`.
   * @param data - Message payload
   * @param context - TCP context giving access to the underlying socket
   * @returns The value returned by `processData`
   * @throws RpcException with code `TCP_PROCESSING_ERROR`, the client
   *   address/port and a timestamp
   */
  @MessagePattern({ cmd: 'risky_operation' })
  performRiskyOperation(
    // `@Payload()` is required here: once any parameter decorator is used
    // (`@Ctx()` below), handler arguments are resolved from decorator
    // metadata, so an undecorated parameter would not receive the payload.
    @Payload() data: any,
    @Ctx() context: TcpContext
  ): any {
    try {
      return this.processData(data);
    } catch (error) {
      // TCP-specific error handling
      const socket = context.getSocketRef();
      throw new RpcException({
        code: 'TCP_PROCESSING_ERROR',
        message: error.message,
        clientAddress: socket.socket.remoteAddress,
        clientPort: socket.socket.remotePort,
        timestamp: new Date().toISOString(),
      });
    }
  }
}
Redis Error Handling:
/**
 * Example Redis controller that reports cache failures together with the
 * channel the message arrived on.
 *
 * NOTE(review): this snippet shows no imports; `Controller`,
 * `MessagePattern`, `Payload`, `Ctx`, `RedisContext` and `RpcException` must
 * be imported, and `this.cacheService` is assumed to be provided elsewhere —
 * confirm before copying.
 */
@Controller()
export class RedisErrorController {
  /**
   * Runs the cache operation and wraps any failure in an `RpcException`.
   * @param data - Message payload
   * @param context - Redis context used to read the channel name
   * @returns The value returned by `cacheService.process`
   * @throws RpcException with code `REDIS_ERROR`; `retryable` is true only
   *   when the underlying error code is `ECONNRESET`
   */
  @MessagePattern('cache:operation')
  performCacheOperation(
    // `@Payload()` is required here: once any parameter decorator is used
    // (`@Ctx()` below), handler arguments are resolved from decorator
    // metadata, so an undecorated parameter would not receive the payload.
    @Payload() data: any,
    @Ctx() context: RedisContext
  ): any {
    try {
      return this.cacheService.process(data);
    } catch (error) {
      throw new RpcException({
        code: 'REDIS_ERROR',
        message: 'Cache operation failed',
        channel: context.getChannel(),
        error: error.message,
        retryable: error.code === 'ECONNRESET',
      });
    }
  }
}
gRPC Error Handling:
import { status } from '@grpc/grpc-js';
/**
 * Example gRPC controller that maps failures onto gRPC status codes.
 *
 * NOTE(review): only `status` is imported in this snippet; `Controller`,
 * `GrpcMethod` and `RpcException` must be imported too, and
 * `this.userService` is assumed to be provided elsewhere — confirm before
 * copying.
 */
@Controller()
export class GrpcErrorController {
  /**
   * Looks up a user by ID.
   * @param data - Request message carrying the user ID
   * @returns The value returned by `userService.findById`
   * @throws RpcException with `status.NOT_FOUND` when no user exists,
   *   `status.INVALID_ARGUMENT` for a `ValidationError`, and
   *   `status.INTERNAL` for anything else
   */
  @GrpcMethod('UserService', 'GetUser')
  getUser(data: { id: string }): any {
    try {
      const user = this.userService.findById(data.id);
      if (!user) {
        throw new RpcException({
          code: status.NOT_FOUND,
          message: `User with ID ${data.id} not found`,
          details: 'USER_NOT_FOUND',
        });
      }
      return user;
    } catch (error) {
      // The NOT_FOUND exception above is thrown inside this try block; pass
      // any RpcException through unchanged so it is not downgraded to
      // INTERNAL by the fallback below.
      if (error instanceof RpcException) {
        throw error;
      }

      if (error.name === 'ValidationError') {
        throw new RpcException({
          code: status.INVALID_ARGUMENT,
          message: 'Invalid request data',
          details: error.details,
        });
      }

      throw new RpcException({
        code: status.INTERNAL,
        message: 'Internal server error',
      });
    }
  }
}
Common patterns for structuring error responses across different scenarios.
/**
 * Standard error response structure.
 */
interface ErrorResponse {
  /** Always `false`; marks the response as a failure. */
  success: false;
  /** Description of the failure. */
  error: {
    /** Error code string. */
    code: string;
    /** Error message. */
    message: string;
    /** Optional additional detail of any shape. */
    details?: any;
    /** Timestamp string for the error. */
    timestamp: string;
    /** Optional request identifier. */
    requestId?: string;
  };
}
/**
 * Validation error structure: an `ErrorResponse` whose `error` is narrowed
 * to the `VALIDATION_ERROR` code and carries per-field failures.
 */
interface ValidationErrorResponse extends ErrorResponse {
  error: {
    /** Fixed discriminator for validation failures. */
    code: 'VALIDATION_ERROR';
    /** Error message. */
    message: string;
    /** One entry per failing field. */
    fields: {
      /** Name of the failing field. */
      field: string;
      /** Message for this field. */
      message: string;
      /** Optional value associated with the field. */
      value?: any;
    }[];
    /** Timestamp string for the error. */
    timestamp: string;
  };
}
/**
 * Business logic error structure: an `ErrorResponse` whose `error` lists
 * business rules and may carry extra context.
 */
interface BusinessErrorResponse extends ErrorResponse {
  error: {
    /** Error code string. */
    code: string;
    /** Error message. */
    message: string;
    /** Business rule identifiers associated with the error. */
    businessRules: string[];
    /** Timestamp string for the error. */
    timestamp: string;
    /** Optional key/value context for the failure. */
    context?: Record<string, any>;
  };
}
// Usage examples
/**
 * Example controller that throws structured validation and business-rule
 * errors as `RpcException` payloads.
 *
 * NOTE(review): `validate`, `checkBusinessRules` and `processData` are not
 * declared in this snippet and are assumed to exist elsewhere — confirm.
 */
@Controller()
export class StructuredErrorController {
  /**
   * Validates the request, checks business rules, then processes the data.
   * @param data - Request payload
   * @returns The value returned by `processData`
   * @throws RpcException with code `VALIDATION_ERROR` when `validate`
   *   returns a non-empty list, or `BUSINESS_RULE_VIOLATION` when
   *   `checkBusinessRules` returns a falsy value
   */
  @MessagePattern({ cmd: 'validate_and_process' })
  validateAndProcess(data: any): any {
    // Stage 1: field-level validation
    const fieldErrors = this.validate(data);
    const hasFieldErrors = fieldErrors.length > 0;
    if (hasFieldErrors) {
      const validationFailure = {
        code: 'VALIDATION_ERROR',
        message: 'Request validation failed',
        fields: fieldErrors,
        timestamp: new Date().toISOString(),
      };
      throw new RpcException(validationFailure);
    }

    // Stage 2: business rules
    const rulesSatisfied = this.checkBusinessRules(data);
    if (!rulesSatisfied) {
      const ruleViolation = {
        code: 'BUSINESS_RULE_VIOLATION',
        message: 'Business rules validation failed',
        businessRules: ['INSUFFICIENT_BALANCE', 'ACCOUNT_SUSPENDED'],
        timestamp: new Date().toISOString(),
        context: { accountId: data.accountId, amount: data.amount },
      };
      throw new RpcException(ruleViolation);
    }

    // Stage 3: both checks passed
    return this.processData(data);
  }
}