An asyncio Python client for the NATS messaging system with JetStream, microservices, and key-value store support
—
JetStream provides persistent messaging capabilities built on NATS streams. It offers message storage, replay, delivery guarantees, and advanced consumption patterns for building resilient distributed applications.
Core JetStream functionality for publishing to streams and subscribing to consumers.
class JetStreamContext:
async def publish(
self,
subject: str,
payload: bytes = b"",
timeout: Optional[float] = None,
stream: Optional[str] = None,
headers: Optional[Dict[str, Any]] = None
) -> PubAck:
"""
Publish message to JetStream stream.
Parameters:
- subject: Stream subject
- payload: Message data
- timeout: Publish timeout
- stream: Target stream name (optional)
- headers: Message headers
Returns:
Publish acknowledgment with sequence info
"""
async def publish_async(
self,
subject: str,
payload: bytes = b"",
wait_stall: Optional[float] = None,
stream: Optional[str] = None,
headers: Optional[Dict] = None
) -> asyncio.Future[PubAck]:
"""
Publish message asynchronously without blocking.
Returns:
Future that resolves to publish acknowledgment
"""
def publish_async_pending(self) -> int:
"""Get count of pending async publishes."""
async def publish_async_completed(self) -> None:
"""Wait for all pending async publishes to complete."""import asyncio
import nats
async def main():
nc = await nats.connect()
js = nc.jetstream()
# Synchronous publish with acknowledgment
ack = await js.publish("events.user.login", b'{"user_id": 123, "timestamp": "2024-01-01T10:00:00Z"}')
print(f"Message stored at sequence {ack.seq}")
# Asynchronous publish for high throughput
future1 = await js.publish_async("metrics.cpu", b'{"usage": 75.5}')
future2 = await js.publish_async("metrics.memory", b'{"usage": 82.1}')
# Wait for specific acknowledgments
ack1 = await future1
ack2 = await future2
# Wait for all pending publishes
await js.publish_async_completed()Subscribe to JetStream messages with automatic delivery to callback handlers.
class JetStreamContext:
async def subscribe(
self,
subject: str,
queue: str = "",
cb: Callable[[Msg], None] = None,
durable: str = None,
stream: str = None,
config: ConsumerConfig = None,
manual_ack: bool = False,
ordered_consumer: bool = False,
idle_heartbeat: float = None,
flow_control: bool = False,
**kwargs
) -> JetStreamSubscription:
"""
Subscribe to JetStream stream with push delivery.
Parameters:
- subject: Subject pattern to subscribe to
- queue: Queue group for load balancing
- cb: Message callback handler
- durable: Durable consumer name
- stream: Source stream name
- config: Consumer configuration
- manual_ack: Require manual message acknowledgment
- ordered_consumer: Enable ordered message delivery
- idle_heartbeat: Heartbeat interval for flow control
- flow_control: Enable flow control
Returns:
JetStream subscription
"""
async def subscribe_bind(
self,
stream: str,
consumer: str,
**kwargs
) -> JetStreamSubscription:
"""
Bind to existing durable consumer.
Parameters:
- stream: Stream name
- consumer: Consumer name
Returns:
Bound JetStream subscription
"""# Simple JetStream subscription
async def handle_event(msg):
data = msg.data.decode()
print(f"Processing: {data}")
await msg.ack() # Acknowledge message
js_sub = await js.subscribe("events.>", cb=handle_event)
# Durable consumer subscription
await js.subscribe(
"orders.created",
durable="order-processor",
manual_ack=True,
cb=process_order
)
# Ordered consumer for sequential processing
await js.subscribe(
"audit.logs",
ordered_consumer=True,
cb=process_audit_log
)
# Queue group for load balancing
await js.subscribe(
"work.tasks",
queue="workers",
durable="task-worker",
cb=process_task
)Subscribe with manual message fetching for controlled consumption patterns.
class JetStreamContext:
async def pull_subscribe(
self,
subject: str,
durable: str = None,
stream: str = None,
config: ConsumerConfig = None,
**kwargs
) -> PullSubscription:
"""
Create pull-based subscription for manual message fetching.
Parameters:
- subject: Subject pattern to subscribe to
- durable: Durable consumer name
- stream: Source stream name
- config: Consumer configuration
Returns:
Pull subscription for manual fetching
"""
async def pull_subscribe_bind(
self,
stream: str,
consumer: str,
**kwargs
) -> PullSubscription:
"""
Bind pull subscription to existing consumer.
Parameters:
- stream: Stream name
- consumer: Consumer name
Returns:
Bound pull subscription
"""# Pull subscription with manual fetching
pull_sub = await js.pull_subscribe("batch.jobs", durable="job-processor")
# Fetch specific number of messages
msgs = await pull_sub.fetch(batch_size=10, timeout=5.0)
for msg in msgs:
await process_job(msg.data)
await msg.ack()
# Fetch with wait
msgs = await pull_sub.fetch(batch_size=5, timeout=30.0)
if msgs:
await process_batch(msgs)
# Continuous fetching loop
async for msg in pull_sub.messages():
try:
await process_message(msg.data)
await msg.ack()
except Exception as e:
print(f"Processing failed: {e}")
await msg.nak() # Negative acknowledgment for redeliveryHandle JetStream message acknowledgments with various strategies.
class Msg:
async def ack(self) -> None:
"""Acknowledge message successfully processed."""
async def ack_sync(self, timeout: float = 1.0) -> None:
"""Synchronously acknowledge message with timeout."""
async def nak(self, delay: float = None) -> None:
"""
Negative acknowledgment - message will be redelivered.
Parameters:
- delay: Delay before redelivery in seconds
"""
async def in_progress(self) -> None:
"""Extend acknowledgment deadline for longer processing."""
async def term(self) -> None:
"""Terminate message - no further redelivery."""async def message_handler(msg):
try:
# Long-running processing
await msg.in_progress() # Extend ack deadline
result = await complex_processing(msg.data)
if result.success:
await msg.ack() # Success
else:
await msg.nak(delay=30.0) # Retry after 30 seconds
except FatalError:
await msg.term() # Don't retry
except Exception:
await msg.nak() # Retry immediatelyHelper functions for JetStream message handling.
class JetStreamContext:
def is_status_msg(self, msg: Msg) -> bool:
"""
Check if message is a JetStream status message.
Parameters:
- msg: Message to check
Returns:
True if message is status message
"""from dataclasses import dataclass
from typing import Optional, List
from datetime import datetime, timedelta
@dataclass
class ConsumerConfig:
durable_name: Optional[str] = None
name: Optional[str] = None
description: Optional[str] = None
deliver_policy: str = "all" # "all", "last", "new", "by_start_sequence", "by_start_time"
opt_start_seq: Optional[int] = None
opt_start_time: Optional[datetime] = None
ack_policy: str = "explicit" # "none", "all", "explicit"
ack_wait: Optional[timedelta] = None
max_deliver: Optional[int] = None
filter_subject: Optional[str] = None
replay_policy: str = "instant" # "instant", "original"
rate_limit_bps: Optional[int] = None
sample_freq: Optional[str] = None
max_waiting: Optional[int] = None
max_ack_pending: Optional[int] = None
flow_control: bool = False
idle_heartbeat: Optional[timedelta] = None
headers_only: bool = False
max_request_batch: Optional[int] = None
max_request_expires: Optional[timedelta] = None
inactive_threshold: Optional[timedelta] = None@dataclass
class PubAck:
"""JetStream publish acknowledgment."""
stream: str
seq: int
duplicate: bool = False
domain: Optional[str] = None
class JetStreamSubscription:
"""JetStream push subscription."""
async def next_msg(self, timeout: float = 1.0) -> Msg:
"""Get next message with timeout."""
async def drain(self) -> None:
"""Drain subscription."""
def messages(self) -> AsyncIterator[Msg]:
"""Async iterator for messages."""
class PullSubscription:
"""JetStream pull subscription."""
async def fetch(
self,
batch_size: int = 1,
timeout: float = 5.0
) -> List[Msg]:
"""Fetch batch of messages."""
def messages(self) -> AsyncIterator[Msg]:
"""Async iterator for messages."""# JetStream API constants
DEFAULT_JS_SUB_PENDING_MSGS_LIMIT = 512 * 1024
DEFAULT_JS_SUB_PENDING_BYTES_LIMIT = 256 * 1024 * 1024
# Delivery policies
DELIVER_ALL = "all"
DELIVER_LAST = "last"
DELIVER_NEW = "new"
DELIVER_BY_START_SEQUENCE = "by_start_sequence"
DELIVER_BY_START_TIME = "by_start_time"
# Acknowledgment policies
ACK_NONE = "none"
ACK_ALL = "all"
ACK_EXPLICIT = "explicit"
# Replay policies
REPLAY_INSTANT = "instant"
REPLAY_ORIGINAL = "original"Install with Tessl CLI
npx tessl i tessl/pypi-nats-py