0
# Reactive Support
1
2
Reactive programming support provides enhanced reactive utilities and operators specifically designed for cloud-native applications. This includes custom Flux operators and reactive versions of all core Spring Cloud Commons components.
3
4
## Capabilities
5
6
### Cloud Flux
7
8
Enhanced Flux with additional operators for cloud scenarios.
9
10
```java { .api }
11
/**
12
* Extended Flux with additional operators for cloud scenarios
13
*/
14
public abstract class CloudFlux<T> extends Flux<T> {
15
/**
16
* Returns the first Publisher to emit any signal (onNext, onComplete, onError) from the provided Publishers.
17
* @param publishers The Publishers to compete
18
* @return A Flux that emits from the first non-empty Publisher
19
*/
20
public static <I> Flux<I> firstNonEmpty(Publisher<? extends I>... publishers);
21
22
/**
23
* Returns the first Publisher to emit any signal (onNext, onComplete, onError) from the provided Publishers.
24
* @param publishers An Iterable of Publishers to compete
25
* @return A Flux that emits from the first non-empty Publisher
26
*/
27
public static <I> Flux<I> firstNonEmpty(Iterable<? extends Publisher<? extends I>> publishers);
28
}
29
```
30
31
**Usage Examples:**
32
33
```java
34
@Service
35
public class ReactiveServiceDiscovery {
36
37
@Autowired
38
private List<ReactiveDiscoveryClient> discoveryClients;
39
40
public Flux<ServiceInstance> findInstances(String serviceId) {
41
// Create publishers for each discovery client
42
List<Publisher<ServiceInstance>> publishers = discoveryClients.stream()
43
.map(client -> client.getInstances(serviceId))
44
.collect(Collectors.toList());
45
46
// Return first non-empty result
47
return CloudFlux.firstNonEmpty(publishers);
48
}
49
50
public Flux<String> findServices() {
51
// Get services from multiple discovery clients, return first non-empty
52
Publisher<String>[] publishers = discoveryClients.stream()
53
.map(ReactiveDiscoveryClient::getServices)
54
.toArray(Publisher[]::new);
55
56
return CloudFlux.firstNonEmpty(publishers);
57
}
58
}
59
```
60
61
### First Non-Empty Implementation
62
63
Internal implementation class for first non-empty emissions.
64
65
```java { .api }
66
/**
67
* Implementation class for first non-empty emissions
68
* Note: This is an internal implementation class and should not be used directly
69
*/
70
public class FluxFirstNonEmptyEmitting<T> extends Flux<T> {
71
// Internal implementation details
72
// This class should not be used directly in application code
73
}
74
```
75
76
## Reactive Discovery
77
78
All discovery components have reactive counterparts that integrate seamlessly with the reactive ecosystem.
79
80
### Reactive Discovery Client
81
82
```java { .api }
83
/**
84
* Reactive interface for service discovery operations
85
*/
86
public interface ReactiveDiscoveryClient {
87
String description();
88
Flux<ServiceInstance> getInstances(String serviceId);
89
Flux<String> getServices();
90
default Mono<Void> reactiveProbe() { return Mono.empty(); }
91
}
92
```
93
94
### Reactive Composite Discovery Client
95
96
```java { .api }
97
/**
98
* Reactive composite discovery client combining multiple discovery clients
99
*/
100
public class ReactiveCompositeDiscoveryClient implements ReactiveDiscoveryClient {
101
public ReactiveCompositeDiscoveryClient(List<ReactiveDiscoveryClient> discoveryClients);
102
103
@Override
104
public Flux<ServiceInstance> getInstances(String serviceId) {
105
// Combines results from all discovery clients reactively
106
}
107
108
@Override
109
public Flux<String> getServices() {
110
// Combines services from all discovery clients reactively
111
}
112
}
113
```
114
115
## Reactive Load Balancing
116
117
Reactive load balancing provides non-blocking service instance selection and WebClient integration.
118
119
### Reactive Load Balancer
120
121
```java { .api }
122
/**
123
* Reactive load balancer interface
124
*/
125
public interface ReactiveLoadBalancer<T> {
126
Mono<Response<T>> choose(Request request);
127
default Mono<Response<T>> choose() { return choose(REQUEST); }
128
129
Request REQUEST = new DefaultRequest<>();
130
131
interface Factory<T> {
132
ReactiveLoadBalancer<T> getInstance(String serviceId);
133
}
134
}
135
```
136
137
### WebClient Exchange Filter Functions
138
139
```java { .api }
140
/**
141
* Reactor-based WebClient exchange filter function for load balancing
142
*/
143
public class ReactorLoadBalancerExchangeFilterFunction implements ExchangeFilterFunction {
144
public ReactorLoadBalancerExchangeFilterFunction(ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerFactory);
145
146
@Override
147
public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next);
148
}
149
150
/**
151
* Retryable WebClient exchange filter function
152
*/
153
public class RetryableLoadBalancerExchangeFilterFunction implements ExchangeFilterFunction {
154
public RetryableLoadBalancerExchangeFilterFunction(RetrySpec retrySpec,
155
ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerFactory);
156
157
@Override
158
public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next);
159
}
160
```
161
162
## Reactive Circuit Breakers
163
164
Full reactive support for circuit breaker patterns with Mono and Flux.
165
166
### Reactive Circuit Breaker Factory
167
168
```java { .api }
169
/**
170
* Abstract factory for reactive circuit breakers
171
*/
172
public abstract class ReactiveCircuitBreakerFactory<CONF, CONFB> {
173
public abstract ReactiveCircuitBreaker create(String id);
174
public ReactiveCircuitBreaker create(String id, String groupName) { return create(id); }
175
public abstract void configure(Consumer<CONFB> consumer);
176
public abstract void configure(String id, Consumer<CONFB> consumer);
177
public void configureGroup(String groupName, Consumer<CONFB> consumer) {}
178
}
179
```
180
181
## Configuration and Customization
182
183
### WebClient Customization
184
185
```java { .api }
186
/**
187
* Interface for customizing WebClient instances
188
*/
189
public interface WebClientCustomizer {
190
/**
191
* Customize the WebClient.Builder
192
* @param webClientBuilder The WebClient.Builder to customize
193
*/
194
void customize(WebClient.Builder webClientBuilder);
195
}
196
```
197
198
### Exchange Filter Function Utilities
199
200
```java { .api }
201
/**
202
* Utility functions for exchange filter functions
203
*/
204
public class ExchangeFilterFunctionUtils {
205
/**
206
* Create a load balanced exchange filter function
207
* @param loadBalancerFactory The load balancer factory
208
* @return An exchange filter function
209
*/
210
public static ExchangeFilterFunction loadBalancerFilter(
211
ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerFactory);
212
213
/**
214
* Create a retryable load balanced exchange filter function
215
* @param retrySpec The retry specification
216
* @param loadBalancerFactory The load balancer factory
217
* @return A retryable exchange filter function
218
*/
219
public static ExchangeFilterFunction retryableLoadBalancerFilter(
220
RetrySpec retrySpec,
221
ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerFactory);
222
}
223
```
224
225
## Usage Examples
226
227
**Reactive Service Discovery with Fallback:**
228
229
```java
230
@Service
231
public class ReactiveUserService {
232
233
@Autowired
234
private ReactiveDiscoveryClient discoveryClient;
235
236
@Autowired
237
private WebClient.Builder webClientBuilder;
238
239
public Mono<User> getUser(String userId) {
240
return discoveryClient.getInstances("user-service")
241
.next() // Get first available instance
242
.switchIfEmpty(Mono.error(new ServiceUnavailableException("No user-service instances available")))
243
.flatMap(instance -> {
244
WebClient webClient = webClientBuilder
245
.baseUrl(instance.getUri().toString())
246
.build();
247
248
return webClient.get()
249
.uri("/users/{id}", userId)
250
.retrieve()
251
.bodyToMono(User.class);
252
})
253
.onErrorResume(throwable -> {
254
log.warn("Failed to get user, using fallback", throwable);
255
return Mono.just(User.defaultUser(userId));
256
});
257
}
258
259
public Flux<User> getAllUsers() {
260
return discoveryClient.getInstances("user-service")
261
.collectList()
262
.flatMapMany(instances -> {
263
if (instances.isEmpty()) {
264
return Flux.error(new ServiceUnavailableException("No user-service instances"));
265
}
266
267
// Load balance across instances
268
List<Publisher<User>> publishers = instances.stream()
269
.map(instance -> {
270
WebClient webClient = webClientBuilder
271
.baseUrl(instance.getUri().toString())
272
.build();
273
274
return webClient.get()
275
.uri("/users")
276
.retrieve()
277
.bodyToFlux(User.class);
278
})
279
.collect(Collectors.toList());
280
281
// Use CloudFlux to get first non-empty result
282
return CloudFlux.firstNonEmpty(publishers);
283
});
284
}
285
}
286
```
287
288
**Reactive Load Balancing with WebClient:**
289
290
```java
291
@Configuration
292
public class ReactiveWebClientConfig {
293
294
@Bean
295
@LoadBalanced
296
public WebClient.Builder loadBalancedWebClientBuilder() {
297
return WebClient.builder();
298
}
299
300
@Bean
301
public WebClient loadBalancedWebClient(@LoadBalanced WebClient.Builder builder) {
302
return builder.build();
303
}
304
}
305
306
@Service
307
public class ReactiveOrderService {
308
309
private final WebClient webClient;
310
311
public ReactiveOrderService(@LoadBalanced WebClient.Builder webClientBuilder) {
312
this.webClient = webClientBuilder.build();
313
}
314
315
public Mono<Order> getOrder(String orderId) {
316
return webClient.get()
317
.uri("http://order-service/orders/{id}", orderId)
318
.retrieve()
319
.bodyToMono(Order.class)
320
.timeout(Duration.ofSeconds(5))
321
.retry(3)
322
.onErrorResume(throwable -> {
323
log.error("Failed to get order: " + orderId, throwable);
324
return Mono.just(Order.fallbackOrder(orderId));
325
});
326
}
327
328
public Flux<Order> getOrdersStream() {
329
return webClient.get()
330
.uri("http://order-service/orders/stream")
331
.retrieve()
332
.bodyToFlux(Order.class)
333
.delayElements(Duration.ofMillis(100))
334
.onBackpressureBuffer(1000)
335
.onErrorContinue((throwable, obj) -> {
336
log.warn("Error processing order: " + obj, throwable);
337
});
338
}
339
}
340
```
341
342
**Reactive Circuit Breaker with Custom Configuration:**
343
344
```java
345
@Configuration
346
public class ReactiveCircuitBreakerConfig {
347
348
@Bean
349
public Customizer<ReactiveResilience4JCircuitBreakerFactory> defaultCustomizer() {
350
return factory -> factory.configureDefault(id -> new Resilience4JConfigBuilder(id)
351
.circuitBreakerConfig(CircuitBreakerConfig.custom()
352
.slidingWindowSize(20)
353
.permittedNumberOfCallsInHalfOpenState(5)
354
.failureRateThreshold(50.0f)
355
.waitDurationInOpenState(Duration.ofSeconds(30))
356
.recordExceptions(IOException.class, TimeoutException.class)
357
.build())
358
.timeLimiterConfig(TimeLimiterConfig.custom()
359
.timeoutDuration(Duration.ofSeconds(3))
360
.build())
361
.build());
362
}
363
}
364
365
@Service
366
public class ReactivePaymentService {
367
368
@Autowired
369
private ReactiveCircuitBreakerFactory circuitBreakerFactory;
370
371
private ReactiveCircuitBreaker circuitBreaker;
372
private WebClient webClient;
373
374
@PostConstruct
375
public void init() {
376
this.circuitBreaker = circuitBreakerFactory.create("payment-service");
377
this.webClient = WebClient.create("http://payment-service");
378
}
379
380
public Mono<PaymentResult> processPayment(PaymentRequest request) {
381
return circuitBreaker.run(
382
webClient.post()
383
.uri("/payments")
384
.bodyValue(request)
385
.retrieve()
386
.bodyToMono(PaymentResult.class),
387
throwable -> Mono.just(PaymentResult.failed("Circuit breaker fallback"))
388
);
389
}
390
391
public Flux<PaymentStatus> getPaymentStatusStream(String paymentId) {
392
return circuitBreaker.run(
393
webClient.get()
394
.uri("/payments/{id}/status/stream", paymentId)
395
.retrieve()
396
.bodyToFlux(PaymentStatus.class),
397
throwable -> Flux.just(PaymentStatus.unknown(paymentId))
398
);
399
}
400
}
401
```
402
403
**Custom Reactive Discovery with CloudFlux:**
404
405
```java
406
@Component
407
public class MultiRegionDiscoveryClient implements ReactiveDiscoveryClient {
408
409
private final List<ReactiveDiscoveryClient> regionalClients;
410
411
public MultiRegionDiscoveryClient(List<ReactiveDiscoveryClient> regionalClients) {
412
this.regionalClients = regionalClients;
413
}
414
415
@Override
416
public String description() {
417
return "Multi-region discovery client";
418
}
419
420
@Override
421
public Flux<ServiceInstance> getInstances(String serviceId) {
422
// Get instances from all regions, return first non-empty
423
List<Publisher<ServiceInstance>> publishers = regionalClients.stream()
424
.map(client -> client.getInstances(serviceId))
425
.collect(Collectors.toList());
426
427
return CloudFlux.firstNonEmpty(publishers)
428
.switchIfEmpty(Flux.defer(() -> {
429
log.warn("No instances found in any region for service: {}", serviceId);
430
return Flux.empty();
431
}));
432
}
433
434
@Override
435
public Flux<String> getServices() {
436
// Combine services from all regions
437
return Flux.fromIterable(regionalClients)
438
.flatMap(ReactiveDiscoveryClient::getServices)
439
.distinct()
440
.sort();
441
}
442
}
443
```
444
445
## Auto-Configuration
446
447
```java { .api }
448
/**
449
* Auto-configuration for reactive client features
450
*/
451
@Configuration
452
@ConditionalOnClass({Flux.class, Mono.class})
453
public class ReactiveCommonsClientAutoConfiguration {
454
455
@Bean
456
@ConditionalOnMissingBean
457
public ReactiveDiscoveryClient reactiveDiscoveryClient(List<ReactiveDiscoveryClient> discoveryClients) {
458
return new ReactiveCompositeDiscoveryClient(discoveryClients);
459
}
460
461
@Bean
462
@ConditionalOnMissingBean
463
@ConditionalOnBean(ReactiveLoadBalancer.Factory.class)
464
public ReactorLoadBalancerExchangeFilterFunction loadBalancerExchangeFilterFunction(
465
ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerFactory) {
466
return new ReactorLoadBalancerExchangeFilterFunction(loadBalancerFactory);
467
}
468
}
469
```