An asyncio Python client for the NATS messaging system with JetStream, microservices, and key-value store support
—
Administrative APIs for managing JetStream streams, consumers, and accounts. Provides comprehensive configuration, monitoring, and maintenance capabilities for JetStream infrastructure.
Create, configure, and manage JetStream streams for persistent message storage.
class JetStreamManager:
async def add_stream(
self,
config: StreamConfig = None,
**params
) -> StreamInfo:
"""
Create new JetStream stream.
Parameters:
- config: Complete stream configuration
- **params: Individual configuration parameters
Returns:
Stream information including configuration and state
"""
async def update_stream(
self,
config: StreamConfig = None,
**params
) -> StreamInfo:
"""
Update existing stream configuration.
Parameters:
- config: Updated stream configuration
- **params: Individual configuration parameters
Returns:
Updated stream information
"""
async def delete_stream(self, name: str) -> bool:
"""
Delete stream and all its messages.
Parameters:
- name: Stream name to delete
Returns:
True if stream was deleted
"""
async def stream_info(
self,
name: str,
subjects_filter: str = None
) -> StreamInfo:
"""
Get stream information and statistics.
Parameters:
- name: Stream name
- subjects_filter: Filter subjects in response
Returns:
Stream information including state and configuration
"""
async def find_stream_name_by_subject(self, subject: str) -> str:
"""
Find stream name that matches subject.
Parameters:
- subject: Subject to find stream for
Returns:
Stream name or raises NotFoundError
"""import asyncio
import nats
from nats.js.api import StreamConfig
async def main():
nc = await nats.connect()
jsm = nc.jsm()
# Create stream with configuration object
stream_config = StreamConfig(
name="events",
subjects=["events.*", "alerts.>"],
storage="file",
retention="limits",
max_msgs=1000000,
max_bytes=1024*1024*1024, # 1GB
max_age=timedelta(days=30),
max_consumers=10,
duplicate_window=timedelta(minutes=2)
)
stream_info = await jsm.add_stream(config=stream_config)
print(f"Created stream: {stream_info.config.name}")
# Create stream with parameters
await jsm.add_stream(
name="metrics",
subjects=["metrics.cpu.*", "metrics.memory.*"],
storage="memory",
max_msgs=100000,
max_age=timedelta(hours=24)
)
# Update stream configuration
await jsm.update_stream(
name="events",
max_msgs=2000000,
description="Updated event stream"
)
# Get stream information
info = await jsm.stream_info("events")
print(f"Stream has {info.state.messages} messages")
# Find stream by subject
stream_name = await jsm.find_stream_name_by_subject("events.user.login")
print(f"Subject belongs to stream: {stream_name}")List and iterate through all streams in the account.
class JetStreamManager:
async def streams_info(self, offset: int = 0) -> List[StreamInfo]:
"""
Get information for all streams.
Parameters:
- offset: Starting offset for pagination
Returns:
List of stream information objects
"""
async def streams_info_iterator(self, **kwargs) -> AsyncIterator[StreamInfo]:
"""
Iterate through all streams with pagination.
Returns:
Async iterator yielding stream information
"""# List all streams
streams = await jsm.streams_info()
for stream in streams:
print(f"Stream: {stream.config.name}, Messages: {stream.state.messages}")
# Iterate through streams
async for stream_info in jsm.streams_info_iterator():
print(f"Processing stream: {stream_info.config.name}")
if stream_info.state.bytes > 1024*1024*100: # 100MB
print(f"Large stream: {stream_info.config.name}")Create and manage consumers for stream consumption patterns.
class JetStreamManager:
async def add_consumer(
self,
stream: str,
config: ConsumerConfig = None,
**params
) -> ConsumerInfo:
"""
Create consumer for stream.
Parameters:
- stream: Stream name
- config: Consumer configuration
- **params: Individual configuration parameters
Returns:
Consumer information including configuration and state
"""
async def delete_consumer(self, stream: str, consumer: str) -> bool:
"""
Delete consumer from stream.
Parameters:
- stream: Stream name
- consumer: Consumer name
Returns:
True if consumer was deleted
"""
async def consumer_info(self, stream: str, consumer: str) -> ConsumerInfo:
"""
Get consumer information and statistics.
Parameters:
- stream: Stream name
- consumer: Consumer name
Returns:
Consumer information including state and configuration
"""
async def consumers_info(self, stream: str, offset: int = 0) -> List[ConsumerInfo]:
"""
Get information for all consumers in stream.
Parameters:
- stream: Stream name
- offset: Starting offset for pagination
Returns:
List of consumer information objects
"""from nats.js.api import ConsumerConfig
from datetime import timedelta
# Create durable consumer
consumer_config = ConsumerConfig(
durable_name="event-processor",
deliver_policy="all",
ack_policy="explicit",
ack_wait=timedelta(seconds=30),
max_deliver=3,
filter_subject="events.user.*"
)
consumer_info = await jsm.add_consumer("events", config=consumer_config)
print(f"Created consumer: {consumer_info.name}")
# Create ephemeral consumer with parameters
await jsm.add_consumer(
stream="metrics",
deliver_policy="new",
ack_policy="explicit",
max_ack_pending=100
)
# Get consumer information
info = await jsm.consumer_info("events", "event-processor")
print(f"Consumer delivered {info.delivered.stream_seq} messages")
# List all consumers for stream
consumers = await jsm.consumers_info("events")
for consumer in consumers:
print(f"Consumer: {consumer.name}, Pending: {consumer.num_pending}")Direct message operations on streams.
class JetStreamManager:
async def get_msg(
self,
stream_name: str,
seq: int,
**kwargs
) -> RawStreamMsg:
"""
Get message by sequence number.
Parameters:
- stream_name: Stream name
- seq: Message sequence number
Returns:
Raw stream message with metadata
"""
async def delete_msg(self, stream_name: str, seq: int) -> bool:
"""
Delete message by sequence number.
Parameters:
- stream_name: Stream name
- seq: Message sequence number
Returns:
True if message was deleted
"""
async def get_last_msg(self, stream_name: str, subject: str) -> RawStreamMsg:
"""
Get last message for subject.
Parameters:
- stream_name: Stream name
- subject: Subject filter
Returns:
Last message matching subject
"""
async def purge_stream(self, name: str, **opts) -> bool:
"""
Purge messages from stream.
Parameters:
- name: Stream name
- subject: Purge messages matching subject filter
- seq: Purge up to sequence number
- keep: Keep latest N messages
Returns:
True if stream was purged
"""# Get specific message
msg = await jsm.get_msg("events", seq=12345)
print(f"Message data: {msg.data.decode()}")
print(f"Subject: {msg.subject}")
# Get last message for subject
last_msg = await jsm.get_last_msg("events", "events.user.login")
print(f"Last login: {last_msg.data.decode()}")
# Delete specific message
await jsm.delete_msg("events", seq=12345)
# Purge old messages, keep latest 1000
await jsm.purge_stream("events", keep=1000)
# Purge messages by subject
await jsm.purge_stream("events", subject="events.test.*")Get JetStream account limits and usage information.
class JetStreamManager:
async def account_info(self) -> AccountInfo:
"""
Get JetStream account information.
Returns:
Account information including limits and usage statistics
"""# Get account information
account = await jsm.account_info()
print(f"Memory usage: {account.memory} / {account.limits.max_memory}")
print(f"Storage usage: {account.store} / {account.limits.max_storage}")
print(f"Streams: {account.streams} / {account.limits.max_streams}")
print(f"Consumers: {account.consumers} / {account.limits.max_consumers}")
# Check if approaching limits
if account.memory > account.limits.max_memory * 0.8:
print("Warning: Approaching memory limit")from dataclasses import dataclass
from typing import Optional, List, Dict
from datetime import datetime, timedelta
@dataclass
class StreamConfig:
name: str
subjects: List[str] = None
retention: str = "limits" # "limits", "interest", "workqueue"
max_consumers: int = -1
max_msgs: int = -1
max_bytes: int = -1
max_age: timedelta = None
max_msgs_per_subject: int = -1
max_msg_size: int = -1
storage: str = "file" # "file", "memory"
num_replicas: int = 1
no_ack: bool = False
template_owner: str = None
discard: str = "old" # "old", "new"
duplicate_window: timedelta = None
placement: Placement = None
mirror: StreamSource = None
sources: List[StreamSource] = None
sealed: bool = False
deny_delete: bool = False
deny_purge: bool = False
allow_rollup_hdrs: bool = False
allow_direct: bool = False
mirror_direct: bool = False
republish: RePublish = None
description: str = None
metadata: Dict[str, str] = None
@dataclass
class ConsumerConfig:
durable_name: Optional[str] = None
name: Optional[str] = None
description: Optional[str] = None
deliver_policy: str = "all"
opt_start_seq: Optional[int] = None
opt_start_time: Optional[datetime] = None
ack_policy: str = "explicit"
ack_wait: Optional[timedelta] = None
max_deliver: Optional[int] = None
filter_subject: Optional[str] = None
replay_policy: str = "instant"
rate_limit_bps: Optional[int] = None
sample_freq: Optional[str] = None
max_waiting: Optional[int] = None
max_ack_pending: Optional[int] = None
flow_control: bool = False
idle_heartbeat: Optional[timedelta] = None
headers_only: bool = False
max_request_batch: Optional[int] = None
max_request_expires: Optional[timedelta] = None
inactive_threshold: Optional[timedelta] = None
num_replicas: int = 0
mem_storage: bool = False
metadata: Dict[str, str] = None@dataclass
class StreamInfo:
config: StreamConfig
state: StreamState
cluster: Optional[ClusterInfo] = None
mirror: Optional[StreamSourceInfo] = None
sources: Optional[List[StreamSourceInfo]] = None
alternates: Optional[List[StreamAlternate]] = None
@dataclass
class StreamState:
messages: int
bytes: int
first_seq: int
first_ts: datetime
last_seq: int
last_ts: datetime
consumers: int
deleted: Optional[List[int]] = None
lost: Optional[LostStreamData] = None
num_subjects: Optional[int] = None
@dataclass
class ConsumerInfo:
name: str
config: ConsumerConfig
delivered: SequenceInfo
ack_floor: SequenceInfo
num_ack_pending: int
num_redelivered: int
num_waiting: int
num_pending: int
cluster: Optional[ClusterInfo] = None
push_bound: bool = False
@dataclass
class AccountInfo:
memory: int
storage: int
streams: int
consumers: int
limits: AccountLimits
api: APIStats
domain: Optional[str] = None
@dataclass
class RawStreamMsg:
subject: str
seq: int
data: bytes
hdrs: Optional[bytes] = None
time: Optional[datetime] = None
stream: Optional[str] = NoneInstall with Tessl CLI
npx tessl i tessl/pypi-nats-py