or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

brokers.mddecorators.mddiscovery.mdhttp.mdindex.mdrequests.mdrouters.mdscheduling.mdspecs.mdsystem-utils.md
tile.json

tessl/pypi-minos-microservice-networks

The networks core of the Minos Framework providing networking components for reactive microservices

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/minos-microservice-networks@0.7.x

To install, run

npx @tessl/cli install tessl/pypi-minos-microservice-networks@0.7.0

index.mddocs/

Minos Microservice Networks

A comprehensive Python library providing networking core functionality for the Minos Framework. It implements components for building reactive microservices using Event Sourcing, CQRS, and message-driven architecture patterns.

Package Information

  • Package Name: minos-microservice-networks
  • Package Type: pypi
  • Language: Python 3.9+
  • Installation: pip install minos-microservice-networks
  • License: MIT

Core Imports

import minos.networks

Common import patterns:

from minos.networks import (
    enroute,
    BrokerClient,
    BrokerHandler,
    HttpPort,
    Request,
    Response
)

For decorators:

from minos.networks import enroute

# Use decorators
@enroute.broker.command("user.create")
@enroute.rest.query("/users", method="GET") 
@enroute.periodic.event("0 */5 * * * *")

Basic Usage

Creating a Simple Microservice with Decorators

from minos.networks import enroute, Request, Response

class UserService:
    @enroute.broker.command("user.create")
    async def create_user(self, request: Request) -> Response:
        user_data = await request.content()
        # Process user creation logic
        return Response({"id": "123", "status": "created"})
    
    @enroute.rest.query("/users/{user_id}", method="GET")
    async def get_user(self, request: Request) -> Response:
        params = await request.params()
        user_id = params["user_id"]
        # Fetch user logic
        return Response({"id": user_id, "name": "John Doe"})
    
    @enroute.periodic.event("0 */5 * * * *")  # Every 5 minutes
    async def cleanup_task(self, request: Request) -> Response:
        # Periodic cleanup logic
        return Response({"status": "cleanup_completed"})

Setting up Broker Communication

from minos.networks import BrokerClient, BrokerPublisher, BrokerSubscriber
from minos.common import Config

# Create configuration
config = Config("config.yml")

# Create broker client
client = BrokerClient.from_config(config, topic="my.topic")

# Send a message
message = BrokerMessageV1("user.created", payload=payload)
await client.send(message)

# Receive messages
async for message in client.receive_many(count=10):
    print(f"Received: {message.content}")

Setting up HTTP Services

from minos.networks import HttpPort
from minos.common import Config

# Create HTTP port from configuration
config = Config("config.yml")
http_port = HttpPort.from_config(config)

# Start the HTTP server
await http_port.start()

Architecture

The Minos Networks library implements a reactive microservices architecture with these key components:

  • Decorators: The enroute system provides unified decorators for broker, HTTP, and periodic handlers
  • Message Brokers: Publish/subscribe messaging with queuing, filtering, and validation
  • HTTP Services: RESTful API support with request/response abstractions
  • Routing: Smart routing based on decorator types and configurations
  • Scheduling: Cron-based periodic task execution
  • Discovery: Service registration and discovery mechanisms

The library follows reactive manifesto principles: responsive, resilient, elastic, and message-driven.

Capabilities

Decorators and Routing

The enroute decorator system provides unified handler registration for different transport types. Supports broker commands/queries/events, REST endpoints, and periodic tasks with automatic routing and middleware integration.

class Enroute:
    broker: BrokerEnroute
    rest: RestEnroute
    periodic: PeriodicEnroute

class BrokerEnroute:
    command: type[BrokerCommandEnrouteDecorator]
    query: type[BrokerQueryEnrouteDecorator] 
    event: type[BrokerEventEnrouteDecorator]

class RestEnroute:
    command: type[RestCommandEnrouteDecorator]
    query: type[RestQueryEnrouteDecorator]

class PeriodicEnroute:
    event: type[PeriodicEventEnrouteDecorator]

enroute: Enroute

Decorators and Routing

Message Broker System

Comprehensive message broker implementation supporting publish/subscribe patterns, message queuing, filtering, validation, and multiple delivery strategies. Includes both in-memory and database-backed implementations.

class BrokerClient:
    def __init__(self, topic: str, publisher: BrokerPublisher, subscriber: BrokerSubscriber): ...
    async def send(self, message: BrokerMessage) -> None: ...
    async def receive(self) -> BrokerMessage: ...
    async def receive_many(self, count: int, timeout: float = 60) -> AsyncIterator[BrokerMessage]: ...

class BrokerMessage:
    topic: str
    identifier: UUID
    content: Any
    headers: dict[str, str]
    
class BrokerHandler:
    def __init__(self, dispatcher: BrokerDispatcher, subscriber: BrokerSubscriber, concurrency: int = 5): ...
    async def run(self) -> NoReturn: ...

Message Broker System

HTTP Services

HTTP server functionality with REST support, request/response abstractions, routing, and connector patterns. Provides foundation for building HTTP APIs and web services.

class HttpPort:
    connector: HttpConnector
    async def start(self) -> None: ...
    async def stop(self) -> None: ...

class HttpRequest:
    user: Optional[UUID]
    headers: dict[str, str]
    content_type: str
    async def url_params(self, type_: Optional[type] = None) -> Any: ...
    async def query_params(self, type_: Optional[type] = None) -> Any: ...

class HttpResponse:
    def __init__(self, content_type: str = "application/json"): ...
    status: int
    async def content(self) -> Any: ...

HTTP Services

Request and Response Handling

Core request/response abstractions used across all transport types. Provides unified interfaces for handling different types of requests with content, parameters, and user context.

class Request:
    user: Optional[UUID]
    has_content: bool
    has_params: bool
    async def content(self) -> Any: ...
    async def params(self) -> dict[str, Any]: ...

class Response:
    def __init__(self, data: Any = None, status: int = 200): ...
    has_content: bool
    status: int
    async def content(self) -> Any: ...

class InMemoryRequest:
    def __init__(self, content: Any = None, params: dict[str, Any] = None, user: Optional[UUID] = None): ...

Request and Response Handling

Task Scheduling

Cron-based periodic task scheduling with async execution, lifecycle management, and integration with the decorator system. Supports complex scheduling patterns and error handling.

class CronTab:
    def __init__(self, pattern: Union[str, CrontTabImpl]): ...
    repetitions: Union[int, float]
    async def sleep_until_next(self) -> None: ...
    get_delay_until_next(self, now: Optional[datetime] = None) -> float: ...

class PeriodicTask:
    def __init__(self, crontab: Union[str, CronTab], fn: Callable): ...
    crontab: CronTab
    started: bool
    running: bool
    async def start(self) -> None: ...
    async def stop(self, timeout: Optional[float] = None) -> None: ...

class PeriodicPort:
    scheduler: PeriodicTaskScheduler

Task Scheduling

Service Discovery

Service registration and discovery mechanisms for microservice coordination. Supports multiple discovery backends and automatic service lifecycle management.

class DiscoveryClient:
    def __init__(self, host: str, port: int): ...
    route: str
    async def subscribe(self, host: str, port: int, name: str, endpoints: list[dict]) -> None: ...
    async def unsubscribe(self, name: str) -> None: ...

class DiscoveryConnector:
    def __init__(self, client: DiscoveryClient, name: str, endpoints: list[dict], host: str, port: Optional[int] = None): ...
    async def subscribe(self) -> None: ...
    async def unsubscribe(self) -> None: ...

Service Discovery

API Specification Services

Built-in services for generating OpenAPI and AsyncAPI specifications from decorated handlers. Provides automated API documentation and specification generation.

class OpenAPIService:
    def __init__(self, config: Config): ...
    spec: dict
    async def generate_specification(self, request: Request) -> Response: ...

class AsyncAPIService:
    def __init__(self, config: Config): ...
    spec: dict
    async def generate_specification(self, request: Request) -> Response: ...

API Specification Services

System and Utilities

System health endpoints, network utilities, and various helper functions for microservice operations.

class SystemService:
    async def check_health(self, request: Request) -> Response: ...

def get_host_ip() -> str: ...
def get_host_name() -> str: ...
def get_ip(name: str) -> str: ...
async def consume_queue(queue, max_count: int) -> None: ...

System and Utilities

Routing System

Abstract interfaces and concrete implementations for handling route registration and management across different transport types. Routers aggregate decorated handlers and provide structured access to routing information.

class Router:
    routes: dict[Any, Callable]
    def build_routes(self) -> None: ...
    def get_routes(self) -> dict[Any, Callable]: ...

class BrokerRouter(Router): ...
class HttpRouter(Router): ...
class RestHttpRouter(HttpRouter): ...
class PeriodicRouter(Router): ...

Routing System

Types

Core Types

from typing import Callable, Union, Optional, Any, Awaitable
from uuid import UUID
from datetime import datetime

Handler = Callable[[Request], Union[Optional[Response], Awaitable[Optional[Response]]]]
Checker = Callable[[Request], Union[Optional[bool], Awaitable[Optional[bool]]]]

class BrokerMessageV1Status(Enum):
    SUCCESS = 200
    ERROR = 400
    SYSTEM_ERROR = 500
    UNKNOWN = 600

class BrokerMessageV1Strategy(Enum):
    UNICAST = "unicast"
    MULTICAST = "multicast"

class EnrouteDecoratorKind(Enum):
    Command = auto()
    Query = auto()
    Event = auto()

Context Variables

from contextvars import ContextVar

REQUEST_USER_CONTEXT_VAR: ContextVar[Optional[UUID]]
REQUEST_HEADERS_CONTEXT_VAR: ContextVar[Optional[dict[str, str]]]
REQUEST_REPLY_TOPIC_CONTEXT_VAR: ContextVar[Optional[str]]

Usage Examples:

from minos.networks import REQUEST_USER_CONTEXT_VAR, REQUEST_HEADERS_CONTEXT_VAR

# Context variables are automatically managed by the framework
# Access current request user in any handler
current_user = REQUEST_USER_CONTEXT_VAR.get()
request_headers = REQUEST_HEADERS_CONTEXT_VAR.get()

Exception Types

class MinosNetworkException(MinosException): ...
class MinosDiscoveryConnectorException(MinosNetworkException): ...
class MinosInvalidDiscoveryClient(MinosNetworkException): ...
class MinosHandlerException(MinosNetworkException): ...
class MinosActionNotFoundException(MinosHandlerException): ...
class MinosHandlerNotFoundEnoughEntriesException(MinosHandlerException): ...
class NotSatisfiedCheckerException(MinosHandlerException): ...
class MinosEnrouteDecoratorException(MinosNetworkException): ...
class MinosMultipleEnrouteDecoratorKindsException(MinosEnrouteDecoratorException): ...
class MinosRedefinedEnrouteDecoratorException(MinosEnrouteDecoratorException): ...
class RequestException(MinosNetworkException): ...
class NotHasContentException(RequestException): ...
class NotHasParamsException(RequestException): ...
class ResponseException(MinosException):
    def __init__(self, status: int = 400): ...
    status: int