0
# Error Handling
1
2
Redis Python client provides comprehensive exception classes for handling different types of errors that can occur during Redis operations. Proper error handling ensures robust applications that can gracefully handle connection issues, data errors, and Redis server states.
3
4
## Capabilities
5
6
### Exception Hierarchy
7
8
Redis exception classes organized by error type for precise error handling.
9
10
```python { .api }
11
class RedisError(Exception):
12
"""Base exception class for all Redis errors"""
13
pass
14
15
# Connection-related exceptions
16
class ConnectionError(RedisError):
17
"""Connection to Redis server failed"""
18
pass
19
20
class TimeoutError(RedisError):
21
"""Operation timed out"""
22
pass
23
24
class BusyLoadingError(ConnectionError):
25
"""Redis server is busy loading data from disk"""
26
pass
27
28
class MaxConnectionsError(ConnectionError):
29
"""Maximum connections exceeded in connection pool"""
30
pass
31
32
class ChildDeadlockedError(Exception):
33
"""Child process deadlocked"""
34
pass
35
36
# Authentication and authorization exceptions
37
class AuthenticationError(ConnectionError):
38
"""Authentication with Redis server failed"""
39
pass
40
41
class AuthenticationWrongNumberOfArgsError(AuthenticationError):
42
"""Wrong number of arguments for AUTH command"""
43
pass
44
45
# Data and response exceptions
46
class ResponseError(RedisError):
47
"""Invalid response from Redis server"""
48
pass
49
50
class DataError(RedisError):
51
"""Invalid data type or value"""
52
pass
53
54
class InvalidResponse(RedisError):
55
"""Invalid response format from Redis"""
56
pass
57
58
class OutOfMemoryError(RedisError):
59
"""Redis server is out of memory"""
60
pass
61
62
class ReadOnlyError(RedisError):
63
"""Attempted write operation on read-only connection"""
64
pass
65
66
# Transaction exceptions
67
class WatchError(RedisError):
68
"""Watched key was modified during transaction"""
69
pass
70
71
class InvalidPipelineStack(RedisError):
72
"""Invalid pipeline stack state"""
73
pass
74
75
# Pub/Sub exceptions
76
class PubSubError(RedisError):
77
"""Publish/Subscribe operation error"""
78
pass
79
80
# Cluster exceptions
81
class RedisClusterException(RedisError):
82
"""Base exception for Redis Cluster errors"""
83
pass
84
85
class ClusterDownError(RedisClusterException, ConnectionError):
86
"""Redis Cluster is down or unreachable"""
87
pass
88
89
class ClusterError(RedisClusterException):
90
"""Generic Redis Cluster error"""
91
pass
92
93
class ClusterCrossSlotError(RedisClusterException):
94
"""Cross-slot operation in Redis Cluster"""
95
pass
96
97
class CrossSlotTransactionError(RedisClusterException):
98
"""Cross-slot transaction error"""
99
pass
100
101
class MasterNotFoundError(RedisClusterException):
102
"""No master found for given key"""
103
pass
104
105
class SlaveNotFoundError(RedisClusterException):
106
"""No slave found for given key"""
107
pass
108
109
class ClusterTransactionError(RedisClusterException):
110
"""Transaction error in cluster mode"""
111
pass
112
113
class AskError(RedisClusterException):
114
"""ASK redirection in Redis Cluster"""
115
def __init__(self, resp):
116
self.slot_id, self.node_addr = resp.split(' ')
117
self.slot_id = int(self.slot_id)
118
super(AskError, self).__init__(
119
f"ASK {self.slot_id} {self.node_addr}"
120
)
121
122
class MovedError(RedisClusterException):
123
"""MOVED redirection in Redis Cluster"""
124
def __init__(self, resp):
125
self.slot_id, self.node_addr = resp.split(' ')
126
self.slot_id = int(self.slot_id)
127
super(MovedError, self).__init__(
128
f"MOVED {self.slot_id} {self.node_addr}"
129
)
130
131
class TryAgainError(RedisClusterException):
132
"""TRYAGAIN error in Redis Cluster"""
133
pass
134
```
135
136
## Usage Examples
137
138
### Basic Error Handling
139
140
```python
141
import redis
142
from redis.exceptions import (
143
ConnectionError,
144
TimeoutError,
145
ResponseError,
146
AuthenticationError
147
)
148
149
def safe_redis_operation():
150
"""Basic error handling for Redis operations"""
151
try:
152
r = redis.Redis(
153
host='localhost',
154
port=6379,
155
socket_timeout=5,
156
socket_connect_timeout=10,
157
retry_on_timeout=True
158
)
159
160
# Test connection
161
r.ping()
162
163
# Perform operations
164
r.set('test_key', 'test_value')
165
value = r.get('test_key')
166
print(f"Success: {value}")
167
168
except ConnectionError as e:
169
print(f"Connection failed: {e}")
170
# Handle connection issues (retry, fallback, etc.)
171
172
except TimeoutError as e:
173
print(f"Operation timed out: {e}")
174
# Handle timeouts (retry with backoff, etc.)
175
176
except AuthenticationError as e:
177
print(f"Authentication failed: {e}")
178
# Handle auth issues (check credentials, etc.)
179
180
except ResponseError as e:
181
print(f"Redis responded with error: {e}")
182
# Handle Redis command errors
183
184
except Exception as e:
185
print(f"Unexpected error: {e}")
186
# Handle any other errors
187
188
safe_redis_operation()
189
```
190
191
### Connection Retry Logic
192
193
```python
194
import redis
195
import time
196
import random
197
from redis.exceptions import ConnectionError, TimeoutError
198
199
class RedisRetryClient:
200
def __init__(self, max_retries=3, backoff_factor=1.0, **redis_kwargs):
201
self.max_retries = max_retries
202
self.backoff_factor = backoff_factor
203
self.redis_kwargs = redis_kwargs
204
self.client = None
205
206
def _get_client(self):
207
"""Get Redis client with connection retry"""
208
if self.client is None:
209
self.client = redis.Redis(**self.redis_kwargs)
210
return self.client
211
212
def _retry_operation(self, operation, *args, **kwargs):
213
"""Execute operation with retry logic"""
214
last_error = None
215
216
for attempt in range(self.max_retries + 1):
217
try:
218
client = self._get_client()
219
return operation(client, *args, **kwargs)
220
221
except (ConnectionError, TimeoutError) as e:
222
last_error = e
223
self.client = None # Reset client for next attempt
224
225
if attempt < self.max_retries:
226
# Exponential backoff with jitter
227
delay = self.backoff_factor * (2 ** attempt)
228
jitter = random.uniform(0, 0.1) * delay
229
total_delay = delay + jitter
230
231
print(f"Attempt {attempt + 1} failed: {e}")
232
print(f"Retrying in {total_delay:.2f} seconds...")
233
time.sleep(total_delay)
234
else:
235
print(f"All {self.max_retries + 1} attempts failed")
236
237
raise last_error
238
239
def set(self, key, value, **kwargs):
240
"""Set with retry"""
241
return self._retry_operation(lambda client, k, v, **kw: client.set(k, v, **kw), key, value, **kwargs)
242
243
def get(self, key):
244
"""Get with retry"""
245
return self._retry_operation(lambda client, k: client.get(k), key)
246
247
def ping(self):
248
"""Ping with retry"""
249
return self._retry_operation(lambda client: client.ping())
250
251
# Usage example
252
retry_client = RedisRetryClient(
253
max_retries=3,
254
backoff_factor=0.5,
255
host='localhost',
256
port=6379,
257
socket_timeout=2,
258
socket_connect_timeout=5
259
)
260
261
try:
262
retry_client.ping()
263
retry_client.set('retry_test', 'success')
264
value = retry_client.get('retry_test')
265
print(f"Retrieved: {value}")
266
except Exception as e:
267
print(f"Operation failed after retries: {e}")
268
```
269
270
### Transaction Error Handling
271
272
```python
273
import redis
274
from redis.exceptions import WatchError, ResponseError
275
276
def safe_transaction_increment(key, increment_by=1):
277
"""Safely increment a counter with transaction error handling"""
278
r = redis.Redis(host='localhost', port=6379, db=0)
279
280
max_attempts = 10
281
282
for attempt in range(max_attempts):
283
try:
284
pipe = r.pipeline()
285
pipe.watch(key)
286
287
# Get current value
288
current_value = r.get(key)
289
current_value = int(current_value) if current_value else 0
290
291
# Start transaction
292
pipe.multi()
293
pipe.set(key, current_value + increment_by)
294
295
# Execute transaction
296
result = pipe.execute()
297
298
new_value = current_value + increment_by
299
print(f"Successfully incremented {key} to {new_value}")
300
return new_value
301
302
except WatchError:
303
print(f"Transaction attempt {attempt + 1}: Key '{key}' was modified, retrying...")
304
if attempt == max_attempts - 1:
305
raise Exception(f"Failed to increment after {max_attempts} attempts")
306
time.sleep(0.01 * (2 ** attempt)) # Exponential backoff
307
308
except ResponseError as e:
309
print(f"Redis command error: {e}")
310
raise
311
312
except Exception as e:
313
print(f"Unexpected error during transaction: {e}")
314
raise
315
316
# Initialize counter
317
r = redis.Redis(host='localhost', port=6379, db=0)
318
r.set('safe_counter', 0)
319
320
# Test concurrent increments
321
import threading
322
323
def worker(worker_id):
324
try:
325
for i in range(5):
326
safe_transaction_increment('safe_counter')
327
print(f"Worker {worker_id}: Increment {i+1} completed")
328
except Exception as e:
329
print(f"Worker {worker_id} failed: {e}")
330
331
# Run multiple workers
332
threads = []
333
for i in range(3):
334
t = threading.Thread(target=worker, args=(i,))
335
threads.append(t)
336
t.start()
337
338
for t in threads:
339
t.join()
340
341
final_value = r.get('safe_counter')
342
print(f"Final counter value: {final_value}")
343
```
344
345
### Cluster Error Handling
346
347
```python
348
import redis
349
from redis.cluster import RedisCluster, ClusterNode
350
from redis.exceptions import (
351
RedisClusterException,
352
ClusterDownError,
353
MovedError,
354
AskError,
355
ClusterCrossSlotError
356
)
357
358
def safe_cluster_operations():
359
"""Handle Redis Cluster specific errors"""
360
startup_nodes = [
361
ClusterNode("localhost", 7000),
362
ClusterNode("localhost", 7001),
363
ClusterNode("localhost", 7002)
364
]
365
366
try:
367
cluster = RedisCluster(
368
startup_nodes=startup_nodes,
369
decode_responses=True,
370
skip_full_coverage_check=True,
371
cluster_error_retry_attempts=3
372
)
373
374
# Basic operations
375
cluster.set("user:1001", "John")
376
user = cluster.get("user:1001")
377
print(f"User: {user}")
378
379
# Multi-key operation (potential cross-slot error)
380
try:
381
keys = ["user:1001", "user:1002", "user:1003"]
382
values = cluster.mget(keys)
383
print(f"Multi-get success: {values}")
384
except ClusterCrossSlotError as e:
385
print(f"Cross-slot operation error: {e}")
386
# Handle by getting keys individually
387
values = []
388
for key in keys:
389
try:
390
value = cluster.get(key)
391
values.append(value)
392
except Exception as key_error:
393
print(f"Error getting {key}: {key_error}")
394
values.append(None)
395
print(f"Individual gets: {values}")
396
397
except ClusterDownError as e:
398
print(f"Cluster is down: {e}")
399
# Implement fallback or circuit breaker
400
401
except MovedError as e:
402
print(f"Slot moved: {e}")
403
print(f"New location: slot {e.slot_id} at {e.node_addr}")
404
# Client should automatically handle this
405
406
except AskError as e:
407
print(f"ASK redirection: {e}")
408
print(f"Temporary redirection: slot {e.slot_id} at {e.node_addr}")
409
# Client should automatically handle this
410
411
except RedisClusterException as e:
412
print(f"Cluster error: {e}")
413
414
except Exception as e:
415
print(f"Unexpected error: {e}")
416
417
safe_cluster_operations()
418
```
419
420
### Pub/Sub Error Handling
421
422
```python
423
import redis
424
import time
425
import threading
426
from redis.exceptions import PubSubError, ConnectionError
427
428
class RobustSubscriber:
429
def __init__(self, channels, **redis_kwargs):
430
self.channels = channels
431
self.redis_kwargs = redis_kwargs
432
self.running = False
433
self.reconnect_attempts = 0
434
self.max_reconnect_attempts = 5
435
436
def start(self):
437
"""Start subscriber with automatic reconnection"""
438
self.running = True
439
440
while self.running and self.reconnect_attempts < self.max_reconnect_attempts:
441
try:
442
self._subscribe_loop()
443
break # Normal exit
444
445
except (ConnectionError, PubSubError) as e:
446
self.reconnect_attempts += 1
447
print(f"Subscription error (attempt {self.reconnect_attempts}): {e}")
448
449
if self.reconnect_attempts < self.max_reconnect_attempts:
450
wait_time = min(2 ** self.reconnect_attempts, 30)
451
print(f"Reconnecting in {wait_time} seconds...")
452
time.sleep(wait_time)
453
else:
454
print("Max reconnection attempts reached")
455
raise
456
457
except Exception as e:
458
print(f"Unexpected subscriber error: {e}")
459
break
460
461
def _subscribe_loop(self):
462
"""Main subscription loop"""
463
try:
464
r = redis.Redis(**self.redis_kwargs)
465
pubsub = r.pubsub()
466
467
# Subscribe to channels
468
pubsub.subscribe(*self.channels)
469
print(f"Subscribed to channels: {self.channels}")
470
471
# Reset reconnect counter on successful connection
472
self.reconnect_attempts = 0
473
474
# Listen for messages
475
for message in pubsub.listen():
476
if not self.running:
477
break
478
479
try:
480
self._handle_message(message)
481
except Exception as e:
482
print(f"Error handling message: {e}")
483
# Continue listening despite handler errors
484
485
except Exception as e:
486
print(f"Subscription loop error: {e}")
487
raise
488
finally:
489
try:
490
pubsub.close()
491
except:
492
pass
493
494
def _handle_message(self, message):
495
"""Handle received message"""
496
if message['type'] == 'message':
497
channel = message['channel']
498
data = message['data']
499
print(f"Received on {channel}: {data}")
500
elif message['type'] == 'subscribe':
501
print(f"Confirmed subscription to: {message['channel']}")
502
503
def stop(self):
504
"""Stop the subscriber"""
505
self.running = False
506
507
# Usage example
508
def test_robust_subscriber():
509
subscriber = RobustSubscriber(
510
channels=['events', 'notifications'],
511
host='localhost',
512
port=6379,
513
socket_timeout=5,
514
retry_on_timeout=True
515
)
516
517
# Start subscriber in thread
518
subscriber_thread = threading.Thread(target=subscriber.start)
519
subscriber_thread.daemon = True
520
subscriber_thread.start()
521
522
# Simulate some publishing
523
time.sleep(1)
524
try:
525
publisher = redis.Redis(host='localhost', port=6379)
526
for i in range(10):
527
publisher.publish('events', f'Event {i}')
528
time.sleep(0.5)
529
except Exception as e:
530
print(f"Publishing error: {e}")
531
532
# Stop subscriber
533
subscriber.stop()
534
subscriber_thread.join(timeout=5)
535
536
test_robust_subscriber()
537
```
538
539
### Data Validation and Error Prevention
540
541
```python
542
import redis
543
from redis.exceptions import DataError, ResponseError
544
import json
545
546
class ValidatedRedisClient:
547
def __init__(self, **redis_kwargs):
548
self.client = redis.Redis(**redis_kwargs)
549
550
def set_json(self, key, data, **kwargs):
551
"""Set JSON data with validation"""
552
try:
553
# Validate data is JSON serializable
554
json_data = json.dumps(data)
555
return self.client.set(key, json_data, **kwargs)
556
except (TypeError, ValueError) as e:
557
raise DataError(f"Invalid JSON data: {e}")
558
except ResponseError as e:
559
raise ResponseError(f"Redis error setting JSON: {e}")
560
561
def get_json(self, key):
562
"""Get JSON data with validation"""
563
try:
564
raw_data = self.client.get(key)
565
if raw_data is None:
566
return None
567
return json.loads(raw_data)
568
except json.JSONDecodeError as e:
569
raise DataError(f"Invalid JSON in Redis key '{key}': {e}")
570
except ResponseError as e:
571
raise ResponseError(f"Redis error getting JSON: {e}")
572
573
def safe_incr(self, key, amount=1):
574
"""Increment with type validation"""
575
try:
576
return self.client.incr(key, amount)
577
except ResponseError as e:
578
if "not an integer" in str(e).lower():
579
raise DataError(f"Key '{key}' contains non-integer value")
580
raise
581
582
def safe_lpush(self, key, *values):
583
"""List push with type validation"""
584
try:
585
# Filter out None values
586
valid_values = [v for v in values if v is not None]
587
if not valid_values:
588
raise DataError("No valid values provided for list push")
589
return self.client.lpush(key, *valid_values)
590
except ResponseError as e:
591
if "wrong kind" in str(e).lower():
592
raise DataError(f"Key '{key}' is not a list")
593
raise
594
595
# Usage example
596
def test_validated_client():
597
client = ValidatedRedisClient(host='localhost', port=6379, decode_responses=True)
598
599
# Test JSON operations
600
try:
601
test_data = {'name': 'John', 'age': 30, 'active': True}
602
client.set_json('user:1001', test_data)
603
604
retrieved_data = client.get_json('user:1001')
605
print(f"JSON data: {retrieved_data}")
606
607
# Test invalid JSON
608
client.client.set('invalid_json', 'not json data')
609
invalid_data = client.get_json('invalid_json')
610
611
except DataError as e:
612
print(f"Data validation error: {e}")
613
614
# Test safe increment
615
try:
616
client.client.set('counter', 10)
617
result = client.safe_incr('counter', 5)
618
print(f"Incremented to: {result}")
619
620
# Try to increment non-integer
621
client.client.set('text_key', 'hello')
622
client.safe_incr('text_key')
623
624
except DataError as e:
625
print(f"Increment validation error: {e}")
626
627
# Test safe list operations
628
try:
629
client.safe_lpush('my_list', 'item1', 'item2', None, 'item3')
630
631
# Try to push to non-list
632
client.client.set('string_key', 'value')
633
client.safe_lpush('string_key', 'item')
634
635
except DataError as e:
636
print(f"List operation error: {e}")
637
638
test_validated_client()
639
```
640
641
### Comprehensive Error Logging
642
643
```python
644
import redis
645
import logging
646
from redis.exceptions import RedisError, ConnectionError, TimeoutError
647
648
# Configure logging
649
logging.basicConfig(
650
level=logging.INFO,
651
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
652
)
653
logger = logging.getLogger('redis_client')
654
655
class LoggingRedisClient:
656
def __init__(self, **redis_kwargs):
657
self.client = redis.Redis(**redis_kwargs)
658
self.operation_count = 0
659
self.error_count = 0
660
661
def _log_operation(self, operation, key=None, success=True, error=None):
662
"""Log Redis operations and errors"""
663
self.operation_count += 1
664
665
if success:
666
logger.info(f"Operation {self.operation_count}: {operation} {'on ' + str(key) if key else ''} - SUCCESS")
667
else:
668
self.error_count += 1
669
logger.error(f"Operation {self.operation_count}: {operation} {'on ' + str(key) if key else ''} - ERROR: {error}")
670
671
def safe_execute(self, operation_name, func, *args, **kwargs):
672
"""Execute Redis operation with comprehensive error logging"""
673
try:
674
result = func(*args, **kwargs)
675
self._log_operation(operation_name, args[0] if args else None, success=True)
676
return result
677
678
except ConnectionError as e:
679
self._log_operation(operation_name, args[0] if args else None, success=False, error=f"Connection error: {e}")
680
raise
681
682
except TimeoutError as e:
683
self._log_operation(operation_name, args[0] if args else None, success=False, error=f"Timeout: {e}")
684
raise
685
686
except RedisError as e:
687
self._log_operation(operation_name, args[0] if args else None, success=False, error=f"Redis error: {e}")
688
raise
689
690
except Exception as e:
691
self._log_operation(operation_name, args[0] if args else None, success=False, error=f"Unexpected error: {e}")
692
raise
693
694
def set(self, key, value, **kwargs):
695
return self.safe_execute('SET', self.client.set, key, value, **kwargs)
696
697
def get(self, key):
698
return self.safe_execute('GET', self.client.get, key)
699
700
def delete(self, *keys):
701
return self.safe_execute('DELETE', self.client.delete, *keys)
702
703
def ping(self):
704
return self.safe_execute('PING', self.client.ping)
705
706
def get_stats(self):
707
"""Get operation statistics"""
708
success_rate = ((self.operation_count - self.error_count) / self.operation_count * 100) if self.operation_count > 0 else 0
709
return {
710
'total_operations': self.operation_count,
711
'errors': self.error_count,
712
'success_rate': f"{success_rate:.2f}%"
713
}
714
715
# Usage example
716
def test_logging_client():
717
client = LoggingRedisClient(host='localhost', port=6379, socket_timeout=1)
718
719
try:
720
# Successful operations
721
client.ping()
722
client.set('test_key', 'test_value')
723
value = client.get('test_key')
724
print(f"Retrieved: {value}")
725
726
# This might cause an error if Redis is not running
727
client.set('another_key', 'another_value')
728
client.delete('test_key', 'another_key')
729
730
except Exception as e:
731
logger.error(f"Client test failed: {e}")
732
733
finally:
734
stats = client.get_stats()
735
logger.info(f"Operation statistics: {stats}")
736
737
test_logging_client()
738
```