A robust, performance-focused and full-featured Redis client for Node.js with TypeScript support, clustering, sentinel management, and comprehensive Redis command coverage.
—
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Pending
The risk profile of this skill
ioredis provides readable streams for efficiently scanning large datasets using Redis SCAN operations. These streams allow memory-efficient iteration over keys, set members, hash fields, and sorted set members without loading entire datasets into memory.
Scan Redis keys by pattern through a memory-efficient streaming interface.
// Key scanning streams
scanStream(options?: ScanStreamOptions): ScanStream;
scanBufferStream(options?: ScanStreamOptions): ScanStream;
/** Options accepted by the scan stream factory methods listed in this document. */
interface ScanStreamOptions {
match?: string; // Pattern to match keys (default: "*")
count?: number; // Hint for batch size (default: 10)
type?: string; // Filter by key type (the usage example below marks type filtering as Redis 6.0+)
}
class ScanStream extends Readable {
close(): void; // Stop scanning and end stream
}

Usage Examples:
import Redis from "ioredis";
const redis = new Redis();
// Basic key scanning: each "data" event hands over one batch of keys
const stream = redis.scanStream();
stream.on("data", (batch: string[]) => {
  console.log("Found keys:", batch);
});
stream.on("end", () => {
  console.log("Scan completed");
});

// Pattern-based scanning
const userStream = redis.scanStream({
  match: "user:*",
  count: 100, // Hint: roughly 100 keys per batch
});
userStream.on("data", (batch: string[]) => {
  for (const userKey of batch) {
    console.log("User key:", userKey);
  }
});

// Type-based scanning (Redis 6.0+)
const hashStream = redis.scanStream({
  match: "*",
  type: "hash",
  count: 50,
});
hashStream.on("data", (batch: string[]) => {
  console.log("Hash keys:", batch);
});
// Manual stream control
const controlledStream = redis.scanStream({ match: "temp:*" });
controlledStream.on("data", (keys: string[]) => {
if (keys.length > 1000) {
console.log("Too many keys, stopping scan");
controlledStream.close();
}
});

Scan set members efficiently using SSCAN-based streams.
// Set member scanning
sscanStream(key: string, options?: ScanStreamOptions): ScanStream;
sscanBufferStream(key: string, options?: ScanStreamOptions): ScanStream;

Usage Examples:
// Scan large set members
const setStream = redis.sscanStream("large_set", {
  match: "*pattern*",
  count: 200,
});

setStream.on("data", (members: string[]) => {
  console.log(`Processing ${members.length} set members:`, members);

  // processSetMember is async. Pause the scan until the whole batch has been
  // handled so that work cannot pile up faster than it completes, and so that
  // a failing member is reported instead of becoming an unhandled rejection.
  setStream.pause();
  Promise.all(members.map((member) => processSetMember(member)))
    .catch((err: unknown) => {
      console.error("Set scan error:", err);
    })
    .finally(() => {
      setStream.resume();
    });
});

setStream.on("end", () => {
  console.log("Set scan completed");
});

// Error handling for failures raised by the stream itself
setStream.on("error", (err) => {
  console.error("Set scan error:", err);
});
async function processSetMember(member: string) {
// Example: validate and process each member
if (member.startsWith("valid_")) {
await redis.sadd("processed_set", member);
}
}

Scan hash fields and values using HSCAN-based streams.
// Hash field scanning
hscanStream(key: string, options?: ScanStreamOptions): ScanStream;
hscanBufferStream(key: string, options?: ScanStreamOptions): ScanStream;

Usage Examples:
// Stream the fields of a hash, restricted to email-related entries
const hashStream = redis.hscanStream("user_profiles", {
  match: "email*", // Only email-related fields
  count: 50,
});

hashStream.on("data", (fields: string[]) => {
  // Each batch is flat: [field1, value1, field2, value2, ...]
  let idx = 0;
  while (idx < fields.length) {
    const field = fields[idx];
    const value = fields[idx + 1];
    idx += 2;

    console.log(`Field: ${field}, Value: ${value}`);

    // Only values of email fields that look like addresses are validated
    const looksLikeEmail = field.startsWith("email") && value.includes("@");
    if (looksLikeEmail) {
      validateEmail(value);
    }
  }
});

// Accumulate hash entries and hand them off in batches
const configStream = redis.hscanStream("app_config");
const configBatch = new Map<string, string>();

configStream.on("data", (fields: string[]) => {
  // Even positions hold field names; the value follows at the next position
  fields.forEach((entry, idx) => {
    if (idx % 2 === 0) {
      configBatch.set(entry, fields[idx + 1]);
    }
  });

  // Keep accumulating until the batch is large enough
  if (configBatch.size < 100) {
    return;
  }
  procesConfigBatch(configBatch);
  configBatch.clear();
});

configStream.on("end", () => {
  // Hand off whatever is left over once the scan finishes
  if (configBatch.size === 0) {
    return;
  }
  procesConfigBatch(configBatch);
});

/** Placeholder for email validation; intentionally left empty in this example. */
function validateEmail(email: string) {
  // Email validation logic
}
function procesConfigBatch(config: Map<string, string>) {
// Batch configuration processing
console.log(`Processing ${config.size} config items`);
}

Scan sorted set members and scores using ZSCAN-based streams.
// Sorted set member scanning
zscanStream(key: string, options?: ScanStreamOptions): ScanStream;
zscanBufferStream(key: string, options?: ScanStreamOptions): ScanStream;

Usage Examples:
// Scan leaderboard entries
const leaderboardStream = redis.zscanStream("game_leaderboard", {
  match: "player:*",
  count: 25,
});

leaderboardStream.on("data", (members: string[]) => {
  // Members come in pairs: [member1, score1, member2, score2, ...]
  for (let i = 0; i < members.length; i += 2) {
    const member = members[i];
    const score = parseFloat(members[i + 1]);
    console.log(`Player: ${member}, Score: ${score}`);

    // Process high scores
    if (score > 1000) {
      processHighScore(member, score);
    }
  }
});

// Aggregate statistics while streaming
let totalScore = 0;
let playerCount = 0;
const scoreStream = redis.zscanStream("scores");

scoreStream.on("data", (members: string[]) => {
  for (let i = 0; i < members.length; i += 2) {
    totalScore += parseFloat(members[i + 1]);
    playerCount++;
  }
});

scoreStream.on("end", () => {
  // An empty or missing sorted set yields no pairs; report 0 instead of the
  // NaN that 0 / 0 would produce.
  const avgScore = playerCount > 0 ? totalScore / playerCount : 0;
  console.log(`Average score: ${avgScore} (${playerCount} players)`);
});
function processHighScore(player: string, score: number) {
// High score processing logic
console.log(`High score alert: ${player} scored ${score}`);
}

Combine streaming with pipeline operations for efficient batch processing.
/**
 * Scans keys matching a pattern and looks up the type and TTL of each one,
 * sending the lookups to Redis in pipelined batches.
 */
class StreamProcessor {
  /** Pipeline currently collecting commands; replaced by a fresh one on every flush. */
  private pipeline: ReturnType<Redis["pipeline"]>;
  /** Number of commands queued on `pipeline` (two per key: TYPE and TTL). */
  private batchCount = 0;

  /**
   * @param redis Client used for both the scan and the pipelined lookups.
   * @param batchSize Number of queued commands that triggers a pipeline flush.
   */
  constructor(
    private readonly redis: Redis,
    private readonly batchSize = 100,
  ) {
    this.pipeline = redis.pipeline();
  }

  /**
   * Processes every key matching `pattern`.
   *
   * @param pattern Glob-style pattern passed to the scan as `match`.
   * @returns A promise that resolves only after the scan has ended and the
   *          final batch has been executed.
   * @throws Rejects with the stream error or the pipeline failure, whichever
   *         occurs first; the scan is closed in that case.
   */
  processKeysWithPattern(pattern: string): Promise<void> {
    const stream = this.redis.scanStream({
      match: pattern,
      count: 50,
    });

    return new Promise<void>((resolve, reject) => {
      let settled = false;

      const fail = (err: unknown): void => {
        if (settled) return;
        settled = true;
        stream.close();
        reject(err);
      };

      stream.on("data", (keys: string[]) => {
        // Hold back further batches while this one is queued and flushed, so
        // commands are never added to a pipeline that is already executing
        // and "end" cannot overtake a pending flush.
        stream.pause();
        this.queueKeys(keys).then(() => {
          if (!settled) stream.resume();
        }, fail);
      });

      stream.on("end", () => {
        if (settled) return;
        this.flushRemaining().then(() => {
          if (settled) return;
          settled = true;
          console.log("Stream processing completed");
          resolve();
        }, fail);
      });

      stream.on("error", fail);
    });
  }

  /** Queues TYPE and TTL for each key, flushing whenever the batch limit is reached. */
  private async queueKeys(keys: readonly string[]): Promise<void> {
    for (const key of keys) {
      this.pipeline.type(key);
      this.pipeline.ttl(key);
      this.batchCount += 2;

      if (this.batchCount >= this.batchSize) {
        await this.executeBatch();
      }
    }
  }

  /** Executes whatever is still queued once the scan has finished. */
  private async flushRemaining(): Promise<void> {
    if (this.batchCount > 0) {
      await this.executeBatch();
    }
  }

  /** Executes the queued commands and logs each key's type and TTL. */
  private async executeBatch(): Promise<void> {
    // Swap in a fresh pipeline before awaiting so nothing can be appended to
    // the batch that is in flight.
    const batch = this.pipeline;
    this.pipeline = this.redis.pipeline();
    this.batchCount = 0;

    // Treat a null reply from exec() as an empty result list.
    const results = (await batch.exec()) ?? [];

    // Replies arrive in the order queued: (type, ttl) for each key.
    for (let i = 0; i + 1 < results.length; i += 2) {
      const [typeErr, type] = results[i];
      const [ttlErr, ttl] = results[i + 1];
      if (!typeErr && !ttlErr) {
        console.log(`Key type: ${type}, TTL: ${ttl}`);
      }
    }
  }
}
// Usage
const processor = new StreamProcessor(redis, 200);
await processor.processKeysWithPattern("session:*");

Use streams for efficient data migration between Redis instances or data transformation.
/**
 * Copies hash data from one Redis instance to another, transforming each
 * value on the way and writing to the target in pipelined batches.
 */
class DataMigrator {
  private readonly sourceRedis: Redis;
  private readonly targetRedis: Redis;

  /**
   * @param sourceOptions Connection options passed to the source client.
   * @param targetOptions Connection options passed to the target client.
   */
  constructor(sourceOptions: any, targetOptions: any) {
    this.sourceRedis = new Redis(sourceOptions);
    this.targetRedis = new Redis(targetOptions);
  }

  /**
   * Streams every field of `sourceKey` and writes the transformed value to the
   * same field of `targetKey` on the target instance.
   *
   * @returns A promise that resolves only after the last batch has been
   *          written, so it is safe to call `close()` afterwards.
   * @throws Rejects (after logging "Migration error:") if the scan fails or
   *         any write in a batch reports an error.
   */
  migrateHashData(sourceKey: string, targetKey: string): Promise<void> {
    const flushThreshold = 50;
    const stream = this.sourceRedis.hscanStream(sourceKey, { count: 100 });

    // Reassigned on every flush, so this must not be `const`.
    let pipeline = this.targetRedis.pipeline();
    let batchCount = 0;

    // Executes the queued writes and surfaces the first per-command error.
    const flush = async (): Promise<void> => {
      // Swap in a fresh pipeline before awaiting so nothing is appended to
      // the batch that is in flight.
      const batch = pipeline;
      pipeline = this.targetRedis.pipeline();
      batchCount = 0;

      const results = (await batch.exec()) ?? [];
      for (const [err] of results) {
        if (err) throw err;
      }
    };

    // Queues one HSET per field/value pair, flushing at the threshold.
    const queueFields = async (fields: readonly string[]): Promise<void> => {
      for (let i = 0; i + 1 < fields.length; i += 2) {
        const field = fields[i];
        const transformedValue = this.transformValue(fields[i + 1]);
        pipeline.hset(targetKey, field, transformedValue);
        batchCount++;

        if (batchCount >= flushThreshold) {
          await flush();
        }
      }
    };

    return new Promise<void>((resolve, reject) => {
      let settled = false;

      const fail = (err: unknown): void => {
        if (settled) return;
        settled = true;
        stream.close();
        console.error("Migration error:", err);
        reject(err);
      };

      stream.on("data", (fields: string[]) => {
        // Pause the scan while this batch is written, so batches are applied
        // in order and "end" cannot fire while a write is still pending.
        stream.pause();
        queueFields(fields).then(() => {
          if (!settled) stream.resume();
        }, fail);
      });

      stream.on("end", () => {
        if (settled) return;
        const remaining = batchCount > 0 ? flush() : Promise.resolve();
        remaining.then(() => {
          if (settled) return;
          settled = true;
          console.log(`Migration completed: ${sourceKey} -> ${targetKey}`);
          resolve();
        }, fail);
      });

      stream.on("error", fail);
    });
  }

  /** Example transformation: upper-cases the value. */
  private transformValue(value: string): string {
    return value.toUpperCase();
  }

  /** Closes both connections, source first. */
  async close(): Promise<void> {
    await this.sourceRedis.quit();
    await this.targetRedis.quit();
  }
}
// Usage
const migrator = new DataMigrator(
{ host: "source.redis.com" },
{ host: "target.redis.com" }
);
await migrator.migrateHashData("old_user_data", "new_user_data");
await migrator.close();

Process large datasets for analytics without loading everything into memory.
/**
 * Computes simple statistics over a Redis dataset by streaming it with
 * scan streams instead of loading it into memory.
 */
class RedisAnalytics {
  /** @param redis Client used for all scans and pipelined lookups. */
  constructor(private readonly redis: Redis) {}

  /**
   * Counts keys per Redis type across the whole keyspace.
   *
   * @returns Map from type name to number of keys of that type.
   * @throws Rejects if the scan or a pipeline execution fails.
   */
  analyzeKeyPatterns(): Promise<{[type: string]: number}> {
    const typeStats: {[type: string]: number} = {};
    const stream = this.redis.scanStream({ count: 100 });

    // Reassigned on every flush: each pipeline is executed exactly once.
    let pipeline = this.redis.pipeline();
    let batchCount = 0;

    const flush = async (): Promise<void> => {
      const batch = pipeline;
      pipeline = this.redis.pipeline();
      batchCount = 0;
      const results = (await batch.exec()) ?? [];
      this.processBatchResults(results, typeStats);
    };

    return new Promise((resolve, reject) => {
      let settled = false;

      const fail = (err: unknown): void => {
        if (settled) return;
        settled = true;
        stream.close();
        reject(err);
      };

      stream.on("data", (keys: string[]) => {
        // Add type commands to pipeline
        for (const key of keys) {
          pipeline.type(key);
          batchCount++;
        }

        if (batchCount < 100) return;

        // Pause while the batch executes, so later keys go to the fresh
        // pipeline and "end" cannot resolve before these results are counted.
        stream.pause();
        flush().then(() => {
          if (!settled) stream.resume();
        }, fail);
      });

      stream.on("end", () => {
        if (settled) return;
        // Process remaining keys
        const remaining = batchCount > 0 ? flush() : Promise.resolve();
        remaining.then(() => {
          if (settled) return;
          settled = true;
          resolve(typeStats);
        }, fail);
      });

      stream.on("error", fail);
    });
  }

  /** Adds one count per successful, non-empty string reply in `results`. */
  private processBatchResults(
    results: ReadonlyArray<[Error | null, unknown]>,
    typeStats: {[type: string]: number},
  ): void {
    for (const [err, type] of results) {
      if (!err && typeof type === "string" && type) {
        typeStats[type] = (typeStats[type] ?? 0) + 1;
      }
    }
  }

  /**
   * Counts the members of a set and the number of distinct 3-character
   * prefixes among them.
   *
   * @param setKey Key of the set to scan.
   * @throws Rejects if the scan emits an error.
   */
  analyzeSetDistribution(setKey: string): Promise<{total: number, uniquePatterns: number}> {
    const stream = this.redis.sscanStream(setKey, { count: 200 });
    const patterns = new Set<string>();
    let totalMembers = 0;

    return new Promise((resolve, reject) => {
      stream.on("data", (members: string[]) => {
        for (const member of members) {
          totalMembers++;
          // Extract pattern (first 3 characters)
          patterns.add(member.substring(0, 3));
        }
      });

      stream.on("end", () => {
        resolve({
          total: totalMembers,
          uniquePatterns: patterns.size,
        });
      });

      stream.on("error", reject);
    });
  }
}
// Usage
const analytics = new RedisAnalytics(redis);
// Analyze key type distribution
const keyStats = await analytics.analyzeKeyPatterns();
console.log("Key type distribution:", keyStats);
// Analyze set member patterns
const setStats = await analytics.analyzeSetDistribution("large_member_set");
console.log("Set analysis:", setStats);

Implement robust error handling for streaming operations.
/**
 * Runs a key-processing callback over a scan stream, with a per-attempt
 * timeout and retries with linear backoff.
 */
class ResilientStreamer {
  /**
   * @param redis Client used to create the scan stream.
   * @param maxRetries Maximum number of attempts; values below 1 are treated
   *                   as a single attempt.
   * @param timeoutMs Time allowed for one full scan attempt before it is
   *                  aborted with a timeout error.
   * @param retryDelayMs Base delay between attempts; multiplied by the number
   *                     of attempts made so far.
   */
  constructor(
    private readonly redis: Redis,
    private readonly maxRetries = 3,
    private readonly timeoutMs = 30000,
    private readonly retryDelayMs = 1000,
  ) {}

  /**
   * Processes all keys matching `pattern`, retrying the whole scan on failure.
   *
   * @param pattern Glob-style pattern passed to the scan as `match`.
   * @param processor Callback invoked once per batch of keys.
   * @throws Error "Failed after N retries: <reason>" once every attempt failed.
   */
  async processWithRetry(pattern: string, processor: (keys: string[]) => Promise<void>): Promise<void> {
    // Always try at least once, even if maxRetries was configured as 0 or less.
    const attempts = Math.max(1, this.maxRetries);

    for (let attempt = 1; ; attempt++) {
      try {
        await this.processStream(pattern, processor);
        return; // Success
      } catch (error: unknown) {
        console.error(`Stream processing failed (attempt ${attempt}):`, error);

        if (attempt >= attempts) {
          const reason = error instanceof Error ? error.message : String(error);
          throw new Error(`Failed after ${attempts} retries: ${reason}`);
        }

        // Wait before retry; the delay grows with each failed attempt.
        await new Promise<void>((resolve) => setTimeout(resolve, this.retryDelayMs * attempt));
      }
    }
  }

  /** Runs one scan attempt; settles exactly once and always clears its watchdog timer. */
  private processStream(pattern: string, processor: (keys: string[]) => Promise<void>): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      const stream = this.redis.scanStream({
        match: pattern,
        count: 50,
      });

      let settled = false;
      let watchdog: ReturnType<typeof setTimeout> | undefined;

      // Marks the attempt as finished; returns false if it already was.
      const settle = (): boolean => {
        if (settled) return false;
        settled = true;
        clearTimeout(watchdog);
        return true;
      };

      const fail = (err: unknown): void => {
        if (!settle()) return;
        stream.close();
        reject(err);
      };

      stream.on("data", (keys: string[]) => {
        // Pause until the processor is done with this batch, so batches are
        // handled one at a time and "end" cannot fire while one is in flight.
        stream.pause();
        // Starting from a resolved promise also captures a synchronous throw.
        Promise.resolve()
          .then(() => processor(keys))
          .then(() => {
            if (!settled) stream.resume();
          }, fail);
      });

      stream.on("end", () => {
        if (settle()) resolve();
      });

      stream.on("error", fail);

      // Timeout protection
      watchdog = setTimeout(() => {
        fail(new Error("Stream processing timeout"));
      }, this.timeoutMs);
    });
  }
}
// Usage
const resilientStreamer = new ResilientStreamer(redis);
await resilientStreamer.processWithRetry("user:*", async (keys) => {
// Processing logic that might fail
console.log(`Processing ${keys.length} keys`);
if (Math.random() < 0.1) {
throw new Error("Simulated processing error");
}
// Actual processing
for (const key of keys) {
await processUserKey(key);
}
});
async function processUserKey(key: string): Promise<void> {
// User key processing logic
}interface ScanStreamOptions {
match?: string; // Pattern to match (default: "*")
count?: number; // Batch size hint (default: 10)
type?: string; // Key type filter
}
/** Callback shape for a stream "data" listener: receives one batch as an array of strings. */
type StreamDataHandler = (data: string[]) => void;
/** Callback shape for a stream "end" listener: takes no arguments. */
type StreamEndHandler = () => void;
/** Callback shape for a stream "error" listener: receives the error. */
type StreamErrorHandler = (error: Error) => void;