0
# Message Broker System
1
2
The message broker system provides comprehensive publish/subscribe messaging with queuing, filtering, validation, and multiple delivery strategies. It supports both in-memory implementations for testing and database-backed implementations for production use.
3
4
## Capabilities
5
6
### Core Message Classes
7
8
Base classes for broker messages with versioning support and metadata.
9
10
```python { .api }
11
class BrokerMessage:
12
topic: str
13
identifier: UUID
14
should_reply: bool
15
reply_topic: Optional[str]
16
version: int
17
content: Any
18
ok: bool
19
status: int
20
headers: dict[str, str]
21
def set_reply_topic(self, value: Optional[str]) -> None: ...
22
23
class BrokerMessageV1(BrokerMessage):
24
def __init__(self, topic: str, payload: BrokerMessageV1Payload, *, identifier: Optional[UUID] = None, strategy: Optional[BrokerMessageV1Strategy] = None): ...
25
topic: str
26
identifier: UUID
27
reply_topic: Optional[str]
28
strategy: BrokerMessageV1Strategy
29
payload: BrokerMessageV1Payload
30
version: int = 1
31
32
class BrokerMessageV1Payload:
33
def __init__(self, content: Any = None, headers: Optional[dict[str, str]] = None, status: Optional[int] = None): ...
34
content: Any
35
status: BrokerMessageV1Status
36
headers: dict[str, str]
37
ok: bool
38
```
39
40
### Message Status and Strategy Enums
41
42
```python { .api }
43
from enum import Enum
44
45
class BrokerMessageV1Status(Enum):
46
SUCCESS = 200
47
ERROR = 400
48
SYSTEM_ERROR = 500
49
UNKNOWN = 600
50
51
class BrokerMessageV1Strategy(Enum):
52
UNICAST = "unicast"
53
MULTICAST = "multicast"
54
```
55
56
**Usage Examples:**
57
58
```python
59
from minos.networks import BrokerMessageV1, BrokerMessageV1Payload, BrokerMessageV1Status, BrokerMessageV1Strategy
60
61
# Create a message payload
62
payload = BrokerMessageV1Payload(
63
content={"user_id": "123", "name": "John Doe"},
64
headers={"content-type": "application/json"},
65
status=BrokerMessageV1Status.SUCCESS
66
)
67
68
# Create a broker message
69
message = BrokerMessageV1(
70
topic="user.created",
71
payload=payload,
72
strategy=BrokerMessageV1Strategy.MULTICAST
73
)
74
75
# Check message properties
76
print(f"Topic: {message.topic}")
77
print(f"Content: {message.content}")
78
print(f"Is OK: {message.ok}")
79
```
80
81
### Broker Client
82
83
High-level client interface for broker communication with sending and receiving capabilities.
84
85
```python { .api }
86
class BrokerClient:
87
def __init__(self, topic: str, publisher: BrokerPublisher, subscriber: BrokerSubscriber): ...
88
topic: str
89
publisher: BrokerPublisher
90
subscriber: BrokerSubscriber
91
@classmethod
92
def _from_config(cls, config: Config, **kwargs) -> BrokerClient: ...
93
async def send(self, message: BrokerMessage) -> None: ...
94
async def receive(self, *args, **kwargs) -> BrokerMessage: ...
95
async def receive_many(self, count: int, timeout: float = 60, **kwargs) -> AsyncIterator[BrokerMessage]: ...
96
97
class BrokerClientPool:
98
def __init__(self, instance_kwargs: dict[str, Any], maxsize: int = 5): ...
99
@classmethod
100
def _from_config(cls, config: Config, **kwargs) -> BrokerClientPool: ...
101
def acquire(self, *args, **kwargs) -> AsyncContextManager: ...
102
```
103
104
**Usage Examples:**
105
106
```python
107
from minos.networks import BrokerClient, BrokerClientPool, BrokerMessageV1
108
from minos.common import Config
109
110
# Create client from configuration
111
config = Config("config.yml")
112
client = BrokerClient._from_config(config, topic="user.events")
113
114
# Send a message
115
message = BrokerMessageV1("user.created", payload=payload)
116
await client.send(message)
117
118
# Receive messages
119
message = await client.receive()
120
print(f"Received: {message.content}")
121
122
# Receive multiple messages
123
async for message in client.receive_many(count=10, timeout=30):
124
print(f"Processing: {message.content}")
125
126
# Using client pool
127
pool = BrokerClientPool._from_config(config, maxsize=10)
128
async with pool.acquire() as client:
129
await client.send(message)
130
```
131
132
### Publishers
133
134
Message publishers with various implementations for different use cases.
135
136
```python { .api }
137
class BrokerPublisher:
138
async def send(self, message: BrokerMessage) -> None: ...
139
140
class BrokerPublisherBuilder:
141
def __init__(self, *, queue_builder: Optional[Builder] = None, queued_cls: Optional[type[QueuedBrokerPublisher]] = None): ...
142
def with_queued_cls(self, queued_cls: type[QueuedBrokerPublisher]) -> BrokerPublisherBuilder: ...
143
def with_config(self, config: Config) -> BrokerPublisherBuilder: ...
144
def with_queue(self, queue: Union[type[BrokerPublisherQueue], Builder[BrokerPublisherQueue]]) -> BrokerPublisherBuilder: ...
145
def with_kwargs(self, kwargs: dict[str, Any]) -> BrokerPublisherBuilder: ...
146
def build(self) -> BrokerPublisher: ...
147
148
class InMemoryBrokerPublisher(BrokerPublisher):
149
messages: list[BrokerMessage]
150
async def _send(self, message: BrokerMessage) -> None: ...
151
152
class QueuedBrokerPublisher(BrokerPublisher):
153
"""Publisher with queue support for reliable delivery"""
154
155
class DatabaseBrokerPublisherQueue:
156
"""Database-backed publisher queue for persistence"""
157
158
class InMemoryBrokerPublisherQueue:
159
"""In-memory publisher queue for testing"""
160
161
class BrokerPublisherBuilder:
162
"""Builder for creating configured broker publishers with dependency injection"""
163
def with_config(self, config: Config) -> BrokerPublisherBuilder: ...
164
def with_queued_cls(self, queued_cls: type[QueuedBrokerPublisher]) -> BrokerPublisherBuilder: ...
165
def build(self) -> BrokerPublisher: ...
166
```
167
168
**Usage Examples:**
169
170
```python
171
# Using in-memory publisher for testing
172
publisher = InMemoryBrokerPublisher()
173
await publisher.send(message)
174
print(f"Sent messages: {len(publisher.messages)}")
175
176
# Building a custom publisher
177
builder = BrokerPublisherBuilder()
178
publisher = (builder
179
.with_config(config)
180
.with_queued_cls(QueuedBrokerPublisher)
181
.build())
182
```
183
184
### Subscribers
185
186
Message subscribers with filtering, validation, and queue support.
187
188
```python { .api }
189
class BrokerSubscriber:
190
def __init__(self, topics: Iterable[str]): ...
191
topics: set[str]
192
def __aiter__(self) -> AsyncIterator[BrokerMessage]: ...
193
async def __anext__(self) -> BrokerMessage: ...
194
async def receive(self) -> BrokerMessage: ...
195
196
class BrokerSubscriberBuilder:
197
def __init__(self, *, validator_builder: Optional[Builder] = None, queue_builder: Optional[BrokerSubscriberQueueBuilder] = None, filtered_cls: Optional[type[FilteredBrokerSubscriber]] = None, queued_cls: Optional[type[QueuedBrokerSubscriber]] = None): ...
198
def with_filtered_cls(self, filtered_cls: type[FilteredBrokerSubscriber]) -> BrokerSubscriberBuilder: ...
199
def with_queued_cls(self, queued_cls: type[QueuedBrokerSubscriber]) -> BrokerSubscriberBuilder: ...
200
def with_config(self, config: Config) -> BrokerSubscriberBuilder: ...
201
def with_validator(self, validator: Union[type[BrokerSubscriberValidator], Builder[BrokerSubscriberValidator]]) -> BrokerSubscriberBuilder: ...
202
def with_queue(self, queue: Union[type[BrokerSubscriberQueue], BrokerSubscriberQueueBuilder]) -> BrokerSubscriberBuilder: ...
203
def with_group_id(self, group_id: Optional[str]) -> BrokerSubscriberBuilder: ...
204
def with_remove_topics_on_destroy(self, remove_topics_on_destroy: bool) -> BrokerSubscriberBuilder: ...
205
def with_topics(self, topics: Iterable[str]) -> BrokerSubscriberBuilder: ...
206
def build(self) -> BrokerSubscriber: ...
207
208
class InMemoryBrokerSubscriber(BrokerSubscriber):
209
"""In-memory subscriber implementation for testing"""
210
211
class FilteredBrokerSubscriber(BrokerSubscriber):
212
"""Subscriber with message filtering and validation"""
213
214
class QueuedBrokerSubscriber(BrokerSubscriber):
215
"""Subscriber with queue support for reliable processing"""
216
```
217
218
**Usage Examples:**
219
220
```python
221
# Create subscriber for specific topics
222
subscriber = InMemoryBrokerSubscriber(topics=["user.created", "user.updated"])
223
224
# Iterate over messages
225
async for message in subscriber:
226
print(f"Received message: {message.content}")
227
if message.topic == "user.created":
228
# Handle user creation
229
pass
230
231
# Build custom subscriber with validation
232
builder = BrokerSubscriberBuilder()
233
subscriber = (builder
234
.with_topics(["user.*"])
235
.with_filtered_cls(FilteredBrokerSubscriber)
236
.with_group_id("user-service")
237
.build())
238
```
239
240
### Message Handlers
241
242
Handler services for processing broker messages with concurrency control.
243
244
```python { .api }
245
class BrokerHandler:
246
def __init__(self, dispatcher: BrokerDispatcher, subscriber: BrokerSubscriber, concurrency: int = 5): ...
247
@classmethod
248
def _from_config(cls, config: Config, **kwargs) -> BrokerHandler: ...
249
async def run(self) -> NoReturn: ...
250
251
class BrokerPort:
252
handler: BrokerHandler
253
async def _start(self) -> None: ...
254
async def _stop(self, err: Exception = None) -> None: ...
255
256
class BrokerHandlerService:
257
"""Deprecated - use BrokerPort instead"""
258
```
259
260
**Usage Examples:**
261
262
```python
263
# Create handler with dispatcher and subscriber
264
handler = BrokerHandler(
265
dispatcher=dispatcher,
266
subscriber=subscriber,
267
concurrency=10
268
)
269
270
# Run handler (blocks until cancelled)
271
await handler.run()
272
273
# Using BrokerPort for lifecycle management
274
port = BrokerPort._from_config(config)
275
await port.start()
276
# ... service runs
277
await port.stop()
278
```
279
280
### Message Dispatchers
281
282
Dispatchers that route messages to appropriate handler functions.
283
284
```python { .api }
285
class BrokerDispatcher:
286
def __init__(self, actions: dict[str, Optional[Callable]], publisher: BrokerPublisher): ...
287
@classmethod
288
def _from_config(cls, config: Config, **kwargs) -> BrokerDispatcher: ...
289
publisher: BrokerPublisher
290
actions: dict[str, Optional[Callable]]
291
async def dispatch(self, message: BrokerMessage) -> None: ...
292
def get_action(self, topic: str) -> Callable: ...
293
@staticmethod
294
def get_callback(fn: Callable) -> Callable: ...
295
296
class BrokerRequest:
297
def __init__(self, raw: BrokerMessage): ...
298
raw: BrokerMessage
299
user: Optional[UUID]
300
headers: dict[str, str]
301
has_content: bool
302
has_params: bool
303
async def _content(self, **kwargs) -> Any: ...
304
305
class BrokerResponse:
306
"""Response class for broker handlers"""
307
308
class BrokerResponseException:
309
"""Exception class for broker response errors"""
310
```
311
312
**Usage Examples:**
313
314
```python
315
# Create dispatcher with topic mappings
316
actions = {
317
"user.create": create_user_handler,
318
"user.update": update_user_handler,
319
"user.delete": delete_user_handler
320
}
321
322
dispatcher = BrokerDispatcher(actions=actions, publisher=publisher)
323
324
# Dispatch a message
325
message = BrokerMessageV1(topic="user.create", payload=payload)
326
await dispatcher.dispatch(message)
327
328
# Get handler for a topic
329
handler = dispatcher.get_action("user.create")
330
```
331
332
### Queue Management
333
334
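Queue implementations for reliable message processing, together with the validators that subscribers use to check incoming messages (for example, to discard duplicates).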
335
336
```python { .api }
337
class BrokerQueue:
338
"""Abstract base for broker queues"""
339
340
class DatabaseBrokerQueue(BrokerQueue):
341
"""Database-backed queue for persistence"""
342
343
class InMemoryBrokerQueue(BrokerQueue):
344
"""In-memory queue for testing"""
345
346
class BrokerSubscriberQueue:
347
"""Queue specifically for subscriber implementations"""
348
349
class BrokerSubscriberValidator:
350
"""Validator for subscriber message processing"""
351
352
class BrokerSubscriberDuplicateValidator:
353
"""Prevents duplicate message processing"""
354
```
355
356
### Context Variables
357
358
Context variables for passing request metadata across async boundaries.
359
360
```python { .api }
361
from contextvars import ContextVar
362
363
REQUEST_HEADERS_CONTEXT_VAR: ContextVar[Optional[dict[str, str]]]
364
REQUEST_REPLY_TOPIC_CONTEXT_VAR: ContextVar[Optional[str]]
365
```
366
367
**Usage Examples:**
368
369
```python
370
from minos.networks import REQUEST_HEADERS_CONTEXT_VAR, REQUEST_REPLY_TOPIC_CONTEXT_VAR
371
372
# Set context variables
373
headers = {"user-id": "123", "trace-id": "abc"}
374
REQUEST_HEADERS_CONTEXT_VAR.set(headers)
375
REQUEST_REPLY_TOPIC_CONTEXT_VAR.set("user.created.reply")
376
377
# Access in handler functions
378
def my_handler(request):
379
headers = REQUEST_HEADERS_CONTEXT_VAR.get({})
380
reply_topic = REQUEST_REPLY_TOPIC_CONTEXT_VAR.get()
381
# Use context data
382
```
383
384
## Advanced Usage
385
386
### Complete Broker Service Setup
387
388
```python
389
from minos.networks import (
390
BrokerHandler, BrokerDispatcher, BrokerSubscriber,
391
BrokerPublisher, BrokerClient, BrokerRequest, BrokerResponse, enroute
392
)
393
from minos.common import Config
394
395
class UserService:
396
@enroute.broker.command("user.create")
397
async def create_user(self, request: BrokerRequest) -> BrokerResponse:
398
user_data = await request.content()
399
# Create user logic
400
return BrokerResponse({"id": "123", "status": "created"})
401
402
@enroute.broker.event("user.created")
403
async def handle_user_created(self, request: BrokerRequest) -> BrokerResponse:
404
event_data = await request.content()
405
# Handle event
406
return BrokerResponse({"processed": True})
407
408
# Setup broker infrastructure
409
config = Config("config.yml")
410
publisher = BrokerPublisher._from_config(config)
411
subscriber = BrokerSubscriber._from_config(config, topics=["user.*"])
412
413
# Create dispatcher with service handlers
414
from minos.networks import EnrouteFactory
415
factory = EnrouteFactory(UserService)
416
actions = factory.get_broker_command_query_event()
417
418
dispatcher = BrokerDispatcher(actions=actions, publisher=publisher)
419
handler = BrokerHandler(dispatcher=dispatcher, subscriber=subscriber, concurrency=5)
420
421
# Run the handler
422
await handler.run()
423
```
424
425
### Message Publishing Patterns
426
427
```python
428
# Simple publish
429
client = BrokerClient._from_config(config, topic="notifications")
430
message = BrokerMessageV1("email.send", payload=email_payload)
431
await client.send(message)
432
433
# Request-reply pattern
434
reply_message = BrokerMessageV1(
435
topic="user.get",
436
payload=query_payload,
437
reply_topic="user.get.reply"
438
)
439
await client.send(reply_message)
440
441
# Listen for reply
442
reply_client = BrokerClient._from_config(config, topic="user.get.reply")
443
response = await reply_client.receive()
444
```
445
446
### Error Handling
447
448
```python
from minos.networks import (
    BrokerRequest,
    BrokerResponse,
    BrokerResponseException,
    enroute,
)


@enroute.broker.command("user.create")
async def create_user(request: BrokerRequest) -> BrokerResponse:
    try:
        user_data = await request.content()
        if not user_data.get("email"):
            raise BrokerResponseException("Email is required", status=400)
        # Process creation
        return BrokerResponse({"status": "created"})
    except BrokerResponseException:
        # Let deliberate error responses (such as the 400 above) propagate
        # unchanged instead of being re-wrapped as a 500 by the handler below.
        raise
    except Exception as e:
        raise BrokerResponseException(f"Creation failed: {e}", status=500) from e
```