docs
0
# Pub/Sub Operations
1
2
Redis publish/subscribe messaging with support for channels, pattern subscriptions, and shard channels. Pub/Sub provides a powerful message broadcasting system for real-time applications, event notifications, and decoupled communication between application components.
3
4
## Capabilities
5
6
### Publishing Messages
7
8
Functions for publishing messages to regular and shard channels, and for inspecting active channels, subscriber counts, and the number of pattern subscriptions.
9
10
```python { .api }
11
def publish(self, channel: KeyT, message: EncodableT) -> int: ...
12
13
def spublish(self, channel: KeyT, message: EncodableT) -> int: ...
14
15
def pubsub_channels(self, pattern: str = "*") -> List[bytes]: ...
16
17
def pubsub_numsub(self, *args: KeyT) -> List[Tuple[bytes, int]]: ...
18
19
def pubsub_numpat(self) -> int: ...
20
21
def pubsub_shardchannels(self, pattern: str = "*") -> List[bytes]: ...
22
23
def pubsub_shardnumsub(self, *args: KeyT) -> List[Tuple[bytes, int]]: ...
24
```
25
26
### PubSub Client
27
28
Redis PubSub client for subscribing to channels and receiving messages.
29
30
```python { .api }
31
class PubSub:
32
def subscribe(self, *args: ChannelT) -> None: ...
33
34
def psubscribe(self, *args: PatternT) -> None: ...
35
36
def ssubscribe(self, *args: ChannelT) -> None: ...
37
38
def unsubscribe(self, *args: ChannelT) -> None: ...
39
40
def punsubscribe(self, *args: PatternT) -> None: ...
41
42
def sunsubscribe(self, *args: ChannelT) -> None: ...
43
44
def get_message(
45
self,
46
ignore_subscribe_messages: bool = False,
47
timeout: float = 0.0
48
) -> Optional[Dict[str, Any]]: ...
49
50
def listen(self) -> Iterator[Dict[str, Any]]: ...
51
52
def get_sharded_message(
53
self,
54
ignore_subscribe_messages: bool = False,
55
timeout: float = 0.0
56
) -> Optional[Dict[str, Any]]: ...
57
58
def close(self) -> None: ...
59
60
def reset(self) -> None: ...
61
```
62
63
### Message Handling
64
65
The `pubsub()` factory for creating a PubSub client, and the structure of the messages returned by `get_message()`.
66
67
```python { .api }
68
def pubsub(self, **kwargs) -> PubSub: ...
69
70
# Message structure returned by get_message()
71
MessageType = Dict[str, Any]
72
# {
73
# 'type': str, # 'message', 'pmessage', 'subscribe', etc.
74
# 'channel': bytes, # Channel name
75
# 'pattern': bytes, # Pattern (for pattern subscriptions)
76
# 'data': Union[bytes, int] # Message data or subscription count
77
# }
78
```
79
80
## Usage Examples
81
82
### Basic Publishing and Subscribing
83
84
```python
85
import fakeredis
86
import threading
87
import time
88
89
client = fakeredis.FakeRedis()
90
91
def publisher():
    """Publish five numbered headlines on 'news', pausing one second before each."""
    for seq in range(5):
        time.sleep(1)
        # publish() returns a count, reported below as the number of subscribers
        reached = client.publish('news', f'Breaking news #{seq}')
        print(f"Published message {seq} to {reached} subscribers")
97
98
def subscriber():
    """Subscribe to 'news' and print up to five received messages.

    Stops early when a read returns nothing within two seconds. The
    pub/sub object is closed even if reading or decoding raises.
    """
    pubsub = client.pubsub()
    try:
        pubsub.subscribe('news')

        # Skip subscription confirmation message
        confirmation = pubsub.get_message()
        print(f"Subscribed: {confirmation}")

        # Listen for actual messages
        for _ in range(5):
            message = pubsub.get_message(timeout=2.0)
            if not message:
                print("No message received")
                break
            print(f"Received: {message['data'].decode()}")
    finally:
        # Release the subscription on every exit path
        pubsub.close()
117
118
# Start the subscriber first so it is listening before anything is published
sub_thread = threading.Thread(target=subscriber)
pub_thread = threading.Thread(target=publisher)

sub_thread.start()
time.sleep(0.5)  # Give subscriber time to connect
pub_thread.start()

# Wait for both workers to finish
for worker in (pub_thread, sub_thread):
    worker.join()
128
```
129
130
### Pattern Subscriptions
131
132
```python
133
import fakeredis
134
import threading
135
import time
136
137
client = fakeredis.FakeRedis()
138
139
def pattern_subscriber():
    """Print every 'pmessage' received through the 'user:*' pattern.

    Returns as soon as a read comes back empty after its one-second timeout.
    """
    pubsub = client.pubsub()

    # Subscribe to all channels starting with 'user:'
    pubsub.psubscribe('user:*')

    # The first message read is the subscription confirmation
    confirmation = pubsub.get_message()
    print(f"Pattern subscribed: {confirmation}")

    # Listen for pattern-matched messages
    while True:
        message = pubsub.get_message(timeout=1.0)
        if not message:
            break  # Nothing arrived within the timeout
        if message['type'] != 'pmessage':
            continue
        pattern, channel, data = (
            message[field].decode() for field in ('pattern', 'channel', 'data')
        )
        print(f"Pattern '{pattern}' matched channel '{channel}': {data}")

    pubsub.close()
163
164
# Start pattern subscriber
sub_thread = threading.Thread(target=pattern_subscriber)
sub_thread.start()

time.sleep(0.5)  # Give subscriber time to connect

# Publish to various channels
outgoing = [
    ('user:123', 'User 123 logged in'),
    ('user:456', 'User 456 updated profile'),
    ('system:alert', 'System alert'),  # Won't match pattern
    ('user:789', 'User 789 logged out'),
]
for channel, text in outgoing:
    client.publish(channel, text)

time.sleep(2)  # Allow messages to be processed
sub_thread.join()
178
```
179
180
### Multiple Subscribers
181
182
```python
183
import fakeredis
184
import threading
185
import time
186
187
client = fakeredis.FakeRedis()
188
189
def create_subscriber(name, channels):
    """Build a thread target that listens on `channels` for five seconds.

    Args:
        name: Label used as a prefix in the printed output.
        channels: Channel names to subscribe to.

    Returns:
        A zero-argument function that subscribes, prints each received
        'message', and closes its pub/sub object when the time is up.
    """
    def subscriber():
        pubsub = client.pubsub()
        pubsub.subscribe(*channels)

        # Skip subscription confirmations (one read per channel)
        for _ in channels:
            pubsub.get_message()

        print(f"Subscriber {name} ready, listening to {channels}")

        # Listen for messages
        started = time.time()
        while time.time() - started < 5:  # Listen for 5 seconds
            message = pubsub.get_message(timeout=0.1)
            if not message or message['type'] != 'message':
                continue
            channel = message['channel'].decode()
            data = message['data'].decode()
            print(f"[{name}] {channel}: {data}")

        pubsub.close()
        print(f"Subscriber {name} finished")

    return subscriber
214
215
# Create multiple subscribers; every one of them also follows 'alerts'
audiences = [
    ('News Reader', ['news', 'alerts']),
    ('Sports Fan', ['sports', 'alerts']),
    ('Tech Enthusiast', ['tech', 'alerts']),
]
subscribers = [
    threading.Thread(target=create_subscriber(name, channels))
    for name, channels in audiences
]

# Start all subscribers
for worker in subscribers:
    worker.start()

time.sleep(1)  # Let subscribers connect

# Publish messages to different channels
messages = [
    ('news', 'Election results announced'),
    ('sports', 'Championship game tonight'),
    ('tech', 'New AI breakthrough'),
    ('alerts', 'System maintenance in 1 hour'),  # All subscribers get this
    ('news', 'Weather update: sunny skies'),
    ('sports', 'Trade deadline approaching'),
]

for channel, message in messages:
    reached = client.publish(channel, message)
    print(f"Published to {channel}: '{message}' ({reached} subscribers)")
    time.sleep(0.5)

# Wait for all subscribers to finish
for worker in subscribers:
    worker.join()
246
```
247
248
### Channel Information and Monitoring
249
250
```python
251
import fakeredis
252
import time
253
254
client = fakeredis.FakeRedis()

# Create some subscribers
pubsub1, pubsub2, pubsub3 = (client.pubsub() for _ in range(3))
all_pubsubs = (pubsub1, pubsub2, pubsub3)

# Subscribe to various channels
pubsub1.subscribe('news', 'sports')
pubsub2.subscribe('news', 'tech')
pubsub3.psubscribe('user:*', 'system:*')

# Skip subscription confirmations: keep reading until a poll comes back empty
for pubsub in all_pubsubs:
    while pubsub.get_message(timeout=0.1):
        pass

# Check active channels
active = [ch.decode() for ch in client.pubsub_channels()]
print(f"Active channels: {active}")

# Check specific channel patterns
news_only = [ch.decode() for ch in client.pubsub_channels('news*')]
print(f"News channels: {news_only}")

# Check subscriber counts for specific channels
for channel, count in client.pubsub_numsub('news', 'sports', 'tech'):
    print(f"Channel {channel.decode()}: {count} subscribers")

# Check pattern subscription count
pattern_count = client.pubsub_numpat()
print(f"Active pattern subscriptions: {pattern_count}")

# Test publishing and monitoring
print("\nPublishing test messages:")
for channel in ('news', 'sports', 'tech'):
    count = client.publish(channel, f'Test message for {channel}')
    print(f"  {channel}: reached {count} subscribers")

# Cleanup
for pubsub in all_pubsubs:
    pubsub.close()
300
```
301
302
### Sharded Pub/Sub (Redis 7.0+)
303
304
```python
305
import fakeredis
306
307
# Create client with Redis 7.0+ for sharded pub/sub support
308
client = fakeredis.FakeRedis(version=(7, 0))
309
310
# Sharded pub/sub provides better performance for high-throughput scenarios
311
# by distributing channels across Redis cluster shards
312
313
def test_sharded_pubsub():
    """Walk through sharded subscribe, publish, read and introspection.

    Uses the module-level ``client`` and prints what each step observes.
    """
    # NOTE(review): in redis-py, get_sharded_message() appears to be defined on
    # ClusterPubSub only - confirm the object returned by client.pubsub()
    # provides it; otherwise get_message() is the call to use here.
    first, second = client.pubsub(), client.pubsub()

    # Subscribe to sharded channels ('shard:2' is shared by both)
    first.ssubscribe('shard:1', 'shard:2')
    second.ssubscribe('shard:2', 'shard:3')

    # Skip subscription confirmations: read until a poll comes back empty
    for ps in (first, second):
        while ps.get_sharded_message(timeout=0.1):
            pass

    print("Sharded subscribers ready")

    # Publish to sharded channels
    for shard_no in range(1, 4):
        channel = f'shard:{shard_no}'
        count = client.spublish(channel, f'Sharded message {shard_no}')
        print(f"Published to {channel}: {count} subscribers")

    # Read sharded messages
    print("\nReceived messages:")
    for name, ps in (('Sub1', first), ('Sub2', second)):
        for _ in range(3):  # Try to read multiple messages
            msg = ps.get_sharded_message(timeout=0.1)
            if not msg or msg['type'] != 'smessage':
                continue
            channel = msg['channel'].decode()
            data = msg['data'].decode()
            print(f"  [{name}] {channel}: {data}")

    # Check sharded channel info
    active = [ch.decode() for ch in client.pubsub_shardchannels()]
    print(f"\nActive sharded channels: {active}")

    for channel, count in client.pubsub_shardnumsub('shard:1', 'shard:2', 'shard:3'):
        print(f"Shard {channel.decode()}: {count} subscribers")

    # Cleanup
    first.close()
    second.close()
358
359
test_sharded_pubsub()
360
```
361
362
### Message Listener with Automatic Reconnection
363
364
```python
365
import fakeredis
366
import time
367
import threading
368
import logging
369
370
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class ResilientSubscriber:
    """Pub/sub listener that re-subscribes after a receive error.

    Args:
        client: Object whose ``pubsub()`` method returns a pub/sub object.
        channels: Channel names to subscribe to (default: none).
        patterns: Channel patterns to subscribe to (default: none).
    """

    def __init__(self, client, channels=None, patterns=None):
        self.client = client
        self.channels = channels or []
        self.patterns = patterns or []
        self.pubsub = None
        self.running = False

    def connect(self):
        """Establish pub/sub connection and subscriptions"""
        self.pubsub = self.client.pubsub()

        if self.channels:
            self.pubsub.subscribe(*self.channels)
            logger.info(f"Subscribed to channels: {self.channels}")

        if self.patterns:
            self.pubsub.psubscribe(*self.patterns)
            logger.info(f"Subscribed to patterns: {self.patterns}")

        # Skip subscription confirmation messages (one read per subscription)
        expected_confirmations = len(self.channels) + len(self.patterns)
        for _ in range(expected_confirmations):
            self.pubsub.get_message(timeout=1.0)

    def start(self):
        """Listen for messages until stop() is called.

        Blocks the calling thread. An error while receiving or handling a
        message triggers a reconnect, unless the listener is being stopped.
        """
        self.running = True
        self.connect()

        logger.info("Starting message listener...")

        while self.running:
            try:
                message = self.pubsub.get_message(timeout=1.0)
                if message:
                    self.handle_message(message)
            except Exception as e:
                # stop() closes the connection from another thread, which can
                # make the pending read fail. That is a shutdown, not an
                # outage, so do not log an error or open a new connection.
                if not self.running:
                    break
                logger.error(f"Error receiving message: {e}")
                self._reconnect()

    def _reconnect(self):
        """Drop the current pub/sub object and subscribe again."""
        try:
            self.pubsub.close()
            time.sleep(1)
            if not self.running:
                # Stopped while waiting; a new connection would never be closed
                return
            self.connect()
            logger.info("Reconnected successfully")
        except Exception as reconnect_error:
            logger.error(f"Reconnection failed: {reconnect_error}")
            time.sleep(5)

    def handle_message(self, message):
        """Log 'message' and 'pmessage' entries; other types are ignored."""
        msg_type = message['type']

        if msg_type == 'message':
            channel = message['channel'].decode()
            data = message['data'].decode()
            logger.info(f"Channel message - {channel}: {data}")

        elif msg_type == 'pmessage':
            pattern = message['pattern'].decode()
            channel = message['channel'].decode()
            data = message['data'].decode()
            logger.info(f"Pattern message - {pattern} -> {channel}: {data}")

    def stop(self):
        """Stop the message listener and close the pub/sub object, if any."""
        self.running = False
        if self.pubsub:
            self.pubsub.close()
        logger.info("Message listener stopped")
444
445
# Usage example
client = fakeredis.FakeRedis()

# Create resilient subscriber for two channels and two patterns
subscriber = ResilientSubscriber(
    client,
    channels=['notifications', 'alerts'],
    patterns=['user:*', 'system:*'],
)

# Start subscriber in background thread (start() blocks until stop())
sub_thread = threading.Thread(target=subscriber.start)
sub_thread.start()

time.sleep(1)  # Let subscriber initialize

# Simulate message publishing
test_messages = [
    ('notifications', 'New notification available'),
    ('user:123', 'User 123 logged in'),
    ('alerts', 'Critical system alert'),
    ('system:backup', 'Backup completed successfully'),
    ('notifications', 'Another notification'),
    ('user:456', 'User 456 updated settings'),
]

for target, text in test_messages:
    client.publish(target, text)
    time.sleep(0.5)

# Let messages be processed
time.sleep(2)

# Stop subscriber
subscriber.stop()
sub_thread.join()
481
```
482
483
### Pattern: Event Bus
484
485
```python
486
import fakeredis
487
import json
488
import time
489
import threading
490
from typing import Callable, Dict, Any
491
from dataclasses import dataclass
492
493
@dataclass
class Event:
    """An application event; sent over the bus as a JSON object."""
    type: str  # Event type; also selects the channel the event is published on
    data: Dict[str, Any]  # Event payload; must be JSON-serializable
    timestamp: float
    source: str = 'unknown'


class EventBus:
    """Publish events on per-type channels and dispatch them to handlers.

    Args:
        client: Object providing ``publish(channel, message)`` and ``pubsub()``.
    """

    # Every event type is published on the channel "<prefix><event type>"
    CHANNEL_PREFIX = "events:"

    def __init__(self, client: fakeredis.FakeRedis):
        self.client = client
        self.handlers: Dict[str, list] = {}
        self.running = False
        self.pubsub = None

    def register_handler(self, event_type: str, handler: Callable[[Event], None]):
        """Register an event handler for specific event type"""
        self.handlers.setdefault(event_type, []).append(handler)

    def publish_event(self, event: Event) -> int:
        """Publish an event to the event bus.

        Returns:
            Whatever ``client.publish`` returns for the event's channel.
        """
        channel = f"{self.CHANNEL_PREFIX}{event.type}"
        event_data = {
            'type': event.type,
            'data': event.data,
            'timestamp': event.timestamp,
            'source': event.source,
        }
        return self.client.publish(channel, json.dumps(event_data))

    def start_listening(self):
        """Dispatch incoming events to handlers until stop_listening().

        Blocks the calling thread. Returns immediately when no handler has
        been registered. Only event types registered before this call are
        subscribed.
        """
        if not self.handlers:
            return

        self.running = True
        self.pubsub = self.client.pubsub()

        # Subscribe to all event types we have handlers for
        channels = [f"{self.CHANNEL_PREFIX}{event_type}" for event_type in self.handlers]
        self.pubsub.subscribe(*channels)

        # Skip subscription confirmations (one read per channel)
        for _ in channels:
            self.pubsub.get_message(timeout=1.0)

        print(f"Event bus listening for: {list(self.handlers.keys())}")

        while self.running:
            message = self.pubsub.get_message(timeout=1.0)
            if message and message['type'] == 'message':
                self._dispatch(message)

    def _dispatch(self, message):
        """Decode one pub/sub message and call the handlers for its type."""
        try:
            channel = message['channel'].decode()
            # Strip only the leading prefix: str.replace() would also remove
            # "events:" from inside a namespaced event type, and the handler
            # lookup below would then miss.
            event_type = channel[len(self.CHANNEL_PREFIX):]
            event_data = json.loads(message['data'].decode())

            event = Event(
                type=event_data['type'],
                data=event_data['data'],
                timestamp=event_data['timestamp'],
                source=event_data['source'],
            )
        except Exception as e:
            print(f"Error processing event: {e}")
            return

        # Call all handlers for this event type; one failing handler must
        # not prevent the remaining ones from running
        for handler in self.handlers.get(event_type, []):
            try:
                handler(event)
            except Exception as e:
                print(f"Error in event handler: {e}")

    def stop_listening(self):
        """Stop listening for events and close the pub/sub object, if any."""
        self.running = False
        if self.pubsub:
            self.pubsub.close()
572
573
# Event handlers
574
def user_login_handler(event: Event):
    """Print which user logged in and the event's timestamp."""
    user_id = event.data['user_id']
    print(f"User {user_id} logged in at {event.timestamp}")
576
577
def user_logout_handler(event: Event):
    """Print which user logged out."""
    user_id = event.data['user_id']
    print(f"User {user_id} logged out")
579
580
def order_created_handler(event: Event):
    """Print the order ID and amount carried in the event payload."""
    order = event.data
    print(f"Order {order['order_id']} created for ${order['amount']}")
582
583
def audit_handler(event: Event):
    """Print an audit line (type, source, timestamp) for any event."""
    line = f"AUDIT: {event.type} from {event.source} at {event.timestamp}"
    print(line)
586
587
# Usage example
client = fakeredis.FakeRedis()
event_bus = EventBus(client)

# Register event handlers; audit_handler is attached to every event type,
# so each type ends up with multiple handlers
registrations = {
    'user_login': [user_login_handler, audit_handler],
    'user_logout': [user_logout_handler, audit_handler],
    'order_created': [order_created_handler, audit_handler],
}
for event_type, handlers in registrations.items():
    for handler in handlers:
        event_bus.register_handler(event_type, handler)

# Start event bus in background
event_thread = threading.Thread(target=event_bus.start_listening)
event_thread.start()

time.sleep(1)  # Let event bus initialize

# Publish events
events = [
    Event('user_login', {'user_id': '123', 'ip': '192.168.1.1'}, time.time(), 'auth_service'),
    Event(
        'order_created',
        {'order_id': 'ORD001', 'amount': 99.99, 'user_id': '123'},
        time.time(),
        'order_service',
    ),
    Event('user_logout', {'user_id': '123'}, time.time(), 'auth_service'),
]

for event in events:
    reached = event_bus.publish_event(event)
    print(f"Published {event.type} to {reached} subscribers")
    time.sleep(1)

# Let events be processed
time.sleep(2)

# Stop event bus
event_bus.stop_listening()
event_thread.join()
623
```
624
625
### Pattern: Real-time Notifications
626
627
```python
628
import fakeredis
629
import json
630
import time
631
import threading
632
from enum import Enum
633
from dataclasses import dataclass, asdict
634
from typing import List, Optional
635
636
class NotificationPriority(Enum):
    """Priority levels; the string value is what gets serialized to JSON."""
    LOW = "low"
    NORMAL = "normal"
    HIGH = "high"
    URGENT = "urgent"


@dataclass
class Notification:
    """A notification addressed to a single user."""
    id: str
    user_id: str  # Selects the per-user channel the notification is sent on
    title: str
    message: str
    priority: NotificationPriority
    timestamp: float
    read: bool = False
    category: str = "general"


# Channel names shared by the sending and the receiving side
USER_CHANNEL_PREFIX = "notifications:user:"
BROADCAST_CHANNEL = "notifications:broadcast"
ADMIN_ALERTS_CHANNEL = "notifications:admin:alerts"


class NotificationService:
    """Sending side: publishes notifications as JSON.

    Args:
        client: Object providing ``publish(channel, message)``.

    Every ``send_*`` method returns whatever ``client.publish`` returns.
    """

    def __init__(self, client: fakeredis.FakeRedis):
        self.client = client

    def send_notification(self, notification: Notification):
        """Send notification to specific user"""
        channel = f"{USER_CHANNEL_PREFIX}{notification.user_id}"
        data = asdict(notification)
        data['priority'] = notification.priority.value  # Serialize enum

        return self.client.publish(channel, json.dumps(data))

    def send_broadcast(self, title: str, message: str, priority: NotificationPriority = NotificationPriority.NORMAL):
        """Send broadcast notification to all users"""
        notification_data = {
            'id': f"broadcast_{int(time.time() * 1000)}",
            'title': title,
            'message': message,
            'priority': priority.value,
            'timestamp': time.time(),
            'category': 'broadcast',
            'read': False,
        }

        return self.client.publish(BROADCAST_CHANNEL, json.dumps(notification_data))

    def send_admin_alert(self, message: str):
        """Send urgent alert to admin channels"""
        alert_data = {
            'id': f"alert_{int(time.time() * 1000)}",
            'message': message,
            'timestamp': time.time(),
            'severity': 'urgent',
        }

        return self.client.publish(ADMIN_ALERTS_CHANNEL, json.dumps(alert_data))


class NotificationClient:
    """Receiving side: listens for one user's notifications.

    Personal notifications are collected in ``self.notifications``;
    broadcasts and admin alerts are only printed.

    Args:
        client: Object whose ``pubsub()`` method returns a pub/sub object.
        user_id: User whose personal channel is subscribed.
    """

    def __init__(self, client: fakeredis.FakeRedis, user_id: str):
        self.client = client
        self.user_id = user_id
        self.pubsub = None
        self.running = False
        self.notifications: List[Notification] = []

    def start_listening(self, include_broadcasts: bool = True, is_admin: bool = False):
        """Listen for notifications until stop_listening() is called.

        Blocks the calling thread.

        Args:
            include_broadcasts: Also subscribe to the broadcast channel.
            is_admin: Also subscribe to the admin alerts channel.
        """
        self.running = True
        self.pubsub = self.client.pubsub()

        # Subscribe to user-specific notifications
        channels = [f"{USER_CHANNEL_PREFIX}{self.user_id}"]

        # Subscribe to broadcasts if requested
        if include_broadcasts:
            channels.append(BROADCAST_CHANNEL)

        # Subscribe to admin alerts if user is admin
        if is_admin:
            channels.append(ADMIN_ALERTS_CHANNEL)

        self.pubsub.subscribe(*channels)

        # Skip subscription confirmations (one read per channel)
        for _ in channels:
            self.pubsub.get_message(timeout=1.0)

        print(f"NotificationClient for user {self.user_id} listening on: {channels}")

        while self.running:
            message = self.pubsub.get_message(timeout=1.0)
            if message and message['type'] == 'message':
                self.handle_notification(message)

    def handle_notification(self, message):
        """Process incoming notification.

        Args:
            message: Pub/sub message dict with bytes 'channel' and a
                JSON-encoded bytes 'data'.
        """
        channel = message['channel'].decode()
        data = json.loads(message['data'].decode())

        # Compare whole channel names. A substring test would misroute the
        # personal channel of a user whose ID contains "broadcast" or
        # "admin:alerts".
        if channel == ADMIN_ALERTS_CHANNEL:
            # Handle admin alert
            print(f"🚨 ADMIN ALERT: {data['message']}")

        elif channel == BROADCAST_CHANNEL:
            # Handle broadcast notification
            priority_emoji = self.get_priority_emoji(data['priority'])
            print(f"{priority_emoji} BROADCAST: {data['title']} - {data['message']}")

        else:
            # Handle personal notification
            notification = Notification(
                id=data['id'],
                user_id=data['user_id'],
                title=data['title'],
                message=data['message'],
                priority=NotificationPriority(data['priority']),
                timestamp=data['timestamp'],
                read=data['read'],
                category=data['category'],
            )

            self.notifications.append(notification)
            priority_emoji = self.get_priority_emoji(notification.priority.value)
            print(f"{priority_emoji} {notification.title}: {notification.message}")

    def get_priority_emoji(self, priority: str) -> str:
        """Get emoji for notification priority (white circle if unknown)."""
        emojis = {
            'low': '🔵',
            'normal': '🟢',
            'high': '🟡',
            'urgent': '🔴',
        }
        return emojis.get(priority, '⚪')

    def get_unread_count(self) -> int:
        """Get count of unread notifications"""
        return sum(1 for n in self.notifications if not n.read)

    def mark_all_read(self):
        """Mark all notifications as read"""
        for notification in self.notifications:
            notification.read = True

    def stop_listening(self):
        """Stop listening and close the pub/sub object, if any."""
        self.running = False
        if self.pubsub:
            self.pubsub.close()
782
783
# Usage example
client = fakeredis.FakeRedis()
notification_service = NotificationService(client)

# Create notification clients for different users
users = ['user123', 'user456', 'admin789']
clients = {}
listener_threads = []  # Kept so the listeners can be joined at shutdown

for user_id in users:
    client_obj = NotificationClient(client, user_id)
    clients[user_id] = client_obj

    # Start listening (admin gets admin alerts)
    is_admin = user_id.startswith('admin')
    thread = threading.Thread(
        target=client_obj.start_listening,
        args=(True, is_admin),
    )
    thread.start()
    listener_threads.append(thread)

time.sleep(1)  # Let clients initialize

# Send various notifications
print("Sending notifications...")

# Personal notifications
notification_service.send_notification(Notification(
    id="notif_001",
    user_id="user123",
    title="Welcome!",
    message="Welcome to our platform",
    priority=NotificationPriority.NORMAL,
    timestamp=time.time(),
))

notification_service.send_notification(Notification(
    id="notif_002",
    user_id="user456",
    title="Payment Due",
    message="Your payment is due in 3 days",
    priority=NotificationPriority.HIGH,
    timestamp=time.time(),
))

time.sleep(1)

# Broadcast notifications
notification_service.send_broadcast(
    "System Maintenance",
    "Scheduled maintenance tonight from 2-4 AM",
    NotificationPriority.HIGH,
)

time.sleep(1)

# Admin alert
notification_service.send_admin_alert("High CPU usage detected on server cluster")

# Let notifications be processed
time.sleep(2)

# Show unread counts
for user_id, client_obj in clients.items():
    unread = client_obj.get_unread_count()
    print(f"User {user_id} has {unread} unread notifications")

# Stop all clients
for client_obj in clients.values():
    client_obj.stop_listening()

# Wait for the listener threads to exit; a fixed sleep could end before a
# listener notices the stop flag (each read waits up to one second)
for thread in listener_threads:
    thread.join()
854
```