or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

duplex-transform.mdindex.mdpipeline.mdreadable.mdwritable.md

duplex-transform.mddocs/

# Duplex and Transform Streams

Duplex streams are both readable and writable, while Transform streams provide data transformation capabilities. StreamX provides enhanced implementations with proper lifecycle management and error handling.

## Capabilities

### Duplex Class

A duplex stream is both readable and writable, inheriting from Readable and implementing the Writable API.

```javascript { .api }
/**
 * Creates a duplex stream that is both readable and writable
 * @param options - Configuration options for the duplex stream
 */
class Duplex extends Readable {
  constructor(options?: DuplexOptions);

  // Inherits all Readable methods
  _read(cb: () => void): void;
  push(data: any): boolean;
  read(): any;

  // Implements Writable methods
  _write(data: any, cb: (err?: Error) => void): void;
  _writev(batch: any[], cb: (err?: Error) => void): void;
  _final(cb: (err?: Error) => void): void;
  write(data: any): boolean;
  end(): Duplex;

  // Shared lifecycle methods
  _open(cb: (err?: Error) => void): void;
  _destroy(cb: (err?: Error) => void): void;
  _predestroy(): void;
  destroy(err?: Error): void;
}

interface DuplexOptions extends ReadableOptions {
  /** Map function for readable side only */
  mapReadable?: (data: any) => any;

  /** ByteLength function for readable side only */
  byteLengthReadable?: (data: any) => number;

  /** Map function for writable side only */
  mapWritable?: (data: any) => any;

  /** ByteLength function for writable side only */
  byteLengthWritable?: (data: any) => number;

  /** Shorthand for _write method */
  write?: (data: any, cb: (err?: Error) => void) => void;

  /** Shorthand for _writev method */
  writev?: (batch: any[], cb: (err?: Error) => void) => void;

  /** Shorthand for _final method */
  final?: (cb: (err?: Error) => void) => void;
}
```

**Usage Examples:**

```javascript
const { Duplex } = require('streamx');

// Basic duplex stream
const echo = new Duplex({
  write(data, cb) {
    // Echo data back to readable side
    this.push(data);
    cb();
  },

  read(cb) {
    // Data is pushed from write side
    cb();
  }
});

// Write data and read it back
echo.write('Hello');
echo.write('World');

echo.on('data', (chunk) => {
  console.log('Echoed:', chunk.toString());
});

// More complex duplex with separate read/write logic
const processor = new Duplex({
  write(data, cb) {
    console.log('Processing input:', data.toString());
    // Process and push to readable side
    this.push(`Processed: ${data}`);
    cb();
  },

  read(cb) {
    // Readable side is fed by write operations
    cb();
  },

  final(cb) {
    console.log('Processing complete');
    this.push(null); // End readable side
    cb();
  }
});
```

### Transform Class

A transform stream is a duplex stream that transforms data from its writable side to its readable side.

```javascript { .api }
/**
 * Creates a transform stream that maps input data to output data
 * @param options - Configuration options for the transform stream
 */
class Transform extends Duplex {
  constructor(options?: TransformOptions);

  /** Override this method to implement data transformation */
  _transform(data: any, cb: (err?: Error, output?: any) => void): void;

  /** Override this method for final transformation operations */
  _flush(cb: (err?: Error, output?: any) => void): void;
}

interface TransformOptions extends DuplexOptions {
  /** Shorthand for _transform method */
  transform?: (data: any, cb: (err?: Error, output?: any) => void) => void;

  /** Shorthand for _flush method */
  flush?: (cb: (err?: Error, output?: any) => void) => void;
}
```

**Usage Examples:**

```javascript
const { Transform } = require('streamx');

// Basic transformation
const upperCase = new Transform({
  transform(data, cb) {
    const transformed = data.toString().toUpperCase();
    cb(null, transformed);
  }
});

upperCase.write('hello');
upperCase.write('world');
upperCase.end();

upperCase.on('data', (chunk) => {
  console.log('Uppercase:', chunk.toString());
});

// JSON parser transform
const jsonParser = new Transform({
  transform(data, cb) {
    try {
      const parsed = JSON.parse(data.toString());
      cb(null, parsed);
    } catch (err) {
      cb(err);
    }
  }
});

// Line-by-line processor. Per-stream state requires subclassing:
// an options object cannot declare a constructor (super() is only
// valid inside a derived class constructor).
class LineProcessor extends Transform {
  constructor() {
    super();
    this.buffer = '';
  }

  _transform(chunk, cb) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split('\n');
    this.buffer = lines.pop(); // Keep incomplete line

    lines.forEach(line => {
      if (line.trim()) {
        this.push(`Processed: ${line}\n`);
      }
    });

    cb();
  }

  _flush(cb) {
    if (this.buffer.trim()) {
      this.push(`Processed: ${this.buffer}\n`);
    }
    cb();
  }
}

const lineProcessor = new LineProcessor();
```

### PassThrough Class

A PassThrough stream is a Transform stream that passes data through unchanged.

```javascript { .api }
/**
 * Creates a pass-through stream (identity transform)
 * @param options - Configuration options for the pass-through stream
 */
class PassThrough extends Transform {
  constructor(options?: TransformOptions);
  // Automatically passes data through without transformation
}
```

**Usage Examples:**

```javascript
const { PassThrough } = require('streamx');

// Basic pass-through
const passThrough = new PassThrough();

passThrough.write('data flows through');
passThrough.end();

passThrough.on('data', (chunk) => {
  console.log('Passed through:', chunk.toString());
});

// Use as a proxy with monitoring
const monitor = new PassThrough();

monitor.on('data', (chunk) => {
  console.log(`Data passing through: ${chunk.length} bytes`);
});

// Pipe data through the monitor
someReadable.pipe(monitor).pipe(someWritable);
```

### Advanced Transform Patterns

StreamX transforms support advanced patterns for complex data processing.

**Buffering Transform:**

```javascript
// Per-stream state requires subclassing: an options object cannot
// declare a constructor (super() is only valid inside a derived
// class constructor).
class BufferingTransform extends Transform {
  constructor() {
    super();
    this.chunks = [];
    this.totalSize = 0;
  }

  _transform(chunk, cb) {
    this.chunks.push(chunk);
    this.totalSize += chunk.length;

    // Emit when we have enough data
    if (this.totalSize >= 1024) {
      const combined = Buffer.concat(this.chunks);
      this.chunks = [];
      this.totalSize = 0;
      cb(null, combined);
    } else {
      cb();
    }
  }

  _flush(cb) {
    if (this.chunks.length > 0) {
      const combined = Buffer.concat(this.chunks);
      cb(null, combined);
    } else {
      cb();
    }
  }
}

const bufferingTransform = new BufferingTransform();
```

**Async Transform:**

```javascript
const asyncTransform = new Transform({
  async transform(data, cb) {
    try {
      // Simulate async operation
      const processed = await processDataAsync(data.toString());
      cb(null, processed);
    } catch (err) {
      cb(err);
    }
  }
});

async function processDataAsync(data) {
  return new Promise((resolve) => {
    setTimeout(() => {
      resolve(`Async processed: ${data}`);
    }, 100);
  });
}
```

**Multi-output Transform:**

```javascript
const multiOutput = new Transform({
  transform(data, cb) {
    const input = data.toString();

    // Push multiple outputs for single input
    this.push(`Original: ${input}`);
    this.push(`Reversed: ${input.split('').reverse().join('')}`);
    this.push(`Length: ${input.length}`);

    cb(); // Don't pass data to cb, we used push instead
  }
});
```

### Error Handling

Transform streams include comprehensive error handling with proper cleanup.

```javascript
const errorHandlingTransform = new Transform({
  transform(data, cb) {
    try {
      if (data.toString().includes('poison')) {
        throw new Error('Poisoned data detected');
      }

      const result = data.toString().toUpperCase();
      cb(null, result);
    } catch (err) {
      cb(err); // Pass error to callback
    }
  }
});

errorHandlingTransform.on('error', (err) => {
  console.error('Transform error:', err.message);
});

errorHandlingTransform.on('close', () => {
  console.log('Transform stream closed');
});

// This will cause an error
errorHandlingTransform.write('poison pill');
```

### Events

Duplex and Transform streams emit events from both readable and writable sides.

```javascript { .api }
interface DuplexTransformEvents {
  // Readable events
  'readable': () => void;
  'data': (chunk: any) => void;
  'end': () => void;

  // Writable events
  'drain': () => void;
  'finish': () => void;

  // Shared events
  'close': () => void;
  'error': (err: Error) => void;
  'pipe': (src: Readable) => void;
  'piping': (dest: Writable) => void;
}
```