or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

context-management.mdexception-handling.mdexecution-engine.mdindex.mdmessage-system.mdsaga-definitions.mdtesting-utilities.md
tile.json

tessl/pypi-minos-microservice-saga

Implementation of the SAGA pattern for distributed microservice transactions in the Minos Framework.

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/minos-microservice-saga@0.7.x

To install, run

npx @tessl/cli install tessl/pypi-minos-microservice-saga@0.7.0

index.mddocs/

Minos Microservice Saga

A comprehensive Python library implementing the SAGA pattern for distributed microservice transactions in the Minos Framework. This package provides orchestration capabilities for managing complex business processes that span multiple microservices, ensuring data consistency through eventual consistency patterns and compensation-based rollback mechanisms.

Package Information

  • Package Name: minos-microservice-saga
  • Package Type: pypi
  • Language: Python
  • Installation: pip install minos-microservice-saga

Core Imports

from minos.saga import (
    # Core classes
    Saga,
    SagaContext,
    SagaManager,
    SagaExecution,
    SagaService,
    
    # Step definitions
    LocalSagaStep,
    RemoteSagaStep,
    ConditionalSagaStep,
    IfThenAlternative,
    ElseThenAlternative,
    
    # Messages
    SagaRequest,
    SagaResponse,
    SagaResponseStatus,
    
    # Status and execution
    SagaStatus,
    SagaStepStatus,
    SagaStepExecution,
    
    # Repositories
    SagaExecutionRepository,
    DatabaseSagaExecutionRepository,
    
    # Transaction management
    TransactionCommitter,
    
    # Middleware and utilities
    transactional_command,
    get_service_name,
)

Basic Usage

from minos.saga import Saga, SagaContext, SagaManager, SagaRequest

# Define a saga with multiple steps
def create_order_saga():
    saga = Saga()
    
    # Add local step for order validation
    saga.local_step().on_execute(validate_order).on_failure(handle_validation_failure)
    
    # Add remote step for payment processing
    saga.remote_step() \
        .on_execute(create_payment_request) \
        .on_success(handle_payment_success) \
        .on_error(handle_payment_error) \
        .on_failure(compensate_payment)
    
    # Add remote step for inventory reservation
    saga.remote_step() \
        .on_execute(reserve_inventory_request) \
        .on_success(handle_inventory_success) \
        .on_error(handle_inventory_error) \
        .on_failure(release_inventory)
    
    return saga.commit()

# Execute the saga
async def process_order(order_data):
    saga_definition = create_order_saga()
    context = SagaContext(order=order_data)
    
    # Initialize saga manager with storage and broker
    manager = SagaManager(storage=repo, broker_pool=broker)
    
    # Run the saga
    result = await manager.run(saga_definition, context=context)
    return result

# Define callback functions
def validate_order(context):
    # Local validation logic
    if not context.order.get('amount'):
        raise ValueError("Order amount required")
    return context

def create_payment_request(context):
    # Create payment service request
    return SagaRequest(
        target="payment-service", 
        content={"amount": context.order["amount"]}
    )

def handle_payment_success(context, response):
    # Process successful payment
    context.payment_id = response.content()["payment_id"]
    return context

Architecture

The SAGA pattern implementation is built around several key components:

  • Saga Definitions: Declarative step sequences with compensation logic
  • Execution Engine: Runtime orchestrator managing step execution and state
  • Context Management: Stateful container carrying data between steps
  • Message System: Request/response infrastructure for microservice communication
  • Transaction Management: Two-phase commit protocol for distributed consistency
  • Persistence Layer: Durable storage for execution state and recovery

This architecture enables resilient distributed transactions with automatic compensation, pause/resume capabilities, and comprehensive error handling for complex microservice workflows.

Capabilities

Saga Definition and Construction

Core functionality for defining distributed transaction sequences with local and remote steps, conditional logic, and compensation behaviors.

class Saga:
    def __init__(self, steps=None, committed=False, **kwargs): ...
    def local_step(self, step=None, **kwargs): ...
    def remote_step(self, step=None, **kwargs): ...
    def conditional_step(self, step=None): ...
    def commit(self, callback=None, **kwargs): ...

class LocalSagaStep(SagaStep):
    def on_execute(self, callback, parameters=None, **kwargs): ...
    def on_failure(self, callback, parameters=None, **kwargs): ...

class RemoteSagaStep(SagaStep):
    def on_execute(self, callback, parameters=None, **kwargs): ...
    def on_success(self, callback, parameters=None, **kwargs): ...
    def on_error(self, callback, parameters=None, **kwargs): ...
    def on_failure(self, callback, parameters=None, **kwargs): ...

class ConditionalSagaStep(SagaStep):
    def if_then(self, condition, saga): ...
    def else_then(self, saga): ...

Saga Definitions

Execution Management and Orchestration

Runtime execution engine providing saga orchestration, state management, pause/resume capabilities, and comprehensive lifecycle control.

class SagaExecution:
    def __init__(self, definition, uuid, context, status=SagaStatus.Created, **kwargs): ...
    def execute(self, response=None, autocommit=True, **kwargs): ...
    def rollback(self, autoreject=True, **kwargs): ...
    def commit(self, **kwargs): ...
    def reject(self, **kwargs): ...

class SagaManager:
    def __init__(self, storage, broker_pool=None, **kwargs): ...
    def run(self, definition=None, context=None, response=None, user=None, autocommit=True, pause_on_disk=False, **kwargs): ...

class SagaStatus(Enum):
    Created = "created"
    Running = "running"
    Paused = "paused"
    Finished = "finished"
    Errored = "errored"

Execution Engine

Context and State Management

Stateful execution context that maintains data across saga steps with dictionary-like interface and automatic persistence.

class SagaContext(BucketModel, MutableMapping):
    def __init__(self, **kwargs): ...
    def __setitem__(self, key, value): ...
    def __getitem__(self, key): ...
    def __setattr__(self, key, value): ...

Context Management

Message System and Communication

Request/response infrastructure for microservice communication with status tracking and service relationship management.

class SagaRequest:
    def __init__(self, target, content=None): ...
    def content(self, **kwargs): ...

class SagaResponse:
    def __init__(self, content=None, related_services=None, status=None, uuid=None, **kwargs): ...
    def content(self, **kwargs): ...

class SagaResponseStatus(IntEnum):
    SUCCESS = 200
    ERROR = 400
    SYSTEM_ERROR = 500

Message System

Exception Handling and Error Management

Comprehensive exception hierarchy for saga definition validation, execution errors, and system failures with specific error types for different failure scenarios.

class SagaException(MinosException): ...
class SagaExecutionException(SagaException): ...
class SagaFailedExecutionException(SagaExecutionException): ...
class SagaRollbackExecutionException(SagaExecutionException): ...
class SagaResponseException(SagaException): ...

Exception Handling

Testing and Development Utilities

Testing utilities and mocked implementations for saga development and validation including repository test cases and operation factories.

class SagaExecutionRepositoryTestCase:
    def build_saga_execution_repository(self): ...
    def test_store(self): ...
    def test_load_from_str(self): ...
    def test_delete(self): ...

class MockedSagaExecutionDatabaseOperationFactory: ...

Testing Utilities

Service Integration and Middleware

Service-level integration for saga management within microservice applications with middleware support for transactional operations.

class SagaService:
    def __init__(self, saga_manager, **kwargs): ...
    def __get_enroute__(self, config): ...
    def __reply__(self, request): ...

def transactional_command(request, inner):
    """
    Middleware for transactional command execution.
    
    Provides automatic transaction context management for
    saga operations within command handlers.
    
    Args:
        request: Incoming command request
        inner: Inner command handler function
        
    Returns:
        Command response with transaction context
    """

def get_service_name(config):
    """
    Utility function to extract service name from configuration.
    
    Args:
        config: Service configuration object
        
    Returns:
        str: Service name identifier
    """

Types

from typing import Callable, Union, Awaitable, Optional, TypeVar, Any, Dict, List, Tuple
from uuid import UUID
from enum import Enum, IntEnum

# Type variables
T = TypeVar('T')

# Core callback types
RequestCallBack = Callable[[SagaContext, ...], Union[SagaResponse, Awaitable[SagaRequest]]]
ResponseCallBack = Callable[[SagaContext, SagaResponse, ...], Union[Union[Exception, SagaContext], Awaitable[Union[Exception, SagaContext]]]]
LocalCallback = Callable[[SagaContext, ...], Union[Optional[SagaContext], Awaitable[Optional[SagaContext]]]]

# Operation wrapper
class SagaOperation:
    callback: T
    parameters: Optional[SagaContext]
    parameterized: bool
    raw: Dict[str, Any]

# Step definitions
class SagaStep:
    saga: Optional[Saga]
    raw: Dict[str, Any]

# Alternative classes for conditional steps
class IfThenAlternative:
    condition: Any
    saga: Saga

class ElseThenAlternative:
    saga: Saga

# Status enumerations
class SagaStatus(Enum):
    Created: str
    Running: str
    Paused: str
    Finished: str
    Errored: str

class SagaStepStatus(Enum):
    Created: str
    RunningOnExecute: str
    FinishedOnExecute: str
    ErroredOnExecute: str
    PausedByOnExecute: str
    ErroredByOnExecute: str
    RunningOnFailure: str
    PausedOnFailure: str
    ErroredOnFailure: str
    RunningOnSuccess: str
    ErroredOnSuccess: str
    RunningOnError: str
    ErroredOnError: str
    Finished: str

class SagaResponseStatus(IntEnum):
    SUCCESS: int
    ERROR: int
    SYSTEM_ERROR: int

# Execution types
class SagaStepExecution:
    uuid: UUID
    definition: SagaStep
    status: SagaStepStatus
    already_rollback: bool
    related_services: Optional[List[str]]
    raw: Dict[str, Any]

class TransactionCommitter:
    execution_uuid: UUID
    executed_steps: List[SagaStepExecution]
    transactions: List[Tuple[UUID, str]]

# Repository interface
class SagaExecutionRepository:
    async def store(self, execution: SagaExecution) -> None: ...
    async def load(self, uuid: Union[UUID, str]) -> SagaExecution: ...
    async def delete(self, uuid: Union[UUID, str]) -> None: ...