Library for communicating with Redis Clusters, built on top of the redis-py library.
—
Connection classes and pools for managing TCP/SSL connections to Redis cluster nodes with automatic discovery, load balancing, and failover support. The connection system handles cluster topology changes, node failures, and provides efficient connection pooling.
Standard TCP connection for cluster nodes with cluster-specific response parsing and read-only mode support.
class ClusterConnection(Connection):
def __init__(self, readonly=False, **kwargs):
"""
Initialize cluster TCP connection.
Parameters:
- readonly (bool): Configure connection as read-only
- **kwargs: Standard Redis connection parameters
- host (str): Redis host
- port (int): Redis port
- db (int): Database number (usually 0 for cluster)
- password (str, optional): Authentication password
- socket_timeout (float): Socket timeout in seconds
- socket_connect_timeout (float): Connection timeout in seconds
- socket_keepalive (bool): Enable TCP keepalive
- socket_keepalive_options (dict): TCP keepalive options
- retry_on_timeout (bool): Retry commands on timeout
- encoding (str): String encoding (default 'utf-8')
- encoding_errors (str): Encoding error handling
- decode_responses (bool): Decode responses to strings
"""
def on_connect(self):
"""
Initialize connection with authentication and read-only mode setup.
Automatically sends READONLY command if readonly=True.
"""

SSL/TLS connection for secure cluster communication with cluster-specific parsing.
class SSLClusterConnection(SSLConnection):
def __init__(self, readonly=False, **kwargs):
"""
Initialize secure cluster SSL connection.
Parameters:
- readonly (bool): Configure connection as read-only
- **kwargs: Standard SSL connection parameters
- ssl_cert_reqs (str): Certificate verification mode
- ssl_ca_certs (str): CA certificates file path
- ssl_certfile (str): Client certificate file path
- ssl_keyfile (str): Client private key file path
- ssl_password (str, optional): Private key password
- ssl_check_hostname (bool): Verify hostname in certificate
- All ClusterConnection parameters
"""
def on_connect(self):
"""
Initialize SSL connection with authentication and read-only mode setup.
"""

Base connection pool managing connections to cluster nodes with automatic slot-to-node mapping and discovery.
class ClusterConnectionPool(ConnectionPool):
def __init__(self, startup_nodes=None, init_slot_cache=True,
connection_class=None, max_connections=None,
max_connections_per_node=False, reinitialize_steps=None,
skip_full_coverage_check=False, nodemanager_follow_cluster=False,
host_port_remap=None, **connection_kwargs):
"""
Initialize cluster connection pool.
Parameters:
- startup_nodes (List[StartupNode], optional): Initial cluster nodes for bootstrap
- init_slot_cache (bool): Initialize slot-to-node mapping on startup
- connection_class (type, optional): Connection class (ClusterConnection or SSLClusterConnection)
- max_connections (int, optional): Maximum total connections across all nodes
- max_connections_per_node (bool): If true, max_connections is enforced separately for each node instead of as a total across the whole cluster (default False)
- reinitialize_steps (int, optional): Commands before reinitializing cluster layout
- skip_full_coverage_check (bool): Skip validation that all slots are covered
- nodemanager_follow_cluster (bool): Automatically follow cluster topology changes
- host_port_remap (HostPortRemap, optional): Remap node addresses
- **connection_kwargs: Parameters passed to connection class
"""
def get_connection_by_node(self, node):
"""
Get connection to specific cluster node.
Parameters:
- node (NodeInfo): Node information dictionary
Returns:
ClusterConnection: Connection instance for the node
"""
def get_connection_by_slot(self, slot):
"""
Get connection for hash slot.
Parameters:
- slot (int): Hash slot number (0-16383)
Returns:
ClusterConnection: Connection to node handling the slot
"""
def get_random_connection(self):
"""
Get random connection from pool.
Returns:
ClusterConnection: Random connection instance
"""
def release(self, connection):
"""
Return connection to pool for reuse.
Parameters:
- connection (ClusterConnection): Connection to release
"""
def disconnect(self):
"""
Close all connections in pool and clear node information.
"""
def reset(self):
"""
Reset pool state and reinitialize cluster discovery.
"""

Thread-safe blocking connection pool that blocks when no connections are available.
class ClusterBlockingConnectionPool(ClusterConnectionPool):
def __init__(self, max_connections=50, timeout=20, **kwargs):
"""
Initialize blocking cluster connection pool.
Parameters:
- max_connections (int): Maximum total connections across cluster
- timeout (int): Blocking timeout in seconds when no connections available
- **kwargs: All ClusterConnectionPool parameters
"""
def get_connection(self, command_name, *keys, **options):
"""
Get connection with blocking behavior.
Blocks up to timeout seconds if no connections available.
Parameters:
- command_name (str): Redis command name
- *keys: Command keys for slot routing
- **options: Additional routing options
Returns:
ClusterConnection: Available connection instance
Raises:
ConnectionError: If timeout exceeded waiting for connection
"""
def make_connection(self):
"""
Create new connection instance when pool not at capacity.
Returns:
ClusterConnection: New connection instance
"""

Connection pool optimized for read-only operations with automatic replica routing.
class ClusterReadOnlyConnectionPool(ClusterConnectionPool):
def __init__(self, startup_nodes=None, init_slot_cache=True,
connection_class=None, **kwargs):
"""
Initialize read-only connection pool.
Parameters:
- startup_nodes (List[StartupNode], optional): Initial cluster nodes for bootstrap
- init_slot_cache (bool): Initialize slot-to-node mapping on startup
- connection_class (type, optional): Connection class (defaults to ClusterConnection with readonly=True)
- **kwargs: Additional ClusterConnectionPool parameters
Note: Automatically sets readonly=True on all connections
"""

Connection pool that intelligently routes read operations to replica nodes for load distribution.
class ClusterWithReadReplicasConnectionPool(ClusterConnectionPool):
def __init__(self, startup_nodes=None, init_slot_cache=True,
connection_class=None, **kwargs):
"""
Initialize connection pool with read replica support.
Parameters:
- startup_nodes (List[StartupNode], optional): Initial cluster nodes for bootstrap
- init_slot_cache (bool): Initialize slot-to-node mapping on startup
- connection_class (type, optional): Connection class (defaults to ClusterConnection)
- **kwargs: Additional ClusterConnectionPool parameters
Features:
- Routes read commands to replica nodes when available
- Falls back to master nodes for write operations
- Provides load balancing across replicas
"""
def get_connection_by_slot(self, slot, read_command=False):
"""
Get connection for slot with replica routing support.
Parameters:
- slot (int): Hash slot number (0-16383)
- read_command (bool): Whether this is a read operation
Returns:
ClusterConnection: Connection to appropriate node (replica for reads, master for writes)
"""

The connection pool automatically discovers cluster topology and maintains slot-to-node mappings.
# Internal methods used by RedisCluster (not typically called directly)
def initialize(self):
"""Initialize cluster discovery and slot mappings."""
def reset(self):
"""Reset cluster state and rediscover topology."""
def get_master_node_by_slot(self, slot):
"""Get master node information for slot."""
def get_nodes_by_server_type(self, server_type):
"""Get nodes filtered by type (master/slave)."""

Connection pools handle the complete connection lifecycle including creation, reuse, and cleanup.
def make_connection(self):
"""Create new connection when needed."""
def get_connection(self, command_name, *keys, **options):
"""Get appropriate connection for command and keys."""
def release(self, connection):
"""Return connection to pool for reuse."""
def disconnect(self):
"""Close all connections and cleanup resources."""

from rediscluster import RedisCluster, ClusterConnectionPool, ClusterConnection
# Default connection pool (automatically created by RedisCluster)
rc = RedisCluster(startup_nodes=[{"host": "127.0.0.1", "port": "7000"}])
# Custom connection pool
pool = ClusterConnectionPool(
startup_nodes=[
{"host": "127.0.0.1", "port": "7000"},
{"host": "127.0.0.1", "port": "7001"},
{"host": "127.0.0.1", "port": "7002"}
],
max_connections=32,
max_connections_per_node=8
)
rc = RedisCluster(connection_pool=pool)

from rediscluster import RedisCluster, ClusterConnectionPool, SSLClusterConnection
# SSL connection pool
pool = ClusterConnectionPool(
startup_nodes=[{"host": "secure-cluster.example.com", "port": "6380"}],
connection_class=SSLClusterConnection,
ssl_cert_reqs="required",
ssl_ca_certs="/path/to/ca.crt",
ssl_certfile="/path/to/client.crt",
ssl_keyfile="/path/to/client.key"
)
rc = RedisCluster(connection_pool=pool)

from rediscluster import RedisCluster, ClusterBlockingConnectionPool
# Blocking pool with limited connections
pool = ClusterBlockingConnectionPool(
startup_nodes=[{"host": "127.0.0.1", "port": "7000"}],
max_connections=20,
timeout=30 # Block up to 30 seconds for connection
)
rc = RedisCluster(connection_pool=pool)

from rediscluster import RedisCluster
# Enable reading from replicas for load distribution
rc = RedisCluster(
startup_nodes=[{"host": "127.0.0.1", "port": "7000"}],
read_from_replicas=True,
readonly_mode=False # Allow writes to masters, reads from replicas
)
# Read-only mode (all operations go to replicas when possible)
rc_readonly = RedisCluster(
startup_nodes=[{"host": "127.0.0.1", "port": "7000"}],
readonly_mode=True,
read_from_replicas=True
)

# Access connection pool information
pool = rc.connection_pool
# Get cluster topology
nodes = pool.nodes.all_nodes()
for node in nodes:
print(f"Node: {node['host']}:{node['port']} - {node['server_type']}")
# Get slot mappings
slots = pool.nodes.slots
print(f"Total slots mapped: {len(slots)}")
# Connection statistics (if available)
print(f"Active connections: {pool.created_connections}")

# Remap internal cluster addresses to external addresses
host_port_remap = {
('internal-node1', 7000): ('external-node1.example.com', 7000),
('internal-node2', 7001): ('external-node2.example.com', 7001),
('internal-node3', 7002): ('external-node3.example.com', 7002)
}
rc = RedisCluster(
startup_nodes=[{"host": "external-node1.example.com", "port": "7000"}],
host_port_remap=host_port_remap
)

pool = ClusterConnectionPool(
startup_nodes=[{"host": "127.0.0.1", "port": "7000"}],
init_slot_cache=True, # Initialize slot mappings on startup
max_connections=64, # Total connections across cluster
max_connections_per_node=16, # Per-node limit
reinitialize_steps=1000, # Reinitialize after 1000 commands
skip_full_coverage_check=False, # Ensure all slots covered
nodemanager_follow_cluster=True, # Follow topology changes
socket_timeout=5, # 5 second socket timeout
socket_connect_timeout=5, # 5 second connection timeout
retry_on_timeout=True, # Retry commands on timeout
decode_responses=True # Decode responses to strings
)

Install with Tessl CLI
npx tessl i tessl/pypi-redis-py-cluster