A Python implementation of the Redis API that can be used for testing purposes.
—
Redis server management commands including database operations, configuration, and information retrieval. Server operations provide administrative capabilities for monitoring, configuring, and maintaining Redis instances with support for multiple databases, client management, and system information.
Commands for retrieving server status, statistics, and configuration details.
def info(self, section: Optional[str] = None) -> Dict[str, Any]: ...
def config_get(self, pattern: str = "*") -> Dict[bytes, bytes]: ...
def config_set(self, name: str, value: EncodableT) -> bool: ...
def config_rewrite(self) -> bool: ...
def config_resetstat(self) -> bool: ...
def time(self) -> Tuple[str, str]: ...
def lastsave(self) -> int: ...

Operations for managing Redis databases and their contents.
def dbsize(self) -> int: ...
def flushdb(self, asynchronous: bool = False) -> bool: ...
def flushall(self, asynchronous: bool = False) -> bool: ...
def swapdb(self, first: int, second: int) -> bool: ...
def select(self, index: int) -> bool: ...

Functions for monitoring and managing client connections.
def client_list(self, _type: Optional[str] = None, client_id: Optional[int] = None) -> str: ...
def client_info(self) -> Dict[str, Union[str, int]]: ...
def client_setname(self, name: str) -> bool: ...
def client_getname(self) -> Optional[bytes]: ...
def client_id(self) -> int: ...
def client_kill(self, address: str) -> bool: ...
def client_pause(self, timeout: int) -> bool: ...

Commands for memory management and analysis.
def memory_usage(self, key: KeyT, samples: Optional[int] = None) -> Optional[int]: ...
def memory_stats(self) -> Dict[str, Any]: ...
def memory_purge(self) -> bool: ...

Functions for managing Redis data persistence.
def save(self) -> bool: ...
def bgsave(self, schedule: bool = True) -> bool: ...
def bgrewriteaof(self) -> bool: ...

Redis Access Control List operations for user management and authentication.
def acl_cat(self, *category: bytes) -> List[bytes]: ...
def acl_genpass(self, *args: bytes) -> bytes: ...
def acl_setuser(self, username: bytes, *args: bytes) -> bytes: ...
def acl_list(self) -> List[bytes]: ...
def acl_deluser(self, username: bytes) -> bytes: ...
def acl_getuser(self, username: bytes) -> List[bytes]: ...
def acl_users(self) -> List[bytes]: ...
def acl_whoami(self) -> bytes: ...
def acl_save(self) -> SimpleString: ...
def acl_load(self) -> SimpleString: ...
def acl_log(self, *args: bytes) -> Union[SimpleString, List[Dict[str, str]]]: ...

Operations for inspecting available Redis commands and their properties.
def command(self) -> List[List[Any]]: ...
def command_count(self) -> int: ...
def command_getkeys(self, *args) -> List[bytes]: ...
def command_info(self, *command_names: str) -> List[Optional[List[Any]]]: ...

Advanced operations for debugging and system monitoring.
def debug_object(self, key: KeyT) -> str: ...
def monitor(self) -> None: ...
def slowlog_get(self, num: Optional[int] = None) -> List[Dict[str, Any]]: ...
def slowlog_len(self) -> int: ...
def slowlog_reset(self) -> bool: ...


import fakeredis
import pprint

# Example: inspecting server state with INFO.
client = fakeredis.FakeRedis()

# Seed a few keys so the server statistics are non-trivial.
client.mset({
    'user:1': 'alice',
    'user:2': 'bob',
    'counter': '42',
    'session:abc': 'active'
})
client.lpush('queue', 'task1', 'task2', 'task3')
client.sadd('tags', 'redis', 'python', 'testing')

# Fetch the full server report.
print("=== Server Information ===")
info = client.info()

# Walk the sections we care about and print each one.
sections_to_show = ['server', 'clients', 'memory', 'stats', 'keyspace']
for section in sections_to_show:
    if section in info:
        print(f"\n{section.upper()}:")
        section_info = info[section]
        if isinstance(section_info, dict):
            for field, val in section_info.items():
                print(f" {field}: {val}")
        else:
            print(f" {section_info}")

# INFO also accepts a single section name.
print("\n=== Memory Information ===")
memory_info = client.info('memory')
if 'memory' in memory_info:
    for field, val in memory_info['memory'].items():
        if 'memory' in field.lower():
            print(f" {field}: {val}")

print("\n=== Statistics ===")
stats_info = client.info('stats')
if 'stats' in stats_info:
    for field, val in stats_info['stats'].items():
        if 'commands' in field or 'connections' in field:
            print(f" {field}: {val}")


import fakeredis
client = fakeredis.FakeRedis()

# Dump the whole configuration and show a few well-known settings.
print("=== Current Configuration ===")
config = client.config_get()
important_configs = ['maxmemory', 'timeout', 'tcp-keepalive', 'databases']
for key, value in config.items():
    key_str = key.decode()
    if any(cfg in key_str for cfg in important_configs):
        print(f" {key_str}: {value.decode()}")

# CONFIG GET accepts glob patterns.
print("\n=== Specific Config Values ===")
memory_config = client.config_get('*memory*')
for key, value in memory_config.items():
    print(f" {key.decode()}: {value.decode()}")

# Change a setting, then read it back to confirm the write took effect.
print("\n=== Setting Configuration ===")
try:
    result = client.config_set('maxmemory', '100mb')
    print(f"Set maxmemory: {result}")
    new_maxmem = client.config_get('maxmemory')
    print(f"New maxmemory: {new_maxmem[b'maxmemory'].decode()}")
except Exception as e:
    print(f"Config set error: {e}")

# Reset statistics
print("\n=== Reset Statistics ===")
reset_result = client.config_resetstat()
print(f"Statistics reset: {reset_result}")


import fakeredis
client = fakeredis.FakeRedis()

# Populate the default database (index 0).
client.mset({
    'db0:key1': 'value1',
    'db0:key2': 'value2',
    'db0:counter': '100'
})
print(f"Database 0 size: {client.dbsize()}")

# Switch to database 1
client.select(1)
client.mset({
    'db1:key1': 'different_value1',
    'db1:key2': 'different_value2'
})
print(f"Database 1 size: {client.dbsize()}")

# Switch back to database 0
client.select(0)
print(f"Back to database 0, size: {client.dbsize()}")

# Demonstrate database swapping
print("\n=== Database Swapping ===")
print("Before swap:")
print(f" DB 0 key: {client.get('db0:key1')}")
client.select(1)
print(f" DB 1 key: {client.get('db1:key1')}")

# Swap databases 0 and 1
client.select(0)  # Go back to 0 for swap
swap_result = client.swapdb(0, 1)
print(f"\nDatabase swap result: {swap_result}")
print("After swap:")
print(f" DB 0 key (was DB 1): {client.get('db1:key1')}")
client.select(1)
print(f" DB 1 key (was DB 0): {client.get('db0:key1')}")

# Flush operations
print("\n=== Flush Operations ===")
client.select(0)
print(f"DB 0 size before flush: {client.dbsize()}")

# FLUSHDB only clears the currently selected database.
client.flushdb()
print(f"DB 0 size after flushdb: {client.dbsize()}")
client.select(1)
print(f"DB 1 size (untouched): {client.dbsize()}")

# Add some data back and test flushall
client.select(0)
client.set('test_key', 'test_value')
client.select(1)
client.set('another_key', 'another_value')
# BUG FIX: this dbsize() runs while DB 1 is selected, so the label must
# say DB 1 (the original printed "DB 0" here and again two lines below).
print(f"\nBefore flushall - DB 1: {client.dbsize()}")
client.select(0)
print(f"Before flushall - DB 0: {client.dbsize()}")

# FLUSHALL clears every database, regardless of which one is selected.
client.flushall()
print(f"After flushall - DB 0: {client.dbsize()}")
client.select(1)
print(f"After flushall - DB 1: {client.dbsize()}")


import fakeredis
import threading
import time

# Open several named connections plus one dedicated monitoring connection.
clients = []
for i in range(3):
    client = fakeredis.FakeRedis()
    client.client_setname(f'client_{i}')
    clients.append(client)

monitor_client = fakeredis.FakeRedis()
monitor_client.client_setname('monitor_client')

print("=== Client Information ===")
# Enumerate every connected client.
client_list = monitor_client.client_list()
print("Connected clients:")
print(client_list)

# Details about the monitoring connection itself.
client_info = monitor_client.client_info()
print(f"\nMonitor client info:")
for key, value in client_info.items():
    print(f" {key}: {value}")

client_id = monitor_client.client_id()
print(f"\nMonitor client ID: {client_id}")

client_name = monitor_client.client_getname()
if client_name:
    print(f"Monitor client name: {client_name.decode()}")


def client_activity(client, client_name, duration):
    """Simulate client activity"""
    start_time = time.time()
    counter = 0
    while time.time() - start_time < duration:
        # A small mix of reads and writes to keep the connection busy.
        client.set(f'{client_name}:counter', str(counter))
        client.get(f'{client_name}:counter')
        client.incr(f'{client_name}:activity')
        counter += 1
        time.sleep(0.1)


print("\n=== Starting Client Activity ===")
workers = []
for i, client in enumerate(clients):
    worker = threading.Thread(
        target=client_activity,
        args=(client, f'client_{i}', 2),  # 2 seconds of activity
    )
    workers.append(worker)
    worker.start()

# Observe the client list while the workers are still running.
time.sleep(1)
print(f"\nClients during activity:")
active_client_list = monitor_client.client_list()
print(active_client_list)

for worker in workers:
    worker.join()

print(f"\nClients after activity:")
final_client_list = monitor_client.client_list()
print(final_client_list)


import fakeredis
client = fakeredis.FakeRedis()

# Build keys with varied footprints so MEMORY USAGE has something to show.
print("=== Creating Test Data ===")

# Small strings
for i in range(100):
    client.set(f'small:{i}', f'value_{i}')

# Large strings
large_data = 'x' * 10000
for i in range(10):
    client.set(f'large:{i}', large_data)

# Complex data structures
for i in range(20):
    # Hashes
    client.hset(f'user:{i}', mapping={
        'name': f'User {i}',
        'email': f'user{i}@example.com',
        'age': str(20 + i),
        'bio': 'A user with a longer biography ' * 10
    })
    # Lists
    client.lpush(f'list:{i}', *[f'item_{j}' for j in range(50)])
    # Sets
    client.sadd(f'set:{i}', *[f'member_{j}' for j in range(30)])

# Analyze memory usage
print("\n=== Memory Analysis ===")
try:
    memory_stats = client.memory_stats()
    print("Memory statistics:")
    for key, value in memory_stats.items():
        if isinstance(value, (int, float)):
            if 'bytes' in key:
                print(f" {key}: {value:,} bytes")
            else:
                print(f" {key}: {value}")
except Exception:  # BUG FIX: was a bare except, which also swallows SystemExit/KeyboardInterrupt
    print("Memory stats not available in this Redis version")

# Analyze specific key memory usage
print("\n=== Key Memory Usage ===")
test_keys = [
    'small:0',
    'large:0',
    'user:0',
    'list:0',
    'set:0'
]
for key in test_keys:
    try:
        usage = client.memory_usage(key)
        if usage is not None:
            print(f" {key}: {usage} bytes")
        else:
            print(f" {key}: Key not found")
    except Exception:  # BUG FIX: narrowed from a bare except for the same reason
        print(f" {key}: Memory usage not available")

# Get database size
total_keys = client.dbsize()
print(f"\nTotal keys in database: {total_keys}")


import fakeredis
import time

client = fakeredis.FakeRedis()

# TIME returns a (seconds, microseconds) pair of strings.
print("=== Server Time ===")
server_time = client.time()
server_timestamp = int(server_time[0])
server_microseconds = int(server_time[1])
print(f"Server timestamp: {server_timestamp}")
print(f"Server microseconds: {server_microseconds}")

# Convert to readable format
import datetime
readable_time = datetime.datetime.fromtimestamp(server_timestamp)
print(f"Readable time: {readable_time}")

# Seed keys that we expect a dump to disk to preserve.
print("\n=== Adding Data for Persistence Test ===")
test_data = {
    'persistent:config': 'important_setting',
    'persistent:counter': '12345',
    'persistent:timestamp': str(int(time.time()))
}
for key, value in test_data.items():
    client.set(key, value)
    print(f"Set {key}: {value}")

# Exercise the persistence commands; each is wrapped individually so
# one unsupported command does not hide the others.
print("\n=== Persistence Operations ===")
try:
    save_result = client.save()
    print(f"Synchronous save: {save_result}")
except Exception as e:
    print(f"Save error: {e}")

try:
    bgsave_result = client.bgsave()
    print(f"Background save: {bgsave_result}")
except Exception as e:
    print(f"Background save error: {e}")

try:
    last_save = client.lastsave()
    last_save_time = datetime.datetime.fromtimestamp(last_save)
    print(f"Last save time: {last_save_time}")
except Exception as e:
    print(f"Last save error: {e}")


import fakeredis
client = fakeredis.FakeRedis()

print("=== Command Information ===")
# Get command count
try:
    cmd_count = client.command_count()
    print(f"Total commands available: {cmd_count}")
except Exception:  # BUG FIX: was a bare except, which also swallows SystemExit/KeyboardInterrupt
    print("Command count not available")

# Get information about specific commands
test_commands = ['SET', 'GET', 'HSET', 'LPUSH', 'ZADD']
try:
    cmd_info = client.command_info(*test_commands)
    print(f"\nCommand information:")
    for i, cmd_name in enumerate(test_commands):
        if i < len(cmd_info) and cmd_info[i]:
            info = cmd_info[i]
            print(f" {cmd_name}:")
            if len(info) >= 2:
                print(f" Arity: {info[1]}")  # Number of arguments
            if len(info) >= 3:
                print(f" Flags: {info[2]}")  # Command flags
        else:
            print(f" {cmd_name}: No information available")
except Exception as e:
    print(f"Command info error: {e}")

# Test debug object (if available)
print("\n=== Debug Information ===")
client.set('debug_key', 'debug_value')
client.lpush('debug_list', 'item1', 'item2', 'item3')
debug_keys = ['debug_key', 'debug_list']
for key in debug_keys:
    try:
        debug_info = client.debug_object(key)
        print(f"Debug info for {key}: {debug_info}")
    except Exception as e:
        print(f"Debug info for {key}: Not available ({e})")

# Slowlog operations (if supported)
print("\n=== Slowlog Operations ===")
try:
    # Get slowlog length
    slowlog_len = client.slowlog_len()
    print(f"Slowlog entries: {slowlog_len}")
    # Get slowlog entries
    if slowlog_len > 0:
        slowlog_entries = client.slowlog_get(5)  # Get last 5 entries
        print("Recent slow queries:")
        for entry in slowlog_entries:
            print(f" ID: {entry.get('id', 'N/A')}")
            print(f" Duration: {entry.get('duration', 'N/A')} microseconds")
            print(f" Command: {entry.get('command', 'N/A')}")
            print(f" Time: {entry.get('start_time', 'N/A')}")
            print()
    # Reset slowlog
    reset_result = client.slowlog_reset()
    print(f"Slowlog reset: {reset_result}")
except Exception as e:
    print(f"Slowlog operations not available: {e}")


import fakeredis
import time
import json
from typing import Dict, Any, List
from dataclasses import dataclass
@dataclass
class HealthCheck:
    """Outcome of one Redis health probe."""

    name: str  # identifier of the check that produced this result
    status: str  # "healthy" or "unhealthy"
    response_time: float  # round-trip time of the probe, in milliseconds
    message: str  # human-readable outcome description
    timestamp: int  # Unix time (seconds) when the check ran
class RedisHealthMonitor:
    """Runs health probes against a Redis client and records the results."""

    def __init__(self, client: fakeredis.FakeRedis):
        self.client = client

    def perform_health_check(self) -> HealthCheck:
        """Perform comprehensive Redis health check"""
        started = time.time()
        try:
            # Basic connectivity first, then a read/write round trip.
            self.client.ping()
            probe_key = f"health_check:{int(time.time())}"
            self.client.set(probe_key, "health_test", ex=60)
            stored = self.client.get(probe_key)
            if stored != b"health_test":
                raise Exception("Read/write test failed")
            # Clean up the probe key.
            self.client.delete(probe_key)
            elapsed_ms = (time.time() - started) * 1000  # ms
            return HealthCheck(
                name="redis_health_check",
                status="healthy",
                response_time=elapsed_ms,
                message="All checks passed",
                timestamp=int(time.time()),
            )
        except Exception as e:
            elapsed_ms = (time.time() - started) * 1000
            return HealthCheck(
                name="redis_health_check",
                status="unhealthy",
                response_time=elapsed_ms,
                message=f"Health check failed: {str(e)}",
                timestamp=int(time.time()),
            )

    def get_system_metrics(self) -> Dict[str, Any]:
        """Gather comprehensive system metrics"""
        metrics: Dict[str, Any] = {}
        try:
            report = self.client.info()

            # Pull a curated subset out of each INFO section, when present.
            if 'server' in report:
                section = report['server']
                metrics['server'] = {
                    'redis_version': section.get('redis_version', 'unknown'),
                    'uptime_seconds': section.get('uptime_in_seconds', 0),
                    'connected_clients': section.get('connected_clients', 0),
                }
            if 'memory' in report:
                section = report['memory']
                metrics['memory'] = {
                    'used_memory': section.get('used_memory', 0),
                    'used_memory_human': section.get('used_memory_human', '0B'),
                    'maxmemory': section.get('maxmemory', 0),
                }
            if 'stats' in report:
                section = report['stats']
                metrics['stats'] = {
                    'total_commands_processed': section.get('total_commands_processed', 0),
                    'total_connections_received': section.get('total_connections_received', 0),
                    'expired_keys': section.get('expired_keys', 0),
                    'evicted_keys': section.get('evicted_keys', 0),
                }
            if 'keyspace' in report:
                metrics['keyspace'] = {}
                for db, db_info in report['keyspace'].items():
                    if isinstance(db_info, dict):
                        metrics['keyspace'][db] = {
                            'keys': db_info.get('keys', 0),
                            'expires': db_info.get('expires', 0),
                        }

            # Database size
            metrics['database'] = {
                'total_keys': self.client.dbsize(),
            }

            # Details of our own connection.
            conn = self.client.client_info()
            metrics['client'] = {
                'client_id': conn.get('id', 0),
                'client_name': conn.get('name', ''),
                'database': conn.get('db', 0),
            }
        except Exception as e:
            metrics['error'] = f"Failed to gather metrics: {str(e)}"
        return metrics

    def store_health_history(self, health_check: HealthCheck, max_history: int = 100):
        """Store health check results for trend analysis"""
        history_key = "health_check_history"
        record = {
            'name': health_check.name,
            'status': health_check.status,
            'response_time': health_check.response_time,
            'message': health_check.message,
            'timestamp': health_check.timestamp,
        }
        # Newest entry first; trim so the list never exceeds max_history.
        self.client.lpush(history_key, json.dumps(record))
        self.client.ltrim(history_key, 0, max_history - 1)

    def get_health_history(self, limit: int = 10) -> List[HealthCheck]:
        """Retrieve recent health check history"""
        history_key = "health_check_history"
        try:
            raw_entries = self.client.lrange(history_key, 0, limit - 1)
            checks = []
            for raw in raw_entries:
                fields = json.loads(raw.decode())
                checks.append(HealthCheck(
                    name=fields['name'],
                    status=fields['status'],
                    response_time=fields['response_time'],
                    message=fields['message'],
                    timestamp=fields['timestamp'],
                ))
            return checks
        except Exception as e:
            print(f"Error retrieving health history: {e}")
            return []

    def get_health_summary(self) -> Dict[str, Any]:
        """Generate health summary with recent trends"""
        history = self.get_health_history(20)  # Last 20 checks
        if not history:
            return {"status": "unknown", "message": "No health history available"}

        total_checks = len(history)
        healthy_count = sum(1 for check in history if check.status == "healthy")
        success_rate = (healthy_count / total_checks) * 100

        recent = history[:5]  # Last 5 checks
        avg_response_time = sum(check.response_time for check in recent) / len(recent)

        # Map the success rate onto an overall verdict.
        if success_rate >= 95:
            overall_status = "healthy"
        elif success_rate >= 80:
            overall_status = "degraded"
        else:
            overall_status = "unhealthy"

        return {
            "overall_status": overall_status,
            "success_rate": round(success_rate, 2),
            "avg_response_time": round(avg_response_time, 2),
            "total_checks": total_checks,
            "healthy_checks": healthy_count,
            "last_check": {
                "status": history[0].status,
                "timestamp": history[0].timestamp,
                "message": history[0].message,
            },
        }
# Usage example
client = fakeredis.FakeRedis()
health_monitor = RedisHealthMonitor(client)
print("=== Redis Health Monitoring ===")
# Perform several health checks
for i in range(5):
print(f"\nPerforming health check {i+1}...")
health_check = health_monitor.perform_health_check()
print(f"Status: {health_check.status}")
print(f"Response time: {health_check.response_time:.2f}ms")
print(f"Message: {health_check.message}")
# Store in history
health_monitor.store_health_history(health_check)
# Add some test data to make metrics more interesting
if i == 2:
client.mset({f'test_key_{j}': f'test_value_{j}' for j in range(50)})
time.sleep(0.1) # Brief pause between checks
# Get system metrics
print(f"\n=== System Metrics ===")
metrics = health_monitor.get_system_metrics()
print(json.dumps(metrics, indent=2, default=str))
# Get health summary
print(f"\n=== Health Summary ===")
summary = health_monitor.get_health_summary()
print(json.dumps(summary, indent=2))
# Show recent health history
print(f"\n=== Recent Health History ===")
recent_history = health_monitor.get_health_history(5)
for i, check in enumerate(recent_history):
timestamp_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(check.timestamp))
print(f"{i+1}. {timestamp_str}: {check.status} ({check.response_time:.1f}ms) - {check.message}")Install with Tessl CLI
npx tessl i tessl/pypi-fakeredisdocs