CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/npm-ioredis

A robust, performance-focused and full-featured Redis client for Node.js with TypeScript support, clustering, sentinel management, and comprehensive Redis command coverage.

Pending
Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Security by Snyk

Pending

The risk profile of this skill

Overview
Eval results
Files

docs/streaming.md

Streaming

ioredis provides readable streams for efficiently scanning large datasets using Redis SCAN operations. These streams allow memory-efficient iteration over keys, set members, hash fields, and sorted set members without loading entire datasets into memory.

Capabilities

Key Scanning Streams

Scan Redis keys using patterns with memory-efficient streaming interface.

// Key scanning streams
scanStream(options?: ScanStreamOptions): ScanStream;
scanBufferStream(options?: ScanStreamOptions): ScanStream;

interface ScanStreamOptions {
  match?: string;      // Pattern to match keys (default: "*")
  count?: number;      // Hint for batch size (default: 10)
  type?: string;       // Filter by key type
}

class ScanStream extends Readable {
  close(): void;       // Stop scanning and end stream
}

Usage Examples:

import Redis from "ioredis";

const redis = new Redis();

// Basic key scanning
// Each "data" event delivers one SCAN batch as an array of keys — not a
// single key (see the `keys: string[]` handler signature).
const stream = redis.scanStream();
stream.on("data", (keys: string[]) => {
  console.log("Found keys:", keys);
});

// "end" fires once the scan has iterated the whole keyspace.
stream.on("end", () => {
  console.log("Scan completed");
});

// Pattern-based scanning
const userStream = redis.scanStream({
  match: "user:*",
  count: 100    // Process ~100 keys per batch
});

userStream.on("data", (keys: string[]) => {
  keys.forEach(key => {
    console.log("User key:", key);
  });
});

// Type-based scanning (Redis 6.0+)
const hashStream = redis.scanStream({
  match: "*",
  type: "hash",
  count: 50
});

hashStream.on("data", (hashKeys: string[]) => {
  console.log("Hash keys:", hashKeys);
});

// Manual stream control
// close() stops issuing further SCAN commands and ends the stream early.
const controlledStream = redis.scanStream({ match: "temp:*" });
controlledStream.on("data", (keys: string[]) => {
  if (keys.length > 1000) {
    console.log("Too many keys, stopping scan");
    controlledStream.close();
  }
});

Set Scanning Streams

Scan set members efficiently using SSCAN-based streams.

// Set member scanning
sscanStream(key: string, options?: ScanStreamOptions): ScanStream;
sscanBufferStream(key: string, options?: ScanStreamOptions): ScanStream;

Usage Examples:

// Scan large set members
const setStream = redis.sscanStream("large_set", {
  match: "*pattern*",
  count: 200
});

setStream.on("data", (members: string[]) => {
  console.log(`Processing ${members.length} set members:`, members);
  
  // Process members in batches
  // NOTE(review): processSetMember is async but its promise is not awaited
  // here (fire-and-forget) — rejections inside it will NOT reach the
  // stream's "error" handler below.
  members.forEach(member => {
    processSetMember(member);
  });
});

setStream.on("end", () => {
  console.log("Set scan completed");
});

// Error handling
// Catches failures of the underlying SSCAN commands only.
setStream.on("error", (err) => {
  console.error("Set scan error:", err);
});

// Example per-member handler: only members carrying the "valid_" prefix are
// recorded in the processed set; everything else is skipped.
async function processSetMember(member: string) {
  const isValid = member.startsWith("valid_");
  if (!isValid) {
    return;
  }
  await redis.sadd("processed_set", member);
}

Hash Scanning Streams

Scan hash fields and values using HSCAN-based streams.

// Hash field scanning
hscanStream(key: string, options?: ScanStreamOptions): ScanStream;
hscanBufferStream(key: string, options?: ScanStreamOptions): ScanStream;

Usage Examples:

// Scan hash fields and values
// For HSCAN the MATCH pattern applies to field names; each "data" event is a
// flat [field, value, field, value, ...] array.
const hashStream = redis.hscanStream("user_profiles", {
  match: "email*",   // Only email-related fields
  count: 50
});

hashStream.on("data", (fields: string[]) => {
  // Fields come in pairs: [field1, value1, field2, value2, ...]
  for (let i = 0; i < fields.length; i += 2) {
    const field = fields[i];
    const value = fields[i + 1];
    console.log(`Field: ${field}, Value: ${value}`);
    
    // Process field-value pairs
    if (field.startsWith("email") && value.includes("@")) {
      validateEmail(value);
    }
  }
});

// Batch processing hash data
// Accumulate field/value pairs in a Map and flush in chunks of ~100 so the
// whole hash never has to be held in memory at once.
const configStream = redis.hscanStream("app_config");
const configBatch = new Map<string, string>();

configStream.on("data", (fields: string[]) => {
  for (let i = 0; i < fields.length; i += 2) {
    configBatch.set(fields[i], fields[i + 1]);
  }
  
  // Process batch when it reaches certain size
  if (configBatch.size >= 100) {
    procesConfigBatch(configBatch);
    configBatch.clear();
  }
});

configStream.on("end", () => {
  // Process remaining items
  // Flush the final partial batch so trailing entries are not lost.
  if (configBatch.size > 0) {
    procesConfigBatch(configBatch);
  }
});

// Placeholder email check used by the hash-scan example above; it receives
// values whose field name starts with "email" and that contain "@".
function validateEmail(email: string) {
  // Email validation logic
}

// Consumes one accumulated batch of config entries.
// NOTE(review): the "proces" spelling is kept because the call sites in the
// example above use this exact name.
function procesConfigBatch(config: Map<string, string>) {
  const itemCount = config.size;
  console.log(`Processing ${itemCount} config items`);
}

Sorted Set Scanning Streams

Scan sorted set members and scores using ZSCAN-based streams.

// Sorted set member scanning
zscanStream(key: string, options?: ScanStreamOptions): ScanStream;
zscanBufferStream(key: string, options?: ScanStreamOptions): ScanStream;

Usage Examples:

// Scan leaderboard entries
// ZSCAN emits flat [member, score, member, score, ...] arrays; scores arrive
// as strings and are parsed with parseFloat below.
const leaderboardStream = redis.zscanStream("game_leaderboard", {
  match: "player:*",
  count: 25
});

leaderboardStream.on("data", (members: string[]) => {
  // Members come in pairs: [member1, score1, member2, score2, ...]
  for (let i = 0; i < members.length; i += 2) {
    const member = members[i];
    const score = parseFloat(members[i + 1]);
    
    console.log(`Player: ${member}, Score: ${score}`);
    
    // Process high scores
    if (score > 1000) {
      processHighScore(member, score);
    }
  }
});

// Aggregate statistics while streaming
// Running totals let us compute the average without buffering members.
let totalScore = 0;
let playerCount = 0;
const scoreStream = redis.zscanStream("scores");

scoreStream.on("data", (members: string[]) => {
  for (let i = 0; i < members.length; i += 2) {
    totalScore += parseFloat(members[i + 1]);
    playerCount++;
  }
});

scoreStream.on("end", () => {
  // NOTE(review): if the sorted set is empty, playerCount is 0 and avgScore
  // is NaN — guard before using this pattern in production.
  const avgScore = totalScore / playerCount;
  console.log(`Average score: ${avgScore} (${playerCount} players)`);
});

// Hook invoked for leaderboard entries whose score exceeds the threshold.
function processHighScore(player: string, score: number) {
  const alert = `High score alert: ${player} scored ${score}`;
  console.log(alert);
}

Advanced Streaming Patterns

Stream Pipeline Processing

Combine streaming with pipeline operations for efficient batch processing.

/**
 * Streams keys matching a pattern and inspects them in pipelined batches.
 *
 * Combines scanStream() with pipeline() so TYPE/TTL lookups are sent in
 * batches instead of one round-trip per key.
 */
class StreamProcessor {
  private redis: Redis;
  // ioredis pipeline; typed loosely to avoid importing internal types.
  private pipeline: any;
  private batchSize: number;
  private batchCount = 0;

  /**
   * @param redis     Connected ioredis client.
   * @param batchSize Queued pipeline commands that trigger an exec (default 100).
   */
  constructor(redis: Redis, batchSize = 100) {
    this.redis = redis;
    this.batchSize = batchSize;
    this.pipeline = redis.pipeline();
  }

  /**
   * Scans all keys matching `pattern`, querying type and TTL for each.
   * Resolves once the scan has finished AND the final batch has executed
   * (the previous version resolved before the stream ended, so awaiting it
   * did not actually wait for completion). Rejects on stream errors.
   */
  async processKeysWithPattern(pattern: string): Promise<void> {
    const stream = this.redis.scanStream({
      match: pattern,
      count: 50
    });

    return new Promise<void>((resolve, reject) => {
      stream.on("data", (keys: string[]) => {
        // Pause while flushing: otherwise new "data" events would race with
        // the pipeline swap inside executeBatch().
        stream.pause();

        (async () => {
          for (const key of keys) {
            this.pipeline.type(key);
            this.pipeline.ttl(key);
            this.batchCount += 2;

            if (this.batchCount >= this.batchSize) {
              await this.executeBatch();
            }
          }
        })()
          .then(() => stream.resume())
          .catch(reject);
      });

      stream.on("end", () => {
        // Flush whatever is left in the final, partially-filled batch.
        (this.batchCount > 0 ? this.executeBatch() : Promise.resolve())
          .then(() => {
            console.log("Stream processing completed");
            resolve();
          })
          .catch(reject);
      });

      stream.on("error", reject);
    });
  }

  /** Executes the queued commands and logs each key's (type, ttl) pair. */
  private async executeBatch(): Promise<void> {
    const results = await this.pipeline.exec();

    // exec() can resolve with null (e.g. discarded pipeline); guard it.
    if (results) {
      // Commands were queued in (type, ttl) pairs, so read results in pairs.
      for (let i = 0; i < results.length; i += 2) {
        const [typeErr, type] = results[i];
        const [ttlErr, ttl] = results[i + 1];

        if (!typeErr && !ttlErr) {
          console.log(`Key type: ${type}, TTL: ${ttl}`);
        }
      }
    }

    // Start a fresh pipeline for the next batch.
    this.pipeline = this.redis.pipeline();
    this.batchCount = 0;
  }
}

// Usage
// (Top-level await — assumes an ES module or enclosing async context.)
const processor = new StreamProcessor(redis, 200);
await processor.processKeysWithPattern("session:*");

Stream Data Migration

Use streams for efficient data migration between Redis instances or data transformation.

/**
 * Copies hash data between two Redis instances using HSCAN streaming,
 * applying a per-value transformation along the way.
 */
class DataMigrator {
  private sourceRedis: Redis;
  private targetRedis: Redis;

  constructor(sourceOptions: any, targetOptions: any) {
    this.sourceRedis = new Redis(sourceOptions);
    this.targetRedis = new Redis(targetOptions);
  }

  /**
   * Streams every field of `sourceKey` on the source instance and writes the
   * transformed field/value pairs to `targetKey` on the target instance.
   * Resolves when the scan has finished and the last batch has been flushed;
   * rejects on a stream error.
   */
  async migrateHashData(sourceKey: string, targetKey: string): Promise<void> {
    const stream = this.sourceRedis.hscanStream(sourceKey, { count: 100 });
    // `let`, not `const`: the pipeline is replaced after each batch exec
    // (the original declared it const and then reassigned it — a compile error).
    let pipeline = this.targetRedis.pipeline();
    let batchCount = 0;

    return new Promise<void>((resolve, reject) => {
      stream.on("data", (fields: string[]) => {
        // Pause while flushing so new chunks don't interleave with exec().
        stream.pause();

        (async () => {
          // HSCAN emits flat [field, value, field, value, ...] arrays.
          for (let i = 0; i < fields.length; i += 2) {
            const field = fields[i];
            const value = fields[i + 1];

            pipeline.hset(targetKey, field, this.transformValue(value));
            batchCount++;

            // Execute in batches of 50 writes.
            if (batchCount >= 50) {
              await pipeline.exec();
              pipeline = this.targetRedis.pipeline();
              batchCount = 0;
            }
          }
        })()
          .then(() => stream.resume())
          .catch(reject);
      });

      stream.on("end", () => {
        // Flush the final partial batch before reporting completion.
        (batchCount > 0 ? pipeline.exec() : Promise.resolve())
          .then(() => {
            console.log(`Migration completed: ${sourceKey} -> ${targetKey}`);
            resolve();
          })
          .catch(reject);
      });

      stream.on("error", (err) => {
        // Log AND reject so callers awaiting the migration see the failure
        // (the original swallowed it and left the promise unsettled forever).
        console.error("Migration error:", err);
        reject(err);
      });
    });
  }

  /** Example transformation hook: uppercases the stored value. */
  private transformValue(value: string): string {
    return value.toUpperCase();
  }

  /** Closes both client connections gracefully. */
  async close(): Promise<void> {
    await this.sourceRedis.quit();
    await this.targetRedis.quit();
  }
}

// Usage
// (Top-level await — assumes an ES module or enclosing async context.)
const migrator = new DataMigrator(
  { host: "source.redis.com" },
  { host: "target.redis.com" }
);

// NOTE(review): migrateHashData must resolve only after the stream ends;
// otherwise the close() below can sever connections mid-migration. Verify
// the implementation waits for the "end" event before resolving.
await migrator.migrateHashData("old_user_data", "new_user_data");
await migrator.close();

Memory-Efficient Analytics

Process large datasets for analytics without loading everything into memory.

/**
 * Memory-efficient analytics over large Redis datasets using SCAN streams,
 * so nothing has to be loaded into memory all at once.
 */
class RedisAnalytics {
  private redis: Redis;

  constructor(redis: Redis) {
    this.redis = redis;
  }

  /**
   * Counts how many keys exist per Redis type (string, hash, set, ...).
   * TYPE commands are pipelined in batches of ~100 while keys stream in.
   * Resolves with the per-type counts; rejects on a stream error.
   */
  async analyzeKeyPatterns(): Promise<{[type: string]: number}> {
    const typeStats: {[type: string]: number} = {};
    const stream = this.redis.scanStream({ count: 100 });

    return new Promise((resolve, reject) => {
      // `let`: the pipeline must be REPLACED after each exec. The original
      // called `this.redis.pipeline()` and discarded the result, so the old
      // pipeline kept accumulating and re-executing commands (double counts).
      let pipeline = this.redis.pipeline();
      let batchCount = 0;

      stream.on("data", (keys: string[]) => {
        keys.forEach(key => {
          pipeline.type(key);
          batchCount++;
        });

        if (batchCount >= 100) {
          // Pause so further chunks wait until this batch has executed.
          stream.pause();
          pipeline.exec()
            .then((results: any) => {
              // exec() can resolve with null; treat that as an empty batch.
              this.processBatchResults(results ?? [], typeStats);
              pipeline = this.redis.pipeline();
              batchCount = 0;
              stream.resume();
            })
            .catch(reject);
        }
      });

      stream.on("end", () => {
        // Count whatever keys remain in the final partial batch.
        if (batchCount > 0) {
          pipeline.exec()
            .then((results: any) => {
              this.processBatchResults(results ?? [], typeStats);
              resolve(typeStats);
            })
            .catch(reject);
        } else {
          resolve(typeStats);
        }
      });

      stream.on("error", reject);
    });
  }

  /** Folds one pipeline result set ([err, type] pairs) into typeStats. */
  private processBatchResults(results: any[], typeStats: {[type: string]: number}): void {
    results.forEach(([err, type]) => {
      if (!err && type) {
        typeStats[type] = (typeStats[type] || 0) + 1;
      }
    });
  }

  /**
   * Streams a set's members and reports the total member count plus how many
   * distinct 3-character prefixes ("patterns") occur among them.
   */
  async analyzeSetDistribution(setKey: string): Promise<{total: number, uniquePatterns: number}> {
    const stream = this.redis.sscanStream(setKey, { count: 200 });
    const patterns = new Set<string>();
    let totalMembers = 0;

    return new Promise((resolve, reject) => {
      stream.on("data", (members: string[]) => {
        members.forEach(member => {
          totalMembers++;
          // Pattern = first 3 characters of the member.
          patterns.add(member.substring(0, 3));
        });
      });

      stream.on("end", () => {
        resolve({
          total: totalMembers,
          uniquePatterns: patterns.size
        });
      });

      stream.on("error", reject);
    });
  }
}

// Usage
// (Top-level await — assumes an ES module or enclosing async context.)
const analytics = new RedisAnalytics(redis);

// Analyze key type distribution
const keyStats = await analytics.analyzeKeyPatterns();
console.log("Key type distribution:", keyStats);

// Analyze set member patterns
const setStats = await analytics.analyzeSetDistribution("large_member_set");
console.log("Set analysis:", setStats);

Error Handling and Resilience

Implement robust error handling for streaming operations.

/**
 * Runs a key scan plus a per-batch processor callback with retry, a
 * per-attempt timeout, and clean stream teardown on failure.
 */
class ResilientStreamer {
  private redis: Redis;
  private maxRetries: number;

  constructor(redis: Redis, maxRetries = 3) {
    this.redis = redis;
    this.maxRetries = maxRetries;
  }

  /**
   * Scans keys matching `pattern`, invoking `processor` for each batch.
   * Retries the WHOLE scan up to maxRetries times with linear backoff.
   * @throws Error when every attempt fails.
   */
  async processWithRetry(pattern: string, processor: (keys: string[]) => Promise<void>): Promise<void> {
    let retries = 0;

    while (retries < this.maxRetries) {
      try {
        await this.processStream(pattern, processor);
        return; // Success
      } catch (error) {
        retries++;
        console.error(`Stream processing failed (attempt ${retries}):`, error);

        if (retries >= this.maxRetries) {
          // `error` is `unknown` in a catch clause under strict mode;
          // extract a message safely instead of assuming Error.
          const message = error instanceof Error ? error.message : String(error);
          throw new Error(`Failed after ${this.maxRetries} retries: ${message}`);
        }

        // Linear backoff before the next attempt: 1s, 2s, 3s, ...
        await new Promise(resolve => setTimeout(resolve, 1000 * retries));
      }
    }
  }

  /** One scan attempt: resolves on stream end, rejects on error or timeout. */
  private processStream(pattern: string, processor: (keys: string[]) => Promise<void>): Promise<void> {
    return new Promise((resolve, reject) => {
      const stream = this.redis.scanStream({
        match: pattern,
        count: 50
      });

      // Timeout protection. The timer is cleared when the attempt settles —
      // the original left it running, so it fired 30s AFTER a successful
      // scan, closed the finished stream, and kept the process alive.
      const timeout = setTimeout(() => {
        stream.close();
        reject(new Error("Stream processing timeout"));
      }, 30000); // 30 second timeout

      const settle = (fn: () => void) => {
        clearTimeout(timeout);
        fn();
      };

      stream.on("data", (keys: string[]) => {
        // Pause so batches are processed one at a time, in order.
        stream.pause();
        processor(keys)
          .then(() => stream.resume())
          .catch((error) => {
            stream.close();
            settle(() => reject(error));
          });
      });

      stream.on("end", () => settle(resolve));
      stream.on("error", (err) => settle(() => reject(err)));
    });
  }
}

// Usage
const resilientStreamer = new ResilientStreamer(redis);

// Each failure retries the WHOLE scan: if the processor throws, processStream
// rejects and processWithRetry starts the scan again (up to maxRetries).
await resilientStreamer.processWithRetry("user:*", async (keys) => {
  // Processing logic that might fail
  console.log(`Processing ${keys.length} keys`);
  
  if (Math.random() < 0.1) {
    throw new Error("Simulated processing error");
  }
  
  // Actual processing
  for (const key of keys) {
    await processUserKey(key);
  }
});

// Stub: replace with real per-key work.
async function processUserKey(key: string): Promise<void> {
  // User key processing logic  
}

Types

interface ScanStreamOptions {
  match?: string;    // Pattern to match (default: "*")
  count?: number;    // Batch size hint (default: 10)
  type?: string;     // Key type filter
}

type StreamDataHandler = (data: string[]) => void;
type StreamEndHandler = () => void;
type StreamErrorHandler = (error: Error) => void;

docs

cluster.md

commands.md

configuration.md

index.md

pipelining.md

pubsub.md

redis-client.md

streaming.md

tile.json