CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-nats-py

An asyncio Python client for the NATS messaging system with JetStream, microservices, and key-value store support

Pending
Overview
Eval results
Files

docs/microservices.md

Microservices Framework

Service discovery, request routing, monitoring, and health checks with built-in load balancing, error handling, and comprehensive service information for building distributed microservice architectures.

Capabilities

Service Creation and Management

Create and manage microservices with automatic discovery and monitoring.

async def add_service(
    nc: NATS,
    config: ServiceConfig = None,
    **kwargs
) -> Service:
    """
    Create and start a microservice.

    Parameters:
    - nc: NATS connection the service is created on
    - config: Complete service configuration (defaults to None)
    - **kwargs: Individual configuration parameters
      NOTE(review): presumably ServiceConfig field names; how they combine
      with `config` is not shown here — confirm against the implementation.

    Returns:
    Running service instance. Because the returned service is already
    running, callers should not need a separate `Service.start()` call.
    """

class Service:
    """
    Handle to a microservice, as returned by `add_service`.

    NOTE(review): a usage example elsewhere on this page calls
    `service.add_group(...)`, which is not declared in this stub — confirm
    its signature (and whether it is awaitable) against the nats.micro API.
    """
    async def start(self) -> None:
        """Start the service and begin handling requests."""
    
    async def stop(self) -> None:
        """Stop the service gracefully."""
    
    async def add_endpoint(self, config: EndpointConfig) -> Endpoint:
        """
        Add new endpoint to service.
        
        Parameters:
        - config: Endpoint configuration (name, subject, handler, ...)
        
        Returns:
        Endpoint instance
        """
    
    async def reset(self) -> None:
        """Reset service statistics."""

Usage Examples

import asyncio
import nats
from nats.micro import add_service, ServiceConfig, EndpointConfig

async def main():
    """
    Run the example user service for one hour, then shut it down.

    Connects to NATS, registers the service with one endpoint, and always
    stops the service and closes the connection on the way out.
    """
    nc = await nats.connect()

    try:
        # Create service configuration
        config = ServiceConfig(
            name="user-service",
            version="1.2.0",
            description="User management service",
            metadata={
                "team": "backend",
                "repository": "https://github.com/company/user-service"
            }
        )

        # add_service() creates AND starts the service, so no explicit
        # service.start() call is needed afterwards.
        service = await add_service(nc, config=config)

        try:
            # Add endpoints dynamically
            await service.add_endpoint(EndpointConfig(
                name="create-user",
                subject="users.create",
                handler=create_user_handler
            ))

            print(f"Service {config.name} started")

            # Keep service running
            await asyncio.sleep(3600)  # Run for 1 hour
        finally:
            # Stop the service even if endpoint registration failed.
            await service.stop()
    finally:
        # Release the connection regardless of how the service exited.
        await nc.close()

Request Handling

Handle incoming service requests with automatic routing and error handling.

class Request:
    """
    Service request wrapper.

    Passed to every endpoint handler; a handler answers it with either
    `respond` (success) or `respond_error` (failure).
    """
    # Subject the request was received on.
    subject: str
    # Headers sent with the request.
    headers: Dict[str, str]
    # Raw request payload.
    data: bytes
    
    async def respond(
        self,
        data: bytes = b"",
        headers: Dict[str, str] = None
    ) -> None:
        """
        Send successful response.
        
        Parameters:
        - data: Response data (defaults to an empty payload)
        - headers: Response headers (defaults to None)
        """
    
    async def respond_error(
        self,
        code: str,
        description: str,
        data: bytes = b"",
        headers: Dict[str, str] = None
    ) -> None:
        """
        Send error response.
        
        Parameters:
        - code: Error code
        - description: Error description
        - data: Error details (defaults to an empty payload)
        - headers: Response headers (defaults to None)
        """

# Handler type: an async callable that takes the incoming Request and
# returns nothing; replies are sent through the Request itself.
Handler = Callable[[Request], Awaitable[None]]

class ServiceError(Exception):
    """Exception carrying a service error code and its description.

    The exception message is rendered as ``"<code>: <description>"``; both
    parts are also available as the ``code`` and ``description`` attributes.
    """

    def __init__(self, code: str, description: str):
        message = f"{code}: {description}"
        super().__init__(message)
        self.code = code
        self.description = description

Usage Examples

import json
from nats.micro import Request, ServiceError

async def create_user_handler(request: Request):
    """
    Create a user from a JSON request body and reply with the new record.

    Parameters:
    - request: Incoming service request; `request.data` must be a UTF-8
      encoded JSON object containing at least an "email" field.

    Responses:
    - success: JSON with "id", "email" and "created_at"
    - INVALID_INPUT: body is not valid JSON, is not an object, or lacks "email"
    - VALIDATION_ERROR: the business logic rejected the data
    - INTERNAL_ERROR: any other failure while creating the user
    """
    # Parse request data. A malformed payload is a client error, so it is
    # reported as INVALID_INPUT rather than falling through to INTERNAL_ERROR.
    try:
        user_data = json.loads(request.data.decode())
    except ValueError:
        # UnicodeDecodeError and json.JSONDecodeError both subclass ValueError.
        await request.respond_error(
            code="INVALID_INPUT",
            description="Request body must be UTF-8 encoded JSON"
        )
        return

    # Validate input. Valid JSON that is not an object (e.g. a list or a
    # number) has no "email" field either.
    if not isinstance(user_data, dict) or not user_data.get("email"):
        await request.respond_error(
            code="INVALID_INPUT",
            description="Email is required",
            data=b'{"field": "email"}'
        )
        return

    try:
        # Create user (business logic)
        user = await create_user(user_data)

        # Send success response
        response_data = json.dumps({
            "id": user.id,
            "email": user.email,
            "created_at": user.created_at.isoformat()
        }).encode()

        await request.respond(
            data=response_data,
            headers={"Content-Type": "application/json"}
        )

    except ValidationError as e:
        await request.respond_error(
            code="VALIDATION_ERROR",
            description=str(e)
        )
    except Exception:
        # Handler boundary: never leak internal details to the caller.
        await request.respond_error(
            code="INTERNAL_ERROR",
            description="Internal server error"
        )

async def get_user_handler(request: Request):
    """
    Look up the user named by the ``User-ID`` header and reply with it as JSON.

    Replies with MISSING_USER_ID when the header is absent or empty, and
    with USER_NOT_FOUND when the lookup raises UserNotFound.
    """
    requested_id = request.headers.get("User-ID")

    if requested_id:
        try:
            found = await get_user_by_id(requested_id)
            payload = json.dumps(found.to_dict()).encode()
            await request.respond(data=payload)
        except UserNotFound:
            await request.respond_error(
                code="USER_NOT_FOUND",
                description=f"User {requested_id} not found"
            )
        return

    # No usable identifier was supplied with the request.
    await request.respond_error(
        code="MISSING_USER_ID",
        description="User-ID header required"
    )

Endpoint Configuration

Configure service endpoints with subjects, handlers, and metadata.

@dataclass
class EndpointConfig:
    """Endpoint configuration: which subject an endpoint serves and with which handler."""
    # Endpoint name (e.g. "create-user" in the examples).
    name: str
    # NATS subject the endpoint serves (e.g. "users.create").
    subject: str
    # Async callable invoked with each incoming Request.
    handler: Handler
    # Optional string-to-string metadata; None when omitted.
    metadata: Dict[str, str] = None
    # Queue group name; defaults to "q".
    queue_group: str = "q"

@dataclass
class GroupConfig:
    """Endpoint group configuration."""
    # Group name (e.g. "user-management" in the examples).
    name: str
    # Queue group name; defaults to "q".
    queue_group: str = "q"

class Group:
    """Endpoint group for organizing related endpoints."""
    async def add_endpoint(self, config: EndpointConfig) -> Endpoint:
        """
        Add endpoint to group.

        Parameters:
        - config: Endpoint configuration

        Returns:
        Endpoint instance
        """

class Endpoint:
    """Individual service endpoint, as returned by `add_endpoint`."""
    # No public members are documented for this type on this page.
    pass

Usage Examples

# Create endpoints with different patterns.
# Each entry pairs a request subject with the handler that serves it.
endpoints = [
    EndpointConfig(
        name="create-user",
        subject="users.create",
        handler=create_user_handler,
        metadata={"description": "Create new user account"}
    ),
    EndpointConfig(
        name="get-user",
        subject="users.get",
        handler=get_user_handler,
        metadata={"description": "Retrieve user by ID"}
    ),
    EndpointConfig(
        name="update-user",
        subject="users.update",
        handler=update_user_handler,
        metadata={"description": "Update user information"}
    ),
    EndpointConfig(
        name="delete-user",
        subject="users.delete",
        handler=delete_user_handler,
        metadata={"description": "Delete user account"}
    )
]

# Add all endpoints to service (registered one at a time, in list order)
for endpoint_config in endpoints:
    await service.add_endpoint(endpoint_config)

# Group related endpoints
# NOTE(review): add_group() is not among the Service methods documented on
# this page — confirm its signature and whether it must be awaited.
group_config = GroupConfig(
    name="user-management",
    queue_group="user-workers"
)

group = await service.add_group(group_config)
await group.add_endpoint(EndpointConfig(
    name="bulk-import",
    subject="users.bulk.import",
    handler=bulk_import_handler
))

Service Discovery and Monitoring

Built-in service discovery with health checks and statistics.

@dataclass
class ServiceInfo:
    """Service information for discovery."""
    # Service name.
    name: str
    # Identifier distinguishing this service instance.
    id: str
    # Service version string.
    version: str
    # Human-readable description.
    description: str
    # One entry per registered endpoint. Quoted as a forward reference
    # because EndpointInfo is declared after this class.
    endpoints: List["EndpointInfo"]
    # String-to-string service metadata.
    metadata: Dict[str, str]

@dataclass
class ServiceStats:
    """Service statistics."""
    # Service name.
    name: str
    # Identifier distinguishing this service instance.
    id: str
    # When the service was started.
    started: datetime
    # Per-endpoint statistics. Quoted as a forward reference because
    # EndpointStats is declared after this class.
    endpoints: List["EndpointStats"]

@dataclass
class ServicePing:
    """Service ping response."""
    # Service name.
    name: str
    # Identifier distinguishing this service instance.
    id: str
    # Service version string.
    version: str
    # String-to-string service metadata.
    metadata: Dict[str, str]

@dataclass
class EndpointInfo:
    """Endpoint information, listed in ServiceInfo.endpoints."""
    # Endpoint name.
    name: str
    # Subject the endpoint serves.
    subject: str
    # String-to-string endpoint metadata.
    metadata: Dict[str, str]

@dataclass
class EndpointStats:
    """Endpoint statistics, listed in ServiceStats.endpoints."""
    # Endpoint name.
    name: str
    # Subject the endpoint serves.
    subject: str
    # Count of requests handled.
    num_requests: int
    # Count of requests that ended in an error.
    num_errors: int
    # Most recent error, as text.
    last_error: str
    # NOTE(review): time unit is not stated on this page — confirm.
    processing_time: float
    # NOTE(review): presumably processing_time / num_requests — confirm.
    average_processing_time: float

Usage Examples

# Service automatically responds to discovery requests
# These are handled internally by the framework

# Get service information (as a client)
service_info = await nc.request("$SRV.INFO", timeout=5.0)
info = json.loads(service_info.data.decode())
print(f"Available services: {[s['name'] for s in info]}")

# Ping specific service
ping_response = await nc.request("$SRV.PING.user-service", timeout=2.0)
ping = json.loads(ping_response.data.decode())
print(f"Service {ping['name']} version {ping['version']} is alive")

# Get service statistics
stats_response = await nc.request("$SRV.STATS.user-service", timeout=2.0)
stats = json.loads(stats_response.data.decode())
print(f"Total requests: {sum(e['num_requests'] for e in stats['endpoints'])}")

Load Balancing and Scaling

Automatic load balancing across multiple service instances.

# Constants for service framework
# Queue group name used when a config does not set one.
DEFAULT_QUEUE_GROUP = "q"
# Subject prefix of the control subjects (e.g. "$SRV.PING", "$SRV.INFO").
DEFAULT_PREFIX = "$SRV"

# Service control verbs
class ServiceVerb:
    # Each verb follows the "$SRV" prefix in a control subject, as in the
    # "$SRV.PING.user-service" request shown in the discovery example.
    PING = "PING"
    STATS = "STATS"
    INFO = "INFO"

Usage Examples

# Multiple service instances automatically load balance
async def start_service_instance(instance_id: str):
    """
    Start one "user-service" instance tagged with `instance_id`.

    Parameters:
    - instance_id: Label stored in the service metadata to tell instances apart

    Returns:
    The running service, with its "create-user" endpoint registered.

    Uses the NATS connection `nc` from the enclosing scope.
    """
    config = ServiceConfig(
        name="user-service",
        version="1.2.0",
        metadata={"instance_id": instance_id}
    )

    # add_service() creates AND starts the service, so no explicit
    # service.start() call is needed afterwards.
    service = await add_service(nc, config=config)

    # Add same endpoints - they'll automatically load balance
    await service.add_endpoint(EndpointConfig(
        name="create-user",
        subject="users.create",
        handler=create_user_handler
    ))

    return service

# Start multiple instances
# asyncio.gather runs the three start-up coroutines concurrently and returns
# their results in the order the coroutines were passed in.
instances = await asyncio.gather(*[
    start_service_instance(f"instance-{i}") 
    for i in range(3)
])

print("Started 3 service instances with automatic load balancing")

Service Configuration

@dataclass
class ServiceConfig:
    """Complete service configuration, passed to `add_service`."""
    # Service name; the only required field.
    name: str
    # Service version string; defaults to "0.0.1".
    version: str = "0.0.1"
    # Human-readable description; defaults to empty.
    description: str = ""
    # Optional string-to-string metadata; None when omitted.
    metadata: Dict[str, str] = None
    # Queue group name; defaults to DEFAULT_QUEUE_GROUP.
    queue_group: str = DEFAULT_QUEUE_GROUP
    
    # Control subjects configuration
    # Optional overrides for the built-in STATS / INFO / PING responders;
    # None keeps the default behavior.
    stats_handler: Handler = None
    info_handler: Handler = None
    ping_handler: Handler = None

Advanced Features

Custom Control Handlers

Override default service control behavior.

async def custom_stats_handler(request: Request):
    """Reply to a statistics request with a hand-built JSON document.

    The document reports the service name, the time elapsed since the
    module-level `start_time`, and a block of custom metrics.
    """
    elapsed = time.time() - start_time
    metrics = {
        "cache_hit_rate": 0.95,
        "db_connections": 10
    }
    document = json.dumps({
        "service": "user-service",
        "uptime": elapsed,
        "custom_metrics": metrics
    })
    await request.respond(document.encode())

# Use custom handler: passing it as stats_handler replaces the default
# statistics responder for this service.
config = ServiceConfig(
    name="user-service",
    version="1.2.0",
    stats_handler=custom_stats_handler
)

service = await add_service(nc, config=config)

Error Handling Patterns

Consistent error handling across endpoints.

# Common error codes
class ErrorCodes:
    # String constants intended as the `code` argument of
    # Request.respond_error, so endpoints report failures consistently.
    INVALID_INPUT = "INVALID_INPUT"
    NOT_FOUND = "NOT_FOUND"
    UNAUTHORIZED = "UNAUTHORIZED"
    INTERNAL_ERROR = "INTERNAL_ERROR"
    RATE_LIMITED = "RATE_LIMITED"

# Error handling middleware
def with_error_handling(handler: Handler) -> Handler:
    """
    Wrap `handler` so that exceptions become error responses.

    This is a plain (non-async) function on purpose: it is called without
    `await` when building an EndpointConfig and must return the wrapped
    handler itself, not a coroutine object.

    Parameters:
    - handler: The endpoint handler to protect

    Returns:
    A handler that awaits `handler` and replies with INVALID_INPUT on
    ValidationError, NOT_FOUND on NotFoundError, and INTERNAL_ERROR (after
    logging) on any other exception.
    """
    async def wrapper(request: Request):
        try:
            await handler(request)
        except ValidationError as e:
            await request.respond_error(
                ErrorCodes.INVALID_INPUT,
                str(e)
            )
        except NotFoundError as e:
            await request.respond_error(
                ErrorCodes.NOT_FOUND,
                str(e)
            )
        except Exception as e:
            # Handler boundary: log with traceback, but never leak internal
            # details to the caller.
            logger.exception("Unexpected error: %s", e)
            await request.respond_error(
                ErrorCodes.INTERNAL_ERROR,
                "Internal server error"
            )
    return wrapper

# Apply to endpoints: register the wrapped handler instead of the raw one
await service.add_endpoint(EndpointConfig(
    name="create-user",
    subject="users.create",
    handler=with_error_handling(create_user_handler)
))

Constants

# Default configuration
# Queue group name used when a config does not set one.
DEFAULT_QUEUE_GROUP = "q"
# Subject prefix of the control subjects (e.g. "$SRV.PING", "$SRV.INFO").
DEFAULT_PREFIX = "$SRV"

# Response types
# NOTE(review): presumably the type identifier carried in each control
# response payload — confirm against the implementation.
INFO_RESPONSE_TYPE = "io.nats.micro.v1.info_response"
PING_RESPONSE_TYPE = "io.nats.micro.v1.ping_response"
STATS_RESPONSE_TYPE = "io.nats.micro.v1.stats_response"

# Headers
# NOTE(review): presumably set on replies sent via Request.respond_error
# (description and code respectively) — confirm against the implementation.
ERROR_HEADER = "Nats-Service-Error"
ERROR_CODE_HEADER = "Nats-Service-Error-Code"

Install with Tessl CLI

npx tessl i tessl/pypi-nats-py

docs

core-client.md

error-handling.md

index.md

jetstream-management.md

jetstream.md

key-value-store.md

message-handling.md

microservices.md

object-store.md

tile.json