tessl/pypi-redis

Python client for Redis database and key-value store

Workspace: tessl
Visibility: Public
Describes: pkg:pypi/redis@6.4.x

To install, run

npx @tessl/cli install tessl/pypi-redis@6.4.0

Redis Python Client

A comprehensive Python client for Redis, the popular in-memory data structure store used as a database, cache, and message broker. It provides both synchronous and asynchronous APIs and supports clustering, high availability, pub/sub messaging, transactions, pipelining, and Redis modules.

Package Information

  • Package Name: redis
  • Language: Python
  • Installation: pip install redis
  • Optional Dependencies: pip install "redis[hiredis]" for improved performance

Core Imports

import redis

Common usage patterns:

# Main Redis client
from redis import Redis

# Cluster client
from redis import RedisCluster

# Async clients
from redis.asyncio import Redis as AsyncRedis
from redis.asyncio import RedisCluster as AsyncRedisCluster

# High availability
from redis.sentinel import Sentinel

# Connection management
from redis import ConnectionPool, Connection

# Utilities
from redis import from_url

Basic Usage

import redis

# Connect to Redis server
r = redis.Redis(host='localhost', port=6379, db=0)

# Basic key-value operations
r.set('key', 'value')
value = r.get('key')  # Returns b'value'

# Decoded strings (recommended)
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
r.set('key', 'value')
value = r.get('key')  # Returns 'value'

# Connection from URL (keyword arguments such as decode_responses are passed to the client)
r = redis.from_url('redis://localhost:6379/0', decode_responses=True)

# Pipeline for batching commands
with r.pipeline() as pipe:
    pipe.set('key1', 'value1')
    pipe.set('key2', 'value2')
    pipe.get('key1')
    results = pipe.execute()  # [True, True, 'value1']

# Pub/Sub messaging
pubsub = r.pubsub()
pubsub.subscribe('channel')
# listen() blocks and yields dicts, starting with the subscribe confirmation
for message in pubsub.listen():
    print(message)

Architecture

The Redis client follows a modular architecture:

  • Client Layer: Redis and RedisCluster classes providing the main API
  • Connection Layer: Connection pools and connection management
  • Command Layer: All Redis commands organized by functionality (strings, lists, sets, etc.)
  • Protocol Layer: RESP protocol serialization and parsing
  • Async Layer: Full async/await mirror of synchronous API
  • High Availability: Sentinel support for automatic failover
  • Extensions: Support for Redis modules (JSON, Search, TimeSeries, etc.)
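
To make the Protocol Layer bullet concrete, here is a minimal sketch of how a command is framed in RESP: an array header followed by one bulk string per argument. The helper name `encode_command` is hypothetical and this is not the client's own serializer, only an illustration of the wire format.

```python
def encode_command(*args):
    """Serialize a command as a RESP array of bulk strings (illustrative helper)."""
    # Array header: *<number of arguments>\r\n
    parts = [b"*%d\r\n" % len(args)]
    for arg in args:
        # Each argument becomes a bulk string: $<byte length>\r\n<bytes>\r\n
        data = arg if isinstance(arg, bytes) else str(arg).encode("utf-8")
        parts.append(b"$%d\r\n%s\r\n" % (len(data), data))
    return b"".join(parts)


# The bytes a client would send for: SET key value
frame = encode_command("SET", "key", "value")
```

Lengths are counted in bytes after encoding, which is why the argument is encoded before its length is taken.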

Capabilities

Core Client

Main Redis client with full command support, connection pooling, and configuration options. Supports all Redis data types and operations including strings, lists, sets, hashes, streams, and geospatial commands.

class Redis:
    def __init__(
        self,
        host: str = "localhost",
        port: int = 6379,
        db: int = 0,
        password: Optional[str] = None,
        socket_timeout: Optional[float] = None,
        decode_responses: bool = False,
        **kwargs
    ): ...
    
    @classmethod
    def from_url(cls, url: str, **kwargs) -> "Redis": ...
    
    def pipeline(self, transaction: bool = True) -> "Pipeline": ...
    def pubsub(self, **kwargs) -> "PubSub": ...
    def lock(self, name: str, timeout: Optional[float] = None) -> "Lock": ...
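
As an illustration of the core client in use, the sketch below shows a cache-aside read built on `get` and `set` with an expiry. The function name `get_or_compute` and its parameters are hypothetical. It accepts any client object exposing those two methods, for example `redis.Redis(decode_responses=True)`.

```python
def get_or_compute(client, key, compute, ttl_seconds=60):
    """Return the cached value for key, computing and storing it on a miss.

    Hypothetical helper: `client` is assumed to expose get() and set(..., ex=...).
    """
    cached = client.get(key)
    if cached is not None:
        return cached
    value = compute()
    # ex= sets the expiry in seconds, so stale entries age out on their own
    client.set(key, value, ex=ttl_seconds)
    return value
```

Without `decode_responses=True` a cache hit comes back as `bytes` while a fresh value keeps its original type, so callers may want to normalize the two.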

Cluster Support

Redis Cluster client for distributed Redis deployments with automatic sharding and node discovery.

class RedisCluster:
    def __init__(
        self,
        host: Optional[str] = None,
        port: int = 6379,
        startup_nodes: Optional[List[ClusterNode]] = None,
        **kwargs
    ): ...
    
    @classmethod
    def from_url(cls, url: str, **kwargs) -> "RedisCluster": ...
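
Automatic sharding rests on mapping each key to one of 16384 hash slots. The sketch below reproduces that mapping with the standard library, assuming the CRC16 (XMODEM) checksum and the `{hash tag}` rule from the Redis Cluster specification. `key_slot` is a hypothetical helper, not the client's internal routine.

```python
import binascii

HASH_SLOTS = 16384


def key_slot(key):
    """Compute the cluster hash slot for a key (illustrative helper)."""
    data = key if isinstance(key, bytes) else key.encode("utf-8")
    start = data.find(b"{")
    if start != -1:
        end = data.find(b"}", start + 1)
        # Only a non-empty {tag} restricts hashing to the tag contents
        if end != -1 and end != start + 1:
            data = data[start + 1:end]
    # crc_hqx with initial value 0 uses polynomial 0x1021 (CRC16-XMODEM)
    return binascii.crc_hqx(data, 0) % HASH_SLOTS
```

Keys that share a hash tag, such as `{user1000}.following` and `{user1000}.followers`, land in the same slot, which is what makes multi-key operations on them possible in a cluster.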

Connection Management

Connection pools, connection types, and connection configuration for managing Redis connections efficiently.

class ConnectionPool:
    def __init__(
        self,
        connection_class: Type[Connection] = Connection,
        max_connections: Optional[int] = None,
        **connection_kwargs
    ): ...

class Connection:
    def __init__(
        self,
        host: str = "localhost", 
        port: int = 6379,
        db: int = 0,
        **kwargs
    ): ...
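
Connection settings are often carried in a URL. The following is a rough, standard-library-only sketch of how a `redis://` or `rediss://` URL could be mapped onto connection keyword arguments. `parse_redis_url` is a hypothetical helper and deliberately simpler than the parsing behind `from_url`.

```python
from urllib.parse import unquote, urlparse


def parse_redis_url(url):
    """Map a redis:// or rediss:// URL onto keyword arguments (illustrative helper)."""
    parsed = urlparse(url)
    if parsed.scheme not in ("redis", "rediss"):
        raise ValueError(f"unsupported scheme: {parsed.scheme!r}")
    kwargs = {
        "host": parsed.hostname or "localhost",
        "port": parsed.port or 6379,
    }
    # The path component selects the database number, e.g. /0
    path = parsed.path.lstrip("/")
    kwargs["db"] = int(path) if path else 0
    if parsed.username:
        kwargs["username"] = unquote(parsed.username)
    if parsed.password:
        kwargs["password"] = unquote(parsed.password)
    if parsed.scheme == "rediss":
        # rediss:// indicates a TLS connection
        kwargs["ssl"] = True
    return kwargs
```

The resulting dictionary uses argument names that `redis.Redis` accepts, so it could be passed along as `redis.Redis(**parse_redis_url(url))`.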

Async Support

Full asynchronous API built on asyncio. The async clients mirror the synchronous interface, but commands return coroutines that must be awaited.

class Redis:  # Async version
    async def get(self, name: str) -> Optional[bytes]: ...
    async def set(self, name: str, value: Any, **kwargs) -> bool: ...
    def pipeline(self, transaction: bool = True) -> "Pipeline": ...
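
Because each command is awaitable, independent commands can be issued concurrently. A minimal sketch, assuming `client` is an async client such as `redis.asyncio.Redis` (the helper name `bump_counters` is hypothetical):

```python
import asyncio


async def bump_counters(client, keys):
    """Increment several counters concurrently; results follow the order of keys.

    Hypothetical helper: `client` is assumed to expose an awaitable incr().
    """
    return await asyncio.gather(*(client.incr(key) for key in keys))
```

From synchronous code this would be driven with `asyncio.run(bump_counters(client, ["hits", "visits"]))`.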

High Availability (Sentinel)

Redis Sentinel integration for automatic master/slave discovery and failover in high availability deployments.

class Sentinel:
    def __init__(
        self,
        sentinels: List[Tuple[str, int]],
        **kwargs
    ): ...
    
    def master_for(self, service_name: str, **kwargs) -> Redis: ...
    def slave_for(self, service_name: str, **kwargs) -> Redis: ...
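
A common pattern with Sentinel is to send writes to the current master and reads to a replica. The sketch below assumes a `Sentinel` instance and a monitored service name. `split_clients` is a hypothetical helper.

```python
def split_clients(sentinel, service_name, **client_kwargs):
    """Return (writer, reader) clients for a Sentinel-monitored service.

    Hypothetical helper: `sentinel` is assumed to expose master_for() and slave_for().
    """
    # Writes must go to the master, which Sentinel re-resolves after a failover
    writer = sentinel.master_for(service_name, **client_kwargs)
    # Reads can be served by a replica, at the cost of possible replication lag
    reader = sentinel.slave_for(service_name, **client_kwargs)
    return writer, reader
```

Typical use would be `writer, reader = split_clients(Sentinel([("localhost", 26379)]), "mymaster", decode_responses=True)`, where the address and service name are placeholders.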

Pipelines and Transactions

Command batching with pipelines and atomic transactions with MULTI/EXEC support, including optimistic locking with WATCH.

class Pipeline:
    def execute(self, raise_on_error: bool = True) -> List[Any]: ...
    def watch(self, *names: str) -> bool: ...
    def multi(self) -> None: ...
    def discard(self) -> None: ...
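
Optimistic locking with WATCH can be sketched with the client's `transaction()` convenience method, which watches the given keys, runs a callable against the pipeline, and retries when a watched key changes. The function name `increment_with_watch` is hypothetical.

```python
def increment_with_watch(client, key):
    """Increment an integer key using WATCH/MULTI/EXEC (illustrative helper)."""

    def _txn(pipe):
        # While the key is watched the pipeline runs commands immediately,
        # so this read returns the current value
        current = int(pipe.get(key) or 0)
        # multi() switches to buffered mode; queued commands run at EXEC
        pipe.multi()
        pipe.set(key, current + 1)
        return current + 1

    # value_from_callable=True returns _txn's result instead of the EXEC replies
    return client.transaction(_txn, key, value_from_callable=True)
```

If another client modifies the key between the read and EXEC, the transaction is aborted and the callable runs again with the fresh value.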

Pub/Sub Messaging

Publish/subscribe messaging system for real-time communication between Redis clients.

class PubSub:
    def subscribe(self, *args, **kwargs) -> None: ...
    def unsubscribe(self, *args) -> None: ...
    def psubscribe(self, *args, **kwargs) -> None: ...
    def listen(self) -> Iterator[Dict[str, Any]]: ...
    def get_message(self, ignore_subscribe_messages: bool = False) -> Optional[Dict[str, Any]]: ...
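
Where a blocking `listen()` loop is not suitable, `get_message()` can be polled with a timeout instead. A minimal sketch, with the hypothetical helper name `collect_messages`:

```python
def collect_messages(pubsub, max_messages, timeout=1.0):
    """Poll a PubSub object until max_messages arrive or a poll times out.

    Hypothetical helper: `pubsub` is assumed to be an already subscribed PubSub.
    """
    received = []
    while len(received) < max_messages:
        message = pubsub.get_message(ignore_subscribe_messages=True, timeout=timeout)
        if message is None:
            # Nothing arrived within the timeout; stop polling
            break
        # Keep only published payloads, skipping other message types
        if message["type"] in ("message", "pmessage"):
            received.append(message["data"])
    return received
```

Each message is a dictionary. The `type` field distinguishes published payloads (`message`, `pmessage`) from subscription bookkeeping.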

Distributed Locking

Distributed lock implementation for coordinating access to shared resources across multiple Redis clients.

class Lock:
    def acquire(self, blocking: Optional[bool] = None, blocking_timeout: Optional[float] = None) -> bool: ...
    def release(self) -> None: ...
    def extend(self, additional_time: float, replace_ttl: bool = False) -> bool: ...
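
A lock is usually held only for the duration of one unit of work. The sketch below wraps that pattern. `run_with_lock` and its parameters are hypothetical, and `client` is assumed to expose `lock()` as on `redis.Redis`.

```python
def run_with_lock(client, name, work, ttl=10.0, wait=5.0):
    """Run work() while holding a named lock; returns (acquired, result).

    Hypothetical helper, not part of the client library.
    """
    # timeout is the lock's lifetime; blocking_timeout bounds how long acquire() waits
    lock = client.lock(name, timeout=ttl, blocking_timeout=wait)
    if not lock.acquire():
        return False, None
    try:
        return True, work()
    finally:
        # Release even when work() raises, so other clients are not blocked until expiry
        lock.release()
```

The lifetime (`ttl`) should comfortably exceed the expected duration of `work()`, otherwise the lock can expire while the work is still running.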

Error Handling

Comprehensive exception hierarchy for handling Redis-specific errors and connection issues.

class RedisError(Exception): ...
class ConnectionError(RedisError): ...
class TimeoutError(RedisError): ...
class ResponseError(RedisError): ...
class AuthenticationError(ConnectionError): ...
class RedisClusterException(Exception): ...
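
Transient connection and timeout errors are often handled by retrying with a growing delay. A minimal sketch follows. `call_with_retry` and its parameters are hypothetical, and the exception types to retry on are passed in by the caller.

```python
import time


def call_with_retry(operation, retry_on, attempts=3, base_delay=0.1, sleep=time.sleep):
    """Call operation(), retrying on the given exception types with exponential backoff.

    Hypothetical helper, not part of the client library.
    """
    for attempt in range(attempts):
        try:
            return operation()
        except retry_on:
            if attempt == attempts - 1:
                # Out of attempts: let the last error propagate
                raise
            # Delay doubles each time: base_delay, 2 * base_delay, ...
            sleep(base_delay * (2 ** attempt))
```

With the client this might be called as `call_with_retry(lambda: r.get("key"), retry_on=(redis.ConnectionError, redis.TimeoutError))`. Errors such as `ResponseError` are left out on purpose, since repeating a rejected command rarely helps.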

Redis Modules

Extended functionality through Redis modules providing advanced data structures and capabilities.

RedisJSON

Native JSON data type support with path-based operations for efficient document storage and manipulation. Access via redis_client.json().

# JSON client obtained via r.json()
class JSON:
    def set(self, name: str, path: str, obj: Any, **kwargs) -> Optional[str]: ...
    def get(self, name: str, *args: str, **kwargs) -> Optional[List[Any]]: ...
    def delete(self, name: str, path: str = "$") -> Optional[int]: ...
    def arrappend(self, name: str, path: str, *args: Any) -> List[Optional[int]]: ...
    def numincrby(self, name: str, path: str, number: int) -> str: ...
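
A short sketch of path-based access through the JSON client: one helper stores a whole document at the root path, the other reads a single field. Both helper names are hypothetical, and a server with JSON support is assumed.

```python
def save_document(client, key, document):
    """Store a dict or list as a JSON document (illustrative helper)."""
    # "$" is the JSONPath root, so this replaces the whole document
    return client.json().set(key, "$", document)


def read_field(client, key, field):
    """Return one top-level field of a stored document, or None if absent."""
    # A "$"-style path returns a list of matches rather than a bare value
    matches = client.json().get(key, f"$.{field}")
    return matches[0] if matches else None
```

Reading a single path avoids transferring and decoding the entire document when only one field is needed.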

RediSearch

Full-text search, secondary indexing, and query capabilities with aggregations and auto-complete.

# Search client obtained via r.ft(index_name)
class Search:
    def create_index(self, fields: List[Field], definition: Optional[IndexDefinition] = None, **kwargs) -> str: ...
    def search(self, query: Union[str, Query], query_params: Optional[Dict[str, Any]] = None) -> Result: ...
    def aggregate(self, query: AggregateRequest, query_params: Optional[Dict[str, Any]] = None) -> AggregateResult: ...
    def sugadd(self, key: str, *suggestions: Suggestion, **kwargs) -> int: ...
    def sugget(self, key: str, prefix: str, **kwargs) -> List[Suggestion]: ...

RedisTimeSeries

Time series data structure for efficient storage and analysis of time-stamped data with downsampling and aggregations.

# TimeSeries client obtained via r.ts()
class TimeSeries:
    def create(self, key: str, **kwargs) -> str: ...
    def add(self, key: str, timestamp: Union[int, str], value: float, **kwargs) -> int: ...
    def get(self, key: str) -> Optional[Tuple[int, float]]: ...
    def range(self, key: str, from_time: Union[int, str], to_time: Union[int, str], **kwargs) -> List[Tuple[int, float]]: ...
    def mrange(self, from_time: Union[int, str], to_time: Union[int, str], filters: List[str], **kwargs) -> List[Dict[str, Any]]: ...
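
To illustrate what downsampling with an aggregation means, the pure-Python sketch below averages samples into fixed-width time buckets. It is a conceptual model only, assuming buckets aligned to multiples of the bucket width. The server performs this work itself, and `downsample_avg` is a hypothetical name.

```python
from collections import defaultdict


def downsample_avg(samples, bucket_ms):
    """Average (timestamp_ms, value) samples into fixed-width buckets (illustrative)."""
    buckets = defaultdict(list)
    for timestamp, value in samples:
        # Align each sample to the start of its bucket
        buckets[timestamp - timestamp % bucket_ms].append(value)
    # One (bucket_start, average) pair per non-empty bucket, in time order
    return [(start, sum(vals) / len(vals)) for start, vals in sorted(buckets.items())]
```

Empty buckets produce no output in this sketch, so gaps in the input remain gaps in the result.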

RedisBloom

Probabilistic data structures including Bloom filters, Cuckoo filters, Count-Min Sketch, and Top-K for memory-efficient approximate queries.

# Clients obtained via r.bf(), r.cf(), r.cms(), and r.topk()
class BFBloom:
    def reserve(self, key: str, errorRate: float, capacity: int, **kwargs) -> str: ...
    def add(self, key: str, item: str) -> bool: ...
    def exists(self, key: str, item: str) -> bool: ...
class CFBloom:
    def reserve(self, key: str, capacity: int, **kwargs) -> str: ...
class CMSBloom:
    def initbyprob(self, key: str, error: float, probability: float) -> str: ...
class TOPKBloom:
    def reserve(self, key: str, k: int, width: int, depth: int, decay: float) -> str: ...
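
For readers new to probabilistic structures, the following pure-Python sketch shows the idea behind a Bloom filter: several hash positions per item are set in a bit array, so membership tests can return false positives but never false negatives. `TinyBloomFilter` is a hypothetical teaching aid and unrelated to the server-side implementation.

```python
import hashlib


class TinyBloomFilter:
    """A minimal Bloom filter over a fixed-size bit array (illustrative only)."""

    def __init__(self, size_bits=1024, num_hashes=3):
        self.size_bits = size_bits
        self.num_hashes = num_hashes
        self.bits = bytearray((size_bits + 7) // 8)

    def _positions(self, item):
        data = item.encode("utf-8")
        for seed in range(self.num_hashes):
            # Derive independent-looking hashes by prefixing a seed byte
            digest = hashlib.sha256(bytes([seed]) + data).digest()
            yield int.from_bytes(digest[:8], "big") % self.size_bits

    def add(self, item):
        for pos in self._positions(item):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, item):
        # False means definitely absent; True means possibly present
        return all(self.bits[pos // 8] & (1 << (pos % 8)) for pos in self._positions(item))
```

The memory cost is fixed by `size_bits` regardless of how many items are added. The trade-off is a false-positive rate that grows as the bit array fills.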

Types

import datetime
from typing import Any, Dict, List, Optional, Union, Tuple, Iterator, Type

from redis.connection import Connection, SSLConnection, UnixDomainSocketConnection

# Key types
KeyT = Union[str, bytes]
FieldT = Union[str, bytes]
EncodableT = Union[bytes, float, int, str]
ExpiryT = Union[int, datetime.timedelta]

# Connection types
ConnectionT = Union[Connection, SSLConnection, UnixDomainSocketConnection]

# Cluster types
class ClusterNode:
    def __init__(self, host: str, port: int, server_type: Optional[str] = None): ...
    host: str
    port: int

# Credential types
class CredentialProvider:
    def get_credentials(self) -> Tuple[str, str]: ...