Inbound endpoints receive HTTP requests reactively and route them to Spring Integration message channels. They act as the server-side component, exposing HTTP endpoints that trigger integration flows.
Required Dependencies:
- spring-webflux must be on the classpath for inbound endpoints to work
- spring-integration-core is required

Default Behaviors:
- expectReply=true by default for gateways (request-reply pattern)
- expectReply=false by default for channel adapters (fire-and-forget)
- extractReplyPayload=true by default (only the payload is written to the response, not the full Message)

Threading Model:
Lifecycle:
- Endpoints implement WebHandler and integrate with WebFlux's handler chain
- autoStartup=true by default

Exceptions:
- WebExchangeBindException - validation failures
- HttpMessageNotReadableException - request body conversion failures
- HttpMessageNotWritableException - response body conversion failures
- WebClientException - WebFlux infrastructure errors
- ServerWebExchangeException - exchange processing errors

Edge Cases:
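- When expectReply=false and no reply channel is set, the gateway returns immediately with 202 Accepted

Spring Integration WebFlux provides two types of inbound endpoints:
- Inbound gateways, which receive HTTP requests and send responses
- Inbound channel adapters, which receive HTTP requests without sending responses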
Both types are fully reactive, using Spring WebFlux for non-blocking request handling.
Create inbound gateways using the WebFlux DSL factory methods.
/**
* Creates an inbound gateway that receives HTTP requests and sends responses.
* @param path - One or more path patterns to match (e.g., "/api/users", "/api/**")
* @return WebFluxInboundEndpointSpec for fluent configuration
*/
public static WebFluxInboundEndpointSpec inboundGateway(String... path)

Usage Example:
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.webflux.dsl.WebFlux;
import org.springframework.http.HttpMethod;
/**
 * Configuration exposing {@code POST /api/orders} as a WebFlux inbound gateway.
 */
@Configuration
public class InboundGatewayConfig {

    /**
     * Accepts JSON requests on {@code /api/orders}, converts the body to {@code Order}
     * and hands it to {@code orderService.createOrder}.
     */
    @Bean
    public IntegrationFlow orderProcessingEndpoint() {
        // Describe the HTTP side first, then wire it into the flow.
        var ordersGateway = WebFlux.inboundGateway("/api/orders")
                .requestMapping(mapping -> mapping
                        .methods(HttpMethod.POST)
                        .consumes("application/json")
                        .produces("application/json"))
                .requestPayloadType(Order.class);

        return IntegrationFlow
                .from(ordersGateway)
                .handle("orderService", "createOrder")
                .get();
    }
}

Create inbound channel adapters for one-way message flows without HTTP responses.
/**
* Creates an inbound channel adapter that receives HTTP requests without sending responses.
* Useful for webhooks, notifications, and fire-and-forget patterns.
* @param path - One or more path patterns to match
* @return WebFluxInboundEndpointSpec for fluent configuration
*/
public static WebFluxInboundEndpointSpec inboundChannelAdapter(String... path)

Usage Example:
/**
 * One-way webhook receiver: {@code POST /webhooks/notification} bodies are converted to
 * {@code WebhookEvent} and forwarded to {@code webhookProcessingChannel}.
 */
@Bean
public IntegrationFlow webhookEndpoint() {
    var webhookAdapter = WebFlux.inboundChannelAdapter("/webhooks/notification")
            .requestMapping(mapping -> mapping.methods(HttpMethod.POST))
            .requestPayloadType(WebhookEvent.class);

    return IntegrationFlow
            .from(webhookAdapter)
            .channel("webhookProcessingChannel")
            .get();
}

Configure detailed request mapping including HTTP methods, paths, headers, and content types.
/**
* Configure request mapping for the inbound endpoint.
* @param requestMappingConfigurer - Consumer to configure RequestMapping
* @return Spec for method chaining
*/
public WebFluxInboundEndpointSpec requestMapping(
Consumer<RequestMappingSpec> requestMappingConfigurer)
/**
* RequestMappingSpec provides methods to configure request matching.
*/
interface RequestMappingSpec {
RequestMappingSpec methods(HttpMethod... methods);
RequestMappingSpec headers(String... headers);
RequestMappingSpec consumes(String... mediaTypes);
RequestMappingSpec produces(String... mediaTypes);
RequestMappingSpec params(String... params);
}

Usage Example:
@Bean
public IntegrationFlow restrictedEndpoint() {
return IntegrationFlow
.from(WebFlux.inboundGateway("/api/admin/users")
.requestMapping(m -> m
.methods(HttpMethod.GET, HttpMethod.POST)
.headers("X-Admin-Token")
.consumes("application/json")
.produces("application/json")
.params("active=true")))
.handle("adminService", "manageUsers")
.get();
}Configure Cross-Origin Resource Sharing (CORS) for inbound endpoints.
/**
* Configure CORS settings for the inbound endpoint.
* @param corsConfigurer - Consumer to configure CORS
* @return Spec for method chaining
*/
public WebFluxInboundEndpointSpec crossOrigin(
Consumer<CrossOriginSpec> corsConfigurer)
/**
* CrossOriginSpec provides methods to configure CORS.
*/
interface CrossOriginSpec {
CrossOriginSpec origin(String... origins);
CrossOriginSpec allowedHeaders(String... headers);
CrossOriginSpec exposedHeaders(String... headers);
CrossOriginSpec method(RequestMethod... methods);
CrossOriginSpec allowCredentials(boolean allowCredentials);
CrossOriginSpec maxAge(long maxAge);
}

Usage Example:
import org.springframework.web.bind.annotation.RequestMethod;
@Bean
public IntegrationFlow publicApiEndpoint() {
return IntegrationFlow
.from(WebFlux.inboundGateway("/api/public/data")
.crossOrigin(cors -> cors
.origin("https://example.com", "https://app.example.com")
.allowedHeaders("*")
.exposedHeaders("X-Custom-Header")
.method(RequestMethod.GET, RequestMethod.POST)
.allowCredentials(true)
.maxAge(3600)))
.handle("dataService", "getData")
.get();
}Specify the Java type for automatic request body deserialization.
/**
* Set the expected request payload type for deserialization.
* @param requestPayloadType - Java class to deserialize request body into
* @return Spec for method chaining
*/
public WebFluxInboundEndpointSpec requestPayloadType(Class<?> requestPayloadType)
/**
* Set the request payload type using ResolvableType for generic types.
* @param requestPayloadType - ResolvableType for complex generic types
* @return Spec for method chaining
*/
public WebFluxInboundEndpointSpec requestPayloadType(ResolvableType requestPayloadType)

Usage Example:
import org.springframework.core.ResolvableType;
@Bean
public IntegrationFlow typedRequestEndpoint() {
return IntegrationFlow
.from(WebFlux.inboundGateway("/api/users")
.requestMapping(m -> m.methods(HttpMethod.POST))
.requestPayloadType(User.class))
.transform(User::validate)
.handle("userService", "createUser")
.get();
}
// For generic types
@Bean
public IntegrationFlow genericRequestEndpoint() {
return IntegrationFlow
.from(WebFlux.inboundGateway("/api/items")
.requestPayloadType(
ResolvableType.forClassWithGenerics(List.class, Item.class)))
.handle("itemService", "processItems")
.get();
}Configure custom codecs for request/response serialization and deserialization.
/**
* Set a custom ServerCodecConfigurer for encoding/decoding HTTP messages.
* @param codecConfigurer - ServerCodecConfigurer instance
* @return Spec for method chaining
*/
public WebFluxInboundEndpointSpec codecConfigurer(
ServerCodecConfigurer codecConfigurer)

Usage Example:
import org.springframework.http.codec.ServerCodecConfigurer;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.http.codec.HttpMessageWriter;
/**
 * Builds a {@code ServerCodecConfigurer} from {@code ServerCodecConfigurer.create()} and
 * registers the application's custom message reader and writer on it.
 */
@Bean
public ServerCodecConfigurer customCodecConfigurer() {
    ServerCodecConfigurer codecs = ServerCodecConfigurer.create();
    // Register both custom codecs through the same customCodecs() handle.
    var custom = codecs.customCodecs();
    custom.register(new CustomMessageReader());
    custom.register(new CustomMessageWriter());
    return codecs;
}
@Bean
public IntegrationFlow customCodecEndpoint(ServerCodecConfigurer customCodecConfigurer) {
return IntegrationFlow
.from(WebFlux.inboundGateway("/api/custom")
.codecConfigurer(customCodecConfigurer)
.requestPayloadType(CustomMessage.class))
.handle("customService", "process")
.get();
}Configure custom content type resolution strategy.
/**
* Set a custom RequestedContentTypeResolver for content negotiation.
* @param requestedContentTypeResolver - RequestedContentTypeResolver instance
* @return Spec for method chaining
*/
public WebFluxInboundEndpointSpec requestedContentTypeResolver(
RequestedContentTypeResolver requestedContentTypeResolver)

Usage Example:
import org.springframework.http.MediaType;
import org.springframework.web.reactive.accept.HeaderContentTypeResolver;
import org.springframework.web.reactive.accept.RequestedContentTypeResolver;
import org.springframework.web.reactive.accept.RequestedContentTypeResolverBuilder;
/**
 * Builds the {@code RequestedContentTypeResolver} used for content negotiation.
 *
 * <p>{@code HeaderContentTypeResolver} only parses the {@code Accept} header and has no
 * setter for a list of media types, so the resolver is assembled with
 * {@code RequestedContentTypeResolverBuilder} instead: the {@code Accept} header is
 * consulted first, and the fixed list below is used when the client states no preference.
 *
 * @return resolver that honours {@code Accept} and otherwise falls back to the listed types
 */
@Bean
public RequestedContentTypeResolver customContentTypeResolver() {
    RequestedContentTypeResolverBuilder builder = new RequestedContentTypeResolverBuilder();
    // Resolvers are tried in registration order; the header resolver goes first.
    builder.headerResolver();
    builder.fixedResolver(
            MediaType.APPLICATION_JSON,
            MediaType.APPLICATION_XML,
            MediaType.valueOf("application/vnd.company.v1+json"));
    return builder.build();
}
@Bean
public IntegrationFlow negotiatedEndpoint(
RequestedContentTypeResolver customContentTypeResolver) {
return IntegrationFlow
.from(WebFlux.inboundGateway("/api/versioned")
.requestedContentTypeResolver(customContentTypeResolver))
.handle("versionedService", "process")
.get();
}Configure support for different reactive types (RxJava, Kotlin Coroutines, etc.).
/**
* Set a custom ReactiveAdapterRegistry for reactive type conversions.
* @param reactiveAdapterRegistry - ReactiveAdapterRegistry instance
* @return Spec for method chaining
*/
public WebFluxInboundEndpointSpec reactiveAdapterRegistry(
ReactiveAdapterRegistry reactiveAdapterRegistry)

Usage Example:
import org.springframework.core.ReactiveAdapterRegistry;
@Bean
public IntegrationFlow rxJavaEndpoint() {
ReactiveAdapterRegistry registry = ReactiveAdapterRegistry.getSharedInstance();
return IntegrationFlow
.from(WebFlux.inboundGateway("/api/reactive")
.reactiveAdapterRegistry(registry))
.handle((payload, headers) -> {
// Can return RxJava Single, Completable, etc.
return Single.just(processPayload(payload));
})
.get();
}Control which HTTP headers are mapped to/from Spring Integration message headers.
/**
* Set custom header mapper for HTTP header mapping.
* @param headerMapper - HeaderMapper implementation
* @return Spec for method chaining
*/
public WebFluxInboundEndpointSpec headerMapper(HeaderMapper<HttpHeaders> headerMapper)
/**
* Specify which request headers to map to message headers.
* Supports patterns like "X-*", "!X-Secret-*", etc.
* @param mappedRequestHeaders - Header name patterns
* @return Spec for method chaining
*/
public WebFluxInboundEndpointSpec mappedRequestHeaders(String... mappedRequestHeaders)
/**
* Specify which message headers to map to response headers.
* @param mappedResponseHeaders - Header name patterns
* @return Spec for method chaining
*/
public WebFluxInboundEndpointSpec mappedResponseHeaders(String... mappedResponseHeaders)

Usage Example:
/**
 * Gateway on {@code /api/tracked} that maps selected request headers into the message and
 * writes {@code X-Response-ID} / {@code X-Processing-Time} back to the HTTP response.
 *
 * <p>The two response headers are computed per message with {@code headerFunction}.
 * A plain {@code header(name, value)} would evaluate the UUID and timestamp once, when the
 * flow bean is built, and then stamp the same values on every response.
 */
@Bean
public IntegrationFlow headerMappingEndpoint() {
    return IntegrationFlow
            .from(WebFlux.inboundGateway("/api/tracked")
                    .mappedRequestHeaders(
                            "X-Request-ID",
                            "X-User-*",       // All headers starting with X-User-
                            "!X-User-Secret"  // Except X-User-Secret
                    )
                    .mappedResponseHeaders("X-Response-ID", "X-Processing-Time"))
            .enrichHeaders(h -> h
                    .headerFunction("X-Response-ID", message -> UUID.randomUUID().toString())
                    .headerFunction("X-Processing-Time", message -> System.currentTimeMillis()))
            .handle("trackedService", "process")
            .get();
}

Use SpEL expressions to extract or transform the request payload.
/**
* Set a SpEL expression to evaluate against the HTTP request for payload extraction.
* @param payloadExpression - SpEL expression string
* @return Spec for method chaining
*/
public WebFluxInboundEndpointSpec payloadExpression(String payloadExpression)

Usage Example:
/**
 * Gateway on {@code /api/custom-payload} whose payload is the string {@code "<id>:<name>"}
 * built from the request parameters.
 *
 * <p>{@code #requestParams} is a multi-value map, so plain property access would return a
 * list per parameter and the concatenation would read {@code "[1]:[x]"}.
 * {@code getFirst(...)} selects the single value instead.
 */
@Bean
public IntegrationFlow payloadExpressionEndpoint() {
    return IntegrationFlow
            .from(WebFlux.inboundGateway("/api/custom-payload")
                    // Extract only specific fields from request
                    .payloadExpression(
                            "#requestParams.getFirst('id') + ':' + #requestParams.getFirst('name')"))
            .handle("customService", "processPayloadString")
            .get();
}

Enable automatic payload validation using JSR-303 Bean Validation.
/**
* Set a Validator for request payload validation.
* @param validator - Validator instance
* @return Spec for method chaining
*/
public WebFluxInboundEndpointSpec validator(Validator validator)

Usage Example:
import jakarta.validation.Validator;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Email;
public class UserRequest {
@NotNull
private String name;
@Email
private String email;
// getters/setters
}
@Bean
public IntegrationFlow validatedEndpoint(Validator validator) {
return IntegrationFlow
.from(WebFlux.inboundGateway("/api/users")
.requestPayloadType(UserRequest.class)
.validator(validator))
.handle("userService", "createUser")
.get();
}Customize HTTP status codes in responses using SpEL expressions.
/**
* Set a SpEL expression to determine the HTTP status code for responses.
* @param statusCodeExpression - SpEL expression evaluating to HttpStatus
* @return Spec for method chaining
*/
public WebFluxInboundEndpointSpec statusCodeExpression(String statusCodeExpression)

Usage Example:
/**
 * Gateway on {@code /api/conditional} that answers 200 OK or 202 Accepted depending on the
 * reply payload.
 *
 * <p>{@code statusCodeExpression(...)} is evaluated against the incoming request, not the
 * reply, so it cannot inspect {@code payload.isSuccess()}. For a gateway the response
 * status is taken from the {@code http_statusCode} header of the reply message, so the
 * SpEL expression is applied as a header after the service has produced its result.
 */
@Bean
public IntegrationFlow customStatusEndpoint() {
    return IntegrationFlow
            .from(WebFlux.inboundGateway("/api/conditional"))
            .handle("conditionalService", "process")
            // Evaluated for every reply; the payload here is the service result.
            .enrichHeaders(h -> h
                    .headerExpression("http_statusCode",
                            "payload.isSuccess() ? T(org.springframework.http.HttpStatus).OK "
                                    + ": T(org.springframework.http.HttpStatus).ACCEPTED"))
            .get();
}

Control whether to extract the payload from the reply message or send the entire Message.
/**
* Set whether to extract the payload from the reply message for the HTTP response.
* Default is true.
* @param extractReplyPayload - true to extract payload, false to use entire Message
* @return Spec for method chaining
*/
public WebFluxInboundEndpointSpec extractReplyPayload(boolean extractReplyPayload)

Usage Example:
@Bean
public IntegrationFlow fullMessageResponseEndpoint() {
return IntegrationFlow
.from(WebFlux.inboundGateway("/api/full-message")
.extractReplyPayload(false)) // Send full Message with headers
.handle("messageService", "processWithHeaders")
.get();
}For advanced use cases, you can configure WebFluxInboundEndpoint beans directly without the DSL.
package org.springframework.integration.webflux.inbound;
/**
* Direct bean configuration for WebFluxInboundEndpoint.
*/
public class WebFluxInboundEndpoint extends BaseHttpInboundEndpoint
implements WebHandler {
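/**
* Create an inbound endpoint with the default reply behaviour.
* NOTE(review): the upstream no-arg constructor appears to delegate to this(true), i.e. it
* creates a gateway that expects a reply rather than a channel adapter - confirm against
* the Spring Integration version in use.
*/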
public WebFluxInboundEndpoint();
/**
* Create an inbound gateway or adapter.
* @param expectReply - true for gateway, false for adapter
*/
public WebFluxInboundEndpoint(boolean expectReply);
// Configuration methods
public void setCodecConfigurer(ServerCodecConfigurer codecConfigurer);
public void setRequestedContentTypeResolver(
RequestedContentTypeResolver requestedContentTypeResolver);
public void setReactiveAdapterRegistry(ReactiveAdapterRegistry reactiveAdapterRegistry);
// Runtime method - implements WebHandler
public Mono<Void> handle(ServerWebExchange exchange);
}

Usage Example:
import org.springframework.integration.http.inbound.RequestMapping;
import org.springframework.messaging.MessageChannel;
/**
 * Declares a {@code WebFluxInboundEndpoint} gateway bean directly, without the DSL.
 *
 * <p>The endpoint has no path setter of its own; the path pattern is part of the
 * {@code RequestMapping}, alongside the HTTP method and consumed media type.
 *
 * @param requestChannel channel that receives the messages built from HTTP requests
 * @param codecConfigurer codecs used to read request bodies and write responses
 * @return gateway endpoint mapped to {@code POST /custom/endpoint} consuming JSON
 */
@Bean
public WebFluxInboundEndpoint customInboundEndpoint(
        MessageChannel requestChannel,
        ServerCodecConfigurer codecConfigurer) {
    WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint(true);
    endpoint.setRequestChannel(requestChannel);
    endpoint.setCodecConfigurer(codecConfigurer);

    RequestMapping mapping = new RequestMapping();
    mapping.setPathPatterns("/custom/endpoint");
    mapping.setMethods(HttpMethod.POST);
    mapping.setConsumes("application/json");
    endpoint.setRequestMapping(mapping);
    return endpoint;
}

Inbound endpoints seamlessly work with reactive types in your integration flow.
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@Bean
public IntegrationFlow monoResponseEndpoint() {
return IntegrationFlow
.from(WebFlux.inboundGateway("/api/mono-response"))
.handle((payload, headers) -> {
// Return Mono for async processing
return Mono.fromCallable(() -> processAsync(payload))
.subscribeOn(Schedulers.boundedElastic());
})
.get();
}import reactor.core.publisher.Flux;
import java.time.Duration;
/**
 * Streams ten {@code DataEvent}s, one per second, from {@code GET /api/flux-response}.
 *
 * <p>The response is produced as newline-delimited JSON ({@code application/x-ndjson}).
 * The older {@code application/stream+json} media type is deprecated in Spring Framework
 * in favour of NDJSON.
 */
@Bean
public IntegrationFlow fluxResponseEndpoint() {
    return IntegrationFlow
            .from(WebFlux.inboundGateway("/api/flux-response")
                    .requestMapping(m -> m
                            .methods(HttpMethod.GET)
                            .produces("application/x-ndjson")))
            .handle((payload, headers) -> {
                // Return Flux for streaming response
                return Flux.interval(Duration.ofSeconds(1))
                        .map(i -> new DataEvent(i, "event-" + i))
                        .take(10);
            })
            .get();
}
@Bean
public IntegrationFlow reactiveErrorEndpoint() {
return IntegrationFlow
.from(WebFlux.inboundGateway("/api/reactive-error"))
.handle((payload, headers) -> {
return Mono.just(payload)
.flatMap(data -> {
if (data.isValid()) {
return Mono.just(processData(data));
} else {
return Mono.error(new ValidationException("Invalid data"));
}
})
.onErrorResume(ValidationException.class, ex ->
Mono.just(ErrorResponse.from(ex)));
})
.get();
}WebFlux inbound endpoints automatically handle form data and multipart requests.
import org.springframework.util.MultiValueMap;
/**
 * Handles {@code application/x-www-form-urlencoded} POSTs on {@code /api/form}.
 *
 * <p>No payload expression is configured: {@code #requestParams} exposes the query-string
 * parameters, not the form body, so it would be empty for a form POST. For this content
 * type the endpoint already delivers the decoded form fields as a {@code MultiValueMap}
 * payload.
 */
@Bean
public IntegrationFlow formDataEndpoint() {
    return IntegrationFlow
            .from(WebFlux.inboundGateway("/api/form")
                    .requestMapping(m -> m
                            .methods(HttpMethod.POST)
                            .consumes("application/x-www-form-urlencoded")))
            .handle((payload, headers) -> {
                @SuppressWarnings("unchecked") // payload is the decoded form data for this content type
                MultiValueMap<String, String> formData =
                        (MultiValueMap<String, String>) payload;
                String name = formData.getFirst("name");
                String email = formData.getFirst("email");
                return processFormData(name, email);
            })
            .get();
}
import org.springframework.http.codec.multipart.FilePart;
import org.springframework.http.codec.multipart.Part;
import org.springframework.core.io.buffer.DataBuffer;
@Bean
public IntegrationFlow fileUploadEndpoint() {
return IntegrationFlow
.from(WebFlux.inboundGateway("/api/upload")
.requestMapping(m -> m
.methods(HttpMethod.POST)
.consumes("multipart/form-data")))
.handle((payload, headers) -> {
MultiValueMap<String, Part> parts = (MultiValueMap<String, Part>) payload;
FilePart filePart = (FilePart) parts.getFirst("file");
return filePart.content()
.reduce(DataBuffer::write)
.map(dataBuffer -> saveFile(dataBuffer));
})
.get();
}Access path variables and query parameters using SpEL expressions.
/**
 * Gateway on {@code /api/users/{userId}/orders/{orderId}} that uses the path-variable map
 * as the message payload and looks up the order from its two entries.
 */
@Bean
public IntegrationFlow pathVariableEndpoint() {
    var orderLookupGateway = WebFlux.inboundGateway("/api/users/{userId}/orders/{orderId}")
            .payloadExpression("#pathVariables");

    return IntegrationFlow
            .from(orderLookupGateway)
            .handle((payload, headers) -> {
                Map<String, String> uriVariables = (Map<String, String>) payload;
                return fetchOrder(uriVariables.get("userId"), uriVariables.get("orderId"));
            })
            .get();
}
@Bean
public IntegrationFlow queryParamEndpoint() {
return IntegrationFlow
.from(WebFlux.inboundGateway("/api/search")
.requestMapping(m -> m.methods(HttpMethod.GET))
.payloadExpression("#requestParams.query"))
.handle("searchService", "search")
.get();
}Configure error handling for inbound endpoints.
// Gateway on /api/safe: converts the request body to Request, calls riskyService.process,
// then post-processes the result in a second handler.
// NOTE(review): the second handler only sees what riskyService.process *returns*. An exception
// that is thrown there presumably propagates back to the gateway instead of arriving here as
// a payload - confirm, and consider an error channel if thrown exceptions must be mapped.
@Bean
public IntegrationFlow errorHandlingEndpoint() {
return IntegrationFlow
.from(WebFlux.inboundGateway("/api/safe")
.requestPayloadType(Request.class))
.handle("riskyService", "process")
.handle((payload, headers) -> {
// Only reached with an Exception when the service returned one as its result.
if (payload instanceof Exception) {
return ErrorResponse.from((Exception) payload);
}
// Any other result is passed through unchanged as the reply.
return payload;
})
.get();
}import reactor.core.publisher.Mono;
@Bean
public IntegrationFlow reactiveErrorHandlingEndpoint() {
return IntegrationFlow
.from(WebFlux.inboundGateway("/api/reactive-safe")
.requestPayloadType(Request.class))
.handle((payload, headers) -> {
return Mono.just(payload)
.flatMap(request -> processRequest(request))
.onErrorResume(Exception.class, ex ->
Mono.just(ErrorResponse.from(ex)));
})
.get();
}@Bean
public IntegrationFlow errorChannelEndpoint() {
    // Route downstream failures to "errorChannel"; without an error channel on the gateway
    // they are returned to the HTTP caller directly and no error-handling flow is involved.
    return IntegrationFlow
            .from(WebFlux.inboundGateway("/api/error-channel")
                    .requestPayloadType(Request.class)
                    .errorChannel("errorChannel"))
            .channel("processingChannel")
            .get();
}
@Bean
public IntegrationFlow errorHandlerFlow() {
return IntegrationFlow
.from("errorChannel")
.handle((payload, headers) -> {
Exception ex = (Exception) payload;
return ErrorResponse.from(ex);
})
.get();
}package org.springframework.integration.webflux.dsl;
/**
* Spec builder for configuring WebFlux inbound endpoints.
*/
public class WebFluxInboundEndpointSpec
extends HttpInboundEndpointSupportSpec<WebFluxInboundEndpointSpec,
WebFluxInboundEndpoint> {
// All configuration methods as documented above
}