or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-client.md · error-handling.md · index.md · jetstream-management.md · jetstream.md · key-value-store.md · message-handling.md · microservices.md · object-store.md

docs/message-handling.md

0

# Message Handling

1

2

Message processing with headers, reply handling, JetStream acknowledgments, and comprehensive metadata access for building sophisticated message-driven applications.

3

4

## Capabilities

5

6

### Message Structure

7

8

Understanding NATS message structure and properties.

9

10

```python { .api }

11

class Msg:

12

"""NATS message representation."""

13

subject: str

14

data: bytes

15

reply: str

16

headers: Optional[Dict[str, str]]

17

header: Optional[Dict[str, str]] # Alias for headers

18

sid: int

19

is_acked: bool

20

metadata: Optional[Metadata] # JetStream metadata

21

```

22

23

#### Usage Examples

24

25

```python

26

async def message_handler(msg):

27

print(f"Received message:")

28

print(f" Subject: {msg.subject}")

29

print(f" Data: {msg.data.decode()}")

30

print(f" Reply: {msg.reply}")

31

print(f" Headers: {msg.headers}")

32

print(f" Subscription ID: {msg.sid}")

33

34

# Access JetStream metadata if available

35

if msg.metadata:

36

print(f" Stream: {msg.metadata.stream}")

37

print(f" Sequence: {msg.metadata.sequence.stream}")

38

print(f" Consumer: {msg.metadata.sequence.consumer}")

39

40

# Subscribe and handle messages

41

await nc.subscribe("events.*", cb=message_handler)

42

```

43

44

### Response Handling

45

46

Send responses to messages with reply subjects.

47

48

```python { .api }

49

class Msg:

50

async def respond(self, data: bytes) -> None:

51

"""

52

Send response to message reply subject.

53

54

Parameters:

55

- data: Response data

56

57

Raises:

58

- nats.errors.Error: No reply subject available

59

"""

60

```

61

62

#### Usage Examples

63

64

```python

65

async def request_handler(msg):

66

try:

67

# Process request

68

request_data = json.loads(msg.data.decode())

69

result = await process_request(request_data)

70

71

# Send response

72

response = json.dumps(result).encode()

73

await msg.respond(response)

74

75

except Exception as e:

76

# Send error response

77

error_response = json.dumps({

78

"error": str(e),

79

"type": type(e).__name__

80

}).encode()

81

await msg.respond(error_response)

82

83

# Subscribe to requests

84

await nc.subscribe("api.requests", cb=request_handler)

85

86

# Client making request

87

response = await nc.request("api.requests", b'{"action": "get_user", "id": 123}')

88

result = json.loads(response.data.decode())

89

```

90

91

### Header Processing

92

93

Work with message headers for metadata and routing.

94

95

```python { .api }

96

class Msg:

97

headers: Optional[Dict[str, str]]

98

header: Optional[Dict[str, str]] # Alias for headers property

99

```

100

101

#### Usage Examples

102

103

```python

104

async def header_aware_handler(msg):

105

# Check for authentication header

106

auth_token = msg.headers.get("Authorization") if msg.headers else None

107

if not auth_token:

108

print("No authorization header")

109

return

110

111

# Check content type

112

content_type = msg.headers.get("Content-Type", "text/plain")

113

114

# Process based on content type

115

if content_type == "application/json":

116

data = json.loads(msg.data.decode())

117

elif content_type == "application/xml":

118

data = parse_xml(msg.data)

119

else:

120

data = msg.data.decode()

121

122

# Check for correlation ID for request tracking

123

correlation_id = msg.headers.get("Correlation-ID")

124

print(f"Processing request {correlation_id}")

125

126

await process_data(data)

127

128

# Publishing with headers

129

headers = {

130

"Content-Type": "application/json",

131

"Authorization": "Bearer token123",

132

"Correlation-ID": "req-12345",

133

"User-ID": "user456"

134

}

135

136

await nc.publish(

137

"api.data",

138

json.dumps({"key": "value"}).encode(),

139

headers=headers

140

)

141

```

142

143

### JetStream Message Acknowledgments

144

145

Handle JetStream message acknowledgments with various strategies.

146

147

```python { .api }

148

class Msg:

149

is_acked: bool

150

151

async def ack(self) -> None:

152

"""Acknowledge JetStream message successfully processed."""

153

154

async def ack_sync(self, timeout: float = 1.0) -> None:

155

"""

156

Synchronously acknowledge JetStream message.

157

158

Parameters:

159

- timeout: Acknowledgment timeout in seconds

160

"""

161

162

async def nak(self, delay: float = None) -> None:

163

"""

164

Negative acknowledgment - message will be redelivered.

165

166

Parameters:

167

- delay: Delay before redelivery in seconds

168

"""

169

170

async def in_progress(self) -> None:

171

"""Extend acknowledgment deadline for longer processing."""

172

173

async def term(self) -> None:

174

"""Terminate message processing - no further redelivery."""

175

```

176

177

#### Usage Examples

178

179

```python

180

async def jetstream_handler(msg):

181

try:

182

# Check if this is a JetStream message

183

if not msg.metadata:

184

print("Not a JetStream message")

185

return

186

187

print(f"Processing JetStream message {msg.metadata.sequence.stream}")

188

189

# Long-running processing

190

if await is_long_running_task(msg.data):

191

await msg.in_progress() # Extend processing deadline

192

193

# Process the message

194

result = await process_message(msg.data)

195

196

if result.success:

197

await msg.ack() # Successfully processed

198

print("Message acknowledged successfully")

199

else:

200

# Temporary failure - retry after delay

201

await msg.nak(delay=30.0) # Retry in 30 seconds

202

print("Message negatively acknowledged, will retry")

203

204

except FatalProcessingError as e:

205

# Permanent failure - don't retry

206

await msg.term()

207

print(f"Message terminated due to fatal error: {e}")

208

209

except Exception as e:

210

# Temporary error - retry immediately

211

await msg.nak()

212

print(f"Message processing failed, will retry: {e}")

213

214

# Subscribe to JetStream

215

js = nc.jetstream()

216

await js.subscribe("events.orders", cb=jetstream_handler, manual_ack=True)

217

```

218

219

### JetStream Metadata

220

221

Access JetStream-specific message metadata.

222

223

```python { .api }

224

class Metadata:

225

"""JetStream message metadata."""

226

sequence: SequencePair

227

num_delivered: int

228

num_pending: int

229

timestamp: datetime

230

stream: str

231

consumer: str

232

domain: str

233

234

class SequencePair:

235

"""Consumer and stream sequence numbers."""

236

consumer: int

237

stream: int

238

```

239

240

#### Usage Examples

241

242

```python

243

async def metadata_handler(msg):

244

if not msg.metadata:

245

print("Core NATS message (no JetStream metadata)")

246

return

247

248

meta = msg.metadata

249

print(f"JetStream Message Metadata:")

250

print(f" Stream: {meta.stream}")

251

print(f" Consumer: {meta.consumer}")

252

print(f" Stream Sequence: {meta.sequence.stream}")

253

print(f" Consumer Sequence: {meta.sequence.consumer}")

254

print(f" Delivered: {meta.num_delivered} times")

255

print(f" Pending: {meta.num_pending} messages")

256

print(f" Timestamp: {meta.timestamp}")

257

258

# Handle redelivery scenarios

259

if meta.num_delivered > 1:

260

print(f"This message has been redelivered {meta.num_delivered} times")

261

262

# Maybe handle differently based on delivery count

263

if meta.num_delivered > 3:

264

print("Too many redeliveries, terminating")

265

await msg.term()

266

return

267

268

# Process message

269

await process_jetstream_message(msg.data)

270

await msg.ack()

271

272

# JetStream subscription with metadata handling

273

await js.subscribe("stream.events", cb=metadata_handler)

274

```

275

276

### Subscription Management

277

278

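Handle subscriptions and their message flows. Note that in nats-py, `subject`, `queue`, `messages`, `pending_msgs`, `pending_bytes`, and `delivered` are read-only properties, so they are accessed without call parentheses (for example, `async for msg in sub.messages:`).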

279

280

```python { .api }

281

class Subscription:

282

"""NATS subscription."""

283

284

def subject(self) -> str:

285

"""Get subscription subject pattern."""

286

287

def queue(self) -> str:

288

"""Get queue group name."""

289

290

def messages(self) -> AsyncIterator[Msg]:

291

"""Async iterator for messages."""

292

293

def pending_msgs(self) -> int:

294

"""Number of pending messages."""

295

296

def pending_bytes(self) -> int:

297

"""Number of pending bytes."""

298

299

def delivered(self) -> int:

300

"""Total messages delivered."""

301

302

async def next_msg(self, timeout: float = 1.0) -> Msg:

303

"""Get next message with timeout."""

304

305

async def drain(self) -> None:

306

"""Drain subscription."""

307

308

async def unsubscribe(self, limit: int = 0) -> None:

309

"""Unsubscribe after limit messages."""

310

```

311

312

#### Usage Examples

313

314

```python

315

# Subscription with async iteration

316

sub = await nc.subscribe("events.*")

317

318

async def process_subscription():

319

async for msg in sub.messages():

320

print(f"Processing: {msg.subject}")

321

await handle_message(msg)

322

323

# Break on specific condition

324

if should_stop_processing():

325

break

326

327

# Manual message fetching

328

async def manual_processing():

329

sub = await nc.subscribe("work.queue")

330

331

while True:

332

try:

333

msg = await sub.next_msg(timeout=5.0)

334

await process_work_item(msg)

335

except TimeoutError:

336

print("No messages available")

337

break

338

except Exception as e:

339

print(f"Processing error: {e}")

340

341

# Monitor subscription health

342

async def monitor_subscription():

343

sub = await nc.subscribe("monitoring.*")

344

345

while True:

346

print(f"Subscription stats:")

347

print(f" Pending messages: {sub.pending_msgs()}")

348

print(f" Pending bytes: {sub.pending_bytes()}")

349

print(f" Total delivered: {sub.delivered()}")

350

351

await asyncio.sleep(10) # Check every 10 seconds

352

353

# Graceful subscription shutdown

354

async def graceful_shutdown():

355

# Stop accepting new messages and process pending

356

await sub.drain()

357

print("Subscription drained")

358

```

359

360

### Message Patterns

361

362

Common message processing patterns and utilities.

363

364

#### Usage Examples

365

366

```python

367

# Fan-out pattern - one message to multiple handlers

368

async def fan_out_handler(msg):

369

# Process message with multiple handlers concurrently

370

await asyncio.gather(

371

analytics_handler(msg),

372

audit_handler(msg),

373

notification_handler(msg)

374

)

375

376

# Message batching

377

class MessageBatcher:

378

def __init__(self, batch_size=10, timeout=5.0):

379

self.batch = []

380

self.batch_size = batch_size

381

self.timeout = timeout

382

self.last_batch_time = time.time()

383

384

async def add_message(self, msg):

385

self.batch.append(msg)

386

387

# Process batch if full or timeout reached

388

if (len(self.batch) >= self.batch_size or

389

time.time() - self.last_batch_time > self.timeout):

390

await self.process_batch()

391

392

async def process_batch(self):

393

if not self.batch:

394

return

395

396

print(f"Processing batch of {len(self.batch)} messages")

397

await process_message_batch(self.batch)

398

399

# Acknowledge all messages

400

for msg in self.batch:

401

if msg.metadata: # JetStream message

402

await msg.ack()

403

404

self.batch.clear()

405

self.last_batch_time = time.time()

406

407

batcher = MessageBatcher()

408

409

async def batching_handler(msg):

410

await batcher.add_message(msg)

411

412

# Content-based routing

413

async def routing_handler(msg):

414

# Route based on subject

415

if msg.subject.startswith("user."):

416

await user_service_handler(msg)

417

elif msg.subject.startswith("order."):

418

await order_service_handler(msg)

419

elif msg.subject.startswith("inventory."):

420

await inventory_handler(msg)

421

else:

422

print(f"Unknown message type: {msg.subject}")

423

424

# Message transformation pipeline

425

async def transform_pipeline(msg):

426

# Step 1: Validate

427

if not await validate_message(msg):

428

await msg.nak()

429

return

430

431

# Step 2: Transform

432

transformed_data = await transform_message_data(msg.data)

433

434

# Step 3: Enrich with external data

435

enriched_data = await enrich_message(transformed_data, msg.headers)

436

437

# Step 4: Store result

438

await store_processed_message(enriched_data)

439

440

# Step 5: Acknowledge

441

if msg.metadata:

442

await msg.ack()

443

```

444

445

## Constants

446

447

```python { .api }

448

# Subscription limits

449

DEFAULT_SUB_PENDING_MSGS_LIMIT = 512 * 1024

450

DEFAULT_SUB_PENDING_BYTES_LIMIT = 128 * 1024 * 1024

451

452

# JetStream limits

453

DEFAULT_JS_SUB_PENDING_MSGS_LIMIT = 512 * 1024

454

DEFAULT_JS_SUB_PENDING_BYTES_LIMIT = 256 * 1024 * 1024

455

456

# Message acknowledgment types

457

class Ack:

458

"""JetStream acknowledgment types."""

459

ACK = "+ACK"

460

NAK = "-NAK"

461

PROGRESS = "+WPI" # Work in Progress

462

TERM = "+TERM" # Terminate

463

```