0
# Connection Configuration
1
2
This document covers auto-configuration and customization of RabbitMQ connection factories, including connection and channel caching, SSL configuration, clustering support, and connection management strategies.
3
4
## Connection Factory Configuration
5
6
### Basic Connection Factory
7
8
```java { .api }
9
@Configuration
10
public class ConnectionConfig {
11
12
@Bean
13
public CachingConnectionFactory connectionFactory() {
14
CachingConnectionFactory factory = new CachingConnectionFactory();
15
factory.setHost("localhost");
16
factory.setPort(5672);
17
factory.setUsername("guest");
18
factory.setPassword("guest");
19
factory.setVirtualHost("/");
20
return factory;
21
}
22
}
23
```
24
25
### Connection Factory Customization
26
27
```java { .api }
28
@FunctionalInterface
29
public interface ConnectionFactoryCustomizer {
30
void customize(ConnectionFactory factory);
31
}
32
33
@Configuration
34
public class CustomConnectionConfig {
35
36
@Bean
37
public ConnectionFactoryCustomizer connectionFactoryCustomizer() {
38
return (connectionFactory) -> {
39
connectionFactory.setRequestedHeartbeat(60);
40
connectionFactory.setConnectionTimeout(30000);
41
connectionFactory.setHandshakeTimeout(10000);
42
connectionFactory.setShutdownTimeout(5000);
43
connectionFactory.setRequestedChannelMax(20);
44
connectionFactory.setNetworkRecoveryInterval(5000);
45
connectionFactory.setAutomaticRecoveryEnabled(true);
46
connectionFactory.setTopologyRecoveryEnabled(true);
47
};
48
}
49
}
50
```
51
52
### Caching Connection Factory Configuration
53
54
```java { .api }
55
@Configuration
56
public class CachingConfig {
57
58
@Bean
59
public CachingConnectionFactory cachingConnectionFactory() {
60
CachingConnectionFactory factory = new CachingConnectionFactory("localhost");
61
62
// Channel caching configuration
63
factory.setCacheMode(CacheMode.CHANNEL);
64
factory.setChannelCacheSize(25);
65
factory.setChannelCheckoutTimeout(5000);
66
67
// Connection configuration
68
factory.setConnectionTimeout(60000);
69
factory.setCloseTimeout(30000);
70
71
// Publisher confirms
72
factory.setPublisherConfirmType(ConfirmType.CORRELATED);
73
factory.setPublisherReturns(true);
74
75
return factory;
76
}
77
}
78
```
79
80
### Connection Caching Modes
81
82
```java { .api }
83
@Configuration
84
public class CachingModeConfig {
85
86
// Channel caching mode (default)
87
@Bean("channelCachingFactory")
88
public CachingConnectionFactory channelCachingFactory() {
89
CachingConnectionFactory factory = new CachingConnectionFactory("localhost");
90
factory.setCacheMode(CacheMode.CHANNEL);
91
factory.setChannelCacheSize(25);
92
return factory;
93
}
94
95
// Connection caching mode
96
@Bean("connectionCachingFactory")
97
public CachingConnectionFactory connectionCachingFactory() {
98
CachingConnectionFactory factory = new CachingConnectionFactory("localhost");
99
factory.setCacheMode(CacheMode.CONNECTION);
100
factory.setConnectionCacheSize(5);
101
factory.setChannelCacheSize(10);
102
return factory;
103
}
104
}
105
```
106
107
## SSL Configuration
108
109
### SSL Connection Factory
110
111
```java { .api }
112
@Configuration
113
public class SSLConfig {
114
115
@Bean
116
public CachingConnectionFactory sslConnectionFactory() throws Exception {
117
CachingConnectionFactory factory = new CachingConnectionFactory();
118
factory.setHost("rabbitmq.example.com");
119
factory.setPort(5671);
120
factory.setUsername("ssl-user");
121
factory.setPassword("ssl-password");
122
123
// SSL configuration
124
factory.getRabbitConnectionFactory().useSslProtocol();
125
126
return factory;
127
}
128
129
@Bean
130
public CachingConnectionFactory customSslConnectionFactory() throws Exception {
131
CachingConnectionFactory factory = new CachingConnectionFactory();
132
factory.setHost("rabbitmq.example.com");
133
factory.setPort(5671);
134
135
// Custom SSL context
136
SSLContext sslContext = createCustomSSLContext();
137
factory.getRabbitConnectionFactory().useSslProtocol(sslContext);
138
139
return factory;
140
}
141
142
private SSLContext createCustomSSLContext() throws Exception {
143
SSLContext context = SSLContext.getInstance("TLSv1.2");
144
145
// Load key store
146
KeyStore keyStore = KeyStore.getInstance("PKCS12");
147
keyStore.load(new FileInputStream("client-keystore.p12"), "password".toCharArray());
148
149
KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
150
kmf.init(keyStore, "password".toCharArray());
151
152
// Load trust store
153
KeyStore trustStore = KeyStore.getInstance("JKS");
154
trustStore.load(new FileInputStream("truststore.jks"), "password".toCharArray());
155
156
TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
157
tmf.init(trustStore);
158
159
context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), new SecureRandom());
160
161
return context;
162
}
163
}
164
```
165
166
### SSL with Client Certificate Authentication
167
168
```java { .api }
169
@Configuration
170
public class ClientCertConfig {
171
172
@Bean
173
public CachingConnectionFactory clientCertConnectionFactory() throws Exception {
174
CachingConnectionFactory factory = new CachingConnectionFactory();
175
factory.setHost("secure-rabbitmq.example.com");
176
factory.setPort(5671);
177
178
// External authentication (x509 certificate)
179
factory.getRabbitConnectionFactory().setCredentialsProvider(new ExternalCredentialsProvider());
180
181
// SSL with client certificate
182
SSLContext sslContext = createClientCertSSLContext();
183
factory.getRabbitConnectionFactory().useSslProtocol(sslContext);
184
185
return factory;
186
}
187
188
private SSLContext createClientCertSSLContext() throws Exception {
189
// Implementation for client certificate SSL context
190
// Load client certificate and configure SSL context
191
return SSLContext.getDefault();
192
}
193
}
194
```
195
196
## Clustering and High Availability
197
198
### Multiple Hosts Configuration
199
200
```java { .api }
201
@Configuration
202
public class ClusterConfig {
203
204
@Bean
205
public CachingConnectionFactory clusterConnectionFactory() {
206
CachingConnectionFactory factory = new CachingConnectionFactory();
207
208
// Multiple host addresses
209
Address[] addresses = {
210
new Address("rabbitmq1.example.com", 5672),
211
new Address("rabbitmq2.example.com", 5672),
212
new Address("rabbitmq3.example.com", 5672)
213
};
214
factory.setAddresses(addresses);
215
factory.setAddressShuffleMode(AddressShuffleMode.RANDOM);
216
217
factory.setUsername("cluster-user");
218
factory.setPassword("cluster-password");
219
220
return factory;
221
}
222
}
223
```
224
225
### Load Balancing Connection Factory
226
227
```java { .api }
228
@Configuration
229
public class LoadBalancingConfig {
230
231
@Bean
232
public AbstractConnectionFactory loadBalancingConnectionFactory() {
233
List<Address> addresses = Arrays.asList(
234
new Address("node1.rabbitmq.com", 5672),
235
new Address("node2.rabbitmq.com", 5672),
236
new Address("node3.rabbitmq.com", 5672)
237
);
238
239
// Round-robin load balancing
240
return new SimpleRoutingConnectionFactory() {
241
private int currentIndex = 0;
242
243
@Override
244
protected Object determineCurrentLookupKey() {
245
return addresses.get((currentIndex++) % addresses.size());
246
}
247
};
248
}
249
}
250
```
251
252
## Connection Recovery and Resilience
253
254
### Automatic Recovery Configuration
255
256
```java { .api }
257
@Configuration
258
public class RecoveryConfig {
259
260
@Bean
261
public CachingConnectionFactory resilientConnectionFactory() {
262
CachingConnectionFactory factory = new CachingConnectionFactory("localhost");
263
264
// Automatic recovery settings
265
factory.getRabbitConnectionFactory().setAutomaticRecoveryEnabled(true);
266
factory.getRabbitConnectionFactory().setTopologyRecoveryEnabled(true);
267
factory.getRabbitConnectionFactory().setNetworkRecoveryInterval(5000);
268
269
// Connection settings for resilience
270
factory.setConnectionTimeout(60000);
271
factory.getRabbitConnectionFactory().setRequestedHeartbeat(30);
272
factory.getRabbitConnectionFactory().setConnectionTimeout(30000);
273
factory.getRabbitConnectionFactory().setHandshakeTimeout(10000);
274
275
return factory;
276
}
277
}
278
```
279
280
### Custom Recovery Callback
281
282
```java { .api }
283
@Configuration
284
public class CustomRecoveryConfig {
285
286
@Bean
287
public CachingConnectionFactory connectionFactoryWithRecovery() {
288
CachingConnectionFactory factory = new CachingConnectionFactory("localhost");
289
290
factory.getRabbitConnectionFactory().setAutomaticRecoveryEnabled(true);
291
292
// Add recovery listeners
293
factory.addConnectionListener(new ConnectionListener() {
294
@Override
295
public void onCreate(Connection connection) {
296
log.info("Connection created: {}", connection);
297
}
298
299
@Override
300
public void onClose(Connection connection) {
301
log.warn("Connection closed: {}", connection);
302
}
303
304
@Override
305
public void onShutDown(ShutdownSignalException signal) {
306
log.error("Connection shutdown: {}", signal.getReason());
307
}
308
});
309
310
return factory;
311
}
312
}
313
```
314
315
## Connection Monitoring and Health
316
317
### Connection Health Indicator
318
319
```java { .api }
320
@Component
321
public class RabbitHealthIndicator implements HealthIndicator {
322
323
private final CachingConnectionFactory connectionFactory;
324
325
public RabbitHealthIndicator(CachingConnectionFactory connectionFactory) {
326
this.connectionFactory = connectionFactory;
327
}
328
329
@Override
330
public Health health() {
331
Health.Builder builder = new Health.Builder();
332
333
try {
334
Connection connection = connectionFactory.createConnection();
335
if (connection.isOpen()) {
336
builder.up()
337
.withDetail("version", getServerVersion(connection))
338
.withDetail("connectionCount", connectionFactory.getCacheProperties().get("openConnections"));
339
} else {
340
builder.down().withDetail("reason", "Connection is not open");
341
}
342
connection.close();
343
} catch (Exception e) {
344
builder.down(e);
345
}
346
347
return builder.build();
348
}
349
350
private String getServerVersion(Connection connection) {
351
return connection.getServerProperties().get("version").toString();
352
}
353
}
354
```
355
356
### Connection Metrics
357
358
```java { .api }
359
@Component
360
public class ConnectionMetrics {
361
362
private final CachingConnectionFactory connectionFactory;
363
private final MeterRegistry meterRegistry;
364
365
public ConnectionMetrics(CachingConnectionFactory connectionFactory, MeterRegistry meterRegistry) {
366
this.connectionFactory = connectionFactory;
367
this.meterRegistry = meterRegistry;
368
setupMetrics();
369
}
370
371
private void setupMetrics() {
372
Gauge.builder("rabbitmq.connections.open")
373
.description("Number of open connections")
374
.register(meterRegistry, connectionFactory, cf -> {
375
Properties props = cf.getCacheProperties();
376
return props.get("openConnections");
377
});
378
379
Gauge.builder("rabbitmq.channels.cached")
380
.description("Number of cached channels")
381
.register(meterRegistry, connectionFactory, cf -> {
382
Properties props = cf.getCacheProperties();
383
return props.get("channelCacheSize");
384
});
385
}
386
}
387
```
388
389
## Custom Connection Name Strategy
390
391
### Connection Naming
392
393
```java { .api }
394
@Configuration
395
public class ConnectionNamingConfig {
396
397
@Bean
398
public ConnectionNameStrategy connectionNameStrategy() {
399
return (connectionFactory) -> {
400
String applicationName = "my-spring-app";
401
String instanceId = System.getProperty("instance.id", UUID.randomUUID().toString());
402
String hostname = getHostname();
403
return String.format("%s-%s@%s", applicationName, instanceId, hostname);
404
};
405
}
406
407
@Bean
408
public CachingConnectionFactory namedConnectionFactory(ConnectionNameStrategy connectionNameStrategy) {
409
CachingConnectionFactory factory = new CachingConnectionFactory("localhost");
410
factory.setConnectionNameStrategy(connectionNameStrategy);
411
return factory;
412
}
413
414
private String getHostname() {
415
try {
416
return InetAddress.getLocalHost().getHostName();
417
} catch (UnknownHostException e) {
418
return "unknown";
419
}
420
}
421
}
422
```
423
424
## Threading and Executor Configuration
425
426
### Custom Executor Configuration
427
428
```java { .api }
429
@Configuration
430
public class ExecutorConfig {
431
432
@Bean
433
public Executor rabbitConnectionExecutor() {
434
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
435
executor.setCorePoolSize(5);
436
executor.setMaxPoolSize(10);
437
executor.setQueueCapacity(25);
438
executor.setThreadNamePrefix("rabbit-conn-");
439
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
440
executor.initialize();
441
return executor;
442
}
443
444
@Bean
445
public CachingConnectionFactory connectionFactoryWithExecutor(Executor rabbitConnectionExecutor) {
446
CachingConnectionFactory factory = new CachingConnectionFactory("localhost");
447
factory.setExecutor(rabbitConnectionExecutor);
448
return factory;
449
}
450
}
451
```
452
453
## Advanced Connection Scenarios
454
455
### Multi-tenant Connection Management
456
457
```java { .api }
458
@Configuration
459
public class MultiTenantConnectionConfig {
460
461
@Bean
462
public AbstractRoutingConnectionFactory routingConnectionFactory() {
463
SimpleRoutingConnectionFactory routingFactory = new SimpleRoutingConnectionFactory();
464
465
Map<Object, ConnectionFactory> targetConnectionFactories = new HashMap<>();
466
targetConnectionFactories.put("tenant1", createConnectionFactory("tenant1-vhost", "tenant1-user", "tenant1-pass"));
467
targetConnectionFactories.put("tenant2", createConnectionFactory("tenant2-vhost", "tenant2-user", "tenant2-pass"));
468
targetConnectionFactories.put("default", createConnectionFactory("/", "guest", "guest"));
469
470
routingFactory.setTargetConnectionFactories(targetConnectionFactories);
471
routingFactory.setDefaultTargetConnectionFactory(targetConnectionFactories.get("default"));
472
473
return routingFactory;
474
}
475
476
private CachingConnectionFactory createConnectionFactory(String vhost, String username, String password) {
477
CachingConnectionFactory factory = new CachingConnectionFactory("localhost");
478
factory.setVirtualHost(vhost);
479
factory.setUsername(username);
480
factory.setPassword(password);
481
return factory;
482
}
483
484
@Component
485
public static class TenantRoutingConnectionFactory extends SimpleRoutingConnectionFactory {
486
@Override
487
protected Object determineCurrentLookupKey() {
488
return TenantContext.getCurrentTenant();
489
}
490
}
491
}
492
```
493
494
### Environment-specific Configuration
495
496
```java { .api }
497
@Configuration
498
public class EnvironmentConnectionConfig {
499
500
@Bean
501
@Profile("development")
502
public CachingConnectionFactory devConnectionFactory() {
503
CachingConnectionFactory factory = new CachingConnectionFactory("localhost");
504
factory.setUsername("dev-user");
505
factory.setPassword("dev-pass");
506
factory.setChannelCacheSize(5);
507
return factory;
508
}
509
510
@Bean
511
@Profile("production")
512
public CachingConnectionFactory prodConnectionFactory() {
513
CachingConnectionFactory factory = new CachingConnectionFactory();
514
515
// Production cluster configuration
516
factory.setAddresses("prod-rabbit1:5672,prod-rabbit2:5672,prod-rabbit3:5672");
517
factory.setUsername("${rabbitmq.prod.username}");
518
factory.setPassword("${rabbitmq.prod.password}");
519
factory.setVirtualHost("${rabbitmq.prod.vhost}");
520
521
// Production settings
522
factory.setChannelCacheSize(50);
523
factory.setConnectionCacheSize(10);
524
factory.setPublisherConfirmType(ConfirmType.CORRELATED);
525
factory.setPublisherReturns(true);
526
527
return factory;
528
}
529
}
530
```