# Advanced Features

Advanced Redis functionality including pub/sub messaging, Redis Streams consumer groups, transactions, pipelines, Lua scripting, distributed locking, and batch operations for building high-performance and scalable applications.

## Capabilities

### Pipelines and Transactions

Batch command execution for improved performance and atomic operations with optimistic locking support.

```python { .api }
def pipeline(transaction: bool = True, shard_hint: Any = None) -> Pipeline:
    """
    Build a pipeline that queues commands and sends them as one batch.

    Args:
        transaction: When True, the batch is wrapped in MULTI/EXEC
        shard_hint: Sharding hint (cluster mode)

    Returns:
        Pipeline object used to batch commands
    """
22
23
class Pipeline:
    """Pipeline for batching Redis commands."""

    def multi(self) -> None:
        """Start transaction block."""

    async def execute(self) -> List[Any]:
        """
        Execute all pipelined commands.

        Returns:
            List of command results in order
        """

    def reset(self) -> None:
        """Reset pipeline, clearing all commands."""
        # NOTE(review): asyncio clients may expose reset() as a coroutine --
        # confirm against the installed library version.

    async def watch(self, *keys: str) -> None:
        """
        Watch keys for changes during transaction.

        Declared as a coroutine, matching the client-level ``watch`` stub
        documented alongside this class.

        Args:
            keys: Key names to watch
        """

    async def unwatch(self) -> None:
        """
        Unwatch all keys.

        Declared as a coroutine, matching the client-level ``unwatch`` stub
        documented alongside this class.
        """
50
51
async def watch(*keys: str) -> None:
    """
    Mark keys to be monitored for optimistic locking.

    Args:
        keys: Names of the keys whose modification should be detected
    """
58
59
async def unwatch(self) -> None:
    """Stop watching every previously watched key."""
```

### Pub/Sub Messaging

Real-time messaging system for building event-driven applications with channel subscriptions and pattern matching.

```python { .api }
def pubsub() -> PubSub:
    """
    Open a pub/sub client for sending and receiving messages.

    Returns:
        A PubSub client instance
    """
75
76
class PubSub:
    """Pub/Sub client for real-time messaging."""

    async def subscribe(self, *channels: str, **channel_handlers: Callable) -> None:
        """
        Subscribe to channels.

        Args:
            channels: Channel names to subscribe to
            channel_handlers: Channel name to handler function mapping
        """

    async def psubscribe(self, *patterns: str, **pattern_handlers: Callable) -> None:
        """
        Subscribe to channel patterns.

        Args:
            patterns: Channel patterns to subscribe to (* and ? wildcards)
            pattern_handlers: Pattern to handler function mapping
        """

    async def unsubscribe(self, *channels: str) -> None:
        """
        Unsubscribe from channels.

        Args:
            channels: Channel names to unsubscribe from
        """

    async def punsubscribe(self, *patterns: str) -> None:
        """
        Unsubscribe from channel patterns.

        Args:
            patterns: Channel patterns to unsubscribe from
        """

    async def get_message(self, ignore_subscribe_messages: bool = False, timeout: float = 0) -> Optional[Dict]:
        """
        Get next message.

        Args:
            ignore_subscribe_messages: Skip subscription confirmations
            timeout: Timeout in seconds (0 for non-blocking)

        Returns:
            Message dictionary or None
        """

    def listen(self) -> AsyncIterator[Dict]:
        """
        Listen for messages asynchronously.

        Returns:
            Async iterator of message dictionaries
        """

    async def close(self) -> None:
        """Close pub/sub client and cleanup resources."""
135
136
async def publish(channel: str, message: str) -> int:
    """
    Send a message to every subscriber of a channel.

    Args:
        channel: Name of the target channel
        message: Payload to deliver

    Returns:
        Count of clients that received the message
    """
147
148
async def pubsub_channels(pattern: str = "*") -> List[str]:
    """
    List the pub/sub channels that are currently active.

    Args:
        pattern: Filter applied to channel names

    Returns:
        Names of the active channels
    """
158
159
async def pubsub_numsub(*channels: str) -> Dict[str, int]:
    """
    Report how many subscribers each channel has.

    Args:
        channels: Names of the channels to inspect

    Returns:
        Mapping of channel name to subscriber count
    """
```

### Lua Scripting

Execute custom Lua scripts on the Redis server for atomic operations and complex data processing.

```python { .api }
async def eval(script: str, numkeys: int, *keys_and_args: Any) -> Any:
    """
    Run a Lua script on the server.

    Args:
        script: Source code of the Lua script
        numkeys: How many of the trailing arguments are keys
        keys_and_args: The keys, followed by the script's arguments

    Returns:
        Whatever the script returns
    """
188
189
async def evalsha(sha: str, numkeys: int, *keys_and_args: Any) -> Any:
    """
    Run a previously loaded Lua script, addressed by its SHA hash.

    Args:
        sha: SHA1 hash of a script that was loaded earlier
        numkeys: How many of the trailing arguments are keys
        keys_and_args: The keys, followed by the script's arguments

    Returns:
        Whatever the script returns
    """
201
202
async def script_load(script: str) -> str:
    """
    Load a Lua script and obtain its SHA hash.

    Args:
        script: Source code of the Lua script

    Returns:
        SHA1 hash identifying the loaded script
    """
212
213
async def script_exists(*shas: str) -> List[bool]:
    """
    Test whether scripts with the given SHA hashes exist.

    Args:
        shas: SHA1 hashes to look up

    Returns:
        One boolean per hash, indicating existence
    """
223
224
async def script_flush(self) -> bool:
    """
    Drop every cached script.

    Returns:
        True on success
    """
231
232
def register_script(script: str) -> Script:
    """
    Register a Lua script so it can be invoked repeatedly.

    Args:
        script: Source code of the Lua script

    Returns:
        Callable Script object
    """
242
243
class Script:
    """Registered Lua script wrapper."""

    async def __call__(
        self,
        keys: Optional[List[str]] = None,
        args: Optional[List[Any]] = None,
        client: Optional['Redis'] = None,
    ) -> Any:
        """
        Execute the registered script.

        Args:
            keys: Keys for script execution (may be omitted)
            args: Arguments for script execution (may be omitted)
            client: Redis client to use (defaults to registration client)

        Returns:
            Script return value
        """
```

### Distributed Locking

Distributed locking mechanism for coordinating access to shared resources across multiple processes or servers.

```python { .api }
def lock(
    name: str,
    timeout: Optional[float] = None,
    sleep: float = 0.1,
    blocking: bool = True,
    blocking_timeout: Optional[float] = None,
    thread_local: bool = True
) -> Lock:
    """
    Build a distributed lock bound to a key.

    Args:
        name: Name/key of the lock
        timeout: Seconds until the lock expires
        sleep: Pause between attempts while waiting for the lock
        blocking: Whether acquisition should block
        blocking_timeout: Upper bound on the time spent blocking
        thread_local: Whether the lock is thread-local

    Returns:
        Lock instance
    """
287
288
class Lock:
    """Distributed lock implementation."""

    async def acquire(
        self,
        blocking: Optional[bool] = None,
        blocking_timeout: Optional[float] = None,
        token: Optional[str] = None
    ) -> bool:
        """
        Acquire the lock.

        Args:
            blocking: Override blocking behavior
            blocking_timeout: Override blocking timeout
            token: Specific lock token to use

        Returns:
            True if lock was acquired
        """

    async def release(self) -> bool:
        """
        Release the lock.

        Returns:
            True if lock was released by this client
        """

    async def extend(self, additional_time: float, replace_ttl: bool = False) -> bool:
        """
        Extend lock timeout.

        Args:
            additional_time: Time to add to current timeout
            replace_ttl: Replace TTL instead of extending

        Returns:
            True if lock was extended
        """

    async def locked(self) -> bool:
        """
        Check if lock is currently held.

        Coroutine: the answer requires a server round trip, so the call
        must be awaited (``await lock.locked()``). An un-awaited call yields
        a coroutine object, which is always truthy.

        Returns:
            True if lock is held
        """

    async def owned(self) -> bool:
        """
        Check if lock is owned by this client.

        Coroutine: the answer requires a server round trip, so the call
        must be awaited (``await lock.owned()``). An un-awaited call yields
        a coroutine object, which is always truthy.

        Returns:
            True if lock is owned by this client
        """

    async def __aenter__(self) -> 'Lock':
        """Async context manager entry."""
        # NOTE(review): the result of acquire() is not inspected here, so
        # this sketch enters the block even if acquire() reports failure.
        await self.acquire()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Async context manager exit."""
        await self.release()
```

### Monitoring

Monitor Redis server commands and operations in real-time for debugging and performance analysis.

```python { .api }
def monitor() -> Monitor:
    """
    Open a monitor client that observes executed commands.

    Returns:
        A Monitor client instance
    """
367
368
class Monitor:
    """Client that watches the commands processed by Redis."""

    async def next_command(self) -> Dict[str, Any]:
        """
        Fetch the next command seen by the monitor.

        Returns:
            Dictionary describing the command
        """

    def monitor(self) -> AsyncIterator[Dict[str, Any]]:
        """
        Stream monitored commands asynchronously.

        Returns:
            Async iterator yielding command dictionaries
        """
```

### Scanning Operations

Efficient iteration over large keyspaces and data structures using cursor-based scanning.

```python { .api }
async def scan(
    cursor: int = 0,
    match: Optional[str] = None,
    count: Optional[int] = None,
    _type: Optional[str] = None
) -> Tuple[int, List[str]]:
    """
    Walk the keyspace one batch at a time.

    Args:
        cursor: Position of the scan cursor (0 starts a new scan)
        match: Pattern that keys must match
        count: Approximate number of keys returned per call
        _type: Restrict results to keys of this type

    Returns:
        Tuple of (next_cursor, keys)
    """
411
412
async def sscan(
    name: str,
    cursor: int = 0,
    match: Optional[str] = None,
    count: Optional[int] = None
) -> Tuple[int, List[str]]:
    """
    Walk the members of a set one batch at a time.

    Args:
        name: Key of the set
        cursor: Position of the scan cursor
        match: Pattern that members must match
        count: Approximate number of members returned per call

    Returns:
        Tuple of (next_cursor, members)
    """
430
431
async def hscan(
    name: str,
    cursor: int = 0,
    match: Optional[str] = None,
    count: Optional[int] = None
) -> Tuple[int, Dict[str, str]]:
    """
    Walk the fields of a hash one batch at a time.

    Args:
        name: Key of the hash
        cursor: Position of the scan cursor
        match: Pattern that field names must match
        count: Approximate number of fields returned per call

    Returns:
        Tuple of (next_cursor, field_value_dict)
    """
449
450
async def zscan(
    name: str,
    cursor: int = 0,
    match: Optional[str] = None,
    count: Optional[int] = None,
    score_cast_func: Callable = float
) -> Tuple[int, List[Tuple[str, float]]]:
    """
    Walk the members of a sorted set one batch at a time.

    Args:
        name: Key of the sorted set
        cursor: Position of the scan cursor
        match: Pattern that members must match
        count: Approximate number of members returned per call
        score_cast_func: Callable used to convert scores

    Returns:
        Tuple of (next_cursor, member_score_pairs)
    """
```

### Sort Operations

Server-side sorting with support for external keys, patterns, and result storage.

```python { .api }
async def sort(
    name: str,
    start: Optional[int] = None,
    num: Optional[int] = None,
    by: Optional[str] = None,
    get: Optional[List[str]] = None,
    desc: bool = False,
    alpha: bool = False,
    store: Optional[str] = None,
    groups: bool = False
) -> Union[List[str], int]:
    """
    Sort a list, set, or sorted set, returning or storing the result.

    Args:
        name: Key whose elements are sorted
        start: Number of leading elements to skip
        num: Number of elements to return
        by: External key pattern to sort by
        get: External keys whose values are retrieved
        desc: Use descending order
        alpha: Compare lexicographically
        store: Key that receives the result
        groups: Group the returned values

    Returns:
        Sorted results or number of stored elements
    """
```

## Usage Examples

### Pipeline Operations

```python
async def pipeline_examples():
    """Show a plain pipeline, a MULTI/EXEC pipeline, and a WATCH-guarded one."""
    redis = aioredis.Redis(decode_responses=True)

    # Basic pipeline (non-transactional)
    pipe = redis.pipeline(transaction=False)
    pipe.set('key1', 'value1')
    pipe.set('key2', 'value2')
    pipe.get('key1')
    pipe.get('key2')
    results = await pipe.execute()
    print(results)  # [True, True, 'value1', 'value2']

    # Transactional pipeline
    pipe = redis.pipeline(transaction=True)
    pipe.multi()
    pipe.incr('counter')
    pipe.incr('counter')
    pipe.get('counter')
    results = await pipe.execute()
    print(results)  # [1, 2, '2'] when 'counter' did not exist beforehand

    # Optimistic locking
    # WATCH only protects a transaction sent on the same connection, so it
    # is issued on the pipeline itself rather than on the client.
    async with redis.pipeline(transaction=True) as pipe:
        await pipe.watch('balance')
        # After watch() the pipeline runs commands immediately, which lets
        # the current value be read before commands are buffered again.
        balance = int(await pipe.get('balance') or 0)
        if balance >= 100:
            pipe.multi()
            pipe.decrby('balance', 100)
            pipe.incr('purchases')
            # execute() fails with a WatchError if 'balance' was modified
            # by another client after watch(); callers may retry.
            await pipe.execute()
```

### Pub/Sub Messaging

```python
async def pubsub_examples():
    """Show a publisher, a channel subscriber, and a pattern subscriber."""
    redis = aioredis.Redis(decode_responses=True)

    # Publisher
    async def publisher():
        for i in range(10):
            await redis.publish('notifications', f'Message {i}')
            await asyncio.sleep(1)
        # Signal the subscriber to finish; without this sentinel the
        # gather() call below would never return.
        await redis.publish('notifications', 'STOP')

    # Subscriber with message handler
    async def subscriber():
        pubsub = redis.pubsub()
        await pubsub.subscribe('notifications')

        try:
            async for message in pubsub.listen():
                if message['type'] == 'message':
                    print(f"Received: {message['data']}")
                    if message['data'] == 'STOP':
                        break
        finally:
            await pubsub.close()

    # Pattern subscription
    async def pattern_subscriber(max_idle_polls=30):
        pubsub = redis.pubsub()
        await pubsub.psubscribe('user:*:notifications')

        # get_message() returns None when nothing arrives within the
        # timeout, so the loop is bounded by a count of consecutive empty
        # polls instead of relying on an exception to end it.
        idle_polls = 0
        try:
            while idle_polls < max_idle_polls:
                message = await pubsub.get_message(timeout=1)
                if message is None:
                    idle_polls += 1
                    continue
                idle_polls = 0
                if message['type'] == 'pmessage':
                    channel = message['channel']
                    data = message['data']
                    print(f"Pattern match {channel}: {data}")
        except asyncio.TimeoutError:
            # Kept from the original example as a defensive guard.
            pass
        finally:
            await pubsub.close()

    # Run publisher and subscriber concurrently
    await asyncio.gather(publisher(), subscriber())
```

### Lua Scripting

```python
async def lua_script_examples():
    """Show ad-hoc EVAL usage and reusable registered scripts."""
    import time

    redis = aioredis.Redis(decode_responses=True)

    # Simple script execution
    script = """
    local key = KEYS[1]
    local increment = ARGV[1]
    local current = redis.call('GET', key) or 0
    local new_value = current + increment
    redis.call('SET', key, new_value)
    return new_value
    """

    result = await redis.eval(script, 1, 'counter', 5)
    print(f"Counter value: {result}")

    # Register script for reuse
    atomic_increment = redis.register_script("""
    local key = KEYS[1]
    local increment = tonumber(ARGV[1])
    local max_value = tonumber(ARGV[2])

    local current = tonumber(redis.call('GET', key) or 0)
    if current + increment <= max_value then
        local new_value = current + increment
        redis.call('SET', key, new_value)
        return new_value
    else
        return -1  -- Exceeded maximum
    end
    """)

    # Use registered script
    result = await atomic_increment(keys=['limited_counter'], args=[3, 100])
    if result == -1:
        print("Increment would exceed maximum")
    else:
        print(f"New value: {result}")

    # Complex script with multiple operations
    # Every key the script touches is passed through KEYS instead of being
    # assembled inside the script, so the server (and a cluster) can tell
    # which keys are involved before running it.
    batch_update = redis.register_script("""
    local user_key = KEYS[1]
    local leaderboard_key = KEYS[2]
    local level_key = KEYS[3]
    local user_id = ARGV[1]
    local points = tonumber(ARGV[2])
    local level = tonumber(ARGV[3])

    -- Update user data
    redis.call('HSET', user_key, 'points', points)
    redis.call('HSET', user_key, 'level', level)
    redis.call('HSET', user_key, 'last_updated', ARGV[4])

    -- Update leaderboard
    redis.call('ZADD', leaderboard_key, points, user_id)

    -- Update level ranking
    redis.call('ZADD', level_key, points, user_id)

    return redis.call('ZRANK', leaderboard_key, user_id)
    """)

    user_id = 'user123'
    points = 1500
    level = 5
    rank = await batch_update(
        keys=[f'user:{user_id}', 'leaderboard', f'level:{level}'],
        args=[user_id, points, level, int(time.time())]
    )
    print(f"User rank: {rank}")
```

### Distributed Locking

```python
async def locking_examples():
    """Show manual, context-managed, extended, and non-blocking locking."""
    redis = aioredis.Redis(decode_responses=True)

    # Basic lock usage
    lock = redis.lock('resource_lock', timeout=10)

    # Remember the outcome of acquire() and release only on success.
    # Checking `lock.owned()` without awaiting it would test a coroutine
    # object, which is always truthy, and release a lock never obtained.
    acquired = False
    try:
        # Try to acquire lock
        acquired = await lock.acquire(blocking_timeout=5)
        if acquired:
            print("Lock acquired, performing critical section")
            await asyncio.sleep(2)  # Simulate work
            print("Work completed")
        else:
            print("Could not acquire lock")
    finally:
        if acquired:
            await lock.release()

    # Context manager usage
    async with redis.lock('another_resource', timeout=30) as lock:
        print("Automatically acquired lock")
        await asyncio.sleep(1)
    # Lock automatically released on exit

    # Lock with extension
    long_task_lock = redis.lock('long_task', timeout=10)
    await long_task_lock.acquire()

    try:
        for i in range(5):
            print(f"Working on step {i+1}")
            await asyncio.sleep(3)

            # Extend lock if needed
            if i < 4:  # Don't extend on last iteration
                extended = await long_task_lock.extend(10)
                if extended:
                    print("Lock extended")
                else:
                    print("Could not extend lock")
                    break
    finally:
        await long_task_lock.release()

    # Non-blocking lock attempt
    quick_lock = redis.lock('quick_resource')
    if await quick_lock.acquire(blocking=False):
        try:
            print("Got lock immediately")
            # Quick operation
        finally:
            await quick_lock.release()
    else:
        print("Resource busy, skipping operation")
```

### Scanning Large Datasets

```python
async def scanning_examples():
    """Walk keys, set members, hash fields, and sorted-set entries by cursor."""
    redis = aioredis.Redis(decode_responses=True)

    # Scan all keys with pattern
    # Each loop starts with no cursor (sent as 0) and stops once the
    # server hands back cursor 0, which marks the end of the scan.
    user_keys = []
    cursor = None
    while cursor != 0:
        cursor, keys = await redis.scan(cursor or 0, match='user:*', count=100)
        user_keys.extend(keys)

    print(f"Found {len(user_keys)} user keys")

    # Scan set members
    all_tags = []
    cursor = None
    while cursor != 0:
        cursor, members = await redis.sscan('all_tags', cursor or 0, count=50)
        all_tags.extend(members)

    # Scan hash fields
    config_items = {}
    cursor = None
    while cursor != 0:
        cursor, items = await redis.hscan('config', cursor or 0, match='cache_*')
        config_items.update(items)

    # Scan sorted set with score filtering
    high_scores = []
    cursor = None
    while cursor != 0:
        cursor, members = await redis.zscan(
            'leaderboard',
            cursor or 0,
            match='player*',
            count=25
        )
        # Filter by score if needed
        for member, score in members:
            if score > 1000:
                high_scores.append((member, score))

    print(f"Found {len(high_scores)} high-scoring players")
783
```