CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-nats-py

An asyncio Python client for the NATS messaging system with JetStream, microservices, and key-value store support

Pending
Overview
Eval results
Files

docs/jetstream.md

JetStream

JetStream provides persistent messaging capabilities built on NATS streams. It offers message storage, replay, delivery guarantees, and advanced consumption patterns for building resilient distributed applications.

Capabilities

JetStream Context

Core JetStream functionality for publishing to streams and subscribing to consumers.

class JetStreamContext:
    async def publish(
        self,
        subject: str,
        payload: bytes = b"",
        timeout: Optional[float] = None,
        stream: Optional[str] = None,
        headers: Optional[Dict[str, Any]] = None,
    ) -> PubAck:
        """
        Send one message to a JetStream stream and return its acknowledgment.

        Parameters:
        - subject: Subject the message is published on
        - payload: Raw message body
        - timeout: Time limit for the publish
        - stream: Name of the stream the message is meant for (optional)
        - headers: Headers to attach to the message

        Returns:
        PubAck carrying the sequence information of the stored message
        """

    async def publish_async(
        self,
        subject: str,
        payload: bytes = b"",
        wait_stall: Optional[float] = None,
        stream: Optional[str] = None,
        headers: Optional[Dict] = None,
    ) -> asyncio.Future[PubAck]:
        """
        Queue a message for publishing without waiting for its acknowledgment.

        Parameters:
        - subject: Subject the message is published on
        - payload: Raw message body
        - wait_stall: NOTE(review): not described on this page; presumably
          bounds the wait when too many publishes are pending - confirm
        - stream: Name of the stream the message is meant for (optional)
        - headers: Headers to attach to the message

        Returns:
        Future that resolves to the publish acknowledgment
        """

    def publish_async_pending(self) -> int:
        """Return how many asynchronous publishes are still pending."""

    async def publish_async_completed(self) -> None:
        """Block until every pending asynchronous publish has completed."""

Usage Examples

import asyncio
import nats

async def main():
    """Connect to NATS, publish through JetStream, and always close the connection."""
    nc = await nats.connect()
    try:
        js = nc.jetstream()

        # Publish and wait for the stream's acknowledgment
        ack = await js.publish("events.user.login", b'{"user_id": 123, "timestamp": "2024-01-01T10:00:00Z"}')
        print(f"Message stored at sequence {ack.seq}")

        # Asynchronous publish for high throughput: each call hands back a future
        future1 = await js.publish_async("metrics.cpu", b'{"usage": 75.5}')
        future2 = await js.publish_async("metrics.memory", b'{"usage": 82.1}')

        # Wait for specific acknowledgments
        ack1 = await future1
        ack2 = await future2

        # Wait for all pending publishes
        await js.publish_async_completed()
    finally:
        # Release the connection even when a publish fails
        await nc.close()


if __name__ == "__main__":
    asyncio.run(main())

Push Subscriptions

Subscribe to JetStream messages with automatic delivery to callback handlers.

class JetStreamContext:
    async def subscribe(
        self,
        subject: str,
        queue: str = "",
        cb: Optional[Callable[[Msg], None]] = None,
        durable: Optional[str] = None,
        stream: Optional[str] = None,
        config: Optional[ConsumerConfig] = None,
        manual_ack: bool = False,
        ordered_consumer: bool = False,
        idle_heartbeat: Optional[float] = None,
        flow_control: bool = False,
        **kwargs
    ) -> JetStreamSubscription:
        """
        Subscribe to JetStream stream with push delivery.

        Every parameter that defaults to None is annotated Optional so the
        annotation agrees with the default.

        Parameters:
        - subject: Subject pattern to subscribe to
        - queue: Queue group for load balancing
        - cb: Message callback handler; may be omitted (None)
        - durable: Durable consumer name
        - stream: Source stream name
        - config: Consumer configuration
        - manual_ack: Require manual message acknowledgment
        - ordered_consumer: Enable ordered message delivery
        - idle_heartbeat: Heartbeat interval for flow control
        - flow_control: Enable flow control

        Returns:
        JetStream subscription
        """

    async def subscribe_bind(
        self,
        stream: str,
        consumer: str,
        **kwargs
    ) -> JetStreamSubscription:
        """
        Bind to existing durable consumer.

        Parameters:
        - stream: Stream name
        - consumer: Consumer name

        Returns:
        Bound JetStream subscription
        """

Usage Examples

# Simple JetStream subscription
async def handle_event(msg):
    """Print the decoded payload of ``msg``, then acknowledge the message."""
    print(f"Processing: {msg.data.decode()}")
    await msg.ack()

js_sub = await js.subscribe("events.>", cb=handle_event)

# Durable consumer subscription
await js.subscribe(
    "orders.created",
    durable="order-processor",
    manual_ack=True,
    cb=process_order
)

# Ordered consumer for sequential processing
await js.subscribe(
    "audit.logs",
    ordered_consumer=True,
    cb=process_audit_log
)

# Queue group for load balancing
await js.subscribe(
    "work.tasks",
    queue="workers",
    durable="task-worker",
    cb=process_task
)

Pull Subscriptions

Subscribe with manual message fetching for controlled consumption patterns.

class JetStreamContext:
    async def pull_subscribe(
        self,
        subject: str,
        durable: Optional[str] = None,
        stream: Optional[str] = None,
        config: Optional[ConsumerConfig] = None,
        **kwargs
    ) -> PullSubscription:
        """
        Create pull-based subscription for manual message fetching.

        Parameters that default to None are annotated Optional so the
        annotation agrees with the default.

        Parameters:
        - subject: Subject pattern to subscribe to
        - durable: Durable consumer name
        - stream: Source stream name
        - config: Consumer configuration

        Returns:
        Pull subscription for manual fetching
        """

    async def pull_subscribe_bind(
        self,
        stream: str,
        consumer: str,
        **kwargs
    ) -> PullSubscription:
        """
        Bind pull subscription to existing consumer.

        Parameters:
        - stream: Stream name
        - consumer: Consumer name

        Returns:
        Bound pull subscription
        """

Usage Examples

# Pull subscription with manual fetching
pull_sub = await js.pull_subscribe("batch.jobs", durable="job-processor")

# Fetch specific number of messages
msgs = await pull_sub.fetch(batch_size=10, timeout=5.0)
for msg in msgs:
    await process_job(msg.data)
    await msg.ack()

# Fetch with wait
msgs = await pull_sub.fetch(batch_size=5, timeout=30.0)
if msgs:
    await process_batch(msgs)

# Continuous fetching loop
async for msg in pull_sub.messages():
    try:
        await process_message(msg.data)
        await msg.ack()
    except Exception as e:
        print(f"Processing failed: {e}")
        await msg.nak()  # Negative acknowledgment for redelivery

Message Acknowledgment

Handle JetStream message acknowledgments with various strategies.

class Msg:
    async def ack(self) -> None:
        """Acknowledge message successfully processed."""

    async def ack_sync(self, timeout: float = 1.0) -> None:
        """Synchronously acknowledge message with timeout."""

    async def nak(self, delay: Optional[float] = None) -> None:
        """
        Negative acknowledgment - message will be redelivered.

        Parameters:
        - delay: Delay before redelivery in seconds; None (the default)
          requests no delay
        """

    async def in_progress(self) -> None:
        """Extend acknowledgment deadline for longer processing."""

    async def term(self) -> None:
        """Terminate message - no further redelivery."""

Usage Examples

async def message_handler(msg):
    """
    Process one message and pick the acknowledgment that matches the outcome.

    Acks when processing reports success, naks with a 30-second delay when it
    does not, terminates the message on FatalError, and naks without a delay
    on any other exception.
    """
    try:
        # Signal that work is still under way so the ack deadline is extended
        await msg.in_progress()

        outcome = await complex_processing(msg.data)

        if not outcome.success:
            await msg.nak(delay=30.0)  # Retry after 30 seconds
        else:
            await msg.ack()  # Success

    except FatalError:
        await msg.term()  # Don't retry
    except Exception:
        await msg.nak()  # Retry immediately

Utility Functions

Helper functions for JetStream message handling.

class JetStreamContext:
    def is_status_msg(self, msg: Msg) -> bool:
        """
        Tell whether ``msg`` is a JetStream status message.

        Parameters:
        - msg: Message to inspect

        Returns:
        True when the message is a status message
        """

Consumer Configuration

from dataclasses import dataclass
from typing import Optional, List
from datetime import datetime, timedelta

@dataclass
class ConsumerConfig:
    # Every field has a default, so ConsumerConfig() can be built with no
    # arguments and individual settings overridden by keyword.
    durable_name: Optional[str] = None
    name: Optional[str] = None
    description: Optional[str] = None

    # One of: "all", "last", "new", "by_start_sequence", "by_start_time"
    deliver_policy: str = "all"
    opt_start_seq: Optional[int] = None
    opt_start_time: Optional[datetime] = None

    # One of: "none", "all", "explicit"
    ack_policy: str = "explicit"
    ack_wait: Optional[timedelta] = None
    max_deliver: Optional[int] = None
    filter_subject: Optional[str] = None

    # One of: "instant", "original"
    replay_policy: str = "instant"
    rate_limit_bps: Optional[int] = None
    sample_freq: Optional[str] = None
    max_waiting: Optional[int] = None
    max_ack_pending: Optional[int] = None
    flow_control: bool = False
    idle_heartbeat: Optional[timedelta] = None
    headers_only: bool = False
    max_request_batch: Optional[int] = None
    max_request_expires: Optional[timedelta] = None
    inactive_threshold: Optional[timedelta] = None

JetStream Message Types

@dataclass
class PubAck:
    """Acknowledgment returned for a JetStream publish."""

    # Required fields come first; the optional ones default to "not set".
    stream: str
    seq: int
    duplicate: bool = False
    domain: Optional[str] = None

class JetStreamSubscription:
    """Push-based JetStream subscription."""

    async def next_msg(self, timeout: float = 1.0) -> Msg:
        """Return the next message, waiting at most ``timeout``."""

    async def drain(self) -> None:
        """Drain the subscription."""

    def messages(self) -> AsyncIterator[Msg]:
        """Return an async iterator over the subscription's messages."""

class PullSubscription:
    """Pull-based JetStream subscription."""

    async def fetch(self, batch_size: int = 1, timeout: float = 5.0) -> List[Msg]:
        """
        Fetch a batch of messages.

        Parameters:
        - batch_size: Number of messages to request
          NOTE(review): upstream nats-py appears to name this parameter
          `batch` - confirm before passing it by keyword
        - timeout: Time limit for the fetch
        """

    def messages(self) -> AsyncIterator[Msg]:
        """Return an async iterator over the subscription's messages."""

Constants

# Default pending limits for JetStream subscriptions
DEFAULT_JS_SUB_PENDING_MSGS_LIMIT = 512 * 1024  # 524,288 messages
DEFAULT_JS_SUB_PENDING_BYTES_LIMIT = 256 * 1024 * 1024  # 256 MiB

# Delivery policy names
DELIVER_ALL = "all"
DELIVER_LAST = "last"
DELIVER_NEW = "new"
DELIVER_BY_START_SEQUENCE = "by_start_sequence"
DELIVER_BY_START_TIME = "by_start_time"

# Acknowledgment policy names
ACK_NONE = "none"
ACK_ALL = "all"
ACK_EXPLICIT = "explicit"

# Replay policy names
REPLAY_INSTANT = "instant"
REPLAY_ORIGINAL = "original"

Install with Tessl CLI

npx tessl i tessl/pypi-nats-py

docs

core-client.md

error-handling.md

index.md

jetstream-management.md

jetstream.md

key-value-store.md

message-handling.md

microservices.md

object-store.md

tile.json