CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-springframework-boot--spring-boot-starter-rsocket

Starter for building RSocket clients and servers with Spring Boot auto-configuration.

Pending
Overview
Eval results
Files

client-configuration.mddocs/

Client Configuration

RSocket client configuration with RSocketRequester builder for establishing connections and making requests to RSocket servers.

Capabilities

RSocket Requester Builder

Prototype-scoped builder for creating RSocket client connections with customized configuration.

/**
 * Builder for creating RSocket requesters (client connections)
 * Provided as prototype bean - new instance per injection point
 */
interface RSocketRequester.Builder {
    
    /** Configure RSocket strategies (codecs, routing) */
    RSocketRequester.Builder rsocketStrategies(RSocketStrategies strategies);
    
    /** Configure RSocket connector */
    RSocketRequester.Builder rsocketConnector(Consumer<RSocketConnector.RSocketConnectorBuilder> configurer);
    
    /** Configure data MIME type */
    RSocketRequester.Builder dataMimeType(MimeType mimeType);
    
    /** Configure metadata MIME type */
    RSocketRequester.Builder metadataMimeType(MimeType mimeType);
    
    /** Set setup route for connection */
    RSocketRequester.Builder setupRoute(String route);
    
    /** Set setup data */  
    RSocketRequester.Builder setupData(Object data);
    
    /** Set setup metadata */
    RSocketRequester.Builder setupMetadata(Object metadata, MimeType mimeType);
    
    /** Connect via TCP transport */
    Mono<RSocketRequester> tcp(String host, int port);
    
    /** Connect via WebSocket transport */
    Mono<RSocketRequester> websocket(URI uri);
    
    /** Connect with custom transport */
    Mono<RSocketRequester> transport(ClientTransport transport);
}

Basic Usage Examples:

@Service
public class RSocketClientService {
    
    private final RSocketRequester.Builder requesterBuilder;
    
    public RSocketClientService(RSocketRequester.Builder requesterBuilder) {
        this.requesterBuilder = requesterBuilder;
    }
    
    // TCP connection
    public Mono<String> connectTcp() {
        return requesterBuilder
            .tcp("localhost", 9898)
            .route("hello")
            .data("World")
            .retrieveMono(String.class);
    }
    
    // WebSocket connection
    public Mono<String> connectWebSocket() {
        return requesterBuilder
            .websocket(URI.create("ws://localhost:8080/rsocket"))
            .route("greeting")
            .data("Client")
            .retrieveMono(String.class);
    }
    
    // Connection with setup
    public Mono<RSocketRequester> connectWithSetup() {
        return requesterBuilder
            .setupRoute("client.setup")
            .setupData("client-id-123")
            .tcp("localhost", 9898);
    }
}

RSocket Requester Interface

Client interface for making RSocket requests with various interaction patterns.

/**
 * RSocket client requester for making requests to RSocket servers
 */
interface RSocketRequester {
    
    /** Create request spec with routing metadata */
    RequestSpec route(String route, Object... routeVars);
    
    /** Create request spec with custom metadata */
    RequestSpec metadata(Object metadata, MimeType mimeType);
    
    /** Get RSocket instance for low-level operations */
    RSocket rsocket();
    
    /** Get configured data MIME type */
    MimeType dataMimeType();
    
    /** Get configured metadata MIME type */ 
    MimeType metadataMimeType();
    
    /** Close the connection */
    void dispose();
    
    /** Check if connection is disposed */
    boolean isDisposed();
}

/**
 * Request specification for configuring RSocket requests
 */
interface RequestSpec {
    
    /** Set request data payload */
    RequestSpec data(Object data);
    
    /** Add metadata to request */
    RequestSpec metadata(Object metadata, MimeType mimeType);
    
    /** Fire-and-forget interaction pattern */
    Mono<Void> send();
    
    /** Request-response interaction pattern */
    <T> Mono<T> retrieveMono(Class<T> dataType);
    <T> Mono<T> retrieveMono(ParameterizedTypeReference<T> dataTypeRef);
    
    /** Request-stream interaction pattern */
    <T> Flux<T> retrieveFlux(Class<T> dataType);  
    <T> Flux<T> retrieveFlux(ParameterizedTypeReference<T> dataTypeRef);
}

Interaction Pattern Examples:

@Service
public class RSocketInteractionService {
    
    private final Mono<RSocketRequester> requesterMono;
    
    public RSocketInteractionService(RSocketRequester.Builder builder) {
        this.requesterMono = builder.tcp("localhost", 9898).cache();
    }
    
    // Fire-and-forget
    public Mono<Void> sendNotification(String message) {
        return requesterMono.flatMap(requester ->
            requester.route("notification")
                .data(message)
                .send()
        );
    }
    
    // Request-response
    public Mono<User> getUser(String userId) {
        return requesterMono.flatMap(requester ->
            requester.route("user.get")
                .data(userId)
                .retrieveMono(User.class)
        );
    }
    
    // Request-stream
    public Flux<StockPrice> streamPrices(String symbol) {
        return requesterMono.flatMapMany(requester ->
            requester.route("stock.stream")
                .data(symbol)
                .retrieveFlux(StockPrice.class)
        );
    }
    
    // Request-channel (bidirectional streaming)
    public Flux<ChatMessage> chat(Flux<ChatMessage> outbound) {
        return requesterMono.flatMapMany(requester ->
            requester.route("chat")
                .data(outbound)
                .retrieveFlux(ChatMessage.class)
        );
    }
}

Connector Configuration

Customize RSocket connector for advanced client configuration.

/**
 * Configurer for RSocket connectors
 */
@FunctionalInterface
interface RSocketConnectorConfigurer {
    
    /**
     * Configure the RSocket connector
     * @param connector Connector builder to customize
     */
    void configure(RSocketConnector.RSocketConnectorBuilder connector);
}

Advanced Configuration Examples:

@Configuration
public class RSocketClientConfiguration {
    
    // Connection timeout and retry configuration
    @Bean
    public RSocketConnectorConfigurer connectionConfigurer() {
        return connector -> connector
            .connectTimeout(Duration.ofSeconds(30))
            .reconnect(Retry.backoff(3, Duration.ofSeconds(1)));
    }
    
    // Resume capability configuration  
    @Bean
    public RSocketConnectorConfigurer resumeConfigurer() {
        return connector -> connector
            .resume(Resume.create()
                .sessionDuration(Duration.ofMinutes(5))
                .retry(Retry.fixedDelay(3, Duration.ofSeconds(1))));
    }
    
    // Lease configuration
    @Bean  
    public RSocketConnectorConfigurer leaseConfigurer() {
        return connector -> connector
            .lease(Leases::create);
    }
    
    // Interceptor configuration
    @Bean
    public RSocketConnectorConfigurer interceptorConfigurer() {
        return connector -> connector
            .interceptors(registry -> {
                registry.forConnection(new LoggingConnectionInterceptor());
                registry.forRequester(new MetricsRequesterInterceptor());
            });
    }
}

Transport Configuration

Support for different transport protocols and configurations.

// TCP Transport
ClientTransport tcpTransport = TcpClientTransport.create("localhost", 9898);

// WebSocket Transport  
ClientTransport wsTransport = WebsocketClientTransport.create(URI.create("ws://localhost:8080/rsocket"));

// Custom transport usage
Mono<RSocketRequester> requester = builder.transport(tcpTransport);

SSL/TLS Transport Example:

@Configuration
public class SecureClientConfiguration {
    
    @Bean
    public RSocketRequester.Builder secureRequesterBuilder(RSocketStrategies strategies) {
        SslContext sslContext = SslContextBuilder.forClient()
            .trustManager(InsecureTrustManagerFactory.INSTANCE)
            .build();
            
        TcpClient tcpClient = TcpClient.create()
            .host("secure-server.example.com")
            .port(9443)
            .secure(sslSpec -> sslSpec.sslContext(sslContext));
            
        ClientTransport transport = TcpClientTransport.create(tcpClient);
        
        return RSocketRequester.builder()
            .rsocketStrategies(strategies)
            .transport(transport);
    }
}

Connection Pooling and Management

Efficient connection management for high-throughput applications.

@Service
public class PooledRSocketService {
    
    private final Mono<RSocketRequester> sharedRequester;
    private final LoadBalancer<RSocketRequester> loadBalancer;
    
    public PooledRSocketService(RSocketRequester.Builder builder) {
        // Shared connection with connection pooling
        this.sharedRequester = builder
            .tcp("localhost", 9898)
            .cache();
            
        // Load balancer for multiple servers
        this.loadBalancer = LoadBalancer.create(
            Flux.just("server1:9898", "server2:9898", "server3:9898")
                .map(address -> {
                    String[] parts = address.split(":");
                    return builder.tcp(parts[0], Integer.parseInt(parts[1]));
                })
        );
    }
    
    public <T> Mono<T> makeRequest(String route, Object data, Class<T> responseType) {
        return sharedRequester.flatMap(requester ->
            requester.route(route)
                .data(data)
                .retrieveMono(responseType)
        );
    }
    
    public <T> Mono<T> makeLoadBalancedRequest(String route, Object data, Class<T> responseType) {
        return loadBalancer.next().flatMap(requester ->
            requester.route(route)
                .data(data)
                .retrieveMono(responseType)
        );
    }
}

Error Handling and Retry

Robust error handling patterns for RSocket clients.

@Service  
public class ResilientRSocketService {
    
    private final Mono<RSocketRequester> requesterMono;
    
    public ResilientRSocketService(RSocketRequester.Builder builder) {
        this.requesterMono = builder
            .tcp("localhost", 9898)
            .retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
                .filter(throwable -> throwable instanceof ConnectException))
            .cache();
    }
    
    public <T> Mono<T> resilientRequest(String route, Object data, Class<T> responseType) {
        return requesterMono.flatMap(requester ->
            requester.route(route)
                .data(data)
                .retrieveMono(responseType)
                .retryWhen(Retry.backoff(2, Duration.ofMillis(500))
                    .filter(throwable -> !(throwable instanceof ApplicationException)))
                .timeout(Duration.ofSeconds(10))
                .onErrorResume(TimeoutException.class, 
                    ex -> Mono.error(new ServiceUnavailableException("Request timeout")))
                .onErrorResume(RSocketException.class,
                    ex -> Mono.error(new CommunicationException("RSocket error", ex)))
        );
    }
}

Configuration Properties

Client-side configuration through application properties:

# No specific client properties - configuration done through code
# Server connection details configured per requester builder

# Custom MIME types can be set globally  
spring:
  rsocket:
    # These apply to both client and server
    strategies:
      data-mime-type: application/cbor
      metadata-mime-type: message/x.rsocket.routing.v0

Testing Support

Testing utilities for RSocket clients:

@SpringBootTest
class RSocketClientTest {
    
    @Autowired
    private RSocketRequester.Builder requesterBuilder;
    
    @Test
    void testClientConnection() {
        StepVerifier.create(
            requesterBuilder
                .tcp("localhost", 9898)
                .route("test")
                .data("ping")
                .retrieveMono(String.class)
        )
        .expectNext("pong")
        .verifyComplete();
    }
    
    @Test
    void testWithMockServer() {
        RSocketRequester requester = requesterBuilder
            .transport(LocalClientTransport.create("test-server"))
            .block();
            
        StepVerifier.create(
            requester.route("mock.test")
                .data("test-data")
                .retrieveMono(String.class)
        )
        .expectNext("mock-response")
        .verifyComplete();
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-springframework-boot--spring-boot-starter-rsocket

docs

auto-configuration.md

client-configuration.md

codec-configuration.md

index.md

message-handling.md

server-configuration.md

tile.json