Node.js client for NATS, a lightweight, high-performance cloud native messaging system
npx @tessl/cli install tessl/npm-nats@2.29.0

NATS is a high-performance, lightweight cloud-native messaging system for distributed applications and microservices. The Node.js client provides full TypeScript support with async/await patterns, automatic reconnection handling, and comprehensive features including core pub/sub messaging, JetStream for persistence and streaming, Key-Value stores, Object stores, and Service discovery APIs.
npm install nats

import { connect, StringCodec, JSONCodec } from "nats";

For CommonJS:
const { connect, StringCodec, JSONCodec } = require("nats");

import { connect, StringCodec } from "nats";
// Connect to NATS server
const nc = await connect({ servers: ["nats://localhost:4222"] });
// Codec used below to turn strings into message payloads and back
const sc = StringCodec();
// Simple publish/subscribe
const sub = nc.subscribe("news.updates");
// The async function is invoked immediately and not awaited, so the
// statements after it run while the loop keeps consuming the subscription.
(async () => {
for await (const m of sub) {
console.log(`Received: ${sc.decode(m.data)}`);
}
})();
// Publish a message
nc.publish("news.updates", sc.encode("Hello NATS!"));
// Request/reply pattern
// Sends an empty payload on "time" and waits for the reply message.
const response = await nc.request("time", sc.encode(""));
console.log(`Time: ${sc.decode(response.data)}`);
// Clean shutdown
await nc.drain();

NATS is built around several key components:
- NatsConnection interface providing pub/sub messaging with wildcards, queues, and request/reply patterns

Core NATS connection functionality including authentication, TLS, reconnection handling, and connection lifecycle management.
/**
 * Opens a connection and resolves to a `NatsConnection`.
 *
 * @param opts - Optional connection settings; may be omitted entirely.
 * @returns A promise for the connection object.
 */
function connect(opts?: ConnectionOptions): Promise<NatsConnection>;
interface ConnectionOptions {
servers?: string[] | string;
port?: number;
user?: string;
pass?: string;
token?: string;
authenticator?: Authenticator | Authenticator[];
reconnect?: boolean;
maxReconnectAttempts?: number;
reconnectTimeWait?: number;
timeout?: number;
pingInterval?: number;
maxPingOut?: number;
name?: string;
verbose?: boolean;
pedantic?: boolean;
tls?: TlsOptions | null;
ignoreClusterUpdates?: boolean;
inboxPrefix?: string;
noEcho?: boolean;
debug?: boolean;
noRandomize?: boolean;
waitOnFirstConnect?: boolean;
}

Essential pub/sub messaging operations including publishing, subscribing, request/reply patterns, and message handling with full wildcard support.
/**
 * Connection-level messaging operations: publish, subscribe, and request/reply.
 */
interface NatsConnection {
/** Publishes an optional `payload` to `subject`. Returns `void`, so there is no result to await. */
publish(subject: string, payload?: Payload, options?: PublishOptions): void;
/** Creates a `Subscription` for `subject`. */
subscribe(subject: string, opts?: SubscriptionOptions): Subscription;
/** Sends a request on `subject` and resolves to a single reply `Msg`. */
request(subject: string, payload?: Payload, opts?: RequestOptions): Promise<Msg>;
/** Variant of `request` that resolves to an async iterable of `Msg` rather than a single message. */
requestMany(subject: string, payload?: Payload, opts?: Partial<RequestManyOptions>): Promise<AsyncIterable<Msg>>;
}
interface Msg {
subject: string;
data: Uint8Array;
reply?: string;
headers?: MsgHdrs;
respond(data?: Payload, opts?: PublishOptions): boolean;
}

Persistent messaging system providing streams for message storage, consumers for reliable delivery, and advanced features like deduplication and retention policies.
/**
 * Client-side JetStream operations: publishing and two ways of subscribing.
 * All three methods are asynchronous.
 */
interface JetStreamClient {
/** Publishes an optional `payload` to `subj` and resolves to a `PubAck`. */
publish(subj: string, payload?: Payload, options?: Partial<JetStreamPublishOptions>): Promise<PubAck>;
/** Resolves to a `JetStreamSubscription` for `subject`. */
subscribe(subject: string, opts?: Partial<ConsumerOptsBuilder>): Promise<JetStreamSubscription>;
/** Resolves to a `JetStreamPullSubscription` for `subject`. */
pullSubscribe(subject: string, opts?: Partial<ConsumerOptsBuilder>): Promise<JetStreamPullSubscription>;
}
interface JetStreamManager {
streams: StreamAPI;
consumers: ConsumerAPI;
getAccountInfo(): Promise<JetStreamAccountStats>;
}

High-level key-value abstraction with history tracking, watch capabilities, and automatic conflict resolution built on JetStream streams.
interface KV {
get(key: string): Promise<KvEntry | null>;
put(key: string, value: Uint8Array, opts?: Partial<KvPutOptions>): Promise<number>;
delete(key: string, opts?: Partial<KvDeleteOptions>): Promise<void>;
watch(opts?: Partial<KvWatchOptions>): Promise<QueuedIterator<KvEntry>>;
history(key: string): Promise<QueuedIterator<KvEntry>>;
}

File-like storage system for larger payloads with metadata, chunking, and linking capabilities for complex data structures.
interface ObjectStore {
put(meta: ObjectStoreMeta, payload: AsyncIterable<Uint8Array>): Promise<ObjectInfo>;
get(name: string): Promise<ObjectResult | null>;
info(name: string): Promise<ObjectInfo | null>;
list(): Promise<QueuedIterator<ObjectInfo>>;
delete(name: string): Promise<boolean>;
}

Service discovery and monitoring framework enabling service registration, health checks, and observability for NATS-based microservices.
/**
 * Entry point for the services framework: register a service or obtain a client.
 */
interface ServicesAPI {
/** Adds a service described by `config` and resolves to the resulting `Service`. */
add(config: ServiceConfig): Promise<Service>;
/** Returns a `ServiceClient` synchronously; both `opts` and `prefix` are optional. */
client(opts?: RequestManyOptions, prefix?: string): ServiceClient;
}
/**
 * Client for querying services. Each method takes an optional `name` and `id`
 * and resolves to a `QueuedIterator` of results.
 * NOTE(review): `name`/`id` presumably narrow which services respond — confirm.
 */
interface ServiceClient {
/** Resolves to an iterator of `ServiceIdentity` entries. */
ping(name?: string, id?: string): Promise<QueuedIterator<ServiceIdentity>>;
/** Resolves to an iterator of `ServiceStats` entries. */
stats(name?: string, id?: string): Promise<QueuedIterator<ServiceStats>>;
/** Resolves to an iterator of `ServiceInfo` entries. */
info(name?: string, id?: string): Promise<QueuedIterator<ServiceInfo>>;
}
/** Message payload: either raw bytes or a string. */
type Payload = Uint8Array | string;
/**
 * A message as seen by subscribers and by `request` callers.
 */
interface Msg {
/** Subject associated with the message. */
subject: string;
/** Payload as raw bytes; decode it (for example with a codec) before treating it as text. */
data: Uint8Array;
/** Optional reply subject. NOTE(review): presumably where responses are sent — confirm. */
reply?: string;
/** Optional message headers. */
headers?: MsgHdrs;
/**
 * Responds to this message with an optional payload.
 * NOTE(review): the boolean presumably indicates whether a response could be sent — confirm.
 */
respond(data?: Payload, opts?: PublishOptions): boolean;
}
/**
 * Handle for an active subscription: lifecycle control plus read-only accessors.
 */
interface Subscription {
/** Unsubscribes. NOTE(review): `max` presumably defers this until that many messages arrive — confirm. */
unsubscribe(max?: number): void;
/** Asynchronous drain; the returned promise resolves when draining finishes. */
drain(): Promise<void>;
/** Reports whether the subscription is closed. */
isClosed(): boolean;
/** Returns the subject of the subscription. */
getSubject(): string;
/** Returns the received count. */
getReceived(): number;
/** Returns the processed count. */
getProcessed(): number;
/** Returns the pending count. */
getPending(): number;
/** Returns the numeric subscription ID. */
getID(): number;
/** Returns the configured maximum, or `undefined` when none is set. */
getMax(): number | undefined;
}
/**
 * Error shape that extends the standard `Error` with a string `code`
 * and optional extra context.
 */
interface NatsError extends Error {
name: string;
message: string;
/** String error code. */
code: string;
/** Optional additional error. NOTE(review): presumably the underlying cause — confirm. */
chainedError?: Error;
/** Optional `ApiError` details. */
api_error?: ApiError;
}
/**
 * Numeric traffic statistics.
 * NOTE(review): presumably byte and message counts for inbound ("in") and
 * outbound ("out") traffic — confirm.
 */
interface Stats {
inBytes: number;
outBytes: number;
inMsgs: number;
outMsgs: number;
}
/**
 * Information describing a server. Identity, version, address, and
 * `max_payload` are always present; the boolean capability flags are optional.
 */
interface ServerInfo {
server_id: string;
server_name: string;
version: string;
/** Numeric protocol value. */
proto: number;
host: string;
port: number;
/** NOTE(review): presumably whether message headers are supported — confirm. */
headers?: boolean;
/** NOTE(review): presumably the maximum payload size in bytes — confirm units. */
max_payload: number;
/** NOTE(review): presumably whether JetStream is available — confirm. */
jetstream?: boolean;
auth_required?: boolean;
tls_required?: boolean;
tls_available?: boolean;
}