Comprehensive application framework and inversion of control container for the Java platform providing dependency injection, AOP, data access, transaction management, and web framework capabilities
Spring WebFlux is the reactive-stack web framework introduced in Spring 5.0. It provides a non-blocking, reactive programming model for building web applications that can handle high concurrency with a small number of threads and scale with fewer hardware resources.
<!-- Spring WebFlux -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webflux</artifactId>
<version>5.3.39</version>
</dependency>
<!-- Reactor Core (reactive streams) -->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.4.32</version>
</dependency>
<!-- Netty (default reactive server) -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.112.Final</version>
</dependency>
<!-- Jackson for JSON (reactive) -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.12.7</version>
</dependency>
// Reactive types
import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;
import org.reactivestreams.Publisher;
// WebFlux annotations
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.PathVariable;
// Functional routing
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import org.springframework.web.reactive.function.server.HandlerFunction;
// Configuration
import org.springframework.web.reactive.config.EnableWebFlux;
import org.springframework.web.reactive.config.WebFluxConfigurer;
import org.springframework.context.annotation.Configuration;
// WebClient (reactive HTTP client)
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.ClientResponse;
// Server support
import org.springframework.web.reactive.function.server.RequestPredicates;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
// Reactive stream publisher for 0-1 elements
/**
 * Abridged signature summary of Reactor's {@code Mono}, a publisher of 0-1 elements.
 * Method bodies are omitted, so this listing is reference material rather than
 * compilable source.
 */
public abstract class Mono<T> implements CorePublisher<T> {
// Creation methods: static factories, each returning a Mono
public static <T> Mono<T> just(T data);
public static <T> Mono<T> justOrEmpty(T data);
public static <T> Mono<T> empty();
public static <T> Mono<T> error(Throwable error);
public static <T> Mono<T> fromCallable(Callable<? extends T> supplier);
public static <T> Mono<T> fromSupplier(Supplier<? extends T> supplier);
public static <T> Mono<T> defer(Supplier<? extends Mono<? extends T>> supplier);
// Transformation methods: each returns a new Mono (or a Flux for flatMapMany)
public <R> Mono<R> map(Function<? super T, ? extends R> mapper);
public <R> Mono<R> flatMap(Function<? super T, ? extends Mono<? extends R>> transformer);
public <R> Flux<R> flatMapMany(Function<? super T, ? extends Publisher<? extends R>> mapper);
public Mono<T> filter(Predicate<? super T> tester);
public Mono<T> switchIfEmpty(Mono<? extends T> alternate);
// Error handling: substitute a fallback value or publisher, or observe the error
public Mono<T> onErrorReturn(T fallback);
public Mono<T> onErrorResume(Function<? super Throwable, ? extends Mono<? extends T>> fallback);
public Mono<T> doOnError(Consumer<? super Throwable> onError);
// Side effects: callbacks take a Consumer and the same Mono<T> type is returned
public Mono<T> doOnNext(Consumer<? super T> onNext);
public Mono<T> doOnSuccess(Consumer<? super T> onSuccess);
public Mono<T> doFinally(Consumer<SignalType> onFinally);
// Blocking operations (avoid in production): return the value T directly instead of a Mono
public T block();
public T block(Duration timeout);
// Subscription: every overload returns a Disposable handle
public Disposable subscribe();
public Disposable subscribe(Consumer<? super T> consumer);
public Disposable subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer);
}
// Reactive stream publisher for 0-N elements
public abstract class Flux<T> implements CorePublisher<T> {
// Creation methods
public static <T> Flux<T> just(T... data);
public static <T> Flux<T> fromIterable(Iterable<? extends T> it);
public static <T> Flux<T> fromArray(T[] array);
public static <T> Flux<T> fromStream(Stream<? extends T> s);
public static <T> Flux<T> empty();
public static <T> Flux<T> error(Throwable error);
public static Flux<Long> interval(Duration period);
public static <T> Flux<T> defer(Supplier<? extends Publisher<T>> supplier);
// Transformation methods
public <V> Flux<V> map(Function<? super T, ? extends V> mapper);
public <R> Flux<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper);
public Flux<T> filter(Predicate<? super T> p);
public Flux<T> take(long n);
public Flux<T> skip(long skipped);
public Flux<T> distinct();
public Flux<T> sort();
// Aggregation methods
public Mono<T> reduce(BinaryOperator<T> aggregator);
public <A> Mono<A> reduce(A initial, BiFunction<A, ? super T, A> accumulator);
public Mono<List<T>> collectList();
public Mono<Map<K, T>> collectMap(Function<? super T, ? extends K> keyExtractor);
public Mono<Long> count();
// Error handling
public Flux<T> onErrorReturn(T fallback);
public Flux<T> onErrorResume(Function<? super Throwable, ? extends Publisher<? extends T>> fallback);
public Flux<T> retry();
public Flux<T> retry(long numRetries);
// Backpressure
public Flux<T> onBackpressureBuffer();
public Flux<T> onBackpressureDrop();
public Flux<T> onBackpressureLatest();
// Blocking operations (avoid in production)
public List<T> collectList().block();
public Stream<T> toStream();
}// Represents a function that routes to a handler function
// Abridged signature summary: the default methods below are listed without their bodies.
@FunctionalInterface
public interface RouterFunction<T extends ServerResponse> {
// The single abstract method: yields a Mono of the HandlerFunction for the given request.
Mono<HandlerFunction<T>> route(ServerRequest request);
// Composition helpers: each returns a RouterFunction built from this one plus the argument(s).
default RouterFunction<T> and(RouterFunction<T> other);
default RouterFunction<T> andRoute(RequestPredicate predicate, HandlerFunction<T> handlerFunction);
default RouterFunction<T> andNest(RequestPredicate predicate, RouterFunction<T> routerFunction);
// Accepts a router with a different response type S, hence the wildcard return type.
default <S extends ServerResponse> RouterFunction<?> andOther(RouterFunction<S> other);
// Returns a RouterFunction with the given HandlerFilterFunction applied.
default RouterFunction<T> filter(HandlerFilterFunction<T, T> filterFunction);
}
/**
 * Represents a typed server-side HTTP request.
 * Abridged signature summary: reference material, not the complete interface.
 */
public interface ServerRequest {
    // Request line
    HttpMethod method();
    URI uri();
    String path();

    // Headers and query string
    ServerRequest.Headers headers();
    MultiValueMap<String, String> queryParams();
    Optional<String> queryParam(String name);

    // Body
    <T> Mono<T> bodyToMono(Class<? extends T> elementClass);
    <T> Mono<T> bodyToMono(ParameterizedTypeReference<T> typeReference);
    <T> Flux<T> bodyToFlux(Class<? extends T> elementClass);
    <T> Flux<T> bodyToFlux(ParameterizedTypeReference<T> typeReference);

    // Attributes
    Map<String, Object> attributes();
    Optional<Object> attribute(String name);

    // Path variables: a plain single-valued map (declared once), not a MultiValueMap
    String pathVariable(String name);
    Map<String, String> pathVariables();

    // Authenticated principal, resolved reactively
    Mono<? extends Principal> principal();

    // Headers interface
    interface Headers {
        List<MediaType> accept();
        OptionalLong contentLength();
        Optional<MediaType> contentType();
        // Host header parsed into an address (absent header yields null)
        InetSocketAddress host();
        List<String> header(String headerName);
        // First value of the named header, or null when the header is absent
        String firstHeader(String headerName);
        HttpHeaders asHttpHeaders();
    }
}
/**
 * Represents a typed server-side HTTP response.
 * Abridged signature summary: reference material, not the complete interface.
 */
public interface ServerResponse {
    HttpStatus statusCode();
    int rawStatusCode();
    HttpHeaders headers();
    MultiValueMap<String, ResponseCookie> cookies();

    // Static factory methods
    static BodyBuilder status(HttpStatus status);
    static BodyBuilder status(int status);
    static BodyBuilder ok();
    static HeadersBuilder<?> noContent();
    static BodyBuilder badRequest();
    static HeadersBuilder<?> notFound();
    static BodyBuilder unprocessableEntity();

    // Builder interfaces
    interface BodyBuilder extends HeadersBuilder<BodyBuilder> {
        // Body metadata
        BodyBuilder contentType(MediaType contentType);
        BodyBuilder contentLength(long contentLength);

        // Body from an already-available object (the form used by the examples in this document)
        Mono<ServerResponse> bodyValue(Object body);

        // Body from a reactive source; Mono and Flux are Publishers and use these overloads
        <T> Mono<ServerResponse> body(Publisher<T> publisher, Class<T> elementClass);
        <T> Mono<ServerResponse> body(Publisher<T> publisher, ParameterizedTypeReference<T> typeRef);
        Mono<ServerResponse> body(Object body, Class<?> bodyType);

        // Deprecated since 5.2 in favor of bodyValue(Object)
        @Deprecated
        Mono<ServerResponse> syncBody(Object body);

        // View rendering
        Mono<ServerResponse> render(String name, Object... modelAttributes);
        Mono<ServerResponse> render(String name, Map<String, ?> model);
    }

    interface HeadersBuilder<B extends HeadersBuilder<B>> {
        B header(String headerName, String... headerValues);
        B headers(HttpHeaders headers);
        B cookie(ResponseCookie cookie);
        B cookies(Consumer<MultiValueMap<String, ResponseCookie>> cookiesConsumer);
        B allow(HttpMethod... allowedMethods);
        B eTag(String etag);
        B lastModified(ZonedDateTime lastModified);
        B location(URI location);
        B cacheControl(CacheControl cacheControl);
        B varyBy(String... requestHeaders);
        // Body-less response
        Mono<ServerResponse> build();
        // Body-less response that completes when the given Publisher<Void> completes
        Mono<ServerResponse> build(Publisher<Void> voidPublisher);
    }
}
// Function that handles a server request and returns a response
@FunctionalInterface
public interface HandlerFunction<T extends ServerResponse> {
// Single abstract method: takes the ServerRequest and returns a Mono of the response type T.
Mono<T> handle(ServerRequest request);
}
// Factory methods for RouterFunction
/**
 * Abridged signature summary of the RouterFunction factory class.
 * The per-HTTP-method shortcuts (GET, POST, ...) taking a pattern and a handler are
 * declared on the fluent {@code Builder} returned by {@code route()}, not as static
 * methods of this class.
 */
public abstract class RouterFunctions {
    // Route creation
    public static Builder route();
    public static <T extends ServerResponse> RouterFunction<T> route(RequestPredicate predicate, HandlerFunction<T> handlerFunction);

    // Nesting routes
    public static <T extends ServerResponse> RouterFunction<T> nest(RequestPredicate predicate, RouterFunction<T> routerFunction);

    // Resource handling
    public static RouterFunction<ServerResponse> resources(String pattern, Resource location);
    public static RouterFunction<ServerResponse> resources(Function<ServerRequest, Mono<Resource>> lookupFunction);

    // Route conversion
    // NOTE(review): confirm toHandlerMapping against the Javadoc of the targeted Spring version.
    public static HandlerMapping toHandlerMapping(RouterFunction<?> routerFunction);
    public static HttpHandler toHttpHandler(RouterFunction<?> routerFunction);

    // Fluent builder, e.g. RouterFunctions.route().GET("/users", handler::list).build()
    public interface Builder {
        // Convenience methods for HTTP methods
        Builder GET(String pattern, HandlerFunction<ServerResponse> handlerFunction);
        Builder POST(String pattern, HandlerFunction<ServerResponse> handlerFunction);
        Builder PUT(String pattern, HandlerFunction<ServerResponse> handlerFunction);
        Builder DELETE(String pattern, HandlerFunction<ServerResponse> handlerFunction);
        Builder PATCH(String pattern, HandlerFunction<ServerResponse> handlerFunction);
        // Nesting and filtering
        Builder path(String pattern, Consumer<Builder> builderConsumer);
        Builder nest(RequestPredicate predicate, Consumer<Builder> builderConsumer);
        Builder filter(HandlerFilterFunction<ServerResponse, ServerResponse> filterFunction);
        // Finishes the builder
        RouterFunction<ServerResponse> build();
    }
}
// Predicates for request matching
public abstract class RequestPredicates {
// HTTP methods
public static RequestPredicate GET(String pattern);
public static RequestPredicate POST(String pattern);
public static RequestPredicate PUT(String pattern);
public static RequestPredicate DELETE(String pattern);
public static RequestPredicate PATCH(String pattern);
public static RequestPredicate HEAD(String pattern);
public static RequestPredicate OPTIONS(String pattern);
// Path matching
public static RequestPredicate path(String pattern);
public static RequestPredicate pathExtension(String extension);
public static RequestPredicate pathExtension(Predicate<String> extensionPredicate);
// Content type
public static RequestPredicate accept(MediaType... mediaTypes);
public static RequestPredicate contentType(MediaType... mediaTypes);
// Headers
public static RequestPredicate headers(Predicate<ServerRequest.Headers> headersPredicate);
public static RequestPredicate header(String name, String value);
// Query parameters
public static RequestPredicate queryParam(String name, String value);
public static RequestPredicate queryParam(String name, Predicate<String> predicate);
// Logical operations
public static RequestPredicate and(RequestPredicate left, RequestPredicate right);
public static RequestPredicate or(RequestPredicate left, RequestPredicate right);
public static RequestPredicate not(RequestPredicate predicate);
}// WebFlux supports same annotations as Spring MVC but with reactive return types
@RestController
@RequestMapping("/api/reactive")
public class ReactiveController {

    /** Returns a single greeting wrapped in a Mono. */
    @GetMapping("/mono")
    public Mono<String> getMono() {
        String greeting = "Hello Reactive World!";
        return Mono.just(greeting);
    }

    /** Returns a fixed sequence of three items. */
    @GetMapping("/flux")
    public Flux<String> getFlux() {
        String[] items = {"Item 1", "Item 2", "Item 3"};
        return Flux.fromArray(items);
    }

    /** Echoes the request body back, prefixed with "Received: ", in a 200 response. */
    @PostMapping("/mono")
    public Mono<ResponseEntity<String>> postMono(@RequestBody Mono<String> body) {
        Mono<String> echoed = body.map(payload -> "Received: " + payload);
        return echoed.map(ResponseEntity::ok);
    }

    /** Emits one newline-terminated text line per one-second tick, numbered by the tick index. */
    @GetMapping(value = "/stream", produces = MediaType.TEXT_PLAIN_VALUE)
    public Flux<String> getStream() {
        Duration period = Duration.ofSeconds(1);
        Flux<Long> ticks = Flux.interval(period);
        return ticks.map(tick -> "Data " + tick + "\n");
    }
}
// Non-blocking, reactive client to perform HTTP requests
/**
 * Abridged signature summary of the reactive HTTP client.
 * Reference material, not the complete interface.
 */
public interface WebClient {
    // Request specification: one entry point per HTTP method
    RequestHeadersUriSpec<?> get();
    RequestBodyUriSpec post();
    RequestBodyUriSpec put();
    RequestBodyUriSpec patch();
    RequestHeadersUriSpec<?> delete();
    RequestHeadersUriSpec<?> options();
    RequestHeadersUriSpec<?> head();
    RequestBodyUriSpec method(HttpMethod method);

    // Builder
    static WebClient create();
    static WebClient create(String baseUrl);
    static Builder builder();

    // Builder interface
    interface Builder {
        Builder baseUrl(String baseUrl);
        Builder defaultHeader(String header, String... values);
        Builder defaultHeaders(Consumer<HttpHeaders> headersConsumer);
        Builder defaultCookie(String cookie, String... values);
        Builder defaultCookies(Consumer<MultiValueMap<String, String>> cookiesConsumer);
        Builder filter(ExchangeFilterFunction filter);
        Builder filters(Consumer<List<ExchangeFilterFunction>> filtersConsumer);
        Builder clientConnector(ClientHttpConnector connector);
        Builder codecs(Consumer<ClientCodecConfigurer> configurer);
        WebClient build();
    }

    // Request specifications
    interface UriSpec<S extends RequestHeadersSpec<?>> {
        S uri(String uri, Object... uriVariables);
        S uri(Function<UriBuilder, URI> uriFunction);
    }

    interface RequestHeadersUriSpec<S extends RequestHeadersSpec<S>> extends UriSpec<S>, RequestHeadersSpec<S> {
    }

    interface RequestBodyUriSpec extends RequestBodySpec, RequestHeadersUriSpec<RequestBodySpec> {
    }

    interface RequestBodySpec extends RequestHeadersSpec<RequestBodySpec> {
        RequestBodySpec contentType(MediaType contentType);
        RequestBodySpec contentLength(long contentLength);
        // Body from an already-available object (the form used by the examples in this document)
        RequestHeadersSpec<?> bodyValue(Object body);
        // Body from a reactive source; Mono and Flux are Publishers and use these overloads
        <T> RequestHeadersSpec<?> body(Publisher<T> body, Class<T> bodyType);
        <T> RequestHeadersSpec<?> body(Publisher<T> body, ParameterizedTypeReference<T> bodyTypeRef);
        // Deprecated since 5.2 in favor of bodyValue(Object)
        @Deprecated
        RequestHeadersSpec<?> syncBody(Object body);
    }

    interface RequestHeadersSpec<S extends RequestHeadersSpec<S>> {
        S accept(MediaType... acceptableMediaTypes);
        S header(String headerName, String... headerValues);
        S headers(Consumer<HttpHeaders> headersConsumer);
        S attribute(String name, Object value);
        S attributes(Consumer<Map<String, Object>> attributesConsumer);
        S cookie(String name, String value);
        S cookies(Consumer<MultiValueMap<String, String>> cookiesConsumer);
        S ifNoneMatch(String... ifNoneMatches);
        S ifModifiedSince(ZonedDateTime ifModifiedSince);
        // Declarative response extraction
        ResponseSpec retrieve();
        // Full access to the ClientResponse; the response is released after the function returns
        <V> Mono<V> exchangeToMono(Function<ClientResponse, ? extends Mono<V>> responseHandler);
        <V> Flux<V> exchangeToFlux(Function<ClientResponse, ? extends Flux<V>> responseHandler);
        // Deprecated since 5.3: the caller must always consume or release the body, otherwise
        // the connection leaks. Prefer exchangeToMono / exchangeToFlux.
        @Deprecated
        Mono<ClientResponse> exchange();
    }
}
// Response specification: the type returned by RequestHeadersSpec.retrieve() in the WebClient listing.
// NOTE(review): shown here at top level; in the Spring API this appears to be the nested type
// WebClient.ResponseSpec - confirm against the Javadoc.
public interface ResponseSpec {
// Registers a function that turns a response whose status matches the predicate into a Throwable.
ResponseSpec onStatus(Predicate<HttpStatus> statusPredicate, Function<ClientResponse, Mono<? extends Throwable>> exceptionFunction);
// Body only, as a single value or as a stream
<T> Mono<T> bodyToMono(Class<T> bodyType);
<T> Mono<T> bodyToMono(ParameterizedTypeReference<T> bodyTypeReference);
<T> Flux<T> bodyToFlux(Class<T> bodyType);
<T> Flux<T> bodyToFlux(ParameterizedTypeReference<T> bodyTypeReference);
// ResponseEntity variants: without a body, with a single body value, or with a List body
Mono<ResponseEntity<Void>> toBodilessEntity();
<T> Mono<ResponseEntity<T>> toEntity(Class<T> bodyType);
<T> Mono<ResponseEntity<T>> toEntity(ParameterizedTypeReference<T> bodyTypeReference);
<T> Mono<ResponseEntity<List<T>>> toEntityList(Class<T> bodyType);
<T> Mono<ResponseEntity<List<T>>> toEntityList(ParameterizedTypeReference<T> bodyTypeReference);
}
// Enables WebFlux configuration
// Attribute-less annotation: retained at runtime, applicable to types, and importing
// DelegatingWebFluxConfiguration into the context of the class that carries it.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
@Import(DelegatingWebFluxConfiguration.class)
public @interface EnableWebFlux {
}
// Interface for customizing WebFlux configuration.
// Every callback has a default implementation (an empty body, or a null return for the
// two getters), so an implementer overrides only the callbacks it needs.
public interface WebFluxConfigurer {
// Customize the HTTP message codecs through the given configurer.
default void configureHttpMessageCodecs(ServerCodecConfigurer configurer) {
}
// Register formatters on the given registry.
default void addFormatters(FormatterRegistry registry) {
}
// Supply a Validator; the default returns null.
default Validator getValidator() {
return null;
}
// Supply a MessageCodesResolver; the default returns null.
default MessageCodesResolver getMessageCodesResolver() {
return null;
}
// Register resource handlers on the given registry.
default void addResourceHandlers(ResourceHandlerRegistry registry) {
}
// Register CORS mappings on the given registry.
default void addCorsMappings(CorsRegistry registry) {
}
// Customize path matching through the given configurer.
default void configurePathMatching(PathMatchConfigurer configurer) {
}
// Customize argument resolvers through the given configurer.
default void configureArgumentResolvers(ArgumentResolverConfigurer configurer) {
}
// Register view resolvers on the given registry.
default void configureViewResolvers(ViewResolverRegistry registry) {
}
}
@RestController
@RequestMapping("/api/users")
public class ReactiveUserController {
    private final ReactiveUserService userService;

    public ReactiveUserController(ReactiveUserService userService) {
        this.userService = userService;
    }

    /** Lists one page of users; {@code page} defaults to 0 and {@code size} to 10. */
    @GetMapping
    public Flux<User> getAllUsers(
            @RequestParam(defaultValue = "0") int page,
            @RequestParam(defaultValue = "10") int size) {
        return userService.findAll(PageRequest.of(page, size));
    }

    /**
     * Returns the user with the given id, or 404 when it does not exist.
     * The service signals a missing user with UserNotFoundException rather than an empty
     * Mono, so that error is what gets mapped to 404; the empty fallback is kept as a
     * safety net.
     */
    @GetMapping("/{id}")
    public Mono<ResponseEntity<User>> getUserById(@PathVariable Long id) {
        return userService.findById(id)
                .map(ResponseEntity::ok)
                .onErrorResume(UserNotFoundException.class, e -> notFound())
                .switchIfEmpty(notFound());
    }

    /**
     * Creates a user and answers 201 with the created entity.
     * Bean-validation failures of a {@code @Valid} reactive body are delivered through the
     * Mono as WebExchangeBindException, so that type is mapped to 400 alongside
     * ValidationException. A duplicate user (UserAlreadyExistsException from the service)
     * is mapped to 409.
     */
    @PostMapping
    public Mono<ResponseEntity<User>> createUser(@Valid @RequestBody Mono<CreateUserRequest> request) {
        return request
                .flatMap(userService::createUser)
                .map(user -> ResponseEntity.status(HttpStatus.CREATED).body(user))
                .onErrorResume(org.springframework.web.bind.support.WebExchangeBindException.class,
                        e -> Mono.just(ResponseEntity.badRequest().build()))
                .onErrorResume(ValidationException.class,
                        e -> Mono.just(ResponseEntity.badRequest().build()))
                .onErrorResume(UserAlreadyExistsException.class,
                        e -> Mono.just(ResponseEntity.status(HttpStatus.CONFLICT).build()));
    }

    /** Updates the user with the given id; answers 404 when the user does not exist. */
    @PutMapping("/{id}")
    public Mono<ResponseEntity<User>> updateUser(
            @PathVariable Long id,
            @Valid @RequestBody Mono<UpdateUserRequest> request) {
        return request
                .flatMap(req -> userService.updateUser(id, req))
                .map(ResponseEntity::ok)
                .onErrorResume(UserNotFoundException.class, e -> notFound())
                .switchIfEmpty(notFound());
    }

    /**
     * Deletes the user with the given id; answers 204 on success and 404 when the user does
     * not exist. {@code then(Mono.just(..))} always emits, so a missing user can only be
     * detected through the service's UserNotFoundException, not through emptiness.
     */
    @DeleteMapping("/{id}")
    public Mono<ResponseEntity<Void>> deleteUser(@PathVariable Long id) {
        return userService.deleteById(id)
                .then(Mono.just(ResponseEntity.noContent().<Void>build()))
                .onErrorResume(UserNotFoundException.class, e -> notFound());
    }

    /** Streams all users as server-sent events, one element per second. */
    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<User> streamUsers() {
        // The service only exposes findAll(Pageable); ask for everything explicitly.
        return userService.findAll(Pageable.unpaged())
                .delayElements(Duration.ofSeconds(1));
    }

    /** Searches users by name, returning whatever has been found within five seconds. */
    @GetMapping("/search")
    public Flux<User> searchUsers(@RequestParam String query) {
        return userService.searchByName(query)
                // take(Duration) is not a timeout: it completes the stream normally after
                // 5 seconds and relays the elements received so far, without an error.
                .take(Duration.ofSeconds(5));
    }

    /** Builds a body-less 404 response of whatever body type the caller needs. */
    private static <T> Mono<ResponseEntity<T>> notFound() {
        return Mono.just(ResponseEntity.<T>notFound().build());
    }
}
@Configuration
public class RouterConfig {

    /**
     * Routes under /api/users.
     * Deliberately not a {@code @Bean}: every RouterFunction bean is registered for routing
     * on its own, so exposing the sub-routers as beans as well as combining them in
     * {@link #routes} would register each route twice, once without the global filters.
     */
    public RouterFunction<ServerResponse> userRoutes(UserHandler userHandler) {
        return RouterFunctions
                .nest(RequestPredicates.path("/api/users"),
                        RouterFunctions.route(GET(""), userHandler::getUsers)
                                // Must precede "/{id}": routes are tried in order and
                                // "/{id}" would otherwise capture "/search".
                                .andRoute(GET("/search"), userHandler::searchUsers)
                                .andRoute(GET("/{id}"), userHandler::getUser)
                                .andRoute(POST(""), userHandler::createUser)
                                .andRoute(PUT("/{id}"), userHandler::updateUser)
                                .andRoute(DELETE("/{id}"), userHandler::deleteUser)
                );
    }

    /** Routes under /api/files (not a bean, for the same reason as {@link #userRoutes}). */
    public RouterFunction<ServerResponse> fileRoutes(FileHandler fileHandler) {
        return RouterFunctions
                .nest(RequestPredicates.path("/api/files"),
                        RouterFunctions.route(POST("/upload"), fileHandler::uploadFile)
                                .andRoute(GET("/{id}/download"), fileHandler::downloadFile)
                                .andRoute(DELETE("/{id}"), fileHandler::deleteFile)
                );
    }

    /**
     * Routes under /api/admin, limited to requests accepting JSON and guarded by the
     * authentication filter (not a bean, for the same reason as {@link #userRoutes}).
     */
    public RouterFunction<ServerResponse> adminRoutes(AdminHandler adminHandler) {
        return RouterFunctions
                .nest(RequestPredicates.path("/api/admin").and(accept(APPLICATION_JSON)),
                        RouterFunctions.route(GET("/stats"), adminHandler::getSystemStats)
                                .andRoute(POST("/cache/clear"), adminHandler::clearCache)
                                .andRoute(GET("/health"), adminHandler::healthCheck)
                )
                .filter(authenticationFilter()); // Apply authentication filter
    }

    /**
     * Global router combining all routes; the only RouterFunction bean of this class.
     * A filter added later wraps the ones added before it, so the logging filter is added
     * last to let it observe the responses produced by the error-handling filter.
     */
    @Bean
    public RouterFunction<ServerResponse> routes(
            UserHandler userHandler,
            FileHandler fileHandler,
            AdminHandler adminHandler) {
        return userRoutes(userHandler)
                .and(fileRoutes(fileHandler))
                .and(adminRoutes(adminHandler))
                .filter(errorHandlingFilter())
                .filter(loggingFilter());
    }

    /** Prints method, path, status and elapsed milliseconds for every produced response. */
    private HandlerFilterFunction<ServerResponse, ServerResponse> loggingFilter() {
        return (request, next) -> {
            long startTime = System.currentTimeMillis();
            return next.handle(request)
                    .doOnNext(response -> {
                        long duration = System.currentTimeMillis() - startTime;
                        System.out.println(request.method() + " " + request.path() +
                                " - " + response.statusCode() + " (" + duration + "ms)");
                    });
        };
    }

    /**
     * Turns handler errors into responses: a missing user becomes 404, anything else is
     * reported on stderr and becomes 500.
     */
    private HandlerFilterFunction<ServerResponse, ServerResponse> errorHandlingFilter() {
        return (request, next) -> next.handle(request)
                // Must come before the catch-all below, which would otherwise answer 500.
                .onErrorResume(UserNotFoundException.class, e -> ServerResponse.notFound().build())
                .onErrorResume(Exception.class, e -> {
                    System.err.println("Error handling request: " + e.getMessage());
                    return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR)
                            .bodyValue("Internal Server Error");
                });
    }

    /**
     * Requires an {@code Authorization: Bearer <token>} header: 401 when it is missing or
     * malformed, 403 when the token is rejected, otherwise the request proceeds.
     */
    private HandlerFilterFunction<ServerResponse, ServerResponse> authenticationFilter() {
        return (request, next) -> {
            String token = request.headers().firstHeader("Authorization");
            if (token == null || !token.startsWith("Bearer ")) {
                return ServerResponse.status(HttpStatus.UNAUTHORIZED)
                        .bodyValue("Authentication required");
            }
            // Validate token (simplified); 7 is the length of the "Bearer " prefix
            if (!isValidToken(token.substring(7))) {
                return ServerResponse.status(HttpStatus.FORBIDDEN)
                        .bodyValue("Invalid token");
            }
            return next.handle(request);
        };
    }

    /**
     * Placeholder token check: accepts ANY non-empty token.
     * Replace with real verification (signature, expiry, ...) before any real use.
     */
    private boolean isValidToken(String token) {
        return token != null && !token.isEmpty();
    }
}
/**
 * Handler class for functional endpoints.
 * Malformed ids and paging parameters are answered with 400 instead of escaping as
 * unhandled exceptions, and a missing user (signalled by the service as
 * UserNotFoundException) is answered with 404.
 */
@Component
public class UserHandler {
    private final ReactiveUserService userService;

    public UserHandler(ReactiveUserService userService) {
        this.userService = userService;
    }

    /** Lists one page of users; query parameters {@code page} (default 0) and {@code size} (default 10). */
    public Mono<ServerResponse> getUsers(ServerRequest request) {
        final PageRequest pageRequest;
        try {
            int page = Integer.parseInt(request.queryParam("page").orElse("0"));
            int size = Integer.parseInt(request.queryParam("size").orElse("10"));
            pageRequest = PageRequest.of(page, size);
        } catch (IllegalArgumentException e) {
            // Covers NumberFormatException (a subclass) and values PageRequest.of rejects
            return ServerResponse.badRequest().bodyValue("Invalid 'page' or 'size' query parameter");
        }
        Flux<User> users = userService.findAll(pageRequest);
        return ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(users, User.class);
    }

    /** Returns the user identified by the {@code id} path variable, or 404. */
    public Mono<ServerResponse> getUser(ServerRequest request) {
        return userId(request)
                .flatMap(userService::findById)
                .flatMap(user -> ServerResponse.ok()
                        .contentType(MediaType.APPLICATION_JSON)
                        .bodyValue(user))
                .switchIfEmpty(ServerResponse.notFound().build())
                .onErrorResume(UserNotFoundException.class, e -> ServerResponse.notFound().build())
                .onErrorResume(NumberFormatException.class, e -> invalidId());
    }

    /** Creates a user from the JSON body; 201 on success, 400 on validation failure, 409 on duplicate. */
    public Mono<ServerResponse> createUser(ServerRequest request) {
        return request.bodyToMono(CreateUserRequest.class)
                .flatMap(userService::createUser)
                .flatMap(user -> ServerResponse.status(HttpStatus.CREATED)
                        .contentType(MediaType.APPLICATION_JSON)
                        .bodyValue(user))
                .onErrorResume(ValidationException.class, e ->
                        ServerResponse.badRequest().bodyValue(e.getMessage()))
                .onErrorResume(UserAlreadyExistsException.class, e ->
                        ServerResponse.status(HttpStatus.CONFLICT).bodyValue(e.getMessage()));
    }

    /** Updates the user identified by {@code id}; 404 when missing, 400 when the body is empty. */
    public Mono<ServerResponse> updateUser(ServerRequest request) {
        return userId(request)
                .flatMap(id -> request.bodyToMono(UpdateUserRequest.class)
                        .flatMap(req -> userService.updateUser(id, req)))
                .flatMap(user -> ServerResponse.ok()
                        .contentType(MediaType.APPLICATION_JSON)
                        .bodyValue(user))
                // The service never completes empty for a missing user (it errors), so an
                // empty pipeline here means the request carried no body.
                .switchIfEmpty(ServerResponse.badRequest().bodyValue("Request body is required"))
                .onErrorResume(UserNotFoundException.class, e -> ServerResponse.notFound().build())
                .onErrorResume(NumberFormatException.class, e -> invalidId());
    }

    /** Deletes the user identified by {@code id}; 204 on success, 404 when missing. */
    public Mono<ServerResponse> deleteUser(ServerRequest request) {
        return userId(request)
                .flatMap(userService::deleteById)
                // then(..) always emits, so "not found" can only be detected through the error
                .then(ServerResponse.noContent().build())
                .onErrorResume(UserNotFoundException.class, e -> ServerResponse.notFound().build())
                .onErrorResume(NumberFormatException.class, e -> invalidId());
    }

    /** Searches users by the required query parameter {@code q}. */
    public Mono<ServerResponse> searchUsers(ServerRequest request) {
        String query = request.queryParam("q").orElse("");
        if (query.isEmpty()) {
            return ServerResponse.badRequest().bodyValue("Query parameter 'q' is required");
        }
        Flux<User> users = userService.searchByName(query);
        return ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(users, User.class);
    }

    /** Parses the {@code id} path variable lazily so a malformed value becomes an error signal. */
    private static Mono<Long> userId(ServerRequest request) {
        return Mono.fromCallable(() -> Long.valueOf(request.pathVariable("id")));
    }

    /** 400 response for an id that is not a valid number. */
    private static Mono<ServerResponse> invalidId() {
        return ServerResponse.badRequest().bodyValue("Path variable 'id' must be a number");
    }
}
@Service
/**
 * Example client for an external HTTP API built on WebClient.
 * All requests go through two filters: one prints the outgoing request, the other turns
 * 5xx responses into a ServerException.
 */
public class ExternalApiService {
    private final WebClient webClient;

    public ExternalApiService(WebClient.Builder webClientBuilder) {
        this.webClient = webClientBuilder
                .baseUrl("https://api.external-service.com")
                .defaultHeader(HttpHeaders.USER_AGENT, "MyApp/1.0")
                .filter(logRequest())
                .filter(handleErrors())
                .build();
    }

    /**
     * Fetches one user with a bearer token, a 10 second timeout and up to 3 retries.
     * Note that {@code retry(3)} resubscribes on any error, including the timeout.
     */
    public Mono<User> getUser(Long userId) {
        return webClient
                .get()
                .uri("/users/{id}", userId)
                .header(HttpHeaders.AUTHORIZATION, "Bearer " + getToken())
                .retrieve()
                .bodyToMono(User.class)
                .timeout(Duration.ofSeconds(10))
                .retry(3);
    }

    /** Fetches one page of users, passing {@code page} and {@code size} as query parameters. */
    public Flux<User> getUsers(int page, int size) {
        return webClient
                .get()
                .uri(uriBuilder -> uriBuilder
                        .path("/users")
                        .queryParam("page", page)
                        .queryParam("size", size)
                        .build())
                .retrieve()
                .bodyToFlux(User.class);
    }

    /** Posts the request as JSON and decodes the created user from the response. */
    public Mono<User> createUser(CreateUserRequest request) {
        return webClient
                .post()
                .uri("/users")
                .contentType(MediaType.APPLICATION_JSON)
                .body(Mono.just(request), CreateUserRequest.class)
                .retrieve()
                .bodyToMono(User.class);
    }

    /** Deletes a user; completes without a value. */
    public Mono<Void> deleteUser(Long userId) {
        return webClient
                .delete()
                .uri("/users/{id}", userId)
                .retrieve()
                .bodyToMono(Void.class);
    }

    // File upload example
    /** Streams the given buffers as an octet-stream upload, sending the name in X-Filename. */
    public Mono<UploadResponse> uploadFile(String filename, Flux<DataBuffer> fileData) {
        return webClient
                .post()
                .uri("/files/upload")
                .contentType(MediaType.APPLICATION_OCTET_STREAM)
                .header("X-Filename", filename)
                .body(fileData, DataBuffer.class)
                .retrieve()
                .bodyToMono(UploadResponse.class);
    }

    // Streaming response
    /** Consumes a server-sent-event stream as strings. */
    public Flux<String> getStreamingData() {
        return webClient
                .get()
                .uri("/stream")
                .accept(MediaType.TEXT_EVENT_STREAM)
                .retrieve()
                .bodyToFlux(String.class);
    }

    // Error handling with full access to the response
    /**
     * Fetches one user and maps the status by hand: 2xx decodes the user, 404 becomes
     * UserNotFoundException, anything else becomes ApiException carrying the error body.
     * Uses exchangeToMono rather than the deprecated exchange(): with exchange() the 404
     * branch never consumed the body, which leaks the connection.
     */
    public Mono<User> getUserWithDetailedErrorHandling(Long userId) {
        return webClient
                .get()
                .uri("/users/{id}", userId)
                .exchangeToMono(response -> {
                    if (response.statusCode().is2xxSuccessful()) {
                        return response.bodyToMono(User.class);
                    }
                    if (response.statusCode() == HttpStatus.NOT_FOUND) {
                        return Mono.error(new UserNotFoundException("User not found: " + userId));
                    }
                    return response.bodyToMono(String.class)
                            // Without a default, an empty error body would skip the flatMap
                            // and complete empty instead of failing.
                            .defaultIfEmpty("")
                            .flatMap(errorBody -> Mono.error(new ApiException("API Error: " + errorBody)));
                });
    }

    /** Filter that prints the method and URL of every outgoing request. */
    private ExchangeFilterFunction logRequest() {
        return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
            System.out.println("Request: " + clientRequest.method() + " " + clientRequest.url());
            return Mono.just(clientRequest);
        });
    }

    /** Filter that converts every 5xx response into a ServerException carrying the body. */
    private ExchangeFilterFunction handleErrors() {
        return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
            if (clientResponse.statusCode().is5xxServerError()) {
                return clientResponse.bodyToMono(String.class)
                        // An empty 5xx body must still produce the error, not an empty Mono
                        .defaultIfEmpty("")
                        .flatMap(errorBody -> Mono.error(new ServerException("Server error: " + errorBody)));
            }
            return Mono.just(clientResponse);
        });
    }

    /** Placeholder: returns a hard-coded token. Replace with a real credential source. */
    private String getToken() {
        return "your-auth-token";
    }
}
@Service
/**
 * Reactive user operations backed by a ReactiveUserRepository, with a WebClient used to
 * send a welcome notification after user creation.
 */
public class ReactiveUserService {
    private final ReactiveUserRepository userRepository;
    private final WebClient notificationClient;

    public ReactiveUserService(ReactiveUserRepository userRepository,
                               WebClient notificationClient) {
        this.userRepository = userRepository;
        this.notificationClient = notificationClient;
    }

    /** Returns one page of users; a DataAccessException is reported on stderr and yields an empty stream. */
    public Flux<User> findAll(Pageable pageable) {
        Flux<User> page = userRepository.findAllBy(pageable);
        return page.onErrorResume(DataAccessException.class, failure -> {
            System.err.println("Database error: " + failure.getMessage());
            return Flux.empty();
        });
    }

    /** Returns the user with the given id, or signals UserNotFoundException when there is none. */
    public Mono<User> findById(Long id) {
        Mono<User> found = userRepository.findById(id);
        return found.switchIfEmpty(Mono.error(new UserNotFoundException("User not found: " + id)));
    }

    /**
     * Converts and saves the request, then sends the welcome notification.
     * A DataIntegrityViolationException is translated to UserAlreadyExistsException.
     */
    @Transactional
    public Mono<User> createUser(CreateUserRequest request) {
        Mono<User> draft = Mono.fromCallable(() -> convertToUser(request));
        return draft
                .flatMap(entity -> userRepository.save(entity))
                .flatMap(saved -> sendWelcomeNotification(saved))
                .onErrorMap(DataIntegrityViolationException.class, cause ->
                        new UserAlreadyExistsException("User already exists"));
    }

    /** Loads the user, applies the non-null request fields and saves; errors when the user is missing. */
    @Transactional
    public Mono<User> updateUser(Long id, UpdateUserRequest request) {
        Mono<User> existing = userRepository.findById(id)
                .switchIfEmpty(Mono.error(new UserNotFoundException("User not found: " + id)));
        return existing
                .map(current -> updateUserFromRequest(current, request))
                .flatMap(changed -> userRepository.save(changed));
    }

    /** Deletes the user when it exists; otherwise signals UserNotFoundException. */
    @Transactional
    public Mono<Void> deleteById(Long id) {
        return userRepository.existsById(id)
                .flatMap(present -> {
                    if (present) {
                        return userRepository.deleteById(id);
                    }
                    return Mono.error(new UserNotFoundException("User not found: " + id));
                });
    }

    /** Case-insensitive name search; a TimeoutException (5 second timeout) yields an empty stream. */
    public Flux<User> searchByName(String name) {
        Flux<User> matches = userRepository.findByNameContainingIgnoreCase(name);
        return matches
                .timeout(Duration.ofSeconds(5))
                .onErrorResume(TimeoutException.class, timedOut -> {
                    System.err.println("Search timeout for query: " + name);
                    return Flux.empty();
                });
    }

    // Reactive composition example
    /** Zips the user lookup with its stats into a UserWithStats. */
    public Mono<UserWithStats> getUserWithStats(Long userId) {
        return Mono.zip(findById(userId), getUserStats(userId))
                .map(pair -> new UserWithStats(pair.getT1(), pair.getT2()));
    }

    // Parallel processing example
    /** Looks up the given ids on 4 parallel rails scheduled on boundedElastic, merged back into one Flux. */
    public Flux<User> processUsersInParallel(List<Long> userIds) {
        return Flux.fromIterable(userIds)
                .parallel(4) // Use 4 parallel threads
                .runOn(Schedulers.boundedElastic())
                .flatMap(id -> findById(id))
                .sequential();
    }

    // Reactive caching example
    /**
     * Reads the user from the "users" cache, falling back to the repository and caching the
     * loaded value on a miss.
     * NOTE(review): cacheManager is not declared anywhere in this class as shown.
     */
    public Mono<User> findByIdWithCache(Long id) {
        Mono<User> cached = cacheManager.getFromCache("users", id, User.class);
        Mono<User> loadedAndCached = userRepository.findById(id)
                .flatMap(loaded -> cacheManager.putInCache("users", id, loaded)
                        .thenReturn(loaded));
        return cached.switchIfEmpty(loadedAndCached);
    }

    /** Posts a WELCOME notification; a failure is reported on stderr and the user is still returned. */
    private Mono<User> sendWelcomeNotification(User user) {
        NotificationRequest welcome = NotificationRequest.builder()
                .userId(user.getId())
                .type("WELCOME")
                .message("Welcome to our platform!")
                .build();
        Mono<Void> sent = notificationClient
                .post()
                .uri("/notifications")
                .bodyValue(welcome)
                .retrieve()
                .bodyToMono(Void.class);
        return sent
                .thenReturn(user)
                .onErrorResume(failure -> {
                    System.err.println("Failed to send notification: " + failure.getMessage());
                    return Mono.just(user); // Don't fail the user creation
                });
    }

    /** Produces fixed stats (100, 50) for the user, computed on the boundedElastic scheduler. */
    private Mono<UserStats> getUserStats(Long userId) {
        // Simulate stats calculation
        Mono<UserStats> stats = Mono.fromCallable(() -> new UserStats(userId, 100, 50));
        return stats.subscribeOn(Schedulers.boundedElastic());
    }

    /** Builds a new User carrying the request's username and email. */
    private User convertToUser(CreateUserRequest request) {
        User created = new User();
        created.setUsername(request.getUsername());
        created.setEmail(request.getEmail());
        return created;
    }

    /** Copies the non-null username and email from the request onto the user and returns it. */
    private User updateUserFromRequest(User user, UpdateUserRequest request) {
        String username = request.getUsername();
        if (username != null) {
            user.setUsername(username);
        }
        String email = request.getEmail();
        if (email != null) {
            user.setEmail(email);
        }
        return user;
    }
}
@Configuration
@EnableWebFlux
public class WebFluxConfig implements WebFluxConfigurer {

    /** In-memory buffer limit applied to server codecs and to the WebClient: 1MB. */
    private static final int MAX_IN_MEMORY_BYTES = 1024 * 1024;

    /** Registers Jackson JSON codecs built on {@link #objectMapper()} and sets the buffer limit. */
    @Override
    public void configureHttpMessageCodecs(ServerCodecConfigurer configurer) {
        // JSON configuration
        Jackson2JsonEncoder jsonEncoder =
                new Jackson2JsonEncoder(objectMapper(), MediaType.APPLICATION_JSON);
        configurer.defaultCodecs().jackson2JsonEncoder(jsonEncoder);

        Jackson2JsonDecoder jsonDecoder =
                new Jackson2JsonDecoder(objectMapper(), MediaType.APPLICATION_JSON);
        configurer.defaultCodecs().jackson2JsonDecoder(jsonDecoder);

        // Increase buffer size for large payloads
        configurer.defaultCodecs().maxInMemorySize(MAX_IN_MEMORY_BYTES);
    }

    /** CORS rules for /api/**: two allowed origins, credentials allowed, cached for 3600 seconds. */
    @Override
    public void addCorsMappings(CorsRegistry registry) {
        String[] origins = {"http://localhost:3000", "https://myapp.com"};
        String[] methods = {"GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"};
        registry.addMapping("/api/**")
                .allowedOrigins(origins)
                .allowedMethods(methods)
                .allowedHeaders("*")
                .allowCredentials(true)
                .maxAge(3600);
    }

    /** Serves /static/** from classpath:/static/ with a 365 day max-age. */
    @Override
    public void addResourceHandlers(ResourceHandlerRegistry registry) {
        CacheControl oneYear = CacheControl.maxAge(Duration.ofDays(365));
        registry.addResourceHandler("/static/**")
                .addResourceLocations("classpath:/static/")
                .setCacheControl(oneYear);
    }

    /** ObjectMapper with java.time support, non-timestamp dates and snake_case property names. */
    @Bean
    public ObjectMapper objectMapper() {
        ObjectMapper json = new ObjectMapper();
        json.registerModule(new JavaTimeModule());
        json.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        json.setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE);
        return json;
    }

    /** WebClient built from the injected builder with the same in-memory buffer limit. */
    @Bean
    public WebClient webClient(WebClient.Builder builder) {
        return builder
                .codecs(codecs -> codecs.defaultCodecs().maxInMemorySize(MAX_IN_MEMORY_BYTES))
                .build();
    }

    // Custom error handler
    @Bean
    @Order(-2) // Higher precedence than DefaultErrorWebExceptionHandler
    public WebExceptionHandler globalExceptionHandler() {
        WebExceptionHandler handler = new GlobalErrorWebExceptionHandler();
        return handler;
    }
}
/**
 * Global error handler: writes a JSON ErrorResponse for exceptions that reach the
 * WebExceptionHandler chain. UserNotFoundException maps to 404, ValidationException to 400
 * and everything else to 500.
 */
public class GlobalErrorWebExceptionHandler implements WebExceptionHandler {
    private final ObjectMapper objectMapper;

    public GlobalErrorWebExceptionHandler() {
        this.objectMapper = new ObjectMapper();
        this.objectMapper.registerModule(new JavaTimeModule());
    }

    /**
     * Writes the error response for {@code ex}.
     *
     * @return a Mono completing when the body has been written; the original error when the
     *         response is already committed; the serialization error if the body cannot be
     *         encoded
     */
    @Override
    public Mono<Void> handle(ServerWebExchange exchange, Throwable ex) {
        ServerHttpResponse response = exchange.getResponse();
        if (response.isCommitted()) {
            // Headers and status can no longer be changed; pass the error on.
            return Mono.error(ex);
        }
        // set (not add): a Content-Type already present must be replaced, not duplicated
        response.getHeaders().setContentType(MediaType.APPLICATION_JSON);

        ErrorResponse errorResponse;
        HttpStatus status;
        if (ex instanceof UserNotFoundException) {
            status = HttpStatus.NOT_FOUND;
            errorResponse = new ErrorResponse("User not found", "USER_NOT_FOUND");
        } else if (ex instanceof ValidationException) {
            status = HttpStatus.BAD_REQUEST;
            errorResponse = new ErrorResponse(ex.getMessage(), "VALIDATION_ERROR");
        } else {
            // The client only sees a generic message, so record the real cause here;
            // otherwise unexpected failures would leave no trace at all.
            System.err.println("Unhandled error: " + ex);
            status = HttpStatus.INTERNAL_SERVER_ERROR;
            errorResponse = new ErrorResponse("Internal server error", "INTERNAL_ERROR");
        }
        response.setStatusCode(status);

        try {
            byte[] bytes = objectMapper.writeValueAsBytes(errorResponse);
            DataBuffer buffer = response.bufferFactory().wrap(bytes);
            return response.writeWith(Mono.just(buffer));
        } catch (Exception e) {
            return Mono.error(e);
        }
    }
}
@RestController
@RequestMapping("/api/events")
public class EventController {
    private final EventService eventService;

    public EventController(EventService eventService) {
        this.eventService = eventService;
    }

    /**
     * Streams service events as server-sent events carrying the event's id, type and data.
     * On failure the stream switches to a single "error" event holding the error message.
     */
    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<String>> streamEvents() {
        return eventService.getEventStream()
                .map(source -> {
                    return ServerSentEvent.<String>builder()
                            .id(String.valueOf(source.getId()))
                            .event(source.getType())
                            .data(source.getData())
                            .build();
                })
                .onErrorResume(failure -> Flux.just(
                        ServerSentEvent.<String>builder()
                                .event("error")
                                .data("Error occurred: " + failure.getMessage())
                                .build()));
    }

    /**
     * Streams the notifications of one user as "notification" server-sent events and prints
     * a message when the stream is cancelled.
     */
    @GetMapping(value = "/notifications/{userId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<NotificationEvent>> streamUserNotifications(@PathVariable Long userId) {
        return eventService.getUserNotifications(userId)
                .map(item -> {
                    return ServerSentEvent.<NotificationEvent>builder()
                            .id(item.getId())
                            .event("notification")
                            .data(item)
                            .build();
                })
                .doOnCancel(() -> {
                    System.out.println("Client disconnected from notifications stream");
                });
    }
}
Spring WebFlux provides a complete reactive programming model for building scalable, non-blocking web applications that can efficiently handle high concurrency with minimal resource usage.
Install with Tessl CLI
npx tessl i tessl/maven-org-springframework--spring