An asyncio Python client for the NATS messaging system with JetStream, microservices, and key-value store support
Message processing with headers, reply handling, JetStream acknowledgments, and comprehensive metadata access for building sophisticated message-driven applications.
Understanding NATS message structure and properties.
class Msg:
"""NATS message representation."""
subject: str
data: bytes
reply: str
headers: Optional[Dict[str, str]]
header: Optional[Dict[str, str]] # Alias for headers
sid: int
is_acked: bool
metadata: Optional[Metadata] # JetStream metadataasync def message_handler(msg):
print(f"Received message:")
print(f" Subject: {msg.subject}")
print(f" Data: {msg.data.decode()}")
print(f" Reply: {msg.reply}")
print(f" Headers: {msg.headers}")
print(f" Subscription ID: {msg.sid}")
# Access JetStream metadata if available
if msg.metadata:
print(f" Stream: {msg.metadata.stream}")
print(f" Sequence: {msg.metadata.sequence.stream}")
print(f" Consumer: {msg.metadata.sequence.consumer}")
# Subscribe and handle messages
await nc.subscribe("events.*", cb=message_handler)Send responses to messages with reply subjects.
class Msg:
async def respond(self, data: bytes) -> None:
"""
Send response to message reply subject.
Parameters:
- data: Response data
Raises:
- ValueError: No reply subject available
"""async def request_handler(msg):
try:
# Process request
request_data = json.loads(msg.data.decode())
result = await process_request(request_data)
# Send response
response = json.dumps(result).encode()
await msg.respond(response)
except Exception as e:
# Send error response
error_response = json.dumps({
"error": str(e),
"type": type(e).__name__
}).encode()
await msg.respond(error_response)
# Subscribe to requests
await nc.subscribe("api.requests", cb=request_handler)
# Client making request
response = await nc.request("api.requests", b'{"action": "get_user", "id": 123}')
result = json.loads(response.data.decode())Work with message headers for metadata and routing.
class Msg:
headers: Optional[Dict[str, str]]
header: Optional[Dict[str, str]] # Alias for headers propertyasync def header_aware_handler(msg):
# Check for authentication header
auth_token = msg.headers.get("Authorization") if msg.headers else None
if not auth_token:
print("No authorization header")
return
# Check content type
content_type = msg.headers.get("Content-Type", "text/plain")
# Process based on content type
if content_type == "application/json":
data = json.loads(msg.data.decode())
elif content_type == "application/xml":
data = parse_xml(msg.data)
else:
data = msg.data.decode()
# Check for correlation ID for request tracking
correlation_id = msg.headers.get("Correlation-ID")
print(f"Processing request {correlation_id}")
await process_data(data)
# Publishing with headers
headers = {
"Content-Type": "application/json",
"Authorization": "Bearer token123",
"Correlation-ID": "req-12345",
"User-ID": "user456"
}
await nc.publish(
"api.data",
json.dumps({"key": "value"}).encode(),
headers=headers
)Handle JetStream message acknowledgments with various strategies.
class Msg:
is_acked: bool
async def ack(self) -> None:
"""Acknowledge JetStream message successfully processed."""
async def ack_sync(self, timeout: float = 1.0) -> None:
"""
Synchronously acknowledge JetStream message.
Parameters:
- timeout: Acknowledgment timeout in seconds
"""
async def nak(self, delay: float = None) -> None:
"""
Negative acknowledgment - message will be redelivered.
Parameters:
- delay: Delay before redelivery in seconds
"""
async def in_progress(self) -> None:
"""Extend acknowledgment deadline for longer processing."""
async def term(self) -> None:
"""Terminate message processing - no further redelivery."""async def jetstream_handler(msg):
try:
# Check if this is a JetStream message
if not msg.metadata:
print("Not a JetStream message")
return
print(f"Processing JetStream message {msg.metadata.sequence.stream}")
# Long-running processing
if await is_long_running_task(msg.data):
await msg.in_progress() # Extend processing deadline
# Process the message
result = await process_message(msg.data)
if result.success:
await msg.ack() # Successfully processed
print("Message acknowledged successfully")
else:
# Temporary failure - retry after delay
await msg.nak(delay=30.0) # Retry in 30 seconds
print("Message negatively acknowledged, will retry")
except FatalProcessingError as e:
# Permanent failure - don't retry
await msg.term()
print(f"Message terminated due to fatal error: {e}")
except Exception as e:
# Temporary error - retry immediately
await msg.nak()
print(f"Message processing failed, will retry: {e}")
# Subscribe to JetStream
js = nc.jetstream()
await js.subscribe("events.orders", cb=jetstream_handler, manual_ack=True)Access JetStream-specific message metadata.
class Metadata:
"""JetStream message metadata."""
sequence: SequencePair
num_delivered: int
num_pending: int
timestamp: datetime
stream: str
consumer: str
domain: str
class SequencePair:
"""Consumer and stream sequence numbers."""
consumer: int
stream: intasync def metadata_handler(msg):
if not msg.metadata:
print("Core NATS message (no JetStream metadata)")
return
meta = msg.metadata
print(f"JetStream Message Metadata:")
print(f" Stream: {meta.stream}")
print(f" Consumer: {meta.consumer}")
print(f" Stream Sequence: {meta.sequence.stream}")
print(f" Consumer Sequence: {meta.sequence.consumer}")
print(f" Delivered: {meta.num_delivered} times")
print(f" Pending: {meta.num_pending} messages")
print(f" Timestamp: {meta.timestamp}")
# Handle redelivery scenarios
if meta.num_delivered > 1:
print(f"This message has been redelivered {meta.num_delivered} times")
# Maybe handle differently based on delivery count
if meta.num_delivered > 3:
print("Too many redeliveries, terminating")
await msg.term()
return
# Process message
await process_jetstream_message(msg.data)
await msg.ack()
# JetStream subscription with metadata handling
await js.subscribe("stream.events", cb=metadata_handler)Handle subscriptions and their message flows.
class Subscription:
"""NATS subscription."""
def subject(self) -> str:
"""Get subscription subject pattern."""
def queue(self) -> str:
"""Get queue group name."""
def messages(self) -> AsyncIterator[Msg]:
"""Async iterator for messages."""
def pending_msgs(self) -> int:
"""Number of pending messages."""
def pending_bytes(self) -> int:
"""Number of pending bytes."""
def delivered(self) -> int:
"""Total messages delivered."""
async def next_msg(self, timeout: float = 1.0) -> Msg:
"""Get next message with timeout."""
async def drain(self) -> None:
"""Drain subscription."""
async def unsubscribe(self, limit: int = 0) -> None:
"""Unsubscribe after limit messages."""# Subscription with async iteration
sub = await nc.subscribe("events.*")
async def process_subscription():
async for msg in sub.messages():
print(f"Processing: {msg.subject}")
await handle_message(msg)
# Break on specific condition
if should_stop_processing():
break
# Manual message fetching
async def manual_processing():
sub = await nc.subscribe("work.queue")
while True:
try:
msg = await sub.next_msg(timeout=5.0)
await process_work_item(msg)
except TimeoutError:
print("No messages available")
break
except Exception as e:
print(f"Processing error: {e}")
# Monitor subscription health
async def monitor_subscription():
sub = await nc.subscribe("monitoring.*")
while True:
print(f"Subscription stats:")
print(f" Pending messages: {sub.pending_msgs()}")
print(f" Pending bytes: {sub.pending_bytes()}")
print(f" Total delivered: {sub.delivered()}")
await asyncio.sleep(10) # Check every 10 seconds
# Graceful subscription shutdown
async def graceful_shutdown():
# Stop accepting new messages and process pending
await sub.drain()
print("Subscription drained")Common message processing patterns and utilities.
# Fan-out pattern - one message to multiple handlers
async def fan_out_handler(msg):
    """Hand one message to several independent handlers at the same time."""
    consumers = (analytics_handler, audit_handler, notification_handler)
    # Schedule every handler concurrently and wait until all finish
    await asyncio.gather(*(consume(msg) for consume in consumers))
# Message batching
class MessageBatcher:
    """Accumulate messages and process them in batches.

    A batch is flushed when it reaches ``batch_size`` messages or when more
    than ``timeout`` seconds have elapsed since the last flush. Note the
    timeout is only checked when a new message arrives; an idle stream will
    not flush a partial batch on its own.
    """

    def __init__(self, batch_size=10, timeout=5.0):
        self.batch = []
        self.batch_size = batch_size
        self.timeout = timeout
        # Use a monotonic clock for elapsed-time measurement so wall-clock
        # adjustments (NTP, DST) cannot trigger spurious or delayed flushes.
        self.last_batch_time = time.monotonic()

    async def add_message(self, msg):
        """Queue a message; flush the batch if it is full or stale."""
        self.batch.append(msg)
        # Process batch if full or timeout reached
        batch_full = len(self.batch) >= self.batch_size
        batch_stale = time.monotonic() - self.last_batch_time > self.timeout
        if batch_full or batch_stale:
            await self.process_batch()

    async def process_batch(self):
        """Process all queued messages, ack JetStream ones, and reset the batch."""
        if not self.batch:
            return
        print(f"Processing batch of {len(self.batch)} messages")
        await process_message_batch(self.batch)
        # Acknowledge all messages
        for msg in self.batch:
            if msg.metadata:  # JetStream message
                await msg.ack()
        self.batch.clear()
        self.last_batch_time = time.monotonic()

batcher = MessageBatcher()

async def batching_handler(msg):
    """Subscription callback that feeds messages into the shared batcher."""
    await batcher.add_message(msg)
# Content-based routing
async def routing_handler(msg):
    """Dispatch a message to the matching service handler by subject prefix."""
    subject = msg.subject
    # Known prefixes map to dedicated service handlers
    if subject.startswith("user."):
        await user_service_handler(msg)
    elif subject.startswith("order."):
        await order_service_handler(msg)
    elif subject.startswith("inventory."):
        await inventory_handler(msg)
    else:
        # No route matched — report and drop
        print(f"Unknown message type: {subject}")
# Message transformation pipeline
async def transform_pipeline(msg):
    """Validate, transform, enrich, store, then acknowledge a message."""
    # Step 1: Validate — invalid messages are nak'd for redelivery
    is_valid = await validate_message(msg)
    if not is_valid:
        await msg.nak()
        return
    # Step 2: Transform the raw payload
    transformed_data = await transform_message_data(msg.data)
    # Step 3: Enrich with external data, using headers for context
    enriched_data = await enrich_message(transformed_data, msg.headers)
    # Step 4: Store result
    await store_processed_message(enriched_data)
    # Step 5: Acknowledge (JetStream messages only)
    if msg.metadata:
        await msg.ack()

# Subscription limits
# Default per-subscription buffering caps ("pending" = received but not yet
# processed by the callback/iterator).
DEFAULT_SUB_PENDING_MSGS_LIMIT = 512 * 1024          # 524,288 messages
DEFAULT_SUB_PENDING_BYTES_LIMIT = 128 * 1024 * 1024  # 128 MiB
# JetStream limits
DEFAULT_JS_SUB_PENDING_MSGS_LIMIT = 512 * 1024            # same message cap as core NATS
DEFAULT_JS_SUB_PENDING_BYTES_LIMIT = 256 * 1024 * 1024    # 256 MiB — double the core byte cap
# Message acknowledgment types
class Ack:
    """JetStream acknowledgment types."""
    # Raw protocol strings sent back to the server for each ack outcome.
    ACK = "+ACK"       # Positive ack: processed successfully
    NAK = "-NAK"       # Negative ack: request redelivery
    PROGRESS = "+WPI"  # Work in Progress: extend the ack deadline
    TERM = "+TERM"     # Terminate: no further redelivery

# Install with Tessl CLI
npx tessl i tessl/pypi-nats-py