A Python implementation of the Redis API that can be used for testing purposes.
—
Redis generic key operations providing functionality that works across all data types. These operations include key management, expiration handling, pattern matching, and utility functions for working with Redis keys regardless of their underlying data type.
Core functions for testing key existence, deletion, and basic key information.
def exists(self, *names: KeyT) -> ResponseT: ...
def delete(self, *names: KeyT) -> ResponseT: ...
# Note: delete() is the method name, but del_() is also available for Python keyword compatibility
def del_(self, *names: KeyT) -> ResponseT: ...
def unlink(self, *names: KeyT) -> ResponseT: ...
def type(self, name: KeyT) -> ResponseT: ...
def touch(self, *names: KeyT) -> ResponseT: ...
def randomkey(self) -> Optional[bytes]: ...
def dbsize(self) -> ResponseT: ...

Functions for setting, checking, and managing key expiration times.
def expire(self, name: KeyT, time: ExpiryT) -> ResponseT: ...
def expireat(self, name: KeyT, when: AbsExpiryT) -> ResponseT: ...
def pexpire(self, name: KeyT, time: ExpiryT) -> ResponseT: ...
def pexpireat(self, name: KeyT, when: AbsExpiryT) -> ResponseT: ...
def ttl(self, name: KeyT) -> ResponseT: ...
def pttl(self, name: KeyT) -> ResponseT: ...
def persist(self, name: KeyT) -> ResponseT: ...

Functions for moving keys between databases and renaming operations.
def move(self, name: KeyT, db: int) -> ResponseT: ...
def rename(self, src: KeyT, dst: KeyT) -> ResponseT: ...
def renamenx(self, src: KeyT, dst: KeyT) -> ResponseT: ...
def copy(
self,
source: KeyT,
destination: KeyT,
destination_db: Optional[int] = None,
replace: bool = False
) -> ResponseT: ...

Functions for finding keys by patterns and iterating through keyspaces.
def keys(self, pattern: PatternT = "*") -> List[bytes]: ...
def scan(
self,
cursor: int = 0,
match: Optional[PatternT] = None,
count: Optional[int] = None,
_type: Optional[str] = None
) -> ResponseT: ...
def scan_iter(
self,
match: Optional[PatternT] = None,
count: Optional[int] = None,
_type: Optional[str] = None
) -> Iterator[bytes]: ...

Functions for sorting data and inspecting object properties.
def sort(
self,
name: KeyT,
start: Optional[int] = None,
num: Optional[int] = None,
by: Optional[str] = None,
get: Optional[Union[KeyT, List[KeyT]]] = None,
desc: bool = False,
alpha: bool = False,
store: Optional[KeyT] = None,
groups: bool = False
) -> Union[List[bytes], int]: ...
def object(self, infotype: str, key: KeyT) -> ResponseT: ...
def memory_usage(self, key: KeyT, samples: Optional[int] = None) -> Optional[int]: ...

Functions for dumping and restoring key data.
def dump(self, name: KeyT) -> Optional[bytes]: ...
def restore(
self,
name: KeyT,
ttl: int,
value: bytes,
replace: bool = False,
absttl: bool = False,
idletime: Optional[int] = None,
frequency: Optional[int] = None
) -> ResponseT: ...

import fakeredis
client = fakeredis.FakeRedis()

# Seed one key of each basic data type.
client.set('user:1', 'alice')
client.hset('profile:1', 'name', 'Alice')
client.lpush('messages:1', 'hello', 'world')

# EXISTS counts how many of the named keys are present.
exists_count = client.exists('user:1', 'user:2', 'profile:1')
print(f"Existing keys: {exists_count}")  # 2

# A single-key EXISTS result is truthy when the key is present.
if client.exists('user:1'):
    print("User 1 exists")

# TYPE reports the data type stored at each key (as bytes).
user_type = client.type('user:1')
profile_type = client.type('profile:1')
messages_type = client.type('messages:1')
print(f"Types: {user_type.decode()}, {profile_type.decode()}, {messages_type.decode()}")

# DELETE returns the number of keys actually removed.
deleted_count = client.delete('user:1', 'nonexistent_key')
print(f"Deleted keys: {deleted_count}")  # 1

# DBSIZE reports the number of keys in the current database.
size = client.dbsize()
print(f"Database size: {size} keys")

# RANDOMKEY returns an arbitrary key, or None when the DB is empty.
random_key = client.randomkey()
if random_key:
    print(f"Random key: {random_key.decode()}")
import fakeredis
import time

client = fakeredis.FakeRedis()

# EXPIRE attaches a relative time-to-live, in seconds.
client.set('session:abc123', 'user_data')
client.expire('session:abc123', 30)  # expires in 30 seconds

# TTL reads the remaining lifetime in seconds.
ttl_seconds = client.ttl('session:abc123')
print(f"TTL: {ttl_seconds} seconds")

# PEXPIRE / PTTL are the millisecond-precision variants.
client.set('temp:data', 'temporary')
client.pexpire('temp:data', 5000)  # 5000 ms = 5 seconds
ttl_ms = client.pttl('temp:data')
print(f"TTL: {ttl_ms} milliseconds")

# EXPIREAT takes an absolute Unix timestamp instead of an offset.
future_timestamp = int(time.time()) + 60  # one minute from now
client.set('scheduled:task', 'task_data')
client.expireat('scheduled:task', future_timestamp)

# PERSIST strips any expiration, making the key permanent again.
client.persist('scheduled:task')
new_ttl = client.ttl('scheduled:task')
print(f"TTL after persist: {new_ttl}")  # -1 means no expiration

# TOUCH updates last-access time and reports how many keys exist.
client.set('key1', 'value1')
client.set('key2', 'value2')
touched = client.touch('key1', 'key2', 'nonexistent')
print(f"Touched keys: {touched}")  # 2
import fakeredis
client = fakeredis.FakeRedis()

# Populate keys that follow a few different naming conventions.
fixtures = {
    'user:1:name': 'alice',
    'user:1:email': 'alice@example.com',
    'user:2:name': 'bob',
    'user:2:email': 'bob@example.com',
    'session:abc': 'session_data_1',
    'session:xyz': 'session_data_2',
    'config:app:debug': 'true',
    'config:db:host': 'localhost',
}
for name, payload in fixtures.items():
    client.set(name, payload)

# KEYS with a glob pattern returns every matching key at once.
user_keys = client.keys('user:*')
print("User keys:", [key.decode() for key in user_keys])

# Wildcards may appear mid-pattern to pick out one field.
user_names = client.keys('user:*:name')
print("User name keys:", [key.decode() for key in user_names])

session_keys = client.keys('session:*')
print("Session keys:", [key.decode() for key in session_keys])

# Two wildcards match the nested config namespace.
config_keys = client.keys('config:*:*')
print("Config keys:", [key.decode() for key in config_keys])

# Narrow to a single user's keys.
user_1_keys = client.keys('user:1:*')
print("User 1 keys:", [key.decode() for key in user_1_keys])
import fakeredis
client = fakeredis.FakeRedis()

# Build a large keyspace to demonstrate cursor-based iteration.
for i in range(1000):
    client.set(f'key_{i}', f'value_{i}')
    if i % 100 == 0:
        client.set(f'special_key_{i}', f'special_value_{i}')

# Drive SCAN manually: cursor 0 marks both start and completion.
cursor, all_keys = 0, []
while True:
    cursor, batch = client.scan(cursor=cursor, count=100)
    all_keys.extend(entry.decode() for entry in batch)
    if cursor == 0:
        break
print(f"Total keys found: {len(all_keys)}")

# scan_iter hides the cursor bookkeeping and supports MATCH.
special_keys = [entry.decode() for entry in client.scan_iter(match='special_key_*', count=50)]
print(f"Special keys found: {len(special_keys)}")

# TYPE filtering during SCAN requires Redis 6.0+.
string_keys = [entry.decode() for entry in client.scan_iter(_type='string', count=100)]
print(f"String type keys: {len(string_keys)}")
import fakeredis
client = fakeredis.FakeRedis()

client.set('old_key', 'important_data')
client.set('temp_key', 'temporary_data')

# RENAME always succeeds, overwriting the destination if it exists.
client.rename('old_key', 'new_key')
if not client.exists('old_key') and client.exists('new_key'):
    print("Key renamed successfully")

# RENAMENX only renames when the destination is absent.
result = client.renamenx('temp_key', 'new_key')  # fails: new_key exists
print(f"Conditional rename result: {result}")  # 0 (failed)
result = client.renamenx('temp_key', 'unique_key')  # succeeds
print(f"Conditional rename result: {result}")  # 1 (success)

# COPY (Redis 6.2+) duplicates a key's value.
client.set('source', 'data_to_copy')
copied = client.copy('source', 'destination')
print(f"Copy result: {copied}")  # 1 if successful

# replace=True allows overwriting an existing destination.
copied = client.copy('source', 'destination', replace=True)
print(f"Copy with replace: {copied}")

# MOVE shifts a key to another logical database; FakeRedis
# typically operates on a single database, so this stays commented.
# client.select(0)
# client.set('moveable_key', 'data')
# moved = client.move('moveable_key', 1)  # Move to database 1
import fakeredis
client = fakeredis.FakeRedis()

# SORT treats list members as numbers by default.
client.lpush('numbers', '30', '10', '50', '20', '40')
ascending = client.sort('numbers')
print("Sorted numbers:", [num.decode() for num in ascending])

# desc=True flips to descending order.
descending = client.sort('numbers', desc=True)
print("Descending:", [num.decode() for num in descending])

# alpha=True switches to lexicographic comparison.
client.lpush('words', 'zebra', 'apple', 'banana', 'cherry')
alphabetical = client.sort('words', alpha=True)
print("Sorted words:", [word.decode() for word in alphabetical])

# start/num page through the sorted result (LIMIT semantics).
top_three = client.sort('numbers', start=0, num=3)
print("Top 3:", [num.decode() for num in top_three])

# store= writes the sorted result into another list key.
client.sort('numbers', store='sorted_numbers')
stored = client.lrange('sorted_numbers', 0, -1)
print("Stored result:", [num.decode() for num in stored])
import fakeredis
client = fakeredis.FakeRedis()

# A list of user ids plus per-user lookup keys.
members = ['user:1', 'user:2', 'user:3']
client.lpush('users', *members)

# External "weight" keys consulted by SORT ... BY.
client.set('score:user:1', '85')
client.set('score:user:2', '92')
client.set('score:user:3', '78')

# External data keys fetched by SORT ... GET.
client.set('name:user:1', 'Alice')
client.set('name:user:2', 'Bob')
client.set('name:user:3', 'Charlie')

# BY sorts list members using the value stored at an external key.
sorted_by_score = client.sort('users', by='score:*')
print("Users sorted by score:", [user.decode() for user in sorted_by_score])

# GET pulls extra fields per member; results arrive interleaved
# (name, score, name, score, ...), so consume them in pairs.
sorted_with_names = client.sort('users', by='score:*', get=['name:*', 'score:*'])
print("Sorted with names and scores:")
fields = iter(sorted_with_names)
for name_raw, score_raw in zip(fields, fields):
    name = name_raw.decode()
    score = score_raw.decode()
    print(f" {name}: {score}")
import fakeredis
client = fakeredis.FakeRedis()

# Create one key per data structure so we can inspect each encoding.
client.set('simple_string', 'hello world')
# BUGFIX: redis-py's hset() signature is (name, key=None, value=None,
# mapping=None, items=None); passing multiple field/value pairs as bare
# positional arguments raises a TypeError. Use mapping= for several fields.
client.hset('user_hash', mapping={'name': 'Alice', 'age': '30', 'email': 'alice@example.com'})
client.lpush('number_list', *[str(i) for i in range(100)])

# OBJECT ENCODING reports the internal representation of a key.
try:
    encoding = client.object('encoding', 'simple_string')
    print(f"String encoding: {encoding}")
except Exception:  # narrowed from bare except: keep SystemExit/KeyboardInterrupt raising
    print("Object command not fully supported in this Redis version")

# MEMORY USAGE (Redis 4.0+) estimates the bytes a key occupies.
try:
    string_memory = client.memory_usage('simple_string')
    hash_memory = client.memory_usage('user_hash')
    list_memory = client.memory_usage('number_list')
    print(f"Memory usage:")
    print(f" String: {string_memory} bytes")
    print(f" Hash: {hash_memory} bytes")
    print(f" List: {list_memory} bytes")
except Exception:
    print("Memory usage command not available")
import fakeredis
client = fakeredis.FakeRedis()

# Create some data to back up.
client.set('backup_test', 'important_data')
# BUGFIX: multiple field/value pairs must go through mapping=;
# redis-py's hset() does not accept them as extra positional arguments.
client.hset('user_profile', mapping={'name': 'Alice', 'age': '30'})
client.lpush('activity_log', 'login', 'view_page', 'logout')

# DUMP serializes each key's value into an opaque binary payload.
string_dump = client.dump('backup_test')
hash_dump = client.dump('user_profile')
list_dump = client.dump('activity_log')
print(f"Dump sizes: string={len(string_dump)}, hash={len(hash_dump)}, list={len(list_dump)}")

# Delete the originals so the restore is observable.
client.delete('backup_test', 'user_profile', 'activity_log')

# RESTORE recreates each key from its dump; ttl=0 means no expiration.
if string_dump:
    client.restore('backup_test_restored', 0, string_dump)
if hash_dump:
    client.restore('user_profile_restored', 0, hash_dump)
if list_dump:
    client.restore('activity_log_restored', 0, list_dump)

# Verify the restored values round-tripped intact.
restored_string = client.get('backup_test_restored')
restored_hash = client.hgetall('user_profile_restored')
restored_list = client.lrange('activity_log_restored', 0, -1)
print(f"Restored string: {restored_string.decode()}")
print(f"Restored hash: {restored_hash}")
print(f"Restored list: {[item.decode() for item in restored_list]}")
import fakeredis
import time
class ExpirationManager:
    """Convenience wrapper around a Redis client for TTL bookkeeping."""

    def __init__(self, client):
        self.client = client

    def set_with_expiry(self, key, value, seconds):
        """Write *key* and attach a time-to-live of *seconds*."""
        self.client.set(key, value)
        return self.client.expire(key, seconds)

    def extend_expiry(self, key, additional_seconds):
        """Add *additional_seconds* to an existing TTL; False if key has none."""
        remaining = self.client.ttl(key)
        if remaining <= 0:
            return False
        return self.client.expire(key, remaining + additional_seconds)

    def get_expiring_keys(self, pattern='*', threshold_seconds=60):
        """Return (key, ttl) pairs for keys expiring within the threshold."""
        soon = []
        for raw in self.client.scan_iter(match=pattern):
            name = raw.decode()
            remaining = self.client.ttl(name)
            if 0 < remaining <= threshold_seconds:
                soon.append((name, remaining))
        return soon

    def refresh_if_expiring(self, key, threshold=60, extension=300):
        """Reset the TTL to *extension* seconds when it is about to lapse."""
        remaining = self.client.ttl(key)
        if not 0 < remaining <= threshold:
            return False
        self.client.expire(key, extension)
        return True
# Usage demonstration
client = fakeredis.FakeRedis()
exp_manager = ExpirationManager(client)

# Keys with staggered lifetimes.
exp_manager.set_with_expiry('session:user1', 'session_data', 30)
exp_manager.set_with_expiry('cache:page1', 'cached_content', 300)
exp_manager.set_with_expiry('temp:file1', 'temp_data', 10)

# Report anything lapsing within two minutes.
expiring = exp_manager.get_expiring_keys(threshold_seconds=120)
print("Keys expiring within 2 minutes:")
for key, ttl in expiring:
    print(f" {key}: {ttl} seconds")

# Give the session five more minutes.
extended = exp_manager.extend_expiry('session:user1', 300)
print(f"Extended session expiry: {extended}")
import fakeredis
class NamespaceManager:
def __init__(self, client, namespace):
self.client = client
self.namespace = namespace
self.separator = ':'
def _key(self, key):
"""Add namespace prefix to key"""
return f"{self.namespace}{self.separator}{key}"
def set(self, key, value, **kwargs):
"""Set value in namespace"""
return self.client.set(self._key(key), value, **kwargs)
def get(self, key):
"""Get value from namespace"""
return self.client.get(self._key(key))
def delete(self, *keys):
"""Delete keys from namespace"""
namespaced_keys = [self._key(key) for key in keys]
return self.client.delete(*namespaced_keys)
def exists(self, key):
"""Check if key exists in namespace"""
return self.client.exists(self._key(key))
def keys(self, pattern='*'):
"""Get all keys in namespace matching pattern"""
full_pattern = f"{self.namespace}{self.separator}{pattern}"
keys = self.client.keys(full_pattern)
# Remove namespace prefix from results
prefix_len = len(self.namespace) + len(self.separator)
return [key[prefix_len:].decode() for key in keys]
def clear_namespace(self):
"""Delete all keys in this namespace"""
pattern = f"{self.namespace}{self.separator}*"
keys = self.client.keys(pattern)
if keys:
return self.client.delete(*keys)
return 0
def scan_iter(self, pattern='*'):
"""Iterate over keys in namespace"""
full_pattern = f"{self.namespace}{self.separator}{pattern}"
prefix_len = len(self.namespace) + len(self.separator)
for key in self.client.scan_iter(match=full_pattern):
yield key[prefix_len:].decode()
# Usage
client = fakeredis.FakeRedis()
# Create namespace managers for different applications
user_cache = NamespaceManager(client, 'user_cache')
session_store = NamespaceManager(client, 'sessions')
config_store = NamespaceManager(client, 'config')
# Use namespaced operations
user_cache.set('user:123', 'user_data')
session_store.set('sess_abc', 'session_data')
config_store.set('debug_mode', 'true')
# Keys are automatically namespaced
print("User cache keys:", user_cache.keys())
print("Session keys:", session_store.keys())
print("Config keys:", config_store.keys())
# Check actual keys in Redis
all_keys = client.keys('*')
print("All keys in Redis:", [key.decode() for key in all_keys])
# Clear specific namespace
cleared = user_cache.clear_namespace()
print(f"Cleared {cleared} keys from user_cache namespace")Install with Tessl CLI
npx tessl i tessl/pypi-fakeredisdocs