CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-minos-microservice-networks

The networks core of the Minos Framework providing networking components for reactive microservices

Pending
Overview
Eval results
Files

docs/discovery.md

Service Discovery

Service registration and discovery mechanisms for microservice coordination. Supports multiple discovery backends and automatic service lifecycle management.

Capabilities

Discovery Client Interface

Abstract client interface for service discovery operations.

class DiscoveryClient:
    """Abstract client interface for service discovery operations.

    Concrete clients implement ``subscribe`` and ``unsubscribe``; both are
    marked abstract in this interface summary.
    """
    # Takes the host and port used to build ``route``.
    def __init__(self, host: str, port: int): ...
    # Base URL of the form http://host:port; the Consul example on this page
    # prefixes its request paths with it.
    route: str  # http://host:port
    # Configuration-based constructor hook; extra keyword arguments are accepted.
    @classmethod
    def _from_config(cls, config: Config, **kwargs) -> DiscoveryClient: ...
    # Registers the service ``name`` with its ``host``, ``port`` and endpoint
    # descriptions (dicts such as {"path": ..., "method": ...} in the examples).
    async def subscribe(self, host: str, port: int, name: str, endpoints: list[dict[str, str]], *args, **kwargs) -> None: ...  # Abstract
    # Unregisters the service previously registered under ``name``.
    async def unsubscribe(self, name: str, *args, **kwargs) -> None: ...  # Abstract

class InMemoryDiscoveryClient(DiscoveryClient):
    """Concrete ``DiscoveryClient`` used by the examples on this page.

    NOTE(review): presumably keeps the subscription state in memory only and
    contacts no external service -- confirm against the implementation.
    """
    # Subscription flag; the usage example prints it right after ``subscribe``.
    is_subscribed: bool
    # Accepts the same arguments as ``DiscoveryClient.subscribe``.
    async def subscribe(self, *args, **kwargs) -> None: ...
    # Accepts the same arguments as ``DiscoveryClient.unsubscribe``.
    async def unsubscribe(self, *args, **kwargs) -> None: ...

Usage Examples:

from minos.networks import InMemoryDiscoveryClient

# Create discovery client
client = InMemoryDiscoveryClient(host="localhost", port=8500)

# Register service
await client.subscribe(
    host="localhost",
    port=8080,
    name="user-service",
    endpoints=[
        {"path": "/users", "method": "GET"},
        {"path": "/users", "method": "POST"}
    ]
)

print(f"Service registered: {client.is_subscribed}")

# Unregister service
await client.unsubscribe("user-service")

Discovery Connector

Manages service registration with discovery service and handles lifecycle.

class DiscoveryConnector:
    """Manages service registration with the discovery service and handles its lifecycle.

    Wraps a ``DiscoveryClient`` together with the service's own name,
    endpoints, host and (optional) port.
    """
    # ``client`` performs the registration; the remaining arguments describe
    # the service being registered.
    def __init__(self, client: DiscoveryClient, name: str, endpoints: list[dict[str, Any]], host: str, port: Optional[int] = None): ...
    # Builds a connector from a ``Config`` (see "Using configuration" in the
    # usage example).
    @classmethod
    def _from_config(cls, config: Config, **kwargs) -> DiscoveryConnector: ...
    # Returns the discovery client instance described by ``config``.
    @classmethod
    def _client_from_config(cls, config: Config) -> DiscoveryClient: ...
    # Returns the ``DiscoveryClient`` class to use, given the discovery
    # section of the configuration as a dict.
    @classmethod
    def _client_cls_from_config(cls, discovery_config: dict[str, Any]) -> type[DiscoveryClient]: ...
    # Returns the port taken from ``config``; may be ``None``.
    @classmethod
    def _port_from_config(cls, config: Config) -> Optional[int]: ...
    # Returns the endpoint descriptions taken from ``config``.
    @classmethod
    def _endpoints_from_config(cls, config: Config) -> list[dict[str, Any]]: ...
    # Registers the service; takes no arguments because everything was given
    # at construction.
    async def subscribe(self) -> None: ...
    # Unregisters the service, e.g. when shutting down.
    async def unsubscribe(self) -> None: ...

Usage Examples:

from minos.networks import DiscoveryConnector, InMemoryDiscoveryClient
from minos.common import Config

# Create client
client = InMemoryDiscoveryClient(host="consul", port=8500)

# Create connector
connector = DiscoveryConnector(
    client=client,
    name="payment-service",
    endpoints=[
        {"path": "/payments", "method": "POST"},
        {"path": "/payments/{id}", "method": "GET"}
    ],
    host="localhost",
    port=8080
)

# Register service
await connector.subscribe()

# Service is now discoverable
# Unregister when shutting down
await connector.unsubscribe()

# Using configuration
config = Config("config.yml")
connector = DiscoveryConnector._from_config(config)

Advanced Usage

Complete Service Discovery Setup

from minos.networks import DiscoveryConnector, HttpPort, Response, enroute
from minos.common import Config

class PaymentService:
    """Example service with two REST handlers and one broker event handler.

    Every handler answers with a fixed payload; the class only illustrates
    which routes end up being exposed.
    """

    @enroute.rest.command("/payments", method="POST")
    async def create_payment(self, request) -> Response:
        """Answer ``POST /payments`` with a fixed "created" payload."""
        created = {"payment_id": "123", "status": "created"}
        return Response(created)

    @enroute.rest.query("/payments/{payment_id}", method="GET")
    async def get_payment(self, request) -> Response:
        """Answer ``GET /payments/{payment_id}`` with a fixed payment."""
        payment = {"payment_id": "123", "amount": 100.0}
        return Response(payment)

    @enroute.broker.event("payment.processed")
    async def handle_payment_processed(self, request) -> Response:
        """React to the ``payment.processed`` broker event."""
        acknowledgement = {"processed": True}
        return Response(acknowledgement)

# Configuration-based setup
config = Config({
    "service": {
        "name": "payment-service",
        "host": "0.0.0.0",
        "port": 8080
    },
    "discovery": {
        "client": "InMemoryDiscoveryClient",
        "host": "consul",
        "port": 8500
    }
})

# Create HTTP service
http_port = HttpPort._from_config(config)

# Create discovery connector
discovery = DiscoveryConnector._from_config(config)

# Start services
await http_port.start()
await discovery.subscribe()

print(f"Payment service running on {config.service.host}:{config.service.port}")
print("Service registered with discovery")

# Service runs and handles requests...

# Shutdown
await discovery.unsubscribe()
await http_port.stop()

Custom Discovery Client Implementation

import aiohttp
from minos.networks import DiscoveryClient

class ConsulDiscoveryClient(DiscoveryClient):
    """Consul-based discovery client implementation.

    Registers and deregisters services through the Consul agent HTTP API
    reachable at ``self.route``. The service ID sent to Consul on registration
    is remembered per service name, so ``unsubscribe`` deregisters exactly the
    ID that ``subscribe`` created.
    """

    def __init__(self, *args, **kwargs):
        """Accept the same arguments as ``DiscoveryClient`` and set up ID tracking."""
        super().__init__(*args, **kwargs)
        # Service name -> Consul service ID it was registered under.
        self._service_ids: dict[str, str] = {}

    async def subscribe(self, host: str, port: int, name: str, endpoints: list[dict], *args, **kwargs) -> None:
        """Register a service with the Consul agent.

        :param host: Address at which the service itself is reachable.
        :param port: Port at which the service itself is reachable.
        :param name: Service name.
        :param endpoints: Endpoint descriptions; each ``"path"`` becomes a tag.
        :raises Exception: If Consul answers with a status other than 200.
        """
        # The ID is built from the *service* address, not the Consul address.
        service_id = f"{name}-{host}-{port}"
        service_definition = {
            "ID": service_id,
            "Name": name,
            "Address": host,
            "Port": port,
            "Tags": [endpoint["path"] for endpoint in endpoints],
            "Check": {
                "HTTP": f"http://{host}:{port}/health",
                "Interval": "10s"
            }
        }

        async with aiohttp.ClientSession() as session:
            async with session.put(
                f"{self.route}/v1/agent/service/register",
                json=service_definition
            ) as response:
                if response.status != 200:
                    raise Exception(f"Failed to register service: {response.status}")

        # Only remember the ID once Consul has accepted the registration.
        self._service_ids[name] = service_id

    async def unsubscribe(self, name: str, *args, **kwargs) -> None:
        """Deregister the service previously registered under ``name``.

        :param name: Service name given to ``subscribe``.
        :raises Exception: If ``name`` was not registered through this client,
            or if Consul answers with a status other than 200.
        """
        # ``self.host`` / ``self.port`` describe the Consul agent, so the ID
        # cannot be rebuilt from them; use the one recorded by ``subscribe``.
        service_id = self._service_ids.get(name)
        if service_id is None:
            raise Exception(f"Failed to unregister service: {name!r} is not registered through this client")

        async with aiohttp.ClientSession() as session:
            async with session.put(
                f"{self.route}/v1/agent/service/deregister/{service_id}"
            ) as response:
                if response.status != 200:
                    raise Exception(f"Failed to unregister service: {response.status}")

        # Forget the ID only after Consul confirmed the deregistration.
        del self._service_ids[name]

# Usage: plug the custom Consul client into a connector
consul_client = ConsulDiscoveryClient(host="consul", port=8500)

user_service_endpoints = [{"path": "/users", "method": "GET"}]
connector = DiscoveryConnector(
    client=consul_client,
    name="user-service",
    endpoints=user_service_endpoints,
    host="localhost",
    port=8080,
)

Service Health Checks

from minos.networks import enroute, SystemService

class HealthCheckService:
    """Exposes a ``/health`` endpoint that aggregates dependency checks.

    Overall status rules:

    * ``"unhealthy"`` -- at least one dependency check raised or reported
      ``"unhealthy"``; answered with HTTP 503.
    * ``"degraded"`` -- no dependency is unhealthy, but at least one reported
      something other than ``"healthy"`` (for example ``"unknown"``);
      answered with HTTP 200.
    * ``"healthy"`` -- every dependency reported ``"healthy"``; HTTP 200.
    """

    def __init__(self, dependencies: list[str]):
        """Store the names of the dependencies to check on each request."""
        self.dependencies = dependencies

    @enroute.rest.query("/health", method="GET")
    async def health_check(self, request) -> Response:
        """Check every dependency and report the aggregated health.

        :param request: Incoming request (unused).
        :return: Response carrying the overall status, a UTC timestamp and the
            per-dependency reports.
        """
        # Local import: the surrounding snippet does not import ``datetime``.
        from datetime import datetime, timezone

        # Timezone-aware UTC timestamp, so the ISO string carries its offset.
        timestamp = datetime.now(timezone.utc).isoformat()

        # Check each dependency
        reports = {}
        for dependency in self.dependencies:
            try:
                reports[dependency] = await self.check_dependency(dependency)
            except Exception as e:
                # Any failure of a check counts as an unhealthy dependency;
                # the endpoint itself must still answer.
                reports[dependency] = {
                    "status": "unhealthy",
                    "error": str(e)
                }

        # Overall health based on dependencies
        states = [report.get("status") for report in reports.values()]
        if "unhealthy" in states:
            overall = "unhealthy"
        elif any(state != "healthy" for state in states):
            overall = "degraded"
        else:
            overall = "healthy"

        health_status = {
            "status": overall,
            "timestamp": timestamp,
            "dependencies": reports
        }

        status_code = 200 if overall in ("healthy", "degraded") else 503
        return Response(health_status, status=status_code)

    async def check_dependency(self, dependency: str) -> dict:
        """Return the health report for one dependency.

        The reports are placeholders: ``"database"`` and ``"redis"`` always
        report healthy, any other name reports ``"unknown"``.
        """
        # Implement dependency-specific health checks
        if dependency == "database":
            # Check database connection
            return {"status": "healthy", "response_time": "5ms"}
        elif dependency == "redis":
            # Check Redis connection
            return {"status": "healthy", "response_time": "2ms"}
        else:
            return {"status": "unknown"}

# Discovery with health checks: advertise the health endpoint next to the
# regular ones
advertised_endpoints = [
    {"path": "/users", "method": "GET"},
    {"path": "/health", "method": "GET"},  # Health check endpoint
]
discovery_connector = DiscoveryConnector(
    client=consul_client,
    name="user-service",
    endpoints=advertised_endpoints,
    host="localhost",
    port=8080,
)

Dynamic Service Configuration

class DynamicDiscoveryService:
    """Keeps one service registered with discovery while its endpoints change.

    Every change is applied by unsubscribing the current
    ``DiscoveryConnector`` (if any) and subscribing a new one built with the
    updated endpoint list.
    """

    def __init__(self, config: Config):
        """Store the configuration; nothing is registered yet."""
        self.config = config
        self.connector = None  # connector of the current registration, if any
        self.registered_endpoints = []

    async def register_service(self, service_name: str, endpoints: list[dict]):
        """Dynamically register a service with discovery.

        Any registration made earlier through this object is removed first.

        :param service_name: Name to register the service under.
        :param endpoints: Endpoint descriptions to advertise; the list is
            copied, so later changes here never touch the caller's list.
        """
        # Snapshot before touching any state: ``endpoints`` may be
        # ``self.registered_endpoints`` itself.
        endpoints = list(endpoints)

        if self.connector:
            await self.connector.unsubscribe()
            # Nothing is registered until the new subscription succeeds, so a
            # failure below must not leave a stale connector behind.
            self.connector = None
            self.registered_endpoints = []

        connector = DiscoveryConnector(
            client=self._create_client(),
            name=service_name,
            endpoints=endpoints,
            host=self.config.service.host,
            port=self.config.service.port
        )
        await connector.subscribe()

        self.connector = connector
        self.registered_endpoints = endpoints
        print(f"Service {service_name} registered with {len(endpoints)} endpoints")

    async def add_endpoint(self, endpoint: dict):
        """Add a new endpoint to the registered service."""
        if not self.connector:
            # Not registered: the endpoint is only kept locally, and a later
            # ``register_service`` call replaces the list with its argument.
            self.registered_endpoints.append(endpoint)
            return

        # Re-register with updated endpoints
        await self.register_service(self.connector.name, [*self.registered_endpoints, endpoint])

    async def remove_endpoint(self, path: str, method: str):
        """Remove an endpoint from the registered service."""
        remaining = [
            ep for ep in self.registered_endpoints
            if not (ep["path"] == path and ep["method"] == method)
        ]

        if not self.connector:
            self.registered_endpoints = remaining
            return

        # Re-register with updated endpoints
        await self.register_service(self.connector.name, remaining)

    async def unregister_service(self):
        """Unregister the service; does nothing when it is not registered."""
        if self.connector:
            await self.connector.unsubscribe()
            self.connector = None
            self.registered_endpoints = []

    def _create_client(self) -> DiscoveryClient:
        """Build the discovery client selected by the configuration."""
        # ``DiscoveryClient`` itself is abstract; the connector's hook resolves
        # the concrete client class from the configuration.
        return DiscoveryConnector._client_from_config(self.config)

# Usage
dynamic_discovery = DynamicDiscoveryService(config)

# Initial registration
await dynamic_discovery.register_service("api-service", [
    {"path": "/api/v1/users", "method": "GET"}
])

# Add endpoints dynamically
await dynamic_discovery.add_endpoint({"path": "/api/v1/users", "method": "POST"})
await dynamic_discovery.add_endpoint({"path": "/api/v1/posts", "method": "GET"})

# Remove endpoints dynamically
await dynamic_discovery.remove_endpoint("/api/v1/posts", "GET")

# Cleanup
await dynamic_discovery.unregister_service()

Install with Tessl CLI

npx tessl i tessl/pypi-minos-microservice-networks

docs

brokers.md

decorators.md

discovery.md

http.md

index.md

requests.md

routers.md

scheduling.md

specs.md

system-utils.md

tile.json