or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration-properties.md · connection-configuration.md · error-handling.md · index.md · listener-configuration.md · message-operations.md · queue-exchange-management.md · stream-processing.md

connection-configuration.mddocs/

0

# Connection Configuration

1

2

Auto-configuration and customization of RabbitMQ connection factories, including connection pooling, SSL configuration, clustering support, and connection management strategies.

3

4

## Connection Factory Configuration

5

6

### Basic Connection Factory

7

8

```java { .api }

9

@Configuration

10

public class ConnectionConfig {

11

12

@Bean

13

public CachingConnectionFactory connectionFactory() {

14

CachingConnectionFactory factory = new CachingConnectionFactory();

15

factory.setHost("localhost");

16

factory.setPort(5672);

17

factory.setUsername("guest");

18

factory.setPassword("guest");

19

factory.setVirtualHost("/");

20

return factory;

21

}

22

}

23

```

24

25

### Connection Factory Customization

26

27

```java { .api }

28

@FunctionalInterface

29

public interface ConnectionFactoryCustomizer {

30

void customize(ConnectionFactory factory);

31

}

32

33

@Configuration

34

public class CustomConnectionConfig {

35

36

@Bean

37

public ConnectionFactoryCustomizer connectionFactoryCustomizer() {

38

return (connectionFactory) -> {

39

connectionFactory.setRequestedHeartbeat(60);

40

connectionFactory.setConnectionTimeout(30000);

41

connectionFactory.setHandshakeTimeout(10000);

42

connectionFactory.setShutdownTimeout(5000);

43

connectionFactory.setRequestedChannelMax(20);

44

connectionFactory.setNetworkRecoveryInterval(5000);

45

connectionFactory.setAutomaticRecoveryEnabled(true);

46

connectionFactory.setTopologyRecoveryEnabled(true);

47

};

48

}

49

}

50

```

51

52

### Caching Connection Factory Configuration

53

54

```java { .api }

55

@Configuration

56

public class CachingConfig {

57

58

@Bean

59

public CachingConnectionFactory cachingConnectionFactory() {

60

CachingConnectionFactory factory = new CachingConnectionFactory("localhost");

61

62

// Channel caching configuration

63

factory.setCacheMode(CacheMode.CHANNEL);

64

factory.setChannelCacheSize(25);

65

factory.setChannelCheckoutTimeout(5000);

66

67

// Connection configuration

68

factory.setConnectionTimeout(60000);

69

factory.setCloseTimeout(30000);

70

71

// Publisher confirms

72

factory.setPublisherConfirmType(ConfirmType.CORRELATED);

73

factory.setPublisherReturns(true);

74

75

return factory;

76

}

77

}

78

```

79

80

### Connection Caching Modes

81

82

```java { .api }

83

@Configuration

84

public class CachingModeConfig {

85

86

// Channel caching mode (default)

87

@Bean("channelCachingFactory")

88

public CachingConnectionFactory channelCachingFactory() {

89

CachingConnectionFactory factory = new CachingConnectionFactory("localhost");

90

factory.setCacheMode(CacheMode.CHANNEL);

91

factory.setChannelCacheSize(25);

92

return factory;

93

}

94

95

// Connection caching mode

96

@Bean("connectionCachingFactory")

97

public CachingConnectionFactory connectionCachingFactory() {

98

CachingConnectionFactory factory = new CachingConnectionFactory("localhost");

99

factory.setCacheMode(CacheMode.CONNECTION);

100

factory.setConnectionCacheSize(5);

101

factory.setChannelCacheSize(10);

102

return factory;

103

}

104

}

105

```

106

107

## SSL Configuration

108

109

### SSL Connection Factory

110

111

```java { .api }

112

@Configuration

113

public class SSLConfig {

114

115

@Bean

116

public CachingConnectionFactory sslConnectionFactory() throws Exception {

117

CachingConnectionFactory factory = new CachingConnectionFactory();

118

factory.setHost("rabbitmq.example.com");

119

factory.setPort(5671);

120

factory.setUsername("ssl-user");

121

factory.setPassword("ssl-password");

122

123

// SSL configuration

124

factory.getRabbitConnectionFactory().useSslProtocol();

125

126

return factory;

127

}

128

129

@Bean

130

public CachingConnectionFactory customSslConnectionFactory() throws Exception {

131

CachingConnectionFactory factory = new CachingConnectionFactory();

132

factory.setHost("rabbitmq.example.com");

133

factory.setPort(5671);

134

135

// Custom SSL context

136

SSLContext sslContext = createCustomSSLContext();

137

factory.getRabbitConnectionFactory().useSslProtocol(sslContext);

138

139

return factory;

140

}

141

142

private SSLContext createCustomSSLContext() throws Exception {

143

SSLContext context = SSLContext.getInstance("TLSv1.2");

144

145

// Load key store

146

KeyStore keyStore = KeyStore.getInstance("PKCS12");

147

keyStore.load(new FileInputStream("client-keystore.p12"), "password".toCharArray());

148

149

KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");

150

kmf.init(keyStore, "password".toCharArray());

151

152

// Load trust store

153

KeyStore trustStore = KeyStore.getInstance("JKS");

154

trustStore.load(new FileInputStream("truststore.jks"), "password".toCharArray());

155

156

TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");

157

tmf.init(trustStore);

158

159

context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), new SecureRandom());

160

161

return context;

162

}

163

}

164

```

165

166

### SSL with Client Certificate Authentication

167

168

```java { .api }

169

@Configuration

170

public class ClientCertConfig {

171

172

@Bean

173

public CachingConnectionFactory clientCertConnectionFactory() throws Exception {

174

CachingConnectionFactory factory = new CachingConnectionFactory();

175

factory.setHost("secure-rabbitmq.example.com");

176

factory.setPort(5671);

177

178

// External authentication (x509 certificate)

179

factory.getRabbitConnectionFactory().setCredentialsProvider(new ExternalCredentialsProvider());

180

181

// SSL with client certificate

182

SSLContext sslContext = createClientCertSSLContext();

183

factory.getRabbitConnectionFactory().useSslProtocol(sslContext);

184

185

return factory;

186

}

187

188

private SSLContext createClientCertSSLContext() throws Exception {

189

// Implementation for client certificate SSL context

190

// Load client certificate and configure SSL context

191

return SSLContext.getDefault();

192

}

193

}

194

```

195

196

## Clustering and High Availability

197

198

### Multiple Hosts Configuration

199

200

```java { .api }

201

@Configuration

202

public class ClusterConfig {

203

204

@Bean

205

public CachingConnectionFactory clusterConnectionFactory() {

206

CachingConnectionFactory factory = new CachingConnectionFactory();

207

208

// Multiple host addresses

209

Address[] addresses = {

210

new Address("rabbitmq1.example.com", 5672),

211

new Address("rabbitmq2.example.com", 5672),

212

new Address("rabbitmq3.example.com", 5672)

213

};

214

factory.setAddresses(addresses);

215

factory.setAddressShuffleMode(AddressShuffleMode.RANDOM);

216

217

factory.setUsername("cluster-user");

218

factory.setPassword("cluster-password");

219

220

return factory;

221

}

222

}

223

```

224

225

### Load Balancing Connection Factory

226

227

```java { .api }

228

@Configuration

229

public class LoadBalancingConfig {

230

231

@Bean

232

public AbstractConnectionFactory loadBalancingConnectionFactory() {

233

List<Address> addresses = Arrays.asList(

234

new Address("node1.rabbitmq.com", 5672),

235

new Address("node2.rabbitmq.com", 5672),

236

new Address("node3.rabbitmq.com", 5672)

237

);

238

239

// Round-robin load balancing

240

return new SimpleRoutingConnectionFactory() {

241

private int currentIndex = 0;

242

243

@Override

244

protected Object determineCurrentLookupKey() {

245

return addresses.get((currentIndex++) % addresses.size());

246

}

247

};

248

}

249

}

250

```

251

252

## Connection Recovery and Resilience

253

254

### Automatic Recovery Configuration

255

256

```java { .api }

257

@Configuration

258

public class RecoveryConfig {

259

260

@Bean

261

public CachingConnectionFactory resilientConnectionFactory() {

262

CachingConnectionFactory factory = new CachingConnectionFactory("localhost");

263

264

// Automatic recovery settings

265

factory.getRabbitConnectionFactory().setAutomaticRecoveryEnabled(true);

266

factory.getRabbitConnectionFactory().setTopologyRecoveryEnabled(true);

267

factory.getRabbitConnectionFactory().setNetworkRecoveryInterval(5000);

268

269

// Connection settings for resilience

270

factory.setConnectionTimeout(60000);

271

factory.getRabbitConnectionFactory().setRequestedHeartbeat(30);

272

factory.getRabbitConnectionFactory().setConnectionTimeout(30000);

273

factory.getRabbitConnectionFactory().setHandshakeTimeout(10000);

274

275

return factory;

276

}

277

}

278

```

279

280

### Custom Recovery Callback

281

282

```java { .api }

283

@Configuration

284

public class CustomRecoveryConfig {

285

286

@Bean

287

public CachingConnectionFactory connectionFactoryWithRecovery() {

288

CachingConnectionFactory factory = new CachingConnectionFactory("localhost");

289

290

factory.getRabbitConnectionFactory().setAutomaticRecoveryEnabled(true);

291

292

// Add recovery listeners

293

factory.addConnectionListener(new ConnectionListener() {

294

@Override

295

public void onCreate(Connection connection) {

296

log.info("Connection created: {}", connection);

297

}

298

299

@Override

300

public void onClose(Connection connection) {

301

log.warn("Connection closed: {}", connection);

302

}

303

304

@Override

305

public void onShutDown(ShutdownSignalException signal) {

306

log.error("Connection shutdown: {}", signal.getReason());

307

}

308

});

309

310

return factory;

311

}

312

}

313

```

314

315

## Connection Monitoring and Health

316

317

### Connection Health Indicator

318

319

```java { .api }

320

@Component

321

public class RabbitHealthIndicator implements HealthIndicator {

322

323

private final CachingConnectionFactory connectionFactory;

324

325

public RabbitHealthIndicator(CachingConnectionFactory connectionFactory) {

326

this.connectionFactory = connectionFactory;

327

}

328

329

@Override

330

public Health health() {

331

Health.Builder builder = new Health.Builder();

332

333

try {

334

Connection connection = connectionFactory.createConnection();

335

if (connection.isOpen()) {

336

builder.up()

337

.withDetail("version", getServerVersion(connection))

338

.withDetail("connectionCount", connectionFactory.getCacheProperties().get("openConnections"));

339

} else {

340

builder.down().withDetail("reason", "Connection is not open");

341

}

342

connection.close();

343

} catch (Exception e) {

344

builder.down(e);

345

}

346

347

return builder.build();

348

}

349

350

private String getServerVersion(Connection connection) {

351

return connection.getServerProperties().get("version").toString();

352

}

353

}

354

```

355

356

### Connection Metrics

357

358

```java { .api }

359

@Component

360

public class ConnectionMetrics {

361

362

private final CachingConnectionFactory connectionFactory;

363

private final MeterRegistry meterRegistry;

364

365

public ConnectionMetrics(CachingConnectionFactory connectionFactory, MeterRegistry meterRegistry) {

366

this.connectionFactory = connectionFactory;

367

this.meterRegistry = meterRegistry;

368

setupMetrics();

369

}

370

371

private void setupMetrics() {

372

Gauge.builder("rabbitmq.connections.open")

373

.description("Number of open connections")

374

.register(meterRegistry, connectionFactory, cf -> {

375

Properties props = cf.getCacheProperties();

376

return props.get("openConnections");

377

});

378

379

Gauge.builder("rabbitmq.channels.cached")

380

.description("Number of cached channels")

381

.register(meterRegistry, connectionFactory, cf -> {

382

Properties props = cf.getCacheProperties();

383

return props.get("channelCacheSize");

384

});

385

}

386

}

387

```

388

389

## Custom Connection Name Strategy

390

391

### Connection Naming

392

393

```java { .api }

394

@Configuration

395

public class ConnectionNamingConfig {

396

397

@Bean

398

public ConnectionNameStrategy connectionNameStrategy() {

399

return (connectionFactory) -> {

400

String applicationName = "my-spring-app";

401

String instanceId = System.getProperty("instance.id", UUID.randomUUID().toString());

402

String hostname = getHostname();

403

return String.format("%s-%s@%s", applicationName, instanceId, hostname);

404

};

405

}

406

407

@Bean

408

public CachingConnectionFactory namedConnectionFactory(ConnectionNameStrategy connectionNameStrategy) {

409

CachingConnectionFactory factory = new CachingConnectionFactory("localhost");

410

factory.setConnectionNameStrategy(connectionNameStrategy);

411

return factory;

412

}

413

414

private String getHostname() {

415

try {

416

return InetAddress.getLocalHost().getHostName();

417

} catch (UnknownHostException e) {

418

return "unknown";

419

}

420

}

421

}

422

```

423

424

## Threading and Executor Configuration

425

426

### Custom Executor Configuration

427

428

```java { .api }

429

@Configuration

430

public class ExecutorConfig {

431

432

@Bean

433

public Executor rabbitConnectionExecutor() {

434

ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

435

executor.setCorePoolSize(5);

436

executor.setMaxPoolSize(10);

437

executor.setQueueCapacity(25);

438

executor.setThreadNamePrefix("rabbit-conn-");

439

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

440

executor.initialize();

441

return executor;

442

}

443

444

@Bean

445

public CachingConnectionFactory connectionFactoryWithExecutor(Executor rabbitConnectionExecutor) {

446

CachingConnectionFactory factory = new CachingConnectionFactory("localhost");

447

factory.setExecutor(rabbitConnectionExecutor);

448

return factory;

449

}

450

}

451

```

452

453

## Advanced Connection Scenarios

454

455

### Multi-tenant Connection Management

456

457

```java { .api }

458

@Configuration

459

public class MultiTenantConnectionConfig {

460

461

@Bean

462

public AbstractRoutingConnectionFactory routingConnectionFactory() {

463

SimpleRoutingConnectionFactory routingFactory = new SimpleRoutingConnectionFactory();

464

465

Map<Object, ConnectionFactory> targetConnectionFactories = new HashMap<>();

466

targetConnectionFactories.put("tenant1", createConnectionFactory("tenant1-vhost", "tenant1-user", "tenant1-pass"));

467

targetConnectionFactories.put("tenant2", createConnectionFactory("tenant2-vhost", "tenant2-user", "tenant2-pass"));

468

targetConnectionFactories.put("default", createConnectionFactory("/", "guest", "guest"));

469

470

routingFactory.setTargetConnectionFactories(targetConnectionFactories);

471

routingFactory.setDefaultTargetConnectionFactory(targetConnectionFactories.get("default"));

472

473

return routingFactory;

474

}

475

476

private CachingConnectionFactory createConnectionFactory(String vhost, String username, String password) {

477

CachingConnectionFactory factory = new CachingConnectionFactory("localhost");

478

factory.setVirtualHost(vhost);

479

factory.setUsername(username);

480

factory.setPassword(password);

481

return factory;

482

}

483

484

@Component

485

public static class TenantRoutingConnectionFactory extends SimpleRoutingConnectionFactory {

486

@Override

487

protected Object determineCurrentLookupKey() {

488

return TenantContext.getCurrentTenant();

489

}

490

}

491

}

492

```

493

494

### Environment-specific Configuration

495

496

```java { .api }

497

@Configuration

498

public class EnvironmentConnectionConfig {

499

500

@Bean

501

@Profile("development")

502

public CachingConnectionFactory devConnectionFactory() {

503

CachingConnectionFactory factory = new CachingConnectionFactory("localhost");

504

factory.setUsername("dev-user");

505

factory.setPassword("dev-pass");

506

factory.setChannelCacheSize(5);

507

return factory;

508

}

509

510

@Bean

511

@Profile("production")

512

public CachingConnectionFactory prodConnectionFactory() {

513

CachingConnectionFactory factory = new CachingConnectionFactory();

514

515

// Production cluster configuration

516

factory.setAddresses("prod-rabbit1:5672,prod-rabbit2:5672,prod-rabbit3:5672");

517

factory.setUsername("${rabbitmq.prod.username}");

518

factory.setPassword("${rabbitmq.prod.password}");

519

factory.setVirtualHost("${rabbitmq.prod.vhost}");

520

521

// Production settings

522

factory.setChannelCacheSize(50);

523

factory.setConnectionCacheSize(10);

524

factory.setPublisherConfirmType(ConfirmType.CORRELATED);

525

factory.setPublisherReturns(true);

526

527

return factory;

528

}

529

}

530

```