CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/npm-nats

Node.js client for NATS, a lightweight, high-performance cloud native messaging system

Pending
Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Security by Snyk

Pending

The risk profile of this skill

Overview
Eval results
Files

docs/messaging.md

Core Messaging

NATS core messaging provides publish/subscribe functionality with wildcard subscriptions, queue groups, request/reply patterns, and message headers for building distributed applications.

Capabilities

Publishing Messages

Send messages to subjects with optional payload and headers.

/**
 * Publish a message to a subject
 * @param subject - Target subject; must be a concrete subject (the `*` and `>` wildcards apply to subscriptions, not to publishing)
 * @param payload - Message data (string or Uint8Array); may be omitted
 * @param options - Optional publish configuration (reply subject and/or headers)
 */
publish(subject: string, payload?: Payload, options?: PublishOptions): void;

/**
 * Publish using message object with subject, data, headers, and reply
 * @param msg - Message object to publish; its fields supply what publish() otherwise takes as separate arguments
 */
publishMessage(msg: Msg): void;

/** Optional settings accepted by publish() and Msg.respond(); both fields may be omitted */
interface PublishOptions {
  /** Reply subject for response */
  reply?: string;
  /** Message headers */
  headers?: MsgHdrs;
}

/** Accepted message payload: text or raw bytes */
type Payload = string | Uint8Array;

Usage Examples:

import { connect, StringCodec, headers } from "nats";

const nc = await connect();
const sc = StringCodec();

// Simple publish
nc.publish("news.updates", sc.encode("Breaking news!"));

// Publish with reply subject
nc.publish("weather.request", sc.encode("NYC"), {
  reply: "weather.reply"
});

// Publish with headers
const hdrs = headers();
hdrs.set("content-type", "application/json");
hdrs.set("priority", "high");

// Sample payload for the example; any JSON-serializable value works here
const userData = { name: "Ada Lovelace", email: "ada@example.com" };

nc.publish("api.user.create", sc.encode(JSON.stringify(userData)), {
  headers: hdrs
});

// Publish binary data
const binaryData = new Uint8Array([1, 2, 3, 4, 5]);
nc.publish("binary.data", binaryData);

Subscribing to Messages

Create subscriptions to receive messages from subjects with wildcard support and queue groups.

/**
 * Subscribe to a subject and receive messages
 * @param subject - Subject pattern (supports wildcards * and >)
 * @param opts - Subscription options (queue group, message limit, timeout, callback)
 * @returns Subscription instance for managing the subscription; consume it with `for await` or supply `opts.callback`
 */
subscribe(subject: string, opts?: SubscriptionOptions): Subscription;

/** Options accepted by subscribe(); every field is optional */
interface SubscriptionOptions {
  /** Queue group name for load balancing */
  queue?: string;
  /** Maximum messages before auto-unsubscribe */
  max?: number;
  /** Timeout in milliseconds for first message */
  timeout?: number;
  /** Message callback function (alternative to async iteration) */
  callback?: (err: NatsError | null, msg: Msg) => void;
}

/** Handle returned by subscribe(): exposes lifecycle control, counters, and async iteration over received messages */
interface Subscription {
  /**
   * Unsubscribe from receiving messages
   * NOTE(review): presumably `max` defers the unsubscribe until that many messages have arrived — confirm
   */
  unsubscribe(max?: number): void;
  /** Drain subscription (process pending, then close) */
  drain(): Promise<void>;
  /** Check if subscription is closed */
  isClosed(): boolean;
  /** Get subscription subject */
  getSubject(): string;
  /** Get total messages received */
  getReceived(): number;
  /** Get messages processed by callback/iterator */
  getProcessed(): number;
  /** Get pending messages in queue */
  getPending(): number;
  /** Get subscription ID */
  getID(): number;
  /** Get max messages setting (undefined when no limit was set) */
  getMax(): number | undefined;

  /** Async iterator for processing messages (enables `for await (const m of sub)`) */
  [Symbol.asyncIterator](): AsyncIterableIterator<Msg>;
}

Usage Examples:

import { connect, StringCodec } from "nats";

const nc = await connect();
const codec = StringCodec();

// Consume a subscription through its async iterator
const newsSub = nc.subscribe("news.*");
const printNews = async (): Promise<void> => {
  for await (const msg of newsSub) {
    const body = codec.decode(msg.data);
    console.log(`Subject: ${msg.subject}, Data: ${body}`);
  }
};
printNews();

// Queue group subscription: load balancing under the "workers" group
const jobSub = nc.subscribe("work.jobs", { queue: "workers" });
const runJobs = async (): Promise<void> => {
  for await (const job of jobSub) {
    console.log(`Processing job: ${codec.decode(job.data)}`);
    // Stand-in for one second of real work
    await new Promise(done => setTimeout(done, 1000));
  }
};
runJobs();

// Callback-style delivery instead of async iteration
const callbackSub = nc.subscribe("alerts.*", {
  callback: (err, msg) => {
    if (!err) {
      console.log(`Alert: ${codec.decode(msg.data)}`);
    } else {
      console.error("Subscription error:", err);
    }
  }
});

// Auto-unsubscribe once 10 messages have been received
const limitedSub = nc.subscribe("limited.messages", { max: 10 });

// Wildcard subscriptions
// "*" matches a single token: news.sports, news.weather
const allNews = nc.subscribe("news.*");
// ">" on its own matches all subjects
const everything = nc.subscribe(">");
// "*" may sit in the middle: user.123.event, user.456.event
const userEvents = nc.subscribe("user.*.event");

Request/Reply Pattern

Promise-based request/reply messaging for RPC-style communication, with single-response (`request`) and multi-response (`requestMany`) variants.

/**
 * Send request and wait for single response
 * @param subject - Request subject
 * @param payload - Request data
 * @param opts - Request options including timeout
 * @returns Promise resolving to response message; callers should be prepared for it to reject (the usage example wraps the call in try/catch)
 */
request(subject: string, payload?: Payload, opts?: RequestOptions): Promise<Msg>;

/**
 * Send request expecting multiple responses
 * @param subject - Request subject
 * @param payload - Request data
 * @param opts - Request options including strategy and limits; typed as Partial, so every field may be omitted
 * @returns Promise resolving to async iterable of responses (consume with `for await`)
 */
requestMany(
  subject: string, 
  payload?: Payload, 
  opts?: Partial<RequestManyOptions>
): Promise<AsyncIterable<Msg>>;

/**
 * Respond to a message using its reply subject
 * @param msg - Original message to respond to
 * @returns True if response was sent
 *
 * NOTE(review): presumably returns false when `msg.reply` is not set — confirm
 */
respondMessage(msg: Msg): boolean;

/** Options for request(); `timeout` is the only required field */
interface RequestOptions {
  /** Request timeout in milliseconds */
  timeout: number;
  /** Request headers */
  headers?: MsgHdrs;
  /** Use dedicated subscription instead of shared mux */
  noMux?: boolean;
  /** Custom reply subject (requires noMux) */
  reply?: string;
}

/**
 * Options for requestMany(). `strategy` and `maxWait` are required here, but
 * requestMany() takes a Partial of this type, so callers may omit them.
 */
interface RequestManyOptions {
  /** Strategy for determining when to stop collecting responses */
  strategy: RequestStrategy;
  /** Maximum time to wait for responses (presumably milliseconds, as with RequestOptions.timeout — confirm) */
  maxWait: number;
  /** Request headers */
  headers?: MsgHdrs;
  /** Maximum number of responses to collect */
  maxMessages?: number;
  /** Use dedicated subscription instead of shared mux */
  noMux?: boolean;
  /** Jitter for timer-based strategies */
  jitter?: number;
}

/** Rules requestMany() can use to decide when to stop collecting responses */
enum RequestStrategy {
  /** Collection ends once the wait time elapses */
  Timer = "timer",

  /** Collection ends once the requested number of messages has arrived */
  Count = "count",

  /** Like Timer, with random jitter applied to the wait */
  JitterTimer = "jitterTimer",

  /** Collection ends when a sentinel message is received */
  SentinelMsg = "sentinelMsg",
}

Usage Examples:

import { connect, StringCodec, RequestStrategy } from "nats";

const nc = await connect();
const sc = StringCodec();

// Service responder
// Responders are registered first so that the request to "time" below has a subscriber to answer it
const serviceSub = nc.subscribe("time");
(async () => {
  for await (const m of serviceSub) {
    const currentTime = new Date().toISOString();
    m.respond(sc.encode(currentTime));
  }
})();

// Service with error handling
const calcSub = nc.subscribe("math.divide");
(async () => {
  for await (const m of calcSub) {
    try {
      const [a, b] = JSON.parse(sc.decode(m.data));
      if (b === 0) {
        m.respond(sc.encode(JSON.stringify({ error: "Division by zero" })));
      } else {
        m.respond(sc.encode(JSON.stringify({ result: a / b })));
      }
    } catch {
      // Payload could not be parsed or destructured; tell the requester instead of failing silently
      m.respond(sc.encode(JSON.stringify({ error: "Invalid input" })));
    }
  }
})();

// Simple request/reply
try {
  const response = await nc.request("time", sc.encode(""), { timeout: 1000 });
  console.log(`Current time: ${sc.decode(response.data)}`);
} catch (err) {
  console.error("Request failed:", err);
}

// Request with multiple responses
const responses = await nc.requestMany("services.ping", sc.encode(""), {
  maxWait: 2000,
  strategy: RequestStrategy.Timer
});

for await (const response of responses) {
  console.log(`Service: ${sc.decode(response.data)}`);
}

Message Handling

Message structure and utilities for processing received messages.

/** A message as delivered to a subscription or returned from a request */
interface Msg {
  /** Message subject */
  subject: string;
  /** Message data as bytes */
  data: Uint8Array;
  /** Reply subject if expecting response */
  reply?: string;
  /** Message headers if present */
  headers?: MsgHdrs;
  /**
   * Sequence ID for JetStream messages
   * NOTE(review): the published nats.js `Msg` interface appears to expose `sid: number`
   * (the subscription id) rather than `seq` — confirm against the library's API reference
   */
  seq?: number;
  /**
   * Respond to this message
   * @param data - Response payload
   * @param opts - Response options
   * @returns True if response sent successfully
   */
  respond(data?: Payload, opts?: PublishOptions): boolean;
}

/**
 * Message headers. Iterating yields `[key, values]` pairs, where `values` is
 * the list of strings stored under that key. Most accessors take an optional
 * `match` argument selecting how keys are compared (see Match).
 */
interface MsgHdrs extends Iterable<[string, string[]]> {
  /** True if message contains error status */
  hasError: boolean;
  /** HTTP-style status text */
  status: string;
  /** HTTP-style status code */
  code: number;
  /** Status description */
  description: string;

  /** Get header value */
  get(k: string, match?: Match): string;
  /** Set header value */
  set(k: string, v: string, match?: Match): void;
  /** Append header value */
  append(k: string, v: string, match?: Match): void;
  /** Check if header exists */
  has(k: string, match?: Match): boolean;
  /** Get all header keys */
  keys(): string[];
  /** Get all values for header */
  values(k: string, match?: Match): string[];
  /** Delete header */
  delete(k: string, match?: Match): void;
  /** Get last value for header */
  last(k: string, match?: Match): string;
}

/** Key-comparison modes accepted by the MsgHdrs accessors */
enum Match {
  /** Keys must match exactly, including case */
  Exact = 0,
  /** Keys are compared in canonical MIME header format */
  CanonicalMIME = 1,
  /** Keys are compared without regard to case */
  IgnoreCase = 2,
}

/**
 * Create empty message headers
 * @returns A new MsgHdrs to populate (e.g. with set()) and pass as the `headers` option when publishing
 */
function headers(): MsgHdrs;

/**
 * Create canonical MIME header key
 * NOTE(review): presumably maps e.g. "content-type" to "Content-Type" — confirm
 */
function canonicalMIMEHeaderKey(key: string): string;

Usage Examples:

import { connect, StringCodec, headers, JSONCodec } from "nats";

const nc = await connect();
const text = StringCodec();
const json = JSONCodec();

// Handle incoming requests, inspecting headers when they are present
const requests = nc.subscribe("api.requests");
const handleRequests = async (): Promise<void> => {
  for await (const msg of requests) {
    const incoming = msg.headers;
    if (incoming) {
      const contentType = incoming.get("content-type");
      const userId = incoming.get("user-id");
      console.log(`Content-Type: ${contentType}, User: ${userId}`);

      // Skip messages whose headers carry an error status
      if (incoming.hasError) {
        console.error(`Error: ${incoming.code} ${incoming.description}`);
        continue;
      }
    }

    // Decode and log the payload
    console.log(`Received: ${text.decode(msg.data)} on subject: ${msg.subject}`);

    // Only answer when the sender supplied a reply subject
    if (msg.reply) {
      msg.respond(json.encode({ status: "processed", timestamp: Date.now() }));
    }
  }
};
handleRequests();

// Build headers and attach them to an outgoing message
const outgoing = headers();
outgoing.set("content-type", "application/json");
outgoing.set("user-id", "12345");
outgoing.set("priority", "high");

const body = json.encode({ action: "create", data: {} });
nc.publish("api.requests", body, { headers: outgoing, reply: "api.responses" });

Codecs

Built-in codecs for encoding/decoding message payloads.

/** Converts values of type T to and from the Uint8Array form carried in message payloads */
interface Codec<T> {
  /** Encode a value into bytes suitable for use as a message payload */
  encode(d: T): Uint8Array;
  /** Decode bytes (e.g. `msg.data`) back into a value of type T */
  decode(a: Uint8Array): T;
}

/**
 * Create a string encoding/decoding codec
 * @returns Codec that converts between strings and Uint8Array payloads
 */
function StringCodec(): Codec<string>;

/**
 * JSON encoding/decoding codec with type safety
 * The type parameter T sets the type accepted by encode() and returned by decode(); it defaults to `unknown`.
 */
function JSONCodec<T = unknown>(): Codec<T>;

Usage Examples:

import { connect, StringCodec, JSONCodec } from "nats";

const nc = await connect();
const sc = StringCodec();
const jc = JSONCodec<{ message: string; timestamp: number }>();

// String codec
// Subscribe before publishing: core NATS only delivers a message to subscriptions that exist when it is published
const sub = nc.subscribe("text.message");
(async () => {
  for await (const m of sub) {
    const text = sc.decode(m.data);
    console.log(`Received text: ${text}`);
  }
})();

const message = "Hello NATS!";
nc.publish("text.message", sc.encode(message));

// JSON codec with types
const jsonSub = nc.subscribe("json.message");
(async () => {
  for await (const m of jsonSub) {
    const data = jc.decode(m.data); // Typed as { message: string; timestamp: number }
    console.log(`Message: ${data.message}, Time: ${data.timestamp}`);
  }
})();

const jsonData = { message: "Hello", timestamp: Date.now() };
nc.publish("json.message", jc.encode(jsonData));

Utilities

Helper functions for common messaging operations.

/**
 * Generate unique inbox subject for replies
 * @returns A subject string such as "_INBOX.abcd1234..." that can be used as a reply subject
 */
function createInbox(): string;

/** Empty payload constant; pass it as the payload when a message carries no data, e.g. `nc.publish("ping", Empty)` */
const Empty: Uint8Array;

/**
 * Wrap an async iterable so its items can be pulled one at a time by calling
 * next(), instead of being consumed with a `for await` loop.
 * The result of next() must still be awaited: an AsyncIterable cannot hand out
 * items synchronously.
 */
function syncIterator<T>(iterator: AsyncIterable<T>): SyncIterator<T>;

interface SyncIterator<T> {
  /** Resolves to the next item, or to null once the underlying iterable is done */
  next(): Promise<T | null>;
  /** NOTE(review): `stop()` does not appear in the published nats.js SyncIterator interface — confirm and remove if absent */
  stop(): void;
}

Usage Examples:

import { connect, createInbox, Empty, syncIterator } from "nats";

const nc = await connect();

// Generate unique reply subjects
const replySubject = createInbox();
console.log(replySubject); // "_INBOX.abcd1234..."

// Publish empty message
nc.publish("ping", Empty);

// Pull messages one at a time instead of using `for await`
const sub = nc.subscribe("data.stream", { max: 100 });
const iter = syncIterator(sub);

// Each next() call must be awaited; a null result means the subscription has finished
(async () => {
  for (let msg = await iter.next(); msg !== null; msg = await iter.next()) {
    console.log(`Got message: ${msg.subject}`);
  }
})();

Error Handling

/** Error type reported by the client; extends the built-in Error with a code and optional context fields */
class NatsError extends Error {
  name: string;
  message: string;
  /** Error code string — presumably one of the ErrorCode values; confirm */
  code: string;
  /** NOTE(review): looks like the underlying cause when this error wraps another — confirm */
  chainedError?: Error;
  /** NOTE(review): presumably the JetStream API error details also exposed via jsError() — confirm */
  api_error?: ApiError;
  /** Operation, subject, and optional queue tied to the error — presumably populated for permission errors; confirm */
  permissionContext?: { operation: string; subject: string; queue?: string };

  /** Check if error is authentication related */
  isAuthError(): boolean;
  /** Check if error is permission related */
  isPermissionError(): boolean;
  /** Check if error is protocol related */
  isProtocolError(): boolean;
  /** Check if error is authentication timeout */
  isAuthTimeout(): boolean;
  /** Check if error is JetStream related */
  isJetStreamError(): boolean;
  /** Get JetStream API error details (null when there are none) */
  jsError(): ApiError | null;
}

/** JetStream API error details, as returned by NatsError.jsError() */
interface ApiError {
  /** HTTP-style error code */
  code: number;
  /** Human-readable description */
  description: string;
  /** NATS-specific error code */
  err_code?: number;
}

/** String codes identifying client errors, grouped by category */
enum ErrorCode {
  // --- Connection ---
  ConnectionClosed = "CONNECTION_CLOSED",
  ConnectionTimeout = "CONNECTION_TIMEOUT",
  ConnectionRefused = "CONNECTION_REFUSED",

  // --- Authentication ---
  BadAuthentication = "BAD_AUTHENTICATION",
  AuthorizationViolation = "AUTHORIZATION_VIOLATION",
  PermissionsViolation = "PERMISSIONS_VIOLATION",

  // --- Protocol ---
  BadSubject = "BAD_SUBJECT",
  BadPayload = "BAD_PAYLOAD",
  MaxPayloadExceeded = "MAX_PAYLOAD_EXCEEDED",

  // --- Request ---
  // Note: this code is the numeric status "503" as a string, unlike the other members
  NoResponders = "503",
  Timeout = "TIMEOUT",
  RequestError = "REQUEST_ERROR",
}

/**
 * Check if error is NatsError instance
 * Declared as a type guard: when it returns true, `err` is narrowed to NatsError.
 */
function isNatsError(err: NatsError | Error): err is NatsError;

docs

connection.md

index.md

jetstream.md

kv-store.md

messaging.md

object-store.md

services.md

tile.json