CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-org-springframework--spring

Comprehensive application framework and inversion of control container for the Java platform, providing dependency injection, AOP, data access, transaction management, and web framework capabilities.

Overview
Eval results
Files

docs/reactive-web.md

Reactive Web Framework (WebFlux)

Spring WebFlux is the reactive-stack web framework introduced in Spring 5.0. It provides a non-blocking, reactive programming model for building web applications that can handle high concurrency with a small number of threads and scale with fewer hardware resources.

Maven Dependencies

<!-- Spring WebFlux -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-webflux</artifactId>
    <version>5.3.39</version>
</dependency>

<!-- Reactor Core (reactive streams) -->
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.4.32</version>
</dependency>

<!-- Reactor Netty (default reactive server and WebClient connector; pulls in Netty transitively).
     Keep this version aligned with reactor-core, ideally by importing the Reactor BOM. -->
<dependency>
    <groupId>io.projectreactor.netty</groupId>
    <artifactId>reactor-netty</artifactId>
    <version>1.0.35</version>
</dependency>

<!-- Jackson for JSON (reactive) -->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.12.7</version>
</dependency>

Core Imports

// Reactive types
import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;
import org.reactivestreams.Publisher;

// WebFlux annotations
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.PathVariable;

// Functional routing
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import org.springframework.web.reactive.function.server.HandlerFunction;

// Configuration
import org.springframework.web.reactive.config.EnableWebFlux;
import org.springframework.web.reactive.config.WebFluxConfigurer;
import org.springframework.context.annotation.Configuration;

// WebClient (reactive HTTP client)
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.ClientResponse;

// Server support
import org.springframework.web.reactive.function.server.RequestPredicates;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;

Core Reactive Types

Mono and Flux

// Reactive stream publisher for 0-1 elements
// NOTE: signature summary only -- method bodies are omitted, so this listing
// documents the API shape and is not compilable as written.
public abstract class Mono<T> implements CorePublisher<T> {
    
    // Creation methods
    public static <T> Mono<T> just(T data);
    public static <T> Mono<T> justOrEmpty(T data);
    public static <T> Mono<T> empty();
    public static <T> Mono<T> error(Throwable error);
    public static <T> Mono<T> fromCallable(Callable<? extends T> supplier);
    public static <T> Mono<T> fromSupplier(Supplier<? extends T> supplier);
    public static <T> Mono<T> defer(Supplier<? extends Mono<? extends T>> supplier);
    
    // Transformation methods
    // map takes a plain value-to-value function; flatMap takes a function returning another Mono;
    // flatMapMany takes a function returning a Publisher and yields a Flux
    public <R> Mono<R> map(Function<? super T, ? extends R> mapper);
    public <R> Mono<R> flatMap(Function<? super T, ? extends Mono<? extends R>> transformer);
    public <R> Flux<R> flatMapMany(Function<? super T, ? extends Publisher<? extends R>> mapper);
    public Mono<T> filter(Predicate<? super T> tester);
    public Mono<T> switchIfEmpty(Mono<? extends T> alternate);
    
    // Error handling
    public Mono<T> onErrorReturn(T fallback);
    public Mono<T> onErrorResume(Function<? super Throwable, ? extends Mono<? extends T>> fallback);
    public Mono<T> doOnError(Consumer<? super Throwable> onError);
    
    // Side effects
    public Mono<T> doOnNext(Consumer<? super T> onNext);
    public Mono<T> doOnSuccess(Consumer<? super T> onSuccess);
    public Mono<T> doFinally(Consumer<SignalType> onFinally);
    
    // Blocking operations (avoid in production)
    public T block();
    public T block(Duration timeout);
    
    // Subscription
    // Each overload returns a Disposable handle for the subscription
    public Disposable subscribe();
    public Disposable subscribe(Consumer<? super T> consumer);
    public Disposable subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer);
}

// Reactive stream publisher for 0-N elements
public abstract class Flux<T> implements CorePublisher<T> {
    
    // Creation methods
    public static <T> Flux<T> just(T... data);
    public static <T> Flux<T> fromIterable(Iterable<? extends T> it);
    public static <T> Flux<T> fromArray(T[] array);
    public static <T> Flux<T> fromStream(Stream<? extends T> s);
    public static <T> Flux<T> empty();
    public static <T> Flux<T> error(Throwable error);
    public static Flux<Long> interval(Duration period);
    public static <T> Flux<T> defer(Supplier<? extends Publisher<T>> supplier);
    
    // Transformation methods
    public <V> Flux<V> map(Function<? super T, ? extends V> mapper);
    public <R> Flux<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper);
    public Flux<T> filter(Predicate<? super T> p);
    public Flux<T> take(long n);
    public Flux<T> skip(long skipped);
    public Flux<T> distinct();
    public Flux<T> sort();
    
    // Aggregation methods
    public Mono<T> reduce(BinaryOperator<T> aggregator);
    public <A> Mono<A> reduce(A initial, BiFunction<A, ? super T, A> accumulator);
    public Mono<List<T>> collectList();
    public Mono<Map<K, T>> collectMap(Function<? super T, ? extends K> keyExtractor);
    public Mono<Long> count();
    
    // Error handling
    public Flux<T> onErrorReturn(T fallback);
    public Flux<T> onErrorResume(Function<? super Throwable, ? extends Publisher<? extends T>> fallback);
    public Flux<T> retry();
    public Flux<T> retry(long numRetries);
    
    // Backpressure
    public Flux<T> onBackpressureBuffer();
    public Flux<T> onBackpressureDrop();
    public Flux<T> onBackpressureLatest();
    
    // Blocking operations (avoid in production)
    public List<T> collectList().block();
    public Stream<T> toStream();
}

Functional Routing

RouterFunction and ServerRequest/ServerResponse

// Represents a function that routes to a handler function
// NOTE: signature summary only -- default method bodies are omitted.
@FunctionalInterface
public interface RouterFunction<T extends ServerResponse> {
    
    // The single abstract method: yields the handler for the request, or an empty Mono when no route matches
    Mono<HandlerFunction<T>> route(ServerRequest request);
    
    // Composition: the receiver is consulted first, the argument only if the receiver does not match
    default RouterFunction<T> and(RouterFunction<T> other);
    default RouterFunction<T> andRoute(RequestPredicate predicate, HandlerFunction<T> handlerFunction);
    default RouterFunction<T> andNest(RequestPredicate predicate, RouterFunction<T> routerFunction);
    // Combines with a router of a different response type, hence the wildcard result
    default RouterFunction<?> andOther(RouterFunction<?> other);
    // A filter may change the response type from T to S
    default <S extends ServerResponse> RouterFunction<S> filter(HandlerFilterFunction<T, S> filterFunction);
}

// Represents a typed server-side HTTP request
// NOTE: signature summary only -- not every method of the interface is listed.
public interface ServerRequest {
    
    // Request line
    HttpMethod method();
    URI uri();
    String path();
    
    // Headers
    ServerRequest.Headers headers();
    
    // Query parameters
    MultiValueMap<String, String> queryParams();
    Optional<String> queryParam(String name);
    
    // Body
    <T> Mono<T> bodyToMono(Class<? extends T> elementClass);
    <T> Mono<T> bodyToMono(ParameterizedTypeReference<T> typeReference);
    <T> Flux<T> bodyToFlux(Class<? extends T> elementClass);
    <T> Flux<T> bodyToFlux(ParameterizedTypeReference<T> typeReference);
    
    // Attributes
    Map<String, Object> attributes();
    Optional<Object> attribute(String name);
    
    // Path matching: a URI template variable has exactly one value, so this is a plain Map
    String pathVariable(String name);
    Map<String, String> pathVariables();
    
    // Authenticated user, resolved asynchronously
    Mono<? extends Principal> principal();
    
    // Headers interface
    interface Headers {
        List<MediaType> accept();
        OptionalLong contentLength();
        Optional<MediaType> contentType();
        InetSocketAddress host();
        List<String> header(String headerName);
        // First value of the named header, or null when the header is absent
        String firstHeader(String headerName);
        HttpHeaders asHttpHeaders();
    }
}

// Represents a typed server-side HTTP response
public interface ServerResponse {
    
    HttpStatus statusCode();
    int rawStatusCode();
    HttpHeaders headers();
    MultiValueMap<String, ResponseCookie> cookies();
    
    // Static factory methods
    static BodyBuilder status(HttpStatus status);
    static BodyBuilder status(int status);
    static BodyBuilder ok();
    static HeadersBuilder<?> noContent();
    static BodyBuilder badRequest();
    static HeadersBuilder<?> notFound();
    static BodyBuilder unprocessableEntity();
    
    // Builder interfaces
    interface BodyBuilder extends HeadersBuilder<BodyBuilder> {
        <T> Mono<ServerResponse> body(Publisher<T> publisher, Class<T> elementClass);
        <T> Mono<ServerResponse> body(Publisher<T> publisher, ParameterizedTypeReference<T> typeRef);
        Mono<ServerResponse> body(Object body, Class<?> bodyType);
        <T> Mono<ServerResponse> body(Mono<T> body, Class<T> bodyType);
        <T> Mono<ServerResponse> body(Flux<T> body, Class<T> bodyType);
        Mono<ServerResponse> syncBody(Object body);
        
        Mono<ServerResponse> render(String name, Object... modelAttributes);
        Mono<ServerResponse> render(String name, Map<String, ?> model);
    }
    
    interface HeadersBuilder<B extends HeadersBuilder<B>> {
        B header(String headerName, String... headerValues);
        B headers(HttpHeaders headers);
        B cookie(ResponseCookie cookie);
        B cookies(Consumer<MultiValueMap<String, ResponseCookie>> cookiesConsumer);
        B allow(HttpMethod... allowedMethods);
        B eTag(String etag);
        B lastModified(ZonedDateTime lastModified);
        B location(URI location);
        B cacheControl(CacheControl cacheControl);
        B varyBy(String... requestHeaders);
        
        Mono<ServerResponse> build();
        <T> Mono<ServerResponse> build(Publisher<T> voidPublisher);
    }
}

// Function that handles a server request and returns a response
// T is the response type produced; it is bounded by ServerResponse.
@FunctionalInterface
public interface HandlerFunction<T extends ServerResponse> {
    // Single abstract method, so a lambda or method reference taking a
    // ServerRequest and returning Mono<T> can be used as a HandlerFunction.
    Mono<T> handle(ServerRequest request);
}

RouterFunctions Utility

// Factory methods for RouterFunction
// NOTE: signature summary only -- method bodies are omitted and not every method is listed.
public abstract class RouterFunctions {
    
    // Route creation
    // route() starts the fluent builder; the two-argument form builds a single route directly
    public static Builder route();
    public static <T extends ServerResponse> RouterFunction<T> route(RequestPredicate predicate, HandlerFunction<T> handlerFunction);
    
    // Nesting routes
    public static <T extends ServerResponse> RouterFunction<T> nest(RequestPredicate predicate, RouterFunction<T> routerFunction);
    
    // Resource handling
    public static RouterFunction<ServerResponse> resources(String pattern, Resource location);
    public static RouterFunction<ServerResponse> resources(Function<ServerRequest, Mono<Resource>> lookupFunction);
    
    // Route conversion
    public static HttpHandler toHttpHandler(RouterFunction<?> routerFunction);
    public static WebHandler toWebHandler(RouterFunction<?> routerFunction);
    
    // Fluent builder returned by route(). The per-HTTP-method convenience methods live here,
    // not as static methods on RouterFunctions itself.
    public interface Builder {
        Builder GET(String pattern, HandlerFunction<ServerResponse> handlerFunction);
        Builder POST(String pattern, HandlerFunction<ServerResponse> handlerFunction);
        Builder PUT(String pattern, HandlerFunction<ServerResponse> handlerFunction);
        Builder DELETE(String pattern, HandlerFunction<ServerResponse> handlerFunction);
        Builder PATCH(String pattern, HandlerFunction<ServerResponse> handlerFunction);
        
        Builder route(RequestPredicate predicate, HandlerFunction<ServerResponse> handlerFunction);
        Builder add(RouterFunction<ServerResponse> routerFunction);
        Builder path(String pattern, Consumer<Builder> builderConsumer);
        Builder nest(RequestPredicate predicate, Consumer<Builder> builderConsumer);
        Builder filter(HandlerFilterFunction<ServerResponse, ServerResponse> filterFunction);
        
        RouterFunction<ServerResponse> build();
    }
}

// Predicates for request matching
// NOTE: signature summary only -- method bodies are omitted and not every method is listed.
public abstract class RequestPredicates {
    
    // Matches every request
    public static RequestPredicate all();
    
    // HTTP methods
    public static RequestPredicate method(HttpMethod httpMethod);
    public static RequestPredicate methods(HttpMethod... httpMethods);
    public static RequestPredicate GET(String pattern);
    public static RequestPredicate POST(String pattern);
    public static RequestPredicate PUT(String pattern);
    public static RequestPredicate DELETE(String pattern);
    public static RequestPredicate PATCH(String pattern);
    public static RequestPredicate HEAD(String pattern);
    public static RequestPredicate OPTIONS(String pattern);
    
    // Path matching
    public static RequestPredicate path(String pattern);
    public static RequestPredicate pathExtension(String extension);
    public static RequestPredicate pathExtension(Predicate<String> extensionPredicate);
    
    // Content type
    public static RequestPredicate accept(MediaType... mediaTypes);
    public static RequestPredicate contentType(MediaType... mediaTypes);
    
    // Headers
    public static RequestPredicate headers(Predicate<ServerRequest.Headers> headersPredicate);
    
    // Query parameters
    public static RequestPredicate queryParam(String name, String value);
    public static RequestPredicate queryParam(String name, Predicate<String> predicate);
    
    // Logical operations are instance methods on RequestPredicate, not static methods here:
    //   left.and(right), left.or(right), predicate.negate()
    // e.g. RequestPredicates.path("/api/admin").and(accept(APPLICATION_JSON))
}

Annotated Controllers

WebFlux Controller Annotations

// Annotated WebFlux controllers reuse the Spring MVC annotations; handler methods return reactive types
@RestController
@RequestMapping("/api/reactive")
public class ReactiveController {

    /** Emits a single greeting. */
    @GetMapping("/mono")
    public Mono<String> getMono() {
        String greeting = "Hello Reactive World!";
        return Mono.just(greeting);
    }

    /** Emits three fixed items. */
    @GetMapping("/flux")
    public Flux<String> getFlux() {
        String[] items = {"Item 1", "Item 2", "Item 3"};
        return Flux.fromArray(items);
    }

    /** Echoes the request body back, prefixed with {@code "Received: "}, in a 200 response. */
    @PostMapping("/mono")
    public Mono<ResponseEntity<String>> postMono(@RequestBody Mono<String> body) {
        return body
            .map(payload -> "Received: " + payload)
            .map(message -> ResponseEntity.ok(message));
    }

    /** Emits one newline-terminated {@code "Data <n>"} line per one-second tick. */
    @GetMapping(value = "/stream", produces = MediaType.TEXT_PLAIN_VALUE)
    public Flux<String> getStream() {
        Flux<Long> ticks = Flux.interval(Duration.ofSeconds(1));
        return ticks.map(tick -> "Data " + tick + "\n");
    }
}

WebClient (Reactive HTTP Client)

WebClient Interface

// Non-blocking, reactive client to perform HTTP requests
// NOTE: signature summary only -- static method bodies are omitted and not every method is listed.
public interface WebClient {
    
    // Request specification
    RequestHeadersUriSpec<?> get();
    RequestBodyUriSpec post();
    RequestBodyUriSpec put();
    RequestBodyUriSpec patch();
    RequestHeadersUriSpec<?> delete();
    RequestHeadersUriSpec<?> options();
    RequestHeadersUriSpec<?> head();
    RequestBodyUriSpec method(HttpMethod method);
    
    // Builder
    static WebClient create();
    static WebClient create(String baseUrl);
    static Builder builder();
    
    // Builder interface
    interface Builder {
        Builder baseUrl(String baseUrl);
        Builder defaultHeader(String header, String... values);
        Builder defaultHeaders(Consumer<HttpHeaders> headersConsumer);
        Builder defaultCookie(String cookie, String... values);
        Builder defaultCookies(Consumer<MultiValueMap<String, String>> cookiesConsumer);
        Builder filter(ExchangeFilterFunction filter);
        Builder filters(Consumer<List<ExchangeFilterFunction>> filtersConsumer);
        Builder clientConnector(ClientHttpConnector connector);
        Builder codecs(Consumer<ClientCodecConfigurer> configurer);
        
        WebClient build();
    }
    
    // Request specifications
    
    // Selects the request URI; URI templates are expanded against the configured base URL
    interface UriSpec<S extends RequestHeadersSpec<?>> {
        S uri(URI uri);
        S uri(String uri, Object... uriVariables);
        S uri(String uri, Map<String, ?> uriVariables);
        S uri(Function<UriBuilder, URI> uriFunction);
    }
    
    interface RequestHeadersUriSpec<S extends RequestHeadersSpec<S>> extends UriSpec<S>, RequestHeadersSpec<S> {
    }
    
    interface RequestBodyUriSpec extends RequestBodySpec, RequestHeadersUriSpec<RequestBodySpec> {
    }
    
    interface RequestBodySpec extends RequestHeadersSpec<RequestBodySpec> {
        RequestBodySpec contentLength(long contentLength);
        RequestBodySpec contentType(MediaType contentType);
        
        // Body from an already-available value (not a Publisher)
        RequestHeadersSpec<?> bodyValue(Object body);
        
        // Body from a Publisher; Mono and Flux arguments bind to P
        <T, P extends Publisher<T>> RequestHeadersSpec<?> body(P publisher, Class<T> elementClass);
        <T, P extends Publisher<T>> RequestHeadersSpec<?> body(P publisher, ParameterizedTypeReference<T> elementTypeRef);
        
        // Body from any other asynchronous producer type
        RequestHeadersSpec<?> body(Object producer, Class<?> elementClass);
        
        // Deprecated since 5.2: use bodyValue(Object) instead
        @Deprecated
        RequestHeadersSpec<?> syncBody(Object body);
    }
    
    interface RequestHeadersSpec<S extends RequestHeadersSpec<S>> {
        S accept(MediaType... acceptableMediaTypes);
        S header(String headerName, String... headerValues);
        S headers(Consumer<HttpHeaders> headersConsumer);
        S attribute(String name, Object value);
        S attributes(Consumer<Map<String, Object>> attributesConsumer);
        S cookie(String name, String value);
        S cookies(Consumer<MultiValueMap<String, String>> cookiesConsumer);
        S ifNoneMatch(String... ifNoneMatches);
        S ifModifiedSince(ZonedDateTime ifModifiedSince);
        
        // Preferred entry points for consuming the response
        ResponseSpec retrieve();
        <V> Mono<V> exchangeToMono(Function<ClientResponse, ? extends Mono<V>> responseHandler);
        <V> Flux<V> exchangeToFlux(Function<ClientResponse, ? extends Flux<V>> responseHandler);
        
        // Deprecated since 5.3: the caller must always consume or release the response body,
        // otherwise the connection leaks. Use exchangeToMono / exchangeToFlux instead.
        @Deprecated
        Mono<ClientResponse> exchange();
    }
}

// Response specification
// This is the type returned by RequestHeadersSpec.retrieve() in the WebClient listing.
public interface ResponseSpec {
    // Registers an exception-producing function for responses whose status matches the predicate
    ResponseSpec onStatus(Predicate<HttpStatus> statusPredicate, Function<ClientResponse, Mono<? extends Throwable>> exceptionFunction);
    // Body-only extraction
    <T> Mono<T> bodyToMono(Class<T> bodyType);
    <T> Mono<T> bodyToMono(ParameterizedTypeReference<T> bodyTypeReference);
    <T> Flux<T> bodyToFlux(Class<T> bodyType);
    <T> Flux<T> bodyToFlux(ParameterizedTypeReference<T> bodyTypeReference);
    // Extraction as ResponseEntity, with no body, a single body, or a list body
    Mono<ResponseEntity<Void>> toBodilessEntity();
    <T> Mono<ResponseEntity<T>> toEntity(Class<T> bodyType);
    <T> Mono<ResponseEntity<T>> toEntity(ParameterizedTypeReference<T> bodyTypeReference);
    <T> Mono<ResponseEntity<List<T>>> toEntityList(Class<T> bodyType);
    <T> Mono<ResponseEntity<List<T>>> toEntityList(ParameterizedTypeReference<T> bodyTypeReference);
}

Configuration

WebFlux Configuration

// Enables WebFlux configuration
// Type-level, runtime-retained marker annotation with no attributes; its effect comes
// from importing DelegatingWebFluxConfiguration.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
@Import(DelegatingWebFluxConfiguration.class)
public @interface EnableWebFlux {
}

// Interface for customizing WebFlux configuration.
// Every callback has a default implementation, so implementors override only what they need.
public interface WebFluxConfigurer {

    /** Customize the HTTP message readers and writers; does nothing by default. */
    default void configureHttpMessageCodecs(ServerCodecConfigurer configurer) {}

    /** Register converters and formatters; does nothing by default. */
    default void addFormatters(FormatterRegistry registry) {}

    /** Supply a custom validator; returns {@code null} by default. */
    default Validator getValidator() { return null; }

    /** Supply a custom message codes resolver; returns {@code null} by default. */
    default MessageCodesResolver getMessageCodesResolver() { return null; }

    /** Register static resource handlers; does nothing by default. */
    default void addResourceHandlers(ResourceHandlerRegistry registry) {}

    /** Configure CORS mappings; does nothing by default. */
    default void addCorsMappings(CorsRegistry registry) {}

    /** Configure path matching options; does nothing by default. */
    default void configurePathMatching(PathMatchConfigurer configurer) {}

    /** Register custom controller method argument resolvers; does nothing by default. */
    default void configureArgumentResolvers(ArgumentResolverConfigurer configurer) {}

    /** Configure view resolution; does nothing by default. */
    default void configureViewResolvers(ViewResolverRegistry registry) {}
}

Practical Usage Examples

Basic Reactive Controllers

/**
 * Annotated REST controller exposing CRUD, streaming and search endpoints for users
 * under {@code /api/users}, delegating all work to {@link ReactiveUserService}.
 *
 * <p>The service reports a missing user by emitting {@code UserNotFoundException}
 * rather than completing empty, so each lookup endpoint maps that error to 404.
 */
@RestController
@RequestMapping("/api/users")
public class ReactiveUserController {
    
    private final ReactiveUserService userService;
    
    public ReactiveUserController(ReactiveUserService userService) {
        this.userService = userService;
    }
    
    /** Returns one page of users; {@code page} defaults to 0 and {@code size} to 10. */
    @GetMapping
    public Flux<User> getAllUsers(
            @RequestParam(defaultValue = "0") int page,
            @RequestParam(defaultValue = "10") int size) {
        
        return userService.findAll(PageRequest.of(page, size));
    }
    
    /** Returns 200 with the user, or 404 when no user has the given id. */
    @GetMapping("/{id}")
    public Mono<ResponseEntity<User>> getUserById(@PathVariable Long id) {
        return userService.findById(id)
            .map(user -> ResponseEntity.ok(user))
            // The service signals "not found" as an error, so translate it here
            .onErrorResume(UserNotFoundException.class, e ->
                Mono.just(ResponseEntity.notFound().<User>build()))
            // Kept as a fallback in case the service ever completes empty instead
            .defaultIfEmpty(ResponseEntity.notFound().build());
    }
    
    /** Creates a user: 201 on success, 400 on validation failure, 409 if the user already exists. */
    @PostMapping
    public Mono<ResponseEntity<User>> createUser(@Valid @RequestBody Mono<CreateUserRequest> request) {
        return request
            .flatMap(userService::createUser)
            .map(user -> ResponseEntity.status(HttpStatus.CREATED).body(user))
            .onErrorResume(ValidationException.class, e -> 
                Mono.just(ResponseEntity.badRequest().build()))
            // createUser maps data-integrity violations to UserAlreadyExistsException
            .onErrorResume(UserAlreadyExistsException.class, e ->
                Mono.just(ResponseEntity.status(HttpStatus.CONFLICT).<User>build()));
    }
    
    /** Updates a user: 200 with the updated user, or 404 when no user has the given id. */
    @PutMapping("/{id}")
    public Mono<ResponseEntity<User>> updateUser(
            @PathVariable Long id, 
            @Valid @RequestBody Mono<UpdateUserRequest> request) {
        
        return request
            .flatMap(req -> userService.updateUser(id, req))
            .map(ResponseEntity::ok)
            .onErrorResume(UserNotFoundException.class, e ->
                Mono.just(ResponseEntity.notFound().<User>build()))
            .switchIfEmpty(Mono.just(ResponseEntity.notFound().build()));
    }
    
    /** Deletes a user: 204 on success, or 404 when no user has the given id. */
    @DeleteMapping("/{id}")
    public Mono<ResponseEntity<Void>> deleteUser(@PathVariable Long id) {
        // then(..) always emits its argument on success, so a switchIfEmpty after it could
        // never fire; "not found" arrives as an error from the service instead.
        return userService.deleteById(id)
            .then(Mono.just(ResponseEntity.noContent().<Void>build()))
            .onErrorResume(UserNotFoundException.class, e ->
                Mono.just(ResponseEntity.notFound().<Void>build()));
    }
    
    /** Streams users as server-sent events, one element per second. */
    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<User> streamUsers() {
        // NOTE(review): relies on a no-arg findAll() on the service -- confirm it exists
        return userService.findAll()
            .delayElements(Duration.ofSeconds(1));
    }
    
    /** Searches users by name, emitting only the matches found within five seconds. */
    @GetMapping("/search")
    public Flux<User> searchUsers(@RequestParam String query) {
        return userService.searchByName(query)
            .take(Duration.ofSeconds(5)); // Completes the stream after 5 seconds (no error is raised)
    }
}

Functional Routing Configuration

/**
 * Functional routing configuration: declares the user, file and admin route groups,
 * plus a combined router wrapped in logging and error-handling filters.
 *
 * <p>Routes are tried in declaration order and the first match wins, so literal
 * paths must be declared before path-variable patterns that would also match them.
 * Assumes static imports of the {@code RequestPredicates} factory methods
 * ({@code GET}, {@code POST}, {@code accept}, ...) and {@code MediaType.APPLICATION_JSON}.
 */
@Configuration
public class RouterConfig {
    
    @Bean
    public RouterFunction<ServerResponse> userRoutes(UserHandler userHandler) {
        return RouterFunctions
            .nest(RequestPredicates.path("/api/users"),
                RouterFunctions.route(GET(""), userHandler::getUsers)
                    // "/search" must come before "/{id}", which would otherwise capture
                    // "search" as an id and shadow this route
                    .andRoute(GET("/search"), userHandler::searchUsers)
                    .andRoute(GET("/{id}"), userHandler::getUser)
                    .andRoute(POST(""), userHandler::createUser)
                    .andRoute(PUT("/{id}"), userHandler::updateUser)
                    .andRoute(DELETE("/{id}"), userHandler::deleteUser)
            );
    }
    
    @Bean
    public RouterFunction<ServerResponse> fileRoutes(FileHandler fileHandler) {
        return RouterFunctions
            .nest(RequestPredicates.path("/api/files"),
                RouterFunctions.route(POST("/upload"), fileHandler::uploadFile)
                    .andRoute(GET("/{id}/download"), fileHandler::downloadFile)
                    .andRoute(DELETE("/{id}"), fileHandler::deleteFile)
            );
    }
    
    @Bean
    public RouterFunction<ServerResponse> adminRoutes(AdminHandler adminHandler) {
        return RouterFunctions
            .nest(RequestPredicates.path("/api/admin").and(accept(APPLICATION_JSON)),
                RouterFunctions.route(GET("/stats"), adminHandler::getSystemStats)
                    .andRoute(POST("/cache/clear"), adminHandler::clearCache)
                    .andRoute(GET("/health"), adminHandler::healthCheck)
            )
            .filter(authenticationFilter()); // Apply authentication filter
    }
    
    // Global router combining all routes
    // NOTE(review): the three route groups above are also registered as beans in their own
    // right; confirm requests cannot reach them without the logging/error filters applied here.
    @Bean
    public RouterFunction<ServerResponse> routes(
            RouterFunction<ServerResponse> userRoutes,
            RouterFunction<ServerResponse> fileRoutes,
            RouterFunction<ServerResponse> adminRoutes) {
        
        return userRoutes
            .and(fileRoutes)
            .and(adminRoutes)
            .filter(loggingFilter())
            .filter(errorHandlingFilter());
    }
    
    /** Logs method, path, status and elapsed milliseconds once a response is produced. */
    private HandlerFilterFunction<ServerResponse, ServerResponse> loggingFilter() {
        return (request, next) -> {
            long startTime = System.currentTimeMillis();
            return next.handle(request)
                .doOnNext(response -> {
                    long duration = System.currentTimeMillis() - startTime;
                    System.out.println(request.method() + " " + request.path() + 
                                     " - " + response.statusCode() + " (" + duration + "ms)");
                });
        };
    }
    
    /** Converts any exception emitted by the downstream handler into a plain 500 response. */
    private HandlerFilterFunction<ServerResponse, ServerResponse> errorHandlingFilter() {
        return (request, next) -> {
            return next.handle(request)
                .onErrorResume(Exception.class, e -> {
                    System.err.println("Error handling request: " + e.getMessage());
                    return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR)
                        .bodyValue("Internal Server Error");
                });
        };
    }
    
    /**
     * Rejects requests without a {@code Bearer} Authorization header (401) or with a
     * token that fails {@link #isValidToken(String)} (403); otherwise continues the chain.
     */
    private HandlerFilterFunction<ServerResponse, ServerResponse> authenticationFilter() {
        return (request, next) -> {
            String token = request.headers().firstHeader("Authorization");
            if (token == null || !token.startsWith("Bearer ")) {
                return ServerResponse.status(HttpStatus.UNAUTHORIZED)
                    .bodyValue("Authentication required");
            }
            
            // Validate token (simplified); substring(7) strips the "Bearer " prefix
            if (!isValidToken(token.substring(7))) {
                return ServerResponse.status(HttpStatus.FORBIDDEN)
                    .bodyValue("Invalid token");
            }
            
            return next.handle(request);
        };
    }
    
    /** Placeholder check: accepts any non-null, non-empty token. */
    private boolean isValidToken(String token) {
        // Token validation logic
        return token != null && !token.isEmpty();
    }
}

// Handler class for functional endpoints
/**
 * Handler functions for the user routes. Each method adapts a {@link ServerRequest}
 * to a {@link ReactiveUserService} call and renders the outcome as a {@link ServerResponse}.
 *
 * <p>The service reports a missing user by emitting {@code UserNotFoundException}
 * rather than completing empty, so the lookup handlers map that error to 404.
 */
@Component
public class UserHandler {
    
    private final ReactiveUserService userService;
    
    public UserHandler(ReactiveUserService userService) {
        this.userService = userService;
    }
    
    /** Lists one page of users; {@code page} defaults to 0 and {@code size} to 10. */
    public Mono<ServerResponse> getUsers(ServerRequest request) {
        int page = Integer.parseInt(request.queryParam("page").orElse("0"));
        int size = Integer.parseInt(request.queryParam("size").orElse("10"));
        
        Flux<User> users = userService.findAll(PageRequest.of(page, size));
        
        return ServerResponse.ok()
            .contentType(MediaType.APPLICATION_JSON)
            .body(users, User.class);
    }
    
    /** Returns 200 with the user, or 404 when no user has the given id. */
    public Mono<ServerResponse> getUser(ServerRequest request) {
        Long id = Long.valueOf(request.pathVariable("id"));
        
        return userService.findById(id)
            .flatMap(user -> ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .bodyValue(user))
            // Kept as a fallback in case the service ever completes empty
            .switchIfEmpty(ServerResponse.notFound().build())
            // The service signals "not found" as an error, so translate it here
            .onErrorResume(UserNotFoundException.class, e -> ServerResponse.notFound().build());
    }
    
    /** Creates a user: 201 on success, 400 on validation failure, 409 if the user already exists. */
    public Mono<ServerResponse> createUser(ServerRequest request) {
        return request.bodyToMono(CreateUserRequest.class)
            .flatMap(userService::createUser)
            .flatMap(user -> ServerResponse.status(HttpStatus.CREATED)
                .contentType(MediaType.APPLICATION_JSON)
                .bodyValue(user))
            .onErrorResume(ValidationException.class, e ->
                ServerResponse.badRequest().bodyValue(e.getMessage()))
            // createUser maps data-integrity violations to UserAlreadyExistsException
            .onErrorResume(UserAlreadyExistsException.class, e ->
                ServerResponse.status(HttpStatus.CONFLICT).bodyValue(e.getMessage()));
    }
    
    /** Updates a user: 200 with the updated user, or 404 when no user has the given id. */
    public Mono<ServerResponse> updateUser(ServerRequest request) {
        Long id = Long.valueOf(request.pathVariable("id"));
        
        return request.bodyToMono(UpdateUserRequest.class)
            .flatMap(req -> userService.updateUser(id, req))
            .flatMap(user -> ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .bodyValue(user))
            .switchIfEmpty(ServerResponse.notFound().build())
            .onErrorResume(UserNotFoundException.class, e -> ServerResponse.notFound().build());
    }
    
    /** Deletes a user: 204 on success, or 404 when no user has the given id. */
    public Mono<ServerResponse> deleteUser(ServerRequest request) {
        Long id = Long.valueOf(request.pathVariable("id"));
        
        // then(..) always emits its argument on success, so a switchIfEmpty after it could
        // never fire; "not found" arrives as an error from the service instead.
        return userService.deleteById(id)
            .then(ServerResponse.noContent().build())
            .onErrorResume(UserNotFoundException.class, e -> ServerResponse.notFound().build());
    }
    
    /** Searches users by the {@code q} query parameter; 400 when it is missing or empty. */
    public Mono<ServerResponse> searchUsers(ServerRequest request) {
        String query = request.queryParam("q").orElse("");
        
        if (query.isEmpty()) {
            return ServerResponse.badRequest().bodyValue("Query parameter 'q' is required");
        }
        
        Flux<User> users = userService.searchByName(query);
        
        return ServerResponse.ok()
            .contentType(MediaType.APPLICATION_JSON)
            .body(users, User.class);
    }
}

WebClient Usage

@Service
public class ExternalApiService {
    
    private final WebClient webClient;
    
    public ExternalApiService(WebClient.Builder webClientBuilder) {
        this.webClient = webClientBuilder
            .baseUrl("https://api.external-service.com")
            .defaultHeader(HttpHeaders.USER_AGENT, "MyApp/1.0")
            .filter(logRequest())
            .filter(handleErrors())
            .build();
    }
    
    public Mono<User> getUser(Long userId) {
        return webClient
            .get()
            .uri("/users/{id}", userId)
            .header(HttpHeaders.AUTHORIZATION, "Bearer " + getToken())
            .retrieve()
            .bodyToMono(User.class)
            .timeout(Duration.ofSeconds(10))
            .retry(3);
    }
    
    public Flux<User> getUsers(int page, int size) {
        return webClient
            .get()
            .uri(uriBuilder -> uriBuilder
                .path("/users")
                .queryParam("page", page)
                .queryParam("size", size)
                .build())
            .retrieve()
            .bodyToFlux(User.class);
    }
    
    public Mono<User> createUser(CreateUserRequest request) {
        return webClient
            .post()
            .uri("/users")
            .contentType(MediaType.APPLICATION_JSON)
            .body(Mono.just(request), CreateUserRequest.class)
            .retrieve()
            .bodyToMono(User.class);
    }
    
    public Mono<Void> deleteUser(Long userId) {
        return webClient
            .delete()
            .uri("/users/{id}", userId)
            .retrieve()
            .bodyToMono(Void.class);
    }
    
    // File upload example
    public Mono<UploadResponse> uploadFile(String filename, Flux<DataBuffer> fileData) {
        return webClient
            .post()
            .uri("/files/upload")
            .contentType(MediaType.APPLICATION_OCTET_STREAM)
            .header("X-Filename", filename)
            .body(fileData, DataBuffer.class)
            .retrieve()
            .bodyToMono(UploadResponse.class);
    }
    
    // Streaming response
    public Flux<String> getStreamingData() {
        return webClient
            .get()
            .uri("/stream")
            .accept(MediaType.TEXT_EVENT_STREAM)
            .retrieve()
            .bodyToFlux(String.class);
    }
    
    // Error handling with exchange()
    public Mono<User> getUserWithDetailedErrorHandling(Long userId) {
        return webClient
            .get()
            .uri("/users/{id}", userId)
            .exchange()
            .flatMap(response -> {
                if (response.statusCode().is2xxSuccessful()) {
                    return response.bodyToMono(User.class);
                } else if (response.statusCode() == HttpStatus.NOT_FOUND) {
                    return Mono.error(new UserNotFoundException("User not found: " + userId));
                } else {
                    return response.bodyToMono(String.class)
                        .flatMap(errorBody -> Mono.error(new ApiException("API Error: " + errorBody)));
                }
            });
    }
    
    private ExchangeFilterFunction logRequest() {
        return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
            System.out.println("Request: " + clientRequest.method() + " " + clientRequest.url());
            return Mono.just(clientRequest);
        });
    }
    
    private ExchangeFilterFunction handleErrors() {
        return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
            if (clientResponse.statusCode().is5xxServerError()) {
                return clientResponse.bodyToMono(String.class)
                    .flatMap(errorBody -> Mono.error(new ServerException("Server error: " + errorBody)));
            }
            return Mono.just(clientResponse);
        });
    }
    
    private String getToken() {
        // Get authentication token
        return "your-auth-token";
    }
}

Reactive Data Service

/**
 * Reactive service for looking up, creating, updating and deleting users.
 *
 * <p>Persistence goes through the injected {@code ReactiveUserRepository}; welcome
 * notifications are posted through the injected {@code WebClient}.
 */
@Service
public class ReactiveUserService {

    private final ReactiveUserRepository userRepository;
    private final WebClient notificationClient;

    public ReactiveUserService(ReactiveUserRepository userRepository,
                              WebClient notificationClient) {
        this.userRepository = userRepository;
        this.notificationClient = notificationClient;
    }

    /**
     * Returns one page of users.
     *
     * @param pageable paging information passed to the repository
     * @return the users of the page; a {@code DataAccessException} is printed to stderr and
     *         replaced by an empty Flux
     */
    public Flux<User> findAll(Pageable pageable) {
        return userRepository.findAllBy(pageable)
            .onErrorResume(DataAccessException.class, e -> {
                System.err.println("Database error: " + e.getMessage());
                return Flux.empty();
            });
    }

    /**
     * Looks up a user by id.
     *
     * @param id the user id
     * @return the user, or a Mono failing with {@code UserNotFoundException} when the
     *         repository returns nothing
     */
    public Mono<User> findById(Long id) {
        return userRepository.findById(id)
            .switchIfEmpty(userNotFound(id));
    }

    /**
     * Converts the request to a user, saves it, then sends a welcome notification.
     *
     * @param request the data for the new user
     * @return the saved user; a {@code DataIntegrityViolationException} raised in the chain
     *         is mapped to {@code UserAlreadyExistsException}
     */
    @Transactional
    public Mono<User> createUser(CreateUserRequest request) {
        return Mono.fromCallable(() -> convertToUser(request))
            .flatMap(userRepository::save)
            .flatMap(this::sendWelcomeNotification)
            .onErrorMap(DataIntegrityViolationException.class, e ->
                new UserAlreadyExistsException("User already exists"));
    }

    /**
     * Applies the non-null fields of the request to an existing user and saves it.
     *
     * @param id the id of the user to update
     * @param request the new values; null fields leave the current value untouched
     * @return the saved user, or a Mono failing with {@code UserNotFoundException}
     */
    @Transactional
    public Mono<User> updateUser(Long id, UpdateUserRequest request) {
        return userRepository.findById(id)
            .switchIfEmpty(userNotFound(id))
            .map(existingUser -> updateUserFromRequest(existingUser, request))
            .flatMap(userRepository::save);
    }

    /**
     * Deletes a user after checking that it exists.
     *
     * @param id the id of the user to delete
     * @return a Mono completing after the delete, or failing with
     *         {@code UserNotFoundException} when no such user exists
     */
    @Transactional
    public Mono<Void> deleteById(Long id) {
        return userRepository.existsById(id)
            .flatMap(exists -> {
                if (!exists) {
                    return userNotFound(id);
                }
                return userRepository.deleteById(id);
            });
    }

    /**
     * Searches users whose name contains the given text, ignoring case.
     *
     * @param name the text to search for
     * @return the matching users; a {@code TimeoutException} raised by the 5-second timeout
     *         is printed to stderr and replaced by an empty Flux
     */
    public Flux<User> searchByName(String name) {
        return userRepository.findByNameContainingIgnoreCase(name)
            .timeout(Duration.ofSeconds(5))
            .onErrorResume(TimeoutException.class, e -> {
                System.err.println("Search timeout for query: " + name);
                return Flux.empty();
            });
    }

    /**
     * Reactive composition example: combines a user with its stats.
     *
     * @param userId the user id
     * @return the user paired with its stats once both sources have emitted
     */
    public Mono<UserWithStats> getUserWithStats(Long userId) {
        return Mono.zip(findById(userId), getUserStats(userId), UserWithStats::new);
    }

    /**
     * Parallel processing example: looks up each id via {@link #findById(Long)}.
     *
     * @param userIds the ids to look up
     * @return the users found, merged back into a single Flux
     */
    public Flux<User> processUsersInParallel(List<Long> userIds) {
        return Flux.fromIterable(userIds)
            .parallel(4) // Split the ids across 4 rails
            .runOn(Schedulers.boundedElastic()) // Run the rails on the bounded-elastic scheduler
            .flatMap(this::findById)
            .sequential();
    }

    /**
     * Reactive caching example: reads from the "users" cache and falls back to the
     * repository on a miss, storing the loaded user in the cache.
     *
     * <p>NOTE(review): {@code cacheManager} is not declared or injected anywhere in this
     * class as shown, so this method cannot compile as-is — confirm its type and wiring.
     *
     * @param id the user id
     * @return the cached or freshly loaded user; empty when neither source has it
     */
    public Mono<User> findByIdWithCache(Long id) {
        return cacheManager.getFromCache("users", id, User.class)
            // Deferred so the repository pipeline is only built on a cache miss.
            .switchIfEmpty(Mono.defer(() ->
                userRepository.findById(id)
                    .flatMap(user -> cacheManager.putInCache("users", id, user)
                        .thenReturn(user))));
    }

    /** Creates a Mono that fails with UserNotFoundException, building the exception only on subscription. */
    private static <T> Mono<T> userNotFound(Long id) {
        return Mono.error(() -> new UserNotFoundException("User not found: " + id));
    }

    /**
     * Posts a "WELCOME" notification for the user.
     *
     * <p>Any failure is printed to stderr and swallowed so that it does not fail the
     * user creation.
     */
    private Mono<User> sendWelcomeNotification(User user) {
        NotificationRequest notification = NotificationRequest.builder()
            .userId(user.getId())
            .type("WELCOME")
            .message("Welcome to our platform!")
            .build();

        return notificationClient
            .post()
            .uri("/notifications")
            .bodyValue(notification)
            .retrieve()
            .bodyToMono(Void.class)
            .thenReturn(user)
            .onErrorResume(e -> {
                System.err.println("Failed to send notification: " + e.getMessage());
                return Mono.just(user); // Don't fail the user creation
            });
    }

    /** Simulates a stats calculation, subscribed on the bounded-elastic scheduler. */
    private Mono<UserStats> getUserStats(Long userId) {
        return Mono.fromCallable(() -> new UserStats(userId, 100, 50))
            .subscribeOn(Schedulers.boundedElastic());
    }

    /** Creates a new user entity carrying the username and email of the request. */
    private User convertToUser(CreateUserRequest request) {
        User user = new User();
        user.setUsername(request.getUsername());
        user.setEmail(request.getEmail());
        return user;
    }

    /** Copies the non-null username and email of the request onto the user and returns it. */
    private User updateUserFromRequest(User user, UpdateUserRequest request) {
        if (request.getUsername() != null) {
            user.setUsername(request.getUsername());
        }
        if (request.getEmail() != null) {
            user.setEmail(request.getEmail());
        }
        return user;
    }
}

WebFlux Configuration

@Configuration
@EnableWebFlux
public class WebFluxConfig implements WebFluxConfigurer {
    
    @Override
    public void configureHttpMessageCodecs(ServerCodecConfigurer configurer) {
        // JSON configuration
        configurer.defaultCodecs().jackson2JsonEncoder(
            new Jackson2JsonEncoder(objectMapper(), MediaType.APPLICATION_JSON));
        configurer.defaultCodecs().jackson2JsonDecoder(
            new Jackson2JsonDecoder(objectMapper(), MediaType.APPLICATION_JSON));
        
        // Increase buffer size for large payloads
        configurer.defaultCodecs().maxInMemorySize(1024 * 1024); // 1MB
    }
    
    @Override
    public void addCorsMappings(CorsRegistry registry) {
        registry.addMapping("/api/**")
            .allowedOrigins("http://localhost:3000", "https://myapp.com")
            .allowedMethods("GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS")
            .allowedHeaders("*")
            .allowCredentials(true)
            .maxAge(3600);
    }
    
    @Override
    public void addResourceHandlers(ResourceHandlerRegistry registry) {
        registry.addResourceHandler("/static/**")
            .addResourceLocations("classpath:/static/")
            .setCacheControl(CacheControl.maxAge(Duration.ofDays(365)));
    }
    
    @Bean
    public ObjectMapper objectMapper() {
        ObjectMapper mapper = new ObjectMapper();
        mapper.registerModule(new JavaTimeModule());
        mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        mapper.setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE);
        return mapper;
    }
    
    @Bean
    public WebClient webClient(WebClient.Builder builder) {
        return builder
            .codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(1024 * 1024))
            .build();
    }
    
    // Custom error handler
    @Bean
    @Order(-2) // Higher precedence than DefaultErrorWebExceptionHandler
    public WebExceptionHandler globalExceptionHandler() {
        return new GlobalErrorWebExceptionHandler();
    }
}

/**
 * Global error handler that renders exceptions as a JSON {@code ErrorResponse} body.
 *
 * <p>{@code UserNotFoundException} maps to 404, {@code ValidationException} to 400 and
 * everything else to 500 with a generic message.
 */
public class GlobalErrorWebExceptionHandler implements WebExceptionHandler {

    private final ObjectMapper objectMapper;

    /** Creates a handler with its own ObjectMapper that has the Java time module registered. */
    public GlobalErrorWebExceptionHandler() {
        this(new ObjectMapper().registerModule(new JavaTimeModule()));
    }

    /**
     * Creates a handler that serializes error bodies with the given mapper, so error
     * responses can share the JSON settings used by the rest of the application.
     *
     * @param objectMapper the mapper used to write the error body
     */
    public GlobalErrorWebExceptionHandler(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
    }

    /**
     * Writes a JSON error body for the given exception.
     *
     * @param exchange the current exchange
     * @param ex the exception to render
     * @return a Mono completing once the body is written; fails with {@code ex} when the
     *         response is already committed, or with the serialization error if the body
     *         cannot be produced
     */
    @Override
    public Mono<Void> handle(ServerWebExchange exchange, Throwable ex) {
        ServerHttpResponse response = exchange.getResponse();

        if (response.isCommitted()) {
            return Mono.error(ex);
        }

        ErrorResponse errorResponse;
        HttpStatus status;

        if (ex instanceof UserNotFoundException) {
            status = HttpStatus.NOT_FOUND;
            errorResponse = new ErrorResponse("User not found", "USER_NOT_FOUND");
        } else if (ex instanceof ValidationException) {
            status = HttpStatus.BAD_REQUEST;
            errorResponse = new ErrorResponse(ex.getMessage(), "VALIDATION_ERROR");
        } else {
            // The client only receives a generic message, so record the real cause here.
            System.err.println("Unhandled error: " + ex);
            status = HttpStatus.INTERNAL_SERVER_ERROR;
            errorResponse = new ErrorResponse("Internal server error", "INTERNAL_ERROR");
        }

        try {
            byte[] bytes = objectMapper.writeValueAsBytes(errorResponse);

            // Touch the response only after serialization succeeded.
            response.setStatusCode(status);
            // Replace rather than add, so an already-present Content-Type is not duplicated.
            response.getHeaders().setContentType(MediaType.APPLICATION_JSON);

            DataBuffer buffer = response.bufferFactory().wrap(bytes);
            return response.writeWith(Mono.just(buffer));
        } catch (Exception e) {
            return Mono.error(e);
        }
    }
}

Server-Sent Events (SSE)

@RestController
@RequestMapping("/api/events")
public class EventController {
    
    private final EventService eventService;
    
    public EventController(EventService eventService) {
        this.eventService = eventService;
    }
    
    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<String>> streamEvents() {
        return eventService.getEventStream()
            .map(event -> ServerSentEvent.<String>builder()
                .id(String.valueOf(event.getId()))
                .event(event.getType())
                .data(event.getData())
                .build())
            .onErrorResume(e -> {
                return Flux.just(ServerSentEvent.<String>builder()
                    .event("error")
                    .data("Error occurred: " + e.getMessage())
                    .build());
            });
    }
    
    @GetMapping(value = "/notifications/{userId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<NotificationEvent>> streamUserNotifications(@PathVariable Long userId) {
        return eventService.getUserNotifications(userId)
            .map(notification -> ServerSentEvent.<NotificationEvent>builder()
                .id(notification.getId())
                .event("notification")
                .data(notification)
                .build())
            .doOnCancel(() -> System.out.println("Client disconnected from notifications stream"));
    }
}

Spring WebFlux provides a complete reactive programming model for building scalable, non-blocking web applications that can efficiently handle high concurrency with minimal resource usage.

Install with Tessl CLI

npx tessl i tessl/maven-org-springframework--spring

docs

aop.md

core-container.md

data-access.md

index.md

integration.md

messaging.md

reactive-web.md

testing.md

web-framework.md

tile.json