An asyncio Python client for the NATS messaging system with JetStream, microservices, and key-value store support
—
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Pending
The risk profile of this skill
An asyncio Python client for the NATS messaging system with comprehensive support for JetStream persistent messaging, microservices framework, key-value store, object store, and high-performance real-time messaging patterns.
pip install nats-pyimport natsCommon for working with the client:
from nats.aio.client import Client as NATSJetStream functionality:
# JetStream is accessed via the client's jetstream() method
nc = await nats.connect()
js = nc.jetstream()Microservices functionality:
from nats.micro import add_serviceError handling:
from nats.errors import ConnectionClosedError, TimeoutErrorimport asyncio
import nats
async def main():
# Connect to NATS server
nc = await nats.connect("nats://localhost:4222")
# Simple publish
await nc.publish("foo", b"Hello World!")
# Simple subscribe
async def message_handler(msg):
print(f"Received: {msg.data.decode()}")
await nc.subscribe("foo", cb=message_handler)
# Request-reply pattern
response = await nc.request("help", b"help me", timeout=1.0)
print(f"Response: {response.data.decode()}")
# Cleanup
await nc.drain()
await nc.close()
if __name__ == '__main__':
asyncio.run(main())NATS Python client provides a layered architecture for scalable messaging:
This design enables the client to serve as the foundation for distributed applications requiring reliable message delivery, real-time communication, and persistent data storage.
Essential connection management and messaging functionality including connect/disconnect, publish/subscribe, request/reply patterns, subscription management, and connection lifecycle handling.
async def connect(servers=["nats://localhost:4222"], **options) -> NATS: ...
class NATS:
async def publish(self, subject: str, payload: bytes = b"", reply: str = "", headers: dict = None) -> None: ...
async def subscribe(self, subject: str, queue: str = "", cb: callable = None, **kwargs) -> Subscription: ...
async def request(self, subject: str, payload: bytes = b"", timeout: float = 0.5, headers: dict = None) -> Msg: ...
async def close(self) -> None: ...
async def drain(self) -> None: ...Persistent messaging with streams, consumers, message acknowledgments, delivery guarantees, and advanced features like ordered consumers and pull subscriptions for building resilient applications.
class JetStreamContext:
async def publish(self, subject: str, payload: bytes = b"", timeout: float = None, stream: str = None, headers: dict = None) -> PubAck: ...
async def subscribe(self, subject: str, durable: str = None, config: ConsumerConfig = None, **kwargs) -> JetStreamSubscription: ...
async def pull_subscribe(self, subject: str, durable: str = None, config: ConsumerConfig = None, **kwargs) -> PullSubscription: ...Administrative APIs for creating and managing streams, consumers, accounts with comprehensive configuration options, monitoring capabilities, and cluster management.
class JetStreamManager:
async def add_stream(self, config: StreamConfig = None, **params) -> StreamInfo: ...
async def add_consumer(self, stream: str, config: ConsumerConfig = None, **params) -> ConsumerInfo: ...
async def account_info(self) -> AccountInfo: ...
async def delete_stream(self, name: str) -> bool: ...Distributed key-value storage built on JetStream streams with atomic operations, conditional updates, history tracking, and watch capabilities for building stateful applications.
class KeyValue:
async def get(self, key: str, revision: int = None) -> Entry: ...
async def put(self, key: str, value: bytes, revision: int = None) -> int: ...
async def delete(self, key: str, revision: int = None) -> bool: ...
async def watch(self, key: str, **kwargs) -> AsyncIterator[Entry]: ...Scalable object storage with metadata, chunking for large objects, content addressing, and efficient streaming for storing and retrieving binary data and files.
class ObjectStore:
async def put(self, name: str, data: bytes, **kwargs) -> ObjectInfo: ...
async def get(self, name: str) -> bytes: ...
async def get_info(self, name: str, show_deleted: bool = False) -> ObjectInfo: ...
async def delete(self, name: str) -> bool: ...Service discovery, request routing, monitoring, and health checks with built-in load balancing, error handling, and comprehensive service information for building distributed microservice architectures.
async def add_service(nc: NATS, config: ServiceConfig = None, **kwargs) -> Service: ...
class Service:
async def start(self) -> None: ...
async def stop(self) -> None: ...
async def add_endpoint(self, config: EndpointConfig) -> Endpoint: ...Message processing with headers, reply handling, JetStream acknowledgments, and comprehensive metadata access for building sophisticated message-driven applications.
class Msg:
subject: str
data: bytes
reply: str
headers: dict
async def respond(self, data: bytes) -> None: ...
async def ack(self) -> None: ...
async def nak(self, delay: float = None) -> None: ...Comprehensive exception hierarchy covering connection, protocol, JetStream, key-value, object store, and microservice errors with specific error types for precise error handling.
class Error(Exception): ...
class ConnectionClosedError(Error): ...
class TimeoutError(Error): ...
class NoRespondersError(Error): ...
# ... additional error typesfrom typing import Union, List, Dict, Optional, Callable, AsyncIterator, Awaitable, Any
from dataclasses import dataclass
# Core types used across the API
Servers = Union[str, List[str]]
Handler = Callable[[Msg], Awaitable[None]]
Headers = Optional[Dict[str, str]]
# Callback types
Callback = Callable[[], Awaitable[None]]
ErrorCallback = Callable[[Exception], Awaitable[None]]
JWTCallback = Callable[[], Union[bytearray, bytes]]
SignatureCallback = Callable[[str], bytes]