CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-nats-py

An asyncio Python client for the NATS messaging system with JetStream, microservices, and key-value store support

Pending
Overview
Eval results
Files

docs/message-handling.md

Message Handling

This guide covers message processing with nats-py: working with headers, replying to requests, acknowledging JetStream messages, and reading message metadata when building message-driven applications.

Capabilities

Message Structure

Understanding NATS message structure and properties.

class Msg:
    """NATS message representation.

    Summary of the attributes available on a message handed to a
    subscription callback.
    """
    subject: str  # Subject the message was received on
    data: bytes  # Raw payload bytes; decode before treating it as text
    reply: str  # Reply subject, used when responding to the message
    headers: Optional[Dict[str, str]]  # Header name -> value mapping; None when there are no headers
    header: Optional[Dict[str, str]]  # Alias for headers
    sid: int  # ID of the subscription that received the message
    is_acked: bool  # presumably True once the message has been acknowledged — TODO confirm
    # JetStream metadata.
    # NOTE(review): the annotation says None for core NATS messages — confirm
    # the library does not raise instead when this is read on a non-JetStream
    # message.
    metadata: Optional[Metadata]

Usage Examples

async def message_handler(msg):
    """Print the core fields of *msg*, then JetStream details if present.

    The payload is decoded as text for display. Stream and sequence
    information is printed only when ``msg.metadata`` is truthy.
    """
    print("Received message:")
    print(f"  Subject: {msg.subject}")
    payload_text = msg.data.decode()
    print(f"  Data: {payload_text}")
    print(f"  Reply: {msg.reply}")
    print(f"  Headers: {msg.headers}")
    print(f"  Subscription ID: {msg.sid}")

    # Without JetStream metadata there is nothing more to show.
    metadata = msg.metadata
    if not metadata:
        return

    print(f"  Stream: {metadata.stream}")
    positions = metadata.sequence
    print(f"  Sequence: {positions.stream}")
    print(f"  Consumer: {positions.consumer}")

# Subscribe and handle messages: message_handler is registered as the
# callback for the "events.*" subject pattern.
# NOTE: `nc` is assumed to be an already-connected client, and the `await`
# means this line must run inside a coroutine.
await nc.subscribe("events.*", cb=message_handler)

Response Handling

Send responses to messages with reply subjects.

class Msg:
    async def respond(self, data: bytes) -> None:
        """
        Send response to message reply subject.

        Parameters:
        - data: Response data

        Raises:
        - nats.errors.Error: No reply subject available, or the message is
          not bound to a client (nats-py does not raise ValueError here)
        """

Usage Examples

async def request_handler(msg):
    """Serve one JSON request and reply with a JSON-encoded result.

    The payload is parsed as JSON, passed to ``process_request`` and the
    result is sent back with ``msg.respond``. Any exception raised along the
    way is reported to the requester as ``{"error": ..., "type": ...}``.
    """
    try:
        # Decode and handle the request
        payload = json.loads(msg.data.decode())
        outcome = await process_request(payload)

        # Reply with the serialized outcome
        await msg.respond(json.dumps(outcome).encode())

    except Exception as exc:
        # Report the failure to the requester instead of staying silent
        failure = {
            "error": str(exc),
            "type": type(exc).__name__
        }
        await msg.respond(json.dumps(failure).encode())

# Subscribe to requests: request_handler is registered as the callback for
# the "api.requests" subject.
await nc.subscribe("api.requests", cb=request_handler)

# Client making request: send a JSON payload on the same subject, wait for
# the reply message, then decode the reply body as JSON.
response = await nc.request("api.requests", b'{"action": "get_user", "id": 123}')
result = json.loads(response.data.decode())

Header Processing

Work with message headers for metadata and routing.

class Msg:
    # Header name -> value mapping; None when the message has no headers,
    # so check for None before calling .get() on it.
    headers: Optional[Dict[str, str]]
    header: Optional[Dict[str, str]]  # Alias for headers property

Usage Examples

async def header_aware_handler(msg):
    """Check, decode and dispatch a message based on its headers.

    Messages without an ``Authorization`` header are reported and dropped.
    Otherwise the payload is decoded according to ``Content-Type`` (default
    ``text/plain``) and handed to ``process_data``.
    """
    headers = msg.headers

    # Guard clause: headers may be None, and a missing or empty token is
    # rejected.
    token = headers.get("Authorization") if headers else None
    if not token:
        print("No authorization header")
        return

    # A truthy token implies headers is a non-empty mapping from here on.
    kind = headers.get("Content-Type", "text/plain")

    # Decode the payload according to the declared content type
    if kind == "application/json":
        payload = json.loads(msg.data.decode())
    elif kind == "application/xml":
        payload = parse_xml(msg.data)
    else:
        payload = msg.data.decode()

    # The correlation ID is optional and only used for the tracking output
    request_id = headers.get("Correlation-ID")
    print(f"Processing request {request_id}")

    await process_data(payload)

# Publishing with headers
# "Authorization", "Content-Type" and "Correlation-ID" are the header names
# header_aware_handler looks up; "User-ID" is extra metadata that handler
# does not read.
headers = {
    "Content-Type": "application/json",
    "Authorization": "Bearer token123",
    "Correlation-ID": "req-12345",
    "User-ID": "user456"
}

# The payload is JSON-encoded bytes, consistent with the declared Content-Type.
await nc.publish(
    "api.data", 
    json.dumps({"key": "value"}).encode(),
    headers=headers
)

JetStream Message Acknowledgments

Handle JetStream message acknowledgments with various strategies.

class Msg:
    # presumably True once the message has been acknowledged — TODO confirm
    is_acked: bool

    async def ack(self) -> None:
        """Acknowledge JetStream message successfully processed."""

    async def ack_sync(self, timeout: float = 1.0) -> "Msg":
        """
        Acknowledge JetStream message and wait for the server to confirm.

        Parameters:
        - timeout: Acknowledgment timeout in seconds

        Returns:
        - The server's reply to the acknowledgment request
        """

    async def nak(self, delay: Optional[float] = None) -> None:
        """
        Negative acknowledgment - message will be redelivered.

        Parameters:
        - delay: Delay before redelivery in seconds; None (the default)
          requests redelivery without an explicit delay
        """

    async def in_progress(self) -> None:
        """Extend acknowledgment deadline for longer processing."""

    async def term(self) -> None:
        """Terminate message processing - no further redelivery."""

Usage Examples

async def jetstream_handler(msg):
    """Process one JetStream message and acknowledge it by outcome.

    * success           -> ``ack()``
    * unsuccessful      -> ``nak(delay=30.0)`` (retry in 30 seconds)
    * fatal error       -> ``term()`` (no redelivery)
    * any other error   -> ``nak()`` (retry immediately)

    Messages without JetStream metadata are reported and skipped.
    """
    try:
        # Only JetStream messages can be acknowledged
        metadata = msg.metadata
        if not metadata:
            print("Not a JetStream message")
            return

        print(f"Processing JetStream message {metadata.sequence.stream}")

        # Ask for more time up front when the work is expected to be slow
        needs_more_time = await is_long_running_task(msg.data)
        if needs_more_time:
            await msg.in_progress()  # Extend processing deadline

        outcome = await process_message(msg.data)

        if not outcome.success:
            # Temporary failure - retry after delay
            await msg.nak(delay=30.0)  # Retry in 30 seconds
            print("Message negatively acknowledged, will retry")
        else:
            await msg.ack()  # Successfully processed
            print("Message acknowledged successfully")

    except FatalProcessingError as fatal:
        # Permanent failure - don't retry
        await msg.term()
        print(f"Message terminated due to fatal error: {fatal}")

    except Exception as exc:
        # Temporary error - retry immediately
        await msg.nak()
        print(f"Message processing failed, will retry: {exc}")

# Subscribe to JetStream
# jetstream_handler calls ack()/nak()/term() itself, hence manual_ack=True.
# NOTE: `nc` is assumed to be an already-connected client.
js = nc.jetstream()
await js.subscribe("events.orders", cb=jetstream_handler, manual_ack=True)

JetStream Metadata

Access JetStream-specific message metadata.

class Metadata:
    """JetStream message metadata."""
    sequence: SequencePair  # Stream and consumer sequence numbers of this message
    num_delivered: int  # Delivery count; values above 1 indicate a redelivery
    num_pending: int  # Number of messages still pending
    timestamp: datetime  # Message timestamp
    stream: str  # Name of the stream
    consumer: str  # Name of the consumer
    # NOTE(review): possibly unset when no JetStream domain is in use —
    # confirm whether this should be Optional[str].
    domain: str

class SequencePair:
    """Consumer and stream sequence numbers."""
    consumer: int  # Sequence number on the consumer side
    stream: int  # Sequence number within the stream

Usage Examples

async def metadata_handler(msg):
    """Log JetStream metadata, drop over-delivered messages, then process.

    Core NATS messages (no metadata) are reported and skipped. A message
    delivered more than three times is terminated with ``term()`` so that it
    is not redelivered again; every other message is processed and
    acknowledged.
    """
    meta = msg.metadata
    if not meta:
        print("Core NATS message (no JetStream metadata)")
        return

    print("JetStream Message Metadata:")
    print(f"  Stream: {meta.stream}")
    print(f"  Consumer: {meta.consumer}")
    print(f"  Stream Sequence: {meta.sequence.stream}")
    print(f"  Consumer Sequence: {meta.sequence.consumer}")
    print(f"  Delivered: {meta.num_delivered} times")
    print(f"  Pending: {meta.num_pending} messages")
    print(f"  Timestamp: {meta.timestamp}")

    # Handle redelivery scenarios. num_delivered counts every delivery,
    # including the first, so the number of redeliveries is one less.
    redeliveries = meta.num_delivered - 1
    if redeliveries > 0:
        print(f"This message has been redelivered {redeliveries} times")

        # Give up on messages that keep coming back
        if meta.num_delivered > 3:
            print("Too many redeliveries, terminating")
            await msg.term()
            return

    # Process message
    await process_jetstream_message(msg.data)
    await msg.ack()

# JetStream subscription with metadata handling
await js.subscribe("stream.events", cb=metadata_handler)

Subscription Management

Handle subscriptions and their message flows.

class Subscription:
    """NATS subscription.

    ``subject``, ``queue``, ``messages``, ``pending_msgs``, ``pending_bytes``
    and ``delivered`` are read-only properties in nats-py: access them
    without parentheses (``sub.pending_msgs``,
    ``async for msg in sub.messages``).
    """

    @property
    def subject(self) -> str:
        """Get subscription subject pattern."""

    @property
    def queue(self) -> str:
        """Get queue group name."""

    @property
    def messages(self) -> "AsyncIterator[Msg]":
        """Async iterator for messages.

        Only available when the subscription was created without a callback.
        """

    @property
    def pending_msgs(self) -> int:
        """Number of pending messages."""

    @property
    def pending_bytes(self) -> int:
        """Number of pending bytes."""

    @property
    def delivered(self) -> int:
        """Total messages delivered."""

    async def next_msg(self, timeout: float = 1.0) -> "Msg":
        """Get next message with timeout."""

    async def drain(self) -> None:
        """Drain subscription."""

    async def unsubscribe(self, limit: int = 0) -> None:
        """Unsubscribe after limit messages."""

Usage Examples

# Subscription with async iteration
# No cb= is passed here; the messages are consumed by iterating over the
# subscription in process_subscription.
sub = await nc.subscribe("events.*")

async def process_subscription():
    """Handle messages from the module-level ``sub`` until told to stop."""
    # `messages` is a property that returns the async iterator, so it must
    # not be called. It is only available on subscriptions created without
    # a callback.
    async for msg in sub.messages:
        print(f"Processing: {msg.subject}")
        await handle_message(msg)

        # Break on specific condition
        if should_stop_processing():
            break

# Manual message fetching
async def manual_processing():
    """Pull work items one at a time until no message arrives in 5 seconds.

    Errors raised while fetching or processing a message are printed and the
    loop continues with the next message.
    """
    import asyncio  # local import keeps this snippet self-contained

    sub = await nc.subscribe("work.queue")

    while True:
        try:
            msg = await sub.next_msg(timeout=5.0)
            await process_work_item(msg)
        except asyncio.TimeoutError:
            # nats-py's timeout error derives from asyncio.TimeoutError. The
            # builtin TimeoutError only matches it on Python 3.11+, where the
            # two are the same class; on older versions the timeout would
            # fall through to the generic handler and the loop would never
            # end.
            print("No messages available")
            break
        except Exception as e:
            print(f"Processing error: {e}")

# Monitor subscription health
async def monitor_subscription():
    """Print pending/delivered counters for a subscription every 10 seconds.

    Runs until cancelled.
    """
    sub = await nc.subscribe("monitoring.*")

    while True:
        # pending_msgs, pending_bytes and delivered are properties on the
        # subscription, so they are read without parentheses.
        print("Subscription stats:")
        print(f"  Pending messages: {sub.pending_msgs}")
        print(f"  Pending bytes: {sub.pending_bytes}")
        print(f"  Total delivered: {sub.delivered}")

        await asyncio.sleep(10)  # Check every 10 seconds

# Graceful subscription shutdown
async def graceful_shutdown():
    """Drain the module-level ``sub`` subscription and report completion."""
    # Stop accepting new messages and process pending
    await sub.drain()
    print("Subscription drained")

Message Patterns

Common message processing patterns and utilities.

Usage Examples

# Fan-out pattern - one message to multiple handlers
async def fan_out_handler(msg):
    """Pass *msg* to the analytics, audit and notification handlers at once.

    The three handlers run concurrently; the coroutine finishes when all of
    them are done, or raises as soon as one of them fails.
    """
    handlers = (analytics_handler, audit_handler, notification_handler)
    await asyncio.gather(*(handle(msg) for handle in handlers))

# Message batching
class MessageBatcher:
    """Collect messages and process them together in batches.

    ``add_message`` triggers a flush when the batch holds ``batch_size``
    messages or more than ``timeout`` seconds have passed since the previous
    flush. The timeout is only evaluated when a message arrives, so a partial
    batch waits for the next message or an explicit ``process_batch`` call.
    """

    def __init__(self, batch_size=10, timeout=5.0):
        self.batch = []
        self.batch_size = batch_size
        self.timeout = timeout
        # Monotonic clock: elapsed-time checks must not be affected by
        # wall-clock adjustments.
        self.last_batch_time = time.monotonic()

    async def add_message(self, msg):
        """Queue *msg* and flush the batch if it is full or overdue."""
        self.batch.append(msg)

        # Process batch if full or timeout reached
        elapsed = time.monotonic() - self.last_batch_time
        if len(self.batch) >= self.batch_size or elapsed > self.timeout:
            await self.process_batch()

    async def process_batch(self):
        """Process and acknowledge all queued messages.

        On failure the messages are put back at the front of the queue and
        the exception is re-raised, so a later flush retries them.
        """
        if not self.batch:
            return

        # Detach the batch before awaiting: messages added while processing
        # is in flight land in the fresh list instead of being cleared
        # without ever being processed or acknowledged.
        pending, self.batch = self.batch, []
        try:
            print(f"Processing batch of {len(pending)} messages")
            await process_message_batch(pending)

            # Acknowledge all messages
            for msg in pending:
                if msg.metadata:  # JetStream message
                    await msg.ack()
        except BaseException:
            # Keep the unprocessed messages (also on cancellation).
            self.batch[:0] = pending
            raise

        self.last_batch_time = time.monotonic()

# Shared batcher instance with the default batch size and timeout
batcher = MessageBatcher()

async def batching_handler(msg):
    """Hand *msg* to the shared batcher; intended as a subscription callback."""
    await batcher.add_message(msg)

# Content-based routing
async def routing_handler(msg):
    """Dispatch *msg* to a service handler chosen by its subject prefix.

    ``user.``, ``order.`` and ``inventory.`` subjects go to their respective
    handlers; anything else is reported as unknown.
    """
    subject = msg.subject

    if subject.startswith("user."):
        await user_service_handler(msg)
        return
    if subject.startswith("order."):
        await order_service_handler(msg)
        return
    if subject.startswith("inventory."):
        await inventory_handler(msg)
        return

    # No known prefix matched
    print(f"Unknown message type: {subject}")

# Message transformation pipeline
async def transform_pipeline(msg):
    """Validate, transform, enrich and store a message, then acknowledge it.

    Acknowledgments (positive or negative) are only sent for JetStream
    messages, i.e. when ``msg.metadata`` is truthy.
    """
    # Step 1: Validate
    if not await validate_message(msg):
        # Only JetStream messages can be negatively acknowledged; same guard
        # as the one used before ack() in step 5.
        if msg.metadata:
            await msg.nak()
        return

    # Step 2: Transform
    transformed_data = await transform_message_data(msg.data)

    # Step 3: Enrich with external data
    enriched_data = await enrich_message(transformed_data, msg.headers)

    # Step 4: Store result
    await store_processed_message(enriched_data)

    # Step 5: Acknowledge
    if msg.metadata:
        await msg.ack()

Constants

# Pending limits for core NATS subscriptions
DEFAULT_SUB_PENDING_MSGS_LIMIT = 512 * 1024  # 524,288 messages
DEFAULT_SUB_PENDING_BYTES_LIMIT = 128 * 1024**2  # 128 MiB

# Pending limits for JetStream subscriptions
DEFAULT_JS_SUB_PENDING_MSGS_LIMIT = 512 * 1024  # 524,288 messages
DEFAULT_JS_SUB_PENDING_BYTES_LIMIT = 256 * 1024**2  # 256 MiB

# Message acknowledgment types
class Ack:
    """JetStream acknowledgment types.

    NOTE(review): these look like the protocol payloads behind ``Msg.ack()``,
    ``nak()``, ``in_progress()`` and ``term()`` — confirm the exact attribute
    names and whether the library stores them as ``bytes``.
    """
    ACK = "+ACK"       # Positive acknowledgment
    NAK = "-NAK"       # Negative acknowledgment
    PROGRESS = "+WPI"  # Work in Progress
    TERM = "+TERM"     # Terminate

Install with Tessl CLI

npx tessl i tessl/pypi-nats-py

docs

core-client.md

error-handling.md

index.md

jetstream-management.md

jetstream.md

key-value-store.md

message-handling.md

microservices.md

object-store.md

tile.json