CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-org-springframework-boot--spring-boot-starter-rsocket

Starter for building RSocket clients and servers with Spring Boot auto-configuration.

Pending
Overview
Eval results
Files

docs/message-handling.md

Message Handling

Spring messaging integration enabling @MessageMapping annotations and bidirectional communication patterns in RSocket controllers.

Capabilities

RSocket Message Handler

Core message handler that processes @MessageMapping annotations and integrates with Spring's messaging infrastructure.

/**
 * Message handler for RSocket @MessageMapping support.
 *
 * API summary only (method bodies omitted). The parent type is Spring's
 * MessageMappingMessageHandler from spring-messaging.
 */
class RSocketMessageHandler extends MessageMappingMessageHandler {

    /** Set RSocket strategies for codec configuration */
    void setRSocketStrategies(RSocketStrategies strategies);

    /** Get configured RSocket strategies */
    RSocketStrategies getRSocketStrategies();

    /** Set default data MIME type */
    void setDefaultDataMimeType(MimeType defaultDataMimeType);

    /** Set default metadata MIME type */
    void setDefaultMetadataMimeType(MimeType defaultMetadataMimeType);

    /** Configure routing matcher for message routing */
    void setRouteMatcher(RouteMatcher routeMatcher);

    /** Create socket acceptor for server integration */
    SocketAcceptor responder();
}

Message Mapping Annotations

Core annotations for defining RSocket message handlers.

/**
 * Marks a method as an RSocket message handler. It may also be placed on a
 * controller class, in which case the class-level route patterns are shared
 * by the handler methods declared in that class.
 */
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@interface MessageMapping {
    /** Route patterns this handler responds to */
    String[] value() default {};
}

/**
 * Parameter annotation that extracts a value from a route variable.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.PARAMETER)
@interface DestinationVariable {
    /** Name of the route variable to extract; defaults to the empty string */
    String value() default "";
}

/**
 * Parameter annotation that gives access to a single message header / metadata entry.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.PARAMETER)
@interface Header {
    /** Name of the header to access; defaults to the empty string */
    String value() default "";
    /** Whether the header is required; defaults to true */
    boolean required() default true;
    /** Value to use when the header is missing */
    String defaultValue() default ValueConstants.DEFAULT_NONE;
}

/**
 * Parameter annotation that gives access to all message headers at once.
 * Declares no attributes.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.PARAMETER)
@interface Headers {
}

Controller Implementation Patterns

Different patterns for implementing RSocket message handlers.

/**
 * Minimal RSocket controller with one handler per interaction pattern.
 * The service collaborators are assumed to be declared elsewhere in the class.
 */
@Controller
public class RSocketController {

    /** Request-Response: a single user id in, a single user out. */
    @MessageMapping("user.get")
    public Mono<User> getUser(String userId) {
        Mono<User> user = userService.findById(userId);
        return user;
    }

    /** Fire-and-Forget: saves the user and signals only completion. */
    @MessageMapping("user.update")
    public Mono<Void> updateUser(User user) {
        Mono<Void> completion = userService.save(user).then();
        return completion;
    }

    /** Request-Stream: a single user id in, a stream of notifications out. */
    @MessageMapping("notifications.stream")
    public Flux<Notification> streamNotifications(String userId) {
        Flux<Notification> notifications = notificationService.streamForUser(userId);
        return notifications;
    }

    /** Request-Channel: bidirectional streaming of chat messages. */
    @MessageMapping("chat")
    public Flux<ChatMessage> handleChat(Flux<ChatMessage> inbound) {
        Flux<ChatMessage> outbound = chatService.handleConversation(inbound);
        return outbound;
    }
}

Advanced Controller Examples:

/**
 * RSocket controller examples covering route variables, headers, complex
 * payloads, error translation and requester injection.
 * The service collaborators are assumed to be declared elsewhere in the class.
 */
@Controller
public class AdvancedRSocketController {

    // Route variables and headers
    @MessageMapping("user.{id}.profile")
    public Mono<Profile> getUserProfile(
        @DestinationVariable String id,
        @Header("user-role") String userRole,
        @Headers Map<String, Object> headers
    ) {
        return profileService.getProfile(id, userRole, headers);
    }

    // Complex data types
    @MessageMapping("search")
    public Flux<SearchResult> search(SearchRequest request) {
        return searchService.search(request);
    }

    // Exception handling
    @MessageMapping("risky-operation")
    public Mono<String> riskyOperation(String input) {
        // 0x201 is the protocol's APPLICATION_ERROR code. It is expressed with
        // ApplicationErrorException (io.rsocket.exceptions); the int-code
        // RSocketException constructor is intended for custom codes (0x301 and up).
        // The original exception is kept as the cause instead of being dropped.
        return processService.process(input)
            .onErrorMap(ValidationException.class,
                ex -> new ApplicationErrorException(ex.getMessage(), ex));
    }

    // Requester injection for bidirectional communication
    @MessageMapping("interactive")
    public Mono<Void> interactive(String data, RSocketRequester requester) {
        return requester.route("callback")
            .data("processed: " + data)
            .send();
    }
}

Message Handler Customization

Customize message handler behavior through the customizer interface.

/**
 * Callback interface for customizing RSocketMessageHandler.
 * Declared as a functional interface, so an implementation can be supplied
 * as a lambda or method reference.
 */
@FunctionalInterface
interface RSocketMessageHandlerCustomizer {
    
    /**
     * Customize the message handler.
     *
     * @param messageHandler the handler instance to customize
     */
    void customize(RSocketMessageHandler messageHandler);
}

Customization Examples:

@Configuration
public class MessageHandlerConfiguration {
    
    // Configure default MIME types
    @Bean
    public RSocketMessageHandlerCustomizer mimeTypeCustomizer() {
        return messageHandler -> {
            messageHandler.setDefaultDataMimeType(MimeTypeUtils.APPLICATION_CBOR);
            messageHandler.setDefaultMetadataMimeType(MimeTypeUtils.parseMediaType("message/x.rsocket.routing.v0"));
        };
    }
    
    // Configure routing
    @Bean
    public RSocketMessageHandlerCustomizer routingCustomizer() {
        return messageHandler -> {
            PathPatternRouteMatcher matcher = new PathPatternRouteMatcher();
            matcher.setCaseSensitive(false);
            messageHandler.setRouteMatcher(matcher);
        };
    }
    
    // Add argument resolvers
    @Bean
    public RSocketMessageHandlerCustomizer argumentResolverCustomizer() {
        return messageHandler -> {
            messageHandler.getArgumentResolverConfigurer()
                .addCustomResolver(new CustomArgumentResolver());
        };
    }
    
    // Configure encoding
    @Bean
    public RSocketMessageHandlerCustomizer encodingCustomizer() {
        return messageHandler -> {
            messageHandler.setEncoderConfigurer(configurer -> {
                configurer.defaultCodecs().maxInMemorySize(1024 * 1024); // 1MB
            });
        };
    }
}

Error Handling

Comprehensive error handling patterns for RSocket message handlers.

/**
 * Controller showing @MessageExceptionHandler methods. The annotation is
 * method-level only, so it is not placed on the class.
 */
@Controller
public class RSocketErrorController {

    // Handler with no explicit exception type: the handled type (ValidationException)
    // is taken from the method parameter
    @MessageExceptionHandler
    public Mono<ErrorResponse> handleValidationError(ValidationException ex) {
        return Mono.just(new ErrorResponse("VALIDATION_ERROR", ex.getMessage()));
    }

    // Specific error handler
    @MessageExceptionHandler(DataIntegrityViolationException.class)
    public Mono<ErrorResponse> handleDataError(DataIntegrityViolationException ex) {
        return Mono.just(new ErrorResponse("DATA_ERROR", "Data constraint violation"));
    }

    // RSocket-specific error handler: logs the error and completes without a payload
    @MessageExceptionHandler(RSocketException.class)
    public Mono<Void> handleRSocketError(RSocketException ex) {
        log.error("RSocket error: code={}, message={}", ex.errorCode(), ex.getMessage());
        return Mono.empty();
    }
}

/**
 * Custom error response: an error code, a human-readable message and the
 * instant at which the response object was created. Immutable.
 */
class ErrorResponse {
    private final String code;
    private final String message;
    private final Instant timestamp;

    /**
     * @param code    short identifier of the error category
     * @param message description of the error
     */
    public ErrorResponse(String code, String message) {
        this.code = code;
        this.message = message;
        // Captured once at construction time
        this.timestamp = Instant.now();
    }

    /** @return the error code passed to the constructor */
    public String getCode() {
        return code;
    }

    /** @return the error message passed to the constructor */
    public String getMessage() {
        return message;
    }

    /** @return the instant at which this response was created */
    public Instant getTimestamp() {
        return timestamp;
    }
}

Security Integration

Security patterns for RSocket message handlers.

/**
 * Controller guarded by a class-level USER role check, with examples of
 * method-level authorization and access to the authenticated caller.
 * The service collaborators are assumed to be declared elsewhere in the class.
 */
@Controller
@PreAuthorize("hasRole('USER')")
public class SecureRSocketController {

    /** Method-level security: additionally requires the ADMIN role. */
    @MessageMapping("admin.users")
    @PreAuthorize("hasRole('ADMIN')")
    public Flux<User> getAllUsers() {
        Flux<User> everyone = userService.findAll();
        return everyone;
    }

    /** Principal injection: looks up the profile by the principal's name. */
    @MessageMapping("profile")
    public Mono<Profile> getCurrentProfile(Principal principal) {
        String username = principal.getName();
        return profileService.getProfile(username);
    }

    /** Authentication context: reads the JWT authentication from the reactive security context. */
    @MessageMapping("secure-data")
    public Mono<SecureData> getSecureData() {
        Mono<JwtAuthenticationToken> jwtAuthentication = ReactiveSecurityContextHolder.getContext()
            .map(SecurityContext::getAuthentication)
            .cast(JwtAuthenticationToken.class);
        return jwtAuthentication.flatMap(auth -> secureDataService.getData(auth.getToken()));
    }
}

Metadata and Headers

Working with RSocket metadata and custom headers.

/**
 * Examples of reading headers in RSocket handlers, plus one example that
 * attempts to attach headers to a response.
 */
@Controller
public class MetadataController {
    
    // Composite metadata handling
    // Binds the payload plus two named headers; "optional-header" is declared
    // non-required and is not used when building the reply.
    @MessageMapping("metadata-example")
    public Mono<String> handleMetadata(
        String data,
        @Header("custom-header") String customValue,
        @Header(value = "optional-header", required = false) Optional<String> optionalValue
    ) {
        return Mono.just("Processed: " + data + " with " + customValue);
    }
    
    // Manual metadata extraction
    // Reads "trace-id" and "user-id" from the full header map (cast to String)
    // and delegates to processWithContext, which is not shown here.
    @MessageMapping("raw-metadata")
    public Mono<String> handleRawMetadata(String data, @Headers Map<String, Object> headers) {
        String traceId = (String) headers.get("trace-id");
        String userId = (String) headers.get("user-id");
        
        return processWithContext(data, traceId, userId);
    }
    
    // Setting response metadata
    // NOTE(review): ResponseEntity is an HTTP abstraction; it is unclear that these
    // headers are sent as RSocket response metadata rather than being encoded as
    // part of the payload. Confirm against the Spring RSocket reference before use.
    @MessageMapping("with-response-metadata")
    public Mono<ResponseEntity<String>> responseWithMetadata(String data) {
        return Mono.just(
            ResponseEntity.ok()
                .header("response-time", Instant.now().toString())
                .header("server-id", "server-1")
                .body("Processed: " + data)
        );
    }
}

Testing Message Handlers

Testing patterns for RSocket message handlers.

/**
 * Integration tests that call the RSocket handlers over TCP.
 * The RSocket server is started on a random port so tests do not collide.
 */
@SpringBootTest(properties = "spring.rsocket.server.port=0")
class RSocketMessageHandlerTest {

    @Autowired
    private RSocketRequester.Builder requesterBuilder;

    // Port of the RSocket server; @LocalServerPort would give the HTTP server port instead
    @LocalRSocketServerPort
    private int port;

    private RSocketRequester requester;

    @BeforeEach
    void setup() {
        // tcp(..) returns the requester itself (not a Mono), so there is nothing to block on
        requester = requesterBuilder.tcp("localhost", port);
    }

    @Test
    void testRequestResponse() {
        StepVerifier.create(
            requester.route("user.get")
                .data("123")
                .retrieveMono(User.class)
        )
        .assertNext(user -> {
            assertThat(user.getId()).isEqualTo("123");
            assertThat(user.getName()).isNotBlank();
        })
        .verifyComplete();
    }

    @Test
    void testRequestStream() {
        StepVerifier.create(
            requester.route("notifications.stream")
                .data("user123")
                .retrieveFlux(Notification.class)
                .take(3)
        )
        .expectNextCount(3)
        .verifyComplete();
    }

    @Test
    void testErrorHandling() {
        StepVerifier.create(
            requester.route("invalid-operation")
                .data("bad-data")
                .retrieveMono(String.class)
        )
        .expectError(RSocketException.class)
        .verify();
    }

    @AfterEach
    void cleanup() {
        if (requester != null) {
            requester.dispose();
        }
    }
}

Interaction Patterns

Request-Response

Single request, single response pattern.

/** Request-Response: replies once with the incoming message prefixed by "Echo: ". */
@MessageMapping("echo")
public Mono<String> echo(String message) {
    String reply = "Echo: " + message;
    return Mono.just(reply);
}

Fire-and-Forget

Send data without expecting a response.

/** Fire-and-Forget: saves the entry and signals completion only, with no payload. */
@MessageMapping("log")
public Mono<Void> logMessage(LogEntry entry) {
    Mono<Void> completion = logService.save(entry).then();
    return completion;
}

Request-Stream

Single request, stream of responses.

/** Request-Stream: returns the event stream that the event service provides for the topic. */
@MessageMapping("events")
public Flux<Event> streamEvents(String topic) {
    Flux<Event> events = eventService.streamEvents(topic);
    return events;
}

Request-Channel

Bidirectional streaming of data.

/**
 * Request-Channel: transforms each inbound element with process(..) and emits
 * the results, skipping elements for which process(..) returns null.
 */
@MessageMapping("process-stream")
public Flux<ProcessedData> processStream(Flux<RawData> dataStream) {
    // map(..) fails with a NullPointerException when the mapper returns null, so a
    // later filter(Objects::nonNull) never sees one; mapNotNull drops nulls instead.
    return dataStream.mapNotNull(this::process);
}

Install with Tessl CLI

npx tessl i tessl/maven-org-springframework-boot--spring-boot-starter-rsocket

docs

auto-configuration.md

client-configuration.md

codec-configuration.md

index.md

message-handling.md

server-configuration.md

tile.json