# Connection Configuration

Connection configuration for RabbitMQ. Settings can be supplied either as individual parameters or as a single AMQP URI, with additional options for timeouts and automatic recovery.

## Capabilities

### Connection Config Class

The main configuration class. It encapsulates all RabbitMQ connection parameters and provides a configured `ConnectionFactory`.

```java { .api }
/**
 * Connection configuration for RMQ, supporting both individual parameters and URI-based setup
 */
public class RMQConnectionConfig implements Serializable {

    /** Returns the host to use for connections */
    public String getHost();

    /** Returns the port to use for connections */
    public int getPort();

    /** Returns the virtual host to use when connecting to the broker */
    public String getVirtualHost();

    /** Returns the AMQP user name to use when connecting to the broker */
    public String getUsername();

    /** Returns the password to use when connecting to the broker */
    public String getPassword();

    /** Returns the connection URI to use when connecting to the broker */
    public String getUri();

    /** Returns the automatic connection recovery interval in milliseconds; default is 5000 */
    public Integer getNetworkRecoveryInterval();

    /** Returns true if automatic connection recovery is enabled, false otherwise */
    public Boolean isAutomaticRecovery();

    /** Returns true if topology recovery is enabled, false otherwise */
    public Boolean isTopologyRecovery();

    /** Returns the connection timeout in milliseconds; zero for infinite */
    public Integer getConnectionTimeout();

    /** Returns the initially requested maximum channel number; zero for unlimited */
    public Integer getRequestedChannelMax();

    /** Returns the initially requested maximum frame size in octets; zero for unlimited */
    public Integer getRequestedFrameMax();

    /** Returns the initially requested heartbeat interval in seconds; zero for none */
    public Integer getRequestedHeartbeat();

    /**
     * Returns the configured ConnectionFactory for RMQ
     * @throws URISyntaxException if a malformed URI has been passed
     * @throws NoSuchAlgorithmException if the SSL algorithm is not available
     * @throws KeyManagementException if SSL key management fails
     */
    public ConnectionFactory getConnectionFactory()
        throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException;
}
```

### Builder Pattern

`RMQConnectionConfig` instances are constructed through a nested `Builder`. Every setter returns the builder, so calls can be chained.

```java { .api }
/**
 * Builder class for creating RMQConnectionConfig instances
 */
public static class Builder {

    /** Set the default host to use for connections */
    public Builder setHost(String host);

    /** Set the target port */
    public Builder setPort(int port);

    /** Set the virtual host to use when connecting to the broker */
    public Builder setVirtualHost(String virtualHost);

    /** Set the AMQP user name to use when connecting to the broker */
    public Builder setUserName(String username);

    /** Set the password to use when connecting to the broker */
    public Builder setPassword(String password);

    /**
     * Convenience method for setting the fields in an AMQP URI: host,
     * port, username, password and virtual host. If any part of the
     * URI is omitted, the ConnectionFactory's corresponding variable
     * is left unchanged.
     */
    public Builder setUri(String uri);

    /** Enables or disables automatic connection recovery */
    public Builder setAutomaticRecovery(boolean automaticRecovery);

    /** Enables or disables topology recovery */
    public Builder setTopologyRecoveryEnabled(boolean topologyRecovery);

    /** Sets the connection recovery interval in milliseconds; default is 5000 */
    public Builder setNetworkRecoveryInterval(int networkRecoveryInterval);

    /** Set the connection timeout in milliseconds; zero for infinite */
    public Builder setConnectionTimeout(int connectionTimeout);

    /** Set the requested maximum channel number; zero for unlimited */
    public Builder setRequestedChannelMax(int requestedChannelMax);

    /** Set the requested maximum frame size in octets; zero for unlimited */
    public Builder setRequestedFrameMax(int requestedFrameMax);

    /** Set the requested heartbeat interval in seconds; zero for none */
    public Builder setRequestedHeartbeat(int requestedHeartbeat);

    /**
     * Builds the RMQConnectionConfig instance.
     * If a URI is provided, URI-based configuration is used;
     * otherwise the individual parameters are used.
     */
    public RMQConnectionConfig build();
}
```

**Usage Examples:**

```java
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;

// Individual parameter configuration
RMQConnectionConfig config1 = new RMQConnectionConfig.Builder()
    .setHost("rabbitmq.example.com")
    .setPort(5672)
    .setVirtualHost("/production")
    .setUserName("flink-user")
    .setPassword("secure-password")
    .setAutomaticRecovery(true)
    .setNetworkRecoveryInterval(10000) // milliseconds
    .setConnectionTimeout(30000)       // milliseconds
    .build();

// URI-based configuration
RMQConnectionConfig config2 = new RMQConnectionConfig.Builder()
    .setUri("amqp://user:password@rabbitmq.example.com:5672/vhost")
    .setAutomaticRecovery(true)
    .setRequestedHeartbeat(60) // seconds
    .build();

// SSL configuration with URI (amqps scheme)
RMQConnectionConfig sslConfig = new RMQConnectionConfig.Builder()
    .setUri("amqps://user:password@rabbitmq.example.com:5671/vhost")
    .setConnectionTimeout(60000)
    .build();

// Get the configured connection factory and open a connection.
// Both calls throw checked exceptions; see Error Handling.
ConnectionFactory factory = config1.getConnectionFactory();
Connection connection = factory.newConnection();
```

## Configuration Options

### Basic Connection Parameters

- **Host**: RabbitMQ server hostname or IP address
- **Port**: RabbitMQ server port (default: 5672 for AMQP, 5671 for AMQPS)
- **Virtual Host**: RabbitMQ virtual host (default: "/")
- **Username**: Authentication username
- **Password**: Authentication password
- **URI**: Complete AMQP URI, used as an alternative to the individual parameters above

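To make the relationship between the URI and the individual parameters concrete, here is a minimal sketch that splits an AMQP URI into the fields listed above using only `java.net.URI`. The class `AmqpUriSketch` and its `Parts` holder are hypothetical names for illustration; this is not the parser the RabbitMQ client uses, and it only assumes the conventional default ports.

```java
import java.net.URI;
import java.net.URISyntaxException;

/** Illustrative only: maps the parts of an AMQP URI onto the basic connection parameters. */
class AmqpUriSketch {

    /** Hypothetical holder for the parsed parts; not part of RMQConnectionConfig. */
    static final class Parts {
        final String host;
        final int port;
        final String username;     // null when the URI carries no user-info
        final String password;     // null when the URI carries no password
        final String virtualHost;  // null when the URI carries no path
        final boolean tls;

        Parts(String host, int port, String username, String password,
              String virtualHost, boolean tls) {
            this.host = host;
            this.port = port;
            this.username = username;
            this.password = password;
            this.virtualHost = virtualHost;
            this.tls = tls;
        }
    }

    static Parts parse(String text) {
        URI uri;
        try {
            uri = new URI(text);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("malformed URI: " + text, e);
        }
        String scheme = uri.getScheme();
        boolean tls = "amqps".equalsIgnoreCase(scheme);
        if (!tls && !"amqp".equalsIgnoreCase(scheme)) {
            throw new IllegalArgumentException("expected amqp or amqps scheme: " + text);
        }

        // Fall back to the conventional default port when the URI omits one.
        int port = uri.getPort() != -1 ? uri.getPort() : (tls ? 5671 : 5672);

        // user-info has the form "user:password"; either half may be absent.
        String username = null;
        String password = null;
        String userInfo = uri.getUserInfo();
        if (userInfo != null) {
            int colon = userInfo.indexOf(':');
            username = colon < 0 ? userInfo : userInfo.substring(0, colon);
            password = colon < 0 ? null : userInfo.substring(colon + 1);
        }

        // The path after the leading slash names the virtual host.
        String path = uri.getPath();
        String virtualHost = (path == null || path.isEmpty()) ? null : path.substring(1);

        return new Parts(uri.getHost(), port, username, password, virtualHost, tls);
    }

    public static void main(String[] args) {
        Parts p = parse("amqps://user:password@rabbitmq.example.com/vhost");
        // prints "rabbitmq.example.com:5671 vhost=vhost"
        System.out.println(p.host + ":" + p.port + " vhost=" + p.virtualHost);
    }
}
```

Fields that come back as `null` correspond to URI parts that were omitted, which mirrors the documented behavior that omitted parts leave the corresponding setting unchanged.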
### Recovery and Resilience

- **Automatic Recovery**: Enables automatic connection recovery after network failures
- **Topology Recovery**: Enables automatic recovery of queues, exchanges, and bindings
- **Network Recovery Interval**: Delay between reconnection attempts, in milliseconds (default: 5000)

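The recovery interval is easiest to picture as a fixed delay between reconnection attempts. The sketch below illustrates that idea with a plain retry loop; it is not how the RabbitMQ client implements recovery, and `RecoveryIntervalSketch`, `connectWithRetry`, and the `maxAttempts` cap are hypothetical names added so the example terminates.

```java
import java.util.function.BooleanSupplier;

/** Illustrative only: a fixed-delay reconnect loop, to show what a recovery interval means. */
class RecoveryIntervalSketch {

    /**
     * Calls connect until it reports success, sleeping intervalMillis between
     * failed attempts. Returns the number of attempts that were needed.
     */
    static int connectWithRetry(BooleanSupplier connect, long intervalMillis, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (connect.getAsBoolean()) {
                return attempt;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(intervalMillis); // the "network recovery interval"
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    throw new IllegalStateException("interrupted while waiting to retry", e);
                }
            }
        }
        throw new IllegalStateException("gave up after " + maxAttempts + " attempts");
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated broker that becomes reachable on the third attempt.
        int attempts = connectWithRetry(() -> ++calls[0] >= 3, 100L, 5);
        System.out.println("connected after " + attempts + " attempts"); // prints "connected after 3 attempts"
    }
}
```

A longer interval reduces load on a broker that is restarting, at the cost of a slower return to service once it is reachable again.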
### Performance and Limits

- **Connection Timeout**: Maximum time to wait for connection establishment, in milliseconds (0 = infinite)
- **Requested Channel Max**: Requested maximum number of channels per connection (0 = unlimited)
- **Requested Frame Max**: Requested maximum frame size, in octets (0 = unlimited)
- **Requested Heartbeat**: Requested heartbeat interval, in seconds (0 = disabled)

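The "requested" values are proposals: the effective value is agreed with the broker when the connection opens. A minimal sketch of that negotiation, assuming the usual rule that zero means "no limit" and that otherwise the smaller of the two values wins, could look like this. `TuneNegotiationSketch` and `negotiate` are hypothetical names, and the broker-side values in `main` are made up for illustration.

```java
/** Illustrative only: how a requested limit and a broker limit might be reconciled. */
class TuneNegotiationSketch {

    /**
     * Zero means "no limit". If either side has no limit, the other side's
     * value applies; otherwise the smaller of the two is used.
     */
    static int negotiate(int requested, int brokerLimit) {
        if (requested == 0 || brokerLimit == 0) {
            return Math.max(requested, brokerLimit);
        }
        return Math.min(requested, brokerLimit);
    }

    public static void main(String[] args) {
        System.out.println(negotiate(0, 2047));    // prints 2047: client unlimited, broker caps it
        System.out.println(negotiate(100, 2047));  // prints 100: client asks for less
        System.out.println(negotiate(5000, 2047)); // prints 2047: broker limit wins
    }
}
```

In practice this means a requested value acts as an upper bound from the client side, and the value actually in effect may be lower.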
## Error Handling

The `getConnectionFactory()` method declares three checked exceptions:

- **URISyntaxException**: Thrown when the URI is malformed or cannot be parsed
- **NoSuchAlgorithmException**: Thrown when a required SSL algorithm is not available
- **KeyManagementException**: Thrown when SSL key management setup fails

```java
try {
    ConnectionFactory factory = config.getConnectionFactory();
    Connection connection = factory.newConnection();
} catch (URISyntaxException e) {
    // Handle malformed URI
} catch (NoSuchAlgorithmException | KeyManagementException e) {
    // Handle SSL configuration issues
} catch (IOException | TimeoutException e) {
    // Handle failures from newConnection(): broker unreachable or connect timed out
}
```
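The two SSL-related exceptions are standard JDK exceptions raised while preparing a TLS context. The sketch below shows where each one comes from using only `javax.net.ssl.SSLContext`. It assumes, without confirmation from this document, that an `amqps` URI leads to a comparable setup step inside the connection factory; `TlsSetupSketch` and `tryProtocol` are hypothetical names.

```java
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import javax.net.ssl.SSLContext;

/** Illustrative only: the JDK calls that raise the two SSL-related exceptions. */
class TlsSetupSketch {

    /** Tries to prepare a TLS context for the given protocol and describes the outcome. */
    static String tryProtocol(String protocol) {
        try {
            // Throws NoSuchAlgorithmException if no provider supports the protocol name.
            SSLContext context = SSLContext.getInstance(protocol);
            // Null arguments select the JVM's default key managers, trust managers, and RNG.
            // Throws KeyManagementException if initialization fails.
            context.init(null, null, null);
            return "ok: " + context.getProtocol();
        } catch (NoSuchAlgorithmException e) {
            return "unsupported protocol: " + protocol;
        } catch (KeyManagementException e) {
            return "key management failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(tryProtocol("TLSv1.2"));
        System.out.println(tryProtocol("NoSuchProtocol")); // prints "unsupported protocol: NoSuchProtocol"
    }
}
```

With plain `amqp` URIs or individual parameters no TLS context is involved, so in that case only `URISyntaxException` is likely to matter in practice.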