CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-nats-py

An asyncio Python client for the NATS messaging system with JetStream, microservices, and key-value store support

Pending
Overview
Eval results
Files

docs/core-client.md

Core NATS Client

Essential connection management and messaging functionality for the NATS Python client. Provides connection lifecycle management, publish/subscribe messaging, request/reply patterns, and subscription handling.

Capabilities

Connection Management

Establish and manage connections to NATS servers with support for clustering, authentication, TLS, and automatic reconnection.

async def connect(
    servers: Union[str, List[str]] = ["nats://localhost:4222"],
    error_cb: Optional[ErrorCallback] = None,
    disconnected_cb: Optional[Callback] = None,
    closed_cb: Optional[Callback] = None,
    discovered_server_cb: Optional[Callback] = None,
    reconnected_cb: Optional[Callback] = None,
    name: Optional[str] = None,
    pedantic: bool = False,
    verbose: bool = False,
    allow_reconnect: bool = True,
    connect_timeout: int = 2,
    reconnect_time_wait: int = 2,
    max_reconnect_attempts: int = 60,
    ping_interval: int = 120,
    max_outstanding_pings: int = 2,
    dont_randomize: bool = False,
    flusher_queue_size: int = 1024,
    no_echo: bool = False,
    tls: Optional[ssl.SSLContext] = None,
    tls_hostname: Optional[str] = None,
    tls_handshake_first: bool = False,
    user: Optional[str] = None,
    password: Optional[str] = None,
    token: Optional[str] = None,
    drain_timeout: int = 30,
    signature_cb: Optional[SignatureCallback] = None,
    user_jwt_cb: Optional[JWTCallback] = None,
    user_credentials: Optional[Credentials] = None,
    nkeys_seed: Optional[str] = None,
    nkeys_seed_str: Optional[str] = None,
    inbox_prefix: Union[str, bytes] = b"_INBOX",
    pending_size: int = 2 * 1024 * 1024,
    flush_timeout: Optional[float] = None
) -> NATS:
    """
    Connect to one or more NATS servers and return the connected client.

    This is a coroutine: call it with ``await`` from inside a running
    event loop. Every argument has a default, so ``await connect()``
    targets ``nats://localhost:4222``.

    Parameters:
    - servers: A single server URL or a list of URLs
      (default: ["nats://localhost:4222"]). The default is a list
      literal in the signature, so treat it as read-only.
    - error_cb: Callback for error events
    - disconnected_cb: Callback for disconnection events
    - closed_cb: Callback for connection closed events
    - discovered_server_cb: Callback for server discovery events
    - reconnected_cb: Callback for reconnection events
    - name: Client name for identification
    - pedantic: Enable pedantic protocol checking (default: False)
    - verbose: Enable verbose protocol logging (default: False)
    - allow_reconnect: Enable automatic reconnection (default: True)
    - connect_timeout: Connection timeout in seconds (default: 2)
    - reconnect_time_wait: Wait time between reconnection attempts
      (default: 2)
    - max_reconnect_attempts: Maximum reconnection attempts (default: 60)
    - ping_interval: Ping interval in seconds (default: 120)
    - max_outstanding_pings: Maximum outstanding pings (default: 2)
    - dont_randomize: Don't randomize server connection order
      (default: False)
    - flusher_queue_size: Maximum flusher queue size (default: 1024)
    - no_echo: Disable message echo from server (default: False)
    - tls: SSL context for TLS connections
    - tls_hostname: Hostname for TLS verification
    - tls_handshake_first: Perform TLS handshake before INFO
      (default: False)
    - user: Username for authentication
    - password: Password for authentication
    - token: Token for authentication
    - drain_timeout: Drain timeout in seconds (default: 30)
    - signature_cb: Callback for message signing
    - user_jwt_cb: Callback for JWT authentication
    - user_credentials: Path to user credentials file
    - nkeys_seed: NKEYS seed for authentication
    - nkeys_seed_str: NKEYS seed string for authentication
    - inbox_prefix: Prefix for inbox subjects, as str or bytes
      (default: b"_INBOX")
    - pending_size: Maximum pending data size
      (default: 2 * 1024 * 1024, i.e. 2 MiB if measured in bytes --
      TODO confirm unit)
    - flush_timeout: Flush timeout in seconds (default: None)

    Returns:
    Connected NATS client instance
    """

Usage Examples

import asyncio
import nats
import ssl

# NOTE: `nats.connect` is a coroutine, so every snippet below uses `await`
# and has to run inside an `async def` function (for example one started
# with `asyncio.run(main())`), not at module top level.

# Basic connection (all defaults, i.e. nats://localhost:4222)
nc = await nats.connect()

# Multiple servers with clustering: pass the URLs as a list
nc = await nats.connect([
    "nats://server1:4222",
    "nats://server2:4222", 
    "nats://server3:4222"
])

# Authenticated connection with username and password
nc = await nats.connect(
    servers=["nats://demo.nats.io:4222"],
    user="myuser",
    password="mypass"
)

# TLS connection using the standard library's default SSL context
ssl_ctx = ssl.create_default_context()
nc = await nats.connect(
    servers=["tls://demo.nats.io:4443"],
    tls=ssl_ctx
)

# With credentials file (path passed via `user_credentials`)
nc = await nats.connect(
    servers=["nats://connect.ngs.global"],
    user_credentials="/path/to/user.creds"
)

Connection Lifecycle

Manage connection state and gracefully close connections.

class NATS:
    """Connection lifecycle API of the NATS client.

    The ``is_*`` state checks are read-only properties, not methods:
    write ``nc.is_connected``, never ``nc.is_connected()`` (calling the
    returned ``bool`` raises ``TypeError``).
    """

    # Connection state constants
    DISCONNECTED = 0
    CONNECTED = 1
    CLOSED = 2
    RECONNECTING = 3
    CONNECTING = 4
    DRAINING_SUBS = 5
    DRAINING_PUBS = 6

    async def close(self) -> None:
        """Close the connection immediately."""

    async def drain(self) -> None:
        """
        Drain and close connection gracefully.
        Stops accepting new messages and closes after processing pending.
        """

    @property
    def is_connected(self) -> bool:
        """True while the client is connected to a server."""

    @property
    def is_closed(self) -> bool:
        """True once the client connection is closed."""

    @property
    def is_reconnecting(self) -> bool:
        """True while the client is in the reconnecting state."""

    @property
    def is_connecting(self) -> bool:
        """True while the client is in the connecting state."""

    @property
    def is_draining(self) -> bool:
        """True while the client is draining subscriptions."""

    @property
    def is_draining_pubs(self) -> bool:
        """True while the client is draining publications."""

Publishing

Send messages to subjects with optional reply subjects and headers.

class NATS:
    """Publishing API of the NATS client."""

    async def publish(
        self,
        subject: str,
        payload: bytes = b"",
        reply: str = "",
        headers: Optional[Dict[str, str]] = None
    ) -> None:
        """
        Publish message to subject.

        Parameters:
        - subject: Target subject
        - payload: Message data as bytes (default: empty)
        - reply: Reply subject for responses (default: "" = none)
        - headers: Message headers (default: None)
        """

    async def flush(self, timeout: float = 10.0) -> None:
        """
        Flush pending messages to server.

        Parameters:
        - timeout: Flush timeout in seconds (default: 10.0)
        """

    @property
    def pending_data_size(self) -> int:
        """Size of pending outbound data in bytes.

        This is a read-only property: use ``nc.pending_data_size``
        without parentheses.
        """

Usage Examples

# Simple publish: subject plus a bytes payload
await nc.publish("events.user.created", b'{"user_id": 123}')

# Publish with reply subject
await nc.publish("events.notify", b"Alert!", reply="responses.alerts")

# Publish with headers
headers = {"Content-Type": "application/json", "User-ID": "123"}
await nc.publish("api.requests", b'{"action": "create"}', headers=headers)

# Flush after a critical publish so it does not sit in the client buffer
await nc.publish("critical.event", b"Important data")
await nc.flush()  # Flush pending messages to the server (default timeout 10.0 s)
# NOTE(review): flush presumably confirms a round trip to the server, not
# delivery to any subscriber -- confirm before relying on it as a guarantee.

Subscribing

Subscribe to subjects with callback handlers or async iteration.

class NATS:
    async def subscribe(
        self,
        subject: str,
        queue: str = "",
        cb: Optional[Callable[[Msg], Awaitable[None]]] = None,
        future: Optional[asyncio.Future] = None,
        max_msgs: int = 0,
        pending_msgs_limit: int = 65536,
        pending_bytes_limit: int = 67108864
    ) -> Subscription:
        """
        Subscribe to a subject and return the resulting subscription.

        Parameters:
        - subject: Subject pattern to subscribe to
        - queue: Queue group for load balancing (default: "" = no group)
        - cb: Message callback handler; per its type it takes a Msg and
          returns an awaitable, i.e. it is an async function
        - future: Future to complete on first message
        - max_msgs: Maximum messages (0 = unlimited)
        - pending_msgs_limit: Maximum pending messages (default: 65536)
        - pending_bytes_limit: Maximum pending bytes
          (default: 67108864 = 64 MiB)

        Returns:
        Subscription object
        """
    
    def new_inbox(self) -> str:
        """Generate unique inbox subject for replies."""

Usage Examples

# Callback-based subscription
async def message_handler(msg):
    print(f"Received on {msg.subject}: {msg.data.decode()}")
    
sub = await nc.subscribe("events.*", cb=message_handler)

# Queue group subscription for load balancing
await nc.subscribe("work.queue", queue="workers", cb=process_work)

# Async iteration subscription
sub = await nc.subscribe("notifications")
async for msg in sub.messages():
    await handle_notification(msg)
    if some_condition:
        break

# One-time subscription
future = asyncio.Future()
await nc.subscribe("single.event", future=future, max_msgs=1)
msg = await future

Request-Reply

Send requests and receive responses with timeout handling.

class NATS:
    async def request(
        self,
        subject: str,
        payload: bytes = b"",
        timeout: float = 0.5,
        old_style: bool = False,
        headers: Optional[Dict[str, Any]] = None
    ) -> Msg:
        """
        Send a request and wait for a single response message.

        Parameters:
        - subject: Request subject
        - payload: Request data as bytes (default: empty)
        - timeout: Response timeout in seconds (default: 0.5)
        - old_style: Use old-style request format (default: False)
        - headers: Request headers (default: None)

        Returns:
        Response message

        Raises:
        - TimeoutError: No response within timeout
        - NoRespondersError: No services listening
        """

Usage Examples

# Simple request-reply
try:
    response = await nc.request("api.users.get", b'{"id": 123}', timeout=2.0)
    user_data = response.data.decode()
    print(f"User: {user_data}")
except TimeoutError:
    print("Request timed out")
except NoRespondersError:
    print("No service available")

# Request with headers
headers = {"Authorization": "Bearer token123"}
response = await nc.request(
    "secure.api.data", 
    b'{"query": "SELECT * FROM users"}',
    headers=headers,
    timeout=5.0
)

Server Information

Access server and connection information.

from urllib.parse import ParseResult


class NATS:
    """Server and connection information exposed by the NATS client.

    Every accessor here is a read-only property: use ``nc.max_payload``,
    not ``nc.max_payload()``. Server URLs are returned as parsed
    ``urllib.parse.ParseResult`` objects (e.g. ``nc.connected_url.netloc``),
    not as plain strings.
    """

    @property
    def connected_url(self) -> Optional[ParseResult]:
        """Parsed URL of the currently connected server, or None if not connected."""

    @property
    def servers(self) -> List[ParseResult]:
        """Parsed URLs of the configured servers."""

    @property
    def discovered_servers(self) -> List[ParseResult]:
        """Parsed URLs of servers discovered from the cluster."""

    @property
    def max_payload(self) -> int:
        """Maximum payload size supported by the server."""

    @property
    def client_id(self) -> Optional[int]:
        """Unique client ID assigned by the server, if one was provided."""

    @property
    def connected_server_version(self) -> "ServerVersion":
        """Version of the connected server."""

    @property
    def last_error(self) -> Optional[Exception]:
        """Last error encountered, or None if there was none."""

JetStream Integration

Access JetStream functionality from the core client.

class NATS:
    def jetstream(self, **opts) -> JetStreamContext:
        """
        Get JetStream context for stream operations.

        The options below are passed as keyword arguments (``**opts``).

        Parameters:
        - prefix: Custom JetStream API prefix
        - domain: JetStream domain
        - timeout: Default operation timeout

        Returns:
        JetStream context instance
        """
    
    def jsm(self, **opts) -> JetStreamManager:
        """
        Get JetStream manager for administrative operations.

        The options below are passed as keyword arguments (``**opts``).

        Parameters:
        - prefix: Custom JetStream API prefix
        - domain: JetStream domain
        - timeout: Default operation timeout

        Returns:
        JetStream manager instance
        """

Types

from typing import Union, List, Dict, Optional, Callable, Awaitable, AsyncIterator
import ssl
import asyncio

# Connection types
Servers = Union[str, List[str]]
ConnectOptions = Dict[str, Union[str, int, bool, Callable]]
SSLContext = ssl.SSLContext

# Callback types
# Event callbacks must be coroutine functions (``async def``): the client
# rejects plain functions, so each alias returns an awaitable.
Callback = Callable[[], Awaitable[None]]
ErrorCallback = Callable[[Exception], Awaitable[None]]
ClosedCallback = Callback
DisconnectedCallback = Callback
ReconnectedCallback = Callback
# "Msg" is a forward reference to the client's message class.
MessageCallback = Callable[["Msg"], Awaitable[None]]
# Synchronous callbacks used during authentication.
SignatureCallback = Callable[[str], bytes]
JWTCallback = Callable[[], Union[bytearray, bytes]]
# Earlier name used on this page; kept as an alias of JWTCallback.
UserJWTCallback = JWTCallback

# Message types
Headers = Optional[Dict[str, str]]
Payload = bytes
Subject = str

Install with Tessl CLI

npx tessl i tessl/pypi-nats-py

docs

core-client.md

error-handling.md

index.md

jetstream-management.md

jetstream.md

key-value-store.md

message-handling.md

microservices.md

object-store.md

tile.json