0
# JetStream
1
2
JetStream provides persistent messaging capabilities built on NATS streams. It offers message storage, replay, delivery guarantees, and advanced consumption patterns for building resilient distributed applications.
3
4
## Capabilities
5
6
### JetStream Context
7
8
Core JetStream functionality for publishing to streams and subscribing to consumers.
9
10
```python { .api }
11
class JetStreamContext:
12
async def publish(
13
self,
14
subject: str,
15
payload: bytes = b"",
16
timeout: Optional[float] = None,
17
stream: Optional[str] = None,
18
headers: Optional[Dict[str, Any]] = None
19
) -> PubAck:
20
"""
21
Publish message to JetStream stream.
22
23
Parameters:
24
- subject: Stream subject
25
- payload: Message data
26
- timeout: Publish timeout
27
- stream: Target stream name (optional)
28
- headers: Message headers
29
30
Returns:
31
Publish acknowledgment with sequence info
32
"""
33
34
async def publish_async(
35
self,
36
subject: str,
37
payload: bytes = b"",
38
wait_stall: Optional[float] = None,
39
stream: Optional[str] = None,
40
headers: Optional[Dict] = None
41
) -> asyncio.Future[PubAck]:
42
"""
43
Publish message asynchronously without blocking.
44
45
Returns:
46
Future that resolves to publish acknowledgment
47
"""
48
49
def publish_async_pending(self) -> int:
50
"""Get count of pending async publishes."""
51
52
async def publish_async_completed(self) -> None:
53
"""Wait for all pending async publishes to complete."""
54
```
55
56
#### Usage Examples
57
58
```python
59
import asyncio
60
import nats
61
62
async def main():
63
nc = await nats.connect()
64
js = nc.jetstream()
65
66
# Synchronous publish with acknowledgment
67
ack = await js.publish("events.user.login", b'{"user_id": 123, "timestamp": "2024-01-01T10:00:00Z"}')
68
print(f"Message stored at sequence {ack.seq}")
69
70
# Asynchronous publish for high throughput
71
future1 = await js.publish_async("metrics.cpu", b'{"usage": 75.5}')
72
future2 = await js.publish_async("metrics.memory", b'{"usage": 82.1}')
73
74
# Wait for specific acknowledgments
75
ack1 = await future1
76
ack2 = await future2
77
78
# Wait for all pending publishes
79
await js.publish_async_completed()
80
```
81
82
### Push Subscriptions
83
84
Subscribe to JetStream messages with automatic delivery to callback handlers.
85
86
```python { .api }
87
class JetStreamContext:
88
async def subscribe(
89
self,
90
subject: str,
91
queue: str = "",
92
cb: Callable[[Msg], None] = None,
93
durable: str = None,
94
stream: str = None,
95
config: ConsumerConfig = None,
96
manual_ack: bool = False,
97
ordered_consumer: bool = False,
98
idle_heartbeat: float = None,
99
flow_control: bool = False,
100
**kwargs
101
) -> JetStreamSubscription:
102
"""
103
Subscribe to JetStream stream with push delivery.
104
105
Parameters:
106
- subject: Subject pattern to subscribe to
107
- queue: Queue group for load balancing
108
- cb: Message callback handler
109
- durable: Durable consumer name
110
- stream: Source stream name
111
- config: Consumer configuration
112
- manual_ack: Require manual message acknowledgment
113
- ordered_consumer: Enable ordered message delivery
114
- idle_heartbeat: Heartbeat interval for flow control
115
- flow_control: Enable flow control
116
117
Returns:
118
JetStream subscription
119
"""
120
121
async def subscribe_bind(
122
self,
123
stream: str,
124
consumer: str,
125
**kwargs
126
) -> JetStreamSubscription:
127
"""
128
Bind to existing durable consumer.
129
130
Parameters:
131
- stream: Stream name
132
- consumer: Consumer name
133
134
Returns:
135
Bound JetStream subscription
136
"""
137
```
138
139
#### Usage Examples
140
141
```python
142
# Simple JetStream subscription
143
async def handle_event(msg):
144
data = msg.data.decode()
145
print(f"Processing: {data}")
146
await msg.ack() # Acknowledge message
147
148
js_sub = await js.subscribe("events.>", cb=handle_event)
149
150
# Durable consumer subscription
151
await js.subscribe(
152
"orders.created",
153
durable="order-processor",
154
manual_ack=True,
155
cb=process_order
156
)
157
158
# Ordered consumer for sequential processing
159
await js.subscribe(
160
"audit.logs",
161
ordered_consumer=True,
162
cb=process_audit_log
163
)
164
165
# Queue group for load balancing
166
await js.subscribe(
167
"work.tasks",
168
queue="workers",
169
durable="task-worker",
170
cb=process_task
171
)
172
```
173
174
### Pull Subscriptions
175
176
Subscribe with manual message fetching for controlled consumption patterns.
177
178
```python { .api }
179
class JetStreamContext:
180
async def pull_subscribe(
181
self,
182
subject: str,
183
durable: str = None,
184
stream: str = None,
185
config: ConsumerConfig = None,
186
**kwargs
187
) -> PullSubscription:
188
"""
189
Create pull-based subscription for manual message fetching.
190
191
Parameters:
192
- subject: Subject pattern to subscribe to
193
- durable: Durable consumer name
194
- stream: Source stream name
195
- config: Consumer configuration
196
197
Returns:
198
Pull subscription for manual fetching
199
"""
200
201
async def pull_subscribe_bind(
202
self,
203
stream: str,
204
consumer: str,
205
**kwargs
206
) -> PullSubscription:
207
"""
208
Bind pull subscription to existing consumer.
209
210
Parameters:
211
- stream: Stream name
212
- consumer: Consumer name
213
214
Returns:
215
Bound pull subscription
216
"""
217
```
218
219
#### Usage Examples
220
221
```python
222
# Pull subscription with manual fetching
223
pull_sub = await js.pull_subscribe("batch.jobs", durable="job-processor")
224
225
# Fetch specific number of messages
226
msgs = await pull_sub.fetch(batch_size=10, timeout=5.0)
227
for msg in msgs:
228
await process_job(msg.data)
229
await msg.ack()
230
231
# Fetch with wait
232
msgs = await pull_sub.fetch(batch_size=5, timeout=30.0)
233
if msgs:
234
await process_batch(msgs)
235
236
# Continuous fetching loop
237
async for msg in pull_sub.messages():
238
try:
239
await process_message(msg.data)
240
await msg.ack()
241
except Exception as e:
242
print(f"Processing failed: {e}")
243
await msg.nak() # Negative acknowledgment for redelivery
244
```
245
246
### Message Acknowledgment
247
248
Handle JetStream message acknowledgments with various strategies.
249
250
```python { .api }
251
class Msg:
252
async def ack(self) -> None:
253
"""Acknowledge message successfully processed."""
254
255
async def ack_sync(self, timeout: float = 1.0) -> None:
256
"""Synchronously acknowledge message with timeout."""
257
258
async def nak(self, delay: float = None) -> None:
259
"""
260
Negative acknowledgment - message will be redelivered.
261
262
Parameters:
263
- delay: Delay before redelivery in seconds
264
"""
265
266
async def in_progress(self) -> None:
267
"""Extend acknowledgment deadline for longer processing."""
268
269
async def term(self) -> None:
270
"""Terminate message - no further redelivery."""
271
```
272
273
#### Usage Examples
274
275
```python
276
async def message_handler(msg):
277
try:
278
# Long-running processing
279
await msg.in_progress() # Extend ack deadline
280
281
result = await complex_processing(msg.data)
282
283
if result.success:
284
await msg.ack() # Success
285
else:
286
await msg.nak(delay=30.0) # Retry after 30 seconds
287
288
except FatalError:
289
await msg.term() # Don't retry
290
except Exception:
291
await msg.nak() # Retry immediately
292
```
293
294
### Utility Functions
295
296
Helper functions for JetStream message handling.
297
298
```python { .api }
299
class JetStreamContext:
300
def is_status_msg(self, msg: Msg) -> bool:
301
"""
302
Check if message is a JetStream status message.
303
304
Parameters:
305
- msg: Message to check
306
307
Returns:
308
True if message is status message
309
"""
310
```
311
312
## Consumer Configuration
313
314
```python { .api }
315
from dataclasses import dataclass
316
from typing import Optional, List
317
from datetime import datetime, timedelta
318
319
@dataclass
320
class ConsumerConfig:
321
durable_name: Optional[str] = None
322
name: Optional[str] = None
323
description: Optional[str] = None
324
deliver_policy: str = "all" # "all", "last", "new", "by_start_sequence", "by_start_time"
325
opt_start_seq: Optional[int] = None
326
opt_start_time: Optional[datetime] = None
327
ack_policy: str = "explicit" # "none", "all", "explicit"
328
ack_wait: Optional[timedelta] = None
329
max_deliver: Optional[int] = None
330
filter_subject: Optional[str] = None
331
replay_policy: str = "instant" # "instant", "original"
332
rate_limit_bps: Optional[int] = None
333
sample_freq: Optional[str] = None
334
max_waiting: Optional[int] = None
335
max_ack_pending: Optional[int] = None
336
flow_control: bool = False
337
idle_heartbeat: Optional[timedelta] = None
338
headers_only: bool = False
339
max_request_batch: Optional[int] = None
340
max_request_expires: Optional[timedelta] = None
341
inactive_threshold: Optional[timedelta] = None
342
```
343
344
## JetStream Message Types
345
346
```python { .api }
347
@dataclass
348
class PubAck:
349
"""JetStream publish acknowledgment."""
350
stream: str
351
seq: int
352
duplicate: bool = False
353
domain: Optional[str] = None
354
355
class JetStreamSubscription:
356
"""JetStream push subscription."""
357
async def next_msg(self, timeout: float = 1.0) -> Msg:
358
"""Get next message with timeout."""
359
360
async def drain(self) -> None:
361
"""Drain subscription."""
362
363
def messages(self) -> AsyncIterator[Msg]:
364
"""Async iterator for messages."""
365
366
class PullSubscription:
367
"""JetStream pull subscription."""
368
async def fetch(
369
self,
370
batch_size: int = 1,
371
timeout: float = 5.0
372
) -> List[Msg]:
373
"""Fetch batch of messages."""
374
375
def messages(self) -> AsyncIterator[Msg]:
376
"""Async iterator for messages."""
377
```
378
379
## Constants
380
381
```python { .api }
382
# JetStream API constants
383
DEFAULT_JS_SUB_PENDING_MSGS_LIMIT = 512 * 1024
384
DEFAULT_JS_SUB_PENDING_BYTES_LIMIT = 256 * 1024 * 1024
385
386
# Delivery policies
387
DELIVER_ALL = "all"
388
DELIVER_LAST = "last"
389
DELIVER_NEW = "new"
390
DELIVER_BY_START_SEQUENCE = "by_start_sequence"
391
DELIVER_BY_START_TIME = "by_start_time"
392
393
# Acknowledgment policies
394
ACK_NONE = "none"
395
ACK_ALL = "all"
396
ACK_EXPLICIT = "explicit"
397
398
# Replay policies
399
REPLAY_INSTANT = "instant"
400
REPLAY_ORIGINAL = "original"
401
```