0
# Messaging Configuration
1
2
Spring Boot's messaging configuration provides comprehensive auto-configuration for messaging systems including JMS, AMQP (RabbitMQ), Apache Kafka, and Apache Pulsar.
3
4
## Capabilities
5
6
### JMS Configuration
7
8
Auto-configuration for Java Message Service (JMS) with support for various message brokers.
9
10
```java { .api }
11
/**
12
* Auto-configuration for JMS
13
* Provides a JmsTemplate and a JMS listener container factory on top of an existing ConnectionFactory bean
14
*/
15
@AutoConfiguration
16
@ConditionalOnClass({Message.class, JmsTemplate.class})
17
@ConditionalOnBean(ConnectionFactory.class)
18
@EnableConfigurationProperties(JmsProperties.class)
19
public class JmsAutoConfiguration {
20
21
/**
22
* JMS template for sending messages
23
*/
24
@Bean
25
@Primary
26
@ConditionalOnMissingBean(JmsTemplate.class)
27
@ConditionalOnSingleCandidate(ConnectionFactory.class)
28
public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) {
29
JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
30
jmsTemplate.setPubSubDomain(this.properties.isPubSubDomain());
31
Duration receiveTimeout = this.properties.getTemplate().getReceiveTimeout();
32
if (receiveTimeout != null) {
33
jmsTemplate.setReceiveTimeout(receiveTimeout.toMillis());
34
}
35
Duration deliveryDelay = this.properties.getTemplate().getDeliveryDelay();
36
if (deliveryDelay != null) {
37
jmsTemplate.setDeliveryDelay(deliveryDelay.toMillis());
38
}
39
return jmsTemplate;
40
}
41
42
/**
43
* JMS listener container factory
44
*/
45
@Bean
46
@ConditionalOnClass(DefaultJmsListenerContainerFactory.class)
47
@ConditionalOnMissingBean(DefaultJmsListenerContainerFactory.class)
48
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(
49
ConnectionFactory connectionFactory,
50
JmsProperties properties,
51
ObjectProvider<JmsListenerContainerFactoryConfigurer> configurerProvider) {
52
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
53
configurerProvider.ifAvailable(configurer -> configurer.configure(factory, connectionFactory));
54
return factory;
55
}
56
}
57
58
/**
59
* JMS configuration properties
60
*/
61
@ConfigurationProperties(prefix = "spring.jms")
62
public class JmsProperties {
63
64
/**
65
* Whether the default destination type is topic
66
*/
67
private boolean pubSubDomain = false;
68
69
/**
70
* JMS template configuration
71
*/
72
private final Template template = new Template();
73
74
/**
75
* JMS listener configuration
76
*/
77
private final Listener listener = new Listener();
78
79
public boolean isPubSubDomain() { return this.pubSubDomain; }
80
public void setPubSubDomain(boolean pubSubDomain) { this.pubSubDomain = pubSubDomain; }
81
public Template getTemplate() { return this.template; }
82
public Listener getListener() { return this.listener; }
83
84
/**
85
* JMS template configuration
86
*/
87
public static class Template {
88
/**
89
* Default destination to use for send operations
90
*/
91
private String defaultDestination;
92
93
/**
94
* Delivery delay to use for send calls
95
*/
96
private Duration deliveryDelay;
97
98
/**
99
* Delivery mode for messages
100
*/
101
private DeliveryMode deliveryMode;
102
103
/**
104
* Priority of a message when sending
105
*/
106
private Integer priority;
107
108
/**
109
* Time-to-live of a message when sending
110
*/
111
private Duration timeToLive;
112
113
/**
114
* Timeout to use for receive calls
115
*/
116
private Duration receiveTimeout;
117
118
// Getters and setters
119
public String getDefaultDestination() { return this.defaultDestination; }
120
public void setDefaultDestination(String defaultDestination) { this.defaultDestination = defaultDestination; }
121
public Duration getReceiveTimeout() { return this.receiveTimeout; }
122
public void setReceiveTimeout(Duration receiveTimeout) { this.receiveTimeout = receiveTimeout; }
123
124
public enum DeliveryMode {
125
NON_PERSISTENT(1), PERSISTENT(2);
126
127
private final int value;
128
129
DeliveryMode(int value) { this.value = value; }
130
public int getValue() { return this.value; }
131
}
132
}
133
134
/**
135
* JMS listener configuration
136
*/
137
public static class Listener {
138
/**
139
* Start the container automatically on startup
140
*/
141
private Boolean autoStartup;
142
143
/**
144
* Acknowledge mode of the container
145
*/
146
private AcknowledgeMode acknowledgeMode;
147
148
/**
149
* Minimum number of concurrent consumers
150
*/
151
private Integer concurrency;
152
153
/**
154
* Maximum number of concurrent consumers
155
*/
156
private Integer maxConcurrency;
157
158
// Getters and setters
159
public AcknowledgeMode getAcknowledgeMode() { return this.acknowledgeMode; }
160
public void setAcknowledgeMode(AcknowledgeMode acknowledgeMode) { this.acknowledgeMode = acknowledgeMode; }
161
162
public enum AcknowledgeMode {
163
AUTO(1), CLIENT(2), DUPS_OK(3);
164
165
private final int mode;
166
167
AcknowledgeMode(int mode) { this.mode = mode; }
168
public int getMode() { return this.mode; }
169
}
170
}
171
}
172
```
173
174
### RabbitMQ AMQP Configuration
175
176
Auto-configuration for RabbitMQ Advanced Message Queuing Protocol (AMQP).
177
178
```java { .api }
179
/**
180
* Auto-configuration for RabbitMQ
181
* Configures RabbitMQ connection factory, template, and admin
182
*/
183
@AutoConfiguration
184
@ConditionalOnClass({RabbitTemplate.class, Channel.class})
185
@EnableConfigurationProperties(RabbitProperties.class)
186
public class RabbitAutoConfiguration {
187
188
/**
189
* RabbitMQ connection factory
190
*/
191
@Bean
192
@ConditionalOnMissingBean
193
public CachingConnectionFactory rabbitConnectionFactory(
194
RabbitProperties properties,
195
ObjectProvider<CredentialsProvider> credentialsProvider,
196
ObjectProvider<CredentialsRefreshService> credentialsRefreshService,
197
ObjectProvider<ConnectionNameStrategy> connectionNameStrategy,
198
ObjectProvider<ConnectionFactoryCustomizer> customizers) throws Exception {
199
200
CachingConnectionFactory factory = new CachingConnectionFactory(
201
getRabbitConnectionFactoryBean(properties).getObject());
202
factory.setAddresses(properties.determineAddresses());
203
factory.setPublisherConfirmType(properties.getPublisherConfirmType());
204
factory.setPublisherReturns(properties.isPublisherReturns());
205
206
RabbitProperties.Cache.Channel channel = properties.getCache().getChannel();
207
factory.setChannelCacheSize(channel.getSize());
208
factory.setChannelCheckoutTimeout(channel.getCheckoutTimeout().toMillis());
209
210
RabbitProperties.Cache.Connection connection = properties.getCache().getConnection();
211
factory.setCacheMode(connection.getMode());
212
if (connection.getMode() == CachingConnectionFactory.CacheMode.CONNECTION) {
213
factory.setConnectionCacheSize(connection.getSize());
214
}
215
216
return factory;
217
}
218
219
/**
220
* RabbitMQ template for message operations
221
*/
222
@Bean
223
@ConditionalOnSingleCandidate(ConnectionFactory.class)
224
@ConditionalOnMissingBean
225
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
226
ObjectProvider<MessageConverter> messageConverter,
227
ObjectProvider<RabbitRetryTemplateCustomizer> retryTemplateCustomizers,
228
ObjectProvider<RabbitTemplateCustomizer> templateCustomizers,
229
RabbitProperties properties) {
230
RabbitTemplate template = new RabbitTemplate(connectionFactory);
231
messageConverter.ifUnique(template::setMessageConverter);
232
template.setMandatory(determineMandatoryFlag(properties));
233
234
RabbitProperties.Template templateProperties = properties.getTemplate();
235
if (templateProperties.getRetry().isEnabled()) {
236
template.setRetryTemplate(createRetryTemplate(templateProperties.getRetry(), retryTemplateCustomizers));
237
}
238
if (templateProperties.getReceiveTimeout() != null) {
239
template.setReceiveTimeout(templateProperties.getReceiveTimeout().toMillis());
240
}
241
if (templateProperties.getReplyTimeout() != null) {
242
template.setReplyTimeout(templateProperties.getReplyTimeout().toMillis());
243
}
244
245
templateCustomizers.orderedStream().forEach(customizer -> customizer.customize(template));
246
return template;
247
}
248
249
/**
250
* RabbitMQ admin for managing queues, exchanges, and bindings
251
*/
252
@Bean
253
@ConditionalOnSingleCandidate(ConnectionFactory.class)
254
@ConditionalOnProperty(prefix = "spring.rabbitmq", name = "dynamic", matchIfMissing = true)
255
@ConditionalOnMissingBean
256
public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
257
return new RabbitAdmin(connectionFactory);
258
}
259
}
260
261
/**
262
* RabbitMQ configuration properties
263
*/
264
@ConfigurationProperties(prefix = "spring.rabbitmq")
265
public class RabbitProperties {
266
267
/**
268
* RabbitMQ host
269
*/
270
private String host = "localhost";
271
272
/**
273
* RabbitMQ port
274
*/
275
private int port = 5672;
276
277
/**
278
* Login username to authenticate to the broker
279
*/
280
private String username = "guest";
281
282
/**
283
* Login password to authenticate to the broker
284
*/
285
private String password = "guest";
286
287
/**
288
* SSL configuration
289
*/
290
private final Ssl ssl = new Ssl();
291
292
/**
293
* Virtual host to use when connecting to the broker
294
*/
295
private String virtualHost;
296
297
/**
298
* Comma-separated list of addresses to which the client should connect
299
*/
300
private String addresses;
301
302
/**
303
* Requested heartbeat timeout
304
*/
305
private Duration requestedHeartbeat;
306
307
/**
308
* Type of publisher confirms to use
309
*/
310
private CachingConnectionFactory.ConfirmType publisherConfirmType;
311
312
/**
313
* Whether to enable publisher returns
314
*/
315
private boolean publisherReturns;
316
317
/**
318
* Connection timeout
319
*/
320
private Duration connectionTimeout;
321
322
/**
323
* Cache configuration
324
*/
325
private final Cache cache = new Cache();
326
327
/**
328
* Listener configuration
329
*/
330
private final Listener listener = new Listener();
331
332
/**
333
* Template configuration
334
*/
335
private final Template template = new Template();
336
337
// Getters and setters
338
public String getHost() { return this.host; }
339
public void setHost(String host) { this.host = host; }
340
public int getPort() { return this.port; }
341
public void setPort(int port) { this.port = port; }
342
343
/**
344
* SSL configuration for secure connections
345
*/
346
public static class Ssl {
347
/**
348
* Whether to enable SSL support
349
*/
350
private boolean enabled;
351
352
/**
353
* Path to the key store that holds the SSL certificate
354
*/
355
private String keyStore;
356
357
/**
358
* Key store type
359
*/
360
private String keyStoreType = "PKCS12";
361
362
/**
363
* Password used to access the key store
364
*/
365
private String keyStorePassword;
366
367
/**
368
* Path to the trust store that holds SSL certificates
369
*/
370
private String trustStore;
371
372
/**
373
* Trust store type
374
*/
375
private String trustStoreType = "JKS";
376
377
/**
378
* Password used to access the trust store
379
*/
380
private String trustStorePassword;
381
382
/**
383
* SSL algorithm to use
384
*/
385
private String algorithm;
386
387
/**
388
* Whether to validate the server certificate
389
*/
390
private boolean validateServerCertificate = true;
391
392
/**
393
* Whether to verify the hostname
394
*/
395
private boolean verifyHostname = true;
396
397
// Getters and setters
398
public boolean isEnabled() { return this.enabled; }
399
public void setEnabled(boolean enabled) { this.enabled = enabled; }
400
}
401
}
402
```
403
404
### Apache Kafka Configuration
405
406
Auto-configuration for Apache Kafka messaging system.
407
408
```java { .api }
409
/**
410
* Auto-configuration for Apache Kafka
411
* Configures Kafka producers, consumers, and admin clients
412
*/
413
@AutoConfiguration
414
@ConditionalOnClass(KafkaTemplate.class)
415
@EnableConfigurationProperties(KafkaProperties.class)
416
public class KafkaAutoConfiguration {
417
418
/**
419
* Kafka template for message operations
420
*/
421
@Bean
422
@ConditionalOnMissingBean(KafkaTemplate.class)
423
public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
424
ProducerListener<Object, Object> kafkaProducerListener,
425
ObjectProvider<RecordMessageConverter> messageConverter) {
426
KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
427
messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
428
kafkaTemplate.setProducerListener(kafkaProducerListener);
429
kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
430
return kafkaTemplate;
431
}
432
433
/**
434
* Kafka producer factory
435
*/
436
@Bean
437
@ConditionalOnMissingBean(ProducerFactory.class)
438
public ProducerFactory<?, ?> kafkaProducerFactory(ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {
439
DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(
440
this.properties.buildProducerProperties());
441
String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();
442
if (transactionIdPrefix != null) {
443
factory.setTransactionIdPrefix(transactionIdPrefix);
444
}
445
customizers.orderedStream().forEach(customizer -> customizer.customize(factory));
446
return factory;
447
}
448
449
/**
450
* Kafka consumer factory
451
*/
452
@Bean
453
@ConditionalOnMissingBean(ConsumerFactory.class)
454
public ConsumerFactory<?, ?> kafkaConsumerFactory(ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers) {
455
DefaultKafkaConsumerFactory<?, ?> factory = new DefaultKafkaConsumerFactory<>(
456
this.properties.buildConsumerProperties());
457
customizers.orderedStream().forEach(customizer -> customizer.customize(factory));
458
return factory;
459
}
460
461
/**
462
* Kafka admin client
463
*/
464
@Bean
465
@ConditionalOnMissingBean(KafkaAdmin.class)
466
public KafkaAdmin kafkaAdmin() {
467
KafkaAdmin kafkaAdmin = new KafkaAdmin(this.properties.buildAdminProperties());
468
kafkaAdmin.setFatalIfBrokerNotAvailable(this.properties.getAdmin().isFailFast());
469
kafkaAdmin.setModifyTopicConfigs(this.properties.getAdmin().isModifyTopicConfigs());
470
return kafkaAdmin;
471
}
472
}
473
474
/**
475
* Kafka configuration properties
476
*/
477
@ConfigurationProperties(prefix = "spring.kafka")
478
public class KafkaProperties {
479
480
/**
481
* Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster
482
*/
483
private List<String> bootstrapServers = new ArrayList<>(Collections.singletonList("localhost:9092"));
484
485
/**
486
* ID to pass to the server when making requests
487
*/
488
private String clientId;
489
490
/**
491
* Additional properties, common to producers and consumers, used to configure the client
492
*/
493
private final Map<String, String> properties = new HashMap<>();
494
495
/**
496
* Producer configuration
497
*/
498
private final Producer producer = new Producer();
499
500
/**
501
* Consumer configuration
502
*/
503
private final Consumer consumer = new Consumer();
504
505
/**
506
* Admin client configuration
507
*/
508
private final Admin admin = new Admin();
509
510
/**
511
* Streams configuration
512
*/
513
private final Streams streams = new Streams();
514
515
/**
516
* Template configuration
517
*/
518
private final Template template = new Template();
519
520
/**
521
* Security configuration
522
*/
523
private final Security security = new Security();
524
525
// Getters and setters
526
public List<String> getBootstrapServers() { return this.bootstrapServers; }
527
public void setBootstrapServers(List<String> bootstrapServers) { this.bootstrapServers = bootstrapServers; }
528
529
/**
530
* Kafka producer configuration
531
*/
532
public static class Producer {
533
/**
534
* When non-empty, enables transaction support for the producer
535
*/
536
private String transactionIdPrefix;
537
538
/**
539
* Number of acknowledgments the producer requires the leader to have received before considering a request complete
540
*/
541
private String acks = "1";
542
543
/**
544
* Default batch size in bytes
545
*/
546
private Integer batchSize;
547
548
/**
549
* Total bytes of memory the producer can use to buffer records
550
*/
551
private Long bufferMemory;
552
553
/**
554
* Compression type for all data generated by the producer
555
*/
556
private String compressionType;
557
558
/**
559
* Key serializer class
560
*/
561
private Class<?> keySerializer = StringSerializer.class;
562
563
/**
564
* Value serializer class
565
*/
566
private Class<?> valueSerializer = StringSerializer.class;
567
568
// Getters and setters
569
public String getAcks() { return this.acks; }
570
public void setAcks(String acks) { this.acks = acks; }
571
public Class<?> getKeySerializer() { return this.keySerializer; }
572
public void setKeySerializer(Class<?> keySerializer) { this.keySerializer = keySerializer; }
573
}
574
575
/**
576
* Kafka consumer configuration
577
*/
578
public static class Consumer {
579
/**
580
* Unique string that identifies the consumer group this consumer belongs to
581
*/
582
private String groupId;
583
584
/**
585
* What to do when there is no initial offset in Kafka or the current offset no longer exists on the server
586
*/
587
private String autoOffsetReset = "latest";
588
589
/**
590
* Whether the consumer's offset is periodically committed in the background
591
*/
592
private Boolean enableAutoCommit;
593
594
/**
595
* Frequency with which the consumer offsets are auto-committed to Kafka
596
*/
597
private Duration autoCommitInterval;
598
599
/**
600
* Key deserializer class
601
*/
602
private Class<?> keyDeserializer = StringDeserializer.class;
603
604
/**
605
* Value deserializer class
606
*/
607
private Class<?> valueDeserializer = StringDeserializer.class;
608
609
/**
610
* Maximum number of records returned in a single call to poll()
611
*/
612
private Integer maxPollRecords;
613
614
// Getters and setters
615
public String getGroupId() { return this.groupId; }
616
public void setGroupId(String groupId) { this.groupId = groupId; }
617
public Class<?> getKeyDeserializer() { return this.keyDeserializer; }
618
public void setKeyDeserializer(Class<?> keyDeserializer) { this.keyDeserializer = keyDeserializer; }
619
}
620
}
621
```
622
623
**Usage Examples:**
624
625
```java
626
// JMS message producer
627
@Component
628
public class OrderMessageProducer {
629
630
private final JmsTemplate jmsTemplate;
631
632
public OrderMessageProducer(JmsTemplate jmsTemplate) {
633
this.jmsTemplate = jmsTemplate;
634
}
635
636
public void sendOrderMessage(Order order) {
637
jmsTemplate.convertAndSend("order.queue", order);
638
}
639
}
640
641
// JMS message listener
642
@Component
643
public class OrderMessageListener {
644
645
@JmsListener(destination = "order.queue")
646
public void handleOrderMessage(Order order) {
647
System.out.println("Received order: " + order.getId());
648
// Process order
649
}
650
}
651
652
// Kafka producer
653
@Service
654
public class EventProducer {
655
656
private final KafkaTemplate<String, Object> kafkaTemplate;
657
658
public EventProducer(KafkaTemplate<String, Object> kafkaTemplate) {
659
this.kafkaTemplate = kafkaTemplate;
660
}
661
662
public void publishEvent(String topic, String key, Object event) {
663
kafkaTemplate.send(topic, key, event);
664
}
665
}
666
667
// Kafka consumer
668
@Component
669
public class EventConsumer {
670
671
@KafkaListener(topics = "user-events", groupId = "user-service")
672
public void handleUserEvent(UserEvent event) {
673
System.out.println("Processing user event: " + event.getType());
674
// Process event
675
}
676
}
677
678
```

Properties configuration (`application.properties`):

```properties
679
# application.properties
680
# JMS
681
spring.jms.pub-sub-domain=false
682
spring.jms.template.default-destination=default.queue
683
spring.jms.template.delivery-mode=persistent
684
685
# Kafka
686
spring.kafka.bootstrap-servers=localhost:9092
687
spring.kafka.consumer.group-id=my-group
688
spring.kafka.consumer.auto-offset-reset=earliest
689
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
690
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
691
692
# RabbitMQ
693
spring.rabbitmq.host=localhost
694
spring.rabbitmq.port=5672
695
spring.rabbitmq.username=guest
696
spring.rabbitmq.password=guest
697
spring.rabbitmq.virtual-host=/
698
```
699
700
## Types
701
702
### Messaging Configuration Types
703
704
```java { .api }
705
/**
706
* Strategy for converting between Java objects and JMS messages
707
*/
708
public interface MessageConverter {
709
/**
710
* Convert Java object to Message
711
* @param object the object to convert
712
* @param session the JMS session
713
* @return converted Message
714
*/
715
Message toMessage(Object object, Session session) throws JMSException, MessageConversionException;
716
717
/**
718
* Convert Message to Java object
719
* @param message the message to convert
720
* @return converted object
721
*/
722
Object fromMessage(Message message) throws JMSException, MessageConversionException;
723
}
724
725
/**
726
* Customizer for Kafka producer factories
727
*/
728
@FunctionalInterface
729
public interface DefaultKafkaProducerFactoryCustomizer {
730
/**
731
* Customize the Kafka producer factory
732
* @param producerFactory the producer factory to customize
733
*/
734
void customize(DefaultKafkaProducerFactory<?, ?> producerFactory);
735
}
736
737
/**
738
* Customizer for Kafka consumer factories
739
*/
740
@FunctionalInterface
741
public interface DefaultKafkaConsumerFactoryCustomizer {
742
/**
743
* Customize the Kafka consumer factory
744
* @param consumerFactory the consumer factory to customize
745
*/
746
void customize(DefaultKafkaConsumerFactory<?, ?> consumerFactory);
747
}
748
749
/**
750
* Customizer for RabbitMQ templates
751
*/
752
@FunctionalInterface
753
public interface RabbitTemplateCustomizer {
754
/**
755
* Customize the RabbitMQ template
756
* @param rabbitTemplate the template to customize
757
*/
758
void customize(RabbitTemplate rabbitTemplate);
759
}
760
```