The RabbitMQ Java client library allows Java applications to interface with RabbitMQ message broker servers.
—
Interfaces and implementations for consuming messages asynchronously with callbacks. The Consumer API provides both functional callback interfaces and full Consumer implementations for handling message delivery, cancellation, and shutdown events.
Modern callback interfaces for handling message delivery and consumer events.
/**
* Callback interface for message delivery.
* Declares a single abstract method, so it can be supplied as a lambda
* or a method reference.
*/
@FunctionalInterface
public interface DeliverCallback {
/**
* Called when a message is delivered to the consumer
* @param consumerTag - Consumer identifier
* @param delivery - Message delivery information (envelope, properties and body)
* @throws IOException declared so that implementations may propagate I/O failures
*/
void handle(String consumerTag, Delivery delivery) throws IOException;
}
/**
* Callback interface for consumer cancellation.
* Declares a single abstract method, so it can be supplied as a lambda
* or a method reference.
*/
@FunctionalInterface
public interface CancelCallback {
/**
* Called when the consumer is cancelled
* @param consumerTag - Consumer identifier that was cancelled
* @throws IOException declared so that implementations may propagate I/O failures
*/
void handle(String consumerTag) throws IOException;
}
/**
* Callback interface for consumer shutdown signals.
* Declares a single abstract method, so it can be supplied as a lambda
* or a method reference. Unlike the delivery and cancel callbacks, the
* method declares no checked exception.
*/
@FunctionalInterface
public interface ConsumerShutdownSignalCallback {
/**
* Called when the consumer receives a shutdown signal
* @param consumerTag - Consumer identifier
* @param sig - Shutdown signal with reason and details
*/
void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);
}Usage Examples:
// Lambda-based message handling
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println("[" + consumerTag + "] Received: " + message);

    // Process based on message properties
    AMQP.BasicProperties props = delivery.getProperties();
    if ("urgent".equals(props.getType())) {
        handleUrgentMessage(message);
    } else {
        handleRegularMessage(message);
    }

    // The consumer below is registered with autoAck=false, so every delivery
    // must be acknowledged explicitly once it has been handled; otherwise the
    // message stays unacknowledged on the broker.
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};

CancelCallback cancelCallback = consumerTag -> {
    System.out.println("Consumer " + consumerTag + " was cancelled");
    // Cleanup resources
    cleanupConsumerResources(consumerTag);
};

ConsumerShutdownSignalCallback shutdownCallback = (consumerTag, sig) -> {
    System.out.println("Consumer " + consumerTag + " shutdown: " + sig.getReason());
    if (!sig.isInitiatedByApplication()) {
        // Handle unexpected shutdown
        scheduleReconnection();
    }
};

// Start consuming with callbacks (manual acknowledgement mode: autoAck=false)
String consumerTag = channel.basicConsume("work.queue", false,
        deliverCallback, cancelCallback, shutdownCallback);

// Method reference usage
/**
 * Handler object whose methods match the DeliverCallback and CancelCallback
 * shapes, so they can be passed to basicConsume as method references.
 */
public class MessageProcessor {
    // Channel used for manual acknowledgements. It is injected because this
    // class is a plain object, not a Consumer, and so has no channel of its own.
    private final Channel channel;

    /**
     * @param channel - Channel the messages are consumed on; used to acknowledge them
     */
    public MessageProcessor(Channel channel) {
        this.channel = channel;
    }

    /**
     * Processes one delivery and acknowledges it.
     * @param consumerTag - Consumer identifier
     * @param delivery - Message delivery information
     * @throws IOException if decoding the body or acknowledging the message fails
     */
    public void handleDelivery(String consumerTag, Delivery delivery) throws IOException {
        // Process message
        String message = new String(delivery.getBody(), "UTF-8");
        processBusinessLogic(message);

        // Manual acknowledgement of this single delivery (multiple=false)
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }

    /**
     * Reports that the consumer was cancelled.
     * @param consumerTag - Consumer identifier that was cancelled
     */
    public void handleCancel(String consumerTag) throws IOException {
        System.out.println("Consumer cancelled: " + consumerTag);
    }
}

MessageProcessor processor = new MessageProcessor(channel);
channel.basicConsume("queue", false, processor::handleDelivery, processor::handleCancel);

Full consumer interface for handling all consumer events.
/**
* Interface for implementing message consumers.
* Groups the delivery, cancellation, registration, recovery and shutdown
* notifications into one type.
*/
public interface Consumer {
/**
* Called when a message is delivered
* @param consumerTag - Consumer identifier
* @param envelope - Delivery envelope with routing info
* @param properties - Message properties
* @param body - Message body
* @throws IOException declared so that implementations may propagate I/O failures
*/
void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException;
/**
* Called when the consumer is cancelled by the server
* @param consumerTag - Consumer identifier
* @throws IOException declared so that implementations may propagate I/O failures
*/
void handleCancel(String consumerTag) throws IOException;
/**
* Called when basicCancel is called
* @param consumerTag - Consumer identifier
*/
void handleCancelOk(String consumerTag);
/**
* Called when the consumer is registered
* @param consumerTag - Consumer identifier
*/
void handleConsumeOk(String consumerTag);
/**
* Called when basicRecover is called
* @param consumerTag - Consumer identifier
*/
void handleRecoverOk(String consumerTag);
/**
* Called when a shutdown signal is received
* @param consumerTag - Consumer identifier
* @param sig - Shutdown signal
*/
void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);
/**
* Get the consumer tag
* NOTE(review): confirm against the library that this accessor is declared on
* the Consumer interface itself and not only on DefaultConsumer.
* @return Consumer identifier
*/
String getConsumerTag();
}Default implementation of the Consumer interface providing base functionality.
/**
* Default implementation of Consumer interface.
* Extend this class and override methods as needed.
*/
public class DefaultConsumer implements Consumer {
/**
* Constructor taking a channel
* @param channel - Channel for this consumer
*/
public DefaultConsumer(Channel channel);
/**
* Get the channel this consumer is associated with
* (presumably the channel passed to the constructor - confirm)
* @return Channel instance
*/
public Channel getChannel();
/**
* Get the consumer tag
* @return Consumer tag string
*/
public String getConsumerTag();
/**
* Set the consumer tag (called by the library)
* @param consumerTag - Consumer identifier
*/
public void setConsumerTag(String consumerTag);
// Default implementations of Consumer interface methods.
// Subclasses override only the notifications they care about.
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException;
public void handleCancel(String consumerTag) throws IOException;
public void handleCancelOk(String consumerTag);
public void handleConsumeOk(String consumerTag);
public void handleRecoverOk(String consumerTag);
public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);
}Usage Examples:
// Custom consumer extending DefaultConsumer
/**
 * Work-queue consumer using manual acknowledgements: a message is acked only
 * after it has been processed, and nacked when processing fails.
 */
public class WorkQueueConsumer extends DefaultConsumer {
    /**
     * @param channel - Channel this consumer receives from and acknowledges on
     */
    public WorkQueueConsumer(Channel channel) {
        super(channel);
    }

    /**
     * Processes one delivery, then acks it on success or nacks it on failure.
     * @param consumerTag - Consumer identifier
     * @param envelope - Delivery envelope with routing info
     * @param properties - Message properties
     * @param body - Message body
     * @throws IOException if decoding the body or sending the ack/nack fails
     */
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        long deliveryTag = envelope.getDeliveryTag();

        try {
            // Simulate work
            processWork(message);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the owning thread can observe it,
            // and hand the unprocessed message back to the queue.
            Thread.currentThread().interrupt();
            getChannel().basicNack(deliveryTag, false, true);
            return;
        } catch (Exception e) {
            System.err.println("Error processing message: " + e.getMessage());
            // Requeue for one retry only: a message that already came back as a
            // redelivery is rejected without requeue, so a message that always
            // fails cannot loop forever.
            getChannel().basicNack(deliveryTag, false, !envelope.isRedeliver());
            return;
        }

        // Acknowledge successful processing. This sits outside the try block so
        // an I/O failure while acking is not mistaken for a processing failure.
        getChannel().basicAck(deliveryTag, false);
    }

    /**
     * Reports the cancellation and releases resources.
     * @param consumerTag - Consumer identifier
     */
    @Override
    public void handleCancel(String consumerTag) throws IOException {
        System.out.println("Consumer cancelled: " + consumerTag);
        // Perform cleanup
        cleanup();
    }

    /**
     * Reports the shutdown and reconnects unless the application initiated it.
     * @param consumerTag - Consumer identifier
     * @param sig - Shutdown signal
     */
    @Override
    public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
        System.out.println("Consumer shutdown: " + sig.getReason());
        if (!sig.isInitiatedByApplication()) {
            // Handle unexpected shutdown
            reconnect();
        }
    }

    /** Placeholder for the business logic; sleeps to simulate processing time. */
    private void processWork(String message) throws Exception {
        // Business logic here
        Thread.sleep(1000); // Simulate processing time
        System.out.println("Processed: " + message);
    }

    private void cleanup() {
        // Cleanup resources
    }

    private void reconnect() {
        // Reconnection logic
    }
}
// Register the custom consumer on a new channel (autoAck=false: the consumer acks manually)
Channel channel = connection.createChannel();
channel.basicConsume("work.queue", false, new WorkQueueConsumer(channel));

// Simple consumer for logging messages
/**
 * Consumer that prints each delivery (consumer tag, exchange, routing key and
 * body) to standard output and then acknowledges it.
 */
public class LoggingConsumer extends DefaultConsumer {
    public LoggingConsumer(Channel channel) {
        super(channel);
    }

    /**
     * Logs one delivery and acknowledges it.
     * @param consumerTag - Consumer identifier
     * @param envelope - Delivery envelope with routing info
     * @param properties - Message properties (not used here)
     * @param body - Message body, decoded as UTF-8
     */
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {
        String text = new String(body, "UTF-8");
        System.out.printf("[%s] %s->%s: %s%n",
                consumerTag, envelope.getExchange(), envelope.getRoutingKey(), text);

        // Explicit ack straight after logging; nothing else needs to succeed first.
        getChannel().basicAck(envelope.getDeliveryTag(), false);
    }
}
channel.basicConsume("logs.queue", false, new LoggingConsumer(channel));
/**
* Exception thrown when a consumer is cancelled.
* It extends RuntimeException, so it is unchecked and callers are not
* required to catch or declare it.
*/
public class ConsumerCancelledException extends RuntimeException {
/** Creates the exception; this constructor takes no arguments. */
public ConsumerCancelledException();
}Usage Examples:
// Handling consumer cancellation in application code
try {
    // Consumer processing logic
    while (isRunning) {
        // Process messages or wait
        Thread.sleep(1000);
    }
} catch (ConsumerCancelledException e) {
    System.out.println("Consumer was cancelled");
    // Restart consumer or exit gracefully
    restartConsumer();
} catch (InterruptedException e) {
    // Thread.sleep throws the checked InterruptedException. Restore the
    // interrupt flag so the code that owns this thread can react to it.
    Thread.currentThread().interrupt();
}

Multi-threaded Consumer:
/**
 * Consumer that hands each delivery to an ExecutorService so that processing
 * does not block the thread that invokes handleDelivery.
 */
public class ThreadedConsumer extends DefaultConsumer {
    private final ExecutorService executor;

    /**
     * @param channel - Channel this consumer receives from and acknowledges on
     * @param executor - Executor that runs the per-message processing tasks
     */
    public ThreadedConsumer(Channel channel, ExecutorService executor) {
        super(channel);
        this.executor = executor;
    }

    /**
     * Submits the delivery for asynchronous processing. The ack or nack is sent
     * later from the executor thread, not from the thread calling this method.
     * @param consumerTag - Consumer identifier
     * @param envelope - Delivery envelope with routing info
     * @param properties - Message properties
     * @param body - Message body
     */
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {
        long deliveryTag = envelope.getDeliveryTag();

        // Submit to thread pool for processing
        executor.submit(() -> {
            try {
                String message = new String(body, "UTF-8");
                processMessage(message);
                // Acknowledge from the worker thread; multiple=false confirms
                // only this delivery.
                getChannel().basicAck(deliveryTag, false);
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Keep the interrupt visible to the executor's worker thread.
                    Thread.currentThread().interrupt();
                }
                // Report the failure instead of dropping it silently.
                System.err.println("Failed to process message: " + e.getMessage());
                try {
                    getChannel().basicNack(deliveryTag, false, true);
                } catch (IOException ex) {
                    System.err.println("Failed to nack message: " + ex.getMessage());
                }
            }
        });
    }
}

Retry Logic Consumer:
/**
 * Consumer that retries failed messages a bounded number of times by
 * republishing them with an incremented "x-retry-count" header.
 */
public class RetryConsumer extends DefaultConsumer {
    private static final int MAX_RETRIES = 3;
    private static final String RETRY_HEADER = "x-retry-count";

    /**
     * @param channel - Channel this consumer receives from, acknowledges on and republishes on
     */
    public RetryConsumer(Channel channel) {
        super(channel);
    }

    /**
     * Processes one delivery. On failure the message is republished with an
     * incremented retry count, or rejected without requeue once MAX_RETRIES
     * is reached.
     * @param consumerTag - Consumer identifier
     * @param envelope - Delivery envelope with routing info
     * @param properties - Message properties; the retry count is read from its headers
     * @param body - Message body
     * @throws IOException if decoding, acknowledging or republishing fails
     */
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {
        int retryCount = readRetryCount(properties);
        long deliveryTag = envelope.getDeliveryTag();

        try {
            String message = new String(body, "UTF-8");
            processMessage(message);
        } catch (Exception e) {
            if (retryCount < MAX_RETRIES) {
                // Republish with incremented retry count, then ack the original
                // so only the republished copy remains.
                republishWithRetry(envelope, properties, body, retryCount + 1);
                getChannel().basicAck(deliveryTag, false);
            } else {
                // Send to dead letter queue or discard
                getChannel().basicNack(deliveryTag, false, false);
            }
            return;
        }

        // Ack outside the try block: an I/O failure while acking must not be
        // treated as a processing failure and trigger a republish.
        getChannel().basicAck(deliveryTag, false);
    }

    /**
     * Reads the retry counter from the message headers.
     * Accepts any Number, because the header may arrive as an Integer or as
     * another numeric type such as Long depending on the publisher. A missing
     * or non-numeric value counts as zero.
     */
    private int readRetryCount(AMQP.BasicProperties properties) {
        Map<String, Object> headers = properties.getHeaders();
        Object raw = headers == null ? null : headers.get(RETRY_HEADER);
        return raw instanceof Number ? ((Number) raw).intValue() : 0;
    }

    /**
     * Republishes the message to the retry exchange with the given retry count.
     * @param envelope - Original envelope; its routing key is reused
     * @param props - Original properties; all of them are carried over
     * @param body - Message body
     * @param retryCount - Value to store in the "x-retry-count" header
     */
    private void republishWithRetry(Envelope envelope, AMQP.BasicProperties props,
                                    byte[] body, int retryCount) throws IOException {
        // Copy the headers so the received properties object is not mutated.
        Map<String, Object> headers =
                props.getHeaders() != null ? new HashMap<>(props.getHeaders()) : new HashMap<>();
        headers.put(RETRY_HEADER, retryCount);

        // props.builder() starts from a copy of every original property
        // (correlation id, type, expiration, ...); only the headers are replaced.
        AMQP.BasicProperties newProps = props.builder()
                .headers(headers)
                .build();

        // Republish to retry exchange/queue
        getChannel().basicPublish("retry.exchange", envelope.getRoutingKey(), newProps, body);
    }
}

// Consumer tag management
/**
* Supplier of consumer tags.
* NOTE(review): when and how often the library calls this is not shown here - confirm.
*/
public interface ConsumerTagSupplier {
/**
* @return a consumer tag string
*/
String get();
}
// Message delivery data
/**
* Bundles the three parts of a delivered message: the envelope, the
* message properties and the body. This is the type handed to
* DeliverCallback.handle.
*/
public class Delivery {
/** @return the delivery envelope (delivery tag, exchange, routing key) */
public Envelope getEnvelope();
/** @return the message properties */
public AMQP.BasicProperties getProperties();
/** @return the raw message body bytes */
public byte[] getBody();
}
// Envelope with message routing information
/**
* Routing and delivery metadata that accompanies a delivered message.
*/
public class Envelope {
/** @return the delivery tag; the usage examples pass it to basicAck and basicNack */
public long getDeliveryTag();
/** @return presumably true when this delivery is a redelivery of the message - confirm */
public boolean isRedeliver();
/** @return the exchange name associated with the message */
public String getExchange();
/** @return the routing key associated with the message */
public String getRoutingKey();
}Install with Tessl CLI
npx tessl i tessl/maven-com-rabbitmq--amqp-client