The networks core of the Minos Framework providing networking components for reactive microservices
—
The message broker system provides comprehensive publish/subscribe messaging with queuing, filtering, validation, and multiple delivery strategies. It supports both in-memory implementations for testing and database-backed implementations for production use.
Base classes for broker messages with versioning support and metadata.
class BrokerMessage:
topic: str
identifier: UUID
should_reply: bool
reply_topic: Optional[str]
version: int
content: Any
ok: bool
status: int
headers: dict[str, str]
def set_reply_topic(self, value: Optional[str]) -> None: ...
class BrokerMessageV1(BrokerMessage):
def __init__(self, topic: str, payload: BrokerMessageV1Payload, *, identifier: Optional[UUID] = None, reply_topic: Optional[str] = None, strategy: Optional[BrokerMessageV1Strategy] = None): ...
topic: str
identifier: UUID
reply_topic: Optional[str]
strategy: BrokerMessageV1Strategy
payload: BrokerMessageV1Payload
version: int = 1
class BrokerMessageV1Payload:
def __init__(self, content: Any = None, headers: Optional[dict[str, str]] = None, status: Optional[int] = None): ...
content: Any
status: BrokerMessageV1Status
headers: dict[str, str]
ok: bool
from enum import Enum
class BrokerMessageV1Status(Enum):
SUCCESS = 200
ERROR = 400
SYSTEM_ERROR = 500
UNKNOWN = 600
class BrokerMessageV1Strategy(Enum):
UNICAST = "unicast"
MULTICAST = "multicast"
Usage Examples:
from minos.networks import BrokerMessageV1, BrokerMessageV1Payload, BrokerMessageV1Status, BrokerMessageV1Strategy
# Create a message payload
payload = BrokerMessageV1Payload(
content={"user_id": "123", "name": "John Doe"},
headers={"content-type": "application/json"},
status=BrokerMessageV1Status.SUCCESS
)
# Create a broker message
message = BrokerMessageV1(
topic="user.created",
payload=payload,
strategy=BrokerMessageV1Strategy.MULTICAST
)
# Check message properties
print(f"Topic: {message.topic}")
print(f"Content: {message.content}")
print(f"Is OK: {message.ok}")
High-level client interface for broker communication with sending and receiving capabilities.
class BrokerClient:
def __init__(self, topic: str, publisher: BrokerPublisher, subscriber: BrokerSubscriber): ...
topic: str
publisher: BrokerPublisher
subscriber: BrokerSubscriber
@classmethod
def _from_config(cls, config: Config, **kwargs) -> BrokerClient: ...
async def send(self, message: BrokerMessage) -> None: ...
async def receive(self, *args, **kwargs) -> BrokerMessage: ...
async def receive_many(self, count: int, timeout: float = 60, **kwargs) -> AsyncIterator[BrokerMessage]: ...
class BrokerClientPool:
def __init__(self, instance_kwargs: dict[str, Any], maxsize: int = 5): ...
@classmethod
def _from_config(cls, config: Config, **kwargs) -> BrokerClientPool: ...
def acquire(self, *args, **kwargs) -> AsyncContextManager: ...
Usage Examples:
from minos.networks import BrokerClient, BrokerMessageV1
from minos.common import Config
# Create client from configuration
config = Config("config.yml")
client = BrokerClient._from_config(config, topic="user.events")
# Send a message
message = BrokerMessageV1("user.created", payload=payload)
await client.send(message)
# Receive messages
message = await client.receive()
print(f"Received: {message.content}")
# Receive multiple messages
async for message in client.receive_many(count=10, timeout=30):
print(f"Processing: {message.content}")
# Using client pool
pool = BrokerClientPool._from_config(config, maxsize=10)
async with pool.acquire() as client:
await client.send(message)
Message publishers with various implementations for different use cases.
class BrokerPublisher:
async def send(self, message: BrokerMessage) -> None: ...
class BrokerPublisherBuilder:
def __init__(self, *, queue_builder: Optional[Builder] = None, queued_cls: Optional[type[QueuedBrokerPublisher]] = None): ...
def with_queued_cls(self, queued_cls: type[QueuedBrokerPublisher]) -> BrokerPublisherBuilder: ...
def with_config(self, config: Config) -> BrokerPublisherBuilder: ...
def with_queue(self, queue: Union[type[BrokerPublisherQueue], Builder[BrokerPublisherQueue]]) -> BrokerPublisherBuilder: ...
def with_kwargs(self, kwargs: dict[str, Any]) -> BrokerPublisherBuilder: ...
def build(self) -> BrokerPublisher: ...
class InMemoryBrokerPublisher(BrokerPublisher):
messages: list[BrokerMessage]
async def _send(self, message: BrokerMessage) -> None: ...
class QueuedBrokerPublisher(BrokerPublisher):
"""Publisher with queue support for reliable delivery"""
class DatabaseBrokerPublisherQueue:
"""Database-backed publisher queue for persistence"""
class InMemoryBrokerPublisherQueue:
"""In-memory publisher queue for testing"""
class BrokerPublisherBuilder:
"""Builder for creating configured broker publishers with dependency injection"""
def with_config(self, config: Config) -> BrokerPublisherBuilder: ...
def with_queued_cls(self, queued_cls: type[QueuedBrokerPublisher]) -> BrokerPublisherBuilder: ...
def build(self) -> BrokerPublisher: ...
Usage Examples:
# Using in-memory publisher for testing
publisher = InMemoryBrokerPublisher()
await publisher.send(message)
print(f"Sent messages: {len(publisher.messages)}")
# Building a custom publisher
builder = BrokerPublisherBuilder()
publisher = (builder
.with_config(config)
.with_queued_cls(QueuedBrokerPublisher)
.build())
Message subscribers with filtering, validation, and queue support.
class BrokerSubscriber:
def __init__(self, topics: Iterable[str]): ...
topics: set[str]
def __aiter__(self) -> AsyncIterator[BrokerMessage]: ...
async def __anext__(self) -> BrokerMessage: ...
async def receive(self) -> BrokerMessage: ...
class BrokerSubscriberBuilder:
def __init__(self, *, validator_builder: Optional[Builder] = None, queue_builder: Optional[BrokerSubscriberQueueBuilder] = None, filtered_cls: Optional[type[FilteredBrokerSubscriber]] = None, queued_cls: Optional[type[QueuedBrokerSubscriber]] = None): ...
def with_filtered_cls(self, filtered_cls: type[FilteredBrokerSubscriber]) -> BrokerSubscriberBuilder: ...
def with_queued_cls(self, queued_cls: type[QueuedBrokerSubscriber]) -> BrokerSubscriberBuilder: ...
def with_config(self, config: Config) -> BrokerSubscriberBuilder: ...
def with_validator(self, validator: Union[type[BrokerSubscriberValidator], Builder[BrokerSubscriberValidator]]) -> BrokerSubscriberBuilder: ...
def with_queue(self, queue: Union[type[BrokerSubscriberQueue], BrokerSubscriberQueueBuilder]) -> BrokerSubscriberBuilder: ...
def with_group_id(self, group_id: Optional[str]) -> BrokerSubscriberBuilder: ...
def with_remove_topics_on_destroy(self, remove_topics_on_destroy: bool) -> BrokerSubscriberBuilder: ...
def with_topics(self, topics: Iterable[str]) -> BrokerSubscriberBuilder: ...
def build(self) -> BrokerSubscriber: ...
class InMemoryBrokerSubscriber(BrokerSubscriber):
"""In-memory subscriber implementation for testing"""
class FilteredBrokerSubscriber(BrokerSubscriber):
"""Subscriber with message filtering and validation"""
class QueuedBrokerSubscriber(BrokerSubscriber):
"""Subscriber with queue support for reliable processing"""
Usage Examples:
# Create subscriber for specific topics
subscriber = InMemoryBrokerSubscriber(topics=["user.created", "user.updated"])
# Iterate over messages
async for message in subscriber:
print(f"Received message: {message.content}")
if message.topic == "user.created":
# Handle user creation
pass
# Build custom subscriber with validation
builder = BrokerSubscriberBuilder()
subscriber = (builder
.with_topics(["user.*"])
.with_filtered_cls(FilteredBrokerSubscriber)
.with_group_id("user-service")
.build())
Handler services for processing broker messages with concurrency control.
class BrokerHandler:
def __init__(self, dispatcher: BrokerDispatcher, subscriber: BrokerSubscriber, concurrency: int = 5): ...
@classmethod
def _from_config(cls, config: Config, **kwargs) -> BrokerHandler: ...
async def run(self) -> NoReturn: ...
class BrokerPort:
handler: BrokerHandler
async def _start(self) -> None: ...
async def _stop(self, err: Exception = None) -> None: ...
class BrokerHandlerService:
"""Deprecated - use BrokerPort instead"""
Usage Examples:
# Create handler with dispatcher and subscriber
handler = BrokerHandler(
dispatcher=dispatcher,
subscriber=subscriber,
concurrency=10
)
# Run handler (blocks until cancelled)
await handler.run()
# Using BrokerPort for lifecycle management
port = BrokerPort._from_config(config)
await port.start()
# ... service runs
await port.stop()
Dispatchers that route messages to appropriate handler functions.
class BrokerDispatcher:
def __init__(self, actions: dict[str, Optional[Callable]], publisher: BrokerPublisher): ...
@classmethod
def _from_config(cls, config: Config, **kwargs) -> BrokerDispatcher: ...
publisher: BrokerPublisher
actions: dict[str, Optional[Callable]]
async def dispatch(self, message: BrokerMessage) -> None: ...
def get_action(self, topic: str) -> Callable: ...
@staticmethod
def get_callback(fn: Callable) -> Callable: ...
class BrokerRequest:
def __init__(self, raw: BrokerMessage): ...
raw: BrokerMessage
user: Optional[UUID]
headers: dict[str, str]
has_content: bool
has_params: bool
async def _content(self, **kwargs) -> Any: ...
class BrokerResponse:
"""Response class for broker handlers"""
class BrokerResponseException:
"""Exception class for broker response errors"""
Usage Examples:
# Create dispatcher with topic mappings
actions = {
"user.create": create_user_handler,
"user.update": update_user_handler,
"user.delete": delete_user_handler
}
dispatcher = BrokerDispatcher(actions=actions, publisher=publisher)
# Dispatch a message
message = BrokerMessageV1(topic="user.create", payload=payload)
await dispatcher.dispatch(message)
# Get handler for a topic
handler = dispatcher.get_action("user.create")
Queue implementations for reliable message processing.
class BrokerQueue:
"""Abstract base for broker queues"""
class DatabaseBrokerQueue(BrokerQueue):
"""Database-backed queue for persistence"""
class InMemoryBrokerQueue(BrokerQueue):
"""In-memory queue for testing"""
class BrokerSubscriberQueue:
"""Queue specifically for subscriber implementations"""
class BrokerSubscriberValidator:
"""Validator for subscriber message processing"""
class BrokerSubscriberDuplicateValidator:
"""Prevents duplicate message processing"""
Context variables for passing request metadata across async boundaries.
from contextvars import ContextVar
REQUEST_HEADERS_CONTEXT_VAR: ContextVar[Optional[dict[str, str]]]
REQUEST_REPLY_TOPIC_CONTEXT_VAR: ContextVar[Optional[str]]
Usage Examples:
from minos.networks import REQUEST_HEADERS_CONTEXT_VAR, REQUEST_REPLY_TOPIC_CONTEXT_VAR
# Set context variables
headers = {"user-id": "123", "trace-id": "abc"}
REQUEST_HEADERS_CONTEXT_VAR.set(headers)
REQUEST_REPLY_TOPIC_CONTEXT_VAR.set("user.created.reply")
# Access in handler functions
def my_handler(request):
headers = REQUEST_HEADERS_CONTEXT_VAR.get({})
reply_topic = REQUEST_REPLY_TOPIC_CONTEXT_VAR.get()
# Use context data
from minos.networks import (
BrokerHandler, BrokerDispatcher, BrokerSubscriber,
BrokerPublisher, BrokerClient, BrokerRequest, BrokerResponse, enroute
)
from minos.common import Config
class UserService:
@enroute.broker.command("user.create")
async def create_user(self, request: BrokerRequest) -> BrokerResponse:
user_data = await request.content()
# Create user logic
return BrokerResponse({"id": "123", "status": "created"})
@enroute.broker.event("user.created")
async def handle_user_created(self, request: BrokerRequest) -> BrokerResponse:
event_data = await request.content()
# Handle event
return BrokerResponse({"processed": True})
# Setup broker infrastructure
config = Config("config.yml")
publisher = BrokerPublisher._from_config(config)
subscriber = BrokerSubscriber._from_config(config, topics=["user.*"])
# Create dispatcher with service handlers
from minos.networks import EnrouteFactory
factory = EnrouteFactory(UserService)
actions = factory.get_broker_command_query_event()
dispatcher = BrokerDispatcher(actions=actions, publisher=publisher)
handler = BrokerHandler(dispatcher=dispatcher, subscriber=subscriber, concurrency=5)
# Run the handler
await handler.run()
# Simple publish
client = BrokerClient._from_config(config, topic="notifications")
message = BrokerMessageV1("email.send", payload=email_payload)
await client.send(message)
# Request-reply pattern
reply_message = BrokerMessageV1(
topic="user.get",
payload=query_payload,
reply_topic="user.get.reply"
)
await client.send(reply_message)
# Listen for reply
reply_client = BrokerClient._from_config(config, topic="user.get.reply")
response = await reply_client.receive()
from minos.networks import BrokerResponseException
@enroute.broker.command("user.create")
async def create_user(request: BrokerRequest) -> BrokerResponse:
try:
user_data = await request.content()
if not user_data.get("email"):
raise BrokerResponseException("Email is required", status=400)
# Process creation
return BrokerResponse({"status": "created"})
except Exception as e:
raise BrokerResponseException(f"Creation failed: {e}", status=500)
Install with Tessl CLI
npx tessl i tessl/pypi-minos-microservice-networks