0
# Pub/Sub Messaging
1
2
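ioredis provides comprehensive publish/subscribe messaging capabilities including channel subscriptions, pattern subscriptions, and message handling. The client automatically handles connection lifecycle and resubscription during reconnection. Once a connection subscribes to a channel or pattern it enters subscriber mode and can only issue subscription-related commands, so use a separate connection for publishing and for regular commands.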
3
4
## Capabilities
5
6
### Channel Subscription
7
8
Subscribe to specific channels for real-time message delivery.
9
10
```typescript { .api }
11
// Channel subscription
12
subscribe(...channels: string[]): Promise<number>;
13
unsubscribe(...channels: string[]): Promise<number>;
14
15
// Pattern subscription
16
psubscribe(...patterns: string[]): Promise<number>;
17
punsubscribe(...patterns: string[]): Promise<number>;
18
19
// Message publishing
20
publish(channel: string, message: string): Promise<number>;
21
```
22
23
**Usage Examples:**
24
25
```typescript
26
import Redis from "ioredis";
27
28
// Create subscriber and publisher instances
29
const subscriber = new Redis();
30
const publisher = new Redis();
31
32
// Subscribe to channels
33
const channelCount = await subscriber.subscribe("news", "updates", "alerts");
34
console.log(`Subscribed to ${channelCount} channels`);
35
36
// Handle messages
37
subscriber.on("message", (channel, message) => {
38
console.log(`Received message from ${channel}: ${message}`);
39
40
switch (channel) {
41
case "news":
42
handleNewsMessage(message);
43
break;
44
case "updates":
45
handleUpdateMessage(message);
46
break;
47
case "alerts":
48
handleAlertMessage(message);
49
break;
50
}
51
});
52
53
// Publish messages
54
await publisher.publish("news", "Breaking: New Redis version released!");
55
await publisher.publish("updates", JSON.stringify({ version: "7.0", status: "available" }));
56
57
// Unsubscribe from specific channels
58
await subscriber.unsubscribe("alerts");
59
```
60
61
### Pattern Subscription
62
63
Subscribe to channels using glob-style patterns for flexible message routing.
64
65
```typescript { .api }
66
// Pattern subscription methods
67
psubscribe(...patterns: string[]): Promise<number>;
68
punsubscribe(...patterns: string[]): Promise<number>;
69
70
// Pattern message event
71
on(event: 'pmessage', listener: (pattern: string, channel: string, message: string) => void): this;
72
```
73
74
**Usage Examples:**
75
76
```typescript
77
const subscriber = new Redis();
78
const publisher = new Redis();
79
80
// Subscribe to patterns
81
await subscriber.psubscribe("user:*", "admin:*", "system:*:errors");
82
83
// Handle pattern messages
84
subscriber.on("pmessage", (pattern, channel, message) => {
85
console.log(`Pattern ${pattern} matched channel ${channel}: ${message}`);
86
87
if (pattern === "user:*") {
88
const userId = channel.split(":")[1];
89
handleUserMessage(userId, message);
90
} else if (pattern === "admin:*") {
91
handleAdminMessage(channel, message);
92
} else if (pattern === "system:*:errors") {
93
handleSystemError(channel, message);
94
}
95
});
96
97
// Publish to channels that match patterns
98
await publisher.publish("user:123", "Profile updated");
99
await publisher.publish("user:456", "Login detected");
100
await publisher.publish("admin:notifications", "System maintenance scheduled");
101
await publisher.publish("system:database:errors", "Connection timeout");
102
103
// Unsubscribe from patterns
104
await subscriber.punsubscribe("admin:*");
105
```
106
107
### Message Event Handling
108
109
Handle different types of subscription events and messages.
110
111
```typescript { .api }
112
// String message events
113
on(event: 'message', listener: (channel: string, message: string) => void): this;
114
on(event: 'pmessage', listener: (pattern: string, channel: string, message: string) => void): this;
115
116
// Buffer message events (for binary data)
117
on(event: 'messageBuffer', listener: (channel: Buffer, message: Buffer) => void): this;
118
on(event: 'pmessageBuffer', listener: (pattern: Buffer, channel: Buffer, message: Buffer) => void): this;
119
120
// Subscription events
121
on(event: 'subscribe', listener: (channel: string, count: number) => void): this;
122
on(event: 'psubscribe', listener: (pattern: string, count: number) => void): this;
123
on(event: 'unsubscribe', listener: (channel: string, count: number) => void): this;
124
on(event: 'punsubscribe', listener: (pattern: string, count: number) => void): this;
125
```
126
127
**Usage Examples:**
128
129
```typescript
130
const subscriber = new Redis();
131
132
// Track subscription changes
133
subscriber.on("subscribe", (channel, count) => {
134
console.log(`Subscribed to ${channel}. Total subscriptions: ${count}`);
135
});
136
137
subscriber.on("unsubscribe", (channel, count) => {
138
console.log(`Unsubscribed from ${channel}. Remaining subscriptions: ${count}`);
139
});
140
141
subscriber.on("psubscribe", (pattern, count) => {
142
console.log(`Subscribed to pattern ${pattern}. Total patterns: ${count}`);
143
});
144
145
// Handle different message types
146
subscriber.on("message", (channel, message) => {
147
try {
148
const data = JSON.parse(message);
149
handleStructuredMessage(channel, data);
150
} catch {
151
handleTextMessage(channel, message);
152
}
153
});
154
155
// Handle binary messages
156
subscriber.on("messageBuffer", (channel, message) => {
157
console.log(`Binary message from ${channel.toString()}: ${message.length} bytes`);
158
handleBinaryMessage(channel.toString(), message);
159
});
160
161
// Subscribe to channels
162
await subscriber.subscribe("text-channel", "json-channel", "binary-channel");
163
await subscriber.psubscribe("event:*");
164
```
165
166
### Publisher Methods
167
168
Publish messages to channels. `publish` resolves with the number of subscribers that received the message.
169
170
```typescript { .api }
171
// Publish message to channel
172
publish(channel: string, message: string): Promise<number>;
173
174
// Publish binary message
175
publish(channel: string, message: Buffer): Promise<number>;
176
```
177
178
**Usage Examples:**
179
180
```typescript
181
const publisher = new Redis();
182
183
// Publish text messages
184
const subscriberCount = await publisher.publish("notifications", "Server maintenance in 5 minutes");
185
console.log(`Message delivered to ${subscriberCount} subscribers`);
186
187
// Publish JSON data
188
const eventData = {
189
type: "user_login",
190
userId: 123,
191
timestamp: Date.now(),
192
ip: "192.168.1.100"
193
};
194
await publisher.publish("events", JSON.stringify(eventData));
195
196
// Publish binary data
197
const binaryData = Buffer.from("Binary message content", "utf8");
198
await publisher.publish("binary-channel", binaryData);
199
200
// Conditional publishing based on subscriber count
201
const alertChannel = "critical-alerts";
202
const alertSubscribers = await publisher.publish(alertChannel, "System overload detected");
203
if (alertSubscribers === 0) {
204
console.warn("No subscribers for critical alerts!");
205
// Fallback notification mechanism
206
sendEmailAlert("System overload detected");
207
}
208
```
209
210
## Advanced Features
211
212
### Subscriber Connection Management
213
214
Manage subscriber connections and handle reconnection scenarios.
215
216
```typescript { .api }
217
interface RedisOptions {
218
autoResubscribe?: boolean; // Auto-resubscribe on reconnect (default: true)
219
autoResendUnfulfilledCommands?: boolean; // Resend pending commands (default: true)
220
}
221
```
222
223
**Usage Examples:**
224
225
```typescript
226
const subscriber = new Redis({
  autoResubscribe: true, // Automatically resubscribe after reconnection
  autoResendUnfulfilledCommands: true // Resend commands that were unfulfilled when the connection dropped
});

// Track connection events
subscriber.on("connect", () => {
  console.log("Subscriber connected");
});

subscriber.on("ready", () => {
  console.log("Subscriber ready for commands");
});

subscriber.on("reconnecting", (ms) => {
  console.log(`Subscriber reconnecting in ${ms}ms`);
});

subscriber.on("error", (err) => {
  console.error("Subscriber error:", err);
});

// Subscribe after the connection is ready.
// `once` is used instead of `on` because "ready" fires again after every
// reconnect, and autoResubscribe already restores the subscriptions then.
subscriber.once("ready", async () => {
  try {
    await subscriber.subscribe("important-channel");
    await subscriber.psubscribe("user:*:notifications");
  } catch (err) {
    // The emitter does not handle a rejected async listener, so report it here
    console.error("Failed to subscribe:", err);
  }
});
253
```
254
255
### Message Queue Pattern
256
257
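Implement message queuing on top of pub/sub with acknowledgments. Redis pub/sub is fire-and-forget (a message published while no subscriber is connected is lost), so this pattern also stores each message under a `pending:` key until it has been processed.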
258
259
```typescript
260
/**
 * Work queue built on Redis pub/sub with a per-message acknowledgment key.
 *
 * Every published message is first stored under `pending:<id>` (5 minute
 * expiry) and then broadcast on the `work-queue` channel. When a message has
 * been handled successfully its `pending:<id>` key is deleted, which acts as
 * the acknowledgment.
 *
 * Three connections are used because a subscribed connection cannot issue
 * regular commands: one to publish, one to subscribe, and one for the
 * key-value bookkeeping.
 */
class ReliableMessageQueue {
  private readonly publisher: Redis;
  private readonly subscriber: Redis;
  private readonly processor: Redis;
  /** Ids of messages this instance is currently working on. */
  private readonly processingSet = new Set<string>();

  constructor() {
    this.publisher = new Redis();
    this.subscriber = new Redis();
    this.processor = new Redis();

    this.setupSubscriber();
  }

  /** Attaches the message listener and subscribes to the work channel. */
  private setupSubscriber() {
    this.subscriber.on("message", async (channel, message) => {
      if (channel === "work-queue") {
        // processMessage never rejects, so this async listener cannot
        // produce an unhandled rejection.
        await this.processMessage(message);
      }
    });

    this.subscriber.subscribe("work-queue").catch((error) => {
      console.error("Failed to subscribe to work-queue:", error);
    });
  }

  /**
   * Stores the payload under a pending key and publishes it to the queue.
   *
   * @param data - JSON-serializable payload for the worker.
   */
  async publishMessage(data: unknown): Promise<void> {
    const messageId = `msg:${Date.now()}:${Math.random()}`;
    const message = JSON.stringify({ id: messageId, data, timestamp: Date.now() });

    // Store message for reliability
    await this.processor.setex(`pending:${messageId}`, 300, message); // 5 min expiry

    // Publish to queue
    await this.publisher.publish("work-queue", message);
  }

  /**
   * Handles one delivered message: parses it, skips it if the same id is
   * already being processed, runs the work, then acknowledges by deleting
   * the pending key. Errors are logged, never thrown.
   */
  private async processMessage(message: string): Promise<void> {
    const envelope = this.parseEnvelope(message);
    if (envelope === undefined) return;

    const { id, data } = envelope;

    // Prevent duplicate processing. This check sits outside the try/finally
    // below so that a skipped duplicate does not release the guard held by
    // the call that is still in flight.
    if (this.processingSet.has(id)) return;
    this.processingSet.add(id);

    try {
      // Process the message
      await this.handleWork(data);

      // Acknowledge processing
      await this.processor.del(`pending:${id}`);

      console.log(`Processed message ${id}`);
    } catch (error) {
      console.error("Message processing error:", error);
    } finally {
      // Only the call that added the id removes it
      this.processingSet.delete(id);
    }
  }

  /**
   * Parses a raw message into its id and payload.
   *
   * @returns The envelope, or `undefined` (after logging) when the message
   *   is not valid JSON or has no string `id`.
   */
  private parseEnvelope(message: string): { id: string; data: unknown } | undefined {
    try {
      const parsed: unknown = JSON.parse(message);
      if (
        typeof parsed === "object" &&
        parsed !== null &&
        typeof (parsed as { id?: unknown }).id === "string"
      ) {
        const { id, data } = parsed as { id: string; data?: unknown };
        return { id, data };
      }
      console.error("Message processing error:", new Error("Message has no string id"));
    } catch (error) {
      console.error("Message processing error:", error);
    }
    return undefined;
  }

  /** Placeholder for the actual work performed per message. */
  private async handleWork(data: unknown): Promise<void> {
    // Implement your business logic here
    await new Promise(resolve => setTimeout(resolve, 100)); // Simulate work
    console.log("Processed work item:", data);
  }
}
327
328
// Usage
329
const queue = new ReliableMessageQueue();
330
await queue.publishMessage({ task: "send_email", recipient: "user@example.com" });
331
```
332
333
### Event-Driven Architecture
334
335
Build event-driven systems using pub/sub for loose coupling between components.
336
337
```typescript
338
/**
 * Event bus that distributes events over Redis pub/sub.
 *
 * Events are published on the channel `event:<eventType>` as JSON and
 * received through a pattern subscription on `event:*`.
 *
 * Two connections are required: once a connection has subscribed it is in
 * subscriber mode and Redis rejects regular commands such as PUBLISH on it.
 */
class EventBus {
  private static readonly CHANNEL_PREFIX = "event:";

  private readonly publisher: Redis;
  private readonly subscriber: Redis;
  private readonly handlers = new Map<string, Array<(data: any) => Promise<void>>>();

  constructor() {
    this.publisher = new Redis();
    this.subscriber = new Redis();
    this.setupSubscriptions();
  }

  /** Attaches the pattern-message listener and subscribes to all event channels. */
  private setupSubscriptions() {
    this.subscriber.on("pmessage", async (_pattern, channel, message) => {
      // Strip only the prefix so event types containing ":" stay intact
      const eventType = channel.slice(EventBus.CHANNEL_PREFIX.length);
      const handlers = this.handlers.get(eventType) ?? [];

      let data: unknown;
      try {
        data = JSON.parse(message);
      } catch (error) {
        console.error(`Error processing event ${eventType}:`, error);
        return;
      }

      // Run every handler and report each failure individually; one failing
      // handler must not hide the errors of the others.
      const results = await Promise.allSettled(
        handlers.map(async (handler) => handler(data))
      );
      for (const result of results) {
        if (result.status === "rejected") {
          console.error(`Error processing event ${eventType}:`, result.reason);
        }
      }
    });

    this.subscriber.psubscribe(`${EventBus.CHANNEL_PREFIX}*`).catch((error) => {
      console.error("Failed to subscribe to events:", error);
    });
  }

  /**
   * Registers a handler for an event type. Several handlers may be
   * registered for the same type; all of them are invoked per event.
   *
   * @param eventType - Event name, i.e. the channel name without `event:`.
   * @param handler - Async callback receiving the parsed JSON payload.
   */
  on(eventType: string, handler: (data: any) => Promise<void>) {
    const existing = this.handlers.get(eventType);
    if (existing) {
      existing.push(handler);
    } else {
      this.handlers.set(eventType, [handler]);
    }
  }

  /**
   * Publishes an event to all subscribed buses.
   *
   * @param eventType - Event name; published on `event:<eventType>`.
   * @param data - JSON-serializable payload.
   */
  async emit(eventType: string, data: any): Promise<void> {
    const message = JSON.stringify(data);
    await this.publisher.publish(`${EventBus.CHANNEL_PREFIX}${eventType}`, message);
  }
}
375
376
// Usage
377
const eventBus = new EventBus();
378
379
// Register event handlers
380
eventBus.on("user_registered", async (userData) => {
381
console.log("Sending welcome email to:", userData.email);
382
// Send welcome email logic
383
});
384
385
eventBus.on("user_registered", async (userData) => {
386
console.log("Creating user profile for:", userData.id);
387
// Create user profile logic
388
});
389
390
eventBus.on("order_placed", async (orderData) => {
391
console.log("Processing order:", orderData.orderId);
392
// Order processing logic
393
});
394
395
// Emit events
396
await eventBus.emit("user_registered", {
397
id: 123,
398
email: "newuser@example.com",
399
name: "John Doe"
400
});
401
402
await eventBus.emit("order_placed", {
403
orderId: "order-456",
404
userId: 123,
405
amount: 99.99
406
});
407
```
408
409
### Keyspace Notifications
410
411
Subscribe to Redis keyspace notifications for database change events.
412
413
```typescript
414
// Keyspace notifications are disabled by default. Enable them in the Redis config first:
// CONFIG SET notify-keyspace-events "KEA"
// (K = keyspace channels, E = keyevent channels, A = alias covering most event classes)

const subscriber = new Redis();

// Register the listener before subscribing so no notification is missed
subscriber.on("pmessage", (pattern, channel, message) => {
  // Channels look like "__keyspace@0__:<key>" or "__keyevent@0__:<operation>".
  // Cut at the first ":" only: the key itself may contain colons (e.g. "user:123").
  const suffix = channel.slice(channel.indexOf(":") + 1);

  if (pattern.includes("keyspace")) {
    // Key-based notifications: __keyspace@0__:mykey -> set
    const key = suffix;
    const operation = message;
    console.log(`Key '${key}' had operation: ${operation}`);
  } else if (pattern.includes("keyevent")) {
    // Event-based notifications: __keyevent@0__:set -> mykey
    const operation = suffix;
    const key = message;
    console.log(`Operation '${operation}' on key: ${key}`);
  }
});

// Subscribe to keyspace events
await subscriber.psubscribe("__keyspace@0__:*"); // Database 0 keyspace events
await subscriber.psubscribe("__keyevent@0__:*"); // Database 0 keyevent events

// Test keyspace notifications (a separate connection, because the subscriber
// connection can no longer issue regular commands)
const testRedis = new Redis();
await testRedis.set("test_key", "value"); // Triggers a "set" notification
await testRedis.expire("test_key", 60); // Triggers "expire"; the key must exist, otherwise nothing is emitted
await testRedis.del("test_key"); // Triggers a "del" notification
442
```
443
444
## Types
445
446
```typescript { .api }
447
type MessageListener = (channel: string, message: string) => void;
448
type PatternMessageListener = (pattern: string, channel: string, message: string) => void;
449
type BufferMessageListener = (channel: Buffer, message: Buffer) => void;
450
type BufferPatternMessageListener = (pattern: Buffer, channel: Buffer, message: Buffer) => void;
451
type SubscriptionListener = (channel: string, count: number) => void;
452
type PatternSubscriptionListener = (pattern: string, count: number) => void;
453
```