or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

channel-support.md
core-messaging.md
handler-annotations.md
index.md
message-converters.md
messaging-templates.md
rsocket.md
simp-configuration.md
stomp-websocket.md
tile.json

docs/rsocket.md

RSocket Support

RSocket protocol integration with reactive streams, supporting request-response, fire-and-forget, request-stream, and bidirectional channel interaction models for efficient, multiplexed messaging over TCP, WebSocket, and other transports.

Capabilities

RSocketRequester Interface

Fluent API for making RSocket requests with support for all RSocket interaction models.

/**
 * A thin wrapper around a sending RSocket with a fluent API accepting
 * and returning higher level Objects for input and output.
 */
public interface RSocketRequester extends Disposable {
    /**
     * Return the underlying RSocketClient.
     */
    RSocketClient rsocketClient();

    /**
     * Return the underlying RSocket if the requester was created with a live RSocket
     * via wrap() or via deprecated connect methods, or null otherwise.
     */
    @Nullable
    RSocket rsocket();

    /**
     * Return the data MimeType selected for the underlying RSocket at connection time.
     */
    MimeType dataMimeType();

    /**
     * Return the metadata MimeType selected for the underlying RSocket at connection time.
     */
    MimeType metadataMimeType();

    /**
     * Return the configured RSocketStrategies.
     */
    RSocketStrategies strategies();

    /**
     * Begin to specify a new request with the given route.
     */
    RequestSpec route(String route);

    /**
     * Variant of route(String) with route template variables.
     */
    RequestSpec route(String route, Object... routeVars);

    /**
     * Request specification: optional metadata, optional data, then one of the
     * terminal operations inherited from ResponseSpec. Inheriting them is what
     * permits a request without a payload, such as
     * route("users.list").retrieveMono(...).
     */
    interface RequestSpec extends ResponseSpec {
        RequestSpec metadata(Object metadata, MimeType mimeType);
        RequestSpec metadata(Consumer<MetadataSpec<?>> configurer);

        ResponseSpec data(Object data);
        ResponseSpec data(Publisher<?> data);
        ResponseSpec data(Publisher<?> data, Class<?> dataType);
        ResponseSpec data(Publisher<?> data, ParameterizedTypeReference<?> dataType);
    }

    /**
     * Terminal operations that perform the request.
     *
     * NOTE(review): recent Spring Framework versions expose this type under the
     * name RetrieveSpec; confirm the name against the version being documented.
     */
    interface ResponseSpec {
        /**
         * Perform a fire-and-forget request; the returned Mono completes
         * without emitting a value.
         */
        Mono<Void> send();

        <T> Mono<T> retrieveMono(Class<T> dataType);
        <T> Mono<T> retrieveMono(ParameterizedTypeReference<T> dataType);
        <T> Flux<T> retrieveFlux(Class<T> dataType);
        <T> Flux<T> retrieveFlux(ParameterizedTypeReference<T> dataType);
    }

    /**
     * Builder to create an RSocketRequester.
     */
    interface Builder {
        Builder dataMimeType(MimeType mimeType);
        Builder metadataMimeType(MimeType mimeType);
        Builder rsocketStrategies(RSocketStrategies strategies);
        Builder rsocketStrategies(Consumer<RSocketStrategies.Builder> configurer);
        Builder rsocketConnector(RSocketConnectorConfigurer configurer);

        RSocketRequester tcp(String host, int port);
        RSocketRequester websocket(URI uri);

        /**
         * @deprecated in favor of {@link #tcp(String, int)}
         */
        @Deprecated
        Mono<RSocketRequester> connectTcp(String host, int port);

        /**
         * @deprecated in favor of {@link #websocket(URI)}
         */
        @Deprecated
        Mono<RSocketRequester> connectWebSocket(URI uri);
    }

    static Builder builder();
}

Usage Examples:

import org.springframework.messaging.rsocket.RSocketRequester;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;
import org.springframework.core.ParameterizedTypeReference;
import java.net.URI;
import java.time.Duration;

// Requester targeting a TCP server at localhost:7000
RSocketRequester requester = RSocketRequester.builder().tcp("localhost", 7000);

// Request-Response: one value out, at most one value back
Mono<String> response = requester.route("greeting")
    .data("World")
    .retrieveMono(String.class);

response.subscribe(greeting -> System.out.println("Response: " + greeting));

// Request-Stream: one value out, a stream of values back
Flux<Message> messages = requester.route("stream")
    .data("start")
    .retrieveFlux(Message.class);

messages.subscribe(item -> System.out.println("Received: " + item.getContent()));

// Fire-and-Forget: nothing is read back
requester.route("notify")
    .data(new Event("user.login"))
    .send()
    .subscribe();

// Channel (bidirectional): a stream out, a stream back
Flux<InputData> inputStream = Flux.interval(Duration.ofSeconds(1))
    .map(tick -> new InputData("data-" + tick));

Flux<OutputData> outputStream = requester.route("channel")
    .data(inputStream)
    .retrieveFlux(OutputData.class);

outputStream.subscribe(reply -> System.out.println("Channel output: " + reply.getValue()));

// With metadata: an extra metadata entry travels alongside the route
requester.route("secure.endpoint")
    .metadata("Bearer token123", org.springframework.util.MimeTypeUtils.TEXT_PLAIN)
    .data(request)
    .retrieveMono(Response.class)
    .subscribe();

// Typed response: the type reference preserves the generic element type
ParameterizedTypeReference<List<User>> typeRef =
    new ParameterizedTypeReference<List<User>>() {};

Mono<List<User>> users = requester.route("users.list").retrieveMono(typeRef);

/**
 * Element type of the request-stream example; holds one mutable text field.
 */
class Message {

    private String content;

    /** Returns the stored content, or {@code null} when none has been set. */
    public String getContent() {
        return this.content;
    }

    /** Replaces the stored content with the given value. */
    public void setContent(String content) {
        this.content = content;
    }
}

/**
 * Payload of the fire-and-forget example; its type is fixed at construction.
 */
class Event {

    private String type;

    /** Creates an event carrying the given type string. */
    public Event(String type) {
        this.type = type;
    }

    /** Returns the type string supplied to the constructor. */
    public String getType() {
        return this.type;
    }
}

/**
 * Outbound element of the channel example; wraps a single string value.
 */
class InputData {

    private String value;

    /** Creates an element wrapping the given value. */
    public InputData(String value) {
        this.value = value;
    }

    /** Returns the value supplied to the constructor. */
    public String getValue() {
        return this.value;
    }
}

/**
 * Inbound element of the channel examples; wraps a single string value.
 *
 * The handler examples build instances with {@code new OutputData(String)},
 * so that constructor is provided. The no-argument constructor is kept so
 * code that creates an empty instance and calls the setter still works.
 */
class OutputData {

    private String value;

    /** Creates an instance whose value is {@code null} until set. */
    public OutputData() {
    }

    /** Creates an instance wrapping the given value. */
    public OutputData(String value) {
        this.value = value;
    }

    /** Returns the current value, or {@code null} when none has been set. */
    public String getValue() {
        return value;
    }

    /** Replaces the current value. */
    public void setValue(String value) {
        this.value = value;
    }
}

// Placeholder payload for the "With metadata" example, which passes it to data(request).
Object request = new Object();
/**
 * Response payload used by the examples.
 *
 * The request-response handler builds one with {@code new Response(String)},
 * so a message field and matching constructor are provided. The no-argument
 * constructor keeps {@code new Response()} valid.
 */
class Response {

    private String message;

    /** Creates a response with no message. */
    public Response() {
    }

    /** Creates a response carrying the given message. */
    public Response(String message) {
        this.message = message;
    }

    /** Returns the message, or {@code null} when none has been set. */
    public String getMessage() {
        return message;
    }

    /** Replaces the message. */
    public void setMessage(String message) {
        this.message = message;
    }
}
// Placeholder element type for the typed-response and service-interface examples; no fields are declared.
class User {}

RSocketStrategies Interface

Configuration for encoders, decoders, and other strategies used by RSocket requester and responder.

/**
 * Access to strategies for use by RSocket requester and responder components.
 */
public interface RSocketStrategies {
    // Accessors for the individual strategies held by this instance.
    List<Encoder<?>> encoders();
    List<Decoder<?>> decoders();
    RouteMatcher routeMatcher();
    ReactiveAdapterRegistry reactiveAdapterRegistry();
    DataBufferFactory dataBufferFactory();
    MetadataExtractor metadataExtractor();

    /**
     * Builder for RSocketStrategies. Every configuration method returns the
     * Builder so calls can be chained; build() yields the RSocketStrategies.
     */
    interface Builder {
        // Varargs form takes encoders directly; the Consumer form is handed the encoder List.
        Builder encoder(Encoder<?>... encoders);
        Builder encoders(Consumer<List<Encoder<?>>> consumer);
        // Same two forms for decoders.
        Builder decoder(Decoder<?>... decoders);
        Builder decoders(Consumer<List<Decoder<?>>> consumer);
        Builder routeMatcher(RouteMatcher routeMatcher);
        // NOTE(review): named ...Strategy although it takes a ReactiveAdapterRegistry;
        // presumably the counterpart of reactiveAdapterRegistry() above — confirm.
        Builder reactiveAdapterStrategy(ReactiveAdapterRegistry registry);
        Builder dataBufferFactory(DataBufferFactory bufferFactory);
        Builder metadataExtractor(MetadataExtractor extractor);
        Builder metadataExtractorRegistry(Consumer<MetadataExtractorRegistry> configurer);

        RSocketStrategies build();
    }

    // Entry point for obtaining a Builder.
    static Builder builder();
}

Usage Examples:

import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.http.codec.json.Jackson2JsonEncoder;
import org.springframework.http.codec.json.Jackson2JsonDecoder;
import com.fasterxml.jackson.databind.ObjectMapper;

// One ObjectMapper is shared by the JSON encoder and decoder below
ObjectMapper objectMapper = new ObjectMapper();
// Configure objectMapper...

RSocketStrategies strategies = RSocketStrategies.builder()
    .encoder(new Jackson2JsonEncoder(objectMapper))
    .decoder(new Jackson2JsonDecoder(objectMapper))
    .build();

// Hand the prepared strategies to the requester builder
RSocketRequester requester = RSocketRequester.builder()
    .rsocketStrategies(strategies)
    .tcp("localhost", 7000);

// Alternatively, customize the strategies builder in place
RSocketRequester requester2 = RSocketRequester.builder()
    .rsocketStrategies(spec -> spec
        .encoder(new Jackson2JsonEncoder())
        .decoder(new Jackson2JsonDecoder()))
    .tcp("localhost", 7000);

@ConnectMapping Annotation

Handles RSocket connection setup.

/**
 * Annotation to map an RSocket connection to a handler method.
 *
 * NOTE(review): Spring's own Javadoc describes this as a method-level
 * annotation; confirm whether ElementType.TYPE belongs in the @Target below.
 */
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface ConnectMapping {
    /**
     * Patterns to match the route of SETUP frames.
     * Defaults to an empty array.
     */
    String[] value() default {};
}

Usage Examples:

import org.springframework.messaging.rsocket.annotation.ConnectMapping;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Mono;

/**
 * Example responder with two connection-setup handlers and one message
 * handler per interaction model.
 */
@Controller
public class RSocketController {

    /** Handles connection setup. */
    @ConnectMapping
    public Mono<Void> handleConnect() {
        System.out.println("RSocket connection established");
        return Mono.empty();
    }

    /** Handles connection setup for the "/secure" route. */
    @ConnectMapping("/secure")
    public Mono<Void> handleSecureConnect(@Payload AuthRequest auth) {
        String username = auth.getUsername();
        System.out.println("Secure connection for: " + username);
        // Validate authentication
        return Mono.empty();
    }

    /** Request-Response: answers with a single greeting. */
    @MessageMapping("greeting")
    public Mono<String> handleGreeting(String name) {
        String greeting = "Hello, " + name + "!";
        return Mono.just(greeting);
    }

    /** Request-Stream: emits one item per second. */
    @MessageMapping("stream")
    public Flux<String> handleStream(String request) {
        Flux<Long> ticks = Flux.interval(Duration.ofSeconds(1));
        return ticks.map(tick -> "Stream item " + tick);
    }

    /** Fire-and-Forget: logs the event type and completes empty. */
    @MessageMapping("notify")
    public Mono<Void> handleNotification(Event event) {
        String type = event.getType();
        System.out.println("Notification: " + type);
        return Mono.empty();
    }

    /** Channel (bidirectional): maps every inbound element to an outbound one. */
    @MessageMapping("channel")
    public Flux<OutputData> handleChannel(Flux<InputData> input) {
        return input.map(item -> new OutputData("Processed: " + item.getValue()));
    }
}

/**
 * Setup payload of the "/secure" connection example; exposes a username.
 */
class AuthRequest {

    private String username;

    /** Returns the username, or {@code null} when the field was never populated. */
    public String getUsername() {
        return this.username;
    }
}

@RSocketExchange Annotation

Declares RSocket service interface for client proxy generation.

/**
 * Annotation to declare a method on an RSocket service interface as an
 * RSocket endpoint. Used with RSocketServiceProxyFactory to create client
 * proxies.
 *
 * It may also be placed on the interface itself, in which case its value
 * acts as a common route prefix for the annotated methods.
 */
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RSocketExchange {
    /**
     * On a method: the route of the request. On the interface: a common
     * route prefix for all annotated methods. Defaults to the empty string.
     */
    String value() default "";
}

Usage Examples:

import org.springframework.messaging.rsocket.service.RSocketExchange;
import org.springframework.messaging.handler.annotation.MessageMapping;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;

// Define service interface
/**
 * Client-side service interface for the chat routes.
 *
 * Each method carries @RSocketExchange with its route. Methods annotated
 * only with @MessageMapping (a responder-side annotation) are not picked up
 * when the client proxy is created.
 */
@RSocketExchange
public interface ChatService {

    /** Sends a chat message to the "chat.send" route. */
    @RSocketExchange("chat.send")
    Mono<Void> sendMessage(ChatMessage message);

    /** Requests a stream of messages from the "chat.history" route. */
    @RSocketExchange("chat.history")
    Flux<ChatMessage> getHistory(String roomId);

    /** Requests a single user from the "chat.user" route. */
    @RSocketExchange("chat.user")
    Mono<User> getUserInfo(String userId);
}

// Build a proxy factory on top of an existing requester
RSocketServiceProxyFactory factory = RSocketServiceProxyFactory.builder()
    .rsocketRequester(requester)
    .build();

ChatService chatService = factory.createClient(ChatService.class);

// Calls on the proxy return reactive types; nothing happens until subscribed
chatService.sendMessage(new ChatMessage("Hello", "user1")).subscribe();

chatService.getHistory("room1")
    .subscribe(entry -> System.out.println(entry.getContent()));

// Service with common prefix
/**
 * Client-side service interface whose type-level @RSocketExchange value is a
 * common prefix for the method-level routes.
 *
 * Methods carry @RSocketExchange rather than @MessageMapping, because only
 * @RSocketExchange methods are picked up when the client proxy is created.
 */
@RSocketExchange("/api")
public interface ApiService {

    /** Requests a single user by id. */
    @RSocketExchange("users.get")
    Mono<User> getUser(String id);

    /** Requests a stream of all users. */
    @RSocketExchange("users.list")
    Flux<User> listUsers();
}

RSocketServiceProxyFactory Interface

Factory for creating RSocket service client proxies.

/**
 * Factory to create client proxies for @RSocketExchange annotated interfaces.
 *
 * NOTE(review): Spring Framework ships this type as a final class rather than
 * an interface — confirm before relying on the "interface" keyword below.
 */
public interface RSocketServiceProxyFactory {
    /**
     * Create a client proxy for the given service interface.
     *
     * @param serviceType the service interface to create a proxy for
     * @return a proxy of type S
     */
    <S> S createClient(Class<S> serviceType);

    /**
     * Builder for the factory; each configuration method returns the Builder
     * so calls can be chained, and build() yields the factory.
     */
    interface Builder {
        // The requester the created proxies use — presumably required; confirm.
        Builder rsocketRequester(RSocketRequester requester);
        // NOTE(review): presumably bounds how long blocking (non-reactive) method signatures wait — confirm.
        Builder blockTimeout(Duration blockTimeout);
        Builder customArgumentResolver(RSocketServiceArgumentResolver resolver);

        RSocketServiceProxyFactory build();
    }

    // Entry point for obtaining a Builder.
    static Builder builder();
}

MetadataExtractor Interface

Extracts metadata from RSocket frames.

/**
 * Strategy to extract a map of values from Payload metadata.
 */
public interface MetadataExtractor {
    /** Constant "route"; presumably the map key under which the route is stored — confirm. */
    String ROUTE_KEY = "route";

    /**
     * Extract a map of values from the given payload.
     *
     * @param payload the Payload to read from
     * @param metadataMimeType the MimeType of the metadata
     * @return the extracted values, keyed by String
     */
    Map<String, Object> extract(Payload payload, MimeType metadataMimeType);
}

Complete RSocket Application Example

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.messaging.rsocket.annotation.ConnectMapping;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;

/**
 * Application entry point; also exposes an RSocketRequester bean targeting
 * localhost:7000 over TCP.
 */
@SpringBootApplication
public class RSocketApplication {

    /** Starts the Spring application with the given command-line arguments. */
    public static void main(String[] args) {
        SpringApplication.run(RSocketApplication.class, args);
    }

    /** Creates the requester bean from the supplied builder. */
    @Bean
    public RSocketRequester rSocketRequester(RSocketRequester.Builder builder) {
        RSocketRequester requester = builder.tcp("localhost", 7000);
        return requester;
    }
}

/**
 * Example responder with one handler per RSocket interaction model and a
 * connection-setup handler.
 */
@Controller
public class RSocketServerController {

    /** Logs that a client connected. */
    @ConnectMapping
    public Mono<Void> handleConnect() {
        System.out.println("Client connected");
        return Mono.empty();
    }

    /** Request-response: wraps the request data in a Response. */
    @MessageMapping("request-response")
    public Mono<Response> requestResponse(Request request) {
        Response reply = new Response("Processed: " + request.getData());
        return Mono.just(reply);
    }

    /** Fire-and-forget: logs the event type and completes empty. */
    @MessageMapping("fire-and-forget")
    public Mono<Void> fireAndForget(Event event) {
        String type = event.getType();
        System.out.println("Event: " + type);
        return Mono.empty();
    }

    /** Request-stream: emits one item per second and stops after ten. */
    @MessageMapping("request-stream")
    public Flux<StreamData> requestStream(String request) {
        Flux<Long> ticks = Flux.interval(Duration.ofSeconds(1));
        Flux<StreamData> items = ticks.map(tick -> new StreamData("Item " + tick));
        return items.take(10);
    }

    /** Channel: echoes every inbound element back as an OutputData. */
    @MessageMapping("stream-stream")
    public Flux<OutputData> streamStream(Flux<InputData> input) {
        return input.map(item -> new OutputData("Echo: " + item.getValue()));
    }
}

/**
 * Request payload of the request-response example; exposes its data read-only.
 */
class Request {

    private String data;

    /** Returns the data, or {@code null} when the field was never populated. */
    public String getData() {
        return this.data;
    }
}

/**
 * Element type of the request-stream example; wraps a single string value.
 */
class StreamData {

    private String value;

    /** Creates an element wrapping the given value. */
    public StreamData(String value) {
        this.value = value;
    }

    /** Returns the value supplied to the constructor. */
    public String getValue() {
        return this.value;
    }
}