0
# Client Configuration
1
2
RSocket client configuration with RSocketRequester builder for establishing connections and making requests to RSocket servers.
3
4
## Capabilities
5
6
### RSocket Requester Builder
7
8
Prototype-scoped builder for creating RSocket client connections with customized configuration.
9
10
```java { .api }
11
/**
12
* Builder for creating RSocket requesters (client connections)
13
* Provided as prototype bean - new instance per injection point
14
*/
15
interface RSocketRequester.Builder {
16
17
/** Configure RSocket strategies (codecs, routing) */
18
RSocketRequester.Builder rsocketStrategies(RSocketStrategies strategies);
19
20
/** Configure RSocket connector */
21
RSocketRequester.Builder rsocketConnector(Consumer<RSocketConnector.RSocketConnectorBuilder> configurer);
22
23
/** Configure data MIME type */
24
RSocketRequester.Builder dataMimeType(MimeType mimeType);
25
26
/** Configure metadata MIME type */
27
RSocketRequester.Builder metadataMimeType(MimeType mimeType);
28
29
/** Set setup route for connection */
30
RSocketRequester.Builder setupRoute(String route);
31
32
/** Set setup data */
33
RSocketRequester.Builder setupData(Object data);
34
35
/** Set setup metadata */
36
RSocketRequester.Builder setupMetadata(Object metadata, MimeType mimeType);
37
38
/** Connect via TCP transport */
39
Mono<RSocketRequester> tcp(String host, int port);
40
41
/** Connect via WebSocket transport */
42
Mono<RSocketRequester> websocket(URI uri);
43
44
/** Connect with custom transport */
45
Mono<RSocketRequester> transport(ClientTransport transport);
46
}
47
```
48
49
**Basic Usage Examples:**
50
51
```java
52
@Service
53
public class RSocketClientService {
54
55
private final RSocketRequester.Builder requesterBuilder;
56
57
public RSocketClientService(RSocketRequester.Builder requesterBuilder) {
58
this.requesterBuilder = requesterBuilder;
59
}
60
61
// TCP connection
62
public Mono<String> connectTcp() {
63
return requesterBuilder
64
.tcp("localhost", 9898)
65
.route("hello")
66
.data("World")
67
.retrieveMono(String.class);
68
}
69
70
// WebSocket connection
71
public Mono<String> connectWebSocket() {
72
return requesterBuilder
73
.websocket(URI.create("ws://localhost:8080/rsocket"))
74
.route("greeting")
75
.data("Client")
76
.retrieveMono(String.class);
77
}
78
79
// Connection with setup
80
public Mono<RSocketRequester> connectWithSetup() {
81
return requesterBuilder
82
.setupRoute("client.setup")
83
.setupData("client-id-123")
84
.tcp("localhost", 9898);
85
}
86
}
87
```
88
89
### RSocket Requester Interface
90
91
Client interface for making RSocket requests with various interaction patterns.
92
93
```java { .api }
94
/**
95
* RSocket client requester for making requests to RSocket servers
96
*/
97
interface RSocketRequester {
98
99
/** Create request spec with routing metadata */
100
RequestSpec route(String route, Object... routeVars);
101
102
/** Create request spec with custom metadata */
103
RequestSpec metadata(Object metadata, MimeType mimeType);
104
105
/** Get RSocket instance for low-level operations */
106
RSocket rsocket();
107
108
/** Get configured data MIME type */
109
MimeType dataMimeType();
110
111
/** Get configured metadata MIME type */
112
MimeType metadataMimeType();
113
114
/** Close the connection */
115
void dispose();
116
117
/** Check if connection is disposed */
118
boolean isDisposed();
119
}
120
121
/**
122
* Request specification for configuring RSocket requests
123
*/
124
interface RequestSpec {
125
126
/** Set request data payload */
127
RequestSpec data(Object data);
128
129
/** Add metadata to request */
130
RequestSpec metadata(Object metadata, MimeType mimeType);
131
132
/** Fire-and-forget interaction pattern */
133
Mono<Void> send();
134
135
/** Request-response interaction pattern */
136
<T> Mono<T> retrieveMono(Class<T> dataType);
137
<T> Mono<T> retrieveMono(ParameterizedTypeReference<T> dataTypeRef);
138
139
/** Request-stream interaction pattern */
140
<T> Flux<T> retrieveFlux(Class<T> dataType);
141
<T> Flux<T> retrieveFlux(ParameterizedTypeReference<T> dataTypeRef);
142
}
143
```
144
145
**Interaction Pattern Examples:**
146
147
```java
148
@Service
149
public class RSocketInteractionService {
150
151
private final Mono<RSocketRequester> requesterMono;
152
153
public RSocketInteractionService(RSocketRequester.Builder builder) {
154
this.requesterMono = builder.tcp("localhost", 9898).cache();
155
}
156
157
// Fire-and-forget
158
public Mono<Void> sendNotification(String message) {
159
return requesterMono.flatMap(requester ->
160
requester.route("notification")
161
.data(message)
162
.send()
163
);
164
}
165
166
// Request-response
167
public Mono<User> getUser(String userId) {
168
return requesterMono.flatMap(requester ->
169
requester.route("user.get")
170
.data(userId)
171
.retrieveMono(User.class)
172
);
173
}
174
175
// Request-stream
176
public Flux<StockPrice> streamPrices(String symbol) {
177
return requesterMono.flatMapMany(requester ->
178
requester.route("stock.stream")
179
.data(symbol)
180
.retrieveFlux(StockPrice.class)
181
);
182
}
183
184
// Request-channel (bidirectional streaming)
185
public Flux<ChatMessage> chat(Flux<ChatMessage> outbound) {
186
return requesterMono.flatMapMany(requester ->
187
requester.route("chat")
188
.data(outbound)
189
.retrieveFlux(ChatMessage.class)
190
);
191
}
192
}
193
```
194
195
### Connector Configuration
196
197
Customize RSocket connector for advanced client configuration.
198
199
```java { .api }
200
/**
201
* Configurer for RSocket connectors
202
*/
203
@FunctionalInterface
204
interface RSocketConnectorConfigurer {
205
206
/**
207
* Configure the RSocket connector
208
* @param connector Connector builder to customize
209
*/
210
void configure(RSocketConnector.RSocketConnectorBuilder connector);
211
}
212
```
213
214
**Advanced Configuration Examples:**
215
216
```java
217
@Configuration
218
public class RSocketClientConfiguration {
219
220
// Connection timeout and retry configuration
221
@Bean
222
public RSocketConnectorConfigurer connectionConfigurer() {
223
return connector -> connector
224
.connectTimeout(Duration.ofSeconds(30))
225
.reconnect(Retry.backoff(3, Duration.ofSeconds(1)));
226
}
227
228
// Resume capability configuration
229
@Bean
230
public RSocketConnectorConfigurer resumeConfigurer() {
231
return connector -> connector
232
.resume(Resume.create()
233
.sessionDuration(Duration.ofMinutes(5))
234
.retry(Retry.fixedDelay(3, Duration.ofSeconds(1))));
235
}
236
237
// Lease configuration
238
@Bean
239
public RSocketConnectorConfigurer leaseConfigurer() {
240
return connector -> connector
241
.lease(Leases::create);
242
}
243
244
// Interceptor configuration
245
@Bean
246
public RSocketConnectorConfigurer interceptorConfigurer() {
247
return connector -> connector
248
.interceptors(registry -> {
249
registry.forConnection(new LoggingConnectionInterceptor());
250
registry.forRequester(new MetricsRequesterInterceptor());
251
});
252
}
253
}
254
```
255
256
### Transport Configuration
257
258
Support for different transport protocols and configurations.
259
260
```java { .api }
261
// TCP Transport
262
ClientTransport tcpTransport = TcpClientTransport.create("localhost", 9898);
263
264
// WebSocket Transport
265
ClientTransport wsTransport = WebsocketClientTransport.create(URI.create("ws://localhost:8080/rsocket"));
266
267
// Custom transport usage
268
Mono<RSocketRequester> requester = builder.transport(tcpTransport);
269
```
270
271
**SSL/TLS Transport Example:**
272
273
```java
274
@Configuration
275
public class SecureClientConfiguration {
276
277
@Bean
278
public RSocketRequester.Builder secureRequesterBuilder(RSocketStrategies strategies) {
279
SslContext sslContext = SslContextBuilder.forClient()
280
.trustManager(InsecureTrustManagerFactory.INSTANCE)
281
.build();
282
283
TcpClient tcpClient = TcpClient.create()
284
.host("secure-server.example.com")
285
.port(9443)
286
.secure(sslSpec -> sslSpec.sslContext(sslContext));
287
288
ClientTransport transport = TcpClientTransport.create(tcpClient);
289
290
return RSocketRequester.builder()
291
.rsocketStrategies(strategies)
292
.transport(transport);
293
}
294
}
295
```
296
297
### Connection Pooling and Management
298
299
Efficient connection management for high-throughput applications.
300
301
```java
302
@Service
303
public class PooledRSocketService {
304
305
private final Mono<RSocketRequester> sharedRequester;
306
private final LoadBalancer<RSocketRequester> loadBalancer;
307
308
public PooledRSocketService(RSocketRequester.Builder builder) {
309
// Shared connection with connection pooling
310
this.sharedRequester = builder
311
.tcp("localhost", 9898)
312
.cache();
313
314
// Load balancer for multiple servers
315
this.loadBalancer = LoadBalancer.create(
316
Flux.just("server1:9898", "server2:9898", "server3:9898")
317
.map(address -> {
318
String[] parts = address.split(":");
319
return builder.tcp(parts[0], Integer.parseInt(parts[1]));
320
})
321
);
322
}
323
324
public <T> Mono<T> makeRequest(String route, Object data, Class<T> responseType) {
325
return sharedRequester.flatMap(requester ->
326
requester.route(route)
327
.data(data)
328
.retrieveMono(responseType)
329
);
330
}
331
332
public <T> Mono<T> makeLoadBalancedRequest(String route, Object data, Class<T> responseType) {
333
return loadBalancer.next().flatMap(requester ->
334
requester.route(route)
335
.data(data)
336
.retrieveMono(responseType)
337
);
338
}
339
}
340
```
341
342
### Error Handling and Retry
343
344
Robust error handling patterns for RSocket clients.
345
346
```java
347
@Service
348
public class ResilientRSocketService {
349
350
private final Mono<RSocketRequester> requesterMono;
351
352
public ResilientRSocketService(RSocketRequester.Builder builder) {
353
this.requesterMono = builder
354
.tcp("localhost", 9898)
355
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
356
.filter(throwable -> throwable instanceof ConnectException))
357
.cache();
358
}
359
360
public <T> Mono<T> resilientRequest(String route, Object data, Class<T> responseType) {
361
return requesterMono.flatMap(requester ->
362
requester.route(route)
363
.data(data)
364
.retrieveMono(responseType)
365
.retryWhen(Retry.backoff(2, Duration.ofMillis(500))
366
.filter(throwable -> !(throwable instanceof ApplicationException)))
367
.timeout(Duration.ofSeconds(10))
368
.onErrorResume(TimeoutException.class,
369
ex -> Mono.error(new ServiceUnavailableException("Request timeout")))
370
.onErrorResume(RSocketException.class,
371
ex -> Mono.error(new CommunicationException("RSocket error", ex)))
372
);
373
}
374
}
375
```
376
377
## Configuration Properties
378
379
Client-side configuration through application properties:
380
381
```yaml
382
# No specific client properties - configuration done through code
383
# Server connection details configured per requester builder
384
385
# Custom MIME types can be set globally
386
spring:
387
rsocket:
388
# These apply to both client and server
389
strategies:
390
data-mime-type: application/cbor
391
metadata-mime-type: message/x.rsocket.routing.v0
392
```
393
394
## Testing Support
395
396
Testing utilities for RSocket clients:
397
398
```java
399
@SpringBootTest
400
class RSocketClientTest {
401
402
@Autowired
403
private RSocketRequester.Builder requesterBuilder;
404
405
@Test
406
void testClientConnection() {
407
StepVerifier.create(
408
requesterBuilder
409
.tcp("localhost", 9898)
410
.route("test")
411
.data("ping")
412
.retrieveMono(String.class)
413
)
414
.expectNext("pong")
415
.verifyComplete();
416
}
417
418
@Test
419
void testWithMockServer() {
420
RSocketRequester requester = requesterBuilder
421
.transport(LocalClientTransport.create("test-server"))
422
.block();
423
424
StepVerifier.create(
425
requester.route("mock.test")
426
.data("test-data")
427
.retrieveMono(String.class)
428
)
429
.expectNext("mock-response")
430
.verifyComplete();
431
}
432
}
433
```