Starter for building RSocket clients and servers with Spring Boot auto-configuration.
—
RSocket client configuration with RSocketRequester builder for establishing connections and making requests to RSocket servers.
Prototype-scoped builder for creating RSocket client connections with customized configuration.
/**
* Builder for creating RSocket requesters (client connections)
* Provided as prototype bean - new instance per injection point
*/
interface RSocketRequester.Builder {
/** Configure RSocket strategies (codecs, routing) */
RSocketRequester.Builder rsocketStrategies(RSocketStrategies strategies);
/** Configure RSocket connector */
RSocketRequester.Builder rsocketConnector(Consumer<RSocketConnector.RSocketConnectorBuilder> configurer);
/** Configure data MIME type */
RSocketRequester.Builder dataMimeType(MimeType mimeType);
/** Configure metadata MIME type */
RSocketRequester.Builder metadataMimeType(MimeType mimeType);
/** Set setup route for connection */
RSocketRequester.Builder setupRoute(String route);
/** Set setup data */
RSocketRequester.Builder setupData(Object data);
/** Set setup metadata */
RSocketRequester.Builder setupMetadata(Object metadata, MimeType mimeType);
/** Connect via TCP transport */
Mono<RSocketRequester> tcp(String host, int port);
/** Connect via WebSocket transport */
Mono<RSocketRequester> websocket(URI uri);
/** Connect with custom transport */
Mono<RSocketRequester> transport(ClientTransport transport);
}Basic Usage Examples:
@Service
public class RSocketClientService {
private final RSocketRequester.Builder requesterBuilder;
public RSocketClientService(RSocketRequester.Builder requesterBuilder) {
this.requesterBuilder = requesterBuilder;
}
// TCP connection
public Mono<String> connectTcp() {
return requesterBuilder
.tcp("localhost", 9898)
.route("hello")
.data("World")
.retrieveMono(String.class);
}
// WebSocket connection
public Mono<String> connectWebSocket() {
return requesterBuilder
.websocket(URI.create("ws://localhost:8080/rsocket"))
.route("greeting")
.data("Client")
.retrieveMono(String.class);
}
// Connection with setup
public Mono<RSocketRequester> connectWithSetup() {
return requesterBuilder
.setupRoute("client.setup")
.setupData("client-id-123")
.tcp("localhost", 9898);
}
}Client interface for making RSocket requests with various interaction patterns.
/**
* RSocket client requester for making requests to RSocket servers
*/
interface RSocketRequester {
/** Create request spec with routing metadata */
RequestSpec route(String route, Object... routeVars);
/** Create request spec with custom metadata */
RequestSpec metadata(Object metadata, MimeType mimeType);
/** Get RSocket instance for low-level operations */
RSocket rsocket();
/** Get configured data MIME type */
MimeType dataMimeType();
/** Get configured metadata MIME type */
MimeType metadataMimeType();
/** Close the connection */
void dispose();
/** Check if connection is disposed */
boolean isDisposed();
}
/**
* Request specification for configuring RSocket requests
*/
interface RequestSpec {
/** Set request data payload */
RequestSpec data(Object data);
/** Add metadata to request */
RequestSpec metadata(Object metadata, MimeType mimeType);
/** Fire-and-forget interaction pattern */
Mono<Void> send();
/** Request-response interaction pattern */
<T> Mono<T> retrieveMono(Class<T> dataType);
<T> Mono<T> retrieveMono(ParameterizedTypeReference<T> dataTypeRef);
/** Request-stream interaction pattern */
<T> Flux<T> retrieveFlux(Class<T> dataType);
<T> Flux<T> retrieveFlux(ParameterizedTypeReference<T> dataTypeRef);
}Interaction Pattern Examples:
@Service
public class RSocketInteractionService {
private final Mono<RSocketRequester> requesterMono;
public RSocketInteractionService(RSocketRequester.Builder builder) {
this.requesterMono = builder.tcp("localhost", 9898).cache();
}
// Fire-and-forget
public Mono<Void> sendNotification(String message) {
return requesterMono.flatMap(requester ->
requester.route("notification")
.data(message)
.send()
);
}
// Request-response
public Mono<User> getUser(String userId) {
return requesterMono.flatMap(requester ->
requester.route("user.get")
.data(userId)
.retrieveMono(User.class)
);
}
// Request-stream
public Flux<StockPrice> streamPrices(String symbol) {
return requesterMono.flatMapMany(requester ->
requester.route("stock.stream")
.data(symbol)
.retrieveFlux(StockPrice.class)
);
}
// Request-channel (bidirectional streaming)
public Flux<ChatMessage> chat(Flux<ChatMessage> outbound) {
return requesterMono.flatMapMany(requester ->
requester.route("chat")
.data(outbound)
.retrieveFlux(ChatMessage.class)
);
}
}Customize RSocket connector for advanced client configuration.
/**
* Configurer for RSocket connectors
*/
@FunctionalInterface
interface RSocketConnectorConfigurer {
/**
* Configure the RSocket connector
* @param connector Connector builder to customize
*/
void configure(RSocketConnector.RSocketConnectorBuilder connector);
}Advanced Configuration Examples:
@Configuration
public class RSocketClientConfiguration {
// Connection timeout and retry configuration
@Bean
public RSocketConnectorConfigurer connectionConfigurer() {
return connector -> connector
.connectTimeout(Duration.ofSeconds(30))
.reconnect(Retry.backoff(3, Duration.ofSeconds(1)));
}
// Resume capability configuration
@Bean
public RSocketConnectorConfigurer resumeConfigurer() {
return connector -> connector
.resume(Resume.create()
.sessionDuration(Duration.ofMinutes(5))
.retry(Retry.fixedDelay(3, Duration.ofSeconds(1))));
}
// Lease configuration
@Bean
public RSocketConnectorConfigurer leaseConfigurer() {
return connector -> connector
.lease(Leases::create);
}
// Interceptor configuration
@Bean
public RSocketConnectorConfigurer interceptorConfigurer() {
return connector -> connector
.interceptors(registry -> {
registry.forConnection(new LoggingConnectionInterceptor());
registry.forRequester(new MetricsRequesterInterceptor());
});
}
}Support for different transport protocols and configurations.
// TCP Transport
ClientTransport tcpTransport = TcpClientTransport.create("localhost", 9898);
// WebSocket Transport
ClientTransport wsTransport = WebsocketClientTransport.create(URI.create("ws://localhost:8080/rsocket"));
// Custom transport usage
Mono<RSocketRequester> requester = builder.transport(tcpTransport);SSL/TLS Transport Example:
@Configuration
public class SecureClientConfiguration {
@Bean
public RSocketRequester.Builder secureRequesterBuilder(RSocketStrategies strategies) {
SslContext sslContext = SslContextBuilder.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.build();
TcpClient tcpClient = TcpClient.create()
.host("secure-server.example.com")
.port(9443)
.secure(sslSpec -> sslSpec.sslContext(sslContext));
ClientTransport transport = TcpClientTransport.create(tcpClient);
return RSocketRequester.builder()
.rsocketStrategies(strategies)
.transport(transport);
}
}Efficient connection management for high-throughput applications.
@Service
public class PooledRSocketService {
private final Mono<RSocketRequester> sharedRequester;
private final LoadBalancer<RSocketRequester> loadBalancer;
public PooledRSocketService(RSocketRequester.Builder builder) {
// Shared connection with connection pooling
this.sharedRequester = builder
.tcp("localhost", 9898)
.cache();
// Load balancer for multiple servers
this.loadBalancer = LoadBalancer.create(
Flux.just("server1:9898", "server2:9898", "server3:9898")
.map(address -> {
String[] parts = address.split(":");
return builder.tcp(parts[0], Integer.parseInt(parts[1]));
})
);
}
public <T> Mono<T> makeRequest(String route, Object data, Class<T> responseType) {
return sharedRequester.flatMap(requester ->
requester.route(route)
.data(data)
.retrieveMono(responseType)
);
}
public <T> Mono<T> makeLoadBalancedRequest(String route, Object data, Class<T> responseType) {
return loadBalancer.next().flatMap(requester ->
requester.route(route)
.data(data)
.retrieveMono(responseType)
);
}
}Robust error handling patterns for RSocket clients.
@Service
public class ResilientRSocketService {
private final Mono<RSocketRequester> requesterMono;
public ResilientRSocketService(RSocketRequester.Builder builder) {
this.requesterMono = builder
.tcp("localhost", 9898)
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
.filter(throwable -> throwable instanceof ConnectException))
.cache();
}
public <T> Mono<T> resilientRequest(String route, Object data, Class<T> responseType) {
return requesterMono.flatMap(requester ->
requester.route(route)
.data(data)
.retrieveMono(responseType)
.retryWhen(Retry.backoff(2, Duration.ofMillis(500))
.filter(throwable -> !(throwable instanceof ApplicationException)))
.timeout(Duration.ofSeconds(10))
.onErrorResume(TimeoutException.class,
ex -> Mono.error(new ServiceUnavailableException("Request timeout")))
.onErrorResume(RSocketException.class,
ex -> Mono.error(new CommunicationException("RSocket error", ex)))
);
}
}Client-side configuration through application properties:
# No specific client properties - configuration done through code
# Server connection details configured per requester builder
# Custom MIME types can be set globally
spring:
rsocket:
# These apply to both client and server
strategies:
data-mime-type: application/cbor
metadata-mime-type: message/x.rsocket.routing.v0Testing utilities for RSocket clients:
@SpringBootTest
class RSocketClientTest {
@Autowired
private RSocketRequester.Builder requesterBuilder;
@Test
void testClientConnection() {
StepVerifier.create(
requesterBuilder
.tcp("localhost", 9898)
.route("test")
.data("ping")
.retrieveMono(String.class)
)
.expectNext("pong")
.verifyComplete();
}
@Test
void testWithMockServer() {
RSocketRequester requester = requesterBuilder
.transport(LocalClientTransport.create("test-server"))
.block();
StepVerifier.create(
requester.route("mock.test")
.data("test-data")
.retrieveMono(String.class)
)
.expectNext("mock-response")
.verifyComplete();
}
}Install with Tessl CLI
npx tessl i tessl/maven-org-springframework-boot--spring-boot-starter-rsocket