0
# RSocket Integration
1
2
RSocket protocol support for reactive, binary messaging with backpressure handling, request-response patterns, and reactive streams integration.
3
4
## Capabilities
5
6
### RSocket Requester Interface
7
8
Main interface for RSocket client interactions with fluent API design.
9
10
```java { .api }
11
/**
12
* A contract for executing RSocket requests.
13
*/
14
public interface RSocketRequester {
15
16
/**
17
* Begin to specify a new request with a route template whose placeholders are expanded from the given route variables.
18
*/
19
RequestSpec route(String route, Object... routeVars);
20
21
/**
22
* Begin to specify a new request with a literal route that has no route variables.
23
*/
24
RequestSpec route(String route);
25
26
/**
27
* Access to the underlying RSocket for more advanced scenarios.
28
*/
29
RSocket rsocket();
30
31
/**
32
* Return the data MimeType selected for the underlying RSocket at connection time.
33
*/
34
MimeType dataMimeType();
35
36
/**
37
* Return the metadata MimeType selected for the underlying RSocket at connection time.
38
*/
39
MimeType metadataMimeType();
40
41
/**
42
* Specification for a request including the route and data.
43
*/
44
interface RequestSpec {
45
46
/**
47
* Provide request data.
48
*/
49
ResponseSpec data(Object data);
50
51
/**
52
* Provide request data with a specific MimeType.
53
*/
54
ResponseSpec data(Object data, MimeType mimeType);
55
56
/**
57
* Provide request data as a Publisher whose elements are of the given class.
58
*/
59
ResponseSpec data(Publisher<?> data, Class<?> dataType);
60
61
/**
62
* Provide request data as a Publisher whose element type is given by a parameterized type reference.
63
*/
64
ResponseSpec data(Publisher<?> data, ParameterizedTypeReference<?> dataTypeRef);
65
66
/**
67
* Add metadata.
68
*/
69
RequestSpec metadata(Object metadata, MimeType mimeType);
70
71
/**
72
* Proceed without data for requests that don't need data.
73
*/
74
ResponseSpec noData();
75
}
76
77
/**
78
* Specification for the expected RSocket response.
79
*/
80
interface ResponseSpec {
81
82
/**
83
* Perform a Fire-and-forget interaction model.
84
*/
85
Mono<Void> send();
86
87
/**
88
* Perform a Request-Response interaction model.
89
*/
90
<T> Mono<T> retrieveMono(Class<T> dataType);
91
92
/**
93
* Perform a Request-Response interaction model with parameterized type.
94
*/
95
<T> Mono<T> retrieveMono(ParameterizedTypeReference<T> dataTypeRef);
96
97
/**
98
* Perform a Request-Stream interaction model.
99
*/
100
<T> Flux<T> retrieveFlux(Class<T> dataType);
101
102
/**
103
* Perform a Request-Stream interaction model with parameterized type.
104
*/
105
<T> Flux<T> retrieveFlux(ParameterizedTypeReference<T> dataTypeRef);
106
}
107
}
108
```
109
110
### RSocket Strategies
111
112
Configuration interface for RSocket messaging strategies.
113
114
```java { .api }
115
/**
116
* Access to strategies for use by RSocket messaging.
117
*/
118
public interface RSocketStrategies {
119
120
/**
121
* Return the configured Encoder instances.
122
*/
123
List<Encoder<?>> encoders();
124
125
/**
126
* Return the configured Decoder instances.
127
*/
128
List<Decoder<?>> decoders();
129
130
/**
131
* Return the configured ReactiveAdapterRegistry.
132
*/
133
ReactiveAdapterRegistry reactiveAdapterRegistry();
134
135
/**
136
* Return a builder to create a new RSocketStrategies instance.
137
*/
138
static Builder builder() {
139
return new DefaultRSocketStrategiesBuilder();
140
}
141
142
/**
143
* Builder to create RSocketStrategies.
144
*/
145
interface Builder {
146
147
/**
148
* Append to the list of encoders to use for serializing Objects to the payload of RSocket frames.
149
*/
150
Builder encoder(Encoder<?>... encoder);
151
152
/**
153
* Append to the list of decoders to use for de-serializing the payload of RSocket frames.
154
*/
155
Builder decoder(Decoder<?>... decoder);
156
157
/**
158
* Configure the ReactiveAdapterRegistry to use.
159
*/
160
Builder reactiveAdapterStrategy(ReactiveAdapterRegistry registry);
161
162
/**
163
* Configure the Jackson encoder to use for JSON encoding.
164
*/
165
Builder jackson2JsonEncoder(Jackson2JsonEncoder encoder);
166
167
/**
168
* Configure the Jackson decoder to use for JSON decoding.
169
*/
170
Builder jackson2JsonDecoder(Jackson2JsonDecoder decoder);
171
172
/**
173
* Build the RSocketStrategies instance.
174
*/
175
RSocketStrategies build();
176
}
177
}
178
```
179
180
### Metadata Extraction
181
182
Interface and registry for extracting metadata from RSocket frames.
183
184
```java { .api }
185
/**
186
* Strategy to extract a map of values from MetadataAndPayload frame metadata.
187
*/
188
public interface MetadataExtractor {
189
190
/**
191
* Extract a map of values from the metadata of a MetadataAndPayload frame.
192
*/
193
Map<String, Object> extract(Payload payload, MimeType metadataMimeType);
194
}
195
196
/**
197
* Registry of metadata extraction rules keyed by metadata mime type.
198
*/
199
public interface MetadataExtractorRegistry {
200
201
/**
202
* Decode metadata entries of the given mime type to the target type and store the decoded value in the output map under the given name.
203
*/
204
void metadataToExtract(MimeType mimeType, Class<?> targetType, String name);
205
206
/**
207
* Decode metadata entries of the given mime type to the target type and hand the decoded value to a BiConsumer that populates the output map.
208
*/
209
void metadataToExtract(MimeType mimeType, Class<?> targetType, BiConsumer<Object, Map<String, Object>> consumer);
210
211
/**
212
* Register a custom MetadataExtractor for a mime type.
213
*/
214
void metadataToExtract(MimeType mimeType, MetadataExtractor extractor);
215
}
216
217
/**
218
* Default implementation of MetadataExtractor.
219
*/
220
public class DefaultMetadataExtractor implements MetadataExtractor, MetadataExtractorRegistry {
221
222
public DefaultMetadataExtractor();
223
224
public DefaultMetadataExtractor(Decoder<?>... decoders);
225
226
/**
227
* Decode metadata entries of the given mime type to the target type and store the decoded value in the output map under the given name.
228
*/
229
public void metadataToExtract(MimeType mimeType, Class<?> targetType, String name);
230
231
/**
232
* Decode metadata entries of the given mime type to the target type and hand the decoded value to a BiConsumer that populates the output map.
233
*/
234
public void metadataToExtract(MimeType mimeType, Class<?> targetType, BiConsumer<Object, Map<String, Object>> consumer);
235
}
236
```
237
238
### RSocket Message Handler
239
240
Main message handler for processing RSocket requests.
241
242
```java { .api }
243
/**
244
* MessageHandler for RSocket requests.
245
*/
246
public class RSocketMessageHandler implements MessageHandler, ApplicationContextAware, InitializingBean {
247
248
public RSocketMessageHandler();
249
250
/**
251
* Configure the RSocketStrategies to use for access to encoders, decoders, and a reactive adapter registry.
252
*/
253
public void setRSocketStrategies(RSocketStrategies rsocketStrategies);
254
255
/**
256
* Return the configured RSocketStrategies.
257
*/
258
public RSocketStrategies getRSocketStrategies();
259
260
/**
261
* Set the MetadataExtractor to extract route and other metadata.
262
*/
263
public void setMetadataExtractor(MetadataExtractor metadataExtractor);
264
265
/**
266
* Return the configured MetadataExtractor.
267
*/
268
public MetadataExtractor getMetadataExtractor();
269
270
/**
271
* Configure the registry for adapting various reactive types.
272
*/
273
public void setReactiveAdapterRegistry(ReactiveAdapterRegistry registry);
274
275
/**
276
* Return the configured ReactiveAdapterRegistry.
277
*/
278
public ReactiveAdapterRegistry getReactiveAdapterRegistry();
279
280
/**
281
* Set the HandlerMethodArgumentResolver instances to use.
282
*/
283
public void setArgumentResolvers(List<HandlerMethodArgumentResolver> resolvers);
284
285
/**
286
* Return the configured argument resolvers.
287
*/
288
public List<HandlerMethodArgumentResolver> getArgumentResolvers();
289
290
/**
291
* Set the HandlerMethodReturnValueHandler instances to use.
292
*/
293
public void setReturnValueHandlers(List<HandlerMethodReturnValueHandler> handlers);
294
295
/**
296
* Return the configured return value handlers.
297
*/
298
public List<HandlerMethodReturnValueHandler> getReturnValueHandlers();
299
}
300
```
301
302
### RSocket Request Mapping
303
304
Information about RSocket request mappings.
305
306
```java { .api }
307
/**
308
* Request mapping information for RSocket requests.
309
*/
310
public final class RSocketRequestMappingInfo implements RequestMappingInfo {
311
312
/**
313
* Return the mapping paths.
314
*/
315
@Override
316
public Set<String> getPatterns();
317
318
/**
319
* Return a new instance with the given mapping paths.
320
*/
321
public RSocketRequestMappingInfo paths(String... paths);
322
323
/**
324
* Create a Builder for RSocketRequestMappingInfo pre-populated with the given mapping paths.
325
*/
326
public static Builder paths(String... paths) {
327
return new Builder().paths(paths);
328
}
329
330
/**
331
* Builder class for RSocketRequestMappingInfo.
332
*/
333
public static class Builder {
334
335
/**
336
* Set the path mapping.
337
*/
338
public Builder paths(String... paths);
339
340
/**
341
* Build the RSocketRequestMappingInfo.
342
*/
343
public RSocketRequestMappingInfo build();
344
}
345
}
346
```
347
348
**Usage Examples:**
349
350
```java
351
import org.springframework.messaging.rsocket.RSocketRequester;
352
import org.springframework.messaging.rsocket.RSocketStrategies;
353
import org.springframework.messaging.rsocket.annotation.ConnectMapping;
354
import org.springframework.messaging.handler.annotation.MessageMapping;
355
import org.springframework.messaging.handler.annotation.Payload;
356
import reactor.core.publisher.Flux;
357
import reactor.core.publisher.Mono;
358
359
// RSocket client setup
360
RSocketRequester requester = RSocketRequester.builder()
361
.rsocketStrategies(RSocketStrategies.builder()
362
.encoder(new Jackson2JsonEncoder())
363
.decoder(new Jackson2JsonDecoder())
364
.build())
365
.tcp("localhost", 7000);
366
367
// Request-Response interaction
368
Mono<String> response = requester
369
.route("greeting")
370
.data("Hello")
371
.retrieveMono(String.class);
372
373
// Request-Stream interaction
374
Flux<String> stream = requester
375
.route("greetings")
376
.data("Hello")
377
.retrieveFlux(String.class);
378
379
// Fire-and-Forget interaction
380
Mono<Void> fireAndForget = requester
381
.route("log")
382
.data("Important message")
383
.send();
384
385
// RSocket server handler
386
@Controller
387
public class RSocketController {
388
389
// Handle connection setup
390
@ConnectMapping("setup")
391
public void handleConnection(@Payload ConnectionSetupPayload setup) {
392
System.out.println("Client connected with setup: " + setup);
393
}
394
395
// Request-Response
396
@MessageMapping("greeting")
397
public Mono<String> greeting(@Payload String name) {
398
return Mono.just("Hello, " + name + "!");
399
}
400
401
// Request-Stream
402
@MessageMapping("greetings")
403
public Flux<String> greetings(@Payload String name) {
404
return Flux.interval(Duration.ofSeconds(1))
405
.map(i -> "Hello " + name + " #" + i);
406
}
407
408
// Fire-and-Forget
409
@MessageMapping("log")
410
public Mono<Void> log(@Payload String message) {
411
System.out.println("Log: " + message);
412
return Mono.empty();
413
}
414
415
// Request-Channel (bidirectional streaming)
416
@MessageMapping("chat")
417
public Flux<ChatMessage> chat(Flux<ChatMessage> messages) {
418
return messages
419
.doOnNext(msg -> System.out.println("Received: " + msg))
420
.map(msg -> new ChatMessage("Echo: " + msg.getContent()));
421
}
422
}
423
424
// Custom metadata extraction
425
DefaultMetadataExtractor extractor = new DefaultMetadataExtractor();
426
extractor.metadataToExtract(MimeTypeUtils.APPLICATION_JSON, User.class, "user");
427
428
// RSocket strategies configuration
429
RSocketStrategies strategies = RSocketStrategies.builder()
430
.encoder(new Jackson2JsonEncoder())
431
.decoder(new Jackson2JsonDecoder())
432
.encoder(new Jackson2CborEncoder())
433
.decoder(new Jackson2CborDecoder())
434
.build();
435
```