A Python implementation of the Redis API that can be used for testing purposes.
—
Redis list data type operations providing ordered collections of strings. Lists are implemented as linked lists, making them excellent for use cases requiring fast insertion and removal at the head and tail, such as queues, stacks, and activity feeds. Lists can hold up to 2^32 - 1 elements.
Core list manipulation functions for adding, removing, and accessing list elements.
# Push one or more values onto the left (head) of the list at `name`; the usage examples describe this as a prepend.
def lpush(self, name: KeyT, *values: EncodableT) -> ResponseT: ...
# Push one or more values onto the right (tail) of the list at `name`; the usage examples describe this as an append.
def rpush(self, name: KeyT, *values: EncodableT) -> ResponseT: ...
# Left push that only takes effect when the list already exists; the usage example shows 0 for a missing key and the new length otherwise.
def lpushx(self, name: KeyT, *values: EncodableT) -> ResponseT: ...
# Right-push counterpart of lpushx; the usage example shows it returning the new length for an existing list.
def rpushx(self, name: KeyT, *values: EncodableT) -> ResponseT: ...
# Remove and return the element at the left (head); when `count` is given the examples expect a list of popped elements (noted there as Redis 6.2+).
def lpop(self, name: KeyT, count: Optional[int] = None) -> Union[Optional[bytes], List[Optional[bytes]]]: ...
# Remove and return the element at the right (tail); `count` presumably behaves as for lpop -- TODO confirm.
def rpop(self, name: KeyT, count: Optional[int] = None) -> Union[Optional[bytes], List[Optional[bytes]]]: ...
# Return the length of the list at `name`.
def llen(self, name: KeyT) -> ResponseT: ...
# Return the element at `index`; the usage examples use 0 for the first element and -1 for the last.
def lindex(self, name: KeyT, index: int) -> Optional[bytes]: ...
def lset(self, name: KeyT, index: int, value: EncodableT) -> ResponseT: ...

Functions for working with list ranges including retrieval, trimming, and range-based modifications.
# Return the elements from `start` to `end`; the examples use (0, -1) for the entire list and (0, count - 1) for the first `count` elements.
def lrange(self, name: KeyT, start: int, end: int) -> List[bytes]: ...
# Trim the list so that only the elements from `start` to `end` are kept.
def ltrim(self, name: KeyT, start: int, end: int) -> ResponseT: ...
# Remove occurrences of `value`; in the usage example a positive `count` removes the first `count` occurrences and the result is used as the number removed.
def lrem(self, name: KeyT, count: int, value: EncodableT) -> ResponseT: ...
# Insert `value` immediately BEFORE or AFTER the existing element equal to `refvalue`.
def linsert(
self,
name: KeyT,
where: Literal["BEFORE", "AFTER"],
refvalue: EncodableT,
value: EncodableT
) -> ResponseT: ...
def lpos(
self,
name: KeyT,
value: EncodableT,
rank: Optional[int] = None,
count: Optional[int] = None,
maxlen: Optional[int] = None
) -> Union[Optional[int], List[Optional[int]]]: ...

Blocking list operations that wait for elements to become available, useful for implementing queues and producer-consumer patterns.
# Blocking left pop: waits up to `timeout` seconds for an element on any of `keys` (first available list wins);
# the examples unpack the result as (list_name, value) and treat a falsy result as a timeout.
def blpop(
self,
keys: Union[KeyT, Iterable[KeyT]],
timeout: float = 0
) -> Optional[Tuple[bytes, bytes]]: ...
# Blocking right pop; presumably the tail-side counterpart of blpop -- not exercised by the examples in this document.
def brpop(
self,
keys: Union[KeyT, Iterable[KeyT]],
timeout: float = 0
) -> Optional[Tuple[bytes, bytes]]: ...
# NOTE(review): presumably the blocking form of rpoplpush (pop from the right of `src`, push onto the left of `dst`) -- confirm against the Redis docs.
def brpoplpush(
self,
src: KeyT,
dst: KeyT,
timeout: float = 0
) -> Optional[bytes]: ...
def blmove(
self,
first_list: KeyT,
second_list: KeyT,
src: Literal["LEFT", "RIGHT"],
dest: Literal["LEFT", "RIGHT"],
timeout: float = 0
) -> Optional[bytes]: ...

Functions for atomically moving elements between lists.
# Move one element from the right (tail) of `src` to the left (head) of `dst`; the usage example uses the result as the moved item.
def rpoplpush(self, src: KeyT, dst: KeyT) -> Optional[bytes]: ...
def lmove(
self,
first_list: KeyT,
second_list: KeyT,
src: Literal["LEFT", "RIGHT"],
dest: Literal["LEFT", "RIGHT"]
) -> Optional[bytes]: ...

import fakeredis
client = fakeredis.FakeRedis()

# Create a list by pushing elements
client.lpush('tasks', 'task1', 'task2', 'task3')  # Left push (prepend)
client.rpush('tasks', 'task4', 'task5')  # Right push (append)
# Each lpush value is prepended in turn, so the list is now:
# task3, task2, task1, task4, task5

# Get list length
length = client.llen('tasks')
print(f"List length: {length}")  # 5

# Get entire list
all_tasks = client.lrange('tasks', 0, -1)
for task in all_tasks:
    print(task.decode())

# Access specific elements
first_task = client.lindex('tasks', 0)
last_task = client.lindex('tasks', -1)
print(f"First: {first_task.decode()}, Last: {last_task.decode()}")

# Pop elements
left_popped = client.lpop('tasks')  # Remove from left (head)
right_popped = client.rpop('tasks')  # Remove from right (tail)
print(f"Popped: {left_popped.decode()}, {right_popped.decode()}")

import fakeredis
class Queue:
    """FIFO queue backed by a Redis list.

    Items are appended at the tail (RPUSH) and removed from the head
    (LPOP / BLPOP). The methods that return items call ``.decode()`` on
    them, so the client is assumed to return ``bytes`` values.
    """

    def __init__(self, client, name):
        """Bind the queue to a Redis-compatible ``client`` and the list key ``name``."""
        self.client = client
        self.name = name

    def enqueue(self, *items):
        """Add items to the end of the queue and return the client's RPUSH result."""
        return self.client.rpush(self.name, *items)

    def dequeue(self):
        """Remove and return the item at the front of the queue, or None if it is empty."""
        item = self.client.lpop(self.name)
        # Compare against None explicitly: an empty value (b'') is a real
        # queued item and must not be reported as "queue empty".
        return item.decode() if item is not None else None

    def dequeue_blocking(self, timeout=0):
        """Block until an item is available, then remove and return it.

        ``timeout`` is passed straight to BLPOP (in Redis, 0 means wait
        indefinitely). Returns None when the wait times out.
        """
        result = self.client.blpop(self.name, timeout=timeout)
        # result is a (list_name, value) pair, or None on timeout.
        return result[1].decode() if result else None

    def size(self):
        """Return the number of items currently in the queue."""
        return self.client.llen(self.name)

    def peek(self, count=1):
        """Return up to ``count`` items from the front without removing them."""
        if count <= 0:
            # lrange(name, 0, -1) means "the whole list", so a non-positive
            # count must be handled here rather than passed through.
            return []
        items = self.client.lrange(self.name, 0, count - 1)
        return [item.decode() for item in items]
# Usage
client = fakeredis.FakeRedis()
queue = Queue(client, 'job_queue')

# Add jobs
queue.enqueue('process_image_1', 'send_email_2', 'backup_database')

# Process jobs (FIFO: jobs come out in the order they were enqueued)
while queue.size() > 0:
    job = queue.dequeue()
    print(f"Processing: {job}")

import fakeredis
class Stack:
    """LIFO stack backed by a Redis list.

    The head of the list is the top of the stack: items are pushed with
    LPUSH and popped with LPOP. The methods that return items call
    ``.decode()`` on them, so the client is assumed to return ``bytes``.
    """

    def __init__(self, client, name):
        """Bind the stack to a Redis-compatible ``client`` and the list key ``name``."""
        self.client = client
        self.name = name

    def push(self, *items):
        """Add items to the top of the stack; the last item given ends up on top."""
        return self.client.lpush(self.name, *items)

    def pop(self):
        """Remove and return the item on top of the stack, or None if it is empty."""
        item = self.client.lpop(self.name)
        # Explicit None check so an empty value (b'') is still returned.
        return item.decode() if item is not None else None

    def peek(self):
        """Return the item on top of the stack without removing it, or None if empty."""
        item = self.client.lindex(self.name, 0)
        # Explicit None check so an empty value (b'') is still returned.
        return item.decode() if item is not None else None

    def size(self):
        """Return the number of items on the stack."""
        return self.client.llen(self.name)

    def is_empty(self):
        """Return True when the stack holds no items."""
        return self.size() == 0
# Usage
client = fakeredis.FakeRedis()
stack = Stack(client, 'call_stack')

# Add function calls (the last one pushed, 'validate_input()', is on top)
stack.push('main()', 'process_data()', 'validate_input()')

# Process stack (LIFO)
while not stack.is_empty():
    func = stack.pop()
    print(f"Returning from: {func}")

import fakeredis
client = fakeredis.FakeRedis()

# Initialize a list
client.rpush('numbers', '1', '2', '3', '2', '4', '2', '5')

# Find positions of elements
positions = client.lpos('numbers', '2', count=0)  # Find all occurrences
print(f"Positions of '2': {positions}")  # [1, 3, 5]

# Insert element before/after another element
client.linsert('numbers', 'BEFORE', '3', '2.5')
client.linsert('numbers', 'AFTER', '4', '4.5')

# Remove specific elements
removed_count = client.lrem('numbers', 2, '2')  # Remove first 2 occurrences of '2'
print(f"Removed {removed_count} occurrences of '2'")

# Update element at specific index
client.lset('numbers', 0, '0')  # Set first element to '0'

# Trim list to keep only a range
client.ltrim('numbers', 1, -2)  # Keep elements from index 1 to second-to-last

# View final result
result = client.lrange('numbers', 0, -1)
print("Final list:", [item.decode() for item in result])  # ['2.5', '3', '4', '4.5', '2']

import fakeredis
client = fakeredis.FakeRedis()

# Pop from multiple lists (first available)
client.rpush('list1', 'a', 'b', 'c')
client.rpush('list2', 'x', 'y', 'z')
result = client.blpop(['list1', 'list2'], timeout=1)
if result:
    list_name, value = result
    print(f"Popped '{value.decode()}' from {list_name.decode()}")

# Move elements between lists atomically
client.rpush('source', 'item1', 'item2', 'item3')
client.lpush('destination', 'existing')

# Move from right of source to left of destination
moved_item = client.rpoplpush('source', 'destination')
print(f"Moved item: {moved_item.decode()}")

# More flexible move operation: take from the LEFT of source,
# add to the RIGHT of destination
moved_item = client.lmove('source', 'destination', 'LEFT', 'RIGHT')
print(f"Moved item: {moved_item.decode()}")

import fakeredis
client = fakeredis.FakeRedis()

# Push only if list exists
result = client.lpushx('nonexistent', 'value')
print(f"Push to nonexistent list: {result}")  # 0 (failed)

# Create the list first
client.lpush('existing', 'initial')

# Now pushx will work
result = client.lpushx('existing', 'new_item')
print(f"Push to existing list: {result}")  # 2 (new length)
result = client.rpushx('existing', 'another_item')
print(f"Right push to existing list: {result}")  # 3 (new length)

import fakeredis
import threading
import time
import random
def producer(client, queue_name, producer_id, item_count=5, delay_range=(0.1, 0.5)):
    """Produce items and add them to the queue.

    Args:
        client: Redis-compatible client providing ``rpush``.
        queue_name: Key of the list used as the work queue.
        producer_id: Identifier embedded in each item name and log line.
        item_count: Number of items to produce.
        delay_range: ``(low, high)`` bounds, in seconds, of the random pause
            after each item.
    """
    low, high = delay_range
    for i in range(item_count):
        item = f"item_{producer_id}_{i}"
        client.rpush(queue_name, item)
        print(f"Producer {producer_id} added: {item}")
        # Random pause so several producers interleave their output.
        time.sleep(random.uniform(low, high))
def consumer(client, queue_name, consumer_id, timeout=5, delay_range=(0.2, 0.8)):
    """Consume items from the queue until a blocking pop times out.

    Args:
        client: Redis-compatible client providing ``blpop``.
        queue_name: Key of the list used as the work queue.
        consumer_id: Identifier used in the log lines.
        timeout: Seconds to block waiting for each item. The loop only ends
            on a timeout, and in Redis a timeout of 0 waits indefinitely,
            so pass a positive value.
        delay_range: ``(low, high)`` bounds, in seconds, of the simulated
            processing time per item.
    """
    low, high = delay_range
    while True:
        # Block for up to `timeout` seconds waiting for an item
        result = client.blpop(queue_name, timeout=timeout)
        if not result:
            print(f"Consumer {consumer_id} timed out, stopping")
            break
        _, item = result
        print(f"Consumer {consumer_id} processed: {item.decode()}")
        time.sleep(random.uniform(low, high))  # Simulate processing
# Usage
client = fakeredis.FakeRedis()
queue_name = 'work_queue'

# Start producers and consumers
threads = []

# Start 2 producers
for i in range(2):
    t = threading.Thread(target=producer, args=(client, queue_name, i))
    threads.append(t)
    t.start()

# Start 3 consumers
for i in range(3):
    t = threading.Thread(target=consumer, args=(client, queue_name, i))
    threads.append(t)
    t.start()

# Wait for all threads to complete
for t in threads:
    t.join()
print("All producers and consumers finished")

import fakeredis
import time
import json
class ActivityFeed:
    """Per-user activity feed stored as a capped Redis list, newest entry first.

    Each activity is stored as a JSON document under the key
    ``feed:<user_id>``. Reading decodes the stored values with ``.decode()``,
    so the client is assumed to return ``bytes``.
    """

    def __init__(self, client, user_id, max_items=100):
        """Bind the feed to ``client`` and ``user_id``, keeping at most ``max_items`` entries."""
        self.client = client
        self.key = f'feed:{user_id}'
        self.max_items = max_items

    def add_activity(self, activity_type, data):
        """Add a new activity to the feed, stamped with the current Unix time in seconds."""
        activity = {
            'type': activity_type,
            'data': data,
            'timestamp': int(time.time()),
        }
        # Add to the left (most recent first)
        self.client.lpush(self.key, json.dumps(activity))
        # Keep only the most recent max_items
        self.client.ltrim(self.key, 0, self.max_items - 1)

    def get_activities(self, limit=10):
        """Return up to ``limit`` of the most recent activities as dicts, newest first."""
        if limit <= 0:
            # lrange(key, 0, -1) means "the whole list", so a non-positive
            # limit must not be passed through.
            return []
        activities = self.client.lrange(self.key, 0, limit - 1)
        return [json.loads(activity.decode()) for activity in activities]

    def get_activity_count(self):
        """Get total number of activities in feed."""
        return self.client.llen(self.key)
# Usage
client = fakeredis.FakeRedis()
feed = ActivityFeed(client, 'user123')

# Add various activities
feed.add_activity('login', {'ip': '192.168.1.1'})
feed.add_activity('purchase', {'item': 'laptop', 'amount': 999.99})
feed.add_activity('comment', {'post_id': 456, 'text': 'Great post!'})
feed.add_activity('like', {'post_id': 789})

# Retrieve recent activities (newest first)
recent = feed.get_activities(limit=5)
for activity in recent:
    print(f"{activity['type']}: {activity['data']} at {activity['timestamp']}")

import fakeredis
client = fakeredis.FakeRedis()
# Add batch of items to process
items = [f'task_{i}' for i in range(10)]
client.rpush('batch_queue', *items)
# Process items in batches
batch_size = 3
while client.llen('batch_queue') > 0:
# Pop multiple items at once (Redis 6.2+)
batch = client.lpop('batch_queue', count=batch_size)
if isinstance(batch, list):
# Multiple items returned
print(f"Processing batch: {[item.decode() for item in batch if item]}")
else:
# Single item or None
if batch:
print(f"Processing single item: {batch.decode()}")
breakInstall with Tessl CLI
npx tessl i tessl/pypi-fakeredis
docs