0
# Connection Management
1
2
Connection and pooling functionality for managing Redis connections efficiently. This includes URL-based configuration, SSL connections, Unix domain sockets, connection pooling strategies, and connection lifecycle management for optimal performance and reliability.
3
4
## Capabilities
5
6
### Redis Client Creation
7
8
Multiple ways to create Redis client instances with flexible configuration options.
9
10
```python { .api }
11
class Redis:
12
"""Main Redis client class with async/await support."""
13
14
def __init__(
15
self,
16
*,
17
host: str = "localhost",
18
port: int = 6379,
19
db: Union[str, int] = 0,
20
password: Optional[str] = None,
21
socket_timeout: Optional[float] = None,
22
socket_connect_timeout: Optional[float] = None,
23
socket_keepalive: Optional[bool] = None,
24
socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None,
25
connection_pool: Optional[ConnectionPool] = None,
26
unix_socket_path: Optional[str] = None,
27
encoding: str = "utf-8",
28
encoding_errors: str = "strict",
29
decode_responses: bool = False,
30
retry_on_timeout: bool = False,
31
ssl: bool = False,
32
ssl_keyfile: Optional[str] = None,
33
ssl_certfile: Optional[str] = None,
34
ssl_cert_reqs: str = "required",
35
ssl_ca_certs: Optional[str] = None,
36
ssl_check_hostname: bool = False,
37
max_connections: Optional[int] = None,
38
single_connection_client: bool = False,
39
health_check_interval: int = 0,
40
client_name: Optional[str] = None,
41
username: Optional[str] = None,
42
):
43
"""
44
Initialize Redis client.
45
46
Args:
47
host: Redis server hostname
48
port: Redis server port
49
db: Database number (0-15 with the server's default of 16 databases)
50
password: Authentication password
51
socket_timeout: Socket operation timeout in seconds
52
socket_connect_timeout: Socket connection timeout in seconds
53
socket_keepalive: Enable TCP keepalive (None uses system default)
54
socket_keepalive_options: TCP keepalive options
55
connection_pool: Custom connection pool
56
unix_socket_path: Unix domain socket path
57
encoding: String encoding for responses
58
encoding_errors: Encoding error handling
59
decode_responses: Decode byte responses to strings
60
retry_on_timeout: Retry commands on timeout
61
ssl: Enable SSL/TLS encryption
62
ssl_keyfile: SSL private key file path
63
ssl_certfile: SSL certificate file path
64
ssl_cert_reqs: SSL certificate requirements
65
ssl_ca_certs: SSL CA certificates file path
66
ssl_check_hostname: Verify SSL hostname
67
max_connections: Maximum pool connections
68
single_connection_client: Use single persistent connection
69
health_check_interval: Health check interval in seconds
70
client_name: Client identification name
71
username: Authentication username (Redis 6+)
72
"""
73
74
@classmethod
75
def from_url(cls, url: str, **kwargs) -> "Redis":
76
"""
77
Create Redis client from connection URL.
78
79
Args:
80
url: Redis connection URL (redis://[[username]:password@]host[:port][/database])
81
kwargs: Additional connection parameters
82
83
Returns:
84
Configured Redis client instance
85
"""
86
87
async def ping(self) -> bool:
88
"""
89
Test connection to Redis server.
90
91
Returns:
92
True if server responds to ping
93
"""
94
95
async def echo(self, value: str) -> str:
96
"""
97
Echo message back from server.
98
99
Args:
100
value: Message to echo
101
102
Returns:
103
Echoed message
104
"""
105
106
async def aclose(self) -> None:
107
"""Close all connections and cleanup resources."""
108
109
def close(self) -> None:
110
"""Synchronous connection cleanup (use aclose() in async context)."""
111
112
def from_url(url: str, **kwargs) -> Redis:
113
"""
114
Create Redis client from connection URL.
115
116
Args:
117
url: Redis connection URL
118
kwargs: Additional connection parameters
119
120
Returns:
121
Configured Redis client instance
122
"""
123
```
124
125
### Connection Pool Management
126
127
Connection pooling for efficient connection reuse and resource management.
128
129
```python { .api }
130
class ConnectionPool:
131
"""Connection pool for managing Redis connections."""
132
133
def __init__(
134
self,
135
connection_class: Type[Connection] = Connection,
136
max_connections: int = 50,
137
**connection_kwargs
138
):
139
"""
140
Initialize connection pool.
141
142
Args:
143
connection_class: Connection class to use
144
max_connections: Maximum number of connections
145
connection_kwargs: Arguments for connection creation
146
"""
147
148
@classmethod
149
def from_url(cls, url: str, **kwargs) -> "ConnectionPool":
150
"""
151
Create connection pool from URL.
152
153
Args:
154
url: Redis connection URL
155
kwargs: Additional pool parameters
156
157
Returns:
158
Configured connection pool
159
"""
160
161
async def get_connection(self, command_name: str, *keys, **options) -> Connection:
162
"""
163
Get connection from pool.
164
165
Args:
166
command_name: Redis command being executed
167
keys: Command keys (for sharding)
168
options: Command options
169
170
Returns:
171
Available connection instance
172
"""
173
174
def make_connection(self) -> Connection:
175
"""
176
Create new connection instance.
177
178
Returns:
179
New connection (not from pool)
180
"""
181
182
async def release(self, connection: Connection) -> None:
183
"""
184
Return connection to pool.
185
186
Args:
187
connection: Connection to return
188
"""
189
190
async def disconnect(self, inuse_connections: bool = True) -> None:
191
"""
192
Disconnect all pool connections.
193
194
Args:
195
inuse_connections: Whether to disconnect active connections
196
"""
197
198
def reset(self) -> None:
199
"""Reset pool state, clearing all connections."""
200
201
def get_encoder(self) -> Encoder:
202
"""
203
Get connection encoder instance.
204
205
Returns:
206
Encoder for data serialization
207
"""
208
209
def owns_connection(self, connection: Connection) -> bool:
210
"""
211
Check if pool owns connection.
212
213
Args:
214
connection: Connection to check
215
216
Returns:
217
True if connection belongs to this pool
218
"""
219
220
class BlockingConnectionPool(ConnectionPool):
221
"""Connection pool that blocks when max connections reached."""
222
223
def __init__(
224
self,
225
max_connections: int = 50,
226
timeout: float = 20,
227
**connection_kwargs
228
):
229
"""
230
Initialize blocking connection pool.
231
232
Args:
233
max_connections: Maximum number of connections
234
timeout: Timeout when waiting for available connection
235
connection_kwargs: Arguments for connection creation
236
"""
237
```
238
239
### Connection Types
240
241
Different connection implementations for various network configurations.
242
243
```python { .api }
244
class Connection:
245
"""Basic TCP connection to Redis server."""
246
247
def __init__(
248
self,
249
*,
250
host: str = "localhost",
251
port: Union[str, int] = 6379,
252
db: Union[str, int] = 0,
253
password: Optional[str] = None,
254
socket_timeout: Optional[float] = None,
255
socket_connect_timeout: Optional[float] = None,
256
socket_keepalive: bool = False,
257
socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None,
258
socket_type: int = 0,
259
retry_on_timeout: bool = False,
260
encoding: str = "utf-8",
261
encoding_errors: str = "strict",
262
decode_responses: bool = False,
263
parser_class: Type[BaseParser] = DefaultParser,
264
socket_read_size: int = 65536,
265
health_check_interval: float = 0,
266
client_name: Optional[str] = None,
267
username: Optional[str] = None,
268
encoder_class: Type[Encoder] = Encoder,
269
):
270
"""
271
Initialize TCP connection.
272
273
Args:
274
host: Redis server hostname
275
port: Redis server port
276
db: Database number
277
password: Authentication password
278
socket_timeout: Socket operation timeout
279
socket_connect_timeout: Connection establishment timeout
280
socket_keepalive: Enable TCP keepalive
281
socket_keepalive_options: Keepalive configuration
282
socket_type: Socket type
283
retry_on_timeout: Retry operations on timeout
284
encoding: Response encoding
285
encoding_errors: Encoding error handling
286
decode_responses: Decode responses to strings
287
parser_class: Protocol parser
288
socket_read_size: Read buffer size
289
health_check_interval: Health check frequency
290
client_name: Client name for identification
291
username: Authentication username
292
encoder_class: Data encoder class
293
"""
294
295
async def connect(self) -> None:
296
"""Establish connection to Redis server."""
297
298
async def disconnect(self) -> None:
299
"""Close connection to Redis server."""
300
301
async def send_command(self, *args, **kwargs) -> Any:
302
"""
303
Send command to Redis server.
304
305
Args:
306
args: Command arguments
307
kwargs: Command options
308
309
Returns:
310
Command response
311
"""
312
313
async def send_packed_command(self, command: bytes, check_health: bool = True) -> None:
314
"""
315
Send pre-packed command to server.
316
317
Args:
318
command: Packed command bytes
319
check_health: Check connection health before sending
320
"""
321
322
async def can_read(self, timeout: float = 0) -> bool:
323
"""
324
Check if data is available to read.
325
326
Args:
327
timeout: Timeout for check in seconds
328
329
Returns:
330
True if data is available
331
"""
332
333
async def read_response(self) -> Any:
334
"""
335
Read and parse response from server.
336
337
Returns:
338
Parsed response data
339
"""
340
341
def pack_command(self, *args) -> List[bytes]:
342
"""
343
Pack command arguments for transmission.
344
345
Args:
346
args: Command arguments
347
348
Returns:
349
List of packed command parts
350
"""
351
352
def pack_commands(self, commands: List[Tuple]) -> List[bytes]:
353
"""
354
Pack multiple commands for pipeline transmission.
355
356
Args:
357
commands: List of command tuples
358
359
Returns:
360
List of packed command bytes
361
"""
362
363
async def check_health(self) -> None:
364
"""Check and maintain connection health."""
365
366
def register_connect_callback(self, callback: Callable) -> None:
367
"""
368
Register callback for connection events.
369
370
Args:
371
callback: Function to call on connection
372
"""
373
374
def clear_connect_callbacks(self) -> None:
375
"""Clear all connection callbacks."""
376
377
@property
378
def is_connected(self) -> bool:
379
"""
380
Whether connection is currently active.
381
382
Returns:
383
True if connected
384
"""
385
386
class SSLConnection(Connection):
387
"""SSL-enabled TCP connection to Redis server."""
388
389
def __init__(
390
self,
391
ssl_keyfile: Optional[str] = None,
392
ssl_certfile: Optional[str] = None,
393
ssl_cert_reqs: str = "required",
394
ssl_ca_certs: Optional[str] = None,
395
ssl_check_hostname: bool = False,
396
**kwargs
397
):
398
"""
399
Initialize SSL connection.
400
401
Args:
402
ssl_keyfile: Path to SSL private key file
403
ssl_certfile: Path to SSL certificate file
404
ssl_cert_reqs: Certificate requirement level
405
ssl_ca_certs: Path to CA certificates file
406
ssl_check_hostname: Verify hostname against certificate
407
kwargs: Additional connection parameters
408
"""
409
410
class UnixDomainSocketConnection(Connection):
411
"""Unix domain socket connection to Redis server."""
412
413
def __init__(self, path: str = "/var/run/redis/redis.sock", **kwargs):
414
"""
415
Initialize Unix socket connection.
416
417
Args:
418
path: Path to Redis Unix socket
419
kwargs: Additional connection parameters
420
"""
421
```
422
423
### Protocol Parsers and Encoders
424
425
Low-level protocol handling for Redis communication.
426
427
```python { .api }
428
class BaseParser:
429
"""Base class for Redis protocol parsers."""
430
431
def on_connect(self, connection: Connection) -> None:
432
"""
433
Handle connection establishment.
434
435
Args:
436
connection: Active connection instance
437
"""
438
439
def on_disconnect(self) -> None:
440
"""Handle connection termination."""
441
442
async def can_read(self, timeout: float) -> bool:
443
"""
444
Check if data is available for reading.
445
446
Args:
447
timeout: Timeout in seconds
448
449
Returns:
450
True if data is available
451
"""
452
453
async def read_response(self) -> Any:
454
"""
455
Read and parse Redis protocol response.
456
457
Returns:
458
Parsed response value
459
"""
460
461
class PythonParser(BaseParser):
462
"""Pure Python implementation of Redis protocol parser."""
463
464
class HiredisParser(BaseParser):
465
"""Hiredis-based parser for improved performance."""
466
467
class Encoder:
468
"""Handles encoding and decoding of Redis data."""
469
470
def encode(self, value: Any) -> bytes:
471
"""
472
Encode value for Redis transmission.
473
474
Args:
475
value: Value to encode
476
477
Returns:
478
Encoded bytes
479
"""
480
481
def decode(self, value: bytes, force: bool = False) -> Any:
482
"""
483
Decode Redis response value.
484
485
Args:
486
value: Response bytes to decode
487
force: Force decoding even if decode_responses=False
488
489
Returns:
490
Decoded value
491
"""
492
```
493
494
### Sentinel Support
495
496
High availability Redis setup with automatic failover using Redis Sentinel.
497
498
```python { .api }
499
class Sentinel:
500
"""Redis Sentinel client for high availability."""
501
502
def __init__(
503
self,
504
sentinels: List[Tuple[str, int]],
505
socket_timeout: float = 0.1,
506
**kwargs
507
):
508
"""
509
Initialize Sentinel client.
510
511
Args:
512
sentinels: List of (host, port) tuples for sentinel servers
513
socket_timeout: Socket timeout for sentinel connections
514
kwargs: Additional connection parameters
515
"""
516
517
def master_for(
518
self,
519
service_name: str,
520
redis_class: Type[Redis] = Redis,
521
**kwargs
522
) -> Redis:
523
"""
524
Get Redis client for master server.
525
526
Args:
527
service_name: Name of Redis service in Sentinel config
528
redis_class: Redis client class to use
529
kwargs: Additional client parameters
530
531
Returns:
532
Redis client connected to current master
533
"""
534
535
def slave_for(
536
self,
537
service_name: str,
538
redis_class: Type[Redis] = Redis,
539
**kwargs
540
) -> Redis:
541
"""
542
Get Redis client for slave server.
543
544
Args:
545
service_name: Name of Redis service in Sentinel config
546
redis_class: Redis client class to use
547
kwargs: Additional client parameters
548
549
Returns:
550
Redis client connected to a slave
551
"""
552
553
async def discover_master(self, service_name: str) -> Tuple[str, int]:
554
"""
555
Discover master server address.
556
557
Args:
558
service_name: Service name to discover
559
560
Returns:
561
Tuple of (host, port) for master server
562
"""
563
564
async def discover_slaves(self, service_name: str) -> List[Tuple[str, int]]:
565
"""
566
Discover slave server addresses.
567
568
Args:
569
service_name: Service name to discover
570
571
Returns:
572
List of (host, port) tuples for slave servers
573
"""
574
575
class SentinelConnectionPool(ConnectionPool):
576
"""Connection pool managed by Redis Sentinel."""
577
578
def __init__(
579
self,
580
service_name: str,
581
sentinel_manager: Sentinel,
582
**kwargs
583
):
584
"""
585
Initialize Sentinel-managed connection pool.
586
587
Args:
588
service_name: Redis service name
589
sentinel_manager: Sentinel client instance
590
kwargs: Additional pool parameters
591
"""
592
593
class SentinelManagedConnection(Connection):
594
"""Connection managed by Redis Sentinel with automatic failover."""
595
```
596
597
## Usage Examples
598
599
### Basic Connection Setup
600
601
```python
602
async def basic_connection_examples():
    """Construct clients for the common transports and verify one with PING.

    Covers a plain TCP client, username/password authentication, socket
    timeouts, SSL, and a Unix domain socket. Every client created here is
    closed before returning, even when the ping fails.
    """
    # Simple connection
    redis = aioredis.Redis(
        host='localhost',
        port=6379,
        db=0,
        decode_responses=True
    )

    # Connection with authentication
    redis_auth = aioredis.Redis(
        host='redis.example.com',
        port=6380,
        password='secretpassword',
        username='myuser',  # Redis 6+
        db=1
    )

    # Connection with timeouts
    redis_timeouts = aioredis.Redis(
        host='localhost',
        port=6379,
        socket_connect_timeout=5,  # 5 seconds to connect
        socket_timeout=3,          # 3 seconds per operation
        retry_on_timeout=True
    )

    # SSL connection
    redis_ssl = aioredis.Redis(
        host='secure-redis.example.com',
        port=6380,
        ssl=True,
        ssl_certfile='/path/to/client.crt',
        ssl_keyfile='/path/to/client.key',
        ssl_ca_certs='/path/to/ca.crt'
    )

    # Unix socket connection
    redis_unix = aioredis.Redis(
        unix_socket_path='/var/run/redis/redis.sock',
        db=0
    )

    try:
        # Test connection
        pong = await redis.ping()
        print(f"Connected: {pong}")  # True
    finally:
        # Release every client built above, not just the one that was pinged
        for client in (redis, redis_auth, redis_timeouts, redis_ssl, redis_unix):
            await client.aclose()
650
```
651
652
### URL-Based Configuration
653
654
```python
655
async def url_connection_examples():
    """Build clients from connection URLs rather than individual keyword arguments."""
    # Basic URL: host, port and database in a single string
    basic_url = "redis://localhost:6379/0"
    redis = aioredis.from_url(basic_url)

    # URL with authentication (username and password before the host)
    auth_url = "redis://myuser:mypassword@redis.example.com:6380/1"
    redis_auth = aioredis.from_url(auth_url)

    # SSL URL, with certificate settings supplied as keyword arguments
    ssl_options = {
        "ssl_cert_reqs": "required",
        "ssl_ca_certs": "/path/to/ca.crt",
    }
    redis_ssl = aioredis.from_url(
        "rediss://user:pass@secure-redis.com:6380/0",
        **ssl_options
    )

    # Unix socket URL; the database is given as a query parameter
    unix_url = "unix:///var/run/redis/redis.sock?db=0"
    redis_unix = aioredis.from_url(unix_url)

    # URL with additional parameters passed alongside it
    extra_options = {
        "socket_timeout": 5,
        "socket_connect_timeout": 10,
        "decode_responses": True,
        "health_check_interval": 30,
    }
    redis_custom = aioredis.from_url("redis://localhost:6379/0", **extra_options)

    await redis.aclose()
686
```
687
688
### Connection Pooling
689
690
```python
691
async def connection_pool_examples():
    """Create regular, blocking and URL-based pools and share one pool between clients."""
    # Create custom connection pool
    pool = aioredis.ConnectionPool(
        host='localhost',
        port=6379,
        max_connections=100,
        retry_on_timeout=True,
        socket_keepalive=True
    )

    # Use pool with Redis client
    redis = aioredis.Redis(connection_pool=pool)

    # Blocking pool: callers wait up to `timeout` seconds for a free connection
    blocking_pool = aioredis.BlockingConnectionPool(
        host='localhost',
        port=6379,
        max_connections=20,
        timeout=10
    )
    redis_blocking = aioredis.Redis(connection_pool=blocking_pool)

    # Pool from URL
    pool_from_url = aioredis.ConnectionPool.from_url(
        "redis://localhost:6379/0",
        max_connections=50
    )
    redis_url_pool = aioredis.Redis(connection_pool=pool_from_url)

    # Multiple clients sharing the first pool
    redis1, redis2, redis3 = (
        aioredis.Redis(connection_pool=pool) for _ in range(3)
    )

    # Perform operations (connections automatically managed)
    for index, client in enumerate((redis1, redis2, redis3), start=1):
        await client.set(f'key{index}', f'value{index}')

    # Cleanup: disconnect each pool in creation order
    for created_pool in (pool, blocking_pool, pool_from_url):
        await created_pool.disconnect()
736
```
737
738
### High Availability with Sentinel
739
740
```python
741
async def sentinel_examples():
    """Write to the master and read from a slave obtained through Redis Sentinel."""
    service = 'mymaster'

    # Configure Sentinel with the (host, port) of each sentinel server
    sentinel_nodes = [
        ('sentinel1.example.com', 26379),
        ('sentinel2.example.com', 26379),
        ('sentinel3.example.com', 26379)
    ]
    sentinel = aioredis.Sentinel(sentinel_nodes)

    # One client for the master, one for a slave used for reads
    master = sentinel.master_for(service, socket_timeout=0.1)
    slave = sentinel.slave_for(service, socket_timeout=0.1)

    # Write to master
    await master.set('key', 'value')

    # Read from slave (may have replication lag)
    value = await slave.get('key')

    # Sentinel can automatically handle failover
    # If master goes down, Sentinel will promote a slave

    # Manual discovery of the current topology
    master_address = await sentinel.discover_master(service)
    print(f"Current master: {master_address}")

    slave_addresses = await sentinel.discover_slaves(service)
    print(f"Available slaves: {slave_addresses}")

    for client in (master, slave):
        await client.aclose()
773
```
774
775
### Connection Health and Monitoring
776
777
```python
778
async def connection_health_examples():
    """Enable keepalive and periodic health checks, then probe the connection by hand.

    Runs PING, ECHO and CLIENT LIST against the server and prints the
    outcome of each. The client is closed before returning, even if one of
    the probes raises.
    """
    # Local import keeps the example self-contained
    import socket

    # Connection with health checking
    redis = aioredis.Redis(
        host='localhost',
        port=6379,
        health_check_interval=30,  # Check every 30 seconds
        socket_keepalive=True,
        # Keys are TCP option numbers, so use the named constants from the
        # socket module. Bare integers 1/2/3 are NOT the keepalive options
        # (on Linux they are TCP_NODELAY, TCP_MAXSEG and TCP_CORK).
        # NOTE: these constants are platform-dependent and may be missing
        # on some systems.
        socket_keepalive_options={
            socket.TCP_KEEPIDLE: 1,   # Idle seconds before the first probe
            socket.TCP_KEEPINTVL: 3,  # Seconds between probes
            socket.TCP_KEEPCNT: 5     # Unanswered probes before giving up
        }
    )

    try:
        # Manual health check
        try:
            await redis.ping()
            print("Connection healthy")
        except aioredis.ConnectionError:
            print("Connection failed")

        # Echo test
        echo_result = await redis.echo("Hello Redis")
        print(f"Echo: {echo_result}")

        # Connection info (if supported)
        try:
            client_info = await redis.client_list()
            print(f"Connected clients: {len(client_info)}")
        except Exception as exc:
            # Still best-effort, but report why instead of hiding the failure
            print(f"Client list unavailable: {exc}")
    finally:
        await redis.aclose()
811
```
812
813
### Error Handling and Reconnection
814
815
```python
816
async def error_handling_examples():
    """Retry a SET a bounded number of times on timeout or connection errors."""
    redis = aioredis.Redis(
        host='localhost',
        port=6379,
        retry_on_timeout=True,
        socket_timeout=5
    )

    async def robust_operation(key, value):
        """Set `key` to `value`, retrying up to three times before re-raising."""
        max_retries = 3

        for attempt in range(1, max_retries + 1):
            try:
                await redis.set(key, value)
            except aioredis.TimeoutError:
                print(f"Timeout, retry {attempt}/{max_retries}")
                if attempt >= max_retries:
                    raise
                await asyncio.sleep(1)
            except aioredis.ConnectionError as e:
                print(f"Connection error: {e}")
                # Connection will be automatically recreated on next operation
                if attempt >= max_retries:
                    raise
                await asyncio.sleep(2)
            else:
                print(f"Successfully set {key} = {value}")
                return

    # Test robust operation
    try:
        await robust_operation('test_key', 'test_value')
    except Exception as e:
        print(f"Operation failed after retries: {e}")

    await redis.aclose()
858
```
859
860
### Performance Optimization
861
862
```python
863
async def performance_optimization_examples():
    """Compare throughput of a tuned client, a large shared pool and a single-connection client.

    Each configuration runs the same sequence of SET commands and prints
    the elapsed time and operations per second.
    """
    import time

    # Optimized connection settings.
    # socket_read_size is a Connection argument, not a Redis() argument, so
    # it has to be supplied through a ConnectionPool, which forwards its
    # extra keyword arguments to each connection it creates.
    tuned_pool = aioredis.ConnectionPool(
        host='localhost',
        port=6379,
        decode_responses=True,
        socket_read_size=65536,    # 64KB read buffer
        socket_keepalive=True,
        health_check_interval=0,   # Disable health checks for max performance
    )
    redis = aioredis.Redis(connection_pool=tuned_pool)

    # Use connection pooling for high concurrency
    pool = aioredis.ConnectionPool(
        host='localhost',
        port=6379,
        max_connections=100,       # Tune based on load
        socket_read_size=131072,   # 128KB for high throughput
    )
    redis_pooled = aioredis.Redis(connection_pool=pool)

    # Single connection client for sequential operations
    redis_single = aioredis.Redis(
        host='localhost',
        port=6379,
        single_connection_client=True  # Reuse single connection
    )

    async def benchmark_operations(client, name, count=1000):
        """Run `count` sequential SETs on `client` and print the achieved rate."""
        # perf_counter is monotonic, so wall-clock adjustments cannot skew the timing
        start_time = time.perf_counter()

        for i in range(count):
            await client.set(f'benchmark:{name}:{i}', f'value_{i}')

        elapsed = time.perf_counter() - start_time
        print(f"{name}: {count} operations in {elapsed:.2f}s ({count/elapsed:.0f} ops/sec)")

    try:
        # Benchmark different configurations
        await benchmark_operations(redis, "default", 1000)
        await benchmark_operations(redis_pooled, "pooled", 1000)
        await benchmark_operations(redis_single, "single_conn", 1000)
    finally:
        await redis.aclose()
        # Both pools were created explicitly here, so disconnect them explicitly too
        await tuned_pool.disconnect()
        await pool.disconnect()
        await redis_single.aclose()
910
```