0
# Testing Utilities
1
2
Testing utilities and mocked implementations for saga development and validation including repository test cases and operation factories. This module provides comprehensive testing infrastructure to validate saga behavior, execution flows, and integration with storage and messaging systems.
3
4
## Capabilities
5
6
### Repository Testing Base Class
7
8
Abstract test case providing standardized tests for saga execution repository implementations.
9
10
```python { .api }
11
from minos.common.testing import MinosTestCase
12
from abc import abstractmethod
13
14
class SagaExecutionRepositoryTestCase(MinosTestCase):
15
"""
16
Base test case for saga execution repository implementations.
17
18
Provides standard test methods to validate repository implementations
19
conform to the expected interface and behavior patterns.
20
"""
21
22
@abstractmethod
23
def build_saga_execution_repository(self):
24
"""
25
Abstract method to build repository instance.
26
27
Subclasses must implement this method to return a configured
28
instance of their repository implementation for testing.
29
30
Returns:
31
SagaExecutionRepository: Repository instance to test
32
"""
33
34
def test_store(self):
35
"""
36
Test storing saga executions.
37
38
Validates that the repository can successfully store saga execution
39
instances and that they can be retrieved with correct data.
40
"""
41
42
def test_load_from_str(self):
43
"""
44
Test loading executions from string UUID.
45
46
Validates that executions can be loaded using string representation
47
of UUIDs as well as UUID objects.
48
"""
49
50
def test_delete(self):
51
"""
52
Test deleting saga executions.
53
54
Validates that executions can be deleted from the repository
55
and that subsequent load attempts properly raise exceptions.
56
"""
57
58
def test_load_raises(self):
59
"""
60
Test loading non-existent executions raises appropriate exceptions.
61
62
Validates that attempting to load non-existent executions
63
raises SagaExecutionNotFoundException.
64
"""
65
66
def test_store_update(self):
67
"""
68
Test updating existing stored executions.
69
70
Validates that storing an execution with the same UUID
71
updates the existing record rather than creating duplicates.
72
"""
73
```
74
75
### Mocked Operation Factory
76
77
Mocked implementation of database operation factory for testing purposes.
78
79
```python { .api }
80
class MockedSagaExecutionDatabaseOperationFactory(SagaExecutionDatabaseOperationFactory):
81
"""
82
Mocked factory for testing purposes.
83
84
Provides a test-friendly implementation of the database operation factory
85
that can be used in unit tests without requiring actual database connections.
86
"""
87
88
def __init__(self, **kwargs):
89
"""Initialize mocked factory with test configuration."""
90
91
def build_create_operation(self, execution):
92
"""Build mocked create operation for testing."""
93
94
def build_update_operation(self, execution):
95
"""Build mocked update operation for testing."""
96
97
def build_delete_operation(self, uuid):
98
"""Build mocked delete operation for testing."""
99
100
def build_select_operation(self, uuid):
101
"""Build mocked select operation for testing."""
102
```
103
104
## Usage Examples
105
106
### Creating Repository Test Cases
107
108
```python
109
import unittest
110
from minos.saga.testing import SagaExecutionRepositoryTestCase
111
from minos.saga import SagaExecution, Saga, SagaContext, SagaStatus
112
from uuid import uuid4
113
114
class MyRepositoryTestCase(SagaExecutionRepositoryTestCase):
115
"""Test case for custom repository implementation."""
116
117
def build_saga_execution_repository(self):
118
"""Build repository instance for testing."""
119
# Return your repository implementation
120
return MyCustomSagaRepository(
121
connection_string="test://localhost",
122
schema="test_schema"
123
)
124
125
def test_custom_repository_features(self):
126
"""Test custom features specific to your repository."""
127
repo = self.build_saga_execution_repository()
128
129
# Create test execution
130
saga_def = Saga()
131
saga_def.local_step().on_execute(lambda ctx: ctx)
132
committed_saga = saga_def.commit()
133
134
execution = SagaExecution.from_definition(
135
definition=committed_saga,
136
context=SagaContext(test_data="value"),
137
uuid=uuid4()
138
)
139
140
# Test storing
141
await repo.store(execution)
142
143
# Test loading
144
loaded = await repo.load(execution.uuid)
145
self.assertEqual(loaded.uuid, execution.uuid)
146
self.assertEqual(loaded.context.test_data, "value")
147
148
# Test your custom features
149
self.assertTrue(repo.has_custom_feature())
150
151
def test_repository_error_handling(self):
152
"""Test repository error handling."""
153
repo = self.build_saga_execution_repository()
154
155
# Test loading non-existent execution
156
with self.assertRaises(SagaExecutionNotFoundException):
157
await repo.load(uuid4())
158
159
# Test deleting non-existent execution
160
# Should not raise exception
161
await repo.delete(uuid4())
162
163
# Run the tests
164
if __name__ == '__main__':
165
unittest.main()
166
```
167
168
### Testing Saga Definitions
169
170
```python
171
import unittest
172
from minos.saga import (
173
Saga, SagaContext, LocalSagaStep, RemoteSagaStep,
174
EmptySagaException, UndefinedOnExecuteException
175
)
176
177
class SagaDefinitionTestCase(unittest.TestCase):
178
"""Test cases for saga definition validation."""
179
180
def test_empty_saga_raises_exception(self):
181
"""Test that empty saga cannot be committed."""
182
saga = Saga()
183
184
with self.assertRaises(EmptySagaException):
185
saga.commit()
186
187
def test_step_without_execute_raises_exception(self):
188
"""Test that step without on_execute cannot be validated."""
189
saga = Saga()
190
step = saga.local_step()
191
# Don't set on_execute
192
193
with self.assertRaises(UndefinedOnExecuteException):
194
saga.commit()
195
196
def test_valid_saga_commits_successfully(self):
197
"""Test that valid saga commits without errors."""
198
saga = Saga()
199
saga.local_step().on_execute(lambda ctx: ctx)
200
201
committed_saga = saga.commit()
202
self.assertTrue(committed_saga.committed)
203
self.assertEqual(len(committed_saga.steps), 1)
204
205
def test_complex_saga_structure(self):
206
"""Test complex saga with multiple step types."""
207
saga = Saga()
208
209
# Local step
210
local_step = saga.local_step()
211
local_step.on_execute(lambda ctx: ctx)
212
local_step.on_failure(lambda ctx: ctx)
213
214
# Remote step
215
remote_step = saga.remote_step()
216
remote_step.on_execute(lambda ctx: None)
217
remote_step.on_success(lambda ctx, resp: ctx)
218
remote_step.on_error(lambda ctx, resp: ctx)
219
remote_step.on_failure(lambda ctx: None)
220
221
# Conditional step
222
conditional = saga.conditional_step()
223
inner_saga = Saga()
224
inner_saga.local_step().on_execute(lambda ctx: ctx)
225
conditional.if_then(lambda ctx: True, inner_saga.commit())
226
227
committed_saga = saga.commit()
228
self.assertEqual(len(committed_saga.steps), 3)
229
self.assertIsInstance(committed_saga.steps[0], LocalSagaStep)
230
self.assertIsInstance(committed_saga.steps[1], RemoteSagaStep)
231
self.assertIsInstance(committed_saga.steps[2], ConditionalSagaStep)
232
```
233
234
### Testing Saga Execution
235
236
```python
237
import unittest
238
from unittest.mock import AsyncMock, MagicMock
239
from minos.saga import (
240
SagaExecution, SagaManager, SagaContext, SagaRequest, SagaResponse,
241
SagaStatus, SagaResponseStatus
242
)
243
244
class SagaExecutionTestCase(unittest.TestCase):
245
"""Test cases for saga execution behavior."""
246
247
def setUp(self):
248
"""Set up test fixtures."""
249
self.mock_repo = AsyncMock()
250
self.mock_broker = AsyncMock()
251
self.manager = SagaManager(
252
storage=self.mock_repo,
253
broker_pool=self.mock_broker
254
)
255
256
async def test_successful_execution(self):
257
"""Test successful saga execution flow."""
258
# Create test saga
259
saga = Saga()
260
saga.local_step().on_execute(self.success_callback)
261
committed_saga = saga.commit()
262
263
# Execute saga
264
context = SagaContext(test_value=42)
265
result = await self.manager.run(
266
definition=committed_saga,
267
context=context
268
)
269
270
# Verify results
271
self.assertEqual(result.test_value, 42)
272
self.assertEqual(result.processed, True)
273
274
async def test_execution_with_failure(self):
275
"""Test saga execution with step failure."""
276
# Create saga that will fail
277
saga = Saga()
278
saga.local_step().on_execute(self.failing_callback)
279
committed_saga = saga.commit()
280
281
# Execute saga
282
context = SagaContext(test_value=42)
283
284
with self.assertRaises(SagaFailedExecutionException):
285
await self.manager.run(
286
definition=committed_saga,
287
context=context,
288
raise_on_error=True
289
)
290
291
async def test_remote_step_execution(self):
292
"""Test remote step execution with mocked responses."""
293
# Create saga with remote step
294
saga = Saga()
295
saga.remote_step() \
296
.on_execute(self.create_request) \
297
.on_success(self.handle_success)
298
committed_saga = saga.commit()
299
300
# Mock broker response
301
mock_response = SagaResponse(
302
content={"result": "processed"},
303
status=SagaResponseStatus.SUCCESS
304
)
305
306
# Execute saga
307
context = SagaContext(order_id=123)
308
result = await self.manager.run(
309
definition=committed_saga,
310
context=context
311
)
312
313
# Verify remote call was made
314
self.mock_broker.send.assert_called_once()
315
self.assertEqual(result.result, "processed")
316
317
def success_callback(self, context):
318
"""Test callback that succeeds."""
319
context.processed = True
320
return context
321
322
def failing_callback(self, context):
323
"""Test callback that fails."""
324
raise ValueError("Test failure")
325
326
def create_request(self, context):
327
"""Test callback that creates request."""
328
return SagaRequest(
329
target="test-service",
330
content={"order_id": context.order_id}
331
)
332
333
def handle_success(self, context, response):
334
"""Test callback that handles successful response."""
335
data = response.content()
336
context.update(data)
337
return context
338
```
339
340
### Testing with Mocked Components
341
342
```python
343
import unittest
344
from unittest.mock import Mock, AsyncMock, patch
345
from minos.saga.testing import MockedSagaExecutionDatabaseOperationFactory
346
347
class MockedComponentTestCase(unittest.TestCase):
348
"""Test cases using mocked components."""
349
350
def setUp(self):
351
"""Set up mocked components."""
352
self.mock_factory = MockedSagaExecutionDatabaseOperationFactory()
353
self.mock_storage = Mock()
354
self.mock_broker = AsyncMock()
355
356
def test_mocked_database_operations(self):
357
"""Test using mocked database operation factory."""
358
execution = self.create_test_execution()
359
360
# Test create operation
361
create_op = self.mock_factory.build_create_operation(execution)
362
self.assertIsNotNone(create_op)
363
364
# Test select operation
365
select_op = self.mock_factory.build_select_operation(execution.uuid)
366
self.assertIsNotNone(select_op)
367
368
# Test update operation
369
update_op = self.mock_factory.build_update_operation(execution)
370
self.assertIsNotNone(update_op)
371
372
# Test delete operation
373
delete_op = self.mock_factory.build_delete_operation(execution.uuid)
374
self.assertIsNotNone(delete_op)
375
376
@patch('minos.saga.SagaManager')
377
async def test_mocked_manager(self, mock_manager_class):
378
"""Test using mocked saga manager."""
379
mock_manager = mock_manager_class.return_value
380
mock_manager.run.return_value = SagaContext(result="mocked")
381
382
# Use mocked manager
383
result = await mock_manager.run(
384
definition=self.create_test_saga(),
385
context=SagaContext(input="test")
386
)
387
388
self.assertEqual(result.result, "mocked")
389
mock_manager.run.assert_called_once()
390
391
def create_test_execution(self):
392
"""Create test execution for mocking."""
393
saga = Saga()
394
saga.local_step().on_execute(lambda ctx: ctx)
395
committed_saga = saga.commit()
396
397
return SagaExecution.from_definition(
398
definition=committed_saga,
399
context=SagaContext(test="data")
400
)
401
402
def create_test_saga(self):
403
"""Create test saga definition."""
404
saga = Saga()
405
saga.local_step().on_execute(lambda ctx: ctx)
406
return saga.commit()
407
```
408
409
### Integration Testing
410
411
```python
412
import unittest
413
import asyncio
414
from minos.saga import SagaManager, Saga, SagaContext, SagaRequest, SagaResponse
415
416
class SagaIntegrationTestCase(unittest.TestCase):
417
"""Integration tests for complete saga workflows."""
418
419
def setUp(self):
420
"""Set up integration test environment."""
421
# Set up real or test implementations
422
self.setup_test_database()
423
self.setup_test_broker()
424
425
self.manager = SagaManager(
426
storage=self.test_repo,
427
broker_pool=self.test_broker
428
)
429
430
def setup_test_database(self):
431
"""Set up test database for integration testing."""
432
# Configure test database
433
self.test_repo = DatabaseSagaExecutionRepository(
434
connection="sqlite:///:memory:",
435
create_tables=True
436
)
437
438
def setup_test_broker(self):
439
"""Set up test message broker."""
440
# Configure test broker
441
self.test_broker = TestBrokerPool()
442
443
async def test_end_to_end_order_processing(self):
444
"""Test complete order processing saga."""
445
# Create realistic saga
446
saga = self.create_order_processing_saga()
447
448
# Create order context
449
context = SagaContext(
450
order_id="order_123",
451
customer_id="customer_456",
452
items=[
453
{"sku": "ITEM1", "quantity": 2, "price": 25.00},
454
{"sku": "ITEM2", "quantity": 1, "price": 50.00}
455
],
456
total=100.00,
457
payment_method="credit_card"
458
)
459
460
# Execute saga
461
result = await self.manager.run(
462
definition=saga,
463
context=context,
464
autocommit=True
465
)
466
467
# Verify final state
468
self.assertEqual(result.order_status, "completed")
469
self.assertTrue(result.payment_processed)
470
self.assertTrue(result.inventory_reserved)
471
self.assertIsNotNone(result.confirmation_id)
472
473
async def test_saga_failure_and_compensation(self):
474
"""Test saga failure triggers proper compensation."""
475
# Create saga that will fail at payment step
476
saga = self.create_failing_payment_saga()
477
478
context = SagaContext(
479
order_id="order_456",
480
customer_id="customer_789",
481
items=[{"sku": "ITEM1", "quantity": 1}],
482
total=50.00
483
)
484
485
# Execute saga (should fail)
486
with self.assertRaises(SagaFailedExecutionException):
487
await self.manager.run(
488
definition=saga,
489
context=context,
490
raise_on_error=True
491
)
492
493
# Verify compensation was executed
494
self.verify_inventory_released()
495
self.verify_no_payment_charged()
496
497
def create_order_processing_saga(self):
498
"""Create realistic order processing saga."""
499
saga = Saga()
500
501
# Step 1: Validate order
502
saga.local_step() \
503
.on_execute(self.validate_order) \
504
.on_failure(self.log_validation_failure)
505
506
# Step 2: Reserve inventory
507
saga.remote_step() \
508
.on_execute(self.reserve_inventory) \
509
.on_success(self.handle_inventory_reserved) \
510
.on_error(self.handle_inventory_error) \
511
.on_failure(self.release_inventory)
512
513
# Step 3: Process payment
514
saga.remote_step() \
515
.on_execute(self.process_payment) \
516
.on_success(self.handle_payment_success) \
517
.on_error(self.handle_payment_error) \
518
.on_failure(self.refund_payment)
519
520
# Step 4: Send confirmation
521
saga.local_step().on_execute(self.send_confirmation)
522
523
return saga.commit()
524
525
# Implementation of saga callbacks for testing
526
def validate_order(self, context):
527
context.order_validated = True
528
return context
529
530
def reserve_inventory(self, context):
531
return SagaRequest(
532
target="inventory-service",
533
content={"items": context.items, "order_id": context.order_id}
534
)
535
536
def handle_inventory_reserved(self, context, response):
537
data = response.content()
538
context.inventory_reserved = True
539
context.reservation_id = data["reservation_id"]
540
return context
541
542
# Additional callback implementations...
543
```
544
545
### Performance Testing
546
547
```python
548
import unittest
549
import time
550
import asyncio
551
from statistics import mean, stdev
552
553
class SagaPerformanceTestCase(unittest.TestCase):
554
"""Performance tests for saga execution."""
555
556
async def test_execution_performance(self):
557
"""Test saga execution performance under load."""
558
saga = self.create_performance_test_saga()
559
560
execution_times = []
561
num_executions = 100
562
563
for i in range(num_executions):
564
context = SagaContext(iteration=i, data=f"test_data_{i}")
565
566
start_time = time.time()
567
result = await self.manager.run(definition=saga, context=context)
568
end_time = time.time()
569
570
execution_times.append(end_time - start_time)
571
572
# Analyze performance
573
avg_time = mean(execution_times)
574
std_dev = stdev(execution_times)
575
max_time = max(execution_times)
576
min_time = min(execution_times)
577
578
print(f"Performance Results:")
579
print(f" Average execution time: {avg_time:.4f}s")
580
print(f" Standard deviation: {std_dev:.4f}s")
581
print(f" Max execution time: {max_time:.4f}s")
582
print(f" Min execution time: {min_time:.4f}s")
583
584
# Assert performance criteria
585
self.assertLess(avg_time, 0.1, "Average execution time too high")
586
self.assertLess(max_time, 0.5, "Max execution time too high")
587
588
async def test_concurrent_executions(self):
589
"""Test concurrent saga executions."""
590
saga = self.create_performance_test_saga()
591
592
# Create concurrent executions
593
tasks = []
594
for i in range(50):
595
context = SagaContext(concurrent_test=i)
596
task = self.manager.run(definition=saga, context=context)
597
tasks.append(task)
598
599
# Execute concurrently
600
start_time = time.time()
601
results = await asyncio.gather(*tasks)
602
end_time = time.time()
603
604
total_time = end_time - start_time
605
print(f"Concurrent execution of 50 sagas took: {total_time:.2f}s")
606
607
# Verify all completed successfully
608
self.assertEqual(len(results), 50)
609
for i, result in enumerate(results):
610
self.assertEqual(result.concurrent_test, i)
611
612
def create_performance_test_saga(self):
613
"""Create saga optimized for performance testing."""
614
saga = Saga()
615
saga.local_step().on_execute(lambda ctx: ctx)
616
return saga.commit()
617
```