or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-support.md cluster-support.md connection-management.md core-client.md distributed-locking.md error-handling.md high-availability.md index.md pipelines-transactions.md pubsub-messaging.md

docs/async-support.md

0

# Async Support

1

2

Redis async support provides full asynchronous Redis client functionality using Python's asyncio library. The async client offers the same API as the synchronous client with async/await patterns for non-blocking operations.

3

4

```python

5

import asyncio

6

import ssl

7

from typing import TYPE_CHECKING, Any, AsyncIterator, Optional, Union, List, Dict, Set, Tuple, Callable, Mapping, Type

8

9

from redis.asyncio import Redis, ConnectionPool

10

from redis.credentials import CredentialProvider

11

from redis.retry import Retry

12

from redis.backoff import ExponentialWithJitterBackoff

13

from redis.cache import CacheInterface, CacheConfig

14

from redis.event import EventDispatcher
from redis.typing import EncodableT, ExpiryT, KeyT

15

16

# Type checking imports

17

if TYPE_CHECKING:

18

import OpenSSL

19

```

20

21

## Capabilities

22

23

### Async Redis Client

24

25

Asynchronous Redis client for non-blocking operations, with an API identical to that of the synchronous client; every command is a coroutine and must be awaited.

26

27

```python { .api }

28

class Redis:

29

def __init__(

30

self,

31

host: str = "localhost",

32

port: int = 6379,

33

db: int = 0,

34

password: Optional[str] = None,

35

socket_timeout: Optional[float] = None,

36

socket_connect_timeout: Optional[float] = None,

37

socket_keepalive: Optional[bool] = None,

38

socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None,

39

connection_pool: Optional[ConnectionPool] = None,

40

unix_socket_path: Optional[str] = None,

41

encoding: str = "utf-8",

42

encoding_errors: str = "strict",

43

decode_responses: bool = False,

44

retry_on_timeout: bool = False,

45

retry: Retry = Retry(

46

backoff=ExponentialWithJitterBackoff(base=1, cap=10), retries=3

47

),

48

retry_on_error: Optional[List[Type[Exception]]] = None,

49

ssl: bool = False,

50

ssl_keyfile: Optional[str] = None,

51

ssl_certfile: Optional[str] = None,

52

ssl_cert_reqs: Union[str, "ssl.VerifyMode"] = "required",

53

ssl_ca_certs: Optional[str] = None,

54

ssl_ca_path: Optional[str] = None,

55

ssl_ca_data: Optional[str] = None,

56

ssl_check_hostname: bool = True,

57

ssl_password: Optional[str] = None,

58

ssl_validate_ocsp: bool = False,

59

ssl_validate_ocsp_stapled: bool = False,

60

ssl_ocsp_context: Optional["OpenSSL.SSL.Context"] = None,

61

ssl_ocsp_expected_cert: Optional[str] = None,

62

ssl_min_version: Optional["ssl.TLSVersion"] = None,

63

ssl_ciphers: Optional[str] = None,

64

max_connections: Optional[int] = None,

65

single_connection_client: bool = False,

66

health_check_interval: int = 0,

67

client_name: Optional[str] = None,

68

lib_name: Optional[str] = "redis-py",

69

lib_version: Optional[str] = None,

70

username: Optional[str] = None,

71

auto_close_connection_pool: Optional[bool] = None,

73

redis_connect_func: Optional[Callable[[], None]] = None,

74

credential_provider: Optional[CredentialProvider] = None,

75

protocol: Optional[int] = 2,

76

cache: Optional[CacheInterface] = None,

77

cache_config: Optional[CacheConfig] = None,

78

event_dispatcher: Optional[EventDispatcher] = None

79

): ...

80

81

@classmethod

82

def from_url(

83

cls,

84

url: str,

85

**kwargs

86

) -> "Redis": ...

87

88

async def close(self) -> None: ...  # deprecated alias of aclose() since redis-py 5.0.1; prefer `await r.aclose()`

89

90

async def ping(self, **kwargs) -> Union[bytes, bool]: ...

91

92

async def execute_command(self, *args, **options) -> Any: ...

93

```

94

95

### Async String Operations

96

97

Asynchronous Redis string operations for non-blocking string manipulation.

98

99

```python { .api }

100

async def set(

101

self,

102

name: KeyT,

103

value: EncodableT,

104

ex: Optional[ExpiryT] = None,

105

px: Optional[int] = None,

106

nx: bool = False,

107

xx: bool = False,

108

keepttl: bool = False,

109

get: bool = False,

110

exat: Optional[int] = None,

111

pxat: Optional[int] = None

112

) -> Optional[bool]: ...

113

114

async def get(self, name: KeyT) -> Optional[bytes]: ...

115

116

async def mget(self, keys: List[KeyT], *args: KeyT) -> List[Optional[bytes]]: ...

117

118

async def mset(self, mapping: Dict[KeyT, EncodableT]) -> bool: ...

119

120

async def incr(self, name: KeyT, amount: int = 1) -> int: ...

121

122

async def decr(self, name: KeyT, amount: int = 1) -> int: ...

123

```

124

125

### Async Pipeline

126

127

Asynchronous pipeline for batching multiple commands with non-blocking execution.

128

129

```python { .api }

130

def pipeline(self, transaction: bool = True, shard_hint: Optional[str] = None) -> "Pipeline": ...

131

132

class Pipeline:

133

async def execute(self, raise_on_error: bool = True) -> List[Any]: ...

134

135

async def reset(self) -> None: ...

136

137

async def watch(self, *names: KeyT) -> bool: ...

138

139

def multi(self) -> "Pipeline": ...

140

141

def discard(self) -> None: ...

142

```

143

144

### Async Pub/Sub

145

146

Asynchronous publish/subscribe messaging with async iteration and non-blocking message handling.

147

148

```python { .api }

149

def pubsub(self, **kwargs) -> "PubSub": ...

150

151

class PubSub:

152

async def subscribe(self, *args, **kwargs) -> None: ...

153

154

async def unsubscribe(self, *args) -> None: ...

155

156

async def psubscribe(self, *args, **kwargs) -> None: ...

157

158

async def punsubscribe(self, *args) -> None: ...

159

160

def listen(self) -> AsyncIterator[Dict[str, Any]]: ...

161

162

async def get_message(

163

self,

164

ignore_subscribe_messages: bool = False,

165

timeout: Optional[float] = 0.0

166

) -> Optional[Dict[str, Any]]: ...

167

168

async def close(self) -> None: ...

169

```

170

171

### Async Cluster Client

172

173

Asynchronous Redis Cluster client for non-blocking distributed operations.

174

175

```python { .api }

176

class RedisCluster:

177

def __init__(

178

self,

179

host: Optional[str] = None,

180

port: int = 6379,

181

startup_nodes: Optional[List[ClusterNode]] = None,

182

cluster_error_retry_attempts: int = 3,

183

require_full_coverage: bool = True,

184

skip_full_coverage_check: bool = False,

185

reinitialize_steps: int = 10,

186

read_from_replicas: bool = False,

187

dynamic_startup_nodes: bool = True,

188

connection_pool_class: Type[ConnectionPool] = ConnectionPool,

189

**kwargs

190

): ...

191

192

@classmethod

193

def from_url(

194

cls,

195

url: str,

196

**kwargs

197

) -> "RedisCluster": ...

198

199

async def close(self) -> None: ...

200

201

def pipeline(self, transaction: bool = False) -> "ClusterPipeline": ...

202

```

203

204

### Async Connection Management

205

206

Asynchronous connection pools and connection classes for non-blocking connection management.

207

208

```python { .api }

209

class ConnectionPool:

210

async def get_connection(self, command_name: str, **kwargs) -> Connection: ...

211

212

def make_connection(self) -> Connection: ...

213

214

async def release(self, connection: Connection) -> None: ...

215

216

async def disconnect(self, inuse_connections: bool = True) -> None: ...

217

218

class Connection:

219

async def connect(self) -> None: ...

220

221

async def disconnect(self) -> None: ...

222

223

async def send_command(self, *args) -> None: ...

224

225

async def read_response(self) -> Any: ...

226

```

227

228

## Usage Examples

229

230

### Basic Async Operations

231

232

```python

233

import asyncio

234

import redis.asyncio as redis

235

236

async def main():

237

# Create async Redis client

238

r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

239

240

try:

241

# Basic async operations

242

await r.set('key', 'value')

243

value = await r.get('key')

244

print(f"Value: {value}")

245

246

# Multiple operations

247

await r.mset({'key1': 'value1', 'key2': 'value2'})

248

values = await r.mget(['key1', 'key2'])

249

print(f"Values: {values}")

250

251

finally:

252

await r.close()

253

254

# Run the async function

255

asyncio.run(main())

256

```

257

258

### Async with Context Manager

259

260

```python

261

import asyncio
import redis.asyncio as redis

262

263

async def main():

264

# Using async context manager (automatically closes)

265

async with redis.Redis(host='localhost', port=6379) as r:

266

await r.set('session:123', 'session_data')

267

data = await r.get('session:123')

268

print(f"Session data: {data}")

269

270

asyncio.run(main())

271

```

272

273

### Async Pipeline Operations

274

275

```python

276

import asyncio
import redis.asyncio as redis

277

278

async def main():

279

async with redis.Redis(host='localhost', port=6379) as r:

280

# Create async pipeline

281

pipe = r.pipeline()

282

283

# Queue commands

284

pipe.set('user:1001', 'John')

285

pipe.set('user:1002', 'Jane')

286

pipe.get('user:1001')

287

pipe.get('user:1002')

288

pipe.incr('page_views')

289

290

# Execute all commands async

291

results = await pipe.execute()

292

print(f"Pipeline results: {results}")

293

294

asyncio.run(main())

295

```

296

297

### Async Pub/Sub

298

299

```python

300

import asyncio

301

import redis.asyncio as redis

302

303

async def publisher():

304

"""Publish messages to a channel"""

305

async with redis.Redis(host='localhost', port=6379) as r:

306

for i in range(10):

307

await r.publish('notifications', f'Message {i}')

308

await asyncio.sleep(1)

309

310

async def subscriber():

311

"""Subscribe and listen for messages"""

312

async with redis.Redis(host='localhost', port=6379) as r:

313

pubsub = r.pubsub()

314

await pubsub.subscribe('notifications')

315

316

try:

317

# Async iteration over messages

318

async for message in pubsub.listen():

319

if message['type'] == 'message':

320

print(f"Received: {message['data']}")

321

finally:

322

await pubsub.close()

323

324

async def main():

325

# Run publisher and subscriber concurrently

326

await asyncio.gather(

327

publisher(),

328

subscriber()

329

)

330

331

asyncio.run(main())

332

```

333

334

### Async Pub/Sub with Manual Message Handling

335

336

```python

337

import asyncio
import redis.asyncio as redis

338

339

async def main():

340

async with redis.Redis(host='localhost', port=6379) as r:

341

pubsub = r.pubsub()

342

await pubsub.subscribe('chat_room')

343

344

try:

345

while True:

346

# Get message with timeout

347

message = await pubsub.get_message(timeout=1.0)

348

349

if message is None:

350

print("No message received")

351

continue

352

353

if message['type'] == 'message':

354

print(f"Chat message: {message['data']}")

355

356

except KeyboardInterrupt:

357

print("Stopping subscriber")

358

finally:

359

await pubsub.close()

360

361

asyncio.run(main())

362

```

363

364

### Async Cluster Operations

365

366

```python

367

import asyncio
import redis.asyncio as redis

368

from redis.asyncio.cluster import RedisCluster, ClusterNode

369

370

async def main():

371

# Create async cluster client

372

startup_nodes = [

373

ClusterNode("localhost", 7000),

374

ClusterNode("localhost", 7001),

375

ClusterNode("localhost", 7002)

376

]

377

378

cluster = RedisCluster(startup_nodes=startup_nodes, decode_responses=True)

379

380

try:

381

# Cluster operations

382

await cluster.set("user:1001", "John")

383

user = await cluster.get("user:1001")

384

print(f"User: {user}")

385

386

# Cluster info

387

info = await cluster.cluster_info()

388

print(f"Cluster state: {info['cluster_state']}")

389

390

finally:

391

await cluster.close()

392

393

asyncio.run(main())

394

```

395

396

### Async Connection from URL

397

398

```python

399

import asyncio
import redis.asyncio as redis

400

401

async def main():

402

# Create client from URL

403

r = await redis.from_url('redis://localhost:6379/0', decode_responses=True)

404

405

try:

406

await r.set('url_key', 'url_value')

407

value = await r.get('url_key')

408

print(f"Value from URL client: {value}")

409

finally:

410

await r.close()

411

412

asyncio.run(main())

413

```

414

415

### Concurrent Operations

416

417

```python

418

import asyncio

419

import redis.asyncio as redis

420

421

async def worker(worker_id, r):

422

"""Worker function for concurrent operations"""

423

for i in range(5):

424

key = f"worker:{worker_id}:task:{i}"

425

await r.set(key, f"data_{i}")

426

value = await r.get(key)

427

print(f"Worker {worker_id} - Task {i}: {value}")

428

await asyncio.sleep(0.1)

429

430

async def main():

431

async with redis.Redis(host='localhost', port=6379) as r:

432

# Run multiple workers concurrently

433

tasks = [worker(i, r) for i in range(3)]

434

await asyncio.gather(*tasks)

435

436

asyncio.run(main())

437

```

438

439

### Async Error Handling

440

441

```python

442

import asyncio
import redis.asyncio as redis

443

from redis.exceptions import ConnectionError, TimeoutError

444

445

async def main():

446

try:

447

# Attempt connection with timeout

448

r = redis.Redis(

449

host='unreachable-host',

450

port=6379,

451

socket_connect_timeout=5,

452

socket_timeout=2

453

)

454

455

await r.ping()

456

457

except ConnectionError as e:

458

print(f"Connection failed: {e}")

459

except TimeoutError as e:

460

print(f"Operation timed out: {e}")

461

finally:

462

if 'r' in locals():

463

await r.close()

464

465

asyncio.run(main())

466

```

467

468

### Async Transaction with Watch

469

470

```python

471

import asyncio
import redis.asyncio as redis

472

473

async def main():

474

async with redis.Redis(host='localhost', port=6379) as r:

475

# Initialize counter

476

await r.set('counter', 0)

477

478

# Transaction with watch

479

pipe = r.pipeline()

480

481

try:

482

# Watch the counter key

483

await pipe.watch('counter')

484

485

# Get current value

486

current_value = await r.get('counter')

487

current_value = int(current_value) if current_value else 0

488

489

# Start transaction

490

pipe.multi()

491

pipe.set('counter', current_value + 1)

492

493

# Execute transaction

494

result = await pipe.execute()

495

print(f"Counter incremented: {result}")

496

497

except redis.WatchError:

498

print("Counter was modified during transaction")

499

500

asyncio.run(main())

501

```