CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-aioredis

asyncio (PEP 3156) Redis support

Overall
score

98%

Overview
Eval results
Files

docs/server-admin.md

Server Administration

Server management commands for Redis administration including configuration management, monitoring, persistence control, replication setup, cluster operations, and performance optimization. These commands enable comprehensive Redis server management and maintenance.

Capabilities

Server Information and Statistics

Commands for retrieving server information, statistics, and operational metrics.

async def info(section: Optional[str] = None) -> Dict[str, Any]:
    """
    Retrieve the information and statistics reported by the Redis server.

    Args:
        section: Name of a single info section to fetch, such as
            'server', 'memory' or 'persistence'. Optional.

    Returns:
        Mapping that holds the server information
    """

async def dbsize(self) -> int:
    """
    Count the keys stored in the currently selected database.

    Returns:
        Key count of the current database
    """

async def lastsave(self) -> int:
    """
    Report when the dataset was last saved successfully.

    Returns:
        Unix timestamp of the last successful save
    """

async def time(self) -> Tuple[int, int]:
    """
    Read the current time as seen by the Redis server.

    Returns:
        Two-item tuple: (timestamp_seconds, microseconds)
    """

async def memory_stats(self) -> Dict[str, Any]:
    """
    Fetch the server's detailed memory usage statistics.

    Returns:
        Mapping of memory statistics
    """

async def memory_usage(key: str, samples: Optional[int] = None) -> Optional[int]:
    """
    Measure how much memory a single key occupies.

    Args:
        key: Name of the key to analyze
        samples: How many samples to use for the estimation. Optional.

    Returns:
        Memory used by the key, in bytes (the signature also permits None)
    """

async def memory_purge(self) -> bool:
    """
    Ask the server to try purging dirty pages so that memory reporting
    becomes more accurate.

    Returns:
        True on success
    """

Configuration Management

Server configuration retrieval and modification for runtime parameter tuning.

async def config_get(pattern: str = "*") -> Dict[str, str]:
    """
    Look up Redis server configuration parameters.

    Args:
        pattern: Pattern the configuration keys must match; ``*`` acts as
            a wildcard. The default ``"*"`` matches every parameter.

    Returns:
        Mapping of configuration parameter names to their values
    """

async def config_set(name: str, value: str) -> bool:
    """
    Change one Redis server configuration parameter.

    Args:
        name: Name of the configuration parameter
        value: Value to assign to the parameter

    Returns:
        True when the parameter was set successfully
    """

async def config_resetstat(self) -> bool:
    """
    Reset the counters behind the server's runtime statistics.

    Returns:
        True when the statistics were reset
    """

async def config_rewrite(self) -> bool:
    """
    Write the settings currently in effect back to the Redis
    configuration file.

    Returns:
        True when the configuration file was rewritten
    """

Persistence Operations

Commands for managing Redis data persistence and backup operations.

async def save(self) -> bool:
    """
    Save the dataset to disk synchronously. The server is blocked while
    the save runs.

    Returns:
        True when the save completed successfully
    """

async def bgsave(self) -> bool:
    """
    Start saving the dataset to disk in the background (asynchronously).

    Returns:
        True when the background save was started
    """

async def bgrewriteaof(self) -> bool:
    """
    Start rewriting the append-only file in the background
    (asynchronously).

    Returns:
        True when the AOF rewrite was started
    """

Database Management

Commands for managing Redis databases, including flushing and database switching.

async def flushdb(asynchronous: bool = False) -> bool:
    """
    Delete every key in the current database.

    Args:
        asynchronous: When True, carry out the flush asynchronously

    Returns:
        True when the database was flushed
    """

async def flushall(asynchronous: bool = False) -> bool:
    """
    Delete every key in every database.

    Args:
        asynchronous: When True, carry out the flush asynchronously

    Returns:
        True when all databases were flushed
    """

async def swapdb(first: int, second: int) -> bool:
    """
    Exchange the contents of two Redis databases.

    Args:
        first: Number of the first database
        second: Number of the second database

    Returns:
        True when the databases were swapped
    """

async def select(db: int) -> bool:
    """
    Switch to the Redis database with the given number.

    Args:
        db: Number of the database (typically 0-15)

    Returns:
        True when the database was selected
    """

Client Management

Commands for managing and monitoring Redis client connections.

async def client_list(_type: Optional[str] = None) -> List[Dict[str, Any]]:
    """
    List the connected clients together with their details.

    Args:
        _type: Restrict the listing to one client type
            ('normal', 'master', 'replica', 'pubsub'). Optional.

    Returns:
        List with one information dictionary per client
    """

async def client_id(self) -> int:
    """
    Return the ID of the current connection.

    Returns:
        Unique ID of this client connection
    """

async def client_getname(self) -> Optional[str]:
    """
    Return the name of the current connection.

    Returns:
        The client name, or None when no name has been set
    """

async def client_setname(name: str) -> bool:
    """
    Give the current connection a name.

    Args:
        name: Name the connection should carry

    Returns:
        True when the name was set
    """

async def client_kill(address: str) -> bool:
    """
    Terminate the client connection that has the given address.

    Args:
        address: Address of the client, formatted as ip:port

    Returns:
        True when the client was killed
    """

async def client_kill_filter(**kwargs) -> int:
    """
    Terminate every client that matches the supplied filter criteria.

    Args:
        kwargs: Criteria used for filtering (type, addr, skipme, id, etc.)

    Returns:
        How many clients were killed
    """

async def client_unblock(client_id: int, error: bool = False) -> bool:
    """
    Release a client that is stuck in a blocking operation.

    Args:
        client_id: ID of the client to unblock
        error: Whether the client should be unblocked with an error

    Returns:
        True when the client was unblocked
    """

async def client_pause(timeout: int) -> bool:
    """
    Suspend all clients for the given amount of time.

    Args:
        timeout: Length of the pause, in milliseconds

    Returns:
        True when the clients were paused
    """

Slowlog Management

Commands for monitoring and managing the Redis slowlog for performance analysis.

async def slowlog_get(num: Optional[int] = None) -> List[Dict[str, Any]]:
    """
    Fetch entries from the slowlog so slow commands can be analysed.

    Args:
        num: Upper bound on the number of entries returned. Optional.

    Returns:
        List with one dictionary per slowlog entry
    """

async def slowlog_len(self) -> int:
    """
    Report how long the slowlog currently is.

    Returns:
        Count of entries held in the slowlog
    """

async def slowlog_reset(self) -> bool:
    """
    Remove every entry from the slowlog.

    Returns:
        True when the slowlog was cleared
    """

Replication Management

Commands for managing Redis master-slave replication relationships.

async def slaveof(host: Optional[str] = None, port: Optional[int] = None) -> bool:
    """
    Turn this server into a slave of another Redis server.

    Args:
        host: Hostname of the master server; pass None to stop replication
        port: Port of the master server

    Returns:
        True when replication was configured
    """

async def wait(num_replicas: int, timeout: int) -> int:
    """
    Block until the requested number of replicas have acknowledged writes.

    Args:
        num_replicas: Smallest number of replicas to wait for
        timeout: How long to wait at most, in milliseconds

    Returns:
        How many replicas acknowledged
    """

Server Control

Commands for server lifecycle management and emergency operations.

async def shutdown(save: bool = False, nosave: bool = False) -> None:
    """
    Shut the Redis server down.

    Args:
        save: Save the dataset before shutting down
        nosave: Skip saving the dataset before shutting down

    Note:
        The server shuts down, so this command does not return
    """

async def debug_object(key: str) -> Dict[str, Any]:
    """
    Retrieve debugging information for a key.

    Args:
        key: The key to debug

    Returns:
        Dictionary holding the debug information
    """

ACL (Access Control List) Management

Commands for managing Redis 6+ access control and user authentication.

async def acl_list(self) -> List[str]:
    """
    List the ACL rules.

    Returns:
        ACL rules, one string per rule
    """

async def acl_users(self) -> List[str]:
    """
    List the ACL usernames.

    Returns:
        Usernames, one string per user
    """

async def acl_whoami(self) -> str:
    """
    Return the ACL username of the current connection.

    Returns:
        The current username
    """

async def acl_getuser(username: str) -> Dict[str, Any]:
    """
    Look up the ACL information of one user.

    Args:
        username: Name of the user to query

    Returns:
        Dictionary holding the user's ACL information
    """

async def acl_setuser(username: str, **kwargs) -> bool:
    """
    Create an ACL user or change an existing one.

    Args:
        username: Name of the user to modify
        kwargs: ACL parameters (enabled, passwords, commands, etc.)

    Returns:
        True when the user was modified
    """

async def acl_deluser(username: str) -> int:
    """
    Remove an ACL user.

    Args:
        username: Name of the user to delete

    Returns:
        How many users were deleted
    """

async def acl_cat(category: Optional[str] = None) -> List[str]:
    """
    List the ACL command categories.

    Args:
        category: A specific category whose commands should be listed.
            Optional.

    Returns:
        The categories, or the commands belonging to the given category
    """

async def acl_genpass(self) -> str:
    """
    Produce a secure password suitable for an ACL user.

    Returns:
        The generated password as a string
    """

async def acl_log(count: Optional[int] = None) -> List[Dict[str, Any]]:
    """
    Fetch entries from the ACL log.

    Args:
        count: Upper bound on the number of entries returned. Optional.

    Returns:
        List with one dictionary per ACL log entry
    """

async def acl_log_reset(self) -> bool:
    """
    Empty the ACL log.

    Returns:
        True when the log was cleared
    """

async def acl_load(self) -> bool:
    """
    Load the ACL again from the configuration file.

    Returns:
        True when the ACL was reloaded
    """

async def acl_save(self) -> bool:
    """
    Write the ACL currently in effect to the configuration file.

    Returns:
        True when the ACL was saved
    """

Module Management

Commands for managing Redis modules (Redis 4+ feature).

async def module_list(self) -> List[Dict[str, Any]]:
    """
    List the Redis modules that are currently loaded.

    Returns:
        List with one information dictionary per module
    """

async def module_load(path: str) -> bool:
    """
    Load a Redis module from a file.

    Args:
        path: Location of the module file

    Returns:
        True when the module was loaded
    """

async def module_unload(name: str) -> bool:
    """
    Unload a Redis module.

    Args:
        name: Name of the module to unload

    Returns:
        True when the module was unloaded
    """

Cluster Management

Commands for Redis Cluster operations and management.

async def cluster(cluster_arg: str, *args: Any) -> Any:
    """
    Run a Redis Cluster command.

    Args:
        cluster_arg: The cluster subcommand ('nodes', 'info', 'slots', etc.)
        args: Further arguments for the command

    Returns:
        Whatever the cluster command produced
    """

Usage Examples

Server Monitoring and Information

async def server_monitoring_examples():
    """Print a quick health snapshot of the Redis server.

    Demonstrates ``info`` (full report and single sections), ``dbsize``,
    ``time``, ``memory_stats`` and ``memory_usage``.

    Side effects:
        Stores a 10,000-character demo value under the key ``large_key``
        and leaves it on the server.
    """
    redis = aioredis.Redis(decode_responses=True)
    try:
        # Get comprehensive server info
        info = await redis.info()
        print(f"Redis version: {info['redis_version']}")
        print(f"Used memory: {info['used_memory_human']}")
        print(f"Connected clients: {info['connected_clients']}")
        print(f"Uptime: {info['uptime_in_seconds']} seconds")

        # Get specific info sections
        for section in ('memory', 'stats', 'replication'):
            section_info = await redis.info(section)
            print(f"Info section '{section}': {len(section_info)} fields")

        # Database size and timing
        db_size = await redis.dbsize()
        print(f"Keys in current database: {db_size}")

        seconds, microseconds = await redis.time()
        # Zero-pad the microseconds: (1700000000, 42) must read
        # 1700000000.000042, not 1700000000.42.
        print(f"Server time: {seconds}.{microseconds:06d}")

        # Memory analysis
        memory_stats = await redis.memory_stats()
        print(f"Peak memory: {memory_stats.get('peak.allocated', 'N/A')}")

        # Analyze specific key memory usage
        await redis.set('large_key', 'x' * 10000)
        key_memory = await redis.memory_usage('large_key')
        print(f"Memory used by large_key: {key_memory} bytes")
    finally:
        # Release the connection even if a command above fails. close() is
        # the teardown coroutine aioredis.Redis provides.
        await redis.close()

Configuration Management

async def config_management_examples():
    """Show how to read, change and persist server configuration.

    Side effects:
        Resets the server's runtime statistics and attempts to rewrite its
        configuration file. The ``timeout`` setting is switched to 300 for
        the demonstration and restored to its previous value before the
        configuration file is rewritten.
    """
    redis = aioredis.Redis(decode_responses=True)
    try:
        # Get all configuration
        all_config = await redis.config_get('*')
        print(f"Total config parameters: {len(all_config)}")

        # Get specific configuration patterns
        for label, pattern in (('Memory', '*memory*'), ('Timeout', '*timeout*')):
            matching = await redis.config_get(pattern)
            print(f"{label}-related settings:")
            for key, value in matching.items():
                print(f"  {key}: {value}")

        # Modify configuration (be careful in production!)
        # config_get returns a name -> value mapping, so pick the value out
        # instead of printing the whole dictionary.
        original_timeout = (await redis.config_get('timeout')).get('timeout')
        try:
            # Set new timeout (example - adjust as needed)
            await redis.config_set('timeout', '300')

            # Verify change
            new_timeout = (await redis.config_get('timeout')).get('timeout')
            print(f"Timeout changed from {original_timeout} to {new_timeout}")
        finally:
            # Put the original value back so the demo does not permanently
            # alter the server (or get baked into the config file below).
            if original_timeout is not None:
                await redis.config_set('timeout', original_timeout)

        # Reset statistics
        await redis.config_resetstat()
        print("Runtime statistics reset")

        # Rewrite config file (if permissions allow); deliberately
        # best-effort, so any failure is only reported.
        try:
            await redis.config_rewrite()
            print("Configuration file updated")
        except Exception as e:
            print(f"Could not rewrite config: {e}")
    finally:
        # Release the connection even if a command above fails. close() is
        # the teardown coroutine aioredis.Redis provides.
        await redis.close()

Client Connection Management

async def client_management_examples():
    """Show how to name, inspect, kill and pause client connections.

    Opens a second connection named ``test_client`` and kills it by
    address to demonstrate ``client_kill``.

    Side effects:
        Pauses every client of the server for 100 ms.
    """
    redis = aioredis.Redis(decode_responses=True)
    redis2 = None
    try:
        # Set client name for identification
        await redis.client_setname('monitoring_client')

        # Get current client info
        client_id = await redis.client_id()
        client_name = await redis.client_getname()
        print(f"Client ID: {client_id}, Name: {client_name}")

        # List all connected clients
        clients = await redis.client_list()
        print(f"Total connected clients: {len(clients)}")

        for client in clients[:5]:  # Show first 5 clients
            print(f"Client {client.get('id')}: {client.get('addr')} "
                  f"({client.get('cmd', 'idle')})")

        # Filter clients by type
        normal_clients = await redis.client_list(_type='normal')
        print(f"Normal clients: {len(normal_clients)}")

        # Create another connection to demonstrate killing
        redis2 = aioredis.Redis(decode_responses=True)
        await redis2.client_setname('test_client')

        # Find and kill the test client
        for client in await redis.client_list():
            if client.get('name') == 'test_client':
                killed = await redis.client_kill(client['addr'])
                print(f"Killed test client: {killed}")
                break

        # Pause all clients briefly (careful in production!)
        await redis.client_pause(100)  # 100ms pause
        print("All clients paused briefly")
    finally:
        if redis2 is not None:
            try:
                await redis2.close()
            except Exception as exc:
                # Expected when the server already dropped the killed
                # connection. `except Exception` (not a bare `except:`) lets
                # asyncio.CancelledError and KeyboardInterrupt propagate.
                print(f"Test client was already disconnected: {exc}")
        # close() is the teardown coroutine aioredis.Redis provides.
        await redis.close()

Performance Monitoring

async def performance_monitoring_examples():
    """Inspect the slowlog and derive simple performance figures from INFO.

    Prints recent slowlog entries, command throughput, the keyspace hit
    rate and how far current memory use is below its peak.

    Side effects:
        Stores a 1,000,000-character demo value under the key
        ``large_key`` and leaves it on the server.
    """
    redis = aioredis.Redis(decode_responses=True)
    try:
        # Set a large value to potentially trigger slowlog
        large_value = 'x' * 1000000  # 1MB string
        await redis.set('large_key', large_value)

        # Check slowlog
        slowlog_entries = await redis.slowlog_get(10)
        print(f"Slowlog entries: {len(slowlog_entries)}")

        for entry in slowlog_entries:
            # slowlog_get returns one dictionary per entry, so fields are
            # looked up by name; positional indexing raises KeyError.
            timestamp = entry['start_time']
            duration_microseconds = entry['duration']
            # 'command' is the full command line; keep its first 3 words.
            command = ' '.join(str(entry['command']).split()[:3])
            print(f"Slow command: {command} - {duration_microseconds}μs at {timestamp}")

        # Get slowlog length
        slowlog_len = await redis.slowlog_len()
        print(f"Total slowlog entries: {slowlog_len}")

        # Performance analysis with info
        info = await redis.info('stats')
        print(f"Total commands processed: {info.get('total_commands_processed')}")
        print(f"Instantaneous ops/sec: {info.get('instantaneous_ops_per_sec')}")
        print(f"Keyspace hits: {info.get('keyspace_hits')}")
        print(f"Keyspace misses: {info.get('keyspace_misses')}")

        hits = info.get('keyspace_hits', 0)
        misses = info.get('keyspace_misses', 0)
        # max(..., 1) avoids dividing by zero on a server with no lookups yet.
        hit_rate = hits / max(hits + misses, 1)
        print(f"Cache hit rate: {hit_rate:.2%}")

        # Memory efficiency
        memory_info = await redis.info('memory')
        used_memory = memory_info.get('used_memory', 0)
        used_memory_peak = memory_info.get('used_memory_peak', 0)
        print(f"Memory usage: {used_memory_peak - used_memory} bytes below peak")
    finally:
        # Release the connection even if a command above fails. close() is
        # the teardown coroutine aioredis.Redis provides.
        await redis.close()

Backup and Persistence

async def backup_persistence_examples():
    """Trigger and monitor RDB/AOF persistence operations.

    Starts a background save, polls until it finishes (for at most about
    60 seconds), attempts an AOF rewrite and prints the persistence
    section of INFO.

    Side effects:
        Writes the keys ``backup_test:1`` .. ``backup_test:3`` and leaves
        them on the server; causes the server to write its dataset to disk.
    """
    poll_interval = 0.1  # seconds between progress checks
    max_polls = 600      # give up after ~60 seconds instead of spinning forever

    redis = aioredis.Redis(decode_responses=True)
    try:
        # Add some test data
        await redis.mset({
            'backup_test:1': 'value1',
            'backup_test:2': 'value2',
            'backup_test:3': 'value3',
        })

        # Check last save time
        last_save = await redis.lastsave()
        print(f"Last save timestamp: {last_save}")

        # Trigger background save
        bgsave_started = await redis.bgsave()
        if bgsave_started:
            print("Background save started")

            # Monitor save progress
            for _ in range(max_polls):
                info = await redis.info('persistence')
                # Numeric INFO fields are parsed into numbers, so comparing
                # against the string '0' never matches; int() accepts both
                # the parsed and the raw-string form.
                if int(info.get('rdb_bgsave_in_progress', 0)) == 0:
                    print("Background save completed")
                    break
                await asyncio.sleep(poll_interval)
            else:
                print("Background save still in progress; no longer waiting")

        # Trigger AOF rewrite if AOF is enabled
        try:
            aof_rewrite_started = await redis.bgrewriteaof()
            if aof_rewrite_started:
                print("AOF rewrite started")
        except Exception as e:
            print(f"AOF rewrite not available: {e}")

        # Get persistence info
        persistence_info = await redis.info('persistence')
        print("Persistence configuration:")
        for key, value in persistence_info.items():
            if 'rdb' in key or 'aof' in key:
                print(f"  {key}: {value}")

        # Synchronous save (blocks server - use carefully!)
        # await redis.save()  # Uncomment only if safe to block
    finally:
        # Release the connection even if a command above fails. close() is
        # the teardown coroutine aioredis.Redis provides.
        await redis.close()

ACL and Security Management

async def acl_security_examples():
    """Explore the server's ACL setup (requires Redis 6 or newer).

    Prints the current user, the known users with their flags, the ACL
    categories, a freshly generated password prefix and recent ACL log
    entries. When the server rejects the ACL commands, a notice is printed
    instead.
    """
    redis = aioredis.Redis(decode_responses=True)
    try:
        # Check if ACL is supported (Redis 6+)
        current_user = await redis.acl_whoami()
        print(f"Current user: {current_user}")

        # List all users
        users = await redis.acl_users()
        print(f"ACL users: {users}")

        # Get user details
        for user in users[:3]:  # First 3 users
            user_info = await redis.acl_getuser(user)
            flags = user_info.get('flags', [])
            print(f"User {user}:")
            # An enabled user carries the 'on' flag; print the derived state
            # and the raw flags separately rather than labelling the whole
            # flag list "Enabled".
            print(f"  Enabled: {'on' in flags}")
            print(f"  Flags: {flags}")
            print(f"  Commands: {user_info.get('commands', 'N/A')}")

        # List ACL categories
        categories = await redis.acl_cat()
        print(f"ACL categories: {categories}")

        # Generate secure password
        secure_password = await redis.acl_genpass()
        print(f"Generated secure password: {secure_password[:8]}...")

        # Create test user (be very careful in production!)
        # await redis.acl_setuser(
        #     'testuser',
        #     enabled=True,
        #     passwords=[secure_password],
        #     commands=['+get', '+set'],
        #     keys=['test:*']
        # )
        # print("Test user created")

        # Check ACL log
        acl_log = await redis.acl_log(5)
        if acl_log:
            print(f"Recent ACL events: {len(acl_log)}")
            for event in acl_log:
                print(f"  {event}")
    except aioredis.ResponseError as e:
        # Only a server-side rejection (e.g. unknown command on Redis < 6)
        # means "not supported"; connection problems and programming errors
        # propagate instead of being mislabelled.
        print(f"ACL commands not supported: {e}")
    finally:
        # close() is the teardown coroutine aioredis.Redis provides.
        await redis.close()

Replication and High Availability

async def replication_examples():
    """Report the server's replication role and exercise ``wait``.

    On a master with at least one connected slave, writes the key
    ``replication_test`` and waits up to 5 seconds for one replica to
    acknowledge it. On a slave, prints the master address and link status.
    """
    redis = aioredis.Redis(decode_responses=True)
    try:
        # Get replication information
        repl_info = await redis.info('replication')
        role = repl_info.get('role')
        # Normalise once so the printed value and the comparison agree.
        connected_slaves = int(repl_info.get('connected_slaves', 0))

        print("Replication status:")
        print(f"  Role: {role}")
        print(f"  Connected slaves: {connected_slaves}")

        if role == 'master':
            print("This is a master server")

            # If there are slaves, wait for replication
            if connected_slaves > 0:
                # Write test data
                await redis.set('replication_test', 'test_value')

                # Wait for at least 1 replica to acknowledge
                replicas_acked = await redis.wait(1, 5000)  # 5 second timeout
                print(f"Replicas that acknowledged write: {replicas_acked}")

        elif role == 'slave':
            print("This is a slave server")
            master_host = repl_info.get('master_host')
            master_port = repl_info.get('master_port')
            print(f"  Master: {master_host}:{master_port}")
            print(f"  Link status: {repl_info.get('master_link_status')}")

            # Promote to master (dangerous operation!)
            # await redis.slaveof()  # No arguments = stop being slave
            # print("Promoted to master")

        else:
            # Nothing is reconfigured here, so report the unexpected value
            # rather than claiming a setup step that never happens.
            print(f"Unrecognised replication role: {role!r}")
            # To make this server a slave: await redis.slaveof('master_host', 6379)
    finally:
        # Release the connection even if a command above fails. close() is
        # the teardown coroutine aioredis.Redis provides.
        await redis.close()

Install with Tessl CLI

npx tessl i tessl/pypi-aioredis

docs

advanced-features.md

basic-operations.md

connection-management.md

data-structures.md

index.md

server-admin.md

tile.json