0
# Codec Configuration
1
2
Encoding and decoding configuration for RSocket payloads, with built-in support for JSON and CBOR formats via Jackson integration.
3
4
## Capabilities
5
6
### RSocket Strategies
7
8
Central configuration for RSocket codecs, routing, and data buffer management.
9
10
```java { .api }
11
/**
12
* Strategies for RSocket handling including codecs and routing
13
*/
14
interface RSocketStrategies {
15
16
/** Get configured encoders */
17
List<Encoder<?>> encoders();
18
19
/** Get configured decoders */
20
List<Decoder<?>> decoders();
21
22
/** Get route matcher for message routing */
23
RouteMatcher routeMatcher();
24
25
/** Get data buffer factory */
26
DataBufferFactory dataBufferFactory();
27
28
/** Create new builder for customization */
29
static Builder builder();
30
31
/**
32
* Builder for creating RSocketStrategies instances
33
*/
34
interface Builder {
35
36
/** Add custom encoder */
37
Builder encoder(Encoder<?>... encoders);
38
39
/** Add custom decoder */
40
Builder decoder(Decoder<?>... decoders);
41
42
/** Configure route matcher */
43
Builder routeMatcher(RouteMatcher routeMatcher);
44
45
/** Configure data buffer factory */
46
Builder dataBufferFactory(DataBufferFactory bufferFactory);
47
48
/** Build strategies instance */
49
RSocketStrategies build();
50
}
51
}
52
```
53
54
### Built-in Codec Configuration
55
56
Automatic configuration of CBOR and JSON codecs with Jackson integration.
57
58
```java { .api }
59
/**
60
* CBOR codec configuration with Jackson
61
* Registered with Order(0) - highest priority
62
*/
63
class JacksonCborStrategyConfiguration {
64
65
@Bean
66
@Order(0)
67
@ConditionalOnBean(Jackson2ObjectMapperBuilder.class)
68
RSocketStrategiesCustomizer jacksonCborRSocketStrategyCustomizer(Jackson2ObjectMapperBuilder builder);
69
}
70
71
/**
72
* JSON codec configuration with Jackson
73
* Registered with Order(1) - lower priority than CBOR
74
*/
75
class JacksonJsonStrategyConfiguration {
76
77
@Bean
78
@Order(1)
79
@ConditionalOnBean(ObjectMapper.class)
80
RSocketStrategiesCustomizer jacksonJsonRSocketStrategyCustomizer(ObjectMapper objectMapper);
81
}
82
```
83
84
**Default Codec Priority:**
85
1. **CBOR** (`application/cbor`) - Priority 0 (highest)
86
2. **JSON** (`application/json`, `application/*+json`) - Priority 1
87
88
**Configuration Properties:**
89
90
```yaml
91
# MIME type defaults (automatically configured)
92
spring:
93
rsocket:
94
# Default data format (can be overridden per request)
95
data-mime-type: application/cbor
96
# Default metadata format for routing
97
metadata-mime-type: message/x.rsocket.routing.v0
98
```
99
100
### Custom Codec Registration
101
102
Register custom encoders and decoders for specialized data formats.
103
104
```java { .api }
105
/**
106
* Customizer interface for modifying RSocket strategies
107
*/
108
@FunctionalInterface
109
interface RSocketStrategiesCustomizer {
110
111
/**
112
* Customize RSocket strategies builder
113
* @param strategies Builder to customize
114
*/
115
void customize(RSocketStrategies.Builder strategies);
116
}
117
```
118
119
**Custom Codec Examples:**
120
121
```java
122
@Configuration
123
public class CustomCodecConfiguration {
124
125
// Protocol Buffers support
126
@Bean
127
@Order(-1) // Higher priority than defaults
128
public RSocketStrategiesCustomizer protobufCustomizer() {
129
return strategies -> {
130
strategies.encoder(new ProtobufEncoder());
131
strategies.decoder(new ProtobufDecoder());
132
};
133
}
134
135
// MessagePack support
136
@Bean
137
@Order(2) // Lower priority than JSON/CBOR
138
public RSocketStrategiesCustomizer messagePackCustomizer() {
139
return strategies -> {
140
strategies.encoder(new MessagePackEncoder());
141
strategies.decoder(new MessagePackDecoder());
142
};
143
}
144
145
// Custom binary format
146
@Bean
147
public RSocketStrategiesCustomizer binaryCustomizer() {
148
return strategies -> {
149
strategies.encoder(new CustomBinaryEncoder());
150
strategies.decoder(new CustomBinaryDecoder());
151
};
152
}
153
154
// String codec with custom charset
155
@Bean
156
public RSocketStrategiesCustomizer stringCustomizer() {
157
return strategies -> {
158
strategies.encoder(StringEncoder.textPlainOnly(StandardCharsets.UTF_16));
159
strategies.decoder(StringDecoder.textPlainOnly());
160
};
161
}
162
}
163
```
164
165
### Jackson Customization
166
167
Customize Jackson ObjectMapper configuration for JSON and CBOR codecs.
168
169
```java
170
@Configuration
171
public class JacksonRSocketConfiguration {
172
173
// Custom ObjectMapper for RSocket JSON
174
@Bean
175
@Primary
176
public ObjectMapper rSocketObjectMapper() {
177
return Jackson2ObjectMapperBuilder.json()
178
.propertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE)
179
.serializationInclusion(JsonInclude.Include.NON_NULL)
180
.featuresToDisable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
181
.modules(new JavaTimeModule())
182
.build();
183
}
184
185
// CBOR-specific ObjectMapper
186
@Bean
187
public RSocketStrategiesCustomizer cborMapperCustomizer() {
188
return strategies -> {
189
ObjectMapper cborMapper = Jackson2ObjectMapperBuilder.json()
190
.factory(new CBORFactory())
191
.build();
192
193
strategies.encoder(new Jackson2CborEncoder(cborMapper));
194
strategies.decoder(new Jackson2CborDecoder(cborMapper));
195
};
196
}
197
198
// Custom serialization modules
199
@Bean
200
public RSocketStrategiesCustomizer jacksonModuleCustomizer() {
201
return strategies -> {
202
ObjectMapper mapper = new ObjectMapper();
203
mapper.registerModule(new CustomSerializationModule());
204
mapper.registerModule(new ParameterNamesModule());
205
206
strategies.encoder(new Jackson2JsonEncoder(mapper));
207
strategies.decoder(new Jackson2JsonDecoder(mapper));
208
};
209
}
210
}
211
```
212
213
### Data Buffer Configuration
214
215
Configure data buffer management for memory optimization.
216
217
```java
218
@Configuration
219
public class DataBufferConfiguration {
220
221
// Netty data buffer factory (default)
222
@Bean
223
public RSocketStrategiesCustomizer nettyBufferCustomizer() {
224
return strategies -> {
225
NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(
226
PooledByteBufAllocator.DEFAULT
227
);
228
strategies.dataBufferFactory(bufferFactory);
229
};
230
}
231
232
// Default data buffer factory
233
@Bean
234
public RSocketStrategiesCustomizer defaultBufferCustomizer() {
235
return strategies -> {
236
strategies.dataBufferFactory(new DefaultDataBufferFactory());
237
};
238
}
239
240
// Custom buffer size limits
241
@Bean
242
public RSocketStrategiesCustomizer bufferSizeCustomizer() {
243
return strategies -> {
244
strategies.codecs(configurer -> {
245
configurer.defaultCodecs().maxInMemorySize(2 * 1024 * 1024); // 2MB
246
});
247
};
248
}
249
}
250
```
251
252
### Route Matching Configuration
253
254
Configure route matching for @MessageMapping annotations.
255
256
```java
257
@Configuration
258
public class RoutingConfiguration {
259
260
// Path pattern route matcher (default)
261
@Bean
262
public RSocketStrategiesCustomizer pathPatternRouteCustomizer() {
263
return strategies -> {
264
PathPatternRouteMatcher matcher = new PathPatternRouteMatcher();
265
matcher.setCaseSensitive(false);
266
strategies.routeMatcher(matcher);
267
};
268
}
269
270
// Simple route matcher
271
@Bean
272
public RSocketStrategiesCustomizer simpleRouteCustomizer() {
273
return strategies -> {
274
strategies.routeMatcher(new SimpleRouteMatcher());
275
};
276
}
277
278
// Custom route matcher
279
@Bean
280
public RSocketStrategiesCustomizer customRouteCustomizer() {
281
return strategies -> {
282
strategies.routeMatcher(new CustomRouteMatcher());
283
};
284
}
285
}
286
```
287
288
### MIME Type Configuration
289
290
Configure MIME types for different data formats.
291
292
```java
293
@Configuration
294
public class MimeTypeConfiguration {
295
296
@Bean
297
public RSocketStrategiesCustomizer mimeTypeCustomizer() {
298
return strategies -> {
299
// Custom MIME types for specific encoders/decoders
300
strategies.encoder(new CustomEncoder(MimeType.valueOf("application/x-custom")));
301
strategies.decoder(new CustomDecoder(MimeType.valueOf("application/x-custom")));
302
};
303
}
304
}
305
306
// Usage in message handlers
307
@Controller
308
public class MimeTypeController {
309
310
// Specify data MIME type per request
311
@MessageMapping("cbor-data")
312
public Mono<ResponseEntity<MyData>> handleCborData(MyData data) {
313
return Mono.just(
314
ResponseEntity.ok()
315
.contentType(MediaType.APPLICATION_CBOR)
316
.body(processedData)
317
);
318
}
319
320
// Handle multiple MIME types
321
@MessageMapping("multi-format")
322
public Mono<String> handleMultiFormat(@RequestBody MyData data, @Header("content-type") String contentType) {
323
if (MediaType.APPLICATION_CBOR_VALUE.equals(contentType)) {
324
return handleCbor(data);
325
} else if (MediaType.APPLICATION_JSON_VALUE.equals(contentType)) {
326
return handleJson(data);
327
}
328
return Mono.error(new UnsupportedMediaTypeException("Unsupported format: " + contentType));
329
}
330
}
331
```
332
333
### Performance Optimization
334
335
Optimize codec performance for high-throughput scenarios.
336
337
```java
338
@Configuration
339
public class PerformanceConfiguration {
340
341
// Zero-copy optimizations for Netty
342
@Bean
343
public RSocketStrategiesCustomizer zeroCopyCustomizer() {
344
return strategies -> {
345
// Use Netty's zero-copy capabilities
346
NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(
347
PooledByteBufAllocator.DEFAULT
348
);
349
strategies.dataBufferFactory(bufferFactory);
350
};
351
}
352
353
// Streaming codec configuration
354
@Bean
355
public RSocketStrategiesCustomizer streamingCustomizer() {
356
return strategies -> {
357
strategies.codecs(configurer -> {
358
// Enable streaming for large payloads
359
configurer.defaultCodecs().enableLoggingRequestDetails(false);
360
configurer.defaultCodecs().maxInMemorySize(-1); // Unlimited
361
});
362
};
363
}
364
365
// Custom thread pool for codec operations
366
@Bean
367
public RSocketStrategiesCustomizer threadPoolCustomizer() {
368
return strategies -> {
369
Scheduler codecScheduler = Schedulers.newParallel("rsocket-codec", 4);
370
strategies.encoder(new AsyncEncoder(codecScheduler));
371
strategies.decoder(new AsyncDecoder(codecScheduler));
372
};
373
}
374
}
375
```
376
377
### Error Handling in Codecs
378
379
Handle encoding/decoding errors gracefully.
380
381
```java
382
@Configuration
383
public class CodecErrorConfiguration {
384
385
@Bean
386
public RSocketStrategiesCustomizer errorHandlingCustomizer() {
387
return strategies -> {
388
strategies.encoder(new ResilientEncoder());
389
strategies.decoder(new ResilientDecoder());
390
};
391
}
392
}
393
394
// Custom error-handling encoder
395
public class ResilientEncoder implements Encoder<Object> {
396
397
private final Encoder<Object> delegate;
398
399
@Override
400
public Flux<DataBuffer> encode(Publisher<?> inputStream, DataBufferFactory bufferFactory,
401
ResolvableType elementType, MimeType mimeType, Map<String, Object> hints) {
402
return delegate.encode(inputStream, bufferFactory, elementType, mimeType, hints)
403
.onErrorResume(EncodingException.class, ex -> {
404
log.warn("Encoding failed for type {}: {}", elementType, ex.getMessage());
405
return Flux.error(new RSocketException(0x301, "Encoding error: " + ex.getMessage()));
406
});
407
}
408
}
409
410
// Custom error-handling decoder
411
public class ResilientDecoder implements Decoder<Object> {
412
413
private final Decoder<Object> delegate;
414
415
@Override
416
public Flux<Object> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType,
417
MimeType mimeType, Map<String, Object> hints) {
418
return delegate.decode(inputStream, elementType, mimeType, hints)
419
.onErrorResume(DecodingException.class, ex -> {
420
log.warn("Decoding failed for type {}: {}", elementType, ex.getMessage());
421
return Flux.error(new RSocketException(0x302, "Decoding error: " + ex.getMessage()));
422
});
423
}
424
}
425
```
426
427
## Codec Examples
428
429
### Custom Binary Codec
430
431
```java
432
public class CustomBinaryCodec implements Encoder<CustomData>, Decoder<CustomData> {
433
434
@Override
435
public boolean canEncode(ResolvableType elementType, MimeType mimeType) {
436
return CustomData.class.isAssignableFrom(elementType.toClass()) &&
437
MimeType.valueOf("application/x-custom-binary").equals(mimeType);
438
}
439
440
@Override
441
public Flux<DataBuffer> encode(Publisher<? extends CustomData> inputStream,
442
DataBufferFactory bufferFactory, ResolvableType elementType,
443
MimeType mimeType, Map<String, Object> hints) {
444
return Flux.from(inputStream)
445
.map(data -> {
446
DataBuffer buffer = bufferFactory.allocateBuffer();
447
// Custom binary serialization
448
buffer.write(data.toByteArray());
449
return buffer;
450
});
451
}
452
453
@Override
454
public boolean canDecode(ResolvableType elementType, MimeType mimeType) {
455
return CustomData.class.isAssignableFrom(elementType.toClass()) &&
456
MimeType.valueOf("application/x-custom-binary").equals(mimeType);
457
}
458
459
@Override
460
public Flux<CustomData> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType,
461
MimeType mimeType, Map<String, Object> hints) {
462
return Flux.from(inputStream)
463
.map(buffer -> {
464
byte[] bytes = new byte[buffer.readableByteCount()];
465
buffer.read(bytes);
466
DataBufferUtils.release(buffer);
467
return CustomData.fromByteArray(bytes);
468
});
469
}
470
}
471
```
472
473
### Compression Codec Wrapper
474
475
```java
476
public class CompressionCodecWrapper<T> implements Encoder<T>, Decoder<T> {
477
478
private final Encoder<T> encoder;
479
private final Decoder<T> decoder;
480
private final Compressor compressor;
481
482
@Override
483
public Flux<DataBuffer> encode(Publisher<? extends T> inputStream,
484
DataBufferFactory bufferFactory, ResolvableType elementType,
485
MimeType mimeType, Map<String, Object> hints) {
486
return encoder.encode(inputStream, bufferFactory, elementType, mimeType, hints)
487
.collectList()
488
.map(buffers -> {
489
// Combine and compress buffers
490
DataBuffer combined = bufferFactory.join(buffers);
491
byte[] compressed = compressor.compress(combined.asByteBuffer().array());
492
DataBufferUtils.release(combined);
493
494
DataBuffer result = bufferFactory.allocateBuffer(compressed.length);
495
result.write(compressed);
496
return result;
497
})
498
.flux();
499
}
500
501
@Override
502
public Flux<T> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType,
503
MimeType mimeType, Map<String, Object> hints) {
504
return Flux.from(inputStream)
505
.collectList()
506
.map(buffers -> {
507
// Combine and decompress buffers
508
DataBuffer combined = bufferFactory.join(buffers);
509
byte[] decompressed = compressor.decompress(combined.asByteBuffer().array());
510
DataBufferUtils.release(combined);
511
512
return bufferFactory.wrap(decompressed);
513
})
514
.flux()
515
.transform(decompressed -> decoder.decode(decompressed, elementType, mimeType, hints));
516
}
517
}
518
```