Library for communicating with Redis Clusters. Built on top of the redis-py library.
npx @tessl/cli install tessl/pypi-redis-py-cluster@2.1.0
A Python client library for Redis Cluster that provides cluster-aware implementations of Redis commands with automatic slot management, node discovery, failover support, and connection pooling. Built on top of the redis-py library, it extends standard Redis functionality to work seamlessly with Redis Cluster's distributed architecture.
pip install redis-py-cluster
from rediscluster import RedisCluster
All public components:
from rediscluster import (
RedisCluster,
ClusterConnection,
ClusterConnectionPool,
ClusterBlockingConnectionPool,
ClusterPipeline,
RedisClusterException,
RedisClusterError,
ClusterDownException,
ClusterDownError,
ClusterCrossSlotError,
MovedError,
AskError,
TryAgainError,
MasterDownError
)
from rediscluster import RedisCluster
# Connect to Redis Cluster
startup_nodes = [{"host": "127.0.0.1", "port": "7000"}]
rc = RedisCluster(startup_nodes=startup_nodes, decode_responses=True)
# Execute standard Redis commands
rc.set("key", "value")
result = rc.get("key")
# Multi-key operations work across cluster nodes
rc.mset({"key1": "value1", "key2": "value2", "key3": "value3"})
values = rc.mget(["key1", "key2", "key3"])
# Pipeline operations
pipe = rc.pipeline()
pipe.set("pkey1", "pvalue1")
pipe.get("pkey1")
pipe.incr("counter")
results = pipe.execute()
# Cluster management
cluster_info = rc.cluster_info()
nodes = rc.cluster_nodes()
Redis-py-cluster extends redis-py with cluster-aware functionality:
The library automatically handles Redis Cluster's 16,384 hash slots, discovers node topology, manages failover scenarios, and routes commands to appropriate nodes while maintaining the familiar redis-py API.
The primary interface for interacting with Redis clusters, providing all standard Redis operations with cluster-aware implementations and additional cluster management commands.
class RedisCluster(Redis):
def __init__(self, host=None, port=None, startup_nodes=None,
max_connections=None, max_connections_per_node=False,
init_slot_cache=True, readonly_mode=False,
reinitialize_steps=None, skip_full_coverage_check=False,
nodemanager_follow_cluster=False, connection_class=None,
read_from_replicas=False, cluster_down_retry_attempts=3,
host_port_remap=None, **kwargs)
@classmethod
def from_url(cls, url, **kwargs): ...
def pubsub(self, **kwargs): ...
def pipeline(self, transaction=None, shard_hint=None, read_from_replicas=False): ...
def execute_command(self, *args, **kwargs): ...
Connection classes and pools for managing TCP/SSL connections to cluster nodes with automatic discovery, load balancing, and failover support.
class ClusterConnection(Connection):
def __init__(self, readonly=False, **kwargs): ...
class ClusterConnectionPool(ConnectionPool):
def __init__(self, startup_nodes=None, init_slot_cache=True,
connection_class=None, max_connections=None,
max_connections_per_node=False, **kwargs): ...
def get_connection_by_node(self, node): ...
def get_connection_by_slot(self, slot): ...
Cluster-aware pipeline for batching commands with automatic routing to appropriate nodes while respecting Redis Cluster constraints.
class ClusterPipeline(RedisCluster):
def __init__(self, connection_pool, result_callbacks=None,
response_callbacks=None, startup_nodes=None,
read_from_replicas=False, cluster_down_retry_attempts=3): ...
def execute_command(self, *args, **kwargs): ...
def execute(self, raise_on_error=True): ...
def reset(self): ...
Comprehensive exception classes for handling cluster-specific errors, redirections, and failure scenarios.
class RedisClusterException(Exception): ...
class ClusterDownError(ClusterError, ResponseError): ...
class MovedError(AskError): ...
class AskError(ResponseError):
def __init__(self, resp): ...
# Properties: slot_id, host, port, node_addr, message
Cluster-aware publish/subscribe functionality for message broadcasting and real-time communication across cluster nodes.
class ClusterPubSub(PubSub):
def __init__(self, connection_pool, shard_hint=None, ignore_subscribe_messages=False): ...
def execute_command(self, *args, **kwargs): ...
def subscribe(self, *args, **kwargs): ...
def psubscribe(self, *args, **kwargs): ...
def get_message(self, timeout=0, ignore_subscribe_messages=False): ...
def listen(self): ...
# Startup node specification
StartupNode = TypedDict('StartupNode', {
'host': str,
'port': Union[str, int]
})
# Host/port remapping configuration
HostPortRemap = Dict[Tuple[str, int], Tuple[str, int]]
# Node information from cluster
NodeInfo = Dict[str, Union[str, int, List[int]]]
# Cluster slots mapping
SlotsMapping = List[List[Union[int, List[Union[str, int]]]]]