or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

core-client.mderror-handling.mdindex.mdjetstream-management.mdjetstream.mdkey-value-store.mdmessage-handling.mdmicroservices.mdobject-store.md
tile.json

tessl/pypi-nats-py

An asyncio Python client for the NATS messaging system with JetStream, microservices, and key-value store support

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/nats-py@2.11.x

To install, run

npx @tessl/cli install tessl/pypi-nats-py@2.11.0

index.mddocs/

NATS Python Client

An asyncio Python client for the NATS messaging system with comprehensive support for JetStream persistent messaging, microservices framework, key-value store, object store, and high-performance real-time messaging patterns.

Package Information

  • Package Name: nats-py
  • Language: Python
  • Installation: pip install nats-py
  • Minimum Python Version: 3.7+

Core Imports

import nats

Common for working with the client:

from nats.aio.client import Client as NATS

JetStream functionality:

# JetStream is accessed via the client's jetstream() method
nc = await nats.connect()
js = nc.jetstream()

Microservices functionality:

from nats.micro import add_service

Error handling:

from nats.errors import ConnectionClosedError, TimeoutError

Basic Usage

import asyncio
import nats

async def main():
    # Connect to NATS server
    nc = await nats.connect("nats://localhost:4222")
    
    # Simple publish
    await nc.publish("foo", b"Hello World!")
    
    # Simple subscribe
    async def message_handler(msg):
        print(f"Received: {msg.data.decode()}")
        
    await nc.subscribe("foo", cb=message_handler)
    
    # Request-reply pattern
    response = await nc.request("help", b"help me", timeout=1.0)
    print(f"Response: {response.data.decode()}")
    
    # Cleanup
    await nc.drain()
    await nc.close()

if __name__ == '__main__':
    asyncio.run(main())

Architecture

NATS Python client provides a layered architecture for scalable messaging:

  • Core Client: Connection management, publish/subscribe, request/reply messaging
  • JetStream: Stream processing layer for persistent messaging, key-value store, object store
  • Microservices: Service discovery, load balancing, monitoring framework
  • Transport Layer: TCP, WebSocket, TLS support with automatic reconnection
  • Protocol Layer: NATS wire protocol implementation with high-performance parsing

This design enables the client to serve as the foundation for distributed applications requiring reliable message delivery, real-time communication, and persistent data storage.

Capabilities

Core NATS Client

Essential connection management and messaging functionality including connect/disconnect, publish/subscribe, request/reply patterns, subscription management, and connection lifecycle handling.

async def connect(servers=["nats://localhost:4222"], **options) -> NATS: ...

class NATS:
    async def publish(self, subject: str, payload: bytes = b"", reply: str = "", headers: dict = None) -> None: ...
    async def subscribe(self, subject: str, queue: str = "", cb: callable = None, **kwargs) -> Subscription: ...
    async def request(self, subject: str, payload: bytes = b"", timeout: float = 0.5, headers: dict = None) -> Msg: ...
    async def close(self) -> None: ...
    async def drain(self) -> None: ...

Core NATS Client

JetStream Stream Processing

Persistent messaging with streams, consumers, message acknowledgments, delivery guarantees, and advanced features like ordered consumers and pull subscriptions for building resilient applications.

class JetStreamContext:
    async def publish(self, subject: str, payload: bytes = b"", timeout: float = None, stream: str = None, headers: dict = None) -> PubAck: ...
    async def subscribe(self, subject: str, durable: str = None, config: ConsumerConfig = None, **kwargs) -> JetStreamSubscription: ...
    async def pull_subscribe(self, subject: str, durable: str = None, config: ConsumerConfig = None, **kwargs) -> PullSubscription: ...

JetStream

JetStream Management

Administrative APIs for creating and managing streams, consumers, accounts with comprehensive configuration options, monitoring capabilities, and cluster management.

class JetStreamManager:
    async def add_stream(self, config: StreamConfig = None, **params) -> StreamInfo: ...
    async def add_consumer(self, stream: str, config: ConsumerConfig = None, **params) -> ConsumerInfo: ...
    async def account_info(self) -> AccountInfo: ...
    async def delete_stream(self, name: str) -> bool: ...

JetStream Management

Key-Value Store

Distributed key-value storage built on JetStream streams with atomic operations, conditional updates, history tracking, and watch capabilities for building stateful applications.

class KeyValue:
    async def get(self, key: str, revision: int = None) -> Entry: ...
    async def put(self, key: str, value: bytes, revision: int = None) -> int: ...
    async def delete(self, key: str, revision: int = None) -> bool: ...
    async def watch(self, key: str, **kwargs) -> AsyncIterator[Entry]: ...

Key-Value Store

Object Store

Scalable object storage with metadata, chunking for large objects, content addressing, and efficient streaming for storing and retrieving binary data and files.

class ObjectStore:
    async def put(self, name: str, data: bytes, **kwargs) -> ObjectInfo: ...
    async def get(self, name: str) -> bytes: ...
    async def get_info(self, name: str, show_deleted: bool = False) -> ObjectInfo: ...
    async def delete(self, name: str) -> bool: ...

Object Store

Microservices Framework

Service discovery, request routing, monitoring, and health checks with built-in load balancing, error handling, and comprehensive service information for building distributed microservice architectures.

async def add_service(nc: NATS, config: ServiceConfig = None, **kwargs) -> Service: ...

class Service:
    async def start(self) -> None: ...
    async def stop(self) -> None: ...
    async def add_endpoint(self, config: EndpointConfig) -> Endpoint: ...

Microservices

Message Handling

Message processing with headers, reply handling, JetStream acknowledgments, and comprehensive metadata access for building sophisticated message-driven applications.

class Msg:
    subject: str
    data: bytes
    reply: str
    headers: dict
    
    async def respond(self, data: bytes) -> None: ...
    async def ack(self) -> None: ...
    async def nak(self, delay: float = None) -> None: ...

Message Handling

Error Handling

Comprehensive exception hierarchy covering connection, protocol, JetStream, key-value, object store, and microservice errors with specific error types for precise error handling.

class Error(Exception): ...
class ConnectionClosedError(Error): ...
class TimeoutError(Error): ...
class NoRespondersError(Error): ...
# ... additional error types

Error Handling

Types

from typing import Union, List, Dict, Optional, Callable, AsyncIterator, Awaitable, Any
from dataclasses import dataclass

# Core types used across the API
Servers = Union[str, List[str]]
Handler = Callable[[Msg], Awaitable[None]]
Headers = Optional[Dict[str, str]]

# Callback types
Callback = Callable[[], Awaitable[None]]
ErrorCallback = Callable[[Exception], Awaitable[None]]
JWTCallback = Callable[[], Union[bytearray, bytes]]
SignatureCallback = Callable[[str], bytes]