0
# Duplex Streams
1
2
Duplex streams that are both readable and writable, inheriting from Readable while implementing the complete Writable interface.
3
4
## Capabilities
5
6
### Duplex Constructor
7
8
Creates a new Duplex stream with configuration options for both readable and writable sides.
9
10
```javascript { .api }
11
/**
12
* Creates a new Duplex stream
13
* @param opts - Configuration options for both readable and writable functionality
14
*/
15
class Duplex extends Readable {
16
constructor(opts?: DuplexOptions);
17
}
18
19
interface DuplexOptions extends ReadableOptions {
20
/** Map function for readable side only */
21
mapReadable?: (data: any) => any;
22
/** ByteLength function for readable side only */
23
byteLengthReadable?: (data: any) => number;
24
/** Map function for writable side only */
25
mapWritable?: (data: any) => any;
26
/** ByteLength function for writable side only */
27
byteLengthWritable?: (data: any) => number;
28
/** Write function shorthand */
29
write?: (data: any, cb: (error?: Error) => void) => void;
30
/** Final function shorthand */
31
final?: (cb: (error?: Error) => void) => void;
32
}
33
```
34
35
**Usage Example:**
36
37
```javascript
38
const { Duplex } = require("@mafintosh/streamx");
39
40
const duplex = new Duplex({
41
read(cb) {
42
this.push(`read-${Date.now()}`);
43
cb(null);
44
},
45
write(data, cb) {
46
console.log("Writing:", data.toString());
47
cb(null);
48
}
49
});
50
```
51
52
### Readable Side
53
54
All methods and events from Readable streams are available:
55
56
#### Data Production Methods
57
58
```javascript { .api }
59
/**
60
* Called when stream wants new data for reading
61
* @param cb - Callback to call when read operation is complete
62
*/
63
_read(cb: (error?: Error) => void): void;
64
65
/**
66
* Push data to the readable buffer
67
* @param data - Data to push, or null to end readable side
68
* @returns True if buffer is not full
69
*/
70
push(data: any): boolean;
71
72
/**
73
* Read data from the readable buffer
74
* @returns Data from buffer, or null if empty
75
*/
76
read(): any;
77
```
78
79
### Writable Side
80
81
All methods and events from Writable streams are available:
82
83
#### Data Consumption Methods
84
85
```javascript { .api }
86
/**
87
* Called when stream wants to write data
88
* @param data - Data to write
89
* @param callback - Callback to call when write is complete
90
*/
91
_write(data: any, callback: (error?: Error) => void): void;
92
93
/**
94
* Write data to the stream
95
* @param data - Data to write
96
* @returns True if buffer is not full
97
*/
98
write(data: any): boolean;
99
100
/**
101
* End the writable side gracefully
102
* @param data - Optional final data to write before ending
103
*/
104
end(data?: any): void;
105
106
/**
107
* Called before finish event for cleanup
108
* @param callback - Callback to call when final is complete
109
*/
110
_final(callback: (error?: Error) => void): void;
111
```
112
113
### Combined Usage
114
115
**Echo Server Example:**
116
117
```javascript
118
const { Duplex } = require("@mafintosh/streamx");
119
120
class EchoStream extends Duplex {
121
constructor() {
122
super();
123
this.buffer = [];
124
}
125
126
_read(cb) {
127
if (this.buffer.length > 0) {
128
this.push(this.buffer.shift());
129
}
130
cb(null);
131
}
132
133
_write(data, cb) {
134
// Echo written data back to readable side
135
this.buffer.push(`Echo: ${data}`);
136
this.push(`Echo: ${data}`);
137
cb(null);
138
}
139
}
140
141
const echo = new EchoStream();
142
143
// Write data
144
echo.write("Hello");
145
echo.write("World");
146
147
// Read echoed data
148
echo.on('data', (chunk) => {
149
console.log('Received:', chunk.toString());
150
});
151
152
echo.end();
153
```
154
155
**Proxy Stream Example:**
156
157
```javascript
158
const { Duplex } = require("@mafintosh/streamx");
159
160
class ProxyStream extends Duplex {
161
constructor(target) {
162
super();
163
this.target = target;
164
165
// Forward data from target to our readable side
166
this.target.on('data', (chunk) => {
167
this.push(chunk);
168
});
169
170
this.target.on('end', () => {
171
this.push(null);
172
});
173
}
174
175
_write(data, cb) {
176
// Forward written data to target
177
this.target.write(data);
178
cb(null);
179
}
180
181
_final(cb) {
182
this.target.end();
183
cb(null);
184
}
185
}
186
```
187
188
**Bidirectional Transform Example:**
189
190
```javascript
191
const { Duplex } = require("@mafintosh/streamx");
192
193
class BidirectionalTransform extends Duplex {
194
constructor() {
195
super();
196
this.readCounter = 0;
197
this.writeCounter = 0;
198
}
199
200
_read(cb) {
201
// Generate data for reading
202
if (this.readCounter < 5) {
203
this.push(`generated-${this.readCounter++}`);
204
} else {
205
this.push(null);
206
}
207
cb(null);
208
}
209
210
_write(data, cb) {
211
// Process written data
212
const processed = data.toString().toUpperCase();
213
console.log(`Processed write #${this.writeCounter++}:`, processed);
214
cb(null);
215
}
216
}
217
218
const transform = new BidirectionalTransform();
219
220
// Read generated data
221
transform.on('data', (chunk) => {
222
console.log('Read:', chunk.toString());
223
});
224
225
// Write data to be processed
226
transform.write("hello");
227
transform.write("world");
228
transform.end();
229
```
230
231
### Events
232
233
Duplex streams emit events from both Readable and Writable:
234
235
#### Readable Events
236
- **`readable`** - Data available to read
237
- **`data`** - Data chunk read (auto-resumes stream)
238
- **`end`** - Readable side ended
239
240
#### Writable Events
241
- **`drain`** - Buffer drained, safe to write more
242
- **`finish`** - All writes completed
243
244
#### Shared Events
245
- **`close`** - Stream fully closed
246
- **`error`** - Error occurred
247
248
**Complete Event Handling Example:**
249
250
```javascript
251
const { Duplex } = require("@mafintosh/streamx");
252
253
const duplex = new Duplex({
254
read(cb) {
255
this.push(`data-${Date.now()}`);
256
setTimeout(() => cb(null), 100);
257
},
258
write(data, cb) {
259
console.log("Writing:", data.toString());
260
setTimeout(() => cb(null), 50);
261
}
262
});
263
264
// Readable events
265
duplex.on('readable', () => {
266
console.log('Readable event');
267
});
268
269
duplex.on('data', (chunk) => {
270
console.log('Data:', chunk.toString());
271
});
272
273
duplex.on('end', () => {
274
console.log('Readable end');
275
});
276
277
// Writable events
278
duplex.on('drain', () => {
279
console.log('Drain event');
280
});
281
282
duplex.on('finish', () => {
283
console.log('Writable finish');
284
});
285
286
// Shared events
287
duplex.on('close', () => {
288
console.log('Stream closed');
289
});
290
291
duplex.on('error', (err) => {
292
console.error('Error:', err);
293
});
294
295
// Use both sides
296
duplex.write("test data");
297
duplex.end();
298
```
299
300
### Advanced Configuration
301
302
**Separate Readable/Writable Configuration:**
303
304
```javascript
305
const duplex = new Duplex({
306
// Readable configuration
307
highWaterMark: 8192,
308
mapReadable: (data) => data.toString().toUpperCase(),
309
byteLengthReadable: (data) => Buffer.byteLength(data),
310
311
// Writable configuration
312
mapWritable: (data) => Buffer.from(data),
313
byteLengthWritable: (data) => data.length,
314
315
// Implementation
316
read(cb) {
317
this.push("readable data");
318
cb(null);
319
},
320
321
write(data, cb) {
322
console.log("Wrote:", data);
323
cb(null);
324
}
325
});
326
```