Python client for Redis database and key-value store
—
Redis connection management provides efficient connection pooling, SSL support, and various connection types for optimal performance and security. Connection pools manage multiple connections to reduce overhead and improve throughput.
Connection pools manage multiple connections to Redis servers for improved performance and resource management.
class ConnectionPool:
    """Pool of reusable connections to a Redis server.

    A pool is handed to clients via ``Redis(connection_pool=pool)``; several
    clients may share one pool.
    """

    def __init__(
        self,
        connection_class: Type[Connection] = Connection,
        max_connections: Optional[int] = None,
        retry_on_timeout: bool = False,
        retry_on_error: Optional[List[Type[Exception]]] = None,
        **connection_kwargs
    ):
        """Configure the pool.

        ``connection_class`` selects the connection type (for example
        ``SSLConnection`` or ``UnixDomainSocketConnection``). Remaining
        keyword arguments such as ``host``, ``port`` and ``db`` are collected
        in ``connection_kwargs`` as the settings for those connections.
        """
        ...

    def get_connection(self, command_name: str, **kwargs) -> Connection:
        """Check a connection out of the pool; pair every call with release()."""
        ...

    def make_connection(self) -> Connection:
        """Return a new connection instance."""
        ...

    def release(self, connection: Connection) -> None:
        """Hand a connection obtained from get_connection() back to the pool."""
        ...

    def disconnect(self, inuse_connections: bool = True) -> None:
        """Disconnect the pool's connections."""
        ...

    def reset(self) -> None:
        """Reset the pool's state."""
        ...

    @classmethod
    def from_url(cls, url: str, **kwargs) -> "ConnectionPool":
        """Build a pool from a URL (``redis://``, ``rediss://`` or ``unix://``)."""
        ...
class BlockingConnectionPool(ConnectionPool):
def __init__(
self,
max_connections: int = 50,
timeout: int = 20,
connection_class: Type[Connection] = Connection,
queue_class: Type = None,
**connection_kwargs
): ...Different connection types for various Redis deployment scenarios and security requirements.
class Connection:
    """A single connection to a Redis server, addressed by host and port."""

    def __init__(
        self,
        host: str = "localhost",
        port: int = 6379,
        db: int = 0,
        username: Optional[str] = None,
        password: Optional[str] = None,
        socket_timeout: Optional[float] = None,
        socket_connect_timeout: Optional[float] = None,
        socket_keepalive: bool = False,
        socket_keepalive_options: Optional[Dict[str, int]] = None,
        socket_type: int = 0,
        retry_on_timeout: bool = False,
        retry_on_error: Optional[List[Type[Exception]]] = None,
        encoding: str = "utf-8",
        encoding_errors: str = "strict",
        decode_responses: bool = False,
        parser_class: Optional[Type] = None,
        socket_read_size: int = 65536,
        health_check_interval: int = 0,
        client_name: Optional[str] = None,
        lib_name: Optional[str] = None,
        lib_version: Optional[str] = None,
        **kwargs
    ):
        """Store the connection settings.

        The same keyword names are accepted by ``ConnectionPool`` and passed
        along as connection settings.
        """
        ...

    def connect(self) -> None:
        """Open the connection."""
        ...

    def disconnect(self) -> None:
        """Close the connection."""
        ...

    def send_command(self, *args) -> None:
        """Send a command, e.g. ``send_command('PING')``."""
        ...

    def send_packed_command(self, command: bytes) -> None:
        """Send a command that is already encoded as bytes."""
        ...

    def read_response(self) -> Any:
        """Read the reply to a previously sent command."""
        ...

    def pack_command(self, *args) -> bytes:
        """Encode one command and its arguments as bytes."""
        ...

    def pack_commands(self, commands: List[Tuple]) -> bytes:
        """Encode several commands as bytes."""
        ...
class SSLConnection(Connection):
    """Connection variant configured with SSL/TLS settings.

    Selected with ``ConnectionPool(connection_class=SSLConnection, ...)``.
    """

    def __init__(
        self,
        ssl_keyfile: Optional[str] = None,
        ssl_certfile: Optional[str] = None,
        ssl_cert_reqs: str = "required",
        ssl_ca_certs: Optional[str] = None,
        ssl_ca_data: Optional[str] = None,
        ssl_check_hostname: bool = False,
        ssl_password: Optional[str] = None,
        ssl_disable_dev_restrictions: bool = False,
        **kwargs
    ):
        """Store the SSL settings; other keyword arguments go in ``kwargs``."""
        ...
class UnixDomainSocketConnection(Connection):
def __init__(
self,
path: str = "",
db: int = 0,
username: Optional[str] = None,
password: Optional[str] = None,
socket_timeout: Optional[float] = None,
encoding: str = "utf-8",
encoding_errors: str = "strict",
decode_responses: bool = False,
retry_on_timeout: bool = False,
retry_on_error: Optional[List[Type[Exception]]] = None,
parser_class: Type = None,
socket_read_size: int = 65536,
health_check_interval: int = 0,
client_name: Optional[str] = None,
**kwargs
): ...Specialized connection classes for Redis Sentinel high availability configurations.
class SentinelConnectionPool(ConnectionPool):
    """Connection pool for a Redis Sentinel configuration."""

    def __init__(
        self,
        service_name: str,
        sentinel_manager: SentinelManager,
        **kwargs
    ):
        """Bind the pool to ``service_name`` and its ``sentinel_manager``."""
        ...
class SentinelManagedConnection(Connection):
    """Connection class for Redis Sentinel configurations."""

    def __init__(
        self,
        **kwargs
    ):
        """Accept connection settings as keyword arguments."""
        ...
class SentinelManagedSSLConnection(SSLConnection):
def __init__(
self,
**kwargs
): ...Helper functions for creating connections and managing connection URLs.
def from_url(
    url: str,
    **kwargs
) -> Redis:
    """Create a Redis client from a connection URL."""
    ...


import redis
from redis import ConnectionPool, Connection

# Create a connection pool
pool = ConnectionPool(
    host='localhost',
    port=6379,
    db=0,
    max_connections=20,
    retry_on_timeout=True
)

# Create Redis client with pool
r = redis.Redis(connection_pool=pool)

# Multiple clients can share the same pool
r1 = redis.Redis(connection_pool=pool)
r2 = redis.Redis(connection_pool=pool)


from redis import Redis, ConnectionPool, SSLConnection
# SSL connection pool: connection_class switches the pool to SSLConnection,
# and the ssl_* keyword arguments are that class's settings.
ssl_pool = ConnectionPool(
    connection_class=SSLConnection,
    host='redis-ssl.example.com',
    port=6380,
    ssl_cert_reqs='required',
    ssl_ca_certs='/path/to/ca-certificates.crt',
    ssl_certfile='/path/to/client.crt',
    ssl_keyfile='/path/to/client.key',
    ssl_check_hostname=True
)

# Create Redis client with SSL
redis_ssl = Redis(connection_pool=ssl_pool)

# Alternatively, create SSL client directly
redis_ssl = Redis(
    host='redis-ssl.example.com',
    port=6380,
    ssl=True,
    ssl_ca_certs='/path/to/ca-certificates.crt',
    ssl_certfile='/path/to/client.crt',
    ssl_keyfile='/path/to/client.key'
)


from redis import Redis, UnixDomainSocketConnection
# ConnectionPool is needed for the pooled variant below
from redis import ConnectionPool

# Unix socket connection
r = Redis(
    unix_domain_socket_path='/var/run/redis/redis.sock',
    db=0
)

# With connection pool
unix_pool = ConnectionPool(
    connection_class=UnixDomainSocketConnection,
    path='/var/run/redis/redis.sock',
    db=0
)
r = Redis(connection_pool=unix_pool)


from redis import Redis, BlockingConnectionPool
# Needed for redis.ConnectionError in the except clause below
import redis

# Blocking pool with connection limit
blocking_pool = BlockingConnectionPool(
    host='localhost',
    port=6379,
    max_connections=10,
    timeout=20,  # Wait up to 20 seconds for available connection
    db=0
)
r = Redis(connection_pool=blocking_pool)

# This will block if all connections are in use
try:
    r.set('key', 'value')
except redis.ConnectionError as e:
    print(f"Could not get connection: {e}")


from redis import ConnectionPool
# Create pool from URL
pool = ConnectionPool.from_url('redis://localhost:6379/0')

# SSL from URL (note the "rediss" scheme)
ssl_pool = ConnectionPool.from_url(
    'rediss://username:password@redis.example.com:6380/0'
)

# Unix socket from URL
unix_pool = ConnectionPool.from_url('unix:///var/run/redis/redis.sock?db=0')


# Enable connection health checks
from redis import ConnectionPool, Redis

pool = ConnectionPool(
    host='localhost',
    port=6379,
    # A connection idle for more than 30 seconds is checked before reuse
    health_check_interval=30,
    max_connections=20
)
r = Redis(connection_pool=pool)

# Manual connection health check
connection = pool.get_connection('ping')
try:
    connection.send_command('PING')
    response = connection.read_response()
    print(f"Connection healthy: {response}")
finally:
    # Always return the connection, even if the PING fails
    pool.release(connection)


# Custom connection with keepalive
import socket

import redis
from redis import ConnectionPool, Redis

pool = ConnectionPool(
    host='localhost',
    port=6379,
    socket_keepalive=True,
    # Keys are TCP-level socket option constants. Use the names from the
    # socket module rather than literal integers: the numeric values are
    # platform-specific (TCP_KEEPIDLE is not available on every OS).
    socket_keepalive_options={
        socket.TCP_KEEPIDLE: 1,   # seconds idle before the first probe
        socket.TCP_KEEPINTVL: 3,  # seconds between probes
        socket.TCP_KEEPCNT: 5     # failed probes before the connection drops
    },
    socket_timeout=5.0,
    socket_connect_timeout=10.0,
    retry_on_timeout=True,
    retry_on_error=[redis.ConnectionError, redis.TimeoutError]
)
r = Redis(connection_pool=pool)


import redis
# Create pool with monitoring
pool = redis.ConnectionPool(
    host='localhost',
    port=6379,
    max_connections=50
)
r = redis.Redis(connection_pool=pool)

# Check pool status. These underscore-prefixed attributes are internal to
# redis-py and may change between versions; use them for debugging only.
print(f"Created connections: {pool._created_connections}")
print(f"Available connections: {len(pool._available_connections)}")
print(f"In-use connections: {len(pool._in_use_connections)}")

# Disconnect all connections
pool.disconnect()

# Reset the pool's bookkeeping.
# NOTE(review): in redis-py, reset() reinitializes the pool's tracking state
# rather than closing sockets, so it is called after disconnect() here;
# confirm against the installed version.
pool.reset()


from redis.exceptions import ConnectionError, TimeoutError
try:
r = Redis(
host='unreachable-host',
port=6379,
socket_connect_timeout=5,
retry_on_timeout=True
)
r.ping()
except ConnectionError as e:
print(f"Connection failed: {e}")
except TimeoutError as e:
print(f"Connection timed out: {e}")Install with Tessl CLI
npx tessl i tessl/pypi-redis