or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration-properties.md connection-configuration.md error-handling.md index.md listener-configuration.md message-operations.md queue-exchange-management.md stream-processing.md

queue-exchange-management.mddocs/

0

# Queue, Exchange, and Binding Management

1

2

Administrative operations for managing RabbitMQ topology using RabbitAdmin and declarative configuration for queues, exchanges, and bindings.

3

4

## RabbitAdmin Operations

5

6

### Basic Admin Operations

7

8

```java { .api }

9

@Service

10

public class TopologyManager {

11

12

@Autowired

13

private RabbitAdmin rabbitAdmin;

14

15

// Queue operations

16

public void declareQueue(String queueName) {

17

Queue queue = new Queue(queueName);

18

rabbitAdmin.declareQueue(queue);

19

}

20

21

public void deleteQueue(String queueName) {

22

rabbitAdmin.deleteQueue(queueName);

23

}

24

25

public void purgeQueue(String queueName) {

26

rabbitAdmin.purgeQueue(queueName, false);

27

}

28

29

// Exchange operations

30

public void declareExchange(String exchangeName, String type) {

31

Exchange exchange = new CustomExchange(exchangeName, type);

32

rabbitAdmin.declareExchange(exchange);

33

}

34

35

public void deleteExchange(String exchangeName) {

36

rabbitAdmin.deleteExchange(exchangeName);

37

}

38

39

// Binding operations

40

public void bindQueue(String queueName, String exchangeName, String routingKey) {

41

Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE,

42

exchangeName, routingKey, null);

43

rabbitAdmin.declareBinding(binding);

44

}

45

46

public void unbindQueue(String queueName, String exchangeName, String routingKey) {

47

Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE,

48

exchangeName, routingKey, null);

49

rabbitAdmin.removeBinding(binding);

50

}

51

}

52

```

53

54

### RabbitAdmin Configuration

55

56

```java { .api }

57

@Configuration

58

public class AdminConfig {

59

60

@Bean

61

public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {

62

RabbitAdmin admin = new RabbitAdmin(connectionFactory);

63

admin.setAutoStartup(true);

64

admin.setIgnoreDeclarationExceptions(true);

65

return admin;

66

}

67

68

@Bean

69

public RabbitAdmin conditionalAdmin(ConnectionFactory connectionFactory) {

70

RabbitAdmin admin = new RabbitAdmin(connectionFactory);

71

admin.setAutoStartup(false); // Manual startup

72

return admin;

73

}

74

}

75

```

76

77

## Queue Declaration

78

79

### Basic Queue Types

80

81

```java { .api }

82

@Configuration

83

public class QueueConfig {

84

85

// Simple durable queue

86

@Bean

87

public Queue simpleQueue() {

88

return new Queue("simple.queue", true, false, false);

89

}

90

91

// Auto-delete queue

92

@Bean

93

public Queue autoDeleteQueue() {

94

return new Queue("auto.delete.queue", false, false, true);

95

}

96

97

// Exclusive queue

98

@Bean

99

public Queue exclusiveQueue() {

100

return new Queue("exclusive.queue", false, true, false);

101

}

102

103

// Temporary queue

104

@Bean

105

public Queue temporaryQueue() {

106

return new AnonymousQueue();

107

}

108

}

109

```

110

111

### Queue Builder Pattern

112

113

```java { .api }

114

@Configuration

115

public class AdvancedQueueConfig {

116

117

@Bean

118

public Queue ttlQueue() {

119

return QueueBuilder.durable("ttl.queue")

120

.withArgument("x-message-ttl", 60000)

121

.build();

122

}

123

124

@Bean

125

public Queue deadLetterQueue() {

126

return QueueBuilder.durable("main.queue")

127

.withArgument("x-dead-letter-exchange", "dlx.exchange")

128

.withArgument("x-dead-letter-routing-key", "dead.letter")

129

.withArgument("x-message-ttl", 300000)

130

.build();

131

}

132

133

@Bean

134

public Queue priorityQueue() {

135

return QueueBuilder.durable("priority.queue")

136

.withArgument("x-max-priority", 10)

137

.build();

138

}

139

140

@Bean

141

public Queue maxLengthQueue() {

142

return QueueBuilder.durable("max.length.queue")

143

.withArgument("x-max-length", 1000)

144

.withArgument("x-overflow", "reject-publish")

145

.build();

146

}

147

148

@Bean

149

public Queue lazyQueue() {

150

return QueueBuilder.durable("lazy.queue")

151

.lazy()

152

.build();

153

}

154

155

@Bean

156

public Queue quorumQueue() {

157

return QueueBuilder.durable("quorum.queue")

158

.quorum()

159

.build();

160

}

161

}

162

```

163

164

### Queue Arguments and Properties

165

166

```java { .api }

167

@Configuration

168

public class QueueArgumentsConfig {

169

170

@Bean

171

public Queue queueWithArguments() {

172

Map<String, Object> arguments = new HashMap<>();

173

arguments.put("x-message-ttl", 600000); // 10 minutes TTL

174

arguments.put("x-expires", 1800000); // Queue expires after 30 minutes

175

arguments.put("x-max-length", 1000); // Max 1000 messages

176

arguments.put("x-max-length-bytes", 1048576); // Max 1MB

177

arguments.put("x-overflow", "reject-publish"); // Reject when full

178

arguments.put("x-dead-letter-exchange", "dlx"); // Dead letter exchange

179

arguments.put("x-dead-letter-routing-key", "failed"); // Dead letter routing key

180

arguments.put("x-max-priority", 10); // Priority queue

181

arguments.put("x-queue-mode", "lazy"); // Lazy queue

182

183

return new Queue("configured.queue", true, false, false, arguments);

184

}

185

}

186

```

187

188

## Exchange Declaration

189

190

### Exchange Types

191

192

```java { .api }

193

@Configuration

194

public class ExchangeConfig {

195

196

// Direct exchange

197

@Bean

198

public DirectExchange directExchange() {

199

return new DirectExchange("direct.exchange", true, false);

200

}

201

202

// Topic exchange

203

@Bean

204

public TopicExchange topicExchange() {

205

return new TopicExchange("topic.exchange", true, false);

206

}

207

208

// Fanout exchange

209

@Bean

210

public FanoutExchange fanoutExchange() {

211

return new FanoutExchange("fanout.exchange", true, false);

212

}

213

214

// Headers exchange

215

@Bean

216

public HeadersExchange headersExchange() {

217

return new HeadersExchange("headers.exchange", true, false);

218

}

219

220

// Custom exchange

221

@Bean

222

public CustomExchange customExchange() {

223

return new CustomExchange("custom.exchange", "x-delayed-message", true, false);

224

}

225

}

226

```

227

228

### Exchange Builder Pattern

229

230

```java { .api }

231

@Configuration

232

public class AdvancedExchangeConfig {

233

234

@Bean

235

public TopicExchange durableTopicExchange() {

236

return ExchangeBuilder.topicExchange("durable.topic")

237

.durable(true)

238

.build();

239

}

240

241

@Bean

242

public DirectExchange autoDeleteExchange() {

243

return ExchangeBuilder.directExchange("auto.delete.direct")

244

.autoDelete()

245

.build();

246

}

247

248

@Bean

249

public CustomExchange delayedExchange() {

250

return ExchangeBuilder.directExchange("delayed.exchange")

251

.delayed()

252

.build();

253

}

254

255

@Bean

256

public TopicExchange exchangeWithArguments() {

257

return ExchangeBuilder.topicExchange("args.topic")

258

.withArgument("x-delayed-type", "direct")

259

.build();

260

}

261

}

262

```

263

264

### Exchange Arguments

265

266

```java { .api }

267

@Configuration

268

public class ExchangeArgumentsConfig {

269

270

@Bean

271

public CustomExchange delayedMessageExchange() {

272

Map<String, Object> arguments = new HashMap<>();

273

arguments.put("x-delayed-type", "direct");

274

275

return new CustomExchange("delayed.messages", "x-delayed-message",

276

true, false, arguments);

277

}

278

279

@Bean

280

public TopicExchange alternateExchange() {

281

Map<String, Object> arguments = new HashMap<>();

282

arguments.put("alternate-exchange", "alternate.exchange");

283

284

return new TopicExchange("main.topic", true, false, arguments);

285

}

286

}

287

```

288

289

## Binding Configuration

290

291

### Basic Bindings

292

293

```java { .api }

294

@Configuration

295

public class BindingConfig {

296

297

// Direct exchange binding

298

@Bean

299

public Binding directBinding(Queue directQueue, DirectExchange directExchange) {

300

return BindingBuilder.bind(directQueue).to(directExchange).with("direct.key");

301

}

302

303

// Topic exchange bindings

304

@Bean

305

public Binding topicBinding1(Queue topicQueue, TopicExchange topicExchange) {

306

return BindingBuilder.bind(topicQueue).to(topicExchange).with("order.*");

307

}

308

309

@Bean

310

public Binding topicBinding2(Queue topicQueue, TopicExchange topicExchange) {

311

return BindingBuilder.bind(topicQueue).to(topicExchange).with("user.#");

312

}

313

314

// Fanout exchange binding (no routing key needed)

315

@Bean

316

public Binding fanoutBinding(Queue fanoutQueue, FanoutExchange fanoutExchange) {

317

return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);

318

}

319

320

// Headers exchange binding

321

@Bean

322

public Binding headersBinding(Queue headersQueue, HeadersExchange headersExchange) {

323

return BindingBuilder.bind(headersQueue).to(headersExchange)

324

.where("format").matches("pdf").and("type").matches("report");

325

}

326

}

327

```

328

329

### Advanced Binding Patterns

330

331

```java { .api }

332

@Configuration

333

public class AdvancedBindingConfig {

334

335

// Multiple bindings for one queue

336

@Bean

337

public List<Binding> multipleBindings(Queue multiQueue, TopicExchange topicExchange) {

338

return Arrays.asList(

339

BindingBuilder.bind(multiQueue).to(topicExchange).with("order.created"),

340

BindingBuilder.bind(multiQueue).to(topicExchange).with("order.updated"),

341

BindingBuilder.bind(multiQueue).to(topicExchange).with("order.cancelled")

342

);

343

}

344

345

// Exchange to exchange binding

346

@Bean

347

public Binding exchangeBinding(TopicExchange sourceExchange, DirectExchange targetExchange) {

348

return BindingBuilder.bind(targetExchange).to(sourceExchange).with("important.*");

349

}

350

351

// Headers binding with all match

352

@Bean

353

public Binding headersAllBinding(Queue headersQueue, HeadersExchange headersExchange) {

354

Map<String, Object> headers = new HashMap<>();

355

headers.put("x-match", "all");

356

headers.put("format", "pdf");

357

headers.put("type", "report");

358

359

return BindingBuilder.bind(headersQueue).to(headersExchange).whereAll(headers).match();

360

}

361

362

// Headers binding with any match

363

@Bean

364

public Binding headersAnyBinding(Queue headersQueue, HeadersExchange headersExchange) {

365

Map<String, Object> headers = new HashMap<>();

366

headers.put("format", "pdf");

367

headers.put("urgent", true);

368

369

return BindingBuilder.bind(headersQueue).to(headersExchange).whereAny(headers).match();

370

}

371

}

372

```

373

374

## Declarative Configuration

375

376

### Complete Topology Declaration

377

378

```java { .api }

379

@Configuration

380

public class CompleteTopologyConfig {

381

382

// Exchanges

383

@Bean

384

public TopicExchange ordersExchange() {

385

return ExchangeBuilder.topicExchange("orders.exchange").durable(true).build();

386

}

387

388

@Bean

389

public DirectExchange notificationsExchange() {

390

return ExchangeBuilder.directExchange("notifications.exchange").durable(true).build();

391

}

392

393

@Bean

394

public DirectExchange dlxExchange() {

395

return ExchangeBuilder.directExchange("dlx.exchange").durable(true).build();

396

}

397

398

// Queues

399

@Bean

400

public Queue orderProcessingQueue() {

401

return QueueBuilder.durable("order.processing")

402

.withArgument("x-dead-letter-exchange", "dlx.exchange")

403

.withArgument("x-dead-letter-routing-key", "order.failed")

404

.build();

405

}

406

407

@Bean

408

public Queue orderNotificationQueue() {

409

return QueueBuilder.durable("order.notifications")

410

.withArgument("x-message-ttl", 300000)

411

.build();

412

}

413

414

@Bean

415

public Queue deadLetterQueue() {

416

return QueueBuilder.durable("dead.letters").build();

417

}

418

419

// Bindings

420

@Bean

421

public Binding orderProcessingBinding() {

422

return BindingBuilder.bind(orderProcessingQueue())

423

.to(ordersExchange())

424

.with("order.created");

425

}

426

427

@Bean

428

public Binding orderNotificationBinding() {

429

return BindingBuilder.bind(orderNotificationQueue())

430

.to(notificationsExchange())

431

.with("order.notification");

432

}

433

434

@Bean

435

public Binding deadLetterBinding() {

436

return BindingBuilder.bind(deadLetterQueue())

437

.to(dlxExchange())

438

.with("order.failed");

439

}

440

}

441

```

442

443

### Conditional Topology

444

445

```java { .api }

446

@Configuration

447

public class ConditionalTopologyConfig {

448

449

@Bean

450

@ConditionalOnProperty(name = "messaging.queues.priority.enabled", havingValue = "true")

451

public Queue priorityQueue() {

452

return QueueBuilder.durable("priority.processing")

453

.withArgument("x-max-priority", 10)

454

.build();

455

}

456

457

@Bean

458

@Profile("development")

459

public Queue devQueue() {

460

return QueueBuilder.nonDurable("dev.testing").autoDelete().build();

461

}

462

463

@Bean

464

@Profile("production")

465

public Queue prodQueue() {

466

return QueueBuilder.durable("prod.processing")

467

.withArgument("x-queue-mode", "lazy")

468

.build();

469

}

470

}

471

```

472

473

## Dynamic Topology Management

474

475

### Runtime Queue Creation

476

477

```java { .api }

478

@Service

479

public class DynamicTopologyService {

480

481

@Autowired

482

private RabbitAdmin rabbitAdmin;

483

484

public void createTenantQueue(String tenantId) {

485

String queueName = "tenant." + tenantId + ".events";

486

487

Queue queue = QueueBuilder.durable(queueName)

488

.withArgument("x-message-ttl", 3600000) // 1 hour TTL

489

.build();

490

491

rabbitAdmin.declareQueue(queue);

492

493

// Create binding

494

TopicExchange exchange = new TopicExchange("tenant.events");

495

Binding binding = BindingBuilder.bind(queue)

496

.to(exchange)

497

.with("tenant." + tenantId + ".*");

498

499

rabbitAdmin.declareBinding(binding);

500

}

501

502

public void deleteTenantQueue(String tenantId) {

503

String queueName = "tenant." + tenantId + ".events";

504

rabbitAdmin.deleteQueue(queueName);

505

}

506

507

public void createTemporaryReplyQueue(String sessionId) {

508

String queueName = "reply." + sessionId;

509

510

Queue replyQueue = QueueBuilder.nonDurable(queueName)

511

.autoDelete()

512

.exclusive()

513

.withArgument("x-expires", 300000) // 5 minutes

514

.build();

515

516

rabbitAdmin.declareQueue(replyQueue);

517

}

518

}

519

```

520

521

### Topology Templates

522

523

```java { .api }

524

@Component

525

public class TopologyTemplates {

526

527

@Autowired

528

private RabbitAdmin rabbitAdmin;

529

530

public void createStandardTopology(String domain) {

531

// Create exchanges

532

TopicExchange domainExchange = new TopicExchange(domain + ".exchange", true, false);

533

DirectExchange dlxExchange = new DirectExchange(domain + ".dlx", true, false);

534

535

rabbitAdmin.declareExchange(domainExchange);

536

rabbitAdmin.declareExchange(dlxExchange);

537

538

// Create queues

539

Queue eventQueue = QueueBuilder.durable(domain + ".events")

540

.withArgument("x-dead-letter-exchange", domain + ".dlx")

541

.build();

542

543

Queue commandQueue = QueueBuilder.durable(domain + ".commands")

544

.withArgument("x-dead-letter-exchange", domain + ".dlx")

545

.build();

546

547

Queue deadLetterQueue = QueueBuilder.durable(domain + ".dead.letters").build();

548

549

rabbitAdmin.declareQueue(eventQueue);

550

rabbitAdmin.declareQueue(commandQueue);

551

rabbitAdmin.declareQueue(deadLetterQueue);

552

553

// Create bindings

554

Binding eventBinding = BindingBuilder.bind(eventQueue)

555

.to(domainExchange).with(domain + ".event.*");

556

557

Binding commandBinding = BindingBuilder.bind(commandQueue)

558

.to(domainExchange).with(domain + ".command.*");

559

560

Binding dlxBinding = BindingBuilder.bind(deadLetterQueue)

561

.to(dlxExchange).with("#");

562

563

rabbitAdmin.declareBinding(eventBinding);

564

rabbitAdmin.declareBinding(commandBinding);

565

rabbitAdmin.declareBinding(dlxBinding);

566

}

567

}

568

```

569

570

## Queue and Exchange Properties

571

572

### Queue Information

573

574

```java { .api }

575

@Service

576

public class QueueInspectionService {

577

578

@Autowired

579

private RabbitAdmin rabbitAdmin;

580

581

public Properties getQueueProperties(String queueName) {

582

return rabbitAdmin.getQueueProperties(queueName);

583

}

584

585

public int getQueueMessageCount(String queueName) {

586

Properties props = rabbitAdmin.getQueueProperties(queueName);

587

return props != null ? (Integer) props.get(RabbitAdmin.QUEUE_MESSAGE_COUNT) : -1;

588

}

589

590

public int getQueueConsumerCount(String queueName) {

591

Properties props = rabbitAdmin.getQueueProperties(queueName);

592

return props != null ? (Integer) props.get(RabbitAdmin.QUEUE_CONSUMER_COUNT) : -1;

593

}

594

595

public void purgeQueueIfEmpty(String queueName) {

596

Properties props = rabbitAdmin.getQueueProperties(queueName);

597

if (props != null && (Integer) props.get(RabbitAdmin.QUEUE_MESSAGE_COUNT) == 0) {

598

rabbitAdmin.purgeQueue(queueName, false);

599

}

600

}

601

}

602

```

603

604

### Queue Management Operations

605

606

```java { .api }

607

@Service

608

public class QueueManagementService {

609

610

@Autowired

611

private RabbitAdmin rabbitAdmin;

612

613

public boolean queueExists(String queueName) {

614

Properties props = rabbitAdmin.getQueueProperties(queueName);

615

return props != null;

616

}

617

618

public void declareQueueIfNotExists(String queueName) {

619

if (!queueExists(queueName)) {

620

Queue queue = QueueBuilder.durable(queueName).build();

621

rabbitAdmin.declareQueue(queue);

622

}

623

}

624

625

public void deleteQueueIfEmpty(String queueName) {

626

Properties props = rabbitAdmin.getQueueProperties(queueName);

627

if (props != null && (Integer) props.get(RabbitAdmin.QUEUE_MESSAGE_COUNT) == 0) {

628

rabbitAdmin.deleteQueue(queueName, false, true); // unused=false, empty=true

629

}

630

}

631

632

public void purgeAllQueues(String... queueNames) {

633

for (String queueName : queueNames) {

634

rabbitAdmin.purgeQueue(queueName, false);

635

}

636

}

637

}