Implementation of the SAGA pattern for distributed microservice transactions in the Minos Framework.
—
The runtime execution engine provides saga orchestration, state management, pause/resume capabilities, and comprehensive lifecycle control. This module handles the actual execution of saga definitions, with support for distributed coordination, error recovery, and transaction management.
The core execution class that represents a running instance of a saga definition.
class SagaExecution:
    """Runtime execution instance of a saga definition.

    NOTE(review): the usage examples in this document ``await`` the
    ``execute``, ``rollback``, ``commit`` and ``reject`` calls, so the real
    methods are presumably coroutines even though these stubs are declared
    with plain ``def`` -- confirm against the implementation.

    Attributes:
        uuid (UUID): Unique execution identifier.
        definition (Saga): Saga definition being executed.
        executed_steps (list[SagaStepExecution]): Steps that have been executed.
        context (SagaContext): Current execution context.
        status (SagaStatus): Current execution status.
        paused_step (SagaStepExecution): Currently paused step (if any).
        already_rollback (bool): Whether rollback has been performed.
        user (Optional[UUID]): User identifier for remote steps.
    """

    def __init__(
        self,
        definition,
        uuid,
        context,
        status=SagaStatus.Created,
        steps=None,
        paused_step=None,
        already_rollback=False,
        user=None,
        *args,
        **kwargs,
    ):
        """Initialize execution instance with definition and context.

        Args:
            definition (Saga): Saga definition to execute.
            uuid (UUID): Unique execution identifier.
            context (SagaContext): Initial execution context.
            status (SagaStatus): Initial execution status.
            steps (Optional[list[SagaStepExecution]]): Pre-existing step executions.
            paused_step (Optional[SagaStepExecution]): Currently paused step.
            already_rollback (bool): Whether rollback has been performed.
            user (Optional[UUID]): User identifier for remote steps.
        """

    @classmethod
    def from_definition(cls, definition, context=None, uuid=None, *args, **kwargs):
        """Create execution from saga definition.

        Args:
            definition (Saga): Committed saga definition.
            context (Optional[SagaContext]): Initial context.
            uuid (Optional[UUID]): Execution identifier.

        Returns:
            SagaExecution: New execution instance.

        Raises:
            SagaNotCommittedException: If saga not committed.
        """

    @classmethod
    def from_raw(cls, raw, **kwargs):
        """Build execution from raw representation.

        Args:
            raw (dict): Raw execution data.

        Returns:
            SagaExecution: Reconstructed execution instance.
        """

    def execute(self, response=None, autocommit=True, **kwargs):
        """Execute the saga steps.

        Args:
            response (Optional[SagaResponse]): Response for continuing paused step.
            autocommit (bool): Whether to automatically commit/reject transactions.

        Returns:
            SagaContext: Final execution context.

        Raises:
            SagaExecutionAlreadyExecutedException: If execution already finished.
            SagaFailedExecutionException: If execution fails.
            SagaPausedExecutionStepException: If step requires pause.
        """

    def rollback(self, autoreject=True, **kwargs):
        """Perform compensatory rollback of executed steps.

        Args:
            autoreject (bool): Whether to automatically reject transactions.

        Raises:
            SagaRollbackExecutionException: If rollback fails.
        """

    def commit(self, **kwargs):
        """Commit execution transactions.

        Raises:
            SagaFailedCommitCallbackException: If commit callback fails.
        """

    def reject(self, **kwargs):
        """Reject execution transactions."""


# The main orchestrator for saga execution lifecycle management.
class SagaManager:
    """Main orchestrator for saga execution lifecycle.

    NOTE(review): the usage examples in this document ``await manager.run(...)``,
    so ``run`` is presumably a coroutine even though this stub is declared with
    plain ``def`` -- confirm against the implementation.

    Attributes:
        storage (SagaExecutionRepository): Persistence repository.
        broker_pool (BrokerClientPool): Message broker connection pool.
    """

    def __init__(self, storage, broker_pool=None, pool_factory=None, **kwargs):
        """Initialize manager with storage and broker pool.

        Args:
            storage (SagaExecutionRepository): Repository for saga persistence.
            broker_pool (Optional[BrokerClientPool]): Message broker pool.
            pool_factory: Factory for creating broker pools.
        """

    def run(
        self,
        definition=None,
        context=None,
        response=None,
        user=None,
        autocommit=True,
        pause_on_disk=False,
        raise_on_error=True,
        return_execution=True,
        **kwargs,
    ):
        """Execute saga with comprehensive lifecycle management.

        Args:
            definition (Optional[Saga]): Saga definition to execute.
            context (Optional[SagaContext]): Initial execution context.
            response (Optional[SagaResponse]): Response for continuing execution.
            user (Optional[UUID]): User identifier for remote steps.
            autocommit (bool): Automatically commit/reject transactions.
            pause_on_disk (bool): Pause remote steps on disk vs memory.
            raise_on_error (bool): Raise exceptions on execution errors.
            return_execution (bool): Return SagaExecution vs UUID.

        Returns:
            Union[SagaExecution, UUID, SagaContext]: Execution result based on options.

        Raises:
            SagaFailedExecutionException: If execution fails and raise_on_error=True.
        """

    @classmethod
    def _from_config(cls, config, **kwargs):
        """Build manager from configuration.

        Args:
            config: Configuration object.

        Returns:
            SagaManager: Configured manager instance.
        """


# Enums defining the various states of saga and step execution.
from enum import Enum
class SagaStatus(Enum):
    """Saga execution status states.

    Values:
        Created: Initial state before execution.
        Running: Currently executing steps.
        Paused: Execution paused waiting for response.
        Finished: Successfully completed all steps.
        Errored: Execution failed with error.
    """

    Created = "created"
    Running = "running"
    Paused = "paused"
    Finished = "finished"
    Errored = "errored"
class SagaStepStatus(Enum):
    """Individual step execution status states.

    Values:
        Created: Step created but not started.
        RunningOnExecute: Executing main operation.
        FinishedOnExecute: Main operation completed.
        ErroredOnExecute: Main operation failed.
        PausedByOnExecute: Paused by main operation.
        ErroredByOnExecute: Error in main operation.
        RunningOnFailure: Executing failure compensation.
        PausedOnFailure: Paused during failure handling.
        ErroredOnFailure: Failure compensation failed.
        RunningOnSuccess: Processing successful response.
        ErroredOnSuccess: Success handler failed.
        RunningOnError: Processing error response.
        ErroredOnError: Error handler failed.
        Finished: Step completed successfully.
    """

    Created = "created"
    RunningOnExecute = "running-on-execute"
    FinishedOnExecute = "finished-on-execute"
    ErroredOnExecute = "errored-on-execute"
    PausedByOnExecute = "paused-by-on-execute"
    ErroredByOnExecute = "errored-by-on-execute"
    RunningOnFailure = "running-on-failure"
    PausedOnFailure = "paused-on-failure"
    ErroredOnFailure = "errored-on-failure"
    RunningOnSuccess = "running-on-success"
    ErroredOnSuccess = "errored-on-success"
    RunningOnError = "running-on-error"
    ErroredOnError = "errored-on-error"
    Finished = "finished"


# Runtime execution instances for different step types.
from abc import ABC, abstractmethod
class SagaStepExecution(ABC):
    """Base class for step execution instances.

    Attributes:
        uuid (UUID): Step execution identifier.
        definition (SagaStep): Step definition being executed.
        status (SagaStepStatus): Current step status.
    """

    def __init__(self, definition, uuid, status=SagaStepStatus.Created, **kwargs):
        """Initialize step execution with definition."""
class LocalSagaStepExecution(SagaStepExecution):
    """Execution instance for local steps.

    Handles local function execution within the same service process.
    """

    def __init__(self, definition, uuid, status=SagaStepStatus.Created, **kwargs):
        """Initialize local step execution."""
class RemoteSagaStepExecution(SagaStepExecution):
    """Execution instance for remote steps.

    Handles remote service calls with request/response coordination.
    """

    def __init__(self, definition, uuid, status=SagaStepStatus.Created, **kwargs):
        """Initialize remote step execution."""
class ConditionalSagaStepExecution(SagaStepExecution):
    """Execution instance for conditional steps.

    Handles conditional logic evaluation and nested saga execution.
    """

    def __init__(self, definition, uuid, status=SagaStepStatus.Created, **kwargs):
        """Initialize conditional step execution."""


# Repository system for durable saga execution storage and recovery.
from abc import ABC, abstractmethod
class SagaExecutionRepository(ABC):
    """Base class for saga execution persistence.

    Provides durable storage for saga execution state enabling
    pause/resume and recovery capabilities.

    NOTE(review): the ``CustomSagaRepository`` example in this document
    overrides ``store``, ``load`` and ``delete`` with ``async def`` and the
    examples ``await storage.load(...)``, so these are presumably coroutines
    in the real implementation -- confirm.
    """

    @abstractmethod
    def store(self, execution):
        """Store saga execution to persistent storage.

        Args:
            execution (SagaExecution): Execution to store.
        """

    @abstractmethod
    def load(self, uuid):
        """Load saga execution from persistent storage.

        Args:
            uuid (Union[UUID, str]): Execution identifier.

        Returns:
            SagaExecution: Loaded execution instance.

        Raises:
            SagaExecutionNotFoundException: If execution not found.
        """

    @abstractmethod
    def delete(self, uuid):
        """Delete saga execution from persistent storage.

        Args:
            uuid (Union[UUID, str]): Execution identifier.
        """
class DatabaseSagaExecutionRepository(SagaExecutionRepository):
    """Database implementation of saga execution repository."""
class SagaExecutionDatabaseOperationFactory:
    """Factory for database operations on saga executions."""


# Two-phase commit protocol implementation for distributed transaction coordination.
class TransactionCommitter:
    """Manages two-phase commit protocol for saga transactions.

    Coordinates distributed transaction commits across multiple
    services participating in the saga execution.

    Attributes:
        execution_uuid (UUID): Execution identifier.
        executed_steps (list[SagaStepExecution]): Steps that participated.
        transactions (list[tuple[UUID, str]]): Transaction UUID and service pairs.
    """

    def __init__(self, execution_uuid, executed_steps, broker_publisher, broker_pool=None, **kwargs):
        """Initialize committer with execution details.

        Args:
            execution_uuid (UUID): Saga execution identifier.
            executed_steps (list[SagaStepExecution]): Executed steps.
            broker_publisher: Message broker publisher.
            broker_pool: Optional broker connection pool.
        """

    def commit(self, **kwargs):
        """Commit all transactions using two-phase commit protocol.

        Sends commit messages to all participating services and
        waits for confirmation of successful commitment.

        Raises:
            SagaFailedCommitCallbackException: If any service fails to commit.
        """

    def reject(self):
        """Reject all transactions.

        Sends reject messages to all participating services to
        rollback their local transaction state.
        """


# Specialized executors for different types of saga operations.
class Executor:
    """Base executor for saga operations.

    Attributes:
        execution_uuid (UUID): Execution identifier for transaction context.
    """

    def __init__(self, execution_uuid, *args, **kwargs):
        """Initialize executor with execution context."""

    def exec(self, operation, *args, **kwargs):
        """Execute saga operation within transaction context."""

    def exec_function(self, func, *args, **kwargs):
        """Execute function within transaction context."""
class LocalExecutor(Executor):
    """Executor for local operations within the same service."""
class RequestExecutor(Executor):
    """Executor for remote request operations to other services."""
class ResponseExecutor(Executor):
    """Executor for remote response processing from other services."""


from minos.saga import SagaManager, SagaExecution, SagaContext
from minos.saga.executions.repositories import DatabaseSagaExecutionRepository

# Initialize saga manager
storage = DatabaseSagaExecutionRepository(...)
manager = SagaManager(storage=storage, broker_pool=broker_pool)


# Execute saga with automatic lifecycle management
async def execute_order_saga(order_data):
    """Run the order saga for ``order_data`` and return what ``manager.run`` yields."""
    saga_definition = create_order_saga()
    context = SagaContext(order=order_data)

    # Run with automatic commit and error handling
    result = await manager.run(
        definition=saga_definition,
        context=context,
        autocommit=True,
        raise_on_error=True,
    )
    return result


from minos.saga import SagaExecution, SagaStatus
# Create execution manually for fine-grained control
execution = SagaExecution.from_definition(
definition=saga_definition,
context=SagaContext(data="initial"),
uuid=uuid4()
)
# Execute with manual transaction control
try:
context = await execution.execute(autocommit=False)
# Manual commit after validation
if validate_results(context):
await execution.commit()
else:
await execution.reject()
except Exception as e:
# Manual rollback on failure
await execution.rollback()
raise# Execute with pause-on-disk for background processing
execution_uuid = await manager.run(
definition=long_running_saga,
context=initial_context,
pause_on_disk=True, # Pause remote steps on disk
return_execution=False # Return UUID instead of execution
)
# Later, resume with response
response = SagaResponse(content={"result": "processed"})
final_context = await manager.run(
response=response,
pause_on_disk=True
)from minos.saga import SagaStatus, SagaStepStatus
# Check execution status
execution = await storage.load(execution_uuid)
if execution.status == SagaStatus.Paused:
print(f"Execution paused at step: {execution.paused_step.uuid}")
print(f"Step status: {execution.paused_step.status}")
elif execution.status == SagaStatus.Finished:
print("Saga completed successfully")
print(f"Final context: {execution.context}")
elif execution.status == SagaStatus.Errored:
print("Saga execution failed")
# Trigger rollback if needed
await execution.rollback()from minos.saga.executions.repositories import SagaExecutionRepository
class CustomSagaRepository(SagaExecutionRepository):
    """Example repository that delegates persistence to ``storage_backend``.

    The backend is expected to expose awaitable ``save``, ``get`` and
    ``remove`` calls, as used by the methods below.
    """

    def __init__(self, storage_backend):
        self.storage = storage_backend

    async def store(self, execution):
        """Persist ``execution.raw`` under ``execution.uuid``."""
        # Custom storage logic
        await self.storage.save(execution.uuid, execution.raw)

    async def load(self, uuid):
        """Return the execution rebuilt from the backend's raw data.

        Raises:
            SagaExecutionNotFoundException: If the backend returns a falsy
                value for ``uuid``.
        """
        # Imported locally: no snippet in this document brings the exception
        # into scope, so the raise below would otherwise be a NameError.
        from minos.saga import SagaExecutionNotFoundException

        # Custom loading logic
        raw_data = await self.storage.get(uuid)
        if not raw_data:
            raise SagaExecutionNotFoundException(f"Execution {uuid} not found")
        return SagaExecution.from_raw(raw_data)

    async def delete(self, uuid):
        """Remove the stored execution identified by ``uuid``."""
        # Custom deletion logic
        await self.storage.remove(uuid)


# Use custom repository
custom_repo = CustomSagaRepository(my_storage)
manager = SagaManager(storage=custom_repo)

Install with Tessl CLI:
npx tessl i tessl/pypi-minos-microservice-saga