Python client for Redis database and key-value store
—
Redis Cluster support provides automatic sharding across multiple Redis nodes with built-in failover and high availability. The RedisCluster client handles node discovery, command routing, and cluster topology management.
import redis
from redis import RedisCluster
from redis.cluster import ClusterNode
from typing import Optional, List, Dict, Any, Type

Create Redis cluster clients with automatic node discovery and configuration.
class RedisCluster:
def __init__(
self,
host: Optional[str] = None,
port: int = 7000,
startup_nodes: Optional[List[ClusterNode]] = None,
cluster_error_retry_attempts: int = 3,
require_full_coverage: bool = True,
skip_full_coverage_check: bool = False,
reinitialize_steps: int = 10,
read_from_replicas: bool = False,
dynamic_startup_nodes: bool = True,
connection_pool_class: Type[ConnectionPool] = ConnectionPool,
**kwargs
): ...
@classmethod
def from_url(
cls,
url: str,
**kwargs
) -> "RedisCluster": ...
class ClusterNode:
def __init__(
self,
host: str,
port: int,
server_type: Optional[str] = None
): ...
host: str
port: int
server_type: Optional[str]

Cluster-specific operations for managing and inspecting cluster state.
def cluster_info(self) -> Dict[str, Any]: ...
def cluster_nodes(self) -> Dict[str, Dict[str, Any]]: ...
def cluster_slots(self) -> List[List[Union[int, List[str]]]]: ...
def cluster_keyslot(self, key: KeyT) -> int: ...
def cluster_countkeysinslot(self, slot: int) -> int: ...
def cluster_getkeysinslot(self, slot: int, count: int) -> List[bytes]: ...
def cluster_addslots(self, *slots: int) -> bool: ...
def cluster_delslots(self, *slots: int) -> bool: ...
def cluster_flushslots(self) -> bool: ...
def cluster_setslot(self, slot: int, command: str, node_id: Optional[str] = None) -> bool: ...
def cluster_replicate(self, node_id: str) -> bool: ...
def cluster_meet(self, host: str, port: int) -> bool: ...
def cluster_forget(self, node_id: str) -> bool: ...
def cluster_reset(self, hard: bool = False) -> bool: ...
def cluster_failover(self, option: Optional[str] = None) -> bool: ...

Cluster-aware pipeline operations for batching commands while respecting cluster topology.
def pipeline(self, transaction: bool = False) -> "ClusterPipeline": ...
class ClusterPipeline:
def execute(self, raise_on_error: bool = True) -> List[Any]: ...
def reset(self) -> None: ...
def watch(self, *names: KeyT) -> bool: ...
def multi(self) -> "ClusterPipeline": ...
def discard(self) -> None: ...

Access and manage individual cluster nodes for advanced operations.
def get_node(
self,
host: Optional[str] = None,
port: Optional[int] = None,
node_name: Optional[str] = None
) -> Optional[ClusterNode]: ...
def get_primaries(self) -> List[ClusterNode]: ...
def get_replicas(self) -> List[ClusterNode]: ...
def get_random_node(self) -> ClusterNode: ...
def get_master_node_by_key(self, key: KeyT) -> ClusterNode: ...
def get_nodes_by_keys(self, *keys: KeyT) -> Dict[ClusterNode, List[KeyT]]: ...
def execute_command_on_nodes(
self,
nodes: List[ClusterNode],
command: str,
*args,
**kwargs
) -> Dict[ClusterNode, Any]: ...

import redis
from redis.cluster import RedisCluster, ClusterNode
# Connect with startup nodes
startup_nodes = [
ClusterNode("localhost", 7000),
ClusterNode("localhost", 7001),
ClusterNode("localhost", 7002)
]
cluster = RedisCluster(startup_nodes=startup_nodes, decode_responses=True)
# Basic operations work the same as single Redis
cluster.set("key1", "value1")
value = cluster.get("key1")

# Connect to cluster via URL
cluster = RedisCluster.from_url("redis://localhost:7000")
# With authentication
cluster = RedisCluster.from_url("redis://:password@localhost:7000")

# Get cluster information
info = cluster.cluster_info()
print(f"Cluster state: {info['cluster_state']}")
print(f"Cluster size: {info['cluster_size']}")
# Get node information
nodes = cluster.cluster_nodes()
for node_id, node_info in nodes.items():
print(f"Node {node_id}: {node_info['ip']}:{node_info['port']}")
# Get slot distribution
slots = cluster.cluster_slots()
for slot_range in slots:
start_slot, end_slot = slot_range[0], slot_range[1]
master_info = slot_range[2]
print(f"Slots {start_slot}-{end_slot}: {master_info[0]}:{master_info[1]}")

# Get different types of nodes
primaries = cluster.get_primaries()
replicas = cluster.get_replicas()
# Execute command on specific nodes
results = cluster.execute_command_on_nodes(
primaries,
"INFO",
"memory"
)
for node, result in results.items():
print(f"Node {node.host}:{node.port} memory info: {result}")
# Find which node handles a specific key
key = "user:1001"
node = cluster.get_master_node_by_key(key)
print(f"Key '{key}' is handled by {node.host}:{node.port}")

# Use pipeline for batching (commands are automatically routed)
pipe = cluster.pipeline()
# Add commands to pipeline
pipe.set("key1", "value1")
pipe.set("key2", "value2")
pipe.get("key1")
pipe.get("key2")
# Execute all commands
results = pipe.execute()
print(f"Results: {results}")

# Enable reading from replicas for better distribution
cluster = RedisCluster(
startup_nodes=startup_nodes,
read_from_replicas=True, # Read from replicas when possible
cluster_error_retry_attempts=5, # Retry failed operations
require_full_coverage=False, # Allow partial cluster operation
skip_full_coverage_check=True # Skip coverage validation
)
# Configure connection pooling
cluster = RedisCluster(
startup_nodes=startup_nodes,
max_connections=100, # Max connections per node
retry_on_timeout=True, # Retry on timeout
health_check_interval=30 # Health check frequency
)

# Operations spanning multiple hash slots require special handling
import hashlib
def same_slot_keys(*keys, hash_tag="{user1001}"):
    """Prefix every key with a shared hash tag so they land in one hash slot.

    Redis Cluster computes the slot from only the text between the first
    ``{`` and the next ``}`` when a key contains such a non-empty section,
    so keys that share the same tag are stored in the same slot and can be
    used together in multi-key commands.

    Args:
        *keys: Key suffixes to namespace under the shared tag.
        hash_tag: Tag to prepend, including its surrounding braces
            (for example ``"{user1001}"``). Keyword-only; defaults to the
            tag used in this example.

    Returns:
        A list of ``"<hash_tag>:<key>"`` strings in the order given; an
        empty list when no keys are passed.
    """
    return [f"{hash_tag}:{key}" for key in keys]
# These keys will be in the same slot
keys = same_slot_keys("profile", "settings", "activity")
# Multi-key operations work within same slot
pipe = cluster.pipeline()
pipe.mset({
keys[0]: "profile_data",
keys[1]: "settings_data",
keys[2]: "activity_data"
})
pipe.mget(keys)
results = pipe.execute()

from redis.exceptions import RedisClusterException, ClusterDownError
try:
cluster.set("key", "value")
except ClusterDownError:
print("Cluster is down or unreachable")
except RedisClusterException as e:
print(f"Cluster error: {e}")

Install with Tessl CLI
npx tessl i tessl/pypi-redis