or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

connection.mdindex.mdjetstream.mdkv-store.mdmessaging.mdobject-store.mdservices.md

messaging.mddocs/

0

# Core Messaging

1

2

NATS core messaging provides publish/subscribe functionality with wildcards, queues, request/reply patterns, and message headers for building distributed applications.

3

4

## Capabilities

5

6

### Publishing Messages

7

8

Send messages to subjects with optional payload and headers.

9

10

```typescript { .api }

11

/**

12

* Publish a message to a subject

13

* @param subject - Target subject (can include wildcards for routing)

14

* @param payload - Message data (string or Uint8Array)

15

* @param options - Optional publish configuration

16

*/

17

publish(subject: string, payload?: Payload, options?: PublishOptions): void;

18

19

/**

20

* Publish using message object with subject, data, headers, and reply

21

* @param msg - Message object to publish

22

*/

23

publishMessage(msg: Msg): void;

24

25

interface PublishOptions {

26

/** Reply subject for response */

27

reply?: string;

28

/** Message headers */

29

headers?: MsgHdrs;

30

}

31

32

type Payload = Uint8Array | string;

33

```

34

35

**Usage Examples:**

36

37

```typescript

38

import { connect, StringCodec, headers } from "nats";

39

40

const nc = await connect();

41

const sc = StringCodec();

42

43

// Simple publish

44

nc.publish("news.updates", sc.encode("Breaking news!"));

45

46

// Publish with reply subject

47

nc.publish("weather.request", sc.encode("NYC"), {

48

reply: "weather.reply"

49

});

50

51

// Publish with headers

52

const hdrs = headers();

53

hdrs.set("content-type", "application/json");

54

hdrs.set("priority", "high");

55

56

nc.publish("api.user.create", sc.encode(JSON.stringify(userData)), {

57

headers: hdrs

58

});

59

60

// Publish binary data

61

const binaryData = new Uint8Array([1, 2, 3, 4, 5]);

62

nc.publish("binary.data", binaryData);

63

```

64

65

### Subscribing to Messages

66

67

Create subscriptions to receive messages from subjects with wildcard support and queue groups.

68

69

```typescript { .api }

70

/**

71

* Subscribe to a subject and receive messages

72

* @param subject - Subject pattern (supports wildcards * and >)

73

* @param opts - Subscription options

74

* @returns Subscription instance for managing the subscription

75

*/

76

subscribe(subject: string, opts?: SubscriptionOptions): Subscription;

77

78

interface SubscriptionOptions {

79

/** Queue group name for load balancing */

80

queue?: string;

81

/** Maximum messages before auto-unsubscribe */

82

max?: number;

83

/** Timeout in milliseconds for first message */

84

timeout?: number;

85

/** Message callback function (alternative to async iteration) */

86

callback?: (err: NatsError | null, msg: Msg) => void;

87

}

88

89

interface Subscription {

90

/** Unsubscribe from receiving messages */

91

unsubscribe(max?: number): void;

92

/** Drain subscription (process pending, then close) */

93

drain(): Promise<void>;

94

/** Check if subscription is closed */

95

isClosed(): boolean;

96

/** Get subscription subject */

97

getSubject(): string;

98

/** Get total messages received */

99

getReceived(): number;

100

/** Get messages processed by callback/iterator */

101

getProcessed(): number;

102

/** Get pending messages in queue */

103

getPending(): number;

104

/** Get subscription ID */

105

getID(): number;

106

/** Get max messages setting */

107

getMax(): number | undefined;

108

109

/** Async iterator for processing messages */

110

[Symbol.asyncIterator](): AsyncIterableIterator<Msg>;

111

}

112

```

113

114

**Usage Examples:**

115

116

```typescript

117

import { connect, StringCodec } from "nats";

118

119

const nc = await connect();

120

const sc = StringCodec();

121

122

// Basic subscription with async iteration

123

const sub = nc.subscribe("news.*");

124

(async () => {

125

for await (const m of sub) {

126

console.log(`Subject: ${m.subject}, Data: ${sc.decode(m.data)}`);

127

}

128

})();

129

130

// Queue subscription for load balancing

131

const queueSub = nc.subscribe("work.jobs", { queue: "workers" });

132

(async () => {

133

for await (const m of queueSub) {

134

console.log(`Processing job: ${sc.decode(m.data)}`);

135

// Simulate work

136

await new Promise(resolve => setTimeout(resolve, 1000));

137

}

138

})();

139

140

// Subscription with callback

141

const callbackSub = nc.subscribe("alerts.*", {

142

callback: (err, msg) => {

143

if (err) {

144

console.error("Subscription error:", err);

145

return;

146

}

147

console.log(`Alert: ${sc.decode(msg.data)}`);

148

}

149

});

150

151

// Limited subscription (auto-unsubscribe after 10 messages)

152

const limitedSub = nc.subscribe("limited.messages", { max: 10 });

153

154

// Wildcard subscriptions

155

const allNews = nc.subscribe("news.*"); // news.sports, news.weather

156

const everything = nc.subscribe(">"); // all subjects

157

const userEvents = nc.subscribe("user.*.event"); // user.123.event, user.456.event

158

```

159

160

### Request/Reply Pattern

161

162

Synchronous and asynchronous request/reply messaging for RPC-style communication.

163

164

```typescript { .api }

165

/**

166

* Send request and wait for single response

167

* @param subject - Request subject

168

* @param payload - Request data

169

* @param opts - Request options including timeout

170

* @returns Promise resolving to response message

171

*/

172

request(subject: string, payload?: Payload, opts?: RequestOptions): Promise<Msg>;

173

174

/**

175

* Send request expecting multiple responses

176

* @param subject - Request subject

177

* @param payload - Request data

178

* @param opts - Request options including strategy and limits

179

* @returns Promise resolving to async iterable of responses

180

*/

181

requestMany(

182

subject: string,

183

payload?: Payload,

184

opts?: Partial<RequestManyOptions>

185

): Promise<AsyncIterable<Msg>>;

186

187

/**

188

* Respond to a message using its reply subject

189

* @param msg - Original message to respond to

190

* @returns True if response was sent

191

*/

192

respondMessage(msg: Msg): boolean;

193

194

interface RequestOptions {

195

/** Request timeout in milliseconds */

196

timeout: number;

197

/** Request headers */

198

headers?: MsgHdrs;

199

/** Use dedicated subscription instead of shared mux */

200

noMux?: boolean;

201

/** Custom reply subject (requires noMux) */

202

reply?: string;

203

}

204

205

interface RequestManyOptions {

206

/** Strategy for determining when to stop collecting responses */

207

strategy: RequestStrategy;

208

/** Maximum time to wait for responses */

209

maxWait: number;

210

/** Request headers */

211

headers?: MsgHdrs;

212

/** Maximum number of responses to collect */

213

maxMessages?: number;

214

/** Use dedicated subscription instead of shared mux */

215

noMux?: boolean;

216

/** Jitter for timer-based strategies */

217

jitter?: number;

218

}

219

220

enum RequestStrategy {

221

/** Stop after specified time */

222

Timer = "timer",

223

/** Stop after specified message count */

224

Count = "count",

225

/** Stop after time with random jitter */

226

JitterTimer = "jitterTimer",

227

/** Stop when sentinel message received */

228

SentinelMsg = "sentinelMsg"

229

}

230

```

231

232

**Usage Examples:**

233

234

```typescript

235

import { connect, StringCodec, RequestStrategy } from "nats";

236

237

const nc = await connect();

238

const sc = StringCodec();

239

240

// Simple request/reply

241

try {

242

const response = await nc.request("time", sc.encode(""), { timeout: 1000 });

243

console.log(`Current time: ${sc.decode(response.data)}`);

244

} catch (err) {

245

console.error("Request failed:", err);

246

}

247

248

// Request with multiple responses

249

const responses = await nc.requestMany("services.ping", sc.encode(""), {

250

maxWait: 2000,

251

strategy: RequestStrategy.Timer

252

});

253

254

for await (const response of responses) {

255

console.log(`Service: ${sc.decode(response.data)}`);

256

}

257

258

// Service responder

259

const serviceSub = nc.subscribe("time");

260

(async () => {

261

for await (const m of serviceSub) {

262

const currentTime = new Date().toISOString();

263

m.respond(sc.encode(currentTime));

264

}

265

})();

266

267

// Service with error handling

268

const calcSub = nc.subscribe("math.divide");

269

(async () => {

270

for await (const m of calcSub) {

271

try {

272

const [a, b] = JSON.parse(sc.decode(m.data));

273

if (b === 0) {

274

m.respond(sc.encode(JSON.stringify({ error: "Division by zero" })));

275

} else {

276

m.respond(sc.encode(JSON.stringify({ result: a / b })));

277

}

278

} catch (err) {

279

m.respond(sc.encode(JSON.stringify({ error: "Invalid input" })));

280

}

281

}

282

})();

283

```

284

285

### Message Handling

286

287

Message structure and utilities for processing received messages.

288

289

```typescript { .api }

290

interface Msg {

291

/** Message subject */

292

subject: string;

293

/** Message data as bytes */

294

data: Uint8Array;

295

/** Reply subject if expecting response */

296

reply?: string;

297

/** Message headers if present */

298

headers?: MsgHdrs;

299

/** Sequence ID for JetStream messages */

300

seq?: number;

301

/**

302

* Respond to this message

303

* @param data - Response payload

304

* @param opts - Response options

305

* @returns True if response sent successfully

306

*/

307

respond(data?: Payload, opts?: PublishOptions): boolean;

308

}

309

310

interface MsgHdrs extends Iterable<[string, string[]]> {

311

/** True if message contains error status */

312

hasError: boolean;

313

/** HTTP-style status text */

314

status: string;

315

/** HTTP-style status code */

316

code: number;

317

/** Status description */

318

description: string;

319

320

/** Get header value */

321

get(k: string, match?: Match): string;

322

/** Set header value */

323

set(k: string, v: string, match?: Match): void;

324

/** Append header value */

325

append(k: string, v: string, match?: Match): void;

326

/** Check if header exists */

327

has(k: string, match?: Match): boolean;

328

/** Get all header keys */

329

keys(): string[];

330

/** Get all values for header */

331

values(k: string, match?: Match): string[];

332

/** Delete header */

333

delete(k: string, match?: Match): void;

334

/** Get last value for header */

335

last(k: string, match?: Match): string;

336

}

337

338

enum Match {

339

/** Exact case-sensitive match */

340

Exact = 0,

341

/** Canonical MIME header format */

342

CanonicalMIME,

343

/** Case-insensitive match */

344

IgnoreCase

345

}

346

347

/** Create empty message headers */

348

function headers(): MsgHdrs;

349

350

/** Create canonical MIME header key */

351

function canonicalMIMEHeaderKey(key: string): string;

352

```

353

354

**Usage Examples:**

355

356

```typescript

357

import { connect, StringCodec, headers, JSONCodec } from "nats";

358

359

const nc = await connect();

360

const sc = StringCodec();

361

const jc = JSONCodec();

362

363

// Process messages with headers

364

const sub = nc.subscribe("api.requests");

365

(async () => {

366

for await (const m of sub) {

367

// Check for headers

368

if (m.headers) {

369

const contentType = m.headers.get("content-type");

370

const userId = m.headers.get("user-id");

371

372

console.log(`Content-Type: ${contentType}, User: ${userId}`);

373

374

// Check for error status

375

if (m.headers.hasError) {

376

console.error(`Error: ${m.headers.code} ${m.headers.description}`);

377

continue;

378

}

379

}

380

381

// Process message data

382

const data = sc.decode(m.data);

383

console.log(`Received: ${data} on subject: ${m.subject}`);

384

385

// Respond if reply subject provided

386

if (m.reply) {

387

const response = { status: "processed", timestamp: Date.now() };

388

m.respond(jc.encode(response));

389

}

390

}

391

})();

392

393

// Send message with headers

394

const hdrs = headers();

395

hdrs.set("content-type", "application/json");

396

hdrs.set("user-id", "12345");

397

hdrs.set("priority", "high");

398

399

nc.publish("api.requests", jc.encode({ action: "create", data: {} }), {

400

headers: hdrs,

401

reply: "api.responses"

402

});

403

```

404

405

### Codecs

406

407

Built-in codecs for encoding/decoding message payloads.

408

409

```typescript { .api }

410

interface Codec<T> {

411

encode(d: T): Uint8Array;

412

decode(a: Uint8Array): T;

413

}

414

415

/** String encoding/decoding codec */

416

const StringCodec: Codec<string>;

417

418

/** JSON encoding/decoding codec with type safety */

419

function JSONCodec<T = unknown>(): Codec<T>;

420

```

421

422

**Usage Examples:**

423

424

```typescript

425

import { connect, StringCodec, JSONCodec } from "nats";

426

427

const nc = await connect();

428

const sc = StringCodec();

429

const jc = JSONCodec<{ message: string; timestamp: number }>();

430

431

// String codec

432

const message = "Hello NATS!";

433

nc.publish("text.message", sc.encode(message));

434

435

const sub = nc.subscribe("text.message");

436

(async () => {

437

for await (const m of sub) {

438

const text = sc.decode(m.data);

439

console.log(`Received text: ${text}`);

440

}

441

})();

442

443

// JSON codec with types

444

const jsonData = { message: "Hello", timestamp: Date.now() };

445

nc.publish("json.message", jc.encode(jsonData));

446

447

const jsonSub = nc.subscribe("json.message");

448

(async () => {

449

for await (const m of jsonSub) {

450

const data = jc.decode(m.data); // Typed as { message: string; timestamp: number }

451

console.log(`Message: ${data.message}, Time: ${data.timestamp}`);

452

}

453

})();

454

```

455

456

### Utilities

457

458

Helper functions for common messaging operations.

459

460

```typescript { .api }

461

/** Generate unique inbox subject for replies */

462

function createInbox(): string;

463

464

/** Empty payload constant */

465

const Empty: Uint8Array;

466

467

/** Convert async iterator to sync iterator */

468

function syncIterator<T>(iterator: AsyncIterable<T>): SyncIterator<T>;

469

470

interface SyncIterator<T> {

471

next(): T | null;

472

stop(): void;

473

}

474

```

475

476

**Usage Examples:**

477

478

```typescript

479

import { connect, createInbox, Empty, syncIterator } from "nats";

480

481

const nc = await connect();

482

483

// Generate unique reply subjects

484

const replySubject = createInbox();

485

console.log(replySubject); // "_INBOX.abcd1234..."

486

487

// Publish empty message

488

nc.publish("ping", Empty);

489

490

// Use sync iterator for non-async contexts

491

const sub = nc.subscribe("data.stream", { max: 100 });

492

const iter = syncIterator(sub);

493

494

// Poll for messages synchronously

495

setInterval(() => {

496

const msg = iter.next();

497

if (msg) {

498

console.log(`Got message: ${msg.subject}`);

499

}

500

}, 100);

501

```

502

503

## Error Handling

504

505

```typescript { .api }

506

class NatsError extends Error {

507

name: string;

508

message: string;

509

code: string;

510

chainedError?: Error;

511

api_error?: ApiError;

512

permissionContext?: { operation: string; subject: string; queue?: string };

513

514

/** Check if error is authentication related */

515

isAuthError(): boolean;

516

/** Check if error is permission related */

517

isPermissionError(): boolean;

518

/** Check if error is protocol related */

519

isProtocolError(): boolean;

520

/** Check if error is authentication timeout */

521

isAuthTimeout(): boolean;

522

/** Check if error is JetStream related */

523

isJetStreamError(): boolean;

524

/** Get JetStream API error details */

525

jsError(): ApiError | null;

526

}

527

528

interface ApiError {

529

/** HTTP-style error code */

530

code: number;

531

/** Human-readable description */

532

description: string;

533

/** NATS-specific error code */

534

err_code?: number;

535

}

536

537

enum ErrorCode {

538

// Connection errors

539

ConnectionClosed = "CONNECTION_CLOSED",

540

ConnectionTimeout = "CONNECTION_TIMEOUT",

541

ConnectionRefused = "CONNECTION_REFUSED",

542

543

// Authentication errors

544

BadAuthentication = "BAD_AUTHENTICATION",

545

AuthorizationViolation = "AUTHORIZATION_VIOLATION",

546

PermissionsViolation = "PERMISSIONS_VIOLATION",

547

548

// Protocol errors

549

BadSubject = "BAD_SUBJECT",

550

BadPayload = "BAD_PAYLOAD",

551

MaxPayloadExceeded = "MAX_PAYLOAD_EXCEEDED",

552

553

// Request errors

554

NoResponders = "503",

555

Timeout = "TIMEOUT",

556

RequestError = "REQUEST_ERROR"

557

}

558

559

/** Check if error is NatsError instance */

560

function isNatsError(err: NatsError | Error): err is NatsError;

561

```