0
# Reactive Web Framework (WebFlux)
1
2
Spring WebFlux is the reactive-stack web framework introduced in Spring 5.0. It provides a non-blocking, reactive programming model for building web applications that can handle high concurrency with a small number of threads and scale with fewer hardware resources.
3
4
## Maven Dependencies
5
6
```xml
7
<!-- Spring WebFlux -->
8
<dependency>
9
<groupId>org.springframework</groupId>
10
<artifactId>spring-webflux</artifactId>
11
<version>5.3.39</version>
12
</dependency>
13
14
<!-- Reactor Core (reactive streams) -->
15
<dependency>
16
<groupId>io.projectreactor</groupId>
17
<artifactId>reactor-core</artifactId>
18
<version>3.4.32</version>
19
</dependency>
20
21
<!-- Reactor Netty (default reactive server and WebClient connector).
     spring-webflux integrates with Netty through Reactor Netty; the raw
     netty-all artifact alone provides neither a server adapter nor a
     default ClientHttpConnector. -->
<dependency>
    <groupId>io.projectreactor.netty</groupId>
    <artifactId>reactor-netty</artifactId>
    <version>1.0.39</version>
</dependency>
27
28
<!-- Jackson for JSON (reactive) -->
29
<dependency>
30
<groupId>com.fasterxml.jackson.core</groupId>
31
<artifactId>jackson-databind</artifactId>
32
<version>2.12.7</version>
33
</dependency>
34
```
35
36
## Core Imports
37
38
```java { .api }
39
// Reactive types
40
import reactor.core.publisher.Mono;
41
import reactor.core.publisher.Flux;
42
import org.reactivestreams.Publisher;
43
44
// WebFlux annotations
45
import org.springframework.web.bind.annotation.RestController;
46
import org.springframework.web.bind.annotation.GetMapping;
47
import org.springframework.web.bind.annotation.PostMapping;
48
import org.springframework.web.bind.annotation.RequestBody;
49
import org.springframework.web.bind.annotation.PathVariable;
50
51
// Functional routing
52
import org.springframework.web.reactive.function.server.RouterFunction;
53
import org.springframework.web.reactive.function.server.RouterFunctions;
54
import org.springframework.web.reactive.function.server.ServerRequest;
55
import org.springframework.web.reactive.function.server.ServerResponse;
56
import org.springframework.web.reactive.function.server.HandlerFunction;
57
58
// Configuration
59
import org.springframework.web.reactive.config.EnableWebFlux;
60
import org.springframework.web.reactive.config.WebFluxConfigurer;
61
import org.springframework.context.annotation.Configuration;
62
63
// WebClient (reactive HTTP client)
64
import org.springframework.web.reactive.function.client.WebClient;
65
import org.springframework.web.reactive.function.client.ClientResponse;
66
67
// Server support
68
import org.springframework.web.reactive.function.server.RequestPredicates;
69
import org.springframework.http.MediaType;
70
import org.springframework.http.ResponseEntity;
71
```
72
73
## Core Reactive Types
74
75
### Mono and Flux
76
77
```java { .api }
78
// Reactive stream publisher for 0-1 elements
79
public abstract class Mono<T> implements CorePublisher<T> {
80
81
// Creation methods
82
public static <T> Mono<T> just(T data);
83
public static <T> Mono<T> justOrEmpty(T data);
84
public static <T> Mono<T> empty();
85
public static <T> Mono<T> error(Throwable error);
86
public static <T> Mono<T> fromCallable(Callable<? extends T> supplier);
87
public static <T> Mono<T> fromSupplier(Supplier<? extends T> supplier);
88
public static <T> Mono<T> defer(Supplier<? extends Mono<? extends T>> supplier);
89
90
// Transformation methods
91
public <R> Mono<R> map(Function<? super T, ? extends R> mapper);
92
public <R> Mono<R> flatMap(Function<? super T, ? extends Mono<? extends R>> transformer);
93
public <R> Flux<R> flatMapMany(Function<? super T, ? extends Publisher<? extends R>> mapper);
94
public Mono<T> filter(Predicate<? super T> tester);
95
public Mono<T> switchIfEmpty(Mono<? extends T> alternate);
96
97
// Error handling
98
public Mono<T> onErrorReturn(T fallback);
99
public Mono<T> onErrorResume(Function<? super Throwable, ? extends Mono<? extends T>> fallback);
100
public Mono<T> doOnError(Consumer<? super Throwable> onError);
101
102
// Side effects
103
public Mono<T> doOnNext(Consumer<? super T> onNext);
104
public Mono<T> doOnSuccess(Consumer<? super T> onSuccess);
105
public Mono<T> doFinally(Consumer<SignalType> onFinally);
106
107
// Blocking operations (avoid in production)
108
public T block();
109
public T block(Duration timeout);
110
111
// Subscription
112
public Disposable subscribe();
113
public Disposable subscribe(Consumer<? super T> consumer);
114
public Disposable subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer);
115
}
116
117
// Reactive stream publisher for 0-N elements
118
public abstract class Flux<T> implements CorePublisher<T> {
119
120
// Creation methods
121
public static <T> Flux<T> just(T... data);
122
public static <T> Flux<T> fromIterable(Iterable<? extends T> it);
123
public static <T> Flux<T> fromArray(T[] array);
124
public static <T> Flux<T> fromStream(Stream<? extends T> s);
125
public static <T> Flux<T> empty();
126
public static <T> Flux<T> error(Throwable error);
127
public static Flux<Long> interval(Duration period);
128
public static <T> Flux<T> defer(Supplier<? extends Publisher<T>> supplier);
129
130
// Transformation methods
131
public <V> Flux<V> map(Function<? super T, ? extends V> mapper);
132
public <R> Flux<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper);
133
public Flux<T> filter(Predicate<? super T> p);
134
public Flux<T> take(long n);
135
public Flux<T> skip(long skipped);
136
public Flux<T> distinct();
137
public Flux<T> sort();
138
139
// Aggregation methods
140
public Mono<T> reduce(BinaryOperator<T> aggregator);
141
public <A> Mono<A> reduce(A initial, BiFunction<A, ? super T, A> accumulator);
142
public Mono<List<T>> collectList();
143
public Mono<Map<K, T>> collectMap(Function<? super T, ? extends K> keyExtractor);
144
public Mono<Long> count();
145
146
// Error handling
147
public Flux<T> onErrorReturn(T fallback);
148
public Flux<T> onErrorResume(Function<? super Throwable, ? extends Publisher<? extends T>> fallback);
149
public Flux<T> retry();
150
public Flux<T> retry(long numRetries);
151
152
// Backpressure
153
public Flux<T> onBackpressureBuffer();
154
public Flux<T> onBackpressureDrop();
155
public Flux<T> onBackpressureLatest();
156
157
// Blocking operations (avoid in production)
158
public List<T> collectList().block();
159
public Stream<T> toStream();
160
}
161
```
162
163
## Functional Routing
164
165
### RouterFunction and ServerRequest/ServerResponse
166
167
```java { .api }
168
// Represents a function that routes to a handler function
169
@FunctionalInterface
170
public interface RouterFunction<T extends ServerResponse> {
171
172
Mono<HandlerFunction<T>> route(ServerRequest request);
173
174
default RouterFunction<T> and(RouterFunction<T> other);
175
default RouterFunction<T> andRoute(RequestPredicate predicate, HandlerFunction<T> handlerFunction);
176
default RouterFunction<T> andNest(RequestPredicate predicate, RouterFunction<T> routerFunction);
177
default <S extends ServerResponse> RouterFunction<?> andOther(RouterFunction<S> other);
178
default RouterFunction<T> filter(HandlerFilterFunction<T, T> filterFunction);
179
}
180
181
// Represents a typed server-side HTTP request
182
public interface ServerRequest {
183
184
// Request line
185
HttpMethod method();
186
URI uri();
187
String path();
188
189
// Headers
190
ServerRequest.Headers headers();
191
MultiValueMap<String, String> queryParams();
192
MultiValueMap<String, String> pathVariables();
193
194
// Body
195
<T> Mono<T> bodyToMono(Class<? extends T> elementClass);
196
<T> Mono<T> bodyToMono(ParameterizedTypeReference<T> typeReference);
197
<T> Flux<T> bodyToFlux(Class<? extends T> elementClass);
198
<T> Flux<T> bodyToFlux(ParameterizedTypeReference<T> typeReference);
199
200
// Attributes
201
Map<String, Object> attributes();
202
Optional<Object> attribute(String name);
203
204
// Path matching
205
String pathVariable(String name);
206
MultiValueMap<String, String> pathVariables();
207
208
// Convenience methods
209
Optional<String> queryParam(String name);
210
Principal principal();
211
212
// Headers interface
213
interface Headers {
214
List<MediaType> accept();
215
OptionalLong contentLength();
216
Optional<MediaType> contentType();
217
String host();
218
List<String> header(String headerName);
219
HttpHeaders asHttpHeaders();
220
}
221
}
222
223
// Represents a typed server-side HTTP response
224
public interface ServerResponse {
225
226
HttpStatus statusCode();
227
int rawStatusCode();
228
HttpHeaders headers();
229
MultiValueMap<String, ResponseCookie> cookies();
230
231
// Static factory methods
232
static BodyBuilder status(HttpStatus status);
233
static BodyBuilder status(int status);
234
static BodyBuilder ok();
235
static HeadersBuilder<?> noContent();
236
static BodyBuilder badRequest();
237
static HeadersBuilder<?> notFound();
238
static BodyBuilder unprocessableEntity();
239
240
// Builder interfaces
241
interface BodyBuilder extends HeadersBuilder<BodyBuilder> {
242
<T> Mono<ServerResponse> body(Publisher<T> publisher, Class<T> elementClass);
243
<T> Mono<ServerResponse> body(Publisher<T> publisher, ParameterizedTypeReference<T> typeRef);
244
Mono<ServerResponse> body(Object body, Class<?> bodyType);
245
<T> Mono<ServerResponse> body(Mono<T> body, Class<T> bodyType);
246
<T> Mono<ServerResponse> body(Flux<T> body, Class<T> bodyType);
247
Mono<ServerResponse> syncBody(Object body);
248
249
Mono<ServerResponse> render(String name, Object... modelAttributes);
250
Mono<ServerResponse> render(String name, Map<String, ?> model);
251
}
252
253
interface HeadersBuilder<B extends HeadersBuilder<B>> {
254
B header(String headerName, String... headerValues);
255
B headers(HttpHeaders headers);
256
B cookie(ResponseCookie cookie);
257
B cookies(Consumer<MultiValueMap<String, ResponseCookie>> cookiesConsumer);
258
B allow(HttpMethod... allowedMethods);
259
B eTag(String etag);
260
B lastModified(ZonedDateTime lastModified);
261
B location(URI location);
262
B cacheControl(CacheControl cacheControl);
263
B varyBy(String... requestHeaders);
264
265
Mono<ServerResponse> build();
266
<T> Mono<ServerResponse> build(Publisher<T> voidPublisher);
267
}
268
}
269
270
// Function that handles a server request and returns a response
271
@FunctionalInterface
271
public interface HandlerFunction<T extends ServerResponse> {
272
// Handles the given request; the response of type T is delivered through the returned Mono
Mono<T> handle(ServerRequest request);
273
}
275
```
276
277
### RouterFunctions Utility
278
279
```java { .api }
280
// Factory methods for RouterFunction
281
public abstract class RouterFunctions {
282
283
// Route creation
284
public static <T extends ServerResponse> RouterFunction<T> route(RequestPredicate predicate, HandlerFunction<T> handlerFunction);
285
286
// Convenience methods for HTTP methods
287
public static RouterFunction<ServerResponse> GET(String pattern, HandlerFunction<ServerResponse> handlerFunction);
288
public static RouterFunction<ServerResponse> POST(String pattern, HandlerFunction<ServerResponse> handlerFunction);
289
public static RouterFunction<ServerResponse> PUT(String pattern, HandlerFunction<ServerResponse> handlerFunction);
290
public static RouterFunction<ServerResponse> DELETE(String pattern, HandlerFunction<ServerResponse> handlerFunction);
291
public static RouterFunction<ServerResponse> PATCH(String pattern, HandlerFunction<ServerResponse> handlerFunction);
292
293
// Nesting routes
294
public static <T extends ServerResponse> RouterFunction<T> nest(RequestPredicate predicate, RouterFunction<T> routerFunction);
295
296
// Resource handling
297
public static RouterFunction<ServerResponse> resources(String pattern, Resource location);
298
public static RouterFunction<ServerResponse> resources(Function<ServerRequest, Mono<Resource>> lookupFunction);
299
300
// Route conversion
301
public static HandlerMapping toHandlerMapping(RouterFunction<?> routerFunction);
302
public static HttpHandler toHttpHandler(RouterFunction<?> routerFunction);
303
}
304
305
// Predicates for request matching
306
public abstract class RequestPredicates {
307
308
// HTTP methods
309
public static RequestPredicate GET(String pattern);
310
public static RequestPredicate POST(String pattern);
311
public static RequestPredicate PUT(String pattern);
312
public static RequestPredicate DELETE(String pattern);
313
public static RequestPredicate PATCH(String pattern);
314
public static RequestPredicate HEAD(String pattern);
315
public static RequestPredicate OPTIONS(String pattern);
316
317
// Path matching
318
public static RequestPredicate path(String pattern);
319
public static RequestPredicate pathExtension(String extension);
320
public static RequestPredicate pathExtension(Predicate<String> extensionPredicate);
321
322
// Content type
323
public static RequestPredicate accept(MediaType... mediaTypes);
324
public static RequestPredicate contentType(MediaType... mediaTypes);
325
326
// Headers
327
public static RequestPredicate headers(Predicate<ServerRequest.Headers> headersPredicate);
328
public static RequestPredicate header(String name, String value);
329
330
// Query parameters
331
public static RequestPredicate queryParam(String name, String value);
332
public static RequestPredicate queryParam(String name, Predicate<String> predicate);
333
334
// Logical operations
335
public static RequestPredicate and(RequestPredicate left, RequestPredicate right);
336
public static RequestPredicate or(RequestPredicate left, RequestPredicate right);
337
public static RequestPredicate not(RequestPredicate predicate);
338
}
339
```
340
341
## Annotated Controllers
342
343
### WebFlux Controller Annotations
344
345
```java { .api }
346
// WebFlux supports same annotations as Spring MVC but with reactive return types
347
@RestController
348
@RequestMapping("/api/reactive")
349
public class ReactiveController {
350
351
@GetMapping("/mono")
352
public Mono<String> getMono() {
353
return Mono.just("Hello Reactive World!");
354
}
355
356
@GetMapping("/flux")
357
public Flux<String> getFlux() {
358
return Flux.just("Item 1", "Item 2", "Item 3");
359
}
360
361
@PostMapping("/mono")
362
public Mono<ResponseEntity<String>> postMono(@RequestBody Mono<String> body) {
363
return body.map(value -> ResponseEntity.ok("Received: " + value));
364
}
365
366
@GetMapping(value = "/stream", produces = MediaType.TEXT_PLAIN_VALUE)
367
public Flux<String> getStream() {
368
return Flux.interval(Duration.ofSeconds(1))
369
.map(i -> "Data " + i + "\n");
370
}
371
}
372
```
373
374
## WebClient (Reactive HTTP Client)
375
376
### WebClient Interface
377
378
```java { .api }
379
// Non-blocking, reactive client to perform HTTP requests
380
public interface WebClient {
381
382
// Request specification
383
RequestHeadersUriSpec<?> get();
384
RequestBodyUriSpec post();
385
RequestBodyUriSpec put();
386
RequestBodyUriSpec patch();
387
RequestHeadersUriSpec<?> delete();
388
RequestHeadersUriSpec<?> options();
389
RequestHeadersUriSpec<?> head();
390
RequestBodyUriSpec method(HttpMethod method);
391
392
// Builder
393
static WebClient create();
394
static WebClient create(String baseUrl);
395
static Builder builder();
396
397
// Builder interface
398
interface Builder {
399
Builder baseUrl(String baseUrl);
400
Builder defaultHeader(String header, String... values);
401
Builder defaultHeaders(Consumer<HttpHeaders> headersConsumer);
402
Builder defaultCookie(String cookie, String... values);
403
Builder defaultCookies(Consumer<MultiValueMap<String, String>> cookiesConsumer);
404
Builder filter(ExchangeFilterFunction filter);
405
Builder filters(Consumer<List<ExchangeFilterFunction>> filtersConsumer);
406
Builder clientConnector(ClientHttpConnector connector);
407
Builder codecs(Consumer<ClientCodecConfigurer> configurer);
408
409
WebClient build();
410
}
411
412
// Request specifications
413
interface RequestBodyUriSpec extends RequestBodySpec, RequestHeadersUriSpec<RequestBodySpec> {
414
}
415
416
interface RequestBodySpec extends RequestHeadersSpec<RequestBodySpec> {
417
RequestBodySpec body(Publisher<DataBuffer> body, Class<? extends DataBuffer> bodyType);
418
<T> RequestBodySpec body(Publisher<T> body, Class<T> bodyType);
419
<T> RequestBodySpec body(Publisher<T> body, ParameterizedTypeReference<T> bodyTypeRef);
420
RequestBodySpec body(Object body);
421
<T> RequestBodySpec body(Mono<T> body, Class<T> bodyType);
422
<T> RequestBodySpec body(Mono<T> body, ParameterizedTypeReference<T> bodyTypeRef);
423
RequestBodySpec syncBody(Object body);
424
}
425
426
interface RequestHeadersSpec<S extends RequestHeadersSpec<S>> {
427
S header(String headerName, String... headerValues);
428
S headers(Consumer<HttpHeaders> headersConsumer);
429
S attribute(String name, Object value);
430
S attributes(Consumer<Map<String, Object>> attributesConsumer);
431
S cookie(String name, String value);
432
S cookies(Consumer<MultiValueMap<String, String>> cookiesConsumer);
433
S ifNoneMatch(String... ifNoneMatches);
434
S ifModifiedSince(ZonedDateTime ifModifiedSince);
435
436
ResponseSpec retrieve();
437
Mono<ClientResponse> exchange();
438
}
439
}
440
441
// Response specification
442
public interface ResponseSpec {
443
ResponseSpec onStatus(Predicate<HttpStatus> statusPredicate, Function<ClientResponse, Mono<? extends Throwable>> exceptionFunction);
444
<T> Mono<T> bodyToMono(Class<T> bodyType);
445
<T> Mono<T> bodyToMono(ParameterizedTypeReference<T> bodyTypeReference);
446
<T> Flux<T> bodyToFlux(Class<T> bodyType);
447
<T> Flux<T> bodyToFlux(ParameterizedTypeReference<T> bodyTypeReference);
448
Mono<ResponseEntity<Void>> toBodilessEntity();
449
<T> Mono<ResponseEntity<T>> toEntity(Class<T> bodyType);
450
<T> Mono<ResponseEntity<T>> toEntity(ParameterizedTypeReference<T> bodyTypeReference);
451
<T> Mono<ResponseEntity<List<T>>> toEntityList(Class<T> bodyType);
452
<T> Mono<ResponseEntity<List<T>>> toEntityList(ParameterizedTypeReference<T> bodyTypeReference);
453
}
454
```
455
456
## Configuration
457
458
### WebFlux Configuration
459
460
```java { .api }
461
// Enables WebFlux configuration
462
// Kept at runtime and applicable to types only
@Retention(RetentionPolicy.RUNTIME)
463
@Target(ElementType.TYPE)
464
@Documented
465
// Pulls DelegatingWebFluxConfiguration into the annotated configuration
@Import(DelegatingWebFluxConfiguration.class)
466
// Marker annotation: declares no attributes of its own
public @interface EnableWebFlux {
467
}
468
469
// Interface for customizing WebFlux configuration
470
public interface WebFluxConfigurer {
471
472
default void configureHttpMessageCodecs(ServerCodecConfigurer configurer) {
473
}
474
475
default void addFormatters(FormatterRegistry registry) {
476
}
477
478
default Validator getValidator() {
479
return null;
480
}
481
482
default MessageCodesResolver getMessageCodesResolver() {
483
return null;
484
}
485
486
default void addResourceHandlers(ResourceHandlerRegistry registry) {
487
}
488
489
default void addCorsMappings(CorsRegistry registry) {
490
}
491
492
default void configurePathMatching(PathMatchConfigurer configurer) {
493
}
494
495
default void configureArgumentResolvers(ArgumentResolverConfigurer configurer) {
496
}
497
498
default void configureViewResolvers(ViewResolverRegistry registry) {
499
}
500
}
501
```
502
503
## Practical Usage Examples
504
505
### Basic Reactive Controllers
506
507
```java { .api }
508
@RestController
509
@RequestMapping("/api/users")
510
public class ReactiveUserController {
511
512
private final ReactiveUserService userService;
513
514
public ReactiveUserController(ReactiveUserService userService) {
515
this.userService = userService;
516
}
517
518
@GetMapping
519
public Flux<User> getAllUsers(
520
@RequestParam(defaultValue = "0") int page,
521
@RequestParam(defaultValue = "10") int size) {
522
523
return userService.findAll(PageRequest.of(page, size));
524
}
525
526
@GetMapping("/{id}")
527
public Mono<ResponseEntity<User>> getUserById(@PathVariable Long id) {
528
return userService.findById(id)
529
.map(user -> ResponseEntity.ok(user))
530
.defaultIfEmpty(ResponseEntity.notFound().build());
531
}
532
533
@PostMapping
534
public Mono<ResponseEntity<User>> createUser(@Valid @RequestBody Mono<CreateUserRequest> request) {
535
return request
536
.flatMap(userService::createUser)
537
.map(user -> ResponseEntity.status(HttpStatus.CREATED).body(user))
538
.onErrorResume(ValidationException.class, e ->
539
Mono.just(ResponseEntity.badRequest().build()));
540
}
541
542
@PutMapping("/{id}")
543
public Mono<ResponseEntity<User>> updateUser(
544
@PathVariable Long id,
545
@Valid @RequestBody Mono<UpdateUserRequest> request) {
546
547
return request
548
.flatMap(req -> userService.updateUser(id, req))
549
.map(ResponseEntity::ok)
550
.switchIfEmpty(Mono.just(ResponseEntity.notFound().build()));
551
}
552
553
@DeleteMapping("/{id}")
554
public Mono<ResponseEntity<Void>> deleteUser(@PathVariable Long id) {
555
return userService.deleteById(id)
556
.then(Mono.just(ResponseEntity.noContent().<Void>build()))
557
.switchIfEmpty(Mono.just(ResponseEntity.notFound().build()));
558
}
559
560
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
561
public Flux<User> streamUsers() {
562
return userService.findAll()
563
.delayElements(Duration.ofSeconds(1));
564
}
565
566
@GetMapping("/search")
567
public Flux<User> searchUsers(@RequestParam String query) {
568
return userService.searchByName(query)
569
.take(Duration.ofSeconds(5)); // Timeout after 5 seconds
570
}
571
}
572
```
573
574
### Functional Routing Configuration
575
576
```java { .api }
577
@Configuration
578
public class RouterConfig {
579
580
@Bean
581
public RouterFunction<ServerResponse> userRoutes(UserHandler userHandler) {
582
return RouterFunctions
583
.nest(RequestPredicates.path("/api/users"),
584
RouterFunctions.GET("", userHandler::getUsers)
585
.andRoute(GET("/{id}"), userHandler::getUser)
586
.andRoute(POST(""), userHandler::createUser)
587
.andRoute(PUT("/{id}"), userHandler::updateUser)
588
.andRoute(DELETE("/{id}"), userHandler::deleteUser)
589
.andRoute(GET("/search"), userHandler::searchUsers)
590
);
591
}
592
593
@Bean
594
public RouterFunction<ServerResponse> fileRoutes(FileHandler fileHandler) {
595
return RouterFunctions
596
.nest(RequestPredicates.path("/api/files"),
597
RouterFunctions.POST("/upload", fileHandler::uploadFile)
598
.andRoute(GET("/{id}/download"), fileHandler::downloadFile)
599
.andRoute(DELETE("/{id}"), fileHandler::deleteFile)
600
);
601
}
602
603
@Bean
604
public RouterFunction<ServerResponse> adminRoutes(AdminHandler adminHandler) {
605
return RouterFunctions
606
.nest(RequestPredicates.path("/api/admin").and(accept(APPLICATION_JSON)),
607
RouterFunctions.GET("/stats", adminHandler::getSystemStats)
608
.andRoute(POST("/cache/clear"), adminHandler::clearCache)
609
.andRoute(GET("/health"), adminHandler::healthCheck)
610
)
611
.filter(authenticationFilter()); // Apply authentication filter
612
}
613
614
// Global router combining all routes
615
@Bean
616
public RouterFunction<ServerResponse> routes(
617
RouterFunction<ServerResponse> userRoutes,
618
RouterFunction<ServerResponse> fileRoutes,
619
RouterFunction<ServerResponse> adminRoutes) {
620
621
return userRoutes
622
.and(fileRoutes)
623
.and(adminRoutes)
624
.filter(loggingFilter())
625
.filter(errorHandlingFilter());
626
}
627
628
private HandlerFilterFunction<ServerResponse, ServerResponse> loggingFilter() {
629
return (request, next) -> {
630
long startTime = System.currentTimeMillis();
631
return next.handle(request)
632
.doOnNext(response -> {
633
long duration = System.currentTimeMillis() - startTime;
634
System.out.println(request.method() + " " + request.path() +
635
" - " + response.statusCode() + " (" + duration + "ms)");
636
});
637
};
638
}
639
640
private HandlerFilterFunction<ServerResponse, ServerResponse> errorHandlingFilter() {
641
return (request, next) -> {
642
return next.handle(request)
643
.onErrorResume(Exception.class, e -> {
644
System.err.println("Error handling request: " + e.getMessage());
645
return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR)
646
.bodyValue("Internal Server Error");
647
});
648
};
649
}
650
651
private HandlerFilterFunction<ServerResponse, ServerResponse> authenticationFilter() {
652
return (request, next) -> {
653
String token = request.headers().firstHeader("Authorization");
654
if (token == null || !token.startsWith("Bearer ")) {
655
return ServerResponse.status(HttpStatus.UNAUTHORIZED)
656
.bodyValue("Authentication required");
657
}
658
659
// Validate token (simplified)
660
if (!isValidToken(token.substring(7))) {
661
return ServerResponse.status(HttpStatus.FORBIDDEN)
662
.bodyValue("Invalid token");
663
}
664
665
return next.handle(request);
666
};
667
}
668
669
private boolean isValidToken(String token) {
670
// Token validation logic
671
return token != null && !token.isEmpty();
672
}
673
}
674
675
// Handler class for functional endpoints
676
@Component
677
public class UserHandler {
678
679
private final ReactiveUserService userService;
680
681
public UserHandler(ReactiveUserService userService) {
682
this.userService = userService;
683
}
684
685
public Mono<ServerResponse> getUsers(ServerRequest request) {
686
int page = Integer.parseInt(request.queryParam("page").orElse("0"));
687
int size = Integer.parseInt(request.queryParam("size").orElse("10"));
688
689
Flux<User> users = userService.findAll(PageRequest.of(page, size));
690
691
return ServerResponse.ok()
692
.contentType(MediaType.APPLICATION_JSON)
693
.body(users, User.class);
694
}
695
696
public Mono<ServerResponse> getUser(ServerRequest request) {
697
Long id = Long.valueOf(request.pathVariable("id"));
698
699
return userService.findById(id)
700
.flatMap(user -> ServerResponse.ok()
701
.contentType(MediaType.APPLICATION_JSON)
702
.bodyValue(user))
703
.switchIfEmpty(ServerResponse.notFound().build());
704
}
705
706
public Mono<ServerResponse> createUser(ServerRequest request) {
707
return request.bodyToMono(CreateUserRequest.class)
708
.flatMap(userService::createUser)
709
.flatMap(user -> ServerResponse.status(HttpStatus.CREATED)
710
.contentType(MediaType.APPLICATION_JSON)
711
.bodyValue(user))
712
.onErrorResume(ValidationException.class, e ->
713
ServerResponse.badRequest().bodyValue(e.getMessage()));
714
}
715
716
public Mono<ServerResponse> updateUser(ServerRequest request) {
717
Long id = Long.valueOf(request.pathVariable("id"));
718
719
return request.bodyToMono(UpdateUserRequest.class)
720
.flatMap(req -> userService.updateUser(id, req))
721
.flatMap(user -> ServerResponse.ok()
722
.contentType(MediaType.APPLICATION_JSON)
723
.bodyValue(user))
724
.switchIfEmpty(ServerResponse.notFound().build());
725
}
726
727
public Mono<ServerResponse> deleteUser(ServerRequest request) {
728
Long id = Long.valueOf(request.pathVariable("id"));
729
730
return userService.deleteById(id)
731
.then(ServerResponse.noContent().build())
732
.switchIfEmpty(ServerResponse.notFound().build());
733
}
734
735
public Mono<ServerResponse> searchUsers(ServerRequest request) {
736
String query = request.queryParam("q").orElse("");
737
738
if (query.isEmpty()) {
739
return ServerResponse.badRequest().bodyValue("Query parameter 'q' is required");
740
}
741
742
Flux<User> users = userService.searchByName(query);
743
744
return ServerResponse.ok()
745
.contentType(MediaType.APPLICATION_JSON)
746
.body(users, User.class);
747
}
748
}
749
```
750
751
### WebClient Usage
752
753
```java { .api }
754
@Service
755
public class ExternalApiService {
756
757
private final WebClient webClient;
758
759
public ExternalApiService(WebClient.Builder webClientBuilder) {
760
this.webClient = webClientBuilder
761
.baseUrl("https://api.external-service.com")
762
.defaultHeader(HttpHeaders.USER_AGENT, "MyApp/1.0")
763
.filter(logRequest())
764
.filter(handleErrors())
765
.build();
766
}
767
768
public Mono<User> getUser(Long userId) {
769
return webClient
770
.get()
771
.uri("/users/{id}", userId)
772
.header(HttpHeaders.AUTHORIZATION, "Bearer " + getToken())
773
.retrieve()
774
.bodyToMono(User.class)
775
.timeout(Duration.ofSeconds(10))
776
.retry(3);
777
}
778
779
public Flux<User> getUsers(int page, int size) {
780
return webClient
781
.get()
782
.uri(uriBuilder -> uriBuilder
783
.path("/users")
784
.queryParam("page", page)
785
.queryParam("size", size)
786
.build())
787
.retrieve()
788
.bodyToFlux(User.class);
789
}
790
791
public Mono<User> createUser(CreateUserRequest request) {
792
return webClient
793
.post()
794
.uri("/users")
795
.contentType(MediaType.APPLICATION_JSON)
796
.body(Mono.just(request), CreateUserRequest.class)
797
.retrieve()
798
.bodyToMono(User.class);
799
}
800
801
public Mono<Void> deleteUser(Long userId) {
802
return webClient
803
.delete()
804
.uri("/users/{id}", userId)
805
.retrieve()
806
.bodyToMono(Void.class);
807
}
808
809
// File upload example
810
public Mono<UploadResponse> uploadFile(String filename, Flux<DataBuffer> fileData) {
811
return webClient
812
.post()
813
.uri("/files/upload")
814
.contentType(MediaType.APPLICATION_OCTET_STREAM)
815
.header("X-Filename", filename)
816
.body(fileData, DataBuffer.class)
817
.retrieve()
818
.bodyToMono(UploadResponse.class);
819
}
820
821
// Streaming response
822
public Flux<String> getStreamingData() {
823
return webClient
824
.get()
825
.uri("/stream")
826
.accept(MediaType.TEXT_EVENT_STREAM)
827
.retrieve()
828
.bodyToFlux(String.class);
829
}
830
831
// Error handling with exchange()
832
public Mono<User> getUserWithDetailedErrorHandling(Long userId) {
833
return webClient
834
.get()
835
.uri("/users/{id}", userId)
836
.exchange()
837
.flatMap(response -> {
838
if (response.statusCode().is2xxSuccessful()) {
839
return response.bodyToMono(User.class);
840
} else if (response.statusCode() == HttpStatus.NOT_FOUND) {
841
return Mono.error(new UserNotFoundException("User not found: " + userId));
842
} else {
843
return response.bodyToMono(String.class)
844
.flatMap(errorBody -> Mono.error(new ApiException("API Error: " + errorBody)));
845
}
846
});
847
}
848
849
private ExchangeFilterFunction logRequest() {
850
return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
851
System.out.println("Request: " + clientRequest.method() + " " + clientRequest.url());
852
return Mono.just(clientRequest);
853
});
854
}
855
856
private ExchangeFilterFunction handleErrors() {
857
return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
858
if (clientResponse.statusCode().is5xxServerError()) {
859
return clientResponse.bodyToMono(String.class)
860
.flatMap(errorBody -> Mono.error(new ServerException("Server error: " + errorBody)));
861
}
862
return Mono.just(clientResponse);
863
});
864
}
865
866
private String getToken() {
867
// Get authentication token
868
return "your-auth-token";
869
}
870
}
871
```
872
873
### Reactive Data Service
874
875
```java { .api }
876
@Service
877
public class ReactiveUserService {
878
879
private final ReactiveUserRepository userRepository;
880
private final WebClient notificationClient;
881
882
public ReactiveUserService(ReactiveUserRepository userRepository,
883
WebClient notificationClient) {
884
this.userRepository = userRepository;
885
this.notificationClient = notificationClient;
886
}
887
888
public Flux<User> findAll(Pageable pageable) {
889
return userRepository.findAllBy(pageable)
890
.onErrorResume(DataAccessException.class, e -> {
891
System.err.println("Database error: " + e.getMessage());
892
return Flux.empty();
893
});
894
}
895
896
public Mono<User> findById(Long id) {
897
return userRepository.findById(id)
898
.switchIfEmpty(Mono.error(new UserNotFoundException("User not found: " + id)));
899
}
900
901
@Transactional
902
public Mono<User> createUser(CreateUserRequest request) {
903
return Mono.fromCallable(() -> convertToUser(request))
904
.flatMap(userRepository::save)
905
.flatMap(this::sendWelcomeNotification)
906
.onErrorMap(DataIntegrityViolationException.class, e ->
907
new UserAlreadyExistsException("User already exists"));
908
}
909
910
@Transactional
911
public Mono<User> updateUser(Long id, UpdateUserRequest request) {
912
return userRepository.findById(id)
913
.switchIfEmpty(Mono.error(new UserNotFoundException("User not found: " + id)))
914
.map(existingUser -> updateUserFromRequest(existingUser, request))
915
.flatMap(userRepository::save);
916
}
917
918
@Transactional
919
public Mono<Void> deleteById(Long id) {
920
return userRepository.existsById(id)
921
.flatMap(exists -> {
922
if (!exists) {
923
return Mono.error(new UserNotFoundException("User not found: " + id));
924
}
925
return userRepository.deleteById(id);
926
});
927
}
928
929
public Flux<User> searchByName(String name) {
930
return userRepository.findByNameContainingIgnoreCase(name)
931
.timeout(Duration.ofSeconds(5))
932
.onErrorResume(TimeoutException.class, e -> {
933
System.err.println("Search timeout for query: " + name);
934
return Flux.empty();
935
});
936
}
937
938
// Reactive composition example
939
public Mono<UserWithStats> getUserWithStats(Long userId) {
940
Mono<User> userMono = findById(userId);
941
Mono<UserStats> statsMono = getUserStats(userId);
942
943
return Mono.zip(userMono, statsMono)
944
.map(tuple -> new UserWithStats(tuple.getT1(), tuple.getT2()));
945
}
946
947
// Parallel processing example
948
public Flux<User> processUsersInParallel(List<Long> userIds) {
949
return Flux.fromIterable(userIds)
950
.parallel(4) // Use 4 parallel threads
951
.runOn(Schedulers.boundedElastic())
952
.flatMap(this::findById)
953
.sequential();
954
}
955
956
// Reactive caching example
957
public Mono<User> findByIdWithCache(Long id) {
958
return cacheManager.getFromCache("users", id, User.class)
959
.switchIfEmpty(
960
userRepository.findById(id)
961
.flatMap(user -> cacheManager.putInCache("users", id, user)
962
.thenReturn(user))
963
);
964
}
965
966
private Mono<User> sendWelcomeNotification(User user) {
967
NotificationRequest notification = NotificationRequest.builder()
968
.userId(user.getId())
969
.type("WELCOME")
970
.message("Welcome to our platform!")
971
.build();
972
973
return notificationClient
974
.post()
975
.uri("/notifications")
976
.bodyValue(notification)
977
.retrieve()
978
.bodyToMono(Void.class)
979
.then(Mono.just(user))
980
.onErrorResume(e -> {
981
System.err.println("Failed to send notification: " + e.getMessage());
982
return Mono.just(user); // Don't fail the user creation
983
});
984
}
985
986
private Mono<UserStats> getUserStats(Long userId) {
987
return Mono.fromCallable(() -> {
988
// Simulate stats calculation
989
return new UserStats(userId, 100, 50);
990
}).subscribeOn(Schedulers.boundedElastic());
991
}
992
993
private User convertToUser(CreateUserRequest request) {
994
// Convert request to user entity
995
User user = new User();
996
user.setUsername(request.getUsername());
997
user.setEmail(request.getEmail());
998
return user;
999
}
1000
1001
private User updateUserFromRequest(User user, UpdateUserRequest request) {
1002
// Update user from request
1003
if (request.getUsername() != null) {
1004
user.setUsername(request.getUsername());
1005
}
1006
if (request.getEmail() != null) {
1007
user.setEmail(request.getEmail());
1008
}
1009
return user;
1010
}
1011
}
1012
```
1013
1014
### WebFlux Configuration
1015
1016
```java { .api }
1017
@Configuration
1018
@EnableWebFlux
1019
public class WebFluxConfig implements WebFluxConfigurer {
1020
1021
@Override
1022
public void configureHttpMessageCodecs(ServerCodecConfigurer configurer) {
1023
// JSON configuration
1024
configurer.defaultCodecs().jackson2JsonEncoder(
1025
new Jackson2JsonEncoder(objectMapper(), MediaType.APPLICATION_JSON));
1026
configurer.defaultCodecs().jackson2JsonDecoder(
1027
new Jackson2JsonDecoder(objectMapper(), MediaType.APPLICATION_JSON));
1028
1029
// Increase buffer size for large payloads
1030
configurer.defaultCodecs().maxInMemorySize(1024 * 1024); // 1MB
1031
}
1032
1033
@Override
1034
public void addCorsMappings(CorsRegistry registry) {
1035
registry.addMapping("/api/**")
1036
.allowedOrigins("http://localhost:3000", "https://myapp.com")
1037
.allowedMethods("GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS")
1038
.allowedHeaders("*")
1039
.allowCredentials(true)
1040
.maxAge(3600);
1041
}
1042
1043
@Override
1044
public void addResourceHandlers(ResourceHandlerRegistry registry) {
1045
registry.addResourceHandler("/static/**")
1046
.addResourceLocations("classpath:/static/")
1047
.setCacheControl(CacheControl.maxAge(Duration.ofDays(365)));
1048
}
1049
1050
@Bean
1051
public ObjectMapper objectMapper() {
1052
ObjectMapper mapper = new ObjectMapper();
1053
mapper.registerModule(new JavaTimeModule());
1054
mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
1055
mapper.setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE);
1056
return mapper;
1057
}
1058
1059
@Bean
1060
public WebClient webClient(WebClient.Builder builder) {
1061
return builder
1062
.codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(1024 * 1024))
1063
.build();
1064
}
1065
1066
// Custom error handler
1067
@Bean
1068
@Order(-2) // Higher precedence than DefaultErrorWebExceptionHandler
1069
public WebExceptionHandler globalExceptionHandler() {
1070
return new GlobalErrorWebExceptionHandler();
1071
}
1072
}
1073
1074
// Global error handler
1075
public class GlobalErrorWebExceptionHandler implements WebExceptionHandler {
1076
1077
private final ObjectMapper objectMapper;
1078
1079
public GlobalErrorWebExceptionHandler() {
1080
this.objectMapper = new ObjectMapper();
1081
this.objectMapper.registerModule(new JavaTimeModule());
1082
}
1083
1084
@Override
1085
public Mono<Void> handle(ServerWebExchange exchange, Throwable ex) {
1086
ServerHttpResponse response = exchange.getResponse();
1087
1088
if (response.isCommitted()) {
1089
return Mono.error(ex);
1090
}
1091
1092
response.getHeaders().add("Content-Type", "application/json");
1093
1094
ErrorResponse errorResponse;
1095
HttpStatus status;
1096
1097
if (ex instanceof UserNotFoundException) {
1098
status = HttpStatus.NOT_FOUND;
1099
errorResponse = new ErrorResponse("User not found", "USER_NOT_FOUND");
1100
} else if (ex instanceof ValidationException) {
1101
status = HttpStatus.BAD_REQUEST;
1102
errorResponse = new ErrorResponse(ex.getMessage(), "VALIDATION_ERROR");
1103
} else {
1104
status = HttpStatus.INTERNAL_SERVER_ERROR;
1105
errorResponse = new ErrorResponse("Internal server error", "INTERNAL_ERROR");
1106
}
1107
1108
response.setStatusCode(status);
1109
1110
try {
1111
byte[] bytes = objectMapper.writeValueAsBytes(errorResponse);
1112
DataBuffer buffer = response.bufferFactory().wrap(bytes);
1113
return response.writeWith(Mono.just(buffer));
1114
} catch (Exception e) {
1115
return Mono.error(e);
1116
}
1117
}
1118
}
1119
```
1120
1121
### Server-Sent Events (SSE)
1122
1123
```java { .api }
1124
@RestController
1125
@RequestMapping("/api/events")
1126
public class EventController {
1127
1128
private final EventService eventService;
1129
1130
public EventController(EventService eventService) {
1131
this.eventService = eventService;
1132
}
1133
1134
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
1135
public Flux<ServerSentEvent<String>> streamEvents() {
1136
return eventService.getEventStream()
1137
.map(event -> ServerSentEvent.<String>builder()
1138
.id(String.valueOf(event.getId()))
1139
.event(event.getType())
1140
.data(event.getData())
1141
.build())
1142
.onErrorResume(e -> {
1143
return Flux.just(ServerSentEvent.<String>builder()
1144
.event("error")
1145
.data("Error occurred: " + e.getMessage())
1146
.build());
1147
});
1148
}
1149
1150
@GetMapping(value = "/notifications/{userId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
1151
public Flux<ServerSentEvent<NotificationEvent>> streamUserNotifications(@PathVariable Long userId) {
1152
return eventService.getUserNotifications(userId)
1153
.map(notification -> ServerSentEvent.<NotificationEvent>builder()
1154
.id(notification.getId())
1155
.event("notification")
1156
.data(notification)
1157
.build())
1158
.doOnCancel(() -> System.out.println("Client disconnected from notifications stream"));
1159
}
1160
}
1161
```
1162
1163
Spring WebFlux provides a complete reactive programming model for building scalable, non-blocking web applications that can efficiently handle high concurrency with minimal resource usage.