# Execution Engine

The execution engine runs saga definitions at runtime. It provides saga orchestration, state management, pause/resume, and lifecycle control, with support for distributed coordination, error recovery, and transaction management.

## Capabilities

### Saga Execution Management

The core execution class that represents a running instance of a saga definition.

```python { .api }
class SagaExecution:
    """
    Runtime execution instance of a saga definition.

    Attributes:
        uuid (UUID): Unique execution identifier
        definition (Saga): Saga definition being executed
        executed_steps (list[SagaStepExecution]): Steps that have been executed
        context (SagaContext): Current execution context
        status (SagaStatus): Current execution status
        paused_step (Optional[SagaStepExecution]): Currently paused step (if any)
        already_rollback (bool): Whether rollback has been performed
        user (Optional[UUID]): User identifier for remote steps
    """
    def __init__(self, definition, uuid, context, status=SagaStatus.Created, steps=None, paused_step=None, already_rollback=False, user=None, *args, **kwargs):
        """
        Initialize execution instance with definition and context.

        Args:
            definition (Saga): Saga definition to execute
            uuid (UUID): Unique execution identifier
            context (SagaContext): Initial execution context
            status (SagaStatus): Initial execution status
            steps (Optional[list[SagaStepExecution]]): Pre-existing step executions
            paused_step (Optional[SagaStepExecution]): Currently paused step
            already_rollback (bool): Whether rollback has been performed
            user (Optional[UUID]): User identifier for remote steps
        """

    @classmethod
    def from_definition(cls, definition, context=None, uuid=None, *args, **kwargs):
        """
        Create execution from saga definition.

        Args:
            definition (Saga): Committed saga definition
            context (Optional[SagaContext]): Initial context
            uuid (Optional[UUID]): Execution identifier

        Returns:
            SagaExecution: New execution instance

        Raises:
            SagaNotCommittedException: If saga not committed
        """

    @classmethod
    def from_raw(cls, raw, **kwargs):
        """
        Build execution from raw representation.

        Args:
            raw (dict): Raw execution data

        Returns:
            SagaExecution: Reconstructed execution instance
        """

    async def execute(self, response=None, autocommit=True, **kwargs):
        """
        Execute the saga steps.

        Args:
            response (Optional[SagaResponse]): Response for continuing paused step
            autocommit (bool): Whether to automatically commit/reject transactions

        Returns:
            SagaContext: Final execution context

        Raises:
            SagaExecutionAlreadyExecutedException: If execution already finished
            SagaFailedExecutionException: If execution fails
            SagaPausedExecutionStepException: If step requires pause
        """

    async def rollback(self, autoreject=True, **kwargs):
        """
        Perform compensatory rollback of executed steps.

        Args:
            autoreject (bool): Whether to automatically reject transactions

        Raises:
            SagaRollbackExecutionException: If rollback fails
        """

    async def commit(self, **kwargs):
        """
        Commit execution transactions.

        Raises:
            SagaFailedCommitCallbackException: If commit callback fails
        """

    async def reject(self, **kwargs):
        """Reject execution transactions."""
```
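
The execute/rollback contract described above can be pictured with a small self-contained toy. This is a sketch under stated assumptions, not the library's implementation: `ToyStep` and `ToyExecution` are hypothetical stand-ins, steps run in order, and when one fails the steps already executed are compensated in reverse order.

```python
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class ToyStep:
    """Hypothetical step: an action plus the compensation that undoes it."""
    name: str
    action: Callable[[dict], None]
    compensate: Callable[[dict], None]


@dataclass
class ToyExecution:
    """Hypothetical stand-in for a running saga (not the minos class)."""
    steps: list
    context: dict = field(default_factory=dict)
    executed_steps: list = field(default_factory=list)
    status: str = "created"

    def execute(self) -> dict:
        self.status = "running"
        for step in self.steps:
            try:
                step.action(self.context)
            except Exception:
                # A failing step marks the execution as errored and
                # triggers compensation of everything done so far.
                self.status = "errored"
                self.rollback()
                raise
            self.executed_steps.append(step)
        self.status = "finished"
        return self.context

    def rollback(self) -> None:
        # Undo completed steps, most recent first.
        while self.executed_steps:
            self.executed_steps.pop().compensate(self.context)


def _fail(context: dict) -> None:
    raise RuntimeError("payment declined")


log = []
execution = ToyExecution(
    steps=[
        ToyStep("reserve", lambda c: log.append("reserve"), lambda c: log.append("release")),
        ToyStep("ship", lambda c: log.append("ship"), lambda c: log.append("unship")),
        ToyStep("pay", _fail, lambda c: log.append("refund")),
    ]
)

try:
    execution.execute()
except RuntimeError:
    pass

print(execution.status)  # errored
print(log)  # ['reserve', 'ship', 'unship', 'release']
```

The failing step's own compensation (`refund`) never runs in this sketch, because the step did not complete; only `ship` and `reserve` are undone.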

### Saga Orchestration Manager

The main orchestrator for saga execution lifecycle management.

```python { .api }
class SagaManager:
    """
    Main orchestrator for saga execution lifecycle.

    Attributes:
        storage (SagaExecutionRepository): Persistence repository
        broker_pool (BrokerClientPool): Message broker connection pool
    """
    def __init__(self, storage, broker_pool=None, pool_factory=None, **kwargs):
        """
        Initialize manager with storage and broker pool.

        Args:
            storage (SagaExecutionRepository): Repository for saga persistence
            broker_pool (Optional[BrokerClientPool]): Message broker pool
            pool_factory: Factory for creating broker pools
        """

    async def run(self, definition=None, context=None, response=None, user=None, autocommit=True, pause_on_disk=False, raise_on_error=True, return_execution=True, **kwargs):
        """
        Execute saga with comprehensive lifecycle management.

        Args:
            definition (Optional[Saga]): Saga definition to execute
            context (Optional[SagaContext]): Initial execution context
            response (Optional[SagaResponse]): Response for continuing execution
            user (Optional[UUID]): User identifier for remote steps
            autocommit (bool): Automatically commit/reject transactions
            pause_on_disk (bool): Pause remote steps on disk vs memory
            raise_on_error (bool): Raise exceptions on execution errors
            return_execution (bool): Return SagaExecution vs UUID

        Returns:
            Union[SagaExecution, UUID, SagaContext]: Execution result based on options

        Raises:
            SagaFailedExecutionException: If execution fails and raise_on_error=True
        """

    @classmethod
    def _from_config(cls, config, **kwargs):
        """
        Build manager from configuration.

        Args:
            config: Configuration object

        Returns:
            SagaManager: Configured manager instance
        """
```

### Execution Status Management

Enums defining the various states of saga and step execution.

```python { .api }
from enum import Enum

class SagaStatus(Enum):
    """
    Saga execution status states.

    Values:
        Created: Initial state before execution
        Running: Currently executing steps
        Paused: Execution paused waiting for response
        Finished: Successfully completed all steps
        Errored: Execution failed with error
    """
    Created = "created"
    Running = "running"
    Paused = "paused"
    Finished = "finished"
    Errored = "errored"

class SagaStepStatus(Enum):
    """
    Individual step execution status states.

    Values:
        Created: Step created but not started
        RunningOnExecute: Executing main operation
        FinishedOnExecute: Main operation completed
        ErroredOnExecute: Main operation failed
        PausedByOnExecute: Paused by main operation
        ErroredByOnExecute: Error in main operation
        RunningOnFailure: Executing failure compensation
        PausedOnFailure: Paused during failure handling
        ErroredOnFailure: Failure compensation failed
        RunningOnSuccess: Processing successful response
        ErroredOnSuccess: Success handler failed
        RunningOnError: Processing error response
        ErroredOnError: Error handler failed
        Finished: Step completed successfully
    """
    Created = "created"
    RunningOnExecute = "running-on-execute"
    FinishedOnExecute = "finished-on-execute"
    ErroredOnExecute = "errored-on-execute"
    PausedByOnExecute = "paused-by-on-execute"
    ErroredByOnExecute = "errored-by-on-execute"
    RunningOnFailure = "running-on-failure"
    PausedOnFailure = "paused-on-failure"
    ErroredOnFailure = "errored-on-failure"
    RunningOnSuccess = "running-on-success"
    ErroredOnSuccess = "errored-on-success"
    RunningOnError = "running-on-error"
    ErroredOnError = "errored-on-error"
    Finished = "finished"
```
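
Because each status carries a string value, a status can be restored from its stored form by value lookup. The sketch below mirrors the `SagaStatus` values listed above in a local enum so that it runs without the library installed. The helpers `is_terminal` and `can_resume` are hypothetical, and treating `Finished` and `Errored` as terminal is an assumption made for illustration only.

```python
from enum import Enum


class SagaStatus(Enum):
    """Local mirror of the documented values (not imported from minos)."""
    Created = "created"
    Running = "running"
    Paused = "paused"
    Finished = "finished"
    Errored = "errored"


def is_terminal(status: SagaStatus) -> bool:
    # Assumption: no further steps run after Finished or Errored.
    return status in (SagaStatus.Finished, SagaStatus.Errored)


def can_resume(status: SagaStatus) -> bool:
    # Only a paused execution is waiting for a response.
    return status is SagaStatus.Paused


# Restore a status from its stored string value.
restored = SagaStatus("paused")
print(restored.name, can_resume(restored), is_terminal(restored))
```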

### Step Execution Classes

Runtime execution instances for different step types.

```python { .api }
from abc import ABC, abstractmethod

class SagaStepExecution(ABC):
    """
    Base class for step execution instances.

    Attributes:
        uuid (UUID): Step execution identifier
        definition (SagaStep): Step definition being executed
        status (SagaStepStatus): Current step status
    """
    def __init__(self, definition, uuid, status=SagaStepStatus.Created, **kwargs):
        """Initialize step execution with definition."""

class LocalSagaStepExecution(SagaStepExecution):
    """
    Execution instance for local steps.

    Handles local function execution within the same service process.
    """
    def __init__(self, definition, uuid, status=SagaStepStatus.Created, **kwargs):
        """Initialize local step execution."""

class RemoteSagaStepExecution(SagaStepExecution):
    """
    Execution instance for remote steps.

    Handles remote service calls with request/response coordination.
    """
    def __init__(self, definition, uuid, status=SagaStepStatus.Created, **kwargs):
        """Initialize remote step execution."""

class ConditionalSagaStepExecution(SagaStepExecution):
    """
    Execution instance for conditional steps.

    Handles conditional logic evaluation and nested saga execution.
    """
    def __init__(self, definition, uuid, status=SagaStepStatus.Created, **kwargs):
        """Initialize conditional step execution."""
```

### Execution Persistence

Repository system for durable saga execution storage and recovery.

```python { .api }
from abc import ABC, abstractmethod

class SagaExecutionRepository(ABC):
    """
    Base class for saga execution persistence.

    Provides durable storage for saga execution state enabling
    pause/resume and recovery capabilities.
    """
    @abstractmethod
    async def store(self, execution):
        """
        Store saga execution to persistent storage.

        Args:
            execution (SagaExecution): Execution to store
        """

    @abstractmethod
    async def load(self, uuid):
        """
        Load saga execution from persistent storage.

        Args:
            uuid (Union[UUID, str]): Execution identifier

        Returns:
            SagaExecution: Loaded execution instance

        Raises:
            SagaExecutionNotFoundException: If execution not found
        """

    @abstractmethod
    async def delete(self, uuid):
        """
        Delete saga execution from persistent storage.

        Args:
            uuid (Union[UUID, str]): Execution identifier
        """

class DatabaseSagaExecutionRepository(SagaExecutionRepository):
    """Database implementation of saga execution repository."""

class SagaExecutionDatabaseOperationFactory:
    """Factory for database operations on saga executions."""
```

### Transaction Management

Two-phase commit protocol implementation for distributed transaction coordination.

```python { .api }
class TransactionCommitter:
    """
    Manages two-phase commit protocol for saga transactions.

    Coordinates distributed transaction commits across multiple
    services participating in the saga execution.

    Attributes:
        execution_uuid (UUID): Execution identifier
        executed_steps (list[SagaStepExecution]): Steps that participated
        transactions (list[tuple[UUID, str]]): Transaction UUID and service pairs
    """
    def __init__(self, execution_uuid, executed_steps, broker_publisher, broker_pool=None, **kwargs):
        """
        Initialize committer with execution details.

        Args:
            execution_uuid (UUID): Saga execution identifier
            executed_steps (list[SagaStepExecution]): Executed steps
            broker_publisher: Message broker publisher
            broker_pool: Optional broker connection pool
        """

    def commit(self, **kwargs):
        """
        Commit all transactions using two-phase commit protocol.

        Sends commit messages to all participating services and
        waits for confirmation of successful commitment.

        Raises:
            SagaFailedCommitCallbackException: If any service fails to commit
        """

    def reject(self):
        """
        Reject all transactions.

        Sends reject messages to all participating services to
        roll back their local transaction state.
        """
```
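
The two-phase commit idea behind `commit` and `reject` can be sketched without any broker. The code below is a generic illustration, not the library's implementation: `ToyParticipant`, its `reserve` vote, and `two_phase_commit` are hypothetical names, and message passing is replaced by direct method calls.

```python
from uuid import UUID, uuid4


class ToyParticipant:
    """Hypothetical service holding one pending transaction."""

    def __init__(self, name: str, can_commit: bool = True):
        self.name = name
        self.can_commit = can_commit
        self.state = "pending"

    def reserve(self, transaction_uuid: UUID) -> bool:
        # Phase 1: vote on whether the transaction can be committed.
        if self.can_commit:
            self.state = "reserved"
        return self.can_commit

    def commit(self, transaction_uuid: UUID) -> None:
        self.state = "committed"

    def reject(self, transaction_uuid: UUID) -> None:
        self.state = "rejected"


def two_phase_commit(transaction_uuid: UUID, participants: list) -> bool:
    # Phase 1: every participant must vote yes.
    if all(p.reserve(transaction_uuid) for p in participants):
        # Phase 2: commit on every participant.
        for p in participants:
            p.commit(transaction_uuid)
        return True
    # A single "no" vote rejects the transaction everywhere.
    for p in participants:
        p.reject(transaction_uuid)
    return False


ok_services = [ToyParticipant("orders"), ToyParticipant("payments")]
mixed_services = [ToyParticipant("orders"), ToyParticipant("stock", can_commit=False)]

committed = two_phase_commit(uuid4(), ok_services)
rejected = two_phase_commit(uuid4(), mixed_services)

print(committed, [p.state for p in ok_services])  # True ['committed', 'committed']
print(rejected, [p.state for p in mixed_services])  # False ['rejected', 'rejected']
```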

### Execution Coordinators

Specialized executors for different types of saga operations.

```python { .api }
class Executor:
    """
    Base executor for saga operations.

    Attributes:
        execution_uuid (UUID): Execution identifier for transaction context
    """
    def __init__(self, execution_uuid, *args, **kwargs):
        """Initialize executor with execution context."""

    def exec(self, operation, *args, **kwargs):
        """Execute saga operation within transaction context."""

    def exec_function(self, func, *args, **kwargs):
        """Execute function within transaction context."""

class LocalExecutor(Executor):
    """Executor for local operations within the same service."""

class RequestExecutor(Executor):
    """Executor for remote request operations to other services."""

class ResponseExecutor(Executor):
    """Executor for remote response processing from other services."""
```
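
One way to picture "executing a function within a transaction context" is a context variable that holds the execution identifier only while the function runs. The sketch below assumes that approach for illustration; `CURRENT_EXECUTION` and `ToyExecutor` are hypothetical names, and the library may track its transaction context differently.

```python
from contextvars import ContextVar
from typing import Optional
from uuid import UUID, uuid4

# Hypothetical context variable holding the active execution identifier.
CURRENT_EXECUTION: ContextVar[Optional[UUID]] = ContextVar("current_execution", default=None)


class ToyExecutor:
    """Hypothetical executor that scopes an execution UUID to a call."""

    def __init__(self, execution_uuid: UUID):
        self.execution_uuid = execution_uuid

    def exec_function(self, func, *args, **kwargs):
        # Set the identifier for the duration of the call, then restore
        # the previous value even if the function raises.
        token = CURRENT_EXECUTION.set(self.execution_uuid)
        try:
            return func(*args, **kwargs)
        finally:
            CURRENT_EXECUTION.reset(token)


def create_order(product: str) -> dict:
    # Code running inside the executor can read the active identifier.
    return {"product": product, "execution": CURRENT_EXECUTION.get()}


execution_uuid = uuid4()
order = ToyExecutor(execution_uuid).exec_function(create_order, "book")

print(order["execution"] == execution_uuid)  # True
print(CURRENT_EXECUTION.get())  # None
```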

## Usage Examples

### Basic Saga Execution

```python
from minos.saga import SagaContext, SagaManager
from minos.saga.executions.repositories import DatabaseSagaExecutionRepository

# Initialize the saga manager. `broker_pool` is assumed to be an existing
# BrokerClientPool created elsewhere in the application.
storage = DatabaseSagaExecutionRepository(...)
manager = SagaManager(storage=storage, broker_pool=broker_pool)


async def execute_order_saga(order_data):
    """Run the order saga with automatic lifecycle management."""
    # `create_order_saga` is assumed to return a committed saga definition.
    saga_definition = create_order_saga()
    context = SagaContext(order=order_data)

    # autocommit=True commits or rejects transactions automatically;
    # raise_on_error=True raises SagaFailedExecutionException on failure.
    result = await manager.run(
        definition=saga_definition,
        context=context,
        autocommit=True,
        raise_on_error=True,
    )

    return result
```

### Manual Execution Control

```python
from uuid import uuid4

from minos.saga import SagaContext, SagaExecution


async def run_with_manual_control(saga_definition):
    # Create the execution manually for fine-grained control.
    execution = SagaExecution.from_definition(
        definition=saga_definition,
        context=SagaContext(data="initial"),
        uuid=uuid4(),
    )

    try:
        # Execute without committing so the results can be validated first.
        context = await execution.execute(autocommit=False)

        # `validate_results` is an application-defined check.
        if validate_results(context):
            await execution.commit()
        else:
            await execution.reject()

    except Exception:
        # Compensate the executed steps, then re-raise the original error.
        await execution.rollback()
        raise
```

### Pause and Resume Execution

```python
from minos.saga import SagaResponse

# The calls below must run inside a coroutine.

# Execute with pause-on-disk for background processing
execution_uuid = await manager.run(
    definition=long_running_saga,
    context=initial_context,
    pause_on_disk=True,  # Pause remote steps on disk
    return_execution=False,  # Return the UUID instead of the execution
)

# Later, resume with the response. return_execution defaults to True,
# so this call returns the SagaExecution.
response = SagaResponse(content={"result": "processed"})
execution = await manager.run(
    response=response,
    pause_on_disk=True,
)
```

### Status Monitoring

```python
from minos.saga import SagaStatus

# Runs inside a coroutine; `storage` is the repository given to the manager.
execution = await storage.load(execution_uuid)

if execution.status == SagaStatus.Paused:
    print(f"Execution paused at step: {execution.paused_step.uuid}")
    print(f"Step status: {execution.paused_step.status}")

elif execution.status == SagaStatus.Finished:
    print("Saga completed successfully")
    print(f"Final context: {execution.context}")

elif execution.status == SagaStatus.Errored:
    print("Saga execution failed")
    # Trigger rollback if needed
    await execution.rollback()
```

### Custom Repository Implementation

```python
from minos.saga import SagaExecution, SagaExecutionNotFoundException, SagaManager
from minos.saga.executions.repositories import SagaExecutionRepository


class CustomSagaRepository(SagaExecutionRepository):
    """Repository that delegates persistence to a custom async backend."""

    def __init__(self, storage_backend):
        self.storage = storage_backend

    async def store(self, execution):
        # Persist the raw representation, keyed by the execution UUID.
        await self.storage.save(execution.uuid, execution.raw)

    async def load(self, uuid):
        # Rebuild the execution from its raw representation.
        raw_data = await self.storage.get(uuid)
        if not raw_data:
            raise SagaExecutionNotFoundException(f"Execution {uuid} not found")
        return SagaExecution.from_raw(raw_data)

    async def delete(self, uuid):
        await self.storage.remove(uuid)


# Use the custom repository; `my_storage` is an application-provided backend.
custom_repo = CustomSagaRepository(my_storage)
manager = SagaManager(storage=custom_repo)
```
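
For local experiments, the storage backend passed to `CustomSagaRepository` could be as simple as a dictionary. The sketch below is an assumption-laden illustration: `InMemoryBackend` is a hypothetical class that exposes only the `save`, `get`, and `remove` coroutines the repository above calls, and it normalizes keys to strings because `load` accepts either a `UUID` or a `str`.

```python
import asyncio
from typing import Any, Optional
from uuid import UUID, uuid4


class InMemoryBackend:
    """Hypothetical dict-backed backend; data is lost when the process exits."""

    def __init__(self):
        self._items: dict = {}

    async def save(self, uuid: UUID, raw: Any) -> None:
        self._items[str(uuid)] = raw

    async def get(self, uuid) -> Optional[Any]:
        # Returns None when nothing is stored under the identifier.
        return self._items.get(str(uuid))

    async def remove(self, uuid) -> None:
        self._items.pop(str(uuid), None)


async def demo() -> tuple:
    backend = InMemoryBackend()
    key = uuid4()
    await backend.save(key, {"status": "paused"})
    loaded = await backend.get(str(key))  # str and UUID keys both work
    await backend.remove(key)
    missing = await backend.get(key)
    return loaded, missing


loaded, missing = asyncio.run(demo())
print(loaded)  # {'status': 'paused'}
print(missing)  # None
```

A backend like this suits tests only; durable pause/resume needs storage that survives restarts.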