CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-redis

Python client for Redis database and key-value store

Pending
Overview
Eval results
Files

docs/modules/redis-bloom.md

RedisBloom

RedisBloom provides probabilistic data structures for Redis including Bloom filters, Cuckoo filters, Count-Min Sketch, and Top-K. These structures enable memory-efficient approximate membership queries, frequency estimation, and heavy hitters detection.

Capabilities

Bloom Filter Operations

Bloom filters for memory-efficient approximate set membership testing.

# NOTE(review): the usage examples in this document reach these operations
# through the client's bf() accessor (e.g. r.bf().reserve, r.bf().add); the
# bf_ prefix appears only in this listing.

# Create a Bloom filter at `key`. The usage example passes 0.01 and 100000 to
# get a 1% error rate with room for 100K items.
def bf_reserve(
    self,
    key: str,
    error_rate: float,
    capacity: int
) -> str: ...

# Add a single item to the filter; the usage example prints the returned bool
# as the "added" flag.
def bf_add(self, key: str, item: str) -> bool: ...

# Add several items in one call; returns one bool per item.
def bf_madd(self, key: str, *items: str) -> List[bool]: ...

# Approximate membership test for a single item.
def bf_exists(self, key: str, item: str) -> bool: ...

# Approximate membership test for several items; returns one bool per item.
def bf_mexists(self, key: str, *items: str) -> List[bool]: ...

# Not exercised in this document. Presumably exports the filter one chunk at
# a time, returning (next_iterator, chunk) — confirm against RedisBloom docs.
def bf_scandump(self, key: str, iterator: int) -> Tuple[int, bytes]: ...

# Not exercised in this document. Presumably the counterpart of bf_scandump
# that restores one exported chunk — confirm against RedisBloom docs.
def bf_loadchunk(self, key: str, iterator: int, data: bytes) -> str: ...

# Return metadata about the filter (the usage example reads capacity, size,
# number of filters and number of items inserted).
def bf_info(self, key: str) -> Dict[str, Any]: ...

Cuckoo Filter Operations

Cuckoo filters for approximate set membership with deletion support.

# NOTE(review): the usage examples in this document reach these operations
# through the client's cf() accessor (e.g. r.cf().reserve, r.cf().add); the
# cf_ prefix appears only in this listing.

# Create a Cuckoo filter at `key`. The usage example passes only `capacity`
# (10000 for 10K sessions) and leaves the tuning parameters at None.
def cf_reserve(
    self,
    key: str,
    capacity: int,
    bucket_size: Optional[int] = None,
    max_iterations: Optional[int] = None,
    expansion: Optional[int] = None
) -> str: ...

# Add a single item to the filter.
def cf_add(self, key: str, item: str) -> bool: ...

# Presumably adds the item only when it is not already present — not
# exercised in this document; confirm against RedisBloom docs.
def cf_addnx(self, key: str, item: str) -> bool: ...

# Bulk insert; returns one bool per item. `capacity` / `nocreate` presumably
# control auto-creation of a missing filter — TODO confirm.
def cf_insert(
    self,
    key: str,
    *items: str,
    capacity: Optional[int] = None,
    nocreate: bool = False
) -> List[bool]: ...

# Same shape as cf_insert; presumably skips items that already exist —
# TODO confirm.
def cf_insertnx(
    self,
    key: str,
    *items: str,
    capacity: Optional[int] = None,
    nocreate: bool = False
) -> List[bool]: ...

# Approximate membership test for a single item.
def cf_exists(self, key: str, item: str) -> bool: ...

# Approximate membership test for several items; returns one bool per item.
def cf_mexists(self, key: str, *items: str) -> List[bool]: ...

# Remove an item from the filter (the usage examples use this for session
# expiry and logout).
# NOTE(review): `del` is a reserved word in Python, so this cannot be invoked
# as `r.cf().del(...)`; redis-py names the accessor method `delete` — confirm.
def cf_del(self, key: str, item: str) -> bool: ...

# Approximate number of times `item` is present in the filter.
def cf_count(self, key: str, item: str) -> int: ...

# Not exercised in this document. Presumably exports the filter one chunk at
# a time, returning (next_iterator, chunk) — confirm against RedisBloom docs.
def cf_scandump(self, key: str, iterator: int) -> Tuple[int, bytes]: ...

# Not exercised in this document. Presumably the counterpart of cf_scandump
# that restores one exported chunk — confirm against RedisBloom docs.
def cf_loadchunk(self, key: str, iterator: int, data: bytes) -> str: ...

# Return metadata about the filter (the usage example reads size, number of
# buckets and number of items).
def cf_info(self, key: str) -> Dict[str, Any]: ...

Count-Min Sketch Operations

Count-Min Sketch for frequency estimation of items in data streams.

# NOTE(review): the usage examples in this document reach these operations
# through the client's cms() accessor (e.g. r.cms().initbyprob); the cms_
# prefix appears only in this listing.

# Create a Count-Min Sketch with explicit dimensions.
def cms_initbydim(
    self,
    key: str,
    width: int,
    depth: int
) -> str: ...

# Create a Count-Min Sketch sized from an error bound and a probability.
# NOTE(review): in RedisBloom `probability` is the chance of an *inflated*
# count, so smaller values give a more reliable sketch — confirm.
def cms_initbyprob(
    self,
    key: str,
    error: float,
    probability: float
) -> str: ...

# Increase the counters of one or more items; returns one count per item.
# NOTE(review): redis-py's own incrby takes two parallel lists
# (items, increments) rather than (item, increment) tuples — confirm which
# calling convention applies here.
def cms_incrby(
    self,
    key: str,
    *items_increments: Tuple[str, int]
) -> List[int]: ...

# Return the estimated count for each requested item.
def cms_query(self, key: str, *items: str) -> List[int]: ...

# Not exercised in this document. Presumably merges `num_keys` source
# sketches into `dest_key`, optionally weighted — TODO confirm.
def cms_merge(
    self,
    dest_key: str,
    num_keys: int,
    *src_keys: str,
    weights: Optional[List[int]] = None
) -> str: ...

# Return metadata about the sketch (the usage example reads width, depth and
# count).
def cms_info(self, key: str) -> Dict[str, Any]: ...

Top-K Operations

Top-K data structure for tracking the most frequent items.

# NOTE(review): the usage examples in this document reach these operations
# through the client's topk() accessor (e.g. r.topk().reserve); the topk_
# prefix appears only in this listing.

# Create a Top-K structure tracking the `k` most frequent items. The usage
# example passes k=10, width=2000, depth=7, decay=0.925.
def topk_reserve(
    self,
    key: str,
    k: int,
    width: int,
    depth: int,
    decay: float
) -> str: ...

# Add one occurrence of each item. Each result slot is Optional[str];
# presumably the item evicted from the top-K, if any — TODO confirm.
def topk_add(self, key: str, *items: str) -> List[Optional[str]]: ...

# Increase item counts by the given amounts; the usage example treats each
# result slot as the evicted item (or a falsy value when nothing was evicted).
# NOTE(review): redis-py's own incrby takes two parallel lists
# (items, increments) rather than (item, increment) tuples — confirm which
# calling convention applies here.
def topk_incrby(
    self,
    key: str,
    *items_increments: Tuple[str, int]
) -> List[Optional[str]]: ...

# Report, per item, whether it is currently in the top-K.
def topk_query(self, key: str, *items: str) -> List[bool]: ...

# Return the tracked count for each item.
def topk_count(self, key: str, *items: str) -> List[int]: ...

# List the current top-K items. With counts enabled the usage examples read
# the result as a flat [item, count, item, count, ...] sequence.
# NOTE(review): redis-py spells the keyword `withcount` (no underscore) —
# confirm.
def topk_list(self, key: str, with_count: bool = False) -> List[Any]: ...

# Return metadata about the structure (the usage example reads k, width,
# depth and decay).
def topk_info(self, key: str) -> Dict[str, Any]: ...

Usage Examples

Bloom Filter for Set Membership

import redis
import random
import string

# Module-level client shared by the functions below; connects to localhost:6379.
r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# Create Bloom filter for user email tracking
def setup_email_bloom_filter():
    """Create the ``user_emails`` Bloom filter and seed it with sample addresses.

    The first three addresses are added one call at a time and the remaining
    two in a single batch call, printing the result of each step.
    """
    bloom = r.bf()

    # 0.01 -> 1% error rate, 100000 -> sized for 100K items.
    bloom.reserve("user_emails", 0.01, 100000)
    print("Created Bloom filter for user emails")

    one_by_one = (
        "user1@example.com",
        "user2@example.com",
        "admin@company.com",
    )
    batched = (
        "support@company.com",
        "noreply@service.com",
    )

    # Single-item path.
    for address in one_by_one:
        was_added = bloom.add("user_emails", address)
        print(f"Added {address}: {was_added}")

    # Multi-item path.
    outcome = bloom.madd("user_emails", *batched)
    print(f"Batch added emails: {outcome}")

def test_email_membership():
    """Check sample addresses against the ``user_emails`` Bloom filter.

    Prints the result of a per-item membership test, then a batch test over
    the same addresses, then the filter's metadata.
    """
    test_emails = [
        "user1@example.com",      # Should exist
        "admin@company.com",      # Should exist
        "unknown@test.com",       # Should not exist
        "fake@domain.com"         # Should not exist
    ]

    print("\nTesting email membership:")
    for email in test_emails:
        exists = r.bf().exists("user_emails", email)
        print(f"  {email}: {'EXISTS' if exists else 'NOT FOUND'}")

    # Same addresses again, in a single round trip.
    results = r.bf().mexists("user_emails", *test_emails)
    print(f"\nBatch membership test: {results}")

    # With the default RESP2 protocol redis-py parses BF.INFO into a BFInfo
    # object, so the fields are read as attributes; its get() accepts no
    # default argument, which is why dict-style .get(key, default) fails.
    # NOTE(review): with protocol=3 the reply is a plain dict instead.
    info = r.bf().info("user_emails")
    print("\nBloom filter info:")
    print(f"  Capacity: {info.capacity}")
    print(f"  Size: {info.size}")
    print(f"  Number of filters: {info.filterNum}")
    print(f"  Number of items inserted: {info.insertedNum}")

# Run the example: populate the filter first, then query it.
setup_email_bloom_filter()
test_email_membership()

Cuckoo Filter with Deletion Support

import redis
import time

# Module-level client shared by the functions below; connects to localhost:6379.
r = redis.Redis(host='localhost', port=6379, decode_responses=True)

def setup_session_cuckoo_filter():
    """Create the ``active_sessions`` Cuckoo filter and add five sample sessions.

    Prints a confirmation after creation and the result of every add.
    """
    cuckoo = r.cf()

    # Sized for 10K sessions.
    cuckoo.reserve("active_sessions", 10000)
    print("Created Cuckoo filter for active sessions")

    session_ids = (
        "session_abc123",
        "session_def456",
        "session_ghi789",
        "session_jkl012",
        "session_mno345",
    )

    for session_id in session_ids:
        outcome = cuckoo.add("active_sessions", session_id)
        print(f"Added session {session_id}: {outcome}")

def manage_sessions():
    """Query, expire and inspect entries in the ``active_sessions`` Cuckoo filter.

    Prints a batch existence check, deletes one session and verifies the
    deletion, prints the count for another session, and finally prints the
    filter's metadata.
    """
    test_sessions = [
        "session_abc123",    # Should exist
        "session_xyz999",    # Should not exist
        "session_def456"     # Should exist
    ]

    print("\nChecking session existence:")
    results = r.cf().mexists("active_sessions", *test_sessions)
    for session, exists in zip(test_sessions, results):
        print(f"  {session}: {'ACTIVE' if exists else 'INACTIVE'}")

    # Simulate session expiration. `del` is a reserved word in Python, so
    # redis-py exposes CF.DEL as `delete`.
    expired_session = "session_abc123"
    deleted = r.cf().delete("active_sessions", expired_session)
    print(f"\nExpired session {expired_session}: {deleted}")

    # Verify deletion
    still_exists = r.cf().exists("active_sessions", expired_session)
    print(f"Session still exists after deletion: {still_exists}")

    # Get session count (approximate)
    count = r.cf().count("active_sessions", "session_def456")
    print(f"Count for session_def456: {count}")

    # With the default RESP2 protocol redis-py parses CF.INFO into a CFInfo
    # object, so the fields are read as attributes rather than dict keys.
    # NOTE(review): with protocol=3 the reply is a plain dict instead.
    info = r.cf().info("active_sessions")
    print("\nCuckoo filter info:")
    print(f"  Size: {info.size}")
    print(f"  Number of buckets: {info.bucketNum}")
    print(f"  Number of items: {info.insertedNum}")

# Run the example: create and fill the filter, then query and expire sessions.
setup_session_cuckoo_filter()
manage_sessions()

Count-Min Sketch for Frequency Estimation

import redis
import random

# Module-level client shared by the functions below; connects to localhost:6379.
r = redis.Redis(host='localhost', port=6379, decode_responses=True)

def setup_page_view_counter():
    """Create the ``page_views`` Count-Min Sketch and feed it random traffic.

    Returns:
        dict mapping each page that received traffic to its exact view
        total, for comparison against the sketch's estimates.
    """
    # 1% error bound. The third argument is the probability of an *inflated*
    # count, so 0.01 is what yields "within the bound 99% of the time".
    r.cms().initbyprob("page_views", 0.01, 0.01)
    print("Created Count-Min Sketch for page view counting")

    pages = [
        "/home", "/products", "/about", "/contact",
        "/blog", "/help", "/pricing", "/features"
    ]

    view_counts = {}
    for _ in range(1000):
        page = random.choice(pages)
        views = random.randint(1, 5)

        # redis-py's incrby takes two parallel lists: items and increments.
        r.cms().incrby("page_views", [page], [views])

        # Keep the exact total alongside for later comparison.
        view_counts[page] = view_counts.get(page, 0) + views

    print("Simulated 1000 page view events")
    return view_counts

def analyze_page_views(actual_counts):
    """Compare exact page-view totals with the ``page_views`` sketch estimates.

    Args:
        actual_counts: dict mapping page path to its exact view total.

    Prints one comparison row per page, the average percentage error, and
    the sketch's metadata. Prints a short notice and returns early when
    ``actual_counts`` is empty.
    """
    # Nothing to query and nothing to average; avoids dividing by zero below.
    if not actual_counts:
        print("\nNo page views recorded; nothing to analyze")
        return

    pages = list(actual_counts)
    estimated_counts = r.cms().query("page_views", *pages)

    print("\nPage view analysis (Actual vs Estimated):")
    print("-" * 50)

    total_error = 0
    for (page, actual), estimated in zip(actual_counts.items(), estimated_counts):
        error = abs(estimated - actual)
        error_pct = (error / actual * 100) if actual > 0 else 0
        total_error += error_pct

        print(f"{page:12} | Actual: {actual:4d} | Estimated: {estimated:4d} | Error: {error_pct:.1f}%")

    avg_error = total_error / len(pages)
    print(f"\nAverage error rate: {avg_error:.2f}%")

    # With the default RESP2 protocol redis-py parses CMS.INFO into a CMSInfo
    # object, so the fields are read as attributes rather than dict keys.
    # NOTE(review): with protocol=3 the reply is a plain dict instead.
    info = r.cms().info("page_views")
    print("\nCount-Min Sketch info:")
    print(f"  Width: {info.width}")
    print(f"  Depth: {info.depth}")
    print(f"  Count: {info.count}")

# Run the example: the exact totals returned by setup feed the comparison.
actual_counts = setup_page_view_counter()
analyze_page_views(actual_counts)

Top-K for Heavy Hitters Detection

import redis
import random
import time

# Module-level client shared by the functions below; connects to localhost:6379.
r = redis.Redis(host='localhost', port=6379, decode_responses=True)

def setup_trending_hashtags():
    """Create the ``trending_hashtags`` Top-K and load simulated usage counts.

    Each hashtag's base weight is perturbed by a random offset in [-5, 15]
    before being added, so repeated runs give slightly different rankings.
    """
    # k=10, width=2000, depth=7, decay=0.925
    r.topk().reserve("trending_hashtags", 10, 2000, 7, 0.925)
    print("Created Top-K for trending hashtags (top 10)")

    # Base popularity per hashtag.
    trending_weights = {
        "#python": 50,
        "#redis": 45,
        "#programming": 40,
        "#tech": 35,
        "#coding": 30,
        "#ai": 25,
        "#data": 20,
        "#web": 15,
        "#cloud": 12,
        "#software": 10
    }

    for hashtag, base_weight in trending_weights.items():
        # Add some randomness to simulate real usage
        actual_count = base_weight + random.randint(-5, 15)
        if actual_count > 0:
            # redis-py's incrby takes two parallel lists: items and increments.
            r.topk().incrby("trending_hashtags", [hashtag], [actual_count])

    print("Simulated hashtag usage patterns")

def analyze_trending_hashtags():
    """Print the current ``trending_hashtags`` ranking, spot checks and metadata.

    Prints the ranked list with counts, then the presence and count of a few
    specific hashtags, then the Top-K structure's parameters.
    """
    # redis-py spells the keyword `withcount`. The reply is read here as a
    # flat [item, count, item, count, ...] list.
    top_hashtags = r.topk().list("trending_hashtags", withcount=True)

    print("\nCurrent trending hashtags:")
    print("-" * 30)
    for i in range(0, len(top_hashtags), 2):
        hashtag = top_hashtags[i]
        # Tolerate a trailing item without a count.
        count = top_hashtags[i + 1] if i + 1 < len(top_hashtags) else 0
        rank = (i // 2) + 1
        print(f"{rank:2d}. {hashtag:15} ({count} mentions)")

    # Test specific hashtag queries
    test_hashtags = ["#python", "#redis", "#javascript", "#unknown"]

    print("\nHashtag presence in top-K:")
    presence = r.topk().query("trending_hashtags", *test_hashtags)
    counts = r.topk().count("trending_hashtags", *test_hashtags)

    for hashtag, is_present, count in zip(test_hashtags, presence, counts):
        status = "IN TOP-K" if is_present else "NOT IN TOP-K"
        print(f"  {hashtag:15}: {status:12} (count: {count})")

    # With the default RESP2 protocol redis-py parses TOPK.INFO into a
    # TopKInfo object, so the fields are read as attributes.
    # NOTE(review): with protocol=3 the reply is a plain dict instead.
    info = r.topk().info("trending_hashtags")
    print("\nTop-K structure info:")
    print(f"  K: {info.k}")
    print(f"  Width: {info.width}")
    print(f"  Depth: {info.depth}")
    print(f"  Decay: {info.decay}")

def simulate_real_time_trending():
    """Apply a batch of new mentions to ``trending_hashtags`` and show the result.

    For every (hashtag, mentions) pair the count is increased and any item
    reported as evicted is printed. Afterwards the first five (item, count)
    pairs of the updated list are printed.
    """
    print("\nSimulating real-time hashtag updates:")

    new_mentions = [
        ("#python", 5),
        ("#javascript", 8),
        ("#redis", 3),
        ("#newtech", 12),
        ("#viral", 20)
    ]

    for hashtag, mentions in new_mentions:
        # redis-py's incrby takes two parallel lists: items and increments.
        # The reply has one slot per item.
        evicted = r.topk().incrby("trending_hashtags", [hashtag], [mentions])

        if evicted and evicted[0]:
            print(f"  Added {mentions} mentions to {hashtag} - evicted: {evicted[0]}")
        else:
            print(f"  Added {mentions} mentions to {hashtag}")

    print("\nUpdated trending hashtags:")
    # redis-py spells the keyword `withcount`. The reply is read here as a
    # flat [item, count, item, count, ...] list.
    top_hashtags = r.topk().list("trending_hashtags", withcount=True)
    # Stepping by 2 over at most 10 flat entries shows up to five pairs.
    for i in range(0, min(10, len(top_hashtags)), 2):
        hashtag = top_hashtags[i]
        count = top_hashtags[i + 1] if i + 1 < len(top_hashtags) else 0
        rank = (i // 2) + 1
        print(f"  {rank}. {hashtag} ({count})")

# Run the example: build the Top-K, inspect it, then apply live updates.
setup_trending_hashtags()
analyze_trending_hashtags()
simulate_real_time_trending()

Combining Multiple Probabilistic Structures

import redis
import random
import string

# Module-level client handed to UserActivityTracker below; connects to localhost:6379.
r = redis.Redis(host='localhost', port=6379, decode_responses=True)

class UserActivityTracker:
    """Track user activity with four RedisBloom structures on one client.

    Keys used:
        registered_users     Bloom filter of every registered user id.
        active_users         Cuckoo filter of logged-in users (supports removal).
        page_view_frequency  Count-Min Sketch of views per page.
        most_active_users    Top-K of users by tracked page views.
    """

    def __init__(self, redis_client):
        """Store the client and create all four structures immediately."""
        self.r = redis_client
        self.setup_structures()

    def setup_structures(self):
        """Initialize all probabilistic data structures"""
        # Bloom filter for registered users
        self.r.bf().reserve("registered_users", 0.01, 1000000)

        # Cuckoo filter for active users (allows deletion for logout)
        self.r.cf().reserve("active_users", 100000)

        # Count-Min Sketch for page view frequency. The third argument is
        # the probability of an *inflated* count, so 0.01 (not 0.99) is what
        # gives 99% confidence in the 1% error bound.
        self.r.cms().initbyprob("page_view_frequency", 0.01, 0.01)

        # Top-K for most active users
        self.r.topk().reserve("most_active_users", 50, 1000, 7, 0.9)

        print("Initialized user activity tracking structures")

    def register_user(self, user_id):
        """Register a new user"""
        self.r.bf().add("registered_users", user_id)
        print(f"Registered user: {user_id}")

    def user_login(self, user_id):
        """Mark a registered user as active.

        Returns False, after printing a warning, when the Bloom filter does
        not contain the user; returns True otherwise.
        """
        if not self.r.bf().exists("registered_users", user_id):
            print(f"Warning: User {user_id} not registered but attempting login")
            return False

        self.r.cf().add("active_users", user_id)
        print(f"User {user_id} logged in")
        return True

    def user_logout(self, user_id):
        """Remove the user from the active set and return the deletion result."""
        # `del` is a reserved word in Python, so redis-py exposes CF.DEL as
        # `delete`.
        removed = self.r.cf().delete("active_users", user_id)
        if removed:
            print(f"User {user_id} logged out")
        return removed

    def track_page_view(self, user_id, page):
        """Record one view of ``page`` by ``user_id``."""
        # redis-py's incrby methods take two parallel lists: items and
        # increments.
        self.r.cms().incrby("page_view_frequency", [page], [1])
        self.r.topk().incrby("most_active_users", [user_id], [1])

    def is_user_registered(self, user_id):
        """Check if user is registered (may have false positives)"""
        return self.r.bf().exists("registered_users", user_id)

    def is_user_active(self, user_id):
        """Check if user is currently active"""
        return self.r.cf().exists("active_users", user_id)

    def get_page_views(self, *pages):
        """Get estimated page view counts"""
        return self.r.cms().query("page_view_frequency", *pages)

    def get_most_active_users(self):
        """Return the Top-K list with counts, exactly as the client returns it."""
        # redis-py spells the keyword `withcount`.
        return self.r.topk().list("most_active_users", withcount=True)

    def get_stats(self):
        """Return a dict summarising the four structures.

        With the default RESP2 protocol redis-py parses each *.INFO reply
        into an info object, so the fields are read as attributes.
        NOTE(review): with protocol=3 the replies are plain dicts instead.
        """
        bf_info = self.r.bf().info("registered_users")
        cf_info = self.r.cf().info("active_users")
        cms_info = self.r.cms().info("page_view_frequency")
        topk_info = self.r.topk().info("most_active_users")

        return {
            "registered_users": bf_info.insertedNum,
            "active_users": cf_info.insertedNum,
            "total_page_views": cms_info.count,
            "tracking_top_users": topk_info.k
        }

# Usage example
tracker = UserActivityTracker(r)

# Build 100 candidate ids; only the first half gets registered.
users = [f"user_{i:04d}" for i in range(1, 101)]
registered_pool = users[:50]
for user_id in registered_pool:
    tracker.register_user(user_id)

# Twenty of the registered users log in.
active_users = random.sample(registered_pool, 20)
for user_id in active_users:
    tracker.user_login(user_id)

# Generate 500 page views, each by a random active user on a random page.
pages = ["/home", "/dashboard", "/profile", "/settings", "/help"]
view_events = 500
for _ in range(view_events):
    viewer = random.choice(active_users)
    viewed_page = random.choice(pages)
    tracker.track_page_view(viewer, viewed_page)

# Summary of all four structures.
print("\nSystem Statistics:")
stats = tracker.get_stats()
for metric in stats:
    value = stats[metric]
    print(f"  {metric}: {value}")

# Spot-check users from both the registered and the unregistered half.
test_users = ["user_0001", "user_0025", "user_0075", "user_0099"]
print("\nUser Status Check:")
for candidate in test_users:
    registered = tracker.is_user_registered(candidate)
    active = tracker.is_user_active(candidate)
    print(f"  {candidate}: Registered={registered}, Active={active}")

# Estimated views per page, in the same order as `pages`.
page_views = tracker.get_page_views(*pages)
print("\nPage View Statistics:")
for index, page in enumerate(pages[:len(page_views)]):
    views = page_views[index]
    print(f"  {page:12}: {views} views")

# The Top-K reply is read as a flat [user, count, user, count, ...] list;
# only the first ten flat entries (up to five pairs) are shown.
print("\nMost Active Users:")
most_active = tracker.get_most_active_users()
shown = most_active[:10]
for start in range(0, len(shown), 2):
    pair = shown[start:start + 2]
    user = pair[0]
    # A trailing user without a count is reported with 0 activities.
    activity_count = pair[1] if len(pair) == 2 else 0
    print(f"  {user}: {activity_count} activities")

# Log out five of the active users.
logout_users = random.sample(active_users, 5)
print(f"\nSimulating {len(logout_users)} user logouts:")
for leaving in logout_users:
    tracker.user_logout(leaving)

# Re-read the stats to see the active count after the logouts.
final_stats = tracker.get_stats()
print(f"\nFinal active users: {final_stats['active_users']}")

Install with Tessl CLI

npx tessl i tessl/pypi-redis

docs

modules

redis-bloom.md

redis-json.md

redis-search.md

redis-timeseries.md

async-support.md

cluster-support.md

connection-management.md

core-client.md

distributed-locking.md

error-handling.md

high-availability.md

index.md

pipelines-transactions.md

pubsub-messaging.md

tile.json