Python client for Redis database and key-value store
npx @tessl/cli install tessl/pypi-redis@6.4.0A comprehensive Python client for Redis, the popular in-memory data structure store used as database, cache, and message broker. Provides both synchronous and asynchronous APIs with full support for Redis features including clustering, high availability, pub/sub messaging, transactions, pipelining, and Redis modules.
pip install redispip install "redis[hiredis]" for improved performanceimport redisCommon usage patterns:
# Main Redis client
from redis import Redis
# Cluster client
from redis import RedisCluster
# Async clients
from redis.asyncio import Redis as AsyncRedis
from redis.asyncio import RedisCluster as AsyncRedisCluster
# High availability
from redis.sentinel import Sentinel
# Connection management
from redis import ConnectionPool, Connection
# Utilities
from redis import from_urlimport redis
# Connect to Redis server
r = redis.Redis(host='localhost', port=6379, db=0)
# Basic key-value operations
r.set('key', 'value')
value = r.get('key') # Returns b'value'
# Decoded strings (recommended)
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
r.set('key', 'value')
value = r.get('key') # Returns 'value'
# Connection from URL
r = redis.from_url('redis://localhost:6379/0')
# Pipeline for batching commands
with r.pipeline() as pipe:
pipe.set('key1', 'value1')
pipe.set('key2', 'value2')
pipe.get('key1')
results = pipe.execute() # [True, True, 'value1']
# Pub/Sub messaging
pubsub = r.pubsub()
pubsub.subscribe('channel')
for message in pubsub.listen():
print(message)The Redis client follows a modular architecture:
Main Redis client with full command support, connection pooling, and configuration options. Supports all Redis data types and operations including strings, lists, sets, hashes, streams, and geospatial commands.
class Redis:
def __init__(
self,
host: str = "localhost",
port: int = 6379,
db: int = 0,
password: Optional[str] = None,
socket_timeout: Optional[float] = None,
decode_responses: bool = False,
**kwargs
): ...
@classmethod
def from_url(cls, url: str, **kwargs) -> "Redis": ...
def pipeline(self, transaction: bool = True) -> "Pipeline": ...
def pubsub(self, **kwargs) -> "PubSub": ...
def lock(self, name: str, timeout: Optional[float] = None) -> "Lock": ...Redis Cluster client for distributed Redis deployments with automatic sharding and node discovery.
class RedisCluster:
def __init__(
self,
host: Optional[str] = None,
port: int = 7000,
startup_nodes: Optional[List[ClusterNode]] = None,
**kwargs
): ...
@classmethod
def from_url(cls, url: str, **kwargs) -> "RedisCluster": ...Connection pools, connection types, and connection configuration for managing Redis connections efficiently.
class ConnectionPool:
def __init__(
self,
connection_class: Type[Connection] = Connection,
max_connections: Optional[int] = None,
**connection_kwargs
): ...
class Connection:
def __init__(
self,
host: str = "localhost",
port: int = 6379,
db: int = 0,
**kwargs
): ...Full asynchronous API using asyncio, providing async versions of all Redis operations with identical interfaces to synchronous clients.
class Redis: # Async version
async def get(self, name: str) -> Optional[bytes]: ...
async def set(self, name: str, value: Any, **kwargs) -> bool: ...
def pipeline(self, transaction: bool = True) -> "Pipeline": ...Redis Sentinel integration for automatic master/slave discovery and failover in high availability deployments.
class Sentinel:
def __init__(
self,
sentinels: List[Tuple[str, int]],
**kwargs
): ...
def master_for(self, service_name: str, **kwargs) -> Redis: ...
def slave_for(self, service_name: str, **kwargs) -> Redis: ...Command batching with pipelines and atomic transactions with MULTI/EXEC support, including optimistic locking with WATCH.
class Pipeline:
def execute(self, raise_on_error: bool = True) -> List[Any]: ...
def watch(self, *names: str) -> bool: ...
def multi(self) -> None: ...
def discard(self) -> None: ...Publish/subscribe messaging system for real-time communication between Redis clients.
class PubSub:
def subscribe(self, *args, **kwargs) -> None: ...
def unsubscribe(self, *args) -> None: ...
def psubscribe(self, *args, **kwargs) -> None: ...
def listen(self) -> Iterator[Dict[str, Any]]: ...
def get_message(self, ignore_subscribe_messages: bool = False) -> Optional[Dict[str, Any]]: ...Distributed lock implementation for coordinating access to shared resources across multiple Redis clients.
class Lock:
def acquire(self, blocking: bool = True, timeout: Optional[float] = None) -> bool: ...
def release(self) -> None: ...
def extend(self, additional_time: float) -> bool: ...Comprehensive exception hierarchy for handling Redis-specific errors and connection issues.
class RedisError(Exception): ...
class ConnectionError(RedisError): ...
class TimeoutError(RedisError): ...
class ResponseError(RedisError): ...
class AuthenticationError(ConnectionError): ...
class RedisClusterException(RedisError): ...Extended functionality through Redis modules providing advanced data structures and capabilities.
Native JSON data type support with path-based operations for efficient document storage and manipulation. Access via redis_client.json().
# JSON client obtained via r.json()
class JSON:
def set(self, name: str, path: str, obj: Any, **kwargs) -> Optional[str]: ...
def get(self, name: str, *args: str, **kwargs) -> Optional[List[Any]]: ...
def delete(self, name: str, path: str = "$") -> Optional[int]: ...
def arrappend(self, name: str, path: str, *args: Any) -> List[Optional[int]]: ...
def numincrby(self, name: str, path: str, number: int) -> str: ...Full-text search, secondary indexing, and query capabilities with aggregations and auto-complete.
def ft_create(self, index_name: str, schema: Dict[str, Any], **kwargs) -> str: ...
def ft_search(self, index_name: str, query: str, **kwargs) -> Dict[str, Any]: ...
def ft_aggregate(self, index_name: str, query: str, *args: Any) -> List[Any]: ...
def ft_sugadd(self, key: str, string: str, score: float, **kwargs) -> int: ...
def ft_sugget(self, key: str, prefix: str, **kwargs) -> List[Any]: ...Time series data structure for efficient storage and analysis of time-stamped data with downsampling and aggregations.
def ts_create(self, key: str, **kwargs) -> str: ...
def ts_add(self, key: str, timestamp: Union[int, str], value: float, **kwargs) -> int: ...
def ts_get(self, key: str) -> Optional[Tuple[int, float]]: ...
def ts_range(self, key: str, from_timestamp: Union[int, str], to_timestamp: Union[int, str], **kwargs) -> List[Tuple[int, float]]: ...
def ts_mrange(self, from_timestamp: Union[int, str], to_timestamp: Union[int, str], filters: List[str], **kwargs) -> List[Dict[str, Any]]: ...Probabilistic data structures including Bloom filters, Cuckoo filters, Count-Min Sketch, and Top-K for memory-efficient approximate queries.
def bf_reserve(self, key: str, error_rate: float, capacity: int) -> str: ...
def bf_add(self, key: str, item: str) -> bool: ...
def bf_exists(self, key: str, item: str) -> bool: ...
def cf_reserve(self, key: str, capacity: int, **kwargs) -> str: ...
def cms_initbyprob(self, key: str, error: float, probability: float) -> str: ...
def topk_reserve(self, key: str, k: int, width: int, depth: int, decay: float) -> str: ...from typing import Any, Dict, List, Optional, Union, Tuple, Iterator, Type
# Key types
KeyT = Union[str, bytes]
FieldT = Union[str, bytes]
EncodableT = Union[bytes, float, int, str]
ExpiryT = Union[int, datetime.timedelta]
# Connection types
ConnectionT = Union[Connection, SSLConnection, UnixDomainSocketConnection]
# Cluster types
class ClusterNode:
def __init__(self, host: str, port: int, server_type: Optional[str] = None): ...
host: str
port: int
# Credential types
class CredentialProvider:
def get_credentials(self) -> Tuple[str, str]: ...