# Messaging & JMS

Spring provides comprehensive support for message-driven applications through the Spring Messaging abstraction and JMS (Java Message Service) integration. This includes support for both synchronous and asynchronous messaging patterns with various message brokers.

## Maven Dependencies

```xml
7
<!-- Spring Messaging (Core abstractions) -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-messaging</artifactId>
    <version>5.3.39</version>
</dependency>

<!-- Spring JMS -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>5.3.39</version>
</dependency>

<!-- JMS API -->
<dependency>
    <groupId>javax.jms</groupId>
    <artifactId>javax.jms-api</artifactId>
    <version>2.0.1</version>
</dependency>

<!-- ActiveMQ (example JMS broker) -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-broker</artifactId>
    <version>5.17.6</version>
</dependency>

<!-- RabbitMQ support -->
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>2.4.16</version>
</dependency>
41
```
42
43
## Core Imports
44
45
```java { .api }
46
// Core messaging abstractions
47
import org.springframework.messaging.Message;
48
import org.springframework.messaging.MessageHeaders;
49
import org.springframework.messaging.MessageChannel;
50
import org.springframework.messaging.MessagingException;
51
import org.springframework.messaging.PollableChannel;
52
import org.springframework.messaging.SubscribableChannel;
53
54
// Message handling
55
import org.springframework.messaging.MessageHandler;
56
import org.springframework.messaging.handler.annotation.MessageMapping;
57
import org.springframework.messaging.handler.annotation.Payload;
58
import org.springframework.messaging.handler.annotation.Header;
59
import org.springframework.messaging.handler.annotation.Headers;
60
61
// JMS Core
62
import org.springframework.jms.core.JmsTemplate;
63
import org.springframework.jms.core.MessageCreator;
64
import org.springframework.jms.core.MessagePostProcessor;
65
import org.springframework.jms.core.SessionCallback;
66
67
// JMS Annotations
68
import org.springframework.jms.annotation.JmsListener;
69
import org.springframework.jms.annotation.EnableJms;
70
71
// JMS Configuration
72
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
73
import org.springframework.jms.config.JmsListenerContainerFactory;
74
import org.springframework.jms.listener.DefaultMessageListenerContainer;
75
76
// Connection Factory
77
import javax.jms.ConnectionFactory;
78
import org.springframework.jms.connection.CachingConnectionFactory;
79
80
// Message conversion
81
import org.springframework.jms.support.converter.MessageConverter;
82
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
83
```
84
85
## Core Messaging Abstractions
86
87
### Message Interface
88
89
```java { .api }
90
// Generic message interface
91
public interface Message<T> {
92
93
T getPayload();
94
95
MessageHeaders getHeaders();
96
}
97
98
// Message headers container
99
public final class MessageHeaders implements Map<String, Object>, Serializable {
100
101
public static final String ID = "id";
102
public static final String TIMESTAMP = "timestamp";
103
public static final String CONTENT_TYPE = "contentType";
104
public static final String REPLY_CHANNEL = "replyChannel";
105
public static final String ERROR_CHANNEL = "errorChannel";
106
107
public MessageHeaders(Map<String, Object> headers);
108
public MessageHeaders(Map<String, Object> headers, UUID id, Long timestamp);
109
110
public UUID getId();
111
public Long getTimestamp();
112
public Object getReplyChannel();
113
public Object getErrorChannel();
114
115
@Override
116
public Object get(Object key);
117
118
@Override
119
public boolean containsKey(Object key);
120
121
public <T> T get(Object key, Class<T> type);
122
}
123
124
// Message builder
125
public final class MessageBuilder<T> {
126
127
public static <T> MessageBuilder<T> withPayload(T payload);
128
public static <T> MessageBuilder<T> fromMessage(Message<T> message);
129
130
public MessageBuilder<T> setHeader(String headerName, Object headerValue);
131
public MessageBuilder<T> setHeaderIfAbsent(String headerName, Object headerValue);
132
public MessageBuilder<T> removeHeaders(String... headerPatterns);
133
public MessageBuilder<T> removeHeader(String headerName);
134
public MessageBuilder<T> copyHeaders(Map<String, ?> headersToCopy);
135
public MessageBuilder<T> copyHeadersIfAbsent(Map<String, ?> headersToCopy);
136
137
public Message<T> build();
138
}
139
```
140
141
### Message Channels
142
143
```java { .api }
144
// Base interface for message channels
145
public interface MessageChannel {
146
147
long INDEFINITE_TIMEOUT = -1;
148
149
default boolean send(Message<?> message) {
150
return send(message, INDEFINITE_TIMEOUT);
151
}
152
153
boolean send(Message<?> message, long timeout);
154
}
155
156
// Channel that can be polled for messages
157
public interface PollableChannel extends MessageChannel {
158
159
Message<?> receive();
160
161
Message<?> receive(long timeout);
162
}
163
164
// Channel that supports message handler subscription
165
public interface SubscribableChannel extends MessageChannel {
166
167
boolean subscribe(MessageHandler handler);
168
169
boolean unsubscribe(MessageHandler handler);
170
}
171
172
// Exception thrown on messaging errors
173
public class MessagingException extends RuntimeException {
174
175
public MessagingException(String description);
176
public MessagingException(String description, Throwable cause);
177
public MessagingException(Message<?> failedMessage, String description);
178
public MessagingException(Message<?> failedMessage, String description, Throwable cause);
179
180
public Message<?> getFailedMessage();
181
}
182
```
183
184
### Message Handling
185
186
```java { .api }
187
// Interface for handling messages
188
@FunctionalInterface
189
public interface MessageHandler {
190
191
void handleMessage(Message<?> message) throws MessagingException;
192
}
193
194
/**
 * Maps messages onto a handler class or method by destination.
 * The annotation declares a single attribute, {@code value}; it has no {@code destination} alias.
 */
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MessageMapping {

    /** Destination mappings for the annotated handler. */
    String[] value() default {};
}
206
207
// Annotation to bind method parameter to message payload
208
@Target(ElementType.PARAMETER)
209
@Retention(RetentionPolicy.RUNTIME)
210
@Documented
211
public @interface Payload {
212
213
String value() default "";
214
215
boolean required() default true;
216
}
217
218
// Annotation to bind method parameter to header value
219
@Target(ElementType.PARAMETER)
220
@Retention(RetentionPolicy.RUNTIME)
221
@Documented
222
public @interface Header {
223
224
@AliasFor("name")
225
String value() default "";
226
227
@AliasFor("value")
228
String name() default "";
229
230
boolean required() default true;
231
232
String defaultValue() default ValueConstants.DEFAULT_NONE;
233
}
234
235
/**
 * Binds a method parameter to all headers of the message.
 */
@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Headers {
}
241
```
242
243
## JMS Support
244
245
### JmsTemplate
246
247
```java { .api }
248
// Central class for JMS operations
249
public class JmsTemplate extends JmsDestinationAccessor implements JmsOperations {
250
251
// Send operations
252
public void send(String destinationName, MessageCreator messageCreator) throws JmsException;
253
public void send(Destination destination, MessageCreator messageCreator) throws JmsException;
254
public void send(MessageCreator messageCreator) throws JmsException;
255
256
// Convert and send operations
257
public void convertAndSend(String destinationName, Object message) throws JmsException;
258
public void convertAndSend(Destination destination, Object message) throws JmsException;
259
public void convertAndSend(Object message) throws JmsException;
260
public void convertAndSend(String destinationName, Object message, MessagePostProcessor postProcessor) throws JmsException;
261
262
// Receive operations
263
public Message receive() throws JmsException;
264
public Message receive(String destinationName) throws JmsException;
265
public Message receive(Destination destination) throws JmsException;
266
267
// Receive and convert operations
268
public Object receiveAndConvert() throws JmsException;
269
public Object receiveAndConvert(String destinationName) throws JmsException;
270
public Object receiveAndConvert(Destination destination) throws JmsException;
271
272
// Browse operations
273
public <T> T browse(String queueName, BrowserCallback<T> action) throws JmsException;
274
public <T> T browse(Queue queue, BrowserCallback<T> action) throws JmsException;
275
276
// Execute operations
277
public <T> T execute(SessionCallback<T> action) throws JmsException;
278
public <T> T execute(ProducerCallback<T> action) throws JmsException;
279
280
// Configuration
281
public void setConnectionFactory(ConnectionFactory connectionFactory);
282
public void setDefaultDestinationName(String defaultDestinationName);
283
public void setDefaultDestination(Destination defaultDestination);
284
public void setMessageConverter(MessageConverter messageConverter);
285
public void setPubSubDomain(boolean pubSubDomain);
286
public void setReceiveTimeout(long receiveTimeout);
287
public void setDeliveryMode(int deliveryMode);
288
public void setPriority(int priority);
289
public void setTimeToLive(long timeToLive);
290
}
291
292
// Interface for creating JMS messages
293
@FunctionalInterface
294
public interface MessageCreator {
295
Message createMessage(Session session) throws JMSException;
296
}
297
298
// Interface for post-processing messages
299
@FunctionalInterface
300
public interface MessagePostProcessor {
301
Message postProcessMessage(Message message) throws JMSException;
302
}
303
304
// Callback interface for JMS Session operations
305
@FunctionalInterface
306
public interface SessionCallback<T> {
307
T doInJms(Session session) throws JMSException;
308
}
309
```
310
311
### JMS Annotations
312
313
```java { .api }
314
// Annotation to mark a method as a JMS message listener
315
@Target({ElementType.TYPE, ElementType.METHOD})
316
@Retention(RetentionPolicy.RUNTIME)
317
@Documented
318
@Repeatable(JmsListeners.class)
319
public @interface JmsListener {
320
321
String id() default "";
322
323
String containerFactory() default "";
324
325
@AliasFor("destination")
326
String[] value() default {};
327
328
@AliasFor("value")
329
String[] destination() default {};
330
331
String subscription() default "";
332
333
String selector() default "";
334
335
String concurrency() default "";
336
}
337
338
// Enable JMS listener annotated endpoints
339
@Target(ElementType.TYPE)
340
@Retention(RetentionPolicy.RUNTIME)
341
@Documented
342
@Import(JmsBootstrapConfiguration.class)
343
public @interface EnableJms {
344
}
345
346
/**
 * Indicates that a handler method's return value is sent as a message to the given
 * destination(s). The annotation declares a single attribute, {@code value}; the
 * {@code destinations} alias belongs to {@code @SendToUser}, not to {@code @SendTo}.
 */
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface SendTo {

    /** Destination(s) for the message created from the return value. */
    String[] value() default {};
}
358
```
359
360
### JMS Configuration
361
362
```java { .api }
363
// Factory for creating JMS listener containers
364
public interface JmsListenerContainerFactory<C extends MessageListenerContainer> {
365
366
C createListenerContainer(JmsListenerEndpoint endpoint);
367
}
368
369
// Default implementation of JmsListenerContainerFactory
370
public class DefaultJmsListenerContainerFactory
371
implements JmsListenerContainerFactory<DefaultMessageListenerContainer>, BeanNameAware, InitializingBean {
372
373
public void setConnectionFactory(ConnectionFactory connectionFactory);
374
public void setDestinationResolver(DestinationResolver destinationResolver);
375
public void setMessageConverter(MessageConverter messageConverter);
376
public void setPubSubDomain(Boolean pubSubDomain);
377
public void setSubscriptionDurable(Boolean subscriptionDurable);
378
public void setClientId(String clientId);
379
public void setConcurrency(String concurrency);
380
public void setMaxConcurrency(Integer maxConcurrency);
381
public void setCacheLevel(Integer cacheLevel);
382
public void setReceiveTimeout(Long receiveTimeout);
383
public void setAutoStartup(Boolean autoStartup);
384
public void setPhase(Integer phase);
385
386
@Override
387
public DefaultMessageListenerContainer createListenerContainer(JmsListenerEndpoint endpoint);
388
}
389
390
// Message listener container for JMS
391
public class DefaultMessageListenerContainer extends AbstractPollingMessageListenerContainer
392
implements BeanNameAware, DisposableBean {
393
394
// Concurrency settings
395
public void setConcurrentConsumers(int concurrentConsumers);
396
public void setMaxConcurrentConsumers(int maxConcurrentConsumers);
397
public void setMaxMessagesPerTask(int maxMessagesPerTask);
398
399
// Connection settings
400
public void setCacheLevelName(String constantName) throws IllegalArgumentException;
401
public void setCacheLevel(int cacheLevel);
402
403
// Recovery settings
404
public void setRecoveryInterval(long recoveryInterval);
405
public void setBackOffMultiplier(double backOffMultiplier);
406
public void setMaxRecoveryTime(long maxRecoveryTime);
407
408
// Transaction settings
409
public void setTransactionManager(PlatformTransactionManager transactionManager);
410
public void setTransactionName(String transactionName);
411
public void setTransactionTimeout(int transactionTimeout);
412
}
413
```
414
415
### Message Conversion
416
417
```java { .api }
418
// Strategy interface for converting between Java objects and JMS messages
419
public interface MessageConverter {
420
421
Message toMessage(Object object, Session session) throws JMSException, MessageConversionException;
422
423
Object fromMessage(Message message) throws JMSException, MessageConversionException;
424
}
425
426
// Base implementation of MessageConverter
427
public abstract class MessageConverterSupport implements MessageConverter {
428
429
public static final String TYPE_ID_PROPERTY = "__TypeId__";
430
public static final String CONTENT_TYPE_PROPERTY = "__ContentTypeId__";
431
public static final String KEY_TYPE_ID_PROPERTY = "__KeyTypeId__";
432
433
protected abstract Message createMessageForByteArray(byte[] bytes, Session session) throws JMSException;
434
protected abstract Message createMessageForString(String string, Session session) throws JMSException;
435
protected abstract Message createMessageForMap(Map<String, Object> map, Session session) throws JMSException;
436
protected abstract Message createMessageForSerializable(Serializable object, Session session) throws JMSException;
437
438
protected abstract byte[] extractByteArrayFromMessage(Message message) throws JMSException;
439
protected abstract String extractStringFromMessage(Message message) throws JMSException;
440
protected abstract Map<String, Object> extractMapFromMessage(Message message) throws JMSException;
441
protected abstract Serializable extractSerializableFromMessage(Message message) throws JMSException;
442
}
443
444
// JSON message converter using Jackson
445
public class MappingJackson2MessageConverter extends MessageConverterSupport {
446
447
public MappingJackson2MessageConverter();
448
449
public void setObjectMapper(ObjectMapper objectMapper);
450
public void setTargetType(MessageType targetType);
451
public void setTypeIdPropertyName(String typeIdPropertyName);
452
public void setTypeIdMappings(Map<String, Class<?>> typeIdMappings);
453
454
// Message type enum
455
public enum MessageType {
456
BYTES, TEXT
457
}
458
}
459
```
460
461
## Practical Usage Examples
462
463
### Basic JMS Configuration
464
465
```java { .api }
466
@Configuration
467
@EnableJms
468
public class JmsConfig {
469
470
@Bean
471
public ConnectionFactory connectionFactory() {
472
// Using ActiveMQ
473
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
474
connectionFactory.setBrokerURL("tcp://localhost:61616");
475
connectionFactory.setUserName("admin");
476
connectionFactory.setPassword("admin");
477
478
// Wrap with caching connection factory for better performance
479
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(connectionFactory);
480
cachingConnectionFactory.setCacheConsumers(true);
481
cachingConnectionFactory.setCacheProducers(true);
482
cachingConnectionFactory.setSessionCacheSize(10);
483
484
return cachingConnectionFactory;
485
}
486
487
@Bean
488
public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) {
489
JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
490
jmsTemplate.setMessageConverter(messageConverter());
491
jmsTemplate.setReceiveTimeout(5000); // 5 seconds
492
jmsTemplate.setDeliveryMode(DeliveryMode.PERSISTENT);
493
jmsTemplate.setPriority(Message.DEFAULT_PRIORITY);
494
jmsTemplate.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);
495
return jmsTemplate;
496
}
497
498
@Bean
499
public MessageConverter messageConverter() {
500
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
501
converter.setTargetType(MessageType.TEXT);
502
converter.setTypeIdPropertyName("_type");
503
504
// Configure type mappings
505
Map<String, Class<?>> typeIdMappings = new HashMap<>();
506
typeIdMappings.put("user", UserMessage.class);
507
typeIdMappings.put("order", OrderMessage.class);
508
converter.setTypeIdMappings(typeIdMappings);
509
510
return converter;
511
}
512
513
@Bean
514
public JmsListenerContainerFactory<?> jmsListenerContainerFactory(ConnectionFactory connectionFactory) {
515
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
516
factory.setConnectionFactory(connectionFactory);
517
factory.setMessageConverter(messageConverter());
518
factory.setConcurrency("3-10"); // Min 3, max 10 consumers
519
factory.setReceiveTimeout(5000L);
520
factory.setAutoStartup(true);
521
522
// Error handling
523
factory.setErrorHandler(t -> {
524
System.err.println("Error in JMS listener: " + t.getMessage());
525
// Log error or send to dead letter queue
526
});
527
528
return factory;
529
}
530
531
// Topic listener factory for pub/sub
532
@Bean
533
public JmsListenerContainerFactory<?> topicListenerFactory(ConnectionFactory connectionFactory) {
534
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
535
factory.setConnectionFactory(connectionFactory);
536
factory.setMessageConverter(messageConverter());
537
factory.setPubSubDomain(true); // Enable topic mode
538
factory.setSubscriptionDurable(true);
539
factory.setClientId("myapp-client");
540
return factory;
541
}
542
543
// Dead letter queue configuration
544
@Bean
545
public Queue deadLetterQueue() {
546
return new ActiveMQQueue("DLQ.MyApp");
547
}
548
}
549
```
550
551
### Message Producer Service
552
553
```java { .api }
554
@Service
555
public class MessageProducerService {
556
557
private final JmsTemplate jmsTemplate;
558
559
public MessageProducerService(JmsTemplate jmsTemplate) {
560
this.jmsTemplate = jmsTemplate;
561
}
562
563
// Simple message sending
564
public void sendMessage(String destination, Object message) {
565
jmsTemplate.convertAndSend(destination, message);
566
}
567
568
// Send message with headers
569
public void sendMessageWithHeaders(String destination, Object message, Map<String, Object> headers) {
570
jmsTemplate.convertAndSend(destination, message, messagePostProcessor -> {
571
headers.forEach((key, value) -> {
572
try {
573
messagePostProcessor.setObjectProperty(key, value);
574
} catch (JMSException e) {
575
throw new RuntimeException("Failed to set header: " + key, e);
576
}
577
});
578
return messagePostProcessor;
579
});
580
}
581
582
// Send user notification
583
public void sendUserNotification(Long userId, String notificationType, String content) {
584
UserNotificationMessage notification = UserNotificationMessage.builder()
585
.userId(userId)
586
.type(notificationType)
587
.content(content)
588
.timestamp(LocalDateTime.now())
589
.build();
590
591
Map<String, Object> headers = new HashMap<>();
592
headers.put("userId", userId);
593
headers.put("notificationType", notificationType);
594
headers.put("priority", "HIGH");
595
596
sendMessageWithHeaders("user.notifications", notification, headers);
597
}
598
599
// Send order events
600
public void sendOrderCreatedEvent(Order order) {
601
OrderCreatedEvent event = OrderCreatedEvent.builder()
602
.orderId(order.getId())
603
.userId(order.getUserId())
604
.totalAmount(order.getTotalAmount())
605
.items(order.getItems())
606
.timestamp(LocalDateTime.now())
607
.build();
608
609
// Send to multiple destinations
610
jmsTemplate.convertAndSend("order.created", event);
611
jmsTemplate.convertAndSend("audit.events", event);
612
613
// Send to topic for pub/sub
614
jmsTemplate.setPubSubDomain(true);
615
jmsTemplate.convertAndSend("order.events", event);
616
jmsTemplate.setPubSubDomain(false); // Reset to queue mode
617
}
618
619
// Request-reply pattern
620
public String sendRequestReply(String destination, Object request) {
621
return (String) jmsTemplate.sendAndReceive(destination, session -> {
622
ObjectMessage message = session.createObjectMessage((Serializable) request);
623
message.setJMSReplyTo(session.createTemporaryQueue());
624
return message;
625
});
626
}
627
628
// Delayed message sending
629
public void sendDelayedMessage(String destination, Object message, Duration delay) {
630
jmsTemplate.convertAndSend(destination, message, messagePostProcessor -> {
631
try {
632
// ActiveMQ specific - set delivery delay
633
messagePostProcessor.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,
634
delay.toMillis());
635
} catch (JMSException e) {
636
throw new RuntimeException("Failed to set delay", e);
637
}
638
return messagePostProcessor;
639
});
640
}
641
642
// Priority message sending
643
public void sendPriorityMessage(String destination, Object message, int priority) {
644
jmsTemplate.convertAndSend(destination, message, messagePostProcessor -> {
645
try {
646
messagePostProcessor.setJMSPriority(priority);
647
} catch (JMSException e) {
648
throw new RuntimeException("Failed to set priority", e);
649
}
650
return messagePostProcessor;
651
});
652
}
653
654
// Transactional message sending
655
@Transactional
656
public void sendTransactionalMessages(List<MessageRequest> requests) {
657
for (MessageRequest request : requests) {
658
jmsTemplate.convertAndSend(request.getDestination(), request.getPayload());
659
}
660
661
// If any exception occurs, all messages will be rolled back
662
if (requests.size() > 10) {
663
throw new IllegalArgumentException("Too many messages in single transaction");
664
}
665
}
666
}
667
```
668
669
### Message Listener Service
670
671
```java { .api }
672
@Component
673
public class MessageListenerService {
674
675
private static final Logger logger = LoggerFactory.getLogger(MessageListenerService.class);
676
677
private final UserService userService;
678
private final EmailService emailService;
679
private final AuditService auditService;
680
681
public MessageListenerService(UserService userService, EmailService emailService, AuditService auditService) {
682
this.userService = userService;
683
this.emailService = emailService;
684
this.auditService = auditService;
685
}
686
687
// Simple message listener
688
@JmsListener(destination = "user.notifications")
689
public void handleUserNotification(UserNotificationMessage notification) {
690
logger.info("Received user notification: {}", notification);
691
692
try {
693
User user = userService.findById(notification.getUserId());
694
emailService.sendNotificationEmail(user, notification.getContent());
695
} catch (Exception e) {
696
logger.error("Failed to process user notification", e);
697
throw new RuntimeException("Notification processing failed", e);
698
}
699
}
700
701
// Listener with message headers
702
@JmsListener(destination = "order.created")
703
public void handleOrderCreated(
704
@Payload OrderCreatedEvent event,
705
@Header("userId") Long userId,
706
@Header(value = "priority", defaultValue = "NORMAL") String priority) {
707
708
logger.info("Processing order created event for user {}: {}", userId, event);
709
710
// Process based on priority
711
if ("HIGH".equals(priority)) {
712
processHighPriorityOrder(event);
713
} else {
714
processNormalOrder(event);
715
}
716
}
717
718
// Listener with all headers
719
@JmsListener(destination = "audit.events")
720
public void handleAuditEvent(
721
@Payload Object event,
722
@Headers Map<String, Object> headers) {
723
724
AuditRecord audit = AuditRecord.builder()
725
.eventType(event.getClass().getSimpleName())
726
.payload(event)
727
.headers(headers)
728
.timestamp(LocalDateTime.now())
729
.build();
730
731
auditService.saveAuditRecord(audit);
732
}
733
734
// Topic listener with durable subscription
735
@JmsListener(
736
destination = "order.events",
737
containerFactory = "topicListenerFactory",
738
subscription = "orderEventSubscription"
739
)
740
public void handleOrderEvents(OrderEvent event) {
741
logger.info("Received order event: {}", event);
742
743
switch (event.getEventType()) {
744
case "ORDER_CREATED":
745
handleOrderCreatedFromTopic((OrderCreatedEvent) event);
746
break;
747
case "ORDER_UPDATED":
748
handleOrderUpdated((OrderUpdatedEvent) event);
749
break;
750
case "ORDER_CANCELLED":
751
handleOrderCancelled((OrderCancelledEvent) event);
752
break;
753
default:
754
logger.warn("Unknown order event type: {}", event.getEventType());
755
}
756
}
757
758
// Listener with message selector
759
@JmsListener(
760
destination = "notifications",
761
selector = "notificationType = 'EMAIL' OR priority = 'HIGH'"
762
)
763
public void handleHighPriorityNotifications(NotificationMessage notification) {
764
logger.info("Processing high priority notification: {}", notification);
765
// Process urgent notifications immediately
766
processUrgentNotification(notification);
767
}
768
769
// Listener with concurrency control
770
@JmsListener(
771
destination = "file.processing",
772
concurrency = "2-5" // Min 2, max 5 concurrent listeners
773
)
774
public void handleFileProcessing(FileProcessingRequest request) {
775
logger.info("Processing file: {}", request.getFilename());
776
777
try {
778
// Simulate file processing
779
Thread.sleep(5000);
780
processFile(request);
781
logger.info("File processing completed: {}", request.getFilename());
782
} catch (InterruptedException e) {
783
Thread.currentThread().interrupt();
784
throw new RuntimeException("File processing interrupted", e);
785
}
786
}
787
788
// Error handling listener
789
@JmsListener(destination = "error.queue")
790
public void handleErrors(
791
Message failedMessage,
792
@Header(JmsHeaders.DELIVERY_COUNT) int deliveryCount) {
793
794
logger.error("Processing failed message (attempt {}): {}", deliveryCount, failedMessage);
795
796
if (deliveryCount >= 3) {
797
// Move to dead letter queue after 3 attempts
798
sendToDeadLetterQueue(failedMessage);
799
} else {
800
// Retry processing
801
retryProcessing(failedMessage);
802
}
803
}
804
805
// Reply listener for request-response pattern
806
@JmsListener(destination = "user.lookup.request")
807
@SendTo("user.lookup.response")
808
public UserResponse handleUserLookupRequest(UserLookupRequest request) {
809
logger.info("Looking up user: {}", request.getUserId());
810
811
User user = userService.findById(request.getUserId());
812
813
return UserResponse.builder()
814
.userId(user.getId())
815
.username(user.getUsername())
816
.email(user.getEmail())
817
.found(true)
818
.build();
819
}
820
821
// Conditional listener (only active in certain profiles)
822
@JmsListener(
823
destination = "development.debug",
824
condition = "#{environment.acceptsProfiles('development')}"
825
)
826
public void handleDebugMessages(DebugMessage message) {
827
logger.debug("Debug message: {}", message);
828
// Only process in development environment
829
}
830
831
private void processHighPriorityOrder(OrderCreatedEvent event) {
832
// Expedited processing for VIP customers
833
logger.info("Processing high priority order: {}", event.getOrderId());
834
}
835
836
private void processNormalOrder(OrderCreatedEvent event) {
837
// Standard order processing
838
logger.info("Processing normal order: {}", event.getOrderId());
839
}
840
841
private void handleOrderCreatedFromTopic(OrderCreatedEvent event) {
842
// Update inventory, send confirmation email, etc.
843
logger.info("Handling order created from topic: {}", event.getOrderId());
844
}
845
846
private void handleOrderUpdated(OrderUpdatedEvent event) {
847
logger.info("Handling order updated: {}", event.getOrderId());
848
}
849
850
private void handleOrderCancelled(OrderCancelledEvent event) {
851
logger.info("Handling order cancelled: {}", event.getOrderId());
852
}
853
854
private void processUrgentNotification(NotificationMessage notification) {
855
// Immediate processing for urgent notifications
856
logger.info("Processing urgent notification: {}", notification.getId());
857
}
858
859
private void processFile(FileProcessingRequest request) {
860
// File processing logic
861
logger.info("Processing file: {}", request.getFilename());
862
}
863
864
private void sendToDeadLetterQueue(Message message) {
865
// Send to DLQ for manual inspection
866
logger.error("Sending message to dead letter queue: {}", message);
867
}
868
869
private void retryProcessing(Message message) {
870
// Retry logic
871
logger.info("Retrying message processing: {}", message);
872
}
873
}
874
```
875
876
### Advanced JMS Features
877
878
```java { .api }
879
// Custom message converter
880
@Component
881
public class CustomMessageConverter implements MessageConverter {
882
883
private final ObjectMapper objectMapper;
884
885
public CustomMessageConverter(ObjectMapper objectMapper) {
886
this.objectMapper = objectMapper;
887
}
888
889
@Override
890
public Message toMessage(Object object, Session session) throws JMSException, MessageConversionException {
891
try {
892
if (object instanceof String) {
893
return session.createTextMessage((String) object);
894
} else if (object instanceof byte[]) {
895
BytesMessage message = session.createBytesMessage();
896
message.writeBytes((byte[]) object);
897
return message;
898
} else {
899
// Convert to JSON
900
String json = objectMapper.writeValueAsString(object);
901
TextMessage message = session.createTextMessage(json);
902
message.setStringProperty("_type", object.getClass().getName());
903
return message;
904
}
905
} catch (Exception e) {
906
throw new MessageConversionException("Failed to convert message", e);
907
}
908
}
909
910
@Override
911
public Object fromMessage(Message message) throws JMSException, MessageConversionException {
912
try {
913
if (message instanceof TextMessage) {
914
TextMessage textMessage = (TextMessage) message;
915
String text = textMessage.getText();
916
917
// Check if it has type information
918
String typeProperty = message.getStringProperty("_type");
919
if (typeProperty != null) {
920
Class<?> targetClass = Class.forName(typeProperty);
921
return objectMapper.readValue(text, targetClass);
922
} else {
923
return text;
924
}
925
} else if (message instanceof BytesMessage) {
926
BytesMessage bytesMessage = (BytesMessage) message;
927
long bodyLength = bytesMessage.getBodyLength();
928
byte[] bytes = new byte[(int) bodyLength];
929
bytesMessage.readBytes(bytes);
930
return bytes;
931
} else {
932
throw new MessageConversionException("Unsupported message type: " + message.getClass());
933
}
934
} catch (Exception e) {
935
throw new MessageConversionException("Failed to convert message", e);
936
}
937
}
938
}
939
940
// JMS transaction manager configuration
941
@Configuration
942
public class JmsTransactionConfig {
943
944
@Bean
945
public PlatformTransactionManager jmsTransactionManager(ConnectionFactory connectionFactory) {
946
return new JmsTransactionManager(connectionFactory);
947
}
948
949
@Bean
950
public JmsTemplate transactionalJmsTemplate(ConnectionFactory connectionFactory) {
951
JmsTemplate template = new JmsTemplate(connectionFactory);
952
template.setSessionTransacted(true);
953
return template;
954
}
955
}
956
957
// Message-driven service with transactions
958
@Service
959
@Transactional
960
public class TransactionalMessageService {
961
962
private final JmsTemplate jmsTemplate;
963
private final UserRepository userRepository;
964
965
public TransactionalMessageService(JmsTemplate jmsTemplate, UserRepository userRepository) {
966
this.jmsTemplate = jmsTemplate;
967
this.userRepository = userRepository;
968
}
969
970
@JmsListener(destination = "user.updates")
971
@Transactional
972
public void handleUserUpdate(UserUpdateMessage message) {
973
// Update user in database
974
User user = userRepository.findById(message.getUserId())
975
.orElseThrow(() -> new UserNotFoundException("User not found: " + message.getUserId()));
976
977
user.setEmail(message.getNewEmail());
978
userRepository.save(user);
979
980
// Send confirmation message - both operations in same transaction
981
jmsTemplate.convertAndSend("user.update.confirmation",
982
UserUpdateConfirmation.builder()
983
.userId(message.getUserId())
984
.status("SUCCESS")
985
.timestamp(LocalDateTime.now())
986
.build());
987
}
988
989
@Transactional(rollbackFor = Exception.class)
990
public void processOrderWithRollback(OrderProcessingRequest request) {
991
try {
992
// Process order
993
processOrder(request);
994
995
// Send success notification
996
jmsTemplate.convertAndSend("order.processed",
997
OrderProcessedEvent.builder()
998
.orderId(request.getOrderId())
999
.status("PROCESSED")
1000
.build());
1001
1002
} catch (Exception e) {
1003
// Transaction will be rolled back, JMS message won't be sent
1004
throw new OrderProcessingException("Failed to process order", e);
1005
}
1006
}
1007
1008
private void processOrder(OrderProcessingRequest request) {
1009
// Order processing logic
1010
}
1011
}
1012
1013
// Message listener with retry and dead letter handling
1014
@Component
1015
public class RobustMessageListener {
1016
1017
private static final Logger logger = LoggerFactory.getLogger(RobustMessageListener.class);
1018
1019
@Retryable(
1020
value = {ProcessingException.class},
1021
maxAttempts = 3,
1022
backoff = @Backoff(delay = 1000, multiplier = 2)
1023
)
1024
@JmsListener(destination = "robust.processing")
1025
public void handleMessage(ProcessingMessage message) {
1026
logger.info("Processing message: {}", message.getId());
1027
1028
try {
1029
processMessage(message);
1030
} catch (ProcessingException e) {
1031
logger.warn("Processing failed for message {}, will retry", message.getId());
1032
throw e; // Will trigger retry
1033
} catch (Exception e) {
1034
logger.error("Unexpected error processing message {}", message.getId(), e);
1035
handleProcessingError(message, e);
1036
}
1037
}
1038
1039
@Recover
1040
public void recover(ProcessingException ex, ProcessingMessage message) {
1041
logger.error("All retry attempts exhausted for message {}", message.getId());
1042
sendToDeadLetterQueue(message, ex);
1043
}
1044
1045
private void processMessage(ProcessingMessage message) {
1046
// Message processing logic that might fail
1047
if (message.isCorrupted()) {
1048
throw new ProcessingException("Message is corrupted");
1049
}
1050
1051
// Actual processing...
1052
}
1053
1054
private void handleProcessingError(ProcessingMessage message, Exception e) {
1055
// Send to error queue for analysis
1056
ErrorMessage errorMessage = ErrorMessage.builder()
1057
.originalMessage(message)
1058
.errorMessage(e.getMessage())
1059
.timestamp(LocalDateTime.now())
1060
.build();
1061
1062
jmsTemplate.convertAndSend("error.analysis", errorMessage);
1063
}
1064
1065
private void sendToDeadLetterQueue(ProcessingMessage message, Exception e) {
1066
DeadLetterMessage dlqMessage = DeadLetterMessage.builder()
1067
.originalMessage(message)
1068
.failureReason(e.getMessage())
1069
.maxAttemptsReached(true)
1070
.timestamp(LocalDateTime.now())
1071
.build();
1072
1073
jmsTemplate.convertAndSend("dlq.processing", dlqMessage);
1074
}
1075
}
```

### Integration with Spring Integration

```java { .api }
// Spring Integration configuration for JMS
1082
@Configuration
1083
@EnableIntegration
1084
public class JmsIntegrationConfig {
1085
1086
@Bean
1087
public IntegrationFlow jmsInboundFlow(ConnectionFactory connectionFactory) {
1088
return IntegrationFlows
1089
.from(Jms.messageDrivenChannelAdapter(connectionFactory)
1090
.destination("integration.inbound")
1091
.configureListenerContainer(c -> c.sessionTransacted(true)))
1092
.transform(Transformers.objectToString())
1093
.filter("payload.contains('PRIORITY')")
1094
.channel("priorityChannel")
1095
.get();
1096
}
1097
1098
@Bean
1099
public IntegrationFlow jmsOutboundFlow(ConnectionFactory connectionFactory) {
1100
return IntegrationFlows
1101
.from("outboundChannel")
1102
.transform(Transformers.toJson())
1103
.handle(Jms.outboundAdapter(connectionFactory)
1104
.destination("integration.outbound"))
1105
.get();
1106
}
1107
1108
@Bean
1109
public MessageChannel priorityChannel() {
1110
return MessageChannels.queue().get();
1111
}
1112
1113
@Bean
1114
public MessageChannel outboundChannel() {
1115
return MessageChannels.direct().get();
1116
}
1117
1118
@ServiceActivator(inputChannel = "priorityChannel")
1119
public void handlePriorityMessage(String message) {
1120
logger.info("Handling priority message: {}", message);
1121
// Process priority messages
1122
}
1123
}
```

### Message Testing

```java { .api }
@SpringBootTest
1130
@DirtiesContext
1131
class JmsMessageTest {
1132
1133
@Autowired
1134
private JmsTemplate jmsTemplate;
1135
1136
@Autowired
1137
private MessageListenerService messageListener;
1138
1139
@Test
1140
@Transactional
1141
@Rollback
1142
void shouldSendAndReceiveMessage() {
1143
// Given
1144
UserNotificationMessage notification = UserNotificationMessage.builder()
1145
.userId(1L)
1146
.type("WELCOME")
1147
.content("Welcome to our platform!")
1148
.timestamp(LocalDateTime.now())
1149
.build();
1150
1151
// When
1152
jmsTemplate.convertAndSend("test.notifications", notification);
1153
1154
// Then
1155
UserNotificationMessage received = (UserNotificationMessage)
1156
jmsTemplate.receiveAndConvert("test.notifications");
1157
1158
assertThat(received).isNotNull();
1159
assertThat(received.getUserId()).isEqualTo(1L);
1160
assertThat(received.getType()).isEqualTo("WELCOME");
1161
}
1162
1163
@Test
1164
void shouldProcessMessageThroughListener() {
1165
// Given
1166
OrderCreatedEvent event = OrderCreatedEvent.builder()
1167
.orderId(123L)
1168
.userId(1L)
1169
.totalAmount(BigDecimal.valueOf(99.99))
1170
.build();
1171
1172
// When
1173
messageListener.handleOrderCreated(event, 1L, "HIGH");
1174
1175
// Then - verify the processing was successful
1176
// (This would typically involve verifying database changes or other side effects)
1177
}
1178
}
1179
1180
// Integration test with embedded broker
1181
@SpringBootTest
1182
@TestPropertySource(properties = {
1183
"spring.activemq.broker-url=vm://localhost?broker.persistent=false",
1184
"spring.jms.cache.enabled=false"
1185
})
1186
class JmsIntegrationTest {
1187
1188
@Autowired
1189
private MessageProducerService producerService;
1190
1191
@MockBean
1192
private EmailService emailService;
1193
1194
@Test
1195
void shouldProcessNotificationEndToEnd() throws InterruptedException {
1196
// Given
1197
Long userId = 1L;
1198
String content = "Test notification";
1199
1200
// When
1201
producerService.sendUserNotification(userId, "EMAIL", content);
1202
1203
// Wait for async processing
1204
Thread.sleep(1000);
1205
1206
// Then
1207
verify(emailService, timeout(5000)).sendNotificationEmail(any(User.class), eq(content));
1208
}
1209
}
```

Together, Spring Messaging and Spring's JMS support provide a robust foundation for building message-driven applications, covering point-to-point and publish-subscribe messaging patterns, transaction management, and error handling.