asyncio (PEP 3156) Redis support
Overall
score
98%
Advanced Redis functionality including pub/sub messaging, Redis Streams consumer groups, transactions, pipelines, Lua scripting, distributed locking, and batch operations for building high-performance and scalable applications.
Batch command execution for improved performance and atomic operations with optimistic locking support.
def pipeline(transaction: bool = True, shard_hint: Any = None) -> Pipeline:
"""
Create a pipeline for batching commands.
Args:
transaction: Whether to execute as transaction (with MULTI/EXEC)
shard_hint: Hint for sharding (cluster mode)
Returns:
Pipeline object for command batching
"""
class Pipeline:
"""Pipeline for batching Redis commands."""
def multi(self) -> None:
"""Start transaction block."""
async def execute(self) -> List[Any]:
"""
Execute all pipelined commands.
Returns:
List of command results in order
"""
def reset(self) -> None:
"""Reset pipeline, clearing all commands."""
def watch(*keys: str) -> None:
"""
Watch keys for changes during transaction.
Args:
keys: Key names to watch
"""
def unwatch(self) -> None:
"""Unwatch all keys."""
async def watch(*keys: str) -> None:
"""
Watch keys for optimistic locking.
Args:
keys: Key names to watch for changes
"""
async def unwatch(self) -> None:
"""Unwatch all keys."""Real-time messaging system for building event-driven applications with channel subscriptions and pattern matching.
def pubsub() -> PubSub:
"""
Create pub/sub client for messaging.
Returns:
PubSub client instance
"""
class PubSub:
"""Pub/Sub client for real-time messaging."""
async def subscribe(*channels: str, **pattern_handlers: Callable) -> None:
"""
Subscribe to channels.
Args:
channels: Channel names to subscribe to
pattern_handlers: Channel name to handler function mapping
"""
async def psubscribe(*patterns: str, **pattern_handlers: Callable) -> None:
"""
Subscribe to channel patterns.
Args:
patterns: Channel patterns to subscribe to (* and ? wildcards)
pattern_handlers: Pattern to handler function mapping
"""
async def unsubscribe(*channels: str) -> None:
"""
Unsubscribe from channels.
Args:
channels: Channel names to unsubscribe from
"""
async def punsubscribe(*patterns: str) -> None:
"""
Unsubscribe from channel patterns.
Args:
patterns: Channel patterns to unsubscribe from
"""
async def get_message(ignore_subscribe_messages: bool = False, timeout: float = 0) -> Optional[Dict]:
"""
Get next message.
Args:
ignore_subscribe_messages: Skip subscription confirmations
timeout: Timeout in seconds (0 for non-blocking)
Returns:
Message dictionary or None
"""
def listen() -> AsyncIterator[Dict]:
"""
Listen for messages asynchronously.
Returns:
Async iterator of message dictionaries
"""
async def close(self) -> None:
"""Close pub/sub client and cleanup resources."""
async def publish(channel: str, message: str) -> int:
"""
Publish message to channel.
Args:
channel: Channel name
message: Message content
Returns:
Number of clients that received the message
"""
async def pubsub_channels(pattern: str = "*") -> List[str]:
"""
Get active pub/sub channels.
Args:
pattern: Channel pattern filter
Returns:
List of active channel names
"""
async def pubsub_numsub(*channels: str) -> Dict[str, int]:
"""
Get channel subscriber counts.
Args:
channels: Channel names
Returns:
Dictionary mapping channel names to subscriber counts
"""Execute custom Lua scripts on the Redis server for atomic operations and complex data processing.
async def eval(script: str, numkeys: int, *keys_and_args: Any) -> Any:
"""
Execute Lua script.
Args:
script: Lua script code
numkeys: Number of keys in script arguments
keys_and_args: Keys followed by arguments for script
Returns:
Script return value
"""
async def evalsha(sha: str, numkeys: int, *keys_and_args: Any) -> Any:
"""
Execute Lua script by SHA hash.
Args:
sha: SHA1 hash of previously loaded script
numkeys: Number of keys in script arguments
keys_and_args: Keys followed by arguments for script
Returns:
Script return value
"""
async def script_load(script: str) -> str:
"""
Load Lua script and return SHA hash.
Args:
script: Lua script code
Returns:
SHA1 hash of loaded script
"""
async def script_exists(*shas: str) -> List[bool]:
"""
Check if scripts exist by SHA hash.
Args:
shas: SHA1 hashes to check
Returns:
List of boolean values indicating existence
"""
async def script_flush(self) -> bool:
"""
Remove all cached scripts.
Returns:
True if successful
"""
def register_script(script: str) -> Script:
"""
Register Lua script for reuse.
Args:
script: Lua script code
Returns:
Script object that can be called
"""
class Script:
"""Registered Lua script wrapper."""
async def __call__(self, keys: List[str] = None, args: List[Any] = None, client: Optional['Redis'] = None) -> Any:
"""
Execute the registered script.
Args:
keys: Keys for script execution
args: Arguments for script execution
client: Redis client to use (defaults to registration client)
Returns:
Script return value
"""Distributed locking mechanism for coordinating access to shared resources across multiple processes or servers.
def lock(
name: str,
timeout: Optional[float] = None,
sleep: float = 0.1,
blocking: bool = True,
blocking_timeout: Optional[float] = None,
thread_local: bool = True
) -> Lock:
"""
Create distributed lock.
Args:
name: Lock name/key
timeout: Lock expiration timeout in seconds
sleep: Sleep interval when waiting for lock
blocking: Whether to block when acquiring
blocking_timeout: Maximum time to block for acquisition
thread_local: Whether lock is thread-local
Returns:
Lock instance
"""
class Lock:
"""Distributed lock implementation."""
async def acquire(
self,
blocking: Optional[bool] = None,
blocking_timeout: Optional[float] = None,
token: Optional[str] = None
) -> bool:
"""
Acquire the lock.
Args:
blocking: Override blocking behavior
blocking_timeout: Override blocking timeout
token: Specific lock token to use
Returns:
True if lock was acquired
"""
async def release(self) -> bool:
"""
Release the lock.
Returns:
True if lock was released by this client
"""
async def extend(self, additional_time: float, replace_ttl: bool = False) -> bool:
"""
Extend lock timeout.
Args:
additional_time: Time to add to current timeout
replace_ttl: Replace TTL instead of extending
Returns:
True if lock was extended
"""
def locked(self) -> bool:
"""
Check if lock is currently held.
Returns:
True if lock is held
"""
def owned(self) -> bool:
"""
Check if lock is owned by this client.
Returns:
True if lock is owned by this client
"""
async def __aenter__(self) -> 'Lock':
"""Async context manager entry."""
await self.acquire()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
"""Async context manager exit."""
await self.release()Monitor Redis server commands and operations in real-time for debugging and performance analysis.
def monitor() -> Monitor:
"""
Create monitor client for watching commands.
Returns:
Monitor client instance
"""
class Monitor:
"""Monitor client for watching Redis commands."""
async def next_command(self) -> Dict[str, Any]:
"""
Get next monitored command.
Returns:
Dictionary with command details
"""
def monitor(self) -> AsyncIterator[Dict[str, Any]]:
"""
Monitor commands asynchronously.
Returns:
Async iterator of command dictionaries
"""Efficient iteration over large keyspaces and data structures using cursor-based scanning.
async def scan(
cursor: int = 0,
match: Optional[str] = None,
count: Optional[int] = None,
_type: Optional[str] = None
) -> Tuple[int, List[str]]:
"""
Incrementally scan keyspace.
Args:
cursor: Scan cursor position (0 to start)
match: Pattern to match keys against
count: Approximate number of keys per iteration
_type: Filter by key type
Returns:
Tuple of (next_cursor, keys)
"""
async def sscan(
name: str,
cursor: int = 0,
match: Optional[str] = None,
count: Optional[int] = None
) -> Tuple[int, List[str]]:
"""
Incrementally scan set members.
Args:
name: Set key name
cursor: Scan cursor position
match: Pattern to match members against
count: Approximate number of members per iteration
Returns:
Tuple of (next_cursor, members)
"""
async def hscan(
name: str,
cursor: int = 0,
match: Optional[str] = None,
count: Optional[int] = None
) -> Tuple[int, Dict[str, str]]:
"""
Incrementally scan hash fields.
Args:
name: Hash key name
cursor: Scan cursor position
match: Pattern to match fields against
count: Approximate number of fields per iteration
Returns:
Tuple of (next_cursor, field_value_dict)
"""
async def zscan(
name: str,
cursor: int = 0,
match: Optional[str] = None,
count: Optional[int] = None,
score_cast_func: Callable = float
) -> Tuple[int, List[Tuple[str, float]]]:
"""
Incrementally scan sorted set members.
Args:
name: Sorted set key name
cursor: Scan cursor position
match: Pattern to match members against
count: Approximate number of members per iteration
score_cast_func: Function to convert scores
Returns:
Tuple of (next_cursor, member_score_pairs)
"""Server-side sorting with support for external keys, patterns, and result storage.
async def sort(
name: str,
start: Optional[int] = None,
num: Optional[int] = None,
by: Optional[str] = None,
get: Optional[List[str]] = None,
desc: bool = False,
alpha: bool = False,
store: Optional[str] = None,
groups: bool = False
) -> Union[List[str], int]:
"""
Sort and return or store list, set, or sorted set.
Args:
name: Key name to sort
start: Skip this many elements
num: Return this many elements
by: Sort by external key pattern
get: Retrieve values from external keys
desc: Sort in descending order
alpha: Sort lexicographically
store: Store result in this key
groups: Group returned values
Returns:
Sorted results or number of stored elements
"""async def pipeline_examples():
redis = aioredis.Redis(decode_responses=True)
# Basic pipeline (non-transactional)
pipe = redis.pipeline(transaction=False)
pipe.set('key1', 'value1')
pipe.set('key2', 'value2')
pipe.get('key1')
pipe.get('key2')
results = await pipe.execute()
print(results) # [True, True, 'value1', 'value2']
# Transactional pipeline
pipe = redis.pipeline(transaction=True)
pipe.multi()
pipe.incr('counter')
pipe.incr('counter')
pipe.get('counter')
results = await pipe.execute()
print(results) # [1, 2, '2']
# Optimistic locking
await redis.watch('balance')
balance = int(await redis.get('balance') or 0)
if balance >= 100:
pipe = redis.pipeline(transaction=True)
pipe.multi()
pipe.decrby('balance', 100)
pipe.incr('purchases')
await pipe.execute()async def pubsub_examples():
redis = aioredis.Redis(decode_responses=True)
# Publisher
async def publisher():
for i in range(10):
await redis.publish('notifications', f'Message {i}')
await asyncio.sleep(1)
# Subscriber with message handler
async def subscriber():
pubsub = redis.pubsub()
await pubsub.subscribe('notifications')
try:
async for message in pubsub.listen():
if message['type'] == 'message':
print(f"Received: {message['data']}")
if message['data'] == 'STOP':
break
finally:
await pubsub.close()
# Pattern subscription
async def pattern_subscriber():
pubsub = redis.pubsub()
await pubsub.psubscribe('user:*:notifications')
try:
while True:
message = await pubsub.get_message(timeout=1)
if message and message['type'] == 'pmessage':
channel = message['channel']
data = message['data']
print(f"Pattern match {channel}: {data}")
except asyncio.TimeoutError:
pass
finally:
await pubsub.close()
# Run publisher and subscriber concurrently
await asyncio.gather(publisher(), subscriber())async def lua_script_examples():
redis = aioredis.Redis(decode_responses=True)
# Simple script execution
script = """
local key = KEYS[1]
local increment = ARGV[1]
local current = redis.call('GET', key) or 0
local new_value = current + increment
redis.call('SET', key, new_value)
return new_value
"""
result = await redis.eval(script, 1, 'counter', 5)
print(f"Counter value: {result}")
# Register script for reuse
atomic_increment = redis.register_script("""
local key = KEYS[1]
local increment = tonumber(ARGV[1])
local max_value = tonumber(ARGV[2])
local current = tonumber(redis.call('GET', key) or 0)
if current + increment <= max_value then
local new_value = current + increment
redis.call('SET', key, new_value)
return new_value
else
return -1 -- Exceeded maximum
end
""")
# Use registered script
result = await atomic_increment(keys=['limited_counter'], args=[3, 100])
if result == -1:
print("Increment would exceed maximum")
else:
print(f"New value: {result}")
# Complex script with multiple operations
batch_update = redis.register_script("""
local user_key = 'user:' .. ARGV[1]
local points = tonumber(ARGV[2])
local level = tonumber(ARGV[3])
-- Update user data
redis.call('HSET', user_key, 'points', points)
redis.call('HSET', user_key, 'level', level)
redis.call('HSET', user_key, 'last_updated', ARGV[4])
-- Update leaderboard
redis.call('ZADD', 'leaderboard', points, ARGV[1])
-- Update level ranking
redis.call('ZADD', 'level:' .. level, points, ARGV[1])
return redis.call('ZRANK', 'leaderboard', ARGV[1])
""")
import time
rank = await batch_update(
keys=[],
args=['user123', 1500, 5, int(time.time())]
)
print(f"User rank: {rank}")async def locking_examples():
redis = aioredis.Redis(decode_responses=True)
# Basic lock usage
lock = redis.lock('resource_lock', timeout=10)
try:
# Try to acquire lock
acquired = await lock.acquire(blocking_timeout=5)
if acquired:
print("Lock acquired, performing critical section")
await asyncio.sleep(2) # Simulate work
print("Work completed")
else:
print("Could not acquire lock")
finally:
if lock.owned():
await lock.release()
# Context manager usage
async with redis.lock('another_resource', timeout=30) as lock:
print("Automatically acquired lock")
await asyncio.sleep(1)
# Lock automatically released on exit
# Lock with extension
long_task_lock = redis.lock('long_task', timeout=10)
await long_task_lock.acquire()
try:
for i in range(5):
print(f"Working on step {i+1}")
await asyncio.sleep(3)
# Extend lock if needed
if i < 4: # Don't extend on last iteration
extended = await long_task_lock.extend(10)
if extended:
print("Lock extended")
else:
print("Could not extend lock")
break
finally:
await long_task_lock.release()
# Non-blocking lock attempt
quick_lock = redis.lock('quick_resource')
if await quick_lock.acquire(blocking=False):
try:
print("Got lock immediately")
# Quick operation
finally:
await quick_lock.release()
else:
print("Resource busy, skipping operation")async def scanning_examples():
redis = aioredis.Redis(decode_responses=True)
# Scan all keys with pattern
cursor = 0
user_keys = []
while True:
cursor, keys = await redis.scan(cursor, match='user:*', count=100)
user_keys.extend(keys)
if cursor == 0: # Scan complete
break
print(f"Found {len(user_keys)} user keys")
# Scan set members
cursor = 0
all_tags = []
while True:
cursor, members = await redis.sscan('all_tags', cursor, count=50)
all_tags.extend(members)
if cursor == 0:
break
# Scan hash fields
cursor = 0
config_items = {}
while True:
cursor, items = await redis.hscan('config', cursor, match='cache_*')
config_items.update(items)
if cursor == 0:
break
# Scan sorted set with score filtering
cursor = 0
high_scores = []
while True:
cursor, members = await redis.zscan(
'leaderboard',
cursor,
match='player*',
count=25
)
# Filter by score if needed
high_scores.extend([
(member, score) for member, score in members
if score > 1000
])
if cursor == 0:
break
print(f"Found {len(high_scores)} high-scoring players")Install with Tessl CLI
npx tessl i tessl/pypi-aioredisdocs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10