# RedisBloom

RedisBloom provides probabilistic data structures for Redis including Bloom filters, Cuckoo filters, Count-Min Sketch, and Top-K. These structures enable memory-efficient approximate membership queries, frequency estimation, and heavy hitters detection.

## Capabilities

### Bloom Filter Operations

Bloom filters for memory-efficient approximate set membership testing.
```python { .api }
# Bloom filter commands: reserve creates the filter; add/madd insert items;
# exists/mexists test membership; scandump/loadchunk support incremental
# persistence; info reports filter metadata.
def bf_reserve(
    self,
    key: str,
    error_rate: float,
    capacity: int
) -> str: ...

def bf_add(self, key: str, item: str) -> bool: ...

def bf_madd(self, key: str, *items: str) -> List[bool]: ...

def bf_exists(self, key: str, item: str) -> bool: ...

def bf_mexists(self, key: str, *items: str) -> List[bool]: ...

def bf_scandump(self, key: str, iterator: int) -> Tuple[int, bytes]: ...

def bf_loadchunk(self, key: str, iterator: int, data: bytes) -> str: ...

def bf_info(self, key: str) -> Dict[str, Any]: ...
```
### Cuckoo Filter Operations

Cuckoo filters for approximate set membership with deletion support.
```python { .api }
# Cuckoo filter commands: like Bloom filters but with item deletion (cf_del)
# and approximate per-item counting (cf_count). insert/insertnx batch-add
# items with optional auto-creation of the filter.
def cf_reserve(
    self,
    key: str,
    capacity: int,
    bucket_size: Optional[int] = None,
    max_iterations: Optional[int] = None,
    expansion: Optional[int] = None
) -> str: ...

def cf_add(self, key: str, item: str) -> bool: ...

def cf_addnx(self, key: str, item: str) -> bool: ...

def cf_insert(
    self,
    key: str,
    *items: str,
    capacity: Optional[int] = None,
    nocreate: bool = False
) -> List[bool]: ...

def cf_insertnx(
    self,
    key: str,
    *items: str,
    capacity: Optional[int] = None,
    nocreate: bool = False
) -> List[bool]: ...

def cf_exists(self, key: str, item: str) -> bool: ...

def cf_mexists(self, key: str, *items: str) -> List[bool]: ...

def cf_del(self, key: str, item: str) -> bool: ...

def cf_count(self, key: str, item: str) -> int: ...

def cf_scandump(self, key: str, iterator: int) -> Tuple[int, bytes]: ...

def cf_loadchunk(self, key: str, iterator: int, data: bytes) -> str: ...

def cf_info(self, key: str) -> Dict[str, Any]: ...
```
### Count-Min Sketch Operations

Count-Min Sketch for frequency estimation of items in data streams.
```python { .api }
# Count-Min Sketch commands: initbydim/initbyprob create the sketch (by
# explicit dimensions or by target error/probability); incrby updates
# frequencies; query estimates counts; merge combines sketches.
def cms_initbydim(
    self,
    key: str,
    width: int,
    depth: int
) -> str: ...

def cms_initbyprob(
    self,
    key: str,
    error: float,
    probability: float
) -> str: ...

def cms_incrby(
    self,
    key: str,
    *items_increments: Tuple[str, int]
) -> List[int]: ...

def cms_query(self, key: str, *items: str) -> List[int]: ...

def cms_merge(
    self,
    dest_key: str,
    num_keys: int,
    *src_keys: str,
    weights: Optional[List[int]] = None
) -> str: ...

def cms_info(self, key: str) -> Dict[str, Any]: ...
```
### Top-K Operations

Top-K data structure for tracking the most frequent items.
```python { .api }
# Top-K commands: reserve creates the structure; add/incrby record item
# occurrences (returning any evicted items); query/count/list inspect the
# current heavy hitters.
def topk_reserve(
    self,
    key: str,
    k: int,
    width: int,
    depth: int,
    decay: float
) -> str: ...

def topk_add(self, key: str, *items: str) -> List[Optional[str]]: ...

def topk_incrby(
    self,
    key: str,
    *items_increments: Tuple[str, int]
) -> List[Optional[str]]: ...

def topk_query(self, key: str, *items: str) -> List[bool]: ...

def topk_count(self, key: str, *items: str) -> List[int]: ...

def topk_list(self, key: str, with_count: bool = False) -> List[Any]: ...

def topk_info(self, key: str) -> Dict[str, Any]: ...
```
## Usage Examples

### Bloom Filter for Set Membership
```python
import redis
import random
import string

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# Create Bloom filter for user email tracking
def setup_email_bloom_filter():
    """Create a Bloom filter and populate it with sample email addresses."""
    # Reserve Bloom filter with 1% error rate for 100K items
    r.bf().reserve("user_emails", 0.01, 100000)
    print("Created Bloom filter for user emails")

    # Add some email addresses
    emails = [
        "user1@example.com",
        "user2@example.com",
        "admin@company.com",
        "support@company.com",
        "noreply@service.com"
    ]

    # Add emails individually
    for email in emails[:3]:
        added = r.bf().add("user_emails", email)
        print(f"Added {email}: {added}")

    # Add multiple emails at once
    results = r.bf().madd("user_emails", *emails[3:])
    print(f"Batch added emails: {results}")

def test_email_membership():
    """Query the Bloom filter and report membership results and filter info."""
    # Test membership
    test_emails = [
        "user1@example.com",  # Should exist
        "admin@company.com",  # Should exist
        "unknown@test.com",   # Should not exist
        "fake@domain.com"     # Should not exist
    ]

    print("\nTesting email membership:")
    for email in test_emails:
        exists = r.bf().exists("user_emails", email)
        print(f" {email}: {'EXISTS' if exists else 'NOT FOUND'}")

    # Test multiple emails at once
    results = r.bf().mexists("user_emails", *test_emails)
    print(f"\nBatch membership test: {results}")

    # Get filter information
    info = r.bf().info("user_emails")
    print(f"\nBloom filter info:")
    print(f" Capacity: {info.get('Capacity', 'N/A')}")
    print(f" Size: {info.get('Size', 'N/A')}")
    print(f" Number of filters: {info.get('Number of filters', 'N/A')}")
    print(f" Number of items inserted: {info.get('Number of items inserted', 'N/A')}")

setup_email_bloom_filter()
test_email_membership()
```
### Cuckoo Filter with Deletion Support
```python
import redis
import time

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

def setup_session_cuckoo_filter():
    """Create a Cuckoo filter and register some sample session ids."""
    # Create Cuckoo filter for tracking active sessions
    r.cf().reserve("active_sessions", 10000)  # Capacity for 10K sessions
    print("Created Cuckoo filter for active sessions")

    # Simulate user sessions
    sessions = [
        "session_abc123",
        "session_def456",
        "session_ghi789",
        "session_jkl012",
        "session_mno345"
    ]

    # Add sessions
    for session in sessions:
        added = r.cf().add("active_sessions", session)
        print(f"Added session {session}: {added}")

def manage_sessions():
    """Check, expire, and inspect sessions stored in the Cuckoo filter."""
    # Check which sessions exist
    test_sessions = [
        "session_abc123",  # Should exist
        "session_xyz999",  # Should not exist
        "session_def456"   # Should exist
    ]

    print("\nChecking session existence:")
    results = r.cf().mexists("active_sessions", *test_sessions)
    for session, exists in zip(test_sessions, results):
        print(f" {session}: {'ACTIVE' if exists else 'INACTIVE'}")

    # Simulate session expiration (delete from filter)
    expired_session = "session_abc123"
    # CF.DEL: `del` is a reserved keyword in Python, so redis-py exposes it as delete()
    deleted = r.cf().delete("active_sessions", expired_session)
    print(f"\nExpired session {expired_session}: {deleted}")

    # Verify deletion
    still_exists = r.cf().exists("active_sessions", expired_session)
    print(f"Session still exists after deletion: {still_exists}")

    # Get session count (approximate)
    count = r.cf().count("active_sessions", "session_def456")
    print(f"Count for session_def456: {count}")

    # Get filter info
    info = r.cf().info("active_sessions")
    print(f"\nCuckoo filter info:")
    print(f" Size: {info.get('Size', 'N/A')}")
    print(f" Number of buckets: {info.get('Number of buckets', 'N/A')}")
    print(f" Number of items: {info.get('Number of items', 'N/A')}")

setup_session_cuckoo_filter()
manage_sessions()
```
### Count-Min Sketch for Frequency Estimation
```python
import redis
import random

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

def setup_page_view_counter():
    """Build a Count-Min Sketch, simulate page views, and return exact counts."""
    # Initialize Count-Min Sketch with error rate and probability
    r.cms().initbyprob("page_views", 0.01, 0.99)  # 1% error, 99% probability
    print("Created Count-Min Sketch for page view counting")

    # Simulate page views
    pages = [
        "/home", "/products", "/about", "/contact",
        "/blog", "/help", "/pricing", "/features"
    ]

    # Simulate random page views
    view_counts = {}
    for _ in range(1000):
        page = random.choice(pages)
        views = random.randint(1, 5)

        # Increment in CMS
        # NOTE(review): recent redis-py versions expect item/increment lists,
        # e.g. incrby("page_views", [page], [views]) — verify against the client version
        r.cms().incrby("page_views", page, views)

        # Keep actual count for comparison
        view_counts[page] = view_counts.get(page, 0) + views

    print("Simulated 1000 page view events")
    return view_counts

def analyze_page_views(actual_counts):
    """Compare CMS estimates with exact counts and print the error rates."""
    # Query estimated counts from CMS
    pages = list(actual_counts.keys())
    estimated_counts = r.cms().query("page_views", *pages)

    print("\nPage view analysis (Actual vs Estimated):")
    print("-" * 50)

    total_error = 0
    for page, estimated, actual in zip(pages, estimated_counts, actual_counts.values()):
        error = abs(estimated - actual)
        error_pct = (error / actual * 100) if actual > 0 else 0
        total_error += error_pct

        print(f"{page:12} | Actual: {actual:4d} | Estimated: {estimated:4d} | Error: {error_pct:.1f}%")

    avg_error = total_error / len(pages)
    print(f"\nAverage error rate: {avg_error:.2f}%")

    # Get CMS information
    info = r.cms().info("page_views")
    print(f"\nCount-Min Sketch info:")
    print(f" Width: {info.get('width', 'N/A')}")
    print(f" Depth: {info.get('depth', 'N/A')}")
    print(f" Count: {info.get('count', 'N/A')}")

actual_counts = setup_page_view_counter()
analyze_page_views(actual_counts)
```
### Top-K for Heavy Hitters Detection
```python
import redis
import random
import time

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

def setup_trending_hashtags():
    """Create a Top-K sketch and seed it with weighted hashtag mentions."""
    # Create Top-K structure for top 10 hashtags
    r.topk().reserve("trending_hashtags", 10, 2000, 7, 0.925)
    print("Created Top-K for trending hashtags (top 10)")

    # Simulate hashtag usage
    hashtags = [
        "#python", "#redis", "#database", "#programming", "#coding",
        "#tech", "#software", "#development", "#data", "#cloud",
        "#ai", "#machinelearning", "#devops", "#web", "#mobile"
    ]

    # Simulate trending patterns
    trending_weights = {
        "#python": 50,
        "#redis": 45,
        "#programming": 40,
        "#tech": 35,
        "#coding": 30,
        "#ai": 25,
        "#data": 20,
        "#web": 15,
        "#cloud": 12,
        "#software": 10
    }

    # Add hashtags with different frequencies
    for hashtag, base_weight in trending_weights.items():
        # Add some randomness to simulate real usage
        actual_count = base_weight + random.randint(-5, 15)
        if actual_count > 0:
            # NOTE(review): recent redis-py versions expect item/increment lists,
            # e.g. incrby("trending_hashtags", [hashtag], [actual_count]) — verify
            r.topk().incrby("trending_hashtags", hashtag, actual_count)

    print("Simulated hashtag usage patterns")

def analyze_trending_hashtags():
    """Print the current top hashtags plus presence/count queries."""
    # Get the current top hashtags
    top_hashtags = r.topk().list("trending_hashtags", with_count=True)

    print("\nCurrent trending hashtags:")
    print("-" * 30)
    # with_count returns a flat [item, count, item, count, ...] list
    for i in range(0, len(top_hashtags), 2):
        hashtag = top_hashtags[i]
        count = top_hashtags[i + 1] if i + 1 < len(top_hashtags) else 0
        rank = (i // 2) + 1
        print(f"{rank:2d}. {hashtag:15} ({count} mentions)")

    # Test specific hashtag queries
    test_hashtags = ["#python", "#redis", "#javascript", "#unknown"]

    print("\nHashtag presence in top-K:")
    presence = r.topk().query("trending_hashtags", *test_hashtags)
    counts = r.topk().count("trending_hashtags", *test_hashtags)

    for hashtag, is_present, count in zip(test_hashtags, presence, counts):
        status = "IN TOP-K" if is_present else "NOT IN TOP-K"
        print(f" {hashtag:15}: {status:12} (count: {count})")

    # Get Top-K information
    info = r.topk().info("trending_hashtags")
    print(f"\nTop-K structure info:")
    print(f" K: {info.get('k', 'N/A')}")
    print(f" Width: {info.get('width', 'N/A')}")
    print(f" Depth: {info.get('depth', 'N/A')}")
    print(f" Decay: {info.get('decay', 'N/A')}")

def simulate_real_time_trending():
    """Feed new mentions into the sketch, showing evictions and the new top list."""
    print("\nSimulating real-time hashtag updates:")

    # Simulate new hashtag mentions coming in
    new_mentions = [
        ("#python", 5),
        ("#javascript", 8),
        ("#redis", 3),
        ("#newtech", 12),
        ("#viral", 20)
    ]

    for hashtag, mentions in new_mentions:
        # Add new mentions; incrby returns any items evicted from the top-K
        evicted = r.topk().incrby("trending_hashtags", hashtag, mentions)

        if evicted and evicted[0]:
            print(f" Added {mentions} mentions to {hashtag} - evicted: {evicted[0]}")
        else:
            print(f" Added {mentions} mentions to {hashtag}")

    # Show updated top list
    print("\nUpdated trending hashtags:")
    top_hashtags = r.topk().list("trending_hashtags", with_count=True)
    for i in range(0, min(10, len(top_hashtags)), 2):
        hashtag = top_hashtags[i]
        count = top_hashtags[i + 1] if i + 1 < len(top_hashtags) else 0
        rank = (i // 2) + 1
        print(f" {rank}. {hashtag} ({count})")

setup_trending_hashtags()
analyze_trending_hashtags()
simulate_real_time_trending()
```
### Combining Multiple Probabilistic Structures
```python
import redis
import random
import string

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

class UserActivityTracker:
    """Track registrations, active sessions, and page views with probabilistic structures."""

    def __init__(self, redis_client):
        self.r = redis_client
        self.setup_structures()

    def setup_structures(self):
        """Initialize all probabilistic data structures"""
        # Bloom filter for registered users
        self.r.bf().reserve("registered_users", 0.01, 1000000)

        # Cuckoo filter for active users (allows deletion for logout)
        self.r.cf().reserve("active_users", 100000)

        # Count-Min Sketch for page view frequency
        self.r.cms().initbyprob("page_view_frequency", 0.01, 0.99)

        # Top-K for most active users
        self.r.topk().reserve("most_active_users", 50, 1000, 7, 0.9)

        print("Initialized user activity tracking structures")

    def register_user(self, user_id):
        """Register a new user"""
        self.r.bf().add("registered_users", user_id)
        print(f"Registered user: {user_id}")

    def user_login(self, user_id):
        """Handle user login"""
        # Check if user is registered
        if not self.r.bf().exists("registered_users", user_id):
            print(f"Warning: User {user_id} not registered but attempting login")
            return False

        # Mark as active
        self.r.cf().add("active_users", user_id)
        print(f"User {user_id} logged in")
        return True

    def user_logout(self, user_id):
        """Handle user logout"""
        # Remove from active users
        # CF.DEL: `del` is a reserved keyword in Python, so redis-py exposes it as delete()
        removed = self.r.cf().delete("active_users", user_id)
        if removed:
            print(f"User {user_id} logged out")
        return removed

    def track_page_view(self, user_id, page):
        """Track page view for user"""
        # Increment page view count
        self.r.cms().incrby("page_view_frequency", page, 1)

        # Track user activity
        self.r.topk().incrby("most_active_users", user_id, 1)

    def is_user_registered(self, user_id):
        """Check if user is registered (may have false positives)"""
        return self.r.bf().exists("registered_users", user_id)

    def is_user_active(self, user_id):
        """Check if user is currently active"""
        return self.r.cf().exists("active_users", user_id)

    def get_page_views(self, *pages):
        """Get estimated page view counts"""
        return self.r.cms().query("page_view_frequency", *pages)

    def get_most_active_users(self):
        """Get list of most active users"""
        return self.r.topk().list("most_active_users", with_count=True)

    def get_stats(self):
        """Get system statistics"""
        bf_info = self.r.bf().info("registered_users")
        cf_info = self.r.cf().info("active_users")
        cms_info = self.r.cms().info("page_view_frequency")
        topk_info = self.r.topk().info("most_active_users")

        return {
            "registered_users": bf_info.get("Number of items inserted", 0),
            "active_users": cf_info.get("Number of items", 0),
            "total_page_views": cms_info.get("count", 0),
            "tracking_top_users": topk_info.get("k", 0)
        }

# Usage example
tracker = UserActivityTracker(r)

# Simulate user registrations
users = [f"user_{i:04d}" for i in range(1, 101)]
for user in users[:50]:  # Register first 50 users
    tracker.register_user(user)

# Simulate user logins
active_users = random.sample(users[:50], 20)  # 20 users login
for user in active_users:
    tracker.user_login(user)

# Simulate page views
pages = ["/home", "/dashboard", "/profile", "/settings", "/help"]
for _ in range(500):  # 500 page views
    user = random.choice(active_users)
    page = random.choice(pages)
    tracker.track_page_view(user, page)

# Check system stats
print("\nSystem Statistics:")
stats = tracker.get_stats()
for metric, value in stats.items():
    print(f" {metric}: {value}")

# Check some users
test_users = ["user_0001", "user_0025", "user_0075", "user_0099"]
print("\nUser Status Check:")
for user in test_users:
    registered = tracker.is_user_registered(user)
    active = tracker.is_user_active(user)
    print(f" {user}: Registered={registered}, Active={active}")

# Get page view statistics
page_views = tracker.get_page_views(*pages)
print("\nPage View Statistics:")
for page, views in zip(pages, page_views):
    print(f" {page:12}: {views} views")

# Get most active users
print("\nMost Active Users:")
most_active = tracker.get_most_active_users()
for i in range(0, min(10, len(most_active)), 2):
    user = most_active[i]
    activity_count = most_active[i + 1] if i + 1 < len(most_active) else 0
    print(f" {user}: {activity_count} activities")

# Simulate some logouts
logout_users = random.sample(active_users, 5)
print(f"\nSimulating {len(logout_users)} user logouts:")
for user in logout_users:
    tracker.user_logout(user)

# Check updated active user count
final_stats = tracker.get_stats()
print(f"\nFinal active users: {final_stats['active_users']}")
```