CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-nats-py

An asyncio Python client for the NATS messaging system with JetStream, microservices, and key-value store support

Pending
Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

SecuritybySnyk

Pending

The risk profile of this skill

Overview
Eval results
Files

index.mddocs/

NATS Python Client

An asyncio Python client for the NATS messaging system with comprehensive support for JetStream persistent messaging, microservices framework, key-value store, object store, and high-performance real-time messaging patterns.

Package Information

  • Package Name: nats-py
  • Language: Python
  • Installation: pip install nats-py
  • Minimum Python Version: 3.7+

Core Imports

import nats

Common for working with the client:

from nats.aio.client import Client as NATS

JetStream functionality:

# JetStream is accessed via the client's jetstream() method
nc = await nats.connect()
js = nc.jetstream()

Microservices functionality:

from nats.micro import add_service

Error handling:

from nats.errors import ConnectionClosedError, TimeoutError

Basic Usage

import asyncio
import nats

async def main():
    # Connect to NATS server
    nc = await nats.connect("nats://localhost:4222")
    
    # Simple publish
    await nc.publish("foo", b"Hello World!")
    
    # Simple subscribe
    async def message_handler(msg):
        print(f"Received: {msg.data.decode()}")
        
    await nc.subscribe("foo", cb=message_handler)
    
    # Request-reply pattern
    response = await nc.request("help", b"help me", timeout=1.0)
    print(f"Response: {response.data.decode()}")
    
    # Cleanup
    await nc.drain()
    await nc.close()

if __name__ == '__main__':
    asyncio.run(main())

Architecture

NATS Python client provides a layered architecture for scalable messaging:

  • Core Client: Connection management, publish/subscribe, request/reply messaging
  • JetStream: Stream processing layer for persistent messaging, key-value store, object store
  • Microservices: Service discovery, load balancing, monitoring framework
  • Transport Layer: TCP, WebSocket, TLS support with automatic reconnection
  • Protocol Layer: NATS wire protocol implementation with high-performance parsing

This design enables the client to serve as the foundation for distributed applications requiring reliable message delivery, real-time communication, and persistent data storage.

Capabilities

Core NATS Client

Essential connection management and messaging functionality including connect/disconnect, publish/subscribe, request/reply patterns, subscription management, and connection lifecycle handling.

async def connect(servers=["nats://localhost:4222"], **options) -> NATS: ...

class NATS:
    async def publish(self, subject: str, payload: bytes = b"", reply: str = "", headers: dict = None) -> None: ...
    async def subscribe(self, subject: str, queue: str = "", cb: callable = None, **kwargs) -> Subscription: ...
    async def request(self, subject: str, payload: bytes = b"", timeout: float = 0.5, headers: dict = None) -> Msg: ...
    async def close(self) -> None: ...
    async def drain(self) -> None: ...

Core NATS Client

JetStream Stream Processing

Persistent messaging with streams, consumers, message acknowledgments, delivery guarantees, and advanced features like ordered consumers and pull subscriptions for building resilient applications.

class JetStreamContext:
    async def publish(self, subject: str, payload: bytes = b"", timeout: float = None, stream: str = None, headers: dict = None) -> PubAck: ...
    async def subscribe(self, subject: str, durable: str = None, config: ConsumerConfig = None, **kwargs) -> JetStreamSubscription: ...
    async def pull_subscribe(self, subject: str, durable: str = None, config: ConsumerConfig = None, **kwargs) -> PullSubscription: ...

JetStream

JetStream Management

Administrative APIs for creating and managing streams, consumers, accounts with comprehensive configuration options, monitoring capabilities, and cluster management.

class JetStreamManager:
    async def add_stream(self, config: StreamConfig = None, **params) -> StreamInfo: ...
    async def add_consumer(self, stream: str, config: ConsumerConfig = None, **params) -> ConsumerInfo: ...
    async def account_info(self) -> AccountInfo: ...
    async def delete_stream(self, name: str) -> bool: ...

JetStream Management

Key-Value Store

Distributed key-value storage built on JetStream streams with atomic operations, conditional updates, history tracking, and watch capabilities for building stateful applications.

class KeyValue:
    async def get(self, key: str, revision: int = None) -> Entry: ...
    async def put(self, key: str, value: bytes, revision: int = None) -> int: ...
    async def delete(self, key: str, revision: int = None) -> bool: ...
    async def watch(self, key: str, **kwargs) -> AsyncIterator[Entry]: ...

Key-Value Store

Object Store

Scalable object storage with metadata, chunking for large objects, content addressing, and efficient streaming for storing and retrieving binary data and files.

class ObjectStore:
    async def put(self, name: str, data: bytes, **kwargs) -> ObjectInfo: ...
    async def get(self, name: str) -> bytes: ...
    async def get_info(self, name: str, show_deleted: bool = False) -> ObjectInfo: ...
    async def delete(self, name: str) -> bool: ...

Object Store

Microservices Framework

Service discovery, request routing, monitoring, and health checks with built-in load balancing, error handling, and comprehensive service information for building distributed microservice architectures.

async def add_service(nc: NATS, config: ServiceConfig = None, **kwargs) -> Service: ...

class Service:
    async def start(self) -> None: ...
    async def stop(self) -> None: ...
    async def add_endpoint(self, config: EndpointConfig) -> Endpoint: ...

Microservices

Message Handling

Message processing with headers, reply handling, JetStream acknowledgments, and comprehensive metadata access for building sophisticated message-driven applications.

class Msg:
    subject: str
    data: bytes
    reply: str
    headers: dict
    
    async def respond(self, data: bytes) -> None: ...
    async def ack(self) -> None: ...
    async def nak(self, delay: float = None) -> None: ...

Message Handling

Error Handling

Comprehensive exception hierarchy covering connection, protocol, JetStream, key-value, object store, and microservice errors with specific error types for precise error handling.

class Error(Exception): ...
class ConnectionClosedError(Error): ...
class TimeoutError(Error): ...
class NoRespondersError(Error): ...
# ... additional error types

Error Handling

Types

from typing import Union, List, Dict, Optional, Callable, AsyncIterator, Awaitable, Any
from dataclasses import dataclass

# Core types used across the API
Servers = Union[str, List[str]]
Handler = Callable[[Msg], Awaitable[None]]
Headers = Optional[Dict[str, str]]

# Callback types
Callback = Callable[[], Awaitable[None]]
ErrorCallback = Callable[[Exception], Awaitable[None]]
JWTCallback = Callable[[], Union[bytearray, bytes]]
SignatureCallback = Callable[[str], bytes]

docs

core-client.md

error-handling.md

index.md

jetstream-management.md

jetstream.md

key-value-store.md

message-handling.md

microservices.md

object-store.md

tile.json