0
# JetStream
1
2
JetStream provides persistent messaging with streams for durable message storage, consumers for reliable delivery, and advanced features like acknowledgments, deduplication, and retention policies.
3
4
## Capabilities
5
6
### JetStream Client
7
8
Primary interface for publishing and consuming persistent messages.
9
10
```typescript { .api }
11
/**
12
* Get JetStream client from NATS connection
13
* @param opts - JetStream configuration options
14
* @returns JetStream client instance
15
*/
16
jetstream(opts?: JetStreamOptions | JetStreamManagerOptions): JetStreamClient;
17
18
interface JetStreamOptions {
19
/** JetStream API prefix (default: "$JS.API") */
20
apiPrefix?: string;
21
/** Request timeout in milliseconds */
22
timeout?: number;
23
/** JetStream domain name */
24
domain?: string;
25
}
26
27
interface JetStreamClient {
28
/** Publish message to JetStream stream */
29
publish(subj: string, payload?: Payload, options?: Partial<JetStreamPublishOptions>): Promise<PubAck>;
30
31
/** Create push consumer subscription */
32
subscribe(subject: string, opts?: Partial<ConsumerOptsBuilder>): Promise<JetStreamSubscription>;
33
34
/** Create pull consumer subscription */
35
pullSubscribe(subject: string, opts?: Partial<ConsumerOptsBuilder>): Promise<JetStreamPullSubscription>;
36
37
/** Fetch messages from pull consumer */
38
fetch(stream: string, consumer: string, opts?: Partial<FetchOptions>): Promise<FetchMessages>;
39
40
/** Consume messages with automatic ack handling */
41
consume(stream: string, consumer: string, opts?: Partial<ConsumeOptions>): Promise<ConsumerMessages>;
42
43
/** Access to KV and Object Store views */
44
views: Views;
45
}
46
```
47
48
**Usage Examples:**
49
50
```typescript
51
import { connect, StringCodec } from "nats";
52
53
const nc = await connect();
54
const js = nc.jetstream();
55
const sc = StringCodec();
56
57
// Publish to JetStream
58
const pubAck = await js.publish("events.user.created", sc.encode("user-123"), {
59
msgID: "unique-msg-1",
60
timeout: 5000
61
});
62
console.log(`Published to stream: ${pubAck.stream}, sequence: ${pubAck.seq}`);
63
64
// Subscribe with push consumer
65
const sub = await js.subscribe("events.user.*", {
66
durable: "user-processor",
67
deliverSubject: "process.users"
68
});
69
70
(async () => {
71
for await (const m of sub) {
72
console.log(`Processing: ${sc.decode(m.data)}`);
73
m.ack(); // Acknowledge message
74
}
75
})();
76
77
// Pull subscription
78
const pullSub = await js.pullSubscribe("events.order.*", {
79
durable: "order-worker"
80
});
81
82
// Pull messages in batches
83
pullSub.pull({ batch: 10, max_wait: 5000 });
84
(async () => {
85
for await (const m of pullSub) {
86
console.log(`Order event: ${sc.decode(m.data)}`);
87
m.ack();
88
}
89
})();
90
```
91
92
### JetStream Publishing
93
94
Publish messages to streams with acknowledgment and deduplication support.
95
96
```typescript { .api }
97
/**
98
* Publish message to JetStream stream
99
* @param subj - Subject to publish to (must match stream subjects)
100
* @param payload - Message data
101
* @param options - Publishing options including deduplication and expectations
102
* @returns Promise resolving to publish acknowledgment
103
*/
104
publish(
105
subj: string,
106
payload?: Payload,
107
options?: Partial<JetStreamPublishOptions>
108
): Promise<PubAck>;
109
110
interface JetStreamPublishOptions {
111
/** Message ID for deduplication within duplicate_window */
112
msgID: string;
113
/** Publish timeout in milliseconds */
114
timeout: number;
115
/** Message headers */
116
headers: MsgHdrs;
117
/** Expectations for conditional publishing */
118
expect: Partial<{
119
/** Expected last message ID in stream */
120
lastMsgID: string;
121
/** Expected stream name */
122
streamName: string;
123
/** Expected last sequence number in stream */
124
lastSequence: number;
125
/** Expected last sequence for this subject */
126
lastSubjectSequence: number;
127
}>;
128
}
129
130
interface PubAck {
131
/** Stream that stored the message */
132
stream: string;
133
/** JetStream domain */
134
domain?: string;
135
/** Sequence number assigned to message */
136
seq: number;
137
/** True if the server recognized this message as a duplicate (same msgID within the duplicate window) */
138
duplicate: boolean;
139
}
140
```
141
142
**Usage Examples:**
143
144
```typescript
145
import { connect, JSONCodec, headers } from "nats";
146
147
const nc = await connect();
148
const js = nc.jetstream();
149
const jc = JSONCodec();
150
151
// Basic publish
152
const ack = await js.publish("orders.created", jc.encode({
153
orderId: "ord-123",
154
amount: 99.99
155
}));
156
157
// Publish with deduplication
158
const ack = await js.publish("orders.created", jc.encode(orderData), {
159
msgID: `order-${orderId}`,
160
timeout: 10000
161
});
162
163
// Conditional publish (only if the subject's last sequence matches)
164
try {
165
const ack = await js.publish("inventory.updated", jc.encode(inventoryData), {
166
expect: {
167
lastSubjectSequence: 42
168
}
169
});
170
} catch (err) {
171
if (err.code === "10071") {
172
console.log("Sequence mismatch - concurrent update detected");
173
}
174
}
175
176
// Publish with headers
177
const hdrs = headers();
178
hdrs.set("source", "order-service");
179
hdrs.set("version", "1.0");
180
181
const ack = await js.publish("events.order.updated", jc.encode(updateData), {
182
headers: hdrs,
183
msgID: `update-${updateId}`
184
});
185
```
186
187
### Consumer Management
188
189
Create and configure consumers for message delivery patterns.
190
191
```typescript { .api }
192
/** Create consumer options builder */
193
function consumerOpts(): ConsumerOptsBuilder;
194
195
interface ConsumerOptsBuilder {
196
/** Set the durable consumer name (when unset, the consumer is ephemeral) */
197
durable(name: string): ConsumerOptsBuilder;
198
/** Set delivery subject for push consumers */
199
deliverSubject(subject: string): ConsumerOptsBuilder;
200
/** Set queue group for load balancing */
201
queue(name: string): ConsumerOptsBuilder;
202
/** Set acknowledgment policy */
203
ackPolicy(policy: AckPolicy): ConsumerOptsBuilder;
204
/** Set delivery policy */
205
deliverPolicy(policy: DeliverPolicy): ConsumerOptsBuilder;
206
/** Set replay policy */
207
replayPolicy(policy: ReplayPolicy): ConsumerOptsBuilder;
208
/** Set message filter subject */
209
filterSubject(subject: string): ConsumerOptsBuilder;
210
/** Set starting sequence number */
211
startSequence(seq: number): ConsumerOptsBuilder;
212
/** Set starting time */
213
startTime(time: Date): ConsumerOptsBuilder;
214
/** Set acknowledgment wait time */
215
ackWait(millis: number): ConsumerOptsBuilder;
216
/** Set maximum delivery attempts */
217
maxDeliver(max: number): ConsumerOptsBuilder;
218
/** Set maximum pending acknowledgments */
219
maxAckPending(max: number): ConsumerOptsBuilder;
220
/** Set idle heartbeat interval */
221
idleHeartbeat(millis: number): ConsumerOptsBuilder;
222
/** Set flow control */
223
flowControl(): ConsumerOptsBuilder;
224
/** Set deliver group (like queue but for push consumers) */
225
deliverGroup(name: string): ConsumerOptsBuilder;
226
/** Set manual acknowledgment mode */
227
manualAck(): ConsumerOptsBuilder;
228
/** Bind to existing consumer */
229
bind(stream: string, consumer: string): ConsumerOptsBuilder;
230
}
231
232
enum AckPolicy {
233
/** No acknowledgment required */
234
None = "none",
235
/** Acknowledge all messages up to this one */
236
All = "all",
237
/** Acknowledge this specific message */
238
Explicit = "explicit"
239
}
240
241
enum DeliverPolicy {
242
/** Deliver all messages */
243
All = "all",
244
/** Deliver only the last message */
245
Last = "last",
246
/** Deliver only new messages */
247
New = "new",
248
/** Start from specific sequence */
249
ByStartSequence = "by_start_sequence",
250
/** Start from specific time */
251
ByStartTime = "by_start_time",
252
/** Last message per subject */
253
LastPerSubject = "last_per_subject"
254
}
255
256
enum ReplayPolicy {
257
/** Deliver as fast as possible */
258
Instant = "instant",
259
/** Replay at original timing */
260
Original = "original"
261
}
262
```
263
264
**Usage Examples:**
265
266
```typescript
267
import { connect, consumerOpts, AckPolicy, DeliverPolicy } from "nats";
268
269
const nc = await connect();
270
const js = nc.jetstream();
271
272
// Durable push consumer
273
const opts = consumerOpts()
274
.durable("order-processor")
275
.deliverSubject("process.orders")
276
.ackPolicy(AckPolicy.Explicit)
277
.deliverPolicy(DeliverPolicy.New)
278
.maxDeliver(3)
279
.ackWait(30000);
280
281
const sub = await js.subscribe("orders.*", opts);
282
283
// Durable pull consumer with explicit acks and a cap on unacknowledged messages
284
const pullOpts = consumerOpts()
285
.durable("work-queue")
286
.ackPolicy(AckPolicy.Explicit)
287
.maxAckPending(100);
288
289
const pullSub = await js.pullSubscribe("work.jobs", pullOpts);
290
291
// Ephemeral consumer starting from specific time
292
const ephemeralOpts = consumerOpts()
293
.deliverPolicy(DeliverPolicy.ByStartTime)
294
.startTime(new Date("2023-01-01"))
295
.filterSubject("logs.error.*");
296
297
const logSub = await js.subscribe("logs.*", ephemeralOpts);
298
299
// Bind to existing consumer
300
const bindOpts = consumerOpts()
301
.bind("events", "existing-consumer");
302
303
const boundSub = await js.subscribe("", bindOpts);
304
```
305
306
### JetStream Subscriptions
307
308
Handle JetStream messages with acknowledgment and flow control.
309
310
```typescript { .api }
311
interface JetStreamSubscription {
312
/** Standard subscription interface */
313
unsubscribe(max?: number): void;
314
drain(): Promise<void>;
315
isClosed(): boolean;
316
317
/** JetStream specific features */
318
destroy(): Promise<void>;
319
closed: Promise<void>;
320
consumerInfo(): Promise<ConsumerInfo>;
321
322
/** Async iterator for processing messages */
323
[Symbol.asyncIterator](): AsyncIterableIterator<JsMsg>;
324
}
325
326
interface JetStreamPullSubscription extends JetStreamSubscription {
327
/** Pull messages from server */
328
pull(opts?: Partial<PullOptions>): void;
329
}
330
331
interface JsMsg {
332
/** Message subject */
333
subject: string;
334
/** Message data */
335
data: Uint8Array;
336
/** Reply subject */
337
reply?: string;
338
/** Message headers */
339
headers?: MsgHdrs;
340
/** Message sequence in stream */
341
seq: number;
342
/** True if this message has been delivered before (redelivery flag) */
343
redelivered: boolean;
344
/** Consumer info */
345
info: DeliveryInfo;
346
347
/** Acknowledge message processing */
348
ack(): void;
349
/** Negative acknowledge (redelivery) */
350
nak(millis?: number): void;
351
/** Working indicator (extend ack wait) */
352
working(): void;
353
/** Terminate message processing */
354
term(): void;
355
/** Acknowledge and wait for the server to confirm the ack (NOTE: verify name/return type against the installed nats version) */
356
ackSync(): void;
357
}
358
359
interface PullOptions {
360
/** Number of messages to request */
361
batch: number;
362
/** Maximum time to wait for messages */
363
max_wait: number;
364
/** Don't wait if no messages available */
365
no_wait: boolean;
366
/** Maximum bytes to return */
367
max_bytes: number;
368
/** Idle heartbeat timeout */
369
idle_heartbeat: number;
370
}
371
372
interface DeliveryInfo {
373
/** Stream name */
374
stream: string;
375
/** Consumer name */
376
consumer: string;
377
/** Message sequence in stream */
378
streamSequence: number;
379
/** Consumer sequence */
380
consumerSequence: number;
381
/** Number of delivery attempts */
382
deliverySequence: number;
383
/** Message timestamp */
384
timestampNanos: number;
385
/** Number of pending messages */
386
pending: number;
387
/** Redelivered flag */
388
redelivered: boolean;
389
}
390
```
391
392
**Usage Examples:**
393
394
```typescript
395
import { connect } from "nats";
396
397
const nc = await connect();
398
const js = nc.jetstream();
399
400
// Process messages with explicit acks
401
const sub = await js.subscribe("orders.*", { durable: "order-handler" });
402
(async () => {
403
for await (const m of sub) {
404
try {
405
// Process the message
406
await processOrder(m.data);
407
m.ack(); // Acknowledge successful processing
408
} catch (err) {
409
console.error(`Processing failed: ${err.message}`);
410
if (m.info.deliverySequence >= 3) {
411
m.term(); // Terminate after 3 attempts
412
} else {
413
m.nak(5000); // Negative ack, retry in 5 seconds
414
}
415
}
416
}
417
})();
418
419
// Pull subscription with batching
420
const pullSub = await js.pullSubscribe("events.*", { durable: "event-worker" });
421
422
// Request messages in batches
423
pullSub.pull({ batch: 50, max_wait: 1000 });
424
425
(async () => {
426
for await (const m of pullSub) {
427
console.log(`Event: ${m.subject}, Sequence: ${m.seq}`);
428
429
// Send working indicator for long processing
430
if (needsLongProcessing(m)) {
431
m.working(); // Extend ack wait time
432
}
433
434
await processEvent(m);
435
m.ack();
436
437
// Request more messages when batch is nearly consumed
438
if (m.info.pending < 10) {
439
pullSub.pull({ batch: 50, max_wait: 1000 });
440
}
441
}
442
})();
443
```
444
445
### JetStream Manager
446
447
Manage streams, consumers, and JetStream account information.
448
449
```typescript { .api }
450
/**
451
* Get JetStream manager from NATS connection
452
* @param opts - Manager options including API validation
453
* @returns Promise resolving to JetStream manager
454
*/
455
jetstreamManager(opts?: JetStreamManagerOptions): Promise<JetStreamManager>;
456
457
interface JetStreamManagerOptions extends JetStreamOptions {
458
/** Whether to verify JetStream API availability on creation (set to false to skip the check) */
459
checkAPI?: boolean;
460
}
461
462
interface JetStreamManager {
463
/** Stream management API */
464
streams: StreamAPI;
465
/** Consumer management API */
466
consumers: ConsumerAPI;
467
468
/** Get JetStream account information and limits */
469
getAccountInfo(): Promise<JetStreamAccountStats>;
470
/** Monitor JetStream advisories */
471
advisories(): AsyncIterable<Advisory>;
472
/** Get manager configuration */
473
getOptions(): JetStreamOptions;
474
/** Get JetStream client with same options */
475
jetstream(): JetStreamClient;
476
}
477
478
interface JetStreamAccountStats {
479
/** Account limits */
480
limits: AccountLimits;
481
/** JetStream API request statistics */
482
api: JetStreamApiStats;
483
/** Number of streams */
484
streams: number;
485
/** Number of consumers */
486
consumers: number;
487
/** Number of messages across all streams */
488
messages: number;
489
/** Total bytes across all streams */
490
bytes: number;
491
}
492
493
interface StreamAPI {
494
/** Get stream information */
495
info(stream: string, opts?: Partial<StreamInfoRequestOptions>): Promise<StreamInfo>;
496
/** Create new stream */
497
add(cfg: Partial<StreamConfig>): Promise<StreamInfo>;
498
/** Update stream configuration */
499
update(name: string, cfg: Partial<StreamUpdateConfig>): Promise<StreamInfo>;
500
/** Delete stream */
501
delete(stream: string): Promise<boolean>;
502
/** List all streams */
503
list(subject?: string): Lister<StreamInfo>;
504
/** List stream names only */
505
names(subject?: string): Lister<string>;
506
/** Get stream object */
507
get(name: string): Promise<Stream>;
508
/** Find stream by subject */
509
find(subject: string): Promise<string>;
510
/** Purge stream messages */
511
purge(stream: string, opts?: PurgeOpts): Promise<PurgeResponse>;
512
/** Delete specific message */
513
deleteMessage(stream: string, seq: number, erase?: boolean): Promise<boolean>;
514
/** Get specific message */
515
getMessage(stream: string, query: MsgRequest): Promise<StoredMsg>;
516
}
517
```
518
519
**Usage Examples:**
520
521
```typescript
522
import { connect, StorageType, RetentionPolicy } from "nats";
523
524
const nc = await connect();
525
const jsm = await nc.jetstreamManager();
526
527
// Create a stream
528
const streamInfo = await jsm.streams.add({
529
name: "events",
530
subjects: ["events.*"],
531
storage: StorageType.File,
532
retention: RetentionPolicy.Limits,
533
max_msgs: 100000,
534
max_bytes: 1024 * 1024 * 100, // 100MB
535
max_age: 24 * 60 * 60 * 1000 * 1000 * 1000, // 24 hours in nanoseconds
536
duplicate_window: 5 * 60 * 1000 * 1000 * 1000 // 5 minutes in nanoseconds
537
});
538
539
console.log(`Created stream: ${streamInfo.config.name}`);
540
541
// List all streams
542
const streams = jsm.streams.list();
543
for await (const stream of streams) {
544
console.log(`Stream: ${stream.config.name}, Messages: ${stream.state.messages}`);
545
}
546
547
// Get account information
548
const accountInfo = await jsm.getAccountInfo();
549
console.log(`Streams: ${accountInfo.streams}, Messages: ${accountInfo.messages}`);
550
551
// Monitor advisories
552
(async () => {
553
for await (const advisory of jsm.advisories()) {
554
console.log(`Advisory: ${advisory.type}`, advisory);
555
}
556
})();
557
558
// Purge old messages
559
const purgeResponse = await jsm.streams.purge("events", {
560
keep: 1000 // Keep last 1000 messages
561
});
562
console.log(`Purged ${purgeResponse.purged} messages`);
563
```
564
565
## Types
566
567
```typescript { .api }
568
interface StreamConfig {
569
name: string;
570
subjects?: string[];
571
retention?: RetentionPolicy;
572
max_consumers?: number;
573
max_msgs?: number;
574
max_bytes?: number;
575
max_age?: number;
576
max_msgs_per_subject?: number;
577
max_msg_size?: number;
578
storage?: StorageType;
579
num_replicas?: number;
580
no_ack?: boolean;
581
discard?: DiscardPolicy;
582
duplicate_window?: number;
583
placement?: Placement;
584
mirror?: StreamSource;
585
sources?: StreamSource[];
586
sealed?: boolean;
587
deny_delete?: boolean;
588
deny_purge?: boolean;
589
allow_rollup_hdrs?: boolean;
590
republish?: Republish;
591
allow_direct?: boolean;
592
mirror_direct?: boolean;
593
subject_transform?: SubjectTransformConfig;
594
compression?: StoreCompression;
595
first_seq?: number;
596
}
597
598
interface ConsumerConfig {
599
name?: string;
600
durable_name?: string;
601
description?: string;
602
deliver_policy?: DeliverPolicy;
603
opt_start_seq?: number;
604
opt_start_time?: string;
605
ack_policy?: AckPolicy;
606
ack_wait?: number;
607
max_deliver?: number;
608
filter_subject?: string;
609
replay_policy?: ReplayPolicy;
610
rate_limit?: number;
611
sample_freq?: string;
612
max_waiting?: number;
613
max_ack_pending?: number;
614
flow_control?: boolean;
615
idle_heartbeat?: number;
616
headers_only?: boolean;
617
max_pull_waiting?: number;
618
deliver_subject?: string;
619
deliver_group?: string;
620
inactive_threshold?: number;
621
num_replicas?: number;
622
mem_storage?: boolean;
623
pause_until?: string;
624
}
625
626
enum RetentionPolicy {
627
Limits = "limits",
628
Interest = "interest",
629
WorkQueue = "workqueue"
630
}
631
632
enum StorageType {
633
File = "file",
634
Memory = "memory"
635
}
636
637
enum DiscardPolicy {
638
Old = "old",
639
New = "new"
640
}
641
642
enum StoreCompression {
643
None = "none",
644
S2 = "s2"
645
}
646
647
interface StoredMsg {
648
subject: string;
649
seq: number;
650
data: Uint8Array;
651
time: Date;
652
headers?: MsgHdrs;
653
}
654
655
interface Advisory {
656
type: AdvisoryKind;
657
id: string;
658
timestamp: Date;
659
stream?: string;
660
consumer?: string;
661
[key: string]: unknown;
662
}
663
664
enum AdvisoryKind {
665
API = "$JS.EVENT.ADVISORY.API",
666
StreamAction = "$JS.EVENT.ADVISORY.STREAM.ACTION",
667
ConsumerAction = "$JS.EVENT.ADVISORY.CONSUMER.ACTION",
668
SnapshotCreate = "$JS.EVENT.ADVISORY.CONSUMER.SNAPSHOT_CREATE",
669
SnapshotComplete = "$JS.EVENT.ADVISORY.CONSUMER.SNAPSHOT_COMPLETE",
670
RestoreCreate = "$JS.EVENT.ADVISORY.STREAM.RESTORE_CREATE",
671
RestoreComplete = "$JS.EVENT.ADVISORY.STREAM.RESTORE_COMPLETE",
672
MaxDeliver = "$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES",
673
Terminated = "$JS.EVENT.ADVISORY.CONSUMER.MSG_TERMINATED",
674
Ack = "$JS.EVENT.METRIC.CONSUMER.ACK",
675
DeliveryExceeded = "$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES",
676
DeliveryTerminated = "$JS.EVENT.ADVISORY.CONSUMER.MSG_TERMINATED",
677
MissedHeartbeat = "$JS.EVENT.ADVISORY.CONSUMER.MISSED_HEARTBEAT"
678
}
679
```