The RabbitMQ Java client library allows Java applications to interface with RabbitMQ message broker servers.
—
Operations for consuming messages from queues and managing queue topology. Consuming allows applications to receive messages from RabbitMQ queues either by pushing messages to consumers or by polling for messages.
Operations for declaring, deleting, binding, and purging queues.
/**
* Declare a server-named, exclusive, auto-delete, non-durable queue.
* This is the "default settings" form: the Channel interface offers no
* single-argument queueDeclare(String) overload, so a named queue must be
* declared with the full-options overload instead.
* @return Queue.DeclareOk with queue information (getQueue() holds the server-generated name)
* @throws IOException if an error is encountered
*/
AMQP.Queue.DeclareOk queueDeclare() throws IOException;
/**
* Declare a queue, stating every declaration option explicitly
* @param queue - Queue name (empty string for server-generated name)
* @param durable - Queue survives server restarts
* @param exclusive - Queue is exclusive to this connection
* @param autoDelete - Queue deleted when no longer in use
* @param arguments - Optional queue arguments (null for none; the usage examples pass keys such as "x-message-ttl", "x-max-length" and "x-overflow")
* @return Queue.DeclareOk exposing the queue name, message count and consumer count
* @throws IOException if the operation fails (conditions not detailed here)
*/
AMQP.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
/**
* Declare a queue passively (check if exists without creating)
* @param queue - Queue name
* @return Queue.DeclareOk exposing the queue name, message count and consumer count
* @throws IOException if the operation fails; presumably this includes the queue not existing - TODO confirm against the Channel Javadoc
*/
AMQP.Queue.DeclareOk queueDeclarePassive(String queue) throws IOException;
/**
* Delete a queue
* @param queue - Queue name
* @return Queue.DeleteOk reporting the number of messages deleted
* @throws IOException if the operation fails (conditions not detailed here)
*/
AMQP.Queue.DeleteOk queueDelete(String queue) throws IOException;
/**
* Delete a queue with conditions
* @param queue - Queue name
* @param ifUnused - Only delete if queue has no consumers
* @param ifEmpty - Only delete if queue is empty
* @return Queue.DeleteOk reporting the number of messages deleted
* @throws IOException if the operation fails (conditions not detailed here)
*/
AMQP.Queue.DeleteOk queueDelete(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;
/**
* Bind a queue to an exchange
* @param queue - Queue name
* @param exchange - Exchange name
* @param routingKey - Routing key for binding (the topic examples use wildcard patterns such as "*.error.*" and "#")
* @return Queue.BindOk confirming the binding
* @throws IOException if the operation fails (conditions not detailed here)
*/
AMQP.Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException;
/**
* Bind queue with arguments
* @param queue - Queue name
* @param exchange - Exchange name
* @param routingKey - Routing key (the headers-binding example passes an empty string)
* @param arguments - Binding arguments (the headers-binding example passes "x-match" plus the header values to match)
* @return Queue.BindOk confirming the binding
* @throws IOException if the operation fails (conditions not detailed here)
*/
AMQP.Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
/**
* Unbind a queue from an exchange
* @param queue - Queue name
* @param exchange - Exchange name
* @param routingKey - Routing key to unbind
* @return Queue.UnbindOk confirming the unbinding
* @throws IOException if the operation fails (conditions not detailed here)
*/
AMQP.Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey) throws IOException;
/**
* Unbind queue with arguments
* @param queue - Queue name
* @param exchange - Exchange name
* @param routingKey - Routing key
* @param arguments - Binding arguments to match
* @return Queue.UnbindOk confirming the unbinding
* @throws IOException if the operation fails (conditions not detailed here)
*/
AMQP.Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
/**
* Purge all messages from a queue
* @param queue - Queue name
* @return Queue.PurgeOk with message count purged
*/
AMQP.Queue.PurgeOk queuePurge(String queue) throws IOException;

Usage Examples:
// Open the channel that all queue operations below go through
Channel channel = connection.createChannel();

// Durable work queue: not exclusive, not auto-deleted, no extra arguments
channel.queueDeclare("work.queue", true, false, false, null);
// Temporary queue: non-durable, exclusive and auto-deleted
channel.queueDeclare("temp.queue", false, true, true, null);

// An empty name makes the server generate one; read it back from the reply
AMQP.Queue.DeclareOk generated = channel.queueDeclare("", false, true, true, null);
String queueName = generated.getQueue();

// Optional arguments constrain the queue (TTL, max length, overflow policy)
Map<String, Object> queueLimits = new HashMap<>();
queueLimits.put("x-overflow", "reject-publish"); // Overflow behavior
queueLimits.put("x-max-length", 1000); // Max queue length
queueLimits.put("x-message-ttl", 60000); // Message TTL
channel.queueDeclare("limited.queue", true, false, false, queueLimits);
// Bind queue to exchanges
// Route both user lifecycle events into the same notification queue
for (String routingKey : new String[] {"user.created", "user.updated"}) {
    channel.queueBind("user.notifications", "user.events", routingKey);
}
// Topic bindings: the routing key is a pattern containing wildcards
channel.queueBind("error.logs", "logs.topic", "*.error.*");
channel.queueBind("all.logs", "logs.topic", "#");
// Headers binding: the match criteria travel in the binding arguments
Map<String, Object> bindingArgs = new HashMap<>();
bindingArgs.put("priority", "high");
bindingArgs.put("type", "notification");
bindingArgs.put("x-match", "any");
channel.queueBind("priority.queue", "headers.exchange", "", bindingArgs);

Asynchronous message consumption using callbacks where messages are pushed to consumers.
/**
* Start consuming messages with callbacks
* @param queue - Queue name to consume from
* @param autoAck - Auto-acknowledge messages (when false, the callback must ack or nack each delivery itself, as the manual-acknowledgment example does)
* @param deliverCallback - Callback for message delivery
* @param cancelCallback - Callback for consumer cancellation
* @return Consumer tag for managing the consumer
* @throws IOException if the operation fails (conditions not detailed here)
*/
String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
/**
* Start consuming with additional options
* @param queue - Queue name
* @param autoAck - Auto-acknowledge messages
* @param consumerTag - Custom consumer tag (empty string for server-generated)
* @param deliverCallback - Message delivery callback
* @param cancelCallback - Consumer cancellation callback
* @return Consumer tag for managing the consumer
* @throws IOException if the operation fails (conditions not detailed here)
*/
String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
/**
* Start consuming with full options
* @param queue - Queue name
* @param autoAck - Auto-acknowledge messages
* @param consumerTag - Custom consumer tag
* @param noLocal - Don't deliver messages published by this connection (NOTE(review): the RabbitMQ server reportedly does not support this flag - confirm against the Channel Javadoc)
* @param exclusive - Exclusive consumer
* @param arguments - Consumer arguments
* @param deliverCallback - Message delivery callback
* @param cancelCallback - Consumer cancellation callback
* @return Consumer tag for managing the consumer
* @throws IOException if the operation fails (conditions not detailed here)
*/
String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
/**
* Start consuming with Consumer interface
* NOTE(review): this overload takes no autoAck flag; presumably deliveries must be acknowledged explicitly - confirm against the Channel Javadoc
* @param queue - Queue name
* @param consumer - Consumer implementation
* @return Consumer tag for managing the consumer
* @throws IOException if the operation fails (conditions not detailed here)
*/
String basicConsume(String queue, Consumer consumer) throws IOException;
/**
* Start consuming with Consumer and options
* @param queue - Queue name
* @param autoAck - Auto-acknowledge messages
* @param consumer - Consumer implementation
* @return Consumer tag for managing the consumer
* @throws IOException if the operation fails (conditions not detailed here)
*/
String basicConsume(String queue, boolean autoAck, Consumer consumer) throws IOException;
/**
* Cancel a consumer.
* The Channel interface declares this method as returning void; it does not
* hand back an AMQP.Basic.CancelOk to the caller.
* @param consumerTag - Consumer tag to cancel (the value returned by basicConsume)
* @throws IOException if an error is encountered
*/
void basicCancel(String consumerTag) throws IOException;

Usage Examples:
// Simple callback-based consumer: messages are pushed to these callbacks
DeliverCallback deliverCallback = (tag, delivery) -> {
    // The body arrives as raw bytes; decode it as UTF-8 text
    byte[] payload = delivery.getBody();
    System.out.println("Received: " + new String(payload, "UTF-8"));

    // Routing metadata is carried by the envelope
    Envelope routing = delivery.getEnvelope();
    System.out.println("Delivery tag: " + routing.getDeliveryTag());
    System.out.println("Exchange: " + routing.getExchange());
    System.out.println("Routing key: " + routing.getRoutingKey());

    // The content type may be absent, so print it only when set
    String contentType = delivery.getProperties().getContentType();
    if (contentType != null) {
        System.out.println("Content type: " + contentType);
    }
};
CancelCallback cancelCallback = tag -> System.out.println("Consumer cancelled: " + tag);
// autoAck = true: messages are acknowledged automatically
String consumerTag = channel.basicConsume("work.queue", true, deliverCallback, cancelCallback);
// Manual acknowledgment consumer
// Each delivery is processed first, then explicitly acked or nacked
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    long deliveryTag = delivery.getEnvelope().getDeliveryTag();
    try {
        processMessage(new String(delivery.getBody(), "UTF-8"));
        // Processing succeeded: acknowledge this one message only
        channel.basicAck(deliveryTag, false);
    } catch (Exception e) {
        // Processing failed: reject this one message and requeue it
        channel.basicNack(deliveryTag, false, true);
    }
};
// autoAck = false, so the callback above is responsible for acknowledging
channel.basicConsume("work.queue", false, deliverCallback, cancelCallback);
// Consumer with shutdown signal callback
// Delivery handler intentionally left empty in this example
DeliverCallback deliverCallback = (tag, delivery) -> {
    // Handle delivery
};
// Reports the cancelled consumer's tag
CancelCallback cancelCallback =
    tag -> System.out.println("Consumer cancelled: " + tag);
// Reports the tag of the consumer that was shut down; the signal itself is unused
ConsumerShutdownSignalCallback shutdownCallback =
    (tag, signal) -> System.out.println("Consumer shutdown: " + tag);
String consumerTag = channel.basicConsume("queue", true,
deliverCallback, cancelCallback, shutdownCallback);

Synchronous message consumption by polling for messages.
/**
* Get a single message from queue (polling)
* @param queue - Queue name
* @param autoAck - Auto-acknowledge the message
* @return GetResponse with message data, or null if no message available
*/
GetResponse basicGet(String queue, boolean autoAck) throws IOException;

Usage Examples:
// Poll for messages
while (true) {
GetResponse response = channel.basicGet("work.queue", false);
if (response != null) {
String message = new String(response.getBody(), "UTF-8");
System.out.println("Got message: " + message);
System.out.println("Messages remaining: " + response.getMessageCount());
// Process message
try {
processMessage(message);
// Acknowledge after successful processing
channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
} catch (Exception e) {
// Reject and requeue on error
channel.basicNack(response.getEnvelope().getDeliveryTag(), false, true);
}
} else {
// No messages available
Thread.sleep(1000);
}
}

Operations for acknowledging, rejecting, and recovering messages.
/**
* Acknowledge one or more messages
* @param deliveryTag - Delivery tag from the message (obtained from the message's Envelope)
* @param multiple - true to acknowledge all messages up to and including this delivery tag; false to acknowledge only this message
* @throws IOException if the operation fails (conditions not detailed here)
*/
void basicAck(long deliveryTag, boolean multiple) throws IOException;
/**
* Reject one or more messages with requeue option
* @param deliveryTag - Delivery tag from the message
* @param multiple - true to reject all messages up to and including this delivery tag; false to reject only this message
* @param requeue - true to requeue the rejected messages; false to discard them (sent to a dead letter exchange if one is configured)
* @throws IOException if the operation fails (conditions not detailed here)
*/
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
/**
* Reject a single message (use basicNack to reject several messages at once)
* @param deliveryTag - Delivery tag from the message
* @param requeue - Requeue the rejected message
* @throws IOException if the operation fails (conditions not detailed here)
*/
void basicReject(long deliveryTag, boolean requeue) throws IOException;
/**
* Recover unacknowledged messages (redelivers them)
* @param requeue - Requeue the messages
* @return Basic.RecoverOk confirming the recovery
* @throws IOException if the operation fails (conditions not detailed here)
*/
AMQP.Basic.RecoverOk basicRecover(boolean requeue) throws IOException;
/**
* Recover unacknowledged messages (deprecated)
*/
AMQP.Basic.RecoverOk basicRecover() throws IOException;

Usage Examples:
// Single message acknowledgment
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
// Acknowledge multiple messages (up to and including the delivery tag)
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), true);
// Reject and requeue single message
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
// Reject and discard message (send to dead letter exchange if configured)
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
// Recover all unacknowledged messages in this channel
channel.basicRecover(true);

Control message prefetching and processing limits.
/**
* Set QoS parameters for the channel
* @param prefetchCount - Maximum number of unacknowledged messages (e.g. 1 to process one message at a time on a work queue)
* @throws IOException if the operation fails (conditions not detailed here)
*/
void basicQos(int prefetchCount) throws IOException;
/**
* Set QoS with size limit
* @param prefetchSize - Maximum size of unacknowledged messages (0 = no limit)
* @param prefetchCount - Maximum number of unacknowledged messages
* @param global - true to apply the settings to the entire channel; false to apply them to each consumer separately
*/
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;

Usage Examples:
// Limit to processing 1 message at a time (useful for work queues)
channel.basicQos(1);
// Limit to 10 messages at a time
channel.basicQos(10);
// Set prefetch count and size limits
channel.basicQos(0, 5, false); // Max 5 messages, no size limit, global = false (in RabbitMQ the limit then applies to each consumer - confirm against the consumer prefetch guide)
// Queue operation results
// Reply types returned by the queue* operations (schematic listing, not compilable Java)
public static class AMQP.Queue {
// Returned by queueDeclare and queueDeclarePassive
public static class DeclareOk {
public String getQueue(); // Queue name (useful for server-generated names)
public int getMessageCount(); // Current message count
public int getConsumerCount(); // Current consumer count
}
// Returned by queueDelete
public static class DeleteOk {
public int getMessageCount(); // Number of messages deleted
}
// Returned by queueBind
public static class BindOk {
// Confirmation of queue binding
}
// Returned by queueUnbind
public static class UnbindOk {
// Confirmation of queue unbinding
}
// Returned by queuePurge
public static class PurgeOk {
public int getMessageCount(); // Number of messages purged
}
}
// Basic operation results (schematic listing, not compilable Java)
public static class AMQP.Basic {
// Confirmation of consumer cancellation
public static class CancelOk {
public String getConsumerTag(); // Presumably the tag of the cancelled consumer - confirm
}
// Returned by basicRecover
public static class RecoverOk {
// Confirmation of message recovery
}
}
// Message delivery information, as passed to a DeliverCallback
public class Delivery {
public Envelope getEnvelope(); // Delivery tag, exchange and routing key
public AMQP.BasicProperties getProperties(); // Message properties such as the content type
public byte[] getBody(); // Raw message payload (the examples decode it as UTF-8)
}
// Message envelope with routing information
public class Envelope {
public long getDeliveryTag(); // Tag to pass to basicAck / basicNack / basicReject
public boolean isRedeliver(); // Presumably true when the message is being redelivered - confirm
public String getExchange(); // Exchange reported for the message
public String getRoutingKey(); // Routing key reported for the message
}
// Response from basicGet operation
public class GetResponse {
public Envelope getEnvelope();
public AMQP.BasicProperties getProps();
public byte[] getBody();
public int getMessageCount(); // Remaining messages in queue
}

Install with Tessl CLI
npx tessl i tessl/maven-com-rabbitmq--amqp-client