Starter for building RSocket clients and servers with Spring Boot auto-configuration.
—
Spring messaging integration enabling @MessageMapping annotations and bidirectional communication patterns in RSocket controllers.
Core message handler that processes @MessageMapping annotations and integrates with Spring's messaging infrastructure.
/**
 * Message handler providing RSocket support for {@code @MessageMapping} methods.
 * This is an API summary: only signatures are listed, bodies are omitted.
 */
class RSocketMessageHandler extends MessageMappingMessageHandler {
    /**
     * Set the RSocket strategies used for codec configuration.
     * @param strategies strategies to apply
     */
    void setRSocketStrategies(RSocketStrategies strategies);

    /**
     * Get the configured RSocket strategies.
     * @return the strategies currently in use
     */
    RSocketStrategies getRSocketStrategies();

    /**
     * Set the default MIME type for data payloads.
     * @param defaultDataMimeType MIME type to use for data
     */
    void setDefaultDataMimeType(MimeType defaultDataMimeType);

    /**
     * Set the default MIME type for metadata.
     * @param defaultMetadataMimeType MIME type to use for metadata
     */
    void setDefaultMetadataMimeType(MimeType defaultMetadataMimeType);

    /**
     * Configure the matcher used to route messages to handlers.
     * @param routeMatcher matcher to use
     */
    void setRouteMatcher(RouteMatcher routeMatcher);

    /**
     * Create a socket acceptor for server integration.
     * @return the acceptor backed by this handler
     */
    SocketAcceptor responder();
}Core annotations for defining RSocket message handlers.
/**
 * Marks a method, or a whole controller type, as an RSocket message handler.
 * Declared on a type, the route patterns apply to the handler methods of that type.
 */
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@interface MessageMapping {
    /**
     * Route patterns this handler responds to.
     * @return the patterns; empty by default
     */
    String[] value() default {};
}
/**
* Parameter annotation that extracts a value from a route variable.
* Applicable to parameters only and retained at runtime.
*/
@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@interface DestinationVariable {
/** Name of the route variable to extract; defaults to the empty string. */
String value() default "";
}
/**
* Parameter annotation that gives access to a single message header / metadata entry.
* Applicable to parameters only and retained at runtime.
*/
@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@interface Header {
/** Name of the header to access; defaults to the empty string. */
String value() default "";
/** Whether the header is required; defaults to {@code true}. */
boolean required() default true;
/** Value to use when the header is missing; defaults to {@code ValueConstants.DEFAULT_NONE}. */
String defaultValue() default ValueConstants.DEFAULT_NONE;
}
/**
* Parameter annotation that gives access to all message headers.
* Applicable to parameters only and retained at runtime; declares no attributes.
*/
@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@interface Headers {
}Different patterns for implementing RSocket message handlers.
/**
 * Example controller with one {@code @MessageMapping} handler per interaction pattern.
 * The collaborating services are assumed to be fields declared outside this snippet.
 */
@Controller
public class RSocketController {

    // Request-Response: a single id in, a single user out
    @MessageMapping("user.get")
    public Mono<User> getUser(String userId) {
        Mono<User> found = userService.findById(userId);
        return found;
    }

    // Fire-and-Forget: persist the user and signal completion only
    @MessageMapping("user.update")
    public Mono<Void> updateUser(User user) {
        Mono<Void> completion = userService.save(user).then();
        return completion;
    }

    // Request-Stream: a single id in, a stream of notifications out
    @MessageMapping("notifications.stream")
    public Flux<Notification> streamNotifications(String userId) {
        Flux<Notification> notifications = notificationService.streamForUser(userId);
        return notifications;
    }

    // Request-Channel: bidirectional streaming of chat messages
    @MessageMapping("chat")
    public Flux<ChatMessage> handleChat(Flux<ChatMessage> inbound) {
        Flux<ChatMessage> outbound = chatService.handleConversation(inbound);
        return outbound;
    }
}Advanced Controller Examples:
/**
 * Example controller covering route variables, headers, complex payloads,
 * error translation and use of the injected requester.
 */
@Controller
public class AdvancedRSocketController {

    // Route variables and headers
    @MessageMapping("user.{id}.profile")
    public Mono<Profile> getUserProfile(
            @DestinationVariable String id,
            @Header("user-role") String userRole,
            @Headers Map<String, Object> headers
    ) {
        return profileService.getProfile(id, userRole, headers);
    }

    // Complex data types
    @MessageMapping("search")
    public Flux<SearchResult> search(SearchRequest request) {
        return searchService.search(request);
    }

    // Exception handling: translate validation failures into an RSocket
    // APPLICATION_ERROR (code 0x201). RSocketException itself is abstract, so the
    // concrete ApplicationErrorException is used; the original exception is kept as cause.
    @MessageMapping("risky-operation")
    public Mono<String> riskyOperation(String input) {
        return processService.process(input)
                .onErrorMap(ValidationException.class,
                        ex -> new io.rsocket.exceptions.ApplicationErrorException(ex.getMessage(), ex));
    }

    // Requester injection for bidirectional communication:
    // sends a message to the "callback" route via the injected requester
    @MessageMapping("interactive")
    public Mono<Void> interactive(String data, RSocketRequester requester) {
        return requester.route("callback")
                .data("processed: " + data)
                .send();
    }
}Customize message handler behavior through the customizer interface.
/**
* Callback interface for customizing an RSocketMessageHandler.
* Declared as a functional interface, so it can be supplied as a lambda.
*/
@FunctionalInterface
interface RSocketMessageHandlerCustomizer {
/**
* Customize the given message handler.
* @param messageHandler Handler to customize
*/
void customize(RSocketMessageHandler messageHandler);
}Customization Examples:
/**
 * Example configuration exposing several {@code RSocketMessageHandlerCustomizer} beans.
 */
@Configuration
public class MessageHandlerConfiguration {

    // Configure default MIME types.
    // MimeTypeUtils offers parseMimeType (not parseMediaType) and has no CBOR constant,
    // so both types are parsed from their string form.
    @Bean
    public RSocketMessageHandlerCustomizer mimeTypeCustomizer() {
        return messageHandler -> {
            messageHandler.setDefaultDataMimeType(
                    MimeTypeUtils.parseMimeType("application/cbor"));
            messageHandler.setDefaultMetadataMimeType(
                    MimeTypeUtils.parseMimeType("message/x.rsocket.routing.v0"));
        };
    }

    // Configure routing.
    // Case sensitivity is a property of the PathPatternParser, not of the route matcher,
    // so a parser set up for "."-separated message routes is passed to the matcher.
    @Bean
    public RSocketMessageHandlerCustomizer routingCustomizer() {
        return messageHandler -> {
            org.springframework.web.util.pattern.PathPatternParser parser =
                    new org.springframework.web.util.pattern.PathPatternParser();
            parser.setPathOptions(
                    org.springframework.http.server.PathContainer.Options.MESSAGE_ROUTE);
            parser.setCaseSensitive(false);
            messageHandler.setRouteMatcher(new PathPatternRouteMatcher(parser));
        };
    }

    // Add argument resolvers
    @Bean
    public RSocketMessageHandlerCustomizer argumentResolverCustomizer() {
        return messageHandler -> {
            messageHandler.getArgumentResolverConfigurer()
                    .addCustomResolver(new CustomArgumentResolver());
        };
    }

    // Configure encoding
    // NOTE(review): setEncoderConfigurer is not listed in the RSocketMessageHandler API
    // summary of this document - confirm it exists in the targeted Spring version.
    @Bean
    public RSocketMessageHandlerCustomizer encodingCustomizer() {
        return messageHandler -> {
            messageHandler.setEncoderConfigurer(configurer -> {
                configurer.defaultCodecs().maxInMemorySize(1024 * 1024); // 1MB
            });
        };
    }
}Comprehensive error handling patterns for RSocket message handlers.
/**
 * Example controller declaring {@code @MessageExceptionHandler} methods.
 * The annotation is method-level only, so it is not placed on the class.
 */
@Controller
public class RSocketErrorController {

    // Handler without an explicit exception type; ValidationException is the
    // type declared by the method parameter
    @MessageExceptionHandler
    public Mono<ErrorResponse> handleValidationError(ValidationException ex) {
        return Mono.just(new ErrorResponse("VALIDATION_ERROR", ex.getMessage()));
    }

    // Specific error handler: replies with a fixed message instead of the exception text
    @MessageExceptionHandler(DataIntegrityViolationException.class)
    public Mono<ErrorResponse> handleDataError(DataIntegrityViolationException ex) {
        return Mono.just(new ErrorResponse("DATA_ERROR", "Data constraint violation"));
    }

    // RSocket-specific error handler: logs the error and completes empty
    @MessageExceptionHandler(RSocketException.class)
    public Mono<Void> handleRSocketError(RSocketException ex) {
        log.error("RSocket error: code={}, message={}", ex.errorCode(), ex.getMessage());
        return Mono.empty();
    }
}
/**
 * Custom error response: an error code, a message, and the instant of creation.
 */
class ErrorResponse {

    private final String code;
    private final String message;
    private final Instant timestamp;

    /**
     * @param code    error code
     * @param message error message
     */
    public ErrorResponse(String code, String message) {
        // Creation time is captured at construction
        this.timestamp = Instant.now();
        this.message = message;
        this.code = code;
    }
    // getters...
}Security patterns for RSocket message handlers.
/**
 * Example controller combining {@code @PreAuthorize} rules with principal and
 * security-context access.
 */
@Controller
@PreAuthorize("hasRole('USER')")
public class SecureRSocketController {

    // Method-level security: an additional ADMIN rule on this handler
    @MessageMapping("admin.users")
    @PreAuthorize("hasRole('ADMIN')")
    public Flux<User> getAllUsers() {
        Flux<User> users = userService.findAll();
        return users;
    }

    // Principal injection: the profile is looked up by the principal's name
    @MessageMapping("profile")
    public Mono<Profile> getCurrentProfile(Principal principal) {
        String username = principal.getName();
        return profileService.getProfile(username);
    }

    // Authentication context: read the authentication from the reactive
    // security context and pass its token to the data service
    @MessageMapping("secure-data")
    public Mono<SecureData> getSecureData() {
        Mono<JwtAuthenticationToken> jwt = ReactiveSecurityContextHolder.getContext()
                .map(context -> context.getAuthentication())
                .cast(JwtAuthenticationToken.class);
        return jwt.flatMap(auth -> secureDataService.getData(auth.getToken()));
    }
}Working with RSocket metadata and custom headers.
/**
 * Example controller reading headers individually, reading them as a map,
 * and attaching headers to a response.
 */
@Controller
public class MetadataController {

    // Composite metadata handling: one required and one optional header
    @MessageMapping("metadata-example")
    public Mono<String> handleMetadata(
            String data,
            @Header("custom-header") String customValue,
            @Header(value = "optional-header", required = false) Optional<String> optionalValue
    ) {
        String reply = "Processed: " + data + " with " + customValue;
        return Mono.just(reply);
    }

    // Manual metadata extraction from the full header map
    @MessageMapping("raw-metadata")
    public Mono<String> handleRawMetadata(String data, @Headers Map<String, Object> headers) {
        return processWithContext(
                data,
                (String) headers.get("trace-id"),
                (String) headers.get("user-id"));
    }

    // Setting response metadata
    // NOTE(review): ResponseEntity is an HTTP abstraction - confirm its headers are
    // actually propagated as RSocket response metadata.
    @MessageMapping("with-response-metadata")
    public Mono<ResponseEntity<String>> responseWithMetadata(String data) {
        ResponseEntity<String> response = ResponseEntity.ok()
                .header("response-time", Instant.now().toString())
                .header("server-id", "server-1")
                .body("Processed: " + data);
        return Mono.just(response);
    }
}Testing patterns for RSocket message handlers.
/**
 * Example integration test that connects to the RSocket server over TCP and
 * verifies request-response, request-stream and error behaviour with StepVerifier.
 */
@SpringBootTest
class RSocketMessageHandlerTest {

    @Autowired
    private RSocketRequester.Builder requesterBuilder;

    // The RSocket server publishes its bound port as "local.rsocket.server.port";
    // @LocalServerPort resolves the embedded web server port instead.
    @org.springframework.beans.factory.annotation.Value("${local.rsocket.server.port}")
    private int port;

    private RSocketRequester requester;

    @BeforeEach
    void setup() {
        // tcp(...) returns the requester directly, so there is nothing to block on
        requester = requesterBuilder.tcp("localhost", port);
    }

    @Test
    void testRequestResponse() {
        StepVerifier.create(
                requester.route("user.get")
                        .data("123")
                        .retrieveMono(User.class)
        )
                .assertNext(user -> {
                    assertThat(user.getId()).isEqualTo("123");
                    assertThat(user.getName()).isNotBlank();
                })
                .verifyComplete();
    }

    @Test
    void testRequestStream() {
        // take(3) bounds the stream so the verifier can complete
        StepVerifier.create(
                requester.route("notifications.stream")
                        .data("user123")
                        .retrieveFlux(Notification.class)
                        .take(3)
        )
                .expectNextCount(3)
                .verifyComplete();
    }

    @Test
    void testErrorHandling() {
        StepVerifier.create(
                requester.route("invalid-operation")
                        .data("bad-data")
                        .retrieveMono(String.class)
        )
                .expectError(RSocketException.class)
                .verify();
    }

    @AfterEach
    void cleanup() {
        // setup() may have failed before assigning the requester
        if (requester != null) {
            requester.dispose();
        }
    }
}Single request, single response pattern.
// Replies once with the incoming message prefixed by "Echo: "
@MessageMapping("echo")
public Mono<String> echo(String message) {
    String reply = "Echo: " + message;
    return Mono.just(reply);
}Send data without expecting a response.
// Saves the entry and returns only a completion signal
@MessageMapping("log")
public Mono<Void> logMessage(LogEntry entry) {
    Mono<Void> completion = logService.save(entry).then();
    return completion;
}Single request, stream of responses.
// Returns the event stream the event service provides for the requested topic
@MessageMapping("events")
public Flux<Event> streamEvents(String topic) {
    Flux<Event> events = eventService.streamEvents(topic);
    return events;
}Bidirectional streaming of data.
// Processes each inbound element and emits only the non-null results.
// Flux.map rejects a null mapper result with a NullPointerException before a
// downstream filter could see it, so null results are dropped inside handle().
@MessageMapping("process-stream")
public Flux<ProcessedData> processStream(Flux<RawData> dataStream) {
    return dataStream.<ProcessedData>handle((raw, sink) -> {
        ProcessedData processed = process(raw);
        if (processed != null) {
            sink.next(processed);
        }
    });
}Install with Tessl CLI
npx tessl i tessl/maven-org-springframework-boot--spring-boot-starter-rsocket