Python client for Redis database and key-value store
—
Redis async support provides full asynchronous Redis client functionality using Python's asyncio library. The async client offers the same API as the synchronous client with async/await patterns for non-blocking operations.
import asyncio
import ssl
from typing import (
    TYPE_CHECKING,
    Any,
    AsyncIterator,
    Callable,
    Dict,
    List,
    Mapping,
    Optional,
    Set,
    Tuple,
    Type,
    Union,
)

from redis.asyncio import ConnectionPool, Redis
from redis.asyncio.cluster import ClusterNode
from redis.backoff import ExponentialWithJitterBackoff
from redis.cache import CacheConfig, CacheInterface
from redis.credentials import CredentialProvider
from redis.event import EventDispatcher
from redis.retry import Retry
from redis.typing import EncodableT, ExpiryT, KeyT

# Type checking imports
if TYPE_CHECKING:
    import OpenSSL

# Asynchronous Redis client for non-blocking operations with identical API to
# synchronous client.
class Redis:
    """Asynchronous Redis client.

    Offers the same API as the synchronous client; every command method is a
    coroutine and must be awaited.
    """

    # NOTE(review): the original stub declared ``retry_on_timeout`` and
    # ``retry_on_error`` twice, which is a SyntaxError in Python; the
    # duplicates have been removed.
    def __init__(
        self,
        host: str = "localhost",
        port: int = 6379,
        db: int = 0,
        password: Optional[str] = None,
        socket_timeout: Optional[float] = None,
        socket_connect_timeout: Optional[float] = None,
        socket_keepalive: Optional[bool] = None,
        socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None,
        connection_pool: Optional[ConnectionPool] = None,
        unix_socket_path: Optional[str] = None,
        encoding: str = "utf-8",
        encoding_errors: str = "strict",
        decode_responses: bool = False,
        retry_on_timeout: bool = False,
        # The default Retry object is created once at definition time and
        # shared by every client that does not pass its own.
        retry: Retry = Retry(
            backoff=ExponentialWithJitterBackoff(base=1, cap=10), retries=3
        ),
        retry_on_error: Optional[List[Type[Exception]]] = None,
        ssl: bool = False,
        ssl_keyfile: Optional[str] = None,
        ssl_certfile: Optional[str] = None,
        ssl_cert_reqs: Union[str, "ssl.VerifyMode"] = "required",
        ssl_ca_certs: Optional[str] = None,
        ssl_ca_path: Optional[str] = None,
        ssl_ca_data: Optional[str] = None,
        ssl_check_hostname: bool = True,
        ssl_password: Optional[str] = None,
        ssl_validate_ocsp: bool = False,
        ssl_validate_ocsp_stapled: bool = False,
        ssl_ocsp_context: Optional["OpenSSL.SSL.Context"] = None,
        ssl_ocsp_expected_cert: Optional[str] = None,
        ssl_min_version: Optional["ssl.TLSVersion"] = None,
        ssl_ciphers: Optional[str] = None,
        max_connections: Optional[int] = None,
        single_connection_client: bool = False,
        health_check_interval: int = 0,
        client_name: Optional[str] = None,
        lib_name: Optional[str] = "redis-py",
        lib_version: Optional[str] = None,
        username: Optional[str] = None,
        redis_connect_func: Optional[Callable[[], None]] = None,
        credential_provider: Optional[CredentialProvider] = None,
        protocol: Optional[int] = 2,
        cache: Optional[CacheInterface] = None,
        cache_config: Optional[CacheConfig] = None,
        event_dispatcher: Optional[EventDispatcher] = None,
    ): ...

    # ``from_url`` is a plain classmethod in redis.asyncio: it returns the
    # client immediately (connections are opened lazily) and must NOT be
    # awaited.
    @classmethod
    def from_url(
        cls,
        url: str,
        **kwargs
    ) -> "Redis": ...

    async def close(self) -> None: ...

    async def ping(self, **kwargs) -> Union[bytes, bool]: ...

    async def execute_command(self, *args, **options) -> Any: ...

    # --- Asynchronous Redis string operations for non-blocking string
    # manipulation. ---

    async def set(
        self,
        name: KeyT,
        value: EncodableT,
        ex: Optional[ExpiryT] = None,
        px: Optional[int] = None,
        nx: bool = False,
        xx: bool = False,
        keepttl: bool = False,
        get: bool = False,
        exat: Optional[int] = None,
        pxat: Optional[int] = None,
    ) -> Optional[bool]: ...

    async def get(self, name: KeyT) -> Optional[bytes]: ...

    async def mget(self, keys: List[KeyT], *args: KeyT) -> List[Optional[bytes]]: ...

    async def mset(self, mapping: Dict[KeyT, EncodableT]) -> bool: ...

    async def incr(self, name: KeyT, amount: int = 1) -> int: ...

    async def decr(self, name: KeyT, amount: int = 1) -> int: ...

    # Asynchronous pipeline for batching multiple commands with non-blocking
    # execution.
    def pipeline(
        self, transaction: bool = True, shard_hint: Optional[str] = None
    ) -> "Pipeline": ...
class Pipeline:
    """Asynchronous command pipeline.

    Commands are queued client-side and sent in a single round trip when
    ``execute`` is awaited.
    """

    async def execute(self, raise_on_error: bool = True) -> List[Any]: ...

    # Clears queued commands and releases any WATCHed keys.
    def reset(self) -> None: ...

    async def watch(self, *names: KeyT) -> bool: ...

    # Switches the pipeline from immediate-execution (post-WATCH) mode into
    # buffered MULTI/EXEC transaction mode.
    def multi(self) -> "Pipeline": ...

    def discard(self) -> None: ...


# Asynchronous publish/subscribe messaging with async iteration and
# non-blocking message handling.
# NOTE(review): ``pubsub`` takes ``self`` and appears to be a method of the
# Redis client that lost its indentation during extraction; kept at the
# original location.
def pubsub(self, **kwargs) -> "PubSub": ...


class PubSub:
    """Asynchronous publish/subscribe interface with async message iteration."""

    async def subscribe(self, *args, **kwargs) -> None: ...

    async def unsubscribe(self, *args) -> None: ...

    async def psubscribe(self, *args, **kwargs) -> None: ...

    async def punsubscribe(self, *args) -> None: ...

    # Used with ``async for``; yields message dicts.
    def listen(self) -> AsyncIterator[Dict[str, Any]]: ...

    # Returns None when no message arrives within ``timeout`` seconds.
    async def get_message(
        self,
        ignore_subscribe_messages: bool = False,
        timeout: Optional[float] = 0.0,
    ) -> Optional[Dict[str, Any]]: ...

    async def close(self) -> None: ...


# Asynchronous Redis Cluster client for non-blocking distributed operations.
class RedisCluster:
    """Asynchronous Redis Cluster client for non-blocking distributed operations."""

    def __init__(
        self,
        host: Optional[str] = None,
        port: int = 7000,
        startup_nodes: Optional[List[ClusterNode]] = None,
        cluster_error_retry_attempts: int = 3,
        require_full_coverage: bool = True,
        skip_full_coverage_check: bool = False,
        reinitialize_steps: int = 10,
        read_from_replicas: bool = False,
        dynamic_startup_nodes: bool = True,
        connection_pool_class: Type[ConnectionPool] = ConnectionPool,
        **kwargs,
    ): ...

    # Plain classmethod in redis.asyncio.cluster — returns the client
    # immediately and must NOT be awaited.
    @classmethod
    def from_url(
        cls,
        url: str,
        **kwargs
    ) -> "RedisCluster": ...

    async def close(self) -> None: ...

    # Cluster pipelines default to non-transactional: MULTI/EXEC cannot span
    # hash slots.
    def pipeline(self, transaction: bool = False) -> "ClusterPipeline": ...


# Asynchronous connection pools and connection classes for non-blocking
# connection management.
class ConnectionPool:
    """Asynchronous pool of reusable Connection objects."""

    # NOTE(review): ``Connection`` is quoted as a forward reference — the
    # class is defined below this one, so an unquoted annotation would raise
    # NameError at definition time.
    async def get_connection(self, command_name: str, **kwargs) -> "Connection": ...

    async def make_connection(self) -> "Connection": ...

    def release(self, connection: "Connection") -> None: ...

    async def disconnect(self, inuse_connections: bool = True) -> None: ...
class Connection:
    """A single asynchronous connection to a Redis server."""

    async def connect(self) -> None: ...

    async def disconnect(self) -> None: ...

    async def send_command(self, *args) -> None: ...

    async def read_response(self) -> Any: ...


import asyncio
import redis.asyncio as redis


async def main():
    # Create async Redis client
    r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
    try:
        # Basic async operations
        await r.set('key', 'value')
        value = await r.get('key')
        print(f"Value: {value}")

        # Multiple operations
        await r.mset({'key1': 'value1', 'key2': 'value2'})
        values = await r.mget(['key1', 'key2'])
        print(f"Values: {values}")
    finally:
        await r.close()


# Run the async function
asyncio.run(main())

import redis.asyncio as redis
import asyncio  # needed for asyncio.run below; missing in the original snippet


async def main():
    # Using async context manager (automatically closes)
    async with redis.Redis(host='localhost', port=6379) as r:
        await r.set('session:123', 'session_data')
        data = await r.get('session:123')
        print(f"Session data: {data}")


asyncio.run(main())

import redis.asyncio as redis
import asyncio  # needed for asyncio.run below; missing in the original snippet


async def main():
    async with redis.Redis(host='localhost', port=6379) as r:
        # Create async pipeline
        pipe = r.pipeline()

        # Queue commands (queuing is synchronous; nothing is sent yet)
        pipe.set('user:1001', 'John')
        pipe.set('user:1002', 'Jane')
        pipe.get('user:1001')
        pipe.get('user:1002')
        pipe.incr('page_views')

        # Execute all commands async
        results = await pipe.execute()
        print(f"Pipeline results: {results}")


asyncio.run(main())

import asyncio
import redis.asyncio as redis


async def publisher():
    """Publish messages to a channel"""
    async with redis.Redis(host='localhost', port=6379) as r:
        for i in range(10):
            await r.publish('notifications', f'Message {i}')
            await asyncio.sleep(1)


async def subscriber():
    """Subscribe and listen for messages"""
    async with redis.Redis(host='localhost', port=6379) as r:
        pubsub = r.pubsub()
        await pubsub.subscribe('notifications')
        try:
            # Async iteration over messages
            async for message in pubsub.listen():
                if message['type'] == 'message':
                    print(f"Received: {message['data']}")
        finally:
            await pubsub.close()


async def main():
    # Run publisher and subscriber concurrently
    await asyncio.gather(
        publisher(),
        subscriber()
    )


asyncio.run(main())

import redis.asyncio as redis
import asyncio  # needed for asyncio.run below; missing in the original snippet


async def main():
    async with redis.Redis(host='localhost', port=6379) as r:
        pubsub = r.pubsub()
        await pubsub.subscribe('chat_room')
        try:
            while True:
                # Get message with timeout; returns None on timeout
                message = await pubsub.get_message(timeout=1.0)
                if message is None:
                    print("No message received")
                    continue
                if message['type'] == 'message':
                    print(f"Chat message: {message['data']}")
        except KeyboardInterrupt:
            print("Stopping subscriber")
        finally:
            await pubsub.close()


asyncio.run(main())

import redis.asyncio as redis
import asyncio  # needed for asyncio.run below; missing in the original snippet
from redis.asyncio.cluster import RedisCluster, ClusterNode


async def main():
    # Create async cluster client
    startup_nodes = [
        ClusterNode("localhost", 7000),
        ClusterNode("localhost", 7001),
        ClusterNode("localhost", 7002)
    ]
    cluster = RedisCluster(startup_nodes=startup_nodes, decode_responses=True)
    try:
        # Cluster operations
        await cluster.set("user:1001", "John")
        user = await cluster.get("user:1001")
        print(f"User: {user}")

        # Cluster info
        info = await cluster.cluster_info()
        print(f"Cluster state: {info['cluster_state']}")
    finally:
        await cluster.close()


asyncio.run(main())

import redis.asyncio as redis
import asyncio  # needed for asyncio.run below; missing in the original snippet


async def main():
    # Create client from URL. ``from_url`` is a plain classmethod that
    # returns the client immediately — awaiting it (as the original snippet
    # did) would fail because it is not a coroutine.
    r = redis.from_url('redis://localhost:6379/0', decode_responses=True)
    try:
        await r.set('url_key', 'url_value')
        value = await r.get('url_key')
        print(f"Value from URL client: {value}")
    finally:
        await r.close()


asyncio.run(main())

import asyncio
import redis.asyncio as redis


async def worker(worker_id, r):
    """Worker function for concurrent operations"""
    for i in range(5):
        key = f"worker:{worker_id}:task:{i}"
        await r.set(key, f"data_{i}")
        value = await r.get(key)
        print(f"Worker {worker_id} - Task {i}: {value}")
        await asyncio.sleep(0.1)


async def main():
    async with redis.Redis(host='localhost', port=6379) as r:
        # Run multiple workers concurrently; they share one client/pool
        tasks = [worker(i, r) for i in range(3)]
        await asyncio.gather(*tasks)


asyncio.run(main())

import redis.asyncio as redis
import asyncio  # needed for asyncio.run below; missing in the original snippet
from redis.exceptions import ConnectionError, TimeoutError


async def main():
    # Bind up front so cleanup does not need fragile locals() introspection
    r = None
    try:
        # Attempt connection with timeout
        r = redis.Redis(
            host='unreachable-host',
            port=6379,
            socket_connect_timeout=5,
            socket_timeout=2
        )
        await r.ping()
    except ConnectionError as e:
        print(f"Connection failed: {e}")
    except TimeoutError as e:
        print(f"Operation timed out: {e}")
    finally:
        if r is not None:
            await r.close()


asyncio.run(main())

import redis.asyncio as redis
async def main():
async with redis.Redis(host='localhost', port=6379) as r:
# Initialize counter
await r.set('counter', 0)
# Transaction with watch
pipe = r.pipeline()
try:
# Watch the counter key
await pipe.watch('counter')
# Get current value
current_value = await r.get('counter')
current_value = int(current_value) if current_value else 0
# Start transaction
pipe.multi()
pipe.set('counter', current_value + 1)
# Execute transaction
result = await pipe.execute()
print(f"Counter incremented: {result}")
except redis.WatchError:
print("Counter was modified during transaction")
asyncio.run(main())Install with Tessl CLI
npx tessl i tessl/pypi-redis