or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.md · promise-api.md · stream-classes.md · stream-operators.md · utility-functions.md

docs/promise-api.md

0

# Promise API

1

2

Promise-based versions of utility functions that integrate seamlessly with modern async/await patterns. These functions provide the same functionality as their callback-based counterparts but return promises for cleaner async code.

3

4

## Capabilities

5

6

### promises.pipeline

7

8

Promise-based version of the pipeline utility function for composing streams.

9

10

```javascript { .api }

11

/**

12

* Promise-based pipeline for composing multiple streams

13

* @param streams - Sequence of streams to pipe together

14

* @returns Promise that resolves when the pipeline completes successfully

15

*/

16

const promises = {

17

pipeline: (...streams: Array<NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream>) => Promise<void>;

18

};

19

```

20

21

**Usage Examples:**

22

23

```javascript

24

const { promises, Transform } = require('readable-stream');

25

const fs = require('fs');

26

27

// Async/await pipeline

28

async function processFile() {

29

try {

30

await promises.pipeline(

31

fs.createReadStream('input.txt'),

32

new Transform({

33

transform(chunk, encoding, callback) {

34

// Convert to uppercase

35

this.push(chunk.toString().toUpperCase());

36

callback();

37

}

38

}),

39

fs.createWriteStream('output.txt')

40

);

41

console.log('File processed successfully');

42

} catch (error) {

43

console.error('Pipeline failed:', error);

44

}

45

}

46

47

// Multiple transform pipeline

48

async function complexProcessing() {

49

const upperCase = new Transform({

50

transform(chunk, encoding, callback) {

51

this.push(chunk.toString().toUpperCase());

52

callback();

53

}

54

});

55

56

const addLineNumbers = new Transform({

57

objectMode: true,

58

transform(chunk, encoding, callback) {

59

const lines = chunk.toString().split('\n');

60

lines.forEach((line, index) => {

61

if (line.trim()) {

62

this.push(`${index + 1}: ${line}\n`);

63

}

64

});

65

callback();

66

}

67

});

68

69

try {

70

await promises.pipeline(

71

fs.createReadStream('source.txt'),

72

upperCase,

73

addLineNumbers,

74

fs.createWriteStream('numbered.txt')

75

);

76

console.log('Complex processing completed');

77

} catch (error) {

78

console.error('Processing failed:', error);

79

}

80

}

81

```

82

83

### promises.finished

84

85

Promise-based version of the finished utility function for monitoring stream completion.

86

87

```javascript { .api }

88

/**

89

* Promise-based stream completion monitoring

90

* @param stream - Stream to monitor for completion

91

* @param options - Optional configuration for what events to wait for

92

* @returns Promise that resolves when the stream is finished

93

*/

94

const promises = {

95

finished: (

96

stream: NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream,

97

options?: FinishedOptions

98

) => Promise<void>;

99

};

100

```

101

102

**Usage Examples:**

103

104

```javascript

105

const { promises, Readable, Writable } = require('readable-stream');

106

107

// Monitor readable stream completion

108

async function monitorReadable() {

109

const readable = new Readable({

110

read() {

111

this.push('chunk 1');

112

this.push('chunk 2');

113

this.push(null); // End stream

114

}

115

});

116

117

// Start reading the stream

118

readable.on('data', (chunk) => {

119

console.log('Read:', chunk.toString());

120

});

121

122

try {

123

await promises.finished(readable);

124

console.log('Readable stream finished successfully');

125

} catch (error) {

126

console.error('Readable stream error:', error);

127

}

128

}

129

130

// Monitor writable stream completion

131

async function monitorWritable() {

132

const writable = new Writable({

133

write(chunk, encoding, callback) {

134

console.log('Writing:', chunk.toString());

135

// Simulate async operation

136

setTimeout(callback, 100);

137

}

138

});

139

140

// Start writing to the stream

141

writable.write('data 1');

142

writable.write('data 2');

143

writable.end();

144

145

try {

146

await promises.finished(writable);

147

console.log('Writable stream finished successfully');

148

} catch (error) {

149

console.error('Writable stream error:', error);

150

}

151

}

152

153

// Monitor with specific options

154

async function monitorWithOptions() {

155

const readable = new Readable({

156

read() {

157

// Simulate data production

158

setTimeout(() => {

159

this.push('data');

160

}, 100);

161

}

162

});

163

164

try {

165

await promises.finished(readable, {

166

readable: true,

167

writable: false, // Don't wait for writable events

168

error: true // Wait for error events

169

});

170

console.log('Stream monitoring completed');

171

} catch (error) {

172

console.error('Stream monitoring failed:', error);

173

}

174

}

175

```

176

177

## Combining Promise API with Stream Operators

178

179

The Promise API works seamlessly with stream operators:

180

181

```javascript

182

const { promises, Readable, Writable } = require('readable-stream');

183

184

async function processDataWithOperators() {

185

// Create a readable stream

186

const dataSource = new Readable({

187

objectMode: true,

188

read() {

189

// Simulate data source

190

const data = [1, 2, 3, 4, 5];

191

data.forEach(item => this.push(item));

192

this.push(null);

193

}

194

});

195

196

// Transform with operators

197

const transformed = dataSource

198

.map(x => x * 2)

199

.filter(x => x > 5);

200

201

// Use promises to collect results

202

try {

203

const results = await transformed.toArray();

204

console.log('Results:', results); // [6, 8, 10]

205

} catch (error) {

206

console.error('Processing failed:', error);

207

}

208

}

209

210

// Pipeline with operator-transformed streams

211

async function pipelineWithOperators() {

212

const source = Readable.from([1, 2, 3, 4, 5]);

213

const doubled = source.map(x => x * 2);

214

215

const writable = new Writable({

216

objectMode: true,

217

write(chunk, encoding, callback) {

218

console.log('Received:', chunk);

219

callback();

220

}

221

});

222

223

try {

224

await promises.pipeline(doubled, writable);

225

console.log('Pipeline with operators completed');

226

} catch (error) {

227

console.error('Pipeline failed:', error);

228

}

229

}

230

```

231

232

## Error Handling Patterns

233

234

The Promise API provides clean error handling patterns:

235

236

```javascript

237

const { promises, Readable, Transform, Writable } = require('readable-stream');

238

239

// Graceful error handling

240

async function robustProcessing() {

241

const source = new Readable({

242

read() {

243

this.push('valid data');

244

this.push('invalid data');

245

this.push(null);

246

}

247

});

248

249

const validator = new Transform({

250

transform(chunk, encoding, callback) {

251

const data = chunk.toString();

252

if (data === 'invalid data') {

253

callback(new Error('Data validation failed'));

254

return;

255

}

256

this.push(data.toUpperCase());

257

callback();

258

}

259

});

260

261

const output = new Writable({

262

write(chunk, encoding, callback) {

263

console.log('Processed:', chunk.toString());

264

callback();

265

}

266

});

267

268

try {

269

await promises.pipeline(source, validator, output);

270

console.log('Processing completed successfully');

271

} catch (error) {

272

console.error('Processing failed:', error.message);

273

// Handle cleanup or recovery here

274

}

275

}

276

277

// Multiple pipeline error handling

278

async function multipleOperations() {

279

const operations = [

280

() => promises.pipeline(/* pipeline 1 */),

281

() => promises.pipeline(/* pipeline 2 */),

282

() => promises.pipeline(/* pipeline 3 */)

283

];

284

285

const results = await Promise.allSettled(

286

operations.map(op => op())

287

);

288

289

results.forEach((result, index) => {

290

if (result.status === 'fulfilled') {

291

console.log(`Operation ${index + 1} succeeded`);

292

} else {

293

console.error(`Operation ${index + 1} failed:`, result.reason);

294

}

295

});

296

}

297

```

298

299

## AbortSignal Support

300

301

The Promise API supports AbortSignal for cancellation:

302

303

```javascript

304

const { promises, Readable, Transform } = require('readable-stream');

305

306

async function cancellableOperation() {

307

const controller = new AbortController();

308

const { signal } = controller;

309

310

// Cancel after 5 seconds

311

setTimeout(() => {

312

controller.abort();

313

}, 5000);

314

315

const slowSource = new Readable({

316

read() {

317

// Simulate slow data production

318

setTimeout(() => {

319

this.push('data');

320

}, 1000);

321

}

322

});

323

324

const slowTransform = new Transform({

325

transform(chunk, encoding, callback) {

326

// Simulate slow processing

327

setTimeout(() => {

328

this.push(chunk.toString().toUpperCase());

329

callback();

330

}, 2000);

331

}

332

});

333

334

try {

335

await promises.finished(slowSource, { signal });

336

console.log('Operation completed');

337

} catch (error) {

338

if (error.name === 'AbortError') {

339

console.log('Operation was cancelled');

340

} else {

341

console.error('Operation failed:', error);

342

}

343

}

344

}

345

```

346

347

## Integration with Other APIs

348

349

The Promise API integrates well with other Node.js Promise APIs:

350

351

```javascript

352

const { promises, Readable } = require('readable-stream');

353

const { promises: fs } = require('fs');

354

355

async function fileProcessingWorkflow() {

356

try {

357

// Read file list

358

const files = await fs.readdir('./data');

359

360

// Create stream from file list

361

const fileStream = Readable.from(files);

362

363

// Process each file

364

const processedFiles = await fileStream

365

.filter(filename => filename.endsWith('.txt'))

366

.map(async (filename) => {

367

const content = await fs.readFile(`./data/${filename}`, 'utf8');

368

return { filename, content, size: content.length };

369

})

370

.toArray();

371

372

console.log('Processed files:', processedFiles);

373

} catch (error) {

374

console.error('Workflow failed:', error);

375

}

376

}

377

```

378

379

## Types

380

381

```javascript { .api }

382

interface FinishedOptions {

383

error?: boolean; // Wait for error event (default: true)

384

readable?: boolean; // Wait for readable to end (default: true)

385

writable?: boolean; // Wait for writable to finish (default: true)

386

signal?: AbortSignal; // AbortSignal for cancellation

387

}

388

389

// Promise API namespace

390

interface StreamPromises {

391

pipeline: (...streams: Array<NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream>) => Promise<void>;

392

finished: (stream: NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream, options?: FinishedOptions) => Promise<void>;

393

}

394

```