or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

- bitmap-operations.md
- core-clients.md
- generic-operations.md
- geospatial-operations.md
- hash-operations.md
- index.md
- list-operations.md
- lua-scripting.md
- pubsub-operations.md
- server-management.md
- server-operations.md
- set-operations.md
- sorted-set-operations.md
- stack-extensions.md
- stream-operations.md
- string-operations.md
- transaction-operations.md
- valkey-support.md

docs/pubsub-operations.md

0

# Pub/Sub Operations

1

2

Redis publish/subscribe messaging with support for channels, pattern subscriptions, and shard channels. Pub/Sub provides a powerful message broadcasting system for real-time applications, event notifications, and decoupled communication between application components.

3

4

## Capabilities

5

6

### Publishing Messages

7

8

Functions for publishing messages to channels and managing message distribution.

9

10

```python { .api }

11

# Publish `message` to `channel`; the return value is the number of
# subscribers the message reached.
def publish(self, channel: KeyT, message: EncodableT) -> int: ...

# Sharded counterpart of publish(), used together with ssubscribe().
def spublish(self, channel: KeyT, message: EncodableT) -> int: ...

# List the active channels whose names match the glob-style `pattern`.
def pubsub_channels(self, pattern: str = "*") -> List[bytes]: ...

# Return (channel, subscriber_count) pairs for the given channels.
def pubsub_numsub(self, *args: KeyT) -> List[Tuple[bytes, int]]: ...

# Return the number of active pattern subscriptions.
def pubsub_numpat(self) -> int: ...

# Sharded counterpart of pubsub_channels().
def pubsub_shardchannels(self, pattern: str = "*") -> List[bytes]: ...

# Sharded counterpart of pubsub_numsub().
def pubsub_shardnumsub(self, *args: KeyT) -> List[Tuple[bytes, int]]: ...

24

```

25

26

### PubSub Client

27

28

Redis PubSub client for subscribing to channels and receiving messages.

29

30

```python { .api }

31

class PubSub:
    """Subscription handle obtained from ``client.pubsub()``."""

    # Subscribe to one or more channels by exact name.
    def subscribe(self, *args: ChannelT) -> None: ...

    # Subscribe to every channel matching one or more glob-style patterns.
    def psubscribe(self, *args: PatternT) -> None: ...

    # Subscribe to one or more sharded channels.
    def ssubscribe(self, *args: ChannelT) -> None: ...

    # Counterparts of subscribe() / psubscribe() / ssubscribe().
    def unsubscribe(self, *args: ChannelT) -> None: ...

    def punsubscribe(self, *args: PatternT) -> None: ...

    def sunsubscribe(self, *args: ChannelT) -> None: ...

    # Return the next pending message, or None when nothing arrives within
    # `timeout` seconds.
    def get_message(
        self,
        ignore_subscribe_messages: bool = False,
        timeout: float = 0.0
    ) -> Optional[Dict[str, Any]]: ...

    # Iterate over incoming messages.
    def listen(self) -> Iterator[Dict[str, Any]]: ...

    # Same calling convention as get_message(), for sharded subscriptions.
    def get_sharded_message(
        self,
        ignore_subscribe_messages: bool = False,
        timeout: float = 0.0
    ) -> Optional[Dict[str, Any]]: ...

    # Release the subscriber once it is no longer needed.
    def close(self) -> None: ...

    # NOTE(review): presumably clears subscription state; confirm against
    # the redis-py PubSub documentation.
    def reset(self) -> None: ...

61

```

62

63

### Message Handling

64

65

Core message processing functions for PubSub operations.

66

67

```python { .api }

68

# Create a PubSub object bound to this client.
def pubsub(self, **kwargs) -> PubSub: ...

# Message structure returned by get_message()
MessageType = Dict[str, Any]
# {
#     'type': str,                  # 'message', 'pmessage', 'subscribe', etc.
#     'channel': bytes,             # Channel name
#     'pattern': Optional[bytes],   # Pattern (pattern subscriptions only; None otherwise)
#     'data': Union[bytes, int]     # Message data or subscription count
# }

78

```

79

80

## Usage Examples

81

82

### Basic Publishing and Subscribing

83

84

```python

85

import fakeredis

86

import threading

87

import time

88

89

client = fakeredis.FakeRedis()

90

91

def publisher():
    """Publisher that sends messages every second"""
    for i in range(5):
        time.sleep(1)
        # publish() reports how many subscribers received the message
        subscribers = client.publish('news', f'Breaking news #{i}')
        print(f"Published message {i} to {subscribers} subscribers")

97

98

def subscriber():
    """Subscriber that listens to messages"""
    pubsub = client.pubsub()
    try:
        pubsub.subscribe('news')

        # Wait for the subscription confirmation; a zero-timeout read can
        # return None before the confirmation has been delivered.
        confirmation = pubsub.get_message(timeout=1.0)
        print(f"Subscribed: {confirmation}")

        # Listen for actual messages
        received = 0
        while received < 5:
            message = pubsub.get_message(timeout=2.0)
            if message is None:
                print("No message received")
                break
            if message['type'] != 'message':
                # Confirmations carry an int payload, which has no .decode()
                continue
            received += 1
            print(f"Received: {message['data'].decode()}")
    finally:
        # Always release the subscription, even if handling fails
        pubsub.close()

117

118

# Start publisher and subscriber

119

pub_thread = threading.Thread(target=publisher)

120

sub_thread = threading.Thread(target=subscriber)

121

122

sub_thread.start()

123

time.sleep(0.5) # Give subscriber time to connect

124

pub_thread.start()

125

126

pub_thread.join()

127

sub_thread.join()

128

```

129

130

### Pattern Subscriptions

131

132

```python

133

import fakeredis

134

import threading

135

import time

136

137

client = fakeredis.FakeRedis()

138

139

def pattern_subscriber():
    """Subscriber using pattern matching"""
    pubsub = client.pubsub()
    try:
        # Subscribe to all channels starting with 'user:'
        pubsub.psubscribe('user:*')

        # Wait for the subscription confirmation rather than polling once
        # with a zero timeout, which can return None.
        confirmation = pubsub.get_message(timeout=1.0)
        print(f"Pattern subscribed: {confirmation}")

        # Listen for pattern-matched messages until a full second passes
        # without any traffic.
        while True:
            message = pubsub.get_message(timeout=1.0)
            if not message:
                break
            if message['type'] == 'pmessage':
                pattern = message['pattern'].decode()
                channel = message['channel'].decode()
                data = message['data'].decode()
                print(f"Pattern '{pattern}' matched channel '{channel}': {data}")
    finally:
        pubsub.close()

163

164

# Start pattern subscriber

165

sub_thread = threading.Thread(target=pattern_subscriber)

166

sub_thread.start()

167

168

time.sleep(0.5) # Give subscriber time to connect

169

170

# Publish to various channels

171

client.publish('user:123', 'User 123 logged in')

172

client.publish('user:456', 'User 456 updated profile')

173

client.publish('system:alert', 'System alert') # Won't match pattern

174

client.publish('user:789', 'User 789 logged out')

175

176

time.sleep(2) # Allow messages to be processed

177

sub_thread.join()

178

```

179

180

### Multiple Subscribers

181

182

```python

183

import fakeredis

184

import threading

185

import time

186

187

client = fakeredis.FakeRedis()

188

189

def create_subscriber(name, channels):
    """Create a subscriber for specific channels

    Returns a zero-argument callable (suitable as a thread target) that
    subscribes to `channels`, prints every message received during a
    5-second window, then closes its subscription.
    """
    def subscriber():
        pubsub = client.pubsub()
        try:
            pubsub.subscribe(*channels)

            # Skip subscription confirmations (one per channel)
            for _ in channels:
                pubsub.get_message(timeout=1.0)

            print(f"Subscriber {name} ready, listening to {channels}")

            # Listen for 5 seconds; a monotonic clock keeps the window
            # correct even if the wall clock is adjusted meanwhile.
            deadline = time.monotonic() + 5
            while time.monotonic() < deadline:
                message = pubsub.get_message(timeout=0.1)
                if message and message['type'] == 'message':
                    channel = message['channel'].decode()
                    data = message['data'].decode()
                    print(f"[{name}] {channel}: {data}")
        finally:
            pubsub.close()

        print(f"Subscriber {name} finished")

    return subscriber

214

215

# Create multiple subscribers
subscribers = [
    threading.Thread(target=create_subscriber('News Reader', ['news', 'alerts'])),
    threading.Thread(target=create_subscriber('Sports Fan', ['sports', 'alerts'])),
    threading.Thread(target=create_subscriber('Tech Enthusiast', ['tech', 'alerts'])),
]

# Start all subscribers
for sub in subscribers:
    sub.start()

time.sleep(1)  # Let subscribers connect

# Publish messages to different channels
messages = [
    ('news', 'Election results announced'),
    ('sports', 'Championship game tonight'),
    ('tech', 'New AI breakthrough'),
    ('alerts', 'System maintenance in 1 hour'),  # All subscribers get this
    ('news', 'Weather update: sunny skies'),
    ('sports', 'Trade deadline approaching'),
]

for channel, message in messages:
    subscribers_count = client.publish(channel, message)
    print(f"Published to {channel}: '{message}' ({subscribers_count} subscribers)")
    time.sleep(0.5)

# Wait for all subscribers to finish
for sub in subscribers:
    sub.join()

246

```

247

248

### Channel Information and Monitoring

249

250

```python

251

import fakeredis

252

import time

253

254

client = fakeredis.FakeRedis()

255

256

# Create some subscribers
pubsub1 = client.pubsub()
pubsub2 = client.pubsub()
pubsub3 = client.pubsub()

try:
    # Subscribe to various channels
    pubsub1.subscribe('news', 'sports')
    pubsub2.subscribe('news', 'tech')
    pubsub3.psubscribe('user:*', 'system:*')

    # Skip subscription confirmations: drain each subscriber until a read
    # times out with nothing pending.
    for pubsub in (pubsub1, pubsub2, pubsub3):
        while pubsub.get_message(timeout=0.1):
            pass

    # Check active channels
    channels = client.pubsub_channels()
    print(f"Active channels: {[ch.decode() for ch in channels]}")

    # Check specific channel patterns
    news_channels = client.pubsub_channels('news*')
    print(f"News channels: {[ch.decode() for ch in news_channels]}")

    # Check subscriber counts for specific channels
    numsub = client.pubsub_numsub('news', 'sports', 'tech')
    for channel, count in numsub:
        print(f"Channel {channel.decode()}: {count} subscribers")

    # Check pattern subscription count
    pattern_count = client.pubsub_numpat()
    print(f"Active pattern subscriptions: {pattern_count}")

    # Test publishing and monitoring
    print("\nPublishing test messages:")
    for channel in ['news', 'sports', 'tech']:
        count = client.publish(channel, f'Test message for {channel}')
        print(f" {channel}: reached {count} subscribers")
finally:
    # Cleanup runs even if one of the checks above raises
    pubsub1.close()
    pubsub2.close()
    pubsub3.close()

300

```

301

302

### Sharded Pub/Sub (Redis 7.0+)

303

304

```python

305

import fakeredis

306

307

# Create client with Redis 7.0+ for sharded pub/sub support

308

client = fakeredis.FakeRedis(version=(7, 0))

309

310

# Sharded pub/sub provides better performance for high-throughput scenarios

311

# by distributing channels across Redis cluster shards

312

313

def test_sharded_pubsub():
    """Walk through sharded subscribe, publish, read and introspection."""
    # Create sharded pub/sub clients
    pubsub1 = client.pubsub()
    pubsub2 = client.pubsub()

    try:
        # Subscribe to sharded channels
        pubsub1.ssubscribe('shard:1', 'shard:2')
        pubsub2.ssubscribe('shard:2', 'shard:3')

        # Skip subscription confirmations: drain until a read times out
        for pubsub in (pubsub1, pubsub2):
            while pubsub.get_sharded_message(timeout=0.1):
                pass

        print("Sharded subscribers ready")

        # Publish to sharded channels
        for shard in range(1, 4):
            channel = f'shard:{shard}'
            count = client.spublish(channel, f'Sharded message {shard}')
            print(f"Published to {channel}: {count} subscribers")

        # Read sharded messages
        print("\nReceived messages:")
        for name, pubsub in [('Sub1', pubsub1), ('Sub2', pubsub2)]:
            for _ in range(3):  # Try to read multiple messages
                msg = pubsub.get_sharded_message(timeout=0.1)
                if msg and msg['type'] == 'smessage':
                    channel = msg['channel'].decode()
                    data = msg['data'].decode()
                    print(f" [{name}] {channel}: {data}")

        # Check sharded channel info
        shard_channels = client.pubsub_shardchannels()
        print(f"\nActive sharded channels: {[ch.decode() for ch in shard_channels]}")

        shard_numsub = client.pubsub_shardnumsub('shard:1', 'shard:2', 'shard:3')
        for channel, count in shard_numsub:
            print(f"Shard {channel.decode()}: {count} subscribers")
    finally:
        # Cleanup runs even if a step above raises
        pubsub1.close()
        pubsub2.close()

358

359

test_sharded_pubsub()

360

```

361

362

### Message Listener with Automatic Reconnection

363

364

```python

365

import fakeredis

366

import time

367

import threading

368

import logging

369

370

logging.basicConfig(level=logging.INFO)

371

logger = logging.getLogger(__name__)

372

373

class ResilientSubscriber:
    """Pub/sub listener that re-subscribes after a receive error.

    `start()` blocks until `stop()` is called, so it is normally run as a
    thread target. `channels` and `patterns` are the exact-name and
    glob-pattern subscriptions to (re-)establish on every connect.
    """

    def __init__(self, client, channels=None, patterns=None):
        self.client = client
        self.channels = channels or []
        self.patterns = patterns or []
        self.pubsub = None
        self.running = False

    def connect(self):
        """Establish pub/sub connection and subscriptions"""
        self.pubsub = self.client.pubsub()

        if self.channels:
            self.pubsub.subscribe(*self.channels)
            logger.info("Subscribed to channels: %s", self.channels)

        if self.patterns:
            self.pubsub.psubscribe(*self.patterns)
            logger.info("Subscribed to patterns: %s", self.patterns)

        # Skip subscription confirmation messages (one per subscription)
        expected_confirmations = len(self.channels) + len(self.patterns)
        for _ in range(expected_confirmations):
            self.pubsub.get_message(timeout=1.0)

    def start(self):
        """Start listening for messages"""
        self.running = True
        self.connect()

        logger.info("Starting message listener...")

        while self.running:
            try:
                message = self.pubsub.get_message(timeout=1.0)
                if message:
                    self.handle_message(message)

            except Exception as e:
                if not self.running:
                    # stop() closed the connection while we were reading;
                    # that is a shutdown, not a failure to recover from.
                    break

                logger.error("Error receiving message: %s", e)
                # Attempt to reconnect
                try:
                    self.pubsub.close()
                    time.sleep(1)
                    self.connect()
                    logger.info("Reconnected successfully")
                except Exception as reconnect_error:
                    logger.error("Reconnection failed: %s", reconnect_error)
                    time.sleep(5)

    def handle_message(self, message):
        """Process received message"""
        msg_type = message['type']

        if msg_type == 'message':
            channel = message['channel'].decode()
            data = message['data'].decode()
            logger.info("Channel message - %s: %s", channel, data)

        elif msg_type == 'pmessage':
            pattern = message['pattern'].decode()
            channel = message['channel'].decode()
            data = message['data'].decode()
            logger.info("Pattern message - %s -> %s: %s", pattern, channel, data)

    def stop(self):
        """Stop the message listener"""
        self.running = False
        if self.pubsub:
            self.pubsub.close()
        logger.info("Message listener stopped")

444

445

# Usage example
client = fakeredis.FakeRedis()

# Create resilient subscriber
subscriber = ResilientSubscriber(
    client,
    channels=['notifications', 'alerts'],
    patterns=['user:*', 'system:*'],
)

# Start subscriber in background thread
sub_thread = threading.Thread(target=subscriber.start)
sub_thread.start()

time.sleep(1)  # Let subscriber initialize

# Simulate message publishing
test_messages = [
    ('notifications', 'New notification available'),
    ('user:123', 'User 123 logged in'),
    ('alerts', 'Critical system alert'),
    ('system:backup', 'Backup completed successfully'),
    ('notifications', 'Another notification'),
    ('user:456', 'User 456 updated settings'),
]

try:
    for channel, message in test_messages:
        client.publish(channel, message)
        time.sleep(0.5)

    # Let messages be processed
    time.sleep(2)
finally:
    # Stop subscriber even if publishing fails, so the thread can exit
    subscriber.stop()
    sub_thread.join()

481

```

482

483

### Pattern: Event Bus

484

485

```python

486

import fakeredis

487

import json

488

import time

489

import threading

490

from typing import Callable, Dict, Any

491

from dataclasses import dataclass

492

493

@dataclass
class Event:
    """A single event carried over the bus."""
    type: str               # event type; also selects the pub/sub channel
    data: Dict[str, Any]    # JSON-serializable payload
    timestamp: float
    source: str = 'unknown'


class EventBus:
    """Dispatch JSON-encoded events to handlers registered per event type.

    Each event type maps to the channel ``events:<type>``. Handlers must be
    registered before `start_listening()` runs, because the channel list is
    built once when listening starts.
    """

    CHANNEL_PREFIX = "events:"

    def __init__(self, client: "fakeredis.FakeRedis"):
        self.client = client
        self.handlers: Dict[str, list] = {}
        self.running = False
        self.pubsub = None

    def register_handler(self, event_type: str, handler: Callable[[Event], None]):
        """Register an event handler for specific event type"""
        self.handlers.setdefault(event_type, []).append(handler)

    def publish_event(self, event: Event) -> int:
        """Publish an event to the event bus"""
        channel = f"{self.CHANNEL_PREFIX}{event.type}"
        event_data = {
            'type': event.type,
            'data': event.data,
            'timestamp': event.timestamp,
            'source': event.source,
        }
        return self.client.publish(channel, json.dumps(event_data))

    def start_listening(self):
        """Start listening for events"""
        if not self.handlers:
            return

        self.running = True
        self.pubsub = self.client.pubsub()

        # Subscribe to all event types we have handlers for
        channels = [f"{self.CHANNEL_PREFIX}{event_type}" for event_type in self.handlers]
        self.pubsub.subscribe(*channels)

        # Skip subscription confirmations
        for _ in channels:
            self.pubsub.get_message(timeout=1.0)

        print(f"Event bus listening for: {list(self.handlers.keys())}")

        while self.running:
            try:
                message = self.pubsub.get_message(timeout=1.0)
            except Exception:
                if self.running:
                    raise
                # stop_listening() closed the connection while we were
                # waiting; exit quietly instead of crashing the thread.
                break

            if message and message['type'] == 'message':
                self._dispatch(message)

    def _dispatch(self, message):
        """Decode one pub/sub message and invoke the matching handlers."""
        try:
            channel = message['channel'].decode()
            # Strip only the leading prefix: str.replace() would also remove
            # 'events:' from inside the event type itself.
            if channel.startswith(self.CHANNEL_PREFIX):
                event_type = channel[len(self.CHANNEL_PREFIX):]
            else:
                event_type = channel
            event_data = json.loads(message['data'].decode())

            event = Event(
                type=event_data['type'],
                data=event_data['data'],
                timestamp=event_data['timestamp'],
                source=event_data['source'],
            )
        except Exception as e:
            print(f"Error processing event: {e}")
            return

        # Call all handlers for this event type; one failing handler must
        # not prevent the others from running.
        for handler in self.handlers.get(event_type, []):
            try:
                handler(event)
            except Exception as e:
                print(f"Error in event handler: {e}")

    def stop_listening(self):
        """Stop listening for events"""
        self.running = False
        if self.pubsub:
            self.pubsub.close()

572

573

# Event handlers

574

def user_login_handler(event: Event):
    """Report a login, using the 'user_id' key of the event payload."""
    print(f"User {event.data['user_id']} logged in at {event.timestamp}")


def user_logout_handler(event: Event):
    """Report a logout, using the 'user_id' key of the event payload."""
    print(f"User {event.data['user_id']} logged out")


def order_created_handler(event: Event):
    """Report a new order, using the 'order_id' and 'amount' payload keys."""
    print(f"Order {event.data['order_id']} created for ${event.data['amount']}")


def audit_handler(event: Event):
    """Generic audit handler that logs all events"""
    print(f"AUDIT: {event.type} from {event.source} at {event.timestamp}")

586

587

# Usage example
client = fakeredis.FakeRedis()
event_bus = EventBus(client)

# Register event handlers
event_bus.register_handler('user_login', user_login_handler)
event_bus.register_handler('user_login', audit_handler)  # Multiple handlers for same event
event_bus.register_handler('user_logout', user_logout_handler)
event_bus.register_handler('user_logout', audit_handler)
event_bus.register_handler('order_created', order_created_handler)
event_bus.register_handler('order_created', audit_handler)

# Start event bus in background
event_thread = threading.Thread(target=event_bus.start_listening)
event_thread.start()

time.sleep(1)  # Let event bus initialize

# Publish events
events = [
    Event('user_login', {'user_id': '123', 'ip': '192.168.1.1'}, time.time(), 'auth_service'),
    Event('order_created', {'order_id': 'ORD001', 'amount': 99.99, 'user_id': '123'}, time.time(), 'order_service'),
    Event('user_logout', {'user_id': '123'}, time.time(), 'auth_service'),
]

try:
    for event in events:
        subscribers = event_bus.publish_event(event)
        print(f"Published {event.type} to {subscribers} subscribers")
        time.sleep(1)

    # Let events be processed
    time.sleep(2)
finally:
    # Stop event bus even if publishing fails, so the thread can exit
    event_bus.stop_listening()
    event_thread.join()

623

```

624

625

### Pattern: Real-time Notifications

626

627

```python

628

import fakeredis

629

import json

630

import time

631

import threading

632

from enum import Enum

633

from dataclasses import dataclass, asdict

634

from typing import List, Optional

635

636

class NotificationPriority(Enum):
    """Notification urgency; the string value is what gets serialized."""
    LOW = "low"
    NORMAL = "normal"
    HIGH = "high"
    URGENT = "urgent"

641

642

@dataclass
class Notification:
    """A notification addressed to a single user."""
    id: str
    user_id: str       # selects the per-user delivery channel
    title: str
    message: str
    priority: NotificationPriority
    timestamp: float
    read: bool = False
    category: str = "general"

652

653

class NotificationService:
    """Publishes personal, broadcast and admin notifications as JSON."""

    def __init__(self, client: "fakeredis.FakeRedis"):
        self.client = client

    def send_notification(self, notification: Notification):
        """Send notification to specific user"""
        channel = f"notifications:user:{notification.user_id}"
        data = asdict(notification)
        data['priority'] = notification.priority.value  # Serialize enum

        return self.client.publish(channel, json.dumps(data))

    def send_broadcast(self, title: str, message: str, priority: NotificationPriority = NotificationPriority.NORMAL):
        """Send broadcast notification to all users"""
        # Read the clock once so the id and the timestamp always agree
        now = time.time()
        notification_data = {
            'id': f"broadcast_{int(now * 1000)}",
            'title': title,
            'message': message,
            'priority': priority.value,
            'timestamp': now,
            'category': 'broadcast',
            'read': False,
        }

        return self.client.publish("notifications:broadcast", json.dumps(notification_data))

    def send_admin_alert(self, message: str):
        """Send urgent alert to admin channels"""
        # Read the clock once so the id and the timestamp always agree
        now = time.time()
        alert_data = {
            'id': f"alert_{int(now * 1000)}",
            'message': message,
            'timestamp': now,
            'severity': 'urgent',
        }

        return self.client.publish("notifications:admin:alerts", json.dumps(alert_data))

689

690

class NotificationClient:
    """Receives and stores notifications for a single user.

    `start_listening()` blocks until `stop_listening()` is called, so it is
    normally run as a thread target. Personal notifications are collected in
    `self.notifications`; broadcasts and admin alerts are only printed.
    """

    BROADCAST_CHANNEL = "notifications:broadcast"
    ADMIN_ALERTS_CHANNEL = "notifications:admin:alerts"

    def __init__(self, client: "fakeredis.FakeRedis", user_id: str):
        self.client = client
        self.user_id = user_id
        self.pubsub = None
        self.running = False
        self.notifications: List[Notification] = []

    def start_listening(self, include_broadcasts: bool = True, is_admin: bool = False):
        """Start listening for notifications"""
        self.running = True
        self.pubsub = self.client.pubsub()

        # Subscribe to user-specific notifications
        channels = [f"notifications:user:{self.user_id}"]

        # Subscribe to broadcasts if requested
        if include_broadcasts:
            channels.append(self.BROADCAST_CHANNEL)

        # Subscribe to admin alerts if user is admin
        if is_admin:
            channels.append(self.ADMIN_ALERTS_CHANNEL)

        self.pubsub.subscribe(*channels)

        # Skip subscription confirmations
        for _ in channels:
            self.pubsub.get_message(timeout=1.0)

        print(f"NotificationClient for user {self.user_id} listening on: {channels}")

        while self.running:
            try:
                message = self.pubsub.get_message(timeout=1.0)
            except Exception:
                if self.running:
                    raise
                # stop_listening() closed the connection while we were
                # waiting; exit quietly instead of crashing the thread.
                break

            if message and message['type'] == 'message':
                self.handle_notification(message)

    def handle_notification(self, message):
        """Process incoming notification"""
        channel = message['channel'].decode()
        data = json.loads(message['data'].decode())

        # Compare whole channel names: a substring test would misroute the
        # personal channel of a user whose id contains 'broadcast' or
        # 'admin:alerts'.
        if channel == self.ADMIN_ALERTS_CHANNEL:
            # Handle admin alert
            print(f"🚨 ADMIN ALERT: {data['message']}")

        elif channel == self.BROADCAST_CHANNEL:
            # Handle broadcast notification
            priority_emoji = self.get_priority_emoji(data['priority'])
            print(f"{priority_emoji} BROADCAST: {data['title']} - {data['message']}")

        else:
            # Handle personal notification
            notification = Notification(
                id=data['id'],
                user_id=data['user_id'],
                title=data['title'],
                message=data['message'],
                priority=NotificationPriority(data['priority']),
                timestamp=data['timestamp'],
                # Optional on the wire; fall back to the dataclass defaults
                read=data.get('read', False),
                category=data.get('category', 'general'),
            )

            self.notifications.append(notification)
            priority_emoji = self.get_priority_emoji(notification.priority.value)
            print(f"{priority_emoji} {notification.title}: {notification.message}")

    def get_priority_emoji(self, priority: str) -> str:
        """Get emoji for notification priority"""
        emojis = {
            'low': '🔵',
            'normal': '🟢',
            'high': '🟡',
            'urgent': '🔴',
        }
        return emojis.get(priority, '⚪')

    def get_unread_count(self) -> int:
        """Get count of unread notifications"""
        return sum(1 for n in self.notifications if not n.read)

    def mark_all_read(self):
        """Mark all notifications as read"""
        for notification in self.notifications:
            notification.read = True

    def stop_listening(self):
        """Stop listening for notifications"""
        self.running = False
        if self.pubsub:
            self.pubsub.close()

782

783

# Usage example
client = fakeredis.FakeRedis()
notification_service = NotificationService(client)

# Create notification clients for different users
users = ['user123', 'user456', 'admin789']
clients = {}
threads = []

for user_id in users:
    client_obj = NotificationClient(client, user_id)
    clients[user_id] = client_obj

    # Start listening (admin gets admin alerts)
    is_admin = user_id.startswith('admin')
    thread = threading.Thread(
        target=client_obj.start_listening,
        args=(True, is_admin),
    )
    thread.start()
    threads.append(thread)  # keep a handle so the thread can be joined later

time.sleep(1)  # Let clients initialize

# Send various notifications
print("Sending notifications...")

# Personal notifications
notification_service.send_notification(Notification(
    id="notif_001",
    user_id="user123",
    title="Welcome!",
    message="Welcome to our platform",
    priority=NotificationPriority.NORMAL,
    timestamp=time.time(),
))

notification_service.send_notification(Notification(
    id="notif_002",
    user_id="user456",
    title="Payment Due",
    message="Your payment is due in 3 days",
    priority=NotificationPriority.HIGH,
    timestamp=time.time(),
))

time.sleep(1)

# Broadcast notifications
notification_service.send_broadcast(
    "System Maintenance",
    "Scheduled maintenance tonight from 2-4 AM",
    NotificationPriority.HIGH,
)

time.sleep(1)

# Admin alert
notification_service.send_admin_alert("High CPU usage detected on server cluster")

# Let notifications be processed
time.sleep(2)

# Show unread counts
for user_id, client_obj in clients.items():
    unread = client_obj.get_unread_count()
    print(f"User {user_id} has {unread} unread notifications")

# Stop all clients
for client_obj in clients.values():
    client_obj.stop_listening()

# Wait for the listener threads to exit instead of sleeping a fixed interval
for thread in threads:
    thread.join()

854

```