or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

bitmap-operations.mdcore-clients.mdgeneric-operations.mdgeospatial-operations.mdhash-operations.mdindex.mdlist-operations.mdlua-scripting.mdpubsub-operations.mdserver-management.mdserver-operations.mdset-operations.mdsorted-set-operations.mdstack-extensions.mdstream-operations.mdstring-operations.mdtransaction-operations.mdvalkey-support.md
tile.json

tessl/pypi-fakeredis

Python implementation of redis API, can be used for testing purposes

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/fakeredis@2.31.x

To install, run

npx @tessl/cli install tessl/pypi-fakeredis@2.31.0

index.mddocs/

FakeRedis

A comprehensive Python implementation of the Redis API that provides enhanced versions of the redis-py Python bindings for Redis. It enables running tests requiring Redis, Valkey, DragonflyDB, or KeyDB servers without an actual server. FakeRedis supports all Redis functionality including advanced features such as RedisJSON, RedisBloom, GeoCommands, Time Series, and more.

Package Information

  • Package Name: fakeredis
  • Language: Python
  • Installation: pip install fakeredis
  • License: BSD-3-Clause
  • Documentation: https://fakeredis.moransoftware.ca/

Core Imports

import fakeredis

Common usage patterns:

from fakeredis import FakeRedis, FakeServer

For async support:

from fakeredis import FakeAsyncRedis

For Redis Stack extensions:

from fakeredis.stack import JSONCommandsMixin

Basic Usage

import fakeredis

# Create a fake Redis client (drop-in replacement for redis.Redis)
redis_client = fakeredis.FakeRedis()

# Basic key-value operations
redis_client.set('key', 'value')
value = redis_client.get('key')
print(value.decode())  # 'value'

# Hash operations
redis_client.hmset('user:1', {'name': 'John', 'email': 'john@example.com'})
user_data = redis_client.hgetall('user:1')

# List operations
redis_client.lpush('tasks', 'task1', 'task2', 'task3')
tasks = redis_client.lrange('tasks', 0, -1)

# Set operations
redis_client.sadd('tags', 'python', 'redis', 'testing')
tags = redis_client.smembers('tags')

# Async usage
import asyncio
async def async_example():
    async_client = fakeredis.FakeAsyncRedis()
    await async_client.set('async_key', 'async_value')
    value = await async_client.get('async_key')
    return value

# Run async example
# result = asyncio.run(async_example())

Architecture

FakeRedis follows a modular architecture that ensures Redis compatibility:

  • FakeServer: Central server instance managing databases, connections, script cache, and pub/sub state
  • FakeRedis/FakeAsyncRedis: Main client classes inheriting from redis-py with fake connection handling
  • Command Mixins: Modular implementation of Redis commands organized by data type (strings, hashes, lists, etc.)
  • Data Models: Redis-compatible data structures (Hash, ZSet, XStream, TimeSeries, etc.)
  • Stack Extensions: Optional modules providing RedisStack functionality (JSON, Bloom filters, Time Series, etc.)
  • Protocol Layer: RESP protocol implementation with socket emulation for full compatibility

This architecture allows FakeRedis to maintain exact Redis behavior while providing a lightweight, dependency-minimal solution for testing Redis-dependent applications.

Capabilities

Core Redis Clients

Main client classes that provide drop-in replacements for redis-py clients with full Redis command compatibility and server emulation.

class FakeRedis(redis.Redis):
    def __init__(
        self,
        server: Optional[FakeServer] = None,
        version: VersionType = (7,),
        server_type: ServerType = "redis",
        lua_modules: Optional[Dict[str, Any]] = None,
        **kwargs
    ): ...
    
    @classmethod
    def from_url(cls, url: str, **kwargs) -> Self: ...

class FakeAsyncRedis(redis.asyncio.Redis):
    def __init__(
        self,
        server: Optional[FakeServer] = None,
        version: VersionType = (7,),
        server_type: ServerType = "redis", 
        lua_modules: Optional[Dict[str, Any]] = None,
        **kwargs
    ): ...
    
    @classmethod
    def from_url(cls, url: str, **kwargs) -> Self: ...

class FakeStrictRedis(redis.StrictRedis):
    def __init__(self, **kwargs): ...
    
    @classmethod
    def from_url(cls, url: str, **kwargs) -> Self: ...

Core Redis Clients

Server Management

Server instances that manage Redis state, databases, and connections with support for different Redis-compatible server types.

class FakeServer:
    def __init__(
        self,
        version: VersionType = (7,),
        server_type: ServerType = "redis",
        config: Optional[Dict[bytes, bytes]] = None
    ): ...
    
    def get_server(self) -> FakeServer: ...
    def get_next_client_id(self) -> int: ...

class TcpFakeServer:
    def __init__(
        self,
        server_address: Tuple[str, int],
        server_type: ServerType = "redis",
        server_version: VersionType = (8, 0)
    ): ...

Server Management

String Operations

Redis string data type operations including get/set, increment/decrement, and multi-key operations.

def get(self, name: KeyT) -> Optional[bytes]: ...
def set(
    self,
    name: KeyT,
    value: EncodableT,
    ex: Optional[ExpiryT] = None,
    px: Optional[ExpiryT] = None,
    nx: bool = False,
    xx: bool = False,
    keepttl: bool = False,
    get: bool = False,
    exat: Optional[AbsExpiryT] = None,
    pxat: Optional[AbsExpiryT] = None
) -> Optional[bytes]: ...
def mget(self, keys: Sequence[KeyT]) -> List[Optional[bytes]]: ...
def mset(self, mapping: Mapping[AnyKeyT, EncodableT]) -> bool: ...
def incr(self, name: KeyT, amount: int = 1) -> int: ...
def decr(self, name: KeyT, amount: int = 1) -> int: ...

String Operations

Bitmap Operations

Redis bitmap operations for efficient bit-level manipulation of string values with counting, positioning, and bitwise operations.

def getbit(self, key: KeyT, offset: int) -> int: ...
def setbit(self, key: KeyT, offset: int, value: int) -> int: ...
def bitpos(self, key: KeyT, bit: int, *args: bytes) -> int: ...
def bitcount(self, key: KeyT, *args: bytes) -> int: ...
def bitop(self, op_name: bytes, dst: KeyT, *keys: KeyT) -> int: ...
def bitfield(self, key: KeyT, *args: bytes) -> List[Optional[int]]: ...

Bitmap Operations

Hash Operations

Redis hash data type operations for field-value mappings within keys.

def hget(self, name: KeyT, key: str) -> Optional[bytes]: ...
def hgetall(self, name: KeyT) -> Dict[bytes, bytes]: ...
def hset(
    self,
    name: KeyT,
    key: Optional[str] = None,
    value: Optional[EncodableT] = None,
    mapping: Optional[Mapping[AnyKeyT, EncodableT]] = None
) -> int: ...
def hmget(self, name: KeyT, keys: Sequence[str]) -> List[Optional[bytes]]: ...
def hincrby(self, name: KeyT, key: str, amount: int = 1) -> int: ...
def hdel(self, name: KeyT, *keys: str) -> int: ...

Hash Operations

List Operations

Redis list data type operations for ordered collections with push/pop operations from both ends.

def lpush(self, name: KeyT, *values: EncodableT) -> int: ...
def rpush(self, name: KeyT, *values: EncodableT) -> int: ...
def lpop(self, name: KeyT, count: Optional[int] = None) -> Union[Optional[bytes], List[bytes]]: ...
def rpop(self, name: KeyT, count: Optional[int] = None) -> Union[Optional[bytes], List[bytes]]: ...
def lrange(self, name: KeyT, start: int, end: int) -> List[bytes]: ...
def llen(self, name: KeyT) -> int: ...
def blpop(self, keys: Sequence[KeyT], timeout: int = 0) -> Optional[Tuple[bytes, bytes]]: ...
def brpop(self, keys: Sequence[KeyT], timeout: int = 0) -> Optional[Tuple[bytes, bytes]]: ...

List Operations

Set Operations

Redis set data type operations for unordered collections of unique elements with set algebra operations.

def sadd(self, name: KeyT, *values: EncodableT) -> int: ...
def smembers(self, name: KeyT) -> Set[bytes]: ...
def sismember(self, name: KeyT, value: EncodableT) -> bool: ...
def srem(self, name: KeyT, *values: EncodableT) -> int: ...
def scard(self, name: KeyT) -> int: ...
def sinter(self, keys: Sequence[KeyT]) -> Set[bytes]: ...
def sunion(self, keys: Sequence[KeyT]) -> Set[bytes]: ...
def sdiff(self, keys: Sequence[KeyT]) -> Set[bytes]: ...

Set Operations

Sorted Set Operations

Redis sorted set data type operations for ordered collections with numeric scores enabling range queries and ranking.

def zadd(
    self,
    name: KeyT,
    mapping: Mapping[AnyKeyT, EncodableT],
    nx: bool = False,
    xx: bool = False,
    ch: bool = False,
    incr: bool = False,
    gt: bool = False,
    lt: bool = False
) -> Union[int, Optional[float]]: ...
def zrange(
    self,
    name: KeyT,
    start: int,
    end: int,
    desc: bool = False,
    withscores: bool = False,
    score_cast_func: Callable[[bytes], Any] = float,
    byscore: bool = False,
    bylex: bool = False,
    offset: Optional[int] = None,
    num: Optional[int] = None
) -> Union[List[bytes], List[Tuple[bytes, float]]]: ...
def zrank(self, name: KeyT, value: EncodableT) -> Optional[int]: ...
def zscore(self, name: KeyT, value: EncodableT) -> Optional[float]: ...

Sorted Set Operations

Stream Operations

Redis 5.0+ streams for append-only log data structures with consumer group support for distributed processing.

def xadd(
    self,
    name: KeyT,
    fields: Dict[AnyKeyT, EncodableT],
    id: str = "*",
    maxlen: Optional[int] = None,
    approximate: bool = True,
    nomkstream: bool = False,
    minid: Optional[str] = None,
    limit: Optional[int] = None
) -> str: ...
def xread(
    self,
    streams: Dict[KeyT, Union[str, int]],
    count: Optional[int] = None,
    block: Optional[int] = None
) -> List[Tuple[bytes, List[Tuple[bytes, Dict[bytes, bytes]]]]]: ...
def xgroup_create(
    self,
    name: KeyT,
    groupname: str,
    id: str = "$",
    mkstream: bool = False,
    entries_read: Optional[int] = None
) -> bool: ...

Stream Operations

Pub/Sub Operations

Redis publish/subscribe messaging with support for channels, pattern subscriptions, and shard channels.

def publish(self, channel: KeyT, message: EncodableT) -> int: ...
def pubsub_channels(self, pattern: str = "*") -> List[bytes]: ...
def pubsub_numsub(self, *args: KeyT) -> List[Tuple[bytes, int]]: ...

class PubSub:
    def subscribe(self, *args: KeyT) -> None: ...
    def psubscribe(self, *args: str) -> None: ...
    def unsubscribe(self, *args: KeyT) -> None: ...
    def punsubscribe(self, *args: str) -> None: ...
    def get_message(self, ignore_subscribe_messages: bool = False, timeout: float = 0.0) -> Optional[Dict[str, Any]]: ...

Pub/Sub Operations

Geospatial Operations

Redis geospatial operations for location-based data with geographic indexing and radius queries.

def geoadd(
    self,
    name: KeyT,
    values: Sequence[Union[Tuple[float, float, EncodableT], Tuple[EncodableT, float, float]]],
    nx: bool = False,
    xx: bool = False,
    ch: bool = False
) -> int: ...
def geodist(
    self,
    name: KeyT,
    place1: EncodableT,
    place2: EncodableT,
    unit: Optional[str] = None
) -> Optional[float]: ...
def georadius(
    self,
    name: KeyT,
    longitude: float,
    latitude: float,
    radius: float,
    unit: str = "m",
    withdist: bool = False,
    withcoord: bool = False,
    withhash: bool = False,
    count: Optional[int] = None,
    sort: Optional[str] = None,
    store: Optional[KeyT] = None,
    store_dist: Optional[KeyT] = None
) -> List[Union[bytes, List[Union[bytes, float, Tuple[float, float], int]]]]: ...

Geospatial Operations

Generic Key Operations

Generic Redis operations that work across all data types including expiration, pattern matching, and key management.

def delete(self, *names: KeyT) -> int: ...
def exists(self, *names: KeyT) -> int: ...
def expire(self, name: KeyT, time: ExpiryT) -> bool: ...
def expireat(self, name: KeyT, when: AbsExpiryT) -> bool: ...
def keys(self, pattern: str = "*") -> List[bytes]: ...
def rename(self, src: KeyT, dst: KeyT) -> bool: ...
def ttl(self, name: KeyT) -> int: ...
def type(self, name: KeyT) -> bytes: ...
def scan(
    self,
    cursor: int = 0,
    match: Optional[str] = None,
    count: Optional[int] = None,
    _type: Optional[str] = None
) -> Tuple[int, List[bytes]]: ...

Generic Key Operations

Transaction Operations

Redis transaction support with MULTI/EXEC commands and optimistic locking via WATCH.

def multi(self) -> None: ...
def execute(self) -> List[Any]: ...
def discard(self) -> None: ...
def watch(self, *names: KeyT) -> bool: ...
def unwatch(self) -> bool: ...

class Pipeline:
    def __enter__(self) -> Pipeline: ...
    def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...
    def execute(self, raise_on_error: bool = True) -> List[Any]: ...

Transaction Operations

Lua Scripting

Redis Lua scripting support with script caching and execution.

def eval(
    self,
    script: str,
    numkeys: int,
    *keys_and_args: KeyT
) -> Any: ...
def evalsha(
    self,
    sha: str,
    numkeys: int,
    *keys_and_args: KeyT
) -> Any: ...
def script_load(self, script: str) -> str: ...
def script_exists(self, *args: str) -> List[bool]: ...
def script_flush(self, sync_type: Optional[str] = None) -> bool: ...

Lua Scripting

Server Operations

Redis server management commands including database operations, configuration, and information retrieval.

def info(self, section: Optional[str] = None) -> Dict[str, Any]: ...
def dbsize(self) -> int: ...
def flushdb(self, asynchronous: bool = False) -> bool: ...
def flushall(self, asynchronous: bool = False) -> bool: ...
def config_get(self, pattern: str = "*") -> Dict[bytes, bytes]: ...
def config_set(self, name: str, value: EncodableT) -> bool: ...
def time(self) -> Tuple[str, str]: ...
def client_list(self, _type: Optional[str] = None, client_id: Optional[int] = None) -> str: ...

Server Operations

Redis Stack Extensions

Optional Redis Stack module implementations providing advanced data structures and capabilities.

# JSON operations (requires jsonpath-ng)
def json_get(self, name: KeyT, *args: str) -> Any: ...
def json_set(self, name: KeyT, path: str, obj: Any, nx: bool = False, xx: bool = False) -> Optional[bool]: ...

# Bloom Filter operations (requires pyprobables) 
def bf_add(self, key: KeyT, item: EncodableT) -> bool: ...
def bf_exists(self, key: KeyT, item: EncodableT) -> bool: ...

# Time Series operations
def ts_add(
    self,
    key: KeyT,
    timestamp: Union[int, str],
    value: float,
    retention_msecs: Optional[int] = None,
    uncompressed: Optional[bool] = None,
    chunk_size: Optional[int] = None,
    on_duplicate: Optional[str] = None
) -> int: ...

Redis Stack Extensions

Valkey Support

Optional Valkey client implementations for compatibility with Valkey servers (requires valkey package).

class FakeValkey(valkey.Valkey):
    def __init__(self, **kwargs): ...
    
    @classmethod
    def from_url(cls, url: str, **kwargs) -> Self: ...

class FakeAsyncValkey(valkey.asyncio.Valkey):
    def __init__(self, **kwargs): ...
    
    @classmethod  
    def from_url(cls, url: str, **kwargs) -> Self: ...

Valkey Support

Types

# Core type aliases
VersionType = Tuple[int, ...]
ServerType = Literal["redis", "dragonfly", "valkey"]
KeyT = Union[str, bytes]
EncodableT = Union[bytes, float, int, str]
ExpiryT = Union[int, timedelta]
AbsExpiryT = Union[datetime, int]
AnyKeyT = Union[str, bytes, memoryview]

# Server configuration
class FakeServer:
    def __init__(
        self,
        version: VersionType = (7,),
        server_type: ServerType = "redis", 
        config: Optional[Dict[bytes, bytes]] = None
    ): ...

# Response types
class SimpleString:
    """Redis simple string response type"""
    pass

# Client info model
class ClientInfo:
    id: int
    addr: str
    laddr: str
    fd: int
    name: str
    idle: int
    flags: str
    db: int
    sub: int
    psub: int
    ssub: int
    multi: int
    qbuf: int
    qbuf_free: int
    argv_mem: int
    multi_mem: int
    rbs: int
    rbp: int
    obl: int
    oll: int
    omem: int
    tot_mem: int
    events: str
    cmd: str
    user: str
    redir: int
    resp: int

# Stream models
class StreamEntryKey(NamedTuple):
    millis_time: int
    sequence_number: int

# Time series models  
class TimeSeriesRule:
    dest_key: str
    aggregation_type: str
    bucket_size_msec: int
    alignment_timestamp: Optional[int] = None

# Access control
class AccessControlList:
    def authenticate(self, username: str, password: str) -> bool: ...
    def get_user(self, username: str) -> Optional[Dict[str, Any]]: ...