Python client for Redis database and key-value store.

RedisBloom provides probabilistic data structures for Redis including Bloom filters, Cuckoo filters, Count-Min Sketch, and Top-K. These structures enable memory-efficient approximate membership queries, frequency estimation, and heavy-hitters detection.
Bloom filters for memory-efficient approximate set membership testing.
# --- Bloom filter commands (BF.*) ---

# Create an empty Bloom filter with the given false-positive rate and capacity.
def bf_reserve(
self,
key: str,
error_rate: float,
capacity: int
) -> str: ...
# Add a single item; returns whether the item was newly added.
def bf_add(self, key: str, item: str) -> bool: ...
# Add several items in one call; one bool per item.
def bf_madd(self, key: str, *items: str) -> List[bool]: ...
# Membership test (Bloom filters may yield false positives, never false negatives).
def bf_exists(self, key: str, item: str) -> bool: ...
# Batch membership test; one bool per item.
def bf_mexists(self, key: str, *items: str) -> List[bool]: ...
# Incremental serialization: returns the next iterator position and a data chunk.
def bf_scandump(self, key: str, iterator: int) -> Tuple[int, bytes]: ...
# Restore a chunk previously produced by bf_scandump.
def bf_loadchunk(self, key: str, iterator: int, data: bytes) -> str: ...
def bf_info(self, key: str) -> Dict[str, Any]: ...

Cuckoo filters for approximate set membership with deletion support.
# --- Cuckoo filter commands (CF.*) ---

# Create an empty Cuckoo filter; bucket size, cuckoo-kick iteration limit,
# and expansion factor are optional tuning parameters.
def cf_reserve(
self,
key: str,
capacity: int,
bucket_size: Optional[int] = None,
max_iterations: Optional[int] = None,
expansion: Optional[int] = None
) -> str: ...
# Add an item to the filter.
def cf_add(self, key: str, item: str) -> bool: ...
# Add an item only if it is not already present ("add if not exists").
def cf_addnx(self, key: str, item: str) -> bool: ...
# Bulk add; nocreate=True fails instead of creating a missing filter.
def cf_insert(
self,
key: str,
*items: str,
capacity: Optional[int] = None,
nocreate: bool = False
) -> List[bool]: ...
# Bulk add-if-absent variant of cf_insert.
def cf_insertnx(
self,
key: str,
*items: str,
capacity: Optional[int] = None,
nocreate: bool = False
) -> List[bool]: ...
# Membership test (false positives possible).
def cf_exists(self, key: str, item: str) -> bool: ...
# Batch membership test; one bool per item.
def cf_mexists(self, key: str, *items: str) -> List[bool]: ...
# Remove one occurrence of an item (maps to CF.DEL; deletion is the
# capability that distinguishes Cuckoo filters from Bloom filters).
def cf_del(self, key: str, item: str) -> bool: ...
# Estimated number of times the item appears in the filter.
def cf_count(self, key: str, item: str) -> int: ...
# Incremental serialization: returns the next iterator position and a data chunk.
def cf_scandump(self, key: str, iterator: int) -> Tuple[int, bytes]: ...
# Restore a chunk previously produced by cf_scandump.
def cf_loadchunk(self, key: str, iterator: int, data: bytes) -> str: ...
def cf_info(self, key: str) -> Dict[str, Any]: ...

Count-Min Sketch for frequency estimation of items in data streams.
# --- Count-Min Sketch commands (CMS.*) ---

# Create a sketch with explicit counter-array dimensions.
def cms_initbydim(
self,
key: str,
width: int,
depth: int
) -> str: ...
# Create a sketch sized from a target estimation error and certainty probability.
def cms_initbyprob(
self,
key: str,
error: float,
probability: float
) -> str: ...
# Increment item counters; each argument pairs an item with its increment.
def cms_incrby(
self,
key: str,
*items_increments: Tuple[str, int]
) -> List[int]: ...
# Return estimated counts for the given items (over-estimation possible).
def cms_query(self, key: str, *items: str) -> List[int]: ...
# Merge num_keys source sketches into dest_key, optionally weighted per source.
def cms_merge(
self,
dest_key: str,
num_keys: int,
*src_keys: str,
weights: Optional[List[int]] = None
) -> str: ...
def cms_info(self, key: str) -> Dict[str, Any]: ...

Top-K data structure for tracking the most frequent items.
# --- Top-K commands (TOPK.*) ---

# Create a Top-K tracker for the k most frequent items.
def topk_reserve(
self,
key: str,
k: int,
width: int,
depth: int,
decay: float
) -> str: ...
# Add items; each result slot holds the entry the item evicted, if any.
def topk_add(self, key: str, *items: str) -> List[Optional[str]]: ...
# Increment items by paired amounts; returns evicted entries, if any.
def topk_incrby(
self,
key: str,
*items_increments: Tuple[str, int]
) -> List[Optional[str]]: ...
# For each item, whether it currently sits in the top-k list.
def topk_query(self, key: str, *items: str) -> List[bool]: ...
# Approximate occurrence counts for the given items.
def topk_count(self, key: str, *items: str) -> List[int]: ...
# Current top-k items; with_count=True interleaves each item with its count.
def topk_list(self, key: str, with_count: bool = False) -> List[Any]: ...
# Return configuration and usage details for a Top-K structure.
def topk_info(self, key: str) -> Dict[str, Any]: ...


# --- Example: Bloom filter for e-mail membership tracking ---
# (The `import redis` line was fused onto the stub above — a SyntaxError.)
import redis
import random
import string

r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# Create Bloom filter for user email tracking
def setup_email_bloom_filter():
    """Reserve a Bloom filter and seed it with sample e-mail addresses."""
    # 1% false-positive rate, sized for 100K entries.
    r.bf().reserve("user_emails", 0.01, 100000)
    print("Created Bloom filter for user emails")
    emails = [
        "user1@example.com",
        "user2@example.com",
        "admin@company.com",
        "support@company.com",
        "noreply@service.com"
    ]
    singles, batch = emails[:3], emails[3:]
    # Insert the first three addresses one at a time...
    for email in singles:
        added = r.bf().add("user_emails", email)
        print(f"Added {email}: {added}")
    # ...and the remainder in a single BF.MADD round-trip.
    results = r.bf().madd("user_emails", *batch)
    print(f"Batch added emails: {results}")
def test_email_membership():
    """Probe the Bloom filter for known and unknown e-mail addresses."""
    probes = [
        "user1@example.com",   # Should exist
        "admin@company.com",   # Should exist
        "unknown@test.com",    # Should not exist
        "fake@domain.com"      # Should not exist
    ]
    print("\nTesting email membership:")
    # One round-trip per address.
    for email in probes:
        exists = r.bf().exists("user_emails", email)
        print(f" {email}: {'EXISTS' if exists else 'NOT FOUND'}")
    # Same probes again, batched into a single call.
    results = r.bf().mexists("user_emails", *probes)
    print(f"\nBatch membership test: {results}")
    # Inspect the filter's metadata.
    info = r.bf().info("user_emails")
    print(f"\nBloom filter info:")
    print(f" Capacity: {info.get('Capacity', 'N/A')}")
    print(f" Size: {info.get('Size', 'N/A')}")
    print(f" Number of filters: {info.get('Number of filters', 'N/A')}")
    print(f" Number of items inserted: {info.get('Number of items inserted', 'N/A')}")
# Run the Bloom filter demo.
setup_email_bloom_filter()
test_email_membership()


# --- Example: Cuckoo filter for session tracking ---
# (The `import redis` line was fused onto the call above — a SyntaxError.)
import redis
import time

r = redis.Redis(host='localhost', port=6379, decode_responses=True)
def setup_session_cuckoo_filter():
    """Create a Cuckoo filter and load it with a handful of demo sessions."""
    # Capacity for 10K sessions; unlike a Bloom filter, a Cuckoo filter
    # also supports deleting entries later.
    r.cf().reserve("active_sessions", 10000)
    print("Created Cuckoo filter for active sessions")
    demo_sessions = (
        "session_abc123",
        "session_def456",
        "session_ghi789",
        "session_jkl012",
        "session_mno345",
    )
    for session in demo_sessions:
        added = r.cf().add("active_sessions", session)
        print(f"Added session {session}: {added}")
def manage_sessions():
    """Query, expire, and inspect sessions stored in the Cuckoo filter.

    Fix: the original called ``r.cf().del(...)``, which is a SyntaxError —
    ``del`` is a reserved word in Python, so redis-py exposes CF.DEL as
    ``delete()``.
    """
    # Check which sessions exist
    test_sessions = [
        "session_abc123",  # Should exist
        "session_xyz999",  # Should not exist
        "session_def456"   # Should exist
    ]
    print("\nChecking session existence:")
    results = r.cf().mexists("active_sessions", *test_sessions)
    for session, exists in zip(test_sessions, results):
        print(f" {session}: {'ACTIVE' if exists else 'INACTIVE'}")
    # Simulate session expiration (delete from filter).
    expired_session = "session_abc123"
    deleted = r.cf().delete("active_sessions", expired_session)
    print(f"\nExpired session {expired_session}: {deleted}")
    # Verify deletion
    still_exists = r.cf().exists("active_sessions", expired_session)
    print(f"Session still exists after deletion: {still_exists}")
    # Get session count (approximate)
    count = r.cf().count("active_sessions", "session_def456")
    print(f"Count for session_def456: {count}")
    # Get filter info
    info = r.cf().info("active_sessions")
    print(f"\nCuckoo filter info:")
    print(f" Size: {info.get('Size', 'N/A')}")
    print(f" Number of buckets: {info.get('Number of buckets', 'N/A')}")
    print(f" Number of items: {info.get('Number of items', 'N/A')}")
# Run the Cuckoo filter demo.
setup_session_cuckoo_filter()
manage_sessions()


# --- Example: Count-Min Sketch for page-view counting ---
# (The `import redis` line was fused onto the call above — a SyntaxError.)
import redis
import random

r = redis.Redis(host='localhost', port=6379, decode_responses=True)
def setup_page_view_counter():
    """Create a Count-Min Sketch and feed it 1000 simulated page-view events.

    Returns a dict of exact per-page counts so callers can compare the
    sketch's estimates against ground truth.
    """
    # Size the sketch from a target error (1%) and certainty (99%).
    r.cms().initbyprob("page_views", 0.01, 0.99)
    print("Created Count-Min Sketch for page view counting")
    pages = [
        "/home", "/products", "/about", "/contact",
        "/blog", "/help", "/pricing", "/features"
    ]
    exact_counts = {}
    for _ in range(1000):
        # Pick a random page and a random view increment (1-5).
        page = random.choice(pages)
        views = random.randint(1, 5)
        # Record it both in the sketch and in the exact tally.
        r.cms().incrby("page_views", page, views)
        exact_counts[page] = exact_counts.get(page, 0) + views
    print("Simulated 1000 page view events")
    return exact_counts
def analyze_page_views(actual_counts):
    """Compare exact page-view counts against Count-Min Sketch estimates.

    actual_counts: dict mapping page path -> exact view count
    (as returned by setup_page_view_counter).
    """
    # Query estimated counts from CMS
    pages = list(actual_counts.keys())
    estimated_counts = r.cms().query("page_views", *pages)
    print("\nPage view analysis (Actual vs Estimated):")
    print("-" * 50)
    total_error = 0
    # `pages` preserves actual_counts' key order, so zipping with .values()
    # keeps each page aligned with its exact count.
    for page, estimated, actual in zip(pages, estimated_counts, actual_counts.values()):
        error = abs(estimated - actual)
        # Guard against division by zero for pages never viewed.
        error_pct = (error / actual * 100) if actual > 0 else 0
        total_error += error_pct
        print(f"{page:12} | Actual: {actual:4d} | Estimated: {estimated:4d} | Error: {error_pct:.1f}%")
    avg_error = total_error / len(pages)
    print(f"\nAverage error rate: {avg_error:.2f}%")
    # Get CMS information
    info = r.cms().info("page_views")
    print(f"\nCount-Min Sketch info:")
    print(f" Width: {info.get('width', 'N/A')}")
    print(f" Depth: {info.get('depth', 'N/A')}")
    print(f" Count: {info.get('count', 'N/A')}")
# Run the Count-Min Sketch demo.
actual_counts = setup_page_view_counter()
analyze_page_views(actual_counts)


# --- Example: Top-K for trending hashtags ---
# (The `import redis` line was fused onto the call above — a SyntaxError.)
import redis
import random
import time

r = redis.Redis(host='localhost', port=6379, decode_responses=True)
def setup_trending_hashtags():
    """Create a Top-K tracker and seed it with weighted hashtag counts."""
    # Create Top-K structure for top 10 hashtags
    # (k=10, width=2000, depth=7, decay=0.925).
    r.topk().reserve("trending_hashtags", 10, 2000, 7, 0.925)
    print("Created Top-K for trending hashtags (top 10)")
    # Simulate hashtag usage
    # NOTE(review): this candidate pool is unused below — only the keys of
    # trending_weights are inserted. Kept for illustration; confirm intent.
    hashtags = [
        "#python", "#redis", "#database", "#programming", "#coding",
        "#tech", "#software", "#development", "#data", "#cloud",
        "#ai", "#machinelearning", "#devops", "#web", "#mobile"
    ]
    # Simulate trending patterns
    trending_weights = {
        "#python": 50,
        "#redis": 45,
        "#programming": 40,
        "#tech": 35,
        "#coding": 30,
        "#ai": 25,
        "#data": 20,
        "#web": 15,
        "#cloud": 12,
        "#software": 10
    }
    # Add hashtags with different frequencies
    for hashtag, base_weight in trending_weights.items():
        # Add some randomness to simulate real usage
        actual_count = base_weight + random.randint(-5, 15)
        # Skip non-positive counts (possible when the random offset is -5..-1).
        if actual_count > 0:
            r.topk().incrby("trending_hashtags", hashtag, actual_count)
    print("Simulated hashtag usage patterns")
def analyze_trending_hashtags():
    """Print the current top-K hashtags plus per-hashtag presence/count queries."""
    # Get the current top hashtags
    top_hashtags = r.topk().list("trending_hashtags", with_count=True)
    print("\nCurrent trending hashtags:")
    print("-" * 30)
    # with_count=True yields a flat [item, count, item, count, ...] list,
    # hence the stride-2 iteration.
    for i in range(0, len(top_hashtags), 2):
        hashtag = top_hashtags[i]
        count = top_hashtags[i + 1] if i + 1 < len(top_hashtags) else 0
        rank = (i // 2) + 1
        print(f"{rank:2d}. {hashtag:15} ({count} mentions)")
    # Test specific hashtag queries
    test_hashtags = ["#python", "#redis", "#javascript", "#unknown"]
    print("\nHashtag presence in top-K:")
    presence = r.topk().query("trending_hashtags", *test_hashtags)
    counts = r.topk().count("trending_hashtags", *test_hashtags)
    for hashtag, is_present, count in zip(test_hashtags, presence, counts):
        status = "IN TOP-K" if is_present else "NOT IN TOP-K"
        print(f" {hashtag:15}: {status:12} (count: {count})")
    # Get Top-K information
    info = r.topk().info("trending_hashtags")
    print(f"\nTop-K structure info:")
    print(f" K: {info.get('k', 'N/A')}")
    print(f" Width: {info.get('width', 'N/A')}")
    print(f" Depth: {info.get('depth', 'N/A')}")
    print(f" Decay: {info.get('decay', 'N/A')}")
def simulate_real_time_trending():
    """Push new hashtag mentions and report any entries evicted from the top-K."""
    print("\nSimulating real-time hashtag updates:")
    # Simulate new hashtag mentions coming in
    new_mentions = [
        ("#python", 5),
        ("#javascript", 8),
        ("#redis", 3),
        ("#newtech", 12),
        ("#viral", 20)
    ]
    for hashtag, mentions in new_mentions:
        # incrby reports, per item, the entry it displaced from the top-K
        # (falsy when nothing was evicted) — hence the evicted[0] check.
        evicted = r.topk().incrby("trending_hashtags", hashtag, mentions)
        if evicted and evicted[0]:
            print(f" Added {mentions} mentions to {hashtag} - evicted: {evicted[0]}")
        else:
            print(f" Added {mentions} mentions to {hashtag}")
    # Show updated top list
    print("\nUpdated trending hashtags:")
    top_hashtags = r.topk().list("trending_hashtags", with_count=True)
    # Flat [item, count, ...] list; show at most the first five pairs.
    for i in range(0, min(10, len(top_hashtags)), 2):
        hashtag = top_hashtags[i]
        count = top_hashtags[i + 1] if i + 1 < len(top_hashtags) else 0
        rank = (i // 2) + 1
        print(f" {rank}. {hashtag} ({count})")
# Run the Top-K demo.
setup_trending_hashtags()
analyze_trending_hashtags()
simulate_real_time_trending()


# --- Example: combined user-activity tracker ---
# (The `import redis` line was fused onto the call above — a SyntaxError.)
import redis
import random
import string

r = redis.Redis(host='localhost', port=6379, decode_responses=True)
class UserActivityTracker:
    """Track user registration, presence, page views, and activity leaders
    with four RedisBloom probabilistic structures (BF, CF, CMS, Top-K).

    Fix: ``user_logout`` originally called ``self.r.cf().del(...)`` — a
    SyntaxError, since ``del`` is a Python keyword. redis-py exposes CF.DEL
    as ``delete()``.
    """

    def __init__(self, redis_client):
        self.r = redis_client
        self.setup_structures()

    def setup_structures(self):
        """Initialize all probabilistic data structures"""
        # Bloom filter for registered users (1% FP rate, 1M capacity)
        self.r.bf().reserve("registered_users", 0.01, 1000000)
        # Cuckoo filter for active users (allows deletion for logout)
        self.r.cf().reserve("active_users", 100000)
        # Count-Min Sketch for page view frequency
        self.r.cms().initbyprob("page_view_frequency", 0.01, 0.99)
        # Top-K for most active users
        self.r.topk().reserve("most_active_users", 50, 1000, 7, 0.9)
        print("Initialized user activity tracking structures")

    def register_user(self, user_id):
        """Register a new user"""
        self.r.bf().add("registered_users", user_id)
        print(f"Registered user: {user_id}")

    def user_login(self, user_id):
        """Handle user login; returns False for unregistered users."""
        # Check if user is registered
        if not self.r.bf().exists("registered_users", user_id):
            print(f"Warning: User {user_id} not registered but attempting login")
            return False
        # Mark as active
        self.r.cf().add("active_users", user_id)
        print(f"User {user_id} logged in")
        return True

    def user_logout(self, user_id):
        """Handle user logout; returns whether the user was removed."""
        # CF.DEL is exposed as delete() in redis-py (`del` is reserved).
        removed = self.r.cf().delete("active_users", user_id)
        if removed:
            print(f"User {user_id} logged out")
        return removed

    def track_page_view(self, user_id, page):
        """Track page view for user"""
        # Increment page view count
        self.r.cms().incrby("page_view_frequency", page, 1)
        # Track user activity
        self.r.topk().incrby("most_active_users", user_id, 1)

    def is_user_registered(self, user_id):
        """Check if user is registered (may have false positives)"""
        return self.r.bf().exists("registered_users", user_id)

    def is_user_active(self, user_id):
        """Check if user is currently active"""
        return self.r.cf().exists("active_users", user_id)

    def get_page_views(self, *pages):
        """Get estimated page view counts"""
        return self.r.cms().query("page_view_frequency", *pages)

    def get_most_active_users(self):
        """Get list of most active users (flat [user, count, ...] form)."""
        return self.r.topk().list("most_active_users", with_count=True)

    def get_stats(self):
        """Get system statistics as a plain dict."""
        bf_info = self.r.bf().info("registered_users")
        cf_info = self.r.cf().info("active_users")
        cms_info = self.r.cms().info("page_view_frequency")
        topk_info = self.r.topk().info("most_active_users")
        return {
            "registered_users": bf_info.get("Number of items inserted", 0),
            "active_users": cf_info.get("Number of items", 0),
            "total_page_views": cms_info.get("count", 0),
            "tracking_top_users": topk_info.get("k", 0)
        }
# Usage example: exercises every tracker method end-to-end.
tracker = UserActivityTracker(r)
# Simulate user registrations
users = [f"user_{i:04d}" for i in range(1, 101)]
for user in users[:50]:  # Register first 50 users
    tracker.register_user(user)
# Simulate user logins
active_users = random.sample(users[:50], 20)  # 20 users login
for user in active_users:
    tracker.user_login(user)
# Simulate page views
pages = ["/home", "/dashboard", "/profile", "/settings", "/help"]
for _ in range(500):  # 500 page views
    user = random.choice(active_users)
    page = random.choice(pages)
    tracker.track_page_view(user, page)
# Check system stats
print("\nSystem Statistics:")
stats = tracker.get_stats()
for metric, value in stats.items():
    print(f" {metric}: {value}")
# Check some users (user_0075 and user_0099 were never registered,
# so they should normally report Registered=False)
test_users = ["user_0001", "user_0025", "user_0075", "user_0099"]
print("\nUser Status Check:")
for user in test_users:
    registered = tracker.is_user_registered(user)
    active = tracker.is_user_active(user)
    print(f" {user}: Registered={registered}, Active={active}")
# Get page view statistics
page_views = tracker.get_page_views(*pages)
print("\nPage View Statistics:")
for page, views in zip(pages, page_views):
    print(f" {page:12}: {views} views")
# Get most active users
print("\nMost Active Users:")
most_active = tracker.get_most_active_users()
# Flat [user, count, ...] list from TOPK.LIST WITHCOUNT — stride-2 walk,
# capped at the first five pairs.
for i in range(0, min(10, len(most_active)), 2):
    user = most_active[i]
    activity_count = most_active[i + 1] if i + 1 < len(most_active) else 0
    print(f" {user}: {activity_count} activities")
# Simulate some logouts
logout_users = random.sample(active_users, 5)
print(f"\nSimulating {len(logout_users)} user logouts:")
for user in logout_users:
    tracker.user_logout(user)
# Check updated active user count
final_stats = tracker.get_stats()
print(f"\nFinal active users: {final_stats['active_users']}")

Install with the Tessl CLI:
npx tessl i tessl/pypi-redis