0
# Async Support
1
2
Redis async support provides full asynchronous Redis client functionality using Python's asyncio library. The async client offers the same API as the synchronous client with async/await patterns for non-blocking operations.
3
4
```python
5
import asyncio
6
import ssl
7
from typing import TYPE_CHECKING, Optional, Union, List, Dict, Set, Tuple, Callable, Mapping, Type
8
9
from redis.asyncio import Redis, ConnectionPool
10
from redis.credentials import CredentialProvider
11
from redis.retry import Retry
12
from redis.backoff import ExponentialWithJitterBackoff
13
from redis.cache import CacheInterface, CacheConfig
14
from redis.event import EventDispatcher
15
16
# Type checking imports
17
if TYPE_CHECKING:
18
import OpenSSL
19
```
20
21
## Capabilities
22
23
### Async Redis Client
24
25
Asynchronous Redis client for non-blocking operations with identical API to synchronous client.
26
27
```python { .api }
28
class Redis:
29
def __init__(
30
self,
31
host: str = "localhost",
32
port: int = 6379,
33
db: int = 0,
34
password: Optional[str] = None,
35
socket_timeout: Optional[float] = None,
36
socket_connect_timeout: Optional[float] = None,
37
socket_keepalive: Optional[bool] = None,
38
socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None,
39
connection_pool: Optional[ConnectionPool] = None,
40
unix_socket_path: Optional[str] = None,
41
encoding: str = "utf-8",
42
encoding_errors: str = "strict",
43
decode_responses: bool = False,
44
retry_on_timeout: bool = False,
45
retry: Retry = Retry(
46
backoff=ExponentialWithJitterBackoff(base=1, cap=10), retries=3
47
),
48
retry_on_error: Optional[List[Type[Exception]]] = None,
49
ssl: bool = False,
50
ssl_keyfile: Optional[str] = None,
51
ssl_certfile: Optional[str] = None,
52
ssl_cert_reqs: Union[str, "ssl.VerifyMode"] = "required",
53
ssl_ca_certs: Optional[str] = None,
54
ssl_ca_path: Optional[str] = None,
55
ssl_ca_data: Optional[str] = None,
56
ssl_check_hostname: bool = True,
57
ssl_password: Optional[str] = None,
58
ssl_validate_ocsp: bool = False,
59
ssl_validate_ocsp_stapled: bool = False,
60
ssl_ocsp_context: Optional["OpenSSL.SSL.Context"] = None,
61
ssl_ocsp_expected_cert: Optional[str] = None,
62
ssl_min_version: Optional["ssl.TLSVersion"] = None,
63
ssl_ciphers: Optional[str] = None,
64
max_connections: Optional[int] = None,
65
single_connection_client: bool = False,
66
health_check_interval: int = 0,
67
client_name: Optional[str] = None,
68
lib_name: Optional[str] = "redis-py",
69
lib_version: Optional[str] = None,
70
username: Optional[str] = None,
71
retry_on_timeout: bool = False,
72
retry_on_error: Optional[List[Type[Exception]]] = None,
73
redis_connect_func: Optional[Callable[[], None]] = None,
74
credential_provider: Optional[CredentialProvider] = None,
75
protocol: Optional[int] = 2,
76
cache: Optional[CacheInterface] = None,
77
cache_config: Optional[CacheConfig] = None,
78
event_dispatcher: Optional[EventDispatcher] = None
79
): ...
80
81
@classmethod
82
async def from_url(
83
cls,
84
url: str,
85
**kwargs
86
) -> "Redis": ...
87
88
async def close(self) -> None: ...
89
90
async def ping(self, **kwargs) -> Union[bytes, bool]: ...
91
92
async def execute_command(self, *args, **options) -> Any: ...
93
```
94
95
### Async String Operations
96
97
Asynchronous Redis string operations for non-blocking string manipulation.
98
99
```python { .api }
100
async def set(
101
self,
102
name: KeyT,
103
value: EncodableT,
104
ex: Optional[ExpiryT] = None,
105
px: Optional[int] = None,
106
nx: bool = False,
107
xx: bool = False,
108
keepttl: bool = False,
109
get: bool = False,
110
exat: Optional[int] = None,
111
pxat: Optional[int] = None
112
) -> Optional[bool]: ...
113
114
async def get(self, name: KeyT) -> Optional[bytes]: ...
115
116
async def mget(self, keys: List[KeyT], *args: KeyT) -> List[Optional[bytes]]: ...
117
118
async def mset(self, mapping: Dict[KeyT, EncodableT]) -> bool: ...
119
120
async def incr(self, name: KeyT, amount: int = 1) -> int: ...
121
122
async def decr(self, name: KeyT, amount: int = 1) -> int: ...
123
```
124
125
### Async Pipeline
126
127
Asynchronous pipeline for batching multiple commands with non-blocking execution.
128
129
```python { .api }
130
def pipeline(self, transaction: bool = True, shard_hint: Optional[str] = None) -> "Pipeline": ...
131
132
class Pipeline:
133
async def execute(self, raise_on_error: bool = True) -> List[Any]: ...
134
135
def reset(self) -> None: ...
136
137
async def watch(self, *names: KeyT) -> bool: ...
138
139
def multi(self) -> "Pipeline": ...
140
141
def discard(self) -> None: ...
142
```
143
144
### Async Pub/Sub
145
146
Asynchronous publish/subscribe messaging with async iteration and non-blocking message handling.
147
148
```python { .api }
149
def pubsub(self, **kwargs) -> "PubSub": ...
150
151
class PubSub:
152
async def subscribe(self, *args, **kwargs) -> None: ...
153
154
async def unsubscribe(self, *args) -> None: ...
155
156
async def psubscribe(self, *args, **kwargs) -> None: ...
157
158
async def punsubscribe(self, *args) -> None: ...
159
160
def listen(self) -> AsyncIterator[Dict[str, Any]]: ...
161
162
async def get_message(
163
self,
164
ignore_subscribe_messages: bool = False,
165
timeout: Optional[float] = 0.0
166
) -> Optional[Dict[str, Any]]: ...
167
168
async def close(self) -> None: ...
169
```
170
171
### Async Cluster Client
172
173
Asynchronous Redis Cluster client for non-blocking distributed operations.
174
175
```python { .api }
176
class RedisCluster:
177
def __init__(
178
self,
179
host: Optional[str] = None,
180
port: int = 7000,
181
startup_nodes: Optional[List[ClusterNode]] = None,
182
cluster_error_retry_attempts: int = 3,
183
require_full_coverage: bool = True,
184
skip_full_coverage_check: bool = False,
185
reinitialize_steps: int = 10,
186
read_from_replicas: bool = False,
187
dynamic_startup_nodes: bool = True,
188
connection_pool_class: Type[ConnectionPool] = ConnectionPool,
189
**kwargs
190
): ...
191
192
@classmethod
193
async def from_url(
194
cls,
195
url: str,
196
**kwargs
197
) -> "RedisCluster": ...
198
199
async def close(self) -> None: ...
200
201
def pipeline(self, transaction: bool = False) -> "ClusterPipeline": ...
202
```
203
204
### Async Connection Management
205
206
Asynchronous connection pools and connection classes for non-blocking connection management.
207
208
```python { .api }
209
class ConnectionPool:
210
async def get_connection(self, command_name: str, **kwargs) -> Connection: ...
211
212
async def make_connection(self) -> Connection: ...
213
214
def release(self, connection: Connection) -> None: ...
215
216
async def disconnect(self, inuse_connections: bool = True) -> None: ...
217
218
class Connection:
219
async def connect(self) -> None: ...
220
221
async def disconnect(self) -> None: ...
222
223
async def send_command(self, *args) -> None: ...
224
225
async def read_response(self) -> Any: ...
226
```
227
228
## Usage Examples
229
230
### Basic Async Operations
231
232
```python
233
import asyncio
234
import redis.asyncio as redis
235
236
async def main():
237
# Create async Redis client
238
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
239
240
try:
241
# Basic async operations
242
await r.set('key', 'value')
243
value = await r.get('key')
244
print(f"Value: {value}")
245
246
# Multiple operations
247
await r.mset({'key1': 'value1', 'key2': 'value2'})
248
values = await r.mget(['key1', 'key2'])
249
print(f"Values: {values}")
250
251
finally:
252
await r.close()
253
254
# Run the async function
255
asyncio.run(main())
256
```
257
258
### Async with Context Manager
259
260
```python
261
import redis.asyncio as redis
262
263
async def main():
264
# Using async context manager (automatically closes)
265
async with redis.Redis(host='localhost', port=6379) as r:
266
await r.set('session:123', 'session_data')
267
data = await r.get('session:123')
268
print(f"Session data: {data}")
269
270
asyncio.run(main())
271
```
272
273
### Async Pipeline Operations
274
275
```python
276
import redis.asyncio as redis
277
278
async def main():
279
async with redis.Redis(host='localhost', port=6379) as r:
280
# Create async pipeline
281
pipe = r.pipeline()
282
283
# Queue commands
284
pipe.set('user:1001', 'John')
285
pipe.set('user:1002', 'Jane')
286
pipe.get('user:1001')
287
pipe.get('user:1002')
288
pipe.incr('page_views')
289
290
# Execute all commands async
291
results = await pipe.execute()
292
print(f"Pipeline results: {results}")
293
294
asyncio.run(main())
295
```
296
297
### Async Pub/Sub
298
299
```python
300
import asyncio
301
import redis.asyncio as redis
302
303
async def publisher():
304
"""Publish messages to a channel"""
305
async with redis.Redis(host='localhost', port=6379) as r:
306
for i in range(10):
307
await r.publish('notifications', f'Message {i}')
308
await asyncio.sleep(1)
309
310
async def subscriber():
311
"""Subscribe and listen for messages"""
312
async with redis.Redis(host='localhost', port=6379) as r:
313
pubsub = r.pubsub()
314
await pubsub.subscribe('notifications')
315
316
try:
317
# Async iteration over messages
318
async for message in pubsub.listen():
319
if message['type'] == 'message':
320
print(f"Received: {message['data']}")
321
finally:
322
await pubsub.close()
323
324
async def main():
325
# Run publisher and subscriber concurrently
326
await asyncio.gather(
327
publisher(),
328
subscriber()
329
)
330
331
asyncio.run(main())
332
```
333
334
### Async Pub/Sub with Manual Message Handling
335
336
```python
337
import redis.asyncio as redis
338
339
async def main():
340
async with redis.Redis(host='localhost', port=6379) as r:
341
pubsub = r.pubsub()
342
await pubsub.subscribe('chat_room')
343
344
try:
345
while True:
346
# Get message with timeout
347
message = await pubsub.get_message(timeout=1.0)
348
349
if message is None:
350
print("No message received")
351
continue
352
353
if message['type'] == 'message':
354
print(f"Chat message: {message['data']}")
355
356
except KeyboardInterrupt:
357
print("Stopping subscriber")
358
finally:
359
await pubsub.close()
360
361
asyncio.run(main())
362
```
363
364
### Async Cluster Operations
365
366
```python
367
import redis.asyncio as redis
368
from redis.asyncio.cluster import RedisCluster, ClusterNode
369
370
async def main():
371
# Create async cluster client
372
startup_nodes = [
373
ClusterNode("localhost", 7000),
374
ClusterNode("localhost", 7001),
375
ClusterNode("localhost", 7002)
376
]
377
378
cluster = RedisCluster(startup_nodes=startup_nodes, decode_responses=True)
379
380
try:
381
# Cluster operations
382
await cluster.set("user:1001", "John")
383
user = await cluster.get("user:1001")
384
print(f"User: {user}")
385
386
# Cluster info
387
info = await cluster.cluster_info()
388
print(f"Cluster state: {info['cluster_state']}")
389
390
finally:
391
await cluster.close()
392
393
asyncio.run(main())
394
```
395
396
### Async Connection from URL
397
398
```python
399
import redis.asyncio as redis
400
401
async def main():
402
# Create client from URL
403
r = await redis.from_url('redis://localhost:6379/0', decode_responses=True)
404
405
try:
406
await r.set('url_key', 'url_value')
407
value = await r.get('url_key')
408
print(f"Value from URL client: {value}")
409
finally:
410
await r.close()
411
412
asyncio.run(main())
413
```
414
415
### Concurrent Operations
416
417
```python
418
import asyncio
419
import redis.asyncio as redis
420
421
async def worker(worker_id, r):
422
"""Worker function for concurrent operations"""
423
for i in range(5):
424
key = f"worker:{worker_id}:task:{i}"
425
await r.set(key, f"data_{i}")
426
value = await r.get(key)
427
print(f"Worker {worker_id} - Task {i}: {value}")
428
await asyncio.sleep(0.1)
429
430
async def main():
431
async with redis.Redis(host='localhost', port=6379) as r:
432
# Run multiple workers concurrently
433
tasks = [worker(i, r) for i in range(3)]
434
await asyncio.gather(*tasks)
435
436
asyncio.run(main())
437
```
438
439
### Async Error Handling
440
441
```python
442
import redis.asyncio as redis
443
from redis.exceptions import ConnectionError, TimeoutError
444
445
async def main():
446
try:
447
# Attempt connection with timeout
448
r = redis.Redis(
449
host='unreachable-host',
450
port=6379,
451
socket_connect_timeout=5,
452
socket_timeout=2
453
)
454
455
await r.ping()
456
457
except ConnectionError as e:
458
print(f"Connection failed: {e}")
459
except TimeoutError as e:
460
print(f"Operation timed out: {e}")
461
finally:
462
if 'r' in locals():
463
await r.close()
464
465
asyncio.run(main())
466
```
467
468
### Async Transaction with Watch
469
470
```python
471
import redis.asyncio as redis
472
473
async def main():
474
async with redis.Redis(host='localhost', port=6379) as r:
475
# Initialize counter
476
await r.set('counter', 0)
477
478
# Transaction with watch
479
pipe = r.pipeline()
480
481
try:
482
# Watch the counter key
483
await pipe.watch('counter')
484
485
# Get current value
486
current_value = await r.get('counter')
487
current_value = int(current_value) if current_value else 0
488
489
# Start transaction
490
pipe.multi()
491
pipe.set('counter', current_value + 1)
492
493
# Execute transaction
494
result = await pipe.execute()
495
print(f"Counter incremented: {result}")
496
497
except redis.WatchError:
498
print("Counter was modified during transaction")
499
500
asyncio.run(main())
501
```