CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/npm-nats

Node.js client for NATS, a lightweight, high-performance cloud native messaging system

Pending
Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

SecuritybySnyk

Pending

The risk profile of this skill

Overview
Eval results
Files

services.mddocs/

Services API

NATS Services API provides a framework for building discoverable microservices with built-in health monitoring, statistics collection, and service registry capabilities.

Capabilities

Service Registration

Register services with NATS for discovery and monitoring.

/**
 * Access to Services API from NATS connection
 */
services: ServicesAPI;

interface ServicesAPI {
  /**
   * Register a new service with NATS
   * @param config - Service configuration including name, version, and endpoints
   * @returns Promise resolving to Service instance
   */
  add(config: ServiceConfig): Promise<Service>;

  /**
   * Create a service client for discovery and monitoring
   * @param opts - Request options for service operations
   * @param prefix - Custom service prefix (default: "$SRV")
   * @returns ServiceClient instance
   */
  client(opts?: RequestManyOptions, prefix?: string): ServiceClient;
}

interface ServiceConfig {
  /** Service name (required) */
  name: string;
  /** Service version (required) */
  version: string;
  /** Service description */
  description?: string;
  /** Service metadata */
  metadata?: Record<string, string>;
  /** Service endpoints */
  queue?: string;
  /** Schema for service definition */
  schema?: ServiceSchema;
}

interface ServiceSchema {
  /** Request schema */
  request?: Record<string, unknown>;
  /** Response schema */
  response?: Record<string, unknown>;
}

interface Service {
  /** Stop the service */
  stop(): Promise<void>;
  /** Check if service is stopped */
  stopped: boolean;
  /** Service information */
  info: ServiceInfo;
  /** Service statistics */
  stats(): ServiceStats;
}

Usage Examples:

import { connect } from "nats";

const nc = await connect();

// Create a simple service
const service = await nc.services.add({
  name: "calculator",
  version: "1.0.0",
  description: "Simple math calculator service"
});

console.log(`Service started: ${service.info.name} v${service.info.version}`);

// Create service with metadata
const userService = await nc.services.add({
  name: "user-manager",
  version: "2.1.0",
  description: "User management service",
  metadata: {
    region: "us-west-2",
    environment: "production",
    team: "backend"
  }
});

// Stop service gracefully
await userService.stop();

Service Endpoints

Define service endpoints that handle specific request types.

interface ServiceGroup {
  /** Add endpoint to service group */
  addEndpoint(
    name: string,
    opts: ServiceEndpointOpts,
    handler: ServiceHandler
  ): QueuedIterator<ServiceMsg>;

  /** Add multiple endpoints */
  addGroup(name: string): ServiceGroup;
}

interface ServiceEndpointOpts {
  /** Endpoint subject pattern */
  subject?: string;
  /** Endpoint queue group */
  queue_group?: string;  
  /** Endpoint metadata */
  metadata?: Record<string, string>;
  /** Endpoint schema */
  schema?: {
    request?: Record<string, unknown>;
    response?: Record<string, unknown>;
  };
}

type ServiceHandler = (
  err: NatsError | null,
  msg: ServiceMsg
) => Promise<void> | void;

interface ServiceMsg extends Msg {
  /** Respond to service message */
  respond(data?: Payload, opts?: ServiceMsgResponse): boolean;
  /** Respond with error */
  respondError(code: number, description: string, data?: Payload): boolean;
}

interface ServiceMsgResponse {
  /** Response headers */
  headers?: MsgHdrs;
}

Usage Examples:

import { connect, StringCodec, JSONCodec, headers } from "nats";

const nc = await connect();
const sc = StringCodec();
const jc = JSONCodec();

// Create calculator service with endpoints
const calc = await nc.services.add({
  name: "calculator",
  version: "1.0.0",
  description: "Math calculator service"
});

// Add endpoint for addition
const addGroup = calc.addGroup("math");
const addEndpoint = addGroup.addEndpoint(
  "add",
  {
    subject: "calc.add",
    metadata: { operation: "addition" }
  },
  (err, msg) => {
    if (err) {
      console.error("Handler error:", err);
      return;
    }

    try {
      const { a, b } = jc.decode(msg.data);
      const result = { sum: a + b };
      msg.respond(jc.encode(result));
    } catch (error) {
      msg.respondError(400, "Invalid input format", sc.encode("Expected {a: number, b: number}"));
    }
  }
);

// Add endpoint for division with error handling
const divEndpoint = addGroup.addEndpoint(
  "divide",
  { subject: "calc.divide" },
  (err, msg) => {
    if (err) return;

    try {
      const { dividend, divisor } = jc.decode(msg.data);
      
      if (divisor === 0) {
        msg.respondError(422, "Division by zero", sc.encode("Divisor cannot be zero"));
        return;
      }
      
      const result = { quotient: dividend / divisor };
      msg.respond(jc.encode(result));
    } catch (error) {
      msg.respondError(400, "Invalid input", sc.encode(error.message));
    }
  }
);

// User management service with authentication
const userSvc = await nc.services.add({
  name: "user-service",
  version: "1.2.0"
});

const usersGroup = userSvc.addGroup("users");
const getUserEndpoint = usersGroup.addEndpoint(
  "get",
  { subject: "users.get" },
  async (err, msg) => {
    if (err) return;

    // Check authentication header
    const token = msg.headers?.get("authorization");
    if (!token) {
      msg.respondError(401, "Unauthorized", sc.encode("Missing authorization header"));
      return;
    }

    try {
      const { userId } = jc.decode(msg.data);
      const user = await getUserFromDatabase(userId); // Your DB call
      
      if (!user) {
        msg.respondError(404, "User not found", sc.encode(`User ${userId} not found`));
        return;
      }
      
      // Add response headers
      const responseHeaders = headers();
      responseHeaders.set("content-type", "application/json");
      responseHeaders.set("cache-control", "max-age=300");
      
      msg.respond(jc.encode(user), { headers: responseHeaders });
    } catch (error) {
      msg.respondError(500, "Internal server error", sc.encode(error.message));
    }
  }
);

Service Discovery

Discover and interact with registered services.

interface ServiceClient {
  /**
   * Ping services to check availability
   * @param name - Optional service name filter
   * @param id - Optional service ID filter
   * @returns Promise resolving to async iterator of service identities
   */
  ping(name?: string, id?: string): Promise<QueuedIterator<ServiceIdentity>>;

  /**
   * Get statistics from services
   * @param name - Optional service name filter
   * @param id - Optional service ID filter
   * @returns Promise resolving to async iterator of service statistics
   */
  stats(name?: string, id?: string): Promise<QueuedIterator<ServiceStats>>;

  /**
   * Get information from services
   * @param name - Optional service name filter
   * @param id - Optional service ID filter
   * @returns Promise resolving to async iterator of service information
   */
  info(name?: string, id?: string): Promise<QueuedIterator<ServiceInfo>>;
}

interface ServiceIdentity {
  /** Service name */
  name: string;
  /** Service ID */
  id: string;
  /** Service version */
  version: string;
  /** Service metadata */
  metadata?: Record<string, string>;
}

interface ServiceInfo {
  /** Service name */
  name: string;
  /** Service ID */
  id: string;
  /** Service version */
  version: string;
  /** Service description */
  description?: string;
  /** Service metadata */
  metadata?: Record<string, string>;
  /** Service endpoints */
  endpoints: EndpointInfo[];
  /** Service type */
  type: string;
}

interface ServiceStats {
  /** Service name */
  name: string;
  /** Service ID */  
  id: string;
  /** Service version */
  version: string;
  /** Service start time */
  started: Date;
  /** Service endpoints statistics */
  endpoints: EndpointStats[];
}

interface EndpointInfo {
  /** Endpoint name */
  name: string;
  /** Endpoint subject */
  subject: string;
  /** Endpoint queue group */
  queue_group?: string;
  /** Endpoint metadata */
  metadata?: Record<string, string>;
}

interface EndpointStats {
  /** Endpoint name */
  name: string;
  /** Endpoint subject */
  subject: string;
  /** Number of requests received */
  num_requests: number;
  /** Number of errors */
  num_errors: number;
  /** Last error message */
  last_error?: string;
  /** Average processing time */
  processing_time: number;
  /** Average queue time */
  queue_time: number;
  /** Additional endpoint data */
  data?: Record<string, unknown>;
}

Usage Examples:

import { connect } from "nats";

const nc = await connect();
const client = nc.services.client();

// Discover all services
console.log("Discovering services...");
const services = await client.ping();
for await (const service of services) {
  console.log(`Found: ${service.name} v${service.version} (${service.id})`);
}

// Get information about specific service
const calcInfo = await client.info("calculator");
for await (const info of calcInfo) {
  console.log(`Service: ${info.name} - ${info.description}`);
  console.log("Endpoints:");
  for (const endpoint of info.endpoints) {
    console.log(`  - ${endpoint.name}: ${endpoint.subject}`);
  }
}

// Monitor service statistics
const statsMonitor = await client.stats("user-service");
for await (const stats of statsMonitor) {
  console.log(`Service: ${stats.name}, Started: ${stats.started}`);
  for (const endpoint of stats.endpoints) {
    console.log(`  ${endpoint.name}: ${endpoint.num_requests} requests, ${endpoint.num_errors} errors`);
    if (endpoint.processing_time > 0) {
      console.log(`    Avg processing: ${endpoint.processing_time}ms`);
    }
  }
}

// Health check for specific services
async function checkServiceHealth() {
  const pingResults = await client.ping();
  const healthyServices: string[] = [];
  
  for await (const service of pingResults) {
    healthyServices.push(`${service.name}@${service.version}`);
  }
  
  console.log(`Healthy services: ${healthyServices.join(", ")}`);
  return healthyServices;
}

// Run health check every 30 seconds
setInterval(checkServiceHealth, 30000);

Service Communication

Communicate with discovered services using standard NATS messaging.

// Service client wrapper for easy communication
class ServiceProxy {
  constructor(private nc: NatsConnection, private serviceName: string) {}

  async callEndpoint<T, R>(
    endpoint: string, 
    data: T, 
    timeout = 5000
  ): Promise<R> {
    const subject = `${this.serviceName}.${endpoint}`;
    const response = await this.nc.request(
      subject, 
      JSONCodec<T>().encode(data),
      { timeout }
    );

    // Check for service error response
    if (response.headers?.hasError) {
      throw new Error(`Service error ${response.headers.code}: ${response.headers.description}`);
    }

    return JSONCodec<R>().decode(response.data);
  }

  async callWithAuth<T, R>(
    endpoint: string,
    data: T,
    token: string,
    timeout = 5000
  ): Promise<R> {
    const subject = `${this.serviceName}.${endpoint}`;
    const hdrs = headers();
    hdrs.set("authorization", `Bearer ${token}`);

    const response = await this.nc.request(
      subject,
      JSONCodec<T>().encode(data),
      { timeout, headers: hdrs }
    );

    if (response.headers?.hasError) {
      throw new Error(`Service error ${response.headers.code}: ${response.headers.description}`);
    }

    return JSONCodec<R>().decode(response.data);
  }
}

// Usage examples
const nc = await connect();

// Calculator service client
const calc = new ServiceProxy(nc, "calc");

try {
  const sum = await calc.callEndpoint<{a: number, b: number}, {sum: number}>(
    "add", 
    { a: 10, b: 20 }
  );
  console.log(`10 + 20 = ${sum.sum}`);

  const quotient = await calc.callEndpoint<{dividend: number, divisor: number}, {quotient: number}>(
    "divide",
    { dividend: 100, divisor: 5 }
  );
  console.log(`100 / 5 = ${quotient.quotient}`);
} catch (err) {
  console.error("Calculator service error:", err.message);
}

// User service client with authentication
const users = new ServiceProxy(nc, "users");

try {
  const user = await users.callWithAuth<{userId: string}, {id: string, name: string, email: string}>(
    "get",
    { userId: "user123" },
    "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..." // JWT token
  );
  console.log(`User: ${user.name} (${user.email})`);
} catch (err) {
  console.error("User service error:", err.message);
}

Service Monitoring

Monitor service health and performance metrics.

// Service monitoring dashboard
class ServiceMonitor {
  private client: ServiceClient;

  constructor(nc: NatsConnection) {
    this.client = nc.services.client();
  }

  async getServiceInventory(): Promise<ServiceInfo[]> {
    const services: ServiceInfo[] = [];
    const serviceInfo = await this.client.info();
    
    for await (const info of serviceInfo) {
      services.push(info);
    }
    
    return services;
  }

  async getServiceMetrics(): Promise<Map<string, ServiceStats>> {
    const metrics = new Map<string, ServiceStats>();
    const stats = await this.client.stats();
    
    for await (const stat of stats) {
      metrics.set(`${stat.name}@${stat.version}`, stat);
    }
    
    return metrics;
  }

  async monitorServiceHealth(intervalMs = 10000): Promise<void> {
    console.log("Starting service health monitoring...");
    
    setInterval(async () => {
      try {
        const services = await this.getServiceInventory();
        const metrics = await this.getServiceMetrics();
        
        console.log(`\n=== Service Health Report (${new Date().toISOString()}) ===`);
        
        for (const service of services) {
          const key = `${service.name}@${service.version}`;
          const stats = metrics.get(key);
          
          console.log(`\n${service.name} v${service.version} (${service.id})`);
          console.log(`  Description: ${service.description || 'N/A'}`);
          console.log(`  Endpoints: ${service.endpoints.length}`);
          
          if (stats) {
            console.log(`  Uptime: ${this.formatUptime(stats.started)}`);
            
            let totalRequests = 0;
            let totalErrors = 0;
            
            for (const endpoint of stats.endpoints) {
              totalRequests += endpoint.num_requests;
              totalErrors += endpoint.num_errors;
              
              const errorRate = endpoint.num_requests > 0 
                ? (endpoint.num_errors / endpoint.num_requests * 100).toFixed(2)
                : "0.00";
                
              console.log(`    ${endpoint.name}: ${endpoint.num_requests} reqs, ${errorRate}% errors, ${endpoint.processing_time}ms avg`);
            }
            
            const overallErrorRate = totalRequests > 0
              ? (totalErrors / totalRequests * 100).toFixed(2)
              : "0.00";
              
            console.log(`  Overall: ${totalRequests} requests, ${overallErrorRate}% error rate`);
          } else {
            console.log(`  Status: No statistics available`);
          }
        }
      } catch (err) {
        console.error("Health monitoring error:", err);
      }
    }, intervalMs);
  }

  private formatUptime(started: Date): string {
    const uptimeMs = Date.now() - started.getTime();
    const seconds = Math.floor(uptimeMs / 1000);
    
    if (seconds < 60) return `${seconds}s`;
    if (seconds < 3600) return `${Math.floor(seconds / 60)}m ${seconds % 60}s`;
    
    const hours = Math.floor(seconds / 3600);
    const mins = Math.floor((seconds % 3600) / 60);
    return `${hours}h ${mins}m`;
  }

  async alertOnHighErrorRate(threshold = 5.0): Promise<void> {
    const metrics = await this.getServiceMetrics();
    
    for (const [serviceKey, stats] of metrics) {
      for (const endpoint of stats.endpoints) {
        if (endpoint.num_requests > 0) {
          const errorRate = (endpoint.num_errors / endpoint.num_requests) * 100;
          
          if (errorRate > threshold) {
            console.error(`🚨 HIGH ERROR RATE ALERT: ${serviceKey}/${endpoint.name}`);
            console.error(`   Error rate: ${errorRate.toFixed(2)}% (${endpoint.num_errors}/${endpoint.num_requests})`);
            console.error(`   Last error: ${endpoint.last_error || 'N/A'}`);
          }
        }
      }
    }
  }
}

// Usage
const nc = await connect();
const monitor = new ServiceMonitor(nc);

// Get current service inventory
const services = await monitor.getServiceInventory();
console.log(`Discovered ${services.length} services`);

// Start continuous monitoring
await monitor.monitorServiceHealth(15000); // Every 15 seconds

// Check for high error rates
setInterval(() => monitor.alertOnHighErrorRate(10.0), 60000); // Every minute, 10% threshold

Types

enum ServiceVerb {
  PING = "PING",
  STATS = "STATS", 
  INFO = "INFO",
  SCHEMA = "SCHEMA"
}

enum ServiceResponseType {
  Singleton = "io.nats.micro.v1.ping_response",
  Stream = "io.nats.micro.v1.stats_response"
}

interface RequestManyOptions {
  strategy: RequestStrategy;
  maxWait: number;
  headers?: MsgHdrs;
  maxMessages?: number;
  noMux?: boolean;
  jitter?: number;
}

interface ServiceMetadata {
  [key: string]: string;
}

interface ServiceGroup {
  addEndpoint(name: string, opts: EndpointOptions, handler: ServiceHandler): QueuedIterator<ServiceMsg>;
  addGroup(name: string): ServiceGroup;
}

interface EndpointOptions {
  subject?: string;
  queue_group?: string;
  metadata?: ServiceMetadata;
  schema?: {
    request?: Record<string, unknown>;
    response?: Record<string, unknown>;
  };
}

interface NamedEndpointStats {
  name: string;
  subject: string;
  num_requests: number;
  num_errors: number;
  last_error?: string;
  processing_time: number;
  queue_time: number;
  data?: Record<string, unknown>;
}

interface ServiceError extends NatsError {
  /** Service error code */
  service_error_code?: number;
  /** Service error message */
  service_error_message?: string;
}

const ServiceErrorHeader = "Nats-Service-Error";
const ServiceErrorCodeHeader = "Nats-Service-Error-Code";

interface QueuedIterator<T> {
  next(): Promise<IteratorResult<T>>;
  stop(): void;
  [Symbol.asyncIterator](): AsyncIterableIterator<T>;
}

docs

connection.md

index.md

jetstream.md

kv-store.md

messaging.md

object-store.md

services.md

tile.json