An asyncio Python client for the NATS messaging system with JetStream, microservices, and key-value store support
—
Essential connection management and messaging functionality for the NATS Python client. Provides connection lifecycle management, publish/subscribe messaging, request/reply patterns, and subscription handling.
Establish and manage connections to NATS servers with support for clustering, authentication, TLS, and automatic reconnection.
async def connect(
servers: Union[str, List[str]] = ["nats://localhost:4222"],
error_cb: Optional[ErrorCallback] = None,
disconnected_cb: Optional[Callback] = None,
closed_cb: Optional[Callback] = None,
discovered_server_cb: Optional[Callback] = None,
reconnected_cb: Optional[Callback] = None,
name: Optional[str] = None,
pedantic: bool = False,
verbose: bool = False,
allow_reconnect: bool = True,
connect_timeout: int = 2,
reconnect_time_wait: int = 2,
max_reconnect_attempts: int = 60,
ping_interval: int = 120,
max_outstanding_pings: int = 2,
dont_randomize: bool = False,
flusher_queue_size: int = 1024,
no_echo: bool = False,
tls: Optional[ssl.SSLContext] = None,
tls_hostname: Optional[str] = None,
tls_handshake_first: bool = False,
user: Optional[str] = None,
password: Optional[str] = None,
token: Optional[str] = None,
drain_timeout: int = 30,
signature_cb: Optional[SignatureCallback] = None,
user_jwt_cb: Optional[JWTCallback] = None,
user_credentials: Optional[Credentials] = None,
nkeys_seed: Optional[str] = None,
nkeys_seed_str: Optional[str] = None,
inbox_prefix: Union[str, bytes] = b"_INBOX",
pending_size: int = 2 * 1024 * 1024,
flush_timeout: Optional[float] = None
) -> NATS:
"""
Connect to NATS server(s).
Parameters:
- servers: Server URLs to connect to
- error_cb: Callback for error events
- disconnected_cb: Callback for disconnection events
- closed_cb: Callback for connection closed events
- discovered_server_cb: Callback for server discovery events
- reconnected_cb: Callback for reconnection events
- name: Client name for identification
- pedantic: Enable pedantic protocol checking
- verbose: Enable verbose protocol logging
- allow_reconnect: Enable automatic reconnection
- connect_timeout: Connection timeout in seconds
- reconnect_time_wait: Wait time between reconnection attempts
- max_reconnect_attempts: Maximum reconnection attempts
- ping_interval: Ping interval in seconds
- max_outstanding_pings: Maximum outstanding pings
- dont_randomize: Don't randomize server connection order
- flusher_queue_size: Maximum flusher queue size
- no_echo: Disable message echo from server
- tls: SSL context for TLS connections
- tls_hostname: Hostname for TLS verification
- tls_handshake_first: Perform TLS handshake before INFO
- user: Username for authentication
- password: Password for authentication
- token: Token for authentication
- drain_timeout: Drain timeout in seconds
- signature_cb: Callback for message signing
- user_jwt_cb: Callback for JWT authentication
- user_credentials: Path to user credentials file
- nkeys_seed: NKEYS seed for authentication
- nkeys_seed_str: NKEYS seed string for authentication
- inbox_prefix: Prefix for inbox subjects
- pending_size: Maximum pending data size
- flush_timeout: Flush timeout in seconds
Returns:
Connected NATS client instance
"""import asyncio
import nats
import ssl
# Basic connection
nc = await nats.connect()
# Multiple servers with clustering
nc = await nats.connect([
"nats://server1:4222",
"nats://server2:4222",
"nats://server3:4222"
])
# Authenticated connection
nc = await nats.connect(
servers=["nats://demo.nats.io:4222"],
user="myuser",
password="mypass"
)
# TLS connection
ssl_ctx = ssl.create_default_context()
nc = await nats.connect(
servers=["tls://demo.nats.io:4443"],
tls=ssl_ctx
)
# With credentials file
nc = await nats.connect(
servers=["nats://connect.ngs.global"],
user_credentials="/path/to/user.creds"
)Manage connection state and gracefully close connections.
class NATS:
# Connection state constants
DISCONNECTED = 0
CONNECTED = 1
CLOSED = 2
RECONNECTING = 3
CONNECTING = 4
DRAINING_SUBS = 5
DRAINING_PUBS = 6
async def close(self) -> None:
"""Close the connection immediately."""
async def drain(self) -> None:
"""
Drain and close connection gracefully.
Stops accepting new messages and closes after processing pending.
"""
def is_connected(self) -> bool:
"""Check if client is connected to server."""
def is_closed(self) -> bool:
"""Check if client connection is closed."""
def is_reconnecting(self) -> bool:
"""Check if client is in reconnecting state."""
def is_connecting(self) -> bool:
"""Check if client is in connecting state."""
def is_draining(self) -> bool:
"""Check if client is draining subscriptions."""
def is_draining_pubs(self) -> bool:
"""Check if client is draining publications."""Send messages to subjects with optional reply subjects and headers.
class NATS:
async def publish(
self,
subject: str,
payload: bytes = b"",
reply: str = "",
headers: Optional[Dict[str, str]] = None
) -> None:
"""
Publish message to subject.
Parameters:
- subject: Target subject
- payload: Message data
- reply: Reply subject for responses
- headers: Message headers
"""
async def flush(self, timeout: float = 10.0) -> None:
"""
Flush pending messages to server.
Parameters:
- timeout: Flush timeout in seconds
"""
def pending_data_size(self) -> int:
"""Get size of pending outbound data in bytes."""# Simple publish
await nc.publish("events.user.created", b'{"user_id": 123}')
# Publish with reply subject
await nc.publish("events.notify", b"Alert!", reply="responses.alerts")
# Publish with headers
headers = {"Content-Type": "application/json", "User-ID": "123"}
await nc.publish("api.requests", b'{"action": "create"}', headers=headers)
# Ensure delivery
await nc.publish("critical.event", b"Important data")
await nc.flush() # Wait for server acknowledgmentSubscribe to subjects with callback handlers or async iteration.
class NATS:
async def subscribe(
self,
subject: str,
queue: str = "",
cb: Optional[Callable[[Msg], Awaitable[None]]] = None,
future: Optional[asyncio.Future] = None,
max_msgs: int = 0,
pending_msgs_limit: int = 65536,
pending_bytes_limit: int = 67108864
) -> Subscription:
"""
Subscribe to subject.
Parameters:
- subject: Subject pattern to subscribe to
- queue: Queue group for load balancing
- cb: Message callback handler
- future: Future to complete on first message
- max_msgs: Maximum messages (0 = unlimited)
- pending_msgs_limit: Maximum pending messages
- pending_bytes_limit: Maximum pending bytes
Returns:
Subscription object
"""
def new_inbox(self) -> str:
"""Generate unique inbox subject for replies."""# Callback-based subscription
async def message_handler(msg):
print(f"Received on {msg.subject}: {msg.data.decode()}")
sub = await nc.subscribe("events.*", cb=message_handler)
# Queue group subscription for load balancing
await nc.subscribe("work.queue", queue="workers", cb=process_work)
# Async iteration subscription
sub = await nc.subscribe("notifications")
async for msg in sub.messages():
await handle_notification(msg)
if some_condition:
break
# One-time subscription
future = asyncio.Future()
await nc.subscribe("single.event", future=future, max_msgs=1)
msg = await futureSend requests and receive responses with timeout handling.
class NATS:
async def request(
self,
subject: str,
payload: bytes = b"",
timeout: float = 0.5,
old_style: bool = False,
headers: Optional[Dict[str, Any]] = None
) -> Msg:
"""
Send request and wait for response.
Parameters:
- subject: Request subject
- payload: Request data
- timeout: Response timeout in seconds
- old_style: Use old-style request format
- headers: Request headers
Returns:
Response message
Raises:
- TimeoutError: No response within timeout
- NoRespondersError: No services listening
"""# Simple request-reply
try:
response = await nc.request("api.users.get", b'{"id": 123}', timeout=2.0)
user_data = response.data.decode()
print(f"User: {user_data}")
except TimeoutError:
print("Request timed out")
except NoRespondersError:
print("No service available")
# Request with headers
headers = {"Authorization": "Bearer token123"}
response = await nc.request(
"secure.api.data",
b'{"query": "SELECT * FROM users"}',
headers=headers,
timeout=5.0
)Access server and connection information.
class NATS:
def connected_url(self) -> str:
"""Get currently connected server URL."""
def servers(self) -> List[str]:
"""Get list of configured servers."""
def discovered_servers(self) -> List[str]:
"""Get list of servers discovered from cluster."""
def max_payload(self) -> int:
"""Get maximum payload size supported by server."""
def client_id(self) -> int:
"""Get unique client ID assigned by server."""
def connected_server_version(self) -> str:
"""Get version of connected server."""
def last_error(self) -> Exception:
"""Get last error encountered."""Access JetStream functionality from the core client.
class NATS:
def jetstream(self, **opts) -> JetStreamContext:
"""
Get JetStream context for stream operations.
Parameters:
- prefix: Custom JetStream API prefix
- domain: JetStream domain
- timeout: Default operation timeout
Returns:
JetStream context instance
"""
def jsm(self, **opts) -> JetStreamManager:
"""
Get JetStream manager for administrative operations.
Parameters:
- prefix: Custom JetStream API prefix
- domain: JetStream domain
- timeout: Default operation timeout
Returns:
JetStream manager instance
"""from typing import Union, List, Dict, Optional, Callable, AsyncIterator
import ssl
import asyncio
# Connection types
Servers = Union[str, List[str]]
ConnectOptions = Dict[str, Union[str, int, bool, Callable]]
SSLContext = ssl.SSLContext
# Callback types
ErrorCallback = Callable[[Exception], None]
ClosedCallback = Callable[[], None]
DisconnectedCallback = Callable[[], None]
ReconnectedCallback = Callable[[], None]
MessageCallback = Callable[[Msg], None]
SignatureCallback = Callable[[str], bytes]
UserJWTCallback = Callable[[], Tuple[str, str]]
# Message types
Headers = Optional[Dict[str, str]]
Payload = bytes
Subject = strInstall with Tessl CLI
npx tessl i tessl/pypi-nats-py