CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-com-rabbitmq--amqp-client

The RabbitMQ Java client library allows Java applications to interface with RabbitMQ message broker servers

Pending
Overview
Eval results
Files

docs/observability.md

Observability and Metrics

Interfaces for collecting metrics and integrating with observability systems. The RabbitMQ Java client provides comprehensive metrics collection capabilities and distributed tracing integration.

Capabilities

Metrics Collection

Interface for collecting operational metrics from the RabbitMQ client.

/**
 * Callback contract through which the client reports its execution data.
 *
 * <p>Transactions are not supported. They involve publishing and
 * acknowledgments, and the collector contract assumes that messages published
 * and acks sent inside a transaction are always counted, even when that
 * transaction is rolled back.
 */
public interface MetricsCollector {

    // ---- Connections ----

    /**
     * Invoked when a new connection is created.
     *
     * @param connection the new connection
     */
    void newConnection(Connection connection);

    /**
     * Invoked when a connection is closed.
     *
     * @param connection the closed connection
     */
    void closeConnection(Connection connection);

    // ---- Channels ----

    /**
     * Invoked when a new channel is created.
     *
     * @param channel the new channel
     */
    void newChannel(Channel channel);

    /**
     * Invoked when a channel is closed.
     *
     * @param channel the closed channel
     */
    void closeChannel(Channel channel);

    // ---- Publishing ----

    /**
     * Invoked when a message is published.
     *
     * @param channel the channel used for publishing
     */
    void basicPublish(Channel channel);

    /**
     * Invoked when publishing a message fails. Does nothing unless overridden.
     *
     * @param channel the channel used for publishing
     * @param cause the exception that caused the failure
     */
    default void basicPublishFailure(Channel channel, Throwable cause) {
        // no-op by default
    }

    /**
     * Invoked when a publisher confirm (ack) is received. Does nothing unless
     * overridden.
     *
     * @param channel the channel that received the confirm
     * @param deliveryTag the delivery tag of the confirmed message
     * @param multiple whether multiple messages were confirmed
     */
    default void basicPublishAck(Channel channel, long deliveryTag, boolean multiple) {
        // no-op by default
    }

    /**
     * Invoked when a publisher nack is received. Does nothing unless overridden.
     *
     * @param channel the channel that received the nack
     * @param deliveryTag the delivery tag of the nacked message
     * @param multiple whether multiple messages were nacked
     */
    default void basicPublishNack(Channel channel, long deliveryTag, boolean multiple) {
        // no-op by default
    }

    /**
     * Invoked when a published message is returned as unroutable. Does nothing
     * unless overridden.
     *
     * @param channel the channel that published the message
     */
    default void basicPublishUnrouted(Channel channel) {
        // no-op by default
    }

    // ---- Consumption ----

    /**
     * Invoked when a message is consumed, i.e. delivered to a consumer.
     *
     * @param channel the channel used for consuming
     * @param deliveryTag the delivery tag of the consumed message
     * @param autoAck whether auto-ack is enabled
     */
    void consumedMessage(Channel channel, long deliveryTag, boolean autoAck);

    /**
     * Invoked when a message is consumed, identifying the consumer by its tag.
     *
     * @param channel the channel used for consuming
     * @param deliveryTag the delivery tag of the consumed message
     * @param consumerTag the consumer tag
     */
    void consumedMessage(Channel channel, long deliveryTag, String consumerTag);

    // ---- Acknowledgments ----

    /**
     * Invoked when a message is acknowledged.
     *
     * @param channel the channel used for the ack
     * @param deliveryTag the delivery tag of the acknowledged message
     * @param multiple whether multiple messages were acknowledged
     */
    void basicAck(Channel channel, long deliveryTag, boolean multiple);

    /**
     * Invoked when a message is negatively acknowledged.
     *
     * @param channel the channel used for the nack
     * @param deliveryTag the delivery tag of the nacked message
     */
    void basicNack(Channel channel, long deliveryTag);

    /**
     * Invoked when a message is negatively acknowledged with a requeue option.
     * Unless overridden, this forwards to {@link #basicNack(Channel, long)}
     * and the {@code requeue} flag is not passed on.
     *
     * @param channel the channel used for the nack
     * @param deliveryTag the delivery tag of the nacked message
     * @param requeue whether to requeue the message
     */
    default void basicNack(Channel channel, long deliveryTag, boolean requeue) {
        basicNack(channel, deliveryTag);
    }

    /**
     * Invoked when a message is rejected.
     *
     * @param channel the channel used for the reject
     * @param deliveryTag the delivery tag of the rejected message
     */
    void basicReject(Channel channel, long deliveryTag);

    /**
     * Invoked when a message is rejected with a requeue option. Unless
     * overridden, this forwards to {@link #basicReject(Channel, long)} and the
     * {@code requeue} flag is not passed on.
     *
     * @param channel the channel used for the reject
     * @param deliveryTag the delivery tag of the rejected message
     * @param requeue whether to requeue the message
     */
    default void basicReject(Channel channel, long deliveryTag, boolean requeue) {
        basicReject(channel, deliveryTag);
    }

    // ---- Consumer lifecycle ----

    /**
     * Invoked when a consumer is created.
     *
     * @param channel the channel used for consuming
     * @param consumerTag the consumer tag
     * @param autoAck whether auto-ack is enabled
     */
    void basicConsume(Channel channel, String consumerTag, boolean autoAck);

    /**
     * Invoked when a consumer is cancelled.
     *
     * @param channel the channel used for consuming
     * @param consumerTag the tag of the cancelled consumer
     */
    void basicCancel(Channel channel, String consumerTag);
}

No-Op Metrics Collector

Default implementation that performs no operations - useful for disabling metrics collection.

/**
 * No-operation metrics collector: every callback has an empty body, so all
 * reported metrics are discarded.
 */
public class NoOpMetricsCollector implements MetricsCollector {

    /**
     * Singleton instance of the no-op collector.
     */
    public static final NoOpMetricsCollector INSTANCE = new NoOpMetricsCollector();

    // Connection and channel lifecycle
    @Override public void newConnection(Connection connection) {}
    @Override public void closeConnection(Connection connection) {}
    @Override public void newChannel(Channel channel) {}
    @Override public void closeChannel(Channel channel) {}

    // Publishing
    @Override public void basicPublish(Channel channel) {}
    @Override public void basicPublishFailure(Channel channel, Throwable cause) {}
    @Override public void basicPublishAck(Channel channel, long deliveryTag, boolean multiple) {}
    @Override public void basicPublishNack(Channel channel, long deliveryTag, boolean multiple) {}
    @Override public void basicPublishUnrouted(Channel channel) {}

    // Consumer lifecycle
    @Override public void basicConsume(Channel channel, String consumerTag, boolean autoAck) {}
    @Override public void basicCancel(Channel channel, String consumerTag) {}

    // Consumption: one override per MetricsCollector overload (autoAck flag vs. consumer tag)
    @Override public void consumedMessage(Channel channel, long deliveryTag, boolean autoAck) {}
    @Override public void consumedMessage(Channel channel, long deliveryTag, String consumerTag) {}

    // Acknowledgments
    @Override public void basicAck(Channel channel, long deliveryTag, boolean multiple) {}
    @Override public void basicNack(Channel channel, long deliveryTag) {}
    @Override public void basicReject(Channel channel, long deliveryTag) {}

    /**
     * No-op. This method is not declared by the {@code MetricsCollector}
     * interface, so it carries no {@code @Override}; it is kept so that any
     * existing caller continues to compile.
     *
     * @param channel the channel used for the get
     * @param queue the queue name
     * @param messageRetrieved whether a message was retrieved
     */
    public void basicGet(Channel channel, String queue, boolean messageRetrieved) {}
}

Observation Collection

Interface for creating, starting, and stopping observations around publish and consume operations, and for recording errors on them.

/**
 * Contract for collecting observations and telemetry from RabbitMQ operations.
 */
public interface ObservationCollector {

    /**
     * Creates the observation context for a publish operation.
     *
     * @param exchange the exchange name
     * @param routingKey the routing key
     * @return the observation context for the publish operation
     */
    Observation.Context newPublishObservation(String exchange, String routingKey);

    /**
     * Creates the observation context for a consume operation.
     *
     * @param queue the queue name
     * @return the observation context for the consume operation
     */
    Observation.Context newConsumeObservation(String queue);

    /**
     * Starts an observation.
     *
     * @param context the observation context
     * @return the started observation
     */
    Observation start(Observation.Context context);

    /**
     * Stops an observation.
     *
     * @param observation the observation to stop
     */
    void stop(Observation observation);

    /**
     * Records an error on an observation.
     *
     * @param observation the observation to record the error for
     * @param error the error that occurred
     */
    void error(Observation observation, Throwable error);
}

Usage Examples:

// Configure metrics collection on ConnectionFactory
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");

// Custom metrics collector implementation
MetricsCollector metricsCollector = new CustomMetricsCollector();
factory.setMetricsCollector(metricsCollector);

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

// All operations will now be tracked by the metrics collector
channel.basicPublish("", "queue", null, "message".getBytes());
// Using no-op collector to disable metrics
ConnectionFactory factory = new ConnectionFactory();
factory.setMetricsCollector(NoOpMetricsCollector.INSTANCE);
// Example custom metrics collector implementation
/**
 * Example collector that counts created connections and published messages.
 * Every other {@code MetricsCollector} callback is implemented as a no-op so
 * the class is a complete, concrete implementation of the interface.
 */
public class CustomMetricsCollector implements MetricsCollector {
    // NOTE(review): the meters are built with a no-argument register(); if Counter/Timer
    // are Micrometer types, register(...) expects a MeterRegistry — confirm and pass one.
    private final Counter connectionsCreated = Counter.builder("rabbitmq.connections.created").register();
    private final Counter messagesPublished = Counter.builder("rabbitmq.messages.published").register();
    // Not used yet; intended for recording publish timing.
    private final Timer publishTimer = Timer.builder("rabbitmq.publish.duration").register();

    /**
     * Counts the new connection and prints its id.
     *
     * @param connection the new connection
     */
    @Override
    public void newConnection(Connection connection) {
        connectionsCreated.increment();
        System.out.println("New connection created: " + connection.getId());
    }

    /**
     * Counts the published message.
     *
     * @param channel the channel used for publishing
     */
    @Override
    public void basicPublish(Channel channel) {
        messagesPublished.increment();
        // Record publish timing, etc.
    }

    // Remaining abstract callbacks of MetricsCollector: left empty in this example,
    // fill them in as needed.
    @Override public void closeConnection(Connection connection) {}
    @Override public void newChannel(Channel channel) {}
    @Override public void closeChannel(Channel channel) {}
    @Override public void consumedMessage(Channel channel, long deliveryTag, boolean autoAck) {}
    @Override public void consumedMessage(Channel channel, long deliveryTag, String consumerTag) {}
    @Override public void basicAck(Channel channel, long deliveryTag, boolean multiple) {}
    @Override public void basicNack(Channel channel, long deliveryTag) {}
    @Override public void basicReject(Channel channel, long deliveryTag) {}
    @Override public void basicConsume(Channel channel, String consumerTag, boolean autoAck) {}
    @Override public void basicCancel(Channel channel, String consumerTag) {}
}

Types

Traffic Listener

Interface for monitoring network traffic for debugging and analysis purposes.

/**
 * Listener notified of network traffic, in both directions.
 */
public interface TrafficListener {

    /**
     * Notifies the listener that data has been written to the network.
     *
     * @param data the data that was written
     */
    void write(byte[] data);

    /**
     * Notifies the listener that data has been read from the network.
     *
     * @param data the data that was read
     */
    void read(byte[] data);
}

Exception Handler

Interface for customizing how the client handles various types of exceptions.

/**
 * Callback contract for handling exceptions that occur in consumers and
 * connections.
 */
public interface ExceptionHandler {

    /**
     * Handles an unexpected connection driver exception.
     *
     * @param conn the connection where the exception occurred
     * @param exception the exception
     */
    void handleUnexpectedConnectionDriverException(Connection conn, Throwable exception);

    /**
     * Handles an exception in a return listener.
     *
     * @param channel the channel where the exception occurred
     * @param exception the exception
     */
    void handleReturnListenerException(Channel channel, Throwable exception);

    /**
     * Handles an exception in a flow listener.
     *
     * @param channel the channel where the exception occurred
     * @param exception the exception
     */
    void handleFlowListenerException(Channel channel, Throwable exception);

    /**
     * Handles an exception in a confirm listener.
     *
     * @param channel the channel where the exception occurred
     * @param exception the exception
     */
    void handleConfirmListenerException(Channel channel, Throwable exception);

    /**
     * Handles an exception in a blocked listener.
     *
     * @param connection the connection where the exception occurred
     * @param exception the exception
     */
    void handleBlockedListenerException(Connection connection, Throwable exception);

    /**
     * Handles an exception in a consumer.
     *
     * @param channel the channel where the exception occurred
     * @param exception the exception
     * @param consumer the consumer that caused the exception
     * @param consumerTag the consumer tag
     * @param methodName the method where the exception occurred
     */
    void handleConsumerException(
            Channel channel,
            Throwable exception,
            Consumer consumer,
            String consumerTag,
            String methodName);

    /**
     * Handles a connection recovery exception.
     *
     * @param conn the connection being recovered
     * @param exception the exception
     */
    void handleConnectionRecoveryException(Connection conn, Throwable exception);

    /**
     * Handles a channel recovery exception.
     *
     * @param ch the channel being recovered
     * @param exception the exception
     */
    void handleChannelRecoveryException(Channel ch, Throwable exception);

    /**
     * Handles a topology recovery exception.
     *
     * @param conn the connection where topology recovery failed
     * @param ch the channel where topology recovery failed
     * @param exception the exception
     */
    void handleTopologyRecoveryException(
            Connection conn, Channel ch, TopologyRecoveryException exception);
}

Install with Tessl CLI

npx tessl i tessl/maven-com-rabbitmq--amqp-client

docs

configuration.md

confirms-returns.md

connection-channel.md

consumer-api.md

consuming.md

error-recovery.md

index.md

observability.md

publishing.md

rpc.md

tile.json