Python implementation of redis API, can be used for testing purposes
—
Redis bitmap operations for efficient bit-level manipulation of string values. Bitmap operations provide memory-efficient storage and manipulation of binary data, supporting counting, positioning, and bitwise operations across large datasets.
Get and set individual bits within string values treated as bitmaps.
def getbit(self, key: KeyT, offset: int) -> int: ...
def setbit(self, key: KeyT, offset: int, value: int) -> int: ...

Find the position of bits with specific values within bitmap data.
def bitpos(self, key: KeyT, bit: int, *args: bytes) -> int: ...

Count the number of set bits in bitmap data with optional range specification.
def bitcount(self, key: KeyT, *args: bytes) -> int: ...

Perform bitwise operations (AND, OR, XOR, NOT) between multiple bitmaps.
def bitop(self, op_name: bytes, dst: KeyT, *keys: KeyT) -> int: ...

Advanced bit manipulation supporting multiple data types and operations in a single command.
def bitfield(self, key: KeyT, *args: bytes) -> List[Optional[int]]: ...

import fakeredis
client = fakeredis.FakeRedis()
print("=== Basic Bitmap Operations ===")

# Each bit offset in the bitmap stands for one user ID; SETBIT with value 1
# marks that user as active.
for active_user_id in (0, 5, 11, 15):
    client.setbit('bitmap:user_activity', active_user_id, 1)

# GETBIT reads a single bit back (user 3 was not set above).
print(f"User 0 active: {client.getbit('bitmap:user_activity', 0)}")
print(f"User 3 active: {client.getbit('bitmap:user_activity', 3)}")
print(f"User 5 active: {client.getbit('bitmap:user_activity', 5)}")

# BITCOUNT without a range counts every set bit in the value.
active_count = client.bitcount('bitmap:user_activity')
print(f"Total active users: {active_count}")

# BITPOS returns the offset of the first bit holding the requested value.
first_active = client.bitpos('bitmap:user_activity', 1)
print(f"First active user ID: {first_active}")

first_inactive = client.bitpos('bitmap:user_activity', 0)
print(f"First inactive user ID: {first_inactive}")

import fakeredis
import datetime
client = fakeredis.FakeRedis()
print("=== User Activity Tracking ===")

# Simulate daily user activity for a week: each day maps to the IDs of the
# users who were active on it.
days = ['monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday', 'sunday']
user_activities = {
    'monday': [0, 1, 2, 5, 8, 10, 15, 20, 25],
    'tuesday': [1, 2, 3, 6, 9, 11, 16, 21, 26],
    'wednesday': [0, 2, 4, 7, 10, 12, 17, 22, 27],
    'thursday': [1, 3, 5, 8, 11, 13, 18, 23, 28],
    'friday': [0, 1, 4, 6, 9, 14, 19, 24, 29],
    'saturday': [2, 5, 7, 10, 15, 20, 25, 30],
    'sunday': [1, 3, 6, 8, 12, 16, 21, 26, 31]
}

# Record activity for each day: one bitmap per day, bit N set means user N
# was active on that day.
for day, active_users in user_activities.items():
    bitmap_key = f'activity:{day}'
    for user_id in active_users:
        client.setbit(bitmap_key, user_id, 1)
    daily_count = client.bitcount(bitmap_key)
    print(f"{day.capitalize()}: {daily_count} active users")

# Build the key lists once from `days` instead of spelling out all seven keys
# in every BITOP call. The first five entries are the weekdays.
all_day_keys = [f'activity:{day}' for day in days]
weekday_keys = all_day_keys[:5]
weekend_keys = all_day_keys[5:]

# Find users active every day (intersection)
print(f"\n=== Users Active Every Day ===")
client.bitop('AND', 'activity:all_days', *all_day_keys)
consistent_users = client.bitcount('activity:all_days')
print(f"Users active all 7 days: {consistent_users}")

# Find users active on any day (union)
print(f"\n=== Users Active Any Day ===")
client.bitop('OR', 'activity:any_day', *all_day_keys)
any_day_users = client.bitcount('activity:any_day')
print(f"Users active on any day: {any_day_users}")

# Compare weekend and weekday activity
print(f"\n=== Weekend vs Weekday Analysis ===")
client.bitop('OR', 'activity:weekend', *weekend_keys)
client.bitop('OR', 'activity:weekday', *weekday_keys)
weekend_count = client.bitcount('activity:weekend')
weekday_count = client.bitcount('activity:weekday')
print(f"Weekend active users: {weekend_count}")
print(f"Weekday active users: {weekday_count}")

# Users active on weekends but not weekdays: invert the weekday bitmap, then
# AND it with the weekend bitmap so only weekend users can remain set.
client.bitop('NOT', 'activity:not_weekday', 'activity:weekday')
client.bitop('AND', 'activity:weekend_only', 'activity:weekend', 'activity:not_weekday')
weekend_only = client.bitcount('activity:weekend_only')
print(f"Weekend-only users: {weekend_only}")

import fakeredis
client = fakeredis.FakeRedis()
print("=== Feature Flag Management ===")

# Define feature flags: feature name -> IDs of the users it is enabled for.
features = {
    'beta_ui': [1, 5, 10, 15, 20, 25, 30],  # Beta UI users
    'premium': [2, 7, 12, 17, 22, 27],  # Premium users
    'analytics': [3, 8, 13, 18, 23, 28],  # Analytics access
    'api_v2': [4, 9, 14, 19, 24, 29],  # API v2 access
    'mobile_app': [0, 5, 10, 15, 20, 25, 30]  # Mobile app users
}

# Set feature flags: one bitmap per feature, bit N set means the feature is
# enabled for user N.
for feature, user_ids in features.items():
    for user_id in user_ids:
        client.setbit(f'feature:{feature}', user_id, 1)
    enabled_count = client.bitcount(f'feature:{feature}')
    print(f"{feature}: {enabled_count} users enabled")

# Find users with multiple features
print(f"\n=== Feature Combinations ===")

# Users with both beta UI and mobile app
client.bitop('AND', 'combo:beta_mobile', 'feature:beta_ui', 'feature:mobile_app')
beta_mobile_count = client.bitcount('combo:beta_mobile')
print(f"Beta UI + Mobile app: {beta_mobile_count} users")

# Premium users with analytics
client.bitop('AND', 'combo:premium_analytics', 'feature:premium', 'feature:analytics')
premium_analytics_count = client.bitcount('combo:premium_analytics')
print(f"Premium + Analytics: {premium_analytics_count} users")

# Users with any premium feature (premium OR analytics OR api_v2), built in
# two steps through an intermediate key.
client.bitop('OR', 'temp:premium_or_analytics', 'feature:premium', 'feature:analytics')
client.bitop('OR', 'combo:any_premium', 'temp:premium_or_analytics', 'feature:api_v2')
any_premium_count = client.bitcount('combo:any_premium')
print(f"Any premium feature: {any_premium_count} users")

# Check specific user's features
def check_user_features(user_id: int):
    """Return the names of the feature flags enabled for ``user_id``.

    Tests bit ``user_id`` of every ``feature:<name>`` bitmap through the
    module-level ``client``, walking the module-level ``features`` mapping in
    its definition order.

    Args:
        user_id: Bit offset identifying the user.

    Returns:
        A list of feature names whose bit for ``user_id`` is set; empty when
        the user has no features enabled.
    """
    return [
        feature
        for feature in features
        if client.getbit(f'feature:{feature}', user_id)
    ]
print(f"\n=== Individual User Features ===")
test_users = [5, 12, 20]
for user_id in test_users:
    # Look up and print the enabled features for each sample user.
    user_features = check_user_features(user_id)
    print(f"User {user_id}: {', '.join(user_features)}")

import fakeredis
import random
import time
client = fakeredis.FakeRedis()
print("=== Performance Monitoring with Bitmaps ===")
# Simulate server response times over 24 hours (1440 minutes), one bit per minute:
#   bit = 1 means the response time was acceptable (< 500ms)
#   bit = 0 means the response time was slow (>= 500ms)
def simulate_server_performance():
    """Simulate 24 hours of server performance data.

    Writes one bit per minute (1440 bits in total) to the
    ``performance:today`` bitmap through the module-level ``client``:
    1 for a minute with good performance, 0 for a slow one.

    Returns:
        The key the bitmap was written to.
    """
    performance_key = 'performance:today'
    # Simulate performance for each minute of the day
    for minute in range(1440):  # 24 * 60 minutes
        # 95% chance of good performance, 5% chance of slow response
        is_good = random.random() < 0.95
        client.setbit(performance_key, minute, 1 if is_good else 0)
    return performance_key
# Generate performance data
performance_key = simulate_server_performance()
print("Generated 24 hours of performance data...")

# Analyze performance
print(f"\n=== Performance Analysis ===")
total_minutes = 1440
good_minutes = client.bitcount(performance_key)
bad_minutes = total_minutes - good_minutes
print(f"Total minutes: {total_minutes}")
print(f"Good performance minutes: {good_minutes}")
print(f"Poor performance minutes: {bad_minutes}")
print(f"Uptime percentage: {(good_minutes / total_minutes) * 100:.2f}%")

# Find first incident
first_incident = client.bitpos(performance_key, 0)
# When searching for a clear bit without an end offset, BITPOS treats the
# value as zero-padded on the right: for an all-ones bitmap it reports the
# first offset past the end (1440 here) rather than -1. Bound the check by
# the number of recorded minutes so "no incident" is detected either way.
if 0 <= first_incident < total_minutes:
    hours = first_incident // 60
    minutes = first_incident % 60
    print(f"First incident at: {hours:02d}:{minutes:02d}")
else:
    print("No incidents detected!")

# Analyze performance by hour
print(f"\n=== Hourly Performance Breakdown ===")
for hour in range(24):
    start_minute = hour * 60
    # BITCOUNT ranges are byte-based by default and an hour (60 bits) is not
    # a whole number of bytes, so copy the hour's bits into a scratch bitmap
    # and count that instead.
    hour_key = f'performance:hour_{hour}'
    for minute in range(60):
        global_minute = start_minute + minute
        bit_value = client.getbit(performance_key, global_minute)
        client.setbit(hour_key, minute, bit_value)
    good_in_hour = client.bitcount(hour_key)
    percentage = (good_in_hour / 60) * 100
    print(f"Hour {hour:02d}:00-{hour:02d}:59: {good_in_hour}/60 good minutes ({percentage:.1f}%)")

# Find longest streak of good performance
print(f"\n=== Performance Streaks ===")
def find_longest_streak(bitmap_key, bit_value, total_bits):
    """Find the longest consecutive streak of a specific bit value.

    Reads bits ``0 .. total_bits - 1`` of ``bitmap_key`` one at a time through
    the module-level ``client``.

    Args:
        bitmap_key: Key of the bitmap to scan.
        bit_value: The bit value (0 or 1) whose runs are measured.
        total_bits: Number of leading bits to examine.

    Returns:
        A ``(length, start)`` tuple for the longest run; the earliest run wins
        on ties. ``(0, 0)`` when ``bit_value`` never occurs.
    """
    max_streak = 0
    max_start = 0
    current_streak = 0
    current_start = 0
    for i in range(total_bits):
        if client.getbit(bitmap_key, i) != bit_value:
            # Run broken: start counting afresh at the next matching bit.
            current_streak = 0
            continue
        if current_streak == 0:
            current_start = i
        current_streak += 1
        # Strict comparison keeps the first of several equally long runs.
        if current_streak > max_streak:
            max_streak = current_streak
            max_start = current_start
    return max_streak, max_start
# Find longest good streak
good_streak_length, good_streak_start = find_longest_streak(performance_key, 1, 1440)
if good_streak_length > 0:
    # Convert minute offsets into hour / minute-of-hour pairs for display.
    start_hour, start_min = divmod(good_streak_start, 60)
    end_minute = good_streak_start + good_streak_length - 1
    end_hour, end_min = divmod(end_minute, 60)
    print(f"Longest good streak: {good_streak_length} minutes")
    print(f"From {start_hour:02d}:{start_min:02d} to {end_hour:02d}:{end_min:02d}")

# Find longest outage streak
bad_streak_length, bad_streak_start = find_longest_streak(performance_key, 0, 1440)
if bad_streak_length > 0:
    start_hour, start_min = divmod(bad_streak_start, 60)
    end_minute = bad_streak_start + bad_streak_length - 1
    end_hour, end_min = divmod(end_minute, 60)
    print(f"Longest outage: {bad_streak_length} minutes")
    print(f"From {start_hour:02d}:{start_min:02d} to {end_hour:02d}:{end_min:02d}")

import fakeredis
client = fakeredis.FakeRedis()
print("=== Advanced Bitfield Operations ===")

# Store multiple counters in a single key using bitfields
metrics_key = 'metrics:counters'

# Layout: 4 x 16-bit unsigned integers
#   Counter 0: Page views       (offset 0)
#   Counter 1: Unique visitors  (offset 16)
#   Counter 2: API calls        (offset 32)
#   Counter 3: Errors           (offset 48)
#
# NOTE: the client's ``bitfield(key)`` method does not take the raw BITFIELD
# arguments. It returns a builder on which sub-commands are queued with
# .set() / .get() / .incrby() / .overflow() and then sent as one BITFIELD
# command by .execute().
print("=== Initializing Counters ===")
result = (
    client.bitfield(metrics_key)
    .set('u16', 0, 1500)   # Page views: 1500
    .set('u16', 16, 245)   # Unique visitors: 245
    .set('u16', 32, 892)   # API calls: 892
    .set('u16', 48, 7)     # Errors: 7
    .execute()
)
# BITFIELD SET replies with the value each field held before the write.
print(f"Initial values set: {result}")

# Read all counters
print(f"\n=== Reading All Counters ===")
values = (
    client.bitfield(metrics_key)
    .get('u16', 0)    # Page views
    .get('u16', 16)   # Unique visitors
    .get('u16', 32)   # API calls
    .get('u16', 48)   # Errors
    .execute()
)
print(f"Page views: {values[0]}")
print(f"Unique visitors: {values[1]}")
print(f"API calls: {values[2]}")
print(f"Errors: {values[3]}")

# Increment counters atomically (all four increments travel in one command)
print(f"\n=== Incrementing Counters ===")
increments = (
    client.bitfield(metrics_key)
    .incrby('u16', 0, 25)    # +25 page views
    .incrby('u16', 16, 3)    # +3 unique visitors
    .incrby('u16', 32, 47)   # +47 API calls
    .incrby('u16', 48, 2)    # +2 errors
    .execute()
)
print(f"New values after increment: {increments}")

# Demonstrate overflow handling
print(f"\n=== Overflow Handling ===")

# Set a counter near its maximum value (65535 for u16)
client.bitfield(metrics_key).set('u16', 64, 65530).execute()

# Try different overflow behaviors
print("WRAP overflow (default):")
wrap_result = (
    client.bitfield(metrics_key)
    .incrby('u16', 64, 10)   # This will wrap around
    .execute()
)
print(f"65530 + 10 with WRAP = {wrap_result[0]}")

# Reset and try FAIL overflow
client.bitfield(metrics_key).set('u16', 64, 65530).execute()
print("\nFAIL overflow:")
fail_result = (
    client.bitfield(metrics_key)
    .overflow('FAIL')
    .incrby('u16', 64, 10)   # This will return None
    .execute()
)
print(f"65530 + 10 with FAIL = {fail_result[0]}")

# Reset and try SAT overflow
client.bitfield(metrics_key).set('u16', 64, 65530).execute()
print("\nSAT overflow:")
sat_result = (
    client.bitfield(metrics_key)
    .overflow('SAT')
    .incrby('u16', 64, 10)   # This will saturate at 65535
    .execute()
)
print(f"65530 + 10 with SAT = {sat_result[0]}")

# Working with signed integers
print(f"\n=== Signed Integer Operations ===")
signed_key = 'signed:temperatures'

# Store temperature readings as signed 8-bit integers (-128 to 127)
temperatures = [22, -5, 15, -12, 35, -20, 8]
print("Storing temperature readings:")
for i, temp in enumerate(temperatures):
    offset = i * 8  # 8 bits per temperature
    client.bitfield(signed_key).set('i8', offset, temp).execute()
    print(f"Sensor {i}: {temp}°C")

# Read all temperatures with a single BITFIELD command
print(f"\nReading all temperatures:")
read_op = client.bitfield(signed_key)
for i in range(len(temperatures)):
    read_op.get('i8', i * 8)
all_temps = read_op.execute()
for i, temp in enumerate(all_temps):
    print(f"Sensor {i}: {temp}°C")

# Apply a per-sensor temperature change and report the new readings
print(f"\nSimulating temperature changes:")
changes = [2, -3, 1, 5, -2, 4, -1]  # Temperature changes
change_op = client.bitfield(signed_key)
for i, change in enumerate(changes):
    change_op.incrby('i8', i * 8, change)
new_temps = change_op.execute()
print("New temperatures after changes:")
for i, (original, change, temp) in enumerate(zip(temperatures, changes, new_temps)):
    print(f"Sensor {i}: {original}°C + {change}°C = {temp}°C")

# Bitmap operation types
# The four bitwise operations listed for bitop(): AND, OR, XOR and NOT.
BitmapOperation = Literal["AND", "OR", "XOR", "NOT"]
# Bitfield encoding types: a signedness prefix ("u" or "i") followed by the
# field width in bits, given either as text or as bytes.
BitfieldType = Union[
str, # e.g., "u8", "i16", "u32", "i64"
bytes # e.g., b"u8", b"i16"
]
# Bitfield overflow policies, as used in the overflow example: WRAP wraps
# around, SAT saturates at the type's limit, FAIL makes the increment
# return None.
OverflowPolicy = Literal["WRAP", "SAT", "FAIL"]
# Bitfield operation result: a list of integers, with None for an entry
# whose increment was rejected under the FAIL policy.
BitfieldResult = List[Optional[int]]

Install with Tessl CLI
npx tessl i tessl/pypi-fakeredis
docs