or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

context-management.md, exception-handling.md, execution-engine.md, index.md, message-system.md, saga-definitions.md, testing-utilities.md

docs/execution-engine.md

0

# Execution Engine

1

2

The execution engine is the runtime layer that provides saga orchestration, state management, pause/resume capabilities, and lifecycle control. This module executes saga definitions, with support for distributed coordination, error recovery, and transaction management.

3

4

## Capabilities

5

6

### Saga Execution Management

7

8

`SagaExecution` is the core execution class; it represents a running instance of a saga definition.

9

10

```python { .api }

11

class SagaExecution:

12

"""

13

Runtime execution instance of a saga definition.

14

15

Attributes:

16

uuid (UUID): Unique execution identifier

17

definition (Saga): Saga definition being executed

18

executed_steps (list[SagaStepExecution]): Steps that have been executed

19

context (SagaContext): Current execution context

20

status (SagaStatus): Current execution status

21

paused_step (SagaStepExecution): Currently paused step (if any)

22

already_rollback (bool): Whether rollback has been performed

23

user (Optional[UUID]): User identifier for remote steps

24

"""

25

def __init__(self, definition, uuid, context, status=SagaStatus.Created, steps=None, paused_step=None, already_rollback=False, user=None, *args, **kwargs):

26

"""

27

Initialize execution instance with definition and context.

28

29

Args:

30

definition (Saga): Saga definition to execute

31

uuid (UUID): Unique execution identifier

32

context (SagaContext): Initial execution context

33

status (SagaStatus): Initial execution status

34

steps (Optional[list[SagaStepExecution]]): Pre-existing step executions

35

paused_step (Optional[SagaStepExecution]): Currently paused step

36

already_rollback (bool): Whether rollback has been performed

37

user (Optional[UUID]): User identifier for remote steps

38

"""

39

40

@classmethod

41

def from_definition(cls, definition, context=None, uuid=None, *args, **kwargs):

42

"""

43

Create execution from saga definition.

44

45

Args:

46

definition (Saga): Committed saga definition

47

context (Optional[SagaContext]): Initial context

48

uuid (Optional[UUID]): Execution identifier

49

50

Returns:

51

SagaExecution: New execution instance

52

53

Raises:

54

SagaNotCommittedException: If saga not committed

55

"""

56

57

@classmethod

58

def from_raw(cls, raw, **kwargs):

59

"""

60

Build execution from raw representation.

61

62

Args:

63

raw (dict): Raw execution data

64

65

Returns:

66

SagaExecution: Reconstructed execution instance

67

"""

68

69

def execute(self, response=None, autocommit=True, **kwargs):

70

"""

71

Execute the saga steps.

72

73

Args:

74

response (Optional[SagaResponse]): Response for continuing paused step

75

autocommit (bool): Whether to automatically commit/reject transactions

76

77

Returns:

78

SagaContext: Final execution context

79

80

Raises:

81

SagaExecutionAlreadyExecutedException: If execution already finished

82

SagaFailedExecutionException: If execution fails

83

SagaPausedExecutionStepException: If step requires pause

84

"""

85

86

def rollback(self, autoreject=True, **kwargs):

87

"""

88

Perform compensatory rollback of executed steps.

89

90

Args:

91

autoreject (bool): Whether to automatically reject transactions

92

93

Raises:

94

SagaRollbackExecutionException: If rollback fails

95

"""

96

97

def commit(self, **kwargs):

98

"""

99

Commit execution transactions.

100

101

Raises:

102

SagaFailedCommitCallbackException: If commit callback fails

103

"""

104

105

def reject(self, **kwargs):

106

"""Reject execution transactions."""

107

```

108

109

### Saga Orchestration Manager

110

111

`SagaManager` is the main orchestrator for managing the saga execution lifecycle.

112

113

```python { .api }

114

class SagaManager:

115

"""

116

Main orchestrator for saga execution lifecycle.

117

118

Attributes:

119

storage (SagaExecutionRepository): Persistence repository

120

broker_pool (BrokerClientPool): Message broker connection pool

121

"""

122

def __init__(self, storage, broker_pool=None, pool_factory=None, **kwargs):

123

"""

124

Initialize manager with storage and broker pool.

125

126

Args:

127

storage (SagaExecutionRepository): Repository for saga persistence

128

broker_pool (Optional[BrokerClientPool]): Message broker pool

129

pool_factory: Factory for creating broker pools

130

"""

131

132

def run(self, definition=None, context=None, response=None, user=None, autocommit=True, pause_on_disk=False, raise_on_error=True, return_execution=True, **kwargs):

133

"""

134

Execute saga with comprehensive lifecycle management.

135

136

Args:

137

definition (Optional[Saga]): Saga definition to execute

138

context (Optional[SagaContext]): Initial execution context

139

response (Optional[SagaResponse]): Response for continuing execution

140

user (Optional[UUID]): User identifier for remote steps

141

autocommit (bool): Automatically commit/reject transactions

142

pause_on_disk (bool): Pause remote steps on disk vs memory

143

raise_on_error (bool): Raise exceptions on execution errors

144

return_execution (bool): Return SagaExecution vs UUID

145

146

Returns:

147

Union[SagaExecution, UUID, SagaContext]: Execution result based on options

148

149

Raises:

150

SagaFailedExecutionException: If execution fails and raise_on_error=True

151

"""

152

153

@classmethod

154

def _from_config(cls, config, **kwargs):

155

"""

156

Build manager from configuration.

157

158

Args:

159

config: Configuration object

160

161

Returns:

162

SagaManager: Configured manager instance

163

"""

164

```

165

166

### Execution Status Management

167

168

`SagaStatus` and `SagaStepStatus` are enums that define the possible states of a saga execution and of an individual step execution, respectively.

169

170

```python { .api }

171

from enum import Enum

172

173

class SagaStatus(Enum):

174

"""

175

Saga execution status states.

176

177

Values:

178

Created: Initial state before execution

179

Running: Currently executing steps

180

Paused: Execution paused waiting for response

181

Finished: Successfully completed all steps

182

Errored: Execution failed with error

183

"""

184

Created = "created"

185

Running = "running"

186

Paused = "paused"

187

Finished = "finished"

188

Errored = "errored"

189

190

class SagaStepStatus(Enum):

191

"""

192

Individual step execution status states.

193

194

Values:

195

Created: Step created but not started

196

RunningOnExecute: Executing main operation

197

FinishedOnExecute: Main operation completed

198

ErroredOnExecute: Main operation failed

199

PausedByOnExecute: Paused by main operation

200

ErroredByOnExecute: Error in main operation

201

RunningOnFailure: Executing failure compensation

202

PausedOnFailure: Paused during failure handling

203

ErroredOnFailure: Failure compensation failed

204

RunningOnSuccess: Processing successful response

205

ErroredOnSuccess: Success handler failed

206

RunningOnError: Processing error response

207

ErroredOnError: Error handler failed

208

Finished: Step completed successfully

209

"""

210

Created = "created"

211

RunningOnExecute = "running-on-execute"

212

FinishedOnExecute = "finished-on-execute"

213

ErroredOnExecute = "errored-on-execute"

214

PausedByOnExecute = "paused-by-on-execute"

215

ErroredByOnExecute = "errored-by-on-execute"

216

RunningOnFailure = "running-on-failure"

217

PausedOnFailure = "paused-on-failure"

218

ErroredOnFailure = "errored-on-failure"

219

RunningOnSuccess = "running-on-success"

220

ErroredOnSuccess = "errored-on-success"

221

RunningOnError = "running-on-error"

222

ErroredOnError = "errored-on-error"

223

Finished = "finished"

224

```

225

226

### Step Execution Classes

227

228

Runtime execution instances for different step types.

229

230

```python { .api }

231

from abc import ABC, abstractmethod

232

233

class SagaStepExecution(ABC):

234

"""

235

Base class for step execution instances.

236

237

Attributes:

238

uuid (UUID): Step execution identifier

239

definition (SagaStep): Step definition being executed

240

status (SagaStepStatus): Current step status

241

"""

242

def __init__(self, definition, uuid, status=SagaStepStatus.Created, **kwargs):

243

"""Initialize step execution with definition."""

244

245

class LocalSagaStepExecution(SagaStepExecution):

246

"""

247

Execution instance for local steps.

248

249

Handles local function execution within the same service process.

250

"""

251

def __init__(self, definition, uuid, status=SagaStepStatus.Created, **kwargs):

252

"""Initialize local step execution."""

253

254

class RemoteSagaStepExecution(SagaStepExecution):

255

"""

256

Execution instance for remote steps.

257

258

Handles remote service calls with request/response coordination.

259

"""

260

def __init__(self, definition, uuid, status=SagaStepStatus.Created, **kwargs):

261

"""Initialize remote step execution."""

262

263

class ConditionalSagaStepExecution(SagaStepExecution):

264

"""

265

Execution instance for conditional steps.

266

267

Handles conditional logic evaluation and nested saga execution.

268

"""

269

def __init__(self, definition, uuid, status=SagaStepStatus.Created, **kwargs):

270

"""Initialize conditional step execution."""

271

```

272

273

### Execution Persistence

274

275

Repository system for durable saga execution storage and recovery.

276

277

```python { .api }

278

from abc import ABC, abstractmethod

279

280

class SagaExecutionRepository(ABC):

281

"""

282

Base class for saga execution persistence.

283

284

Provides durable storage for saga execution state enabling

285

pause/resume and recovery capabilities.

286

"""

287

@abstractmethod

288

def store(self, execution):

289

"""

290

Store saga execution to persistent storage.

291

292

Args:

293

execution (SagaExecution): Execution to store

294

"""

295

296

@abstractmethod

297

def load(self, uuid):

298

"""

299

Load saga execution from persistent storage.

300

301

Args:

302

uuid (Union[UUID, str]): Execution identifier

303

304

Returns:

305

SagaExecution: Loaded execution instance

306

307

Raises:

308

SagaExecutionNotFoundException: If execution not found

309

"""

310

311

@abstractmethod

312

def delete(self, uuid):

313

"""

314

Delete saga execution from persistent storage.

315

316

Args:

317

uuid (Union[UUID, str]): Execution identifier

318

"""

319

320

class DatabaseSagaExecutionRepository(SagaExecutionRepository):

321

"""Database implementation of saga execution repository."""

322

323

class SagaExecutionDatabaseOperationFactory:

324

"""Factory for database operations on saga executions."""

325

```

326

327

### Transaction Management

328

329

Two-phase commit protocol implementation for distributed transaction coordination.

330

331

```python { .api }

332

class TransactionCommitter:

333

"""

334

Manages two-phase commit protocol for saga transactions.

335

336

Coordinates distributed transaction commits across multiple

337

services participating in the saga execution.

338

339

Attributes:

340

execution_uuid (UUID): Execution identifier

341

executed_steps (list[SagaStepExecution]): Steps that participated

342

transactions (list[tuple[UUID, str]]): Transaction UUID and service pairs

343

"""

344

def __init__(self, execution_uuid, executed_steps, broker_publisher, broker_pool=None, **kwargs):

345

"""

346

Initialize committer with execution details.

347

348

Args:

349

execution_uuid (UUID): Saga execution identifier

350

executed_steps (list[SagaStepExecution]): Executed steps

351

broker_publisher: Message broker publisher

352

broker_pool: Optional broker connection pool

353

"""

354

355

def commit(self, **kwargs):

356

"""

357

Commit all transactions using two-phase commit protocol.

358

359

Sends commit messages to all participating services and

360

waits for confirmation of successful commitment.

361

362

Raises:

363

SagaFailedCommitCallbackException: If any service fails to commit

364

"""

365

366

def reject(self):

367

"""

368

Reject all transactions.

369

370

Sends reject messages to all participating services to

371

rollback their local transaction state.

372

"""

373

```

374

375

### Execution Coordinators

376

377

Specialized executors for different types of saga operations.

378

379

```python { .api }

380

class Executor:

381

"""

382

Base executor for saga operations.

383

384

Attributes:

385

execution_uuid (UUID): Execution identifier for transaction context

386

"""

387

def __init__(self, execution_uuid, *args, **kwargs):

388

"""Initialize executor with execution context."""

389

390

def exec(self, operation, *args, **kwargs):

391

"""Execute saga operation within transaction context."""

392

393

def exec_function(self, func, *args, **kwargs):

394

"""Execute function within transaction context."""

395

396

class LocalExecutor(Executor):

397

"""Executor for local operations within the same service."""

398

399

class RequestExecutor(Executor):

400

"""Executor for remote request operations to other services."""

401

402

class ResponseExecutor(Executor):

403

"""Executor for remote response processing from other services."""

404

```

405

406

## Usage Examples

407

408

### Basic Saga Execution

409

410

```python

411

from minos.saga import SagaManager, SagaExecution, SagaContext

412

from minos.saga.executions.repositories import DatabaseSagaExecutionRepository

413

414

# Initialize saga manager

415

storage = DatabaseSagaExecutionRepository(...)

416

manager = SagaManager(storage=storage, broker_pool=broker_pool)

417

418

# Execute saga with automatic lifecycle management

419

async def execute_order_saga(order_data):

420

saga_definition = create_order_saga()

421

context = SagaContext(order=order_data)

422

423

# Run with automatic commit and error handling

424

result = await manager.run(

425

definition=saga_definition,

426

context=context,

427

autocommit=True,

428

raise_on_error=True

429

)

430

431

return result

432

```

433

434

### Manual Execution Control

435

436

```python

437

from minos.saga import SagaExecution, SagaStatus

438

439

# Create execution manually for fine-grained control

440

execution = SagaExecution.from_definition(

441

definition=saga_definition,

442

context=SagaContext(data="initial"),

443

uuid=uuid4()

444

)

445

446

# Execute with manual transaction control

447

try:

448

context = await execution.execute(autocommit=False)

449

450

# Manual commit after validation

451

if validate_results(context):

452

await execution.commit()

453

else:

454

await execution.reject()

455

456

except Exception as e:

457

# Manual rollback on failure

458

await execution.rollback()

459

raise

460

```

461

462

### Pause and Resume Execution

463

464

```python

465

# Execute with pause-on-disk for background processing

466

execution_uuid = await manager.run(

467

definition=long_running_saga,

468

context=initial_context,

469

pause_on_disk=True, # Pause remote steps on disk

470

return_execution=False # Return UUID instead of execution

471

)

472

473

# Later, resume with response

474

response = SagaResponse(content={"result": "processed"})

475

final_context = await manager.run(

476

response=response,

477

pause_on_disk=True

478

)

479

```

480

481

### Status Monitoring

482

483

```python

484

from minos.saga import SagaStatus, SagaStepStatus

485

486

# Check execution status

487

execution = await storage.load(execution_uuid)

488

489

if execution.status == SagaStatus.Paused:

490

print(f"Execution paused at step: {execution.paused_step.uuid}")

491

print(f"Step status: {execution.paused_step.status}")

492

493

elif execution.status == SagaStatus.Finished:

494

print("Saga completed successfully")

495

print(f"Final context: {execution.context}")

496

497

elif execution.status == SagaStatus.Errored:

498

print("Saga execution failed")

499

# Trigger rollback if needed

500

await execution.rollback()

501

```

502

503

### Custom Repository Implementation

504

505

```python

506

from minos.saga.executions.repositories import SagaExecutionRepository

507

508

class CustomSagaRepository(SagaExecutionRepository):

509

def __init__(self, storage_backend):

510

self.storage = storage_backend

511

512

async def store(self, execution):

513

# Custom storage logic

514

await self.storage.save(execution.uuid, execution.raw)

515

516

async def load(self, uuid):

517

# Custom loading logic

518

raw_data = await self.storage.get(uuid)

519

if not raw_data:

520

raise SagaExecutionNotFoundException(f"Execution {uuid} not found")

521

return SagaExecution.from_raw(raw_data)

522

523

async def delete(self, uuid):

524

# Custom deletion logic

525

await self.storage.remove(uuid)

526

527

# Use custom repository

528

custom_repo = CustomSagaRepository(my_storage)

529

manager = SagaManager(storage=custom_repo)

530

```