or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

dsl-api.mdinbound-endpoints.mdindex.mdoutbound-endpoints.mdxml-configuration.md
tile.json

inbound-endpoints.mddocs/

Inbound Endpoints

Inbound endpoints receive HTTP requests reactively and route them to Spring Integration message channels. They act as the server-side component, exposing HTTP endpoints that trigger integration flows.

Key Information for Agents

Required Dependencies:

  • spring-webflux must be on classpath for inbound endpoints to work
  • spring-integration-core is required
  • Spring WebFlux infrastructure must be configured (RouterFunction or WebHandler chain)

Default Behaviors:

  • expectReply=true by default for gateways (request-reply pattern)
  • expectReply=false by default for channel adapters (fire-and-forget)
  • Request channel is required; reply channel is optional (uses temporary channel if not set)
  • RequestMapping must be configured or endpoint won't be registered
  • Default codecs: JSON, XML, String, Form (via ServerCodecConfigurer defaults)
  • No CORS by default (must be explicitly configured)
  • No validation by default (must be explicitly configured)
  • extractReplyPayload=true by default (only payload in response, not full Message)
  • Default status code: 200 OK

Threading Model:

  • Requests handled on reactive scheduler threads (non-blocking)
  • Fully reactive - no thread blocking during request/response processing
  • Backpressure is properly handled through reactive streams
  • Reactor Context is propagated through message headers

Lifecycle:

  • Endpoints implement WebHandler and integrate with WebFlux's handler chain
  • Endpoints registered automatically when bean is initialized
  • autoStartup=true by default
  • Graceful shutdown supported through lifecycle callbacks

Exceptions:

  • WebExchangeBindException - validation failures
  • HttpMessageNotReadableException - request body conversion failures
  • HttpMessageNotWritableException - response body conversion failures
  • WebClientException - WebFlux infrastructure errors
  • ServerWebExchangeException - exchange processing errors

Edge Cases:

  • If expectReply=false and no reply channel set, gateway returns immediately with 202 Accepted
  • Empty request body: payload is null or empty Mono
  • Empty response body: returns 204 No Content
  • Publisher payloads (Flux/Mono) require proper reactive type handling
  • Path variables not provided: IllegalArgumentException at runtime
  • Missing required headers/params: 400 Bad Request

Overview

Spring Integration WebFlux provides two types of inbound endpoints:

  • Inbound Channel Adapters - One-way endpoints that accept HTTP requests but don't send HTTP responses (fire-and-forget pattern)
  • Inbound Gateways - Request-reply endpoints that accept HTTP requests and return HTTP responses

Both types are fully reactive, using Spring WebFlux for non-blocking request handling.

Capabilities

Creating Inbound Gateways

Create inbound gateways using the WebFlux DSL factory methods.

/**
 * Creates an inbound gateway that receives HTTP requests and sends responses.
 * @param path - One or more path patterns to match (e.g., "/api/users", "/api/**")
 * @return WebFluxInboundEndpointSpec for fluent configuration
 */
public static WebFluxInboundEndpointSpec inboundGateway(String... path)

Usage Example:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.webflux.dsl.WebFlux;
import org.springframework.http.HttpMethod;

@Configuration
public class InboundGatewayConfig {

    @Bean
    public IntegrationFlow orderProcessingEndpoint() {
        return IntegrationFlow
            .from(WebFlux.inboundGateway("/api/orders")
                .requestMapping(m -> m
                    .methods(HttpMethod.POST)
                    .consumes("application/json")
                    .produces("application/json"))
                .requestPayloadType(Order.class))
            .handle("orderService", "createOrder")
            .get();
    }
}

Creating Inbound Channel Adapters

Create inbound channel adapters for one-way message flows without HTTP responses.

/**
 * Creates an inbound channel adapter that receives HTTP requests without sending responses.
 * Useful for webhooks, notifications, and fire-and-forget patterns.
 * @param path - One or more path patterns to match
 * @return WebFluxInboundEndpointSpec for fluent configuration
 */
public static WebFluxInboundEndpointSpec inboundChannelAdapter(String... path)

Usage Example:

@Bean
public IntegrationFlow webhookEndpoint() {
    return IntegrationFlow
        .from(WebFlux.inboundChannelAdapter("/webhooks/notification")
            .requestMapping(m -> m.methods(HttpMethod.POST))
            .requestPayloadType(WebhookEvent.class))
        .channel("webhookProcessingChannel")
        .get();
}

Request Mapping Configuration

Configure detailed request mapping including HTTP methods, paths, headers, and content types.

/**
 * Configure request mapping for the inbound endpoint.
 * @param requestMappingConfigurer - Consumer to configure RequestMapping
 * @return Spec for method chaining
 */
public WebFluxInboundEndpointSpec requestMapping(
    Consumer<RequestMappingSpec> requestMappingConfigurer)

/**
 * RequestMappingSpec provides methods to configure request matching.
 */
interface RequestMappingSpec {
    RequestMappingSpec methods(HttpMethod... methods);
    RequestMappingSpec headers(String... headers);
    RequestMappingSpec consumes(String... mediaTypes);
    RequestMappingSpec produces(String... mediaTypes);
    RequestMappingSpec params(String... params);
}

Usage Example:

@Bean
public IntegrationFlow restrictedEndpoint() {
    return IntegrationFlow
        .from(WebFlux.inboundGateway("/api/admin/users")
            .requestMapping(m -> m
                .methods(HttpMethod.GET, HttpMethod.POST)
                .headers("X-Admin-Token")
                .consumes("application/json")
                .produces("application/json")
                .params("active=true")))
        .handle("adminService", "manageUsers")
        .get();
}

CORS Configuration

Configure Cross-Origin Resource Sharing (CORS) for inbound endpoints.

/**
 * Configure CORS settings for the inbound endpoint.
 * @param corsConfigurer - Consumer to configure CORS
 * @return Spec for method chaining
 */
public WebFluxInboundEndpointSpec crossOrigin(
    Consumer<CrossOriginSpec> corsConfigurer)

/**
 * CrossOriginSpec provides methods to configure CORS.
 */
interface CrossOriginSpec {
    CrossOriginSpec origin(String... origins);
    CrossOriginSpec allowedHeaders(String... headers);
    CrossOriginSpec exposedHeaders(String... headers);
    CrossOriginSpec method(RequestMethod... methods);
    CrossOriginSpec allowCredentials(boolean allowCredentials);
    CrossOriginSpec maxAge(long maxAge);
}

Usage Example:

import org.springframework.web.bind.annotation.RequestMethod;

@Bean
public IntegrationFlow publicApiEndpoint() {
    return IntegrationFlow
        .from(WebFlux.inboundGateway("/api/public/data")
            .crossOrigin(cors -> cors
                .origin("https://example.com", "https://app.example.com")
                .allowedHeaders("*")
                .exposedHeaders("X-Custom-Header")
                .method(RequestMethod.GET, RequestMethod.POST)
                .allowCredentials(true)
                .maxAge(3600)))
        .handle("dataService", "getData")
        .get();
}

Request Payload Type Configuration

Specify the Java type for automatic request body deserialization.

/**
 * Set the expected request payload type for deserialization.
 * @param requestPayloadType - Java class to deserialize request body into
 * @return Spec for method chaining
 */
public WebFluxInboundEndpointSpec requestPayloadType(Class<?> requestPayloadType)

/**
 * Set the request payload type using ResolvableType for generic types.
 * @param requestPayloadType - ResolvableType for complex generic types
 * @return Spec for method chaining
 */
public WebFluxInboundEndpointSpec requestPayloadType(ResolvableType requestPayloadType)

Usage Example:

import org.springframework.core.ResolvableType;

@Bean
public IntegrationFlow typedRequestEndpoint() {
    return IntegrationFlow
        .from(WebFlux.inboundGateway("/api/users")
            .requestMapping(m -> m.methods(HttpMethod.POST))
            .requestPayloadType(User.class))
        .transform(User::validate)
        .handle("userService", "createUser")
        .get();
}

// For generic types
@Bean
public IntegrationFlow genericRequestEndpoint() {
    return IntegrationFlow
        .from(WebFlux.inboundGateway("/api/items")
            .requestPayloadType(
                ResolvableType.forClassWithGenerics(List.class, Item.class)))
        .handle("itemService", "processItems")
        .get();
}

Codec Configuration

Configure custom codecs for request/response serialization and deserialization.

/**
 * Set a custom ServerCodecConfigurer for encoding/decoding HTTP messages.
 * @param codecConfigurer - ServerCodecConfigurer instance
 * @return Spec for method chaining
 */
public WebFluxInboundEndpointSpec codecConfigurer(
    ServerCodecConfigurer codecConfigurer)

Usage Example:

import org.springframework.http.codec.ServerCodecConfigurer;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.http.codec.HttpMessageWriter;

@Bean
public ServerCodecConfigurer customCodecConfigurer() {
    ServerCodecConfigurer configurer = ServerCodecConfigurer.create();
    // Add custom codecs
    configurer.customCodecs().register(new CustomMessageReader());
    configurer.customCodecs().register(new CustomMessageWriter());
    return configurer;
}

@Bean
public IntegrationFlow customCodecEndpoint(ServerCodecConfigurer customCodecConfigurer) {
    return IntegrationFlow
        .from(WebFlux.inboundGateway("/api/custom")
            .codecConfigurer(customCodecConfigurer)
            .requestPayloadType(CustomMessage.class))
        .handle("customService", "process")
        .get();
}

Content Type Resolution

Configure custom content type resolution strategy.

/**
 * Set a custom RequestedContentTypeResolver for content negotiation.
 * @param requestedContentTypeResolver - RequestedContentTypeResolver instance
 * @return Spec for method chaining
 */
public WebFluxInboundEndpointSpec requestedContentTypeResolver(
    RequestedContentTypeResolver requestedContentTypeResolver)

Usage Example:

import org.springframework.web.reactive.accept.HeaderContentTypeResolver;
import org.springframework.http.MediaType;

@Bean
public RequestedContentTypeResolver customContentTypeResolver() {
    HeaderContentTypeResolver resolver = new HeaderContentTypeResolver();
    resolver.setSupportedMediaTypes(List.of(
        MediaType.APPLICATION_JSON,
        MediaType.APPLICATION_XML,
        MediaType.valueOf("application/vnd.company.v1+json")
    ));
    return resolver;
}

@Bean
public IntegrationFlow negotiatedEndpoint(
        RequestedContentTypeResolver customContentTypeResolver) {
    return IntegrationFlow
        .from(WebFlux.inboundGateway("/api/versioned")
            .requestedContentTypeResolver(customContentTypeResolver))
        .handle("versionedService", "process")
        .get();
}

Reactive Adapter Registry

Configure support for different reactive types (RxJava, Kotlin Coroutines, etc.).

/**
 * Set a custom ReactiveAdapterRegistry for reactive type conversions.
 * @param reactiveAdapterRegistry - ReactiveAdapterRegistry instance
 * @return Spec for method chaining
 */
public WebFluxInboundEndpointSpec reactiveAdapterRegistry(
    ReactiveAdapterRegistry reactiveAdapterRegistry)

Usage Example:

import org.springframework.core.ReactiveAdapterRegistry;

@Bean
public IntegrationFlow rxJavaEndpoint() {
    ReactiveAdapterRegistry registry = ReactiveAdapterRegistry.getSharedInstance();

    return IntegrationFlow
        .from(WebFlux.inboundGateway("/api/reactive")
            .reactiveAdapterRegistry(registry))
        .handle((payload, headers) -> {
            // Can return RxJava Single, Completable, etc.
            return Single.just(processPayload(payload));
        })
        .get();
}

Header Mapping

Control which HTTP headers are mapped to/from Spring Integration message headers.

/**
 * Set custom header mapper for HTTP header mapping.
 * @param headerMapper - HeaderMapper implementation
 * @return Spec for method chaining
 */
public WebFluxInboundEndpointSpec headerMapper(HeaderMapper<HttpHeaders> headerMapper)

/**
 * Specify which request headers to map to message headers.
 * Supports patterns like "X-*", "!X-Secret-*", etc.
 * @param mappedRequestHeaders - Header name patterns
 * @return Spec for method chaining
 */
public WebFluxInboundEndpointSpec mappedRequestHeaders(String... mappedRequestHeaders)

/**
 * Specify which message headers to map to response headers.
 * @param mappedResponseHeaders - Header name patterns
 * @return Spec for method chaining
 */
public WebFluxInboundEndpointSpec mappedResponseHeaders(String... mappedResponseHeaders)

Usage Example:

@Bean
public IntegrationFlow headerMappingEndpoint() {
    return IntegrationFlow
        .from(WebFlux.inboundGateway("/api/tracked")
            .mappedRequestHeaders(
                "X-Request-ID",
                "X-User-*",     // All headers starting with X-User-
                "!X-User-Secret" // Except X-User-Secret
            )
            .mappedResponseHeaders("X-Response-ID", "X-Processing-Time"))
        .enrichHeaders(h -> h
            .header("X-Response-ID", UUID.randomUUID().toString())
            .header("X-Processing-Time", System.currentTimeMillis()))
        .handle("trackedService", "process")
        .get();
}

Payload Expression

Use SpEL expressions to extract or transform the request payload.

/**
 * Set a SpEL expression to evaluate against the HTTP request for payload extraction.
 * @param payloadExpression - SpEL expression string
 * @return Spec for method chaining
 */
public WebFluxInboundEndpointSpec payloadExpression(String payloadExpression)

Usage Example:

@Bean
public IntegrationFlow payloadExpressionEndpoint() {
    return IntegrationFlow
        .from(WebFlux.inboundGateway("/api/custom-payload")
            // Extract only specific fields from request
            .payloadExpression("#requestParams.id + ':' + #requestParams.name"))
        .handle("customService", "processPayloadString")
        .get();
}

Validation

Enable automatic payload validation using JSR-303 Bean Validation.

/**
 * Set a Validator for request payload validation.
 * @param validator - Validator instance
 * @return Spec for method chaining
 */
public WebFluxInboundEndpointSpec validator(Validator validator)

Usage Example:

import jakarta.validation.Validator;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Email;

public class UserRequest {
    @NotNull
    private String name;

    @Email
    private String email;

    // getters/setters
}

@Bean
public IntegrationFlow validatedEndpoint(Validator validator) {
    return IntegrationFlow
        .from(WebFlux.inboundGateway("/api/users")
            .requestPayloadType(UserRequest.class)
            .validator(validator))
        .handle("userService", "createUser")
        .get();
}

Status Code Expression

Customize HTTP status codes in responses using SpEL expressions.

/**
 * Set a SpEL expression to determine the HTTP status code for responses.
 * @param statusCodeExpression - SpEL expression evaluating to HttpStatus
 * @return Spec for method chaining
 */
public WebFluxInboundEndpointSpec statusCodeExpression(String statusCodeExpression)

Usage Example:

@Bean
public IntegrationFlow customStatusEndpoint() {
    return IntegrationFlow
        .from(WebFlux.inboundGateway("/api/conditional")
            .statusCodeExpression(
                "payload.isSuccess() ? T(org.springframework.http.HttpStatus).OK " +
                ": T(org.springframework.http.HttpStatus).ACCEPTED"))
        .handle("conditionalService", "process")
        .get();
}

Extract Reply Payload

Control whether to extract the payload from the reply message or send the entire Message.

/**
 * Set whether to extract the payload from the reply message for the HTTP response.
 * Default is true.
 * @param extractReplyPayload - true to extract payload, false to use entire Message
 * @return Spec for method chaining
 */
public WebFluxInboundEndpointSpec extractReplyPayload(boolean extractReplyPayload)

Usage Example:

@Bean
public IntegrationFlow fullMessageResponseEndpoint() {
    return IntegrationFlow
        .from(WebFlux.inboundGateway("/api/full-message")
            .extractReplyPayload(false)) // Send full Message with headers
        .handle("messageService", "processWithHeaders")
        .get();
}

Direct Bean Configuration

For advanced use cases, you can configure WebFluxInboundEndpoint beans directly without the DSL.

package org.springframework.integration.webflux.inbound;

/**
 * Direct bean configuration for WebFluxInboundEndpoint.
 */
public class WebFluxInboundEndpoint extends BaseHttpInboundEndpoint
                                     implements WebHandler {
    /**
     * Create an inbound channel adapter (no reply expected).
     */
    public WebFluxInboundEndpoint();

    /**
     * Create an inbound gateway or adapter.
     * @param expectReply - true for gateway, false for adapter
     */
    public WebFluxInboundEndpoint(boolean expectReply);

    // Configuration methods
    public void setCodecConfigurer(ServerCodecConfigurer codecConfigurer);
    public void setRequestedContentTypeResolver(
        RequestedContentTypeResolver requestedContentTypeResolver);
    public void setReactiveAdapterRegistry(ReactiveAdapterRegistry reactiveAdapterRegistry);

    // Runtime method - implements WebHandler
    public Mono<Void> handle(ServerWebExchange exchange);
}

Usage Example:

import org.springframework.integration.http.inbound.RequestMapping;
import org.springframework.messaging.MessageChannel;

@Bean
public WebFluxInboundEndpoint customInboundEndpoint(
        MessageChannel requestChannel,
        ServerCodecConfigurer codecConfigurer) {
    WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint(true);
    endpoint.setRequestChannel(requestChannel);
    endpoint.setPath("/custom/endpoint");
    endpoint.setCodecConfigurer(codecConfigurer);

    RequestMapping mapping = new RequestMapping();
    mapping.setMethods(HttpMethod.POST);
    mapping.setConsumes("application/json");
    endpoint.setRequestMapping(mapping);

    return endpoint;
}

Working with Reactive Types

Inbound endpoints seamlessly work with reactive types in your integration flow.

Returning Mono

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

@Bean
public IntegrationFlow monoResponseEndpoint() {
    return IntegrationFlow
        .from(WebFlux.inboundGateway("/api/mono-response"))
        .handle((payload, headers) -> {
            // Return Mono for async processing
            return Mono.fromCallable(() -> processAsync(payload))
                .subscribeOn(Schedulers.boundedElastic());
        })
        .get();
}

Returning Flux

import reactor.core.publisher.Flux;
import java.time.Duration;

@Bean
public IntegrationFlow fluxResponseEndpoint() {
    return IntegrationFlow
        .from(WebFlux.inboundGateway("/api/flux-response")
            .requestMapping(m -> m
                .methods(HttpMethod.GET)
                .produces("application/stream+json")))
        .handle((payload, headers) -> {
            // Return Flux for streaming response
            return Flux.interval(Duration.ofSeconds(1))
                .map(i -> new DataEvent(i, "event-" + i))
                .take(10);
        })
        .get();
}

Handling Reactive Errors

@Bean
public IntegrationFlow reactiveErrorEndpoint() {
    return IntegrationFlow
        .from(WebFlux.inboundGateway("/api/reactive-error"))
        .handle((payload, headers) -> {
            return Mono.just(payload)
                .flatMap(data -> {
                    if (data.isValid()) {
                        return Mono.just(processData(data));
                    } else {
                        return Mono.error(new ValidationException("Invalid data"));
                    }
                })
                .onErrorResume(ValidationException.class, ex ->
                    Mono.just(ErrorResponse.from(ex)));
        })
        .get();
}

Form Data and Multipart Handling

WebFlux inbound endpoints automatically handle form data and multipart requests.

Form Data

import org.springframework.util.MultiValueMap;

@Bean
public IntegrationFlow formDataEndpoint() {
    return IntegrationFlow
        .from(WebFlux.inboundGateway("/api/form")
            .requestMapping(m -> m
                .methods(HttpMethod.POST)
                .consumes("application/x-www-form-urlencoded"))
            .payloadExpression("#requestParams"))
        .handle((payload, headers) -> {
            MultiValueMap<String, String> formData =
                (MultiValueMap<String, String>) payload;
            String name = formData.getFirst("name");
            String email = formData.getFirst("email");
            return processFormData(name, email);
        })
        .get();
}

Multipart File Upload

import org.springframework.http.codec.multipart.FilePart;
import org.springframework.http.codec.multipart.Part;
import org.springframework.core.io.buffer.DataBuffer;

@Bean
public IntegrationFlow fileUploadEndpoint() {
    return IntegrationFlow
        .from(WebFlux.inboundGateway("/api/upload")
            .requestMapping(m -> m
                .methods(HttpMethod.POST)
                .consumes("multipart/form-data")))
        .handle((payload, headers) -> {
            MultiValueMap<String, Part> parts = (MultiValueMap<String, Part>) payload;
            FilePart filePart = (FilePart) parts.getFirst("file");
            return filePart.content()
                .reduce(DataBuffer::write)
                .map(dataBuffer -> saveFile(dataBuffer));
        })
        .get();
}

Path Variables and Query Parameters

Access path variables and query parameters using SpEL expressions.

@Bean
public IntegrationFlow pathVariableEndpoint() {
    return IntegrationFlow
        .from(WebFlux.inboundGateway("/api/users/{userId}/orders/{orderId}")
            .payloadExpression("#pathVariables"))
        .handle((payload, headers) -> {
            Map<String, String> pathVars = (Map<String, String>) payload;
            String userId = pathVars.get("userId");
            String orderId = pathVars.get("orderId");
            return fetchOrder(userId, orderId);
        })
        .get();
}

@Bean
public IntegrationFlow queryParamEndpoint() {
    return IntegrationFlow
        .from(WebFlux.inboundGateway("/api/search")
            .requestMapping(m -> m.methods(HttpMethod.GET))
            .payloadExpression("#requestParams.query"))
        .handle("searchService", "search")
        .get();
}

Error Handling

Configure error handling for inbound endpoints.

Basic Error Handling

@Bean
public IntegrationFlow errorHandlingEndpoint() {
    return IntegrationFlow
        .from(WebFlux.inboundGateway("/api/safe")
            .requestPayloadType(Request.class))
        .handle("riskyService", "process")
        .handle((payload, headers) -> {
            if (payload instanceof Exception) {
                return ErrorResponse.from((Exception) payload);
            }
            return payload;
        })
        .get();
}

Reactive Error Handling

import reactor.core.publisher.Mono;

@Bean
public IntegrationFlow reactiveErrorHandlingEndpoint() {
    return IntegrationFlow
        .from(WebFlux.inboundGateway("/api/reactive-safe")
            .requestPayloadType(Request.class))
        .handle((payload, headers) -> {
            return Mono.just(payload)
                .flatMap(request -> processRequest(request))
                .onErrorResume(Exception.class, ex ->
                    Mono.just(ErrorResponse.from(ex)));
        })
        .get();
}

Error Channel

@Bean
public IntegrationFlow errorChannelEndpoint() {
    return IntegrationFlow
        .from(WebFlux.inboundGateway("/api/error-channel")
            .requestPayloadType(Request.class))
        .channel("processingChannel")
        .get();
}

@Bean
public IntegrationFlow errorHandlerFlow() {
    return IntegrationFlow
        .from("errorChannel")
        .handle((payload, headers) -> {
            Exception ex = (Exception) payload;
            return ErrorResponse.from(ex);
        })
        .get();
}

Types

package org.springframework.integration.webflux.dsl;

/**
 * Spec builder for configuring WebFlux inbound endpoints.
 */
public class WebFluxInboundEndpointSpec
       extends HttpInboundEndpointSupportSpec<WebFluxInboundEndpointSpec,
                                               WebFluxInboundEndpoint> {
    // All configuration methods as documented above
}