or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

aop.mdcore-container.mddata-access.mdindex.mdintegration.mdmessaging.mdreactive-web.mdtesting.mdweb-framework.md

reactive-web.mddocs/

0

# Reactive Web Framework (WebFlux)

1

2

Spring WebFlux is the reactive-stack web framework introduced in Spring 5.0. It provides a non-blocking, reactive programming model for building web applications that can handle high concurrency with a small number of threads and scale with fewer hardware resources.

3

4

## Maven Dependencies

5

6

```xml

7

<!-- Spring WebFlux -->

8

<dependency>

9

<groupId>org.springframework</groupId>

10

<artifactId>spring-webflux</artifactId>

11

<version>5.3.39</version>

12

</dependency>

13

14

<!-- Reactor Core (reactive streams) -->

15

<dependency>

16

<groupId>io.projectreactor</groupId>

17

<artifactId>reactor-core</artifactId>

18

<version>3.4.32</version>

19

</dependency>

20

21

<!-- Netty (default reactive server) -->

22

<dependency>

23

<groupId>io.netty</groupId>

24

<artifactId>netty-all</artifactId>

25

<version>4.1.112.Final</version>

26

</dependency>

27

28

<!-- Jackson for JSON (reactive) -->

29

<dependency>

30

<groupId>com.fasterxml.jackson.core</groupId>

31

<artifactId>jackson-databind</artifactId>

32

<version>2.12.7</version>

33

</dependency>

34

```

35

36

## Core Imports

37

38

```java { .api }

39

// Reactive types

40

import reactor.core.publisher.Mono;

41

import reactor.core.publisher.Flux;

42

import org.reactivestreams.Publisher;

43

44

// WebFlux annotations

45

import org.springframework.web.bind.annotation.RestController;

46

import org.springframework.web.bind.annotation.GetMapping;

47

import org.springframework.web.bind.annotation.PostMapping;

48

import org.springframework.web.bind.annotation.RequestBody;

49

import org.springframework.web.bind.annotation.PathVariable;

50

51

// Functional routing

52

import org.springframework.web.reactive.function.server.RouterFunction;

53

import org.springframework.web.reactive.function.server.RouterFunctions;

54

import org.springframework.web.reactive.function.server.ServerRequest;

55

import org.springframework.web.reactive.function.server.ServerResponse;

56

import org.springframework.web.reactive.function.server.HandlerFunction;

57

58

// Configuration

59

import org.springframework.web.reactive.config.EnableWebFlux;

60

import org.springframework.web.reactive.config.WebFluxConfigurer;

61

import org.springframework.context.annotation.Configuration;

62

63

// WebClient (reactive HTTP client)

64

import org.springframework.web.reactive.function.client.WebClient;

65

import org.springframework.web.reactive.function.client.ClientResponse;

66

67

// Server support

68

import org.springframework.web.reactive.function.server.RequestPredicates;

69

import org.springframework.http.MediaType;

70

import org.springframework.http.ResponseEntity;

71

```

72

73

## Core Reactive Types

74

75

### Mono and Flux

76

77

```java { .api }

78

// Reactive stream publisher for 0-1 elements

79

public abstract class Mono<T> implements CorePublisher<T> {

80

81

// Creation methods

82

public static <T> Mono<T> just(T data);

83

public static <T> Mono<T> justOrEmpty(T data);

84

public static <T> Mono<T> empty();

85

public static <T> Mono<T> error(Throwable error);

86

public static <T> Mono<T> fromCallable(Callable<? extends T> supplier);

87

public static <T> Mono<T> fromSupplier(Supplier<? extends T> supplier);

88

public static <T> Mono<T> defer(Supplier<? extends Mono<? extends T>> supplier);

89

90

// Transformation methods

91

public <R> Mono<R> map(Function<? super T, ? extends R> mapper);

92

public <R> Mono<R> flatMap(Function<? super T, ? extends Mono<? extends R>> transformer);

93

public <R> Flux<R> flatMapMany(Function<? super T, ? extends Publisher<? extends R>> mapper);

94

public Mono<T> filter(Predicate<? super T> tester);

95

public Mono<T> switchIfEmpty(Mono<? extends T> alternate);

96

97

// Error handling

98

public Mono<T> onErrorReturn(T fallback);

99

public Mono<T> onErrorResume(Function<? super Throwable, ? extends Mono<? extends T>> fallback);

100

public Mono<T> doOnError(Consumer<? super Throwable> onError);

101

102

// Side effects

103

public Mono<T> doOnNext(Consumer<? super T> onNext);

104

public Mono<T> doOnSuccess(Consumer<? super T> onSuccess);

105

public Mono<T> doFinally(Consumer<SignalType> onFinally);

106

107

// Blocking operations (avoid in production)

108

public T block();

109

public T block(Duration timeout);

110

111

// Subscription

112

public Disposable subscribe();

113

public Disposable subscribe(Consumer<? super T> consumer);

114

public Disposable subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer);

115

}

116

117

// Reactive stream publisher for 0-N elements

118

public abstract class Flux<T> implements CorePublisher<T> {

119

120

// Creation methods

121

public static <T> Flux<T> just(T... data);

122

public static <T> Flux<T> fromIterable(Iterable<? extends T> it);

123

public static <T> Flux<T> fromArray(T[] array);

124

public static <T> Flux<T> fromStream(Stream<? extends T> s);

125

public static <T> Flux<T> empty();

126

public static <T> Flux<T> error(Throwable error);

127

public static Flux<Long> interval(Duration period);

128

public static <T> Flux<T> defer(Supplier<? extends Publisher<T>> supplier);

129

130

// Transformation methods

131

public <V> Flux<V> map(Function<? super T, ? extends V> mapper);

132

public <R> Flux<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper);

133

public Flux<T> filter(Predicate<? super T> p);

134

public Flux<T> take(long n);

135

public Flux<T> skip(long skipped);

136

public Flux<T> distinct();

137

public Flux<T> sort();

138

139

// Aggregation methods

140

public Mono<T> reduce(BinaryOperator<T> aggregator);

141

public <A> Mono<A> reduce(A initial, BiFunction<A, ? super T, A> accumulator);

142

public Mono<List<T>> collectList();

143

public Mono<Map<K, T>> collectMap(Function<? super T, ? extends K> keyExtractor);

144

public Mono<Long> count();

145

146

// Error handling

147

public Flux<T> onErrorReturn(T fallback);

148

public Flux<T> onErrorResume(Function<? super Throwable, ? extends Publisher<? extends T>> fallback);

149

public Flux<T> retry();

150

public Flux<T> retry(long numRetries);

151

152

// Backpressure

153

public Flux<T> onBackpressureBuffer();

154

public Flux<T> onBackpressureDrop();

155

public Flux<T> onBackpressureLatest();

156

157

// Blocking operations (avoid in production)

158

public List<T> collectList().block();

159

public Stream<T> toStream();

160

}

161

```

162

163

## Functional Routing

164

165

### RouterFunction and ServerRequest/ServerResponse

166

167

```java { .api }

168

// Represents a function that routes to a handler function

169

@FunctionalInterface

170

public interface RouterFunction<T extends ServerResponse> {

171

172

Mono<HandlerFunction<T>> route(ServerRequest request);

173

174

default RouterFunction<T> and(RouterFunction<T> other);

175

default RouterFunction<T> andRoute(RequestPredicate predicate, HandlerFunction<T> handlerFunction);

176

default RouterFunction<T> andNest(RequestPredicate predicate, RouterFunction<T> routerFunction);

177

default <S extends ServerResponse> RouterFunction<?> andOther(RouterFunction<S> other);

178

default RouterFunction<T> filter(HandlerFilterFunction<T, T> filterFunction);

179

}

180

181

// Represents a typed server-side HTTP request

182

public interface ServerRequest {

183

184

// Request line

185

HttpMethod method();

186

URI uri();

187

String path();

188

189

// Headers

190

ServerRequest.Headers headers();

191

MultiValueMap<String, String> queryParams();

192

MultiValueMap<String, String> pathVariables();

193

194

// Body

195

<T> Mono<T> bodyToMono(Class<? extends T> elementClass);

196

<T> Mono<T> bodyToMono(ParameterizedTypeReference<T> typeReference);

197

<T> Flux<T> bodyToFlux(Class<? extends T> elementClass);

198

<T> Flux<T> bodyToFlux(ParameterizedTypeReference<T> typeReference);

199

200

// Attributes

201

Map<String, Object> attributes();

202

Optional<Object> attribute(String name);

203

204

// Path matching

205

String pathVariable(String name);

206

MultiValueMap<String, String> pathVariables();

207

208

// Convenience methods

209

Optional<String> queryParam(String name);

210

Principal principal();

211

212

// Headers interface

213

interface Headers {

214

List<MediaType> accept();

215

OptionalLong contentLength();

216

Optional<MediaType> contentType();

217

String host();

218

List<String> header(String headerName);

219

HttpHeaders asHttpHeaders();

220

}

221

}

222

223

// Represents a typed server-side HTTP response

224

public interface ServerResponse {

225

226

HttpStatus statusCode();

227

int rawStatusCode();

228

HttpHeaders headers();

229

MultiValueMap<String, ResponseCookie> cookies();

230

231

// Static factory methods

232

static BodyBuilder status(HttpStatus status);

233

static BodyBuilder status(int status);

234

static BodyBuilder ok();

235

static HeadersBuilder<?> noContent();

236

static BodyBuilder badRequest();

237

static HeadersBuilder<?> notFound();

238

static BodyBuilder unprocessableEntity();

239

240

// Builder interfaces

241

interface BodyBuilder extends HeadersBuilder<BodyBuilder> {

242

<T> Mono<ServerResponse> body(Publisher<T> publisher, Class<T> elementClass);

243

<T> Mono<ServerResponse> body(Publisher<T> publisher, ParameterizedTypeReference<T> typeRef);

244

Mono<ServerResponse> body(Object body, Class<?> bodyType);

245

<T> Mono<ServerResponse> body(Mono<T> body, Class<T> bodyType);

246

<T> Mono<ServerResponse> body(Flux<T> body, Class<T> bodyType);

247

Mono<ServerResponse> syncBody(Object body);

248

249

Mono<ServerResponse> render(String name, Object... modelAttributes);

250

Mono<ServerResponse> render(String name, Map<String, ?> model);

251

}

252

253

interface HeadersBuilder<B extends HeadersBuilder<B>> {

254

B header(String headerName, String... headerValues);

255

B headers(HttpHeaders headers);

256

B cookie(ResponseCookie cookie);

257

B cookies(Consumer<MultiValueMap<String, ResponseCookie>> cookiesConsumer);

258

B allow(HttpMethod... allowedMethods);

259

B eTag(String etag);

260

B lastModified(ZonedDateTime lastModified);

261

B location(URI location);

262

B cacheControl(CacheControl cacheControl);

263

B varyBy(String... requestHeaders);

264

265

Mono<ServerResponse> build();

266

<T> Mono<ServerResponse> build(Publisher<T> voidPublisher);

267

}

268

}

269

270

// Function that handles a server request and returns a response

271

@FunctionalInterface

272

public interface HandlerFunction<T extends ServerResponse> {

273

Mono<T> handle(ServerRequest request);

274

}

275

```

276

277

### RouterFunctions Utility

278

279

```java { .api }

280

// Factory methods for RouterFunction

281

public abstract class RouterFunctions {

282

283

// Route creation

284

public static <T extends ServerResponse> RouterFunction<T> route(RequestPredicate predicate, HandlerFunction<T> handlerFunction);

285

286

// Convenience methods for HTTP methods

287

public static RouterFunction<ServerResponse> GET(String pattern, HandlerFunction<ServerResponse> handlerFunction);

288

public static RouterFunction<ServerResponse> POST(String pattern, HandlerFunction<ServerResponse> handlerFunction);

289

public static RouterFunction<ServerResponse> PUT(String pattern, HandlerFunction<ServerResponse> handlerFunction);

290

public static RouterFunction<ServerResponse> DELETE(String pattern, HandlerFunction<ServerResponse> handlerFunction);

291

public static RouterFunction<ServerResponse> PATCH(String pattern, HandlerFunction<ServerResponse> handlerFunction);

292

293

// Nesting routes

294

public static <T extends ServerResponse> RouterFunction<T> nest(RequestPredicate predicate, RouterFunction<T> routerFunction);

295

296

// Resource handling

297

public static RouterFunction<ServerResponse> resources(String pattern, Resource location);

298

public static RouterFunction<ServerResponse> resources(Function<ServerRequest, Mono<Resource>> lookupFunction);

299

300

// Route conversion

301

public static HandlerMapping toHandlerMapping(RouterFunction<?> routerFunction);

302

public static HttpHandler toHttpHandler(RouterFunction<?> routerFunction);

303

}

304

305

// Predicates for request matching

306

public abstract class RequestPredicates {

307

308

// HTTP methods

309

public static RequestPredicate GET(String pattern);

310

public static RequestPredicate POST(String pattern);

311

public static RequestPredicate PUT(String pattern);

312

public static RequestPredicate DELETE(String pattern);

313

public static RequestPredicate PATCH(String pattern);

314

public static RequestPredicate HEAD(String pattern);

315

public static RequestPredicate OPTIONS(String pattern);

316

317

// Path matching

318

public static RequestPredicate path(String pattern);

319

public static RequestPredicate pathExtension(String extension);

320

public static RequestPredicate pathExtension(Predicate<String> extensionPredicate);

321

322

// Content type

323

public static RequestPredicate accept(MediaType... mediaTypes);

324

public static RequestPredicate contentType(MediaType... mediaTypes);

325

326

// Headers

327

public static RequestPredicate headers(Predicate<ServerRequest.Headers> headersPredicate);

328

public static RequestPredicate header(String name, String value);

329

330

// Query parameters

331

public static RequestPredicate queryParam(String name, String value);

332

public static RequestPredicate queryParam(String name, Predicate<String> predicate);

333

334

// Logical operations

335

public static RequestPredicate and(RequestPredicate left, RequestPredicate right);

336

public static RequestPredicate or(RequestPredicate left, RequestPredicate right);

337

public static RequestPredicate not(RequestPredicate predicate);

338

}

339

```

340

341

## Annotated Controllers

342

343

### WebFlux Controller Annotations

344

345

```java { .api }

346

// WebFlux supports same annotations as Spring MVC but with reactive return types

347

@RestController

348

@RequestMapping("/api/reactive")

349

public class ReactiveController {

350

351

@GetMapping("/mono")

352

public Mono<String> getMono() {

353

return Mono.just("Hello Reactive World!");

354

}

355

356

@GetMapping("/flux")

357

public Flux<String> getFlux() {

358

return Flux.just("Item 1", "Item 2", "Item 3");

359

}

360

361

@PostMapping("/mono")

362

public Mono<ResponseEntity<String>> postMono(@RequestBody Mono<String> body) {

363

return body.map(value -> ResponseEntity.ok("Received: " + value));

364

}

365

366

@GetMapping(value = "/stream", produces = MediaType.TEXT_PLAIN_VALUE)

367

public Flux<String> getStream() {

368

return Flux.interval(Duration.ofSeconds(1))

369

.map(i -> "Data " + i + "\n");

370

}

371

}

372

```

373

374

## WebClient (Reactive HTTP Client)

375

376

### WebClient Interface

377

378

```java { .api }

379

// Non-blocking, reactive client to perform HTTP requests

380

public interface WebClient {

381

382

// Request specification

383

RequestHeadersUriSpec<?> get();

384

RequestBodyUriSpec post();

385

RequestBodyUriSpec put();

386

RequestBodyUriSpec patch();

387

RequestHeadersUriSpec<?> delete();

388

RequestHeadersUriSpec<?> options();

389

RequestHeadersUriSpec<?> head();

390

RequestBodyUriSpec method(HttpMethod method);

391

392

// Builder

393

static WebClient create();

394

static WebClient create(String baseUrl);

395

static Builder builder();

396

397

// Builder interface

398

interface Builder {

399

Builder baseUrl(String baseUrl);

400

Builder defaultHeader(String header, String... values);

401

Builder defaultHeaders(Consumer<HttpHeaders> headersConsumer);

402

Builder defaultCookie(String cookie, String... values);

403

Builder defaultCookies(Consumer<MultiValueMap<String, String>> cookiesConsumer);

404

Builder filter(ExchangeFilterFunction filter);

405

Builder filters(Consumer<List<ExchangeFilterFunction>> filtersConsumer);

406

Builder clientConnector(ClientHttpConnector connector);

407

Builder codecs(Consumer<ClientCodecConfigurer> configurer);

408

409

WebClient build();

410

}

411

412

// Request specifications

413

interface RequestBodyUriSpec extends RequestBodySpec, RequestHeadersUriSpec<RequestBodySpec> {

414

}

415

416

interface RequestBodySpec extends RequestHeadersSpec<RequestBodySpec> {

417

RequestBodySpec body(Publisher<DataBuffer> body, Class<? extends DataBuffer> bodyType);

418

<T> RequestBodySpec body(Publisher<T> body, Class<T> bodyType);

419

<T> RequestBodySpec body(Publisher<T> body, ParameterizedTypeReference<T> bodyTypeRef);

420

RequestBodySpec body(Object body);

421

<T> RequestBodySpec body(Mono<T> body, Class<T> bodyType);

422

<T> RequestBodySpec body(Mono<T> body, ParameterizedTypeReference<T> bodyTypeRef);

423

RequestBodySpec syncBody(Object body);

424

}

425

426

interface RequestHeadersSpec<S extends RequestHeadersSpec<S>> {

427

S header(String headerName, String... headerValues);

428

S headers(Consumer<HttpHeaders> headersConsumer);

429

S attribute(String name, Object value);

430

S attributes(Consumer<Map<String, Object>> attributesConsumer);

431

S cookie(String name, String value);

432

S cookies(Consumer<MultiValueMap<String, String>> cookiesConsumer);

433

S ifNoneMatch(String... ifNoneMatches);

434

S ifModifiedSince(ZonedDateTime ifModifiedSince);

435

436

ResponseSpec retrieve();

437

Mono<ClientResponse> exchange();

438

}

439

}

440

441

// Response specification

442

public interface ResponseSpec {

443

ResponseSpec onStatus(Predicate<HttpStatus> statusPredicate, Function<ClientResponse, Mono<? extends Throwable>> exceptionFunction);

444

<T> Mono<T> bodyToMono(Class<T> bodyType);

445

<T> Mono<T> bodyToMono(ParameterizedTypeReference<T> bodyTypeReference);

446

<T> Flux<T> bodyToFlux(Class<T> bodyType);

447

<T> Flux<T> bodyToFlux(ParameterizedTypeReference<T> bodyTypeReference);

448

Mono<ResponseEntity<Void>> toBodilessEntity();

449

<T> Mono<ResponseEntity<T>> toEntity(Class<T> bodyType);

450

<T> Mono<ResponseEntity<T>> toEntity(ParameterizedTypeReference<T> bodyTypeReference);

451

<T> Mono<ResponseEntity<List<T>>> toEntityList(Class<T> bodyType);

452

<T> Mono<ResponseEntity<List<T>>> toEntityList(ParameterizedTypeReference<T> bodyTypeReference);

453

}

454

```

455

456

## Configuration

457

458

### WebFlux Configuration

459

460

```java { .api }

461

// Enables WebFlux configuration

462

@Retention(RetentionPolicy.RUNTIME)

463

@Target(ElementType.TYPE)

464

@Documented

465

@Import(DelegatingWebFluxConfiguration.class)

466

public @interface EnableWebFlux {

467

}

468

469

// Interface for customizing WebFlux configuration

470

public interface WebFluxConfigurer {

471

472

default void configureHttpMessageCodecs(ServerCodecConfigurer configurer) {

473

}

474

475

default void addFormatters(FormatterRegistry registry) {

476

}

477

478

default Validator getValidator() {

479

return null;

480

}

481

482

default MessageCodesResolver getMessageCodesResolver() {

483

return null;

484

}

485

486

default void addResourceHandlers(ResourceHandlerRegistry registry) {

487

}

488

489

default void addCorsMappings(CorsRegistry registry) {

490

}

491

492

default void configurePathMatching(PathMatchConfigurer configurer) {

493

}

494

495

default void configureArgumentResolvers(ArgumentResolverConfigurer configurer) {

496

}

497

498

default void configureViewResolvers(ViewResolverRegistry registry) {

499

}

500

}

501

```

502

503

## Practical Usage Examples

504

505

### Basic Reactive Controllers

506

507

```java { .api }

508

@RestController

509

@RequestMapping("/api/users")

510

public class ReactiveUserController {

511

512

private final ReactiveUserService userService;

513

514

public ReactiveUserController(ReactiveUserService userService) {

515

this.userService = userService;

516

}

517

518

@GetMapping

519

public Flux<User> getAllUsers(

520

@RequestParam(defaultValue = "0") int page,

521

@RequestParam(defaultValue = "10") int size) {

522

523

return userService.findAll(PageRequest.of(page, size));

524

}

525

526

@GetMapping("/{id}")

527

public Mono<ResponseEntity<User>> getUserById(@PathVariable Long id) {

528

return userService.findById(id)

529

.map(user -> ResponseEntity.ok(user))

530

.defaultIfEmpty(ResponseEntity.notFound().build());

531

}

532

533

@PostMapping

534

public Mono<ResponseEntity<User>> createUser(@Valid @RequestBody Mono<CreateUserRequest> request) {

535

return request

536

.flatMap(userService::createUser)

537

.map(user -> ResponseEntity.status(HttpStatus.CREATED).body(user))

538

.onErrorResume(ValidationException.class, e ->

539

Mono.just(ResponseEntity.badRequest().build()));

540

}

541

542

@PutMapping("/{id}")

543

public Mono<ResponseEntity<User>> updateUser(

544

@PathVariable Long id,

545

@Valid @RequestBody Mono<UpdateUserRequest> request) {

546

547

return request

548

.flatMap(req -> userService.updateUser(id, req))

549

.map(ResponseEntity::ok)

550

.switchIfEmpty(Mono.just(ResponseEntity.notFound().build()));

551

}

552

553

@DeleteMapping("/{id}")

554

public Mono<ResponseEntity<Void>> deleteUser(@PathVariable Long id) {

555

return userService.deleteById(id)

556

.then(Mono.just(ResponseEntity.noContent().<Void>build()))

557

.switchIfEmpty(Mono.just(ResponseEntity.notFound().build()));

558

}

559

560

@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)

561

public Flux<User> streamUsers() {

562

return userService.findAll()

563

.delayElements(Duration.ofSeconds(1));

564

}

565

566

@GetMapping("/search")

567

public Flux<User> searchUsers(@RequestParam String query) {

568

return userService.searchByName(query)

569

.take(Duration.ofSeconds(5)); // Timeout after 5 seconds

570

}

571

}

572

```

573

574

### Functional Routing Configuration

575

576

```java { .api }

577

@Configuration

578

public class RouterConfig {

579

580

@Bean

581

public RouterFunction<ServerResponse> userRoutes(UserHandler userHandler) {

582

return RouterFunctions

583

.nest(RequestPredicates.path("/api/users"),

584

RouterFunctions.GET("", userHandler::getUsers)

585

.andRoute(GET("/{id}"), userHandler::getUser)

586

.andRoute(POST(""), userHandler::createUser)

587

.andRoute(PUT("/{id}"), userHandler::updateUser)

588

.andRoute(DELETE("/{id}"), userHandler::deleteUser)

589

.andRoute(GET("/search"), userHandler::searchUsers)

590

);

591

}

592

593

@Bean

594

public RouterFunction<ServerResponse> fileRoutes(FileHandler fileHandler) {

595

return RouterFunctions

596

.nest(RequestPredicates.path("/api/files"),

597

RouterFunctions.POST("/upload", fileHandler::uploadFile)

598

.andRoute(GET("/{id}/download"), fileHandler::downloadFile)

599

.andRoute(DELETE("/{id}"), fileHandler::deleteFile)

600

);

601

}

602

603

@Bean

604

public RouterFunction<ServerResponse> adminRoutes(AdminHandler adminHandler) {

605

return RouterFunctions

606

.nest(RequestPredicates.path("/api/admin").and(accept(APPLICATION_JSON)),

607

RouterFunctions.GET("/stats", adminHandler::getSystemStats)

608

.andRoute(POST("/cache/clear"), adminHandler::clearCache)

609

.andRoute(GET("/health"), adminHandler::healthCheck)

610

)

611

.filter(authenticationFilter()); // Apply authentication filter

612

}

613

614

// Global router combining all routes

615

@Bean

616

public RouterFunction<ServerResponse> routes(

617

RouterFunction<ServerResponse> userRoutes,

618

RouterFunction<ServerResponse> fileRoutes,

619

RouterFunction<ServerResponse> adminRoutes) {

620

621

return userRoutes

622

.and(fileRoutes)

623

.and(adminRoutes)

624

.filter(loggingFilter())

625

.filter(errorHandlingFilter());

626

}

627

628

private HandlerFilterFunction<ServerResponse, ServerResponse> loggingFilter() {

629

return (request, next) -> {

630

long startTime = System.currentTimeMillis();

631

return next.handle(request)

632

.doOnNext(response -> {

633

long duration = System.currentTimeMillis() - startTime;

634

System.out.println(request.method() + " " + request.path() +

635

" - " + response.statusCode() + " (" + duration + "ms)");

636

});

637

};

638

}

639

640

private HandlerFilterFunction<ServerResponse, ServerResponse> errorHandlingFilter() {

641

return (request, next) -> {

642

return next.handle(request)

643

.onErrorResume(Exception.class, e -> {

644

System.err.println("Error handling request: " + e.getMessage());

645

return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR)

646

.bodyValue("Internal Server Error");

647

});

648

};

649

}

650

651

private HandlerFilterFunction<ServerResponse, ServerResponse> authenticationFilter() {

652

return (request, next) -> {

653

String token = request.headers().firstHeader("Authorization");

654

if (token == null || !token.startsWith("Bearer ")) {

655

return ServerResponse.status(HttpStatus.UNAUTHORIZED)

656

.bodyValue("Authentication required");

657

}

658

659

// Validate token (simplified)

660

if (!isValidToken(token.substring(7))) {

661

return ServerResponse.status(HttpStatus.FORBIDDEN)

662

.bodyValue("Invalid token");

663

}

664

665

return next.handle(request);

666

};

667

}

668

669

private boolean isValidToken(String token) {

670

// Token validation logic

671

return token != null && !token.isEmpty();

672

}

673

}

674

675

// Handler class for functional endpoints

676

@Component

677

public class UserHandler {

678

679

private final ReactiveUserService userService;

680

681

public UserHandler(ReactiveUserService userService) {

682

this.userService = userService;

683

}

684

685

public Mono<ServerResponse> getUsers(ServerRequest request) {

686

int page = Integer.parseInt(request.queryParam("page").orElse("0"));

687

int size = Integer.parseInt(request.queryParam("size").orElse("10"));

688

689

Flux<User> users = userService.findAll(PageRequest.of(page, size));

690

691

return ServerResponse.ok()

692

.contentType(MediaType.APPLICATION_JSON)

693

.body(users, User.class);

694

}

695

696

public Mono<ServerResponse> getUser(ServerRequest request) {

697

Long id = Long.valueOf(request.pathVariable("id"));

698

699

return userService.findById(id)

700

.flatMap(user -> ServerResponse.ok()

701

.contentType(MediaType.APPLICATION_JSON)

702

.bodyValue(user))

703

.switchIfEmpty(ServerResponse.notFound().build());

704

}

705

706

public Mono<ServerResponse> createUser(ServerRequest request) {

707

return request.bodyToMono(CreateUserRequest.class)

708

.flatMap(userService::createUser)

709

.flatMap(user -> ServerResponse.status(HttpStatus.CREATED)

710

.contentType(MediaType.APPLICATION_JSON)

711

.bodyValue(user))

712

.onErrorResume(ValidationException.class, e ->

713

ServerResponse.badRequest().bodyValue(e.getMessage()));

714

}

715

716

public Mono<ServerResponse> updateUser(ServerRequest request) {

717

Long id = Long.valueOf(request.pathVariable("id"));

718

719

return request.bodyToMono(UpdateUserRequest.class)

720

.flatMap(req -> userService.updateUser(id, req))

721

.flatMap(user -> ServerResponse.ok()

722

.contentType(MediaType.APPLICATION_JSON)

723

.bodyValue(user))

724

.switchIfEmpty(ServerResponse.notFound().build());

725

}

726

727

public Mono<ServerResponse> deleteUser(ServerRequest request) {

728

Long id = Long.valueOf(request.pathVariable("id"));

729

730

return userService.deleteById(id)

731

.then(ServerResponse.noContent().build())

732

.switchIfEmpty(ServerResponse.notFound().build());

733

}

734

735

public Mono<ServerResponse> searchUsers(ServerRequest request) {

736

String query = request.queryParam("q").orElse("");

737

738

if (query.isEmpty()) {

739

return ServerResponse.badRequest().bodyValue("Query parameter 'q' is required");

740

}

741

742

Flux<User> users = userService.searchByName(query);

743

744

return ServerResponse.ok()

745

.contentType(MediaType.APPLICATION_JSON)

746

.body(users, User.class);

747

}

748

}

749

```

750

751

### WebClient Usage

752

753

```java { .api }

754

@Service

755

public class ExternalApiService {

756

757

private final WebClient webClient;

758

759

public ExternalApiService(WebClient.Builder webClientBuilder) {

760

this.webClient = webClientBuilder

761

.baseUrl("https://api.external-service.com")

762

.defaultHeader(HttpHeaders.USER_AGENT, "MyApp/1.0")

763

.filter(logRequest())

764

.filter(handleErrors())

765

.build();

766

}

767

768

public Mono<User> getUser(Long userId) {

769

return webClient

770

.get()

771

.uri("/users/{id}", userId)

772

.header(HttpHeaders.AUTHORIZATION, "Bearer " + getToken())

773

.retrieve()

774

.bodyToMono(User.class)

775

.timeout(Duration.ofSeconds(10))

776

.retry(3);

777

}

778

779

public Flux<User> getUsers(int page, int size) {

780

return webClient

781

.get()

782

.uri(uriBuilder -> uriBuilder

783

.path("/users")

784

.queryParam("page", page)

785

.queryParam("size", size)

786

.build())

787

.retrieve()

788

.bodyToFlux(User.class);

789

}

790

791

public Mono<User> createUser(CreateUserRequest request) {

792

return webClient

793

.post()

794

.uri("/users")

795

.contentType(MediaType.APPLICATION_JSON)

796

.body(Mono.just(request), CreateUserRequest.class)

797

.retrieve()

798

.bodyToMono(User.class);

799

}

800

801

public Mono<Void> deleteUser(Long userId) {

802

return webClient

803

.delete()

804

.uri("/users/{id}", userId)

805

.retrieve()

806

.bodyToMono(Void.class);

807

}

808

809

// File upload example

810

public Mono<UploadResponse> uploadFile(String filename, Flux<DataBuffer> fileData) {

811

return webClient

812

.post()

813

.uri("/files/upload")

814

.contentType(MediaType.APPLICATION_OCTET_STREAM)

815

.header("X-Filename", filename)

816

.body(fileData, DataBuffer.class)

817

.retrieve()

818

.bodyToMono(UploadResponse.class);

819

}

820

821

// Streaming response

822

public Flux<String> getStreamingData() {

823

return webClient

824

.get()

825

.uri("/stream")

826

.accept(MediaType.TEXT_EVENT_STREAM)

827

.retrieve()

828

.bodyToFlux(String.class);

829

}

830

831

// Error handling with exchange()

832

public Mono<User> getUserWithDetailedErrorHandling(Long userId) {

833

return webClient

834

.get()

835

.uri("/users/{id}", userId)

836

.exchange()

837

.flatMap(response -> {

838

if (response.statusCode().is2xxSuccessful()) {

839

return response.bodyToMono(User.class);

840

} else if (response.statusCode() == HttpStatus.NOT_FOUND) {

841

return Mono.error(new UserNotFoundException("User not found: " + userId));

842

} else {

843

return response.bodyToMono(String.class)

844

.flatMap(errorBody -> Mono.error(new ApiException("API Error: " + errorBody)));

845

}

846

});

847

}

848

849

private ExchangeFilterFunction logRequest() {

850

return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {

851

System.out.println("Request: " + clientRequest.method() + " " + clientRequest.url());

852

return Mono.just(clientRequest);

853

});

854

}

855

856

private ExchangeFilterFunction handleErrors() {

857

return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {

858

if (clientResponse.statusCode().is5xxServerError()) {

859

return clientResponse.bodyToMono(String.class)

860

.flatMap(errorBody -> Mono.error(new ServerException("Server error: " + errorBody)));

861

}

862

return Mono.just(clientResponse);

863

});

864

}

865

866

private String getToken() {

867

// Get authentication token

868

return "your-auth-token";

869

}

870

}

871

```

872

873

### Reactive Data Service

874

875

```java { .api }

876

@Service

877

public class ReactiveUserService {

878

879

private final ReactiveUserRepository userRepository;

880

private final WebClient notificationClient;

881

882

public ReactiveUserService(ReactiveUserRepository userRepository,

883

WebClient notificationClient) {

884

this.userRepository = userRepository;

885

this.notificationClient = notificationClient;

886

}

887

888

public Flux<User> findAll(Pageable pageable) {

889

return userRepository.findAllBy(pageable)

890

.onErrorResume(DataAccessException.class, e -> {

891

System.err.println("Database error: " + e.getMessage());

892

return Flux.empty();

893

});

894

}

895

896

public Mono<User> findById(Long id) {

897

return userRepository.findById(id)

898

.switchIfEmpty(Mono.error(new UserNotFoundException("User not found: " + id)));

899

}

900

901

@Transactional

902

public Mono<User> createUser(CreateUserRequest request) {

903

return Mono.fromCallable(() -> convertToUser(request))

904

.flatMap(userRepository::save)

905

.flatMap(this::sendWelcomeNotification)

906

.onErrorMap(DataIntegrityViolationException.class, e ->

907

new UserAlreadyExistsException("User already exists"));

908

}

909

910

@Transactional

911

public Mono<User> updateUser(Long id, UpdateUserRequest request) {

912

return userRepository.findById(id)

913

.switchIfEmpty(Mono.error(new UserNotFoundException("User not found: " + id)))

914

.map(existingUser -> updateUserFromRequest(existingUser, request))

915

.flatMap(userRepository::save);

916

}

917

918

@Transactional

919

public Mono<Void> deleteById(Long id) {

920

return userRepository.existsById(id)

921

.flatMap(exists -> {

922

if (!exists) {

923

return Mono.error(new UserNotFoundException("User not found: " + id));

924

}

925

return userRepository.deleteById(id);

926

});

927

}

928

929

public Flux<User> searchByName(String name) {

930

return userRepository.findByNameContainingIgnoreCase(name)

931

.timeout(Duration.ofSeconds(5))

932

.onErrorResume(TimeoutException.class, e -> {

933

System.err.println("Search timeout for query: " + name);

934

return Flux.empty();

935

});

936

}

937

938

// Reactive composition example

939

public Mono<UserWithStats> getUserWithStats(Long userId) {

940

Mono<User> userMono = findById(userId);

941

Mono<UserStats> statsMono = getUserStats(userId);

942

943

return Mono.zip(userMono, statsMono)

944

.map(tuple -> new UserWithStats(tuple.getT1(), tuple.getT2()));

945

}

946

947

// Parallel processing example

948

public Flux<User> processUsersInParallel(List<Long> userIds) {

949

return Flux.fromIterable(userIds)

950

.parallel(4) // Use 4 parallel threads

951

.runOn(Schedulers.boundedElastic())

952

.flatMap(this::findById)

953

.sequential();

954

}

955

956

// Reactive caching example

957

public Mono<User> findByIdWithCache(Long id) {

958

return cacheManager.getFromCache("users", id, User.class)

959

.switchIfEmpty(

960

userRepository.findById(id)

961

.flatMap(user -> cacheManager.putInCache("users", id, user)

962

.thenReturn(user))

963

);

964

}

965

966

private Mono<User> sendWelcomeNotification(User user) {

967

NotificationRequest notification = NotificationRequest.builder()

968

.userId(user.getId())

969

.type("WELCOME")

970

.message("Welcome to our platform!")

971

.build();

972

973

return notificationClient

974

.post()

975

.uri("/notifications")

976

.bodyValue(notification)

977

.retrieve()

978

.bodyToMono(Void.class)

979

.then(Mono.just(user))

980

.onErrorResume(e -> {

981

System.err.println("Failed to send notification: " + e.getMessage());

982

return Mono.just(user); // Don't fail the user creation

983

});

984

}

985

986

private Mono<UserStats> getUserStats(Long userId) {

987

return Mono.fromCallable(() -> {

988

// Simulate stats calculation

989

return new UserStats(userId, 100, 50);

990

}).subscribeOn(Schedulers.boundedElastic());

991

}

992

993

private User convertToUser(CreateUserRequest request) {

994

// Convert request to user entity

995

User user = new User();

996

user.setUsername(request.getUsername());

997

user.setEmail(request.getEmail());

998

return user;

999

}

1000

1001

private User updateUserFromRequest(User user, UpdateUserRequest request) {

1002

// Update user from request

1003

if (request.getUsername() != null) {

1004

user.setUsername(request.getUsername());

1005

}

1006

if (request.getEmail() != null) {

1007

user.setEmail(request.getEmail());

1008

}

1009

return user;

1010

}

1011

}

1012

```

1013

1014

### WebFlux Configuration

1015

1016

```java { .api }

1017

@Configuration

1018

@EnableWebFlux

1019

public class WebFluxConfig implements WebFluxConfigurer {

1020

1021

@Override

1022

public void configureHttpMessageCodecs(ServerCodecConfigurer configurer) {

1023

// JSON configuration

1024

configurer.defaultCodecs().jackson2JsonEncoder(

1025

new Jackson2JsonEncoder(objectMapper(), MediaType.APPLICATION_JSON));

1026

configurer.defaultCodecs().jackson2JsonDecoder(

1027

new Jackson2JsonDecoder(objectMapper(), MediaType.APPLICATION_JSON));

1028

1029

// Increase buffer size for large payloads

1030

configurer.defaultCodecs().maxInMemorySize(1024 * 1024); // 1MB

1031

}

1032

1033

@Override

1034

public void addCorsMappings(CorsRegistry registry) {

1035

registry.addMapping("/api/**")

1036

.allowedOrigins("http://localhost:3000", "https://myapp.com")

1037

.allowedMethods("GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS")

1038

.allowedHeaders("*")

1039

.allowCredentials(true)

1040

.maxAge(3600);

1041

}

1042

1043

@Override

1044

public void addResourceHandlers(ResourceHandlerRegistry registry) {

1045

registry.addResourceHandler("/static/**")

1046

.addResourceLocations("classpath:/static/")

1047

.setCacheControl(CacheControl.maxAge(Duration.ofDays(365)));

1048

}

1049

1050

@Bean

1051

public ObjectMapper objectMapper() {

1052

ObjectMapper mapper = new ObjectMapper();

1053

mapper.registerModule(new JavaTimeModule());

1054

mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);

1055

mapper.setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE);

1056

return mapper;

1057

}

1058

1059

@Bean

1060

public WebClient webClient(WebClient.Builder builder) {

1061

return builder

1062

.codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(1024 * 1024))

1063

.build();

1064

}

1065

1066

// Custom error handler

1067

@Bean

1068

@Order(-2) // Higher precedence than DefaultErrorWebExceptionHandler

1069

public WebExceptionHandler globalExceptionHandler() {

1070

return new GlobalErrorWebExceptionHandler();

1071

}

1072

}

1073

1074

// Global error handler

1075

public class GlobalErrorWebExceptionHandler implements WebExceptionHandler {

1076

1077

private final ObjectMapper objectMapper;

1078

1079

public GlobalErrorWebExceptionHandler() {

1080

this.objectMapper = new ObjectMapper();

1081

this.objectMapper.registerModule(new JavaTimeModule());

1082

}

1083

1084

@Override

1085

public Mono<Void> handle(ServerWebExchange exchange, Throwable ex) {

1086

ServerHttpResponse response = exchange.getResponse();

1087

1088

if (response.isCommitted()) {

1089

return Mono.error(ex);

1090

}

1091

1092

response.getHeaders().add("Content-Type", "application/json");

1093

1094

ErrorResponse errorResponse;

1095

HttpStatus status;

1096

1097

if (ex instanceof UserNotFoundException) {

1098

status = HttpStatus.NOT_FOUND;

1099

errorResponse = new ErrorResponse("User not found", "USER_NOT_FOUND");

1100

} else if (ex instanceof ValidationException) {

1101

status = HttpStatus.BAD_REQUEST;

1102

errorResponse = new ErrorResponse(ex.getMessage(), "VALIDATION_ERROR");

1103

} else {

1104

status = HttpStatus.INTERNAL_SERVER_ERROR;

1105

errorResponse = new ErrorResponse("Internal server error", "INTERNAL_ERROR");

1106

}

1107

1108

response.setStatusCode(status);

1109

1110

try {

1111

byte[] bytes = objectMapper.writeValueAsBytes(errorResponse);

1112

DataBuffer buffer = response.bufferFactory().wrap(bytes);

1113

return response.writeWith(Mono.just(buffer));

1114

} catch (Exception e) {

1115

return Mono.error(e);

1116

}

1117

}

1118

}

1119

```

1120

1121

### Server-Sent Events (SSE)

1122

1123

```java { .api }

1124

@RestController

1125

@RequestMapping("/api/events")

1126

public class EventController {

1127

1128

private final EventService eventService;

1129

1130

public EventController(EventService eventService) {

1131

this.eventService = eventService;

1132

}

1133

1134

@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)

1135

public Flux<ServerSentEvent<String>> streamEvents() {

1136

return eventService.getEventStream()

1137

.map(event -> ServerSentEvent.<String>builder()

1138

.id(String.valueOf(event.getId()))

1139

.event(event.getType())

1140

.data(event.getData())

1141

.build())

1142

.onErrorResume(e -> {

1143

return Flux.just(ServerSentEvent.<String>builder()

1144

.event("error")

1145

.data("Error occurred: " + e.getMessage())

1146

.build());

1147

});

1148

}

1149

1150

@GetMapping(value = "/notifications/{userId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)

1151

public Flux<ServerSentEvent<NotificationEvent>> streamUserNotifications(@PathVariable Long userId) {

1152

return eventService.getUserNotifications(userId)

1153

.map(notification -> ServerSentEvent.<NotificationEvent>builder()

1154

.id(notification.getId())

1155

.event("notification")

1156

.data(notification)

1157

.build())

1158

.doOnCancel(() -> System.out.println("Client disconnected from notifications stream"));

1159

}

1160

}

1161

```

1162

1163

Spring WebFlux provides a complete reactive programming model for building scalable, non-blocking web applications that can efficiently handle high concurrency with minimal resource usage.