CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-minos-microservice-saga

Implementation of the SAGA pattern for distributed microservice transactions in the Minos Framework.

Pending
Overview
Eval results
Files

docs/context-management.md

Context Management

A stateful execution context that maintains data across saga steps, offering a dictionary-like interface and automatic persistence. The context is the primary mechanism for passing data between saga steps and for maintaining execution state throughout the lifecycle of the distributed transaction.

Capabilities

Saga Context

The core context class that acts as a stateful container for saga execution data.

from minos.common import BucketModel
from collections.abc import MutableMapping

class SagaContext(BucketModel, MutableMapping):
    """
    Execution state container with dict-like interface.

    Provides both dictionary-style and attribute-style access to data,
    with automatic field creation and persistence capabilities.
    Extends BucketModel for serialization and MutableMapping for dict operations.

    This listing is an API outline: every method body below is a docstring
    only, so the behaviour described is the documented contract rather than
    something this listing implements.

    NOTE(review): attribute-style *reads* (``context.order_id``) are used in
    the examples, but no ``__getattr__`` is listed here - presumably it is
    inherited from BucketModel; confirm.

    NOTE(review): field names that match a method listed below (``items``,
    ``keys``, ``values``, ``get``, ``update``, ``pop``, ``clear``) presumably
    resolve to the method under attribute access; prefer ``context["items"]``
    for such fields - confirm against the implementation.
    """
    def __init__(self, **kwargs) -> None:
        """
        Initialize context with key-value pairs.

        Args:
            **kwargs: Initial context data as keyword arguments; each keyword
                becomes a context key.

        Example:
            context = SagaContext(order_id=123, customer="john", amount=99.99)
        """

    def __setitem__(self, key: str, value):
        """
        Set context value using dictionary syntax.

        Args:
            key (str): Context key
            value: Value to store

        Example:
            context["order_id"] = 123
        """

    def __getitem__(self, key: str):
        """
        Get context value using dictionary syntax.

        Args:
            key (str): Context key

        Returns:
            Value stored at key

        Raises:
            KeyError: If key not found

        Example:
            order_id = context["order_id"]
        """

    def __delitem__(self, key: str):
        """
        Delete context value using dictionary syntax.

        Args:
            key (str): Context key to delete

        Raises:
            KeyError: If key not found

        Example:
            del context["temp_data"]
        """

    def __setattr__(self, key: str, value):
        """
        Set context value using attribute syntax.

        Args:
            key (str): Attribute name
            value: Value to store

        Example:
            context.order_id = 123
        """

    def __delattr__(self, key: str):
        """
        Delete context value using attribute syntax.

        Args:
            key (str): Attribute name to delete

        Raises:
            AttributeError: If attribute not found

        Example:
            del context.temp_data
        """

    def __contains__(self, key: str) -> bool:
        """
        Check if key exists in context.

        Args:
            key (str): Key to check

        Returns:
            bool: True if key exists

        Example:
            if "order_id" in context:
                process_order(context.order_id)
        """

    def __iter__(self):
        """
        Iterate over context keys.

        Returns:
            Iterator over context keys

        Example:
            for key in context:
                print(f"{key}: {context[key]}")
        """

    def __len__(self) -> int:
        """
        Get number of items in context.

        Returns:
            int: Number of context items

        Example:
            if len(context) > 0:
                process_context(context)
        """

    def keys(self):
        """
        Get context keys.

        Returns:
            dict_keys: Context keys
        """

    def values(self):
        """
        Get context values.

        Returns:
            dict_values: Context values
        """

    def items(self):
        """
        Get context key-value pairs.

        Returns:
            dict_items: Key-value pairs
        """

    def get(self, key: str, default=None):
        """
        Get context value with default.

        Args:
            key (str): Context key
            default: Default value if key not found

        Returns:
            Value at key or default

        Example:
            amount = context.get("amount", 0.0)
        """

    def update(self, other=None, **kwargs):
        """
        Update context with another mapping or keyword arguments.

        Args:
            other: Mapping to update from
            **kwargs: Additional key-value pairs

        Example:
            context.update({"status": "processing"}, priority="high")
        """

    def pop(self, key: str, default=None):
        """
        Remove and return context value.

        Args:
            key (str): Context key
            default: Default value if key not found

        Returns:
            Removed value or default

        Example:
            temp_value = context.pop("temp_data")
        """

    def clear(self):
        """
        Remove all context data.

        Example:
            context.clear()  # Reset context
        """

Usage Examples

Basic Context Operations

from minos.saga import SagaContext

# Initialize context with data
# Every keyword passed to the constructor becomes a context key.
context = SagaContext(
    order_id=12345,
    customer_id="cust_456",
    items=[{"sku": "ITEM1", "quantity": 2}],
    total=99.99
)

# Dictionary-style access
print(f"Order ID: {context['order_id']}")
# "status" was not passed to the constructor; item assignment adds it.
context["status"] = "processing"

# Attribute-style access  
print(f"Customer: {context.customer_id}")
# "payment_method" was not passed to the constructor; attribute assignment adds it.
context.payment_method = "credit_card"

# Check for data
# "discount" is never set above, so this branch is skipped in this example.
# NOTE(review): apply_discount is not defined in this snippet.
if "discount" in context:
    apply_discount(context.discount)

# Get with default
# "shipping_cost" is never set above, so the 0.0 fallback is what gets returned.
shipping_cost = context.get("shipping_cost", 0.0)

Context in Saga Steps

from minos.saga import Saga, SagaContext, SagaRequest

def create_order_processing_saga():
    """Assemble the three-step order saga and return the committed definition.

    Steps, in registration order:
        1. local step running ``validate_order_data``;
        2. remote step for inventory (``create_inventory_request`` /
           ``handle_inventory_success`` / ``release_reservation``);
        3. remote step for payment (``create_payment_request`` /
           ``handle_payment_success`` / ``refund_payment``).

    Returns:
        Whatever ``Saga.commit()`` returns for the assembled definition.
    """
    saga = Saga()

    # Step 1: validate the order locally.
    saga.local_step().on_execute(validate_order_data)

    # Step 2: reserve inventory through a remote call.
    (
        saga.remote_step()
        .on_execute(create_inventory_request)
        .on_success(handle_inventory_success)
        .on_failure(release_reservation)
    )

    # Step 3: process the payment through a remote call.
    (
        saga.remote_step()
        .on_execute(create_payment_request)
        .on_success(handle_payment_success)
        .on_failure(refund_payment)
    )

    return saga.commit()

# Context flows through all steps
def validate_order_data(context):
    """Local step: reject orders without a positive total and stamp the validation time.

    Args:
        context: Saga context; must support ``get("total")``, attribute reads
            of ``total`` and attribute assignment.

    Returns:
        The same context object, with ``validated_at`` set to an ISO-8601
        UTC timestamp string.

    Raises:
        ValueError: If ``total`` is missing, falsy, or not greater than zero.
    """
    # Imported here because this snippet never imports datetime at module
    # level; without it the timestamp line below raises NameError.
    from datetime import datetime

    # The get() guard runs first so a missing/None total never reaches the
    # numeric comparison.
    if not context.get("total") or context.total <= 0:
        raise ValueError("Invalid order total")

    # Add validation timestamp
    context.validated_at = datetime.utcnow().isoformat()
    return context

def create_inventory_request(context):
    """Remote step: build the inventory-service request from the context.

    Args:
        context: Saga context holding the ``items`` and ``order_id`` fields.

    Returns:
        SagaRequest: Request targeting ``"inventory-service"`` whose content
        carries the order's items and order id.

    Raises:
        KeyError: If the context has no ``"items"`` key.
    """
    return SagaRequest(
        target="inventory-service",
        content={
            # Item access, not ``context.items``: the context is a mapping, so
            # the attribute ``items`` resolves to the ``items()`` method
            # rather than to the stored "items" field.
            "items": context["items"],
            "order_id": context.order_id
        }
    )

def handle_inventory_success(context, response):
    """Success handler: record the inventory reservation on the context.

    Args:
        context: Saga context to update (mutated in place).
        response: Reply from the inventory step; ``response.content()`` must
            return a mapping containing ``"reservation_id"``.

    Returns:
        The same context object, with ``reservation_id`` copied from the
        response and ``inventory_status`` set to ``"reserved"``.
    """
    # NOTE(review): if the framework's response.content() is a coroutine this
    # handler would need to be async and await it - confirm.
    context.reservation_id = response.content()["reservation_id"]
    context.inventory_status = "reserved"
    return context

def create_payment_request(context):
    """Remote step: build the payment-service request from several context fields.

    Args:
        context: Saga context providing ``total``, ``customer_id`` and
            ``order_id``; ``payment_method`` is optional.

    Returns:
        SagaRequest: Request targeting ``"payment-service"``. The payment
        method falls back to ``"default"`` when the context has none.
    """
    return SagaRequest(
        target="payment-service",
        content=dict(
            amount=context.total,
            customer_id=context.customer_id,
            order_id=context.order_id,
            payment_method=context.get("payment_method", "default"),
        ),
    )

def handle_payment_success(context, response):
    """Success handler: copy the payment identifiers onto the context.

    Args:
        context: Saga context to update (mutated in place).
        response: Reply from the payment step; ``response.content()`` must
            return a mapping containing ``"payment_id"`` and
            ``"transaction_id"``.

    Returns:
        The same context object, with ``payment_id`` and ``transaction_id``
        copied from the response and ``payment_status`` set to
        ``"completed"``.
    """
    # NOTE(review): if the framework's response.content() is a coroutine this
    # handler would need to be async and await it - confirm.
    details = response.content()
    # Copy the identifiers in a fixed order: payment_id first, then transaction_id.
    for field in ("payment_id", "transaction_id"):
        setattr(context, field, details[field])
    context.payment_status = "completed"
    return context

Context Persistence and Recovery

from minos.saga import SagaManager, SagaExecution
import json

# Context is automatically persisted with execution state
async def demonstrate_persistence() -> None:
    """Run a saga with ``pause_on_disk=True``, then reload it and read the context back.

    ``repo``, ``broker``, ``long_running_saga`` and ``some_response`` are not
    defined in this snippet; they must exist in the enclosing module.
    """
    # Initial context
    context = SagaContext(
        process_id="proc_123",
        data={"key": "value"},
        step_count=0
    )
    
    # Execute saga with pause capability
    manager = SagaManager(storage=repo, broker_pool=broker)
    # return_execution=False: the value is used below as an identifier for
    # repo.load(), not as an execution object.
    execution_uuid = await manager.run(
        definition=long_running_saga,
        context=context,
        pause_on_disk=True,  # Context saved to disk
        return_execution=False
    )
    
    # Later - reload execution and context
    execution = await repo.load(execution_uuid)
    recovered_context = execution.context
    
    # Context is fully restored
    print(f"Process ID: {recovered_context.process_id}")
    print(f"Data: {recovered_context.data}")
    print(f"Step count: {recovered_context.step_count}")
    
    # Continue execution with recovered context
    # NOTE(review): recovered_context is not passed to this call - presumably
    # the manager finds the paused execution from some_response; confirm.
    # final_result is assigned but never read.
    final_result = await manager.run(
        response=some_response,
        pause_on_disk=True
    )

Advanced Context Patterns

# Nested data structures
# Start from an empty context and attach nested structures as attributes.
context = SagaContext()
context.customer = {
    "id": "cust_123",
    "name": "John Doe",
    "tier": "premium"
}
# Note: this order dict has no "total" key.
context.order = {
    "items": [
        {"sku": "ITEM1", "qty": 2, "price": 25.00},
        {"sku": "ITEM2", "qty": 1, "price": 49.99}
    ],
    "shipping": {
        "address": "123 Main St",
        "method": "express"
    }
}

# Access nested data
# The attribute read yields the stored dict/list; plain indexing then drills
# into it.
customer_tier = context.customer["tier"]
first_item_sku = context.order["items"][0]["sku"]
shipping_method = context.order["shipping"]["method"]

# Conditional logic based on context
def determine_processing_flow(context):
    """Choose a processing flow name from the customer tier and order total.

    Args:
        context: Object exposing ``customer`` and ``order`` attributes that
            both support ``.get()``.

    Returns:
        str: ``"express_processing"`` for premium customers; otherwise
        ``"standard_processing"`` when the order total exceeds 100, else
        ``"basic_processing"``. A missing order total counts as 0.
    """
    # The premium tier wins outright; the order is not inspected in that case.
    if context.customer.get("tier") == "premium":
        return "express_processing"

    order_total = context.order.get("total", 0)
    return "standard_processing" if order_total > 100 else "basic_processing"

# Context-driven step selection
def create_dynamic_saga(context):
    """Build a one-step saga whose remote handler depends on the context.

    The flow chosen by ``determine_processing_flow`` is also written to
    ``context.processing_type``.

    Args:
        context: Saga context used to pick the flow (mutated in place).

    Returns:
        Whatever ``Saga.commit()`` returns for the assembled definition.
    """
    saga = Saga()

    flow = determine_processing_flow(context)
    context.processing_type = flow

    # Only the express flow gets the express handler; every other flow uses
    # the standard one.
    step = saga.remote_step()
    step.on_execute(
        express_process_order if flow == "express_processing" else standard_process_order
    )

    return saga.commit()

Context Validation and Type Safety

from typing import Dict, List, Optional, Any

def validate_context_schema(context: SagaContext) -> bool:
    """Check that the context carries every required field with an accepted type.

    Fields are checked in a fixed order (``order_id``, ``customer_id``,
    ``total``, ``items``) and the first problem found is raised.

    Args:
        context: Context to inspect; only ``in`` and item access are used.

    Returns:
        bool: Always ``True`` when no check fails.

    Raises:
        ValueError: If a required field is absent.
        TypeError: If a field's value is not an instance of its accepted type(s).
    """
    # (field name, accepted type or tuple of types), in checking order.
    schema = (
        ("order_id", (str, int)),
        ("customer_id", str),
        ("total", (int, float)),
        ("items", list),
    )

    for name, accepted in schema:
        if name not in context:
            raise ValueError(f"Required field '{name}' missing from context")
        if isinstance(context[name], accepted):
            continue
        raise TypeError(f"Field '{name}' must be of type {accepted}")

    return True

def create_validated_saga():
    """Build a saga that validates the context schema before the business step.

    Returns:
        Whatever ``Saga.commit()`` returns for the assembled definition.
    """
    definition = Saga()

    # Validation is registered first, ahead of the business step.
    definition.local_step().on_execute(validate_context_schema)

    # Business logic follows once validation has been registered.
    definition.remote_step().on_execute(process_validated_order)

    return definition.commit()

# Context factory pattern
class OrderContextFactory:
    """Helpers that build and extend order-related saga contexts."""

    @staticmethod
    def create_order_context(order_data: Dict[str, Any]) -> SagaContext:
        """Create standardized order context.

        Args:
            order_data: Mapping with ``"id"``, ``"customer_id"`` and
                ``"items"``; every item must provide ``"price"`` and
                ``"quantity"``.

        Returns:
            SagaContext: New context with the order fields, a ``total``
            computed as the sum of ``price * quantity`` over the items, a
            ``created_at`` ISO-8601 UTC timestamp and ``status="pending"``.

        Raises:
            KeyError: If a required key is missing from ``order_data`` or
                from one of its items.
        """
        # Imported here because nothing in scope imports datetime before this
        # method runs; without it the created_at line raises NameError.
        from datetime import datetime

        return SagaContext(
            order_id=order_data["id"],
            customer_id=order_data["customer_id"],
            items=order_data["items"],
            total=sum(item["price"] * item["quantity"] for item in order_data["items"]),
            created_at=datetime.utcnow().isoformat(),
            status="pending"
        )

    @staticmethod
    def create_payment_context(payment_data: Dict[str, Any], order_context: SagaContext) -> SagaContext:
        """Extend order context with payment data.

        Args:
            payment_data: Mapping with ``"method"``, ``"token"`` and
                ``"billing_address"``.
            order_context: Context to extend; it is updated in place.

        Returns:
            SagaContext: The same ``order_context`` object, now also holding
            ``payment_method``, ``payment_token`` and ``billing_address``.

        Raises:
            KeyError: If a required key is missing from ``payment_data``.
        """
        order_context.update(
            payment_method=payment_data["method"],
            payment_token=payment_data["token"],
            billing_address=payment_data["billing_address"]
        )
        return order_context

# Usage
order_data = {
    "id": "ord_123",
    "customer_id": "cust_456",
    # Each item needs "price" and "quantity": create_order_context multiplies
    # them to compute the order total.
    "items": [{"sku": "ITEM1", "price": 25.00, "quantity": 2}],
}
context = OrderContextFactory.create_order_context(order_data)

Context Debugging and Monitoring

import json
from datetime import datetime

def log_context_state(context: SagaContext, step_name: str):
    """Print a timestamped dump of the context for debugging.

    Output is a header line with the current UTC time and the step name, one
    indented ``key: value`` line per context entry, then a blank line.

    Args:
        context: Context to dump; only ``items()`` is used.
        step_name: Label shown in the header line.

    Returns:
        None.
    """
    stamp = datetime.utcnow()
    print(f"[{stamp}] Context at {step_name}:")
    for entry in context.items():
        print("  {}: {}".format(*entry))
    # A trailing blank line separates successive dumps.
    print()

def create_monitored_saga():
    """Build a saga whose callbacks log the context before doing their work.

    Each callback is a lambda of the form ``log_context_state(...) or
    handler(...)``: the logger returns ``None``, so ``or`` always falls
    through to the real handler and yields its result.

    Returns:
        Whatever ``Saga.commit()`` returns for the assembled definition.
    """
    saga = Saga()

    # Log, then validate, in the local step.
    saga.local_step().on_execute(
        lambda ctx: log_context_state(ctx, "start") or validate_order(ctx)
    )

    # Log around both halves of the payment step.
    (
        saga.remote_step()
        .on_execute(
            lambda ctx: log_context_state(ctx, "payment_request") or create_payment_request(ctx)
        )
        .on_success(
            lambda ctx, resp: log_context_state(ctx, "payment_success") or handle_payment_success(ctx, resp)
        )
    )

    return saga.commit()

# Context snapshots for debugging
class ContextSnapshot:
    """Point-in-time copy of a context, labelled with a step name and a UTC timestamp."""

    def __init__(self, context: SagaContext, step_name: str):
        """Capture the current context contents.

        Args:
            context: Mapping to copy; ``dict(context)`` takes a shallow copy,
                so nested values are shared with the source.
            step_name: Label identifying where the snapshot was taken.
        """
        self.step_name = step_name
        captured_at = datetime.utcnow()
        self.timestamp = captured_at.isoformat()
        self.data = dict(context)

    def to_json(self) -> str:
        """Serialize the snapshot as indented JSON.

        Returns:
            str: JSON object with ``step``, ``timestamp`` and ``data`` keys,
            indented by two spaces.
        """
        payload = {
            "step": self.step_name,
            "timestamp": self.timestamp,
            "data": self.data,
        }
        return json.dumps(payload, indent=2)

# Usage in saga steps
def debug_step_with_snapshot(context):
    """Saga step that prints a JSON snapshot of the context and records when it was taken.

    Args:
        context: Saga context to snapshot (mutated in place).

    Returns:
        The same context object, with ``debug_checkpoint`` set to the
        snapshot's timestamp.
    """
    checkpoint = ContextSnapshot(context, "debug_checkpoint")
    serialized = checkpoint.to_json()
    print(serialized)

    # The snapshot was taken before this assignment, so the printed data does
    # not include the new debug_checkpoint value.
    context.debug_checkpoint = checkpoint.timestamp
    return context

Install with Tessl CLI

npx tessl i tessl/pypi-minos-microservice-saga

docs

context-management.md

exception-handling.md

execution-engine.md

index.md

message-system.md

saga-definitions.md

testing-utilities.md

tile.json