0
# Context Management
1
2
A stateful execution context that maintains data across saga steps, with a dictionary-like interface and automatic persistence. The context serves as the primary mechanism for passing data between saga steps and maintaining execution state throughout the distributed transaction lifecycle.
3
4
## Capabilities
5
6
### Saga Context
7
8
The core context class that acts as a stateful container for saga execution data.
9
10
```python { .api }
11
from minos.common import BucketModel
12
from collections.abc import MutableMapping
13
14
class SagaContext(BucketModel, MutableMapping):
    """
    Execution state container with dict-like interface.

    Provides both dictionary-style and attribute-style access to data,
    with automatic field creation and persistence capabilities.
    Extends BucketModel for serialization and MutableMapping for dict operations.
    """

    def __init__(self, **kwargs):
        """
        Initialize context with key-value pairs.

        Args:
            **kwargs: Initial context data as keyword arguments

        Example:
            context = SagaContext(order_id=123, customer="john", amount=99.99)
        """

    def __setitem__(self, key, value):
        """
        Set context value using dictionary syntax.

        Args:
            key (str): Context key
            value: Value to store

        Example:
            context["order_id"] = 123
        """

    def __getitem__(self, key):
        """
        Get context value using dictionary syntax.

        Args:
            key (str): Context key

        Returns:
            Value stored at key

        Raises:
            KeyError: If key not found

        Example:
            order_id = context["order_id"]
        """

    def __delitem__(self, key):
        """
        Delete context value using dictionary syntax.

        Args:
            key (str): Context key to delete

        Raises:
            KeyError: If key not found

        Example:
            del context["temp_data"]
        """

    def __setattr__(self, key, value):
        """
        Set context value using attribute syntax.

        Args:
            key (str): Attribute name
            value: Value to store

        Example:
            context.order_id = 123
        """

    def __delattr__(self, key):
        """
        Delete context value using attribute syntax.

        Args:
            key (str): Attribute name to delete

        Raises:
            AttributeError: If attribute not found

        Example:
            del context.temp_data
        """

    def __contains__(self, key):
        """
        Check if key exists in context.

        Args:
            key (str): Key to check

        Returns:
            bool: True if key exists

        Example:
            if "order_id" in context:
                process_order(context.order_id)
        """

    def __iter__(self):
        """
        Iterate over context keys.

        Returns:
            Iterator over context keys

        Example:
            for key in context:
                print(f"{key}: {context[key]}")
        """

    def __len__(self):
        """
        Get number of items in context.

        Returns:
            int: Number of context items

        Example:
            if len(context) > 0:
                process_context(context)
        """

    def keys(self):
        """
        Get context keys.

        Returns:
            dict_keys: Context keys
        """

    def values(self):
        """
        Get context values.

        Returns:
            dict_values: Context values
        """

    def items(self):
        """
        Get context key-value pairs.

        Returns:
            dict_items: Key-value pairs
        """

    def get(self, key, default=None):
        """
        Get context value with default.

        Args:
            key (str): Context key
            default: Default value if key not found

        Returns:
            Value at key or default

        Example:
            amount = context.get("amount", 0.0)
        """

    def update(self, other=None, **kwargs):
        """
        Update context with another mapping or keyword arguments.

        Args:
            other: Mapping to update from
            **kwargs: Additional key-value pairs

        Example:
            context.update({"status": "processing"}, priority="high")
        """

    def pop(self, key, default=...):
        """
        Remove and return context value.

        Args:
            key (str): Context key
            default: Optional value to return if key not found

        Returns:
            Removed value, or default when the key is missing and a default was given

        Raises:
            KeyError: If key not found and no default was given.
                NOTE(review): this is the standard MutableMapping.pop contract;
                confirm SagaContext does not override it.

        Example:
            temp_value = context.pop("temp_data", None)
        """

    def clear(self):
        """
        Remove all context data.

        Example:
            context.clear()  # Reset context
        """
214
```
215
216
## Usage Examples
217
218
### Basic Context Operations
219
220
```python
221
from minos.saga import SagaContext
222
223
# Initialize context with data
context = SagaContext(
    order_id=12345,
    customer_id="cust_456",
    items=[{"sku": "ITEM1", "quantity": 2}],
    total=99.99,
)

# Dictionary-style access
print(f"Order ID: {context['order_id']}")
context["status"] = "processing"

# Attribute-style access
print(f"Customer: {context.customer_id}")
context.payment_method = "credit_card"

# Check for data
if "discount" in context:
    apply_discount(context.discount)

# Get with default
shipping_cost = context.get("shipping_cost", 0.0)
245
```
246
247
### Context in Saga Steps
248
249
```python
250
from minos.saga import Saga, SagaContext, SagaRequest
251
252
def create_order_processing_saga():
    """Build the order saga: local validation, inventory reservation, then payment.

    Returns:
        Whatever ``Saga.commit()`` returns once the three steps are registered.
    """
    saga = Saga()

    # Step 1: Validate order
    saga.local_step().on_execute(validate_order_data)

    # Step 2: Reserve inventory
    (
        saga.remote_step()
        .on_execute(create_inventory_request)
        .on_success(handle_inventory_success)
        .on_failure(release_reservation)
    )

    # Step 3: Process payment
    (
        saga.remote_step()
        .on_execute(create_payment_request)
        .on_success(handle_payment_success)
        .on_failure(refund_payment)
    )

    return saga.commit()
271
272
# Context flows through all steps
273
def validate_order_data(context):
    """Local step - validate the order total and stamp the context.

    Args:
        context: Saga context; ``total`` is read with ``get`` and
            ``validated_at`` is written with attribute assignment.

    Returns:
        The same context object, with ``validated_at`` set to an ISO-8601 string.

    Raises:
        ValueError: If ``total`` is missing, falsy, or not greater than zero.
    """
    # Imported locally: this snippet's import line only brings in minos.saga names.
    from datetime import datetime

    total = context.get("total")
    if not total or total <= 0:
        raise ValueError("Invalid order total")

    # Add validation timestamp
    context.validated_at = datetime.utcnow().isoformat()
    return context
281
282
def create_inventory_request(context):
    """Remote step - use context to create request.

    Args:
        context: Saga context providing ``items`` and ``order_id``.

    Returns:
        A ``SagaRequest`` targeting ``"inventory-service"`` with those two fields as content.
    """
    return SagaRequest(
        target="inventory-service",
        content={
            "items": context.items,
            "order_id": context.order_id,
        },
    )
291
292
async def handle_inventory_success(context, response):
    """Success handler - update context with response data.

    Args:
        context: Saga context to update.
        response: Saga response whose content contains ``"reservation_id"``.

    Returns:
        The same context, with ``reservation_id`` and ``inventory_status`` set.
    """
    # content() is declared async in minos.saga, so it must be awaited; calling it
    # bare yields a coroutine object that cannot be subscripted.
    reservation_data = await response.content()
    context.reservation_id = reservation_data["reservation_id"]
    context.inventory_status = "reserved"
    return context
298
299
def create_payment_request(context):
    """Payment step - access multiple context fields.

    Args:
        context: Saga context providing ``total``, ``customer_id`` and ``order_id``;
            ``payment_method`` is optional and falls back to ``"default"``.

    Returns:
        A ``SagaRequest`` targeting ``"payment-service"``.
    """
    return SagaRequest(
        target="payment-service",
        content={
            "amount": context.total,
            "customer_id": context.customer_id,
            "order_id": context.order_id,
            "payment_method": context.get("payment_method", "default"),
        },
    )
310
311
async def handle_payment_success(context, response):
    """Update context with payment details.

    Args:
        context: Saga context to update.
        response: Saga response whose content contains ``"payment_id"`` and
            ``"transaction_id"``.

    Returns:
        The same context, with ``payment_id``, ``transaction_id`` and
        ``payment_status`` set.
    """
    # content() is declared async in minos.saga, so it must be awaited; calling it
    # bare yields a coroutine object that cannot be subscripted.
    payment_data = await response.content()
    context.payment_id = payment_data["payment_id"]
    context.transaction_id = payment_data["transaction_id"]
    context.payment_status = "completed"
    return context
318
```
319
320
### Context Persistence and Recovery
321
322
```python
323
import json

from minos.saga import SagaContext, SagaExecution, SagaManager
325
326
# Context is automatically persisted with execution state
327
async def demonstrate_persistence():
    """Run a saga that pauses on disk, reload its execution, then resume it.

    Relies on names supplied by the surrounding application: ``repo``, ``broker``,
    ``long_running_saga`` and ``some_response``.
    """
    # Initial context
    context = SagaContext(
        process_id="proc_123",
        data={"key": "value"},
        step_count=0,
    )

    # Execute saga with pause capability
    manager = SagaManager(storage=repo, broker_pool=broker)
    execution_uuid = await manager.run(
        definition=long_running_saga,
        context=context,
        pause_on_disk=True,  # Context saved to disk
        return_execution=False,
    )

    # Later - reload execution and context
    execution = await repo.load(execution_uuid)
    recovered_context = execution.context

    # Context is fully restored
    print(f"Process ID: {recovered_context.process_id}")
    print(f"Data: {recovered_context.data}")
    print(f"Step count: {recovered_context.step_count}")

    # Continue execution with recovered context
    final_result = await manager.run(
        response=some_response,
        pause_on_disk=True,
    )
358
```
359
360
### Advanced Context Patterns
361
362
```python
363
# Nested data structures
context = SagaContext()
context.customer = {
    "id": "cust_123",
    "name": "John Doe",
    "tier": "premium",
}
context.order = {
    "items": [
        {"sku": "ITEM1", "qty": 2, "price": 25.00},
        {"sku": "ITEM2", "qty": 1, "price": 49.99},
    ],
    "shipping": {
        "address": "123 Main St",
        "method": "express",
    },
}

# Access nested data
customer_tier = context.customer["tier"]
first_item_sku = context.order["items"][0]["sku"]
shipping_method = context.order["shipping"]["method"]
385
386
# Conditional logic based on context
387
def determine_processing_flow(context):
    """Choose a processing flow name from the customer and order data in the context.

    Args:
        context: Object exposing ``customer`` and ``order`` mappings as attributes.

    Returns:
        ``"express_processing"`` when the customer tier is ``"premium"``;
        otherwise ``"standard_processing"`` when the order ``"total"`` (default 0)
        exceeds 100; otherwise ``"basic_processing"``.
    """
    if context.customer.get("tier") == "premium":
        return "express_processing"
    if context.order.get("total", 0) > 100:
        return "standard_processing"
    return "basic_processing"
394
395
# Context-driven step selection
396
def create_dynamic_saga(context):
    """Build a saga whose single remote step depends on the context.

    Args:
        context: Saga context; the chosen flow name is stored on it as
            ``processing_type``.

    Returns:
        Whatever ``Saga.commit()`` returns once the step is registered.
    """
    saga = Saga()

    processing_type = determine_processing_flow(context)
    context.processing_type = processing_type

    if processing_type == "express_processing":
        saga.remote_step().on_execute(express_process_order)
    else:
        saga.remote_step().on_execute(standard_process_order)

    return saga.commit()
408
```
409
410
### Context Validation and Type Safety
411
412
```python
413
from typing import Dict, List, Optional, Any
414
415
def validate_context_schema(context: SagaContext) -> bool:
    """Validate context has required fields with correct types.

    Args:
        context: Mapping-like context, checked with ``in`` and ``[]``.

    Returns:
        True when every required field is present and has an accepted type.

    Raises:
        ValueError: If a required field is missing.
        TypeError: If a field has the wrong type, including a bool.

    NOTE(review): create_validated_saga registers this as a local-step callback,
    while validate_order_data returns the context from its step; confirm the saga
    executor accepts a bool return here.
    """
    required_fields = {
        "order_id": (str, int),
        "customer_id": str,
        "total": (int, float),
        "items": list,
    }

    for field, expected_type in required_fields.items():
        if field not in context:
            raise ValueError(f"Required field '{field}' missing from context")

        value = context[field]
        # bool subclasses int, so isinstance() alone would accept True/False as an
        # order_id or total. No required field is a bool, so reject it explicitly.
        if isinstance(value, bool) or not isinstance(value, expected_type):
            raise TypeError(f"Field '{field}' must be of type {expected_type}")

    return True
433
434
def create_validated_saga():
    """Build a saga that validates the context schema before the business step.

    Returns:
        Whatever ``Saga.commit()`` returns once both steps are registered.
    """
    saga = Saga()

    # Add validation step at the beginning
    saga.local_step().on_execute(validate_context_schema)

    # Continue with business logic
    saga.remote_step().on_execute(process_validated_order)

    return saga.commit()
444
445
# Context factory pattern
446
class OrderContextFactory:
    """Static helpers that build and extend order-related saga contexts."""

    @staticmethod
    def create_order_context(order_data: Dict[str, Any]) -> SagaContext:
        """Create standardized order context.

        Args:
            order_data: Mapping with ``"id"``, ``"customer_id"`` and ``"items"``;
                each item must provide ``"price"`` and ``"quantity"``.

        Returns:
            A new ``SagaContext`` with the order fields, a computed ``total``,
            a ``created_at`` ISO-8601 timestamp and ``status="pending"``.
        """
        # Imported locally: this snippet's import line only brings in typing names.
        from datetime import datetime

        items = order_data["items"]
        return SagaContext(
            order_id=order_data["id"],
            customer_id=order_data["customer_id"],
            items=items,
            total=sum(item["price"] * item["quantity"] for item in items),
            created_at=datetime.utcnow().isoformat(),
            status="pending",
        )

    @staticmethod
    def create_payment_context(payment_data: Dict[str, Any], order_context: SagaContext) -> SagaContext:
        """Extend order context with payment data.

        Args:
            payment_data: Mapping with ``"method"``, ``"token"`` and
                ``"billing_address"``.
            order_context: Context to extend; it is updated in place.

        Returns:
            The same ``order_context`` object that was passed in.
        """
        order_context.update(
            payment_method=payment_data["method"],
            payment_token=payment_data["token"],
            billing_address=payment_data["billing_address"],
        )
        return order_context
468
469
# Usage
order_data = {
    "id": "ord_123",
    "customer_id": "cust_456",
    # Each item needs "price" and "quantity": create_order_context multiplies
    # them to compute the order total.
    "items": [{"sku": "ITEM1", "price": 25.00, "quantity": 2}],
}
context = OrderContextFactory.create_order_context(order_data)
472
```
473
474
### Context Debugging and Monitoring
475
476
```python
477
import json
478
from datetime import datetime
479
480
def log_context_state(context: SagaContext, step_name: str):
    """Log context state for debugging.

    Prints a timestamped header naming the step, one line per key/value pair
    from ``context.items()``, then a blank line.

    Args:
        context: Context to dump; only ``items()`` is used.
        step_name: Label included in the header line.

    Returns:
        None.
    """
    print(f"[{datetime.utcnow()}] Context at {step_name}:")
    for key, value in context.items():
        print(f" {key}: {value}")
    print()
486
487
def create_monitored_saga():
    """Build a saga whose callbacks log the context before running the step logic.

    Returns:
        Whatever ``Saga.commit()`` returns once the steps are registered.
    """

    def logged(step_name, callback):
        """Wrap ``callback`` so the context is logged under ``step_name`` first."""

        def wrapper(ctx, *args, **kwargs):
            # Explicit sequencing: an `or` chain would skip the callback if the
            # logger ever returned a truthy value.
            log_context_state(ctx, step_name)
            return callback(ctx, *args, **kwargs)

        return wrapper

    saga = Saga()

    # Add monitoring to each step
    saga.local_step().on_execute(logged("start", validate_order))

    (
        saga.remote_step()
        .on_execute(logged("payment_request", create_payment_request))
        .on_success(logged("payment_success", handle_payment_success))
    )

    return saga.commit()
498
499
# Context snapshots for debugging
500
class ContextSnapshot:
    """Point-in-time copy of a context, tagged with a step name and timestamp."""

    def __init__(self, context: SagaContext, step_name: str):
        """Record the context's current contents.

        Args:
            context: Mapping-like context to capture.
            step_name: Label identifying where the snapshot was taken.
        """
        # Imported locally: the snippet's import block does not provide copy.
        import copy

        self.step_name = step_name
        self.timestamp = datetime.utcnow().isoformat()
        # Deep copy so that later mutation of nested lists/dicts in the live
        # context cannot change what this snapshot recorded.
        self.data = copy.deepcopy(dict(context))

    def to_json(self) -> str:
        """Return the snapshot as an indented JSON string.

        Raises:
            TypeError: If the captured data holds a value ``json.dumps`` cannot encode.
        """
        return json.dumps(
            {
                "step": self.step_name,
                "timestamp": self.timestamp,
                "data": self.data,
            },
            indent=2,
        )
512
513
# Usage in saga steps
514
def debug_step_with_snapshot(context):
    """Print a JSON snapshot of the context, then record when it was taken.

    Args:
        context: Saga context to snapshot and update.

    Returns:
        The same context, with ``debug_checkpoint`` set to the snapshot timestamp.
    """
    snapshot = ContextSnapshot(context, "debug_checkpoint")
    print(snapshot.to_json())

    # Continue with step logic
    context.debug_checkpoint = snapshot.timestamp
    return context
521
```