Spring Integration WebFlux provides reactive HTTP integration capabilities for Spring Integration applications. It enables both inbound (receiving HTTP requests) and outbound (making HTTP requests) message processing in a fully reactive, non-blocking manner using Spring WebFlux and Project Reactor.
Maven:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-webflux</artifactId>
<version>7.0.0</version>
</dependency>

Gradle:
implementation 'org.springframework.integration:spring-integration-webflux:7.0.0'

Required Dependencies:
- spring-webflux must be on the classpath (provides WebClient and WebHandler)
- spring-integration-core is required
- reactor-core is required (provided transitively)

Default Behaviors:
- expectReply=true by default (request-reply pattern)
- extractPayload=true by default (extracts body from ResponseEntity)
- extractRequestPayload=true by default (extracts payload from Message)
- extractReplyPayload=true by default (extracts payload from reply Message)
- replyPayloadToFlux=false by default (resolves Mono to single value)

Threading Model:
IntegrationMessageHeaderAccessor.REACTOR_CONTEXT)

Lifecycle:
- WebHandler and integrate with WebFlux's handler chain
- autoStartup=true by default

Exceptions:
- WebClientException - WebClient request failures
- HttpServerErrorException - 5xx server errors
- HttpClientErrorException - 4xx client errors
- WebExchangeBindException - validation failures (inbound)
- HttpMessageNotReadableException - request body conversion failures
- HttpMessageNotWritableException - response body conversion failures

Edge Cases:
- expectReply=false for inbound gateway, returns 202 Accepted immediately
- replyPayloadToFlux=true, response is always Flux even for single values
- extractPayload=false, full ResponseEntity is available in message payload
- publisherElementType configuration

// Java DSL factory
import org.springframework.integration.webflux.dsl.WebFlux;
import org.springframework.integration.webflux.dsl.WebFluxMessageHandlerSpec;
import org.springframework.integration.webflux.dsl.WebFluxInboundEndpointSpec;
// Inbound endpoints
import org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint;
// Outbound handlers
import org.springframework.integration.webflux.outbound.WebFluxRequestExecutingMessageHandler;
// WebFlux infrastructure
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebHandler;
// Reactive types
import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;
// HTTP types
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.http.codec.ServerCodecConfigurer;
import org.springframework.web.reactive.function.BodyExtractor;
import org.springframework.http.client.reactive.ClientHttpResponse;
// Integration types
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.webflux.dsl.WebFlux;
import org.springframework.http.HttpMethod;
/**
 * Outbound example: for each message arriving on {@code requestChannel}, performs an
 * HTTP GET through a WebFlux outbound gateway and forwards the result to
 * {@code responseChannel}.
 */
@Configuration
public class WebFluxOutboundConfig {

    /**
     * Wires {@code requestChannel -> GET https://api.example.com/users/{id} -> responseChannel}.
     * The {@code id} URI variable is taken from the expression {@code payload.userId};
     * the expected response type is {@code User}.
     */
    @Bean
    public IntegrationFlow webFluxOutboundFlow() {
        // Configure the gateway on its own so the flow wiring below reads as a short pipeline.
        var userLookupGateway = WebFlux.outboundGateway("https://api.example.com/users/{id}")
                .uriVariable("id", "payload.userId")
                .httpMethod(HttpMethod.GET)
                .expectedResponseType(User.class);

        return IntegrationFlow.from("requestChannel")
                .handle(userLookupGateway)
                .channel("responseChannel")
                .get();
    }
}
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.webflux.dsl.WebFlux;
import org.springframework.http.HttpMethod;
/**
 * Inbound example: exposes {@code /api/orders} for HTTP POST through a WebFlux inbound
 * gateway and hands each request to the {@code orderService} bean.
 */
@Configuration
public class WebFluxInboundConfig {

    /**
     * Wires {@code POST /api/orders -> orderService.processOrder}.
     * The request payload type is {@code Order}.
     */
    @Bean
    public IntegrationFlow webFluxInboundFlow() {
        // Configure the HTTP entry point first, then attach the processing step.
        var ordersGateway = WebFlux.inboundGateway("/api/orders")
                .requestMapping(mapping -> mapping.methods(HttpMethod.POST))
                .requestPayloadType(Order.class);

        return IntegrationFlow.from(ordersGateway)
                .handle("orderService", "processOrder")
                .get();
    }
}
Spring Integration WebFlux is built around several key components:
IntegrationMessageHeaderAccessor.REACTOR_CONTEXT)

Receive HTTP requests reactively and route them to Spring Integration message channels. Supports both one-way adapters (no response) and request-reply gateways.
// Create inbound channel adapter (no response)
public static WebFluxInboundEndpointSpec inboundChannelAdapter(String... path)
// Create inbound gateway (request-reply)
public static WebFluxInboundEndpointSpec inboundGateway(String... path)

Key features:
Execute HTTP requests reactively using WebClient and handle responses in Spring Integration flows. Supports both fire-and-forget adapters and request-reply gateways.
// Create outbound channel adapter (fire-and-forget)
public static WebFluxMessageHandlerSpec outboundChannelAdapter(String uri)
public static WebFluxMessageHandlerSpec outboundChannelAdapter(String uri, WebClient webClient)
public static <P> WebFluxMessageHandlerSpec outboundChannelAdapter(
Function<Message<P>, ?> uriFunction)
public static WebFluxMessageHandlerSpec outboundChannelAdapter(Expression uriExpression)
// Create outbound gateway (request-reply)
public static WebFluxMessageHandlerSpec outboundGateway(String uri)
public static WebFluxMessageHandlerSpec outboundGateway(String uri, WebClient webClient)
public static <P> WebFluxMessageHandlerSpec outboundGateway(
Function<Message<P>, ?> uriFunction)
public static WebFluxMessageHandlerSpec outboundGateway(Expression uriExpression)

Key features:
Fluent Java DSL for configuring WebFlux integration endpoints with a rich set of configuration options.
// Main factory class
public final class WebFlux {
// Outbound factory methods
public static WebFluxMessageHandlerSpec outboundGateway(String uri);
public static WebFluxMessageHandlerSpec outboundGateway(String uri, WebClient webClient);
public static <P> WebFluxMessageHandlerSpec outboundGateway(
Function<Message<P>, ?> uriFunction);
public static WebFluxMessageHandlerSpec outboundGateway(Expression uriExpression);
public static WebFluxMessageHandlerSpec outboundChannelAdapter(String uri);
public static WebFluxMessageHandlerSpec outboundChannelAdapter(String uri, WebClient webClient);
public static <P> WebFluxMessageHandlerSpec outboundChannelAdapter(
Function<Message<P>, ?> uriFunction);
public static WebFluxMessageHandlerSpec outboundChannelAdapter(Expression uriExpression);
// Inbound factory methods
public static WebFluxInboundEndpointSpec inboundGateway(String... path);
public static WebFluxInboundEndpointSpec inboundChannelAdapter(String... path);
}

The DSL provides spec builders that enable fluent configuration of all aspects of inbound and outbound endpoints including HTTP methods, headers, URI variables, codecs, and reactive behaviors.
Traditional Spring XML-based configuration using the int-webflux namespace for teams preferring XML over Java configuration.
<int-webflux:inbound-gateway
request-channel="requestChannel"
path="/api/endpoint"/>
<int-webflux:outbound-gateway
request-channel="requestChannel"
url="https://api.example.com/resource"/>

Supports all features available in the Java DSL with equivalent XML attributes and sub-elements.
Use outbound gateways to call external REST APIs reactively within integration flows:
/**
 * Calls an external REST API for each message on {@code apiRequestChannel}.
 * The {@code id} URI variable comes from the expression {@code headers.entityId};
 * the {@code ApiResponse} reply is reduced to its data via {@code ApiResponse::getData}
 * and sent to {@code processedDataChannel}.
 */
@Bean
public IntegrationFlow callExternalApi() {
    var dataGateway = WebFlux.outboundGateway("https://api.service.com/data/{id}")
            .uriVariable("id", "headers.entityId")
            .httpMethod(HttpMethod.GET)
            .expectedResponseType(ApiResponse.class);

    return IntegrationFlow.from("apiRequestChannel")
            .handle(dataGateway)
            .transform(ApiResponse::getData)
            .channel("processedDataChannel")
            .get();
}
Use inbound gateways to expose HTTP endpoints that trigger integration flows:
/**
 * Exposes {@code /api/process} as an HTTP endpoint that accepts POST requests
 * consuming {@code application/json}, with {@code ProcessRequest} as the request
 * payload type, and passes each request to {@code processingService.process}.
 */
@Bean
public IntegrationFlow exposeHttpEndpoint() {
    var processGateway = WebFlux.inboundGateway("/api/process")
            .requestMapping(mapping -> mapping
                    .methods(HttpMethod.POST)
                    .consumes("application/json"))
            .requestPayloadType(ProcessRequest.class);

    return IntegrationFlow.from(processGateway)
            .handle("processingService", "process")
            .get();
}
Send or receive streaming data using Flux:
/**
 * Posts messages from {@code dataStreamChannel} to the ingest endpoint and forwards
 * the reply to {@code streamResultChannel}. The publisher element type is
 * {@code DataEvent}, and {@code replyPayloadToFlux(true)} is set on the gateway.
 */
@Bean
public IntegrationFlow streamDataFlow() {
    var ingestGateway = WebFlux.outboundGateway("https://stream.service.com/ingest")
            .httpMethod(HttpMethod.POST)
            .publisherElementType(DataEvent.class)
            .replyPayloadToFlux(true);

    return IntegrationFlow.from("dataStreamChannel")
            .handle(ingestGateway)
            .channel("streamResultChannel")
            .get();
}
Configure WebClient with custom settings like authentication, timeouts, and codecs:
/**
 * Builds a {@code WebClient} with a base URL of {@code https://api.example.com},
 * default {@code Authorization} and {@code Accept} headers, and a Reactor Netty
 * connector whose response timeout is 10 seconds.
 */
@Bean
public WebClient customWebClient() {
    // Build the transport first so the builder chain below stays flat.
    HttpClient httpClient = HttpClient.create()
            .responseTimeout(Duration.ofSeconds(10));
    ReactorClientHttpConnector connector = new ReactorClientHttpConnector(httpClient);

    return WebClient.builder()
            .baseUrl("https://api.example.com")
            .defaultHeader("Authorization", "Bearer token")
            .defaultHeader("Accept", "application/json")
            .clientConnector(connector)
            .build();
}
/**
 * Sends messages from {@code requestChannel} through an outbound gateway that uses
 * the injected {@code WebClient}. The {@code id} URI variable is taken from the
 * expression {@code payload.id}.
 *
 * NOTE(review): the URI {@code /resource/{id}} is relative; confirm it is resolved
 * against the supplied client's base URL.
 *
 * @param customWebClient the client the gateway executes requests with
 */
@Bean
public IntegrationFlow customClientFlow(WebClient customWebClient) {
    var resourceGateway = WebFlux.outboundGateway("/resource/{id}", customWebClient)
            .uriVariable("id", "payload.id");

    return IntegrationFlow.from("requestChannel")
            .handle(resourceGateway)
            .get();
}
Handle errors reactively:
/**
 * Calls the remote API and checks the HTTP status of the reply before passing
 * the body on.
 *
 * The gateway is configured with {@code extractPayload(false)}, so the next handler
 * receives the whole {@code ResponseEntity}. A 2xx response yields its body; any
 * other status raises an {@code ApiException} carrying the status code.
 */
@Bean
public IntegrationFlow errorHandlingFlow() {
    return IntegrationFlow
            .from("requestChannel")
            .handle(WebFlux.outboundGateway("https://api.example.com/data")
                    // Keep the full ResponseEntity so the status can be inspected below.
                    .extractPayload(false))
            .handle((payload, headers) -> {
                ResponseEntity<?> response = (ResponseEntity<?>) payload;
                if (!response.getStatusCode().is2xxSuccessful()) {
                    // Throw instead of returning Mono.error(...): by default a returned Mono
                    // is treated as an ordinary reply payload, so the failure would be
                    // passed downstream as data rather than surfacing as an error.
                    // Assumes ApiException is unchecked - TODO confirm.
                    throw new ApiException(
                            "Request failed: " + response.getStatusCode());
                }
                return response.getBody();
            })
            .get();
}
package org.springframework.integration.webflux.dsl;
/**
* Main entry point for creating WebFlux integration components using Java DSL.
* Provides static factory methods for creating inbound and outbound endpoints.
*/
public final class WebFlux {
// Factory methods documented in DSL API section
}

package org.springframework.integration.webflux.inbound;
import org.springframework.integration.http.inbound.BaseHttpInboundEndpoint;
import org.springframework.web.server.WebHandler;
import reactor.core.publisher.Mono;
/**
* Main inbound endpoint for receiving HTTP requests reactively.
* Implements WebHandler to integrate with Spring WebFlux infrastructure.
*/
public class WebFluxInboundEndpoint extends BaseHttpInboundEndpoint
implements WebHandler {
public WebFluxInboundEndpoint();
public WebFluxInboundEndpoint(boolean expectReply);
public void setCodecConfigurer(ServerCodecConfigurer codecConfigurer);
public void setRequestedContentTypeResolver(RequestedContentTypeResolver resolver);
public void setReactiveAdapterRegistry(ReactiveAdapterRegistry registry);
public Mono<Void> handle(ServerWebExchange exchange);
}

package org.springframework.integration.webflux.outbound;
import org.springframework.integration.http.outbound.AbstractHttpRequestExecutingMessageHandler;
import org.springframework.web.reactive.function.BodyExtractor;
/**
* Message handler that executes HTTP requests using WebClient reactively.
* Supports both adapter (fire-and-forget) and gateway (request-reply) patterns.
*/
public class WebFluxRequestExecutingMessageHandler
extends AbstractHttpRequestExecutingMessageHandler {
public WebFluxRequestExecutingMessageHandler(String uri);
public WebFluxRequestExecutingMessageHandler(Expression uriExpression);
public WebFluxRequestExecutingMessageHandler(String uri, WebClient webClient);
public void setReplyPayloadToFlux(boolean replyPayloadToFlux);
public void setBodyExtractor(BodyExtractor<?, ?> bodyExtractor);
public void setPublisherElementType(Class<?> publisherElementType);
public void setPublisherElementTypeExpression(Expression expression);
public void setAttributeVariablesExpression(Expression expression);
public void setEncodingMode(DefaultUriBuilderFactory.EncodingMode encodingMode);
}

All components are designed for reactive environments:
IntegrationMessageHeaderAccessor.REACTOR_CONTEXT

Spring Integration WebFlux integrates seamlessly with: