CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-aioredis

asyncio (PEP 3156) Redis support

Overall
score

98%

Overview
Eval results
Files

docs/advanced-features.md

Advanced Features

Advanced Redis functionality including pub/sub messaging, Redis Streams consumer groups, transactions, pipelines, Lua scripting, distributed locking, and batch operations for building high-performance and scalable applications.

Capabilities

Pipelines and Transactions

Batch command execution for improved performance and atomic operations with optimistic locking support.

def pipeline(transaction: bool = True, shard_hint: Any = None) -> Pipeline:
    """Return a Pipeline that queues commands for batched execution.

    Args:
        transaction: Wrap the queued commands in MULTI/EXEC so they
            execute atomically.
        shard_hint: Optional sharding hint used in cluster mode.

    Returns:
        A Pipeline instance ready to accept commands.
    """

class Pipeline:
    """Pipeline that batches Redis commands for a single round trip.

    Commands queue locally until execute() sends them all at once;
    in transaction mode they run inside MULTI/EXEC.
    """

    def multi(self) -> None:
        """Open an explicit MULTI transaction block."""

    async def execute(self) -> List[Any]:
        """Send all queued commands to the server.

        Returns:
            The result of each queued command, in queue order.
        """

    def reset(self) -> None:
        """Discard all queued commands and reset pipeline state."""

    # Fixed: the original signature was missing `self`, unlike every
    # sibling method of this class.
    def watch(self, *keys: str) -> None:
        """Watch keys for changes during the transaction.

        Args:
            keys: Key names to watch; a change to any of them before
                EXEC aborts the transaction (optimistic locking).
        """

    def unwatch(self) -> None:
        """Stop watching all previously watched keys."""

async def watch(*keys: str) -> None:
    """Begin optimistic locking on the given keys.

    A subsequent transaction aborts if any watched key changes before
    it executes.

    Args:
        keys: Names of the keys to watch for modification.
    """

# Fixed: the original carried a stray `self` parameter on a module-level
# function (copy-paste from the Pipeline method); the sibling module-level
# watch() above takes no such parameter.
async def unwatch() -> None:
    """Stop watching all previously watched keys."""

Pub/Sub Messaging

Real-time messaging system for building event-driven applications with channel subscriptions and pattern matching.

def pubsub() -> PubSub:
    """Return a pub/sub client for channel-based messaging.

    Returns:
        A new PubSub client instance.
    """

class PubSub:
    """Pub/Sub client for real-time channel messaging."""

    # Fixed: every method below except close() was missing `self` in the
    # original signatures.
    async def subscribe(self, *channels: str, **pattern_handlers: Callable) -> None:
        """Subscribe to the given channels.

        Args:
            channels: Channel names to subscribe to.
            pattern_handlers: Mapping of channel name to handler callable.
        """

    async def psubscribe(self, *patterns: str, **pattern_handlers: Callable) -> None:
        """Subscribe to channel name patterns.

        Args:
            patterns: Glob-style channel patterns (* and ? wildcards).
            pattern_handlers: Mapping of pattern to handler callable.
        """

    async def unsubscribe(self, *channels: str) -> None:
        """Unsubscribe from the given channels.

        Args:
            channels: Channel names to stop listening on.
        """

    async def punsubscribe(self, *patterns: str) -> None:
        """Unsubscribe from the given channel patterns.

        Args:
            patterns: Channel patterns to stop listening on.
        """

    async def get_message(self, ignore_subscribe_messages: bool = False, timeout: float = 0) -> Optional[Dict]:
        """Fetch the next pending message, if any.

        Args:
            ignore_subscribe_messages: Skip subscribe/unsubscribe confirmations.
            timeout: Seconds to wait (0 means non-blocking).

        Returns:
            A message dictionary, or None when nothing is available.
        """

    def listen(self) -> AsyncIterator[Dict]:
        """Iterate over incoming messages.

        Returns:
            Async iterator yielding message dictionaries.
        """

    async def close(self) -> None:
        """Close the pub/sub client and release its resources."""

async def publish(channel: str, message: str) -> int:
    """Deliver a message to every subscriber of a channel.

    Args:
        channel: Target channel name.
        message: Payload to deliver.

    Returns:
        The number of clients that received the message.
    """

async def pubsub_channels(pattern: str = "*") -> List[str]:
    """List the currently active pub/sub channels.

    Args:
        pattern: Glob pattern used to filter channel names.

    Returns:
        Names of channels with at least one subscriber.
    """

async def pubsub_numsub(*channels: str) -> Dict[str, int]:
    """Report subscriber counts per channel.

    Args:
        channels: Channel names to query.

    Returns:
        Mapping of channel name to its subscriber count.
    """

Lua Scripting

Execute custom Lua scripts on the Redis server for atomic operations and complex data processing.

# Name mirrors the Redis EVAL command (intentionally shadows the builtin).
async def eval(script: str, numkeys: int, *keys_and_args: Any) -> Any:
    """Run a Lua script on the server.

    Args:
        script: Lua source code to execute.
        numkeys: How many of the trailing values are key names.
        keys_and_args: Key names first, then script arguments.

    Returns:
        Whatever the script returns.
    """

async def evalsha(sha: str, numkeys: int, *keys_and_args: Any) -> Any:
    """Run a previously loaded Lua script, addressed by its SHA1 digest.

    Args:
        sha: SHA1 hash returned by script_load.
        numkeys: How many of the trailing values are key names.
        keys_and_args: Key names first, then script arguments.

    Returns:
        Whatever the script returns.
    """

async def script_load(script: str) -> str:
    """Cache a Lua script on the server.

    Args:
        script: Lua source code to cache.

    Returns:
        The script's SHA1 hash, usable with evalsha.
    """

async def script_exists(*shas: str) -> List[bool]:
    """Test which script hashes are present in the server cache.

    Args:
        shas: SHA1 hashes to look up.

    Returns:
        One boolean per hash, in argument order.
    """

# Fixed: the original carried a stray `self` parameter on a module-level
# function; the sibling script_* functions above take no such parameter.
async def script_flush() -> bool:
    """Remove all cached scripts from the server.

    Returns:
        True if successful.
    """

def register_script(script: str) -> Script:
    """Wrap a Lua script in a reusable callable.

    Args:
        script: Lua source code to register.

    Returns:
        A Script object that executes the script when called.
    """

class Script:
    """Registered Lua script wrapper."""
    
    async def __call__(self, keys: Optional[List[str]] = None, args: Optional[List[Any]] = None, client: Optional['Redis'] = None) -> Any:
        """
        Execute the registered script.
        
        Args:
            keys: Keys for script execution (defaults to no keys)
            args: Arguments for script execution (defaults to no arguments)
            client: Redis client to use (defaults to registration client)
            
        Returns:
            Script return value
        """

Distributed Locking

Distributed locking mechanism for coordinating access to shared resources across multiple processes or servers.

def lock(
    name: str, 
    timeout: Optional[float] = None, 
    sleep: float = 0.1, 
    blocking: bool = True, 
    blocking_timeout: Optional[float] = None, 
    thread_local: bool = True
) -> Lock:
    """Build a distributed lock backed by Redis.

    Args:
        name: Key under which the lock is stored.
        timeout: Seconds until the lock expires on its own (None = no expiry).
        sleep: Polling interval while waiting to acquire.
        blocking: Whether acquire() waits by default.
        blocking_timeout: Default cap on how long acquire() may wait.
        thread_local: Store the lock token in thread-local storage.

    Returns:
        A Lock instance (not yet acquired).
    """

class Lock:
    """Redis-backed distributed lock.

    Coordinates exclusive access to a shared resource across processes
    and hosts; usable directly or as an async context manager.
    """
    
    async def acquire(
        self, 
        blocking: Optional[bool] = None, 
        blocking_timeout: Optional[float] = None, 
        token: Optional[str] = None
    ) -> bool:
        """Attempt to take the lock.

        Args:
            blocking: Override the lock's default blocking behavior.
            blocking_timeout: Override the default maximum wait time.
            token: Explicit token to use as the lock value.

        Returns:
            True when the lock was obtained.
        """
    
    async def release(self) -> bool:
        """Give the lock back.

        Returns:
            True when this client held the lock and released it.
        """
    
    async def extend(self, additional_time: float, replace_ttl: bool = False) -> bool:
        """Lengthen the lock's lifetime.

        Args:
            additional_time: Seconds to add (or to set — see replace_ttl).
            replace_ttl: Set the TTL to additional_time instead of adding.

        Returns:
            True when the extension succeeded.
        """
    
    def locked(self) -> bool:
        """Return True when any client currently holds the lock."""
    
    def owned(self) -> bool:
        """Return True when this client holds the lock."""
    
    async def __aenter__(self) -> 'Lock':
        # Entering the context acquires the lock before yielding it.
        await self.acquire()
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        # Leaving the context always releases, even on exception.
        await self.release()

Monitoring

Monitor Redis server commands and operations in real-time for debugging and performance analysis.

def monitor() -> Monitor:
    """Return a client that observes commands processed by the server.

    Returns:
        A new Monitor client instance.
    """

class Monitor:
    """Client that streams commands as the Redis server processes them."""
    
    async def next_command(self) -> Dict[str, Any]:
        """Wait for and return the next observed command.

        Returns:
            Dictionary describing the command.
        """
    
    def monitor(self) -> AsyncIterator[Dict[str, Any]]:
        """Stream observed commands.

        Returns:
            Async iterator yielding one dictionary per command.
        """

Scanning Operations

Efficient iteration over large keyspaces and data structures using cursor-based scanning.

async def scan(
    cursor: int = 0, 
    match: Optional[str] = None, 
    count: Optional[int] = None, 
    _type: Optional[str] = None
) -> Tuple[int, List[str]]:
    """Walk the keyspace one cursor step at a time.

    Args:
        cursor: Position from a previous call, or 0 to begin.
        match: Glob pattern that returned keys must satisfy.
        count: Server-side hint for keys examined per step.
        _type: Restrict results to keys of this type.

    Returns:
        (next_cursor, keys); a next_cursor of 0 means the scan is done.
    """

async def sscan(
    name: str, 
    cursor: int = 0, 
    match: Optional[str] = None, 
    count: Optional[int] = None
) -> Tuple[int, List[str]]:
    """Walk a set's members one cursor step at a time.

    Args:
        name: Key of the set.
        cursor: Position from a previous call, or 0 to begin.
        match: Glob pattern that returned members must satisfy.
        count: Server-side hint for members examined per step.

    Returns:
        (next_cursor, members); a next_cursor of 0 means the scan is done.
    """

async def hscan(
    name: str, 
    cursor: int = 0, 
    match: Optional[str] = None, 
    count: Optional[int] = None
) -> Tuple[int, Dict[str, str]]:
    """Walk a hash's fields one cursor step at a time.

    Args:
        name: Key of the hash.
        cursor: Position from a previous call, or 0 to begin.
        match: Glob pattern that returned field names must satisfy.
        count: Server-side hint for fields examined per step.

    Returns:
        (next_cursor, field-to-value dict); next_cursor 0 ends the scan.
    """

async def zscan(
    name: str, 
    cursor: int = 0, 
    match: Optional[str] = None, 
    count: Optional[int] = None, 
    score_cast_func: Callable = float
) -> Tuple[int, List[Tuple[str, float]]]:
    """Walk a sorted set's members one cursor step at a time.

    Args:
        name: Key of the sorted set.
        cursor: Position from a previous call, or 0 to begin.
        match: Glob pattern that returned members must satisfy.
        count: Server-side hint for members examined per step.
        score_cast_func: Callable applied to each raw score.

    Returns:
        (next_cursor, (member, score) pairs); next_cursor 0 ends the scan.
    """

Sort Operations

Server-side sorting with support for external keys, patterns, and result storage.

async def sort(
    name: str, 
    start: Optional[int] = None, 
    num: Optional[int] = None, 
    by: Optional[str] = None, 
    get: Optional[List[str]] = None, 
    desc: bool = False, 
    alpha: bool = False, 
    store: Optional[str] = None, 
    groups: bool = False
) -> Union[List[str], int]:
    """Server-side sort of a list, set, or sorted set.

    Args:
        name: Key of the collection to sort.
        start: Offset of the first element to return.
        num: Maximum number of elements to return.
        by: External-key pattern supplying sort weights.
        get: External-key patterns supplying returned values.
        desc: Sort largest-first instead of smallest-first.
        alpha: Compare elements as strings, not numbers.
        store: Destination key for storing the result.
        groups: Group the returned values per element.

    Returns:
        The sorted elements, or the stored element count when
        store is given.
    """

Usage Examples

Pipeline Operations

async def pipeline_examples():
    """Demonstrate non-transactional, transactional, and WATCH-guarded pipelines."""
    redis = aioredis.Redis(decode_responses=True)
    
    # Basic pipeline (non-transactional): commands are batched into one
    # round trip but not wrapped in MULTI/EXEC.
    pipe = redis.pipeline(transaction=False)
    pipe.set('key1', 'value1')
    pipe.set('key2', 'value2')
    pipe.get('key1')
    pipe.get('key2')
    results = await pipe.execute()
    print(results)  # [True, True, 'value1', 'value2']
    
    # Transactional pipeline: the queued commands run atomically
    # inside MULTI/EXEC.
    pipe = redis.pipeline(transaction=True)
    pipe.multi()
    pipe.incr('counter')
    pipe.incr('counter')  
    pipe.get('counter')
    results = await pipe.execute()
    print(results)  # [1, 2, '2']
    
    # Optimistic locking: WATCH makes the transaction abort if
    # 'balance' changes between the read and EXEC.
    await redis.watch('balance')
    balance = int(await redis.get('balance') or 0)
    if balance >= 100:
        pipe = redis.pipeline(transaction=True)
        pipe.multi()
        pipe.decrby('balance', 100)
        pipe.incr('purchases')
        await pipe.execute()
    # NOTE(review): when balance < 100 the WATCH is left active on this
    # connection — presumably an UNWATCH belongs here; confirm intent.

Pub/Sub Messaging

async def pubsub_examples():
    """Demonstrate publish/subscribe, including pattern subscriptions."""
    redis = aioredis.Redis(decode_responses=True)
    
    # Publisher: emit ten messages, one second apart.
    async def publisher():
        for i in range(10):
            await redis.publish('notifications', f'Message {i}')
            await asyncio.sleep(1)
    
    # Subscriber driven by the listen() async iterator.
    async def subscriber():
        pubsub = redis.pubsub()
        await pubsub.subscribe('notifications')
        
        try:
            async for message in pubsub.listen():
                if message['type'] == 'message':
                    print(f"Received: {message['data']}")
                    if message['data'] == 'STOP':
                        break
        finally:
            await pubsub.close()
    
    # Pattern subscription, polling with get_message().
    # Fixed: get_message(timeout=...) returns None when the timeout
    # elapses without a message — it does NOT raise asyncio.TimeoutError,
    # so the original `except asyncio.TimeoutError` handler was dead code.
    async def pattern_subscriber():
        pubsub = redis.pubsub()
        await pubsub.psubscribe('user:*:notifications')
        
        try:
            while True:
                message = await pubsub.get_message(timeout=1)
                if message is None:
                    continue  # idle poll: nothing arrived within 1s
                if message['type'] == 'pmessage':
                    channel = message['channel']
                    data = message['data']
                    print(f"Pattern match {channel}: {data}")
        finally:
            await pubsub.close()
    
    # Run publisher and subscriber concurrently
    await asyncio.gather(publisher(), subscriber())

Lua Scripting

async def lua_script_examples():
    """Demonstrate EVAL, registered scripts, and multi-step Lua scripts."""
    redis = aioredis.Redis(decode_responses=True)
    
    # Simple script execution: one key ('counter'), one argument (5).
    # Lua coerces the string reply of GET to a number for the addition.
    script = """
    local key = KEYS[1]
    local increment = ARGV[1]
    local current = redis.call('GET', key) or 0
    local new_value = current + increment
    redis.call('SET', key, new_value)
    return new_value
    """
    
    result = await redis.eval(script, 1, 'counter', 5)
    print(f"Counter value: {result}")
    
    # Register script for reuse: loaded once, then invoked by SHA
    # through the returned callable.
    atomic_increment = redis.register_script("""
    local key = KEYS[1]
    local increment = tonumber(ARGV[1])
    local max_value = tonumber(ARGV[2])
    
    local current = tonumber(redis.call('GET', key) or 0)
    if current + increment <= max_value then
        local new_value = current + increment
        redis.call('SET', key, new_value)
        return new_value
    else
        return -1  -- Exceeded maximum
    end
    """)
    
    # Use registered script: increment by 3, capped at 100.
    result = await atomic_increment(keys=['limited_counter'], args=[3, 100])
    if result == -1:
        print("Increment would exceed maximum")
    else:
        print(f"New value: {result}")
    
    # Complex script with multiple operations, all applied atomically.
    # The key is built inside the script from ARGV, so keys=[] below.
    batch_update = redis.register_script("""
    local user_key = 'user:' .. ARGV[1]
    local points = tonumber(ARGV[2])
    local level = tonumber(ARGV[3])
    
    -- Update user data
    redis.call('HSET', user_key, 'points', points)
    redis.call('HSET', user_key, 'level', level)
    redis.call('HSET', user_key, 'last_updated', ARGV[4])
    
    -- Update leaderboard
    redis.call('ZADD', 'leaderboard', points, ARGV[1])
    
    -- Update level ranking
    redis.call('ZADD', 'level:' .. level, points, ARGV[1])
    
    return redis.call('ZRANK', 'leaderboard', ARGV[1])
    """)
    
    import time
    rank = await batch_update(
        keys=[], 
        args=['user123', 1500, 5, int(time.time())]
    )
    print(f"User rank: {rank}")

Distributed Locking

async def locking_examples():
    """Demonstrate distributed locks: manual, context-managed, extended, non-blocking."""
    redis = aioredis.Redis(decode_responses=True)
    
    # Basic lock usage: expires on its own after 10s if never released.
    lock = redis.lock('resource_lock', timeout=10)
    
    try:
        # Try to acquire lock, waiting at most 5 seconds.
        acquired = await lock.acquire(blocking_timeout=5)
        if acquired:
            print("Lock acquired, performing critical section")
            await asyncio.sleep(2)  # Simulate work
            print("Work completed")
        else:
            print("Could not acquire lock")
    finally:
        # Only release if we actually hold it — releasing a lock we
        # don't own would fail.
        if lock.owned():
            await lock.release()
    
    # Context manager usage: acquire on enter, release on exit.
    async with redis.lock('another_resource', timeout=30) as lock:
        print("Automatically acquired lock")
        await asyncio.sleep(1)
        # Lock automatically released on exit
    
    # Lock with extension: keep renewing while a long task runs.
    long_task_lock = redis.lock('long_task', timeout=10)
    await long_task_lock.acquire()
    
    try:
        for i in range(5):
            print(f"Working on step {i+1}")
            await asyncio.sleep(3)
            
            # Extend lock if needed
            if i < 4:  # Don't extend on last iteration
                extended = await long_task_lock.extend(10)
                if extended:
                    print("Lock extended")
                else:
                    print("Could not extend lock")
                    break
    finally:
        await long_task_lock.release()
    
    # Non-blocking lock attempt: give up immediately if unavailable.
    quick_lock = redis.lock('quick_resource')
    if await quick_lock.acquire(blocking=False):
        try:
            print("Got lock immediately")
            # Quick operation
        finally:
            await quick_lock.release()
    else:
        print("Resource busy, skipping operation")

Scanning Large Datasets

async def scanning_examples():
    """Demonstrate cursor-based SCAN/SSCAN/HSCAN/ZSCAN over large datasets."""
    redis = aioredis.Redis(decode_responses=True)
    
    # Scan all keys with pattern. The SCAN cursor protocol: start at 0,
    # feed each returned cursor back in, stop when it comes back as 0.
    cursor = 0
    user_keys = []
    
    while True:
        cursor, keys = await redis.scan(cursor, match='user:*', count=100)
        user_keys.extend(keys)
        
        if cursor == 0:  # Scan complete
            break
    
    print(f"Found {len(user_keys)} user keys")
    
    # Scan set members
    cursor = 0
    all_tags = []
    
    while True:
        cursor, members = await redis.sscan('all_tags', cursor, count=50)
        all_tags.extend(members)
        
        if cursor == 0:
            break
    
    # Scan hash fields (hscan yields a field-to-value dict per step)
    cursor = 0
    config_items = {}
    
    while True:
        cursor, items = await redis.hscan('config', cursor, match='cache_*')
        config_items.update(items)
        
        if cursor == 0:
            break
    
    # Scan sorted set with score filtering. MATCH filters member names
    # server-side; the score cut-off is applied client-side below.
    cursor = 0
    high_scores = []
    
    while True:
        cursor, members = await redis.zscan(
            'leaderboard', 
            cursor, 
            match='player*',
            count=25
        )
        # Filter by score if needed
        high_scores.extend([
            (member, score) for member, score in members 
            if score > 1000
        ])
        
        if cursor == 0:
            break
    
    print(f"Found {len(high_scores)} high-scoring players")

Install with Tessl CLI

npx tessl i tessl/pypi-aioredis

docs

advanced-features.md

basic-operations.md

connection-management.md

data-structures.md

index.md

server-admin.md

tile.json