or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

client.mdconnections.mdexceptions.mdindex.mdpipeline.mdpubsub.md

connections.mddocs/

0

# Connection Management

1

2

Connection classes and pools for managing TCP/SSL connections to Redis cluster nodes with automatic discovery, load balancing, and failover support. The connection system handles cluster topology changes, node failures, and provides efficient connection pooling.

3

4

## Capabilities

5

6

### ClusterConnection

7

8

Standard TCP connection for cluster nodes with cluster-specific response parsing and read-only mode support.

9

10

```python { .api }

11

class ClusterConnection(Connection):

12

def __init__(self, readonly=False, **kwargs):

13

"""

14

Initialize cluster TCP connection.

15

16

Parameters:

17

- readonly (bool): Configure connection as read-only

18

- **kwargs: Standard Redis connection parameters

19

- host (str): Redis host

20

- port (int): Redis port

21

- db (int): Database number (usually 0 for cluster)

22

- password (str, optional): Authentication password

23

- socket_timeout (float): Socket timeout in seconds

24

- socket_connect_timeout (float): Connection timeout in seconds

25

- socket_keepalive (bool): Enable TCP keepalive

26

- socket_keepalive_options (dict): TCP keepalive options

27

- retry_on_timeout (bool): Retry commands on timeout

28

- encoding (str): String encoding (default 'utf-8')

29

- encoding_errors (str): Encoding error handling

30

- decode_responses (bool): Decode responses to strings

31

"""

32

33

def on_connect(self):

34

"""

35

Initialize connection with authentication and read-only mode setup.

36

Automatically sends READONLY command if readonly=True.

37

"""

38

```

39

40

### SSLClusterConnection

41

42

SSL/TLS connection for secure cluster communication with cluster-specific parsing.

43

44

```python { .api }

45

class SSLClusterConnection(SSLConnection):

46

def __init__(self, readonly=False, **kwargs):

47

"""

48

Initialize secure cluster SSL connection.

49

50

Parameters:

51

- readonly (bool): Configure connection as read-only

52

- **kwargs: Standard SSL connection parameters

53

- ssl_cert_reqs (str): Certificate verification mode

54

- ssl_ca_certs (str): CA certificates file path

55

- ssl_certfile (str): Client certificate file path

56

- ssl_keyfile (str): Client private key file path

57

- ssl_password (str, optional): Private key password

58

- ssl_check_hostname (bool): Verify hostname in certificate

59

- All ClusterConnection parameters

60

"""

61

62

def on_connect(self):

63

"""

64

Initialize SSL connection with authentication and read-only mode setup.

65

"""

66

```

67

68

### ClusterConnectionPool

69

70

Base connection pool managing connections to cluster nodes with automatic slot-to-node mapping and discovery.

71

72

```python { .api }

73

class ClusterConnectionPool(ConnectionPool):

74

def __init__(self, startup_nodes=None, init_slot_cache=True,

75

connection_class=None, max_connections=None,

76

max_connections_per_node=False, reinitialize_steps=None,

77

skip_full_coverage_check=False, nodemanager_follow_cluster=False,

78

host_port_remap=None, **connection_kwargs):

79

"""

80

Initialize cluster connection pool.

81

82

Parameters:

83

- startup_nodes (List[StartupNode], optional): Initial cluster nodes for bootstrap

84

- init_slot_cache (bool): Initialize slot-to-node mapping on startup

85

- connection_class (type, optional): Connection class (ClusterConnection or SSLClusterConnection)

86

- max_connections (int, optional): Maximum total connections across all nodes

87

- max_connections_per_node (bool|int): Per-node connection limit (True=16, int=custom limit)

88

- reinitialize_steps (int, optional): Commands before reinitializing cluster layout

89

- skip_full_coverage_check (bool): Skip validation that all slots are covered

90

- nodemanager_follow_cluster (bool): Automatically follow cluster topology changes

91

- host_port_remap (HostPortRemap, optional): Remap node addresses

92

- **connection_kwargs: Parameters passed to connection class

93

"""

94

95

def get_connection_by_node(self, node):

96

"""

97

Get connection to specific cluster node.

98

99

Parameters:

100

- node (NodeInfo): Node information dictionary

101

102

Returns:

103

ClusterConnection: Connection instance for the node

104

"""

105

106

def get_connection_by_slot(self, slot):

107

"""

108

Get connection for hash slot.

109

110

Parameters:

111

- slot (int): Hash slot number (0-16383)

112

113

Returns:

114

ClusterConnection: Connection to node handling the slot

115

"""

116

117

def get_random_connection(self):

118

"""

119

Get random connection from pool.

120

121

Returns:

122

ClusterConnection: Random connection instance

123

"""

124

125

def release(self, connection):

126

"""

127

Return connection to pool for reuse.

128

129

Parameters:

130

- connection (ClusterConnection): Connection to release

131

"""

132

133

def disconnect(self):

134

"""

135

Close all connections in pool and clear node information.

136

"""

137

138

def reset(self):

139

"""

140

Reset pool state and reinitialize cluster discovery.

141

"""

142

```

143

144

### ClusterBlockingConnectionPool

145

146

Thread-safe blocking connection pool that blocks when no connections are available.

147

148

```python { .api }

149

class ClusterBlockingConnectionPool(ClusterConnectionPool):

150

def __init__(self, max_connections=50, timeout=20, **kwargs):

151

"""

152

Initialize blocking cluster connection pool.

153

154

Parameters:

155

- max_connections (int): Maximum total connections across cluster

156

- timeout (int): Blocking timeout in seconds when no connections available

157

- **kwargs: All ClusterConnectionPool parameters

158

"""

159

160

def get_connection(self, command_name, *keys, **options):

161

"""

162

Get connection with blocking behavior.

163

Blocks up to timeout seconds if no connections available.

164

165

Parameters:

166

- command_name (str): Redis command name

167

- *keys: Command keys for slot routing

168

- **options: Additional routing options

169

170

Returns:

171

ClusterConnection: Available connection instance

172

173

Raises:

174

ConnectionError: If timeout exceeded waiting for connection

175

"""

176

177

def make_connection(self):

178

"""

179

Create new connection instance when pool not at capacity.

180

181

Returns:

182

ClusterConnection: New connection instance

183

"""

184

```

185

186

### ClusterReadOnlyConnectionPool

187

188

Connection pool optimized for read-only operations with automatic replica routing.

189

190

```python { .api }

191

class ClusterReadOnlyConnectionPool(ClusterConnectionPool):

192

def __init__(self, startup_nodes=None, init_slot_cache=True,

193

connection_class=None, **kwargs):

194

"""

195

Initialize read-only connection pool.

196

197

Parameters:

198

- startup_nodes (List[StartupNode], optional): Initial cluster nodes for bootstrap

199

- init_slot_cache (bool): Initialize slot-to-node mapping on startup

200

- connection_class (type, optional): Connection class (defaults to ClusterConnection with readonly=True)

201

- **kwargs: Additional ClusterConnectionPool parameters

202

203

Note: Automatically sets readonly=True on all connections

204

"""

205

```

206

207

### ClusterWithReadReplicasConnectionPool

208

209

Connection pool that intelligently routes read operations to replica nodes for load distribution.

210

211

```python { .api }

212

class ClusterWithReadReplicasConnectionPool(ClusterConnectionPool):

213

def __init__(self, startup_nodes=None, init_slot_cache=True,

214

connection_class=None, **kwargs):

215

"""

216

Initialize connection pool with read replica support.

217

218

Parameters:

219

- startup_nodes (List[StartupNode], optional): Initial cluster nodes for bootstrap

220

- init_slot_cache (bool): Initialize slot-to-node mapping on startup

221

- connection_class (type, optional): Connection class (defaults to ClusterConnection)

222

- **kwargs: Additional ClusterConnectionPool parameters

223

224

Features:

225

- Routes read commands to replica nodes when available

226

- Falls back to master nodes for write operations

227

- Provides load balancing across replicas

228

"""

229

230

def get_connection_by_slot(self, slot, read_command=False):

231

"""

232

Get connection for slot with replica routing support.

233

234

Parameters:

235

- slot (int): Hash slot number (0-16383)

236

- read_command (bool): Whether this is a read operation

237

238

Returns:

239

ClusterConnection: Connection to appropriate node (replica for reads, master for writes)

240

"""

241

```

242

243

## Connection Pool Management

244

245

### Node Discovery and Slot Mapping

246

247

The connection pool automatically discovers cluster topology and maintains slot-to-node mappings.

248

249

```python { .api }

250

# Internal methods used by RedisCluster (not typically called directly)

251

def initialize(self):

252

"""Initialize cluster discovery and slot mappings."""

253

254

def reset(self):

255

"""Reset cluster state and rediscover topology."""

256

257

def get_master_node_by_slot(self, slot):

258

"""Get master node information for slot."""

259

260

def get_nodes_by_server_type(self, server_type):

261

"""Get nodes filtered by type (master/slave)."""

262

```

263

264

### Connection Lifecycle

265

266

Connection pools handle the complete connection lifecycle including creation, reuse, and cleanup.

267

268

```python { .api }

269

def make_connection(self):

270

"""Create new connection when needed."""

271

272

def get_connection(self, command_name, *keys, **options):

273

"""Get appropriate connection for command and keys."""

274

275

def release(self, connection):

276

"""Return connection to pool for reuse."""

277

278

def disconnect(self):

279

"""Close all connections and cleanup resources."""

280

```

281

282

## Usage Examples

283

284

### Basic Connection Pool Usage

285

286

```python

287

from rediscluster import RedisCluster, ClusterConnectionPool, ClusterConnection

288

289

# Default connection pool (automatically created by RedisCluster)

290

rc = RedisCluster(startup_nodes=[{"host": "127.0.0.1", "port": "7000"}])

291

292

# Custom connection pool

293

pool = ClusterConnectionPool(

294

startup_nodes=[

295

{"host": "127.0.0.1", "port": "7000"},

296

{"host": "127.0.0.1", "port": "7001"},

297

{"host": "127.0.0.1", "port": "7002"}

298

],

299

max_connections=32,

300

max_connections_per_node=8

301

)

302

rc = RedisCluster(connection_pool=pool)

303

```

304

305

### SSL/TLS Configuration

306

307

```python

308

from rediscluster import RedisCluster, ClusterConnectionPool, SSLClusterConnection

309

310

# SSL connection pool

311

pool = ClusterConnectionPool(

312

startup_nodes=[{"host": "secure-cluster.example.com", "port": "6380"}],

313

connection_class=SSLClusterConnection,

314

ssl_cert_reqs="required",

315

ssl_ca_certs="/path/to/ca.crt",

316

ssl_certfile="/path/to/client.crt",

317

ssl_keyfile="/path/to/client.key"

318

)

319

rc = RedisCluster(connection_pool=pool)

320

```

321

322

### Blocking Connection Pool

323

324

```python

325

from rediscluster import RedisCluster, ClusterBlockingConnectionPool

326

327

# Blocking pool with limited connections

328

pool = ClusterBlockingConnectionPool(

329

startup_nodes=[{"host": "127.0.0.1", "port": "7000"}],

330

max_connections=20,

331

timeout=30 # Block up to 30 seconds for connection

332

)

333

rc = RedisCluster(connection_pool=pool)

334

```

335

336

### Read Replica Configuration

337

338

```python

339

from rediscluster import RedisCluster

340

341

# Enable reading from replicas for load distribution

342

rc = RedisCluster(

343

startup_nodes=[{"host": "127.0.0.1", "port": "7000"}],

344

read_from_replicas=True,

345

readonly_mode=False # Allow writes to masters, reads from replicas

346

)

347

348

# Read-only mode (all operations go to replicas when possible)

349

rc_readonly = RedisCluster(

350

startup_nodes=[{"host": "127.0.0.1", "port": "7000"}],

351

readonly_mode=True,

352

read_from_replicas=True

353

)

354

```

355

356

### Connection Pool Monitoring

357

358

```python

359

# Access connection pool information

360

pool = rc.connection_pool

361

362

# Get cluster topology

363

nodes = pool.nodes.all_nodes()

364

for node in nodes:

365

print(f"Node: {node['host']}:{node['port']} - {node['server_type']}")

366

367

# Get slot mappings

368

slots = pool.nodes.slots

369

print(f"Total slots mapped: {len(slots)}")

370

371

# Connection statistics (if available)

372

print(f"Active connections: {pool.created_connections}")

373

```

374

375

### Host/Port Remapping

376

377

```python

378

# Remap internal cluster addresses to external addresses

379

host_port_remap = {

380

('internal-node1', 7000): ('external-node1.example.com', 7000),

381

('internal-node2', 7001): ('external-node2.example.com', 7001),

382

('internal-node3', 7002): ('external-node3.example.com', 7002)

383

}

384

385

rc = RedisCluster(

386

startup_nodes=[{"host": "external-node1.example.com", "port": "7000"}],

387

host_port_remap=host_port_remap

388

)

389

```

390

391

### Advanced Pool Configuration

392

393

```python

394

pool = ClusterConnectionPool(

395

startup_nodes=[{"host": "127.0.0.1", "port": "7000"}],

396

init_slot_cache=True, # Initialize slot mappings on startup

397

max_connections=64, # Total connections across cluster

398

max_connections_per_node=16, # Per-node limit

399

reinitialize_steps=1000, # Reinitialize after 1000 commands

400

skip_full_coverage_check=False, # Ensure all slots covered

401

nodemanager_follow_cluster=True, # Follow topology changes

402

socket_timeout=5, # 5 second socket timeout

403

socket_connect_timeout=5, # 5 second connection timeout

404

retry_on_timeout=True, # Retry commands on timeout

405

decode_responses=True # Decode responses to strings

406

)

407

```