RSocket protocol integration with reactive streams, supporting request-response, fire-and-forget, request-stream, and bidirectional channel interaction models for efficient, multiplexed messaging over TCP, WebSocket, and other transports.
Fluent API for making RSocket requests with support for all RSocket interaction models.
/**
 * A thin wrapper around a sending RSocket with a fluent API accepting
 * and returning higher level Objects for input and output.
 */
public interface RSocketRequester extends Disposable {

    /**
     * Return the underlying RSocketClient.
     */
    RSocketClient rsocketClient();

    /**
     * Return the underlying RSocket if the requester was created with a live RSocket
     * via wrap() or via deprecated connect methods, or null otherwise.
     */
    @Nullable
    RSocket rsocket();

    /**
     * Return the data MimeType selected for the underlying RSocket at connection time.
     */
    MimeType dataMimeType();

    /**
     * Return the metadata MimeType selected for the underlying RSocket at connection time.
     */
    MimeType metadataMimeType();

    /**
     * Return the configured RSocketStrategies.
     */
    RSocketStrategies strategies();

    /**
     * Begin to specify a new request with the given route.
     */
    RequestSpec route(String route);

    /**
     * Variant of route(String) with route template variables.
     */
    RequestSpec route(String route, Object... routeVars);

    /**
     * Request specification.
     *
     * <p>Extends ResponseSpec so that a request carrying no data can be completed
     * directly after route(...), e.g. {@code route("users.list").retrieveMono(type)}.
     */
    interface RequestSpec extends ResponseSpec {

        /** Add a metadata value together with the MimeType it is associated with. */
        RequestSpec metadata(Object metadata, MimeType mimeType);

        /** Add metadata through a callback that receives the metadata spec. */
        RequestSpec metadata(Consumer<MetadataSpec<?>> configurer);

        /** Provide a single payload data object. */
        ResponseSpec data(Object data);

        /** Provide payload data as a Publisher. */
        ResponseSpec data(Publisher<?> data);

        /** Provide payload data as a Publisher along with its element class. */
        ResponseSpec data(Publisher<?> data, Class<?> dataType);

        /** Provide payload data as a Publisher along with its generic element type. */
        ResponseSpec data(Publisher<?> data, ParameterizedTypeReference<?> dataType);
    }

    /**
     * Response specification for retrieving responses.
     */
    interface ResponseSpec {

        /**
         * Send the request without expecting a response payload (fire-and-forget).
         */
        Mono<Void> send();

        /** Retrieve a single response value of the given class. */
        <T> Mono<T> retrieveMono(Class<T> dataType);

        /** Retrieve a single response value of the given generic type. */
        <T> Mono<T> retrieveMono(ParameterizedTypeReference<T> dataType);

        /** Retrieve a stream of response values of the given class. */
        <T> Flux<T> retrieveFlux(Class<T> dataType);

        /** Retrieve a stream of response values of the given generic type. */
        <T> Flux<T> retrieveFlux(ParameterizedTypeReference<T> dataType);
    }

    /**
     * Builder to create an RSocketRequester.
     */
    interface Builder {

        /** Set the data MimeType. */
        Builder dataMimeType(MimeType mimeType);

        /** Set the metadata MimeType. */
        Builder metadataMimeType(MimeType mimeType);

        /** Supply the RSocketStrategies to use. */
        Builder rsocketStrategies(RSocketStrategies strategies);

        /** Customize the RSocketStrategies through their builder. */
        Builder rsocketStrategies(Consumer<RSocketStrategies.Builder> configurer);

        /** Apply an RSocketConnectorConfigurer. */
        Builder rsocketConnector(RSocketConnectorConfigurer configurer);

        /** Create a requester for the given TCP host and port. */
        RSocketRequester tcp(String host, int port);

        /** Create a requester for the given WebSocket URI. */
        RSocketRequester websocket(URI uri);

        // NOTE(review): these two look like the "deprecated connect methods" that
        // rsocket() refers to - confirm against the framework version in use.
        Mono<RSocketRequester> connectTcp(String host, int port);

        Mono<RSocketRequester> connectWebSocket(URI uri);
    }

    static Builder builder();
}

Usage Examples:
import org.springframework.messaging.rsocket.RSocketRequester;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;
import org.springframework.core.ParameterizedTypeReference;
import java.net.URI;
import java.time.Duration;
// Create requester (TCP, localhost:7000)
RSocketRequester requester =
        RSocketRequester.builder().tcp("localhost", 7000);

// Request-Response: one value in, one value out
Mono<String> response =
        requester.route("greeting").data("World").retrieveMono(String.class);
response.subscribe(greeting -> {
    System.out.println("Response: " + greeting);
});

// Request-Stream: one value in, many values out
Flux<Message> messages =
        requester.route("stream").data("start").retrieveFlux(Message.class);
messages.subscribe(message -> {
    System.out.println("Received: " + message.getContent());
});

// Fire-and-Forget: nothing is retrieved
requester.route("notify")
        .data(new Event("user.login"))
        .send()
        .subscribe();

// Channel (bidirectional): a stream in, a stream out
Flux<InputData> inputStream =
        Flux.interval(Duration.ofSeconds(1)).map(tick -> new InputData("data-" + tick));
Flux<OutputData> outputStream =
        requester.route("channel").data(inputStream).retrieveFlux(OutputData.class);
outputStream.subscribe(result -> {
    System.out.println("Channel output: " + result.getValue());
});

// With metadata attached to the request
requester.route("secure.endpoint")
        .metadata("Bearer token123", org.springframework.util.MimeTypeUtils.TEXT_PLAIN)
        .data(request)
        .retrieveMono(Response.class)
        .subscribe();

// Typed response: a generic target type is described with ParameterizedTypeReference
ParameterizedTypeReference<List<User>> typeRef =
        new ParameterizedTypeReference<List<User>>() {};
Mono<List<User>> users =
        requester.route("users.list").retrieveMono(typeRef);
/**
 * Example payload with a single mutable text property.
 */
class Message {

    private String content;

    /** Returns the current content, or null when none has been set. */
    public String getContent() {
        return this.content;
    }

    /** Replaces the content with the given value. */
    public void setContent(String content) {
        this.content = content;
    }
}
/**
 * Example payload carrying a type string supplied at construction.
 */
class Event {

    private String type;

    /** Creates an event with the given type. */
    public Event(String type) {
        this.type = type;
    }

    /** Returns the type passed to the constructor. */
    public String getType() {
        return this.type;
    }
}
/**
 * Example payload holding one string value supplied at construction.
 */
class InputData {

    private String value;

    /** Creates an instance holding the given value. */
    public InputData(String value) {
        this.value = value;
    }

    /** Returns the value passed to the constructor. */
    public String getValue() {
        return this.value;
    }
}
/**
 * Example payload holding one mutable string value.
 *
 * <p>Offers both a no-argument constructor (value set later through
 * {@link #setValue(String)}) and a constructor taking the value, so that a
 * populated instance can be created in a single expression.
 */
class OutputData {

    private String value;

    /** Creates an instance with no value set. */
    public OutputData() {
    }

    /** Creates an instance holding the given value. */
    public OutputData(String value) {
        this.value = value;
    }

    /** Returns the current value, or null when none has been set. */
    public String getValue() {
        return value;
    }

    /** Replaces the value. */
    public void setValue(String value) {
        this.value = value;
    }
}
Object request = new Object();
/**
 * Example response type. Can be created empty or with a message, so that
 * handlers can build a populated response in one expression.
 */
class Response {

    private String message;

    /** Creates a response without a message. */
    public Response() {
    }

    /** Creates a response carrying the given message. */
    public Response(String message) {
        this.message = message;
    }

    /** Returns the message, or null when none was supplied. */
    public String getMessage() {
        return message;
    }
}
class User {}

Configuration for encoders, decoders, and other strategies used by RSocket requester and responder.
/**
 * Access to strategies for use by RSocket requester and responder components.
 */
public interface RSocketStrategies {

    /** Returns the encoders as a list. */
    List<Encoder<?>> encoders();

    /** Returns the decoders as a list. */
    List<Decoder<?>> decoders();

    /** Returns the RouteMatcher. */
    RouteMatcher routeMatcher();

    /** Returns the ReactiveAdapterRegistry. */
    ReactiveAdapterRegistry reactiveAdapterRegistry();

    /** Returns the DataBufferFactory. */
    DataBufferFactory dataBufferFactory();

    /** Returns the MetadataExtractor. */
    MetadataExtractor metadataExtractor();

    /**
     * Builder for RSocketStrategies; every configuration method returns the
     * Builder type so calls can be chained, and build() produces the result.
     */
    interface Builder {

        /** Accepts one or more encoders. */
        Builder encoder(Encoder<?>... encoders);

        /** Accepts a callback that receives the list of encoders. */
        Builder encoders(Consumer<List<Encoder<?>>> consumer);

        /** Accepts one or more decoders. */
        Builder decoder(Decoder<?>... decoders);

        /** Accepts a callback that receives the list of decoders. */
        Builder decoders(Consumer<List<Decoder<?>>> consumer);

        /** Accepts the RouteMatcher to use. */
        Builder routeMatcher(RouteMatcher routeMatcher);

        /** Accepts the ReactiveAdapterRegistry to use. */
        Builder reactiveAdapterStrategy(ReactiveAdapterRegistry registry);

        /** Accepts the DataBufferFactory to use. */
        Builder dataBufferFactory(DataBufferFactory bufferFactory);

        /** Accepts the MetadataExtractor to use. */
        Builder metadataExtractor(MetadataExtractor extractor);

        /** Accepts a callback that receives a MetadataExtractorRegistry. */
        Builder metadataExtractorRegistry(Consumer<MetadataExtractorRegistry> configurer);

        /** Produces the RSocketStrategies. */
        RSocketStrategies build();
    }

    static Builder builder();
}

Usage Examples:
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.http.codec.json.Jackson2JsonEncoder;
import org.springframework.http.codec.json.Jackson2JsonDecoder;
import com.fasterxml.jackson.databind.ObjectMapper;
// Configure custom strategies around a shared ObjectMapper
ObjectMapper objectMapper = new ObjectMapper();
// Configure objectMapper...
Jackson2JsonEncoder jsonEncoder = new Jackson2JsonEncoder(objectMapper);
Jackson2JsonDecoder jsonDecoder = new Jackson2JsonDecoder(objectMapper);
RSocketStrategies strategies =
        RSocketStrategies.builder().encoder(jsonEncoder).decoder(jsonDecoder).build();

// Use with requester
RSocketRequester requester =
        RSocketRequester.builder().rsocketStrategies(strategies).tcp("localhost", 7000);

// Or configure inline through the strategies builder callback
RSocketRequester requester2 = RSocketRequester.builder()
        .rsocketStrategies(strategiesBuilder -> {
            strategiesBuilder
                    .encoder(new Jackson2JsonEncoder())
                    .decoder(new Jackson2JsonDecoder());
        })
        .tcp("localhost", 7000);

Handles RSocket connection setup.
/**
 * Annotation to map an RSocket connection to a handler method.
 *
 * <p>NOTE(review): the target list here includes TYPE; the framework's own
 * declaration may be METHOD-only - confirm against the library Javadoc.
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface ConnectMapping {

    /**
     * Patterns to match the route of SETUP frames.
     * Defaults to an empty array.
     */
    String[] value() default {};
}

Usage Examples:
import org.springframework.messaging.rsocket.annotation.ConnectMapping;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Mono;
/**
 * Example responder with one handler per interaction shown in this section:
 * connection setup, request-response, request-stream, fire-and-forget and channel.
 */
@Controller
public class RSocketController {

    /** Handle connection setup. */
    @ConnectMapping
    public Mono<Void> handleConnect() {
        System.out.println("RSocket connection established");
        return Mono.empty();
    }

    /** Handle connection with specific route. */
    @ConnectMapping("/secure")
    public Mono<Void> handleSecureConnect(@Payload AuthRequest auth) {
        String username = auth.getUsername();
        System.out.println("Secure connection for: " + username);
        // Validate authentication
        return Mono.empty();
    }

    /** Request-Response: replies with a greeting built from the given name. */
    @MessageMapping("greeting")
    public Mono<String> handleGreeting(String name) {
        String greeting = "Hello, " + name + "!";
        return Mono.just(greeting);
    }

    /** Request-Stream: emits one item per one-second interval tick. */
    @MessageMapping("stream")
    public Flux<String> handleStream(String request) {
        Flux<Long> ticks = Flux.interval(Duration.ofSeconds(1));
        return ticks.map(tick -> "Stream item " + tick);
    }

    /** Fire-and-Forget: prints the event type and completes empty. */
    @MessageMapping("notify")
    public Mono<Void> handleNotification(Event event) {
        String eventType = event.getType();
        System.out.println("Notification: " + eventType);
        return Mono.empty();
    }

    /** Channel (bidirectional): maps every input element to an output element. */
    @MessageMapping("channel")
    public Flux<OutputData> handleChannel(Flux<InputData> input) {
        return input.map(element -> {
            String processed = "Processed: " + element.getValue();
            return new OutputData(processed);
        });
    }
}
/**
 * Example setup payload exposing a username.
 */
class AuthRequest {

    private String username;

    /** Returns the username, or null when it has not been populated. */
    public String getUsername() {
        return this.username;
    }
}

Declares RSocket service interface for client proxy generation.
/**
 * Annotation to declare an RSocket service interface with request methods.
 * Used with RSocketServiceProxyFactory to create client proxies.
 *
 * <p>Applicable to the service interface itself and to its individual request
 * methods, which is why both TYPE and METHOD are listed as targets.
 */
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RSocketExchange {

    /**
     * On an interface: common route prefix for all methods in the interface.
     * On a method: the route of that request method.
     * Defaults to an empty string.
     */
    String value() default "";
}

Usage Examples:
import org.springframework.messaging.rsocket.service.RSocketExchange;
import org.springframework.messaging.handler.annotation.MessageMapping;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;
// Define service interface
/**
 * Example RSocket service interface for use with RSocketServiceProxyFactory.
 *
 * <p>Request methods are declared with {@code @RSocketExchange}; {@code @MessageMapping}
 * is the responder-side annotation and is not what the client proxy looks for.
 */
@RSocketExchange
public interface ChatService {

    /** Sends a message to the "chat.send" route; completes without a value. */
    @RSocketExchange("chat.send")
    Mono<Void> sendMessage(ChatMessage message);

    /** Requests a stream of messages from the "chat.history" route. */
    @RSocketExchange("chat.history")
    Flux<ChatMessage> getHistory(String roomId);

    /** Requests a single user from the "chat.user" route. */
    @RSocketExchange("chat.user")
    Mono<User> getUserInfo(String userId);
}
// Create service proxy from a factory bound to the requester
RSocketServiceProxyFactory factory =
        RSocketServiceProxyFactory.builder().rsocketRequester(requester).build();
ChatService chatService = factory.createClient(ChatService.class);

// Use the proxy: nothing is sent until the returned publisher is subscribed to
ChatMessage greeting = new ChatMessage("Hello", "user1");
chatService.sendMessage(greeting).subscribe();

chatService.getHistory("room1").subscribe(entry -> {
    System.out.println(entry.getContent());
});
// Service with common prefix
/**
 * Example service interface with a common route prefix declared at type level.
 *
 * <p>Request methods are declared with {@code @RSocketExchange}, the annotation
 * used for RSocket service interfaces.
 */
@RSocketExchange("/api")
public interface ApiService {

    /** Requests a single user; method-level route "users.get". */
    @RSocketExchange("users.get")
    Mono<User> getUser(String id);

    /** Requests a stream of users; method-level route "users.list". */
    @RSocketExchange("users.list")
    Flux<User> listUsers();
}

Factory for creating RSocket service client proxies.
/**
 * Factory to create client proxies for @RSocketExchange annotated interfaces.
 */
public interface RSocketServiceProxyFactory {

    /**
     * Create a client proxy for the given service interface.
     *
     * @param serviceType the service interface class
     * @param <S> the service interface type
     * @return an instance of the service interface
     */
    <S> S createClient(Class<S> serviceType);

    /**
     * Builder for the factory; configuration methods return the Builder type
     * so calls can be chained, and build() produces the factory.
     */
    interface Builder {

        /** Accepts the RSocketRequester to use. */
        Builder rsocketRequester(RSocketRequester requester);

        /** Accepts a block timeout as a Duration. */
        Builder blockTimeout(Duration blockTimeout);

        /** Accepts a custom RSocketServiceArgumentResolver. */
        Builder customArgumentResolver(RSocketServiceArgumentResolver resolver);

        /** Produces the RSocketServiceProxyFactory. */
        RSocketServiceProxyFactory build();
    }

    static Builder builder();
}

Extracts metadata from RSocket frames.
/**
 * Strategy to extract a map of values from Payload metadata.
 */
public interface MetadataExtractor {

    /** Constant with the value "route". */
    String ROUTE_KEY = "route";

    /**
     * Extract a map of values from the given Payload.
     *
     * @param payload the payload to read from
     * @param metadataMimeType the MimeType of the metadata
     * @return the extracted values keyed by name
     */
    Map<String, Object> extract(Payload payload, MimeType metadataMimeType);
}
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.messaging.rsocket.annotation.ConnectMapping;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;
/**
 * Example Spring Boot entry point that also exposes an RSocketRequester bean.
 */
@SpringBootApplication
public class RSocketApplication {

    /** Starts the application via SpringApplication.run. */
    public static void main(String[] args) {
        SpringApplication.run(RSocketApplication.class, args);
    }

    /** Creates the requester from the supplied builder, using TCP to localhost:7000. */
    @Bean
    public RSocketRequester rSocketRequester(RSocketRequester.Builder builder) {
        RSocketRequester tcpRequester = builder.tcp("localhost", 7000);
        return tcpRequester;
    }
}
/**
 * Example responder with a connect handler and one handler per mapped route.
 */
@Controller
public class RSocketServerController {

    /** Prints a line when a connection is mapped here and completes empty. */
    @ConnectMapping
    public Mono<Void> handleConnect() {
        System.out.println("Client connected");
        return Mono.empty();
    }

    /** Replies with a single Response built from the request data. */
    @MessageMapping("request-response")
    public Mono<Response> requestResponse(Request request) {
        String text = "Processed: " + request.getData();
        return Mono.just(new Response(text));
    }

    /** Prints the event type and completes empty. */
    @MessageMapping("fire-and-forget")
    public Mono<Void> fireAndForget(Event event) {
        String eventType = event.getType();
        System.out.println("Event: " + eventType);
        return Mono.empty();
    }

    /** Emits one StreamData per one-second interval tick, limited to 10 items. */
    @MessageMapping("request-stream")
    public Flux<StreamData> requestStream(String request) {
        Flux<Long> ticks = Flux.interval(Duration.ofSeconds(1));
        Flux<StreamData> items = ticks.map(tick -> new StreamData("Item " + tick));
        return items.take(10);
    }

    /** Maps every input element to an OutputData echoing its value. */
    @MessageMapping("stream-stream")
    public Flux<OutputData> streamStream(Flux<InputData> input) {
        return input.map(element -> {
            String echoed = "Echo: " + element.getValue();
            return new OutputData(echoed);
        });
    }
}
/**
 * Example request payload exposing a data string.
 */
class Request {

    private String data;

    /** Returns the data, or null when it has not been populated. */
    public String getData() {
        return this.data;
    }
}
/**
 * Example stream element holding one string value supplied at construction.
 */
class StreamData {

    private String value;

    /** Creates an element holding the given value. */
    public StreamData(String value) {
        this.value = value;
    }

    /** Returns the value passed to the constructor. */
    public String getValue() {
        return this.value;
    }
}