The networks core of the Minos Framework providing networking components for reactive microservices
npx @tessl/cli install tessl/pypi-minos-microservice-networks@0.7.0

A comprehensive Python library providing networking core functionality for the Minos Framework. It implements components for building reactive microservices using Event Sourcing, CQRS, and message-driven architecture patterns.
pip install minos-microservice-networks

import minos.networks

Common import patterns:
from minos.networks import (
enroute,
BrokerClient,
BrokerHandler,
HttpPort,
Request,
Response
)

For decorators:
from minos.networks import enroute
# Use decorators
@enroute.broker.command("user.create")
@enroute.rest.query("/users", method="GET")
@enroute.periodic.event("0 */5 * * * *")

from minos.networks import enroute, Request, Response
class UserService:
    """Example service whose handlers are registered with ``enroute`` decorators.

    Each handler receives a ``Request`` and returns a ``Response``; the
    decorator on each method selects the transport (broker, REST or periodic).
    """

    @enroute.broker.command("user.create")
    async def create_user(self, request: Request) -> Response:
        """Handle the ``user.create`` broker command."""
        user_data = await request.content()
        # Process user creation logic
        return Response({"id": "123", "status": "created"})

    @enroute.rest.query("/users/{user_id}", method="GET")
    async def get_user(self, request: Request) -> Response:
        """Handle ``GET /users/{user_id}`` using the ``user_id`` request parameter."""
        params = await request.params()
        user_id = params["user_id"]
        # Fetch user logic
        return Response({"id": user_id, "name": "John Doe"})

    @enroute.periodic.event("0 */5 * * * *")  # Every 5 minutes
    async def cleanup_task(self, request: Request) -> Response:
        """Handle the periodic event registered with the cron pattern above."""
        # Periodic cleanup logic
        return Response({"status": "cleanup_completed"})


from minos.networks import BrokerClient, BrokerPublisher, BrokerSubscriber
from minos.common import Config
# Create configuration
config = Config("config.yml")
# Create broker client
client = BrokerClient.from_config(config, topic="my.topic")
# Send a message
message = BrokerMessageV1("user.created", payload=payload)
await client.send(message)
# Receive messages
async for message in client.receive_many(count=10):
print(f"Received: {message.content}")from minos.networks import HttpPort
from minos.common import Config
# Create HTTP port from configuration
config = Config("config.yml")
http_port = HttpPort.from_config(config)
# Start the HTTP server
await http_port.start()

The Minos Networks library implements a reactive microservices architecture with these key components:
- The enroute system provides unified decorators for broker, HTTP, and periodic handlers.

The library follows the Reactive Manifesto principles: responsive, resilient, elastic, and message-driven.
The enroute decorator system provides unified handler registration for different transport types. Supports broker commands/queries/events, REST endpoints, and periodic tasks with automatic routing and middleware integration.
class Enroute:
    """Entry point of the enroute decorator system, grouped by transport type (signature stub)."""

    broker: BrokerEnroute  # decorators for broker handlers
    rest: RestEnroute  # decorators for REST handlers
    periodic: PeriodicEnroute  # decorators for periodic handlers
class BrokerEnroute:
    """Decorator classes available for broker handlers (signature stub)."""

    command: type[BrokerCommandEnrouteDecorator]
    query: type[BrokerQueryEnrouteDecorator]
    event: type[BrokerEventEnrouteDecorator]
class RestEnroute:
    """Decorator classes available for REST handlers (signature stub)."""

    command: type[RestCommandEnrouteDecorator]
    query: type[RestQueryEnrouteDecorator]
class PeriodicEnroute:
    """Decorator class available for periodic handlers (signature stub)."""

    event: type[PeriodicEventEnrouteDecorator]
enroute: Enroute

Comprehensive message broker implementation supporting publish/subscribe patterns, message queuing, filtering, validation, and multiple delivery strategies. Includes both in-memory and database-backed implementations.
class BrokerClient:
    """Broker client built from a topic, a publisher and a subscriber (signature stub)."""

    def __init__(self, topic: str, publisher: BrokerPublisher, subscriber: BrokerSubscriber): ...

    async def send(self, message: BrokerMessage) -> None: ...

    async def receive(self) -> BrokerMessage: ...

    # ``timeout`` defaults to 60; its unit is not stated in this listing.
    async def receive_many(self, count: int, timeout: float = 60) -> AsyncIterator[BrokerMessage]: ...
class BrokerMessage:
    """Message exchanged through the broker (attribute listing only)."""

    topic: str
    identifier: UUID
    content: Any
    headers: dict[str, str]
class BrokerHandler:
def __init__(self, dispatcher: BrokerDispatcher, subscriber: BrokerSubscriber, concurrency: int = 5): ...
async def run(self) -> NoReturn: ...

HTTP server functionality with REST support, request/response abstractions, routing, and connector patterns. Provides foundation for building HTTP APIs and web services.
class HttpPort:
    """Port that exposes an HTTP connector and start/stop coroutines (signature stub)."""

    connector: HttpConnector

    async def start(self) -> None: ...

    async def stop(self) -> None: ...
class HttpRequest:
    """HTTP request abstraction (signature stub)."""

    user: Optional[UUID]
    headers: dict[str, str]
    content_type: str

    # ``type_`` is optional in both accessors; how it is applied is not shown in this listing.
    async def url_params(self, type_: Optional[type] = None) -> Any: ...

    async def query_params(self, type_: Optional[type] = None) -> Any: ...
class HttpResponse:
def __init__(self, content_type: str = "application/json"): ...
status: int
async def content(self) -> Any: ...

Core request/response abstractions used across all transport types. Provides unified interfaces for handling different types of requests with content, parameters, and user context.
class Request:
    """Transport-independent request abstraction (signature stub)."""

    user: Optional[UUID]
    has_content: bool
    has_params: bool

    async def content(self) -> Any: ...

    async def params(self) -> dict[str, Any]: ...
class Response:
    """Transport-independent response abstraction (signature stub)."""

    # ``status`` defaults to 200 and ``data`` to ``None``.
    def __init__(self, data: Any = None, status: int = 200): ...

    has_content: bool
    status: int

    async def content(self) -> Any: ...
class InMemoryRequest:
def __init__(self, content: Any = None, params: dict[str, Any] = None, user: Optional[UUID] = None): ...

Cron-based periodic task scheduling with async execution, lifecycle management, and integration with the decorator system. Supports complex scheduling patterns and error handling.
class CronTab:
    """Crontab pattern used by the periodic scheduling components (signature stub)."""

    # NOTE(review): ``CrontTabImpl`` is reproduced as listed; the spelling looks like a typo — confirm.
    def __init__(self, pattern: Union[str, CrontTabImpl]): ...

    repetitions: Union[int, float]

    async def sleep_until_next(self) -> None: ...

    # The ``def`` keyword was missing from this signature in the listing.
    def get_delay_until_next(self, now: Optional[datetime] = None) -> float: ...
class PeriodicTask:
    """Task built from a crontab and a callable, with start/stop coroutines (signature stub)."""

    # ``crontab`` may be given as a pattern string or as a ``CronTab`` instance.
    def __init__(self, crontab: Union[str, CronTab], fn: Callable): ...

    crontab: CronTab
    started: bool
    running: bool

    async def start(self) -> None: ...

    async def stop(self, timeout: Optional[float] = None) -> None: ...
class PeriodicPort:
scheduler: PeriodicTaskScheduler

Service registration and discovery mechanisms for microservice coordination. Supports multiple discovery backends and automatic service lifecycle management.
class DiscoveryClient:
    """Client for a discovery service reachable at a host and port (signature stub)."""

    def __init__(self, host: str, port: int): ...

    route: str

    async def subscribe(self, host: str, port: int, name: str, endpoints: list[dict]) -> None: ...

    async def unsubscribe(self, name: str) -> None: ...
class DiscoveryConnector:
def __init__(self, client: DiscoveryClient, name: str, endpoints: list[dict], host: str, port: Optional[int] = None): ...
async def subscribe(self) -> None: ...
async def unsubscribe(self) -> None: ...

Built-in services for generating OpenAPI and AsyncAPI specifications from decorated handlers. Provides automated API documentation and specification generation.
class OpenAPIService:
    """Built-in service for generating an OpenAPI specification (signature stub)."""

    def __init__(self, config: Config): ...

    spec: dict

    async def generate_specification(self, request: Request) -> Response: ...
class AsyncAPIService:
def __init__(self, config: Config): ...
spec: dict
async def generate_specification(self, request: Request) -> Response: ...

System health endpoints, network utilities, and various helper functions for microservice operations.
class SystemService:
    """Built-in service exposing a health-check handler (signature stub)."""

    async def check_health(self, request: Request) -> Response: ...
# Network helper functions (signature stubs; bodies are elided in this listing).
# NOTE(review): presumably returns the current host's IP address — confirm against the implementation.
def get_host_ip() -> str: ...
# NOTE(review): presumably returns the current host's name — confirm against the implementation.
def get_host_name() -> str: ...
# NOTE(review): presumably resolves ``name`` to an IP address — confirm against the implementation.
def get_ip(name: str) -> str: ...
async def consume_queue(queue, max_count: int) -> None: ...

Abstract interfaces and concrete implementations for handling route registration and management across different transport types. Routers aggregate decorated handlers and provide structured access to routing information.
class Router:
    """Base router holding a mapping of routes to callables (signature stub)."""

    routes: dict[Any, Callable]

    def build_routes(self) -> None: ...

    def get_routes(self) -> dict[Any, Callable]: ...


# Transport-specific routers; ``RestHttpRouter`` specialises ``HttpRouter``.
class BrokerRouter(Router): ...
class HttpRouter(Router): ...
class RestHttpRouter(HttpRouter): ...
class PeriodicRouter(Router): ...


from typing import Callable, Union, Optional, Any, Awaitable
from uuid import UUID
from datetime import datetime
# A handler takes a Request and returns an optional Response, either directly or as an awaitable.
Handler = Callable[[Request], Union[Optional[Response], Awaitable[Optional[Response]]]]
# A checker takes a Request and returns an optional bool, either directly or as an awaitable.
Checker = Callable[[Request], Union[Optional[bool], Awaitable[Optional[bool]]]]
class BrokerMessageV1Status(Enum):
    """Status codes of a version-1 broker message."""

    SUCCESS = 200
    ERROR = 400
    SYSTEM_ERROR = 500
    UNKNOWN = 600
class BrokerMessageV1Strategy(Enum):
    """Delivery strategies of a version-1 broker message."""

    UNICAST = "unicast"
    MULTICAST = "multicast"
class EnrouteDecoratorKind(Enum):
    """Kinds of enroute decorator: command, query or event."""

    Command = auto()
    Query = auto()
    Event = auto()


from contextvars import ContextVar
# Context variables exposed by the package (declarations only; no values are shown in this listing).
# NOTE(review): going by the names, these hold the current request's user and headers — confirm.
REQUEST_USER_CONTEXT_VAR: ContextVar[Optional[UUID]]
REQUEST_HEADERS_CONTEXT_VAR: ContextVar[Optional[dict[str, str]]]
REQUEST_REPLY_TOPIC_CONTEXT_VAR: ContextVar[Optional[str]]

Usage Examples:
from minos.networks import REQUEST_USER_CONTEXT_VAR, REQUEST_HEADERS_CONTEXT_VAR
# Context variables are automatically managed by the framework
# Access current request user in any handler
current_user = REQUEST_USER_CONTEXT_VAR.get()
request_headers = REQUEST_HEADERS_CONTEXT_VAR.get()

class MinosNetworkException(MinosException): ...
# Exception hierarchy rooted at MinosNetworkException (signature stubs).
# Discovery-related exceptions.
class MinosDiscoveryConnectorException(MinosNetworkException): ...
class MinosInvalidDiscoveryClient(MinosNetworkException): ...
# Handler exceptions; the three classes that follow MinosHandlerException derive from it.
class MinosHandlerException(MinosNetworkException): ...
class MinosActionNotFoundException(MinosHandlerException): ...
class MinosHandlerNotFoundEnoughEntriesException(MinosHandlerException): ...
class NotSatisfiedCheckerException(MinosHandlerException): ...
# Enroute decorator exceptions; the two classes that follow MinosEnrouteDecoratorException derive from it.
class MinosEnrouteDecoratorException(MinosNetworkException): ...
class MinosMultipleEnrouteDecoratorKindsException(MinosEnrouteDecoratorException): ...
class MinosRedefinedEnrouteDecoratorException(MinosEnrouteDecoratorException): ...
# Request exceptions; the two classes that follow RequestException derive from it.
class RequestException(MinosNetworkException): ...
class NotHasContentException(RequestException): ...
class NotHasParamsException(RequestException): ...
class ResponseException(MinosException):
    """Exception carrying a response status code (signature stub).

    As listed, it derives directly from ``MinosException``.
    """

    # ``status`` defaults to 400.
    def __init__(self, status: int = 400): ...

    status: int