Implementation of the SAGA pattern for distributed microservice transactions in the Minos Framework.
—
Request/response infrastructure for microservice communication with status tracking and service relationship management. This module provides the messaging primitives that enable saga steps to communicate with remote services in a structured and trackable manner.
Request messages for remote saga operations targeting specific microservices.
class SagaRequest:
"""
Request message for remote saga operations.
Represents a request to be sent to a remote microservice as part
of saga execution. Contains target service information and payload.
Attributes:
target (str): Target microservice/endpoint identifier
_content (Any): Request payload data
"""
def __init__(self, target, content=None):
"""
Initialize request with target and content.
Args:
target (str): Target service identifier (e.g., "payment-service", "inventory-service")
content (Optional[Any]): Request payload data
Example:
request = SagaRequest(
target="payment-service",
content={"amount": 99.99, "currency": "USD"}
)
"""
async def content(self, **kwargs):
"""
Get request content asynchronously.
Args:
**kwargs: Additional parameters for content retrieval
Returns:
Any: Request payload content
Example:
payload = await request.content()
"""Response messages from remote saga operations with status and metadata.
from enum import IntEnum
class SagaResponse:
"""
Response message from remote saga operations.
Contains the response data, status, and metadata from remote
microservice calls, including service relationship tracking.
Attributes:
ok (bool): Whether response status is SUCCESS
status (SagaResponseStatus): Response status code
related_services (set[str]): Set of related microservice names
uuid (UUID): Saga execution UUID this response belongs to
_content (Any): Response payload data
"""
def __init__(self, content=None, related_services=None, status=None, uuid=None, **kwargs):
"""
Initialize response with content and metadata.
Args:
content (Optional[Any]): Response payload data
related_services (Optional[set[str]]): Related service names
status (Optional[SagaResponseStatus]): Response status
uuid (Optional[UUID]): Saga execution identifier
Example:
response = SagaResponse(
content={"payment_id": "pay_123", "status": "completed"},
status=SagaResponseStatus.SUCCESS,
related_services={"payment-service"}
)
"""
@classmethod
def from_message(cls, message):
"""
Build response from BrokerMessage.
Args:
message: Broker message to convert
Returns:
SagaResponse: Constructed response instance
Example:
response = SagaResponse.from_message(broker_message)
"""
async def content(self, **kwargs):
"""
Get response content asynchronously.
Args:
**kwargs: Additional parameters for content retrieval
Returns:
Any: Response payload content
Example:
payload = await response.content()
payment_id = payload["payment_id"]
"""
class SagaResponseStatus(IntEnum):
"""
HTTP-like status codes for saga responses.
Values:
SUCCESS (200): Successful operation
ERROR (400): Client/business logic error
SYSTEM_ERROR (500): System/infrastructure error
"""
SUCCESS = 200
ERROR = 400
SYSTEM_ERROR = 500from minos.saga import SagaRequest, SagaContext
def create_payment_request(context):
"""Create a payment request for remote service."""
return SagaRequest(
target="payment-service",
content={
"order_id": context.order_id,
"amount": context.total,
"currency": context.get("currency", "USD"),
"customer_id": context.customer_id,
"payment_method": context.payment_method
}
)
def create_inventory_request(context):
"""Create an inventory reservation request."""
return SagaRequest(
target="inventory-service",
content={
"items": [
{
"sku": item["sku"],
"quantity": item["quantity"],
"warehouse": item.get("warehouse", "default")
}
for item in context.items
],
"order_id": context.order_id,
"priority": "high" if context.customer.get("tier") == "premium" else "normal"
}
)
def create_shipping_request(context):
"""Create a shipping request with address validation."""
return SagaRequest(
target="shipping-service",
content={
"order_id": context.order_id,
"items": context.items,
"destination": context.shipping_address,
"method": context.shipping_method,
"insurance": context.get("insurance_required", False)
}
)from minos.saga import SagaResponse, SagaResponseStatus, SagaContext
def handle_payment_success(context, response):
"""Handle successful payment response."""
if not response.ok:
raise ValueError(f"Expected successful response, got status: {response.status}")
# Extract payment details from response
payment_data = await response.content()
# Update context with payment information
context.payment_id = payment_data["payment_id"]
context.transaction_id = payment_data["transaction_id"]
context.payment_status = "completed"
context.charged_amount = payment_data["charged_amount"]
return context
def handle_payment_error(context, response):
"""Handle payment error response."""
error_data = await response.content()
# Log error details
print(f"Payment failed: {error_data.get('error_message')}")
# Update context with error information
context.payment_error = error_data.get("error_code")
context.payment_status = "failed"
# Return exception to trigger rollback
return Exception(f"Payment failed: {error_data.get('error_message')}")
def handle_inventory_success(context, response):
"""Handle successful inventory reservation."""
inventory_data = await response.content()
# Update context with reservation details
context.reservation_id = inventory_data["reservation_id"]
context.reserved_items = inventory_data["reserved_items"]
context.inventory_status = "reserved"
context.expiry_time = inventory_data.get("expiry_time")
return context
def handle_inventory_error(context, response):
"""Handle inventory shortage or error."""
error_data = await response.content()
if response.status == SagaResponseStatus.ERROR:
# Business logic error (e.g., insufficient inventory)
context.inventory_error = error_data.get("error_code")
context.unavailable_items = error_data.get("unavailable_items", [])
# Could return modified context to continue with partial order
if error_data.get("partial_available"):
context.items = error_data["available_items"]
return context
else:
return Exception("Insufficient inventory")
elif response.status == SagaResponseStatus.SYSTEM_ERROR:
# System error - should retry
return Exception("Inventory service unavailable")from minos.saga import SagaResponseStatus
def comprehensive_response_handler(context, response):
"""Handle response based on status code."""
if response.status == SagaResponseStatus.SUCCESS:
# Successful operation
data = await response.content()
context.update(data)
return context
elif response.status == SagaResponseStatus.ERROR:
# Business logic error - handle gracefully
error_data = await response.content()
error_code = error_data.get("error_code")
if error_code == "INSUFFICIENT_FUNDS":
context.payment_error = "insufficient_funds"
return Exception("Customer has insufficient funds")
elif error_code == "INVALID_CARD":
context.payment_error = "invalid_card"
return Exception("Payment method is invalid")
elif error_code == "LIMIT_EXCEEDED":
context.payment_error = "limit_exceeded"
return Exception("Transaction exceeds limit")
else:
# Generic business error
return Exception(f"Business error: {error_data.get('message')}")
elif response.status == SagaResponseStatus.SYSTEM_ERROR:
# System/infrastructure error - should trigger retry logic
error_data = await response.content()
return Exception(f"System error: {error_data.get('message', 'Unknown system error')}")
else:
# Unknown status
return Exception(f"Unknown response status: {response.status}")def handle_response_with_service_tracking(context, response):
"""Handle response and track related services."""
# Check which services are related to this response
if response.related_services:
context.involved_services = context.get("involved_services", set())
context.involved_services.update(response.related_services)
print(f"Services involved so far: {context.involved_services}")
# Process response data
data = await response.content()
# Example: Payment service might involve fraud detection service
if "fraud-check-service" in response.related_services:
context.fraud_check_id = data.get("fraud_check_id")
context.risk_score = data.get("risk_score")
# Example: Inventory service might involve warehouse management
if "warehouse-service" in response.related_services:
context.warehouse_location = data.get("warehouse")
context.pick_list_id = data.get("pick_list_id")
return contextfrom minos.saga import Saga, SagaRequest, SagaResponse
def create_multi_service_saga():
"""Create saga with multiple service interactions."""
saga = Saga()
# Step 1: Validate customer and pricing
saga.remote_step() \
.on_execute(create_validation_request) \
.on_success(handle_validation_success) \
.on_error(handle_validation_error)
# Step 2: Process payment with fraud check
saga.remote_step() \
.on_execute(create_payment_request) \
.on_success(handle_payment_with_fraud_check) \
.on_error(handle_payment_error) \
.on_failure(create_refund_request)
# Step 3: Reserve inventory across multiple warehouses
saga.remote_step() \
.on_execute(create_multi_warehouse_request) \
.on_success(handle_inventory_allocation) \
.on_error(handle_inventory_shortage) \
.on_failure(create_inventory_release_request)
return saga.commit()
def create_validation_request(context):
"""Request validation from multiple services."""
return SagaRequest(
target="validation-service",
content={
"customer_id": context.customer_id,
"items": context.items,
"shipping_address": context.shipping_address,
"billing_address": context.billing_address,
"checks": ["customer_status", "address_validation", "pricing"]
}
)
def handle_validation_success(context, response):
"""Process validation results from multiple checks."""
validation_data = await response.content()
# Update context with validated data
context.validated_customer = validation_data["customer_status"]
context.validated_addresses = validation_data["address_validation"]
context.final_pricing = validation_data["pricing"]
# Check if any validation failed
if not all([
validation_data["customer_status"]["valid"],
validation_data["address_validation"]["valid"],
validation_data["pricing"]["valid"]
]):
failed_checks = [
check for check, result in validation_data.items()
if not result.get("valid", False)
]
return Exception(f"Validation failed for: {', '.join(failed_checks)}")
return context
def create_multi_warehouse_request(context):
"""Create request for multi-warehouse inventory allocation."""
return SagaRequest(
target="inventory-allocation-service",
content={
"items": context.items,
"customer_location": context.shipping_address,
"priority": context.customer.get("tier", "standard"),
"allocation_strategy": "cost_optimized",
"max_warehouses": 3
}
)
def handle_inventory_allocation(context, response):
"""Handle complex inventory allocation response."""
allocation_data = await response.content()
# Store allocation details
context.allocations = allocation_data["allocations"]
context.total_warehouses = len(allocation_data["warehouses_used"])
context.estimated_shipping_cost = allocation_data["shipping_estimate"]
# Track all involved warehouse services
warehouse_services = {f"warehouse-{wh['id']}" for wh in allocation_data["warehouses_used"]}
context.warehouse_services = warehouse_services
return contextdef create_compensation_request(context):
"""Create compensation request for failed operation."""
if hasattr(context, 'payment_id'):
# Refund payment
return SagaRequest(
target="payment-service",
content={
"action": "refund",
"payment_id": context.payment_id,
"amount": context.charged_amount,
"reason": "saga_rollback"
}
)
else:
# No payment to refund
return None
def create_inventory_release_request(context):
"""Create request to release reserved inventory."""
if hasattr(context, 'reservation_id'):
return SagaRequest(
target="inventory-service",
content={
"action": "release_reservation",
"reservation_id": context.reservation_id,
"reason": "saga_rollback"
}
)
else:
return None
def handle_compensation_response(context, response):
"""Handle compensation operation response."""
if response.ok:
compensation_data = await response.content()
context.compensation_completed = True
context.compensation_id = compensation_data.get("compensation_id")
return context
else:
# Compensation failed - log but don't fail saga
error_data = await response.content()
context.compensation_failed = True
context.compensation_error = error_data.get("error_message")
print(f"Compensation failed: {context.compensation_error}")
return contextimport json
from datetime import datetime, timezone
def create_timestamped_request(context, target, base_content):
"""Create request with timestamp and tracking info."""
content = {
**base_content,
"timestamp": datetime.now(timezone.utc).isoformat(),
"saga_execution_id": str(context.get("saga_uuid", "")),
"correlation_id": context.get("correlation_id"),
"source_service": "order-orchestrator"
}
return SagaRequest(target=target, content=content)
def create_batched_request(context, target, items):
"""Create request for batch processing."""
return SagaRequest(
target=target,
content={
"batch_id": f"batch_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
"items": items,
"batch_size": len(items),
"processing_options": {
"parallel": True,
"fail_fast": False,
"timeout": 300
}
}
)
def create_conditional_request(context):
"""Create request with different content based on context."""
base_content = {
"customer_id": context.customer_id,
"order_id": context.order_id
}
# Conditional content based on customer tier
if context.customer.get("tier") == "premium":
content = {
**base_content,
"priority": "high",
"express_processing": True,
"dedicated_support": True
}
target = "premium-processing-service"
else:
content = {
**base_content,
"priority": "normal",
"standard_processing": True
}
target = "standard-processing-service"
return SagaRequest(target=target, content=content)Install with Tessl CLI
npx tessl i tessl/pypi-minos-microservice-saga