A robust, performance-focused and full-featured Redis client for Node.js with TypeScript support, clustering, sentinel management, and comprehensive Redis command coverage.
—
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Pending
The risk profile of this skill
ioredis provides comprehensive publish/subscribe messaging capabilities including channel subscriptions, pattern subscriptions, and message handling. The client automatically handles connection lifecycle and resubscription during reconnection.
Subscribe to specific channels for real-time message delivery.
// Channel subscription
subscribe(...channels: string[]): Promise<number>;
unsubscribe(...channels: string[]): Promise<number>;
// Pattern subscription
psubscribe(...patterns: string[]): Promise<number>;
punsubscribe(...patterns: string[]): Promise<number>;
// Message publishing
publish(channel: string, message: string): Promise<number>;
Usage Examples:
import Redis from "ioredis";
// Create subscriber and publisher instances
const subscriber = new Redis();
const publisher = new Redis();
// Subscribe to channels
const channelCount = await subscriber.subscribe("news", "updates", "alerts");
console.log(`Subscribed to ${channelCount} channels`);
// Handle messages
// Per-channel dispatch table. Each entry defers to its handler through an
// arrow function so the handler is looked up only when a message arrives.
const handlersByChannel = new Map<string, (payload: string) => void>([
  ["news", (payload) => handleNewsMessage(payload)],
  ["updates", (payload) => handleUpdateMessage(payload)],
  ["alerts", (payload) => handleAlertMessage(payload)],
]);

// Log every delivery, then hand it to the handler registered for its channel.
// Messages on any other channel are logged and otherwise ignored.
subscriber.on("message", (channel, message) => {
  console.log(`Received message from ${channel}: ${message}`);
  const dispatch = handlersByChannel.get(channel);
  if (dispatch !== undefined) {
    dispatch(message);
  }
});
// Publish messages
await publisher.publish("news", "Breaking: New Redis version released!");
await publisher.publish("updates", JSON.stringify({ version: "7.0", status: "available" }));
// Unsubscribe from specific channels
await subscriber.unsubscribe("alerts");
Subscribe to channels using glob-style patterns for flexible message routing.
// Pattern subscription methods
psubscribe(...patterns: string[]): Promise<number>;
punsubscribe(...patterns: string[]): Promise<number>;
// Pattern message event
on(event: 'pmessage', listener: (pattern: string, channel: string, message: string) => void): this;
Usage Examples:
const subscriber = new Redis();
const publisher = new Redis();
// Subscribe to patterns
await subscriber.psubscribe("user:*", "admin:*", "system:*:errors");
// Handle pattern messages
// Route pattern deliveries by the pattern that matched, not by the concrete
// channel: `pattern` is the subscription string, `channel` is where the
// message was actually published.
subscriber.on("pmessage", (pattern, channel, message) => {
  console.log(`Pattern ${pattern} matched channel ${channel}: ${message}`);
  switch (pattern) {
    case "user:*": {
      // The second ":"-separated segment of the channel is the user id.
      const [, userId] = channel.split(":");
      handleUserMessage(userId, message);
      break;
    }
    case "admin:*":
      handleAdminMessage(channel, message);
      break;
    case "system:*:errors":
      handleSystemError(channel, message);
      break;
  }
});
// Publish to channels that match patterns
await publisher.publish("user:123", "Profile updated");
await publisher.publish("user:456", "Login detected");
await publisher.publish("admin:notifications", "System maintenance scheduled");
await publisher.publish("system:database:errors", "Connection timeout");
// Unsubscribe from patterns
await subscriber.punsubscribe("admin:*");
Handle different types of subscription events and messages.
// String message events
on(event: 'message', listener: (channel: string, message: string) => void): this;
on(event: 'pmessage', listener: (pattern: string, channel: string, message: string) => void): this;
// Buffer message events (for binary data)
on(event: 'messageBuffer', listener: (channel: Buffer, message: Buffer) => void): this;
on(event: 'pmessageBuffer', listener: (pattern: Buffer, channel: Buffer, message: Buffer) => void): this;
// Subscription events
on(event: 'subscribe', listener: (channel: string, count: number) => void): this;
on(event: 'psubscribe', listener: (pattern: string, count: number) => void): this;
on(event: 'unsubscribe', listener: (channel: string, count: number) => void): this;
on(event: 'punsubscribe', listener: (pattern: string, count: number) => void): this;
Usage Examples:
const subscriber = new Redis();
// Track subscription changes
subscriber.on("subscribe", (channel, count) => {
console.log(`Subscribed to ${channel}. Total subscriptions: ${count}`);
});
subscriber.on("unsubscribe", (channel, count) => {
console.log(`Unsubscribed from ${channel}. Remaining subscriptions: ${count}`);
});
subscriber.on("psubscribe", (pattern, count) => {
console.log(`Subscribed to pattern ${pattern}. Total patterns: ${count}`);
});
// Handle different message types
// Treat payloads that parse as JSON as structured messages and everything else
// as plain text.
subscriber.on("message", (channel, message) => {
  // Guard only the parse step: a payload that is not valid JSON is plain text.
  let data: unknown;
  try {
    data = JSON.parse(message);
  } catch {
    handleTextMessage(channel, message);
    return;
  }

  // The structured handler runs outside the parse guard so that a failure
  // inside it is reported as a handler failure instead of silently
  // re-dispatching a valid JSON payload to the text handler.
  try {
    handleStructuredMessage(channel, data);
  } catch (error: unknown) {
    console.error(`Failed to handle structured message from ${channel}:`, error);
  }
});
// Handle binary messages
// Binary variant of the message event: both the channel and the payload
// arrive as Buffers.
// NOTE(review): ioredis appears to emit this event for every delivery
// alongside "message", so a subscriber registering both listeners handles each
// message twice — confirm and register only the one you need.
subscriber.on("messageBuffer", (channel, message) => {
  // Decode the channel name once and reuse it for logging and dispatch.
  const channelName = channel.toString();
  console.log(`Binary message from ${channelName}: ${message.length} bytes`);
  handleBinaryMessage(channelName, message);
});
// Subscribe to channels
await subscriber.subscribe("text-channel", "json-channel", "binary-channel");
await subscriber.psubscribe("event:*");
Publishing messages to channels with subscriber count feedback.
// Publish message to channel
publish(channel: string, message: string): Promise<number>;
// Publish binary message
publish(channel: string, message: Buffer): Promise<number>;
Usage Examples:
const publisher = new Redis();
// Publish text messages
const subscriberCount = await publisher.publish("notifications", "Server maintenance in 5 minutes");
console.log(`Message delivered to ${subscriberCount} subscribers`);
// Publish JSON data
const eventData = {
type: "user_login",
userId: 123,
timestamp: Date.now(),
ip: "192.168.1.100"
};
await publisher.publish("events", JSON.stringify(eventData));
// Publish binary data
const binaryData = Buffer.from("Binary message content", "utf8");
await publisher.publish("binary-channel", binaryData);
// Conditional publishing based on subscriber count
const alertChannel = "critical-alerts";
const alertSubscribers = await publisher.publish(alertChannel, "System overload detected");
if (alertSubscribers === 0) {
console.warn("No subscribers for critical alerts!");
// Fallback notification mechanism
sendEmailAlert("System overload detected");
}
Manage subscriber connections and handle reconnection scenarios.
interface RedisOptions {
autoResubscribe?: boolean; // Auto-resubscribe on reconnect (default: true)
autoResendUnfulfilledCommands?: boolean; // Resend pending commands (default: true)
}
Usage Examples:
const subscriber = new Redis({
autoResubscribe: true, // Automatically resubscribe after reconnection
autoResendUnfulfilledCommands: true
});
// Track connection events
subscriber.on("connect", () => {
console.log("Subscriber connected");
});
subscriber.on("ready", () => {
console.log("Subscriber ready for commands");
});
subscriber.on("reconnecting", (ms) => {
console.log(`Subscriber reconnecting in ${ms}ms`);
});
subscriber.on("error", (err) => {
console.error("Subscriber error:", err);
});
// Subscribe after connection is ready
subscriber.on("ready", async () => {
await subscriber.subscribe("important-channel");
await subscriber.psubscribe("user:*:notifications");
});
Implement reliable message queuing using pub/sub with acknowledgments.
/**
 * Work queue built on Redis pub/sub with a per-message "pending" record.
 *
 * Every message is first stored under `pending:<id>` with a 5 minute TTL and
 * then broadcast on the `work-queue` channel. A consumer deletes the pending
 * key after `handleWork` succeeds; that deletion is the acknowledgment.
 *
 * NOTE: nothing in this class reads `pending:*` keys back. A message that is
 * published while no subscriber is listening, or whose processing fails, is
 * not retried here — its pending key simply expires. Add a sweeper over
 * `pending:*` if redelivery is required.
 *
 * Separate connections are used for subscribing, publishing and key
 * operations because a connection that has subscribed is restricted to
 * subscription commands.
 */
class ReliableMessageQueue {
  private readonly publisher: Redis;
  private readonly subscriber: Redis;
  private readonly processor: Redis;
  /** Ids of messages this instance is currently processing. */
  private readonly processingSet = new Set<string>();

  constructor() {
    this.publisher = new Redis();
    this.subscriber = new Redis();
    this.processor = new Redis();
    this.setupSubscriber();
  }

  /** Registers the message listener and subscribes to the work channel. */
  private setupSubscriber(): void {
    this.subscriber.on("message", async (channel, message) => {
      if (channel === "work-queue") {
        await this.processMessage(message);
      }
    });
    // The constructor cannot await, so report a failed subscription here
    // instead of leaving the rejection unhandled.
    this.subscriber.subscribe("work-queue").catch((error: unknown) => {
      console.error("Failed to subscribe to work-queue:", error);
    });
  }

  /**
   * Stores the payload as pending, then broadcasts it to consumers.
   *
   * @param data JSON-serializable work item.
   */
  async publishMessage(data: unknown): Promise<void> {
    const messageId = `msg:${Date.now()}:${Math.random()}`;
    const message = JSON.stringify({ id: messageId, data, timestamp: Date.now() });
    // Store message for reliability
    await this.processor.setex(`pending:${messageId}`, 300, message); // 5 min expiry
    // Publish to queue
    await this.publisher.publish("work-queue", message);
  }

  /**
   * Decodes a raw queue message.
   *
   * @returns The message id and payload, or `undefined` (after logging) when
   *   the message is not valid JSON or carries no string `id`.
   */
  private parseEnvelope(message: string): { id: string; data: unknown } | undefined {
    try {
      const parsed: unknown = JSON.parse(message);
      if (typeof parsed === "object" && parsed !== null) {
        const { id, data } = parsed as { id?: unknown; data?: unknown };
        if (typeof id === "string") {
          return { id, data };
        }
      }
      console.error("Message processing error:", new Error("Message has no string id"));
    } catch (error: unknown) {
      console.error("Message processing error:", error);
    }
    return undefined;
  }

  /**
   * Processes one queue message and acknowledges it by deleting its pending
   * key. Never rejects: every failure is logged.
   */
  private async processMessage(message: string): Promise<void> {
    const envelope = this.parseEnvelope(message);
    if (envelope === undefined) return;
    const { id, data } = envelope;

    // Prevent duplicate processing. This return sits outside the try/finally
    // below on purpose: only the call that added the id may remove it, so a
    // duplicate cannot clear the marker of the call that is still working.
    if (this.processingSet.has(id)) return;
    this.processingSet.add(id);

    try {
      await this.handleWork(data);
      // Acknowledge processing
      await this.processor.del(`pending:${id}`);
      console.log(`Processed message ${id}`);
    } catch (error: unknown) {
      console.error("Message processing error:", error);
    } finally {
      this.processingSet.delete(id);
    }
  }

  /** Placeholder for the actual work; replace with business logic. */
  private async handleWork(data: unknown): Promise<void> {
    await new Promise(resolve => setTimeout(resolve, 100)); // Simulate work
    console.log("Processed work item:", data);
  }
}
// Usage
const queue = new ReliableMessageQueue();
await queue.publishMessage({ task: "send_email", recipient: "user@example.com" });
Build event-driven systems using pub/sub for loose coupling between components.
/**
 * Event bus over Redis pub/sub.
 *
 * Events are published on `event:<eventType>` channels and delivered to every
 * handler registered for that event type through `on`.
 *
 * Two connections are required: once a connection has issued (P)SUBSCRIBE it
 * is in subscriber mode and rejects regular commands such as PUBLISH, so
 * publishing on the subscribing connection would make `emit` fail.
 */
class EventBus {
  private static readonly channelPrefix = "event:";

  private readonly subscriber: Redis;
  private readonly publisher: Redis;
  private readonly handlers = new Map<string, Array<(data: any) => Promise<void>>>();

  constructor() {
    this.subscriber = new Redis();
    this.publisher = new Redis();
    this.setupSubscriptions();
  }

  /** Subscribes to all event channels and fans messages out to handlers. */
  private setupSubscriptions(): void {
    this.subscriber.on("pmessage", async (pattern, channel, message) => {
      // Strip the prefix rather than splitting on ":" so that event types
      // which themselves contain ":" (e.g. "order:placed") stay intact.
      const eventType = channel.slice(EventBus.channelPrefix.length);
      const handlers = this.handlers.get(eventType) ?? [];
      if (handlers.length === 0) return;

      let data: unknown;
      try {
        data = JSON.parse(message);
      } catch (error: unknown) {
        console.error(`Error processing event ${eventType}:`, error);
        return;
      }

      // Run every handler to completion and report each failure on its own,
      // so one failing handler does not hide the others' errors. The async
      // wrapper also turns a synchronous throw into a rejection.
      const results = await Promise.allSettled(
        handlers.map(async (handler) => handler(data)),
      );
      for (const result of results) {
        if (result.status === "rejected") {
          console.error(`Error processing event ${eventType}:`, result.reason);
        }
      }
    });
    // The constructor cannot await, so report a failed subscription here
    // instead of leaving the rejection unhandled.
    this.subscriber.psubscribe(`${EventBus.channelPrefix}*`).catch((error: unknown) => {
      console.error("Failed to subscribe to event channels:", error);
    });
  }

  /**
   * Registers a handler for an event type. Several handlers may be registered
   * for the same type; all of them are invoked for each event.
   */
  on(eventType: string, handler: (data: any) => Promise<void>) {
    const registered = this.handlers.get(eventType);
    if (registered === undefined) {
      this.handlers.set(eventType, [handler]);
    } else {
      registered.push(handler);
    }
  }

  /**
   * Publishes an event to every bus instance subscribed on this Redis server.
   *
   * @param eventType Event name; becomes the channel suffix.
   * @param data JSON-serializable payload.
   */
  async emit(eventType: string, data: any): Promise<void> {
    const message = JSON.stringify(data);
    await this.publisher.publish(`${EventBus.channelPrefix}${eventType}`, message);
  }
}
// Usage
const eventBus = new EventBus();
// Register event handlers
eventBus.on("user_registered", async (userData) => {
console.log("Sending welcome email to:", userData.email);
// Send welcome email logic
});
eventBus.on("user_registered", async (userData) => {
console.log("Creating user profile for:", userData.id);
// Create user profile logic
});
eventBus.on("order_placed", async (orderData) => {
console.log("Processing order:", orderData.orderId);
// Order processing logic
});
// Emit events
await eventBus.emit("user_registered", {
id: 123,
email: "newuser@example.com",
name: "John Doe"
});
await eventBus.emit("order_placed", {
orderId: "order-456",
userId: 123,
amount: 99.99
});
Subscribe to Redis keyspace notifications for database change events.
// Enable keyspace notifications in Redis config
// CONFIG SET notify-keyspace-events "KEA"
const subscriber = new Redis();
// Subscribe to keyspace events
await subscriber.psubscribe("__keyspace@0__:*"); // Database 0 keyspace events
await subscriber.psubscribe("__keyevent@0__:*"); // Database 0 keyevent events
// Decode keyspace and keyevent notifications.
// Channels look like "__keyspace@<db>__:<key>" or "__keyevent@<db>__:<event>".
subscriber.on("pmessage", (pattern, channel, message) => {
  // Redis keys routinely contain ":" (e.g. "user:123"), so take everything
  // after the FIRST colon instead of the second ":"-separated segment, which
  // would truncate such keys.
  const suffix = channel.slice(channel.indexOf(":") + 1);

  if (pattern.includes("keyspace")) {
    // Key-based notifications: __keyspace@0__:mykey -> set
    const key = suffix;
    const operation = message;
    console.log(`Key '${key}' had operation: ${operation}`);
  } else if (pattern.includes("keyevent")) {
    // Event-based notifications: __keyevent@0__:set -> mykey
    const operation = suffix;
    const key = message;
    console.log(`Operation '${operation}' on key: ${key}`);
  }
});
// Test keyspace notifications
const testRedis = new Redis();
await testRedis.set("test_key", "value"); // Triggers notifications
await testRedis.del("test_key"); // Triggers notifications
await testRedis.expire("another_key", 60); // Triggers a notification only if "another_key" already exists
type MessageListener = (channel: string, message: string) => void;
type PatternMessageListener = (pattern: string, channel: string, message: string) => void;
type BufferMessageListener = (channel: Buffer, message: Buffer) => void;
type BufferPatternMessageListener = (pattern: Buffer, channel: Buffer, message: Buffer) => void;
type SubscriptionListener = (channel: string, count: number) => void;
type PatternSubscriptionListener = (pattern: string, count: number) => void;