or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

context-management.mdexception-handling.mdexecution-engine.mdindex.mdmessage-system.mdsaga-definitions.mdtesting-utilities.md

testing-utilities.mddocs/

0

# Testing Utilities

1

2

Testing utilities and mocked implementations for saga development and validation including repository test cases and operation factories. This module provides comprehensive testing infrastructure to validate saga behavior, execution flows, and integration with storage and messaging systems.

3

4

## Capabilities

5

6

### Repository Testing Base Class

7

8

Abstract test case providing standardized tests for saga execution repository implementations.

9

10

```python { .api }

11

from minos.common.testing import MinosTestCase

12

from abc import abstractmethod

13

14

class SagaExecutionRepositoryTestCase(MinosTestCase):

15

"""

16

Base test case for saga execution repository implementations.

17

18

Provides standard test methods to validate repository implementations

19

conform to the expected interface and behavior patterns.

20

"""

21

22

@abstractmethod

23

def build_saga_execution_repository(self):

24

"""

25

Abstract method to build repository instance.

26

27

Subclasses must implement this method to return a configured

28

instance of their repository implementation for testing.

29

30

Returns:

31

SagaExecutionRepository: Repository instance to test

32

"""

33

34

def test_store(self):

35

"""

36

Test storing saga executions.

37

38

Validates that the repository can successfully store saga execution

39

instances and that they can be retrieved with correct data.

40

"""

41

42

def test_load_from_str(self):

43

"""

44

Test loading executions from string UUID.

45

46

Validates that executions can be loaded using string representation

47

of UUIDs as well as UUID objects.

48

"""

49

50

def test_delete(self):

51

"""

52

Test deleting saga executions.

53

54

Validates that executions can be deleted from the repository

55

and that subsequent load attempts properly raise exceptions.

56

"""

57

58

def test_load_raises(self):

59

"""

60

Test loading non-existent executions raises appropriate exceptions.

61

62

Validates that attempting to load non-existent executions

63

raises SagaExecutionNotFoundException.

64

"""

65

66

def test_store_update(self):

67

"""

68

Test updating existing stored executions.

69

70

Validates that storing an execution with the same UUID

71

updates the existing record rather than creating duplicates.

72

"""

73

```

74

75

### Mocked Operation Factory

76

77

Mocked implementation of database operation factory for testing purposes.

78

79

```python { .api }

80

class MockedSagaExecutionDatabaseOperationFactory(SagaExecutionDatabaseOperationFactory):

81

"""

82

Mocked factory for testing purposes.

83

84

Provides a test-friendly implementation of the database operation factory

85

that can be used in unit tests without requiring actual database connections.

86

"""

87

88

def __init__(self, **kwargs):

89

"""Initialize mocked factory with test configuration."""

90

91

def build_create_operation(self, execution):

92

"""Build mocked create operation for testing."""

93

94

def build_update_operation(self, execution):

95

"""Build mocked update operation for testing."""

96

97

def build_delete_operation(self, uuid):

98

"""Build mocked delete operation for testing."""

99

100

def build_select_operation(self, uuid):

101

"""Build mocked select operation for testing."""

102

```

103

104

## Usage Examples

105

106

### Creating Repository Test Cases

107

108

```python

109

import unittest

110

from minos.saga.testing import SagaExecutionRepositoryTestCase

111

from minos.saga import SagaExecution, Saga, SagaContext, SagaStatus

112

from uuid import uuid4

113

114

class MyRepositoryTestCase(SagaExecutionRepositoryTestCase):

115

"""Test case for custom repository implementation."""

116

117

def build_saga_execution_repository(self):

118

"""Build repository instance for testing."""

119

# Return your repository implementation

120

return MyCustomSagaRepository(

121

connection_string="test://localhost",

122

schema="test_schema"

123

)

124

125

def test_custom_repository_features(self):

126

"""Test custom features specific to your repository."""

127

repo = self.build_saga_execution_repository()

128

129

# Create test execution

130

saga_def = Saga()

131

saga_def.local_step().on_execute(lambda ctx: ctx)

132

committed_saga = saga_def.commit()

133

134

execution = SagaExecution.from_definition(

135

definition=committed_saga,

136

context=SagaContext(test_data="value"),

137

uuid=uuid4()

138

)

139

140

# Test storing

141

await repo.store(execution)

142

143

# Test loading

144

loaded = await repo.load(execution.uuid)

145

self.assertEqual(loaded.uuid, execution.uuid)

146

self.assertEqual(loaded.context.test_data, "value")

147

148

# Test your custom features

149

self.assertTrue(repo.has_custom_feature())

150

151

def test_repository_error_handling(self):

152

"""Test repository error handling."""

153

repo = self.build_saga_execution_repository()

154

155

# Test loading non-existent execution

156

with self.assertRaises(SagaExecutionNotFoundException):

157

await repo.load(uuid4())

158

159

# Test deleting non-existent execution

160

# Should not raise exception

161

await repo.delete(uuid4())

162

163

# Run the tests

164

if __name__ == '__main__':

165

unittest.main()

166

```

167

168

### Testing Saga Definitions

169

170

```python

171

import unittest

172

from minos.saga import (

173

Saga, SagaContext, LocalSagaStep, RemoteSagaStep,

174

EmptySagaException, UndefinedOnExecuteException

175

)

176

177

class SagaDefinitionTestCase(unittest.TestCase):

178

"""Test cases for saga definition validation."""

179

180

def test_empty_saga_raises_exception(self):

181

"""Test that empty saga cannot be committed."""

182

saga = Saga()

183

184

with self.assertRaises(EmptySagaException):

185

saga.commit()

186

187

def test_step_without_execute_raises_exception(self):

188

"""Test that step without on_execute cannot be validated."""

189

saga = Saga()

190

step = saga.local_step()

191

# Don't set on_execute

192

193

with self.assertRaises(UndefinedOnExecuteException):

194

saga.commit()

195

196

def test_valid_saga_commits_successfully(self):

197

"""Test that valid saga commits without errors."""

198

saga = Saga()

199

saga.local_step().on_execute(lambda ctx: ctx)

200

201

committed_saga = saga.commit()

202

self.assertTrue(committed_saga.committed)

203

self.assertEqual(len(committed_saga.steps), 1)

204

205

def test_complex_saga_structure(self):

206

"""Test complex saga with multiple step types."""

207

saga = Saga()

208

209

# Local step

210

local_step = saga.local_step()

211

local_step.on_execute(lambda ctx: ctx)

212

local_step.on_failure(lambda ctx: ctx)

213

214

# Remote step

215

remote_step = saga.remote_step()

216

remote_step.on_execute(lambda ctx: None)

217

remote_step.on_success(lambda ctx, resp: ctx)

218

remote_step.on_error(lambda ctx, resp: ctx)

219

remote_step.on_failure(lambda ctx: None)

220

221

# Conditional step

222

conditional = saga.conditional_step()

223

inner_saga = Saga()

224

inner_saga.local_step().on_execute(lambda ctx: ctx)

225

conditional.if_then(lambda ctx: True, inner_saga.commit())

226

227

committed_saga = saga.commit()

228

self.assertEqual(len(committed_saga.steps), 3)

229

self.assertIsInstance(committed_saga.steps[0], LocalSagaStep)

230

self.assertIsInstance(committed_saga.steps[1], RemoteSagaStep)

231

self.assertIsInstance(committed_saga.steps[2], ConditionalSagaStep)

232

```

233

234

### Testing Saga Execution

235

236

```python

237

import unittest

238

from unittest.mock import AsyncMock, MagicMock

239

from minos.saga import (

240

SagaExecution, SagaManager, SagaContext, SagaRequest, SagaResponse,

241

SagaStatus, SagaResponseStatus

242

)

243

244

class SagaExecutionTestCase(unittest.TestCase):

245

"""Test cases for saga execution behavior."""

246

247

def setUp(self):

248

"""Set up test fixtures."""

249

self.mock_repo = AsyncMock()

250

self.mock_broker = AsyncMock()

251

self.manager = SagaManager(

252

storage=self.mock_repo,

253

broker_pool=self.mock_broker

254

)

255

256

async def test_successful_execution(self):

257

"""Test successful saga execution flow."""

258

# Create test saga

259

saga = Saga()

260

saga.local_step().on_execute(self.success_callback)

261

committed_saga = saga.commit()

262

263

# Execute saga

264

context = SagaContext(test_value=42)

265

result = await self.manager.run(

266

definition=committed_saga,

267

context=context

268

)

269

270

# Verify results

271

self.assertEqual(result.test_value, 42)

272

self.assertEqual(result.processed, True)

273

274

async def test_execution_with_failure(self):

275

"""Test saga execution with step failure."""

276

# Create saga that will fail

277

saga = Saga()

278

saga.local_step().on_execute(self.failing_callback)

279

committed_saga = saga.commit()

280

281

# Execute saga

282

context = SagaContext(test_value=42)

283

284

with self.assertRaises(SagaFailedExecutionException):

285

await self.manager.run(

286

definition=committed_saga,

287

context=context,

288

raise_on_error=True

289

)

290

291

async def test_remote_step_execution(self):

292

"""Test remote step execution with mocked responses."""

293

# Create saga with remote step

294

saga = Saga()

295

saga.remote_step() \

296

.on_execute(self.create_request) \

297

.on_success(self.handle_success)

298

committed_saga = saga.commit()

299

300

# Mock broker response

301

mock_response = SagaResponse(

302

content={"result": "processed"},

303

status=SagaResponseStatus.SUCCESS

304

)

305

306

# Execute saga

307

context = SagaContext(order_id=123)

308

result = await self.manager.run(

309

definition=committed_saga,

310

context=context

311

)

312

313

# Verify remote call was made

314

self.mock_broker.send.assert_called_once()

315

self.assertEqual(result.result, "processed")

316

317

def success_callback(self, context):

318

"""Test callback that succeeds."""

319

context.processed = True

320

return context

321

322

def failing_callback(self, context):

323

"""Test callback that fails."""

324

raise ValueError("Test failure")

325

326

def create_request(self, context):

327

"""Test callback that creates request."""

328

return SagaRequest(

329

target="test-service",

330

content={"order_id": context.order_id}

331

)

332

333

def handle_success(self, context, response):

334

"""Test callback that handles successful response."""

335

data = response.content()

336

context.update(data)

337

return context

338

```

339

340

### Testing with Mocked Components

341

342

```python

343

import unittest

344

from unittest.mock import Mock, AsyncMock, patch

345

from minos.saga.testing import MockedSagaExecutionDatabaseOperationFactory

346

347

class MockedComponentTestCase(unittest.TestCase):

348

"""Test cases using mocked components."""

349

350

def setUp(self):

351

"""Set up mocked components."""

352

self.mock_factory = MockedSagaExecutionDatabaseOperationFactory()

353

self.mock_storage = Mock()

354

self.mock_broker = AsyncMock()

355

356

def test_mocked_database_operations(self):

357

"""Test using mocked database operation factory."""

358

execution = self.create_test_execution()

359

360

# Test create operation

361

create_op = self.mock_factory.build_create_operation(execution)

362

self.assertIsNotNone(create_op)

363

364

# Test select operation

365

select_op = self.mock_factory.build_select_operation(execution.uuid)

366

self.assertIsNotNone(select_op)

367

368

# Test update operation

369

update_op = self.mock_factory.build_update_operation(execution)

370

self.assertIsNotNone(update_op)

371

372

# Test delete operation

373

delete_op = self.mock_factory.build_delete_operation(execution.uuid)

374

self.assertIsNotNone(delete_op)

375

376

@patch('minos.saga.SagaManager')

377

async def test_mocked_manager(self, mock_manager_class):

378

"""Test using mocked saga manager."""

379

mock_manager = mock_manager_class.return_value

380

mock_manager.run.return_value = SagaContext(result="mocked")

381

382

# Use mocked manager

383

result = await mock_manager.run(

384

definition=self.create_test_saga(),

385

context=SagaContext(input="test")

386

)

387

388

self.assertEqual(result.result, "mocked")

389

mock_manager.run.assert_called_once()

390

391

def create_test_execution(self):

392

"""Create test execution for mocking."""

393

saga = Saga()

394

saga.local_step().on_execute(lambda ctx: ctx)

395

committed_saga = saga.commit()

396

397

return SagaExecution.from_definition(

398

definition=committed_saga,

399

context=SagaContext(test="data")

400

)

401

402

def create_test_saga(self):

403

"""Create test saga definition."""

404

saga = Saga()

405

saga.local_step().on_execute(lambda ctx: ctx)

406

return saga.commit()

407

```

408

409

### Integration Testing

410

411

```python

412

import unittest

413

import asyncio

414

from minos.saga import SagaManager, Saga, SagaContext, SagaRequest, SagaResponse

415

416

class SagaIntegrationTestCase(unittest.TestCase):

417

"""Integration tests for complete saga workflows."""

418

419

def setUp(self):

420

"""Set up integration test environment."""

421

# Set up real or test implementations

422

self.setup_test_database()

423

self.setup_test_broker()

424

425

self.manager = SagaManager(

426

storage=self.test_repo,

427

broker_pool=self.test_broker

428

)

429

430

def setup_test_database(self):

431

"""Set up test database for integration testing."""

432

# Configure test database

433

self.test_repo = DatabaseSagaExecutionRepository(

434

connection="sqlite:///:memory:",

435

create_tables=True

436

)

437

438

def setup_test_broker(self):

439

"""Set up test message broker."""

440

# Configure test broker

441

self.test_broker = TestBrokerPool()

442

443

async def test_end_to_end_order_processing(self):

444

"""Test complete order processing saga."""

445

# Create realistic saga

446

saga = self.create_order_processing_saga()

447

448

# Create order context

449

context = SagaContext(

450

order_id="order_123",

451

customer_id="customer_456",

452

items=[

453

{"sku": "ITEM1", "quantity": 2, "price": 25.00},

454

{"sku": "ITEM2", "quantity": 1, "price": 50.00}

455

],

456

total=100.00,

457

payment_method="credit_card"

458

)

459

460

# Execute saga

461

result = await self.manager.run(

462

definition=saga,

463

context=context,

464

autocommit=True

465

)

466

467

# Verify final state

468

self.assertEqual(result.order_status, "completed")

469

self.assertTrue(result.payment_processed)

470

self.assertTrue(result.inventory_reserved)

471

self.assertIsNotNone(result.confirmation_id)

472

473

async def test_saga_failure_and_compensation(self):

474

"""Test saga failure triggers proper compensation."""

475

# Create saga that will fail at payment step

476

saga = self.create_failing_payment_saga()

477

478

context = SagaContext(

479

order_id="order_456",

480

customer_id="customer_789",

481

items=[{"sku": "ITEM1", "quantity": 1}],

482

total=50.00

483

)

484

485

# Execute saga (should fail)

486

with self.assertRaises(SagaFailedExecutionException):

487

await self.manager.run(

488

definition=saga,

489

context=context,

490

raise_on_error=True

491

)

492

493

# Verify compensation was executed

494

self.verify_inventory_released()

495

self.verify_no_payment_charged()

496

497

def create_order_processing_saga(self):

498

"""Create realistic order processing saga."""

499

saga = Saga()

500

501

# Step 1: Validate order

502

saga.local_step() \

503

.on_execute(self.validate_order) \

504

.on_failure(self.log_validation_failure)

505

506

# Step 2: Reserve inventory

507

saga.remote_step() \

508

.on_execute(self.reserve_inventory) \

509

.on_success(self.handle_inventory_reserved) \

510

.on_error(self.handle_inventory_error) \

511

.on_failure(self.release_inventory)

512

513

# Step 3: Process payment

514

saga.remote_step() \

515

.on_execute(self.process_payment) \

516

.on_success(self.handle_payment_success) \

517

.on_error(self.handle_payment_error) \

518

.on_failure(self.refund_payment)

519

520

# Step 4: Send confirmation

521

saga.local_step().on_execute(self.send_confirmation)

522

523

return saga.commit()

524

525

# Implementation of saga callbacks for testing

526

def validate_order(self, context):

527

context.order_validated = True

528

return context

529

530

def reserve_inventory(self, context):

531

return SagaRequest(

532

target="inventory-service",

533

content={"items": context.items, "order_id": context.order_id}

534

)

535

536

def handle_inventory_reserved(self, context, response):

537

data = response.content()

538

context.inventory_reserved = True

539

context.reservation_id = data["reservation_id"]

540

return context

541

542

# Additional callback implementations...

543

```

544

545

### Performance Testing

546

547

```python

548

import unittest

549

import time

550

import asyncio

551

from statistics import mean, stdev

552

553

class SagaPerformanceTestCase(unittest.TestCase):

554

"""Performance tests for saga execution."""

555

556

async def test_execution_performance(self):

557

"""Test saga execution performance under load."""

558

saga = self.create_performance_test_saga()

559

560

execution_times = []

561

num_executions = 100

562

563

for i in range(num_executions):

564

context = SagaContext(iteration=i, data=f"test_data_{i}")

565

566

start_time = time.time()

567

result = await self.manager.run(definition=saga, context=context)

568

end_time = time.time()

569

570

execution_times.append(end_time - start_time)

571

572

# Analyze performance

573

avg_time = mean(execution_times)

574

std_dev = stdev(execution_times)

575

max_time = max(execution_times)

576

min_time = min(execution_times)

577

578

print(f"Performance Results:")

579

print(f" Average execution time: {avg_time:.4f}s")

580

print(f" Standard deviation: {std_dev:.4f}s")

581

print(f" Max execution time: {max_time:.4f}s")

582

print(f" Min execution time: {min_time:.4f}s")

583

584

# Assert performance criteria

585

self.assertLess(avg_time, 0.1, "Average execution time too high")

586

self.assertLess(max_time, 0.5, "Max execution time too high")

587

588

async def test_concurrent_executions(self):

589

"""Test concurrent saga executions."""

590

saga = self.create_performance_test_saga()

591

592

# Create concurrent executions

593

tasks = []

594

for i in range(50):

595

context = SagaContext(concurrent_test=i)

596

task = self.manager.run(definition=saga, context=context)

597

tasks.append(task)

598

599

# Execute concurrently

600

start_time = time.time()

601

results = await asyncio.gather(*tasks)

602

end_time = time.time()

603

604

total_time = end_time - start_time

605

print(f"Concurrent execution of 50 sagas took: {total_time:.2f}s")

606

607

# Verify all completed successfully

608

self.assertEqual(len(results), 50)

609

for i, result in enumerate(results):

610

self.assertEqual(result.concurrent_test, i)

611

612

def create_performance_test_saga(self):

613

"""Create saga optimized for performance testing."""

614

saga = Saga()

615

saga.local_step().on_execute(lambda ctx: ctx)

616

return saga.commit()

617

```