An asyncio Python client for the NATS messaging system with JetStream, microservices, and key-value store support
—
Comprehensive exception hierarchy covering connection, protocol, JetStream, key-value, object store, and microservice errors with specific error types for precise error handling.
Base error classes and connection-related exceptions.
import asyncio


class Error(Exception):
    """Root of the NATS client exception hierarchy."""


class ConnectionClosedError(Error):
    """The connection was closed."""


class TimeoutError(Error, asyncio.TimeoutError):
    """An operation timed out.

    Mirrors nats-py, where this class also derives from
    ``asyncio.TimeoutError`` so generic asyncio timeout handlers catch it.
    Note that it intentionally shares its name with the builtin.
    """


class NoRespondersError(Error):
    """No service responded to a request."""


class StaleConnectionError(Error):
    """The connection is stale and unusable."""


class OutboundBufferLimitError(Error):
    """The outbound buffer limit was exceeded."""


class UnexpectedEOF(StaleConnectionError):
    """Unexpected end of file/connection.

    A ``StaleConnectionError`` subclass, as in nats-py.
    """


class FlushTimeoutError(TimeoutError):
    """A flush operation timed out.

    A ``TimeoutError`` subclass, as in nats-py: handlers that need to tell the
    two apart must catch ``FlushTimeoutError`` first.
    """
import nats
from nats.errors import (
ConnectionClosedError, TimeoutError, NoRespondersError,
StaleConnectionError, FlushTimeoutError
)
async def robust_client():
    """Connect, publish, flush and send one request, reporting each failure.

    Every anticipated NATS error is reported with ``print``; any other
    exception is reported as unexpected. The connection is closed on the way
    out if it was opened and is not already closed.
    """
    nc = None
    try:
        # Connect with error handling
        nc = await nats.connect("nats://localhost:4222")

        # Publish with flush error handling
        await nc.publish("test.subject", b"Hello")
        await nc.flush(timeout=5.0)

        # Request with timeout and no responders handling
        response = await nc.request("api.service", b"request", timeout=2.0)
        print(f"Response: {response.data.decode()}")
    except ConnectionClosedError:
        print("Connection was closed unexpectedly")
    except FlushTimeoutError:
        # Must precede TimeoutError: nats-py defines FlushTimeoutError as a
        # TimeoutError subclass.
        print("Failed to flush messages within timeout")
    except TimeoutError:
        print("Request timed out")
    except NoRespondersError:
        print("No service available to handle request")
    except StaleConnectionError:
        print("Connection is stale, need to reconnect")
    except Exception as e:
        print(f"Unexpected error: {e}")
    finally:
        # is_closed is a property on the nats-py client, not a method;
        # calling it would raise TypeError on the returned bool.
        if nc is not None and not nc.is_closed:
            await nc.close()


# Network and protocol-level error handling.
class SecureConnRequiredError(Error):
    """The server requires a secure connection."""


class SecureConnWantedError(Error):
    """The server prefers a secure connection."""


class SecureConnFailedError(Error):
    """A secure connection could not be established."""


class AuthorizationError(Error):
    """Authentication or authorization failed."""


class NoServersError(Error):
    """No servers are available to connect to."""


class ProtocolError(Error):
    """A NATS protocol error occurred."""


class MaxPayloadError(Error):
    """The message payload exceeds the maximum size."""


from nats.errors import (
    SecureConnRequiredError,
    AuthorizationError,
    NoServersError,
    MaxPayloadError,
    ProtocolError,
)
async def secure_connection():
    """Connect (falling back to TLS if required) and publish a large message.

    Returns:
        The connected client, or ``None`` when authorization fails or no
        server is available.
    """
    try:
        # Try insecure connection first
        nc = await nats.connect("nats://secure-server:4222")
    except SecureConnRequiredError:
        print("Server requires TLS, retrying with secure connection")
        import ssl

        ssl_ctx = ssl.create_default_context()
        nc = await nats.connect("tls://secure-server:4443", tls=ssl_ctx)
    except AuthorizationError:
        print("Authentication failed, check credentials")
        return None
    except NoServersError:
        print("No NATS servers available")
        return None

    # Test large message
    large_message = b"x" * (2 * 1024 * 1024)  # 2MB
    try:
        await nc.publish("test", large_message)
    except MaxPayloadError as e:
        print(f"Message too large: {e}")
        # Split into smaller chunks. max_payload is a property on the nats-py
        # client, not a method.
        chunk_size = nc.max_payload
        offsets = range(0, len(large_message), chunk_size)
        for index, start in enumerate(offsets):
            chunk = large_message[start:start + chunk_size]
            await nc.publish(f"test.chunk.{index}", chunk)

    return nc


# Subscription and message processing error handling.
class BadSubscriptionError(Error):
    """Invalid subscription parameters."""


class BadSubjectError(Error):
    """Invalid subject format."""


class SlowConsumerError(Error):
    """The consumer cannot keep up with the message rate."""


class InvalidCallbackTypeError(Error):
    """Invalid callback function type."""


class BadTimeoutError(Error):
    """Invalid timeout value."""


class DrainTimeoutError(TimeoutError):
    """A drain operation timed out.

    Derives from the NATS ``TimeoutError`` (itself an ``Error``), as in
    nats-py, so generic timeout handlers also catch it.
    """


class ConnectionDrainingError(Error):
    """The connection is in the draining state."""


class ConnectionReconnectingError(Error):
    """The connection is in the reconnecting state."""


from nats.errors import (
    BadSubscriptionError,
    SlowConsumerError,
    DrainTimeoutError,
    ConnectionReconnectingError,
)
async def robust_subscription():
    """Subscribe to ``events.*`` and watch the pending-message backlog.

    Runs until an error interrupts the monitoring loop, then drains the
    connection; if draining times out the connection is closed explicitly.
    """
    # nats-py's Client.drain() accepts no timeout argument; the drain limit is
    # the drain_timeout connect option.
    nc = await nats.connect(drain_timeout=30)
    try:
        # Subscribe with error handling
        async def message_handler(msg):
            try:
                await process_message(msg)
            except Exception as e:
                # Keep the subscription alive when one message fails.
                print(f"Message processing error: {e}")

        sub = await nc.subscribe("events.*", cb=message_handler)

        # Monitor for slow consumer
        while True:
            await asyncio.sleep(10)
            # pending_msgs is a property on the subscription, not a method.
            if sub.pending_msgs > 10000:
                print("Warning: High pending message count")
    except BadSubscriptionError as e:
        print(f"Invalid subscription: {e}")
    except SlowConsumerError:
        print("Consumer is too slow, increase processing capacity")
    except ConnectionReconnectingError:
        print("Connection is reconnecting, waiting...")
        await asyncio.sleep(5)  # Wait for reconnection
    finally:
        try:
            await nc.drain()
        except DrainTimeoutError:
            print("Drain timed out, forcing close")
            await nc.close()


# JetStream-specific error handling for streams and consumers.
from nats.js.errors import (
Error as JSError,
APIError,
ServiceUnavailableError,
ServerError,
NotFoundError,
BadRequestError,
NoStreamResponseError,
TooManyStalledMsgsError,
FetchTimeoutError,
ConsumerSequenceMismatchError
)from nats.js.errors import (
APIError, NotFoundError, BadRequestError, ServiceUnavailableError,
FetchTimeoutError, ConsumerSequenceMismatchError
)
async def jetstream_operations():
    """Create a stream, publish, pull-fetch and query it, reporting errors.

    Each step reports its anticipated JetStream errors with ``print``. The
    function returns early if JetStream is unavailable. The connection is
    always closed before returning.
    """
    nc = await nats.connect()
    try:
        js = nc.jetstream()
        jsm = nc.jsm()

        try:
            # Create stream with error handling
            stream_info = await jsm.add_stream(
                name="events",
                subjects=["events.*"],
            )
            print(f"Created stream: {stream_info.config.name}")
        except BadRequestError as e:
            print(f"Invalid stream configuration: {e}")
        except ServiceUnavailableError:
            print("JetStream service unavailable")
            return

        try:
            # Publish to JetStream
            ack = await js.publish("events.test", b"test message")
            print(f"Published message at sequence {ack.seq}")
        except APIError as e:
            print(f"JetStream API error: {e}")

        try:
            # Pull subscribe with error handling
            psub = await js.pull_subscribe("events.*", durable="test-consumer")
            msgs = await psub.fetch(batch_size=10, timeout=5.0)
            for msg in msgs:
                await msg.ack()
        except FetchTimeoutError:
            print("No messages available within timeout")
        except ConsumerSequenceMismatchError as e:
            print(f"Consumer sequence mismatch: {e}")
            # Reset consumer or handle sequence gap

        try:
            # Get stream info
            info = await jsm.stream_info("events")
            print(f"Stream has {info.state.messages} messages")
        except NotFoundError:
            print("Stream 'events' not found")
    finally:
        # Release the connection on every exit path, including the early
        # return above.
        await nc.close()


# Key-value store specific error handling.
from nats.js.errors import (
BucketNotFoundError,
BadBucketError,
KeyValueError,
KeyDeletedError,
KeyNotFoundError,
KeyWrongLastSequenceError,
NoKeysError,
KeyHistoryTooLargeError,
InvalidKeyError,
InvalidBucketNameError
)from nats.js.errors import (
BucketNotFoundError, KeyNotFoundError, KeyDeletedError,
KeyWrongLastSequenceError, InvalidKeyError
)
async def kv_operations():
    """Read, create, conditionally update and validate keys in a KV bucket.

    Uses the ``user-sessions`` bucket, creating it if it does not exist. Each
    step reports its anticipated key-value errors with ``print``. The
    connection is always closed before returning.
    """
    nc = await nats.connect()
    try:
        js = nc.jetstream()

        try:
            # Get or create KV store
            kv = await js.key_value("user-sessions")
        except BucketNotFoundError:
            print("Creating new KV bucket")
            kv = await js.create_key_value(bucket="user-sessions")

        try:
            # Get key with error handling
            entry = await kv.get("session:user123")
            print(f"Session data: {entry.value.decode()}")
        except KeyDeletedError:
            # Must precede KeyNotFoundError: nats-py defines KeyDeletedError
            # as a KeyNotFoundError subclass, so the reverse order would make
            # this branch unreachable.
            print("Session was deleted")
        except KeyNotFoundError:
            print("Session not found, creating new one")
            await kv.put("session:user123", b'{"new": "session"}')

        # Defined outside the try so the retry in the handler can never see
        # it unbound.
        updated_data = b'{"updated": "session"}'
        try:
            # Conditional update with error handling
            entry = await kv.get("session:user123")
            await kv.update("session:user123", updated_data, entry.revision)
        except KeyWrongLastSequenceError:
            print("Session was modified by another process")
            # Retry with latest revision
            entry = await kv.get("session:user123")
            await kv.update("session:user123", updated_data, entry.revision)

        try:
            # Validate key format
            await kv.put("invalid key name!", b"data")
        except InvalidKeyError as e:
            print(f"Invalid key format: {e}")
    finally:
        await nc.close()


# Object store specific error handling.
from nats.js.errors import (
InvalidObjectNameError,
BadObjectMetaError,
LinkIsABucketError,
DigestMismatchError,
ObjectNotFoundError,
ObjectDeletedError,
ObjectAlreadyExists
)from nats.js.errors import (
ObjectNotFoundError, ObjectDeletedError, ObjectAlreadyExists,
DigestMismatchError, BadObjectMetaError
)
async def object_store_operations():
    """Store, fetch and re-describe an object in the ``file-storage`` bucket.

    Each step reports its anticipated object-store errors with ``print``. The
    connection is always closed before returning. ``file_data``,
    ``verify_checksum`` and ``expected_checksum`` are expected to be supplied
    by the surrounding application.
    """
    nc = await nats.connect()
    try:
        js = nc.jetstream()

        try:
            # Named `store` rather than `os` to avoid shadowing the stdlib
            # module name.
            store = await js.object_store("file-storage")

            # Store object with error handling
            obj_info = await store.put("document.pdf", file_data)
            print(f"Stored object: {obj_info.name}")
        except ObjectAlreadyExists:
            print("Object already exists")
            # Update existing object
            obj_info = await store.put("document.pdf", file_data, replace=True)

        try:
            # Retrieve object
            data = await store.get("document.pdf")

            # Verify integrity
            if verify_checksum(data, expected_checksum):
                print("Data integrity verified")
        except ObjectDeletedError:
            # Listed before ObjectNotFoundError so the more specific case wins
            # if the two are related by inheritance (harmless otherwise).
            # NOTE(review): confirm the relationship against nats.js.errors.
            print("Object was deleted")
        except ObjectNotFoundError:
            print("Object not found")
        except DigestMismatchError as e:
            print(f"Data corruption detected: {e}")
            # Handle corrupted data

        try:
            # Update metadata
            from nats.js.api import ObjectMeta

            meta = ObjectMeta(
                name="document.pdf",
                description="Updated document",
            )
            await store.update_meta("document.pdf", meta)
        except BadObjectMetaError as e:
            print(f"Invalid object metadata: {e}")
    finally:
        await nc.close()


# Service framework error handling.
from nats.micro import ServiceError
class ServiceError(Exception):
    """Service error with a code and a description.

    Attributes are set from the constructor arguments; the exception message
    is ``"<code>: <description>"``.
    """

    def __init__(self, code: str, description: str) -> None:
        self.code = code
        self.description = description
        super().__init__(f"{code}: {description}")


from nats.micro import ServiceError, Request
async def service_handler(request: Request):
    """Handle one service request, mapping failures to error responses.

    The request payload is decoded as JSON, processed, and the JSON-encoded
    result is sent back. Known failures are answered with a specific error
    code; anything else is logged and answered with ``INTERNAL_ERROR``.
    """
    try:
        # Process request
        data = json.loads(request.data.decode())
        result = await process_service_request(data)

        response = json.dumps(result).encode()
        await request.respond(response)
    except ValidationError as e:
        await request.respond_error(
            code="VALIDATION_ERROR",
            description=str(e),
        )
    except AuthenticationError:
        await request.respond_error(
            code="UNAUTHORIZED",
            description="Authentication required",
        )
    except RateLimitError:
        await request.respond_error(
            code="RATE_LIMITED",
            description="Too many requests",
        )
    except ServiceError as e:
        # Answer with the error's own code and description (nothing is
        # re-raised here).
        await request.respond_error(e.code, e.description)
    except Exception as e:
        # Log unexpected errors with their traceback; the client only sees a
        # generic message.
        logger.exception("Unexpected service error: %s", e)
        await request.respond_error(
            code="INTERNAL_ERROR",
            description="Internal server error",
        )
# Custom service errors
class BusinessLogicError(ServiceError):
    """Service error reported with the ``BUSINESS_LOGIC_ERROR`` code."""

    def __init__(self, message: str) -> None:
        super().__init__("BUSINESS_LOGIC_ERROR", message)


class DataValidationError(ServiceError):
    """Service error reported with the ``VALIDATION_ERROR`` code.

    The description is ``"<field>: <message>"``.
    """

    def __init__(self, field: str, message: str) -> None:
        super().__init__("VALIDATION_ERROR", f"{field}: {message}")
# Usage in service logic
async def create_user_service(request: Request):
    """Create a user from the JSON request payload and respond with it.

    Raises:
        DataValidationError: the payload has no ``email`` value.
        BusinessLogicError: a user with that email already exists.
        ServiceError: any other failure, wrapped as ``INTERNAL_ERROR`` with
            the original exception chained as its cause.
    """
    try:
        user_data = json.loads(request.data.decode())

        if not user_data.get("email"):
            raise DataValidationError("email", "Email is required")

        if await user_exists(user_data["email"]):
            raise BusinessLogicError("User already exists")

        user = await create_user(user_data)
        await request.respond(json.dumps(user.to_dict()).encode())
    except ServiceError:
        raise  # Let framework handle service errors
    except Exception as e:
        # Chain the cause so the original traceback is not lost.
        raise ServiceError("INTERNAL_ERROR", str(e)) from e


# Implement robust retry logic for transient errors.
import asyncio
from typing import Any, Awaitable, Callable, Type
async def retry_with_backoff(
    operation: Callable[[], Awaitable[Any]],
    max_retries: int = 3,
    backoff_factor: float = 2.0,
    exceptions: tuple = (Exception,),
    base_delay: float = 1,
) -> Any:
    """Retry operation with exponential backoff.

    Args:
        operation: Zero-argument callable returning an awaitable; it is called
            afresh for every attempt.
        max_retries: Number of retries after the first attempt, so the
            operation runs at most ``max_retries + 1`` times.
        backoff_factor: Base of the exponential delay.
        exceptions: Exception types that trigger a retry; anything else
            propagates immediately.
        base_delay: Seconds multiplied into every delay. The wait before retry
            ``k`` (0-based) is ``base_delay * backoff_factor ** k``.

    Returns:
        Whatever the first successful attempt returns.

    Raises:
        ValueError: ``max_retries`` is negative.
        Exception: the last caught exception, once retries are exhausted.
    """
    if max_retries < 0:
        # An empty attempt range would silently return None without ever
        # calling the operation.
        raise ValueError("max_retries must be >= 0")

    for attempt in range(max_retries + 1):
        try:
            return await operation()
        except exceptions as e:
            if attempt == max_retries:
                # Bare raise keeps the original traceback intact.
                raise
            wait_time = base_delay * backoff_factor ** attempt
            print(f"Attempt {attempt + 1} failed: {e}, retrying in {wait_time}s")
            await asyncio.sleep(wait_time)
# Usage
async def flaky_operation():
    """Send one request to ``flaky.service`` and return the response."""
    # Simulate flaky network operation
    response = await nc.request("flaky.service", b"request", timeout=1.0)
    return response


async def run_flaky_operation():
    """Run ``flaky_operation`` under ``retry_with_backoff``.

    ``await`` is not allowed at module level, so the usage is wrapped in a
    coroutine.

    Returns:
        The response, or ``None`` if the operation still fails after retries.
    """
    try:
        result = await retry_with_backoff(
            flaky_operation,
            max_retries=3,
            exceptions=(TimeoutError, ConnectionClosedError),
        )
    except Exception as e:
        print(f"Operation failed after retries: {e}")
        return None
    return result


# Prevent cascading failures with circuit breaker.
import time
from enum import Enum
class CircuitState(Enum):
    """States of a ``CircuitBreaker``."""

    CLOSED = "closed"        # calls pass through
    OPEN = "open"            # calls are rejected without being attempted
    HALF_OPEN = "half_open"  # a trial call is allowed through


class CircuitOpenError(Exception):
    """Raised by ``CircuitBreaker.call`` when the circuit is open.

    Subclasses ``Exception`` so existing ``except Exception`` handlers keep
    working.
    """


class CircuitBreaker:
    """Stop calling an operation after repeated failures.

    Args:
        failure_threshold: Consecutive failures that open the circuit.
        timeout: Seconds the circuit stays open before a trial call is
            allowed.
    """

    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    async def call(self, operation):
        """Await ``operation()`` unless the circuit is open.

        Returns:
            The result of the operation.

        Raises:
            CircuitOpenError: the circuit is open and ``timeout`` has not
                elapsed since the last failure.
            Exception: whatever the operation raises, after recording the
                failure.
        """
        if self.state == CircuitState.OPEN:
            # Monotonic clock: elapsed time is immune to wall-clock changes.
            if time.monotonic() - self.last_failure_time > self.timeout:
                self.state = CircuitState.HALF_OPEN
            else:
                raise CircuitOpenError("Circuit breaker is open")

        try:
            result = await operation()
        except Exception:
            self.on_failure()
            raise
        self.on_success()
        return result

    def on_success(self):
        """Reset the failure count and close the circuit."""
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def on_failure(self):
        """Record a failure and open the circuit once the threshold is hit."""
        self.failure_count += 1
        self.last_failure_time = time.monotonic()

        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
# Usage
# Shared breaker: opens after 3 consecutive failures, allows a trial call
# once 30 seconds have passed since the last failure.
circuit_breaker = CircuitBreaker(failure_threshold=3, timeout=30)


async def protected_service_call():
    """Request ``service`` through the circuit breaker.

    Returns:
        The response, or ``None`` if the call failed or was rejected.
    """
    try:
        return await circuit_breaker.call(lambda: nc.request("service", b"data"))
    except Exception as e:
        # Deliberate best-effort: report the failure and signal it with None.
        print(f"Service call failed: {e}")
        return None


# Error categories
# These two classes are referenced below but never defined or imported
# elsewhere on this page; without this import the list raises NameError.
from nats.errors import InvalidUserCredentialsError, JsonParseError

# Core client exception classes. Unlike the string-based category lists, this
# list holds the classes themselves.
NATS_ERRORS = [
    Error, ConnectionClosedError, TimeoutError, NoRespondersError,
    StaleConnectionError, OutboundBufferLimitError, UnexpectedEOF,
    FlushTimeoutError, SecureConnRequiredError, SecureConnWantedError,
    SecureConnFailedError, AuthorizationError, NoServersError,
    ProtocolError, MaxPayloadError, BadSubscriptionError,
    BadSubjectError, SlowConsumerError, InvalidCallbackTypeError,
    BadTimeoutError, DrainTimeoutError, ConnectionDrainingError,
    ConnectionReconnectingError, InvalidUserCredentialsError,
    JsonParseError,
]
# Names of the JetStream API and consumer exception classes.
JETSTREAM_ERRORS = [
    "APIError",
    "ServiceUnavailableError",
    "ServerError",
    "NotFoundError",
    "BadRequestError",
    "NoStreamResponseError",
    "TooManyStalledMsgsError",
    "FetchTimeoutError",
    "ConsumerSequenceMismatchError",
]
# Names of the key-value store exception classes.
KEYVALUE_ERRORS = [
    "BucketNotFoundError",
    "BadBucketError",
    "KeyValueError",
    "KeyDeletedError",
    "KeyNotFoundError",
    "KeyWrongLastSequenceError",
    "NoKeysError",
    "KeyHistoryTooLargeError",
    "InvalidKeyError",
    "InvalidBucketNameError",
]
# Names of the object store exception classes.
OBJECTSTORE_ERRORS = [
    "InvalidObjectNameError",
    "BadObjectMetaError",
    "LinkIsABucketError",
    "DigestMismatchError",
    "ObjectNotFoundError",
    "ObjectDeletedError",
    "ObjectAlreadyExists",
]

# Install with Tessl CLI
#   npx tessl i tessl/pypi-nats-py