CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-minos-microservice-saga

Implementation of the SAGA pattern for distributed microservice transactions in the Minos Framework.

Pending
Overview
Eval results
Files

docs/execution-engine.md

Execution Engine

Runtime execution engine providing saga orchestration, state management, pause/resume capabilities, and comprehensive lifecycle control. This module handles the actual execution of saga definitions with support for distributed coordination, error recovery, and transaction management.

Capabilities

Saga Execution Management

The core execution class that represents a running instance of a saga definition.

class SagaExecution:
    """
    Runtime execution instance of a saga definition.

    ``execute``, ``rollback``, ``commit`` and ``reject`` are coroutines and
    must be awaited. The alternate constructors ``from_definition`` and
    ``from_raw`` are ordinary (synchronous) class methods.

    Attributes:
        uuid (UUID): Unique execution identifier
        definition (Saga): Saga definition being executed
        executed_steps (list[SagaStepExecution]): Steps that have been executed
        context (SagaContext): Current execution context
        status (SagaStatus): Current execution status
        paused_step (SagaStepExecution): Currently paused step (if any)
        already_rollback (bool): Whether rollback has been performed
        user (Optional[UUID]): User identifier for remote steps
    """
    def __init__(self, definition, uuid, context, status=SagaStatus.Created, steps=None, paused_step=None, already_rollback=False, user=None, *args, **kwargs):
        """
        Initialize execution instance with definition and context.

        Args:
            definition (Saga): Saga definition to execute
            uuid (UUID): Unique execution identifier
            context (SagaContext): Initial execution context
            status (SagaStatus): Initial execution status
            steps (Optional[list[SagaStepExecution]]): Pre-existing step executions
            paused_step (Optional[SagaStepExecution]): Currently paused step
            already_rollback (bool): Whether rollback has been performed
            user (Optional[UUID]): User identifier for remote steps
        """

    @classmethod
    def from_definition(cls, definition, context=None, uuid=None, *args, **kwargs):
        """
        Create execution from saga definition.

        Args:
            definition (Saga): Committed saga definition
            context (Optional[SagaContext]): Initial context
            uuid (Optional[UUID]): Execution identifier

        Returns:
            SagaExecution: New execution instance

        Raises:
            SagaNotCommittedException: If saga not committed
        """

    @classmethod
    def from_raw(cls, raw, **kwargs):
        """
        Build execution from raw representation.

        Args:
            raw (dict): Raw execution data

        Returns:
            SagaExecution: Reconstructed execution instance
        """

    async def execute(self, response=None, autocommit=True, **kwargs):
        """
        Execute the saga steps (coroutine; call with ``await``).

        Args:
            response (Optional[SagaResponse]): Response for continuing paused step
            autocommit (bool): Whether to automatically commit/reject transactions

        Returns:
            SagaContext: Final execution context

        Raises:
            SagaExecutionAlreadyExecutedException: If execution already finished
            SagaFailedExecutionException: If execution fails
            SagaPausedExecutionStepException: If step requires pause
        """

    async def rollback(self, autoreject=True, **kwargs):
        """
        Perform compensatory rollback of executed steps (coroutine; call with ``await``).

        Args:
            autoreject (bool): Whether to automatically reject transactions

        Raises:
            SagaRollbackExecutionException: If rollback fails
        """

    async def commit(self, **kwargs):
        """
        Commit execution transactions (coroutine; call with ``await``).

        Raises:
            SagaFailedCommitCallbackException: If commit callback fails
        """

    async def reject(self, **kwargs):
        """Reject execution transactions (coroutine; call with ``await``)."""

Saga Orchestration Manager

The main orchestrator for saga execution lifecycle management.

class SagaManager:
    """
    Main orchestrator for saga execution lifecycle.

    ``run`` is a coroutine and must be awaited.

    Attributes:
        storage (SagaExecutionRepository): Persistence repository
        broker_pool (BrokerClientPool): Message broker connection pool
    """
    def __init__(self, storage, broker_pool=None, pool_factory=None, **kwargs):
        """
        Initialize manager with storage and broker pool.

        Args:
            storage (SagaExecutionRepository): Repository for saga persistence
            broker_pool (Optional[BrokerClientPool]): Message broker pool
            pool_factory: Factory for creating broker pools
        """

    async def run(self, definition=None, context=None, response=None, user=None, autocommit=True, pause_on_disk=False, raise_on_error=True, return_execution=True, **kwargs):
        """
        Execute saga with comprehensive lifecycle management (coroutine; call with ``await``).

        Args:
            definition (Optional[Saga]): Saga definition to execute
            context (Optional[SagaContext]): Initial execution context
            response (Optional[SagaResponse]): Response for continuing execution
            user (Optional[UUID]): User identifier for remote steps
            autocommit (bool): Automatically commit/reject transactions
            pause_on_disk (bool): Pause remote steps on disk vs memory
            raise_on_error (bool): Raise exceptions on execution errors
            return_execution (bool): Return SagaExecution vs UUID

        Returns:
            Union[SagaExecution, UUID, SagaContext]: Execution result based on options

        Raises:
            SagaFailedExecutionException: If execution fails and raise_on_error=True
        """

    @classmethod
    def _from_config(cls, config, **kwargs):
        """
        Build manager from configuration.

        Args:
            config: Configuration object

        Returns:
            SagaManager: Configured manager instance
        """

Execution Status Management

Enums defining the various states of saga and step execution.

from enum import Enum

class SagaStatus(Enum):
    """Lifecycle states of a saga execution as a whole."""

    # Initial state before execution.
    Created = "created"
    # Currently executing steps.
    Running = "running"
    # Execution paused, waiting for a response.
    Paused = "paused"
    # All steps completed successfully.
    Finished = "finished"
    # Execution failed with an error.
    Errored = "errored"

class SagaStepStatus(Enum):
    """
    Lifecycle states of an individual saga step execution.

    Members are grouped below by the phase of the step they belong to.
    """

    # Step created but not started.
    Created = "created"

    # --- main operation (on-execute) ---
    RunningOnExecute = "running-on-execute"  # executing the main operation
    FinishedOnExecute = "finished-on-execute"  # main operation completed
    ErroredOnExecute = "errored-on-execute"  # main operation failed
    PausedByOnExecute = "paused-by-on-execute"  # paused by the main operation
    # Error in the main operation.
    # NOTE(review): how this differs from ErroredOnExecute is not stated here - confirm.
    ErroredByOnExecute = "errored-by-on-execute"

    # --- failure compensation (on-failure) ---
    RunningOnFailure = "running-on-failure"  # executing failure compensation
    PausedOnFailure = "paused-on-failure"  # paused during failure handling
    ErroredOnFailure = "errored-on-failure"  # failure compensation failed

    # --- successful response handling (on-success) ---
    RunningOnSuccess = "running-on-success"  # processing a successful response
    ErroredOnSuccess = "errored-on-success"  # success handler failed

    # --- error response handling (on-error) ---
    RunningOnError = "running-on-error"  # processing an error response
    ErroredOnError = "errored-on-error"  # error handler failed

    # Step completed successfully.
    Finished = "finished"

Step Execution Classes

Runtime execution instances for different step types.

from abc import ABC, abstractmethod

class SagaStepExecution(ABC):
    """
    Common base for the runtime execution of a single saga step.

    Attributes:
        uuid (UUID): Step execution identifier
        definition (SagaStep): Step definition being executed
        status (SagaStepStatus): Current step status
    """

    def __init__(
        self,
        definition,
        uuid,
        status=SagaStepStatus.Created,
        **kwargs,
    ):
        """
        Initialize step execution with definition.

        Args:
            definition (SagaStep): Step definition being executed
            uuid (UUID): Step execution identifier
            status (SagaStepStatus): Initial step status
        """

class LocalSagaStepExecution(SagaStepExecution):
    """
    Step execution for local steps.

    Covers local function execution within the same service process.
    """

    def __init__(
        self,
        definition,
        uuid,
        status=SagaStepStatus.Created,
        **kwargs,
    ):
        """
        Initialize local step execution.

        Args:
            definition: Step definition being executed
            uuid (UUID): Step execution identifier
            status (SagaStepStatus): Initial step status
        """

class RemoteSagaStepExecution(SagaStepExecution):
    """
    Step execution for remote steps.

    Covers remote service calls with request/response coordination.
    """

    def __init__(
        self,
        definition,
        uuid,
        status=SagaStepStatus.Created,
        **kwargs,
    ):
        """
        Initialize remote step execution.

        Args:
            definition: Step definition being executed
            uuid (UUID): Step execution identifier
            status (SagaStepStatus): Initial step status
        """

class ConditionalSagaStepExecution(SagaStepExecution):
    """
    Step execution for conditional steps.

    Covers conditional logic evaluation and nested saga execution.
    """

    def __init__(
        self,
        definition,
        uuid,
        status=SagaStepStatus.Created,
        **kwargs,
    ):
        """
        Initialize conditional step execution.

        Args:
            definition: Step definition being executed
            uuid (UUID): Step execution identifier
            status (SagaStepStatus): Initial step status
        """

Execution Persistence

Repository system for durable saga execution storage and recovery.

from abc import ABC, abstractmethod

class SagaExecutionRepository(ABC):
    """
    Base class for saga execution persistence.

    Provides durable storage for saga execution state enabling
    pause/resume and recovery capabilities.

    ``store``, ``load`` and ``delete`` are abstract coroutines: subclasses
    implement them with ``async def`` and callers use ``await``.
    """
    @abstractmethod
    async def store(self, execution):
        """
        Store saga execution to persistent storage.

        Args:
            execution (SagaExecution): Execution to store
        """

    @abstractmethod
    async def load(self, uuid):
        """
        Load saga execution from persistent storage.

        Args:
            uuid (Union[UUID, str]): Execution identifier

        Returns:
            SagaExecution: Loaded execution instance

        Raises:
            SagaExecutionNotFoundException: If execution not found
        """

    @abstractmethod
    async def delete(self, uuid):
        """
        Delete saga execution from persistent storage.

        Args:
            uuid (Union[UUID, str]): Execution identifier
        """

class DatabaseSagaExecutionRepository(SagaExecutionRepository):
    """
    Saga execution repository backed by a database.
    """

class SagaExecutionDatabaseOperationFactory:
    """
    Factory producing the database operations used on saga executions.
    """

Transaction Management

Two-phase commit protocol implementation for distributed transaction coordination.

class TransactionCommitter:
    """
    Two-phase commit coordinator for the transactions of a saga execution.

    Drives the distributed commit (or rejection) across the services
    that took part in the saga execution.

    NOTE(review): ``commit`` / ``reject`` are shown here as plain methods;
    they are probably coroutines in the library - confirm before relying on it.

    Attributes:
        execution_uuid (UUID): Execution identifier
        executed_steps (list[SagaStepExecution]): Steps that participated
        transactions (list[tuple[UUID, str]]): Transaction UUID and service pairs
    """

    def __init__(
        self,
        execution_uuid,
        executed_steps,
        broker_publisher,
        broker_pool=None,
        **kwargs,
    ):
        """
        Set up the committer for one saga execution.

        Args:
            execution_uuid (UUID): Saga execution identifier
            executed_steps (list[SagaStepExecution]): Executed steps
            broker_publisher: Message broker publisher
            broker_pool: Optional broker connection pool
        """

    def commit(self, **kwargs):
        """
        Commit every transaction through the two-phase commit protocol.

        Commit messages are sent to all participating services and the
        call waits for confirmation of successful commitment.

        Raises:
            SagaFailedCommitCallbackException: If any service fails to commit
        """

    def reject(self):
        """
        Reject every transaction.

        Reject messages are sent to all participating services so that
        they roll back their local transaction state.
        """

Execution Coordinators

Specialized executors for different types of saga operations.

class Executor:
    """
    Common base of the saga operation executors.

    Attributes:
        execution_uuid (UUID): Execution identifier for transaction context
    """

    def __init__(self, execution_uuid, *args, **kwargs):
        """
        Initialize executor with execution context.

        Args:
            execution_uuid (UUID): Execution identifier for transaction context
        """

    def exec(self, operation, *args, **kwargs):
        """
        Execute saga operation within transaction context.

        Args:
            operation: Saga operation to execute
        """

    def exec_function(self, func, *args, **kwargs):
        """
        Execute function within transaction context.

        Args:
            func: Function to execute
        """

class LocalExecutor(Executor):
    """
    Executor for local operations within the same service.
    """

class RequestExecutor(Executor):
    """
    Executor for remote request operations to other services.
    """

class ResponseExecutor(Executor):
    """
    Executor for remote response processing from other services.
    """

Usage Examples

Basic Saga Execution

from minos.saga import SagaManager, SagaExecution, SagaContext
from minos.saga.executions.repositories import DatabaseSagaExecutionRepository

# Build the manager on top of its persistence repository.
# `broker_pool` is a placeholder for a pool created elsewhere in the application.
storage = DatabaseSagaExecutionRepository(...)
manager = SagaManager(storage=storage, broker_pool=broker_pool)


async def execute_order_saga(order_data):
    """Run the order saga for ``order_data`` and return what the manager returns."""
    definition = create_order_saga()
    initial_context = SagaContext(order=order_data)

    # The manager commits automatically and raises if the execution fails.
    return await manager.run(
        definition=definition,
        context=initial_context,
        autocommit=True,
        raise_on_error=True,
    )

Manual Execution Control

from minos.saga import SagaExecution, SagaStatus

# Create execution manually for fine-grained control
execution = SagaExecution.from_definition(
    definition=saga_definition,
    context=SagaContext(data="initial"),
    uuid=uuid4()
)

# Execute with manual transaction control
try:
    context = await execution.execute(autocommit=False)
    
    # Manual commit after validation
    if validate_results(context):
        await execution.commit()
    else:
        await execution.reject()
        
except Exception as e:
    # Manual rollback on failure
    await execution.rollback()
    raise

Pause and Resume Execution

# Execute with pause-on-disk for background processing
execution_uuid = await manager.run(
    definition=long_running_saga,
    context=initial_context,
    pause_on_disk=True,      # Pause remote steps on disk
    return_execution=False   # Return UUID instead of execution
)

# Later, resume with response
response = SagaResponse(content={"result": "processed"})
final_context = await manager.run(
    response=response,
    pause_on_disk=True
)

Status Monitoring

from minos.saga import SagaStatus, SagaStepStatus

# Check execution status
execution = await storage.load(execution_uuid)

if execution.status == SagaStatus.Paused:
    print(f"Execution paused at step: {execution.paused_step.uuid}")
    print(f"Step status: {execution.paused_step.status}")
    
elif execution.status == SagaStatus.Finished:
    print("Saga completed successfully")
    print(f"Final context: {execution.context}")
    
elif execution.status == SagaStatus.Errored:
    print("Saga execution failed")
    # Trigger rollback if needed
    await execution.rollback()

Custom Repository Implementation

from minos.saga import SagaExecution, SagaExecutionNotFoundException
from minos.saga.executions.repositories import SagaExecutionRepository


class CustomSagaRepository(SagaExecutionRepository):
    """Saga execution repository that delegates to a user-supplied async storage backend."""

    def __init__(self, storage_backend):
        """
        Args:
            storage_backend: Object exposing awaitable ``save``, ``get`` and
                ``remove`` methods, as used by the methods below.
        """
        super().__init__()
        self.storage = storage_backend

    async def store(self, execution):
        """Persist the raw form of ``execution`` under its uuid."""
        # Custom storage logic
        await self.storage.save(execution.uuid, execution.raw)

    async def load(self, uuid):
        """
        Rebuild the execution stored under ``uuid``.

        Raises:
            SagaExecutionNotFoundException: If the backend returns nothing for ``uuid``
        """
        # Custom loading logic
        raw_data = await self.storage.get(uuid)
        if not raw_data:
            raise SagaExecutionNotFoundException(f"Execution {uuid} not found")
        return SagaExecution.from_raw(raw_data)

    async def delete(self, uuid):
        """Remove the execution stored under ``uuid``."""
        # Custom deletion logic
        await self.storage.remove(uuid)


# Use custom repository (`my_storage` is a placeholder for your backend object)
custom_repo = CustomSagaRepository(my_storage)
manager = SagaManager(storage=custom_repo)

Install with Tessl CLI

npx tessl i tessl/pypi-minos-microservice-saga

docs

context-management.md

exception-handling.md

execution-engine.md

index.md

message-system.md

saga-definitions.md

testing-utilities.md

tile.json