CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-nats-py

An asyncio Python client for the NATS messaging system with JetStream, microservices, and key-value store support

Pending
Overview
Eval results
Files

jetstream-management.mddocs/

JetStream Management

Administrative APIs for managing JetStream streams, consumers, and accounts. Provides comprehensive configuration, monitoring, and maintenance capabilities for JetStream infrastructure.

Capabilities

Stream Management

Create, configure, and manage JetStream streams for persistent message storage.

class JetStreamManager:
    async def add_stream(
        self,
        config: StreamConfig = None,
        **params
    ) -> StreamInfo:
        """
        Create new JetStream stream.
        
        Parameters:
        - config: Complete stream configuration
        - **params: Individual configuration parameters
        
        Returns:
        Stream information including configuration and state
        """
    
    async def update_stream(
        self,
        config: StreamConfig = None,
        **params
    ) -> StreamInfo:
        """
        Update existing stream configuration.
        
        Parameters:
        - config: Updated stream configuration
        - **params: Individual configuration parameters
        
        Returns:
        Updated stream information
        """
    
    async def delete_stream(self, name: str) -> bool:
        """
        Delete stream and all its messages.
        
        Parameters:
        - name: Stream name to delete
        
        Returns:
        True if stream was deleted
        """
    
    async def stream_info(
        self,
        name: str,
        subjects_filter: str = None
    ) -> StreamInfo:
        """
        Get stream information and statistics.
        
        Parameters:
        - name: Stream name
        - subjects_filter: Filter subjects in response
        
        Returns:
        Stream information including state and configuration
        """
    
    async def find_stream_name_by_subject(self, subject: str) -> str:
        """
        Find stream name that matches subject.
        
        Parameters:
        - subject: Subject to find stream for
        
        Returns:
        Stream name or raises NotFoundError
        """

Usage Examples

import asyncio
import nats
from nats.js.api import StreamConfig

async def main():
    nc = await nats.connect()
    jsm = nc.jsm()
    
    # Create stream with configuration object
    stream_config = StreamConfig(
        name="events",
        subjects=["events.*", "alerts.>"],
        storage="file",
        retention="limits",
        max_msgs=1000000,
        max_bytes=1024*1024*1024,  # 1GB
        max_age=timedelta(days=30),
        max_consumers=10,
        duplicate_window=timedelta(minutes=2)
    )
    
    stream_info = await jsm.add_stream(config=stream_config)
    print(f"Created stream: {stream_info.config.name}")
    
    # Create stream with parameters
    await jsm.add_stream(
        name="metrics",
        subjects=["metrics.cpu.*", "metrics.memory.*"],
        storage="memory",
        max_msgs=100000,
        max_age=timedelta(hours=24)
    )
    
    # Update stream configuration
    await jsm.update_stream(
        name="events",
        max_msgs=2000000,
        description="Updated event stream"
    )
    
    # Get stream information
    info = await jsm.stream_info("events")
    print(f"Stream has {info.state.messages} messages")
    
    # Find stream by subject
    stream_name = await jsm.find_stream_name_by_subject("events.user.login")
    print(f"Subject belongs to stream: {stream_name}")

Stream Listing and Iteration

List and iterate through all streams in the account.

class JetStreamManager:
    async def streams_info(self, offset: int = 0) -> List[StreamInfo]:
        """
        Get information for all streams.
        
        Parameters:
        - offset: Starting offset for pagination
        
        Returns:
        List of stream information objects
        """
    
    async def streams_info_iterator(self, **kwargs) -> AsyncIterator[StreamInfo]:
        """
        Iterate through all streams with pagination.
        
        Returns:
        Async iterator yielding stream information
        """

Usage Examples

# List all streams
streams = await jsm.streams_info()
for stream in streams:
    print(f"Stream: {stream.config.name}, Messages: {stream.state.messages}")

# Iterate through streams
async for stream_info in jsm.streams_info_iterator():
    print(f"Processing stream: {stream_info.config.name}")
    if stream_info.state.bytes > 1024*1024*100:  # 100MB
        print(f"Large stream: {stream_info.config.name}")

Consumer Management

Create and manage consumers for stream consumption patterns.

class JetStreamManager:
    async def add_consumer(
        self,
        stream: str,
        config: ConsumerConfig = None,
        **params
    ) -> ConsumerInfo:
        """
        Create consumer for stream.
        
        Parameters:
        - stream: Stream name
        - config: Consumer configuration
        - **params: Individual configuration parameters
        
        Returns:
        Consumer information including configuration and state
        """
    
    async def delete_consumer(self, stream: str, consumer: str) -> bool:
        """
        Delete consumer from stream.
        
        Parameters:
        - stream: Stream name
        - consumer: Consumer name
        
        Returns:
        True if consumer was deleted
        """
    
    async def consumer_info(self, stream: str, consumer: str) -> ConsumerInfo:
        """
        Get consumer information and statistics.
        
        Parameters:
        - stream: Stream name
        - consumer: Consumer name
        
        Returns:
        Consumer information including state and configuration
        """
    
    async def consumers_info(self, stream: str, offset: int = 0) -> List[ConsumerInfo]:
        """
        Get information for all consumers in stream.
        
        Parameters:
        - stream: Stream name
        - offset: Starting offset for pagination
        
        Returns:
        List of consumer information objects
        """

Usage Examples

from nats.js.api import ConsumerConfig
from datetime import timedelta

# Create durable consumer
consumer_config = ConsumerConfig(
    durable_name="event-processor",
    deliver_policy="all",
    ack_policy="explicit",
    ack_wait=timedelta(seconds=30),
    max_deliver=3,
    filter_subject="events.user.*"
)

consumer_info = await jsm.add_consumer("events", config=consumer_config)
print(f"Created consumer: {consumer_info.name}")

# Create ephemeral consumer with parameters
await jsm.add_consumer(
    stream="metrics",
    deliver_policy="new",
    ack_policy="explicit",
    max_ack_pending=100
)

# Get consumer information
info = await jsm.consumer_info("events", "event-processor")
print(f"Consumer delivered {info.delivered.stream_seq} messages")

# List all consumers for stream
consumers = await jsm.consumers_info("events")
for consumer in consumers:
    print(f"Consumer: {consumer.name}, Pending: {consumer.num_pending}")

Message Management

Direct message operations on streams.

class JetStreamManager:
    async def get_msg(
        self,
        stream_name: str,
        seq: int,
        **kwargs
    ) -> RawStreamMsg:
        """
        Get message by sequence number.
        
        Parameters:
        - stream_name: Stream name
        - seq: Message sequence number
        
        Returns:
        Raw stream message with metadata
        """
    
    async def delete_msg(self, stream_name: str, seq: int) -> bool:
        """
        Delete message by sequence number.
        
        Parameters:
        - stream_name: Stream name  
        - seq: Message sequence number
        
        Returns:
        True if message was deleted
        """
    
    async def get_last_msg(self, stream_name: str, subject: str) -> RawStreamMsg:
        """
        Get last message for subject.
        
        Parameters:
        - stream_name: Stream name
        - subject: Subject filter
        
        Returns:
        Last message matching subject
        """
    
    async def purge_stream(self, name: str, **opts) -> bool:
        """
        Purge messages from stream.
        
        Parameters:
        - name: Stream name
        - subject: Purge messages matching subject filter
        - seq: Purge up to sequence number
        - keep: Keep latest N messages
        
        Returns:
        True if stream was purged
        """

Usage Examples

# Get specific message
msg = await jsm.get_msg("events", seq=12345)
print(f"Message data: {msg.data.decode()}")
print(f"Subject: {msg.subject}")

# Get last message for subject
last_msg = await jsm.get_last_msg("events", "events.user.login")
print(f"Last login: {last_msg.data.decode()}")

# Delete specific message
await jsm.delete_msg("events", seq=12345)

# Purge old messages, keep latest 1000
await jsm.purge_stream("events", keep=1000)

# Purge messages by subject
await jsm.purge_stream("events", subject="events.test.*")

Account Information

Get JetStream account limits and usage information.

class JetStreamManager:
    async def account_info(self) -> AccountInfo:
        """
        Get JetStream account information.
        
        Returns:
        Account information including limits and usage statistics
        """

Usage Examples

# Get account information
account = await jsm.account_info()
print(f"Memory usage: {account.memory} / {account.limits.max_memory}")
print(f"Storage usage: {account.store} / {account.limits.max_storage}")
print(f"Streams: {account.streams} / {account.limits.max_streams}")
print(f"Consumers: {account.consumers} / {account.limits.max_consumers}")

# Check if approaching limits
if account.memory > account.limits.max_memory * 0.8:
    print("Warning: Approaching memory limit")

Configuration Types

from dataclasses import dataclass
from typing import Optional, List, Dict
from datetime import datetime, timedelta

@dataclass
class StreamConfig:
    name: str
    subjects: List[str] = None
    retention: str = "limits"  # "limits", "interest", "workqueue"
    max_consumers: int = -1
    max_msgs: int = -1
    max_bytes: int = -1
    max_age: timedelta = None
    max_msgs_per_subject: int = -1
    max_msg_size: int = -1
    storage: str = "file"  # "file", "memory"
    num_replicas: int = 1
    no_ack: bool = False
    template_owner: str = None
    discard: str = "old"  # "old", "new"
    duplicate_window: timedelta = None
    placement: Placement = None
    mirror: StreamSource = None
    sources: List[StreamSource] = None
    sealed: bool = False
    deny_delete: bool = False
    deny_purge: bool = False
    allow_rollup_hdrs: bool = False
    allow_direct: bool = False
    mirror_direct: bool = False
    republish: RePublish = None
    description: str = None
    metadata: Dict[str, str] = None

@dataclass  
class ConsumerConfig:
    durable_name: Optional[str] = None
    name: Optional[str] = None
    description: Optional[str] = None
    deliver_policy: str = "all"
    opt_start_seq: Optional[int] = None
    opt_start_time: Optional[datetime] = None
    ack_policy: str = "explicit"
    ack_wait: Optional[timedelta] = None
    max_deliver: Optional[int] = None
    filter_subject: Optional[str] = None
    replay_policy: str = "instant"
    rate_limit_bps: Optional[int] = None
    sample_freq: Optional[str] = None
    max_waiting: Optional[int] = None
    max_ack_pending: Optional[int] = None
    flow_control: bool = False
    idle_heartbeat: Optional[timedelta] = None
    headers_only: bool = False
    max_request_batch: Optional[int] = None
    max_request_expires: Optional[timedelta] = None
    inactive_threshold: Optional[timedelta] = None
    num_replicas: int = 0
    mem_storage: bool = False
    metadata: Dict[str, str] = None

Information Types

@dataclass
class StreamInfo:
    config: StreamConfig
    state: StreamState
    cluster: Optional[ClusterInfo] = None
    mirror: Optional[StreamSourceInfo] = None
    sources: Optional[List[StreamSourceInfo]] = None
    alternates: Optional[List[StreamAlternate]] = None

@dataclass
class StreamState:
    messages: int
    bytes: int
    first_seq: int
    first_ts: datetime
    last_seq: int
    last_ts: datetime
    consumers: int
    deleted: Optional[List[int]] = None
    lost: Optional[LostStreamData] = None
    num_subjects: Optional[int] = None

@dataclass
class ConsumerInfo:
    name: str
    config: ConsumerConfig
    delivered: SequenceInfo
    ack_floor: SequenceInfo
    num_ack_pending: int
    num_redelivered: int
    num_waiting: int
    num_pending: int
    cluster: Optional[ClusterInfo] = None
    push_bound: bool = False

@dataclass
class AccountInfo:
    memory: int
    storage: int
    streams: int
    consumers: int
    limits: AccountLimits
    api: APIStats
    domain: Optional[str] = None

@dataclass
class RawStreamMsg:
    subject: str
    seq: int
    data: bytes
    hdrs: Optional[bytes] = None
    time: Optional[datetime] = None
    stream: Optional[str] = None

Install with Tessl CLI

npx tessl i tessl/pypi-nats-py

docs

core-client.md

error-handling.md

index.md

jetstream-management.md

jetstream.md

key-value-store.md

message-handling.md

microservices.md

object-store.md

tile.json