Comprehensive application framework and inversion of control container for the Java platform, providing dependency injection, AOP, data access, transaction management, and web framework capabilities.
Spring provides comprehensive support for message-driven applications through the Spring Messaging abstraction and JMS (Java Message Service) integration. This includes support for both synchronous and asynchronous messaging patterns with various message brokers.
<!-- Spring Messaging (Core abstractions) -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-messaging</artifactId>
<version>5.3.39</version>
</dependency>
<!-- Spring JMS -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>5.3.39</version>
</dependency>
<!-- JMS API -->
<dependency>
<groupId>javax.jms</groupId>
<artifactId>javax.jms-api</artifactId>
<version>2.0.1</version>
</dependency>
<!-- ActiveMQ (example JMS broker) -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-broker</artifactId>
<version>5.17.6</version>
</dependency>
<!-- RabbitMQ support -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.4.16</version>
</dependency>
// Core messaging abstractions
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.SubscribableChannel;
// Message handling
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Headers;
// JMS Core
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.core.MessagePostProcessor;
import org.springframework.jms.core.SessionCallback;
// JMS Annotations
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.annotation.EnableJms;
// JMS Configuration
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
// Connection Factory
import javax.jms.ConnectionFactory;
import org.springframework.jms.connection.CachingConnectionFactory;
// Message conversion
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
// Generic message interface
// A message is a payload of type T together with its MessageHeaders.
public interface Message<T> {
// Returns the message payload
T getPayload();
// Returns the headers that accompany the payload
MessageHeaders getHeaders();
}
// Message headers container: a Map<String, Object> of header values with typed accessors
// for the predefined entries. In Spring Framework 5.3 the class is not final and the
// three-argument constructor is protected (it is intended for subclasses).
public class MessageHeaders implements Map<String, Object>, Serializable {
    // Names of the predefined header entries
    public static final String ID = "id";
    public static final String TIMESTAMP = "timestamp";
    public static final String CONTENT_TYPE = "contentType";
    public static final String REPLY_CHANNEL = "replyChannel";
    public static final String ERROR_CHANNEL = "errorChannel";

    // Creates headers from the given map
    public MessageHeaders(Map<String, Object> headers);
    // Creates headers from the given map with an explicit id and timestamp (subclasses only)
    protected MessageHeaders(Map<String, Object> headers, UUID id, Long timestamp);

    // Typed accessors for the predefined entries
    public UUID getId();
    public Long getTimestamp();
    public Object getReplyChannel();
    public Object getErrorChannel();

    // Map-style lookups
    @Override
    public Object get(Object key);
    @Override
    public boolean containsKey(Object key);
    // Looks up a header and returns it as the requested type
    public <T> T get(Object key, Class<T> type);
}
// Message builder: fluent construction of a Message<T> from a payload or an existing message
public final class MessageBuilder<T> {
// Entry points: start from a payload, or from an existing message
public static <T> MessageBuilder<T> withPayload(T payload);
public static <T> MessageBuilder<T> fromMessage(Message<T> message);
// Header manipulation; each call returns the builder so calls can be chained
public MessageBuilder<T> setHeader(String headerName, Object headerValue);
public MessageBuilder<T> setHeaderIfAbsent(String headerName, Object headerValue);
public MessageBuilder<T> removeHeaders(String... headerPatterns);
public MessageBuilder<T> removeHeader(String headerName);
public MessageBuilder<T> copyHeaders(Map<String, ?> headersToCopy);
public MessageBuilder<T> copyHeadersIfAbsent(Map<String, ?> headersToCopy);
// Produces the resulting message
public Message<T> build();
}
// Base interface for message channels
// A channel that messages can be sent to.
public interface MessageChannel {
// Timeout value passed by the single-argument send(...)
long INDEFINITE_TIMEOUT = -1;
// Sends the message using INDEFINITE_TIMEOUT
default boolean send(Message<?> message) {
return send(message, INDEFINITE_TIMEOUT);
}
// Sends the message with the given timeout; the boolean result reports the outcome of the send
boolean send(Message<?> message, long timeout);
}
// Channel that can be polled for messages
public interface PollableChannel extends MessageChannel {
// Receives a message from this channel
Message<?> receive();
// Receives a message from this channel, using the given timeout
Message<?> receive(long timeout);
}
// Channel that supports message handler subscription
public interface SubscribableChannel extends MessageChannel {
// Registers a handler with this channel
boolean subscribe(MessageHandler handler);
// Removes a previously registered handler
boolean unsubscribe(MessageHandler handler);
}
// Exception thrown on messaging errors; it can optionally carry the message that failed
public class MessagingException extends RuntimeException {
// Constructors without a failed message
public MessagingException(String description);
public MessagingException(String description, Throwable cause);
// Constructors that record the failed message
public MessagingException(Message<?> failedMessage, String description);
public MessagingException(Message<?> failedMessage, String description, Throwable cause);
// Returns the failed message recorded at construction time
public Message<?> getFailedMessage();
}
// Interface for handling messages
// Single-method contract for handling a message (usable as a lambda)
@FunctionalInterface
public interface MessageHandler {
// Handles the given message; failures are reported as MessagingException
void handleMessage(Message<?> message) throws MessagingException;
}
// Annotation for mapping messages to handler methods.
// The annotation declares a single attribute, value(); there is no "destination" alias.
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MessageMapping {
    // Destination mappings for the annotated type or method
    String[] value() default {};
}
// Annotation to bind a method parameter to the message payload
@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Payload {
// Optional string attribute, empty by default
// NOTE(review): in Spring this is presumably an expression evaluated against the payload — confirm against the Javadoc
String value() default "";
// Whether a payload is required (true by default)
boolean required() default true;
}
// Annotation to bind a method parameter to a single header value
@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Header {
// value and name are aliases of each other
@AliasFor("name")
String value() default "";
@AliasFor("value")
String name() default "";
// Whether the header is required (true by default)
boolean required() default true;
// Fallback value; ValueConstants.DEFAULT_NONE is the declared default
String defaultValue() default ValueConstants.DEFAULT_NONE;
}
// Annotation to bind a method parameter to all message headers (marker annotation, no attributes)
@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Headers {
}
// Central class for JMS operations
// Extends JmsDestinationAccessor and implements JmsOperations.
// Operations come in up to three flavours: by destination name, by Destination object,
// or with no destination argument at all.
// Message, Destination, Queue and JMSException in this listing are the JMS API types.
public class JmsTemplate extends JmsDestinationAccessor implements JmsOperations {
// Send operations
public void send(String destinationName, MessageCreator messageCreator) throws JmsException;
public void send(Destination destination, MessageCreator messageCreator) throws JmsException;
public void send(MessageCreator messageCreator) throws JmsException;
// Convert and send operations
public void convertAndSend(String destinationName, Object message) throws JmsException;
public void convertAndSend(Destination destination, Object message) throws JmsException;
public void convertAndSend(Object message) throws JmsException;
public void convertAndSend(String destinationName, Object message, MessagePostProcessor postProcessor) throws JmsException;
// Receive operations
public Message receive() throws JmsException;
public Message receive(String destinationName) throws JmsException;
public Message receive(Destination destination) throws JmsException;
// Receive and convert operations
public Object receiveAndConvert() throws JmsException;
public Object receiveAndConvert(String destinationName) throws JmsException;
public Object receiveAndConvert(Destination destination) throws JmsException;
// Browse operations
public <T> T browse(String queueName, BrowserCallback<T> action) throws JmsException;
public <T> T browse(Queue queue, BrowserCallback<T> action) throws JmsException;
// Execute operations
public <T> T execute(SessionCallback<T> action) throws JmsException;
public <T> T execute(ProducerCallback<T> action) throws JmsException;
// Configuration
public void setConnectionFactory(ConnectionFactory connectionFactory);
public void setDefaultDestinationName(String defaultDestinationName);
public void setDefaultDestination(Destination defaultDestination);
public void setMessageConverter(MessageConverter messageConverter);
public void setPubSubDomain(boolean pubSubDomain);
public void setReceiveTimeout(long receiveTimeout);
// NOTE(review): delivery mode, priority and time-to-live are presumably only applied when
// explicit QoS is enabled on the template — confirm against the JmsTemplate Javadoc
public void setDeliveryMode(int deliveryMode);
public void setPriority(int priority);
public void setTimeToLive(long timeToLive);
}
// Interface for creating JMS messages (usable as a lambda)
@FunctionalInterface
public interface MessageCreator {
// Creates a JMS message using the supplied Session
Message createMessage(Session session) throws JMSException;
}
// Interface for post-processing messages (usable as a lambda)
@FunctionalInterface
public interface MessagePostProcessor {
// Receives a JMS message and returns the message to use in its place
Message postProcessMessage(Message message) throws JMSException;
}
// Callback interface for JMS Session operations (usable as a lambda)
@FunctionalInterface
public interface SessionCallback<T> {
// Runs against the supplied Session and returns a result of type T
T doInJms(Session session) throws JMSException;
}
// Annotation to mark a method as a JMS message listener
// Declared on methods (or on other annotations as a meta-annotation), not on types.
// "destination" is a single, required String; there is no "value" alias.
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Repeatable(JmsListeners.class)
@MessageMapping
public @interface JmsListener {
    // Unique identifier of the listener container
    String id() default "";
    // Bean name of the JmsListenerContainerFactory to use
    String containerFactory() default "";
    // Destination name for this listener (required)
    String destination();
    // Name of the durable subscription, if any
    String subscription() default "";
    // JMS message selector expression, if any
    String selector() default "";
    // Concurrency limits, e.g. "5" or "3-10"
    String concurrency() default "";
}
// Enable JMS listener annotated endpoints; placed on a type, it imports JmsBootstrapConfiguration
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(JmsBootstrapConfiguration.class)
public @interface EnableJms {
}
// Annotation to send a method's return value as a reply message.
// The annotation declares a single attribute, value(); there is no "destinations" alias.
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface SendTo {
    // Destination(s) for the reply message
    String[] value() default {};
}
// Factory for creating JMS listener containers
// C is the concrete MessageListenerContainer type produced by the factory
public interface JmsListenerContainerFactory<C extends MessageListenerContainer> {
// Creates a listener container for the given endpoint
C createListenerContainer(JmsListenerEndpoint endpoint);
}
// Default implementation of JmsListenerContainerFactory; produces DefaultMessageListenerContainer instances.
// The setters take wrapper types (Boolean, Integer, Long) rather than primitives.
public class DefaultJmsListenerContainerFactory
implements JmsListenerContainerFactory<DefaultMessageListenerContainer>, BeanNameAware, InitializingBean {
public void setConnectionFactory(ConnectionFactory connectionFactory);
public void setDestinationResolver(DestinationResolver destinationResolver);
public void setMessageConverter(MessageConverter messageConverter);
public void setPubSubDomain(Boolean pubSubDomain);
public void setSubscriptionDurable(Boolean subscriptionDurable);
public void setClientId(String clientId);
// Concurrency as a string; the examples in this document use the "min-max" form, e.g. "3-10"
public void setConcurrency(String concurrency);
// NOTE(review): setMaxConcurrency could not be confirmed among this factory's setters in
// Spring Framework 5.3 — verify against the Javadoc before relying on it
public void setMaxConcurrency(Integer maxConcurrency);
public void setCacheLevel(Integer cacheLevel);
public void setReceiveTimeout(Long receiveTimeout);
public void setAutoStartup(Boolean autoStartup);
public void setPhase(Integer phase);
@Override
public DefaultMessageListenerContainer createListenerContainer(JmsListenerEndpoint endpoint);
}
// Message listener container for JMS; extends AbstractPollingMessageListenerContainer
public class DefaultMessageListenerContainer extends AbstractPollingMessageListenerContainer
implements BeanNameAware, DisposableBean {
// Concurrency settings
public void setConcurrentConsumers(int concurrentConsumers);
public void setMaxConcurrentConsumers(int maxConcurrentConsumers);
public void setMaxMessagesPerTask(int maxMessagesPerTask);
// Connection settings
public void setCacheLevelName(String constantName) throws IllegalArgumentException;
public void setCacheLevel(int cacheLevel);
// Recovery settings
public void setRecoveryInterval(long recoveryInterval);
// NOTE(review): setBackOffMultiplier and setMaxRecoveryTime could not be confirmed on this
// class in Spring Framework 5.3 — verify against the Javadoc before relying on them
public void setBackOffMultiplier(double backOffMultiplier);
public void setMaxRecoveryTime(long maxRecoveryTime);
// Transaction settings
public void setTransactionManager(PlatformTransactionManager transactionManager);
public void setTransactionName(String transactionName);
public void setTransactionTimeout(int transactionTimeout);
}
// Strategy interface for converting between Java objects and JMS messages
// Two-way conversion contract; Message and Session here are the JMS API types
public interface MessageConverter {
// Converts a Java object into a JMS message created via the given Session
Message toMessage(Object object, Session session) throws JMSException, MessageConversionException;
// Converts a JMS message back into a Java object
Object fromMessage(Message message) throws JMSException, MessageConversionException;
}
// Base implementation of MessageConverter
// NOTE(review): a type named MessageConverterSupport could not be confirmed in
// org.springframework.jms.support.converter for Spring Framework 5.3 — verify this listing
// against the Javadoc before relying on it
public abstract class MessageConverterSupport implements MessageConverter {
// Property-name constants
public static final String TYPE_ID_PROPERTY = "__TypeId__";
public static final String CONTENT_TYPE_PROPERTY = "__ContentTypeId__";
public static final String KEY_TYPE_ID_PROPERTY = "__KeyTypeId__";
// Hooks for creating a JMS message from each supported payload kind
protected abstract Message createMessageForByteArray(byte[] bytes, Session session) throws JMSException;
protected abstract Message createMessageForString(String string, Session session) throws JMSException;
protected abstract Message createMessageForMap(Map<String, Object> map, Session session) throws JMSException;
protected abstract Message createMessageForSerializable(Serializable object, Session session) throws JMSException;
// Hooks for extracting each supported payload kind from a JMS message
protected abstract byte[] extractByteArrayFromMessage(Message message) throws JMSException;
protected abstract String extractStringFromMessage(Message message) throws JMSException;
protected abstract Map<String, Object> extractMapFromMessage(Message message) throws JMSException;
protected abstract Serializable extractSerializableFromMessage(Message message) throws JMSException;
}
// JSON message converter using Jackson.
// In Spring Framework 5.3 it implements SmartMessageConverter (which extends MessageConverter)
// and BeanClassLoaderAware.
public class MappingJackson2MessageConverter implements SmartMessageConverter, BeanClassLoaderAware {
    public MappingJackson2MessageConverter();
    // Jackson ObjectMapper used for (de)serialization
    public void setObjectMapper(ObjectMapper objectMapper);
    // Kind of JMS message to produce (see MessageType below)
    public void setTargetType(MessageType targetType);
    // Name of the message property that carries the type id
    public void setTypeIdPropertyName(String typeIdPropertyName);
    // Mapping from type id to Java class
    public void setTypeIdMappings(Map<String, Class<?>> typeIdMappings);
}
// Message type enum: a top-level type in org.springframework.jms.support.converter,
// not nested inside the converter
public enum MessageType {
    TEXT, BYTES, MAP, OBJECT
}
@Configuration
@EnableJms
public class JmsConfig {

    private static final Logger logger = LoggerFactory.getLogger(JmsConfig.class);

    // ActiveMQ connection factory wrapped in a CachingConnectionFactory
    @Bean
    public ConnectionFactory connectionFactory() {
        // Using ActiveMQ
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL("tcp://localhost:61616");
        // NOTE(review): sample credentials only; load real ones from externalized configuration
        connectionFactory.setUserName("admin");
        connectionFactory.setPassword("admin");
        // Wrap with caching connection factory for better performance
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(connectionFactory);
        cachingConnectionFactory.setCacheConsumers(true);
        cachingConnectionFactory.setCacheProducers(true);
        cachingConnectionFactory.setSessionCacheSize(10);
        return cachingConnectionFactory;
    }

    // Template used for sending and synchronous receiving
    @Bean
    public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) {
        JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
        jmsTemplate.setMessageConverter(messageConverter());
        jmsTemplate.setReceiveTimeout(5000); // 5 seconds
        // Delivery mode, priority and time-to-live are only applied to sent messages
        // when explicit QoS is enabled; without this the three setters below are ignored.
        jmsTemplate.setExplicitQosEnabled(true);
        jmsTemplate.setDeliveryMode(DeliveryMode.PERSISTENT);
        jmsTemplate.setPriority(Message.DEFAULT_PRIORITY);
        jmsTemplate.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);
        return jmsTemplate;
    }

    // JSON converter that writes text messages and carries a logical type id in "_type"
    @Bean
    public MessageConverter messageConverter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setTargetType(MessageType.TEXT);
        converter.setTypeIdPropertyName("_type");
        // Configure type mappings
        Map<String, Class<?>> typeIdMappings = new HashMap<>();
        typeIdMappings.put("user", UserMessage.class);
        typeIdMappings.put("order", OrderMessage.class);
        converter.setTypeIdMappings(typeIdMappings);
        return converter;
    }

    // Default (queue) listener container factory used by @JmsListener methods
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerFactory(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(messageConverter());
        factory.setConcurrency("3-10"); // Min 3, max 10 consumers
        factory.setReceiveTimeout(5000L);
        factory.setAutoStartup(true);
        // Error handling: log with the full stack trace instead of printing only the message
        factory.setErrorHandler(t -> logger.error("Error in JMS listener", t));
        return factory;
    }

    // Topic listener factory for pub/sub
    @Bean
    public JmsListenerContainerFactory<?> topicListenerFactory(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(messageConverter());
        factory.setPubSubDomain(true); // Enable topic mode
        factory.setSubscriptionDurable(true);
        factory.setClientId("myapp-client");
        return factory;
    }

    // Dead letter queue configuration
    @Bean
    public Queue deadLetterQueue() {
        return new ActiveMQQueue("DLQ.MyApp");
    }
}
@Service
public class MessageProducerService {

    private final JmsTemplate jmsTemplate;

    public MessageProducerService(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    // Simple message sending
    public void sendMessage(String destination, Object message) {
        jmsTemplate.convertAndSend(destination, message);
    }

    // Send message with headers (each map entry becomes a JMS message property)
    public void sendMessageWithHeaders(String destination, Object message, Map<String, Object> headers) {
        // The post-processor receives the converted JMS message. It may throw JMSException
        // directly, which the template translates into Spring's JmsException hierarchy.
        jmsTemplate.convertAndSend(destination, message, jmsMessage -> {
            for (Map.Entry<String, Object> header : headers.entrySet()) {
                jmsMessage.setObjectProperty(header.getKey(), header.getValue());
            }
            return jmsMessage;
        });
    }

    // Send user notification
    public void sendUserNotification(Long userId, String notificationType, String content) {
        UserNotificationMessage notification = UserNotificationMessage.builder()
                .userId(userId)
                .type(notificationType)
                .content(content)
                .timestamp(LocalDateTime.now())
                .build();
        Map<String, Object> headers = new HashMap<>();
        headers.put("userId", userId);
        headers.put("notificationType", notificationType);
        headers.put("priority", "HIGH");
        sendMessageWithHeaders("user.notifications", notification, headers);
    }

    // Send order events
    public void sendOrderCreatedEvent(Order order) {
        OrderCreatedEvent event = OrderCreatedEvent.builder()
                .orderId(order.getId())
                .userId(order.getUserId())
                .totalAmount(order.getTotalAmount())
                .items(order.getItems())
                .timestamp(LocalDateTime.now())
                .build();
        // Send to multiple destinations
        jmsTemplate.convertAndSend("order.created", event);
        jmsTemplate.convertAndSend("audit.events", event);
        // Send to topic for pub/sub. The template is a shared singleton, so its pubSubDomain
        // flag must not be toggled per call (concurrent senders would race on it).
        // Passing an explicit topic Destination avoids mutating the template.
        jmsTemplate.convertAndSend(new ActiveMQTopic("order.events"), event);
    }

    // Request-reply pattern.
    // Returns the reply body as a String, or null when no reply arrives within the
    // template's receive timeout.
    public String sendRequestReply(String destination, Object request) {
        MessageConverter converter = jmsTemplate.getMessageConverter();
        // sendAndReceive creates the temporary reply queue and sets JMSReplyTo itself,
        // and it returns the raw JMS reply message rather than its payload.
        javax.jms.Message reply = jmsTemplate.sendAndReceive(destination,
                session -> converter.toMessage(request, session));
        if (reply == null) {
            return null;
        }
        try {
            if (reply instanceof TextMessage) {
                return ((TextMessage) reply).getText();
            }
            Object body = converter.fromMessage(reply);
            return body != null ? body.toString() : null;
        } catch (JMSException e) {
            throw new MessageConversionException("Failed to read reply from " + destination, e);
        }
    }

    // Delayed message sending
    public void sendDelayedMessage(String destination, Object message, Duration delay) {
        jmsTemplate.convertAndSend(destination, message, jmsMessage -> {
            // ActiveMQ specific - set delivery delay
            jmsMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay.toMillis());
            return jmsMessage;
        });
    }

    // Priority message sending.
    // JMS providers overwrite a JMSPriority header set on the message before sending,
    // so the priority has to be passed to the producer's send call instead.
    // NOTE(review): this path assumes a non-transacted template — confirm whether a locally
    // transacted session needs an explicit commit here.
    public void sendPriorityMessage(String destination, Object message, int priority) {
        if (priority < 0 || priority > 9) {
            throw new IllegalArgumentException("JMS priority must be between 0 and 9: " + priority);
        }
        MessageConverter converter = jmsTemplate.getMessageConverter();
        jmsTemplate.execute(destination, (session, producer) -> {
            javax.jms.Message jmsMessage = converter.toMessage(message, session);
            producer.send(jmsMessage, producer.getDeliveryMode(), priority, producer.getTimeToLive());
            return null;
        });
    }

    // Transactional message sending
    @Transactional
    public void sendTransactionalMessages(List<MessageRequest> requests) {
        // Validate up front so an oversized batch is rejected before anything is sent
        if (requests.size() > 10) {
            throw new IllegalArgumentException("Too many messages in single transaction");
        }
        // If any exception occurs while sending, all messages will be rolled back
        for (MessageRequest request : requests) {
            jmsTemplate.convertAndSend(request.getDestination(), request.getPayload());
        }
    }
}
@Component
public class MessageListenerService {

    private static final Logger logger = LoggerFactory.getLogger(MessageListenerService.class);

    private final UserService userService;
    private final EmailService emailService;
    private final AuditService auditService;

    public MessageListenerService(UserService userService, EmailService emailService, AuditService auditService) {
        this.userService = userService;
        this.emailService = emailService;
        this.auditService = auditService;
    }

    // Simple message listener
    @JmsListener(destination = "user.notifications")
    public void handleUserNotification(UserNotificationMessage notification) {
        logger.info("Received user notification: {}", notification);
        try {
            User user = userService.findById(notification.getUserId());
            emailService.sendNotificationEmail(user, notification.getContent());
        } catch (Exception e) {
            logger.error("Failed to process user notification", e);
            throw new RuntimeException("Notification processing failed", e);
        }
    }

    // Listener with message headers
    @JmsListener(destination = "order.created")
    public void handleOrderCreated(
            @Payload OrderCreatedEvent event,
            @Header("userId") Long userId,
            @Header(value = "priority", defaultValue = "NORMAL") String priority) {
        logger.info("Processing order created event for user {}: {}", userId, event);
        // Process based on priority
        if ("HIGH".equals(priority)) {
            processHighPriorityOrder(event);
        } else {
            processNormalOrder(event);
        }
    }

    // Listener with all headers
    @JmsListener(destination = "audit.events")
    public void handleAuditEvent(
            @Payload Object event,
            @Headers Map<String, Object> headers) {
        AuditRecord audit = AuditRecord.builder()
                .eventType(event.getClass().getSimpleName())
                .payload(event)
                .headers(headers)
                .timestamp(LocalDateTime.now())
                .build();
        auditService.saveAuditRecord(audit);
    }

    // Topic listener with durable subscription
    @JmsListener(
            destination = "order.events",
            containerFactory = "topicListenerFactory",
            subscription = "orderEventSubscription"
    )
    public void handleOrderEvents(OrderEvent event) {
        logger.info("Received order event: {}", event);
        switch (event.getEventType()) {
            case "ORDER_CREATED":
                handleOrderCreatedFromTopic((OrderCreatedEvent) event);
                break;
            case "ORDER_UPDATED":
                handleOrderUpdated((OrderUpdatedEvent) event);
                break;
            case "ORDER_CANCELLED":
                handleOrderCancelled((OrderCancelledEvent) event);
                break;
            default:
                logger.warn("Unknown order event type: {}", event.getEventType());
        }
    }

    // Listener with message selector
    @JmsListener(
            destination = "notifications",
            selector = "notificationType = 'EMAIL' OR priority = 'HIGH'"
    )
    public void handleHighPriorityNotifications(NotificationMessage notification) {
        logger.info("Processing high priority notification: {}", notification);
        // Process urgent notifications immediately
        processUrgentNotification(notification);
    }

    // Listener with concurrency control
    @JmsListener(
            destination = "file.processing",
            concurrency = "2-5" // Min 2, max 5 concurrent listeners
    )
    public void handleFileProcessing(FileProcessingRequest request) {
        logger.info("Processing file: {}", request.getFilename());
        try {
            // Simulate file processing
            Thread.sleep(5000);
            processFile(request);
            logger.info("File processing completed: {}", request.getFilename());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("File processing interrupted", e);
        }
    }

    // Error handling listener.
    // The delivery count is the standard JMS property "JMSXDeliveryCount"; JmsHeaders defines
    // no DELIVERY_COUNT constant. The default of 1 covers providers that omit the property
    // on first delivery.
    @JmsListener(destination = "error.queue")
    public void handleErrors(
            Message failedMessage,
            @Header(name = "JMSXDeliveryCount", defaultValue = "1") int deliveryCount) {
        logger.error("Processing failed message (attempt {}): {}", deliveryCount, failedMessage);
        if (deliveryCount >= 3) {
            // Move to dead letter queue after 3 attempts
            sendToDeadLetterQueue(failedMessage);
        } else {
            // Retry processing
            retryProcessing(failedMessage);
        }
    }

    // Reply listener for request-response pattern
    @JmsListener(destination = "user.lookup.request")
    @SendTo("user.lookup.response")
    public UserResponse handleUserLookupRequest(UserLookupRequest request) {
        logger.info("Looking up user: {}", request.getUserId());
        User user = userService.findById(request.getUserId());
        if (user == null) {
            // Reply with an explicit "not found" instead of failing on a null user
            return UserResponse.builder()
                    .userId(request.getUserId())
                    .found(false)
                    .build();
        }
        return UserResponse.builder()
                .userId(user.getId())
                .username(user.getUsername())
                .email(user.getEmail())
                .found(true)
                .build();
    }

    // Debug message handling. It is not annotated here because @JmsListener has no
    // "condition" attribute; the endpoint is registered by the profile-scoped
    // DebugMessageListener below, which delegates to this method.
    public void handleDebugMessages(DebugMessage message) {
        logger.debug("Debug message: {}", message);
        // Only process in development environment
    }

    // Conditional listener (only active in the "development" profile)
    @Component
    @Profile("development")
    public static class DebugMessageListener {

        private final MessageListenerService delegate;

        public DebugMessageListener(MessageListenerService delegate) {
            this.delegate = delegate;
        }

        @JmsListener(destination = "development.debug")
        public void onDebugMessage(DebugMessage message) {
            delegate.handleDebugMessages(message);
        }
    }

    private void processHighPriorityOrder(OrderCreatedEvent event) {
        // Expedited processing for VIP customers
        logger.info("Processing high priority order: {}", event.getOrderId());
    }

    private void processNormalOrder(OrderCreatedEvent event) {
        // Standard order processing
        logger.info("Processing normal order: {}", event.getOrderId());
    }

    private void handleOrderCreatedFromTopic(OrderCreatedEvent event) {
        // Update inventory, send confirmation email, etc.
        logger.info("Handling order created from topic: {}", event.getOrderId());
    }

    private void handleOrderUpdated(OrderUpdatedEvent event) {
        logger.info("Handling order updated: {}", event.getOrderId());
    }

    private void handleOrderCancelled(OrderCancelledEvent event) {
        logger.info("Handling order cancelled: {}", event.getOrderId());
    }

    private void processUrgentNotification(NotificationMessage notification) {
        // Immediate processing for urgent notifications
        logger.info("Processing urgent notification: {}", notification.getId());
    }

    private void processFile(FileProcessingRequest request) {
        // File processing logic
        logger.info("Processing file: {}", request.getFilename());
    }

    private void sendToDeadLetterQueue(Message message) {
        // Send to DLQ for manual inspection
        logger.error("Sending message to dead letter queue: {}", message);
    }

    private void retryProcessing(Message message) {
        // Retry logic
        logger.info("Retrying message processing: {}", message);
    }
}
// Custom message converter
@Component
public class CustomMessageConverter implements MessageConverter {

    private final ObjectMapper objectMapper;

    public CustomMessageConverter(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
    }

    // Strings become TextMessages, byte arrays become BytesMessages, and anything else is
    // written as a JSON TextMessage whose "_type" property holds the payload's class name.
    // JMSException from the session is propagated as declared; only serialization failures
    // are wrapped in MessageConversionException.
    @Override
    public Message toMessage(Object object, Session session) throws JMSException, MessageConversionException {
        if (object == null) {
            throw new MessageConversionException("Failed to convert message: payload is null");
        }
        if (object instanceof String) {
            return session.createTextMessage((String) object);
        }
        if (object instanceof byte[]) {
            BytesMessage message = session.createBytesMessage();
            message.writeBytes((byte[]) object);
            return message;
        }
        try {
            // Convert to JSON
            String json = objectMapper.writeValueAsString(object);
            TextMessage message = session.createTextMessage(json);
            message.setStringProperty("_type", object.getClass().getName());
            return message;
        } catch (IOException e) {
            throw new MessageConversionException("Failed to convert message", e);
        }
    }

    // TextMessages with a "_type" property are parsed as JSON into that class, and without it
    // the text is returned as-is; BytesMessages are returned as byte[]. Any other message
    // type is rejected with MessageConversionException.
    @Override
    public Object fromMessage(Message message) throws JMSException, MessageConversionException {
        if (message instanceof TextMessage) {
            String text = ((TextMessage) message).getText();
            // Check if it has type information
            String typeProperty = message.getStringProperty("_type");
            if (typeProperty == null) {
                return text;
            }
            try {
                // SECURITY NOTE(review): the class name comes from the incoming message. Loading
                // and deserializing into an arbitrary sender-chosen class is unsafe for untrusted
                // producers; restrict it to an allow-list of expected payload types.
                Class<?> targetClass = Class.forName(typeProperty);
                return objectMapper.readValue(text, targetClass);
            } catch (ClassNotFoundException | IOException e) {
                throw new MessageConversionException("Failed to convert message", e);
            }
        }
        if (message instanceof BytesMessage) {
            BytesMessage bytesMessage = (BytesMessage) message;
            long bodyLength = bytesMessage.getBodyLength();
            if (bodyLength > Integer.MAX_VALUE) {
                // A plain (int) cast would silently produce a wrong or negative array size
                throw new MessageConversionException("Message body too large: " + bodyLength + " bytes");
            }
            byte[] bytes = new byte[(int) bodyLength];
            bytesMessage.readBytes(bytes);
            return bytes;
        }
        throw new MessageConversionException("Unsupported message type: " + message.getClass());
    }
}
// JMS transaction manager configuration: declares a JmsTransactionManager and a JmsTemplate
// with session transactions enabled, both built on the injected ConnectionFactory
@Configuration
public class JmsTransactionConfig {
// Transaction manager backed by the given ConnectionFactory
@Bean
public PlatformTransactionManager jmsTransactionManager(ConnectionFactory connectionFactory) {
return new JmsTransactionManager(connectionFactory);
}
// Template whose JMS sessions are created as transacted
@Bean
public JmsTemplate transactionalJmsTemplate(ConnectionFactory connectionFactory) {
JmsTemplate template = new JmsTemplate(connectionFactory);
template.setSessionTransacted(true);
return template;
}
}
// Message-driven service with transactions
@Service
@Transactional
public class TransactionalMessageService {

    private final JmsTemplate jmsTemplate;
    private final UserRepository userRepository;

    public TransactionalMessageService(JmsTemplate jmsTemplate, UserRepository userRepository) {
        this.jmsTemplate = jmsTemplate;
        this.userRepository = userRepository;
    }

    // Applies an e-mail change to the stored user, then publishes a confirmation message
    @JmsListener(destination = "user.updates")
    @Transactional
    public void handleUserUpdate(UserUpdateMessage message) {
        // Update user in database
        User existingUser = userRepository.findById(message.getUserId())
                .orElseThrow(() -> new UserNotFoundException("User not found: " + message.getUserId()));
        existingUser.setEmail(message.getNewEmail());
        userRepository.save(existingUser);

        // Send confirmation message - both operations in same transaction
        UserUpdateConfirmation confirmation = UserUpdateConfirmation.builder()
                .userId(message.getUserId())
                .status("SUCCESS")
                .timestamp(LocalDateTime.now())
                .build();
        jmsTemplate.convertAndSend("user.update.confirmation", confirmation);
    }

    // Processes an order and announces success; any failure surfaces as OrderProcessingException
    @Transactional(rollbackFor = Exception.class)
    public void processOrderWithRollback(OrderProcessingRequest request) {
        try {
            // Process order
            processOrder(request);

            // Send success notification
            OrderProcessedEvent processedEvent = OrderProcessedEvent.builder()
                    .orderId(request.getOrderId())
                    .status("PROCESSED")
                    .build();
            jmsTemplate.convertAndSend("order.processed", processedEvent);
        } catch (Exception e) {
            // Transaction will be rolled back, JMS message won't be sent
            throw new OrderProcessingException("Failed to process order", e);
        }
    }

    private void processOrder(OrderProcessingRequest request) {
        // Order processing logic
    }
}
// Message listener with retry and dead letter handling
@Component
public class RobustMessageListener {

    private static final Logger logger = LoggerFactory.getLogger(RobustMessageListener.class);

    // Used to forward failed messages; it was referenced below but never declared or injected
    private final JmsTemplate jmsTemplate;

    public RobustMessageListener(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    // Processes a message; ProcessingException triggers up to 3 attempts with exponential
    // backoff, while any other exception is reported to the error-analysis queue.
    @Retryable(
            value = {ProcessingException.class},
            maxAttempts = 3,
            backoff = @Backoff(delay = 1000, multiplier = 2)
    )
    @JmsListener(destination = "robust.processing")
    public void handleMessage(ProcessingMessage message) {
        logger.info("Processing message: {}", message.getId());
        try {
            processMessage(message);
        } catch (ProcessingException e) {
            logger.warn("Processing failed for message {}, will retry", message.getId());
            throw e; // Will trigger retry
        } catch (Exception e) {
            logger.error("Unexpected error processing message {}", message.getId(), e);
            handleProcessingError(message, e);
        }
    }

    // Invoked once all retry attempts for handleMessage are exhausted
    @Recover
    public void recover(ProcessingException ex, ProcessingMessage message) {
        logger.error("All retry attempts exhausted for message {}", message.getId());
        sendToDeadLetterQueue(message, ex);
    }

    private void processMessage(ProcessingMessage message) {
        // Message processing logic that might fail
        if (message.isCorrupted()) {
            throw new ProcessingException("Message is corrupted");
        }
        // Actual processing...
    }

    private void handleProcessingError(ProcessingMessage message, Exception e) {
        // Send to error queue for analysis
        ErrorMessage errorMessage = ErrorMessage.builder()
                .originalMessage(message)
                .errorMessage(e.getMessage())
                .timestamp(LocalDateTime.now())
                .build();
        jmsTemplate.convertAndSend("error.analysis", errorMessage);
    }

    private void sendToDeadLetterQueue(ProcessingMessage message, Exception e) {
        DeadLetterMessage dlqMessage = DeadLetterMessage.builder()
                .originalMessage(message)
                .failureReason(e.getMessage())
                .maxAttemptsReached(true)
                .timestamp(LocalDateTime.now())
                .build();
        jmsTemplate.convertAndSend("dlq.processing", dlqMessage);
    }
}
// Spring Integration configuration for JMS
@Configuration
@EnableIntegration
public class JmsIntegrationConfig {

    // Declared here because handlePriorityMessage logs through it
    private static final Logger logger = LoggerFactory.getLogger(JmsIntegrationConfig.class);

    // Inbound flow: JMS destination -> String -> keep payloads containing 'PRIORITY' -> priorityChannel
    @Bean
    public IntegrationFlow jmsInboundFlow(ConnectionFactory connectionFactory) {
        return IntegrationFlows
                .from(Jms.messageDrivenChannelAdapter(connectionFactory)
                        .destination("integration.inbound")
                        .configureListenerContainer(c -> c.sessionTransacted(true)))
                .transform(Transformers.objectToString())
                .filter("payload.contains('PRIORITY')")
                .channel("priorityChannel")
                .get();
    }

    // Outbound flow: outboundChannel -> JSON -> JMS destination
    @Bean
    public IntegrationFlow jmsOutboundFlow(ConnectionFactory connectionFactory) {
        return IntegrationFlows
                .from("outboundChannel")
                .transform(Transformers.toJson())
                .handle(Jms.outboundAdapter(connectionFactory)
                        .destination("integration.outbound"))
                .get();
    }

    @Bean
    public MessageChannel priorityChannel() {
        return MessageChannels.queue().get();
    }

    @Bean
    public MessageChannel outboundChannel() {
        return MessageChannels.direct().get();
    }

    // priorityChannel is a queue (pollable) channel, so the consuming endpoint needs a poller;
    // without one (and with no default poller defined) the endpoint cannot be created.
    @ServiceActivator(inputChannel = "priorityChannel", poller = @Poller(fixedDelay = "1000"))
    public void handlePriorityMessage(String message) {
        logger.info("Handling priority message: {}", message);
        // Process priority messages
    }
}
@SpringBootTest
@DirtiesContext
class JmsMessageTest {

    @Autowired
    private JmsTemplate jmsTemplate;

    @Autowired
    private MessageListenerService messageListener;

    // Not @Transactional: inside a test-managed transaction the sent message would stay
    // uncommitted, so the receive below could never see it and would time out with null.
    @Test
    void shouldSendAndReceiveMessage() {
        // Given
        UserNotificationMessage notification = UserNotificationMessage.builder()
                .userId(1L)
                .type("WELCOME")
                .content("Welcome to our platform!")
                .timestamp(LocalDateTime.now())
                .build();
        // When
        jmsTemplate.convertAndSend("test.notifications", notification);
        // Then
        UserNotificationMessage received = (UserNotificationMessage)
                jmsTemplate.receiveAndConvert("test.notifications");
        assertThat(received).isNotNull();
        assertThat(received.getUserId()).isEqualTo(1L);
        assertThat(received.getType()).isEqualTo("WELCOME");
    }

    @Test
    void shouldProcessMessageThroughListener() {
        // Given
        OrderCreatedEvent event = OrderCreatedEvent.builder()
                .orderId(123L)
                .userId(1L)
                .totalAmount(BigDecimal.valueOf(99.99))
                .build();
        // When / Then - at minimum, the listener must handle the event without throwing
        // (a fuller test would also verify database changes or other side effects)
        assertThatCode(() -> messageListener.handleOrderCreated(event, 1L, "HIGH"))
                .doesNotThrowAnyException();
    }
}
// Integration test with embedded broker
@SpringBootTest
@TestPropertySource(properties = {
"spring.activemq.broker-url=vm://localhost?broker.persistent=false",
"spring.jms.cache.enabled=false"
})
class JmsIntegrationTest {
@Autowired
private MessageProducerService producerService;
@MockBean
private EmailService emailService;
@Test
void shouldProcessNotificationEndToEnd() throws InterruptedException {
// Given
Long userId = 1L;
String content = "Test notification";
// When
producerService.sendUserNotification(userId, "EMAIL", content);
// Wait for async processing
Thread.sleep(1000);
// Then
verify(emailService, timeout(5000)).sendNotificationEmail(any(User.class), eq(content));
}
}Spring Messaging and JMS support provide a robust foundation for building message-driven applications with support for both point-to-point and publish-subscribe messaging patterns, transaction management, and error handling.
Install with Tessl CLI
npx tessl i tessl/maven-org-springframework--spring