or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-support.md cluster-support.md connection-management.md core-client.md distributed-locking.md error-handling.md high-availability.md index.md pipelines-transactions.md pubsub-messaging.md

pubsub-messaging.md (docs/)

0

# Pub/Sub Messaging

1

2

Redis Pub/Sub provides publish/subscribe messaging with channels and pattern-based subscriptions. Publishers send messages to named channels, and subscribers receive messages from channels they've subscribed to, enabling real-time messaging and event-driven architectures.

3

4

## Capabilities

5

6

### Pub/Sub Client

7

8

Redis Pub/Sub client for subscribing to channels and receiving messages.

9

10

```python { .api }

11

def pubsub(self, **kwargs) -> "PubSub": ...

12

13

class PubSub:

14

def __init__(

15

self,

16

connection_pool: ConnectionPool,

17

shard_hint: Optional[str] = None,

18

ignore_subscribe_messages: bool = False,

19

**kwargs

20

): ...

21

22

def subscribe(self, *args, **kwargs) -> None: ...

23

24

def unsubscribe(self, *args) -> None: ...

25

26

def psubscribe(self, *args, **kwargs) -> None: ...

27

28

def punsubscribe(self, *args) -> None: ...

29

30

def listen(self) -> Iterator[Dict[str, Any]]: ...

31

32

def get_message(

33

self,

34

ignore_subscribe_messages: bool = False,

35

timeout: float = 0.0

36

) -> Optional[Dict[str, Any]]: ...

37

38

def ping(self, message: Optional[EncodableT] = None) -> None: ...

39

40

def close(self) -> None: ...

41

42

def reset(self) -> None: ...

43

44

@property

45

def subscribed(self) -> bool: ...

46

47

@property

48

def channels(self) -> Dict[bytes, Optional[Callable]]: ...

49

50

@property

51

def patterns(self) -> Dict[bytes, Optional[Callable]]: ...

52

```

53

54

### Publishing Operations

55

56

Redis publish operations for sending messages to channels.

57

58

```python { .api }

59

def publish(self, channel: str, message: EncodableT) -> int: ...

60

61

def pubsub_channels(self, pattern: str = "*") -> List[bytes]: ...

62

63

def pubsub_numsub(self, *args: str) -> List[Tuple[bytes, int]]: ...

64

65

def pubsub_numpat(self) -> int: ...

66

```

67

68

### Message Types

69

70

Different types of messages received through Pub/Sub subscriptions.

71

72

```python { .api }

73

# Message dictionary structure returned by get_message() and listen()

74

MessageDict = Dict[str, Union[str, bytes, int, None]]

75

76

# Message types:

77

# - 'subscribe': Confirmation of channel subscription

78

# - 'unsubscribe': Confirmation of channel unsubscription

79

# - 'psubscribe': Confirmation of pattern subscription

80

# - 'punsubscribe': Confirmation of pattern unsubscription

81

# - 'message': Regular channel message

82

# - 'pmessage': Pattern-matched message

83

```

84

85

## Usage Examples

86

87

### Basic Publisher and Subscriber

88

89

```python

90

import redis

91

import threading

92

import time

93

94

r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

95

96

def publisher():

97

"""Publish messages to a channel"""

98

for i in range(10):

99

message = f"Hello {i}"

100

subscribers = r.publish('notifications', message)

101

print(f"Published '{message}' to {subscribers} subscribers")

102

time.sleep(1)

103

104

def subscriber():

105

"""Subscribe and listen for messages"""

106

pubsub = r.pubsub()

107

pubsub.subscribe('notifications')

108

109

print("Subscriber listening for messages...")

110

for message in pubsub.listen():

111

if message['type'] == 'message':

112

print(f"Received: {message['data']}")

113

elif message['type'] == 'subscribe':

114

print(f"Subscribed to: {message['channel']}")

115

116

# Run publisher and subscriber in separate threads

117

subscriber_thread = threading.Thread(target=subscriber)

118

publisher_thread = threading.Thread(target=publisher)

119

120

subscriber_thread.start()

121

time.sleep(0.5) # Let subscriber connect first

122

publisher_thread.start()

123

124

publisher_thread.join()

125

```

126

127

### Multiple Channels Subscription

128

129

```python

130

import redis

131

132

r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

133

134

# Subscribe to multiple channels

135

pubsub = r.pubsub()

136

pubsub.subscribe('news', 'sports', 'weather')

137

138

print("Listening to multiple channels...")

139

for message in pubsub.listen():

140

msg_type = message['type']

141

142

if msg_type == 'subscribe':

143

print(f"Subscribed to channel: {message['channel']}")

144

elif msg_type == 'message':

145

channel = message['channel']

146

data = message['data']

147

print(f"[{channel}] {data}")

148

149

# In another process/thread, publish to different channels:

150

# r.publish('news', 'Breaking news update')

151

# r.publish('sports', 'Game results')

152

# r.publish('weather', 'Sunny today')

153

```

154

155

### Pattern-Based Subscriptions

156

157

```python

158

import redis

159

160

r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

161

162

# Subscribe to channel patterns

163

pubsub = r.pubsub()

164

pubsub.psubscribe('user:*:notifications', 'system:*')

165

166

print("Listening to channel patterns...")

167

for message in pubsub.listen():

168

msg_type = message['type']

169

170

if msg_type == 'psubscribe':

171

print(f"Subscribed to pattern: {message['pattern']}")

172

elif msg_type == 'pmessage':

173

pattern = message['pattern']

174

channel = message['channel']

175

data = message['data']

176

print(f"Pattern [{pattern}] Channel [{channel}]: {data}")

177

178

# Publish to channels matching patterns:

179

# r.publish('user:1001:notifications', 'New message')

180

# r.publish('user:1002:notifications', 'Friend request')

181

# r.publish('system:alerts', 'System maintenance')

182

```

183

184

### Callback-Based Message Handling

185

186

```python

187

import redis

188

import threading

import time

189

190

r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

191

192

# Message handler functions

193

def news_handler(message):

194

print(f"📰 NEWS: {message['data']}")

195

196

def alert_handler(message):

197

print(f"🚨 ALERT: {message['data']}")

198

199

def user_notification_handler(message):

200

channel = message['channel']

201

user_id = channel.split(':')[1] # Extract user ID from channel

202

print(f"👀 User {user_id}: {message['data']}")

203

204

# Subscribe with callback handlers

205

pubsub = r.pubsub()

206

pubsub.subscribe(**{

207

'news': news_handler,

208

'alerts': alert_handler

209

})

210

pubsub.psubscribe(**{

211

'user:*:notifications': user_notification_handler

212

})

213

214

# Message processing loop

215

def message_processor():

216

for message in pubsub.listen():

217

# Handlers are called automatically for subscribed channels

218

pass

219

220

# Start message processor

221

processor_thread = threading.Thread(target=message_processor)

222

processor_thread.daemon = True

223

processor_thread.start()

224

225

# Simulate publishing

226

time.sleep(0.5)

227

r.publish('news', 'Market update')

228

r.publish('alerts', 'Server overload warning')

229

r.publish('user:1001:notifications', 'New friend request')

230

231

time.sleep(2)

232

pubsub.close()

233

```

234

235

### Non-Blocking Message Retrieval

236

237

```python

238

import redis

239

import time

240

241

r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

242

243

pubsub = r.pubsub()

244

pubsub.subscribe('events')

245

246

print("Non-blocking message retrieval...")

247

start_time = time.time()

248

249

while time.time() - start_time < 10: # Run for 10 seconds

250

# Poll for a message, waiting at most 1 second; returns None if nothing arrives in time

251

message = pubsub.get_message(timeout=1.0)

252

253

if message:

254

if message['type'] == 'message':

255

print(f"Received: {message['data']}")

256

elif message['type'] == 'subscribe':

257

print(f"Subscribed to: {message['channel']}")

258

else:

259

print("No message received, doing other work...")

260

# Perform other tasks while waiting for messages

261

time.sleep(0.1)

262

263

pubsub.close()

264

```

265

266

### Pub/Sub with Authentication and Error Handling

267

268

```python

269

import redis

import time

270

from redis.exceptions import ConnectionError, ResponseError

271

272

def create_authenticated_pubsub():

273

"""Create Pub/Sub client with authentication"""

274

try:

275

r = redis.Redis(

276

host='localhost',

277

port=6379,

278

password='your_password',

279

decode_responses=True,

280

socket_timeout=5,

281

socket_connect_timeout=5,

282

retry_on_timeout=True

283

)

284

285

# Test connection

286

r.ping()

287

288

pubsub = r.pubsub()

289

return r, pubsub

290

291

except ConnectionError as e:

292

print(f"Connection failed: {e}")

293

raise

294

except ResponseError as e:

295

print(f"Authentication failed: {e}")

296

raise

297

298

def robust_subscriber(channels):

299

"""Robust subscriber with error handling and reconnection"""

300

max_retries = 5

301

retry_count = 0

302

303

while retry_count < max_retries:

304

try:

305

r, pubsub = create_authenticated_pubsub()

306

pubsub.subscribe(*channels)

307

308

print(f"Connected and subscribed to: {channels}")

309

retry_count = 0 # Reset retry count on success

310

311

for message in pubsub.listen():

312

if message['type'] == 'message':

313

print(f"[{message['channel']}] {message['data']}")

314

315

except ConnectionError as e:

316

retry_count += 1

317

print(f"Connection lost (attempt {retry_count}): {e}")

318

319

if retry_count < max_retries:

320

wait_time = min(2 ** retry_count, 30) # Exponential backoff

321

print(f"Retrying in {wait_time} seconds...")

322

time.sleep(wait_time)

323

else:

324

print("Max retries exceeded")

325

raise

326

except KeyboardInterrupt:

327

print("Shutting down subscriber...")

328

if 'pubsub' in locals():

329

pubsub.close()

330

break

331

332

# Use robust subscriber

333

robust_subscriber(['important', 'alerts'])

334

```

335

336

### Chat Room Implementation

337

338

```python

339

import redis

340

import threading

341

import time

342

from datetime import datetime

343

344

class ChatRoom:

345

def __init__(self, room_name, username):

346

self.room_name = room_name

347

self.username = username

348

self.r = redis.Redis(host='localhost', port=6379, decode_responses=True)

349

self.pubsub = self.r.pubsub()

350

self.running = False

351

352

def join(self):

353

"""Join the chat room"""

354

channel = f"chat:{self.room_name}"

355

self.pubsub.subscribe(channel)

356

self.running = True

357

358

# Announce joining

359

self.r.publish(channel, f">>> {self.username} joined the room")

360

361

# Start message listener

362

listener_thread = threading.Thread(target=self._listen_messages)

363

listener_thread.daemon = True

364

listener_thread.start()

365

366

print(f"Joined chat room: {self.room_name}")

367

print("Type messages (or 'quit' to leave):")

368

369

# Message input loop

370

try:

371

while self.running:

372

message = input()

373

if message.lower() == 'quit':

374

break

375

self.send_message(message)

376

except KeyboardInterrupt:

377

pass

378

finally:

379

self.leave()

380

381

def send_message(self, message):

382

"""Send message to chat room"""

383

if message.strip():

384

timestamp = datetime.now().strftime('%H:%M:%S')

385

formatted_message = f"[{timestamp}] {self.username}: {message}"

386

channel = f"chat:{self.room_name}"

387

self.r.publish(channel, formatted_message)

388

389

def _listen_messages(self):

390

"""Listen for incoming messages"""

391

for message in self.pubsub.listen():

392

if not self.running:

393

break

394

395

if message['type'] == 'message':

396

print(message['data'])

397

398

def leave(self):

399

"""Leave the chat room"""

400

self.running = False

401

channel = f"chat:{self.room_name}"

402

self.r.publish(channel, f"<<< {self.username} left the room")

403

self.pubsub.close()

404

print(f"Left chat room: {self.room_name}")

405

406

# Usage example

407

if __name__ == "__main__":

408

room = ChatRoom("general", "Alice")

409

room.join()

410

```

411

412

### Pub/Sub Statistics and Monitoring

413

414

```python

415

import redis

416

import time

417

418

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

419

420

def monitor_pubsub():

421

"""Monitor Pub/Sub channels and subscriptions"""

422

print("Pub/Sub Monitoring Dashboard")

423

print("-" * 40)

424

425

while True:

426

try:

427

# Get active channels

428

channels = r.pubsub_channels()

429

print(f"Active channels: {len(channels)}")

430

431

if channels:

432

# Get subscriber counts for each channel

433

channel_stats = r.pubsub_numsub(*channels)

434

for channel, subscriber_count in channel_stats:

435

print(f" {channel}: {subscriber_count} subscribers")

436

437

# Get pattern subscription count

438

pattern_count = r.pubsub_numpat()

439

print(f"Pattern subscriptions: {pattern_count}")

440

441

print("-" * 40)

442

time.sleep(5)

443

444

except KeyboardInterrupt:

445

print("Monitoring stopped")

446

break

447

except Exception as e:

448

print(f"Monitoring error: {e}")

449

time.sleep(5)

450

451

# Monitor Pub/Sub activity

452

monitor_pubsub()

453

```

454

455

### Event-Driven Architecture Example

456

457

```python

458

import redis

459

import json

460

import threading

import time

461

from datetime import datetime

462

463

class EventBus:

464

def __init__(self):

465

self.r = redis.Redis(host='localhost', port=6379, decode_responses=True)

466

self.pubsub = self.r.pubsub()

467

self.handlers = {}

468

self.running = False

469

470

def subscribe_to_events(self, event_types):

471

"""Subscribe to specific event types"""

472

channels = [f"events:{event_type}" for event_type in event_types]

473

self.pubsub.subscribe(*channels)

474

475

self.running = True

476

listener_thread = threading.Thread(target=self._event_listener)

477

listener_thread.daemon = True

478

listener_thread.start()

479

480

def register_handler(self, event_type, handler):

481

"""Register event handler function"""

482

if event_type not in self.handlers:

483

self.handlers[event_type] = []

484

self.handlers[event_type].append(handler)

485

486

def publish_event(self, event_type, data):

487

"""Publish event to the bus"""

488

event = {

489

'type': event_type,

490

'data': data,

491

'timestamp': datetime.now().isoformat(),

492

'id': int(time.time() * 1000000) # Microsecond timestamp as ID

493

}

494

495

channel = f"events:{event_type}"

496

self.r.publish(channel, json.dumps(event))

497

498

def _event_listener(self):

499

"""Listen for events and dispatch to handlers"""

500

for message in self.pubsub.listen():

501

if not self.running:

502

break

503

504

if message['type'] == 'message':

505

try:

506

event = json.loads(message['data'])

507

event_type = event['type']

508

509

# Dispatch to registered handlers

510

if event_type in self.handlers:

511

for handler in self.handlers[event_type]:

512

try:

513

handler(event)

514

except Exception as e:

515

print(f"Handler error for {event_type}: {e}")

516

517

except json.JSONDecodeError as e:

518

print(f"Invalid event format: {e}")

519

520

def stop(self):

521

"""Stop the event bus"""

522

self.running = False

523

self.pubsub.close()

524

525

# Event handlers

526

def user_created_handler(event):

527

user_data = event['data']

528

print(f"🆕 User created: {user_data['name']} ({user_data['email']})")

529

530

def order_placed_handler(event):

531

order_data = event['data']

532

print(f"🛒 Order placed: #{order_data['order_id']} by user {order_data['user_id']}")

533

534

def system_alert_handler(event):

535

alert_data = event['data']

536

print(f"🚨 ALERT: {alert_data['message']} (Level: {alert_data['level']})")

537

538

# Usage example

539

if __name__ == "__main__":

540

# Create event bus

541

event_bus = EventBus()

542

543

# Register handlers

544

event_bus.register_handler('user_created', user_created_handler)

545

event_bus.register_handler('order_placed', order_placed_handler)

546

event_bus.register_handler('system_alert', system_alert_handler)

547

548

# Subscribe to events

549

event_bus.subscribe_to_events(['user_created', 'order_placed', 'system_alert'])

550

551

# Simulate publishing events

552

time.sleep(0.5) # Let subscriber connect

553

554

event_bus.publish_event('user_created', {

555

'user_id': 1001,

556

'name': 'John Doe',

557

'email': 'john@example.com'

558

})

559

560

event_bus.publish_event('order_placed', {

561

'order_id': 'ORD-12345',

562

'user_id': 1001,

563

'total': 99.99

564

})

565

566

event_bus.publish_event('system_alert', {

567

'message': 'High memory usage detected',

568

'level': 'WARNING'

569

})

570

571

# Keep running to receive events

572

try:

573

time.sleep(10)

574

except KeyboardInterrupt:

575

pass

576

finally:

577

event_bus.stop()

578

```