0
# Queue, Exchange, and Binding Management
1
2
Administrative operations for managing RabbitMQ topology using RabbitAdmin and declarative configuration for queues, exchanges, and bindings.
3
4
## RabbitAdmin Operations
5
6
### Basic Admin Operations
7
8
```java { .api }
9
@Service
10
public class TopologyManager {
11
12
@Autowired
13
private RabbitAdmin rabbitAdmin;
14
15
// Queue operations
16
public void declareQueue(String queueName) {
17
Queue queue = new Queue(queueName);
18
rabbitAdmin.declareQueue(queue);
19
}
20
21
public void deleteQueue(String queueName) {
22
rabbitAdmin.deleteQueue(queueName);
23
}
24
25
public void purgeQueue(String queueName) {
26
rabbitAdmin.purgeQueue(queueName, false);
27
}
28
29
// Exchange operations
30
public void declareExchange(String exchangeName, String type) {
31
Exchange exchange = new CustomExchange(exchangeName, type);
32
rabbitAdmin.declareExchange(exchange);
33
}
34
35
public void deleteExchange(String exchangeName) {
36
rabbitAdmin.deleteExchange(exchangeName);
37
}
38
39
// Binding operations
40
public void bindQueue(String queueName, String exchangeName, String routingKey) {
41
Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE,
42
exchangeName, routingKey, null);
43
rabbitAdmin.declareBinding(binding);
44
}
45
46
public void unbindQueue(String queueName, String exchangeName, String routingKey) {
47
Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE,
48
exchangeName, routingKey, null);
49
rabbitAdmin.removeBinding(binding);
50
}
51
}
52
```
53
54
### RabbitAdmin Configuration
55
56
```java { .api }
57
@Configuration
58
public class AdminConfig {
59
60
@Bean
61
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
62
RabbitAdmin admin = new RabbitAdmin(connectionFactory);
63
admin.setAutoStartup(true);
64
admin.setIgnoreDeclarationExceptions(true);
65
return admin;
66
}
67
68
@Bean
69
public RabbitAdmin conditionalAdmin(ConnectionFactory connectionFactory) {
70
RabbitAdmin admin = new RabbitAdmin(connectionFactory);
71
admin.setAutoStartup(false); // Manual startup
72
return admin;
73
}
74
}
75
```
76
77
## Queue Declaration
78
79
### Basic Queue Types
80
81
```java { .api }
82
@Configuration
83
public class QueueConfig {
84
85
// Simple durable queue
86
@Bean
87
public Queue simpleQueue() {
88
return new Queue("simple.queue", true, false, false);
89
}
90
91
// Auto-delete queue
92
@Bean
93
public Queue autoDeleteQueue() {
94
return new Queue("auto.delete.queue", false, false, true);
95
}
96
97
// Exclusive queue
98
@Bean
99
public Queue exclusiveQueue() {
100
return new Queue("exclusive.queue", false, true, false);
101
}
102
103
// Temporary queue
104
@Bean
105
public Queue temporaryQueue() {
106
return new AnonymousQueue();
107
}
108
}
109
```
110
111
### Queue Builder Pattern
112
113
```java { .api }
114
@Configuration
115
public class AdvancedQueueConfig {
116
117
@Bean
118
public Queue ttlQueue() {
119
return QueueBuilder.durable("ttl.queue")
120
.withArgument("x-message-ttl", 60000)
121
.build();
122
}
123
124
@Bean
125
public Queue deadLetterQueue() {
126
return QueueBuilder.durable("main.queue")
127
.withArgument("x-dead-letter-exchange", "dlx.exchange")
128
.withArgument("x-dead-letter-routing-key", "dead.letter")
129
.withArgument("x-message-ttl", 300000)
130
.build();
131
}
132
133
@Bean
134
public Queue priorityQueue() {
135
return QueueBuilder.durable("priority.queue")
136
.withArgument("x-max-priority", 10)
137
.build();
138
}
139
140
@Bean
141
public Queue maxLengthQueue() {
142
return QueueBuilder.durable("max.length.queue")
143
.withArgument("x-max-length", 1000)
144
.withArgument("x-overflow", "reject-publish")
145
.build();
146
}
147
148
@Bean
149
public Queue lazyQueue() {
150
return QueueBuilder.durable("lazy.queue")
151
.lazy()
152
.build();
153
}
154
155
@Bean
156
public Queue quorumQueue() {
157
return QueueBuilder.durable("quorum.queue")
158
.quorum()
159
.build();
160
}
161
}
162
```
163
164
### Queue Arguments and Properties
165
166
```java { .api }
167
@Configuration
168
public class QueueArgumentsConfig {
169
170
@Bean
171
public Queue queueWithArguments() {
172
Map<String, Object> arguments = new HashMap<>();
173
arguments.put("x-message-ttl", 600000); // 10 minutes TTL
174
arguments.put("x-expires", 1800000); // Queue expires after 30 minutes
175
arguments.put("x-max-length", 1000); // Max 1000 messages
176
arguments.put("x-max-length-bytes", 1048576); // Max 1MB
177
arguments.put("x-overflow", "reject-publish"); // Reject when full
178
arguments.put("x-dead-letter-exchange", "dlx"); // Dead letter exchange
179
arguments.put("x-dead-letter-routing-key", "failed"); // Dead letter routing key
180
arguments.put("x-max-priority", 10); // Priority queue
181
arguments.put("x-queue-mode", "lazy"); // Lazy queue
182
183
return new Queue("configured.queue", true, false, false, arguments);
184
}
185
}
186
```
187
188
## Exchange Declaration
189
190
### Exchange Types
191
192
```java { .api }
193
@Configuration
194
public class ExchangeConfig {
195
196
// Direct exchange
197
@Bean
198
public DirectExchange directExchange() {
199
return new DirectExchange("direct.exchange", true, false);
200
}
201
202
// Topic exchange
203
@Bean
204
public TopicExchange topicExchange() {
205
return new TopicExchange("topic.exchange", true, false);
206
}
207
208
// Fanout exchange
209
@Bean
210
public FanoutExchange fanoutExchange() {
211
return new FanoutExchange("fanout.exchange", true, false);
212
}
213
214
// Headers exchange
215
@Bean
216
public HeadersExchange headersExchange() {
217
return new HeadersExchange("headers.exchange", true, false);
218
}
219
220
// Custom exchange
221
@Bean
222
public CustomExchange customExchange() {
223
return new CustomExchange("custom.exchange", "x-delayed-message", true, false);
224
}
225
}
226
```
227
228
### Exchange Builder Pattern
229
230
```java { .api }
231
@Configuration
232
public class AdvancedExchangeConfig {
233
234
@Bean
235
public TopicExchange durableTopicExchange() {
236
return ExchangeBuilder.topicExchange("durable.topic")
237
.durable(true)
238
.build();
239
}
240
241
@Bean
242
public DirectExchange autoDeleteExchange() {
243
return ExchangeBuilder.directExchange("auto.delete.direct")
244
.autoDelete()
245
.build();
246
}
247
248
@Bean
249
public CustomExchange delayedExchange() {
250
return ExchangeBuilder.directExchange("delayed.exchange")
251
.delayed()
252
.build();
253
}
254
255
@Bean
256
public TopicExchange exchangeWithArguments() {
257
return ExchangeBuilder.topicExchange("args.topic")
258
.withArgument("x-delayed-type", "direct")
259
.build();
260
}
261
}
262
```
263
264
### Exchange Arguments
265
266
```java { .api }
267
@Configuration
268
public class ExchangeArgumentsConfig {
269
270
@Bean
271
public CustomExchange delayedMessageExchange() {
272
Map<String, Object> arguments = new HashMap<>();
273
arguments.put("x-delayed-type", "direct");
274
275
return new CustomExchange("delayed.messages", "x-delayed-message",
276
true, false, arguments);
277
}
278
279
@Bean
280
public TopicExchange alternateExchange() {
281
Map<String, Object> arguments = new HashMap<>();
282
arguments.put("alternate-exchange", "alternate.exchange");
283
284
return new TopicExchange("main.topic", true, false, arguments);
285
}
286
}
287
```
288
289
## Binding Configuration
290
291
### Basic Bindings
292
293
```java { .api }
294
@Configuration
295
public class BindingConfig {
296
297
// Direct exchange binding
298
@Bean
299
public Binding directBinding(Queue directQueue, DirectExchange directExchange) {
300
return BindingBuilder.bind(directQueue).to(directExchange).with("direct.key");
301
}
302
303
// Topic exchange bindings
304
@Bean
305
public Binding topicBinding1(Queue topicQueue, TopicExchange topicExchange) {
306
return BindingBuilder.bind(topicQueue).to(topicExchange).with("order.*");
307
}
308
309
@Bean
310
public Binding topicBinding2(Queue topicQueue, TopicExchange topicExchange) {
311
return BindingBuilder.bind(topicQueue).to(topicExchange).with("user.#");
312
}
313
314
// Fanout exchange binding (no routing key needed)
315
@Bean
316
public Binding fanoutBinding(Queue fanoutQueue, FanoutExchange fanoutExchange) {
317
return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);
318
}
319
320
// Headers exchange binding
321
@Bean
322
public Binding headersBinding(Queue headersQueue, HeadersExchange headersExchange) {
323
return BindingBuilder.bind(headersQueue).to(headersExchange)
324
.where("format").matches("pdf").and("type").matches("report");
325
}
326
}
327
```
328
329
### Advanced Binding Patterns
330
331
```java { .api }
332
@Configuration
333
public class AdvancedBindingConfig {
334
335
// Multiple bindings for one queue
336
@Bean
337
public List<Binding> multipleBindings(Queue multiQueue, TopicExchange topicExchange) {
338
return Arrays.asList(
339
BindingBuilder.bind(multiQueue).to(topicExchange).with("order.created"),
340
BindingBuilder.bind(multiQueue).to(topicExchange).with("order.updated"),
341
BindingBuilder.bind(multiQueue).to(topicExchange).with("order.cancelled")
342
);
343
}
344
345
// Exchange to exchange binding
346
@Bean
347
public Binding exchangeBinding(TopicExchange sourceExchange, DirectExchange targetExchange) {
348
return BindingBuilder.bind(targetExchange).to(sourceExchange).with("important.*");
349
}
350
351
// Headers binding with all match
352
@Bean
353
public Binding headersAllBinding(Queue headersQueue, HeadersExchange headersExchange) {
354
Map<String, Object> headers = new HashMap<>();
355
headers.put("x-match", "all");
356
headers.put("format", "pdf");
357
headers.put("type", "report");
358
359
return BindingBuilder.bind(headersQueue).to(headersExchange).whereAll(headers).match();
360
}
361
362
// Headers binding with any match
363
@Bean
364
public Binding headersAnyBinding(Queue headersQueue, HeadersExchange headersExchange) {
365
Map<String, Object> headers = new HashMap<>();
366
headers.put("format", "pdf");
367
headers.put("urgent", true);
368
369
return BindingBuilder.bind(headersQueue).to(headersExchange).whereAny(headers).match();
370
}
371
}
372
```
373
374
## Declarative Configuration
375
376
### Complete Topology Declaration
377
378
```java { .api }
379
@Configuration
380
public class CompleteTopologyConfig {
381
382
// Exchanges
383
@Bean
384
public TopicExchange ordersExchange() {
385
return ExchangeBuilder.topicExchange("orders.exchange").durable(true).build();
386
}
387
388
@Bean
389
public DirectExchange notificationsExchange() {
390
return ExchangeBuilder.directExchange("notifications.exchange").durable(true).build();
391
}
392
393
@Bean
394
public DirectExchange dlxExchange() {
395
return ExchangeBuilder.directExchange("dlx.exchange").durable(true).build();
396
}
397
398
// Queues
399
@Bean
400
public Queue orderProcessingQueue() {
401
return QueueBuilder.durable("order.processing")
402
.withArgument("x-dead-letter-exchange", "dlx.exchange")
403
.withArgument("x-dead-letter-routing-key", "order.failed")
404
.build();
405
}
406
407
@Bean
408
public Queue orderNotificationQueue() {
409
return QueueBuilder.durable("order.notifications")
410
.withArgument("x-message-ttl", 300000)
411
.build();
412
}
413
414
@Bean
415
public Queue deadLetterQueue() {
416
return QueueBuilder.durable("dead.letters").build();
417
}
418
419
// Bindings
420
@Bean
421
public Binding orderProcessingBinding() {
422
return BindingBuilder.bind(orderProcessingQueue())
423
.to(ordersExchange())
424
.with("order.created");
425
}
426
427
@Bean
428
public Binding orderNotificationBinding() {
429
return BindingBuilder.bind(orderNotificationQueue())
430
.to(notificationsExchange())
431
.with("order.notification");
432
}
433
434
@Bean
435
public Binding deadLetterBinding() {
436
return BindingBuilder.bind(deadLetterQueue())
437
.to(dlxExchange())
438
.with("order.failed");
439
}
440
}
441
```
442
443
### Conditional Topology
444
445
```java { .api }
446
@Configuration
447
public class ConditionalTopologyConfig {
448
449
@Bean
450
@ConditionalOnProperty(name = "messaging.queues.priority.enabled", havingValue = "true")
451
public Queue priorityQueue() {
452
return QueueBuilder.durable("priority.processing")
453
.withArgument("x-max-priority", 10)
454
.build();
455
}
456
457
@Bean
458
@Profile("development")
459
public Queue devQueue() {
460
return QueueBuilder.nonDurable("dev.testing").autoDelete().build();
461
}
462
463
@Bean
464
@Profile("production")
465
public Queue prodQueue() {
466
return QueueBuilder.durable("prod.processing")
467
.withArgument("x-queue-mode", "lazy")
468
.build();
469
}
470
}
471
```
472
473
## Dynamic Topology Management
474
475
### Runtime Queue Creation
476
477
```java { .api }
478
@Service
479
public class DynamicTopologyService {
480
481
@Autowired
482
private RabbitAdmin rabbitAdmin;
483
484
public void createTenantQueue(String tenantId) {
485
String queueName = "tenant." + tenantId + ".events";
486
487
Queue queue = QueueBuilder.durable(queueName)
488
.withArgument("x-message-ttl", 3600000) // 1 hour TTL
489
.build();
490
491
rabbitAdmin.declareQueue(queue);
492
493
// Create binding
494
TopicExchange exchange = new TopicExchange("tenant.events");
495
Binding binding = BindingBuilder.bind(queue)
496
.to(exchange)
497
.with("tenant." + tenantId + ".*");
498
499
rabbitAdmin.declareBinding(binding);
500
}
501
502
public void deleteTenantQueue(String tenantId) {
503
String queueName = "tenant." + tenantId + ".events";
504
rabbitAdmin.deleteQueue(queueName);
505
}
506
507
public void createTemporaryReplyQueue(String sessionId) {
508
String queueName = "reply." + sessionId;
509
510
Queue replyQueue = QueueBuilder.nonDurable(queueName)
511
.autoDelete()
512
.exclusive()
513
.withArgument("x-expires", 300000) // 5 minutes
514
.build();
515
516
rabbitAdmin.declareQueue(replyQueue);
517
}
518
}
519
```
520
521
### Topology Templates
522
523
```java { .api }
524
@Component
525
public class TopologyTemplates {
526
527
@Autowired
528
private RabbitAdmin rabbitAdmin;
529
530
public void createStandardTopology(String domain) {
531
// Create exchanges
532
TopicExchange domainExchange = new TopicExchange(domain + ".exchange", true, false);
533
DirectExchange dlxExchange = new DirectExchange(domain + ".dlx", true, false);
534
535
rabbitAdmin.declareExchange(domainExchange);
536
rabbitAdmin.declareExchange(dlxExchange);
537
538
// Create queues
539
Queue eventQueue = QueueBuilder.durable(domain + ".events")
540
.withArgument("x-dead-letter-exchange", domain + ".dlx")
541
.build();
542
543
Queue commandQueue = QueueBuilder.durable(domain + ".commands")
544
.withArgument("x-dead-letter-exchange", domain + ".dlx")
545
.build();
546
547
Queue deadLetterQueue = QueueBuilder.durable(domain + ".dead.letters").build();
548
549
rabbitAdmin.declareQueue(eventQueue);
550
rabbitAdmin.declareQueue(commandQueue);
551
rabbitAdmin.declareQueue(deadLetterQueue);
552
553
// Create bindings
554
Binding eventBinding = BindingBuilder.bind(eventQueue)
555
.to(domainExchange).with(domain + ".event.*");
556
557
Binding commandBinding = BindingBuilder.bind(commandQueue)
558
.to(domainExchange).with(domain + ".command.*");
559
560
Binding dlxBinding = BindingBuilder.bind(deadLetterQueue)
561
.to(dlxExchange).with("#");
562
563
rabbitAdmin.declareBinding(eventBinding);
564
rabbitAdmin.declareBinding(commandBinding);
565
rabbitAdmin.declareBinding(dlxBinding);
566
}
567
}
568
```
569
570
## Queue and Exchange Properties
571
572
### Queue Information
573
574
```java { .api }
575
@Service
576
public class QueueInspectionService {
577
578
@Autowired
579
private RabbitAdmin rabbitAdmin;
580
581
public Properties getQueueProperties(String queueName) {
582
return rabbitAdmin.getQueueProperties(queueName);
583
}
584
585
public int getQueueMessageCount(String queueName) {
586
Properties props = rabbitAdmin.getQueueProperties(queueName);
587
return props != null ? (Integer) props.get(RabbitAdmin.QUEUE_MESSAGE_COUNT) : -1;
588
}
589
590
public int getQueueConsumerCount(String queueName) {
591
Properties props = rabbitAdmin.getQueueProperties(queueName);
592
return props != null ? (Integer) props.get(RabbitAdmin.QUEUE_CONSUMER_COUNT) : -1;
593
}
594
595
public void purgeQueueIfEmpty(String queueName) {
596
Properties props = rabbitAdmin.getQueueProperties(queueName);
597
if (props != null && (Integer) props.get(RabbitAdmin.QUEUE_MESSAGE_COUNT) == 0) {
598
rabbitAdmin.purgeQueue(queueName, false);
599
}
600
}
601
}
602
```
603
604
### Queue Management Operations
605
606
```java { .api }
607
@Service
608
public class QueueManagementService {
609
610
@Autowired
611
private RabbitAdmin rabbitAdmin;
612
613
public boolean queueExists(String queueName) {
614
Properties props = rabbitAdmin.getQueueProperties(queueName);
615
return props != null;
616
}
617
618
public void declareQueueIfNotExists(String queueName) {
619
if (!queueExists(queueName)) {
620
Queue queue = QueueBuilder.durable(queueName).build();
621
rabbitAdmin.declareQueue(queue);
622
}
623
}
624
625
public void deleteQueueIfEmpty(String queueName) {
626
Properties props = rabbitAdmin.getQueueProperties(queueName);
627
if (props != null && (Integer) props.get(RabbitAdmin.QUEUE_MESSAGE_COUNT) == 0) {
628
rabbitAdmin.deleteQueue(queueName, false, true); // unused=false, empty=true
629
}
630
}
631
632
public void purgeAllQueues(String... queueNames) {
633
for (String queueName : queueNames) {
634
rabbitAdmin.purgeQueue(queueName, false);
635
}
636
}
637
}