or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

duplex-transform.md · index.md · pipeline.md · readable.md · writable.md

docs/pipeline.md

# Pipeline Functions

StreamX provides powerful pipeline utilities for connecting multiple streams with automatic error handling, cleanup, and callback support. Pipelines handle stream lifecycle management and ensure proper resource cleanup.

## Capabilities

### Pipeline Function

Connects multiple streams together with comprehensive error handling and automatic cleanup.

```javascript { .api }
/**
 * Pipe multiple streams together with error handling and cleanup
 * @param streams - Streams to connect, with optional callback as last argument
 * @returns The last stream in the pipeline
 */
function pipeline(...streams: (Stream | ((err?: Error) => void))[]): Stream;
```

18

19

**Usage Examples:**

```javascript
const { pipeline, Readable, Transform, Writable } = require('streamx');

// Basic pipeline with callback
const lastStream = pipeline(
  Readable.from(['hello', 'world', 'from', 'streamx']),
  new Transform({
    transform(data, cb) {
      cb(null, data.toString().toUpperCase());
    }
  }),
  new Writable({
    write(data, cb) {
      console.log('Output:', data.toString());
      cb();
    }
  }),
  (err) => {
    if (err) {
      console.error('Pipeline failed:', err);
    } else {
      console.log('Pipeline completed successfully');
    }
  }
);

// Pipeline without callback
const result = pipeline(
  source,
  transformer1,
  transformer2,
  destination
);

// Monitor the last stream
result.on('finish', () => {
  console.log('Pipeline finished');
});
```

60

61

### Pipeline Promise

Promise-based version of pipeline for async/await workflows.

```javascript { .api }
/**
 * Promise-based pipeline that resolves when complete or rejects on error
 * @param streams - Streams to connect in pipeline
 * @returns Promise that resolves when pipeline completes
 */
function pipelinePromise(...streams: Stream[]): Promise<void>;
```

73

74

**Usage Examples:**

```javascript
const { pipelinePromise, Readable, Transform, Writable } = require('streamx');

// Using async/await
async function processData() {
  try {
    await pipelinePromise(
      Readable.from(['data1', 'data2', 'data3']),
      new Transform({
        transform(data, cb) {
          const processed = `Processed: ${data}`;
          cb(null, processed);
        }
      }),
      new Writable({
        write(data, cb) {
          console.log(data.toString());
          cb();
        }
      })
    );

    console.log('Pipeline completed successfully');
  } catch (err) {
    console.error('Pipeline failed:', err);
  }
}

// Multiple pipelines in sequence
async function sequentialProcessing() {
  await pipelinePromise(sourceA, transformA, destinationA);
  await pipelinePromise(sourceB, transformB, destinationB);
  console.log('All pipelines completed');
}

// Parallel pipelines
async function parallelProcessing() {
  await Promise.all([
    pipelinePromise(source1, transform1, dest1),
    pipelinePromise(source2, transform2, dest2),
    pipelinePromise(source3, transform3, dest3)
  ]);
  console.log('All parallel pipelines completed');
}
```

121

122

### Utility Functions

Helper functions for stream inspection and state checking.

```javascript { .api }
/**
 * Check if an object is a stream (Node.js or StreamX)
 * @param obj - Object to check
 * @returns True if object is a stream
 */
function isStream(obj: any): boolean;

/**
 * Check if an object is specifically a StreamX stream
 * @param obj - Object to check
 * @returns True if object is a StreamX stream
 */
function isStreamx(obj: any): boolean;

/**
 * Check if a readable stream has ended
 * @param stream - Readable stream to check
 * @returns True if stream has ended
 */
function isEnded(stream: Readable): boolean;

/**
 * Check if a writable stream has finished
 * @param stream - Writable stream to check
 * @returns True if stream has finished
 */
function isFinished(stream: Writable): boolean;

/**
 * Check if a readable stream has been disturbed (data read from it)
 * @param stream - Readable stream to check
 * @returns True if stream has been disturbed
 */
function isDisturbed(stream: Readable): boolean;

/**
 * Get error from a stream if any exists
 * @param stream - Stream to check for errors
 * @param opts - Optional configuration
 * @returns Error object or null if no error
 */
function getStreamError(stream: Stream, opts?: object): Error | null;
```

170

171

**Utility Function Examples:**

```javascript
const {
  isStream,
  isStreamx,
  isEnded,
  isFinished,
  isDisturbed,
  getStreamError,
  Readable,
  Writable
} = require('streamx');
const fs = require('fs');

// Check stream types
const streamxReadable = new Readable();
const nodeReadable = fs.createReadStream('file.txt');

console.log(isStream(streamxReadable)); // true
console.log(isStream(nodeReadable)); // true
console.log(isStreamx(streamxReadable)); // true
console.log(isStreamx(nodeReadable)); // false

// Check stream states
const readable = Readable.from(['data1', 'data2']);

console.log(isEnded(readable)); // false
console.log(isDisturbed(readable)); // false

readable.read(); // Disturb the stream
console.log(isDisturbed(readable)); // true

// Monitor writable stream
const writable = new Writable({
  write(data, cb) {
    console.log('Written:', data.toString());
    cb();
  }
});

writable.write('test');
writable.end();

writable.on('finish', () => {
  console.log(isFinished(writable)); // true
});

// Error checking
const errorStream = new Readable({
  read(cb) {
    cb(new Error('Read failed'));
  }
});

setTimeout(() => {
  const error = getStreamError(errorStream);
  if (error) {
    console.log('Stream has error:', error.message);
  }
}, 100);
```

233

234

### Advanced Pipeline Patterns

StreamX pipelines support advanced patterns for complex stream processing workflows.

**Conditional Pipeline:**

```javascript
async function conditionalPipeline(data, shouldTransform) {
  const streams = [
    Readable.from(data)
  ];

  if (shouldTransform) {
    streams.push(new Transform({
      transform(chunk, cb) {
        cb(null, chunk.toString().toUpperCase());
      }
    }));
  }

  streams.push(new Writable({
    write(chunk, cb) {
      console.log('Result:', chunk.toString());
      cb();
    }
  }));

  await pipelinePromise(...streams);
}
```

264

265

**Pipeline with Error Recovery:**

```javascript
async function resilientPipeline() {
  const maxRetries = 3;
  let attempt = 0;

  while (attempt < maxRetries) {
    try {
      await pipelinePromise(
        createSource(),
        createTransform(),
        createDestination()
      );

      console.log('Pipeline succeeded');
      break;
    } catch (err) {
      attempt++;
      console.log(`Attempt ${attempt} failed:`, err.message);

      if (attempt >= maxRetries) {
        throw new Error(`Pipeline failed after ${maxRetries} attempts`);
      }

      // Wait before retry
      await new Promise(resolve => setTimeout(resolve, 1000 * attempt));
    }
  }
}
```

296

297

**Pipeline with Monitoring:**

```javascript
function createMonitoredPipeline() {
  const source = Readable.from(['data1', 'data2', 'data3']);
  const transform = new Transform({
    transform(data, cb) {
      console.log('Transforming:', data.toString());
      cb(null, data.toString().toUpperCase());
    }
  });
  const destination = new Writable({
    write(data, cb) {
      console.log('Writing:', data.toString());
      cb();
    }
  });

  // Monitor each stream
  [source, transform, destination].forEach((stream, index) => {
    stream.on('error', (err) => {
      console.error(`Stream ${index} error:`, err.message);
    });

    stream.on('close', () => {
      console.log(`Stream ${index} closed`);
    });
  });

  return pipeline(source, transform, destination, (err) => {
    if (err) {
      console.error('Pipeline error:', err.message);
    } else {
      console.log('Pipeline completed successfully');
    }
  });
}
```

335

336

**Dynamic Pipeline Construction:**

```javascript
function createDynamicPipeline(config) {
  const streams = [
    Readable.from(config.data)
  ];

  // Add transforms based on configuration
  if (config.uppercase) {
    streams.push(new Transform({
      transform(data, cb) {
        cb(null, data.toString().toUpperCase());
      }
    }));
  }

  if (config.prefix) {
    streams.push(new Transform({
      transform(data, cb) {
        cb(null, `${config.prefix}: ${data}`);
      }
    }));
  }

  if (config.filter) {
    streams.push(new Transform({
      transform(data, cb) {
        if (data.toString().includes(config.filter)) {
          cb(null, data);
        } else {
          cb(); // Skip this data
        }
      }
    }));
  }

  // Add destination
  streams.push(new Writable({
    write(data, cb) {
      console.log('Final output:', data.toString());
      cb();
    }
  }));

  return pipelinePromise(...streams);
}

// Usage
createDynamicPipeline({
  data: ['hello', 'world', 'test'],
  uppercase: true,
  prefix: 'OUTPUT',
  filter: 'world'
});
```

392

393

### Error Handling

Pipelines provide comprehensive error handling with automatic cleanup of all streams.

```javascript
const { pipeline } = require('streamx');

// Pipeline with error-prone transform
pipeline(
  Readable.from(['good', 'error', 'data']),
  new Transform({
    transform(data, cb) {
      if (data.toString() === 'error') {
        return cb(new Error('Transform failed'));
      }
      cb(null, data.toString().toUpperCase());
    }
  }),
  new Writable({
    write(data, cb) {
      console.log('Success:', data.toString());
      cb();
    }
  }),
  (err) => {
    if (err) {
      console.error('Pipeline failed:', err.message);
      // All streams are automatically cleaned up
    }
  }
);
```