0
# Streaming
1
2
ioredis provides readable streams for efficiently scanning large datasets using Redis SCAN operations. These streams allow memory-efficient iteration over keys, set members, hash fields, and sorted set members without loading entire datasets into memory.
3
4
## Capabilities
5
6
### Key Scanning Streams
7
8
Scan Redis keys using patterns with memory-efficient streaming interface.
9
10
```typescript { .api }
11
// Key scanning streams
12
scanStream(options?: ScanStreamOptions): ScanStream;
13
scanBufferStream(options?: ScanStreamOptions): ScanStream;
14
15
interface ScanStreamOptions {
16
match?: string; // Pattern to match keys (default: "*")
17
count?: number; // Hint for batch size (default: 10)
18
type?: string; // Filter by key type
19
}
20
21
class ScanStream extends Readable {
22
close(): void; // Stop scanning and end stream
23
}
24
```
25
26
**Usage Examples:**
27
28
```typescript
29
import Redis from "ioredis";
30
31
const redis = new Redis();
32
33
// Basic key scanning
34
const stream = redis.scanStream();
35
stream.on("data", (keys: string[]) => {
36
console.log("Found keys:", keys);
37
});
38
39
stream.on("end", () => {
40
console.log("Scan completed");
41
});
42
43
// Pattern-based scanning
44
const userStream = redis.scanStream({
45
match: "user:*",
46
count: 100 // Process ~100 keys per batch
47
});
48
49
userStream.on("data", (keys: string[]) => {
50
keys.forEach(key => {
51
console.log("User key:", key);
52
});
53
});
54
55
// Type-based scanning (Redis 6.0+)
56
const hashStream = redis.scanStream({
57
match: "*",
58
type: "hash",
59
count: 50
60
});
61
62
hashStream.on("data", (hashKeys: string[]) => {
63
console.log("Hash keys:", hashKeys);
64
});
65
66
// Manual stream control
67
const controlledStream = redis.scanStream({ match: "temp:*" });
68
controlledStream.on("data", (keys: string[]) => {
69
if (keys.length > 1000) {
70
console.log("Too many keys, stopping scan");
71
controlledStream.close();
72
}
73
});
74
```
75
76
### Set Scanning Streams
77
78
Scan set members efficiently using SSCAN-based streams.
79
80
```typescript { .api }
81
// Set member scanning
82
sscanStream(key: string, options?: ScanStreamOptions): ScanStream;
83
sscanBufferStream(key: string, options?: ScanStreamOptions): ScanStream;
84
```
85
86
**Usage Examples:**
87
88
```typescript
89
// Scan large set members
90
const setStream = redis.sscanStream("large_set", {
91
match: "*pattern*",
92
count: 200
93
});
94
95
setStream.on("data", (members: string[]) => {
96
console.log(`Processing ${members.length} set members:`, members);
97
98
// Process members in batches
99
members.forEach(member => {
100
processSetMember(member);
101
});
102
});
103
104
setStream.on("end", () => {
105
console.log("Set scan completed");
106
});
107
108
// Error handling
109
setStream.on("error", (err) => {
110
console.error("Set scan error:", err);
111
});
112
113
async function processSetMember(member: string) {
114
// Example: validate and process each member
115
if (member.startsWith("valid_")) {
116
await redis.sadd("processed_set", member);
117
}
118
}
119
```
120
121
### Hash Scanning Streams
122
123
Scan hash fields and values using HSCAN-based streams.
124
125
```typescript { .api }
126
// Hash field scanning
127
hscanStream(key: string, options?: ScanStreamOptions): ScanStream;
128
hscanBufferStream(key: string, options?: ScanStreamOptions): ScanStream;
129
```
130
131
**Usage Examples:**
132
133
```typescript
134
// Scan hash fields and values
135
const hashStream = redis.hscanStream("user_profiles", {
136
match: "email*", // Only email-related fields
137
count: 50
138
});
139
140
hashStream.on("data", (fields: string[]) => {
141
// Fields come in pairs: [field1, value1, field2, value2, ...]
142
for (let i = 0; i < fields.length; i += 2) {
143
const field = fields[i];
144
const value = fields[i + 1];
145
console.log(`Field: ${field}, Value: ${value}`);
146
147
// Process field-value pairs
148
if (field.startsWith("email") && value.includes("@")) {
149
validateEmail(value);
150
}
151
}
152
});
153
154
// Batch processing hash data
155
const configStream = redis.hscanStream("app_config");
156
const configBatch = new Map<string, string>();
157
158
configStream.on("data", (fields: string[]) => {
159
for (let i = 0; i < fields.length; i += 2) {
160
configBatch.set(fields[i], fields[i + 1]);
161
}
162
163
// Process batch when it reaches certain size
164
if (configBatch.size >= 100) {
165
procesConfigBatch(configBatch);
166
configBatch.clear();
167
}
168
});
169
170
configStream.on("end", () => {
171
// Process remaining items
172
if (configBatch.size > 0) {
173
procesConfigBatch(configBatch);
174
}
175
});
176
177
function validateEmail(email: string) {
178
// Email validation logic
179
}
180
181
function procesConfigBatch(config: Map<string, string>) {
182
// Batch configuration processing
183
console.log(`Processing ${config.size} config items`);
184
}
185
```
186
187
### Sorted Set Scanning Streams
188
189
Scan sorted set members and scores using ZSCAN-based streams.
190
191
```typescript { .api }
192
// Sorted set member scanning
193
zscanStream(key: string, options?: ScanStreamOptions): ScanStream;
194
zscanBufferStream(key: string, options?: ScanStreamOptions): ScanStream;
195
```
196
197
**Usage Examples:**
198
199
```typescript
200
// Scan leaderboard entries
201
const leaderboardStream = redis.zscanStream("game_leaderboard", {
202
match: "player:*",
203
count: 25
204
});
205
206
leaderboardStream.on("data", (members: string[]) => {
207
// Members come in pairs: [member1, score1, member2, score2, ...]
208
for (let i = 0; i < members.length; i += 2) {
209
const member = members[i];
210
const score = parseFloat(members[i + 1]);
211
212
console.log(`Player: ${member}, Score: ${score}`);
213
214
// Process high scores
215
if (score > 1000) {
216
processHighScore(member, score);
217
}
218
}
219
});
220
221
// Aggregate statistics while streaming
222
let totalScore = 0;
223
let playerCount = 0;
224
const scoreStream = redis.zscanStream("scores");
225
226
scoreStream.on("data", (members: string[]) => {
227
for (let i = 0; i < members.length; i += 2) {
228
totalScore += parseFloat(members[i + 1]);
229
playerCount++;
230
}
231
});
232
233
scoreStream.on("end", () => {
234
const avgScore = totalScore / playerCount;
235
console.log(`Average score: ${avgScore} (${playerCount} players)`);
236
});
237
238
function processHighScore(player: string, score: number) {
239
// High score processing logic
240
console.log(`High score alert: ${player} scored ${score}`);
241
}
242
```
243
244
## Advanced Streaming Patterns
245
246
### Stream Pipeline Processing
247
248
Combine streaming with pipeline operations for efficient batch processing.
249
250
```typescript
251
class StreamProcessor {
252
private redis: Redis;
253
private pipeline: any;
254
private batchSize: number;
255
private batchCount = 0;
256
257
constructor(redis: Redis, batchSize = 100) {
258
this.redis = redis;
259
this.batchSize = batchSize;
260
this.pipeline = redis.pipeline();
261
}
262
263
async processKeysWithPattern(pattern: string): Promise<void> {
264
const stream = this.redis.scanStream({
265
match: pattern,
266
count: 50
267
});
268
269
stream.on("data", async (keys: string[]) => {
270
for (const key of keys) {
271
// Add operations to pipeline
272
this.pipeline.type(key);
273
this.pipeline.ttl(key);
274
this.batchCount += 2;
275
276
// Execute batch when limit reached
277
if (this.batchCount >= this.batchSize) {
278
await this.executeBatch();
279
}
280
}
281
});
282
283
stream.on("end", async () => {
284
// Execute remaining operations
285
if (this.batchCount > 0) {
286
await this.executeBatch();
287
}
288
console.log("Stream processing completed");
289
});
290
}
291
292
private async executeBatch(): Promise<void> {
293
const results = await this.pipeline.exec();
294
295
// Process results in pairs (type, ttl)
296
for (let i = 0; i < results.length; i += 2) {
297
const [typeErr, type] = results[i];
298
const [ttlErr, ttl] = results[i + 1];
299
300
if (!typeErr && !ttlErr) {
301
console.log(`Key type: ${type}, TTL: ${ttl}`);
302
}
303
}
304
305
// Reset for next batch
306
this.pipeline = this.redis.pipeline();
307
this.batchCount = 0;
308
}
309
}
310
311
// Usage
312
const processor = new StreamProcessor(redis, 200);
313
await processor.processKeysWithPattern("session:*");
314
```
315
316
### Stream Data Migration
317
318
Use streams for efficient data migration between Redis instances or data transformation.
319
320
```typescript
321
class DataMigrator {
322
private sourceRedis: Redis;
323
private targetRedis: Redis;
324
325
constructor(sourceOptions: any, targetOptions: any) {
326
this.sourceRedis = new Redis(sourceOptions);
327
this.targetRedis = new Redis(targetOptions);
328
}
329
330
async migrateHashData(sourceKey: string, targetKey: string): Promise<void> {
331
const stream = this.sourceRedis.hscanStream(sourceKey, { count: 100 });
332
const pipeline = this.targetRedis.pipeline();
333
let batchCount = 0;
334
335
stream.on("data", async (fields: string[]) => {
336
// Process field-value pairs
337
for (let i = 0; i < fields.length; i += 2) {
338
const field = fields[i];
339
const value = fields[i + 1];
340
341
// Transform data if needed
342
const transformedValue = this.transformValue(value);
343
pipeline.hset(targetKey, field, transformedValue);
344
batchCount++;
345
346
// Execute in batches
347
if (batchCount >= 50) {
348
await pipeline.exec();
349
pipeline = this.targetRedis.pipeline();
350
batchCount = 0;
351
}
352
}
353
});
354
355
stream.on("end", async () => {
356
// Execute remaining operations
357
if (batchCount > 0) {
358
await pipeline.exec();
359
}
360
console.log(`Migration completed: ${sourceKey} -> ${targetKey}`);
361
});
362
363
stream.on("error", (err) => {
364
console.error("Migration error:", err);
365
});
366
}
367
368
private transformValue(value: string): string {
369
// Example transformation: convert to uppercase
370
return value.toUpperCase();
371
}
372
373
async close(): Promise<void> {
374
await this.sourceRedis.quit();
375
await this.targetRedis.quit();
376
}
377
}
378
379
// Usage
380
const migrator = new DataMigrator(
381
{ host: "source.redis.com" },
382
{ host: "target.redis.com" }
383
);
384
385
await migrator.migrateHashData("old_user_data", "new_user_data");
386
await migrator.close();
387
```
388
389
### Memory-Efficient Analytics
390
391
Process large datasets for analytics without loading everything into memory.
392
393
```typescript
394
class RedisAnalytics {
395
private redis: Redis;
396
397
constructor(redis: Redis) {
398
this.redis = redis;
399
}
400
401
async analyzeKeyPatterns(): Promise<{[type: string]: number}> {
402
const typeStats: {[type: string]: number} = {};
403
const stream = this.redis.scanStream({ count: 100 });
404
405
return new Promise((resolve, reject) => {
406
const pipeline = this.redis.pipeline();
407
let batchCount = 0;
408
409
stream.on("data", async (keys: string[]) => {
410
// Add type commands to pipeline
411
keys.forEach(key => {
412
pipeline.type(key);
413
batchCount++;
414
});
415
416
// Process batch
417
if (batchCount >= 100) {
418
const results = await pipeline.exec();
419
this.processBatchResults(results, typeStats);
420
421
// Reset batch
422
this.redis.pipeline();
423
batchCount = 0;
424
}
425
});
426
427
stream.on("end", async () => {
428
// Process remaining keys
429
if (batchCount > 0) {
430
const results = await pipeline.exec();
431
this.processBatchResults(results, typeStats);
432
}
433
434
resolve(typeStats);
435
});
436
437
stream.on("error", reject);
438
});
439
}
440
441
private processBatchResults(results: any[], typeStats: {[type: string]: number}): void {
442
results.forEach(([err, type]) => {
443
if (!err && type) {
444
typeStats[type] = (typeStats[type] || 0) + 1;
445
}
446
});
447
}
448
449
async analyzeSetDistribution(setKey: string): Promise<{total: number, uniquePatterns: number}> {
450
const stream = this.redis.sscanStream(setKey, { count: 200 });
451
const patterns = new Set<string>();
452
let totalMembers = 0;
453
454
return new Promise((resolve, reject) => {
455
stream.on("data", (members: string[]) => {
456
members.forEach(member => {
457
totalMembers++;
458
// Extract pattern (first 3 characters)
459
const pattern = member.substring(0, 3);
460
patterns.add(pattern);
461
});
462
});
463
464
stream.on("end", () => {
465
resolve({
466
total: totalMembers,
467
uniquePatterns: patterns.size
468
});
469
});
470
471
stream.on("error", reject);
472
});
473
}
474
}
475
476
// Usage
477
const analytics = new RedisAnalytics(redis);
478
479
// Analyze key type distribution
480
const keyStats = await analytics.analyzeKeyPatterns();
481
console.log("Key type distribution:", keyStats);
482
483
// Analyze set member patterns
484
const setStats = await analytics.analyzeSetDistribution("large_member_set");
485
console.log("Set analysis:", setStats);
486
```
487
488
### Error Handling and Resilience
489
490
Implement robust error handling for streaming operations.
491
492
```typescript
493
class ResilientStreamer {
494
private redis: Redis;
495
private maxRetries: number;
496
497
constructor(redis: Redis, maxRetries = 3) {
498
this.redis = redis;
499
this.maxRetries = maxRetries;
500
}
501
502
async processWithRetry(pattern: string, processor: (keys: string[]) => Promise<void>): Promise<void> {
503
let retries = 0;
504
505
while (retries < this.maxRetries) {
506
try {
507
await this.processStream(pattern, processor);
508
return; // Success
509
} catch (error) {
510
retries++;
511
console.error(`Stream processing failed (attempt ${retries}):`, error);
512
513
if (retries >= this.maxRetries) {
514
throw new Error(`Failed after ${this.maxRetries} retries: ${error.message}`);
515
}
516
517
// Wait before retry
518
await new Promise(resolve => setTimeout(resolve, 1000 * retries));
519
}
520
}
521
}
522
523
private processStream(pattern: string, processor: (keys: string[]) => Promise<void>): Promise<void> {
524
return new Promise((resolve, reject) => {
525
const stream = this.redis.scanStream({
526
match: pattern,
527
count: 50
528
});
529
530
stream.on("data", async (keys: string[]) => {
531
try {
532
await processor(keys);
533
} catch (error) {
534
stream.close();
535
reject(error);
536
}
537
});
538
539
stream.on("end", resolve);
540
stream.on("error", reject);
541
542
// Timeout protection
543
setTimeout(() => {
544
stream.close();
545
reject(new Error("Stream processing timeout"));
546
}, 30000); // 30 second timeout
547
});
548
}
549
}
550
551
// Usage
552
const resilientStreamer = new ResilientStreamer(redis);
553
554
await resilientStreamer.processWithRetry("user:*", async (keys) => {
555
// Processing logic that might fail
556
console.log(`Processing ${keys.length} keys`);
557
558
if (Math.random() < 0.1) {
559
throw new Error("Simulated processing error");
560
}
561
562
// Actual processing
563
for (const key of keys) {
564
await processUserKey(key);
565
}
566
});
567
568
async function processUserKey(key: string): Promise<void> {
569
// User key processing logic
570
}
571
```
572
573
## Types
574
575
```typescript { .api }
576
interface ScanStreamOptions {
577
match?: string; // Pattern to match (default: "*")
578
count?: number; // Batch size hint (default: 10)
579
type?: string; // Key type filter
580
}
581
582
type StreamDataHandler = (data: string[]) => void;
583
type StreamEndHandler = () => void;
584
type StreamErrorHandler = (error: Error) => void;
585
```