0
# Writable Streams
1
2
A writable stream implementation that consumes data, buffers pending writes, applies backpressure, and manages the stream lifecycle from open through finish and close.
3
4
## Capabilities
5
6
### Writable Constructor
7
8
Creates a new Writable stream with configuration options.
9
10
```javascript { .api }
11
/**
 * Writable stream. Configure it through the constructor options or
 * subclass it and override its hooks.
 */
class Writable extends Stream {
  /**
   * Creates a new Writable stream.
   * @param opts - Configuration options for the writable stream
   */
  constructor(opts?: WritableOptions);
}

/**
 * Options accepted by the Writable constructor.
 */
interface WritableOptions extends StreamOptions {
  /**
   * Maximum buffer size in bytes.
   * Defaults to 16384.
   */
  highWaterMark?: number;

  /**
   * Function used to map input data.
   */
  map?: (data: any) => any;

  /**
   * Function used to calculate the byte size of a piece of data.
   */
  byteLength?: (data: any) => number;

  /**
   * Shorthand for supplying the write function through the options.
   */
  write?: (data: any, cb: (error?: Error) => void) => void;

  /**
   * Shorthand for supplying the final function through the options.
   */
  final?: (cb: (error?: Error) => void) => void;
}
31
```
32
33
**Usage Example:**
34
35
```javascript
36
const { Writable } = require("@mafintosh/streamx");
37
38
const writable = new Writable({
39
write(data, cb) {
40
console.log("Writing:", data.toString());
41
cb(null);
42
}
43
});
44
```
45
46
### Data Writing
47
48
#### _write Method
49
50
Called when the stream wants to write data. Override to implement write logic.
51
52
```javascript { .api }
53
/**
 * Write hook: invoked when the stream wants to write data.
 * Override it to implement the actual write logic.
 * @param data - Data to write
 * @param callback - Call once the write operation is complete, optionally with an error
 */
_write(data: any, callback: (error?: Error) => void): void;
59
```
60
61
**Usage Example:**
62
63
```javascript
64
const fs = require('fs');
65
66
/**
 * Example Writable that writes every chunk to a file.
 * `_open` acquires the file descriptor, `_write` writes through it, and
 * `_destroy` releases it.
 */
class FileWritable extends Writable {
  /**
   * @param filename - Path of the file to open for writing
   */
  constructor(filename) {
    super();
    this.filename = filename;
    this.fd = null; // assigned by _open; null while no descriptor is held
  }

  /**
   * Opens the target file with flag 'w' and stores its descriptor.
   * @param cb - Called with an error if the file could not be opened
   */
  _open(cb) {
    fs.open(this.filename, 'w', (err, fd) => {
      if (err) return cb(err);
      this.fd = fd;
      cb(null);
    });
  }

  /**
   * Writes one chunk to the open descriptor.
   * @param data - Chunk to write
   * @param cb - Called once the write has completed or failed
   */
  _write(data, cb) {
    fs.write(this.fd, data, (err) => {
      if (err) return cb(err);
      cb(null);
    });
  }

  /**
   * Closes the descriptor if one was opened.
   * @param cb - Called once cleanup has finished
   */
  _destroy(cb) {
    // Compare against null instead of relying on truthiness: 0 is a valid
    // file descriptor and must still be closed.
    if (this.fd === null) return cb(null);

    const fd = this.fd;
    this.fd = null; // forget the descriptor so it can never be closed twice
    fs.close(fd, cb);
  }
}
96
```
97
98
#### write Method
99
100
Writes data to the stream.
101
102
```javascript { .api }
103
/**
 * Writes data to the stream.
 * @param data - Data to write
 * @returns `true` while the buffer is not full; `false` when backpressure should be applied
 */
write(data: any): boolean;
109
```
110
111
**Usage Example:**
112
113
```javascript
114
const writable = new Writable({
115
write(data, cb) {
116
console.log("Received:", data.toString());
117
cb(null);
118
}
119
});
120
121
const canContinue = writable.write("Hello World");
122
if (!canContinue) {
123
// Wait for 'drain' event before writing more
124
writable.once('drain', () => {
125
writable.write("More data");
126
});
127
}
128
```
129
130
### Stream Finalization
131
132
#### end Method
133
134
Ends the writable stream gracefully.
135
136
```javascript { .api }
137
/**
 * Ends the writable stream gracefully.
 * @param data - Optional final data to write before the stream ends
 */
end(data?: any): void;
142
```
143
144
**Usage Example:**
145
146
```javascript
147
const writable = new Writable({
148
write(data, cb) {
149
console.log("Writing:", data.toString());
150
cb(null);
151
}
152
});
153
154
writable.write("First chunk");
155
writable.write("Second chunk");
156
writable.end("Final chunk"); // Write final data and end
157
158
// Or end without final data
159
// writable.end();
160
161
writable.on('finish', () => {
162
console.log('All writes completed');
163
});
164
```
165
166
#### _final Method
167
168
Called just before the 'finish' event when all writes have been processed.
169
170
```javascript { .api }
171
/**
 * Final hook: called before the 'finish' event so the stream can do its
 * final cleanup.
 * @param callback - Call once the final step is complete, optionally with an error
 */
_final(callback: (error?: Error) => void): void;
176
```
177
178
**Usage Example:**
179
180
```javascript
181
/**
 * Example Writable that collects every chunk in memory and processes the
 * collected chunks in `_final`.
 */
class BufferedWritable extends Writable {
  constructor() {
    super();
    // Chunks received so far, in arrival order.
    this.buffer = [];
  }

  /**
   * Stores the chunk and acknowledges the write immediately.
   */
  _write(data, cb) {
    this.buffer.push(data);
    cb(null);
  }

  /**
   * Reports the collected chunks and resets the store before signalling
   * completion.
   */
  _final(cb) {
    // Final processing of remaining buffer
    const collected = this.buffer;
    this.buffer = [];
    console.log("Final processing buffer:", collected);
    cb(null);
  }
}
199
```
200
201
### Events
202
203
#### drain Event
204
205
Emitted when the buffer has been drained after being full.
206
207
```javascript
208
writable.on('drain', () => {
209
// Buffer is no longer full, safe to write more
210
});
211
```
212
213
#### finish Event
214
215
Emitted when the stream has been ended and all writes have been processed.
216
217
```javascript
218
writable.on('finish', () => {
219
console.log('All writes completed');
220
});
221
```
222
223
#### close Event
224
225
Emitted when the stream has fully closed.
226
227
```javascript
228
writable.on('close', () => {
229
console.log('Stream closed');
230
});
231
```
232
233
#### error Event
234
235
Emitted when an error occurs.
236
237
```javascript
238
writable.on('error', (err) => {
239
console.error('Stream error:', err);
240
});
241
```
242
243
### Backpressure Handling
244
245
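`write()` returns `false` once the internal buffer is full; wait for the `'drain'` event before writing more data.

**Complete Usage Example:**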
246
247
```javascript
248
const { Writable } = require("@mafintosh/streamx");
249
250
const writable = new Writable({
251
highWaterMark: 1024, // 1KB buffer
252
write(data, cb) {
253
// Simulate slow write operation
254
setTimeout(() => {
255
console.log("Wrote:", data.toString());
256
cb(null);
257
}, 100);
258
},
259
final(cb) {
260
console.log("Final processing of any remaining data");
261
cb(null);
262
}
263
});
264
265
function writeData(data) {
266
const canContinue = writable.write(data);
267
if (!canContinue) {
268
console.log("Buffer full, waiting for drain...");
269
writable.once('drain', () => {
270
console.log("Buffer drained, can continue writing");
271
});
272
}
273
}
274
275
// Write some data
276
writeData("First chunk");
277
writeData("Second chunk");
278
writeData("Third chunk");
279
280
// End the stream
281
writable.end();
282
283
writable.on('finish', () => {
284
console.log('All writes completed');
285
});
286
287
writable.on('close', () => {
288
console.log('Stream closed');
289
});
290
291
writable.on('error', (err) => {
292
console.error('Error:', err);
293
});
294
```
295
296
### Integration with File System
297
298
```javascript
299
const fs = require('fs');
300
const { Writable } = require("@mafintosh/streamx");
301
302
/**
 * Example Writable that streams chunks into a file.
 * `_open` acquires the file descriptor, `_write` writes each chunk through
 * it, and `_destroy` releases it.
 */
class FileWriter extends Writable {
  /**
   * @param filename - Path of the file to open for writing
   */
  constructor(filename) {
    super();
    this.filename = filename;
    this.fd = null; // assigned by _open; null while no descriptor is held
  }

  /**
   * Opens the target file with flag 'w' and stores its descriptor.
   * @param cb - Called with an error if the file could not be opened
   */
  _open(cb) {
    fs.open(this.filename, 'w', (err, fd) => {
      if (err) return cb(err);
      this.fd = fd;
      cb(null);
    });
  }

  /**
   * Writes one chunk to the file, converting non-Buffer input to a Buffer.
   * @param data - Chunk to write
   * @param cb - Called once the whole chunk has been written, or with the first error
   */
  _write(data, cb) {
    const chunk = Buffer.isBuffer(data) ? data : Buffer.from(data);

    // fs.write reports how many bytes it actually wrote, which may be fewer
    // than requested; continue from that offset until the chunk is complete.
    const writeFrom = (offset) => {
      fs.write(this.fd, chunk, offset, chunk.length - offset, null, (err, bytesWritten) => {
        if (err) return cb(err);
        const next = offset + bytesWritten;
        if (next < chunk.length) return writeFrom(next);
        cb(null);
      });
    };

    writeFrom(0);
  }

  /**
   * Closes the descriptor if one was opened.
   * @param cb - Called once cleanup has finished
   */
  _destroy(cb) {
    // Compare against null instead of relying on truthiness: 0 is a valid
    // file descriptor and must still be closed.
    if (this.fd === null) return cb(null);

    const fd = this.fd;
    this.fd = null; // forget the descriptor so it can never be closed twice
    fs.close(fd, cb);
  }
}
335
336
// Usage
337
const writer = new FileWriter('output.txt');
338
writer.write('Hello ');
339
writer.write('World!');
340
writer.end();
341
```