or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-transaction-management.mddatasource-auto-proxying.mdhttp-transaction-propagation.mdindex.mdsaga-pattern-support.md

saga-pattern-support.mddocs/

0

# Saga Pattern Support

1

2

The Saga pattern support provides auto-configuration for long-running business processes and complex workflow coordination using Seata's state machine engine. It enables distributed transaction management through the Saga pattern with compensation-based rollback and async execution capabilities.

3

4

## Installation and Setup

5

6

Saga pattern support requires explicit enablement and additional configuration:

7

8

```properties

9

# Enable Seata core functionality

10

seata.enabled=true

11

12

# Enable Saga pattern support

13

seata.saga.enabled=true

14

15

# Configure Saga DataSource (required)

16

seata.saga.data-source-ref=seataSagaDataSource

17

18

# State machine configuration

19

seata.saga.state-machine.enable-async=true

20

seata.saga.state-machine.thread-pool-executor-size=20

21

```

22

23

## Core Components

24

25

### StateMachineEngine Bean

26

27

The main engine for executing Saga state machines and managing workflow orchestration.

28

29

```java { .api }

30

@Bean

31

@ConditionalOnMissingBean

32

public StateMachineEngine stateMachineEngine(StateMachineConfig config);

33

```

34

35

**Parameters:**

36

- `config`: StateMachineConfig containing engine configuration and DataSource

37

38

**Returns:** `ProcessCtrlStateMachineEngine` - The configured state machine engine

39

40

### StateMachineConfig Bean

41

42

Configuration for the state machine engine with database persistence support.

43

44

```java { .api }

45

@Bean

46

@ConditionalOnBean(DataSource.class)

47

@ConditionalOnMissingBean

48

@ConfigurationProperties("seata.saga.state-machine")

49

public StateMachineConfig dbStateMachineConfig(

50

DataSource dataSource,

51

@Qualifier("seataSagaDataSource") @Autowired(required = false) DataSource sagaDataSource,

52

@Qualifier("seataSagaAsyncThreadPoolExecutor") @Autowired(required = false) ThreadPoolExecutor threadPoolExecutor,

53

@Value("${spring.application.name:}") String applicationId,

54

@Value("${seata.tx-service-group:}") String txServiceGroup

55

);

56

```

57

58

**Parameters:**

59

- `dataSource`: Primary application DataSource

60

- `sagaDataSource`: Dedicated DataSource for Saga state persistence

61

- `threadPoolExecutor`: Thread pool for async Saga execution

62

- `applicationId`: Application identifier for transaction coordination

63

- `txServiceGroup`: Transaction service group name

64

65

**Returns:** `DbStateMachineConfig` - Database-backed state machine configuration

66

67

### Async Thread Pool Configuration

68

69

Optional async thread pool for non-blocking Saga execution.

70

71

```java { .api }

72

@Bean("seataSagaAsyncThreadPoolExecutor")

73

@ConditionalOnProperty(prefix = "seata.saga.state-machine", name = "enable-async", havingValue = "true")

74

public ThreadPoolExecutor sagaAsyncThreadPoolExecutor(

75

SagaAsyncThreadPoolProperties properties

76

);

77

```

78

79

**Parameters:**

80

- `properties`: Thread pool configuration properties

81

82

**Returns:** `ThreadPoolExecutor` - Configured async thread pool for Saga execution

83

84

## State Machine Definition

85

86

### JSON State Machine Definition

87

88

Define Saga workflows using JSON state machine definitions:

89

90

```json

91

{

92

"Name": "OrderProcessingSaga",

93

"Comment": "Complete order processing workflow with compensation",

94

"StartAt": "CreateOrder",

95

"Version": "0.0.1",

96

"States": {

97

"CreateOrder": {

98

"Type": "ServiceTask",

99

"ServiceName": "orderService",

100

"ServiceMethod": "createOrder",

101

"CompensateState": "CancelOrder",

102

"Next": "ProcessPayment",

103

"Input": {

104

"$.[order]": "$.[order]"

105

},

106

"Output": {

107

"$.[createdOrder]": "$.[order]"

108

}

109

},

110

"ProcessPayment": {

111

"Type": "ServiceTask",

112

"ServiceName": "paymentService",

113

"ServiceMethod": "processPayment",

114

"CompensateState": "RefundPayment",

115

"Next": "UpdateInventory",

116

"Input": {

117

"$.[payment]": "$.[order.payment]"

118

}

119

},

120

"UpdateInventory": {

121

"Type": "ServiceTask",

122

"ServiceName": "inventoryService",

123

"ServiceMethod": "updateInventory",

124

"CompensateState": "RestoreInventory",

125

"End": true,

126

"Input": {

127

"$.[items]": "$.[order.items]"

128

}

129

},

130

"CancelOrder": {

131

"Type": "ServiceTask",

132

"ServiceName": "orderService",

133

"ServiceMethod": "cancelOrder"

134

},

135

"RefundPayment": {

136

"Type": "ServiceTask",

137

"ServiceName": "paymentService",

138

"ServiceMethod": "refundPayment"

139

},

140

"RestoreInventory": {

141

"Type": "ServiceTask",

142

"ServiceName": "inventoryService",

143

"ServiceMethod": "restoreInventory"

144

}

145

}

146

}

147

```

148

149

### State Machine Registration

150

151

Register state machine definitions with the engine:

152

153

```java

154

@Configuration

155

public class SagaConfig {

156

157

@Autowired

158

private StateMachineEngine stateMachineEngine;

159

160

@PostConstruct

161

public void registerStateMachines() {

162

// Load state machine definition from classpath

163

Resource resource = new ClassPathResource("statemachine/OrderProcessingSaga.json");

164

165

// Register with the engine

166

stateMachineEngine.reloadStateMachineDefinition(resource);

167

}

168

}

169

```

170

171

## Saga Execution

172

173

### Starting a Saga

174

175

Execute Saga workflows using the state machine engine:

176

177

```java

178

@Service

179

public class OrderSagaService {

180

181

@Autowired

182

private StateMachineEngine stateMachineEngine;

183

184

public StateMachineInstance processOrder(Order order) {

185

// Prepare input parameters for the Saga

186

Map<String, Object> startParams = new HashMap<>();

187

startParams.put("order", order);

188

startParams.put("businessKey", "order-" + order.getId());

189

190

// Start the Saga state machine

191

StateMachineInstance instance = stateMachineEngine.startWithBusinessKey(

192

"OrderProcessingSaga", // State machine name

193

null, // Tenant ID (optional)

194

startParams.get("businessKey").toString(), // Business key

195

startParams // Input parameters

196

);

197

198

return instance;

199

}

200

}

201

```

202

203

### Async Saga Execution

204

205

Execute Sagas asynchronously for better performance:

206

207

```java

208

@Service

209

public class AsyncOrderSagaService {

210

211

@Autowired

212

private StateMachineEngine stateMachineEngine;

213

214

@Async

215

public CompletableFuture<StateMachineInstance> processOrderAsync(Order order) {

216

Map<String, Object> startParams = new HashMap<>();

217

startParams.put("order", order);

218

startParams.put("businessKey", "order-" + order.getId());

219

220

// Start async Saga execution

221

StateMachineInstance instance = stateMachineEngine.startAsync(

222

"OrderProcessingSaga",

223

null,

224

startParams,

225

callback -> {

226

// Handle completion callback

227

if (callback.isSuccess()) {

228

log.info("Saga completed successfully: {}", callback.getStateMachineInstance().getId());

229

} else {

230

log.error("Saga failed: {}", callback.getException().getMessage());

231

}

232

}

233

);

234

235

return CompletableFuture.completedFuture(instance);

236

}

237

}

238

```

239

240

## Service Task Implementation

241

242

### Saga Service Methods

243

244

Implement service methods that participate in Saga workflows:

245

246

```java

247

@Component("orderService")

248

public class OrderSagaService {

249

250

@Autowired

251

private OrderRepository orderRepository;

252

253

// Forward service method

254

public Order createOrder(Map<String, Object> context) {

255

Order order = (Order) context.get("order");

256

Order savedOrder = orderRepository.save(order);

257

258

// Update context for next state

259

context.put("createdOrder", savedOrder);

260

return savedOrder;

261

}

262

263

// Compensation method

264

public void cancelOrder(Map<String, Object> context) {

265

Order order = (Order) context.get("createdOrder");

266

if (order != null) {

267

order.setStatus(OrderStatus.CANCELLED);

268

orderRepository.save(order);

269

}

270

}

271

}

272

273

@Component("paymentService")

274

public class PaymentSagaService {

275

276

@Autowired

277

private PaymentRepository paymentRepository;

278

279

public Payment processPayment(Map<String, Object> context) {

280

Map<String, Object> paymentData = (Map<String, Object>) context.get("payment");

281

282

Payment payment = new Payment();

283

payment.setAmount((BigDecimal) paymentData.get("amount"));

284

payment.setCustomerId((String) paymentData.get("customerId"));

285

payment.setStatus(PaymentStatus.COMPLETED);

286

287

return paymentRepository.save(payment);

288

}

289

290

public void refundPayment(Map<String, Object> context) {

291

// Implement payment refund logic

292

Payment payment = (Payment) context.get("payment");

293

if (payment != null) {

294

payment.setStatus(PaymentStatus.REFUNDED);

295

paymentRepository.save(payment);

296

}

297

}

298

}

299

```

300

301

## Configuration Properties

302

303

### Saga Core Configuration

304

305

```java { .api }

306

public class SeataProperties {

307

// Saga-specific properties

308

private SagaProperties saga = new SagaProperties();

309

310

public static class SagaProperties {

311

// Enable Saga pattern support

312

private boolean enabled = false;

313

314

// DataSource reference for Saga state persistence

315

private String dataSourceRef = "seataSagaDataSource";

316

317

// State machine configuration

318

private StateMachineProperties stateMachine = new StateMachineProperties();

319

}

320

}

321

```

322

323

### State Machine Configuration

324

325

```java { .api }

326

@ConfigurationProperties(prefix = "seata.saga.state-machine")

327

public class StateMachineProperties {

328

// Enable async execution

329

private boolean enableAsync = false;

330

331

// Thread pool configuration

332

private AsyncThreadPoolProperties asyncThreadPool = new AsyncThreadPoolProperties();

333

334

// State machine engine settings

335

private String charset = "UTF-8";

336

private boolean enableAutoDataSourceProxy = true;

337

private String sagaJsonParser = "fastjson";

338

}

339

```

340

341

### Async Thread Pool Properties

342

343

```java { .api }

344

@ConfigurationProperties(prefix = "seata.saga.state-machine.async-thread-pool")

345

public class SagaAsyncThreadPoolProperties {

346

// Core thread pool size (default: 1)

347

private int corePoolSize = 1;

348

349

// Maximum thread pool size (default: 20)

350

private int maxPoolSize = 20;

351

352

// Thread keep-alive time in seconds (default: 60)

353

private int keepAliveTime = 60;

354

355

// Work queue capacity

356

private int queueCapacity = Integer.MAX_VALUE;

357

358

// Thread name prefix

359

private String threadNamePrefix = "saga-async-";

360

}

361

```

362

363

## Auto-Configuration Details

364

365

Saga pattern support is configured through `SeataSagaAutoConfiguration`:

366

367

```java { .api }

368

@Configuration(proxyBeanMethods = false)

369

@ConditionalOnProperty(prefix = "seata", name = {"enabled", "saga.enabled"}, havingValue = "true")

370

@AutoConfigureAfter({SeataCoreAutoConfiguration.class, SeataAutoConfiguration.class})

371

public class SeataSagaAutoConfiguration {

372

373

@Bean

374

@ConfigurationProperties("seata.saga.state-machine")

375

public StateMachineConfig dbStateMachineConfig(

376

@Qualifier("dataSource") DataSource dataSource,

377

@Qualifier("seataSagaDataSource") DataSource sagaDataSource,

378

@Qualifier("seataSagaAsyncThreadPoolExecutor") ThreadPoolExecutor threadPoolExecutor,

379

@Value("${seata.application-id}") String applicationId,

380

@Value("${seata.tx-service-group}") String txServiceGroup

381

);

382

383

@Bean

384

@ConditionalOnMissingBean(StateMachineEngine.class)

385

public StateMachineEngine stateMachineEngine(StateMachineConfig stateMachineConfig);

386

387

@Bean("seataSagaAsyncThreadPoolExecutor")

388

@ConditionalOnProperty(prefix = "seata.saga.state-machine", name = "enable-async", havingValue = "true")

389

public ThreadPoolExecutor sagaAsyncThreadPoolExecutor(

390

SagaAsyncThreadPoolProperties properties

391

);

392

393

@Bean("seataSagaRejectedExecutionHandler")

394

@ConditionalOnProperty(prefix = "seata.saga.state-machine", name = "enable-async", havingValue = "true")

395

public RejectedExecutionHandler sagaRejectedExecutionHandler();

396

}

397

```

398

399

## Database Setup

400

401

### Saga State Tables

402

403

Create required database tables for Saga state persistence:

404

405

```sql

406

-- State machine definition table

407

CREATE TABLE `seata_state_machine_def` (

408

`id` varchar(32) NOT NULL COMMENT 'id',

409

`name` varchar(128) NOT NULL COMMENT 'name',

410

`tenant_id` varchar(32) NOT NULL COMMENT 'tenant_id',

411

`app_name` varchar(32) NOT NULL COMMENT 'application name',

412

`type` varchar(20) COMMENT 'state machine type',

413

`comment_` varchar(255) COMMENT 'comment',

414

`ver` varchar(16) NOT NULL COMMENT 'version',

415

`gmt_create` timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) COMMENT 'create time',

416

`status` varchar(2) NOT NULL COMMENT 'status(AC:active|IN:inactive)',

417

`content` longtext COMMENT 'JSON content',

418

`recover_strategy` varchar(16) COMMENT 'transaction recover strategy(Forward|Backward|Unknown)',

419

PRIMARY KEY (`id`)

420

) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

421

422

-- State machine instance table

423

CREATE TABLE `seata_state_machine_inst` (

424

`id` varchar(128) NOT NULL COMMENT 'id',

425

`machine_id` varchar(32) NOT NULL COMMENT 'state machine definition id',

426

`tenant_id` varchar(32) NOT NULL COMMENT 'tenant id',

427

`parent_id` varchar(128) COMMENT 'parent id',

428

`gmt_started` timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) COMMENT 'start time',

429

`business_key` varchar(48) COMMENT 'business key',

430

`start_params` longtext COMMENT 'start parameters',

431

`gmt_end` timestamp(3) COMMENT 'end time',

432

`excep` blob COMMENT 'exception',

433

`end_params` longtext COMMENT 'end parameters',

434

`status` varchar(2) COMMENT 'status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',

435

`compensation_status` varchar(2) COMMENT 'compensation status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',

436

`is_running` tinyint(1) COMMENT 'is running(0 no|1 yes)',

437

`gmt_updated` timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3),

438

PRIMARY KEY (`id`),

439

KEY `state_machine_inst_business_key` (`business_key`),

440

KEY `state_machine_inst_status` (`status`),

441

KEY `state_machine_inst_recovery` (`gmt_updated`)

442

) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

443

444

-- State instance table

445

CREATE TABLE `seata_state_inst` (

446

`id` varchar(48) NOT NULL COMMENT 'id',

447

`machine_inst_id` varchar(128) NOT NULL COMMENT 'state machine instance id',

448

`name` varchar(128) NOT NULL COMMENT 'state name',

449

`type` varchar(20) COMMENT 'state type',

450

`service_name` varchar(128) COMMENT 'service name',

451

`service_method` varchar(128) COMMENT 'method name',

452

`service_type` varchar(16) COMMENT 'service type',

453

`business_key` varchar(48) COMMENT 'business key',

454

`state_id_compensated_for` varchar(50) COMMENT 'compensated state id',

455

`state_id_retried_for` varchar(50) COMMENT 'retried state id',

456

`gmt_started` timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) COMMENT 'start time',

457

`is_for_update` tinyint(1) COMMENT 'is service for update',

458

`input_params` longtext COMMENT 'input parameters',

459

`output_params` longtext COMMENT 'output parameters',

460

`status` varchar(2) NOT NULL COMMENT 'status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',

461

`excep` blob COMMENT 'exception',

462

`gmt_end` timestamp(3) COMMENT 'end time',

463

`gmt_updated` timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3),

464

PRIMARY KEY (`id`,`machine_inst_id`)

465

) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

466

```

467

468

### DataSource Configuration

469

470

Configure separate DataSource for Saga state persistence:

471

472

```java

473

@Configuration

474

public class SagaDataSourceConfig {

475

476

@Bean("seataSagaDataSource")

477

@ConfigurationProperties("seata.saga.datasource")

478

public DataSource sagaDataSource() {

479

return DataSourceBuilder.create().build();

480

}

481

}

482

```

483

484

```properties

485

# Saga DataSource configuration

486

seata.saga.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

487

seata.saga.datasource.url=jdbc:mysql://localhost:3306/seata_saga

488

seata.saga.datasource.username=saga_user

489

seata.saga.datasource.password=saga_password

490

```

491

492

## Monitoring and Management

493

494

### Saga Instance Monitoring

495

496

Monitor Saga execution and state:

497

498

```java

499

@Service

500

public class SagaMonitoringService {

501

502

@Autowired

503

private StateMachineEngine stateMachineEngine;

504

505

public StateMachineInstance getSagaStatus(String instanceId) {

506

return stateMachineEngine.getStateMachineConfig()

507

.getStateLogStore()

508

.getStateMachineInstance(instanceId);

509

}

510

511

public List<StateMachineInstance> getActiveSagas() {

512

return stateMachineEngine.getStateMachineConfig()

513

.getStateLogStore()

514

.queryStateMachineInstanceByStatus(ExecutionStatus.RU);

515

}

516

}

517

```

518

519

### Saga Recovery and Retry

520

521

Handle failed Saga instances:

522

523

```java

524

@Service

525

public class SagaRecoveryService {

526

527

@Autowired

528

private StateMachineEngine stateMachineEngine;

529

530

@Scheduled(fixedDelay = 60000) // Every minute

531

public void recoverFailedSagas() {

532

List<StateMachineInstance> failedInstances =

533

stateMachineEngine.getStateMachineConfig()

534

.getStateLogStore()

535

.queryStateMachineInstanceByStatus(ExecutionStatus.FA);

536

537

for (StateMachineInstance instance : failedInstances) {

538

try {

539

// Retry failed Saga

540

stateMachineEngine.replayStateMachineInstance(instance.getId());

541

log.info("Retried Saga instance: {}", instance.getId());

542

543

} catch (Exception e) {

544

log.error("Failed to retry Saga instance: {}", instance.getId(), e);

545

}

546

}

547

}

548

}

549

```

550

551

## Error Handling and Compensation

552

553

### Compensation Logic

554

555

Implement proper compensation for failed Saga steps:

556

557

```java

558

@Component("inventoryService")

559

public class InventorySagaService {

560

561

public InventoryResponse updateInventory(Map<String, Object> context) {

562

List<OrderItem> items = (List<OrderItem>) context.get("items");

563

564

for (OrderItem item : items) {

565

// Reduce inventory

566

inventoryRepository.reduceStock(item.getProductId(), item.getQuantity());

567

}

568

569

// Store compensation data

570

context.put("inventoryUpdates", items);

571

return new InventoryResponse("SUCCESS");

572

}

573

574

public void restoreInventory(Map<String, Object> context) {

575

List<OrderItem> items = (List<OrderItem>) context.get("inventoryUpdates");

576

577

if (items != null) {

578

for (OrderItem item : items) {

579

// Restore inventory (compensation)

580

inventoryRepository.increaseStock(item.getProductId(), item.getQuantity());

581

}

582

}

583

}

584

}

585

```

586

587

## Performance Optimization

588

589

### Thread Pool Tuning

590

591

Optimize async execution performance:

592

593

```properties

594

# Saga async thread pool optimization

595

seata.saga.state-machine.async-thread-pool.core-pool-size=10

596

seata.saga.state-machine.async-thread-pool.max-pool-size=50

597

seata.saga.state-machine.async-thread-pool.keep-alive-time=300

598

seata.saga.state-machine.async-thread-pool.queue-capacity=1000

599

```

600

601

### Database Connection Optimization

602

603

```properties

604

# Saga DataSource connection pool

605

seata.saga.datasource.hikari.maximum-pool-size=10

606

seata.saga.datasource.hikari.minimum-idle=2

607

seata.saga.datasource.hikari.connection-timeout=30000

608

```

609

610

## Troubleshooting

611

612

### Common Issues

613

614

1. **Saga DataSource Not Found**: Ensure `seataSagaDataSource` bean is properly configured

615

2. **State Machine Definition Errors**: Validate JSON syntax and state transitions

616

3. **Service Method Not Found**: Verify service bean names and method signatures match state machine definition

617

4. **Async Execution Issues**: Check thread pool configuration and async enablement

618

619

### Debug Configuration

620

621

```properties

622

# Debug Saga execution

623

logging.level.io.seata.saga=DEBUG

624

logging.level.io.seata.saga.engine=DEBUG

625

logging.level.io.seata.saga.statelang=DEBUG

626

627

# Monitor state machine execution

628

seata.saga.state-machine.enable-log-details=true

629

```