CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-com-rabbitmq--amqp-client

The RabbitMQ Java client library allows Java applications to interface with RabbitMQ message broker servers

Pending
Overview
Eval results
Files

error-recovery.mddocs/

Error Handling and Recovery

Exception classes and automatic recovery mechanisms for handling network failures, protocol errors, and application-initiated shutdowns. The RabbitMQ Java client provides comprehensive error handling and automatic recovery capabilities for robust messaging applications.

Capabilities

Core Exception Types

Base exception classes for different types of errors in AMQP operations.

/**
 * Exception indicating a shutdown signal from connection or channel
 */
public class ShutdownSignalException extends RuntimeException {
    /**
     * Get the shutdown reason object
     * @return AMQP method object indicating the reason
     */
    public Object getReason();
    
    /**
     * Check if this is a hard error (connection-level)
     * @return true for connection errors, false for channel errors
     */
    public boolean isHardError();
    
    /**
     * Check if shutdown was initiated by the application
     * @return true if application initiated, false if server initiated
     */
    public boolean isInitiatedByApplication();
    
    /**
     * Get reference to the connection or channel that was shut down
     * @return ShutdownNotifier object (Connection or Channel)
     */
    public ShutdownNotifier getReference();
}

/**
 * Exception for operations on already closed connections or channels
 */
public class AlreadyClosedException extends ShutdownSignalException {
    /**
     * Create exception with shutdown signal
     * @param shutdownSignalException - Original shutdown signal
     */
    public AlreadyClosedException(ShutdownSignalException shutdownSignalException);
}

/**
 * Exception for malformed AMQP frames
 */
public class MalformedFrameException extends IOException {
    /**
     * Create exception with message
     * @param message - Error message
     */
    public MalformedFrameException(String message);
}

/**
 * Exception for missed heartbeats from server
 */
public class MissedHeartbeatException extends IOException {
    /**
     * Create exception with message
     * @param message - Error message
     */
    public MissedHeartbeatException(String message);
}

Usage Examples:

// Handling shutdown signals
try {
    channel.basicPublish("exchange", "key", null, message.getBytes());
} catch (ShutdownSignalException e) {
    if (e.isHardError()) {
        System.out.println("Connection error: " + e.getReason());
        // Handle connection-level error
        reconnect();
    } else {
        System.out.println("Channel error: " + e.getReason());
        // Handle channel-level error  
        reopenChannel();
    }
} catch (AlreadyClosedException e) {
    System.out.println("Attempted operation on closed resource");
    // Recreate connection/channel
    recreateResources();
}

Authentication Exceptions

Exception types for authentication and authorization failures.

/**
 * Base class for possible authentication failures
 */
public class PossibleAuthenticationFailureException extends IOException {
    /**
     * Create exception with message
     * @param message - Error message
     */
    public PossibleAuthenticationFailureException(String message);
}

/**
 * Exception for confirmed authentication failures
 */
public class AuthenticationFailureException extends PossibleAuthenticationFailureException {
    /**
     * Create exception with message
     * @param message - Error message
     */
    public AuthenticationFailureException(String message);
}

Protocol and Frame Exceptions

Exceptions for protocol-level errors and unexpected conditions.

/**
 * Exception for protocol version mismatches
 */
public class ProtocolVersionMismatchException extends IOException {
    /**
     * Create exception with version information
     * @param clientMajor - Client protocol major version
     * @param clientMinor - Client protocol minor version
     * @param serverMajor - Server protocol major version
     * @param serverMinor - Server protocol minor version
     */
    public ProtocolVersionMismatchException(int clientMajor, int clientMinor, int serverMajor, int serverMinor);
    
    public int getClientMajor();
    public int getClientMinor();
    public int getServerMajor();
    public int getServerMinor();
}

/**
 * Error for unexpected AMQP frames
 */
public class UnexpectedFrameError extends Error {
    /**
     * Create error with frame information
     * @param frame - Unexpected frame object
     * @param expectedFrameType - Expected frame type
     */
    public UnexpectedFrameError(Frame frame, int expectedFrameType);
}

/**
 * Error for unexpected AMQP methods
 */  
public class UnexpectedMethodError extends Error {
    /**
     * Create error with method information
     * @param method - Unexpected method object
     */
    public UnexpectedMethodError(Method method);
}

/**
 * Exception for unknown class or method IDs
 */
public class UnknownClassOrMethodId extends IOException {
    /**
     * Create exception with class and method IDs
     * @param classId - AMQP class ID
     * @param methodId - AMQP method ID
     */
    public UnknownClassOrMethodId(int classId, int methodId);
    
    public int getClassId();
    public int getMethodId();
}

Channel and Operation Exceptions

Exceptions specific to channel operations and RPC timeouts.

/**
 * Exception for channel RPC operation timeouts
 */
public class ChannelContinuationTimeoutException extends TimeoutException {
    /**
     * Create timeout exception
     */
    public ChannelContinuationTimeoutException();
    
    /**
     * Create timeout exception with message
     * @param message - Error message
     */
    public ChannelContinuationTimeoutException(String message);
}

/**
 * Exception for consumer cancellation
 */
public class ConsumerCancelledException extends RuntimeException {
    /**
     * Create consumer cancellation exception
     */
    public ConsumerCancelledException();
}

/**
 * Exception for unroutable RPC requests
 */
public class UnroutableRpcRequestException extends IOException {
    /**
     * Create exception for unroutable RPC request
     * @param message - Error message
     */
    public UnroutableRpcRequestException(String message);
}

Topology Recovery Exceptions

Exceptions related to automatic topology recovery.

/**
 * Exception during topology recovery process
 */
public class TopologyRecoveryException extends Exception {
    /**
     * Create recovery exception with cause
     * @param cause - Underlying cause of recovery failure
     */
    public TopologyRecoveryException(Throwable cause);
    
    /**
     * Create recovery exception with message and cause
     * @param message - Error message
     * @param cause - Underlying cause
     */
    public TopologyRecoveryException(String message, Throwable cause);
}

Recovery System

Automatic Recovery

Interfaces and classes for automatic connection and topology recovery.

/**
 * Interface for objects that support recovery
 */
public interface Recoverable {
    /**
     * Add a recovery listener
     * @param listener - Listener to notify on recovery events
     */
    void addRecoveryListener(RecoveryListener listener);
    
    /**
     * Remove a recovery listener
     * @param listener - Listener to remove
     */
    void removeRecoveryListener(RecoveryListener listener);
}

/**
 * Listener interface for recovery events
 */
public interface RecoveryListener {
    /**
     * Called when recovery completes successfully
     * @param recoverable - Object that was recovered
     */
    void handleRecovery(Recoverable recoverable);
    
    /**
     * Called when recovery process starts
     * @param recoverable - Object being recovered
     */
    void handleRecoveryStarted(Recoverable recoverable);
}

/**
 * Interface for handling recovery delays
 */
public interface RecoveryDelayHandler {
    /**
     * Get delay before next recovery attempt
     * @param recoveryAttempts - Number of recovery attempts so far
     * @return Delay in milliseconds before next attempt
     */
    long getDelay(int recoveryAttempts);
}

Usage Examples:

// Custom recovery listener
RecoveryListener recoveryListener = new RecoveryListener() {
    @Override
    public void handleRecovery(Recoverable recoverable) {
        if (recoverable instanceof Connection) {
            System.out.println("Connection recovered successfully");
            // Notify application components
            notifyConnectionRecovered();
        } else if (recoverable instanceof Channel) {
            System.out.println("Channel recovered successfully");
            // Restart consumers if needed
            restartConsumers();
        }
    }
    
    @Override
    public void handleRecoveryStarted(Recoverable recoverable) {
        System.out.println("Recovery started for: " + recoverable);
        // Pause message processing during recovery
        pauseProcessing();
    }
};

// Add to recoverable connection
ConnectionFactory factory = new ConnectionFactory();
factory.setAutomaticRecoveryEnabled(true);
RecoverableConnection connection = (RecoverableConnection) factory.newConnection();
connection.addRecoveryListener(recoveryListener);
// Custom recovery delay handler with exponential backoff
RecoveryDelayHandler delayHandler = new RecoveryDelayHandler() {
    @Override
    public long getDelay(int recoveryAttempts) {
        // Exponential backoff: 1s, 2s, 4s, 8s, max 30s
        long delay = Math.min(1000L * (1L << recoveryAttempts), 30000L);
        System.out.println("Recovery attempt " + recoveryAttempts + ", waiting " + delay + "ms");
        return delay;
    }
};

ConnectionFactory factory = new ConnectionFactory();
factory.setRecoveryDelayHandler(delayHandler);
factory.setAutomaticRecoveryEnabled(true);

Exception Handling

Interface for handling exceptions in consumers and other callback contexts.

/**
 * Interface for handling exceptions in consumers and connections
 */
public interface ExceptionHandler {
    /**
     * Handle unexpected exception in connection driver
     * @param conn - Connection where exception occurred
     * @param exception - Exception that occurred
     */
    void handleUnexpectedConnectionDriverException(Connection conn, Throwable exception);
    
    /**
     * Handle exception in return listener
     * @param channel - Channel where exception occurred
     * @param exception - Exception that occurred
     */
    void handleReturnListenerException(Channel channel, Throwable exception);
    
    /**
     * Handle exception in flow listener
     * @param channel - Channel where exception occurred
     * @param exception - Exception that occurred
     */
    void handleFlowListenerException(Channel channel, Throwable exception);
    
    /**
     * Handle exception in confirm listener
     * @param channel - Channel where exception occurred
     * @param exception - Exception that occurred
     */
    void handleConfirmListenerException(Channel channel, Throwable exception);
    
    /**
     * Handle exception in blocked listener
     * @param connection - Connection where exception occurred
     * @param exception - Exception that occurred
     */
    void handleBlockedListenerException(Connection connection, Throwable exception);
    
    /**
     * Handle exception in consumer
     * @param channel - Channel where exception occurred
     * @param exception - Exception that occurred
     * @param consumer - Consumer that threw exception
     * @param consumerTag - Consumer tag
     * @param methodName - Method where exception occurred
     */
    void handleConsumerException(Channel channel, Throwable exception, Consumer consumer, String consumerTag, String methodName);
    
    /**
     * Handle exception during connection recovery
     * @param conn - Connection being recovered
     * @param exception - Exception that occurred
     */
    void handleConnectionRecoveryException(Connection conn, Throwable exception);
    
    /**
     * Handle exception during channel recovery
     * @param ch - Channel being recovered
     * @param exception - Exception that occurred
     */
    void handleChannelRecoveryException(Channel ch, Throwable exception);
    
    /**
     * Handle exception during topology recovery
     * @param conn - Connection being recovered
     * @param ch - Channel being recovered (may be null)
     * @param exception - Exception that occurred
     */
    void handleTopologyRecoveryException(Connection conn, Channel ch, TopologyRecoveryException exception);
}

Usage Examples:

// Custom exception handler with logging and metrics
public class CustomExceptionHandler implements ExceptionHandler {
    private static final Logger logger = LoggerFactory.getLogger(CustomExceptionHandler.class);
    private final MetricsRegistry metrics;
    
    public CustomExceptionHandler(MetricsRegistry metrics) {
        this.metrics = metrics;
    }
    
    @Override
    public void handleConsumerException(Channel channel, Throwable exception, 
                                       Consumer consumer, String consumerTag, String methodName) {
        logger.error("Consumer exception in {}: {}", methodName, exception.getMessage(), exception);
        metrics.counter("consumer.exceptions").increment();
        
        // Optionally restart consumer
        if (exception instanceof RuntimeException) {
            restartConsumer(channel, consumer, consumerTag);
        }
    }
    
    @Override
    public void handleConnectionRecoveryException(Connection conn, Throwable exception) {
        logger.error("Connection recovery failed: {}", exception.getMessage(), exception);
        metrics.counter("connection.recovery.failures").increment();
        
        // Send alert to monitoring system
        alertingService.sendAlert("RabbitMQ connection recovery failed", exception);
    }
    
    @Override
    public void handleTopologyRecoveryException(Connection conn, Channel ch, TopologyRecoveryException exception) {
        logger.error("Topology recovery failed: {}", exception.getMessage(), exception);
        metrics.counter("topology.recovery.failures").increment();
        
        // Attempt manual topology recreation
        scheduleTopologyRecreation(conn, ch);
    }
    
    // Implement other methods with appropriate logging and handling...
}

// Use custom exception handler
ConnectionFactory factory = new ConnectionFactory();
factory.setExceptionHandler(new CustomExceptionHandler(metricsRegistry));

Error Handling Patterns

Robust Consumer with Error Handling:

public class RobustConsumer extends DefaultConsumer {
    private static final Logger logger = LoggerFactory.getLogger(RobustConsumer.class);
    private final int maxRetries;
    
    public RobustConsumer(Channel channel, int maxRetries) {
        super(channel);
        this.maxRetries = maxRetries;
    }
    
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                              AMQP.Properties properties, byte[] body) throws IOException {
        try {
            processMessage(new String(body, "UTF-8"), properties);
            getChannel().basicAck(envelope.getDeliveryTag(), false);
            
        } catch (Exception e) {
            logger.error("Error processing message: {}", e.getMessage(), e);
            handleProcessingError(envelope, properties, body, e);
        }
    }
    
    private void handleProcessingError(Envelope envelope, AMQP.BasicProperties properties, 
                                     byte[] body, Exception error) throws IOException {
        // Get retry count from headers
        Map<String, Object> headers = properties.getHeaders();
        int retryCount = headers != null && headers.containsKey("x-retry-count") ?
            (Integer) headers.get("x-retry-count") : 0;
            
        if (retryCount < maxRetries) {
            // Republish for retry
            republishForRetry(envelope, properties, body, retryCount + 1);
            getChannel().basicAck(envelope.getDeliveryTag(), false);
        } else {
            // Send to dead letter queue or log as failed
            logger.error("Message failed after {} retries, sending to DLQ", maxRetries);
            getChannel().basicNack(envelope.getDeliveryTag(), false, false);
        }
    }
    
    @Override
    public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
        if (!sig.isInitiatedByApplication()) {
            logger.error("Consumer {} received unexpected shutdown: {}", consumerTag, sig.getReason());
            // Implement reconnection logic
            scheduleReconnection();
        }
    }
}

Connection Recovery with Custom Logic:

public class ResilientConnectionManager {
    private ConnectionFactory factory;
    private volatile Connection connection;
    private final List<Channel> channels = new CopyOnWriteArrayList<>();
    
    public ResilientConnectionManager() {
        setupConnectionFactory();
    }
    
    private void setupConnectionFactory() {
        factory = new ConnectionFactory();
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        
        // Custom recovery listener
        factory.setRecoveryDelayHandler(recoveryAttempts -> {
            long delay = Math.min(1000L * recoveryAttempts, 30000L);
            logger.info("Recovery attempt {}, waiting {}ms", recoveryAttempts, delay);
            return delay;
        });
        
        // Custom exception handler
        factory.setExceptionHandler(new ExceptionHandler() {
            @Override
            public void handleConnectionRecoveryException(Connection conn, Throwable exception) {
                logger.error("Connection recovery failed", exception);
                // Custom recovery logic
                attemptManualRecovery();
            }
            
            // ... implement other methods
        });
    }
    
    public synchronized Connection getConnection() throws IOException, TimeoutException {
        if (connection == null || !connection.isOpen()) {
            connection = factory.newConnection();
            ((RecoverableConnection) connection).addRecoveryListener(new RecoveryListener() {
                @Override
                public void handleRecovery(Recoverable recoverable) {
                    logger.info("Connection recovered successfully");
                    // Recreate channels and consumers
                    recreateChannels();
                }
                
                @Override
                public void handleRecoveryStarted(Recoverable recoverable) {
                    logger.info("Connection recovery started");
                }
            });
        }
        return connection;
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-com-rabbitmq--amqp-client

docs

configuration.md

confirms-returns.md

connection-channel.md

consumer-api.md

consuming.md

error-recovery.md

index.md

observability.md

publishing.md

rpc.md

tile.json