0
# Cluster Support
1
2
Redis Cluster support provides automatic sharding across multiple Redis nodes with built-in failover and high availability. The RedisCluster client handles node discovery, command routing, and cluster topology management.
3
4
```python
5
import redis
6
from redis import RedisCluster
7
from redis.cluster import ClusterNode
8
from typing import Optional, List, Dict, Any, Type
9
```
10
11
## Capabilities
12
13
### Cluster Client Initialization
14
15
Create Redis cluster clients with automatic node discovery and configuration.
16
17
```python { .api }
18
class RedisCluster:
19
def __init__(
20
self,
21
host: Optional[str] = None,
22
port: int = 7000,
23
startup_nodes: Optional[List[ClusterNode]] = None,
24
cluster_error_retry_attempts: int = 3,
25
require_full_coverage: bool = True,
26
skip_full_coverage_check: bool = False,
27
reinitialize_steps: int = 10,
28
read_from_replicas: bool = False,
29
dynamic_startup_nodes: bool = True,
30
connection_pool_class: Type[ConnectionPool] = ConnectionPool,
31
**kwargs
32
): ...
33
34
@classmethod
35
def from_url(
36
cls,
37
url: str,
38
**kwargs
39
) -> "RedisCluster": ...
40
41
class ClusterNode:
42
def __init__(
43
self,
44
host: str,
45
port: int,
46
server_type: Optional[str] = None
47
): ...
48
49
host: str
50
port: int
51
server_type: Optional[str]
52
```
53
54
### Cluster Operations
55
56
Cluster-specific operations for managing and inspecting cluster state.
57
58
```python { .api }
59
def cluster_info(self) -> Dict[str, Any]: ...
60
61
def cluster_nodes(self) -> Dict[str, Dict[str, Any]]: ...
62
63
def cluster_slots(self) -> List[List[Union[int, List[str]]]]: ...
64
65
def cluster_keyslot(self, key: KeyT) -> int: ...
66
67
def cluster_countkeysinslot(self, slot: int) -> int: ...
68
69
def cluster_getkeysinslot(self, slot: int, count: int) -> List[bytes]: ...
70
71
def cluster_addslots(self, *slots: int) -> bool: ...
72
73
def cluster_delslots(self, *slots: int) -> bool: ...
74
75
def cluster_flushslots(self) -> bool: ...
76
77
def cluster_setslot(self, slot: int, command: str, node_id: Optional[str] = None) -> bool: ...
78
79
def cluster_replicate(self, node_id: str) -> bool: ...
80
81
def cluster_meet(self, host: str, port: int) -> bool: ...
82
83
def cluster_forget(self, node_id: str) -> bool: ...
84
85
def cluster_reset(self, hard: bool = False) -> bool: ...
86
87
def cluster_failover(self, option: Optional[str] = None) -> bool: ...
88
```
89
90
### Pipeline Support
91
92
Cluster-aware pipeline operations for batching commands while respecting cluster topology.
93
94
```python { .api }
95
def pipeline(self, transaction: bool = False) -> "ClusterPipeline": ...
96
97
class ClusterPipeline:
98
def execute(self, raise_on_error: bool = True) -> List[Any]: ...
99
100
def reset(self) -> None: ...
101
102
def watch(self, *names: KeyT) -> bool: ...
103
104
def multi(self) -> "ClusterPipeline": ...
105
106
def discard(self) -> None: ...
107
```
108
109
### Node Management
110
111
Access and manage individual cluster nodes for advanced operations.
112
113
```python { .api }
114
def get_node(
115
self,
116
host: Optional[str] = None,
117
port: Optional[int] = None,
118
node_name: Optional[str] = None
119
) -> Optional[ClusterNode]: ...
120
121
def get_primaries(self) -> List[ClusterNode]: ...
122
123
def get_replicas(self) -> List[ClusterNode]: ...
124
125
def get_random_node(self) -> ClusterNode: ...
126
127
def get_master_node_by_key(self, key: KeyT) -> ClusterNode: ...
128
129
def get_nodes_by_keys(self, *keys: KeyT) -> Dict[ClusterNode, List[KeyT]]: ...
130
131
def execute_command_on_nodes(
132
self,
133
nodes: List[ClusterNode],
134
command: str,
135
*args,
136
**kwargs
137
) -> Dict[ClusterNode, Any]: ...
138
```
139
140
## Usage Examples
141
142
### Basic Cluster Connection
143
144
```python
145
import redis
146
from redis.cluster import RedisCluster, ClusterNode
147
148
# Connect with startup nodes
149
startup_nodes = [
150
ClusterNode("localhost", 7000),
151
ClusterNode("localhost", 7001),
152
ClusterNode("localhost", 7002)
153
]
154
155
cluster = RedisCluster(startup_nodes=startup_nodes, decode_responses=True)
156
157
# Basic operations work the same as single Redis
158
cluster.set("key1", "value1")
159
value = cluster.get("key1")
160
```
161
162
### Connection from URL
163
164
```python
165
# Connect to cluster via URL
166
cluster = RedisCluster.from_url("redis://localhost:7000")
167
168
# With authentication
169
cluster = RedisCluster.from_url("redis://:password@localhost:7000")
170
```
171
172
### Cluster Information and Management
173
174
```python
175
# Get cluster information
176
info = cluster.cluster_info()
177
print(f"Cluster state: {info['cluster_state']}")
178
print(f"Cluster size: {info['cluster_size']}")
179
180
# Get node information
181
nodes = cluster.cluster_nodes()
182
for node_id, node_info in nodes.items():
183
print(f"Node {node_id}: {node_info['ip']}:{node_info['port']}")
184
185
# Get slot distribution
186
slots = cluster.cluster_slots()
187
for slot_range in slots:
188
start_slot, end_slot = slot_range[0], slot_range[1]
189
master_info = slot_range[2]
190
print(f"Slots {start_slot}-{end_slot}: {master_info[0]}:{master_info[1]}")
191
```
192
193
### Working with Specific Nodes
194
195
```python
196
# Get different types of nodes
197
primaries = cluster.get_primaries()
198
replicas = cluster.get_replicas()
199
200
# Execute command on specific nodes
201
results = cluster.execute_command_on_nodes(
202
primaries,
203
"INFO",
204
"memory"
205
)
206
207
for node, result in results.items():
208
print(f"Node {node.host}:{node.port} memory info: {result}")
209
210
# Find which node handles a specific key
211
key = "user:1001"
212
node = cluster.get_master_node_by_key(key)
213
print(f"Key '{key}' is handled by {node.host}:{node.port}")
214
```
215
216
### Cluster Pipeline Operations
217
218
```python
219
# Use pipeline for batching (commands are automatically routed)
220
pipe = cluster.pipeline()
221
222
# Add commands to pipeline
223
pipe.set("key1", "value1")
224
pipe.set("key2", "value2")
225
pipe.get("key1")
226
pipe.get("key2")
227
228
# Execute all commands
229
results = pipe.execute()
230
print(f"Results: {results}")
231
```
232
233
### High Availability Configuration
234
235
```python
236
# Enable reading from replicas for better distribution
237
cluster = RedisCluster(
238
startup_nodes=startup_nodes,
239
read_from_replicas=True, # Read from replicas when possible
240
cluster_error_retry_attempts=5, # Retry failed operations
241
require_full_coverage=False, # Allow partial cluster operation
242
skip_full_coverage_check=True # Skip coverage validation
243
)
244
245
# Configure connection pooling
246
cluster = RedisCluster(
247
startup_nodes=startup_nodes,
248
max_connections=100, # Max connections per node
249
retry_on_timeout=True, # Retry on timeout
250
health_check_interval=30 # Health check frequency
251
)
252
```
253
254
### Cross-Slot Operations
255
256
```python
257
# Operations spanning multiple hash slots require special handling
258
import hashlib
259
260
def same_slot_keys(*keys):
261
"""Ensure all keys map to the same hash slot using hash tags"""
262
hash_tag = "{user1001}"
263
return [f"{hash_tag}:{key}" for key in keys]
264
265
# These keys will be in the same slot
266
keys = same_slot_keys("profile", "settings", "activity")
267
268
# Multi-key operations work within same slot
269
pipe = cluster.pipeline()
270
pipe.mset({
271
keys[0]: "profile_data",
272
keys[1]: "settings_data",
273
keys[2]: "activity_data"
274
})
275
pipe.mget(keys)
276
results = pipe.execute()
277
```
278
279
### Error Handling
280
281
```python
282
from redis.exceptions import RedisClusterException, ClusterDownError
283
284
try:
285
cluster.set("key", "value")
286
except ClusterDownError:
287
print("Cluster is down or unreachable")
288
except RedisClusterException as e:
289
print(f"Cluster error: {e}")
290
```