Starter for building WebFlux applications using Spring Framework's Reactive Web support
—
WebClient is Spring WebFlux's reactive HTTP client, providing a modern, non-blocking alternative to RestTemplate. It supports reactive streams, backpressure, and integrates seamlessly with the reactive ecosystem for building scalable applications.
public interface WebClient {
// Signature-only summary of the WebClient members: bodies are omitted (even for the
// static factories), so this listing is reference material rather than compilable source.

// Request entry points. Each returns a spec object on which the request is configured;
// the examples in this document chain get()/post()/put()/delete() with uri(...) and retrieve().
RequestHeadersUriSpec<?> get();
RequestBodyUriSpec post();
RequestBodyUriSpec put();
RequestBodyUriSpec patch();
RequestHeadersUriSpec<?> delete();
RequestHeadersUriSpec<?> options();
RequestHeadersUriSpec<?> head();
// Entry point taking the HTTP method as an argument instead of in the method name.
RequestBodyUriSpec method(HttpMethod method);
// Returns a Builder (presumably pre-populated from this client's settings — confirm).
Builder mutate();
// Static factories: a client directly, a client for a base URL, or a Builder to configure.
static WebClient create();
static WebClient create(String baseUrl);
static Builder builder();
// Fluent configuration API: every method except build() returns Builder so calls can be
// chained, as the example services in this document do before calling build().
interface Builder {
Builder baseUrl(String baseUrl);
Builder defaultUriVariables(Map<String, ?> defaultUriVariables);
Builder uriBuilderFactory(UriBuilderFactory uriBuilderFactory);
// Single default header / cookie with one or more values.
Builder defaultHeader(String header, String... values);
// Callback variants that receive the mutable header / cookie collections.
Builder defaultHeaders(Consumer<HttpHeaders> headersConsumer);
Builder defaultCookie(String cookie, String... values);
Builder defaultCookies(Consumer<MultiValueMap<String, String>> cookiesConsumer);
Builder defaultRequest(Consumer<RequestHeadersSpec<?>> defaultRequest);
// Registers one filter, or exposes the whole filter list to a callback.
Builder filter(ExchangeFilterFunction filter);
Builder filters(Consumer<List<ExchangeFilterFunction>> filtersConsumer);
Builder clientConnector(ClientHttpConnector connector);
Builder codecs(Consumer<ClientCodecConfigurer> configurer);
Builder exchangeStrategies(ExchangeStrategies strategies);
Builder exchangeFunction(ExchangeFunction exchangeFunction);
Builder clone();
// Terminal step: produces the configured WebClient.
WebClient build();
}
}public interface RequestHeadersUriSpec<S extends RequestHeadersSpec<S>> extends UriSpec<S> {
}
/**
 * URI-selection step of a request specification. Every overload returns the self type
 * {@code S}, so configuration continues on the same fluent chain.
 */
public interface UriSpec<S extends RequestHeadersSpec<S>> {
// URI template plus positional variables; used in this document as uri("/users/{id}", id).
S uri(String uri, Object... uriVariables);
// URI template plus variables supplied as a map (presumably keyed by variable name — confirm).
S uri(String uri, Map<String, ?> uriVariables);
// URI string plus a function that derives the final URI from a UriBuilder.
S uri(String uri, Function<UriBuilder, URI> uriFunction);
// An already-built URI.
S uri(URI uri);
}
public interface RequestHeadersSpec<S extends RequestHeadersSpec<S>> {
// Header/cookie/attribute configuration. Each of these returns the self type S so the
// call chain continues on the same spec.
S header(String headerName, String... headerValues);
S headers(Consumer<HttpHeaders> headersConsumer);
S accept(MediaType... acceptableMediaTypes);
S acceptCharset(Charset... acceptableCharsets);
S cookie(String name, String value);
S cookies(Consumer<MultiValueMap<String, String>> cookiesConsumer);
S ifModifiedSince(ZonedDateTime ifModifiedSince);
S ifNoneMatch(String... ifNoneMatches);
S attribute(String name, Object value);
S attributes(Consumer<Map<String, Object>> attributesConsumer);
S context(Function<Context, Context> contextModifier);
S httpRequest(Consumer<ClientHttpRequest> requestConsumer);
// Terminal steps. retrieve() yields a ResponseSpec for declaring how the body is
// extracted; it is the variant every example in this document uses.
ResponseSpec retrieve();
// Yields the raw ClientResponse.
// NOTE(review): recent Spring versions are believed to deprecate exchange() in favour of
// the exchangeToMono/exchangeToFlux variants below — confirm for the targeted version.
Mono<ClientResponse> exchange();
// Hand the ClientResponse to a handler function that produces the resulting Mono / Flux.
<V> Mono<V> exchangeToMono(Function<ClientResponse, ? extends Mono<V>> responseHandler);
<V> Flux<V> exchangeToFlux(Function<ClientResponse, ? extends Flux<V>> responseHandler);
}public interface RequestBodyUriSpec extends RequestBodySpec, RequestHeadersUriSpec<RequestBodySpec> {
}
public interface RequestBodySpec extends RequestHeadersSpec<RequestBodySpec> {
// Body-carrying step of a request specification. contentLength/contentType return
// RequestBodySpec so they can precede the body; every body method returns
// RequestHeadersSpec<?>, after which only header configuration and the terminal steps remain.
RequestBodySpec contentLength(long contentLength);
RequestBodySpec contentType(MediaType contentType);
// Body written by a BodyInserter; used in this document with BodyInserters.fromMultipartData(...).
RequestHeadersSpec<?> body(BodyInserter<?, ? super ClientHttpRequest> inserter);
// Body streamed from a Publisher, with the element type given as a Class or a type reference.
<T> RequestHeadersSpec<?> body(Publisher<T> publisher, Class<T> elementClass);
<T> RequestHeadersSpec<?> body(Publisher<T> publisher, ParameterizedTypeReference<T> elementTypeRef);
// Body from an arbitrary object plus its type. These two overloads also accept every call
// of the form body(T, Class<T>) / body(T, ParameterizedTypeReference<T>); separate generic
// overloads with those shapes would have the same erasure and could not be declared
// alongside them, so they are not listed.
RequestHeadersSpec<?> body(Object body, Class<?> bodyType);
RequestHeadersSpec<?> body(Object body, ParameterizedTypeReference<?> bodyType);
// Body from a ready value; used in this document as bodyValue(user).
RequestHeadersSpec<?> bodyValue(Object body);
// NOTE(review): syncBody is believed to be the deprecated predecessor of bodyValue —
// confirm it still exists in the targeted Spring version.
RequestHeadersSpec<?> syncBody(Object body);
}public interface ResponseSpec {
// Error mapping. Each onStatus/onRawStatus variant pairs a status match with a function
// that turns the ClientResponse into a Mono of the exception to raise, and returns
// ResponseSpec so several handlers can be chained before the body is extracted.
ResponseSpec onStatus(Predicate<HttpStatus> statusPredicate, Function<ClientResponse, Mono<? extends Throwable>> exceptionFunction);
ResponseSpec onRawStatus(IntPredicate statusCodePredicate, Function<ClientResponse, Mono<? extends Throwable>> exceptionFunction);
// NOTE(review): an overload taking a single HttpStatus is not known from the upstream
// API — confirm against the targeted Spring version.
ResponseSpec onStatus(HttpStatus statusCode, Function<ClientResponse, Mono<? extends Throwable>> exceptionFunction);
// Body extraction. These are the terminal calls of the fluent chain: they return the
// reactive type itself (Mono/Flux), as every example in this document relies on.
<T> Mono<T> bodyToMono(Class<T> elementClass);
<T> Mono<T> bodyToMono(ParameterizedTypeReference<T> elementTypeRef);
<T> Flux<T> bodyToFlux(Class<T> elementClass);
<T> Flux<T> bodyToFlux(ParameterizedTypeReference<T> elementTypeRef);
// Extraction wrapped in a ResponseEntity.
// NOTE(review): the body-less variant is believed to be named toBodilessEntity() upstream,
// and toEntityFlux to take an element type — confirm both signatures.
Mono<ResponseEntity<Void>> toEntity(Void unused);
<T> Mono<ResponseEntity<T>> toEntity(Class<T> bodyClass);
<T> Mono<ResponseEntity<T>> toEntity(ParameterizedTypeReference<T> bodyTypeReference);
<T> Mono<ResponseEntity<List<T>>> toEntityList(Class<T> elementClass);
<T> Mono<ResponseEntity<List<T>>> toEntityList(ParameterizedTypeReference<T> elementTypeRef);
Mono<ResponseEntity<Flux<DataBuffer>>> toEntityFlux();
}public interface ClientResponse {
// Status of the response, as an HttpStatus enum value and as a raw int.
HttpStatus statusCode();
int rawStatusCode();
// NOTE(review): the Builder below models cookies as MultiValueMap<String, ResponseCookie>
// while this accessor returns ResponseCookies — confirm which type is accurate.
ResponseCookies cookies();
// Body extraction, mirroring the methods of the same names listed for ResponseSpec.
<T> Mono<T> bodyToMono(Class<T> elementClass);
<T> Mono<T> bodyToMono(ParameterizedTypeReference<T> elementTypeRef);
<T> Flux<T> bodyToFlux(Class<T> elementClass);
<T> Flux<T> bodyToFlux(ParameterizedTypeReference<T> elementTypeRef);
// Extraction wrapped in a ResponseEntity.
Mono<ResponseEntity<Void>> toEntity(Void unused);
<T> Mono<ResponseEntity<T>> toEntity(Class<T> bodyClass);
<T> Mono<ResponseEntity<T>> toEntity(ParameterizedTypeReference<T> bodyTypeReference);
<T> Mono<ResponseEntity<List<T>>> toEntityList(Class<T> elementClass);
<T> Mono<ResponseEntity<List<T>>> toEntityList(ParameterizedTypeReference<T> elementTypeRef);
// Produces a WebClientResponseException describing this response.
Mono<WebClientResponseException> createException();
// Starts a Builder from an existing response.
static Builder from(ClientResponse other);
// Read-only view of the response headers.
// NOTE(review): no method in this listing returns Headers — confirm whether a headers()
// accessor was omitted from the summary.
interface Headers {
OptionalLong contentLength();
Optional<MediaType> contentType();
List<String> header(String headerName);
HttpHeaders asHttpHeaders();
}
// Fluent builder for a ClientResponse: every method except build() returns Builder.
interface Builder {
Builder statusCode(HttpStatus statusCode);
Builder rawStatusCode(int statusCode);
Builder header(String headerName, String... headerValues);
Builder headers(Consumer<HttpHeaders> headersConsumer);
Builder cookie(ResponseCookie cookie);
Builder cookies(Consumer<MultiValueMap<String, ResponseCookie>> cookiesConsumer);
// Body supplied either as a stream of data buffers or as a String.
Builder body(Flux<DataBuffer> body);
Builder body(String body);
Builder request(HttpRequest request);
ClientResponse build();
}
}@FunctionalInterface
public interface ExchangeFilterFunction {
// The single abstract method: receives the outgoing request and the next ExchangeFunction
// in the chain. The filters in this document's examples call next.exchange(request) to
// continue, optionally with a modified request.
Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next);
// Signature-only listing: the default and static methods below are shown without bodies.
default ExchangeFilterFunction andThen(ExchangeFilterFunction afterFilter);
// NOTE(review): requestProcessor()/responseProcessor() and their return types are not
// known from the upstream API — confirm against the targeted Spring version.
default RequestFilterFunction requestProcessor();
default ResponseFilterFunction responseProcessor();
// Adapters that build a filter from a function over only the request or only the response;
// ofRequestProcessor is used by the logging filter example in this document.
static ExchangeFilterFunction ofRequestProcessor(Function<ClientRequest, Mono<ClientRequest>> processor);
static ExchangeFilterFunction ofResponseProcessor(Function<ClientResponse, Mono<ClientResponse>> processor);
}public class ExchangeFilterFunctions {
// Static factories for ready-made filters (signature-only listing, bodies omitted).
// Basic authentication from a username and password, optionally with an explicit charset.
public static ExchangeFilterFunction basicAuthentication(String username, String password);
public static ExchangeFilterFunction basicAuthentication(String username, String password, Charset charset);
// Filter parameterised by a maximum byte count for the response.
public static ExchangeFilterFunction limitResponseSize(long maxByteCount);
// Filter that pairs a status predicate with a function producing the exception to raise.
public static ExchangeFilterFunction statusError(Predicate<HttpStatus> statusPredicate, Function<ClientResponse, ? extends Throwable> exceptionFunction);
}public interface ClientCodecConfigurer extends CodecConfigurer {
// Access to the client-side default codecs. The examples in this document call
// defaultCodecs().maxInMemorySize(...), which is not listed here (presumably inherited
// from DefaultCodecs — confirm).
ClientDefaultCodecs defaultCodecs();
// NOTE(review): a void clone(Consumer) is unusual; upstream clone() is believed to return
// a ClientCodecConfigurer — confirm this signature.
void clone(Consumer<ClientCodecConfigurer> consumer);
// Client-specific codec overrides; each setter takes the encoder or decoder to install.
interface ClientDefaultCodecs extends DefaultCodecs {
void jackson2JsonEncoder(Encoder<?> encoder);
void jackson2JsonDecoder(Decoder<?> decoder);
void protobufDecoder(Decoder<?> decoder);
// NOTE(review): returns void here, so nothing can be configured through it — confirm
// whether the upstream method returns a multipart codecs object.
void multipartCodecs();
void serverSentEventDecoder(Decoder<?> decoder);
}
}@Service
public class ApiService {
/** Client shared by every call; configured once in the constructor. */
private final WebClient webClient;

/**
 * Derives the client from the given builder, pointing it at
 * https://api.example.com and adding a "MyApp/1.0" User-Agent default header.
 *
 * @param builder builder the client is created from
 */
public ApiService(WebClient.Builder builder) {
    WebClient.Builder configured = builder.baseUrl("https://api.example.com");
    configured = configured.defaultHeader(HttpHeaders.USER_AGENT, "MyApp/1.0");
    this.webClient = configured.build();
}

/**
 * Issues GET /users/{id} and decodes the body as a single {@code User}.
 *
 * @param id value substituted for the {id} URI variable
 * @return the decoded user
 */
public Mono<User> getUser(String id) {
    WebClient.ResponseSpec response = webClient.get().uri("/users/{id}", id).retrieve();
    return response.bodyToMono(User.class);
}

/**
 * Issues GET /users and decodes the body as a stream of {@code User} elements.
 *
 * @return the decoded users
 */
public Flux<User> getAllUsers() {
    WebClient.ResponseSpec response = webClient.get().uri("/users").retrieve();
    return response.bodyToFlux(User.class);
}

/**
 * Issues POST /users with the user as a JSON body and decodes the response as a {@code User}.
 *
 * @param user payload sent as the request body
 * @return the user decoded from the response
 */
public Mono<User> createUser(User user) {
    WebClient.RequestBodySpec request = webClient.post()
            .uri("/users")
            .contentType(MediaType.APPLICATION_JSON);
    WebClient.ResponseSpec response = request.bodyValue(user).retrieve();
    return response.bodyToMono(User.class);
}

/**
 * Issues PUT /users/{id} with the user as a JSON body and decodes the response as a {@code User}.
 *
 * @param id   value substituted for the {id} URI variable
 * @param user payload sent as the request body
 * @return the user decoded from the response
 */
public Mono<User> updateUser(String id, User user) {
    WebClient.RequestBodySpec request = webClient.put()
            .uri("/users/{id}", id)
            .contentType(MediaType.APPLICATION_JSON);
    WebClient.ResponseSpec response = request.bodyValue(user).retrieve();
    return response.bodyToMono(User.class);
}

/**
 * Issues DELETE /users/{id}; the response body is requested as {@code Void}.
 *
 * @param id value substituted for the {id} URI variable
 * @return a Mono typed as Void
 */
public Mono<Void> deleteUser(String id) {
    WebClient.ResponseSpec response = webClient.delete().uri("/users/{id}", id).retrieve();
    return response.bodyToMono(Void.class);
}
}@Service
public class RobustApiService {
// NOTE(review): `log` is used below but not declared in this snippet; presumably an
// SLF4J-style logger provided by the surrounding project — confirm.
private final WebClient webClient;

/**
 * Derives the client from the given builder with https://api.example.com as base URL.
 *
 * @param builder builder the client is created from
 */
public RobustApiService(WebClient.Builder builder) {
    this.webClient = builder
        .baseUrl("https://api.example.com")
        .build();
}

/**
 * Fetches a user, translating HTTP errors into domain exceptions and retrying server errors.
 *
 * <p>4xx responses: 404 becomes {@code UserNotFoundException}; any other 4xx becomes an
 * {@code ApiException} carrying the response body. 5xx responses are left as
 * {@code WebClientResponseException} so the retry filter can recognise them: they are
 * logged, retried up to 3 times with backoff starting at 1 second, and only after the
 * retries are exhausted converted to {@code ApiException("Server error: <status>")}.
 *
 * @param id value substituted for the {id} URI variable
 * @return the decoded user, or an error signal as described above
 */
public Mono<User> getUser(String id) {
    return webClient.get()
        .uri("/users/{id}", id)
        .retrieve()
        .onStatus(HttpStatus::is4xxClientError, response -> {
            if (response.statusCode() == HttpStatus.NOT_FOUND) {
                return Mono.error(new UserNotFoundException("User not found: " + id));
            }
            // An empty Mono returned from an onStatus handler makes the response count as
            // successful, so substitute "" for a missing error body to guarantee an error.
            return response.bodyToMono(String.class)
                .defaultIfEmpty("")
                .flatMap(body -> Mono.error(new ApiException("Client error: " + body)));
        })
        // No 5xx onStatus handler here: mapping server errors to ApiException at this point
        // would hide them from the WebClientResponseException-based logging and retry below.
        .bodyToMono(User.class)
        .doOnError(WebClientResponseException.class, ex ->
            log.error("API call failed: {} {}", ex.getStatusCode(), ex.getResponseBodyAsString()))
        .retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
            .filter(throwable -> throwable instanceof WebClientResponseException &&
                ((WebClientResponseException) throwable).getStatusCode().is5xxServerError())
            // Surface the last real failure instead of a retry-exhausted wrapper so the
            // mapping below still sees a WebClientResponseException.
            .onRetryExhaustedThrow((spec, signal) -> signal.failure()))
        // Every 4xx was already converted above, so whatever reaches this point as a
        // WebClientResponseException is a server error.
        .onErrorMap(WebClientResponseException.class, ex ->
            new ApiException("Server error: " + ex.getStatusCode()));
}

/**
 * Fetches a user, reporting "not found" as an empty Optional instead of an error.
 *
 * <p>{@code WebClientResponseException.NotFound} yields {@code Optional.empty()}; any other
 * {@code WebClientResponseException} is converted to an {@code ApiException} built from
 * the original exception's message.
 *
 * @param id value substituted for the {id} URI variable
 * @return the user wrapped in an Optional, empty when the NotFound exception was raised
 */
public Mono<Optional<User>> getUserSafely(String id) {
    return webClient.get()
        .uri("/users/{id}", id)
        .retrieve()
        .bodyToMono(User.class)
        .map(Optional::of)
        // Must precede onErrorMap: NotFound is itself a WebClientResponseException.
        .onErrorReturn(WebClientResponseException.NotFound.class, Optional.empty())
        .onErrorMap(WebClientResponseException.class, ex ->
            new ApiException("Failed to fetch user: " + ex.getMessage()));
}
}@Configuration
public class WebClientConfiguration {
/**
 * Builds the application-wide client: base URL, default User-Agent header and session
 * cookie, three filters (registered in the order logging, authentication, retry), a 2 MB
 * in-memory codec limit with custom JSON codecs, and a Reactor Netty connector with a
 * 5000 ms connect timeout, a 30 s response timeout and redirect following.
 *
 * <p>NOTE(review): {@code log}, {@code customJsonEncoder()}, {@code customJsonDecoder()}
 * and {@code getAccessToken()} are not declared in this snippet; presumably provided by
 * the surrounding project — confirm.
 *
 * @param builder builder the client is created from
 * @return the configured client
 */
@Bean
public WebClient webClient(WebClient.Builder builder) {
    return builder
        .baseUrl("https://api.example.com")
        .defaultHeader(HttpHeaders.USER_AGENT, "MyApp/1.0")
        .defaultCookie("session", "abc123")
        .filter(loggingFilter())
        .filter(authenticationFilter())
        .filter(retryFilter())
        .codecs(configurer -> {
            configurer.defaultCodecs().maxInMemorySize(2 * 1024 * 1024); // 2MB
            configurer.defaultCodecs().jackson2JsonEncoder(customJsonEncoder());
            configurer.defaultCodecs().jackson2JsonDecoder(customJsonDecoder());
        })
        .clientConnector(new ReactorClientHttpConnector(
            HttpClient.create()
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
                .responseTimeout(Duration.ofSeconds(30))
                .followRedirect(true)
        ))
        .build();
}

/** Filter that logs the method and URL of each outgoing request and passes it on unchanged. */
private ExchangeFilterFunction loggingFilter() {
    return ExchangeFilterFunction.ofRequestProcessor(request -> {
        log.info("Sending {} request to {}", request.method(), request.url());
        return Mono.just(request);
    });
}

/** Filter that copies the request and adds an "Authorization: Bearer <token>" header. */
private ExchangeFilterFunction authenticationFilter() {
    return (request, next) -> {
        ClientRequest authorizedRequest = ClientRequest.from(request)
            .header(HttpHeaders.AUTHORIZATION, "Bearer " + getAccessToken())
            .build();
        return next.exchange(authorizedRequest);
    };
}

/**
 * Filter that turns 5xx responses into {@code ServerException} and retries the exchange
 * up to 3 times with backoff starting at 1 second.
 */
private ExchangeFilterFunction retryFilter() {
    return (request, next) -> next.exchange(request)
        .flatMap(response -> {
            if (response.statusCode().is5xxServerError()) {
                // The failed response is discarded, so its body must be released first;
                // otherwise the underlying connection is never returned to the pool.
                return response.releaseBody()
                    .then(Mono.<ClientResponse>error(new ServerException("Server error")));
            }
            return Mono.just(response);
        })
        .retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
            .filter(ServerException.class::isInstance));
}
}@Service
public class FileService {
/** Client for the file API; configured once in the constructor. */
private final WebClient webClient;

/**
 * Derives the client from the given builder with https://file-api.example.com as base
 * URL and the codecs' in-memory size limit set to -1.
 *
 * @param builder builder the client is created from
 */
public FileService(WebClient.Builder builder) {
    WebClient.Builder configured = builder.baseUrl("https://file-api.example.com");
    configured = configured.codecs(
            configurer -> configurer.defaultCodecs().maxInMemorySize(-1)); // Unlimited
    this.webClient = configured.build();
}

/**
 * Posts the file and its name to /upload as multipart form data.
 *
 * @param filename value sent as the "filename" part
 * @param file     resource sent as the "file" part
 * @return the response body decoded as a String
 */
public Mono<String> uploadFile(String filename, Resource file) {
    MultiValueMap<String, Object> formParts = new LinkedMultiValueMap<>();
    formParts.add("file", file);
    formParts.add("filename", filename);

    WebClient.RequestBodySpec request = webClient.post()
            .uri("/upload")
            .contentType(MediaType.MULTIPART_FORM_DATA);
    WebClient.ResponseSpec response =
            request.body(BodyInserters.fromMultipartData(formParts)).retrieve();
    return response.bodyToMono(String.class);
}

/**
 * Issues GET /files/{id} accepting application/octet-stream and decodes the body as a
 * single {@code Resource}.
 *
 * @param fileId value substituted for the {id} URI variable
 * @return the decoded resource
 */
public Mono<Resource> downloadFile(String fileId) {
    WebClient.ResponseSpec response = webClient.get()
            .uri("/files/{id}", fileId)
            .accept(MediaType.APPLICATION_OCTET_STREAM)
            .retrieve();
    return response.bodyToMono(Resource.class);
}

/**
 * Issues GET /files/{id}/stream accepting application/octet-stream and exposes the body
 * as a stream of {@code DataBuffer} elements.
 *
 * @param fileId value substituted for the {id} URI variable
 * @return the body as data buffers
 */
public Flux<DataBuffer> streamFile(String fileId) {
    WebClient.ResponseSpec response = webClient.get()
            .uri("/files/{id}/stream", fileId)
            .accept(MediaType.APPLICATION_OCTET_STREAM)
            .retrieve();
    return response.bodyToFlux(DataBuffer.class);
}

/**
 * Posts a stream of data buffers to /upload/stream as application/octet-stream, passing
 * the file name in the X-Filename header.
 *
 * @param filename value of the X-Filename header
 * @param fileData publisher supplying the request body
 * @return a Mono typed as Void
 */
public Mono<Void> uploadLargeFile(String filename, Flux<DataBuffer> fileData) {
    WebClient.RequestBodySpec request = webClient.post()
            .uri("/upload/stream")
            .contentType(MediaType.APPLICATION_OCTET_STREAM)
            .header("X-Filename", filename);
    WebClient.ResponseSpec response = request.body(fileData, DataBuffer.class).retrieve();
    return response.bodyToMono(Void.class);
}
}@Service
public class EventStreamService {
// NOTE(review): `log` is used below but not declared in this snippet; presumably an
// SLF4J-style logger provided by the surrounding project — confirm.
/** Client for the event API; configured once in the constructor. */
private final WebClient webClient;

/**
 * Derives the client from the given builder with https://events.example.com as base URL.
 *
 * @param builder builder the client is created from
 */
public EventStreamService(WebClient.Builder builder) {
    WebClient.Builder configured = builder.baseUrl("https://events.example.com");
    this.webClient = configured.build();
}

/**
 * Issues GET /events/{topic} accepting text/event-stream and decodes the body as
 * {@code ServerSentEvent<String>} elements, logging subscription, each event's data,
 * errors and completion.
 *
 * @param topic value substituted for the {topic} URI variable
 * @return the decoded events
 */
public Flux<ServerSentEvent<String>> subscribeToEvents(String topic) {
    ParameterizedTypeReference<ServerSentEvent<String>> eventType =
            new ParameterizedTypeReference<ServerSentEvent<String>>() {};
    WebClient.ResponseSpec response = webClient.get()
            .uri("/events/{topic}", topic)
            .accept(MediaType.TEXT_EVENT_STREAM)
            .retrieve();
    Flux<ServerSentEvent<String>> events = response.bodyToFlux(eventType);
    return events
            .doOnSubscribe(subscription -> log.info("Subscribed to events for topic: {}", topic))
            .doOnNext(event -> log.debug("Received event: {}", event.data()))
            .doOnError(throwable -> log.error("Error in event stream", throwable))
            .doOnComplete(() -> log.info("Event stream completed for topic: {}", topic));
}

/**
 * Issues GET /events/{topic}/simple accepting text/event-stream and decodes the body as
 * plain String elements.
 *
 * @param topic value substituted for the {topic} URI variable
 * @return the decoded elements
 */
public Flux<String> subscribeToSimpleEvents(String topic) {
    WebClient.ResponseSpec response = webClient.get()
            .uri("/events/{topic}/simple", topic)
            .accept(MediaType.TEXT_EVENT_STREAM)
            .retrieve();
    return response.bodyToFlux(String.class);
}
}@Service
public class OAuth2ApiService {
private final WebClient webClient;
private final OAuth2AuthorizedClientManager authorizedClientManager;

/**
 * Stores the client manager and derives a client for https://secure-api.example.com
 * whose requests pass through the OAuth2 filter.
 *
 * @param builder                 builder the client is created from
 * @param authorizedClientManager manager used to obtain the authorized client per request
 */
public OAuth2ApiService(WebClient.Builder builder,
                        OAuth2AuthorizedClientManager authorizedClientManager) {
    // Assigned first: the filter created below reads this field when a request is made.
    this.authorizedClientManager = authorizedClientManager;
    this.webClient = builder
        .baseUrl("https://secure-api.example.com")
        .filter(oauth2Filter())
        .build();
}

/**
 * Filter that obtains an authorized client, copies the request with an
 * "Authorization: Bearer <token>" header and continues the exchange with the copy.
 */
private ExchangeFilterFunction oauth2Filter() {
    return (request, next) -> authorizedClient()
        .map(client -> {
            String token = client.getAccessToken().getTokenValue();
            return ClientRequest.from(request)
                .header(HttpHeaders.AUTHORIZATION, "Bearer " + token)
                .build();
        })
        .flatMap(next::exchange);
}

/**
 * Authorizes the "my-client" registration for principal "user" on the bounded-elastic
 * scheduler.
 *
 * @return the authorized client; fails with {@code IllegalStateException} when the manager
 *         returns no client, so the calling filter cannot complete empty and silently
 *         skip the request
 */
private Mono<OAuth2AuthorizedClient> authorizedClient() {
    OAuth2AuthorizeRequest authorizeRequest = OAuth2AuthorizeRequest
        .withClientRegistrationId("my-client")
        .principal("user")
        .build();
    return Mono.fromCallable(() -> authorizedClientManager.authorize(authorizeRequest))
        .subscribeOn(Schedulers.boundedElastic())
        // fromCallable completes empty when the callable returns null; turn that into an
        // explicit error instead of letting the whole exchange vanish.
        .switchIfEmpty(Mono.defer(() -> Mono.error(new IllegalStateException(
            "No authorized OAuth2 client available for registration 'my-client'"))));
}

/**
 * Issues GET /profile through the OAuth2-filtered client and decodes the body as a
 * {@code UserProfile}.
 *
 * @return the decoded profile
 */
public Mono<UserProfile> getUserProfile() {
    return webClient.get()
        .uri("/profile")
        .retrieve()
        .bodyToMono(UserProfile.class);
}
}@ExtendWith(MockitoExtension.class)
class WebClientServiceTest {
@Mock
private WebClient.Builder webClientBuilder;
@Mock
private WebClient webClient;
// Raw type on purpose: webClient.get() returns a wildcard-typed spec, which cannot be
// stubbed with a parameterised mock.
@SuppressWarnings("rawtypes")
@Mock
private WebClient.RequestHeadersUriSpec requestHeadersUriSpec;
@Mock
private WebClient.ResponseSpec responseSpec;

/**
 * Verifies that getUser emits the user produced by the stubbed client chain.
 *
 * <p>The service is created inside the test rather than through {@code @InjectMocks}:
 * its constructor calls the builder chain immediately, so the builder must already be
 * stubbed when the constructor runs.
 */
@Test
@SuppressWarnings("unchecked")
void shouldGetUser() {
    // Given
    String userId = "123";
    User expectedUser = new User(userId, "John Doe");
    // Stub every builder call the ApiService constructor makes, including the
    // User-Agent default header, so the fluent chain never hits a null.
    when(webClientBuilder.baseUrl(anyString())).thenReturn(webClientBuilder);
    when(webClientBuilder.defaultHeader("User-Agent", "MyApp/1.0")).thenReturn(webClientBuilder);
    when(webClientBuilder.build()).thenReturn(webClient);
    when(webClient.get()).thenReturn(requestHeadersUriSpec);
    when(requestHeadersUriSpec.uri("/users/{id}", userId)).thenReturn(requestHeadersUriSpec);
    when(requestHeadersUriSpec.retrieve()).thenReturn(responseSpec);
    when(responseSpec.bodyToMono(User.class)).thenReturn(Mono.just(expectedUser));
    ApiService apiService = new ApiService(webClientBuilder);
    // When
    Mono<User> result = apiService.getUser(userId);
    // Then
    StepVerifier.create(result)
        .expectNext(expectedUser)
        .verifyComplete();
}
}
Install with Tessl CLI:
npx tessl i tessl/maven-org-springframework-boot--spring-boot-starter-webflux