docs
Python implementation of redis API, can be used for testing purposes
npx @tessl/cli install tessl/pypi-fakeredis@2.31.0

A comprehensive Python implementation of the Redis API that provides enhanced versions of the redis-py Python bindings for Redis. It enables running tests requiring Redis, Valkey, DragonflyDB, or KeyDB servers without an actual server. FakeRedis supports all Redis functionality including advanced features such as RedisJSON, RedisBloom, GeoCommands, Time Series, and more.
pip install fakeredis

import fakeredis

Common usage patterns:
from fakeredis import FakeRedis, FakeServer

For async support:
from fakeredis import FakeAsyncRedis

For Redis Stack extensions:
from fakeredis.stack import JSONCommandsMixin

import fakeredis
# Create a fake Redis client (drop-in replacement for redis.Redis)
redis_client = fakeredis.FakeRedis()
# Basic key-value operations
redis_client.set('key', 'value')
value = redis_client.get('key')
print(value.decode()) # 'value'
# Hash operations
redis_client.hmset('user:1', {'name': 'John', 'email': 'john@example.com'})
user_data = redis_client.hgetall('user:1')
# List operations
redis_client.lpush('tasks', 'task1', 'task2', 'task3')
tasks = redis_client.lrange('tasks', 0, -1)
# Set operations
redis_client.sadd('tags', 'python', 'redis', 'testing')
tags = redis_client.smembers('tags')
# Async usage
import asyncio
async def async_example():
    async_client = fakeredis.FakeAsyncRedis()
    await async_client.set('async_key', 'async_value')
    value = await async_client.get('async_key')
    return value
# Run async example
# result = asyncio.run(async_example())

FakeRedis follows a modular architecture that ensures Redis compatibility:
This architecture allows FakeRedis to maintain exact Redis behavior while providing a lightweight, dependency-minimal solution for testing Redis-dependent applications.
Main client classes that provide drop-in replacements for redis-py clients with full Redis command compatibility and server emulation.
class FakeRedis(redis.Redis):
def __init__(
self,
server: Optional[FakeServer] = None,
version: VersionType = (7,),
server_type: ServerType = "redis",
lua_modules: Optional[Dict[str, Any]] = None,
**kwargs
): ...
@classmethod
def from_url(cls, url: str, **kwargs) -> Self: ...
class FakeAsyncRedis(redis.asyncio.Redis):
def __init__(
self,
server: Optional[FakeServer] = None,
version: VersionType = (7,),
server_type: ServerType = "redis",
lua_modules: Optional[Dict[str, Any]] = None,
**kwargs
): ...
@classmethod
def from_url(cls, url: str, **kwargs) -> Self: ...
class FakeStrictRedis(redis.StrictRedis):
def __init__(self, **kwargs): ...
@classmethod
def from_url(cls, url: str, **kwargs) -> Self: ...

Server instances that manage Redis state, databases, and connections with support for different Redis-compatible server types.
class FakeServer:
def __init__(
self,
version: VersionType = (7,),
server_type: ServerType = "redis",
config: Optional[Dict[bytes, bytes]] = None
): ...
def get_server(self) -> FakeServer: ...
def get_next_client_id(self) -> int: ...
class TcpFakeServer:
def __init__(
self,
server_address: Tuple[str, int],
server_type: ServerType = "redis",
server_version: VersionType = (8, 0)
): ...

Redis string data type operations including get/set, increment/decrement, and multi-key operations.
def get(self, name: KeyT) -> Optional[bytes]: ...
def set(
self,
name: KeyT,
value: EncodableT,
ex: Optional[ExpiryT] = None,
px: Optional[ExpiryT] = None,
nx: bool = False,
xx: bool = False,
keepttl: bool = False,
get: bool = False,
exat: Optional[AbsExpiryT] = None,
pxat: Optional[AbsExpiryT] = None
) -> Optional[bytes]: ...
def mget(self, keys: Sequence[KeyT]) -> List[Optional[bytes]]: ...
def mset(self, mapping: Mapping[AnyKeyT, EncodableT]) -> bool: ...
def incr(self, name: KeyT, amount: int = 1) -> int: ...
def decr(self, name: KeyT, amount: int = 1) -> int: ...

Redis bitmap operations for efficient bit-level manipulation of string values with counting, positioning, and bitwise operations.
def getbit(self, key: KeyT, offset: int) -> int: ...
def setbit(self, key: KeyT, offset: int, value: int) -> int: ...
def bitpos(self, key: KeyT, bit: int, *args: bytes) -> int: ...
def bitcount(self, key: KeyT, *args: bytes) -> int: ...
def bitop(self, op_name: bytes, dst: KeyT, *keys: KeyT) -> int: ...
def bitfield(self, key: KeyT, *args: bytes) -> List[Optional[int]]: ...

Redis hash data type operations for field-value mappings within keys.
def hget(self, name: KeyT, key: str) -> Optional[bytes]: ...
def hgetall(self, name: KeyT) -> Dict[bytes, bytes]: ...
def hset(
self,
name: KeyT,
key: Optional[str] = None,
value: Optional[EncodableT] = None,
mapping: Optional[Mapping[AnyKeyT, EncodableT]] = None
) -> int: ...
def hmget(self, name: KeyT, keys: Sequence[str]) -> List[Optional[bytes]]: ...
def hincrby(self, name: KeyT, key: str, amount: int = 1) -> int: ...
def hdel(self, name: KeyT, *keys: str) -> int: ...

Redis list data type operations for ordered collections with push/pop operations from both ends.
def lpush(self, name: KeyT, *values: EncodableT) -> int: ...
def rpush(self, name: KeyT, *values: EncodableT) -> int: ...
def lpop(self, name: KeyT, count: Optional[int] = None) -> Union[Optional[bytes], List[bytes]]: ...
def rpop(self, name: KeyT, count: Optional[int] = None) -> Union[Optional[bytes], List[bytes]]: ...
def lrange(self, name: KeyT, start: int, end: int) -> List[bytes]: ...
def llen(self, name: KeyT) -> int: ...
def blpop(self, keys: Sequence[KeyT], timeout: int = 0) -> Optional[Tuple[bytes, bytes]]: ...
def brpop(self, keys: Sequence[KeyT], timeout: int = 0) -> Optional[Tuple[bytes, bytes]]: ...

Redis set data type operations for unordered collections of unique elements with set algebra operations.
def sadd(self, name: KeyT, *values: EncodableT) -> int: ...
def smembers(self, name: KeyT) -> Set[bytes]: ...
def sismember(self, name: KeyT, value: EncodableT) -> bool: ...
def srem(self, name: KeyT, *values: EncodableT) -> int: ...
def scard(self, name: KeyT) -> int: ...
def sinter(self, keys: Sequence[KeyT]) -> Set[bytes]: ...
def sunion(self, keys: Sequence[KeyT]) -> Set[bytes]: ...
def sdiff(self, keys: Sequence[KeyT]) -> Set[bytes]: ...

Redis sorted set data type operations for ordered collections with numeric scores enabling range queries and ranking.
def zadd(
self,
name: KeyT,
mapping: Mapping[AnyKeyT, EncodableT],
nx: bool = False,
xx: bool = False,
ch: bool = False,
incr: bool = False,
gt: bool = False,
lt: bool = False
) -> Union[int, Optional[float]]: ...
def zrange(
self,
name: KeyT,
start: int,
end: int,
desc: bool = False,
withscores: bool = False,
score_cast_func: Callable[[bytes], Any] = float,
byscore: bool = False,
bylex: bool = False,
offset: Optional[int] = None,
num: Optional[int] = None
) -> Union[List[bytes], List[Tuple[bytes, float]]]: ...
def zrank(self, name: KeyT, value: EncodableT) -> Optional[int]: ...
def zscore(self, name: KeyT, value: EncodableT) -> Optional[float]: ...

Redis 5.0+ streams for append-only log data structures with consumer group support for distributed processing.
def xadd(
self,
name: KeyT,
fields: Dict[AnyKeyT, EncodableT],
id: str = "*",
maxlen: Optional[int] = None,
approximate: bool = True,
nomkstream: bool = False,
minid: Optional[str] = None,
limit: Optional[int] = None
) -> str: ...
def xread(
self,
streams: Dict[KeyT, Union[str, int]],
count: Optional[int] = None,
block: Optional[int] = None
) -> List[Tuple[bytes, List[Tuple[bytes, Dict[bytes, bytes]]]]]: ...
def xgroup_create(
self,
name: KeyT,
groupname: str,
id: str = "$",
mkstream: bool = False,
entries_read: Optional[int] = None
) -> bool: ...

Redis publish/subscribe messaging with support for channels, pattern subscriptions, and shard channels.
def publish(self, channel: KeyT, message: EncodableT) -> int: ...
def pubsub_channels(self, pattern: str = "*") -> List[bytes]: ...
def pubsub_numsub(self, *args: KeyT) -> List[Tuple[bytes, int]]: ...
class PubSub:
def subscribe(self, *args: KeyT) -> None: ...
def psubscribe(self, *args: str) -> None: ...
def unsubscribe(self, *args: KeyT) -> None: ...
def punsubscribe(self, *args: str) -> None: ...
def get_message(self, ignore_subscribe_messages: bool = False, timeout: float = 0.0) -> Optional[Dict[str, Any]]: ...

Redis geospatial operations for location-based data with geographic indexing and radius queries.
def geoadd(
self,
name: KeyT,
values: Sequence[Union[Tuple[float, float, EncodableT], Tuple[EncodableT, float, float]]],
nx: bool = False,
xx: bool = False,
ch: bool = False
) -> int: ...
def geodist(
self,
name: KeyT,
place1: EncodableT,
place2: EncodableT,
unit: Optional[str] = None
) -> Optional[float]: ...
def georadius(
self,
name: KeyT,
longitude: float,
latitude: float,
radius: float,
unit: str = "m",
withdist: bool = False,
withcoord: bool = False,
withhash: bool = False,
count: Optional[int] = None,
sort: Optional[str] = None,
store: Optional[KeyT] = None,
store_dist: Optional[KeyT] = None
) -> List[Union[bytes, List[Union[bytes, float, Tuple[float, float], int]]]]: ...

Generic Redis operations that work across all data types including expiration, pattern matching, and key management.
def delete(self, *names: KeyT) -> int: ...
def exists(self, *names: KeyT) -> int: ...
def expire(self, name: KeyT, time: ExpiryT) -> bool: ...
def expireat(self, name: KeyT, when: AbsExpiryT) -> bool: ...
def keys(self, pattern: str = "*") -> List[bytes]: ...
def rename(self, src: KeyT, dst: KeyT) -> bool: ...
def ttl(self, name: KeyT) -> int: ...
def type(self, name: KeyT) -> bytes: ...
def scan(
self,
cursor: int = 0,
match: Optional[str] = None,
count: Optional[int] = None,
_type: Optional[str] = None
) -> Tuple[int, List[bytes]]: ...

Redis transaction support with MULTI/EXEC commands and optimistic locking via WATCH.
def multi(self) -> None: ...
def execute(self) -> List[Any]: ...
def discard(self) -> None: ...
def watch(self, *names: KeyT) -> bool: ...
def unwatch(self) -> bool: ...
class Pipeline:
def __enter__(self) -> Pipeline: ...
def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...
def execute(self, raise_on_error: bool = True) -> List[Any]: ...

Redis Lua scripting support with script caching and execution.
def eval(
self,
script: str,
numkeys: int,
*keys_and_args: KeyT
) -> Any: ...
def evalsha(
self,
sha: str,
numkeys: int,
*keys_and_args: KeyT
) -> Any: ...
def script_load(self, script: str) -> str: ...
def script_exists(self, *args: str) -> List[bool]: ...
def script_flush(self, sync_type: Optional[str] = None) -> bool: ...

Redis server management commands including database operations, configuration, and information retrieval.
def info(self, section: Optional[str] = None) -> Dict[str, Any]: ...
def dbsize(self) -> int: ...
def flushdb(self, asynchronous: bool = False) -> bool: ...
def flushall(self, asynchronous: bool = False) -> bool: ...
def config_get(self, pattern: str = "*") -> Dict[bytes, bytes]: ...
def config_set(self, name: str, value: EncodableT) -> bool: ...
def time(self) -> Tuple[str, str]: ...
def client_list(self, _type: Optional[str] = None, client_id: Optional[int] = None) -> str: ...

Optional Redis Stack module implementations providing advanced data structures and capabilities.
# JSON operations (requires jsonpath-ng)
def json_get(self, name: KeyT, *args: str) -> Any: ...
def json_set(self, name: KeyT, path: str, obj: Any, nx: bool = False, xx: bool = False) -> Optional[bool]: ...
# Bloom Filter operations (requires pyprobables)
def bf_add(self, key: KeyT, item: EncodableT) -> bool: ...
def bf_exists(self, key: KeyT, item: EncodableT) -> bool: ...
# Time Series operations
def ts_add(
self,
key: KeyT,
timestamp: Union[int, str],
value: float,
retention_msecs: Optional[int] = None,
uncompressed: Optional[bool] = None,
chunk_size: Optional[int] = None,
on_duplicate: Optional[str] = None
) -> int: ...

Optional Valkey client implementations for compatibility with Valkey servers (requires valkey package).
class FakeValkey(valkey.Valkey):
def __init__(self, **kwargs): ...
@classmethod
def from_url(cls, url: str, **kwargs) -> Self: ...
class FakeAsyncValkey(valkey.asyncio.Valkey):
def __init__(self, **kwargs): ...
@classmethod
def from_url(cls, url: str, **kwargs) -> Self: ...

# Core type aliases
VersionType = Tuple[int, ...]
ServerType = Literal["redis", "dragonfly", "valkey"]
KeyT = Union[str, bytes]
EncodableT = Union[bytes, float, int, str]
ExpiryT = Union[int, timedelta]
AbsExpiryT = Union[datetime, int]
AnyKeyT = Union[str, bytes, memoryview]
# Server configuration
class FakeServer:
def __init__(
self,
version: VersionType = (7,),
server_type: ServerType = "redis",
config: Optional[Dict[bytes, bytes]] = None
): ...
# Response types
class SimpleString:
"""Redis simple string response type"""
pass
# Client info model
class ClientInfo:
id: int
addr: str
laddr: str
fd: int
name: str
idle: int
flags: str
db: int
sub: int
psub: int
ssub: int
multi: int
qbuf: int
qbuf_free: int
argv_mem: int
multi_mem: int
rbs: int
rbp: int
obl: int
oll: int
omem: int
tot_mem: int
events: str
cmd: str
user: str
redir: int
resp: int
# Stream models
class StreamEntryKey(NamedTuple):
millis_time: int
sequence_number: int
# Time series models
class TimeSeriesRule:
dest_key: str
aggregation_type: str
bucket_size_msec: int
alignment_timestamp: Optional[int] = None
# Access control
class AccessControlList:
def authenticate(self, username: str, password: str) -> bool: ...
def get_user(self, username: str) -> Optional[Dict[str, Any]]: ...