# Exception Handling

A comprehensive exception hierarchy covering saga definition validation, execution errors, and system failures, with a specific error type for each failure scenario. This structured error handling supports saga rollback, debugging, and recovery.

## Capabilities

### Exception Hierarchy

Saga exceptions are organized into a hierarchy so that handlers can catch errors as broadly or as narrowly as needed.

```python { .api }
class SagaException(MinosException):
    """
    Base saga exception.

    Root exception class for all saga-related errors. Extends MinosException
    to integrate with the broader Minos framework error handling.
    """

class SagaStepException(SagaException):
    """
    Base saga step exception.

    Base class for errors related to saga step definition and validation.
    """

class SagaExecutionException(SagaException):
    """
    Base saga execution exception.

    Base class for errors that occur during saga execution runtime.
    """

class SagaStepExecutionException(SagaExecutionException):
    """
    Base saga step execution exception.

    Base class for errors that occur during individual step execution.
    """
```
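
To illustrate how the hierarchy lets a handler choose its granularity, here is a minimal sketch. It mirrors the four base classes with local stand-ins so that it runs without the library installed; the `MinosException` placeholder and the `classify` helper are assumptions made for this example and are not part of `minos.saga`.

```python
# Stand-in classes mirroring the documented hierarchy. In real code these
# would be imported from minos.saga instead of being redefined.
class MinosException(Exception):
    """Placeholder for the framework base exception (assumption)."""


class SagaException(MinosException): ...
class SagaStepException(SagaException): ...
class SagaExecutionException(SagaException): ...
class SagaStepExecutionException(SagaExecutionException): ...


def classify(exc: Exception) -> str:
    """Hypothetical helper: name the most specific documented base class."""
    # Most derived classes are checked first so the narrowest match wins.
    for base in (
        SagaStepExecutionException,
        SagaExecutionException,
        SagaStepException,
        SagaException,
    ):
        if isinstance(exc, base):
            return base.__name__
    return "not a saga error"


print(classify(SagaStepExecutionException("step failed")))  # SagaStepExecutionException
print(classify(SagaStepException("bad step")))              # SagaStepException
print(classify(ValueError("unrelated")))                    # not a saga error
```

Catching `SagaException` therefore covers every error in this module, while catching `SagaExecutionException` limits the handler to runtime failures.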

### Definition and Validation Exceptions

Exceptions raised while a saga definition is being constructed and validated.

```python { .api }
# Saga Definition Exceptions
class EmptySagaException(SagaException):
    """
    Saga must have at least one step.

    Raised when attempting to commit a saga that has no steps defined.
    """

class SagaNotCommittedException(SagaException):
    """
    Saga must be committed before execution.

    Raised when attempting to execute a saga that hasn't been committed.
    """

class AlreadyCommittedException(SagaException):
    """
    Cannot modify committed saga.

    Raised when attempting to add steps or modify a saga that has
    already been committed for execution.
    """

class SagaNotDefinedException(SagaStepException):
    """
    Step must have saga instance.

    Raised when a saga step operation is called but the step
    doesn't have a reference to its parent saga.
    """

class EmptySagaStepException(SagaStepException):
    """
    Step must have at least one action.

    Raised when a saga step has no operations defined (no on_execute,
    on_success, on_error, or on_failure callbacks).
    """

class AlreadyOnSagaException(SagaStepException):
    """
    Step can only belong to one saga.

    Raised when attempting to add a step that's already part of
    another saga definition.
    """

class UndefinedOnExecuteException(SagaStepException):
    """
    Step must define on_execute logic.

    Raised when a step lacks the required on_execute callback
    that defines the primary step operation.
    """

# Callback Validation Exceptions
class MultipleOnExecuteException(SagaStepException):
    """
    Only one on_execute method allowed.

    Raised when attempting to define multiple on_execute callbacks
    for a single saga step.
    """

class MultipleOnFailureException(SagaStepException):
    """
    Only one on_failure method allowed.

    Raised when attempting to define multiple on_failure callbacks
    for a single saga step.
    """

class MultipleOnSuccessException(SagaStepException):
    """
    Only one on_success method allowed.

    Raised when attempting to define multiple on_success callbacks
    for a single remote saga step.
    """

class MultipleOnErrorException(SagaStepException):
    """
    Only one on_error method allowed.

    Raised when attempting to define multiple on_error callbacks
    for a single remote saga step.
    """

class MultipleElseThenException(SagaStepException):
    """
    Only one else_then method allowed.

    Raised when attempting to define multiple else_then alternatives
    for a single conditional saga step.
    """
```
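
The rules behind these exceptions can be sketched with a toy builder. `ToySaga`, `ToyStep`, and `first_error` are hypothetical names invented for this illustration, and the exception classes are local stand-ins; the validation order shown is an assumption, not the library's actual implementation.

```python
# Stand-in exceptions with the documented names and parents.
class SagaException(Exception): ...
class EmptySagaException(SagaException): ...
class AlreadyCommittedException(SagaException): ...
class SagaStepException(SagaException): ...
class UndefinedOnExecuteException(SagaStepException): ...
class MultipleOnExecuteException(SagaStepException): ...


class ToyStep:
    """Hypothetical step that accepts exactly one on_execute callback."""

    def __init__(self):
        self.execute_callback = None

    def on_execute(self, callback):
        if self.execute_callback is not None:
            raise MultipleOnExecuteException("on_execute is already defined")
        self.execute_callback = callback
        return self

    def validate(self):
        if self.execute_callback is None:
            raise UndefinedOnExecuteException("step has no on_execute callback")


class ToySaga:
    """Hypothetical saga definition that is frozen once committed."""

    def __init__(self):
        self.steps = []
        self.committed = False

    def local_step(self):
        if self.committed:
            raise AlreadyCommittedException("saga is already committed")
        step = ToyStep()
        self.steps.append(step)
        return step

    def commit(self):
        if not self.steps:
            raise EmptySagaException("saga has no steps")
        for step in self.steps:
            step.validate()
        self.committed = True
        return self


def first_error(build):
    """Run a builder function and return the name of the saga error it raises."""
    try:
        build()
    except SagaException as exc:
        return type(exc).__name__
    return None


def modify_after_commit():
    saga = ToySaga()
    saga.local_step().on_execute(print)
    saga.commit()
    saga.local_step()  # The definition is frozen at this point.


print(first_error(lambda: ToySaga().commit()))
print(first_error(lambda: ToySaga().local_step().on_execute(print).on_execute(print)))
print(first_error(modify_after_commit))
```

Under these assumptions the three calls print `EmptySagaException`, `MultipleOnExecuteException`, and `AlreadyCommittedException` in that order.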

### Execution Runtime Exceptions

Exceptions raised while a saga is executing.

```python { .api }
# Execution Management Exceptions
class SagaExecutionNotFoundException(SagaExecutionException):
    """
    Execution not found in storage.

    Raised when attempting to load a saga execution that doesn't
    exist in the configured repository.
    """

class SagaExecutionAlreadyExecutedException(SagaExecutionException):
    """
    Cannot re-execute finished execution.

    Raised when attempting to execute a saga that has already
    completed (finished or errored status).
    """

# Execution Failure Exceptions
class SagaFailedExecutionException(SagaExecutionException):
    """
    Execution failed during running.

    Raised when saga execution encounters an unrecoverable error
    during step processing.
    """

class SagaFailedExecutionStepException(SagaStepExecutionException):
    """
    Step failed during execution.

    Raised when an individual saga step fails to execute properly.
    """

class SagaPausedExecutionStepException(SagaStepExecutionException):
    """
    Step paused during execution.

    Raised when a saga step requires a pause for external response
    (typically remote steps waiting for service responses).
    """

# Rollback Exceptions
class SagaRollbackExecutionException(SagaExecutionException):
    """
    Failed during rollback process.

    Raised when saga rollback (compensation) operations fail.
    """

class SagaRollbackExecutionStepException(SagaStepExecutionException):
    """
    Step failed during rollback.

    Raised when an individual step's compensation logic fails
    during saga rollback.
    """

# Transaction Management Exceptions
class SagaFailedCommitCallbackException(SagaExecutionException):
    """
    Commit callback raised exception.

    Raised when the saga commit callback function encounters
    an error during transaction commitment.
    """

# Response Handling Exceptions
class SagaResponseException(SagaException):
    """
    Response status not SUCCESS.

    Raised when a remote service responds with an error status
    that cannot be handled by the step's error handlers.
    """
```
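
One way to act on these runtime exceptions is a small policy table that maps each type to a follow-up action. The sketch below uses local stand-in classes with the documented names and parents; the `POLICY` table, the action strings, and `next_action` are hypothetical choices for illustration, not behavior provided by the library.

```python
# Stand-ins that reproduce the documented parent/child relationships.
class SagaException(Exception): ...
class SagaExecutionException(SagaException): ...
class SagaStepExecutionException(SagaExecutionException): ...
class SagaExecutionNotFoundException(SagaExecutionException): ...
class SagaFailedExecutionException(SagaExecutionException): ...
class SagaPausedExecutionStepException(SagaStepExecutionException): ...
class SagaRollbackExecutionException(SagaExecutionException): ...
class SagaResponseException(SagaException): ...

# Hypothetical policy: the first matching entry wins, so specific classes
# must be listed before their base classes.
POLICY = [
    (SagaPausedExecutionStepException, "wait-for-response"),
    (SagaRollbackExecutionException, "manual-intervention"),
    (SagaExecutionNotFoundException, "ignore"),
    (SagaExecutionException, "rollback"),
    (SagaException, "report"),
]


def next_action(exc: Exception) -> str:
    """Return the configured action for an exception, or re-raise advice."""
    for exc_type, action in POLICY:
        if isinstance(exc, exc_type):
            return action
    return "re-raise"


print(next_action(SagaPausedExecutionStepException()))  # wait-for-response
print(next_action(SagaFailedExecutionException()))      # rollback
print(next_action(SagaResponseException()))             # report
```

Note that `SagaResponseException` derives directly from `SagaException`, so a handler written only for `SagaExecutionException` would not catch it.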

## Usage Examples

### Exception Handling in Saga Definition

```python
from minos.saga import (
    Saga, SagaException, EmptySagaException,
    AlreadyCommittedException, MultipleOnExecuteException,
    UndefinedOnExecuteException
)

# validate_order, process_payment and process_data are application-defined callbacks.

def create_saga_with_validation():
    """Create saga with proper exception handling."""
    try:
        saga = Saga()

        # This would raise EmptySagaException
        # saga.commit()  # Uncommenting would fail

        # Add required steps
        saga.local_step().on_execute(validate_order)
        saga.remote_step().on_execute(process_payment)

        # Commit saga
        committed_saga = saga.commit()

        # This would raise AlreadyCommittedException
        # saga.local_step()  # Uncommenting would fail

        return committed_saga

    except EmptySagaException:
        print("Cannot commit saga without steps")
        raise
    except AlreadyCommittedException:
        print("Cannot modify saga after commit")
        raise
    except UndefinedOnExecuteException:
        print("Step missing required on_execute callback")
        raise

def create_step_with_validation():
    """Create saga step with callback validation."""
    try:
        saga = Saga()
        step = saga.local_step()

        # Define on_execute
        step.on_execute(process_data)

        # This would raise MultipleOnExecuteException
        # step.on_execute(other_function)  # Uncommenting would fail

        return saga.commit()

    except MultipleOnExecuteException:
        print("Step already has on_execute callback defined")
        raise
```

### Exception Handling in Saga Execution

```python
from minos.saga import (
    SagaManager, SagaExecution, SagaFailedExecutionException,
    SagaPausedExecutionStepException, SagaRollbackExecutionException
)

# `repo` (execution storage) and `broker` (broker pool) are assumed to be
# configured elsewhere by the application.

async def execute_saga_with_error_handling(saga_definition, context):
    """Execute saga with comprehensive error handling."""
    manager = SagaManager(storage=repo, broker_pool=broker)

    try:
        # Attempt saga execution
        result = await manager.run(
            definition=saga_definition,
            context=context,
            autocommit=True,
            raise_on_error=True
        )

        print("Saga completed successfully")
        return result

    except SagaFailedExecutionException as e:
        print(f"Saga execution failed: {e}")

        # Attempt to load execution for inspection
        try:
            execution = await repo.load(e.execution_uuid)
            print(f"Failed at step: {execution.paused_step}")
            print(f"Execution status: {execution.status}")

            # Trigger rollback
            await execution.rollback()
            print("Rollback completed")

        except SagaRollbackExecutionException as rollback_error:
            print(f"Rollback also failed: {rollback_error}")
            # Manual cleanup may be required

        raise

    except SagaPausedExecutionStepException as e:
        print(f"Saga paused at step: {e.step_uuid}")

        # Handle pause - could resume later with response
        print("Saga execution paused, will resume when response arrives")
        return e.execution_uuid  # Return for later resumption

    except Exception as e:
        print(f"Unexpected error during saga execution: {e}")
        raise
```

### Step-Level Exception Handling

```python
from minos.saga import SagaContext, SagaRequest, SagaResponse

def handle_step_with_exceptions(context):
    """Local step with exception handling."""
    try:
        # Validate required context data
        if not context.get("order_id"):
            raise ValueError("Order ID is required")

        if not context.get("customer_id"):
            raise ValueError("Customer ID is required")

        # Process business logic (application-defined function)
        result = process_order_validation(context)

        # Update context with results
        context.validation_result = result
        context.validation_status = "passed"

        return context

    except ValueError as e:
        # Business logic error - log and propagate
        print(f"Validation error: {e}")
        context.validation_error = str(e)
        context.validation_status = "failed"
        raise  # Will cause saga to fail and rollback

    except Exception as e:
        # Unexpected error - log and propagate
        print(f"Unexpected validation error: {e}")
        context.validation_error = f"System error: {e}"
        context.validation_status = "error"
        raise

# Declared async because reading the response content is awaited.
async def handle_remote_step_errors(context, response):
    """Remote step success handler with error checking."""
    try:
        # Check response status
        if not response.ok:
            error_data = await response.content()
            error_message = error_data.get("message", "Unknown error")

            # Handle different error types
            if response.status == 400:  # Business error
                raise ValueError(f"Business error: {error_message}")
            elif response.status == 500:  # System error
                raise RuntimeError(f"System error: {error_message}")
            else:
                raise Exception(f"Unexpected error: {error_message}")

        # Process successful response
        data = await response.content()
        context.update(data)
        return context

    except ValueError as e:
        # Business error - update context and propagate
        context.business_error = str(e)
        raise

    except RuntimeError as e:
        # System error - update context and propagate
        context.system_error = str(e)
        raise

    except Exception as e:
        # Unexpected error
        context.unexpected_error = str(e)
        raise
```

### Compensation Exception Handling

```python
from minos.saga import Saga, SagaRequest

def create_saga_with_compensation_handling():
    """Create saga with robust compensation error handling."""
    saga = Saga()

    # Step 1: Reserve inventory
    saga.remote_step() \
        .on_execute(reserve_inventory) \
        .on_success(handle_reservation_success) \
        .on_error(handle_reservation_error) \
        .on_failure(release_inventory_with_error_handling)

    # Step 2: Process payment
    saga.remote_step() \
        .on_execute(process_payment) \
        .on_success(handle_payment_success) \
        .on_error(handle_payment_error) \
        .on_failure(refund_payment_with_error_handling)

    return saga.commit()

def release_inventory_with_error_handling(context):
    """Compensation with error handling."""
    try:
        if not hasattr(context, 'reservation_id'):
            print("No reservation to release")
            return None

        return SagaRequest(
            target="inventory-service",
            content={
                "action": "release",
                "reservation_id": context.reservation_id
            }
        )

    except Exception as e:
        # Log compensation error but don't fail saga rollback
        print(f"Error creating inventory release request: {e}")
        context.compensation_errors = context.get("compensation_errors", [])
        context.compensation_errors.append(f"inventory_release: {e}")

        # Return None to skip this compensation
        return None

def refund_payment_with_error_handling(context):
    """Payment refund compensation with error handling."""
    try:
        if not hasattr(context, 'payment_id'):
            print("No payment to refund")
            return None

        return SagaRequest(
            target="payment-service",
            content={
                "action": "refund",
                "payment_id": context.payment_id,
                "amount": context.charged_amount,
                "reason": "saga_rollback"
            }
        )

    except Exception as e:
        print(f"Error creating refund request: {e}")
        context.compensation_errors = context.get("compensation_errors", [])
        context.compensation_errors.append(f"payment_refund: {e}")

        # Critical compensation - still return request even if there are issues
        return SagaRequest(
            target="payment-service",
            content={
                "action": "refund",
                "payment_id": context.get("payment_id", "unknown"),
                "reason": "saga_rollback_fallback"
            }
        )
```
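
The "record the error and keep going" idea used by the compensation callbacks above can be shown in isolation. This is a minimal sketch that uses a plain dict as the context and a hypothetical `run_compensations` helper; it does not reproduce how the library itself schedules rollback steps.

```python
def run_compensations(compensations, context):
    """Run (name, callable) compensations in reverse order, collecting failures.

    Hypothetical helper: a failing compensation is recorded instead of
    aborting the remaining ones.
    """
    errors = []
    for name, compensate in reversed(compensations):
        try:
            compensate(context)
        except Exception as exc:
            errors.append(f"{name}: {exc}")
    return errors


def release_inventory(context):
    # Succeeds and leaves a trace in the context.
    context["log"].append("inventory released")


def refund_payment(context):
    # Simulates a compensation that cannot be completed.
    raise RuntimeError("payment service unreachable")


context = {"log": []}
errors = run_compensations(
    [("inventory_release", release_inventory), ("payment_refund", refund_payment)],
    context,
)
print(errors)          # ['payment_refund: payment service unreachable']
print(context["log"])  # ['inventory released']
```

The refund runs first because compensations are applied in reverse order, and its failure does not prevent the inventory release from running.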

### Exception Recovery Patterns

```python
from minos.saga import (
    Saga, SagaExecutionNotFoundException, SagaRequest,
    SagaRollbackExecutionException, SagaStatus
)

# `repo` is assumed to be the application's configured execution storage.

async def resume_failed_saga(execution_uuid):
    """Attempt to resume or recover a failed saga execution."""
    try:
        # Try to load the execution
        execution = await repo.load(execution_uuid)

        # Check execution status
        if execution.status == SagaStatus.Errored:
            print("Execution is in error state, attempting rollback")
            await execution.rollback()

        elif execution.status == SagaStatus.Paused:
            print("Execution is paused, may be resumable")
            # Could resume with appropriate response

        elif execution.status == SagaStatus.Finished:
            print("Execution already completed")

        return execution

    except SagaExecutionNotFoundException:
        print(f"Execution {execution_uuid} not found in storage")
        return None

    except SagaRollbackExecutionException as e:
        print(f"Rollback failed: {e}")
        # May need manual intervention
        return None

    except Exception as e:
        print(f"Unexpected error during recovery: {e}")
        return None

def create_resilient_saga():
    """Create saga with multiple fallback mechanisms."""
    saga = Saga()

    # Primary processing step
    saga.remote_step() \
        .on_execute(primary_processing) \
        .on_success(handle_primary_success) \
        .on_error(try_fallback_processing) \
        .on_failure(cleanup_primary_attempt)

    return saga.commit()

# Declared async because reading the response content is awaited.
async def try_fallback_processing(context, response):
    """Error handler that attempts fallback processing."""
    try:
        error_data = await response.content()

        # Check if error is recoverable
        if error_data.get("error_code") == "TEMPORARY_UNAVAILABLE":
            # Mark for retry
            context.retry_primary = True
            context.retry_count = context.get("retry_count", 0) + 1

            if context.retry_count < 3:
                # Could trigger retry logic
                return context

        # Try alternative processing
        context.use_fallback = True
        return context

    except Exception as e:
        # Fallback attempt failed
        print(f"Fallback processing failed: {e}")
        return Exception(f"Both primary and fallback processing failed: {e}")

def cleanup_primary_attempt(context):
    """Cleanup after failed primary processing attempt."""
    try:
        cleanup_requests = []

        # Cleanup any partial state
        if hasattr(context, 'temp_resources'):
            cleanup_requests.append(SagaRequest(
                target="resource-service",
                content={"action": "cleanup", "resources": context.temp_resources}
            ))

        # Return first cleanup request (saga will handle sequentially)
        return cleanup_requests[0] if cleanup_requests else None

    except Exception as e:
        print(f"Cleanup failed: {e}")
        # Don't fail the saga rollback due to cleanup issues
        return None
```
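
The bounded-retry decision inside `try_fallback_processing` can be separated from the saga machinery and tested on its own. The sketch below assumes a plain dict context; `decide_recovery` and `MAX_RETRIES` are hypothetical names, with the limit mirroring the `retry_count < 3` check above.

```python
MAX_RETRIES = 3  # Assumption: mirrors the `retry_count < 3` check above.


def decide_recovery(context: dict, error_code: str) -> str:
    """Return "retry" while attempts remain for a temporary error, else "fallback"."""
    if error_code == "TEMPORARY_UNAVAILABLE":
        context["retry_count"] = context.get("retry_count", 0) + 1
        if context["retry_count"] < MAX_RETRIES:
            return "retry"
    # Either the error is permanent or the retry budget is spent.
    context["use_fallback"] = True
    return "fallback"


context = {}
decisions = [decide_recovery(context, "TEMPORARY_UNAVAILABLE") for _ in range(3)]
print(decisions)  # ['retry', 'retry', 'fallback']
```

Keeping the counter in the context means the decision survives across pauses, provided the context is persisted with the execution.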

### Exception Monitoring and Alerting

```python
import logging
from datetime import datetime, timezone

from minos.saga import Saga

# Configure saga exception logging
logging.basicConfig(level=logging.INFO)
saga_logger = logging.getLogger("saga.exceptions")

def log_saga_exception(exception, context=None, execution_uuid=None):
    """Log saga exception with context information."""
    log_data = {
        # Timezone-aware replacement for the deprecated datetime.utcnow()
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "exception_type": type(exception).__name__,
        "exception_message": str(exception),
        "execution_uuid": str(execution_uuid) if execution_uuid else None
    }

    if context:
        log_data["context_keys"] = list(context.keys())
        log_data["context_size"] = len(context)

    saga_logger.error("Saga exception occurred: %s", log_data)

def create_monitored_saga():
    """Create saga with exception monitoring."""
    saga = Saga()

    # monitored_remote_step and monitored_success_handler are application-defined.
    saga.local_step().on_execute(monitored_local_step)
    saga.remote_step() \
        .on_execute(monitored_remote_step) \
        .on_success(monitored_success_handler) \
        .on_error(monitored_error_handler)

    return saga.commit()

def monitored_local_step(context):
    """Local step with exception monitoring."""
    try:
        # Business logic here
        result = process_business_logic(context)
        return result

    except Exception as e:
        log_saga_exception(e, context)

        # Could send alert to monitoring system
        send_alert(f"Local step failed: {e}")

        raise  # Re-raise to trigger saga rollback

# Declared async because reading the response content is awaited.
async def monitored_error_handler(context, response):
    """Error handler with monitoring."""
    try:
        error_data = await response.content()

        # Log the error with full context
        log_saga_exception(
            Exception(f"Remote service error: {error_data}"),
            context,
            context.get("execution_uuid")
        )

        # Handle the error
        return Exception(f"Service error: {error_data.get('message')}")

    except Exception as e:
        log_saga_exception(e, context)
        raise

def send_alert(message):
    """Send alert to monitoring system."""
    # Integration with monitoring/alerting system
    print(f"ALERT: {message}")
```
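
If the logs are consumed by a monitoring system, emitting each record as JSON may be easier to parse than a printed dict. The following sketch shows one possible shape, assuming a plain dict context; `build_log_record` is a hypothetical helper and the field names simply follow `log_saga_exception` above.

```python
import json
from datetime import datetime, timezone


def build_log_record(exception, context=None, execution_uuid=None):
    """Hypothetical helper: serialize exception details as a JSON string."""
    record = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "exception_type": type(exception).__name__,
        "exception_message": str(exception),
        "execution_uuid": str(execution_uuid) if execution_uuid else None,
    }
    if context is not None:
        # Only key names are logged, so context values never reach the log.
        record["context_keys"] = sorted(context.keys())
    return json.dumps(record)


line = build_log_record(ValueError("boom"), {"order_id": 1, "customer_id": 2})
print(line)
```

Logging only the context keys is a deliberate choice in this sketch: it keeps potentially sensitive values out of the log while still showing which data was available when the error occurred.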