0
# Pub/Sub Messaging
1
2
Redis Pub/Sub provides publish/subscribe messaging with channels and pattern-based subscriptions. Publishers send messages to named channels, and subscribers receive messages from channels they've subscribed to, enabling real-time messaging and event-driven architectures.
3
4
## Capabilities
5
6
### Pub/Sub Client
7
8
Redis Pub/Sub client for subscribing to channels and receiving messages.
9
10
```python { .api }
def pubsub(self, **kwargs) -> "PubSub":
    """Create a PubSub object from this client."""
    ...


class PubSub:
    """Pub/Sub client for subscribing to channels and receiving messages."""

    def __init__(
        self,
        connection_pool: ConnectionPool,
        shard_hint: Optional[str] = None,
        ignore_subscribe_messages: bool = False,
        **kwargs
    ):
        """Create a subscriber that takes its connection from ``connection_pool``.

        ``ignore_subscribe_messages`` sets the default for whether
        subscribe/unsubscribe confirmations are delivered as messages.
        """
        ...

    def subscribe(self, *args, **kwargs) -> None:
        """Subscribe to channels.

        Positional arguments are channel names. Keyword arguments map a
        channel name to a callback that is invoked with each message
        received on that channel.
        """
        ...

    def unsubscribe(self, *args) -> None:
        """Unsubscribe from the given channels."""
        ...

    def psubscribe(self, *args, **kwargs) -> None:
        """Subscribe to channel patterns (same argument forms as ``subscribe``)."""
        ...

    def punsubscribe(self, *args) -> None:
        """Unsubscribe from the given patterns."""
        ...

    def listen(self) -> Iterator[Dict[str, Any]]:
        """Iterate over incoming messages, blocking until each one arrives."""
        ...

    def get_message(
        self,
        ignore_subscribe_messages: bool = False,
        timeout: Optional[float] = 0.0
    ) -> Optional[Dict[str, Any]]:
        """Return the next message if one is available, otherwise ``None``.

        ``timeout`` is the number of seconds to wait for a message: ``0.0``
        polls without waiting and ``None`` waits until a message arrives.
        """
        ...

    def ping(self, message: Optional[EncodableT] = None) -> None:
        """Send a PING on the Pub/Sub connection, optionally with a payload."""
        ...

    def close(self) -> None:
        """Close the Pub/Sub connection."""
        ...

    def reset(self) -> None:
        """Release the connection and clear all subscription state."""
        ...

    @property
    def subscribed(self) -> bool:
        """Whether any channel or pattern subscription is currently active."""
        ...

    @property
    def channels(self) -> Dict[bytes, Optional[Callable]]:
        """Subscribed channels mapped to their handler (``None`` if no handler)."""
        ...

    @property
    def patterns(self) -> Dict[bytes, Optional[Callable]]:
        """Subscribed patterns mapped to their handler (``None`` if no handler)."""
        ...
```
53
54
### Publishing Operations
55
56
Redis commands for publishing messages to channels and for inspecting active channels and subscriptions.
57
58
```python { .api }
def publish(self, channel: str, message: EncodableT) -> int:
    """Publish ``message`` on ``channel``; returns the number of subscribers reached."""
    ...


def pubsub_channels(self, pattern: str = "*") -> List[bytes]:
    """List the active channels whose names match ``pattern``."""
    ...


def pubsub_numsub(self, *args: str) -> List[Tuple[bytes, int]]:
    """Return ``(channel, subscriber_count)`` pairs for the given channels."""
    ...


def pubsub_numpat(self) -> int:
    """Return the number of active pattern subscriptions."""
    ...
```
67
68
### Message Types
69
70
Different types of messages received through Pub/Sub subscriptions.
71
72
```python { .api }
# Shape of every message dictionary produced by get_message() and listen().
# The examples in this document read the keys 'type', 'pattern', 'channel'
# and 'data'.
MessageDict = Dict[str, Optional[Union[str, bytes, int]]]

# Possible values of message['type']:
#   'subscribe'     confirmation of a channel subscription
#   'unsubscribe'   confirmation of a channel unsubscription
#   'psubscribe'    confirmation of a pattern subscription
#   'punsubscribe'  confirmation of a pattern unsubscription
#   'message'       regular channel message
#   'pmessage'      pattern-matched message
```
84
85
## Usage Examples
86
87
### Basic Publisher and Subscriber
88
89
```python
import redis
import threading
import time

r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)


def publisher():
    """Publish ten messages to the 'notifications' channel, one per second."""
    for i in range(10):
        message = f"Hello {i}"
        # publish() returns how many subscribers received the message.
        subscribers = r.publish('notifications', message)
        print(f"Published '{message}' to {subscribers} subscribers")
        time.sleep(1)


def subscriber():
    """Subscribe to 'notifications' and print every message that arrives."""
    pubsub = r.pubsub()
    pubsub.subscribe('notifications')

    print("Subscriber listening for messages...")
    # listen() blocks, yielding messages for as long as we stay subscribed.
    for message in pubsub.listen():
        if message['type'] == 'message':
            print(f"Received: {message['data']}")
        elif message['type'] == 'subscribe':
            print(f"Subscribed to: {message['channel']}")


# Run publisher and subscriber in separate threads.
# The subscriber never returns from listen(), so it runs as a daemon thread;
# otherwise the script would hang after the publisher has finished.
subscriber_thread = threading.Thread(target=subscriber, daemon=True)
publisher_thread = threading.Thread(target=publisher)

subscriber_thread.start()
time.sleep(0.5)  # Let subscriber connect first
publisher_thread.start()

publisher_thread.join()
```
126
127
### Multiple Channels Subscription
128
129
```python
import redis

r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

# A single PubSub object can follow several channels at once.
pubsub = r.pubsub()
pubsub.subscribe('news', 'sports', 'weather')

print("Listening to multiple channels...")
for message in pubsub.listen():
    kind = message['type']

    # One confirmation arrives per subscribed channel.
    if kind == 'subscribe':
        print(f"Subscribed to channel: {message['channel']}")
        continue

    # Anything that is not a regular channel message is ignored.
    if kind != 'message':
        continue

    print(f"[{message['channel']}] {message['data']}")

# In another process/thread, publish to different channels:
# r.publish('news', 'Breaking news update')
# r.publish('sports', 'Game results')
# r.publish('weather', 'Sunny today')
```
154
155
### Pattern-Based Subscriptions
156
157
```python
import redis

r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

# Patterns match every channel whose name fits the glob expression.
pubsub = r.pubsub()
pubsub.psubscribe('user:*:notifications', 'system:*')

print("Listening to channel patterns...")
for message in pubsub.listen():
    kind = message['type']

    # One confirmation arrives per subscribed pattern.
    if kind == 'psubscribe':
        print(f"Subscribed to pattern: {message['pattern']}")
        continue

    # Pattern deliveries carry both the pattern and the concrete channel.
    if kind == 'pmessage':
        print(
            f"Pattern [{message['pattern']}] "
            f"Channel [{message['channel']}]: {message['data']}"
        )

# Publish to channels matching patterns:
# r.publish('user:1001:notifications', 'New message')
# r.publish('user:1002:notifications', 'Friend request')
# r.publish('system:alerts', 'System maintenance')
```
183
184
### Callback-Based Message Handling
185
186
```python
import redis
import threading
import time

r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)


# Message handler functions: each receives the message dictionary.
def news_handler(message):
    """Print a message received on the 'news' channel."""
    print(f"📰 NEWS: {message['data']}")


def alert_handler(message):
    """Print a message received on the 'alerts' channel."""
    print(f"🚨 ALERT: {message['data']}")


def user_notification_handler(message):
    """Print a per-user notification matched by 'user:*:notifications'."""
    channel = message['channel']
    user_id = channel.split(':')[1]  # Extract user ID from channel
    print(f"👤 User {user_id}: {message['data']}")


# Subscribe with callback handlers (channel/pattern name -> handler).
pubsub = r.pubsub()
pubsub.subscribe(**{
    'news': news_handler,
    'alerts': alert_handler
})
pubsub.psubscribe(**{
    'user:*:notifications': user_notification_handler
})


# Message processing loop
def message_processor():
    """Drive the PubSub connection so the registered handlers get called."""
    for message in pubsub.listen():
        # Handlers are called automatically for subscribed channels
        pass


# Start message processor as a daemon so it cannot keep the script alive.
processor_thread = threading.Thread(target=message_processor, daemon=True)
processor_thread.start()

# Simulate publishing
time.sleep(0.5)
r.publish('news', 'Market update')
r.publish('alerts', 'Server overload warning')
r.publish('user:1001:notifications', 'New friend request')

time.sleep(2)
pubsub.close()
```
234
235
### Non-Blocking Message Retrieval
236
237
```python
import redis
import time

r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

pubsub = r.pubsub()
pubsub.subscribe('events')

print("Non-blocking message retrieval...")
# monotonic() is unaffected by system clock adjustments.
deadline = time.monotonic() + 10  # Run for 10 seconds

try:
    while time.monotonic() < deadline:
        # Wait at most one second for a message; None is returned when
        # nothing arrived in that time (timeout=0 would not wait at all).
        message = pubsub.get_message(timeout=1.0)

        if message:
            if message['type'] == 'message':
                print(f"Received: {message['data']}")
            elif message['type'] == 'subscribe':
                print(f"Subscribed to: {message['channel']}")
        else:
            print("No message received, doing other work...")
            # Perform other tasks while waiting for messages
            time.sleep(0.1)
finally:
    # Release the connection even if the loop is interrupted or fails.
    pubsub.close()
```
265
266
### Pub/Sub with Authentication and Error Handling
267
268
```python
import time

import redis
from redis.exceptions import (
    AuthenticationError,
    ConnectionError,
    ResponseError,
    TimeoutError,
)


def create_authenticated_pubsub():
    """Create a Redis client and Pub/Sub object using password authentication.

    Returns:
        A ``(client, pubsub)`` tuple.

    Raises:
        AuthenticationError: The server rejected the credentials.
        ConnectionError: The server could not be reached.
        ResponseError: The server answered the connection test with an error.
    """
    try:
        r = redis.Redis(
            host='localhost',
            port=6379,
            password='your_password',
            decode_responses=True,
            socket_timeout=5,
            socket_connect_timeout=5,
            retry_on_timeout=True
        )

        # Test connection
        r.ping()

    except AuthenticationError as e:
        # Must come before ConnectionError, which AuthenticationError subclasses.
        print(f"Authentication failed: {e}")
        raise
    except ConnectionError as e:
        print(f"Connection failed: {e}")
        raise
    except ResponseError as e:
        print(f"Authentication failed: {e}")
        raise

    return r, r.pubsub()


def robust_subscriber(channels):
    """Subscribe to ``channels`` and print messages, reconnecting on failure.

    Up to five consecutive connection failures are retried with exponential
    back-off; the counter is reset after every successful (re)subscription.

    Raises:
        AuthenticationError: Immediately, since retrying cannot fix credentials.
        ConnectionError, TimeoutError: After the retries are exhausted.
    """
    max_retries = 5
    retry_count = 0

    while retry_count < max_retries:
        pubsub = None
        try:
            _client, pubsub = create_authenticated_pubsub()
            pubsub.subscribe(*channels)

            print(f"Connected and subscribed to: {channels}")
            retry_count = 0  # Reset retry count on success

            for message in pubsub.listen():
                if message['type'] == 'message':
                    print(f"[{message['channel']}] {message['data']}")

        except AuthenticationError:
            # Wrong credentials will not fix themselves; do not retry.
            raise
        except (ConnectionError, TimeoutError) as e:
            # TimeoutError: with socket_timeout set, a blocking read on an
            # idle subscription can time out.
            retry_count += 1
            print(f"Connection lost (attempt {retry_count}): {e}")

            if retry_count >= max_retries:
                print("Max retries exceeded")
                raise

            wait_time = min(2 ** retry_count, 30)  # Exponential backoff
            print(f"Retrying in {wait_time} seconds...")
            time.sleep(wait_time)
        except KeyboardInterrupt:
            print("Shutting down subscriber...")
            break
        finally:
            # Release this attempt's connection before retrying or exiting.
            if pubsub is not None:
                pubsub.close()


# Use robust subscriber
robust_subscriber(['important', 'alerts'])
```
335
336
### Chat Room Implementation
337
338
```python
339
import redis
340
import threading
341
import time
342
from datetime import datetime
343
344
class ChatRoom:
    """Terminal chat client for a single room, built on Redis Pub/Sub.

    Every room maps to the channel ``chat:<room_name>``. Messages typed by
    the user are published to it and messages received on it are printed.
    """

    def __init__(self, room_name, username):
        """Connect to the local Redis server on behalf of ``username``."""
        self.room_name = room_name
        self.username = username
        self.r = redis.Redis(host='localhost', port=6379, decode_responses=True)
        self.pubsub = self.r.pubsub()
        self.running = False

    @property
    def channel(self):
        """Name of the Pub/Sub channel that carries this room's messages."""
        return f"chat:{self.room_name}"

    def join(self):
        """Join the chat room and relay typed lines until the user leaves.

        Blocks until the user types 'quit', presses Ctrl-C, or standard
        input ends; the room is always left on the way out.
        """
        self.pubsub.subscribe(self.channel)
        self.running = True

        # Announce joining
        self.r.publish(self.channel, f">>> {self.username} joined the room")

        # Start message listener; a daemon thread cannot block interpreter exit.
        listener_thread = threading.Thread(
            target=self._listen_messages, daemon=True
        )
        listener_thread.start()

        print(f"Joined chat room: {self.room_name}")
        print("Type messages (or 'quit' to leave):")

        # Message input loop
        try:
            while self.running:
                message = input()
                if message.lower() == 'quit':
                    break
                self.send_message(message)
        except (KeyboardInterrupt, EOFError):
            # Ctrl-C or end of input (Ctrl-D, closed stdin): leave quietly.
            pass
        finally:
            self.leave()

    def send_message(self, message):
        """Publish ``message`` to the room; blank messages are ignored."""
        if not message.strip():
            return
        timestamp = datetime.now().strftime('%H:%M:%S')
        formatted_message = f"[{timestamp}] {self.username}: {message}"
        self.r.publish(self.channel, formatted_message)

    def _listen_messages(self):
        """Print incoming room messages until the room has been left."""
        for message in self.pubsub.listen():
            if not self.running:
                break

            if message['type'] == 'message':
                print(message['data'])

    def leave(self):
        """Announce departure and close the subscription."""
        self.running = False
        self.r.publish(self.channel, f"<<< {self.username} left the room")
        self.pubsub.close()
        print(f"Left chat room: {self.room_name}")
405
406
# Usage example
407
if __name__ == "__main__":
    # join() blocks until the user types 'quit' or interrupts the program.
    room = ChatRoom(room_name="general", username="Alice")
    room.join()
410
```
411
412
### Pub/Sub Statistics and Monitoring
413
414
```python
import redis
import time

r = redis.Redis(host='localhost', port=6379, decode_responses=True)


def monitor_pubsub(interval=5):
    """Print a periodic summary of Pub/Sub activity until interrupted.

    Args:
        interval: Seconds to wait between refreshes, and before trying
            again after a Redis error.
    """
    print("Pub/Sub Monitoring Dashboard")
    print("-" * 40)

    # A single outer handler also covers Ctrl-C pressed during a sleep,
    # including the sleep that follows an error.
    try:
        while True:
            try:
                # Get active channels
                channels = r.pubsub_channels()
                print(f"Active channels: {len(channels)}")

                if channels:
                    # Get subscriber counts for each channel
                    for channel, subscriber_count in r.pubsub_numsub(*channels):
                        print(f" {channel}: {subscriber_count} subscribers")

                # Get pattern subscription count
                pattern_count = r.pubsub_numpat()
                print(f"Pattern subscriptions: {pattern_count}")

                print("-" * 40)
            except redis.RedisError as e:
                # Keep the dashboard alive through Redis failures; anything
                # else is a programming error and should surface.
                print(f"Monitoring error: {e}")

            time.sleep(interval)
    except KeyboardInterrupt:
        print("Monitoring stopped")


# Monitor Pub/Sub activity
monitor_pubsub()
```
454
455
### Event-Driven Architecture Example
456
457
```python
458
import redis
459
import json
460
import threading
461
from datetime import datetime
462
463
class EventBus:
    """Minimal event bus on top of Redis Pub/Sub.

    Events are JSON-encoded and published on the channel
    ``events:<event_type>``. A background thread decodes incoming events and
    passes each one to the handlers registered for its type.
    """

    def __init__(self):
        """Connect to the local Redis server; nothing is subscribed yet."""
        self.r = redis.Redis(host='localhost', port=6379, decode_responses=True)
        self.pubsub = self.r.pubsub()
        self.handlers = {}  # event type -> list of handler callables
        self.running = False

    def subscribe_to_events(self, event_types):
        """Subscribe to specific event types and start the listener thread.

        May be called more than once: later calls add channels to the
        existing subscription without starting another listener.
        """
        channels = [f"events:{event_type}" for event_type in event_types]
        if not channels:
            return  # nothing to subscribe to
        self.pubsub.subscribe(*channels)

        # Only one thread should read from the PubSub connection.
        if self.running:
            return
        self.running = True
        listener_thread = threading.Thread(
            target=self._event_listener, daemon=True
        )
        listener_thread.start()

    def register_handler(self, event_type, handler):
        """Register ``handler`` to be called with each ``event_type`` event."""
        self.handlers.setdefault(event_type, []).append(handler)

    def publish_event(self, event_type, data):
        """Publish an event to the bus.

        ``data`` must be JSON-serializable. The event also carries an ISO
        timestamp and a microsecond-timestamp id taken from the same instant.
        """
        now = datetime.now()
        event = {
            'type': event_type,
            'data': data,
            'timestamp': now.isoformat(),
            'id': int(now.timestamp() * 1000000),  # Microsecond timestamp as ID
        }

        channel = f"events:{event_type}"
        self.r.publish(channel, json.dumps(event))

    def _event_listener(self):
        """Listen for events and dispatch them to the registered handlers."""
        for message in self.pubsub.listen():
            if not self.running:
                break

            if message['type'] != 'message':
                continue

            try:
                event = json.loads(message['data'])
                event_type = event['type']
                handlers = self.handlers.get(event_type, ())
            except (json.JSONDecodeError, KeyError, TypeError) as e:
                # Not JSON, or JSON that is not an event object with a
                # usable 'type'; skip it instead of killing the listener.
                print(f"Invalid event format: {e}")
                continue

            # Dispatch to registered handlers
            for handler in handlers:
                try:
                    handler(event)
                except Exception as e:
                    # One failing handler must not stop the others.
                    print(f"Handler error for {event_type}: {e}")

    def stop(self):
        """Stop the event bus and close the subscription."""
        self.running = False
        self.pubsub.close()
524
525
# Event handlers
526
def user_created_handler(event):
    """Print the name and e-mail address carried by a 'user_created' event."""
    user_data = event['data']
    print(f"🎉 User created: {user_data['name']} ({user_data['email']})")
529
530
def order_placed_handler(event):
    """Print the order id and user id carried by an 'order_placed' event."""
    order_data = event['data']
    print(f"🛒 Order placed: #{order_data['order_id']} by user {order_data['user_id']}")
533
534
def system_alert_handler(event):
    """Print the message and level carried by a 'system_alert' event."""
    alert_data = event['data']
    print(f"🚨 ALERT: {alert_data['message']} (Level: {alert_data['level']})")
537
538
# Usage example
539
if __name__ == "__main__":
    # The sleeps below need `time`, which this example's import list omits.
    import time

    # Create event bus
    event_bus = EventBus()

    # Register handlers
    event_bus.register_handler('user_created', user_created_handler)
    event_bus.register_handler('order_placed', order_placed_handler)
    event_bus.register_handler('system_alert', system_alert_handler)

    # Subscribe to events
    event_bus.subscribe_to_events(['user_created', 'order_placed', 'system_alert'])

    # Simulate publishing events
    time.sleep(0.5)  # Let subscriber connect

    event_bus.publish_event('user_created', {
        'user_id': 1001,
        'name': 'John Doe',
        'email': 'john@example.com'
    })

    event_bus.publish_event('order_placed', {
        'order_id': 'ORD-12345',
        'user_id': 1001,
        'total': 99.99
    })

    event_bus.publish_event('system_alert', {
        'message': 'High memory usage detected',
        'level': 'WARNING'
    })

    # Keep running to receive events
    try:
        time.sleep(10)
    except KeyboardInterrupt:
        pass
    finally:
        event_bus.stop()
578
```