Implementation of the SAGA pattern for distributed microservice transactions in the Minos Framework.
—
A stateful execution context that maintains data across saga steps, offering a dictionary-like interface and automatic persistence. The context serves as the primary mechanism for passing data between saga steps and for maintaining execution state throughout the distributed transaction lifecycle.
`SagaContext` is the core context class; it acts as a stateful container for saga execution data.
from minos.common import BucketModel
from collections.abc import MutableMapping
class SagaContext(BucketModel, MutableMapping):
    """Dict-like container holding the execution state of a saga.

    Data is reachable through both dictionary syntax and attribute
    syntax, with automatic field creation and persistence capabilities.
    ``BucketModel`` is extended for serialization and ``MutableMapping``
    for the dictionary operations.
    """

    def __init__(self, **kwargs):
        """Build a context from key-value pairs.

        Args:
            **kwargs: Initial context data, given as keyword arguments.

        Example:
            context = SagaContext(order_id=123, customer="john", amount=99.99)
        """

    def __setitem__(self, key, value):
        """Store a value under ``key`` with dictionary syntax.

        Args:
            key (str): Context key.
            value: Value to store.

        Example:
            context["order_id"] = 123
        """

    def __getitem__(self, key):
        """Look up the value stored under ``key`` with dictionary syntax.

        Args:
            key (str): Context key.

        Returns:
            The value stored at ``key``.

        Raises:
            KeyError: If ``key`` is not found.

        Example:
            order_id = context["order_id"]
        """

    def __delitem__(self, key):
        """Remove the value stored under ``key`` with dictionary syntax.

        Args:
            key (str): Context key to delete.

        Raises:
            KeyError: If ``key`` is not found.

        Example:
            del context["temp_data"]
        """

    def __setattr__(self, key, value):
        """Store a value with attribute syntax.

        Args:
            key (str): Attribute name.
            value: Value to store.

        Example:
            context.order_id = 123
        """

    def __delattr__(self, key):
        """Remove a value with attribute syntax.

        Args:
            key (str): Attribute name to delete.

        Raises:
            AttributeError: If the attribute is not found.

        Example:
            del context.temp_data
        """

    def __contains__(self, key):
        """Report whether ``key`` is present in the context.

        Args:
            key (str): Key to check.

        Returns:
            bool: True if the key exists.

        Example:
            if "order_id" in context:
                process_order(context.order_id)
        """

    def __iter__(self):
        """Iterate over the context keys.

        Returns:
            An iterator over the context keys.

        Example:
            for key in context:
                print(f"{key}: {context[key]}")
        """

    def __len__(self):
        """Count the items held by the context.

        Returns:
            int: Number of context items.

        Example:
            if len(context) > 0:
                process_context(context)
        """

    def keys(self):
        """Return the context keys.

        Returns:
            dict_keys: Context keys.
        """

    def values(self):
        """Return the context values.

        Returns:
            dict_values: Context values.
        """

    def items(self):
        """Return the context key-value pairs.

        Returns:
            dict_items: Key-value pairs.
        """

    def get(self, key, default=None):
        """Look up ``key``, falling back to ``default``.

        Args:
            key (str): Context key.
            default: Value to use if the key is not found.

        Returns:
            The value at ``key``, or ``default``.

        Example:
            amount = context.get("amount", 0.0)
        """

    def update(self, other=None, **kwargs):
        """Merge another mapping and/or keyword arguments into the context.

        Args:
            other: Mapping to update from.
            **kwargs: Additional key-value pairs.

        Example:
            context.update({"status": "processing"}, priority="high")
        """

    def pop(self, key, default=None):
        """Remove ``key`` and hand back its value.

        Args:
            key (str): Context key.
            default: Value to use if the key is not found.

        Returns:
            The removed value, or ``default``.

        Example:
            temp_value = context.pop("temp_data")
        """

    def clear(self):
        """Drop all context data.

        Example:
            context.clear()  # Reset context
        """


from minos.saga import SagaContext
# Seed the context with the order data.
context = SagaContext(
    order_id=12345,
    customer_id="cust_456",
    items=[{"sku": "ITEM1", "quantity": 2}],
    total=99.99,
)

# Read and write with dictionary syntax.
print(f"Order ID: {context['order_id']}")
context["status"] = "processing"

# Read and write with attribute syntax.
print(f"Customer: {context.customer_id}")
context.payment_method = "credit_card"

# Membership test before using an optional field.
if "discount" in context:
    apply_discount(context.discount)

# Lookup that falls back to a default.
shipping_cost = context.get("shipping_cost", 0.0)


from minos.saga import Saga, SagaContext, SagaRequest
def create_order_processing_saga():
    """Assemble the order-processing saga: validation, inventory, payment.

    Returns:
        Whatever ``Saga.commit()`` returns for the assembled definition.
    """
    saga = Saga()

    # Step 1: validate the order locally.
    saga.local_step().on_execute(validate_order_data)

    # Step 2: reserve inventory through a remote step.
    (
        saga.remote_step()
        .on_execute(create_inventory_request)
        .on_success(handle_inventory_success)
        .on_failure(release_reservation)
    )

    # Step 3: process the payment through a remote step.
    (
        saga.remote_step()
        .on_execute(create_payment_request)
        .on_success(handle_payment_success)
        .on_failure(refund_payment)
    )

    return saga.commit()
# Context flows through all steps
def validate_order_data(context):
    """Local step: reject orders without a positive total, then stamp the context.

    Args:
        context: Saga context; ``total`` is read from it and ``validated_at``
            is written to it.

    Returns:
        The same context object, with ``validated_at`` set to the current
        UTC time as an ISO 8601 string that carries an explicit offset.

    Raises:
        ValueError: If ``total`` is missing, falsy, or not greater than zero.
    """
    # Imported locally so this snippet does not depend on an import that only
    # appears further down the page.
    from datetime import datetime, timezone

    if not context.get("total") or context.total <= 0:
        raise ValueError("Invalid order total")

    # Timezone-aware replacement for the deprecated naive datetime.utcnow().
    context.validated_at = datetime.now(timezone.utc).isoformat()
    return context
def create_inventory_request(context):
    """Remote step: build the inventory reservation request from the context.

    Args:
        context: Saga context holding the ``items`` and ``order_id`` fields.

    Returns:
        A ``SagaRequest`` targeting ``"inventory-service"``.
    """
    payload = {
        # Key lookup, not ``context.items``: the context class declares an
        # ``items()`` mapping method, so attribute access would be expected to
        # return that bound method instead of the stored field.
        "items": context["items"],
        "order_id": context.order_id,
    }
    return SagaRequest(target="inventory-service", content=payload)
def handle_inventory_success(context, response):
    """Success handler: record the inventory reservation on the context.

    Args:
        context: Saga context to update.
        response: Response object whose ``content()`` yields a mapping with a
            ``"reservation_id"`` entry.

    Returns:
        The same context, with ``reservation_id`` and ``inventory_status`` set.
    """
    # NOTE(review): if ``content`` is a coroutine function in the installed
    # minos version, this call must be awaited — confirm against the API.
    reservation = response.content()
    context.reservation_id = reservation["reservation_id"]
    context.inventory_status = "reserved"
    return context
def create_payment_request(context):
    """Payment step: build the payment request from several context fields.

    Args:
        context: Saga context providing ``total``, ``customer_id``,
            ``order_id`` and, optionally, ``payment_method``.

    Returns:
        A ``SagaRequest`` targeting ``"payment-service"``.
    """
    payload = {
        "amount": context.total,
        "customer_id": context.customer_id,
        "order_id": context.order_id,
        # Falls back to "default" when no payment method was stored.
        "payment_method": context.get("payment_method", "default"),
    }
    return SagaRequest(target="payment-service", content=payload)
def handle_payment_success(context, response):
    """Success handler: record the payment details on the context.

    Args:
        context: Saga context to update.
        response: Response object whose ``content()`` yields a mapping with
            ``"payment_id"`` and ``"transaction_id"`` entries.

    Returns:
        The same context, with the payment fields and status set.
    """
    # NOTE(review): if ``content`` is a coroutine function in the installed
    # minos version, this call must be awaited — confirm against the API.
    details = response.content()
    for field in ("payment_id", "transaction_id"):
        setattr(context, field, details[field])
    context.payment_status = "completed"
    return context


from minos.saga import SagaManager, SagaExecution
import json
# Context is automatically persisted with execution state
async def demonstrate_persistence():
    """Run a saga that pauses on disk, reload it, and resume it.

    Relies on names defined outside this snippet: ``repo``, ``broker``,
    ``long_running_saga`` and ``some_response``.
    """
    # Starting state handed to the saga.
    context = SagaContext(
        process_id="proc_123",
        data={"key": "value"},
        step_count=0,
    )

    # Launch the saga, allowing it to pause.
    manager = SagaManager(storage=repo, broker_pool=broker)
    execution_uuid = await manager.run(
        definition=long_running_saga,
        context=context,
        pause_on_disk=True,  # Context saved to disk
        return_execution=False,
    )

    # Later: load the execution back and take its context.
    execution = await repo.load(execution_uuid)
    recovered_context = execution.context

    # The recovered context exposes the fields that were stored initially.
    print(f"Process ID: {recovered_context.process_id}")
    print(f"Data: {recovered_context.data}")
    print(f"Step count: {recovered_context.step_count}")

    # Resume the execution by feeding the manager a response.
    final_result = await manager.run(
        response=some_response,
        pause_on_disk=True,
    )


# Nested data structures
context = SagaContext()

# Whole dictionaries can be stored as single context fields.
context.customer = {
    "id": "cust_123",
    "name": "John Doe",
    "tier": "premium",
}
context.order = {
    "items": [
        {"sku": "ITEM1", "qty": 2, "price": 25.00},
        {"sku": "ITEM2", "qty": 1, "price": 49.99},
    ],
    "shipping": {
        "address": "123 Main St",
        "method": "express",
    },
}

# Drill into the nested values with ordinary indexing.
customer_tier = context.customer["tier"]
first_item_sku = context.order["items"][0]["sku"]
shipping_method = context.order["shipping"]["method"]
# Conditional logic based on context
def determine_processing_flow(context):
    """Pick a processing flow name from the customer tier and order total.

    Args:
        context: Object exposing ``customer`` and ``order`` mappings.

    Returns:
        ``"express_processing"`` for premium customers, otherwise
        ``"standard_processing"`` when the order total exceeds 100, otherwise
        ``"basic_processing"``.
    """
    # Premium customers short-circuit: the order is not inspected at all.
    if context.customer.get("tier") == "premium":
        return "express_processing"

    # A missing total counts as 0.
    if context.order.get("total", 0) > 100:
        return "standard_processing"

    return "basic_processing"
# Context-driven step selection
def create_dynamic_saga(context):
    """Build a one-step saga whose handler depends on the context.

    The chosen flow name is also written to ``context.processing_type``.

    Args:
        context: Saga context passed to ``determine_processing_flow``.

    Returns:
        Whatever ``Saga.commit()`` returns for the assembled definition.
    """
    saga = Saga()

    flow = determine_processing_flow(context)
    context.processing_type = flow

    # Only the express flow gets the express handler; every other flow shares
    # the standard one.
    step = saga.remote_step()
    step.on_execute(
        express_process_order if flow == "express_processing" else standard_process_order
    )

    return saga.commit()


from typing import Dict, List, Optional, Any
def validate_context_schema(context: "SagaContext") -> bool:
    """Check that the context holds every required field with an accepted type.

    Args:
        context: Mapping-like context supporting ``in`` and key lookup.

    Returns:
        ``True`` when every required field is present and correctly typed.

    Raises:
        ValueError: If a required field is missing.
        TypeError: If a field's value is not of an accepted type. Booleans
            are always rejected.
    """
    required_fields = {
        "order_id": (str, int),
        "customer_id": str,
        "total": (int, float),
        "items": list,
    }

    for field, expected_type in required_fields.items():
        if field not in context:
            raise ValueError(f"Required field '{field}' missing from context")

        value = context[field]
        # bool is a subclass of int, so a bare isinstance check would accept
        # True/False as an order id or a total.
        if isinstance(value, bool) or not isinstance(value, expected_type):
            raise TypeError(f"Field '{field}' must be of type {expected_type}")

    return True
def _validate_context_step(context):
    """Step callback: validate the context, then hand the context back.

    ``validate_context_schema`` returns ``True``, whereas the other step
    callbacks on this page return the context; this wrapper keeps the
    validation step consistent with them. It lives at module level so it can
    be referenced by name.
    """
    validate_context_schema(context)
    return context


def create_validated_saga():
    """Build a saga that validates the context before the business step.

    Returns:
        Whatever ``Saga.commit()`` returns for the assembled definition.
    """
    saga = Saga()

    # Validation runs first; it raises on a malformed context.
    saga.local_step().on_execute(_validate_context_step)

    # Business logic follows once validation has passed.
    saga.remote_step().on_execute(process_validated_order)

    return saga.commit()
# Context factory pattern
class OrderContextFactory:
    """Static helpers that build saga contexts for orders and payments."""

    @staticmethod
    def create_order_context(order_data: Dict[str, Any]) -> "SagaContext":
        """Create a standardized order context.

        Args:
            order_data: Mapping with ``"id"``, ``"customer_id"`` and
                ``"items"``; each item needs ``"price"`` and ``"quantity"``.

        Returns:
            A new ``SagaContext`` with the order fields, a computed ``total``,
            a UTC ``created_at`` timestamp (ISO 8601 with offset) and
            ``status="pending"``.

        Raises:
            KeyError: If a required key is missing from ``order_data`` or
                from one of its items.
        """
        # Imported locally so this snippet does not depend on an import that
        # only appears further down the page.
        from datetime import datetime, timezone

        return SagaContext(
            order_id=order_data["id"],
            customer_id=order_data["customer_id"],
            items=order_data["items"],
            total=sum(item["price"] * item["quantity"] for item in order_data["items"]),
            # Timezone-aware replacement for the deprecated datetime.utcnow().
            created_at=datetime.now(timezone.utc).isoformat(),
            status="pending",
        )

    @staticmethod
    def create_payment_context(payment_data: Dict[str, Any], order_context: "SagaContext") -> "SagaContext":
        """Extend an order context with payment data.

        The given context is updated in place and returned; no copy is made.

        Args:
            payment_data: Mapping with ``"method"``, ``"token"`` and
                ``"billing_address"``.
            order_context: Context to extend.

        Returns:
            ``order_context`` itself, now carrying the payment fields.

        Raises:
            KeyError: If a required key is missing from ``payment_data``.
        """
        order_context.update(
            payment_method=payment_data["method"],
            payment_token=payment_data["token"],
            billing_address=payment_data["billing_address"],
        )
        return order_context
# Usage
order_data = {
    "id": "ord_123",
    "customer_id": "cust_456",
    # Concrete sample items: create_order_context multiplies each item's
    # "price" by its "quantity", so a literal [...] placeholder would fail.
    "items": [
        {"sku": "ITEM1", "price": 25.00, "quantity": 2},
        {"sku": "ITEM2", "price": 49.99, "quantity": 1},
    ],
}
context = OrderContextFactory.create_order_context(order_data)


import json
from datetime import datetime
def log_context_state(context: "SagaContext", step_name: str):
    """Print every key-value pair of the context for debugging.

    Output is a header line with the current UTC time and the step name, one
    line per context item, then a blank line.

    Args:
        context: Mapping-like context; only ``items()`` is used.
        step_name: Label identifying the point in the saga being logged.

    Returns:
        None.
    """
    from datetime import timezone

    # Timezone-aware replacement for the deprecated naive datetime.utcnow().
    print(f"[{datetime.now(timezone.utc)}] Context at {step_name}:")
    for key, value in context.items():
        print(f" {key}: {value}")
    print()
def _logged_validate_order(ctx):
    """Log the context at "start", then run ``validate_order``."""
    log_context_state(ctx, "start")
    return validate_order(ctx)


def _logged_payment_request(ctx):
    """Log the context at "payment_request", then build the payment request."""
    log_context_state(ctx, "payment_request")
    return create_payment_request(ctx)


def _logged_payment_success(ctx, resp):
    """Log the context at "payment_success", then run the success handler."""
    log_context_state(ctx, "payment_success")
    return handle_payment_success(ctx, resp)


def create_monitored_saga():
    """Build a saga whose callbacks log the context before doing their work.

    The wrappers are explicit functions rather than ``log(...) or handler(...)``
    lambdas, so the handler still runs even if the logger ever returns a
    truthy value.

    Returns:
        Whatever ``Saga.commit()`` returns for the assembled definition.
    """
    saga = Saga()

    # Every registered callback logs first, then delegates.
    saga.local_step().on_execute(_logged_validate_order)
    (
        saga.remote_step()
        .on_execute(_logged_payment_request)
        .on_success(_logged_payment_success)
    )

    return saga.commit()
# Context snapshots for debugging
class ContextSnapshot:
    """Point-in-time copy of a context, for debugging.

    Attributes are ``step_name``, ``timestamp`` (UTC, ISO 8601 with offset)
    and ``data`` (a deep copy of the context's key-value pairs).
    """

    def __init__(self, context: "SagaContext", step_name: str):
        """Capture the context as it is right now.

        Args:
            context: Mapping-like context to copy.
            step_name: Label identifying where the snapshot was taken.
        """
        from copy import deepcopy
        from datetime import timezone

        self.step_name = step_name
        # Timezone-aware replacement for the deprecated naive datetime.utcnow().
        self.timestamp = datetime.now(timezone.utc).isoformat()
        # Deep copy: a shallow dict() would share nested lists and dicts with
        # the live context, so later mutations would leak into the snapshot.
        self.data = deepcopy(dict(context))

    def to_json(self) -> str:
        """Serialize the snapshot as indented JSON.

        Returns:
            A JSON string with ``"step"``, ``"timestamp"`` and ``"data"`` keys.

        Raises:
            TypeError: If ``data`` holds a value ``json`` cannot serialize.
        """
        return json.dumps(
            {
                "step": self.step_name,
                "timestamp": self.timestamp,
                "data": self.data,
            },
            indent=2,
        )
# Usage in saga steps
def debug_step_with_snapshot(context):
    """Saga step: print a JSON snapshot of the context, then mark the checkpoint.

    Args:
        context: Saga context to snapshot and update.

    Returns:
        The same context, with ``debug_checkpoint`` set to the snapshot's
        timestamp.
    """
    checkpoint = ContextSnapshot(context, "debug_checkpoint")
    print(checkpoint.to_json())

    # Step logic continues here; record when the snapshot was taken.
    context.debug_checkpoint = checkpoint.timestamp
    return context


# Install with Tessl CLI
npx tessl i tessl/pypi-minos-microservice-saga