A Python implementation of the Redis API that can be used for testing purposes.
—
Optional Redis Stack module implementations providing advanced data structures and capabilities including JSON, Bloom filters, Time Series, and probabilistic data structures. These extensions require additional dependencies and provide Redis Stack compatibility.
Redis JSON module implementation for storing, querying, and manipulating JSON documents using JSONPath expressions.
Dependencies: jsonpath-ng
# JSON document operations
# Stub signatures for RedisJSON commands. Paths are JSONPath expressions;
# with the default root path "$", results come back as a list of matches.
# Delete the value(s) at `path`; returns the number of paths deleted.
def json_del(self, name: KeyT, path: str = "$") -> int: ...
# Fetch values at one or more paths (passed via *args); returns decoded JSON.
def json_get(
self,
name: KeyT,
*args: str,
no_escape: bool = False
) -> Any: ...
# Fetch `path` from several keys at once; one result per key.
def json_mget(self, keys: List[KeyT], path: str) -> List[Any]: ...
# Store `obj` at `path`; `nx` = set only if absent, `xx` = only if present.
def json_set(
self,
name: KeyT,
path: str,
obj: Any,
nx: bool = False,
xx: bool = False
) -> Optional[bool]: ...
# JSON utility operations
# Empty out the container(s) at `path`; returns the number of values cleared.
def json_clear(self, name: KeyT, path: str = "$") -> int: ...
# Flip boolean values at `path`; None for non-boolean matches.
def json_toggle(self, name: KeyT, path: str = "$") -> List[Union[bool, None]]: ...
# JSON type name of each value at `path`.
def json_type(self, name: KeyT, path: str = "$") -> List[str]: ...
# JSON string operations
# Length of each string at `path`; None for non-string matches.
def json_strlen(self, name: KeyT, path: str = "$") -> List[Union[int, None]]: ...
# Append `string` to each string at `path`; returns the new lengths.
def json_strappend(self, name: KeyT, string: str, path: str = "$") -> List[Union[int, None]]: ...
# JSON array operations
# Append *args to each array at `path`; returns the new lengths.
def json_arrappend(self, name: KeyT, path: str = "$", *args: Any) -> List[Union[int, None]]: ...
# Index of `scalar` within each array at `path`, searched in [start, stop).
def json_arrindex(
self,
name: KeyT,
path: str,
scalar: Any,
start: Optional[int] = None,
stop: Optional[int] = None
) -> List[Union[int, None]]: ...
# Insert *args before `index` in each array at `path`.
def json_arrinsert(self, name: KeyT, path: str, index: int, *args: Any) -> List[Union[int, None]]: ...
# Length of each array at `path`; None for non-array matches.
def json_arrlen(self, name: KeyT, path: str = "$") -> List[Union[int, None]]: ...
# Remove and return the element at `index` (default -1: the last element).
def json_arrpop(self, name: KeyT, path: str = "$", index: int = -1) -> List[Any]: ...
# Trim each array at `path` to the [start, stop] range.
def json_arrtrim(self, name: KeyT, path: str, start: int, stop: int) -> List[Union[int, None]]: ...
# JSON numeric operations
# Add `number` to each numeric value at `path`; returns the new values.
def json_numincrby(self, name: KeyT, path: str, number: float) -> List[Union[float, None]]: ...
# Multiply each numeric value at `path` by `number`; returns the new values.
def json_nummultby(self, name: KeyT, path: str, number: float) -> List[Union[float, None]]: ...
# JSON object operations
# Keys of each object at `path`; None for non-object matches.
def json_objkeys(self, name: KeyT, path: str = "$") -> List[Union[List[str], None]]: ...
def json_objlen(self, name: KeyT, path: str = "$") -> List[Union[int, None]]: ...

Probabilistic data structure for membership testing with tunable false positive rates.
Dependencies: pyprobables
# Stub signatures for RedisBloom Bloom-filter commands (requires pyprobables).
# Add one item; True when the item was newly added.
def bf_add(self, key: KeyT, item: EncodableT) -> bool: ...
# Add several items at once; one result per item.
def bf_madd(self, key: KeyT, *items: EncodableT) -> List[bool]: ...
# Membership test; false positives are possible, false negatives are not.
def bf_exists(self, key: KeyT, item: EncodableT) -> bool: ...
# Membership test for several items; one result per item.
def bf_mexists(self, key: KeyT, *items: EncodableT) -> List[bool]: ...
# Number of items added to the filter.
def bf_card(self, key: KeyT) -> int: ...
# Filter metadata (capacity, number of items inserted, ...).
def bf_info(self, key: KeyT) -> Dict[str, Union[int, float]]: ...
# Bulk add, optionally creating/configuring the filter in the same call.
def bf_insert(
self,
key: KeyT,
items: List[EncodableT],
capacity: Optional[int] = None,
error: Optional[float] = None,
expansion: Optional[int] = None,
nocreate: bool = False,
nonscaling: bool = False
) -> List[bool]: ...
def bf_reserve(
self,
key: KeyT,
error_rate: float,
capacity: int,
expansion: Optional[int] = None,
nonscaling: bool = False
) -> bool: ...

Alternative probabilistic data structure supporting deletions with lower memory overhead.
Dependencies: pyprobables
# Stub signatures for RedisBloom Cuckoo-filter commands (requires pyprobables).
# Add an item (duplicates allowed).
def cf_add(self, key: KeyT, item: EncodableT) -> bool: ...
# Add an item only if it is not already present.
def cf_addnx(self, key: KeyT, item: EncodableT) -> bool: ...
# Estimated number of occurrences of `item` in the filter.
def cf_count(self, key: KeyT, item: EncodableT) -> int: ...
# Remove one occurrence of `item` (cuckoo filters support deletion).
def cf_del(self, key: KeyT, item: EncodableT) -> bool: ...
# Membership test; false positives are possible.
def cf_exists(self, key: KeyT, item: EncodableT) -> bool: ...
# Filter metadata.
def cf_info(self, key: KeyT) -> Dict[str, Union[int, float]]: ...
# Bulk add, optionally creating the filter with `capacity` first.
def cf_insert(
self,
key: KeyT,
items: List[EncodableT],
capacity: Optional[int] = None,
nocreate: bool = False
) -> List[bool]: ...
def cf_reserve(self, key: KeyT, capacity: int, bucket_size: Optional[int] = None, max_iterations: Optional[int] = None) -> bool: ...

Probabilistic data structure for frequency estimation in data streams.
Dependencies: pyprobables
# Stub signatures for RedisBloom Count-Min Sketch commands (requires pyprobables).
# Increase each item's count by its mapped amount; returns updated estimates.
def cms_incrby(self, key: KeyT, items: Dict[EncodableT, int]) -> List[int]: ...
# Sketch metadata (width, depth, total count, ...).
def cms_info(self, key: KeyT) -> Dict[str, Union[int, float]]: ...
# Create a sketch with explicit width and depth dimensions.
def cms_initbydim(self, key: KeyT, width: int, depth: int) -> bool: ...
# Create a sketch from a target estimation error and error probability.
def cms_initbyprob(self, key: KeyT, error: float, probability: float) -> bool: ...
# Merge `src` sketches into `dest`, optionally weighting each source.
def cms_merge(self, dest: KeyT, numkeys: int, src: List[KeyT], weights: Optional[List[int]] = None) -> bool: ...
def cms_query(self, key: KeyT, *items: EncodableT) -> List[int]: ...

Probabilistic data structure for tracking the k most frequent items in a stream.
# Stub signatures for RedisBloom Top-K commands.
# Add items; each result is the item evicted from the top-k list, or None.
def topk_add(self, key: KeyT, *items: EncodableT) -> List[Union[str, None]]: ...
# Estimated occurrence count for each item.
def topk_count(self, key: KeyT, *items: EncodableT) -> List[int]: ...
# Structure metadata (k, width, depth, decay, ...).
def topk_info(self, key: KeyT) -> Dict[str, Union[int, float]]: ...
# Increase item counts by the mapped amounts; returns evicted items or None.
def topk_incrby(self, key: KeyT, items: Dict[EncodableT, int]) -> List[Union[str, None]]: ...
# The current list of top-k items.
def topk_list(self, key: KeyT) -> List[str]: ...
# Whether each item is currently in the top-k list.
def topk_query(self, key: KeyT, *items: EncodableT) -> List[bool]: ...
def topk_reserve(self, key: KeyT, k: int, width: int, depth: int, decay: float) -> bool: ...

Probabilistic data structure for accurate estimation of quantiles and percentiles.
# Stub signatures for RedisBloom t-digest commands (quantile/percentile estimation).
# Create a t-digest; higher `compression` trades memory for accuracy.
def tdigest_create(self, key: KeyT, compression: Optional[int] = None) -> bool: ...
# Add observations; `weights`, when given, parallels `values`.
def tdigest_add(self, key: KeyT, values: List[float], weights: Optional[List[float]] = None) -> bool: ...
# Merge `src_keys` digests into `dest_key`; `override` replaces the destination.
def tdigest_merge(self, dest_key: KeyT, numkeys: int, src_keys: List[KeyT], compression: Optional[int] = None, override: bool = False) -> bool: ...
# Largest observed value.
def tdigest_max(self, key: KeyT) -> float: ...
# Smallest observed value.
def tdigest_min(self, key: KeyT) -> float: ...
# Estimated value at each requested quantile (0.0-1.0).
def tdigest_quantile(self, key: KeyT, *quantiles: float) -> List[float]: ...
# Estimated rank of each value.
def tdigest_rank(self, key: KeyT, *values: float) -> List[float]: ...
# Estimated reverse rank of each value.
def tdigest_revrank(self, key: KeyT, *values: float) -> List[float]: ...
# Empty the digest.
def tdigest_reset(self, key: KeyT) -> bool: ...
# Estimated cumulative distribution (fraction of observations <= value).
def tdigest_cdf(self, key: KeyT, *values: float) -> List[float]: ...
def tdigest_info(self, key: KeyT) -> Dict[str, Union[int, float]]: ...

Time series data structure with aggregation rules, compaction policies, and retention management.
def ts_create(
self,
key: KeyT,
retention_msecs: Optional[int] = None,
uncompressed: Optional[bool] = None,
chunk_size: Optional[int] = None,
duplicate_policy: Optional[str] = None,
labels: Optional[Dict[str, str]] = None,
ignore_max_time_diff: Optional[int] = None,
ignore_max_val_diff: Optional[float] = None
) -> bool: ...
def ts_add(
self,
key: KeyT,
timestamp: Union[int, str],
value: float,
retention_msecs: Optional[int] = None,
uncompressed: Optional[bool] = None,
chunk_size: Optional[int] = None,
on_duplicate: Optional[str] = None,
labels: Optional[Dict[str, str]] = None,
ignore_max_time_diff: Optional[int] = None,
ignore_max_val_diff: Optional[float] = None
) -> int: ...
def ts_madd(self, ktv_tuples: List[Tuple[KeyT, Union[int, str], float]]) -> List[int]: ...
def ts_get(self, key: KeyT, latest: bool = False) -> Tuple[int, float]: ...
def ts_info(self, key: KeyT) -> Dict[str, Any]: ...
def ts_range(
self,
key: KeyT,
from_time: Union[int, str] = "-",
to_time: Union[int, str] = "+",
count: Optional[int] = None,
aggregation_type: Optional[str] = None,
bucket_size_msec: Optional[int] = None,
filter_by_ts: Optional[List[int]] = None,
filter_by_min_value: Optional[float] = None,
filter_by_max_value: Optional[float] = None,
align: Optional[Union[int, str]] = None,
latest: bool = False
) -> List[Tuple[int, float]]: ...
def ts_revrange(
self,
key: KeyT,
from_time: Union[int, str] = "+",
to_time: Union[int, str] = "-",
count: Optional[int] = None,
aggregation_type: Optional[str] = None,
bucket_size_msec: Optional[int] = None,
filter_by_ts: Optional[List[int]] = None,
filter_by_min_value: Optional[float] = None,
filter_by_max_value: Optional[float] = None,
align: Optional[Union[int, str]] = None,
latest: bool = False
) -> List[Tuple[int, float]]: ...
def ts_mrange(
self,
from_time: Union[int, str],
to_time: Union[int, str],
filters: List[str],
count: Optional[int] = None,
aggregation_type: Optional[str] = None,
bucket_size_msec: Optional[int] = None,
with_labels: bool = False,
filter_by_ts: Optional[List[int]] = None,
filter_by_min_value: Optional[float] = None,
filter_by_max_value: Optional[float] = None,
groupby: Optional[str] = None,
reduce: Optional[str] = None,
select_labels: Optional[List[str]] = None,
align: Optional[Union[int, str]] = None,
latest: bool = False
) -> List[Dict[str, Any]]: ...
def ts_createrule(
self,
source_key: KeyT,
dest_key: KeyT,
aggregation_type: str,
bucket_size_msec: int,
alignment_timestamp: Optional[int] = None
) -> bool: ...
def ts_deleterule(self, source_key: KeyT, dest_key: KeyT) -> bool: ...
def ts_queryindex(self, filters: List[str]) -> List[str]: ...
def ts_del(self, key: KeyT, from_time: Union[int, str], to_time: Union[int, str]) -> int: ...
def ts_alter(
self,
key: KeyT,
retention_msecs: Optional[int] = None,
chunk_size: Optional[int] = None,
duplicate_policy: Optional[str] = None,
labels: Optional[Dict[str, str]] = None,
ignore_max_time_diff: Optional[int] = None,
ignore_max_val_diff: Optional[float] = None
) -> bool: ...
def ts_incrby(
self,
key: KeyT,
value: float,
timestamp: Optional[Union[int, str]] = None,
retention_msecs: Optional[int] = None,
uncompressed: Optional[bool] = None,
chunk_size: Optional[int] = None,
labels: Optional[Dict[str, str]] = None,
ignore_max_time_diff: Optional[int] = None,
ignore_max_val_diff: Optional[float] = None
) -> int: ...
def ts_decrby(
self,
key: KeyT,
value: float,
timestamp: Optional[Union[int, str]] = None,
retention_msecs: Optional[int] = None,
uncompressed: Optional[bool] = None,
chunk_size: Optional[int] = None,
labels: Optional[Dict[str, str]] = None,
ignore_max_time_diff: Optional[int] = None,
ignore_max_val_diff: Optional[float] = None
) -> int: ...

import fakeredis
# Example: storing and querying a JSON document with fakeredis.
# Note: json_* queries with a "$" root path return a list of matches.
# Enable JSON support (requires jsonpath-ng)
client = fakeredis.FakeRedis()
# Set JSON document
user_data = {
"name": "John Doe",
"email": "john@example.com",
"age": 30,
"addresses": [
{"type": "home", "city": "New York"},
{"type": "work", "city": "Boston"}
]
}
client.json_set("user:1", "$", user_data)
# Get entire document
user = client.json_get("user:1")
print(user)
# Get specific fields using JSONPath
name = client.json_get("user:1", "$.name")
print(name) # ["John Doe"]
work_city = client.json_get("user:1", "$.addresses[?(@.type=='work')].city")
print(work_city) # ["Boston"]
# Update nested values
client.json_set("user:1", "$.age", 31)
client.json_arrappend("user:1", "$.addresses", {"type": "vacation", "city": "Miami"})
# Array operations
addresses_count = client.json_arrlen("user:1", "$.addresses")
print(addresses_count) # [3]

import fakeredis
# Example: approximate set membership with a Bloom filter in fakeredis.
# A Bloom filter can report false positives but never false negatives.
# Enable Bloom filter support (requires pyprobables)
client = fakeredis.FakeRedis()
# Create Bloom filter with 1% error rate for 10000 items
client.bf_reserve("user_ids", 0.01, 10000)
# Add items
client.bf_add("user_ids", "user123")
client.bf_add("user_ids", "user456")
client.bf_madd("user_ids", "user789", "user101", "user202")
# Test membership
exists = client.bf_exists("user_ids", "user123") # True
missing = client.bf_exists("user_ids", "user999") # False (probably)
# Check multiple items
results = client.bf_mexists("user_ids", "user123", "user456", "user999")
print(results) # [True, True, False]
# Get filter info
info = client.bf_info("user_ids")
print(f"Capacity: {info['Capacity']}, Items: {info['Number of items inserted']}")

import fakeredis
# Example: recording and aggregating sensor samples with RedisTimeSeries
# commands in fakeredis. Timestamps are in milliseconds since the epoch.
import time
client = fakeredis.FakeRedis()
# Create time series for temperature sensor
client.ts_create(
"sensor:temp:room1",
retention_msecs=86400000, # 24 hours
labels={"sensor": "temperature", "room": "room1"}
)
# Add current temperature reading
current_time = int(time.time() * 1000)
client.ts_add("sensor:temp:room1", current_time, 22.5)
# Add multiple readings
readings = [
("sensor:temp:room1", current_time + 60000, 23.1),
("sensor:temp:room1", current_time + 120000, 22.8),
("sensor:temp:room1", current_time + 180000, 23.4)
]
client.ts_madd(readings)
# Get latest reading (ts_get returns a (timestamp, value) pair)
latest = client.ts_get("sensor:temp:room1")
print(f"Latest reading: {latest[1]}°C at {latest[0]}")
# Get range of readings with aggregation
range_data = client.ts_range(
"sensor:temp:room1",
from_time=current_time,
to_time=current_time + 300000,
aggregation_type="avg",
bucket_size_msec=60000 # 1-minute buckets
)
# Create aggregation rule for hourly averages
# (the destination series must exist before the rule is created)
client.ts_create("sensor:temp:room1:hourly")
client.ts_createrule(
"sensor:temp:room1",
"sensor:temp:room1:hourly",
"avg",
3600000 # 1 hour in milliseconds
)

import fakeredis
# Example: approximate analytics with Count-Min Sketch, Top-K, and t-digest.
client = fakeredis.FakeRedis()

# Count-Min Sketch for frequency counting.
# CMS.INITBYPROB takes (error, probability): `probability` is the chance of an
# over-estimated count, so 99% confidence means 0.01 — not 0.99 as originally
# written, which would have allowed inflated counts 99% of the time.
client.cms_initbyprob("page_views", 0.01, 0.01) # 1% error, 99% confidence
# Track page view counts
client.cms_incrby("page_views", {"/home": 100, "/about": 50, "/contact": 25})
client.cms_incrby("page_views", {"/home": 75, "/products": 200})
# Query frequencies (estimates may overcount, never undercount)
counts = client.cms_query("page_views", "/home", "/about", "/products")
print(f"Page view estimates: /home={counts[0]}, /about={counts[1]}, /products={counts[2]}")

# Top-K for heavy hitters
client.topk_reserve("popular_pages", 10, 1000, 5, 0.9) # Track top 10 pages
# Add page views
client.topk_incrby("popular_pages", {"/home": 500, "/products": 300, "/blog": 200})
# Get top items
top_pages = client.topk_list("popular_pages")
print(f"Top pages: {top_pages}")

# T-Digest for percentile calculations
client.tdigest_create("response_times", compression=400)
# Add response time measurements (in milliseconds)
response_times = [45.2, 67.8, 89.1, 123.4, 234.5, 345.6, 456.7, 567.8]
client.tdigest_add("response_times", response_times)
# Calculate percentiles
percentiles = client.tdigest_quantile("response_times", 0.5, 0.95, 0.99)
print(f"Response time percentiles - 50th: {percentiles[0]:.1f}ms, 95th: {percentiles[1]:.1f}ms, 99th: {percentiles[2]:.1f}ms")

Install with Tessl CLI
npx tessl i tessl/pypi-fakeredisdocs