# Minos Microservice Saga

A Python library that implements the SAGA pattern for distributed transactions across microservices in the Minos Framework. It orchestrates business processes that span multiple microservices and keeps data eventually consistent by undoing completed steps through compensation callbacks.

## Package Information

- **Package Name**: minos-microservice-saga
- **Package Type**: pypi
- **Language**: Python
- **Installation**: `pip install minos-microservice-saga`

## Core Imports

```python
from minos.saga import (
    # Core classes
    Saga,
    SagaContext,
    SagaManager,
    SagaExecution,
    SagaService,

    # Step definitions
    LocalSagaStep,
    RemoteSagaStep,
    ConditionalSagaStep,
    IfThenAlternative,
    ElseThenAlternative,

    # Messages
    SagaRequest,
    SagaResponse,
    SagaResponseStatus,

    # Status and execution
    SagaStatus,
    SagaStepStatus,
    SagaStepExecution,

    # Repositories
    SagaExecutionRepository,
    DatabaseSagaExecutionRepository,

    # Transaction management
    TransactionCommitter,

    # Middleware and utilities
    transactional_command,
    get_service_name,
)
```

## Basic Usage

```python
from minos.saga import Saga, SagaContext, SagaManager, SagaRequest


# Define a saga with multiple steps
def create_order_saga():
    saga = Saga()

    # Add local step for order validation
    saga.local_step().on_execute(validate_order).on_failure(handle_validation_failure)

    # Add remote step for payment processing
    saga.remote_step() \
        .on_execute(create_payment_request) \
        .on_success(handle_payment_success) \
        .on_error(handle_payment_error) \
        .on_failure(compensate_payment)

    # Add remote step for inventory reservation
    saga.remote_step() \
        .on_execute(reserve_inventory_request) \
        .on_success(handle_inventory_success) \
        .on_error(handle_inventory_error) \
        .on_failure(release_inventory)

    return saga.commit()


# Execute the saga
async def process_order(order_data):
    saga_definition = create_order_saga()
    context = SagaContext(order=order_data)

    # Initialize saga manager with storage and broker.
    # `repo` and `broker` are assumed to be configured elsewhere in the service.
    manager = SagaManager(storage=repo, broker_pool=broker)

    # Run the saga
    result = await manager.run(saga_definition, context=context)
    return result


# Define callback functions.
# The remaining callbacks referenced above (failure, error, and inventory
# handlers) follow the same patterns and are omitted here.
def validate_order(context):
    # Local validation logic
    if not context.order.get('amount'):
        raise ValueError("Order amount required")
    return context


def create_payment_request(context):
    # Create payment service request
    return SagaRequest(
        target="payment-service",
        content={"amount": context.order["amount"]},
    )


def handle_payment_success(context, response):
    # Process successful payment.
    # If content() is a coroutine in your installed version,
    # make this callback async and await the call.
    context.payment_id = response.content()["payment_id"]
    return context
```

## Architecture

The SAGA pattern implementation is built around several key components:

- **Saga Definitions**: Declarative step sequences with compensation logic
- **Execution Engine**: Runtime orchestrator managing step execution and state
- **Context Management**: Stateful container carrying data between steps
- **Message System**: Request/response infrastructure for microservice communication
- **Transaction Management**: Two-phase commit protocol for distributed consistency
- **Persistence Layer**: Durable storage for execution state and recovery

Together, these components provide distributed transactions with automatic compensation, pause/resume of running executions, and structured error handling for multi-service workflows.
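
To make the compensation idea concrete, here is a minimal, framework-free sketch. It does not use `minos.saga`; `run_with_compensation` and the step functions are hypothetical names invented for illustration. It only shows the general pattern: run steps in order and, when one fails, undo the completed steps in reverse.

```python
from typing import Callable, List, Tuple

# Hypothetical stand-in types; not part of minos.saga.
Action = Callable[[dict], None]
Step = Tuple[str, Action, Action]  # (name, execute, compensate)


def run_with_compensation(steps: List[Step], context: dict) -> List[str]:
    """Run steps in order; on failure, undo completed steps in reverse."""
    log: List[str] = []
    completed: List[Step] = []
    for step in steps:
        name, execute, _ = step
        try:
            execute(context)
        except Exception:
            log.append(f"failed:{name}")
            # Compensate only the steps that finished, most recent first.
            for done_name, _, compensate in reversed(completed):
                compensate(context)
                log.append(f"compensated:{done_name}")
            return log
        completed.append(step)
        log.append(f"executed:{name}")
    return log


def reserve(ctx: dict) -> None:
    ctx["reserved"] = True


def release(ctx: dict) -> None:
    ctx["reserved"] = False


def charge(ctx: dict) -> None:
    raise RuntimeError("payment declined")


def refund(ctx: dict) -> None:
    ctx["refunded"] = True


context: dict = {}
log = run_with_compensation(
    [("reserve", reserve, release), ("charge", charge, refund)],
    context,
)
```

In this run the failing `charge` step is never compensated, because it never completed; only `reserve` is undone.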

## Capabilities

### Saga Definition and Construction

Core functionality for defining distributed transaction sequences with local and remote steps, conditional logic, and compensation behaviors.

```python { .api }
class Saga:
    def __init__(self, steps=None, committed=False, **kwargs): ...
    def local_step(self, step=None, **kwargs): ...
    def remote_step(self, step=None, **kwargs): ...
    def conditional_step(self, step=None): ...
    def commit(self, callback=None, **kwargs): ...

class LocalSagaStep(SagaStep):
    def on_execute(self, callback, parameters=None, **kwargs): ...
    def on_failure(self, callback, parameters=None, **kwargs): ...

class RemoteSagaStep(SagaStep):
    def on_execute(self, callback, parameters=None, **kwargs): ...
    def on_success(self, callback, parameters=None, **kwargs): ...
    def on_error(self, callback, parameters=None, **kwargs): ...
    def on_failure(self, callback, parameters=None, **kwargs): ...

class ConditionalSagaStep(SagaStep):
    def if_then(self, condition, saga): ...
    def else_then(self, saga): ...
```

[Saga Definitions](./saga-definitions.md)

### Execution Management and Orchestration

Runtime execution engine providing saga orchestration, state management, pause/resume capabilities, and comprehensive lifecycle control.

```python { .api }
class SagaExecution:
    def __init__(self, definition, uuid, context, status=SagaStatus.Created, **kwargs): ...
    def execute(self, response=None, autocommit=True, **kwargs): ...
    def rollback(self, autoreject=True, **kwargs): ...
    def commit(self, **kwargs): ...
    def reject(self, **kwargs): ...

class SagaManager:
    def __init__(self, storage, broker_pool=None, **kwargs): ...
    def run(self, definition=None, context=None, response=None, user=None, autocommit=True, pause_on_disk=False, **kwargs): ...

class SagaStatus(Enum):
    Created = "created"
    Running = "running"
    Paused = "paused"
    Finished = "finished"
    Errored = "errored"
```

[Execution Engine](./execution-engine.md)
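
The pause/resume behavior can be pictured with a toy state machine. This is a sketch under stated assumptions, not the library's implementation: `ToyExecution` and `Status` are hypothetical stand-ins, and every step is assumed to be a remote step that pauses until exactly one response arrives.

```python
from enum import Enum
from typing import List, Optional


class Status(Enum):
    # Hypothetical subset mirroring the SagaStatus values listed above.
    Created = "created"
    Paused = "paused"
    Finished = "finished"


class ToyExecution:
    """Hypothetical stand-in: each step waits for one remote response."""

    def __init__(self, steps: List[str]) -> None:
        self.steps = steps
        self.position = 0      # index of the step awaiting a response
        self.waiting = False   # True once a request has been "sent"
        self.responses: List[str] = []
        self.status = Status.Created

    def execute(self, response: Optional[str] = None) -> Status:
        if self.waiting and response is not None:
            # Resume: consume the response for the pending step.
            self.responses.append(response)
            self.position += 1
            self.waiting = False
        if self.position < len(self.steps) and not self.waiting:
            # "Send" the next request, then pause until a reply arrives.
            self.waiting = True
            self.status = Status.Paused
        elif self.position >= len(self.steps):
            self.status = Status.Finished
        return self.status


execution = ToyExecution(["payment", "inventory"])
first = execution.execute()             # sends the payment request
second = execution.execute("paid")      # resumes, sends the inventory request
third = execution.execute("reserved")   # resumes, nothing left to run
```

Calling `execute()` without a response while a request is pending leaves the toy execution paused, which is the property a persisted, resumable execution relies on.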

### Context and State Management

Stateful execution context that maintains data across saga steps with dictionary-like interface and automatic persistence.

```python { .api }
class SagaContext(BucketModel, MutableMapping):
    def __init__(self, **kwargs): ...
    def __setitem__(self, key, value): ...
    def __getitem__(self, key): ...
    def __setattr__(self, key, value): ...
```

[Context Management](./context-management.md)
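
The combination of item access and attribute access can be illustrated without the framework. The class below, `DictLikeContext`, is a hypothetical stand-in built only on `collections.abc.MutableMapping`; it is not `SagaContext` and omits persistence entirely.

```python
from collections.abc import MutableMapping
from typing import Any, Iterator


class DictLikeContext(MutableMapping):
    """Hypothetical stand-in for a context; not the minos.saga class."""

    def __init__(self, **kwargs: Any) -> None:
        # Bypass our own __setattr__ while creating the backing store.
        object.__setattr__(self, "_fields", dict(kwargs))

    def __getitem__(self, key: str) -> Any:
        return self._fields[key]

    def __setitem__(self, key: str, value: Any) -> None:
        self._fields[key] = value

    def __delitem__(self, key: str) -> None:
        del self._fields[key]

    def __iter__(self) -> Iterator[str]:
        return iter(self._fields)

    def __len__(self) -> int:
        return len(self._fields)

    def __getattr__(self, key: str) -> Any:
        # Called only when normal attribute lookup fails.
        if key == "_fields":
            raise AttributeError(key)  # avoid recursion before __init__ runs
        try:
            return self._fields[key]
        except KeyError:
            raise AttributeError(key) from None

    def __setattr__(self, key: str, value: Any) -> None:
        # Attribute writes land in the same store as item writes.
        self._fields[key] = value


ctx = DictLikeContext(order={"amount": 10})
ctx.payment_id = "p-1"   # attribute write
ctx["status"] = "paid"   # item write
```

Both write styles end up in the same mapping, so a callback can use whichever reads better.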

### Message System and Communication

Request/response infrastructure for microservice communication with status tracking and service relationship management.

```python { .api }
class SagaRequest:
    def __init__(self, target, content=None): ...
    def content(self, **kwargs): ...

class SagaResponse:
    def __init__(self, content=None, related_services=None, status=None, uuid=None, **kwargs): ...
    def content(self, **kwargs): ...

class SagaResponseStatus(IntEnum):
    SUCCESS = 200
    ERROR = 400
    SYSTEM_ERROR = 500
```

[Message System](./message-system.md)
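
As a rough illustration of how a response status could select a remote step's callback, the sketch below redefines the enum locally with the values from the listing above. The `choose_handler` function and its mapping are assumptions made for this example; check the library's documentation for the actual dispatch rules.

```python
from enum import IntEnum


class SagaResponseStatus(IntEnum):
    # Values copied from the API listing above.
    SUCCESS = 200
    ERROR = 400
    SYSTEM_ERROR = 500


def choose_handler(status: SagaResponseStatus) -> str:
    """Hypothetical router: name what a remote step would do next."""
    if status == SagaResponseStatus.SUCCESS:
        return "on_success"
    if status == SagaResponseStatus.ERROR:
        return "on_error"
    # Assumed: any other status is treated as a failure that triggers rollback.
    return "rollback"


outcomes = [choose_handler(status) for status in SagaResponseStatus]
```

Because the enum derives from `IntEnum`, its members also compare equal to their integer values, for example `SagaResponseStatus.ERROR == 400`.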

### Exception Handling and Error Management

Comprehensive exception hierarchy for saga definition validation, execution errors, and system failures with specific error types for different failure scenarios.

```python { .api }
class SagaException(MinosException): ...
class SagaExecutionException(SagaException): ...
class SagaFailedExecutionException(SagaExecutionException): ...
class SagaRollbackExecutionException(SagaExecutionException): ...
class SagaResponseException(SagaException): ...
```

[Exception Handling](./exception-handling.md)
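
The hierarchy determines which `except` clause catches which error. The sketch below mirrors the class names from the listing above with local stand-ins so that it runs without the framework; `MinosException` is replaced by a plain `Exception` subclass, and `classify` is a hypothetical helper.

```python
class MinosException(Exception):
    """Stand-in base class so the sketch runs without the framework."""


class SagaException(MinosException):
    pass


class SagaExecutionException(SagaException):
    pass


class SagaFailedExecutionException(SagaExecutionException):
    pass


class SagaRollbackExecutionException(SagaExecutionException):
    pass


class SagaResponseException(SagaException):
    pass


def classify(exc: Exception) -> str:
    """Report which handler level catches the given exception."""
    try:
        raise exc
    except SagaExecutionException:
        # Catches both failed-execution and rollback errors.
        return "execution"
    except SagaException:
        # Catches the remaining saga errors, such as response errors.
        return "saga"
    except Exception:
        return "other"


failed = classify(SagaFailedExecutionException("step failed"))
response = classify(SagaResponseException("bad response"))
```

Ordering the clauses from most specific to most general lets a handler treat execution failures separately while still catching every saga-related error.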

### Testing and Development Utilities

Testing utilities and mocked implementations for saga development and validation including repository test cases and operation factories.

```python { .api }
class SagaExecutionRepositoryTestCase:
    def build_saga_execution_repository(self): ...
    def test_store(self): ...
    def test_load_from_str(self): ...
    def test_delete(self): ...

class MockedSagaExecutionDatabaseOperationFactory: ...
```

[Testing Utilities](./testing-utilities.md)

### Service Integration and Middleware

Service-level integration for saga management within microservice applications with middleware support for transactional operations.

```python { .api }
class SagaService:
    def __init__(self, saga_manager, **kwargs): ...
    def __get_enroute__(self, config): ...
    def __reply__(self, request): ...

def transactional_command(request, inner):
    """
    Middleware for transactional command execution.

    Provides automatic transaction context management for
    saga operations within command handlers.

    Args:
        request: Incoming command request
        inner: Inner command handler function

    Returns:
        Command response with transaction context
    """

def get_service_name(config):
    """
    Utility function to extract service name from configuration.

    Args:
        config: Service configuration object

    Returns:
        str: Service name identifier
    """
```

## Types

```python { .api }
from typing import Callable, Union, Awaitable, Optional, TypeVar, Any, Dict, List, Tuple
from uuid import UUID
from enum import Enum, IntEnum

# Type variables
T = TypeVar('T')

# Core callback types
# A request callback builds the SagaRequest to send (directly or as an awaitable).
RequestCallBack = Callable[[SagaContext, ...], Union[SagaRequest, Awaitable[SagaRequest]]]
ResponseCallBack = Callable[[SagaContext, SagaResponse, ...], Union[Union[Exception, SagaContext], Awaitable[Union[Exception, SagaContext]]]]
LocalCallback = Callable[[SagaContext, ...], Union[Optional[SagaContext], Awaitable[Optional[SagaContext]]]]

# Operation wrapper
class SagaOperation:
    callback: T
    parameters: Optional[SagaContext]
    parameterized: bool
    raw: Dict[str, Any]

# Step definitions
class SagaStep:
    saga: Optional[Saga]
    raw: Dict[str, Any]

# Alternative classes for conditional steps
class IfThenAlternative:
    condition: Any
    saga: Saga

class ElseThenAlternative:
    saga: Saga

# Status enumerations
class SagaStatus(Enum):
    Created: str
    Running: str
    Paused: str
    Finished: str
    Errored: str

class SagaStepStatus(Enum):
    Created: str
    RunningOnExecute: str
    FinishedOnExecute: str
    ErroredOnExecute: str
    PausedByOnExecute: str
    ErroredByOnExecute: str
    RunningOnFailure: str
    PausedOnFailure: str
    ErroredOnFailure: str
    RunningOnSuccess: str
    ErroredOnSuccess: str
    RunningOnError: str
    ErroredOnError: str
    Finished: str

class SagaResponseStatus(IntEnum):
    SUCCESS: int
    ERROR: int
    SYSTEM_ERROR: int

# Execution types
class SagaStepExecution:
    uuid: UUID
    definition: SagaStep
    status: SagaStepStatus
    already_rollback: bool
    related_services: Optional[List[str]]
    raw: Dict[str, Any]

class TransactionCommitter:
    execution_uuid: UUID
    executed_steps: List[SagaStepExecution]
    transactions: List[Tuple[UUID, str]]

# Repository interface
class SagaExecutionRepository:
    async def store(self, execution: SagaExecution) -> None: ...
    async def load(self, uuid: Union[UUID, str]) -> SagaExecution: ...
    async def delete(self, uuid: Union[UUID, str]) -> None: ...
```