asyncio (PEP 3156) Redis support
Overall score: 98%
Server management commands for Redis administration including configuration management, monitoring, persistence control, replication setup, cluster operations, and performance optimization. These commands enable comprehensive Redis server management and maintenance.
Commands for retrieving server information, statistics, and operational metrics.
async def info(self, section: Optional[str] = None) -> Dict[str, Any]:
    """
    Get Redis server information and statistics.

    Args:
        section: Specific info section ('server', 'memory', 'persistence', etc.)

    Returns:
        Dictionary of server information
    """


async def dbsize(self) -> int:
    """
    Get current database size (number of keys).

    Returns:
        Number of keys in current database
    """


async def lastsave(self) -> int:
    """
    Get Unix timestamp of last successful save.

    Returns:
        Unix timestamp of the last successful save
    """


async def time(self) -> Tuple[int, int]:
    """
    Get Redis server time.

    Returns:
        Tuple of (timestamp_seconds, microseconds)
    """


async def memory_stats(self) -> Dict[str, Any]:
    """
    Get detailed memory usage statistics.

    Returns:
        Dictionary of memory statistics
    """


async def memory_usage(self, key: str, samples: Optional[int] = None) -> Optional[int]:
    """
    Get memory usage of specific key.

    Args:
        key: Key name to analyze
        samples: Number of samples for estimation

    Returns:
        Memory usage in bytes
    """


async def memory_purge(self) -> bool:
    """
    Attempt to purge dirty pages for better memory reporting.

    Returns:
        True if successful
    """


# Server configuration retrieval and modification for runtime parameter tuning.
async def config_get(self, pattern: str = "*") -> Dict[str, str]:
    """
    Get Redis server configuration parameters.

    Args:
        pattern: Pattern to match configuration keys (* wildcards)

    Returns:
        Dictionary of configuration parameter name-value pairs
    """


async def config_set(self, name: str, value: str) -> bool:
    """
    Set Redis server configuration parameter.

    Args:
        name: Configuration parameter name
        value: New parameter value

    Returns:
        True if parameter was set successfully
    """


async def config_resetstat(self) -> bool:
    """
    Reset Redis runtime statistics counters.

    Returns:
        True if statistics were reset
    """


async def config_rewrite(self) -> bool:
    """
    Rewrite Redis configuration file with current settings.

    Returns:
        True if configuration file was rewritten
    """


# Commands for managing Redis data persistence and backup operations.
async def save(self) -> bool:
    """
    Synchronously save dataset to disk (blocks server).

    Returns:
        True if save completed successfully
    """


async def bgsave(self) -> bool:
    """
    Asynchronously save dataset to disk in background.

    Returns:
        True if background save started
    """


async def bgrewriteaof(self) -> bool:
    """
    Asynchronously rewrite append-only file in background.

    Returns:
        True if AOF rewrite started
    """


# Commands for managing Redis databases, including flushing and database switching.
async def flushdb(self, asynchronous: bool = False) -> bool:
    """
    Remove all keys from current database.

    Args:
        asynchronous: Perform flush asynchronously

    Returns:
        True if database was flushed
    """


async def flushall(self, asynchronous: bool = False) -> bool:
    """
    Remove all keys from all databases.

    Args:
        asynchronous: Perform flush asynchronously

    Returns:
        True if all databases were flushed
    """


async def swapdb(self, first: int, second: int) -> bool:
    """
    Swap two Redis databases.

    Args:
        first: First database number
        second: Second database number

    Returns:
        True if databases were swapped
    """


async def select(self, db: int) -> bool:
    """
    Select Redis database by number.

    Args:
        db: Database number (0-15 typically)

    Returns:
        True if database was selected
    """


# Commands for managing and monitoring Redis client connections.
async def client_list(self, _type: Optional[str] = None) -> List[Dict[str, Any]]:
    """
    Get list of connected clients and their information.

    Args:
        _type: Filter by client type ('normal', 'master', 'replica', 'pubsub')

    Returns:
        List of client information dictionaries
    """


async def client_id(self) -> int:
    """
    Get current connection ID.

    Returns:
        Unique client connection ID
    """


async def client_getname(self) -> Optional[str]:
    """
    Get current connection name.

    Returns:
        Client name or None if not set
    """


async def client_setname(self, name: str) -> bool:
    """
    Set current connection name.

    Args:
        name: Name to assign to connection

    Returns:
        True if name was set
    """


async def client_kill(self, address: str) -> bool:
    """
    Kill client connection by address.

    Args:
        address: Client address (ip:port format)

    Returns:
        True if client was killed
    """


async def client_kill_filter(self, **kwargs) -> int:
    """
    Kill clients matching filter criteria.

    Args:
        kwargs: Filter criteria (type, addr, skipme, id, etc.)

    Returns:
        Number of clients killed
    """


async def client_unblock(self, client_id: int, error: bool = False) -> bool:
    """
    Unblock client blocked on blocking operation.

    Args:
        client_id: Client ID to unblock
        error: Whether to unblock with error

    Returns:
        True if client was unblocked
    """


async def client_pause(self, timeout: int) -> bool:
    """
    Pause all clients for specified time.

    Args:
        timeout: Pause duration in milliseconds

    Returns:
        True if clients were paused
    """


# Commands for monitoring and managing the Redis slowlog for performance analysis.
async def slowlog_get(self, num: Optional[int] = None) -> List[Dict[str, Any]]:
    """
    Get slowlog entries for performance analysis.

    Args:
        num: Maximum number of entries to return

    Returns:
        List of slowlog entry dictionaries
    """


async def slowlog_len(self) -> int:
    """
    Get current slowlog length.

    Returns:
        Number of entries in slowlog
    """


async def slowlog_reset(self) -> bool:
    """
    Clear all slowlog entries.

    Returns:
        True if slowlog was cleared
    """


# Commands for managing Redis master-slave replication relationships.
async def slaveof(self, host: Optional[str] = None, port: Optional[int] = None) -> bool:
    """
    Make server a slave of another Redis server.

    Args:
        host: Master server hostname (None to stop replication)
        port: Master server port

    Returns:
        True if replication was configured
    """


async def wait(self, num_replicas: int, timeout: int) -> int:
    """
    Wait for specified number of replicas to acknowledge writes.

    Args:
        num_replicas: Minimum number of replicas to wait for
        timeout: Timeout in milliseconds

    Returns:
        Number of replicas that acknowledged
    """


# Commands for server lifecycle management and emergency operations.
async def shutdown(self, save: bool = False, nosave: bool = False) -> None:
    """
    Shutdown Redis server.

    Args:
        save: Save dataset before shutdown
        nosave: Don't save dataset before shutdown

    Note:
        This command doesn't return as server shuts down
    """


async def debug_object(self, key: str) -> Dict[str, Any]:
    """
    Get debugging information about key.

    Args:
        key: Key to debug

    Returns:
        Debug information dictionary
    """


# Commands for managing Redis 6+ access control and user authentication.
async def acl_list(self) -> List[str]:
    """
    Get list of ACL rules.

    Returns:
        List of ACL rule strings
    """


async def acl_users(self) -> List[str]:
    """
    Get list of ACL usernames.

    Returns:
        List of username strings
    """


async def acl_whoami(self) -> str:
    """
    Get current ACL username.

    Returns:
        Current username
    """


async def acl_getuser(self, username: str) -> Dict[str, Any]:
    """
    Get ACL information for specific user.

    Args:
        username: Username to query

    Returns:
        Dictionary of user ACL information
    """


async def acl_setuser(self, username: str, **kwargs) -> bool:
    """
    Create or modify ACL user.

    Args:
        username: Username to modify
        kwargs: ACL parameters (enabled, passwords, commands, etc.)

    Returns:
        True if user was modified
    """


async def acl_deluser(self, username: str) -> int:
    """
    Delete ACL user.

    Args:
        username: Username to delete

    Returns:
        Number of users deleted
    """


async def acl_cat(self, category: Optional[str] = None) -> List[str]:
    """
    List ACL command categories.

    Args:
        category: Specific category to list commands for

    Returns:
        List of categories or commands in category
    """


async def acl_genpass(self) -> str:
    """
    Generate secure password for ACL user.

    Returns:
        Generated password string
    """


async def acl_log(self, count: Optional[int] = None) -> List[Dict[str, Any]]:
    """
    Get ACL log entries.

    Args:
        count: Maximum number of entries to return

    Returns:
        List of ACL log entry dictionaries
    """


async def acl_log_reset(self) -> bool:
    """
    Clear ACL log.

    Returns:
        True if log was cleared
    """


async def acl_load(self) -> bool:
    """
    Reload ACL from configuration file.

    Returns:
        True if ACL was reloaded
    """


async def acl_save(self) -> bool:
    """
    Save current ACL to configuration file.

    Returns:
        True if ACL was saved
    """


# Commands for managing Redis modules (Redis 4+ feature).
async def module_list(self) -> List[Dict[str, Any]]:
    """
    Get list of loaded Redis modules.

    Returns:
        List of module information dictionaries
    """


async def module_load(self, path: str) -> bool:
    """
    Load Redis module from file.

    Args:
        path: Path to module file

    Returns:
        True if module was loaded
    """


async def module_unload(self, name: str) -> bool:
    """
    Unload Redis module.

    Args:
        name: Module name to unload

    Returns:
        True if module was unloaded
    """


# Commands for Redis Cluster operations and management.
async def cluster(self, cluster_arg: str, *args: Any) -> Any:
    """
    Execute Redis Cluster command.

    Args:
        cluster_arg: Cluster subcommand ('nodes', 'info', 'slots', etc.)
        args: Additional command arguments

    Returns:
        Cluster command result
    """


async def server_monitoring_examples():
    """Print version, memory, client, keyspace and timing metrics of a server."""
    redis = aioredis.Redis(decode_responses=True)
    try:
        # Get comprehensive server info
        info = await redis.info()
        print(f"Redis version: {info['redis_version']}")
        print(f"Used memory: {info['used_memory_human']}")
        print(f"Connected clients: {info['connected_clients']}")
        print(f"Uptime: {info['uptime_in_seconds']} seconds")

        # Get specific info sections
        memory_info = await redis.info('memory')
        stats_info = await redis.info('stats')
        replication_info = await redis.info('replication')

        # Database size and timing
        db_size = await redis.dbsize()
        print(f"Keys in current database: {db_size}")
        server_time = await redis.time()
        # Zero-pad the microseconds: (12, 5) must read 12.000005, not 12.5
        print(f"Server time: {server_time[0]}.{server_time[1]:06d}")

        # Memory analysis
        memory_stats = await redis.memory_stats()
        print(f"Peak memory: {memory_stats.get('peak.allocated', 'N/A')}")

        # Analyze specific key memory usage
        await redis.set('large_key', 'x' * 10000)
        key_memory = await redis.memory_usage('large_key')
        print(f"Memory used by large_key: {key_memory} bytes")
    finally:
        # Release the connection even when one of the calls above raises.
        # NOTE(review): aioredis 2.x may only expose close(); aclose() is the
        # redis-py >= 5.0.1 spelling - confirm against the installed client.
        await redis.aclose()


async def config_management_examples():
    """Read, temporarily change, and rewrite server configuration."""
    redis = aioredis.Redis(decode_responses=True)
    try:
        # Get all configuration
        all_config = await redis.config_get('*')
        print(f"Total config parameters: {len(all_config)}")

        # Get specific configuration patterns
        memory_config = await redis.config_get('*memory*')
        timeout_config = await redis.config_get('*timeout*')
        print("Memory-related settings:")
        for key, value in memory_config.items():
            print(f" {key}: {value}")

        # Modify configuration (be careful in production!)
        original_timeout = await redis.config_get('timeout')
        # Set new timeout (example - adjust as needed)
        await redis.config_set('timeout', '300')
        # Verify change
        new_timeout = await redis.config_get('timeout')
        print(f"Timeout changed from {original_timeout} to {new_timeout}")

        # Put the original value back so the rewrite below does not persist
        # the demo setting.
        if 'timeout' in original_timeout:
            await redis.config_set('timeout', original_timeout['timeout'])

        # Reset statistics
        await redis.config_resetstat()
        print("Runtime statistics reset")

        # Rewrite config file (if permissions allow)
        try:
            await redis.config_rewrite()
            print("Configuration file updated")
        except Exception as e:
            print(f"Could not rewrite config: {e}")
    finally:
        await redis.aclose()


async def client_management_examples():
    """Name, list, filter, kill and pause client connections."""
    redis = aioredis.Redis(decode_responses=True)
    redis2 = None  # second connection, created later as a kill target
    try:
        # Set client name for identification
        await redis.client_setname('monitoring_client')

        # Get current client info
        client_id = await redis.client_id()
        client_name = await redis.client_getname()
        print(f"Client ID: {client_id}, Name: {client_name}")

        # List all connected clients
        clients = await redis.client_list()
        print(f"Total connected clients: {len(clients)}")
        for client in clients[:5]:  # Show first 5 clients
            print(f"Client {client.get('id')}: {client.get('addr')} "
                  f"({client.get('cmd', 'idle')})")

        # Filter clients by type
        normal_clients = await redis.client_list(_type='normal')
        print(f"Normal clients: {len(normal_clients)}")

        # Create another connection to demonstrate killing
        redis2 = aioredis.Redis(decode_responses=True)
        await redis2.client_setname('test_client')

        # Find and kill the test client
        clients = await redis.client_list()
        for client in clients:
            if client.get('name') == 'test_client':
                killed = await redis.client_kill(client['addr'])
                print(f"Killed test client: {killed}")
                break

        # Pause all clients briefly (careful in production!)
        await redis.client_pause(100)  # 100ms pause
        print("All clients paused briefly")
    finally:
        await redis.aclose()
        if redis2 is not None:
            try:
                await redis2.aclose()
            except Exception:
                # Already killed. Exception (not a bare except) so that task
                # cancellation and KeyboardInterrupt still propagate.
                pass


async def performance_monitoring_examples():
    """Inspect the slowlog, command statistics, hit rate and memory headroom."""
    redis = aioredis.Redis(decode_responses=True)
    try:
        # Generate some slow operations for demo
        # Set a large value to potentially trigger slowlog
        large_value = 'x' * 1000000  # 1MB string
        await redis.set('large_key', large_value)

        # Check slowlog
        slowlog_entries = await redis.slowlog_get(10)
        print(f"Slowlog entries: {len(slowlog_entries)}")
        for entry in slowlog_entries:
            # slowlog_get returns dictionaries, so positional indexing fails.
            # NOTE(review): key names follow the redis-py style reply parser
            # ('start_time', 'duration', 'command') - confirm for your version.
            timestamp = entry['start_time']
            duration_microseconds = entry['duration']
            command = ' '.join(str(entry['command']).split()[:3])  # First 3 args
            print(f"Slow command: {command} - {duration_microseconds}μs at {timestamp}")

        # Get slowlog length
        slowlog_len = await redis.slowlog_len()
        print(f"Total slowlog entries: {slowlog_len}")

        # Performance analysis with info
        info = await redis.info('stats')
        print(f"Total commands processed: {info.get('total_commands_processed')}")
        print(f"Instantaneous ops/sec: {info.get('instantaneous_ops_per_sec')}")
        print(f"Keyspace hits: {info.get('keyspace_hits')}")
        print(f"Keyspace misses: {info.get('keyspace_misses')}")
        # max(..., 1) avoids a division by zero on a server with no lookups yet
        hit_rate = info.get('keyspace_hits', 0) / max(
            info.get('keyspace_hits', 0) + info.get('keyspace_misses', 0), 1
        )
        print(f"Cache hit rate: {hit_rate:.2%}")

        # Memory efficiency
        memory_info = await redis.info('memory')
        used_memory = memory_info.get('used_memory', 0)
        used_memory_peak = memory_info.get('used_memory_peak', 0)
        print(f"Memory usage: {used_memory_peak - used_memory} bytes below peak")
    finally:
        await redis.aclose()


async def backup_persistence_examples():
    """Trigger a background save and an AOF rewrite, then print their status."""
    redis = aioredis.Redis(decode_responses=True)
    try:
        # Add some test data
        await redis.mset({
            'backup_test:1': 'value1',
            'backup_test:2': 'value2',
            'backup_test:3': 'value3'
        })

        # Check last save time
        last_save = await redis.lastsave()
        print(f"Last save timestamp: {last_save}")

        # Trigger background save
        bgsave_started = await redis.bgsave()
        if bgsave_started:
            print("Background save started")
            # Monitor save progress
            while True:
                info = await redis.info('persistence')
                # Other examples here treat INFO values as numbers; comparing
                # with the string '0' would then never match and spin forever.
                # int() accepts both 0 and '0'.
                if int(info.get('rdb_bgsave_in_progress', 0)) == 0:
                    print("Background save completed")
                    break
                await asyncio.sleep(0.1)

        # Trigger AOF rewrite if AOF is enabled
        try:
            aof_rewrite_started = await redis.bgrewriteaof()
            if aof_rewrite_started:
                print("AOF rewrite started")
        except Exception as e:
            print(f"AOF rewrite not available: {e}")

        # Get persistence info
        persistence_info = await redis.info('persistence')
        print("Persistence configuration:")
        for key, value in persistence_info.items():
            if 'rdb' in key or 'aof' in key:
                print(f" {key}: {value}")

        # Synchronous save (blocks server - use carefully!)
        # await redis.save() # Uncomment only if safe to block
    finally:
        await redis.aclose()


async def acl_security_examples():
    """List ACL users, categories and recent ACL log events."""
    redis = aioredis.Redis(decode_responses=True)
    # Check if ACL is supported (Redis 6+)
    try:
        current_user = await redis.acl_whoami()
        print(f"Current user: {current_user}")

        # List all users
        users = await redis.acl_users()
        print(f"ACL users: {users}")

        # Get user details
        for user in users[:3]:  # First 3 users
            user_info = await redis.acl_getuser(user)
            print(f"User {user}:")
            print(f" Enabled: {user_info.get('flags', [])}")
            print(f" Commands: {user_info.get('commands', 'N/A')}")

        # List ACL categories
        categories = await redis.acl_cat()
        print(f"ACL categories: {categories}")

        # Generate secure password
        secure_password = await redis.acl_genpass()
        print(f"Generated secure password: {secure_password[:8]}...")

        # Create test user (be very careful in production!)
        # await redis.acl_setuser(
        # 'testuser',
        # enabled=True,
        # passwords=[secure_password],
        # commands=['+get', '+set'],
        # keys=['test:*']
        # )
        # print("Test user created")

        # Check ACL log
        acl_log = await redis.acl_log(5)
        if acl_log:
            print(f"Recent ACL events: {len(acl_log)}")
            for event in acl_log:
                print(f" {event}")
    except Exception as e:
        print(f"ACL commands not supported: {e}")
    finally:
        await redis.aclose()


async def replication_examples():
    """Report the replication role and, on a master, wait for a replica ack."""
    redis = aioredis.Redis(decode_responses=True)
    try:
        # Get replication information
        repl_info = await redis.info('replication')
        print("Replication status:")
        print(f" Role: {repl_info.get('role')}")
        print(f" Connected slaves: {repl_info.get('connected_slaves', 0)}")

        if repl_info.get('role') == 'master':
            print("This is a master server")
            # If there are slaves, wait for replication
            if int(repl_info.get('connected_slaves', 0)) > 0:
                # Write test data
                await redis.set('replication_test', 'test_value')
                # Wait for at least 1 replica to acknowledge
                replicas_acked = await redis.wait(1, 5000)  # 5 second timeout
                print(f"Replicas that acknowledged write: {replicas_acked}")
        elif repl_info.get('role') == 'slave':
            print("This is a slave server")
            master_host = repl_info.get('master_host')
            master_port = repl_info.get('master_port')
            print(f" Master: {master_host}:{master_port}")
            print(f" Link status: {repl_info.get('master_link_status')}")
            # Promote to master (dangerous operation!)
            # await redis.slaveof() # No arguments = stop being slave
            # print("Promoted to master")
        else:
            print("Setting up as master")
            # This server is not part of replication
            # To make it a slave: await redis.slaveof('master_host', 6379)
    finally:
        await redis.aclose()


# Install with Tessl CLI
npx tessl i tessl/pypi-aioredis
docs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10