or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

connection.md index.md jetstream.md kv-store.md messaging.md object-store.md services.md

jetstream.md docs/

0

# JetStream

1

2

JetStream provides persistent messaging with streams for durable message storage, consumers for reliable delivery, and advanced features like acknowledgments, deduplication, and retention policies.

3

4

## Capabilities

5

6

### JetStream Client

7

8

Primary interface for publishing and consuming persistent messages.

9

10

```typescript { .api }

11

/**

12

* Get JetStream client from NATS connection

13

* @param opts - JetStream configuration options

14

* @returns JetStream client instance

15

*/

16

jetstream(opts?: JetStreamOptions | JetStreamManagerOptions): JetStreamClient;

17

18

interface JetStreamOptions {

19

/** JetStream API prefix (default: "$JS.API") */

20

apiPrefix?: string;

21

/** Request timeout in milliseconds */

22

timeout?: number;

23

/** JetStream domain name */

24

domain?: string;

25

}

26

27

interface JetStreamClient {

28

/** Publish message to JetStream stream */

29

publish(subj: string, payload?: Payload, options?: Partial<JetStreamPublishOptions>): Promise<PubAck>;

30

31

/** Create push consumer subscription */

32

subscribe(subject: string, opts?: Partial<ConsumerOptsBuilder>): Promise<JetStreamSubscription>;

33

34

/** Create pull consumer subscription */

35

pullSubscribe(subject: string, opts?: Partial<ConsumerOptsBuilder>): Promise<JetStreamPullSubscription>;

36

37

/** Fetch messages from pull consumer */

38

fetch(stream: string, consumer: string, opts?: Partial<FetchOptions>): Promise<FetchMessages>;

39

40

/** Consume messages with automatic ack handling */

41

consume(stream: string, consumer: string, opts?: Partial<ConsumeOptions>): Promise<ConsumerMessages>;

42

43

/** Access to KV and Object Store views */

44

views: Views;

45

}

46

```

47

48

**Usage Examples:**

49

50

```typescript

51

import { connect, StringCodec } from "nats";

52

53

const nc = await connect();

54

const js = nc.jetstream();

55

const sc = StringCodec();

56

57

// Publish to JetStream

58

const pubAck = await js.publish("events.user.created", sc.encode("user-123"), {

59

msgID: "unique-msg-1",

60

timeout: 5000

61

});

62

console.log(`Published to stream: ${pubAck.stream}, sequence: ${pubAck.seq}`);

63

64

// Subscribe with push consumer

65

const sub = await js.subscribe("events.user.*", {

66

durable: "user-processor",

67

deliverSubject: "process.users"

68

});

69

70

(async () => {

71

for await (const m of sub) {

72

console.log(`Processing: ${sc.decode(m.data)}`);

73

m.ack(); // Acknowledge message

74

}

75

})();

76

77

// Pull subscription

78

const pullSub = await js.pullSubscribe("events.order.*", {

79

durable: "order-worker"

80

});

81

82

// Pull messages in batches

83

pullSub.pull({ batch: 10, max_wait: 5000 });

84

(async () => {

85

for await (const m of pullSub) {

86

console.log(`Order event: ${sc.decode(m.data)}`);

87

m.ack();

88

}

89

})();

90

```

91

92

### JetStream Publishing

93

94

Publish messages to streams with acknowledgment and deduplication support.

95

96

```typescript { .api }

97

/**

98

* Publish message to JetStream stream

99

* @param subj - Subject to publish to (must match stream subjects)

100

* @param payload - Message data

101

* @param options - Publishing options including deduplication and expectations

102

* @returns Promise resolving to publish acknowledgment

103

*/

104

publish(

105

subj: string,

106

payload?: Payload,

107

options?: Partial<JetStreamPublishOptions>

108

): Promise<PubAck>;

109

110

interface JetStreamPublishOptions {

111

/** Message ID for deduplication within duplicate_window */

112

msgID: string;

113

/** Publish timeout in milliseconds */

114

timeout: number;

115

/** Message headers */

116

headers: MsgHdrs;

117

/** Expectations for conditional publishing */

118

expect: Partial<{

119

/** Expected last message ID in stream */

120

lastMsgID: string;

121

/** Expected stream name */

122

streamName: string;

123

/** Expected last sequence number in stream */

124

lastSequence: number;

125

/** Expected last sequence for this subject */

126

lastSubjectSequence: number;

127

}>;

128

}

129

130

interface PubAck {

131

/** Stream that stored the message */

132

stream: string;

133

/** JetStream domain */

134

domain?: string;

135

/** Sequence number assigned to message */

136

seq: number;

137

/** True if message was deduplicated */

138

duplicate: boolean;

139

}

140

```

141

142

**Usage Examples:**

143

144

```typescript

145

import { connect, JSONCodec, headers } from "nats";

146

147

const nc = await connect();

148

const js = nc.jetstream();

149

const jc = JSONCodec();

150

151

// Basic publish

152

const ack = await js.publish("orders.created", jc.encode({

153

orderId: "ord-123",

154

amount: 99.99

155

}));

156

157

// Publish with deduplication

158

const ack = await js.publish("orders.created", jc.encode(orderData), {

159

msgID: `order-${orderId}`,

160

timeout: 10000

161

});

162

163

// Conditional publish (only if last sequence matches)

164

try {

165

const ack = await js.publish("inventory.updated", jc.encode(inventoryData), {

166

expect: {

167

lastSubjectSequence: 42

168

}

169

});

170

} catch (err) {

171

if (err.code === "10071") {

172

console.log("Sequence mismatch - concurrent update detected");

173

}

174

}

175

176

// Publish with headers

177

const hdrs = headers();

178

hdrs.set("source", "order-service");

179

hdrs.set("version", "1.0");

180

181

const ack = await js.publish("events.order.updated", jc.encode(updateData), {

182

headers: hdrs,

183

msgID: `update-${updateId}`

184

});

185

```

186

187

### Consumer Management

188

189

Create and configure consumers for message delivery patterns.

190

191

```typescript { .api }

192

/** Create consumer options builder */

193

function consumerOpts(): ConsumerOptsBuilder;

194

195

interface ConsumerOptsBuilder {

196

/** Set consumer name (durable or ephemeral) */

197

durable(name: string): ConsumerOptsBuilder;

198

/** Set delivery subject for push consumers */

199

deliverSubject(subject: string): ConsumerOptsBuilder;

200

/** Set queue group for load balancing */

201

queue(name: string): ConsumerOptsBuilder;

202

/** Set acknowledgment policy */

203

ackPolicy(policy: AckPolicy): ConsumerOptsBuilder;

204

/** Set delivery policy */

205

deliverPolicy(policy: DeliverPolicy): ConsumerOptsBuilder;

206

/** Set replay policy */

207

replayPolicy(policy: ReplayPolicy): ConsumerOptsBuilder;

208

/** Set message filter subject */

209

filterSubject(subject: string): ConsumerOptsBuilder;

210

/** Set starting sequence number */

211

startSequence(seq: number): ConsumerOptsBuilder;

212

/** Set starting time */

213

startTime(time: Date): ConsumerOptsBuilder;

214

/** Set acknowledgment wait time */

215

ackWait(millis: number): ConsumerOptsBuilder;

216

/** Set maximum delivery attempts */

217

maxDeliver(max: number): ConsumerOptsBuilder;

218

/** Set maximum pending acknowledgments */

219

maxAckPending(max: number): ConsumerOptsBuilder;

220

/** Set idle heartbeat interval */

221

idleHeartbeat(millis: number): ConsumerOptsBuilder;

222

/** Set flow control */

223

flowControl(): ConsumerOptsBuilder;

224

/** Set deliver group (like queue but for push consumers) */

225

deliverGroup(name: string): ConsumerOptsBuilder;

226

/** Set manual acknowledgment mode */

227

manualAck(): ConsumerOptsBuilder;

228

/** Bind to existing consumer */

229

bind(stream: string, consumer: string): ConsumerOptsBuilder;

230

}

231

232

enum AckPolicy {

233

/** No acknowledgment required */

234

None = "none",

235

/** Acknowledge all messages up to this one */

236

All = "all",

237

/** Acknowledge this specific message */

238

Explicit = "explicit"

239

}

240

241

enum DeliverPolicy {

242

/** Deliver all messages */

243

All = "all",

244

/** Deliver only the last message */

245

Last = "last",

246

/** Deliver only new messages */

247

New = "new",

248

/** Start from specific sequence */

249

ByStartSequence = "by_start_sequence",

250

/** Start from specific time */

251

ByStartTime = "by_start_time",

252

/** Last message per subject */

253

LastPerSubject = "last_per_subject"

254

}

255

256

enum ReplayPolicy {

257

/** Deliver as fast as possible */

258

Instant = "instant",

259

/** Replay at original timing */

260

Original = "original"

261

}

262

```

263

264

**Usage Examples:**

265

266

```typescript

267

import { connect, consumerOpts, AckPolicy, DeliverPolicy } from "nats";

268

269

const nc = await connect();

270

const js = nc.jetstream();

271

272

// Durable push consumer

273

const opts = consumerOpts()

274

.durable("order-processor")

275

.deliverSubject("process.orders")

276

.ackPolicy(AckPolicy.Explicit)

277

.deliverPolicy(DeliverPolicy.New)

278

.maxDeliver(3)

279

.ackWait(30000);

280

281

const sub = await js.subscribe("orders.*", opts);

282

283

// Durable pull consumer for a work queue (explicit acks, capped pending acks)

284

const pullOpts = consumerOpts()

285

.durable("work-queue")

286

.ackPolicy(AckPolicy.Explicit)

287

.maxAckPending(100);

288

289

const pullSub = await js.pullSubscribe("work.jobs", pullOpts);

290

291

// Ephemeral consumer starting from specific time

292

const ephemeralOpts = consumerOpts()

293

.deliverPolicy(DeliverPolicy.ByStartTime)

294

.startTime(new Date("2023-01-01"))

295

.filterSubject("logs.error.*");

296

297

const logSub = await js.subscribe("logs.*", ephemeralOpts);

298

299

// Bind to existing consumer

300

const bindOpts = consumerOpts()

301

.bind("events", "existing-consumer");

302

303

const boundSub = await js.subscribe("", bindOpts);

304

```

305

306

### JetStream Subscriptions

307

308

Handle JetStream messages with acknowledgment and flow control.

309

310

```typescript { .api }

311

interface JetStreamSubscription {

312

/** Standard subscription interface */

313

unsubscribe(max?: number): void;

314

drain(): Promise<void>;

315

isClosed(): boolean;

316

317

/** JetStream specific features */

318

destroy(): Promise<void>;

319

closed: Promise<void>;

320

consumerInfo(): Promise<ConsumerInfo>;

321

322

/** Async iterator for processing messages */

323

[Symbol.asyncIterator](): AsyncIterableIterator<JsMsg>;

324

}

325

326

interface JetStreamPullSubscription extends JetStreamSubscription {

327

/** Pull messages from server */

328

pull(opts?: Partial<PullOptions>): void;

329

}

330

331

interface JsMsg {

332

/** Message subject */

333

subject: string;

334

/** Message data */

335

data: Uint8Array;

336

/** Reply subject */

337

reply?: string;

338

/** Message headers */

339

headers?: MsgHdrs;

340

/** Message sequence in stream */

341

seq: number;

342

/** True if this message has been redelivered */

343

redelivered: boolean;

344

/** Consumer info */

345

info: DeliveryInfo;

346

347

/** Acknowledge message processing */

348

ack(): void;

349

/** Negative acknowledge (redelivery) */

350

nak(millis?: number): void;

351

/** Working indicator (extend ack wait) */

352

working(): void;

353

/** Terminate message processing */

354

term(): void;

355

/** Acknowledge and request next message */

356

ackSync(): void;

357

}

358

359

interface PullOptions {

360

/** Number of messages to request */

361

batch: number;

362

/** Maximum time to wait for messages */

363

max_wait: number;

364

/** Don't wait if no messages available */

365

no_wait: boolean;

366

/** Maximum bytes to return */

367

max_bytes: number;

368

/** Idle heartbeat timeout */

369

idle_heartbeat: number;

370

}

371

372

interface DeliveryInfo {

373

/** Stream name */

374

stream: string;

375

/** Consumer name */

376

consumer: string;

377

/** Message sequence in stream */

378

streamSequence: number;

379

/** Consumer sequence */

380

consumerSequence: number;

381

/** Number of delivery attempts */

382

deliverySequence: number;

383

/** Message timestamp */

384

timestampNanos: number;

385

/** Number of pending messages */

386

pending: number;

387

/** Redelivered flag */

388

redelivered: boolean;

389

}

390

```

391

392

**Usage Examples:**

393

394

```typescript

395

import { connect } from "nats";

396

397

const nc = await connect();

398

const js = nc.jetstream();

399

400

// Process messages with explicit acks

401

const sub = await js.subscribe("orders.*", { durable: "order-handler" });

402

(async () => {

403

for await (const m of sub) {

404

try {

405

// Process the message

406

await processOrder(m.data);

407

m.ack(); // Acknowledge successful processing

408

} catch (err) {

409

console.error(`Processing failed: ${err.message}`);

410

if (m.info.deliverySequence >= 3) {

411

m.term(); // Terminate after 3 attempts

412

} else {

413

m.nak(5000); // Negative ack, retry in 5 seconds

414

}

415

}

416

}

417

})();

418

419

// Pull subscription with batching

420

const pullSub = await js.pullSubscribe("events.*", { durable: "event-worker" });

421

422

// Request messages in batches

423

pullSub.pull({ batch: 50, max_wait: 1000 });

424

425

(async () => {

426

for await (const m of pullSub) {

427

console.log(`Event: ${m.subject}, Sequence: ${m.seq}`);

428

429

// Send working indicator for long processing

430

if (needsLongProcessing(m)) {

431

m.working(); // Extend ack wait time

432

}

433

434

await processEvent(m);

435

m.ack();

436

437

// Request more messages when batch is nearly consumed

438

if (m.info.pending < 10) {

439

pullSub.pull({ batch: 50, max_wait: 1000 });

440

}

441

}

442

})();

443

```

444

445

### JetStream Manager

446

447

Manage streams, consumers, and JetStream account information.

448

449

```typescript { .api }

450

/**

451

* Get JetStream manager from NATS connection

452

* @param opts - Manager options including API validation

453

* @returns Promise resolving to JetStream manager

454

*/

455

jetstreamManager(opts?: JetStreamManagerOptions): Promise<JetStreamManager>;

456

457

interface JetStreamManagerOptions extends JetStreamOptions {

458

/** Whether to check JetStream API availability when creating the manager; set to false to skip the check */

459

checkAPI?: boolean;

460

}

461

462

interface JetStreamManager {

463

/** Stream management API */

464

streams: StreamAPI;

465

/** Consumer management API */

466

consumers: ConsumerAPI;

467

468

/** Get JetStream account information and limits */

469

getAccountInfo(): Promise<JetStreamAccountStats>;

470

/** Monitor JetStream advisories */

471

advisories(): AsyncIterable<Advisory>;

472

/** Get manager configuration */

473

getOptions(): JetStreamOptions;

474

/** Get JetStream client with same options */

475

jetstream(): JetStreamClient;

476

}

477

478

interface JetStreamAccountStats {

479

/** Account limits */

480

limits: AccountLimits;

481

/** Current usage */

482

api: JetStreamApiStats;

483

/** Number of streams */

484

streams: number;

485

/** Number of consumers */

486

consumers: number;

487

/** Number of messages across all streams */

488

messages: number;

489

/** Total bytes across all streams */

490

bytes: number;

491

}

492

493

interface StreamAPI {

494

/** Get stream information */

495

info(stream: string, opts?: Partial<StreamInfoRequestOptions>): Promise<StreamInfo>;

496

/** Create new stream */

497

add(cfg: Partial<StreamConfig>): Promise<StreamInfo>;

498

/** Update stream configuration */

499

update(name: string, cfg: Partial<StreamUpdateConfig>): Promise<StreamInfo>;

500

/** Delete stream */

501

delete(stream: string): Promise<boolean>;

502

/** List all streams */

503

list(subject?: string): Lister<StreamInfo>;

504

/** List stream names only */

505

names(subject?: string): Lister<string>;

506

/** Get stream object */

507

get(name: string): Promise<Stream>;

508

/** Find stream by subject */

509

find(subject: string): Promise<string>;

510

/** Purge stream messages */

511

purge(stream: string, opts?: PurgeOpts): Promise<PurgeResponse>;

512

/** Delete specific message */

513

deleteMessage(stream: string, seq: number, erase?: boolean): Promise<boolean>;

514

/** Get specific message */

515

getMessage(stream: string, query: MsgRequest): Promise<StoredMsg>;

516

}

517

```

518

519

**Usage Examples:**

520

521

```typescript

522

import { connect, StorageType, RetentionPolicy } from "nats";

523

524

const nc = await connect();

525

const jsm = await nc.jetstreamManager();

526

527

// Create a stream

528

const streamInfo = await jsm.streams.add({

529

name: "events",

530

subjects: ["events.*"],

531

storage: StorageType.File,

532

retention: RetentionPolicy.Limits,

533

max_msgs: 100000,

534

max_bytes: 1024 * 1024 * 100, // 100MB

535

max_age: 24 * 60 * 60 * 1000 * 1000 * 1000, // 24 hours in nanoseconds

536

duplicate_window: 5 * 60 * 1000 * 1000 * 1000 // 5 minutes in nanoseconds

537

});

538

539

console.log(`Created stream: ${streamInfo.config.name}`);

540

541

// List all streams

542

const streams = jsm.streams.list();

543

for await (const stream of streams) {

544

console.log(`Stream: ${stream.config.name}, Messages: ${stream.state.messages}`);

545

}

546

547

// Get account information

548

const accountInfo = await jsm.getAccountInfo();

549

console.log(`Streams: ${accountInfo.streams}, Messages: ${accountInfo.messages}`);

550

551

// Monitor advisories

552

(async () => {

553

for await (const advisory of jsm.advisories()) {

554

console.log(`Advisory: ${advisory.type}`, advisory);

555

}

556

})();

557

558

// Purge old messages

559

const purgeResponse = await jsm.streams.purge("events", {

560

keep: 1000 // Keep last 1000 messages

561

});

562

console.log(`Purged ${purgeResponse.purged} messages`);

563

```

564

565

## Types

566

567

```typescript { .api }

568

interface StreamConfig {

569

name: string;

570

subjects?: string[];

571

retention?: RetentionPolicy;

572

max_consumers?: number;

573

max_msgs?: number;

574

max_bytes?: number;

575

max_age?: number;

576

max_msgs_per_subject?: number;

577

max_msg_size?: number;

578

storage?: StorageType;

579

num_replicas?: number;

580

no_ack?: boolean;

581

discard?: DiscardPolicy;

582

duplicate_window?: number;

583

placement?: Placement;

584

mirror?: StreamSource;

585

sources?: StreamSource[];

586

sealed?: boolean;

587

deny_delete?: boolean;

588

deny_purge?: boolean;

589

allow_rollup_hdrs?: boolean;

590

republish?: Republish;

591

allow_direct?: boolean;

592

mirror_direct?: boolean;

593

subject_transform?: SubjectTransformConfig;

594

compression?: StoreCompression;

595

first_seq?: number;

596

}

597

598

interface ConsumerConfig {

599

name?: string;

600

durable_name?: string;

601

description?: string;

602

deliver_policy?: DeliverPolicy;

603

opt_start_seq?: number;

604

opt_start_time?: string;

605

ack_policy?: AckPolicy;

606

ack_wait?: number;

607

max_deliver?: number;

608

filter_subject?: string;

609

replay_policy?: ReplayPolicy;

610

rate_limit?: number;

611

sample_freq?: string;

612

max_waiting?: number;

613

max_ack_pending?: number;

614

flow_control?: boolean;

615

idle_heartbeat?: number;

616

headers_only?: boolean;

617

max_pull_waiting?: number;

618

deliver_subject?: string;

619

deliver_group?: string;

620

inactive_threshold?: number;

621

num_replicas?: number;

622

mem_storage?: boolean;

623

pause_until?: string;

624

}

625

626

enum RetentionPolicy {

627

Limits = "limits",

628

Interest = "interest",

629

WorkQueue = "workqueue"

630

}

631

632

enum StorageType {

633

File = "file",

634

Memory = "memory"

635

}

636

637

enum DiscardPolicy {

638

Old = "old",

639

New = "new"

640

}

641

642

enum StoreCompression {

643

None = "none",

644

S2 = "s2"

645

}

646

647

interface StoredMsg {

648

subject: string;

649

seq: number;

650

data: Uint8Array;

651

time: Date;

652

headers?: MsgHdrs;

653

}

654

655

interface Advisory {

656

type: AdvisoryKind;

657

id: string;

658

timestamp: Date;

659

stream?: string;

660

consumer?: string;

661

[key: string]: unknown;

662

}

663

664

enum AdvisoryKind {

665

API = "$JS.EVENT.ADVISORY.API",

666

StreamAction = "$JS.EVENT.ADVISORY.STREAM.ACTION",

667

ConsumerAction = "$JS.EVENT.ADVISORY.CONSUMER.ACTION",

668

SnapshotCreate = "$JS.EVENT.ADVISORY.CONSUMER.SNAPSHOT_CREATE",

669

SnapshotComplete = "$JS.EVENT.ADVISORY.CONSUMER.SNAPSHOT_COMPLETE",

670

RestoreCreate = "$JS.EVENT.ADVISORY.STREAM.RESTORE_CREATE",

671

RestoreComplete = "$JS.EVENT.ADVISORY.STREAM.RESTORE_COMPLETE",

672

MaxDeliver = "$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES",

673

Terminated = "$JS.EVENT.ADVISORY.CONSUMER.MSG_TERMINATED",

674

Ack = "$JS.EVENT.METRIC.CONSUMER.ACK",

675

DeliveryExceeded = "$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES",

676

DeliveryTerminated = "$JS.EVENT.ADVISORY.CONSUMER.MSG_TERMINATED",

677

MissedHeartbeat = "$JS.EVENT.ADVISORY.CONSUMER.MISSED_HEARTBEAT"

678

}

679

```