Python implementation of redis API, can be used for testing purposes
—
Redis set data type operations providing unordered collections of unique strings. Sets are ideal for tracking unique items, performing set algebra operations (union, intersection, difference), and implementing features like tags, categories, and relationship tracking. Sets can hold up to 2^32 - 1 members.
Core set manipulation functions for adding, removing, and testing membership.
def sadd(self, name: KeyT, *values: EncodableT) -> ResponseT: ...
def srem(self, name: KeyT, *values: EncodableT) -> ResponseT: ...
def sismember(self, name: KeyT, value: EncodableT) -> ResponseT: ...
def smismember(self, name: KeyT, values: Iterable[EncodableT]) -> List[bool]: ...
def scard(self, name: KeyT) -> ResponseT: ...
def smembers(self, name: KeyT) -> Set[bytes]: ...
def spop(self, name: KeyT, count: Optional[int] = None) -> Union[Optional[bytes], List[bytes]]: ...
def srandmember(self, name: KeyT, number: Optional[int] = None) -> Union[Optional[bytes], List[bytes]]: ...Functions for performing mathematical set operations between multiple sets.
def sinter(self, keys: Union[KeyT, Iterable[KeyT]], *args: KeyT) -> Set[bytes]: ...
def sintercard(self, numkeys: int, keys: Iterable[KeyT], limit: int = 0) -> ResponseT: ...
def sinterstore(self, dest: KeyT, keys: Union[KeyT, Iterable[KeyT]], *args: KeyT) -> ResponseT: ...
def sunion(self, keys: Union[KeyT, Iterable[KeyT]], *args: KeyT) -> Set[bytes]: ...
def sunionstore(self, dest: KeyT, keys: Union[KeyT, Iterable[KeyT]], *args: KeyT) -> ResponseT: ...
def sdiff(self, keys: Union[KeyT, Iterable[KeyT]], *args: KeyT) -> Set[bytes]: ...
def sdiffstore(self, dest: KeyT, keys: Union[KeyT, Iterable[KeyT]], *args: KeyT) -> ResponseT: ...Functions for moving members between sets.
def smove(self, src: KeyT, dst: KeyT, value: EncodableT) -> ResponseT: ...Iterator functions for efficiently traversing large sets.
def sscan(
self,
name: KeyT,
cursor: int = 0,
match: Optional[PatternT] = None,
count: Optional[int] = None
) -> ResponseT: ...
def sscan_iter(
self,
name: KeyT,
match: Optional[PatternT] = None,
count: Optional[int] = None
) -> Iterator[bytes]: ...import fakeredis
client = fakeredis.FakeRedis()
# Add members to sets
client.sadd('fruits', 'apple', 'banana', 'orange')
client.sadd('colors', 'red', 'yellow', 'orange')
# Check membership
is_member = client.sismember('fruits', 'apple')
print(f"Apple in fruits: {is_member}") # True
# Check multiple memberships
memberships = client.smismember('fruits', ['apple', 'grape', 'banana'])
print(f"Memberships: {memberships}") # [True, False, True]
# Get set size
fruit_count = client.scard('fruits')
print(f"Number of fruits: {fruit_count}") # 3
# Get all members
all_fruits = client.smembers('fruits')
print("All fruits:", {member.decode() for member in all_fruits})
# Remove members
removed_count = client.srem('fruits', 'banana', 'grape')
print(f"Removed {removed_count} items") # 1 (grape doesn't exist)import fakeredis
client = fakeredis.FakeRedis()
# Setup a set of items
client.sadd('items', 'item1', 'item2', 'item3', 'item4', 'item5', 'item6')
# Get random member without removing
random_item = client.srandmember('items')
print(f"Random item: {random_item.decode()}")
# Get multiple random members (with possible duplicates)
random_items = client.srandmember('items', 3)
print("Random items:", [item.decode() for item in random_items])
# Get multiple random members (without duplicates)
unique_random = client.srandmember('items', -3) # Negative count = no duplicates
print("Unique random items:", [item.decode() for item in unique_random])
# Pop random members (removes them from set)
popped_item = client.spop('items')
print(f"Popped item: {popped_item.decode()}")
# Pop multiple items
popped_items = client.spop('items', 2)
print("Popped items:", [item.decode() for item in popped_items])
# Check remaining items
remaining = client.smembers('items')
print("Remaining items:", {item.decode() for item in remaining})import fakeredis
client = fakeredis.FakeRedis()
# Setup test sets
client.sadd('programming_languages', 'python', 'java', 'javascript', 'go')
client.sadd('web_languages', 'javascript', 'php', 'python', 'ruby')
client.sadd('compiled_languages', 'java', 'go', 'c', 'rust')
# Intersection - languages that are both programming and web languages
web_and_prog = client.sinter('programming_languages', 'web_languages')
print("Web & Programming:", {lang.decode() for lang in web_and_prog})
# Union - all languages combined
all_languages = client.sunion('programming_languages', 'web_languages', 'compiled_languages')
print("All languages:", {lang.decode() for lang in all_languages})
# Difference - programming languages that are not web languages
prog_not_web = client.sdiff('programming_languages', 'web_languages')
print("Programming but not Web:", {lang.decode() for lang in prog_not_web})
# Store intersection result
client.sinterstore('web_programming', 'programming_languages', 'web_languages')
stored_result = client.smembers('web_programming')
print("Stored intersection:", {lang.decode() for lang in stored_result})
# Intersection cardinality (count without returning members)
intersection_count = client.sintercard(2, ['programming_languages', 'compiled_languages'])
print(f"Programming & Compiled count: {intersection_count}")import fakeredis
client = fakeredis.FakeRedis()
# Setup source and destination sets
client.sadd('todo', 'task1', 'task2', 'task3')
client.sadd('completed', 'old_task')
# Move task from todo to completed
moved = client.smove('todo', 'completed', 'task1')
print(f"Moved task1: {moved}") # 1 if successful
# Try to move non-existent member
moved = client.smove('todo', 'completed', 'nonexistent')
print(f"Moved nonexistent: {moved}") # 0 if member doesn't exist
# Check final state
todo_items = client.smembers('todo')
completed_items = client.smembers('completed')
print("TODO:", {item.decode() for item in todo_items})
print("Completed:", {item.decode() for item in completed_items})import fakeredis
client = fakeredis.FakeRedis()
# Create a large set
for i in range(1000):
client.sadd('large_set', f'member_{i}')
# Scan through all members
cursor = 0
all_members = []
while True:
cursor, members = client.sscan('large_set', cursor=cursor, count=100)
all_members.extend([member.decode() for member in members])
if cursor == 0:
break
print(f"Total members scanned: {len(all_members)}")
# Scan with pattern matching
matching_members = []
for member in client.sscan_iter('large_set', match='member_1*'):
matching_members.append(member.decode())
print(f"Members matching 'member_1*': {len(matching_members)}")import fakeredis
class TagSystem:
def __init__(self, client):
self.client = client
def add_tags(self, item_id, *tags):
"""Add tags to an item"""
return self.client.sadd(f'item:{item_id}:tags', *tags)
def remove_tags(self, item_id, *tags):
"""Remove tags from an item"""
return self.client.srem(f'item:{item_id}:tags', *tags)
def get_tags(self, item_id):
"""Get all tags for an item"""
tags = self.client.smembers(f'item:{item_id}:tags')
return {tag.decode() for tag in tags}
def has_tag(self, item_id, tag):
"""Check if item has a specific tag"""
return bool(self.client.sismember(f'item:{item_id}:tags', tag))
def get_items_with_all_tags(self, *tags):
"""Get items that have all specified tags"""
# This is a simplified version - in practice, you'd maintain reverse indexes
# Here we demonstrate the set operations concept
tag_keys = [f'tag:{tag}:items' for tag in tags]
if tag_keys:
item_ids = self.client.sinter(tag_keys)
return {item_id.decode() for item_id in item_ids}
return set()
def add_item_to_tag_index(self, item_id, tag):
"""Add item to tag's reverse index"""
return self.client.sadd(f'tag:{tag}:items', item_id)
def remove_item_from_tag_index(self, item_id, tag):
"""Remove item from tag's reverse index"""
return self.client.srem(f'tag:{tag}:items', item_id)
# Usage
client = fakeredis.FakeRedis()
tag_system = TagSystem(client)
# Add tags to items
tag_system.add_tags('article:1', 'python', 'programming', 'tutorial')
tag_system.add_tags('article:2', 'python', 'web', 'flask')
tag_system.add_tags('article:3', 'javascript', 'programming', 'tutorial')
# Build reverse indexes
for item, tags in [
('article:1', ['python', 'programming', 'tutorial']),
('article:2', ['python', 'web', 'flask']),
('article:3', ['javascript', 'programming', 'tutorial'])
]:
for tag in tags:
tag_system.add_item_to_tag_index(item, tag)
# Query operations
python_articles = client.smembers('tag:python:items')
print("Python articles:", {item.decode() for item in python_articles})
# Find articles that are both programming and tutorials
prog_tutorials = client.sinter('tag:programming:items', 'tag:tutorial:items')
print("Programming tutorials:", {item.decode() for item in prog_tutorials})
# Check article tags
article_tags = tag_system.get_tags('article:1')
print("Article 1 tags:", article_tags)import fakeredis
class PermissionSystem:
def __init__(self, client):
self.client = client
def grant_permission(self, user_id, *permissions):
"""Grant permissions to a user"""
return self.client.sadd(f'user:{user_id}:permissions', *permissions)
def revoke_permission(self, user_id, *permissions):
"""Revoke permissions from a user"""
return self.client.srem(f'user:{user_id}:permissions', *permissions)
def has_permission(self, user_id, permission):
"""Check if user has a specific permission"""
return bool(self.client.sismember(f'user:{user_id}:permissions', permission))
def has_any_permission(self, user_id, *permissions):
"""Check if user has any of the specified permissions"""
user_perms = self.client.smembers(f'user:{user_id}:permissions')
user_perms_set = {perm.decode() for perm in user_perms}
return any(perm in user_perms_set for perm in permissions)
def has_all_permissions(self, user_id, *permissions):
"""Check if user has all specified permissions"""
memberships = self.client.smismember(f'user:{user_id}:permissions', permissions)
return all(memberships)
def get_user_permissions(self, user_id):
"""Get all permissions for a user"""
perms = self.client.smembers(f'user:{user_id}:permissions')
return {perm.decode() for perm in perms}
def get_common_permissions(self, *user_ids):
"""Get permissions common to all specified users"""
if not user_ids:
return set()
keys = [f'user:{user_id}:permissions' for user_id in user_ids]
common = self.client.sinter(keys)
return {perm.decode() for perm in common}
# Usage
client = fakeredis.FakeRedis()
perms = PermissionSystem(client)
# Grant permissions
perms.grant_permission('user:alice', 'read_posts', 'write_posts', 'edit_profile')
perms.grant_permission('user:bob', 'read_posts', 'edit_profile', 'admin_panel')
perms.grant_permission('user:charlie', 'read_posts', 'moderate_comments')
# Check permissions
can_write = perms.has_permission('user:alice', 'write_posts')
print(f"Alice can write posts: {can_write}")
can_admin = perms.has_any_permission('user:bob', 'admin_panel', 'super_admin')
print(f"Bob has admin permissions: {can_admin}")
has_basic_perms = perms.has_all_permissions('user:charlie', 'read_posts', 'write_posts')
print(f"Charlie has basic permissions: {has_basic_perms}")
# Find common permissions
common = perms.get_common_permissions('user:alice', 'user:bob', 'user:charlie')
print(f"Common permissions: {common}")import fakeredis
class SocialSystem:
def __init__(self, client):
self.client = client
def follow(self, follower_id, following_id):
"""User follows another user"""
# Add to follower's following set
self.client.sadd(f'user:{follower_id}:following', following_id)
# Add to followed user's followers set
self.client.sadd(f'user:{following_id}:followers', follower_id)
def unfollow(self, follower_id, following_id):
"""User unfollows another user"""
self.client.srem(f'user:{follower_id}:following', following_id)
self.client.srem(f'user:{following_id}:followers', follower_id)
def get_followers(self, user_id):
"""Get all followers of a user"""
followers = self.client.smembers(f'user:{user_id}:followers')
return {follower.decode() for follower in followers}
def get_following(self, user_id):
"""Get all users that this user is following"""
following = self.client.smembers(f'user:{user_id}:following')
return {user.decode() for user in following}
def is_following(self, follower_id, following_id):
"""Check if one user is following another"""
return bool(self.client.sismember(f'user:{follower_id}:following', following_id))
def get_mutual_follows(self, user1_id, user2_id):
"""Get users that both users are following"""
mutual = self.client.sinter(
f'user:{user1_id}:following',
f'user:{user2_id}:following'
)
return {user.decode() for user in mutual}
def suggest_follows(self, user_id):
"""Suggest users to follow based on mutual connections"""
following = self.get_following(user_id)
suggestions = set()
# Find followers of people this user follows
for followed_user in following:
their_following = self.get_following(followed_user)
suggestions.update(their_following)
# Remove already following and self
suggestions.discard(user_id)
suggestions -= following
return suggestions
# Usage
client = fakeredis.FakeRedis()
social = SocialSystem(client)
# Build social network
social.follow('alice', 'bob')
social.follow('alice', 'charlie')
social.follow('bob', 'charlie')
social.follow('bob', 'david')
social.follow('charlie', 'david')
social.follow('david', 'alice')
# Query the network
alice_following = social.get_following('alice')
print(f"Alice is following: {alice_following}")
bob_followers = social.get_followers('bob')
print(f"Bob's followers: {bob_followers}")
mutual = social.get_mutual_follows('alice', 'bob')
print(f"Alice and Bob both follow: {mutual}")
# Get suggestions for Alice
suggestions = social.suggest_follows('alice')
print(f"Suggested follows for Alice: {suggestions}")Install with Tessl CLI
npx tessl i tessl/pypi-fakeredisdocs