or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.mdpromise-api.mdstream-classes.mdstream-operators.mdutility-functions.md

stream-operators.mddocs/

0

# Stream Operators

1

2

Functional programming methods available on Readable streams for data transformation and processing. These operators provide a chainable API for stream manipulation and are inspired by functional programming patterns.

3

4

## Capabilities

5

6

### Stream-Returning Operators

7

8

Operators that return new stream instances, allowing for chainable transformations.

9

10

#### map

11

12

Transforms each chunk using the provided function, with full type safety.

13

14

```javascript { .api }

15

/**

16

* Transform each chunk using a mapping function

17

* @param fn - Function to transform each chunk

18

* @param options - Optional configuration

19

* @returns New readable stream with transformed data

20

*/

21

map(fn: (chunk: any, options?: any) => any, options?: any): Readable;

22

```

23

24

**Usage Examples:**

25

26

```javascript

27

const { Readable } = require('readable-stream');

28

29

// Create a source stream

30

const source = Readable.from([1, 2, 3, 4, 5]);

31

32

// Transform each number by doubling it

33

const doubled = source.map((num) => num * 2);

34

35

doubled.on('data', (chunk) => {

36

console.log('Doubled:', chunk); // 2, 4, 6, 8, 10

37

});

38

39

// String transformation example

40

const words = Readable.from(['hello', 'world', 'stream']);

41

const uppercase = words.map((word) => word.toUpperCase());

42

43

uppercase.on('data', (chunk) => {

44

console.log('Uppercase:', chunk); // "HELLO", "WORLD", "STREAM"

45

});

46

```

47

48

#### filter

49

50

Filters chunks based on a predicate function.

51

52

```javascript { .api }

53

/**

54

* Filter chunks based on a predicate

55

* @param fn - Predicate function that returns true to keep the chunk

56

* @param options - Optional configuration

57

* @returns New readable stream with filtered data

58

*/

59

filter(fn: (chunk: any, options?: any) => boolean, options?: any): Readable;

60

```

61

62

**Usage Examples:**

63

64

```javascript

65

const { Readable } = require('readable-stream');

66

67

// Filter even numbers

68

const numbers = Readable.from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

69

const evenNumbers = numbers.filter((num) => num % 2 === 0);

70

71

evenNumbers.on('data', (chunk) => {

72

console.log('Even:', chunk); // 2, 4, 6, 8, 10

73

});

74

75

// Filter strings by length

76

const words = Readable.from(['cat', 'elephant', 'dog', 'hippopotamus']);

77

const longWords = words.filter((word) => word.length > 5);

78

79

longWords.on('data', (chunk) => {

80

console.log('Long word:', chunk); // "elephant", "hippopotamus"

81

});

82

```

83

84

#### drop

85

86

Skips the first N chunks from the stream.

87

88

```javascript { .api }

89

/**

90

* Skip the first N chunks

91

* @param number - Number of chunks to skip

92

* @param options - Optional configuration

93

* @returns New readable stream with first N chunks dropped

94

*/

95

drop(number: number, options?: any): Readable;

96

```

97

98

**Usage Examples:**

99

100

```javascript

101

const { Readable } = require('readable-stream');

102

103

// Skip first 3 numbers

104

const numbers = Readable.from([1, 2, 3, 4, 5, 6, 7]);

105

const afterDrop = numbers.drop(3);

106

107

afterDrop.on('data', (chunk) => {

108

console.log('After drop:', chunk); // 4, 5, 6, 7

109

});

110

```

111

112

#### take

113

114

Takes only the first N chunks from the stream.

115

116

```javascript { .api }

117

/**

118

* Take only the first N chunks

119

* @param number - Number of chunks to take

120

* @param options - Optional configuration

121

* @returns New readable stream with only first N chunks

122

*/

123

take(number: number, options?: any): Readable;

124

```

125

126

**Usage Examples:**

127

128

```javascript

129

const { Readable } = require('readable-stream');

130

131

// Take first 3 numbers

132

const numbers = Readable.from([1, 2, 3, 4, 5, 6, 7]);

133

const firstThree = numbers.take(3);

134

135

firstThree.on('data', (chunk) => {

136

console.log('First three:', chunk); // 1, 2, 3

137

});

138

```

139

140

#### asIndexedPairs (Deprecated)

141

142

**⚠️ DEPRECATED:** This operator will be removed in a future version.

143

144

Creates index-value pairs for each chunk in the stream.

145

146

```javascript { .api }

147

/**

148

* @deprecated This operator will be removed in a future version

149

* Map each chunk to [index, value] pairs

150

* @param options - Optional configuration

151

* @returns New readable stream with indexed pairs

152

*/

153

asIndexedPairs(options?: any): Readable;

154

```

155

156

#### flatMap

157

158

Maps each chunk to an iterable and flattens the results.

159

160

```javascript { .api }

161

/**

162

* Map each chunk to an iterable and flatten the results

163

* @param fn - Function that returns an iterable for each chunk

164

* @param options - Optional configuration

165

* @returns New readable stream with flattened results

166

*/

167

flatMap(fn: (chunk: any, options?: any) => Iterable<any>, options?: any): Readable;

168

```

169

170

**Usage Examples:**

171

172

```javascript

173

const { Readable } = require('readable-stream');

174

175

// Split each string into characters

176

const words = Readable.from(['hello', 'world']);

177

const characters = words.flatMap((word) => word.split(''));

178

179

characters.on('data', (chunk) => {

180

console.log('Character:', chunk); // 'h', 'e', 'l', 'l', 'o', 'w', 'o', 'r', 'l', 'd'

181

});

182

183

// Expand numbers into ranges

184

const ranges = Readable.from([2, 3]);

185

const expanded = ranges.flatMap((n) => Array.from({length: n}, (_, i) => i));

186

187

expanded.on('data', (chunk) => {

188

console.log('Expanded:', chunk); // 0, 1, 0, 1, 2

189

});

190

```

191

192

#### compose

193

194

Compose the readable stream with another transform stream.

195

196

```javascript { .api }

197

/**

198

* Compose this readable with a transform stream

199

* @param stream - Transform stream to compose with

200

* @param options - Optional configuration

201

* @returns New readable stream representing the composition

202

*/

203

compose(stream: NodeJS.ReadWriteStream, options?: any): Readable;

204

```

205

206

**Usage Examples:**

207

208

```javascript

209

const { Readable, Transform } = require('readable-stream');

210

211

// Create a transform stream

212

const upperCaseTransform = new Transform({

213

transform(chunk, encoding, callback) {

214

this.push(chunk.toString().toUpperCase());

215

callback();

216

}

217

});

218

219

// Compose with the transform

220

const source = Readable.from(['hello', 'world']);

221

const composed = source.compose(upperCaseTransform);

222

223

composed.on('data', (chunk) => {

224

console.log('Composed result:', chunk.toString()); // "HELLO", "WORLD"

225

});

226

```

227

228

### Promise-Returning Operators

229

230

Operators that return promises, useful for consuming stream data or performing reductions.

231

232

#### reduce

233

234

Reduces the stream to a single value using an accumulator function.

235

236

```javascript { .api }

237

/**

238

* Reduce the stream to a single value

239

* @param fn - Reducer function

240

* @param initial - Initial accumulator value

241

* @param options - Optional configuration

242

* @returns Promise that resolves to the final accumulated value

243

*/

244

reduce(fn: (previous: any, current: any, options?: any) => any, initial?: any, options?: any): Promise<any>;

245

```

246

247

**Usage Examples:**

248

249

```javascript

250

const { Readable } = require('readable-stream');

251

252

// Sum all numbers

253

const numbers = Readable.from([1, 2, 3, 4, 5]);

254

const sum = await numbers.reduce((acc, num) => acc + num, 0);

255

console.log('Sum:', sum); // 15

256

257

// Concatenate strings

258

const words = Readable.from(['hello', ' ', 'world']);

259

const sentence = await words.reduce((acc, word) => acc + word, '');

260

console.log('Sentence:', sentence); // "hello world"

261

262

// Find maximum value

263

const values = Readable.from([3, 7, 2, 9, 1]);

264

const max = await values.reduce((acc, val) => Math.max(acc, val), -Infinity);

265

console.log('Max:', max); // 9

266

```

267

268

#### toArray

269

270

Collects all chunks from the stream into an array.

271

272

```javascript { .api }

273

/**

274

* Collect all chunks into an array

275

* @param options - Optional configuration

276

* @returns Promise that resolves to an array of all chunks

277

*/

278

toArray(options?: any): Promise<any[]>;

279

```

280

281

**Usage Examples:**

282

283

```javascript

284

const { Readable } = require('readable-stream');

285

286

// Collect all numbers

287

const numbers = Readable.from([1, 2, 3, 4, 5]);

288

const array = await numbers.toArray();

289

console.log('Array:', array); // [1, 2, 3, 4, 5]

290

291

// Collect transformed data

292

const doubled = Readable.from([1, 2, 3]).map(x => x * 2);

293

const result = await doubled.toArray();

294

console.log('Doubled array:', result); // [2, 4, 6]

295

```

296

297

#### forEach

298

299

Executes a function for each chunk in the stream.

300

301

```javascript { .api }

302

/**

303

* Execute a function for each chunk

304

* @param fn - Function to execute for each chunk

305

* @param options - Optional configuration

306

* @returns Promise that resolves when all chunks have been processed

307

*/

308

forEach(fn: (chunk: any, options?: any) => void, options?: any): Promise<void>;

309

```

310

311

**Usage Examples:**

312

313

```javascript

314

const { Readable } = require('readable-stream');

315

316

// Log each chunk

317

const numbers = Readable.from([1, 2, 3, 4, 5]);

318

await numbers.forEach((num) => {

319

console.log('Processing:', num);

320

});

321

console.log('All numbers processed');

322

323

// Perform side effects

324

const data = Readable.from(['file1.txt', 'file2.txt']);

325

await data.forEach((filename) => {

326

console.log('Would process file:', filename);

327

// In real code, you might read/process the file here

328

});

329

```

330

331

#### every

332

333

Tests whether all chunks in the stream pass a predicate test.

334

335

```javascript { .api }

336

/**

337

* Test whether all chunks pass a predicate

338

* @param fn - Predicate function to test each chunk

339

* @param options - Optional configuration

340

* @returns Promise that resolves to true if all chunks pass the test

341

*/

342

every(fn: (chunk: any, options?: any) => boolean, options?: any): Promise<boolean>;

343

```

344

345

**Usage Examples:**

346

347

```javascript

348

const { Readable } = require('readable-stream');

349

350

// Check if all numbers are positive

351

const positiveNumbers = Readable.from([1, 2, 3, 4, 5]);

352

const allPositive = await positiveNumbers.every(num => num > 0);

353

console.log('All positive:', allPositive); // true

354

355

// Check if all strings are long enough

356

const words = Readable.from(['hello', 'world', 'stream']);

357

const allLongEnough = await words.every(word => word.length >= 5);

358

console.log('All long enough:', allLongEnough); // true

359

```

360

361

#### some

362

363

Tests whether at least one chunk in the stream passes a predicate test.

364

365

```javascript { .api }

366

/**

367

* Test whether at least one chunk passes a predicate

368

* @param fn - Predicate function to test each chunk

369

* @param options - Optional configuration

370

* @returns Promise that resolves to true if any chunk passes the test

371

*/

372

some(fn: (chunk: any, options?: any) => boolean, options?: any): Promise<boolean>;

373

```

374

375

**Usage Examples:**

376

377

```javascript

378

const { Readable } = require('readable-stream');

379

380

// Check if any number is even

381

const numbers = Readable.from([1, 3, 5, 7, 8]);

382

const hasEven = await numbers.some(num => num % 2 === 0);

383

console.log('Has even number:', hasEven); // true

384

385

// Check if any string contains 'a'

386

const words = Readable.from(['hello', 'world', 'stream']);

387

const hasA = await words.some(word => word.includes('a'));

388

console.log('Has word with "a":', hasA); // true (stream)

389

```

390

391

#### find

392

393

Finds the first chunk that passes a predicate test.

394

395

```javascript { .api }

396

/**

397

* Find the first chunk that passes a predicate

398

* @param fn - Predicate function to test each chunk

399

* @param options - Optional configuration

400

* @returns Promise that resolves to the first matching chunk or undefined

401

*/

402

find(fn: (chunk: any, options?: any) => boolean, options?: any): Promise<any>;

403

```

404

405

**Usage Examples:**

406

407

```javascript

408

const { Readable } = require('readable-stream');

409

410

// Find first even number

411

const numbers = Readable.from([1, 3, 4, 6, 7]);

412

const firstEven = await numbers.find(num => num % 2 === 0);

413

console.log('First even:', firstEven); // 4

414

415

// Find first long word

416

const words = Readable.from(['cat', 'elephant', 'dog']);

417

const firstLongWord = await words.find(word => word.length > 5);

418

console.log('First long word:', firstLongWord); // "elephant"

419

```

420

421

## Chaining Operators

422

423

Stream operators can be chained together to create complex data processing pipelines:

424

425

```javascript

426

const { Readable } = require('readable-stream');

427

428

// Complex processing pipeline

429

const numbers = Readable.from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

430

431

const result = await numbers

432

.filter(n => n % 2 === 0) // Keep only even numbers: [2, 4, 6, 8, 10]

433

.map(n => n * n) // Square them: [4, 16, 36, 64, 100]

434

.drop(1) // Skip first: [16, 36, 64, 100]

435

.take(3) // Take first 3: [16, 36, 64]

436

.reduce((sum, n) => sum + n, 0); // Sum them up

437

438

console.log('Result:', result); // 116

439

```

440

441

## Error Handling

442

443

Stream operators properly propagate errors through the chain:

444

445

```javascript

446

const { Readable } = require('readable-stream');

447

448

const source = Readable.from([1, 2, 3, 4, 5]);

449

450

try {

451

const result = await source

452

.map(n => {

453

if (n === 3) throw new Error('Processing error');

454

return n * 2;

455

})

456

.toArray();

457

} catch (error) {

458

console.error('Pipeline error:', error.message); // "Processing error"

459

}

460

```

461

462

## Types

463

464

```javascript { .api }

465

// All operators accept an optional options parameter

466

interface OperatorOptions {

467

signal?: AbortSignal;

468

highWaterMark?: number;

469

[key: string]: any;

470

}

471

```