CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-redis-py-cluster

Library for communicating with Redis Clusters. Built on top of redis-py lib

Pending
Overview
Eval results
Files

client.mddocs/

Redis Cluster Client

The RedisCluster class is the primary interface for interacting with Redis clusters. It extends the standard Redis client with cluster-aware implementations of Redis commands, automatic slot management, node discovery, and failover support.

Capabilities

RedisCluster Class

Main Redis cluster client that provides all standard Redis operations with cluster-aware implementations and additional cluster management functionality.

class RedisCluster(Redis):
    def __init__(self, host=None, port=None, startup_nodes=None, 
                 max_connections=None, max_connections_per_node=False,
                 init_slot_cache=True, readonly_mode=False, 
                 reinitialize_steps=None, skip_full_coverage_check=False,
                 nodemanager_follow_cluster=False, connection_class=None,
                 read_from_replicas=False, cluster_down_retry_attempts=3,
                 host_port_remap=None, **kwargs):
        """
        Initialize Redis cluster client.
        
        Parameters:
        - host (str, optional): Single startup host
        - port (int, optional): Port for single host
        - startup_nodes (List[StartupNode], optional): List of initial cluster nodes
        - max_connections (int, optional): Maximum total connections across cluster
        - max_connections_per_node (bool|int): Enable per-node connection limits
        - init_slot_cache (bool): Initialize slot-to-node mapping on startup
        - readonly_mode (bool): Enable read-only operations
        - reinitialize_steps (int, optional): Steps before reinitializing cluster layout
        - skip_full_coverage_check (bool): Skip full cluster coverage validation
        - nodemanager_follow_cluster (bool): Automatically follow cluster topology changes
        - connection_class (type, optional): Custom connection class
        - read_from_replicas (bool): Allow reading from replica nodes
        - cluster_down_retry_attempts (int): Retry attempts for cluster-down errors
        - host_port_remap (HostPortRemap, optional): Host/port remapping configuration
        - **kwargs: Additional Redis connection parameters (decode_responses, etc.)
        """

Client Creation

Create cluster clients from URLs or startup node lists.

@classmethod
def from_url(cls, url, **kwargs):
    """
    Create RedisCluster client from connection URL.
    
    Parameters:
    - url (str): Redis cluster URL (redis://host:port)
    - **kwargs: Additional connection parameters
    
    Returns:
    RedisCluster: Configured cluster client instance
    """

Core Client Methods

Essential client functionality for command execution and cluster interface creation.

def execute_command(self, *args, **kwargs):
    """
    Execute Redis command with cluster routing.
    
    Parameters:
    - *args: Command and arguments
    - **kwargs: Additional execution options
    
    Returns:
    Any: Command result
    """

def pubsub(self, **kwargs):
    """
    Get cluster-aware pub/sub interface.
    
    Parameters:
    - **kwargs: PubSub configuration options
    
    Returns:
    ClusterPubSub: Cluster pub/sub instance
    """

def pipeline(self, transaction=None, shard_hint=None, read_from_replicas=False):
    """
    Create cluster pipeline for batching commands.
    
    Parameters:
    - transaction (bool, optional): Transaction mode (not supported in cluster)
    - shard_hint (str, optional): Hint for pipeline routing
    - read_from_replicas (bool): Enable reading from replicas
    
    Returns:
    ClusterPipeline: Pipeline instance for command batching
    """

Key Scanning

Cluster-wide key iteration with pattern matching and type filtering.

def scan_iter(self, match=None, count=None, _type=None):
    """
    Scan keys across entire cluster.
    
    Parameters:
    - match (str, optional): Key pattern to match
    - count (int, optional): Hint for number of keys per iteration
    - _type (str, optional): Filter by key type
    
    Yields:
    str: Matching keys from all cluster nodes
    """

Cluster Management Commands

Commands for managing cluster topology, slots, and node configuration.

def cluster_info(self):
    """
    Get cluster state information.
    
    Returns:
    dict: Cluster state details including size, slots, and status
    """

def cluster_nodes(self):
    """
    Get detailed cluster topology information.
    
    Returns:
    dict: Node information including IDs, addresses, roles, and slot assignments
    """

def cluster_slots(self):
    """
    Get hash slot to node mappings.
    
    Returns:
    SlotsMapping: List of slot ranges with assigned master and replica nodes
    """

def cluster_keyslot(self, name):
    """
    Get hash slot for key.
    
    Parameters:
    - name (str): Key name
    
    Returns:
    int: Hash slot number (0-16383)
    """

def cluster_countkeysinslot(self, slot_id):
    """
    Count keys in specific slot.
    
    Parameters:
    - slot_id (int): Slot number (0-16383)
    
    Returns:
    int: Number of keys in slot
    """

def cluster_count_failure_report(self, node_id):
    """
    Get failure report count for specific node.
    
    Parameters:
    - node_id (str): Node ID to check failure reports for
    
    Returns:
    int: Number of failure reports for the node
    """

Slot Management Commands

Commands for assigning and managing hash slots across cluster nodes.

def cluster_addslots(self, node_id, *slots):
    """
    Assign hash slots to specific node.
    
    Parameters:
    - node_id (str): Target node ID
    - *slots (int): Slot numbers to assign
    
    Returns:
    bool: Success status
    """

def cluster_delslots(self, *slots):
    """
    Remove hash slots from nodes.
    
    Parameters:
    - *slots (int): Slot numbers to remove
    
    Returns:
    bool: Success status
    """

Node Management Commands

Commands for adding nodes, configuring replication, and managing cluster membership.

def cluster_meet(self, node_id, host, port):
    """
    Add node to cluster.
    
    Parameters:
    - node_id (str): Node ID to add
    - host (str): Node hostname
    - port (int): Node port
    
    Returns:
    bool: Success status
    """

def cluster_replicate(self, target_node_id):
    """
    Configure current node as replica.
    
    Parameters:
    - target_node_id (str): Master node ID to replicate
    
    Returns:
    bool: Success status
    """

def cluster_failover(self, node_id, option=None):
    """
    Force failover on specified node.
    
    Parameters:
    - node_id (str): Node ID to failover
    - option (str, optional): Failover option ('FORCE' or 'TAKEOVER')
    
    Returns:
    bool: Success status
    """

def cluster_reset(self, node_id, soft=True):
    """
    Reset cluster node.
    
    Parameters:
    - node_id (str): Node ID to reset
    - soft (bool): Perform soft reset (preserve data)
    
    Returns:
    bool: Success status
    """

def cluster_reset_all_nodes(self, soft=True):
    """
    Reset all cluster nodes.
    
    Parameters:
    - soft (bool): Perform soft reset (preserve data)
    
    Returns:
    bool: Success status
    """

def cluster_save_config(self):
    """
    Save cluster configuration to disk.
    
    Returns:
    bool: Success status
    """

def cluster_get_keys_in_slot(self, slot, num_keys):
    """
    Get keys in specific slot.
    
    Parameters:
    - slot (int): Slot number (0-16383)
    - num_keys (int): Maximum number of keys to return
    
    Returns:
    List[str]: Keys in the slot
    """

def cluster_set_config_epoch(self, node_id, epoch):
    """
    Set cluster configuration epoch for node.
    
    Parameters:
    - node_id (str): Node ID
    - epoch (int): Configuration epoch
    
    Returns:
    bool: Success status
    """

def cluster_setslot(self, node_id, slot_id, state, bind_to_node_id=None):
    """
    Set slot state for cluster reconfiguration.
    
    Parameters:
    - node_id (str): Node ID
    - slot_id (int): Slot number (0-16383)
    - state (str): Slot state ('IMPORTING', 'MIGRATING', 'STABLE', 'NODE')
    - bind_to_node_id (str, optional): Node to bind slot to (for 'NODE' state)
    
    Returns:
    bool: Success status
    """

def cluster_slaves(self, target_node_id):
    """
    Get slave nodes for specific master.
    
    Parameters:
    - target_node_id (str): Master node ID
    
    Returns:
    List[dict]: Slave node information
    """

Cluster-Aware Redis Commands

Modified Redis commands that work across multiple cluster nodes.

def mget(self, keys, *args):
    """
    Multi-get across cluster nodes.
    
    Parameters:
    - keys (List[str]): List of keys to retrieve
    - *args: Additional key arguments
    
    Returns:
    List[Any]: Values for requested keys (None for missing keys)
    """

def mset(self, *args, **kwargs):
    """
    Multi-set across cluster nodes.
    
    Parameters:
    - *args: Key-value pairs as positional arguments
    - **kwargs: Key-value pairs as keyword arguments
    
    Returns:
    bool: Success status
    """

def msetnx(self, *args, **kwargs):
    """
    Multi-set if not exists across cluster nodes.
    
    Parameters:
    - *args: Key-value pairs as positional arguments
    - **kwargs: Key-value pairs as keyword arguments
    
    Returns:
    bool: True if all keys were set, False otherwise
    """

def delete(self, *names):
    """
    Delete multiple keys across cluster nodes.
    
    Parameters:
    - *names (str): Keys to delete
    
    Returns:
    int: Number of keys deleted
    """

def rename(self, src, dst, replace=False):
    """
    Rename key with cross-slot support.
    
    Parameters:
    - src (str): Source key name
    - dst (str): Destination key name
    - replace (bool): Allow replacing existing destination key
    
    Returns:
    bool: Success status
    """

def renamenx(self, src, dst):
    """
    Rename key if destination doesn't exist (cluster implementation is non-atomic).
    
    Parameters:
    - src (str): Source key name
    - dst (str): Destination key name
    
    Returns:
    bool: True if rename occurred, False if destination already exists
    
    Note: 
    In cluster mode, this operation is not atomic - it checks existence 
    then calls rename, which may cause race conditions.
    """

Set Operations

Cluster-aware set operations that work across multiple nodes.

def sdiff(self, keys, *args):
    """Set difference across cluster nodes."""

def sdiffstore(self, dest, keys, *args):
    """Store set difference result across cluster nodes."""

def sinter(self, keys, *args):
    """Set intersection across cluster nodes."""

def sinterstore(self, dest, keys, *args):
    """Store set intersection result across cluster nodes."""

def smove(self, src, dst, value):
    """Move set member between sets across cluster nodes."""

def sunion(self, keys, *args):
    """Set union across cluster nodes."""

def sunionstore(self, dest, keys, *args):
    """Store set union result across cluster nodes."""

List Operations

Cluster-aware list operations for cross-slot list manipulation.

def brpoplpush(self, src, dst, timeout=0):
    """
    Blocking right pop left push across cluster nodes.
    
    Parameters:
    - src (str): Source list key
    - dst (str): Destination list key  
    - timeout (int): Blocking timeout in seconds
    
    Returns:
    str: Moved element or None if timeout
    """

def rpoplpush(self, src, dst):
    """
    Right pop left push across cluster nodes.
    
    Parameters:
    - src (str): Source list key
    - dst (str): Destination list key
    
    Returns:
    str: Moved element or None if source empty
    """

HyperLogLog Operations

Cluster-aware HyperLogLog operations for distributed cardinality estimation.

def pfcount(self, *sources):
    """
    Count unique elements across HyperLogLog keys.
    
    Parameters:
    - *sources (str): HyperLogLog key names
    
    Returns:
    int: Approximate cardinality
    """

def pfmerge(self, dest, *sources):
    """
    Merge HyperLogLog keys across cluster nodes.
    
    Parameters:
    - dest (str): Destination key
    - *sources (str): Source HyperLogLog keys
    
    Returns:
    bool: Success status
    """

Pub/Sub Information Commands

Commands for getting information about active channels and subscriptions.

def pubsub_channels(self, pattern='*', aggregate=True):
    """
    Get list of channels with active subscribers.
    
    Parameters:
    - pattern (str): Channel name pattern to match
    - aggregate (bool): Merge responses from all nodes
    
    Returns:
    List[str]: Active channel names matching pattern
    """

def pubsub_numpat(self, aggregate=True):
    """
    Get number of active pattern subscriptions.
    
    Parameters:
    - aggregate (bool): Merge responses from all nodes
    
    Returns:
    int: Total number of pattern subscriptions
    """

def pubsub_numsub(self, *args, **kwargs):
    """
    Get subscriber counts for specific channels.
    
    Parameters:
    - *args: Channel names to check
    - aggregate (bool): Merge responses from all nodes (via kwargs)
    
    Returns:
    List[Tuple[str, int]]: Channel names and subscriber counts
    """

Usage Examples

Basic Client Setup

from rediscluster import RedisCluster

# Single host startup
rc = RedisCluster(host='127.0.0.1', port=7000, decode_responses=True)

# Multiple startup nodes for redundancy
startup_nodes = [
    {"host": "127.0.0.1", "port": "7000"},
    {"host": "127.0.0.1", "port": "7001"},
    {"host": "127.0.0.1", "port": "7002"}
]
rc = RedisCluster(startup_nodes=startup_nodes, decode_responses=True)

# From URL
rc = RedisCluster.from_url("redis://127.0.0.1:7000", decode_responses=True)

Advanced Configuration

# Production configuration with connection pooling
rc = RedisCluster(
    startup_nodes=startup_nodes,
    decode_responses=True,
    max_connections=32,
    max_connections_per_node=8,
    read_from_replicas=True,
    cluster_down_retry_attempts=5,
    socket_timeout=5,
    socket_connect_timeout=5
)

# Read-only replica configuration
rc = RedisCluster(
    startup_nodes=startup_nodes,
    readonly_mode=True,
    read_from_replicas=True
)

Cluster Management

# Get cluster information
info = rc.cluster_info()
print(f"Cluster size: {info['cluster_size']}")
print(f"Cluster state: {info['cluster_state']}")

# Get node topology
nodes = rc.cluster_nodes()
for node_id, node_info in nodes.items():
    print(f"Node {node_id}: {node_info['host']}:{node_info['port']} - {node_info['role']}")

# Get slot mappings
slots = rc.cluster_slots()
for slot_range in slots:
    start_slot, end_slot = slot_range[0], slot_range[1]
    master = slot_range[2]
    print(f"Slots {start_slot}-{end_slot}: master {master[0]}:{master[1]}")

Multi-Key Operations

# Multi-key operations work transparently across nodes
keys_values = {"user:1": "data1", "user:2": "data2", "session:abc": "active"}
rc.mset(keys_values)

values = rc.mget(["user:1", "user:2", "session:abc"])
print(values)  # ['data1', 'data2', 'active']

# Delete multiple keys
deleted = rc.delete("user:1", "user:2", "session:abc")
print(f"Deleted {deleted} keys")

Install with Tessl CLI

npx tessl i tessl/pypi-redis-py-cluster

docs

client.md

connections.md

exceptions.md

index.md

pipeline.md

pubsub.md

tile.json