docs
0
# Redis Stack Extensions
1
2
Optional Redis Stack module implementations providing advanced data structures and capabilities including JSON, Bloom filters, Time Series, and probabilistic data structures. These extensions require additional dependencies and provide Redis Stack compatibility.
3
4
## Capabilities
5
6
### JSON Operations
7
8
Redis JSON module implementation for storing, querying, and manipulating JSON documents using JSONPath expressions.
9
10
**Dependencies**: `jsonpath-ng`
11
12
```python { .api }
13
# JSON document operations
14
def json_del(self, name: KeyT, path: str = "$") -> int: ...
15
def json_get(
16
self,
17
name: KeyT,
18
*args: str,
19
no_escape: bool = False
20
) -> Any: ...
21
def json_mget(self, keys: List[KeyT], path: str) -> List[Any]: ...
22
def json_set(
23
self,
24
name: KeyT,
25
path: str,
26
obj: Any,
27
nx: bool = False,
28
xx: bool = False
29
) -> Optional[bool]: ...
30
31
# JSON utility operations
32
def json_clear(self, name: KeyT, path: str = "$") -> int: ...
33
def json_toggle(self, name: KeyT, path: str = "$") -> List[Union[bool, None]]: ...
34
def json_type(self, name: KeyT, path: str = "$") -> List[str]: ...
35
36
# JSON string operations
37
def json_strlen(self, name: KeyT, path: str = "$") -> List[Union[int, None]]: ...
38
def json_strappend(self, name: KeyT, string: str, path: str = "$") -> List[Union[int, None]]: ...
39
40
# JSON array operations
41
def json_arrappend(self, name: KeyT, path: str = "$", *args: Any) -> List[Union[int, None]]: ...
42
def json_arrindex(
43
self,
44
name: KeyT,
45
path: str,
46
scalar: Any,
47
start: Optional[int] = None,
48
stop: Optional[int] = None
49
) -> List[Union[int, None]]: ...
50
def json_arrinsert(self, name: KeyT, path: str, index: int, *args: Any) -> List[Union[int, None]]: ...
51
def json_arrlen(self, name: KeyT, path: str = "$") -> List[Union[int, None]]: ...
52
def json_arrpop(self, name: KeyT, path: str = "$", index: int = -1) -> List[Any]: ...
53
def json_arrtrim(self, name: KeyT, path: str, start: int, stop: int) -> List[Union[int, None]]: ...
54
55
# JSON numeric operations
56
def json_numincrby(self, name: KeyT, path: str, number: float) -> List[Union[float, None]]: ...
57
def json_nummultby(self, name: KeyT, path: str, number: float) -> List[Union[float, None]]: ...
58
59
# JSON object operations
60
def json_objkeys(self, name: KeyT, path: str = "$") -> List[Union[List[str], None]]: ...
61
def json_objlen(self, name: KeyT, path: str = "$") -> List[Union[int, None]]: ...
62
```
63
64
### Bloom Filter Operations
65
66
Probabilistic data structure for membership testing with tunable false positive rates.
67
68
**Dependencies**: `pyprobables`
69
70
```python { .api }
71
def bf_add(self, key: KeyT, item: EncodableT) -> bool: ...
72
def bf_madd(self, key: KeyT, *items: EncodableT) -> List[bool]: ...
73
def bf_exists(self, key: KeyT, item: EncodableT) -> bool: ...
74
def bf_mexists(self, key: KeyT, *items: EncodableT) -> List[bool]: ...
75
def bf_card(self, key: KeyT) -> int: ...
76
def bf_info(self, key: KeyT) -> Dict[str, Union[int, float]]: ...
77
def bf_insert(
78
self,
79
key: KeyT,
80
items: List[EncodableT],
81
capacity: Optional[int] = None,
82
error: Optional[float] = None,
83
expansion: Optional[int] = None,
84
nocreate: bool = False,
85
nonscaling: bool = False
86
) -> List[bool]: ...
87
def bf_reserve(
88
self,
89
key: KeyT,
90
error_rate: float,
91
capacity: int,
92
expansion: Optional[int] = None,
93
nonscaling: bool = False
94
) -> bool: ...
95
```
96
97
### Cuckoo Filter Operations
98
99
Alternative probabilistic data structure supporting deletions with lower memory overhead.
100
101
**Dependencies**: `pyprobables`
102
103
```python { .api }
104
def cf_add(self, key: KeyT, item: EncodableT) -> bool: ...
105
def cf_addnx(self, key: KeyT, item: EncodableT) -> bool: ...
106
def cf_count(self, key: KeyT, item: EncodableT) -> int: ...
107
def cf_del(self, key: KeyT, item: EncodableT) -> bool: ...
108
def cf_exists(self, key: KeyT, item: EncodableT) -> bool: ...
109
def cf_info(self, key: KeyT) -> Dict[str, Union[int, float]]: ...
110
def cf_insert(
111
self,
112
key: KeyT,
113
items: List[EncodableT],
114
capacity: Optional[int] = None,
115
nocreate: bool = False
116
) -> List[bool]: ...
117
def cf_reserve(self, key: KeyT, capacity: int, bucket_size: Optional[int] = None, max_iterations: Optional[int] = None) -> bool: ...
118
```
119
120
### Count-Min Sketch Operations
121
122
Probabilistic data structure for frequency estimation in data streams.
123
124
**Dependencies**: `pyprobables`
125
126
```python { .api }
127
def cms_incrby(self, key: KeyT, items: Dict[EncodableT, int]) -> List[int]: ...
128
def cms_info(self, key: KeyT) -> Dict[str, Union[int, float]]: ...
129
def cms_initbydim(self, key: KeyT, width: int, depth: int) -> bool: ...
130
def cms_initbyprob(self, key: KeyT, error: float, probability: float) -> bool: ...
131
def cms_merge(self, dest: KeyT, numkeys: int, src: List[KeyT], weights: Optional[List[int]] = None) -> bool: ...
132
def cms_query(self, key: KeyT, *items: EncodableT) -> List[int]: ...
133
```
134
135
### Top-K Operations
136
137
Probabilistic data structure for tracking the k most frequent items in a stream.
138
139
```python { .api }
140
def topk_add(self, key: KeyT, *items: EncodableT) -> List[Union[str, None]]: ...
141
def topk_count(self, key: KeyT, *items: EncodableT) -> List[int]: ...
142
def topk_info(self, key: KeyT) -> Dict[str, Union[int, float]]: ...
143
def topk_incrby(self, key: KeyT, items: Dict[EncodableT, int]) -> List[Union[str, None]]: ...
144
def topk_list(self, key: KeyT) -> List[str]: ...
145
def topk_query(self, key: KeyT, *items: EncodableT) -> List[bool]: ...
146
def topk_reserve(self, key: KeyT, k: int, width: int, depth: int, decay: float) -> bool: ...
147
```
148
149
### T-Digest Operations
150
151
Probabilistic data structure for accurate estimation of quantiles and percentiles.
152
153
```python { .api }
154
def tdigest_create(self, key: KeyT, compression: Optional[int] = None) -> bool: ...
155
def tdigest_add(self, key: KeyT, values: List[float], weights: Optional[List[float]] = None) -> bool: ...
156
def tdigest_merge(self, dest_key: KeyT, numkeys: int, src_keys: List[KeyT], compression: Optional[int] = None, override: bool = False) -> bool: ...
157
def tdigest_max(self, key: KeyT) -> float: ...
158
def tdigest_min(self, key: KeyT) -> float: ...
159
def tdigest_quantile(self, key: KeyT, *quantiles: float) -> List[float]: ...
160
def tdigest_rank(self, key: KeyT, *values: float) -> List[float]: ...
161
def tdigest_revrank(self, key: KeyT, *values: float) -> List[float]: ...
162
def tdigest_reset(self, key: KeyT) -> bool: ...
163
def tdigest_cdf(self, key: KeyT, *values: float) -> List[float]: ...
164
def tdigest_info(self, key: KeyT) -> Dict[str, Union[int, float]]: ...
165
```
166
167
### Time Series Operations
168
169
Time series data structure with aggregation rules, compaction policies, and retention management.
170
171
```python { .api }
172
def ts_create(
173
self,
174
key: KeyT,
175
retention_msecs: Optional[int] = None,
176
uncompressed: Optional[bool] = None,
177
chunk_size: Optional[int] = None,
178
duplicate_policy: Optional[str] = None,
179
labels: Optional[Dict[str, str]] = None,
180
ignore_max_time_diff: Optional[int] = None,
181
ignore_max_val_diff: Optional[float] = None
182
) -> bool: ...
183
184
def ts_add(
185
self,
186
key: KeyT,
187
timestamp: Union[int, str],
188
value: float,
189
retention_msecs: Optional[int] = None,
190
uncompressed: Optional[bool] = None,
191
chunk_size: Optional[int] = None,
192
on_duplicate: Optional[str] = None,
193
labels: Optional[Dict[str, str]] = None,
194
ignore_max_time_diff: Optional[int] = None,
195
ignore_max_val_diff: Optional[float] = None
196
) -> int: ...
197
198
def ts_madd(self, ktv_tuples: List[Tuple[KeyT, Union[int, str], float]]) -> List[int]: ...
199
200
def ts_get(self, key: KeyT, latest: bool = False) -> Tuple[int, float]: ...
201
202
def ts_info(self, key: KeyT) -> Dict[str, Any]: ...
203
204
def ts_range(
205
self,
206
key: KeyT,
207
from_time: Union[int, str] = "-",
208
to_time: Union[int, str] = "+",
209
count: Optional[int] = None,
210
aggregation_type: Optional[str] = None,
211
bucket_size_msec: Optional[int] = None,
212
filter_by_ts: Optional[List[int]] = None,
213
filter_by_min_value: Optional[float] = None,
214
filter_by_max_value: Optional[float] = None,
215
align: Optional[Union[int, str]] = None,
216
latest: bool = False
217
) -> List[Tuple[int, float]]: ...
218
219
def ts_revrange(
220
self,
221
key: KeyT,
222
from_time: Union[int, str] = "+",
223
to_time: Union[int, str] = "-",
224
count: Optional[int] = None,
225
aggregation_type: Optional[str] = None,
226
bucket_size_msec: Optional[int] = None,
227
filter_by_ts: Optional[List[int]] = None,
228
filter_by_min_value: Optional[float] = None,
229
filter_by_max_value: Optional[float] = None,
230
align: Optional[Union[int, str]] = None,
231
latest: bool = False
232
) -> List[Tuple[int, float]]: ...
233
234
def ts_mrange(
235
self,
236
from_time: Union[int, str],
237
to_time: Union[int, str],
238
filters: List[str],
239
count: Optional[int] = None,
240
aggregation_type: Optional[str] = None,
241
bucket_size_msec: Optional[int] = None,
242
with_labels: bool = False,
243
filter_by_ts: Optional[List[int]] = None,
244
filter_by_min_value: Optional[float] = None,
245
filter_by_max_value: Optional[float] = None,
246
groupby: Optional[str] = None,
247
reduce: Optional[str] = None,
248
select_labels: Optional[List[str]] = None,
249
align: Optional[Union[int, str]] = None,
250
latest: bool = False
251
) -> List[Dict[str, Any]]: ...
252
253
def ts_createrule(
254
self,
255
source_key: KeyT,
256
dest_key: KeyT,
257
aggregation_type: str,
258
bucket_size_msec: int,
259
alignment_timestamp: Optional[int] = None
260
) -> bool: ...
261
262
def ts_deleterule(self, source_key: KeyT, dest_key: KeyT) -> bool: ...
263
264
def ts_queryindex(self, filters: List[str]) -> List[str]: ...
265
266
def ts_del(self, key: KeyT, from_time: Union[int, str], to_time: Union[int, str]) -> int: ...
267
268
def ts_alter(
269
self,
270
key: KeyT,
271
retention_msecs: Optional[int] = None,
272
chunk_size: Optional[int] = None,
273
duplicate_policy: Optional[str] = None,
274
labels: Optional[Dict[str, str]] = None,
275
ignore_max_time_diff: Optional[int] = None,
276
ignore_max_val_diff: Optional[float] = None
277
) -> bool: ...
278
279
def ts_incrby(
280
self,
281
key: KeyT,
282
value: float,
283
timestamp: Optional[Union[int, str]] = None,
284
retention_msecs: Optional[int] = None,
285
uncompressed: Optional[bool] = None,
286
chunk_size: Optional[int] = None,
287
labels: Optional[Dict[str, str]] = None,
288
ignore_max_time_diff: Optional[int] = None,
289
ignore_max_val_diff: Optional[float] = None
290
) -> int: ...
291
292
def ts_decrby(
293
self,
294
key: KeyT,
295
value: float,
296
timestamp: Optional[Union[int, str]] = None,
297
retention_msecs: Optional[int] = None,
298
uncompressed: Optional[bool] = None,
299
chunk_size: Optional[int] = None,
300
labels: Optional[Dict[str, str]] = None,
301
ignore_max_time_diff: Optional[int] = None,
302
ignore_max_val_diff: Optional[float] = None
303
) -> int: ...
304
```
305
306
## Usage Examples
307
308
### JSON Operations
309
310
```python
311
import fakeredis
312
313
# Enable JSON support (requires jsonpath-ng)
314
client = fakeredis.FakeRedis()
315
316
# Set JSON document
317
user_data = {
318
"name": "John Doe",
319
"email": "john@example.com",
320
"age": 30,
321
"addresses": [
322
{"type": "home", "city": "New York"},
323
{"type": "work", "city": "Boston"}
324
]
325
}
326
327
client.json_set("user:1", "$", user_data)
328
329
# Get entire document
330
user = client.json_get("user:1")
331
print(user)
332
333
# Get specific fields using JSONPath
334
name = client.json_get("user:1", "$.name")
335
print(name) # ["John Doe"]
336
337
work_city = client.json_get("user:1", "$.addresses[?(@.type=='work')].city")
338
print(work_city) # ["Boston"]
339
340
# Update nested values
341
client.json_set("user:1", "$.age", 31)
342
client.json_arrappend("user:1", "$.addresses", {"type": "vacation", "city": "Miami"})
343
344
# Array operations
345
addresses_count = client.json_arrlen("user:1", "$.addresses")
346
print(addresses_count) # [3]
347
```
348
349
### Bloom Filter Operations
350
351
```python
352
import fakeredis
353
354
# Enable Bloom filter support (requires pyprobables)
355
client = fakeredis.FakeRedis()
356
357
# Create Bloom filter with 1% error rate for 10000 items
358
client.bf_reserve("user_ids", 0.01, 10000)
359
360
# Add items
361
client.bf_add("user_ids", "user123")
362
client.bf_add("user_ids", "user456")
363
client.bf_madd("user_ids", "user789", "user101", "user202")
364
365
# Test membership
366
exists = client.bf_exists("user_ids", "user123") # True
367
missing = client.bf_exists("user_ids", "user999") # False (probably)
368
369
# Check multiple items
370
results = client.bf_mexists("user_ids", "user123", "user456", "user999")
371
print(results) # [True, True, False]
372
373
# Get filter info
374
info = client.bf_info("user_ids")
375
print(f"Capacity: {info['Capacity']}, Items: {info['Number of items inserted']}")
376
```
377
378
### Time Series Operations
379
380
```python
381
import fakeredis
382
import time
383
384
client = fakeredis.FakeRedis()
385
386
# Create time series for temperature sensor
387
client.ts_create(
388
"sensor:temp:room1",
389
retention_msecs=86400000, # 24 hours
390
labels={"sensor": "temperature", "room": "room1"}
391
)
392
393
# Add current temperature reading
394
current_time = int(time.time() * 1000)
395
client.ts_add("sensor:temp:room1", current_time, 22.5)
396
397
# Add multiple readings
398
readings = [
399
("sensor:temp:room1", current_time + 60000, 23.1),
400
("sensor:temp:room1", current_time + 120000, 22.8),
401
("sensor:temp:room1", current_time + 180000, 23.4)
402
]
403
client.ts_madd(readings)
404
405
# Get latest reading
406
latest = client.ts_get("sensor:temp:room1")
407
print(f"Latest reading: {latest[1]}°C at {latest[0]}")
408
409
# Get range of readings with aggregation
410
range_data = client.ts_range(
411
"sensor:temp:room1",
412
from_time=current_time,
413
to_time=current_time + 300000,
414
aggregation_type="avg",
415
bucket_size_msec=60000 # 1-minute buckets
416
)
417
418
# Create aggregation rule for hourly averages
419
client.ts_create("sensor:temp:room1:hourly")
420
client.ts_createrule(
421
"sensor:temp:room1",
422
"sensor:temp:room1:hourly",
423
"avg",
424
3600000 # 1 hour in milliseconds
425
)
426
```
427
428
### Probabilistic Data Structures
429
430
```python
431
import fakeredis
432
433
client = fakeredis.FakeRedis()
434
435
# Count-Min Sketch for frequency counting
436
client.cms_initbyprob("page_views", 0.01, 0.99) # 1% error, 99% confidence
437
438
# Track page view counts
439
client.cms_incrby("page_views", {"/home": 100, "/about": 50, "/contact": 25})
440
client.cms_incrby("page_views", {"/home": 75, "/products": 200})
441
442
# Query frequencies
443
counts = client.cms_query("page_views", "/home", "/about", "/products")
444
print(f"Page view estimates: /home={counts[0]}, /about={counts[1]}, /products={counts[2]}")
445
446
# Top-K for heavy hitters
447
client.topk_reserve("popular_pages", 10, 1000, 5, 0.9) # Track top 10 pages
448
449
# Add page views
450
client.topk_incrby("popular_pages", {"/home": 500, "/products": 300, "/blog": 200})
451
452
# Get top items
453
top_pages = client.topk_list("popular_pages")
454
print(f"Top pages: {top_pages}")
455
456
# T-Digest for percentile calculations
457
client.tdigest_create("response_times", compression=400)
458
459
# Add response time measurements (in milliseconds)
460
response_times = [45.2, 67.8, 89.1, 123.4, 234.5, 345.6, 456.7, 567.8]
461
client.tdigest_add("response_times", response_times)
462
463
# Calculate percentiles
464
percentiles = client.tdigest_quantile("response_times", 0.5, 0.95, 0.99)
465
print(f"Response time percentiles - 50th: {percentiles[0]:.1f}ms, 95th: {percentiles[1]:.1f}ms, 99th: {percentiles[2]:.1f}ms")
466
```