Reactive transaction management provides transaction support for reactive applications using Project Reactor. The TransactionalOperator and ReactiveTransactionManager enable transactional boundaries for Mono and Flux publishers.
Reactive operator for transaction demarcation. Provides functional composition for transactional reactive streams.
/**
* Reactive operator for transaction demarcation.
* Provides functional composition for transactional Mono and Flux publishers.
*
* Instances are obtained through the static create(...) factory methods. They are
* applied either by wrapping an existing publisher with transactional(...) or by
* supplying a callback to execute(...).
*/
public interface TransactionalOperator {
/**
* Create a TransactionalOperator for the given transaction manager.
*
* @param transactionManager the reactive transaction manager to create the operator for
* @return the TransactionalOperator for that manager
*/
static TransactionalOperator create(ReactiveTransactionManager transactionManager);
/**
* Create a TransactionalOperator for the given transaction manager
* using a custom transaction definition.
*
* @param transactionManager the reactive transaction manager to create the operator for
* @param transactionDefinition the custom transaction definition to use
* @return the TransactionalOperator for that manager and definition
*/
static TransactionalOperator create(
ReactiveTransactionManager transactionManager,
TransactionDefinition transactionDefinition
);
/**
* Wrap a Flux in a transaction. The transaction is created before
* subscription and committed/rolled back after completion.
*
* @param <T> the element type of the Flux
* @param flux the Flux to wrap
* @return the transactional Flux
*/
<T> Flux<T> transactional(Flux<T> flux);
/**
* Wrap a Mono in a transaction. The transaction is created before
* subscription and committed/rolled back after completion.
*
* @param <T> the value type of the Mono
* @param mono the Mono to wrap
* @return the transactional Mono
*/
<T> Mono<T> transactional(Mono<T> mono);
/**
* Execute a callback within a transaction.
*
* The result is always exposed as a Flux, even when the callback yields a
* single element; callers that need a Mono convert it (for example with single()).
*
* @param <T> the result element type
* @param action the callback that returns the Publisher to run
* @return a Flux of the callback's results
*/
<T> Flux<T> execute(TransactionCallback<T> action);
}Usage Examples:
import org.springframework.transaction.reactive.TransactionalOperator;
import org.springframework.transaction.ReactiveTransactionManager;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;
/**
 * Example service showing the different ways a TransactionalOperator can be
 * applied to Mono and Flux pipelines.
 *
 * NOTE(review): logger, inventoryService and paymentService are used below but are
 * not declared in this snippet; they are assumed to be defined elsewhere.
 */
@Service
public class ReactiveOrderService {

    private final ReactiveTransactionManager transactionManager;
    private final TransactionalOperator transactionalOperator;
    private final ReactiveOrderRepository orderRepository;

    /**
     * @param transactionManager manager used for the default operator and for any
     *                           custom operators created later
     * @param orderRepository    repository used to persist orders
     */
    public ReactiveOrderService(
            ReactiveTransactionManager transactionManager,
            ReactiveOrderRepository orderRepository) {
        // Keep the manager itself: criticalOperation builds a second operator from it.
        this.transactionManager = transactionManager;
        this.transactionalOperator = TransactionalOperator.create(transactionManager);
        this.orderRepository = orderRepository;
    }

    /**
     * Wrap a Mono in a transaction using the as() operator.
     *
     * @param order the order to persist
     * @return the saved order
     */
    public Mono<Order> createOrder(Order order) {
        return orderRepository.save(order)
                .doOnNext(saved -> logger.info("Order saved: {}", saved.getId()))
                .as(transactionalOperator::transactional);
    }

    /**
     * Wrap a Flux in a transaction.
     *
     * @param orders the orders to persist
     * @return the saved orders
     */
    public Flux<Order> createOrders(List<Order> orders) {
        return Flux.fromIterable(orders)
                .flatMap(orderRepository::save)
                .as(transactionalOperator::transactional);
    }

    /**
     * Use execute for complex operations. execute() returns a Flux, so single()
     * converts the one expected result back to a Mono.
     *
     * @param request the incoming order request
     * @return the result of processing the order
     */
    public Mono<OrderResult> processOrder(OrderRequest request) {
        return transactionalOperator.execute(status -> {
            return orderRepository.save(new Order(request))
                    .flatMap(order -> inventoryService.reserveItems(order)
                            .then(paymentService.charge(order))
                            .thenReturn(new OrderResult(order)));
        }).single();
    }

    /**
     * Run a save under a custom transaction definition (serializable isolation,
     * timeout value 30).
     *
     * @param order the order to persist
     * @return the saved order
     */
    public Mono<Order> criticalOperation(Order order) {
        DefaultTransactionDefinition def = new DefaultTransactionDefinition();
        def.setIsolationLevel(TransactionDefinition.ISOLATION_SERIALIZABLE);
        def.setTimeout(30);
        TransactionalOperator customOperator =
                TransactionalOperator.create(transactionManager, def);
        return orderRepository.save(order)
                .as(customOperator::transactional);
    }
}
Central interface for reactive transaction management. Parallels PlatformTransactionManager for reactive streams.
/**
* Central interface for reactive transaction management.
* This is the reactive equivalent of PlatformTransactionManager.
*
* An instance of this interface is what TransactionalOperator.create(...) accepts.
* All three operations return a Mono rather than a direct result.
*/
public interface ReactiveTransactionManager extends TransactionManager {
/**
* Get a reactive transaction according to the specified definition,
* or create a new one if necessary.
*
* @param definition TransactionDefinition instance (can be null for defaults)
* @return Mono emitting the transaction object
* @throws TransactionException in case of lookup, creation, or system errors
*/
Mono<ReactiveTransaction> getReactiveTransaction(TransactionDefinition definition)
throws TransactionException;
/**
* Commit the given reactive transaction.
* If the transaction has been marked rollback-only, perform a rollback.
*
* @param transaction the transaction object returned by getReactiveTransaction
* @return Mono that completes when commit finishes
* @throws TransactionException in case of commit or system errors
*/
Mono<Void> commit(ReactiveTransaction transaction) throws TransactionException;
/**
* Rollback the given reactive transaction.
*
* @param transaction the transaction object returned by getReactiveTransaction
* @return Mono that completes when rollback finishes
* @throws TransactionException in case of system errors
*/
Mono<Void> rollback(ReactiveTransaction transaction) throws TransactionException;
}Usage Example:
/**
 * Configuration class that registers the ReactiveTransactionManager bean.
 */
@Configuration
public class ReactiveTransactionConfig {

    /**
     * Builds an R2dbcTransactionManager around the supplied connection factory.
     *
     * @param connectionFactory the connection factory handed to the manager
     * @return the transaction manager bean
     */
    @Bean
    public ReactiveTransactionManager transactionManager(ConnectionFactory connectionFactory) {
        R2dbcTransactionManager manager = new R2dbcTransactionManager(connectionFactory);
        return manager;
    }
}
/**
 * Example of driving a ReactiveTransactionManager directly instead of going
 * through TransactionalOperator.
 */
@Service
public class ReactiveUserService {

    private final ReactiveTransactionManager transactionManager;
    private final ReactiveUserRepository userRepository;

    /**
     * @param transactionManager manager used to begin, commit and roll back
     * @param userRepository     repository used to persist users
     */
    public ReactiveUserService(
            ReactiveTransactionManager transactionManager,
            ReactiveUserRepository userRepository) {
        this.transactionManager = transactionManager;
        this.userRepository = userRepository;
    }

    /**
     * Saves a user inside a manually managed transaction.
     *
     * The transaction is committed when the save completes (with or without a
     * value) and rolled back when the save fails or the subscription is cancelled.
     * A failing commit is propagated as-is and does not trigger an extra rollback.
     *
     * @param user the user to persist
     * @return the saved user
     */
    public Mono<User> createUserManually(User user) {
        DefaultTransactionDefinition def = new DefaultTransactionDefinition();
        Mono<User> saved = Mono.usingWhen(
                transactionManager.getReactiveTransaction(def),
                transaction -> userRepository.save(user),
                transactionManager::commit,
                (transaction, error) -> transactionManager.rollback(transaction),
                transactionManager::rollback);
        // The manager looks up its transaction context in the Reactor Context, so the
        // context (and its holder) must be present for getReactiveTransaction to work.
        return saved
                .contextWrite(TransactionContextManager.getOrCreateContext())
                .contextWrite(TransactionContextManager.getOrCreateContextHolder());
    }
}
Representation of an ongoing reactive transaction with status and control methods.
/**
* Representation of the status of a reactive transaction.
* Extends TransactionExecution to provide status query methods.
*
* This listing declares no methods of its own; everything callers use
* (status queries and setRollbackOnly()) comes from TransactionExecution.
* An instance is what a TransactionCallback receives as its argument.
*/
public interface ReactiveTransaction extends TransactionExecution {
// Inherits methods from TransactionExecution:
// - String getTransactionName()
// - boolean hasTransaction()
// - boolean isNewTransaction()
// - boolean isNested()
// - boolean isReadOnly()
// - void setRollbackOnly()
// - boolean isRollbackOnly()
// - boolean isCompleted()
}Usage Example:
/**
 * Example of querying and controlling the ReactiveTransaction passed to a
 * TransactionalOperator.execute() callback.
 *
 * NOTE(review): order, orderRepository and isValidOrder are used below but are not
 * declared in this snippet; they are assumed to be defined elsewhere.
 */
@Service
public class ReactiveTransactionStatusService {

    /**
     * Saves an order and rolls the transaction back when the saved order is invalid.
     *
     * @param operator the operator that provides the transaction
     * @return the saved order, or an InvalidOrderException error
     */
    public Mono<Order> processOrder(TransactionalOperator operator) {
        // The callback parameter is already a ReactiveTransaction, so no cast is needed.
        return operator.execute(transaction -> {
            // Query transaction status
            boolean isNew = transaction.isNewTransaction();
            boolean isReadOnly = transaction.isReadOnly();
            return orderRepository.save(order)
                    .flatMap(saved -> {
                        if (!isValidOrder(saved)) {
                            // Mark for rollback
                            transaction.setRollbackOnly();
                            return Mono.error(new InvalidOrderException());
                        }
                        return Mono.just(saved);
                    });
        }).single();
    }
}
Callback interface for reactive transactional code.
/**
* Callback interface for reactive transactional code.
* Used with TransactionalOperator.execute().
*
* Being a functional interface, it can be supplied as a lambda that takes the
* transaction status and returns the publisher to run.
*
* @param <T> the result type
*/
@FunctionalInterface
public interface TransactionCallback<T> {
/**
* Execute within a reactive transaction.
* Return a Publisher (Mono or Flux) for the operations to perform.
*
* @param status the reactive transaction status
* @return a Publisher with the result
*/
Publisher<T> doInTransaction(ReactiveTransaction status);
}Usage Example:
/**
 * Example of running a multi-element pipeline through
 * TransactionalOperator.execute().
 */
@Service
public class ReactiveCallbackService {

    private final TransactionalOperator operator;

    /**
     * Saves one order per request and reserves inventory for each saved order,
     * all inside a single execute() callback.
     *
     * @param requests the order requests to process
     * @return the saved orders
     */
    public Flux<Order> processOrders(List<OrderRequest> requests) {
        return operator.execute(status ->
                Flux.fromIterable(requests).flatMap(this::saveAndReserve));
    }

    /** Persists the order built from the request, then reserves inventory for it. */
    private Mono<Order> saveAndReserve(OrderRequest request) {
        Order order = new Order(request);
        return orderRepository.save(order)
                .flatMap(saved -> inventoryService.reserve(saved).thenReturn(saved));
    }
}
Abstract base class for reactive transaction manager implementations.
/**
* Abstract base class for ReactiveTransactionManager implementations.
* Provides the template method pattern for reactive transaction management.
*
* Subclasses must implement the abstract do* methods below; the non-abstract
* do* methods marked "(optional)" may be overridden when needed.
*
* NOTE(review): the configuration setters listed here mirror those of the blocking
* AbstractPlatformTransactionManager - confirm against the Spring Javadoc that the
* reactive base class actually exposes them.
*/
public abstract class AbstractReactiveTransactionManager
implements ReactiveTransactionManager, ConfigurableTransactionManager {
/**
* Set the default timeout in seconds.
*
* @param defaultTimeout the timeout in seconds
*/
public void setDefaultTimeout(int defaultTimeout);
/**
* Set whether nested transactions are allowed.
*
* @param nestedTransactionAllowed true to allow nested transactions
*/
public void setNestedTransactionAllowed(boolean nestedTransactionAllowed);
/**
* Set whether existing transactions should be validated before participating.
*
* @param validateExistingTransaction true to validate before participating
*/
public void setValidateExistingTransaction(boolean validateExistingTransaction);
/**
* Set whether to globally mark an existing transaction as rollback-only
* after a participating transaction failed.
*
* @param globalRollbackOnParticipationFailure true to mark the existing transaction rollback-only
*/
public void setGlobalRollbackOnParticipationFailure(
boolean globalRollbackOnParticipationFailure
);
/**
* Set whether to fail early on commit if the transaction has been
* marked rollback-only globally.
*
* @param failEarlyOnGlobalRollbackOnly true to fail early
*/
public void setFailEarlyOnGlobalRollbackOnly(boolean failEarlyOnGlobalRollbackOnly);
// Template methods to be implemented by subclasses
/**
* Return the current transaction object.
*
* The returned object is the opaque "transaction" argument later passed to
* isExistingTransaction, doBegin, doSuspend, doResume and doCleanupAfterCompletion.
*
* @param synchronizationManager the synchronization manager for the current transaction
* @return the transaction object
* @throws TransactionException in case of errors
*/
protected abstract Object doGetTransaction(
TransactionSynchronizationManager synchronizationManager
) throws TransactionException;
/**
* Check if the given transaction object indicates an existing transaction.
*
* @param transaction the transaction object returned by doGetTransaction
* @return true if there is an existing transaction
* @throws TransactionException in case of errors
*/
protected abstract boolean isExistingTransaction(Object transaction)
throws TransactionException;
/**
* Begin a new transaction with the given definition.
*
* @param synchronizationManager the synchronization manager for the current transaction
* @param transaction the transaction object returned by doGetTransaction
* @param definition the definition describing the transaction to begin
* @return Mono that completes when the transaction has begun
* @throws TransactionException in case of errors
*/
protected abstract Mono<Void> doBegin(
TransactionSynchronizationManager synchronizationManager,
Object transaction,
TransactionDefinition definition
) throws TransactionException;
/**
* Perform an actual commit of the given transaction.
*
* @param synchronizationManager the synchronization manager for the current transaction
* @param status the status representation of the transaction
* @return Mono that completes when the commit finishes
* @throws TransactionException in case of errors
*/
protected abstract Mono<Void> doCommit(
TransactionSynchronizationManager synchronizationManager,
GenericReactiveTransaction status
) throws TransactionException;
/**
* Perform an actual rollback of the given transaction.
*
* @param synchronizationManager the synchronization manager for the current transaction
* @param status the status representation of the transaction
* @return Mono that completes when the rollback finishes
* @throws TransactionException in case of errors
*/
protected abstract Mono<Void> doRollback(
TransactionSynchronizationManager synchronizationManager,
GenericReactiveTransaction status
) throws TransactionException;
/**
* Suspend the resources of the current transaction (optional).
*
* @param synchronizationManager the synchronization manager for the current transaction
* @param transaction the transaction object returned by doGetTransaction
* @return Mono emitting the suspended resources (later handed to doResume)
* @throws TransactionException in case of errors
*/
protected Mono<Object> doSuspend(
TransactionSynchronizationManager synchronizationManager,
Object transaction
) throws TransactionException;
/**
* Resume the resources of the current transaction (optional).
*
* @param synchronizationManager the synchronization manager for the current transaction
* @param transaction the transaction object returned by doGetTransaction
* @param suspendedResources the resources to resume
* @return Mono that completes when the resources are resumed
* @throws TransactionException in case of errors
*/
protected Mono<Void> doResume(
TransactionSynchronizationManager synchronizationManager,
Object transaction,
Object suspendedResources
) throws TransactionException;
/**
* Set the given transaction rollback-only (optional).
*
* @param synchronizationManager the synchronization manager for the current transaction
* @param status the status representation of the transaction
* @return Mono that completes when the transaction is marked
* @throws TransactionException in case of errors
*/
protected Mono<Void> doSetRollbackOnly(
TransactionSynchronizationManager synchronizationManager,
GenericReactiveTransaction status
) throws TransactionException;
/**
* Clean up after completion (optional).
*
* @param synchronizationManager the synchronization manager for the current transaction
* @param transaction the transaction object returned by doGetTransaction
* @return Mono that completes when cleanup finishes
*/
protected Mono<Void> doCleanupAfterCompletion(
TransactionSynchronizationManager synchronizationManager,
Object transaction
);
}Usage Example (Custom Reactive Transaction Manager):
/**
 * Sample transaction manager that delegates every template method of
 * AbstractReactiveTransactionManager to a ReactiveResourceManager.
 */
public class CustomReactiveTransactionManager extends AbstractReactiveTransactionManager {

    private final ReactiveResourceManager resourceManager;

    /**
     * @param resourceManager the resource manager all operations are delegated to
     */
    public CustomReactiveTransactionManager(ReactiveResourceManager resourceManager) {
        this.resourceManager = resourceManager;
        setNestedTransactionAllowed(true);
    }

    /** Creates a fresh transaction object carrying the manager's resource holder. */
    @Override
    protected Object doGetTransaction(TransactionSynchronizationManager syncManager) {
        CustomReactiveTransactionObject created = new CustomReactiveTransactionObject();
        created.setResourceHolder(resourceManager.getResourceHolder());
        return created;
    }

    /** Reports whether the transaction object already has a transaction. */
    @Override
    protected boolean isExistingTransaction(Object transaction) {
        return ((CustomReactiveTransactionObject) transaction).hasTransaction();
    }

    /**
     * Begins a transaction with the definition's isolation level and timeout and
     * flags the transaction object as new once that succeeds.
     */
    @Override
    protected Mono<Void> doBegin(
            TransactionSynchronizationManager syncManager,
            Object transaction,
            TransactionDefinition definition) {
        CustomReactiveTransactionObject target = (CustomReactiveTransactionObject) transaction;
        int isolationLevel = definition.getIsolationLevel();
        int timeout = definition.getTimeout();
        Mono<Void> begun = resourceManager.beginTransaction(isolationLevel, timeout);
        return begun.doOnSuccess(ignored -> target.setNewTransaction(true));
    }

    /** Commits through the resource manager. */
    @Override
    protected Mono<Void> doCommit(
            TransactionSynchronizationManager syncManager,
            GenericReactiveTransaction status) {
        return resourceManager.commit();
    }

    /** Rolls back through the resource manager. */
    @Override
    protected Mono<Void> doRollback(
            TransactionSynchronizationManager syncManager,
            GenericReactiveTransaction status) {
        return resourceManager.rollback();
    }

    /** Releases the connection once the transaction has completed. */
    @Override
    protected Mono<Void> doCleanupAfterCompletion(
            TransactionSynchronizationManager syncManager,
            Object transaction) {
        return resourceManager.releaseConnection();
    }
}
Context management for reactive transactions propagated through Reactor's Context.
/**
* Context holder for reactive transactions.
* Provides access to current transaction state in reactive pipelines.
*
* Instances are obtained via TransactionContextManager.currentContext().
*/
public class TransactionContext {
/**
* Return the transaction name.
*
* @return the transaction name
*/
public String getName();
/**
* Return whether the transaction is read-only.
*
* @return true if the transaction is read-only
*/
public boolean isReadOnly();
/**
* Return the isolation level.
*
* @return the isolation level (boxed Integer; presumably null when none is set - TODO confirm)
*/
public Integer getIsolationLevel();
/**
* Return whether a transaction is currently active.
*
* @return true if a transaction is active
*/
public boolean isActualTransactionActive();
}
/**
* Manager for reactive transaction contexts.
* Provides static methods for accessing transaction context in reactive streams.
*
* All members are static; the class is abstract and is used only through them.
*/
public abstract class TransactionContextManager {
/**
* Obtain the current TransactionContext from the Reactor context.
*
* @return Mono emitting the current TransactionContext
*/
public static Mono<TransactionContext> currentContext();
/**
* Create or retrieve the transaction context in the Reactor Context.
*
* @return a function mapping a Reactor Context to one carrying the transaction
* context (presumably applied with a context-writing operator - TODO confirm)
*/
public static Function<Context, Context> getOrCreateContext();
/**
* Create or retrieve the transaction context holder in the Reactor Context.
*
* @return a function mapping a Reactor Context to one carrying the transaction
* context holder (presumably applied with a context-writing operator - TODO confirm)
*/
public static Function<Context, Context> getOrCreateContextHolder();
}Usage Example:
/**
 * Example of reading the current TransactionContext from inside a reactive pipeline.
 *
 * NOTE(review): orderRepository and logger are used below but are not declared in
 * this snippet; they are assumed to be defined elsewhere.
 */
@Service
public class TransactionContextService {

    /**
     * Returns the current transaction name, or "No transaction" when the pipeline
     * is not running inside a transaction.
     *
     * @return the transaction name or the fallback text
     */
    public Mono<String> getCurrentTransactionName() {
        return TransactionContextManager.currentContext()
                .map(TransactionContext::getName)
                // Without a transaction context, currentContext() signals
                // NoTransactionException rather than completing empty.
                .onErrorReturn(
                        org.springframework.transaction.NoTransactionException.class,
                        "No transaction")
                .defaultIfEmpty("No transaction");
    }

    /**
     * Saves a new order and logs the name, read-only flag and isolation level of
     * the current transaction context before returning the order.
     *
     * @return the saved order
     */
    public Mono<Order> processWithContext() {
        return orderRepository.save(new Order())
                .flatMap(order -> {
                    return TransactionContextManager.currentContext()
                            .doOnNext(ctx -> {
                                logger.info("Transaction name: {}", ctx.getName());
                                logger.info("Read-only: {}", ctx.isReadOnly());
                                logger.info("Isolation: {}", ctx.getIsolationLevel());
                            })
                            .thenReturn(order);
                });
    }
}
Callback interface for reactive transaction lifecycle events.
/**
* Reactive transaction synchronization callbacks.
* Allows reactive operations during transaction lifecycle.
*
* Every callback is a default method returning Mono<Void>, so implementations
* override only the phases they care about.
*/
public interface TransactionSynchronization {
/**
* Suspend this synchronization.
*
* @return Mono that completes when suspension is done
*/
default Mono<Void> suspend();
/**
* Resume this synchronization.
*
* @return Mono that completes when resumption is done
*/
default Mono<Void> resume();
/**
* Invoked before transaction commit (within transaction boundaries).
*
* @param readOnly whether the transaction is read-only
* @return Mono that completes when the callback is done
*/
default Mono<Void> beforeCommit(boolean readOnly);
/**
* Invoked before transaction completion (commit or rollback).
*
* @return Mono that completes when the callback is done
*/
default Mono<Void> beforeCompletion();
/**
* Invoked after transaction commit.
*
* @return Mono that completes when the callback is done
*/
default Mono<Void> afterCommit();
/**
* Invoked after transaction completion (commit or rollback).
*
* @param status the completion status, one of the STATUS_* constants below
* @return Mono that completes when the callback is done
*/
default Mono<Void> afterCompletion(int status);
// Status constants (values passed to afterCompletion)
int STATUS_COMMITTED = 0;
int STATUS_ROLLED_BACK = 1;
int STATUS_UNKNOWN = 2;
}Usage Example:
/**
 * Synchronization that logs before commit and counts committed and rolled-back
 * transactions in a MeterRegistry.
 */
public class MetricsReactiveSynchronization implements TransactionSynchronization {

    private final MeterRegistry registry;

    /** Logs the upcoming commit together with the read-only flag. */
    @Override
    public Mono<Void> beforeCommit(boolean readOnly) {
        return Mono.fromRunnable(() -> {
            logger.info("About to commit transaction, readOnly: {}", readOnly);
        });
    }

    /** Increments the committed-transactions counter. */
    @Override
    public Mono<Void> afterCommit() {
        return Mono.fromRunnable(
                () -> registry.counter("reactive.transactions.committed").increment());
    }

    /** Increments the rolled-back counter when the status is STATUS_ROLLED_BACK. */
    @Override
    public Mono<Void> afterCompletion(int status) {
        return Mono.fromRunnable(() -> {
            if (status != STATUS_ROLLED_BACK) {
                return;
            }
            registry.counter("reactive.transactions.rolledback").increment();
        });
    }
}
@Service
public class ReactiveUserService {

    private final TransactionalOperator operator;
    private final ReactiveUserRepository userRepository;

    /**
     * @param transactionManager manager the operator is created for
     * @param userRepository     repository used to persist users
     */
    public ReactiveUserService(
            ReactiveTransactionManager transactionManager,
            ReactiveUserRepository userRepository) {
        this.userRepository = userRepository;
        this.operator = TransactionalOperator.create(transactionManager);
    }

    /**
     * Saves a single user inside a transaction.
     *
     * @param user the user to persist
     * @return the saved user
     */
    public Mono<User> createUser(User user) {
        Mono<User> saved = userRepository.save(user);
        return operator.transactional(saved);
    }

    /**
     * Saves all given users inside one transaction.
     *
     * @param users the users to persist
     * @return the saved users
     */
    public Flux<User> createUsers(List<User> users) {
        Flux<User> saved = Flux.fromIterable(users).flatMap(userRepository::save);
        return operator.transactional(saved);
    }
}
@Service
public class ReactiveOrderService {

    private final TransactionalOperator operator;

    /**
     * Runs the whole order workflow (save, reserve inventory, charge payment,
     * send confirmation) inside one execute() callback. Any failure marks the
     * transaction rollback-only and is rethrown as an OrderProcessingException.
     *
     * @param request the incoming order request
     * @return the result of the processed order
     */
    public Mono<OrderResult> processOrder(OrderRequest request) {
        Flux<OrderResult> outcome = operator.execute(status ->
                orderRepository.save(new Order(request))
                        .flatMap(this::reserveInventory)
                        .flatMap(this::chargePayment)
                        .flatMap(this::confirm)
                        .onErrorResume(error -> {
                            status.setRollbackOnly();
                            return Mono.error(new OrderProcessingException(error));
                        }));
        return outcome.single();
    }

    /** Reserves inventory for the order's items, then passes the order on. */
    private Mono<Order> reserveInventory(Order order) {
        return inventoryService.reserve(order.getItems()).thenReturn(order);
    }

    /** Charges the order total, then passes the order on. */
    private Mono<Order> chargePayment(Order order) {
        return paymentService.charge(order.getTotal()).thenReturn(order);
    }

    /** Sends the confirmation, then wraps the order in its result. */
    private Mono<OrderResult> confirm(Order order) {
        return notificationService.sendConfirmation(order).thenReturn(new OrderResult(order));
    }
}
@Service
public class CustomReactiveTransactionService {

    private final ReactiveTransactionManager transactionManager;

    /**
     * Saves the order under a dedicated transaction definition: REQUIRES_NEW
     * propagation, SERIALIZABLE isolation, timeout value 30, not read-only.
     *
     * @param order the order to persist
     * @return the saved order
     */
    public Mono<Order> criticalOperation(Order order) {
        TransactionalOperator operator =
                TransactionalOperator.create(transactionManager, criticalDefinition());
        return operator.transactional(orderRepository.save(order));
    }

    /** Builds the custom transaction definition used by criticalOperation. */
    private static DefaultTransactionDefinition criticalDefinition() {
        DefaultTransactionDefinition definition = new DefaultTransactionDefinition();
        definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
        definition.setIsolationLevel(TransactionDefinition.ISOLATION_SERIALIZABLE);
        definition.setTimeout(30);
        definition.setReadOnly(false);
        return definition;
    }
}
@Service
public class HybridService {
private final TransactionalOperator reactiveOperator;
private final TransactionTemplate blockingTemplate;
// Process with reactive transaction
public Mono<Order> processReactive(Order order) {
    // Build the save pipeline first, then hand it to the reactive operator.
    Mono<Order> saved = orderRepository.save(order);
    return reactiveOperator.transactional(saved);
}
// Process with blocking transaction
public Order processBlocking(Order order) {
    // The template runs the callback inside a blocking transaction and returns its result.
    return blockingTemplate.execute(status -> blockingOrderRepository.save(order));
}
}
- TransactionalOperator is the reactive equivalent of TransactionTemplate.
- Use .as(operator::transactional) for cleaner syntax with method references.
- Use single() or singleOrEmpty() when using execute(), which returns a Flux.
- Mark a transaction for rollback with status.setRollbackOnly().