# Advanced Features

Advanced Redis functionality including pub/sub messaging, Redis Streams consumer groups, transactions, pipelines, Lua scripting, distributed locking, and batch operations for building high-performance and scalable applications.

## Capabilities

### Pipelines and Transactions

Batch command execution for improved performance and atomic operations with optimistic locking support.

```python { .api }
def pipeline(transaction: bool = True, shard_hint: Any = None) -> Pipeline:
    """
    Create a pipeline for batching commands.

    Args:
        transaction: Whether to execute as transaction (with MULTI/EXEC)
        shard_hint: Hint for sharding (cluster mode)

    Returns:
        Pipeline object for command batching
    """

class Pipeline:
    """Pipeline for batching Redis commands."""

    def multi(self) -> None:
        """Start transaction block."""

    async def execute(self) -> List[Any]:
        """
        Execute all pipelined commands.

        Returns:
            List of command results in order
        """

    def reset(self) -> None:
        """Reset pipeline, clearing all commands."""

    async def watch(self, *keys: str) -> None:
        """
        Watch keys for changes during a transaction (optimistic locking).

        Args:
            keys: Key names to watch
        """

    async def unwatch(self) -> None:
        """Unwatch all keys."""
```

### Pub/Sub Messaging

Real-time messaging system for building event-driven applications with channel subscriptions and pattern matching.

```python { .api }
def pubsub() -> PubSub:
    """
    Create pub/sub client for messaging.

    Returns:
        PubSub client instance
    """

class PubSub:
    """Pub/Sub client for real-time messaging."""

    async def subscribe(self, *channels: str, **channel_handlers: Callable) -> None:
        """
        Subscribe to channels.

        Args:
            channels: Channel names to subscribe to
            channel_handlers: Channel name to handler function mapping
        """

    async def psubscribe(self, *patterns: str, **pattern_handlers: Callable) -> None:
        """
        Subscribe to channel patterns.

        Args:
            patterns: Channel patterns to subscribe to (* and ? wildcards)
            pattern_handlers: Pattern to handler function mapping
        """

    async def unsubscribe(self, *channels: str) -> None:
        """
        Unsubscribe from channels.

        Args:
            channels: Channel names to unsubscribe from
        """

    async def punsubscribe(self, *patterns: str) -> None:
        """
        Unsubscribe from channel patterns.

        Args:
            patterns: Channel patterns to unsubscribe from
        """

    async def get_message(self, ignore_subscribe_messages: bool = False, timeout: float = 0) -> Optional[Dict]:
        """
        Get next message.

        Args:
            ignore_subscribe_messages: Skip subscription confirmations
            timeout: Timeout in seconds (0 for non-blocking)

        Returns:
            Message dictionary or None
        """

    def listen(self) -> AsyncIterator[Dict]:
        """
        Listen for messages asynchronously.

        Returns:
            Async iterator of message dictionaries
        """

    async def close(self) -> None:
        """Close pub/sub client and clean up resources."""

async def publish(channel: str, message: str) -> int:
    """
    Publish message to channel.

    Args:
        channel: Channel name
        message: Message content

    Returns:
        Number of clients that received the message
    """

async def pubsub_channels(pattern: str = "*") -> List[str]:
    """
    Get active pub/sub channels.

    Args:
        pattern: Channel pattern filter

    Returns:
        List of active channel names
    """

async def pubsub_numsub(*channels: str) -> Dict[str, int]:
    """
    Get channel subscriber counts.

    Args:
        channels: Channel names

    Returns:
        Dictionary mapping channel names to subscriber counts
    """
```
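
The introspection commands above can be used to check which channels currently have listeners before publishing. A minimal sketch against the signatures documented in this block (channel names are illustrative, and the `Dict[str, int]` return shape of `pubsub_numsub` is taken from the spec above):

```python
import aioredis

async def channel_introspection():
    redis = aioredis.Redis(decode_responses=True)

    # List currently active channels matching a pattern
    channels = await redis.pubsub_channels('user:*')
    print(f"Active channels: {channels}")

    # Check subscriber counts and publish only where someone is listening
    counts = await redis.pubsub_numsub('alerts', 'metrics')
    for channel, subscribers in counts.items():
        if subscribers > 0:
            delivered = await redis.publish(channel, 'ping')
            print(f"{channel}: delivered to {delivered} subscribers")
```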

### Lua Scripting

Execute custom Lua scripts on the Redis server for atomic operations and complex data processing.

```python { .api }
async def eval(script: str, numkeys: int, *keys_and_args: Any) -> Any:
    """
    Execute Lua script.

    Args:
        script: Lua script code
        numkeys: Number of keys in script arguments
        keys_and_args: Keys followed by arguments for script

    Returns:
        Script return value
    """

async def evalsha(sha: str, numkeys: int, *keys_and_args: Any) -> Any:
    """
    Execute Lua script by SHA hash.

    Args:
        sha: SHA1 hash of previously loaded script
        numkeys: Number of keys in script arguments
        keys_and_args: Keys followed by arguments for script

    Returns:
        Script return value
    """

async def script_load(script: str) -> str:
    """
    Load Lua script and return SHA hash.

    Args:
        script: Lua script code

    Returns:
        SHA1 hash of loaded script
    """

async def script_exists(*shas: str) -> List[bool]:
    """
    Check if scripts exist by SHA hash.

    Args:
        shas: SHA1 hashes to check

    Returns:
        List of boolean values indicating existence
    """

async def script_flush() -> bool:
    """
    Remove all cached scripts.

    Returns:
        True if successful
    """

def register_script(script: str) -> Script:
    """
    Register Lua script for reuse.

    Args:
        script: Lua script code

    Returns:
        Script object that can be called
    """

class Script:
    """Registered Lua script wrapper."""

    async def __call__(self, keys: Optional[List[str]] = None, args: Optional[List[Any]] = None, client: Optional['Redis'] = None) -> Any:
        """
        Execute the registered script.

        Args:
            keys: Keys for script execution
            args: Arguments for script execution
            client: Redis client to use (defaults to registration client)

        Returns:
            Script return value
        """
```
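
For scripts that run frequently, it is cheaper to load the body once and invoke it by digest. A minimal sketch of that flow using `script_load`, `evalsha`, and `script_exists` from the block above (the rate-limiting script and key names are illustrative):

```python
import aioredis

RATE_LIMIT_SCRIPT = """
local current = redis.call('INCR', KEYS[1])
if current == 1 then
    redis.call('EXPIRE', KEYS[1], ARGV[1])
end
return current
"""

async def cached_script_example():
    redis = aioredis.Redis(decode_responses=True)

    # Load once; Redis caches the script server-side and returns its SHA1
    sha = await redis.script_load(RATE_LIMIT_SCRIPT)

    # Subsequent calls send only the 40-character hash, not the script body
    hits = await redis.evalsha(sha, 1, 'ratelimit:user123', 60)
    print(f"Requests in current window: {hits}")

    # If the server-side cache may have been flushed, reload before calling
    exists, = await redis.script_exists(sha)
    if not exists:
        sha = await redis.script_load(RATE_LIMIT_SCRIPT)
```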

### Distributed Locking

Distributed locking mechanism for coordinating access to shared resources across multiple processes or servers.

```python { .api }
def lock(
    name: str,
    timeout: Optional[float] = None,
    sleep: float = 0.1,
    blocking: bool = True,
    blocking_timeout: Optional[float] = None,
    thread_local: bool = True
) -> Lock:
    """
    Create distributed lock.

    Args:
        name: Lock name/key
        timeout: Lock expiration timeout in seconds
        sleep: Sleep interval when waiting for lock
        blocking: Whether to block when acquiring
        blocking_timeout: Maximum time to block for acquisition
        thread_local: Whether lock is thread-local

    Returns:
        Lock instance
    """

class Lock:
    """Distributed lock implementation."""

    async def acquire(
        self,
        blocking: Optional[bool] = None,
        blocking_timeout: Optional[float] = None,
        token: Optional[str] = None
    ) -> bool:
        """
        Acquire the lock.

        Args:
            blocking: Override blocking behavior
            blocking_timeout: Override blocking timeout
            token: Specific lock token to use

        Returns:
            True if lock was acquired
        """

    async def release(self) -> bool:
        """
        Release the lock.

        Returns:
            True if lock was released by this client
        """

    async def extend(self, additional_time: float, replace_ttl: bool = False) -> bool:
        """
        Extend lock timeout.

        Args:
            additional_time: Time to add to current timeout
            replace_ttl: Replace TTL instead of extending

        Returns:
            True if lock was extended
        """

    def locked(self) -> bool:
        """
        Check if lock is currently held.

        Returns:
            True if lock is held
        """

    def owned(self) -> bool:
        """
        Check if lock is owned by this client.

        Returns:
            True if lock is owned by this client
        """

    async def __aenter__(self) -> 'Lock':
        """Async context manager entry."""
        await self.acquire()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Async context manager exit."""
        await self.release()
```

### Monitoring

Monitor Redis server commands and operations in real-time for debugging and performance analysis.

```python { .api }
def monitor() -> Monitor:
    """
    Create monitor client for watching commands.

    Returns:
        Monitor client instance
    """

class Monitor:
    """Monitor client for watching Redis commands."""

    async def next_command(self) -> Dict[str, Any]:
        """
        Get next monitored command.

        Returns:
            Dictionary with command details
        """

    def monitor(self) -> AsyncIterator[Dict[str, Any]]:
        """
        Monitor commands asynchronously.

        Returns:
            Async iterator of command dictionaries
        """
```
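
A brief sketch of how the monitor client could be used to tail server activity, written against the interface documented above; real client libraries may require entering the monitor as a context manager before reading, and the exact fields of the command dictionary vary:

```python
import aioredis

async def monitoring_example():
    redis = aioredis.Redis(decode_responses=True)
    mon = redis.monitor()

    # Print the next few commands observed by the server (debugging only:
    # MONITOR has a noticeable performance cost on busy servers)
    for _ in range(5):
        command = await mon.next_command()
        print(command)
```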

### Scanning Operations

Efficient iteration over large keyspaces and data structures using cursor-based scanning.

```python { .api }
async def scan(
    cursor: int = 0,
    match: Optional[str] = None,
    count: Optional[int] = None,
    _type: Optional[str] = None
) -> Tuple[int, List[str]]:
    """
    Incrementally scan keyspace.

    Args:
        cursor: Scan cursor position (0 to start)
        match: Pattern to match keys against
        count: Approximate number of keys per iteration
        _type: Filter by key type

    Returns:
        Tuple of (next_cursor, keys)
    """

async def sscan(
    name: str,
    cursor: int = 0,
    match: Optional[str] = None,
    count: Optional[int] = None
) -> Tuple[int, List[str]]:
    """
    Incrementally scan set members.

    Args:
        name: Set key name
        cursor: Scan cursor position
        match: Pattern to match members against
        count: Approximate number of members per iteration

    Returns:
        Tuple of (next_cursor, members)
    """

async def hscan(
    name: str,
    cursor: int = 0,
    match: Optional[str] = None,
    count: Optional[int] = None
) -> Tuple[int, Dict[str, str]]:
    """
    Incrementally scan hash fields.

    Args:
        name: Hash key name
        cursor: Scan cursor position
        match: Pattern to match fields against
        count: Approximate number of fields per iteration

    Returns:
        Tuple of (next_cursor, field_value_dict)
    """

async def zscan(
    name: str,
    cursor: int = 0,
    match: Optional[str] = None,
    count: Optional[int] = None,
    score_cast_func: Callable = float
) -> Tuple[int, List[Tuple[str, float]]]:
    """
    Incrementally scan sorted set members.

    Args:
        name: Sorted set key name
        cursor: Scan cursor position
        match: Pattern to match members against
        count: Approximate number of members per iteration
        score_cast_func: Function to convert scores

    Returns:
        Tuple of (next_cursor, member_score_pairs)
    """
```

### Sort Operations

Server-side sorting with support for external keys, patterns, and result storage.

```python { .api }
async def sort(
    name: str,
    start: Optional[int] = None,
    num: Optional[int] = None,
    by: Optional[str] = None,
    get: Optional[List[str]] = None,
    desc: bool = False,
    alpha: bool = False,
    store: Optional[str] = None,
    groups: bool = False
) -> Union[List[str], int]:
    """
    Sort and return or store a list, set, or sorted set.

    Args:
        name: Key name to sort
        start: Skip this many elements
        num: Return this many elements
        by: Sort by external key pattern
        get: Retrieve values from external keys
        desc: Sort in descending order
        alpha: Sort lexicographically
        store: Store result in this key
        groups: Group returned values

    Returns:
        Sorted results, or the number of stored elements when store is set
    """
```
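
A short sketch of the `by`/`get`/`store` options, sorting a list of IDs by external weight keys and pulling back related values. Key names are illustrative, and the snippet uses basic commands (`rpush`, `mset`) documented elsewhere in these docs:

```python
import aioredis

async def sort_example():
    redis = aioredis.Redis(decode_responses=True)

    # A list of product IDs plus per-product weight and name keys
    await redis.rpush('products', '1', '2', '3')
    await redis.mset({
        'weight_1': 30, 'weight_2': 10, 'weight_3': 20,
        'name_1': 'apple', 'name_2': 'banana', 'name_3': 'cherry',
    })

    # Sort the IDs by the external weight keys and return the matching names
    names = await redis.sort('products', by='weight_*', get=['name_*'])
    print(names)  # ['banana', 'cherry', 'apple']

    # Store a descending numeric sort under another key instead of returning it
    stored = await redis.sort('products', desc=True, store='products:sorted')
    print(f"Stored {stored} elements")
```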

## Usage Examples

### Pipeline Operations

```python
import aioredis

async def pipeline_examples():
    redis = aioredis.Redis(decode_responses=True)

    # Basic pipeline (non-transactional)
    pipe = redis.pipeline(transaction=False)
    pipe.set('key1', 'value1')
    pipe.set('key2', 'value2')
    pipe.get('key1')
    pipe.get('key2')
    results = await pipe.execute()
    print(results)  # [True, True, 'value1', 'value2']

    # Transactional pipeline (commands are wrapped in MULTI/EXEC on execute)
    pipe = redis.pipeline(transaction=True)
    pipe.incr('counter')
    pipe.incr('counter')
    pipe.get('counter')
    results = await pipe.execute()
    print(results)  # [1, 2, '2']

    # Optimistic locking: WATCH through the pipeline, then MULTI/EXEC
    pipe = redis.pipeline(transaction=True)
    await pipe.watch('balance')
    balance = int(await pipe.get('balance') or 0)
    if balance >= 100:
        pipe.multi()
        pipe.decrby('balance', 100)
        pipe.incr('purchases')
        await pipe.execute()
    else:
        await pipe.unwatch()
```
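
If a watched key is modified between `watch` and `execute`, the transaction aborts; the usual pattern is to retry the whole read-modify-write cycle. A minimal sketch, assuming the client raises `WatchError` on conflict (exception name as in redis-py/aioredis; adjust for the client in use):

```python
import aioredis
from aioredis.exceptions import WatchError  # assumption: redis-py-style exception

async def transfer_with_retry(redis: aioredis.Redis, amount: int, retries: int = 5) -> bool:
    """Deduct from 'balance' and bump 'purchases' atomically, retrying on conflict."""
    for _ in range(retries):
        pipe = redis.pipeline(transaction=True)
        try:
            await pipe.watch('balance')
            balance = int(await pipe.get('balance') or 0)
            if balance < amount:
                await pipe.unwatch()
                return False
            pipe.multi()
            pipe.decrby('balance', amount)
            pipe.incr('purchases')
            await pipe.execute()  # raises WatchError if 'balance' changed meanwhile
            return True
        except WatchError:
            continue  # another client modified 'balance'; retry the whole cycle
    return False
```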

### Pub/Sub Messaging

```python
import asyncio
import aioredis

async def pubsub_examples():
    redis = aioredis.Redis(decode_responses=True)

    # Publisher
    async def publisher():
        for i in range(10):
            await redis.publish('notifications', f'Message {i}')
            await asyncio.sleep(1)
        await redis.publish('notifications', 'STOP')  # let the subscriber exit

    # Subscriber with message handling
    async def subscriber():
        pubsub = redis.pubsub()
        await pubsub.subscribe('notifications')

        try:
            async for message in pubsub.listen():
                if message['type'] == 'message':
                    print(f"Received: {message['data']}")
                    if message['data'] == 'STOP':
                        break
        finally:
            await pubsub.close()

    # Pattern subscription (runs until the surrounding task is cancelled)
    async def pattern_subscriber():
        pubsub = redis.pubsub()
        await pubsub.psubscribe('user:*:notifications')

        try:
            while True:
                # get_message returns None when the timeout elapses
                message = await pubsub.get_message(timeout=1)
                if message and message['type'] == 'pmessage':
                    channel = message['channel']
                    data = message['data']
                    print(f"Pattern match {channel}: {data}")
        finally:
            await pubsub.close()

    # Run publisher and subscriber concurrently
    await asyncio.gather(publisher(), subscriber())
```

### Lua Scripting

```python
import time

import aioredis

async def lua_script_examples():
    redis = aioredis.Redis(decode_responses=True)

    # Simple script execution
    script = """
    local key = KEYS[1]
    local increment = tonumber(ARGV[1])
    local current = tonumber(redis.call('GET', key)) or 0
    local new_value = current + increment
    redis.call('SET', key, new_value)
    return new_value
    """

    result = await redis.eval(script, 1, 'counter', 5)
    print(f"Counter value: {result}")

    # Register script for reuse
    atomic_increment = redis.register_script("""
    local key = KEYS[1]
    local increment = tonumber(ARGV[1])
    local max_value = tonumber(ARGV[2])

    local current = tonumber(redis.call('GET', key)) or 0
    if current + increment <= max_value then
        local new_value = current + increment
        redis.call('SET', key, new_value)
        return new_value
    else
        return -1 -- exceeded maximum
    end
    """)

    # Use registered script
    result = await atomic_increment(keys=['limited_counter'], args=[3, 100])
    if result == -1:
        print("Increment would exceed maximum")
    else:
        print(f"New value: {result}")

    # Complex script with multiple operations
    batch_update = redis.register_script("""
    local user_key = 'user:' .. ARGV[1]
    local points = tonumber(ARGV[2])
    local level = tonumber(ARGV[3])

    -- Update user data
    redis.call('HSET', user_key, 'points', points)
    redis.call('HSET', user_key, 'level', level)
    redis.call('HSET', user_key, 'last_updated', ARGV[4])

    -- Update leaderboard
    redis.call('ZADD', 'leaderboard', points, ARGV[1])

    -- Update level ranking
    redis.call('ZADD', 'level:' .. level, points, ARGV[1])

    return redis.call('ZRANK', 'leaderboard', ARGV[1])
    """)

    rank = await batch_update(
        keys=[],
        args=['user123', 1500, 5, int(time.time())]
    )
    print(f"User rank: {rank}")
```

### Distributed Locking

```python
import asyncio
import aioredis

async def locking_examples():
    redis = aioredis.Redis(decode_responses=True)

    # Basic lock usage
    lock = redis.lock('resource_lock', timeout=10)

    try:
        # Try to acquire lock
        acquired = await lock.acquire(blocking_timeout=5)
        if acquired:
            print("Lock acquired, performing critical section")
            await asyncio.sleep(2)  # Simulate work
            print("Work completed")
        else:
            print("Could not acquire lock")
    finally:
        if lock.owned():
            await lock.release()

    # Context manager usage
    async with redis.lock('another_resource', timeout=30):
        print("Automatically acquired lock")
        await asyncio.sleep(1)
        # Lock automatically released on exit

    # Lock with extension
    long_task_lock = redis.lock('long_task', timeout=10)
    await long_task_lock.acquire()

    try:
        for i in range(5):
            print(f"Working on step {i + 1}")
            await asyncio.sleep(3)

            # Extend lock if needed
            if i < 4:  # Don't extend on last iteration
                extended = await long_task_lock.extend(10)
                if extended:
                    print("Lock extended")
                else:
                    print("Could not extend lock")
                    break
    finally:
        await long_task_lock.release()

    # Non-blocking lock attempt
    quick_lock = redis.lock('quick_resource')
    if await quick_lock.acquire(blocking=False):
        try:
            print("Got lock immediately")
            # Quick operation
        finally:
            await quick_lock.release()
    else:
        print("Resource busy, skipping operation")
```

### Scanning Large Datasets

```python
import aioredis

async def scanning_examples():
    redis = aioredis.Redis(decode_responses=True)

    # Scan all keys with pattern
    cursor = 0
    user_keys = []

    while True:
        cursor, keys = await redis.scan(cursor, match='user:*', count=100)
        user_keys.extend(keys)

        if cursor == 0:  # Scan complete
            break

    print(f"Found {len(user_keys)} user keys")

    # Scan set members
    cursor = 0
    all_tags = []

    while True:
        cursor, members = await redis.sscan('all_tags', cursor, count=50)
        all_tags.extend(members)

        if cursor == 0:
            break

    # Scan hash fields
    cursor = 0
    config_items = {}

    while True:
        cursor, items = await redis.hscan('config', cursor, match='cache_*')
        config_items.update(items)

        if cursor == 0:
            break

    # Scan sorted set with score filtering
    cursor = 0
    high_scores = []

    while True:
        cursor, members = await redis.zscan(
            'leaderboard',
            cursor,
            match='player*',
            count=25
        )
        # Filter by score if needed
        high_scores.extend([
            (member, score) for member, score in members
            if score > 1000
        ])

        if cursor == 0:
            break

    print(f"Found {len(high_scores)} high-scoring players")
```