# Saga Pattern Support

Saga pattern support auto-configures Seata's state machine engine for long-running business processes and multi-step workflow coordination. It manages distributed transactions through compensation-based rollback and can optionally execute state machines asynchronously.

## Installation and Setup

Saga pattern support must be enabled explicitly and needs a DataSource for state persistence:

```properties
# Enable Seata core functionality
seata.enabled=true

# Enable Saga pattern support
seata.saga.enabled=true

# Configure Saga DataSource (required)
seata.saga.data-source-ref=seataSagaDataSource

# State machine configuration
seata.saga.state-machine.enable-async=true
seata.saga.state-machine.async-thread-pool.max-pool-size=20
```

## Core Components

### StateMachineEngine Bean

The main engine for executing Saga state machines and managing workflow orchestration.

```java { .api }
@Bean
@ConditionalOnMissingBean
public StateMachineEngine stateMachineEngine(StateMachineConfig config);
```

**Parameters:**
- `config`: StateMachineConfig containing engine configuration and DataSource

**Returns:** `ProcessCtrlStateMachineEngine` - The configured state machine engine

### StateMachineConfig Bean

Configuration for the state machine engine with database persistence support.

```java { .api }
@Bean
@ConditionalOnBean(DataSource.class)
@ConditionalOnMissingBean
@ConfigurationProperties("seata.saga.state-machine")
public StateMachineConfig dbStateMachineConfig(
    DataSource dataSource,
    @Qualifier("seataSagaDataSource") @Autowired(required = false) DataSource sagaDataSource,
    @Qualifier("seataSagaAsyncThreadPoolExecutor") @Autowired(required = false) ThreadPoolExecutor threadPoolExecutor,
    @Value("${spring.application.name:}") String applicationId,
    @Value("${seata.tx-service-group:}") String txServiceGroup
);
```

**Parameters:**
- `dataSource`: Primary application DataSource
- `sagaDataSource`: Dedicated DataSource for Saga state persistence (injected with `required = false`)
- `threadPoolExecutor`: Thread pool for async Saga execution (injected with `required = false`; the bean exists only when async execution is enabled)
- `applicationId`: Application identifier for transaction coordination, read from `spring.application.name`
- `txServiceGroup`: Transaction service group name, read from `seata.tx-service-group`

**Returns:** `DbStateMachineConfig` - Database-backed state machine configuration

### Async Thread Pool Configuration

Optional async thread pool for non-blocking Saga execution.

```java { .api }
@Bean("seataSagaAsyncThreadPoolExecutor")
@ConditionalOnProperty(prefix = "seata.saga.state-machine", name = "enable-async", havingValue = "true")
public ThreadPoolExecutor sagaAsyncThreadPoolExecutor(
    SagaAsyncThreadPoolProperties properties
);
```

**Parameters:**
- `properties`: Thread pool configuration properties

**Returns:** `ThreadPoolExecutor` - Configured async thread pool for Saga execution
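
As a rough illustration of what a bean like this might assemble, the sketch below builds a `ThreadPoolExecutor` from plain values using only `java.util.concurrent`. The class `SagaPoolSketch`, its `build` method, the daemon thread factory, and the `CallerRunsPolicy` choice are assumptions made for the example, not the starter's actual implementation.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not part of Seata: builds a pool from property-style values.
final class SagaPoolSketch {

    static ThreadPoolExecutor build(int corePoolSize, int maxPoolSize, int keepAliveSeconds,
                                    int queueCapacity, String threadNamePrefix) {
        AtomicInteger counter = new AtomicInteger();
        // Name the threads so Saga work is easy to spot in thread dumps.
        ThreadFactory factory = runnable -> {
            Thread thread = new Thread(runnable, threadNamePrefix + counter.incrementAndGet());
            thread.setDaemon(true);
            return thread;
        };
        return new ThreadPoolExecutor(
                corePoolSize, maxPoolSize,
                keepAliveSeconds, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(queueCapacity),
                factory,
                // Assumption: when the pool is saturated, run the task on the caller's thread.
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor pool = build(1, 20, 60, 1000, "saga-async-");
        // Run one task and print the name of the worker thread that executed it.
        String workerName = pool.submit(() -> Thread.currentThread().getName()).get();
        System.out.println(workerName);
        pool.shutdown();
    }
}
```

A caller-runs policy slows the submitting thread down instead of dropping Saga work, which is one reasonable default; an application that prefers to fail fast could pass a different `RejectedExecutionHandler`.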

## State Machine Definition

### JSON State Machine Definition

Define Saga workflows using JSON state machine definitions. Each `Input` entry is evaluated against the Saga context and passed to the service method as an argument; `Output` maps context keys to expressions over the method's return value, where `$.#root` is the whole return value:

```json
{
  "Name": "OrderProcessingSaga",
  "Comment": "Complete order processing workflow with compensation",
  "StartAt": "CreateOrder",
  "Version": "0.0.1",
  "States": {
    "CreateOrder": {
      "Type": "ServiceTask",
      "ServiceName": "orderService",
      "ServiceMethod": "createOrder",
      "CompensateState": "CancelOrder",
      "Next": "ProcessPayment",
      "Input": [
        { "order": "$.[order]" }
      ],
      "Output": {
        "createdOrder": "$.#root"
      }
    },
    "ProcessPayment": {
      "Type": "ServiceTask",
      "ServiceName": "paymentService",
      "ServiceMethod": "processPayment",
      "CompensateState": "RefundPayment",
      "Next": "UpdateInventory",
      "Input": [
        { "payment": "$.[order].payment" }
      ],
      "Output": {
        "processedPayment": "$.#root"
      }
    },
    "UpdateInventory": {
      "Type": "ServiceTask",
      "ServiceName": "inventoryService",
      "ServiceMethod": "updateInventory",
      "CompensateState": "RestoreInventory",
      "End": true,
      "Input": [
        { "items": "$.[order].items" }
      ]
    },
    "CancelOrder": {
      "Type": "ServiceTask",
      "ServiceName": "orderService",
      "ServiceMethod": "cancelOrder",
      "Input": [
        { "createdOrder": "$.[createdOrder]" }
      ]
    },
    "RefundPayment": {
      "Type": "ServiceTask",
      "ServiceName": "paymentService",
      "ServiceMethod": "refundPayment",
      "Input": [
        { "payment": "$.[processedPayment]" }
      ]
    },
    "RestoreInventory": {
      "Type": "ServiceTask",
      "ServiceName": "inventoryService",
      "ServiceMethod": "restoreInventory",
      "Input": [
        { "inventoryUpdates": "$.[order].items" }
      ]
    }
  }
}
```
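
To make the relationship between forward states and their `CompensateState` counterparts concrete, here is a library-free sketch of the usual Saga ordering: run the forward steps in sequence and, when one fails, undo the completed steps in reverse. It does not use Seata at all; `CompensationSketch`, `Step`, and `run` are hypothetical names, and the real engine adds persistence, retries, and state tracking on top of this idea.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Library-free sketch of compensation ordering; all names here are hypothetical.
final class CompensationSketch {

    // One forward action paired with the action that undoes it.
    static final class Step {
        final String name;
        final Runnable forward;
        final Runnable compensate;

        Step(String name, Runnable forward, Runnable compensate) {
            this.name = name;
            this.forward = forward;
            this.compensate = compensate;
        }
    }

    // Runs steps in order. On the first failure, compensates the steps that
    // already completed, most recent first, and returns a trace of what happened.
    static List<String> run(List<Step> steps) {
        List<String> trace = new ArrayList<>();
        Deque<Step> completed = new ArrayDeque<>();
        for (Step step : steps) {
            try {
                step.forward.run();
                trace.add(step.name);
                completed.push(step);
            } catch (RuntimeException e) {
                trace.add(step.name + " FAILED");
                while (!completed.isEmpty()) {
                    Step done = completed.pop();
                    done.compensate.run();
                    trace.add("compensate " + done.name);
                }
                return trace;
            }
        }
        return trace;
    }

    public static void main(String[] args) {
        List<Step> steps = Arrays.asList(
                new Step("CreateOrder", () -> {}, () -> {}),
                new Step("ProcessPayment", () -> {}, () -> {}),
                new Step("UpdateInventory",
                        () -> { throw new IllegalStateException("out of stock"); },
                        () -> {}));
        // prints [CreateOrder, ProcessPayment, UpdateInventory FAILED, compensate ProcessPayment, compensate CreateOrder]
        System.out.println(run(steps));
    }
}
```

The failed step itself is not compensated in this sketch because its forward action never completed; whether a partially applied step needs cleanup depends on the service behind it.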

### State Machine Registration

Register state machine definitions through the engine's state machine repository:

```java
@Configuration
public class SagaConfig {

    @Autowired
    private StateMachineEngine stateMachineEngine;

    @PostConstruct
    public void registerStateMachines() throws IOException {
        // Load state machine definition from classpath
        Resource resource = new ClassPathResource("statemachine/OrderProcessingSaga.json");

        // Register with the engine's repository under the default tenant.
        // The parameter types of registryByResources differ between Seata
        // versions (Spring Resource[] in 1.x), so check the version in use.
        StateMachineConfig config = stateMachineEngine.getStateMachineConfig();
        config.getStateMachineRepository()
            .registryByResources(new Resource[] {resource}, config.getDefaultTenantId());
    }
}
```

## Saga Execution

### Starting a Saga

Execute Saga workflows using the state machine engine:

```java
@Service
public class OrderSagaService {

    @Autowired
    private StateMachineEngine stateMachineEngine;

    public StateMachineInstance processOrder(Order order) {
        // Prepare input parameters for the Saga
        String businessKey = "order-" + order.getId();
        Map<String, Object> startParams = new HashMap<>();
        startParams.put("order", order);
        startParams.put("businessKey", businessKey);

        // Start the Saga state machine
        return stateMachineEngine.startWithBusinessKey(
            "OrderProcessingSaga", // State machine name
            null,                  // Tenant ID (optional)
            businessKey,           // Business key
            startParams            // Input parameters
        );
    }
}
```
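
The business key is persisted with the instance, and the `business_key` column in the Database Setup section is declared as `varchar(48)`, so it can help to validate the key before starting a Saga. The helper below is a minimal sketch under that assumption; `SagaStartParams`, `businessKeyFor`, and `forOrder` are hypothetical names and not part of Seata.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: builds start parameters and guards the business key length.
final class SagaStartParams {

    // Mirrors the business_key varchar(48) column of the instance table in this document.
    static final int MAX_BUSINESS_KEY_LENGTH = 48;

    static String businessKeyFor(String orderId) {
        String key = "order-" + orderId;
        if (key.length() > MAX_BUSINESS_KEY_LENGTH) {
            throw new IllegalArgumentException(
                    "business key exceeds " + MAX_BUSINESS_KEY_LENGTH + " characters: " + key);
        }
        return key;
    }

    // Builds the map passed as start parameters; the order is kept as an opaque object here.
    static Map<String, Object> forOrder(String orderId, Object order) {
        Map<String, Object> params = new HashMap<>();
        params.put("order", order);
        params.put("businessKey", businessKeyFor(orderId));
        return params;
    }

    public static void main(String[] args) {
        Map<String, Object> params = forOrder("42", "order payload");
        System.out.println(params.get("businessKey")); // prints order-42
    }
}
```

Rejecting an over-long key up front gives a clearer error than a truncation or insert failure from the database later on.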

### Async Saga Execution

Execute Sagas asynchronously so the caller is not blocked while the workflow runs. This requires `seata.saga.state-machine.enable-async=true`:

```java
@Service
public class AsyncOrderSagaService {

    private static final Logger log = LoggerFactory.getLogger(AsyncOrderSagaService.class);

    @Autowired
    private StateMachineEngine stateMachineEngine;

    @Async
    public CompletableFuture<StateMachineInstance> processOrderAsync(Order order) {
        Map<String, Object> startParams = new HashMap<>();
        startParams.put("order", order);
        startParams.put("businessKey", "order-" + order.getId());

        // Start async Saga execution; AsyncCallback has two methods, so it
        // cannot be written as a lambda
        StateMachineInstance instance = stateMachineEngine.startAsync(
            "OrderProcessingSaga",
            null,
            startParams,
            new AsyncCallback() {
                @Override
                public void onFinished(ProcessContext context, StateMachineInstance inst) {
                    log.info("Saga finished with status {}: {}", inst.getStatus(), inst.getId());
                }

                @Override
                public void onError(ProcessContext context, StateMachineInstance inst, Exception exp) {
                    log.error("Saga failed: {}", inst.getId(), exp);
                }
            }
        );

        return CompletableFuture.completedFuture(instance);
    }
}
```

## Service Task Implementation

### Saga Service Methods

Implement service methods that participate in Saga workflows. The bean names must match the `ServiceName` values in the state machine definition:

```java
@Component("orderService")
public class OrderSagaService {

    @Autowired
    private OrderRepository orderRepository;

    // Forward service method
    public Order createOrder(Map<String, Object> context) {
        Order order = (Order) context.get("order");

        // The return value reaches later states through the state's Output mapping
        return orderRepository.save(order);
    }

    // Compensation method
    public void cancelOrder(Map<String, Object> context) {
        Order order = (Order) context.get("createdOrder");
        if (order != null) {
            order.setStatus(OrderStatus.CANCELLED);
            orderRepository.save(order);
        }
    }
}

@Component("paymentService")
public class PaymentSagaService {

    @Autowired
    private PaymentRepository paymentRepository;

    @SuppressWarnings("unchecked")
    public Payment processPayment(Map<String, Object> context) {
        Map<String, Object> paymentData = (Map<String, Object>) context.get("payment");

        Payment payment = new Payment();
        payment.setAmount((BigDecimal) paymentData.get("amount"));
        payment.setCustomerId((String) paymentData.get("customerId"));
        payment.setStatus(PaymentStatus.COMPLETED);

        return paymentRepository.save(payment);
    }

    public void refundPayment(Map<String, Object> context) {
        // Expects the compensation state to pass the saved Payment as "payment"
        Payment payment = (Payment) context.get("payment");
        if (payment != null) {
            payment.setStatus(PaymentStatus.REFUNDED);
            paymentRepository.save(payment);
        }
    }
}
```

## Configuration Properties

### Saga Core Configuration

```java { .api }
public class SeataProperties {
    // Saga-specific properties
    private SagaProperties saga = new SagaProperties();

    public static class SagaProperties {
        // Enable Saga pattern support
        private boolean enabled = false;

        // DataSource reference for Saga state persistence
        private String dataSourceRef = "seataSagaDataSource";

        // State machine configuration
        private StateMachineProperties stateMachine = new StateMachineProperties();
    }
}
```

### State Machine Configuration

```java { .api }
@ConfigurationProperties(prefix = "seata.saga.state-machine")
public class StateMachineProperties {
    // Enable async execution
    private boolean enableAsync = false;

    // Thread pool configuration
    private SagaAsyncThreadPoolProperties asyncThreadPool = new SagaAsyncThreadPoolProperties();

    // State machine engine settings
    private String charset = "UTF-8";
    private boolean enableAutoDataSourceProxy = true;
    private String sagaJsonParser = "fastjson";
}
```

### Async Thread Pool Properties

```java { .api }
@ConfigurationProperties(prefix = "seata.saga.state-machine.async-thread-pool")
public class SagaAsyncThreadPoolProperties {
    // Core thread pool size (default: 1)
    private int corePoolSize = 1;

    // Maximum thread pool size (default: 20)
    private int maxPoolSize = 20;

    // Thread keep-alive time in seconds (default: 60)
    private int keepAliveTime = 60;

    // Work queue capacity
    private int queueCapacity = Integer.MAX_VALUE;

    // Thread name prefix
    private String threadNamePrefix = "saga-async-";
}
```

## Auto-Configuration Details

Saga pattern support is configured through `SeataSagaAutoConfiguration`. The bean signatures match those listed under Core Components:

```java { .api }
@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(prefix = "seata", name = {"enabled", "saga.enabled"}, havingValue = "true")
@AutoConfigureAfter({SeataCoreAutoConfiguration.class, SeataAutoConfiguration.class})
public class SeataSagaAutoConfiguration {

    @Bean
    @ConditionalOnBean(DataSource.class)
    @ConditionalOnMissingBean
    @ConfigurationProperties("seata.saga.state-machine")
    public StateMachineConfig dbStateMachineConfig(
        DataSource dataSource,
        @Qualifier("seataSagaDataSource") @Autowired(required = false) DataSource sagaDataSource,
        @Qualifier("seataSagaAsyncThreadPoolExecutor") @Autowired(required = false) ThreadPoolExecutor threadPoolExecutor,
        @Value("${spring.application.name:}") String applicationId,
        @Value("${seata.tx-service-group:}") String txServiceGroup
    );

    @Bean
    @ConditionalOnMissingBean(StateMachineEngine.class)
    public StateMachineEngine stateMachineEngine(StateMachineConfig stateMachineConfig);

    @Bean("seataSagaAsyncThreadPoolExecutor")
    @ConditionalOnProperty(prefix = "seata.saga.state-machine", name = "enable-async", havingValue = "true")
    public ThreadPoolExecutor sagaAsyncThreadPoolExecutor(
        SagaAsyncThreadPoolProperties properties
    );

    @Bean("seataSagaRejectedExecutionHandler")
    @ConditionalOnProperty(prefix = "seata.saga.state-machine", name = "enable-async", havingValue = "true")
    public RejectedExecutionHandler sagaRejectedExecutionHandler();
}
```

## Database Setup

### Saga State Tables

Create the required database tables for Saga state persistence:

```sql
-- State machine definition table
CREATE TABLE `seata_state_machine_def` (
  `id` varchar(32) NOT NULL COMMENT 'id',
  `name` varchar(128) NOT NULL COMMENT 'name',
  `tenant_id` varchar(32) NOT NULL COMMENT 'tenant_id',
  `app_name` varchar(32) NOT NULL COMMENT 'application name',
  `type` varchar(20) COMMENT 'state machine type',
  `comment_` varchar(255) COMMENT 'comment',
  `ver` varchar(16) NOT NULL COMMENT 'version',
  `gmt_create` timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) COMMENT 'create time',
  `status` varchar(2) NOT NULL COMMENT 'status(AC:active|IN:inactive)',
  `content` longtext COMMENT 'JSON content',
  `recover_strategy` varchar(16) COMMENT 'transaction recover strategy(Forward|Backward|Unknown)',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- State machine instance table
CREATE TABLE `seata_state_machine_inst` (
  `id` varchar(128) NOT NULL COMMENT 'id',
  `machine_id` varchar(32) NOT NULL COMMENT 'state machine definition id',
  `tenant_id` varchar(32) NOT NULL COMMENT 'tenant id',
  `parent_id` varchar(128) COMMENT 'parent id',
  `gmt_started` timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) COMMENT 'start time',
  `business_key` varchar(48) COMMENT 'business key',
  `start_params` longtext COMMENT 'start parameters',
  `gmt_end` timestamp(3) COMMENT 'end time',
  `excep` blob COMMENT 'exception',
  `end_params` longtext COMMENT 'end parameters',
  `status` varchar(2) COMMENT 'status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',
  `compensation_status` varchar(2) COMMENT 'compensation status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',
  `is_running` tinyint(1) COMMENT 'is running(0 no|1 yes)',
  `gmt_updated` timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3),
  PRIMARY KEY (`id`),
  KEY `state_machine_inst_business_key` (`business_key`),
  KEY `state_machine_inst_status` (`status`),
  KEY `state_machine_inst_recovery` (`gmt_updated`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- State instance table
CREATE TABLE `seata_state_inst` (
  `id` varchar(48) NOT NULL COMMENT 'id',
  `machine_inst_id` varchar(128) NOT NULL COMMENT 'state machine instance id',
  `name` varchar(128) NOT NULL COMMENT 'state name',
  `type` varchar(20) COMMENT 'state type',
  `service_name` varchar(128) COMMENT 'service name',
  `service_method` varchar(128) COMMENT 'method name',
  `service_type` varchar(16) COMMENT 'service type',
  `business_key` varchar(48) COMMENT 'business key',
  `state_id_compensated_for` varchar(50) COMMENT 'compensated state id',
  `state_id_retried_for` varchar(50) COMMENT 'retried state id',
  `gmt_started` timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) COMMENT 'start time',
  `is_for_update` tinyint(1) COMMENT 'is service for update',
  `input_params` longtext COMMENT 'input parameters',
  `output_params` longtext COMMENT 'output parameters',
  `status` varchar(2) NOT NULL COMMENT 'status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',
  `excep` blob COMMENT 'exception',
  `gmt_end` timestamp(3) COMMENT 'end time',
  `gmt_updated` timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3),
  PRIMARY KEY (`id`,`machine_inst_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
```

### DataSource Configuration

Configure a separate DataSource for Saga state persistence:

```java
@Configuration
public class SagaDataSourceConfig {

    @Bean("seataSagaDataSource")
    @ConfigurationProperties("seata.saga.datasource")
    public DataSource sagaDataSource() {
        return DataSourceBuilder.create().build();
    }
}
```

```properties
# Saga DataSource configuration
# DataSourceBuilder creates a HikariCP pool by default, which binds jdbc-url rather than url
seata.saga.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
seata.saga.datasource.jdbc-url=jdbc:mysql://localhost:3306/seata_saga
seata.saga.datasource.username=saga_user
seata.saga.datasource.password=saga_password
```
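
## Monitoring and Management

### Saga Instance Monitoring

Monitor Saga execution and state:

```java
@Service
public class SagaMonitoringService {

    @Autowired
    private StateMachineEngine stateMachineEngine;

    @Autowired
    @Qualifier("seataSagaDataSource")
    private DataSource sagaDataSource;

    public StateMachineInstance getSagaStatus(String instanceId) {
        return stateMachineEngine.getStateMachineConfig()
            .getStateLogStore()
            .getStateMachineInstance(instanceId);
    }

    public List<StateMachineInstance> getActiveSagas() {
        StateLogStore store = stateMachineEngine.getStateMachineConfig().getStateLogStore();

        // StateLogStore offers no query by status, so read the ids of running
        // instances from the instance table (default "seata_" prefix assumed)
        List<String> runningIds = new JdbcTemplate(sagaDataSource).queryForList(
            "SELECT id FROM seata_state_machine_inst WHERE status = ?",
            String.class, ExecutionStatus.RU.name());

        return runningIds.stream()
            .map(store::getStateMachineInstance)
            .collect(Collectors.toList());
    }
}
```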

### Saga Recovery and Retry

Handle failed Saga instances:

```java
@Service
public class SagaRecoveryService {

    private static final Logger log = LoggerFactory.getLogger(SagaRecoveryService.class);

    @Autowired
    private StateMachineEngine stateMachineEngine;

    @Autowired
    @Qualifier("seataSagaDataSource")
    private DataSource sagaDataSource;

    @Scheduled(fixedDelay = 60000) // Every minute
    public void recoverFailedSagas() {
        // StateLogStore offers no query by status, so read failed instance ids
        // from the instance table (default "seata_" prefix assumed)
        List<String> failedIds = new JdbcTemplate(sagaDataSource).queryForList(
            "SELECT id FROM seata_state_machine_inst WHERE status = ?",
            String.class, ExecutionStatus.FA.name());

        for (String instanceId : failedIds) {
            try {
                // Retry the failed Saga from the state where it stopped
                stateMachineEngine.forward(instanceId, Collections.emptyMap());
                log.info("Retried Saga instance: {}", instanceId);
            } catch (Exception e) {
                log.error("Failed to retry Saga instance: {}", instanceId, e);
            }
        }
    }
}
```
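
A scheduled job like the one above retries every failed instance on each run, with no upper bound. One way to cap that is to track attempts per instance, as in the sketch below. This is an assumption-laden illustration: `RetryBudget`, `tryAcquire`, and `reset` are hypothetical names, the counts live in memory only, and nothing here is a Seata API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: caps how often a scheduled job retries the same Saga instance.
final class RetryBudget {

    private final int maxAttempts;
    private final Map<String, Integer> attempts = new ConcurrentHashMap<>();

    RetryBudget(int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        this.maxAttempts = maxAttempts;
    }

    // Records one attempt and returns true while the instance is still within its budget.
    boolean tryAcquire(String instanceId) {
        int used = attempts.merge(instanceId, 1, Integer::sum);
        return used <= maxAttempts;
    }

    // Forgets an instance, for example after it has recovered.
    void reset(String instanceId) {
        attempts.remove(instanceId);
    }

    public static void main(String[] args) {
        RetryBudget budget = new RetryBudget(2);
        System.out.println(budget.tryAcquire("saga-1")); // prints true
        System.out.println(budget.tryAcquire("saga-1")); // prints true
        System.out.println(budget.tryAcquire("saga-1")); // prints false
    }
}
```

In the recovery loop, a call such as `if (!budget.tryAcquire(instanceId)) continue;` would skip instances that have used up their attempts so they can be flagged for manual handling. Because the counts are not persisted, they reset when the application restarts.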

## Error Handling and Compensation

### Compensation Logic

Implement a compensation method for every forward step that changes state, so a failed Saga can undo the steps that already completed:

```java
@Component("inventoryService")
public class InventorySagaService {

    @Autowired
    private InventoryRepository inventoryRepository;

    @SuppressWarnings("unchecked")
    public InventoryResponse updateInventory(Map<String, Object> context) {
        List<OrderItem> items = (List<OrderItem>) context.get("items");

        for (OrderItem item : items) {
            // Reduce inventory
            inventoryRepository.reduceStock(item.getProductId(), item.getQuantity());
        }

        // The compensation state must receive the same items through its own
        // Input mapping (as "inventoryUpdates")
        return new InventoryResponse("SUCCESS");
    }

    @SuppressWarnings("unchecked")
    public void restoreInventory(Map<String, Object> context) {
        List<OrderItem> items = (List<OrderItem>) context.get("inventoryUpdates");

        if (items != null) {
            for (OrderItem item : items) {
                // Restore inventory (compensation)
                inventoryRepository.increaseStock(item.getProductId(), item.getQuantity());
            }
        }
    }
}
```
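
Compensation methods may be invoked more than once, for example when a retry follows a timeout, so it is generally safer to make them idempotent. The in-memory sketch below shows one way to do that by remembering which business keys currently have a reduction applied. `IdempotentRestoreSketch` and its methods are hypothetical, it handles one product per business key for brevity, and a real service would keep this record in its database.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory stock ledger showing an idempotent compensation.
final class IdempotentRestoreSketch {

    private final Map<String, Integer> stock = new HashMap<>();
    // Business keys whose stock reduction is currently applied.
    private final Set<String> applied = new HashSet<>();

    synchronized void addStock(String productId, int quantity) {
        stock.merge(productId, quantity, Integer::sum);
    }

    synchronized int stockOf(String productId) {
        return stock.getOrDefault(productId, 0);
    }

    // Forward action: reduces stock at most once per business key.
    synchronized void reduce(String businessKey, String productId, int quantity) {
        if (applied.contains(businessKey)) {
            return; // duplicate forward call, nothing to do
        }
        int available = stockOf(productId);
        if (available < quantity) {
            throw new IllegalStateException("insufficient stock for " + productId);
        }
        stock.put(productId, available - quantity);
        applied.add(businessKey);
    }

    // Compensation: restores stock only if this key has an applied reduction,
    // so repeated calls and calls for unknown keys leave the stock unchanged.
    synchronized void restore(String businessKey, String productId, int quantity) {
        if (applied.remove(businessKey)) {
            stock.merge(productId, quantity, Integer::sum);
        }
    }

    public static void main(String[] args) {
        IdempotentRestoreSketch ledger = new IdempotentRestoreSketch();
        ledger.addStock("sku-1", 10);
        ledger.reduce("order-42", "sku-1", 3);
        ledger.restore("order-42", "sku-1", 3);
        ledger.restore("order-42", "sku-1", 3); // repeated compensation is a no-op
        System.out.println(ledger.stockOf("sku-1")); // prints 10
    }
}
```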

## Performance Optimization

### Thread Pool Tuning

Optimize async execution performance:

```properties
# Saga async thread pool optimization
seata.saga.state-machine.async-thread-pool.core-pool-size=10
seata.saga.state-machine.async-thread-pool.max-pool-size=50
seata.saga.state-machine.async-thread-pool.keep-alive-time=300
seata.saga.state-machine.async-thread-pool.queue-capacity=1000
```
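
When choosing these values, keep in mind how `java.util.concurrent.ThreadPoolExecutor` grows: threads beyond the core size are created only once the work queue is full. With an unbounded queue, such as the `Integer.MAX_VALUE` default listed under Async Thread Pool Properties, the maximum pool size therefore never takes effect. The sketch below demonstrates this with the JDK alone; `QueueVersusMaxPoolSize` and `threadsCreated` are names made up for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Demonstrates that a pool only grows past its core size once its queue is full.
final class QueueVersusMaxPoolSize {

    // Submits `tasks` blocking tasks and reports how many threads the pool created.
    static int threadsCreated(int core, int max, int queueCapacity, int tasks) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(queueCapacity));
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getPoolSize();
        } finally {
            release.countDown(); // let all tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Unbounded queue: extra tasks wait in the queue, the pool stays at core size.
        System.out.println(threadsCreated(1, 4, Integer.MAX_VALUE, 5)); // prints 1
        // Queue of one: the pool grows to its maximum once the queue is full.
        System.out.println(threadsCreated(1, 4, 1, 5)); // prints 4
    }
}
```

This is why the example configuration above pairs a larger `max-pool-size` with a bounded `queue-capacity`.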

### Database Connection Optimization

```properties
# Saga DataSource connection pool
# The pool is bound directly under seata.saga.datasource, so HikariCP settings take no "hikari." prefix
seata.saga.datasource.maximum-pool-size=10
seata.saga.datasource.minimum-idle=2
seata.saga.datasource.connection-timeout=30000
```

## Troubleshooting

### Common Issues

1. **Saga DataSource Not Found**: Ensure the `seataSagaDataSource` bean is defined and matches `seata.saga.data-source-ref`
2. **State Machine Definition Errors**: Validate the JSON syntax and check that every `Next` and `CompensateState` names an existing state
3. **Service Method Not Found**: Verify that service bean names and method signatures match the state machine definition
4. **Async Execution Issues**: Check the thread pool configuration and that `seata.saga.state-machine.enable-async` is `true`
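
For the "Service Method Not Found" case, a small startup check can catch a misspelled `ServiceMethod` before a Saga runs. The reflection-based sketch below assumes service methods take a single `Map` argument, as the examples in this document do; `ServiceMethodCheck`, `hasMapMethod`, and `SampleOrderService` are hypothetical names, and the check is not how Seata itself resolves methods.

```java
import java.lang.reflect.Method;
import java.util.Map;

// Hypothetical startup check, not a Seata API.
final class ServiceMethodCheck {

    // Returns true if `bean` exposes a public method with the given name
    // whose only parameter is declared as java.util.Map.
    static boolean hasMapMethod(Object bean, String methodName) {
        for (Method method : bean.getClass().getMethods()) {
            if (method.getName().equals(methodName)
                    && method.getParameterCount() == 1
                    && method.getParameterTypes()[0] == Map.class) {
                return true;
            }
        }
        return false;
    }

    // Stand-in for a Saga service bean, used only by this example.
    static final class SampleOrderService {
        public void cancelOrder(Map<String, Object> context) {
            // no-op
        }
    }

    public static void main(String[] args) {
        Object bean = new SampleOrderService();
        System.out.println(hasMapMethod(bean, "cancelOrder")); // prints true
        System.out.println(hasMapMethod(bean, "refundOrder")); // prints false
    }
}
```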

### Debug Configuration

```properties
# Debug Saga execution
logging.level.io.seata.saga=DEBUG
logging.level.io.seata.saga.engine=DEBUG
logging.level.io.seata.saga.statelang=DEBUG

# Monitor state machine execution
seata.saga.state-machine.enable-log-details=true
```