Library for communicating with Redis Clusters, built on top of the redis-py library.
—
The ClusterPipeline class provides cluster-aware command batching with automatic routing to appropriate nodes while respecting Redis Cluster constraints. Unlike standard Redis pipelining, cluster pipelines have limitations due to the distributed nature of Redis Cluster.
Cluster-aware pipeline for batching commands with automatic node routing and cluster constraint handling.
class ClusterPipeline(RedisCluster):
def __init__(self, connection_pool, result_callbacks=None,
response_callbacks=None, startup_nodes=None,
read_from_replicas=False, cluster_down_retry_attempts=3):
"""
Initialize cluster pipeline.
Parameters:
- connection_pool (ClusterConnectionPool): Connection pool instance
- result_callbacks (dict, optional): Custom result processing callbacks
- response_callbacks (dict, optional): Custom response processing callbacks
- startup_nodes (List[StartupNode], optional): Startup nodes list
- read_from_replicas (bool): Enable reading from replica nodes
- cluster_down_retry_attempts (int): Retry attempts for cluster-down errors
"""

Use pipelines as context managers for automatic execution and cleanup.
def __enter__(self):
"""Enter pipeline context."""
def __exit__(self, exc_type, exc_val, exc_tb):
"""Exit pipeline context with automatic reset."""

Queue commands for batch execution with cluster-aware routing.
def execute_command(self, *args, **kwargs):
"""
Queue command for pipeline execution.
Parameters:
- *args: Command name and arguments
- **kwargs: Additional command options
Returns:
ClusterPipeline: Pipeline instance for method chaining
Note: Commands are queued, not executed immediately
"""

Execute all queued commands and return results.
def execute(self, raise_on_error=True):
"""
Execute all queued pipeline commands.
Parameters:
- raise_on_error (bool): Raise exception on command errors
Returns:
List[Any]: Results from all executed commands in order
Raises:
RedisClusterException: If commands violate cluster constraints
ResponseError: If individual commands fail and raise_on_error=True
"""
def reset(self):
"""
Clear all queued commands from pipeline.
Pipeline can be reused after reset.
"""

Limited set of Redis commands that work in cluster pipeline mode.
def delete(self, *names):
"""
Queue key deletion (single key only in cluster mode).
Parameters:
- *names (str): Key name (only one key allowed)
Returns:
ClusterPipeline: Pipeline instance
Note: Multi-key delete not supported in cluster pipeline
"""
# Standard Redis commands that work in pipeline
def get(self, name): ...
def set(self, name, value, **kwargs): ...
def incr(self, name, amount=1): ...
def decr(self, name, amount=1): ...
def hget(self, name, key): ...
def hset(self, name, key=None, value=None, mapping=None): ...
def lpush(self, name, *values): ...
def rpush(self, name, *values): ...
def lpop(self, name, count=None): ...
def rpop(self, name, count=None): ...
def sadd(self, name, *values): ...
def srem(self, name, *values): ...
def zadd(self, name, mapping, **kwargs): ...
def zrem(self, name, *values): ...

Many Redis commands are blocked in cluster pipeline mode due to cluster constraints.
# These commands raise RedisClusterException when used in pipeline:
# - Multi-key operations: mget, mset, del with multiple keys
# - Cross-slot operations: smove, rpoplpush, brpoplpush
# - Transactions: multi, exec, discard, watch, unwatch
# - Pub/sub operations: publish, subscribe, unsubscribe
# - Lua scripts with multiple keys
# - Server management commands

Redis Cluster pipeline operations have several important limitations:
Pipeline execution can fail at various points due to cluster constraints.
# Exception types during pipeline execution:
# - RedisClusterException: Cluster constraint violations
# - ResponseError: Individual command failures
# - ConnectionError: Node connectivity issues
# - MovedError/AskError: Slot migration during execution

from rediscluster import RedisCluster
rc = RedisCluster(
    startup_nodes=[{"host": "127.0.0.1", "port": "7000"}],
    # Decode replies to str so the outputs shown in the comments below
    # ('value1', 'Alice', ...) match what is actually returned; without this
    # option replies arrive as bytes.
    decode_responses=True,
)

# Context manager (recommended)
with rc.pipeline() as pipe:
    # Each call only queues the command; nothing is sent until execute().
    pipe.set("key1", "value1")
    pipe.set("key2", "value2")
    pipe.get("key1")
    pipe.incr("counter")
    results = pipe.execute()
print(results)  # [True, True, 'value1', 1] (assuming "counter" did not exist yet)

# Manual pipeline creation and cleanup
pipe = rc.pipeline()
try:
    # Queue the writes and reads; results come back in this same order.
    pipe.set("user:1:name", "Alice")
    pipe.set("user:1:email", "alice@example.com")
    pipe.hset("user:1:profile", mapping={"age": 30, "city": "New York"})
    pipe.get("user:1:name")
    pipe.hgetall("user:1:profile")
    results = pipe.execute()
    # HSET reports how many fields were newly added, so a fresh hash with two
    # fields yields 2.
    print(f"Set results: {results[:3]}") # [True, True, 2]
    # NOTE(review): the str outputs below assume the client was created with
    # decode_responses=True; otherwise expect bytes - confirm.
    print(f"Name: {results[3]}") # Alice
    print(f"Profile: {results[4]}") # {'age': '30', 'city': 'New York'}
finally:
    pipe.reset() # Clean up queued commands

pipe = rc.pipeline()
# RedisClusterException is caught below, so it must be in scope.
from rediscluster.exceptions import RedisClusterException

try:
    pipe.set("valid_key", "value")
    pipe.get("valid_key")
    # INCR only fails when the key holds a non-integer value, so store one
    # first; INCR on a missing key would simply return 1.
    pipe.set("non_numeric_key", "not-a-number")
    pipe.incr("non_numeric_key")  # This will fail
    # Execute with error handling: failures are returned in the result list
    # as exception instances instead of being raised.
    results = pipe.execute(raise_on_error=False)
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Command {i} failed: {result}")
        else:
            print(f"Command {i} result: {result}")
except RedisClusterException as e:
    print(f"Cluster constraint violation: {e}")
finally:
    pipe.reset()

# Pipeline commands must respect slot constraints
import hashlib
def get_slot(key):
    """Calculate the Redis Cluster hash slot for a key.

    Redis Cluster maps a key to ``CRC16(key) mod 16384`` using the
    CRC16/XMODEM variant. When the key contains a hash tag - a non-empty
    substring between the first ``{`` and the first ``}`` after it - only
    that substring is hashed, which lets related keys share a slot.

    Parameters:
    - key (str | bytes): Key name; ``str`` keys are encoded as UTF-8

    Returns:
    int: Slot number in the range 0..16383
    """
    # Local import keeps this snippet self-contained; binascii.crc_hqx with an
    # initial value of 0 is the CRC16/XMODEM checksum Redis Cluster uses.
    # (hashlib has no crc16 function.)
    import binascii

    data = key.encode() if isinstance(key, str) else key

    # Honour hash tags: hash only the text inside the first "{...}" pair,
    # provided that pair is not empty.
    start = data.find(b"{")
    if start != -1:
        end = data.find(b"}", start + 1)
        if end > start + 1:
            data = data[start + 1:end]

    return binascii.crc_hqx(data, 0) % 16384
# Keys that hash to the same slot can be used together.
# Redis Cluster hashes only the text inside the first non-empty "{...}" of a
# key (its hash tag), so giving related keys the same tag places them in one slot.
key1 = "{user:123}:profile"
key2 = "{user:123}:settings"
if get_slot(key1) == get_slot(key2):
    print("Keys are in same slot - can pipeline together")
    with rc.pipeline() as pipe:
        pipe.hset(key1, "name", "John")
        pipe.hset(key2, "theme", "dark")
        pipe.hget(key1, "name")
        pipe.hget(key2, "theme")
        results = pipe.execute()
else:
    print("Keys in different slots - use separate commands")

# Batch similar operations for better performance
# Build 100 key/value pairs to write.
keys_to_set = [f"batch:key:{i}" for i in range(100)]
values = [f"value_{i}" for i in range(100)]
# Process in chunks to avoid large pipeline buildup
chunk_size = 50
for i in range(0, len(keys_to_set), chunk_size):
    # Take the next chunk_size keys together with their matching values.
    chunk_keys = keys_to_set[i:i+chunk_size]
    chunk_values = values[i:i+chunk_size]
    # One pipeline per chunk: queue every SET, then send them in one execute().
    with rc.pipeline() as pipe:
        for key, value in zip(chunk_keys, chunk_values):
            pipe.set(key, value)
        results = pipe.execute()
    # i//chunk_size + 1 turns the start offset into a 1-based chunk number.
    print(f"Set {len(results)} keys in chunk {i//chunk_size + 1}")

# Pipeline with replica reads for load distribution
# Create a client with replica reads enabled.
rc_with_replicas = RedisCluster(
    startup_nodes=[{"host": "127.0.0.1", "port": "7000"}],
    read_from_replicas=True
)
# The pipeline is also created with read_from_replicas=True.
with rc_with_replicas.pipeline(read_from_replicas=True) as pipe:
    # Read operations can go to replicas
    pipe.get("readonly_key1")
    pipe.get("readonly_key2")
    pipe.hgetall("readonly_hash")
    read_results = pipe.execute()
print(f"Read results: {read_results}")

# Pattern: Single-key operations work reliably
def pipeline_single_key_operations(rc, key_prefix, count):
    """Write ``count`` keys named ``<key_prefix>:<i>`` with a one-hour TTL.

    Every key gets a SET followed by an EXPIRE, all queued on one pipeline
    and sent with a single ``execute()``.

    Parameters:
    - rc: Cluster client exposing ``pipeline()``
    - key_prefix (str): Prefix shared by all generated key names
    - count (int): Number of keys to write

    Returns:
    tuple: ``(set_results, expire_results)``, each in key order
    """
    with rc.pipeline() as pipe:
        for i in range(count):
            name = f"{key_prefix}:{i}"
            pipe.set(name, f"value_{i}")
            pipe.expire(name, 3600)  # 1 hour TTL
        replies = pipe.execute()

    # Replies alternate SET, EXPIRE, SET, EXPIRE, ...: even positions are the
    # SET replies and odd positions are the EXPIRE replies.
    set_results = replies[::2]
    expire_results = replies[1::2]
    return set_results, expire_results
# Pattern: Avoiding multi-key operations
def safe_multi_key_pipeline(rc, operations):
    """Execute operations on many keys by running one pipeline per hash slot.

    Operations are grouped by the slot of their key, each group is sent in
    its own pipeline, and the results are put back in the order of
    ``operations`` so that ``result[i]`` belongs to ``operations[i]``.

    Parameters:
    - rc: Cluster client exposing ``pipeline()``
    - operations (Iterable[dict]): Each item has ``'command'`` (pipeline
      method name), ``'key'``, and optionally ``'args'`` (extra positional
      arguments passed after the key)

    Returns:
    list: One result per operation, in input order
    """
    from collections import defaultdict

    operations = list(operations)  # allow any iterable; we need len() and indices

    # Group operations by calculated slot, remembering each one's original
    # position so results can be returned in input order.
    slot_groups = defaultdict(list)
    for index, op in enumerate(operations):
        slot_groups[get_slot(op['key'])].append((index, op))

    all_results = [None] * len(operations)
    for ops in slot_groups.values():
        with rc.pipeline() as pipe:
            for _, op in ops:
                getattr(pipe, op['command'])(op['key'], *op.get('args', []))
            results = pipe.execute()
        # Scatter this group's results back to their original positions.
        for (index, _), result in zip(ops, results):
            all_results[index] = result
    return all_results

# Install with Tessl CLI
npx tessl i tessl/pypi-redis-py-cluster