or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

aop.mdcore-container.mddata-access.mdindex.mdintegration.mdmessaging.mdreactive-web.mdtesting.mdweb-framework.md

messaging.mddocs/

0

# Messaging & JMS

1

2

Spring provides comprehensive support for message-driven applications through the Spring Messaging abstraction and JMS (Java Message Service) integration. This includes support for both synchronous and asynchronous messaging patterns with various message brokers.

3

4

## Maven Dependencies

5

6

```xml

7

<!-- Spring Messaging (Core abstractions) -->

8

<dependency>

9

<groupId>org.springframework</groupId>

10

<artifactId>spring-messaging</artifactId>

11

<version>5.3.39</version>

12

</dependency>

13

14

<!-- Spring JMS -->

15

<dependency>

16

<groupId>org.springframework</groupId>

17

<artifactId>spring-jms</artifactId>

18

<version>5.3.39</version>

19

</dependency>

20

21

<!-- JMS API -->

22

<dependency>

23

<groupId>javax.jms</groupId>

24

<artifactId>javax.jms-api</artifactId>

25

<version>2.0.1</version>

26

</dependency>

27

28

<!-- ActiveMQ (example JMS broker) -->

29

<dependency>

30

<groupId>org.apache.activemq</groupId>

31

<artifactId>activemq-broker</artifactId>

32

<version>5.17.6</version>

33

</dependency>

34

35

<!-- RabbitMQ support -->

36

<dependency>

37

<groupId>org.springframework.amqp</groupId>

38

<artifactId>spring-rabbit</artifactId>

39

<version>2.4.16</version>

40

</dependency>

41

```

42

43

## Core Imports

44

45

```java { .api }

46

// Core messaging abstractions

47

import org.springframework.messaging.Message;

48

import org.springframework.messaging.MessageHeaders;

49

import org.springframework.messaging.MessageChannel;

50

import org.springframework.messaging.MessagingException;

51

import org.springframework.messaging.PollableChannel;

52

import org.springframework.messaging.SubscribableChannel;

53

54

// Message handling

55

import org.springframework.messaging.MessageHandler;

56

import org.springframework.messaging.handler.annotation.MessageMapping;

57

import org.springframework.messaging.handler.annotation.Payload;

58

import org.springframework.messaging.handler.annotation.Header;

59

import org.springframework.messaging.handler.annotation.Headers;

60

61

// JMS Core

62

import org.springframework.jms.core.JmsTemplate;

63

import org.springframework.jms.core.MessageCreator;

64

import org.springframework.jms.core.MessagePostProcessor;

65

import org.springframework.jms.core.SessionCallback;

66

67

// JMS Annotations

68

import org.springframework.jms.annotation.JmsListener;

69

import org.springframework.jms.annotation.EnableJms;

70

71

// JMS Configuration

72

import org.springframework.jms.config.DefaultJmsListenerContainerFactory;

73

import org.springframework.jms.config.JmsListenerContainerFactory;

74

import org.springframework.jms.listener.DefaultMessageListenerContainer;

75

76

// Connection Factory

77

import javax.jms.ConnectionFactory;

78

import org.springframework.jms.connection.CachingConnectionFactory;

79

80

// Message conversion

81

import org.springframework.jms.support.converter.MessageConverter;

82

import org.springframework.jms.support.converter.MappingJackson2MessageConverter;

83

```

84

85

## Core Messaging Abstractions

86

87

### Message Interface

88

89

```java { .api }

90

// Generic message interface

91

public interface Message<T> {

92

93

T getPayload();

94

95

MessageHeaders getHeaders();

96

}

97

98

// Message headers container

99

public final class MessageHeaders implements Map<String, Object>, Serializable {

100

101

public static final String ID = "id";

102

public static final String TIMESTAMP = "timestamp";

103

public static final String CONTENT_TYPE = "contentType";

104

public static final String REPLY_CHANNEL = "replyChannel";

105

public static final String ERROR_CHANNEL = "errorChannel";

106

107

public MessageHeaders(Map<String, Object> headers);

108

public MessageHeaders(Map<String, Object> headers, UUID id, Long timestamp);

109

110

public UUID getId();

111

public Long getTimestamp();

112

public Object getReplyChannel();

113

public Object getErrorChannel();

114

115

@Override

116

public Object get(Object key);

117

118

@Override

119

public boolean containsKey(Object key);

120

121

public <T> T get(Object key, Class<T> type);

122

}

123

124

// Message builder

125

public final class MessageBuilder<T> {

126

127

public static <T> MessageBuilder<T> withPayload(T payload);

128

public static <T> MessageBuilder<T> fromMessage(Message<T> message);

129

130

public MessageBuilder<T> setHeader(String headerName, Object headerValue);

131

public MessageBuilder<T> setHeaderIfAbsent(String headerName, Object headerValue);

132

public MessageBuilder<T> removeHeaders(String... headerPatterns);

133

public MessageBuilder<T> removeHeader(String headerName);

134

public MessageBuilder<T> copyHeaders(Map<String, ?> headersToCopy);

135

public MessageBuilder<T> copyHeadersIfAbsent(Map<String, ?> headersToCopy);

136

137

public Message<T> build();

138

}

139

```

140

141

### Message Channels

142

143

```java { .api }

144

// Base interface for message channels

145

public interface MessageChannel {

146

147

long INDEFINITE_TIMEOUT = -1;

148

149

default boolean send(Message<?> message) {

150

return send(message, INDEFINITE_TIMEOUT);

151

}

152

153

boolean send(Message<?> message, long timeout);

154

}

155

156

// Channel that can be polled for messages

157

public interface PollableChannel extends MessageChannel {

158

159

Message<?> receive();

160

161

Message<?> receive(long timeout);

162

}

163

164

// Channel that supports message handler subscription

165

public interface SubscribableChannel extends MessageChannel {

166

167

boolean subscribe(MessageHandler handler);

168

169

boolean unsubscribe(MessageHandler handler);

170

}

171

172

// Exception thrown on messaging errors

173

public class MessagingException extends RuntimeException {

174

175

public MessagingException(String description);

176

public MessagingException(String description, Throwable cause);

177

public MessagingException(Message<?> failedMessage, String description);

178

public MessagingException(Message<?> failedMessage, String description, Throwable cause);

179

180

public Message<?> getFailedMessage();

181

}

182

```

183

184

### Message Handling

185

186

```java { .api }

187

// Interface for handling messages

188

@FunctionalInterface

189

public interface MessageHandler {

190

191

void handleMessage(Message<?> message) throws MessagingException;

192

}

193

194

// Annotation for mapping messages to handler methods

195

@Target({ElementType.TYPE, ElementType.METHOD})

196

@Retention(RetentionPolicy.RUNTIME)

197

@Documented

198

public @interface MessageMapping {

199

200

@AliasFor("value")

201

String[] destination() default {};

202

203

@AliasFor("destination")

204

String[] value() default {};

205

}

206

207

// Annotation to bind method parameter to message payload

208

@Target(ElementType.PARAMETER)

209

@Retention(RetentionPolicy.RUNTIME)

210

@Documented

211

public @interface Payload {

212

213

String value() default "";

214

215

boolean required() default true;

216

}

217

218

// Annotation to bind method parameter to header value

219

@Target(ElementType.PARAMETER)

220

@Retention(RetentionPolicy.RUNTIME)

221

@Documented

222

public @interface Header {

223

224

@AliasFor("name")

225

String value() default "";

226

227

@AliasFor("value")

228

String name() default "";

229

230

boolean required() default true;

231

232

String defaultValue() default ValueConstants.DEFAULT_NONE;

233

}

234

235

// Annotation to bind method parameter to all message headers

236

@Target(ElementType.PARAMETER)

237

@Retention(RetentionPolicy.RUNTIME)

238

@Documented

239

public @interface Headers {

240

}

241

```

242

243

## JMS Support

244

245

### JmsTemplate

246

247

```java { .api }

248

// Central class for JMS operations

249

public class JmsTemplate extends JmsDestinationAccessor implements JmsOperations {

250

251

// Send operations

252

public void send(String destinationName, MessageCreator messageCreator) throws JmsException;

253

public void send(Destination destination, MessageCreator messageCreator) throws JmsException;

254

public void send(MessageCreator messageCreator) throws JmsException;

255

256

// Convert and send operations

257

public void convertAndSend(String destinationName, Object message) throws JmsException;

258

public void convertAndSend(Destination destination, Object message) throws JmsException;

259

public void convertAndSend(Object message) throws JmsException;

260

public void convertAndSend(String destinationName, Object message, MessagePostProcessor postProcessor) throws JmsException;

261

262

// Receive operations

263

public Message receive() throws JmsException;

264

public Message receive(String destinationName) throws JmsException;

265

public Message receive(Destination destination) throws JmsException;

266

267

// Receive and convert operations

268

public Object receiveAndConvert() throws JmsException;

269

public Object receiveAndConvert(String destinationName) throws JmsException;

270

public Object receiveAndConvert(Destination destination) throws JmsException;

271

272

// Browse operations

273

public <T> T browse(String queueName, BrowserCallback<T> action) throws JmsException;

274

public <T> T browse(Queue queue, BrowserCallback<T> action) throws JmsException;

275

276

// Execute operations

277

public <T> T execute(SessionCallback<T> action) throws JmsException;

278

public <T> T execute(ProducerCallback<T> action) throws JmsException;

279

280

// Configuration

281

public void setConnectionFactory(ConnectionFactory connectionFactory);

282

public void setDefaultDestinationName(String defaultDestinationName);

283

public void setDefaultDestination(Destination defaultDestination);

284

public void setMessageConverter(MessageConverter messageConverter);

285

public void setPubSubDomain(boolean pubSubDomain);

286

public void setReceiveTimeout(long receiveTimeout);

287

public void setDeliveryMode(int deliveryMode);

288

public void setPriority(int priority);

289

public void setTimeToLive(long timeToLive);

290

}

291

292

// Interface for creating JMS messages

293

@FunctionalInterface

294

public interface MessageCreator {

295

Message createMessage(Session session) throws JMSException;

296

}

297

298

// Interface for post-processing messages

299

@FunctionalInterface

300

public interface MessagePostProcessor {

301

Message postProcessMessage(Message message) throws JMSException;

302

}

303

304

// Callback interface for JMS Session operations

305

@FunctionalInterface

306

public interface SessionCallback<T> {

307

T doInJms(Session session) throws JMSException;

308

}

309

```

310

311

### JMS Annotations

312

313

```java { .api }

314

// Annotation to mark a method as a JMS message listener

315

@Target({ElementType.TYPE, ElementType.METHOD})

316

@Retention(RetentionPolicy.RUNTIME)

317

@Documented

318

@Repeatable(JmsListeners.class)

319

public @interface JmsListener {

320

321

String id() default "";

322

323

String containerFactory() default "";

324

325

@AliasFor("destination")

326

String[] value() default {};

327

328

@AliasFor("value")

329

String[] destination() default {};

330

331

String subscription() default "";

332

333

String selector() default "";

334

335

String concurrency() default "";

336

}

337

338

// Enable JMS listener annotated endpoints

339

@Target(ElementType.TYPE)

340

@Retention(RetentionPolicy.RUNTIME)

341

@Documented

342

@Import(JmsBootstrapConfiguration.class)

343

public @interface EnableJms {

344

}

345

346

// Annotation to send a message as reply

347

@Target({ElementType.TYPE, ElementType.METHOD})

348

@Retention(RetentionPolicy.RUNTIME)

349

@Documented

350

public @interface SendTo {

351

352

@AliasFor("destinations")

353

String[] value() default {};

354

355

@AliasFor("value")

356

String[] destinations() default {};

357

}

358

```

359

360

### JMS Configuration

361

362

```java { .api }

363

// Factory for creating JMS listener containers

364

public interface JmsListenerContainerFactory<C extends MessageListenerContainer> {

365

366

C createListenerContainer(JmsListenerEndpoint endpoint);

367

}

368

369

// Default implementation of JmsListenerContainerFactory

370

public class DefaultJmsListenerContainerFactory

371

implements JmsListenerContainerFactory<DefaultMessageListenerContainer>, BeanNameAware, InitializingBean {

372

373

public void setConnectionFactory(ConnectionFactory connectionFactory);

374

public void setDestinationResolver(DestinationResolver destinationResolver);

375

public void setMessageConverter(MessageConverter messageConverter);

376

public void setPubSubDomain(Boolean pubSubDomain);

377

public void setSubscriptionDurable(Boolean subscriptionDurable);

378

public void setClientId(String clientId);

379

public void setConcurrency(String concurrency);

380

public void setMaxConcurrency(Integer maxConcurrency);

381

public void setCacheLevel(Integer cacheLevel);

382

public void setReceiveTimeout(Long receiveTimeout);

383

public void setAutoStartup(Boolean autoStartup);

384

public void setPhase(Integer phase);

385

386

@Override

387

public DefaultMessageListenerContainer createListenerContainer(JmsListenerEndpoint endpoint);

388

}

389

390

// Message listener container for JMS

391

public class DefaultMessageListenerContainer extends AbstractPollingMessageListenerContainer

392

implements BeanNameAware, DisposableBean {

393

394

// Concurrency settings

395

public void setConcurrentConsumers(int concurrentConsumers);

396

public void setMaxConcurrentConsumers(int maxConcurrentConsumers);

397

public void setMaxMessagesPerTask(int maxMessagesPerTask);

398

399

// Connection settings

400

public void setCacheLevelName(String constantName) throws IllegalArgumentException;

401

public void setCacheLevel(int cacheLevel);

402

403

// Recovery settings

404

public void setRecoveryInterval(long recoveryInterval);

405

public void setBackOffMultiplier(double backOffMultiplier);

406

public void setMaxRecoveryTime(long maxRecoveryTime);

407

408

// Transaction settings

409

public void setTransactionManager(PlatformTransactionManager transactionManager);

410

public void setTransactionName(String transactionName);

411

public void setTransactionTimeout(int transactionTimeout);

412

}

413

```

414

415

### Message Conversion

416

417

```java { .api }

418

// Strategy interface for converting between Java objects and JMS messages

419

public interface MessageConverter {

420

421

Message toMessage(Object object, Session session) throws JMSException, MessageConversionException;

422

423

Object fromMessage(Message message) throws JMSException, MessageConversionException;

424

}

425

426

// Base implementation of MessageConverter

427

public abstract class MessageConverterSupport implements MessageConverter {

428

429

public static final String TYPE_ID_PROPERTY = "__TypeId__";

430

public static final String CONTENT_TYPE_PROPERTY = "__ContentTypeId__";

431

public static final String KEY_TYPE_ID_PROPERTY = "__KeyTypeId__";

432

433

protected abstract Message createMessageForByteArray(byte[] bytes, Session session) throws JMSException;

434

protected abstract Message createMessageForString(String string, Session session) throws JMSException;

435

protected abstract Message createMessageForMap(Map<String, Object> map, Session session) throws JMSException;

436

protected abstract Message createMessageForSerializable(Serializable object, Session session) throws JMSException;

437

438

protected abstract byte[] extractByteArrayFromMessage(Message message) throws JMSException;

439

protected abstract String extractStringFromMessage(Message message) throws JMSException;

440

protected abstract Map<String, Object> extractMapFromMessage(Message message) throws JMSException;

441

protected abstract Serializable extractSerializableFromMessage(Message message) throws JMSException;

442

}

443

444

// JSON message converter using Jackson

445

public class MappingJackson2MessageConverter extends MessageConverterSupport {

446

447

public MappingJackson2MessageConverter();

448

449

public void setObjectMapper(ObjectMapper objectMapper);

450

public void setTargetType(MessageType targetType);

451

public void setTypeIdPropertyName(String typeIdPropertyName);

452

public void setTypeIdMappings(Map<String, Class<?>> typeIdMappings);

453

454

// Message type enum

455

public enum MessageType {

456

BYTES, TEXT

457

}

458

}

459

```

460

461

## Practical Usage Examples

462

463

### Basic JMS Configuration

464

465

```java { .api }

466

@Configuration

467

@EnableJms

468

public class JmsConfig {

469

470

@Bean

471

public ConnectionFactory connectionFactory() {

472

// Using ActiveMQ

473

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();

474

connectionFactory.setBrokerURL("tcp://localhost:61616");

475

connectionFactory.setUserName("admin");

476

connectionFactory.setPassword("admin");

477

478

// Wrap with caching connection factory for better performance

479

CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(connectionFactory);

480

cachingConnectionFactory.setCacheConsumers(true);

481

cachingConnectionFactory.setCacheProducers(true);

482

cachingConnectionFactory.setSessionCacheSize(10);

483

484

return cachingConnectionFactory;

485

}

486

487

@Bean

488

public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) {

489

JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);

490

jmsTemplate.setMessageConverter(messageConverter());

491

jmsTemplate.setReceiveTimeout(5000); // 5 seconds

492

jmsTemplate.setDeliveryMode(DeliveryMode.PERSISTENT);

493

jmsTemplate.setPriority(Message.DEFAULT_PRIORITY);

494

jmsTemplate.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);

495

return jmsTemplate;

496

}

497

498

@Bean

499

public MessageConverter messageConverter() {

500

MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();

501

converter.setTargetType(MessageType.TEXT);

502

converter.setTypeIdPropertyName("_type");

503

504

// Configure type mappings

505

Map<String, Class<?>> typeIdMappings = new HashMap<>();

506

typeIdMappings.put("user", UserMessage.class);

507

typeIdMappings.put("order", OrderMessage.class);

508

converter.setTypeIdMappings(typeIdMappings);

509

510

return converter;

511

}

512

513

@Bean

514

public JmsListenerContainerFactory<?> jmsListenerContainerFactory(ConnectionFactory connectionFactory) {

515

DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();

516

factory.setConnectionFactory(connectionFactory);

517

factory.setMessageConverter(messageConverter());

518

factory.setConcurrency("3-10"); // Min 3, max 10 consumers

519

factory.setReceiveTimeout(5000L);

520

factory.setAutoStartup(true);

521

522

// Error handling

523

factory.setErrorHandler(t -> {

524

System.err.println("Error in JMS listener: " + t.getMessage());

525

// Log error or send to dead letter queue

526

});

527

528

return factory;

529

}

530

531

// Topic listener factory for pub/sub

532

@Bean

533

public JmsListenerContainerFactory<?> topicListenerFactory(ConnectionFactory connectionFactory) {

534

DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();

535

factory.setConnectionFactory(connectionFactory);

536

factory.setMessageConverter(messageConverter());

537

factory.setPubSubDomain(true); // Enable topic mode

538

factory.setSubscriptionDurable(true);

539

factory.setClientId("myapp-client");

540

return factory;

541

}

542

543

// Dead letter queue configuration

544

@Bean

545

public Queue deadLetterQueue() {

546

return new ActiveMQQueue("DLQ.MyApp");

547

}

548

}

549

```

550

551

### Message Producer Service

552

553

```java { .api }

554

@Service

555

public class MessageProducerService {

556

557

private final JmsTemplate jmsTemplate;

558

559

public MessageProducerService(JmsTemplate jmsTemplate) {

560

this.jmsTemplate = jmsTemplate;

561

}

562

563

// Simple message sending

564

public void sendMessage(String destination, Object message) {

565

jmsTemplate.convertAndSend(destination, message);

566

}

567

568

// Send message with headers

569

public void sendMessageWithHeaders(String destination, Object message, Map<String, Object> headers) {

570

jmsTemplate.convertAndSend(destination, message, messagePostProcessor -> {

571

headers.forEach((key, value) -> {

572

try {

573

messagePostProcessor.setObjectProperty(key, value);

574

} catch (JMSException e) {

575

throw new RuntimeException("Failed to set header: " + key, e);

576

}

577

});

578

return messagePostProcessor;

579

});

580

}

581

582

// Send user notification

583

public void sendUserNotification(Long userId, String notificationType, String content) {

584

UserNotificationMessage notification = UserNotificationMessage.builder()

585

.userId(userId)

586

.type(notificationType)

587

.content(content)

588

.timestamp(LocalDateTime.now())

589

.build();

590

591

Map<String, Object> headers = new HashMap<>();

592

headers.put("userId", userId);

593

headers.put("notificationType", notificationType);

594

headers.put("priority", "HIGH");

595

596

sendMessageWithHeaders("user.notifications", notification, headers);

597

}

598

599

// Send order events

600

public void sendOrderCreatedEvent(Order order) {

601

OrderCreatedEvent event = OrderCreatedEvent.builder()

602

.orderId(order.getId())

603

.userId(order.getUserId())

604

.totalAmount(order.getTotalAmount())

605

.items(order.getItems())

606

.timestamp(LocalDateTime.now())

607

.build();

608

609

// Send to multiple destinations

610

jmsTemplate.convertAndSend("order.created", event);

611

jmsTemplate.convertAndSend("audit.events", event);

612

613

// Send to topic for pub/sub

614

jmsTemplate.setPubSubDomain(true);

615

jmsTemplate.convertAndSend("order.events", event);

616

jmsTemplate.setPubSubDomain(false); // Reset to queue mode

617

}

618

619

// Request-reply pattern

620

public String sendRequestReply(String destination, Object request) {

621

return (String) jmsTemplate.sendAndReceive(destination, session -> {

622

ObjectMessage message = session.createObjectMessage((Serializable) request);

623

message.setJMSReplyTo(session.createTemporaryQueue());

624

return message;

625

});

626

}

627

628

// Delayed message sending

629

public void sendDelayedMessage(String destination, Object message, Duration delay) {

630

jmsTemplate.convertAndSend(destination, message, messagePostProcessor -> {

631

try {

632

// ActiveMQ specific - set delivery delay

633

messagePostProcessor.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,

634

delay.toMillis());

635

} catch (JMSException e) {

636

throw new RuntimeException("Failed to set delay", e);

637

}

638

return messagePostProcessor;

639

});

640

}

641

642

// Priority message sending

643

public void sendPriorityMessage(String destination, Object message, int priority) {

644

jmsTemplate.convertAndSend(destination, message, messagePostProcessor -> {

645

try {

646

messagePostProcessor.setJMSPriority(priority);

647

} catch (JMSException e) {

648

throw new RuntimeException("Failed to set priority", e);

649

}

650

return messagePostProcessor;

651

});

652

}

653

654

// Transactional message sending

655

@Transactional

656

public void sendTransactionalMessages(List<MessageRequest> requests) {

657

for (MessageRequest request : requests) {

658

jmsTemplate.convertAndSend(request.getDestination(), request.getPayload());

659

}

660

661

// If any exception occurs, all messages will be rolled back

662

if (requests.size() > 10) {

663

throw new IllegalArgumentException("Too many messages in single transaction");

664

}

665

}

666

}

667

```

668

669

### Message Listener Service

670

671

```java { .api }

672

@Component

673

public class MessageListenerService {

674

675

private static final Logger logger = LoggerFactory.getLogger(MessageListenerService.class);

676

677

private final UserService userService;

678

private final EmailService emailService;

679

private final AuditService auditService;

680

681

public MessageListenerService(UserService userService, EmailService emailService, AuditService auditService) {

682

this.userService = userService;

683

this.emailService = emailService;

684

this.auditService = auditService;

685

}

686

687

// Simple message listener

688

@JmsListener(destination = "user.notifications")

689

public void handleUserNotification(UserNotificationMessage notification) {

690

logger.info("Received user notification: {}", notification);

691

692

try {

693

User user = userService.findById(notification.getUserId());

694

emailService.sendNotificationEmail(user, notification.getContent());

695

} catch (Exception e) {

696

logger.error("Failed to process user notification", e);

697

throw new RuntimeException("Notification processing failed", e);

698

}

699

}

700

701

// Listener with message headers

702

@JmsListener(destination = "order.created")

703

public void handleOrderCreated(

704

@Payload OrderCreatedEvent event,

705

@Header("userId") Long userId,

706

@Header(value = "priority", defaultValue = "NORMAL") String priority) {

707

708

logger.info("Processing order created event for user {}: {}", userId, event);

709

710

// Process based on priority

711

if ("HIGH".equals(priority)) {

712

processHighPriorityOrder(event);

713

} else {

714

processNormalOrder(event);

715

}

716

}

717

718

// Listener with all headers

719

@JmsListener(destination = "audit.events")

720

public void handleAuditEvent(

721

@Payload Object event,

722

@Headers Map<String, Object> headers) {

723

724

AuditRecord audit = AuditRecord.builder()

725

.eventType(event.getClass().getSimpleName())

726

.payload(event)

727

.headers(headers)

728

.timestamp(LocalDateTime.now())

729

.build();

730

731

auditService.saveAuditRecord(audit);

732

}

733

734

// Topic listener with durable subscription

735

@JmsListener(

736

destination = "order.events",

737

containerFactory = "topicListenerFactory",

738

subscription = "orderEventSubscription"

739

)

740

public void handleOrderEvents(OrderEvent event) {

741

logger.info("Received order event: {}", event);

742

743

switch (event.getEventType()) {

744

case "ORDER_CREATED":

745

handleOrderCreatedFromTopic((OrderCreatedEvent) event);

746

break;

747

case "ORDER_UPDATED":

748

handleOrderUpdated((OrderUpdatedEvent) event);

749

break;

750

case "ORDER_CANCELLED":

751

handleOrderCancelled((OrderCancelledEvent) event);

752

break;

753

default:

754

logger.warn("Unknown order event type: {}", event.getEventType());

755

}

756

}

757

758

// Listener with message selector

759

@JmsListener(

760

destination = "notifications",

761

selector = "notificationType = 'EMAIL' OR priority = 'HIGH'"

762

)

763

public void handleHighPriorityNotifications(NotificationMessage notification) {

764

logger.info("Processing high priority notification: {}", notification);

765

// Process urgent notifications immediately

766

processUrgentNotification(notification);

767

}

768

769

// Listener with concurrency control

770

@JmsListener(

771

destination = "file.processing",

772

concurrency = "2-5" // Min 2, max 5 concurrent listeners

773

)

774

public void handleFileProcessing(FileProcessingRequest request) {

775

logger.info("Processing file: {}", request.getFilename());

776

777

try {

778

// Simulate file processing

779

Thread.sleep(5000);

780

processFile(request);

781

logger.info("File processing completed: {}", request.getFilename());

782

} catch (InterruptedException e) {

783

Thread.currentThread().interrupt();

784

throw new RuntimeException("File processing interrupted", e);

785

}

786

}

787

788

// Error handling listener

789

@JmsListener(destination = "error.queue")

790

public void handleErrors(

791

Message failedMessage,

792

@Header(JmsHeaders.DELIVERY_COUNT) int deliveryCount) {

793

794

logger.error("Processing failed message (attempt {}): {}", deliveryCount, failedMessage);

795

796

if (deliveryCount >= 3) {

797

// Move to dead letter queue after 3 attempts

798

sendToDeadLetterQueue(failedMessage);

799

} else {

800

// Retry processing

801

retryProcessing(failedMessage);

802

}

803

}

804

805

// Reply listener for request-response pattern

806

@JmsListener(destination = "user.lookup.request")

807

@SendTo("user.lookup.response")

808

public UserResponse handleUserLookupRequest(UserLookupRequest request) {

809

logger.info("Looking up user: {}", request.getUserId());

810

811

User user = userService.findById(request.getUserId());

812

813

return UserResponse.builder()

814

.userId(user.getId())

815

.username(user.getUsername())

816

.email(user.getEmail())

817

.found(true)

818

.build();

819

}

820

821

// Conditional listener (only active in certain profiles)

822

@JmsListener(

823

destination = "development.debug",

824

condition = "#{environment.acceptsProfiles('development')}"

825

)

826

public void handleDebugMessages(DebugMessage message) {

827

logger.debug("Debug message: {}", message);

828

// Only process in development environment

829

}

830

831

private void processHighPriorityOrder(OrderCreatedEvent event) {

832

// Expedited processing for VIP customers

833

logger.info("Processing high priority order: {}", event.getOrderId());

834

}

835

836

private void processNormalOrder(OrderCreatedEvent event) {

837

// Standard order processing

838

logger.info("Processing normal order: {}", event.getOrderId());

839

}

840

841

private void handleOrderCreatedFromTopic(OrderCreatedEvent event) {

842

// Update inventory, send confirmation email, etc.

843

logger.info("Handling order created from topic: {}", event.getOrderId());

844

}

845

846

private void handleOrderUpdated(OrderUpdatedEvent event) {

847

logger.info("Handling order updated: {}", event.getOrderId());

848

}

849

850

private void handleOrderCancelled(OrderCancelledEvent event) {

851

logger.info("Handling order cancelled: {}", event.getOrderId());

852

}

853

854

private void processUrgentNotification(NotificationMessage notification) {

855

// Immediate processing for urgent notifications

856

logger.info("Processing urgent notification: {}", notification.getId());

857

}

858

859

private void processFile(FileProcessingRequest request) {

860

// File processing logic

861

logger.info("Processing file: {}", request.getFilename());

862

}

863

864

private void sendToDeadLetterQueue(Message message) {

865

// Send to DLQ for manual inspection

866

logger.error("Sending message to dead letter queue: {}", message);

867

}

868

869

private void retryProcessing(Message message) {

870

// Retry logic

871

logger.info("Retrying message processing: {}", message);

872

}

873

}

874

```

875

876

### Advanced JMS Features

877

878

```java { .api }

879

// Custom message converter

880

@Component

881

public class CustomMessageConverter implements MessageConverter {

882

883

private final ObjectMapper objectMapper;

884

885

public CustomMessageConverter(ObjectMapper objectMapper) {

886

this.objectMapper = objectMapper;

887

}

888

889

@Override

890

public Message toMessage(Object object, Session session) throws JMSException, MessageConversionException {

891

try {

892

if (object instanceof String) {

893

return session.createTextMessage((String) object);

894

} else if (object instanceof byte[]) {

895

BytesMessage message = session.createBytesMessage();

896

message.writeBytes((byte[]) object);

897

return message;

898

} else {

899

// Convert to JSON

900

String json = objectMapper.writeValueAsString(object);

901

TextMessage message = session.createTextMessage(json);

902

message.setStringProperty("_type", object.getClass().getName());

903

return message;

904

}

905

} catch (Exception e) {

906

throw new MessageConversionException("Failed to convert message", e);

907

}

908

}

909

910

@Override

911

public Object fromMessage(Message message) throws JMSException, MessageConversionException {

912

try {

913

if (message instanceof TextMessage) {

914

TextMessage textMessage = (TextMessage) message;

915

String text = textMessage.getText();

916

917

// Check if it has type information

918

String typeProperty = message.getStringProperty("_type");

919

if (typeProperty != null) {

920

Class<?> targetClass = Class.forName(typeProperty);

921

return objectMapper.readValue(text, targetClass);

922

} else {

923

return text;

924

}

925

} else if (message instanceof BytesMessage) {

926

BytesMessage bytesMessage = (BytesMessage) message;

927

long bodyLength = bytesMessage.getBodyLength();

928

byte[] bytes = new byte[(int) bodyLength];

929

bytesMessage.readBytes(bytes);

930

return bytes;

931

} else {

932

throw new MessageConversionException("Unsupported message type: " + message.getClass());

933

}

934

} catch (Exception e) {

935

throw new MessageConversionException("Failed to convert message", e);

936

}

937

}

938

}

939

940

// JMS transaction manager configuration

941

@Configuration

942

public class JmsTransactionConfig {

943

944

@Bean

945

public PlatformTransactionManager jmsTransactionManager(ConnectionFactory connectionFactory) {

946

return new JmsTransactionManager(connectionFactory);

947

}

948

949

@Bean

950

public JmsTemplate transactionalJmsTemplate(ConnectionFactory connectionFactory) {

951

JmsTemplate template = new JmsTemplate(connectionFactory);

952

template.setSessionTransacted(true);

953

return template;

954

}

955

}

956

957

// Message-driven service with transactions

958

@Service

959

@Transactional

960

public class TransactionalMessageService {

961

962

private final JmsTemplate jmsTemplate;

963

private final UserRepository userRepository;

964

965

public TransactionalMessageService(JmsTemplate jmsTemplate, UserRepository userRepository) {

966

this.jmsTemplate = jmsTemplate;

967

this.userRepository = userRepository;

968

}

969

970

@JmsListener(destination = "user.updates")

971

@Transactional

972

public void handleUserUpdate(UserUpdateMessage message) {

973

// Update user in database

974

User user = userRepository.findById(message.getUserId())

975

.orElseThrow(() -> new UserNotFoundException("User not found: " + message.getUserId()));

976

977

user.setEmail(message.getNewEmail());

978

userRepository.save(user);

979

980

// Send confirmation message - both operations in same transaction

981

jmsTemplate.convertAndSend("user.update.confirmation",

982

UserUpdateConfirmation.builder()

983

.userId(message.getUserId())

984

.status("SUCCESS")

985

.timestamp(LocalDateTime.now())

986

.build());

987

}

988

989

@Transactional(rollbackFor = Exception.class)

990

public void processOrderWithRollback(OrderProcessingRequest request) {

991

try {

992

// Process order

993

processOrder(request);

994

995

// Send success notification

996

jmsTemplate.convertAndSend("order.processed",

997

OrderProcessedEvent.builder()

998

.orderId(request.getOrderId())

999

.status("PROCESSED")

1000

.build());

1001

1002

} catch (Exception e) {

1003

// Transaction will be rolled back, JMS message won't be sent

1004

throw new OrderProcessingException("Failed to process order", e);

1005

}

1006

}

1007

1008

private void processOrder(OrderProcessingRequest request) {

1009

// Order processing logic

1010

}

1011

}

1012

1013

// Message listener with retry and dead letter handling

1014

@Component

1015

public class RobustMessageListener {

1016

1017

private static final Logger logger = LoggerFactory.getLogger(RobustMessageListener.class);

1018

1019

@Retryable(

1020

value = {ProcessingException.class},

1021

maxAttempts = 3,

1022

backoff = @Backoff(delay = 1000, multiplier = 2)

1023

)

1024

@JmsListener(destination = "robust.processing")

1025

public void handleMessage(ProcessingMessage message) {

1026

logger.info("Processing message: {}", message.getId());

1027

1028

try {

1029

processMessage(message);

1030

} catch (ProcessingException e) {

1031

logger.warn("Processing failed for message {}, will retry", message.getId());

1032

throw e; // Will trigger retry

1033

} catch (Exception e) {

1034

logger.error("Unexpected error processing message {}", message.getId(), e);

1035

handleProcessingError(message, e);

1036

}

1037

}

1038

1039

@Recover

1040

public void recover(ProcessingException ex, ProcessingMessage message) {

1041

logger.error("All retry attempts exhausted for message {}", message.getId());

1042

sendToDeadLetterQueue(message, ex);

1043

}

1044

1045

private void processMessage(ProcessingMessage message) {

1046

// Message processing logic that might fail

1047

if (message.isCorrupted()) {

1048

throw new ProcessingException("Message is corrupted");

1049

}

1050

1051

// Actual processing...

1052

}

1053

1054

private void handleProcessingError(ProcessingMessage message, Exception e) {

1055

// Send to error queue for analysis

1056

ErrorMessage errorMessage = ErrorMessage.builder()

1057

.originalMessage(message)

1058

.errorMessage(e.getMessage())

1059

.timestamp(LocalDateTime.now())

1060

.build();

1061

1062

jmsTemplate.convertAndSend("error.analysis", errorMessage);

1063

}

1064

1065

private void sendToDeadLetterQueue(ProcessingMessage message, Exception e) {

1066

DeadLetterMessage dlqMessage = DeadLetterMessage.builder()

1067

.originalMessage(message)

1068

.failureReason(e.getMessage())

1069

.maxAttemptsReached(true)

1070

.timestamp(LocalDateTime.now())

1071

.build();

1072

1073

jmsTemplate.convertAndSend("dlq.processing", dlqMessage);

1074

}

1075

}

1076

```

1077

1078

### Integration with Spring Integration

1079

1080

```java { .api }

1081

// Spring Integration configuration for JMS

1082

@Configuration

1083

@EnableIntegration

1084

public class JmsIntegrationConfig {

1085

1086

@Bean

1087

public IntegrationFlow jmsInboundFlow(ConnectionFactory connectionFactory) {

1088

return IntegrationFlows

1089

.from(Jms.messageDrivenChannelAdapter(connectionFactory)

1090

.destination("integration.inbound")

1091

.configureListenerContainer(c -> c.sessionTransacted(true)))

1092

.transform(Transformers.objectToString())

1093

.filter("payload.contains('PRIORITY')")

1094

.channel("priorityChannel")

1095

.get();

1096

}

1097

1098

@Bean

1099

public IntegrationFlow jmsOutboundFlow(ConnectionFactory connectionFactory) {

1100

return IntegrationFlows

1101

.from("outboundChannel")

1102

.transform(Transformers.toJson())

1103

.handle(Jms.outboundAdapter(connectionFactory)

1104

.destination("integration.outbound"))

1105

.get();

1106

}

1107

1108

@Bean

1109

public MessageChannel priorityChannel() {

1110

return MessageChannels.queue().get();

1111

}

1112

1113

@Bean

1114

public MessageChannel outboundChannel() {

1115

return MessageChannels.direct().get();

1116

}

1117

1118

@ServiceActivator(inputChannel = "priorityChannel")

1119

public void handlePriorityMessage(String message) {

1120

logger.info("Handling priority message: {}", message);

1121

// Process priority messages

1122

}

1123

}

1124

```

1125

1126

### Message Testing

1127

1128

```java { .api }

1129

@SpringBootTest

1130

@DirtiesContext

1131

class JmsMessageTest {

1132

1133

@Autowired

1134

private JmsTemplate jmsTemplate;

1135

1136

@Autowired

1137

private MessageListenerService messageListener;

1138

1139

@Test

1140

@Transactional

1141

@Rollback

1142

void shouldSendAndReceiveMessage() {

1143

// Given

1144

UserNotificationMessage notification = UserNotificationMessage.builder()

1145

.userId(1L)

1146

.type("WELCOME")

1147

.content("Welcome to our platform!")

1148

.timestamp(LocalDateTime.now())

1149

.build();

1150

1151

// When

1152

jmsTemplate.convertAndSend("test.notifications", notification);

1153

1154

// Then

1155

UserNotificationMessage received = (UserNotificationMessage)

1156

jmsTemplate.receiveAndConvert("test.notifications");

1157

1158

assertThat(received).isNotNull();

1159

assertThat(received.getUserId()).isEqualTo(1L);

1160

assertThat(received.getType()).isEqualTo("WELCOME");

1161

}

1162

1163

@Test

1164

void shouldProcessMessageThroughListener() {

1165

// Given

1166

OrderCreatedEvent event = OrderCreatedEvent.builder()

1167

.orderId(123L)

1168

.userId(1L)

1169

.totalAmount(BigDecimal.valueOf(99.99))

1170

.build();

1171

1172

// When

1173

messageListener.handleOrderCreated(event, 1L, "HIGH");

1174

1175

// Then - verify the processing was successful

1176

// (This would typically involve verifying database changes or other side effects)

1177

}

1178

}

1179

1180

// Integration test with embedded broker

1181

@SpringBootTest

1182

@TestPropertySource(properties = {

1183

"spring.activemq.broker-url=vm://localhost?broker.persistent=false",

1184

"spring.jms.cache.enabled=false"

1185

})

1186

class JmsIntegrationTest {

1187

1188

@Autowired

1189

private MessageProducerService producerService;

1190

1191

@MockBean

1192

private EmailService emailService;

1193

1194

@Test

1195

void shouldProcessNotificationEndToEnd() throws InterruptedException {

1196

// Given

1197

Long userId = 1L;

1198

String content = "Test notification";

1199

1200

// When

1201

producerService.sendUserNotification(userId, "EMAIL", content);

1202

1203

// Wait for async processing

1204

Thread.sleep(1000);

1205

1206

// Then

1207

verify(emailService, timeout(5000)).sendNotificationEmail(any(User.class), eq(content));

1208

}

1209

}

1210

```

1211

1212

Spring Messaging and JMS support provide a robust foundation for building message-driven applications with support for both point-to-point and publish-subscribe messaging patterns, transaction management, and error handling.