CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-minos-microservice-saga

Implementation of the SAGA pattern for distributed microservice transactions in the Minos Framework.

Pending
Overview
Eval results
Files

docs/exception-handling.md

Exception Handling

Comprehensive exception hierarchy for saga definition validation, execution errors, and system failures with specific error types for different failure scenarios. This module provides structured error handling that enables proper saga rollback, debugging, and recovery mechanisms.

Capabilities

Exception Hierarchy

The saga exception system is organized into a hierarchical structure for precise error handling.

class SagaException(MinosException):
    """Root of the saga exception hierarchy.

    Every saga-related error derives from this class. It extends
    ``MinosException`` so that saga errors integrate with the error
    handling of the broader Minos framework.
    """

class SagaStepException(SagaException):
    """Base class for saga step definition errors.

    Groups the errors related to how a saga step is defined and validated.
    """

class SagaExecutionException(SagaException):
    """Base class for saga execution errors.

    Groups the errors that occur at runtime, while a saga is being executed.
    """

class SagaStepExecutionException(SagaExecutionException):
    """Base class for step-level execution errors.

    Groups the errors that occur while an individual step is being executed.
    """

Definition and Validation Exceptions

Exceptions raised while a saga definition is being constructed and validated.

# Saga Definition Exceptions
class EmptySagaException(SagaException):
    """Raised when committing a saga that has no steps defined.

    A saga must have at least one step before it can be committed.
    """

class SagaNotCommittedException(SagaException):
    """Raised when executing a saga that has not been committed.

    A saga must be committed before it can be executed.
    """

class AlreadyCommittedException(SagaException):
    """Raised when modifying a saga that is already committed.

    Adding steps to, or otherwise modifying, a saga is not allowed once
    it has been committed for execution.
    """

class SagaNotDefinedException(SagaStepException):
    """Raised when a step has no saga instance.

    A saga step operation was called, but the step does not hold a
    reference to its parent saga.
    """

class EmptySagaStepException(SagaStepException):
    """Raised when a step defines no actions.

    A step must have at least one operation defined: ``on_execute``,
    ``on_success``, ``on_error`` or ``on_failure``.
    """

class AlreadyOnSagaException(SagaStepException):
    """Raised when adding a step that already belongs to another saga.

    A step can only be part of one saga definition.
    """

class UndefinedOnExecuteException(SagaStepException):
    """Raised when a step lacks its ``on_execute`` callback.

    ``on_execute`` is required: it defines the primary operation of the
    step.
    """

# Callback Validation Exceptions
class MultipleOnExecuteException(SagaStepException):
    """Raised when ``on_execute`` is defined more than once on a step.

    A single saga step accepts only one ``on_execute`` callback.
    """

class MultipleOnFailureException(SagaStepException):
    """Raised when ``on_failure`` is defined more than once on a step.

    A single saga step accepts only one ``on_failure`` callback.
    """

class MultipleOnSuccessException(SagaStepException):
    """Raised when ``on_success`` is defined more than once on a step.

    A single remote saga step accepts only one ``on_success`` callback.
    """

class MultipleOnErrorException(SagaStepException):
    """Raised when ``on_error`` is defined more than once on a step.

    A single remote saga step accepts only one ``on_error`` callback.
    """

class MultipleElseThenException(SagaStepException):
    """Raised when ``else_then`` is defined more than once on a step.

    A single conditional saga step accepts only one ``else_then``
    alternative.
    """

Execution Runtime Exceptions

Exceptions that occur during saga execution runtime.

# Execution Management Exceptions
class SagaExecutionNotFoundException(SagaExecutionException):
    """Raised when a saga execution is not found in storage.

    The execution being loaded does not exist in the configured
    repository.
    """

class SagaExecutionAlreadyExecutedException(SagaExecutionException):
    """Raised when re-executing an execution that has already completed.

    A saga in finished or errored status cannot be executed again.
    """

# Execution Failure Exceptions
class SagaFailedExecutionException(SagaExecutionException):
    """Raised when a saga execution fails while running.

    Signals that the execution encountered an unrecoverable error during
    step processing.
    """

class SagaFailedExecutionStepException(SagaStepExecutionException):
    """Raised when a step fails during execution.

    Signals that an individual saga step failed to execute properly.
    """

class SagaPausedExecutionStepException(SagaStepExecutionException):
    """Raised when a step pauses during execution.

    The step requires a pause to wait for an external response; this is
    typically a remote step waiting for a service response.
    """

# Rollback Exceptions
class SagaRollbackExecutionException(SagaExecutionException):
    """Raised when the saga rollback process fails.

    Covers failures of the rollback (compensation) operations.
    """

class SagaRollbackExecutionStepException(SagaStepExecutionException):
    """Raised when a step fails during rollback.

    The compensation logic of an individual step failed while the saga
    was being rolled back.
    """

# Transaction Management Exceptions
class SagaFailedCommitCallbackException(SagaExecutionException):
    """Raised when the commit callback raises an exception.

    The saga commit callback function encountered an error during
    transaction commitment.
    """

# Response Handling Exceptions
class SagaResponseException(SagaException):
    """Raised when a response status is not SUCCESS.

    A remote service responded with an error status that the step's
    error handlers cannot handle.
    """

Usage Examples

Exception Handling in Saga Definition

from minos.saga import (
    Saga, SagaException, EmptySagaException, 
    AlreadyCommittedException, UndefinedOnExecuteException
)

def create_saga_with_validation():
    """Build and commit a two-step saga, reporting definition errors.

    Returns whatever ``saga.commit()`` returns. ``EmptySagaException``,
    ``AlreadyCommittedException`` and ``UndefinedOnExecuteException`` are
    each announced with a printed message and then re-raised to the caller.
    """
    try:
        order_saga = Saga()

        # Calling order_saga.commit() here, before any step exists, would
        # raise EmptySagaException.

        # Register the steps the saga needs.
        order_saga.local_step().on_execute(validate_order)
        order_saga.remote_step().on_execute(process_payment)

        # Once committed, a further order_saga.local_step() call would
        # raise AlreadyCommittedException.
        return order_saga.commit()

    except EmptySagaException:
        print("Cannot commit saga without steps")
        raise
    except AlreadyCommittedException:
        print("Cannot modify saga after commit")
        raise
    except UndefinedOnExecuteException:
        print("Step missing required on_execute callback")
        raise

def create_step_with_validation():
    """Build and commit a one-step saga, reporting a duplicated callback.

    Returns whatever ``saga.commit()`` returns. A
    ``MultipleOnExecuteException`` is announced with a printed message and
    then re-raised to the caller.
    """
    try:
        saga_definition = Saga()

        # Attach the single on_execute callback to a fresh local step.
        # Registering a second on_execute on that same step would raise
        # MultipleOnExecuteException.
        saga_definition.local_step().on_execute(process_data)

        return saga_definition.commit()

    except MultipleOnExecuteException:
        print("Step already has on_execute callback defined")
        raise

Exception Handling in Saga Execution

from minos.saga import (
    SagaManager, SagaExecution, SagaFailedExecutionException,
    SagaPausedExecutionStepException, SagaRollbackExecutionException
)

async def execute_saga_with_error_handling(saga_definition, context):
    """Run a saga through ``SagaManager`` and react to each failure mode.

    On success the value returned by ``manager.run`` is returned. A failed
    execution is loaded from ``repo``, inspected and rolled back before the
    original failure is re-raised. A paused step yields the execution UUID
    carried by the exception. Any other exception is printed and re-raised.
    """
    saga_manager = SagaManager(storage=repo, broker_pool=broker)

    try:
        # Run the saga; raise_on_error=True is passed so that failures
        # reach the handlers below.
        outcome = await saga_manager.run(
            definition=saga_definition,
            context=context,
            autocommit=True,
            raise_on_error=True,
        )

        print("Saga completed successfully")
        return outcome

    except SagaFailedExecutionException as failure:
        print(f"Saga execution failed: {failure}")

        # Load the stored execution to report where it stopped, then
        # roll it back.
        try:
            failed_execution = await repo.load(failure.execution_uuid)
            print(f"Failed at step: {failed_execution.paused_step}")
            print(f"Execution status: {failed_execution.status}")

            await failed_execution.rollback()
            print("Rollback completed")

        except SagaRollbackExecutionException as rollback_failure:
            # Rollback itself failed; manual cleanup may be required.
            print(f"Rollback also failed: {rollback_failure}")

        # Re-raise the original execution failure in either case.
        raise

    except SagaPausedExecutionStepException as pause:
        print(f"Saga paused at step: {pause.step_uuid}")

        # A pause is not an error: hand back the UUID for later resumption.
        print("Saga execution paused, will resume when response arrives")
        return pause.execution_uuid

    except Exception as unexpected:
        print(f"Unexpected error during saga execution: {unexpected}")
        raise

Step-Level Exception Handling

from minos.saga import SagaContext, SagaRequest, SagaResponse

def handle_step_with_exceptions(context):
    """Validate an order context inside a local step.

    Requires truthy ``order_id`` and ``customer_id`` entries, runs
    ``process_order_validation`` and stores its result on the context.
    Returns the updated context. Any failure is recorded on the context
    (``validation_error`` / ``validation_status``) and re-raised.
    """
    required_entries = (
        ("order_id", "Order ID is required"),
        ("customer_id", "Customer ID is required"),
    )

    try:
        # Reject the context as soon as a required entry is missing or falsy.
        for entry_name, complaint in required_entries:
            if not context.get(entry_name):
                raise ValueError(complaint)

        # Run the business validation and record its outcome.
        context.validation_result = process_order_validation(context)
        context.validation_status = "passed"

    except ValueError as validation_problem:
        # Business-level failure: record it, then let it propagate.
        print(f"Validation error: {validation_problem}")
        context.validation_error = str(validation_problem)
        context.validation_status = "failed"
        raise  # Will cause saga to fail and rollback

    except Exception as unexpected:
        # Anything else is treated as a system error and also propagated.
        print(f"Unexpected validation error: {unexpected}")
        context.validation_error = f"System error: {str(unexpected)}"
        context.validation_status = "error"
        raise

    return context

async def handle_remote_step_errors(context, response):
    """Merge a remote response into the context, classifying error replies.

    This is a coroutine because ``response.content()`` must be awaited; the
    original plain ``def`` could not even be compiled.

    Args:
        context: Saga context. It is updated with the response content on
            success, and receives one of ``business_error``,
            ``system_error`` or ``unexpected_error`` on failure.
        response: Remote reply exposing ``ok``, ``status`` and an awaitable
            ``content()``.

    Returns:
        The updated context when ``response.ok`` is truthy.

    Raises:
        ValueError: The reply status is 400 (business error).
        RuntimeError: The reply status is 500 (system error).
        Exception: Any other non-ok status, or any failure while reading or
            merging the content.
    """
    try:
        # Check response status
        if not response.ok:
            error_data = await response.content()
            error_message = error_data.get("message", "Unknown error")

            # Map the status code onto the exception type handled below.
            if response.status == 400:  # Business error
                raise ValueError(f"Business error: {error_message}")
            if response.status == 500:  # System error
                raise RuntimeError(f"System error: {error_message}")
            raise Exception(f"Unexpected error: {error_message}")

        # Process successful response
        data = await response.content()
        context.update(data)
        return context

    except ValueError as e:
        # Business error - update context and propagate
        context.business_error = str(e)
        raise

    except RuntimeError as e:
        # System error - update context and propagate
        context.system_error = str(e)
        raise

    except Exception as e:
        # Unexpected error
        context.unexpected_error = str(e)
        raise

Compensation Exception Handling

def create_saga_with_compensation_handling():
    """Build and commit a two-step saga whose steps carry compensations.

    Each remote step registers execute, success, error and failure
    callbacks. Returns whatever ``saga.commit()`` returns.
    """
    compensated_saga = Saga()

    # Step 1: Reserve inventory
    (
        compensated_saga.remote_step()
        .on_execute(reserve_inventory)
        .on_success(handle_reservation_success)
        .on_error(handle_reservation_error)
        .on_failure(release_inventory_with_error_handling)
    )

    # Step 2: Process payment
    (
        compensated_saga.remote_step()
        .on_execute(process_payment)
        .on_success(handle_payment_success)
        .on_error(handle_payment_error)
        .on_failure(refund_payment_with_error_handling)
    )

    return compensated_saga.commit()

def release_inventory_with_error_handling(context):
    """Build the inventory-release compensation request.

    Returns a ``SagaRequest`` aimed at ``inventory-service`` when the
    context has a ``reservation_id`` attribute, and ``None`` otherwise.
    If building the request fails, the error is recorded under
    ``compensation_errors`` on the context and ``None`` is returned.
    """
    try:
        if hasattr(context, 'reservation_id'):
            release_payload = {
                "action": "release",
                "reservation_id": context.reservation_id
            }
            return SagaRequest(target="inventory-service", content=release_payload)

        print("No reservation to release")
        return None

    except Exception as problem:
        # Record the compensation error instead of failing the rollback.
        print(f"Error creating inventory release request: {problem}")
        context.compensation_errors = context.get("compensation_errors", [])
        context.compensation_errors.append(f"inventory_release: {str(problem)}")

        # None means this compensation is skipped.
        return None

def refund_payment_with_error_handling(context):
    """Build the payment-refund compensation request.

    Returns ``None`` when the context has no ``payment_id`` attribute.
    Otherwise returns a ``SagaRequest`` aimed at ``payment-service``. If
    building that request fails, the error is recorded under
    ``compensation_errors`` and a reduced fallback refund request is
    returned instead.
    """
    try:
        if hasattr(context, 'payment_id'):
            refund_payload = {
                "action": "refund",
                "payment_id": context.payment_id,
                "amount": context.charged_amount,
                "reason": "saga_rollback"
            }
            return SagaRequest(target="payment-service", content=refund_payload)

        print("No payment to refund")
        return None

    except Exception as problem:
        print(f"Error creating refund request: {problem}")
        context.compensation_errors = context.get("compensation_errors", [])
        context.compensation_errors.append(f"payment_refund: {str(problem)}")

        # The refund is treated as critical: still emit a request, without
        # the amount and with a placeholder id if none is available.
        fallback_payload = {
            "action": "refund",
            "payment_id": context.get("payment_id", "unknown"),
            "reason": "saga_rollback_fallback"
        }
        return SagaRequest(target="payment-service", content=fallback_payload)

Exception Recovery Patterns

from minos.saga import SagaExecutionNotFoundException

async def resume_failed_saga(execution_uuid):
    """Load a stored saga execution and react to its current status.

    An errored execution is rolled back; paused and finished executions
    are only reported.

    Args:
        execution_uuid: Identifier passed to ``repo.load``.

    Returns:
        The loaded execution, or ``None`` when it cannot be found, when the
        rollback fails, or when any other error occurs.
    """
    # ``SagaStatus`` is not imported anywhere else in this document. Without
    # this import the first status comparison raises NameError, which the
    # broad handler below would silently turn into a ``None`` result.
    # It sits outside the ``try`` so a missing package is not swallowed too.
    from minos.saga import SagaStatus

    try:
        # Try to load the execution
        execution = await repo.load(execution_uuid)

        # Check execution status
        if execution.status == SagaStatus.Errored:
            print("Execution is in error state, attempting rollback")
            await execution.rollback()

        elif execution.status == SagaStatus.Paused:
            print("Execution is paused, may be resumable")
            # Could resume with appropriate response

        elif execution.status == SagaStatus.Finished:
            print("Execution already completed")

        return execution

    except SagaExecutionNotFoundException:
        print(f"Execution {execution_uuid} not found in storage")
        return None

    except SagaRollbackExecutionException as e:
        print(f"Rollback failed: {e}")
        # May need manual intervention
        return None

    except Exception as e:
        print(f"Unexpected error during recovery: {e}")
        return None

def create_resilient_saga():
    """Build and commit a one-step saga with a fallback error handler.

    Returns whatever ``saga.commit()`` returns.
    """
    resilient_saga = Saga()

    # Primary processing step
    (
        resilient_saga.remote_step()
        .on_execute(primary_processing)
        .on_success(handle_primary_success)
        .on_error(try_fallback_processing)
        .on_failure(cleanup_primary_attempt)
    )

    return resilient_saga.commit()

async def try_fallback_processing(context, response):
    """Decide between retrying the primary path and using the fallback.

    This is a coroutine because ``response.content()`` must be awaited; the
    original plain ``def`` could not even be compiled.

    Args:
        context: Saga context. ``retry_primary``, ``retry_count`` and
            ``use_fallback`` are written onto it.
        response: Remote error reply exposing an awaitable ``content()``.

    Returns:
        The context, marked for retry while fewer than three retries have
        been counted for a ``TEMPORARY_UNAVAILABLE`` error and marked with
        ``use_fallback`` otherwise. If anything fails, an ``Exception``
        instance is returned (not raised).
    """
    try:
        error_data = await response.content()

        # Check if error is recoverable
        if error_data.get("error_code") == "TEMPORARY_UNAVAILABLE":
            # Mark for retry
            context.retry_primary = True
            context.retry_count = context.get("retry_count", 0) + 1

            if context.retry_count < 3:
                # Could trigger retry logic
                return context

        # Retries exhausted or error not recoverable: use the alternative.
        context.use_fallback = True
        return context

    except Exception as e:
        # Fallback attempt failed
        print(f"Fallback processing failed: {e}")
        # NOTE(review): the exception is returned, not raised; presumably
        # the saga engine treats a returned exception as a failed step.
        # Confirm against the Minos saga documentation.
        return Exception(f"Both primary and fallback processing failed: {e}")

def cleanup_primary_attempt(context):
    """Build the cleanup request for a failed primary processing attempt.

    Returns a ``SagaRequest`` aimed at ``resource-service`` when the
    context has a ``temp_resources`` attribute, and ``None`` otherwise or
    when building the request fails.
    """
    try:
        # Nothing was left behind, so there is nothing to clean up.
        if not hasattr(context, 'temp_resources'):
            return None

        # Ask the resource service to clean up the partial state.
        return SagaRequest(
            target="resource-service",
            content={"action": "cleanup", "resources": context.temp_resources}
        )

    except Exception as problem:
        print(f"Cleanup failed: {problem}")
        # Cleanup problems must not fail the saga rollback.
        return None

Exception Monitoring and Alerting

import logging
from datetime import datetime

# Configure saga exception logging
logging.basicConfig(level=logging.INFO)
saga_logger = logging.getLogger("saga.exceptions")

def log_saga_exception(exception, context=None, execution_uuid=None):
    """Log a saga exception together with a summary of its context.

    Args:
        exception: The exception to report; its type name and message are
            logged.
        context: Optional mapping-like saga context. When truthy, its keys
            and its length are added to the log entry.
        execution_uuid: Optional execution identifier, logged as a string
            when truthy and as ``None`` otherwise.
    """
    # Imported locally because the surrounding import block only brings in
    # ``datetime`` itself.
    from datetime import timezone

    log_data = {
        # Timezone-aware UTC timestamp. ``datetime.utcnow()`` is deprecated
        # and returns a naive value that carries no offset.
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "exception_type": type(exception).__name__,
        "exception_message": str(exception),
        "execution_uuid": str(execution_uuid) if execution_uuid else None
    }

    if context:
        log_data["context_keys"] = list(context.keys())
        log_data["context_size"] = len(context)

    # Lazy %-formatting: the message is only built if the record is emitted.
    saga_logger.error("Saga exception occurred: %s", log_data)

def create_monitored_saga():
    """Build and commit a saga whose callbacks are the monitored handlers.

    Returns whatever ``saga.commit()`` returns.
    """
    monitored_saga = Saga()

    # Local step, followed by a remote step with success/error handlers.
    monitored_saga.local_step().on_execute(monitored_local_step)
    (
        monitored_saga.remote_step()
        .on_execute(monitored_remote_step)
        .on_success(monitored_success_handler)
        .on_error(monitored_error_handler)
    )

    return monitored_saga.commit()

def monitored_local_step(context):
    """Run the business logic for a local step, reporting any failure.

    Returns the result of ``process_business_logic(context)``. On any
    exception the error is logged, an alert is sent and the exception is
    re-raised.
    """
    try:
        return process_business_logic(context)

    except Exception as failure:
        # Log first, then alert.
        log_saga_exception(failure, context)
        send_alert(f"Local step failed: {failure}")

        # Propagate so that the saga rolls back.
        raise

async def monitored_error_handler(context, response):
    """Log a remote service error and report it as a failure.

    This is a coroutine because ``response.content()`` must be awaited; the
    original plain ``def`` could not even be compiled.

    Args:
        context: Saga context; its ``execution_uuid`` entry, if any, is
            attached to the log record.
        response: Remote error reply exposing an awaitable ``content()``.

    Returns:
        An ``Exception`` instance describing the service error (returned,
        not raised).

    Raises:
        Exception: Whatever fails while reading or logging the reply, after
            that failure has itself been logged.
    """
    try:
        error_data = await response.content()

        # Log the error with full context
        log_saga_exception(
            Exception(f"Remote service error: {error_data}"),
            context,
            context.get("execution_uuid")
        )

        # NOTE(review): the exception is returned, not raised; presumably
        # the saga engine treats a returned exception as a failed step.
        # Confirm against the Minos saga documentation.
        return Exception(f"Service error: {error_data.get('message')}")

    except Exception as e:
        log_saga_exception(e, context)
        raise

def send_alert(message):
    """Emit an alert line for *message*.

    Placeholder for an integration with a monitoring/alerting system; it
    currently just prints the alert.
    """
    alert_line = f"ALERT: {message}"
    print(alert_line)

Install with Tessl CLI

npx tessl i tessl/pypi-minos-microservice-saga

docs

context-management.md

exception-handling.md

execution-engine.md

index.md

message-system.md

saga-definitions.md

testing-utilities.md

tile.json