A framework for building message-driven microservice applications on Spring Boot with Spring Integration.
—
Spring Cloud Stream's message conversion framework handles different content types, MIME types, and serialization formats across various messaging systems. It provides automatic conversion between message formats and supports custom converters for specialized use cases.
Factory for creating and managing composite message converters that can handle multiple content types.
/**
* Factory for creating composite message converters.
* Manages multiple converters and selects appropriate ones based on content type.
*/
public class CompositeMessageConverterFactory implements BeanFactoryAware, InitializingBean {
private BeanFactory beanFactory;
private MessageConverter messageConverterForAllRegistered;
/**
* Get a message converter that supports all registered content types.
* @return composite message converter
*/
public MessageConverter getMessageConverterForAllRegistered();
/**
* Get a message converter for a specific MIME type.
* @param mimeType the target MIME type
* @return message converter for the specified type, or null if not supported
*/
public static MessageConverter getMessageConverterForType(MimeType mimeType);
/**
* Register a new message converter.
* @param messageConverter the converter to register
*/
public void addMessageConverter(MessageConverter messageConverter);
/**
* Get all registered message converters.
* @return collection of registered converters
*/
public Collection<MessageConverter> getMessageConverters();
public void setBeanFactory(BeanFactory beanFactory);
public void afterPropertiesSet();
}Converter that handles conversion between objects and string representations.
/**
* Message converter that converts between objects and strings.
* Supports various serialization formats and MIME types.
*/
public class ObjectStringMessageConverter extends AbstractMessageConverter implements BeanFactoryAware, BeanClassLoaderAware {
private BeanFactory beanFactory;
private ClassLoader beanClassLoader;
/**
* Create converter with default supported MIME types.
*/
public ObjectStringMessageConverter();
/**
* Create converter with custom MIME type.
* @param supportedMimeType the MIME type to support
*/
public ObjectStringMessageConverter(MimeType supportedMimeType);
/**
* Check if this converter supports the given class.
* @param clazz the class to check
* @return true if the class is supported
*/
protected boolean supports(Class<?> clazz);
/**
* Convert from internal message format to target class.
* @param message the source message
* @param targetClass the target class
* @param conversionHint optional conversion hint
* @return converted object
*/
protected Object convertFromInternal(Message<?> message, Class<?> targetClass, @Nullable Object conversionHint);
/**
* Convert to internal message format.
* @param payload the payload to convert
* @param headers the message headers
* @param conversionHint optional conversion hint
* @return converted payload
*/
protected Object convertToInternal(Object payload, @Nullable MessageHeaders headers, @Nullable Object conversionHint);
public void setBeanFactory(BeanFactory beanFactory);
public void setBeanClassLoader(ClassLoader classLoader);
}Converter for handling custom MIME type parsing and formatting.
/**
* Converter for custom MIME type handling.
* Implements Spring's Converter interface for MIME type conversion.
*/
public class CustomMimeTypeConverter implements Converter<String, MimeType> {
/**
* Convert string representation to MIME type.
* @param source the string representation of MIME type
* @return parsed MimeType object
* @throws InvalidMimeTypeException if the string is not a valid MIME type
*/
public MimeType convert(String source);
/**
* Check if the given string is a valid MIME type format.
* @param mimeTypeString the string to validate
* @return true if valid MIME type format
*/
public static boolean isValidMimeType(String mimeTypeString);
/**
* Parse MIME type with validation.
* @param mimeTypeString the MIME type string
* @return parsed MimeType
* @throws ConversionException if parsing fails
*/
public static MimeType parseMimeType(String mimeTypeString);
}Utility class for message converter operations and constants.
/**
* Utility class for message converter operations.
* Contains constants and helper methods for message conversion.
*/
public class MessageConverterUtils {
/**
* MIME type for Java objects.
*/
public static final String X_JAVA_OBJECT = "application/x-java-object";
/**
* MIME type for serialized Java objects.
*/
public static final String X_JAVA_SERIALIZED_OBJECT = "application/x-java-serialized-object";
/**
* Default JSON MIME type.
*/
public static final MimeType APPLICATION_JSON = MimeType.valueOf("application/json");
/**
* Default text MIME type.
*/
public static final MimeType TEXT_PLAIN = MimeType.valueOf("text/plain");
/**
* Header name for original content type.
*/
public static final String ORIGINAL_CONTENT_TYPE_HEADER = "originalContentType";
/**
* Get the content type from message headers.
* @param headers the message headers
* @return the content type, or null if not present
*/
public static MimeType getContentType(MessageHeaders headers);
/**
* Set content type in message headers.
* @param headers the message headers (mutable)
* @param contentType the content type to set
*/
public static void setContentType(MessageHeaders headers, MimeType contentType);
/**
* Check if content type indicates JSON format.
* @param contentType the content type to check
* @return true if JSON format
*/
public static boolean isJsonType(MimeType contentType);
/**
* Check if content type indicates text format.
* @param contentType the content type to check
* @return true if text format
*/
public static boolean isTextType(MimeType contentType);
/**
* Check if content type indicates binary format.
* @param contentType the content type to check
* @return true if binary format
*/
public static boolean isBinaryType(MimeType contentType);
/**
* Create MIME type with charset parameter.
* @param type the base MIME type
* @param charset the charset
* @return MIME type with charset
*/
public static MimeType createMimeTypeWithCharset(String type, Charset charset);
/**
* Extract Java class name from MIME type parameters.
* @param mimeType the MIME type
* @return Java class name, or null if not present
*/
public static String extractJavaClassFromMimeType(MimeType mimeType);
}Base class for implementing custom message converters.
/**
* Abstract base class for Spring Cloud Stream message converters.
* Provides common functionality for content type handling and conversion.
*/
public abstract class AbstractStreamMessageConverter extends AbstractMessageConverter {
/**
* Create converter with supported MIME types.
* @param supportedMimeTypes the MIME types this converter supports
*/
protected AbstractStreamMessageConverter(MimeType... supportedMimeTypes);
/**
* Check if conversion is needed based on content types.
* @param sourceContentType the source content type
* @param targetContentType the target content type
* @return true if conversion is needed
*/
protected boolean isConversionNeeded(MimeType sourceContentType, MimeType targetContentType);
/**
* Get the default content type for this converter.
* @return the default content type
*/
protected abstract MimeType getDefaultContentType();
/**
* Perform the actual conversion from source to target format.
* @param payload the payload to convert
* @param sourceContentType the source content type
* @param targetContentType the target content type
* @return converted payload
*/
protected abstract Object doConvert(Object payload, MimeType sourceContentType, MimeType targetContentType);
}Specialized converter for JSON message handling.
/**
* Message converter for JSON format handling.
* Uses Jackson ObjectMapper for serialization/deserialization.
*/
public class JsonMessageConverter extends AbstractStreamMessageConverter implements BeanFactoryAware {
private ObjectMapper objectMapper;
private BeanFactory beanFactory;
/**
* Create JSON converter with default ObjectMapper.
*/
public JsonMessageConverter();
/**
* Create JSON converter with custom ObjectMapper.
* @param objectMapper the ObjectMapper to use
*/
public JsonMessageConverter(ObjectMapper objectMapper);
protected MimeType getDefaultContentType();
protected Object doConvert(Object payload, MimeType sourceContentType, MimeType targetContentType);
/**
* Convert object to JSON bytes.
* @param object the object to convert
* @return JSON byte array
* @throws ConversionException if conversion fails
*/
protected byte[] convertToJson(Object object);
/**
* Convert JSON bytes to object.
* @param jsonBytes the JSON byte array
* @param targetClass the target class
* @return deserialized object
* @throws ConversionException if conversion fails
*/
protected Object convertFromJson(byte[] jsonBytes, Class<?> targetClass);
public void setBeanFactory(BeanFactory beanFactory);
}Specialized converter for Apache Avro message handling.
/**
* Message converter for Apache Avro format.
* Handles Avro serialization and deserialization with schema support.
*/
public class AvroMessageConverter extends AbstractStreamMessageConverter implements InitializingBean {
private SchemaRegistryClient schemaRegistryClient;
private boolean useSchemaRegistry = true;
/**
* Create Avro converter.
*/
public AvroMessageConverter();
/**
* Create Avro converter with schema registry client.
* @param schemaRegistryClient the schema registry client
*/
public AvroMessageConverter(SchemaRegistryClient schemaRegistryClient);
protected MimeType getDefaultContentType();
protected Object doConvert(Object payload, MimeType sourceContentType, MimeType targetContentType);
/**
* Convert object to Avro bytes.
* @param object the object to convert
* @param schema the Avro schema
* @return Avro byte array
*/
protected byte[] convertToAvro(Object object, Schema schema);
/**
* Convert Avro bytes to object.
* @param avroBytes the Avro byte array
* @param schema the Avro schema
* @return deserialized object
*/
protected Object convertFromAvro(byte[] avroBytes, Schema schema);
public boolean isUseSchemaRegistry();
public void setUseSchemaRegistry(boolean useSchemaRegistry);
public SchemaRegistryClient getSchemaRegistryClient();
public void setSchemaRegistryClient(SchemaRegistryClient schemaRegistryClient);
public void afterPropertiesSet();
}Configuration classes for setting up message converters.
/**
* Configuration for content type handling and message conversion.
*/
@Configuration
@ConditionalOnClass(MessageConverter.class)
public class MessageConverterConfiguration {
/**
* Create composite message converter factory.
* @return configured CompositeMessageConverterFactory
*/
@Bean
@ConditionalOnMissingBean
public CompositeMessageConverterFactory compositeMessageConverterFactory();
/**
* Create object string message converter.
* @return configured ObjectStringMessageConverter
*/
@Bean
@ConditionalOnMissingBean
public ObjectStringMessageConverter objectStringMessageConverter();
/**
* Create JSON message converter.
* @return configured JsonMessageConverter
*/
@Bean
@ConditionalOnMissingBean
@ConditionalOnClass(ObjectMapper.class)
public JsonMessageConverter jsonMessageConverter();
/**
* Create custom MIME type converter.
* @return configured CustomMimeTypeConverter
*/
@Bean
@ConditionalOnMissingBean
public CustomMimeTypeConverter customMimeTypeConverter();
/**
* Create Avro message converter.
* @return configured AvroMessageConverter
*/
@Bean
@ConditionalOnMissingBean
@ConditionalOnClass(name = "org.apache.avro.Schema")
public AvroMessageConverter avroMessageConverter();
}Registry for managing and discovering message converters.
/**
* Registry for managing message converters.
* Provides lookup and registration capabilities for different converter types.
*/
public class MessageConverterRegistry implements BeanFactoryAware, ApplicationContextAware {
private final Map<MimeType, MessageConverter> converters = new HashMap<>();
private BeanFactory beanFactory;
private ApplicationContext applicationContext;
/**
* Register a message converter for specific MIME types.
* @param converter the converter to register
* @param mimeTypes the MIME types this converter handles
*/
public void registerConverter(MessageConverter converter, MimeType... mimeTypes);
/**
* Find a converter for the specified MIME type.
* @param mimeType the MIME type
* @return the converter, or null if none found
*/
public MessageConverter findConverter(MimeType mimeType);
/**
* Get all registered converters.
* @return map of converters keyed by MIME type
*/
public Map<MimeType, MessageConverter> getAllConverters();
/**
* Check if a converter is registered for the MIME type.
* @param mimeType the MIME type to check
* @return true if converter exists
*/
public boolean hasConverter(MimeType mimeType);
/**
* Remove converter for the specified MIME type.
* @param mimeType the MIME type
* @return the removed converter, or null if none existed
*/
public MessageConverter removeConverter(MimeType mimeType);
public void setBeanFactory(BeanFactory beanFactory);
public void setApplicationContext(ApplicationContext applicationContext);
}Exception classes for message conversion errors.
/**
* Exception thrown during message conversion operations.
*/
public class ConversionException extends RuntimeException {
/**
* Create conversion exception with message.
* @param message the error message
*/
public ConversionException(String message);
/**
* Create conversion exception with message and cause.
* @param message the error message
* @param cause the underlying cause
*/
public ConversionException(String message, Throwable cause);
/**
* Create conversion exception with cause.
* @param cause the underlying cause
*/
public ConversionException(Throwable cause);
}
/**
* Exception thrown for invalid MIME type operations.
*/
public class InvalidMimeTypeException extends ConversionException {
private final String mimeType;
/**
* Create invalid MIME type exception.
* @param mimeType the invalid MIME type string
* @param message the error message
*/
public InvalidMimeTypeException(String mimeType, String message);
/**
* Get the invalid MIME type that caused this exception.
* @return the invalid MIME type string
*/
public String getMimeType();
}Usage Examples:
import org.springframework.cloud.stream.converter.*;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.MimeType;
// Using CompositeMessageConverterFactory
@Service
public class MessageProcessingService {
private final CompositeMessageConverterFactory converterFactory;
public MessageProcessingService(CompositeMessageConverterFactory converterFactory) {
this.converterFactory = converterFactory;
}
public void processMessage(Message<?> message) {
MessageConverter converter = converterFactory.getMessageConverterForAllRegistered();
// Convert to different types
String stringPayload = (String) converter.fromMessage(message, String.class);
Map<String, Object> mapPayload = (Map<String, Object>) converter.fromMessage(message, Map.class);
// Create new message with different content type
Message<String> newMessage = MessageBuilder
.withPayload(stringPayload)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeType.valueOf("text/plain"))
.build();
}
}
// Custom message converter
@Component
public class CustomXmlMessageConverter extends AbstractStreamMessageConverter {
private final XmlMapper xmlMapper;
public CustomXmlMessageConverter() {
super(MimeType.valueOf("application/xml"), MimeType.valueOf("text/xml"));
this.xmlMapper = new XmlMapper();
}
@Override
protected MimeType getDefaultContentType() {
return MimeType.valueOf("application/xml");
}
@Override
protected Object doConvert(Object payload, MimeType sourceContentType, MimeType targetContentType) {
if (payload instanceof String) {
try {
return xmlMapper.readValue((String) payload, Object.class);
} catch (Exception e) {
throw new ConversionException("Failed to convert XML", e);
}
} else {
try {
return xmlMapper.writeValueAsString(payload);
} catch (Exception e) {
throw new ConversionException("Failed to convert to XML", e);
}
}
}
@Override
protected boolean supports(Class<?> clazz) {
return true; // Support all classes
}
}
// Configuration for custom converters
@Configuration
public class CustomConverterConfiguration {
@Bean
public CustomXmlMessageConverter xmlMessageConverter() {
return new CustomXmlMessageConverter();
}
@Bean
public MessageConverter protobufMessageConverter() {
return new ProtobufMessageConverter();
}
@Bean
public CompositeMessageConverterFactory customCompositeFactory(
List<MessageConverter> converters) {
CompositeMessageConverterFactory factory = new CompositeMessageConverterFactory();
converters.forEach(factory::addMessageConverter);
return factory;
}
}
// Content type handling
@Service
public class ContentTypeService {
public void handleDifferentContentTypes(Message<?> message) {
MimeType contentType = MessageConverterUtils.getContentType(message.getHeaders());
if (MessageConverterUtils.isJsonType(contentType)) {
// Handle JSON content
processJsonMessage(message);
} else if (MessageConverterUtils.isTextType(contentType)) {
// Handle text content
processTextMessage(message);
} else if (MessageConverterUtils.isBinaryType(contentType)) {
// Handle binary content
processBinaryMessage(message);
}
}
private void processJsonMessage(Message<?> message) {
// JSON-specific processing
ObjectMapper mapper = new ObjectMapper();
try {
Object jsonObject = mapper.readValue((String) message.getPayload(), Object.class);
// Process JSON object
} catch (Exception e) {
throw new ConversionException("Failed to process JSON message", e);
}
}
private void processTextMessage(Message<?> message) {
String text = (String) message.getPayload();
// Process text content
}
private void processBinaryMessage(Message<?> message) {
byte[] bytes = (byte[]) message.getPayload();
// Process binary content
}
}
// Using message converter registry
@Component
public class ConverterRegistryManager {
private final MessageConverterRegistry registry;
public ConverterRegistryManager(MessageConverterRegistry registry) {
this.registry = registry;
}
@PostConstruct
public void registerCustomConverters() {
// Register custom converters
registry.registerConverter(
new CustomXmlMessageConverter(),
MimeType.valueOf("application/xml"),
MimeType.valueOf("text/xml")
);
registry.registerConverter(
new CustomCsvMessageConverter(),
MimeType.valueOf("text/csv"),
MimeType.valueOf("application/csv")
);
}
public void convertMessage(Object payload, MimeType fromType, MimeType toType) {
MessageConverter fromConverter = registry.findConverter(fromType);
MessageConverter toConverter = registry.findConverter(toType);
if (fromConverter != null && toConverter != null) {
// Perform conversion
Message<?> message = MessageBuilder.withPayload(payload)
.setHeader(MessageHeaders.CONTENT_TYPE, fromType)
.build();
Object converted = fromConverter.fromMessage(message, Object.class);
Message<?> newMessage = toConverter.toMessage(converted, new MessageHeaders(Collections.emptyMap()));
}
}
}
// Error handling
@Component
public class MessageConversionErrorHandler {
public void handleConversionError(ConversionException e, Message<?> originalMessage) {
if (e instanceof InvalidMimeTypeException) {
InvalidMimeTypeException mimeException = (InvalidMimeTypeException) e;
logger.error("Invalid MIME type: {} for message: {}",
mimeException.getMimeType(), originalMessage);
} else {
logger.error("Conversion failed for message: {}", originalMessage, e);
}
// Send to error channel or dead letter queue
sendToErrorChannel(originalMessage, e);
}
private void sendToErrorChannel(Message<?> message, Exception error) {
Message<byte[]> errorMessage = MessageBuilder
.withPayload(message.getPayload().toString().getBytes())
.setHeader("x-exception-message", error.getMessage())
.setHeader("x-exception-stacktrace", getStackTrace(error))
.setHeader("x-original-message", message)
.build();
// Send to error handling destination
}
}Install with Tessl CLI
npx tessl i tessl/maven-org-springframework-cloud--spring-cloud-stream