Implementation of the SAGA pattern for distributed microservice transactions in the Minos Framework.
npx @tessl/cli install tessl/pypi-minos-microservice-saga@0.7.0

A comprehensive Python library implementing the SAGA pattern for distributed microservice transactions in the Minos Framework. This package provides orchestration capabilities for managing complex business processes that span multiple microservices, ensuring data consistency through eventual consistency patterns and compensation-based rollback mechanisms.
pip install minos-microservice-saga

from minos.saga import (
# Core classes
Saga,
SagaContext,
SagaManager,
SagaExecution,
SagaService,
# Step definitions
LocalSagaStep,
RemoteSagaStep,
ConditionalSagaStep,
IfThenAlternative,
ElseThenAlternative,
# Messages
SagaRequest,
SagaResponse,
SagaResponseStatus,
# Status and execution
SagaStatus,
SagaStepStatus,
SagaStepExecution,
# Repositories
SagaExecutionRepository,
DatabaseSagaExecutionRepository,
# Transaction management
TransactionCommitter,
# Middleware and utilities
transactional_command,
get_service_name,
)

from minos.saga import Saga, SagaContext, SagaManager, SagaRequest
# Define a saga with multiple steps
def create_order_saga():
saga = Saga()
# Add local step for order validation
saga.local_step().on_execute(validate_order).on_failure(handle_validation_failure)
# Add remote step for payment processing
saga.remote_step() \
.on_execute(create_payment_request) \
.on_success(handle_payment_success) \
.on_error(handle_payment_error) \
.on_failure(compensate_payment)
# Add remote step for inventory reservation
saga.remote_step() \
.on_execute(reserve_inventory_request) \
.on_success(handle_inventory_success) \
.on_error(handle_inventory_error) \
.on_failure(release_inventory)
return saga.commit()
# Execute the saga
async def process_order(order_data):
saga_definition = create_order_saga()
context = SagaContext(order=order_data)
# Initialize saga manager with storage and broker
# (`repo` and `broker` are not defined in this example; create them beforehand)
manager = SagaManager(storage=repo, broker_pool=broker)
# Run the saga
result = await manager.run(saga_definition, context=context)
return result
# Define callback functions
def validate_order(context):
# Local validation logic
if not context.order.get('amount'):
raise ValueError("Order amount required")
return context
def create_payment_request(context):
# Create payment service request
return SagaRequest(
target="payment-service",
content={"amount": context.order["amount"]}
)
def handle_payment_success(context, response):
# Process successful payment
context.payment_id = response.content()["payment_id"]
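return context

The SAGA pattern implementation is built around several key components, each covered in the sections below: saga definitions (`Saga` and its step classes), the execution engine (`SagaExecution`, `SagaManager`), the execution context (`SagaContext`), request/response messages (`SagaRequest`, `SagaResponse`), the exception hierarchy, testing utilities, and service integration (`SagaService`).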
This architecture enables resilient distributed transactions with automatic compensation, pause/resume capabilities, and comprehensive error handling for complex microservice workflows.
Core functionality for defining distributed transaction sequences with local and remote steps, conditional logic, and compensation behaviors.
class Saga:
def __init__(self, steps=None, committed=False, **kwargs): ...
def local_step(self, step=None, **kwargs): ...
def remote_step(self, step=None, **kwargs): ...
def conditional_step(self, step=None): ...
def commit(self, callback=None, **kwargs): ...
class LocalSagaStep(SagaStep):
def on_execute(self, callback, parameters=None, **kwargs): ...
def on_failure(self, callback, parameters=None, **kwargs): ...
class RemoteSagaStep(SagaStep):
def on_execute(self, callback, parameters=None, **kwargs): ...
def on_success(self, callback, parameters=None, **kwargs): ...
def on_error(self, callback, parameters=None, **kwargs): ...
def on_failure(self, callback, parameters=None, **kwargs): ...
class ConditionalSagaStep(SagaStep):
def if_then(self, condition, saga): ...
def else_then(self, saga): ...

Runtime execution engine providing saga orchestration, state management, pause/resume capabilities, and comprehensive lifecycle control.
class SagaExecution:
def __init__(self, definition, uuid, context, status=SagaStatus.Created, **kwargs): ...
def execute(self, response=None, autocommit=True, **kwargs): ...
def rollback(self, autoreject=True, **kwargs): ...
def commit(self, **kwargs): ...
def reject(self, **kwargs): ...
class SagaManager:
def __init__(self, storage, broker_pool=None, **kwargs): ...
def run(self, definition=None, context=None, response=None, user=None, autocommit=True, pause_on_disk=False, **kwargs): ...
class SagaStatus(Enum):
Created = "created"
Running = "running"
Paused = "paused"
Finished = "finished"
Errored = "errored"

Stateful execution context that maintains data across saga steps with dictionary-like interface and automatic persistence.
class SagaContext(BucketModel, MutableMapping):
def __init__(self, **kwargs): ...
def __setitem__(self, key, value): ...
def __getitem__(self, key): ...
def __setattr__(self, key, value): ...

Request/response infrastructure for microservice communication with status tracking and service relationship management.
class SagaRequest:
def __init__(self, target, content=None): ...
def content(self, **kwargs): ...
class SagaResponse:
def __init__(self, content=None, related_services=None, status=None, uuid=None, **kwargs): ...
def content(self, **kwargs): ...
class SagaResponseStatus(IntEnum):
SUCCESS = 200
ERROR = 400
SYSTEM_ERROR = 500

Comprehensive exception hierarchy for saga definition validation, execution errors, and system failures with specific error types for different failure scenarios.
class SagaException(MinosException): ...
class SagaExecutionException(SagaException): ...
class SagaFailedExecutionException(SagaExecutionException): ...
class SagaRollbackExecutionException(SagaExecutionException): ...
class SagaResponseException(SagaException): ...

Testing utilities and mocked implementations for saga development and validation including repository test cases and operation factories.
class SagaExecutionRepositoryTestCase:
def build_saga_execution_repository(self): ...
def test_store(self): ...
def test_load_from_str(self): ...
def test_delete(self): ...
class MockedSagaExecutionDatabaseOperationFactory: ...

Service-level integration for saga management within microservice applications with middleware support for transactional operations.
class SagaService:
def __init__(self, saga_manager, **kwargs): ...
def __get_enroute__(self, config): ...
def __reply__(self, request): ...
def transactional_command(request, inner):
"""
Middleware for transactional command execution.
Provides automatic transaction context management for
saga operations within command handlers.
Args:
request: Incoming command request
inner: Inner command handler function
Returns:
Command response with transaction context
"""
def get_service_name(config):
"""
Utility function to extract service name from configuration.
Args:
config: Service configuration object
Returns:
str: Service name identifier
"""

from typing import Callable, Union, Awaitable, Optional, TypeVar, Any, Dict, List, Tuple
from uuid import UUID
from enum import Enum, IntEnum
# Type variables
T = TypeVar('T')
# Core callback types
RequestCallBack = Callable[[SagaContext, ...], Union[SagaRequest, Awaitable[SagaRequest]]]
ResponseCallBack = Callable[[SagaContext, SagaResponse, ...], Union[Union[Exception, SagaContext], Awaitable[Union[Exception, SagaContext]]]]
LocalCallback = Callable[[SagaContext, ...], Union[Optional[SagaContext], Awaitable[Optional[SagaContext]]]]
# Operation wrapper
class SagaOperation:
callback: T
parameters: Optional[SagaContext]
parameterized: bool
raw: Dict[str, Any]
# Step definitions
class SagaStep:
saga: Optional[Saga]
raw: Dict[str, Any]
# Alternative classes for conditional steps
class IfThenAlternative:
condition: Any
saga: Saga
class ElseThenAlternative:
saga: Saga
# Status enumerations
class SagaStatus(Enum):
Created: str
Running: str
Paused: str
Finished: str
Errored: str
class SagaStepStatus(Enum):
Created: str
RunningOnExecute: str
FinishedOnExecute: str
ErroredOnExecute: str
PausedByOnExecute: str
ErroredByOnExecute: str
RunningOnFailure: str
PausedOnFailure: str
ErroredOnFailure: str
RunningOnSuccess: str
ErroredOnSuccess: str
RunningOnError: str
ErroredOnError: str
Finished: str
class SagaResponseStatus(IntEnum):
SUCCESS: int
ERROR: int
SYSTEM_ERROR: int
# Execution types
class SagaStepExecution:
uuid: UUID
definition: SagaStep
status: SagaStepStatus
already_rollback: bool
related_services: Optional[List[str]]
raw: Dict[str, Any]
class TransactionCommitter:
execution_uuid: UUID
executed_steps: List[SagaStepExecution]
transactions: List[Tuple[UUID, str]]
# Repository interface
class SagaExecutionRepository:
async def store(self, execution: SagaExecution) -> None: ...
async def load(self, uuid: Union[UUID, str]) -> SagaExecution: ...
async def delete(self, uuid: Union[UUID, str]) -> None: ...