Spring Integration WebFlux provides reactive HTTP integration capabilities for Spring Integration applications
—
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Pending
The risk profile of this skill
Spring Integration WebFlux provides reactive HTTP integration capabilities for Spring Integration applications. It enables both inbound (receiving HTTP requests) and outbound (making HTTP requests) message processing in a fully reactive, non-blocking manner using Spring WebFlux and Project Reactor.
Maven:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-webflux</artifactId>
<version>7.0.0</version>
</dependency>
Gradle:
implementation 'org.springframework.integration:spring-integration-webflux:7.0.0'
Required Dependencies:
- spring-webflux must be on classpath (provides WebClient and WebHandler)
- spring-integration-core is required
- reactor-core is required (provided transitively)
Default Behaviors:
- expectReply=true by default (request-reply pattern)
- extractPayload=true by default (extracts body from ResponseEntity)
- extractRequestPayload=true by default (extracts payload from Message)
- extractReplyPayload=true by default (extracts payload from reply Message)
- replyPayloadToFlux=false by default (resolves Mono to single value)
Threading Model:
- Reactor context is carried in a message header (IntegrationMessageHeaderAccessor.REACTOR_CONTEXT)
Lifecycle:
- Inbound endpoints implement WebHandler and integrate with WebFlux's handler chain
- autoStartup=true by default
Exceptions:
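- WebClientException - WebClient request failures (4xx/5xx responses surface as its subclass WebClientResponseException)
- HttpServerErrorException - 5xx server errors (raised by RestTemplate-based clients; with WebClient expect WebClientResponseException instead)
- HttpClientErrorException - 4xx client errors (raised by RestTemplate-based clients; with WebClient expect WebClientResponseException instead)
- WebExchangeBindException - validation failures (inbound)
- HttpMessageNotReadableException - request body conversion failures
- HttpMessageNotWritableException - response body conversion failures
Edge Cases: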
- When expectReply=false for inbound gateway, returns 202 Accepted immediately
- When replyPayloadToFlux=true, response is always Flux even for single values
- When extractPayload=false, full ResponseEntity is available in message payload
- Publisher request payloads rely on publisherElementType configuration
// Java DSL factory
import org.springframework.integration.webflux.dsl.WebFlux;
import org.springframework.integration.webflux.dsl.WebFluxMessageHandlerSpec;
import org.springframework.integration.webflux.dsl.WebFluxInboundEndpointSpec;
// Inbound endpoints
import org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint;
// Outbound handlers
import org.springframework.integration.webflux.outbound.WebFluxRequestExecutingMessageHandler;
// WebFlux infrastructure
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebHandler;
// Reactive types
import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;
// HTTP types
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.http.codec.ServerCodecConfigurer;
import org.springframework.web.reactive.function.BodyExtractor;
import org.springframework.http.client.reactive.ClientHttpResponse;
// Integration types
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.webflux.dsl.WebFlux;
import org.springframework.http.HttpMethod;
@Configuration
public class WebFluxOutboundConfig {
/**
 * Defines a flow that reads from {@code requestChannel}, issues a GET to the
 * users API through a WebFlux outbound gateway, and sends the reply to
 * {@code responseChannel}.
 *
 * @return the assembled integration flow
 */
@Bean
public IntegrationFlow webFluxOutboundFlow() {
    // The {id} URI template variable is filled from the expression "payload.userId".
    var userLookup = WebFlux.outboundGateway("https://api.example.com/users/{id}")
            .uriVariable("id", "payload.userId")
            .httpMethod(HttpMethod.GET)
            .expectedResponseType(User.class);

    return IntegrationFlow.from("requestChannel")
            .handle(userLookup)
            .channel("responseChannel")
            .get();
}
}
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.webflux.dsl.WebFlux;
import org.springframework.http.HttpMethod;
@Configuration
public class WebFluxInboundConfig {
/**
 * Defines a flow that exposes {@code /api/orders} for POST requests through a
 * WebFlux inbound gateway and passes each {@code Order} payload to
 * {@code orderService.processOrder}.
 *
 * @return the assembled integration flow
 */
@Bean
public IntegrationFlow webFluxInboundFlow() {
    var ordersEndpoint = WebFlux.inboundGateway("/api/orders")
            .requestMapping(mapping -> mapping.methods(HttpMethod.POST))
            .requestPayloadType(Order.class);

    return IntegrationFlow.from(ordersEndpoint)
            .handle("orderService", "processOrder")
            .get();
}
}
Spring Integration WebFlux is built around several key components:
- Reactor context is carried in a message header (IntegrationMessageHeaderAccessor.REACTOR_CONTEXT)
Receive HTTP requests reactively and route them to Spring Integration message channels. Supports both one-way adapters (no response) and request-reply gateways.
// Create inbound channel adapter (no response)
public static WebFluxInboundEndpointSpec inboundChannelAdapter(String... path)
// Create inbound gateway (request-reply)
public static WebFluxInboundEndpointSpec inboundGateway(String... path)
Key features:
Execute HTTP requests reactively using WebClient and handle responses in Spring Integration flows. Supports both fire-and-forget adapters and request-reply gateways.
// Create outbound channel adapter (fire-and-forget)
public static WebFluxMessageHandlerSpec outboundChannelAdapter(String uri)
public static WebFluxMessageHandlerSpec outboundChannelAdapter(String uri, WebClient webClient)
public static <P> WebFluxMessageHandlerSpec outboundChannelAdapter(
Function<Message<P>, ?> uriFunction)
public static WebFluxMessageHandlerSpec outboundChannelAdapter(Expression uriExpression)
// Create outbound gateway (request-reply)
public static WebFluxMessageHandlerSpec outboundGateway(String uri)
public static WebFluxMessageHandlerSpec outboundGateway(String uri, WebClient webClient)
public static <P> WebFluxMessageHandlerSpec outboundGateway(
Function<Message<P>, ?> uriFunction)
public static WebFluxMessageHandlerSpec outboundGateway(Expression uriExpression)
Key features:
Fluent Java DSL for configuring WebFlux integration endpoints with a rich set of configuration options.
// Main factory class
public final class WebFlux {
// Outbound factory methods
public static WebFluxMessageHandlerSpec outboundGateway(String uri);
public static WebFluxMessageHandlerSpec outboundGateway(String uri, WebClient webClient);
public static <P> WebFluxMessageHandlerSpec outboundGateway(
Function<Message<P>, ?> uriFunction);
public static WebFluxMessageHandlerSpec outboundGateway(Expression uriExpression);
public static WebFluxMessageHandlerSpec outboundChannelAdapter(String uri);
public static WebFluxMessageHandlerSpec outboundChannelAdapter(String uri, WebClient webClient);
public static <P> WebFluxMessageHandlerSpec outboundChannelAdapter(
Function<Message<P>, ?> uriFunction);
public static WebFluxMessageHandlerSpec outboundChannelAdapter(Expression uriExpression);
// Inbound factory methods
public static WebFluxInboundEndpointSpec inboundGateway(String... path);
public static WebFluxInboundEndpointSpec inboundChannelAdapter(String... path);
}
The DSL provides spec builders that enable fluent configuration of all aspects of inbound and outbound endpoints including HTTP methods, headers, URI variables, codecs, and reactive behaviors.
Traditional Spring XML-based configuration using the int-webflux namespace for teams preferring XML over Java configuration.
<int-webflux:inbound-gateway
request-channel="requestChannel"
path="/api/endpoint"/>
<int-webflux:outbound-gateway
request-channel="requestChannel"
url="https://api.example.com/resource"/>
Supports all features available in the Java DSL with equivalent XML attributes and sub-elements.
Use outbound gateways to call external REST APIs reactively within integration flows:
/**
 * Defines a flow that reads from {@code apiRequestChannel}, fetches data from
 * the external API with a GET request, extracts the data portion of the
 * {@code ApiResponse}, and publishes it to {@code processedDataChannel}.
 *
 * @return the assembled integration flow
 */
@Bean
public IntegrationFlow callExternalApi() {
    // The {id} URI template variable is filled from the expression "headers.entityId".
    var dataLookup = WebFlux.outboundGateway("https://api.service.com/data/{id}")
            .uriVariable("id", "headers.entityId")
            .httpMethod(HttpMethod.GET)
            .expectedResponseType(ApiResponse.class);

    return IntegrationFlow.from("apiRequestChannel")
            .handle(dataLookup)
            .transform(ApiResponse::getData)
            .channel("processedDataChannel")
            .get();
}
Use inbound gateways to expose HTTP endpoints that trigger integration flows:
/**
 * Defines a flow that exposes {@code /api/process} for JSON POST requests and
 * passes each {@code ProcessRequest} payload to
 * {@code processingService.process}.
 *
 * @return the assembled integration flow
 */
@Bean
public IntegrationFlow exposeHttpEndpoint() {
    var processEndpoint = WebFlux.inboundGateway("/api/process")
            .requestMapping(mapping -> mapping
                    .methods(HttpMethod.POST)
                    .consumes("application/json"))
            .requestPayloadType(ProcessRequest.class);

    return IntegrationFlow.from(processEndpoint)
            .handle("processingService", "process")
            .get();
}
Send or receive streaming data using Flux:
/**
 * Defines a flow that reads from {@code dataStreamChannel}, POSTs the payload
 * to the ingest endpoint, and publishes the reply to
 * {@code streamResultChannel}.
 *
 * @return the assembled integration flow
 */
@Bean
public IntegrationFlow streamDataFlow() {
    // replyPayloadToFlux(true) keeps the reply as a Flux rather than a single value.
    // NOTE(review): publisherElementType presumably names the element type of a
    // Publisher request body; confirm against the handler documentation.
    var ingestGateway = WebFlux.outboundGateway("https://stream.service.com/ingest")
            .httpMethod(HttpMethod.POST)
            .publisherElementType(DataEvent.class)
            .replyPayloadToFlux(true);

    return IntegrationFlow.from("dataStreamChannel")
            .handle(ingestGateway)
            .channel("streamResultChannel")
            .get();
}
Configure WebClient with custom settings like authentication, timeouts, and codecs:
/**
 * Builds a {@code WebClient} bound to {@code https://api.example.com} with
 * default Authorization and Accept headers and a 10-second response timeout.
 *
 * @return the configured client
 */
@Bean
public WebClient customWebClient() {
    // Configure the underlying HTTP client first so the timeout is easy to spot.
    HttpClient httpClient = HttpClient.create()
            .responseTimeout(Duration.ofSeconds(10));
    ReactorClientHttpConnector connector = new ReactorClientHttpConnector(httpClient);

    return WebClient.builder()
            .baseUrl("https://api.example.com")
            .defaultHeader("Authorization", "Bearer token")
            .defaultHeader("Accept", "application/json")
            .clientConnector(connector)
            .build();
}
/**
 * Defines a flow that reads from {@code requestChannel} and calls
 * {@code /resource/{id}} through the supplied {@code WebClient}.
 *
 * @param customWebClient the client used by the outbound gateway
 * @return the assembled integration flow
 */
@Bean
public IntegrationFlow customClientFlow(WebClient customWebClient) {
    // The {id} URI template variable is filled from the expression "payload.id".
    var resourceGateway = WebFlux.outboundGateway("/resource/{id}", customWebClient)
            .uriVariable("id", "payload.id");

    return IntegrationFlow.from("requestChannel")
            .handle(resourceGateway)
            .get();
}
Handle errors reactively:
/**
 * Defines a flow that calls the data API with {@code extractPayload(false)},
 * so the next handler receives the full {@code ResponseEntity}, then unwraps
 * the body of a successful response.
 *
 * <p>A non-2xx status is reported by throwing {@code ApiException}. Returning
 * {@code Mono.error(...)} from this synchronous handler would hand the
 * {@code Mono} downstream as an ordinary reply payload instead of signalling
 * a failure.
 *
 * @return the assembled integration flow
 */
@Bean
public IntegrationFlow errorHandlingFlow() {
    return IntegrationFlow
            .from("requestChannel")
            .handle(WebFlux.outboundGateway("https://api.example.com/data")
                    .extractPayload(false))
            .handle((payload, headers) -> {
                ResponseEntity<?> response = (ResponseEntity<?>) payload;
                // Guard clause: fail fast on anything that is not a 2xx status.
                // NOTE(review): assumes ApiException is unchecked; confirm, since a
                // checked exception cannot be thrown from this lambda.
                if (!response.getStatusCode().is2xxSuccessful()) {
                    throw new ApiException(
                            "Request failed: " + response.getStatusCode());
                }
                return response.getBody();
            })
            .get();
}
package org.springframework.integration.webflux.dsl;
/**
* Main entry point for creating WebFlux integration components using Java DSL.
* Provides static factory methods for creating inbound and outbound endpoints.
*/
public final class WebFlux {
// Factory methods documented in DSL API section
}
package org.springframework.integration.webflux.inbound;
import org.springframework.integration.http.inbound.BaseHttpInboundEndpoint;
import org.springframework.web.server.WebHandler;
import reactor.core.publisher.Mono;
/**
* Main inbound endpoint for receiving HTTP requests reactively.
* Implements WebHandler to integrate with Spring WebFlux infrastructure.
*/
public class WebFluxInboundEndpoint extends BaseHttpInboundEndpoint
implements WebHandler {
public WebFluxInboundEndpoint();
public WebFluxInboundEndpoint(boolean expectReply);
public void setCodecConfigurer(ServerCodecConfigurer codecConfigurer);
public void setRequestedContentTypeResolver(RequestedContentTypeResolver resolver);
public void setReactiveAdapterRegistry(ReactiveAdapterRegistry registry);
public Mono<Void> handle(ServerWebExchange exchange);
}
package org.springframework.integration.webflux.outbound;
import org.springframework.integration.http.outbound.AbstractHttpRequestExecutingMessageHandler;
import org.springframework.web.reactive.function.BodyExtractor;
/**
* Message handler that executes HTTP requests using WebClient reactively.
* Supports both adapter (fire-and-forget) and gateway (request-reply) patterns.
*/
public class WebFluxRequestExecutingMessageHandler
extends AbstractHttpRequestExecutingMessageHandler {
public WebFluxRequestExecutingMessageHandler(String uri);
public WebFluxRequestExecutingMessageHandler(Expression uriExpression);
public WebFluxRequestExecutingMessageHandler(String uri, WebClient webClient);
public void setReplyPayloadToFlux(boolean replyPayloadToFlux);
public void setBodyExtractor(BodyExtractor<?, ?> bodyExtractor);
public void setPublisherElementType(Class<?> publisherElementType);
public void setPublisherElementTypeExpression(Expression expression);
public void setAttributeVariablesExpression(Expression expression);
public void setEncodingMode(DefaultUriBuilderFactory.EncodingMode encodingMode);
}
All components are designed for reactive environments:
- Reactor context is carried in the message header IntegrationMessageHeaderAccessor.REACTOR_CONTEXT
Spring Integration WebFlux integrates seamlessly with: