CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-nats-py

An asyncio Python client for the NATS messaging system with JetStream, microservices, and key-value store support

Pending
Overview
Eval results
Files

docs/error-handling.md

Error Handling

Comprehensive exception hierarchy covering connection, protocol, JetStream, key-value, object store, and microservice errors with specific error types for precise error handling.

Capabilities

Core NATS Errors

Base error classes and connection-related exceptions.

import asyncio

class Error(Exception):
    """Base NATS error class; every error below derives from it."""

class ConnectionClosedError(Error):
    """Connection was closed."""

class TimeoutError(Error, asyncio.TimeoutError):
    """Operation timed out.

    Also derives from ``asyncio.TimeoutError`` so handlers written for the
    asyncio exception catch it as well.
    """

class NoRespondersError(Error):
    """No services responded to request."""

class StaleConnectionError(Error):
    """Connection is stale and unusable."""

class OutboundBufferLimitError(Error):
    """Outbound buffer limit exceeded."""

class UnexpectedEOF(StaleConnectionError):
    """Unexpected end of file/connection (a kind of stale connection)."""

class FlushTimeoutError(TimeoutError):
    """Flush operation timed out.

    Subclass of ``TimeoutError``: catch it *before* ``TimeoutError`` when the
    two need different handling.
    """

Usage Examples

import asyncio
import nats
from nats.errors import (
    ConnectionClosedError, TimeoutError, NoRespondersError,
    StaleConnectionError, FlushTimeoutError
)

async def robust_client():
    """Connect, publish, flush and send one request, reporting known failures.

    Each handled error is printed instead of being re-raised. The connection
    is closed on the way out if it was opened and is not already closed.
    """
    nc = None
    try:
        # Connect with error handling
        nc = await nats.connect("nats://localhost:4222")

        # Publish with flush error handling
        await nc.publish("test.subject", b"Hello")
        await nc.flush(timeout=5.0)

        # Request with timeout and no responders handling
        response = await nc.request("api.service", b"request", timeout=2.0)
        print(f"Response: {response.data.decode()}")

    except ConnectionClosedError:
        print("Connection was closed unexpectedly")
    except FlushTimeoutError:
        # Listed ahead of TimeoutError so the flush-specific message is used.
        print("Failed to flush messages within timeout")
    except TimeoutError:
        print("Request timed out")
    except NoRespondersError:
        print("No service available to handle request")
    except StaleConnectionError:
        print("Connection is stale, need to reconnect")
    except Exception as e:
        print(f"Unexpected error: {e}")
    finally:
        # ``is_closed`` is a property on the client, not a method; calling it
        # would raise TypeError on the returned bool.
        if nc is not None and not nc.is_closed:
            await nc.close()

Connection and Protocol Errors

Network and protocol-level error handling.

class SecureConnRequiredError(Error):
    """Server requires a secure (TLS) connection."""

class SecureConnWantedError(Error):
    """Server prefers a secure connection."""

class SecureConnFailedError(Error):
    """Secure connection could not be established."""

class AuthorizationError(Error):
    """Authentication/authorization failed."""

class NoServersError(Error):
    """No servers are available to connect to."""

class ProtocolError(Error):
    """NATS protocol error."""

class MaxPayloadError(Error):
    """Message payload exceeds the maximum allowed size."""

Usage Examples

from nats.errors import (
    SecureConnRequiredError, AuthorizationError, NoServersError,
    MaxPayloadError, ProtocolError
)

async def secure_connection():
    """Connect (falling back to TLS if required) and publish a large message.

    Returns:
        The connected client, or ``None`` when authorization fails or no
        server is reachable.
    """
    try:
        # Try insecure connection first
        nc = await nats.connect("nats://secure-server:4222")

    except SecureConnRequiredError:
        print("Server requires TLS, retrying with secure connection")
        import ssl
        ssl_ctx = ssl.create_default_context()
        nc = await nats.connect("tls://secure-server:4443", tls=ssl_ctx)

    except AuthorizationError:
        print("Authentication failed, check credentials")
        return None

    except NoServersError:
        print("No NATS servers available")
        return None

    try:
        # Test large message
        large_message = b"x" * (2 * 1024 * 1024)  # 2MB
        await nc.publish("test", large_message)

    except MaxPayloadError as e:
        print(f"Message too large: {e}")
        # Split into smaller chunks. ``max_payload`` is a property on the
        # client (an int), so it must not be called.
        chunk_size = nc.max_payload
        for i in range(0, len(large_message), chunk_size):
            chunk = large_message[i:i + chunk_size]
            await nc.publish(f"test.chunk.{i//chunk_size}", chunk)

    return nc

Subscription and Message Errors

Subscription and message processing error handling.

class BadSubscriptionError(Error):
    """Invalid subscription parameters."""

class BadSubjectError(Error):
    """Invalid subject format."""

class SlowConsumerError(Error):
    """Consumer cannot keep up with message rate."""

class InvalidCallbackTypeError(Error):
    """Invalid callback function type."""

class BadTimeoutError(Error):
    """Invalid timeout value."""

class DrainTimeoutError(TimeoutError):
    """Drain operation timed out.

    Subclass of ``TimeoutError``, so a generic timeout handler catches it too.
    """

class ConnectionDrainingError(Error):
    """Connection is in draining state."""

class ConnectionReconnectingError(Error):
    """Connection is in reconnecting state."""

Usage Examples

from nats.errors import (
    BadSubscriptionError, SlowConsumerError, DrainTimeoutError,
    ConnectionReconnectingError
)

async def robust_subscription():
    """Subscribe to ``events.*`` and watch the pending-message backlog.

    Runs until an error (or cancellation) interrupts the monitoring loop, then
    drains the connection.
    """
    # The drain timeout is a connection option; ``drain()`` itself takes no
    # timeout argument.
    nc = await nats.connect(drain_timeout=30)

    try:
        # Subscribe with error handling
        async def message_handler(msg):
            try:
                await process_message(msg)
            except Exception as e:
                # Keep the subscription alive when a single message fails.
                print(f"Message processing error: {e}")

        sub = await nc.subscribe("events.*", cb=message_handler)

        # Monitor for slow consumer
        while True:
            await asyncio.sleep(10)

            # ``pending_msgs`` is a property (an int), not a method.
            if sub.pending_msgs > 10000:
                print("Warning: High pending message count")

    except BadSubscriptionError as e:
        print(f"Invalid subscription: {e}")

    except SlowConsumerError:
        print("Consumer is too slow, increase processing capacity")

    except ConnectionReconnectingError:
        print("Connection is reconnecting, waiting...")
        await asyncio.sleep(5)  # Wait for reconnection

    finally:
        try:
            await nc.drain()
        except DrainTimeoutError:
            # NOTE(review): nats-py appears to report drain timeouts through
            # the error callback rather than raising -- confirm this can fire.
            print("Drain timed out, forcing close")
            await nc.close()

JetStream Errors

JetStream-specific error handling for streams and consumers.

from nats.js.errors import (
    Error as JSError,
    APIError,
    ServiceUnavailableError,
    ServerError,
    NotFoundError,
    BadRequestError,
    NoStreamResponseError,
    TooManyStalledMsgsError,
    FetchTimeoutError,
    ConsumerSequenceMismatchError
)

Usage Examples

from nats.js.errors import (
    APIError, NotFoundError, BadRequestError, ServiceUnavailableError,
    FetchTimeoutError, ConsumerSequenceMismatchError
)

async def jetstream_operations():
    """Create a stream, publish to it, pull a batch and read stream info.

    Each step has its own ``try`` so one failure does not hide the handling
    shown for the others; only an unavailable JetStream service aborts early.
    """
    nc = await nats.connect()
    js = nc.jetstream()
    jsm = nc.jsm()

    try:
        # Create stream with error handling
        stream_info = await jsm.add_stream(
            name="events",
            subjects=["events.*"]
        )
        print(f"Created stream: {stream_info.config.name}")

    except BadRequestError as e:
        print(f"Invalid stream configuration: {e}")

    except ServiceUnavailableError:
        print("JetStream service unavailable")
        return

    try:
        # Publish to JetStream
        ack = await js.publish("events.test", b"test message")
        print(f"Published message at sequence {ack.seq}")

    except APIError as e:
        print(f"JetStream API error: {e}")

    try:
        # Pull subscribe with error handling
        psub = await js.pull_subscribe("events.*", durable="test-consumer")

        # The batch-size parameter of ``fetch`` is named ``batch``.
        msgs = await psub.fetch(batch=10, timeout=5.0)
        for msg in msgs:
            await msg.ack()

    except FetchTimeoutError:
        print("No messages available within timeout")

    except ConsumerSequenceMismatchError as e:
        print(f"Consumer sequence mismatch: {e}")
        # Reset consumer or handle sequence gap

    try:
        # Get stream info
        info = await jsm.stream_info("events")
        print(f"Stream has {info.state.messages} messages")

    except NotFoundError:
        print("Stream 'events' not found")

Key-Value Store Errors

Key-value store specific error handling.

from nats.js.errors import (
    BucketNotFoundError,
    BadBucketError,
    KeyValueError,
    KeyDeletedError,
    KeyNotFoundError,
    KeyWrongLastSequenceError,
    NoKeysError,
    KeyHistoryTooLargeError,
    InvalidKeyError,
    InvalidBucketNameError
)

Usage Examples

from nats.js.errors import (
    BucketNotFoundError, KeyNotFoundError, KeyDeletedError,
    KeyWrongLastSequenceError, InvalidKeyError
)

async def kv_operations():
    """Open (or create) a KV bucket, then read, update and validate a key.

    Each step is wrapped separately to show the error specific to it.
    """
    nc = await nats.connect()
    js = nc.jetstream()

    try:
        # Get or create KV store
        kv = await js.key_value("user-sessions")

    except BucketNotFoundError:
        print("Creating new KV bucket")
        kv = await js.create_key_value(bucket="user-sessions")

    try:
        # Get key with error handling
        entry = await kv.get("session:user123")
        print(f"Session data: {entry.value.decode()}")

    except KeyDeletedError:
        # KeyDeletedError derives from KeyNotFoundError, so it has to be
        # caught first or this branch would never run.
        print("Session was deleted")

    except KeyNotFoundError:
        print("Session not found, creating new one")
        await kv.put("session:user123", b'{"new": "session"}')

    try:
        # Conditional update with error handling
        entry = await kv.get("session:user123")
        updated_data = b'{"updated": "session"}'
        await kv.update("session:user123", updated_data, entry.revision)

    except KeyWrongLastSequenceError:
        print("Session was modified by another process")
        # Retry with latest revision
        entry = await kv.get("session:user123")
        await kv.update("session:user123", updated_data, entry.revision)

    try:
        # Validate key format
        await kv.put("invalid key name!", b"data")

    except InvalidKeyError as e:
        print(f"Invalid key format: {e}")

Object Store Errors

Object store specific error handling.

from nats.js.errors import (
    InvalidObjectNameError,
    BadObjectMetaError,
    LinkIsABucketError,
    DigestMismatchError,
    ObjectNotFoundError,
    ObjectDeletedError,
    ObjectAlreadyExists
)

Usage Examples

from nats.js.errors import (
    ObjectNotFoundError, ObjectDeletedError, ObjectAlreadyExists,
    DigestMismatchError, BadObjectMetaError
)

async def object_store_operations():
    """Store, fetch and re-describe ``document.pdf`` in the ``file-storage`` bucket.

    ``file_data``, ``verify_checksum`` and ``expected_checksum`` are expected
    to be provided by the surrounding application.
    """
    nc = await nats.connect()
    js = nc.jetstream()

    # Step 1: upload the object.
    try:
        os = await js.object_store("file-storage")
        stored = await os.put("document.pdf", file_data)
        print(f"Stored object: {stored.name}")
    except ObjectAlreadyExists:
        print("Object already exists")
        # NOTE(review): confirm that ``put`` accepts ``replace`` in the
        # installed nats-py version.
        stored = await os.put("document.pdf", file_data, replace=True)

    # Step 2: download it again and check its integrity.
    try:
        fetched = await os.get("document.pdf")
        checksum_ok = verify_checksum(fetched, expected_checksum)
        if checksum_ok:
            print("Data integrity verified")
    except ObjectNotFoundError:
        print("Object not found")
    except ObjectDeletedError:
        print("Object was deleted")
    except DigestMismatchError as e:
        # Corrupted data is only reported here.
        print(f"Data corruption detected: {e}")

    # Step 3: replace the object's metadata.
    try:
        from nats.js.api import ObjectMeta
        new_meta = ObjectMeta(name="document.pdf", description="Updated document")
        await os.update_meta("document.pdf", new_meta)
    except BadObjectMetaError as e:
        print(f"Invalid object metadata: {e}")

Microservices Errors

Service framework error handling.

from nats.micro import ServiceError

class ServiceError(Exception):
    """Exception that pairs an error code with a readable description.

    The exception message is rendered as ``"<code>: <description>"``.
    """

    def __init__(self, code: str, description: str):
        self.code, self.description = code, description
        message = f"{code}: {description}"
        super().__init__(message)

Usage Examples

from nats.micro import ServiceError, Request

async def service_handler(request: Request):
    """Handle one service request, turning every failure into an error reply.

    The payload is decoded as JSON, processed, and the JSON-encoded result is
    sent back. Known application errors map to dedicated codes; anything else
    is logged and reported as ``INTERNAL_ERROR``.
    """
    try:
        # Process request
        data = json.loads(request.data.decode())
        result = await process_service_request(data)

        response = json.dumps(result).encode()
        await request.respond(response)

    except ValidationError as e:
        await request.respond_error(
            code="VALIDATION_ERROR",
            description=str(e)
        )

    except AuthenticationError:
        await request.respond_error(
            code="UNAUTHORIZED",
            description="Authentication required"
        )

    except RateLimitError:
        await request.respond_error(
            code="RATE_LIMITED",
            description="Too many requests"
        )

    except ServiceError as e:
        # Forward the service error's own code and description to the caller.
        await request.respond_error(e.code, e.description)

    except Exception as e:
        # Log unexpected errors with their traceback; the caller only sees a
        # generic message so internal details are not leaked.
        logger.exception("Unexpected service error: %s", e)
        await request.respond_error(
            code="INTERNAL_ERROR",
            description="Internal server error"
        )

# Custom service errors
class BusinessLogicError(ServiceError):
    """Service error reported under the ``BUSINESS_LOGIC_ERROR`` code."""

    def __init__(self, message: str):
        error_code = "BUSINESS_LOGIC_ERROR"
        super().__init__(error_code, message)

class DataValidationError(ServiceError):
    """Service error for one invalid field, reported as ``VALIDATION_ERROR``."""

    def __init__(self, field: str, message: str):
        # The description names the offending field first: "<field>: <message>".
        detail = f"{field}: {message}"
        super().__init__("VALIDATION_ERROR", detail)

# Usage in service logic
async def create_user_service(request: Request):
    """Create a user from the JSON request payload and reply with the result.

    Raises:
        DataValidationError: the payload has no ``email``.
        BusinessLogicError: a user with that email already exists.
        ServiceError: any other failure, wrapped as ``INTERNAL_ERROR`` with
            the original exception attached as its cause.
    """
    try:
        user_data = json.loads(request.data.decode())

        if not user_data.get("email"):
            raise DataValidationError("email", "Email is required")

        if await user_exists(user_data["email"]):
            raise BusinessLogicError("User already exists")

        user = await create_user(user_data)
        await request.respond(json.dumps(user.to_dict()).encode())

    except ServiceError:
        raise  # Let framework handle service errors
    except Exception as e:
        # Chain the cause so the original traceback is kept for debugging.
        raise ServiceError("INTERNAL_ERROR", str(e)) from e

Error Handling Patterns

Retry Strategies

Implement robust retry logic for transient errors.

import asyncio
from typing import Any, Callable, Type

async def retry_with_backoff(
    operation: Callable,
    max_retries: int = 3,
    backoff_factor: float = 2.0,
    exceptions: tuple = (Exception,)
) -> Any:
    """Retry operation with exponential backoff.

    Args:
        operation: Zero-argument callable returning an awaitable.
        max_retries: Number of retries after the first attempt; the operation
            is awaited at most ``max_retries + 1`` times.
        backoff_factor: Base of the delay; the wait before retry ``k``
            (0-based) is ``backoff_factor ** k`` seconds.
        exceptions: Exception types that trigger a retry. Anything else
            propagates immediately.

    Returns:
        Whatever ``operation`` returns on its first successful attempt.

    Raises:
        ValueError: ``max_retries`` is negative (the operation would otherwise
            never be attempted and ``None`` would be returned silently).
        Exception: The last retryable exception once retries are exhausted.
    """
    if max_retries < 0:
        raise ValueError("max_retries must be >= 0")

    for attempt in range(max_retries + 1):
        try:
            return await operation()
        except exceptions as e:
            if attempt == max_retries:
                # Bare raise keeps the original traceback intact.
                raise

            wait_time = backoff_factor ** attempt
            print(f"Attempt {attempt + 1} failed: {e}, retrying in {wait_time}s")
            await asyncio.sleep(wait_time)

# Usage
async def flaky_operation():
    """Stand-in for an unreliable call: one request with a 1-second timeout."""
    return await nc.request("flaky.service", b"request", timeout=1.0)

# NOTE: ``await`` at module level only works in an async REPL or notebook;
# in a script, put this inside an ``async def`` and run it with asyncio.run().
try:
    result = await retry_with_backoff(
        flaky_operation,
        max_retries=3,
        exceptions=(TimeoutError, ConnectionClosedError)
    )
except Exception as e:
    # Reached when the final attempt fails, or when an exception outside the
    # retry tuple propagates on any attempt.
    print(f"Operation failed after retries: {e}")

Circuit Breaker Pattern

Prevent cascading failures with circuit breaker.

import time
from enum import Enum

class CircuitState(Enum):
    """States of a circuit breaker."""

    CLOSED = "closed"        # calls pass through
    OPEN = "open"            # calls are rejected until the timeout elapses
    HALF_OPEN = "half_open"  # next call is a probe

class CircuitBreaker:
    """Reject calls for a cool-down period after repeated failures.

    Args:
        failure_threshold: Consecutive failures after which the circuit opens.
        timeout: Seconds the circuit stays open before a probe call is allowed.
    """

    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failure_count = 0
        # Monotonic-clock reading of the most recent failure (None if none yet).
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    async def call(self, operation):
        """Await ``operation()`` unless the circuit is open.

        Raises:
            Exception: "Circuit breaker is open" while the cool-down has not
                elapsed; otherwise whatever ``operation`` raises.
        """
        if self.state == CircuitState.OPEN:
            # A monotonic clock is immune to wall-clock adjustments, which
            # could otherwise shorten or stretch the cool-down.
            if time.monotonic() - self.last_failure_time > self.timeout:
                self.state = CircuitState.HALF_OPEN
            else:
                raise Exception("Circuit breaker is open")

        try:
            result = await operation()
            self.on_success()
            return result
        except Exception:
            self.on_failure()
            raise  # bare raise keeps the original traceback

    def on_success(self):
        """Reset the failure count and close the circuit."""
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def on_failure(self):
        """Record a failure and open the circuit once the threshold is reached."""
        self.failure_count += 1
        self.last_failure_time = time.monotonic()

        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

# Usage
# One shared breaker: opens after 3 failures, allows a probe after 30 seconds.
circuit_breaker = CircuitBreaker(failure_threshold=3, timeout=30)

async def protected_service_call():
    """Send a request through the shared breaker; return ``None`` on any failure."""
    def send_request():
        return nc.request("service", b"data")

    try:
        reply = await circuit_breaker.call(send_request)
    except Exception as e:
        print(f"Service call failed: {e}")
        return None
    return reply

Constants

# Error categories
# Core client errors, listed as class objects. InvalidUserCredentialsError and
# JsonParseError are not described elsewhere on this page.
# NOTE(review): this list holds classes while the three lists below hold class
# names as strings -- confirm whether the mismatch is intended.
NATS_ERRORS = [
    Error, ConnectionClosedError, TimeoutError, NoRespondersError,
    StaleConnectionError, OutboundBufferLimitError, UnexpectedEOF,
    FlushTimeoutError, SecureConnRequiredError, SecureConnWantedError,
    SecureConnFailedError, AuthorizationError, NoServersError,
    ProtocolError, MaxPayloadError, BadSubscriptionError,
    BadSubjectError, SlowConsumerError, InvalidCallbackTypeError,
    BadTimeoutError, DrainTimeoutError, ConnectionDrainingError,
    ConnectionReconnectingError, InvalidUserCredentialsError,
    JsonParseError
]

# Names of the JetStream errors imported from nats.js.errors above
# (the JSError alias for the base Error is not included).
JETSTREAM_ERRORS = [
    "APIError", "ServiceUnavailableError", "ServerError", "NotFoundError",
    "BadRequestError", "NoStreamResponseError", "TooManyStalledMsgsError",
    "FetchTimeoutError", "ConsumerSequenceMismatchError"
]

# Names of the key-value store errors imported from nats.js.errors above.
KEYVALUE_ERRORS = [
    "BucketNotFoundError", "BadBucketError", "KeyValueError",
    "KeyDeletedError", "KeyNotFoundError", "KeyWrongLastSequenceError",
    "NoKeysError", "KeyHistoryTooLargeError", "InvalidKeyError",
    "InvalidBucketNameError"
]

# Names of the object store errors imported from nats.js.errors above.
OBJECTSTORE_ERRORS = [
    "InvalidObjectNameError", "BadObjectMetaError", "LinkIsABucketError",
    "DigestMismatchError", "ObjectNotFoundError", "ObjectDeletedError",
    "ObjectAlreadyExists"
]

Install with Tessl CLI

npx tessl i tessl/pypi-nats-py

docs

core-client.md

error-handling.md

index.md

jetstream-management.md

jetstream.md

key-value-store.md

message-handling.md

microservices.md

object-store.md

tile.json