Python implementation of redis API, can be used for testing purposes
—
Redis sorted set data type operations providing ordered collections of unique strings with associated scores. Sorted sets combine the uniqueness of sets with the ordering capabilities of lists, making them perfect for leaderboards, priority queues, time-series data, and ranked collections. Each member has an associated score used for ordering.
Core sorted set manipulation functions for adding, removing, and querying members with scores.
def zadd(
self,
name: KeyT,
mapping: Optional[Mapping[AnyKeyT, EncodableT]] = None,
nx: bool = False,
xx: bool = False,
ch: bool = False,
incr: bool = False,
gt: bool = False,
lt: bool = False,
**kwargs: EncodableT
) -> ResponseT: ...
def zrem(self, name: KeyT, *values: EncodableT) -> ResponseT: ...
def zcard(self, name: KeyT) -> ResponseT: ...
def zcount(self, name: KeyT, min: ZScoreT, max: ZScoreT) -> ResponseT: ...
def zscore(self, name: KeyT, value: EncodableT) -> Optional[float]: ...
def zmscore(self, key: KeyT, members: Iterable[EncodableT]) -> List[Optional[float]]: ...
def zrank(self, name: KeyT, value: EncodableT) -> Optional[int]: ...
def zrevrank(self, name: KeyT, value: EncodableT) -> Optional[int]: ...
def zincrby(self, name: KeyT, amount: float, value: EncodableT) -> ResponseT: ...

Functions for retrieving members by rank or score ranges with flexible ordering options.
def zrange(
self,
name: KeyT,
start: int,
end: int,
desc: bool = False,
withscores: bool = False,
score_cast_func: Optional[Callable] = float,
byscore: bool = False,
bylex: bool = False,
offset: Optional[int] = None,
num: Optional[int] = None
) -> Union[List[bytes], List[Tuple[bytes, float]]]: ...
def zrevrange(
self,
name: KeyT,
start: int,
end: int,
withscores: bool = False,
score_cast_func: Optional[Callable] = float
) -> Union[List[bytes], List[Tuple[bytes, float]]]: ...
def zrangebyscore(
self,
name: KeyT,
min: ZScoreT,
max: ZScoreT,
start: Optional[int] = None,
num: Optional[int] = None,
withscores: bool = False,
score_cast_func: Optional[Callable] = float
) -> Union[List[bytes], List[Tuple[bytes, float]]]: ...
def zrevrangebyscore(
self,
name: KeyT,
max: ZScoreT,
min: ZScoreT,
start: Optional[int] = None,
num: Optional[int] = None,
withscores: bool = False,
score_cast_func: Optional[Callable] = float
) -> Union[List[bytes], List[Tuple[bytes, float]]]: ...
def zrangebylex(
self,
name: KeyT,
min: EncodableT,
max: EncodableT,
start: Optional[int] = None,
num: Optional[int] = None
) -> List[bytes]: ...
def zrevrangebylex(
self,
name: KeyT,
max: EncodableT,
min: EncodableT,
start: Optional[int] = None,
num: Optional[int] = None
) -> List[bytes]: ...

Functions for removing members by rank or score ranges.
def zremrangebyrank(self, name: KeyT, start: int, end: int) -> ResponseT: ...
def zremrangebyscore(self, name: KeyT, min: ZScoreT, max: ZScoreT) -> ResponseT: ...
def zremrangebylex(self, name: KeyT, min: EncodableT, max: EncodableT) -> ResponseT: ...

Functions for atomically removing and returning members with highest or lowest scores.
def zpopmin(self, name: KeyT, count: Optional[int] = None) -> Union[List[Union[bytes, float]], List[Tuple[bytes, float]]]: ...
def zpopmax(self, name: KeyT, count: Optional[int] = None) -> Union[List[Union[bytes, float]], List[Tuple[bytes, float]]]: ...
def bzpopmin(
self,
keys: Union[KeyT, Iterable[KeyT]],
timeout: float = 0
) -> Optional[Tuple[bytes, bytes, float]]: ...
def bzpopmax(
self,
keys: Union[KeyT, Iterable[KeyT]],
timeout: float = 0
) -> Optional[Tuple[bytes, bytes, float]]: ...

Functions for performing operations between multiple sorted sets.
def zinter(
self,
keys: Iterable[KeyT],
aggregate: Optional[Literal["SUM", "MIN", "MAX"]] = None,
withscores: bool = False
) -> Union[List[bytes], List[Tuple[bytes, float]]]: ...
def zinterstore(
self,
dest: KeyT,
keys: Union[Mapping[AnyKeyT, float], Iterable[KeyT]],
aggregate: Optional[Literal["SUM", "MIN", "MAX"]] = None
) -> ResponseT: ...
def zunionstore(
self,
dest: KeyT,
keys: Union[Mapping[AnyKeyT, float], Iterable[KeyT]],
aggregate: Optional[Literal["SUM", "MIN", "MAX"]] = None
) -> ResponseT: ...
def zdiff(
self,
keys: Iterable[KeyT],
withscores: bool = False
) -> Union[List[bytes], List[Tuple[bytes, float]]]: ...
def zdiffstore(self, dest: KeyT, keys: Iterable[KeyT]) -> ResponseT: ...

Functions for random member selection and iterating through large sorted sets.
def zrandmember(
self,
key: KeyT,
count: Optional[int] = None,
withscores: bool = False
) -> Union[Optional[bytes], List[bytes], List[Tuple[bytes, float]]]: ...
def zscan(
self,
name: KeyT,
cursor: int = 0,
match: Optional[PatternT] = None,
count: Optional[int] = None,
score_cast_func: Optional[Callable] = float
) -> ResponseT: ...
def zscan_iter(
self,
name: KeyT,
match: Optional[PatternT] = None,
count: Optional[int] = None,
score_cast_func: Optional[Callable] = float
) -> Iterator[Tuple[bytes, float]]: ...

import fakeredis
client = fakeredis.FakeRedis()
board = 'leaderboard'

# Seed the sorted set with four members in one call
initial_scores = {
    'alice': 100,
    'bob': 85,
    'charlie': 92,
    'david': 78,
}
client.zadd(board, initial_scores)

# A one-entry mapping adds a single member
client.zadd(board, {'eve': 95})

# Number of members in the sorted set
size = client.zcard(board)
print(f"Leaderboard size: {size}")  # 5

# Score of one member
alice_score = client.zscore(board, 'alice')
print(f"Alice's score: {alice_score}")  # 100.0

# Scores of several members at once
scores = client.zmscore(board, ['alice', 'bob', 'unknown'])
print(f"Scores: {scores}")  # [100.0, 85.0, None]

# Ascending rank is 0-based: the lowest score has rank 0
alice_rank = client.zrank(board, 'alice')
print(f"Alice's rank (asc): {alice_rank}")  # 4 (highest score)

# Descending rank is 0-based: the highest score has rank 0
alice_revrank = client.zrevrank(board, 'alice')
print(f"Alice's rank (desc): {alice_revrank}")  # 0 (highest score)


import fakeredis
client = fakeredis.FakeRedis()


def _show(heading, entries):
    """Print a heading followed by one line per (member, score) pair."""
    print(heading)
    for member, points in entries:
        print(f" {member.decode()}: {points}")


# Setup leaderboard
client.zadd('scores', {
    'player1': 1500,
    'player2': 1200,
    'player3': 1800,
    'player4': 900,
    'player5': 1650,
})

# Three highest scores, best first
top_players = client.zrevrange('scores', 0, 2, withscores=True)
_show("Top 3 players:", top_players)

# Two lowest scores, worst first
bottom_players = client.zrange('scores', 0, 1, withscores=True)
_show("Bottom 2 players:", bottom_players)

# Members whose score lies between 1000 and 1600
mid_range = client.zrangebyscore('scores', 1000, 1600, withscores=True)
_show("Mid-range players (1000-1600):", mid_range)

# Number of members whose score lies between 1200 and 1700
count = client.zcount('scores', 1200, 1700)
print(f"Players with scores 1200-1700: {count}")


import fakeredis
class Leaderboard:
    """Scoreboard kept in a single sorted set.

    Members are player identifiers and scores are their points. Ranks
    exposed by this class are 1-based, with rank 1 being the highest score.
    """

    def __init__(self, client, name):
        """Remember the client and the sorted-set key.

        Args:
            client: Redis-compatible client (e.g. ``fakeredis.FakeRedis()``).
            name: Key of the sorted set that stores the leaderboard.
        """
        self.client = client
        self.name = name

    def add_score(self, player, score):
        """Add or update player score"""
        return self.client.zadd(self.name, {player: score})

    def increment_score(self, player, amount):
        """Increment player's score"""
        return self.client.zincrby(self.name, amount, player)

    def get_score(self, player):
        """Get player's current score"""
        return self.client.zscore(self.name, player)

    def get_rank(self, player):
        """Get player's rank (1-based, 1 = highest score).

        Returns None when the client reports no rank for the player.
        """
        rank = self.client.zrevrank(self.name, player)
        return rank + 1 if rank is not None else None

    def get_top_players(self, limit=10):
        """Get top N players as (member, score) pairs.

        A non-positive ``limit`` yields an empty list. Without this guard,
        ``limit=0`` would turn into end index -1, which selects the whole
        set instead of nothing.
        """
        if limit <= 0:
            return []
        return self.client.zrevrange(self.name, 0, limit - 1, withscores=True)

    def get_player_range(self, start_rank, end_rank):
        """Get players in rank range (1-based, both ends inclusive).

        Returns an empty list for ranges that cannot match anything
        (``start_rank < 1`` or ``end_rank < start_rank``). Passing such
        values through would produce negative indexes, which count from
        the end of the set and return unrelated players.
        """
        if start_rank < 1 or end_rank < start_rank:
            return []
        return self.client.zrevrange(
            self.name,
            start_rank - 1,
            end_rank - 1,
            withscores=True,
        )

    def get_around_player(self, player, context=2):
        """Get players around a specific player.

        Returns up to ``context`` neighbours on each side of the player,
        or an empty list when the player has no rank.
        """
        rank = self.client.zrevrank(self.name, player)
        if rank is None:
            return []
        # Clamp at 0 so a player near the top does not produce a negative
        # start index.
        start = max(0, rank - context)
        end = rank + context
        return self.client.zrevrange(self.name, start, end, withscores=True)

    def remove_player(self, player):
        """Remove player from leaderboard"""
        return self.client.zrem(self.name, player)
# Usage
client = fakeredis.FakeRedis()
leaderboard = Leaderboard(client, 'game_scores')

# Add initial scores
players_scores = {
    'Alice': 2500,
    'Bob': 1800,
    'Charlie': 2200,
    'Diana': 1900,
    'Eve': 2100,
}
for name, points in players_scores.items():
    leaderboard.add_score(name, points)

# Game events: Bob scores 300 points, then Diana scores 150 points
for name, gained in (('Bob', 300), ('Diana', 150)):
    leaderboard.increment_score(name, gained)

# Query leaderboard
print("Top 3 players:")
position = 0
for player, score in leaderboard.get_top_players(3):
    position += 1
    print(f"{position}. {player.decode()}: {score}")

# Find Alice's position
alice_rank = leaderboard.get_rank('Alice')
alice_score = leaderboard.get_score('Alice')
print(f"Alice: Rank {alice_rank}, Score {alice_score}")

# Show players around Bob
around_bob = leaderboard.get_around_player('Bob')
print("Players around Bob:")
for player, score in around_bob:
    print(f" {player.decode()}: {score}")


import fakeredis
import time
class TimeBasedRanking:
    """Time-ordered collection kept in a sorted set, scored by timestamp."""

    def __init__(self, client, name):
        """Remember the client and the sorted-set key.

        Args:
            client: Redis-compatible client (e.g. ``fakeredis.FakeRedis()``).
            name: Key of the sorted set that stores the events.
        """
        self.client = client
        self.name = name

    def add_event(self, item, timestamp=None):
        """Add item with timestamp as score.

        When ``timestamp`` is None the current ``time.time()`` is used.
        """
        if timestamp is None:
            timestamp = time.time()
        return self.client.zadd(self.name, {item: timestamp})

    def get_recent_items(self, limit=10):
        """Get most recent items, newest first, as (member, score) pairs.

        A non-positive ``limit`` yields an empty list. Without this guard,
        ``limit=0`` would turn into end index -1, which selects every item.
        """
        if limit <= 0:
            return []
        return self.client.zrevrange(self.name, 0, limit - 1, withscores=True)

    def get_items_since(self, since_timestamp):
        """Get items added since a specific timestamp (inclusive)."""
        # '+inf' leaves the upper bound open.
        return self.get_items_in_time_range(since_timestamp, '+inf')

    def get_items_in_time_range(self, start_time, end_time):
        """Get items in a specific time range"""
        return self.client.zrangebyscore(
            self.name,
            start_time,
            end_time,
            withscores=True,
        )

    def cleanup_old_items(self, before_timestamp):
        """Remove items with a timestamp up to ``before_timestamp``.

        NOTE(review): the bound is passed as a plain value, so items scored
        exactly ``before_timestamp`` are presumably removed as well -
        confirm this inclusive behaviour is intended.
        """
        return self.client.zremrangebyscore(self.name, '-inf', before_timestamp)
# Usage
client = fakeredis.FakeRedis()
activity = TimeBasedRanking(client, 'user_activity')

# Record activities with timestamps, expressed as "seconds before now"
current_time = time.time()
events = (
    ('login', 3600),      # 1 hour ago
    ('page_view', 1800),  # 30 min ago
    ('purchase', 600),    # 10 min ago
    ('logout', 0),        # Now
)
for label, seconds_ago in events:
    activity.add_event(label, current_time - seconds_ago)

# Get recent activities
recent = activity.get_recent_items(5)
print("Recent activities:")
for event, timestamp in recent:
    print(f" {event.decode()}: {timestamp}")

# Get activities in last 45 minutes
since_45_min = activity.get_items_since(current_time - 2700)
print("Activities in last 45 minutes:")
for event, timestamp in since_45_min:
    print(f" {event.decode()}: {timestamp}")


import fakeredis
class PriorityQueue:
    """Priority queue kept in a sorted set; a higher score is served first."""

    def __init__(self, client, name):
        """Remember the client and the sorted-set key.

        Args:
            client: Redis-compatible client (e.g. ``fakeredis.FakeRedis()``).
            name: Key of the sorted set that stores the queue.
        """
        self.client = client
        self.name = name

    def enqueue(self, item, priority):
        """Add item with priority (higher number = higher priority)"""
        return self.client.zadd(self.name, {item: priority})

    def dequeue(self):
        """Remove and return highest priority item.

        Returns:
            ``(item, priority)`` with the item decoded to ``str``, or
            ``(None, None)`` when nothing was popped.
        """
        result = self.client.zpopmax(self.name, 1)
        if result:
            return result[0][0].decode(), result[0][1]  # item, priority
        return None, None

    def dequeue_multiple(self, count):
        """Remove and return multiple highest priority items"""
        result = self.client.zpopmax(self.name, count)
        return [(item.decode(), priority) for item, priority in result]

    def peek(self):
        """Look at highest priority item without removing.

        Returns ``(item, priority)`` or ``(None, None)`` when nothing is
        returned for the first position.
        """
        result = self.client.zrevrange(self.name, 0, 0, withscores=True)
        if result:
            return result[0][0].decode(), result[0][1]
        return None, None

    def update_priority(self, item, new_priority):
        """Update the priority of an item that is already queued.

        ``xx=True`` restricts the write to existing members, so updating an
        item that has already been dequeued does not put it back in the
        queue.
        """
        return self.client.zadd(self.name, {item: new_priority}, xx=True)

    def size(self):
        """Get queue size"""
        return self.client.zcard(self.name)

    def is_empty(self):
        """Check if queue is empty"""
        return self.size() == 0
# Usage
client = fakeredis.FakeRedis()
pq = PriorityQueue(client, 'task_queue')

# Add tasks with priorities
tasks = (
    ('send_email', 1),
    ('backup_database', 5),
    ('process_payment', 10),
    ('generate_report', 3),
)
for job, weight in tasks:
    pq.enqueue(job, weight)

# Process tasks by priority until the queue reports empty
while True:
    if pq.is_empty():
        break
    task, priority = pq.dequeue()
    print(f"Processing: {task} (priority: {priority})")


import fakeredis
client = fakeredis.FakeRedis()

# Setup multiple score sets, one sorted set per subject
subject_scores = (
    ('math_scores', {'alice': 95, 'bob': 87, 'charlie': 92}),
    ('english_scores', {'alice': 88, 'bob': 91, 'diana': 89}),
    ('science_scores', {'alice': 93, 'charlie': 85, 'diana': 94}),
)
for subject, marks in subject_scores:
    client.zadd(subject, marks)

# Increment scores: Bob improves by 5 points
client.zincrby('math_scores', 5, 'bob')

# Conditional adds (nx = only if doesn't exist, xx = only if exists)
client.zadd('math_scores', {'eve': 90}, nx=True)  # Add new student
client.zadd('math_scores', {'alice': 100}, xx=True)  # Update existing student

# Union of scores (sum by default)
client.zunionstore(
    'total_scores',
    ['math_scores', 'english_scores', 'science_scores'],
)

# Union with weights
subject_weights = {
    'math_scores': 0.4,
    'english_scores': 0.3,
    'science_scores': 0.3,
}
client.zunionstore('weighted_scores', subject_weights)

# Intersection (students who have all three scores)
client.zinterstore(
    'complete_scores',
    ['math_scores', 'english_scores', 'science_scores'],
)

# Get final rankings
total_rankings = client.zrevrange('weighted_scores', 0, -1, withscores=True)
print("Final weighted rankings:")
for student, score in total_rankings:
    print(f" {student.decode()}: {score:.1f}")


import fakeredis
client = fakeredis.FakeRedis()

# Setup a sorted set of items with popularity scores
popularity = {
    'item_a': 100,
    'item_b': 85,
    'item_c': 70,
    'item_d': 95,
    'item_e': 60,
}
client.zadd('popular_items', popularity)

# Get random item
random_item = client.zrandmember('popular_items')
print(f"Random item: {random_item.decode()}")

# Get multiple random items with scores
random_items = client.zrandmember('popular_items', 3, withscores=True)
print("Random items with scores:")
for entry in random_items:
    item, score = entry
    print(f" {item.decode()}: {score}")
# Weighted random selection (probability proportional to score)
def weighted_random_selection(client, key, count=1):
    """Pick up to ``count`` distinct members, favouring higher scores.

    Each pick draws one member from those not yet chosen, with probability
    proportional to its score.

    Args:
        client: Redis-compatible client providing ``zrange`` and
            ``zrandmember``.
        key: Key of the sorted set to sample from.
        count: Maximum number of members to return.

    Returns:
        A list of ``(member, score)`` pairs as produced by ``zrange`` with
        ``withscores=True``. If any score cannot serve as a weight
        (negative, infinite or NaN), the result of
        ``client.zrandmember(key, count, withscores=True)`` is returned
        instead.
    """
    import random

    pool = list(client.zrange(key, 0, -1, withscores=True))

    # Only finite, non-negative scores are usable as weights; otherwise
    # defer to the uniform selection done by the client.
    if any(not (0 <= score < float("inf")) for _, score in pool):
        return client.zrandmember(key, count, withscores=True)

    picked = []
    while pool and len(picked) < count:
        weights = [score for _, score in pool]
        total_score = sum(weights)
        if 0 < total_score < float("inf"):
            index = random.choices(range(len(pool)), weights=weights, k=1)[0]
        else:
            # All remaining weights are zero (or the sum overflowed):
            # pick uniformly among what is left.
            index = random.randrange(len(pool))
        # Removing the winner keeps the returned members distinct.
        picked.append(pool.pop(index))
    return picked
weighted_selection = weighted_random_selection(client, 'popular_items', 2)
print("Weighted random selection:")
for entry in weighted_selection:
    item, score = entry
    print(f" {item.decode()}: {score}")


import fakeredis
client = fakeredis.FakeRedis()

# Create a large sorted set with a single ZADD call rather than issuing
# 1000 separate single-member commands
client.zadd('large_sorted_set', {f'member_{i}': i for i in range(1000)})

# Scan through all members, following the cursor until it returns to 0
cursor = 0
all_members = []
while True:
    cursor, members = client.zscan('large_sorted_set', cursor=cursor, count=100)
    all_members.extend(members)
    if cursor == 0:
        break
print(f"Total members scanned: {len(all_members)}")

# Scan with pattern matching
matching_members = [
    (member.decode(), score)
    for member, score in client.zscan_iter('large_sorted_set', match='member_1*')
]
print(f"Members matching 'member_1*': {len(matching_members)}")

# Use the iterator for efficient processing
high_scorers = [
    (member.decode(), score)
    for member, score in client.zscan_iter('large_sorted_set')
    if score > 900
]
print(f"High scorers (>900): {len(high_scorers)}")

Install with Tessl CLI
npx tessl i tessl/pypi-fakeredis

docs