or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

circuit-breakers.mdhttp-clients.mdindex.mdload-balancing.mdreactive-support.mdservice-discovery.mdservice-registration.md

reactive-support.mddocs/

0

# Reactive Support

1

2

Reactive programming support provides enhanced reactive utilities and operators specifically designed for cloud-native applications. This includes custom Flux operators and reactive versions of all core Spring Cloud Commons components.

3

4

## Capabilities

5

6

### Cloud Flux

7

8

Enhanced Flux with additional operators for cloud scenarios.

9

10

```java { .api }

11

/**

12

* Extended Flux with additional operators for cloud scenarios

13

*/

14

public abstract class CloudFlux<T> extends Flux<T> {

15

/**

16

* Returns the first Publisher to emit any signal (onNext, onComplete, onError) from the provided Publishers.

17

* @param publishers The Publishers to compete

18

* @return A Flux that emits from the first non-empty Publisher

19

*/

20

public static <I> Flux<I> firstNonEmpty(Publisher<? extends I>... publishers);

21

22

/**

23

* Returns the first Publisher to emit any signal (onNext, onComplete, onError) from the provided Publishers.

24

* @param publishers An Iterable of Publishers to compete

25

* @return A Flux that emits from the first non-empty Publisher

26

*/

27

public static <I> Flux<I> firstNonEmpty(Iterable<? extends Publisher<? extends I>> publishers);

28

}

29

```

30

31

**Usage Examples:**

32

33

```java

34

@Service

35

public class ReactiveServiceDiscovery {

36

37

@Autowired

38

private List<ReactiveDiscoveryClient> discoveryClients;

39

40

public Flux<ServiceInstance> findInstances(String serviceId) {

41

// Create publishers for each discovery client

42

List<Publisher<ServiceInstance>> publishers = discoveryClients.stream()

43

.map(client -> client.getInstances(serviceId))

44

.collect(Collectors.toList());

45

46

// Return first non-empty result

47

return CloudFlux.firstNonEmpty(publishers);

48

}

49

50

public Flux<String> findServices() {

51

// Get services from multiple discovery clients, return first non-empty

52

Publisher<String>[] publishers = discoveryClients.stream()

53

.map(ReactiveDiscoveryClient::getServices)

54

.toArray(Publisher[]::new);

55

56

return CloudFlux.firstNonEmpty(publishers);

57

}

58

}

59

```

60

61

### First Non-Empty Implementation

62

63

Internal implementation class for first non-empty emissions.

64

65

```java { .api }

66

/**

67

* Implementation class for first non-empty emissions

68

* Note: This is an internal implementation class and should not be used directly

69

*/

70

public class FluxFirstNonEmptyEmitting<T> extends Flux<T> {

71

// Internal implementation details

72

// This class should not be used directly in application code

73

}

74

```

75

76

## Reactive Discovery

77

78

All discovery components have reactive counterparts that integrate seamlessly with the reactive ecosystem.

79

80

### Reactive Discovery Client

81

82

```java { .api }

83

/**

84

* Reactive interface for service discovery operations

85

*/

86

public interface ReactiveDiscoveryClient {

87

String description();

88

Flux<ServiceInstance> getInstances(String serviceId);

89

Flux<String> getServices();

90

default Mono<Void> reactiveProbe() { return Mono.empty(); }

91

}

92

```

93

94

### Reactive Composite Discovery Client

95

96

```java { .api }

97

/**

98

* Reactive composite discovery client combining multiple discovery clients

99

*/

100

public class ReactiveCompositeDiscoveryClient implements ReactiveDiscoveryClient {

101

public ReactiveCompositeDiscoveryClient(List<ReactiveDiscoveryClient> discoveryClients);

102

103

@Override

104

public Flux<ServiceInstance> getInstances(String serviceId) {

105

// Combines results from all discovery clients reactively

106

}

107

108

@Override

109

public Flux<String> getServices() {

110

// Combines services from all discovery clients reactively

111

}

112

}

113

```

114

115

## Reactive Load Balancing

116

117

Reactive load balancing provides non-blocking service instance selection and WebClient integration.

118

119

### Reactive Load Balancer

120

121

```java { .api }

122

/**

123

* Reactive load balancer interface

124

*/

125

public interface ReactiveLoadBalancer<T> {

126

Mono<Response<T>> choose(Request request);

127

default Mono<Response<T>> choose() { return choose(REQUEST); }

128

129

Request REQUEST = new DefaultRequest<>();

130

131

interface Factory<T> {

132

ReactiveLoadBalancer<T> getInstance(String serviceId);

133

}

134

}

135

```

136

137

### WebClient Exchange Filter Functions

138

139

```java { .api }

140

/**

141

* Reactor-based WebClient exchange filter function for load balancing

142

*/

143

public class ReactorLoadBalancerExchangeFilterFunction implements ExchangeFilterFunction {

144

public ReactorLoadBalancerExchangeFilterFunction(ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerFactory);

145

146

@Override

147

public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next);

148

}

149

150

/**

151

* Retryable WebClient exchange filter function

152

*/

153

public class RetryableLoadBalancerExchangeFilterFunction implements ExchangeFilterFunction {

154

public RetryableLoadBalancerExchangeFilterFunction(RetrySpec retrySpec,

155

ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerFactory);

156

157

@Override

158

public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next);

159

}

160

```

161

162

## Reactive Circuit Breakers

163

164

Full reactive support for circuit breaker patterns with Mono and Flux.

165

166

### Reactive Circuit Breaker Factory

167

168

```java { .api }

169

/**

170

* Abstract factory for reactive circuit breakers

171

*/

172

public abstract class ReactiveCircuitBreakerFactory<CONF, CONFB> {

173

public abstract ReactiveCircuitBreaker create(String id);

174

public ReactiveCircuitBreaker create(String id, String groupName) { return create(id); }

175

public abstract void configure(Consumer<CONFB> consumer);

176

public abstract void configure(String id, Consumer<CONFB> consumer);

177

public void configureGroup(String groupName, Consumer<CONFB> consumer) {}

178

}

179

```

180

181

## Configuration and Customization

182

183

### WebClient Customization

184

185

```java { .api }

186

/**

187

* Interface for customizing WebClient instances

188

*/

189

public interface WebClientCustomizer {

190

/**

191

* Customize the WebClient.Builder

192

* @param webClientBuilder The WebClient.Builder to customize

193

*/

194

void customize(WebClient.Builder webClientBuilder);

195

}

196

```

197

198

### Exchange Filter Function Utilities

199

200

```java { .api }

201

/**

202

* Utility functions for exchange filter functions

203

*/

204

public class ExchangeFilterFunctionUtils {

205

/**

206

* Create a load balanced exchange filter function

207

* @param loadBalancerFactory The load balancer factory

208

* @return An exchange filter function

209

*/

210

public static ExchangeFilterFunction loadBalancerFilter(

211

ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerFactory);

212

213

/**

214

* Create a retryable load balanced exchange filter function

215

* @param retrySpec The retry specification

216

* @param loadBalancerFactory The load balancer factory

217

* @return A retryable exchange filter function

218

*/

219

public static ExchangeFilterFunction retryableLoadBalancerFilter(

220

RetrySpec retrySpec,

221

ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerFactory);

222

}

223

```

224

225

## Usage Examples

226

227

**Reactive Service Discovery with Fallback:**

228

229

```java

230

@Service

231

public class ReactiveUserService {

232

233

@Autowired

234

private ReactiveDiscoveryClient discoveryClient;

235

236

@Autowired

237

private WebClient.Builder webClientBuilder;

238

239

public Mono<User> getUser(String userId) {

240

return discoveryClient.getInstances("user-service")

241

.next() // Get first available instance

242

.switchIfEmpty(Mono.error(new ServiceUnavailableException("No user-service instances available")))

243

.flatMap(instance -> {

244

WebClient webClient = webClientBuilder

245

.baseUrl(instance.getUri().toString())

246

.build();

247

248

return webClient.get()

249

.uri("/users/{id}", userId)

250

.retrieve()

251

.bodyToMono(User.class);

252

})

253

.onErrorResume(throwable -> {

254

log.warn("Failed to get user, using fallback", throwable);

255

return Mono.just(User.defaultUser(userId));

256

});

257

}

258

259

public Flux<User> getAllUsers() {

260

return discoveryClient.getInstances("user-service")

261

.collectList()

262

.flatMapMany(instances -> {

263

if (instances.isEmpty()) {

264

return Flux.error(new ServiceUnavailableException("No user-service instances"));

265

}

266

267

// Load balance across instances

268

List<Publisher<User>> publishers = instances.stream()

269

.map(instance -> {

270

WebClient webClient = webClientBuilder

271

.baseUrl(instance.getUri().toString())

272

.build();

273

274

return webClient.get()

275

.uri("/users")

276

.retrieve()

277

.bodyToFlux(User.class);

278

})

279

.collect(Collectors.toList());

280

281

// Use CloudFlux to get first non-empty result

282

return CloudFlux.firstNonEmpty(publishers);

283

});

284

}

285

}

286

```

287

288

**Reactive Load Balancing with WebClient:**

289

290

```java

291

@Configuration

292

public class ReactiveWebClientConfig {

293

294

@Bean

295

@LoadBalanced

296

public WebClient.Builder loadBalancedWebClientBuilder() {

297

return WebClient.builder();

298

}

299

300

@Bean

301

public WebClient loadBalancedWebClient(@LoadBalanced WebClient.Builder builder) {

302

return builder.build();

303

}

304

}

305

306

@Service

307

public class ReactiveOrderService {

308

309

private final WebClient webClient;

310

311

public ReactiveOrderService(@LoadBalanced WebClient.Builder webClientBuilder) {

312

this.webClient = webClientBuilder.build();

313

}

314

315

public Mono<Order> getOrder(String orderId) {

316

return webClient.get()

317

.uri("http://order-service/orders/{id}", orderId)

318

.retrieve()

319

.bodyToMono(Order.class)

320

.timeout(Duration.ofSeconds(5))

321

.retry(3)

322

.onErrorResume(throwable -> {

323

log.error("Failed to get order: " + orderId, throwable);

324

return Mono.just(Order.fallbackOrder(orderId));

325

});

326

}

327

328

public Flux<Order> getOrdersStream() {

329

return webClient.get()

330

.uri("http://order-service/orders/stream")

331

.retrieve()

332

.bodyToFlux(Order.class)

333

.delayElements(Duration.ofMillis(100))

334

.onBackpressureBuffer(1000)

335

.onErrorContinue((throwable, obj) -> {

336

log.warn("Error processing order: " + obj, throwable);

337

});

338

}

339

}

340

```

341

342

**Reactive Circuit Breaker with Custom Configuration:**

343

344

```java

345

@Configuration

346

public class ReactiveCircuitBreakerConfig {

347

348

@Bean

349

public Customizer<ReactiveResilience4JCircuitBreakerFactory> defaultCustomizer() {

350

return factory -> factory.configureDefault(id -> new Resilience4JConfigBuilder(id)

351

.circuitBreakerConfig(CircuitBreakerConfig.custom()

352

.slidingWindowSize(20)

353

.permittedNumberOfCallsInHalfOpenState(5)

354

.failureRateThreshold(50.0f)

355

.waitDurationInOpenState(Duration.ofSeconds(30))

356

.recordExceptions(IOException.class, TimeoutException.class)

357

.build())

358

.timeLimiterConfig(TimeLimiterConfig.custom()

359

.timeoutDuration(Duration.ofSeconds(3))

360

.build())

361

.build());

362

}

363

}

364

365

@Service

366

public class ReactivePaymentService {

367

368

@Autowired

369

private ReactiveCircuitBreakerFactory circuitBreakerFactory;

370

371

private ReactiveCircuitBreaker circuitBreaker;

372

private WebClient webClient;

373

374

@PostConstruct

375

public void init() {

376

this.circuitBreaker = circuitBreakerFactory.create("payment-service");

377

this.webClient = WebClient.create("http://payment-service");

378

}

379

380

public Mono<PaymentResult> processPayment(PaymentRequest request) {

381

return circuitBreaker.run(

382

webClient.post()

383

.uri("/payments")

384

.bodyValue(request)

385

.retrieve()

386

.bodyToMono(PaymentResult.class),

387

throwable -> Mono.just(PaymentResult.failed("Circuit breaker fallback"))

388

);

389

}

390

391

public Flux<PaymentStatus> getPaymentStatusStream(String paymentId) {

392

return circuitBreaker.run(

393

webClient.get()

394

.uri("/payments/{id}/status/stream", paymentId)

395

.retrieve()

396

.bodyToFlux(PaymentStatus.class),

397

throwable -> Flux.just(PaymentStatus.unknown(paymentId))

398

);

399

}

400

}

401

```

402

403

**Custom Reactive Discovery with CloudFlux:**

404

405

```java

406

@Component

407

public class MultiRegionDiscoveryClient implements ReactiveDiscoveryClient {

408

409

private final List<ReactiveDiscoveryClient> regionalClients;

410

411

public MultiRegionDiscoveryClient(List<ReactiveDiscoveryClient> regionalClients) {

412

this.regionalClients = regionalClients;

413

}

414

415

@Override

416

public String description() {

417

return "Multi-region discovery client";

418

}

419

420

@Override

421

public Flux<ServiceInstance> getInstances(String serviceId) {

422

// Get instances from all regions, return first non-empty

423

List<Publisher<ServiceInstance>> publishers = regionalClients.stream()

424

.map(client -> client.getInstances(serviceId))

425

.collect(Collectors.toList());

426

427

return CloudFlux.firstNonEmpty(publishers)

428

.switchIfEmpty(Flux.defer(() -> {

429

log.warn("No instances found in any region for service: {}", serviceId);

430

return Flux.empty();

431

}));

432

}

433

434

@Override

435

public Flux<String> getServices() {

436

// Combine services from all regions

437

return Flux.fromIterable(regionalClients)

438

.flatMap(ReactiveDiscoveryClient::getServices)

439

.distinct()

440

.sort();

441

}

442

}

443

```

444

445

## Auto-Configuration

446

447

```java { .api }

448

/**

449

* Auto-configuration for reactive client features

450

*/

451

@Configuration

452

@ConditionalOnClass({Flux.class, Mono.class})

453

public class ReactiveCommonsClientAutoConfiguration {

454

455

@Bean

456

@ConditionalOnMissingBean

457

public ReactiveDiscoveryClient reactiveDiscoveryClient(List<ReactiveDiscoveryClient> discoveryClients) {

458

return new ReactiveCompositeDiscoveryClient(discoveryClients);

459

}

460

461

@Bean

462

@ConditionalOnMissingBean

463

@ConditionalOnBean(ReactiveLoadBalancer.Factory.class)

464

public ReactorLoadBalancerExchangeFilterFunction loadBalancerExchangeFilterFunction(

465

ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerFactory) {

466

return new ReactorLoadBalancerExchangeFilterFunction(loadBalancerFactory);

467

}

468

}

469

```