tessl/pypi-minos-microservice-networks

The networks core of the Minos Framework providing networking components for reactive microservices

Message Broker System

The message broker system provides publish/subscribe messaging with queuing, filtering, validation, and unicast or multicast delivery strategies. It includes in-memory implementations for testing and database-backed implementations for production use.

Capabilities

Core Message Classes

Base classes for broker messages with versioning support and metadata.

class BrokerMessage:
    topic: str
    identifier: UUID
    should_reply: bool
    reply_topic: Optional[str]
    version: int
    content: Any
    ok: bool
    status: int
    headers: dict[str, str]
    def set_reply_topic(self, value: Optional[str]) -> None: ...

class BrokerMessageV1(BrokerMessage):
    def __init__(self, topic: str, payload: BrokerMessageV1Payload, *, identifier: Optional[UUID] = None, reply_topic: Optional[str] = None, strategy: Optional[BrokerMessageV1Strategy] = None): ...
    topic: str
    identifier: UUID
    reply_topic: Optional[str]
    strategy: BrokerMessageV1Strategy
    payload: BrokerMessageV1Payload
    version: int = 1

class BrokerMessageV1Payload:
    def __init__(self, content: Any = None, headers: Optional[dict[str, str]] = None, status: Optional[int] = None): ...
    content: Any
    status: BrokerMessageV1Status
    headers: dict[str, str]
    ok: bool

Message Status and Strategy Enums

from enum import Enum

class BrokerMessageV1Status(Enum):
    SUCCESS = 200
    ERROR = 400
    SYSTEM_ERROR = 500
    UNKNOWN = 600

class BrokerMessageV1Strategy(Enum):
    UNICAST = "unicast"
    MULTICAST = "multicast"

Usage Examples:

from minos.networks import (
    BrokerMessageV1,
    BrokerMessageV1Payload,
    BrokerMessageV1Status,
    BrokerMessageV1Strategy,
)

# Create a message payload
payload = BrokerMessageV1Payload(
    content={"user_id": "123", "name": "John Doe"},
    headers={"content-type": "application/json"},
    status=BrokerMessageV1Status.SUCCESS
)

# Create a broker message
message = BrokerMessageV1(
    topic="user.created",
    payload=payload,
    strategy=BrokerMessageV1Strategy.MULTICAST
)

# Check message properties
print(f"Topic: {message.topic}")
print(f"Content: {message.content}")
print(f"Is OK: {message.ok}")
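
This section does not spell out how `ok` relates to `status`. One plausible reading, offered as an assumption rather than documented library behavior, is that a payload is `ok` exactly when its status is `SUCCESS`. The `Status` and `Payload` classes below are hypothetical stand-ins that only mirror the shapes listed above; they are not imported from `minos.networks`.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Any


class Status(Enum):
    """Hypothetical mirror of BrokerMessageV1Status."""
    SUCCESS = 200
    ERROR = 400
    SYSTEM_ERROR = 500
    UNKNOWN = 600


@dataclass
class Payload:
    """Hypothetical stand-in for BrokerMessageV1Payload."""
    content: Any = None
    headers: dict[str, str] = field(default_factory=dict)
    status: Status = Status.SUCCESS

    @property
    def ok(self) -> bool:
        # Assumption: only SUCCESS counts as ok.
        return self.status is Status.SUCCESS
```

Under that assumption, a consumer can branch on `ok` first and inspect `status` only when it needs to tell a caller error from a system error.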

Broker Client

High-level client interface for broker communication with sending and receiving capabilities.

class BrokerClient:
    def __init__(self, topic: str, publisher: BrokerPublisher, subscriber: BrokerSubscriber): ...
    topic: str
    publisher: BrokerPublisher
    subscriber: BrokerSubscriber
    @classmethod
    def _from_config(cls, config: Config, **kwargs) -> BrokerClient: ...
    async def send(self, message: BrokerMessage) -> None: ...
    async def receive(self, *args, **kwargs) -> BrokerMessage: ...
    async def receive_many(self, count: int, timeout: float = 60, **kwargs) -> AsyncIterator[BrokerMessage]: ...

class BrokerClientPool:
    def __init__(self, instance_kwargs: dict[str, Any], maxsize: int = 5): ...
    @classmethod
    def _from_config(cls, config: Config, **kwargs) -> BrokerClientPool: ...
    def acquire(self, *args, **kwargs) -> AsyncContextManager: ...

Usage Examples:

from minos.networks import BrokerClient, BrokerClientPool, BrokerMessageV1, BrokerMessageV1Payload
from minos.common import Config

# Create client from configuration
config = Config("config.yml")
client = BrokerClient._from_config(config, topic="user.events")

# Send a message
payload = BrokerMessageV1Payload(content={"user_id": "123"})
message = BrokerMessageV1("user.created", payload=payload)
await client.send(message)

# Receive messages
message = await client.receive()
print(f"Received: {message.content}")

# Receive multiple messages
async for message in client.receive_many(count=10, timeout=30):
    print(f"Processing: {message.content}")

# Using client pool
pool = BrokerClientPool._from_config(config, maxsize=10)
async with pool.acquire() as client:
    await client.send(message)
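
The behavior of `receive_many(count, timeout)` can be pictured with a plain `asyncio.Queue`. The exact timeout semantics are not stated in this section, so the sketch below assumes the timeout applies to each individual wait and that `asyncio.TimeoutError` propagates to the caller. The free function `receive_many` here is a hypothetical stand-in, not the client method.

```python
import asyncio
from typing import Any, AsyncIterator


async def receive_many(queue: asyncio.Queue, count: int, timeout: float = 60) -> AsyncIterator[Any]:
    """Yield exactly `count` items, waiting at most `timeout` seconds for each."""
    for _ in range(count):
        # asyncio.wait_for raises asyncio.TimeoutError if the wait expires.
        yield await asyncio.wait_for(queue.get(), timeout=timeout)


async def demo_receive_many() -> list[Any]:
    queue: asyncio.Queue = asyncio.Queue()
    for i in range(3):
        queue.put_nowait({"n": i})
    return [message async for message in receive_many(queue, count=3, timeout=1)]
```

Because the generator stops after `count` items, the consuming `async for` loop ends on its own; only a stalled source surfaces as a timeout.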

Publishers

Message publishers with various implementations for different use cases.

class BrokerPublisher:
    async def send(self, message: BrokerMessage) -> None: ...

class BrokerPublisherBuilder:
    """Builder for creating configured broker publishers with dependency injection"""
    def __init__(self, *, queue_builder: Optional[Builder] = None, queued_cls: Optional[type[QueuedBrokerPublisher]] = None): ...
    def with_queued_cls(self, queued_cls: type[QueuedBrokerPublisher]) -> BrokerPublisherBuilder: ...
    def with_config(self, config: Config) -> BrokerPublisherBuilder: ...
    def with_queue(self, queue: Union[type[BrokerPublisherQueue], Builder[BrokerPublisherQueue]]) -> BrokerPublisherBuilder: ...
    def with_kwargs(self, kwargs: dict[str, Any]) -> BrokerPublisherBuilder: ...
    def build(self) -> BrokerPublisher: ...

class InMemoryBrokerPublisher(BrokerPublisher):
    messages: list[BrokerMessage]
    async def _send(self, message: BrokerMessage) -> None: ...

class QueuedBrokerPublisher(BrokerPublisher):
    """Publisher with queue support for reliable delivery"""

class DatabaseBrokerPublisherQueue:
    """Database-backed publisher queue for persistence"""

class InMemoryBrokerPublisherQueue:
    """In-memory publisher queue for testing"""

Usage Examples:

from minos.networks import BrokerPublisherBuilder, InMemoryBrokerPublisher, QueuedBrokerPublisher

# Using in-memory publisher for testing
publisher = InMemoryBrokerPublisher()
await publisher.send(message)
print(f"Sent messages: {len(publisher.messages)}")

# Building a custom publisher
builder = BrokerPublisherBuilder()
publisher = (builder
    .with_config(config)
    .with_queued_cls(QueuedBrokerPublisher)
    .build())
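
An in-memory publisher is mainly useful as a test double: the code under test sends messages as usual, and the test inspects what was recorded. The sketch below shows that pattern with the standard library only. `RecordingPublisher` and `register_user` are hypothetical names used for illustration and do not come from `minos.networks`.

```python
import asyncio
from typing import Any


class RecordingPublisher:
    """Hypothetical test double: stores messages instead of sending them."""

    def __init__(self) -> None:
        self.messages: list[Any] = []

    async def send(self, message: Any) -> None:
        self.messages.append(message)


async def register_user(publisher: RecordingPublisher, name: str) -> None:
    # Code under test: emits one event per registered user.
    await publisher.send({"topic": "user.created", "content": {"name": name}})


async def demo_recording() -> list[Any]:
    publisher = RecordingPublisher()
    await register_user(publisher, "John Doe")
    return publisher.messages
```

A test can then assert on `publisher.messages` without a running broker.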

Subscribers

Message subscribers with filtering, validation, and queue support.

class BrokerSubscriber:
    def __init__(self, topics: Iterable[str]): ...
    topics: set[str]
    def __aiter__(self) -> AsyncIterator[BrokerMessage]: ...
    async def __anext__(self) -> BrokerMessage: ...
    async def receive(self) -> BrokerMessage: ...

class BrokerSubscriberBuilder:
    def __init__(self, *, validator_builder: Optional[Builder] = None, queue_builder: Optional[BrokerSubscriberQueueBuilder] = None, filtered_cls: Optional[type[FilteredBrokerSubscriber]] = None, queued_cls: Optional[type[QueuedBrokerSubscriber]] = None): ...
    def with_filtered_cls(self, filtered_cls: type[FilteredBrokerSubscriber]) -> BrokerSubscriberBuilder: ...
    def with_queued_cls(self, queued_cls: type[QueuedBrokerSubscriber]) -> BrokerSubscriberBuilder: ...
    def with_config(self, config: Config) -> BrokerSubscriberBuilder: ...
    def with_validator(self, validator: Union[type[BrokerSubscriberValidator], Builder[BrokerSubscriberValidator]]) -> BrokerSubscriberBuilder: ...
    def with_queue(self, queue: Union[type[BrokerSubscriberQueue], BrokerSubscriberQueueBuilder]) -> BrokerSubscriberBuilder: ...
    def with_group_id(self, group_id: Optional[str]) -> BrokerSubscriberBuilder: ...
    def with_remove_topics_on_destroy(self, remove_topics_on_destroy: bool) -> BrokerSubscriberBuilder: ...
    def with_topics(self, topics: Iterable[str]) -> BrokerSubscriberBuilder: ...
    def build(self) -> BrokerSubscriber: ...

class InMemoryBrokerSubscriber(BrokerSubscriber):
    """In-memory subscriber implementation for testing"""

class FilteredBrokerSubscriber(BrokerSubscriber):
    """Subscriber with message filtering and validation"""

class QueuedBrokerSubscriber(BrokerSubscriber):
    """Subscriber with queue support for reliable processing"""

Usage Examples:

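from minos.networks import BrokerSubscriberBuilder, FilteredBrokerSubscriber, InMemoryBrokerSubscriber

# Create subscriber for specific topics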
subscriber = InMemoryBrokerSubscriber(topics=["user.created", "user.updated"])

# Iterate over messages
async for message in subscriber:
    print(f"Received message: {message.content}")
    if message.topic == "user.created":
        # Handle user creation
        pass

# Build custom subscriber with validation
builder = BrokerSubscriberBuilder()
subscriber = (builder
    .with_topics(["user.*"])
    .with_filtered_cls(FilteredBrokerSubscriber)
    .with_group_id("user-service")
    .build())
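
Filtering on the subscriber side can be sketched as an async iterator that drops messages whose topic does not match. Whether the library treats a topic such as `user.*` as a wildcard is not stated in this section; the sketch assumes shell-style matching via `fnmatch` purely for illustration. `FilteredQueueSubscriber` is a hypothetical class, and the `None` sentinel is a convention of this sketch only.

```python
import asyncio
from fnmatch import fnmatchcase
from typing import Any, Iterable


class FilteredQueueSubscriber:
    """Hypothetical subscriber: async-iterates a queue, skipping unmatched topics."""

    def __init__(self, queue: asyncio.Queue, patterns: Iterable[str]) -> None:
        self._queue = queue
        self._patterns = set(patterns)

    def __aiter__(self) -> "FilteredQueueSubscriber":
        return self

    async def __anext__(self) -> dict[str, Any]:
        while True:
            message = await self._queue.get()
            if message is None:  # sentinel used by this sketch to end iteration
                raise StopAsyncIteration
            if any(fnmatchcase(message["topic"], p) for p in self._patterns):
                return message


async def demo_filtered() -> list[str]:
    queue: asyncio.Queue = asyncio.Queue()
    for topic in ("user.created", "order.created", "user.updated"):
        queue.put_nowait({"topic": topic})
    queue.put_nowait(None)
    return [m["topic"] async for m in FilteredQueueSubscriber(queue, ["user.*"])]
```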

Message Handlers

Handler services for processing broker messages with concurrency control.

class BrokerHandler:
    def __init__(self, dispatcher: BrokerDispatcher, subscriber: BrokerSubscriber, concurrency: int = 5): ...
    @classmethod
    def _from_config(cls, config: Config, **kwargs) -> BrokerHandler: ...
    async def run(self) -> NoReturn: ...

class BrokerPort:
    handler: BrokerHandler
    async def _start(self) -> None: ...
    async def _stop(self, err: Optional[Exception] = None) -> None: ...

class BrokerHandlerService:
    """Deprecated - use BrokerPort instead"""

Usage Examples:

from minos.networks import BrokerHandler, BrokerPort

# Create handler with dispatcher and subscriber
handler = BrokerHandler(
    dispatcher=dispatcher,
    subscriber=subscriber,
    concurrency=10
)

# Run handler (blocks until cancelled)
await handler.run()

# Using BrokerPort for lifecycle management
port = BrokerPort._from_config(config)
await port.start()
# ... service runs
await port.stop()
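
The `concurrency` argument caps how many messages are processed at once. How the handler enforces that cap internally is not described here; a common way to get the same effect, shown below as an assumption, is an `asyncio.Semaphore` acquired before each handler task is started. `run_bounded` and the `None` sentinel are hypothetical and exist only in this sketch.

```python
import asyncio
from typing import Awaitable, Callable


async def run_bounded(
    queue: asyncio.Queue,
    handle: Callable[[object], Awaitable[None]],
    concurrency: int = 5,
) -> None:
    """Process queue items until a None sentinel, at most `concurrency` at a time."""
    semaphore = asyncio.Semaphore(concurrency)
    tasks: set[asyncio.Task] = set()

    async def guarded(item: object) -> None:
        try:
            await handle(item)
        finally:
            semaphore.release()  # free the slot even if the handler fails

    while True:
        item = await queue.get()
        if item is None:
            break
        await semaphore.acquire()  # blocks once `concurrency` handlers are in flight
        tasks.add(asyncio.create_task(guarded(item)))
    await asyncio.gather(*tasks)


async def demo_bounded() -> tuple[int, int]:
    queue: asyncio.Queue = asyncio.Queue()
    for i in range(6):
        queue.put_nowait(i)
    queue.put_nowait(None)
    active = peak = done = 0

    async def handle(_item: object) -> None:
        nonlocal active, peak, done
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.01)
        active -= 1
        done += 1

    await run_bounded(queue, handle, concurrency=2)
    return peak, done  # peak in-flight handlers, total processed
```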

Message Dispatchers

Dispatchers that route messages to appropriate handler functions.

class BrokerDispatcher:
    def __init__(self, actions: dict[str, Optional[Callable]], publisher: BrokerPublisher): ...
    @classmethod
    def _from_config(cls, config: Config, **kwargs) -> BrokerDispatcher: ...
    publisher: BrokerPublisher
    actions: dict[str, Optional[Callable]]
    async def dispatch(self, message: BrokerMessage) -> None: ...
    def get_action(self, topic: str) -> Callable: ...
    @staticmethod
    def get_callback(fn: Callable) -> Callable: ...

class BrokerRequest:
    def __init__(self, raw: BrokerMessage): ...
    raw: BrokerMessage
    user: Optional[UUID]
    headers: dict[str, str]
    has_content: bool
    has_params: bool
    async def _content(self, **kwargs) -> Any: ...

class BrokerResponse:
    """Response class for broker handlers"""

class BrokerResponseException:
    """Exception class for broker response errors"""

Usage Examples:

from minos.networks import BrokerDispatcher, BrokerMessageV1

# Create dispatcher with topic mappings
actions = {
    "user.create": create_user_handler,
    "user.update": update_user_handler,
    "user.delete": delete_user_handler
}

dispatcher = BrokerDispatcher(actions=actions, publisher=publisher)

# Dispatch a message
message = BrokerMessageV1(topic="user.create", payload=payload)
await dispatcher.dispatch(message)

# Get handler for a topic
handler = dispatcher.get_action("user.create")
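
Routing by topic comes down to a dictionary lookup followed by a call. The sketch below illustrates that idea with a hypothetical `TopicRouter`; it is not the library's dispatcher, and raising `KeyError` for a missing or `None` action is an assumption of this sketch.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional

Action = Callable[[Any], Awaitable[Any]]


class TopicRouter:
    """Hypothetical router: maps a topic to an async callable."""

    def __init__(self, actions: dict[str, Optional[Action]]) -> None:
        self.actions = actions

    def get_action(self, topic: str) -> Action:
        action = self.actions.get(topic)
        if action is None:
            # Assumption: a missing or None entry is treated as an error.
            raise KeyError(f"no action registered for topic {topic!r}")
        return action

    async def dispatch(self, topic: str, content: Any) -> Any:
        return await self.get_action(topic)(content)


async def create_user_action(content: dict) -> dict:
    return {"created": content["name"]}


router = TopicRouter({"user.create": create_user_action, "user.delete": None})
```

Calling `asyncio.run(router.dispatch("user.create", {"name": "John"}))` runs `create_user_action` with the given content, while `router.get_action("user.delete")` raises because that topic maps to `None`.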

Queue Management

Queue implementations for reliable message processing.

class BrokerQueue:
    """Abstract base for broker queues"""

class DatabaseBrokerQueue(BrokerQueue):
    """Database-backed queue for persistence"""

class InMemoryBrokerQueue(BrokerQueue):
    """In-memory queue for testing"""

class BrokerSubscriberQueue:
    """Queue specifically for subscriber implementations"""

class BrokerSubscriberValidator:
    """Validator for subscriber message processing"""

class BrokerSubscriberDuplicateValidator:
    """Prevents duplicate message processing"""
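
Duplicate prevention generally means remembering which messages have already been seen. The sketch below assumes a message is identified by its topic and `identifier`, which this section does not confirm, and keeps the seen set in memory. `SeenIdentifiers` is a hypothetical class for illustration.

```python
from uuid import UUID, uuid4


class SeenIdentifiers:
    """Hypothetical duplicate check keyed on (topic, identifier)."""

    def __init__(self) -> None:
        self._seen: set[tuple[str, UUID]] = set()

    def is_unique(self, topic: str, identifier: UUID) -> bool:
        """Return True the first time a (topic, identifier) pair is offered."""
        key = (topic, identifier)
        if key in self._seen:
            return False
        self._seen.add(key)
        return True


checker = SeenIdentifiers()
message_id = uuid4()
first = checker.is_unique("user.created", message_id)   # first delivery
second = checker.is_unique("user.created", message_id)  # redelivery of the same message
```

An in-memory set grows without bound and is lost on restart, which is one reason a persistent store is the more natural choice outside tests.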

Context Variables

Context variables for passing request metadata across async boundaries.

from contextvars import ContextVar

REQUEST_HEADERS_CONTEXT_VAR: ContextVar[Optional[dict[str, str]]]
REQUEST_REPLY_TOPIC_CONTEXT_VAR: ContextVar[Optional[str]]

Usage Examples:

from minos.networks import REQUEST_HEADERS_CONTEXT_VAR, REQUEST_REPLY_TOPIC_CONTEXT_VAR

# Set context variables
headers = {"user-id": "123", "trace-id": "abc"}
REQUEST_HEADERS_CONTEXT_VAR.set(headers)
REQUEST_REPLY_TOPIC_CONTEXT_VAR.set("user.created.reply")

# Access in handler functions
def my_handler(request):
    headers = REQUEST_HEADERS_CONTEXT_VAR.get({})
    reply_topic = REQUEST_REPLY_TOPIC_CONTEXT_VAR.get()
    # Use context data
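
The reason context variables suit per-request metadata is that each asyncio task runs in its own copy of the context, so concurrent handlers do not overwrite each other's values. The standard-library sketch below demonstrates this with a hypothetical `HEADERS` variable, which is separate from the library's `REQUEST_HEADERS_CONTEXT_VAR`.

```python
import asyncio
from contextvars import ContextVar
from typing import Optional

# Hypothetical variable for this sketch; not the library's own object.
HEADERS: ContextVar[Optional[dict[str, str]]] = ContextVar("headers", default=None)


async def read_headers() -> Optional[dict[str, str]]:
    # Awaited directly, so it sees the caller's context.
    return HEADERS.get()


async def handle(trace_id: str) -> Optional[dict[str, str]]:
    HEADERS.set({"trace-id": trace_id})
    await asyncio.sleep(0)  # let the other task run in between
    return await read_headers()


async def demo_context() -> list:
    # Each coroutine passed to gather() becomes a task with its own context copy.
    results = await asyncio.gather(handle("abc"), handle("xyz"))
    return [results, HEADERS.get()]
```

Each handler reads back its own headers, and the value set inside the tasks does not leak into the caller's context.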

Advanced Usage

Complete Broker Service Setup

from minos.networks import (
    BrokerClient, BrokerDispatcher, BrokerHandler, BrokerPublisher,
    BrokerRequest, BrokerResponse, BrokerSubscriber, enroute
)
from minos.common import Config

class UserService:
    @enroute.broker.command("user.create")
    async def create_user(self, request: BrokerRequest) -> BrokerResponse:
        user_data = await request.content()
        # Create user logic
        return BrokerResponse({"id": "123", "status": "created"})
    
    @enroute.broker.event("user.created")
    async def handle_user_created(self, request: BrokerRequest) -> BrokerResponse:
        event_data = await request.content()
        # Handle event
        return BrokerResponse({"processed": True})

# Setup broker infrastructure
config = Config("config.yml")
publisher = BrokerPublisher._from_config(config)
subscriber = BrokerSubscriber._from_config(config, topics=["user.*"])

# Create dispatcher with service handlers
from minos.networks import EnrouteFactory
factory = EnrouteFactory(UserService)
actions = factory.get_broker_command_query_event()

dispatcher = BrokerDispatcher(actions=actions, publisher=publisher)
handler = BrokerHandler(dispatcher=dispatcher, subscriber=subscriber, concurrency=5)

# Run the handler
await handler.run()

Message Publishing Patterns

# Simple publish
client = BrokerClient._from_config(config, topic="notifications")
message = BrokerMessageV1("email.send", payload=email_payload)
await client.send(message)

# Request-reply pattern
reply_message = BrokerMessageV1(
    topic="user.get",
    payload=query_payload,
    reply_topic="user.get.reply"
)
await client.send(reply_message)

# Listen for reply
reply_client = BrokerClient._from_config(config, topic="user.get.reply")
response = await reply_client.receive()
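
When several requests share one reply topic, the caller needs a way to pair each reply with its request. This section does not say whether replies carry the request's identifier; the sketch below assumes they do and uses that identifier as a correlation key. `ReplyWaiter` is a hypothetical helper built on the standard library only.

```python
import asyncio
from uuid import UUID, uuid4


class ReplyWaiter:
    """Hypothetical helper that pairs replies with requests by identifier."""

    def __init__(self) -> None:
        self._pending: dict[UUID, asyncio.Future] = {}

    def expect(self, identifier: UUID) -> asyncio.Future:
        """Register a request and return a future for its reply."""
        future = asyncio.get_running_loop().create_future()
        self._pending[identifier] = future
        return future

    def resolve(self, identifier: UUID, content: object) -> bool:
        """Deliver a reply; return False if no request is waiting for it."""
        future = self._pending.pop(identifier, None)
        if future is None or future.done():
            return False
        future.set_result(content)
        return True


async def demo_reply() -> object:
    waiter = ReplyWaiter()
    request_id = uuid4()
    future = waiter.expect(request_id)
    waiter.resolve(uuid4(), "stray reply")        # ignored: no matching request
    waiter.resolve(request_id, {"name": "John"})  # matches the pending request
    return await asyncio.wait_for(future, timeout=1)
```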

Error Handling

from minos.networks import BrokerRequest, BrokerResponse, BrokerResponseException, enroute

@enroute.broker.command("user.create")
async def create_user(request: BrokerRequest) -> BrokerResponse:
    try:
        user_data = await request.content()
        if not user_data.get("email"):
            raise BrokerResponseException("Email is required", status=400)
        # Process creation
        return BrokerResponse({"status": "created"})
    except BrokerResponseException:
        # Re-raise validation errors unchanged so their status is preserved
        raise
    except Exception as e:
        # Wrap unexpected failures as a system error
        raise BrokerResponseException(f"Creation failed: {e}", status=500) from e
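
The order of `except` clauses decides which status a caller sees: a broad `except Exception` placed alone would also catch a deliberately raised 400 error and turn it into a 500. The runnable sketch below shows the pattern with a hypothetical `ResponseError` class standing in for `BrokerResponseException`; `create_user_checked` is likewise illustrative only.

```python
class ResponseError(Exception):
    """Hypothetical stand-in for BrokerResponseException."""

    def __init__(self, message: str, status: int = 400) -> None:
        super().__init__(message)
        self.status = status


def create_user_checked(data: dict) -> dict:
    try:
        if not data.get("email"):
            raise ResponseError("Email is required", status=400)
        # .lower() fails with AttributeError if email is not a string
        return {"status": "created", "email": data["email"].lower()}
    except ResponseError:
        raise  # keep the original status
    except Exception as exc:
        raise ResponseError(f"Creation failed: {exc}", status=500) from exc
```

A missing email surfaces with status 400, while an unexpected failure such as a non-string email is reported with status 500.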
