asyncio (PEP 3156) Redis support

Overall score: 98%

Connection and pooling functionality for managing Redis connections efficiently. This includes URL-based configuration, SSL connections, Unix domain sockets, connection pooling strategies, and connection lifecycle management for optimal performance and reliability.

Multiple ways to create Redis client instances with flexible configuration options.
class Redis:
    """Main Redis client class with async/await support."""

    def __init__(
        self,
        *,
        host: str = "localhost",
        port: int = 6379,
        db: Union[str, int] = 0,
        password: Optional[str] = None,
        socket_timeout: Optional[float] = None,
        socket_connect_timeout: Optional[float] = None,
        socket_keepalive: Optional[bool] = None,
        socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None,
        connection_pool: Optional[ConnectionPool] = None,
        unix_socket_path: Optional[str] = None,
        encoding: str = "utf-8",
        encoding_errors: str = "strict",
        decode_responses: bool = False,
        retry_on_timeout: bool = False,
        ssl: bool = False,
        ssl_keyfile: Optional[str] = None,
        ssl_certfile: Optional[str] = None,
        ssl_cert_reqs: str = "required",
        ssl_ca_certs: Optional[str] = None,
        ssl_check_hostname: bool = False,
        max_connections: Optional[int] = None,
        single_connection_client: bool = False,
        health_check_interval: int = 0,
        client_name: Optional[str] = None,
        username: Optional[str] = None,
    ):
        """
        Initialize Redis client.

        Args:
            host: Redis server hostname
            port: Redis server port
            db: Database number (0-15)
            password: Authentication password
            socket_timeout: Socket operation timeout in seconds
            socket_connect_timeout: Socket connection timeout in seconds
            socket_keepalive: Enable TCP keepalive (None uses system default)
            socket_keepalive_options: TCP keepalive options
            connection_pool: Custom connection pool
            unix_socket_path: Unix domain socket path
            encoding: String encoding for responses
            encoding_errors: Encoding error handling
            decode_responses: Decode byte responses to strings
            retry_on_timeout: Retry commands on timeout
            ssl: Enable SSL/TLS encryption
            ssl_keyfile: SSL private key file path
            ssl_certfile: SSL certificate file path
            ssl_cert_reqs: SSL certificate requirements
            ssl_ca_certs: SSL CA certificates file path
            ssl_check_hostname: Verify SSL hostname
            max_connections: Maximum pool connections
            single_connection_client: Use single persistent connection
            health_check_interval: Health check interval in seconds
            client_name: Client identification name
            username: Authentication username (Redis 6+)
        """

    @classmethod
    def from_url(cls, url: str, **kwargs) -> "Redis":
        """
        Create Redis client from connection URL.

        Args:
            url: Redis connection URL (redis://[[username]:password@]host[:port][/database])
            kwargs: Additional connection parameters

        Returns:
            Configured Redis client instance
        """

    async def ping(self) -> bool:
        """
        Test connection to Redis server.

        Returns:
            True if server responds to ping
        """

    async def echo(self, value: str) -> str:
        """
        Echo message back from server.

        Args:
            value: Message to echo

        Returns:
            Echoed message
        """

    async def aclose(self) -> None:
        """Close all connections and cleanup resources."""

    def close(self) -> None:
        """Synchronous connection cleanup (use aclose() in async context)."""
def from_url(url: str, **kwargs) -> Redis:
    """
    Create Redis client from connection URL.

    Args:
        url: Redis connection URL
        kwargs: Additional connection parameters

    Returns:
        Configured Redis client instance
    """


# Connection pooling for efficient connection reuse and resource management.
class ConnectionPool:
    """Connection pool for managing Redis connections."""

    def __init__(
        self,
        connection_class: Type[Connection] = Connection,
        max_connections: int = 50,
        **connection_kwargs
    ):
        """
        Initialize connection pool.

        Args:
            connection_class: Connection class to use
            max_connections: Maximum number of connections
            connection_kwargs: Arguments for connection creation
        """

    @classmethod
    def from_url(cls, url: str, **kwargs) -> "ConnectionPool":
        """
        Create connection pool from URL.

        Args:
            url: Redis connection URL
            kwargs: Additional pool parameters

        Returns:
            Configured connection pool
        """

    async def get_connection(self, command_name: str, *keys, **options) -> Connection:
        """
        Get connection from pool.

        Args:
            command_name: Redis command being executed
            keys: Command keys (for sharding)
            options: Command options

        Returns:
            Available connection instance
        """

    def make_connection(self) -> Connection:
        """
        Create new connection instance.

        Returns:
            New connection (not from pool)
        """

    async def release(self, connection: Connection) -> None:
        """
        Return connection to pool.

        Args:
            connection: Connection to return
        """

    async def disconnect(self, inuse_connections: bool = True) -> None:
        """
        Disconnect all pool connections.

        Args:
            inuse_connections: Whether to disconnect active connections
        """

    def reset(self) -> None:
        """Reset pool state, clearing all connections."""

    def get_encoder(self) -> Encoder:
        """
        Get connection encoder instance.

        Returns:
            Encoder for data serialization
        """

    def owns_connection(self, connection: Connection) -> bool:
        """
        Check if pool owns connection.

        Args:
            connection: Connection to check

        Returns:
            True if connection belongs to this pool
        """
class BlockingConnectionPool(ConnectionPool):
    """Connection pool that blocks when max connections reached."""

    def __init__(
        self,
        max_connections: int = 50,
        timeout: float = 20,
        **connection_kwargs
    ):
        """
        Initialize blocking connection pool.

        Args:
            max_connections: Maximum number of connections
            timeout: Timeout when waiting for available connection
            connection_kwargs: Arguments for connection creation
        """


# Different connection implementations for various network configurations.
class Connection:
    """Basic TCP connection to Redis server."""

    def __init__(
        self,
        *,
        host: str = "localhost",
        port: Union[str, int] = 6379,
        db: Union[str, int] = 0,
        password: Optional[str] = None,
        socket_timeout: Optional[float] = None,
        socket_connect_timeout: Optional[float] = None,
        socket_keepalive: bool = False,
        socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None,
        socket_type: int = 0,
        retry_on_timeout: bool = False,
        encoding: str = "utf-8",
        encoding_errors: str = "strict",
        decode_responses: bool = False,
        parser_class: Type[BaseParser] = DefaultParser,
        socket_read_size: int = 65536,
        health_check_interval: float = 0,
        client_name: Optional[str] = None,
        username: Optional[str] = None,
        encoder_class: Type[Encoder] = Encoder,
    ):
        """
        Initialize TCP connection.

        Args:
            host: Redis server hostname
            port: Redis server port
            db: Database number
            password: Authentication password
            socket_timeout: Socket operation timeout
            socket_connect_timeout: Connection establishment timeout
            socket_keepalive: Enable TCP keepalive
            socket_keepalive_options: Keepalive configuration
            socket_type: Socket type
            retry_on_timeout: Retry operations on timeout
            encoding: Response encoding
            encoding_errors: Encoding error handling
            decode_responses: Decode responses to strings
            parser_class: Protocol parser
            socket_read_size: Read buffer size
            health_check_interval: Health check frequency
            client_name: Client name for identification
            username: Authentication username
            encoder_class: Data encoder class
        """

    async def connect(self) -> None:
        """Establish connection to Redis server."""

    async def disconnect(self) -> None:
        """Close connection to Redis server."""

    async def send_command(self, *args, **kwargs) -> Any:
        """
        Send command to Redis server.

        Args:
            args: Command arguments
            kwargs: Command options

        Returns:
            Command response
        """

    async def send_packed_command(self, command: bytes, check_health: bool = True) -> None:
        """
        Send pre-packed command to server.

        Args:
            command: Packed command bytes
            check_health: Check connection health before sending
        """

    async def can_read(self, timeout: float = 0) -> bool:
        """
        Check if data is available to read.

        Args:
            timeout: Timeout for check in seconds

        Returns:
            True if data is available
        """

    async def read_response(self) -> Any:
        """
        Read and parse response from server.

        Returns:
            Parsed response data
        """

    def pack_command(self, *args) -> List[bytes]:
        """
        Pack command arguments for transmission.

        Args:
            args: Command arguments

        Returns:
            List of packed command parts
        """

    def pack_commands(self, commands: List[Tuple]) -> List[bytes]:
        """
        Pack multiple commands for pipeline transmission.

        Args:
            commands: List of command tuples

        Returns:
            List of packed command bytes
        """

    async def check_health(self) -> None:
        """Check and maintain connection health."""

    def register_connect_callback(self, callback: Callable) -> None:
        """
        Register callback for connection events.

        Args:
            callback: Function to call on connection
        """

    def clear_connect_callbacks(self) -> None:
        """Clear all connection callbacks."""

    @property
    def is_connected(self) -> bool:
        """
        Whether connection is currently active.

        Returns:
            True if connected
        """
class SSLConnection(Connection):
    """SSL-enabled TCP connection to Redis server."""

    def __init__(
        self,
        ssl_keyfile: Optional[str] = None,
        ssl_certfile: Optional[str] = None,
        ssl_cert_reqs: str = "required",
        ssl_ca_certs: Optional[str] = None,
        ssl_check_hostname: bool = False,
        **kwargs
    ):
        """
        Initialize SSL connection.

        Args:
            ssl_keyfile: Path to SSL private key file
            ssl_certfile: Path to SSL certificate file
            ssl_cert_reqs: Certificate requirement level
            ssl_ca_certs: Path to CA certificates file
            ssl_check_hostname: Verify hostname against certificate
            kwargs: Additional connection parameters
        """
class UnixDomainSocketConnection(Connection):
    """Unix domain socket connection to Redis server."""

    def __init__(self, path: str = "/var/run/redis/redis.sock", **kwargs):
        """
        Initialize Unix socket connection.

        Args:
            path: Path to Redis Unix socket
            kwargs: Additional connection parameters
        """


# Low-level protocol handling for Redis communication.
class BaseParser:
    """Base class for Redis protocol parsers."""

    def on_connect(self, connection: Connection) -> None:
        """
        Handle connection establishment.

        Args:
            connection: Active connection instance
        """

    def on_disconnect(self) -> None:
        """Handle connection termination."""

    async def can_read(self, timeout: float) -> bool:
        """
        Check if data is available for reading.

        Args:
            timeout: Timeout in seconds

        Returns:
            True if data is available
        """

    async def read_response(self) -> Any:
        """
        Read and parse Redis protocol response.

        Returns:
            Parsed response value
        """


class PythonParser(BaseParser):
    """Pure Python implementation of Redis protocol parser."""


class HiredisParser(BaseParser):
    """Hiredis-based parser for improved performance."""
class Encoder:
    """Handles encoding and decoding of Redis data."""

    def encode(self, value: Any) -> bytes:
        """
        Encode value for Redis transmission.

        Args:
            value: Value to encode

        Returns:
            Encoded bytes
        """

    def decode(self, value: bytes, force: bool = False) -> Any:
        """
        Decode Redis response value.

        Args:
            value: Response bytes to decode
            force: Force decoding even if decode_responses=False

        Returns:
            Decoded value
        """


# High availability Redis setup with automatic failover using Redis Sentinel.
class Sentinel:
    """Redis Sentinel client for high availability."""

    def __init__(
        self,
        sentinels: List[Tuple[str, int]],
        socket_timeout: float = 0.1,
        **kwargs
    ):
        """
        Initialize Sentinel client.

        Args:
            sentinels: List of (host, port) tuples for sentinel servers
            socket_timeout: Socket timeout for sentinel connections
            kwargs: Additional connection parameters
        """

    def master_for(
        self,
        service_name: str,
        redis_class: Type[Redis] = Redis,
        **kwargs
    ) -> Redis:
        """
        Get Redis client for master server.

        Args:
            service_name: Name of Redis service in Sentinel config
            redis_class: Redis client class to use
            kwargs: Additional client parameters

        Returns:
            Redis client connected to current master
        """

    def slave_for(
        self,
        service_name: str,
        redis_class: Type[Redis] = Redis,
        **kwargs
    ) -> Redis:
        """
        Get Redis client for slave server.

        Args:
            service_name: Name of Redis service in Sentinel config
            redis_class: Redis client class to use
            kwargs: Additional client parameters

        Returns:
            Redis client connected to a slave
        """

    async def discover_master(self, service_name: str) -> Tuple[str, int]:
        """
        Discover master server address.

        Args:
            service_name: Service name to discover

        Returns:
            Tuple of (host, port) for master server
        """

    async def discover_slaves(self, service_name: str) -> List[Tuple[str, int]]:
        """
        Discover slave server addresses.

        Args:
            service_name: Service name to discover

        Returns:
            List of (host, port) tuples for slave servers
        """
class SentinelConnectionPool(ConnectionPool):
    """Connection pool managed by Redis Sentinel."""

    def __init__(
        self,
        service_name: str,
        sentinel_manager: Sentinel,
        **kwargs
    ):
        """
        Initialize Sentinel-managed connection pool.

        Args:
            service_name: Redis service name
            sentinel_manager: Sentinel client instance
            kwargs: Additional pool parameters
        """
class SentinelManagedConnection(Connection):
    """Connection managed by Redis Sentinel with automatic failover."""


async def basic_connection_examples():
    """Demonstrate direct client construction: plain, authenticated, timeout, SSL, and Unix-socket connections."""
    # Simple connection
    redis = aioredis.Redis(
        host='localhost',
        port=6379,
        db=0,
        decode_responses=True
    )
    # Test connection
    pong = await redis.ping()
    print(f"Connected: {pong}")  # True
    # Connection with authentication
    redis_auth = aioredis.Redis(
        host='redis.example.com',
        port=6380,
        password='secretpassword',
        username='myuser',  # Redis 6+
        db=1
    )
    # Connection with timeouts
    redis_timeouts = aioredis.Redis(
        host='localhost',
        port=6379,
        socket_connect_timeout=5,  # 5 seconds to connect
        socket_timeout=3,  # 3 seconds per operation
        retry_on_timeout=True
    )
    # SSL connection
    redis_ssl = aioredis.Redis(
        host='secure-redis.example.com',
        port=6380,
        ssl=True,
        ssl_certfile='/path/to/client.crt',
        ssl_keyfile='/path/to/client.key',
        ssl_ca_certs='/path/to/ca.crt'
    )
    # Unix socket connection
    redis_unix = aioredis.Redis(
        unix_socket_path='/var/run/redis/redis.sock',
        db=0
    )
    await redis.aclose()


async def url_connection_examples():
    """Demonstrate URL-based client construction (redis://, rediss://, unix:// schemes)."""
    # Basic URL
    redis = aioredis.from_url("redis://localhost:6379/0")
    # URL with authentication
    redis_auth = aioredis.from_url(
        "redis://myuser:mypassword@redis.example.com:6380/1"
    )
    # SSL URL
    redis_ssl = aioredis.from_url(
        "rediss://user:pass@secure-redis.com:6380/0",
        ssl_cert_reqs="required",
        ssl_ca_certs="/path/to/ca.crt"
    )
    # Unix socket URL
    redis_unix = aioredis.from_url(
        "unix:///var/run/redis/redis.sock?db=0"
    )
    # URL with additional parameters
    redis_custom = aioredis.from_url(
        "redis://localhost:6379/0",
        socket_timeout=5,
        socket_connect_timeout=10,
        decode_responses=True,
        health_check_interval=30
    )
    await redis.aclose()


async def connection_pool_examples():
    """Demonstrate connection pooling: custom, blocking, URL-built, and shared pools."""
    # Create custom connection pool
    pool = aioredis.ConnectionPool(
        host='localhost',
        port=6379,
        max_connections=100,
        retry_on_timeout=True,
        socket_keepalive=True
    )
    # Use pool with Redis client
    redis = aioredis.Redis(connection_pool=pool)
    # Blocking pool with timeout
    blocking_pool = aioredis.BlockingConnectionPool(
        host='localhost',
        port=6379,
        max_connections=20,
        timeout=10  # Wait up to 10 seconds for available connection
    )
    redis_blocking = aioredis.Redis(connection_pool=blocking_pool)
    # Pool from URL
    pool_from_url = aioredis.ConnectionPool.from_url(
        "redis://localhost:6379/0",
        max_connections=50
    )
    redis_url_pool = aioredis.Redis(connection_pool=pool_from_url)
    # Multiple clients sharing pool
    redis1 = aioredis.Redis(connection_pool=pool)
    redis2 = aioredis.Redis(connection_pool=pool)
    redis3 = aioredis.Redis(connection_pool=pool)
    # Perform operations (connections automatically managed)
    await redis1.set('key1', 'value1')
    await redis2.set('key2', 'value2')
    await redis3.set('key3', 'value3')
    # Cleanup
    await pool.disconnect()
    await blocking_pool.disconnect()
    await pool_from_url.disconnect()


async def sentinel_examples():
    """Demonstrate Sentinel-based high availability: master/slave clients and manual discovery."""
    # Configure Sentinel
    sentinel = aioredis.Sentinel([
        ('sentinel1.example.com', 26379),
        ('sentinel2.example.com', 26379),
        ('sentinel3.example.com', 26379)
    ])
    # Get master client
    master = sentinel.master_for('mymaster', socket_timeout=0.1)
    # Get slave client for read operations
    slave = sentinel.slave_for('mymaster', socket_timeout=0.1)
    # Write to master
    await master.set('key', 'value')
    # Read from slave (may have replication lag)
    value = await slave.get('key')
    # Sentinel can automatically handle failover
    # If master goes down, Sentinel will promote a slave
    # Manual master discovery
    master_address = await sentinel.discover_master('mymaster')
    print(f"Current master: {master_address}")
    slave_addresses = await sentinel.discover_slaves('mymaster')
    print(f"Available slaves: {slave_addresses}")
    await master.aclose()
    await slave.aclose()


async def connection_health_examples():
    """Demonstrate health checking: keepalive options, ping, echo, and client introspection."""
    # Connection with health checking
    redis = aioredis.Redis(
        host='localhost',
        port=6379,
        health_check_interval=30,  # Check every 30 seconds
        socket_keepalive=True,
        socket_keepalive_options={
            1: 1,  # TCP_KEEPIDLE
            2: 3,  # TCP_KEEPINTVL
            3: 5   # TCP_KEEPCNT
        }
    )
    # Manual health check
    try:
        await redis.ping()
        print("Connection healthy")
    except aioredis.ConnectionError:
        print("Connection failed")
    # Echo test
    echo_result = await redis.echo("Hello Redis")
    print(f"Echo: {echo_result}")
    # Connection info (if supported)
    try:
        client_info = await redis.client_list()
        print(f"Connected clients: {len(client_info)}")
    except Exception:
        pass
    await redis.aclose()


async def error_handling_examples():
    """Demonstrate retry logic for timeout and connection errors."""
    redis = aioredis.Redis(
        host='localhost',
        port=6379,
        retry_on_timeout=True,
        socket_timeout=5
    )

    async def robust_operation(key, value):
        # Retry the SET up to max_retries times, backing off between attempts.
        max_retries = 3
        retry_count = 0
        while retry_count < max_retries:
            try:
                await redis.set(key, value)
                print(f"Successfully set {key} = {value}")
                return
            except aioredis.TimeoutError:
                retry_count += 1
                print(f"Timeout, retry {retry_count}/{max_retries}")
                if retry_count < max_retries:
                    await asyncio.sleep(1)
                else:
                    raise
            except aioredis.ConnectionError as e:
                print(f"Connection error: {e}")
                # Connection will be automatically recreated on next operation
                retry_count += 1
                if retry_count < max_retries:
                    await asyncio.sleep(2)
                else:
                    raise

    # Test robust operation
    try:
        await robust_operation('test_key', 'test_value')
    except Exception as e:
        print(f"Operation failed after retries: {e}")
    await redis.aclose()


async def performance_optimization_examples():
    """Demonstrate performance tuning: buffer sizes, pooling, single-connection mode, and a simple benchmark."""
    # Optimized connection settings
    redis = aioredis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
        socket_read_size=65536,  # 64KB read buffer
        socket_keepalive=True,
        health_check_interval=0,  # Disable health checks for max performance
    )
    # Use connection pooling for high concurrency
    pool = aioredis.ConnectionPool(
        host='localhost',
        port=6379,
        max_connections=100,  # Tune based on load
        socket_read_size=131072,  # 128KB for high throughput
    )
    redis_pooled = aioredis.Redis(connection_pool=pool)
    # Single connection client for sequential operations
    redis_single = aioredis.Redis(
        host='localhost',
        port=6379,
        single_connection_client=True  # Reuse single connection
    )
    # Benchmark different configurations
    import time

    async def benchmark_operations(client, name, count=1000):
        # Time `count` sequential SETs and report throughput.
        start_time = time.time()
        for i in range(count):
            await client.set(f'benchmark:{name}:{i}', f'value_{i}')
        elapsed = time.time() - start_time
        print(f"{name}: {count} operations in {elapsed:.2f}s ({count/elapsed:.0f} ops/sec)")

    await benchmark_operations(redis, "default", 1000)
    await benchmark_operations(redis_pooled, "pooled", 1000)
    await benchmark_operations(redis_single, "single_conn", 1000)
    await redis.aclose()
    await pool.disconnect()
    await redis_single.aclose()

# Install with Tessl CLI
npx tessl i tessl/pypi-aioredis

docs

evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10