Implementation of the SAGA pattern for distributed microservice transactions in the Minos Framework.
—
Testing utilities and mocked implementations for saga development and validation including repository test cases and operation factories. This module provides comprehensive testing infrastructure to validate saga behavior, execution flows, and integration with storage and messaging systems.
Abstract test case providing standardized tests for saga execution repository implementations.
from minos.common.testing import MinosTestCase
from abc import abstractmethod
class SagaExecutionRepositoryTestCase(MinosTestCase):
"""
Base test case for saga execution repository implementations.
Provides standard test methods to validate repository implementations
conform to the expected interface and behavior patterns.
"""
@abstractmethod
def build_saga_execution_repository(self):
"""
Abstract method to build repository instance.
Subclasses must implement this method to return a configured
instance of their repository implementation for testing.
Returns:
SagaExecutionRepository: Repository instance to test
"""
def test_store(self):
"""
Test storing saga executions.
Validates that the repository can successfully store saga execution
instances and that they can be retrieved with correct data.
"""
def test_load_from_str(self):
"""
Test loading executions from string UUID.
Validates that executions can be loaded using string representation
of UUIDs as well as UUID objects.
"""
def test_delete(self):
"""
Test deleting saga executions.
Validates that executions can be deleted from the repository
and that subsequent load attempts properly raise exceptions.
"""
def test_load_raises(self):
"""
Test loading non-existent executions raises appropriate exceptions.
Validates that attempting to load non-existent executions
raises SagaExecutionNotFoundException.
"""
def test_store_update(self):
"""
Test updating existing stored executions.
Validates that storing an execution with the same UUID
updates the existing record rather than creating duplicates.
"""Mocked implementation of database operation factory for testing purposes.
class MockedSagaExecutionDatabaseOperationFactory(SagaExecutionDatabaseOperationFactory):
"""
Mocked factory for testing purposes.
Provides a test-friendly implementation of the database operation factory
that can be used in unit tests without requiring actual database connections.
"""
def __init__(self, **kwargs):
"""Initialize mocked factory with test configuration."""
def build_create_operation(self, execution):
"""Build mocked create operation for testing."""
def build_update_operation(self, execution):
"""Build mocked update operation for testing."""
def build_delete_operation(self, uuid):
"""Build mocked delete operation for testing."""
def build_select_operation(self, uuid):
"""Build mocked select operation for testing."""import unittest
from minos.saga.testing import SagaExecutionRepositoryTestCase
from minos.saga import SagaExecution, Saga, SagaContext, SagaStatus
from uuid import uuid4
class MyRepositoryTestCase(SagaExecutionRepositoryTestCase):
"""Test case for custom repository implementation."""
def build_saga_execution_repository(self):
"""Build repository instance for testing."""
# Return your repository implementation
return MyCustomSagaRepository(
connection_string="test://localhost",
schema="test_schema"
)
def test_custom_repository_features(self):
"""Test custom features specific to your repository."""
repo = self.build_saga_execution_repository()
# Create test execution
saga_def = Saga()
saga_def.local_step().on_execute(lambda ctx: ctx)
committed_saga = saga_def.commit()
execution = SagaExecution.from_definition(
definition=committed_saga,
context=SagaContext(test_data="value"),
uuid=uuid4()
)
# Test storing
await repo.store(execution)
# Test loading
loaded = await repo.load(execution.uuid)
self.assertEqual(loaded.uuid, execution.uuid)
self.assertEqual(loaded.context.test_data, "value")
# Test your custom features
self.assertTrue(repo.has_custom_feature())
def test_repository_error_handling(self):
"""Test repository error handling."""
repo = self.build_saga_execution_repository()
# Test loading non-existent execution
with self.assertRaises(SagaExecutionNotFoundException):
await repo.load(uuid4())
# Test deleting non-existent execution
# Should not raise exception
await repo.delete(uuid4())
# Run the tests
if __name__ == '__main__':
unittest.main()import unittest
from minos.saga import (
Saga, SagaContext, LocalSagaStep, RemoteSagaStep,
EmptySagaException, UndefinedOnExecuteException
)
class SagaDefinitionTestCase(unittest.TestCase):
"""Test cases for saga definition validation."""
def test_empty_saga_raises_exception(self):
"""Test that empty saga cannot be committed."""
saga = Saga()
with self.assertRaises(EmptySagaException):
saga.commit()
def test_step_without_execute_raises_exception(self):
"""Test that step without on_execute cannot be validated."""
saga = Saga()
step = saga.local_step()
# Don't set on_execute
with self.assertRaises(UndefinedOnExecuteException):
saga.commit()
def test_valid_saga_commits_successfully(self):
"""Test that valid saga commits without errors."""
saga = Saga()
saga.local_step().on_execute(lambda ctx: ctx)
committed_saga = saga.commit()
self.assertTrue(committed_saga.committed)
self.assertEqual(len(committed_saga.steps), 1)
def test_complex_saga_structure(self):
"""Test complex saga with multiple step types."""
saga = Saga()
# Local step
local_step = saga.local_step()
local_step.on_execute(lambda ctx: ctx)
local_step.on_failure(lambda ctx: ctx)
# Remote step
remote_step = saga.remote_step()
remote_step.on_execute(lambda ctx: None)
remote_step.on_success(lambda ctx, resp: ctx)
remote_step.on_error(lambda ctx, resp: ctx)
remote_step.on_failure(lambda ctx: None)
# Conditional step
conditional = saga.conditional_step()
inner_saga = Saga()
inner_saga.local_step().on_execute(lambda ctx: ctx)
conditional.if_then(lambda ctx: True, inner_saga.commit())
committed_saga = saga.commit()
self.assertEqual(len(committed_saga.steps), 3)
self.assertIsInstance(committed_saga.steps[0], LocalSagaStep)
self.assertIsInstance(committed_saga.steps[1], RemoteSagaStep)
self.assertIsInstance(committed_saga.steps[2], ConditionalSagaStep)import unittest
from unittest.mock import AsyncMock, MagicMock
from minos.saga import (
SagaExecution, SagaManager, SagaContext, SagaRequest, SagaResponse,
SagaStatus, SagaResponseStatus
)
class SagaExecutionTestCase(unittest.TestCase):
"""Test cases for saga execution behavior."""
def setUp(self):
"""Set up test fixtures."""
self.mock_repo = AsyncMock()
self.mock_broker = AsyncMock()
self.manager = SagaManager(
storage=self.mock_repo,
broker_pool=self.mock_broker
)
async def test_successful_execution(self):
"""Test successful saga execution flow."""
# Create test saga
saga = Saga()
saga.local_step().on_execute(self.success_callback)
committed_saga = saga.commit()
# Execute saga
context = SagaContext(test_value=42)
result = await self.manager.run(
definition=committed_saga,
context=context
)
# Verify results
self.assertEqual(result.test_value, 42)
self.assertEqual(result.processed, True)
async def test_execution_with_failure(self):
"""Test saga execution with step failure."""
# Create saga that will fail
saga = Saga()
saga.local_step().on_execute(self.failing_callback)
committed_saga = saga.commit()
# Execute saga
context = SagaContext(test_value=42)
with self.assertRaises(SagaFailedExecutionException):
await self.manager.run(
definition=committed_saga,
context=context,
raise_on_error=True
)
async def test_remote_step_execution(self):
"""Test remote step execution with mocked responses."""
# Create saga with remote step
saga = Saga()
saga.remote_step() \
.on_execute(self.create_request) \
.on_success(self.handle_success)
committed_saga = saga.commit()
# Mock broker response
mock_response = SagaResponse(
content={"result": "processed"},
status=SagaResponseStatus.SUCCESS
)
# Execute saga
context = SagaContext(order_id=123)
result = await self.manager.run(
definition=committed_saga,
context=context
)
# Verify remote call was made
self.mock_broker.send.assert_called_once()
self.assertEqual(result.result, "processed")
def success_callback(self, context):
"""Test callback that succeeds."""
context.processed = True
return context
def failing_callback(self, context):
"""Test callback that fails."""
raise ValueError("Test failure")
def create_request(self, context):
"""Test callback that creates request."""
return SagaRequest(
target="test-service",
content={"order_id": context.order_id}
)
def handle_success(self, context, response):
"""Test callback that handles successful response."""
data = response.content()
context.update(data)
return contextimport unittest
from unittest.mock import Mock, AsyncMock, patch
from minos.saga.testing import MockedSagaExecutionDatabaseOperationFactory
class MockedComponentTestCase(unittest.TestCase):
"""Test cases using mocked components."""
def setUp(self):
"""Set up mocked components."""
self.mock_factory = MockedSagaExecutionDatabaseOperationFactory()
self.mock_storage = Mock()
self.mock_broker = AsyncMock()
def test_mocked_database_operations(self):
"""Test using mocked database operation factory."""
execution = self.create_test_execution()
# Test create operation
create_op = self.mock_factory.build_create_operation(execution)
self.assertIsNotNone(create_op)
# Test select operation
select_op = self.mock_factory.build_select_operation(execution.uuid)
self.assertIsNotNone(select_op)
# Test update operation
update_op = self.mock_factory.build_update_operation(execution)
self.assertIsNotNone(update_op)
# Test delete operation
delete_op = self.mock_factory.build_delete_operation(execution.uuid)
self.assertIsNotNone(delete_op)
@patch('minos.saga.SagaManager')
async def test_mocked_manager(self, mock_manager_class):
"""Test using mocked saga manager."""
mock_manager = mock_manager_class.return_value
mock_manager.run.return_value = SagaContext(result="mocked")
# Use mocked manager
result = await mock_manager.run(
definition=self.create_test_saga(),
context=SagaContext(input="test")
)
self.assertEqual(result.result, "mocked")
mock_manager.run.assert_called_once()
def create_test_execution(self):
"""Create test execution for mocking."""
saga = Saga()
saga.local_step().on_execute(lambda ctx: ctx)
committed_saga = saga.commit()
return SagaExecution.from_definition(
definition=committed_saga,
context=SagaContext(test="data")
)
def create_test_saga(self):
"""Create test saga definition."""
saga = Saga()
saga.local_step().on_execute(lambda ctx: ctx)
return saga.commit()import unittest
import asyncio
from minos.saga import SagaManager, Saga, SagaContext, SagaRequest, SagaResponse
class SagaIntegrationTestCase(unittest.TestCase):
"""Integration tests for complete saga workflows."""
def setUp(self):
"""Set up integration test environment."""
# Set up real or test implementations
self.setup_test_database()
self.setup_test_broker()
self.manager = SagaManager(
storage=self.test_repo,
broker_pool=self.test_broker
)
def setup_test_database(self):
"""Set up test database for integration testing."""
# Configure test database
self.test_repo = DatabaseSagaExecutionRepository(
connection="sqlite:///:memory:",
create_tables=True
)
def setup_test_broker(self):
"""Set up test message broker."""
# Configure test broker
self.test_broker = TestBrokerPool()
async def test_end_to_end_order_processing(self):
"""Test complete order processing saga."""
# Create realistic saga
saga = self.create_order_processing_saga()
# Create order context
context = SagaContext(
order_id="order_123",
customer_id="customer_456",
items=[
{"sku": "ITEM1", "quantity": 2, "price": 25.00},
{"sku": "ITEM2", "quantity": 1, "price": 50.00}
],
total=100.00,
payment_method="credit_card"
)
# Execute saga
result = await self.manager.run(
definition=saga,
context=context,
autocommit=True
)
# Verify final state
self.assertEqual(result.order_status, "completed")
self.assertTrue(result.payment_processed)
self.assertTrue(result.inventory_reserved)
self.assertIsNotNone(result.confirmation_id)
async def test_saga_failure_and_compensation(self):
"""Test saga failure triggers proper compensation."""
# Create saga that will fail at payment step
saga = self.create_failing_payment_saga()
context = SagaContext(
order_id="order_456",
customer_id="customer_789",
items=[{"sku": "ITEM1", "quantity": 1}],
total=50.00
)
# Execute saga (should fail)
with self.assertRaises(SagaFailedExecutionException):
await self.manager.run(
definition=saga,
context=context,
raise_on_error=True
)
# Verify compensation was executed
self.verify_inventory_released()
self.verify_no_payment_charged()
def create_order_processing_saga(self):
"""Create realistic order processing saga."""
saga = Saga()
# Step 1: Validate order
saga.local_step() \
.on_execute(self.validate_order) \
.on_failure(self.log_validation_failure)
# Step 2: Reserve inventory
saga.remote_step() \
.on_execute(self.reserve_inventory) \
.on_success(self.handle_inventory_reserved) \
.on_error(self.handle_inventory_error) \
.on_failure(self.release_inventory)
# Step 3: Process payment
saga.remote_step() \
.on_execute(self.process_payment) \
.on_success(self.handle_payment_success) \
.on_error(self.handle_payment_error) \
.on_failure(self.refund_payment)
# Step 4: Send confirmation
saga.local_step().on_execute(self.send_confirmation)
return saga.commit()
# Implementation of saga callbacks for testing
def validate_order(self, context):
context.order_validated = True
return context
def reserve_inventory(self, context):
return SagaRequest(
target="inventory-service",
content={"items": context.items, "order_id": context.order_id}
)
def handle_inventory_reserved(self, context, response):
data = response.content()
context.inventory_reserved = True
context.reservation_id = data["reservation_id"]
return context
# Additional callback implementations...import unittest
import time
import asyncio
from statistics import mean, stdev
class SagaPerformanceTestCase(unittest.TestCase):
"""Performance tests for saga execution."""
async def test_execution_performance(self):
"""Test saga execution performance under load."""
saga = self.create_performance_test_saga()
execution_times = []
num_executions = 100
for i in range(num_executions):
context = SagaContext(iteration=i, data=f"test_data_{i}")
start_time = time.time()
result = await self.manager.run(definition=saga, context=context)
end_time = time.time()
execution_times.append(end_time - start_time)
# Analyze performance
avg_time = mean(execution_times)
std_dev = stdev(execution_times)
max_time = max(execution_times)
min_time = min(execution_times)
print(f"Performance Results:")
print(f" Average execution time: {avg_time:.4f}s")
print(f" Standard deviation: {std_dev:.4f}s")
print(f" Max execution time: {max_time:.4f}s")
print(f" Min execution time: {min_time:.4f}s")
# Assert performance criteria
self.assertLess(avg_time, 0.1, "Average execution time too high")
self.assertLess(max_time, 0.5, "Max execution time too high")
async def test_concurrent_executions(self):
"""Test concurrent saga executions."""
saga = self.create_performance_test_saga()
# Create concurrent executions
tasks = []
for i in range(50):
context = SagaContext(concurrent_test=i)
task = self.manager.run(definition=saga, context=context)
tasks.append(task)
# Execute concurrently
start_time = time.time()
results = await asyncio.gather(*tasks)
end_time = time.time()
total_time = end_time - start_time
print(f"Concurrent execution of 50 sagas took: {total_time:.2f}s")
# Verify all completed successfully
self.assertEqual(len(results), 50)
for i, result in enumerate(results):
self.assertEqual(result.concurrent_test, i)
def create_performance_test_saga(self):
"""Create saga optimized for performance testing."""
saga = Saga()
saga.local_step().on_execute(lambda ctx: ctx)
return saga.commit()Install with Tessl CLI
npx tessl i tessl/pypi-minos-microservice-saga