Starter for building RSocket clients and servers with Spring Boot auto-configuration.
—
Encoding and decoding configuration for RSocket payloads, with built-in support for JSON and CBOR formats via Jackson integration.
Central configuration for RSocket codecs, routing, and data buffer management.
/**
* Strategies for RSocket handling including codecs and routing
*/
interface RSocketStrategies {
/** Get configured encoders */
List<Encoder<?>> encoders();
/** Get configured decoders */
List<Decoder<?>> decoders();
/** Get route matcher for message routing */
RouteMatcher routeMatcher();
/** Get data buffer factory */
DataBufferFactory dataBufferFactory();
/** Create new builder for customization */
static Builder builder();
/**
* Builder for creating RSocketStrategies instances
*/
interface Builder {
/** Add custom encoder */
Builder encoder(Encoder<?>... encoders);
/** Add custom decoder */
Builder decoder(Decoder<?>... decoders);
/** Configure route matcher */
Builder routeMatcher(RouteMatcher routeMatcher);
/** Configure data buffer factory */
Builder dataBufferFactory(DataBufferFactory bufferFactory);
/** Build strategies instance */
RSocketStrategies build();
}
}Automatic configuration of CBOR and JSON codecs with Jackson integration.
/**
* CBOR codec configuration with Jackson
* Registered with Order(0) - highest priority
*/
class JacksonCborStrategyConfiguration {
@Bean
@Order(0)
@ConditionalOnBean(Jackson2ObjectMapperBuilder.class)
RSocketStrategiesCustomizer jacksonCborRSocketStrategyCustomizer(Jackson2ObjectMapperBuilder builder);
}
/**
* JSON codec configuration with Jackson
* Registered with Order(1) - lower priority than CBOR
*/
class JacksonJsonStrategyConfiguration {
@Bean
@Order(1)
@ConditionalOnBean(ObjectMapper.class)
RSocketStrategiesCustomizer jacksonJsonRSocketStrategyCustomizer(ObjectMapper objectMapper);
}Default Codec Priority:
application/cbor) - Priority 0 (highest)application/json, application/*+json) - Priority 1Configuration Properties:
# MIME type defaults (automatically configured)
spring:
rsocket:
# Default data format (can be overridden per request)
data-mime-type: application/cbor
# Default metadata format for routing
metadata-mime-type: message/x.rsocket.routing.v0Register custom encoders and decoders for specialized data formats.
/**
* Customizer interface for modifying RSocket strategies
*/
@FunctionalInterface
interface RSocketStrategiesCustomizer {
/**
* Customize RSocket strategies builder
* @param strategies Builder to customize
*/
void customize(RSocketStrategies.Builder strategies);
}Custom Codec Examples:
@Configuration
public class CustomCodecConfiguration {
// Protocol Buffers support
@Bean
@Order(-1) // Higher priority than defaults
public RSocketStrategiesCustomizer protobufCustomizer() {
return strategies -> {
strategies.encoder(new ProtobufEncoder());
strategies.decoder(new ProtobufDecoder());
};
}
// MessagePack support
@Bean
@Order(2) // Lower priority than JSON/CBOR
public RSocketStrategiesCustomizer messagePackCustomizer() {
return strategies -> {
strategies.encoder(new MessagePackEncoder());
strategies.decoder(new MessagePackDecoder());
};
}
// Custom binary format
@Bean
public RSocketStrategiesCustomizer binaryCustomizer() {
return strategies -> {
strategies.encoder(new CustomBinaryEncoder());
strategies.decoder(new CustomBinaryDecoder());
};
}
// String codec with custom charset
@Bean
public RSocketStrategiesCustomizer stringCustomizer() {
return strategies -> {
strategies.encoder(StringEncoder.textPlainOnly(StandardCharsets.UTF_16));
strategies.decoder(StringDecoder.textPlainOnly());
};
}
}Customize Jackson ObjectMapper configuration for JSON and CBOR codecs.
@Configuration
public class JacksonRSocketConfiguration {
// Custom ObjectMapper for RSocket JSON
@Bean
@Primary
public ObjectMapper rSocketObjectMapper() {
return Jackson2ObjectMapperBuilder.json()
.propertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE)
.serializationInclusion(JsonInclude.Include.NON_NULL)
.featuresToDisable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
.modules(new JavaTimeModule())
.build();
}
// CBOR-specific ObjectMapper
@Bean
public RSocketStrategiesCustomizer cborMapperCustomizer() {
return strategies -> {
ObjectMapper cborMapper = Jackson2ObjectMapperBuilder.json()
.factory(new CBORFactory())
.build();
strategies.encoder(new Jackson2CborEncoder(cborMapper));
strategies.decoder(new Jackson2CborDecoder(cborMapper));
};
}
// Custom serialization modules
@Bean
public RSocketStrategiesCustomizer jacksonModuleCustomizer() {
return strategies -> {
ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new CustomSerializationModule());
mapper.registerModule(new ParameterNamesModule());
strategies.encoder(new Jackson2JsonEncoder(mapper));
strategies.decoder(new Jackson2JsonDecoder(mapper));
};
}
}Configure data buffer management for memory optimization.
@Configuration
public class DataBufferConfiguration {
// Netty data buffer factory (default)
@Bean
public RSocketStrategiesCustomizer nettyBufferCustomizer() {
return strategies -> {
NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(
PooledByteBufAllocator.DEFAULT
);
strategies.dataBufferFactory(bufferFactory);
};
}
// Default data buffer factory
@Bean
public RSocketStrategiesCustomizer defaultBufferCustomizer() {
return strategies -> {
strategies.dataBufferFactory(new DefaultDataBufferFactory());
};
}
// Custom buffer size limits
@Bean
public RSocketStrategiesCustomizer bufferSizeCustomizer() {
return strategies -> {
strategies.codecs(configurer -> {
configurer.defaultCodecs().maxInMemorySize(2 * 1024 * 1024); // 2MB
});
};
}
}Configure route matching for @MessageMapping annotations.
@Configuration
public class RoutingConfiguration {
// Path pattern route matcher (default)
@Bean
public RSocketStrategiesCustomizer pathPatternRouteCustomizer() {
return strategies -> {
PathPatternRouteMatcher matcher = new PathPatternRouteMatcher();
matcher.setCaseSensitive(false);
strategies.routeMatcher(matcher);
};
}
// Simple route matcher
@Bean
public RSocketStrategiesCustomizer simpleRouteCustomizer() {
return strategies -> {
strategies.routeMatcher(new SimpleRouteMatcher());
};
}
// Custom route matcher
@Bean
public RSocketStrategiesCustomizer customRouteCustomizer() {
return strategies -> {
strategies.routeMatcher(new CustomRouteMatcher());
};
}
}Configure MIME types for different data formats.
@Configuration
public class MimeTypeConfiguration {
@Bean
public RSocketStrategiesCustomizer mimeTypeCustomizer() {
return strategies -> {
// Custom MIME types for specific encoders/decoders
strategies.encoder(new CustomEncoder(MimeType.valueOf("application/x-custom")));
strategies.decoder(new CustomDecoder(MimeType.valueOf("application/x-custom")));
};
}
}
// Usage in message handlers
@Controller
public class MimeTypeController {
// Specify data MIME type per request
@MessageMapping("cbor-data")
public Mono<ResponseEntity<MyData>> handleCborData(MyData data) {
return Mono.just(
ResponseEntity.ok()
.contentType(MediaType.APPLICATION_CBOR)
.body(processedData)
);
}
// Handle multiple MIME types
@MessageMapping("multi-format")
public Mono<String> handleMultiFormat(@RequestBody MyData data, @Header("content-type") String contentType) {
if (MediaType.APPLICATION_CBOR_VALUE.equals(contentType)) {
return handleCbor(data);
} else if (MediaType.APPLICATION_JSON_VALUE.equals(contentType)) {
return handleJson(data);
}
return Mono.error(new UnsupportedMediaTypeException("Unsupported format: " + contentType));
}
}Optimize codec performance for high-throughput scenarios.
@Configuration
public class PerformanceConfiguration {
// Zero-copy optimizations for Netty
@Bean
public RSocketStrategiesCustomizer zeroCopyCustomizer() {
return strategies -> {
// Use Netty's zero-copy capabilities
NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(
PooledByteBufAllocator.DEFAULT
);
strategies.dataBufferFactory(bufferFactory);
};
}
// Streaming codec configuration
@Bean
public RSocketStrategiesCustomizer streamingCustomizer() {
return strategies -> {
strategies.codecs(configurer -> {
// Enable streaming for large payloads
configurer.defaultCodecs().enableLoggingRequestDetails(false);
configurer.defaultCodecs().maxInMemorySize(-1); // Unlimited
});
};
}
// Custom thread pool for codec operations
@Bean
public RSocketStrategiesCustomizer threadPoolCustomizer() {
return strategies -> {
Scheduler codecScheduler = Schedulers.newParallel("rsocket-codec", 4);
strategies.encoder(new AsyncEncoder(codecScheduler));
strategies.decoder(new AsyncDecoder(codecScheduler));
};
}
}Handle encoding/decoding errors gracefully.
@Configuration
public class CodecErrorConfiguration {
@Bean
public RSocketStrategiesCustomizer errorHandlingCustomizer() {
return strategies -> {
strategies.encoder(new ResilientEncoder());
strategies.decoder(new ResilientDecoder());
};
}
}
// Custom error-handling encoder
public class ResilientEncoder implements Encoder<Object> {
private final Encoder<Object> delegate;
@Override
public Flux<DataBuffer> encode(Publisher<?> inputStream, DataBufferFactory bufferFactory,
ResolvableType elementType, MimeType mimeType, Map<String, Object> hints) {
return delegate.encode(inputStream, bufferFactory, elementType, mimeType, hints)
.onErrorResume(EncodingException.class, ex -> {
log.warn("Encoding failed for type {}: {}", elementType, ex.getMessage());
return Flux.error(new RSocketException(0x301, "Encoding error: " + ex.getMessage()));
});
}
}
// Custom error-handling decoder
public class ResilientDecoder implements Decoder<Object> {
private final Decoder<Object> delegate;
@Override
public Flux<Object> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType,
MimeType mimeType, Map<String, Object> hints) {
return delegate.decode(inputStream, elementType, mimeType, hints)
.onErrorResume(DecodingException.class, ex -> {
log.warn("Decoding failed for type {}: {}", elementType, ex.getMessage());
return Flux.error(new RSocketException(0x302, "Decoding error: " + ex.getMessage()));
});
}
}public class CustomBinaryCodec implements Encoder<CustomData>, Decoder<CustomData> {
@Override
public boolean canEncode(ResolvableType elementType, MimeType mimeType) {
return CustomData.class.isAssignableFrom(elementType.toClass()) &&
MimeType.valueOf("application/x-custom-binary").equals(mimeType);
}
@Override
public Flux<DataBuffer> encode(Publisher<? extends CustomData> inputStream,
DataBufferFactory bufferFactory, ResolvableType elementType,
MimeType mimeType, Map<String, Object> hints) {
return Flux.from(inputStream)
.map(data -> {
DataBuffer buffer = bufferFactory.allocateBuffer();
// Custom binary serialization
buffer.write(data.toByteArray());
return buffer;
});
}
@Override
public boolean canDecode(ResolvableType elementType, MimeType mimeType) {
return CustomData.class.isAssignableFrom(elementType.toClass()) &&
MimeType.valueOf("application/x-custom-binary").equals(mimeType);
}
@Override
public Flux<CustomData> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType,
MimeType mimeType, Map<String, Object> hints) {
return Flux.from(inputStream)
.map(buffer -> {
byte[] bytes = new byte[buffer.readableByteCount()];
buffer.read(bytes);
DataBufferUtils.release(buffer);
return CustomData.fromByteArray(bytes);
});
}
}public class CompressionCodecWrapper<T> implements Encoder<T>, Decoder<T> {
private final Encoder<T> encoder;
private final Decoder<T> decoder;
private final Compressor compressor;
@Override
public Flux<DataBuffer> encode(Publisher<? extends T> inputStream,
DataBufferFactory bufferFactory, ResolvableType elementType,
MimeType mimeType, Map<String, Object> hints) {
return encoder.encode(inputStream, bufferFactory, elementType, mimeType, hints)
.collectList()
.map(buffers -> {
// Combine and compress buffers
DataBuffer combined = bufferFactory.join(buffers);
byte[] compressed = compressor.compress(combined.asByteBuffer().array());
DataBufferUtils.release(combined);
DataBuffer result = bufferFactory.allocateBuffer(compressed.length);
result.write(compressed);
return result;
})
.flux();
}
@Override
public Flux<T> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType,
MimeType mimeType, Map<String, Object> hints) {
return Flux.from(inputStream)
.collectList()
.map(buffers -> {
// Combine and decompress buffers
DataBuffer combined = bufferFactory.join(buffers);
byte[] decompressed = compressor.decompress(combined.asByteBuffer().array());
DataBufferUtils.release(combined);
return bufferFactory.wrap(decompressed);
})
.flux()
.transform(decompressed -> decoder.decode(decompressed, elementType, mimeType, hints));
}
}Install with Tessl CLI
npx tessl i tessl/maven-org-springframework-boot--spring-boot-starter-rsocket