tessl/pypi-minos-microservice-saga

Implementation of the SAGA pattern for distributed microservice transactions in the Minos Framework.

- **Workspace**: tessl
- **Visibility**: Public
- **Describes**: pypipkg:pypi/minos-microservice-saga@0.7.x

To install, run

npx @tessl/cli install tessl/pypi-minos-microservice-saga@0.7.0

# Minos Microservice Saga

A Python library that implements the SAGA pattern for distributed microservice transactions in the Minos Framework. It orchestrates business processes that span multiple microservices and keeps data eventually consistent by compensating (rolling back) completed steps when a later step fails.

## Package Information

- **Package Name**: minos-microservice-saga
- **Package Type**: pypi
- **Language**: Python
- **Installation**: `pip install minos-microservice-saga`

## Core Imports

```python
from minos.saga import (
    # Core classes
    Saga,
    SagaContext,
    SagaManager,
    SagaExecution,
    SagaService,

    # Step definitions
    LocalSagaStep,
    RemoteSagaStep,
    ConditionalSagaStep,
    IfThenAlternative,
    ElseThenAlternative,

    # Messages
    SagaRequest,
    SagaResponse,
    SagaResponseStatus,

    # Status and execution
    SagaStatus,
    SagaStepStatus,
    SagaStepExecution,

    # Repositories
    SagaExecutionRepository,
    DatabaseSagaExecutionRepository,

    # Transaction management
    TransactionCommitter,

    # Middleware and utilities
    transactional_command,
    get_service_name,
)
```

## Basic Usage

```python
from minos.saga import Saga, SagaContext, SagaManager, SagaRequest

# Define a saga with multiple steps
def create_order_saga():
    saga = Saga()

    # Add local step for order validation
    saga.local_step().on_execute(validate_order).on_failure(handle_validation_failure)

    # Add remote step for payment processing
    saga.remote_step() \
        .on_execute(create_payment_request) \
        .on_success(handle_payment_success) \
        .on_error(handle_payment_error) \
        .on_failure(compensate_payment)

    # Add remote step for inventory reservation
    saga.remote_step() \
        .on_execute(reserve_inventory_request) \
        .on_success(handle_inventory_success) \
        .on_error(handle_inventory_error) \
        .on_failure(release_inventory)

    return saga.commit()

# Execute the saga
async def process_order(order_data):
    saga_definition = create_order_saga()
    context = SagaContext(order=order_data)

    # Initialize saga manager with storage and broker.
    # `repo` and `broker` are assumed to be configured elsewhere.
    manager = SagaManager(storage=repo, broker_pool=broker)

    # Run the saga
    result = await manager.run(saga_definition, context=context)
    return result

# Define callback functions.
# Only three are shown; the remaining callbacks referenced above
# (handle_payment_error, compensate_payment, ...) are omitted here.
def validate_order(context):
    # Local validation logic
    if not context.order.get('amount'):
        raise ValueError("Order amount required")
    return context

def create_payment_request(context):
    # Create payment service request
    return SagaRequest(
        target="payment-service",
        content={"amount": context.order["amount"]}
    )

def handle_payment_success(context, response):
    # Process successful payment
    context.payment_id = response.content()["payment_id"]
    return context
```

## Architecture

The SAGA pattern implementation is built around several key components:

- **Saga Definitions**: Declarative step sequences with compensation logic
- **Execution Engine**: Runtime orchestrator managing step execution and state
- **Context Management**: Stateful container carrying data between steps
- **Message System**: Request/response infrastructure for microservice communication
- **Transaction Management**: Two-phase commit protocol for distributed consistency
- **Persistence Layer**: Durable storage for execution state and recovery

This architecture enables resilient distributed transactions with automatic compensation, pause/resume capabilities, and comprehensive error handling for complex microservice workflows.
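
To make "automatic compensation" concrete, here is a minimal, library-independent sketch of the idea. It is an illustration under stated assumptions, not how `minos.saga` is implemented: `run_saga` and the `reserve`/`release`/`charge`/`refund` callbacks are hypothetical names, and the sketch assumes completed steps are undone in reverse order when a later step raises.

```python
from typing import Callable, List, Tuple

# Each step pairs an action with the compensation that undoes it.
Step = Tuple[Callable[[dict], None], Callable[[dict], None]]


def run_saga(steps: List[Step], context: dict) -> bool:
    """Run each action in order; on failure, undo completed steps in reverse."""
    completed: List[Step] = []
    for action, compensation in steps:
        try:
            action(context)
        except Exception:
            # Compensate only the steps that finished, newest first.
            for _, undo in reversed(completed):
                undo(context)
            return False
        completed.append((action, compensation))
    return True


def reserve(ctx: dict) -> None:
    ctx["log"].append("reserve")


def release(ctx: dict) -> None:
    ctx["log"].append("release")


def charge(ctx: dict) -> None:
    raise RuntimeError("payment declined")


def refund(ctx: dict) -> None:
    ctx["log"].append("refund")


context = {"log": []}
ok = run_saga([(reserve, release), (charge, refund)], context)
```

Because `charge` fails before completing, only `reserve` is compensated; `refund` never runs.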

## Capabilities

### Saga Definition and Construction

Core functionality for defining distributed transaction sequences with local and remote steps, conditional logic, and compensation behaviors.

```python { .api }
class Saga:
    def __init__(self, steps=None, committed=False, **kwargs): ...
    def local_step(self, step=None, **kwargs): ...
    def remote_step(self, step=None, **kwargs): ...
    def conditional_step(self, step=None): ...
    def commit(self, callback=None, **kwargs): ...

class LocalSagaStep(SagaStep):
    def on_execute(self, callback, parameters=None, **kwargs): ...
    def on_failure(self, callback, parameters=None, **kwargs): ...

class RemoteSagaStep(SagaStep):
    def on_execute(self, callback, parameters=None, **kwargs): ...
    def on_success(self, callback, parameters=None, **kwargs): ...
    def on_error(self, callback, parameters=None, **kwargs): ...
    def on_failure(self, callback, parameters=None, **kwargs): ...

class ConditionalSagaStep(SagaStep):
    def if_then(self, condition, saga): ...
    def else_then(self, saga): ...
```

[Saga Definitions](./saga-definitions.md)
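
A conditional step chooses one nested saga to run. The sketch below is a self-contained illustration rather than the library's code. It assumes that `if_then` alternatives are checked in the order they were added, that the first matching condition wins, and that the `else_then` saga is the fallback. `choose_branch` and the string saga names are hypothetical.

```python
from typing import Callable, List, Optional, Tuple

# (condition, saga name) pairs stand in for IfThenAlternative entries.
Alternative = Tuple[Callable[[dict], bool], str]


def choose_branch(
    if_then: List[Alternative],
    else_then: Optional[str],
    context: dict,
) -> Optional[str]:
    """Return the first saga whose condition accepts the context."""
    for condition, saga_name in if_then:
        if condition(context):
            return saga_name
    # No condition matched: fall back to the else branch (may be None).
    return else_then


alternatives = [
    (lambda ctx: ctx["amount"] > 1000, "manual_review_saga"),
    (lambda ctx: ctx["amount"] > 0, "standard_payment_saga"),
]

large = choose_branch(alternatives, "reject_order_saga", {"amount": 5000})
small = choose_branch(alternatives, "reject_order_saga", {"amount": 20})
empty = choose_branch(alternatives, "reject_order_saga", {"amount": 0})
```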

### Execution Management and Orchestration

Runtime execution engine providing saga orchestration, state management, pause/resume capabilities, and comprehensive lifecycle control.

```python { .api }
class SagaExecution:
    def __init__(self, definition, uuid, context, status=SagaStatus.Created, **kwargs): ...
    def execute(self, response=None, autocommit=True, **kwargs): ...
    def rollback(self, autoreject=True, **kwargs): ...
    def commit(self, **kwargs): ...
    def reject(self, **kwargs): ...

class SagaManager:
    def __init__(self, storage, broker_pool=None, **kwargs): ...
    def run(self, definition=None, context=None, response=None, user=None, autocommit=True, pause_on_disk=False, **kwargs): ...

class SagaStatus(Enum):
    Created = "created"
    Running = "running"
    Paused = "paused"
    Finished = "finished"
    Errored = "errored"
```

[Execution Engine](./execution-engine.md)
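
The pause/resume behaviour can be pictured with a small stand-in. This is a sketch under assumptions, not the real `SagaExecution`: it assumes an execution pauses whenever it reaches a remote step and continues once a response is passed back to `execute`. `SketchExecution` and its string step labels are hypothetical; the status values are copied from the `SagaStatus` listing above.

```python
from enum import Enum
from typing import List, Optional


class SagaStatus(Enum):
    Created = "created"
    Running = "running"
    Paused = "paused"
    Finished = "finished"
    Errored = "errored"


class SketchExecution:
    """Hypothetical execution that pauses at each remote step."""

    def __init__(self, steps: List[str]) -> None:
        self.steps = steps      # each entry is "local" or "remote"
        self.position = 0       # index of the next step to run
        self.waiting = False    # True while a remote request is in flight
        self.status = SagaStatus.Created

    def execute(self, response: Optional[str] = None) -> SagaStatus:
        self.status = SagaStatus.Running
        if self.waiting:
            if response is None:
                # Still no answer from the remote service: stay paused.
                self.status = SagaStatus.Paused
                return self.status
            self.waiting = False
            self.position += 1
        while self.position < len(self.steps):
            if self.steps[self.position] == "remote":
                # A request would be sent here; wait for the reply.
                self.waiting = True
                self.status = SagaStatus.Paused
                return self.status
            self.position += 1
        self.status = SagaStatus.Finished
        return self.status


execution = SketchExecution(["local", "remote", "local"])
first = execution.execute()
second = execution.execute(response="payment accepted")
```

The first call stops at the remote step; the second call supplies the response and runs the remaining local step to completion.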

### Context and State Management

Stateful execution context that maintains data across saga steps, with a dictionary-like interface and automatic persistence.

```python { .api }
class SagaContext(BucketModel, MutableMapping):
    def __init__(self, **kwargs): ...
    def __setitem__(self, key, value): ...
    def __getitem__(self, key): ...
    def __setattr__(self, key, value): ...
```

[Context Management](./context-management.md)
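
The signature above suggests that the same data is reachable by key and by attribute. The following stand-in shows one way such a container can be built on `collections.abc.MutableMapping`. It is a sketch only: `SketchContext` is a hypothetical class, it does not derive from `BucketModel`, and it performs no persistence.

```python
from collections.abc import MutableMapping


class SketchContext(MutableMapping):
    """Hypothetical stand-in: one store reachable by key or by attribute."""

    def __init__(self, **kwargs):
        # Bypass our own __setattr__ so `_data` is a real instance attribute.
        object.__setattr__(self, "_data", dict(kwargs))

    def __getitem__(self, key):
        return self._data[key]

    def __setitem__(self, key, value):
        self._data[key] = value

    def __delitem__(self, key):
        del self._data[key]

    def __iter__(self):
        return iter(self._data)

    def __len__(self):
        return len(self._data)

    def __getattr__(self, name):
        # Called only when normal attribute lookup fails.
        if name.startswith("_"):
            raise AttributeError(name)
        try:
            return self._data[name]
        except KeyError:
            raise AttributeError(name) from None

    def __setattr__(self, name, value):
        # Attribute assignment writes into the shared store.
        self._data[name] = value


context = SketchContext(order={"amount": 42})
context.payment_id = "p-1"      # attribute style
context["status"] = "paid"      # mapping style
```

One caveat of this design: keys that collide with mapping methods such as `keys` or `items` remain readable only through `context["keys"]`, because attribute lookup finds the method first.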

### Message System and Communication

Request/response infrastructure for microservice communication with status tracking and service relationship management.

```python { .api }
class SagaRequest:
    def __init__(self, target, content=None): ...
    def content(self, **kwargs): ...

class SagaResponse:
    def __init__(self, content=None, related_services=None, status=None, uuid=None, **kwargs): ...
    def content(self, **kwargs): ...

class SagaResponseStatus(IntEnum):
    SUCCESS = 200
    ERROR = 400
    SYSTEM_ERROR = 500
```

[Message System](./message-system.md)
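
As an illustration of status tracking, the sketch below redeclares the enum with the values from the listing above and routes each status to a follow-up action. The mapping itself is an assumption made for this example (success to `on_success`, error to `on_error`, anything else to a rollback), and `next_action` is a hypothetical helper, not part of the package.

```python
from enum import IntEnum


class SagaResponseStatus(IntEnum):
    # Values copied from the listing above.
    SUCCESS = 200
    ERROR = 400
    SYSTEM_ERROR = 500


def next_action(status: SagaResponseStatus) -> str:
    """Hypothetical router from a response status to the follow-up action."""
    if status == SagaResponseStatus.SUCCESS:
        return "on_success"
    if status == SagaResponseStatus.ERROR:
        return "on_error"
    # Assumption: anything else is treated as a system failure and rolled back.
    return "rollback"


actions = [next_action(status) for status in SagaResponseStatus]
```

Because the enum derives from `IntEnum`, members also compare equal to plain integers, so `SagaResponseStatus.SUCCESS == 200` holds.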

### Exception Handling and Error Management

Comprehensive exception hierarchy for saga definition validation, execution errors, and system failures with specific error types for different failure scenarios.

```python { .api }
class SagaException(MinosException): ...
class SagaExecutionException(SagaException): ...
class SagaFailedExecutionException(SagaExecutionException): ...
class SagaRollbackExecutionException(SagaExecutionException): ...
class SagaResponseException(SagaException): ...
```

[Exception Handling](./exception-handling.md)
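
A hierarchy like this lets callers catch specific failures first and fall back to the base class. The sketch below redeclares the classes locally so it runs without the package; using plain `Exception` as the root instead of `MinosException` is a simplification, and `describe` is a hypothetical helper.

```python
class SagaException(Exception): ...
class SagaExecutionException(SagaException): ...
class SagaFailedExecutionException(SagaExecutionException): ...
class SagaRollbackExecutionException(SagaExecutionException): ...
class SagaResponseException(SagaException): ...


def describe(error: Exception) -> str:
    """Classify an error, most specific handler first."""
    try:
        raise error
    except SagaFailedExecutionException:
        return "execution failed"
    except SagaRollbackExecutionException:
        return "rollback failed"
    except SagaException:
        # Catches every remaining saga error, including response errors.
        return "other saga error"


labels = [
    describe(SagaFailedExecutionException("step raised")),
    describe(SagaRollbackExecutionException("compensation raised")),
    describe(SagaResponseException("bad reply")),
]
```

Ordering matters: placing the `SagaException` handler first would shadow the two more specific handlers.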

### Testing and Development Utilities

Testing utilities and mocked implementations for saga development and validation, including repository test cases and operation factories.

```python { .api }
class SagaExecutionRepositoryTestCase:
    def build_saga_execution_repository(self): ...
    def test_store(self): ...
    def test_load_from_str(self): ...
    def test_delete(self): ...

class MockedSagaExecutionDatabaseOperationFactory: ...
```

[Testing Utilities](./testing-utilities.md)
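
The test names above (`test_store`, `test_load_from_str`, `test_delete`) hint at the behaviour a repository is expected to support. As a rough sketch, an in-memory stand-in with the same three operations could look like this. `InMemoryExecutionRepository` and `FakeExecution` are hypothetical and not shipped with the package; the sketch only assumes that an execution exposes a `uuid` attribute and that `load` accepts either a `UUID` or its string form.

```python
import asyncio
from typing import Any, Dict, Union
from uuid import UUID, uuid4


class InMemoryExecutionRepository:
    """Hypothetical stand-in keyed by UUID; not part of minos.saga."""

    def __init__(self) -> None:
        self._items: Dict[UUID, Any] = {}

    @staticmethod
    def _as_uuid(value: Union[UUID, str]) -> UUID:
        return value if isinstance(value, UUID) else UUID(value)

    async def store(self, execution: Any) -> None:
        self._items[execution.uuid] = execution

    async def load(self, uuid: Union[UUID, str]) -> Any:
        return self._items[self._as_uuid(uuid)]

    async def delete(self, uuid: Union[UUID, str]) -> None:
        self._items.pop(self._as_uuid(uuid), None)


class FakeExecution:
    """Minimal object carrying only the identifier the repository needs."""

    def __init__(self) -> None:
        self.uuid = uuid4()


async def round_trip() -> bool:
    repository = InMemoryExecutionRepository()
    execution = FakeExecution()
    await repository.store(execution)
    loaded = await repository.load(str(execution.uuid))  # load by string form
    await repository.delete(execution.uuid)
    return loaded is execution and execution.uuid not in repository._items


result = asyncio.run(round_trip())
```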

### Service Integration and Middleware

Service-level integration for saga management within microservice applications with middleware support for transactional operations.

```python { .api }
class SagaService:
    def __init__(self, saga_manager, **kwargs): ...
    def __get_enroute__(self, config): ...
    def __reply__(self, request): ...

def transactional_command(request, inner):
    """
    Middleware for transactional command execution.

    Provides automatic transaction context management for
    saga operations within command handlers.

    Args:
        request: Incoming command request
        inner: Inner command handler function

    Returns:
        Command response with transaction context
    """

def get_service_name(config):
    """
    Utility function to extract service name from configuration.

    Args:
        config: Service configuration object

    Returns:
        str: Service name identifier
    """
```

## Types

```python { .api }
from typing import Callable, Union, Awaitable, Optional, TypeVar, Any, Dict, List, Tuple
from uuid import UUID
from enum import Enum, IntEnum

# Type variables
T = TypeVar('T')

# Core callback types
# A request callback builds a SagaRequest, either directly or as an awaitable.
RequestCallBack = Callable[[SagaContext, ...], Union[SagaRequest, Awaitable[SagaRequest]]]
ResponseCallBack = Callable[[SagaContext, SagaResponse, ...], Union[Union[Exception, SagaContext], Awaitable[Union[Exception, SagaContext]]]]
LocalCallback = Callable[[SagaContext, ...], Union[Optional[SagaContext], Awaitable[Optional[SagaContext]]]]

# Operation wrapper
class SagaOperation:
    callback: T
    parameters: Optional[SagaContext]
    parameterized: bool
    raw: Dict[str, Any]

# Step definitions
class SagaStep:
    saga: Optional[Saga]
    raw: Dict[str, Any]

# Alternative classes for conditional steps
class IfThenAlternative:
    condition: Any
    saga: Saga

class ElseThenAlternative:
    saga: Saga

# Status enumerations
class SagaStatus(Enum):
    Created: str
    Running: str
    Paused: str
    Finished: str
    Errored: str

class SagaStepStatus(Enum):
    Created: str
    RunningOnExecute: str
    FinishedOnExecute: str
    ErroredOnExecute: str
    PausedByOnExecute: str
    ErroredByOnExecute: str
    RunningOnFailure: str
    PausedOnFailure: str
    ErroredOnFailure: str
    RunningOnSuccess: str
    ErroredOnSuccess: str
    RunningOnError: str
    ErroredOnError: str
    Finished: str

class SagaResponseStatus(IntEnum):
    SUCCESS: int
    ERROR: int
    SYSTEM_ERROR: int

# Execution types
class SagaStepExecution:
    uuid: UUID
    definition: SagaStep
    status: SagaStepStatus
    already_rollback: bool
    related_services: Optional[List[str]]
    raw: Dict[str, Any]

class TransactionCommitter:
    execution_uuid: UUID
    executed_steps: List[SagaStepExecution]
    transactions: List[Tuple[UUID, str]]

# Repository interface
class SagaExecutionRepository:
    async def store(self, execution: SagaExecution) -> None: ...
    async def load(self, uuid: Union[UUID, str]) -> SagaExecution: ...
    async def delete(self, uuid: Union[UUID, str]) -> None: ...
```
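
The callback types above allow a callback to return either a plain value or an awaitable. A caller that wants to support both can await the result only when needed. The helper below is a generic sketch of that technique using the standard library; `call_callback`, `sync_step`, and `async_step` are hypothetical names, and plain dictionaries stand in for `SagaContext`.

```python
import asyncio
import inspect
from typing import Any, Callable


async def call_callback(callback: Callable[..., Any], *args: Any) -> Any:
    """Invoke a callback and await its result only when it is awaitable."""
    result = callback(*args)
    if inspect.isawaitable(result):
        result = await result
    return result


def sync_step(context: dict) -> dict:
    context["sync"] = True
    return context


async def async_step(context: dict) -> dict:
    await asyncio.sleep(0)  # yield control once, as real I/O would
    context["async"] = True
    return context


async def main() -> dict:
    context: dict = {}
    context = await call_callback(sync_step, context)
    context = await call_callback(async_step, context)
    return context


final_context = asyncio.run(main())
```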