CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-fakeredis

Python implementation of redis API, can be used for testing purposes

Pending
Overview
Eval results
Files

docs/generic-operations.md

Generic Operations

Redis generic key operations providing functionality that works across all data types. These operations include key management, expiration handling, pattern matching, and utility functions for working with Redis keys regardless of their underlying data type.

Capabilities

Key Existence and Management

Core functions for testing key existence, deletion, and basic key information.

# Return how many of the given keys exist (the usage examples get 2 when
# two of the three names passed are present).
def exists(self, *names: KeyT) -> ResponseT: ...

# Delete the given keys. The usage examples show the result is the number of
# keys actually removed, so names that do not exist are not counted.
def delete(self, *names: KeyT) -> ResponseT: ...
# Note: delete() is the method name, but del_() is also available for Python keyword compatibility

# Same signature as delete(); see the note above.
def del_(self, *names: KeyT) -> ResponseT: ...

# NOTE(review): not demonstrated on this page; presumably mirrors the Redis
# UNLINK command (delete with deferred memory reclamation) — confirm.
def unlink(self, *names: KeyT) -> ResponseT: ...

# Return the data type stored at `name`; the usage examples call .decode()
# on the result to get the type name as text.
def type(self, name: KeyT) -> ResponseT: ...

# Update the last access time of the given keys; the usage examples show the
# result counts only the keys that exist.
def touch(self, *names: KeyT) -> ResponseT: ...

# Return one key from the database, or None (the examples guard on a falsy
# result before decoding it).
def randomkey(self) -> Optional[bytes]: ...

# Return the number of keys in the database.
def dbsize(self) -> ResponseT: ...

Key Expiration

Functions for setting, checking, and managing key expiration times.

# Set a time to live on `name`; `time` is expressed in seconds in the usage
# examples (expire(key, 30) -> "Expires in 30 seconds").
def expire(self, name: KeyT, time: ExpiryT) -> ResponseT: ...

# Set an absolute expiration; the usage examples pass a Unix timestamp in
# seconds (int(time.time()) + 60).
def expireat(self, name: KeyT, when: AbsExpiryT) -> ResponseT: ...

# Millisecond-precision variant of expire(): pexpire(key, 5000) expires the
# key in 5 seconds.
def pexpire(self, name: KeyT, time: ExpiryT) -> ResponseT: ...

# NOTE(review): not demonstrated on this page; presumably the
# millisecond-timestamp counterpart of expireat() — confirm.
def pexpireat(self, name: KeyT, when: AbsExpiryT) -> ResponseT: ...

# Return the remaining time to live in seconds; the examples show -1 for a
# key that has no expiration.
def ttl(self, name: KeyT) -> ResponseT: ...

# Return the remaining time to live in milliseconds.
def pttl(self, name: KeyT) -> ResponseT: ...

# Remove the expiration from `name`, making the key persistent.
def persist(self, name: KeyT) -> ResponseT: ...

Key Movement and Renaming

Functions for moving keys between databases and renaming operations.

# Move `name` from the current database to database number `db`.
def move(self, name: KeyT, db: int) -> ResponseT: ...

# Rename `src` to `dst`; per the usage examples an existing destination is
# overwritten.
def rename(self, src: KeyT, dst: KeyT) -> ResponseT: ...

# Rename `src` to `dst` only if `dst` does not already exist; the result
# tells whether the rename happened.
def renamenx(self, src: KeyT, dst: KeyT) -> ResponseT: ...

# Copy the value at `source` to `destination`.
# replace=True is shown in the usage examples for copying onto an existing
# destination.
# NOTE(review): destination_db is not demonstrated here; presumably selects
# the target database — confirm.
def copy(
    self,
    source: KeyT,
    destination: KeyT,
    destination_db: Optional[int] = None,
    replace: bool = False
) -> ResponseT: ...

Pattern Matching and Scanning

Functions for finding keys by patterns and iterating through keyspaces.

# Return every key matching the glob-style `pattern` (default "*" = all keys).
def keys(self, pattern: PatternT = "*") -> List[bytes]: ...

# Perform one step of an incremental scan. The usage examples unpack the
# result as (next_cursor, keys) and stop once next_cursor is 0.
#   cursor: position returned by the previous call (0 to start)
#   match:  optional glob-style pattern filter
#   count:  optional batch-size argument (examples use 100)
#   _type:  optional type filter such as 'string'
def scan(
    self,
    cursor: int = 0,
    match: Optional[PatternT] = None,
    count: Optional[int] = None,
    _type: Optional[str] = None
) -> ResponseT: ...

# Iterator form of scan(): yields keys one at a time so callers do not have
# to track the cursor themselves. Accepts the same match/count/_type filters.
def scan_iter(
    self,
    match: Optional[PatternT] = None,
    count: Optional[int] = None,
    _type: Optional[str] = None
) -> Iterator[bytes]: ...

Sorting and Object Information

Functions for sorting data and inspecting object properties.

# Sort the elements stored at `name`.
#   start, num: limit the result to `num` elements beginning at offset `start`
#   by:         pattern of external keys to sort by (e.g. 'score:*')
#   get:        pattern(s) of external keys whose values are returned instead
#   desc:       sort in descending order
#   alpha:      sort alphabetically instead of numerically
#   store:      key under which the sorted result is stored
#   groups:     NOTE(review): not demonstrated on this page — confirm semantics
# Returns the sorted items; the int alternative in the return type presumably
# applies when `store` is given — TODO confirm.
def sort(
    self,
    name: KeyT,
    start: Optional[int] = None,
    num: Optional[int] = None,
    by: Optional[str] = None,
    get: Optional[Union[KeyT, List[KeyT]]] = None,
    desc: bool = False,
    alpha: bool = False,
    store: Optional[KeyT] = None,
    groups: bool = False
) -> Union[List[bytes], int]: ...

# Inspect internal properties of `key`; `infotype` names the property
# (the usage examples ask for 'encoding').
def object(self, infotype: str, key: KeyT) -> ResponseT: ...

# Return the memory used by `key`, reported in bytes by the usage examples.
# NOTE(review): `samples` is not demonstrated on this page — confirm.
def memory_usage(self, key: KeyT, samples: Optional[int] = None) -> Optional[int]: ...

Serialization

Functions for dumping and restoring key data.

# Return a serialized representation of the value at `name`; the Optional
# return type indicates None is possible (presumably for a missing key).
def dump(self, name: KeyT) -> Optional[bytes]: ...

# Recreate a key from a value previously produced by dump().
#   name:  key to create
#   ttl:   time to live for the restored key; the usage examples pass 0
#          (presumably "no expiration" — TODO confirm)
#   value: serialized payload returned by dump()
# NOTE(review): replace, absttl, idletime and frequency are not demonstrated
# on this page; presumably they map to the Redis RESTORE options — confirm.
def restore(
    self,
    name: KeyT,
    ttl: int,
    value: bytes,
    replace: bool = False,
    absttl: bool = False,
    idletime: Optional[int] = None,
    frequency: Optional[int] = None
) -> ResponseT: ...

Usage Examples

Basic Key Operations

import fakeredis

client = fakeredis.FakeRedis()

# Seed one key of each kind: a string, a hash and a list
client.set('user:1', 'alice')
client.hset('profile:1', 'name', 'Alice')
client.lpush('messages:1', 'hello', 'world')

# exists() with several names reports how many of them are present
present = client.exists('user:1', 'user:2', 'profile:1')
print(f"Existing keys: {present}")  # 2

# With a single name the count doubles as a truth value
if client.exists('user:1'):
    print("User 1 exists")

# Ask Redis which data type each key holds
user_kind, profile_kind, messages_kind = (
    client.type(name).decode()
    for name in ('user:1', 'profile:1', 'messages:1')
)
print(f"Types: {user_kind}, {profile_kind}, {messages_kind}")

# delete() only counts keys that were really there
removed = client.delete('user:1', 'nonexistent_key')
print(f"Deleted keys: {removed}")  # 1

# Number of keys left in the database
key_count = client.dbsize()
print(f"Database size: {key_count} keys")

# Pick an arbitrary key; nothing is printed when the database is empty
picked = client.randomkey()
if picked:
    print(f"Random key: {picked.decode()}")

Key Expiration Management

import fakeredis
import time

client = fakeredis.FakeRedis()

# Give a session key a 30-second lifetime
client.set('session:abc123', 'user_data')
client.expire('session:abc123', 30)  # Expires in 30 seconds

# Remaining lifetime, in whole seconds
seconds_left = client.ttl('session:abc123')
print(f"TTL: {seconds_left} seconds")

# The p* variants work in milliseconds
client.set('temp:data', 'temporary')
client.pexpire('temp:data', 5000)  # Expires in 5000 milliseconds (5 seconds)

millis_left = client.pttl('temp:data')
print(f"TTL: {millis_left} milliseconds")

# Expire at an absolute point in time: one minute from now
deadline = 60 + int(time.time())
client.set('scheduled:task', 'task_data')
client.expireat('scheduled:task', deadline)

# Drop the expiration again so the key lives forever
client.persist('scheduled:task')
ttl_after_persist = client.ttl('scheduled:task')
print(f"TTL after persist: {ttl_after_persist}")  # -1 means no expiration

# touch() refreshes the last access time and counts only existing keys
client.set('key1', 'value1')
client.set('key2', 'value2')
touched_count = client.touch('key1', 'key2', 'nonexistent')
print(f"Touched keys: {touched_count}")  # 2

Pattern Matching and Key Discovery

import fakeredis

client = fakeredis.FakeRedis()

# Setup test data with patterns
test_data = {
    'user:1:name': 'alice',
    'user:1:email': 'alice@example.com',
    'user:2:name': 'bob',
    'user:2:email': 'bob@example.com',
    'session:abc': 'session_data_1',
    'session:xyz': 'session_data_2',
    'config:app:debug': 'true',
    'config:db:host': 'localhost'
}

for key, value in test_data.items():
    client.set(key, value)

# Find all user keys
user_keys = client.keys('user:*')
print("User keys:", [key.decode() for key in user_keys])

# Find user names specifically
user_names = client.keys('user:*:name')
print("User name keys:", [key.decode() for key in user_names])

# Find session keys
session_keys = client.keys('session:*')
print("Session keys:", [key.decode() for key in session_keys])

# Use several * wildcards in one pattern
config_keys = client.keys('config:*:*')
print("Config keys:", [key.decode() for key in config_keys])

# Use a character class: [ad] matches exactly one character, 'a' or 'd',
# so this finds config:app:debug and config:db:host
config_ad_keys = client.keys('config:[ad]*')
print("Config keys starting with a or d:", [key.decode() for key in config_ad_keys])

# Find every key that belongs to user 1
user_1_keys = client.keys('user:1:*')
print("User 1 keys:", [key.decode() for key in user_1_keys])

# Use the single-character wildcard: ? matches exactly one character,
# so this finds user:1:name and user:2:name but would skip user:10:name
single_digit_names = client.keys('user:?:name')
print("Single-digit user name keys:", [key.decode() for key in single_digit_names])

Scanning Large Keyspaces

import fakeredis

client = fakeredis.FakeRedis()

# Populate 1000 ordinary keys plus one "special" key per hundred
for index in range(1000):
    client.set(f'key_{index}', f'value_{index}')
    if index % 100 == 0:
        client.set(f'special_key_{index}', f'special_value_{index}')

# Walk the whole keyspace in batches; a returned cursor of 0 ends the scan
all_keys = []
cursor = 0

while True:
    cursor, batch = client.scan(cursor=cursor, count=100)
    all_keys.extend(key.decode() for key in batch)
    if cursor == 0:
        break

print(f"Total keys found: {len(all_keys)}")

# scan_iter() hides the cursor handling and accepts a match pattern
special_keys = [
    key.decode()
    for key in client.scan_iter(match='special_key_*', count=50)
]

print(f"Special keys found: {len(special_keys)}")

# Restrict the scan to one data type (Redis 6.0+)
string_keys = [
    key.decode()
    for key in client.scan_iter(_type='string', count=100)
]

print(f"String type keys: {len(string_keys)}")

Key Renaming and Movement

import fakeredis

client = fakeredis.FakeRedis()

# Setup test data
client.set('old_key', 'important_data')
client.set('temp_key', 'temporary_data')

# Rename key (overwrites destination if exists)
client.rename('old_key', 'new_key')

# Verify rename
if not client.exists('old_key') and client.exists('new_key'):
    print("Key renamed successfully")

# Conditional rename (only if destination doesn't exist)
# NOTE(review): redis-py converts the RENAMENX and COPY replies to bool, so
# these prints should show False/True rather than 0/1 — verify against the
# installed redis-py version.
result = client.renamenx('temp_key', 'new_key')  # Not renamed because new_key exists
print(f"Conditional rename result: {result}")  # False (destination exists)

result = client.renamenx('temp_key', 'unique_key')  # Will succeed
print(f"Conditional rename result: {result}")  # True (renamed)

# Copy keys (Redis 6.2+)
client.set('source', 'data_to_copy')
copied = client.copy('source', 'destination')
print(f"Copy result: {copied}")  # True if successful

# Copy with replace option (overwrites the existing 'destination')
copied = client.copy('source', 'destination', replace=True)
print(f"Copy with replace: {copied}")

# Move keys between databases (if multiple databases supported)
# Note: FakeRedis typically operates on a single database
# client.select(0)
# client.set('moveable_key', 'data')
# moved = client.move('moveable_key', 1)  # Move to database 1

Key Sorting

import fakeredis

client = fakeredis.FakeRedis()


def _decoded(items):
    """Turn a list of bytes replies into a list of str."""
    return [item.decode() for item in items]


# Build the list that will be sorted
client.lpush('numbers', '30', '10', '50', '20', '40')

# Numeric ascending order is the default
ascending = client.sort('numbers')
print("Sorted numbers:", _decoded(ascending))

# Largest first
descending = client.sort('numbers', desc=True)
print("Descending:", _decoded(descending))

# Lexicographic ordering for non-numeric values
client.lpush('words', 'zebra', 'apple', 'banana', 'cherry')
alphabetical = client.sort('words', alpha=True)
print("Sorted words:", _decoded(alphabetical))

# Only the first three elements of the sorted result
first_three = client.sort('numbers', start=0, num=3)
print("Top 3:", _decoded(first_three))

# Write the sorted result to another key instead of returning it
client.sort('numbers', store='sorted_numbers')
stored = client.lrange('sorted_numbers', 0, -1)
print("Stored result:", _decoded(stored))

Advanced Sorting with External Keys

import fakeredis

client = fakeredis.FakeRedis()

# The list being sorted holds user identifiers
users = ['user:1', 'user:2', 'user:3']
client.lpush('users', *users)

# External keys that supply the sort weight for each user
client.set('score:user:1', '85')
client.set('score:user:2', '92')
client.set('score:user:3', '78')

# External keys that supply the values to fetch for each user
client.set('name:user:1', 'Alice')
client.set('name:user:2', 'Bob')
client.set('name:user:3', 'Charlie')

# Order the users by the value found at score:<user>
by_score = client.sort('users', by='score:*')
print("Users sorted by score:", [entry.decode() for entry in by_score])

# Same ordering, but return name and score for each user as a flat list
flat = client.sort('users', by='score:*', get=['name:*', 'score:*'])
print("Sorted with names and scores:")
# The reply alternates name, score, name, score, ... so walk it in pairs
for raw_name, raw_score in zip(flat[0::2], flat[1::2]):
    name = raw_name.decode()
    score = raw_score.decode()
    print(f"  {name}: {score}")

Object Information and Memory Usage

import fakeredis

client = fakeredis.FakeRedis()

# Create various data structures
client.set('simple_string', 'hello world')
# hset() accepts only one field/value pair positionally; several fields must
# be passed through mapping=
client.hset('user_hash', mapping={'name': 'Alice', 'age': '30', 'email': 'alice@example.com'})
client.lpush('number_list', *[str(i) for i in range(100)])

# Get object information (encoding, idle time, etc.)
try:
    encoding = client.object('encoding', 'simple_string')
except Exception:
    # Catch Exception rather than using a bare except so that
    # KeyboardInterrupt/SystemExit are not swallowed
    print("Object command not fully supported in this Redis version")
else:
    print(f"String encoding: {encoding}")

# Get memory usage (Redis 4.0+)
try:
    string_memory = client.memory_usage('simple_string')
    hash_memory = client.memory_usage('user_hash')
    list_memory = client.memory_usage('number_list')
except Exception:
    print("Memory usage command not available")
else:
    print("Memory usage:")
    print(f"  String: {string_memory} bytes")
    print(f"  Hash: {hash_memory} bytes")
    print(f"  List: {list_memory} bytes")

Serialization and Backup

import fakeredis

client = fakeredis.FakeRedis()

# Create some data
client.set('backup_test', 'important_data')
# hset() accepts only one field/value pair positionally; several fields must
# be passed through mapping=
client.hset('user_profile', mapping={'name': 'Alice', 'age': '30'})
client.lpush('activity_log', 'login', 'view_page', 'logout')

# Dump key data for backup (dump() returns None for a key that does not exist)
string_dump = client.dump('backup_test')
hash_dump = client.dump('user_profile')
list_dump = client.dump('activity_log')

# Guard len() so a missing key reports size 0 instead of raising TypeError
print(
    f"Dump sizes: string={len(string_dump or b'')}, "
    f"hash={len(hash_dump or b'')}, list={len(list_dump or b'')}"
)

# Delete original keys
client.delete('backup_test', 'user_profile', 'activity_log')

# Restore from dumps (ttl argument 0 as in the original example)
if string_dump:
    client.restore('backup_test_restored', 0, string_dump)
if hash_dump:
    client.restore('user_profile_restored', 0, hash_dump)
if list_dump:
    client.restore('activity_log_restored', 0, list_dump)

# Verify restoration
restored_string = client.get('backup_test_restored')
restored_hash = client.hgetall('user_profile_restored')
restored_list = client.lrange('activity_log_restored', 0, -1)

print(f"Restored string: {restored_string.decode()}")
print(f"Restored hash: {restored_hash}")
print(f"Restored list: {[item.decode() for item in restored_list]}")

Pattern: Key Expiration Manager

import fakeredis
import time

class ExpirationManager:
    """Convenience helpers for working with key TTLs on a Redis-style client.

    The client only needs to provide set(), expire(), ttl() and scan_iter().
    Keys may come back from the client as bytes or as str (for clients
    created with decode_responses=True); both are handled.
    """

    def __init__(self, client):
        self.client = client

    @staticmethod
    def _as_text(key):
        """Return key as str, decoding it only when the client returned bytes"""
        return key.decode() if isinstance(key, bytes) else key

    def set_with_expiry(self, key, value, seconds):
        """Set key with expiration

        The value and its TTL are written with one SET ... EX command, so a
        failure between two separate calls can never leave the key behind
        without an expiration. `seconds` must be a positive TTL.
        """
        return self.client.set(key, value, ex=seconds)

    def extend_expiry(self, key, additional_seconds):
        """Extend existing expiration

        Returns the result of expire(), or False when the key has no
        remaining TTL to extend (ttl() not greater than 0).
        """
        current_ttl = self.client.ttl(key)
        if current_ttl > 0:
            return self.client.expire(key, current_ttl + additional_seconds)
        return False

    def get_expiring_keys(self, pattern='*', threshold_seconds=60):
        """Find keys expiring within threshold

        Returns a list of (key, ttl) tuples, key as str, for every key
        matching `pattern` whose TTL is in the range (0, threshold_seconds].
        """
        expiring_keys = []
        for key in self.client.scan_iter(match=pattern):
            # Query with the key exactly as the client returned it; only
            # the reported name is converted to text.
            ttl = self.client.ttl(key)
            if 0 < ttl <= threshold_seconds:
                expiring_keys.append((self._as_text(key), ttl))
        return expiring_keys

    def refresh_if_expiring(self, key, threshold=60, extension=300):
        """Refresh key expiration if it's expiring soon

        When the TTL is in (0, threshold] it is reset to `extension` seconds
        and True is returned; otherwise nothing changes and False is returned.
        """
        ttl = self.client.ttl(key)
        if 0 < ttl <= threshold:
            self.client.expire(key, extension)
            return True
        return False

# Usage
client = fakeredis.FakeRedis()
exp_manager = ExpirationManager(client)

# Register three keys, each with its own lifetime in seconds
for key, value, lifetime in (
    ('session:user1', 'session_data', 30),
    ('cache:page1', 'cached_content', 300),
    ('temp:file1', 'temp_data', 10),
):
    exp_manager.set_with_expiry(key, value, lifetime)

# Collect everything that expires within the next two minutes
expiring = exp_manager.get_expiring_keys(threshold_seconds=120)
print("Keys expiring within 2 minutes:")
for key, ttl in expiring:
    print(f"  {key}: {ttl} seconds")

# Push the session's expiry a further five minutes out
extended = exp_manager.extend_expiry('session:user1', 300)
print(f"Extended session expiry: {extended}")

Pattern: Key Namespace Manager

import fakeredis

class NamespaceManager:
    """Expose a Redis-style client through a fixed key prefix.

    Every key is stored as "<namespace><separator><key>". Lookups by pattern
    escape the prefix so that glob characters inside the namespace name are
    matched literally and can never reach keys of another namespace. Keys
    returned by the client may be bytes or str (decode_responses=True).
    """

    # Characters that carry special meaning in Redis glob-style patterns
    _GLOB_SPECIALS = '*?[]\\'

    def __init__(self, client, namespace):
        self.client = client
        self.namespace = namespace
        self.separator = ':'

    def _key(self, key):
        """Add namespace prefix to key"""
        return f"{self.namespace}{self.separator}{key}"

    def _pattern(self, pattern):
        """Build a match pattern: literal (escaped) prefix + caller's pattern"""
        prefix = f"{self.namespace}{self.separator}"
        escaped = "".join(
            "\\" + char if char in self._GLOB_SPECIALS else char
            for char in prefix
        )
        return f"{escaped}{pattern}"

    def _strip(self, key):
        """Return key as str with the namespace prefix removed"""
        # Decode before slicing: the prefix length is counted in characters,
        # which differs from the byte length for non-ASCII namespaces.
        text = key.decode() if isinstance(key, bytes) else key
        return text[len(self.namespace) + len(self.separator):]

    def set(self, key, value, **kwargs):
        """Set value in namespace"""
        return self.client.set(self._key(key), value, **kwargs)

    def get(self, key):
        """Get value from namespace"""
        return self.client.get(self._key(key))

    def delete(self, *keys):
        """Delete keys from namespace; returns 0 when no keys are given"""
        if not keys:
            # DEL without arguments is rejected by Redis; nothing to delete.
            return 0
        return self.client.delete(*(self._key(key) for key in keys))

    def exists(self, key):
        """Check if key exists in namespace"""
        return self.client.exists(self._key(key))

    def keys(self, pattern='*'):
        """Get all keys in namespace matching pattern, without the prefix"""
        return [self._strip(key) for key in self.client.keys(self._pattern(pattern))]

    def clear_namespace(self):
        """Delete all keys in this namespace; returns the number deleted"""
        keys = self.client.keys(self._pattern('*'))
        if keys:
            return self.client.delete(*keys)
        return 0

    def scan_iter(self, pattern='*'):
        """Iterate over keys in namespace, yielding them without the prefix"""
        for key in self.client.scan_iter(match=self._pattern(pattern)):
            yield self._strip(key)

# Usage
client = fakeredis.FakeRedis()

# One manager per application area, all sharing the same client
user_cache = NamespaceManager(client, 'user_cache')
session_store = NamespaceManager(client, 'sessions')
config_store = NamespaceManager(client, 'config')

# Writes go through the managers, which add the prefix transparently
user_cache.set('user:123', 'user_data')
session_store.set('sess_abc', 'session_data')
config_store.set('debug_mode', 'true')

# Each manager only sees (un-prefixed) keys from its own namespace
print("User cache keys:", user_cache.keys())
print("Session keys:", session_store.keys())
print("Config keys:", config_store.keys())

# The raw client shows the fully prefixed key names
raw_keys = client.keys('*')
print("All keys in Redis:", [raw.decode() for raw in raw_keys])

# Wipe a single namespace without touching the others
cleared = user_cache.clear_namespace()
print(f"Cleared {cleared} keys from user_cache namespace")

Install with Tessl CLI

npx tessl i tessl/pypi-fakeredis

docs

bitmap-operations.md

core-clients.md

generic-operations.md

geospatial-operations.md

hash-operations.md

index.md

list-operations.md

lua-scripting.md

pubsub-operations.md

server-management.md

server-operations.md

set-operations.md

sorted-set-operations.md

stack-extensions.md

stream-operations.md

string-operations.md

transaction-operations.md

valkey-support.md

tile.json