or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-support.mdcluster-support.mdconnection-management.mdcore-client.mddistributed-locking.mderror-handling.mdhigh-availability.mdindex.mdpipelines-transactions.mdpubsub-messaging.md

cluster-support.mddocs/

0

# Cluster Support

1

2

Redis Cluster support provides automatic sharding across multiple Redis nodes with built-in failover and high availability. The RedisCluster client handles node discovery, command routing, and cluster topology management.

3

4

```python

5

import redis

6

from redis import RedisCluster

7

from redis.cluster import ClusterNode

8

from typing import Optional, List, Dict, Any, Type

9

```

10

11

## Capabilities

12

13

### Cluster Client Initialization

14

15

Create Redis cluster clients with automatic node discovery and configuration.

16

17

```python { .api }

18

class RedisCluster:

19

def __init__(

20

self,

21

host: Optional[str] = None,

22

port: int = 7000,

23

startup_nodes: Optional[List[ClusterNode]] = None,

24

cluster_error_retry_attempts: int = 3,

25

require_full_coverage: bool = True,

26

skip_full_coverage_check: bool = False,

27

reinitialize_steps: int = 10,

28

read_from_replicas: bool = False,

29

dynamic_startup_nodes: bool = True,

30

connection_pool_class: Type[ConnectionPool] = ConnectionPool,

31

**kwargs

32

): ...

33

34

@classmethod

35

def from_url(

36

cls,

37

url: str,

38

**kwargs

39

) -> "RedisCluster": ...

40

41

class ClusterNode:

42

def __init__(

43

self,

44

host: str,

45

port: int,

46

server_type: Optional[str] = None

47

): ...

48

49

host: str

50

port: int

51

server_type: Optional[str]

52

```

53

54

### Cluster Operations

55

56

Cluster-specific operations for managing and inspecting cluster state.

57

58

```python { .api }

59

def cluster_info(self) -> Dict[str, Any]: ...

60

61

def cluster_nodes(self) -> Dict[str, Dict[str, Any]]: ...

62

63

def cluster_slots(self) -> List[List[Union[int, List[str]]]]: ...

64

65

def cluster_keyslot(self, key: KeyT) -> int: ...

66

67

def cluster_countkeysinslot(self, slot: int) -> int: ...

68

69

def cluster_getkeysinslot(self, slot: int, count: int) -> List[bytes]: ...

70

71

def cluster_addslots(self, *slots: int) -> bool: ...

72

73

def cluster_delslots(self, *slots: int) -> bool: ...

74

75

def cluster_flushslots(self) -> bool: ...

76

77

def cluster_setslot(self, slot: int, command: str, node_id: Optional[str] = None) -> bool: ...

78

79

def cluster_replicate(self, node_id: str) -> bool: ...

80

81

def cluster_meet(self, host: str, port: int) -> bool: ...

82

83

def cluster_forget(self, node_id: str) -> bool: ...

84

85

def cluster_reset(self, hard: bool = False) -> bool: ...

86

87

def cluster_failover(self, option: Optional[str] = None) -> bool: ...

88

```

89

90

### Pipeline Support

91

92

Cluster-aware pipeline operations for batching commands while respecting cluster topology.

93

94

```python { .api }

95

def pipeline(self, transaction: bool = False) -> "ClusterPipeline": ...

96

97

class ClusterPipeline:

98

def execute(self, raise_on_error: bool = True) -> List[Any]: ...

99

100

def reset(self) -> None: ...

101

102

def watch(self, *names: KeyT) -> bool: ...

103

104

def multi(self) -> "ClusterPipeline": ...

105

106

def discard(self) -> None: ...

107

```

108

109

### Node Management

110

111

Access and manage individual cluster nodes for advanced operations.

112

113

```python { .api }

114

def get_node(

115

self,

116

host: Optional[str] = None,

117

port: Optional[int] = None,

118

node_name: Optional[str] = None

119

) -> Optional[ClusterNode]: ...

120

121

def get_primaries(self) -> List[ClusterNode]: ...

122

123

def get_replicas(self) -> List[ClusterNode]: ...

124

125

def get_random_node(self) -> ClusterNode: ...

126

127

def get_master_node_by_key(self, key: KeyT) -> ClusterNode: ...

128

129

def get_nodes_by_keys(self, *keys: KeyT) -> Dict[ClusterNode, List[KeyT]]: ...

130

131

def execute_command_on_nodes(

132

self,

133

nodes: List[ClusterNode],

134

command: str,

135

*args,

136

**kwargs

137

) -> Dict[ClusterNode, Any]: ...

138

```

139

140

## Usage Examples

141

142

### Basic Cluster Connection

143

144

```python

145

import redis

146

from redis.cluster import RedisCluster, ClusterNode

147

148

# Connect with startup nodes

149

startup_nodes = [

150

ClusterNode("localhost", 7000),

151

ClusterNode("localhost", 7001),

152

ClusterNode("localhost", 7002)

153

]

154

155

cluster = RedisCluster(startup_nodes=startup_nodes, decode_responses=True)

156

157

# Basic operations work the same as single Redis

158

cluster.set("key1", "value1")

159

value = cluster.get("key1")

160

```

161

162

### Connection from URL

163

164

```python

165

# Connect to cluster via URL

166

cluster = RedisCluster.from_url("redis://localhost:7000")

167

168

# With authentication

169

cluster = RedisCluster.from_url("redis://:password@localhost:7000")

170

```

171

172

### Cluster Information and Management

173

174

```python

175

# Get cluster information

176

info = cluster.cluster_info()

177

print(f"Cluster state: {info['cluster_state']}")

178

print(f"Cluster size: {info['cluster_size']}")

179

180

# Get node information

181

nodes = cluster.cluster_nodes()

182

for node_id, node_info in nodes.items():

183

print(f"Node {node_id}: {node_info['ip']}:{node_info['port']}")

184

185

# Get slot distribution

186

slots = cluster.cluster_slots()

187

for slot_range in slots:

188

start_slot, end_slot = slot_range[0], slot_range[1]

189

master_info = slot_range[2]

190

print(f"Slots {start_slot}-{end_slot}: {master_info[0]}:{master_info[1]}")

191

```

192

193

### Working with Specific Nodes

194

195

```python

196

# Get different types of nodes

197

primaries = cluster.get_primaries()

198

replicas = cluster.get_replicas()

199

200

# Execute command on specific nodes

201

results = cluster.execute_command_on_nodes(

202

primaries,

203

"INFO",

204

"memory"

205

)

206

207

for node, result in results.items():

208

print(f"Node {node.host}:{node.port} memory info: {result}")

209

210

# Find which node handles a specific key

211

key = "user:1001"

212

node = cluster.get_master_node_by_key(key)

213

print(f"Key '{key}' is handled by {node.host}:{node.port}")

214

```

215

216

### Cluster Pipeline Operations

217

218

```python

219

# Use pipeline for batching (commands are automatically routed)

220

pipe = cluster.pipeline()

221

222

# Add commands to pipeline

223

pipe.set("key1", "value1")

224

pipe.set("key2", "value2")

225

pipe.get("key1")

226

pipe.get("key2")

227

228

# Execute all commands

229

results = pipe.execute()

230

print(f"Results: {results}")

231

```

232

233

### High Availability Configuration

234

235

```python

236

# Enable reading from replicas for better distribution

237

cluster = RedisCluster(

238

startup_nodes=startup_nodes,

239

read_from_replicas=True, # Read from replicas when possible

240

cluster_error_retry_attempts=5, # Retry failed operations

241

require_full_coverage=False, # Allow partial cluster operation

242

skip_full_coverage_check=True # Skip coverage validation

243

)

244

245

# Configure connection pooling

246

cluster = RedisCluster(

247

startup_nodes=startup_nodes,

248

max_connections=100, # Max connections per node

249

retry_on_timeout=True, # Retry on timeout

250

health_check_interval=30 # Health check frequency

251

)

252

```

253

254

### Cross-Slot Operations

255

256

```python

257

# Operations spanning multiple hash slots require special handling

258

import hashlib

259

260

def same_slot_keys(*keys):

261

"""Ensure all keys map to the same hash slot using hash tags"""

262

hash_tag = "{user1001}"

263

return [f"{hash_tag}:{key}" for key in keys]

264

265

# These keys will be in the same slot

266

keys = same_slot_keys("profile", "settings", "activity")

267

268

# Multi-key operations work within same slot

269

pipe = cluster.pipeline()

270

pipe.mset({

271

keys[0]: "profile_data",

272

keys[1]: "settings_data",

273

keys[2]: "activity_data"

274

})

275

pipe.mget(keys)

276

results = pipe.execute()

277

```

278

279

### Error Handling

280

281

```python

282

from redis.exceptions import RedisClusterException, ClusterDownError

283

284

try:

285

cluster.set("key", "value")

286

except ClusterDownError:

287

print("Cluster is down or unreachable")

288

except RedisClusterException as e:

289

print(f"Cluster error: {e}")

290

```