0
# Message Handling
1
2
Spring messaging integration enabling @MessageMapping annotations and bidirectional communication patterns in RSocket controllers.
3
4
## Capabilities
5
6
### RSocket Message Handler
7
8
Core message handler that processes @MessageMapping annotations and integrates with Spring's messaging infrastructure.
9
10
```java { .api }
11
/**
12
* Message handler for RSocket @MessageMapping support
13
*/
14
class RSocketMessageHandler extends MessagingMethodMessageHandler {
15
16
/** Set RSocket strategies for codec configuration */
17
void setRSocketStrategies(RSocketStrategies strategies);
18
19
/** Get configured RSocket strategies */
20
RSocketStrategies getRSocketStrategies();
21
22
/** Set default data MIME type */
23
void setDefaultDataMimeType(MimeType defaultDataMimeType);
24
25
/** Set default metadata MIME type */
26
void setDefaultMetadataMimeType(MimeType defaultMetadataMimeType);
27
28
/** Configure routing matcher for message routing */
29
void setRouteMatcher(RouteMatcher routeMatcher);
30
31
/** Create socket acceptor for server integration */
32
SocketAcceptor responder();
33
}
34
```
35
36
### Message Mapping Annotations
37
38
Core annotations for defining RSocket message handlers.
39
40
```java { .api }
41
/**
42
* Marks a method as an RSocket message handler
43
*/
44
@Target(ElementType.METHOD)
45
@Retention(RetentionPolicy.RUNTIME)
46
@interface MessageMapping {
47
/** Route patterns this handler responds to */
48
String[] value() default {};
49
}
50
51
/**
52
* Extracts values from route variables
53
*/
54
@Target(ElementType.PARAMETER)
55
@Retention(RetentionPolicy.RUNTIME)
56
@interface DestinationVariable {
57
/** Variable name to extract */
58
String value() default "";
59
}
60
61
/**
62
* Accesses message headers/metadata
63
*/
64
@Target(ElementType.PARAMETER)
65
@Retention(RetentionPolicy.RUNTIME)
66
@interface Header {
67
/** Header name to access */
68
String value() default "";
69
/** Whether header is required */
70
boolean required() default true;
71
/** Default value if header missing */
72
String defaultValue() default ValueConstants.DEFAULT_NONE;
73
}
74
75
/**
76
* Accesses all message headers
77
*/
78
@Target(ElementType.PARAMETER)
79
@Retention(RetentionPolicy.RUNTIME)
80
@interface Headers {
81
}
82
```
83
84
### Controller Implementation Patterns
85
86
Different patterns for implementing RSocket message handlers.
87
88
```java { .api }
89
/**
90
* Basic RSocket controller with message mappings
91
*/
92
@Controller
93
public class RSocketController {
94
95
// Request-Response pattern
96
@MessageMapping("user.get")
97
public Mono<User> getUser(String userId) {
98
return userService.findById(userId);
99
}
100
101
// Fire-and-Forget pattern
102
@MessageMapping("user.update")
103
public Mono<Void> updateUser(User user) {
104
return userService.save(user).then();
105
}
106
107
// Request-Stream pattern
108
@MessageMapping("notifications.stream")
109
public Flux<Notification> streamNotifications(String userId) {
110
return notificationService.streamForUser(userId);
111
}
112
113
// Request-Channel pattern (bidirectional streaming)
114
@MessageMapping("chat")
115
public Flux<ChatMessage> handleChat(Flux<ChatMessage> inbound) {
116
return chatService.handleConversation(inbound);
117
}
118
}
119
```
120
121
**Advanced Controller Examples:**
122
123
```java
124
@Controller
125
public class AdvancedRSocketController {
126
127
// Route variables and headers
128
@MessageMapping("user.{id}.profile")
129
public Mono<Profile> getUserProfile(
130
@DestinationVariable String id,
131
@Header("user-role") String userRole,
132
@Headers Map<String, Object> headers
133
) {
134
return profileService.getProfile(id, userRole, headers);
135
}
136
137
// Complex data types
138
@MessageMapping("search")
139
public Flux<SearchResult> search(SearchRequest request) {
140
return searchService.search(request);
141
}
142
143
// Exception handling
144
@MessageMapping("risky-operation")
145
public Mono<String> riskyOperation(String input) {
146
return processService.process(input)
147
.onErrorResume(ValidationException.class,
148
ex -> Mono.error(new RSocketException(0x201, ex.getMessage())));
149
}
150
151
// Requester injection for bidirectional communication
152
@MessageMapping("interactive")
153
public Mono<Void> interactive(String data, RSocketRequester requester) {
154
return requester.route("callback")
155
.data("processed: " + data)
156
.send();
157
}
158
}
159
```
160
161
### Message Handler Customization
162
163
Customize message handler behavior through the customizer interface.
164
165
```java { .api }
166
/**
167
* Callback interface for customizing RSocketMessageHandler
168
*/
169
@FunctionalInterface
170
interface RSocketMessageHandlerCustomizer {
171
172
/**
173
* Customize the message handler
174
* @param messageHandler Handler to customize
175
*/
176
void customize(RSocketMessageHandler messageHandler);
177
}
178
```
179
180
**Customization Examples:**
181
182
```java
183
@Configuration
184
public class MessageHandlerConfiguration {
185
186
// Configure default MIME types
187
@Bean
188
public RSocketMessageHandlerCustomizer mimeTypeCustomizer() {
189
return messageHandler -> {
190
messageHandler.setDefaultDataMimeType(MimeTypeUtils.APPLICATION_CBOR);
191
messageHandler.setDefaultMetadataMimeType(MimeTypeUtils.parseMediaType("message/x.rsocket.routing.v0"));
192
};
193
}
194
195
// Configure routing
196
@Bean
197
public RSocketMessageHandlerCustomizer routingCustomizer() {
198
return messageHandler -> {
199
PathPatternRouteMatcher matcher = new PathPatternRouteMatcher();
200
matcher.setCaseSensitive(false);
201
messageHandler.setRouteMatcher(matcher);
202
};
203
}
204
205
// Add argument resolvers
206
@Bean
207
public RSocketMessageHandlerCustomizer argumentResolverCustomizer() {
208
return messageHandler -> {
209
messageHandler.getArgumentResolverConfigurer()
210
.addCustomResolver(new CustomArgumentResolver());
211
};
212
}
213
214
// Configure encoding
215
@Bean
216
public RSocketMessageHandlerCustomizer encodingCustomizer() {
217
return messageHandler -> {
218
messageHandler.setEncoderConfigurer(configurer -> {
219
configurer.defaultCodecs().maxInMemorySize(1024 * 1024); // 1MB
220
});
221
};
222
}
223
}
224
```
225
226
### Error Handling
227
228
Comprehensive error handling patterns for RSocket message handlers.
229
230
```java
231
@Controller
232
@MessageExceptionHandler
233
public class RSocketErrorController {
234
235
// Global error handler
236
@MessageExceptionHandler
237
public Mono<ErrorResponse> handleValidationError(ValidationException ex) {
238
return Mono.just(new ErrorResponse("VALIDATION_ERROR", ex.getMessage()));
239
}
240
241
// Specific error handler
242
@MessageExceptionHandler(DataIntegrityViolationException.class)
243
public Mono<ErrorResponse> handleDataError(DataIntegrityViolationException ex) {
244
return Mono.just(new ErrorResponse("DATA_ERROR", "Data constraint violation"));
245
}
246
247
// RSocket-specific error handler
248
@MessageExceptionHandler(RSocketException.class)
249
public Mono<Void> handleRSocketError(RSocketException ex) {
250
log.error("RSocket error: code={}, message={}", ex.errorCode(), ex.getMessage());
251
return Mono.empty();
252
}
253
}
254
255
// Custom error response
256
class ErrorResponse {
257
private final String code;
258
private final String message;
259
private final Instant timestamp;
260
261
public ErrorResponse(String code, String message) {
262
this.code = code;
263
this.message = message;
264
this.timestamp = Instant.now();
265
}
266
267
// getters...
268
}
269
```
270
271
### Security Integration
272
273
Security patterns for RSocket message handlers.
274
275
```java
276
@Controller
277
@PreAuthorize("hasRole('USER')")
278
public class SecureRSocketController {
279
280
// Method-level security
281
@MessageMapping("admin.users")
282
@PreAuthorize("hasRole('ADMIN')")
283
public Flux<User> getAllUsers() {
284
return userService.findAll();
285
}
286
287
// Principal injection
288
@MessageMapping("profile")
289
public Mono<Profile> getCurrentProfile(Principal principal) {
290
return profileService.getProfile(principal.getName());
291
}
292
293
// Authentication context
294
@MessageMapping("secure-data")
295
public Mono<SecureData> getSecureData() {
296
return ReactiveSecurityContextHolder.getContext()
297
.map(SecurityContext::getAuthentication)
298
.cast(JwtAuthenticationToken.class)
299
.flatMap(auth -> secureDataService.getData(auth.getToken()));
300
}
301
}
302
```
303
304
### Metadata and Headers
305
306
Working with RSocket metadata and custom headers.
307
308
```java
309
@Controller
310
public class MetadataController {
311
312
// Composite metadata handling
313
@MessageMapping("metadata-example")
314
public Mono<String> handleMetadata(
315
String data,
316
@Header("custom-header") String customValue,
317
@Header(value = "optional-header", required = false) Optional<String> optionalValue
318
) {
319
return Mono.just("Processed: " + data + " with " + customValue);
320
}
321
322
// Manual metadata extraction
323
@MessageMapping("raw-metadata")
324
public Mono<String> handleRawMetadata(String data, @Headers Map<String, Object> headers) {
325
String traceId = (String) headers.get("trace-id");
326
String userId = (String) headers.get("user-id");
327
328
return processWithContext(data, traceId, userId);
329
}
330
331
// Setting response metadata
332
@MessageMapping("with-response-metadata")
333
public Mono<ResponseEntity<String>> responseWithMetadata(String data) {
334
return Mono.just(
335
ResponseEntity.ok()
336
.header("response-time", Instant.now().toString())
337
.header("server-id", "server-1")
338
.body("Processed: " + data)
339
);
340
}
341
}
342
```
343
344
### Testing Message Handlers
345
346
Testing patterns for RSocket message handlers.
347
348
```java
349
@SpringBootTest
350
class RSocketMessageHandlerTest {
351
352
@Autowired
353
private RSocketRequester.Builder requesterBuilder;
354
355
@LocalServerPort
356
private int port;
357
358
private RSocketRequester requester;
359
360
@BeforeEach
361
void setup() {
362
requester = requesterBuilder
363
.tcp("localhost", port)
364
.block();
365
}
366
367
@Test
368
void testRequestResponse() {
369
StepVerifier.create(
370
requester.route("user.get")
371
.data("123")
372
.retrieveMono(User.class)
373
)
374
.assertNext(user -> {
375
assertThat(user.getId()).isEqualTo("123");
376
assertThat(user.getName()).isNotBlank();
377
})
378
.verifyComplete();
379
}
380
381
@Test
382
void testRequestStream() {
383
StepVerifier.create(
384
requester.route("notifications.stream")
385
.data("user123")
386
.retrieveFlux(Notification.class)
387
.take(3)
388
)
389
.expectNextCount(3)
390
.verifyComplete();
391
}
392
393
@Test
394
void testErrorHandling() {
395
StepVerifier.create(
396
requester.route("invalid-operation")
397
.data("bad-data")
398
.retrieveMono(String.class)
399
)
400
.expectError(RSocketException.class)
401
.verify();
402
}
403
404
@AfterEach
405
void cleanup() {
406
if (requester != null) {
407
requester.dispose();
408
}
409
}
410
}
411
```
412
413
## Interaction Patterns
414
415
### Request-Response
416
Single request, single response pattern.
417
418
```java
419
@MessageMapping("echo")
420
public Mono<String> echo(String message) {
421
return Mono.just("Echo: " + message);
422
}
423
```
424
425
### Fire-and-Forget
426
Send data without expecting a response.
427
428
```java
429
@MessageMapping("log")
430
public Mono<Void> logMessage(LogEntry entry) {
431
return logService.save(entry).then();
432
}
433
```
434
435
### Request-Stream
436
Single request, stream of responses.
437
438
```java
439
@MessageMapping("events")
440
public Flux<Event> streamEvents(String topic) {
441
return eventService.streamEvents(topic);
442
}
443
```
444
445
### Request-Channel
446
Bidirectional streaming of data.
447
448
```java
449
@MessageMapping("process-stream")
450
public Flux<ProcessedData> processStream(Flux<RawData> dataStream) {
451
return dataStream
452
.map(this::process)
453
.filter(Objects::nonNull);
454
}
455
```