or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

advanced-features.md · basic-operations.md · connection-management.md · data-structures.md · index.md · server-admin.md

docs/connection-management.md

0

# Connection Management

1

2

Connection and pooling functionality for managing Redis connections efficiently. This includes URL-based configuration, SSL connections, Unix domain sockets, connection pooling strategies, and connection lifecycle management for optimal performance and reliability.

3

4

## Capabilities

5

6

### Redis Client Creation

7

8

Multiple ways to create Redis client instances with flexible configuration options.

9

10

```python { .api }

11

class Redis:

12

"""Main Redis client class with async/await support."""

13

14

def __init__(

15

self,

16

*,

17

host: str = "localhost",

18

port: int = 6379,

19

db: Union[str, int] = 0,

20

password: Optional[str] = None,

21

socket_timeout: Optional[float] = None,

22

socket_connect_timeout: Optional[float] = None,

23

socket_keepalive: Optional[bool] = None,

24

socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None,

25

connection_pool: Optional[ConnectionPool] = None,

26

unix_socket_path: Optional[str] = None,

27

encoding: str = "utf-8",

28

encoding_errors: str = "strict",

29

decode_responses: bool = False,

30

retry_on_timeout: bool = False,

31

ssl: bool = False,

32

ssl_keyfile: Optional[str] = None,

33

ssl_certfile: Optional[str] = None,

34

ssl_cert_reqs: str = "required",

35

ssl_ca_certs: Optional[str] = None,

36

ssl_check_hostname: bool = False,

37

max_connections: Optional[int] = None,

38

single_connection_client: bool = False,

39

health_check_interval: int = 0,

40

client_name: Optional[str] = None,

41

username: Optional[str] = None,

42

):

43

"""

44

Initialize Redis client.

45

46

Args:

47

host: Redis server hostname

48

port: Redis server port

49

db: Database number (0-15 with the default server configuration; the upper bound is set by the server's `databases` setting)

50

password: Authentication password

51

socket_timeout: Socket operation timeout in seconds

52

socket_connect_timeout: Socket connection timeout in seconds

53

socket_keepalive: Enable TCP keepalive (None uses system default)

54

socket_keepalive_options: TCP keepalive options

55

connection_pool: Custom connection pool

56

unix_socket_path: Unix domain socket path

57

encoding: String encoding for responses

58

encoding_errors: Encoding error handling

59

decode_responses: Decode byte responses to strings

60

retry_on_timeout: Retry commands on timeout

61

ssl: Enable SSL/TLS encryption

62

ssl_keyfile: SSL private key file path

63

ssl_certfile: SSL certificate file path

64

ssl_cert_reqs: SSL certificate requirements

65

ssl_ca_certs: SSL CA certificates file path

66

ssl_check_hostname: Verify SSL hostname

67

max_connections: Maximum pool connections

68

single_connection_client: Use single persistent connection

69

health_check_interval: Health check interval in seconds

70

client_name: Client identification name

71

username: Authentication username (Redis 6+)

72

"""

73

74

@classmethod

75

def from_url(cls, url: str, **kwargs) -> "Redis":

76

"""

77

Create Redis client from connection URL.

78

79

Args:

80

url: Redis connection URL (redis://[[username]:password@]host[:port][/database])

81

kwargs: Additional connection parameters

82

83

Returns:

84

Configured Redis client instance

85

"""

86

87

async def ping(self) -> bool:

88

"""

89

Test connection to Redis server.

90

91

Returns:

92

True if server responds to ping

93

"""

94

95

async def echo(self, value: str) -> str:

96

"""

97

Echo message back from server.

98

99

Args:

100

value: Message to echo

101

102

Returns:

103

Echoed message

104

"""

105

106

async def aclose(self) -> None:

107

"""Close all connections and cleanup resources."""

108

109

def close(self) -> None:

110

"""Synchronous connection cleanup (use aclose() in async context)."""

111

112

def from_url(url: str, **kwargs) -> Redis:

113

"""

114

Create Redis client from connection URL.

115

116

Args:

117

url: Redis connection URL

118

kwargs: Additional connection parameters

119

120

Returns:

121

Configured Redis client instance

122

"""

123

```

124

125

### Connection Pool Management

126

127

Connection pooling for efficient connection reuse and resource management.

128

129

```python { .api }

130

class ConnectionPool:

131

"""Connection pool for managing Redis connections."""

132

133

def __init__(

134

self,

135

connection_class: Type[Connection] = Connection,

136

max_connections: int = 50,

137

**connection_kwargs

138

):

139

"""

140

Initialize connection pool.

141

142

Args:

143

connection_class: Connection class to use

144

max_connections: Maximum number of connections

145

connection_kwargs: Arguments for connection creation

146

"""

147

148

@classmethod

149

def from_url(cls, url: str, **kwargs) -> "ConnectionPool":

150

"""

151

Create connection pool from URL.

152

153

Args:

154

url: Redis connection URL

155

kwargs: Additional pool parameters

156

157

Returns:

158

Configured connection pool

159

"""

160

161

async def get_connection(self, command_name: str, *keys, **options) -> Connection:

162

"""

163

Get connection from pool.

164

165

Args:

166

command_name: Redis command being executed

167

keys: Command keys (for sharding)

168

options: Command options

169

170

Returns:

171

Available connection instance

172

"""

173

174

def make_connection(self) -> Connection:

175

"""

176

Create new connection instance.

177

178

Returns:

179

New connection (not from pool)

180

"""

181

182

async def release(self, connection: Connection) -> None:

183

"""

184

Return connection to pool.

185

186

Args:

187

connection: Connection to return

188

"""

189

190

async def disconnect(self, inuse_connections: bool = True) -> None:

191

"""

192

Disconnect all pool connections.

193

194

Args:

195

inuse_connections: Whether to disconnect active connections

196

"""

197

198

def reset(self) -> None:

199

"""Reset pool state, clearing all connections."""

200

201

def get_encoder(self) -> Encoder:

202

"""

203

Get connection encoder instance.

204

205

Returns:

206

Encoder for data serialization

207

"""

208

209

def owns_connection(self, connection: Connection) -> bool:

210

"""

211

Check if pool owns connection.

212

213

Args:

214

connection: Connection to check

215

216

Returns:

217

True if connection belongs to this pool

218

"""

219

220

class BlockingConnectionPool(ConnectionPool):

221

"""Connection pool that blocks when max connections reached."""

222

223

def __init__(

224

self,

225

max_connections: int = 50,

226

timeout: float = 20,

227

**connection_kwargs

228

):

229

"""

230

Initialize blocking connection pool.

231

232

Args:

233

max_connections: Maximum number of connections

234

timeout: Timeout when waiting for available connection

235

connection_kwargs: Arguments for connection creation

236

"""

237

```

238

239

### Connection Types

240

241

Different connection implementations for various network configurations.

242

243

```python { .api }

244

class Connection:

245

"""Basic TCP connection to Redis server."""

246

247

def __init__(

248

self,

249

*,

250

host: str = "localhost",

251

port: Union[str, int] = 6379,

252

db: Union[str, int] = 0,

253

password: Optional[str] = None,

254

socket_timeout: Optional[float] = None,

255

socket_connect_timeout: Optional[float] = None,

256

socket_keepalive: bool = False,

257

socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None,

258

socket_type: int = 0,

259

retry_on_timeout: bool = False,

260

encoding: str = "utf-8",

261

encoding_errors: str = "strict",

262

decode_responses: bool = False,

263

parser_class: Type[BaseParser] = DefaultParser,

264

socket_read_size: int = 65536,

265

health_check_interval: float = 0,

266

client_name: Optional[str] = None,

267

username: Optional[str] = None,

268

encoder_class: Type[Encoder] = Encoder,

269

):

270

"""

271

Initialize TCP connection.

272

273

Args:

274

host: Redis server hostname

275

port: Redis server port

276

db: Database number

277

password: Authentication password

278

socket_timeout: Socket operation timeout

279

socket_connect_timeout: Connection establishment timeout

280

socket_keepalive: Enable TCP keepalive

281

socket_keepalive_options: Keepalive configuration

282

socket_type: Socket type

283

retry_on_timeout: Retry operations on timeout

284

encoding: Response encoding

285

encoding_errors: Encoding error handling

286

decode_responses: Decode responses to strings

287

parser_class: Protocol parser

288

socket_read_size: Read buffer size

289

health_check_interval: Health check frequency

290

client_name: Client name for identification

291

username: Authentication username

292

encoder_class: Data encoder class

293

"""

294

295

async def connect(self) -> None:

296

"""Establish connection to Redis server."""

297

298

async def disconnect(self) -> None:

299

"""Close connection to Redis server."""

300

301

async def send_command(self, *args, **kwargs) -> Any:

302

"""

303

Send command to Redis server.

304

305

Args:

306

args: Command arguments

307

kwargs: Command options

308

309

Returns:

310

Command response

311

"""

312

313

async def send_packed_command(self, command: bytes, check_health: bool = True) -> None:

314

"""

315

Send pre-packed command to server.

316

317

Args:

318

command: Packed command bytes

319

check_health: Check connection health before sending

320

"""

321

322

async def can_read(self, timeout: float = 0) -> bool:

323

"""

324

Check if data is available to read.

325

326

Args:

327

timeout: Timeout for check in seconds

328

329

Returns:

330

True if data is available

331

"""

332

333

async def read_response(self) -> Any:

334

"""

335

Read and parse response from server.

336

337

Returns:

338

Parsed response data

339

"""

340

341

def pack_command(self, *args) -> List[bytes]:

342

"""

343

Pack command arguments for transmission.

344

345

Args:

346

args: Command arguments

347

348

Returns:

349

List of packed command parts

350

"""

351

352

def pack_commands(self, commands: List[Tuple]) -> List[bytes]:

353

"""

354

Pack multiple commands for pipeline transmission.

355

356

Args:

357

commands: List of command tuples

358

359

Returns:

360

List of packed command bytes

361

"""

362

363

async def check_health(self) -> None:

364

"""Check and maintain connection health."""

365

366

def register_connect_callback(self, callback: Callable) -> None:

367

"""

368

Register callback for connection events.

369

370

Args:

371

callback: Function to call on connection

372

"""

373

374

def clear_connect_callbacks(self) -> None:

375

"""Clear all connection callbacks."""

376

377

@property

378

def is_connected(self) -> bool:

379

"""

380

Whether connection is currently active.

381

382

Returns:

383

True if connected

384

"""

385

386

class SSLConnection(Connection):

387

"""SSL-enabled TCP connection to Redis server."""

388

389

def __init__(

390

self,

391

ssl_keyfile: Optional[str] = None,

392

ssl_certfile: Optional[str] = None,

393

ssl_cert_reqs: str = "required",

394

ssl_ca_certs: Optional[str] = None,

395

ssl_check_hostname: bool = False,

396

**kwargs

397

):

398

"""

399

Initialize SSL connection.

400

401

Args:

402

ssl_keyfile: Path to SSL private key file

403

ssl_certfile: Path to SSL certificate file

404

ssl_cert_reqs: Certificate requirement level

405

ssl_ca_certs: Path to CA certificates file

406

ssl_check_hostname: Verify hostname against certificate

407

kwargs: Additional connection parameters

408

"""

409

410

class UnixDomainSocketConnection(Connection):

411

"""Unix domain socket connection to Redis server."""

412

413

def __init__(self, path: str = "/var/run/redis/redis.sock", **kwargs):

414

"""

415

Initialize Unix socket connection.

416

417

Args:

418

path: Path to Redis Unix socket

419

kwargs: Additional connection parameters

420

"""

421

```

422

423

### Protocol Parsers and Encoders

424

425

Low-level protocol handling for Redis communication.

426

427

```python { .api }

428

class BaseParser:

429

"""Base class for Redis protocol parsers."""

430

431

def on_connect(self, connection: Connection) -> None:

432

"""

433

Handle connection establishment.

434

435

Args:

436

connection: Active connection instance

437

"""

438

439

def on_disconnect(self) -> None:

440

"""Handle connection termination."""

441

442

async def can_read(self, timeout: float) -> bool:

443

"""

444

Check if data is available for reading.

445

446

Args:

447

timeout: Timeout in seconds

448

449

Returns:

450

True if data is available

451

"""

452

453

async def read_response(self) -> Any:

454

"""

455

Read and parse Redis protocol response.

456

457

Returns:

458

Parsed response value

459

"""

460

461

class PythonParser(BaseParser):

462

"""Pure Python implementation of Redis protocol parser."""

463

464

class HiredisParser(BaseParser):

465

"""Hiredis-based parser for improved performance."""

466

467

class Encoder:

468

"""Handles encoding and decoding of Redis data."""

469

470

def encode(self, value: Any) -> bytes:

471

"""

472

Encode value for Redis transmission.

473

474

Args:

475

value: Value to encode

476

477

Returns:

478

Encoded bytes

479

"""

480

481

def decode(self, value: bytes, force: bool = False) -> Any:

482

"""

483

Decode Redis response value.

484

485

Args:

486

value: Response bytes to decode

487

force: Force decoding even if decode_responses=False

488

489

Returns:

490

Decoded value

491

"""

492

```

493

494

### Sentinel Support

495

496

High availability Redis setup with automatic failover using Redis Sentinel.

497

498

```python { .api }

499

class Sentinel:

500

"""Redis Sentinel client for high availability."""

501

502

def __init__(

503

self,

504

sentinels: List[Tuple[str, int]],

505

socket_timeout: float = 0.1,

506

**kwargs

507

):

508

"""

509

Initialize Sentinel client.

510

511

Args:

512

sentinels: List of (host, port) tuples for sentinel servers

513

socket_timeout: Socket timeout for sentinel connections

514

kwargs: Additional connection parameters

515

"""

516

517

def master_for(

518

self,

519

service_name: str,

520

redis_class: Type[Redis] = Redis,

521

**kwargs

522

) -> Redis:

523

"""

524

Get Redis client for master server.

525

526

Args:

527

service_name: Name of Redis service in Sentinel config

528

redis_class: Redis client class to use

529

kwargs: Additional client parameters

530

531

Returns:

532

Redis client connected to current master

533

"""

534

535

def slave_for(

536

self,

537

service_name: str,

538

redis_class: Type[Redis] = Redis,

539

**kwargs

540

) -> Redis:

541

"""

542

Get Redis client for slave server.

543

544

Args:

545

service_name: Name of Redis service in Sentinel config

546

redis_class: Redis client class to use

547

kwargs: Additional client parameters

548

549

Returns:

550

Redis client connected to a slave

551

"""

552

553

async def discover_master(self, service_name: str) -> Tuple[str, int]:

554

"""

555

Discover master server address.

556

557

Args:

558

service_name: Service name to discover

559

560

Returns:

561

Tuple of (host, port) for master server

562

"""

563

564

async def discover_slaves(self, service_name: str) -> List[Tuple[str, int]]:

565

"""

566

Discover slave server addresses.

567

568

Args:

569

service_name: Service name to discover

570

571

Returns:

572

List of (host, port) tuples for slave servers

573

"""

574

575

class SentinelConnectionPool(ConnectionPool):

576

"""Connection pool managed by Redis Sentinel."""

577

578

def __init__(

579

self,

580

service_name: str,

581

sentinel_manager: Sentinel,

582

**kwargs

583

):

584

"""

585

Initialize Sentinel-managed connection pool.

586

587

Args:

588

service_name: Redis service name

589

sentinel_manager: Sentinel client instance

590

kwargs: Additional pool parameters

591

"""

592

593

class SentinelManagedConnection(Connection):

594

"""Connection managed by Redis Sentinel with automatic failover."""

595

```

596

597

## Usage Examples

598

599

### Basic Connection Setup

600

601

```python

602

async def basic_connection_examples():
    """Show the common ways of constructing a client directly.

    Builds five clients (plain, authenticated, with timeouts, SSL and
    Unix socket), pings the first one, and then closes every client that
    was created so none of them is left holding a connection pool.
    """
    # Simple connection
    redis = aioredis.Redis(
        host='localhost',
        port=6379,
        db=0,
        decode_responses=True
    )

    # Test connection
    pong = await redis.ping()
    print(f"Connected: {pong}")  # True

    # Connection with authentication
    redis_auth = aioredis.Redis(
        host='redis.example.com',
        port=6380,
        password='secretpassword',
        username='myuser',  # Redis 6+
        db=1
    )

    # Connection with timeouts
    redis_timeouts = aioredis.Redis(
        host='localhost',
        port=6379,
        socket_connect_timeout=5,  # 5 seconds to connect
        socket_timeout=3,  # 3 seconds per operation
        retry_on_timeout=True
    )

    # SSL connection
    redis_ssl = aioredis.Redis(
        host='secure-redis.example.com',
        port=6380,
        ssl=True,
        ssl_certfile='/path/to/client.crt',
        ssl_keyfile='/path/to/client.key',
        ssl_ca_certs='/path/to/ca.crt'
    )

    # Unix socket connection
    redis_unix = aioredis.Redis(
        unix_socket_path='/var/run/redis/redis.sock',
        db=0
    )

    # Close every client created above, not just the first one.
    for client in (redis, redis_auth, redis_timeouts, redis_ssl, redis_unix):
        await client.aclose()

650

```

651

652

### URL-Based Configuration

653

654

```python

655

async def url_connection_examples():
    """Show the URL forms accepted by ``aioredis.from_url``.

    Builds clients from plain, authenticated, ``rediss://`` and
    ``unix://`` URLs, plus one URL combined with keyword overrides, and
    then closes every client that was created.
    """
    # Basic URL
    redis = aioredis.from_url("redis://localhost:6379/0")

    # URL with authentication
    redis_auth = aioredis.from_url(
        "redis://myuser:mypassword@redis.example.com:6380/1"
    )

    # SSL URL
    redis_ssl = aioredis.from_url(
        "rediss://user:pass@secure-redis.com:6380/0",
        ssl_cert_reqs="required",
        ssl_ca_certs="/path/to/ca.crt"
    )

    # Unix socket URL
    redis_unix = aioredis.from_url(
        "unix:///var/run/redis/redis.sock?db=0"
    )

    # URL with additional parameters
    redis_custom = aioredis.from_url(
        "redis://localhost:6379/0",
        socket_timeout=5,
        socket_connect_timeout=10,
        decode_responses=True,
        health_check_interval=30
    )

    # Close every client created above, not just the first one.
    for client in (redis, redis_auth, redis_ssl, redis_unix, redis_custom):
        await client.aclose()

686

```

687

688

### Connection Pooling

689

690

```python

691

async def connection_pool_examples():
    """Show explicit pool construction and sharing one pool between clients.

    Creates a regular pool, a blocking pool and a pool built from a URL,
    attaches clients to them, writes three keys through clients that share
    the first pool, and finally disconnects all three pools.
    """
    # Regular pool, configured through connection keyword arguments
    pool = aioredis.ConnectionPool(
        host='localhost',
        port=6379,
        max_connections=100,
        retry_on_timeout=True,
        socket_keepalive=True,
    )
    redis = aioredis.Redis(connection_pool=pool)

    # Blocking pool: callers wait up to `timeout` seconds for a free connection
    blocking_pool = aioredis.BlockingConnectionPool(
        host='localhost',
        port=6379,
        max_connections=20,
        timeout=10,
    )
    redis_blocking = aioredis.Redis(connection_pool=blocking_pool)

    # Pool described by a URL
    pool_from_url = aioredis.ConnectionPool.from_url(
        "redis://localhost:6379/0",
        max_connections=50,
    )
    redis_url_pool = aioredis.Redis(connection_pool=pool_from_url)

    # Three independent clients backed by the same pool
    shared_clients = [aioredis.Redis(connection_pool=pool) for _ in range(3)]

    # Each client writes its own key; the pool hands out connections as needed
    for number, client in enumerate(shared_clients, start=1):
        await client.set(f'key{number}', f'value{number}')

    # Tear the pools down in creation order
    for created_pool in (pool, blocking_pool, pool_from_url):
        await created_pool.disconnect()

736

```

737

738

### High Availability with Sentinel

739

740

```python

741

async def sentinel_examples():
    """Show master/slave access and address discovery through Sentinel.

    Writes a key via the master client, reads it back via a slave client,
    prints the addresses Sentinel reports for the service, and closes both
    clients.
    """
    # Sentinel endpoints as (host, port) pairs
    sentinel_nodes = [
        ('sentinel1.example.com', 26379),
        ('sentinel2.example.com', 26379),
        ('sentinel3.example.com', 26379),
    ]
    sentinel = aioredis.Sentinel(sentinel_nodes)
    service_name = 'mymaster'

    # One client for writes (master), one for reads (slave)
    master = sentinel.master_for(service_name, socket_timeout=0.1)
    slave = sentinel.slave_for(service_name, socket_timeout=0.1)

    await master.set('key', 'value')

    # The slave read may lag behind the master write (replication lag)
    value = await slave.get('key')

    # Failover is handled by Sentinel: if the master goes down,
    # Sentinel promotes a slave.

    # Addresses can also be looked up explicitly
    master_address = await sentinel.discover_master(service_name)
    print(f"Current master: {master_address}")

    slave_addresses = await sentinel.discover_slaves(service_name)
    print(f"Available slaves: {slave_addresses}")

    for client in (master, slave):
        await client.aclose()

773

```

774

775

### Connection Health and Monitoring

776

777

```python

778

async def connection_health_examples():
    """Show health-check configuration and manual liveness probes.

    Configures periodic health checks and TCP keepalive, then pings the
    server, echoes a message, and lists connected clients. The client is
    closed even if one of the probes raises.
    """
    import socket  # provides the TCP_KEEP* option constants

    # Connection with health checking
    redis = aioredis.Redis(
        host='localhost',
        port=6379,
        health_check_interval=30,  # Check every 30 seconds
        socket_keepalive=True,
        # Key the options with the socket module's constants rather than
        # hard-coded integers: the numeric values are platform-specific
        # (on Linux 1/2/3 are unrelated TCP options, not the keepalive ones).
        # These three names are the Linux ones; other platforms may differ.
        socket_keepalive_options={
            socket.TCP_KEEPIDLE: 1,   # idle seconds before the first probe
            socket.TCP_KEEPINTVL: 3,  # seconds between probes
            socket.TCP_KEEPCNT: 5,    # unanswered probes before dropping
        },
    )

    try:
        # Manual health check
        try:
            await redis.ping()
            print("Connection healthy")
        except aioredis.ConnectionError:
            print("Connection failed")

        # Echo test
        echo_result = await redis.echo("Hello Redis")
        print(f"Echo: {echo_result}")

        # Connection info (if supported)
        try:
            client_info = await redis.client_list()
            print(f"Connected clients: {len(client_info)}")
        except aioredis.RedisError as exc:
            # Still best-effort, but say why the information is missing
            # instead of discarding the error silently.
            print(f"Client list unavailable: {exc}")
    finally:
        await redis.aclose()

811

```

812

813

### Error Handling and Reconnection

814

815

```python

816

async def error_handling_examples():
    """Show a bounded retry loop around a single write.

    The inner helper attempts the write up to three times, pausing one
    second after a timeout and two seconds after a connection error, and
    re-raises the last error once the attempts are used up.
    """
    redis = aioredis.Redis(
        host='localhost',
        port=6379,
        retry_on_timeout=True,
        socket_timeout=5,
    )

    async def robust_operation(key, value):
        """Set ``key`` to ``value``, retrying on timeout or connection error."""
        max_retries = 3

        for attempt in range(1, max_retries + 1):
            final_attempt = attempt == max_retries
            try:
                await redis.set(key, value)
            except aioredis.TimeoutError:
                print(f"Timeout, retry {attempt}/{max_retries}")
                if final_attempt:
                    raise
                await asyncio.sleep(1)
            except aioredis.ConnectionError as e:
                print(f"Connection error: {e}")
                # Connection will be automatically recreated on next operation
                if final_attempt:
                    raise
                await asyncio.sleep(2)
            else:
                print(f"Successfully set {key} = {value}")
                return

    # Test robust operation
    try:
        await robust_operation('test_key', 'test_value')
    except Exception as e:
        print(f"Operation failed after retries: {e}")

    await redis.aclose()

858

```

859

860

### Performance Optimization

861

862

```python

863

async def performance_optimization_examples():
    """Compare three client configurations with a simple SET benchmark.

    Builds a default client, a client backed by an explicitly sized pool,
    and a single-connection client, runs the same write loop through each,
    and prints the throughput. All resources are released even if a
    benchmark raises.
    """
    import time

    # Optimized connection settings.
    # socket_read_size is not a Redis() keyword argument; it is a
    # Connection option, so it is set through the ConnectionPool below.
    redis = aioredis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True,
        socket_keepalive=True,
        health_check_interval=0,  # Disable health checks for max performance
    )

    # Use connection pooling for high concurrency
    pool = aioredis.ConnectionPool(
        host='localhost',
        port=6379,
        max_connections=100,  # Tune based on load
        socket_read_size=131072,  # 128KB for high throughput
    )

    redis_pooled = aioredis.Redis(connection_pool=pool)

    # Single connection client for sequential operations
    redis_single = aioredis.Redis(
        host='localhost',
        port=6379,
        single_connection_client=True  # Reuse single connection
    )

    # Benchmark different configurations
    async def benchmark_operations(client, name, count=1000):
        """Issue ``count`` sequential SETs through ``client`` and print the rate."""
        # perf_counter is monotonic and high-resolution, so the measurement
        # is not disturbed by wall-clock adjustments.
        start_time = time.perf_counter()

        for i in range(count):
            await client.set(f'benchmark:{name}:{i}', f'value_{i}')

        elapsed = time.perf_counter() - start_time
        print(f"{name}: {count} operations in {elapsed:.2f}s ({count/elapsed:.0f} ops/sec)")

    try:
        await benchmark_operations(redis, "default", 1000)
        await benchmark_operations(redis_pooled, "pooled", 1000)
        await benchmark_operations(redis_single, "single_conn", 1000)
    finally:
        await redis.aclose()
        await pool.disconnect()
        await redis_single.aclose()

910

```