Python client for Redis database and key-value store
—
Redis Python client provides comprehensive exception classes for handling different types of errors that can occur during Redis operations. Proper error handling ensures robust applications that can gracefully handle connection issues, data errors, and Redis server states.
Redis exception classes organized by error type for precise error handling.
class RedisError(Exception):
"""Base exception class for all Redis errors"""
pass
# Connection-related exceptions
class ConnectionError(RedisError):
"""Connection to Redis server failed"""
pass
class TimeoutError(RedisError):
"""Operation timed out"""
pass
class BusyLoadingError(ConnectionError):
"""Redis server is busy loading data from disk"""
pass
class MaxConnectionsError(ConnectionError):
"""Maximum connections exceeded in connection pool"""
pass
class ChildDeadlockedError(Exception):
"""Child process deadlocked"""
pass
# Authentication and authorization exceptions
class AuthenticationError(ConnectionError):
"""Authentication with Redis server failed"""
pass
class AuthenticationWrongNumberOfArgsError(AuthenticationError):
"""Wrong number of arguments for AUTH command"""
pass
# Data and response exceptions
class ResponseError(RedisError):
"""Invalid response from Redis server"""
pass
class DataError(RedisError):
"""Invalid data type or value"""
pass
class InvalidResponse(RedisError):
"""Invalid response format from Redis"""
pass
class OutOfMemoryError(RedisError):
"""Redis server is out of memory"""
pass
class ReadOnlyError(RedisError):
"""Attempted write operation on read-only connection"""
pass
# Transaction exceptions
class WatchError(RedisError):
"""Watched key was modified during transaction"""
pass
class InvalidPipelineStack(RedisError):
"""Invalid pipeline stack state"""
pass
# Pub/Sub exceptions
class PubSubError(RedisError):
"""Publish/Subscribe operation error"""
pass
# Cluster exceptions
class RedisClusterException(RedisError):
"""Base exception for Redis Cluster errors"""
pass
class ClusterDownError(RedisClusterException, ConnectionError):
"""Redis Cluster is down or unreachable"""
pass
class ClusterError(RedisClusterException):
"""Generic Redis Cluster error"""
pass
class ClusterCrossSlotError(RedisClusterException):
"""Cross-slot operation in Redis Cluster"""
pass
class CrossSlotTransactionError(RedisClusterException):
"""Cross-slot transaction error"""
pass
class MasterNotFoundError(RedisClusterException):
"""No master found for given key"""
pass
class SlaveNotFoundError(RedisClusterException):
"""No slave found for given key"""
pass
class ClusterTransactionError(RedisClusterException):
"""Transaction error in cluster mode"""
pass
class AskError(RedisClusterException):
"""ASK redirection in Redis Cluster"""
def __init__(self, resp):
self.slot_id, self.node_addr = resp.split(' ')
self.slot_id = int(self.slot_id)
super(AskError, self).__init__(
f"ASK {self.slot_id} {self.node_addr}"
)
class MovedError(RedisClusterException):
"""MOVED redirection in Redis Cluster"""
def __init__(self, resp):
self.slot_id, self.node_addr = resp.split(' ')
self.slot_id = int(self.slot_id)
super(MovedError, self).__init__(
f"MOVED {self.slot_id} {self.node_addr}"
)
class TryAgainError(RedisClusterException):
"""TRYAGAIN error in Redis Cluster"""
passimport redis
from redis.exceptions import (
ConnectionError,
TimeoutError,
ResponseError,
AuthenticationError
)
def safe_redis_operation():
"""Basic error handling for Redis operations"""
try:
r = redis.Redis(
host='localhost',
port=6379,
socket_timeout=5,
socket_connect_timeout=10,
retry_on_timeout=True
)
# Test connection
r.ping()
# Perform operations
r.set('test_key', 'test_value')
value = r.get('test_key')
print(f"Success: {value}")
except ConnectionError as e:
print(f"Connection failed: {e}")
# Handle connection issues (retry, fallback, etc.)
except TimeoutError as e:
print(f"Operation timed out: {e}")
# Handle timeouts (retry with backoff, etc.)
except AuthenticationError as e:
print(f"Authentication failed: {e}")
# Handle auth issues (check credentials, etc.)
except ResponseError as e:
print(f"Redis responded with error: {e}")
# Handle Redis command errors
except Exception as e:
print(f"Unexpected error: {e}")
# Handle any other errors
safe_redis_operation()import redis
import time
import random
from redis.exceptions import ConnectionError, TimeoutError
class RedisRetryClient:
def __init__(self, max_retries=3, backoff_factor=1.0, **redis_kwargs):
self.max_retries = max_retries
self.backoff_factor = backoff_factor
self.redis_kwargs = redis_kwargs
self.client = None
def _get_client(self):
"""Get Redis client with connection retry"""
if self.client is None:
self.client = redis.Redis(**self.redis_kwargs)
return self.client
def _retry_operation(self, operation, *args, **kwargs):
"""Execute operation with retry logic"""
last_error = None
for attempt in range(self.max_retries + 1):
try:
client = self._get_client()
return operation(client, *args, **kwargs)
except (ConnectionError, TimeoutError) as e:
last_error = e
self.client = None # Reset client for next attempt
if attempt < self.max_retries:
# Exponential backoff with jitter
delay = self.backoff_factor * (2 ** attempt)
jitter = random.uniform(0, 0.1) * delay
total_delay = delay + jitter
print(f"Attempt {attempt + 1} failed: {e}")
print(f"Retrying in {total_delay:.2f} seconds...")
time.sleep(total_delay)
else:
print(f"All {self.max_retries + 1} attempts failed")
raise last_error
def set(self, key, value, **kwargs):
"""Set with retry"""
return self._retry_operation(lambda client, k, v, **kw: client.set(k, v, **kw), key, value, **kwargs)
def get(self, key):
"""Get with retry"""
return self._retry_operation(lambda client, k: client.get(k), key)
def ping(self):
"""Ping with retry"""
return self._retry_operation(lambda client: client.ping())
# Usage example
retry_client = RedisRetryClient(
max_retries=3,
backoff_factor=0.5,
host='localhost',
port=6379,
socket_timeout=2,
socket_connect_timeout=5
)
try:
retry_client.ping()
retry_client.set('retry_test', 'success')
value = retry_client.get('retry_test')
print(f"Retrieved: {value}")
except Exception as e:
print(f"Operation failed after retries: {e}")import redis
from redis.exceptions import WatchError, ResponseError
def safe_transaction_increment(key, increment_by=1):
"""Safely increment a counter with transaction error handling"""
r = redis.Redis(host='localhost', port=6379, db=0)
max_attempts = 10
for attempt in range(max_attempts):
try:
pipe = r.pipeline()
pipe.watch(key)
# Get current value
current_value = r.get(key)
current_value = int(current_value) if current_value else 0
# Start transaction
pipe.multi()
pipe.set(key, current_value + increment_by)
# Execute transaction
result = pipe.execute()
new_value = current_value + increment_by
print(f"Successfully incremented {key} to {new_value}")
return new_value
except WatchError:
print(f"Transaction attempt {attempt + 1}: Key '{key}' was modified, retrying...")
if attempt == max_attempts - 1:
raise Exception(f"Failed to increment after {max_attempts} attempts")
time.sleep(0.01 * (2 ** attempt)) # Exponential backoff
except ResponseError as e:
print(f"Redis command error: {e}")
raise
except Exception as e:
print(f"Unexpected error during transaction: {e}")
raise
# Initialize counter
r = redis.Redis(host='localhost', port=6379, db=0)
r.set('safe_counter', 0)
# Test concurrent increments
import threading
def worker(worker_id):
try:
for i in range(5):
safe_transaction_increment('safe_counter')
print(f"Worker {worker_id}: Increment {i+1} completed")
except Exception as e:
print(f"Worker {worker_id} failed: {e}")
# Run multiple workers
threads = []
for i in range(3):
t = threading.Thread(target=worker, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
final_value = r.get('safe_counter')
print(f"Final counter value: {final_value}")import redis
from redis.cluster import RedisCluster, ClusterNode
from redis.exceptions import (
RedisClusterException,
ClusterDownError,
MovedError,
AskError,
ClusterCrossSlotError
)
def safe_cluster_operations():
"""Handle Redis Cluster specific errors"""
startup_nodes = [
ClusterNode("localhost", 7000),
ClusterNode("localhost", 7001),
ClusterNode("localhost", 7002)
]
try:
cluster = RedisCluster(
startup_nodes=startup_nodes,
decode_responses=True,
skip_full_coverage_check=True,
cluster_error_retry_attempts=3
)
# Basic operations
cluster.set("user:1001", "John")
user = cluster.get("user:1001")
print(f"User: {user}")
# Multi-key operation (potential cross-slot error)
try:
keys = ["user:1001", "user:1002", "user:1003"]
values = cluster.mget(keys)
print(f"Multi-get success: {values}")
except ClusterCrossSlotError as e:
print(f"Cross-slot operation error: {e}")
# Handle by getting keys individually
values = []
for key in keys:
try:
value = cluster.get(key)
values.append(value)
except Exception as key_error:
print(f"Error getting {key}: {key_error}")
values.append(None)
print(f"Individual gets: {values}")
except ClusterDownError as e:
print(f"Cluster is down: {e}")
# Implement fallback or circuit breaker
except MovedError as e:
print(f"Slot moved: {e}")
print(f"New location: slot {e.slot_id} at {e.node_addr}")
# Client should automatically handle this
except AskError as e:
print(f"ASK redirection: {e}")
print(f"Temporary redirection: slot {e.slot_id} at {e.node_addr}")
# Client should automatically handle this
except RedisClusterException as e:
print(f"Cluster error: {e}")
except Exception as e:
print(f"Unexpected error: {e}")
safe_cluster_operations()import redis
import time
import threading
from redis.exceptions import PubSubError, ConnectionError
class RobustSubscriber:
def __init__(self, channels, **redis_kwargs):
self.channels = channels
self.redis_kwargs = redis_kwargs
self.running = False
self.reconnect_attempts = 0
self.max_reconnect_attempts = 5
def start(self):
"""Start subscriber with automatic reconnection"""
self.running = True
while self.running and self.reconnect_attempts < self.max_reconnect_attempts:
try:
self._subscribe_loop()
break # Normal exit
except (ConnectionError, PubSubError) as e:
self.reconnect_attempts += 1
print(f"Subscription error (attempt {self.reconnect_attempts}): {e}")
if self.reconnect_attempts < self.max_reconnect_attempts:
wait_time = min(2 ** self.reconnect_attempts, 30)
print(f"Reconnecting in {wait_time} seconds...")
time.sleep(wait_time)
else:
print("Max reconnection attempts reached")
raise
except Exception as e:
print(f"Unexpected subscriber error: {e}")
break
def _subscribe_loop(self):
"""Main subscription loop"""
try:
r = redis.Redis(**self.redis_kwargs)
pubsub = r.pubsub()
# Subscribe to channels
pubsub.subscribe(*self.channels)
print(f"Subscribed to channels: {self.channels}")
# Reset reconnect counter on successful connection
self.reconnect_attempts = 0
# Listen for messages
for message in pubsub.listen():
if not self.running:
break
try:
self._handle_message(message)
except Exception as e:
print(f"Error handling message: {e}")
# Continue listening despite handler errors
except Exception as e:
print(f"Subscription loop error: {e}")
raise
finally:
try:
pubsub.close()
except:
pass
def _handle_message(self, message):
"""Handle received message"""
if message['type'] == 'message':
channel = message['channel']
data = message['data']
print(f"Received on {channel}: {data}")
elif message['type'] == 'subscribe':
print(f"Confirmed subscription to: {message['channel']}")
def stop(self):
"""Stop the subscriber"""
self.running = False
# Usage example
def test_robust_subscriber():
subscriber = RobustSubscriber(
channels=['events', 'notifications'],
host='localhost',
port=6379,
socket_timeout=5,
retry_on_timeout=True
)
# Start subscriber in thread
subscriber_thread = threading.Thread(target=subscriber.start)
subscriber_thread.daemon = True
subscriber_thread.start()
# Simulate some publishing
time.sleep(1)
try:
publisher = redis.Redis(host='localhost', port=6379)
for i in range(10):
publisher.publish('events', f'Event {i}')
time.sleep(0.5)
except Exception as e:
print(f"Publishing error: {e}")
# Stop subscriber
subscriber.stop()
subscriber_thread.join(timeout=5)
test_robust_subscriber()import redis
from redis.exceptions import DataError, ResponseError
import json
class ValidatedRedisClient:
def __init__(self, **redis_kwargs):
self.client = redis.Redis(**redis_kwargs)
def set_json(self, key, data, **kwargs):
"""Set JSON data with validation"""
try:
# Validate data is JSON serializable
json_data = json.dumps(data)
return self.client.set(key, json_data, **kwargs)
except (TypeError, ValueError) as e:
raise DataError(f"Invalid JSON data: {e}")
except ResponseError as e:
raise ResponseError(f"Redis error setting JSON: {e}")
def get_json(self, key):
"""Get JSON data with validation"""
try:
raw_data = self.client.get(key)
if raw_data is None:
return None
return json.loads(raw_data)
except json.JSONDecodeError as e:
raise DataError(f"Invalid JSON in Redis key '{key}': {e}")
except ResponseError as e:
raise ResponseError(f"Redis error getting JSON: {e}")
def safe_incr(self, key, amount=1):
"""Increment with type validation"""
try:
return self.client.incr(key, amount)
except ResponseError as e:
if "not an integer" in str(e).lower():
raise DataError(f"Key '{key}' contains non-integer value")
raise
def safe_lpush(self, key, *values):
"""List push with type validation"""
try:
# Filter out None values
valid_values = [v for v in values if v is not None]
if not valid_values:
raise DataError("No valid values provided for list push")
return self.client.lpush(key, *valid_values)
except ResponseError as e:
if "wrong kind" in str(e).lower():
raise DataError(f"Key '{key}' is not a list")
raise
# Usage example
def test_validated_client():
client = ValidatedRedisClient(host='localhost', port=6379, decode_responses=True)
# Test JSON operations
try:
test_data = {'name': 'John', 'age': 30, 'active': True}
client.set_json('user:1001', test_data)
retrieved_data = client.get_json('user:1001')
print(f"JSON data: {retrieved_data}")
# Test invalid JSON
client.client.set('invalid_json', 'not json data')
invalid_data = client.get_json('invalid_json')
except DataError as e:
print(f"Data validation error: {e}")
# Test safe increment
try:
client.client.set('counter', 10)
result = client.safe_incr('counter', 5)
print(f"Incremented to: {result}")
# Try to increment non-integer
client.client.set('text_key', 'hello')
client.safe_incr('text_key')
except DataError as e:
print(f"Increment validation error: {e}")
# Test safe list operations
try:
client.safe_lpush('my_list', 'item1', 'item2', None, 'item3')
# Try to push to non-list
client.client.set('string_key', 'value')
client.safe_lpush('string_key', 'item')
except DataError as e:
print(f"List operation error: {e}")
test_validated_client()import redis
import logging
from redis.exceptions import RedisError, ConnectionError, TimeoutError
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('redis_client')
class LoggingRedisClient:
def __init__(self, **redis_kwargs):
self.client = redis.Redis(**redis_kwargs)
self.operation_count = 0
self.error_count = 0
def _log_operation(self, operation, key=None, success=True, error=None):
"""Log Redis operations and errors"""
self.operation_count += 1
if success:
logger.info(f"Operation {self.operation_count}: {operation} {'on ' + str(key) if key else ''} - SUCCESS")
else:
self.error_count += 1
logger.error(f"Operation {self.operation_count}: {operation} {'on ' + str(key) if key else ''} - ERROR: {error}")
def safe_execute(self, operation_name, func, *args, **kwargs):
"""Execute Redis operation with comprehensive error logging"""
try:
result = func(*args, **kwargs)
self._log_operation(operation_name, args[0] if args else None, success=True)
return result
except ConnectionError as e:
self._log_operation(operation_name, args[0] if args else None, success=False, error=f"Connection error: {e}")
raise
except TimeoutError as e:
self._log_operation(operation_name, args[0] if args else None, success=False, error=f"Timeout: {e}")
raise
except RedisError as e:
self._log_operation(operation_name, args[0] if args else None, success=False, error=f"Redis error: {e}")
raise
except Exception as e:
self._log_operation(operation_name, args[0] if args else None, success=False, error=f"Unexpected error: {e}")
raise
def set(self, key, value, **kwargs):
return self.safe_execute('SET', self.client.set, key, value, **kwargs)
def get(self, key):
return self.safe_execute('GET', self.client.get, key)
def delete(self, *keys):
return self.safe_execute('DELETE', self.client.delete, *keys)
def ping(self):
return self.safe_execute('PING', self.client.ping)
def get_stats(self):
"""Get operation statistics"""
success_rate = ((self.operation_count - self.error_count) / self.operation_count * 100) if self.operation_count > 0 else 0
return {
'total_operations': self.operation_count,
'errors': self.error_count,
'success_rate': f"{success_rate:.2f}%"
}
# Usage example
def test_logging_client():
client = LoggingRedisClient(host='localhost', port=6379, socket_timeout=1)
try:
# Successful operations
client.ping()
client.set('test_key', 'test_value')
value = client.get('test_key')
print(f"Retrieved: {value}")
# This might cause an error if Redis is not running
client.set('another_key', 'another_value')
client.delete('test_key', 'another_key')
except Exception as e:
logger.error(f"Client test failed: {e}")
finally:
stats = client.get_stats()
logger.info(f"Operation statistics: {stats}")
test_logging_client()Install with Tessl CLI
npx tessl i tessl/pypi-redis