0
# Core NATS Client
1
2
Essential connection management and messaging functionality for the NATS Python client. Provides connection lifecycle management, publish/subscribe messaging, request/reply patterns, and subscription handling.
3
4
## Capabilities
5
6
### Connection Management
7
8
Establish and manage connections to NATS servers with support for clustering, authentication, TLS, and automatic reconnection.
9
10
```python { .api }
11
async def connect(
12
servers: Union[str, List[str]] = ["nats://localhost:4222"],
13
error_cb: Optional[ErrorCallback] = None,
14
disconnected_cb: Optional[Callback] = None,
15
closed_cb: Optional[Callback] = None,
16
discovered_server_cb: Optional[Callback] = None,
17
reconnected_cb: Optional[Callback] = None,
18
name: Optional[str] = None,
19
pedantic: bool = False,
20
verbose: bool = False,
21
allow_reconnect: bool = True,
22
connect_timeout: int = 2,
23
reconnect_time_wait: int = 2,
24
max_reconnect_attempts: int = 60,
25
ping_interval: int = 120,
26
max_outstanding_pings: int = 2,
27
dont_randomize: bool = False,
28
flusher_queue_size: int = 1024,
29
no_echo: bool = False,
30
tls: Optional[ssl.SSLContext] = None,
31
tls_hostname: Optional[str] = None,
32
tls_handshake_first: bool = False,
33
user: Optional[str] = None,
34
password: Optional[str] = None,
35
token: Optional[str] = None,
36
drain_timeout: int = 30,
37
signature_cb: Optional[SignatureCallback] = None,
38
user_jwt_cb: Optional[JWTCallback] = None,
39
user_credentials: Optional[Credentials] = None,
40
nkeys_seed: Optional[str] = None,
41
nkeys_seed_str: Optional[str] = None,
42
inbox_prefix: Union[str, bytes] = b"_INBOX",
43
pending_size: int = 2 * 1024 * 1024,
44
flush_timeout: Optional[float] = None
45
) -> NATS:
46
"""
47
Connect to NATS server(s).
48
49
Parameters:
50
- servers: Server URLs to connect to
51
- error_cb: Callback for error events
52
- disconnected_cb: Callback for disconnection events
53
- closed_cb: Callback for connection closed events
54
- discovered_server_cb: Callback for server discovery events
55
- reconnected_cb: Callback for reconnection events
56
- name: Client name for identification
57
- pedantic: Enable pedantic protocol checking
58
- verbose: Ask the server to acknowledge each protocol message with +OK
59
- allow_reconnect: Enable automatic reconnection
60
- connect_timeout: Connection timeout in seconds
61
- reconnect_time_wait: Wait time between reconnection attempts
62
- max_reconnect_attempts: Maximum reconnection attempts
63
- ping_interval: Ping interval in seconds
64
- max_outstanding_pings: Maximum outstanding pings
65
- dont_randomize: Don't randomize server connection order
66
- flusher_queue_size: Maximum flusher queue size
67
- no_echo: Disable message echo from server
68
- tls: SSL context for TLS connections
69
- tls_hostname: Hostname for TLS verification
70
- tls_handshake_first: Perform TLS handshake before INFO
71
- user: Username for authentication
72
- password: Password for authentication
73
- token: Token for authentication
74
- drain_timeout: Drain timeout in seconds
75
- signature_cb: Callback that signs the server nonce during NKEYS/JWT authentication
76
- user_jwt_cb: Callback for JWT authentication
77
- user_credentials: Path to user credentials file
78
- nkeys_seed: Path to a file containing the NKEYS seed used for authentication
79
- nkeys_seed_str: NKEYS seed string for authentication
80
- inbox_prefix: Prefix for inbox subjects
81
- pending_size: Maximum pending data size
82
- flush_timeout: Flush timeout in seconds
83
84
Returns:
85
Connected NATS client instance
86
"""
87
```
88
89
#### Usage Examples
90
91
```python
92
import asyncio
93
import nats
94
import ssl
95
96
# Basic connection
97
nc = await nats.connect()
98
99
# Multiple servers with clustering
100
nc = await nats.connect([
101
"nats://server1:4222",
102
"nats://server2:4222",
103
"nats://server3:4222"
104
])
105
106
# Authenticated connection
107
nc = await nats.connect(
108
servers=["nats://demo.nats.io:4222"],
109
user="myuser",
110
password="mypass"
111
)
112
113
# TLS connection
114
ssl_ctx = ssl.create_default_context()
115
nc = await nats.connect(
116
servers=["tls://demo.nats.io:4443"],
117
tls=ssl_ctx
118
)
119
120
# With credentials file
121
nc = await nats.connect(
122
servers=["nats://connect.ngs.global"],
123
user_credentials="/path/to/user.creds"
124
)
125
```
126
127
### Connection Lifecycle
128
129
Manage connection state and gracefully close connections.
130
131
```python { .api }
132
class NATS:
133
# Connection state constants
134
DISCONNECTED = 0
135
CONNECTED = 1
136
CLOSED = 2
137
RECONNECTING = 3
138
CONNECTING = 4
139
DRAINING_SUBS = 5
140
DRAINING_PUBS = 6
141
142
async def close(self) -> None:
143
"""Close the connection immediately."""
144
145
async def drain(self) -> None:
146
"""
147
Drain and close connection gracefully.
148
Stops accepting new messages and closes the connection after pending messages have been processed.
149
"""
150
151
def is_connected(self) -> bool:
152
"""Check if client is connected to server."""
153
154
def is_closed(self) -> bool:
155
"""Check if client connection is closed."""
156
157
def is_reconnecting(self) -> bool:
158
"""Check if client is in reconnecting state."""
159
160
def is_connecting(self) -> bool:
161
"""Check if client is in connecting state."""
162
163
def is_draining(self) -> bool:
164
"""Check if client is draining subscriptions."""
165
166
def is_draining_pubs(self) -> bool:
167
"""Check if client is draining publications."""
168
```
169
170
### Publishing
171
172
Send messages to subjects with optional reply subjects and headers.
173
174
```python { .api }
175
class NATS:
176
async def publish(
177
self,
178
subject: str,
179
payload: bytes = b"",
180
reply: str = "",
181
headers: Optional[Dict[str, str]] = None
182
) -> None:
183
"""
184
Publish message to subject.
185
186
Parameters:
187
- subject: Target subject
188
- payload: Message data
189
- reply: Reply subject for responses
190
- headers: Message headers
191
"""
192
193
async def flush(self, timeout: float = 10.0) -> None:
194
"""
195
Flush pending messages to server.
196
197
Parameters:
198
- timeout: Flush timeout in seconds
199
"""
200
201
def pending_data_size(self) -> int:
202
"""Get size of pending outbound data in bytes."""
203
```
204
205
#### Usage Examples
206
207
```python
208
# Simple publish
209
await nc.publish("events.user.created", b'{"user_id": 123}')
210
211
# Publish with reply subject
212
await nc.publish("events.notify", b"Alert!", reply="responses.alerts")
213
214
# Publish with headers
215
headers = {"Content-Type": "application/json", "User-ID": "123"}
216
await nc.publish("api.requests", b'{"action": "create"}', headers=headers)
217
218
# Flush so the server has processed the publish before continuing
219
await nc.publish("critical.event", b"Important data")
220
await nc.flush() # Wait for server acknowledgment
221
```
222
223
### Subscribing
224
225
Subscribe to subjects with callback handlers or async iteration.
226
227
```python { .api }
228
class NATS:
229
async def subscribe(
230
self,
231
subject: str,
232
queue: str = "",
233
cb: Optional[Callable[[Msg], Awaitable[None]]] = None,
234
future: Optional[asyncio.Future] = None,
235
max_msgs: int = 0,
236
pending_msgs_limit: int = 65536,
237
pending_bytes_limit: int = 67108864
238
) -> Subscription:
239
"""
240
Subscribe to subject.
241
242
Parameters:
243
- subject: Subject pattern to subscribe to
244
- queue: Queue group for load balancing
245
- cb: Message callback handler
246
- future: Future to complete on first message
247
- max_msgs: Maximum messages (0 = unlimited)
248
- pending_msgs_limit: Maximum pending messages
249
- pending_bytes_limit: Maximum pending bytes
250
251
Returns:
252
Subscription object
253
"""
254
255
def new_inbox(self) -> str:
256
"""Generate unique inbox subject for replies."""
257
```
258
259
#### Usage Examples
260
261
```python
262
# Callback-based subscription
263
async def message_handler(msg):
264
print(f"Received on {msg.subject}: {msg.data.decode()}")
265
266
sub = await nc.subscribe("events.*", cb=message_handler)
267
268
# Queue group subscription for load balancing
269
await nc.subscribe("work.queue", queue="workers", cb=process_work)
270
271
# Async iteration subscription
272
sub = await nc.subscribe("notifications")
273
async for msg in sub.messages:
274
await handle_notification(msg)
275
if some_condition:
276
break
277
278
# One-time subscription
279
future = asyncio.Future()
280
await nc.subscribe("single.event", future=future, max_msgs=1)
281
msg = await future
282
```
283
284
### Request-Reply
285
286
Send requests and receive responses with timeout handling.
287
288
```python { .api }
289
class NATS:
290
async def request(
291
self,
292
subject: str,
293
payload: bytes = b"",
294
timeout: float = 0.5,
295
old_style: bool = False,
296
headers: Optional[Dict[str, Any]] = None
297
) -> Msg:
298
"""
299
Send request and wait for response.
300
301
Parameters:
302
- subject: Request subject
303
- payload: Request data
304
- timeout: Response timeout in seconds
305
- old_style: Use a dedicated inbox subscription per request instead of the shared response multiplexer
306
- headers: Request headers
307
308
Returns:
309
Response message
310
311
Raises:
312
- nats.errors.TimeoutError: No response within timeout
313
- nats.errors.NoRespondersError: No services listening
314
"""
315
```
316
317
#### Usage Examples
318
319
```python
320
# Simple request-reply
321
try:
322
response = await nc.request("api.users.get", b'{"id": 123}', timeout=2.0)
323
user_data = response.data.decode()
324
print(f"User: {user_data}")
325
except TimeoutError:
326
print("Request timed out")
327
except NoRespondersError:
328
print("No service available")
329
330
# Request with headers
331
headers = {"Authorization": "Bearer token123"}
332
response = await nc.request(
333
"secure.api.data",
334
b'{"query": "SELECT * FROM users"}',
335
headers=headers,
336
timeout=5.0
337
)
338
```
339
340
### Server Information
341
342
Access server and connection information.
343
344
```python { .api }
345
class NATS:
346
def connected_url(self) -> str:
347
"""Get currently connected server URL."""
348
349
def servers(self) -> List[str]:
350
"""Get list of configured servers."""
351
352
def discovered_servers(self) -> List[str]:
353
"""Get list of servers discovered from cluster."""
354
355
def max_payload(self) -> int:
356
"""Get maximum payload size supported by server."""
357
358
def client_id(self) -> int:
359
"""Get unique client ID assigned by server."""
360
361
def connected_server_version(self) -> str:
362
"""Get version of connected server."""
363
364
def last_error(self) -> Exception:
365
"""Get last error encountered."""
366
```
367
368
### JetStream Integration
369
370
Access JetStream functionality from the core client.
371
372
```python { .api }
373
class NATS:
374
def jetstream(self, **opts) -> JetStreamContext:
375
"""
376
Get JetStream context for stream operations.
377
378
Parameters:
379
- prefix: Custom JetStream API prefix
380
- domain: JetStream domain
381
- timeout: Default operation timeout
382
383
Returns:
384
JetStream context instance
385
"""
386
387
def jsm(self, **opts) -> JetStreamManager:
388
"""
389
Get JetStream manager for administrative operations.
390
391
Parameters:
392
- prefix: Custom JetStream API prefix
393
- domain: JetStream domain
394
- timeout: Default operation timeout
395
396
Returns:
397
JetStream manager instance
398
"""
399
```
400
401
## Types
402
403
```python { .api }
404
from typing import Union, List, Dict, Optional, Callable, AsyncIterator, Any, Awaitable, Tuple
405
import ssl
406
import asyncio
407
408
# Connection types
409
Servers = Union[str, List[str]]
410
ConnectOptions = Dict[str, Union[str, int, bool, Callable]]
411
SSLContext = ssl.SSLContext
412
413
# Callback types
414
ErrorCallback = Callable[[Exception], None]
415
ClosedCallback = Callable[[], None]
416
DisconnectedCallback = Callable[[], None]
417
ReconnectedCallback = Callable[[], None]
418
MessageCallback = Callable[[Msg], Awaitable[None]]
419
SignatureCallback = Callable[[str], bytes]
420
UserJWTCallback = Callable[[], Tuple[str, str]]
421
422
# Message types
423
Headers = Optional[Dict[str, str]]
424
Payload = bytes
425
Subject = str
426
```