CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-fakeredis

Python implementation of redis API, can be used for testing purposes

Pending
Overview
Eval results
Files

docs/sorted-set-operations.md

Sorted Set Operations

Redis sorted set data type operations providing ordered collections of unique strings with associated scores. Sorted sets combine the uniqueness of sets with the ordering capabilities of lists, making them perfect for leaderboards, priority queues, time-series data, and ranked collections. Each member has an associated score used for ordering.

Capabilities

Basic Sorted Set Operations

Core sorted set manipulation functions for adding, removing, and querying members with scores.

# Add members to the sorted set `name`. The usage examples on this page pass
# a {member: score} mapping; per those examples nx=True adds only members
# that do not exist yet and xx=True only updates members that already exist.
# NOTE(review): ch / incr / gt / lt presumably mirror the Redis ZADD options
# of the same names — confirm against the redis-py documentation.
def zadd(
    self,
    name: KeyT,
    mapping: Optional[Mapping[AnyKeyT, EncodableT]] = None,
    nx: bool = False,
    xx: bool = False,
    ch: bool = False,
    incr: bool = False,
    gt: bool = False,
    lt: bool = False,
    **kwargs: EncodableT
) -> ResponseT: ...

# Remove the given member(s) from the sorted set `name`.
def zrem(self, name: KeyT, *values: EncodableT) -> ResponseT: ...

# Return the number of members stored in the sorted set.
def zcard(self, name: KeyT) -> ResponseT: ...

# Count the members whose score lies in the min..max range.
def zcount(self, name: KeyT, min: ZScoreT, max: ZScoreT) -> ResponseT: ...

# Return the score of `value` as a float.
# NOTE(review): the Optional return presumably means None for a missing
# member — confirm.
def zscore(self, name: KeyT, value: EncodableT) -> Optional[float]: ...

# Return the scores of several members in one call; the example shows None
# in the position of a member that is not in the set.
def zmscore(self, key: KeyT, members: Iterable[EncodableT]) -> List[Optional[float]]: ...

# 0-based rank of `value` in ascending order (lowest score = rank 0).
def zrank(self, name: KeyT, value: EncodableT) -> Optional[int]: ...

# 0-based rank of `value` in descending order (highest score = rank 0).
def zrevrank(self, name: KeyT, value: EncodableT) -> Optional[int]: ...

# Increment the score of `value` by `amount`.
def zincrby(self, name: KeyT, amount: float, value: EncodableT) -> ResponseT: ...

Range Operations

Functions for retrieving members by rank or score ranges with flexible ordering options.

# Return the members between ranks `start` and `end`, lowest score first;
# the examples use (0, -1) to select every member and withscores=True to get
# (member, score) pairs.
# NOTE(review): desc / byscore / bylex / offset / num presumably map onto the
# Redis ZRANGE REV / BYSCORE / BYLEX / LIMIT options — confirm.
def zrange(
    self,
    name: KeyT,
    start: int,
    end: int,
    desc: bool = False,
    withscores: bool = False,
    score_cast_func: Optional[Callable] = float,
    byscore: bool = False,
    bylex: bool = False,
    offset: Optional[int] = None,
    num: Optional[int] = None
) -> Union[List[bytes], List[Tuple[bytes, float]]]: ...

# Rank-range query ordered from the highest score down; the examples use it
# with (0, N - 1) to fetch the top N members.
def zrevrange(
    self,
    name: KeyT,
    start: int,
    end: int,
    withscores: bool = False,
    score_cast_func: Optional[Callable] = float
) -> Union[List[bytes], List[Tuple[bytes, float]]]: ...

# Return the members whose score lies between `min` and `max`; the examples
# pass numbers as well as the string '+inf' as a bound.
# NOTE(review): start / num presumably form a LIMIT offset/count pair —
# confirm.
def zrangebyscore(
    self,
    name: KeyT,
    min: ZScoreT,
    max: ZScoreT,
    start: Optional[int] = None,
    num: Optional[int] = None,
    withscores: bool = False,
    score_cast_func: Optional[Callable] = float
) -> Union[List[bytes], List[Tuple[bytes, float]]]: ...

# Note the argument order: `max` comes before `min` here.
# NOTE(review): presumably the descending counterpart of zrangebyscore —
# confirm.
def zrevrangebyscore(
    self,
    name: KeyT,
    max: ZScoreT,
    min: ZScoreT,
    start: Optional[int] = None,
    num: Optional[int] = None,
    withscores: bool = False,
    score_cast_func: Optional[Callable] = float
) -> Union[List[bytes], List[Tuple[bytes, float]]]: ...

# NOTE(review): presumably a lexicographic range query (Redis ZRANGEBYLEX);
# not exercised by the examples on this page — confirm bound syntax.
def zrangebylex(
    self,
    name: KeyT,
    min: EncodableT,
    max: EncodableT,
    start: Optional[int] = None,
    num: Optional[int] = None
) -> List[bytes]: ...

# Note the argument order: `max` comes before `min` here.
# NOTE(review): presumably the reverse-order counterpart of zrangebylex —
# confirm.
def zrevrangebylex(
    self,
    name: KeyT,
    max: EncodableT,
    min: EncodableT,
    start: Optional[int] = None,
    num: Optional[int] = None
) -> List[bytes]: ...

Range Removal Operations

Functions for removing members by rank or score ranges.

# Remove the members whose rank falls in the start..end range.
# NOTE(review): index semantics presumably match zrange — confirm.
def zremrangebyrank(self, name: KeyT, start: int, end: int) -> ResponseT: ...

# Remove the members whose score falls in the min..max range; the time-based
# example passes the string '-inf' as the lower bound.
def zremrangebyscore(self, name: KeyT, min: ZScoreT, max: ZScoreT) -> ResponseT: ...

# NOTE(review): presumably removes members in a lexicographic range (Redis
# ZREMRANGEBYLEX); not exercised by the examples on this page — confirm.
def zremrangebylex(self, name: KeyT, min: EncodableT, max: EncodableT) -> ResponseT: ...

Pop Operations

Functions for atomically removing and returning members with highest or lowest scores.

# Remove and return the member(s) with the lowest scores.
# NOTE(review): `count` presumably caps how many members are popped — confirm.
def zpopmin(self, name: KeyT, count: Optional[int] = None) -> Union[List[Union[bytes, float]], List[Tuple[bytes, float]]]: ...

# Remove and return the member(s) with the highest scores; the priority-queue
# example indexes the reply as a list of (member, score) pairs.
def zpopmax(self, name: KeyT, count: Optional[int] = None) -> Union[List[Union[bytes, float]], List[Tuple[bytes, float]]]: ...

# NOTE(review): presumably the blocking variant of zpopmin that waits up to
# `timeout` and returns (key, member, score) — confirm, including what
# timeout=0 means.
def bzpopmin(
    self,
    keys: Union[KeyT, Iterable[KeyT]],
    timeout: float = 0
) -> Optional[Tuple[bytes, bytes, float]]: ...

# NOTE(review): presumably the blocking variant of zpopmax — confirm.
def bzpopmax(
    self,
    keys: Union[KeyT, Iterable[KeyT]],
    timeout: float = 0
) -> Optional[Tuple[bytes, bytes, float]]: ...

Set Algebra Operations

Functions for performing operations between multiple sorted sets.

# NOTE(review): presumably returns the intersection of the given sorted sets
# without storing it — not exercised by the examples on this page; confirm.
def zinter(
    self,
    keys: Iterable[KeyT],
    aggregate: Optional[Literal["SUM", "MIN", "MAX"]] = None,
    withscores: bool = False
) -> Union[List[bytes], List[Tuple[bytes, float]]]: ...

# Store in `dest` the members present in every one of the given sorted sets
# (the example uses it to find students that have all three scores).
def zinterstore(
    self,
    dest: KeyT,
    keys: Union[Mapping[AnyKeyT, float], Iterable[KeyT]],
    aggregate: Optional[Literal["SUM", "MIN", "MAX"]] = None
) -> ResponseT: ...

# Store in `dest` the union of the given sorted sets. Per the examples the
# scores are summed by default, and passing a {key: weight} mapping instead
# of a list applies per-set weights.
def zunionstore(
    self,
    dest: KeyT,
    keys: Union[Mapping[AnyKeyT, float], Iterable[KeyT]],
    aggregate: Optional[Literal["SUM", "MIN", "MAX"]] = None
) -> ResponseT: ...

# NOTE(review): presumably returns the members of the first set that are
# absent from the others — not exercised by the examples; confirm.
def zdiff(
    self,
    keys: Iterable[KeyT],
    withscores: bool = False
) -> Union[List[bytes], List[Tuple[bytes, float]]]: ...

# NOTE(review): presumably the storing variant of zdiff — confirm.
def zdiffstore(self, dest: KeyT, keys: Iterable[KeyT]) -> ResponseT: ...

Random Selection and Scanning

Functions for random member selection and iterating through large sorted sets.

# Return one random member, or `count` random members when a count is given.
# NOTE(review): with redis-py's default RESP2 protocol the withscores=True
# reply appears to be a flat [member, score, ...] list rather than a list of
# pairs — confirm before relying on the List[Tuple[...]] annotation.
def zrandmember(
    self,
    key: KeyT,
    count: Optional[int] = None,
    withscores: bool = False
) -> Union[Optional[bytes], List[bytes], List[Tuple[bytes, float]]]: ...

# Incrementally iterate over the sorted set. The scanning example unpacks the
# reply as (next_cursor, members) and stops once the cursor comes back as 0.
def zscan(
    self,
    name: KeyT,
    cursor: int = 0,
    match: Optional[PatternT] = None,
    count: Optional[int] = None,
    score_cast_func: Optional[Callable] = float
) -> ResponseT: ...

# Iterator form of zscan: yields (member, score) pairs, optionally filtered
# by a `match` glob pattern such as 'member_1*'.
def zscan_iter(
    self,
    name: KeyT,
    match: Optional[PatternT] = None,
    count: Optional[int] = None,
    score_cast_func: Optional[Callable] = float
) -> Iterator[Tuple[bytes, float]]: ...

Usage Examples

Basic Sorted Set Operations

import fakeredis

client = fakeredis.FakeRedis()

# Seed the leaderboard; every mapping entry is member -> score
initial_scores = {
    'alice': 100,
    'bob': 85,
    'charlie': 92,
    'david': 78,
}
client.zadd('leaderboard', initial_scores)

# A one-entry mapping adds a single member
client.zadd('leaderboard', {'eve': 95})

# Number of members currently in the sorted set
size = client.zcard('leaderboard')
print(f"Leaderboard size: {size}")  # 5

# Score of one member
alice_score = client.zscore('leaderboard', 'alice')
print(f"Alice's score: {alice_score}")  # 100.0

# Scores of several members at once; unknown members come back as None
wanted_members = ['alice', 'bob', 'unknown']
scores = client.zmscore('leaderboard', wanted_members)
print(f"Scores: {scores}")  # [100.0, 85.0, None]

# Ascending rank: 0-based, the lowest score sits at rank 0
alice_rank = client.zrank('leaderboard', 'alice')
print(f"Alice's rank (asc): {alice_rank}")  # 4 (highest score)

# Descending rank: the highest score sits at rank 0
alice_revrank = client.zrevrank('leaderboard', 'alice')
print(f"Alice's rank (desc): {alice_revrank}")  # 0 (highest score)

Range Queries

import fakeredis

client = fakeredis.FakeRedis()


def _print_scored(heading, rows):
    """Print a heading, then one indented 'member: score' line per row."""
    print(heading)
    for member, points in rows:
        print(f"  {member.decode()}: {points}")


# Populate the sorted set used by every query below
player_scores = {
    'player1': 1500,
    'player2': 1200,
    'player3': 1800,
    'player4': 900,
    'player5': 1650,
}
client.zadd('scores', player_scores)

# Three highest scores, best first
top_players = client.zrevrange('scores', 0, 2, withscores=True)
_print_scored("Top 3 players:", top_players)

# Two lowest scores, worst first
bottom_players = client.zrange('scores', 0, 1, withscores=True)
_print_scored("Bottom 2 players:", bottom_players)

# Everyone scoring between 1000 and 1600
mid_range = client.zrangebyscore('scores', 1000, 1600, withscores=True)
_print_scored("Mid-range players (1000-1600):", mid_range)

# How many players fall into a score range
count = client.zcount('scores', 1200, 1700)
print(f"Players with scores 1200-1700: {count}")

Leaderboard Implementation

import fakeredis

class Leaderboard:
    """Score-ranked leaderboard kept in a single Redis sorted set.

    Members are player identifiers and scores are their points. Ranks exposed
    by this class are 1-based, rank 1 being the highest score.
    """

    def __init__(self, client, name):
        """Bind the leaderboard to a Redis-compatible client and a key name."""
        self.client = client
        self.name = name

    def add_score(self, player, score):
        """Add the player, or overwrite their score; returns the zadd reply."""
        return self.client.zadd(self.name, {player: score})

    def increment_score(self, player, amount):
        """Increase the player's score by ``amount``; returns the zincrby reply."""
        return self.client.zincrby(self.name, amount, player)

    def get_score(self, player):
        """Return the player's current score as reported by zscore."""
        return self.client.zscore(self.name, player)

    def get_rank(self, player):
        """Return the player's 1-based rank (1 = highest score).

        Returns None when zrevrank reports no rank for the player.
        """
        rank = self.client.zrevrank(self.name, player)
        return rank + 1 if rank is not None else None

    def get_top_players(self, limit=10):
        """Return up to ``limit`` (member, score) pairs, best score first.

        A non-positive ``limit`` yields an empty list.
        """
        if limit <= 0:
            # limit - 1 would be a negative end index, which Redis counts from
            # the tail of the set: limit=0 would return the whole leaderboard.
            return []
        return self.client.zrevrange(self.name, 0, limit - 1, withscores=True)

    def get_player_range(self, start_rank, end_rank):
        """Return the (member, score) pairs between two 1-based ranks.

        Both ranks are shifted down by one and handed to zrevrange as 0-based
        indexes. Ranks below 1 therefore become negative indexes, which Redis
        interprets relative to the end of the set.
        """
        return self.client.zrevrange(
            self.name,
            start_rank - 1,
            end_rank - 1,
            withscores=True
        )

    def get_around_player(self, player, context=2):
        """Return the player plus up to ``context`` neighbours on each side.

        Returns an empty list when the player has no rank.
        """
        rank = self.client.zrevrank(self.name, player)
        if rank is None:
            return []

        # Clamp at 0 so a player near the top does not produce a negative
        # start index.
        start = max(0, rank - context)
        end = rank + context
        return self.client.zrevrange(self.name, start, end, withscores=True)

    def remove_player(self, player):
        """Remove the player from the leaderboard; returns the zrem reply."""
        return self.client.zrem(self.name, player)

# Usage
client = fakeredis.FakeRedis()
leaderboard = Leaderboard(client, 'game_scores')

# Starting scores, registered one player at a time
players_scores = {
    'Alice': 2500,
    'Bob': 1800,
    'Charlie': 2200,
    'Diana': 1900,
    'Eve': 2100,
}
for name, points in players_scores.items():
    leaderboard.add_score(name, points)

# Points earned during play
leaderboard.increment_score('Bob', 300)  # Bob scores 300 points
leaderboard.increment_score('Diana', 150)  # Diana scores 150 points

# Podium
print("Top 3 players:")
podium = leaderboard.get_top_players(3)
for position, (member, points) in enumerate(podium, start=1):
    print(f"{position}. {member.decode()}: {points}")

# Where does Alice stand?
alice_rank = leaderboard.get_rank('Alice')
alice_score = leaderboard.get_score('Alice')
print(f"Alice: Rank {alice_rank}, Score {alice_score}")

# Bob and his closest neighbours
around_bob = leaderboard.get_around_player('Bob')
print("Players around Bob:")
for member, points in around_bob:
    print(f"  {member.decode()}: {points}")

Time-based Rankings

import fakeredis
import time

class TimeBasedRanking:
    """Time-ordered collection kept in a Redis sorted set.

    Each item's score is the timestamp it was recorded at, so score order is
    chronological order.
    """

    def __init__(self, client, name):
        """Bind the ranking to a Redis-compatible client and a key name."""
        self.client = client
        self.name = name

    def add_event(self, item, timestamp=None):
        """Record ``item`` at ``timestamp``; defaults to ``time.time()``.

        Returns the zadd reply.
        """
        if timestamp is None:
            timestamp = time.time()
        return self.client.zadd(self.name, {item: timestamp})

    def get_recent_items(self, limit=10):
        """Return up to ``limit`` (item, timestamp) pairs, newest first.

        A non-positive ``limit`` yields an empty list.
        """
        if limit <= 0:
            # limit - 1 would be a negative end index, which Redis counts from
            # the tail of the set: limit=0 would return every item.
            return []
        return self.client.zrevrange(self.name, 0, limit - 1, withscores=True)

    def get_items_since(self, since_timestamp):
        """Return (item, timestamp) pairs scored from ``since_timestamp`` up.

        NOTE(review): Redis score bounds are inclusive by default, so an item
        stamped exactly at ``since_timestamp`` should be included — confirm.
        """
        return self.client.zrangebyscore(
            self.name,
            since_timestamp,
            '+inf',
            withscores=True
        )

    def get_items_in_time_range(self, start_time, end_time):
        """Return (item, timestamp) pairs scored between the two times."""
        return self.client.zrangebyscore(
            self.name,
            start_time,
            end_time,
            withscores=True
        )

    def cleanup_old_items(self, before_timestamp):
        """Remove items scored from '-inf' up to ``before_timestamp``.

        Returns the zremrangebyscore reply.

        NOTE(review): Redis score bounds are inclusive by default, so an item
        stamped exactly at ``before_timestamp`` should be removed as well —
        confirm.
        """
        return self.client.zremrangebyscore(self.name, '-inf', before_timestamp)

# Usage
client = fakeredis.FakeRedis()
activity = TimeBasedRanking(client, 'user_activity')

# Record four activities at fixed offsets (in seconds) before "now"
current_time = time.time()
history = [
    ('login', 3600),      # 1 hour ago
    ('page_view', 1800),  # 30 min ago
    ('purchase', 600),    # 10 min ago
]
for event_name, seconds_ago in history:
    activity.add_event(event_name, current_time - seconds_ago)
activity.add_event('logout', current_time)  # Now

# Newest activities first
recent = activity.get_recent_items(5)
print("Recent activities:")
for entry, stamp in recent:
    print(f"  {entry.decode()}: {stamp}")

# Everything recorded in the last 45 minutes (2700 seconds)
since_45_min = activity.get_items_since(current_time - 2700)
print("Activities in last 45 minutes:")
for entry, stamp in since_45_min:
    print(f"  {entry.decode()}: {stamp}")

Priority Queue Implementation

import fakeredis

class PriorityQueue:
    """Priority queue on top of a Redis sorted set.

    The score of each member is its priority; a larger number is served first.
    """

    def __init__(self, client, name):
        """Remember the Redis-compatible client and the key of the queue."""
        self.client = client
        self.name = name

    def enqueue(self, item, priority):
        """Insert ``item`` with ``priority`` (higher number = served earlier)."""
        entry = {item: priority}
        return self.client.zadd(self.name, entry)

    def dequeue(self):
        """Pop the highest-priority item.

        Returns ``(item, priority)`` with the item decoded to ``str``, or
        ``(None, None)`` when nothing was popped.
        """
        popped = self.client.zpopmax(self.name, 1)
        if not popped:
            return None, None
        head = popped[0]
        return head[0].decode(), head[1]  # item, priority

    def dequeue_multiple(self, count):
        """Pop up to ``count`` items, returned as ``(item, priority)`` pairs."""
        decoded = []
        for member, score in self.client.zpopmax(self.name, count):
            decoded.append((member.decode(), score))
        return decoded

    def peek(self):
        """Return the highest-priority ``(item, priority)`` without removing it.

        Returns ``(None, None)`` when the range query comes back empty.
        """
        top = self.client.zrevrange(self.name, 0, 0, withscores=True)
        if not top:
            return None, None
        head = top[0]
        return head[0].decode(), head[1]

    def update_priority(self, item, new_priority):
        """Overwrite the priority of ``item`` by re-adding it."""
        entry = {item: new_priority}
        return self.client.zadd(self.name, entry)

    def size(self):
        """Return the number of queued items as reported by zcard."""
        return self.client.zcard(self.name)

    def is_empty(self):
        """Return True when the queue holds no items."""
        remaining = self.size()
        return remaining == 0

# Usage
client = fakeredis.FakeRedis()
pq = PriorityQueue(client, 'task_queue')

# Queue four tasks; the number is the priority (higher runs first)
pending_tasks = [
    ('send_email', 1),
    ('backup_database', 5),
    ('process_payment', 10),
    ('generate_report', 3),
]
for task_name, task_priority in pending_tasks:
    pq.enqueue(task_name, task_priority)

# Drain the queue, most urgent task first
while not pq.is_empty():
    task, priority = pq.dequeue()
    print(f"Processing: {task} (priority: {priority})")

Score Manipulation and Set Operations

import fakeredis

client = fakeredis.FakeRedis()

# One sorted set per subject, student -> score
subject_scores = {
    'math_scores': {'alice': 95, 'bob': 87, 'charlie': 92},
    'english_scores': {'alice': 88, 'bob': 91, 'diana': 89},
    'science_scores': {'alice': 93, 'charlie': 85, 'diana': 94},
}
for subject_key, marks in subject_scores.items():
    client.zadd(subject_key, marks)
all_subjects = list(subject_scores)

# Bob improves by 5 points in math
client.zincrby('math_scores', 5, 'bob')

# Conditional adds: nx only inserts new members, xx only touches existing ones
client.zadd('math_scores', {'eve': 90}, nx=True)  # Add new student
client.zadd('math_scores', {'alice': 100}, xx=True)  # Update existing student

# Plain union: scores are summed by default
client.zunionstore('total_scores', all_subjects)

# Weighted union: the mapping assigns a weight to every source set
subject_weights = {'math_scores': 0.4, 'english_scores': 0.3, 'science_scores': 0.3}
client.zunionstore('weighted_scores', subject_weights)

# Intersection: only students that appear in all three subjects
client.zinterstore('complete_scores', all_subjects)

# Final ranking by weighted score, best first
total_rankings = client.zrevrange('weighted_scores', 0, -1, withscores=True)
print("Final weighted rankings:")
for pupil, weighted in total_rankings:
    print(f"  {pupil.decode()}: {weighted:.1f}")

Random Selection from Sorted Set

import fakeredis

client = fakeredis.FakeRedis()

# Setup a sorted set of items with popularity scores
client.zadd('popular_items', {
    'item_a': 100,
    'item_b': 85,
    'item_c': 70,
    'item_d': 95,
    'item_e': 60
})

# Get random item
random_item = client.zrandmember('popular_items')
print(f"Random item: {random_item.decode()}")

# Get multiple random items with scores
random_items = client.zrandmember('popular_items', 3, withscores=True)
# The reply shape depends on the protocol: under RESP2 the client hands back
# a flat [member, score, member, score, ...] list, under RESP3 a list of
# [member, score] pairs. Normalise to pairs so the loop below works for both.
if random_items and not isinstance(random_items[0], (list, tuple)):
    random_items = list(zip(random_items[::2], random_items[1::2]))
print("Random items with scores:")
for item, score in random_items:
    # float() accepts both the raw bytes score (RESP2) and a float (RESP3)
    print(f"  {item.decode()}: {float(score)}")

# Weighted random selection (simulate by score ranges)
def weighted_random_selection(client, key, count=1):
    """Pick up to ``count`` distinct members, favouring higher scores.

    Every member's score is used as its selection weight (negative scores are
    treated as weight 0). Members are drawn one at a time without replacement,
    so the result never contains duplicates and holds at most as many entries
    as the sorted set has members. If every remaining weight is 0 the draw
    falls back to a uniform choice.

    Args:
        client: Redis-compatible client exposing ``zrange``.
        key: Name of the sorted set.
        count: Number of members to draw; ``None`` is treated as 1 and a
            non-positive value yields an empty list.

    Returns:
        A list of ``(member, score)`` pairs in the order they were drawn.
    """
    import random

    if count is None:
        count = 1

    # zrange with withscores=True returns (member, score) pairs
    candidates = list(client.zrange(key, 0, -1, withscores=True))
    picked = []
    while candidates and len(picked) < count:
        weights = [max(score, 0.0) for _, score in candidates]
        if sum(weights) > 0:
            index = random.choices(range(len(candidates)), weights=weights)[0]
        else:
            # random.choices rejects an all-zero weight vector
            index = random.randrange(len(candidates))
        # pop() removes the winner so it cannot be drawn twice
        picked.append(candidates.pop(index))
    return picked

weighted_selection = weighted_random_selection(client, 'popular_items', 2)
print("Weighted random selection:")
for member, weight in weighted_selection:
    member_name = member.decode()
    print(f"  {member_name}: {weight}")

Scanning Large Sorted Sets

import fakeredis

client = fakeredis.FakeRedis()

# Build a sorted set with 1000 members, one zadd call per member
for index in range(1000):
    client.zadd('large_sorted_set', {f'member_{index}': index})

# Walk the whole set page by page with an explicit cursor
cursor, all_members = 0, []
scanning = True
while scanning:
    cursor, page = client.zscan('large_sorted_set', cursor=cursor, count=100)
    all_members.extend(page)
    # A cursor of 0 signals that the scan has completed
    scanning = cursor != 0

print(f"Total members scanned: {len(all_members)}")

# Only members whose name matches the glob pattern
matching_members = [
    (name.decode(), points)
    for name, points in client.zscan_iter('large_sorted_set', match='member_1*')
]

print(f"Members matching 'member_1*': {len(matching_members)}")

# The iterator hides the cursor handling; filter on the score client-side
high_scorers = [
    (name.decode(), points)
    for name, points in client.zscan_iter('large_sorted_set')
    if points > 900
]

print(f"High scorers (>900): {len(high_scorers)}")

Install with Tessl CLI

npx tessl i tessl/pypi-fakeredis

docs

bitmap-operations.md

core-clients.md

generic-operations.md

geospatial-operations.md

hash-operations.md

index.md

list-operations.md

lua-scripting.md

pubsub-operations.md

server-management.md

server-operations.md

set-operations.md

sorted-set-operations.md

stack-extensions.md

stream-operations.md

string-operations.md

transaction-operations.md

valkey-support.md

tile.json