or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration-properties.md · connection-configuration.md · error-handling.md · index.md · listener-configuration.md · message-operations.md · queue-exchange-management.md · stream-processing.md

docs/listener-configuration.md

0

# Listener Configuration

1

2

Configuration and usage of @RabbitListener annotation for creating message consumers with various listener container types and advanced configuration options.

3

4

## Basic Listener Configuration

5

6

### Simple Queue Listeners

7

8

```java { .api }

9

@Component

10

public class MessageListeners {

11

12

@RabbitListener(queues = "user.registration")

13

public void handleUserRegistration(UserRegistrationEvent event) {

14

// Process user registration

15

}

16

17

@RabbitListener(queues = {"orders.created", "orders.updated"})

18

public void handleOrderEvents(OrderEvent event) {

19

// Process order events from multiple queues

20

}

21

22

@RabbitListener(queues = "#{userQueue.name}")

23

public void handleDynamicQueue(String message) {

24

// Use SpEL to reference queue bean

25

}

26

}

27

```

28

29

### Queue Binding with Exchanges

30

31

```java { .api }

32

@Component

33

public class AdvancedListeners {

34

35

@RabbitListener(bindings = @QueueBinding(

36

value = @Queue(value = "order.processed", durable = "true"),

37

exchange = @Exchange(value = "orders", type = ExchangeTypes.TOPIC),

38

key = "order.processed.*"

39

))

40

public void handleOrderProcessed(OrderProcessedEvent event) {

41

// Process order events with topic binding

42

}

43

44

@RabbitListener(bindings = @QueueBinding(

45

value = @Queue(value = "user.notifications",

46

arguments = @Argument(name = "x-message-ttl", value = "60000", type = "java.lang.Integer")),

47

exchange = @Exchange(value = "notifications", type = ExchangeTypes.DIRECT),

48

key = "user.#{authentication.name}"

49

))

50

public void handleUserNotification(NotificationEvent event) {

51

// Handle user-specific notifications with TTL

52

}

53

}

54

```

55

56

## Container Configuration

57

58

### Simple Container Configuration

59

60

```java { .api }

61

@RabbitListener(

62

queues = "simple.queue",

63

containerFactory = "simpleRabbitListenerContainerFactory"

64

)

65

public void handleWithSimpleContainer(String message) {

66

// Processed by SimpleMessageListenerContainer

67

}

68

69

// Container Factory Configurer

70

public class SimpleRabbitListenerContainerFactoryConfigurer {

71

public void configure(SimpleRabbitListenerContainerFactory factory, ConnectionFactory connectionFactory) {

72

// Configures factory with default properties and connection factory

73

}

74

}

75

76

@Configuration

77

public class SimpleContainerConfig {

78

79

@Bean

80

public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(

81

ConnectionFactory connectionFactory,

82

SimpleRabbitListenerContainerFactoryConfigurer configurer) {

83

84

SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();

85

configurer.configure(factory, connectionFactory);

86

87

factory.setConcurrentConsumers(2);

88

factory.setMaxConcurrentConsumers(10);

89

factory.setPrefetchCount(5);

90

factory.setTxSize(3);

91

92

return factory;

93

}

94

}

95

```

96

97

### Direct Container Configuration

98

99

```java { .api }

100

@RabbitListener(

101

queues = "direct.queue",

102

containerFactory = "directRabbitListenerContainerFactory"

103

)

104

public void handleWithDirectContainer(String message) {

105

// Processed by DirectMessageListenerContainer

106

}

107

108

// Container Factory Configurer

109

public class DirectRabbitListenerContainerFactoryConfigurer {

110

public void configure(DirectRabbitListenerContainerFactory factory, ConnectionFactory connectionFactory) {

111

// Configures factory with default properties and connection factory

112

}

113

}

114

115

@Configuration

116

public class DirectContainerConfig {

117

118

@Bean

119

public DirectRabbitListenerContainerFactory directRabbitListenerContainerFactory(

120

ConnectionFactory connectionFactory,

121

DirectRabbitListenerContainerFactoryConfigurer configurer) {

122

123

DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory();

124

configurer.configure(factory, connectionFactory);

125

126

factory.setConsumersPerQueue(3);

127

factory.setPrefetchCount(10);

128

129

return factory;

130

}

131

}

132

```

133

134

## Message Handling Patterns

135

136

### Message Parameter Types

137

138

```java { .api }

139

@Component

140

public class MessageHandlers {

141

142

// Raw message

143

@RabbitListener(queues = "raw.queue")

144

public void handleRawMessage(Message message) {

145

byte[] body = message.getBody();

146

MessageProperties properties = message.getMessageProperties();

147

}

148

149

// String message

150

@RabbitListener(queues = "text.queue")

151

public void handleTextMessage(String message) {

152

// Handle text message

153

}

154

155

// JSON object

156

@RabbitListener(queues = "json.queue")

157

public void handleJsonMessage(UserEvent event) {

158

// Automatically deserialized from JSON

159

}

160

161

// With headers

162

@RabbitListener(queues = "headers.queue")

163

public void handleWithHeaders(UserEvent event, @Header("userId") String userId) {

164

// Access specific header

165

}

166

167

// With all headers

168

@RabbitListener(queues = "all-headers.queue")

169

public void handleWithAllHeaders(UserEvent event, @Header Map<String, Object> headers) {

170

// Access all headers

171

}

172

173

// With message properties

174

@RabbitListener(queues = "properties.queue")

175

public void handleWithProperties(UserEvent event, MessageProperties properties) {

176

String correlationId = properties.getCorrelationId();

177

Date timestamp = properties.getTimestamp();

178

}

179

}

180

```

181

182

### Return Values and Replies

183

184

```java { .api }

185

@Component

186

public class ReplyHandlers {

187

188

// Return reply message

189

@RabbitListener(queues = "request.queue")

190

public String handleRequest(String request) {

191

return "Processed: " + request;

192

}

193

194

// Return complex object

195

@RabbitListener(queues = "user.lookup")

196

public UserProfile lookupUser(String userId) {

197

return userService.findById(userId);

198

}

199

200

// Specify reply exchange and routing key

201

@RabbitListener(queues = "orders.process")

202

@SendTo("results.exchange/order.result")

203

public OrderResult processOrder(Order order) {

204

return orderService.process(order);

205

}

206

207

// Dynamic reply destination

208

@RabbitListener(queues = "dynamic.request")

209

@SendTo("#{@replyDestinationResolver.resolve(#message)}")

210

public String handleDynamicReply(String request, Message message) {

211

return "Response: " + request;

212

}

213

}

214

```

215

216

## Error Handling

217

218

### Exception Handling

219

220

```java { .api }

221

@Component

222

public class ErrorHandlingListeners {

223

224

@RabbitListener(queues = "error.prone.queue")

225

public void handleWithErrorHandling(String message) throws ProcessingException {

226

if (message.contains("error")) {

227

throw new ProcessingException("Processing failed for: " + message);

228

}

229

// Process message

230

}

231

232

@RabbitListener(queues = "retry.queue")

233

public void handleWithRetry(String message,

234

@Header(AmqpHeaders.REDELIVERED) boolean redelivered,

235

@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {

236

if (redelivered) {

237

// This is a retry attempt

238

log.warn("Retrying message: {}", message);

239

}

240

241

try {

242

processMessage(message);

243

} catch (Exception e) {

244

if (shouldRetry(e)) {

245

throw new AmqpRejectAndDontRequeueException("Temporary failure", e);

246

} else {

247

throw new AmqpRejectAndDontRequeueException("Permanent failure", e);

248

}

249

}

250

}

251

}

252

```

253

254

### Dead Letter Queue Configuration

255

256

```java { .api }

257

@Configuration

258

public class DeadLetterConfig {

259

260

@Bean

261

public Queue mainQueue() {

262

return QueueBuilder.durable("main.queue")

263

.withArgument("x-dead-letter-exchange", "dlx.exchange")

264

.withArgument("x-dead-letter-routing-key", "dead.letter")

265

.withArgument("x-message-ttl", 300000) // 5 minutes

266

.build();

267

}

268

269

@Bean

270

public DirectExchange deadLetterExchange() {

271

return new DirectExchange("dlx.exchange");

272

}

273

274

@Bean

275

public Queue deadLetterQueue() {

276

return QueueBuilder.durable("dead.letter.queue").build();

277

}

278

279

@Bean

280

public Binding deadLetterBinding() {

281

return BindingBuilder.bind(deadLetterQueue())

282

.to(deadLetterExchange())

283

.with("dead.letter");

284

}

285

286

@RabbitListener(queues = "dead.letter.queue")

287

public void handleDeadLetter(String message,

288

@Header Map<String, Object> headers,

289

@Header(required = false, name = "x-death") List<Map<String, Object>> xDeath) {

290

// Handle dead letter messages

291

log.error("Dead letter received: {}, death info: {}", message, xDeath);

292

}

293

}

294

```

295

296

## Advanced Configuration

297

298

### Message Converter Configuration

299

300

```java { .api }

301

@Configuration

302

public class ListenerConverterConfig {

303

304

@Bean

305

public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(

306

ConnectionFactory connectionFactory,

307

MessageConverter messageConverter) {

308

309

SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();

310

factory.setConnectionFactory(connectionFactory);

311

factory.setMessageConverter(messageConverter);

312

313

return factory;

314

}

315

316

@Bean

317

public MessageConverter messageConverter() {

318

Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();

319

converter.setClassMapper(classMapper());

320

return converter;

321

}

322

323

@Bean

324

public DefaultClassMapper classMapper() {

325

DefaultClassMapper classMapper = new DefaultClassMapper();

326

Map<String, Class<?>> idClassMapping = new HashMap<>();

327

idClassMapping.put("userEvent", UserEvent.class);

328

idClassMapping.put("orderEvent", OrderEvent.class);

329

classMapper.setIdClassMapping(idClassMapping);

330

return classMapper;

331

}

332

}

333

```

334

335

### Container Customization

336

337

```java { .api }

338

@Configuration

339

public class ContainerCustomizationConfig {

340

341

@Bean

342

public ContainerCustomizer<SimpleMessageListenerContainer> containerCustomizer() {

343

return container -> {

344

container.setPrefetchCount(1);

345

container.setDefaultRequeueRejected(false);

346

container.setMissingQueuesFatal(false);

347

container.setConsumerTagStrategy(queue -> "consumer-" + queue + "-" + UUID.randomUUID());

348

};

349

}

350

351

@Bean

352

public SimpleRabbitListenerContainerFactory customContainerFactory(

353

ConnectionFactory connectionFactory,

354

ContainerCustomizer<SimpleMessageListenerContainer> customizer) {

355

356

SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();

357

factory.setConnectionFactory(connectionFactory);

358

factory.setContainerCustomizer(customizer);

359

360

return factory;

361

}

362

}

363

```

364

365

### Conditional Listeners

366

367

```java { .api }

368

@Component

369

@ConditionalOnProperty(name = "messaging.listeners.enabled", havingValue = "true")

370

public class ConditionalListeners {

371

372

@RabbitListener(queues = "conditional.queue")

373

public void handleConditionally(String message) {

374

// Only active if property is set

375

}

376

}

377

378

@Component

379

public class ProfileBasedListeners {

380

381

@RabbitListener(queues = "dev.queue")

382

@Profile("development")

383

public void handleDevMessage(String message) {

384

// Only active in development profile

385

}

386

387

@RabbitListener(queues = "prod.queue")

388

@Profile("production")

389

public void handleProdMessage(String message) {

390

// Only active in production profile

391

}

392

}

393

```

394

395

## Batch Message Processing

396

397

### Batch Listener Configuration

398

399

```java { .api }

400

@Configuration

401

public class BatchListenerConfig {

402

403

@Bean

404

public SimpleRabbitListenerContainerFactory batchContainerFactory(

405

ConnectionFactory connectionFactory) {

406

407

SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();

408

factory.setConnectionFactory(connectionFactory);

409

factory.setBatchListener(true);

410

factory.setBatchSize(10);

411

factory.setConsumerBatchEnabled(true);

412

factory.setReceiveTimeout(5000L);

413

414

return factory;

415

}

416

}

417

418

@Component

419

public class BatchMessageHandler {

420

421

@RabbitListener(queues = "batch.queue", containerFactory = "batchContainerFactory")

422

public void handleBatch(List<Message> messages) {

423

for (Message message : messages) {

424

// Process each message in batch

425

String body = new String(message.getBody());

426

MessageProperties properties = message.getMessageProperties();

427

}

428

}

429

430

@RabbitListener(queues = "batch.objects.queue", containerFactory = "batchContainerFactory")

431

public void handleObjectBatch(List<UserEvent> events) {

432

// Process batch of converted objects

433

events.forEach(this::processUserEvent);

434

}

435

}

436

```

437

438

## Usage Examples

439

440

### Multi-tenant Message Processing

441

442

```java

443

@Component

444

public class MultiTenantListener {

445

446

@RabbitListener(bindings = @QueueBinding(

447

value = @Queue(value = "tenant.#{@tenantResolver.currentTenant()}.events"),

448

exchange = @Exchange(value = "multi-tenant", type = ExchangeTypes.TOPIC),

449

key = "tenant.*.events"

450

))

451

public void handleTenantEvent(TenantEvent event, @Header("tenantId") String tenantId) {

452

try (TenantContext.Scope ignored = tenantContext.withTenant(tenantId)) {

453

processTenantEvent(event);

454

}

455

}

456

}

457

```

458

459

### Ordered Message Processing

460

461

```java

462

@Component

463

public class OrderedMessageHandler {

464

465

@RabbitListener(queues = "ordered.messages", concurrency = "1")

466

public void handleOrdered(OrderedMessage message,

467

@Header("sequenceNumber") long sequenceNumber) {

468

// Single consumer ensures ordering

469

if (isNextInSequence(sequenceNumber)) {

470

processMessage(message);

471

} else {

472

// Requeue for later processing

473

throw new AmqpRejectAndDontRequeueException("Out of sequence");

474

}

475

}

476

}

477

```

478

479

### Priority Queue Processing

480

481

```java

482

@Configuration

483

public class PriorityQueueConfig {

484

485

@Bean

486

public Queue priorityQueue() {

487

return QueueBuilder.durable("priority.queue")

488

.withArgument("x-max-priority", 10)

489

.build();

490

}

491

}

492

493

@Component

494

public class PriorityMessageHandler {

495

496

@RabbitListener(queues = "priority.queue")

497

public void handlePriorityMessage(PriorityMessage message,

498

@Header(required = false, name = "priority") Integer priority) {

499

if (priority != null && priority > 7) {

500

// Handle high priority messages first

501

processHighPriority(message);

502

} else {

503

processNormalPriority(message);

504

}

505

}

506

}