Spring Cloud Commons provides foundational abstractions and utilities for service discovery, load balancing, circuit breakers, and cloud-native application development.
—
Reactive programming support provides enhanced reactive utilities and operators specifically designed for cloud-native applications. This includes custom Flux operators and reactive versions of all core Spring Cloud Commons components.
Enhanced Flux with additional operators for cloud scenarios.
/**
* Extended Flux with additional operators for cloud scenarios.
* Only the static {@code firstNonEmpty} factories are listed here; their bodies are omitted in this reference.
*
* @param <T> the element type of the sequence
*/
public abstract class CloudFlux<T> extends Flux<T> {
/**
* Lets the supplied Publishers compete and relays the first one that turns out to be non-empty.
* NOTE(review): how an error, or the case where every source completes empty, is signalled is not visible in this listing - confirm against the implementation.
* @param publishers The Publishers to compete
* @param <I> the element type of the returned Flux
* @return A Flux that emits from the first non-empty Publisher
*/
public static <I> Flux<I> firstNonEmpty(Publisher<? extends I>... publishers);
/**
* Lets the supplied Publishers compete and relays the first one that turns out to be non-empty.
* Same contract as the varargs overload, for callers that already hold the sources in a collection.
* @param publishers An Iterable of Publishers to compete
* @param <I> the element type of the returned Flux
* @return A Flux that emits from the first non-empty Publisher
*/
public static <I> Flux<I> firstNonEmpty(Iterable<? extends Publisher<? extends I>> publishers);
}Usage Examples:
/**
 * Example service that queries every registered {@code ReactiveDiscoveryClient}
 * and keeps the answer of the first one that is non-empty.
 */
@Service
public class ReactiveServiceDiscovery {

    @Autowired
    private List<ReactiveDiscoveryClient> discoveryClients;

    /**
     * Looks up the instances of a service across all discovery clients.
     * Demonstrates the {@code Iterable} overload of {@code CloudFlux.firstNonEmpty}.
     *
     * @param serviceId the id of the service to look up
     * @return the instances reported by the first discovery client with a non-empty result
     */
    public Flux<ServiceInstance> findInstances(String serviceId) {
        // Create publishers for each discovery client
        List<Publisher<ServiceInstance>> publishers = discoveryClients.stream()
                .map(client -> client.getInstances(serviceId))
                .collect(Collectors.toList());
        // Return first non-empty result
        return CloudFlux.firstNonEmpty(publishers);
    }

    /**
     * Lists the services known to the discovery clients.
     * Demonstrates the varargs overload of {@code CloudFlux.firstNonEmpty}.
     *
     * @return the services reported by the first discovery client with a non-empty result
     */
    public Flux<String> findServices() {
        // Java cannot create a generic array directly, so the array is created through the raw
        // Publisher[] type. The unchecked conversion is safe: every element is produced by
        // getServices(), which is declared to return Flux<String>.
        @SuppressWarnings("unchecked")
        Publisher<String>[] publishers = discoveryClients.stream()
                .map(ReactiveDiscoveryClient::getServices)
                .toArray(Publisher[]::new);
        // Get services from multiple discovery clients, return first non-empty
        return CloudFlux.firstNonEmpty(publishers);
    }
}

Internal implementation class for first non-empty emissions.
/**
* Implementation class for first non-empty emissions.
* Declared as a {@code Flux<T>} subclass; its members are omitted in this reference.
* Note: This is an internal implementation class and should not be used directly.
* NOTE(review): presumably the operator behind {@code CloudFlux.firstNonEmpty} - confirm against the implementation.
*
* @param <T> the element type of the sequence
*/
public class FluxFirstNonEmptyEmitting<T> extends Flux<T> {
// Internal implementation details
// This class should not be used directly in application code
}All discovery components have reactive counterparts that integrate seamlessly with the reactive ecosystem.
/**
* Reactive interface for service discovery operations.
* Lookups are exposed as {@code Flux} streams; the probe is exposed as a {@code Mono}.
*/
public interface ReactiveDiscoveryClient {
/**
* @return a description of this client (NOTE(review): presumably a human-readable label - confirm)
*/
String description();
/**
* @param serviceId the id of the service to look up
* @return a Flux of the {@code ServiceInstance}s found for {@code serviceId}
*/
Flux<ServiceInstance> getInstances(String serviceId);
/**
* @return a Flux of services as strings (NOTE(review): presumably service ids usable with {@code getInstances} - confirm)
*/
Flux<String> getServices();
/**
* Probe hook. The default implementation performs no work and returns an empty Mono.
* @return an empty Mono by default
*/
default Mono<Void> reactiveProbe() { return Mono.empty(); }
}
/**
* Reactive composite discovery client combining multiple discovery clients.
* Method bodies are omitted in this reference; the comments inside them only state the intent.
*/
public class ReactiveCompositeDiscoveryClient implements ReactiveDiscoveryClient {
/**
* @param discoveryClients the discovery clients to combine
*/
public ReactiveCompositeDiscoveryClient(List<ReactiveDiscoveryClient> discoveryClients);
/**
* @param serviceId the id of the service to look up
* @return the instances for {@code serviceId}, combined across the wrapped clients
*/
@Override
public Flux<ServiceInstance> getInstances(String serviceId) {
// Combines results from all discovery clients reactively
}
/**
* @return the services, combined across the wrapped clients
*/
@Override
public Flux<String> getServices() {
// Combines services from all discovery clients reactively
}
}Reactive load balancing provides non-blocking service instance selection and WebClient integration.
/**
* Reactive load balancer interface.
*
* @param <T> the type carried by the {@code Response} that a choice produces
*/
public interface ReactiveLoadBalancer<T> {
/**
* Chooses a response for the given request.
* @param request the request to choose for
* @return a Mono emitting the chosen {@code Response}
*/
Mono<Response<T>> choose(Request request);
/**
* Convenience overload: delegates to {@link #choose(Request)} with the shared {@link #REQUEST} constant.
* @return a Mono emitting the chosen {@code Response}
*/
default Mono<Response<T>> choose() { return choose(REQUEST); }
/**
* Default request passed by the no-argument {@code choose()}; a {@code DefaultRequest} created without arguments.
*/
Request REQUEST = new DefaultRequest<>();
/**
* Factory that supplies a load balancer for a given service id.
*
* @param <T> the type carried by the responses of the load balancers it supplies
*/
interface Factory<T> {
/**
* @param serviceId the id of the service
* @return the load balancer for {@code serviceId}
*/
ReactiveLoadBalancer<T> getInstance(String serviceId);
}
}
/**
* Reactor-based WebClient exchange filter function for load balancing.
* Only the constructor and {@code filter} signature are listed; bodies are omitted in this reference.
*/
public class ReactorLoadBalancerExchangeFilterFunction implements ExchangeFilterFunction {
/**
* @param loadBalancerFactory factory supplying {@code ReactiveLoadBalancer}s of {@code ServiceInstance}
*/
public ReactorLoadBalancerExchangeFilterFunction(ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerFactory);
/**
* @param request the outgoing client request
* @param next the next exchange function in the chain
* @return a Mono of the client response
*/
@Override
public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next);
}
/**
* Retryable WebClient exchange filter function.
* Only the constructor and {@code filter} signature are listed; bodies are omitted in this reference.
*/
public class RetryableLoadBalancerExchangeFilterFunction implements ExchangeFilterFunction {
/**
* @param retrySpec the retry specification (NOTE(review): presumably governs how failed exchanges are retried - confirm)
* @param loadBalancerFactory factory supplying {@code ReactiveLoadBalancer}s of {@code ServiceInstance}
*/
public RetryableLoadBalancerExchangeFilterFunction(RetrySpec retrySpec,
ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerFactory);
/**
* @param request the outgoing client request
* @param next the next exchange function in the chain
* @return a Mono of the client response
*/
@Override
public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next);
}Full reactive support for circuit breaker patterns with Mono and Flux.
/**
* Abstract factory for reactive circuit breakers.
*
* @param <CONF> not used by any member shown here (NOTE(review): presumably the built configuration type - confirm)
* @param <CONFB> the type handed to the {@code Consumer} callbacks of the configure methods
*/
public abstract class ReactiveCircuitBreakerFactory<CONF, CONFB> {
/**
* @param id the id of the circuit breaker to create
* @return the circuit breaker for {@code id}
*/
public abstract ReactiveCircuitBreaker create(String id);
/**
* Group-aware variant. This default implementation ignores {@code groupName} and delegates to {@link #create(String)}.
* @param id the id of the circuit breaker to create
* @param groupName the group name; unused unless a subclass overrides this method
* @return the circuit breaker for {@code id}
*/
public ReactiveCircuitBreaker create(String id, String groupName) { return create(id); }
/**
* @param consumer callback receiving a {@code CONFB} (NOTE(review): presumably applied as the default configuration - confirm)
*/
public abstract void configure(Consumer<CONFB> consumer);
/**
* @param id the id the configuration applies to
* @param consumer callback receiving a {@code CONFB}
*/
public abstract void configure(String id, Consumer<CONFB> consumer);
/**
* Group-level configuration hook. This default implementation is a no-op.
* @param groupName the group name
* @param consumer callback receiving a {@code CONFB}
*/
public void configureGroup(String groupName, Consumer<CONFB> consumer) {}
}
/**
* Interface for customizing WebClient instances
*/
public interface WebClientCustomizer {
/**
* Customize the WebClient.Builder
* @param webClientBuilder The WebClient.Builder to customize
*/
void customize(WebClient.Builder webClientBuilder);
}
/**
* Utility functions for exchange filter functions.
* Only the static method signatures are listed; bodies are omitted in this reference.
*/
public class ExchangeFilterFunctionUtils {
/**
* Create a load balanced exchange filter function
* @param loadBalancerFactory The load balancer factory
* @return An exchange filter function
*/
public static ExchangeFilterFunction loadBalancerFilter(
ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerFactory);
/**
* Create a retryable load balanced exchange filter function
* @param retrySpec The retry specification
* @param loadBalancerFactory The load balancer factory
* @return A retryable exchange filter function
*/
public static ExchangeFilterFunction retryableLoadBalancerFilter(
RetrySpec retrySpec,
ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerFactory);
}Reactive Service Discovery with Fallback:
/**
 * Example service that resolves "user-service" instances through a
 * {@code ReactiveDiscoveryClient} and calls them with WebClient.
 * {@code log} is used but not declared in this example; it is expected to be provided
 * by the surrounding class setup.
 */
@Service
public class ReactiveUserService {

    @Autowired
    private ReactiveDiscoveryClient discoveryClient;

    @Autowired
    private WebClient.Builder webClientBuilder;

    /**
     * Fetches one user from the first discovered "user-service" instance.
     * Any failure, including the absence of instances, is logged and replaced by
     * {@code User.defaultUser(userId)}.
     *
     * @param userId the id of the user to fetch
     * @return the user, or the default user when the lookup or the call fails
     */
    public Mono<User> getUser(String userId) {
        return discoveryClient.getInstances("user-service")
                .next() // Get first available instance
                .switchIfEmpty(Mono.error(new ServiceUnavailableException("No user-service instances available")))
                .flatMap(instance -> clientFor(instance.getUri().toString()).get()
                        .uri("/users/{id}", userId)
                        .retrieve()
                        .bodyToMono(User.class))
                .onErrorResume(throwable -> {
                    log.warn("Failed to get user, using fallback", throwable);
                    return Mono.just(User.defaultUser(userId));
                });
    }

    /**
     * Requests all users from every discovered "user-service" instance and keeps the
     * answer of the first instance whose response is non-empty.
     *
     * @return the users from the first non-empty response; an error signal carrying
     *         {@code ServiceUnavailableException} when no instance is registered
     */
    public Flux<User> getAllUsers() {
        return discoveryClient.getInstances("user-service")
                .collectList()
                .flatMapMany(instances -> {
                    if (instances.isEmpty()) {
                        return Flux.error(new ServiceUnavailableException("No user-service instances"));
                    }
                    // One request per instance: the instances compete, nothing is load balanced here
                    List<Publisher<User>> publishers = instances.stream()
                            .<Publisher<User>>map(instance -> clientFor(instance.getUri().toString()).get()
                                    .uri("/users")
                                    .retrieve()
                                    .bodyToFlux(User.class))
                            .collect(Collectors.toList());
                    // Use CloudFlux to get first non-empty result
                    return CloudFlux.firstNonEmpty(publishers);
                });
    }

    /**
     * Builds a WebClient for one base URL.
     * The injected builder is cloned first: it is shared by every call on this service,
     * so setting the base URL on it directly would leak that setting into other requests.
     */
    private WebClient clientFor(String baseUrl) {
        return webClientBuilder.clone()
                .baseUrl(baseUrl)
                .build();
    }
}

Reactive Load Balancing with WebClient:
/**
 * Example configuration that exposes a {@code @LoadBalanced} WebClient builder and a
 * WebClient built from it.
 */
@Configuration
public class ReactiveWebClientConfig {

    /**
     * @return a fresh {@code WebClient.Builder}, registered as a {@code @LoadBalanced} bean
     */
    @LoadBalanced
    @Bean
    public WebClient.Builder loadBalancedWebClientBuilder() {
        WebClient.Builder freshBuilder = WebClient.builder();
        return freshBuilder;
    }

    /**
     * @param builder the {@code @LoadBalanced} builder to build from
     * @return the WebClient produced by {@code builder}
     */
    @Bean
    public WebClient loadBalancedWebClient(@LoadBalanced WebClient.Builder builder) {
        WebClient client = builder.build();
        return client;
    }
}
/**
 * Example service calling "order-service" through a WebClient built from a
 * {@code @LoadBalanced} builder.
 * {@code log} is used but not declared in this example; it is expected to be provided
 * by the surrounding class setup.
 */
@Service
public class ReactiveOrderService {

    private final WebClient webClient;

    /**
     * @param webClientBuilder the {@code @LoadBalanced} builder the client is built from
     */
    public ReactiveOrderService(@LoadBalanced WebClient.Builder webClientBuilder) {
        this.webClient = webClientBuilder.build();
    }

    /**
     * Fetches a single order with a 5 second timeout and up to 3 retries.
     * If the call still fails, the error is logged and a fallback order is returned.
     *
     * @param orderId the id of the order to fetch
     * @return the order, or {@code Order.fallbackOrder(orderId)} on failure
     */
    public Mono<Order> getOrder(String orderId) {
        Mono<Order> remoteOrder = webClient.get()
                .uri("http://order-service/orders/{id}", orderId)
                .retrieve()
                .bodyToMono(Order.class);
        Mono<Order> guarded = remoteOrder
                .timeout(Duration.ofSeconds(5))
                .retry(3);
        return guarded.onErrorResume(failure -> fallbackFor(orderId, failure));
    }

    /**
     * Streams orders, delaying each element by 100 ms, buffering up to 1000 elements
     * and logging errors through {@code onErrorContinue}.
     *
     * @return the stream of orders
     */
    public Flux<Order> getOrdersStream() {
        Flux<Order> remoteOrders = webClient.get()
                .uri("http://order-service/orders/stream")
                .retrieve()
                .bodyToFlux(Order.class);
        return remoteOrders
                .delayElements(Duration.ofMillis(100))
                .onBackpressureBuffer(1000)
                .onErrorContinue(this::logSkippedOrder);
    }

    /** Logs the failure and supplies the fallback order for {@code orderId}. */
    private Mono<Order> fallbackFor(String orderId, Throwable failure) {
        log.error("Failed to get order: " + orderId, failure);
        return Mono.just(Order.fallbackOrder(orderId));
    }

    /** Logs an element-level error together with the element that caused it. */
    private void logSkippedOrder(Throwable failure, Object order) {
        log.warn("Error processing order: " + order, failure);
    }
}

Reactive Circuit Breaker with Custom Configuration:
/**
 * Example configuration that installs a default Resilience4J configuration on the
 * reactive circuit breaker factory.
 */
@Configuration
public class ReactiveCircuitBreakerConfig {

    /**
     * Builds the customizer that registers the default configuration.
     * For every circuit breaker id it sets a sliding window of 20 calls, 5 permitted calls
     * in half-open state, a 50% failure-rate threshold, 30 seconds in open state, and
     * records {@code IOException} and {@code TimeoutException}. The time limiter uses a
     * 3 second timeout.
     *
     * @return the customizer for {@code ReactiveResilience4JCircuitBreakerFactory}
     */
    @Bean
    public Customizer<ReactiveResilience4JCircuitBreakerFactory> defaultCustomizer() {
        return factory -> factory.configureDefault(id -> {
            CircuitBreakerConfig breakerSettings = CircuitBreakerConfig.custom()
                    .slidingWindowSize(20)
                    .permittedNumberOfCallsInHalfOpenState(5)
                    .failureRateThreshold(50.0f)
                    .waitDurationInOpenState(Duration.ofSeconds(30))
                    .recordExceptions(IOException.class, TimeoutException.class)
                    .build();
            TimeLimiterConfig limiterSettings = TimeLimiterConfig.custom()
                    .timeoutDuration(Duration.ofSeconds(3))
                    .build();
            return new Resilience4JConfigBuilder(id)
                    .circuitBreakerConfig(breakerSettings)
                    .timeLimiterConfig(limiterSettings)
                    .build();
        });
    }
}
/**
 * Example service that wraps WebClient calls to "payment-service" in a reactive
 * circuit breaker with fallbacks.
 */
@Service
public class ReactivePaymentService {

    // Wildcards instead of the raw type: this class only calls create(String), so the
    // concrete configuration types of the factory do not matter here.
    @Autowired
    private ReactiveCircuitBreakerFactory<?, ?> circuitBreakerFactory;

    private ReactiveCircuitBreaker circuitBreaker;

    private WebClient webClient;

    /**
     * Creates the "payment-service" circuit breaker and the WebClient once the factory
     * has been injected.
     */
    @PostConstruct
    public void init() {
        this.circuitBreaker = circuitBreakerFactory.create("payment-service");
        // NOTE(review): this client comes from WebClient.create(...), not from a
        // @LoadBalanced WebClient.Builder, so "payment-service" is presumably treated as a
        // plain host name rather than a discovered service id - confirm this is intended.
        this.webClient = WebClient.create("http://payment-service");
    }

    /**
     * Posts a payment request through the circuit breaker.
     *
     * @param request the payment to process
     * @return the payment result, or {@code PaymentResult.failed("Circuit breaker fallback")}
     *         when the fallback is applied
     */
    public Mono<PaymentResult> processPayment(PaymentRequest request) {
        return circuitBreaker.run(
                webClient.post()
                        .uri("/payments")
                        .bodyValue(request)
                        .retrieve()
                        .bodyToMono(PaymentResult.class),
                throwable -> Mono.just(PaymentResult.failed("Circuit breaker fallback"))
        );
    }

    /**
     * Streams the status updates of a payment through the circuit breaker.
     *
     * @param paymentId the id of the payment to follow
     * @return the status stream, or a single {@code PaymentStatus.unknown(paymentId)}
     *         when the fallback is applied
     */
    public Flux<PaymentStatus> getPaymentStatusStream(String paymentId) {
        return circuitBreaker.run(
                webClient.get()
                        .uri("/payments/{id}/status/stream", paymentId)
                        .retrieve()
                        .bodyToFlux(PaymentStatus.class),
                throwable -> Flux.just(PaymentStatus.unknown(paymentId))
        );
    }
}

Custom Reactive Discovery with CloudFlux:
/**
 * Example discovery client that fans a lookup out to several regional discovery clients.
 * {@code log} is used but not declared in this example; it is expected to be provided
 * by the surrounding class setup.
 * NOTE(review): this class is itself a {@code ReactiveDiscoveryClient} component and is
 * injected with a list of the same type - confirm the container does not hand it a list
 * that contains itself.
 */
@Component
public class MultiRegionDiscoveryClient implements ReactiveDiscoveryClient {

    private final List<ReactiveDiscoveryClient> regionalClients;

    /**
     * @param regionalClients the per-region discovery clients to query
     */
    public MultiRegionDiscoveryClient(List<ReactiveDiscoveryClient> regionalClients) {
        this.regionalClients = regionalClients;
    }

    @Override
    public String description() {
        return "Multi-region discovery client";
    }

    /**
     * Queries every region and keeps the first non-empty answer. When no region reports
     * an instance, a warning is logged and the result stays empty.
     *
     * @param serviceId the id of the service to look up
     * @return the instances from the first region with a non-empty result
     */
    @Override
    public Flux<ServiceInstance> getInstances(String serviceId) {
        List<Publisher<ServiceInstance>> perRegion = regionalClients.stream()
                .map(region -> region.getInstances(serviceId))
                .collect(Collectors.toList());
        // Deferred so the warning is only logged if the fallback is actually subscribed to
        Flux<ServiceInstance> warnAndStayEmpty = Flux.defer(() -> {
            log.warn("No instances found in any region for service: {}", serviceId);
            return Flux.empty();
        });
        return CloudFlux.firstNonEmpty(perRegion).switchIfEmpty(warnAndStayEmpty);
    }

    /**
     * Merges the services of all regions, removes duplicates and sorts the result.
     *
     * @return the distinct, sorted services across all regions
     */
    @Override
    public Flux<String> getServices() {
        Flux<String> acrossRegions = Flux.fromIterable(regionalClients)
                .flatMap(ReactiveDiscoveryClient::getServices);
        return acrossRegions.distinct().sort();
    }
}

/**
 * Auto-configuration for reactive client features.
 * Active only when both {@code Flux} and {@code Mono} are on the classpath.
 */
@Configuration
@ConditionalOnClass({Flux.class, Mono.class})
public class ReactiveCommonsClientAutoConfiguration {

    /**
     * Wraps the available discovery clients in a composite.
     * NOTE(review): {@code @ConditionalOnMissingBean} without arguments presumably matches on
     * the return type {@code ReactiveDiscoveryClient}, so this bean may back off as soon as
     * any discovery client is registered - confirm this is intended.
     *
     * @param discoveryClients the discovery clients to combine
     * @return a {@code ReactiveCompositeDiscoveryClient} over {@code discoveryClients}
     */
    @Bean
    @ConditionalOnMissingBean
    public ReactiveDiscoveryClient reactiveDiscoveryClient(List<ReactiveDiscoveryClient> discoveryClients) {
        ReactiveDiscoveryClient composite = new ReactiveCompositeDiscoveryClient(discoveryClients);
        return composite;
    }

    /**
     * Registers the load-balancing exchange filter function when a
     * {@code ReactiveLoadBalancer.Factory} bean exists and no such filter is defined yet.
     *
     * @param loadBalancerFactory the factory passed to the filter function
     * @return the load-balancing exchange filter function
     */
    @Bean
    @ConditionalOnMissingBean
    @ConditionalOnBean(ReactiveLoadBalancer.Factory.class)
    public ReactorLoadBalancerExchangeFilterFunction loadBalancerExchangeFilterFunction(
            ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerFactory) {
        ReactorLoadBalancerExchangeFilterFunction filter =
                new ReactorLoadBalancerExchangeFilterFunction(loadBalancerFactory);
        return filter;
    }
}

Install with Tessl CLI
npx tessl i tessl/maven-org-springframework-cloud--spring-cloud-commons