or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

dsl-api.md · inbound-endpoints.md · index.md · outbound-endpoints.md · xml-configuration.md
tile.json

docs/dsl-api.md

Java DSL API

The Java DSL API provides a fluent, type-safe way to configure Spring Integration WebFlux endpoints. It uses the builder pattern with spec classes to enable readable, chainable configuration.

Overview

The DSL centers around the WebFlux factory class, which provides static methods to create endpoint specs. These specs offer fluent methods for comprehensive configuration. The DSL integrates seamlessly with Spring Integration's IntegrationFlow DSL.

Capabilities

WebFlux Factory Class

The main entry point for creating WebFlux integration components.

package org.springframework.integration.webflux.dsl;

/**
 * Factory class providing static methods for creating WebFlux integration endpoints.
 * This is the primary entry point for the Java DSL API.
 *
 * <p>Signature listing only; method bodies are omitted. Each outbound factory is
 * overloaded on how the target URI is supplied (a fixed {@code URI} or {@code String},
 * a {@code Function} of the request {@code Message}, or an {@code Expression}), and
 * every form has a variant that additionally accepts a {@code WebClient}.
 */
public final class WebFlux {
    // Outbound gateway methods
    public static WebFluxMessageHandlerSpec outboundGateway(URI uri);
    public static WebFluxMessageHandlerSpec outboundGateway(String uri);
    public static WebFluxMessageHandlerSpec outboundGateway(URI uri, WebClient webClient);
    public static WebFluxMessageHandlerSpec outboundGateway(String uri, WebClient webClient);
    public static <P> WebFluxMessageHandlerSpec outboundGateway(
        Function<Message<P>, ?> uriFunction);
    public static WebFluxMessageHandlerSpec outboundGateway(Expression uriExpression);
    public static <P> WebFluxMessageHandlerSpec outboundGateway(
        Function<Message<P>, ?> uriFunction, WebClient webClient);
    public static WebFluxMessageHandlerSpec outboundGateway(
        Expression uriExpression, WebClient webClient);

    // Outbound channel adapter methods (same overload set as the gateway methods above)
    public static WebFluxMessageHandlerSpec outboundChannelAdapter(URI uri);
    public static WebFluxMessageHandlerSpec outboundChannelAdapter(String uri);
    public static WebFluxMessageHandlerSpec outboundChannelAdapter(
        URI uri, WebClient webClient);
    public static WebFluxMessageHandlerSpec outboundChannelAdapter(
        String uri, WebClient webClient);
    public static <P> WebFluxMessageHandlerSpec outboundChannelAdapter(
        Function<Message<P>, ?> uriFunction);
    public static WebFluxMessageHandlerSpec outboundChannelAdapter(Expression uriExpression);
    public static <P> WebFluxMessageHandlerSpec outboundChannelAdapter(
        Function<Message<P>, ?> uriFunction, WebClient webClient);
    public static WebFluxMessageHandlerSpec outboundChannelAdapter(
        Expression uriExpression, WebClient webClient);

    // Inbound gateway methods (accepts one or more path strings)
    public static WebFluxInboundEndpointSpec inboundGateway(String... path);

    // Inbound channel adapter methods (accepts one or more path strings)
    public static WebFluxInboundEndpointSpec inboundChannelAdapter(String... path);
}

Import:

import org.springframework.integration.webflux.dsl.WebFlux;

Basic Usage:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.webflux.dsl.WebFlux;

/**
 * Smallest possible outbound setup: one flow, one gateway, one fixed URL.
 */
@Configuration
public class BasicWebFluxConfig {

    /**
     * Reads messages from {@code inputChannel} and passes each one to a WebFlux
     * outbound gateway that targets a fixed URL.
     */
    @Bean
    public IntegrationFlow basicFlow() {
        // Build the gateway spec first so the flow definition reads as a plain pipeline.
        var dataGateway = WebFlux.outboundGateway("https://api.example.com/data");

        return IntegrationFlow.from("inputChannel")
            .handle(dataGateway)
            .get();
    }
}

WebFluxMessageHandlerSpec

Spec class for configuring outbound endpoints (gateways and adapters).

package org.springframework.integration.webflux.dsl;

/**
 * Spec builder for WebFlux outbound message handlers.
 * Provides fluent configuration methods for HTTP requests.
 */
public class WebFluxMessageHandlerSpec
       extends BaseHttpMessageHandlerSpec<WebFluxMessageHandlerSpec,
                                          WebFluxRequestExecutingMessageHandler> {

    // HTTP method configuration
    public WebFluxMessageHandlerSpec httpMethod(HttpMethod httpMethod);
    public WebFluxMessageHandlerSpec httpMethodExpression(Expression expression);
    public <P> WebFluxMessageHandlerSpec httpMethodFunction(
        Function<Message<P>, HttpMethod> httpMethodFunction);

    // Response type configuration
    public WebFluxMessageHandlerSpec expectedResponseType(Class<?> type);
    public WebFluxMessageHandlerSpec expectedResponseType(
        ParameterizedTypeReference<?> type);
    public WebFluxMessageHandlerSpec expectedResponseTypeExpression(Expression expression);
    public <P> WebFluxMessageHandlerSpec expectedResponseTypeFunction(
        Function<Message<P>, ?> typeFunction);

    // URI variables
    public WebFluxMessageHandlerSpec uriVariable(String variable, Object value);
    public WebFluxMessageHandlerSpec uriVariable(String variable, String expression);
    public WebFluxMessageHandlerSpec uriVariable(String variable, Expression expression);
    public WebFluxMessageHandlerSpec uriVariablesExpression(String expression);
    public WebFluxMessageHandlerSpec uriVariablesExpression(Expression expression);

    // Header configuration
    public WebFluxMessageHandlerSpec headerMapper(HeaderMapper<HttpHeaders> headerMapper);
    public WebFluxMessageHandlerSpec mappedRequestHeaders(String... patterns);
    public WebFluxMessageHandlerSpec mappedResponseHeaders(String... patterns);

    // Payload extraction
    public WebFluxMessageHandlerSpec extractPayload(boolean extractPayload);
    public WebFluxMessageHandlerSpec extractRequestPayload(boolean extractRequestPayload);

    // WebFlux-specific configuration
    public WebFluxMessageHandlerSpec replyPayloadToFlux(boolean replyPayloadToFlux);
    public WebFluxMessageHandlerSpec bodyExtractor(
        BodyExtractor<?, ? super ClientHttpResponse> bodyExtractor);
    public WebFluxMessageHandlerSpec publisherElementType(Class<?> type);
    public WebFluxMessageHandlerSpec publisherElementType(
        ParameterizedTypeReference<?> type);
    public <P> WebFluxMessageHandlerSpec publisherElementTypeFunction(
        Function<Message<P>, ?> typeFunction);
    public WebFluxMessageHandlerSpec publisherElementTypeExpression(
        Expression expression);

    // Encoding and transfer
    public WebFluxMessageHandlerSpec encodingMode(
        DefaultUriBuilderFactory.EncodingMode encodingMode);
    public WebFluxMessageHandlerSpec transferCookies(boolean transferCookies);

    // Request attributes
    public WebFluxMessageHandlerSpec attributeVariablesExpression(String expression);
    public WebFluxMessageHandlerSpec attributeVariablesExpression(Expression expression);

    // Additional inherited methods from BaseHttpMessageHandlerSpec
    public WebFluxMessageHandlerSpec charset(String charset);
    public WebFluxMessageHandlerSpec extractRequestPayload(boolean extract);
    public WebFluxMessageHandlerSpec trustedSpel(boolean trustedSpel);
}

Usage Example - Comprehensive Configuration:

import java.util.List;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpMethod;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.webflux.dsl.WebFlux;

/**
 * Outbound gateway example that combines header enrichment, a templated URL,
 * a generic response type and header mapping in one flow.
 */
@Configuration
public class ComprehensiveOutboundConfig {

    /**
     * Reads from {@code requestChannel}, adds two headers, calls the orders
     * endpoint for the user named in the {@code userId} header, and sends the
     * reply to {@code responseChannel}.
     */
    @Bean
    public IntegrationFlow comprehensiveOutboundFlow() {
        return IntegrationFlow
            .from("requestChannel")
            .enrichHeaders(h -> h
                .header("Authorization", "Bearer token123")
                .headerExpression("X-Request-ID",
                    "T(java.util.UUID).randomUUID().toString()"))
            .handle(WebFlux.outboundGateway("https://api.example.com/users/{id}/orders")
                .httpMethod(HttpMethod.GET)
                // The second argument is a SpEL expression evaluated per message.
                .uriVariable("id", "headers.userId")
                // Anonymous subclass keeps the List<Order> type argument available.
                .expectedResponseType(new ParameterizedTypeReference<List<Order>>() {})
                .mappedRequestHeaders("X-*", "Authorization")
                .mappedResponseHeaders("X-Response-*", "ETag")
                .extractPayload(true)
                .charset("UTF-8")
                .transferCookies(true))
            .channel("responseChannel")
            .get();
    }
}

Dynamic Configuration Example:

/**
 * Outbound gateway whose request settings are all resolved per message from
 * headers: {@code endpoint} (URL suffix), {@code httpMethod}, {@code responseType},
 * {@code uriVars} and {@code elementType}.
 */
@Bean
public IntegrationFlow dynamicOutboundFlow() {
    SpelExpressionParser parser = new SpelExpressionParser();
    
    return IntegrationFlow
        .from("requestChannel")
        .handle(WebFlux.outboundGateway(
            // URI is built by a function of the request message
            message -> "https://api.example.com/" + 
                       message.getHeaders().get("endpoint"))
            // The typed get(...) overload requires the header to already be of that type
            .httpMethodFunction(message -> 
                message.getHeaders().get("httpMethod", HttpMethod.class))
            .expectedResponseTypeFunction(message ->
                message.getHeaders().get("responseType", Class.class))
            // Pre-parsed expression reading the uriVars header
            .uriVariablesExpression(parser.parseExpression("headers.uriVars"))
            .publisherElementTypeFunction(message ->
                message.getHeaders().get("elementType", Class.class)))
        .get();
}

WebFluxInboundEndpointSpec

Spec class for configuring inbound endpoints (gateways and adapters).

package org.springframework.integration.webflux.dsl;

/**
 * Spec builder for WebFlux inbound endpoints.
 * Provides fluent configuration methods for HTTP server endpoints.
 *
 * <p>Signature listing only; method bodies are omitted. Every method returns the
 * spec itself so calls can be chained.
 */
public class WebFluxInboundEndpointSpec
       extends HttpInboundEndpointSupportSpec<WebFluxInboundEndpointSpec,
                                               WebFluxInboundEndpoint> {

    // Request mapping configuration (the consumer receives a RequestMappingSpec)
    public WebFluxInboundEndpointSpec requestMapping(
        Consumer<RequestMappingSpec> requestMappingConfigurer);

    // CORS configuration (the consumer receives a CrossOriginSpec)
    public WebFluxInboundEndpointSpec crossOrigin(
        Consumer<CrossOriginSpec> corsConfigurer);

    // Payload configuration
    public WebFluxInboundEndpointSpec requestPayloadType(Class<?> type);
    public WebFluxInboundEndpointSpec requestPayloadType(ResolvableType type);
    public WebFluxInboundEndpointSpec payloadExpression(String expression);

    // Header mapping
    public WebFluxInboundEndpointSpec headerMapper(
        HeaderMapper<HttpHeaders> headerMapper);
    public WebFluxInboundEndpointSpec mappedRequestHeaders(String... patterns);
    public WebFluxInboundEndpointSpec mappedResponseHeaders(String... patterns);

    // Validation
    public WebFluxInboundEndpointSpec validator(Validator validator);

    // Status code configuration
    public WebFluxInboundEndpointSpec statusCodeExpression(String expression);

    // Reply configuration
    public WebFluxInboundEndpointSpec extractReplyPayload(boolean extract);

    // WebFlux-specific configuration
    public WebFluxInboundEndpointSpec codecConfigurer(
        ServerCodecConfigurer codecConfigurer);
    public WebFluxInboundEndpointSpec requestedContentTypeResolver(
        RequestedContentTypeResolver resolver);
    public WebFluxInboundEndpointSpec reactiveAdapterRegistry(
        ReactiveAdapterRegistry registry);

    // Additional inherited methods
    public WebFluxInboundEndpointSpec autoStartup(boolean autoStartup);
    public WebFluxInboundEndpointSpec phase(int phase);
    public WebFluxInboundEndpointSpec role(String role);
}

Usage Example - Comprehensive Configuration:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpMethod;
import org.springframework.http.codec.ServerCodecConfigurer;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.webflux.dsl.WebFlux;
import org.springframework.validation.Validator;
import org.springframework.web.bind.annotation.RequestMethod;

/**
 * Inbound gateway example that combines request mapping, CORS, payload typing,
 * header mapping, codecs, validation and a computed status code in one flow.
 */
@Configuration
public class ComprehensiveInboundConfig {

    /**
     * Exposes {@code /api/orders} and passes each request to
     * {@code orderService.processOrder}.
     *
     * @param codecConfigurer codecs handed to the inbound endpoint
     * @param validator validator handed to the inbound endpoint
     */
    @Bean
    public IntegrationFlow comprehensiveInboundFlow(
            ServerCodecConfigurer codecConfigurer,
            Validator validator) {
        return IntegrationFlow
            .from(WebFlux.inboundGateway("/api/orders")
                // Request mapping uses HttpMethod ...
                .requestMapping(m -> m
                    .methods(HttpMethod.POST, HttpMethod.PUT)
                    .consumes("application/json")
                    .produces("application/json")
                    .headers("X-API-Key"))
                // ... while the CORS spec uses RequestMethod for the same verbs.
                .crossOrigin(cors -> cors
                    .origin("https://example.com")
                    .allowedHeaders("*")
                    .method(RequestMethod.POST, RequestMethod.PUT)
                    .allowCredentials(true))
                .requestPayloadType(Order.class)
                .mappedRequestHeaders("X-*", "Authorization")
                .mappedResponseHeaders("X-Response-*")
                .codecConfigurer(codecConfigurer)
                .validator(validator)
                // NOTE(review): "payload.isValid" assumes the payload exposes a property
                // literally named isValid; confirm against the Order type.
                .statusCodeExpression(
                    "payload.isValid ? T(org.springframework.http.HttpStatus).CREATED " +
                    ": T(org.springframework.http.HttpStatus).BAD_REQUEST")
                .extractReplyPayload(true))
            .handle("orderService", "processOrder")
            .get();
    }
}

RequestMappingSpec

Inner spec for configuring request mapping details.

/**
 * Spec for configuring HTTP request mapping.
 *
 * <p>Every method takes a varargs list and returns the spec, so the individual
 * conditions can be chained in a single expression.
 */
public interface RequestMappingSpec {
    /**
     * Set HTTP methods for this endpoint.
     */
    RequestMappingSpec methods(HttpMethod... methods);

    /**
     * Set required request headers.
     * Format: "headerName" or "headerName=value"
     */
    RequestMappingSpec headers(String... headers);

    /**
     * Set consumable media types (Content-Type matching).
     */
    RequestMappingSpec consumes(String... mediaTypes);

    /**
     * Set producible media types (Accept matching).
     */
    RequestMappingSpec produces(String... mediaTypes);

    /**
     * Set required request parameters.
     * Format: "paramName" or "paramName=value"
     */
    RequestMappingSpec params(String... params);
}

Usage Example:

/**
 * Inbound gateway on {@code /api/search} that sets all five request-mapping
 * conditions (methods, headers, consumes, produces, params) and passes the
 * request to {@code searchService.search}.
 */
@Bean
public IntegrationFlow requestMappingFlow() {
    return IntegrationFlow
        .from(WebFlux.inboundGateway("/api/search")
            .requestMapping(m -> m
                .methods(HttpMethod.GET, HttpMethod.POST)
                // Shows both header forms: "name=value" and bare "name"
                .headers("X-API-Version=2", "X-Client-ID")
                .consumes("application/json", "application/xml")
                .produces("application/json")
                .params("active=true")))
        .handle("searchService", "search")
        .get();
}

CrossOriginSpec

Inner spec for configuring CORS.

/**
 * Spec for configuring CORS (Cross-Origin Resource Sharing).
 *
 * <p>Every method returns the spec, so the settings can be chained in a single
 * expression. Note that allowed methods are given as {@code RequestMethod} values.
 */
public interface CrossOriginSpec {
    /**
     * Set allowed origins. Use "*" for all origins.
     */
    CrossOriginSpec origin(String... origins);

    /**
     * Set allowed request headers.
     */
    CrossOriginSpec allowedHeaders(String... headers);

    /**
     * Set exposed response headers.
     */
    CrossOriginSpec exposedHeaders(String... headers);

    /**
     * Set allowed HTTP methods.
     */
    CrossOriginSpec method(RequestMethod... methods);

    /**
     * Set whether to allow credentials (cookies, authorization headers).
     */
    CrossOriginSpec allowCredentials(boolean allowCredentials);

    /**
     * Set max age in seconds for preflight cache.
     */
    CrossOriginSpec maxAge(long maxAge);
}

Usage Example:

import org.springframework.web.bind.annotation.RequestMethod;

/**
 * Inbound gateway on {@code /api/public} with an explicit CORS policy; requests
 * are passed to {@code publicService.process}.
 */
@Bean
public IntegrationFlow corsFlow() {
    return IntegrationFlow
        .from(WebFlux.inboundGateway("/api/public")
            .crossOrigin(cors -> cors
                .origin("https://example.com", "https://app.example.com")
                // NOTE(review): "X-*" is passed through as written; confirm whether
                // partial wildcards are honoured for allowed header names.
                .allowedHeaders("Content-Type", "Authorization", "X-*")
                .exposedHeaders("X-Total-Count", "X-Page-Number")
                .method(RequestMethod.GET, RequestMethod.POST, RequestMethod.PUT)
                .allowCredentials(true)
                // Preflight cache lifetime in seconds
                .maxAge(3600)))
        .handle("publicService", "process")
        .get();
}

Integration with IntegrationFlow DSL

The WebFlux DSL integrates seamlessly with Spring Integration's IntegrationFlow DSL.

Multiple Endpoints in One Flow:

/**
 * Single flow that starts at an inbound gateway, transforms the request, calls an
 * external service through an outbound gateway, and post-processes the result.
 */
@Bean
public IntegrationFlow multiEndpointFlow() {
    return IntegrationFlow
        // Receive HTTP request
        .from(WebFlux.inboundGateway("/api/process")
            .requestMapping(m -> m.methods(HttpMethod.POST))
            .requestPayloadType(Request.class))
        // Transform
        .transform(Request::normalize)
        // Call external API
        .handle(WebFlux.outboundGateway("https://external.api.com/validate")
            .httpMethod(HttpMethod.POST)
            .expectedResponseType(ValidationResult.class))
        // Process result
        .handle("resultProcessor", "process")
        // Return response (automatically handled by inbound gateway)
        .get();
}

Conditional Routing:

/**
 * Routes each request received on {@code /api/route} to one of two outbound
 * gateways, chosen by the value of the {@code type} message header.
 */
@Bean
public IntegrationFlow conditionalRoutingFlow() {
    return IntegrationFlow
        .from(WebFlux.inboundGateway("/api/route"))
        // Message.class makes the router lambda receive the whole message, so it can read headers
        .route(Message.class, m -> m.getHeaders().get("type"),
            mapping -> mapping
                .subFlowMapping("typeA", sf -> sf
                    .handle(WebFlux.outboundGateway("https://api.a.com/process")))
                .subFlowMapping("typeB", sf -> sf
                    .handle(WebFlux.outboundGateway("https://api.b.com/process"))))
        .get();
}

Parallel Processing:

/**
 * Fans each request received on {@code /api/parallel} out to three backends and
 * replies with the gathered results.
 */
@Bean
public IntegrationFlow parallelFlow() {
    return IntegrationFlow
        .from(WebFlux.inboundGateway("/api/parallel"))
        // All recipients are declared on the single scatterer argument. The other
        // scatterGather parameters configure the gatherer and the scatter-gather
        // handler itself; they do not add recipients.
        .scatterGather(scatterer -> scatterer
            .applySequence(true)
            .recipientFlow(
                f -> f.handle(WebFlux.outboundGateway("https://api1.com/data")))
            .recipientFlow(
                f -> f.handle(WebFlux.outboundGateway("https://api2.com/data")))
            .recipientFlow(
                f -> f.handle(WebFlux.outboundGateway("https://api3.com/data"))))
        // No trailing aggregate(): scatterGather already gathers the replies.
        .get();
}

SpEL Expression Support

The DSL extensively supports Spring Expression Language (SpEL) for dynamic configuration.

Common SpEL Variables:

  • payload - Message payload
  • headers - Message headers (Map)
  • #root - The Message itself
  • #requestParams - HTTP request parameters (inbound)
  • #pathVariables - HTTP path variables (inbound)
  • T(Class) - Access static methods and constants

SpEL Examples:

import org.springframework.expression.spel.standard.SpelExpressionParser;

/**
 * Shows SpEL used in three places: header enrichment, URI variables, and
 * per-message selection of HTTP method and response type.
 */
@Bean
public IntegrationFlow spelFlow() {
    SpelExpressionParser parser = new SpelExpressionParser();
    
    return IntegrationFlow
        .from("requestChannel")
        .enrichHeaders(h -> h
            // T(...) invokes a static method
            .headerExpression("X-Request-Time", 
                "T(java.time.Instant).now().toString()")
            // Ternary supplies a default when the userId header is absent
            .headerExpression("X-User-ID", 
                "headers.containsKey('userId') ? headers.userId : 'anonymous'"))
        .handle(WebFlux.outboundGateway("https://api.example.com/{resource}/{id}")
            // URI variables from payload and headers
            .uriVariable("resource", "payload.resourceType")
            .uriVariable("id", "headers.entityId")
            // HTTP method from headers with fallback
            .httpMethodExpression(parser.parseExpression(
                "headers.httpMethod ?: T(org.springframework.http.HttpMethod).GET"))
            // Expected response type from headers
            .expectedResponseTypeExpression(
                parser.parseExpression("headers.responseType")))
        .get();
}

Function-Based Dynamic Configuration

Use Java functions for type-safe dynamic configuration.

import java.util.function.Function;
import org.springframework.messaging.Message;

/**
 * Outbound gateway configured with Java functions instead of SpEL: one builds the
 * URI from the payload and a header, the other picks the publisher element type.
 */
@Bean
public IntegrationFlow functionBasedFlow() {
    return IntegrationFlow
        .from("requestChannel")
        .handle(WebFlux.outboundGateway(
            // Dynamic URI from function
            // The cast fixes the payload type so getPayload().getEndpoint() resolves.
            (Function<Message<RequestData>, String>) message ->
                "https://api.example.com/" +
                message.getPayload().getEndpoint() + "/" +
                message.getHeaders().get("entityId"))
            // Publisher element type from function
            .publisherElementTypeFunction(message ->
                message.getHeaders().get("elementClass", Class.class)))
        .get();
}

Common Patterns

Pattern 1: API Gateway Proxy

/**
 * Forwards every request under {@code /proxy/} to the same path on
 * {@code backend.api.com}, reusing the caller's HTTP method.
 *
 * @param authWebClient client used for the backend call
 */
@Bean
public IntegrationFlow apiGatewayProxy(WebClient authWebClient) {
    return IntegrationFlow
        // "**" matches but captures nothing; "{*targetPath}" captures the rest of
        // the path (including its leading slash) as a named path variable.
        .from(WebFlux.inboundGateway("/proxy/{*targetPath}")
            .requestMapping(m -> m.methods(
                HttpMethod.GET, HttpMethod.POST, HttpMethod.PUT, HttpMethod.DELETE))
            .payloadExpression("#pathVariables.targetPath"))
        .enrichHeaders(h -> h.headerExpression("targetPath", "payload"))
        .handle(WebFlux.outboundGateway(
            message -> "https://backend.api.com" +
                       message.getHeaders().get("targetPath"),
            authWebClient)
            // The inbound endpoint stores the request method as its String name,
            // so it has to be read as a String and converted.
            .httpMethodFunction(message -> HttpMethod.valueOf(
                message.getHeaders().get("http_requestMethod", String.class)))
            // NOTE(review): extractPayload controls how the *request* body is built
            // from the message; confirm false is really intended for a proxy.
            .extractPayload(false))
        .get();
}

Pattern 2: Retry with Fallback

import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

/**
 * Calls the primary API with up to three attempts and exponential back-off, and
 * falls back to {@code callSecondaryApi()} once the attempts are exhausted.
 */
@Bean
public IntegrationFlow retryWithFallbackFlow() {
    SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
    retryPolicy.setMaxAttempts(3);

    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(1000);

    // The advice is configured through a RetryTemplate; the retry and back-off
    // policies are set on the template, not on the advice.
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setRetryPolicy(retryPolicy);
    retryTemplate.setBackOffPolicy(backOffPolicy);

    RequestHandlerRetryAdvice retryAdvice = new RequestHandlerRetryAdvice();
    retryAdvice.setRetryTemplate(retryTemplate);
    // A failed call is thrown, never delivered downstream as a payload, so the
    // fallback has to live in the recovery callback.
    retryAdvice.setRecoveryCallback(context -> callSecondaryApi());

    // NOTE(review): the WebFlux gateway produces its reply reactively; confirm the
    // advice actually observes the HTTP failure in your setup before relying on it.
    return IntegrationFlow
        .from("requestChannel")
        .handle(WebFlux.outboundGateway("https://primary.api.com/data"),
            e -> e.advice(retryAdvice))
        .get();
}

Pattern 3: Enrichment from Multiple Sources

/**
 * Calls two backends in sequence for the same request and replies with a map that
 * combines both results under the keys {@code user} and {@code orders}.
 */
@Bean
public IntegrationFlow enrichmentFlow() {
    return IntegrationFlow
        .from(WebFlux.inboundGateway("/api/enriched-data"))
        // headerExpression evaluates "payload"; header(...) would store the
        // literal string "payload" instead of the message payload.
        .enrichHeaders(h -> h.headerExpression("originalPayload", "payload"))
        // The URL needs a {id} placeholder for the uriVariable to have any effect.
        .handle(WebFlux.outboundGateway("https://api1.com/user/{id}")
            .uriVariable("id", "payload.userId"))
        .enrichHeaders(h -> h.headerExpression("userInfo", "payload"))
        // Message.class makes the lambda receive the whole message, so the saved
        // request can be restored from its header.
        .transform(Message.class, m -> m.getHeaders().get("originalPayload"))
        .handle(WebFlux.outboundGateway("https://api2.com/orders/{userId}")
            .uriVariable("userId", "payload.userId"))
        .enrichHeaders(h -> h.headerExpression("orderInfo", "payload"))
        .handle((payload, headers) -> {
            return Map.of(
                "user", headers.get("userInfo"),
                "orders", headers.get("orderInfo")
            );
        })
        .get();
}

Pattern 4: Webhook Receiver with Async Processing

import java.util.concurrent.Executors;
import org.springframework.integration.dsl.MessageChannels;

/**
 * Accepts GitHub webhook POSTs and hands them to an executor channel, so
 * {@code githubService.processEvent} runs on a pool thread.
 */
@Bean
public IntegrationFlow webhookReceiverFlow() {
    return IntegrationFlow
        // Channel adapter rather than gateway: one-way, the flow produces no reply.
        .from(WebFlux.inboundChannelAdapter("/webhooks/github")
            .requestMapping(m -> m
                .methods(HttpMethod.POST)
                .headers("X-GitHub-Event"))
            .requestPayloadType(GitHubEvent.class))
        .channel(MessageChannels.executor("webhookExecutor",
            Executors.newFixedThreadPool(10)))
        .handle("githubService", "processEvent")
        .get();
}

Pattern 5: Reactive Streaming

/**
 * Posts to a streaming endpoint, asks for the reply as a {@code Flux}, and splits
 * it so {@code eventProcessor.process} is invoked per element.
 */
@Bean
public IntegrationFlow reactiveStreamingFlow() {
    return IntegrationFlow
        .from("streamChannel")
        .handle(WebFlux.outboundGateway("https://api.example.com/stream")
            .httpMethod(HttpMethod.POST)
            // NOTE(review): publisherElementType presumably describes the request
            // body publisher; confirm whether expectedResponseType is also needed
            // for the reply Flux to carry DataEvent elements.
            .publisherElementType(DataEvent.class)
            .replyPayloadToFlux(true))
        .split()
        .handle("eventProcessor", "process")
        .get();
}

Best Practices

1. Use Custom WebClient for Production

import io.netty.channel.ChannelOption;
import reactor.netty.http.client.HttpClient;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;
import java.time.Duration;
import java.util.function.Function;

/**
 * Shared {@code WebClient} with a base URL, three exchange filters, and explicit
 * connect and response timeouts on the underlying Reactor Netty client.
 */
@Bean
public WebClient productionWebClient() {
    return WebClient.builder()
        .baseUrl("https://api.example.com")
        // loggingFilter(), authenticationFilter() and retryFilter() are
        // application-provided helpers that are not shown in this example.
        .filter(loggingFilter())
        .filter(authenticationFilter())
        .filter(retryFilter())
        .clientConnector(new ReactorClientHttpConnector(
            HttpClient.create()
                .responseTimeout(Duration.ofSeconds(10))
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
                .metrics(true, Function.identity())
        ))
        .build();
}

2. Externalize Configuration

import org.springframework.boot.context.properties.ConfigurationProperties;

/**
 * Settings bound from configuration properties under the {@code app.api} prefix.
 */
@ConfigurationProperties("app.api")
public class ApiConfig {
    private String baseUrl;
    private int timeout;
    private String authToken;
    // getters/setters (omitted in this example)
}

/**
 * Outbound flow whose target URL comes from externalized configuration.
 *
 * @param config supplies the base URL
 * @param webClient client used for the outbound call
 */
@Bean
public IntegrationFlow configuredFlow(ApiConfig config, WebClient webClient) {
    // Resolve the endpoint once, when the flow is defined.
    String resourceUrl = config.getBaseUrl() + "/resource";

    var resourceGateway = WebFlux.outboundGateway(resourceUrl, webClient)
        .mappedRequestHeaders("Authorization");

    return IntegrationFlow.from("requestChannel")
        .handle(resourceGateway)
        .get();
}

3. Type-Safe Response Handling

import org.springframework.core.ParameterizedTypeReference;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Requests a {@code List<User>} from the users endpoint and replies with only
 * the entries for which {@code User::isActive} returns true.
 */
@Bean
public IntegrationFlow typeSafeFlow() {
    return IntegrationFlow
        .from("requestChannel")
        .handle(WebFlux.outboundGateway("https://api.example.com/users")
            // Anonymous subclass keeps the List<User> type argument available
            .expectedResponseType(new ParameterizedTypeReference<List<User>>() {}))
        // Explicit type witness so the lambda's payload parameter is List<User>
        .<List<User>>handle((payload, headers) -> {
            // Payload is properly typed as List<User>
            return payload.stream()
                .filter(User::isActive)
                .collect(Collectors.toList());
        })
        .get();
}

4. Proper Error Handling

import org.springframework.http.ResponseEntity;

/**
 * Treats the gateway reply as a {@code ResponseEntity} and maps its status code
 * to either the response body or a specific exception.
 */
@Bean
public IntegrationFlow errorHandlingFlow() {
    return IntegrationFlow
        .from("requestChannel")
        .handle(WebFlux.outboundGateway("https://api.example.com/data")
            // NOTE(review): this example relies on the reply payload being a
            // ResponseEntity; confirm that extractPayload(false) is the setting
            // that produces it (it may instead govern the request body).
            .extractPayload(false))
        .handle((payload, headers) -> {
            ResponseEntity<?> response = (ResponseEntity<?>) payload;
            // NOTE(review): verify that 4xx/5xx responses reach this handler rather
            // than being raised as exceptions by the gateway.
            return switch (response.getStatusCode().value()) {
                case 200, 201 -> response.getBody();
                case 404 -> throw new NotFoundException();
                case 429 -> throw new RateLimitException();
                default -> throw new ApiException(response.getStatusCode());
            };
        })
        .get();
}

5. Reactive Error Handling

import reactor.core.publisher.Mono;
import org.springframework.web.reactive.function.client.WebClientResponseException;

/**
 * Wraps the gateway result in a {@code Mono}: an error {@code Mono} carrying a
 * {@code CustomException} when the payload is a {@code WebClientResponseException},
 * otherwise a {@code Mono} of the payload itself.
 */
@Bean
public IntegrationFlow reactiveErrorHandlingFlow() {
    return IntegrationFlow
        .from("requestChannel")
        .handle(WebFlux.outboundGateway("https://api.example.com/data")
            .expectedResponseType(Data.class))
        .handle((payload, headers) -> {
            // NOTE(review): confirm a failed call can actually arrive here as a
            // payload; a thrown exception would bypass this handler entirely.
            if (payload instanceof WebClientResponseException) {
                WebClientResponseException ex = (WebClientResponseException) payload;
                return Mono.error(new CustomException(ex));
            }
            return Mono.just(payload);
        })
        .get();
}

Types

package org.springframework.integration.webflux.dsl;

/**
 * Main factory class for WebFlux DSL.
 */
public final class WebFlux {
    // All factory methods as documented (see "WebFlux Factory Class")
}

/**
 * Spec builder for outbound endpoints.
 */
public class WebFluxMessageHandlerSpec
       extends BaseHttpMessageHandlerSpec<WebFluxMessageHandlerSpec,
                                          WebFluxRequestExecutingMessageHandler> {
    // All configuration methods as documented (see "WebFluxMessageHandlerSpec")
}

/**
 * Spec builder for inbound endpoints.
 */
public class WebFluxInboundEndpointSpec
       extends HttpInboundEndpointSupportSpec<WebFluxInboundEndpointSpec,
                                               WebFluxInboundEndpoint> {
    // All configuration methods as documented (see "WebFluxInboundEndpointSpec")
}