CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-springframework--spring

Comprehensive application framework and inversion of control container for the Java platform providing dependency injection, AOP, data access, transaction management, and web framework capabilities

Overview
Eval results
Files

messaging.mddocs/

Messaging & JMS

Spring provides comprehensive support for message-driven applications through the Spring Messaging abstraction and JMS (Java Message Service) integration. This includes support for both synchronous and asynchronous messaging patterns with various message brokers.

Maven Dependencies

<!-- Spring Messaging (Core abstractions) -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-messaging</artifactId>
    <version>5.3.39</version>
</dependency>

<!-- Spring JMS -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>5.3.39</version>
</dependency>

<!-- JMS API -->
<dependency>
    <groupId>javax.jms</groupId>
    <artifactId>javax.jms-api</artifactId>
    <version>2.0.1</version>
</dependency>

<!-- ActiveMQ (example JMS broker) -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-broker</artifactId>
    <version>5.17.6</version>
</dependency>

<!-- RabbitMQ support -->
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>2.4.16</version>
</dependency>

Core Imports

// Core messaging abstractions
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.SubscribableChannel;

// Message handling
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Headers;

// JMS Core
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.core.MessagePostProcessor;
import org.springframework.jms.core.SessionCallback;

// JMS Annotations
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.annotation.EnableJms;

// JMS Configuration
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.listener.DefaultMessageListenerContainer;

// Connection Factory
import javax.jms.ConnectionFactory;
import org.springframework.jms.connection.CachingConnectionFactory;

// Message conversion
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;

Core Messaging Abstractions

Message Interface

// Generic message interface
public interface Message<T> {
    
    T getPayload();
    
    MessageHeaders getHeaders();
}

// Message headers container
public final class MessageHeaders implements Map<String, Object>, Serializable {
    
    public static final String ID = "id";
    public static final String TIMESTAMP = "timestamp";
    public static final String CONTENT_TYPE = "contentType";
    public static final String REPLY_CHANNEL = "replyChannel";
    public static final String ERROR_CHANNEL = "errorChannel";
    
    public MessageHeaders(Map<String, Object> headers);
    public MessageHeaders(Map<String, Object> headers, UUID id, Long timestamp);
    
    public UUID getId();
    public Long getTimestamp();
    public Object getReplyChannel();
    public Object getErrorChannel();
    
    @Override
    public Object get(Object key);
    
    @Override
    public boolean containsKey(Object key);
    
    public <T> T get(Object key, Class<T> type);
}

// Message builder
public final class MessageBuilder<T> {
    
    public static <T> MessageBuilder<T> withPayload(T payload);
    public static <T> MessageBuilder<T> fromMessage(Message<T> message);
    
    public MessageBuilder<T> setHeader(String headerName, Object headerValue);
    public MessageBuilder<T> setHeaderIfAbsent(String headerName, Object headerValue);
    public MessageBuilder<T> removeHeaders(String... headerPatterns);
    public MessageBuilder<T> removeHeader(String headerName);
    public MessageBuilder<T> copyHeaders(Map<String, ?> headersToCopy);
    public MessageBuilder<T> copyHeadersIfAbsent(Map<String, ?> headersToCopy);
    
    public Message<T> build();
}

Message Channels

// Base interface for message channels
public interface MessageChannel {
    
    long INDEFINITE_TIMEOUT = -1;
    
    default boolean send(Message<?> message) {
        return send(message, INDEFINITE_TIMEOUT);
    }
    
    boolean send(Message<?> message, long timeout);
}

// Channel that can be polled for messages
public interface PollableChannel extends MessageChannel {
    
    Message<?> receive();
    
    Message<?> receive(long timeout);
}

// Channel that supports message handler subscription
public interface SubscribableChannel extends MessageChannel {
    
    boolean subscribe(MessageHandler handler);
    
    boolean unsubscribe(MessageHandler handler);
}

// Exception thrown on messaging errors
public class MessagingException extends RuntimeException {
    
    public MessagingException(String description);
    public MessagingException(String description, Throwable cause);
    public MessagingException(Message<?> failedMessage, String description);
    public MessagingException(Message<?> failedMessage, String description, Throwable cause);
    
    public Message<?> getFailedMessage();
}

Message Handling

// Interface for handling messages
@FunctionalInterface
public interface MessageHandler {
    
    void handleMessage(Message<?> message) throws MessagingException;
}

// Annotation for mapping messages to handler methods
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MessageMapping {
    
    @AliasFor("value")
    String[] destination() default {};
    
    @AliasFor("destination")  
    String[] value() default {};
}

// Annotation to bind method parameter to message payload
@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Payload {
    
    String value() default "";
    
    boolean required() default true;
}

// Annotation to bind method parameter to header value
@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Header {
    
    @AliasFor("name")
    String value() default "";
    
    @AliasFor("value")
    String name() default "";
    
    boolean required() default true;
    
    String defaultValue() default ValueConstants.DEFAULT_NONE;
}

// Annotation to bind method parameter to all message headers
@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Headers {
}

JMS Support

JmsTemplate

// Central class for JMS operations
public class JmsTemplate extends JmsDestinationAccessor implements JmsOperations {
    
    // Send operations
    public void send(String destinationName, MessageCreator messageCreator) throws JmsException;
    public void send(Destination destination, MessageCreator messageCreator) throws JmsException;
    public void send(MessageCreator messageCreator) throws JmsException;
    
    // Convert and send operations
    public void convertAndSend(String destinationName, Object message) throws JmsException;
    public void convertAndSend(Destination destination, Object message) throws JmsException;
    public void convertAndSend(Object message) throws JmsException;
    public void convertAndSend(String destinationName, Object message, MessagePostProcessor postProcessor) throws JmsException;
    
    // Receive operations
    public Message receive() throws JmsException;
    public Message receive(String destinationName) throws JmsException;
    public Message receive(Destination destination) throws JmsException;
    
    // Receive and convert operations
    public Object receiveAndConvert() throws JmsException;
    public Object receiveAndConvert(String destinationName) throws JmsException;
    public Object receiveAndConvert(Destination destination) throws JmsException;
    
    // Browse operations
    public <T> T browse(String queueName, BrowserCallback<T> action) throws JmsException;
    public <T> T browse(Queue queue, BrowserCallback<T> action) throws JmsException;
    
    // Execute operations
    public <T> T execute(SessionCallback<T> action) throws JmsException;
    public <T> T execute(ProducerCallback<T> action) throws JmsException;
    
    // Configuration
    public void setConnectionFactory(ConnectionFactory connectionFactory);
    public void setDefaultDestinationName(String defaultDestinationName);
    public void setDefaultDestination(Destination defaultDestination);
    public void setMessageConverter(MessageConverter messageConverter);
    public void setPubSubDomain(boolean pubSubDomain);
    public void setReceiveTimeout(long receiveTimeout);
    public void setDeliveryMode(int deliveryMode);
    public void setPriority(int priority);
    public void setTimeToLive(long timeToLive);
}

// Interface for creating JMS messages
@FunctionalInterface
public interface MessageCreator {
    Message createMessage(Session session) throws JMSException;
}

// Interface for post-processing messages
@FunctionalInterface
public interface MessagePostProcessor {
    Message postProcessMessage(Message message) throws JMSException;
}

// Callback interface for JMS Session operations
@FunctionalInterface
public interface SessionCallback<T> {
    T doInJms(Session session) throws JMSException;
}

JMS Annotations

// Annotation to mark a method as a JMS message listener
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Repeatable(JmsListeners.class)
public @interface JmsListener {
    
    String id() default "";
    
    String containerFactory() default "";
    
    @AliasFor("destination")
    String[] value() default {};
    
    @AliasFor("value")
    String[] destination() default {};
    
    String subscription() default "";
    
    String selector() default "";
    
    String concurrency() default "";
}

// Enable JMS listener annotated endpoints
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(JmsBootstrapConfiguration.class)
public @interface EnableJms {
}

// Annotation to send a message as reply
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface SendTo {
    
    @AliasFor("destinations")
    String[] value() default {};
    
    @AliasFor("value")
    String[] destinations() default {};
}

JMS Configuration

// Factory for creating JMS listener containers
public interface JmsListenerContainerFactory<C extends MessageListenerContainer> {
    
    C createListenerContainer(JmsListenerEndpoint endpoint);
}

// Default implementation of JmsListenerContainerFactory
public class DefaultJmsListenerContainerFactory 
        implements JmsListenerContainerFactory<DefaultMessageListenerContainer>, BeanNameAware, InitializingBean {
    
    public void setConnectionFactory(ConnectionFactory connectionFactory);
    public void setDestinationResolver(DestinationResolver destinationResolver);
    public void setMessageConverter(MessageConverter messageConverter);
    public void setPubSubDomain(Boolean pubSubDomain);
    public void setSubscriptionDurable(Boolean subscriptionDurable);
    public void setClientId(String clientId);
    public void setConcurrency(String concurrency);
    public void setMaxConcurrency(Integer maxConcurrency);
    public void setCacheLevel(Integer cacheLevel);
    public void setReceiveTimeout(Long receiveTimeout);
    public void setAutoStartup(Boolean autoStartup);
    public void setPhase(Integer phase);
    
    @Override
    public DefaultMessageListenerContainer createListenerContainer(JmsListenerEndpoint endpoint);
}

// Message listener container for JMS
public class DefaultMessageListenerContainer extends AbstractPollingMessageListenerContainer 
        implements BeanNameAware, DisposableBean {
    
    // Concurrency settings
    public void setConcurrentConsumers(int concurrentConsumers);
    public void setMaxConcurrentConsumers(int maxConcurrentConsumers);
    public void setMaxMessagesPerTask(int maxMessagesPerTask);
    
    // Connection settings
    public void setCacheLevelName(String constantName) throws IllegalArgumentException;
    public void setCacheLevel(int cacheLevel);
    
    // Recovery settings
    public void setRecoveryInterval(long recoveryInterval);
    public void setBackOffMultiplier(double backOffMultiplier);
    public void setMaxRecoveryTime(long maxRecoveryTime);
    
    // Transaction settings
    public void setTransactionManager(PlatformTransactionManager transactionManager);
    public void setTransactionName(String transactionName);
    public void setTransactionTimeout(int transactionTimeout);
}

Message Conversion

// Strategy interface for converting between Java objects and JMS messages
public interface MessageConverter {
    
    Message toMessage(Object object, Session session) throws JMSException, MessageConversionException;
    
    Object fromMessage(Message message) throws JMSException, MessageConversionException;
}

// Base implementation of MessageConverter
public abstract class MessageConverterSupport implements MessageConverter {
    
    public static final String TYPE_ID_PROPERTY = "__TypeId__";
    public static final String CONTENT_TYPE_PROPERTY = "__ContentTypeId__";
    public static final String KEY_TYPE_ID_PROPERTY = "__KeyTypeId__";
    
    protected abstract Message createMessageForByteArray(byte[] bytes, Session session) throws JMSException;
    protected abstract Message createMessageForString(String string, Session session) throws JMSException;
    protected abstract Message createMessageForMap(Map<String, Object> map, Session session) throws JMSException;
    protected abstract Message createMessageForSerializable(Serializable object, Session session) throws JMSException;
    
    protected abstract byte[] extractByteArrayFromMessage(Message message) throws JMSException;
    protected abstract String extractStringFromMessage(Message message) throws JMSException;
    protected abstract Map<String, Object> extractMapFromMessage(Message message) throws JMSException;
    protected abstract Serializable extractSerializableFromMessage(Message message) throws JMSException;
}

// JSON message converter using Jackson
public class MappingJackson2MessageConverter extends MessageConverterSupport {
    
    public MappingJackson2MessageConverter();
    
    public void setObjectMapper(ObjectMapper objectMapper);
    public void setTargetType(MessageType targetType);
    public void setTypeIdPropertyName(String typeIdPropertyName);
    public void setTypeIdMappings(Map<String, Class<?>> typeIdMappings);
    
    // Message type enum
    public enum MessageType {
        BYTES, TEXT
    }
}

Practical Usage Examples

Basic JMS Configuration

@Configuration
@EnableJms
public class JmsConfig {
    
    @Bean
    public ConnectionFactory connectionFactory() {
        // Using ActiveMQ
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL("tcp://localhost:61616");
        connectionFactory.setUserName("admin");
        connectionFactory.setPassword("admin");
        
        // Wrap with caching connection factory for better performance
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(connectionFactory);
        cachingConnectionFactory.setCacheConsumers(true);
        cachingConnectionFactory.setCacheProducers(true);
        cachingConnectionFactory.setSessionCacheSize(10);
        
        return cachingConnectionFactory;
    }
    
    @Bean
    public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) {
        JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
        jmsTemplate.setMessageConverter(messageConverter());
        jmsTemplate.setReceiveTimeout(5000); // 5 seconds
        jmsTemplate.setDeliveryMode(DeliveryMode.PERSISTENT);
        jmsTemplate.setPriority(Message.DEFAULT_PRIORITY);
        jmsTemplate.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);
        return jmsTemplate;
    }
    
    @Bean
    public MessageConverter messageConverter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setTargetType(MessageType.TEXT);
        converter.setTypeIdPropertyName("_type");
        
        // Configure type mappings
        Map<String, Class<?>> typeIdMappings = new HashMap<>();
        typeIdMappings.put("user", UserMessage.class);
        typeIdMappings.put("order", OrderMessage.class);
        converter.setTypeIdMappings(typeIdMappings);
        
        return converter;
    }
    
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerFactory(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(messageConverter());
        factory.setConcurrency("3-10"); // Min 3, max 10 consumers
        factory.setReceiveTimeout(5000L);
        factory.setAutoStartup(true);
        
        // Error handling
        factory.setErrorHandler(t -> {
            System.err.println("Error in JMS listener: " + t.getMessage());
            // Log error or send to dead letter queue
        });
        
        return factory;
    }
    
    // Topic listener factory for pub/sub
    @Bean
    public JmsListenerContainerFactory<?> topicListenerFactory(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(messageConverter());
        factory.setPubSubDomain(true); // Enable topic mode
        factory.setSubscriptionDurable(true);
        factory.setClientId("myapp-client");
        return factory;
    }
    
    // Dead letter queue configuration
    @Bean
    public Queue deadLetterQueue() {
        return new ActiveMQQueue("DLQ.MyApp");
    }
}

Message Producer Service

@Service
public class MessageProducerService {
    
    private final JmsTemplate jmsTemplate;
    
    public MessageProducerService(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }
    
    // Simple message sending
    public void sendMessage(String destination, Object message) {
        jmsTemplate.convertAndSend(destination, message);
    }
    
    // Send message with headers
    public void sendMessageWithHeaders(String destination, Object message, Map<String, Object> headers) {
        jmsTemplate.convertAndSend(destination, message, messagePostProcessor -> {
            headers.forEach((key, value) -> {
                try {
                    messagePostProcessor.setObjectProperty(key, value);
                } catch (JMSException e) {
                    throw new RuntimeException("Failed to set header: " + key, e);
                }
            });
            return messagePostProcessor;
        });
    }
    
    // Send user notification
    public void sendUserNotification(Long userId, String notificationType, String content) {
        UserNotificationMessage notification = UserNotificationMessage.builder()
            .userId(userId)
            .type(notificationType)
            .content(content)
            .timestamp(LocalDateTime.now())
            .build();
        
        Map<String, Object> headers = new HashMap<>();
        headers.put("userId", userId);
        headers.put("notificationType", notificationType);
        headers.put("priority", "HIGH");
        
        sendMessageWithHeaders("user.notifications", notification, headers);
    }
    
    // Send order events
    public void sendOrderCreatedEvent(Order order) {
        OrderCreatedEvent event = OrderCreatedEvent.builder()
            .orderId(order.getId())
            .userId(order.getUserId())
            .totalAmount(order.getTotalAmount())
            .items(order.getItems())
            .timestamp(LocalDateTime.now())
            .build();
        
        // Send to multiple destinations
        jmsTemplate.convertAndSend("order.created", event);
        jmsTemplate.convertAndSend("audit.events", event);
        
        // Send to topic for pub/sub
        jmsTemplate.setPubSubDomain(true);
        jmsTemplate.convertAndSend("order.events", event);
        jmsTemplate.setPubSubDomain(false); // Reset to queue mode
    }
    
    // Request-reply pattern
    public String sendRequestReply(String destination, Object request) {
        return (String) jmsTemplate.sendAndReceive(destination, session -> {
            ObjectMessage message = session.createObjectMessage((Serializable) request);
            message.setJMSReplyTo(session.createTemporaryQueue());
            return message;
        });
    }
    
    // Delayed message sending
    public void sendDelayedMessage(String destination, Object message, Duration delay) {
        jmsTemplate.convertAndSend(destination, message, messagePostProcessor -> {
            try {
                // ActiveMQ specific - set delivery delay
                messagePostProcessor.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 
                    delay.toMillis());
            } catch (JMSException e) {
                throw new RuntimeException("Failed to set delay", e);
            }
            return messagePostProcessor;
        });
    }
    
    // Priority message sending
    public void sendPriorityMessage(String destination, Object message, int priority) {
        jmsTemplate.convertAndSend(destination, message, messagePostProcessor -> {
            try {
                messagePostProcessor.setJMSPriority(priority);
            } catch (JMSException e) {
                throw new RuntimeException("Failed to set priority", e);
            }
            return messagePostProcessor;
        });
    }
    
    // Transactional message sending
    @Transactional
    public void sendTransactionalMessages(List<MessageRequest> requests) {
        for (MessageRequest request : requests) {
            jmsTemplate.convertAndSend(request.getDestination(), request.getPayload());
        }
        
        // If any exception occurs, all messages will be rolled back
        if (requests.size() > 10) {
            throw new IllegalArgumentException("Too many messages in single transaction");
        }
    }
}

Message Listener Service

@Component
public class MessageListenerService {
    
    private static final Logger logger = LoggerFactory.getLogger(MessageListenerService.class);
    
    private final UserService userService;
    private final EmailService emailService;
    private final AuditService auditService;
    
    public MessageListenerService(UserService userService, EmailService emailService, AuditService auditService) {
        this.userService = userService;
        this.emailService = emailService;
        this.auditService = auditService;
    }
    
    // Simple message listener
    @JmsListener(destination = "user.notifications")
    public void handleUserNotification(UserNotificationMessage notification) {
        logger.info("Received user notification: {}", notification);
        
        try {
            User user = userService.findById(notification.getUserId());
            emailService.sendNotificationEmail(user, notification.getContent());
        } catch (Exception e) {
            logger.error("Failed to process user notification", e);
            throw new RuntimeException("Notification processing failed", e);
        }
    }
    
    // Listener with message headers
    @JmsListener(destination = "order.created")
    public void handleOrderCreated(
            @Payload OrderCreatedEvent event,
            @Header("userId") Long userId,
            @Header(value = "priority", defaultValue = "NORMAL") String priority) {
        
        logger.info("Processing order created event for user {}: {}", userId, event);
        
        // Process based on priority
        if ("HIGH".equals(priority)) {
            processHighPriorityOrder(event);
        } else {
            processNormalOrder(event);
        }
    }
    
    // Listener with all headers
    @JmsListener(destination = "audit.events")
    public void handleAuditEvent(
            @Payload Object event,
            @Headers Map<String, Object> headers) {
        
        AuditRecord audit = AuditRecord.builder()
            .eventType(event.getClass().getSimpleName())
            .payload(event)
            .headers(headers)
            .timestamp(LocalDateTime.now())
            .build();
        
        auditService.saveAuditRecord(audit);
    }
    
    // Topic listener with durable subscription
    @JmsListener(
        destination = "order.events",
        containerFactory = "topicListenerFactory",
        subscription = "orderEventSubscription"
    )
    public void handleOrderEvents(OrderEvent event) {
        logger.info("Received order event: {}", event);
        
        switch (event.getEventType()) {
            case "ORDER_CREATED":
                handleOrderCreatedFromTopic((OrderCreatedEvent) event);
                break;
            case "ORDER_UPDATED":
                handleOrderUpdated((OrderUpdatedEvent) event);
                break;
            case "ORDER_CANCELLED":
                handleOrderCancelled((OrderCancelledEvent) event);
                break;
            default:
                logger.warn("Unknown order event type: {}", event.getEventType());
        }
    }
    
    // Listener with message selector
    @JmsListener(
        destination = "notifications",
        selector = "notificationType = 'EMAIL' OR priority = 'HIGH'"
    )
    public void handleHighPriorityNotifications(NotificationMessage notification) {
        logger.info("Processing high priority notification: {}", notification);
        // Process urgent notifications immediately
        processUrgentNotification(notification);
    }
    
    // Listener with concurrency control
    @JmsListener(
        destination = "file.processing",
        concurrency = "2-5" // Min 2, max 5 concurrent listeners
    )
    public void handleFileProcessing(FileProcessingRequest request) {
        logger.info("Processing file: {}", request.getFilename());
        
        try {
            // Simulate file processing
            Thread.sleep(5000);
            processFile(request);
            logger.info("File processing completed: {}", request.getFilename());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("File processing interrupted", e);
        }
    }
    
    // Error handling listener
    @JmsListener(destination = "error.queue")
    public void handleErrors(
            Message failedMessage,
            @Header(JmsHeaders.DELIVERY_COUNT) int deliveryCount) {
        
        logger.error("Processing failed message (attempt {}): {}", deliveryCount, failedMessage);
        
        if (deliveryCount >= 3) {
            // Move to dead letter queue after 3 attempts
            sendToDeadLetterQueue(failedMessage);
        } else {
            // Retry processing
            retryProcessing(failedMessage);
        }
    }
    
    // Reply listener for request-response pattern
    @JmsListener(destination = "user.lookup.request")
    @SendTo("user.lookup.response")
    public UserResponse handleUserLookupRequest(UserLookupRequest request) {
        logger.info("Looking up user: {}", request.getUserId());
        
        User user = userService.findById(request.getUserId());
        
        return UserResponse.builder()
            .userId(user.getId())
            .username(user.getUsername())
            .email(user.getEmail())
            .found(true)
            .build();
    }
    
    // Conditional listener (only active in certain profiles)
    @JmsListener(
        destination = "development.debug",
        condition = "#{environment.acceptsProfiles('development')}"
    )
    public void handleDebugMessages(DebugMessage message) {
        logger.debug("Debug message: {}", message);
        // Only process in development environment
    }
    
    private void processHighPriorityOrder(OrderCreatedEvent event) {
        // Expedited processing for VIP customers
        logger.info("Processing high priority order: {}", event.getOrderId());
    }
    
    private void processNormalOrder(OrderCreatedEvent event) {
        // Standard order processing
        logger.info("Processing normal order: {}", event.getOrderId());
    }
    
    private void handleOrderCreatedFromTopic(OrderCreatedEvent event) {
        // Update inventory, send confirmation email, etc.
        logger.info("Handling order created from topic: {}", event.getOrderId());
    }
    
    private void handleOrderUpdated(OrderUpdatedEvent event) {
        logger.info("Handling order updated: {}", event.getOrderId());
    }
    
    private void handleOrderCancelled(OrderCancelledEvent event) {
        logger.info("Handling order cancelled: {}", event.getOrderId());
    }
    
    private void processUrgentNotification(NotificationMessage notification) {
        // Immediate processing for urgent notifications
        logger.info("Processing urgent notification: {}", notification.getId());
    }
    
    private void processFile(FileProcessingRequest request) {
        // File processing logic
        logger.info("Processing file: {}", request.getFilename());
    }
    
    private void sendToDeadLetterQueue(Message message) {
        // Send to DLQ for manual inspection
        logger.error("Sending message to dead letter queue: {}", message);
    }
    
    private void retryProcessing(Message message) {
        // Retry logic
        logger.info("Retrying message processing: {}", message);
    }
}

Advanced JMS Features

// Custom message converter
@Component
public class CustomMessageConverter implements MessageConverter {
    
    private final ObjectMapper objectMapper;
    
    public CustomMessageConverter(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
    }
    
    @Override
    public Message toMessage(Object object, Session session) throws JMSException, MessageConversionException {
        try {
            if (object instanceof String) {
                return session.createTextMessage((String) object);
            } else if (object instanceof byte[]) {
                BytesMessage message = session.createBytesMessage();
                message.writeBytes((byte[]) object);
                return message;
            } else {
                // Convert to JSON
                String json = objectMapper.writeValueAsString(object);
                TextMessage message = session.createTextMessage(json);
                message.setStringProperty("_type", object.getClass().getName());
                return message;
            }
        } catch (Exception e) {
            throw new MessageConversionException("Failed to convert message", e);
        }
    }
    
    @Override
    public Object fromMessage(Message message) throws JMSException, MessageConversionException {
        try {
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                String text = textMessage.getText();
                
                // Check if it has type information
                String typeProperty = message.getStringProperty("_type");
                if (typeProperty != null) {
                    Class<?> targetClass = Class.forName(typeProperty);
                    return objectMapper.readValue(text, targetClass);
                } else {
                    return text;
                }
            } else if (message instanceof BytesMessage) {
                BytesMessage bytesMessage = (BytesMessage) message;
                long bodyLength = bytesMessage.getBodyLength();
                byte[] bytes = new byte[(int) bodyLength];
                bytesMessage.readBytes(bytes);
                return bytes;
            } else {
                throw new MessageConversionException("Unsupported message type: " + message.getClass());
            }
        } catch (Exception e) {
            throw new MessageConversionException("Failed to convert message", e);
        }
    }
}

// JMS transaction manager configuration
@Configuration
public class JmsTransactionConfig {
    
    @Bean
    public PlatformTransactionManager jmsTransactionManager(ConnectionFactory connectionFactory) {
        return new JmsTransactionManager(connectionFactory);
    }
    
    @Bean
    public JmsTemplate transactionalJmsTemplate(ConnectionFactory connectionFactory) {
        JmsTemplate template = new JmsTemplate(connectionFactory);
        template.setSessionTransacted(true);
        return template;
    }
}

// Message-driven service with transactions
@Service
@Transactional
public class TransactionalMessageService {
    
    private final JmsTemplate jmsTemplate;
    private final UserRepository userRepository;
    
    public TransactionalMessageService(JmsTemplate jmsTemplate, UserRepository userRepository) {
        this.jmsTemplate = jmsTemplate;
        this.userRepository = userRepository;
    }
    
    @JmsListener(destination = "user.updates")
    @Transactional
    public void handleUserUpdate(UserUpdateMessage message) {
        // Update user in database
        User user = userRepository.findById(message.getUserId())
            .orElseThrow(() -> new UserNotFoundException("User not found: " + message.getUserId()));
        
        user.setEmail(message.getNewEmail());
        userRepository.save(user);
        
        // Send confirmation message - both operations in same transaction
        jmsTemplate.convertAndSend("user.update.confirmation", 
            UserUpdateConfirmation.builder()
                .userId(message.getUserId())
                .status("SUCCESS")
                .timestamp(LocalDateTime.now())
                .build());
    }
    
    @Transactional(rollbackFor = Exception.class)
    public void processOrderWithRollback(OrderProcessingRequest request) {
        try {
            // Process order
            processOrder(request);
            
            // Send success notification
            jmsTemplate.convertAndSend("order.processed", 
                OrderProcessedEvent.builder()
                    .orderId(request.getOrderId())
                    .status("PROCESSED")
                    .build());
            
        } catch (Exception e) {
            // Transaction will be rolled back, JMS message won't be sent
            throw new OrderProcessingException("Failed to process order", e);
        }
    }
    
    private void processOrder(OrderProcessingRequest request) {
        // Order processing logic
    }
}

// Message listener with retry and dead letter handling
@Component
public class RobustMessageListener {
    
    private static final Logger logger = LoggerFactory.getLogger(RobustMessageListener.class);
    
    @Retryable(
        value = {ProcessingException.class},
        maxAttempts = 3,
        backoff = @Backoff(delay = 1000, multiplier = 2)
    )
    @JmsListener(destination = "robust.processing")
    public void handleMessage(ProcessingMessage message) {
        logger.info("Processing message: {}", message.getId());
        
        try {
            processMessage(message);
        } catch (ProcessingException e) {
            logger.warn("Processing failed for message {}, will retry", message.getId());
            throw e; // Will trigger retry
        } catch (Exception e) {
            logger.error("Unexpected error processing message {}", message.getId(), e);
            handleProcessingError(message, e);
        }
    }
    
    @Recover
    public void recover(ProcessingException ex, ProcessingMessage message) {
        logger.error("All retry attempts exhausted for message {}", message.getId());
        sendToDeadLetterQueue(message, ex);
    }
    
    private void processMessage(ProcessingMessage message) {
        // Message processing logic that might fail
        if (message.isCorrupted()) {
            throw new ProcessingException("Message is corrupted");
        }
        
        // Actual processing...
    }
    
    private void handleProcessingError(ProcessingMessage message, Exception e) {
        // Send to error queue for analysis
        ErrorMessage errorMessage = ErrorMessage.builder()
            .originalMessage(message)
            .errorMessage(e.getMessage())
            .timestamp(LocalDateTime.now())
            .build();
        
        jmsTemplate.convertAndSend("error.analysis", errorMessage);
    }
    
    private void sendToDeadLetterQueue(ProcessingMessage message, Exception e) {
        DeadLetterMessage dlqMessage = DeadLetterMessage.builder()
            .originalMessage(message)
            .failureReason(e.getMessage())
            .maxAttemptsReached(true)
            .timestamp(LocalDateTime.now())
            .build();
        
        jmsTemplate.convertAndSend("dlq.processing", dlqMessage);
    }
}

Integration with Spring Integration

// Spring Integration configuration for JMS
@Configuration
@EnableIntegration
public class JmsIntegrationConfig {
    
    @Bean
    public IntegrationFlow jmsInboundFlow(ConnectionFactory connectionFactory) {
        return IntegrationFlows
            .from(Jms.messageDrivenChannelAdapter(connectionFactory)
                .destination("integration.inbound")
                .configureListenerContainer(c -> c.sessionTransacted(true)))
            .transform(Transformers.objectToString())
            .filter("payload.contains('PRIORITY')")
            .channel("priorityChannel")
            .get();
    }
    
    @Bean
    public IntegrationFlow jmsOutboundFlow(ConnectionFactory connectionFactory) {
        return IntegrationFlows
            .from("outboundChannel")
            .transform(Transformers.toJson())
            .handle(Jms.outboundAdapter(connectionFactory)
                .destination("integration.outbound"))
            .get();
    }
    
    @Bean
    public MessageChannel priorityChannel() {
        return MessageChannels.queue().get();
    }
    
    @Bean
    public MessageChannel outboundChannel() {
        return MessageChannels.direct().get();
    }
    
    @ServiceActivator(inputChannel = "priorityChannel")
    public void handlePriorityMessage(String message) {
        logger.info("Handling priority message: {}", message);
        // Process priority messages
    }
}

Message Testing

@SpringBootTest
@DirtiesContext
class JmsMessageTest {
    
    @Autowired
    private JmsTemplate jmsTemplate;
    
    @Autowired
    private MessageListenerService messageListener;
    
    @Test
    @Transactional
    @Rollback
    void shouldSendAndReceiveMessage() {
        // Given
        UserNotificationMessage notification = UserNotificationMessage.builder()
            .userId(1L)
            .type("WELCOME")
            .content("Welcome to our platform!")
            .timestamp(LocalDateTime.now())
            .build();
        
        // When
        jmsTemplate.convertAndSend("test.notifications", notification);
        
        // Then
        UserNotificationMessage received = (UserNotificationMessage) 
            jmsTemplate.receiveAndConvert("test.notifications");
        
        assertThat(received).isNotNull();
        assertThat(received.getUserId()).isEqualTo(1L);
        assertThat(received.getType()).isEqualTo("WELCOME");
    }
    
    @Test
    void shouldProcessMessageThroughListener() {
        // Given
        OrderCreatedEvent event = OrderCreatedEvent.builder()
            .orderId(123L)
            .userId(1L)
            .totalAmount(BigDecimal.valueOf(99.99))
            .build();
        
        // When
        messageListener.handleOrderCreated(event, 1L, "HIGH");
        
        // Then - verify the processing was successful
        // (This would typically involve verifying database changes or other side effects)
    }
}

// Integration test with embedded broker
@SpringBootTest
@TestPropertySource(properties = {
    "spring.activemq.broker-url=vm://localhost?broker.persistent=false",
    "spring.jms.cache.enabled=false"
})
class JmsIntegrationTest {
    
    @Autowired
    private MessageProducerService producerService;
    
    @MockBean
    private EmailService emailService;
    
    @Test
    void shouldProcessNotificationEndToEnd() throws InterruptedException {
        // Given
        Long userId = 1L;
        String content = "Test notification";
        
        // When
        producerService.sendUserNotification(userId, "EMAIL", content);
        
        // Wait for async processing
        Thread.sleep(1000);
        
        // Then
        verify(emailService, timeout(5000)).sendNotificationEmail(any(User.class), eq(content));
    }
}

Spring Messaging and JMS support provide a robust foundation for building message-driven applications with support for both point-to-point and publish-subscribe messaging patterns, transaction management, and error handling.

Install with Tessl CLI

npx tessl i tessl/maven-org-springframework--spring

docs

aop.md

core-container.md

data-access.md

index.md

integration.md

messaging.md

reactive-web.md

testing.md

web-framework.md

tile.json