0
# NATS Python Client
1
2
An asyncio Python client for the NATS messaging system with comprehensive support for JetStream persistent messaging, microservices framework, key-value store, object store, and high-performance real-time messaging patterns.
3
4
## Package Information
5
6
- **Package Name**: nats-py
7
- **Language**: Python
8
- **Installation**: `pip install nats-py`
9
- **Minimum Python Version**: 3.7+
10
11
## Core Imports
12
13
```python
14
import nats
15
```
16
17
Common for working with the client:
18
19
```python
20
from nats.aio.client import Client as NATS
21
```
22
23
JetStream functionality:
24
25
```python
26
# JetStream is accessed via the client's jetstream() method
27
nc = await nats.connect()
28
js = nc.jetstream()
29
```
30
31
Microservices functionality:
32
33
```python
34
from nats.micro import add_service
35
```
36
37
Error handling:
38
39
```python
40
from nats.errors import ConnectionClosedError, TimeoutError
41
```
42
43
## Basic Usage
44
45
```python
46
import asyncio
47
import nats
48
49
async def main():
50
# Connect to NATS server
51
nc = await nats.connect("nats://localhost:4222")
52
53
# Simple publish
54
await nc.publish("foo", b"Hello World!")
55
56
# Simple subscribe
57
async def message_handler(msg):
58
print(f"Received: {msg.data.decode()}")
59
60
await nc.subscribe("foo", cb=message_handler)
61
62
# Request-reply pattern
63
response = await nc.request("help", b"help me", timeout=1.0)
64
print(f"Response: {response.data.decode()}")
65
66
# Cleanup
67
await nc.drain()
68
await nc.close()
69
70
if __name__ == '__main__':
71
asyncio.run(main())
72
```
73
74
## Architecture
75
76
NATS Python client provides a layered architecture for scalable messaging:
77
78
- **Core Client**: Connection management, publish/subscribe, request/reply messaging
79
- **JetStream**: Stream processing layer for persistent messaging, key-value store, object store
80
- **Microservices**: Service discovery, load balancing, monitoring framework
81
- **Transport Layer**: TCP, WebSocket, TLS support with automatic reconnection
82
- **Protocol Layer**: NATS wire protocol implementation with high-performance parsing
83
84
This design enables the client to serve as the foundation for distributed applications requiring reliable message delivery, real-time communication, and persistent data storage.
85
86
## Capabilities
87
88
### Core NATS Client
89
90
Essential connection management and messaging functionality including connect/disconnect, publish/subscribe, request/reply patterns, subscription management, and connection lifecycle handling.
91
92
```python { .api }
93
async def connect(servers=["nats://localhost:4222"], **options) -> NATS: ...
94
95
class NATS:
96
async def publish(self, subject: str, payload: bytes = b"", reply: str = "", headers: dict = None) -> None: ...
97
async def subscribe(self, subject: str, queue: str = "", cb: callable = None, **kwargs) -> Subscription: ...
98
async def request(self, subject: str, payload: bytes = b"", timeout: float = 0.5, headers: dict = None) -> Msg: ...
99
async def close(self) -> None: ...
100
async def drain(self) -> None: ...
101
```
102
103
[Core NATS Client](./core-client.md)
104
105
### JetStream Stream Processing
106
107
Persistent messaging with streams, consumers, message acknowledgments, delivery guarantees, and advanced features like ordered consumers and pull subscriptions for building resilient applications.
108
109
```python { .api }
110
class JetStreamContext:
111
async def publish(self, subject: str, payload: bytes = b"", timeout: float = None, stream: str = None, headers: dict = None) -> PubAck: ...
112
async def subscribe(self, subject: str, durable: str = None, config: ConsumerConfig = None, **kwargs) -> JetStreamSubscription: ...
113
async def pull_subscribe(self, subject: str, durable: str = None, config: ConsumerConfig = None, **kwargs) -> PullSubscription: ...
114
```
115
116
[JetStream](./jetstream.md)
117
118
### JetStream Management
119
120
Administrative APIs for creating and managing streams, consumers, accounts with comprehensive configuration options, monitoring capabilities, and cluster management.
121
122
```python { .api }
123
class JetStreamManager:
124
async def add_stream(self, config: StreamConfig = None, **params) -> StreamInfo: ...
125
async def add_consumer(self, stream: str, config: ConsumerConfig = None, **params) -> ConsumerInfo: ...
126
async def account_info(self) -> AccountInfo: ...
127
async def delete_stream(self, name: str) -> bool: ...
128
```
129
130
[JetStream Management](./jetstream-management.md)
131
132
### Key-Value Store
133
134
Distributed key-value storage built on JetStream streams with atomic operations, conditional updates, history tracking, and watch capabilities for building stateful applications.
135
136
```python { .api }
137
class KeyValue:
138
async def get(self, key: str, revision: int = None) -> Entry: ...
139
async def put(self, key: str, value: bytes, revision: int = None) -> int: ...
140
async def delete(self, key: str, revision: int = None) -> bool: ...
141
async def watch(self, key: str, **kwargs) -> AsyncIterator[Entry]: ...
142
```
143
144
[Key-Value Store](./key-value-store.md)
145
146
### Object Store
147
148
Scalable object storage with metadata, chunking for large objects, content addressing, and efficient streaming for storing and retrieving binary data and files.
149
150
```python { .api }
151
class ObjectStore:
152
async def put(self, name: str, data: bytes, **kwargs) -> ObjectInfo: ...
153
async def get(self, name: str) -> bytes: ...
154
async def get_info(self, name: str, show_deleted: bool = False) -> ObjectInfo: ...
155
async def delete(self, name: str) -> bool: ...
156
```
157
158
[Object Store](./object-store.md)
159
160
### Microservices Framework
161
162
Service discovery, request routing, monitoring, and health checks with built-in load balancing, error handling, and comprehensive service information for building distributed microservice architectures.
163
164
```python { .api }
165
async def add_service(nc: NATS, config: ServiceConfig = None, **kwargs) -> Service: ...
166
167
class Service:
168
async def start(self) -> None: ...
169
async def stop(self) -> None: ...
170
async def add_endpoint(self, config: EndpointConfig) -> Endpoint: ...
171
```
172
173
[Microservices](./microservices.md)
174
175
### Message Handling
176
177
Message processing with headers, reply handling, JetStream acknowledgments, and comprehensive metadata access for building sophisticated message-driven applications.
178
179
```python { .api }
180
class Msg:
181
subject: str
182
data: bytes
183
reply: str
184
headers: dict
185
186
async def respond(self, data: bytes) -> None: ...
187
async def ack(self) -> None: ...
188
async def nak(self, delay: float = None) -> None: ...
189
```
190
191
[Message Handling](./message-handling.md)
192
193
### Error Handling
194
195
Comprehensive exception hierarchy covering connection, protocol, JetStream, key-value, object store, and microservice errors with specific error types for precise error handling.
196
197
```python { .api }
198
class Error(Exception): ...
199
class ConnectionClosedError(Error): ...
200
class TimeoutError(Error): ...
201
class NoRespondersError(Error): ...
202
# ... additional error types
203
```
204
205
[Error Handling](./error-handling.md)
206
207
## Types
208
209
```python { .api }
210
from typing import Union, List, Dict, Optional, Callable, AsyncIterator, Awaitable, Any
211
from dataclasses import dataclass
212
213
# Core types used across the API
214
Servers = Union[str, List[str]]
215
Handler = Callable[[Msg], Awaitable[None]]
216
Headers = Optional[Dict[str, str]]
217
218
# Callback types
219
Callback = Callable[[], Awaitable[None]]
220
ErrorCallback = Callable[[Exception], Awaitable[None]]
221
JWTCallback = Callable[[], Union[bytearray, bytes]]
222
SignatureCallback = Callable[[str], bytes]
223
```