The RabbitMQ Java client library allows Java applications to interface with RabbitMQ message broker servers
npx @tessl/cli install tessl/maven-com-rabbitmq--amqp-client@5.25.0

The RabbitMQ Java Client is a comprehensive library that enables Java applications to communicate with RabbitMQ message broker servers. It provides a complete implementation of the AMQP 0-9-1 protocol, offering APIs for connection management, channel operations, message publishing and consuming, queue and exchange management, and advanced messaging patterns.
<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>5.25.0</version>
</dependency>

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.*;
import java.nio.charset.StandardCharsets;
// Create connection factory and configure it for a local broker
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
// Create connection and channel
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// Declare a queue: durable=false, exclusive=false, autoDelete=false, no extra arguments
channel.queueDeclare("hello", false, false, false, null);
// Publish a message: exchange "" with routing key "hello", no message properties
String message = "Hello World!";
// StandardCharsets avoids the charset-name lookup and its checked UnsupportedEncodingException
channel.basicPublish("", "hello", null, message.getBytes(StandardCharsets.UTF_8));
// Consume messages
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String receivedMessage = new String(delivery.getBody(), StandardCharsets.UTF_8);
    System.out.println("Received: " + receivedMessage);
};
// autoAck=true; the cancel callback is intentionally a no-op
channel.basicConsume("hello", true, deliverCallback, consumerTag -> { });
// Clean up
// NOTE(review): the channel is closed right after the consumer is registered, so a
// delivery may not be handled before shutdown; confirm this is acceptable for the demo.
channel.close();
connection.close();

The RabbitMQ Java Client is built around several key components:
- ConnectionFactory creates and configures connections to RabbitMQ brokers
- Channel interface provides all AMQP operations (publish, consume, declare, bind, etc.)

Core functionality for establishing connections to RabbitMQ brokers and creating channels for AMQP operations.
// Connection factory for creating connections
public class ConnectionFactory implements Cloneable {
public Connection newConnection() throws IOException, TimeoutException;
public void setHost(String host);
public void setPort(int port);
public void setUsername(String username);
public void setPassword(String password);
public void setVirtualHost(String virtualHost);
public void setUri(String uri) throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException;
}
// Connection interface
public interface Connection extends Closeable, ShutdownNotifier {
Channel createChannel() throws IOException;
boolean isOpen();
Map<String, Object> getServerProperties();
void addShutdownListener(ShutdownListener listener);
void close() throws IOException;
}
// Channel interface for AMQP operations
// Extends AutoCloseable rather than Closeable: close() declares TimeoutException,
// which an override of Closeable.close() (IOException only) is not allowed to throw.
public interface Channel extends ShutdownNotifier, AutoCloseable {
    // Number identifying this channel
    int getChannelNumber();
    // The connection that carries this channel
    Connection getConnection();
    boolean isOpen();
    // Usable with try-with-resources via AutoCloseable
    void close() throws IOException, TimeoutException;
}

Connection and Channel Management
Operations for publishing messages to exchanges and managing exchange topology.
// Basic message publishing
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
// Exchange operations
Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException;
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments) throws IOException;
Exchange.DeleteOk exchangeDelete(String exchange) throws IOException;
Exchange.BindOk exchangeBind(String destination, String source, String routingKey) throws IOException;

Operations for consuming messages from queues and managing queue topology.
// Queue operations
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
Queue.DeleteOk queueDelete(String queue) throws IOException;
Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException;
// Message consuming
String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
GetResponse basicGet(String queue, boolean autoAck) throws IOException;
void basicAck(long deliveryTag, boolean multiple) throws IOException;
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

Interfaces and implementations for consuming messages asynchronously with callbacks.
// Functional interfaces for consumers
@FunctionalInterface
public interface DeliverCallback {
void handle(String consumerTag, Delivery delivery) throws IOException;
}
@FunctionalInterface
public interface CancelCallback {
void handle(String consumerTag) throws IOException;
}
// Consumer interface
public interface Consumer {
void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException;
void handleCancel(String consumerTag) throws IOException;
String getConsumerTag();
}

Configuration classes for connection parameters, addressing, authentication, and SSL settings.
// Address and resolver classes
public class Address {
public static Address[] parseAddresses(String addresses) throws IOException;
public String getHost();
public int getPort();
}
public interface AddressResolver {
List<Address> getAddresses() throws IOException;
}
// Authentication configuration
public interface SaslConfig {
SaslMechanism getSaslMechanism(String[] serverMechanisms);
}

Exception classes and automatic recovery mechanisms for handling network failures and errors.
// Core exception types
public class ShutdownSignalException extends RuntimeException {
public Object getReason();
public boolean isHardError();
public boolean isInitiatedByApplication();
}
// Recovery interfaces
// A connection that is also a Recoverable, so it can be passed to the
// RecoveryListener callbacks, which take a Recoverable argument.
public interface RecoverableConnection extends Recoverable, Connection {
    // Registers a listener for recovery events on this connection
    void addRecoveryListener(RecoveryListener listener);
    boolean isOpen();
}
public interface RecoveryListener {
void handleRecovery(Recoverable recoverable);
void handleRecoveryStarted(Recoverable recoverable);
}

Mechanisms for reliable message publishing with publisher confirms and handling returned messages.
// Publisher confirms
void confirmSelect() throws IOException;
boolean waitForConfirms() throws InterruptedException;
void addConfirmListener(ConfirmListener listener);
@FunctionalInterface
public interface ConfirmCallback {
void handle(long deliveryTag, boolean multiple) throws IOException;
}
// Returns handling
void addReturnListener(ReturnListener listener);
@FunctionalInterface
public interface ReturnCallback {
void handle(Return returnMessage) throws IOException;
}

Publisher Confirms and Returns
Remote Procedure Call (RPC) patterns over AMQP for request-response messaging.
// RPC client for making calls
public class RpcClient {
public RpcClient(Channel channel, String exchange, String routingKey) throws IOException;
public byte[] primitiveCall(byte[] message) throws IOException, ShutdownSignalException, TimeoutException;
public String stringCall(String message) throws IOException, ShutdownSignalException, TimeoutException;
public void close() throws IOException;
}
// RPC server for handling calls
public abstract class RpcServer {
public abstract byte[] handleCall(byte[] requestBody, AMQP.BasicProperties replyProperties);
public void mainloop() throws IOException;
}

Interfaces for collecting metrics and integrating with observability systems.
// Metrics collection interface
public interface MetricsCollector {
void newConnection(Connection connection);
void closeConnection(Connection connection);
void newChannel(Channel channel);
void closeChannel(Channel channel);
void basicPublish(Channel channel);
void basicConsume(Channel channel, String queue, boolean autoAck);
}

// Message delivery information
public class Delivery {
public Envelope getEnvelope();
public AMQP.BasicProperties getProperties();
public byte[] getBody();
}
// Envelope contains routing information
public class Envelope {
public long getDeliveryTag();
public boolean isRedeliver();
public String getExchange();
public String getRoutingKey();
}
// Message properties
public class AMQP.BasicProperties {
public String getContentType();
public String getContentEncoding();
public Map<String, Object> getHeaders();
public Integer getDeliveryMode();
public Integer getPriority();
public String getCorrelationId();
public String getReplyTo();
public String getExpiration();
public String getMessageId();
public Date getTimestamp();
public String getType();
public String getUserId();
public String getAppId();
}
// Queue information response
public class GetResponse {
public Envelope getEnvelope();
public AMQP.BasicProperties getProps();
public byte[] getBody();
public int getMessageCount();
}
// Returned message information
public class Return {
public int getReplyCode();
public String getReplyText();
public String getExchange();
public String getRoutingKey();
public AMQP.BasicProperties getProperties();
public byte[] getBody();
}