CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-redis

Python client for Redis database and key-value store

Pending
Overview
Eval results
Files

docs/error-handling.md

Error Handling

The Redis Python client provides a comprehensive set of exception classes for the different kinds of errors that can occur during Redis operations. Handling them properly lets an application deal gracefully with connection problems, data errors, and server-side conditions.

Capabilities

Exception Hierarchy

Redis exception classes organized by error type for precise error handling.

class RedisError(Exception):
    """Root of the Redis exception hierarchy."""


# Connection-related exceptions
class ConnectionError(RedisError):
    """The connection to the Redis server could not be used."""


class TimeoutError(RedisError):
    """An operation did not complete in time."""


class BusyLoadingError(ConnectionError):
    """The server is still loading its dataset from disk."""


class MaxConnectionsError(ConnectionError):
    """The connection pool has no connections left to hand out."""


class ChildDeadlockedError(Exception):
    """A child process deadlocked.

    Note that this class derives from ``Exception`` directly, so an
    ``except RedisError`` clause does not catch it.
    """


# Authentication and authorization exceptions
class AuthenticationError(ConnectionError):
    """The server rejected the authentication attempt."""


class AuthenticationWrongNumberOfArgsError(AuthenticationError):
    """The AUTH command was sent with the wrong number of arguments."""


# Data and response exceptions
class ResponseError(RedisError):
    """The server replied with an invalid response."""


class DataError(RedisError):
    """A value has an invalid type or content."""


class InvalidResponse(RedisError):
    """The reply from Redis has an invalid format."""


class OutOfMemoryError(RedisError):
    """The Redis server has run out of memory."""


class ReadOnlyError(RedisError):
    """A write was attempted over a read-only connection."""


# Transaction exceptions
class WatchError(RedisError):
    """A watched key changed while the transaction was in progress."""


class InvalidPipelineStack(RedisError):
    """The pipeline stack is in an invalid state."""


# Pub/Sub exceptions
class PubSubError(RedisError):
    """A publish/subscribe operation failed."""


# Cluster exceptions
class RedisClusterException(RedisError):
    """Root of the Redis Cluster exception family."""


class ClusterDownError(RedisClusterException, ConnectionError):
    """The cluster is down or cannot be reached.

    Catchable both as a cluster exception and as a ``ConnectionError``.
    """


class ClusterError(RedisClusterException):
    """General-purpose Redis Cluster failure."""


class ClusterCrossSlotError(RedisClusterException):
    """An operation touched keys living in different slots."""


class CrossSlotTransactionError(RedisClusterException):
    """A transaction touched keys living in different slots."""


class MasterNotFoundError(RedisClusterException):
    """No master node was found for the given key."""


class SlaveNotFoundError(RedisClusterException):
    """No replica ("slave") node was found for the given key."""


class ClusterTransactionError(RedisClusterException):
    """A transaction failed while running in cluster mode."""


class AskError(RedisClusterException):
    """ASK redirection issued by Redis Cluster.

    ``resp`` must be exactly two fields separated by a single space: the slot
    number and the node address.  They are exposed as ``slot_id`` (int) and
    ``node_addr`` (str); any other shape raises ``ValueError``.
    """

    def __init__(self, resp):
        slot_text, node_addr = resp.split(' ')
        self.slot_id = int(slot_text)
        self.node_addr = node_addr
        super().__init__(f"ASK {self.slot_id} {self.node_addr}")


class MovedError(RedisClusterException):
    """MOVED redirection issued by Redis Cluster.

    ``resp`` must be exactly two fields separated by a single space: the slot
    number and the node address.  They are exposed as ``slot_id`` (int) and
    ``node_addr`` (str); any other shape raises ``ValueError``.
    """

    def __init__(self, resp):
        slot_text, node_addr = resp.split(' ')
        self.slot_id = int(slot_text)
        self.node_addr = node_addr
        super().__init__(f"MOVED {self.slot_id} {self.node_addr}")


class TryAgainError(RedisClusterException):
    """TRYAGAIN reply from Redis Cluster."""

Usage Examples

Basic Error Handling

import redis
from redis.exceptions import (
    ConnectionError, 
    TimeoutError, 
    ResponseError, 
    AuthenticationError
)

def safe_redis_operation():
    """Run a ping/set/get round trip and report each failure category.

    Nothing is returned and nothing is re-raised: every failure is printed.

    The handlers are ordered from most to least specific.
    ``AuthenticationError`` derives from ``ConnectionError``, so it has to be
    tested first; placed after ``except ConnectionError`` it could never run.
    """
    try:
        r = redis.Redis(
            host='localhost',
            port=6379,
            socket_timeout=5,
            socket_connect_timeout=10,
            retry_on_timeout=True
        )

        # Test connection
        r.ping()

        # Perform operations
        r.set('test_key', 'test_value')
        value = r.get('test_key')
        print(f"Success: {value}")

    except AuthenticationError as e:
        # Must precede ConnectionError (its base class) to be reachable
        print(f"Authentication failed: {e}")
        # Handle auth issues (check credentials, etc.)

    except ConnectionError as e:
        print(f"Connection failed: {e}")
        # Handle connection issues (retry, fallback, etc.)

    except TimeoutError as e:
        print(f"Operation timed out: {e}")
        # Handle timeouts (retry with backoff, etc.)

    except ResponseError as e:
        print(f"Redis responded with error: {e}")
        # Handle Redis command errors

    except Exception as e:
        print(f"Unexpected error: {e}")
        # Handle any other errors

safe_redis_operation()

Connection Retry Logic

import redis
import time
import random
from redis.exceptions import ConnectionError, TimeoutError

class RedisRetryClient:
    """Thin Redis wrapper that retries failed commands with exponential backoff.

    Only ``ConnectionError`` and ``TimeoutError`` trigger a retry; any other
    exception propagates immediately.

    Args:
        max_retries: Number of retries after the first attempt (so up to
            ``max_retries + 1`` attempts in total). Must be >= 0.
        backoff_factor: Base delay in seconds; attempt ``n`` (0-based) waits
            ``backoff_factor * 2 ** n`` plus up to 10% jitter.
        **redis_kwargs: Passed unchanged to ``redis.Redis``.

    Raises:
        ValueError: If ``max_retries`` is negative.
    """

    def __init__(self, max_retries=3, backoff_factor=1.0, **redis_kwargs):
        # A negative value would skip the retry loop entirely and leave
        # nothing meaningful to raise, so reject it up front.
        if max_retries < 0:
            raise ValueError("max_retries must be >= 0")
        self.max_retries = max_retries
        self.backoff_factor = backoff_factor
        self.redis_kwargs = redis_kwargs
        self.client = None

    def _get_client(self):
        """Return the cached client, creating it on first use."""
        if self.client is None:
            self.client = redis.Redis(**self.redis_kwargs)
        return self.client

    def _reset_client(self):
        """Drop the cached client, closing it on a best-effort basis."""
        stale, self.client = self.client, None
        if stale is None:
            return
        try:
            stale.close()
        except Exception as close_error:
            # Cleanup must never mask the error that triggered the reset
            print(f"Ignoring error while closing stale client: {close_error}")

    def _retry_operation(self, operation, *args, **kwargs):
        """Call ``operation(client, *args, **kwargs)`` with retry logic.

        Returns whatever ``operation`` returns. After the final failed
        attempt the last ``ConnectionError``/``TimeoutError`` is re-raised.
        """
        last_error = None

        for attempt in range(self.max_retries + 1):
            try:
                client = self._get_client()
                return operation(client, *args, **kwargs)

            except (ConnectionError, TimeoutError) as e:
                last_error = e
                self._reset_client()  # Fresh client for the next attempt

                if attempt < self.max_retries:
                    # Exponential backoff with jitter
                    delay = self.backoff_factor * (2 ** attempt)
                    jitter = random.uniform(0, 0.1) * delay
                    total_delay = delay + jitter

                    print(f"Attempt {attempt + 1} failed: {e}")
                    print(f"Retrying in {total_delay:.2f} seconds...")
                    time.sleep(total_delay)
                else:
                    print(f"All {self.max_retries + 1} attempts failed")

        raise last_error

    def set(self, key, value, **kwargs):
        """Set with retry"""
        return self._retry_operation(lambda client, k, v, **kw: client.set(k, v, **kw), key, value, **kwargs)

    def get(self, key):
        """Get with retry"""
        return self._retry_operation(lambda client, k: client.get(k), key)

    def ping(self):
        """Ping with retry"""
        return self._retry_operation(lambda client: client.ping())

# Usage example
# Up to 4 attempts in total (1 initial + 3 retries), starting from a 0.5 s
# base delay; the remaining keyword arguments are forwarded to redis.Redis.
retry_client = RedisRetryClient(
    max_retries=3,
    backoff_factor=0.5,
    host='localhost',
    port=6379,
    socket_timeout=2,
    socket_connect_timeout=5
)

# Each call below retries on its own; an exception only reaches this handler
# once a single operation has used up all of its attempts.
try:
    retry_client.ping()
    retry_client.set('retry_test', 'success')
    value = retry_client.get('retry_test')
    print(f"Retrieved: {value}")
except Exception as e:
    print(f"Operation failed after retries: {e}")

Transaction Error Handling

import redis
from redis.exceptions import WatchError, ResponseError

def safe_transaction_increment(key, increment_by=1):
    """Increment an integer counter with optimistic locking (WATCH/MULTI/EXEC).

    Args:
        key: Name of the counter key. A missing or empty value counts as 0.
        increment_by: Amount added to the current value.

    Returns:
        The new counter value.

    Raises:
        Exception: If the key keeps changing and all 10 attempts fail.
        ResponseError: Re-raised when Redis rejects a command.
    """
    # Imported here because this snippet's import lines do not include `time`,
    # which the backoff below needs.
    import time

    r = redis.Redis(host='localhost', port=6379, db=0)

    max_attempts = 10

    for attempt in range(max_attempts):
        try:
            # The context manager resets the pipeline on every exit path,
            # including WatchError.
            with r.pipeline() as pipe:
                pipe.watch(key)

                # After WATCH the pipeline runs commands immediately, so the
                # read goes through the same connection that holds the watch.
                current_value = pipe.get(key)
                current_value = int(current_value) if current_value else 0
                new_value = current_value + increment_by

                # Start transaction
                pipe.multi()
                pipe.set(key, new_value)

                # Execute transaction; raises WatchError if the key changed
                pipe.execute()

            print(f"Successfully incremented {key} to {new_value}")
            return new_value

        except WatchError:
            print(f"Transaction attempt {attempt + 1}: Key '{key}' was modified, retrying...")
            if attempt == max_attempts - 1:
                raise Exception(f"Failed to increment after {max_attempts} attempts")
            time.sleep(0.01 * (2 ** attempt))  # Exponential backoff

        except ResponseError as e:
            print(f"Redis command error: {e}")
            raise

        except Exception as e:
            print(f"Unexpected error during transaction: {e}")
            raise

# Initialize counter
r = redis.Redis(host='localhost', port=6379, db=0)
r.set('safe_counter', 0)

# Test concurrent increments
import threading

def worker(worker_id):
    """Increment 'safe_counter' five times, printing progress or the failure.

    Any exception stops this worker's remaining increments; it is printed
    rather than propagated.
    """
    try:
        for i in range(5):
            safe_transaction_increment('safe_counter')
            print(f"Worker {worker_id}: Increment {i+1} completed")
    except Exception as e:
        print(f"Worker {worker_id} failed: {e}")

# Run multiple workers
threads = []
for i in range(3):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

# Wait for every worker before reading the result
for t in threads:
    t.join()

# With 3 workers doing 5 increments each, 15 is expected when none of them fail
final_value = r.get('safe_counter')
print(f"Final counter value: {final_value}")

Cluster Error Handling

import redis
from redis.cluster import RedisCluster, ClusterNode
from redis.exceptions import (
    RedisClusterException,
    ClusterDownError,
    MovedError,
    AskError,
    ClusterCrossSlotError
)

def safe_cluster_operations():
    """Run sample commands against a local cluster and report cluster errors.

    A cross-slot failure of the multi-key read is recovered by fetching the
    keys one at a time; every other failure is printed. Nothing is returned
    or re-raised.
    """
    startup_nodes = [ClusterNode("localhost", port) for port in (7000, 7001, 7002)]

    try:
        cluster = RedisCluster(
            startup_nodes=startup_nodes,
            decode_responses=True,
            skip_full_coverage_check=True,
            cluster_error_retry_attempts=3
        )

        # Single-key round trip
        cluster.set("user:1001", "John")
        print(f"User: {cluster.get('user:1001')}")

        # Multi-key read: the keys may live in different slots
        keys = ["user:1001", "user:1002", "user:1003"]
        try:
            values = cluster.mget(keys)
            print(f"Multi-get success: {values}")
        except ClusterCrossSlotError as e:
            print(f"Cross-slot operation error: {e}")
            # Fall back to one GET per key; a failed key yields None
            values = []
            for key in keys:
                try:
                    values.append(cluster.get(key))
                except Exception as key_error:
                    print(f"Error getting {key}: {key_error}")
                    values.append(None)
            print(f"Individual gets: {values}")

    except ClusterDownError as e:
        print(f"Cluster is down: {e}")
        # Implement fallback or circuit breaker

    except MovedError as e:
        print(f"Slot moved: {e}")
        print(f"New location: slot {e.slot_id} at {e.node_addr}")
        # Client should automatically handle this

    except AskError as e:
        print(f"ASK redirection: {e}")
        print(f"Temporary redirection: slot {e.slot_id} at {e.node_addr}")
        # Client should automatically handle this

    except RedisClusterException as e:
        print(f"Cluster error: {e}")

    except Exception as e:
        print(f"Unexpected error: {e}")

safe_cluster_operations()

Pub/Sub Error Handling

import redis
import time
import threading
from redis.exceptions import PubSubError, ConnectionError

class RobustSubscriber:
    """Pub/Sub subscriber that reconnects after connection failures.

    Args:
        channels: Channel names to subscribe to.
        **redis_kwargs: Passed unchanged to ``redis.Redis`` on every
            (re)connection.

    Attributes:
        running: True between ``start()`` and ``stop()``.
        reconnect_attempts: Consecutive failed attempts; reset to 0 after
            each successful subscribe.
        max_reconnect_attempts: Failures tolerated before ``start()`` gives up.
        poll_interval: Seconds to wait for a message before re-checking
            ``running``.
    """

    # Upper bound on how long stop() takes to be noticed by the loop
    poll_interval = 1.0

    def __init__(self, channels, **redis_kwargs):
        self.channels = channels
        self.redis_kwargs = redis_kwargs
        self.running = False
        self.reconnect_attempts = 0
        self.max_reconnect_attempts = 5

    def start(self):
        """Run the subscription until ``stop()`` is called, reconnecting on failure.

        Raises:
            ConnectionError, PubSubError: Re-raised once
                ``max_reconnect_attempts`` consecutive attempts have failed.
        """
        self.running = True

        while self.running and self.reconnect_attempts < self.max_reconnect_attempts:
            try:
                self._subscribe_loop()
                break  # Normal exit

            except (ConnectionError, PubSubError) as e:
                self.reconnect_attempts += 1
                print(f"Subscription error (attempt {self.reconnect_attempts}): {e}")

                if self.reconnect_attempts < self.max_reconnect_attempts:
                    # Exponential backoff capped at 30 seconds
                    wait_time = min(2 ** self.reconnect_attempts, 30)
                    print(f"Reconnecting in {wait_time} seconds...")
                    time.sleep(wait_time)
                else:
                    print("Max reconnection attempts reached")
                    raise

            except Exception as e:
                print(f"Unexpected subscriber error: {e}")
                break

    def _subscribe_loop(self):
        """Connect, subscribe and dispatch messages until ``running`` is cleared.

        Messages are polled with a timeout rather than read from the blocking
        ``listen()`` iterator, so ``stop()`` takes effect within
        ``poll_interval`` seconds even when no message arrives.
        """
        # Pre-bind both names so cleanup is safe if connecting fails early
        r = None
        pubsub = None
        try:
            r = redis.Redis(**self.redis_kwargs)
            pubsub = r.pubsub()

            # Subscribe to channels
            pubsub.subscribe(*self.channels)
            print(f"Subscribed to channels: {self.channels}")

            # Reset reconnect counter on successful connection
            self.reconnect_attempts = 0

            while self.running:
                message = pubsub.get_message(timeout=self.poll_interval)
                if message is None:
                    continue  # Nothing arrived; loop to re-check self.running

                try:
                    self._handle_message(message)
                except Exception as e:
                    print(f"Error handling message: {e}")
                    # Continue listening despite handler errors

        except Exception as e:
            print(f"Subscription loop error: {e}")
            raise
        finally:
            # Best-effort cleanup: report close failures, never let them
            # replace the exception that ended the loop.
            for resource in (pubsub, r):
                if resource is None:
                    continue
                try:
                    resource.close()
                except Exception as close_error:
                    print(f"Error closing {type(resource).__name__}: {close_error}")

    def _handle_message(self, message):
        """Print payload messages and subscription confirmations; ignore other types."""
        if message['type'] == 'message':
            channel = message['channel']
            data = message['data']
            print(f"Received on {channel}: {data}")
        elif message['type'] == 'subscribe':
            print(f"Confirmed subscription to: {message['channel']}")

    def stop(self):
        """Ask the subscription loop to exit."""
        self.running = False

# Usage example
def test_robust_subscriber():
    """Run a RobustSubscriber in a background thread while publishing ten events."""
    subscriber = RobustSubscriber(
        channels=['events', 'notifications'],
        host='localhost',
        port=6379,
        socket_timeout=5,
        retry_on_timeout=True
    )

    # Daemon thread, so a stuck subscriber cannot keep the process alive
    listener = threading.Thread(target=subscriber.start, daemon=True)
    listener.start()

    # Give the subscriber a moment to connect before publishing
    time.sleep(1)
    try:
        publisher = redis.Redis(host='localhost', port=6379)
        for event_number in range(10):
            publisher.publish('events', f'Event {event_number}')
            time.sleep(0.5)
    except Exception as e:
        print(f"Publishing error: {e}")

    # Request shutdown and wait at most five seconds for the thread
    subscriber.stop()
    listener.join(timeout=5)

test_robust_subscriber()

Data Validation and Error Prevention

import redis
from redis.exceptions import DataError, ResponseError
import json

class ValidatedRedisClient:
    """Redis wrapper that turns common data problems into ``DataError``.

    Args:
        **redis_kwargs: Passed unchanged to ``redis.Redis``. The underlying
            client is available as ``self.client``.
    """

    def __init__(self, **redis_kwargs):
        self.client = redis.Redis(**redis_kwargs)

    def set_json(self, key, data, **kwargs):
        """Serialize ``data`` to JSON and store it under ``key``.

        Extra keyword arguments are forwarded to ``client.set``; its result
        is returned.

        Raises:
            DataError: If ``data`` is not JSON serializable.
            ResponseError: If Redis rejects the SET command.
        """
        # Only the serialization is guarded, so a TypeError coming from
        # client.set is not reported as invalid JSON.
        try:
            json_data = json.dumps(data)
        except (TypeError, ValueError) as e:
            raise DataError(f"Invalid JSON data: {e}") from e

        try:
            return self.client.set(key, json_data, **kwargs)
        except ResponseError as e:
            raise ResponseError(f"Redis error setting JSON: {e}") from e

    def get_json(self, key):
        """Fetch ``key`` and decode it as JSON.

        Returns:
            The decoded value, or ``None`` when the key does not exist.

        Raises:
            DataError: If the stored value is not valid JSON.
            ResponseError: If Redis rejects the GET command.
        """
        try:
            raw_data = self.client.get(key)
        except ResponseError as e:
            raise ResponseError(f"Redis error getting JSON: {e}") from e

        if raw_data is None:
            return None

        try:
            return json.loads(raw_data)
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            # UnicodeDecodeError covers raw bytes that are not valid text
            raise DataError(f"Invalid JSON in Redis key '{key}': {e}") from e

    def safe_incr(self, key, amount=1):
        """Increment ``key`` by ``amount`` and return the client's result.

        Raises:
            DataError: If Redis reports that the value is not an integer.
            ResponseError: Any other command error, unchanged.
        """
        try:
            return self.client.incr(key, amount)
        except ResponseError as e:
            if "not an integer" in str(e).lower():
                raise DataError(f"Key '{key}' contains non-integer value") from e
            raise

    def safe_lpush(self, key, *values):
        """Push every non-``None`` value onto the list at ``key``.

        Returns:
            The result of ``client.lpush``.

        Raises:
            DataError: If no non-``None`` value was given, or if Redis
                reports that ``key`` holds the wrong kind of value.
            ResponseError: Any other command error, unchanged.
        """
        # Filter out None values
        valid_values = [v for v in values if v is not None]
        if not valid_values:
            raise DataError("No valid values provided for list push")

        try:
            return self.client.lpush(key, *valid_values)
        except ResponseError as e:
            if "wrong kind" in str(e).lower():
                raise DataError(f"Key '{key}' is not a list") from e
            raise

# Usage example
def test_validated_client():
    """Exercise each ValidatedRedisClient helper, ending each section on its failure path."""
    validated = ValidatedRedisClient(host='localhost', port=6379, decode_responses=True)
    raw = validated.client

    # JSON round trip, then a value that is not JSON
    try:
        validated.set_json('user:1001', {'name': 'John', 'age': 30, 'active': True})
        print(f"JSON data: {validated.get_json('user:1001')}")

        raw.set('invalid_json', 'not json data')
        validated.get_json('invalid_json')

    except DataError as e:
        print(f"Data validation error: {e}")

    # Increment a real counter, then a text value
    try:
        raw.set('counter', 10)
        print(f"Incremented to: {validated.safe_incr('counter', 5)}")

        raw.set('text_key', 'hello')
        validated.safe_incr('text_key')

    except DataError as e:
        print(f"Increment validation error: {e}")

    # Push to a list (the None is dropped), then to a string key
    try:
        validated.safe_lpush('my_list', 'item1', 'item2', None, 'item3')

        raw.set('string_key', 'value')
        validated.safe_lpush('string_key', 'item')

    except DataError as e:
        print(f"List operation error: {e}")

test_validated_client()

Comprehensive Error Logging

import redis
import logging
from redis.exceptions import RedisError, ConnectionError, TimeoutError

# Configure logging
# Root configuration: INFO and above, formatted as timestamp, logger name,
# level and message.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Module-wide logger used by LoggingRedisClient and the usage example below
logger = logging.getLogger('redis_client')

class LoggingRedisClient:
    """Redis wrapper that logs every command and keeps success/error counters.

    ``operation_count`` counts every logged operation and ``error_count``
    counts the failed ones; both feed ``get_stats()``.
    """

    def __init__(self, **redis_kwargs):
        self.client = redis.Redis(**redis_kwargs)
        self.operation_count = 0
        self.error_count = 0

    def _log_operation(self, operation, key=None, success=True, error=None):
        """Update the counters and emit one log line for an operation."""
        self.operation_count += 1
        if not success:
            self.error_count += 1

        # The key part is omitted when the key is missing or falsy
        target = 'on ' + str(key) if key else ''
        headline = f"Operation {self.operation_count}: {operation} {target}"

        if success:
            logger.info(f"{headline} - SUCCESS")
        else:
            logger.error(f"{headline} - ERROR: {error}")

    def safe_execute(self, operation_name, func, *args, **kwargs):
        """Call ``func(*args, **kwargs)``, log the outcome and return the result.

        The first positional argument, if any, is logged as the key. Every
        exception is logged with a category prefix and then re-raised.
        """
        logged_key = args[0] if args else None
        try:
            outcome = func(*args, **kwargs)
            self._log_operation(operation_name, logged_key, success=True)
            return outcome

        except Exception as e:
            # Classify from most to least specific category
            if isinstance(e, ConnectionError):
                category = "Connection error"
            elif isinstance(e, TimeoutError):
                category = "Timeout"
            elif isinstance(e, RedisError):
                category = "Redis error"
            else:
                category = "Unexpected error"
            self._log_operation(operation_name, logged_key, success=False, error=f"{category}: {e}")
            raise

    def set(self, key, value, **kwargs):
        """SET through ``safe_execute``."""
        return self.safe_execute('SET', self.client.set, key, value, **kwargs)

    def get(self, key):
        """GET through ``safe_execute``."""
        return self.safe_execute('GET', self.client.get, key)

    def delete(self, *keys):
        """DELETE through ``safe_execute``; only the first key is logged."""
        return self.safe_execute('DELETE', self.client.delete, *keys)

    def ping(self):
        """PING through ``safe_execute``."""
        return self.safe_execute('PING', self.client.ping)

    def get_stats(self):
        """Return totals plus the success rate formatted as a percentage string."""
        total = self.operation_count
        if total > 0:
            success_rate = (total - self.error_count) / total * 100
        else:
            success_rate = 0
        return {
            'total_operations': total,
            'errors': self.error_count,
            'success_rate': f"{success_rate:.2f}%"
        }

# Usage example
def test_logging_client():
    """Run a handful of logged commands and always report the statistics."""
    logged = LoggingRedisClient(host='localhost', port=6379, socket_timeout=1)

    try:
        # Commands expected to succeed against a running server
        logged.ping()
        logged.set('test_key', 'test_value')
        print(f"Retrieved: {logged.get('test_key')}")

        # This might cause an error if Redis is not running
        logged.set('another_key', 'another_value')
        logged.delete('test_key', 'another_key')

    except Exception as e:
        logger.error(f"Client test failed: {e}")

    finally:
        # Reported whether or not the commands above succeeded
        logger.info(f"Operation statistics: {logged.get_stats()}")

test_logging_client()

Install with Tessl CLI

npx tessl i tessl/pypi-redis

docs

async-support.md

cluster-support.md

connection-management.md

core-client.md

distributed-locking.md

error-handling.md

high-availability.md

index.md

pipelines-transactions.md

pubsub-messaging.md

tile.json