Library for communicating with Redis Clusters. Built on top of redis-py lib
—
The RedisCluster class is the primary interface for interacting with Redis clusters. It extends the standard Redis client with cluster-aware implementations of Redis commands, automatic slot management, node discovery, and failover support.
Main Redis cluster client that provides all standard Redis operations with cluster-aware implementations and additional cluster management functionality.
class RedisCluster(Redis):
def __init__(self, host=None, port=None, startup_nodes=None,
max_connections=None, max_connections_per_node=False,
init_slot_cache=True, readonly_mode=False,
reinitialize_steps=None, skip_full_coverage_check=False,
nodemanager_follow_cluster=False, connection_class=None,
read_from_replicas=False, cluster_down_retry_attempts=3,
host_port_remap=None, **kwargs):
"""
Initialize Redis cluster client.
Parameters:
- host (str, optional): Single startup host
- port (int, optional): Port for single host
- startup_nodes (List[StartupNode], optional): List of initial cluster nodes
- max_connections (int, optional): Maximum total connections across cluster
- max_connections_per_node (bool|int): Enable per-node connection limits
- init_slot_cache (bool): Initialize slot-to-node mapping on startup
- readonly_mode (bool): Enable read-only operations
- reinitialize_steps (int, optional): Steps before reinitializing cluster layout
- skip_full_coverage_check (bool): Skip full cluster coverage validation
- nodemanager_follow_cluster (bool): Automatically follow cluster topology changes
- connection_class (type, optional): Custom connection class
- read_from_replicas (bool): Allow reading from replica nodes
- cluster_down_retry_attempts (int): Retry attempts for cluster-down errors
- host_port_remap (HostPortRemap, optional): Host/port remapping configuration
- **kwargs: Additional Redis connection parameters (decode_responses, etc.)
"""Create cluster clients from URLs or startup node lists.
@classmethod
def from_url(cls, url, **kwargs):
"""
Create RedisCluster client from connection URL.
Parameters:
- url (str): Redis cluster URL (redis://host:port)
- **kwargs: Additional connection parameters
Returns:
RedisCluster: Configured cluster client instance
"""Essential client functionality for command execution and cluster interface creation.
def execute_command(self, *args, **kwargs):
"""
Execute Redis command with cluster routing.
Parameters:
- *args: Command and arguments
- **kwargs: Additional execution options
Returns:
Any: Command result
"""
def pubsub(self, **kwargs):
"""
Get cluster-aware pub/sub interface.
Parameters:
- **kwargs: PubSub configuration options
Returns:
ClusterPubSub: Cluster pub/sub instance
"""
def pipeline(self, transaction=None, shard_hint=None, read_from_replicas=False):
"""
Create cluster pipeline for batching commands.
Parameters:
- transaction (bool, optional): Transaction mode (not supported in cluster)
- shard_hint (str, optional): Hint for pipeline routing
- read_from_replicas (bool): Enable reading from replicas
Returns:
ClusterPipeline: Pipeline instance for command batching
"""Cluster-wide key iteration with pattern matching and type filtering.
def scan_iter(self, match=None, count=None, _type=None):
"""
Scan keys across entire cluster.
Parameters:
- match (str, optional): Key pattern to match
- count (int, optional): Hint for number of keys per iteration
- _type (str, optional): Filter by key type
Yields:
str: Matching keys from all cluster nodes
"""Commands for managing cluster topology, slots, and node configuration.
def cluster_info(self):
"""
Get cluster state information.
Returns:
dict: Cluster state details including size, slots, and status
"""
def cluster_nodes(self):
"""
Get detailed cluster topology information.
Returns:
dict: Node information including IDs, addresses, roles, and slot assignments
"""
def cluster_slots(self):
"""
Get hash slot to node mappings.
Returns:
SlotsMapping: List of slot ranges with assigned master and replica nodes
"""
def cluster_keyslot(self, name):
"""
Get hash slot for key.
Parameters:
- name (str): Key name
Returns:
int: Hash slot number (0-16383)
"""
def cluster_countkeysinslot(self, slot_id):
"""
Count keys in specific slot.
Parameters:
- slot_id (int): Slot number (0-16383)
Returns:
int: Number of keys in slot
"""
def cluster_count_failure_report(self, node_id):
"""
Get failure report count for specific node.
Parameters:
- node_id (str): Node ID to check failure reports for
Returns:
int: Number of failure reports for the node
"""Commands for assigning and managing hash slots across cluster nodes.
def cluster_addslots(self, node_id, *slots):
"""
Assign hash slots to specific node.
Parameters:
- node_id (str): Target node ID
- *slots (int): Slot numbers to assign
Returns:
bool: Success status
"""
def cluster_delslots(self, *slots):
"""
Remove hash slots from nodes.
Parameters:
- *slots (int): Slot numbers to remove
Returns:
bool: Success status
"""Commands for adding nodes, configuring replication, and managing cluster membership.
def cluster_meet(self, node_id, host, port):
"""
Add node to cluster.
Parameters:
- node_id (str): Node ID to add
- host (str): Node hostname
- port (int): Node port
Returns:
bool: Success status
"""
def cluster_replicate(self, target_node_id):
"""
Configure current node as replica.
Parameters:
- target_node_id (str): Master node ID to replicate
Returns:
bool: Success status
"""
def cluster_failover(self, node_id, option=None):
"""
Force failover on specified node.
Parameters:
- node_id (str): Node ID to failover
- option (str, optional): Failover option ('FORCE' or 'TAKEOVER')
Returns:
bool: Success status
"""
def cluster_reset(self, node_id, soft=True):
"""
Reset cluster node.
Parameters:
- node_id (str): Node ID to reset
- soft (bool): Perform soft reset (preserve data)
Returns:
bool: Success status
"""
def cluster_reset_all_nodes(self, soft=True):
"""
Reset all cluster nodes.
Parameters:
- soft (bool): Perform soft reset (preserve data)
Returns:
bool: Success status
"""
def cluster_save_config(self):
"""
Save cluster configuration to disk.
Returns:
bool: Success status
"""
def cluster_get_keys_in_slot(self, slot, num_keys):
"""
Get keys in specific slot.
Parameters:
- slot (int): Slot number (0-16383)
- num_keys (int): Maximum number of keys to return
Returns:
List[str]: Keys in the slot
"""
def cluster_set_config_epoch(self, node_id, epoch):
"""
Set cluster configuration epoch for node.
Parameters:
- node_id (str): Node ID
- epoch (int): Configuration epoch
Returns:
bool: Success status
"""
def cluster_setslot(self, node_id, slot_id, state, bind_to_node_id=None):
"""
Set slot state for cluster reconfiguration.
Parameters:
- node_id (str): Node ID
- slot_id (int): Slot number (0-16383)
- state (str): Slot state ('IMPORTING', 'MIGRATING', 'STABLE', 'NODE')
- bind_to_node_id (str, optional): Node to bind slot to (for 'NODE' state)
Returns:
bool: Success status
"""
def cluster_slaves(self, target_node_id):
"""
Get slave nodes for specific master.
Parameters:
- target_node_id (str): Master node ID
Returns:
List[dict]: Slave node information
"""Modified Redis commands that work across multiple cluster nodes.
def mget(self, keys, *args):
"""
Multi-get across cluster nodes.
Parameters:
- keys (List[str]): List of keys to retrieve
- *args: Additional key arguments
Returns:
List[Any]: Values for requested keys (None for missing keys)
"""
def mset(self, *args, **kwargs):
"""
Multi-set across cluster nodes.
Parameters:
- *args: Key-value pairs as positional arguments
- **kwargs: Key-value pairs as keyword arguments
Returns:
bool: Success status
"""
def msetnx(self, *args, **kwargs):
"""
Multi-set if not exists across cluster nodes.
Parameters:
- *args: Key-value pairs as positional arguments
- **kwargs: Key-value pairs as keyword arguments
Returns:
bool: True if all keys were set, False otherwise
"""
def delete(self, *names):
"""
Delete multiple keys across cluster nodes.
Parameters:
- *names (str): Keys to delete
Returns:
int: Number of keys deleted
"""
def rename(self, src, dst, replace=False):
"""
Rename key with cross-slot support.
Parameters:
- src (str): Source key name
- dst (str): Destination key name
- replace (bool): Allow replacing existing destination key
Returns:
bool: Success status
"""
def renamenx(self, src, dst):
"""
Rename key if destination doesn't exist (cluster implementation is non-atomic).
Parameters:
- src (str): Source key name
- dst (str): Destination key name
Returns:
bool: True if rename occurred, False if destination already exists
Note:
In cluster mode, this operation is not atomic - it checks existence
then calls rename, which may cause race conditions.
"""Cluster-aware set operations that work across multiple nodes.
def sdiff(self, keys, *args):
"""Set difference across cluster nodes."""
def sdiffstore(self, dest, keys, *args):
"""Store set difference result across cluster nodes."""
def sinter(self, keys, *args):
"""Set intersection across cluster nodes."""
def sinterstore(self, dest, keys, *args):
"""Store set intersection result across cluster nodes."""
def smove(self, src, dst, value):
"""Move set member between sets across cluster nodes."""
def sunion(self, keys, *args):
"""Set union across cluster nodes."""
def sunionstore(self, dest, keys, *args):
"""Store set union result across cluster nodes."""Cluster-aware list operations for cross-slot list manipulation.
def brpoplpush(self, src, dst, timeout=0):
"""
Blocking right pop left push across cluster nodes.
Parameters:
- src (str): Source list key
- dst (str): Destination list key
- timeout (int): Blocking timeout in seconds
Returns:
str: Moved element or None if timeout
"""
def rpoplpush(self, src, dst):
"""
Right pop left push across cluster nodes.
Parameters:
- src (str): Source list key
- dst (str): Destination list key
Returns:
str: Moved element or None if source empty
"""Cluster-aware HyperLogLog operations for distributed cardinality estimation.
def pfcount(self, *sources):
"""
Count unique elements across HyperLogLog keys.
Parameters:
- *sources (str): HyperLogLog key names
Returns:
int: Approximate cardinality
"""
def pfmerge(self, dest, *sources):
"""
Merge HyperLogLog keys across cluster nodes.
Parameters:
- dest (str): Destination key
- *sources (str): Source HyperLogLog keys
Returns:
bool: Success status
"""Commands for getting information about active channels and subscriptions.
def pubsub_channels(self, pattern='*', aggregate=True):
"""
Get list of channels with active subscribers.
Parameters:
- pattern (str): Channel name pattern to match
- aggregate (bool): Merge responses from all nodes
Returns:
List[str]: Active channel names matching pattern
"""
def pubsub_numpat(self, aggregate=True):
"""
Get number of active pattern subscriptions.
Parameters:
- aggregate (bool): Merge responses from all nodes
Returns:
int: Total number of pattern subscriptions
"""
def pubsub_numsub(self, *args, **kwargs):
"""
Get subscriber counts for specific channels.
Parameters:
- *args: Channel names to check
- aggregate (bool): Merge responses from all nodes (via kwargs)
Returns:
List[Tuple[str, int]]: Channel names and subscriber counts
"""from rediscluster import RedisCluster
# Single host startup
rc = RedisCluster(host='127.0.0.1', port=7000, decode_responses=True)
# Multiple startup nodes for redundancy
startup_nodes = [
{"host": "127.0.0.1", "port": "7000"},
{"host": "127.0.0.1", "port": "7001"},
{"host": "127.0.0.1", "port": "7002"}
]
rc = RedisCluster(startup_nodes=startup_nodes, decode_responses=True)
# From URL
rc = RedisCluster.from_url("redis://127.0.0.1:7000", decode_responses=True)# Production configuration with connection pooling
rc = RedisCluster(
startup_nodes=startup_nodes,
decode_responses=True,
max_connections=32,
max_connections_per_node=8,
read_from_replicas=True,
cluster_down_retry_attempts=5,
socket_timeout=5,
socket_connect_timeout=5
)
# Read-only replica configuration
rc = RedisCluster(
startup_nodes=startup_nodes,
readonly_mode=True,
read_from_replicas=True
)# Get cluster information
info = rc.cluster_info()
print(f"Cluster size: {info['cluster_size']}")
print(f"Cluster state: {info['cluster_state']}")
# Get node topology
nodes = rc.cluster_nodes()
for node_id, node_info in nodes.items():
print(f"Node {node_id}: {node_info['host']}:{node_info['port']} - {node_info['role']}")
# Get slot mappings
slots = rc.cluster_slots()
for slot_range in slots:
start_slot, end_slot = slot_range[0], slot_range[1]
master = slot_range[2]
print(f"Slots {start_slot}-{end_slot}: master {master[0]}:{master[1]}")# Multi-key operations work transparently across nodes
keys_values = {"user:1": "data1", "user:2": "data2", "session:abc": "active"}
rc.mset(keys_values)
values = rc.mget(["user:1", "user:2", "session:abc"])
print(values) # ['data1', 'data2', 'active']
# Delete multiple keys
deleted = rc.delete("user:1", "user:2", "session:abc")
print(f"Deleted {deleted} keys")Install with Tessl CLI
npx tessl i tessl/pypi-redis-py-cluster