Node.js client for NATS, a lightweight, high-performance cloud native messaging system
—
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Pending
The risk profile of this skill
NATS core messaging provides publish/subscribe functionality with wildcards, queues, request/reply patterns, and message headers for building distributed applications.
Send messages to subjects with optional payload and headers.
/**
* Publish a message to a subject
* @param subject - Target subject; use a concrete subject here (the `*` and `>` wildcards apply to subscriptions, not to publishing)
* @param payload - Message data (string or Uint8Array)
* @param options - Optional publish configuration (reply subject and/or headers)
*/
publish(subject: string, payload?: Payload, options?: PublishOptions): void;
/**
* Publish using message object with subject, data, headers, and reply
* @param msg - Message object to publish
*/
publishMessage(msg: Msg): void;
interface PublishOptions {
/** Reply subject for response */
reply?: string;
/** Message headers */
headers?: MsgHdrs;
}
type Payload = Uint8Array | string;

Usage Examples:
import { connect, StringCodec, headers } from "nats";
const nc = await connect();
const sc = StringCodec();
// Simple publish
nc.publish("news.updates", sc.encode("Breaking news!"));
// Publish with reply subject
nc.publish("weather.request", sc.encode("NYC"), {
reply: "weather.reply"
});
// Publish with headers
// Example payload; declared here so the snippet is self-contained
const userData = { name: "Ada", email: "ada@example.com" };
const hdrs = headers();
hdrs.set("content-type", "application/json");
hdrs.set("priority", "high");
nc.publish("api.user.create", sc.encode(JSON.stringify(userData)), {
  headers: hdrs
});
// Publish binary data
const binaryData = new Uint8Array([1, 2, 3, 4, 5]);
nc.publish("binary.data", binaryData);

Create subscriptions to receive messages from subjects with wildcard support and queue groups.
/**
* Subscribe to a subject and receive messages
* @param subject - Subject pattern (supports wildcards * and >)
* @param opts - Subscription options
* @returns Subscription instance for managing the subscription
*/
subscribe(subject: string, opts?: SubscriptionOptions): Subscription;
interface SubscriptionOptions {
/** Queue group name for load balancing */
queue?: string;
/** Maximum messages before auto-unsubscribe */
max?: number;
/** Timeout in milliseconds for first message */
timeout?: number;
/** Message callback function (alternative to async iteration) */
callback?: (err: NatsError | null, msg: Msg) => void;
}
interface Subscription {
/** Unsubscribe from receiving messages */
unsubscribe(max?: number): void;
/** Drain subscription (process pending, then close) */
drain(): Promise<void>;
/** Check if subscription is closed */
isClosed(): boolean;
/** Get subscription subject */
getSubject(): string;
/** Get total messages received */
getReceived(): number;
/** Get messages processed by callback/iterator */
getProcessed(): number;
/** Get pending messages in queue */
getPending(): number;
/** Get subscription ID */
getID(): number;
/** Get max messages setting */
getMax(): number | undefined;
/** Async iterator for processing messages */
[Symbol.asyncIterator](): AsyncIterableIterator<Msg>;
}

Usage Examples:
import { connect, StringCodec } from "nats";
const nc = await connect();
const sc = StringCodec();
// Basic subscription with async iteration
const sub = nc.subscribe("news.*");
(async () => {
for await (const m of sub) {
console.log(`Subject: ${m.subject}, Data: ${sc.decode(m.data)}`);
}
})();
// Queue subscription for load balancing
const queueSub = nc.subscribe("work.jobs", { queue: "workers" });
(async () => {
for await (const m of queueSub) {
console.log(`Processing job: ${sc.decode(m.data)}`);
// Simulate work
await new Promise(resolve => setTimeout(resolve, 1000));
}
})();
// Subscription with callback
const callbackSub = nc.subscribe("alerts.*", {
callback: (err, msg) => {
if (err) {
console.error("Subscription error:", err);
return;
}
console.log(`Alert: ${sc.decode(msg.data)}`);
}
});
// Limited subscription (auto-unsubscribe after 10 messages)
const limitedSub = nc.subscribe("limited.messages", { max: 10 });
// Wildcard subscriptions
const allNews = nc.subscribe("news.*"); // news.sports, news.weather
const everything = nc.subscribe(">"); // all subjects
const userEvents = nc.subscribe("user.*.event"); // user.123.event, user.456.event

Synchronous and asynchronous request/reply messaging for RPC-style communication.
/**
* Send request and wait for single response
* @param subject - Request subject
* @param payload - Request data
* @param opts - Request options including timeout
* @returns Promise resolving to response message
*/
request(subject: string, payload?: Payload, opts?: RequestOptions): Promise<Msg>;
/**
* Send request expecting multiple responses
* @param subject - Request subject
* @param payload - Request data
* @param opts - Request options including strategy and limits
* @returns Promise resolving to async iterable of responses
*/
requestMany(
subject: string,
payload?: Payload,
opts?: Partial<RequestManyOptions>
): Promise<AsyncIterable<Msg>>;
/**
* Respond to a message using its reply subject
* @param msg - Original message to respond to
* @returns True if response was sent
*/
respondMessage(msg: Msg): boolean;
interface RequestOptions {
/** Request timeout in milliseconds */
timeout: number;
/** Request headers */
headers?: MsgHdrs;
/** Use dedicated subscription instead of shared mux */
noMux?: boolean;
/** Custom reply subject (requires noMux) */
reply?: string;
}
interface RequestManyOptions {
/** Strategy for determining when to stop collecting responses */
strategy: RequestStrategy;
/** Maximum time to wait for responses */
maxWait: number;
/** Request headers */
headers?: MsgHdrs;
/** Maximum number of responses to collect */
maxMessages?: number;
/** Use dedicated subscription instead of shared mux */
noMux?: boolean;
/** Jitter for timer-based strategies */
jitter?: number;
}
enum RequestStrategy {
/** Stop after specified time */
Timer = "timer",
/** Stop after specified message count */
Count = "count",
/** Stop after time with random jitter */
JitterTimer = "jitterTimer",
/** Stop when sentinel message received */
SentinelMsg = "sentinelMsg"
}

Usage Examples:
import { connect, StringCodec, RequestStrategy } from "nats";
const nc = await connect();
const sc = StringCodec();
// Simple request/reply
// A responder must already be subscribed to "time" (see the service responder
// snippet further down); without one the request is expected to fail and the
// catch branch runs instead.
try {
const response = await nc.request("time", sc.encode(""), { timeout: 1000 });
console.log(`Current time: ${sc.decode(response.data)}`);
} catch (err) {
console.error("Request failed:", err);
}
// Request with multiple responses
const responses = await nc.requestMany("services.ping", sc.encode(""), {
maxWait: 2000,
strategy: RequestStrategy.Timer
});
for await (const response of responses) {
console.log(`Service: ${sc.decode(response.data)}`);
}
// Service responder
const serviceSub = nc.subscribe("time");
(async () => {
for await (const m of serviceSub) {
const currentTime = new Date().toISOString();
m.respond(sc.encode(currentTime));
}
})();
// Service with error handling
const calcSub = nc.subscribe("math.divide");
(async () => {
for await (const m of calcSub) {
try {
const [a, b] = JSON.parse(sc.decode(m.data));
if (b === 0) {
m.respond(sc.encode(JSON.stringify({ error: "Division by zero" })));
} else {
m.respond(sc.encode(JSON.stringify({ result: a / b })));
}
} catch (err) {
m.respond(sc.encode(JSON.stringify({ error: "Invalid input" })));
}
}
})();

Message structure and utilities for processing received messages.
/** A message delivered to a subscription or returned from a request */
interface Msg {
/** Message subject */
subject: string;
/** Message data as bytes */
data: Uint8Array;
/** Reply subject if expecting response */
reply?: string;
/** Message headers if present */
headers?: MsgHdrs;
/**
* Sequence ID for JetStream messages
* NOTE(review): the upstream core message type appears to expose `sid`
* (subscription id) rather than `seq` — confirm against the package typings.
*/
seq?: number;
/**
* Respond to this message
* @param data - Response payload
* @param opts - Response options
* @returns True if response sent successfully
*/
respond(data?: Payload, opts?: PublishOptions): boolean;
}
interface MsgHdrs extends Iterable<[string, string[]]> {
/** True if message contains error status */
hasError: boolean;
/** HTTP-style status text */
status: string;
/** HTTP-style status code */
code: number;
/** Status description */
description: string;
/** Get header value */
get(k: string, match?: Match): string;
/** Set header value */
set(k: string, v: string, match?: Match): void;
/** Append header value */
append(k: string, v: string, match?: Match): void;
/** Check if header exists */
has(k: string, match?: Match): boolean;
/** Get all header keys */
keys(): string[];
/** Get all values for header */
values(k: string, match?: Match): string[];
/** Delete header */
delete(k: string, match?: Match): void;
/** Get last value for header */
last(k: string, match?: Match): string;
}
enum Match {
/** Exact case-sensitive match */
Exact = 0,
/** Canonical MIME header format */
CanonicalMIME,
/** Case-insensitive match */
IgnoreCase
}
/** Create empty message headers */
function headers(): MsgHdrs;
/** Create canonical MIME header key */
function canonicalMIMEHeaderKey(key: string): string;

Usage Examples:
import { connect, StringCodec, headers, JSONCodec } from "nats";
const nc = await connect();
const sc = StringCodec();
const jc = JSONCodec();
// Process messages with headers
const sub = nc.subscribe("api.requests");
(async () => {
for await (const m of sub) {
// Check for headers
if (m.headers) {
const contentType = m.headers.get("content-type");
const userId = m.headers.get("user-id");
console.log(`Content-Type: ${contentType}, User: ${userId}`);
// Check for error status
if (m.headers.hasError) {
console.error(`Error: ${m.headers.code} ${m.headers.description}`);
continue;
}
}
// Process message data
const data = sc.decode(m.data);
console.log(`Received: ${data} on subject: ${m.subject}`);
// Respond if reply subject provided
if (m.reply) {
const response = { status: "processed", timestamp: Date.now() };
m.respond(jc.encode(response));
}
}
})();
// Send message with headers
const hdrs = headers();
hdrs.set("content-type", "application/json");
hdrs.set("user-id", "12345");
hdrs.set("priority", "high");
nc.publish("api.requests", jc.encode({ action: "create", data: {} }), {
headers: hdrs,
reply: "api.responses"
});

Built-in codecs for encoding/decoding message payloads.
interface Codec<T> {
encode(d: T): Uint8Array;
decode(a: Uint8Array): T;
}
/**
* Create a codec that converts between strings and their Uint8Array form.
* Declared as a factory function because every usage calls it: `const sc = StringCodec();`
*/
function StringCodec(): Codec<string>;
/** JSON encoding/decoding codec with type safety */
function JSONCodec<T = unknown>(): Codec<T>;

Usage Examples:
import { connect, StringCodec, JSONCodec } from "nats";
const nc = await connect();
const sc = StringCodec();
const jc = JSONCodec<{ message: string; timestamp: number }>();
// String codec
// Subscribe before publishing: core NATS does not store messages, so anything
// published while no subscription exists is dropped.
const sub = nc.subscribe("text.message");
const message = "Hello NATS!";
nc.publish("text.message", sc.encode(message));
(async () => {
for await (const m of sub) {
const text = sc.decode(m.data);
console.log(`Received text: ${text}`);
}
})();
// JSON codec with types
// Subscribe before publishing so the message below has a subscriber to be delivered to.
const jsonSub = nc.subscribe("json.message");
const jsonData = { message: "Hello", timestamp: Date.now() };
nc.publish("json.message", jc.encode(jsonData));
(async () => {
for await (const m of jsonSub) {
const data = jc.decode(m.data); // Typed as { message: string; timestamp: number }
console.log(`Message: ${data.message}, Time: ${data.timestamp}`);
}
})();

Helper functions for common messaging operations.
/** Generate unique inbox subject for replies */
function createInbox(): string;
/** Empty payload constant */
const Empty: Uint8Array;
/**
* Wrap an async iterable so values can be pulled one at a time by calling next()
* instead of using a for-await loop.
* @param iterator - Async iterable to pull from (e.g. a Subscription)
* @returns Pull-style wrapper around the iterable
*/
function syncIterator<T>(iterator: AsyncIterable<T>): SyncIterator<T>;
interface SyncIterator<T> {
  /** Resolves to the next value, or null once the underlying iterable has finished */
  next(): Promise<T | null>;
  /** NOTE(review): not found in the upstream SyncIterator declaration — confirm before relying on it */
  stop(): void;
}

Usage Examples:
import { connect, createInbox, Empty, syncIterator } from "nats";
const nc = await connect();
// Generate unique reply subjects
const replySubject = createInbox();
console.log(replySubject); // "_INBOX.abcd1234..."
// Publish empty message
nc.publish("ping", Empty);
// Pull messages one at a time without a for-await loop
const sub = nc.subscribe("data.stream", { max: 100 });
const iter = syncIterator(sub);
// next() returns a Promise, so it has to be awaited; it resolves to null once
// the subscription is finished, which ends the loop.
(async () => {
  for (let msg = await iter.next(); msg !== null; msg = await iter.next()) {
    console.log(`Got message: ${msg.subject}`);
  }
})();

class NatsError extends Error {
name: string;
message: string;
code: string;
chainedError?: Error;
api_error?: ApiError;
permissionContext?: { operation: string; subject: string; queue?: string };
/** Check if error is authentication related */
isAuthError(): boolean;
/** Check if error is permission related */
isPermissionError(): boolean;
/** Check if error is protocol related */
isProtocolError(): boolean;
/** Check if error is authentication timeout */
isAuthTimeout(): boolean;
/** Check if error is JetStream related */
isJetStreamError(): boolean;
/** Get JetStream API error details */
jsError(): ApiError | null;
}
interface ApiError {
/** HTTP-style error code */
code: number;
/** Human-readable description */
description: string;
/** NATS-specific error code */
err_code?: number;
}
enum ErrorCode {
// Connection errors
ConnectionClosed = "CONNECTION_CLOSED",
ConnectionTimeout = "CONNECTION_TIMEOUT",
ConnectionRefused = "CONNECTION_REFUSED",
// Authentication errors
BadAuthentication = "BAD_AUTHENTICATION",
AuthorizationViolation = "AUTHORIZATION_VIOLATION",
PermissionsViolation = "PERMISSIONS_VIOLATION",
// Protocol errors
BadSubject = "BAD_SUBJECT",
BadPayload = "BAD_PAYLOAD",
MaxPayloadExceeded = "MAX_PAYLOAD_EXCEEDED",
// Request errors
NoResponders = "503",
Timeout = "TIMEOUT",
RequestError = "REQUEST_ERROR"
}
/** Check if error is NatsError instance */
function isNatsError(err: NatsError | Error): err is NatsError;