or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

cluster.mdcommands.mdconfiguration.mdindex.mdpipelining.mdpubsub.mdredis-client.mdstreaming.md
tile.json

pubsub.mddocs/

0

# Pub/Sub Messaging

1

2

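ioredis provides comprehensive publish/subscribe messaging capabilities including channel subscriptions, pattern subscriptions, and message handling. The client automatically handles connection lifecycle and resubscription during reconnection. Once a connection has subscribed to a channel or pattern it enters subscriber mode and can only issue subscription commands, so use a separate connection for publishing and for any other Redis commands.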

3

4

## Capabilities

5

6

### Channel Subscription

7

8

Subscribe to specific channels for real-time message delivery.

9

10

```typescript { .api }

11

// Channel subscription

12

subscribe(...channels: string[]): Promise<number>;

13

unsubscribe(...channels: string[]): Promise<number>;

14

15

// Pattern subscription

16

psubscribe(...patterns: string[]): Promise<number>;

17

punsubscribe(...patterns: string[]): Promise<number>;

18

19

// Message publishing

20

publish(channel: string, message: string): Promise<number>;

21

```

22

23

**Usage Examples:**

24

25

```typescript

26

import Redis from "ioredis";

27

28

// Create subscriber and publisher instances

29

const subscriber = new Redis();

30

const publisher = new Redis();

31

32

// Subscribe to channels

33

const channelCount = await subscriber.subscribe("news", "updates", "alerts");

34

console.log(`Subscribed to ${channelCount} channels`);

35

36

// Handle messages

37

subscriber.on("message", (channel, message) => {

38

console.log(`Received message from ${channel}: ${message}`);

39

40

switch (channel) {

41

case "news":

42

handleNewsMessage(message);

43

break;

44

case "updates":

45

handleUpdateMessage(message);

46

break;

47

case "alerts":

48

handleAlertMessage(message);

49

break;

50

}

51

});

52

53

// Publish messages

54

await publisher.publish("news", "Breaking: New Redis version released!");

55

await publisher.publish("updates", JSON.stringify({ version: "7.0", status: "available" }));

56

57

// Unsubscribe from specific channels

58

await subscriber.unsubscribe("alerts");

59

```

60

61

### Pattern Subscription

62

63

Subscribe to channels using glob-style patterns for flexible message routing.

64

65

```typescript { .api }

66

// Pattern subscription methods

67

psubscribe(...patterns: string[]): Promise<number>;

68

punsubscribe(...patterns: string[]): Promise<number>;

69

70

// Pattern message event

71

on(event: 'pmessage', listener: (pattern: string, channel: string, message: string) => void): this;

72

```

73

74

**Usage Examples:**

75

76

```typescript

77

const subscriber = new Redis();

78

const publisher = new Redis();

79

80

// Subscribe to patterns

81

await subscriber.psubscribe("user:*", "admin:*", "system:*:errors");

82

83

// Handle pattern messages

84

subscriber.on("pmessage", (pattern, channel, message) => {

85

console.log(`Pattern ${pattern} matched channel ${channel}: ${message}`);

86

87

if (pattern === "user:*") {

88

const userId = channel.split(":")[1];

89

handleUserMessage(userId, message);

90

} else if (pattern === "admin:*") {

91

handleAdminMessage(channel, message);

92

} else if (pattern === "system:*:errors") {

93

handleSystemError(channel, message);

94

}

95

});

96

97

// Publish to channels that match patterns

98

await publisher.publish("user:123", "Profile updated");

99

await publisher.publish("user:456", "Login detected");

100

await publisher.publish("admin:notifications", "System maintenance scheduled");

101

await publisher.publish("system:database:errors", "Connection timeout");

102

103

// Unsubscribe from patterns

104

await subscriber.punsubscribe("admin:*");

105

```

106

107

### Message Event Handling

108

109

Handle different types of subscription events and messages.

110

111

```typescript { .api }

112

// String message events

113

on(event: 'message', listener: (channel: string, message: string) => void): this;

114

on(event: 'pmessage', listener: (pattern: string, channel: string, message: string) => void): this;

115

116

// Buffer message events (for binary data)

117

on(event: 'messageBuffer', listener: (channel: Buffer, message: Buffer) => void): this;

118

on(event: 'pmessageBuffer', listener: (pattern: Buffer, channel: Buffer, message: Buffer) => void): this;

119

120

// Subscription events

121

on(event: 'subscribe', listener: (channel: string, count: number) => void): this;

122

on(event: 'psubscribe', listener: (pattern: string, count: number) => void): this;

123

on(event: 'unsubscribe', listener: (channel: string, count: number) => void): this;

124

on(event: 'punsubscribe', listener: (pattern: string, count: number) => void): this;

125

```

126

127

**Usage Examples:**

128

129

```typescript

130

const subscriber = new Redis();

131

132

// Track subscription changes

133

subscriber.on("subscribe", (channel, count) => {

134

console.log(`Subscribed to ${channel}. Total subscriptions: ${count}`);

135

});

136

137

subscriber.on("unsubscribe", (channel, count) => {

138

console.log(`Unsubscribed from ${channel}. Remaining subscriptions: ${count}`);

139

});

140

141

subscriber.on("psubscribe", (pattern, count) => {

142

console.log(`Subscribed to pattern ${pattern}. Total patterns: ${count}`);

143

});

144

145

// Handle different message types

146

subscriber.on("message", (channel, message) => {

147

try {

148

const data = JSON.parse(message);

149

handleStructuredMessage(channel, data);

150

} catch {

151

handleTextMessage(channel, message);

152

}

153

});

154

155

// Handle binary messages

156

subscriber.on("messageBuffer", (channel, message) => {

157

console.log(`Binary message from ${channel.toString()}: ${message.length} bytes`);

158

handleBinaryMessage(channel.toString(), message);

159

});

160

161

// Subscribe to channels

162

await subscriber.subscribe("text-channel", "json-channel", "binary-channel");

163

await subscriber.psubscribe("event:*");

164

```

165

166

### Publisher Methods

167

168

Publishing messages to channels with subscriber count feedback.

169

170

```typescript { .api }

171

// Publish message to channel

172

publish(channel: string, message: string): Promise<number>;

173

174

// Publish binary message

175

publish(channel: string, message: Buffer): Promise<number>;

176

```

177

178

**Usage Examples:**

179

180

```typescript

181

const publisher = new Redis();

182

183

// Publish text messages

184

const subscriberCount = await publisher.publish("notifications", "Server maintenance in 5 minutes");

185

console.log(`Message delivered to ${subscriberCount} subscribers`);

186

187

// Publish JSON data

188

const eventData = {

189

type: "user_login",

190

userId: 123,

191

timestamp: Date.now(),

192

ip: "192.168.1.100"

193

};

194

await publisher.publish("events", JSON.stringify(eventData));

195

196

// Publish binary data

197

const binaryData = Buffer.from("Binary message content", "utf8");

198

await publisher.publish("binary-channel", binaryData);

199

200

// Conditional publishing based on subscriber count

201

const alertChannel = "critical-alerts";

202

const alertSubscribers = await publisher.publish(alertChannel, "System overload detected");

203

if (alertSubscribers === 0) {

204

console.warn("No subscribers for critical alerts!");

205

// Fallback notification mechanism

206

sendEmailAlert("System overload detected");

207

}

208

```

209

210

## Advanced Features

211

212

### Subscriber Connection Management

213

214

Manage subscriber connections and handle reconnection scenarios.

215

216

```typescript { .api }

217

interface RedisOptions {

218

autoResubscribe?: boolean; // Auto-resubscribe on reconnect (default: true)

219

autoResendUnfulfilledCommands?: boolean; // Resend pending commands (default: true)

220

}

221

```

222

223

**Usage Examples:**

224

225

```typescript

226

const subscriber = new Redis({

227

autoResubscribe: true, // Automatically resubscribe after reconnection

228

autoResendUnfulfilledCommands: true

229

});

230

231

// Track connection events

232

subscriber.on("connect", () => {

233

console.log("Subscriber connected");

234

});

235

236

subscriber.on("ready", () => {

237

console.log("Subscriber ready for commands");

238

});

239

240

subscriber.on("reconnecting", (ms) => {

241

console.log(`Subscriber reconnecting in ${ms}ms`);

242

});

243

244

subscriber.on("error", (err) => {

245

console.error("Subscriber error:", err);

246

});

247

248

// Subscribe after connection is ready

249

subscriber.on("ready", async () => {

250

await subscriber.subscribe("important-channel");

251

await subscriber.psubscribe("user:*:notifications");

252

});

253

```

254

255

### Message Queue Pattern

256

257

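Implement a simple work queue on top of pub/sub, tracking each message under a `pending:*` key until it has been processed. Note that Redis pub/sub delivery is at-most-once: a message published while no subscriber is connected is never delivered, so the `pending:*` keys must be re-driven by a separate recovery step (or replaced with Redis Streams or lists) when delivery has to be guaranteed.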

258

259

```typescript

260

/**
 * Work queue layered on Redis pub/sub.
 *
 * Every published message is also stored under `pending:<id>` for five
 * minutes, and that key is deleted once the message has been handled.
 * Pub/sub delivery itself is fire-and-forget and nothing in this class
 * replays leftover `pending:*` keys, so add a recovery job if every message
 * must eventually be processed.
 *
 * Three connections are used because a connection that has subscribed can
 * only issue subscription commands.
 */
class ReliableMessageQueue {
  private publisher: Redis;
  private subscriber: Redis;
  private processor: Redis;
  /** IDs of messages currently being handled by this instance. */
  private processingSet = new Set<string>();

  constructor() {
    this.publisher = new Redis();
    this.subscriber = new Redis();
    this.processor = new Redis();

    this.setupSubscriber();
  }

  /** Attaches the message handler and subscribes to the work queue channel. */
  private setupSubscriber() {
    this.subscriber.on("message", async (channel, message) => {
      if (channel === "work-queue") {
        await this.processMessage(message);
      }
    });

    // The constructor cannot await, so surface subscription failures here
    // instead of leaving an unhandled promise rejection.
    this.subscriber.subscribe("work-queue").catch((error) => {
      console.error("Failed to subscribe to work-queue:", error);
    });
  }

  /**
   * Stores the message under `pending:<id>` and publishes it to the queue.
   *
   * @param data JSON-serializable payload handed to the worker.
   */
  async publishMessage(data: unknown): Promise<void> {
    const messageId = `msg:${Date.now()}:${Math.random()}`;
    const message = JSON.stringify({ id: messageId, data, timestamp: Date.now() });

    // Store message for reliability
    await this.processor.setex(`pending:${messageId}`, 300, message); // 5 min expiry

    // Publish to queue
    await this.publisher.publish("work-queue", message);
  }

  /**
   * Handles one delivered message and acknowledges it by deleting its
   * `pending:<id>` key. Errors (including malformed JSON) are logged and
   * never rethrown, because the caller is an event listener.
   */
  private async processMessage(message: string): Promise<void> {
    // Set only once this call has claimed the message, so the cleanup below
    // never clears a marker that belongs to another in-flight call.
    let claimedId: string | undefined;

    try {
      const { id, data } = JSON.parse(message);

      // Prevent duplicate processing
      if (this.processingSet.has(id)) return;
      this.processingSet.add(id);
      claimedId = id;

      // Process the message
      await this.handleWork(data);

      // Acknowledge processing
      await this.processor.del(`pending:${id}`);

      console.log(`Processed message ${id}`);
    } catch (error) {
      console.error("Message processing error:", error);
    } finally {
      // Clean up processing set without re-parsing the payload: a second
      // JSON.parse would throw again for malformed input.
      if (claimedId !== undefined) {
        this.processingSet.delete(claimedId);
      }
    }
  }

  /** Placeholder for the application's business logic. */
  private async handleWork(data: unknown): Promise<void> {
    // Implement your business logic here
    await new Promise(resolve => setTimeout(resolve, 100)); // Simulate work
    console.log("Processed work item:", data);
  }
}

327

328

// Usage

329

const queue = new ReliableMessageQueue();

330

await queue.publishMessage({ task: "send_email", recipient: "user@example.com" });

331

```

332

333

### Event-Driven Architecture

334

335

Build event-driven systems using pub/sub for loose coupling between components.

336

337

```typescript

338

/**
 * Event bus backed by Redis pub/sub.
 *
 * Events are published on `event:<eventType>` channels and received through
 * a single `event:*` pattern subscription. Publishing and subscribing use
 * separate connections because a connection that has subscribed can only
 * issue subscription commands, so `publish` on it would be rejected.
 */
class EventBus {
  /** Channel prefix shared by `emit` and the pattern subscription. */
  private static readonly channelPrefix = "event:";

  private publisher: Redis;
  private subscriber: Redis;
  private readonly handlers = new Map<string, Array<(data: any) => Promise<void>>>();

  constructor() {
    this.publisher = new Redis();
    this.subscriber = new Redis();
    this.setupSubscriptions();
  }

  /** Routes every message on `event:*` to the handlers for its event type. */
  private setupSubscriptions() {
    const prefix = EventBus.channelPrefix;

    this.subscriber.on("pmessage", async (_pattern, channel, message) => {
      // Strip only the leading prefix so event types that themselves contain
      // ":" (e.g. "order:placed") are kept intact.
      const eventType = channel.startsWith(prefix) ? channel.slice(prefix.length) : channel;
      const handlers = this.handlers.get(eventType) ?? [];

      try {
        const data = JSON.parse(message);
        await Promise.all(handlers.map(handler => handler(data)));
      } catch (error) {
        console.error(`Error processing event ${eventType}:`, error);
      }
    });

    // The constructor cannot await, so surface subscription failures here
    // instead of leaving an unhandled promise rejection.
    this.subscriber.psubscribe(`${prefix}*`).catch((error) => {
      console.error("Failed to subscribe to event channels:", error);
    });
  }

  /**
   * Registers a handler for an event type. Several handlers may be
   * registered for the same type; all of them are invoked for each event.
   *
   * @param eventType Event name as passed to `emit`.
   * @param handler Async callback receiving the parsed JSON payload.
   */
  on(eventType: string, handler: (data: any) => Promise<void>) {
    const registered = this.handlers.get(eventType);
    if (registered) {
      registered.push(handler);
    } else {
      this.handlers.set(eventType, [handler]);
    }
  }

  /**
   * Publishes an event to every subscribed bus instance.
   *
   * @param eventType Event name; published on channel `event:<eventType>`.
   * @param data JSON-serializable payload.
   */
  async emit(eventType: string, data: any): Promise<void> {
    const message = JSON.stringify(data);
    await this.publisher.publish(`${EventBus.channelPrefix}${eventType}`, message);
  }
}

375

376

// Usage

377

const eventBus = new EventBus();

378

379

// Register event handlers

380

eventBus.on("user_registered", async (userData) => {

381

console.log("Sending welcome email to:", userData.email);

382

// Send welcome email logic

383

});

384

385

eventBus.on("user_registered", async (userData) => {

386

console.log("Creating user profile for:", userData.id);

387

// Create user profile logic

388

});

389

390

eventBus.on("order_placed", async (orderData) => {

391

console.log("Processing order:", orderData.orderId);

392

// Order processing logic

393

});

394

395

// Emit events

396

await eventBus.emit("user_registered", {

397

id: 123,

398

email: "newuser@example.com",

399

name: "John Doe"

400

});

401

402

await eventBus.emit("order_placed", {

403

orderId: "order-456",

404

userId: 123,

405

amount: 99.99

406

});

407

```

408

409

### Keyspace Notifications

410

411

Subscribe to Redis keyspace notifications for database change events.

412

413

```typescript

414

// Keyspace notifications must be enabled on the server first, e.g.:
// CONFIG SET notify-keyspace-events "KEA"

const subscriber = new Redis();

// Attach the handler before subscribing so no early notification is missed
subscriber.on("pmessage", (pattern, channel, message) => {
  // Channels look like "__keyspace@0__:<key>" or "__keyevent@0__:<event>".
  // Cut at the first ":" only, because key names may themselves contain ":".
  const suffix = channel.slice(channel.indexOf(":") + 1);

  if (pattern.includes("keyspace")) {
    // Key-based notifications: __keyspace@0__:mykey -> set
    const key = suffix;
    const operation = message;
    console.log(`Key '${key}' had operation: ${operation}`);
  } else if (pattern.includes("keyevent")) {
    // Event-based notifications: __keyevent@0__:set -> mykey
    const operation = suffix;
    const key = message;
    console.log(`Operation '${operation}' on key: ${key}`);
  }
});

// Subscribe to keyspace events
await subscriber.psubscribe("__keyspace@0__:*"); // Database 0 keyspace events
await subscriber.psubscribe("__keyevent@0__:*"); // Database 0 keyevent events

// Test keyspace notifications (use a separate connection: the subscriber
// connection can only issue subscription commands)
const testRedis = new Redis();
await testRedis.set("test_key", "value"); // Triggers notifications
await testRedis.del("test_key"); // Triggers notifications
// EXPIRE only generates an event when the key exists, so create it first
await testRedis.set("another_key", "value"); // Triggers notifications
await testRedis.expire("another_key", 60); // Triggers notifications

442

```

443

444

## Types

445

446

```typescript { .api }

447

type MessageListener = (channel: string, message: string) => void;

448

type PatternMessageListener = (pattern: string, channel: string, message: string) => void;

449

type BufferMessageListener = (channel: Buffer, message: Buffer) => void;

450

type BufferPatternMessageListener = (pattern: Buffer, channel: Buffer, message: Buffer) => void;

451

type SubscriptionListener = (channel: string, count: number) => void;

452

type PatternSubscriptionListener = (pattern: string, count: number) => void;

453

```