CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-springframework-cloud--spring-cloud-commons

Spring Cloud Commons provides foundational abstractions and utilities for service discovery, load balancing, circuit breakers, and cloud-native application development.

Pending
Overview
Eval results
Files

docs/reactive-support.md

Reactive Support

Reactive programming support provides enhanced reactive utilities and operators specifically designed for cloud-native applications. This includes custom Flux operators and reactive versions of all core Spring Cloud Commons components.

Capabilities

Cloud Flux

Enhanced Flux with additional operators for cloud scenarios.

/**
 * Extended Flux with additional operators for cloud scenarios.
 * Only the static {@code firstNonEmpty} factory methods are listed here.
 */
public abstract class CloudFlux<T> extends Flux<T> {
    /**
     * Returns a Flux that emits from the first non-empty Publisher among the provided Publishers.
     * NOTE(review): a Publisher that only completes (no onNext) presumably does not win the
     * selection, and the result when every Publisher is empty is not shown here - confirm
     * both against the implementation.
     * @param publishers The Publishers to compete
     * @return A Flux that emits from the first non-empty Publisher
     */
    public static <I> Flux<I> firstNonEmpty(Publisher<? extends I>... publishers);
    
    /**
     * Iterable variant of {@link #firstNonEmpty(Publisher[])}: returns a Flux that emits from
     * the first non-empty Publisher among the provided Publishers.
     * @param publishers An Iterable of Publishers to compete
     * @return A Flux that emits from the first non-empty Publisher
     */
    public static <I> Flux<I> firstNonEmpty(Iterable<? extends Publisher<? extends I>> publishers);
}

Usage Examples:

/**
 * Example service that asks every injected {@code ReactiveDiscoveryClient} the same question
 * and hands the per-client publishers to {@code CloudFlux.firstNonEmpty}.
 */
@Service
public class ReactiveServiceDiscovery {
    
    @Autowired
    private List<ReactiveDiscoveryClient> discoveryClients;
    
    /**
     * Looks up the instances of a service using the Iterable overload of
     * {@code CloudFlux.firstNonEmpty}.
     *
     * @param serviceId id forwarded to each client's {@code getInstances}
     * @return the Flux produced by {@code CloudFlux.firstNonEmpty} over the per-client lookups
     */
    public Flux<ServiceInstance> findInstances(String serviceId) {
        // Create publishers for each discovery client
        List<Publisher<ServiceInstance>> publishers = discoveryClients.stream()
            .map(client -> client.getInstances(serviceId))
            .collect(Collectors.toList());
        
        // Return first non-empty result
        return CloudFlux.firstNonEmpty(publishers);
    }
    
    /**
     * Looks up the known service names using the varargs (array) overload of
     * {@code CloudFlux.firstNonEmpty}.
     *
     * @return the Flux produced by {@code CloudFlux.firstNonEmpty} over the per-client lookups
     */
    public Flux<String> findServices() {
        // Java cannot create a generic array, so toArray(...) yields a raw Publisher[].
        // Every element comes from getServices(), which is declared to return Flux<String>,
        // so the unchecked conversion is safe. The suppression is limited to this declaration.
        @SuppressWarnings({"unchecked", "rawtypes"})
        Publisher<String>[] publishers = discoveryClients.stream()
            .map(ReactiveDiscoveryClient::getServices)
            .toArray(Publisher[]::new);
        
        return CloudFlux.firstNonEmpty(publishers);
    }
}

First Non-Empty Implementation

Internal implementation class for first non-empty emissions.

/**
 * Implementation class for first non-empty emissions.
 * <p>
 * Note: this is an internal implementation class and should not be used directly.
 * NOTE(review): presumably this is the type that backs {@code CloudFlux.firstNonEmpty};
 * nothing in this listing shows the link - confirm before relying on it.
 */
public class FluxFirstNonEmptyEmitting<T> extends Flux<T> {
    // Internal implementation details (intentionally omitted from this listing)
    // This class should not be used directly in application code
}

Reactive Discovery

All discovery components have reactive counterparts that integrate seamlessly with the reactive ecosystem.

Reactive Discovery Client

/**
 * Reactive interface for service discovery operations.
 * The lookup methods return Reactor publishers ({@code Flux} / {@code Mono}).
 */
public interface ReactiveDiscoveryClient {
    /** @return a textual description of this client */
    String description();
    /**
     * @param serviceId id of the service whose instances are requested
     * @return a Flux of the {@code ServiceInstance} entries for {@code serviceId}
     */
    Flux<ServiceInstance> getInstances(String serviceId);
    /** @return a Flux of service identifiers (presumably usable as {@code serviceId} above - confirm) */
    Flux<String> getServices();
    /** Probe hook; the default shown here does no work and returns an empty Mono. */
    default Mono<Void> reactiveProbe() { return Mono.empty(); }
}

Reactive Composite Discovery Client

/**
 * Reactive composite discovery client combining multiple discovery clients.
 * The method bodies below are placeholders; the real implementation is not part of this listing.
 */
public class ReactiveCompositeDiscoveryClient implements ReactiveDiscoveryClient {
    /** @param discoveryClients the delegate clients this composite consults */
    public ReactiveCompositeDiscoveryClient(List<ReactiveDiscoveryClient> discoveryClients);
    
    @Override
    public Flux<ServiceInstance> getInstances(String serviceId) {
        // Combines results from all discovery clients reactively
        // NOTE(review): upstream Spring Cloud Commons appears to return the instances of the
        // first delegate that yields a non-empty result rather than merging all of them - confirm.
    }
    
    @Override
    public Flux<String> getServices() {
        // Combines services from all discovery clients reactively
    }
}

Reactive Load Balancing

Reactive load balancing provides non-blocking service instance selection and WebClient integration.

Reactive Load Balancer

/**
 * Reactive load balancer interface.
 *
 * @param <T> type carried by the {@code Response} that {@code choose} produces
 */
public interface ReactiveLoadBalancer<T> {
    /**
     * Chooses a response for the given request.
     * NOTE(review): upstream Spring Cloud Commons appears to declare the return type as
     * {@code Publisher<Response<T>>} on this interface (with {@code Mono} on the
     * Reactor-specific sub-interface) - confirm against the targeted version.
     */
    Mono<Response<T>> choose(Request request);
    /** Convenience overload: delegates to {@code choose(Request)} with the shared {@code REQUEST} constant. */
    default Mono<Response<T>> choose() { return choose(REQUEST); }
    
    /** Request used by the no-argument {@code choose()}; a {@code DefaultRequest} created with no arguments. */
    Request REQUEST = new DefaultRequest<>();
    
    /** Factory that supplies a load balancer for a given service id. */
    interface Factory<T> {
        ReactiveLoadBalancer<T> getInstance(String serviceId);
    }
}

WebClient Exchange Filter Functions

/**
 * Reactor-based WebClient exchange filter function for load balancing.
 * Signature listing only; the filtering logic is not shown here.
 */
public class ReactorLoadBalancerExchangeFilterFunction implements ExchangeFilterFunction {
    /** @param loadBalancerFactory factory used to obtain a {@code ReactiveLoadBalancer} per service id */
    public ReactorLoadBalancerExchangeFilterFunction(ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerFactory);
    
    /**
     * @param request the outgoing client request
     * @param next the next exchange function in the chain
     * @return a Mono of the client response
     */
    @Override
    public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next);
}

/**
 * Retryable WebClient exchange filter function.
 * Signature listing only; the retry and filtering logic is not shown here.
 */
public class RetryableLoadBalancerExchangeFilterFunction implements ExchangeFilterFunction {
    /**
     * @param retrySpec the retry specification to apply
     * @param loadBalancerFactory factory used to obtain a {@code ReactiveLoadBalancer} per service id
     */
    public RetryableLoadBalancerExchangeFilterFunction(RetrySpec retrySpec,
                                                      ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerFactory);
    
    /**
     * @param request the outgoing client request
     * @param next the next exchange function in the chain
     * @return a Mono of the client response
     */
    @Override  
    public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next);
}

Reactive Circuit Breakers

Full reactive support for circuit breaker patterns with Mono and Flux.

Reactive Circuit Breaker Factory

/**
 * Abstract factory for reactive circuit breakers.
 * NOTE(review): the configuration example later in this document calls
 * {@code configureDefault(...)}, which is not declared in this listing - confirm that the
 * listing is complete for the targeted version.
 *
 * @param <CONF> configuration type (not referenced by the members listed here)
 * @param <CONFB> builder type handed to the {@code configure*} consumers
 */
public abstract class ReactiveCircuitBreakerFactory<CONF, CONFB> {
    /** Creates a circuit breaker for the given id. */
    public abstract ReactiveCircuitBreaker create(String id);
    /** Group-aware overload; this base version ignores {@code groupName} and delegates to {@code create(id)}. */
    public ReactiveCircuitBreaker create(String id, String groupName) { return create(id); }
    /** Applies the consumer to the configuration builder (no id given). */
    public abstract void configure(Consumer<CONFB> consumer);
    /** Applies the consumer to the configuration builder for the given id. */
    public abstract void configure(String id, Consumer<CONFB> consumer);
    /** Group-level configuration hook; this base version is a no-op. */
    public void configureGroup(String groupName, Consumer<CONFB> consumer) {}
}

Configuration and Customization

WebClient Customization

/**
 * Interface for customizing WebClient instances.
 * It declares a single abstract method, so it can be implemented with a lambda or a method
 * reference; {@code @FunctionalInterface} makes the compiler enforce that contract.
 */
@FunctionalInterface
public interface WebClientCustomizer {
    /**
     * Customize the WebClient.Builder
     * @param webClientBuilder The WebClient.Builder to customize
     */
    void customize(WebClient.Builder webClientBuilder);
}

Exchange Filter Function Utilities

/**
 * Utility functions for exchange filter functions.
 * All members listed here are static.
 * NOTE(review): verify these method names and signatures against the targeted Spring Cloud
 * Commons version before relying on them; this listing shows declarations only.
 */
public class ExchangeFilterFunctionUtils {
    /**
     * Create a load balanced exchange filter function
     * @param loadBalancerFactory The load balancer factory
     * @return An exchange filter function
     */
    public static ExchangeFilterFunction loadBalancerFilter(
        ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerFactory);
    
    /**
     * Create a retryable load balanced exchange filter function
     * @param retrySpec The retry specification
     * @param loadBalancerFactory The load balancer factory
     * @return A retryable exchange filter function
     */
    public static ExchangeFilterFunction retryableLoadBalancerFilter(
        RetrySpec retrySpec,
        ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerFactory);
}

Usage Examples

Reactive Service Discovery with Fallback:

/**
 * Example service that resolves "user-service" instances through a
 * {@code ReactiveDiscoveryClient} and calls them with a per-instance {@code WebClient}.
 */
@Service
public class ReactiveUserService {
    
    @Autowired
    private ReactiveDiscoveryClient discoveryClient;
    
    @Autowired
    private WebClient.Builder webClientBuilder;
    
    /**
     * Fetches one user from the first discovered instance.
     * Any error, including "no instances available", is logged and replaced by
     * {@code User.defaultUser(userId)}.
     *
     * @param userId id substituted into the {@code /users/{id}} path
     * @return the user, or the default user when the lookup or the call fails
     */
    public Mono<User> getUser(String userId) {
        return discoveryClient.getInstances("user-service")
            .next() // Get first available instance
            .switchIfEmpty(Mono.error(new ServiceUnavailableException("No user-service instances available")))
            .flatMap(instance -> clientFor(instance).get()
                .uri("/users/{id}", userId)
                .retrieve()
                .bodyToMono(User.class))
            .onErrorResume(throwable -> {
                log.warn("Failed to get user, using fallback", throwable);
                return Mono.just(User.defaultUser(userId));
            });
    }
    
    /**
     * Fetches all users by querying every discovered instance and keeping the first
     * non-empty answer.
     *
     * @return the users from the first instance that produces a non-empty response;
     *         errors with {@code ServiceUnavailableException} when no instance is known
     */
    public Flux<User> getAllUsers() {
        return discoveryClient.getInstances("user-service")
            .collectList()
            .flatMapMany(instances -> {
                if (instances.isEmpty()) {
                    return Flux.error(new ServiceUnavailableException("No user-service instances"));
                }
                
                // One request publisher per instance. This is not load balancing: the
                // publishers are handed to firstNonEmpty, which picks a single winner.
                List<Publisher<User>> publishers = instances.stream()
                    .map(instance -> clientFor(instance).get()
                        .uri("/users")
                        .retrieve()
                        .bodyToFlux(User.class))
                    .collect(Collectors.toList());
                
                // Use CloudFlux to get first non-empty result
                return CloudFlux.firstNonEmpty(publishers);
            });
    }
    
    /**
     * Builds a WebClient bound to the given instance's URI.
     * The injected builder is shared by every call on this singleton, and {@code baseUrl}
     * mutates the builder it is called on. Cloning first keeps concurrent requests from
     * overwriting each other's base URL.
     */
    private WebClient clientFor(ServiceInstance instance) {
        return webClientBuilder.clone()
            .baseUrl(instance.getUri().toString())
            .build();
    }
}

Reactive Load Balancing with WebClient:

/**
 * Configuration that registers a {@code @LoadBalanced} {@code WebClient.Builder} and a
 * {@code WebClient} built from it.
 */
@Configuration
public class ReactiveWebClientConfig {

    /**
     * Registers a new {@code WebClient.Builder} under the {@code @LoadBalanced} qualifier.
     *
     * @return a builder obtained from {@code WebClient.builder()}
     */
    @LoadBalanced
    @Bean
    public WebClient.Builder loadBalancedWebClientBuilder() {
        final WebClient.Builder freshBuilder = WebClient.builder();
        return freshBuilder;
    }

    /**
     * Registers a {@code WebClient} built from the {@code @LoadBalanced} builder.
     *
     * @param builder the {@code @LoadBalanced}-qualified builder bean
     * @return the client produced by {@code builder.build()}
     */
    @Bean
    public WebClient loadBalancedWebClient(@LoadBalanced WebClient.Builder builder) {
        final WebClient client = builder.build();
        return client;
    }
}

@Service
public class ReactiveOrderService {
    
    private final WebClient webClient;
    
    public ReactiveOrderService(@LoadBalanced WebClient.Builder webClientBuilder) {
        this.webClient = webClientBuilder.build();
    }
    
    public Mono<Order> getOrder(String orderId) {
        return webClient.get()
            .uri("http://order-service/orders/{id}", orderId)
            .retrieve()
            .bodyToMono(Order.class)
            .timeout(Duration.ofSeconds(5))
            .retry(3)
            .onErrorResume(throwable -> {
                log.error("Failed to get order: " + orderId, throwable);
                return Mono.just(Order.fallbackOrder(orderId));
            });
    }
    
    public Flux<Order> getOrdersStream() {
        return webClient.get()
            .uri("http://order-service/orders/stream")
            .retrieve()
            .bodyToFlux(Order.class)
            .delayElements(Duration.ofMillis(100))
            .onBackpressureBuffer(1000)
            .onErrorContinue((throwable, obj) -> {
                log.warn("Error processing order: " + obj, throwable);
            });
    }
}

Reactive Circuit Breaker with Custom Configuration:

/**
 * Configuration that installs default circuit breaker and time limiter settings on the
 * reactive Resilience4J circuit breaker factory.
 */
@Configuration
public class ReactiveCircuitBreakerConfig {

    /**
     * Provides a customizer whose default configuration is built per circuit breaker id.
     *
     * @return a customizer that calls {@code configureDefault} on the factory
     */
    @Bean
    public Customizer<ReactiveResilience4JCircuitBreakerFactory> defaultCustomizer() {
        return factory -> factory.configureDefault(id -> {
            // Circuit breaker: 20-call sliding window, 50% failure threshold,
            // 30 s in the open state, 5 trial calls when half-open.
            CircuitBreakerConfig breakerSettings = CircuitBreakerConfig.custom()
                .slidingWindowSize(20)
                .permittedNumberOfCallsInHalfOpenState(5)
                .failureRateThreshold(50.0f)
                .waitDurationInOpenState(Duration.ofSeconds(30))
                .recordExceptions(IOException.class, TimeoutException.class)
                .build();

            // Time limiter: a call is given 3 seconds.
            TimeLimiterConfig limiterSettings = TimeLimiterConfig.custom()
                .timeoutDuration(Duration.ofSeconds(3))
                .build();

            return new Resilience4JConfigBuilder(id)
                .circuitBreakerConfig(breakerSettings)
                .timeLimiterConfig(limiterSettings)
                .build();
        });
    }
}

/**
 * Example service that wraps WebClient calls to "payment-service" in a reactive
 * circuit breaker with a fallback for both Mono and Flux results.
 */
@Service
public class ReactivePaymentService {
    
    // Wildcards instead of the raw type: only create(String) is used, so the concrete
    // configuration type parameters are irrelevant here.
    @Autowired
    private ReactiveCircuitBreakerFactory<?, ?> circuitBreakerFactory;
    
    private ReactiveCircuitBreaker circuitBreaker;
    private WebClient webClient;
    
    /**
     * Creates the "payment-service" circuit breaker and the WebClient once injection is done.
     * NOTE(review): {@code WebClient.create(...)} does not go through a {@code @LoadBalanced}
     * builder - confirm that the host name "payment-service" resolves in the target environment.
     */
    @PostConstruct
    public void init() {
        this.circuitBreaker = circuitBreakerFactory.create("payment-service");
        this.webClient = WebClient.create("http://payment-service");
    }
    
    /**
     * Posts a payment through the circuit breaker.
     *
     * @param request body sent to {@code /payments}
     * @return the payment result, or {@code PaymentResult.failed(...)} from the fallback
     */
    public Mono<PaymentResult> processPayment(PaymentRequest request) {
        return circuitBreaker.run(
            webClient.post()
                .uri("/payments")
                .bodyValue(request)
                .retrieve()
                .bodyToMono(PaymentResult.class),
            throwable -> Mono.just(PaymentResult.failed("Circuit breaker fallback"))
        );
    }
    
    /**
     * Streams payment status updates through the circuit breaker.
     *
     * @param paymentId id substituted into the {@code /payments/{id}/status/stream} path
     * @return the status stream, or a single {@code PaymentStatus.unknown(paymentId)} from the fallback
     */
    public Flux<PaymentStatus> getPaymentStatusStream(String paymentId) {
        return circuitBreaker.run(
            webClient.get()
                .uri("/payments/{id}/status/stream", paymentId)
                .retrieve()
                .bodyToFlux(PaymentStatus.class),
            throwable -> Flux.just(PaymentStatus.unknown(paymentId))
        );
    }
}

Custom Reactive Discovery with CloudFlux:

/**
 * Example discovery client that fans a lookup out to a list of regional clients.
 * Instances come from the first region with a non-empty answer; service names are the
 * sorted, de-duplicated union across regions.
 */
@Component
public class MultiRegionDiscoveryClient implements ReactiveDiscoveryClient {

    private final List<ReactiveDiscoveryClient> regionalClients;

    /** @param regionalClients the per-region delegates, stored as given */
    public MultiRegionDiscoveryClient(List<ReactiveDiscoveryClient> regionalClients) {
        this.regionalClients = regionalClients;
    }

    @Override
    public String description() {
        return "Multi-region discovery client";
    }

    /**
     * Asks every regional client for the instances of {@code serviceId} and returns the
     * first non-empty answer. A warning is logged when no region has any.
     */
    @Override
    public Flux<ServiceInstance> getInstances(String serviceId) {
        List<Publisher<ServiceInstance>> perRegionLookups = regionalClients.stream()
            .map(regional -> regional.getInstances(serviceId))
            .collect(Collectors.toList());

        Flux<ServiceInstance> firstAnswer = CloudFlux.firstNonEmpty(perRegionLookups);
        // defer: the warning is only logged if the fallback is actually subscribed to
        return firstAnswer.switchIfEmpty(Flux.defer(() -> warnNoInstances(serviceId)));
    }

    /** Merges the service names of all regions, drops duplicates and sorts the result. */
    @Override
    public Flux<String> getServices() {
        Flux<String> allNames = Flux.fromIterable(regionalClients)
            .flatMap(ReactiveDiscoveryClient::getServices);
        Flux<String> uniqueNames = allNames.distinct();
        return uniqueNames.sort();
    }

    /** Logs that no region returned instances and yields an empty Flux. */
    private Flux<ServiceInstance> warnNoInstances(String serviceId) {
        log.warn("No instances found in any region for service: {}", serviceId);
        return Flux.empty();
    }
}

Auto-Configuration

/**
 * Auto-configuration for reactive client features.
 * Active only when Reactor's {@code Flux} and {@code Mono} are on the classpath.
 */
@Configuration
@ConditionalOnClass({Flux.class, Mono.class})
public class ReactiveCommonsClientAutoConfiguration {
    
    // Wraps the injected discovery clients in a ReactiveCompositeDiscoveryClient.
    // NOTE(review): @ConditionalOnMissingBean on a method returning ReactiveDiscoveryClient
    // presumably backs off as soon as any other ReactiveDiscoveryClient bean exists, which
    // is exactly when there is something to aggregate - confirm against the upstream
    // configuration (which appears to use a @Primary composite instead).
    @Bean
    @ConditionalOnMissingBean
    public ReactiveDiscoveryClient reactiveDiscoveryClient(List<ReactiveDiscoveryClient> discoveryClients) {
        return new ReactiveCompositeDiscoveryClient(discoveryClients);
    }
    
    // Registers the load-balancing exchange filter function. Requires a
    // ReactiveLoadBalancer.Factory bean and backs off when a bean of this type already exists.
    @Bean
    @ConditionalOnMissingBean
    @ConditionalOnBean(ReactiveLoadBalancer.Factory.class)
    public ReactorLoadBalancerExchangeFilterFunction loadBalancerExchangeFilterFunction(
        ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerFactory) {
        return new ReactorLoadBalancerExchangeFilterFunction(loadBalancerFactory);
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-springframework-cloud--spring-cloud-commons

docs

circuit-breakers.md

http-clients.md

index.md

load-balancing.md

reactive-support.md

service-discovery.md

service-registration.md

tile.json