CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-redis

Python client for Redis database and key-value store

Pending
Overview
Eval results
Files

docs/cluster-support.md

Cluster Support

Redis Cluster support provides automatic sharding across multiple Redis nodes with built-in failover and high availability. The RedisCluster client handles node discovery, command routing, and cluster topology management.

# Standard library: typing helpers used by the signatures below
# (Union is needed by cluster_slots).
from typing import Any, Dict, List, Optional, Type, Union

# redis-py: the client, the node record, and the two names the signatures
# reference (ConnectionPool in RedisCluster.__init__, KeyT for key arguments).
import redis
from redis import RedisCluster
from redis.cluster import ClusterNode
from redis.connection import ConnectionPool
from redis.typing import KeyT

Capabilities

Cluster Client Initialization

Create Redis cluster clients with automatic node discovery and configuration.

# Cluster-aware client. Per the overview on this page it handles node
# discovery, command routing, and cluster topology management.
class RedisCluster:
    # Create a client from a single seed host/port or from a list of startup
    # nodes. Additional connection options are accepted through **kwargs (the
    # usage examples pass decode_responses, max_connections, retry_on_timeout
    # and health_check_interval this way).
    def __init__(
        self,
        # Seed node address, used when no startup_nodes list is given.
        host: Optional[str] = None,
        # NOTE(review): upstream redis-py appears to default this to 6379 —
        # confirm against the installed version.
        port: int = 7000,
        # Known nodes to bootstrap from (see the "Basic Cluster Connection" example).
        startup_nodes: Optional[List[ClusterNode]] = None,
        # How many times a failed operation is retried.
        cluster_error_retry_attempts: int = 3,
        # When False, partial cluster operation is allowed.
        require_full_coverage: bool = True,
        # When True, coverage validation is skipped.
        skip_full_coverage_check: bool = False,
        # Presumably the number of redirect errors tolerated before the
        # topology is re-read — TODO confirm.
        reinitialize_steps: int = 10,
        # Read from replicas when possible.
        read_from_replicas: bool = False,
        # Presumably lets discovered nodes replace the startup list — TODO confirm.
        dynamic_startup_nodes: bool = True,
        # Pool class used for node connections.
        connection_pool_class: Type[ConnectionPool] = ConnectionPool,
        **kwargs: Any
    ) -> None: ...

    # Alternate constructor taking a connection URL; the examples use
    # "redis://localhost:7000" and "redis://:password@localhost:7000".
    @classmethod
    def from_url(
        cls,
        url: str,
        **kwargs: Any
    ) -> "RedisCluster": ...

# Address record for a single cluster node. The usage examples build startup
# nodes as ClusterNode("localhost", 7000) and read .host / .port back.
class ClusterNode:
    def __init__(
        self,
        host: str,
        port: int,
        # Presumably marks the node as primary or replica — TODO confirm values.
        server_type: Optional[str] = None
    ) -> None: ...
    
    host: str  # node hostname or IP, as passed to the constructor
    port: int  # node TCP port
    server_type: Optional[str]  # None when the role was not supplied

Cluster Operations

Cluster-specific operations for managing and inspecting cluster state.

# NOTE(review): these are listed as free functions but all take `self`; they
# are presumably methods of RedisCluster whose names mirror the Redis CLUSTER
# subcommands — confirm.

# Returns a dict of cluster-wide fields; the usage example reads
# 'cluster_state' and 'cluster_size' from it.
def cluster_info(self) -> Dict[str, Any]: ...

# Returns one dict of details per node.
# NOTE(review): upstream redis-py appears to key the outer dict by "host:port"
# and to store the node ID under 'node_id' — confirm against the installed version.
def cluster_nodes(self) -> Dict[str, Dict[str, Any]]: ...

# Returns the slot-to-node assignment.
# NOTE(review): annotated as nested lists, but upstream redis-py's cluster
# client appears to return a dict keyed by (start, end) tuples — confirm.
def cluster_slots(self) -> List[List[Union[int, List[str]]]]: ...

# Returns the integer hash slot for `key`.
def cluster_keyslot(self, key: KeyT) -> int: ...

# Returns an integer for `slot` (presumably the number of keys stored in it).
def cluster_countkeysinslot(self, slot: int) -> int: ...

# Returns key names from `slot`; `count` presumably caps how many are returned.
def cluster_getkeysinslot(self, slot: int, count: int) -> List[bytes]: ...

# Slot assignment: accept any number of slot numbers and report success as a bool.
def cluster_addslots(self, *slots: int) -> bool: ...

def cluster_delslots(self, *slots: int) -> bool: ...

# Takes no slot arguments (presumably clears every slot on the target node).
def cluster_flushslots(self) -> bool: ...

# `command` selects the slot state change; `node_id` is optional because not
# every state change needs a target node (presumably — TODO confirm).
def cluster_setslot(self, slot: int, command: str, node_id: Optional[str] = None) -> bool: ...

# Membership and role management, addressed by node ID or by host/port.
def cluster_replicate(self, node_id: str) -> bool: ...

def cluster_meet(self, host: str, port: int) -> bool: ...

def cluster_forget(self, node_id: str) -> bool: ...

# `hard` defaults to False, i.e. the softer reset variant.
def cluster_reset(self, hard: bool = False) -> bool: ...

# `option` is an optional failover modifier; None requests the default behaviour.
def cluster_failover(self, option: Optional[str] = None) -> bool: ...

Pipeline Support

Cluster-aware pipeline operations for batching commands while respecting cluster topology.

# Creates a cluster-aware pipeline for batching commands. `transaction`
# defaults to False.
# NOTE(review): listed as a free function but takes `self`; presumably a
# RedisCluster method — confirm.
def pipeline(self, transaction: bool = False) -> "ClusterPipeline": ...

# Command batch that respects cluster topology. The usage examples queue
# commands on it (set/get/mset/mget) and then call execute().
class ClusterPipeline:
    # Runs the queued commands and returns their results as a list.
    # `raise_on_error` defaults to True.
    def execute(self, raise_on_error: bool = True) -> List[Any]: ...
    
    # Presumably clears the queued commands and any watch state — TODO confirm.
    def reset(self) -> None: ...
    
    # Transaction-style helpers.
    # NOTE(review): confirm the installed cluster client supports watch/multi.
    def watch(self, *names: KeyT) -> bool: ...
    
    def multi(self) -> "ClusterPipeline": ...
    
    def discard(self) -> None: ...

Node Management

Access and manage individual cluster nodes for advanced operations.

# NOTE(review): these are listed as free functions but all take `self`; they
# are presumably methods of RedisCluster — confirm.

# Looks a node up by host/port or by node name; returns None when no node
# matches (per the Optional return type).
def get_node(
    self,
    host: Optional[str] = None,
    port: Optional[int] = None,
    node_name: Optional[str] = None
) -> Optional[ClusterNode]: ...

# Role-based listings of the known nodes.
def get_primaries(self) -> List[ClusterNode]: ...

def get_replicas(self) -> List[ClusterNode]: ...

def get_random_node(self) -> ClusterNode: ...

# Returns the node responsible for `key`.
# NOTE(review): I could not confirm this name in upstream redis-py; the
# equivalent there appears to be get_node_from_key(key) — confirm.
def get_master_node_by_key(self, key: KeyT) -> ClusterNode: ...

# Groups keys by the node that owns them.
# NOTE(review): I could not confirm this name in upstream redis-py — confirm.
def get_nodes_by_keys(self, *keys: KeyT) -> Dict[ClusterNode, List[KeyT]]: ...

# Runs `command` with *args on each node in `nodes` and returns per-node results.
# NOTE(review): I could not confirm this name in upstream redis-py; the
# equivalent there appears to be execute_command(..., target_nodes=nodes) — confirm.
def execute_command_on_nodes(
    self,
    nodes: List[ClusterNode],
    command: str,
    *args,
    **kwargs
) -> Dict[ClusterNode, Any]: ...

Usage Examples

Basic Cluster Connection

import redis
from redis.cluster import RedisCluster, ClusterNode

# Seed the client with three local nodes
startup_nodes = [ClusterNode("localhost", port) for port in (7000, 7001, 7002)]

cluster = RedisCluster(decode_responses=True, startup_nodes=startup_nodes)

# Everyday commands look exactly like they do on a single-node client
cluster.set("key1", "value1")
value = cluster.get("key1")

Connection from URL

# Plain URL: host and port of any reachable cluster node
cluster = RedisCluster.from_url(url="redis://localhost:7000")

# Same, with a password embedded in the URL
cluster = RedisCluster.from_url(url="redis://:password@localhost:7000")

Cluster Information and Management

# Get cluster information
info = cluster.cluster_info()
print(f"Cluster state: {info['cluster_state']}")
print(f"Cluster size: {info['cluster_size']}")

# Get node information.
# The result is keyed by each node's "host:port" address; the node ID and
# role flags live inside the per-node dict (there are no 'ip'/'port' fields).
nodes = cluster.cluster_nodes()
for address, node_info in nodes.items():
    print(f"Node {node_info['node_id']}: {address} ({node_info['flags']})")

# Get slot distribution.
# The cluster client parses CLUSTER SLOTS into a dict that maps each
# (start, end) slot range to its primary and replica addresses.
slots = cluster.cluster_slots()
for (start_slot, end_slot), owners in slots.items():
    primary_host, primary_port = owners["primary"]
    print(f"Slots {start_slot}-{end_slot}: {primary_host}:{primary_port}")

Working with Specific Nodes

# Get different types of nodes
primaries = cluster.get_primaries()
replicas = cluster.get_replicas()

# Execute command on specific nodes
# NOTE(review): I could not confirm execute_command_on_nodes in upstream
# redis-py; the equivalent there appears to be
# cluster.execute_command("INFO", "memory", target_nodes=primaries), whose
# result is keyed by node name rather than by ClusterNode — confirm before
# relying on this snippet.
results = cluster.execute_command_on_nodes(
    primaries, 
    "INFO", 
    "memory"
)

# Iterates the per-node results, treating each key as a ClusterNode
for node, result in results.items():
    print(f"Node {node.host}:{node.port} memory info: {result}")

# Find which node handles a specific key
# NOTE(review): upstream redis-py appears to name this lookup
# get_node_from_key(key) — confirm.
key = "user:1001"
node = cluster.get_master_node_by_key(key)
print(f"Key '{key}' is handled by {node.host}:{node.port}")

Cluster Pipeline Operations

# Batch several commands; the pipeline routes each one to the right node
pipe = cluster.pipeline()

# Queue the writes first, then the reads, in the same key order
entries = {"key1": "value1", "key2": "value2"}
for name, payload in entries.items():
    pipe.set(name, payload)
for name in entries:
    pipe.get(name)

# Send everything and collect the replies in queue order
results = pipe.execute()
print(f"Results: {results}")

High Availability Configuration

# Availability-oriented settings, gathered in one place and unpacked into the client
availability_options = {
    "read_from_replicas": True,          # read from replicas when possible
    "cluster_error_retry_attempts": 5,   # retry failed operations
    "require_full_coverage": False,      # allow partial cluster operation
    "skip_full_coverage_check": True,    # skip coverage validation
}
cluster = RedisCluster(startup_nodes=startup_nodes, **availability_options)

# Connection-pool settings, passed through to the per-node connections
pool_options = {
    "max_connections": 100,        # max connections per node
    "retry_on_timeout": True,      # retry on timeout
    "health_check_interval": 30,   # health check frequency
}
cluster = RedisCluster(startup_nodes=startup_nodes, **pool_options)

Cross-Slot Operations

# Operations spanning multiple hash slots require special handling
import hashlib

def same_slot_keys(*keys: str, hash_tag: str = "{user1001}") -> list:
    """Prefix every key with a shared hash tag so all of them map to one slot.

    Redis Cluster hashes only the text between the first ``{`` and the
    following ``}`` of a key, so keys that start with the same non-empty tag
    always land in the same hash slot.

    Args:
        *keys: Key suffixes to namespace, e.g. ``"profile"``.
        hash_tag: The tag to prepend, including its braces. Defaults to
            ``"{user1001}"`` so existing callers get the same keys as before.

    Returns:
        A list of ``"<hash_tag>:<key>"`` strings, in the order given.

    Raises:
        ValueError: If ``hash_tag`` is not a non-empty ``{...}`` tag. An empty
            or brace-less tag would make Redis hash each whole key, silently
            breaking the same-slot guarantee.
    """
    well_formed = (
        len(hash_tag) > 2 and hash_tag.startswith("{") and hash_tag.endswith("}")
    )
    if not well_formed:
        raise ValueError(
            f"hash_tag must look like '{{name}}' with a non-empty name, got {hash_tag!r}"
        )
    return [f"{hash_tag}:{key}" for key in keys]

# These keys will be in the same slot
keys = same_slot_keys("profile", "settings", "activity")

# Multi-key operations work within same slot
# NOTE(review): upstream redis-py's ClusterPipeline appears to block MSET and
# MGET; if the installed version does, call cluster.mset(...) and
# cluster.mget(keys) directly on the client instead — confirm.
pipe = cluster.pipeline()
pipe.mset({
    keys[0]: "profile_data",
    keys[1]: "settings_data", 
    keys[2]: "activity_data"
})
pipe.mget(keys)
# results holds one reply per queued command, in queue order
results = pipe.execute()

Error Handling

from redis.exceptions import ClusterDownError, RedisClusterException

try:
    cluster.set("key", "value")
except ClusterDownError:
    # The cluster itself reported it cannot serve requests
    print("Cluster is down or unreachable")
except RedisClusterException as e:
    # Any other cluster-level failure raised by the client
    print(f"Cluster error: {e}")

Install with Tessl CLI

npx tessl i tessl/pypi-redis

docs

async-support.md

cluster-support.md

connection-management.md

core-client.md

distributed-locking.md

error-handling.md

high-availability.md

index.md

pipelines-transactions.md

pubsub-messaging.md

tile.json