or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

context-management.mdexception-handling.mdexecution-engine.mdindex.mdmessage-system.mdsaga-definitions.mdtesting-utilities.md

message-system.mddocs/

0

# Message System

1

2

Request/response infrastructure for microservice communication with status tracking and service relationship management. This module provides the messaging primitives that enable saga steps to communicate with remote services in a structured and trackable manner.

3

4

## Capabilities

5

6

### Saga Request Messages

7

8

Request messages for remote saga operations targeting specific microservices.

9

10

```python { .api }

11

class SagaRequest:

12

"""

13

Request message for remote saga operations.

14

15

Represents a request to be sent to a remote microservice as part

16

of saga execution. Contains target service information and payload.

17

18

Attributes:

19

target (str): Target microservice/endpoint identifier

20

_content (Any): Request payload data

21

"""

22

def __init__(self, target, content=None):

23

"""

24

Initialize request with target and content.

25

26

Args:

27

target (str): Target service identifier (e.g., "payment-service", "inventory-service")

28

content (Optional[Any]): Request payload data

29

30

Example:

31

request = SagaRequest(

32

target="payment-service",

33

content={"amount": 99.99, "currency": "USD"}

34

)

35

"""

36

37

async def content(self, **kwargs):

38

"""

39

Get request content asynchronously.

40

41

Args:

42

**kwargs: Additional parameters for content retrieval

43

44

Returns:

45

Any: Request payload content

46

47

Example:

48

payload = await request.content()

49

"""

50

```

51

52

### Saga Response Messages

53

54

Response messages from remote saga operations with status and metadata.

55

56

```python { .api }

57

from enum import IntEnum

58

59

class SagaResponse:

60

"""

61

Response message from remote saga operations.

62

63

Contains the response data, status, and metadata from remote

64

microservice calls, including service relationship tracking.

65

66

Attributes:

67

ok (bool): Whether response status is SUCCESS

68

status (SagaResponseStatus): Response status code

69

related_services (set[str]): Set of related microservice names

70

uuid (UUID): Saga execution UUID this response belongs to

71

_content (Any): Response payload data

72

"""

73

def __init__(self, content=None, related_services=None, status=None, uuid=None, **kwargs):

74

"""

75

Initialize response with content and metadata.

76

77

Args:

78

content (Optional[Any]): Response payload data

79

related_services (Optional[set[str]]): Related service names

80

status (Optional[SagaResponseStatus]): Response status

81

uuid (Optional[UUID]): Saga execution identifier

82

83

Example:

84

response = SagaResponse(

85

content={"payment_id": "pay_123", "status": "completed"},

86

status=SagaResponseStatus.SUCCESS,

87

related_services={"payment-service"}

88

)

89

"""

90

91

@classmethod

92

def from_message(cls, message):

93

"""

94

Build response from BrokerMessage.

95

96

Args:

97

message: Broker message to convert

98

99

Returns:

100

SagaResponse: Constructed response instance

101

102

Example:

103

response = SagaResponse.from_message(broker_message)

104

"""

105

106

async def content(self, **kwargs):

107

"""

108

Get response content asynchronously.

109

110

Args:

111

**kwargs: Additional parameters for content retrieval

112

113

Returns:

114

Any: Response payload content

115

116

Example:

117

payload = await response.content()

118

payment_id = payload["payment_id"]

119

"""

120

121

class SagaResponseStatus(IntEnum):

122

"""

123

HTTP-like status codes for saga responses.

124

125

Values:

126

SUCCESS (200): Successful operation

127

ERROR (400): Client/business logic error

128

SYSTEM_ERROR (500): System/infrastructure error

129

"""

130

SUCCESS = 200

131

ERROR = 400

132

SYSTEM_ERROR = 500

133

```

134

135

## Usage Examples

136

137

### Creating and Sending Requests

138

139

```python

140

from minos.saga import SagaRequest, SagaContext

141

142

def create_payment_request(context):

143

"""Create a payment request for remote service."""

144

return SagaRequest(

145

target="payment-service",

146

content={

147

"order_id": context.order_id,

148

"amount": context.total,

149

"currency": context.get("currency", "USD"),

150

"customer_id": context.customer_id,

151

"payment_method": context.payment_method

152

}

153

)

154

155

def create_inventory_request(context):

156

"""Create an inventory reservation request."""

157

return SagaRequest(

158

target="inventory-service",

159

content={

160

"items": [

161

{

162

"sku": item["sku"],

163

"quantity": item["quantity"],

164

"warehouse": item.get("warehouse", "default")

165

}

166

for item in context.items

167

],

168

"order_id": context.order_id,

169

"priority": "high" if context.customer.get("tier") == "premium" else "normal"

170

}

171

)

172

173

def create_shipping_request(context):

174

"""Create a shipping request with address validation."""

175

return SagaRequest(

176

target="shipping-service",

177

content={

178

"order_id": context.order_id,

179

"items": context.items,

180

"destination": context.shipping_address,

181

"method": context.shipping_method,

182

"insurance": context.get("insurance_required", False)

183

}

184

)

185

```

186

187

### Handling Response Messages

188

189

```python

190

from minos.saga import SagaResponse, SagaResponseStatus, SagaContext

191

192

def handle_payment_success(context, response):

193

"""Handle successful payment response."""

194

if not response.ok:

195

raise ValueError(f"Expected successful response, got status: {response.status}")

196

197

# Extract payment details from response

198

payment_data = await response.content()

199

200

# Update context with payment information

201

context.payment_id = payment_data["payment_id"]

202

context.transaction_id = payment_data["transaction_id"]

203

context.payment_status = "completed"

204

context.charged_amount = payment_data["charged_amount"]

205

206

return context

207

208

def handle_payment_error(context, response):

209

"""Handle payment error response."""

210

error_data = await response.content()

211

212

# Log error details

213

print(f"Payment failed: {error_data.get('error_message')}")

214

215

# Update context with error information

216

context.payment_error = error_data.get("error_code")

217

context.payment_status = "failed"

218

219

# Return exception to trigger rollback

220

return Exception(f"Payment failed: {error_data.get('error_message')}")

221

222

def handle_inventory_success(context, response):

223

"""Handle successful inventory reservation."""

224

inventory_data = await response.content()

225

226

# Update context with reservation details

227

context.reservation_id = inventory_data["reservation_id"]

228

context.reserved_items = inventory_data["reserved_items"]

229

context.inventory_status = "reserved"

230

context.expiry_time = inventory_data.get("expiry_time")

231

232

return context

233

234

def handle_inventory_error(context, response):

235

"""Handle inventory shortage or error."""

236

error_data = await response.content()

237

238

if response.status == SagaResponseStatus.ERROR:

239

# Business logic error (e.g., insufficient inventory)

240

context.inventory_error = error_data.get("error_code")

241

context.unavailable_items = error_data.get("unavailable_items", [])

242

243

# Could return modified context to continue with partial order

244

if error_data.get("partial_available"):

245

context.items = error_data["available_items"]

246

return context

247

else:

248

return Exception("Insufficient inventory")

249

250

elif response.status == SagaResponseStatus.SYSTEM_ERROR:

251

# System error - should retry

252

return Exception("Inventory service unavailable")

253

```

254

255

### Status-Based Response Handling

256

257

```python

258

from minos.saga import SagaResponseStatus

259

260

def comprehensive_response_handler(context, response):

261

"""Handle response based on status code."""

262

263

if response.status == SagaResponseStatus.SUCCESS:

264

# Successful operation

265

data = await response.content()

266

context.update(data)

267

return context

268

269

elif response.status == SagaResponseStatus.ERROR:

270

# Business logic error - handle gracefully

271

error_data = await response.content()

272

error_code = error_data.get("error_code")

273

274

if error_code == "INSUFFICIENT_FUNDS":

275

context.payment_error = "insufficient_funds"

276

return Exception("Customer has insufficient funds")

277

278

elif error_code == "INVALID_CARD":

279

context.payment_error = "invalid_card"

280

return Exception("Payment method is invalid")

281

282

elif error_code == "LIMIT_EXCEEDED":

283

context.payment_error = "limit_exceeded"

284

return Exception("Transaction exceeds limit")

285

286

else:

287

# Generic business error

288

return Exception(f"Business error: {error_data.get('message')}")

289

290

elif response.status == SagaResponseStatus.SYSTEM_ERROR:

291

# System/infrastructure error - should trigger retry logic

292

error_data = await response.content()

293

return Exception(f"System error: {error_data.get('message', 'Unknown system error')}")

294

295

else:

296

# Unknown status

297

return Exception(f"Unknown response status: {response.status}")

298

```

299

300

### Service Relationship Tracking

301

302

```python

303

def handle_response_with_service_tracking(context, response):

304

"""Handle response and track related services."""

305

306

# Check which services are related to this response

307

if response.related_services:

308

context.involved_services = context.get("involved_services", set())

309

context.involved_services.update(response.related_services)

310

311

print(f"Services involved so far: {context.involved_services}")

312

313

# Process response data

314

data = await response.content()

315

316

# Example: Payment service might involve fraud detection service

317

if "fraud-check-service" in response.related_services:

318

context.fraud_check_id = data.get("fraud_check_id")

319

context.risk_score = data.get("risk_score")

320

321

# Example: Inventory service might involve warehouse management

322

if "warehouse-service" in response.related_services:

323

context.warehouse_location = data.get("warehouse")

324

context.pick_list_id = data.get("pick_list_id")

325

326

return context

327

```

328

329

### Complex Request/Response Workflows

330

331

```python

332

from minos.saga import Saga, SagaRequest, SagaResponse

333

334

def create_multi_service_saga():

335

"""Create saga with multiple service interactions."""

336

saga = Saga()

337

338

# Step 1: Validate customer and pricing

339

saga.remote_step() \

340

.on_execute(create_validation_request) \

341

.on_success(handle_validation_success) \

342

.on_error(handle_validation_error)

343

344

# Step 2: Process payment with fraud check

345

saga.remote_step() \

346

.on_execute(create_payment_request) \

347

.on_success(handle_payment_with_fraud_check) \

348

.on_error(handle_payment_error) \

349

.on_failure(create_refund_request)

350

351

# Step 3: Reserve inventory across multiple warehouses

352

saga.remote_step() \

353

.on_execute(create_multi_warehouse_request) \

354

.on_success(handle_inventory_allocation) \

355

.on_error(handle_inventory_shortage) \

356

.on_failure(create_inventory_release_request)

357

358

return saga.commit()

359

360

def create_validation_request(context):

361

"""Request validation from multiple services."""

362

return SagaRequest(

363

target="validation-service",

364

content={

365

"customer_id": context.customer_id,

366

"items": context.items,

367

"shipping_address": context.shipping_address,

368

"billing_address": context.billing_address,

369

"checks": ["customer_status", "address_validation", "pricing"]

370

}

371

)

372

373

def handle_validation_success(context, response):

374

"""Process validation results from multiple checks."""

375

validation_data = await response.content()

376

377

# Update context with validated data

378

context.validated_customer = validation_data["customer_status"]

379

context.validated_addresses = validation_data["address_validation"]

380

context.final_pricing = validation_data["pricing"]

381

382

# Check if any validation failed

383

if not all([

384

validation_data["customer_status"]["valid"],

385

validation_data["address_validation"]["valid"],

386

validation_data["pricing"]["valid"]

387

]):

388

failed_checks = [

389

check for check, result in validation_data.items()

390

if not result.get("valid", False)

391

]

392

return Exception(f"Validation failed for: {', '.join(failed_checks)}")

393

394

return context

395

396

def create_multi_warehouse_request(context):

397

"""Create request for multi-warehouse inventory allocation."""

398

return SagaRequest(

399

target="inventory-allocation-service",

400

content={

401

"items": context.items,

402

"customer_location": context.shipping_address,

403

"priority": context.customer.get("tier", "standard"),

404

"allocation_strategy": "cost_optimized",

405

"max_warehouses": 3

406

}

407

)

408

409

def handle_inventory_allocation(context, response):

410

"""Handle complex inventory allocation response."""

411

allocation_data = await response.content()

412

413

# Store allocation details

414

context.allocations = allocation_data["allocations"]

415

context.total_warehouses = len(allocation_data["warehouses_used"])

416

context.estimated_shipping_cost = allocation_data["shipping_estimate"]

417

418

# Track all involved warehouse services

419

warehouse_services = {f"warehouse-{wh['id']}" for wh in allocation_data["warehouses_used"]}

420

context.warehouse_services = warehouse_services

421

422

return context

423

```

424

425

### Error Recovery and Compensation

426

427

```python

428

def create_compensation_request(context):

429

"""Create compensation request for failed operation."""

430

if hasattr(context, 'payment_id'):

431

# Refund payment

432

return SagaRequest(

433

target="payment-service",

434

content={

435

"action": "refund",

436

"payment_id": context.payment_id,

437

"amount": context.charged_amount,

438

"reason": "saga_rollback"

439

}

440

)

441

else:

442

# No payment to refund

443

return None

444

445

def create_inventory_release_request(context):

446

"""Create request to release reserved inventory."""

447

if hasattr(context, 'reservation_id'):

448

return SagaRequest(

449

target="inventory-service",

450

content={

451

"action": "release_reservation",

452

"reservation_id": context.reservation_id,

453

"reason": "saga_rollback"

454

}

455

)

456

else:

457

return None

458

459

def handle_compensation_response(context, response):

460

"""Handle compensation operation response."""

461

if response.ok:

462

compensation_data = await response.content()

463

context.compensation_completed = True

464

context.compensation_id = compensation_data.get("compensation_id")

465

return context

466

else:

467

# Compensation failed - log but don't fail saga

468

error_data = await response.content()

469

context.compensation_failed = True

470

context.compensation_error = error_data.get("error_message")

471

print(f"Compensation failed: {context.compensation_error}")

472

return context

473

```

474

475

### Request Content Patterns

476

477

```python

478

import json

479

from datetime import datetime, timezone

480

481

def create_timestamped_request(context, target, base_content):

482

"""Create request with timestamp and tracking info."""

483

content = {

484

**base_content,

485

"timestamp": datetime.now(timezone.utc).isoformat(),

486

"saga_execution_id": str(context.get("saga_uuid", "")),

487

"correlation_id": context.get("correlation_id"),

488

"source_service": "order-orchestrator"

489

}

490

491

return SagaRequest(target=target, content=content)

492

493

def create_batched_request(context, target, items):

494

"""Create request for batch processing."""

495

return SagaRequest(

496

target=target,

497

content={

498

"batch_id": f"batch_{datetime.now().strftime('%Y%m%d_%H%M%S')}",

499

"items": items,

500

"batch_size": len(items),

501

"processing_options": {

502

"parallel": True,

503

"fail_fast": False,

504

"timeout": 300

505

}

506

}

507

)

508

509

def create_conditional_request(context):

510

"""Create request with different content based on context."""

511

base_content = {

512

"customer_id": context.customer_id,

513

"order_id": context.order_id

514

}

515

516

# Conditional content based on customer tier

517

if context.customer.get("tier") == "premium":

518

content = {

519

**base_content,

520

"priority": "high",

521

"express_processing": True,

522

"dedicated_support": True

523

}

524

target = "premium-processing-service"

525

else:

526

content = {

527

**base_content,

528

"priority": "normal",

529

"standard_processing": True

530

}

531

target = "standard-processing-service"

532

533

return SagaRequest(target=target, content=content)

534

```