CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-aioredis

asyncio (PEP 3156) Redis support

Overall
score

98%

Overview
Eval results
Files

connection-management.mddocs/

Connection Management

Connection and pooling functionality for managing Redis connections efficiently. This includes URL-based configuration, SSL connections, Unix domain sockets, connection pooling strategies, and connection lifecycle management for optimal performance and reliability.

Capabilities

Redis Client Creation

Multiple ways to create Redis client instances with flexible configuration options.

class Redis:
    """Main Redis client class with async/await support."""
    
    def __init__(
        self,
        *,
        host: str = "localhost",
        port: int = 6379,
        db: Union[str, int] = 0,
        password: Optional[str] = None,
        socket_timeout: Optional[float] = None,
        socket_connect_timeout: Optional[float] = None,
        socket_keepalive: Optional[bool] = None,
        socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None,
        connection_pool: Optional[ConnectionPool] = None,
        unix_socket_path: Optional[str] = None,
        encoding: str = "utf-8",
        encoding_errors: str = "strict",
        decode_responses: bool = False,
        retry_on_timeout: bool = False,
        ssl: bool = False,
        ssl_keyfile: Optional[str] = None,
        ssl_certfile: Optional[str] = None,
        ssl_cert_reqs: str = "required",
        ssl_ca_certs: Optional[str] = None,
        ssl_check_hostname: bool = False,
        max_connections: Optional[int] = None,
        single_connection_client: bool = False,
        health_check_interval: int = 0,
        client_name: Optional[str] = None,
        username: Optional[str] = None,
    ):
        """
        Initialize Redis client.
        
        Args:
            host: Redis server hostname
            port: Redis server port  
            db: Database number (0-15)
            password: Authentication password
            socket_timeout: Socket operation timeout in seconds
            socket_connect_timeout: Socket connection timeout in seconds
            socket_keepalive: Enable TCP keepalive (None uses system default)
            socket_keepalive_options: TCP keepalive options
            connection_pool: Custom connection pool
            unix_socket_path: Unix domain socket path
            encoding: String encoding for responses
            encoding_errors: Encoding error handling
            decode_responses: Decode byte responses to strings
            retry_on_timeout: Retry commands on timeout
            ssl: Enable SSL/TLS encryption
            ssl_keyfile: SSL private key file path
            ssl_certfile: SSL certificate file path
            ssl_cert_reqs: SSL certificate requirements
            ssl_ca_certs: SSL CA certificates file path
            ssl_check_hostname: Verify SSL hostname
            max_connections: Maximum pool connections
            single_connection_client: Use single persistent connection
            health_check_interval: Health check interval in seconds
            client_name: Client identification name
            username: Authentication username (Redis 6+)
        """
    
    @classmethod
    def from_url(cls, url: str, **kwargs) -> "Redis":
        """
        Create Redis client from connection URL.
        
        Args:
            url: Redis connection URL (redis://[[username]:password@]host[:port][/database])
            kwargs: Additional connection parameters
            
        Returns:
            Configured Redis client instance
        """
    
    async def ping(self) -> bool:
        """
        Test connection to Redis server.
        
        Returns:
            True if server responds to ping
        """
    
    async def echo(self, value: str) -> str:
        """
        Echo message back from server.
        
        Args:
            value: Message to echo
            
        Returns:
            Echoed message
        """
    
    async def aclose(self) -> None:
        """Close all connections and cleanup resources."""
        
    def close(self) -> None:
        """Synchronous connection cleanup (use aclose() in async context)."""

def from_url(url: str, **kwargs) -> Redis:
    """
    Create Redis client from connection URL.
    
    Args:
        url: Redis connection URL
        kwargs: Additional connection parameters
        
    Returns:
        Configured Redis client instance
    """

Connection Pool Management

Connection pooling for efficient connection reuse and resource management.

class ConnectionPool:
    """Connection pool for managing Redis connections."""
    
    def __init__(
        self,
        connection_class: Type[Connection] = Connection,
        max_connections: int = 50,
        **connection_kwargs
    ):
        """
        Initialize connection pool.
        
        Args:
            connection_class: Connection class to use
            max_connections: Maximum number of connections
            connection_kwargs: Arguments for connection creation
        """
    
    @classmethod
    def from_url(cls, url: str, **kwargs) -> "ConnectionPool":
        """
        Create connection pool from URL.
        
        Args:
            url: Redis connection URL
            kwargs: Additional pool parameters
            
        Returns:
            Configured connection pool
        """
    
    async def get_connection(self, command_name: str, *keys, **options) -> Connection:
        """
        Get connection from pool.
        
        Args:
            command_name: Redis command being executed
            keys: Command keys (for sharding)
            options: Command options
            
        Returns:
            Available connection instance
        """
    
    def make_connection(self) -> Connection:
        """
        Create new connection instance.
        
        Returns:
            New connection (not from pool)
        """
    
    async def release(self, connection: Connection) -> None:
        """
        Return connection to pool.
        
        Args:
            connection: Connection to return
        """
    
    async def disconnect(self, inuse_connections: bool = True) -> None:
        """
        Disconnect all pool connections.
        
        Args:
            inuse_connections: Whether to disconnect active connections
        """
    
    def reset(self) -> None:
        """Reset pool state, clearing all connections."""
    
    def get_encoder(self) -> Encoder:
        """
        Get connection encoder instance.
        
        Returns:
            Encoder for data serialization
        """
    
    def owns_connection(self, connection: Connection) -> bool:
        """
        Check if pool owns connection.
        
        Args:
            connection: Connection to check
            
        Returns:
            True if connection belongs to this pool
        """

class BlockingConnectionPool(ConnectionPool):
    """Connection pool that blocks when max connections reached."""
    
    def __init__(
        self, 
        max_connections: int = 50, 
        timeout: float = 20, 
        **connection_kwargs
    ):
        """
        Initialize blocking connection pool.
        
        Args:
            max_connections: Maximum number of connections
            timeout: Timeout when waiting for available connection
            connection_kwargs: Arguments for connection creation
        """

Connection Types

Different connection implementations for various network configurations.

class Connection:
    """Basic TCP connection to Redis server."""
    
    def __init__(
        self,
        *,
        host: str = "localhost",
        port: Union[str, int] = 6379,
        db: Union[str, int] = 0,
        password: Optional[str] = None,
        socket_timeout: Optional[float] = None,
        socket_connect_timeout: Optional[float] = None,
        socket_keepalive: bool = False,
        socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None,
        socket_type: int = 0,
        retry_on_timeout: bool = False,
        encoding: str = "utf-8",
        encoding_errors: str = "strict",
        decode_responses: bool = False,
        parser_class: Type[BaseParser] = DefaultParser,
        socket_read_size: int = 65536,
        health_check_interval: float = 0,
        client_name: Optional[str] = None,
        username: Optional[str] = None,
        encoder_class: Type[Encoder] = Encoder,
    ):
        """
        Initialize TCP connection.
        
        Args:
            host: Redis server hostname
            port: Redis server port
            db: Database number
            password: Authentication password
            socket_timeout: Socket operation timeout
            socket_connect_timeout: Connection establishment timeout
            socket_keepalive: Enable TCP keepalive
            socket_keepalive_options: Keepalive configuration
            socket_type: Socket type
            retry_on_timeout: Retry operations on timeout
            encoding: Response encoding
            encoding_errors: Encoding error handling
            decode_responses: Decode responses to strings
            parser_class: Protocol parser
            socket_read_size: Read buffer size
            health_check_interval: Health check frequency
            client_name: Client name for identification
            username: Authentication username
            encoder_class: Data encoder class
        """
    
    async def connect(self) -> None:
        """Establish connection to Redis server."""
    
    async def disconnect(self) -> None:
        """Close connection to Redis server."""
    
    async def send_command(self, *args, **kwargs) -> Any:
        """
        Send command to Redis server.
        
        Args:
            args: Command arguments
            kwargs: Command options
            
        Returns:
            Command response
        """
    
    async def send_packed_command(self, command: bytes, check_health: bool = True) -> None:
        """
        Send pre-packed command to server.
        
        Args:
            command: Packed command bytes
            check_health: Check connection health before sending
        """
    
    async def can_read(self, timeout: float = 0) -> bool:
        """
        Check if data is available to read.
        
        Args:
            timeout: Timeout for check in seconds
            
        Returns:
            True if data is available
        """
    
    async def read_response(self) -> Any:
        """
        Read and parse response from server.
        
        Returns:
            Parsed response data
        """
    
    def pack_command(self, *args) -> List[bytes]:
        """
        Pack command arguments for transmission.
        
        Args:
            args: Command arguments
            
        Returns:
            List of packed command parts
        """
    
    def pack_commands(self, commands: List[Tuple]) -> List[bytes]:
        """
        Pack multiple commands for pipeline transmission.
        
        Args:
            commands: List of command tuples
            
        Returns:
            List of packed command bytes
        """
    
    async def check_health(self) -> None:
        """Check and maintain connection health."""
    
    def register_connect_callback(self, callback: Callable) -> None:
        """
        Register callback for connection events.
        
        Args:
            callback: Function to call on connection
        """
    
    def clear_connect_callbacks(self) -> None:
        """Clear all connection callbacks."""
    
    @property
    def is_connected(self) -> bool:
        """
        Whether connection is currently active.
        
        Returns:
            True if connected
        """

class SSLConnection(Connection):
    """SSL-enabled TCP connection to Redis server."""
    
    def __init__(
        self,
        ssl_keyfile: Optional[str] = None,
        ssl_certfile: Optional[str] = None,
        ssl_cert_reqs: str = "required",
        ssl_ca_certs: Optional[str] = None,
        ssl_check_hostname: bool = False,
        **kwargs
    ):
        """
        Initialize SSL connection.
        
        Args:
            ssl_keyfile: Path to SSL private key file
            ssl_certfile: Path to SSL certificate file
            ssl_cert_reqs: Certificate requirement level
            ssl_ca_certs: Path to CA certificates file
            ssl_check_hostname: Verify hostname against certificate
            kwargs: Additional connection parameters
        """

class UnixDomainSocketConnection(Connection):
    """Unix domain socket connection to Redis server."""
    
    def __init__(self, path: str = "/var/run/redis/redis.sock", **kwargs):
        """
        Initialize Unix socket connection.
        
        Args:
            path: Path to Redis Unix socket
            kwargs: Additional connection parameters
        """

Protocol Parsers and Encoders

Low-level protocol handling for Redis communication.

class BaseParser:
    """Base class for Redis protocol parsers."""
    
    def on_connect(self, connection: Connection) -> None:
        """
        Handle connection establishment.
        
        Args:
            connection: Active connection instance
        """
    
    def on_disconnect(self) -> None:
        """Handle connection termination."""
    
    async def can_read(self, timeout: float) -> bool:
        """
        Check if data is available for reading.
        
        Args:
            timeout: Timeout in seconds
            
        Returns:
            True if data is available
        """
    
    async def read_response(self) -> Any:
        """
        Read and parse Redis protocol response.
        
        Returns:
            Parsed response value
        """

class PythonParser(BaseParser):
    """Pure Python implementation of Redis protocol parser."""

class HiredisParser(BaseParser):
    """Hiredis-based parser for improved performance."""

class Encoder:
    """Handles encoding and decoding of Redis data."""
    
    def encode(self, value: Any) -> bytes:
        """
        Encode value for Redis transmission.
        
        Args:
            value: Value to encode
            
        Returns:
            Encoded bytes
        """
    
    def decode(self, value: bytes, force: bool = False) -> Any:
        """
        Decode Redis response value.
        
        Args:
            value: Response bytes to decode
            force: Force decoding even if decode_responses=False
            
        Returns:
            Decoded value
        """

Sentinel Support

High availability Redis setup with automatic failover using Redis Sentinel.

class Sentinel:
    """Redis Sentinel client for high availability."""
    
    def __init__(
        self, 
        sentinels: List[Tuple[str, int]], 
        socket_timeout: float = 0.1,
        **kwargs
    ):
        """
        Initialize Sentinel client.
        
        Args:
            sentinels: List of (host, port) tuples for sentinel servers
            socket_timeout: Socket timeout for sentinel connections
            kwargs: Additional connection parameters
        """
    
    def master_for(
        self, 
        service_name: str, 
        redis_class: Type[Redis] = Redis, 
        **kwargs
    ) -> Redis:
        """
        Get Redis client for master server.
        
        Args:
            service_name: Name of Redis service in Sentinel config
            redis_class: Redis client class to use
            kwargs: Additional client parameters
            
        Returns:
            Redis client connected to current master
        """
    
    def slave_for(
        self, 
        service_name: str, 
        redis_class: Type[Redis] = Redis, 
        **kwargs
    ) -> Redis:
        """
        Get Redis client for slave server.
        
        Args:
            service_name: Name of Redis service in Sentinel config
            redis_class: Redis client class to use
            kwargs: Additional client parameters
            
        Returns:
            Redis client connected to a slave
        """
    
    async def discover_master(self, service_name: str) -> Tuple[str, int]:
        """
        Discover master server address.
        
        Args:
            service_name: Service name to discover
            
        Returns:
            Tuple of (host, port) for master server
        """
    
    async def discover_slaves(self, service_name: str) -> List[Tuple[str, int]]:
        """
        Discover slave server addresses.
        
        Args:
            service_name: Service name to discover
            
        Returns:
            List of (host, port) tuples for slave servers
        """

class SentinelConnectionPool(ConnectionPool):
    """Connection pool managed by Redis Sentinel."""
    
    def __init__(
        self, 
        service_name: str, 
        sentinel_manager: Sentinel, 
        **kwargs
    ):
        """
        Initialize Sentinel-managed connection pool.
        
        Args:
            service_name: Redis service name
            sentinel_manager: Sentinel client instance
            kwargs: Additional pool parameters
        """

class SentinelManagedConnection(Connection):
    """Connection managed by Redis Sentinel with automatic failover."""

Usage Examples

Basic Connection Setup

async def basic_connection_examples():
    # Simple connection
    redis = aioredis.Redis(
        host='localhost',
        port=6379,
        db=0,
        decode_responses=True
    )
    
    # Test connection
    pong = await redis.ping()
    print(f"Connected: {pong}")  # True
    
    # Connection with authentication
    redis_auth = aioredis.Redis(
        host='redis.example.com',
        port=6380,
        password='secretpassword',
        username='myuser',  # Redis 6+
        db=1
    )
    
    # Connection with timeouts
    redis_timeouts = aioredis.Redis(
        host='localhost',
        port=6379,
        socket_connect_timeout=5,  # 5 seconds to connect
        socket_timeout=3,          # 3 seconds per operation
        retry_on_timeout=True
    )
    
    # SSL connection
    redis_ssl = aioredis.Redis(
        host='secure-redis.example.com',
        port=6380,
        ssl=True,
        ssl_certfile='/path/to/client.crt',
        ssl_keyfile='/path/to/client.key',
        ssl_ca_certs='/path/to/ca.crt'
    )
    
    # Unix socket connection
    redis_unix = aioredis.Redis(
        unix_socket_path='/var/run/redis/redis.sock',
        db=0
    )
    
    await redis.aclose()

URL-Based Configuration

async def url_connection_examples():
    # Basic URL
    redis = aioredis.from_url("redis://localhost:6379/0")
    
    # URL with authentication
    redis_auth = aioredis.from_url(
        "redis://myuser:mypassword@redis.example.com:6380/1"
    )
    
    # SSL URL
    redis_ssl = aioredis.from_url(
        "rediss://user:pass@secure-redis.com:6380/0",
        ssl_cert_reqs="required",
        ssl_ca_certs="/path/to/ca.crt"
    )
    
    # Unix socket URL
    redis_unix = aioredis.from_url(
        "unix:///var/run/redis/redis.sock?db=0"
    )
    
    # URL with additional parameters
    redis_custom = aioredis.from_url(
        "redis://localhost:6379/0",
        socket_timeout=5,
        socket_connect_timeout=10,
        decode_responses=True,
        health_check_interval=30
    )
    
    await redis.aclose()

Connection Pooling

async def connection_pool_examples():
    # Create custom connection pool
    pool = aioredis.ConnectionPool(
        host='localhost',
        port=6379,
        max_connections=100,
        retry_on_timeout=True,
        socket_keepalive=True
    )
    
    # Use pool with Redis client
    redis = aioredis.Redis(connection_pool=pool)
    
    # Blocking pool with timeout
    blocking_pool = aioredis.BlockingConnectionPool(
        host='localhost',
        port=6379,
        max_connections=20,
        timeout=10  # Wait up to 10 seconds for available connection
    )
    
    redis_blocking = aioredis.Redis(connection_pool=blocking_pool)
    
    # Pool from URL
    pool_from_url = aioredis.ConnectionPool.from_url(
        "redis://localhost:6379/0",
        max_connections=50
    )
    
    redis_url_pool = aioredis.Redis(connection_pool=pool_from_url)
    
    # Multiple clients sharing pool
    redis1 = aioredis.Redis(connection_pool=pool)
    redis2 = aioredis.Redis(connection_pool=pool)
    redis3 = aioredis.Redis(connection_pool=pool)
    
    # Perform operations (connections automatically managed)
    await redis1.set('key1', 'value1')
    await redis2.set('key2', 'value2') 
    await redis3.set('key3', 'value3')
    
    # Cleanup
    await pool.disconnect()
    await blocking_pool.disconnect()
    await pool_from_url.disconnect()

High Availability with Sentinel

async def sentinel_examples():
    # Configure Sentinel
    sentinel = aioredis.Sentinel([
        ('sentinel1.example.com', 26379),
        ('sentinel2.example.com', 26379),
        ('sentinel3.example.com', 26379)
    ])
    
    # Get master client
    master = sentinel.master_for('mymaster', socket_timeout=0.1)
    
    # Get slave client for read operations
    slave = sentinel.slave_for('mymaster', socket_timeout=0.1)
    
    # Write to master
    await master.set('key', 'value')
    
    # Read from slave (may have replication lag)
    value = await slave.get('key')
    
    # Sentinel can automatically handle failover
    # If master goes down, Sentinel will promote a slave
    
    # Manual master discovery
    master_address = await sentinel.discover_master('mymaster')
    print(f"Current master: {master_address}")
    
    slave_addresses = await sentinel.discover_slaves('mymaster')
    print(f"Available slaves: {slave_addresses}")
    
    await master.aclose()
    await slave.aclose()

Connection Health and Monitoring

async def connection_health_examples():
    # Connection with health checking
    redis = aioredis.Redis(
        host='localhost',
        port=6379,
        health_check_interval=30,  # Check every 30 seconds
        socket_keepalive=True,
        socket_keepalive_options={
            1: 1,      # TCP_KEEPIDLE
            2: 3,      # TCP_KEEPINTVL  
            3: 5       # TCP_KEEPCNT
        }
    )
    
    # Manual health check
    try:
        await redis.ping()
        print("Connection healthy")
    except aioredis.ConnectionError:
        print("Connection failed")
    
    # Echo test
    echo_result = await redis.echo("Hello Redis")
    print(f"Echo: {echo_result}")
    
    # Connection info (if supported)
    try:
        client_info = await redis.client_list()
        print(f"Connected clients: {len(client_info)}")
    except Exception:
        pass
    
    await redis.aclose()

Error Handling and Reconnection

async def error_handling_examples():
    redis = aioredis.Redis(
        host='localhost',
        port=6379,
        retry_on_timeout=True,
        socket_timeout=5
    )
    
    async def robust_operation(key, value):
        max_retries = 3
        retry_count = 0
        
        while retry_count < max_retries:
            try:
                await redis.set(key, value)
                print(f"Successfully set {key} = {value}")
                return
            
            except aioredis.TimeoutError:
                retry_count += 1
                print(f"Timeout, retry {retry_count}/{max_retries}")
                if retry_count < max_retries:
                    await asyncio.sleep(1)
                else:
                    raise
            
            except aioredis.ConnectionError as e:
                print(f"Connection error: {e}")
                # Connection will be automatically recreated on next operation
                retry_count += 1
                if retry_count < max_retries:
                    await asyncio.sleep(2)
                else:
                    raise
    
    # Test robust operation
    try:
        await robust_operation('test_key', 'test_value')
    except Exception as e:
        print(f"Operation failed after retries: {e}")
    
    await redis.aclose()

Performance Optimization

async def performance_optimization_examples():
    # Optimized connection settings
    redis = aioredis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
        socket_read_size=65536,  # 64KB read buffer
        socket_keepalive=True,
        health_check_interval=0,  # Disable health checks for max performance
    )
    
    # Use connection pooling for high concurrency
    pool = aioredis.ConnectionPool(
        host='localhost',
        port=6379,
        max_connections=100,  # Tune based on load
        socket_read_size=131072,  # 128KB for high throughput
    )
    
    redis_pooled = aioredis.Redis(connection_pool=pool)
    
    # Single connection client for sequential operations
    redis_single = aioredis.Redis(
        host='localhost',
        port=6379,
        single_connection_client=True  # Reuse single connection
    )
    
    # Benchmark different configurations
    import time
    
    async def benchmark_operations(client, name, count=1000):
        start_time = time.time()
        
        for i in range(count):
            await client.set(f'benchmark:{name}:{i}', f'value_{i}')
        
        elapsed = time.time() - start_time
        print(f"{name}: {count} operations in {elapsed:.2f}s ({count/elapsed:.0f} ops/sec)")
    
    await benchmark_operations(redis, "default", 1000)
    await benchmark_operations(redis_pooled, "pooled", 1000)
    await benchmark_operations(redis_single, "single_conn", 1000)
    
    await redis.aclose()
    await pool.disconnect()
    await redis_single.aclose()

Install with Tessl CLI

npx tessl i tessl/pypi-aioredis

docs

advanced-features.md

basic-operations.md

connection-management.md

data-structures.md

index.md

server-admin.md

tile.json