0
# JetStream Management
1
2
Administrative APIs for managing JetStream streams, consumers, and accounts. Provides comprehensive configuration, monitoring, and maintenance capabilities for JetStream infrastructure.
3
4
## Capabilities
5
6
### Stream Management
7
8
Create, configure, and manage JetStream streams for persistent message storage.
9
10
```python { .api }
11
class JetStreamManager:
12
async def add_stream(
13
self,
14
config: StreamConfig = None,
15
**params
16
) -> StreamInfo:
17
"""
18
Create new JetStream stream.
19
20
Parameters:
21
- config: Complete stream configuration
22
- **params: Individual configuration parameters
23
24
Returns:
25
Stream information including configuration and state
26
"""
27
28
async def update_stream(
29
self,
30
config: StreamConfig = None,
31
**params
32
) -> StreamInfo:
33
"""
34
Update existing stream configuration.
35
36
Parameters:
37
- config: Updated stream configuration
38
- **params: Individual configuration parameters
39
40
Returns:
41
Updated stream information
42
"""
43
44
async def delete_stream(self, name: str) -> bool:
45
"""
46
Delete stream and all its messages.
47
48
Parameters:
49
- name: Stream name to delete
50
51
Returns:
52
True if stream was deleted
53
"""
54
55
async def stream_info(
56
self,
57
name: str,
58
subjects_filter: str = None
59
) -> StreamInfo:
60
"""
61
Get stream information and statistics.
62
63
Parameters:
64
- name: Stream name
65
- subjects_filter: Filter subjects in response
66
67
Returns:
68
Stream information including state and configuration
69
"""
70
71
async def find_stream_name_by_subject(self, subject: str) -> str:
72
"""
73
Find stream name that matches subject.
74
75
Parameters:
76
- subject: Subject to find stream for
77
78
Returns:
79
Name of the stream whose subjects match; raises NotFoundError if no stream matches
80
"""
81
```
82
83
#### Usage Examples
84
85
```python
86
import asyncio
from datetime import timedelta
87
import nats
88
from nats.js.api import StreamConfig
89
90
async def main():
91
nc = await nats.connect()
92
jsm = nc.jsm()
93
94
# Create stream with configuration object
95
stream_config = StreamConfig(
96
name="events",
97
subjects=["events.*", "alerts.>"],
98
storage="file",
99
retention="limits",
100
max_msgs=1000000,
101
max_bytes=1024*1024*1024, # 1GB
102
max_age=timedelta(days=30),
103
max_consumers=10,
104
duplicate_window=timedelta(minutes=2)
105
)
106
107
stream_info = await jsm.add_stream(config=stream_config)
108
print(f"Created stream: {stream_info.config.name}")
109
110
# Create stream with parameters
111
await jsm.add_stream(
112
name="metrics",
113
subjects=["metrics.cpu.*", "metrics.memory.*"],
114
storage="memory",
115
max_msgs=100000,
116
max_age=timedelta(hours=24)
117
)
118
119
# Update stream configuration
120
await jsm.update_stream(
121
name="events",
122
max_msgs=2000000,
123
description="Updated event stream"
124
)
125
126
# Get stream information
127
info = await jsm.stream_info("events")
128
print(f"Stream has {info.state.messages} messages")
129
130
# Find stream by subject
131
stream_name = await jsm.find_stream_name_by_subject("events.user.login")
132
print(f"Subject belongs to stream: {stream_name}")

# Run the example (top level, outside main)
asyncio.run(main())
133
```
134
135
### Stream Listing and Iteration
136
137
List and iterate through all streams in the account.
138
139
```python { .api }
140
class JetStreamManager:
141
async def streams_info(self, offset: int = 0) -> List[StreamInfo]:
142
"""
143
Get information for all streams.
144
145
Parameters:
146
- offset: Starting offset for pagination
147
148
Returns:
149
List of stream information objects
150
"""
151
152
async def streams_info_iterator(self, **kwargs) -> AsyncIterator[StreamInfo]:
153
"""
154
Iterate through all streams with pagination.
155
156
Returns:
157
Async iterator yielding stream information
158
"""
159
```
160
161
#### Usage Examples
162
163
```python
164
# List all streams
165
streams = await jsm.streams_info()
166
for stream in streams:
167
print(f"Stream: {stream.config.name}, Messages: {stream.state.messages}")
168
169
# Iterate through streams
170
async for stream_info in jsm.streams_info_iterator():
171
print(f"Processing stream: {stream_info.config.name}")
172
if stream_info.state.bytes > 1024*1024*100: # 100MB
173
print(f"Large stream: {stream_info.config.name}")
174
```
175
176
### Consumer Management
177
178
Create and manage consumers for stream consumption patterns.
179
180
```python { .api }
181
class JetStreamManager:
182
async def add_consumer(
183
self,
184
stream: str,
185
config: ConsumerConfig = None,
186
**params
187
) -> ConsumerInfo:
188
"""
189
Create consumer for stream.
190
191
Parameters:
192
- stream: Stream name
193
- config: Consumer configuration
194
- **params: Individual configuration parameters
195
196
Returns:
197
Consumer information including configuration and state
198
"""
199
200
async def delete_consumer(self, stream: str, consumer: str) -> bool:
201
"""
202
Delete consumer from stream.
203
204
Parameters:
205
- stream: Stream name
206
- consumer: Consumer name
207
208
Returns:
209
True if consumer was deleted
210
"""
211
212
async def consumer_info(self, stream: str, consumer: str) -> ConsumerInfo:
213
"""
214
Get consumer information and statistics.
215
216
Parameters:
217
- stream: Stream name
218
- consumer: Consumer name
219
220
Returns:
221
Consumer information including state and configuration
222
"""
223
224
async def consumers_info(self, stream: str, offset: int = 0) -> List[ConsumerInfo]:
225
"""
226
Get information for all consumers in stream.
227
228
Parameters:
229
- stream: Stream name
230
- offset: Starting offset for pagination
231
232
Returns:
233
List of consumer information objects
234
"""
235
```
236
237
#### Usage Examples
238
239
```python
240
from nats.js.api import ConsumerConfig
241
from datetime import timedelta
242
243
# Create durable consumer
244
consumer_config = ConsumerConfig(
245
durable_name="event-processor",
246
deliver_policy="all",
247
ack_policy="explicit",
248
ack_wait=timedelta(seconds=30),
249
max_deliver=3,
250
filter_subject="events.user.*"
251
)
252
253
consumer_info = await jsm.add_consumer("events", config=consumer_config)
254
print(f"Created consumer: {consumer_info.name}")
255
256
# Create ephemeral consumer with parameters
257
await jsm.add_consumer(
258
stream="metrics",
259
deliver_policy="new",
260
ack_policy="explicit",
261
max_ack_pending=100
262
)
263
264
# Get consumer information
265
info = await jsm.consumer_info("events", "event-processor")
266
print(f"Consumer delivered {info.delivered.stream_seq} messages")
267
268
# List all consumers for stream
269
consumers = await jsm.consumers_info("events")
270
for consumer in consumers:
271
print(f"Consumer: {consumer.name}, Pending: {consumer.num_pending}")
272
```
273
274
### Message Management
275
276
Direct message operations on streams.
277
278
```python { .api }
279
class JetStreamManager:
280
async def get_msg(
281
self,
282
stream_name: str,
283
seq: int,
284
**kwargs
285
) -> RawStreamMsg:
286
"""
287
Get message by sequence number.
288
289
Parameters:
290
- stream_name: Stream name
291
- seq: Message sequence number
292
293
Returns:
294
Raw stream message with metadata
295
"""
296
297
async def delete_msg(self, stream_name: str, seq: int) -> bool:
298
"""
299
Delete message by sequence number.
300
301
Parameters:
302
- stream_name: Stream name
303
- seq: Message sequence number
304
305
Returns:
306
True if message was deleted
307
"""
308
309
async def get_last_msg(self, stream_name: str, subject: str) -> RawStreamMsg:
310
"""
311
Get last message for subject.
312
313
Parameters:
314
- stream_name: Stream name
315
- subject: Subject filter
316
317
Returns:
318
Last message matching subject
319
"""
320
321
async def purge_stream(self, name: str, **opts) -> bool:
322
"""
323
Purge messages from stream.
324
325
Parameters:
326
- name: Stream name
- **opts: Optional purge filters passed as keyword arguments (subject, seq, keep):
327
- subject: Purge messages matching subject filter
328
- seq: Purge up to sequence number
329
- keep: Keep latest N messages
330
331
Returns:
332
True if stream was purged
333
"""
334
```
335
336
#### Usage Examples
337
338
```python
339
# Get specific message
340
msg = await jsm.get_msg("events", seq=12345)
341
print(f"Message data: {msg.data.decode()}")
342
print(f"Subject: {msg.subject}")
343
344
# Get last message for subject
345
last_msg = await jsm.get_last_msg("events", "events.user.login")
346
print(f"Last login: {last_msg.data.decode()}")
347
348
# Delete specific message
349
await jsm.delete_msg("events", seq=12345)
350
351
# Purge old messages, keep latest 1000
352
await jsm.purge_stream("events", keep=1000)
353
354
# Purge messages by subject
355
await jsm.purge_stream("events", subject="events.test.*")
356
```
357
358
### Account Information
359
360
Get JetStream account limits and usage information.
361
362
```python { .api }
363
class JetStreamManager:
364
async def account_info(self) -> AccountInfo:
365
"""
366
Get JetStream account information.
367
368
Returns:
369
Account information including limits and usage statistics
370
"""
371
```
372
373
#### Usage Examples
374
375
```python
376
# Get account information
377
account = await jsm.account_info()
378
print(f"Memory usage: {account.memory} / {account.limits.max_memory}")
379
print(f"Storage usage: {account.storage} / {account.limits.max_storage}")
380
print(f"Streams: {account.streams} / {account.limits.max_streams}")
381
print(f"Consumers: {account.consumers} / {account.limits.max_consumers}")
382
383
# Check if approaching limits
384
if account.limits.max_memory > 0 and account.memory > account.limits.max_memory * 0.8:  # a limit of -1 means unlimited
385
print("Warning: Approaching memory limit")
386
```
387
388
## Configuration Types
389
390
```python { .api }
391
from dataclasses import dataclass
392
from typing import Optional, List, Dict
393
from datetime import datetime, timedelta
394
395
@dataclass
396
class StreamConfig:
397
name: str
398
subjects: List[str] = None
399
retention: str = "limits" # "limits", "interest", "workqueue"
400
max_consumers: int = -1
401
max_msgs: int = -1
402
max_bytes: int = -1
403
max_age: timedelta = None
404
max_msgs_per_subject: int = -1
405
max_msg_size: int = -1
406
storage: str = "file" # "file", "memory"
407
num_replicas: int = 1
408
no_ack: bool = False
409
template_owner: str = None
410
discard: str = "old" # "old", "new"
411
duplicate_window: timedelta = None
412
placement: Placement = None
413
mirror: StreamSource = None
414
sources: List[StreamSource] = None
415
sealed: bool = False
416
deny_delete: bool = False
417
deny_purge: bool = False
418
allow_rollup_hdrs: bool = False
419
allow_direct: bool = False
420
mirror_direct: bool = False
421
republish: RePublish = None
422
description: str = None
423
metadata: Dict[str, str] = None
424
425
@dataclass
426
class ConsumerConfig:
427
durable_name: Optional[str] = None
428
name: Optional[str] = None
429
description: Optional[str] = None
430
deliver_policy: str = "all"
431
opt_start_seq: Optional[int] = None
432
opt_start_time: Optional[datetime] = None
433
ack_policy: str = "explicit"
434
ack_wait: Optional[timedelta] = None
435
max_deliver: Optional[int] = None
436
filter_subject: Optional[str] = None
437
replay_policy: str = "instant"
438
rate_limit_bps: Optional[int] = None
439
sample_freq: Optional[str] = None
440
max_waiting: Optional[int] = None
441
max_ack_pending: Optional[int] = None
442
flow_control: bool = False
443
idle_heartbeat: Optional[timedelta] = None
444
headers_only: bool = False
445
max_request_batch: Optional[int] = None
446
max_request_expires: Optional[timedelta] = None
447
inactive_threshold: Optional[timedelta] = None
448
num_replicas: int = 0
449
mem_storage: bool = False
450
metadata: Dict[str, str] = None
451
```
452
453
## Information Types
454
455
```python { .api }
456
@dataclass
457
class StreamInfo:
458
config: StreamConfig
459
state: StreamState
460
cluster: Optional[ClusterInfo] = None
461
mirror: Optional[StreamSourceInfo] = None
462
sources: Optional[List[StreamSourceInfo]] = None
463
alternates: Optional[List[StreamAlternate]] = None
464
465
@dataclass
466
class StreamState:
467
messages: int
468
bytes: int
469
first_seq: int
470
first_ts: datetime
471
last_seq: int
472
last_ts: datetime
473
consumers: int
474
deleted: Optional[List[int]] = None
475
lost: Optional[LostStreamData] = None
476
num_subjects: Optional[int] = None
477
478
@dataclass
479
class ConsumerInfo:
480
name: str
481
config: ConsumerConfig
482
delivered: SequenceInfo
483
ack_floor: SequenceInfo
484
num_ack_pending: int
485
num_redelivered: int
486
num_waiting: int
487
num_pending: int
488
cluster: Optional[ClusterInfo] = None
489
push_bound: bool = False
490
491
@dataclass
492
class AccountInfo:
493
memory: int
494
storage: int
495
streams: int
496
consumers: int
497
limits: AccountLimits
498
api: APIStats
499
domain: Optional[str] = None
500
501
@dataclass
502
class RawStreamMsg:
503
subject: str
504
seq: int
505
data: bytes
506
hdrs: Optional[bytes] = None
507
time: Optional[datetime] = None
508
stream: Optional[str] = None
509
```