0
# Message System
1
2
Request/response infrastructure for microservice communication with status tracking and service relationship management. This module provides the messaging primitives that enable saga steps to communicate with remote services in a structured and trackable manner.
3
4
## Capabilities
5
6
### Saga Request Messages
7
8
Request messages for remote saga operations targeting specific microservices.
9
10
```python { .api }
11
class SagaRequest:
12
"""
13
Request message for remote saga operations.
14
15
Represents a request to be sent to a remote microservice as part
16
of saga execution. Contains target service information and payload.
17
18
Attributes:
19
target (str): Target microservice/endpoint identifier
20
_content (Any): Request payload data
21
"""
22
def __init__(self, target, content=None):
23
"""
24
Initialize request with target and content.
25
26
Args:
27
target (str): Target service identifier (e.g., "payment-service", "inventory-service")
28
content (Optional[Any]): Request payload data
29
30
Example:
31
request = SagaRequest(
32
target="payment-service",
33
content={"amount": 99.99, "currency": "USD"}
34
)
35
"""
36
37
async def content(self, **kwargs):
38
"""
39
Get request content asynchronously.
40
41
Args:
42
**kwargs: Additional parameters for content retrieval
43
44
Returns:
45
Any: Request payload content
46
47
Example:
48
payload = await request.content()
49
"""
50
```
51
52
### Saga Response Messages
53
54
Response messages from remote saga operations with status and metadata.
55
56
```python { .api }
57
from enum import IntEnum
58
59
class SagaResponse:
60
"""
61
Response message from remote saga operations.
62
63
Contains the response data, status, and metadata from remote
64
microservice calls, including service relationship tracking.
65
66
Attributes:
67
ok (bool): Whether response status is SUCCESS
68
status (SagaResponseStatus): Response status code
69
related_services (set[str]): Set of related microservice names
70
uuid (UUID): Saga execution UUID this response belongs to
71
_content (Any): Response payload data
72
"""
73
def __init__(self, content=None, related_services=None, status=None, uuid=None, **kwargs):
74
"""
75
Initialize response with content and metadata.
76
77
Args:
78
content (Optional[Any]): Response payload data
79
related_services (Optional[set[str]]): Related service names
80
status (Optional[SagaResponseStatus]): Response status
81
uuid (Optional[UUID]): Saga execution identifier
82
83
Example:
84
response = SagaResponse(
85
content={"payment_id": "pay_123", "status": "completed"},
86
status=SagaResponseStatus.SUCCESS,
87
related_services={"payment-service"}
88
)
89
"""
90
91
@classmethod
92
def from_message(cls, message):
93
"""
94
Build response from BrokerMessage.
95
96
Args:
97
message: Broker message to convert
98
99
Returns:
100
SagaResponse: Constructed response instance
101
102
Example:
103
response = SagaResponse.from_message(broker_message)
104
"""
105
106
async def content(self, **kwargs):
107
"""
108
Get response content asynchronously.
109
110
Args:
111
**kwargs: Additional parameters for content retrieval
112
113
Returns:
114
Any: Response payload content
115
116
Example:
117
payload = await response.content()
118
payment_id = payload["payment_id"]
119
"""
120
121
class SagaResponseStatus(IntEnum):
122
"""
123
HTTP-like status codes for saga responses.
124
125
Values:
126
SUCCESS (200): Successful operation
127
ERROR (400): Client/business logic error
128
SYSTEM_ERROR (500): System/infrastructure error
129
"""
130
SUCCESS = 200
131
ERROR = 400
132
SYSTEM_ERROR = 500
133
```
134
135
## Usage Examples
136
137
### Creating and Sending Requests
138
139
```python
140
from minos.saga import SagaRequest, SagaContext
141
142
def create_payment_request(context):
143
"""Create a payment request for remote service."""
144
return SagaRequest(
145
target="payment-service",
146
content={
147
"order_id": context.order_id,
148
"amount": context.total,
149
"currency": context.get("currency", "USD"),
150
"customer_id": context.customer_id,
151
"payment_method": context.payment_method
152
}
153
)
154
155
def create_inventory_request(context):
156
"""Create an inventory reservation request."""
157
return SagaRequest(
158
target="inventory-service",
159
content={
160
"items": [
161
{
162
"sku": item["sku"],
163
"quantity": item["quantity"],
164
"warehouse": item.get("warehouse", "default")
165
}
166
for item in context.items
167
],
168
"order_id": context.order_id,
169
"priority": "high" if context.customer.get("tier") == "premium" else "normal"
170
}
171
)
172
173
def create_shipping_request(context):
174
"""Create a shipping request with address validation."""
175
return SagaRequest(
176
target="shipping-service",
177
content={
178
"order_id": context.order_id,
179
"items": context.items,
180
"destination": context.shipping_address,
181
"method": context.shipping_method,
182
"insurance": context.get("insurance_required", False)
183
}
184
)
185
```
186
187
### Handling Response Messages
188
189
```python
190
from minos.saga import SagaResponse, SagaResponseStatus, SagaContext
191
192
def handle_payment_success(context, response):
193
"""Handle successful payment response."""
194
if not response.ok:
195
raise ValueError(f"Expected successful response, got status: {response.status}")
196
197
# Extract payment details from response
198
payment_data = await response.content()
199
200
# Update context with payment information
201
context.payment_id = payment_data["payment_id"]
202
context.transaction_id = payment_data["transaction_id"]
203
context.payment_status = "completed"
204
context.charged_amount = payment_data["charged_amount"]
205
206
return context
207
208
def handle_payment_error(context, response):
209
"""Handle payment error response."""
210
error_data = await response.content()
211
212
# Log error details
213
print(f"Payment failed: {error_data.get('error_message')}")
214
215
# Update context with error information
216
context.payment_error = error_data.get("error_code")
217
context.payment_status = "failed"
218
219
# Return exception to trigger rollback
220
return Exception(f"Payment failed: {error_data.get('error_message')}")
221
222
def handle_inventory_success(context, response):
223
"""Handle successful inventory reservation."""
224
inventory_data = await response.content()
225
226
# Update context with reservation details
227
context.reservation_id = inventory_data["reservation_id"]
228
context.reserved_items = inventory_data["reserved_items"]
229
context.inventory_status = "reserved"
230
context.expiry_time = inventory_data.get("expiry_time")
231
232
return context
233
234
def handle_inventory_error(context, response):
235
"""Handle inventory shortage or error."""
236
error_data = await response.content()
237
238
if response.status == SagaResponseStatus.ERROR:
239
# Business logic error (e.g., insufficient inventory)
240
context.inventory_error = error_data.get("error_code")
241
context.unavailable_items = error_data.get("unavailable_items", [])
242
243
# Could return modified context to continue with partial order
244
if error_data.get("partial_available"):
245
context.items = error_data["available_items"]
246
return context
247
else:
248
return Exception("Insufficient inventory")
249
250
elif response.status == SagaResponseStatus.SYSTEM_ERROR:
251
# System error - should retry
252
return Exception("Inventory service unavailable")
253
```
254
255
### Status-Based Response Handling
256
257
```python
258
from minos.saga import SagaResponseStatus
259
260
def comprehensive_response_handler(context, response):
261
"""Handle response based on status code."""
262
263
if response.status == SagaResponseStatus.SUCCESS:
264
# Successful operation
265
data = await response.content()
266
context.update(data)
267
return context
268
269
elif response.status == SagaResponseStatus.ERROR:
270
# Business logic error - handle gracefully
271
error_data = await response.content()
272
error_code = error_data.get("error_code")
273
274
if error_code == "INSUFFICIENT_FUNDS":
275
context.payment_error = "insufficient_funds"
276
return Exception("Customer has insufficient funds")
277
278
elif error_code == "INVALID_CARD":
279
context.payment_error = "invalid_card"
280
return Exception("Payment method is invalid")
281
282
elif error_code == "LIMIT_EXCEEDED":
283
context.payment_error = "limit_exceeded"
284
return Exception("Transaction exceeds limit")
285
286
else:
287
# Generic business error
288
return Exception(f"Business error: {error_data.get('message')}")
289
290
elif response.status == SagaResponseStatus.SYSTEM_ERROR:
291
# System/infrastructure error - should trigger retry logic
292
error_data = await response.content()
293
return Exception(f"System error: {error_data.get('message', 'Unknown system error')}")
294
295
else:
296
# Unknown status
297
return Exception(f"Unknown response status: {response.status}")
298
```
299
300
### Service Relationship Tracking
301
302
```python
303
def handle_response_with_service_tracking(context, response):
304
"""Handle response and track related services."""
305
306
# Check which services are related to this response
307
if response.related_services:
308
context.involved_services = context.get("involved_services", set())
309
context.involved_services.update(response.related_services)
310
311
print(f"Services involved so far: {context.involved_services}")
312
313
# Process response data
314
data = await response.content()
315
316
# Example: Payment service might involve fraud detection service
317
if "fraud-check-service" in response.related_services:
318
context.fraud_check_id = data.get("fraud_check_id")
319
context.risk_score = data.get("risk_score")
320
321
# Example: Inventory service might involve warehouse management
322
if "warehouse-service" in response.related_services:
323
context.warehouse_location = data.get("warehouse")
324
context.pick_list_id = data.get("pick_list_id")
325
326
return context
327
```
328
329
### Complex Request/Response Workflows
330
331
```python
332
from minos.saga import Saga, SagaRequest, SagaResponse
333
334
def create_multi_service_saga():
335
"""Create saga with multiple service interactions."""
336
saga = Saga()
337
338
# Step 1: Validate customer and pricing
339
saga.remote_step() \
340
.on_execute(create_validation_request) \
341
.on_success(handle_validation_success) \
342
.on_error(handle_validation_error)
343
344
# Step 2: Process payment with fraud check
345
saga.remote_step() \
346
.on_execute(create_payment_request) \
347
.on_success(handle_payment_with_fraud_check) \
348
.on_error(handle_payment_error) \
349
.on_failure(create_refund_request)
350
351
# Step 3: Reserve inventory across multiple warehouses
352
saga.remote_step() \
353
.on_execute(create_multi_warehouse_request) \
354
.on_success(handle_inventory_allocation) \
355
.on_error(handle_inventory_shortage) \
356
.on_failure(create_inventory_release_request)
357
358
return saga.commit()
359
360
def create_validation_request(context):
361
"""Request validation from multiple services."""
362
return SagaRequest(
363
target="validation-service",
364
content={
365
"customer_id": context.customer_id,
366
"items": context.items,
367
"shipping_address": context.shipping_address,
368
"billing_address": context.billing_address,
369
"checks": ["customer_status", "address_validation", "pricing"]
370
}
371
)
372
373
def handle_validation_success(context, response):
374
"""Process validation results from multiple checks."""
375
validation_data = await response.content()
376
377
# Update context with validated data
378
context.validated_customer = validation_data["customer_status"]
379
context.validated_addresses = validation_data["address_validation"]
380
context.final_pricing = validation_data["pricing"]
381
382
# Check if any validation failed
383
if not all([
384
validation_data["customer_status"]["valid"],
385
validation_data["address_validation"]["valid"],
386
validation_data["pricing"]["valid"]
387
]):
388
failed_checks = [
389
check for check, result in validation_data.items()
390
if not result.get("valid", False)
391
]
392
return Exception(f"Validation failed for: {', '.join(failed_checks)}")
393
394
return context
395
396
def create_multi_warehouse_request(context):
397
"""Create request for multi-warehouse inventory allocation."""
398
return SagaRequest(
399
target="inventory-allocation-service",
400
content={
401
"items": context.items,
402
"customer_location": context.shipping_address,
403
"priority": context.customer.get("tier", "standard"),
404
"allocation_strategy": "cost_optimized",
405
"max_warehouses": 3
406
}
407
)
408
409
def handle_inventory_allocation(context, response):
410
"""Handle complex inventory allocation response."""
411
allocation_data = await response.content()
412
413
# Store allocation details
414
context.allocations = allocation_data["allocations"]
415
context.total_warehouses = len(allocation_data["warehouses_used"])
416
context.estimated_shipping_cost = allocation_data["shipping_estimate"]
417
418
# Track all involved warehouse services
419
warehouse_services = {f"warehouse-{wh['id']}" for wh in allocation_data["warehouses_used"]}
420
context.warehouse_services = warehouse_services
421
422
return context
423
```
424
425
### Error Recovery and Compensation
426
427
```python
428
def create_compensation_request(context):
429
"""Create compensation request for failed operation."""
430
if hasattr(context, 'payment_id'):
431
# Refund payment
432
return SagaRequest(
433
target="payment-service",
434
content={
435
"action": "refund",
436
"payment_id": context.payment_id,
437
"amount": context.charged_amount,
438
"reason": "saga_rollback"
439
}
440
)
441
else:
442
# No payment to refund
443
return None
444
445
def create_inventory_release_request(context):
446
"""Create request to release reserved inventory."""
447
if hasattr(context, 'reservation_id'):
448
return SagaRequest(
449
target="inventory-service",
450
content={
451
"action": "release_reservation",
452
"reservation_id": context.reservation_id,
453
"reason": "saga_rollback"
454
}
455
)
456
else:
457
return None
458
459
def handle_compensation_response(context, response):
460
"""Handle compensation operation response."""
461
if response.ok:
462
compensation_data = await response.content()
463
context.compensation_completed = True
464
context.compensation_id = compensation_data.get("compensation_id")
465
return context
466
else:
467
# Compensation failed - log but don't fail saga
468
error_data = await response.content()
469
context.compensation_failed = True
470
context.compensation_error = error_data.get("error_message")
471
print(f"Compensation failed: {context.compensation_error}")
472
return context
473
```
474
475
### Request Content Patterns
476
477
```python
478
import json
479
from datetime import datetime, timezone
480
481
def create_timestamped_request(context, target, base_content):
482
"""Create request with timestamp and tracking info."""
483
content = {
484
**base_content,
485
"timestamp": datetime.now(timezone.utc).isoformat(),
486
"saga_execution_id": str(context.get("saga_uuid", "")),
487
"correlation_id": context.get("correlation_id"),
488
"source_service": "order-orchestrator"
489
}
490
491
return SagaRequest(target=target, content=content)
492
493
def create_batched_request(context, target, items):
494
"""Create request for batch processing."""
495
return SagaRequest(
496
target=target,
497
content={
498
"batch_id": f"batch_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
499
"items": items,
500
"batch_size": len(items),
501
"processing_options": {
502
"parallel": True,
503
"fail_fast": False,
504
"timeout": 300
505
}
506
}
507
)
508
509
def create_conditional_request(context):
510
"""Create request with different content based on context."""
511
base_content = {
512
"customer_id": context.customer_id,
513
"order_id": context.order_id
514
}
515
516
# Conditional content based on customer tier
517
if context.customer.get("tier") == "premium":
518
content = {
519
**base_content,
520
"priority": "high",
521
"express_processing": True,
522
"dedicated_support": True
523
}
524
target = "premium-processing-service"
525
else:
526
content = {
527
**base_content,
528
"priority": "normal",
529
"standard_processing": True
530
}
531
target = "standard-processing-service"
532
533
return SagaRequest(target=target, content=content)
534
```