The networks core of the Minos Framework providing networking components for reactive microservices
—
The decorator system provides a unified interface for registering handler functions across different transport types (broker, HTTP, periodic). The enroute decorator system automatically handles routing, middleware integration, and request/response processing.
The enroute object provides access to all decorator types through a hierarchical interface.
enroute: Enroute
class Enroute:
broker: BrokerEnroute
rest: RestEnroute
periodic: PeriodicEnrouteDecorators for message broker handlers supporting commands, queries, and events.
class BrokerEnroute:
command: type[BrokerCommandEnrouteDecorator]
query: type[BrokerQueryEnrouteDecorator]
event: type[BrokerEventEnrouteDecorator]
class BrokerCommandEnrouteDecorator:
def __init__(self, topic: str, **kwargs): ...
topic: str
KIND: EnrouteDecoratorKind = EnrouteDecoratorKind.Command
class BrokerQueryEnrouteDecorator:
def __init__(self, topic: str, **kwargs): ...
topic: str
KIND: EnrouteDecoratorKind = EnrouteDecoratorKind.Query
class BrokerEventEnrouteDecorator:
def __init__(self, topic: str, **kwargs): ...
topic: str
KIND: EnrouteDecoratorKind = EnrouteDecoratorKind.EventUsage Examples:
@enroute.broker.command("user.create")
async def create_user(request: Request) -> Response:
user_data = await request.content()
return Response({"id": "123", "status": "created"})
@enroute.broker.query("user.get")
async def get_user(request: Request) -> Response:
user_id = await request.content()
return Response({"id": user_id, "name": "John Doe"})
@enroute.broker.event("user.created")
async def handle_user_created(request: Request) -> Response:
event_data = await request.content()
# Handle event processing
return Response({"processed": True})Decorators for HTTP REST endpoint handlers supporting commands and queries.
class RestEnroute:
command: type[RestCommandEnrouteDecorator]
query: type[RestQueryEnrouteDecorator]
class RestCommandEnrouteDecorator:
def __init__(self, path: str, method: str = "POST", **kwargs): ...
path: str
method: str
KIND: EnrouteDecoratorKind = EnrouteDecoratorKind.Command
class RestQueryEnrouteDecorator:
def __init__(self, path: str, method: str = "GET", **kwargs): ...
path: str
method: str
KIND: EnrouteDecoratorKind = EnrouteDecoratorKind.QueryUsage Examples:
@enroute.rest.command("/users", method="POST")
async def create_user_http(request: Request) -> Response:
user_data = await request.content()
return Response({"id": "123", "status": "created"})
@enroute.rest.query("/users/{user_id}", method="GET")
async def get_user_http(request: Request) -> Response:
params = await request.params()
user_id = params["user_id"]
return Response({"id": user_id, "name": "John Doe"})
@enroute.rest.command("/users/{user_id}", method="PUT")
async def update_user(request: Request) -> Response:
params = await request.params()
user_data = await request.content()
return Response({"id": params["user_id"], "status": "updated"})Decorators for scheduled task handlers using cron expressions.
class PeriodicEnroute:
event: type[PeriodicEventEnrouteDecorator]
class PeriodicEventEnrouteDecorator:
def __init__(self, crontab: Union[str, CronTab], **kwargs): ...
crontab: CronTab
KIND: EnrouteDecoratorKind = EnrouteDecoratorKind.EventUsage Examples:
@enroute.periodic.event("0 */5 * * * *") # Every 5 minutes
async def cleanup_task(request: Request) -> Response:
# Periodic cleanup logic
return Response({"status": "cleanup_completed"})
@enroute.periodic.event("0 0 * * *") # Daily at midnight
async def daily_report(request: Request) -> Response:
# Generate daily reports
return Response({"report": "generated"})
# Using CronTab object
from minos.networks import CronTab
cron = CronTab("0 30 9 * * MON-FRI") # Weekdays at 9:30 AM
@enroute.periodic.event(cron)
async def business_hours_task(request: Request) -> Response:
return Response({"status": "executed"})Classes for managing handler function metadata and execution wrappers.
class HandlerMeta:
def __init__(self, func: Handler, decorators: Optional[set[EnrouteDecorator]] = None, checkers: Optional[set[CheckerMeta]] = None): ...
func: Handler
decorators: set[EnrouteDecorator]
checkers: set[CheckerMeta]
wrapper: HandlerWrapper
check: type[CheckDecorator]
def add_decorator(self, decorator: EnrouteDecorator) -> None: ...
class HandlerWrapper:
"""Wrapper for handler functions with decorator metadata"""
class CheckerMeta:
def __init__(self, func: Checker, max_attempts: int, delay: float): ...
func: Checker
max_attempts: int
delay: float
wrapper: CheckerWrapper
@staticmethod
async def run_async(metas: set[CheckerMeta], *args, **kwargs) -> None: ...
@staticmethod
def run_sync(metas: set[CheckerMeta], *args, **kwargs) -> bool: ...
class CheckDecorator:
def __init__(self, handler_meta: HandlerMeta, max_attempts: int = 10, delay: Union[float, timedelta] = 0.1): ...
max_attempts: int
delay: float
def __call__(self, func: Union[CheckerWrapper, Checker]) -> CheckerWrapper: ...Utilities for discovering and organizing decorated handlers.
class EnrouteCollector:
def __init__(self, decorated: Any, config: Optional[Config] = None): ...
decorated: Any
config: Optional[Config]
def get_rest_command_query(self) -> dict[str, set[RestEnrouteDecorator]]: ...
def get_broker_command_query_event(self) -> dict[str, set[BrokerEnrouteDecorator]]: ...
def get_broker_command_query(self) -> dict[str, set[BrokerEnrouteDecorator]]: ...
def get_broker_event(self) -> dict[str, set[BrokerEnrouteDecorator]]: ...
def get_periodic_event(self) -> dict[str, set[PeriodicEventEnrouteDecorator]]: ...
def get_all(self) -> dict[str, set[EnrouteDecorator]]: ...
class EnrouteFactory:
def __init__(self, *classes: Union[str, type], middleware: Optional[Union[str, Callable, list]] = None): ...
classes: tuple
middleware: list
def get_rest_command_query(self, **kwargs) -> dict[RestEnrouteDecorator, Handler]: ...
def get_broker_command_query_event(self, **kwargs) -> dict[BrokerEnrouteDecorator, Handler]: ...
def get_broker_command_query(self, **kwargs) -> dict[BrokerEnrouteDecorator, Handler]: ...
def get_broker_event(self, **kwargs) -> dict[BrokerEnrouteDecorator, Handler]: ...
def get_periodic_event(self, **kwargs) -> dict[PeriodicEnrouteDecorator, Handler]: ...
def get_all(self, **kwargs) -> dict[EnrouteDecorator, Handler]: ...from typing import Callable, Union, Optional, Awaitable
from enum import Enum, auto
Handler = Callable[[Request], Union[Optional[Response], Awaitable[Optional[Response]]]]
Checker = Callable[[Request], Union[Optional[bool], Awaitable[Optional[bool]]]]
class EnrouteDecoratorKind(Enum):
Command = auto()
Query = auto()
Event = auto()
@property
def pre_fn_name(self) -> str:
"""Returns pre-execution function name mapping"""
@property
def post_fn_name(self) -> str:
"""Returns post-execution function name mapping"""Core abstractions for request processing and validation with metadata support.
class Handler:
"""Base class for handling decorated function execution"""
def __init__(self, fn: Callable, decorators: list[EnrouteDecorator] = None): ...
fn: Callable
decorators: list[EnrouteDecorator]
class HandlerMeta:
"""Metadata container for handler information"""
def __init__(self, fn: Callable): ...
class HandlerWrapper:
"""Wrapper that provides handler execution context"""
def __init__(self, handler: Handler): ...
async def __call__(self, request: Request) -> Response: ...
class Checker:
"""Base class for request validation logic"""
def __init__(self, fn: Callable): ...
fn: Callable
class CheckerMeta:
"""Metadata container for checker information"""
def __init__(self, fn: Callable): ...
class CheckerWrapper:
"""Wrapper that provides checker execution context"""
def __init__(self, checker: Checker): ...
async def __call__(self, request: Request) -> bool: ...
class CheckDecorator:
"""Decorator for adding validation checks to handlers"""
def __init__(self, max_attempts: int = 1, delay: float = 0.0): ...
max_attempts: int
delay: floatUsage Examples:
from minos.networks import Handler, Checker, CheckDecorator
# Custom handler with metadata
handler = Handler(my_function, decorators=[decorator1, decorator2])
wrapped = HandlerWrapper(handler)
# Custom checker for validation
checker = Checker(my_validation_function)
wrapped_checker = CheckerWrapper(checker)
# Using check decorator
@CheckDecorator(max_attempts=3, delay=0.5)
async def validate_input(request: Request) -> bool:
return True # validation logicclass UserService:
@enroute.broker.command("user.create")
async def create_user(self, request: Request) -> Response:
# Handler implementation
pass
# Add checker with retry logic
@create_user.check(max_attempts=3, delay=0.5)
async def validate_user_data(self, request: Request) -> bool:
user_data = await request.content()
return "email" in user_data and "name" in user_datafrom minos.networks import EnrouteFactory
# Create factory with middleware
factory = EnrouteFactory(
UserService,
OrderService,
middleware=["auth_middleware", "logging_middleware"]
)
# Get handlers with middleware applied
handlers = factory.get_all()class UserService:
@enroute.broker.command("user.create")
async def create_user(self, request: Request) -> Response:
return Response({"status": "created"})
@enroute.broker.query("user.get")
async def get_user(self, request: Request) -> Response:
return Response({"user": "data"})
@enroute.rest.command("/users", method="POST")
async def create_user_rest(self, request: Request) -> Response:
return Response({"status": "created"})
@enroute.rest.query("/users/{user_id}", method="GET")
async def get_user_rest(self, request: Request) -> Response:
return Response({"user": "data"})
@enroute.periodic.event("0 0 * * *")
async def daily_cleanup(self, request: Request) -> Response:
return Response({"status": "cleaned"})Install with Tessl CLI
npx tessl i tessl/pypi-minos-microservice-networks