or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

context-management.md exception-handling.md execution-engine.md index.md message-system.md saga-definitions.md testing-utilities.md

docs/context-management.md

0

# Context Management

1

2

A stateful execution context that maintains data across saga steps, with a dictionary-like interface and automatic persistence. The context serves as the primary mechanism for passing data between saga steps and for maintaining execution state throughout the distributed transaction lifecycle.

3

4

## Capabilities

5

6

### Saga Context

7

8

The core context class that acts as a stateful container for saga execution data.

9

10

```python { .api }

11

from minos.common import BucketModel

12

from collections.abc import MutableMapping

13

14

class SagaContext(BucketModel, MutableMapping):
    """State container for a saga execution, usable like a dictionary.

    Entries can be read and written with either ``context["key"]`` or
    ``context.key`` syntax; fields are created automatically and the
    context can be persisted. ``BucketModel`` is extended for serialization
    and ``MutableMapping`` for the dict operations.
    """

    def __init__(self, **kwargs):
        """Create a context pre-populated from keyword arguments.

        Args:
            **kwargs: Initial context entries, one per keyword.

        Example:
            context = SagaContext(order_id=123, customer="john", amount=99.99)
        """

    def __setitem__(self, key, value):
        """Store a value under ``key`` with dictionary syntax.

        Args:
            key (str): Context key
            value: Value to store

        Example:
            context["order_id"] = 123
        """

    def __getitem__(self, key):
        """Look up the value stored under ``key`` with dictionary syntax.

        Args:
            key (str): Context key

        Returns:
            Value stored at key

        Raises:
            KeyError: If key not found

        Example:
            order_id = context["order_id"]
        """

    def __delitem__(self, key):
        """Remove the entry stored under ``key`` with dictionary syntax.

        Args:
            key (str): Context key to delete

        Raises:
            KeyError: If key not found

        Example:
            del context["temp_data"]
        """

    def __setattr__(self, key, value):
        """Store a value with attribute syntax.

        Args:
            key (str): Attribute name
            value: Value to store

        Example:
            context.order_id = 123
        """

    def __delattr__(self, key):
        """Remove an entry with attribute syntax.

        Args:
            key (str): Attribute name to delete

        Raises:
            AttributeError: If attribute not found

        Example:
            del context.temp_data
        """

    def __contains__(self, key):
        """Report whether ``key`` is present in the context.

        Args:
            key (str): Key to check

        Returns:
            bool: True if key exists

        Example:
            if "order_id" in context:
                process_order(context.order_id)
        """

    def __iter__(self):
        """Iterate over the context keys.

        Returns:
            Iterator over context keys

        Example:
            for key in context:
                print(f"{key}: {context[key]}")
        """

    def __len__(self):
        """Count the entries held by the context.

        Returns:
            int: Number of context items

        Example:
            if len(context) > 0:
                process_context(context)
        """

    def keys(self):
        """Return the context keys.

        Returns:
            dict_keys: Context keys
        """

    def values(self):
        """Return the context values.

        Returns:
            dict_values: Context values
        """

    def items(self):
        """Return the context key-value pairs.

        Returns:
            dict_items: Key-value pairs
        """

    def get(self, key, default=None):
        """Look up ``key``, falling back to ``default`` when it is absent.

        Args:
            key (str): Context key
            default: Default value if key not found

        Returns:
            Value at key or default

        Example:
            amount = context.get("amount", 0.0)
        """

    def update(self, other=None, **kwargs):
        """Merge another mapping and/or keyword arguments into the context.

        Args:
            other: Mapping to update from
            **kwargs: Additional key-value pairs

        Example:
            context.update({"status": "processing"}, priority="high")
        """

    def pop(self, key, default=None):
        """Remove ``key`` from the context and hand back its value.

        Args:
            key (str): Context key
            default: Default value if key not found

        Returns:
            Removed value or default

        Example:
            temp_value = context.pop("temp_data")
        """

    def clear(self):
        """Remove every entry from the context.

        Example:
            context.clear() # Reset context
        """

214

```

215

216

## Usage Examples

217

218

### Basic Context Operations

219

220

```python

221

from minos.saga import SagaContext

222

223

# Build a context from keyword arguments
context = SagaContext(
    order_id=12345,
    customer_id="cust_456",
    items=[{"sku": "ITEM1", "quantity": 2}],
    total=99.99,
)

# Read and write entries with dictionary syntax
print(f"Order ID: {context['order_id']}")
context["status"] = "processing"

# Read and write entries with attribute syntax
print(f"Customer: {context.customer_id}")
context.payment_method = "credit_card"

# Only use an optional entry after a membership test
# (apply_discount is application code that this snippet does not define)
if "discount" in context:
    apply_discount(context.discount)

# Fall back to a default when the key is absent
shipping_cost = context.get("shipping_cost", 0.0)

245

```

246

247

### Context in Saga Steps

248

249

```python

250

from minos.saga import Saga, SagaContext, SagaRequest

251

252

def create_order_processing_saga():
    """Assemble the three-step order saga and return ``saga.commit()``.

    The steps are registered in this order: a local validation step, a remote
    inventory step and a remote payment step. Each remote step registers
    ``on_execute``, ``on_success`` and ``on_failure`` callbacks.

    ``release_reservation`` and ``refund_payment`` are not defined in this
    snippet and must be supplied by the application.
    """
    saga = Saga()

    # Step 1: validate the order locally
    saga.local_step().on_execute(validate_order_data)

    # Step 2: reserve inventory
    (
        saga.remote_step()
        .on_execute(create_inventory_request)
        .on_success(handle_inventory_success)
        .on_failure(release_reservation)
    )

    # Step 3: process the payment
    (
        saga.remote_step()
        .on_execute(create_payment_request)
        .on_success(handle_payment_success)
        .on_failure(refund_payment)
    )

    return saga.commit()

271

272

# Context flows through all steps

273

def validate_order_data(context):
    """Local step: validate the order total and stamp the validation time.

    Args:
        context: Saga context; must support ``get`` and attribute assignment.

    Returns:
        The same context object, with ``validated_at`` set to the current UTC
        time as an ISO 8601 string.

    Raises:
        ValueError: If ``total`` is missing, falsy, or not greater than zero.
    """
    # Imported locally because the surrounding snippet only imports from
    # minos.saga, so ``datetime`` would otherwise be undefined here.
    from datetime import datetime, timezone

    total = context.get("total")
    if not total or total <= 0:
        raise ValueError("Invalid order total")

    # datetime.utcnow() is deprecated since Python 3.12 and returns a naive
    # datetime; use an explicit, timezone-aware UTC timestamp instead.
    context.validated_at = datetime.now(timezone.utc).isoformat()
    return context

281

282

def create_inventory_request(context):
    """Remote step: build the request sent to the inventory service.

    Args:
        context: Saga context holding the ``items`` and ``order_id`` entries.

    Returns:
        SagaRequest: Request targeting ``"inventory-service"`` whose content
        carries the order's items and id.
    """
    return SagaRequest(
        target="inventory-service",
        content={
            # Key access on purpose: SagaContext also declares an items()
            # method, so under normal attribute lookup ``context.items`` is
            # expected to resolve to that bound method rather than to the
            # stored "items" entry.
            "items": context["items"],
            "order_id": context.order_id,
        },
    )

291

292

def handle_inventory_success(context, response):
    """Success handler: copy the reservation details into the context.

    Args:
        context: Saga context to update.
        response: Response of the inventory step; its ``content()`` result
            must be subscriptable and contain ``"reservation_id"``.

    Returns:
        The same context, with ``reservation_id`` and ``inventory_status`` set.
    """
    # NOTE(review): content() is consumed synchronously here - confirm that it
    # is not a coroutine in the installed minos.saga version.
    reservation = response.content()

    context.reservation_id = reservation["reservation_id"]
    context.inventory_status = "reserved"
    return context

298

299

def create_payment_request(context):
    """Remote step: build the request sent to the payment service.

    Args:
        context: Saga context holding ``total``, ``customer_id`` and
            ``order_id``; ``payment_method`` is optional.

    Returns:
        SagaRequest: Request targeting ``"payment-service"``.
    """
    payment_details = {
        "amount": context.total,
        "customer_id": context.customer_id,
        "order_id": context.order_id,
        # Optional entry: falls back to "default" when no method was stored
        "payment_method": context.get("payment_method", "default"),
    }
    return SagaRequest(target="payment-service", content=payment_details)

310

311

def handle_payment_success(context, response):
    """Success handler: copy the payment details into the context.

    Args:
        context: Saga context to update.
        response: Response of the payment step; its ``content()`` result must
            be subscriptable and contain ``"payment_id"`` and
            ``"transaction_id"``.

    Returns:
        The same context, with ``payment_id``, ``transaction_id`` and
        ``payment_status`` set.
    """
    # NOTE(review): content() is consumed synchronously here - confirm that it
    # is not a coroutine in the installed minos.saga version.
    payment = response.content()

    context.payment_id = payment["payment_id"]
    context.transaction_id = payment["transaction_id"]
    context.payment_status = "completed"
    return context

318

```

319

320

### Context Persistence and Recovery

321

322

```python

323

import json

# SagaContext is constructed below, so it must be imported as well.
from minos.saga import SagaContext, SagaExecution, SagaManager


# Context is automatically persisted with execution state
async def demonstrate_persistence():
    """Run a saga that pauses on disk, reload it, and read back its context.

    ``repo``, ``broker``, ``long_running_saga`` and ``some_response`` are not
    defined in this snippet; they stand for objects supplied by the
    application.
    """
    # Initial context
    context = SagaContext(
        process_id="proc_123",
        data={"key": "value"},
        step_count=0,
    )

    # Execute saga with pause capability
    manager = SagaManager(storage=repo, broker_pool=broker)
    execution_uuid = await manager.run(
        definition=long_running_saga,
        context=context,
        pause_on_disk=True,  # Context saved to disk
        return_execution=False,
    )

    # Later - reload execution and context
    execution = await repo.load(execution_uuid)
    recovered_context = execution.context

    # Context is fully restored
    print(f"Process ID: {recovered_context.process_id}")
    print(f"Data: {recovered_context.data}")
    print(f"Step count: {recovered_context.step_count}")

    # Continue execution with recovered context
    final_result = await manager.run(
        response=some_response,
        pause_on_disk=True,
    )

358

```

359

360

### Advanced Context Patterns

361

362

```python

363

# Start from an empty context and attach nested structures as attributes
context = SagaContext()
context.customer = {
    "id": "cust_123",
    "name": "John Doe",
    "tier": "premium",
}
context.order = {
    "items": [
        {"sku": "ITEM1", "qty": 2, "price": 25.00},
        {"sku": "ITEM2", "qty": 1, "price": 49.99},
    ],
    "shipping": {
        "address": "123 Main St",
        "method": "express",
    },
}

# Drill into the nested values with ordinary dict/list indexing
customer_tier = context.customer["tier"]
first_item_sku = context.order["items"][0]["sku"]
shipping_method = context.order["shipping"]["method"]

385

386

# Conditional logic based on context

387

def determine_processing_flow(context):
    """Pick the processing flow name for an order.

    Args:
        context: Object exposing ``customer`` and ``order`` mappings as
            attributes (for example a SagaContext).

    Returns:
        str: ``"express_processing"`` for premium-tier customers, otherwise
        ``"standard_processing"`` when the order total exceeds 100, otherwise
        ``"basic_processing"``.
    """
    if context.customer.get("tier") == "premium":
        return "express_processing"

    # A missing or explicitly-None total counts as 0, so the comparison below
    # cannot raise TypeError.
    order_total = context.order.get("total") or 0
    if order_total > 100:
        return "standard_processing"

    return "basic_processing"

394

395

# Context-driven step selection

396

def create_dynamic_saga(context):
    """Build a one-step saga whose remote handler depends on the context.

    The flow name chosen by ``determine_processing_flow`` is stored on the
    context as ``processing_type`` before the step is registered.

    Args:
        context: Saga context inspected to choose the flow.

    Returns:
        The result of ``saga.commit()``.
    """
    saga = Saga()

    flow = determine_processing_flow(context)
    context.processing_type = flow

    # Express orders get their own handler; every other flow shares one
    handler = (
        express_process_order
        if flow == "express_processing"
        else standard_process_order
    )
    saga.remote_step().on_execute(handler)

    return saga.commit()

408

```

409

410

### Context Validation and Type Safety

411

412

```python

413

from typing import Dict, List, Optional, Any

414

415

def validate_context_schema(context: SagaContext) -> bool:
    """Validate context has required fields with correct types.

    Args:
        context: Mapping-like context to check.

    Returns:
        True when every required field is present and correctly typed.

    Raises:
        ValueError: If a required field is missing.
        TypeError: If a field's value is not an instance of its expected
            type(s), or is a bool.
    """
    required_fields = {
        "order_id": (str, int),
        "customer_id": str,
        "total": (int, float),
        "items": list,
    }

    for field, expected_type in required_fields.items():
        if field not in context:
            raise ValueError(f"Required field '{field}' missing from context")

        value = context[field]
        # bool is a subclass of int, so isinstance(True, int) is True. None of
        # the required fields is boolean, so reject bools explicitly instead
        # of accepting e.g. total=True as a valid amount.
        if isinstance(value, bool) or not isinstance(value, expected_type):
            raise TypeError(f"Field '{field}' must be of type {expected_type}")

    return True

433

434

def create_validated_saga():
    """Build a saga that validates the context before the business step.

    ``process_validated_order`` is not defined in this snippet and must be
    supplied by the application.

    Returns:
        The result of ``saga.commit()``.
    """
    saga = Saga()

    # Validation runs first, as a local step.
    # NOTE(review): validate_context_schema returns True, whereas the other
    # local-step callbacks in this document return the context - confirm that
    # a non-context return value is acceptable for a local step.
    validation_step = saga.local_step()
    validation_step.on_execute(validate_context_schema)

    # Business logic follows, as a remote step
    processing_step = saga.remote_step()
    processing_step.on_execute(process_validated_order)

    return saga.commit()

444

445

# Context factory pattern

446

class OrderContextFactory:
    """Static builders for the contexts used by order sagas."""

    @staticmethod
    def create_order_context(order_data: Dict[str, Any]) -> SagaContext:
        """Create standardized order context.

        Args:
            order_data: Mapping with ``"id"``, ``"customer_id"`` and
                ``"items"``; every item must provide ``"price"`` and
                ``"quantity"``.

        Returns:
            SagaContext: New context holding the order fields, the computed
            total, a UTC creation timestamp and a ``"pending"`` status.

        Raises:
            KeyError: If a required key is missing from ``order_data`` or
                from one of its items.
        """
        # Imported locally because the surrounding snippet only imports from
        # typing, so ``datetime`` would otherwise be undefined here.
        from datetime import datetime, timezone

        items = order_data["items"]
        return SagaContext(
            order_id=order_data["id"],
            customer_id=order_data["customer_id"],
            items=items,
            total=sum(item["price"] * item["quantity"] for item in items),
            # datetime.utcnow() is deprecated since Python 3.12; use a
            # timezone-aware UTC timestamp instead.
            created_at=datetime.now(timezone.utc).isoformat(),
            status="pending",
        )

    @staticmethod
    def create_payment_context(payment_data: Dict[str, Any], order_context: SagaContext) -> SagaContext:
        """Extend order context with payment data.

        The given context is updated in place and returned; no copy is made.

        Args:
            payment_data: Mapping with ``"method"``, ``"token"`` and
                ``"billing_address"``.
            order_context: Context to extend.

        Returns:
            SagaContext: The same ``order_context`` object.

        Raises:
            KeyError: If a required key is missing from ``payment_data``.
        """
        order_context.update(
            payment_method=payment_data["method"],
            payment_token=payment_data["token"],
            billing_address=payment_data["billing_address"],
        )
        return order_context

468

469

# Usage
order_data = {
    "id": "ord_123",
    "customer_id": "cust_456",
    # Every item needs "price" and "quantity": create_order_context multiplies
    # them to compute the order total.
    "items": [{"sku": "ITEM1", "price": 25.00, "quantity": 2}],
}
context = OrderContextFactory.create_order_context(order_data)

472

```

473

474

### Context Debugging and Monitoring

475

476

```python

477

import copy
import json
from datetime import datetime, timezone

479

480

def log_context_state(context: SagaContext, step_name: str):
    """Log context state for debugging.

    Prints a timestamped header naming the step, then one line per context
    entry, then a blank line.

    Args:
        context: Context whose ``items()`` are printed.
        step_name: Label included in the header line.

    Returns:
        None. Callers that chain this function with ``or`` rely on that.
    """
    # datetime.utcnow() is deprecated since Python 3.12 and returns a naive
    # datetime; print a timezone-aware UTC timestamp instead.
    print(f"[{datetime.now(timezone.utc)}] Context at {step_name}:")
    for key, value in context.items():
        print(f" {key}: {value}")
    print()

486

487

def _with_context_logging(step_name, handler):
    """Wrap a saga callback so the context is logged before the callback runs."""

    def logged_handler(context, *args, **kwargs):
        log_context_state(context, step_name)
        return handler(context, *args, **kwargs)

    return logged_handler


def create_monitored_saga():
    """Build a saga whose callbacks log the context before they run.

    Each callback is wrapped explicitly instead of using the
    ``log_context_state(...) or handler(...)`` idiom, which only calls the
    handler while the logger returns a falsy value.

    ``validate_order`` is not defined in this snippet and must be supplied by
    the application.

    Returns:
        The result of ``saga.commit()``.
    """
    saga = Saga()

    # Add monitoring to each step
    saga.local_step().on_execute(_with_context_logging("start", validate_order))

    (
        saga.remote_step()
        .on_execute(_with_context_logging("payment_request", create_payment_request))
        .on_success(_with_context_logging("payment_success", handle_payment_success))
    )

    return saga.commit()

498

499

# Context snapshots for debugging

500

class ContextSnapshot:
    """Point-in-time copy of a context, labelled with a step name."""

    def __init__(self, context: SagaContext, step_name: str):
        """Capture the current entries of ``context``.

        Args:
            context: Mapping-like context to snapshot.
            step_name: Label stored with the snapshot.
        """
        self.step_name = step_name
        # datetime.utcnow() is deprecated since Python 3.12; store a
        # timezone-aware UTC timestamp instead.
        self.timestamp = datetime.now(timezone.utc).isoformat()
        # Deep copy: dict(context) alone is shallow, so nested lists/dicts
        # would stay shared with the live context and later mutations would
        # silently change the snapshot.
        self.data = copy.deepcopy(dict(context))

    def to_json(self) -> str:
        """Serialize the snapshot as an indented JSON document.

        Returns:
            str: JSON object with ``"step"``, ``"timestamp"`` and ``"data"``.

        Raises:
            TypeError: If the captured data contains a value that ``json``
                cannot serialize.
        """
        return json.dumps(
            {
                "step": self.step_name,
                "timestamp": self.timestamp,
                "data": self.data,
            },
            indent=2,
        )

512

513

# Usage in saga steps

514

def debug_step_with_snapshot(context):
    """Saga step that prints a JSON snapshot of the context.

    Args:
        context: Saga context to snapshot.

    Returns:
        The same context, with ``debug_checkpoint`` set to the snapshot's
        timestamp.
    """
    checkpoint = ContextSnapshot(context, "debug_checkpoint")
    print(checkpoint.to_json())

    # Record when the snapshot was taken, then carry on with the step
    context.debug_checkpoint = checkpoint.timestamp
    return context

521

```