CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-redis-py-cluster

Library for communicating with Redis Clusters. Built on top of redis-py lib

Pending
Overview
Eval results
Files

exceptions.mddocs/

Exception Handling

Comprehensive exception classes for handling cluster-specific errors, redirections, failure scenarios, and constraint violations. These exceptions provide detailed information for implementing proper error handling and retry logic in cluster applications.

Capabilities

Base Cluster Exceptions

Core exception classes for general cluster error handling.

class RedisClusterException(Exception):
    """
    Base exception for all cluster-specific errors.
    
    Used for:
    - Cluster constraint violations
    - Pipeline limitations
    - Configuration errors
    - General cluster operation failures
    """

class RedisClusterError(Exception):
    """
    General cluster operation error.
    
    Used for:
    - Command execution failures
    - Cluster state inconsistencies
    - Operational errors not covered by specific exceptions
    """

Cluster State Exceptions

Exceptions related to cluster availability and operational state.

class ClusterDownException(Exception):
    """
    Raised when cluster reports it's not operational.
    
    Indicates:
    - Cluster is in CLUSTER_STATE_FAIL
    - Not enough master nodes available
    - Cluster cannot serve requests
    """

class ClusterDownError(ClusterError, ResponseError):
    """
    Specific cluster down error with server response details.
    
    Attributes:
    - message (str): Server response message
    """
    
    def __init__(self, resp):
        """
        Initialize with server response.
        
        Parameters:
        - resp (str): Server response message
        """

Slot Constraint Exceptions

Exceptions for hash slot and key distribution violations.

class ClusterCrossSlotError(ResponseError):
    """
    Keys in request don't hash to the same slot.
    
    Raised when:
    - Multi-key operations span different slots
    - Commands require same-slot keys but don't match
    - Pipeline operations violate slot constraints
    
    Attributes:
    - message (str): "Keys in request don't hash to the same slot"
    """

class SlotNotCoveredError(RedisClusterException):
    """
    Requested slot is not covered by any node.
    
    Indicates:
    - Cluster topology incomplete
    - Slot migration in progress
    - Node failures affecting slot coverage
    
    Recovery: Drop current node layout and reconnect
    """

Redirection Exceptions

Exceptions for cluster slot migration and redirection handling.

class AskError(ResponseError):
    """
    Temporary redirection error during slot migration.
    
    Occurs when:
    - Source node is MIGRATING slot to destination node
    - Key may be on either source or destination
    - Client should ASK destination then retry command
    
    Attributes:
    - slot_id (int): Hash slot being migrated
    - host (str): Destination host for redirection
    - port (int): Destination port for redirection  
    - node_addr (tuple): Tuple of (host, port)
    - message (str): Server response message
    """
    
    def __init__(self, resp):
        """
        Parse redirection information from server response.
        
        Parameters:
        - resp (str): Server response "SLOT_ID HOST:PORT"
        """

class MovedError(AskError):
    """
    Permanent redirection error - slot has permanently moved.
    
    Occurs when:
    - Slot has been permanently reassigned to different node
    - Client should update slot mapping
    - Future commands for this slot go to new node
    
    Inherits all attributes from AskError:
    - slot_id, host, port, node_addr, message
    """

class TryAgainError(ResponseError):
    """
    Temporary error - operation should be retried.
    
    Occurs when:
    - Cluster is reconfiguring
    - Temporary resource constraints
    - Node is busy with internal operations
    
    Recommended action: Wait briefly then retry
    """

Node Failure Exceptions

Exceptions for master node failures and availability issues.

class MasterDownError(ClusterDownError):
    """
    Master node is down or unavailable.
    
    Occurs when:
    - Master node has failed
    - Master is unreachable
    - Failover in progress
    
    Recovery actions:
    - Wait for automatic failover
    - Retry with updated cluster topology
    - Use replica if available for reads
    """

Configuration Exceptions

Exceptions for cluster setup and configuration issues.

class RedisClusterConfigError(Exception):
    """
    Cluster configuration error.
    
    Occurs when:
    - Invalid startup nodes
    - Misconfigured cluster parameters
    - Connection setup failures
    """

class SlotNotCoveredError(RedisClusterException):
    """
    Requested slot is not covered by any cluster node.
    
    Occurs when:
    - Cluster topology is incomplete or inconsistent
    - Hash slot is not assigned to any node
    - Cluster is in inconsistent state during reconfiguration
    - Node failures have left slots uncovered
    
    Recovery action:
    - Drop current node layout and attempt to reconnect
    - Refresh cluster topology and slot mappings
    - Ensure cluster has proper slot coverage before retrying
    """

Error Handling Patterns

Retry Logic

Implement proper retry logic for different exception types.

# Exceptions that typically benefit from retries
RETRIABLE_ERRORS = (
    MovedError,      # Update slot mapping and retry
    AskError,        # Send ASK then retry  
    TryAgainError,   # Wait briefly and retry
    MasterDownError, # Wait for failover and retry
    ConnectionError, # Reconnect and retry
    TimeoutError     # Retry with timeout
)

# Exceptions that usually don't benefit from retries
NON_RETRIABLE_ERRORS = (
    ClusterCrossSlotError,    # Fix key distribution
    RedisClusterException,    # Fix application logic
    SlotNotCoveredError      # Reinitialize cluster
)

Redirection Handling

Handle cluster redirections properly for slot migrations.

def handle_ask_error(client, error, command, *args):
    """
    Handle ASK redirection during slot migration.
    
    Parameters:
    - client: Redis cluster client
    - error (AskError): ASK error with redirection info
    - command (str): Original command name
    - *args: Original command arguments
    
    Returns:
    Any: Command result from destination node
    """

def handle_moved_error(client, error, command, *args):
    """
    Handle MOVED redirection for permanent slot moves.
    
    Parameters:
    - client: Redis cluster client  
    - error (MovedError): MOVED error with new node info
    - command (str): Original command name
    - *args: Original command arguments
    
    Returns:
    Any: Command result after slot mapping update
    """

Usage Examples

Basic Exception Handling

from rediscluster import RedisCluster
from rediscluster import (
    RedisClusterException, ClusterDownError, MovedError, 
    AskError, ClusterCrossSlotError
)

rc = RedisCluster(startup_nodes=[{"host": "127.0.0.1", "port": "7000"}])

try:
    # This might fail if keys are in different slots
    result = rc.mget(["key1", "key2", "key3"])
    
except ClusterCrossSlotError:
    print("Keys span multiple slots - using individual gets")
    results = []
    for key in ["key1", "key2", "key3"]:
        try:
            results.append(rc.get(key))
        except Exception as e:
            print(f"Failed to get {key}: {e}")
            results.append(None)

except ClusterDownError as e:
    print(f"Cluster is down: {e.message}")
    # Wait and retry or use fallback

except RedisClusterException as e:
    print(f"Cluster error: {e}")
    # Handle cluster-specific issues

Redirection Error Handling

import time
from rediscluster import RedisCluster
from rediscluster import MovedError, AskError, TryAgainError

def execute_with_redirections(rc, command, *args, max_redirections=5):
    """Execute command with automatic redirection handling."""
    redirections = 0
    
    while redirections < max_redirections:
        try:
            return rc.execute_command(command, *args)
            
        except MovedError as e:
            print(f"MOVED: slot {e.slot_id} -> {e.host}:{e.port}")
            # Client automatically updates slot mapping
            redirections += 1
            
        except AskError as e:
            print(f"ASK: slot {e.slot_id} -> {e.host}:{e.port}")
            # Client handles ASK redirection automatically
            redirections += 1
            
        except TryAgainError:
            print("TRY_AGAIN - waiting before retry")
            time.sleep(0.1)  # Brief wait
            redirections += 1
    
    raise Exception(f"Too many redirections ({max_redirections})")

# Usage
try:
    result = execute_with_redirections(rc, "GET", "some_key")
    print(f"Result: {result}")
except Exception as e:
    print(f"Failed after redirections: {e}")

Comprehensive Error Handling

import logging
import time
from rediscluster import RedisCluster
from rediscluster import *
from redis.exceptions import ConnectionError, TimeoutError

def robust_cluster_operation(rc, operation_func, *args, **kwargs):
    """
    Execute cluster operation with comprehensive error handling.
    
    Parameters:
    - rc: RedisCluster instance
    - operation_func: Function to execute (e.g., rc.get, rc.set)
    - *args, **kwargs: Arguments for operation_func
    
    Returns:
    Any: Operation result or None if all retries failed
    """
    max_retries = 3
    retry_delay = 0.5
    
    for attempt in range(max_retries):
        try:
            return operation_func(*args, **kwargs)
            
        except ClusterCrossSlotError:
            logging.error("Keys span multiple slots - cannot execute as single operation")
            return None
            
        except (MovedError, AskError) as e:
            logging.info(f"Redirection: {type(e).__name__} - slot {e.slot_id} -> {e.host}:{e.port}")
            # Client handles redirection automatically, retry
            time.sleep(retry_delay)
            
        except TryAgainError:
            logging.info("TRY_AGAIN received - cluster busy")
            time.sleep(retry_delay)
            
        except (ClusterDownError, MasterDownError) as e:
            logging.warning(f"Cluster/Master down: {e.message}")
            time.sleep(retry_delay * 2)  # Longer wait for cluster issues
            
        except (ConnectionError, TimeoutError) as e:
            logging.warning(f"Connection issue: {e}")
            time.sleep(retry_delay)
            
        except SlotNotCoveredError:
            logging.error("Slot not covered - cluster topology issue")
            # Force cluster reinitialize  
            rc.connection_pool.reset()
            time.sleep(retry_delay)
            
        except RedisClusterException as e:
            logging.error(f"Cluster constraint violation: {e}")
            return None  # Don't retry constraint violations
            
        retry_delay *= 1.5  # Exponential backoff
    
    logging.error(f"Operation failed after {max_retries} attempts")
    return None

# Usage examples
rc = RedisCluster(startup_nodes=[{"host": "127.0.0.1", "port": "7000"}])

# Robust single operations
value = robust_cluster_operation(rc, rc.get, "some_key")
success = robust_cluster_operation(rc, rc.set, "some_key", "some_value")

# Robust multi-key operations (handles cross-slot errors)
values = robust_cluster_operation(rc, rc.mget, ["key1", "key2", "key3"])
if values is None:
    # Fall back to individual gets
    values = []
    for key in ["key1", "key2", "key3"]:
        val = robust_cluster_operation(rc, rc.get, key)
        values.append(val)

Pipeline Error Handling

from rediscluster import RedisCluster, ClusterPipeline
from rediscluster import RedisClusterException

def safe_pipeline_execution(rc, commands):
    """
    Execute pipeline commands with error handling.
    
    Parameters:
    - rc: RedisCluster instance
    - commands: List of (command, args) tuples
    
    Returns:
    List: Results or error information for each command
    """
    pipe = rc.pipeline()
    
    try:
        # Queue all commands
        for command, args in commands:
            getattr(pipe, command)(*args)
        
        # Execute pipeline
        results = pipe.execute(raise_on_error=False)
        
        # Check for individual command errors
        processed_results = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                command, args = commands[i]
                logging.error(f"Pipeline command {command}({args}) failed: {result}")
                processed_results.append({"error": str(result)})
            else:
                processed_results.append({"result": result})
        
        return processed_results
        
    except RedisClusterException as e:
        logging.error(f"Pipeline constraint violation: {e}")
        return [{"error": "Pipeline blocked by cluster constraints"}] * len(commands)
        
    finally:
        pipe.reset()

# Usage
commands = [
    ("set", ["key1", "value1"]),
    ("set", ["key2", "value2"]), 
    ("get", ["key1"]),
    ("incr", ["counter"])
]

results = safe_pipeline_execution(rc, commands)
for i, result in enumerate(results):
    command, args = commands[i]
    if "error" in result:
        print(f"Command {command}({args}) failed: {result['error']}")
    else:
        print(f"Command {command}({args}) result: {result['result']}")

Install with Tessl CLI

npx tessl i tessl/pypi-redis-py-cluster

docs

client.md

connections.md

exceptions.md

index.md

pipeline.md

pubsub.md

tile.json