0
# Message Handling
1
2
Message processing with headers, reply handling, JetStream acknowledgments, and comprehensive metadata access for building sophisticated message-driven applications.
3
4
## Capabilities
5
6
### Message Structure
7
8
Understanding NATS message structure and properties.
9
10
```python { .api }
11
class Msg:
12
"""NATS message representation."""
13
subject: str
14
data: bytes
15
reply: str
16
headers: Optional[Dict[str, str]]
17
header: Optional[Dict[str, str]] # Alias for headers
18
sid: int
19
is_acked: bool
20
metadata: Optional[Metadata] # JetStream metadata
21
```
22
23
#### Usage Examples
24
25
```python
26
async def message_handler(msg):
27
print(f"Received message:")
28
print(f" Subject: {msg.subject}")
29
print(f" Data: {msg.data.decode()}")
30
print(f" Reply: {msg.reply}")
31
print(f" Headers: {msg.headers}")
32
print(f" Subscription ID: {msg.sid}")
33
34
# Access JetStream metadata if available
35
if msg.metadata:
36
print(f" Stream: {msg.metadata.stream}")
37
print(f" Sequence: {msg.metadata.sequence.stream}")
38
print(f" Consumer: {msg.metadata.sequence.consumer}")
39
40
# Subscribe and handle messages
41
await nc.subscribe("events.*", cb=message_handler)
42
```
43
44
### Response Handling
45
46
Send responses to messages with reply subjects.
47
48
```python { .api }
49
class Msg:
50
async def respond(self, data: bytes) -> None:
51
"""
52
Send response to message reply subject.
53
54
Parameters:
55
- data: Response data
56
57
Raises:
58
- ValueError: No reply subject available
59
"""
60
```
61
62
#### Usage Examples
63
64
```python
65
async def request_handler(msg):
66
try:
67
# Process request
68
request_data = json.loads(msg.data.decode())
69
result = await process_request(request_data)
70
71
# Send response
72
response = json.dumps(result).encode()
73
await msg.respond(response)
74
75
except Exception as e:
76
# Send error response
77
error_response = json.dumps({
78
"error": str(e),
79
"type": type(e).__name__
80
}).encode()
81
await msg.respond(error_response)
82
83
# Subscribe to requests
84
await nc.subscribe("api.requests", cb=request_handler)
85
86
# Client making request
87
response = await nc.request("api.requests", b'{"action": "get_user", "id": 123}')
88
result = json.loads(response.data.decode())
89
```
90
91
### Header Processing
92
93
Work with message headers for metadata and routing.
94
95
```python { .api }
96
class Msg:
97
headers: Optional[Dict[str, str]]
98
header: Optional[Dict[str, str]] # Alias for headers property
99
```
100
101
#### Usage Examples
102
103
```python
104
async def header_aware_handler(msg):
105
# Check for authentication header
106
auth_token = msg.headers.get("Authorization") if msg.headers else None
107
if not auth_token:
108
print("No authorization header")
109
return
110
111
# Check content type
112
content_type = msg.headers.get("Content-Type", "text/plain")
113
114
# Process based on content type
115
if content_type == "application/json":
116
data = json.loads(msg.data.decode())
117
elif content_type == "application/xml":
118
data = parse_xml(msg.data)
119
else:
120
data = msg.data.decode()
121
122
# Check for correlation ID for request tracking
123
correlation_id = msg.headers.get("Correlation-ID")
124
print(f"Processing request {correlation_id}")
125
126
await process_data(data)
127
128
# Publishing with headers
129
headers = {
130
"Content-Type": "application/json",
131
"Authorization": "Bearer token123",
132
"Correlation-ID": "req-12345",
133
"User-ID": "user456"
134
}
135
136
await nc.publish(
137
"api.data",
138
json.dumps({"key": "value"}).encode(),
139
headers=headers
140
)
141
```
142
143
### JetStream Message Acknowledgments
144
145
Handle JetStream message acknowledgments with various strategies.
146
147
```python { .api }
148
class Msg:
149
is_acked: bool
150
151
async def ack(self) -> None:
152
"""Acknowledge JetStream message successfully processed."""
153
154
async def ack_sync(self, timeout: float = 1.0) -> None:
155
"""
156
Synchronously acknowledge JetStream message.
157
158
Parameters:
159
- timeout: Acknowledgment timeout in seconds
160
"""
161
162
async def nak(self, delay: float = None) -> None:
163
"""
164
Negative acknowledgment - message will be redelivered.
165
166
Parameters:
167
- delay: Delay before redelivery in seconds
168
"""
169
170
async def in_progress(self) -> None:
171
"""Extend acknowledgment deadline for longer processing."""
172
173
async def term(self) -> None:
174
"""Terminate message processing - no further redelivery."""
175
```
176
177
#### Usage Examples
178
179
```python
180
async def jetstream_handler(msg):
181
try:
182
# Check if this is a JetStream message
183
if not msg.metadata:
184
print("Not a JetStream message")
185
return
186
187
print(f"Processing JetStream message {msg.metadata.sequence.stream}")
188
189
# Long-running processing
190
if await is_long_running_task(msg.data):
191
await msg.in_progress() # Extend processing deadline
192
193
# Process the message
194
result = await process_message(msg.data)
195
196
if result.success:
197
await msg.ack() # Successfully processed
198
print("Message acknowledged successfully")
199
else:
200
# Temporary failure - retry after delay
201
await msg.nak(delay=30.0) # Retry in 30 seconds
202
print("Message negatively acknowledged, will retry")
203
204
except FatalProcessingError as e:
205
# Permanent failure - don't retry
206
await msg.term()
207
print(f"Message terminated due to fatal error: {e}")
208
209
except Exception as e:
210
# Temporary error - retry immediately
211
await msg.nak()
212
print(f"Message processing failed, will retry: {e}")
213
214
# Subscribe to JetStream
215
js = nc.jetstream()
216
await js.subscribe("events.orders", cb=jetstream_handler, manual_ack=True)
217
```
218
219
### JetStream Metadata
220
221
Access JetStream-specific message metadata.
222
223
```python { .api }
224
class Metadata:
225
"""JetStream message metadata."""
226
sequence: SequencePair
227
num_delivered: int
228
num_pending: int
229
timestamp: datetime
230
stream: str
231
consumer: str
232
domain: str
233
234
class SequencePair:
235
"""Consumer and stream sequence numbers."""
236
consumer: int
237
stream: int
238
```
239
240
#### Usage Examples
241
242
```python
243
async def metadata_handler(msg):
244
if not msg.metadata:
245
print("Core NATS message (no JetStream metadata)")
246
return
247
248
meta = msg.metadata
249
print(f"JetStream Message Metadata:")
250
print(f" Stream: {meta.stream}")
251
print(f" Consumer: {meta.consumer}")
252
print(f" Stream Sequence: {meta.sequence.stream}")
253
print(f" Consumer Sequence: {meta.sequence.consumer}")
254
print(f" Delivered: {meta.num_delivered} times")
255
print(f" Pending: {meta.num_pending} messages")
256
print(f" Timestamp: {meta.timestamp}")
257
258
# Handle redelivery scenarios
259
if meta.num_delivered > 1:
260
print(f"This message has been redelivered {meta.num_delivered} times")
261
262
# Maybe handle differently based on delivery count
263
if meta.num_delivered > 3:
264
print("Too many redeliveries, terminating")
265
await msg.term()
266
return
267
268
# Process message
269
await process_jetstream_message(msg.data)
270
await msg.ack()
271
272
# JetStream subscription with metadata handling
273
await js.subscribe("stream.events", cb=metadata_handler)
274
```
275
276
### Subscription Management
277
278
Handle subscriptions and their message flows.
279
280
```python { .api }
281
class Subscription:
282
"""NATS subscription."""
283
284
def subject(self) -> str:
285
"""Get subscription subject pattern."""
286
287
def queue(self) -> str:
288
"""Get queue group name."""
289
290
def messages(self) -> AsyncIterator[Msg]:
291
"""Async iterator for messages."""
292
293
def pending_msgs(self) -> int:
294
"""Number of pending messages."""
295
296
def pending_bytes(self) -> int:
297
"""Number of pending bytes."""
298
299
def delivered(self) -> int:
300
"""Total messages delivered."""
301
302
async def next_msg(self, timeout: float = 1.0) -> Msg:
303
"""Get next message with timeout."""
304
305
async def drain(self) -> None:
306
"""Drain subscription."""
307
308
async def unsubscribe(self, limit: int = 0) -> None:
309
"""Unsubscribe after limit messages."""
310
```
311
312
#### Usage Examples
313
314
```python
315
# Subscription with async iteration
316
sub = await nc.subscribe("events.*")
317
318
async def process_subscription():
319
async for msg in sub.messages():
320
print(f"Processing: {msg.subject}")
321
await handle_message(msg)
322
323
# Break on specific condition
324
if should_stop_processing():
325
break
326
327
# Manual message fetching
328
async def manual_processing():
329
sub = await nc.subscribe("work.queue")
330
331
while True:
332
try:
333
msg = await sub.next_msg(timeout=5.0)
334
await process_work_item(msg)
335
except TimeoutError:
336
print("No messages available")
337
break
338
except Exception as e:
339
print(f"Processing error: {e}")
340
341
# Monitor subscription health
342
async def monitor_subscription():
343
sub = await nc.subscribe("monitoring.*")
344
345
while True:
346
print(f"Subscription stats:")
347
print(f" Pending messages: {sub.pending_msgs()}")
348
print(f" Pending bytes: {sub.pending_bytes()}")
349
print(f" Total delivered: {sub.delivered()}")
350
351
await asyncio.sleep(10) # Check every 10 seconds
352
353
# Graceful subscription shutdown
354
async def graceful_shutdown():
355
# Stop accepting new messages and process pending
356
await sub.drain()
357
print("Subscription drained")
358
```
359
360
### Message Patterns
361
362
Common message processing patterns and utilities.
363
364
#### Usage Examples
365
366
```python
367
# Fan-out pattern - one message to multiple handlers
368
async def fan_out_handler(msg):
369
# Process message with multiple handlers concurrently
370
await asyncio.gather(
371
analytics_handler(msg),
372
audit_handler(msg),
373
notification_handler(msg)
374
)
375
376
# Message batching
377
class MessageBatcher:
378
def __init__(self, batch_size=10, timeout=5.0):
379
self.batch = []
380
self.batch_size = batch_size
381
self.timeout = timeout
382
self.last_batch_time = time.time()
383
384
async def add_message(self, msg):
385
self.batch.append(msg)
386
387
# Process batch if full or timeout reached
388
if (len(self.batch) >= self.batch_size or
389
time.time() - self.last_batch_time > self.timeout):
390
await self.process_batch()
391
392
async def process_batch(self):
393
if not self.batch:
394
return
395
396
print(f"Processing batch of {len(self.batch)} messages")
397
await process_message_batch(self.batch)
398
399
# Acknowledge all messages
400
for msg in self.batch:
401
if msg.metadata: # JetStream message
402
await msg.ack()
403
404
self.batch.clear()
405
self.last_batch_time = time.time()
406
407
batcher = MessageBatcher()
408
409
async def batching_handler(msg):
410
await batcher.add_message(msg)
411
412
# Content-based routing
413
async def routing_handler(msg):
414
# Route based on subject
415
if msg.subject.startswith("user."):
416
await user_service_handler(msg)
417
elif msg.subject.startswith("order."):
418
await order_service_handler(msg)
419
elif msg.subject.startswith("inventory."):
420
await inventory_handler(msg)
421
else:
422
print(f"Unknown message type: {msg.subject}")
423
424
# Message transformation pipeline
425
async def transform_pipeline(msg):
426
# Step 1: Validate
427
if not await validate_message(msg):
428
await msg.nak()
429
return
430
431
# Step 2: Transform
432
transformed_data = await transform_message_data(msg.data)
433
434
# Step 3: Enrich with external data
435
enriched_data = await enrich_message(transformed_data, msg.headers)
436
437
# Step 4: Store result
438
await store_processed_message(enriched_data)
439
440
# Step 5: Acknowledge
441
if msg.metadata:
442
await msg.ack()
443
```
444
445
## Constants
446
447
```python { .api }
448
# Subscription limits
449
DEFAULT_SUB_PENDING_MSGS_LIMIT = 512 * 1024
450
DEFAULT_SUB_PENDING_BYTES_LIMIT = 128 * 1024 * 1024
451
452
# JetStream limits
453
DEFAULT_JS_SUB_PENDING_MSGS_LIMIT = 512 * 1024
454
DEFAULT_JS_SUB_PENDING_BYTES_LIMIT = 256 * 1024 * 1024
455
456
# Message acknowledgment types
457
class Ack:
458
"""JetStream acknowledgment types."""
459
ACK = "+ACK"
460
NAK = "-NAK"
461
PROGRESS = "+WPI" # Work in Progress
462
TERM = "+TERM" # Terminate
463
```