An asyncio Python client for the NATS messaging system with JetStream, microservices, and key-value store support
npx @tessl/cli install tessl/pypi-nats-py@2.11.0

An asyncio Python client for the NATS messaging system with comprehensive support for JetStream persistent messaging, microservices framework, key-value store, object store, and high-performance real-time messaging patterns.
pip install nats-py

import nats

Common import for working with the client:
from nats.aio.client import Client as NATS

JetStream functionality:
# JetStream is accessed via the client's jetstream() method
nc = await nats.connect()
js = nc.jetstream()

Microservices functionality:
from nats.micro import add_service

Error handling:
from nats.errors import ConnectionClosedError, TimeoutError

import asyncio
import nats
async def main():
    # Connect to NATS server
    nc = await nats.connect("nats://localhost:4222")
    # Simple publish
    await nc.publish("foo", b"Hello World!")
    # Simple subscribe
    async def message_handler(msg):
        print(f"Received: {msg.data.decode()}")
    await nc.subscribe("foo", cb=message_handler)
    # Request-reply pattern
    response = await nc.request("help", b"help me", timeout=1.0)
    print(f"Response: {response.data.decode()}")
    # Cleanup
    await nc.drain()
    await nc.close()
if __name__ == '__main__':
    asyncio.run(main())

The NATS Python client provides a layered architecture for scalable messaging:
This design enables the client to serve as the foundation for distributed applications requiring reliable message delivery, real-time communication, and persistent data storage.
Essential connection management and messaging functionality including connect/disconnect, publish/subscribe, request/reply patterns, subscription management, and connection lifecycle handling.
async def connect(servers=["nats://localhost:4222"], **options) -> NATS: ...
class NATS:
    async def publish(self, subject: str, payload: bytes = b"", reply: str = "", headers: dict = None) -> None: ...
    async def subscribe(self, subject: str, queue: str = "", cb: callable = None, **kwargs) -> Subscription: ...
    async def request(self, subject: str, payload: bytes = b"", timeout: float = 0.5, headers: dict = None) -> Msg: ...
    async def close(self) -> None: ...
    async def drain(self) -> None: ...

Persistent messaging with streams, consumers, message acknowledgments, delivery guarantees, and advanced features like ordered consumers and pull subscriptions for building resilient applications.
class JetStreamContext:
    async def publish(self, subject: str, payload: bytes = b"", timeout: float = None, stream: str = None, headers: dict = None) -> PubAck: ...
    async def subscribe(self, subject: str, durable: str = None, config: ConsumerConfig = None, **kwargs) -> JetStreamSubscription: ...
    async def pull_subscribe(self, subject: str, durable: str = None, config: ConsumerConfig = None, **kwargs) -> PullSubscription: ...

Administrative APIs for creating and managing streams, consumers, accounts with comprehensive configuration options, monitoring capabilities, and cluster management.
class JetStreamManager:
    async def add_stream(self, config: StreamConfig = None, **params) -> StreamInfo: ...
    async def add_consumer(self, stream: str, config: ConsumerConfig = None, **params) -> ConsumerInfo: ...
    async def account_info(self) -> AccountInfo: ...
    async def delete_stream(self, name: str) -> bool: ...

Distributed key-value storage built on JetStream streams with atomic operations, conditional updates, history tracking, and watch capabilities for building stateful applications.
class KeyValue:
    async def get(self, key: str, revision: int = None) -> Entry: ...
    async def put(self, key: str, value: bytes, revision: int = None) -> int: ...
    async def delete(self, key: str, revision: int = None) -> bool: ...
    async def watch(self, key: str, **kwargs) -> AsyncIterator[Entry]: ...

Scalable object storage with metadata, chunking for large objects, content addressing, and efficient streaming for storing and retrieving binary data and files.
class ObjectStore:
    async def put(self, name: str, data: bytes, **kwargs) -> ObjectInfo: ...
    async def get(self, name: str) -> bytes: ...
    async def get_info(self, name: str, show_deleted: bool = False) -> ObjectInfo: ...
    async def delete(self, name: str) -> bool: ...

Service discovery, request routing, monitoring, and health checks with built-in load balancing, error handling, and comprehensive service information for building distributed microservice architectures.
async def add_service(nc: NATS, config: ServiceConfig = None, **kwargs) -> Service: ...
class Service:
    async def start(self) -> None: ...
    async def stop(self) -> None: ...
    async def add_endpoint(self, config: EndpointConfig) -> Endpoint: ...

Message processing with headers, reply handling, JetStream acknowledgments, and comprehensive metadata access for building sophisticated message-driven applications.
class Msg:
    subject: str
    data: bytes
    reply: str
    headers: dict
    async def respond(self, data: bytes) -> None: ...
    async def ack(self) -> None: ...
    async def nak(self, delay: float = None) -> None: ...

Comprehensive exception hierarchy covering connection, protocol, JetStream, key-value, object store, and microservice errors with specific error types for precise error handling.
class Error(Exception): ...
class ConnectionClosedError(Error): ...
class TimeoutError(Error): ...
class NoRespondersError(Error): ...
# ... additional error types

from typing import Union, List, Dict, Optional, Callable, AsyncIterator, Awaitable, Any
from dataclasses import dataclass
# Core types used across the API
Servers = Union[str, List[str]]
Handler = Callable[[Msg], Awaitable[None]]
Headers = Optional[Dict[str, str]]
# Callback types
Callback = Callable[[], Awaitable[None]]
ErrorCallback = Callable[[Exception], Awaitable[None]]
JWTCallback = Callable[[], Union[bytearray, bytes]]
SignatureCallback = Callable[[str], bytes]