CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/npm-ioredis

A robust, performance-focused and full-featured Redis client for Node.js with TypeScript support, clustering, sentinel management, and comprehensive Redis command coverage.

Pending
Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

SecuritybySnyk

Pending

The risk profile of this skill

Overview
Eval results
Files

pubsub.mddocs/

Pub/Sub Messaging

ioredis provides comprehensive publish/subscribe messaging capabilities including channel subscriptions, pattern subscriptions, and message handling. The client automatically handles connection lifecycle and resubscription during reconnection.

Capabilities

Channel Subscription

Subscribe to specific channels for real-time message delivery.

// Channel subscription
subscribe(...channels: string[]): Promise<number>;
unsubscribe(...channels: string[]): Promise<number>;

// Pattern subscription  
psubscribe(...patterns: string[]): Promise<number>;
punsubscribe(...patterns: string[]): Promise<number>;

// Message publishing
publish(channel: string, message: string): Promise<number>;

Usage Examples:

import Redis from "ioredis";

// Create subscriber and publisher instances
const subscriber = new Redis();
const publisher = new Redis();

// Subscribe to channels
const channelCount = await subscriber.subscribe("news", "updates", "alerts");
console.log(`Subscribed to ${channelCount} channels`);

// Handle messages
subscriber.on("message", (channel, message) => {
  console.log(`Received message from ${channel}: ${message}`);
  
  switch (channel) {
    case "news":
      handleNewsMessage(message);
      break;
    case "updates":
      handleUpdateMessage(message);
      break;
    case "alerts":
      handleAlertMessage(message);
      break;
  }
});

// Publish messages
await publisher.publish("news", "Breaking: New Redis version released!");
await publisher.publish("updates", JSON.stringify({ version: "7.0", status: "available" }));

// Unsubscribe from specific channels
await subscriber.unsubscribe("alerts");

Pattern Subscription

Subscribe to channels using glob-style patterns for flexible message routing.

// Pattern subscription methods
psubscribe(...patterns: string[]): Promise<number>;
punsubscribe(...patterns: string[]): Promise<number>;

// Pattern message event
on(event: 'pmessage', listener: (pattern: string, channel: string, message: string) => void): this;

Usage Examples:

const subscriber = new Redis();
const publisher = new Redis();

// Subscribe to patterns
await subscriber.psubscribe("user:*", "admin:*", "system:*:errors");

// Handle pattern messages
subscriber.on("pmessage", (pattern, channel, message) => {
  console.log(`Pattern ${pattern} matched channel ${channel}: ${message}`);
  
  if (pattern === "user:*") {
    const userId = channel.split(":")[1];
    handleUserMessage(userId, message);
  } else if (pattern === "admin:*") {
    handleAdminMessage(channel, message);
  } else if (pattern === "system:*:errors") {
    handleSystemError(channel, message);
  }
});

// Publish to channels that match patterns
await publisher.publish("user:123", "Profile updated");
await publisher.publish("user:456", "Login detected");
await publisher.publish("admin:notifications", "System maintenance scheduled");
await publisher.publish("system:database:errors", "Connection timeout");

// Unsubscribe from patterns
await subscriber.punsubscribe("admin:*");

Message Event Handling

Handle different types of subscription events and messages.

// String message events
on(event: 'message', listener: (channel: string, message: string) => void): this;
on(event: 'pmessage', listener: (pattern: string, channel: string, message: string) => void): this;

// Buffer message events (for binary data)
on(event: 'messageBuffer', listener: (channel: Buffer, message: Buffer) => void): this;
on(event: 'pmessageBuffer', listener: (pattern: Buffer, channel: Buffer, message: Buffer) => void): this;

// Subscription events
on(event: 'subscribe', listener: (channel: string, count: number) => void): this;
on(event: 'psubscribe', listener: (pattern: string, count: number) => void): this;
on(event: 'unsubscribe', listener: (channel: string, count: number) => void): this;
on(event: 'punsubscribe', listener: (pattern: string, count: number) => void): this;

Usage Examples:

const subscriber = new Redis();

// Track subscription changes
subscriber.on("subscribe", (channel, count) => {
  console.log(`Subscribed to ${channel}. Total subscriptions: ${count}`);
});

subscriber.on("unsubscribe", (channel, count) => {
  console.log(`Unsubscribed from ${channel}. Remaining subscriptions: ${count}`);
});

subscriber.on("psubscribe", (pattern, count) => {
  console.log(`Subscribed to pattern ${pattern}. Total patterns: ${count}`);
});

// Handle different message types
subscriber.on("message", (channel, message) => {
  try {
    const data = JSON.parse(message);
    handleStructuredMessage(channel, data);
  } catch {
    handleTextMessage(channel, message);
  }
});

// Handle binary messages
subscriber.on("messageBuffer", (channel, message) => {
  console.log(`Binary message from ${channel.toString()}: ${message.length} bytes`);
  handleBinaryMessage(channel.toString(), message);
});

// Subscribe to channels
await subscriber.subscribe("text-channel", "json-channel", "binary-channel");
await subscriber.psubscribe("event:*");

Publisher Methods

Publishing messages to channels with subscriber count feedback.

// Publish message to channel
publish(channel: string, message: string): Promise<number>;

// Publish binary message
publish(channel: string, message: Buffer): Promise<number>;

Usage Examples:

const publisher = new Redis();

// Publish text messages
const subscriberCount = await publisher.publish("notifications", "Server maintenance in 5 minutes");
console.log(`Message delivered to ${subscriberCount} subscribers`);

// Publish JSON data
const eventData = {
  type: "user_login",
  userId: 123,
  timestamp: Date.now(),
  ip: "192.168.1.100"
};
await publisher.publish("events", JSON.stringify(eventData));

// Publish binary data
const binaryData = Buffer.from("Binary message content", "utf8");
await publisher.publish("binary-channel", binaryData);

// Conditional publishing based on subscriber count
const alertChannel = "critical-alerts";
const alertSubscribers = await publisher.publish(alertChannel, "System overload detected");
if (alertSubscribers === 0) {
  console.warn("No subscribers for critical alerts!");
  // Fallback notification mechanism
  sendEmailAlert("System overload detected");
}

Advanced Features

Subscriber Connection Management

Manage subscriber connections and handle reconnection scenarios.

interface RedisOptions {
  autoResubscribe?: boolean;        // Auto-resubscribe on reconnect (default: true)
  autoResendUnfulfilledCommands?: boolean; // Resend pending commands (default: true)
}

Usage Examples:

const subscriber = new Redis({
  autoResubscribe: true,  // Automatically resubscribe after reconnection
  autoResendUnfulfilledCommands: true
});

// Track connection events
subscriber.on("connect", () => {
  console.log("Subscriber connected");
});

subscriber.on("ready", () => {
  console.log("Subscriber ready for commands");
});

subscriber.on("reconnecting", (ms) => {
  console.log(`Subscriber reconnecting in ${ms}ms`);
});

subscriber.on("error", (err) => {
  console.error("Subscriber error:", err);
});

// Subscribe after connection is ready
subscriber.on("ready", async () => {
  await subscriber.subscribe("important-channel");
  await subscriber.psubscribe("user:*:notifications");
});

Message Queue Pattern

Implement reliable message queuing using pub/sub with acknowledgments.

class ReliableMessageQueue {
  private publisher: Redis;
  private subscriber: Redis;
  private processor: Redis;
  private processingSet = new Set<string>();

  constructor() {
    this.publisher = new Redis();
    this.subscriber = new Redis();
    this.processor = new Redis();
    
    this.setupSubscriber();
  }

  private setupSubscriber() {
    this.subscriber.on("message", async (channel, message) => {
      if (channel === "work-queue") {
        await this.processMessage(message);
      }
    });
    
    this.subscriber.subscribe("work-queue");
  }

  async publishMessage(data: any): Promise<void> {
    const messageId = `msg:${Date.now()}:${Math.random()}`;
    const message = JSON.stringify({ id: messageId, data, timestamp: Date.now() });
    
    // Store message for reliability
    await this.processor.setex(`pending:${messageId}`, 300, message); // 5 min expiry
    
    // Publish to queue
    await this.publisher.publish("work-queue", message);
  }

  private async processMessage(message: string): Promise<void> {
    try {
      const { id, data } = JSON.parse(message);
      
      // Prevent duplicate processing
      if (this.processingSet.has(id)) return;
      this.processingSet.add(id);
      
      // Process the message
      await this.handleWork(data);
      
      // Acknowledge processing
      await this.processor.del(`pending:${id}`);
      
      console.log(`Processed message ${id}`);
    } catch (error) {
      console.error("Message processing error:", error);
    } finally {
      // Clean up processing set
      if (message) {
        const { id } = JSON.parse(message);
        this.processingSet.delete(id);
      }
    }
  }

  private async handleWork(data: any): Promise<void> {
    // Implement your business logic here
    await new Promise(resolve => setTimeout(resolve, 100)); // Simulate work
    console.log("Processed work item:", data);
  }
}

// Usage
const queue = new ReliableMessageQueue();
await queue.publishMessage({ task: "send_email", recipient: "user@example.com" });

Event-Driven Architecture

Build event-driven systems using pub/sub for loose coupling between components.

class EventBus {
  private redis: Redis;
  private handlers = new Map<string, Array<(data: any) => Promise<void>>>();

  constructor() {
    this.redis = new Redis();
    this.setupSubscriptions();
  }

  private setupSubscriptions() {
    this.redis.on("pmessage", async (pattern, channel, message) => {
      const eventType = channel.split(":")[1];
      const handlers = this.handlers.get(eventType) || [];
      
      try {
        const data = JSON.parse(message);
        await Promise.all(handlers.map(handler => handler(data)));
      } catch (error) {
        console.error(`Error processing event ${eventType}:`, error);
      }
    });
    
    this.redis.psubscribe("event:*");
  }

  on(eventType: string, handler: (data: any) => Promise<void>) {
    if (!this.handlers.has(eventType)) {
      this.handlers.set(eventType, []);
    }
    this.handlers.get(eventType)!.push(handler);
  }

  async emit(eventType: string, data: any): Promise<void> {
    const message = JSON.stringify(data);
    await this.redis.publish(`event:${eventType}`, message);
  }
}

// Usage
const eventBus = new EventBus();

// Register event handlers
eventBus.on("user_registered", async (userData) => {
  console.log("Sending welcome email to:", userData.email); 
  // Send welcome email logic
});

eventBus.on("user_registered", async (userData) => {
  console.log("Creating user profile for:", userData.id);
  // Create user profile logic
});

eventBus.on("order_placed", async (orderData) => {
  console.log("Processing order:", orderData.orderId);
  // Order processing logic
});

// Emit events
await eventBus.emit("user_registered", { 
  id: 123, 
  email: "newuser@example.com", 
  name: "John Doe" 
});

await eventBus.emit("order_placed", { 
  orderId: "order-456", 
  userId: 123, 
  amount: 99.99 
});

Keyspace Notifications

Subscribe to Redis keyspace notifications for database change events.

// Enable keyspace notifications in Redis config
// CONFIG SET notify-keyspace-events "KEA"

const subscriber = new Redis();

// Subscribe to keyspace events
await subscriber.psubscribe("__keyspace@0__:*"); // Database 0 keyspace events
await subscriber.psubscribe("__keyevent@0__:*"); // Database 0 keyevent events

subscriber.on("pmessage", (pattern, channel, message) => {
  if (pattern.includes("keyspace")) {
    // Key-based notifications: __keyspace@0__:mykey -> set
    const key = channel.split(":")[1];
    const operation = message;
    console.log(`Key '${key}' had operation: ${operation}`);
  } else if (pattern.includes("keyevent")) {
    // Event-based notifications: __keyevent@0__:set -> mykey  
    const operation = channel.split(":")[1];
    const key = message;
    console.log(`Operation '${operation}' on key: ${key}`);
  }
});

// Test keyspace notifications
const testRedis = new Redis();
await testRedis.set("test_key", "value");    // Triggers notifications
await testRedis.del("test_key");             // Triggers notifications
await testRedis.expire("another_key", 60);   // Triggers notifications

Types

type MessageListener = (channel: string, message: string) => void;
type PatternMessageListener = (pattern: string, channel: string, message: string) => void;
type BufferMessageListener = (channel: Buffer, message: Buffer) => void;
type BufferPatternMessageListener = (pattern: Buffer, channel: Buffer, message: Buffer) => void;
type SubscriptionListener = (channel: string, count: number) => void;
type PatternSubscriptionListener = (pattern: string, count: number) => void;

docs

cluster.md

commands.md

configuration.md

index.md

pipelining.md

pubsub.md

redis-client.md

streaming.md

tile.json