CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/npm-nats

Node.js client for NATS, a lightweight, high-performance cloud native messaging system

Pending
Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

SecuritybySnyk

Pending

The risk profile of this skill

Overview
Eval results
Files

jetstream.mddocs/

JetStream

JetStream provides persistent messaging with streams for durable message storage, consumers for reliable delivery, and advanced features like acknowledgments, deduplication, and retention policies.

Capabilities

JetStream Client

Primary interface for publishing and consuming persistent messages.

/**
 * Get JetStream client from NATS connection
 * @param opts - JetStream configuration options
 * @returns JetStream client instance
 */
jetstream(opts?: JetStreamOptions | JetStreamManagerOptions): JetStreamClient;

interface JetStreamOptions {
  /** JetStream API prefix (default: "$JS.API") */
  apiPrefix?: string;
  /** Request timeout in milliseconds */
  timeout?: number;
  /** JetStream domain name */
  domain?: string;
}

interface JetStreamClient {
  /** Publish message to JetStream stream */
  publish(subj: string, payload?: Payload, options?: Partial<JetStreamPublishOptions>): Promise<PubAck>;
  
  /** Create push consumer subscription */
  subscribe(subject: string, opts?: Partial<ConsumerOptsBuilder>): Promise<JetStreamSubscription>;
  
  /** Create pull consumer subscription */
  pullSubscribe(subject: string, opts?: Partial<ConsumerOptsBuilder>): Promise<JetStreamPullSubscription>;
  
  /** Fetch messages from pull consumer */
  fetch(stream: string, consumer: string, opts?: Partial<FetchOptions>): Promise<FetchMessages>;
  
  /** Consume messages with automatic ack handling */
  consume(stream: string, consumer: string, opts?: Partial<ConsumeOptions>): Promise<ConsumerMessages>;
  
  /** Access to KV and Object Store views */
  views: Views;
}

Usage Examples:

import { connect, StringCodec } from "nats";

const nc = await connect();
const js = nc.jetstream();
const sc = StringCodec();

// Publish to JetStream
const pubAck = await js.publish("events.user.created", sc.encode("user-123"), {
  msgID: "unique-msg-1",
  timeout: 5000
});
console.log(`Published to stream: ${pubAck.stream}, sequence: ${pubAck.seq}`);

// Subscribe with push consumer
const sub = await js.subscribe("events.user.*", {
  durable: "user-processor",
  deliverSubject: "process.users"
});

(async () => {
  for await (const m of sub) {
    console.log(`Processing: ${sc.decode(m.data)}`);
    m.ack(); // Acknowledge message
  }
})();

// Pull subscription
const pullSub = await js.pullSubscribe("events.order.*", {
  durable: "order-worker"
});

// Pull messages in batches
pullSub.pull({ batch: 10, max_wait: 5000 });
(async () => {
  for await (const m of pullSub) {
    console.log(`Order event: ${sc.decode(m.data)}`);
    m.ack();
  }
})();

JetStream Publishing

Publish messages to streams with acknowledgment and deduplication support.

/**
 * Publish message to JetStream stream
 * @param subj - Subject to publish to (must match stream subjects)
 * @param payload - Message data
 * @param options - Publishing options including deduplication and expectations
 * @returns Promise resolving to publish acknowledgment
 */
publish(
  subj: string, 
  payload?: Payload, 
  options?: Partial<JetStreamPublishOptions>
): Promise<PubAck>;

interface JetStreamPublishOptions {
  /** Message ID for deduplication within duplicate_window */
  msgID: string;
  /** Publish timeout in milliseconds */
  timeout: number;
  /** Message headers */
  headers: MsgHdrs;
  /** Expectations for conditional publishing */
  expect: Partial<{
    /** Expected last message ID in stream */
    lastMsgID: string;
    /** Expected stream name */
    streamName: string;
    /** Expected last sequence number in stream */
    lastSequence: number;
    /** Expected last sequence for this subject */
    lastSubjectSequence: number;
  }>;
}

interface PubAck {
  /** Stream that stored the message */
  stream: string;
  /** JetStream domain */
  domain?: string;
  /** Sequence number assigned to message */
  seq: number;
  /** True if message was deduplicated */
  duplicate: boolean;
}

Usage Examples:

import { connect, JSONCodec, headers } from "nats";

const nc = await connect();
const js = nc.jetstream();
const jc = JSONCodec();

// Basic publish
const ack = await js.publish("orders.created", jc.encode({
  orderId: "ord-123",
  amount: 99.99
}));

// Publish with deduplication
const ack = await js.publish("orders.created", jc.encode(orderData), {
  msgID: `order-${orderId}`,
  timeout: 10000
});

// Conditional publish (only if last sequence matches)
try {
  const ack = await js.publish("inventory.updated", jc.encode(inventoryData), {
    expect: {
      lastSubjectSequence: 42
    }
  });
} catch (err) {
  if (err.code === "10071") {
    console.log("Sequence mismatch - concurrent update detected");
  }
}

// Publish with headers
const hdrs = headers();
hdrs.set("source", "order-service");
hdrs.set("version", "1.0");

const ack = await js.publish("events.order.updated", jc.encode(updateData), {
  headers: hdrs,
  msgID: `update-${updateId}`
});

Consumer Management

Create and configure consumers for message delivery patterns.

/** Create consumer options builder */
function consumerOpts(): ConsumerOptsBuilder;

interface ConsumerOptsBuilder {
  /** Set consumer name (durable or ephemeral) */
  durable(name: string): ConsumerOptsBuilder;
  /** Set delivery subject for push consumers */
  deliverSubject(subject: string): ConsumerOptsBuilder;
  /** Set queue group for load balancing */
  queue(name: string): ConsumerOptsBuilder;
  /** Set acknowledgment policy */
  ackPolicy(policy: AckPolicy): ConsumerOptsBuilder;
  /** Set delivery policy */
  deliverPolicy(policy: DeliverPolicy): ConsumerOptsBuilder;
  /** Set replay policy */
  replayPolicy(policy: ReplayPolicy): ConsumerOptsBuilder;
  /** Set message filter subject */
  filterSubject(subject: string): ConsumerOptsBuilder;
  /** Set starting sequence number */
  startSequence(seq: number): ConsumerOptsBuilder;
  /** Set starting time */
  startTime(time: Date): ConsumerOptsBuilder;
  /** Set acknowledgment wait time */
  ackWait(millis: number): ConsumerOptsBuilder;
  /** Set maximum delivery attempts */
  maxDeliver(max: number): ConsumerOptsBuilder;
  /** Set maximum pending acknowledgments */
  maxAckPending(max: number): ConsumerOptsBuilder;
  /** Set idle heartbeat interval */
  idleHeartbeat(millis: number): ConsumerOptsBuilder;
  /** Set flow control */
  flowControl(): ConsumerOptsBuilder;
  /** Set deliver group (like queue but for push consumers) */
  deliverGroup(name: string): ConsumerOptsBuilder;
  /** Set manual acknowledgment mode */
  manualAck(): ConsumerOptsBuilder;
  /** Bind to existing consumer */
  bind(stream: string, consumer: string): ConsumerOptsBuilder;
}

enum AckPolicy {
  /** No acknowledgment required */
  None = "none",
  /** Acknowledge all messages up to this one */
  All = "all", 
  /** Acknowledge this specific message */
  Explicit = "explicit"
}

enum DeliverPolicy {
  /** Deliver all messages */
  All = "all",
  /** Deliver only the last message */
  Last = "last",
  /** Deliver only new messages */
  New = "new",
  /** Start from specific sequence */
  ByStartSequence = "by_start_sequence",
  /** Start from specific time */
  ByStartTime = "by_start_time",
  /** Last message per subject */
  LastPerSubject = "last_per_subject"
}

enum ReplayPolicy {
  /** Deliver as fast as possible */
  Instant = "instant",
  /** Replay at original timing */
  Original = "original"
}

Usage Examples:

import { connect, consumerOpts, AckPolicy, DeliverPolicy } from "nats";

const nc = await connect();
const js = nc.jetstream();

// Durable push consumer
const opts = consumerOpts()
  .durable("order-processor")
  .deliverSubject("process.orders")
  .ackPolicy(AckPolicy.Explicit)
  .deliverPolicy(DeliverPolicy.New)
  .maxDeliver(3)
  .ackWait(30000);

const sub = await js.subscribe("orders.*", opts);

// Pull consumer with queue
const pullOpts = consumerOpts()
  .durable("work-queue")
  .ackPolicy(AckPolicy.Explicit)
  .maxAckPending(100);

const pullSub = await js.pullSubscribe("work.jobs", pullOpts);

// Ephemeral consumer starting from specific time
const ephemeralOpts = consumerOpts()
  .deliverPolicy(DeliverPolicy.ByStartTime)
  .startTime(new Date("2023-01-01"))
  .filterSubject("logs.error.*");

const logSub = await js.subscribe("logs.*", ephemeralOpts);

// Bind to existing consumer
const bindOpts = consumerOpts()
  .bind("events", "existing-consumer");

const boundSub = await js.subscribe("", bindOpts);

JetStream Subscriptions

Handle JetStream messages with acknowledgment and flow control.

interface JetStreamSubscription {
  /** Standard subscription interface */
  unsubscribe(max?: number): void;
  drain(): Promise<void>;
  isClosed(): boolean;
  
  /** JetStream specific features */
  destroy(): Promise<void>;
  closed: Promise<void>;
  consumerInfo(): Promise<ConsumerInfo>;
  
  /** Async iterator for processing messages */
  [Symbol.asyncIterator](): AsyncIterableIterator<JsMsg>;
}

interface JetStreamPullSubscription extends JetStreamSubscription {
  /** Pull messages from server */
  pull(opts?: Partial<PullOptions>): void;
}

interface JsMsg {
  /** Message subject */
  subject: string;
  /** Message data */
  data: Uint8Array;
  /** Reply subject */
  reply?: string;
  /** Message headers */
  headers?: MsgHdrs;
  /** Message sequence in stream */
  seq: number;
  /** Message redelivery count */
  redelivered: boolean;
  /** Consumer info */
  info: DeliveryInfo;
  
  /** Acknowledge message processing */
  ack(): void;
  /** Negative acknowledge (redelivery) */
  nak(millis?: number): void;
  /** Working indicator (extend ack wait) */
  working(): void;
  /** Terminate message processing */
  term(): void;
  /** Acknowledge and request next message */
  ackSync(): void;
}

interface PullOptions {
  /** Number of messages to request */
  batch: number;
  /** Maximum time to wait for messages */
  max_wait: number;
  /** Don't wait if no messages available */
  no_wait: boolean;
  /** Maximum bytes to return */
  max_bytes: number;
  /** Idle heartbeat timeout */
  idle_heartbeat: number;
}

interface DeliveryInfo {
  /** Stream name */
  stream: string;
  /** Consumer name */
  consumer: string;
  /** Message sequence in stream */
  streamSequence: number;
  /** Consumer sequence */
  consumerSequence: number;
  /** Number of delivery attempts */
  deliverySequence: number;
  /** Message timestamp */
  timestampNanos: number;
  /** Number of pending messages */
  pending: number;
  /** Redelivered flag */
  redelivered: boolean;
}

Usage Examples:

import { connect } from "nats";

const nc = await connect();
const js = nc.jetstream();

// Process messages with explicit acks
const sub = await js.subscribe("orders.*", { durable: "order-handler" });
(async () => {
  for await (const m of sub) {
    try {
      // Process the message
      await processOrder(m.data);
      m.ack(); // Acknowledge successful processing
    } catch (err) {
      console.error(`Processing failed: ${err.message}`);
      if (m.info.deliverySequence >= 3) {
        m.term(); // Terminate after 3 attempts
      } else {
        m.nak(5000); // Negative ack, retry in 5 seconds
      }
    }
  }
})();

// Pull subscription with batching  
const pullSub = await js.pullSubscribe("events.*", { durable: "event-worker" });

// Request messages in batches
pullSub.pull({ batch: 50, max_wait: 1000 });

(async () => {
  for await (const m of pullSub) {
    console.log(`Event: ${m.subject}, Sequence: ${m.seq}`);
    
    // Send working indicator for long processing
    if (needsLongProcessing(m)) {
      m.working(); // Extend ack wait time
    }
    
    await processEvent(m);
    m.ack();
    
    // Request more messages when batch is nearly consumed
    if (m.info.pending < 10) {
      pullSub.pull({ batch: 50, max_wait: 1000 });
    }
  }
})();

JetStream Manager

Manage streams, consumers, and JetStream account information.

/**
 * Get JetStream manager from NATS connection
 * @param opts - Manager options including API validation
 * @returns Promise resolving to JetStream manager
 */
jetstreamManager(opts?: JetStreamManagerOptions): Promise<JetStreamManager>;

interface JetStreamManagerOptions extends JetStreamOptions {
  /** Skip JetStream API availability check */
  checkAPI?: boolean;
}

interface JetStreamManager {
  /** Stream management API */
  streams: StreamAPI;
  /** Consumer management API */
  consumers: ConsumerAPI;
  
  /** Get JetStream account information and limits */
  getAccountInfo(): Promise<JetStreamAccountStats>;
  /** Monitor JetStream advisories */
  advisories(): AsyncIterable<Advisory>;
  /** Get manager configuration */
  getOptions(): JetStreamOptions;
  /** Get JetStream client with same options */
  jetstream(): JetStreamClient;
}

interface JetStreamAccountStats {
  /** Account limits */
  limits: AccountLimits;
  /** Current usage */
  api: JetStreamApiStats;
  /** Number of streams */
  streams: number;
  /** Number of consumers */
  consumers: number;
  /** Number of messages across all streams */
  messages: number;
  /** Total bytes across all streams */
  bytes: number;
}

interface StreamAPI {
  /** Get stream information */
  info(stream: string, opts?: Partial<StreamInfoRequestOptions>): Promise<StreamInfo>;
  /** Create new stream */
  add(cfg: Partial<StreamConfig>): Promise<StreamInfo>;
  /** Update stream configuration */
  update(name: string, cfg: Partial<StreamUpdateConfig>): Promise<StreamInfo>;
  /** Delete stream */
  delete(stream: string): Promise<boolean>;
  /** List all streams */
  list(subject?: string): Lister<StreamInfo>;
  /** List stream names only */
  names(subject?: string): Lister<string>;
  /** Get stream object */
  get(name: string): Promise<Stream>;
  /** Find stream by subject */
  find(subject: string): Promise<string>;
  /** Purge stream messages */
  purge(stream: string, opts?: PurgeOpts): Promise<PurgeResponse>;
  /** Delete specific message */
  deleteMessage(stream: string, seq: number, erase?: boolean): Promise<boolean>;
  /** Get specific message */
  getMessage(stream: string, query: MsgRequest): Promise<StoredMsg>;
}

Usage Examples:

import { connect, StorageType, RetentionPolicy } from "nats";

const nc = await connect();
const jsm = await nc.jetstreamManager();

// Create a stream
const streamInfo = await jsm.streams.add({
  name: "events",
  subjects: ["events.*"],
  storage: StorageType.File,
  retention: RetentionPolicy.Limits,
  max_msgs: 100000,
  max_bytes: 1024 * 1024 * 100, // 100MB
  max_age: 24 * 60 * 60 * 1000 * 1000 * 1000, // 24 hours in nanoseconds
  duplicate_window: 5 * 60 * 1000 * 1000 * 1000 // 5 minutes in nanoseconds
});

console.log(`Created stream: ${streamInfo.config.name}`);

// List all streams
const streams = jsm.streams.list();
for await (const stream of streams) {
  console.log(`Stream: ${stream.config.name}, Messages: ${stream.state.messages}`);
}

// Get account information
const accountInfo = await jsm.getAccountInfo();
console.log(`Streams: ${accountInfo.streams}, Messages: ${accountInfo.messages}`);

// Monitor advisories
(async () => {
  for await (const advisory of jsm.advisories()) {
    console.log(`Advisory: ${advisory.type}`, advisory);
  }
})();

// Purge old messages
const purgeResponse = await jsm.streams.purge("events", {
  keep: 1000 // Keep last 1000 messages
});
console.log(`Purged ${purgeResponse.purged} messages`);

Types

interface StreamConfig {
  name: string;
  subjects?: string[];
  retention?: RetentionPolicy;
  max_consumers?: number;
  max_msgs?: number;
  max_bytes?: number;
  max_age?: number;
  max_msgs_per_subject?: number;
  max_msg_size?: number;
  storage?: StorageType;
  num_replicas?: number;
  no_ack?: boolean;
  discard?: DiscardPolicy;
  duplicate_window?: number;
  placement?: Placement;
  mirror?: StreamSource;
  sources?: StreamSource[];
  sealed?: boolean;
  deny_delete?: boolean;
  deny_purge?: boolean;
  allow_rollup_hdrs?: boolean;
  republish?: Republish;
  allow_direct?: boolean;
  mirror_direct?: boolean;
  subject_transform?: SubjectTransformConfig;
  compression?: StoreCompression;
  first_seq?: number;
}

interface ConsumerConfig {
  name?: string;
  durable_name?: string;
  description?: string;
  deliver_policy?: DeliverPolicy;
  opt_start_seq?: number;
  opt_start_time?: string;
  ack_policy?: AckPolicy;
  ack_wait?: number;
  max_deliver?: number;
  filter_subject?: string;
  replay_policy?: ReplayPolicy;
  rate_limit?: number;
  sample_freq?: string;
  max_waiting?: number;
  max_ack_pending?: number;
  flow_control?: boolean;
  idle_heartbeat?: number;
  headers_only?: boolean;
  max_pull_waiting?: number;
  deliver_subject?: string;
  deliver_group?: string;
  inactive_threshold?: number;
  num_replicas?: number;
  mem_storage?: boolean;
  pause_until?: string;
}

enum RetentionPolicy {
  Limits = "limits",
  Interest = "interest", 
  WorkQueue = "workqueue"
}

enum StorageType {
  File = "file",
  Memory = "memory"
}

enum DiscardPolicy {
  Old = "old",
  New = "new"
}

enum StoreCompression {
  None = "none",
  S2 = "s2"
}

interface StoredMsg {
  subject: string;
  seq: number;
  data: Uint8Array;
  time: Date;
  headers?: MsgHdrs;
}

interface Advisory {
  type: AdvisoryKind;
  id: string;
  timestamp: Date;
  stream?: string;
  consumer?: string;
  [key: string]: unknown;
}

enum AdvisoryKind {
  API = "$JS.EVENT.ADVISORY.API",
  StreamAction = "$JS.EVENT.ADVISORY.STREAM.ACTION",
  ConsumerAction = "$JS.EVENT.ADVISORY.CONSUMER.ACTION",
  SnapshotCreate = "$JS.EVENT.ADVISORY.CONSUMER.SNAPSHOT_CREATE",
  SnapshotComplete = "$JS.EVENT.ADVISORY.CONSUMER.SNAPSHOT_COMPLETE",
  RestoreCreate = "$JS.EVENT.ADVISORY.STREAM.RESTORE_CREATE",
  RestoreComplete = "$JS.EVENT.ADVISORY.STREAM.RESTORE_COMPLETE",
  MaxDeliver = "$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES",
  Terminated = "$JS.EVENT.ADVISORY.CONSUMER.MSG_TERMINATED",
  Ack = "$JS.EVENT.METRIC.CONSUMER.ACK",
  DeliveryExceeded = "$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES",
  DeliveryTerminated = "$JS.EVENT.ADVISORY.CONSUMER.MSG_TERMINATED",
  MissedHeartbeat = "$JS.EVENT.ADVISORY.CONSUMER.MISSED_HEARTBEAT"
}

docs

connection.md

index.md

jetstream.md

kv-store.md

messaging.md

object-store.md

services.md

tile.json