Comprehensive connection configuration for RabbitMQ, supporting both individual parameter setup and URI-based configuration with extensive timeout and recovery options.
RMQConnectionConfig is the main configuration class: it encapsulates all RabbitMQ connection parameters and provides a configured ConnectionFactory.
/**
* Connection Configuration for RMQ supporting both individual parameters and URI-based setup
*/
public class RMQConnectionConfig implements Serializable {
/** Returns the host to use for connections */
public String getHost();
/** Returns the port to use for connections */
public int getPort();
/** Returns the virtual host to use when connecting to the broker */
public String getVirtualHost();
/** Returns the AMQP user name to use when connecting to the broker */
public String getUsername();
/** Returns the password to use when connecting to the broker */
public String getPassword();
/** Returns the connection URI when connecting to the broker */
public String getUri();
/** Returns automatic connection recovery interval in milliseconds; default is 5000 */
public Integer getNetworkRecoveryInterval();
/** Returns true if automatic connection recovery is enabled, false otherwise */
public Boolean isAutomaticRecovery();
/** Returns true if topology recovery is enabled, false otherwise */
public Boolean isTopologyRecovery();
/** Returns the connection timeout, in milliseconds; zero for infinite */
public Integer getConnectionTimeout();
/** Returns the initially requested maximum channel number; zero for unlimited */
public Integer getRequestedChannelMax();
/** Returns the initially requested maximum frame size, in octets; zero for unlimited */
public Integer getRequestedFrameMax();
/** Returns the initially requested heartbeat interval, in seconds; zero for none */
public Integer getRequestedHeartbeat();
/**
* Returns configured Connection Factory for RMQ
* @throws URISyntaxException if malformed URI has been passed
* @throws NoSuchAlgorithmException if SSL algorithm is not available
* @throws KeyManagementException if SSL key management fails
*/
public ConnectionFactory getConnectionFactory()
throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException;
}

Nested builder class for constructing RMQConnectionConfig instances with flexible parameter setup.
/**
* Builder class for creating RMQConnectionConfig instances
*/
public static class Builder {
/** Set the default host to use for connections */
public Builder setHost(String host);
/** Set the target port */
public Builder setPort(int port);
/** Set the virtual host to use when connecting to the broker */
public Builder setVirtualHost(String virtualHost);
/** Set the AMQP user name to use when connecting to the broker */
public Builder setUserName(String username);
/** Set the password to use when connecting to the broker */
public Builder setPassword(String password);
/**
* Convenience method for setting the fields in an AMQP URI: host,
* port, username, password and virtual host. If any part of the
* URI is omitted, the ConnectionFactory's corresponding variable
* is left unchanged.
*/
public Builder setUri(String uri);
/** Enables or disables automatic connection recovery */
public Builder setAutomaticRecovery(boolean automaticRecovery);
/** Enables or disables topology recovery */
public Builder setTopologyRecoveryEnabled(boolean topologyRecovery);
/** Sets connection recovery interval in milliseconds. Default is 5000. */
public Builder setNetworkRecoveryInterval(int networkRecoveryInterval);
/** Set the connection timeout in milliseconds; zero for infinite */
public Builder setConnectionTimeout(int connectionTimeout);
/** Set the requested maximum channel number; zero for unlimited */
public Builder setRequestedChannelMax(int requestedChannelMax);
/** Set the requested maximum frame size in octets; zero for unlimited */
public Builder setRequestedFrameMax(int requestedFrameMax);
/** Set the requested heartbeat interval in seconds; zero for none */
public Builder setRequestedHeartbeat(int requestedHeartbeat);
/**
* Builds the RMQConnectionConfig instance
* If URI is provided, uses URI-based configuration
* Otherwise uses individual parameter-based configuration
*/
public RMQConnectionConfig build();
}

Usage Examples:
// Individual parameter configuration
RMQConnectionConfig config1 = new RMQConnectionConfig.Builder()
.setHost("rabbitmq.example.com")
.setPort(5672)
.setVirtualHost("/production")
.setUserName("flink-user")
.setPassword("secure-password")
.setAutomaticRecovery(true)
.setNetworkRecoveryInterval(10000)
.setConnectionTimeout(30000)
.build();
// URI-based configuration
RMQConnectionConfig config2 = new RMQConnectionConfig.Builder()
.setUri("amqp://user:password@rabbitmq.example.com:5672/vhost")
.setAutomaticRecovery(true)
.setRequestedHeartbeat(60)
.build();
// SSL configuration with URI
RMQConnectionConfig sslConfig = new RMQConnectionConfig.Builder()
.setUri("amqps://user:password@rabbitmq.example.com:5671/vhost")
.setConnectionTimeout(60000)
.build();
// Get the configured connection factory
ConnectionFactory factory = config1.getConnectionFactory();
Connection connection = factory.newConnection();

The getConnectionFactory() method can throw several checked exceptions, which callers must handle:
try {
ConnectionFactory factory = config.getConnectionFactory();
Connection connection = factory.newConnection();
} catch (URISyntaxException e) {
// Handle malformed URI
} catch (NoSuchAlgorithmException | KeyManagementException e) {
// Handle SSL configuration issues
}