CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-fakeredis

Python implementation of redis API, can be used for testing purposes

Pending
Overview
Eval results
Files

server-operations.mddocs/

Server Operations

Redis server management commands including database operations, configuration, and information retrieval. Server operations provide administrative capabilities for monitoring, configuring, and maintaining Redis instances with support for multiple databases, client management, and system information.

Capabilities

Server Information

Commands for retrieving server status, statistics, and configuration details.

def info(self, section: Optional[str] = None) -> Dict[str, Any]: ...

def config_get(self, pattern: str = "*") -> Dict[bytes, bytes]: ...

def config_set(self, name: str, value: EncodableT) -> bool: ...

def config_rewrite(self) -> bool: ...

def config_resetstat(self) -> bool: ...

def time(self) -> Tuple[str, str]: ...

def lastsave(self) -> int: ...

Database Management

Operations for managing Redis databases and their contents.

def dbsize(self) -> int: ...

def flushdb(self, asynchronous: bool = False) -> bool: ...

def flushall(self, asynchronous: bool = False) -> bool: ...

def swapdb(self, first: int, second: int) -> bool: ...

def select(self, index: int) -> bool: ...

Client Management

Functions for monitoring and managing client connections.

def client_list(self, _type: Optional[str] = None, client_id: Optional[int] = None) -> str: ...

def client_info(self) -> Dict[str, Union[str, int]]: ...

def client_setname(self, name: str) -> bool: ...

def client_getname(self) -> Optional[bytes]: ...

def client_id(self) -> int: ...

def client_kill(self, address: str) -> bool: ...

def client_pause(self, timeout: int) -> bool: ...

Memory Operations

Commands for memory management and analysis.

def memory_usage(self, key: KeyT, samples: Optional[int] = None) -> Optional[int]: ...

def memory_stats(self) -> Dict[str, Any]: ...

def memory_purge(self) -> bool: ...

Persistence Operations

Functions for managing Redis data persistence.

def save(self) -> bool: ...

def bgsave(self, schedule: bool = True) -> bool: ...

def bgrewriteaof(self) -> bool: ...

Access Control Lists (ACL)

Redis Access Control List operations for user management and authentication.

def acl_cat(self, *category: bytes) -> List[bytes]: ...

def acl_genpass(self, *args: bytes) -> bytes: ...

def acl_setuser(self, username: bytes, *args: bytes) -> bytes: ...

def acl_list(self) -> List[bytes]: ...

def acl_deluser(self, username: bytes) -> bytes: ...

def acl_getuser(self, username: bytes) -> List[bytes]: ...

def acl_users(self) -> List[bytes]: ...

def acl_whoami(self) -> bytes: ...

def acl_save(self) -> SimpleString: ...

def acl_load(self) -> SimpleString: ...

def acl_log(self, *args: bytes) -> Union[SimpleString, List[Dict[str, str]]]: ...

Command Information

Operations for inspecting available Redis commands and their properties.

def command(self) -> List[List[Any]]: ...

def command_count(self) -> int: ...

def command_getkeys(self, *args) -> List[bytes]: ...

def command_info(self, *command_names: str) -> List[Optional[List[Any]]]: ...

Debugging and Monitoring

Advanced operations for debugging and system monitoring.

def debug_object(self, key: KeyT) -> str: ...

def monitor(self) -> None: ...

def slowlog_get(self, num: Optional[int] = None) -> List[Dict[str, Any]]: ...

def slowlog_len(self) -> int: ...

def slowlog_reset(self) -> bool: ...

Usage Examples

Server Information and Statistics

import fakeredis
import pprint

client = fakeredis.FakeRedis()

# Add some test data to generate statistics
client.mset({
    'user:1': 'alice',
    'user:2': 'bob', 
    'counter': '42',
    'session:abc': 'active'
})
client.lpush('queue', 'task1', 'task2', 'task3')
client.sadd('tags', 'redis', 'python', 'testing')

# Get comprehensive server information
print("=== Server Information ===")
info = client.info()

# Display key sections
sections_to_show = ['server', 'clients', 'memory', 'stats', 'keyspace']
for section in sections_to_show:
    if section in info:
        print(f"\n{section.upper()}:")
        section_info = info[section]
        if isinstance(section_info, dict):
            for key, value in section_info.items():
                print(f"  {key}: {value}")
        else:
            print(f"  {section_info}")

# Get specific information sections
print("\n=== Memory Information ===")
memory_info = client.info('memory')
if 'memory' in memory_info:
    for key, value in memory_info['memory'].items():
        if 'memory' in key.lower():
            print(f"  {key}: {value}")

print("\n=== Statistics ===")
stats_info = client.info('stats')
if 'stats' in stats_info:
    for key, value in stats_info['stats'].items():
        if 'commands' in key or 'connections' in key:
            print(f"  {key}: {value}")

Configuration Management

import fakeredis

client = fakeredis.FakeRedis()

# Get current configuration
print("=== Current Configuration ===")
config = client.config_get()
important_configs = ['maxmemory', 'timeout', 'tcp-keepalive', 'databases']

for key, value in config.items():
    key_str = key.decode()
    if any(cfg in key_str for cfg in important_configs):
        print(f"  {key_str}: {value.decode()}")

# Get specific configuration values
print("\n=== Specific Config Values ===")
memory_config = client.config_get('*memory*')
for key, value in memory_config.items():
    print(f"  {key.decode()}: {value.decode()}")

# Set configuration values
print("\n=== Setting Configuration ===")
try:
    # Set maximum memory (example)
    result = client.config_set('maxmemory', '100mb')
    print(f"Set maxmemory: {result}")
    
    # Verify the change
    new_maxmem = client.config_get('maxmemory')
    print(f"New maxmemory: {new_maxmem[b'maxmemory'].decode()}")
    
except Exception as e:
    print(f"Config set error: {e}")

# Reset statistics
print("\n=== Reset Statistics ===")
reset_result = client.config_resetstat()
print(f"Statistics reset: {reset_result}")

Database Operations

import fakeredis

client = fakeredis.FakeRedis()

# Add data to current database (0)
client.mset({
    'db0:key1': 'value1',
    'db0:key2': 'value2',
    'db0:counter': '100'
})

print(f"Database 0 size: {client.dbsize()}")

# Switch to database 1
client.select(1)
client.mset({
    'db1:key1': 'different_value1',
    'db1:key2': 'different_value2'
})

print(f"Database 1 size: {client.dbsize()}")

# Switch back to database 0
client.select(0)
print(f"Back to database 0, size: {client.dbsize()}")

# Demonstrate database swapping
print("\n=== Database Swapping ===")
print("Before swap:")
print(f"  DB 0 key: {client.get('db0:key1')}")

client.select(1)
print(f"  DB 1 key: {client.get('db1:key1')}")

# Swap databases 0 and 1
client.select(0)  # Go back to 0 for swap
swap_result = client.swapdb(0, 1)
print(f"\nDatabase swap result: {swap_result}")

print("After swap:")
print(f"  DB 0 key (was DB 1): {client.get('db1:key1')}")

client.select(1)
print(f"  DB 1 key (was DB 0): {client.get('db0:key1')}")

# Flush operations
print("\n=== Flush Operations ===")
client.select(0)
print(f"DB 0 size before flush: {client.dbsize()}")

# Flush current database
client.flushdb()
print(f"DB 0 size after flushdb: {client.dbsize()}")

client.select(1)
print(f"DB 1 size (untouched): {client.dbsize()}")

# Add some data back and test flushall
client.select(0)
client.set('test_key', 'test_value')
client.select(1) 
client.set('another_key', 'another_value')

print(f"\nBefore flushall - DB 0: {client.dbsize()}")
client.select(0)
print(f"Before flushall - DB 0: {client.dbsize()}")

# Flush all databases
client.flushall()
print(f"After flushall - DB 0: {client.dbsize()}")
client.select(1)
print(f"After flushall - DB 1: {client.dbsize()}")

Client Management

import fakeredis
import threading
import time

# Create multiple client connections
clients = []
for i in range(3):
    client = fakeredis.FakeRedis()
    client.client_setname(f'client_{i}')
    clients.append(client)

# Main client for monitoring
monitor_client = fakeredis.FakeRedis()
monitor_client.client_setname('monitor_client')

print("=== Client Information ===")

# Get client list
client_list = monitor_client.client_list()
print("Connected clients:")
print(client_list)

# Get current client info
client_info = monitor_client.client_info()
print(f"\nMonitor client info:")
for key, value in client_info.items():
    print(f"  {key}: {value}")

# Get client ID
client_id = monitor_client.client_id()
print(f"\nMonitor client ID: {client_id}")

# Get client name
client_name = monitor_client.client_getname()
if client_name:
    print(f"Monitor client name: {client_name.decode()}")

# Simulate some client activity
def client_activity(client, client_name, duration):
    """Simulate client activity"""
    start_time = time.time()
    counter = 0
    
    while time.time() - start_time < duration:
        # Perform various operations
        client.set(f'{client_name}:counter', str(counter))
        client.get(f'{client_name}:counter')
        client.incr(f'{client_name}:activity')
        
        counter += 1
        time.sleep(0.1)

print("\n=== Starting Client Activity ===")

# Start client activity threads
threads = []
for i, client in enumerate(clients):
    thread = threading.Thread(
        target=client_activity,
        args=(client, f'client_{i}', 2)  # 2 seconds of activity
    )
    threads.append(thread)
    thread.start()

# Monitor clients during activity
time.sleep(1)
print(f"\nClients during activity:")
active_client_list = monitor_client.client_list()
print(active_client_list)

# Wait for activity to complete
for thread in threads:
    thread.join()

print(f"\nClients after activity:")
final_client_list = monitor_client.client_list()
print(final_client_list)

Memory Analysis

import fakeredis

client = fakeredis.FakeRedis()

# Create data with different memory characteristics
print("=== Creating Test Data ===")

# Small strings
for i in range(100):
    client.set(f'small:{i}', f'value_{i}')

# Large strings  
large_data = 'x' * 10000
for i in range(10):
    client.set(f'large:{i}', large_data)

# Complex data structures
for i in range(20):
    # Hashes
    client.hset(f'user:{i}', mapping={
        'name': f'User {i}',
        'email': f'user{i}@example.com',
        'age': str(20 + i),
        'bio': 'A user with a longer biography ' * 10
    })
    
    # Lists
    client.lpush(f'list:{i}', *[f'item_{j}' for j in range(50)])
    
    # Sets
    client.sadd(f'set:{i}', *[f'member_{j}' for j in range(30)])

# Analyze memory usage
print("\n=== Memory Analysis ===")

# Get overall memory stats
try:
    memory_stats = client.memory_stats()
    print("Memory statistics:")
    for key, value in memory_stats.items():
        if isinstance(value, (int, float)):
            if 'bytes' in key:
                print(f"  {key}: {value:,} bytes")
            else:
                print(f"  {key}: {value}")
except:
    print("Memory stats not available in this Redis version")

# Analyze specific key memory usage
print("\n=== Key Memory Usage ===")
test_keys = [
    'small:0',
    'large:0', 
    'user:0',
    'list:0',
    'set:0'
]

for key in test_keys:
    try:
        usage = client.memory_usage(key)
        if usage is not None:
            print(f"  {key}: {usage} bytes")
        else:
            print(f"  {key}: Key not found")
    except:
        print(f"  {key}: Memory usage not available")

# Get database size
total_keys = client.dbsize()
print(f"\nTotal keys in database: {total_keys}")

Server Time and Persistence

import fakeredis
import time

client = fakeredis.FakeRedis()

# Get server time
print("=== Server Time ===")
server_time = client.time()
server_timestamp = int(server_time[0])
server_microseconds = int(server_time[1])

print(f"Server timestamp: {server_timestamp}")
print(f"Server microseconds: {server_microseconds}")

# Convert to readable format
import datetime
readable_time = datetime.datetime.fromtimestamp(server_timestamp)
print(f"Readable time: {readable_time}")

# Add some data for persistence testing
print("\n=== Adding Data for Persistence Test ===")
test_data = {
    'persistent:config': 'important_setting',
    'persistent:counter': '12345',
    'persistent:timestamp': str(int(time.time()))
}

for key, value in test_data.items():
    client.set(key, value)
    print(f"Set {key}: {value}")

# Test save operations
print("\n=== Persistence Operations ===")

# Synchronous save
try:
    save_result = client.save()
    print(f"Synchronous save: {save_result}")
except Exception as e:
    print(f"Save error: {e}")

# Background save
try:
    bgsave_result = client.bgsave()
    print(f"Background save: {bgsave_result}")
except Exception as e:
    print(f"Background save error: {e}")

# Get last save time
try:
    last_save = client.lastsave()
    last_save_time = datetime.datetime.fromtimestamp(last_save)
    print(f"Last save time: {last_save_time}")
except Exception as e:
    print(f"Last save error: {e}")

Command Information and Debugging

import fakeredis

client = fakeredis.FakeRedis()

print("=== Command Information ===")

# Get command count
try:
    cmd_count = client.command_count()
    print(f"Total commands available: {cmd_count}")
except:
    print("Command count not available")

# Get information about specific commands
test_commands = ['SET', 'GET', 'HSET', 'LPUSH', 'ZADD']
try:
    cmd_info = client.command_info(*test_commands)
    print(f"\nCommand information:")
    for i, cmd_name in enumerate(test_commands):
        if i < len(cmd_info) and cmd_info[i]:
            info = cmd_info[i]
            print(f"  {cmd_name}:")
            if len(info) >= 2:
                print(f"    Arity: {info[1]}")  # Number of arguments
            if len(info) >= 3:
                print(f"    Flags: {info[2]}")  # Command flags
        else:
            print(f"  {cmd_name}: No information available")
except Exception as e:
    print(f"Command info error: {e}")

# Test debug object (if available)
print("\n=== Debug Information ===")
client.set('debug_key', 'debug_value')
client.lpush('debug_list', 'item1', 'item2', 'item3')

debug_keys = ['debug_key', 'debug_list']
for key in debug_keys:
    try:
        debug_info = client.debug_object(key)
        print(f"Debug info for {key}: {debug_info}")
    except Exception as e:
        print(f"Debug info for {key}: Not available ({e})")

# Slowlog operations (if supported)
print("\n=== Slowlog Operations ===")
try:
    # Get slowlog length
    slowlog_len = client.slowlog_len()
    print(f"Slowlog entries: {slowlog_len}")
    
    # Get slowlog entries
    if slowlog_len > 0:
        slowlog_entries = client.slowlog_get(5)  # Get last 5 entries
        print("Recent slow queries:")
        for entry in slowlog_entries:
            print(f"  ID: {entry.get('id', 'N/A')}")
            print(f"  Duration: {entry.get('duration', 'N/A')} microseconds")
            print(f"  Command: {entry.get('command', 'N/A')}")
            print(f"  Time: {entry.get('start_time', 'N/A')}")
            print()
    
    # Reset slowlog
    reset_result = client.slowlog_reset()
    print(f"Slowlog reset: {reset_result}")
    
except Exception as e:
    print(f"Slowlog operations not available: {e}")

Pattern: Health Check System

import fakeredis
import time
import json
from typing import Dict, Any, List
from dataclasses import dataclass

@dataclass
class HealthCheck:
    name: str
    status: str
    response_time: float
    message: str
    timestamp: int

class RedisHealthMonitor:
    def __init__(self, client: fakeredis.FakeRedis):
        self.client = client
    
    def perform_health_check(self) -> HealthCheck:
        """Perform comprehensive Redis health check"""
        start_time = time.time()
        
        try:
            # Test basic connectivity
            self.client.ping()
            
            # Test read/write operations
            test_key = f"health_check:{int(time.time())}"
            self.client.set(test_key, "health_test", ex=60)
            value = self.client.get(test_key)
            
            if value != b"health_test":
                raise Exception("Read/write test failed")
            
            # Clean up test key
            self.client.delete(test_key)
            
            response_time = (time.time() - start_time) * 1000  # ms
            
            return HealthCheck(
                name="redis_health_check",
                status="healthy",
                response_time=response_time,
                message="All checks passed",
                timestamp=int(time.time())
            )
            
        except Exception as e:
            response_time = (time.time() - start_time) * 1000
            return HealthCheck(
                name="redis_health_check", 
                status="unhealthy",
                response_time=response_time,
                message=f"Health check failed: {str(e)}",
                timestamp=int(time.time())
            )
    
    def get_system_metrics(self) -> Dict[str, Any]:
        """Gather comprehensive system metrics"""
        metrics = {}
        
        try:
            # Server information
            info = self.client.info()
            
            # Extract key metrics
            if 'server' in info:
                server_info = info['server']
                metrics['server'] = {
                    'redis_version': server_info.get('redis_version', 'unknown'),
                    'uptime_seconds': server_info.get('uptime_in_seconds', 0),
                    'connected_clients': server_info.get('connected_clients', 0)
                }
            
            if 'memory' in info:
                memory_info = info['memory']
                metrics['memory'] = {
                    'used_memory': memory_info.get('used_memory', 0),
                    'used_memory_human': memory_info.get('used_memory_human', '0B'),
                    'maxmemory': memory_info.get('maxmemory', 0)
                }
            
            if 'stats' in info:
                stats_info = info['stats']
                metrics['stats'] = {
                    'total_commands_processed': stats_info.get('total_commands_processed', 0),
                    'total_connections_received': stats_info.get('total_connections_received', 0),
                    'expired_keys': stats_info.get('expired_keys', 0),
                    'evicted_keys': stats_info.get('evicted_keys', 0)
                }
            
            if 'keyspace' in info:
                keyspace_info = info['keyspace']
                metrics['keyspace'] = {}
                for db, db_info in keyspace_info.items():
                    if isinstance(db_info, dict):
                        metrics['keyspace'][db] = {
                            'keys': db_info.get('keys', 0),
                            'expires': db_info.get('expires', 0)
                        }
            
            # Database size
            metrics['database'] = {
                'total_keys': self.client.dbsize()
            }
            
            # Client information
            client_info = self.client.client_info()
            metrics['client'] = {
                'client_id': client_info.get('id', 0),
                'client_name': client_info.get('name', ''),
                'database': client_info.get('db', 0)
            }
            
        except Exception as e:
            metrics['error'] = f"Failed to gather metrics: {str(e)}"
        
        return metrics
    
    def store_health_history(self, health_check: HealthCheck, max_history: int = 100):
        """Store health check results for trend analysis"""
        history_key = "health_check_history"
        
        # Store as JSON in a list
        health_data = {
            'name': health_check.name,
            'status': health_check.status,
            'response_time': health_check.response_time,
            'message': health_check.message,
            'timestamp': health_check.timestamp
        }
        
        # Add to list (most recent first)
        self.client.lpush(history_key, json.dumps(health_data))
        
        # Trim to max history
        self.client.ltrim(history_key, 0, max_history - 1)
    
    def get_health_history(self, limit: int = 10) -> List[HealthCheck]:
        """Retrieve recent health check history"""
        history_key = "health_check_history"
        
        try:
            history_data = self.client.lrange(history_key, 0, limit - 1)
            
            health_checks = []
            for data in history_data:
                health_dict = json.loads(data.decode())
                health_check = HealthCheck(
                    name=health_dict['name'],
                    status=health_dict['status'],
                    response_time=health_dict['response_time'],
                    message=health_dict['message'],
                    timestamp=health_dict['timestamp']
                )
                health_checks.append(health_check)
            
            return health_checks
            
        except Exception as e:
            print(f"Error retrieving health history: {e}")
            return []
    
    def get_health_summary(self) -> Dict[str, Any]:
        """Generate health summary with recent trends"""
        history = self.get_health_history(20)  # Last 20 checks
        
        if not history:
            return {"status": "unknown", "message": "No health history available"}
        
        # Calculate statistics
        healthy_count = sum(1 for check in history if check.status == "healthy")
        total_checks = len(history)
        success_rate = (healthy_count / total_checks) * 100
        
        recent_checks = history[:5]  # Last 5 checks
        avg_response_time = sum(check.response_time for check in recent_checks) / len(recent_checks)
        
        # Determine overall status
        if success_rate >= 95:
            overall_status = "healthy"
        elif success_rate >= 80:
            overall_status = "degraded"
        else:
            overall_status = "unhealthy"
        
        return {
            "overall_status": overall_status,
            "success_rate": round(success_rate, 2),
            "avg_response_time": round(avg_response_time, 2),
            "total_checks": total_checks,
            "healthy_checks": healthy_count,
            "last_check": {
                "status": history[0].status,
                "timestamp": history[0].timestamp,
                "message": history[0].message
            }
        }

# Usage example
client = fakeredis.FakeRedis()
health_monitor = RedisHealthMonitor(client)

print("=== Redis Health Monitoring ===")

# Perform several health checks
for i in range(5):
    print(f"\nPerforming health check {i+1}...")
    
    health_check = health_monitor.perform_health_check()
    print(f"Status: {health_check.status}")
    print(f"Response time: {health_check.response_time:.2f}ms")
    print(f"Message: {health_check.message}")
    
    # Store in history
    health_monitor.store_health_history(health_check)
    
    # Add some test data to make metrics more interesting
    if i == 2:
        client.mset({f'test_key_{j}': f'test_value_{j}' for j in range(50)})
    
    time.sleep(0.1)  # Brief pause between checks

# Get system metrics
print(f"\n=== System Metrics ===")
metrics = health_monitor.get_system_metrics()
print(json.dumps(metrics, indent=2, default=str))

# Get health summary
print(f"\n=== Health Summary ===")
summary = health_monitor.get_health_summary()
print(json.dumps(summary, indent=2))

# Show recent health history
print(f"\n=== Recent Health History ===")
recent_history = health_monitor.get_health_history(5)
for i, check in enumerate(recent_history):
    timestamp_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(check.timestamp))
    print(f"{i+1}. {timestamp_str}: {check.status} ({check.response_time:.1f}ms) - {check.message}")

Install with Tessl CLI

npx tessl i tessl/pypi-fakeredis

docs

bitmap-operations.md

core-clients.md

generic-operations.md

geospatial-operations.md

hash-operations.md

index.md

list-operations.md

lua-scripting.md

pubsub-operations.md

server-management.md

server-operations.md

set-operations.md

sorted-set-operations.md

stack-extensions.md

stream-operations.md

string-operations.md

transaction-operations.md

valkey-support.md

tile.json