or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

cache-configuration.md conditional-configuration.md core-autoconfiguration.md data-configuration.md index.md json-configuration.md messaging-configuration.md security-configuration.md service-connections.md template-configuration.md web-configuration.md

docs/messaging-configuration.md

0

# Messaging Configuration

1

2

Spring Boot's messaging configuration provides comprehensive auto-configuration for messaging systems including JMS, AMQP (RabbitMQ), Apache Kafka, and Apache Pulsar.

3

4

## Capabilities

5

6

### JMS Configuration

7

8

Auto-configuration for Java Message Service (JMS) with support for various message brokers.

9

10

```java { .api }

11

/**

12

* Auto-configuration for JMS

13

* Provides configuration for JMS connection factories and templates

14

*/

15

@AutoConfiguration

16

@ConditionalOnClass({Message.class, JmsTemplate.class})

17

@ConditionalOnBean(ConnectionFactory.class)

18

@EnableConfigurationProperties(JmsProperties.class)

19

public class JmsAutoConfiguration {

20

21

/**

22

* JMS template for sending messages

23

*/

24

@Bean

25

@Primary

26

@ConditionalOnMissingBean(JmsTemplate.class)

27

@ConditionalOnSingleCandidate(ConnectionFactory.class)

28

public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) {

29

JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);

30

jmsTemplate.setPubSubDomain(this.properties.isPubSubDomain());

31

Duration receiveTimeout = this.properties.getTemplate().getReceiveTimeout();

32

if (receiveTimeout != null) {

33

jmsTemplate.setReceiveTimeout(receiveTimeout.toMillis());

34

}

35

Duration deliveryDelay = this.properties.getTemplate().getDeliveryDelay();

36

if (deliveryDelay != null) {

37

jmsTemplate.setDeliveryDelay(deliveryDelay.toMillis());

38

}

39

return jmsTemplate;

40

}

41

42

/**

43

* JMS listener container factory

44

*/

45

@Bean

46

@ConditionalOnClass(DefaultJmsListenerContainerFactory.class)

47

@ConditionalOnMissingBean(DefaultJmsListenerContainerFactory.class)

48

public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(

49

ConnectionFactory connectionFactory,

50

JmsProperties properties,

51

ObjectProvider<JmsListenerContainerFactoryConfigurer> configurerProvider) {

52

DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();

53

configurerProvider.ifAvailable(configurer -> configurer.configure(factory, connectionFactory));

54

return factory;

55

}

56

}

57

58

/**

59

* JMS configuration properties

60

*/

61

@ConfigurationProperties(prefix = "spring.jms")

62

public class JmsProperties {

63

64

/**

65

* Whether the default destination type is topic

66

*/

67

private boolean pubSubDomain = false;

68

69

/**

70

* JMS template configuration

71

*/

72

private final Template template = new Template();

73

74

/**

75

* JMS listener configuration

76

*/

77

private final Listener listener = new Listener();

78

79

public boolean isPubSubDomain() { return this.pubSubDomain; }

80

public void setPubSubDomain(boolean pubSubDomain) { this.pubSubDomain = pubSubDomain; }

81

public Template getTemplate() { return this.template; }

82

public Listener getListener() { return this.listener; }

83

84

/**

85

* JMS template configuration

86

*/

87

public static class Template {

88

/**

89

* Default destination to use for send operations

90

*/

91

private String defaultDestination;

92

93

/**

94

* Delivery delay to use for send calls

95

*/

96

private Duration deliveryDelay;

97

98

/**

99

* Delivery mode for messages

100

*/

101

private DeliveryMode deliveryMode;

102

103

/**

104

* Priority of a message when sending

105

*/

106

private Integer priority;

107

108

/**

109

* Time-to-live of a message when sending

110

*/

111

private Duration timeToLive;

112

113

/**

114

* Timeout to use for receive calls

115

*/

116

private Duration receiveTimeout;

117

118

// Getters and setters

119

public String getDefaultDestination() { return this.defaultDestination; }

120

public void setDefaultDestination(String defaultDestination) { this.defaultDestination = defaultDestination; }

121

public Duration getReceiveTimeout() { return this.receiveTimeout; }

122

public void setReceiveTimeout(Duration receiveTimeout) { this.receiveTimeout = receiveTimeout; }

123

124

public enum DeliveryMode {

125

NON_PERSISTENT(1), PERSISTENT(2);

126

127

private final int value;

128

129

DeliveryMode(int value) { this.value = value; }

130

public int getValue() { return this.value; }

131

}

132

}

133

134

/**

135

* JMS listener configuration

136

*/

137

public static class Listener {

138

/**

139

* Start the container automatically on startup

140

*/

141

private Boolean autoStartup;

142

143

/**

144

* Acknowledge mode of the container

145

*/

146

private AcknowledgeMode acknowledgeMode;

147

148

/**

149

* Minimum number of concurrent consumers

150

*/

151

private Integer concurrency;

152

153

/**

154

* Maximum number of concurrent consumers

155

*/

156

private Integer maxConcurrency;

157

158

// Getters and setters

159

public AcknowledgeMode getAcknowledgeMode() { return this.acknowledgeMode; }

160

public void setAcknowledgeMode(AcknowledgeMode acknowledgeMode) { this.acknowledgeMode = acknowledgeMode; }

161

162

public enum AcknowledgeMode {

163

AUTO(1), CLIENT(2), DUPS_OK(3);

164

165

private final int mode;

166

167

AcknowledgeMode(int mode) { this.mode = mode; }

168

public int getMode() { return this.mode; }

169

}

170

}

171

}

172

```

173

174

### RabbitMQ AMQP Configuration

175

176

Auto-configuration for RabbitMQ Advanced Message Queuing Protocol (AMQP).

177

178

```java { .api }

179

/**

180

* Auto-configuration for RabbitMQ

181

* Configures RabbitMQ connection factory, template, and admin

182

*/

183

@AutoConfiguration

184

@ConditionalOnClass({RabbitTemplate.class, Channel.class})

185

@EnableConfigurationProperties(RabbitProperties.class)

186

public class RabbitAutoConfiguration {

187

188

/**

189

* RabbitMQ connection factory

190

*/

191

@Bean

192

@ConditionalOnMissingBean

193

public CachingConnectionFactory rabbitConnectionFactory(

194

RabbitProperties properties,

195

ObjectProvider<CredentialsProvider> credentialsProvider,

196

ObjectProvider<CredentialsRefreshService> credentialsRefreshService,

197

ObjectProvider<ConnectionNameStrategy> connectionNameStrategy,

198

ObjectProvider<ConnectionFactoryCustomizer> customizers) throws Exception {

199

200

CachingConnectionFactory factory = new CachingConnectionFactory(

201

getRabbitConnectionFactoryBean(properties).getObject());

202

factory.setAddresses(properties.determineAddresses());

203

factory.setPublisherConfirmType(properties.getPublisherConfirmType());

204

factory.setPublisherReturns(properties.isPublisherReturns());

205

206

RabbitProperties.Cache.Channel channel = properties.getCache().getChannel();

207

factory.setChannelCacheSize(channel.getSize());

208

factory.setChannelCheckoutTimeout(channel.getCheckoutTimeout().toMillis());

209

210

RabbitProperties.Cache.Connection connection = properties.getCache().getConnection();

211

factory.setCacheMode(connection.getMode());

212

if (connection.getMode() == CachingConnectionFactory.CacheMode.CONNECTION) {

213

factory.setConnectionCacheSize(connection.getSize());

214

}

215

216

return factory;

217

}

218

219

/**

220

* RabbitMQ template for message operations

221

*/

222

@Bean

223

@ConditionalOnSingleCandidate(ConnectionFactory.class)

224

@ConditionalOnMissingBean

225

public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,

226

ObjectProvider<MessageConverter> messageConverter,

227

ObjectProvider<RabbitRetryTemplateCustomizer> retryTemplateCustomizers,

228

ObjectProvider<RabbitTemplateCustomizer> templateCustomizers,

229

RabbitProperties properties) {

230

RabbitTemplate template = new RabbitTemplate(connectionFactory);

231

messageConverter.ifUnique(template::setMessageConverter);

232

template.setMandatory(determineMandatoryFlag(properties));

233

234

RabbitProperties.Template templateProperties = properties.getTemplate();

235

if (templateProperties.getRetry().isEnabled()) {

236

template.setRetryTemplate(createRetryTemplate(templateProperties.getRetry(), retryTemplateCustomizers));

237

}

238

if (templateProperties.getReceiveTimeout() != null) {

239

template.setReceiveTimeout(templateProperties.getReceiveTimeout().toMillis());

240

}

241

if (templateProperties.getReplyTimeout() != null) {

242

template.setReplyTimeout(templateProperties.getReplyTimeout().toMillis());

243

}

244

245

templateCustomizers.orderedStream().forEach(customizer -> customizer.customize(template));

246

return template;

247

}

248

249

/**

250

* RabbitMQ admin for managing queues, exchanges, and bindings

251

*/

252

@Bean

253

@ConditionalOnSingleCandidate(ConnectionFactory.class)

254

@ConditionalOnProperty(prefix = "spring.rabbitmq", name = "dynamic", matchIfMissing = true)

255

@ConditionalOnMissingBean

256

public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {

257

return new RabbitAdmin(connectionFactory);

258

}

259

}

260

261

/**

262

* RabbitMQ configuration properties

263

*/

264

@ConfigurationProperties(prefix = "spring.rabbitmq")

265

public class RabbitProperties {

266

267

/**

268

* RabbitMQ host

269

*/

270

private String host = "localhost";

271

272

/**

273

* RabbitMQ port

274

*/

275

private int port = 5672;

276

277

/**

278

* Login username to authenticate to the broker

279

*/

280

private String username = "guest";

281

282

/**

283

* Login password to authenticate to the broker

284

*/

285

private String password = "guest";

286

287

/**

288

* SSL configuration

289

*/

290

private final Ssl ssl = new Ssl();

291

292

/**

293

* Virtual host to use when connecting to the broker

294

*/

295

private String virtualHost;

296

297

/**

298

* Comma-separated list of addresses to which the client should connect

299

*/

300

private String addresses;

301

302

/**

303

* Requested heartbeat timeout

304

*/

305

private Duration requestedHeartbeat;

306

307

/**

308

* Type of publisher confirms to use

309

*/

310

private CachingConnectionFactory.ConfirmType publisherConfirmType;

311

312

/**

313

* Whether to enable publisher returns

314

*/

315

private boolean publisherReturns;

316

317

/**

318

* Connection timeout

319

*/

320

private Duration connectionTimeout;

321

322

/**

323

* Cache configuration

324

*/

325

private final Cache cache = new Cache();

326

327

/**

328

* Listener configuration

329

*/

330

private final Listener listener = new Listener();

331

332

/**

333

* Template configuration

334

*/

335

private final Template template = new Template();

336

337

// Getters and setters

338

public String getHost() { return this.host; }

339

public void setHost(String host) { this.host = host; }

340

public int getPort() { return this.port; }

341

public void setPort(int port) { this.port = port; }

342

343

/**

344

* SSL configuration for secure connections

345

*/

346

public static class Ssl {

347

/**

348

* Whether to enable SSL support

349

*/

350

private boolean enabled;

351

352

/**

353

* Path to the key store that holds the SSL certificate

354

*/

355

private String keyStore;

356

357

/**

358

* Key store type

359

*/

360

private String keyStoreType = "PKCS12";

361

362

/**

363

* Password used to access the key store

364

*/

365

private String keyStorePassword;

366

367

/**

368

* Path to the trust store that holds SSL certificates

369

*/

370

private String trustStore;

371

372

/**

373

* Trust store type

374

*/

375

private String trustStoreType = "JKS";

376

377

/**

378

* Password used to access the trust store

379

*/

380

private String trustStorePassword;

381

382

/**

383

* SSL algorithm to use

384

*/

385

private String algorithm;

386

387

/**

388

* Whether to validate the server certificate

389

*/

390

private boolean validateServerCertificate = true;

391

392

/**

393

* Whether to verify the hostname

394

*/

395

private boolean verifyHostname = true;

396

397

// Getters and setters

398

public boolean isEnabled() { return this.enabled; }

399

public void setEnabled(boolean enabled) { this.enabled = enabled; }

400

}

401

}

402

```

403

404

### Apache Kafka Configuration

405

406

Auto-configuration for Apache Kafka messaging system.

407

408

```java { .api }

409

/**

410

* Auto-configuration for Apache Kafka

411

* Configures Kafka producers, consumers, and admin clients

412

*/

413

@AutoConfiguration

414

@ConditionalOnClass(KafkaTemplate.class)

415

@EnableConfigurationProperties(KafkaProperties.class)

416

public class KafkaAutoConfiguration {

417

418

/**

419

* Kafka template for message operations

420

*/

421

@Bean

422

@ConditionalOnMissingBean(KafkaTemplate.class)

423

public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,

424

ProducerListener<Object, Object> kafkaProducerListener,

425

ObjectProvider<RecordMessageConverter> messageConverter) {

426

KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);

427

messageConverter.ifUnique(kafkaTemplate::setMessageConverter);

428

kafkaTemplate.setProducerListener(kafkaProducerListener);

429

kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());

430

return kafkaTemplate;

431

}

432

433

/**

434

* Kafka producer factory

435

*/

436

@Bean

437

@ConditionalOnMissingBean(ProducerFactory.class)

438

public ProducerFactory<?, ?> kafkaProducerFactory(ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {

439

DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(

440

this.properties.buildProducerProperties());

441

String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();

442

if (transactionIdPrefix != null) {

443

factory.setTransactionIdPrefix(transactionIdPrefix);

444

}

445

customizers.orderedStream().forEach(customizer -> customizer.customize(factory));

446

return factory;

447

}

448

449

/**

450

* Kafka consumer factory

451

*/

452

@Bean

453

@ConditionalOnMissingBean(ConsumerFactory.class)

454

public ConsumerFactory<?, ?> kafkaConsumerFactory(ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers) {

455

DefaultKafkaConsumerFactory<?, ?> factory = new DefaultKafkaConsumerFactory<>(

456

this.properties.buildConsumerProperties());

457

customizers.orderedStream().forEach(customizer -> customizer.customize(factory));

458

return factory;

459

}

460

461

/**

462

* Kafka admin client

463

*/

464

@Bean

465

@ConditionalOnMissingBean(KafkaAdmin.class)

466

public KafkaAdmin kafkaAdmin() {

467

KafkaAdmin kafkaAdmin = new KafkaAdmin(this.properties.buildAdminProperties());

468

kafkaAdmin.setFatalIfBrokerNotAvailable(this.properties.getAdmin().isFailFast());

469

kafkaAdmin.setModifyTopicConfigs(this.properties.getAdmin().isModifyTopicConfigs());

470

return kafkaAdmin;

471

}

472

}

473

474

/**

475

* Kafka configuration properties

476

*/

477

@ConfigurationProperties(prefix = "spring.kafka")

478

public class KafkaProperties {

479

480

/**

481

* Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster

482

*/

483

private List<String> bootstrapServers = new ArrayList<>(Collections.singletonList("localhost:9092"));

484

485

/**

486

* ID to pass to the server when making requests

487

*/

488

private String clientId;

489

490

/**

491

* Additional properties, common to producers and consumers, used to configure the client

492

*/

493

private final Map<String, String> properties = new HashMap<>();

494

495

/**

496

* Producer configuration

497

*/

498

private final Producer producer = new Producer();

499

500

/**

501

* Consumer configuration

502

*/

503

private final Consumer consumer = new Consumer();

504

505

/**

506

* Admin client configuration

507

*/

508

private final Admin admin = new Admin();

509

510

/**

511

* Streams configuration

512

*/

513

private final Streams streams = new Streams();

514

515

/**

516

* Template configuration

517

*/

518

private final Template template = new Template();

519

520

/**

521

* Security configuration

522

*/

523

private final Security security = new Security();

524

525

// Getters and setters

526

public List<String> getBootstrapServers() { return this.bootstrapServers; }

527

public void setBootstrapServers(List<String> bootstrapServers) { this.bootstrapServers = bootstrapServers; }

528

529

/**

530

* Kafka producer configuration

531

*/

532

public static class Producer {

533

/**

534

* Transaction ID prefix; when non-null it is set on the producer factory, enabling transactional producers

535

*/

536

private String transactionIdPrefix;

537

538

/**

539

* Number of acknowledgments the producer requires the leader to have received before considering a request complete

540

*/

541

private String acks = "1";

542

543

/**

544

* Default batch size in bytes

545

*/

546

private Integer batchSize;

547

548

/**

549

* Total bytes of memory the producer can use to buffer records

550

*/

551

private Long bufferMemory;

552

553

/**

554

* Compression type for all data generated by the producer

555

*/

556

private String compressionType;

557

558

/**

559

* Key serializer class

560

*/

561

private Class<?> keySerializer = StringSerializer.class;

562

563

/**

564

* Value serializer class

565

*/

566

private Class<?> valueSerializer = StringSerializer.class;

567

568

// Getters and setters

569

public String getAcks() { return this.acks; }

570

public void setAcks(String acks) { this.acks = acks; }

571

public Class<?> getKeySerializer() { return this.keySerializer; }

572

public void setKeySerializer(Class<?> keySerializer) { this.keySerializer = keySerializer; }

573

}

574

575

/**

576

* Kafka consumer configuration

577

*/

578

public static class Consumer {

579

/**

580

* Unique string that identifies the consumer group this consumer belongs to

581

*/

582

private String groupId;

583

584

/**

585

* What to do when there is no initial offset in Kafka

586

*/

587

private String autoOffsetReset = "latest";

588

589

/**

590

* Whether the consumer's offset is periodically committed in the background

591

*/

592

private Boolean enableAutoCommit;

593

594

/**

595

* Frequency with which the consumer offsets are auto-committed to Kafka

596

*/

597

private Duration autoCommitInterval;

598

599

/**

600

* Key deserializer class

601

*/

602

private Class<?> keyDeserializer = StringDeserializer.class;

603

604

/**

605

* Value deserializer class

606

*/

607

private Class<?> valueDeserializer = StringDeserializer.class;

608

609

/**

610

* Maximum number of records returned in a single call to poll()

611

*/

612

private Integer maxPollRecords;

613

614

// Getters and setters

615

public String getGroupId() { return this.groupId; }

616

public void setGroupId(String groupId) { this.groupId = groupId; }

617

public Class<?> getKeyDeserializer() { return this.keyDeserializer; }

618

public void setKeyDeserializer(Class<?> keyDeserializer) { this.keyDeserializer = keyDeserializer; }

619

}

620

}

621

```

622

623

**Usage Examples:**

624

625

```java

626

// JMS message producer

627

@Component

628

public class OrderMessageProducer {

629

630

private final JmsTemplate jmsTemplate;

631

632

public OrderMessageProducer(JmsTemplate jmsTemplate) {

633

this.jmsTemplate = jmsTemplate;

634

}

635

636

public void sendOrderMessage(Order order) {

637

jmsTemplate.convertAndSend("order.queue", order);

638

}

639

}

640

641

// JMS message listener

642

@Component

643

public class OrderMessageListener {

644

645

@JmsListener(destination = "order.queue")

646

public void handleOrderMessage(Order order) {

647

System.out.println("Received order: " + order.getId());

648

// Process order

649

}

650

}

651

652

// Kafka producer

653

@Service

654

public class EventProducer {

655

656

private final KafkaTemplate<String, Object> kafkaTemplate;

657

658

public EventProducer(KafkaTemplate<String, Object> kafkaTemplate) {

659

this.kafkaTemplate = kafkaTemplate;

660

}

661

662

public void publishEvent(String topic, String key, Object event) {

663

kafkaTemplate.send(topic, key, event);

664

}

665

}

666

667

// Kafka consumer

668

@Component

669

public class EventConsumer {

670

671

@KafkaListener(topics = "user-events", groupId = "user-service")

672

public void handleUserEvent(UserEvent event) {

673

System.out.println("Processing user event: " + event.getType());

674

// Process event

675

}

676

}

677

678

```

Properties configuration:

```properties

679

# application.properties

680

# JMS

681

spring.jms.pub-sub-domain=false

682

spring.jms.template.default-destination=default.queue

683

spring.jms.template.delivery-mode=persistent

684

685

# Kafka

686

spring.kafka.bootstrap-servers=localhost:9092

687

spring.kafka.consumer.group-id=my-group

688

spring.kafka.consumer.auto-offset-reset=earliest

689

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer

690

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

691

692

# RabbitMQ

693

spring.rabbitmq.host=localhost

694

spring.rabbitmq.port=5672

695

spring.rabbitmq.username=guest

696

spring.rabbitmq.password=guest

697

spring.rabbitmq.virtual-host=/

698

```

699

700

## Types

701

702

### Messaging Configuration Types

703

704

```java { .api }

705

/**

706

* JMS message converter: converts between Java objects and JMS messages

707

*/

708

public interface MessageConverter {

709

/**

710

* Convert Java object to Message

711

* @param object the object to convert

712

* @param session the JMS session

713

* @return converted Message

714

*/

715

Message toMessage(Object object, Session session) throws JMSException, MessageConversionException;

716

717

/**

718

* Convert Message to Java object

719

* @param message the message to convert

720

* @return converted object

721

*/

722

Object fromMessage(Message message) throws JMSException, MessageConversionException;

723

}

724

725

/**

726

* Customizer for Kafka producer factories

727

*/

728

@FunctionalInterface

729

public interface DefaultKafkaProducerFactoryCustomizer {

730

/**

731

* Customize the Kafka producer factory

732

* @param producerFactory the producer factory to customize

733

*/

734

void customize(DefaultKafkaProducerFactory<?, ?> producerFactory);

735

}

736

737

/**

738

* Customizer for Kafka consumer factories

739

*/

740

@FunctionalInterface

741

public interface DefaultKafkaConsumerFactoryCustomizer {

742

/**

743

* Customize the Kafka consumer factory

744

* @param consumerFactory the consumer factory to customize

745

*/

746

void customize(DefaultKafkaConsumerFactory<?, ?> consumerFactory);

747

}

748

749

/**

750

* Customizer for RabbitMQ templates

751

*/

752

@FunctionalInterface

753

public interface RabbitTemplateCustomizer {

754

/**

755

* Customize the RabbitMQ template

756

* @param rabbitTemplate the template to customize

757

*/

758

void customize(RabbitTemplate rabbitTemplate);

759

}

760

```