Implementation of the SAGA pattern for distributed microservice transactions in the Minos Framework.
—
This module defines a comprehensive exception hierarchy covering saga definition validation, execution errors, and system failures, with a specific error type for each failure scenario. This structured error handling enables proper saga rollback, debugging, and recovery mechanisms.
The saga exception system is organized into a hierarchical structure for precise error handling.
class SagaException(MinosException):
    """Base saga exception.

    Root exception class for all saga-related errors. Extends
    ``MinosException`` to integrate with the broader Minos framework
    error handling, so catching this type catches every exception
    derived from it.
    """
class SagaStepException(SagaException):
    """Base saga step exception.

    Base class for errors related to saga step definition and validation.
    Derives from ``SagaException``.
    """
class SagaExecutionException(SagaException):
    """Base saga execution exception.

    Base class for errors that occur during saga execution runtime.
    Derives from ``SagaException``.
    """
class SagaStepExecutionException(SagaExecutionException):
    """Base saga step execution exception.

    Base class for errors that occur during individual step execution.
    Derives from ``SagaExecutionException``.
    """


# Exceptions thrown during saga definition construction and validation.
# Saga Definition Exceptions
class EmptySagaException(SagaException):
    """Saga must have at least one step.

    Raised when attempting to commit a saga that has no steps defined.
    """
class SagaNotCommittedException(SagaException):
    """Saga must be committed before execution.

    Raised when attempting to execute a saga that hasn't been committed.
    """
class AlreadyCommittedException(SagaException):
    """Cannot modify committed saga.

    Raised when attempting to add steps to, or otherwise modify, a saga
    that has already been committed for execution.
    """
class SagaNotDefinedException(SagaStepException):
    """Step must have saga instance.

    Raised when a saga step operation is called but the step doesn't
    have a reference to its parent saga.
    """
class EmptySagaStepException(SagaStepException):
    """Step must have at least one action.

    Raised when a saga step has no operations defined (no ``on_execute``,
    ``on_success``, ``on_error``, or ``on_failure`` callbacks).
    """
class AlreadyOnSagaException(SagaStepException):
    """Step can only belong to one saga.

    Raised when attempting to add a step that's already part of another
    saga definition.
    """
class UndefinedOnExecuteException(SagaStepException):
    """Step must define on_execute logic.

    Raised when a step lacks the required ``on_execute`` callback that
    defines the primary step operation.
    """
# Callback Validation Exceptions
class MultipleOnExecuteException(SagaStepException):
    """Only one on_execute method allowed.

    Raised when attempting to define multiple ``on_execute`` callbacks
    for a single saga step.
    """
class MultipleOnFailureException(SagaStepException):
    """Only one on_failure method allowed.

    Raised when attempting to define multiple ``on_failure`` callbacks
    for a single saga step.
    """
class MultipleOnSuccessException(SagaStepException):
    """Only one on_success method allowed.

    Raised when attempting to define multiple ``on_success`` callbacks
    for a single remote saga step.
    """
class MultipleOnErrorException(SagaStepException):
    """Only one on_error method allowed.

    Raised when attempting to define multiple ``on_error`` callbacks
    for a single remote saga step.
    """
class MultipleElseThenException(SagaStepException):
    """Only one else_then method allowed.

    Raised when attempting to define multiple ``else_then`` alternatives
    for a single conditional saga step.
    """


# Exceptions that occur during saga execution runtime.
# Execution Management Exceptions
class SagaExecutionNotFoundException(SagaExecutionException):
    """Execution not found in storage.

    Raised when attempting to load a saga execution that doesn't exist
    in the configured repository.
    """
class SagaExecutionAlreadyExecutedException(SagaExecutionException):
    """Cannot re-execute finished execution.

    Raised when attempting to execute a saga that has already completed
    (finished or errored status).
    """
# Execution Failure Exceptions
class SagaFailedExecutionException(SagaExecutionException):
    """Execution failed during running.

    Raised when saga execution encounters an unrecoverable error during
    step processing.
    """
class SagaFailedExecutionStepException(SagaStepExecutionException):
    """Step failed during execution.

    Raised when an individual saga step fails to execute properly.
    """
class SagaPausedExecutionStepException(SagaStepExecutionException):
    """Step paused during execution.

    Raised when a saga step requires a pause for an external response
    (typically remote steps waiting for service responses).
    """
# Rollback Exceptions
class SagaRollbackExecutionException(SagaExecutionException):
    """Failed during rollback process.

    Raised when saga rollback (compensation) operations fail.
    """
class SagaRollbackExecutionStepException(SagaStepExecutionException):
    """Step failed during rollback.

    Raised when an individual step's compensation logic fails during
    saga rollback.
    """
# Transaction Management Exceptions
class SagaFailedCommitCallbackException(SagaExecutionException):
    """Commit callback raised exception.

    Raised when the saga commit callback function encounters an error
    during transaction commitment.
    """
# Response Handling Exceptions
class SagaResponseException(SagaException):
    """Response status not SUCCESS.

    Raised when a remote service responds with an error status that
    cannot be handled by the step's error handlers. Note that this class
    derives directly from ``SagaException``, not from the execution
    exception branch.
    """


# Names used by the saga-definition examples that follow.
from minos.saga import (
    Saga, SagaException, EmptySagaException,
    AlreadyCommittedException, UndefinedOnExecuteException
)
def create_saga_with_validation():
    """Build and commit a two-step saga, reporting definition errors.

    A local step running ``validate_order`` and a remote step running
    ``process_payment`` are registered before the saga is committed
    (both callables are expected to exist at module level).

    Returns:
        Whatever ``Saga.commit()`` returns for the populated definition.

    Raises:
        EmptySagaException, AlreadyCommittedException,
        UndefinedOnExecuteException: a short explanation is printed and
        the original exception is re-raised unchanged.
    """
    try:
        definition = Saga()
        # Committing at this point, before any step exists, is what would
        # raise EmptySagaException.
        definition.local_step().on_execute(validate_order)
        definition.remote_step().on_execute(process_payment)
        # Adding another step after this commit is what would raise
        # AlreadyCommittedException.
        return definition.commit()
    except (
        EmptySagaException,
        AlreadyCommittedException,
        UndefinedOnExecuteException,
    ) as error:
        # Ordered lookup: the first matching type decides the message.
        explanations = (
            (EmptySagaException, "Cannot commit saga without steps"),
            (AlreadyCommittedException, "Cannot modify saga after commit"),
            (UndefinedOnExecuteException, "Step missing required on_execute callback"),
        )
        for error_type, explanation in explanations:
            if isinstance(error, error_type):
                print(explanation)
                break
        raise
def create_step_with_validation():
    """Build a single-step saga and report duplicate ``on_execute`` errors.

    Returns:
        Whatever ``Saga.commit()`` returns for the one-step definition.

    Raises:
        MultipleOnExecuteException: printed and re-raised unchanged.
    """
    try:
        definition = Saga()
        # Registering a second on_execute callback on this same step is
        # what would raise MultipleOnExecuteException.
        definition.local_step().on_execute(process_data)
        return definition.commit()
    except MultipleOnExecuteException:
        print("Step already has on_execute callback defined")
        raise


from minos.saga import (
    SagaManager, SagaExecution, SagaFailedExecutionException,
    SagaPausedExecutionStepException, SagaRollbackExecutionException
)
async def execute_saga_with_error_handling(saga_definition, context):
    """Run a saga through ``SagaManager`` and react to each failure mode.

    Args:
        saga_definition: Definition handed to ``manager.run``.
        context: Initial context handed to ``manager.run``.

    Returns:
        The value returned by ``manager.run`` on success, or the
        ``execution_uuid`` carried by the pause exception when a step
        pauses.

    Raises:
        SagaFailedExecutionException: re-raised after a rollback attempt.
        Exception: any other error is printed and re-raised.

    NOTE(review): ``repo`` and ``broker`` are read as module-level names;
    they are not defined in the visible source.
    """
    manager = SagaManager(storage=repo, broker_pool=broker)
    run_options = {
        "definition": saga_definition,
        "context": context,
        "autocommit": True,
        "raise_on_error": True,
    }
    try:
        outcome = await manager.run(**run_options)
        print("Saga completed successfully")
        return outcome
    except SagaFailedExecutionException as failure:
        print(f"Saga execution failed: {failure}")
        # Load the stored execution to inspect it, then compensate.
        try:
            stored = await repo.load(failure.execution_uuid)
            print(f"Failed at step: {stored.paused_step}")
            print(f"Execution status: {stored.status}")
            await stored.rollback()
            print("Rollback completed")
        except SagaRollbackExecutionException as rollback_failure:
            # Manual cleanup may be required in this case.
            print(f"Rollback also failed: {rollback_failure}")
        # Propagate the original execution failure either way.
        raise
    except SagaPausedExecutionStepException as pause:
        print(f"Saga paused at step: {pause.step_uuid}")
        print("Saga execution paused, will resume when response arrives")
        # Hand back the identifier so the caller can resume later.
        return pause.execution_uuid
    except Exception as unexpected:
        print(f"Unexpected error during saga execution: {unexpected}")
        raise


from minos.saga import SagaContext, SagaRequest, SagaResponse
def handle_step_with_exceptions(context):
    """Validate an order context inside a local saga step.

    Args:
        context: Object offering ``get(key)`` and attribute assignment.

    Returns:
        The same ``context`` with ``validation_result`` and
        ``validation_status = "passed"`` set.

    Raises:
        ValueError: when ``order_id`` or ``customer_id`` is missing or
            falsy, or when validation itself raises one; the context is
            marked ``"failed"`` first.
        Exception: any other error is recorded on the context with status
            ``"error"`` and re-raised.
    """
    # (context key, error message) pairs, checked in this order.
    required_fields = (
        ("order_id", "Order ID is required"),
        ("customer_id", "Customer ID is required"),
    )
    try:
        for field_name, complaint in required_fields:
            if not context.get(field_name):
                raise ValueError(complaint)
        context.validation_result = process_order_validation(context)
        context.validation_status = "passed"
        return context
    except ValueError as invalid:
        # Business-rule failure: record it, then let the saga fail.
        print(f"Validation error: {invalid}")
        context.validation_error = str(invalid)
        context.validation_status = "failed"
        raise
    except Exception as unexpected:
        # Anything else is treated as a system-level problem.
        print(f"Unexpected validation error: {unexpected}")
        context.validation_error = f"System error: {str(unexpected)}"
        context.validation_status = "error"
        raise
async def handle_remote_step_errors(context, response):
    """Process a remote step response, classifying error responses.

    Declared ``async`` because the body awaits ``response.content()``;
    ``await`` inside a plain ``def`` is a syntax error.

    Args:
        context: Object supporting ``update(mapping)`` and attribute
            assignment.
        response: Object exposing ``ok``, ``status`` and an awaitable
            ``content()`` returning a mapping.

    Returns:
        ``context`` updated with the response content when ``response.ok``
        is truthy.

    Raises:
        ValueError: status 400; recorded on ``context.business_error``.
        RuntimeError: status 500; recorded on ``context.system_error``.
        Exception: any other failure; recorded on
            ``context.unexpected_error``.

    NOTE(review): 400/500 presumably mirror the framework's business-error
    and system-error response statuses - confirm against the library.
    """
    try:
        if not response.ok:
            error_data = await response.content()
            error_message = error_data.get("message", "Unknown error")
            if response.status == 400:  # Business error
                raise ValueError(f"Business error: {error_message}")
            if response.status == 500:  # System error
                raise RuntimeError(f"System error: {error_message}")
            raise Exception(f"Unexpected error: {error_message}")

        # Successful response: merge its payload into the context.
        data = await response.content()
        context.update(data)
        return context
    except ValueError as e:
        context.business_error = str(e)
        raise
    except RuntimeError as e:
        context.system_error = str(e)
        raise
    except Exception as e:
        context.unexpected_error = str(e)
        raise


def create_saga_with_compensation_handling():
    """Create a two-step saga whose steps carry compensation handlers.

    Step 1 reserves inventory and step 2 processes payment; each remote
    step registers success, error and failure (compensation) callbacks.

    Returns:
        Whatever ``Saga.commit()`` returns for the definition.
    """
    saga = Saga()

    # Step 1: Reserve inventory
    (
        saga.remote_step()
        .on_execute(reserve_inventory)
        .on_success(handle_reservation_success)
        .on_error(handle_reservation_error)
        .on_failure(release_inventory_with_error_handling)
    )

    # Step 2: Process payment
    (
        saga.remote_step()
        .on_execute(process_payment)
        .on_success(handle_payment_success)
        .on_error(handle_payment_error)
        .on_failure(refund_payment_with_error_handling)
    )

    return saga.commit()
def release_inventory_with_error_handling(context):
    """Build the inventory-release compensation request, best effort.

    Args:
        context: Saga context; ``reservation_id`` is read when present.

    Returns:
        A ``SagaRequest`` targeting ``"inventory-service"`` when the
        context has a ``reservation_id`` attribute; ``None`` when there is
        nothing to release or when building the request fails.
    """
    try:
        if hasattr(context, 'reservation_id'):
            payload = {
                "action": "release",
                "reservation_id": context.reservation_id
            }
            return SagaRequest(target="inventory-service", content=payload)
        print("No reservation to release")
        return None
    except Exception as error:
        # A broken compensation is recorded on the context rather than
        # allowed to fail the rollback.
        print(f"Error creating inventory release request: {error}")
        context.compensation_errors = context.get("compensation_errors", [])
        context.compensation_errors.append(f"inventory_release: {str(error)}")
        # None skips this compensation.
        return None
def refund_payment_with_error_handling(context):
    """Build the payment-refund compensation request with a fallback.

    Args:
        context: Saga context; ``payment_id`` and ``charged_amount`` are
            read when a payment exists.

    Returns:
        ``None`` when the context has no ``payment_id`` attribute;
        otherwise a ``SagaRequest`` targeting ``"payment-service"``. If
        building the full request fails, the error is recorded and a
        reduced fallback request is returned instead.
    """
    try:
        if hasattr(context, 'payment_id'):
            refund_payload = {
                "action": "refund",
                "payment_id": context.payment_id,
                "amount": context.charged_amount,
                "reason": "saga_rollback"
            }
            return SagaRequest(target="payment-service", content=refund_payload)
        print("No payment to refund")
        return None
    except Exception as error:
        print(f"Error creating refund request: {error}")
        context.compensation_errors = context.get("compensation_errors", [])
        context.compensation_errors.append(f"payment_refund: {str(error)}")
        # Refunds are critical: still emit a request, with whatever
        # payment identifier can be recovered.
        fallback_payload = {
            "action": "refund",
            "payment_id": context.get("payment_id", "unknown"),
            "reason": "saga_rollback_fallback"
        }
        return SagaRequest(target="payment-service", content=fallback_payload)


from minos.saga import SagaExecutionNotFoundException
async def resume_failed_saga(execution_uuid):
    """Attempt to resume or recover a failed saga execution.

    Args:
        execution_uuid: Identifier passed to ``repo.load``.

    Returns:
        The loaded execution (rolled back first when its status is
        ``SagaStatus.Errored``), or ``None`` when the execution is not
        found, the rollback fails, or an unexpected error occurs.

    NOTE(review): ``repo`` is read as a module-level name that is not
    defined in the visible source - confirm it is provided by the caller's
    module.
    """
    # Imported here because no import list in this file brings SagaStatus
    # into scope; without it the status checks raise NameError, which the
    # broad handler below would silently turn into a ``None`` result.
    from minos.saga import SagaStatus

    try:
        execution = await repo.load(execution_uuid)

        if execution.status == SagaStatus.Errored:
            print("Execution is in error state, attempting rollback")
            await execution.rollback()
        elif execution.status == SagaStatus.Paused:
            # Could be resumed once the appropriate response is available.
            print("Execution is paused, may be resumable")
        elif execution.status == SagaStatus.Finished:
            print("Execution already completed")

        return execution
    except SagaExecutionNotFoundException:
        print(f"Execution {execution_uuid} not found in storage")
        return None
    except SagaRollbackExecutionException as e:
        # May need manual intervention.
        print(f"Rollback failed: {e}")
        return None
    except Exception as e:
        print(f"Unexpected error during recovery: {e}")
        return None
def create_resilient_saga():
    """Create a one-step saga whose error handler attempts a fallback.

    Returns:
        Whatever ``Saga.commit()`` returns for the definition.
    """
    definition = Saga()
    # Primary processing step, with fallback on error and cleanup on
    # failure.
    (
        definition.remote_step()
        .on_execute(primary_processing)
        .on_success(handle_primary_success)
        .on_error(try_fallback_processing)
        .on_failure(cleanup_primary_attempt)
    )
    return definition.commit()
async def try_fallback_processing(context, response):
    """Error handler that attempts fallback processing.

    Declared ``async`` because the body awaits ``response.content()``;
    ``await`` inside a plain ``def`` is a syntax error.

    Args:
        context: Object supporting ``get(key, default)`` and attribute
            assignment.
        response: Object with an awaitable ``content()`` returning a
            mapping that may carry an ``"error_code"`` entry.

    Returns:
        ``context`` flagged for retry (``retry_primary``, ``retry_count``)
        while fewer than three retries are recorded for a
        ``"TEMPORARY_UNAVAILABLE"`` error; otherwise ``context`` with
        ``use_fallback = True``. If handling itself fails, an ``Exception``
        instance is returned, not raised - presumably the framework treats
        a returned exception as a failed step; confirm against the library.
    """
    try:
        error_data = await response.content()

        if error_data.get("error_code") == "TEMPORARY_UNAVAILABLE":
            # Recoverable: mark the context so the primary can be retried.
            context.retry_primary = True
            context.retry_count = context.get("retry_count", 0) + 1
            if context.retry_count < 3:
                return context

        # Not recoverable, or retries exhausted: switch to the fallback.
        context.use_fallback = True
        return context
    except Exception as e:
        print(f"Fallback processing failed: {e}")
        return Exception(f"Both primary and fallback processing failed: {e}")
def cleanup_primary_attempt(context):
    """Build the cleanup request for a failed primary processing attempt.

    Args:
        context: Saga context; ``temp_resources`` is read when present.

    Returns:
        A ``SagaRequest`` targeting ``"resource-service"`` when the context
        has a ``temp_resources`` attribute; ``None`` when there is nothing
        to clean up or when building the request fails.
    """
    try:
        if not hasattr(context, 'temp_resources'):
            # No partial state was recorded, so nothing to undo.
            return None
        return SagaRequest(
            target="resource-service",
            content={"action": "cleanup", "resources": context.temp_resources}
        )
    except Exception as error:
        # Cleanup problems must not fail the saga rollback.
        print(f"Cleanup failed: {error}")
        return None


import logging
from datetime import datetime
# Configure saga exception logging.
# basicConfig sets the root logger level to INFO at import time.
logging.basicConfig(level=logging.INFO)
# Dedicated logger that log_saga_exception writes its error records to.
saga_logger = logging.getLogger("saga.exceptions")
def log_saga_exception(exception, context=None, execution_uuid=None):
    """Log a saga exception together with context information.

    Args:
        exception: The exception to report; its type name and message are
            recorded.
        context: Optional mapping-like context. When truthy, its keys and
            length are added to the record.
        execution_uuid: Optional execution identifier, stringified when
            truthy and recorded as ``None`` otherwise.

    The record is emitted at ERROR level on ``saga_logger``.
    """
    # Function-scope import: only ``datetime`` is imported at file level.
    from datetime import timezone

    log_data = {
        # Timezone-aware UTC timestamp; ``datetime.utcnow()`` is deprecated
        # and yields a naive value with no offset information.
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "exception_type": type(exception).__name__,
        "exception_message": str(exception),
        "execution_uuid": str(execution_uuid) if execution_uuid else None,
    }
    if context:
        log_data["context_keys"] = list(context.keys())
        log_data["context_size"] = len(context)
    # Lazy %-formatting: the message is only rendered if the record is
    # actually handled.
    saga_logger.error("Saga exception occurred: %s", log_data)
def create_monitored_saga():
    """Create a saga whose steps use the monitored callbacks.

    Returns:
        Whatever ``Saga.commit()`` returns for the definition.
    """
    definition = Saga()
    definition.local_step().on_execute(monitored_local_step)
    (
        definition.remote_step()
        .on_execute(monitored_remote_step)
        .on_success(monitored_success_handler)
        .on_error(monitored_error_handler)
    )
    return definition.commit()
def monitored_local_step(context):
    """Run the business logic of a local step, reporting any failure.

    Args:
        context: Saga context passed to ``process_business_logic``.

    Returns:
        The value returned by ``process_business_logic(context)``.

    Raises:
        Exception: any failure is logged, alerted on, and re-raised
            unchanged.
    """
    try:
        return process_business_logic(context)
    except Exception as failure:
        log_saga_exception(failure, context)
        # Notify the monitoring system before propagating.
        send_alert(f"Local step failed: {failure}")
        # Re-raise so the saga can roll back.
        raise
async def monitored_error_handler(context, response):
    """Error handler with monitoring.

    Declared ``async`` because the body awaits ``response.content()``;
    ``await`` inside a plain ``def`` is a syntax error.

    Args:
        context: Object supporting ``get(key)``; ``"execution_uuid"`` is
            looked up for the log record.
        response: Object with an awaitable ``content()`` returning a
            mapping that may carry a ``"message"`` entry.

    Returns:
        An ``Exception`` instance describing the service error. It is
        returned, not raised - presumably the framework treats a returned
        exception as a failed step; confirm against the library.

    Raises:
        Exception: any failure while reading or logging the response is
            logged and re-raised unchanged.
    """
    try:
        error_data = await response.content()
        # Log the remote error together with the full context.
        log_saga_exception(
            Exception(f"Remote service error: {error_data}"),
            context,
            context.get("execution_uuid")
        )
        return Exception(f"Service error: {error_data.get('message')}")
    except Exception as e:
        log_saga_exception(e, context)
        raise
def send_alert(message):
    """Emit an alert for the monitoring system.

    Placeholder integration point: the alert is written to standard
    output prefixed with ``ALERT: ``.

    Args:
        message: Text (or any formattable object) describing the alert.
    """
    alert_line = "ALERT: {}".format(message)
    print(alert_line)


# Install with Tessl CLI
npx tessl i tessl/pypi-minos-microservice-saga