0
# Listener Configuration
1
2
Configuration and usage of the `@RabbitListener` annotation for creating message consumers, covering the available listener container types and advanced configuration options.
3
4
## Basic Listener Configuration
5
6
### Simple Queue Listeners
7
8
```java { .api }
9
@Component
10
public class MessageListeners {
11
12
@RabbitListener(queues = "user.registration")
13
public void handleUserRegistration(UserRegistrationEvent event) {
14
// Process user registration
15
}
16
17
@RabbitListener(queues = {"orders.created", "orders.updated"})
18
public void handleOrderEvents(OrderEvent event) {
19
// Process order events from multiple queues
20
}
21
22
@RabbitListener(queues = "#{userQueue.name}")
23
public void handleDynamicQueue(String message) {
24
// Use SpEL to reference queue bean
25
}
26
}
27
```
28
29
### Queue Binding with Exchanges
30
31
```java { .api }
32
@Component
33
public class AdvancedListeners {
34
35
@RabbitListener(bindings = @QueueBinding(
36
value = @Queue(value = "order.processed", durable = "true"),
37
exchange = @Exchange(value = "orders", type = ExchangeTypes.TOPIC),
38
key = "order.processed.*"
39
))
40
public void handleOrderProcessed(OrderProcessedEvent event) {
41
// Process order events with topic binding
42
}
43
44
@RabbitListener(bindings = @QueueBinding(
45
value = @Queue(value = "user.notifications",
46
arguments = @Argument(name = "x-message-ttl", value = "60000", type = "java.lang.Integer")),
47
exchange = @Exchange(value = "notifications", type = ExchangeTypes.DIRECT),
48
key = "user.#{authentication.name}"
49
))
50
public void handleUserNotification(NotificationEvent event) {
51
// Handle user-specific notifications with TTL
52
}
53
}
54
```
55
56
## Container Configuration
57
58
### Simple Container Configuration
59
60
```java { .api }
61
@RabbitListener(
62
queues = "simple.queue",
63
containerFactory = "simpleRabbitListenerContainerFactory"
64
)
65
public void handleWithSimpleContainer(String message) {
66
// Processed by SimpleMessageListenerContainer
67
}
68
69
// Container Factory Configurer
70
public class SimpleRabbitListenerContainerFactoryConfigurer {
71
public void configure(SimpleRabbitListenerContainerFactory factory, ConnectionFactory connectionFactory) {
72
// Configures factory with default properties and connection factory
73
}
74
}
75
76
@Configuration
77
public class SimpleContainerConfig {
78
79
@Bean
80
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(
81
ConnectionFactory connectionFactory,
82
SimpleRabbitListenerContainerFactoryConfigurer configurer) {
83
84
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
85
configurer.configure(factory, connectionFactory);
86
87
factory.setConcurrentConsumers(2);
88
factory.setMaxConcurrentConsumers(10);
89
factory.setPrefetchCount(5);
90
factory.setTxSize(3);
91
92
return factory;
93
}
94
}
95
```
96
97
### Direct Container Configuration
98
99
```java { .api }
100
@RabbitListener(
101
queues = "direct.queue",
102
containerFactory = "directRabbitListenerContainerFactory"
103
)
104
public void handleWithDirectContainer(String message) {
105
// Processed by DirectMessageListenerContainer
106
}
107
108
// Container Factory Configurer
109
public class DirectRabbitListenerContainerFactoryConfigurer {
110
public void configure(DirectRabbitListenerContainerFactory factory, ConnectionFactory connectionFactory) {
111
// Configures factory with default properties and connection factory
112
}
113
}
114
115
@Configuration
116
public class DirectContainerConfig {
117
118
@Bean
119
public DirectRabbitListenerContainerFactory directRabbitListenerContainerFactory(
120
ConnectionFactory connectionFactory,
121
DirectRabbitListenerContainerFactoryConfigurer configurer) {
122
123
DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory();
124
configurer.configure(factory, connectionFactory);
125
126
factory.setConsumersPerQueue(3);
127
factory.setPrefetchCount(10);
128
129
return factory;
130
}
131
}
132
```
133
134
## Message Handling Patterns
135
136
### Message Parameter Types
137
138
```java { .api }
139
@Component
140
public class MessageHandlers {
141
142
// Raw message
143
@RabbitListener(queues = "raw.queue")
144
public void handleRawMessage(Message message) {
145
byte[] body = message.getBody();
146
MessageProperties properties = message.getMessageProperties();
147
}
148
149
// String message
150
@RabbitListener(queues = "text.queue")
151
public void handleTextMessage(String message) {
152
// Handle text message
153
}
154
155
// JSON object
156
@RabbitListener(queues = "json.queue")
157
public void handleJsonMessage(UserEvent event) {
158
// Automatically deserialized from JSON
159
}
160
161
// With headers
162
@RabbitListener(queues = "headers.queue")
163
public void handleWithHeaders(UserEvent event, @Header("userId") String userId) {
164
// Access specific header
165
}
166
167
// With all headers
168
@RabbitListener(queues = "all-headers.queue")
169
public void handleWithAllHeaders(UserEvent event, @Header Map<String, Object> headers) {
170
// Access all headers
171
}
172
173
// With message properties
174
@RabbitListener(queues = "properties.queue")
175
public void handleWithProperties(UserEvent event, MessageProperties properties) {
176
String correlationId = properties.getCorrelationId();
177
Date timestamp = properties.getTimestamp();
178
}
179
}
180
```
181
182
### Return Values and Replies
183
184
```java { .api }
185
@Component
186
public class ReplyHandlers {
187
188
// Return reply message
189
@RabbitListener(queues = "request.queue")
190
public String handleRequest(String request) {
191
return "Processed: " + request;
192
}
193
194
// Return complex object
195
@RabbitListener(queues = "user.lookup")
196
public UserProfile lookupUser(String userId) {
197
return userService.findById(userId);
198
}
199
200
// Specify reply exchange and routing key
201
@RabbitListener(queues = "orders.process")
202
@SendTo("results.exchange/order.result")
203
public OrderResult processOrder(Order order) {
204
return orderService.process(order);
205
}
206
207
// Dynamic reply destination
208
@RabbitListener(queues = "dynamic.request")
209
@SendTo("#{@replyDestinationResolver.resolve(#message)}")
210
public String handleDynamicReply(String request, Message message) {
211
return "Response: " + request;
212
}
213
}
214
```
215
216
## Error Handling
217
218
### Exception Handling
219
220
```java { .api }
221
@Component
222
public class ErrorHandlingListeners {
223
224
@RabbitListener(queues = "error.prone.queue")
225
public void handleWithErrorHandling(String message) throws ProcessingException {
226
if (message.contains("error")) {
227
throw new ProcessingException("Processing failed for: " + message);
228
}
229
// Process message
230
}
231
232
@RabbitListener(queues = "retry.queue")
233
public void handleWithRetry(String message,
234
@Header(AmqpHeaders.REDELIVERED) boolean redelivered,
235
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
236
if (redelivered) {
237
// This is a retry attempt
238
log.warn("Retrying message: {}", message);
239
}
240
241
try {
242
processMessage(message);
243
} catch (Exception e) {
244
if (shouldRetry(e)) {
245
throw new AmqpRejectAndDontRequeueException("Temporary failure", e);
246
} else {
247
throw new AmqpRejectAndDontRequeueException("Permanent failure", e);
248
}
249
}
250
}
251
}
252
```
253
254
### Dead Letter Queue Configuration
255
256
```java { .api }
257
@Configuration
258
public class DeadLetterConfig {
259
260
@Bean
261
public Queue mainQueue() {
262
return QueueBuilder.durable("main.queue")
263
.withArgument("x-dead-letter-exchange", "dlx.exchange")
264
.withArgument("x-dead-letter-routing-key", "dead.letter")
265
.withArgument("x-message-ttl", 300000) // 5 minutes
266
.build();
267
}
268
269
@Bean
270
public DirectExchange deadLetterExchange() {
271
return new DirectExchange("dlx.exchange");
272
}
273
274
@Bean
275
public Queue deadLetterQueue() {
276
return QueueBuilder.durable("dead.letter.queue").build();
277
}
278
279
@Bean
280
public Binding deadLetterBinding() {
281
return BindingBuilder.bind(deadLetterQueue())
282
.to(deadLetterExchange())
283
.with("dead.letter");
284
}
285
286
@RabbitListener(queues = "dead.letter.queue")
287
public void handleDeadLetter(String message,
288
@Header Map<String, Object> headers,
289
@Header(required = false, name = "x-death") List<Map<String, Object>> xDeath) {
290
// Handle dead letter messages
291
log.error("Dead letter received: {}, death info: {}", message, xDeath);
292
}
293
}
294
```
295
296
## Advanced Configuration
297
298
### Message Converter Configuration
299
300
```java { .api }
301
@Configuration
302
public class ListenerConverterConfig {
303
304
@Bean
305
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
306
ConnectionFactory connectionFactory,
307
MessageConverter messageConverter) {
308
309
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
310
factory.setConnectionFactory(connectionFactory);
311
factory.setMessageConverter(messageConverter);
312
313
return factory;
314
}
315
316
@Bean
317
public MessageConverter messageConverter() {
318
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
319
converter.setClassMapper(classMapper());
320
return converter;
321
}
322
323
@Bean
324
public DefaultClassMapper classMapper() {
325
DefaultClassMapper classMapper = new DefaultClassMapper();
326
Map<String, Class<?>> idClassMapping = new HashMap<>();
327
idClassMapping.put("userEvent", UserEvent.class);
328
idClassMapping.put("orderEvent", OrderEvent.class);
329
classMapper.setIdClassMapping(idClassMapping);
330
return classMapper;
331
}
332
}
333
```
334
335
### Container Customization
336
337
```java { .api }
338
@Configuration
339
public class ContainerCustomizationConfig {
340
341
@Bean
342
public ContainerCustomizer<SimpleMessageListenerContainer> containerCustomizer() {
343
return container -> {
344
container.setPrefetchCount(1);
345
container.setDefaultRequeueRejected(false);
346
container.setMissingQueuesFatal(false);
347
container.setConsumerTagStrategy(queue -> "consumer-" + queue + "-" + UUID.randomUUID());
348
};
349
}
350
351
@Bean
352
public SimpleRabbitListenerContainerFactory customContainerFactory(
353
ConnectionFactory connectionFactory,
354
ContainerCustomizer<SimpleMessageListenerContainer> customizer) {
355
356
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
357
factory.setConnectionFactory(connectionFactory);
358
factory.setContainerCustomizer(customizer);
359
360
return factory;
361
}
362
}
363
```
364
365
### Conditional Listeners
366
367
```java { .api }
368
@Component
369
@ConditionalOnProperty(name = "messaging.listeners.enabled", havingValue = "true")
370
public class ConditionalListeners {
371
372
@RabbitListener(queues = "conditional.queue")
373
public void handleConditionally(String message) {
374
// Only active if property is set
375
}
376
}
377
378
@Component
379
public class ProfileBasedListeners {
380
381
@RabbitListener(queues = "dev.queue")
382
@Profile("development")
383
public void handleDevMessage(String message) {
384
// Only active in development profile
385
}
386
387
@RabbitListener(queues = "prod.queue")
388
@Profile("production")
389
public void handleProdMessage(String message) {
390
// Only active in production profile
391
}
392
}
393
```
394
395
## Batch Message Processing
396
397
### Batch Listener Configuration
398
399
```java { .api }
400
@Configuration
401
public class BatchListenerConfig {
402
403
@Bean
404
public SimpleRabbitListenerContainerFactory batchContainerFactory(
405
ConnectionFactory connectionFactory) {
406
407
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
408
factory.setConnectionFactory(connectionFactory);
409
factory.setBatchListener(true);
410
factory.setBatchSize(10);
411
factory.setConsumerBatchEnabled(true);
412
factory.setReceiveTimeout(5000L);
413
414
return factory;
415
}
416
}
417
418
@Component
419
public class BatchMessageHandler {
420
421
@RabbitListener(queues = "batch.queue", containerFactory = "batchContainerFactory")
422
public void handleBatch(List<Message> messages) {
423
for (Message message : messages) {
424
// Process each message in batch
425
String body = new String(message.getBody());
426
MessageProperties properties = message.getMessageProperties();
427
}
428
}
429
430
@RabbitListener(queues = "batch.objects.queue", containerFactory = "batchContainerFactory")
431
public void handleObjectBatch(List<UserEvent> events) {
432
// Process batch of converted objects
433
events.forEach(this::processUserEvent);
434
}
435
}
436
```
437
438
## Usage Examples
439
440
### Multi-tenant Message Processing
441
442
```java
443
@Component
444
public class MultiTenantListener {
445
446
@RabbitListener(bindings = @QueueBinding(
447
value = @Queue(value = "tenant.#{@tenantResolver.currentTenant()}.events"),
448
exchange = @Exchange(value = "multi-tenant", type = ExchangeTypes.TOPIC),
449
key = "tenant.*.events"
450
))
451
public void handleTenantEvent(TenantEvent event, @Header("tenantId") String tenantId) {
452
try (TenantContext.Scope ignored = tenantContext.withTenant(tenantId)) {
453
processTenantEvent(event);
454
}
455
}
456
}
457
```
458
459
### Ordered Message Processing
460
461
```java
462
@Component
463
public class OrderedMessageHandler {
464
465
@RabbitListener(queues = "ordered.messages", concurrency = "1")
466
public void handleOrdered(OrderedMessage message,
467
@Header("sequenceNumber") long sequenceNumber) {
468
// Single consumer ensures ordering
469
if (isNextInSequence(sequenceNumber)) {
470
processMessage(message);
471
} else {
472
// Reject without requeue: the message is dead-lettered if the queue has a DLX, otherwise dropped
473
throw new AmqpRejectAndDontRequeueException("Out of sequence");
474
}
475
}
476
}
477
```
478
479
### Priority Queue Processing
480
481
```java
482
@Configuration
483
public class PriorityQueueConfig {
484
485
@Bean
486
public Queue priorityQueue() {
487
return QueueBuilder.durable("priority.queue")
488
.withArgument("x-max-priority", 10)
489
.build();
490
}
491
}
492
493
@Component
494
public class PriorityMessageHandler {
495
496
@RabbitListener(queues = "priority.queue")
497
public void handlePriorityMessage(PriorityMessage message,
498
@Header(required = false, name = "priority") Integer priority) {
499
if (priority != null && priority > 7) {
500
// Priority above 7: take the high-priority path (delivery order itself is decided by the broker)
501
processHighPriority(message);
502
} else {
503
processNormalPriority(message);
504
}
505
}
506
}