The RabbitMQ Java client library allows Java applications to interface with RabbitMQ message broker servers.
—
Exception classes and automatic recovery mechanisms for handling network failures, protocol errors, and application-initiated shutdowns. The RabbitMQ Java client provides comprehensive error handling and automatic recovery capabilities for robust messaging applications.
Base exception classes for different types of errors in AMQP operations.
/**
* Exception indicating a shutdown signal from a connection or channel.
* Unchecked: it extends RuntimeException, so callers are not forced to catch it.
* The accessors describe why the shutdown happened, at which level (connection or channel),
* and which side initiated it.
*/
public class ShutdownSignalException extends RuntimeException {
/**
* Get the shutdown reason object.
* @return AMQP method object indicating the reason
*/
public Object getReason();
/**
* Check if this is a hard error (connection-level).
* @return true for connection errors, false for channel errors
*/
public boolean isHardError();
/**
* Check if shutdown was initiated by the application.
* @return true if application initiated, false if server initiated
*/
public boolean isInitiatedByApplication();
/**
* Get a reference to the connection or channel that was shut down.
* @return ShutdownNotifier object (Connection or Channel)
*/
public ShutdownNotifier getReference();
}
/**
* Exception for operations on already closed connections or channels.
* This is a subclass of ShutdownSignalException, so a catch clause for
* ShutdownSignalException also catches it; when both are handled separately,
* the AlreadyClosedException clause has to come first.
*/
public class AlreadyClosedException extends ShutdownSignalException {
/**
* Create the exception from a shutdown signal.
* @param shutdownSignalException - Original shutdown signal
*/
public AlreadyClosedException(ShutdownSignalException shutdownSignalException);
}
/**
* Exception for malformed AMQP frames.
* Checked: it extends IOException and is caught by IOException handlers.
*/
public class MalformedFrameException extends IOException {
/**
* Create the exception with a message.
* @param message - Error message
*/
public MalformedFrameException(String message);
}
/**
* Exception for missed heartbeats from the server.
* Checked: it extends IOException and is caught by IOException handlers.
*/
public class MissedHeartbeatException extends IOException {
/**
* Create the exception with a message.
* @param message - Error message
*/
public MissedHeartbeatException(String message);
}Usage Examples:
// Handling shutdown signals
try {
    channel.basicPublish("exchange", "key", null, message.getBytes());
} catch (AlreadyClosedException e) {
    // AlreadyClosedException extends ShutdownSignalException, so it must be caught
    // before its superclass; placed after it, this clause is unreachable and the
    // compiler rejects the try statement.
    System.out.println("Attempted operation on closed resource");
    // Recreate connection/channel
    recreateResources();
} catch (ShutdownSignalException e) {
    if (e.isHardError()) {
        System.out.println("Connection error: " + e.getReason());
        // Handle connection-level error
        reconnect();
    } else {
        System.out.println("Channel error: " + e.getReason());
        // Handle channel-level error
        reopenChannel();
    }
}

Exception types for authentication and authorization failures.
/**
* Base class for possible authentication failures.
* Checked: it extends IOException. AuthenticationFailureException is its subclass
* for failures that are confirmed.
*/
public class PossibleAuthenticationFailureException extends IOException {
/**
* Create the exception with a message.
* @param message - Error message
*/
public PossibleAuthenticationFailureException(String message);
}
/**
* Exception for confirmed authentication failures.
* Subclass of PossibleAuthenticationFailureException, so when both are handled
* separately this type has to be caught first.
*/
public class AuthenticationFailureException extends PossibleAuthenticationFailureException {
/**
* Create the exception with a message.
* @param message - Error message
*/
public AuthenticationFailureException(String message);
}Exceptions for protocol-level errors and unexpected conditions.
/**
* Exception for protocol version mismatches between client and server.
* Checked: it extends IOException. The four version components passed to the
* constructor have matching getters.
*/
public class ProtocolVersionMismatchException extends IOException {
/**
* Create the exception with version information.
* @param clientMajor - Client protocol major version
* @param clientMinor - Client protocol minor version
* @param serverMajor - Server protocol major version
* @param serverMinor - Server protocol minor version
*/
public ProtocolVersionMismatchException(int clientMajor, int clientMinor, int serverMajor, int serverMinor);
/** @return client protocol major version */
public int getClientMajor();
/** @return client protocol minor version */
public int getClientMinor();
/** @return server protocol major version */
public int getServerMajor();
/** @return server protocol minor version */
public int getServerMinor();
}
/**
* Error for unexpected AMQP frames.
* Declared as an Error rather than an Exception, so a catch (Exception e)
* handler does not intercept it.
*/
public class UnexpectedFrameError extends Error {
/**
* Create the error with frame information.
* @param frame - Unexpected frame object
* @param expectedFrameType - Expected frame type
*/
public UnexpectedFrameError(Frame frame, int expectedFrameType);
}
/**
* Error for unexpected AMQP methods.
* Declared as an Error rather than an Exception, so a catch (Exception e)
* handler does not intercept it.
*/
public class UnexpectedMethodError extends Error {
/**
* Create the error with method information.
* @param method - Unexpected method object
*/
public UnexpectedMethodError(Method method);
}
/**
* Exception for unknown class or method IDs.
* Checked: it extends IOException. The IDs passed to the constructor have matching getters.
*/
public class UnknownClassOrMethodId extends IOException {
/**
* Create the exception with class and method IDs.
* @param classId - AMQP class ID
* @param methodId - AMQP method ID
*/
public UnknownClassOrMethodId(int classId, int methodId);
/** @return AMQP class ID */
public int getClassId();
/** @return AMQP method ID */
public int getMethodId();
}Exceptions specific to channel operations and RPC timeouts.
/**
* Exception for channel RPC operation timeouts.
* Extends TimeoutException, so handlers for TimeoutException also catch it.
*/
public class ChannelContinuationTimeoutException extends TimeoutException {
/**
* Create the timeout exception without a message.
*/
public ChannelContinuationTimeoutException();
/**
* Create the timeout exception with a message.
* @param message - Error message
*/
public ChannelContinuationTimeoutException(String message);
}
/**
* Exception for consumer cancellation.
* Unchecked: it extends RuntimeException.
*/
public class ConsumerCancelledException extends RuntimeException {
/**
* Create the consumer cancellation exception.
*/
public ConsumerCancelledException();
}
/**
* Exception for unroutable RPC requests.
* Checked: it extends IOException and is caught by IOException handlers.
*/
public class UnroutableRpcRequestException extends IOException {
/**
* Create the exception for an unroutable RPC request.
* @param message - Error message
*/
public UnroutableRpcRequestException(String message);
}Exceptions related to automatic topology recovery.
/**
* Exception raised during the topology recovery process.
* Checked: it extends Exception directly. Both constructors take the underlying cause.
*/
public class TopologyRecoveryException extends Exception {
/**
* Create the recovery exception with a cause.
* @param cause - Underlying cause of recovery failure
*/
public TopologyRecoveryException(Throwable cause);
/**
* Create the recovery exception with a message and a cause.
* @param message - Error message
* @param cause - Underlying cause
*/
public TopologyRecoveryException(String message, Throwable cause);
}Interfaces and classes for automatic connection and topology recovery.
/**
* Interface for objects that support recovery.
* Implementations let callers register and unregister RecoveryListener instances.
*/
public interface Recoverable {
/**
* Add a recovery listener.
* @param listener - Listener to notify on recovery events
*/
void addRecoveryListener(RecoveryListener listener);
/**
* Remove a recovery listener.
* @param listener - Listener to remove
*/
void removeRecoveryListener(RecoveryListener listener);
}
/**
* Listener interface for recovery events.
* Declares two callbacks, one for the start of recovery and one for its successful
* completion, so it cannot be written as a lambda.
*/
public interface RecoveryListener {
/**
* Called when recovery completes successfully.
* @param recoverable - Object that was recovered
*/
void handleRecovery(Recoverable recoverable);
/**
* Called when the recovery process starts.
* @param recoverable - Object being recovered
*/
void handleRecoveryStarted(Recoverable recoverable);
}
/**
* Interface for computing the delay between recovery attempts.
* Declares a single method, so an implementation can be supplied as a lambda.
*/
public interface RecoveryDelayHandler {
/**
* Get the delay before the next recovery attempt.
* @param recoveryAttempts - Number of recovery attempts so far
* @return Delay in milliseconds before next attempt
*/
long getDelay(int recoveryAttempts);
}Usage Examples:
// Listener that reacts to the start and to the successful end of a recovery.
// notifyConnectionRecovered, restartConsumers and pauseProcessing are application
// hooks that are not defined in this snippet.
RecoveryListener recoveryListener = new RecoveryListener() {
    @Override
    public void handleRecoveryStarted(Recoverable recoverable) {
        System.out.println("Recovery started for: " + recoverable);
        // Hold off message processing while recovery is in progress
        pauseProcessing();
    }

    @Override
    public void handleRecovery(Recoverable recoverable) {
        // Connections are checked before channels, as in the if/else-if form
        if (recoverable instanceof Connection) {
            System.out.println("Connection recovered successfully");
            // Let the rest of the application know
            notifyConnectionRecovered();
            return;
        }
        if (recoverable instanceof Channel) {
            System.out.println("Channel recovered successfully");
            // Bring consumers back if required
            restartConsumers();
        }
    }
};

// Register the listener on a connection created with automatic recovery enabled
ConnectionFactory factory = new ConnectionFactory();
factory.setAutomaticRecoveryEnabled(true);
RecoverableConnection connection = (RecoverableConnection) factory.newConnection();
connection.addRecoveryListener(recoveryListener);

// Custom recovery delay handler with exponential backoff
RecoveryDelayHandler delayHandler = new RecoveryDelayHandler() {
    /**
     * Exponential backoff: 1s, 2s, 4s, 8s, 16s, then capped at 30s.
     * @param recoveryAttempts number of recovery attempts so far
     * @return delay in milliseconds before the next attempt, always in [1000, 30000]
     */
    @Override
    public long getDelay(int recoveryAttempts) {
        // Cap the exponent before shifting. Shifting by the raw attempt count
        // overflows the multiplication from attempt 54 onwards, and the shift
        // distance itself wraps at 64, so a long outage would yield negative or
        // reset delays. 1000 << 5 = 32000 already exceeds the 30s ceiling.
        int exponent = Math.min(Math.max(recoveryAttempts, 0), 5);
        long delay = Math.min(1000L << exponent, 30000L);
        System.out.println("Recovery attempt " + recoveryAttempts + ", waiting " + delay + "ms");
        return delay;
    }
};
ConnectionFactory factory = new ConnectionFactory();
factory.setRecoveryDelayHandler(delayHandler);
factory.setAutomaticRecoveryEnabled(true);

Interface for handling exceptions in consumers and other callback contexts.
/**
* Interface for handling exceptions in consumers and connections.
* Declares nine callbacks: one for the connection driver, one per listener type
* (return, flow, confirm, blocked), one for consumers, and three for recovery
* (connection, channel, topology). A class implementing this interface directly
* has to define all of them.
*/
public interface ExceptionHandler {
/**
* Handle unexpected exception in connection driver
* @param conn - Connection where exception occurred
* @param exception - Exception that occurred
*/
void handleUnexpectedConnectionDriverException(Connection conn, Throwable exception);
/**
* Handle exception in return listener
* @param channel - Channel where exception occurred
* @param exception - Exception that occurred
*/
void handleReturnListenerException(Channel channel, Throwable exception);
/**
* Handle exception in flow listener
* @param channel - Channel where exception occurred
* @param exception - Exception that occurred
*/
void handleFlowListenerException(Channel channel, Throwable exception);
/**
* Handle exception in confirm listener
* @param channel - Channel where exception occurred
* @param exception - Exception that occurred
*/
void handleConfirmListenerException(Channel channel, Throwable exception);
/**
* Handle exception in blocked listener
* @param connection - Connection where exception occurred
* @param exception - Exception that occurred
*/
void handleBlockedListenerException(Connection connection, Throwable exception);
/**
* Handle exception in consumer
* @param channel - Channel where exception occurred
* @param exception - Exception that occurred
* @param consumer - Consumer that threw exception
* @param consumerTag - Consumer tag
* @param methodName - Method where exception occurred
*/
void handleConsumerException(Channel channel, Throwable exception, Consumer consumer, String consumerTag, String methodName);
/**
* Handle exception during connection recovery
* @param conn - Connection being recovered
* @param exception - Exception that occurred
*/
void handleConnectionRecoveryException(Connection conn, Throwable exception);
/**
* Handle exception during channel recovery
* @param ch - Channel being recovered
* @param exception - Exception that occurred
*/
void handleChannelRecoveryException(Channel ch, Throwable exception);
/**
* Handle exception during topology recovery.
* Unlike the other callbacks, the exception parameter is typed as
* TopologyRecoveryException rather than Throwable.
* @param conn - Connection being recovered
* @param ch - Channel being recovered (may be null)
* @param exception - Exception that occurred
*/
void handleTopologyRecoveryException(Connection conn, Channel ch, TopologyRecoveryException exception);
}Usage Examples:
// Custom exception handler with logging and metrics
/**
* Example ExceptionHandler that logs each failure and increments a metrics counter.
*
* NOTE(review): this snippet is not self-contained.
* - alertingService is used in handleConnectionRecoveryException but is never declared
*   or initialised in this class.
* - restartConsumer and scheduleTopologyRecreation are called but not defined here.
* - Only three of the ExceptionHandler methods are shown; the remaining ones must be
*   implemented before the class compiles.
*/
public class CustomExceptionHandler implements ExceptionHandler {
private static final Logger logger = LoggerFactory.getLogger(CustomExceptionHandler.class);
private final MetricsRegistry metrics;
/**
* @param metrics - registry used for the failure counters
*/
public CustomExceptionHandler(MetricsRegistry metrics) {
this.metrics = metrics;
}
/**
* Logs the consumer failure, counts it, and restarts the consumer when the
* failure is a RuntimeException.
*/
@Override
public void handleConsumerException(Channel channel, Throwable exception,
Consumer consumer, String consumerTag, String methodName) {
// The trailing exception argument has no placeholder, so the logger records it as the throwable
logger.error("Consumer exception in {}: {}", methodName, exception.getMessage(), exception);
metrics.counter("consumer.exceptions").increment();
// Optionally restart consumer
if (exception instanceof RuntimeException) {
restartConsumer(channel, consumer, consumerTag);
}
}
/**
* Logs and counts a failed connection recovery, then raises an alert.
*/
@Override
public void handleConnectionRecoveryException(Connection conn, Throwable exception) {
logger.error("Connection recovery failed: {}", exception.getMessage(), exception);
metrics.counter("connection.recovery.failures").increment();
// Send alert to monitoring system
// NOTE(review): alertingService is undeclared in this class - supply it as a field
alertingService.sendAlert("RabbitMQ connection recovery failed", exception);
}
/**
* Logs and counts a failed topology recovery, then schedules a manual re-creation.
*/
@Override
public void handleTopologyRecoveryException(Connection conn, Channel ch, TopologyRecoveryException exception) {
logger.error("Topology recovery failed: {}", exception.getMessage(), exception);
metrics.counter("topology.recovery.failures").increment();
// Attempt manual topology recreation
scheduleTopologyRecreation(conn, ch);
}
// Implement other methods with appropriate logging and handling...
}
// Use custom exception handler
ConnectionFactory factory = new ConnectionFactory();
factory.setExceptionHandler(new CustomExceptionHandler(metricsRegistry));Robust Consumer with Error Handling:
/**
 * Consumer that acknowledges successfully processed messages and retries failed
 * ones up to {@code maxRetries} times before rejecting them without requeue.
 *
 * NOTE(review): processMessage, republishForRetry and scheduleReconnection are
 * application hooks that are not defined in this snippet.
 */
public class RobustConsumer extends DefaultConsumer {
    private static final Logger logger = LoggerFactory.getLogger(RobustConsumer.class);
    private static final String RETRY_COUNT_HEADER = "x-retry-count";

    private final int maxRetries;

    /**
     * @param channel    channel this consumer is attached to
     * @param maxRetries maximum number of republish attempts per message
     */
    public RobustConsumer(Channel channel, int maxRetries) {
        super(channel);
        this.maxRetries = maxRetries;
    }

    /**
     * Processes one delivery and acknowledges it on success; on a processing
     * failure the message is handed to the retry logic instead.
     *
     * @throws IOException if acknowledging or rejecting the delivery fails
     */
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {
        // The parameter type is AMQP.BasicProperties: that is what handleProcessingError
        // accepts, and the @Override has to match the inherited signature.
        try {
            processMessage(new String(body, java.nio.charset.StandardCharsets.UTF_8), properties);
        } catch (Exception e) {
            logger.error("Error processing message: {}", e.getMessage(), e);
            handleProcessingError(envelope, properties, body, e);
            return;
        }
        // Acknowledge outside the try block: a failing ack must not be mistaken for a
        // processing failure, which would republish an already processed message.
        getChannel().basicAck(envelope.getDeliveryTag(), false);
    }

    /**
     * Republishes the message with an incremented retry count, or rejects it
     * without requeue once {@code maxRetries} is reached.
     */
    private void handleProcessingError(Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body, Exception error) throws IOException {
        // Read the retry count defensively: the header may be absent, and a numeric
        // header is not guaranteed to arrive as an Integer.
        Map<String, Object> headers = properties.getHeaders();
        Object rawCount = headers == null ? null : headers.get(RETRY_COUNT_HEADER);
        int retryCount = rawCount instanceof Number ? ((Number) rawCount).intValue() : 0;

        if (retryCount < maxRetries) {
            // Republish for retry, then ack the original so it is not redelivered as well
            republishForRetry(envelope, properties, body, retryCount + 1);
            getChannel().basicAck(envelope.getDeliveryTag(), false);
        } else {
            // Reject without requeue (third argument false).
            // NOTE(review): whether this reaches a dead letter queue depends on the
            // queue's configuration, which is not shown here.
            logger.error("Message failed after {} retries, sending to DLQ", maxRetries);
            getChannel().basicNack(envelope.getDeliveryTag(), false, false);
        }
    }

    /**
     * Schedules a reconnection when the shutdown was not requested by the application.
     */
    @Override
    public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
        if (!sig.isInitiatedByApplication()) {
            logger.error("Consumer {} received unexpected shutdown: {}", consumerTag, sig.getReason());
            // Implement reconnection logic
            scheduleReconnection();
        }
    }
}

Connection Recovery with Custom Logic:
/**
 * Owns a ConnectionFactory configured for automatic recovery and hands out a
 * lazily created connection.
 *
 * NOTE(review): attemptManualRecovery and recreateChannels are application hooks
 * that are not defined in this snippet, and the anonymous ExceptionHandler below
 * still needs its remaining methods implemented.
 */
public class ResilientConnectionManager {
    // Declared here because every log statement in this class relies on it
    private static final Logger logger = LoggerFactory.getLogger(ResilientConnectionManager.class);

    private ConnectionFactory factory;
    private volatile Connection connection;
    private final List<Channel> channels = new CopyOnWriteArrayList<>();

    public ResilientConnectionManager() {
        setupConnectionFactory();
    }

    /** Builds the factory with automatic recovery, a delay handler and an exception handler. */
    private void setupConnectionFactory() {
        factory = new ConnectionFactory();
        factory.setAutomaticRecoveryEnabled(true);
        // NOTE(review): a custom RecoveryDelayHandler is installed just below; confirm
        // whether this fixed interval is still consulted once that handler is set.
        factory.setNetworkRecoveryInterval(5000);

        // Custom recovery delay handler: linear backoff of 1s per attempt, capped at 30s
        factory.setRecoveryDelayHandler(recoveryAttempts -> {
            long delay = Math.min(1000L * recoveryAttempts, 30000L);
            logger.info("Recovery attempt {}, waiting {}ms", recoveryAttempts, delay);
            return delay;
        });

        // Custom exception handler
        factory.setExceptionHandler(new ExceptionHandler() {
            @Override
            public void handleConnectionRecoveryException(Connection conn, Throwable exception) {
                logger.error("Connection recovery failed", exception);
                // Custom recovery logic
                attemptManualRecovery();
            }
            // ... implement other methods
        });
    }

    /**
     * Returns the current connection, creating a new one when none exists or the
     * existing one is no longer open. A recovery listener is attached to every
     * newly created connection.
     *
     * @return an open connection
     * @throws IOException      if the connection cannot be established
     * @throws TimeoutException if establishing the connection times out
     */
    public synchronized Connection getConnection() throws IOException, TimeoutException {
        if (connection == null || !connection.isOpen()) {
            connection = factory.newConnection();
            ((RecoverableConnection) connection).addRecoveryListener(new RecoveryListener() {
                @Override
                public void handleRecovery(Recoverable recoverable) {
                    logger.info("Connection recovered successfully");
                    // Recreate channels and consumers
                    recreateChannels();
                }

                @Override
                public void handleRecoveryStarted(Recoverable recoverable) {
                    logger.info("Connection recovery started");
                }
            });
        }
        return connection;
    }
}

Install with Tessl CLI
npx tessl i tessl/maven-com-rabbitmq--amqp-client