or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.mdpromise-api.mdstream-classes.mdstream-operators.mdutility-functions.md

utility-functions.mddocs/

0

# Utility Functions

1

2

Essential utilities for stream composition, error handling, and lifecycle management. These functions provide robust patterns for working with multiple streams and are crucial for building reliable streaming applications.

3

4

## Capabilities

5

6

### pipeline

7

8

Pipes between streams forwarding errors and cleaning up properly, providing a robust way to compose multiple streams.

9

10

```javascript { .api }

11

/**

12

* Pipe between streams, handling errors and cleanup automatically

13

* @param streams - Sequence of streams to pipe together

14

* @param callback - Called when pipeline completes or errors

15

* @returns The last stream in the pipeline

16

*/

17

function pipeline(...streams: Array<NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream>, callback: (err: NodeJS.ErrnoException | null) => void): NodeJS.ReadableStream;

18

19

/**

20

* Promise-based pipeline (available via promises.pipeline)

21

* @param streams - Sequence of streams to pipe together

22

* @returns Promise that resolves when pipeline completes

23

*/

24

function pipeline(...streams: Array<NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream>): Promise<void>;

25

```

26

27

**Usage Examples:**

28

29

```javascript

30

const { pipeline, Readable, Transform } = require('readable-stream');

31

const fs = require('fs');

32

33

// Callback-based pipeline

34

pipeline(

35

fs.createReadStream('input.txt'),

36

new Transform({

37

transform(chunk, encoding, callback) {

38

this.push(chunk.toString().toUpperCase());

39

callback();

40

}

41

}),

42

fs.createWriteStream('output.txt'),

43

(err) => {

44

if (err) {

45

console.error('Pipeline failed:', err);

46

} else {

47

console.log('Pipeline succeeded');

48

}

49

}

50

);

51

52

// Promise-based pipeline

53

const { pipeline: pipelineAsync } = require('readable-stream').promises;

54

55

async function processFile() {

56

try {

57

await pipelineAsync(

58

fs.createReadStream('input.txt'),

59

new Transform({

60

transform(chunk, encoding, callback) {

61

this.push(chunk.toString().toLowerCase());

62

callback();

63

}

64

}),

65

fs.createWriteStream('output.txt')

66

);

67

console.log('Pipeline completed successfully');

68

} catch (error) {

69

console.error('Pipeline failed:', error);

70

}

71

}

72

```

73

74

### finished

75

76

Get notified when a stream is no longer readable, writable, or has experienced an error or premature close.

77

78

```javascript { .api }

79

/**

80

* Get notified when stream is finished

81

* @param stream - Stream to monitor

82

* @param options - Options for what conditions to wait for

83

* @param callback - Called when stream is finished or errors

84

* @returns Cleanup function to remove listeners

85

*/

86

function finished(

87

stream: NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream,

88

options: FinishedOptions,

89

callback: (err?: NodeJS.ErrnoException | null) => void

90

): () => void;

91

92

/**

93

* Simplified version with just callback

94

* @param stream - Stream to monitor

95

* @param callback - Called when stream is finished or errors

96

* @returns Cleanup function to remove listeners

97

*/

98

function finished(

99

stream: NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream,

100

callback: (err?: NodeJS.ErrnoException | null) => void

101

): () => void;

102

103

/**

104

* Promise-based finished (available via promises.finished)

105

* @param stream - Stream to monitor

106

* @param options - Options for what conditions to wait for

107

* @returns Promise that resolves when stream is finished

108

*/

109

function finished(

110

stream: NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream,

111

options?: FinishedOptions

112

): Promise<void>;

113

```

114

115

**Usage Examples:**

116

117

```javascript

118

const { finished, Readable } = require('readable-stream');

119

120

// Monitor a stream with callback

121

const readable = new Readable({

122

read() {

123

this.push('data');

124

this.push(null);

125

}

126

});

127

128

const cleanup = finished(readable, (err) => {

129

if (err) {

130

console.error('Stream error:', err);

131

} else {

132

console.log('Stream finished successfully');

133

}

134

});

135

136

// Promise-based monitoring

137

const { finished: finishedAsync } = require('readable-stream').promises;

138

139

async function monitorStream(stream) {

140

try {

141

await finishedAsync(stream);

142

console.log('Stream completed');

143

} catch (error) {

144

console.error('Stream failed:', error);

145

}

146

}

147

```

148

149

### compose

150

151

Compose multiple transform streams into a single transform stream, useful for creating reusable transformation pipelines.

152

153

```javascript { .api }

154

/**

155

* Compose multiple transform streams into a single transform

156

* @param streams - Transform streams to compose

157

* @returns A single transform stream representing the composition

158

*/

159

function compose(...streams: Array<NodeJS.ReadWriteStream>): NodeJS.ReadWriteStream;

160

```

161

162

**Usage Examples:**

163

164

```javascript

165

const { compose, Transform } = require('readable-stream');

166

167

// Create individual transforms

168

const upperCase = new Transform({

169

transform(chunk, encoding, callback) {

170

this.push(chunk.toString().toUpperCase());

171

callback();

172

}

173

});

174

175

const addPrefix = new Transform({

176

transform(chunk, encoding, callback) {

177

this.push('PREFIX: ' + chunk.toString());

178

callback();

179

}

180

});

181

182

// Compose them into a single transform

183

const composedTransform = compose(upperCase, addPrefix);

184

185

// Use the composed transform

186

composedTransform.write('hello world');

187

composedTransform.end();

188

189

composedTransform.on('data', (chunk) => {

190

console.log('Result:', chunk.toString()); // "PREFIX: HELLO WORLD"

191

});

192

```

193

194

### addAbortSignal

195

196

Add AbortSignal support to a stream, allowing for cancellation of stream operations.

197

198

```javascript { .api }

199

/**

200

* Add abort signal support to a stream

201

* @param signal - AbortSignal to use for cancellation

202

* @param stream - Stream to add abort support to

203

* @returns The stream with abort signal attached

204

*/

205

function addAbortSignal<T extends NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream>(

206

signal: AbortSignal,

207

stream: T

208

): T;

209

```

210

211

**Usage Examples:**

212

213

```javascript

214

const { addAbortSignal, Readable } = require('readable-stream');

215

216

// Create an abort controller

217

const controller = new AbortController();

218

const { signal } = controller;

219

220

// Create a stream with abort support

221

const readable = addAbortSignal(signal, new Readable({

222

read() {

223

// Simulate slow reading

224

setTimeout(() => {

225

this.push('chunk');

226

}, 1000);

227

}

228

}));

229

230

// Abort after 500ms

231

setTimeout(() => {

232

controller.abort();

233

}, 500);

234

235

readable.on('error', (err) => {

236

if (err.name === 'AbortError') {

237

console.log('Stream was aborted');

238

}

239

});

240

```

241

242

### destroy

243

244

Destroy a stream, calling its destroy method if available, or emitting an error.

245

246

```javascript { .api }

247

/**

248

* Destroy a stream

249

* @param stream - Stream to destroy

250

* @param error - Optional error to emit

251

*/

252

function destroy(stream: NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream, error?: Error): void;

253

```

254

255

**Usage Examples:**

256

257

```javascript

258

const { destroy, Readable } = require('readable-stream');

259

260

const readable = new Readable({

261

read() {

262

this.push('data');

263

}

264

});

265

266

// Destroy the stream with an error

267

destroy(readable, new Error('Something went wrong'));

268

269

readable.on('error', (err) => {

270

console.error('Stream destroyed:', err.message);

271

});

272

```

273

274

### Stream State Utilities

275

276

Utility functions for checking stream states.

277

278

```javascript { .api }

279

/**

280

* Check if a stream has been read from or disturbed

281

* @param stream - Stream to check

282

* @returns true if stream has been disturbed

283

*/

284

function isDisturbed(stream: NodeJS.ReadableStream): boolean;

285

286

/**

287

* Check if a stream has errored

288

* @param stream - Stream to check

289

* @returns true if stream has errored

290

*/

291

function isErrored(stream: NodeJS.ReadableStream | NodeJS.WritableStream): boolean;

292

```

293

294

**Usage Examples:**

295

296

```javascript

297

const { isDisturbed, isErrored, Readable } = require('readable-stream');

298

299

const readable = new Readable({

300

read() {

301

this.push('data');

302

this.push(null);

303

}

304

});

305

306

console.log(isDisturbed(readable)); // false

307

308

readable.read(); // Read some data

309

310

console.log(isDisturbed(readable)); // true

311

console.log(isErrored(readable)); // false

312

```

313

314

### High Water Mark Configuration

315

316

Functions for configuring the default high water mark for streams.

317

318

```javascript { .api }

319

/**

320

* Set the default high water mark for streams

321

* @param objectMode - Whether this is for object mode streams

322

* @param value - The high water mark value

323

*/

324

function setDefaultHighWaterMark(objectMode: boolean, value: number): void;

325

326

/**

327

* Get the default high water mark for streams

328

* @param objectMode - Whether this is for object mode streams

329

* @returns The current default high water mark

330

*/

331

function getDefaultHighWaterMark(objectMode: boolean): number;

332

```

333

334

**Usage Examples:**

335

336

```javascript

337

const { setDefaultHighWaterMark, getDefaultHighWaterMark } = require('readable-stream');

338

339

// Get current defaults

340

console.log('Current buffer HWM:', getDefaultHighWaterMark(false)); // 16384

341

console.log('Current object HWM:', getDefaultHighWaterMark(true)); // 16

342

343

// Set new defaults

344

setDefaultHighWaterMark(false, 32768); // Increase buffer default

345

setDefaultHighWaterMark(true, 32); // Increase object default

346

```

347

348

### Internal Utilities

349

350

Internal utility functions exported for compatibility with Node.js streams.

351

352

```javascript { .api }

353

/**

354

* Convert Uint8Array to Buffer (internal utility)

355

* @param chunk - Uint8Array to convert

356

* @returns Buffer representation

357

*/

358

function _uint8ArrayToBuffer(chunk: Uint8Array): Buffer;

359

360

/**

361

* Check if value is Uint8Array (internal utility)

362

* @param value - Value to check

363

* @returns true if value is Uint8Array

364

*/

365

function _isUint8Array(value: any): boolean;

366

```

367

368

**Usage Examples:**

369

370

```javascript

371

const { _uint8ArrayToBuffer, _isUint8Array } = require('readable-stream');

372

373

// Check if value is Uint8Array

374

const buffer = new Uint8Array([1, 2, 3]);

375

console.log(_isUint8Array(buffer)); // true

376

console.log(_isUint8Array([1, 2, 3])); // false

377

378

// Convert Uint8Array to Buffer

379

const convertedBuffer = _uint8ArrayToBuffer(buffer);

380

console.log(Buffer.isBuffer(convertedBuffer)); // true

381

```

382

383

## Types

384

385

```javascript { .api }

386

interface FinishedOptions {

387

error?: boolean; // Wait for error event (default: true)

388

readable?: boolean; // Wait for readable to end (default: true)

389

writable?: boolean; // Wait for writable to finish (default: true)

390

signal?: AbortSignal; // AbortSignal for cancellation

391

}

392

```