# Streaming

ioredis provides readable streams for efficiently scanning large datasets using Redis SCAN operations. These streams allow memory-efficient iteration over keys, set members, hash fields, and sorted set members without loading entire datasets into memory.

## Capabilities

### Key Scanning Streams

Scan Redis keys using patterns with a memory-efficient streaming interface.

```typescript { .api }
// Key scanning streams
scanStream(options?: ScanStreamOptions): ScanStream;
scanBufferStream(options?: ScanStreamOptions): ScanStream;

interface ScanStreamOptions {
  match?: string; // Pattern to match keys (default: "*")
  count?: number; // Hint for batch size (default: 10)
  type?: string; // Filter by key type
}

class ScanStream extends Readable {
  close(): void; // Stop scanning and end stream
}
```

**Usage Examples:**

```typescript
import Redis from "ioredis";

const redis = new Redis();

// Basic key scanning
const stream = redis.scanStream();

stream.on("data", (keys: string[]) => {
  console.log("Found keys:", keys);
});

stream.on("end", () => {
  console.log("Scan completed");
});

// Pattern-based scanning
const userStream = redis.scanStream({
  match: "user:*",
  count: 100 // Process ~100 keys per batch
});

userStream.on("data", (keys: string[]) => {
  keys.forEach(key => {
    console.log("User key:", key);
  });
});

// Type-based scanning (Redis 6.0+)
const hashStream = redis.scanStream({
  match: "*",
  type: "hash",
  count: 50
});

hashStream.on("data", (hashKeys: string[]) => {
  console.log("Hash keys:", hashKeys);
});

// Manual stream control
const controlledStream = redis.scanStream({ match: "temp:*" });
let scannedKeys = 0;

controlledStream.on("data", (keys: string[]) => {
  scannedKeys += keys.length;
  if (scannedKeys > 1000) {
    console.log("Too many keys, stopping scan");
    controlledStream.close();
  }
});
```
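
`ScanStream` extends Node's `Readable`, so on Node.js versions where readable streams are async iterables, the same scan can be consumed with `for await...of` instead of event handlers. A minimal sketch, assuming such a runtime; the `handleKey` helper and the deduplication step are illustrative additions (SCAN may return a key more than once):

```typescript
// Sketch: consuming a scan stream with async iteration instead of events.
// Reuses the `redis` connection from the example above; handleKey is hypothetical.
async function scanAllKeys(pattern: string): Promise<void> {
  const stream = redis.scanStream({ match: pattern, count: 100 });
  const seen = new Set<string>(); // SCAN may return the same key more than once

  for await (const keys of stream) {
    for (const key of keys as string[]) {
      if (seen.has(key)) continue;
      seen.add(key);
      await handleKey(key); // hypothetical per-key processing
    }
  }
}

async function handleKey(key: string): Promise<void> {
  // Application-specific processing goes here
}
```

Because batches are pulled one at a time, back-pressure is handled without explicit `pause()`/`resume()` calls.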

### Set Scanning Streams

Scan set members efficiently using SSCAN-based streams.

```typescript { .api }
// Set member scanning
sscanStream(key: string, options?: ScanStreamOptions): ScanStream;
sscanBufferStream(key: string, options?: ScanStreamOptions): ScanStream;
```

**Usage Examples:**

```typescript
// Scan large set members
const setStream = redis.sscanStream("large_set", {
  match: "*pattern*",
  count: 200
});

setStream.on("data", (members: string[]) => {
  console.log(`Processing ${members.length} set members:`, members);

  // Process members in batches
  members.forEach(member => {
    processSetMember(member);
  });
});

setStream.on("end", () => {
  console.log("Set scan completed");
});

// Error handling
setStream.on("error", (err) => {
  console.error("Set scan error:", err);
});

async function processSetMember(member: string) {
  // Example: validate and process each member
  if (member.startsWith("valid_")) {
    await redis.sadd("processed_set", member);
  }
}
```
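
The `sscanBufferStream` variant listed in the API block emits members as `Buffer`s rather than strings, which is useful for binary-safe values. A minimal sketch, reusing the `redis` connection from the earlier examples; the `binary_set` key name is illustrative:

```typescript
// Sketch: scanning set members as Buffers (binary-safe values).
const bufferStream = redis.sscanBufferStream("binary_set", { count: 100 });

bufferStream.on("data", (members: Buffer[]) => {
  members.forEach(member => {
    // Inspect or decode the raw bytes as needed
    console.log("Member bytes:", member.length, member.toString("hex"));
  });
});

bufferStream.on("end", () => {
  console.log("Buffer scan completed");
});
```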

### Hash Scanning Streams

Scan hash fields and values using HSCAN-based streams.

```typescript { .api }
// Hash field scanning
hscanStream(key: string, options?: ScanStreamOptions): ScanStream;
hscanBufferStream(key: string, options?: ScanStreamOptions): ScanStream;
```

**Usage Examples:**

```typescript
// Scan hash fields and values
const hashStream = redis.hscanStream("user_profiles", {
  match: "email*", // Only email-related fields
  count: 50
});

hashStream.on("data", (fields: string[]) => {
  // Fields come in pairs: [field1, value1, field2, value2, ...]
  for (let i = 0; i < fields.length; i += 2) {
    const field = fields[i];
    const value = fields[i + 1];
    console.log(`Field: ${field}, Value: ${value}`);

    // Process field-value pairs
    if (field.startsWith("email") && value.includes("@")) {
      validateEmail(value);
    }
  }
});

// Batch processing hash data
const configStream = redis.hscanStream("app_config");
const configBatch = new Map<string, string>();

configStream.on("data", (fields: string[]) => {
  for (let i = 0; i < fields.length; i += 2) {
    configBatch.set(fields[i], fields[i + 1]);
  }

  // Process the batch when it reaches a certain size
  if (configBatch.size >= 100) {
    processConfigBatch(configBatch);
    configBatch.clear();
  }
});

configStream.on("end", () => {
  // Process remaining items
  if (configBatch.size > 0) {
    processConfigBatch(configBatch);
  }
});

function validateEmail(email: string) {
  // Email validation logic
}

function processConfigBatch(config: Map<string, string>) {
  // Batch configuration processing
  console.log(`Processing ${config.size} config items`);
}
```
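
Because each HSCAN batch arrives as a flat `[field1, value1, field2, value2, ...]` array, a small helper that converts a batch into a plain object can keep calling code tidy. A sketch; the `pairsToObject` name is illustrative:

```typescript
// Sketch: converting a flat [field, value, ...] batch into an object.
function pairsToObject(fields: string[]): Record<string, string> {
  const result: Record<string, string> = {};
  for (let i = 0; i < fields.length; i += 2) {
    result[fields[i]] = fields[i + 1];
  }
  return result;
}

const profileStream = redis.hscanStream("user_profiles", { count: 100 });

profileStream.on("data", (fields: string[]) => {
  const batch = pairsToObject(fields);
  console.log("Fields in this batch:", Object.keys(batch));
});
```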

### Sorted Set Scanning Streams

Scan sorted set members and scores using ZSCAN-based streams.

```typescript { .api }
// Sorted set member scanning
zscanStream(key: string, options?: ScanStreamOptions): ScanStream;
zscanBufferStream(key: string, options?: ScanStreamOptions): ScanStream;
```

**Usage Examples:**

```typescript
// Scan leaderboard entries
const leaderboardStream = redis.zscanStream("game_leaderboard", {
  match: "player:*",
  count: 25
});

leaderboardStream.on("data", (members: string[]) => {
  // Members come in pairs: [member1, score1, member2, score2, ...]
  for (let i = 0; i < members.length; i += 2) {
    const member = members[i];
    const score = parseFloat(members[i + 1]);

    console.log(`Player: ${member}, Score: ${score}`);

    // Process high scores
    if (score > 1000) {
      processHighScore(member, score);
    }
  }
});

// Aggregate statistics while streaming
let totalScore = 0;
let playerCount = 0;
const scoreStream = redis.zscanStream("scores");

scoreStream.on("data", (members: string[]) => {
  for (let i = 0; i < members.length; i += 2) {
    totalScore += parseFloat(members[i + 1]);
    playerCount++;
  }
});

scoreStream.on("end", () => {
  const avgScore = totalScore / playerCount;
  console.log(`Average score: ${avgScore} (${playerCount} players)`);
});

function processHighScore(player: string, score: number) {
  // High score processing logic
  console.log(`High score alert: ${player} scored ${score}`);
}
```
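
ZSCAN batches are likewise flat `[member1, score1, member2, score2, ...]` arrays. A small sketch that maps each batch to typed entries before processing; the `toScoreEntries` helper is illustrative:

```typescript
// Sketch: mapping a flat [member, score, ...] batch into typed entries.
interface ScoreEntry {
  member: string;
  score: number;
}

function toScoreEntries(batch: string[]): ScoreEntry[] {
  const entries: ScoreEntry[] = [];
  for (let i = 0; i < batch.length; i += 2) {
    entries.push({ member: batch[i], score: parseFloat(batch[i + 1]) });
  }
  return entries;
}

const rankStream = redis.zscanStream("game_leaderboard", { count: 100 });

rankStream.on("data", (batch: string[]) => {
  for (const { member, score } of toScoreEntries(batch)) {
    console.log(`${member}: ${score}`);
  }
});
```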

## Advanced Streaming Patterns

### Stream Pipeline Processing

Combine streaming with pipeline operations for efficient batch processing.

```typescript
class StreamProcessor {
  private redis: Redis;
  private pipeline: any;
  private batchSize: number;
  private batchCount = 0;

  constructor(redis: Redis, batchSize = 100) {
    this.redis = redis;
    this.batchSize = batchSize;
    this.pipeline = redis.pipeline();
  }

  processKeysWithPattern(pattern: string): Promise<void> {
    const stream = this.redis.scanStream({
      match: pattern,
      count: 50
    });

    return new Promise((resolve, reject) => {
      stream.on("data", async (keys: string[]) => {
        // Pause the stream while this batch is queued and executed
        stream.pause();

        for (const key of keys) {
          // Add operations to the pipeline
          this.pipeline.type(key);
          this.pipeline.ttl(key);
          this.batchCount += 2;

          // Execute the batch when the limit is reached
          if (this.batchCount >= this.batchSize) {
            await this.executeBatch();
          }
        }

        stream.resume();
      });

      stream.on("end", async () => {
        // Execute remaining operations
        if (this.batchCount > 0) {
          await this.executeBatch();
        }
        console.log("Stream processing completed");
        resolve();
      });

      stream.on("error", reject);
    });
  }

  private async executeBatch(): Promise<void> {
    const results = await this.pipeline.exec();

    // Process results in pairs (type, ttl)
    for (let i = 0; i < results.length; i += 2) {
      const [typeErr, type] = results[i];
      const [ttlErr, ttl] = results[i + 1];

      if (!typeErr && !ttlErr) {
        console.log(`Key type: ${type}, TTL: ${ttl}`);
      }
    }

    // Reset for the next batch
    this.pipeline = this.redis.pipeline();
    this.batchCount = 0;
  }
}

// Usage
const processor = new StreamProcessor(redis, 200);
await processor.processKeysWithPattern("session:*");
```
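
Note that the `data` handler above awaits pipeline execution, so the stream is paused before the batch runs and resumed afterwards; otherwise additional `data` events could arrive while a previous batch is still executing.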

### Stream Data Migration

Use streams for efficient data migration between Redis instances, or for on-the-fly data transformation.

```typescript
class DataMigrator {
  private sourceRedis: Redis;
  private targetRedis: Redis;

  constructor(sourceOptions: any, targetOptions: any) {
    this.sourceRedis = new Redis(sourceOptions);
    this.targetRedis = new Redis(targetOptions);
  }

  migrateHashData(sourceKey: string, targetKey: string): Promise<void> {
    const stream = this.sourceRedis.hscanStream(sourceKey, { count: 100 });
    let pipeline = this.targetRedis.pipeline();
    let batchCount = 0;

    return new Promise((resolve, reject) => {
      stream.on("data", async (fields: string[]) => {
        // Pause while this batch is written to the target instance
        stream.pause();

        // Process field-value pairs
        for (let i = 0; i < fields.length; i += 2) {
          const field = fields[i];
          const value = fields[i + 1];

          // Transform data if needed
          const transformedValue = this.transformValue(value);
          pipeline.hset(targetKey, field, transformedValue);
          batchCount++;

          // Execute in batches
          if (batchCount >= 50) {
            await pipeline.exec();
            pipeline = this.targetRedis.pipeline();
            batchCount = 0;
          }
        }

        stream.resume();
      });

      stream.on("end", async () => {
        // Execute remaining operations
        if (batchCount > 0) {
          await pipeline.exec();
        }
        console.log(`Migration completed: ${sourceKey} -> ${targetKey}`);
        resolve();
      });

      stream.on("error", (err) => {
        console.error("Migration error:", err);
        reject(err);
      });
    });
  }

  private transformValue(value: string): string {
    // Example transformation: convert to uppercase
    return value.toUpperCase();
  }

  async close(): Promise<void> {
    await this.sourceRedis.quit();
    await this.targetRedis.quit();
  }
}

// Usage
const migrator = new DataMigrator(
  { host: "source.redis.com" },
  { host: "target.redis.com" }
);

await migrator.migrateHashData("old_user_data", "new_user_data");
await migrator.close();
```
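
To migrate many hashes at once, the same class can be driven by a key scan on the source instance. A sketch under stated assumptions: the `user_data:*` prefix is illustrative, a separate source connection is passed in alongside the migrator, and the `type` filter requires Redis 6.0+:

```typescript
// Sketch: migrate every hash matching a key prefix using the DataMigrator above.
async function migrateAllHashes(migrator: DataMigrator, source: Redis): Promise<void> {
  const stream = source.scanStream({ match: "user_data:*", type: "hash", count: 100 });

  stream.on("data", async (keys: string[]) => {
    stream.pause(); // avoid overlapping batches while hashes are being copied
    for (const key of keys) {
      await migrator.migrateHashData(key, key); // keep the same key on the target
    }
    stream.resume();
  });

  await new Promise<void>((resolve, reject) => {
    stream.on("end", resolve);
    stream.on("error", reject);
  });
}
```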

### Memory-Efficient Analytics

Process large datasets for analytics without loading everything into memory.

```typescript
class RedisAnalytics {
  private redis: Redis;

  constructor(redis: Redis) {
    this.redis = redis;
  }

  async analyzeKeyPatterns(): Promise<{ [type: string]: number }> {
    const typeStats: { [type: string]: number } = {};
    const stream = this.redis.scanStream({ count: 100 });

    return new Promise((resolve, reject) => {
      let pipeline = this.redis.pipeline();
      let batchCount = 0;

      stream.on("data", async (keys: string[]) => {
        stream.pause();

        // Add TYPE commands to the pipeline
        keys.forEach(key => {
          pipeline.type(key);
          batchCount++;
        });

        // Process the batch once it is large enough
        if (batchCount >= 100) {
          const results = await pipeline.exec();
          this.processBatchResults(results, typeStats);

          // Reset the batch
          pipeline = this.redis.pipeline();
          batchCount = 0;
        }

        stream.resume();
      });

      stream.on("end", async () => {
        // Process remaining keys
        if (batchCount > 0) {
          const results = await pipeline.exec();
          this.processBatchResults(results, typeStats);
        }

        resolve(typeStats);
      });

      stream.on("error", reject);
    });
  }

  private processBatchResults(results: any[], typeStats: { [type: string]: number }): void {
    results.forEach(([err, type]) => {
      if (!err && type) {
        typeStats[type] = (typeStats[type] || 0) + 1;
      }
    });
  }

  async analyzeSetDistribution(setKey: string): Promise<{ total: number; uniquePatterns: number }> {
    const stream = this.redis.sscanStream(setKey, { count: 200 });
    const patterns = new Set<string>();
    let totalMembers = 0;

    return new Promise((resolve, reject) => {
      stream.on("data", (members: string[]) => {
        members.forEach(member => {
          totalMembers++;
          // Extract a coarse pattern (first 3 characters)
          const pattern = member.substring(0, 3);
          patterns.add(pattern);
        });
      });

      stream.on("end", () => {
        resolve({
          total: totalMembers,
          uniquePatterns: patterns.size
        });
      });

      stream.on("error", reject);
    });
  }
}

// Usage
const analytics = new RedisAnalytics(redis);

// Analyze key type distribution
const keyStats = await analytics.analyzeKeyPatterns();
console.log("Key type distribution:", keyStats);

// Analyze set member patterns
const setStats = await analytics.analyzeSetDistribution("large_member_set");
console.log("Set analysis:", setStats);
```
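
Keep in mind that SCAN-based analytics on a live dataset are approximate: SCAN guarantees that elements present for the entire scan are returned at least once, but elements may be returned multiple times, and keys added or removed during the scan may or may not be counted. Treat the resulting statistics as estimates rather than exact figures.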

### Error Handling and Resilience

Implement robust error handling for streaming operations.

```typescript
class ResilientStreamer {
  private redis: Redis;
  private maxRetries: number;

  constructor(redis: Redis, maxRetries = 3) {
    this.redis = redis;
    this.maxRetries = maxRetries;
  }

  async processWithRetry(pattern: string, processor: (keys: string[]) => Promise<void>): Promise<void> {
    let retries = 0;

    while (retries < this.maxRetries) {
      try {
        await this.processStream(pattern, processor);
        return; // Success
      } catch (error: any) {
        retries++;
        console.error(`Stream processing failed (attempt ${retries}):`, error);

        if (retries >= this.maxRetries) {
          throw new Error(`Failed after ${this.maxRetries} retries: ${error.message}`);
        }

        // Wait before retrying (delay grows with each attempt)
        await new Promise(resolve => setTimeout(resolve, 1000 * retries));
      }
    }
  }

  private processStream(pattern: string, processor: (keys: string[]) => Promise<void>): Promise<void> {
    return new Promise((resolve, reject) => {
      const stream = this.redis.scanStream({
        match: pattern,
        count: 50
      });

      // Timeout protection
      const timeout = setTimeout(() => {
        stream.close();
        reject(new Error("Stream processing timeout"));
      }, 30000); // 30 second timeout

      stream.on("data", async (keys: string[]) => {
        stream.pause();
        try {
          await processor(keys);
          stream.resume();
        } catch (error) {
          clearTimeout(timeout);
          stream.close();
          reject(error);
        }
      });

      stream.on("end", () => {
        clearTimeout(timeout);
        resolve();
      });

      stream.on("error", (err) => {
        clearTimeout(timeout);
        reject(err);
      });
    });
  }
}

// Usage
const resilientStreamer = new ResilientStreamer(redis);

await resilientStreamer.processWithRetry("user:*", async (keys) => {
  // Processing logic that might fail
  console.log(`Processing ${keys.length} keys`);

  if (Math.random() < 0.1) {
    throw new Error("Simulated processing error");
  }

  // Actual processing
  for (const key of keys) {
    await processUserKey(key);
  }
});

async function processUserKey(key: string): Promise<void> {
  // User key processing logic
}
```
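
Because a retry restarts the scan from the beginning, the `processor` callback should be idempotent: keys from batches that were already handled before a failure may be processed again on the next attempt.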

## Types

```typescript { .api }
interface ScanStreamOptions {
  match?: string; // Pattern to match (default: "*")
  count?: number; // Batch size hint (default: 10)
  type?: string; // Key type filter
}

type StreamDataHandler = (data: string[]) => void;
type StreamEndHandler = () => void;
type StreamErrorHandler = (error: Error) => void;
```