CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-minos-microservice-saga

Implementation of the SAGA pattern for distributed microservice transactions in the Minos Framework.

Pending
Overview
Eval results
Files

docs/message-system.md

Message System

Request/response infrastructure for microservice communication with status tracking and service relationship management. This module provides the messaging primitives that enable saga steps to communicate with remote services in a structured and trackable manner.

Capabilities

Saga Request Messages

Request messages for remote saga operations targeting specific microservices.

class SagaRequest:
    """
    Request message addressed to a remote microservice during saga execution.

    This is an API outline: only the signatures and their documented
    contracts are shown, the method bodies are not.

    Attributes:
        target (str): Target microservice/endpoint identifier.
        _content (Any): Request payload data; ``content()`` is the documented
            way to read it.
    """
    def __init__(self, target, content=None):
        """
        Initialize the request with its target and payload.

        Args:
            target (str): Target service identifier
                (e.g., "payment-service", "inventory-service").
            content (Optional[Any]): Request payload data. Defaults to None.

        Example:
            request = SagaRequest(
                target="payment-service",
                content={"amount": 99.99, "currency": "USD"}
            )
        """

    async def content(self, **kwargs):
        """
        Return the request payload.

        This is a coroutine, so the result must be awaited.

        Args:
            **kwargs: Additional parameters for content retrieval.

        Returns:
            Any: Request payload content.

        Example:
            payload = await request.content()
        """

Saga Response Messages

Response messages from remote saga operations with status and metadata.

from enum import IntEnum

class SagaResponse:
    """
    Response message produced by a remote saga operation.

    Carries the response payload, its status, and metadata about the remote
    microservice call, including which services were involved. This is an API
    outline: only the signatures and their documented contracts are shown.

    Attributes:
        ok (bool): Whether the response status is SUCCESS.
        status (SagaResponseStatus): Response status code.
        related_services (set[str]): Set of related microservice names.
        uuid (UUID): UUID of the saga execution this response belongs to.
        _content (Any): Response payload data; ``content()`` is the documented
            way to read it.
    """
    def __init__(self, content=None, related_services=None, status=None, uuid=None, **kwargs):
        """
        Initialize the response with its payload and metadata.

        Args:
            content (Optional[Any]): Response payload data. Defaults to None.
            related_services (Optional[set[str]]): Related service names.
            status (Optional[SagaResponseStatus]): Response status.
            uuid (Optional[UUID]): Saga execution identifier.
            **kwargs: Additional keyword arguments; their handling is not
                shown in this outline.

        Example:
            response = SagaResponse(
                content={"payment_id": "pay_123", "status": "completed"},
                status=SagaResponseStatus.SUCCESS,
                related_services={"payment-service"}
            )
        """

    @classmethod
    def from_message(cls, message):
        """
        Build a response from a BrokerMessage.

        Args:
            message: Broker message to convert.

        Returns:
            SagaResponse: Constructed response instance.

        Example:
            response = SagaResponse.from_message(broker_message)
        """

    async def content(self, **kwargs):
        """
        Return the response payload.

        This is a coroutine, so the result must be awaited.

        Args:
            **kwargs: Additional parameters for content retrieval.

        Returns:
            Any: Response payload content.

        Example:
            payload = await response.content()
            payment_id = payload["payment_id"]
        """

class SagaResponseStatus(IntEnum):
    """Integer status codes, modelled on HTTP, that classify a saga response.

    Members:
        SUCCESS (200): The operation succeeded.
        ERROR (400): Client or business-logic error.
        SYSTEM_ERROR (500): System or infrastructure error.
    """

    # The remote operation completed successfully.
    SUCCESS = 200

    # The request was rejected for a client/business-logic reason.
    ERROR = 400

    # The remote system or its infrastructure failed.
    SYSTEM_ERROR = 500

Usage Examples

Creating and Sending Requests

from minos.saga import SagaRequest, SagaContext

def create_payment_request(context):
    """Build the payment request sent to the remote payment service.

    Args:
        context: Saga context exposing ``order_id``, ``total``,
            ``customer_id`` and ``payment_method`` as attributes, plus a
            ``get`` method used to read the optional ``currency``.

    Returns:
        SagaRequest: Request targeting ``"payment-service"``.
    """
    payload = {
        "order_id": context.order_id,
        "amount": context.total,
        # Currency is optional in the context; USD is the fallback.
        "currency": context.get("currency", "USD"),
        "customer_id": context.customer_id,
        "payment_method": context.payment_method,
    }
    return SagaRequest(target="payment-service", content=payload)

def create_inventory_request(context):
    """Build the inventory reservation request for the order's items.

    Args:
        context: Saga context exposing ``items`` (an iterable of mappings
            with ``sku`` and ``quantity`` keys and an optional ``warehouse``
            key), ``order_id`` and ``customer`` (a mapping with an optional
            ``tier`` key).

    Returns:
        SagaRequest: Request targeting ``"inventory-service"``.
    """
    reservation_lines = []
    for entry in context.items:
        reservation_lines.append(
            {
                "sku": entry["sku"],
                "quantity": entry["quantity"],
                # Items without an explicit warehouse use "default".
                "warehouse": entry.get("warehouse", "default"),
            }
        )

    order_id = context.order_id

    # Premium-tier customers get high-priority reservations.
    if context.customer.get("tier") == "premium":
        priority = "high"
    else:
        priority = "normal"

    return SagaRequest(
        target="inventory-service",
        content={
            "items": reservation_lines,
            "order_id": order_id,
            "priority": priority,
        },
    )

def create_shipping_request(context):
    """Build the shipping request for the order.

    Args:
        context: Saga context exposing ``order_id``, ``items``,
            ``shipping_address`` and ``shipping_method`` as attributes, plus
            a ``get`` method used to read the optional
            ``insurance_required`` flag.

    Returns:
        SagaRequest: Request targeting ``"shipping-service"``.
    """
    shipment = {
        "order_id": context.order_id,
        "items": context.items,
        "destination": context.shipping_address,
        "method": context.shipping_method,
        # Insurance is opt-in: absent means not insured.
        "insurance": context.get("insurance_required", False),
    }
    return SagaRequest(target="shipping-service", content=shipment)

Handling Response Messages

from minos.saga import SagaResponse, SagaResponseStatus, SagaContext

async def handle_payment_success(context, response):
    """Record the payment details from a successful payment response.

    Declared ``async`` because the payload is read with
    ``await response.content()``; ``await`` is a syntax error inside a plain
    ``def``.

    Args:
        context: Saga context; payment fields are stored on it as attributes.
        response: Response whose ``ok`` flag must be true and whose payload
            contains ``payment_id``, ``transaction_id`` and
            ``charged_amount``.

    Returns:
        The same ``context`` object, updated with the payment information.

    Raises:
        ValueError: If ``response.ok`` is false.
        KeyError: If the payload lacks one of the required keys.
    """
    if not response.ok:
        raise ValueError(f"Expected successful response, got status: {response.status}")

    # Extract payment details from the response payload.
    payment_data = await response.content()

    # Update the context with the payment information.
    context.payment_id = payment_data["payment_id"]
    context.transaction_id = payment_data["transaction_id"]
    context.payment_status = "completed"
    context.charged_amount = payment_data["charged_amount"]

    return context

async def handle_payment_error(context, response):
    """Record a payment failure on the context and return an exception.

    Declared ``async`` because the payload is read with
    ``await response.content()``; ``await`` is a syntax error inside a plain
    ``def``.

    Args:
        context: Saga context; ``payment_error`` and ``payment_status`` are
            set on it.
        response: Error response; its payload may carry ``error_message``
            and ``error_code``.

    Returns:
        Exception: Returned (not raised) to trigger rollback.
    """
    # An error response may carry no payload at all (content defaults to
    # None), so fall back to an empty mapping before calling ``.get``.
    error_data = (await response.content()) or {}
    message = error_data.get('error_message')

    # Log error details.
    print(f"Payment failed: {message}")

    # Update the context with the error information.
    context.payment_error = error_data.get("error_code")
    context.payment_status = "failed"

    # Return an exception to trigger rollback.
    return Exception(f"Payment failed: {message}")

async def handle_inventory_success(context, response):
    """Record the reservation details from a successful inventory response.

    Declared ``async`` because the payload is read with
    ``await response.content()``; ``await`` is a syntax error inside a plain
    ``def``.

    Args:
        context: Saga context; reservation fields are stored on it.
        response: Response whose payload contains ``reservation_id`` and
            ``reserved_items`` and, optionally, ``expiry_time``.

    Returns:
        The same ``context`` object, updated with the reservation details.

    Raises:
        KeyError: If ``reservation_id`` or ``reserved_items`` is missing.
    """
    inventory_data = await response.content()

    # Update the context with the reservation details.
    context.reservation_id = inventory_data["reservation_id"]
    context.reserved_items = inventory_data["reserved_items"]
    context.inventory_status = "reserved"
    # The expiry is optional; None is stored when the payload omits it.
    context.expiry_time = inventory_data.get("expiry_time")

    return context

async def handle_inventory_error(context, response):
    """Handle an inventory shortage or an inventory service failure.

    Declared ``async`` because the payload is read with
    ``await response.content()``; ``await`` is a syntax error inside a plain
    ``def``.

    Args:
        context: Saga context; shortage details are stored on it.
        response: Error response. Its ``status`` selects the handling and
            its payload may carry ``error_code``, ``unavailable_items``,
            ``partial_available`` and ``available_items``.

    Returns:
        The updated ``context`` when a partial order can continue, otherwise
        an ``Exception`` instance (returned, not raised).

    Raises:
        KeyError: If ``partial_available`` is truthy but ``available_items``
            is missing from the payload.
    """
    # An error response may carry no payload (content defaults to None).
    error_data = (await response.content()) or {}
    status = response.status

    if status == SagaResponseStatus.ERROR:
        # Business logic error (e.g., insufficient inventory).
        context.inventory_error = error_data.get("error_code")
        context.unavailable_items = error_data.get("unavailable_items", [])

        # Continue with a partial order when the service offers one.
        if error_data.get("partial_available"):
            context.items = error_data["available_items"]
            return context
        return Exception("Insufficient inventory")

    if status == SagaResponseStatus.SYSTEM_ERROR:
        # System error - should retry.
        return Exception("Inventory service unavailable")

    # Any other status used to fall through and return None implicitly;
    # report it explicitly instead.
    return Exception(f"Unknown response status: {response.status}")

Status-Based Response Handling

from minos.saga import SagaResponseStatus

async def comprehensive_response_handler(context, response):
    """Handle a response according to its status code.

    Declared ``async`` because the payload is read with
    ``await response.content()``; ``await`` is a syntax error inside a plain
    ``def``.

    Args:
        context: Saga context. On success it is updated with the payload via
            ``context.update``; on known business errors ``payment_error``
            is set on it.
        response: Response whose ``status`` selects the handling.

    Returns:
        The updated ``context`` on success, otherwise an ``Exception``
        instance (returned, not raised) describing the failure.
    """
    status = response.status

    if status == SagaResponseStatus.SUCCESS:
        # Successful operation: merge the payload into the context.
        data = await response.content()
        context.update(data)
        return context

    if status == SagaResponseStatus.ERROR:
        # Business logic error - handle gracefully. The payload may be
        # absent (content defaults to None), hence the empty-dict fallback.
        error_data = (await response.content()) or {}

        # error code -> (value stored in context.payment_error, message)
        known_errors = {
            "INSUFFICIENT_FUNDS": ("insufficient_funds", "Customer has insufficient funds"),
            "INVALID_CARD": ("invalid_card", "Payment method is invalid"),
            "LIMIT_EXCEEDED": ("limit_exceeded", "Transaction exceeds limit"),
        }
        match = known_errors.get(error_data.get("error_code"))
        if match is None:
            # Generic business error.
            return Exception(f"Business error: {error_data.get('message')}")

        context.payment_error, message = match
        return Exception(message)

    if status == SagaResponseStatus.SYSTEM_ERROR:
        # System/infrastructure error - should trigger retry logic.
        error_data = (await response.content()) or {}
        return Exception(f"System error: {error_data.get('message', 'Unknown system error')}")

    # Unknown status.
    return Exception(f"Unknown response status: {response.status}")

Service Relationship Tracking

async def handle_response_with_service_tracking(context, response):
    """Handle a response and track which services were involved.

    Declared ``async`` because the payload is read with
    ``await response.content()``; ``await`` is a syntax error inside a plain
    ``def``.

    Args:
        context: Saga context supporting attribute assignment and
            ``get(key, default)``.
        response: Response exposing ``related_services`` (a set of service
            names, possibly empty or None) and an awaitable ``content()``.

    Returns:
        The same ``context`` object, updated.
    """
    # Normalise once: the membership tests below would raise TypeError on
    # None, while the original only guarded the first use.
    related = response.related_services or set()

    # Record which services are related to this response.
    if related:
        context.involved_services = context.get("involved_services", set())
        context.involved_services.update(related)

        print(f"Services involved so far: {context.involved_services}")

    # Process response data.
    data = await response.content()

    # Example: the payment service might involve a fraud detection service.
    if "fraud-check-service" in related:
        context.fraud_check_id = data.get("fraud_check_id")
        context.risk_score = data.get("risk_score")

    # Example: the inventory service might involve warehouse management.
    if "warehouse-service" in related:
        context.warehouse_location = data.get("warehouse")
        context.pick_list_id = data.get("pick_list_id")

    return context

Complex Request/Response Workflows

from minos.saga import Saga, SagaRequest, SagaResponse

def create_multi_service_saga():
    """Define a saga made of three remote steps and commit it.

    Each step is configured by chaining ``on_execute``, ``on_success``,
    ``on_error`` and, for the last two steps, ``on_failure``.

    Returns:
        The result of ``saga.commit()``.
    """
    saga = Saga()

    # Step 1: validate customer and pricing.
    (
        saga.remote_step()
        .on_execute(create_validation_request)
        .on_success(handle_validation_success)
        .on_error(handle_validation_error)
    )

    # Step 2: process payment with fraud check.
    (
        saga.remote_step()
        .on_execute(create_payment_request)
        .on_success(handle_payment_with_fraud_check)
        .on_error(handle_payment_error)
        .on_failure(create_refund_request)
    )

    # Step 3: reserve inventory across multiple warehouses.
    (
        saga.remote_step()
        .on_execute(create_multi_warehouse_request)
        .on_success(handle_inventory_allocation)
        .on_error(handle_inventory_shortage)
        .on_failure(create_inventory_release_request)
    )

    return saga.commit()

def create_validation_request(context):
    """Build the request asking the validation service to run its checks.

    Args:
        context: Saga context exposing ``customer_id``, ``items``,
            ``shipping_address`` and ``billing_address`` as attributes.

    Returns:
        SagaRequest: Request targeting ``"validation-service"``.
    """
    # Names of the checks requested from the validation service.
    requested_checks = ["customer_status", "address_validation", "pricing"]

    body = {
        "customer_id": context.customer_id,
        "items": context.items,
        "shipping_address": context.shipping_address,
        "billing_address": context.billing_address,
        "checks": requested_checks,
    }
    return SagaRequest(target="validation-service", content=body)

async def handle_validation_success(context, response):
    """Store the validation results and report any failed check.

    Declared ``async`` because the payload is read with
    ``await response.content()``; ``await`` is a syntax error inside a plain
    ``def``.

    Args:
        context: Saga context; the three validation results are stored on it.
        response: Response whose payload maps ``customer_status``,
            ``address_validation`` and ``pricing`` to mappings that each
            contain a ``valid`` flag.

    Returns:
        The updated ``context`` when every check passed, otherwise an
        ``Exception`` instance (returned, not raised) naming the failed
        checks.

    Raises:
        KeyError: If a required check or its ``valid`` flag is missing.
    """
    validation_data = await response.content()

    # Update the context with the validated data.
    context.validated_customer = validation_data["customer_status"]
    context.validated_addresses = validation_data["address_validation"]
    context.final_pricing = validation_data["pricing"]

    # Inspect only the three known checks. Scanning every payload key would
    # break on extra entries that are not mappings (e.g. a request id) and
    # could blame keys that are not checks at all.
    required_checks = ("customer_status", "address_validation", "pricing")
    failed_checks = [
        name for name in required_checks
        if not validation_data[name]["valid"]
    ]
    if failed_checks:
        return Exception(f"Validation failed for: {', '.join(failed_checks)}")

    return context

def create_multi_warehouse_request(context):
    """Build the request for multi-warehouse inventory allocation.

    Args:
        context: Saga context exposing ``items``, ``shipping_address`` and
            ``customer`` (a mapping with an optional ``tier`` key).

    Returns:
        SagaRequest: Request targeting ``"inventory-allocation-service"``.
    """
    allocation = {
        "items": context.items,
        "customer_location": context.shipping_address,
        # Customers without a tier are treated as "standard".
        "priority": context.customer.get("tier", "standard"),
        "allocation_strategy": "cost_optimized",
        "max_warehouses": 3,
    }
    return SagaRequest(target="inventory-allocation-service", content=allocation)

async def handle_inventory_allocation(context, response):
    """Store the outcome of a multi-warehouse inventory allocation.

    Declared ``async`` because the payload is read with
    ``await response.content()``; ``await`` is a syntax error inside a plain
    ``def``.

    Args:
        context: Saga context; allocation fields are stored on it.
        response: Response whose payload contains ``allocations``,
            ``warehouses_used`` (a sequence of mappings with an ``id`` key)
            and ``shipping_estimate``.

    Returns:
        The same ``context`` object, updated with the allocation details.

    Raises:
        KeyError: If a required payload key is missing.
    """
    allocation_data = await response.content()
    warehouses_used = allocation_data["warehouses_used"]

    # Store the allocation details.
    context.allocations = allocation_data["allocations"]
    context.total_warehouses = len(warehouses_used)
    context.estimated_shipping_cost = allocation_data["shipping_estimate"]

    # Track every involved warehouse as a "warehouse-<id>" name.
    context.warehouse_services = {f"warehouse-{wh['id']}" for wh in warehouses_used}

    return context

Error Recovery and Compensation

def create_compensation_request(context):
    """Build a refund request when the context holds a payment to undo.

    Args:
        context: Saga context. A refund is requested only when it has a
            ``payment_id`` attribute; ``charged_amount`` is then read too.

    Returns:
        SagaRequest targeting ``"payment-service"``, or ``None`` when there
        is no payment to refund.
    """
    # No payment recorded, so there is nothing to refund.
    if not hasattr(context, 'payment_id'):
        return None

    refund = {
        "action": "refund",
        "payment_id": context.payment_id,
        "amount": context.charged_amount,
        "reason": "saga_rollback",
    }
    return SagaRequest(target="payment-service", content=refund)

def create_inventory_release_request(context):
    """Build a request releasing reserved inventory, if any was reserved.

    Args:
        context: Saga context. A release is requested only when it has a
            ``reservation_id`` attribute.

    Returns:
        SagaRequest targeting ``"inventory-service"``, or ``None`` when the
        context holds no reservation.
    """
    # Nothing was reserved, so there is nothing to release.
    if not hasattr(context, 'reservation_id'):
        return None

    release = {
        "action": "release_reservation",
        "reservation_id": context.reservation_id,
        "reason": "saga_rollback",
    }
    return SagaRequest(target="inventory-service", content=release)

async def handle_compensation_response(context, response):
    """Record the outcome of a compensation operation on the context.

    Declared ``async`` because the payload is read with
    ``await response.content()``; ``await`` is a syntax error inside a plain
    ``def``.

    Args:
        context: Saga context; compensation outcome fields are set on it.
        response: Response exposing ``ok``; its payload may carry
            ``compensation_id`` on success or ``error_message`` on failure.

    Returns:
        The same ``context`` object in both cases: a failed compensation is
        logged and recorded, not turned into a saga failure.
    """
    # The payload may be absent (content defaults to None); only ``.get``
    # is used below, so an empty mapping is a safe stand-in.
    payload = (await response.content()) or {}

    if response.ok:
        context.compensation_completed = True
        context.compensation_id = payload.get("compensation_id")
        return context

    # Compensation failed - log but don't fail the saga.
    context.compensation_failed = True
    context.compensation_error = payload.get("error_message")
    print(f"Compensation failed: {context.compensation_error}")
    return context

Request Content Patterns

import json
from datetime import datetime, timezone

def create_timestamped_request(context, target, base_content):
    """Build a request whose payload carries a timestamp and tracking info.

    Args:
        context: Saga context providing ``get`` for the optional
            ``saga_uuid`` and ``correlation_id`` entries.
        target: Target service identifier for the request.
        base_content: Mapping with the caller's payload. It is copied, not
            modified; tracking keys override same-named keys in it.

    Returns:
        SagaRequest: Request for ``target`` with the enriched payload.
    """
    enriched = dict(base_content)
    # Timezone-aware UTC timestamp in ISO 8601 form.
    enriched["timestamp"] = datetime.now(timezone.utc).isoformat()
    # A missing saga UUID becomes an empty string.
    enriched["saga_execution_id"] = str(context.get("saga_uuid", ""))
    enriched["correlation_id"] = context.get("correlation_id")
    enriched["source_service"] = "order-orchestrator"

    return SagaRequest(target=target, content=enriched)

def create_batched_request(context, target, items):
    """Build a request asking a service to process ``items`` as one batch.

    Args:
        context: Saga context. Not used by this function.
        target: Target service identifier for the request.
        items: Sized collection of items to process; its length is sent as
            ``batch_size``.

    Returns:
        SagaRequest: Request for ``target`` carrying the batch id, the
        items, and fixed processing options.
    """
    # Derive the batch id from timezone-aware UTC time rather than naive
    # local time, so ids do not depend on the host's timezone and agree
    # with the UTC timestamp used by create_timestamped_request.
    created_at = datetime.now(timezone.utc)

    return SagaRequest(
        target=target,
        content={
            "batch_id": f"batch_{created_at.strftime('%Y%m%d_%H%M%S')}",
            "items": items,
            "batch_size": len(items),
            "processing_options": {
                "parallel": True,
                "fail_fast": False,
                "timeout": 300,
            },
        },
    )

def create_conditional_request(context):
    """Build a processing request whose target and payload depend on tier.

    Args:
        context: Saga context exposing ``customer_id``, ``order_id`` and
            ``customer`` (a mapping with an optional ``tier`` key).

    Returns:
        SagaRequest: Request for ``"premium-processing-service"`` when the
        customer tier is ``"premium"``, otherwise for
        ``"standard-processing-service"``.
    """
    shared_fields = {
        "customer_id": context.customer_id,
        "order_id": context.order_id,
    }

    # Every tier other than "premium" takes the standard path.
    if context.customer.get("tier") != "premium":
        return SagaRequest(
            target="standard-processing-service",
            content={
                **shared_fields,
                "priority": "normal",
                "standard_processing": True,
            },
        )

    return SagaRequest(
        target="premium-processing-service",
        content={
            **shared_fields,
            "priority": "high",
            "express_processing": True,
            "dedicated_support": True,
        },
    )

Install with Tessl CLI

npx tessl i tessl/pypi-minos-microservice-saga

docs

context-management.md

exception-handling.md

execution-engine.md

index.md

message-system.md

saga-definitions.md

testing-utilities.md

tile.json