0
# Saga Definitions
1
2
Core functionality for defining distributed transaction sequences with local and remote steps, conditional logic, and compensation behaviors. This module provides the declarative API for constructing sagas with proper validation and commitment semantics.
3
4
## Capabilities
5
6
### Saga Construction and Management
7
8
The main Saga class provides the primary interface for constructing distributed transaction definitions.
9
10
```python { .api }
11
class Saga:
12
"""
13
Main class for defining distributed transaction sequences.
14
15
Attributes:
16
steps (list[SagaStep]): List of saga steps in execution order
17
committed (bool): Whether saga is committed and ready for execution
18
raw (dict[str, Any]): Raw dictionary representation
19
"""
20
def __init__(self, *args, steps=None, committed=False, commit=None, **kwargs):
21
"""Initialize saga with optional steps and commit callback."""
22
23
def local_step(self, step=None, **kwargs):
24
"""
25
Add a local execution step.
26
27
Returns:
28
LocalSagaStep: New local step for method chaining
29
"""
30
31
def remote_step(self, step=None, **kwargs):
32
"""
33
Add a remote microservice step.
34
35
Returns:
36
RemoteSagaStep: New remote step for method chaining
37
"""
38
39
def conditional_step(self, step=None):
40
"""
41
Add a conditional step with if-then-else logic.
42
43
Returns:
44
ConditionalSagaStep: New conditional step for method chaining
45
"""
46
47
def commit(self, callback=None, **kwargs):
48
"""
49
Commit saga for execution. Once committed, saga cannot be modified.
50
51
Args:
52
callback: Optional commit callback function
53
54
Returns:
55
Saga: Self for chaining
56
57
Raises:
58
AlreadyCommittedException: If saga is already committed
59
EmptySagaException: If saga has no steps
60
"""
61
62
def validate(self):
63
"""
64
Validate saga definition.
65
66
Raises:
67
EmptySagaException: If saga has no steps
68
Various step validation exceptions
69
"""
70
71
@classmethod
72
def from_raw(cls, raw, **kwargs):
73
"""
74
Build saga from raw dictionary representation.
75
76
Args:
77
raw (dict): Raw saga definition
78
79
Returns:
80
Saga: Constructed saga instance
81
"""
82
```
83
84
### Saga Operations
85
86
Container class for saga step operations with callbacks and parameters.
87
88
```python { .api }
89
class SagaOperation:
90
"""
91
Container for saga step operations with callbacks and parameters.
92
93
Attributes:
94
callback (T): The callback function to execute
95
parameters (Optional[SagaContext]): Parameters to pass to callback
96
parameterized (bool): Whether parameters are provided
97
raw (dict[str, Any]): Raw representation
98
"""
99
def __init__(self, callback, parameters=None, **kwargs):
100
"""Initialize with callback function and optional parameters."""
101
102
@classmethod
103
def from_raw(cls, raw, **kwargs):
104
"""Build operation from raw representation."""
105
```
106
107
### Local Saga Steps
108
109
Steps for local execution within the same microservice.
110
111
```python { .api }
112
class LocalSagaStep(SagaStep):
113
"""
114
Step for local execution within same service.
115
116
Attributes:
117
on_execute_operation (Optional[SagaOperation[LocalCallback]]): Execute operation
118
on_failure_operation (Optional[SagaOperation[LocalCallback]]): Failure operation
119
"""
120
def __init__(self, on_execute=None, on_failure=None, **kwargs):
121
"""Initialize local step with optional callbacks."""
122
123
def on_execute(self, callback, parameters=None, **kwargs):
124
"""
125
Set execution callback for the step.
126
127
Args:
128
callback (LocalCallback): Function to execute locally
129
parameters (Optional[SagaContext]): Parameters for callback
130
131
Returns:
132
LocalSagaStep: Self for chaining
133
134
Raises:
135
MultipleOnExecuteException: If on_execute already set
136
"""
137
138
def on_failure(self, callback, parameters=None, **kwargs):
139
"""
140
Set failure compensation callback.
141
142
Args:
143
callback (LocalCallback): Function to execute on failure
144
parameters (Optional[SagaContext]): Parameters for callback
145
146
Returns:
147
LocalSagaStep: Self for chaining
148
149
Raises:
150
MultipleOnFailureException: If on_failure already set
151
"""
152
153
def validate(self):
154
"""
155
Validate step has required callbacks.
156
157
Raises:
158
UndefinedOnExecuteException: If no on_execute defined
159
EmptySagaStepException: If step has no actions
160
"""
161
```
162
163
### Remote Saga Steps
164
165
Steps for remote microservice calls with success/error handling.
166
167
```python { .api }
168
class RemoteSagaStep(SagaStep):
169
"""
170
Step for remote microservice calls.
171
172
Attributes:
173
on_execute_operation (Optional[SagaOperation[RequestCallBack]]): Execute operation
174
on_success_operation (Optional[SagaOperation[ResponseCallBack]]): Success operation
175
on_error_operation (Optional[SagaOperation[ResponseCallBack]]): Error operation
176
on_failure_operation (Optional[SagaOperation[RequestCallBack]]): Failure operation
177
"""
178
def __init__(self, on_execute=None, on_success=None, on_error=None, on_failure=None, **kwargs):
179
"""Initialize remote step with optional callbacks."""
180
181
def on_execute(self, callback, parameters=None, **kwargs):
182
"""
183
Set request callback for remote service call.
184
185
Args:
186
callback (RequestCallBack): Function that creates SagaRequest
187
parameters (Optional[SagaContext]): Parameters for callback
188
189
Returns:
190
RemoteSagaStep: Self for chaining
191
192
Raises:
193
MultipleOnExecuteException: If on_execute already set
194
"""
195
196
def on_success(self, callback, parameters=None, **kwargs):
197
"""
198
Set success response callback.
199
200
Args:
201
callback (ResponseCallBack): Function to handle successful response
202
parameters (Optional[SagaContext]): Parameters for callback
203
204
Returns:
205
RemoteSagaStep: Self for chaining
206
207
Raises:
208
MultipleOnSuccessException: If on_success already set
209
"""
210
211
def on_error(self, callback, parameters=None, **kwargs):
212
"""
213
Set error response callback for business logic errors.
214
215
Args:
216
callback (ResponseCallBack): Function to handle error response
217
parameters (Optional[SagaContext]): Parameters for callback
218
219
Returns:
220
RemoteSagaStep: Self for chaining
221
222
Raises:
223
MultipleOnErrorException: If on_error already set
224
"""
225
226
def on_failure(self, callback, parameters=None, **kwargs):
227
"""
228
Set failure compensation callback for rollback.
229
230
Args:
231
callback (RequestCallBack): Function to create compensation request
232
parameters (Optional[SagaContext]): Parameters for callback
233
234
Returns:
235
RemoteSagaStep: Self for chaining
236
237
Raises:
238
MultipleOnFailureException: If on_failure already set
239
"""
240
241
def validate(self):
242
"""
243
Validate step configuration.
244
245
Raises:
246
UndefinedOnExecuteException: If no on_execute defined
247
EmptySagaStepException: If step has no actions
248
"""
249
```
250
251
### Conditional Saga Steps
252
253
Steps for conditional saga execution based on runtime conditions.
254
255
```python { .api }
256
class ConditionalSagaStep(SagaStep):
257
"""
258
Step for conditional saga execution.
259
260
Attributes:
261
if_then_alternatives (list[IfThenAlternative]): List of conditional alternatives
262
else_then_alternative (ElseThenAlternative): Default alternative
263
"""
264
def __init__(self, if_then=None, else_then=None, **kwargs):
265
"""Initialize conditional step with optional alternatives."""
266
267
def if_then(self, condition, saga):
268
"""
269
Add if-then alternative with condition and saga.
270
271
Args:
272
condition: Condition function to evaluate
273
saga (Saga): Saga to execute if condition is true
274
275
Returns:
276
ConditionalSagaStep: Self for chaining
277
"""
278
279
def else_then(self, saga):
280
"""
281
Set else alternative with saga.
282
283
Args:
284
saga (Saga): Default saga to execute
285
286
Returns:
287
ConditionalSagaStep: Self for chaining
288
289
Raises:
290
MultipleElseThenException: If else_then already set
291
"""
292
293
def validate(self):
294
"""
295
Validate alternatives.
296
297
Raises:
298
EmptySagaStepException: If no alternatives defined
299
"""
300
301
class IfThenAlternative:
302
"""
303
Represents an if-then conditional alternative.
304
305
Attributes:
306
condition (SagaOperation): Condition operation to evaluate
307
saga (Saga): Saga to execute if condition is true
308
"""
309
def __init__(self, condition, saga):
310
"""Initialize with condition function and saga."""
311
312
@classmethod
313
def from_raw(cls, raw, **kwargs):
314
"""Build from raw representation."""
315
316
def validate(self):
317
"""Validate the alternative."""
318
319
class ElseThenAlternative:
320
"""
321
Represents an else alternative for conditional steps.
322
323
Attributes:
324
saga (Saga): Saga to execute as default
325
"""
326
def __init__(self, saga):
327
"""Initialize with saga."""
328
329
@classmethod
330
def from_raw(cls, raw, **kwargs):
331
"""Build from raw representation."""
332
333
def validate(self):
334
"""Validate the alternative."""
335
```
336
337
### Step Base Class
338
339
Abstract base class for all saga steps providing common functionality.
340
341
```python { .api }
342
from abc import ABC, abstractmethod
343
344
class SagaStep(ABC):
345
"""
346
Base class for all saga steps.
347
348
Attributes:
349
saga (Optional[Saga]): Reference to parent saga
350
"""
351
def __init__(self, saga=None, **kwargs):
352
"""Initialize step with optional saga reference."""
353
354
def conditional_step(self, *args, **kwargs):
355
"""Create new conditional step in saga."""
356
357
def local_step(self, *args, **kwargs):
358
"""Create new local step in saga."""
359
360
def remote_step(self, *args, **kwargs):
361
"""Create new remote step in saga."""
362
363
def commit(self, *args, **kwargs):
364
"""Commit the saga."""
365
366
@abstractmethod
367
def validate(self):
368
"""Abstract method to validate step."""
369
370
@classmethod
371
def from_raw(cls, raw, **kwargs):
372
"""Class method to build from raw representation."""
373
374
@property
375
@abstractmethod
376
def raw(self):
377
"""Abstract property for raw representation."""
378
```
379
380
## Usage Examples
381
382
### Basic Saga with Local and Remote Steps
383
384
```python
385
from minos.saga import Saga, SagaContext, SagaRequest
386
387
# Create a simple order processing saga
388
def create_order_saga():
389
saga = Saga()
390
391
# Local validation step
392
saga.local_step() \
393
.on_execute(validate_order) \
394
.on_failure(log_validation_failure)
395
396
# Remote payment step
397
saga.remote_step() \
398
.on_execute(create_payment_request) \
399
.on_success(handle_payment_success) \
400
.on_error(handle_payment_error) \
401
.on_failure(refund_payment)
402
403
return saga.commit()
404
405
def validate_order(context):
406
if not context.order.get('total'):
407
raise ValueError("Order total required")
408
return context
409
410
def create_payment_request(context):
411
return SagaRequest(
412
target="payment-service",
413
content={"amount": context.order["total"]}
414
)
415
416
def handle_payment_success(context, response):
417
context.payment_id = response.content()["id"]
418
return context
419
```
420
421
### Conditional Saga Logic
422
423
```python
424
def create_conditional_saga():
425
saga = Saga()
426
427
# Main processing step
428
saga.local_step().on_execute(process_data)
429
430
# Conditional step based on data type
431
conditional = saga.conditional_step()
432
433
# If premium customer
434
premium_saga = Saga()
435
premium_saga.remote_step().on_execute(process_premium_features)
436
conditional.if_then(is_premium_customer, premium_saga.commit())
437
438
# If regular customer
439
regular_saga = Saga()
440
regular_saga.remote_step().on_execute(process_regular_features)
441
conditional.else_then(regular_saga.commit())
442
443
return saga.commit()
444
445
def is_premium_customer(context):
446
return context.customer.get('tier') == 'premium'
447
448
def process_premium_features(context):
449
return SagaRequest(
450
target="premium-service",
451
content={"customer_id": context.customer["id"]}
452
)
453
```
454
455
### Complex Multi-Step Saga
456
457
```python
458
def create_booking_saga():
459
saga = Saga()
460
461
# Step 1: Validate booking request
462
saga.local_step() \
463
.on_execute(validate_booking) \
464
.on_failure(handle_validation_failure)
465
466
# Step 2: Reserve hotel room
467
saga.remote_step() \
468
.on_execute(reserve_room) \
469
.on_success(handle_room_reserved) \
470
.on_error(handle_room_error) \
471
.on_failure(cancel_room_reservation)
472
473
# Step 3: Book flight
474
saga.remote_step() \
475
.on_execute(book_flight) \
476
.on_success(handle_flight_booked) \
477
.on_error(handle_flight_error) \
478
.on_failure(cancel_flight)
479
480
# Step 4: Process payment
481
saga.remote_step() \
482
.on_execute(process_payment) \
483
.on_success(handle_payment_complete) \
484
.on_error(handle_payment_failed) \
485
.on_failure(refund_charges)
486
487
# Step 5: Send confirmation
488
saga.local_step().on_execute(send_confirmation)
489
490
return saga.commit()
491
```
492
493
## Type Definitions
494
495
```python { .api }
496
from typing import Callable, Union, Awaitable, Optional
497
498
# Callback function types
499
RequestCallBack = Callable[[SagaContext, ...], Union[SagaRequest, Awaitable[SagaRequest]]]
500
ResponseCallBack = Callable[[SagaContext, SagaResponse, ...], Union[Union[Exception, SagaContext], Awaitable[Union[Exception, SagaContext]]]]
501
LocalCallback = Callable[[SagaContext, ...], Union[Optional[SagaContext], Awaitable[Optional[SagaContext]]]]
502
```