
tessl/pypi-redis-py-cluster

Library for communicating with Redis Clusters. Built on top of the redis-py library.


Cluster Pipeline

The ClusterPipeline class provides cluster-aware command batching with automatic routing to appropriate nodes while respecting Redis Cluster constraints. Unlike standard Redis pipelining, cluster pipelines have limitations due to the distributed nature of Redis Cluster.

Capabilities

ClusterPipeline Class

Cluster-aware pipeline for batching commands with automatic node routing and cluster constraint handling.

class ClusterPipeline(RedisCluster):
    def __init__(self, connection_pool, result_callbacks=None,
                 response_callbacks=None, startup_nodes=None,
                 read_from_replicas=False, cluster_down_retry_attempts=3):
        """
        Initialize cluster pipeline.
        
        Parameters:
        - connection_pool (ClusterConnectionPool): Connection pool instance
        - result_callbacks (dict, optional): Custom result processing callbacks
        - response_callbacks (dict, optional): Custom response processing callbacks  
        - startup_nodes (List[StartupNode], optional): Startup nodes list
        - read_from_replicas (bool): Enable reading from replica nodes
        - cluster_down_retry_attempts (int): Retry attempts for cluster-down errors
        """

Pipeline Context Management

Use pipelines as context managers so the pipeline is automatically reset and its connections released on exit.

def __enter__(self):
    """Enter pipeline context."""
    
def __exit__(self, exc_type, exc_val, exc_tb):
    """Exit pipeline context with automatic reset."""

Command Queuing

Queue commands for batch execution with cluster-aware routing.

def execute_command(self, *args, **kwargs):
    """
    Queue command for pipeline execution.
    
    Parameters:
    - *args: Command name and arguments
    - **kwargs: Additional command options
    
    Returns:
    ClusterPipeline: Pipeline instance for method chaining
    
    Note: Commands are queued, not executed immediately
    """

Pipeline Execution

Execute all queued commands and return results.

def execute(self, raise_on_error=True):
    """
    Execute all queued pipeline commands.
    
    Parameters:
    - raise_on_error (bool): Raise exception on command errors
    
    Returns:
    List[Any]: Results from all executed commands in order
    
    Raises:
    RedisClusterException: If commands violate cluster constraints
    ResponseError: If individual commands fail and raise_on_error=True
    """

def reset(self):
    """
    Clear all queued commands from pipeline.
    Pipeline can be reused after reset.
    """

Supported Commands

Only a limited set of Redis commands works in cluster pipeline mode.

def delete(self, *names):
    """
    Queue key deletion (single key only in cluster mode).
    
    Parameters:
    - *names (str): Key name (only one key allowed)
    
    Returns:
    ClusterPipeline: Pipeline instance
    
    Note: Multi-key delete not supported in cluster pipeline
    """

# Standard Redis commands that work in pipeline
def get(self, name): ...
def set(self, name, value, **kwargs): ...
def incr(self, name, amount=1): ...
def decr(self, name, amount=1): ...
def hget(self, name, key): ...
def hset(self, name, key=None, value=None, mapping=None): ...
def lpush(self, name, *values): ...
def rpush(self, name, *values): ...
def lpop(self, name, count=None): ...
def rpop(self, name, count=None): ...
def sadd(self, name, *values): ...
def srem(self, name, *values): ...
def zadd(self, name, mapping, **kwargs): ...
def zrem(self, name, *values): ...

Blocked Commands

Many Redis commands are blocked in cluster pipeline mode due to cluster constraints.

# These commands raise RedisClusterException when used in pipeline:
# - Multi-key operations: mget, mset, del with multiple keys
# - Cross-slot operations: smove, rpoplpush, brpoplpush
# - Transactions: multi, exec, discard, watch, unwatch
# - Pub/sub operations: publish, subscribe, unsubscribe
# - Lua scripts with multiple keys
# - Server management commands

Pipeline Limitations

Cluster Constraints

Redis Cluster pipeline operations have several important limitations:

  1. Single Key Operations: Most pipelined operations must operate on single keys
  2. Same Slot Requirement: Multi-key operations must hash to the same slot
  3. No Transactions: MULTI/EXEC transactions not supported
  4. No Cross-Slot Commands: Commands spanning multiple slots blocked
  5. Limited Lua Scripts: Scripts with multiple keys must use same slot
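Constraint 2 hinges on Redis hash tags: when a key contains a non-empty `{...}` section, only that substring is hashed, so keys sharing a tag always land in the same slot. A minimal sketch of the tag-extraction rule (the helper name `hash_tag` is illustrative, not part of the library):

```python
def hash_tag(key):
    """Return the substring Redis Cluster actually hashes.

    If the key contains a non-empty "{...}" section, only the text
    between the first "{" and the next "}" is hashed; otherwise the
    whole key is hashed.
    """
    start = key.find("{")
    if start != -1:
        end = key.find("}", start + 1)
        if end > start + 1:  # an empty "{}" tag falls back to the whole key
            return key[start + 1:end]
    return key


# Keys sharing a tag are guaranteed to hash to the same slot:
print(hash_tag("{user:1}:name"), hash_tag("{user:1}:email"))  # user:1 user:1
```

Renaming `user:1:name` to `{user:1}:name` therefore makes the pair safe for same-slot multi-key operations.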

Error Handling

Pipeline execution can fail at various points due to cluster constraints.

# Exception types during pipeline execution:
# - RedisClusterException: Cluster constraint violations
# - ResponseError: Individual command failures  
# - ConnectionError: Node connectivity issues
# - MovedError/AskError: Slot migration during execution
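With raise_on_error=False, execute() returns each failed command's exception in place in the results list rather than raising. A small helper sketch for partitioning such a list (split_results is an illustrative name, not a library function):

```python
def split_results(results):
    """Partition pipeline results into (index, value) successes and failures.

    execute(raise_on_error=False) leaves each failed command's exception
    in its slot of the results list instead of raising it.
    """
    ok, failed = [], []
    for i, result in enumerate(results):
        (failed if isinstance(result, Exception) else ok).append((i, result))
    return ok, failed


# Simulated results: two successes, one failure left in place
ok, failed = split_results([True, "value1", ValueError("simulated failure")])
print(len(ok), len(failed))  # 2 1
```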

Usage Examples

Basic Pipeline Usage

from rediscluster import RedisCluster

rc = RedisCluster(startup_nodes=[{"host": "127.0.0.1", "port": "7000"}],
                  decode_responses=True)

# Context manager (recommended)
with rc.pipeline() as pipe:
    pipe.set("key1", "value1")
    pipe.set("key2", "value2")
    pipe.get("key1")
    pipe.incr("counter")
    results = pipe.execute()

print(results)  # [True, True, 'value1', 1]

Manual Pipeline Management

# Manual pipeline creation and cleanup
pipe = rc.pipeline()

try:
    pipe.set("user:1:name", "Alice")
    pipe.set("user:1:email", "alice@example.com")
    pipe.hset("user:1:profile", mapping={"age": 30, "city": "New York"})
    pipe.get("user:1:name")
    pipe.hgetall("user:1:profile")
    
    results = pipe.execute()
    print(f"Set results: {results[:3]}")  # [True, True, 3]
    print(f"Name: {results[3]}")          # Alice
    print(f"Profile: {results[4]}")       # {'age': '30', 'city': 'New York'}
    
finally:
    pipe.reset()  # Clean up queued commands

Error Handling

pipe = rc.pipeline()

try:
    pipe.set("valid_key", "value")
    pipe.get("valid_key")
    pipe.incr("non_numeric_key")  # This will fail
    
    # Execute with error handling
    results = pipe.execute(raise_on_error=False)
    
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Command {i} failed: {result}")
        else:
            print(f"Command {i} result: {result}")
            
except RedisClusterException as e:
    print(f"Cluster constraint violation: {e}")
    
finally:
    pipe.reset()

Working with Hash Slots

# Pipeline commands must respect slot constraints.
# Note: hashlib has no CRC-16; Redis Cluster uses CRC16-CCITT (XMODEM) mod 16384.

def crc16(data):
    """CRC16-CCITT (XMODEM), the checksum Redis Cluster uses for key slots."""
    crc = 0
    for byte in data:
        crc ^= byte << 8
        for _ in range(8):
            crc = ((crc << 1) ^ 0x1021 if crc & 0x8000 else crc << 1) & 0xFFFF
    return crc

def get_slot(key):
    """Calculate the Redis Cluster slot for a key (hash tags not handled)."""
    return crc16(key.encode()) % 16384

# Keys that hash to the same slot can be used together in multi-key
# operations. (A shared hash tag such as "{user:123}" would guarantee this.)
key1 = "user:123:profile"
key2 = "user:123:settings"

if get_slot(key1) == get_slot(key2):
    print("Keys are in same slot - can pipeline together")
    with rc.pipeline() as pipe:
        pipe.hset(key1, "name", "John")
        pipe.hset(key2, "theme", "dark")
        pipe.hget(key1, "name")
        pipe.hget(key2, "theme")
        results = pipe.execute()
else:
    print("Keys in different slots - use separate commands")

Pipeline Performance Optimization

# Batch similar operations for better performance
keys_to_set = [f"batch:key:{i}" for i in range(100)]
values = [f"value_{i}" for i in range(100)]

# Process in chunks to avoid large pipeline buildup
chunk_size = 50
for i in range(0, len(keys_to_set), chunk_size):
    chunk_keys = keys_to_set[i:i+chunk_size]
    chunk_values = values[i:i+chunk_size]
    
    with rc.pipeline() as pipe:
        for key, value in zip(chunk_keys, chunk_values):
            pipe.set(key, value)
        
        results = pipe.execute()
        print(f"Set {len(results)} keys in chunk {i//chunk_size + 1}")

Read-from-Replicas Pipeline

# Pipeline with replica reads for load distribution
rc_with_replicas = RedisCluster(
    startup_nodes=[{"host": "127.0.0.1", "port": "7000"}],
    read_from_replicas=True
)

with rc_with_replicas.pipeline(read_from_replicas=True) as pipe:
    # Read operations can go to replicas
    pipe.get("readonly_key1")
    pipe.get("readonly_key2") 
    pipe.hgetall("readonly_hash")
    
    read_results = pipe.execute()
    print(f"Read results: {read_results}")

Cluster-Specific Pipeline Patterns

# Pattern: Single-key operations work reliably
def pipeline_single_key_operations(rc, key_prefix, count):
    """Pipeline operations on keys with same prefix."""
    with rc.pipeline() as pipe:
        for i in range(count):
            key = f"{key_prefix}:{i}"
            pipe.set(key, f"value_{i}")
            pipe.expire(key, 3600)  # 1 hour TTL
        
        results = pipe.execute()
        set_results = results[::2]      # even indices: SET results
        expire_results = results[1::2]  # odd indices: EXPIRE results
        
        return set_results, expire_results

# Pattern: Avoiding multi-key operations
def safe_multi_key_pipeline(rc, operations):
    """Execute multi-key operations safely by grouping by slot."""
    from collections import defaultdict
    
    # Group operations by calculated slot
    slot_groups = defaultdict(list)
    for op in operations:
        slot = get_slot(op['key'])
        slot_groups[slot].append(op)
    
    all_results = []
    for slot, ops in slot_groups.items():
        with rc.pipeline() as pipe:
            for op in ops:
                getattr(pipe, op['command'])(op['key'], *op.get('args', []))
            
            results = pipe.execute()
            all_results.extend(results)
    
    return all_results

Install with Tessl CLI

npx tessl i tessl/pypi-redis-py-cluster
