CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-minos-microservice-saga

Implementation of the SAGA pattern for distributed microservice transactions in the Minos Framework.

Pending
Overview
Eval results
Files

docs/testing-utilities.md

Testing Utilities

Testing utilities and mocked implementations for saga development and validation, including repository test cases and operation factories. This module provides testing infrastructure to validate saga behavior, execution flows, and integration with storage and messaging systems.

Capabilities

Repository Testing Base Class

Abstract test case providing standardized tests for saga execution repository implementations.

from minos.common.testing import MinosTestCase
from abc import abstractmethod

class SagaExecutionRepositoryTestCase(MinosTestCase):
    """
    Shared test case for saga execution repository implementations.

    Supplies a standard set of test methods that check whether a repository
    implementation follows the expected interface and behavior patterns.
    """

    @abstractmethod
    def build_saga_execution_repository(self):
        """
        Build the repository instance under test.

        Every subclass has to implement this hook and return a configured
        instance of its own repository implementation.

        Returns:
            SagaExecutionRepository: The repository instance to be tested
        """

    def test_store(self):
        """
        Check that saga executions can be stored.

        Verifies that the repository stores saga execution instances
        successfully and that they are retrieved with the correct data.
        """

    def test_load_from_str(self):
        """
        Check that executions can be loaded from a string UUID.

        Verifies that loading works with the string representation of a UUID
        as well as with UUID objects.
        """

    def test_delete(self):
        """
        Check that saga executions can be deleted.

        Verifies that executions are removed from the repository and that
        later load attempts properly raise exceptions.
        """

    def test_load_raises(self):
        """
        Check that loading a non-existent execution raises.

        Verifies that an attempt to load a non-existent execution raises
        SagaExecutionNotFoundException.
        """

    def test_store_update(self):
        """
        Check that an already stored execution can be updated.

        Verifies that storing an execution with the same UUID updates the
        existing record instead of creating a duplicate.
        """

Mocked Operation Factory

Mocked implementation of database operation factory for testing purposes.

class MockedSagaExecutionDatabaseOperationFactory(SagaExecutionDatabaseOperationFactory):
    """
    Mocked database operation factory intended for tests.

    A test-friendly implementation of the database operation factory, usable
    in unit tests that should not require an actual database connection.
    """

    def __init__(self, **kwargs):
        """Set up the mocked factory with the given test configuration."""

    def build_create_operation(self, execution):
        """Return a mocked create operation for ``execution``."""

    def build_update_operation(self, execution):
        """Return a mocked update operation for ``execution``."""

    def build_delete_operation(self, uuid):
        """Return a mocked delete operation for the execution ``uuid``."""

    def build_select_operation(self, uuid):
        """Return a mocked select operation for the execution ``uuid``."""

Usage Examples

Creating Repository Test Cases

import unittest
from uuid import uuid4

from minos.saga import (
    Saga,
    SagaContext,
    SagaExecution,
    SagaExecutionNotFoundException,
    SagaStatus,
)
from minos.saga.testing import SagaExecutionRepositoryTestCase


class MyRepositoryTestCase(SagaExecutionRepositoryTestCase):
    """Test case for a custom repository implementation.

    The test methods are coroutines because they ``await`` repository calls.
    NOTE(review): this relies on the base test case running ``async def``
    tests (as ``unittest.IsolatedAsyncioTestCase`` does) -- confirm.
    """

    def build_saga_execution_repository(self):
        """Build the repository instance for testing.

        Returns:
            The repository under test. ``MyCustomSagaRepository`` is a
            placeholder for your own implementation and must be imported
            from your project.
        """
        # Return your repository implementation
        return MyCustomSagaRepository(
            connection_string="test://localhost",
            schema="test_schema",
        )

    async def test_custom_repository_features(self):
        """Test custom features specific to your repository."""
        repo = self.build_saga_execution_repository()

        # Create test execution
        saga_def = Saga()
        saga_def.local_step().on_execute(lambda ctx: ctx)
        committed_saga = saga_def.commit()

        execution = SagaExecution.from_definition(
            definition=committed_saga,
            context=SagaContext(test_data="value"),
            uuid=uuid4(),
        )

        # Test storing
        await repo.store(execution)

        # Test loading
        loaded = await repo.load(execution.uuid)
        self.assertEqual(loaded.uuid, execution.uuid)
        self.assertEqual(loaded.context.test_data, "value")

        # Test your custom features
        self.assertTrue(repo.has_custom_feature())

    async def test_repository_error_handling(self):
        """Test repository error handling."""
        repo = self.build_saga_execution_repository()

        # Test loading non-existent execution
        with self.assertRaises(SagaExecutionNotFoundException):
            await repo.load(uuid4())

        # Test deleting non-existent execution
        # Should not raise exception
        await repo.delete(uuid4())


# Run the tests
if __name__ == '__main__':
    unittest.main()

Testing Saga Definitions

import unittest

from minos.saga import (
    ConditionalSagaStep,
    EmptySagaException,
    LocalSagaStep,
    RemoteSagaStep,
    Saga,
    SagaContext,
    UndefinedOnExecuteException,
)


class SagaDefinitionTestCase(unittest.TestCase):
    """Test cases for saga definition validation."""

    def test_empty_saga_raises_exception(self):
        """Test that empty saga cannot be committed."""
        saga = Saga()

        with self.assertRaises(EmptySagaException):
            saga.commit()

    def test_step_without_execute_raises_exception(self):
        """Test that step without on_execute cannot be validated."""
        saga = Saga()
        # Register a local step but deliberately don't set on_execute
        saga.local_step()

        with self.assertRaises(UndefinedOnExecuteException):
            saga.commit()

    def test_valid_saga_commits_successfully(self):
        """Test that valid saga commits without errors."""
        saga = Saga()
        saga.local_step().on_execute(lambda ctx: ctx)

        committed_saga = saga.commit()
        self.assertTrue(committed_saga.committed)
        self.assertEqual(len(committed_saga.steps), 1)

    def test_complex_saga_structure(self):
        """Test complex saga with multiple step types."""
        saga = Saga()

        # Local step
        local_step = saga.local_step()
        local_step.on_execute(lambda ctx: ctx)
        local_step.on_failure(lambda ctx: ctx)

        # Remote step
        remote_step = saga.remote_step()
        remote_step.on_execute(lambda ctx: None)
        remote_step.on_success(lambda ctx, resp: ctx)
        remote_step.on_error(lambda ctx, resp: ctx)
        remote_step.on_failure(lambda ctx: None)

        # Conditional step
        conditional = saga.conditional_step()
        inner_saga = Saga()
        inner_saga.local_step().on_execute(lambda ctx: ctx)
        conditional.if_then(lambda ctx: True, inner_saga.commit())

        committed_saga = saga.commit()
        self.assertEqual(len(committed_saga.steps), 3)
        self.assertIsInstance(committed_saga.steps[0], LocalSagaStep)
        self.assertIsInstance(committed_saga.steps[1], RemoteSagaStep)
        self.assertIsInstance(committed_saga.steps[2], ConditionalSagaStep)

Testing Saga Execution

import unittest
from unittest.mock import AsyncMock, MagicMock

from minos.saga import (
    Saga,
    SagaContext,
    SagaExecution,
    SagaFailedExecutionException,
    SagaManager,
    SagaRequest,
    SagaResponse,
    SagaResponseStatus,
    SagaStatus,
)


class SagaExecutionTestCase(unittest.IsolatedAsyncioTestCase):
    """Test cases for saga execution behavior.

    Uses ``IsolatedAsyncioTestCase`` so the ``async def`` tests are actually
    awaited; on a plain ``unittest.TestCase`` the coroutine is created but
    never run, so the test passes without executing its body.
    """

    def setUp(self):
        """Set up test fixtures."""
        self.mock_repo = AsyncMock()
        self.mock_broker = AsyncMock()
        self.manager = SagaManager(
            storage=self.mock_repo,
            broker_pool=self.mock_broker,
        )

    async def test_successful_execution(self):
        """Test successful saga execution flow."""
        # Create test saga
        saga = Saga()
        saga.local_step().on_execute(self.success_callback)
        committed_saga = saga.commit()

        # Execute saga
        context = SagaContext(test_value=42)
        result = await self.manager.run(
            definition=committed_saga,
            context=context,
        )

        # Verify results
        self.assertEqual(result.test_value, 42)
        self.assertEqual(result.processed, True)

    async def test_execution_with_failure(self):
        """Test saga execution with step failure."""
        # Create saga that will fail
        saga = Saga()
        saga.local_step().on_execute(self.failing_callback)
        committed_saga = saga.commit()

        # Execute saga
        context = SagaContext(test_value=42)

        with self.assertRaises(SagaFailedExecutionException):
            await self.manager.run(
                definition=committed_saga,
                context=context,
                raise_on_error=True,
            )

    async def test_remote_step_execution(self):
        """Test remote step execution with mocked responses."""
        # Create saga with remote step
        saga = Saga()
        saga.remote_step() \
            .on_execute(self.create_request) \
            .on_success(self.handle_success)
        committed_saga = saga.commit()

        # Mock broker response
        # NOTE(review): this response is built but never handed to the mocked
        # broker, so nothing delivers it to the manager -- wire it in before
        # relying on the final assertion.
        mock_response = SagaResponse(
            content={"result": "processed"},
            status=SagaResponseStatus.SUCCESS,
        )

        # Execute saga
        context = SagaContext(order_id=123)
        result = await self.manager.run(
            definition=committed_saga,
            context=context,
        )

        # Verify remote call was made
        self.mock_broker.send.assert_called_once()
        self.assertEqual(result.result, "processed")

    def success_callback(self, context):
        """Test callback that succeeds: flags the context as processed."""
        context.processed = True
        return context

    def failing_callback(self, context):
        """Test callback that fails by raising ``ValueError``."""
        raise ValueError("Test failure")

    def create_request(self, context):
        """Test callback that creates the request for the remote step."""
        return SagaRequest(
            target="test-service",
            content={"order_id": context.order_id},
        )

    def handle_success(self, context, response):
        """Test callback that handles successful response."""
        # NOTE(review): confirm whether SagaResponse.content() is a coroutine
        # in your Minos version; if so this callback must be async and await it.
        data = response.content()
        context.update(data)
        return context

Testing with Mocked Components

import unittest
from unittest.mock import AsyncMock, Mock, patch

from minos.saga import Saga, SagaContext, SagaExecution
from minos.saga.testing import MockedSagaExecutionDatabaseOperationFactory


class MockedComponentTestCase(unittest.IsolatedAsyncioTestCase):
    """Test cases using mocked components.

    Uses ``IsolatedAsyncioTestCase`` so that the ``async def`` test is
    awaited instead of being silently skipped as an un-run coroutine.
    """

    def setUp(self):
        """Set up mocked components."""
        self.mock_factory = MockedSagaExecutionDatabaseOperationFactory()
        self.mock_storage = Mock()
        self.mock_broker = AsyncMock()

    def test_mocked_database_operations(self):
        """Test using mocked database operation factory."""
        execution = self.create_test_execution()

        # Test create operation
        create_op = self.mock_factory.build_create_operation(execution)
        self.assertIsNotNone(create_op)

        # Test select operation
        select_op = self.mock_factory.build_select_operation(execution.uuid)
        self.assertIsNotNone(select_op)

        # Test update operation
        update_op = self.mock_factory.build_update_operation(execution)
        self.assertIsNotNone(update_op)

        # Test delete operation
        delete_op = self.mock_factory.build_delete_operation(execution.uuid)
        self.assertIsNotNone(delete_op)

    @patch('minos.saga.SagaManager')
    async def test_mocked_manager(self, mock_manager_class):
        """Test using mocked saga manager."""
        mock_manager = mock_manager_class.return_value
        # ``patch`` yields a MagicMock whose call result is not awaitable;
        # ``run`` is awaited below, so it has to be an AsyncMock.
        mock_manager.run = AsyncMock(return_value=SagaContext(result="mocked"))

        # Use mocked manager
        result = await mock_manager.run(
            definition=self.create_test_saga(),
            context=SagaContext(input="test"),
        )

        self.assertEqual(result.result, "mocked")
        mock_manager.run.assert_awaited_once()

    def create_test_execution(self):
        """Create test execution for mocking."""
        return SagaExecution.from_definition(
            definition=self.create_test_saga(),
            context=SagaContext(test="data"),
        )

    def create_test_saga(self):
        """Create test saga definition (a single committed local step)."""
        saga = Saga()
        saga.local_step().on_execute(lambda ctx: ctx)
        return saga.commit()

Integration Testing

import asyncio
import unittest

from minos.saga import (
    DatabaseSagaExecutionRepository,
    Saga,
    SagaContext,
    SagaFailedExecutionException,
    SagaManager,
    SagaRequest,
    SagaResponse,
)


class SagaIntegrationTestCase(unittest.IsolatedAsyncioTestCase):
    """Integration tests for complete saga workflows.

    Uses ``IsolatedAsyncioTestCase`` so the ``async def`` tests are awaited;
    a plain ``unittest.TestCase`` would never run their bodies.
    """

    def setUp(self):
        """Set up integration test environment."""
        # Set up real or test implementations
        self.setup_test_database()
        self.setup_test_broker()

        self.manager = SagaManager(
            storage=self.test_repo,
            broker_pool=self.test_broker,
        )

    def setup_test_database(self):
        """Set up test database for integration testing."""
        # Configure test database
        self.test_repo = DatabaseSagaExecutionRepository(
            connection="sqlite:///:memory:",
            create_tables=True,
        )

    def setup_test_broker(self):
        """Set up test message broker."""
        # Configure test broker
        # NOTE(review): TestBrokerPool is a placeholder -- supply (and import)
        # your own test broker pool implementation.
        self.test_broker = TestBrokerPool()

    async def test_end_to_end_order_processing(self):
        """Test complete order processing saga."""
        # Create realistic saga
        saga = self.create_order_processing_saga()

        # Create order context
        context = SagaContext(
            order_id="order_123",
            customer_id="customer_456",
            items=[
                {"sku": "ITEM1", "quantity": 2, "price": 25.00},
                {"sku": "ITEM2", "quantity": 1, "price": 50.00},
            ],
            total=100.00,
            payment_method="credit_card",
        )

        # Execute saga
        result = await self.manager.run(
            definition=saga,
            context=context,
            autocommit=True,
        )

        # Verify final state
        self.assertEqual(result.order_status, "completed")
        self.assertTrue(result.payment_processed)
        self.assertTrue(result.inventory_reserved)
        self.assertIsNotNone(result.confirmation_id)

    async def test_saga_failure_and_compensation(self):
        """Test saga failure triggers proper compensation."""
        # Create saga that will fail at payment step
        saga = self.create_failing_payment_saga()

        context = SagaContext(
            order_id="order_456",
            customer_id="customer_789",
            items=[{"sku": "ITEM1", "quantity": 1}],
            total=50.00,
        )

        # Execute saga (should fail)
        with self.assertRaises(SagaFailedExecutionException):
            await self.manager.run(
                definition=saga,
                context=context,
                raise_on_error=True,
            )

        # Verify compensation was executed
        self.verify_inventory_released()
        self.verify_no_payment_charged()

    def create_order_processing_saga(self):
        """Create realistic order processing saga.

        Returns:
            The committed four-step saga: validate order, reserve inventory,
            process payment, send confirmation.
        """
        saga = Saga()

        # Step 1: Validate order
        saga.local_step() \
            .on_execute(self.validate_order) \
            .on_failure(self.log_validation_failure)

        # Step 2: Reserve inventory
        saga.remote_step() \
            .on_execute(self.reserve_inventory) \
            .on_success(self.handle_inventory_reserved) \
            .on_error(self.handle_inventory_error) \
            .on_failure(self.release_inventory)

        # Step 3: Process payment
        saga.remote_step() \
            .on_execute(self.process_payment) \
            .on_success(self.handle_payment_success) \
            .on_error(self.handle_payment_error) \
            .on_failure(self.refund_payment)

        # Step 4: Send confirmation
        saga.local_step().on_execute(self.send_confirmation)

        return saga.commit()

    # Implementation of saga callbacks for testing
    def validate_order(self, context):
        """Mark the order as validated on the context."""
        context.order_validated = True
        return context

    def reserve_inventory(self, context):
        """Build the reservation request sent to the inventory service."""
        return SagaRequest(
            target="inventory-service",
            content={"items": context.items, "order_id": context.order_id},
        )

    def handle_inventory_reserved(self, context, response):
        """Record the successful reservation and its id on the context."""
        # NOTE(review): confirm whether SagaResponse.content() is a coroutine
        # in your Minos version; if so this callback must be async and await it.
        data = response.content()
        context.inventory_reserved = True
        context.reservation_id = data["reservation_id"]
        return context

    # Additional callback implementations...

Performance Testing

import asyncio
import time
import unittest
from statistics import mean, stdev

from minos.saga import Saga, SagaContext


class SagaPerformanceTestCase(unittest.IsolatedAsyncioTestCase):
    """Performance tests for saga execution.

    Uses ``IsolatedAsyncioTestCase`` so the ``async def`` tests are awaited.
    NOTE(review): ``self.manager`` is not created in this snippet -- it is
    assumed to be a configured SagaManager provided by a ``setUp`` that you
    add for your environment.
    """

    async def test_execution_performance(self):
        """Test saga execution performance under load."""
        saga = self.create_performance_test_saga()

        execution_times = []
        num_executions = 100

        for i in range(num_executions):
            context = SagaContext(iteration=i, data=f"test_data_{i}")

            # perf_counter is monotonic and high-resolution, unlike time.time(),
            # which can jump when the system clock is adjusted.
            start_time = time.perf_counter()
            await self.manager.run(definition=saga, context=context)
            end_time = time.perf_counter()

            execution_times.append(end_time - start_time)

        # Analyze performance
        avg_time = mean(execution_times)
        std_dev = stdev(execution_times)
        max_time = max(execution_times)
        min_time = min(execution_times)

        print("Performance Results:")
        print(f"  Average execution time: {avg_time:.4f}s")
        print(f"  Standard deviation: {std_dev:.4f}s")
        print(f"  Max execution time: {max_time:.4f}s")
        print(f"  Min execution time: {min_time:.4f}s")

        # Assert performance criteria
        self.assertLess(avg_time, 0.1, "Average execution time too high")
        self.assertLess(max_time, 0.5, "Max execution time too high")

    async def test_concurrent_executions(self):
        """Test concurrent saga executions."""
        saga = self.create_performance_test_saga()

        # Create concurrent executions
        tasks = [
            self.manager.run(
                definition=saga,
                context=SagaContext(concurrent_test=i),
            )
            for i in range(50)
        ]

        # Execute concurrently
        start_time = time.perf_counter()
        results = await asyncio.gather(*tasks)
        end_time = time.perf_counter()

        total_time = end_time - start_time
        print(f"Concurrent execution of 50 sagas took: {total_time:.2f}s")

        # Verify all completed successfully; gather() preserves input order,
        # so result i belongs to the context created with concurrent_test=i.
        self.assertEqual(len(results), 50)
        for i, result in enumerate(results):
            self.assertEqual(result.concurrent_test, i)

    def create_performance_test_saga(self):
        """Create saga optimized for performance testing.

        Returns:
            A committed saga with a single local step that returns the
            context unchanged.
        """
        saga = Saga()
        saga.local_step().on_execute(lambda ctx: ctx)
        return saga.commit()

Install with Tessl CLI

npx tessl i tessl/pypi-minos-microservice-saga

docs

context-management.md

exception-handling.md

execution-engine.md

index.md

message-system.md

saga-definitions.md

testing-utilities.md

tile.json