The networks core of the Minos Framework, providing networking components for reactive microservices.
—
Service registration and discovery mechanisms for microservice coordination. Supports multiple discovery backends and automatic service lifecycle management.
Abstract client interface for service discovery operations.
class DiscoveryClient:
    """Abstract client interface for service discovery operations.

    ``subscribe`` and ``unsubscribe`` are marked abstract in this listing;
    concrete clients are expected to supply them.
    """

    # Base address of the discovery service, in the form http://host:port
    route: str

    def __init__(self, host: str, port: int):
        """Create a client for the discovery service at ``host``:``port``."""
        ...

    @classmethod
    def _from_config(cls, config: Config, **kwargs) -> DiscoveryClient:
        """Build a client from ``config``; extra keyword arguments are accepted."""
        ...

    async def subscribe(
        self,
        host: str,
        port: int,
        name: str,
        endpoints: list[dict[str, str]],
        *args,
        **kwargs,
    ) -> None:
        """Register service ``name``, served at ``host``:``port``, with its endpoints.

        Abstract.
        """
        ...

    async def unsubscribe(self, name: str, *args, **kwargs) -> None:
        """Unregister service ``name``.

        Abstract.
        """
        ...
class InMemoryDiscoveryClient(DiscoveryClient):
    """Discovery client that exposes its registration state via ``is_subscribed``."""

    # Subscription flag; the usage example prints it after ``subscribe``.
    is_subscribed: bool

    async def subscribe(self, *args, **kwargs) -> None:
        """Register the service; any positional/keyword arguments are accepted."""
        ...

    async def unsubscribe(self, *args, **kwargs) -> None:
        """Unregister the service; any positional/keyword arguments are accepted."""
        ...


# Usage Examples:
from minos.networks import InMemoryDiscoveryClient
# Create discovery client
client = InMemoryDiscoveryClient(host="localhost", port=8500)
# Register service
await client.subscribe(
host="localhost",
port=8080,
name="user-service",
endpoints=[
{"path": "/users", "method": "GET"},
{"path": "/users", "method": "POST"}
]
)
print(f"Service registered: {client.is_subscribed}")
# Unregister service
await client.unsubscribe("user-service")

Manages service registration with the discovery service and handles the registration lifecycle.
class DiscoveryConnector:
    """Registers one service with a discovery client and unregisters it again.

    The connector is given the client, the service name, its endpoints and
    the host/port it is served from; ``subscribe`` and ``unsubscribe`` take
    no further arguments.
    """

    def __init__(
        self,
        client: DiscoveryClient,
        name: str,
        endpoints: list[dict[str, Any]],
        host: str,
        port: Optional[int] = None,
    ):
        """Bind the service description to ``client``; ``port`` is optional."""
        ...

    @classmethod
    def _from_config(cls, config: Config, **kwargs) -> DiscoveryConnector:
        """Build a connector from ``config``; extra keyword arguments are accepted."""
        ...

    @classmethod
    def _client_from_config(cls, config: Config) -> DiscoveryClient:
        """Return a discovery client built from ``config``."""
        ...

    @classmethod
    def _client_cls_from_config(cls, discovery_config: dict[str, Any]) -> type[DiscoveryClient]:
        """Return the ``DiscoveryClient`` class selected by ``discovery_config``."""
        ...

    @classmethod
    def _port_from_config(cls, config: Config) -> Optional[int]:
        """Return the port read from ``config``; may be ``None``."""
        ...

    @classmethod
    def _endpoints_from_config(cls, config: Config) -> list[dict[str, Any]]:
        """Return the endpoint descriptions derived from ``config``."""
        ...

    async def subscribe(self) -> None:
        """Register the service."""
        ...

    async def unsubscribe(self) -> None:
        """Unregister the service."""
        ...


# Usage Examples:
from minos.networks import DiscoveryConnector, InMemoryDiscoveryClient
from minos.common import Config
# Create client
client = InMemoryDiscoveryClient(host="consul", port=8500)
# Create connector
connector = DiscoveryConnector(
client=client,
name="payment-service",
endpoints=[
{"path": "/payments", "method": "POST"},
{"path": "/payments/{id}", "method": "GET"}
],
host="localhost",
port=8080
)
# Register service
await connector.subscribe()
# Service is now discoverable
# Unregister when shutting down
await connector.unsubscribe()
# Using configuration
config = Config("config.yml")
connector = DiscoveryConnector._from_config(config)

from minos.networks import DiscoveryConnector, HttpPort, enroute
from minos.common import Config
class PaymentService:
    """Example service with two REST handlers and one broker event handler.

    Every handler ignores the incoming request and replies with a fixed
    payload.
    """

    @enroute.rest.command("/payments", method="POST")
    async def create_payment(self, request) -> Response:
        """Handle ``POST /payments`` with a fixed "created" payload."""
        payload = {"payment_id": "123", "status": "created"}
        return Response(payload)

    @enroute.rest.query("/payments/{payment_id}", method="GET")
    async def get_payment(self, request) -> Response:
        """Handle ``GET /payments/{payment_id}`` with a fixed payment record."""
        payload = {"payment_id": "123", "amount": 100.0}
        return Response(payload)

    @enroute.broker.event("payment.processed")
    async def handle_payment_processed(self, request) -> Response:
        """Handle the ``payment.processed`` broker event."""
        payload = {"processed": True}
        return Response(payload)
# Configuration-based setup
config = Config({
"service": {
"name": "payment-service",
"host": "0.0.0.0",
"port": 8080
},
"discovery": {
"client": "InMemoryDiscoveryClient",
"host": "consul",
"port": 8500
}
})
# Create HTTP service
http_port = HttpPort._from_config(config)
# Create discovery connector
discovery = DiscoveryConnector._from_config(config)
# Start services
await http_port.start()
await discovery.subscribe()
print(f"Payment service running on {config.service.host}:{config.service.port}")
print("Service registered with discovery")
# Service runs and handles requests...
# Shutdown
await discovery.unsubscribe()
await http_port.stop()

import aiohttp
from minos.networks import DiscoveryClient
class ConsulDiscoveryClient(DiscoveryClient):
    """Consul-based discovery client implementation.

    Services are registered and deregistered through HTTP ``PUT`` requests
    against ``self.route``. The Consul service ID is derived from the
    service name plus the *service's* host and port, so the ID used at
    registration is remembered per service name and reused when
    deregistering.
    """

    async def subscribe(self, host: str, port: int, name: str, endpoints: list[dict], *args, **kwargs) -> None:
        """Register service ``name`` (served at ``host``:``port``) with Consul.

        Each endpoint's ``"path"`` becomes a tag, and an HTTP check against
        ``http://{host}:{port}/health`` with a 10s interval is attached.

        Raises:
            Exception: if Consul answers with a status other than 200.
            KeyError: if an endpoint has no ``"path"`` entry.
        """
        service_id = f"{name}-{host}-{port}"
        service_definition = {
            "ID": service_id,
            "Name": name,
            "Address": host,
            "Port": port,
            "Tags": [endpoint["path"] for endpoint in endpoints],
            "Check": {
                "HTTP": f"http://{host}:{port}/health",
                "Interval": "10s",
            },
        }
        await self._put("/v1/agent/service/register", "register", json=service_definition)
        # Remember the exact ID so unsubscribe() deregisters the same entry.
        self._registered_ids()[name] = service_id

    async def unsubscribe(self, name: str, *args, **kwargs) -> None:
        """Deregister service ``name`` from Consul.

        Uses the ID recorded by ``subscribe`` on this client. If this client
        never registered ``name``, it falls back to the previous derivation
        from ``self.host``/``self.port``.

        Raises:
            Exception: if Consul answers with a status other than 200.
        """
        registered = self._registered_ids()
        service_id = registered.get(name)
        if service_id is None:
            # Backward-compatible fallback; this is the client's own address,
            # which only matches if the service was registered under it.
            service_id = f"{name}-{self.host}-{self.port}"
        await self._put(f"/v1/agent/service/deregister/{service_id}", "unregister")
        registered.pop(name, None)

    def _registered_ids(self) -> dict:
        """Return the name -> service ID map, creating it on first use."""
        # Created lazily so the inherited constructor does not need overriding.
        try:
            return self._service_ids
        except AttributeError:
            self._service_ids = {}
            return self._service_ids

    async def _put(self, path: str, action: str, **request_kwargs) -> None:
        """PUT to ``self.route + path``; raise unless the response status is 200."""
        async with aiohttp.ClientSession() as session:
            async with session.put(f"{self.route}{path}", **request_kwargs) as response:
                if response.status != 200:
                    raise Exception(f"Failed to {action} service: {response.status}")
# Usage: hand the Consul-backed client to a connector for "user-service".
consul_client = ConsulDiscoveryClient(host="consul", port=8500)
connector = DiscoveryConnector(
    client=consul_client,
    name="user-service",
    endpoints=[{"path": "/users", "method": "GET"}],
    host="localhost",
    port=8080,
)

from minos.networks import enroute, SystemService
class HealthCheckService:
    """Expose a ``GET /health`` query that aggregates dependency health checks."""

    def __init__(self, dependencies: list[str]):
        """Store the names of the dependencies to probe on each health check."""
        self.dependencies = dependencies

    @enroute.rest.query("/health", method="GET")
    async def health_check(self, request) -> Response:
        """Probe every dependency and report overall health.

        The response body holds ``status``, ``timestamp`` (ISO 8601, UTC) and
        a per-dependency report. The overall status is ``"unhealthy"`` with
        HTTP 503 when any dependency is unhealthy or its check raised;
        otherwise ``"healthy"`` with HTTP 200.
        """
        # Local import: nothing else in this snippet brings datetime into scope.
        from datetime import datetime, timezone

        # Timezone-aware replacement for the deprecated, naive utcnow().
        timestamp = datetime.now(timezone.utc).isoformat()

        dependency_report = {}
        for dependency in self.dependencies:
            try:
                dependency_report[dependency] = await self.check_dependency(dependency)
            except Exception as e:
                # Deliberately broad: a failing probe must be reported in the
                # health response, not crash the endpoint.
                dependency_report[dependency] = {
                    "status": "unhealthy",
                    "error": str(e),
                }

        # .get() so a report without a "status" key cannot raise here.
        any_unhealthy = any(
            report.get("status") == "unhealthy" for report in dependency_report.values()
        )

        # NOTE(review): the earlier version set "degraded" when a probe raised,
        # but that value was always overwritten by "unhealthy" before the
        # response was built. Reintroduce it deliberately if a softer state
        # is wanted.
        health_status = {
            "status": "unhealthy" if any_unhealthy else "healthy",
            "timestamp": timestamp,
            "dependencies": dependency_report,
        }
        status_code = 503 if any_unhealthy else 200
        return Response(health_status, status=status_code)

    async def check_dependency(self, dependency: str) -> dict:
        """Return the health report for ``dependency``.

        Placeholder implementation: "database" and "redis" return canned
        healthy reports, any other name returns ``{"status": "unknown"}``.
        Replace the table entries with real probes.
        """
        canned_reports = {
            # Check database connection
            "database": {"status": "healthy", "response_time": "5ms"},
            # Check Redis connection
            "redis": {"status": "healthy", "response_time": "2ms"},
        }
        return canned_reports.get(dependency, {"status": "unknown"})
# Discovery with health checks
discovery_connector = DiscoveryConnector(
    client=consul_client,
    name="user-service",
    endpoints=[
        {"path": "/users", "method": "GET"},
        {"path": "/health", "method": "GET"},  # Health check endpoint
    ],
    host="localhost",
    port=8080,
)


class DynamicDiscoveryService:
    """Register a service with discovery and change its endpoints at runtime.

    Every endpoint change is applied by unsubscribing the current connector
    and subscribing a fresh one carrying the updated endpoint list.
    """

    def __init__(self, config: Config):
        """Keep ``config``; no service is registered until ``register_service``."""
        self.config = config
        self.connector = None
        self.registered_endpoints = []

    async def register_service(self, service_name: str, endpoints: list[dict]):
        """Dynamically register a service with discovery.

        Any connector registered earlier is unsubscribed first. The host and
        port come from ``config.service``.
        """
        if self.connector is not None:
            await self.connector.unsubscribe()

        self.connector = DiscoveryConnector(
            client=self._create_client(),
            name=service_name,
            endpoints=endpoints,
            host=self.config.service.host,
            port=self.config.service.port,
        )
        await self.connector.subscribe()

        # Keep a private copy so later add/remove calls never mutate the
        # list object the caller passed in.
        self.registered_endpoints = list(endpoints)
        print(f"Service {service_name} registered with {len(endpoints)} endpoints")

    async def add_endpoint(self, endpoint: dict):
        """Add a new endpoint to the registered service.

        The endpoint is recorded even when no service is registered; it is
        only announced if a connector exists.
        """
        self.registered_endpoints = [*self.registered_endpoints, endpoint]
        await self._reregister()

    async def remove_endpoint(self, path: str, method: str):
        """Remove the endpoint matching ``path`` and ``method``, if present."""
        self.registered_endpoints = [
            ep
            for ep in self.registered_endpoints
            if not (ep["path"] == path and ep["method"] == method)
        ]
        await self._reregister()

    async def unregister_service(self):
        """Unregister the service and forget its endpoints."""
        if self.connector is not None:
            await self.connector.unsubscribe()
            self.connector = None
            self.registered_endpoints = []

    async def _reregister(self) -> None:
        """Re-register under the current name with the current endpoint list."""
        if self.connector is not None:
            await self.register_service(self.connector.name, self.registered_endpoints)

    def _create_client(self) -> DiscoveryClient:
        """Build the discovery client from the configuration."""
        # DiscoveryClient is abstract, so go through the connector's factory
        # rather than instantiating the base class directly.
        return DiscoveryConnector._client_from_config(self.config)
# Usage
dynamic_discovery = DynamicDiscoveryService(config)
# Initial registration
await dynamic_discovery.register_service("api-service", [
{"path": "/api/v1/users", "method": "GET"}
])
# Add endpoints dynamically
await dynamic_discovery.add_endpoint({"path": "/api/v1/users", "method": "POST"})
await dynamic_discovery.add_endpoint({"path": "/api/v1/posts", "method": "GET"})
# Remove endpoints dynamically
await dynamic_discovery.remove_endpoint("/api/v1/posts", "GET")
# Cleanup
await dynamic_discovery.unregister_service()

Install with Tessl CLI:
npx tessl i tessl/pypi-minos-microservice-networks