CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-minos-microservice-saga

Implementation of the SAGA pattern for distributed microservice transactions in the Minos Framework.

Pending
Overview
Eval results
Files

saga-definitions.mddocs/

Saga Definitions

Core functionality for defining distributed transaction sequences with local and remote steps, conditional logic, and compensation behaviors. This module provides the declarative API for constructing sagas with proper validation and commitment semantics.

Capabilities

Saga Construction and Management

The main Saga class provides the primary interface for constructing distributed transaction definitions.

class Saga:
    """
    Main class for defining distributed transaction sequences.
    
    Attributes:
        steps (list[SagaStep]): List of saga steps in execution order
        committed (bool): Whether saga is committed and ready for execution
        raw (dict[str, Any]): Raw dictionary representation
    """
    def __init__(self, *args, steps=None, committed=False, commit=None, **kwargs):
        """Initialize saga with optional steps and commit callback."""
    
    def local_step(self, step=None, **kwargs):
        """
        Add a local execution step.
        
        Returns:
            LocalSagaStep: New local step for method chaining
        """
    
    def remote_step(self, step=None, **kwargs):
        """
        Add a remote microservice step.
        
        Returns:
            RemoteSagaStep: New remote step for method chaining
        """
    
    def conditional_step(self, step=None):
        """
        Add a conditional step with if-then-else logic.
        
        Returns:
            ConditionalSagaStep: New conditional step for method chaining
        """
    
    def commit(self, callback=None, **kwargs):
        """
        Commit saga for execution. Once committed, saga cannot be modified.
        
        Args:
            callback: Optional commit callback function
            
        Returns:
            Saga: Self for chaining
            
        Raises:
            AlreadyCommittedException: If saga is already committed
            EmptySagaException: If saga has no steps
        """
    
    def validate(self):
        """
        Validate saga definition.
        
        Raises:
            EmptySagaException: If saga has no steps
            Various step validation exceptions
        """
    
    @classmethod
    def from_raw(cls, raw, **kwargs):
        """
        Build saga from raw dictionary representation.
        
        Args:
            raw (dict): Raw saga definition
            
        Returns:
            Saga: Constructed saga instance
        """

Saga Operations

Container class for saga step operations with callbacks and parameters.

class SagaOperation:
    """
    Container for saga step operations with callbacks and parameters.
    
    Attributes:
        callback (T): The callback function to execute
        parameters (Optional[SagaContext]): Parameters to pass to callback
        parameterized (bool): Whether parameters are provided
        raw (dict[str, Any]): Raw representation
    """
    def __init__(self, callback, parameters=None, **kwargs):
        """Initialize with callback function and optional parameters."""
    
    @classmethod
    def from_raw(cls, raw, **kwargs):
        """Build operation from raw representation."""

Local Saga Steps

Steps for local execution within the same microservice.

class LocalSagaStep(SagaStep):
    """
    Step for local execution within same service.
    
    Attributes:
        on_execute_operation (Optional[SagaOperation[LocalCallback]]): Execute operation
        on_failure_operation (Optional[SagaOperation[LocalCallback]]): Failure operation
    """
    def __init__(self, on_execute=None, on_failure=None, **kwargs):
        """Initialize local step with optional callbacks."""
    
    def on_execute(self, callback, parameters=None, **kwargs):
        """
        Set execution callback for the step.
        
        Args:
            callback (LocalCallback): Function to execute locally
            parameters (Optional[SagaContext]): Parameters for callback
            
        Returns:
            LocalSagaStep: Self for chaining
            
        Raises:
            MultipleOnExecuteException: If on_execute already set
        """
    
    def on_failure(self, callback, parameters=None, **kwargs):
        """
        Set failure compensation callback.
        
        Args:
            callback (LocalCallback): Function to execute on failure
            parameters (Optional[SagaContext]): Parameters for callback
            
        Returns:
            LocalSagaStep: Self for chaining
            
        Raises:
            MultipleOnFailureException: If on_failure already set
        """
    
    def validate(self):
        """
        Validate step has required callbacks.
        
        Raises:
            UndefinedOnExecuteException: If no on_execute defined
            EmptySagaStepException: If step has no actions
        """

Remote Saga Steps

Steps for remote microservice calls with success/error handling.

class RemoteSagaStep(SagaStep):
    """
    Step for remote microservice calls.
    
    Attributes:
        on_execute_operation (Optional[SagaOperation[RequestCallBack]]): Execute operation
        on_success_operation (Optional[SagaOperation[ResponseCallBack]]): Success operation
        on_error_operation (Optional[SagaOperation[ResponseCallBack]]): Error operation
        on_failure_operation (Optional[SagaOperation[RequestCallBack]]): Failure operation
    """
    def __init__(self, on_execute=None, on_success=None, on_error=None, on_failure=None, **kwargs):
        """Initialize remote step with optional callbacks."""
    
    def on_execute(self, callback, parameters=None, **kwargs):
        """
        Set request callback for remote service call.
        
        Args:
            callback (RequestCallBack): Function that creates SagaRequest
            parameters (Optional[SagaContext]): Parameters for callback
            
        Returns:
            RemoteSagaStep: Self for chaining
            
        Raises:
            MultipleOnExecuteException: If on_execute already set
        """
    
    def on_success(self, callback, parameters=None, **kwargs):
        """
        Set success response callback.
        
        Args:
            callback (ResponseCallBack): Function to handle successful response
            parameters (Optional[SagaContext]): Parameters for callback
            
        Returns:
            RemoteSagaStep: Self for chaining
            
        Raises:
            MultipleOnSuccessException: If on_success already set
        """
    
    def on_error(self, callback, parameters=None, **kwargs):
        """
        Set error response callback for business logic errors.
        
        Args:
            callback (ResponseCallBack): Function to handle error response
            parameters (Optional[SagaContext]): Parameters for callback
            
        Returns:
            RemoteSagaStep: Self for chaining
            
        Raises:
            MultipleOnErrorException: If on_error already set
        """
    
    def on_failure(self, callback, parameters=None, **kwargs):
        """
        Set failure compensation callback for rollback.
        
        Args:
            callback (RequestCallBack): Function to create compensation request
            parameters (Optional[SagaContext]): Parameters for callback
            
        Returns:
            RemoteSagaStep: Self for chaining
            
        Raises:
            MultipleOnFailureException: If on_failure already set
        """
    
    def validate(self):
        """
        Validate step configuration.
        
        Raises:
            UndefinedOnExecuteException: If no on_execute defined
            EmptySagaStepException: If step has no actions
        """

Conditional Saga Steps

Steps for conditional saga execution based on runtime conditions.

class ConditionalSagaStep(SagaStep):
    """
    Step for conditional saga execution.
    
    Attributes:
        if_then_alternatives (list[IfThenAlternative]): List of conditional alternatives
        else_then_alternative (ElseThenAlternative): Default alternative
    """
    def __init__(self, if_then=None, else_then=None, **kwargs):
        """Initialize conditional step with optional alternatives."""
    
    def if_then(self, condition, saga):
        """
        Add if-then alternative with condition and saga.
        
        Args:
            condition: Condition function to evaluate
            saga (Saga): Saga to execute if condition is true
            
        Returns:
            ConditionalSagaStep: Self for chaining
        """
    
    def else_then(self, saga):
        """
        Set else alternative with saga.
        
        Args:
            saga (Saga): Default saga to execute
            
        Returns:
            ConditionalSagaStep: Self for chaining
            
        Raises:
            MultipleElseThenException: If else_then already set
        """
    
    def validate(self):
        """
        Validate alternatives.
        
        Raises:
            EmptySagaStepException: If no alternatives defined
        """

class IfThenAlternative:
    """
    Represents an if-then conditional alternative.
    
    Attributes:
        condition (SagaOperation): Condition operation to evaluate
        saga (Saga): Saga to execute if condition is true
    """
    def __init__(self, condition, saga):
        """Initialize with condition function and saga."""
    
    @classmethod
    def from_raw(cls, raw, **kwargs):
        """Build from raw representation."""
    
    def validate(self):
        """Validate the alternative."""

class ElseThenAlternative:
    """
    Represents an else alternative for conditional steps.
    
    Attributes:
        saga (Saga): Saga to execute as default
    """
    def __init__(self, saga):
        """Initialize with saga."""
    
    @classmethod
    def from_raw(cls, raw, **kwargs):
        """Build from raw representation."""
    
    def validate(self):
        """Validate the alternative."""

Step Base Class

Abstract base class for all saga steps providing common functionality.

from abc import ABC, abstractmethod

class SagaStep(ABC):
    """
    Base class for all saga steps.
    
    Attributes:
        saga (Optional[Saga]): Reference to parent saga
    """
    def __init__(self, saga=None, **kwargs):
        """Initialize step with optional saga reference."""
    
    def conditional_step(self, *args, **kwargs):
        """Create new conditional step in saga."""
    
    def local_step(self, *args, **kwargs):
        """Create new local step in saga."""
    
    def remote_step(self, *args, **kwargs):
        """Create new remote step in saga."""
    
    def commit(self, *args, **kwargs):
        """Commit the saga."""
    
    @abstractmethod
    def validate(self):
        """Abstract method to validate step."""
    
    @classmethod
    def from_raw(cls, raw, **kwargs):
        """Class method to build from raw representation."""
    
    @property
    @abstractmethod
    def raw(self):
        """Abstract property for raw representation."""

Usage Examples

Basic Saga with Local and Remote Steps

from minos.saga import Saga, SagaContext, SagaRequest

# Create a simple order processing saga
def create_order_saga():
    saga = Saga()
    
    # Local validation step
    saga.local_step() \
        .on_execute(validate_order) \
        .on_failure(log_validation_failure)
    
    # Remote payment step
    saga.remote_step() \
        .on_execute(create_payment_request) \
        .on_success(handle_payment_success) \
        .on_error(handle_payment_error) \
        .on_failure(refund_payment)
    
    return saga.commit()

def validate_order(context):
    if not context.order.get('total'):
        raise ValueError("Order total required")
    return context

def create_payment_request(context):
    return SagaRequest(
        target="payment-service",
        content={"amount": context.order["total"]}
    )

def handle_payment_success(context, response):
    context.payment_id = response.content()["id"]
    return context

Conditional Saga Logic

def create_conditional_saga():
    saga = Saga()
    
    # Main processing step
    saga.local_step().on_execute(process_data)
    
    # Conditional step based on data type
    conditional = saga.conditional_step()
    
    # If premium customer
    premium_saga = Saga()
    premium_saga.remote_step().on_execute(process_premium_features)
    conditional.if_then(is_premium_customer, premium_saga.commit())
    
    # If regular customer  
    regular_saga = Saga()
    regular_saga.remote_step().on_execute(process_regular_features)
    conditional.else_then(regular_saga.commit())
    
    return saga.commit()

def is_premium_customer(context):
    return context.customer.get('tier') == 'premium'

def process_premium_features(context):
    return SagaRequest(
        target="premium-service",
        content={"customer_id": context.customer["id"]}
    )

Complex Multi-Step Saga

def create_booking_saga():
    saga = Saga()
    
    # Step 1: Validate booking request
    saga.local_step() \
        .on_execute(validate_booking) \
        .on_failure(handle_validation_failure)
    
    # Step 2: Reserve hotel room
    saga.remote_step() \
        .on_execute(reserve_room) \
        .on_success(handle_room_reserved) \
        .on_error(handle_room_error) \
        .on_failure(cancel_room_reservation)
    
    # Step 3: Book flight
    saga.remote_step() \
        .on_execute(book_flight) \
        .on_success(handle_flight_booked) \
        .on_error(handle_flight_error) \
        .on_failure(cancel_flight)
    
    # Step 4: Process payment
    saga.remote_step() \
        .on_execute(process_payment) \
        .on_success(handle_payment_complete) \
        .on_error(handle_payment_failed) \
        .on_failure(refund_charges)
    
    # Step 5: Send confirmation
    saga.local_step().on_execute(send_confirmation)
    
    return saga.commit()

Type Definitions

from typing import Callable, Union, Awaitable, Optional

# Callback function types
RequestCallBack = Callable[[SagaContext, ...], Union[SagaRequest, Awaitable[SagaRequest]]]
ResponseCallBack = Callable[[SagaContext, SagaResponse, ...], Union[Union[Exception, SagaContext], Awaitable[Union[Exception, SagaContext]]]]
LocalCallback = Callable[[SagaContext, ...], Union[Optional[SagaContext], Awaitable[Optional[SagaContext]]]]

Install with Tessl CLI

npx tessl i tessl/pypi-minos-microservice-saga

docs

context-management.md

exception-handling.md

execution-engine.md

index.md

message-system.md

saga-definitions.md

testing-utilities.md

tile.json