# Duplex and Transform Streams

Duplex streams are both readable and writable. Transform streams are duplex streams that transform data as it passes from the writable side to the readable side. StreamX implements both with explicit lifecycle hooks (`_open`, `_destroy`, `_predestroy`) and error handling.

## Capabilities

### Duplex Class

A duplex stream is both readable and writable: it inherits from Readable and also implements the Writable API.

```javascript { .api }
/**
 * Creates a duplex stream that is both readable and writable
 * @param options - Configuration options for the duplex stream
 */
class Duplex extends Readable {
  constructor(options?: DuplexOptions);

  // Inherits all Readable methods
  _read(cb: () => void): void;
  push(data: any): boolean;
  read(): any;

  // Implements Writable methods
  _write(data: any, cb: (err?: Error) => void): void;
  _writev(batch: any[], cb: (err?: Error) => void): void;
  _final(cb: (err?: Error) => void): void;
  write(data: any): boolean;
  end(): Duplex;

  // Shared lifecycle methods
  _open(cb: (err?: Error) => void): void;
  _destroy(cb: (err?: Error) => void): void;
  _predestroy(): void;
  destroy(err?: Error): void;
}

interface DuplexOptions extends ReadableOptions {
  /** Map function for readable side only */
  mapReadable?: (data: any) => any;

  /** ByteLength function for readable side only */
  byteLengthReadable?: (data: any) => number;

  /** Map function for writable side only */
  mapWritable?: (data: any) => any;

  /** ByteLength function for writable side only */
  byteLengthWritable?: (data: any) => number;

  /** Shorthand for _write method */
  write?: (data: any, cb: (err?: Error) => void) => void;

  /** Shorthand for _writev method */
  writev?: (batch: any[], cb: (err?: Error) => void) => void;

  /** Shorthand for _final method */
  final?: (cb: (err?: Error) => void) => void;
}
```

**Usage Examples:**

```javascript
const { Duplex } = require('streamx');

// Basic duplex stream
const echo = new Duplex({
  write(data, cb) {
    // Echo data back to readable side
    this.push(data);
    cb();
  },

  read(cb) {
    // Data is pushed from write side
    cb();
  }
});

// Write data and read it back
echo.write('Hello');
echo.write('World');

echo.on('data', (chunk) => {
  console.log('Echoed:', chunk.toString());
});

// More complex duplex with separate read/write logic
const processor = new Duplex({
  write(data, cb) {
    console.log('Processing input:', data.toString());
    // Process and push to readable side
    this.push(`Processed: ${data}`);
    cb();
  },

  read(cb) {
    // Readable side is fed by write operations
    cb();
  },

  final(cb) {
    console.log('Processing complete');
    this.push(null); // End readable side
    cb();
  }
});
```

### Transform Class

A transform stream is a duplex stream that transforms data from its writable side to its readable side.

```javascript { .api }
/**
 * Creates a transform stream that maps input data to output data
 * @param options - Configuration options for the transform stream
 */
class Transform extends Duplex {
  constructor(options?: TransformOptions);

  /** Override this method to implement data transformation */
  _transform(data: any, cb: (err?: Error, output?: any) => void): void;

  /** Override this method for final transformation operations */
  _flush(cb: (err?: Error, output?: any) => void): void;
}

interface TransformOptions extends DuplexOptions {
  /** Shorthand for _transform method */
  transform?: (data: any, cb: (err?: Error, output?: any) => void) => void;

  /** Shorthand for _flush method */
  flush?: (cb: (err?: Error, output?: any) => void) => void;
}
```

**Usage Examples:**

```javascript
const { Transform } = require('streamx');

// Basic transformation
const upperCase = new Transform({
  transform(data, cb) {
    const transformed = data.toString().toUpperCase();
    cb(null, transformed);
  }
});

upperCase.write('hello');
upperCase.write('world');
upperCase.end();

upperCase.on('data', (chunk) => {
  console.log('Uppercase:', chunk.toString());
});

// JSON parser transform
const jsonParser = new Transform({
  transform(data, cb) {
    try {
      const parsed = JSON.parse(data.toString());
      cb(null, parsed);
    } catch (err) {
      cb(err);
    }
  }
});

// Line-by-line processor
// Per-stream state needs a subclass: an options object cannot define a constructor
class LineProcessor extends Transform {
  constructor(options) {
    super(options);
    this.buffer = ''; // Trailing partial line carried between chunks
  }

  _transform(chunk, cb) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split('\n');
    this.buffer = lines.pop(); // Keep incomplete line

    lines.forEach(line => {
      if (line.trim()) {
        this.push(`Processed: ${line}\n`);
      }
    });

    cb();
  }

  _flush(cb) {
    // Emit whatever is left once the writable side has ended
    if (this.buffer.trim()) {
      this.push(`Processed: ${this.buffer}\n`);
    }
    cb();
  }
}

const lineProcessor = new LineProcessor();
```

### PassThrough Class

A PassThrough stream is a Transform stream that passes data through unchanged.

```javascript { .api }
/**
 * Creates a pass-through stream (identity transform)
 * @param options - Configuration options for the pass-through stream
 */
class PassThrough extends Transform {
  constructor(options?: TransformOptions);
  // Automatically passes data through without transformation
}
```

**Usage Examples:**

```javascript
const { PassThrough } = require('streamx');

// Basic pass-through
const passThrough = new PassThrough();

passThrough.write('data flows through');
passThrough.end();

passThrough.on('data', (chunk) => {
  console.log('Passed through:', chunk.toString());
});

// Use as a proxy with monitoring
const monitor = new PassThrough();

monitor.on('data', (chunk) => {
  // chunk.length is a byte count for Buffers and a character count for strings
  console.log(`Data passing through: ${chunk.length} bytes`);
});

// Pipe data through the monitor
// someReadable and someWritable are placeholders for your own streams
someReadable.pipe(monitor).pipe(someWritable);
```

### Advanced Transform Patterns

The patterns below cover buffering input, asynchronous processing, and emitting several outputs per input.

**Buffering Transform:**

```javascript
// Per-stream state needs a subclass: an options object cannot define a constructor
class BufferingTransform extends Transform {
  constructor(options) {
    super(options);
    this.chunks = [];
    this.totalSize = 0;
  }

  _transform(chunk, cb) {
    // Assumes chunks are Buffers, as required by Buffer.concat
    this.chunks.push(chunk);
    this.totalSize += chunk.length;

    // Emit when we have enough data
    if (this.totalSize >= 1024) {
      const combined = Buffer.concat(this.chunks);
      this.chunks = [];
      this.totalSize = 0;
      cb(null, combined);
    } else {
      cb();
    }
  }

  _flush(cb) {
    // Emit any remainder smaller than the threshold
    if (this.chunks.length > 0) {
      const combined = Buffer.concat(this.chunks);
      this.chunks = [];
      this.totalSize = 0;
      cb(null, combined);
    } else {
      cb();
    }
  }
}

const bufferingTransform = new BufferingTransform();
```

**Async Transform:**

```javascript
const asyncTransform = new Transform({
  async transform(data, cb) {
    let processed;
    try {
      // Simulate async operation
      processed = await processDataAsync(data.toString());
    } catch (err) {
      return cb(err);
    }
    // Call cb outside the try block so it is never invoked twice
    cb(null, processed);
  }
});

async function processDataAsync(data) {
  return new Promise((resolve) => {
    setTimeout(() => {
      resolve(`Async processed: ${data}`);
    }, 100);
  });
}
```

**Multi-output Transform:**

```javascript
const multiOutput = new Transform({
  transform(data, cb) {
    const input = data.toString();

    // Push multiple outputs for single input
    this.push(`Original: ${input}`);
    this.push(`Reversed: ${input.split('').reverse().join('')}`);
    this.push(`Length: ${input.length}`);

    cb(); // Don't pass data to cb, we used push instead
  }
});
```

### Error Handling

Passing an error to the transform callback destroys the stream, which emits `error` followed by `close`.

```javascript
const errorHandlingTransform = new Transform({
  transform(data, cb) {
    try {
      if (data.toString().includes('poison')) {
        throw new Error('Poisoned data detected');
      }

      const result = data.toString().toUpperCase();
      cb(null, result);
    } catch (err) {
      cb(err); // Pass error to callback
    }
  }
});

errorHandlingTransform.on('error', (err) => {
  console.error('Transform error:', err.message);
});

errorHandlingTransform.on('close', () => {
  console.log('Transform stream closed');
});

// This will cause an error
errorHandlingTransform.write('poison pill');
```

### Events

Duplex and Transform streams emit events from both readable and writable sides.

```javascript { .api }
interface DuplexTransformEvents {
  // Readable events
  'readable': () => void;
  'data': (chunk: any) => void;
  'end': () => void;

  // Writable events
  'drain': () => void;
  'finish': () => void;

  // Shared events
  'close': () => void;
  'error': (err: Error) => void;
  'pipe': (src: Readable) => void;
  'piping': (dest: Writable) => void;
}
```