# Bloom Filter: A Probabilistic Data Structure
A Python implementation of Bloom filter probabilistic data structures, forked from pybloom with an improved tightening ratio of 0.9 for better average space usage across a wide range of growth patterns. It provides both standard Bloom filters for known set sizes and Scalable Bloom filters that grow dynamically when the final set size is unknown.

## Installation

```
npx @tessl/cli install tessl/pypi-pybloom-live@3.1.0
```
Or install from PyPI:

```
pip install pybloom-live
```

## Quick Start

The standard pattern is to import the classes directly from `pybloom_live`:

```python
from pybloom_live import BloomFilter, ScalableBloomFilter

# Create a standard bloom filter for known set size
bf = BloomFilter(capacity=1000, error_rate=0.001)
# Add items to filter
bf.add("item1")
bf.add("item2")
bf.add("item3")
# Test membership (may have false positives, never false negatives)
print("item1" in bf) # True
print("item4" in bf) # False (or rarely True - false positive)
# Create scalable bloom filter for unknown set size
sbf = ScalableBloomFilter(initial_capacity=100, error_rate=0.001)
for i in range(10000):
sbf.add(f"item{i}")
print(f"item5000" in sbf) # True
print(len(sbf)) # 10000
```
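Membership tests are one-sided: a filter may report a false positive but never a false negative. As a quick illustrative check (not from the source), the observed false positive rate can be measured using only the documented API; the exact rate depends on the key set:

```python
from pybloom_live import BloomFilter

bf = BloomFilter(capacity=10000, error_rate=0.001)
for i in range(10000):
    bf.add(f"present_{i}")

# Probe keys that were never added; every hit is a false positive.
trials = 100000
false_positives = sum(f"absent_{i}" in bf for i in range(trials))
print(f"observed false positive rate: {false_positives / trials:.5f}")  # around 0.001 or below
```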
pybloom-live implements two main probabilistic data structures:

- `BloomFilter` — a fixed-capacity filter for sets whose maximum size is known in advance
- `ScalableBloomFilter` — a dynamically growing filter for sets of unknown size

Both implementations select an appropriate hash function (MD5, SHA-1, SHA-256, SHA-384, or SHA-512) based on the number of hash bits required, and support set operations (union, intersection), serialization to files, and pickling for persistence.
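The idea behind the hash selection is to pick the smallest digest that still supplies enough bits for all hash slices. A minimal sketch of such a selection (the exact thresholds used by the library are an assumption here, shown for intuition):

```python
import hashlib

def choose_hash_function(total_hash_bits: int):
    # Illustrative: the smallest digest whose output covers the
    # required number of hash bits (cutoffs assumed, not confirmed).
    if total_hash_bits > 384:
        return hashlib.sha512
    elif total_hash_bits > 256:
        return hashlib.sha384
    elif total_hash_bits > 160:
        return hashlib.sha256
    elif total_hash_bits > 128:
        return hashlib.sha1
    return hashlib.md5

print(choose_hash_function(96) is hashlib.md5)      # True
print(choose_hash_function(512) is hashlib.sha512)  # True
```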
## BloomFilter

Fixed-capacity probabilistic data structure for set membership testing with a configurable false positive probability. Ideal when the maximum set size is known in advance.
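The `num_slices` and `bits_per_slice` attributes listed below follow standard partitioned Bloom filter sizing. A sketch of that derivation under the textbook formulas (an assumption about the internals, shown for intuition):

```python
import math

def bloom_geometry(capacity: int, error_rate: float):
    # Textbook partitioned Bloom filter sizing (illustrative):
    # one slice per hash function, k = ceil(log2(1 / error_rate)),
    # sized so the filter hits error_rate when filled to capacity.
    num_slices = int(math.ceil(math.log2(1.0 / error_rate)))
    bits_per_slice = int(math.ceil(
        capacity * abs(math.log(error_rate)) /
        (num_slices * (math.log(2) ** 2))))
    return num_slices, bits_per_slice

k, m = bloom_geometry(1000, 0.001)
print(k, m, k * m)  # hash functions, bits per slice, total bits
```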
```python
class BloomFilter:
    def __init__(self, capacity: int, error_rate: float = 0.001):
        """
        Initialize bloom filter with fixed capacity.

        Parameters:
        - capacity: Maximum number of elements before degradation
        - error_rate: Target false positive probability (0 < error_rate < 1)

        Raises:
        - ValueError: If error_rate not between 0 and 1, or capacity <= 0
        """

    def add(self, key, skip_check: bool = False) -> bool:
        """
        Add a key to the bloom filter.

        Parameters:
        - key: Item to add (string or any hashable object)
        - skip_check: Skip membership check for performance

        Returns:
        - bool: True if key was already in filter, False if newly added

        Raises:
        - IndexError: If filter is at capacity
        """

    def __contains__(self, key) -> bool:
        """Test key membership in filter (supports 'in' operator)."""

    def __len__(self) -> int:
        """Return number of keys stored in filter."""

    def copy(self) -> 'BloomFilter':
        """Return a copy of this bloom filter."""

    def union(self, other: 'BloomFilter') -> 'BloomFilter':
        """
        Calculate union with another filter.

        Parameters:
        - other: Another BloomFilter with same capacity and error_rate

        Returns:
        - BloomFilter: New filter containing union of both filters

        Raises:
        - ValueError: If filters have different capacity or error_rate
        """

    def intersection(self, other: 'BloomFilter') -> 'BloomFilter':
        """
        Calculate intersection with another filter.

        Parameters:
        - other: Another BloomFilter with same capacity and error_rate

        Returns:
        - BloomFilter: New filter containing intersection of both filters

        Raises:
        - ValueError: If filters have different capacity or error_rate
        """

    def __or__(self, other: 'BloomFilter') -> 'BloomFilter':
        """Union operator overload (| operator)."""

    def __and__(self, other: 'BloomFilter') -> 'BloomFilter':
        """Intersection operator overload (& operator)."""

    def tofile(self, f):
        """
        Write bloom filter to file object.

        Parameters:
        - f: File-like object for writing binary data
        """

    @classmethod
    def fromfile(cls, f, n: int = -1) -> 'BloomFilter':
        """
        Read bloom filter from file object.

        Parameters:
        - f: File-like object for reading binary data
        - n: Maximum bytes to read (-1 for all)

        Returns:
        - BloomFilter: Deserialized bloom filter

        Raises:
        - ValueError: If n too small or bit length mismatch
        """
```

```python
# Instance attributes (read-only after initialization)
bf.error_rate: float      # False positive error rate
bf.num_slices: int        # Number of hash functions
bf.bits_per_slice: int    # Bits per hash function
bf.capacity: int          # Maximum elements before degradation
bf.num_bits: int          # Total bits in filter
bf.count: int             # Current number of elements
bf.bitarray               # Underlying bitarray storage

# Class attributes
BloomFilter.FILE_FMT = b'<dQQQQ'  # File format for serialization
```

## ScalableBloomFilter

Dynamic bloom filter that grows automatically without knowing the original set size, maintaining a steady false positive rate by creating internal BloomFilter instances with exponentially increasing capacity.
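How the compound error rate stays near the target can be sketched from the documented parameters: each new internal filter multiplies its capacity by the growth mode (`scale`) and tightens its own error budget by the ratio of 0.9. Illustrative arithmetic, not the library's code:

```python
# Illustrative: capacities and per-filter error budgets of successive
# internal filters, assuming capacity scales by `scale` and the error
# rate tightens by `ratio` at each step.
initial_capacity, error_rate = 100, 0.001
scale, ratio = 4, 0.9  # LARGE_SET_GROWTH and the documented tightening ratio

for i in range(4):
    capacity = initial_capacity * scale ** i
    budget = error_rate * ratio ** i
    print(f"internal filter {i}: capacity={capacity}, error budget={budget:.6f}")
```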
```python
class ScalableBloomFilter:
    def __init__(self, initial_capacity: int = 100, error_rate: float = 0.001,
                 mode: int = 4):
        """
        Initialize scalable bloom filter.

        Parameters:
        - initial_capacity: Initial capacity of first internal filter
        - error_rate: Target false positive probability
        - mode: Growth mode (SMALL_SET_GROWTH=2 or LARGE_SET_GROWTH=4)
        """

    def add(self, key) -> bool:
        """
        Add key to scalable filter.

        Parameters:
        - key: Item to add (string or any hashable object)

        Returns:
        - bool: True if key was already in filter, False if newly added
        """

    def __contains__(self, key) -> bool:
        """Test key membership in filter (supports 'in' operator)."""

    def __len__(self) -> int:
        """Return total number of elements stored across all internal filters."""

    def union(self, other: 'ScalableBloomFilter') -> 'ScalableBloomFilter':
        """
        Calculate union with another scalable filter.

        Parameters:
        - other: Another ScalableBloomFilter with same parameters

        Returns:
        - ScalableBloomFilter: New filter containing union

        Raises:
        - ValueError: If filters have different mode, initial_capacity, or error_rate
        """

    def __or__(self, other: 'ScalableBloomFilter') -> 'ScalableBloomFilter':
        """Union operator overload (| operator)."""

    def tofile(self, f):
        """
        Serialize scalable bloom filter to file object.

        Parameters:
        - f: File-like object for writing binary data
        """

    @classmethod
    def fromfile(cls, f) -> 'ScalableBloomFilter':
        """
        Deserialize scalable bloom filter from file object.

        Parameters:
        - f: File-like object for reading binary data

        Returns:
        - ScalableBloomFilter: Deserialized scalable filter
        """

    # Properties (computed from internal filters)
    @property
    def capacity(self) -> int:
        """Total capacity for all internal filters."""

    @property
    def count(self) -> int:
        """Total number of elements (alias for __len__)."""
```

```python
# Instance attributes
sbf.scale: int             # Growth mode constant
sbf.ratio: float           # Tightening ratio (0.9)
sbf.initial_capacity: int  # Initial capacity
sbf.error_rate: float      # Target error rate
sbf.filters: list          # List of internal BloomFilter objects

# Class constants
ScalableBloomFilter.SMALL_SET_GROWTH = 2  # Slower growth, less memory
ScalableBloomFilter.LARGE_SET_GROWTH = 4  # Faster growth, more memory (default)
ScalableBloomFilter.FILE_FMT = '<idQd'    # File format for serialization
```
## Set Operations

Both BloomFilter and ScalableBloomFilter support set operations for combining filters with identical parameters.

```python
# Union operations (combine two filters)
union_filter = bloom1.union(bloom2)
union_filter = bloom1 | bloom2

# Intersection operations (BloomFilter only)
intersection_filter = bloom1.intersection(bloom2)
intersection_filter = bloom1 & bloom2

# Copy operations (BloomFilter only)
copied_filter = bloom1.copy()
```

## Serialization

Both filter types support file serialization and Python pickling for persistence.
```python
import pickle

# File serialization
with open('filter.dat', 'wb') as f:
    bloom_filter.tofile(f)

with open('filter.dat', 'rb') as f:
    loaded_filter = BloomFilter.fromfile(f)

# Pickle support (automatic via __getstate__/__setstate__)
pickled_data = pickle.dumps(bloom_filter)
loaded_filter = pickle.loads(pickled_data)
```
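The `FILE_FMT` constants suggest a fixed struct header followed by the raw bit data. As an illustrative sketch, assuming `b'<dQQQQ'` packs `error_rate`, `num_slices`, `bits_per_slice`, `capacity`, and `count` (a guess based on the attribute list above, not confirmed by this document), the header of a serialized BloomFilter could be inspected like this:

```python
import struct

FILE_FMT = b'<dQQQQ'  # one double + four unsigned 64-bit integers

with open('filter.dat', 'rb') as f:
    header = f.read(struct.calcsize(FILE_FMT))
    # Assumed field order: error_rate, num_slices, bits_per_slice,
    # capacity, count; the remainder of the file is the bitarray.
    error_rate, num_slices, bits_per_slice, capacity, count = struct.unpack(FILE_FMT, header)

print(error_rate, num_slices, bits_per_slice, capacity, count)
```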
```python
# Type aliases for documentation
import io
import typing
from typing import Hashable, Tuple, Union

HashableKey = Union[str, int, float, bytes, Tuple[Hashable, ...]]
FileObject = Union[io.IOBase, typing.BinaryIO]
```

## Exceptions

The package raises standard built-in exceptions: ValueError for invalid parameters or mismatched filters in set operations, and IndexError when adding to a BloomFilter that is already at capacity.
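For instance, a fixed-capacity filter signals exhaustion through IndexError, which callers can catch:

```python
from pybloom_live import BloomFilter

bf = BloomFilter(capacity=10, error_rate=0.001)
try:
    for i in range(100):
        bf.add(f"key_{i}")
except IndexError:
    # add() raises IndexError once the filter is at capacity
    print(f"filter full after {len(bf)} keys")
```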
## Usage Examples

### BloomFilter

```python
from pybloom_live import BloomFilter

# Create filter for expected 10,000 items with 0.1% false positive rate
bf = BloomFilter(capacity=10000, error_rate=0.001)
# Add items
for i in range(5000):
bf.add(f"user_{i}")
# Test membership
print("user_100" in bf) # True
print("user_9999" in bf) # False
print(f"Filter contains {len(bf)} items") # Filter contains 5000 itemsfrom pybloom_live import ScalableBloomFilter
# Create scalable filter that grows automatically
sbf = ScalableBloomFilter(
    initial_capacity=1000,
    error_rate=0.001,
    mode=ScalableBloomFilter.LARGE_SET_GROWTH
)
# Add large number of items - filter grows automatically
for i in range(100000):
sbf.add(f"item_{i}")
print(f"Total capacity: {sbf.capacity}")
print(f"Total items: {len(sbf)}")
print(f"Number of internal filters: {len(sbf.filters)}")from pybloom_live import BloomFilter
# Create two filters with identical parameters
bf1 = BloomFilter(capacity=1000, error_rate=0.001)
bf2 = BloomFilter(capacity=1000, error_rate=0.001)
# Add different items to each
for i in range(500):
bf1.add(f"set1_item_{i}")
bf2.add(f"set2_item_{i}")
# Union contains items from both filters
union_bf = bf1 | bf2
# Check items exist in union
print("set1_item_100" in union_bf) # True
print("set2_item_100" in union_bf) # True
# Intersection keeps only items likely present in both (none in this case)
intersection_bf = bf1 & bf2
print("set1_item_100" in intersection_bf) # False (or rarely True - false positive)
```
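As a general property of Bloom filters (not specific to this package), the union of two filters with identical parameters is exactly the filter you would get by inserting both key sets into a single filter, while the intersection only approximates the true set intersection and may admit additional false positives, so treat intersected filters as a heuristic rather than an exact structure.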