0
# Message Consuming
1
2
Operations for consuming messages from queues and managing queue topology. Consuming allows applications to receive messages from RabbitMQ queues either by pushing messages to consumers or by polling for messages.
3
4
## Capabilities
5
6
### Queue Management
7
8
Operations for declaring, deleting, binding, and purging queues.
9
10
```java { .api }
11
/**
12
* Declare a queue with default settings
13
* @param queue - Queue name (empty string for server-generated name)
14
* @return Queue.DeclareOk with queue information
15
*/
16
AMQP.Queue.DeclareOk queueDeclare(String queue) throws IOException;
17
18
/**
19
* Declare a queue with full options
20
* @param queue - Queue name (empty string for server-generated name)
21
* @param durable - Queue survives server restarts
22
* @param exclusive - Queue is exclusive to this connection
23
* @param autoDelete - Queue deleted when no longer in use
24
* @param arguments - Optional queue arguments
25
*/
26
AMQP.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
27
28
/**
29
* Declare a queue passively (check if exists without creating)
30
* @param queue - Queue name
31
*/
32
AMQP.Queue.DeclareOk queueDeclarePassive(String queue) throws IOException;
33
34
/**
35
* Delete a queue
36
* @param queue - Queue name
37
*/
38
AMQP.Queue.DeleteOk queueDelete(String queue) throws IOException;
39
40
/**
41
* Delete a queue with conditions
42
* @param queue - Queue name
43
* @param ifUnused - Only delete if queue has no consumers
44
* @param ifEmpty - Only delete if queue is empty
45
*/
46
AMQP.Queue.DeleteOk queueDelete(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;
47
48
/**
49
* Bind a queue to an exchange
50
* @param queue - Queue name
51
* @param exchange - Exchange name
52
* @param routingKey - Routing key for binding
53
*/
54
AMQP.Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException;
55
56
/**
57
* Bind queue with arguments
58
* @param queue - Queue name
59
* @param exchange - Exchange name
60
* @param routingKey - Routing key
61
* @param arguments - Binding arguments
62
*/
63
AMQP.Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
64
65
/**
66
* Unbind a queue from an exchange
67
* @param queue - Queue name
68
* @param exchange - Exchange name
69
* @param routingKey - Routing key to unbind
70
*/
71
AMQP.Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey) throws IOException;
72
73
/**
74
* Unbind queue with arguments
75
* @param queue - Queue name
76
* @param exchange - Exchange name
77
* @param routingKey - Routing key
78
* @param arguments - Binding arguments to match
79
*/
80
AMQP.Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
81
82
/**
83
* Purge all messages from a queue
84
* @param queue - Queue name
85
* @return Queue.PurgeOk with message count purged
86
*/
87
AMQP.Queue.PurgeOk queuePurge(String queue) throws IOException;
88
```
89
90
**Usage Examples:**
91
92
```java
93
Channel channel = connection.createChannel();
94
95
// Declare different types of queues
96
channel.queueDeclare("work.queue", true, false, false, null); // Durable work queue
97
channel.queueDeclare("temp.queue", false, true, true, null); // Temporary exclusive queue
98
99
// Server-generated queue name
100
AMQP.Queue.DeclareOk result = channel.queueDeclare("", false, true, true, null);
101
String queueName = result.getQueue();
102
103
// Queue with arguments (TTL, max length, etc.)
104
Map<String, Object> args = new HashMap<>();
105
args.put("x-message-ttl", 60000); // Message TTL
106
args.put("x-max-length", 1000); // Max queue length
107
args.put("x-overflow", "reject-publish"); // Overflow behavior
108
channel.queueDeclare("limited.queue", true, false, false, args);
109
```
110
111
```java
112
// Bind queue to exchanges
113
channel.queueBind("user.notifications", "user.events", "user.created");
114
channel.queueBind("user.notifications", "user.events", "user.updated");
115
116
// Topic binding with wildcards
117
channel.queueBind("error.logs", "logs.topic", "*.error.*");
118
channel.queueBind("all.logs", "logs.topic", "#");
119
120
// Headers binding
121
Map<String, Object> bindingArgs = new HashMap<>();
122
bindingArgs.put("x-match", "any");
123
bindingArgs.put("type", "notification");
124
bindingArgs.put("priority", "high");
125
channel.queueBind("priority.queue", "headers.exchange", "", bindingArgs);
126
```
127
128
### Push-Based Consuming
129
130
Asynchronous message consumption using callbacks where messages are pushed to consumers.
131
132
```java { .api }
133
/**
134
* Start consuming messages with callbacks
135
* @param queue - Queue name to consume from
136
* @param autoAck - Auto-acknowledge messages
137
* @param deliverCallback - Callback for message delivery
138
* @param cancelCallback - Callback for consumer cancellation
139
* @return Consumer tag for managing the consumer
140
*/
141
String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
142
143
/**
144
* Start consuming with additional options
145
* @param queue - Queue name
146
* @param autoAck - Auto-acknowledge messages
147
* @param consumerTag - Custom consumer tag (empty string for server-generated)
148
* @param deliverCallback - Message delivery callback
149
* @param cancelCallback - Consumer cancellation callback
150
*/
151
String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
152
153
/**
154
* Start consuming with full options
155
* @param queue - Queue name
156
* @param autoAck - Auto-acknowledge messages
157
* @param consumerTag - Custom consumer tag
158
* @param noLocal - Don't deliver messages published by this connection
159
* @param exclusive - Exclusive consumer
160
* @param arguments - Consumer arguments
161
* @param deliverCallback - Message delivery callback
162
* @param cancelCallback - Consumer cancellation callback
163
*/
164
String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
165
166
/**
167
* Start consuming with Consumer interface
168
* @param queue - Queue name
169
* @param consumer - Consumer implementation
170
*/
171
String basicConsume(String queue, Consumer consumer) throws IOException;
172
173
/**
174
* Start consuming with Consumer and options
175
* @param queue - Queue name
176
* @param autoAck - Auto-acknowledge messages
177
* @param consumer - Consumer implementation
178
*/
179
String basicConsume(String queue, boolean autoAck, Consumer consumer) throws IOException;
180
181
/**
182
* Cancel a consumer
183
* @param consumerTag - Consumer tag to cancel
184
*/
185
AMQP.Basic.CancelOk basicCancel(String consumerTag) throws IOException;
186
```
187
188
**Usage Examples:**
189
190
```java
191
// Simple callback-based consumer
192
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
193
String message = new String(delivery.getBody(), "UTF-8");
194
System.out.println("Received: " + message);
195
196
// Access message metadata
197
Envelope envelope = delivery.getEnvelope();
198
System.out.println("Delivery tag: " + envelope.getDeliveryTag());
199
System.out.println("Exchange: " + envelope.getExchange());
200
System.out.println("Routing key: " + envelope.getRoutingKey());
201
202
// Access message properties
203
AMQP.BasicProperties props = delivery.getProperties();
204
if (props.getContentType() != null) {
205
System.out.println("Content type: " + props.getContentType());
206
}
207
};
208
209
CancelCallback cancelCallback = consumerTag -> {
210
System.out.println("Consumer cancelled: " + consumerTag);
211
};
212
213
String consumerTag = channel.basicConsume("work.queue", true, deliverCallback, cancelCallback);
214
```
215
216
```java
217
// Manual acknowledgment consumer
218
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
219
try {
220
String message = new String(delivery.getBody(), "UTF-8");
221
222
// Process message
223
processMessage(message);
224
225
// Manually acknowledge successful processing
226
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
227
228
} catch (Exception e) {
229
// Reject and requeue on error
230
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
231
}
232
};
233
234
channel.basicConsume("work.queue", false, deliverCallback, cancelCallback);
235
```
236
237
```java
238
// Consumer with shutdown signal callback
239
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
240
// Handle delivery
241
};
242
243
CancelCallback cancelCallback = consumerTag -> {
244
System.out.println("Consumer cancelled: " + consumerTag);
245
};
246
247
ConsumerShutdownSignalCallback shutdownCallback = (consumerTag, sig) -> {
248
System.out.println("Consumer shutdown: " + consumerTag);
249
};
250
251
String consumerTag = channel.basicConsume("queue", true,
252
deliverCallback, cancelCallback, shutdownCallback);
253
```
254
255
### Pull-Based Consuming
256
257
Synchronous message consumption by polling for messages.
258
259
```java { .api }
260
/**
261
* Get a single message from queue (polling)
262
* @param queue - Queue name
263
* @param autoAck - Auto-acknowledge the message
264
* @return GetResponse with message data, or null if no message available
265
*/
266
GetResponse basicGet(String queue, boolean autoAck) throws IOException;
267
```
268
269
**Usage Examples:**
270
271
```java
272
// Poll for messages
273
while (true) {
274
GetResponse response = channel.basicGet("work.queue", false);
275
276
if (response != null) {
277
String message = new String(response.getBody(), "UTF-8");
278
System.out.println("Got message: " + message);
279
System.out.println("Messages remaining: " + response.getMessageCount());
280
281
// Process message
282
try {
283
processMessage(message);
284
// Acknowledge after successful processing
285
channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
286
} catch (Exception e) {
287
// Reject and requeue on error
288
channel.basicNack(response.getEnvelope().getDeliveryTag(), false, true);
289
}
290
} else {
291
// No messages available
292
Thread.sleep(1000);
293
}
294
}
295
```
296
297
### Message Acknowledgment
298
299
Operations for acknowledging, rejecting, and recovering messages.
300
301
```java { .api }
302
/**
303
* Acknowledge one or more messages
304
* @param deliveryTag - Delivery tag from the message
305
* @param multiple - Acknowledge all messages up to and including this delivery tag
306
*/
307
void basicAck(long deliveryTag, boolean multiple) throws IOException;
308
309
/**
310
* Reject one or more messages with requeue option
311
* @param deliveryTag - Delivery tag from the message
312
* @param multiple - Reject all messages up to and including this delivery tag
313
* @param requeue - Requeue the rejected messages
314
*/
315
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
316
317
/**
318
* Reject a single message
319
* @param deliveryTag - Delivery tag from the message
320
* @param requeue - Requeue the rejected message
321
*/
322
void basicReject(long deliveryTag, boolean requeue) throws IOException;
323
324
/**
325
* Recover unacknowledged messages (redelivers them)
326
* @param requeue - Requeue the messages
327
*/
328
AMQP.Basic.RecoverOk basicRecover(boolean requeue) throws IOException;
329
330
/**
331
* Recover unacknowledged messages (deprecated)
332
*/
333
AMQP.Basic.RecoverOk basicRecover() throws IOException;
334
```
335
336
**Usage Examples:**
337
338
```java
339
// Single message acknowledgment
340
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
341
342
// Acknowledge multiple messages (up to and including the delivery tag)
343
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), true);
344
345
// Reject and requeue single message
346
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
347
348
// Reject and discard message (send to dead letter exchange if configured)
349
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
350
351
// Recover all unacknowledged messages in this channel
352
channel.basicRecover(true);
353
```
354
355
### Quality of Service (QoS)
356
357
Control message prefetching and processing limits.
358
359
```java { .api }
360
/**
361
* Set QoS parameters for the channel
362
* @param prefetchCount - Maximum number of unacknowledged messages
363
*/
364
void basicQos(int prefetchCount) throws IOException;
365
366
/**
367
* Set QoS with size limit
368
* @param prefetchSize - Maximum size of unacknowledged messages (0 = no limit)
369
* @param prefetchCount - Maximum number of unacknowledged messages
370
* @param global - Apply to entire connection vs. just this channel
371
*/
372
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
373
```
374
375
**Usage Examples:**
376
377
```java
378
// Limit to processing 1 message at a time (useful for work queues)
379
channel.basicQos(1);
380
381
// Limit to 10 messages at a time
382
channel.basicQos(10);
383
384
// Set prefetch count and size limits
385
channel.basicQos(0, 5, false); // Max 5 messages, no size limit, per-channel
386
```
387
388
## Types
389
390
### Queue and Consuming Types
391
392
```java { .api }
393
// Queue operation results
394
public static class AMQP.Queue {
395
public static class DeclareOk {
396
public String getQueue(); // Queue name (useful for server-generated names)
397
public int getMessageCount(); // Current message count
398
public int getConsumerCount(); // Current consumer count
399
}
400
401
public static class DeleteOk {
402
public int getMessageCount(); // Number of messages deleted
403
}
404
405
public static class BindOk {
406
// Confirmation of queue binding
407
}
408
409
public static class UnbindOk {
410
// Confirmation of queue unbinding
411
}
412
413
public static class PurgeOk {
414
public int getMessageCount(); // Number of messages purged
415
}
416
}
417
418
// Basic operation results
419
public static class AMQP.Basic {
420
public static class CancelOk {
421
public String getConsumerTag();
422
}
423
424
public static class RecoverOk {
425
// Confirmation of message recovery
426
}
427
}
428
429
// Message delivery information
430
public class Delivery {
431
public Envelope getEnvelope();
432
public AMQP.BasicProperties getProperties();
433
public byte[] getBody();
434
}
435
436
// Message envelope with routing information
437
public class Envelope {
438
public long getDeliveryTag();
439
public boolean isRedeliver();
440
public String getExchange();
441
public String getRoutingKey();
442
}
443
444
// Response from basicGet operation
445
public class GetResponse {
446
public Envelope getEnvelope();
447
public AMQP.BasicProperties getProps();
448
public byte[] getBody();
449
public int getMessageCount(); // Remaining messages in queue
450
}
451
```