0
# Core Messaging
1
2
NATS core messaging provides publish/subscribe functionality with wildcards, queues, request/reply patterns, and message headers for building distributed applications.
3
4
## Capabilities
5
6
### Publishing Messages
7
8
Send messages to subjects with optional payload and headers.
9
10
```typescript { .api }
11
/**
12
* Publish a message to a subject
13
* @param subject - Target subject (must be a literal subject; wildcards are only valid when subscribing)
14
* @param payload - Message data (string or Uint8Array)
15
* @param options - Optional publish configuration
16
*/
17
publish(subject: string, payload?: Payload, options?: PublishOptions): void;
18
19
/**
20
* Publish using message object with subject, data, headers, and reply
21
* @param msg - Message object to publish
22
*/
23
publishMessage(msg: Msg): void;
24
25
interface PublishOptions {
26
/** Reply subject for response */
27
reply?: string;
28
/** Message headers */
29
headers?: MsgHdrs;
30
}
31
32
type Payload = Uint8Array | string;
33
```
34
35
**Usage Examples:**
36
37
```typescript
38
import { connect, StringCodec, headers } from "nats";
39
40
const nc = await connect();
41
const sc = StringCodec();
42
43
// Simple publish
44
nc.publish("news.updates", sc.encode("Breaking news!"));
45
46
// Publish with reply subject
47
nc.publish("weather.request", sc.encode("NYC"), {
48
reply: "weather.reply"
49
});
50
51
// Publish with headers
52
const hdrs = headers();
53
hdrs.set("content-type", "application/json");
54
hdrs.set("priority", "high");
55
56
nc.publish("api.user.create", sc.encode(JSON.stringify(userData)), {
57
headers: hdrs
58
});
59
60
// Publish binary data
61
const binaryData = new Uint8Array([1, 2, 3, 4, 5]);
62
nc.publish("binary.data", binaryData);
63
```
64
65
### Subscribing to Messages
66
67
Create subscriptions to receive messages from subjects with wildcard support and queue groups.
68
69
```typescript { .api }
70
/**
71
* Subscribe to a subject and receive messages
72
* @param subject - Subject pattern (supports wildcards * and >)
73
* @param opts - Subscription options
74
* @returns Subscription instance for managing the subscription
75
*/
76
subscribe(subject: string, opts?: SubscriptionOptions): Subscription;
77
78
interface SubscriptionOptions {
79
/** Queue group name for load balancing */
80
queue?: string;
81
/** Maximum messages before auto-unsubscribe */
82
max?: number;
83
/** Timeout in milliseconds for first message */
84
timeout?: number;
85
/** Message callback function (alternative to async iteration) */
86
callback?: (err: NatsError | null, msg: Msg) => void;
87
}
88
89
interface Subscription {
90
/** Unsubscribe from receiving messages */
91
unsubscribe(max?: number): void;
92
/** Drain subscription (process pending, then close) */
93
drain(): Promise<void>;
94
/** Check if subscription is closed */
95
isClosed(): boolean;
96
/** Get subscription subject */
97
getSubject(): string;
98
/** Get total messages received */
99
getReceived(): number;
100
/** Get messages processed by callback/iterator */
101
getProcessed(): number;
102
/** Get pending messages in queue */
103
getPending(): number;
104
/** Get subscription ID */
105
getID(): number;
106
/** Get max messages setting */
107
getMax(): number | undefined;
108
109
/** Async iterator for processing messages */
110
[Symbol.asyncIterator](): AsyncIterableIterator<Msg>;
111
}
112
```
113
114
**Usage Examples:**
115
116
```typescript
117
import { connect, StringCodec } from "nats";
118
119
const nc = await connect();
120
const sc = StringCodec();
121
122
// Basic subscription with async iteration
123
const sub = nc.subscribe("news.*");
124
(async () => {
125
for await (const m of sub) {
126
console.log(`Subject: ${m.subject}, Data: ${sc.decode(m.data)}`);
127
}
128
})();
129
130
// Queue subscription for load balancing
131
const queueSub = nc.subscribe("work.jobs", { queue: "workers" });
132
(async () => {
133
for await (const m of queueSub) {
134
console.log(`Processing job: ${sc.decode(m.data)}`);
135
// Simulate work
136
await new Promise(resolve => setTimeout(resolve, 1000));
137
}
138
})();
139
140
// Subscription with callback
141
const callbackSub = nc.subscribe("alerts.*", {
142
callback: (err, msg) => {
143
if (err) {
144
console.error("Subscription error:", err);
145
return;
146
}
147
console.log(`Alert: ${sc.decode(msg.data)}`);
148
}
149
});
150
151
// Limited subscription (auto-unsubscribe after 10 messages)
152
const limitedSub = nc.subscribe("limited.messages", { max: 10 });
153
154
// Wildcard subscriptions
155
const allNews = nc.subscribe("news.*"); // news.sports, news.weather
156
const everything = nc.subscribe(">"); // all subjects
157
const userEvents = nc.subscribe("user.*.event"); // user.123.event, user.456.event
158
```
159
160
### Request/Reply Pattern
161
162
Request/reply messaging for RPC-style communication: `request` resolves with a single response, while `requestMany` collects multiple responses.
163
164
```typescript { .api }
165
/**
166
* Send request and wait for single response
167
* @param subject - Request subject
168
* @param payload - Request data
169
* @param opts - Request options including timeout
170
* @returns Promise resolving to response message
171
*/
172
request(subject: string, payload?: Payload, opts?: RequestOptions): Promise<Msg>;
173
174
/**
175
* Send request expecting multiple responses
176
* @param subject - Request subject
177
* @param payload - Request data
178
* @param opts - Request options including strategy and limits
179
* @returns Promise resolving to async iterable of responses
180
*/
181
requestMany(
182
subject: string,
183
payload?: Payload,
184
opts?: Partial<RequestManyOptions>
185
): Promise<AsyncIterable<Msg>>;
186
187
/**
188
* Respond to a message using its reply subject
189
* @param msg - Original message to respond to
190
* @returns True if response was sent
191
*/
192
respondMessage(msg: Msg): boolean;
193
194
interface RequestOptions {
195
/** Request timeout in milliseconds */
196
timeout: number;
197
/** Request headers */
198
headers?: MsgHdrs;
199
/** Use dedicated subscription instead of shared mux */
200
noMux?: boolean;
201
/** Custom reply subject (requires noMux) */
202
reply?: string;
203
}
204
205
interface RequestManyOptions {
206
/** Strategy for determining when to stop collecting responses */
207
strategy: RequestStrategy;
208
/** Maximum time to wait for responses */
209
maxWait: number;
210
/** Request headers */
211
headers?: MsgHdrs;
212
/** Maximum number of responses to collect */
213
maxMessages?: number;
214
/** Use dedicated subscription instead of shared mux */
215
noMux?: boolean;
216
/** Jitter for timer-based strategies */
217
jitter?: number;
218
}
219
220
enum RequestStrategy {
221
/** Stop after specified time */
222
Timer = "timer",
223
/** Stop after specified message count */
224
Count = "count",
225
/** Stop once no further response arrives within `jitter` ms of the previous one */
226
JitterTimer = "jitterTimer",
227
/** Stop when sentinel message received */
228
SentinelMsg = "sentinelMsg"
229
}
230
```
231
232
**Usage Examples:**
233
234
```typescript
235
import { connect, StringCodec, RequestStrategy } from "nats";
236
237
const nc = await connect();
238
const sc = StringCodec();
239
240
// Simple request/reply
241
try {
242
const response = await nc.request("time", sc.encode(""), { timeout: 1000 });
243
console.log(`Current time: ${sc.decode(response.data)}`);
244
} catch (err) {
245
console.error("Request failed:", err);
246
}
247
248
// Request with multiple responses
249
const responses = await nc.requestMany("services.ping", sc.encode(""), {
250
maxWait: 2000,
251
strategy: RequestStrategy.Timer
252
});
253
254
for await (const response of responses) {
255
console.log(`Service: ${sc.decode(response.data)}`);
256
}
257
258
// Service responder
259
const serviceSub = nc.subscribe("time");
260
(async () => {
261
for await (const m of serviceSub) {
262
const currentTime = new Date().toISOString();
263
m.respond(sc.encode(currentTime));
264
}
265
})();
266
267
// Service with error handling
268
const calcSub = nc.subscribe("math.divide");
269
(async () => {
270
for await (const m of calcSub) {
271
try {
272
const [a, b] = JSON.parse(sc.decode(m.data));
273
if (b === 0) {
274
m.respond(sc.encode(JSON.stringify({ error: "Division by zero" })));
275
} else {
276
m.respond(sc.encode(JSON.stringify({ result: a / b })));
277
}
278
} catch (err) {
279
m.respond(sc.encode(JSON.stringify({ error: "Invalid input" })));
280
}
281
}
282
})();
283
```
284
285
### Message Handling
286
287
Message structure and utilities for processing received messages.
288
289
```typescript { .api }
290
interface Msg {
291
/** Message subject */
292
subject: string;
293
/** Message data as bytes */
294
data: Uint8Array;
295
/** Reply subject if expecting response */
296
reply?: string;
297
/** Message headers if present */
298
headers?: MsgHdrs;
299
/** Sequence ID for JetStream messages */
300
seq?: number;
301
/**
302
* Respond to this message
303
* @param data - Response payload
304
* @param opts - Response options
305
* @returns True if response sent successfully
306
*/
307
respond(data?: Payload, opts?: PublishOptions): boolean;
308
}
309
310
interface MsgHdrs extends Iterable<[string, string[]]> {
311
/** True if message contains error status */
312
hasError: boolean;
313
/** HTTP-style status text */
314
status: string;
315
/** HTTP-style status code */
316
code: number;
317
/** Status description */
318
description: string;
319
320
/** Get header value */
321
get(k: string, match?: Match): string;
322
/** Set header value */
323
set(k: string, v: string, match?: Match): void;
324
/** Append header value */
325
append(k: string, v: string, match?: Match): void;
326
/** Check if header exists */
327
has(k: string, match?: Match): boolean;
328
/** Get all header keys */
329
keys(): string[];
330
/** Get all values for header */
331
values(k: string, match?: Match): string[];
332
/** Delete header */
333
delete(k: string, match?: Match): void;
334
/** Get last value for header */
335
last(k: string, match?: Match): string;
336
}
337
338
enum Match {
339
/** Exact case-sensitive match */
340
Exact = 0,
341
/** Canonical MIME header format */
342
CanonicalMIME,
343
/** Case-insensitive match */
344
IgnoreCase
345
}
346
347
/** Create empty message headers */
348
function headers(): MsgHdrs;
349
350
/** Create canonical MIME header key */
351
function canonicalMIMEHeaderKey(key: string): string;
352
```
353
354
**Usage Examples:**
355
356
```typescript
357
import { connect, StringCodec, headers, JSONCodec } from "nats";
358
359
const nc = await connect();
360
const sc = StringCodec();
361
const jc = JSONCodec();
362
363
// Process messages with headers
364
const sub = nc.subscribe("api.requests");
365
(async () => {
366
for await (const m of sub) {
367
// Check for headers
368
if (m.headers) {
369
const contentType = m.headers.get("content-type");
370
const userId = m.headers.get("user-id");
371
372
console.log(`Content-Type: ${contentType}, User: ${userId}`);
373
374
// Check for error status
375
if (m.headers.hasError) {
376
console.error(`Error: ${m.headers.code} ${m.headers.description}`);
377
continue;
378
}
379
}
380
381
// Process message data
382
const data = sc.decode(m.data);
383
console.log(`Received: ${data} on subject: ${m.subject}`);
384
385
// Respond if reply subject provided
386
if (m.reply) {
387
const response = { status: "processed", timestamp: Date.now() };
388
m.respond(jc.encode(response));
389
}
390
}
391
})();
392
393
// Send message with headers
394
const hdrs = headers();
395
hdrs.set("content-type", "application/json");
396
hdrs.set("user-id", "12345");
397
hdrs.set("priority", "high");
398
399
nc.publish("api.requests", jc.encode({ action: "create", data: {} }), {
400
headers: hdrs,
401
reply: "api.responses"
402
});
403
```
404
405
### Codecs
406
407
Built-in codecs for encoding/decoding message payloads.
408
409
```typescript { .api }
410
interface Codec<T> {
411
encode(d: T): Uint8Array;
412
decode(a: Uint8Array): T;
413
}
414
415
/** String encoding/decoding codec */
416
function StringCodec(): Codec<string>;
417
418
/** JSON encoding/decoding codec with type safety */
419
function JSONCodec<T = unknown>(): Codec<T>;
420
```
421
422
**Usage Examples:**
423
424
```typescript
425
import { connect, StringCodec, JSONCodec } from "nats";
426
427
const nc = await connect();
428
const sc = StringCodec();
429
const jc = JSONCodec<{ message: string; timestamp: number }>();
430
431
// String codec
432
const message = "Hello NATS!";
433
nc.publish("text.message", sc.encode(message));
434
435
const sub = nc.subscribe("text.message");
436
(async () => {
437
for await (const m of sub) {
438
const text = sc.decode(m.data);
439
console.log(`Received text: ${text}`);
440
}
441
})();
442
443
// JSON codec with types
444
const jsonData = { message: "Hello", timestamp: Date.now() };
445
nc.publish("json.message", jc.encode(jsonData));
446
447
const jsonSub = nc.subscribe("json.message");
448
(async () => {
449
for await (const m of jsonSub) {
450
const data = jc.decode(m.data); // Typed as { message: string; timestamp: number }
451
console.log(`Message: ${data.message}, Time: ${data.timestamp}`);
452
}
453
})();
454
```
455
456
### Utilities
457
458
Helper functions for common messaging operations.
459
460
```typescript { .api }
461
/** Generate unique inbox subject for replies */
462
function createInbox(): string;
463
464
/** Empty payload constant */
465
const Empty: Uint8Array;
466
467
/** Wrap an async iterable so items can be pulled one at a time by awaiting next() */
468
function syncIterator<T>(iterator: AsyncIterable<T>): SyncIterator<T>;
469
470
interface SyncIterator<T> {
471
next(): Promise<T | null>;
472
stop(): void;
473
}
474
```
475
476
**Usage Examples:**
477
478
```typescript
479
import { connect, createInbox, Empty, syncIterator } from "nats";
480
481
const nc = await connect();
482
483
// Generate unique reply subjects
484
const replySubject = createInbox();
485
console.log(replySubject); // "_INBOX.abcd1234..."
486
487
// Publish empty message
488
nc.publish("ping", Empty);
489
490
// Use syncIterator to pull messages on demand instead of a for-await loop
491
const sub = nc.subscribe("data.stream", { max: 100 });
492
const iter = syncIterator(sub);
493
494
// Poll for the next message on a timer
494
setInterval(async () => {
496
const msg = await iter.next();
497
if (msg) {
498
console.log(`Got message: ${msg.subject}`);
499
}
500
}, 100);
501
```
502
503
## Error Handling
504
505
```typescript { .api }
506
class NatsError extends Error {
507
name: string;
508
message: string;
509
code: string;
510
chainedError?: Error;
511
api_error?: ApiError;
512
permissionContext?: { operation: string; subject: string; queue?: string };
513
514
/** Check if error is authentication related */
515
isAuthError(): boolean;
516
/** Check if error is permission related */
517
isPermissionError(): boolean;
518
/** Check if error is protocol related */
519
isProtocolError(): boolean;
520
/** Check if error is authentication timeout */
521
isAuthTimeout(): boolean;
522
/** Check if error is JetStream related */
523
isJetStreamError(): boolean;
524
/** Get JetStream API error details */
525
jsError(): ApiError | null;
526
}
527
528
interface ApiError {
529
/** HTTP-style error code */
530
code: number;
531
/** Human-readable description */
532
description: string;
533
/** NATS-specific error code */
534
err_code?: number;
535
}
536
537
enum ErrorCode {
538
// Connection errors
539
ConnectionClosed = "CONNECTION_CLOSED",
540
ConnectionTimeout = "CONNECTION_TIMEOUT",
541
ConnectionRefused = "CONNECTION_REFUSED",
542
543
// Authentication errors
544
BadAuthentication = "BAD_AUTHENTICATION",
545
AuthorizationViolation = "AUTHORIZATION_VIOLATION",
546
PermissionsViolation = "PERMISSIONS_VIOLATION",
547
548
// Protocol errors
549
BadSubject = "BAD_SUBJECT",
550
BadPayload = "BAD_PAYLOAD",
551
MaxPayloadExceeded = "MAX_PAYLOAD_EXCEEDED",
552
553
// Request errors
554
NoResponders = "503",
555
Timeout = "TIMEOUT",
556
RequestError = "REQUEST_ERROR"
557
}
558
559
/** Check if error is NatsError instance */
560
function isNatsError(err: NatsError | Error): err is NatsError;
561
```