CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/npm-nestjs--microservices

Nest microservices framework providing scalable distributed systems with multiple transport layers and communication patterns.

Pending
Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Security by Snyk

Pending

The risk profile of this skill

Overview
Eval results
Files

docs/exceptions.md

Exception Handling

Microservice-specific exception classes and error handling patterns for distributed system error management, providing structured error responses and transport-specific error handling capabilities.

Capabilities

RPC Exception

Base exception class for RPC operations providing structured error handling across all transport types.

/**
 * Base exception class for RPC operations.
 *
 * Extends the built-in `Error` and carries either a plain message or a
 * structured error object. This is an API signature summary: method bodies
 * are intentionally omitted.
 */
class RpcException extends Error {
  /**
   * Creates a new RPC exception.
   * @param error - Error message string, or a structured error object
   *   (for example `{ code, message, statusCode }` as used in the examples
   *   in this document)
   */
  constructor(error: string | object);
  
  /**
   * Returns the error data held by this exception.
   * @returns The error message string or structured error object
   */
  getError(): string | object;
}

Usage Examples:

import { Controller } from '@nestjs/common';
import { MessagePattern, RpcException } from '@nestjs/microservices';

/**
 * Example message handlers showing how to raise `RpcException` with either a
 * plain message or a structured error object.
 *
 * NOTE: `userService` and `validateUserData` are assumed to be provided by the
 * surrounding application; their wiring is not shown in this example.
 */
@Controller()
export class UserController {
  /**
   * Looks up a single user.
   * @param data - Request payload carrying the user ID
   * @returns Whatever `userService.findById` returns for that ID
   * @throws RpcException with a plain message when no ID is supplied, or with
   *   a structured `USER_NOT_FOUND` error when the lookup yields nothing
   */
  @MessagePattern({ cmd: 'get_user' })
  getUser(data: { id: number }): any {
    // Optional chaining: a missing payload is reported as a missing ID
    // instead of surfacing as a TypeError.
    if (!data?.id) {
      throw new RpcException('User ID is required');
    }

    const user = this.userService.findById(data.id);
    if (!user) {
      throw new RpcException({
        code: 'USER_NOT_FOUND',
        message: `User with ID ${data.id} not found`,
        statusCode: 404
      });
    }

    return user;
  }

  /**
   * Validates the payload and creates a user.
   * @param data - Raw user data to validate and persist
   * @returns Whatever `userService.create` returns
   * @throws RpcException with code `VALIDATION_ERROR` when validation throws
   */
  @MessagePattern({ cmd: 'create_user' })
  createUser(data: any): any {
    try {
      this.validateUserData(data);
    } catch (validationError) {
      throw new RpcException({
        code: 'VALIDATION_ERROR',
        message: 'Invalid user data provided',
        details: validationError?.details,
        statusCode: 400
      });
    }

    // Creation is deliberately outside the try block: a failure while creating
    // the user must not be reported to the caller as a validation error.
    return this.userService.create(data);
  }

  /**
   * Applies updates to an existing user.
   * @param data - The user ID and the changes to apply
   * @returns The result of `userService.update`
   * @throws RpcException with code `CONFLICT` when the service reports
   *   `CONCURRENT_MODIFICATION`, otherwise with code `INTERNAL_ERROR`
   */
  @MessagePattern({ cmd: 'update_user' })
  async updateUser(data: { id: number; updates: any }): Promise<any> {
    try {
      // `return await` keeps rejections inside this try block.
      return await this.userService.update(data.id, data.updates);
    } catch (error) {
      if (error?.code === 'CONCURRENT_MODIFICATION') {
        throw new RpcException({
          code: 'CONFLICT',
          message: 'User was modified by another process',
          timestamp: new Date().toISOString(),
          statusCode: 409
        });
      }

      // Re-throw as RPC exception
      throw new RpcException({
        code: 'INTERNAL_ERROR',
        message: 'Failed to update user',
        originalError: error?.message
      });
    }
  }
}

Kafka Retriable Exception

Specialized exception for Kafka operations that can be retried, enabling proper error handling in streaming scenarios.

/**
 * Kafka-specific exception for retriable errors
 */
/**
 * Kafka-specific exception for retriable errors.
 *
 * Subclass of `RpcException` with the same constructor shape. The usage
 * example in this document throws it from Kafka handlers for failures that
 * are considered temporary. This is an API signature summary; the body is
 * omitted.
 */
class KafkaRetriableException extends RpcException {
  /**
   * Creates a new Kafka retriable exception.
   * @param error - Error message string or structured error object
   */
  constructor(error: string | object);
}

Usage Examples:

import { Controller, Logger } from '@nestjs/common';
import {
  Ctx,
  EventPattern,
  KafkaContext,
  KafkaRetriableException,
  MessagePattern,
  Payload,
  RpcException,
} from '@nestjs/microservices';

/**
 * Example Kafka handlers that distinguish retriable failures (raised as
 * `KafkaRetriableException`) from permanent ones.
 *
 * NOTE: `processEvent`, `commandService` and `deadLetterService` are assumed
 * to be supplied by the surrounding application; their wiring is not shown.
 */
@Controller()
export class KafkaErrorController {
  private readonly logger = new Logger(KafkaErrorController.name);

  /**
   * Processes a user event.
   * @param event - Event payload
   * @param context - Kafka context used to read topic, partition and headers
   * @throws KafkaRetriableException when the failure is classified as retriable
   */
  @EventPattern('user.events.process')
  async processUserEvent(
    @Payload() event: any,
    @Ctx() context: KafkaContext
  ): Promise<void> {
    try {
      await this.processEvent(event);
    } catch (error) {
      if (this.isRetriableError(error)) {
        // Throw retriable exception so the message can be retried
        throw new KafkaRetriableException({
          code: 'TEMPORARY_FAILURE',
          message: 'Temporary processing failure, will retry',
          originalError: error.message,
          retryCount: this.getRetryCount(context),
          topic: context.getTopic(),
          partition: context.getPartition()
        });
      } else {
        // Non-retriable error - log and hand over to the dead-letter service
        this.logger.error('Non-retriable error processing event:', error);
        // Awaited so that a failing dead-letter delivery is not left as an
        // unhandled rejection (awaiting a non-promise result is harmless).
        await this.deadLetterService.send(event, error);
      }
    }
  }

  /**
   * Processes a user command, retrying up to three times.
   * @param command - Command payload
   * @param context - Kafka context used to read the retry counter header
   * @returns The result of `commandService.process`
   * @throws KafkaRetriableException while retries remain and the error is
   *   considered retriable
   * @throws RpcException once retries are exhausted or the error is permanent
   */
  @MessagePattern('user.commands.process')
  async processUserCommand(
    @Payload() command: any,
    @Ctx() context: KafkaContext
  ): Promise<any> {
    const maxRetries = 3;
    const retryCount = this.getRetryCount(context);

    try {
      return await this.commandService.process(command);
    } catch (error) {
      if (retryCount < maxRetries && this.shouldRetry(error)) {
        throw new KafkaRetriableException({
          code: 'PROCESSING_FAILED',
          message: `Command processing failed, retry ${retryCount + 1}/${maxRetries}`,
          commandId: command.id,
          retryCount: retryCount,
          maxRetries: maxRetries,
          error: error.message
        });
      } else {
        // Max retries reached or non-retriable error
        throw new RpcException({
          code: 'COMMAND_FAILED',
          message: 'Command processing failed permanently',
          commandId: command.id,
          retryCount: retryCount,
          finalError: error.message
        });
      }
    }
  }

  /**
   * Classifies an event-processing error as retriable based on its
   * `code` / `statusCode`.
   */
  private isRetriableError(error: any): boolean {
    // Define logic for determining if error is retriable
    return error.code === 'ECONNRESET' ||
           error.code === 'ETIMEDOUT' ||
           error.statusCode === 503 ||
           error.statusCode === 429;
  }

  /**
   * Classifies a command-processing error as worth retrying.
   */
  private shouldRetry(error: any): boolean {
    return error.temporary === true ||
           error.statusCode >= 500 ||
           error.code === 'NETWORK_ERROR';
  }

  /**
   * Reads the `retry-count` message header.
   * @returns The parsed counter, or 0 when the header is absent or not numeric
   */
  private getRetryCount(context: KafkaContext): number {
    const message = context.getMessage();
    const retryHeader = message.headers?.['retry-count'];
    if (retryHeader == null) {
      return 0;
    }

    // Explicit radix, and NaN falls back to 0: a malformed header would
    // otherwise make `retryCount < maxRetries` false and skip retries entirely.
    const parsed = Number.parseInt(retryHeader.toString(), 10);
    return Number.isNaN(parsed) ? 0 : parsed;
  }
}

Base RPC Exception Filter

Base exception filter for handling RPC exceptions across different transport types.

/**
 * Base exception filter for RPC operations
 */
/**
 * Base exception filter for RPC operations.
 *
 * API signature summary; the body is omitted.
 * NOTE(review): the types shown here are loose (`host: any`, return `any`) —
 * confirm the exact signature against the package's type definitions.
 */
class BaseRpcExceptionFilter {
  /**
   * Catches and processes RPC exceptions.
   * @param exception - The thrown exception
   * @param host - Execution context host
   * @returns Typed as `any` in this summary
   */
  catch(exception: RpcException, host: any): any;
}

Usage Examples:

import { Catch, RpcExceptionFilter, ArgumentsHost } from '@nestjs/common';
import { Observable, throwError } from 'rxjs';
import { RpcException } from '@nestjs/microservices';

/**
 * Exception filter for `RpcException` that logs the failure together with the
 * request data and emits a uniform error envelope.
 */
@Catch(RpcException)
export class CustomRpcExceptionFilter implements RpcExceptionFilter<RpcException> {
  /**
   * Logs the exception and converts it into an erroring observable.
   * @param exception - The RPC exception being handled
   * @param host - Arguments host used to reach the RPC request data
   * @returns An observable that errors with the formatted response object
   */
  catch(exception: RpcException, host: ArgumentsHost): Observable<any> {
    const requestData = host.switchToRpc().getData();
    const error = exception.getError();

    // Record what failed and which request triggered it
    console.error('RPC Exception:', {
      error,
      requestData,
      timestamp: new Date().toISOString()
    });

    // String errors are wrapped so that `error` is always an object
    const normalizedError = typeof error === 'string' ? { message: error } : error;
    const errorResponse = {
      success: false,
      error: normalizedError,
      timestamp: new Date().toISOString(),
      requestId: this.generateRequestId()
    };

    return throwError(() => errorResponse);
  }

  /**
   * Builds an identifier of the form `req_<epoch millis>_<random base-36 chars>`.
   */
  private generateRequestId(): string {
    const issuedAt = Date.now();
    // Characters 2..10 of the base-36 rendering, i.e. the part after "0."
    const randomSuffix = Math.random().toString(36).slice(2, 11);
    return ['req', issuedAt, randomSuffix].join('_');
  }
}

// Usage in main.ts or module
import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';

/**
 * Creates a TCP microservice on port 3001, registers
 * `CustomRpcExceptionFilter` as a global filter, and starts listening.
 */
async function bootstrap() {
  const tcpOptions: MicroserviceOptions = {
    transport: Transport.TCP,
    options: { port: 3001 },
  };

  const app = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    tcpOptions,
  );

  // Apply the filter to every handler in the microservice
  const rpcFilter = new CustomRpcExceptionFilter();
  app.useGlobalFilters(rpcFilter);

  await app.listen();
}

Transport-Specific Error Handling

Different error handling patterns for various transport types.

TCP Error Handling:

import { Controller } from '@nestjs/common';
import { Ctx, MessagePattern, Payload, RpcException, TcpContext } from '@nestjs/microservices';

/**
 * Example TCP handler that enriches the RPC error with client connection
 * details.
 *
 * NOTE: `processData` is assumed to be provided by the surrounding
 * application; it is not shown in this example.
 */
@Controller()
export class TcpErrorController {
  /**
   * Runs an operation and reports failures with TCP client details.
   * @param data - Message payload
   * @param context - TCP context giving access to the underlying socket
   * @returns The result of `processData`
   * @throws RpcException with code `TCP_PROCESSING_ERROR` on failure
   */
  @MessagePattern({ cmd: 'risky_operation' })
  performRiskyOperation(
    // `@Payload()` is required here: once any parameter decorator (`@Ctx()`)
    // is used, undecorated parameters are no longer populated.
    @Payload() data: any,
    @Ctx() context: TcpContext
  ): any {
    try {
      return this.processData(data);
    } catch (error) {
      // TCP-specific error handling
      const socket = context.getSocketRef();

      throw new RpcException({
        code: 'TCP_PROCESSING_ERROR',
        message: error.message,
        clientAddress: socket.socket.remoteAddress,
        clientPort: socket.socket.remotePort,
        timestamp: new Date().toISOString()
      });
    }
  }
}

Redis Error Handling:

import { Controller } from '@nestjs/common';
import { Ctx, MessagePattern, Payload, RedisContext, RpcException } from '@nestjs/microservices';

/**
 * Example Redis handler that reports cache failures together with the
 * channel they occurred on.
 *
 * NOTE: `cacheService` is assumed to be provided by the surrounding
 * application; it is not shown in this example.
 */
@Controller()
export class RedisErrorController {
  /**
   * Runs a cache operation.
   * @param data - Message payload
   * @param context - Redis context used to read the channel name
   * @returns The result of `cacheService.process`
   * @throws RpcException with code `REDIS_ERROR` on failure; `retryable` is
   *   true when the underlying error code is `ECONNRESET`
   */
  @MessagePattern('cache:operation')
  performCacheOperation(
    // `@Payload()` is required here: once any parameter decorator (`@Ctx()`)
    // is used, undecorated parameters are no longer populated.
    @Payload() data: any,
    @Ctx() context: RedisContext
  ): any {
    try {
      return this.cacheService.process(data);
    } catch (error) {
      throw new RpcException({
        code: 'REDIS_ERROR',
        message: 'Cache operation failed',
        channel: context.getChannel(),
        error: error.message,
        retryable: error.code === 'ECONNRESET'
      });
    }
  }
}

gRPC Error Handling:

import { status } from '@grpc/grpc-js';

/**
 * Example gRPC handler that maps failures onto gRPC status codes.
 *
 * NOTE: `userService` is assumed to be provided by the surrounding
 * application; it is not shown in this example.
 */
@Controller()
export class GrpcErrorController {
  /**
   * Looks up a user by ID.
   * @param data - Request message carrying the user ID
   * @returns Whatever `userService.findById` returns for that ID
   * @throws RpcException with `status.NOT_FOUND` when no user exists,
   *   `status.INVALID_ARGUMENT` for validation errors, and `status.INTERNAL`
   *   for anything else
   */
  @GrpcMethod('UserService', 'GetUser')
  getUser(data: { id: string }): any {
    try {
      const user = this.userService.findById(data.id);
      if (!user) {
        throw new RpcException({
          code: status.NOT_FOUND,
          message: `User with ID ${data.id} not found`,
          details: 'USER_NOT_FOUND'
        });
      }
      return user;
    } catch (error) {
      // Exceptions that already carry a gRPC status (such as the NOT_FOUND
      // thrown above) must pass through unchanged; otherwise this handler
      // would catch its own exception and downgrade it to INTERNAL.
      if (error instanceof RpcException) {
        throw error;
      }

      if (error?.name === 'ValidationError') {
        throw new RpcException({
          code: status.INVALID_ARGUMENT,
          message: 'Invalid request data',
          details: error.details
        });
      }

      throw new RpcException({
        code: status.INTERNAL,
        message: 'Internal server error'
      });
    }
  }
}

Error Response Patterns

Common patterns for structuring error responses across different scenarios.

/**
 * Standard error response structure.
 *
 * `success` is always the literal `false`. `error` carries a code and a
 * human-readable message, plus optional details, a timestamp string and an
 * optional request identifier.
 */
interface ErrorResponse {
  success: false;
  error: {
    code: string;
    message: string;
    details?: any;
    timestamp: string;
    requestId?: string;
  };
}

/**
 * Validation error structure.
 *
 * Narrows `error.code` to the literal `'VALIDATION_ERROR'` and adds a
 * `fields` list with one entry per offending field (field name, message and,
 * optionally, the rejected value).
 */
interface ValidationErrorResponse extends ErrorResponse {
  error: {
    code: 'VALIDATION_ERROR';
    message: string;
    fields: Array<{
      field: string;
      message: string;
      value?: any;
    }>;
    timestamp: string;
  };
}

/**
 * Business logic error structure.
 *
 * Adds `businessRules`, a list of rule identifiers, and an optional free-form
 * `context` map to the standard error payload.
 */
interface BusinessErrorResponse extends ErrorResponse {
  error: {
    code: string;
    message: string;
    businessRules: string[];
    timestamp: string;
    context?: Record<string, any>;
  };
}

// Usage examples
/**
 * Example handler that raises structured validation and business-rule errors.
 */
@Controller()
export class StructuredErrorController {
  /**
   * Validates the payload, checks business rules, then processes the data.
   * @param data - Message payload
   * @returns The result of `processData`
   * @throws RpcException with code `VALIDATION_ERROR` when `validate` reports
   *   any field errors, or `BUSINESS_RULE_VIOLATION` when
   *   `checkBusinessRules` returns a falsy value
   */
  @MessagePattern({ cmd: 'validate_and_process' })
  validateAndProcess(data: any): any {
    // Step 1: field-level validation
    const fieldErrors = this.validate(data);
    const hasFieldErrors = fieldErrors.length > 0;
    if (hasFieldErrors) {
      const validationFailure = {
        code: 'VALIDATION_ERROR',
        message: 'Request validation failed',
        fields: fieldErrors,
        timestamp: new Date().toISOString()
      };
      throw new RpcException(validationFailure);
    }

    // Step 2: business rules
    const rulesSatisfied = this.checkBusinessRules(data);
    if (!rulesSatisfied) {
      const ruleViolation = {
        code: 'BUSINESS_RULE_VIOLATION',
        message: 'Business rules validation failed',
        businessRules: ['INSUFFICIENT_BALANCE', 'ACCOUNT_SUSPENDED'],
        timestamp: new Date().toISOString(),
        context: { accountId: data.accountId, amount: data.amount }
      };
      throw new RpcException(ruleViolation);
    }

    // Step 3: everything checks out
    return this.processData(data);
  }
}

docs

client-proxies.md

context-objects.md

exceptions.md

grpc.md

index.md

message-patterns.md

modules.md

transports.md

tile.json