CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-fakeredis

Python implementation of redis API, can be used for testing purposes

Pending
Overview
Eval results
Files

list-operations.mddocs/

List Operations

Redis list data type operations providing ordered collections of strings. Lists are implemented as linked lists, making them excellent for use cases requiring fast insertion and removal at the head and tail, such as queues, stacks, and activity feeds. Lists can hold up to 2^32 - 1 elements.

Capabilities

Basic List Operations

Core list manipulation functions for adding, removing, and accessing list elements.

def lpush(self, name: KeyT, *values: EncodableT) -> ResponseT: ...

def rpush(self, name: KeyT, *values: EncodableT) -> ResponseT: ...

def lpushx(self, name: KeyT, *values: EncodableT) -> ResponseT: ...

def rpushx(self, name: KeyT, *values: EncodableT) -> ResponseT: ...

def lpop(self, name: KeyT, count: Optional[int] = None) -> Union[Optional[bytes], List[Optional[bytes]]]: ...

def rpop(self, name: KeyT, count: Optional[int] = None) -> Union[Optional[bytes], List[Optional[bytes]]]: ...

def llen(self, name: KeyT) -> ResponseT: ...

def lindex(self, name: KeyT, index: int) -> Optional[bytes]: ...

def lset(self, name: KeyT, index: int, value: EncodableT) -> ResponseT: ...

List Range Operations

Functions for working with list ranges including retrieval, trimming, and range-based modifications.

def lrange(self, name: KeyT, start: int, end: int) -> List[bytes]: ...

def ltrim(self, name: KeyT, start: int, end: int) -> ResponseT: ...

def lrem(self, name: KeyT, count: int, value: EncodableT) -> ResponseT: ...

def linsert(
    self,
    name: KeyT,
    where: Literal["BEFORE", "AFTER"],
    refvalue: EncodableT,
    value: EncodableT
) -> ResponseT: ...

def lpos(
    self,
    name: KeyT,
    value: EncodableT,
    rank: Optional[int] = None,
    count: Optional[int] = None,
    maxlen: Optional[int] = None
) -> Union[Optional[int], List[Optional[int]]]: ...

Blocking Operations

Blocking list operations that wait for elements to become available, useful for implementing queues and producer-consumer patterns.

def blpop(
    self,
    keys: Union[KeyT, Iterable[KeyT]],
    timeout: float = 0
) -> Optional[Tuple[bytes, bytes]]: ...

def brpop(
    self,
    keys: Union[KeyT, Iterable[KeyT]],
    timeout: float = 0
) -> Optional[Tuple[bytes, bytes]]: ...

def brpoplpush(
    self,
    src: KeyT,
    dst: KeyT,
    timeout: float = 0
) -> Optional[bytes]: ...

def blmove(
    self,
    first_list: KeyT,
    second_list: KeyT,
    src: Literal["LEFT", "RIGHT"],
    dest: Literal["LEFT", "RIGHT"],
    timeout: float = 0
) -> Optional[bytes]: ...

Atomic Move Operations

Functions for atomically moving elements between lists.

def rpoplpush(self, src: KeyT, dst: KeyT) -> Optional[bytes]: ...

def lmove(
    self,
    first_list: KeyT,
    second_list: KeyT,
    src: Literal["LEFT", "RIGHT"],
    dest: Literal["LEFT", "RIGHT"]
) -> Optional[bytes]: ...

Usage Examples

Basic List Operations

import fakeredis

client = fakeredis.FakeRedis()

# Create a list by pushing elements
client.lpush('tasks', 'task1', 'task2', 'task3')  # Left push (prepend)
client.rpush('tasks', 'task4', 'task5')            # Right push (append)

# Get list length
length = client.llen('tasks')
print(f"List length: {length}")  # 5

# Get entire list
all_tasks = client.lrange('tasks', 0, -1)
for task in all_tasks:
    print(task.decode())

# Access specific elements
first_task = client.lindex('tasks', 0)
last_task = client.lindex('tasks', -1)
print(f"First: {first_task.decode()}, Last: {last_task.decode()}")

# Pop elements
left_popped = client.lpop('tasks')   # Remove from left (head)
right_popped = client.rpop('tasks')  # Remove from right (tail)
print(f"Popped: {left_popped.decode()}, {right_popped.decode()}")

Queue Implementation (FIFO)

import fakeredis

class Queue:
    def __init__(self, client, name):
        self.client = client
        self.name = name
    
    def enqueue(self, *items):
        """Add items to the end of the queue"""
        return self.client.rpush(self.name, *items)
    
    def dequeue(self):
        """Remove and return item from the front of the queue"""
        item = self.client.lpop(self.name)
        return item.decode() if item else None
    
    def dequeue_blocking(self, timeout=0):
        """Block until an item is available to dequeue"""
        result = self.client.blpop(self.name, timeout=timeout)
        return result[1].decode() if result else None
    
    def size(self):
        return self.client.llen(self.name)
    
    def peek(self, count=1):
        """Look at items without removing them"""
        items = self.client.lrange(self.name, 0, count - 1)
        return [item.decode() for item in items]

# Usage
client = fakeredis.FakeRedis()
queue = Queue(client, 'job_queue')

# Add jobs
queue.enqueue('process_image_1', 'send_email_2', 'backup_database')

# Process jobs
while queue.size() > 0:
    job = queue.dequeue()
    print(f"Processing: {job}")

Stack Implementation (LIFO)

import fakeredis

class Stack:
    def __init__(self, client, name):
        self.client = client
        self.name = name
    
    def push(self, *items):
        """Add items to the top of the stack"""
        return self.client.lpush(self.name, *items)
    
    def pop(self):
        """Remove and return item from the top of the stack"""
        item = self.client.lpop(self.name)
        return item.decode() if item else None
    
    def peek(self):
        """Look at the top item without removing it"""
        item = self.client.lindex(self.name, 0)
        return item.decode() if item else None
    
    def size(self):
        return self.client.llen(self.name)
    
    def is_empty(self):
        return self.size() == 0

# Usage
client = fakeredis.FakeRedis()
stack = Stack(client, 'call_stack')

# Add function calls
stack.push('main()', 'process_data()', 'validate_input()')

# Process stack (LIFO)
while not stack.is_empty():
    func = stack.pop()
    print(f"Returning from: {func}")

List Manipulation

import fakeredis

client = fakeredis.FakeRedis()

# Initialize a list
client.rpush('numbers', '1', '2', '3', '2', '4', '2', '5')

# Find positions of elements
positions = client.lpos('numbers', '2', count=0)  # Find all occurrences
print(f"Positions of '2': {positions}")

# Insert element before/after another element
client.linsert('numbers', 'BEFORE', '3', '2.5')
client.linsert('numbers', 'AFTER', '4', '4.5')

# Remove specific elements
removed_count = client.lrem('numbers', 2, '2')  # Remove first 2 occurrences of '2'
print(f"Removed {removed_count} occurrences of '2'")

# Update element at specific index
client.lset('numbers', 0, '0')  # Set first element to '0'

# Trim list to keep only a range
client.ltrim('numbers', 1, -2)  # Keep elements from index 1 to second-to-last

# View final result
result = client.lrange('numbers', 0, -1)
print("Final list:", [item.decode() for item in result])

Multiple List Operations

import fakeredis

client = fakeredis.FakeRedis()

# Pop from multiple lists (first available)
client.rpush('list1', 'a', 'b', 'c')
client.rpush('list2', 'x', 'y', 'z')

result = client.blpop(['list1', 'list2'], timeout=1)
if result:
    list_name, value = result
    print(f"Popped '{value.decode()}' from {list_name.decode()}")

# Move elements between lists atomically
client.rpush('source', 'item1', 'item2', 'item3')
client.lpush('destination', 'existing')

# Move from right of source to left of destination
moved_item = client.rpoplpush('source', 'destination')
print(f"Moved item: {moved_item.decode()}")

# More flexible move operation
moved_item = client.lmove('source', 'destination', 'LEFT', 'RIGHT')
print(f"Moved item: {moved_item.decode()}")

Conditional Push Operations

import fakeredis

client = fakeredis.FakeRedis()

# Push only if list exists
result = client.lpushx('nonexistent', 'value')
print(f"Push to nonexistent list: {result}")  # 0 (failed)

# Create the list first
client.lpush('existing', 'initial')

# Now pushx will work
result = client.lpushx('existing', 'new_item')
print(f"Push to existing list: {result}")  # 2 (new length)

result = client.rpushx('existing', 'another_item')
print(f"Right push to existing list: {result}")  # 3 (new length)

Producer-Consumer Pattern

import fakeredis
import threading
import time
import random

def producer(client, queue_name, producer_id):
    """Produce items and add them to the queue"""
    for i in range(5):
        item = f"item_{producer_id}_{i}"
        client.rpush(queue_name, item)
        print(f"Producer {producer_id} added: {item}")
        time.sleep(random.uniform(0.1, 0.5))

def consumer(client, queue_name, consumer_id):
    """Consume items from the queue"""
    while True:
        # Block for up to 5 seconds waiting for an item
        result = client.blpop(queue_name, timeout=5)
        if result:
            _, item = result
            print(f"Consumer {consumer_id} processed: {item.decode()}")
            time.sleep(random.uniform(0.2, 0.8))  # Simulate processing
        else:
            print(f"Consumer {consumer_id} timed out, stopping")
            break

# Usage
client = fakeredis.FakeRedis()
queue_name = 'work_queue'

# Start producers and consumers
threads = []

# Start 2 producers
for i in range(2):
    t = threading.Thread(target=producer, args=(client, queue_name, i))
    threads.append(t)
    t.start()

# Start 3 consumers
for i in range(3):
    t = threading.Thread(target=consumer, args=(client, queue_name, i))
    threads.append(t)
    t.start()

# Wait for all threads to complete
for t in threads:
    t.join()

print("All producers and consumers finished")

Activity Feed Implementation

import fakeredis
import time
import json

class ActivityFeed:
    def __init__(self, client, user_id, max_items=100):
        self.client = client
        self.key = f'feed:{user_id}'
        self.max_items = max_items
    
    def add_activity(self, activity_type, data):
        """Add a new activity to the feed"""
        activity = {
            'type': activity_type,
            'data': data,
            'timestamp': int(time.time())
        }
        
        # Add to the left (most recent first)
        self.client.lpush(self.key, json.dumps(activity))
        
        # Keep only the most recent max_items
        self.client.ltrim(self.key, 0, self.max_items - 1)
    
    def get_activities(self, limit=10):
        """Get the most recent activities"""
        activities = self.client.lrange(self.key, 0, limit - 1)
        return [json.loads(activity.decode()) for activity in activities]
    
    def get_activity_count(self):
        """Get total number of activities in feed"""
        return self.client.llen(self.key)

# Usage
client = fakeredis.FakeRedis()
feed = ActivityFeed(client, 'user123')

# Add various activities
feed.add_activity('login', {'ip': '192.168.1.1'})
feed.add_activity('purchase', {'item': 'laptop', 'amount': 999.99})
feed.add_activity('comment', {'post_id': 456, 'text': 'Great post!'})
feed.add_activity('like', {'post_id': 789})

# Retrieve recent activities
recent = feed.get_activities(limit=5)
for activity in recent:
    print(f"{activity['type']}: {activity['data']} at {activity['timestamp']}")

Batch Processing with Pop Multiple

import fakeredis

client = fakeredis.FakeRedis()

# Add batch of items to process
items = [f'task_{i}' for i in range(10)]
client.rpush('batch_queue', *items)

# Process items in batches
batch_size = 3
while client.llen('batch_queue') > 0:
    # Pop multiple items at once (Redis 6.2+)
    batch = client.lpop('batch_queue', count=batch_size)
    
    if isinstance(batch, list):
        # Multiple items returned
        print(f"Processing batch: {[item.decode() for item in batch if item]}")
    else:
        # Single item or None
        if batch:
            print(f"Processing single item: {batch.decode()}")
        break

Install with Tessl CLI

npx tessl i tessl/pypi-fakeredis

docs

bitmap-operations.md

core-clients.md

generic-operations.md

geospatial-operations.md

hash-operations.md

index.md

list-operations.md

lua-scripting.md

pubsub-operations.md

server-management.md

server-operations.md

set-operations.md

sorted-set-operations.md

stack-extensions.md

stream-operations.md

string-operations.md

transaction-operations.md

valkey-support.md

tile.json