An asyncio Python client for the NATS messaging system with JetStream, microservices, and key-value store support
—
The microservices framework provides service discovery, request routing, monitoring, and health checks, with built-in load balancing, error handling, and comprehensive service information for building distributed microservice architectures.
Create and manage microservices with automatic discovery and monitoring.
async def add_service(
    nc: NATS,
    config: ServiceConfig = None,
    **kwargs
) -> Service:
    """
    Create and start a microservice.

    Parameters:
    - nc: NATS connection
    - config: Complete service configuration
    - **kwargs: Individual configuration parameters

    Returns:
    Running service instance
    """
class Service:
    """Microservice handle: lifecycle control, endpoint registration, stats reset."""

    async def start(self) -> None:
        """Start the service and begin handling requests."""

    async def stop(self) -> None:
        """Stop the service gracefully."""

    async def add_endpoint(self, config: EndpointConfig) -> Endpoint:
        """
        Add new endpoint to service.

        Parameters:
        - config: Endpoint configuration

        Returns:
        Endpoint instance
        """

    async def reset(self) -> None:
        """Reset service statistics."""


import asyncio
import nats
from nats.micro import add_service, ServiceConfig, EndpointConfig
async def main():
nc = await nats.connect()
# Create service configuration
config = ServiceConfig(
name="user-service",
version="1.2.0",
description="User management service",
metadata={
"team": "backend",
"repository": "https://github.com/company/user-service"
}
)
# Create and start service
service = await add_service(nc, config=config)
await service.start()
# Add endpoints dynamically
await service.add_endpoint(EndpointConfig(
name="create-user",
subject="users.create",
handler=create_user_handler
))
print(f"Service {config.name} started")
# Keep service running
try:
await asyncio.sleep(3600) # Run for 1 hour
finally:
await service.stop()

Handle incoming service requests with automatic routing and error handling.
class Request:
    """Service request wrapper."""

    # Subject the request was received on
    subject: str
    # Headers sent with the request
    headers: Dict[str, str]
    # Raw request payload
    data: bytes

    async def respond(
        self,
        data: bytes = b"",
        headers: Dict[str, str] = None
    ) -> None:
        """
        Send successful response.

        Parameters:
        - data: Response data
        - headers: Response headers
        """

    async def respond_error(
        self,
        code: str,
        description: str,
        data: bytes = b"",
        headers: Dict[str, str] = None
    ) -> None:
        """
        Send error response.

        Parameters:
        - code: Error code
        - description: Error description
        - data: Error details
        - headers: Response headers
        """
# Handler type
Handler = Callable[[Request], Awaitable[None]]
class ServiceError(Exception):
    """Service error with code and description."""

    def __init__(self, code: str, description: str):
        """Store code and description; the exception message is "code: description"."""
        self.code = code
        self.description = description
        super().__init__(f"{code}: {description}")


import json
from nats.micro import Request, ServiceError
async def create_user_handler(request: Request):
    """Create a user from a JSON request body and reply with the new record.

    Failures are reported to the caller as error responses, never raised:
    - INVALID_INPUT: the body is not valid JSON, is not a JSON object, or
      has no "email" value
    - VALIDATION_ERROR: a ValidationError was raised while creating the user
    - INTERNAL_ERROR: any other exception
    """
    # Parse request data. A malformed body is the caller's mistake, so report
    # it as INVALID_INPUT instead of letting it surface as INTERNAL_ERROR.
    try:
        user_data = json.loads(request.data.decode())
    except ValueError:  # UnicodeDecodeError and json.JSONDecodeError
        await request.respond_error(
            code="INVALID_INPUT",
            description="Request body must be valid JSON"
        )
        return

    # Validate input (a JSON array or scalar has no "email" field either)
    if not isinstance(user_data, dict) or not user_data.get("email"):
        await request.respond_error(
            code="INVALID_INPUT",
            description="Email is required",
            data=b'{"field": "email"}'
        )
        return

    try:
        # Create user (business logic)
        user = await create_user(user_data)

        # Send success response
        response_data = json.dumps({
            "id": user.id,
            "email": user.email,
            "created_at": user.created_at.isoformat()
        }).encode()
        await request.respond(
            data=response_data,
            headers={"Content-Type": "application/json"}
        )
    except ValidationError as e:
        await request.respond_error(
            code="VALIDATION_ERROR",
            description=str(e)
        )
    except Exception:
        # Handler boundary: never leak internal details to the caller.
        await request.respond_error(
            code="INTERNAL_ERROR",
            description="Internal server error"
        )
async def get_user_handler(request: Request):
# Extract user ID from subject or headers
user_id = request.headers.get("User-ID")
if not user_id:
await request.respond_error(
code="MISSING_USER_ID",
description="User-ID header required"
)
return
try:
user = await get_user_by_id(user_id)
response_data = json.dumps(user.to_dict()).encode()
await request.respond(data=response_data)
except UserNotFound:
await request.respond_error(
code="USER_NOT_FOUND",
description=f"User {user_id} not found"
)

Configure service endpoints with subjects, handlers, and metadata.
@dataclass
class EndpointConfig:
    """Endpoint configuration."""

    # Endpoint name
    name: str
    # Subject the endpoint is bound to
    subject: str
    # Coroutine invoked for each request
    handler: Handler
    # Optional free-form metadata; None when not provided
    metadata: Dict[str, str] = None
    # Queue group name; defaults to "q"
    queue_group: str = "q"
@dataclass
class GroupConfig:
    """Endpoint group configuration."""

    # Group name
    name: str
    # Queue group name; defaults to "q"
    queue_group: str = "q"
class Group:
    """Endpoint group for organizing related endpoints."""

    async def add_endpoint(self, config: EndpointConfig) -> Endpoint:
        """Add endpoint to group."""
class Endpoint:
    """Individual service endpoint."""
    pass


# Create endpoints with different patterns
endpoints = [
EndpointConfig(
name="create-user",
subject="users.create",
handler=create_user_handler,
metadata={"description": "Create new user account"}
),
EndpointConfig(
name="get-user",
subject="users.get",
handler=get_user_handler,
metadata={"description": "Retrieve user by ID"}
),
EndpointConfig(
name="update-user",
subject="users.update",
handler=update_user_handler,
metadata={"description": "Update user information"}
),
EndpointConfig(
name="delete-user",
subject="users.delete",
handler=delete_user_handler,
metadata={"description": "Delete user account"}
)
]
# Add all endpoints to service
for endpoint_config in endpoints:
await service.add_endpoint(endpoint_config)
# Group related endpoints
group_config = GroupConfig(
name="user-management",
queue_group="user-workers"
)
group = await service.add_group(group_config)
await group.add_endpoint(EndpointConfig(
name="bulk-import",
subject="users.bulk.import",
handler=bulk_import_handler
))

Built-in service discovery with health checks and statistics.
@dataclass
class ServiceInfo:
    """Service information for discovery."""

    name: str
    id: str
    version: str
    description: str
    # One entry per endpoint
    endpoints: List[EndpointInfo]
    metadata: Dict[str, str]
@dataclass
class ServiceStats:
    """Service statistics."""

    name: str
    id: str
    started: datetime
    # One entry per endpoint
    endpoints: List[EndpointStats]
@dataclass
class ServicePing:
    """Service ping response."""

    name: str
    id: str
    version: str
    metadata: Dict[str, str]
@dataclass
class EndpointInfo:
    """Endpoint information."""

    name: str
    subject: str
    metadata: Dict[str, str]
@dataclass
class EndpointStats:
    """Endpoint statistics."""

    name: str
    subject: str
    num_requests: int
    num_errors: int
    last_error: str
    # NOTE(review): time unit is not stated here; confirm against the library
    processing_time: float
    average_processing_time: float


# Service automatically responds to discovery requests
# These are handled internally by the framework
# Get service information (as a client)
service_info = await nc.request("$SRV.INFO", timeout=5.0)
info = json.loads(service_info.data.decode())
print(f"Available services: {[s['name'] for s in info]}")
# Ping specific service
ping_response = await nc.request("$SRV.PING.user-service", timeout=2.0)
ping = json.loads(ping_response.data.decode())
print(f"Service {ping['name']} version {ping['version']} is alive")
# Get service statistics
stats_response = await nc.request("$SRV.STATS.user-service", timeout=2.0)
stats = json.loads(stats_response.data.decode())
print(f"Total requests: {sum(e['num_requests'] for e in stats['endpoints'])}")

Automatic load balancing across multiple service instances.
# Constants for service framework
DEFAULT_QUEUE_GROUP = "q"
DEFAULT_PREFIX = "$SRV"
# Service control verbs
class ServiceVerb:
    """Names of the service control verbs."""

    PING = "PING"
    STATS = "STATS"
    INFO = "INFO"


# Multiple service instances automatically load balance
async def start_service_instance(instance_id: str):
    """Start one "user-service" instance tagged with instance_id and return it.

    Uses the `nc` connection from the enclosing scope. Every instance
    registers the same "users.create" endpoint.
    """
    config = ServiceConfig(
        name="user-service",
        version="1.2.0",
        metadata={"instance_id": instance_id}
    )
    service = await add_service(nc, config=config)
    await service.start()

    # Add same endpoints - they'll automatically load balance
    await service.add_endpoint(EndpointConfig(
        name="create-user",
        subject="users.create",
        handler=create_user_handler
    ))
    return service
# Start multiple instances
instances = await asyncio.gather(*[
start_service_instance(f"instance-{i}")
for i in range(3)
])
print("Started 3 service instances with automatic load balancing")

@dataclass
class ServiceConfig:
"""Complete service configuration."""
name: str
version: str = "0.0.1"
description: str = ""
metadata: Dict[str, str] = None
queue_group: str = DEFAULT_QUEUE_GROUP
# Control subjects configuration
stats_handler: Handler = None
info_handler: Handler = None
ping_handler: Handler = None

Override default service control behavior.
async def custom_stats_handler(request: Request):
    """Reply to a stats request with a custom JSON document.

    Uptime is computed from `start_time`, which must be defined in the
    enclosing scope.
    """
    # Custom statistics response
    stats = {
        "service": "user-service",
        "uptime": time.time() - start_time,
        "custom_metrics": {
            "cache_hit_rate": 0.95,
            "db_connections": 10
        }
    }
    await request.respond(json.dumps(stats).encode())
# Use custom handler
config = ServiceConfig(
name="user-service",
version="1.2.0",
stats_handler=custom_stats_handler
)
service = await add_service(nc, config=config)

Consistent error handling across endpoints.
# Common error codes
class ErrorCodes:
    """Common error codes passed to Request.respond_error."""

    INVALID_INPUT = "INVALID_INPUT"
    NOT_FOUND = "NOT_FOUND"
    UNAUTHORIZED = "UNAUTHORIZED"
    INTERNAL_ERROR = "INTERNAL_ERROR"
    RATE_LIMITED = "RATE_LIMITED"
# Error handling middleware
def with_error_handling(handler: Handler) -> Handler:
    """Wrap handler so that exceptions become service error responses.

    This is a plain (non-async) function on purpose: it is called without
    `await` when building an EndpointConfig, so it must return the wrapped
    handler itself, not a coroutine object.

    Parameters:
    - handler: Request handler to protect

    Returns:
    Handler that awaits `handler` and maps ValidationError to INVALID_INPUT,
    NotFoundError to NOT_FOUND, and any other exception to INTERNAL_ERROR
    """
    from functools import wraps

    @wraps(handler)  # keep the wrapped handler's name and docstring
    async def wrapper(request: Request):
        try:
            await handler(request)
        except ValidationError as e:
            await request.respond_error(
                ErrorCodes.INVALID_INPUT,
                str(e)
            )
        except NotFoundError as e:
            await request.respond_error(
                ErrorCodes.NOT_FOUND,
                str(e)
            )
        except Exception as e:
            # Log the detail locally; the caller only gets a generic message.
            logger.error(f"Unexpected error: {e}")
            await request.respond_error(
                ErrorCodes.INTERNAL_ERROR,
                "Internal server error"
            )
    return wrapper
# Apply to endpoints
await service.add_endpoint(EndpointConfig(
name="create-user",
subject="users.create",
handler=with_error_handling(create_user_handler)
))

# Default configuration
DEFAULT_QUEUE_GROUP = "q"
DEFAULT_PREFIX = "$SRV"
# Response types
INFO_RESPONSE_TYPE = "io.nats.micro.v1.info_response"
PING_RESPONSE_TYPE = "io.nats.micro.v1.ping_response"
STATS_RESPONSE_TYPE = "io.nats.micro.v1.stats_response"
# Headers
ERROR_HEADER = "Nats-Service-Error"
ERROR_CODE_HEADER = "Nats-Service-Error-Code"

Install with Tessl CLI:
npx tessl i tessl/pypi-nats-py