0
# Message Conversion
1
2
Spring Cloud Stream's message conversion framework handles different content types, MIME types, and serialization formats across various messaging systems. It provides automatic conversion between message formats and supports custom converters for specialized use cases.
3
4
## Capabilities
5
6
### Composite Message Converter Factory
7
8
Factory for creating and managing composite message converters that can handle multiple content types.
9
10
```java { .api }
11
/**
12
* Factory for creating composite message converters.
13
* Manages multiple converters and selects appropriate ones based on content type.
14
*/
15
public class CompositeMessageConverterFactory implements BeanFactoryAware, InitializingBean {
16
17
private BeanFactory beanFactory;
18
private MessageConverter messageConverterForAllRegistered;
19
20
/**
21
* Get a message converter that supports all registered content types.
22
* @return composite message converter
23
*/
24
public MessageConverter getMessageConverterForAllRegistered();
25
26
/**
27
* Get a message converter for a specific MIME type.
28
* @param mimeType the target MIME type
29
* @return message converter for the specified type, or null if not supported
30
*/
31
public MessageConverter getMessageConverterForType(MimeType mimeType);
32
33
/**
34
* Register a new message converter.
35
* @param messageConverter the converter to register
36
*/
37
public void addMessageConverter(MessageConverter messageConverter);
38
39
/**
40
* Get all registered message converters.
41
* @return collection of registered converters
42
*/
43
public Collection<MessageConverter> getMessageConverters();
44
45
public void setBeanFactory(BeanFactory beanFactory);
46
public void afterPropertiesSet();
47
}
48
```
49
50
### Object String Message Converter
51
52
Converter that handles conversion between objects and string representations.
53
54
```java { .api }
55
/**
56
* Message converter that converts between objects and strings.
57
* Supports various serialization formats and MIME types.
58
*/
59
public class ObjectStringMessageConverter extends AbstractMessageConverter implements BeanFactoryAware, BeanClassLoaderAware {
60
61
private BeanFactory beanFactory;
62
private ClassLoader beanClassLoader;
63
64
/**
65
* Create converter with default supported MIME types.
66
*/
67
public ObjectStringMessageConverter();
68
69
/**
70
* Create converter with custom MIME type.
71
* @param supportedMimeType the MIME type to support
72
*/
73
public ObjectStringMessageConverter(MimeType supportedMimeType);
74
75
/**
76
* Check if this converter supports the given class.
77
* @param clazz the class to check
78
* @return true if the class is supported
79
*/
80
protected boolean supports(Class<?> clazz);
81
82
/**
83
* Convert from internal message format to target class.
84
* @param message the source message
85
* @param targetClass the target class
86
* @param conversionHint optional conversion hint
87
* @return converted object
88
*/
89
protected Object convertFromInternal(Message<?> message, Class<?> targetClass, @Nullable Object conversionHint);
90
91
/**
92
* Convert to internal message format.
93
* @param payload the payload to convert
94
* @param headers the message headers
95
* @param conversionHint optional conversion hint
96
* @return converted payload
97
*/
98
protected Object convertToInternal(Object payload, @Nullable MessageHeaders headers, @Nullable Object conversionHint);
99
100
public void setBeanFactory(BeanFactory beanFactory);
101
public void setBeanClassLoader(ClassLoader classLoader);
102
}
103
```
104
105
### Custom MIME Type Converter
106
107
Converter for handling custom MIME type parsing and formatting.
108
109
```java { .api }
110
/**
111
* Converter for custom MIME type handling.
112
* Implements Spring's Converter interface for MIME type conversion.
113
*/
114
public class CustomMimeTypeConverter implements Converter<String, MimeType> {
115
116
/**
117
* Convert string representation to MIME type.
118
* @param source the string representation of MIME type
119
* @return parsed MimeType object
120
* @throws InvalidMimeTypeException if the string is not a valid MIME type
121
*/
122
public MimeType convert(String source);
123
124
/**
125
* Check if the given string is a valid MIME type format.
126
* @param mimeTypeString the string to validate
127
* @return true if valid MIME type format
128
*/
129
public static boolean isValidMimeType(String mimeTypeString);
130
131
/**
132
* Parse MIME type with validation.
133
* @param mimeTypeString the MIME type string
134
* @return parsed MimeType
135
* @throws ConversionException if parsing fails
136
*/
137
public static MimeType parseMimeType(String mimeTypeString);
138
}
139
```
140
141
### Message Converter Utils
142
143
Utility class for message converter operations and constants.
144
145
```java { .api }
146
/**
147
* Utility class for message converter operations.
148
* Contains constants and helper methods for message conversion.
149
*/
150
public class MessageConverterUtils {
151
152
/**
153
* MIME type for Java objects.
154
*/
155
public static final String X_JAVA_OBJECT = "application/x-java-object";
156
157
/**
158
* MIME type for serialized Java objects.
159
*/
160
public static final String X_JAVA_SERIALIZED_OBJECT = "application/x-java-serialized-object";
161
162
/**
163
* Default JSON MIME type.
164
*/
165
public static final MimeType APPLICATION_JSON = MimeType.valueOf("application/json");
166
167
/**
168
* Default text MIME type.
169
*/
170
public static final MimeType TEXT_PLAIN = MimeType.valueOf("text/plain");
171
172
/**
173
* Header name for original content type.
174
*/
175
public static final String ORIGINAL_CONTENT_TYPE_HEADER = "originalContentType";
176
177
/**
178
* Get the content type from message headers.
179
* @param headers the message headers
180
* @return the content type, or null if not present
181
*/
182
public static MimeType getContentType(MessageHeaders headers);
183
184
/**
185
* Set content type in message headers.
186
* @param headers the message headers (mutable)
187
* @param contentType the content type to set
188
*/
189
public static void setContentType(MessageHeaders headers, MimeType contentType);
190
191
/**
192
* Check if content type indicates JSON format.
193
* @param contentType the content type to check
194
* @return true if JSON format
195
*/
196
public static boolean isJsonType(MimeType contentType);
197
198
/**
199
* Check if content type indicates text format.
200
* @param contentType the content type to check
201
* @return true if text format
202
*/
203
public static boolean isTextType(MimeType contentType);
204
205
/**
206
* Check if content type indicates binary format.
207
* @param contentType the content type to check
208
* @return true if binary format
209
*/
210
public static boolean isBinaryType(MimeType contentType);
211
212
/**
213
* Create MIME type with charset parameter.
214
* @param type the base MIME type
215
* @param charset the charset
216
* @return MIME type with charset
217
*/
218
public static MimeType createMimeTypeWithCharset(String type, Charset charset);
219
220
/**
221
* Extract Java class name from MIME type parameters.
222
* @param mimeType the MIME type
223
* @return Java class name, or null if not present
224
*/
225
public static String extractJavaClassFromMimeType(MimeType mimeType);
226
}
227
```
228
229
### Abstract Base Converter
230
231
Base class for implementing custom message converters.
232
233
```java { .api }
234
/**
235
* Abstract base class for Spring Cloud Stream message converters.
236
* Provides common functionality for content type handling and conversion.
237
*/
238
public abstract class AbstractStreamMessageConverter extends AbstractMessageConverter {
239
240
/**
241
* Create converter with supported MIME types.
242
* @param supportedMimeTypes the MIME types this converter supports
243
*/
244
protected AbstractStreamMessageConverter(MimeType... supportedMimeTypes);
245
246
/**
247
* Check if conversion is needed based on content types.
248
* @param sourceContentType the source content type
249
* @param targetContentType the target content type
250
* @return true if conversion is needed
251
*/
252
protected boolean isConversionNeeded(MimeType sourceContentType, MimeType targetContentType);
253
254
/**
255
* Get the default content type for this converter.
256
* @return the default content type
257
*/
258
protected abstract MimeType getDefaultContentType();
259
260
/**
261
* Perform the actual conversion from source to target format.
262
* @param payload the payload to convert
263
* @param sourceContentType the source content type
264
* @param targetContentType the target content type
265
* @return converted payload
266
*/
267
protected abstract Object doConvert(Object payload, MimeType sourceContentType, MimeType targetContentType);
268
}
269
```
270
271
### JSON Message Converter
272
273
Specialized converter for JSON message handling.
274
275
```java { .api }
276
/**
277
* Message converter for JSON format handling.
278
* Uses Jackson ObjectMapper for serialization/deserialization.
279
*/
280
public class JsonMessageConverter extends AbstractStreamMessageConverter implements BeanFactoryAware {
281
282
private ObjectMapper objectMapper;
283
private BeanFactory beanFactory;
284
285
/**
286
* Create JSON converter with default ObjectMapper.
287
*/
288
public JsonMessageConverter();
289
290
/**
291
* Create JSON converter with custom ObjectMapper.
292
* @param objectMapper the ObjectMapper to use
293
*/
294
public JsonMessageConverter(ObjectMapper objectMapper);
295
296
protected MimeType getDefaultContentType();
297
298
protected Object doConvert(Object payload, MimeType sourceContentType, MimeType targetContentType);
299
300
/**
301
* Convert object to JSON bytes.
302
* @param object the object to convert
303
* @return JSON byte array
304
* @throws ConversionException if conversion fails
305
*/
306
protected byte[] convertToJson(Object object);
307
308
/**
309
* Convert JSON bytes to object.
310
* @param jsonBytes the JSON byte array
311
* @param targetClass the target class
312
* @return deserialized object
313
* @throws ConversionException if conversion fails
314
*/
315
protected Object convertFromJson(byte[] jsonBytes, Class<?> targetClass);
316
317
public void setBeanFactory(BeanFactory beanFactory);
318
}
319
```
320
321
### Avro Message Converter
322
323
Specialized converter for Apache Avro message handling.
324
325
```java { .api }
326
/**
327
* Message converter for Apache Avro format.
328
* Handles Avro serialization and deserialization with schema support.
329
*/
330
public class AvroMessageConverter extends AbstractStreamMessageConverter implements InitializingBean {
331
332
private SchemaRegistryClient schemaRegistryClient;
333
private boolean useSchemaRegistry = true;
334
335
/**
336
* Create Avro converter.
337
*/
338
public AvroMessageConverter();
339
340
/**
341
* Create Avro converter with schema registry client.
342
* @param schemaRegistryClient the schema registry client
343
*/
344
public AvroMessageConverter(SchemaRegistryClient schemaRegistryClient);
345
346
protected MimeType getDefaultContentType();
347
348
protected Object doConvert(Object payload, MimeType sourceContentType, MimeType targetContentType);
349
350
/**
351
* Convert object to Avro bytes.
352
* @param object the object to convert
353
* @param schema the Avro schema
354
* @return Avro byte array
355
*/
356
protected byte[] convertToAvro(Object object, Schema schema);
357
358
/**
359
* Convert Avro bytes to object.
360
* @param avroBytes the Avro byte array
361
* @param schema the Avro schema
362
* @return deserialized object
363
*/
364
protected Object convertFromAvro(byte[] avroBytes, Schema schema);
365
366
public boolean isUseSchemaRegistry();
367
public void setUseSchemaRegistry(boolean useSchemaRegistry);
368
369
public SchemaRegistryClient getSchemaRegistryClient();
370
public void setSchemaRegistryClient(SchemaRegistryClient schemaRegistryClient);
371
372
public void afterPropertiesSet();
373
}
374
```
375
376
### Converter Configuration
377
378
Configuration classes for setting up message converters.
379
380
```java { .api }
381
/**
382
* Configuration for content type handling and message conversion.
383
*/
384
@Configuration
385
@ConditionalOnClass(MessageConverter.class)
386
public class MessageConverterConfiguration {
387
388
/**
389
* Create composite message converter factory.
390
* @return configured CompositeMessageConverterFactory
391
*/
392
@Bean
393
@ConditionalOnMissingBean
394
public CompositeMessageConverterFactory compositeMessageConverterFactory();
395
396
/**
397
* Create object string message converter.
398
* @return configured ObjectStringMessageConverter
399
*/
400
@Bean
401
@ConditionalOnMissingBean
402
public ObjectStringMessageConverter objectStringMessageConverter();
403
404
/**
405
* Create JSON message converter.
406
* @return configured JsonMessageConverter
407
*/
408
@Bean
409
@ConditionalOnMissingBean
410
@ConditionalOnClass(ObjectMapper.class)
411
public JsonMessageConverter jsonMessageConverter();
412
413
/**
414
* Create custom MIME type converter.
415
* @return configured CustomMimeTypeConverter
416
*/
417
@Bean
418
@ConditionalOnMissingBean
419
public CustomMimeTypeConverter customMimeTypeConverter();
420
421
/**
422
* Create Avro message converter.
423
* @return configured AvroMessageConverter
424
*/
425
@Bean
426
@ConditionalOnMissingBean
427
@ConditionalOnClass(name = "org.apache.avro.Schema")
428
public AvroMessageConverter avroMessageConverter();
429
}
430
```
431
432
### Converter Registry
433
434
Registry for managing and discovering message converters.
435
436
```java { .api }
437
/**
438
* Registry for managing message converters.
439
* Provides lookup and registration capabilities for different converter types.
440
*/
441
public class MessageConverterRegistry implements BeanFactoryAware, ApplicationContextAware {
442
443
private final Map<MimeType, MessageConverter> converters = new HashMap<>();
444
private BeanFactory beanFactory;
445
private ApplicationContext applicationContext;
446
447
/**
448
* Register a message converter for specific MIME types.
449
* @param converter the converter to register
450
* @param mimeTypes the MIME types this converter handles
451
*/
452
public void registerConverter(MessageConverter converter, MimeType... mimeTypes);
453
454
/**
455
* Find a converter for the specified MIME type.
456
* @param mimeType the MIME type
457
* @return the converter, or null if none found
458
*/
459
public MessageConverter findConverter(MimeType mimeType);
460
461
/**
462
* Get all registered converters.
463
* @return map of converters keyed by MIME type
464
*/
465
public Map<MimeType, MessageConverter> getAllConverters();
466
467
/**
468
* Check if a converter is registered for the MIME type.
469
* @param mimeType the MIME type to check
470
* @return true if converter exists
471
*/
472
public boolean hasConverter(MimeType mimeType);
473
474
/**
475
* Remove converter for the specified MIME type.
476
* @param mimeType the MIME type
477
* @return the removed converter, or null if none existed
478
*/
479
public MessageConverter removeConverter(MimeType mimeType);
480
481
public void setBeanFactory(BeanFactory beanFactory);
482
public void setApplicationContext(ApplicationContext applicationContext);
483
}
484
```
485
486
### Exception Classes
487
488
Exception classes for message conversion errors.
489
490
```java { .api }
491
/**
492
* Exception thrown during message conversion operations.
493
*/
494
public class ConversionException extends RuntimeException {
495
496
/**
497
* Create conversion exception with message.
498
* @param message the error message
499
*/
500
public ConversionException(String message);
501
502
/**
503
* Create conversion exception with message and cause.
504
* @param message the error message
505
* @param cause the underlying cause
506
*/
507
public ConversionException(String message, Throwable cause);
508
509
/**
510
* Create conversion exception with cause.
511
* @param cause the underlying cause
512
*/
513
public ConversionException(Throwable cause);
514
}
515
516
/**
517
* Exception thrown for invalid MIME type operations.
518
*/
519
public class InvalidMimeTypeException extends ConversionException {
520
521
private final String mimeType;
522
523
/**
524
* Create invalid MIME type exception.
525
* @param mimeType the invalid MIME type string
526
* @param message the error message
527
*/
528
public InvalidMimeTypeException(String mimeType, String message);
529
530
/**
531
* Get the invalid MIME type that caused this exception.
532
* @return the invalid MIME type string
533
*/
534
public String getMimeType();
535
}
536
```
537
538
**Usage Examples:**
539
540
```java
541
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.springframework.cloud.stream.converter.*;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.MimeType;
545
546
// Using CompositeMessageConverterFactory
547
@Service
548
public class MessageProcessingService {
549
550
private final CompositeMessageConverterFactory converterFactory;
551
552
public MessageProcessingService(CompositeMessageConverterFactory converterFactory) {
553
this.converterFactory = converterFactory;
554
}
555
556
public void processMessage(Message<?> message) {
557
MessageConverter converter = converterFactory.getMessageConverterForAllRegistered();
558
559
// Convert to different types
560
String stringPayload = (String) converter.fromMessage(message, String.class);
561
Map<String, Object> mapPayload = (Map<String, Object>) converter.fromMessage(message, Map.class);
562
563
// Create new message with different content type
564
Message<String> newMessage = MessageBuilder
565
.withPayload(stringPayload)
566
.setHeader(MessageHeaders.CONTENT_TYPE, MimeType.valueOf("text/plain"))
567
.build();
568
}
569
}
570
571
// Custom message converter
572
@Component
573
public class CustomXmlMessageConverter extends AbstractStreamMessageConverter {
574
575
private final XmlMapper xmlMapper;
576
577
public CustomXmlMessageConverter() {
578
super(MimeType.valueOf("application/xml"), MimeType.valueOf("text/xml"));
579
this.xmlMapper = new XmlMapper();
580
}
581
582
@Override
583
protected MimeType getDefaultContentType() {
584
return MimeType.valueOf("application/xml");
585
}
586
587
@Override
588
protected Object doConvert(Object payload, MimeType sourceContentType, MimeType targetContentType) {
589
if (payload instanceof String) {
590
try {
591
return xmlMapper.readValue((String) payload, Object.class);
592
} catch (Exception e) {
593
throw new ConversionException("Failed to convert XML", e);
594
}
595
} else {
596
try {
597
return xmlMapper.writeValueAsString(payload);
598
} catch (Exception e) {
599
throw new ConversionException("Failed to convert to XML", e);
600
}
601
}
602
}
603
604
@Override
605
protected boolean supports(Class<?> clazz) {
606
return true; // Support all classes
607
}
608
}
609
610
// Configuration for custom converters
611
@Configuration
612
public class CustomConverterConfiguration {
613
614
@Bean
615
public CustomXmlMessageConverter xmlMessageConverter() {
616
return new CustomXmlMessageConverter();
617
}
618
619
@Bean
620
public MessageConverter protobufMessageConverter() {
621
return new ProtobufMessageConverter();
622
}
623
624
@Bean
625
public CompositeMessageConverterFactory customCompositeFactory(
626
List<MessageConverter> converters) {
627
CompositeMessageConverterFactory factory = new CompositeMessageConverterFactory();
628
converters.forEach(factory::addMessageConverter);
629
return factory;
630
}
631
}
632
633
// Content type handling
634
@Service
635
public class ContentTypeService {
636
637
public void handleDifferentContentTypes(Message<?> message) {
638
MimeType contentType = MessageConverterUtils.getContentType(message.getHeaders());
639
640
if (MessageConverterUtils.isJsonType(contentType)) {
641
// Handle JSON content
642
processJsonMessage(message);
643
} else if (MessageConverterUtils.isTextType(contentType)) {
644
// Handle text content
645
processTextMessage(message);
646
} else if (MessageConverterUtils.isBinaryType(contentType)) {
647
// Handle binary content
648
processBinaryMessage(message);
649
}
650
}
651
652
private void processJsonMessage(Message<?> message) {
653
// JSON-specific processing
654
ObjectMapper mapper = new ObjectMapper();
655
try {
656
Object jsonObject = mapper.readValue((String) message.getPayload(), Object.class);
657
// Process JSON object
658
} catch (Exception e) {
659
throw new ConversionException("Failed to process JSON message", e);
660
}
661
}
662
663
private void processTextMessage(Message<?> message) {
664
String text = (String) message.getPayload();
665
// Process text content
666
}
667
668
private void processBinaryMessage(Message<?> message) {
669
byte[] bytes = (byte[]) message.getPayload();
670
// Process binary content
671
}
672
}
673
674
// Using message converter registry
675
@Component
676
public class ConverterRegistryManager {
677
678
private final MessageConverterRegistry registry;
679
680
public ConverterRegistryManager(MessageConverterRegistry registry) {
681
this.registry = registry;
682
}
683
684
@PostConstruct
685
public void registerCustomConverters() {
686
// Register custom converters
687
registry.registerConverter(
688
new CustomXmlMessageConverter(),
689
MimeType.valueOf("application/xml"),
690
MimeType.valueOf("text/xml")
691
);
692
693
registry.registerConverter(
694
new CustomCsvMessageConverter(),
695
MimeType.valueOf("text/csv"),
696
MimeType.valueOf("application/csv")
697
);
698
}
699
700
public void convertMessage(Object payload, MimeType fromType, MimeType toType) {
701
MessageConverter fromConverter = registry.findConverter(fromType);
702
MessageConverter toConverter = registry.findConverter(toType);
703
704
if (fromConverter != null && toConverter != null) {
705
// Perform conversion
706
Message<?> message = MessageBuilder.withPayload(payload)
707
.setHeader(MessageHeaders.CONTENT_TYPE, fromType)
708
.build();
709
710
Object converted = fromConverter.fromMessage(message, Object.class);
711
Message<?> newMessage = toConverter.toMessage(converted, new MessageHeaders(Collections.emptyMap()));
712
}
713
}
714
}
715
716
// Error handling
717
@Component
718
public class MessageConversionErrorHandler {
719
720
public void handleConversionError(ConversionException e, Message<?> originalMessage) {
721
if (e instanceof InvalidMimeTypeException) {
722
InvalidMimeTypeException mimeException = (InvalidMimeTypeException) e;
723
logger.error("Invalid MIME type: {} for message: {}",
724
mimeException.getMimeType(), originalMessage);
725
} else {
726
logger.error("Conversion failed for message: {}", originalMessage, e);
727
}
728
729
// Send to error channel or dead letter queue
730
sendToErrorChannel(originalMessage, e);
731
}
732
733
private void sendToErrorChannel(Message<?> message, Exception error) {
734
Message<byte[]> errorMessage = MessageBuilder
735
.withPayload(message.getPayload().toString().getBytes())
736
.setHeader("x-exception-message", error.getMessage())
737
.setHeader("x-exception-stacktrace", getStackTrace(error))
738
.setHeader("x-original-message", message)
739
.build();
740
741
// Send to error handling destination
742
}
743
}
744
```