Implementation of the SAGA pattern for distributed microservice transactions in the Minos Framework.
—
Core functionality for defining distributed transaction sequences with local and remote steps, conditional logic, and compensation behaviors. This module provides the declarative API for constructing sagas with proper validation and commitment semantics.
The main Saga class provides the primary interface for constructing distributed transaction definitions.
class Saga:
"""
Main class for defining distributed transaction sequences.
Attributes:
steps (list[SagaStep]): List of saga steps in execution order
committed (bool): Whether saga is committed and ready for execution
raw (dict[str, Any]): Raw dictionary representation
"""
def __init__(self, *args, steps=None, committed=False, commit=None, **kwargs):
"""Initialize saga with optional steps and commit callback."""
def local_step(self, step=None, **kwargs):
"""
Add a local execution step.
Returns:
LocalSagaStep: New local step for method chaining
"""
def remote_step(self, step=None, **kwargs):
"""
Add a remote microservice step.
Returns:
RemoteSagaStep: New remote step for method chaining
"""
def conditional_step(self, step=None):
"""
Add a conditional step with if-then-else logic.
Returns:
ConditionalSagaStep: New conditional step for method chaining
"""
def commit(self, callback=None, **kwargs):
"""
Commit saga for execution. Once committed, saga cannot be modified.
Args:
callback: Optional commit callback function
Returns:
Saga: Self for chaining
Raises:
AlreadyCommittedException: If saga is already committed
EmptySagaException: If saga has no steps
"""
def validate(self):
"""
Validate saga definition.
Raises:
EmptySagaException: If saga has no steps
Various step validation exceptions
"""
@classmethod
def from_raw(cls, raw, **kwargs):
"""
Build saga from raw dictionary representation.
Args:
raw (dict): Raw saga definition
Returns:
Saga: Constructed saga instance
"""Container class for saga step operations with callbacks and parameters.
class SagaOperation:
"""
Container for saga step operations with callbacks and parameters.
Attributes:
callback (T): The callback function to execute
parameters (Optional[SagaContext]): Parameters to pass to callback
parameterized (bool): Whether parameters are provided
raw (dict[str, Any]): Raw representation
"""
def __init__(self, callback, parameters=None, **kwargs):
"""Initialize with callback function and optional parameters."""
@classmethod
def from_raw(cls, raw, **kwargs):
"""Build operation from raw representation."""Steps for local execution within the same microservice.
class LocalSagaStep(SagaStep):
"""
Step for local execution within same service.
Attributes:
on_execute_operation (Optional[SagaOperation[LocalCallback]]): Execute operation
on_failure_operation (Optional[SagaOperation[LocalCallback]]): Failure operation
"""
def __init__(self, on_execute=None, on_failure=None, **kwargs):
"""Initialize local step with optional callbacks."""
def on_execute(self, callback, parameters=None, **kwargs):
"""
Set execution callback for the step.
Args:
callback (LocalCallback): Function to execute locally
parameters (Optional[SagaContext]): Parameters for callback
Returns:
LocalSagaStep: Self for chaining
Raises:
MultipleOnExecuteException: If on_execute already set
"""
def on_failure(self, callback, parameters=None, **kwargs):
"""
Set failure compensation callback.
Args:
callback (LocalCallback): Function to execute on failure
parameters (Optional[SagaContext]): Parameters for callback
Returns:
LocalSagaStep: Self for chaining
Raises:
MultipleOnFailureException: If on_failure already set
"""
def validate(self):
"""
Validate step has required callbacks.
Raises:
UndefinedOnExecuteException: If no on_execute defined
EmptySagaStepException: If step has no actions
"""Steps for remote microservice calls with success/error handling.
class RemoteSagaStep(SagaStep):
"""
Step for remote microservice calls.
Attributes:
on_execute_operation (Optional[SagaOperation[RequestCallBack]]): Execute operation
on_success_operation (Optional[SagaOperation[ResponseCallBack]]): Success operation
on_error_operation (Optional[SagaOperation[ResponseCallBack]]): Error operation
on_failure_operation (Optional[SagaOperation[RequestCallBack]]): Failure operation
"""
def __init__(self, on_execute=None, on_success=None, on_error=None, on_failure=None, **kwargs):
"""Initialize remote step with optional callbacks."""
def on_execute(self, callback, parameters=None, **kwargs):
"""
Set request callback for remote service call.
Args:
callback (RequestCallBack): Function that creates SagaRequest
parameters (Optional[SagaContext]): Parameters for callback
Returns:
RemoteSagaStep: Self for chaining
Raises:
MultipleOnExecuteException: If on_execute already set
"""
def on_success(self, callback, parameters=None, **kwargs):
"""
Set success response callback.
Args:
callback (ResponseCallBack): Function to handle successful response
parameters (Optional[SagaContext]): Parameters for callback
Returns:
RemoteSagaStep: Self for chaining
Raises:
MultipleOnSuccessException: If on_success already set
"""
def on_error(self, callback, parameters=None, **kwargs):
"""
Set error response callback for business logic errors.
Args:
callback (ResponseCallBack): Function to handle error response
parameters (Optional[SagaContext]): Parameters for callback
Returns:
RemoteSagaStep: Self for chaining
Raises:
MultipleOnErrorException: If on_error already set
"""
def on_failure(self, callback, parameters=None, **kwargs):
"""
Set failure compensation callback for rollback.
Args:
callback (RequestCallBack): Function to create compensation request
parameters (Optional[SagaContext]): Parameters for callback
Returns:
RemoteSagaStep: Self for chaining
Raises:
MultipleOnFailureException: If on_failure already set
"""
def validate(self):
"""
Validate step configuration.
Raises:
UndefinedOnExecuteException: If no on_execute defined
EmptySagaStepException: If step has no actions
"""Steps for conditional saga execution based on runtime conditions.
class ConditionalSagaStep(SagaStep):
"""
Step for conditional saga execution.
Attributes:
if_then_alternatives (list[IfThenAlternative]): List of conditional alternatives
else_then_alternative (ElseThenAlternative): Default alternative
"""
def __init__(self, if_then=None, else_then=None, **kwargs):
"""Initialize conditional step with optional alternatives."""
def if_then(self, condition, saga):
"""
Add if-then alternative with condition and saga.
Args:
condition: Condition function to evaluate
saga (Saga): Saga to execute if condition is true
Returns:
ConditionalSagaStep: Self for chaining
"""
def else_then(self, saga):
"""
Set else alternative with saga.
Args:
saga (Saga): Default saga to execute
Returns:
ConditionalSagaStep: Self for chaining
Raises:
MultipleElseThenException: If else_then already set
"""
def validate(self):
"""
Validate alternatives.
Raises:
EmptySagaStepException: If no alternatives defined
"""
class IfThenAlternative:
"""
Represents an if-then conditional alternative.
Attributes:
condition (SagaOperation): Condition operation to evaluate
saga (Saga): Saga to execute if condition is true
"""
def __init__(self, condition, saga):
"""Initialize with condition function and saga."""
@classmethod
def from_raw(cls, raw, **kwargs):
"""Build from raw representation."""
def validate(self):
"""Validate the alternative."""
class ElseThenAlternative:
"""
Represents an else alternative for conditional steps.
Attributes:
saga (Saga): Saga to execute as default
"""
def __init__(self, saga):
"""Initialize with saga."""
@classmethod
def from_raw(cls, raw, **kwargs):
"""Build from raw representation."""
def validate(self):
"""Validate the alternative."""Abstract base class for all saga steps providing common functionality.
from abc import ABC, abstractmethod
class SagaStep(ABC):
"""
Base class for all saga steps.
Attributes:
saga (Optional[Saga]): Reference to parent saga
"""
def __init__(self, saga=None, **kwargs):
"""Initialize step with optional saga reference."""
def conditional_step(self, *args, **kwargs):
"""Create new conditional step in saga."""
def local_step(self, *args, **kwargs):
"""Create new local step in saga."""
def remote_step(self, *args, **kwargs):
"""Create new remote step in saga."""
def commit(self, *args, **kwargs):
"""Commit the saga."""
@abstractmethod
def validate(self):
"""Abstract method to validate step."""
@classmethod
def from_raw(cls, raw, **kwargs):
"""Class method to build from raw representation."""
@property
@abstractmethod
def raw(self):
"""Abstract property for raw representation."""from minos.saga import Saga, SagaContext, SagaRequest
# Create a simple order processing saga
def create_order_saga():
saga = Saga()
# Local validation step
saga.local_step() \
.on_execute(validate_order) \
.on_failure(log_validation_failure)
# Remote payment step
saga.remote_step() \
.on_execute(create_payment_request) \
.on_success(handle_payment_success) \
.on_error(handle_payment_error) \
.on_failure(refund_payment)
return saga.commit()
def validate_order(context):
if not context.order.get('total'):
raise ValueError("Order total required")
return context
def create_payment_request(context):
return SagaRequest(
target="payment-service",
content={"amount": context.order["total"]}
)
def handle_payment_success(context, response):
context.payment_id = response.content()["id"]
return contextdef create_conditional_saga():
saga = Saga()
# Main processing step
saga.local_step().on_execute(process_data)
# Conditional step based on data type
conditional = saga.conditional_step()
# If premium customer
premium_saga = Saga()
premium_saga.remote_step().on_execute(process_premium_features)
conditional.if_then(is_premium_customer, premium_saga.commit())
# If regular customer
regular_saga = Saga()
regular_saga.remote_step().on_execute(process_regular_features)
conditional.else_then(regular_saga.commit())
return saga.commit()
def is_premium_customer(context):
return context.customer.get('tier') == 'premium'
def process_premium_features(context):
return SagaRequest(
target="premium-service",
content={"customer_id": context.customer["id"]}
)def create_booking_saga():
saga = Saga()
# Step 1: Validate booking request
saga.local_step() \
.on_execute(validate_booking) \
.on_failure(handle_validation_failure)
# Step 2: Reserve hotel room
saga.remote_step() \
.on_execute(reserve_room) \
.on_success(handle_room_reserved) \
.on_error(handle_room_error) \
.on_failure(cancel_room_reservation)
# Step 3: Book flight
saga.remote_step() \
.on_execute(book_flight) \
.on_success(handle_flight_booked) \
.on_error(handle_flight_error) \
.on_failure(cancel_flight)
# Step 4: Process payment
saga.remote_step() \
.on_execute(process_payment) \
.on_success(handle_payment_complete) \
.on_error(handle_payment_failed) \
.on_failure(refund_charges)
# Step 5: Send confirmation
saga.local_step().on_execute(send_confirmation)
return saga.commit()from typing import Callable, Union, Awaitable, Optional
# Callback function types
RequestCallBack = Callable[[SagaContext, ...], Union[SagaRequest, Awaitable[SagaRequest]]]
ResponseCallBack = Callable[[SagaContext, SagaResponse, ...], Union[Union[Exception, SagaContext], Awaitable[Union[Exception, SagaContext]]]]
LocalCallback = Callable[[SagaContext, ...], Union[Optional[SagaContext], Awaitable[Optional[SagaContext]]]]Install with Tessl CLI
npx tessl i tessl/pypi-minos-microservice-saga