or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

brokers.md decorators.md discovery.md http.md index.md requests.md routers.md scheduling.md specs.md system-utils.md

brokers.mddocs/

0

# Message Broker System

1

2

The message broker system provides comprehensive publish/subscribe messaging with queuing, filtering, validation, and multiple delivery strategies. It supports both in-memory implementations for testing and database-backed implementations for production use.

3

4

## Capabilities

5

6

### Core Message Classes

7

8

Base classes for broker messages with versioning support and metadata.

9

10

```python { .api }

11

class BrokerMessage:

12

topic: str

13

identifier: UUID

14

should_reply: bool

15

reply_topic: Optional[str]

16

version: int

17

content: Any

18

ok: bool

19

status: int

20

headers: dict[str, str]

21

def set_reply_topic(self, value: Optional[str]) -> None: ...

22

23

class BrokerMessageV1(BrokerMessage):

24

def __init__(self, topic: str, payload: BrokerMessageV1Payload, *, identifier: Optional[UUID] = None, reply_topic: Optional[str] = None, strategy: Optional[BrokerMessageV1Strategy] = None): ...

25

topic: str

26

identifier: UUID

27

reply_topic: Optional[str]

28

strategy: BrokerMessageV1Strategy

29

payload: BrokerMessageV1Payload

30

version: int = 1

31

32

class BrokerMessageV1Payload:

33

def __init__(self, content: Any = None, headers: Optional[dict[str, str]] = None, status: Optional[int] = None): ...

34

content: Any

35

status: BrokerMessageV1Status

36

headers: dict[str, str]

37

ok: bool

38

```

39

40

### Message Status and Strategy Enums

41

42

```python { .api }

43

from enum import Enum

44

45

class BrokerMessageV1Status(Enum):

46

SUCCESS = 200

47

ERROR = 400

48

SYSTEM_ERROR = 500

49

UNKNOWN = 600

50

51

class BrokerMessageV1Strategy(Enum):

52

UNICAST = "unicast"

53

MULTICAST = "multicast"

54

```

55

56

**Usage Examples:**

57

58

```python

59

from minos.networks import BrokerMessageV1, BrokerMessageV1Payload, BrokerMessageV1Status, BrokerMessageV1Strategy

60

61

# Create a message payload

62

payload = BrokerMessageV1Payload(

63

content={"user_id": "123", "name": "John Doe"},

64

headers={"content-type": "application/json"},

65

status=BrokerMessageV1Status.SUCCESS

66

)

67

68

# Create a broker message

69

message = BrokerMessageV1(

70

topic="user.created",

71

payload=payload,

72

strategy=BrokerMessageV1Strategy.MULTICAST

73

)

74

75

# Check message properties

76

print(f"Topic: {message.topic}")

77

print(f"Content: {message.content}")

78

print(f"Is OK: {message.ok}")

79

```

80

81

### Broker Client

82

83

High-level client interface for broker communication with sending and receiving capabilities.

84

85

```python { .api }

86

class BrokerClient:

87

def __init__(self, topic: str, publisher: BrokerPublisher, subscriber: BrokerSubscriber): ...

88

topic: str

89

publisher: BrokerPublisher

90

subscriber: BrokerSubscriber

91

@classmethod

92

def _from_config(cls, config: Config, **kwargs) -> BrokerClient: ...

93

async def send(self, message: BrokerMessage) -> None: ...

94

async def receive(self, *args, **kwargs) -> BrokerMessage: ...

95

async def receive_many(self, count: int, timeout: float = 60, **kwargs) -> AsyncIterator[BrokerMessage]: ...

96

97

class BrokerClientPool:

98

def __init__(self, instance_kwargs: dict[str, Any], maxsize: int = 5): ...

99

@classmethod

100

def _from_config(cls, config: Config, **kwargs) -> BrokerClientPool: ...

101

def acquire(self, *args, **kwargs) -> AsyncContextManager: ...

102

```

103

104

**Usage Examples:**

105

106

```python

107

from minos.networks import BrokerClient, BrokerMessageV1

108

from minos.common import Config

109

110

# Create client from configuration

111

config = Config("config.yml")

112

client = BrokerClient._from_config(config, topic="user.events")

113

114

# Send a message

115

message = BrokerMessageV1("user.created", payload=payload)

116

await client.send(message)

117

118

# Receive messages

119

message = await client.receive()

120

print(f"Received: {message.content}")

121

122

# Receive multiple messages

123

async for message in client.receive_many(count=10, timeout=30):

124

print(f"Processing: {message.content}")

125

126

# Using client pool

127

pool = BrokerClientPool._from_config(config, maxsize=10)

128

async with pool.acquire() as client:

129

await client.send(message)

130

```

131

132

### Publishers

133

134

Message publishers with various implementations for different use cases.

135

136

```python { .api }

137

class BrokerPublisher:

138

async def send(self, message: BrokerMessage) -> None: ...

139

140

class BrokerPublisherBuilder:

141

def __init__(self, *, queue_builder: Optional[Builder] = None, queued_cls: Optional[type[QueuedBrokerPublisher]] = None): ...

142

def with_queued_cls(self, queued_cls: type[QueuedBrokerPublisher]) -> BrokerPublisherBuilder: ...

143

def with_config(self, config: Config) -> BrokerPublisherBuilder: ...

144

def with_queue(self, queue: Union[type[BrokerPublisherQueue], Builder[BrokerPublisherQueue]]) -> BrokerPublisherBuilder: ...

145

def with_kwargs(self, kwargs: dict[str, Any]) -> BrokerPublisherBuilder: ...

146

def build(self) -> BrokerPublisher: ...

147

148

class InMemoryBrokerPublisher(BrokerPublisher):

149

messages: list[BrokerMessage]

150

async def _send(self, message: BrokerMessage) -> None: ...

151

152

class QueuedBrokerPublisher(BrokerPublisher):

153

"""Publisher with queue support for reliable delivery"""

154

155

class DatabaseBrokerPublisherQueue:

156

"""Database-backed publisher queue for persistence"""

157

158

class InMemoryBrokerPublisherQueue:

159

"""In-memory publisher queue for testing"""

160

161

class BrokerPublisherBuilder:

162

"""Builder for creating configured broker publishers with dependency injection"""

163

def with_config(self, config: Config) -> BrokerPublisherBuilder: ...

164

def with_queued_cls(self, queued_cls: type[QueuedBrokerPublisher]) -> BrokerPublisherBuilder: ...

165

def build(self) -> BrokerPublisher: ...

166

```

167

168

**Usage Examples:**

169

170

```python

171

# Using in-memory publisher for testing

172

publisher = InMemoryBrokerPublisher()

173

await publisher.send(message)

174

print(f"Sent messages: {len(publisher.messages)}")

175

176

# Building a custom publisher

177

builder = BrokerPublisherBuilder()

178

publisher = (builder

179

.with_config(config)

180

.with_queued_cls(QueuedBrokerPublisher)

181

.build())

182

```

183

184

### Subscribers

185

186

Message subscribers with filtering, validation, and queue support.

187

188

```python { .api }

189

class BrokerSubscriber:

190

def __init__(self, topics: Iterable[str]): ...

191

topics: set[str]

192

def __aiter__(self) -> AsyncIterator[BrokerMessage]: ...

193

async def __anext__(self) -> BrokerMessage: ...

194

async def receive(self) -> BrokerMessage: ...

195

196

class BrokerSubscriberBuilder:

197

def __init__(self, *, validator_builder: Optional[Builder] = None, queue_builder: Optional[BrokerSubscriberQueueBuilder] = None, filtered_cls: Optional[type[FilteredBrokerSubscriber]] = None, queued_cls: Optional[type[QueuedBrokerSubscriber]] = None): ...

198

def with_filtered_cls(self, filtered_cls: type[FilteredBrokerSubscriber]) -> BrokerSubscriberBuilder: ...

199

def with_queued_cls(self, queued_cls: type[QueuedBrokerSubscriber]) -> BrokerSubscriberBuilder: ...

200

def with_config(self, config: Config) -> BrokerSubscriberBuilder: ...

201

def with_validator(self, validator: Union[type[BrokerSubscriberValidator], Builder[BrokerSubscriberValidator]]) -> BrokerSubscriberBuilder: ...

202

def with_queue(self, queue: Union[type[BrokerSubscriberQueue], BrokerSubscriberQueueBuilder]) -> BrokerSubscriberBuilder: ...

203

def with_group_id(self, group_id: Optional[str]) -> BrokerSubscriberBuilder: ...

204

def with_remove_topics_on_destroy(self, remove_topics_on_destroy: bool) -> BrokerSubscriberBuilder: ...

205

def with_topics(self, topics: Iterable[str]) -> BrokerSubscriberBuilder: ...

206

def build(self) -> BrokerSubscriber: ...

207

208

class InMemoryBrokerSubscriber(BrokerSubscriber):

209

"""In-memory subscriber implementation for testing"""

210

211

class FilteredBrokerSubscriber(BrokerSubscriber):

212

"""Subscriber with message filtering and validation"""

213

214

class QueuedBrokerSubscriber(BrokerSubscriber):

215

"""Subscriber with queue support for reliable processing"""

216

```

217

218

**Usage Examples:**

219

220

```python

221

# Create subscriber for specific topics

222

subscriber = InMemoryBrokerSubscriber(topics=["user.created", "user.updated"])

223

224

# Iterate over messages

225

async for message in subscriber:

226

print(f"Received message: {message.content}")

227

if message.topic == "user.created":

228

# Handle user creation

229

pass

230

231

# Build custom subscriber with validation

232

builder = BrokerSubscriberBuilder()

233

subscriber = (builder

234

.with_topics(["user.*"])

235

.with_filtered_cls(FilteredBrokerSubscriber)

236

.with_group_id("user-service")

237

.build())

238

```

239

240

### Message Handlers

241

242

Handler services for processing broker messages with concurrency control.

243

244

```python { .api }

245

class BrokerHandler:

246

def __init__(self, dispatcher: BrokerDispatcher, subscriber: BrokerSubscriber, concurrency: int = 5): ...

247

@classmethod

248

def _from_config(cls, config: Config, **kwargs) -> BrokerHandler: ...

249

async def run(self) -> NoReturn: ...

250

251

class BrokerPort:

252

handler: BrokerHandler

253

async def _start(self) -> None: ...

254

async def _stop(self, err: Exception = None) -> None: ...

255

256

class BrokerHandlerService:

257

"""Deprecated - use BrokerPort instead"""

258

```

259

260

**Usage Examples:**

261

262

```python

263

# Create handler with dispatcher and subscriber

264

handler = BrokerHandler(

265

dispatcher=dispatcher,

266

subscriber=subscriber,

267

concurrency=10

268

)

269

270

# Run handler (blocks until cancelled)

271

await handler.run()

272

273

# Using BrokerPort for lifecycle management

274

port = BrokerPort._from_config(config)

275

await port.start()

276

# ... service runs

277

await port.stop()

278

```

279

280

### Message Dispatchers

281

282

Dispatchers that route messages to appropriate handler functions.

283

284

```python { .api }

285

class BrokerDispatcher:

286

def __init__(self, actions: dict[str, Optional[Callable]], publisher: BrokerPublisher): ...

287

@classmethod

288

def _from_config(cls, config: Config, **kwargs) -> BrokerDispatcher: ...

289

publisher: BrokerPublisher

290

actions: dict[str, Optional[Callable]]

291

async def dispatch(self, message: BrokerMessage) -> None: ...

292

def get_action(self, topic: str) -> Callable: ...

293

@staticmethod

294

def get_callback(fn: Callable) -> Callable: ...

295

296

class BrokerRequest:

297

def __init__(self, raw: BrokerMessage): ...

298

raw: BrokerMessage

299

user: Optional[UUID]

300

headers: dict[str, str]

301

has_content: bool

302

has_params: bool

303

async def _content(self, **kwargs) -> Any: ...

304

305

class BrokerResponse:

306

"""Response class for broker handlers"""

307

308

class BrokerResponseException:

309

"""Exception class for broker response errors"""

310

```

311

312

**Usage Examples:**

313

314

```python

315

# Create dispatcher with topic mappings

316

actions = {

317

"user.create": create_user_handler,

318

"user.update": update_user_handler,

319

"user.delete": delete_user_handler

320

}

321

322

dispatcher = BrokerDispatcher(actions=actions, publisher=publisher)

323

324

# Dispatch a message

325

message = BrokerMessageV1(topic="user.create", payload=payload)

326

await dispatcher.dispatch(message)

327

328

# Get handler for a topic

329

handler = dispatcher.get_action("user.create")

330

```

331

332

### Queue Management

333

334

Queue implementations for reliable message processing.

335

336

```python { .api }

337

class BrokerQueue:

338

"""Abstract base for broker queues"""

339

340

class DatabaseBrokerQueue(BrokerQueue):

341

"""Database-backed queue for persistence"""

342

343

class InMemoryBrokerQueue(BrokerQueue):

344

"""In-memory queue for testing"""

345

346

class BrokerSubscriberQueue:

347

"""Queue specifically for subscriber implementations"""

348

349

class BrokerSubscriberValidator:

350

"""Validator for subscriber message processing"""

351

352

class BrokerSubscriberDuplicateValidator:

353

"""Prevents duplicate message processing"""

354

```

355

356

### Context Variables

357

358

Context variables for passing request metadata across async boundaries.

359

360

```python { .api }

361

from contextvars import ContextVar

362

363

REQUEST_HEADERS_CONTEXT_VAR: ContextVar[Optional[dict[str, str]]]

364

REQUEST_REPLY_TOPIC_CONTEXT_VAR: ContextVar[Optional[str]]

365

```

366

367

**Usage Examples:**

368

369

```python

370

from minos.networks import REQUEST_HEADERS_CONTEXT_VAR, REQUEST_REPLY_TOPIC_CONTEXT_VAR

371

372

# Set context variables

373

headers = {"user-id": "123", "trace-id": "abc"}

374

REQUEST_HEADERS_CONTEXT_VAR.set(headers)

375

REQUEST_REPLY_TOPIC_CONTEXT_VAR.set("user.created.reply")

376

377

# Access in handler functions

378

def my_handler(request):

379

headers = REQUEST_HEADERS_CONTEXT_VAR.get({})

380

reply_topic = REQUEST_REPLY_TOPIC_CONTEXT_VAR.get()

381

# Use context data

382

```

383

384

## Advanced Usage

385

386

### Complete Broker Service Setup

387

388

```python

389

from minos.networks import (

390

BrokerHandler, BrokerDispatcher, BrokerSubscriber,

391

BrokerPublisher, BrokerClient, BrokerRequest, BrokerResponse, enroute

392

)

393

from minos.common import Config

394

395

class UserService:

396

@enroute.broker.command("user.create")

397

async def create_user(self, request: BrokerRequest) -> BrokerResponse:

398

user_data = await request.content()

399

# Create user logic

400

return BrokerResponse({"id": "123", "status": "created"})

401

402

@enroute.broker.event("user.created")

403

async def handle_user_created(self, request: BrokerRequest) -> BrokerResponse:

404

event_data = await request.content()

405

# Handle event

406

return BrokerResponse({"processed": True})

407

408

# Setup broker infrastructure

409

config = Config("config.yml")

410

publisher = BrokerPublisher._from_config(config)

411

subscriber = BrokerSubscriber._from_config(config, topics=["user.*"])

412

413

# Create dispatcher with service handlers

414

from minos.networks import EnrouteFactory

415

factory = EnrouteFactory(UserService)

416

actions = factory.get_broker_command_query_event()

417

418

dispatcher = BrokerDispatcher(actions=actions, publisher=publisher)

419

handler = BrokerHandler(dispatcher=dispatcher, subscriber=subscriber, concurrency=5)

420

421

# Run the handler

422

await handler.run()

423

```

424

425

### Message Publishing Patterns

426

427

```python

428

# Simple publish

429

client = BrokerClient._from_config(config, topic="notifications")

430

message = BrokerMessageV1("email.send", payload=email_payload)

431

await client.send(message)

432

433

# Request-reply pattern

434

reply_message = BrokerMessageV1(

435

topic="user.get",

436

payload=query_payload,

437

reply_topic="user.get.reply"

438

)

439

await client.send(reply_message)

440

441

# Listen for reply

442

reply_client = BrokerClient._from_config(config, topic="user.get.reply")

443

response = await reply_client.receive()

444

```

445

446

### Error Handling

447

448

```python

449

from minos.networks import BrokerRequest, BrokerResponse, BrokerResponseException, enroute

450

451

@enroute.broker.command("user.create")

452

async def create_user(request: BrokerRequest) -> BrokerResponse:

453

try:

454

user_data = await request.content()

455

if not user_data.get("email"):

456

raise BrokerResponseException("Email is required", status=400)

457

# Process creation

458

return BrokerResponse({"status": "created"})

459

except Exception as e:

460

raise BrokerResponseException(f"Creation failed: {e}", status=500)

461

```