CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-springframework-boot--spring-boot-starter-rsocket

Starter for building RSocket clients and servers with Spring Boot auto-configuration.

Pending
Overview
Eval results
Files

codec-configuration.mddocs/

Codec Configuration

Encoding and decoding configuration for RSocket payloads, with built-in support for JSON and CBOR formats via Jackson integration.

Capabilities

RSocket Strategies

Central configuration for RSocket codecs, routing, and data buffer management.

/**
 * Strategies for RSocket handling including codecs and routing
 */
interface RSocketStrategies {
    
    /** Get configured encoders */
    List<Encoder<?>> encoders();
    
    /** Get configured decoders */
    List<Decoder<?>> decoders();
    
    /** Get route matcher for message routing */
    RouteMatcher routeMatcher();
    
    /** Get data buffer factory */
    DataBufferFactory dataBufferFactory();
    
    /** Create new builder for customization */
    static Builder builder();
    
    /**
     * Builder for creating RSocketStrategies instances
     */
    interface Builder {
        
        /** Add custom encoder */
        Builder encoder(Encoder<?>... encoders);
        
        /** Add custom decoder */
        Builder decoder(Decoder<?>... decoders);
        
        /** Configure route matcher */
        Builder routeMatcher(RouteMatcher routeMatcher);
        
        /** Configure data buffer factory */
        Builder dataBufferFactory(DataBufferFactory bufferFactory);
        
        /** Build strategies instance */
        RSocketStrategies build();
    }
}

Built-in Codec Configuration

Automatic configuration of CBOR and JSON codecs with Jackson integration.

/**
 * CBOR codec configuration with Jackson
 * Registered with Order(0) - highest priority
 */
class JacksonCborStrategyConfiguration {
    
    @Bean
    @Order(0)
    @ConditionalOnBean(Jackson2ObjectMapperBuilder.class)
    RSocketStrategiesCustomizer jacksonCborRSocketStrategyCustomizer(Jackson2ObjectMapperBuilder builder);
}

/**
 * JSON codec configuration with Jackson  
 * Registered with Order(1) - lower priority than CBOR
 */
class JacksonJsonStrategyConfiguration {
    
    @Bean
    @Order(1)
    @ConditionalOnBean(ObjectMapper.class)
    RSocketStrategiesCustomizer jacksonJsonRSocketStrategyCustomizer(ObjectMapper objectMapper);
}

Default Codec Priority:

  1. CBOR (application/cbor) - Priority 0 (highest)
  2. JSON (application/json, application/*+json) - Priority 1

Configuration Properties:

# MIME type defaults (automatically configured)
spring:
  rsocket:
    # Default data format (can be overridden per request)
    data-mime-type: application/cbor
    # Default metadata format for routing
    metadata-mime-type: message/x.rsocket.routing.v0

Custom Codec Registration

Register custom encoders and decoders for specialized data formats.

/**
 * Customizer interface for modifying RSocket strategies
 */
@FunctionalInterface
interface RSocketStrategiesCustomizer {
    
    /**
     * Customize RSocket strategies builder
     * @param strategies Builder to customize
     */
    void customize(RSocketStrategies.Builder strategies);
}

Custom Codec Examples:

@Configuration
public class CustomCodecConfiguration {
    
    // Protocol Buffers support
    @Bean
    @Order(-1) // Higher priority than defaults
    public RSocketStrategiesCustomizer protobufCustomizer() {
        return strategies -> {
            strategies.encoder(new ProtobufEncoder());
            strategies.decoder(new ProtobufDecoder());
        };
    }
    
    // MessagePack support
    @Bean
    @Order(2) // Lower priority than JSON/CBOR
    public RSocketStrategiesCustomizer messagePackCustomizer() {
        return strategies -> {
            strategies.encoder(new MessagePackEncoder());
            strategies.decoder(new MessagePackDecoder());
        };
    }
    
    // Custom binary format
    @Bean
    public RSocketStrategiesCustomizer binaryCustomizer() {
        return strategies -> {
            strategies.encoder(new CustomBinaryEncoder());
            strategies.decoder(new CustomBinaryDecoder());
        };
    }
    
    // String codec with custom charset
    @Bean
    public RSocketStrategiesCustomizer stringCustomizer() {
        return strategies -> {
            strategies.encoder(StringEncoder.textPlainOnly(StandardCharsets.UTF_16));
            strategies.decoder(StringDecoder.textPlainOnly());
        };
    }
}

Jackson Customization

Customize Jackson ObjectMapper configuration for JSON and CBOR codecs.

@Configuration
public class JacksonRSocketConfiguration {
    
    // Custom ObjectMapper for RSocket JSON
    @Bean
    @Primary
    public ObjectMapper rSocketObjectMapper() {
        return Jackson2ObjectMapperBuilder.json()
            .propertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE)
            .serializationInclusion(JsonInclude.Include.NON_NULL)
            .featuresToDisable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
            .modules(new JavaTimeModule())
            .build();
    }
    
    // CBOR-specific ObjectMapper
    @Bean
    public RSocketStrategiesCustomizer cborMapperCustomizer() {
        return strategies -> {
            ObjectMapper cborMapper = Jackson2ObjectMapperBuilder.json()
                .factory(new CBORFactory())
                .build();
                
            strategies.encoder(new Jackson2CborEncoder(cborMapper));
            strategies.decoder(new Jackson2CborDecoder(cborMapper));
        };
    }
    
    // Custom serialization modules
    @Bean
    public RSocketStrategiesCustomizer jacksonModuleCustomizer() {
        return strategies -> {
            ObjectMapper mapper = new ObjectMapper();
            mapper.registerModule(new CustomSerializationModule());
            mapper.registerModule(new ParameterNamesModule());
            
            strategies.encoder(new Jackson2JsonEncoder(mapper));
            strategies.decoder(new Jackson2JsonDecoder(mapper));
        };
    }
}

Data Buffer Configuration

Configure data buffer management for memory optimization.

@Configuration
public class DataBufferConfiguration {
    
    // Netty data buffer factory (default)
    @Bean
    public RSocketStrategiesCustomizer nettyBufferCustomizer() {
        return strategies -> {
            NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(
                PooledByteBufAllocator.DEFAULT
            );
            strategies.dataBufferFactory(bufferFactory);
        };
    }
    
    // Default data buffer factory
    @Bean
    public RSocketStrategiesCustomizer defaultBufferCustomizer() {
        return strategies -> {
            strategies.dataBufferFactory(new DefaultDataBufferFactory());
        };
    }
    
    // Custom buffer size limits
    @Bean
    public RSocketStrategiesCustomizer bufferSizeCustomizer() {
        return strategies -> {
            strategies.codecs(configurer -> {
                configurer.defaultCodecs().maxInMemorySize(2 * 1024 * 1024); // 2MB
            });
        };
    }
}

Route Matching Configuration

Configure route matching for @MessageMapping annotations.

@Configuration
public class RoutingConfiguration {
    
    // Path pattern route matcher (default)
    @Bean
    public RSocketStrategiesCustomizer pathPatternRouteCustomizer() {
        return strategies -> {
            PathPatternRouteMatcher matcher = new PathPatternRouteMatcher();
            matcher.setCaseSensitive(false);
            strategies.routeMatcher(matcher);
        };
    }
    
    // Simple route matcher
    @Bean
    public RSocketStrategiesCustomizer simpleRouteCustomizer() {
        return strategies -> {
            strategies.routeMatcher(new SimpleRouteMatcher());
        };
    }
    
    // Custom route matcher
    @Bean
    public RSocketStrategiesCustomizer customRouteCustomizer() {
        return strategies -> {
            strategies.routeMatcher(new CustomRouteMatcher());
        };
    }
}

MIME Type Configuration

Configure MIME types for different data formats.

@Configuration
public class MimeTypeConfiguration {
    
    @Bean
    public RSocketStrategiesCustomizer mimeTypeCustomizer() {
        return strategies -> {
            // Custom MIME types for specific encoders/decoders
            strategies.encoder(new CustomEncoder(MimeType.valueOf("application/x-custom")));
            strategies.decoder(new CustomDecoder(MimeType.valueOf("application/x-custom")));
        };
    }
}

// Usage in message handlers
@Controller
public class MimeTypeController {
    
    // Specify data MIME type per request
    @MessageMapping("cbor-data")
    public Mono<ResponseEntity<MyData>> handleCborData(MyData data) {
        return Mono.just(
            ResponseEntity.ok()
                .contentType(MediaType.APPLICATION_CBOR)
                .body(processedData)
        );
    }
    
    // Handle multiple MIME types
    @MessageMapping("multi-format")
    public Mono<String> handleMultiFormat(@RequestBody MyData data, @Header("content-type") String contentType) {
        if (MediaType.APPLICATION_CBOR_VALUE.equals(contentType)) {
            return handleCbor(data);
        } else if (MediaType.APPLICATION_JSON_VALUE.equals(contentType)) {
            return handleJson(data);
        }
        return Mono.error(new UnsupportedMediaTypeException("Unsupported format: " + contentType));
    }
}

Performance Optimization

Optimize codec performance for high-throughput scenarios.

@Configuration
public class PerformanceConfiguration {
    
    // Zero-copy optimizations for Netty
    @Bean
    public RSocketStrategiesCustomizer zeroCopyCustomizer() {
        return strategies -> {
            // Use Netty's zero-copy capabilities
            NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(
                PooledByteBufAllocator.DEFAULT
            );
            strategies.dataBufferFactory(bufferFactory);
        };
    }
    
    // Streaming codec configuration
    @Bean 
    public RSocketStrategiesCustomizer streamingCustomizer() {
        return strategies -> {
            strategies.codecs(configurer -> {
                // Enable streaming for large payloads
                configurer.defaultCodecs().enableLoggingRequestDetails(false);
                configurer.defaultCodecs().maxInMemorySize(-1); // Unlimited
            });
        };
    }
    
    // Custom thread pool for codec operations
    @Bean
    public RSocketStrategiesCustomizer threadPoolCustomizer() {
        return strategies -> {
            Scheduler codecScheduler = Schedulers.newParallel("rsocket-codec", 4);
            strategies.encoder(new AsyncEncoder(codecScheduler));
            strategies.decoder(new AsyncDecoder(codecScheduler));
        };
    }
}

Error Handling in Codecs

Handle encoding/decoding errors gracefully.

@Configuration
public class CodecErrorConfiguration {
    
    @Bean
    public RSocketStrategiesCustomizer errorHandlingCustomizer() {
        return strategies -> {
            strategies.encoder(new ResilientEncoder());
            strategies.decoder(new ResilientDecoder());
        };
    }
}

// Custom error-handling encoder
public class ResilientEncoder implements Encoder<Object> {
    
    private final Encoder<Object> delegate;
    
    @Override
    public Flux<DataBuffer> encode(Publisher<?> inputStream, DataBufferFactory bufferFactory, 
                                  ResolvableType elementType, MimeType mimeType, Map<String, Object> hints) {
        return delegate.encode(inputStream, bufferFactory, elementType, mimeType, hints)
            .onErrorResume(EncodingException.class, ex -> {
                log.warn("Encoding failed for type {}: {}", elementType, ex.getMessage());
                return Flux.error(new RSocketException(0x301, "Encoding error: " + ex.getMessage()));
            });
    }
}

// Custom error-handling decoder  
public class ResilientDecoder implements Decoder<Object> {
    
    private final Decoder<Object> delegate;
    
    @Override
    public Flux<Object> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType,
                              MimeType mimeType, Map<String, Object> hints) {
        return delegate.decode(inputStream, elementType, mimeType, hints)
            .onErrorResume(DecodingException.class, ex -> {
                log.warn("Decoding failed for type {}: {}", elementType, ex.getMessage());
                return Flux.error(new RSocketException(0x302, "Decoding error: " + ex.getMessage()));
            });
    }
}

Codec Examples

Custom Binary Codec

public class CustomBinaryCodec implements Encoder<CustomData>, Decoder<CustomData> {
    
    @Override
    public boolean canEncode(ResolvableType elementType, MimeType mimeType) {
        return CustomData.class.isAssignableFrom(elementType.toClass()) &&
               MimeType.valueOf("application/x-custom-binary").equals(mimeType);
    }
    
    @Override
    public Flux<DataBuffer> encode(Publisher<? extends CustomData> inputStream, 
                                   DataBufferFactory bufferFactory, ResolvableType elementType,
                                   MimeType mimeType, Map<String, Object> hints) {
        return Flux.from(inputStream)
            .map(data -> {
                DataBuffer buffer = bufferFactory.allocateBuffer();
                // Custom binary serialization
                buffer.write(data.toByteArray());
                return buffer;
            });
    }
    
    @Override
    public boolean canDecode(ResolvableType elementType, MimeType mimeType) {
        return CustomData.class.isAssignableFrom(elementType.toClass()) &&
               MimeType.valueOf("application/x-custom-binary").equals(mimeType);
    }
    
    @Override
    public Flux<CustomData> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType,
                                   MimeType mimeType, Map<String, Object> hints) {
        return Flux.from(inputStream)
            .map(buffer -> {
                byte[] bytes = new byte[buffer.readableByteCount()];
                buffer.read(bytes);
                DataBufferUtils.release(buffer);
                return CustomData.fromByteArray(bytes);
            });
    }
}

Compression Codec Wrapper

public class CompressionCodecWrapper<T> implements Encoder<T>, Decoder<T> {
    
    private final Encoder<T> encoder;
    private final Decoder<T> decoder;
    private final Compressor compressor;
    
    @Override
    public Flux<DataBuffer> encode(Publisher<? extends T> inputStream, 
                                   DataBufferFactory bufferFactory, ResolvableType elementType,
                                   MimeType mimeType, Map<String, Object> hints) {
        return encoder.encode(inputStream, bufferFactory, elementType, mimeType, hints)
            .collectList()
            .map(buffers -> {
                // Combine and compress buffers
                DataBuffer combined = bufferFactory.join(buffers);
                byte[] compressed = compressor.compress(combined.asByteBuffer().array());
                DataBufferUtils.release(combined);
                
                DataBuffer result = bufferFactory.allocateBuffer(compressed.length);
                result.write(compressed);
                return result;
            })
            .flux();
    }
    
    @Override
    public Flux<T> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType,
                          MimeType mimeType, Map<String, Object> hints) {
        return Flux.from(inputStream)
            .collectList()
            .map(buffers -> {
                // Combine and decompress buffers
                DataBuffer combined = bufferFactory.join(buffers);
                byte[] decompressed = compressor.decompress(combined.asByteBuffer().array());
                DataBufferUtils.release(combined);
                
                return bufferFactory.wrap(decompressed);
            })
            .flux()
            .transform(decompressed -> decoder.decode(decompressed, elementType, mimeType, hints));
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-springframework-boot--spring-boot-starter-rsocket

docs

auto-configuration.md

client-configuration.md

codec-configuration.md

index.md

message-handling.md

server-configuration.md

tile.json