CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-redis

Python client for Redis database and key-value store

Pending
Overview
Eval results
Files

async-support.mddocs/

Async Support

Redis async support provides full asynchronous Redis client functionality using Python's asyncio library. The async client offers the same API as the synchronous client with async/await patterns for non-blocking operations.

import asyncio
import ssl
from typing import TYPE_CHECKING, Optional, Union, List, Dict, Set, Tuple, Callable, Mapping, Type

from redis.asyncio import Redis, ConnectionPool
from redis.credentials import CredentialProvider
from redis.retry import Retry
from redis.backoff import ExponentialWithJitterBackoff
from redis.cache import CacheInterface, CacheConfig
from redis.event import EventDispatcher

# Type checking imports
if TYPE_CHECKING:
    import OpenSSL

Capabilities

Async Redis Client

Asynchronous Redis client for non-blocking operations with identical API to synchronous client.

class Redis:
    def __init__(
        self,
        host: str = "localhost",
        port: int = 6379,
        db: int = 0,
        password: Optional[str] = None,
        socket_timeout: Optional[float] = None,
        socket_connect_timeout: Optional[float] = None,
        socket_keepalive: Optional[bool] = None,
        socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None,
        connection_pool: Optional[ConnectionPool] = None,
        unix_socket_path: Optional[str] = None,
        encoding: str = "utf-8",
        encoding_errors: str = "strict",
        decode_responses: bool = False,
        retry_on_timeout: bool = False,
        retry: Retry = Retry(
            backoff=ExponentialWithJitterBackoff(base=1, cap=10), retries=3
        ),
        retry_on_error: Optional[List[Type[Exception]]] = None,
        ssl: bool = False,
        ssl_keyfile: Optional[str] = None,
        ssl_certfile: Optional[str] = None,
        ssl_cert_reqs: Union[str, "ssl.VerifyMode"] = "required",
        ssl_ca_certs: Optional[str] = None,
        ssl_ca_path: Optional[str] = None,
        ssl_ca_data: Optional[str] = None,
        ssl_check_hostname: bool = True,
        ssl_password: Optional[str] = None,
        ssl_validate_ocsp: bool = False,
        ssl_validate_ocsp_stapled: bool = False,
        ssl_ocsp_context: Optional["OpenSSL.SSL.Context"] = None,
        ssl_ocsp_expected_cert: Optional[str] = None,
        ssl_min_version: Optional["ssl.TLSVersion"] = None,
        ssl_ciphers: Optional[str] = None,
        max_connections: Optional[int] = None,
        single_connection_client: bool = False,
        health_check_interval: int = 0,
        client_name: Optional[str] = None,
        lib_name: Optional[str] = "redis-py",
        lib_version: Optional[str] = None,
        username: Optional[str] = None,
        retry_on_timeout: bool = False,
        retry_on_error: Optional[List[Type[Exception]]] = None,
        redis_connect_func: Optional[Callable[[], None]] = None,
        credential_provider: Optional[CredentialProvider] = None,
        protocol: Optional[int] = 2,
        cache: Optional[CacheInterface] = None,
        cache_config: Optional[CacheConfig] = None,
        event_dispatcher: Optional[EventDispatcher] = None
    ): ...

    @classmethod
    async def from_url(
        cls,
        url: str,
        **kwargs
    ) -> "Redis": ...

    async def close(self) -> None: ...
    
    async def ping(self, **kwargs) -> Union[bytes, bool]: ...
    
    async def execute_command(self, *args, **options) -> Any: ...

Async String Operations

Asynchronous Redis string operations for non-blocking string manipulation.

async def set(
    self,
    name: KeyT,
    value: EncodableT,
    ex: Optional[ExpiryT] = None,
    px: Optional[int] = None,
    nx: bool = False,
    xx: bool = False,
    keepttl: bool = False,
    get: bool = False,
    exat: Optional[int] = None,
    pxat: Optional[int] = None
) -> Optional[bool]: ...

async def get(self, name: KeyT) -> Optional[bytes]: ...

async def mget(self, keys: List[KeyT], *args: KeyT) -> List[Optional[bytes]]: ...

async def mset(self, mapping: Dict[KeyT, EncodableT]) -> bool: ...

async def incr(self, name: KeyT, amount: int = 1) -> int: ...

async def decr(self, name: KeyT, amount: int = 1) -> int: ...

Async Pipeline

Asynchronous pipeline for batching multiple commands with non-blocking execution.

def pipeline(self, transaction: bool = True, shard_hint: Optional[str] = None) -> "Pipeline": ...

class Pipeline:
    async def execute(self, raise_on_error: bool = True) -> List[Any]: ...
    
    def reset(self) -> None: ...
    
    async def watch(self, *names: KeyT) -> bool: ...
    
    def multi(self) -> "Pipeline": ...
    
    def discard(self) -> None: ...

Async Pub/Sub

Asynchronous publish/subscribe messaging with async iteration and non-blocking message handling.

def pubsub(self, **kwargs) -> "PubSub": ...

class PubSub:
    async def subscribe(self, *args, **kwargs) -> None: ...
    
    async def unsubscribe(self, *args) -> None: ...
    
    async def psubscribe(self, *args, **kwargs) -> None: ...
    
    async def punsubscribe(self, *args) -> None: ...
    
    def listen(self) -> AsyncIterator[Dict[str, Any]]: ...
    
    async def get_message(
        self,
        ignore_subscribe_messages: bool = False,
        timeout: Optional[float] = 0.0
    ) -> Optional[Dict[str, Any]]: ...
    
    async def close(self) -> None: ...

Async Cluster Client

Asynchronous Redis Cluster client for non-blocking distributed operations.

class RedisCluster:
    def __init__(
        self,
        host: Optional[str] = None,
        port: int = 7000,
        startup_nodes: Optional[List[ClusterNode]] = None,
        cluster_error_retry_attempts: int = 3,
        require_full_coverage: bool = True,
        skip_full_coverage_check: bool = False,
        reinitialize_steps: int = 10,
        read_from_replicas: bool = False,
        dynamic_startup_nodes: bool = True,
        connection_pool_class: Type[ConnectionPool] = ConnectionPool,
        **kwargs
    ): ...

    @classmethod
    async def from_url(
        cls,
        url: str,
        **kwargs
    ) -> "RedisCluster": ...

    async def close(self) -> None: ...
    
    def pipeline(self, transaction: bool = False) -> "ClusterPipeline": ...

Async Connection Management

Asynchronous connection pools and connection classes for non-blocking connection management.

class ConnectionPool:
    async def get_connection(self, command_name: str, **kwargs) -> Connection: ...
    
    async def make_connection(self) -> Connection: ...
    
    def release(self, connection: Connection) -> None: ...
    
    async def disconnect(self, inuse_connections: bool = True) -> None: ...

class Connection:
    async def connect(self) -> None: ...
    
    async def disconnect(self) -> None: ...
    
    async def send_command(self, *args) -> None: ...
    
    async def read_response(self) -> Any: ...

Usage Examples

Basic Async Operations

import asyncio
import redis.asyncio as redis

async def main():
    # Create async Redis client
    r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
    
    try:
        # Basic async operations
        await r.set('key', 'value')
        value = await r.get('key')
        print(f"Value: {value}")
        
        # Multiple operations
        await r.mset({'key1': 'value1', 'key2': 'value2'})
        values = await r.mget(['key1', 'key2'])
        print(f"Values: {values}")
        
    finally:
        await r.close()

# Run the async function
asyncio.run(main())

Async with Context Manager

import redis.asyncio as redis

async def main():
    # Using async context manager (automatically closes)
    async with redis.Redis(host='localhost', port=6379) as r:
        await r.set('session:123', 'session_data')
        data = await r.get('session:123')
        print(f"Session data: {data}")

asyncio.run(main())

Async Pipeline Operations

import redis.asyncio as redis

async def main():
    async with redis.Redis(host='localhost', port=6379) as r:
        # Create async pipeline
        pipe = r.pipeline()
        
        # Queue commands
        pipe.set('user:1001', 'John')
        pipe.set('user:1002', 'Jane')
        pipe.get('user:1001')
        pipe.get('user:1002')
        pipe.incr('page_views')
        
        # Execute all commands async
        results = await pipe.execute()
        print(f"Pipeline results: {results}")

asyncio.run(main())

Async Pub/Sub

import asyncio
import redis.asyncio as redis

async def publisher():
    """Publish messages to a channel"""
    async with redis.Redis(host='localhost', port=6379) as r:
        for i in range(10):
            await r.publish('notifications', f'Message {i}')
            await asyncio.sleep(1)

async def subscriber():
    """Subscribe and listen for messages"""
    async with redis.Redis(host='localhost', port=6379) as r:
        pubsub = r.pubsub()
        await pubsub.subscribe('notifications')
        
        try:
            # Async iteration over messages
            async for message in pubsub.listen():
                if message['type'] == 'message':
                    print(f"Received: {message['data']}")
        finally:
            await pubsub.close()

async def main():
    # Run publisher and subscriber concurrently
    await asyncio.gather(
        publisher(),
        subscriber()
    )

asyncio.run(main())

Async Pub/Sub with Manual Message Handling

import redis.asyncio as redis

async def main():
    async with redis.Redis(host='localhost', port=6379) as r:
        pubsub = r.pubsub()
        await pubsub.subscribe('chat_room')
        
        try:
            while True:
                # Get message with timeout
                message = await pubsub.get_message(timeout=1.0)
                
                if message is None:
                    print("No message received")
                    continue
                    
                if message['type'] == 'message':
                    print(f"Chat message: {message['data']}")
                    
        except KeyboardInterrupt:
            print("Stopping subscriber")
        finally:
            await pubsub.close()

asyncio.run(main())

Async Cluster Operations

import redis.asyncio as redis
from redis.asyncio.cluster import RedisCluster, ClusterNode

async def main():
    # Create async cluster client
    startup_nodes = [
        ClusterNode("localhost", 7000),
        ClusterNode("localhost", 7001),
        ClusterNode("localhost", 7002)
    ]
    
    cluster = RedisCluster(startup_nodes=startup_nodes, decode_responses=True)
    
    try:
        # Cluster operations
        await cluster.set("user:1001", "John")
        user = await cluster.get("user:1001")
        print(f"User: {user}")
        
        # Cluster info
        info = await cluster.cluster_info()
        print(f"Cluster state: {info['cluster_state']}")
        
    finally:
        await cluster.close()

asyncio.run(main())

Async Connection from URL

import redis.asyncio as redis

async def main():
    # Create client from URL
    r = await redis.from_url('redis://localhost:6379/0', decode_responses=True)
    
    try:
        await r.set('url_key', 'url_value')
        value = await r.get('url_key')
        print(f"Value from URL client: {value}")
    finally:
        await r.close()

asyncio.run(main())

Concurrent Operations

import asyncio
import redis.asyncio as redis

async def worker(worker_id, r):
    """Worker function for concurrent operations"""
    for i in range(5):
        key = f"worker:{worker_id}:task:{i}"
        await r.set(key, f"data_{i}")
        value = await r.get(key)
        print(f"Worker {worker_id} - Task {i}: {value}")
        await asyncio.sleep(0.1)

async def main():
    async with redis.Redis(host='localhost', port=6379) as r:
        # Run multiple workers concurrently
        tasks = [worker(i, r) for i in range(3)]
        await asyncio.gather(*tasks)

asyncio.run(main())

Async Error Handling

import redis.asyncio as redis
from redis.exceptions import ConnectionError, TimeoutError

async def main():
    try:
        # Attempt connection with timeout
        r = redis.Redis(
            host='unreachable-host',
            port=6379,
            socket_connect_timeout=5,
            socket_timeout=2
        )
        
        await r.ping()
        
    except ConnectionError as e:
        print(f"Connection failed: {e}")
    except TimeoutError as e:
        print(f"Operation timed out: {e}")
    finally:
        if 'r' in locals():
            await r.close()

asyncio.run(main())

Async Transaction with Watch

import redis.asyncio as redis

async def main():
    async with redis.Redis(host='localhost', port=6379) as r:
        # Initialize counter
        await r.set('counter', 0)
        
        # Transaction with watch
        pipe = r.pipeline()
        
        try:
            # Watch the counter key
            await pipe.watch('counter')
            
            # Get current value
            current_value = await r.get('counter')
            current_value = int(current_value) if current_value else 0
            
            # Start transaction
            pipe.multi()
            pipe.set('counter', current_value + 1)
            
            # Execute transaction
            result = await pipe.execute()
            print(f"Counter incremented: {result}")
            
        except redis.WatchError:
            print("Counter was modified during transaction")

asyncio.run(main())

Install with Tessl CLI

npx tessl i tessl/pypi-redis

docs

async-support.md

cluster-support.md

connection-management.md

core-client.md

distributed-locking.md

error-handling.md

high-availability.md

index.md

pipelines-transactions.md

pubsub-messaging.md

tile.json