CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-redis

Python client for Redis database and key-value store

Pending
Overview
Eval results
Files

connection-management.mddocs/

Connection Management

Redis connection management provides efficient connection pooling, SSL support, and various connection types for optimal performance and security. Connection pools manage multiple connections to reduce overhead and improve throughput.

Capabilities

Connection Pool Classes

Connection pools manage multiple connections to Redis servers for improved performance and resource management.

class ConnectionPool:
    def __init__(
        self,
        connection_class: Type[Connection] = Connection,
        max_connections: Optional[int] = None,
        retry_on_timeout: bool = False,
        retry_on_error: Optional[List[Type[Exception]]] = None,
        **connection_kwargs
    ): ...
    
    def get_connection(self, command_name: str, **kwargs) -> Connection: ...
    
    def make_connection(self) -> Connection: ...
    
    def release(self, connection: Connection) -> None: ...
    
    def disconnect(self, inuse_connections: bool = True) -> None: ...
    
    def reset(self) -> None: ...
    
    @classmethod
    def from_url(cls, url: str, **kwargs) -> "ConnectionPool": ...

class BlockingConnectionPool(ConnectionPool):
    def __init__(
        self,
        max_connections: int = 50,
        timeout: int = 20,
        connection_class: Type[Connection] = Connection,
        queue_class: Type = None,
        **connection_kwargs
    ): ...

Connection Classes

Different connection types for various Redis deployment scenarios and security requirements.

class Connection:
    def __init__(
        self,
        host: str = "localhost",
        port: int = 6379,
        db: int = 0,
        username: Optional[str] = None,
        password: Optional[str] = None,
        socket_timeout: Optional[float] = None,
        socket_connect_timeout: Optional[float] = None,
        socket_keepalive: bool = False,
        socket_keepalive_options: Optional[Dict[str, int]] = None,
        socket_type: int = 0,
        retry_on_timeout: bool = False,
        retry_on_error: Optional[List[Type[Exception]]] = None,
        encoding: str = "utf-8",
        encoding_errors: str = "strict",
        decode_responses: bool = False,
        parser_class: Type = None,
        socket_read_size: int = 65536,
        health_check_interval: int = 0,
        client_name: Optional[str] = None,
        lib_name: Optional[str] = None,
        lib_version: Optional[str] = None,
        **kwargs
    ): ...
    
    def connect(self) -> None: ...
    
    def disconnect(self) -> None: ...
    
    def send_command(self, *args) -> None: ...
    
    def send_packed_command(self, command: bytes) -> None: ...
    
    def read_response(self) -> Any: ...
    
    def pack_command(self, *args) -> bytes: ...
    
    def pack_commands(self, commands: List[Tuple]) -> bytes: ...

class SSLConnection(Connection):
    def __init__(
        self,
        ssl_keyfile: Optional[str] = None,
        ssl_certfile: Optional[str] = None,
        ssl_cert_reqs: str = "required",
        ssl_ca_certs: Optional[str] = None,
        ssl_ca_data: Optional[str] = None,
        ssl_check_hostname: bool = False,
        ssl_password: Optional[str] = None,
        ssl_disable_dev_restrictions: bool = False,
        **kwargs
    ): ...

class UnixDomainSocketConnection(Connection):
    def __init__(
        self,
        path: str = "",
        db: int = 0,
        username: Optional[str] = None,
        password: Optional[str] = None,
        socket_timeout: Optional[float] = None,
        encoding: str = "utf-8",
        encoding_errors: str = "strict",
        decode_responses: bool = False,
        retry_on_timeout: bool = False,
        retry_on_error: Optional[List[Type[Exception]]] = None,
        parser_class: Type = None,
        socket_read_size: int = 65536,
        health_check_interval: int = 0,
        client_name: Optional[str] = None,
        **kwargs
    ): ...

Sentinel Connection Classes

Specialized connection classes for Redis Sentinel high availability configurations.

class SentinelConnectionPool(ConnectionPool):
    def __init__(
        self,
        service_name: str,
        sentinel_manager: SentinelManager,
        **kwargs
    ): ...

class SentinelManagedConnection(Connection):
    def __init__(
        self,
        **kwargs
    ): ...

class SentinelManagedSSLConnection(SSLConnection):
    def __init__(
        self,
        **kwargs
    ): ...

Utility Functions

Helper functions for creating connections and managing connection URLs.

def from_url(
    url: str,
    **kwargs
) -> Redis: ...

Usage Examples

Basic Connection Pool Usage

import redis
from redis import ConnectionPool, Connection

# Create a connection pool
pool = ConnectionPool(
    host='localhost',
    port=6379,
    db=0,
    max_connections=20,
    retry_on_timeout=True
)

# Create Redis client with pool
r = redis.Redis(connection_pool=pool)

# Multiple clients can share the same pool
r1 = redis.Redis(connection_pool=pool)
r2 = redis.Redis(connection_pool=pool)

SSL Connection Configuration

from redis import Redis, ConnectionPool, SSLConnection

# SSL connection pool
ssl_pool = ConnectionPool(
    connection_class=SSLConnection,
    host='redis-ssl.example.com',
    port=6380,
    ssl_cert_reqs='required',
    ssl_ca_certs='/path/to/ca-certificates.crt',
    ssl_certfile='/path/to/client.crt',
    ssl_keyfile='/path/to/client.key',
    ssl_check_hostname=True
)

# Create Redis client with SSL
redis_ssl = Redis(connection_pool=ssl_pool)

# Alternatively, create SSL client directly
redis_ssl = Redis(
    host='redis-ssl.example.com',
    port=6380,
    ssl=True,
    ssl_ca_certs='/path/to/ca-certificates.crt',
    ssl_certfile='/path/to/client.crt',
    ssl_keyfile='/path/to/client.key'
)

Unix Domain Socket Connection

from redis import Redis, UnixDomainSocketConnection

# Unix socket connection
r = Redis(
    unix_domain_socket_path='/var/run/redis/redis.sock',
    db=0
)

# With connection pool
unix_pool = ConnectionPool(
    connection_class=UnixDomainSocketConnection,
    path='/var/run/redis/redis.sock',
    db=0
)

r = Redis(connection_pool=unix_pool)

Blocking Connection Pool

from redis import Redis, BlockingConnectionPool

# Blocking pool with connection limit
blocking_pool = BlockingConnectionPool(
    host='localhost',
    port=6379,
    max_connections=10,
    timeout=20,  # Wait up to 20 seconds for available connection
    db=0
)

r = Redis(connection_pool=blocking_pool)

# This will block if all connections are in use
try:
    r.set('key', 'value')
except redis.ConnectionError as e:
    print(f"Could not get connection: {e}")

Connection Pool from URL

from redis import ConnectionPool

# Create pool from URL
pool = ConnectionPool.from_url('redis://localhost:6379/0')

# SSL from URL  
ssl_pool = ConnectionPool.from_url(
    'rediss://username:password@redis.example.com:6380/0'
)

# Unix socket from URL
unix_pool = ConnectionPool.from_url('unix:///var/run/redis/redis.sock?db=0')

Connection Health Monitoring

# Enable connection health checks
pool = ConnectionPool(
    host='localhost',
    port=6379,
    health_check_interval=30,  # Check every 30 seconds
    max_connections=20
)

r = Redis(connection_pool=pool)

# Manual connection health check
connection = pool.get_connection('ping')
try:
    connection.send_command('PING')
    response = connection.read_response()
    print(f"Connection healthy: {response}")
finally:
    pool.release(connection)

Advanced Connection Configuration

# Custom connection with keepalive
pool = ConnectionPool(
    host='localhost',
    port=6379,
    socket_keepalive=True,
    socket_keepalive_options={
        1: 1,     # TCP_KEEPIDLE
        2: 3,     # TCP_KEEPINTVL  
        3: 5      # TCP_KEEPCNT
    },
    socket_timeout=5.0,
    socket_connect_timeout=10.0,
    retry_on_timeout=True,
    retry_on_error=[redis.ConnectionError, redis.TimeoutError]
)

r = Redis(connection_pool=pool)

Connection Pool Management

import redis

# Create pool with monitoring
pool = redis.ConnectionPool(
    host='localhost',
    port=6379,
    max_connections=50
)

r = redis.Redis(connection_pool=pool)

# Check pool status
print(f"Created connections: {pool.created_connections}")
print(f"Available connections: {len(pool._available_connections)}")
print(f"In-use connections: {len(pool._in_use_connections)}")

# Reset all connections (closes existing connections)
pool.reset()

# Disconnect all connections
pool.disconnect()

Error Handling

from redis.exceptions import ConnectionError, TimeoutError

try:
    r = Redis(
        host='unreachable-host',
        port=6379,
        socket_connect_timeout=5,
        retry_on_timeout=True
    )
    r.ping()
except ConnectionError as e:
    print(f"Connection failed: {e}")
except TimeoutError as e:
    print(f"Connection timed out: {e}")

Install with Tessl CLI

npx tessl i tessl/pypi-redis

docs

async-support.md

cluster-support.md

connection-management.md

core-client.md

distributed-locking.md

error-handling.md

high-availability.md

index.md

pipelines-transactions.md

pubsub-messaging.md

tile.json