Node.js client for NATS, a lightweight, high-performance cloud native messaging system
—
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Pending
The risk profile of this skill
NATS Services API provides a framework for building discoverable microservices with built-in health monitoring, statistics collection, and service registry capabilities.
Register services with NATS for discovery and monitoring.
/**
* Access to Services API from NATS connection
*/
services: ServicesAPI;
interface ServicesAPI {
/**
* Register a new service with NATS
* @param config - Service configuration including name, version, and endpoints
* @returns Promise resolving to Service instance
*/
add(config: ServiceConfig): Promise<Service>;
/**
* Create a service client for discovery and monitoring
* @param opts - Request options for service operations
* @param prefix - Custom service prefix (default: "$SRV")
* @returns ServiceClient instance
*/
client(opts?: RequestManyOptions, prefix?: string): ServiceClient;
}
interface ServiceConfig {
/** Service name (required) */
name: string;
/** Service version (required) */
version: string;
/** Service description */
description?: string;
/** Service metadata */
metadata?: Record<string, string>;
/** Service endpoints */
queue?: string;
/** Schema for service definition */
schema?: ServiceSchema;
}
interface ServiceSchema {
/** Request schema */
request?: Record<string, unknown>;
/** Response schema */
response?: Record<string, unknown>;
}
interface Service {
/** Stop the service */
stop(): Promise<void>;
/** Check if service is stopped */
stopped: boolean;
/** Service information */
info: ServiceInfo;
/** Service statistics */
stats(): ServiceStats;
}Usage Examples:
import { connect } from "nats";
const nc = await connect();
// Create a simple service
const service = await nc.services.add({
name: "calculator",
version: "1.0.0",
description: "Simple math calculator service"
});
console.log(`Service started: ${service.info.name} v${service.info.version}`);
// Create service with metadata
const userService = await nc.services.add({
name: "user-manager",
version: "2.1.0",
description: "User management service",
metadata: {
region: "us-west-2",
environment: "production",
team: "backend"
}
});
// Stop service gracefully
await userService.stop();Define service endpoints that handle specific request types.
interface ServiceGroup {
/** Add endpoint to service group */
addEndpoint(
name: string,
opts: ServiceEndpointOpts,
handler: ServiceHandler
): QueuedIterator<ServiceMsg>;
/** Add multiple endpoints */
addGroup(name: string): ServiceGroup;
}
interface ServiceEndpointOpts {
/** Endpoint subject pattern */
subject?: string;
/** Endpoint queue group */
queue_group?: string;
/** Endpoint metadata */
metadata?: Record<string, string>;
/** Endpoint schema */
schema?: {
request?: Record<string, unknown>;
response?: Record<string, unknown>;
};
}
type ServiceHandler = (
err: NatsError | null,
msg: ServiceMsg
) => Promise<void> | void;
interface ServiceMsg extends Msg {
/** Respond to service message */
respond(data?: Payload, opts?: ServiceMsgResponse): boolean;
/** Respond with error */
respondError(code: number, description: string, data?: Payload): boolean;
}
interface ServiceMsgResponse {
/** Response headers */
headers?: MsgHdrs;
}Usage Examples:
import { connect, StringCodec, JSONCodec, headers } from "nats";
const nc = await connect();
const sc = StringCodec();
const jc = JSONCodec();
// Create calculator service with endpoints
const calc = await nc.services.add({
name: "calculator",
version: "1.0.0",
description: "Math calculator service"
});
// Add endpoint for addition
const addGroup = calc.addGroup("math");
const addEndpoint = addGroup.addEndpoint(
"add",
{
subject: "calc.add",
metadata: { operation: "addition" }
},
(err, msg) => {
if (err) {
console.error("Handler error:", err);
return;
}
try {
const { a, b } = jc.decode(msg.data);
const result = { sum: a + b };
msg.respond(jc.encode(result));
} catch (error) {
msg.respondError(400, "Invalid input format", sc.encode("Expected {a: number, b: number}"));
}
}
);
// Add endpoint for division with error handling
const divEndpoint = addGroup.addEndpoint(
"divide",
{ subject: "calc.divide" },
(err, msg) => {
if (err) return;
try {
const { dividend, divisor } = jc.decode(msg.data);
if (divisor === 0) {
msg.respondError(422, "Division by zero", sc.encode("Divisor cannot be zero"));
return;
}
const result = { quotient: dividend / divisor };
msg.respond(jc.encode(result));
} catch (error) {
msg.respondError(400, "Invalid input", sc.encode(error.message));
}
}
);
// User management service with authentication
const userSvc = await nc.services.add({
name: "user-service",
version: "1.2.0"
});
const usersGroup = userSvc.addGroup("users");
const getUserEndpoint = usersGroup.addEndpoint(
"get",
{ subject: "users.get" },
async (err, msg) => {
if (err) return;
// Check authentication header
const token = msg.headers?.get("authorization");
if (!token) {
msg.respondError(401, "Unauthorized", sc.encode("Missing authorization header"));
return;
}
try {
const { userId } = jc.decode(msg.data);
const user = await getUserFromDatabase(userId); // Your DB call
if (!user) {
msg.respondError(404, "User not found", sc.encode(`User ${userId} not found`));
return;
}
// Add response headers
const responseHeaders = headers();
responseHeaders.set("content-type", "application/json");
responseHeaders.set("cache-control", "max-age=300");
msg.respond(jc.encode(user), { headers: responseHeaders });
} catch (error) {
msg.respondError(500, "Internal server error", sc.encode(error.message));
}
}
);Discover and interact with registered services.
interface ServiceClient {
/**
* Ping services to check availability
* @param name - Optional service name filter
* @param id - Optional service ID filter
* @returns Promise resolving to async iterator of service identities
*/
ping(name?: string, id?: string): Promise<QueuedIterator<ServiceIdentity>>;
/**
* Get statistics from services
* @param name - Optional service name filter
* @param id - Optional service ID filter
* @returns Promise resolving to async iterator of service statistics
*/
stats(name?: string, id?: string): Promise<QueuedIterator<ServiceStats>>;
/**
* Get information from services
* @param name - Optional service name filter
* @param id - Optional service ID filter
* @returns Promise resolving to async iterator of service information
*/
info(name?: string, id?: string): Promise<QueuedIterator<ServiceInfo>>;
}
interface ServiceIdentity {
/** Service name */
name: string;
/** Service ID */
id: string;
/** Service version */
version: string;
/** Service metadata */
metadata?: Record<string, string>;
}
interface ServiceInfo {
/** Service name */
name: string;
/** Service ID */
id: string;
/** Service version */
version: string;
/** Service description */
description?: string;
/** Service metadata */
metadata?: Record<string, string>;
/** Service endpoints */
endpoints: EndpointInfo[];
/** Service type */
type: string;
}
interface ServiceStats {
/** Service name */
name: string;
/** Service ID */
id: string;
/** Service version */
version: string;
/** Service start time */
started: Date;
/** Service endpoints statistics */
endpoints: EndpointStats[];
}
interface EndpointInfo {
/** Endpoint name */
name: string;
/** Endpoint subject */
subject: string;
/** Endpoint queue group */
queue_group?: string;
/** Endpoint metadata */
metadata?: Record<string, string>;
}
interface EndpointStats {
/** Endpoint name */
name: string;
/** Endpoint subject */
subject: string;
/** Number of requests received */
num_requests: number;
/** Number of errors */
num_errors: number;
/** Last error message */
last_error?: string;
/** Average processing time */
processing_time: number;
/** Average queue time */
queue_time: number;
/** Additional endpoint data */
data?: Record<string, unknown>;
}Usage Examples:
import { connect } from "nats";
const nc = await connect();
const client = nc.services.client();
// Discover all services
console.log("Discovering services...");
const services = await client.ping();
for await (const service of services) {
console.log(`Found: ${service.name} v${service.version} (${service.id})`);
}
// Get information about specific service
const calcInfo = await client.info("calculator");
for await (const info of calcInfo) {
console.log(`Service: ${info.name} - ${info.description}`);
console.log("Endpoints:");
for (const endpoint of info.endpoints) {
console.log(` - ${endpoint.name}: ${endpoint.subject}`);
}
}
// Monitor service statistics
const statsMonitor = await client.stats("user-service");
for await (const stats of statsMonitor) {
console.log(`Service: ${stats.name}, Started: ${stats.started}`);
for (const endpoint of stats.endpoints) {
console.log(` ${endpoint.name}: ${endpoint.num_requests} requests, ${endpoint.num_errors} errors`);
if (endpoint.processing_time > 0) {
console.log(` Avg processing: ${endpoint.processing_time}ms`);
}
}
}
// Health check for specific services
async function checkServiceHealth() {
const pingResults = await client.ping();
const healthyServices: string[] = [];
for await (const service of pingResults) {
healthyServices.push(`${service.name}@${service.version}`);
}
console.log(`Healthy services: ${healthyServices.join(", ")}`);
return healthyServices;
}
// Run health check every 30 seconds
setInterval(checkServiceHealth, 30000);Communicate with discovered services using standard NATS messaging.
// Service client wrapper for easy communication
class ServiceProxy {
constructor(private nc: NatsConnection, private serviceName: string) {}
async callEndpoint<T, R>(
endpoint: string,
data: T,
timeout = 5000
): Promise<R> {
const subject = `${this.serviceName}.${endpoint}`;
const response = await this.nc.request(
subject,
JSONCodec<T>().encode(data),
{ timeout }
);
// Check for service error response
if (response.headers?.hasError) {
throw new Error(`Service error ${response.headers.code}: ${response.headers.description}`);
}
return JSONCodec<R>().decode(response.data);
}
async callWithAuth<T, R>(
endpoint: string,
data: T,
token: string,
timeout = 5000
): Promise<R> {
const subject = `${this.serviceName}.${endpoint}`;
const hdrs = headers();
hdrs.set("authorization", `Bearer ${token}`);
const response = await this.nc.request(
subject,
JSONCodec<T>().encode(data),
{ timeout, headers: hdrs }
);
if (response.headers?.hasError) {
throw new Error(`Service error ${response.headers.code}: ${response.headers.description}`);
}
return JSONCodec<R>().decode(response.data);
}
}
// Usage examples
const nc = await connect();
// Calculator service client
const calc = new ServiceProxy(nc, "calc");
try {
const sum = await calc.callEndpoint<{a: number, b: number}, {sum: number}>(
"add",
{ a: 10, b: 20 }
);
console.log(`10 + 20 = ${sum.sum}`);
const quotient = await calc.callEndpoint<{dividend: number, divisor: number}, {quotient: number}>(
"divide",
{ dividend: 100, divisor: 5 }
);
console.log(`100 / 5 = ${quotient.quotient}`);
} catch (err) {
console.error("Calculator service error:", err.message);
}
// User service client with authentication
const users = new ServiceProxy(nc, "users");
try {
const user = await users.callWithAuth<{userId: string}, {id: string, name: string, email: string}>(
"get",
{ userId: "user123" },
"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..." // JWT token
);
console.log(`User: ${user.name} (${user.email})`);
} catch (err) {
console.error("User service error:", err.message);
}Monitor service health and performance metrics.
// Service monitoring dashboard
class ServiceMonitor {
private client: ServiceClient;
constructor(nc: NatsConnection) {
this.client = nc.services.client();
}
async getServiceInventory(): Promise<ServiceInfo[]> {
const services: ServiceInfo[] = [];
const serviceInfo = await this.client.info();
for await (const info of serviceInfo) {
services.push(info);
}
return services;
}
async getServiceMetrics(): Promise<Map<string, ServiceStats>> {
const metrics = new Map<string, ServiceStats>();
const stats = await this.client.stats();
for await (const stat of stats) {
metrics.set(`${stat.name}@${stat.version}`, stat);
}
return metrics;
}
async monitorServiceHealth(intervalMs = 10000): Promise<void> {
console.log("Starting service health monitoring...");
setInterval(async () => {
try {
const services = await this.getServiceInventory();
const metrics = await this.getServiceMetrics();
console.log(`\n=== Service Health Report (${new Date().toISOString()}) ===`);
for (const service of services) {
const key = `${service.name}@${service.version}`;
const stats = metrics.get(key);
console.log(`\n${service.name} v${service.version} (${service.id})`);
console.log(` Description: ${service.description || 'N/A'}`);
console.log(` Endpoints: ${service.endpoints.length}`);
if (stats) {
console.log(` Uptime: ${this.formatUptime(stats.started)}`);
let totalRequests = 0;
let totalErrors = 0;
for (const endpoint of stats.endpoints) {
totalRequests += endpoint.num_requests;
totalErrors += endpoint.num_errors;
const errorRate = endpoint.num_requests > 0
? (endpoint.num_errors / endpoint.num_requests * 100).toFixed(2)
: "0.00";
console.log(` ${endpoint.name}: ${endpoint.num_requests} reqs, ${errorRate}% errors, ${endpoint.processing_time}ms avg`);
}
const overallErrorRate = totalRequests > 0
? (totalErrors / totalRequests * 100).toFixed(2)
: "0.00";
console.log(` Overall: ${totalRequests} requests, ${overallErrorRate}% error rate`);
} else {
console.log(` Status: No statistics available`);
}
}
} catch (err) {
console.error("Health monitoring error:", err);
}
}, intervalMs);
}
private formatUptime(started: Date): string {
const uptimeMs = Date.now() - started.getTime();
const seconds = Math.floor(uptimeMs / 1000);
if (seconds < 60) return `${seconds}s`;
if (seconds < 3600) return `${Math.floor(seconds / 60)}m ${seconds % 60}s`;
const hours = Math.floor(seconds / 3600);
const mins = Math.floor((seconds % 3600) / 60);
return `${hours}h ${mins}m`;
}
async alertOnHighErrorRate(threshold = 5.0): Promise<void> {
const metrics = await this.getServiceMetrics();
for (const [serviceKey, stats] of metrics) {
for (const endpoint of stats.endpoints) {
if (endpoint.num_requests > 0) {
const errorRate = (endpoint.num_errors / endpoint.num_requests) * 100;
if (errorRate > threshold) {
console.error(`🚨 HIGH ERROR RATE ALERT: ${serviceKey}/${endpoint.name}`);
console.error(` Error rate: ${errorRate.toFixed(2)}% (${endpoint.num_errors}/${endpoint.num_requests})`);
console.error(` Last error: ${endpoint.last_error || 'N/A'}`);
}
}
}
}
}
}
// Usage
const nc = await connect();
const monitor = new ServiceMonitor(nc);
// Get current service inventory
const services = await monitor.getServiceInventory();
console.log(`Discovered ${services.length} services`);
// Start continuous monitoring
await monitor.monitorServiceHealth(15000); // Every 15 seconds
// Check for high error rates
setInterval(() => monitor.alertOnHighErrorRate(10.0), 60000); // Every minute, 10% thresholdenum ServiceVerb {
PING = "PING",
STATS = "STATS",
INFO = "INFO",
SCHEMA = "SCHEMA"
}
enum ServiceResponseType {
Singleton = "io.nats.micro.v1.ping_response",
Stream = "io.nats.micro.v1.stats_response"
}
interface RequestManyOptions {
strategy: RequestStrategy;
maxWait: number;
headers?: MsgHdrs;
maxMessages?: number;
noMux?: boolean;
jitter?: number;
}
interface ServiceMetadata {
[key: string]: string;
}
interface ServiceGroup {
addEndpoint(name: string, opts: EndpointOptions, handler: ServiceHandler): QueuedIterator<ServiceMsg>;
addGroup(name: string): ServiceGroup;
}
interface EndpointOptions {
subject?: string;
queue_group?: string;
metadata?: ServiceMetadata;
schema?: {
request?: Record<string, unknown>;
response?: Record<string, unknown>;
};
}
interface NamedEndpointStats {
name: string;
subject: string;
num_requests: number;
num_errors: number;
last_error?: string;
processing_time: number;
queue_time: number;
data?: Record<string, unknown>;
}
interface ServiceError extends NatsError {
/** Service error code */
service_error_code?: number;
/** Service error message */
service_error_message?: string;
}
const ServiceErrorHeader = "Nats-Service-Error";
const ServiceErrorCodeHeader = "Nats-Service-Error-Code";
interface QueuedIterator<T> {
next(): Promise<IteratorResult<T>>;
stop(): void;
[Symbol.asyncIterator](): AsyncIterableIterator<T>;
}